CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Luigi でタスク共通のパラメータを扱う

今回は、Luigi で複数のタスクが共通のパラメータを扱う方法について考えてみる。 ここらへん、調べてもあまりドキュメントなどが出てこなかった。 なので、ソースコードを読んでリバースエンジニアリング的に「こういう風にできそう」と判明した内容を書いてみる。 使う API のレイヤー的に、高レベルなやり方と低レベルなやり方が見つかったので、どちらも記載する。

使った環境は次のとおり。

$ sw_vers 
ProductName:    macOS
ProductVersion: 11.4
BuildVersion:   20F71
$ python -V        
Python 3.9.5
$ pip list | grep -i luigi
luigi           3.0.3

もくじ

下準備

まずは、下準備として Luigi をインストールしておく。

$ pip install luigi

低レベル API (luigi.configuration.get_config()) を使う

まずは低レベル API の luigi.configuration.get_config() を使うやり方から。 この API を使うと、Luigi の設定ファイルを辞書形式でそのまま読み込むことができる。 読み込んだコンフィグは、どのタスクから利用することもできるため共通のパラメータを扱うことができる。

以下にサンプルコードを示す。 サンプルコードには TaskATaskB という、2 つのタスクを定義している。 この中では、それぞれ設定ファイルから [SharedConfig] セクションの shared_param パラメータを読み込んで使っている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
# 設定を取得するための API
from luigi.configuration import get_config


class NoOutputTask(luigi.Task):
    """output() を介さずに完了を制御するテスト用のタスク

    NOTE: 今回のサンプルコードの中では本質的な内容ではない"""
    # タスクが完了しているかを示すフラグ
    done = False

    def run(self):
        # run() が一度でも実行されたら完了フラグを立てる
        self.done = True

    def complete(self):
        # タスクの完了はフラグで判断する
        return self.done


class TaskA(NoOutputTask):

    def run(self):
        super().run()
        # SharedConfig セクションのパラメータを取得する
        section_dict = dict(get_config().items('SharedConfig'))
        # パラメータの内容を標準出力に書き出す (ほんとは output() に書くべき)
        print('Hello,', section_dict['shared_param'], 'by TaskA')


class TaskB(NoOutputTask):

    def run(self):
        super().run()
        # 同じパラメータを使う
        section_dict = dict(get_config().items('SharedConfig'))
        print('Hello,', section_dict['shared_param'], 'by TaskB')


class Wrapper(luigi.WrapperTask):
    """上記で定義した 2 つのタスクをキックするためだけのタスク"""

    def requires(self):
        yield TaskA()
        yield TaskB()


if __name__ == '__main__':
    luigi.run(main_task_cls=Wrapper,
              local_scheduler=True)

次のように設定ファイルを用意しよう。

$ cat << 'EOF' > luigi.cfg  
[SharedConfig]
shared_param=World
EOF

あとは実行するだけ。

$ python lowlayer.py

...

Hello, World by TaskB

...

Hello, World by TaskA

...

===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 TaskA()
    - 1 TaskB()
    - 1 Wrapper()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記の結果からわかるように、タスクが出力するメッセージの中に設定ファイルで指定した値が使われている。

高レベル API (luigi.Config) を使う

続いては高レベル API の luigi.Config を使うパターン。 こちらは luigi.Config というクラスを継承したクラスを定義する。 設定ファイからは、定義したクラスと同名のセクション経由でパラメータを設定できる。 複数のタスクからは、クラスをインスタンス化してやればパラメータがインジェクションされて得られる。

以下にサンプルコードを示す。 先ほどのサンプルコードから SharedConfig というクラスが増えている。 そして、TaskATaskBSharedConfig をインスタンス化して shared_param パラメータにアクセスしている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class SharedConfig(luigi.Config):
    """複数のタスクから参照される共通のパラメータ"""
    shared_param = luigi.Parameter()


class NoOutputTask(luigi.Task):
    """output() を介さずに完了を制御するテスト用のタスク

    NOTE: 今回のサンプルコードの中では本質的な内容ではない"""
    # タスクが完了しているかを示すフラグ
    done = False

    def run(self):
        # run() が一度でも実行されたら完了フラグを立てる
        self.done = True

    def complete(self):
        # タスクの完了はフラグで判断する
        return self.done


class TaskA(NoOutputTask):

    def run(self):
        super().run()
        # SharedConfig をインスタンス化してパラメータを取り出す
        # パラメータは luigi の設定ファイルで指定できる
        print('Hello,', SharedConfig().shared_param, 'by TaskA')


class TaskB(NoOutputTask):

    def run(self):
        super().run()
        # こちらも同様
        print('Hello,', SharedConfig().shared_param, 'by TaskB')


class Wrapper(luigi.WrapperTask):
    """上記で定義した 2 つのタスクをキックするためだけのタスク"""

    def requires(self):
        yield TaskA()
        yield TaskB()


if __name__ == '__main__':
    luigi.run(main_task_cls=Wrapper,
              local_scheduler=True)

設定ファイルは先ほどと同じで良い。 セクション名とパラメータ名が同じになるようにクラスを定義してあるため。 作り直すなら次のようにする。

$ cat << 'EOF' > luigi.cfg  
[SharedConfig]
shared_param=World
EOF

実行してみよう。

$ python highlayer.py 

...

Hello, World by TaskB

...

Hello, World by TaskA

...

===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 TaskA()
    - 1 TaskB()
    - 1 Wrapper()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

上記の実行結果から、ちゃんとパラメータを参照できていることがわかる。

ところで、上記で使った luigi.Config というクラス、定義を見ると面白いことがわかる。 以下がそのソースコード。

github.com

なんと、luigi.Configluigi.Task を継承しているだけで、他に何もしていない。 つまり、ほとんど同一のものということになる。 実は、luigi.Config を使わなくても、luigi.Task でも同じことはできるのだ。

いじょう。