CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Luigi の DateIntervalParameter について

バッチ処理に特化した Python のデータパイプライン構築用のフレームワークに Luigi がある。 今回は、特定の時系列的な範囲を Task が受け取るのに使える DateIntervalParameter というパラメータを紹介する。 これは、たとえば一週間とか一ヶ月あるいは特定の日付から日付といった範囲で、何らかの集計をする処理を書くときに便利に使える。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.6
BuildVersion:   20G165
$ python -V        
Python 3.9.7
$ pip list | grep -i luigi
luigi                    3.0.3

もくじ

下準備

あらかじめ、Luigi をインストールしておく。

$ pip install luigi

DateIntervalParameter について

早速だけど以下にサンプルコードを示す。

以下では ExampleTask というタスクを定義している。 このタスクは dt_interval という名前で DateIntervalParameter 型のパラメータを受け取る。 タスクが実行されると DateIntervalParameter#dates() メソッドを呼んで、範囲に含まれる日付を標準出力に書き出す。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import luigi


class ExampleTask(luigi.Task):
    # 期間を指定できるパラメータ
    dt_interval = luigi.DateIntervalParameter()

    def run(self):
        for date in self.dt_interval.dates():
            # 各日付に適用する擬似的な処理
            print(f'Processing: {date}')

    def complete(self):
        # 動作確認用に output() メソッドを定義しないで常にタスクが実行されるようにする
        return False

上記を実行してみよう。 まずは "2021-W01" という文字列を渡してみる。 これは 2021 年の第 01 週を表している。

$ python -m luigi \
    --local-scheduler \
    --module example \
    ExampleTask \
    --dt-interval "2021-W01"

... (snip) ...

Processing: 2021-01-04
Processing: 2021-01-05
Processing: 2021-01-06
Processing: 2021-01-07
Processing: 2021-01-08
Processing: 2021-01-09
Processing: 2021-01-10

... (snip)

上記を見ると 2021-01-04 から 2021-01-10 が、指定した 2021-W01 に含まれる日付ということがわかる。

同様に "2021-01" という文字列を渡してみよう。 これは 2021 年の 1 月を表している。

$ python -m luigi \
    --local-scheduler \
    --module example \
    ExampleTask \
    --dt-interval "2021-01"

... (snip) ...

Processing: 2021-01-01
Processing: 2021-01-02
Processing: 2021-01-03
... (snip) ...
Processing: 2021-01-29
Processing: 2021-01-30
Processing: 2021-01-31

... (snip)

数が多いので省略しているけど、2021-01-01 から 2021-01-31 が範囲に含まれることがわかる。

また、ISO 8601 形式の日付をハイフン (-) でつなぐと、任意の日付の範囲が指定できる。 たとえば 2021-09-01 から 2021-09-07 を指定してみよう。

$ python -m luigi \
    --local-scheduler \
    --module example \
    ExampleTask \
    --dt-interval "2021-09-01-2021-09-07"

... (snip) ...

Processing: 2021-09-01
Processing: 2021-09-02
Processing: 2021-09-03
Processing: 2021-09-04
Processing: 2021-09-05
Processing: 2021-09-06

... (snip)

末尾の日付は含まれずに、2021-09-01 から 2021-09-06 が範囲に含まれていることがわかる。

このように DateIntervalParameter を使うと、特定の日付の範囲を受け取る処理が書きやすい。 典型的には、開始日と終了日を個別に取っていたような処理を置き換えることができる。

動作原理

使い方はわかったので、この DateIntervalParameter というパラメータが、どのように実現されているのか見ていこう。

以下のサンプルコードでは、受け取った dt_interval の型を表示している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import luigi


class ExampleTask(luigi.Task):
    dt_interval = luigi.DateIntervalParameter()

    def run(self):
        # dt_interval の型を表示する
        print(f'*** Type of dt_interval: {type(self.dt_interval)} ***')

    def complete(self):
        return False

上記に "2021-W01" という文字列を渡すと luigi.date_interval.Week という型になっていることが確認できる。

$ python -m luigi \
    --local-scheduler \
    --module example \
    ExampleTask \
    --dt-interval "2021-W01"

... (snip) ...

*** Type of dt_interval: <class 'luigi.date_interval.Week'> ***

... (snip)

同様に、"2021-01" を渡したときでは luigi.date_interval.Month になる。

$ python -m luigi \
    --local-scheduler \
    --module example \
    ExampleTask \
    --dt-interval "2021-01"

... (snip) ...

*** Type of dt_interval: <class 'luigi.date_interval.Month'> ***

... (snip)

以下、実行については省略しつつ、以下のように対応している。

  • <年-月-日>
    • luigi.date_interval.Date
  • <年-W週>
    • luigi.date_interval.Week
  • <年-月>
    • luigi.date_interval.Month
  • <年-月-日>-<年-月-日>
    • luigi.date_interval.Custom

ここからは Python のインタプリタを使って確認していこう。

$ python

luigi.date_interval をインポートする。

>>> from luigi import date_interval

たとえば luigi.date_interval.Week をインスタンス化してみよう。 このクラスには年と週数を渡す必要がある。

>>> from pprint import pprint
>>> week_interval = date_interval.Week(2021, 1)

ちなみに、実行時と同じように文字列を使ってインスタンス化するときは parse() メソッドを使えば良い。

>>> date_interval.Week.parse('2021-W01')
2021-W01

このオブジェクトには、先ほどのサンプルコードでも登場したように dates() というメソッドがある。 このメソッドは、指定された期間に含まれる日付を返す。

>>> pprint(week_interval.dates())
[datetime.date(2021, 1, 4),
 datetime.date(2021, 1, 5),
 datetime.date(2021, 1, 6),
 datetime.date(2021, 1, 7),
 datetime.date(2021, 1, 8),
 datetime.date(2021, 1, 9),
 datetime.date(2021, 1, 10)]

また、next() メソッドを使うと次の期間が、prev() メソッドを使うと前の期間が得られる。

>>> week_interval.next()
2021-W02
>>> week_interval.prev()
2020-W53

これらのメソッドは、Week 以外にも MonthCustom などでそれぞれ実装されている。

ちなみに DateIntervalParameter 自体は、受け取った文字列をそれぞれのクラスの parse() に順番に渡す実装になっている。

luigi.readthedocs.io

いじょう。