CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Luigi のイベントハンドラを試してみる

今回は、Luigi でタスクの開始や成功・失敗などのときに発火するイベントハンドラを扱ってみる。 なお、Luigi はバッチ処理などのパイプラインを組むのに使われるソフトウェアのこと。 基本的な使い方については以下を参照してほしい。

blog.amedama.jp

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V        
Python 3.7.6

下準備

下準備として Luigi をインストールしておく。

$ pip install luigi

イベントハンドラを登録する

早速だけど以下にサンプルコードを示す。 Luigi では、デコレータを使ってイベントが発生したときに実行したい処理を登録できる。 サンプルコードでは、タスクが開始されたタイミングと成功・失敗したタイミングで実行される処理を登録している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleTask(luigi.Task):
    """サンプルのタスク"""

    def run(self):
        print(f'run: {self}')


@luigi.Task.event_handler(luigi.Event.START)
def on_start(task):
    """タスクを開始したときのハンドラ"""
    print(f'on_start: {task}')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    """タスクが成功したときのハンドラ"""
    print(f'on_success: {task}')


@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    """タスクが失敗したときのハンドラ"""
    print(f'on_failure: {task} with {exception}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTask, local_scheduler=True)


if __name__ == '__main__':
    main()

上記のサンプルコードを実行してみよう。 読みやすくなるように標準エラー出力は表示していない。

$ python evhandler.py 2>/dev/null
on_start: ExampleTask()
run: ExampleTask()
on_success: ExampleTask()

上記を見ると、ちゃんとタスクの開始と成功したタイミングでイベントハンドラが実行されていることがわかる。

意図的にタスクを失敗させてみる

試しに、意図的にタスクを失敗させてみよう。 次のサンプルコードではタスクで例外を上げている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleTask(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # 例外を上げる
        raise Exception('Oops!')


@luigi.Task.event_handler(luigi.Event.START)
def on_start(task):
    print(f'on_start: {task}')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    print(f'on_success: {task}')


@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    print(f'on_failure: {task} with {exception}')


def main():
    luigi.run(main_task_cls=ExampleTask, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python evhandler.py 2>/dev/null
on_start: ExampleTask()
run: ExampleTask()
on_failure: ExampleTask() with Oops!

今度は、失敗したときのイベントハンドラが実行されていることがわかる。 また、同時に例外オブジェクトもハンドラに渡されている。

一連の複数のタスクを実行してみる

続いては、依存関係をもった複数のタスクが実行されたときの挙動を確認しておく。 次のサンプルコードでは、ExampleTaskA と、それに依存した ExampleTaskB というタスクを定義している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class ExampleTaskA(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # モックの出力にメッセージを書き込む
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')

    def output(self):
        # オンメモリのモックを出力にする
        return MockTarget('mock')


class ExampleTaskB(luigi.Task):

    def requires(self):
        # このタスクの実行には事前に以下のタスクを実行する必要がある
        return ExampleTaskA()

    def run(self):
        print(f'run: {self}')


@luigi.Task.event_handler(luigi.Event.START)
def on_start(task):
    """タスクを開始したときのハンドラ"""
    print(f'on_start: {task}')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    """タスクが成功したときのハンドラ"""
    print(f'on_success: {task}')


@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    """タスクが失敗したときのハンドラ"""
    print(f'on_failure: {task} with {exception}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTaskB, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみる。

$ python evhandler.py 2>/dev/null
on_start: ExampleTaskA()
run: ExampleTaskA()
on_success: ExampleTaskA()
on_start: ExampleTaskB()
run: ExampleTaskB()
on_success: ExampleTaskB()

ちゃんと、それぞれのタスクが開始・成功したタイミングでイベントハンドラが実行されているようだ。

特定のタスクだけで実行したいハンドラを登録する

ここまでのイベントハンドラは、すべてのタスクに共通して実行されるものだった。 続いては、特定のタスクだけで実行されるイベントハンドラを登録してみる。

次のサンプルコードでは ExampleTaskB にだけタスクが成功したときに実行されるイベントハンドラを登録している。 Luigi の Task クラスには event_handler() というクラスメソッドが定義されていて、これを使ってタスクに対してイベントハンドラを登録できるようになっている。 そこで、定義したタスクの event_handler() メソッドを使えば、そのタスクだけで実行されるイベントハンドラを登録できる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class ExampleTaskA(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # モックの出力にメッセージを書き込む
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')

    def output(self):
        # オンメモリのモックを出力にする
        return MockTarget('mock')


class ExampleTaskB(luigi.Task):

    def requires(self):
        # このタスクの実行には事前に以下のタスクを実行する必要がある
        return ExampleTaskA()

    def run(self):
        print(f'run: {self}')


# 特定のタスクのイベントをハンドルしたいときはクラスメソッドの eventhandler() を使う
@ExampleTaskB.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    print(f'on_success: {task}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTaskB, local_scheduler=True)


if __name__ == '__main__':
    main()

あるいは、単純にタスクに各イベントハンドラがあらかじめ定義されているので、それをオーバーライドしても良い。 おそらく、ほとんどのユースケースではこちらを使えば問題ないはず。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class ExampleTaskA(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # モックの出力にメッセージを書き込む
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')

    def output(self):
        # オンメモリのモックを出力にする
        return MockTarget('mock')


class ExampleTaskB(luigi.Task):

    def requires(self):
        # このタスクの実行には事前に以下のタスクを実行する必要がある
        return ExampleTaskA()

    def run(self):
        print(f'run: {self}')

    # 特定のタスクのイベントをハンドルしたいときはメソッドをオーバーライドするだけ
    def on_success(task):
        print(f'on_success: {task}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTaskB, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python evhandler.py 2>/dev/null
run: ExampleTaskA()
run: ExampleTaskB()
on_success: ExampleTaskB()

すると、今度は ExampleTaskB の成功時だけイベントハンドラが実行されていることがわかる。

めでたしめでたし。