今回は、Luigi で複数のタスクが共通のパラメータを扱う方法について考えてみる。 ここらへん、調べてもあまりドキュメントなどが出てこなかった。 なので、ソースコードを読んでリバースエンジニアリング的に「こういう風にできそう」と判明した内容を書いてみる。 使う API のレイヤー的に、高レベルなやり方と低レベルなやり方が見つかったので、どちらも記載する。
使った環境は次のとおり。
$ sw_vers ProductName: macOS ProductVersion: 11.4 BuildVersion: 20F71 $ python -V Python 3.9.5 $ pip list | grep -i luigi luigi 3.0.3
もくじ
下準備
まずは、下準備として Luigi をインストールしておく。
$ pip install luigi
低レベル API (luigi.configuration.get_config()) を使う
まずは低レベル API の luigi.configuration.get_config()
を使うやり方から。
この API を使うと、Luigi の設定ファイルを辞書形式でそのまま読み込むことができる。
読み込んだコンフィグは、どのタスクから利用することもできるため共通のパラメータを扱うことができる。
以下にサンプルコードを示す。
サンプルコードには TaskA
と TaskB
という、2 つのタスクを定義している。
この中では、それぞれ設定ファイルから [SharedConfig]
セクションの shared_param
パラメータを読み込んで使っている。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import luigi # 設定を取得するための API from luigi.configuration import get_config class NoOutputTask(luigi.Task): """output() を介さずに完了を制御するテスト用のタスク NOTE: 今回のサンプルコードの中では本質的な内容ではない""" # タスクが完了しているかを示すフラグ done = False def run(self): # run() が一度でも実行されたら完了フラグを立てる self.done = True def complete(self): # タスクの完了はフラグで判断する return self.done class TaskA(NoOutputTask): def run(self): super().run() # SharedConfig セクションのパラメータを取得する section_dict = dict(get_config().items('SharedConfig')) # パラメータの内容を標準出力に書き出す (ほんとは output() に書くべき) print('Hello,', section_dict['shared_param'], 'by TaskA') class TaskB(NoOutputTask): def run(self): super().run() # 同じパラメータを使う section_dict = dict(get_config().items('SharedConfig')) print('Hello,', section_dict['shared_param'], 'by TaskB') class Wrapper(luigi.WrapperTask): """上記で定義した 2 つのタスクをキックするためだけのタスク""" def requires(self): yield TaskA() yield TaskB() if __name__ == '__main__': luigi.run(main_task_cls=Wrapper, local_scheduler=True)
次のように設定ファイルを用意しよう。
$ cat << 'EOF' > luigi.cfg [SharedConfig] shared_param=World EOF
あとは実行するだけ。
$ python lowlayer.py ... Hello, World by TaskB ... Hello, World by TaskA ... ===== Luigi Execution Summary ===== Scheduled 3 tasks of which: * 3 ran successfully: - 1 TaskA() - 1 TaskB() - 1 Wrapper() This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary =====
上記の結果からわかるように、タスクが出力するメッセージの中に設定ファイルで指定した値が使われている。
高レベル API (luigi.Config) を使う
続いては高レベル API の luigi.Config
を使うパターン。
こちらは luigi.Config
というクラスを継承したクラスを定義する。
設定ファイからは、定義したクラスと同名のセクション経由でパラメータを設定できる。
複数のタスクからは、クラスをインスタンス化してやればパラメータがインジェクションされて得られる。
以下にサンプルコードを示す。
先ほどのサンプルコードから SharedConfig
というクラスが増えている。
そして、TaskA
と TaskB
は SharedConfig
をインスタンス化して shared_param
パラメータにアクセスしている。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import luigi class SharedConfig(luigi.Config): """複数のタスクから参照される共通のパラメータ""" shared_param = luigi.Parameter() class NoOutputTask(luigi.Task): """output() を介さずに完了を制御するテスト用のタスク NOTE: 今回のサンプルコードの中では本質的な内容ではない""" # タスクが完了しているかを示すフラグ done = False def run(self): # run() が一度でも実行されたら完了フラグを立てる self.done = True def complete(self): # タスクの完了はフラグで判断する return self.done class TaskA(NoOutputTask): def run(self): super().run() # SharedConfig をインスタンス化してパラメータを取り出す # パラメータは luigi の設定ファイルで指定できる print('Hello,', SharedConfig().shared_param, 'by TaskA') class TaskB(NoOutputTask): def run(self): super().run() # こちらも同様 print('Hello,', SharedConfig().shared_param, 'by TaskB') class Wrapper(luigi.WrapperTask): """上記で定義した 2 つのタスクをキックするためだけのタスク""" def requires(self): yield TaskA() yield TaskB() if __name__ == '__main__': luigi.run(main_task_cls=Wrapper, local_scheduler=True)
設定ファイルは先ほどと同じで良い。 セクション名とパラメータ名が同じになるようにクラスを定義してあるため。 作り直すなら次のようにする。
$ cat << 'EOF' > luigi.cfg [SharedConfig] shared_param=World EOF
実行してみよう。
$ python highlayer.py ... Hello, World by TaskB ... Hello, World by TaskA ... ===== Luigi Execution Summary ===== Scheduled 3 tasks of which: * 3 ran successfully: - 1 TaskA() - 1 TaskB() - 1 Wrapper() This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary =====
上記の実行結果から、ちゃんとパラメータを参照できていることがわかる。
ところで、上記で使った luigi.Config
というクラス、定義を見ると面白いことがわかる。
以下がそのソースコード。
なんと、luigi.Config
は luigi.Task
を継承しているだけで、他に何もしていない。
つまり、ほとんど同一のものということになる。
実は、luigi.Config
を使わなくても、luigi.Task
でも同じことはできるのだ。
いじょう。