CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Luigi の RangeDaily 系の使い方と注意点について

Python の Luigi はバッチ処理に特化したデータパイプライン構築用のフレームワーク。 バッチ処理に特化しているとあって、定期的に実行する系のユーティリティも色々と用意されている。 今回は、その中でも特定の期間に実行すべきバッチ処理をまとめて扱うことのできる、RangeDaily を代表としたクラス群について書いてみる。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.6
BuildVersion:   20G165
$ python -V
Python 3.9.7
$ pip list | grep -i luigi    
luigi                    3.0.3

もくじ

下準備

まずは Luigi をインストールしておく。

$ pip install luigi

RangeDaily について

RangeDaily は、日付を受け取って何らかの処理をする Task をまとめて扱うことのできるラッパータスク。 これを使うと、一日ずつ手動でタスクを実行する代わりに、開始期間 (と終了期間) を指定して一気に処理が実行できる。 とはいえ、文章で説明してもあんまりイメージできないと思うのでサンプルコードを使って説明していく。

以下のサンプルコードでは MyDailyTask というタスクを定義している。 このタスクは date というパラメータ名で日付を受け取る。 実行すると、/tmp 以下に受け取った日付を名前に含んだファイルを作成する。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class MyDailyTask(luigi.Task):
    """日付を受け取って、名前に日付を含むファイルを生成するタスク"""
    date = luigi.DateParameter()

    def run(self):
        with self.output().open(mode='w') as fp:
            print('Hello, World!', file=fp)

    def output(self):
        path = '/tmp/luigi-{date:%Y-%m-%d}'.format(date=self.date)
        return luigi.LocalTarget(path)

上記を example.py という名前で保存しよう。

通常なら、上記のタスクを実行するには次のようにする。 Luigi のスケジューラに、タスクを表したクラスと、実行したい日付を --date パラメータで指定する。

$ python -m luigi \
    --local-scheduler \
    --module example \
    MyDailyTask \
    --date 2021-09-01

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 MyDailyTask(date=2021-09-01)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行すると、次のようにファイルが作られる。

$ ls /tmp/luigi-2021-09-01  
/tmp/luigi-2021-09-01
$ cat /tmp/luigi-2021-09-01 
Hello, World!

上記は一日ずつ処理する場合の例になる。

続いては、今回の主題である RangeDaily を使ってみよう。 RangeDaily を使うと、この日付以降をまとめて実行する、といったことができる。 以下は --start オプションを使って 2021-09-01 以降を一気に実行する場合の例。 実行するタスクとしては RangeDaily を指定して、--of パラメータで自分で定義したタスクを指定する。

$ python -m luigi \
    --local-scheduler \
    RangeDaily \
    --module example \
    --of MyDailyTask \
    --start 2021-09-01

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 19 tasks of which:
* 19 ran successfully:
    - 18 MyDailyTask(date=2021-09-02...2021-09-19)
    - 1 RangeDaily(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記の実行結果を見ると、指定した日付以降を一括で実行できていることがわかる。 なお、これを実行しているシステムの日付は 2021-09-19 である。

$ date "+%Y-%m-%d"
2021-09-19

ディレクトリを確認すると 2021-09-01 から 2021-09-19 の範囲でファイルができている。 なお、2021-09-01 については最初に単発で実行したもの。

$ ls /tmp/luigi-* | head -n 1
/tmp/luigi-2021-09-01
$ ls /tmp/luigi-* | tail -n 1
/tmp/luigi-2021-09-19

試しにもう一度同じコマンドを実行してみよう。 すると、いずれのタスクも実行されないことがわかる。 これは、既にタスクが成果物としているファイルが存在するため。

$ python -m luigi \
    --local-scheduler \
    RangeDaily \
    --module example \
    --of MyDailyTask \
    --start 2021-09-01

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 RangeDaily(...)

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

この特性はバッチ処理を扱う上でなかなか便利だったりする。 というのも、何らかの理由で成果物のファイルが日付として歯抜けになっている場合、自動で存在しない日付を検出して実行してくれる。 また、日付の最後も特に指定しない限り今日になるので、バッチ処理として仕込むコマンドに RangeDaily を指定することもできる。

注意点について

続いては RangeDaily の注意点について。 端的に言うと、現時刻から遡れる日数とタスク数に上限が設定されている。

たとえば開始の日付として年初を指定して実行してみよう。

$ python -m luigi \
    --local-scheduler \
    RangeDaily \
    --module example \
    --of MyDailyTask \
    --start 2021-01-01

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 51 tasks of which:
* 51 ran successfully:
    - 50 MyDailyTask(date=2021-06-12...2021-07-31)
    - 1 RangeDaily(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記の実行結果 (Luigi Execution Summary) を見て違和感を覚えたかもしれない。 というのも、作成されたファイルの日付が 2021-06-12 から始まっているため。

$ ls /tmp/luigi-* | sort | head -n 5
/tmp/luigi-2021-06-12
/tmp/luigi-2021-06-13
/tmp/luigi-2021-06-14
/tmp/luigi-2021-06-15
/tmp/luigi-2021-06-16

また、末尾についても 2021-07-31 で終わっている。

$ ls /tmp/luigi-* | sort | tail -n 25 | head -n 10
/tmp/luigi-2021-07-26
/tmp/luigi-2021-07-27
/tmp/luigi-2021-07-28
/tmp/luigi-2021-07-29
/tmp/luigi-2021-07-30
/tmp/luigi-2021-07-31
/tmp/luigi-2021-09-01
/tmp/luigi-2021-09-02
/tmp/luigi-2021-09-03
/tmp/luigi-2021-09-04

本来なら 2021-01-01 から歯抜けなくファイルができてほしかった。 どうして、こんなことが起こるのか。

この問題はドキュメントを見るとわかる。

luigi.readthedocs.io

遡れる日数の上限 (--days-back)

まず、RangeDailyBase という、RangeDaily の基底クラスには days_back というパラメータがある。 このパラメータは、現在の日時から遡る日付の上限を定めていて、デフォルトで 100 に設定されている。

確認すると、今日から 100 日前は 2021-06-11 だった。 つまり、その日付を含む過去は遡って実行されないようになっている。

$ brew install coreutils
$ gdate --iso-8601 --date '100 days ago'
2021-06-11

試しに --days-back パラメータに 365 を指定して、上限を 1 年まで広げてみよう。

$ python -m luigi \
    --local-scheduler \
    RangeDaily \
    --module example \
    --of MyDailyTask \
    --start 2021-01-01 \
    --days-back 365

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 51 tasks of which:
* 51 ran successfully:
    - 50 MyDailyTask(date=2021-01-01...2021-02-19)
    - 1 RangeDaily(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

すると、上記を見てわかるとおり年初から実行されるようになった。

実行できるタスク数の上限 (--task-limit)

同様に、一度に実行できるタスクの数にも上限がある。 こちらは --task-limit というパラメータで指定できる。 デフォルトでは 50 に設定されている。

試しに、タスク数の上限も 365 に引きあげて実行してみよう。

$ python -m luigi \
    --local-scheduler \
    RangeDaily \
    --module example \
    --of MyDailyTask \
    --start 2021-01-01 \
    --days-back 365 \
    --task-limit 365

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 144 tasks of which:
* 144 ran successfully:
    - 143 MyDailyTask(date=2021-02-20,2021-02-21,2021-02-22,...)
    - 1 RangeDaily(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記を見てわかるとおり、これまでの上限だった 50 を越えてタスクが実行されている。

ファイルの数を検算しても、年初から今日までの日数と一致した。

$ ls /tmp/luigi-* | wc -l
     262
$ gdate --iso-8601 --date '2021-01-01 261 days'
2021-09-19

確認がおわったら、一旦作られたファイルをすべて削除しておこう。

$ rm /tmp/luigi-*

一ヶ月毎の処理 (RangeMonthly)

ちなみに、タイトルに RangeDaily と書いたように、日次のバッチ以外を扱うためのクラスも用意されている。

以下のサンプルコードでは特定の月に対して実行することを想定した MyMonthlyTask というタスクを定義している。 パラメータの名前は date だけど、型が luigi.DateParameter ではなく luigi.MonthParameter になっている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class MyMonthlyTask(luigi.Task):
    """月を受け取って、名前に月を含むファイルを生成するタスク"""
    date = luigi.MonthParameter()

    def run(self):
        with self.output().open(mode='w') as fp:
            print('Hello, World!', file=fp)

    def output(self):
        path = '/tmp/luigi-{month:%Y-%m}'.format(month=self.date)
        return luigi.LocalTarget(path)

先ほどと同じように example.py という名前で保存しておこう。

上記を RangeMonthly 経由で実行する。

$ python -m luigi \
    --local-scheduler \
    RangeMonthly \
    --module example \
    --of MyMonthlyTask \
    --start 2021-01

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 9 tasks of which:
* 9 ran successfully:
    - 8 MyMonthlyTask(date=2021-01...2021-08)
    - 1 RangeMonthly(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記を見ると、今月を含まない形でタスクが実行されていることがわかる。 これは典型的には日付が揃わないうちに集計処理を実行することは少ないことが関係しているんだろう。

先ほどと同じように、確認できたらファイルを一旦きれいにしておく。

$ rm /tmp/luigi-*

一時間毎の処理 (RangeHourly)

同じように一時間毎の処理も扱うことができる。 以下では一時間毎に実行することを期待した処理を MyHourlyTask という名前で定義している。 パラメータの名前は date だけど、クラスは luigi.DateHourParameter を用いる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class MyHourlyTask(luigi.Task):
    """時間 (Hour) を受け取って、名前に時間を含むファイルを生成するタスク"""
    date = luigi.DateHourParameter()

    def run(self):
        with self.output().open(mode='w') as fp:
            print('Hello, World!', file=fp)

    def output(self):
        path = '/tmp/luigi-{month:%Y-%m-%dT%H}'.format(month=self.date)
        return luigi.LocalTarget(path)

一時間ごとの処理になるとファイルが増えるので、今回は明示的に終了の時刻も指定しておこう。 以下では --start2021-09-15T00 を、--stop2021-09-16T12 を指定している。 T 以降が時間を表している。

$ python -m luigi \
    --local-scheduler \
    RangeHourly \
    --module example \
    --of MyHourlyTask \
    --start 2021-09-15T00 \
    --stop 2021-09-16T12

... (snip) ...

===== Luigi Execution Summary =====

Scheduled 37 tasks of which:
* 37 ran successfully:
    - 36 MyHourlyTask(date=2021-09-15T00...2021-09-16T11)
    - 1 RangeHourly(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

結果を見てわかるとおり、--start の時刻を含んで --stop を含まない形でタスクが実行されていることがわかる。

$ ls /tmp/luigi-*  
/tmp/luigi-2021-09-15T00    /tmp/luigi-2021-09-15T12    /tmp/luigi-2021-09-16T00
/tmp/luigi-2021-09-15T01    /tmp/luigi-2021-09-15T13    /tmp/luigi-2021-09-16T01
/tmp/luigi-2021-09-15T02    /tmp/luigi-2021-09-15T14    /tmp/luigi-2021-09-16T02
/tmp/luigi-2021-09-15T03    /tmp/luigi-2021-09-15T15    /tmp/luigi-2021-09-16T03
/tmp/luigi-2021-09-15T04    /tmp/luigi-2021-09-15T16    /tmp/luigi-2021-09-16T04
/tmp/luigi-2021-09-15T05    /tmp/luigi-2021-09-15T17    /tmp/luigi-2021-09-16T05
/tmp/luigi-2021-09-15T06    /tmp/luigi-2021-09-15T18    /tmp/luigi-2021-09-16T06
/tmp/luigi-2021-09-15T07    /tmp/luigi-2021-09-15T19    /tmp/luigi-2021-09-16T07
/tmp/luigi-2021-09-15T08    /tmp/luigi-2021-09-15T20    /tmp/luigi-2021-09-16T08
/tmp/luigi-2021-09-15T09    /tmp/luigi-2021-09-15T21    /tmp/luigi-2021-09-16T09
/tmp/luigi-2021-09-15T10    /tmp/luigi-2021-09-15T22    /tmp/luigi-2021-09-16T10
/tmp/luigi-2021-09-15T11    /tmp/luigi-2021-09-15T23    /tmp/luigi-2021-09-16T11

まとめ

今回は Luigi で特定の期間毎に実行するバッチ処理を、まとめて扱う RangeDaily 系の使い方と注意点について紹介した。 注意点としては、遡る日付や実行するタスクの数に上限がある点や、期間によって開始・終了を含む・含まないが微妙に異なる点が挙げられる。