CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Luigi のパラメータ爆発問題について

Luigi は、Python を使って実装された、バッチ処理のパイプラインを扱うためのフレームワーク。 Luigi でパイプラインを定義するときは、基本的には個別のタスクを依存関係でつないでいくことになる。 このとき、扱う処理によってはパイプラインは長大になると共に扱うパラメータの数も増える。 そうすると、依存関係で上流にあるタスクに対して、どのようにパラメータを渡すか、という問題が生じる。

この問題は、公式のドキュメントではパラメータ爆発 (parameter explosion) と表現されている。

luigi.readthedocs.io

今回は、このパラメータ爆発問題を解決する方法について。 なお、基本的には上記のドキュメントに解決方法が書いてあるので、そちらを読むでも良い。

使った環境は次のとおり。

$ sw_vers           
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V                 
Python 3.8.1
$ pip list | grep -i luigi   
luigi           2.8.12 

下準備

事前の準備として、Luigi をインストールしておく。

$ pip install luigi

パラメータ爆発によって生じるパラメータのバケツリレー

まずは、パラメータ爆発によって何が引き起こされるのかについて解説する。 たとえば、タスクとして UpstreamTaskDownstreamTask があるとする。 DownstreamTaskUpstreamTask に依存していて、それぞれ動作に必要なパラメータがある。

このとき、愚直にパイプラインを定義すると、次のようなコードになる。 DownstreamTask では、UpstreamTask に必要なパラメータを二重に定義した上で、タスクを生成するときに渡している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""
    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()

    # 上流タスクのパラメータ
    # FIXME: 同じ内容を二重に定義してしまっている
    upstream_task_param = luigi.Parameter()

    def requires(self):
        """依存しているタスクを示すメソッド"""
        # FIXME: 依存しているタスクを生成するときにパラメータのバケツリレーが生じている
        yield UpstreamTask(self.upstream_task_param)


def main():
    # 実行したい下流タスクにすべてのパラメータを渡してバケツリレーする
    luigi.run(cmdline_args=['DownstreamTask',
                            '--downstream-task-param=downstream',
                            '--upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(downstream_task_param=downstream, upstream_task_param=upstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリーを見ると、ちゃんと DownstreamTask に渡したパラメータが UpstreamTask に渡されている。 たしかに、問題なく動いてはいる。 しかし、これではパラメータが二重管理になっているし書き間違いも起きやすい。 もし、上流のタスクが追加されたら下流のタスクはすべて修正が必要になってしまう。 タスクの依存関係とパラメータが少ないうちは何とかなっても、規模が大きくなれば早々に破綻するのは明らかだろう。

クラス名を指定してパラメータを渡す

ここからは、上記の問題を解決する方法について書いていく。 まず、ひとつ目のやり方はタスクの依存関係を示すときにパラメータを指定しないというもの。 こうすると、不足しているパラメータはコマンドラインやコンフィグファイル経由で渡すことになる。

以下にサンプルコードを示す。 先ほどとの違いは DownstreamTaskrequires() メソッドのところ。 メソッドで UpstreamTask のインスタンスを返すときに、パラメータをまったく指定していない。 代わりに、実行するときにコマンドラインからそれぞれのクラスの名前を指定してパラメータを渡している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""
    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()

    def requires(self):
        """依存しているタスクを示すメソッド"""
        # 依存しているタスクを生成するときにパラメータを渡さない
        # 代わりにコマンドラインの引数でクラス名を指定してパラメータを渡す
        yield UpstreamTask()


def main():
    # 実行するときにクラス名を指定することで上流タスクに直接パラメータを渡せる
    luigi.run(cmdline_args=['DownstreamTask',
                            '--DownstreamTask-downstream-task-param=downstream',
                            '--UpstreamTask-upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(downstream_task_param=downstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリからは、ピンポイントにそれぞれのクラスにパラメータが渡されていることがわかる。 ただし、この方法は下流にあるタスクが上流のタスクのパラメータを利用するときには使えない

@requires デコレータを使う

次に紹介するのは luigi.util.requires デコレータを使うやり方。 このデコレータを使ってタスクのクラスを修飾すると、自動的に必要なパラメータとメソッドを追加してくれる。

以下にサンプルコードを示す。 DownstreamTask@requires デコレータを使って UpstreamTask を引数に修飾している。 こうすると、指定したクラスで必要となるパラメータを自動的に追加すると共に、requires() メソッドでクラスを指定したのと同じことになる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.util import requires


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""

    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


# 他のタスクに依存していることをデコレータで示す
# 必要なパラメータも自動的に定義されるので二重管理がなくなる
@requires(UpstreamTask)
class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()


def main():
    luigi.run(cmdline_args=['DownstreamTask',
                            '--downstream-task-param=downstream',
                            '--upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(upstream_task_param=upstream, downstream_task_param=downstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリを見ると、最初のパターンと同じように UpstreamTask で必要とするパラメータが DownstreamTask にも重複して渡されている。 これなら下流のタスクでも上流のタスクのパラメータが利用できる。

@inherits デコレータを使う

もうひとつのやり方は @inherits デコレータを使うもので、アプローチは @requires デコレータと似ている。 ただし、こちらは本来の目的とはちょっと違うものを無理やり転用して使っている感じがある。

次のサンプルコードでは DownstreamTask@inherits デコレータで UpstreamTask を引数に修飾している。 こうすると DownstreamTaskUpstreamTask と同じパラメータを持つようになる。 ただし、@requires デコレータと違って依存関係はセットされない。 そのため、自分で requires() メソッドに必要なタスクを指定することになる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.util import inherits


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""

    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


@inherits(UpstreamTask)
class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()

    def requires(self):
        """依存しているタスクを示すメソッド"""
        # clone() することで、自身のパラメータを適用したタスクが得られる
        # self.clone_parent() でも良い
        return self.clone(UpstreamTask)


def main():
    luigi.run(cmdline_args=['DownstreamTask',
                            '--downstream-task-param=downstream',
                            '--upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(upstream_task_param=upstream, downstream_task_param=downstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリを見ると、ちゃんとパラメータを適用したタスクが実行されていることがわかる。

まとめ

今回は、パラメータ爆発問題を回避するいくつかの方法を試してみた。

  • タスクの依存関係を requires() メソッドで示すときにパラメータをブランクにする
  • @requires デコレータを使う
  • @inherits デコレータを使う

いじょう。

Linuxで動かしながら学ぶTCP/IPネットワーク入門

Linuxで動かしながら学ぶTCP/IPネットワーク入門

  • 作者:もみじあめ
  • 発売日: 2020/03/06
  • メディア: オンデマンド (ペーパーバック)