CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: データパイプライン構築用フレームワーク Luigi を使ってみる

最近になって、バッチ処理においてデータパイプラインを組むためのフレームワークとして Luigi というものがあることを知った。 これは、Spotify という音楽のストリーミングサービスを提供する会社が作ったものらしい。 似たような OSS としては他にも Apache Airflow がある。 こちらは民宿サービスを提供する Airbnb が作ったものだけど、最近 Apache に寄贈されてインキュベータープロジェクトとなった。

Luigi の特徴としては、バッチ処理に特化している点が挙げられる。 ただし、定期的にバッチ処理を実行するような機能はない。 そこは、代わりに cron や systemd timer を使ってやる必要がある。 また、本体もそうだけどデータパイプラインについても Python を使って書く必要がある。

今回は、そんな Luigi を一通り触ってみることにする。 使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195
$ python --version
Python 3.5.3

インストールする

インストールは Python のパッケージマネージャの pip を使ってさくっとできる。

$ pip install luigi
$ pip list --format=columns | grep luigi
luigi           2.6.1

ハローワールド

まず、Luigi における最も重要な概念としてタスクというものを知る必要がある。 タスクというのは、ユーザが Luigi にやらせたい何らかの仕事を細分化した最小単位となる。 そして、タスクを定義するには Task というクラスを継承したサブクラスを作る。

次のサンプルコードでは、最も単純なタスクを定義している。 このタスクでは run() メソッドをオーバーライドしてメッセージをプリントするようになっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


# Luigi で実行したい仕事は Task を継承したクラスにする
class Greet(luigi.Task):

    def run(self):
        """run() メソッドで具体的な処理を実行する"""
        print('Hello, World!')


def main():
    luigi.run()


if __name__ == '__main__':
    main()

上記のタスクを実行するには、次のようにする。 第二引数に渡している Greet が、実行したいタスク (のクラス名) を指している。 --local-scheduler オプションは、タスクのスケジューラをローカルモードに指定している。 スケジューラにはローカルとセントラルの二種類があるんだけど、詳しくは後ほど説明する。 とりあえず開発用途で使うならローカル、と覚えておけば良いかな。

$ python helloworld.py Greet --local-scheduler

実際に実行してみると、次のような出力が得られる。 実行サマリーとして Greet タスクが成功していることや、ログの中にメッセージが出力されていることが見て取れる。

$ python helloworld.py Greet --local-scheduler
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7082] Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) running   Greet()
Hello, World!
INFO: [pid 7082] Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

上記は、代わりに次のようにすることもできる。 このやり方では、まず python コマンドで luigi モジュールを実行して、そこ経由でタスクを実行している。

$ python -m luigi --module helloworld Greet --local-scheduler

このやり方の利点としては、パッケージ化されたインストール済みのタスクを実行できる点が挙げられる。 先ほどはカレントワーキングディレクトリ (CWD) にある Python ファイル (モジュール) を直接指定して実行していた。 それに対し、こちらは Python のパスにもとづいてモジュールが実行されるので、インストール済みのモジュールなら CWD の場所に関わらず実行できる。 そして、python コマンドのパスには CWD も含まれるので、先ほどと同じように実行できるというわけ。

もし、Python のモジュールとかがよく分からないというときは必要に応じて以下を参照してもらえると。 ようするに言いたいことは Python においてモジュールって呼ばれてるのは単なる Python ファイルのことだよ、という点。

blog.amedama.jp

また、もう一つのやり方として luigi コマンドを使うこともできる。 この場合も、実行するモジュールは --module オプションで指定する。 ただし、このやり方だと Python のパスに CWD が含まれない。 そのため CWD にあるモジュールを実行したいときは別途 PYTHONPATH 環境変数を指定する必要がある。

$ PYTHONPATH=. luigi --module helloworld Greet --local-scheduler

実行するタスクをモジュール内で指定する

とはいえ、毎回コマンドラインで実行するモジュールを指定するのも面倒だと思う。 そんなときは、次のようにして luigi.run() に実行するときのパラメータを指定してやると良さそう。 開発や検証用途なら、こうしておくと楽ちん。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def run(self):
        print('Hello, World!')


def main():
    # 起動したいタスクを指定して Luigi を実行する
    luigi.run(main_task_cls=Greet, local_scheduler=True)
    # あるいは
    # task = Greet()
    # luigi.build([task], local_scheduler=True)


if __name__ == '__main__':
    main()

上記のようにしてあればファイルを指定するだけで実行できるようになる。

$ python easyrun.py 
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7165] Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) running   Greet()
Hello, World!
INFO: [pid 7165] Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

ターゲットを指定する

Luigi において、もう一つ重要な概念がターゲットというもの。 これは、タスクを実行した結果を永続化する先を表している。 例えば、永続化する先はローカルディスクだったり Amazon S3 だったり HDFS だったりする。 タスクをチェーンしていく場合、このターゲットを介してデータをやり取りする。 つまり、一つ前のタスクが出力したターゲットの内容が、次のタスクの入力になるというわけ。

ちなみに、実は Luigi ではターゲットの指定がほとんど必須といってよいものになっている。 これまでの例では指定してなかったけど、それは単にすごくシンプルだったので何とかなっていたに過ぎない。 実際のところ、実行するときに次のような警告が出ていたのはターゲットを指定していなかったのが原因になっている。

/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()

次のサンプルコードでは、ターゲットとしてローカルディスクを指定している。 永続化する先としてローカルディスクを指定するときは LocalTarget というターゲットを使う。 ターゲットを指定するにはタスクのクラスに output() というメソッドをオーバーライドする。 そして、タスクの本体ともいえる run() メソッドでは output() メソッドで得られるターゲットに対して実行結果を書き込む。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        # Task の実行結果を保存する先を Target で指定する
        # LocalTarget はローカルディスクにファイルとして残す
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        # output() メソッドで保存先の Target を取得する
        out = self.output()
        # ファイルを開く
        with out.open('w') as f:
            # 実行結果を書き込む
            f.write('Hello, World!\n')


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

それでは、上記を実行してみよう。 今回は、先ほどは表示されていた警告が出ていない点に注目してほしい。

$ python output.py 
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7230] Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) running   Greet()
INFO: [pid 7230] Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

そして、実行すると greeting.txt というファイルがディレクトリに作られている。

$ cat greeting.txt
Hello, World!

このように Luigi ではタスクの実行結果を永続化する。

テスト用途としてメモリをターゲットにする

先ほどの例ではターゲットとしてローカルディスクを使った。 ちなみに、テスト用途としてならターゲットをメモリにすることもできる。

次のサンプルコードでは MockTarget を使うことで実行結果を保存する先をメモリにしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class Greet(luigi.Task):

    def output(self):
        # MockTarget はオンメモリにしか実行結果が残らない
        # 内部実装には io.Bytes が使われている
        # あくまでユニットテスト用途
        return MockTarget(self.__class__.__name__)

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

念のため、先ほど出力されたファイルを削除した上で上記を実行してみよう。

$ rm greeting.txt
$ python mock.py 
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7256] Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) running   Greet()
INFO: [pid 7256] Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

当然だけど、今度はファイルができない。

$ cat greeting.txt
cat: greeting.txt: No such file or directory

複数のタスクをチェーンする

これまでの例では一つのタスクを実行するだけだった。 現実世界の仕事では、タスクが一つだけということは滅多にないはず。 もちろん、色々なことをする巨大な一つのタスクを書くことも考えられるけど、それだと Luigi を使う魅力は半減してしまう。 なるべく、それ以上は細分化できないレベルまで仕事を小さくして、それをタスクに落とし込む。

次のサンプルコードでは Greet というタスクと Repeat というタスクをチェーンしている。 Repeat の実行には、まず Greet の実行が必要になる。 このような依存関係を表現するには、タスクのクラスに requires() というメソッドをオーバーライドする。 サンプルコードの内容としては Greet で出力された内容を Repeat で複数繰り返して出力する、というもの。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Repeat(luigi.Task):

    def requires(self):
        """Task の実行に必要な別の Task を指定する"""
        return Greet()

    def output(self):
        return luigi.LocalTarget('repeating.txt')

    def run(self):
        # requires() で指定した Task の実行結果 (Target) は input() メソッドで得られる
        input_ = self.input()
        output = self.output()

        with input_.open('r') as r, output.open('w') as w:
            # Greet が出力した内容を読み込む
            lines = r.readlines()

            # Greet の出力を 3 回繰り返し書き込む
            for _ in range(3):
                w.writelines(lines)


def main():
    luigi.run(main_task_cls=Repeat, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python requires.py 
DEBUG: Checking if Repeat() is complete
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Repeat__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) running   Greet()
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) running   Repeat()
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) done      Repeat()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Repeat__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 Greet()
    - 1 Repeat()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると、次のようにして実行結果のファイルが作られる。

$ cat greeting.txt 
Hello, World!
$ cat repeating.txt 
Hello, World!
Hello, World!
Hello, World!

ちなみに requires() メソッドをオーバーライドせずに依存関係を表現することもできるにはできる。 それは、次のように run() メソッドの中で直接タスクをインスタンス化して yield でターゲットを取得する方法。 これは、Dynamic dependencies と呼ばれている。 とはいえ、タスクの依存関係が分かりにくくなるので、使うのはなるべく避けた方が良い気がする。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Repeat(luigi.Task):

    def output(self):
        return luigi.LocalTarget('repeating.txt')

    def run(self):
        # requires() せずに直接 Task から yield するやり方もある
        input_ = yield Greet()
        output = self.output()

        with input_.open('r') as r, output.open('w') as w:
            lines = r.readlines()

            for _ in range(3):
                w.writelines(lines)


def main():
    luigi.run(main_task_cls=Repeat, local_scheduler=True)


if __name__ == '__main__':
    main()

また、依存するタスクは一つとは限らない。 そんなときは requires() メソッドの中でリストの形で依存するタスクを列挙する。 あるいは yield で一つずつ列挙していっても構わない。 ただし yield を使うやり方だと、後から依存タスクの実行を並列化したいときも、必ず直列に実行されるようになるらしい。 逆に言えば直列に実行することを担保したいときは yield を使うのが良さそう。 また、複数のタスクを集約して実行するためだけのタスクについては WrapperTask を継承すると良いらしい。

次のサンプルコードでは GreetEat そして Sleep というタスクを実行するためのタスクとして RequiresOnly を定義している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class OnMemoryTask(luigi.Task):

    def output(self):
        return MockTarget(self.__class__.__name__)


class Greet(OnMemoryTask):

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Eat(OnMemoryTask):

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Mog, Mog\n')


class Sleep(OnMemoryTask):

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Zzz...\n')


class RequiresOnly(luigi.WrapperTask):

    def requires(self):
        # 依存している Task は複数書ける
        return [Greet(), Eat(), Sleep()]
        # あるいは
        # yield Greet()
        # yield Eat()
        # yield Sleep()
        # としても良い


def main():
    luigi.run(main_task_cls=RequiresOnly, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。 タスクの並列度を上げるには --workers オプションを指定する。

$ python reqonly.py --workers 3
DEBUG: Checking if RequiresOnly() is complete
DEBUG: Checking if Greet() is complete
DEBUG: Checking if Eat() is complete
DEBUG: Checking if Sleep() is complete
INFO: Informed scheduler that task   RequiresOnly__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Sleep__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Eat__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 3 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7362] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   Eat()
DEBUG: 3 running tasks, waiting for next task to finish
INFO: [pid 7362] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      Eat()
INFO: [pid 7363] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   Greet()
INFO: [pid 7363] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      Greet()
INFO: Informed scheduler that task   Eat__99914b932b   has status   DONE
INFO: [pid 7364] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   Sleep()
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: Greet__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
DEBUG: Sleep__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: Sleep__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: [pid 7364] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      Sleep()
INFO: Informed scheduler that task   Sleep__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: RequiresOnly__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: [pid 7365] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   RequiresOnly()
INFO: [pid 7365] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      RequiresOnly()
INFO: Informed scheduler that task   RequiresOnly__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 ran successfully:
    - 1 Eat()
    - 1 Greet()
    - 1 RequiresOnly()
    - 1 Sleep()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

タスクにパラメータを受け取る

タスクが動作するときに色々なパラメータを受け取りたい、という場面も多いはず。 そんなときは *Parameter を使えば良い。 次のサンプルコードでは、前述した Repeat タスクで繰り返しの数をパラメータ化している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Repeat(luigi.Task):
    # Task の実行に必要な引数を Parameter として受け取る
    repeat_n = luigi.IntParameter(default=3)

    def requires(self):
        return Greet()

    def output(self):
        return luigi.LocalTarget('repeating.txt')

    def run(self):
        input_ = self.input()
        output = self.output()

        with input_.open('r') as r, output.open('w') as w:
            lines = r.readlines()

            # Parameter は Task のアトリビュートとして使える
            for _ in range(self.repeat_n):
                w.writelines(lines)


def main():
    luigi.run(main_task_cls=Repeat, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行する前に、先ほど生成されたファイルを削除しておく。 これは、実行結果のファイルがあるときは実行がスキップされるため。 この動作は何のターゲットを使っても基本的にそうなっている。

$ rm repeating.txt

実行してみよう。 パラメータはコマンドラインから渡す。 例えば、今回の例では --repeat-n オプションになる。

$ python params.py --repeat-n=5
DEBUG: Checking if Repeat(repeat_n=5) is complete
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Repeat_5_96577e160e   has status   PENDING
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7436] Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) running   Repeat(repeat_n=5)
INFO: [pid 7436] Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) done      Repeat(repeat_n=5)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Repeat_5_96577e160e   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 present dependencies were encountered:
    - 1 Greet()
* 1 ran successfully:
    - 1 Repeat(repeat_n=5)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

サマリーから Greet タスクはファイルがあるので実行されていないことが分かる。

Repeat タスクのターゲットは、パラメータに応じて再実行された。

$ cat repeating.txt 
Hello, World!
Hello, World!
Hello, World!
Hello, World!
Hello, World!

このように Luigi は実行結果があるときは実行をスキップするようになっている。 そのため、途中でタスクが失敗したときも原因を取り除いて再実行したとき最小限の処理でやり直すことができる。

特定期間のバッチ処理を実行する

例えば、あるバッチ処理が毎日決まった時間に実行されているとしよう。 それが、何らかの原因で失敗して、その日のうちに解決できなかったとする。 こんなときは、基本的に失敗した日の内容も後からリカバーしなきゃいけない。 ただ、当日以外のタスクを実行するというのは、そのように考えられて作っていないと意外と難しいもの。 Luigi だと、そういったユースケースもカバーしやすいように作られている。

例えば、次のようにして処理対象の日付を受け取って実行するようなタスクがあるとする。 こんな風に、処理対象の日付を指定するように作っておくのはバッチ処理でよく使われるパターンらしい。 また、生成されるターゲットのファイルに日付を含むようにしておくのもポイント。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class DailyGreet(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget('daily-target-{}.txt'.format(str(self.date)))

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


def main():
    luigi.run(main_task_cls=DailyGreet, local_scheduler=True)


if __name__ == '__main__':
    main()

ひとまず、通常通り実行してみることにしよう。

$ python daily.py --date=2017-5-13
DEBUG: Checking if DailyGreet(date=2017-05-13) is complete
INFO: Informed scheduler that task   DailyGreet_2017_05_13_284b40e6ab   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7481] Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) running   DailyGreet(date=2017-05-13)
INFO: [pid 7481] Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) done      DailyGreet(date=2017-05-13)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   DailyGreet_2017_05_13_284b40e6ab   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 DailyGreet(date=2017-05-13)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると、実行結果のファイルが日付付きで作られる。

$ cat daily-target-2017-05-13.txt 
Hello, World!

Luigi では、このように日付を含むタスクを期間指定で一気に片付けるような方法が用意されている。 具体的には、次のようにして daily モジュールの RangeDailyBase タスクを指定した上で、--of オプションで実行したいタスクを指定する。 その上で、実行する日付を --start--stop オプションで指定する。

$ python -m luigi --module daily RangeDailyBase --of DailyGreet --start 2017-05-01 --stop 2017-05-12 --local-scheduler
...(省略)...
===== Luigi Execution Summary =====

Scheduled 12 tasks of which:
* 12 ran successfully:
    - 11 DailyGreet(date=2017-05-01...2017-05-11)
    - 1 RangeDailyBase(...)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると、範囲内の日付でタスクが一気に実行される。

$ ls daily-target-2017-05-*
daily-target-2017-05-01.txt daily-target-2017-05-07.txt
daily-target-2017-05-02.txt daily-target-2017-05-08.txt
daily-target-2017-05-03.txt daily-target-2017-05-09.txt
daily-target-2017-05-04.txt daily-target-2017-05-10.txt
daily-target-2017-05-05.txt daily-target-2017-05-11.txt
daily-target-2017-05-06.txt daily-target-2017-05-13.txt

例えば、数日前から今日の日付までの範囲を指定して、先ほどの内容を cron などで指定しておくとする。 そうすれば、実行結果のあるものについてはスキップされることから、まだ完了していないものだけを実行できる。 もし一過性の問題で失敗していたとしても、翌日のバッチ処理などで自動的にリカバーされるというわけ。

組み込みのタスクを活用する

Luigi には、色々な外部コンポーネントやサービス、ライブラリと連携するための組み込みタスクが用意されている。 それらは contrib パッケージの中にあって、ざっと見ただけでも例えば Hadoop や Kubernetes などがある。

github.com

今回は一例として Python のオブジェクト・リレーショナルマッパーの一つである SQLAlchemy との連携を試してみる。

まずは SQLAlchemy をインストールしておく。

$ pip install sqlalchemy

次のサンプルコードでは UsersTask が出力する内容を SQLite3 のデータベースに保存している。 luigi.contrib.sqla.CopyToTable を継承したタスクは、依存するタスクからタブ区切りのターゲットを受け取って、内容を各カラムに保存していく。 今回はユーザ情報を模したテーブルを作るようにしてみた。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget
from luigi.contrib import sqla

from sqlalchemy import String
from sqlalchemy import Integer


class UsersTask(luigi.Task):

    def output(self):
        return MockTarget(self.__class__.__name__)

    def run(self):
        out = self.output()
        with out.open('w') as f:
            # 実行結果をタブ区切りで書き込んでいく
            f.write('Alice\t24\n')
            f.write('Bob\t30\n')
            f.write('Carol\t18\n')


# 実行結果を SQLAlchemy 経由で RDB に保存するための Task
class SQLATask(sqla.CopyToTable):
    # SQLAlchemy のテーブル定義
    columns = [
        (['name', String(64)], {'primary_key': True}),
        (['age', Integer()], {})
    ]
    # データベースへの接続 URL
    connection_string = 'sqlite:///users.db'
    # 保存するのに使うテーブル名
    table = 'users'

    def requires(self):
        # 実行結果がタブ区切りになっていれば、あとは依存先に指定するだけ
        return UsersTask()


def main():
    luigi.run(main_task_cls=SQLATask, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみる。

$ python sqla.py        
DEBUG: Checking if SQLATask() is complete
DEBUG: Checking if UsersTask() is complete
INFO: Informed scheduler that task   SQLATask__99914b932b   has status   PENDING
INFO: Informed scheduler that task   UsersTask__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) running   UsersTask()
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) done      UsersTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   UsersTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) running   SQLATask()
INFO: Running task copy to table for update id SQLATask__99914b932b for table users
INFO: Finished inserting 0 rows into SQLAlchemy target
INFO: Finished inserting rows into SQLAlchemy target
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) done      SQLATask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   SQLATask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 SQLATask()
    - 1 UsersTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると SQLite3 のデータベースができる。

$ file users.db 
users.db: SQLite 3.x database

中身を見るとユーザ情報が保存されている。

$ sqlite3 users.db "SELECT * FROM users"
Alice|24
Bob|30
Carol|18

セントラルスケジューラを使う

前述した通り Luigi にはローカルスケジューラとセントラルスケジューラのモードがある。 これまでの例はローカルスケジューラを使うものだった。 次はセントラルスケジューラを使ってみることにする。

まず、Luigi のセントラルスケジューラには複数のジョブが同時に実行されないよう調整する機能が備わっている。 そして、タスクの状態などを確認するための WebUI なども付属する。 ただし、スケジューラ自体にはタスクを実行する機能はなくて、あくまで上記の二つの機能だけが提供されている。 つまり、タスクの実行自体はセントラルスケジューラに接続したクライアント (ワーカー) の仕事になる。 また、繰り返しになるけど定期・繰り返し実行などの機能はないので cron や systemd timer を使うことになる。

セントラルスケジューラは luigid というコマンドで起動する。

$ luigid

これで TCP:8082 ポートで WebUI が立ち上がる。

$ open http://localhost:8082

試しにセントラルスケジューラを使った状態でタスクを実行してみよう。 WebUI に成功 (done) なタスクが増えるはず。

$ python helloworld.py Greet

その他、セントラルスケジューラを動作させるときの設定などについては、このページを参照する。

Using the Central Scheduler — Luigi 2.6.1 documentation

成功・失敗時のコールバックを使う

タスクが成功したり失敗したときに何かしたいというニーズはあると思う。 例えば失敗したとき Slack とかのチャットに通知を送るとか。

次のサンプルコードでは、タスクに on_success() メソッドをオーバーライドしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def run(self):
        print('Hello, World!')

    def on_success(self):
        """Task が成功したときのコールバック"""
        print('SUCCESS!')


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。 成功時のコールバックが呼び出されていることが分かる。

$ python event.py 
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8126] Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) running   Greet()
Hello, World!
SUCCESS!
INFO: [pid 8126] Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

より広範囲に、どのタスクの成功や失敗でも呼び出されるようにしたいときは次のようにする。 @luigi.Task.event_handler デコレータでコールバックを修飾した上で、呼び出すタイミングを引数で指定する。 今回は luigi.Event.SUCCESS を指定しているのでタスクが成功したときに呼び出される。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def run(self):
        print('Hello, World!')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def success_handler(task):
    """Task が成功したときのコールバック (汎用)"""
    print('SUCCESS: {}'.format(task))


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

こちらも実行してみよう。 コールバックが呼び出されていることが分かる。

$ python event2.py 
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8136] Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) running   Greet()
Hello, World!
INFO: [pid 8136] Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) done      Greet()
SUCCESS: Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

まとめ

今回は Python でバッチ処理を作るときデータパイプラインを構築するためのフレームワークである Luigi を使ってみた。 触ってみた感触としては部分的にクセはあるものの、なかなかシンプルな作りで用途にハマれば使いやすそうだ。