Luigi は、Python を使って実装された、バッチ処理のパイプラインを扱うためのフレームワーク。 Luigi でパイプラインを定義するときは、基本的には個別のタスクを依存関係でつないでいくことになる。 このとき、扱う処理によってはパイプラインは長大になると共に扱うパラメータの数も増える。 そうすると、依存関係で上流にあるタスクに対して、どのようにパラメータを渡すか、という問題が生じる。
この問題は、公式のドキュメントではパラメータ爆発 (parameter explosion) と表現されている。
今回は、このパラメータ爆発問題を解決する方法について。 なお、基本的には上記のドキュメントに解決方法が書いてあるので、そちらを読むでも良い。
使った環境は次のとおり。
$ sw_vers ProductName: Mac OS X ProductVersion: 10.14.6 BuildVersion: 18G3020 $ python -V Python 3.8.1 $ pip list | grep -i luigi luigi 2.8.12
下準備
事前の準備として、Luigi をインストールしておく。
$ pip install luigi
パラメータ爆発によって生じるパラメータのバケツリレー
まずは、パラメータ爆発によって何が引き起こされるのかについて解説する。
たとえば、タスクとして UpstreamTask
と DownstreamTask
があるとする。
DownstreamTask
は UpstreamTask
に依存していて、それぞれ動作に必要なパラメータがある。
このとき、愚直にパイプラインを定義すると、次のようなコードになる。
DownstreamTask
では、UpstreamTask
に必要なパラメータを二重に定義した上で、タスクを生成するときに渡している。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import luigi class ExampleCommonTask(luigi.Task): """ターゲットファイルを生成しない例示用のタスク""" # タスクの実行が完了したかを示すフラグ done = False def run(self): """タスクが実行されたときに呼ばれるメソッド""" print(f'run: {self}') # 実行されたらタスクが完了したことにする self.done = True def complete(self): """タスク完了のチェックに使われるメソッド""" return self.done class UpstreamTask(ExampleCommonTask): """他のタスクから依存されているタスク""" # 上流タスクのパラメータ upstream_task_param = luigi.Parameter() class DownstreamTask(ExampleCommonTask): """他のタスクに依存しているタスク""" # タスク固有のパラメータ downstream_task_param = luigi.Parameter() # 上流タスクのパラメータ # FIXME: 同じ内容を二重に定義してしまっている upstream_task_param = luigi.Parameter() def requires(self): """依存しているタスクを示すメソッド""" # FIXME: 依存しているタスクを生成するときにパラメータのバケツリレーが生じている yield UpstreamTask(self.upstream_task_param) def main(): # 実行したい下流タスクにすべてのパラメータを渡してバケツリレーする luigi.run(cmdline_args=['DownstreamTask', '--downstream-task-param=downstream', '--upstream-task-param=upstream', '--local-scheduler']) if __name__ == '__main__': main()
上記をファイルに保存して実行してみよう。
$ python example.py ...(省略)... ===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 2 ran successfully: - 1 DownstreamTask(downstream_task_param=downstream, upstream_task_param=upstream) - 1 UpstreamTask(upstream_task_param=upstream) This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary =====
実行サマリーを見ると、ちゃんと DownstreamTask
に渡したパラメータが UpstreamTask
に渡されている。
たしかに、問題なく動いてはいる。
しかし、これではパラメータが二重管理になっているし書き間違いも起きやすい。
もし、上流のタスクが追加されたら下流のタスクはすべて修正が必要になってしまう。
タスクの依存関係とパラメータが少ないうちは何とかなっても、規模が大きくなれば早々に破綻するのは明らかだろう。
クラス名を指定してパラメータを渡す
ここからは、上記の問題を解決する方法について書いていく。 まず、ひとつ目のやり方はタスクの依存関係を示すときにパラメータを指定しないというもの。 こうすると、不足しているパラメータはコマンドラインやコンフィグファイル経由で渡すことになる。
以下にサンプルコードを示す。
先ほどとの違いは DownstreamTask
の requires()
メソッドのところ。
メソッドで UpstreamTask
のインスタンスを返すときに、パラメータをまったく指定していない。
代わりに、実行するときにコマンドラインからそれぞれのクラスの名前を指定してパラメータを渡している。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import luigi class ExampleCommonTask(luigi.Task): """ターゲットファイルを生成しない例示用のタスク""" # タスクの実行が完了したかを示すフラグ done = False def run(self): """タスクが実行されたときに呼ばれるメソッド""" print(f'run: {self}') # 実行されたらタスクが完了したことにする self.done = True def complete(self): """タスク完了のチェックに使われるメソッド""" return self.done class UpstreamTask(ExampleCommonTask): """他のタスクから依存されているタスク""" # 上流タスクのパラメータ upstream_task_param = luigi.Parameter() class DownstreamTask(ExampleCommonTask): """他のタスクに依存しているタスク""" # タスク固有のパラメータ downstream_task_param = luigi.Parameter() def requires(self): """依存しているタスクを示すメソッド""" # 依存しているタスクを生成するときにパラメータを渡さない # 代わりにコマンドラインの引数でクラス名を指定してパラメータを渡す yield UpstreamTask() def main(): # 実行するときにクラス名を指定することで上流タスクに直接パラメータを渡せる luigi.run(cmdline_args=['DownstreamTask', '--DownstreamTask-downstream-task-param=downstream', '--UpstreamTask-upstream-task-param=upstream', '--local-scheduler']) if __name__ == '__main__': main()
上記をファイルに保存して実行してみよう。
$ python example.py ...(省略)... ===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 2 ran successfully: - 1 DownstreamTask(downstream_task_param=downstream) - 1 UpstreamTask(upstream_task_param=upstream) This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary =====
実行サマリからは、ピンポイントにそれぞれのクラスにパラメータが渡されていることがわかる。 ただし、この方法は下流にあるタスクが上流のタスクのパラメータを利用するときには使えない
@requires デコレータを使う
次に紹介するのは luigi.util.requires
デコレータを使うやり方。
このデコレータを使ってタスクのクラスを修飾すると、自動的に必要なパラメータとメソッドを追加してくれる。
以下にサンプルコードを示す。
DownstreamTask
を @requires
デコレータを使って UpstreamTask
を引数に修飾している。
こうすると、指定したクラスで必要となるパラメータを自動的に追加すると共に、requires()
メソッドでクラスを指定したのと同じことになる。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import luigi from luigi.util import requires class ExampleCommonTask(luigi.Task): """ターゲットファイルを生成しない例示用のタスク""" # タスクの実行が完了したかを示すフラグ done = False def run(self): """タスクが実行されたときに呼ばれるメソッド""" print(f'run: {self}') # 実行されたらタスクが完了したことにする self.done = True def complete(self): """タスク完了のチェックに使われるメソッド""" return self.done class UpstreamTask(ExampleCommonTask): """他のタスクから依存されているタスク""" # 上流タスクのパラメータ upstream_task_param = luigi.Parameter() # 他のタスクに依存していることをデコレータで示す # 必要なパラメータも自動的に定義されるので二重管理がなくなる @requires(UpstreamTask) class DownstreamTask(ExampleCommonTask): """他のタスクに依存しているタスク""" # タスク固有のパラメータ downstream_task_param = luigi.Parameter() def main(): luigi.run(cmdline_args=['DownstreamTask', '--downstream-task-param=downstream', '--upstream-task-param=upstream', '--local-scheduler']) if __name__ == '__main__': main()
上記をファイルに保存して実行してみよう。
$ python example.py ...(省略)... ===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 2 ran successfully: - 1 DownstreamTask(upstream_task_param=upstream, downstream_task_param=downstream) - 1 UpstreamTask(upstream_task_param=upstream) This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary =====
実行サマリを見ると、最初のパターンと同じように UpstreamTask
で必要とするパラメータが DownstreamTask
にも重複して渡されている。
これなら下流のタスクでも上流のタスクのパラメータが利用できる。
@inherits デコレータを使う
もうひとつのやり方は @inherits
デコレータを使うもので、アプローチは @requires
デコレータと似ている。
ただし、こちらは本来の目的とはちょっと違うものを無理やり転用して使っている感じがある。
次のサンプルコードでは DownstreamTask
を @inherits
デコレータで UpstreamTask
を引数に修飾している。
こうすると DownstreamTask
は UpstreamTask
と同じパラメータを持つようになる。
ただし、@requires
デコレータと違って依存関係はセットされない。
そのため、自分で requires()
メソッドに必要なタスクを指定することになる。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import luigi from luigi.util import inherits class ExampleCommonTask(luigi.Task): """ターゲットファイルを生成しない例示用のタスク""" # タスクの実行が完了したかを示すフラグ done = False def run(self): """タスクが実行されたときに呼ばれるメソッド""" print(f'run: {self}') # 実行されたらタスクが完了したことにする self.done = True def complete(self): """タスク完了のチェックに使われるメソッド""" return self.done class UpstreamTask(ExampleCommonTask): """他のタスクから依存されているタスク""" # 上流タスクのパラメータ upstream_task_param = luigi.Parameter() @inherits(UpstreamTask) class DownstreamTask(ExampleCommonTask): """他のタスクに依存しているタスク""" # タスク固有のパラメータ downstream_task_param = luigi.Parameter() def requires(self): """依存しているタスクを示すメソッド""" # clone() することで、自身のパラメータを適用したタスクが得られる # self.clone_parent() でも良い return self.clone(UpstreamTask) def main(): luigi.run(cmdline_args=['DownstreamTask', '--downstream-task-param=downstream', '--upstream-task-param=upstream', '--local-scheduler']) if __name__ == '__main__': main()
上記をファイルに保存して実行してみよう。
$ python example.py ...(省略)... ===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 2 ran successfully: - 1 DownstreamTask(upstream_task_param=upstream, downstream_task_param=downstream) - 1 UpstreamTask(upstream_task_param=upstream) This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary =====
実行サマリを見ると、ちゃんとパラメータを適用したタスクが実行されていることがわかる。
まとめ
今回は、パラメータ爆発問題を回避するいくつかの方法を試してみた。
- タスクの依存関係を
requires()
メソッドで示すときにパラメータをブランクにする @requires
デコレータを使う@inherits
デコレータを使う
いじょう。
- 作者:もみじあめ
- 発売日: 2020/03/06
- メディア: オンデマンド (ペーパーバック)