バッチ処理に特化した Python のデータパイプライン構築用のフレームワークに Luigi がある。
今回は、特定の時系列的な範囲を Task
が受け取るのに使える DateIntervalParameter
というパラメータを紹介する。
これは、たとえば一週間とか一ヶ月あるいは特定の日付から日付といった範囲で、何らかの集計をする処理を書くときに便利に使える。
使った環境は次のとおり。
$ sw_vers ProductName: macOS ProductVersion: 11.6 BuildVersion: 20G165 $ python -V Python 3.9.7 $ pip list | grep -i luigi luigi 3.0.3
もくじ
下準備
あらかじめ、Luigi をインストールしておく。
$ pip install luigi
DateIntervalParameter について
早速だけど以下にサンプルコードを示す。
以下では ExampleTask
というタスクを定義している。
このタスクは dt_interval
という名前で DateIntervalParameter
型のパラメータを受け取る。
タスクが実行されると DateIntervalParameter#dates()
メソッドを呼んで、範囲に含まれる日付を標準出力に書き出す。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import luigi class ExampleTask(luigi.Task): # 期間を指定できるパラメータ dt_interval = luigi.DateIntervalParameter() def run(self): for date in self.dt_interval.dates(): # 各日付に適用する擬似的な処理 print(f'Processing: {date}') def complete(self): # 動作確認用に output() メソッドを定義しないで常にタスクが実行されるようにする return False
上記を実行してみよう。 まずは "2021-W01" という文字列を渡してみる。 これは 2021 年の第 01 週を表している。
$ python -m luigi \
--local-scheduler \
--module example \
ExampleTask \
--dt-interval "2021-W01"
... (snip) ...
Processing: 2021-01-04
Processing: 2021-01-05
Processing: 2021-01-06
Processing: 2021-01-07
Processing: 2021-01-08
Processing: 2021-01-09
Processing: 2021-01-10
... (snip)
上記を見ると 2021-01-04 から 2021-01-10 が、指定した 2021-W01 に含まれる日付ということがわかる。
同様に "2021-01" という文字列を渡してみよう。 これは 2021 年の 1 月を表している。
$ python -m luigi \
--local-scheduler \
--module example \
ExampleTask \
--dt-interval "2021-01"
... (snip) ...
Processing: 2021-01-01
Processing: 2021-01-02
Processing: 2021-01-03
... (snip) ...
Processing: 2021-01-29
Processing: 2021-01-30
Processing: 2021-01-31
... (snip)
数が多いので省略しているけど、2021-01-01 から 2021-01-31 が範囲に含まれることがわかる。
また、ISO 8601 形式の日付をハイフン (-
) でつなぐと、任意の日付の範囲が指定できる。
たとえば 2021-09-01 から 2021-09-07 を指定してみよう。
$ python -m luigi \
--local-scheduler \
--module example \
ExampleTask \
--dt-interval "2021-09-01-2021-09-07"
... (snip) ...
Processing: 2021-09-01
Processing: 2021-09-02
Processing: 2021-09-03
Processing: 2021-09-04
Processing: 2021-09-05
Processing: 2021-09-06
... (snip)
末尾の日付は含まれずに、2021-09-01 から 2021-09-06 が範囲に含まれていることがわかる。
このように DateIntervalParameter
を使うと、特定の日付の範囲を受け取る処理が書きやすい。
典型的には、開始日と終了日を個別に取っていたような処理を置き換えることができる。
動作原理
使い方はわかったので、この DateIntervalParameter
というパラメータが、どのように実現されているのか見ていこう。
以下のサンプルコードでは、受け取った dt_interval
の型を表示している。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import luigi class ExampleTask(luigi.Task): dt_interval = luigi.DateIntervalParameter() def run(self): # dt_interval の型を表示する print(f'*** Type of dt_interval: {type(self.dt_interval)} ***') def complete(self): return False
上記に "2021-W01" という文字列を渡すと luigi.date_interval.Week
という型になっていることが確認できる。
$ python -m luigi \ --local-scheduler \ --module example \ ExampleTask \ --dt-interval "2021-W01" ... (snip) ... *** Type of dt_interval: <class 'luigi.date_interval.Week'> *** ... (snip)
同様に、"2021-01" を渡したときでは luigi.date_interval.Month
になる。
$ python -m luigi \ --local-scheduler \ --module example \ ExampleTask \ --dt-interval "2021-01" ... (snip) ... *** Type of dt_interval: <class 'luigi.date_interval.Month'> *** ... (snip)
以下、実行については省略しつつ、以下のように対応している。
<年-月-日>
luigi.date_interval.Date
<年-W週>
luigi.date_interval.Week
<年-月>
luigi.date_interval.Month
<年-月-日>-<年-月-日>
luigi.date_interval.Custom
ここからは Python のインタプリタを使って確認していこう。
$ python
luigi.date_interval
をインポートする。
>>> from luigi import date_interval
たとえば luigi.date_interval.Week
をインスタンス化してみよう。
このクラスには年と週数を渡す必要がある。
>>> from pprint import pprint >>> week_interval = date_interval.Week(2021, 1)
ちなみに、実行時と同じように文字列を使ってインスタンス化するときは parse()
メソッドを使えば良い。
>>> date_interval.Week.parse('2021-W01') 2021-W01
このオブジェクトには、先ほどのサンプルコードでも登場したように dates()
というメソッドがある。
このメソッドは、指定された期間に含まれる日付を返す。
>>> pprint(week_interval.dates()) [datetime.date(2021, 1, 4), datetime.date(2021, 1, 5), datetime.date(2021, 1, 6), datetime.date(2021, 1, 7), datetime.date(2021, 1, 8), datetime.date(2021, 1, 9), datetime.date(2021, 1, 10)]
また、next()
メソッドを使うと次の期間が、prev()
メソッドを使うと前の期間が得られる。
>>> week_interval.next() 2021-W02 >>> week_interval.prev() 2020-W53
これらのメソッドは、Week
以外にも Month
や Custom
などでそれぞれ実装されている。
ちなみに DateIntervalParameter
自体は、受け取った文字列をそれぞれのクラスの parse()
に順番に渡す実装になっている。
いじょう。