最近になって、バッチ処理においてデータパイプラインを組むためのフレームワークとして Luigi というものがあることを知った。
これは、Spotify という音楽のストリーミングサービスを提供する会社が作ったものらしい。
似たような OSS としては他にも Apache Airflow がある。
こちらは民宿サービスを提供する Airbnb が作ったものだけど、最近 Apache に寄贈されてインキュベータープロジェクトとなった。
Luigi の特徴としては、バッチ処理に特化している点が挙げられる。
ただし、定期的にバッチ処理を実行するような機能はない。
そこは、代わりに cron や systemd timer を使ってやる必要がある。
また、本体もそうだけどデータパイプラインについても Python を使って書く必要がある。
今回は、そんな Luigi を一通り触ってみることにする。
使った環境は次の通り。
$ sw_vers
ProductName: Mac OS X
ProductVersion: 10.12.4
BuildVersion: 16E195
$ python --version
Python 3.5.3
インストールする
インストールは Python のパッケージマネージャの pip を使ってさくっとできる。
$ pip install luigi
$ pip list --format=columns | grep luigi
luigi 2.6.1
ハローワールド
まず、Luigi における最も重要な概念としてタスクというものを知る必要がある。
タスクというのは、ユーザが Luigi にやらせたい何らかの仕事を細分化した最小単位となる。
そして、タスクを定義するには Task というクラスを継承したサブクラスを作る。
次のサンプルコードでは、最も単純なタスクを定義している。
このタスクでは run()
メソッドをオーバーライドしてメッセージをプリントするようになっている。
import luigi
class Greet(luigi.Task):
def run(self):
"""run() メソッドで具体的な処理を実行する"""
print('Hello, World!')
def main():
luigi.run()
if __name__ == '__main__':
main()
上記のタスクを実行するには、次のようにする。
第二引数に渡している Greet
が、実行したいタスク (のクラス名) を指している。
--local-scheduler
オプションは、タスクのスケジューラをローカルモードに指定している。
スケジューラにはローカルとセントラルの二種類があるんだけど、詳しくは後ほど説明する。
とりあえず開発用途で使うならローカル、と覚えておけば良いかな。
$ python helloworld.py Greet --local-scheduler
実際に実行してみると、次のような出力が得られる。
実行サマリーとして Greet
タスクが成功していることや、ログの中にメッセージが出力されていることが見て取れる。
$ python helloworld.py Greet --local-scheduler
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
is_complete = task.complete()
INFO: Informed scheduler that task Greet__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7082] Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) running Greet()
Hello, World!
INFO: [pid 7082] Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) done Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Greet__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 Greet()
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
上記は、代わりに次のようにすることもできる。
このやり方では、まず python コマンドで luigi
モジュールを実行して、そこ経由でタスクを実行している。
$ python -m luigi --module helloworld Greet --local-scheduler
このやり方の利点としては、パッケージ化されたインストール済みのタスクを実行できる点が挙げられる。
先ほどはカレントワーキングディレクトリ (CWD) にある Python ファイル (モジュール) を直接指定して実行していた。
それに対し、こちらは Python のパスにもとづいてモジュールが実行されるので、インストール済みのモジュールなら CWD の場所に関わらず実行できる。
そして、python コマンドのパスには CWD も含まれるので、先ほどと同じように実行できるというわけ。
もし、Python のモジュールとかがよく分からないというときは必要に応じて以下を参照してもらえると。
ようするに言いたいことは Python においてモジュールって呼ばれてるのは単なる Python ファイルのことだよ、という点。
blog.amedama.jp
また、もう一つのやり方として luigi
コマンドを使うこともできる。
この場合も、実行するモジュールは --module
オプションで指定する。
ただし、このやり方だと Python のパスに CWD が含まれない。
そのため CWD にあるモジュールを実行したいときは別途 PYTHONPATH
環境変数を指定する必要がある。
$ PYTHONPATH=. luigi --module helloworld Greet --local-scheduler
実行するタスクをモジュール内で指定する
とはいえ、毎回コマンドラインで実行するモジュールを指定するのも面倒だと思う。
そんなときは、次のようにして luigi.run()
に実行するときのパラメータを指定してやると良さそう。
開発や検証用途なら、こうしておくと楽ちん。
import luigi
class Greet(luigi.Task):
def run(self):
print('Hello, World!')
def main():
luigi.run(main_task_cls=Greet, local_scheduler=True)
if __name__ == '__main__':
main()
上記のようにしてあればファイルを指定するだけで実行できるようになる。
$ python easyrun.py
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
is_complete = task.complete()
INFO: Informed scheduler that task Greet__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7165] Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) running Greet()
Hello, World!
INFO: [pid 7165] Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) done Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Greet__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 Greet()
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
ターゲットを指定する
Luigi において、もう一つ重要な概念がターゲットというもの。
これは、タスクを実行した結果を永続化する先を表している。
例えば、永続化する先はローカルディスクだったり Amazon S3 だったり HDFS だったりする。
タスクをチェーンしていく場合、このターゲットを介してデータをやり取りする。
つまり、一つ前のタスクが出力したターゲットの内容が、次のタスクの入力になるというわけ。
ちなみに、実は Luigi ではターゲットの指定がほとんど必須といってよいものになっている。
これまでの例では指定してなかったけど、それは単にすごくシンプルだったので何とかなっていたに過ぎない。
実際のところ、実行するときに次のような警告が出ていたのはターゲットを指定していなかったのが原因になっている。
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
is_complete = task.complete()
次のサンプルコードでは、ターゲットとしてローカルディスクを指定している。
永続化する先としてローカルディスクを指定するときは LocalTarget
というターゲットを使う。
ターゲットを指定するにはタスクのクラスに output()
というメソッドをオーバーライドする。
そして、タスクの本体ともいえる run()
メソッドでは output()
メソッドで得られるターゲットに対して実行結果を書き込む。
import luigi
class Greet(luigi.Task):
def output(self):
return luigi.LocalTarget('greeting.txt')
def run(self):
out = self.output()
with out.open('w') as f:
f.write('Hello, World!\n')
def main():
luigi.run(main_task_cls=Greet, local_scheduler=True)
if __name__ == '__main__':
main()
それでは、上記を実行してみよう。
今回は、先ほどは表示されていた警告が出ていない点に注目してほしい。
$ python output.py
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task Greet__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7230] Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) running Greet()
INFO: [pid 7230] Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) done Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Greet__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 Greet()
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
そして、実行すると greeting.txt
というファイルがディレクトリに作られている。
$ cat greeting.txt
Hello, World!
このように Luigi ではタスクの実行結果を永続化する。
テスト用途としてメモリをターゲットにする
先ほどの例ではターゲットとしてローカルディスクを使った。
ちなみに、テスト用途としてならターゲットをメモリにすることもできる。
次のサンプルコードでは MockTarget
を使うことで実行結果を保存する先をメモリにしている。
import luigi
from luigi.mock import MockTarget
class Greet(luigi.Task):
def output(self):
return MockTarget(self.__class__.__name__)
def run(self):
out = self.output()
with out.open('w') as f:
f.write('Hello, World!\n')
def main():
luigi.run(main_task_cls=Greet, local_scheduler=True)
if __name__ == '__main__':
main()
念のため、先ほど出力されたファイルを削除した上で上記を実行してみよう。
$ rm greeting.txt
$ python mock.py
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task Greet__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7256] Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) running Greet()
INFO: [pid 7256] Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) done Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Greet__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 Greet()
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
当然だけど、今度はファイルができない。
$ cat greeting.txt
cat: greeting.txt: No such file or directory
複数のタスクをチェーンする
これまでの例では一つのタスクを実行するだけだった。
現実世界の仕事では、タスクが一つだけということは滅多にないはず。
もちろん、色々なことをする巨大な一つのタスクを書くことも考えられるけど、それだと Luigi を使う魅力は半減してしまう。
なるべく、それ以上は細分化できないレベルまで仕事を小さくして、それをタスクに落とし込む。
次のサンプルコードでは Greet
というタスクと Repeat
というタスクをチェーンしている。
Repeat
の実行には、まず Greet
の実行が必要になる。
このような依存関係を表現するには、タスクのクラスに requires()
というメソッドをオーバーライドする。
サンプルコードの内容としては Greet
で出力された内容を Repeat
で複数繰り返して出力する、というもの。
import luigi
class Greet(luigi.Task):
def output(self):
return luigi.LocalTarget('greeting.txt')
def run(self):
out = self.output()
with out.open('w') as f:
f.write('Hello, World!\n')
class Repeat(luigi.Task):
def requires(self):
"""Task の実行に必要な別の Task を指定する"""
return Greet()
def output(self):
return luigi.LocalTarget('repeating.txt')
def run(self):
input_ = self.input()
output = self.output()
with input_.open('r') as r, output.open('w') as w:
lines = r.readlines()
for _ in range(3):
w.writelines(lines)
def main():
luigi.run(main_task_cls=Repeat, local_scheduler=True)
if __name__ == '__main__':
main()
上記を実行してみよう。
$ python requires.py
DEBUG: Checking if Repeat() is complete
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task Repeat__99914b932b has status PENDING
INFO: Informed scheduler that task Greet__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) running Greet()
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) done Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Greet__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) running Repeat()
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) done Repeat()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Repeat__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 2 ran successfully:
- 1 Greet()
- 1 Repeat()
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
すると、次のようにして実行結果のファイルが作られる。
$ cat greeting.txt
Hello, World!
$ cat repeating.txt
Hello, World!
Hello, World!
Hello, World!
ちなみに requires()
メソッドをオーバーライドせずに依存関係を表現することもできるにはできる。
それは、次のように run()
メソッドの中で直接タスクをインスタンス化して yield
でターゲットを取得する方法。
これは、Dynamic dependencies と呼ばれている。
とはいえ、タスクの依存関係が分かりにくくなるので、使うのはなるべく避けた方が良い気がする。
import luigi
class Greet(luigi.Task):
def output(self):
return luigi.LocalTarget('greeting.txt')
def run(self):
out = self.output()
with out.open('w') as f:
f.write('Hello, World!\n')
class Repeat(luigi.Task):
def output(self):
return luigi.LocalTarget('repeating.txt')
def run(self):
input_ = yield Greet()
output = self.output()
with input_.open('r') as r, output.open('w') as w:
lines = r.readlines()
for _ in range(3):
w.writelines(lines)
def main():
luigi.run(main_task_cls=Repeat, local_scheduler=True)
if __name__ == '__main__':
main()
また、依存するタスクは一つとは限らない。
そんなときは requires()
メソッドの中でリストの形で依存するタスクを列挙する。
あるいは yield
で一つずつ列挙していっても構わない。
ただし yield
を使うやり方だと、後から依存タスクの実行を並列化したいときも、必ず直列に実行されるようになるらしい。
逆に言えば直列に実行することを担保したいときは yield
を使うのが良さそう。
また、複数のタスクを集約して実行するためだけのタスクについては WrapperTask
を継承すると良いらしい。
次のサンプルコードでは Greet
と Eat
そして Sleep
というタスクを実行するためのタスクとして RequiresOnly
を定義している。
import luigi
from luigi.mock import MockTarget
class OnMemoryTask(luigi.Task):
def output(self):
return MockTarget(self.__class__.__name__)
class Greet(OnMemoryTask):
def run(self):
out = self.output()
with out.open('w') as f:
f.write('Hello, World!\n')
class Eat(OnMemoryTask):
def run(self):
out = self.output()
with out.open('w') as f:
f.write('Mog, Mog\n')
class Sleep(OnMemoryTask):
def run(self):
out = self.output()
with out.open('w') as f:
f.write('Zzz...\n')
class RequiresOnly(luigi.WrapperTask):
def requires(self):
return [Greet(), Eat(), Sleep()]
def main():
luigi.run(main_task_cls=RequiresOnly, local_scheduler=True)
if __name__ == '__main__':
main()
上記を実行してみよう。
タスクの並列度を上げるには --workers
オプションを指定する。
$ python reqonly.py --workers 3
DEBUG: Checking if RequiresOnly() is complete
DEBUG: Checking if Greet() is complete
DEBUG: Checking if Eat() is complete
DEBUG: Checking if Sleep() is complete
INFO: Informed scheduler that task RequiresOnly__99914b932b has status PENDING
INFO: Informed scheduler that task Sleep__99914b932b has status PENDING
INFO: Informed scheduler that task Eat__99914b932b has status PENDING
INFO: Informed scheduler that task Greet__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 3 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7362] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running Eat()
DEBUG: 3 running tasks, waiting for next task to finish
INFO: [pid 7362] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done Eat()
INFO: [pid 7363] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running Greet()
INFO: [pid 7363] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done Greet()
INFO: Informed scheduler that task Eat__99914b932b has status DONE
INFO: [pid 7364] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running Sleep()
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: Greet__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
DEBUG: Sleep__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: Informed scheduler that task Greet__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: Sleep__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: [pid 7364] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done Sleep()
INFO: Informed scheduler that task Sleep__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: RequiresOnly__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: [pid 7365] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running RequiresOnly()
INFO: [pid 7365] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done RequiresOnly()
INFO: Informed scheduler that task RequiresOnly__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 4 tasks of which:
* 4 ran successfully:
- 1 Eat()
- 1 Greet()
- 1 RequiresOnly()
- 1 Sleep()
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
タスクにパラメータを受け取る
タスクが動作するときに色々なパラメータを受け取りたい、という場面も多いはず。
そんなときは *Parameter
を使えば良い。
次のサンプルコードでは、前述した Repeat
タスクで繰り返しの数をパラメータ化している。
import luigi
class Greet(luigi.Task):
def output(self):
return luigi.LocalTarget('greeting.txt')
def run(self):
out = self.output()
with out.open('w') as f:
f.write('Hello, World!\n')
class Repeat(luigi.Task):
repeat_n = luigi.IntParameter(default=3)
def requires(self):
return Greet()
def output(self):
return luigi.LocalTarget('repeating.txt')
def run(self):
input_ = self.input()
output = self.output()
with input_.open('r') as r, output.open('w') as w:
lines = r.readlines()
for _ in range(self.repeat_n):
w.writelines(lines)
def main():
luigi.run(main_task_cls=Repeat, local_scheduler=True)
if __name__ == '__main__':
main()
上記を実行する前に、先ほど生成されたファイルを削除しておく。
これは、実行結果のファイルがあるときは実行がスキップされるため。
この動作は何のターゲットを使っても基本的にそうなっている。
$ rm repeating.txt
実行してみよう。
パラメータはコマンドラインから渡す。
例えば、今回の例では --repeat-n
オプションになる。
$ python params.py --repeat-n=5
DEBUG: Checking if Repeat(repeat_n=5) is complete
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task Repeat_5_96577e160e has status PENDING
INFO: Informed scheduler that task Greet__99914b932b has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7436] Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) running Repeat(repeat_n=5)
INFO: [pid 7436] Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) done Repeat(repeat_n=5)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Repeat_5_96577e160e has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 1 present dependencies were encountered:
- 1 Greet()
* 1 ran successfully:
- 1 Repeat(repeat_n=5)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
サマリーから Greet
タスクはファイルがあるので実行されていないことが分かる。
Repeat
タスクのターゲットは、パラメータに応じて再実行された。
$ cat repeating.txt
Hello, World!
Hello, World!
Hello, World!
Hello, World!
Hello, World!
このように Luigi は実行結果があるときは実行をスキップするようになっている。
そのため、途中でタスクが失敗したときも原因を取り除いて再実行したとき最小限の処理でやり直すことができる。
特定期間のバッチ処理を実行する
例えば、あるバッチ処理が毎日決まった時間に実行されているとしよう。
それが、何らかの原因で失敗して、その日のうちに解決できなかったとする。
こんなときは、基本的に失敗した日の内容も後からリカバーしなきゃいけない。
ただ、当日以外のタスクを実行するというのは、そのように考えられて作っていないと意外と難しいもの。
Luigi だと、そういったユースケースもカバーしやすいように作られている。
例えば、次のようにして処理対象の日付を受け取って実行するようなタスクがあるとする。
こんな風に、処理対象の日付を指定するように作っておくのはバッチ処理でよく使われるパターンらしい。
また、生成されるターゲットのファイルに日付を含むようにしておくのもポイント。
import luigi
class DailyGreet(luigi.Task):
date = luigi.DateParameter()
def output(self):
return luigi.LocalTarget('daily-target-{}.txt'.format(str(self.date)))
def run(self):
out = self.output()
with out.open('w') as f:
f.write('Hello, World!\n')
def main():
luigi.run(main_task_cls=DailyGreet, local_scheduler=True)
if __name__ == '__main__':
main()
ひとまず、通常通り実行してみることにしよう。
$ python daily.py --date=2017-5-13
DEBUG: Checking if DailyGreet(date=2017-05-13) is complete
INFO: Informed scheduler that task DailyGreet_2017_05_13_284b40e6ab has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7481] Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) running DailyGreet(date=2017-05-13)
INFO: [pid 7481] Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) done DailyGreet(date=2017-05-13)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task DailyGreet_2017_05_13_284b40e6ab has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 DailyGreet(date=2017-05-13)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
すると、実行結果のファイルが日付付きで作られる。
$ cat daily-target-2017-05-13.txt
Hello, World!
Luigi では、このように日付を含むタスクを期間指定で一気に片付けるような方法が用意されている。
具体的には、次のようにして daily
モジュールの RangeDailyBase
タスクを指定した上で、--of
オプションで実行したいタスクを指定する。
その上で、実行する日付を --start
と --stop
オプションで指定する。
$ python -m luigi --module daily RangeDailyBase --of DailyGreet --start 2017-05-01 --stop 2017-05-12 --local-scheduler
...(省略)...
===== Luigi Execution Summary =====
Scheduled 12 tasks of which:
* 12 ran successfully:
- 11 DailyGreet(date=2017-05-01...2017-05-11)
- 1 RangeDailyBase(...)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
すると、範囲内の日付でタスクが一気に実行される。
$ ls daily-target-2017-05-*
daily-target-2017-05-01.txt daily-target-2017-05-07.txt
daily-target-2017-05-02.txt daily-target-2017-05-08.txt
daily-target-2017-05-03.txt daily-target-2017-05-09.txt
daily-target-2017-05-04.txt daily-target-2017-05-10.txt
daily-target-2017-05-05.txt daily-target-2017-05-11.txt
daily-target-2017-05-06.txt daily-target-2017-05-13.txt
例えば、数日前から今日の日付までの範囲を指定して、先ほどの内容を cron などで指定しておくとする。
そうすれば、実行結果のあるものについてはスキップされることから、まだ完了していないものだけを実行できる。
もし一過性の問題で失敗していたとしても、翌日のバッチ処理などで自動的にリカバーされるというわけ。
組み込みのタスクを活用する
Luigi には、色々な外部コンポーネントやサービス、ライブラリと連携するための組み込みタスクが用意されている。
それらは contrib パッケージの中にあって、ざっと見ただけでも例えば Hadoop や Kubernetes などがある。
github.com
今回は一例として Python のオブジェクト・リレーショナルマッパーの一つである SQLAlchemy との連携を試してみる。
まずは SQLAlchemy をインストールしておく。
$ pip install sqlalchemy
次のサンプルコードでは UsersTask
が出力する内容を SQLite3 のデータベースに保存している。
luigi.contrib.sqla.CopyToTable
を継承したタスクは、依存するタスクからタブ区切りのターゲットを受け取って、内容を各カラムに保存していく。
今回はユーザ情報を模したテーブルを作るようにしてみた。
import luigi
from luigi.mock import MockTarget
from luigi.contrib import sqla
from sqlalchemy import String
from sqlalchemy import Integer
class UsersTask(luigi.Task):
def output(self):
return MockTarget(self.__class__.__name__)
def run(self):
out = self.output()
with out.open('w') as f:
f.write('Alice\t24\n')
f.write('Bob\t30\n')
f.write('Carol\t18\n')
class SQLATask(sqla.CopyToTable):
columns = [
(['name', String(64)], {'primary_key': True}),
(['age', Integer()], {})
]
connection_string = 'sqlite:///users.db'
table = 'users'
def requires(self):
return UsersTask()
def main():
luigi.run(main_task_cls=SQLATask, local_scheduler=True)
if __name__ == '__main__':
main()
上記を実行してみる。
$ python sqla.py
DEBUG: Checking if SQLATask() is complete
DEBUG: Checking if UsersTask() is complete
INFO: Informed scheduler that task SQLATask__99914b932b has status PENDING
INFO: Informed scheduler that task UsersTask__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) running UsersTask()
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) done UsersTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task UsersTask__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) running SQLATask()
INFO: Running task copy to table for update id SQLATask__99914b932b for table users
INFO: Finished inserting 0 rows into SQLAlchemy target
INFO: Finished inserting rows into SQLAlchemy target
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) done SQLATask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task SQLATask__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 2 ran successfully:
- 1 SQLATask()
- 1 UsersTask()
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
すると SQLite3 のデータベースができる。
$ file users.db
users.db: SQLite 3.x database
中身を見るとユーザ情報が保存されている。
$ sqlite3 users.db "SELECT * FROM users"
Alice|24
Bob|30
Carol|18
セントラルスケジューラを使う
前述した通り Luigi にはローカルスケジューラとセントラルスケジューラのモードがある。
これまでの例はローカルスケジューラを使うものだった。
次はセントラルスケジューラを使ってみることにする。
まず、Luigi のセントラルスケジューラには複数のジョブが同時に実行されないよう調整する機能が備わっている。
そして、タスクの状態などを確認するための WebUI なども付属する。
ただし、スケジューラ自体にはタスクを実行する機能はなくて、あくまで上記の二つの機能だけが提供されている。
つまり、タスクの実行自体はセントラルスケジューラに接続したクライアント (ワーカー) の仕事になる。
また、繰り返しになるけど定期・繰り返し実行などの機能はないので cron や systemd timer を使うことになる。
セントラルスケジューラは luigid
というコマンドで起動する。
$ luigid
これで TCP:8082 ポートで WebUI が立ち上がる。
$ open http://localhost:8082
試しにセントラルスケジューラを使った状態でタスクを実行してみよう。
WebUI に成功 (done) なタスクが増えるはず。
$ python helloworld.py Greet
その他、セントラルスケジューラを動作させるときの設定などについては、このページを参照する。
Using the Central Scheduler — Luigi 2.6.1 documentation
成功・失敗時のコールバックを使う
タスクが成功したり失敗したときに何かしたいというニーズはあると思う。
例えば失敗したとき Slack とかのチャットに通知を送るとか。
次のサンプルコードでは、タスクに on_success()
メソッドをオーバーライドしている。
import luigi
class Greet(luigi.Task):
def run(self):
print('Hello, World!')
def on_success(self):
"""Task が成功したときのコールバック"""
print('SUCCESS!')
def main():
luigi.run(main_task_cls=Greet, local_scheduler=True)
if __name__ == '__main__':
main()
上記を実行してみよう。
成功時のコールバックが呼び出されていることが分かる。
$ python event.py
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
is_complete = task.complete()
INFO: Informed scheduler that task Greet__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8126] Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) running Greet()
Hello, World!
SUCCESS!
INFO: [pid 8126] Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) done Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Greet__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 Greet()
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
より広範囲に、どのタスクの成功や失敗でも呼び出されるようにしたいときは次のようにする。
@luigi.Task.event_handler
デコレータでコールバックを修飾した上で、呼び出すタイミングを引数で指定する。
今回は luigi.Event.SUCCESS
を指定しているのでタスクが成功したときに呼び出される。
import luigi
class Greet(luigi.Task):
def run(self):
print('Hello, World!')
@luigi.Task.event_handler(luigi.Event.SUCCESS)
def success_handler(task):
"""Task が成功したときのコールバック (汎用)"""
print('SUCCESS: {}'.format(task))
def main():
luigi.run(main_task_cls=Greet, local_scheduler=True)
if __name__ == '__main__':
main()
こちらも実行してみよう。
コールバックが呼び出されていることが分かる。
$ python event2.py
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
is_complete = task.complete()
INFO: Informed scheduler that task Greet__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8136] Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) running Greet()
Hello, World!
INFO: [pid 8136] Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) done Greet()
SUCCESS: Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Greet__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 Greet()
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
まとめ
今回は Python でバッチ処理を作るときデータパイプラインを構築するためのフレームワークである Luigi を使ってみた。
触ってみた感触としては部分的にクセはあるものの、なかなかシンプルな作りで用途にハマれば使いやすそうだ。