CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: pep8 は pycodestyle になったし pep257 は pydocstyle になった

意外とまだあんまり知られていないような気がしたので、このブログにも書いておく。

PEP8 と pep8 と pycodestyle

Python には PEP8 という有名なコーディングスタイルガイドラインがある。

www.python.org

そして、そのコーディングスタイルに沿ったコードになっているのかをチェックするツールとして pep8 というパッケージがあった。

pypi.python.org

過去形にするのは半分正しくなくて、上記のように今もある。 ただ、これは後方互換のために残されているだけで、もうバージョンアップはされないだろう。

今後は代わりに pycodestyle というパッケージを使うことになる。

pypi.python.org

これは単にパッケージとコマンドの名前が変わっただけ。 とはいえ、こちらはバージョンアップが続くので最新の PEP8 に追従していくしチェックできる範囲も増えていくはず。 (PEP に書かれている内容は必要に応じて更新される)

試しに使ってみることにしよう。 まずは Python のパッケージマネージャである pip でインストールする。

$ pip install pycodestyle

サンプルコードとして PEP8 違反がいくつか含まれるものを用意した。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# 改行が足りない
def greeting(name = 'World'):  # 余分なスペースがある
    print('Hello, {name}'.format(name=name))


def main():
  greeting()  # インデントが 4 スペースでない


if __name__ == '__main__':
    main()

# 改行が多い

pycodestyle コマンドで上記のコードをチェックしてみる。

$ pycodestyle sample.py
sample.py:5:1: E302 expected 2 blank lines, found 1
sample.py:5:18: E251 unexpected spaces around keyword / parameter equals
sample.py:5:20: E251 unexpected spaces around keyword / parameter equals
sample.py:10:3: E111 indentation is not a multiple of four
sample.py:17:1: W391 blank line at end of file

色々と PEP8 に準拠していない箇所が見つかった。

PEP257 と pep257 と pydocstyle

同じことが pep257pydocstyle にも起きている。

PEP257 は docstring のフォーマットを規定したドキュメントの名前を指している。

www.python.org

そして、PEP8 と同じように pep257 というチェックツールがあった。

pypi.python.org

そして、同じように pydocstyle という名前に変更されている。

pypi.python.org

こちらも試しに使ってみることにしよう。 pip でさくっと入る。

$ pip install pydocstyle

そして PEP257 に違反している箇所を含むサンプルコードを用意した。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# そもそも docstring がない


def greeting(name='World'):
    '''ダブルクォートを使っていない'''
    print('Hello, {name}'.format(name=name))


def main():
    """
        改行が多い
    """
    greeting()


if __name__ == '__main__':
    main()

pydocstyle コマンドで上記のコードをチェックしてみよう。

$ pydocstyle sample.py
sample.py:1 at module level:
        D100: Missing docstring in public module
sample.py:7 in public function `greeting`:
        D300: Use """triple double quotes""" (found '''-quotes)
sample.py:7 in public function `greeting`:
        D400: First line should end with a period (not '')
sample.py:12 in public function `main`:
        D200: One-line docstring should fit on one line with quotes (found 3)
sample.py:12 in public function `main`:
        D208: Docstring is over-indented
sample.py:12 in public function `main`:
        D400: First line should end with a period (not '')

PEP257 に違反している箇所が見つかった。

周辺ツールの対応

リネームされたパッケージを依存パッケージに持つ主要なパッケージはどうなっているかなー、というのも一応は確認しておく。 例えば flake8 とか autopep8 について pipdeptree で見てみよう。

$ pip install flake8 autopep8 pipdeptree

flake8pycodestyle を使うようになっている。

$ pipdeptree | grep -A 3 flake8
flake8==3.3.0
  - mccabe [required: <0.7.0,>=0.6.0, installed: 0.6.1]
  - pycodestyle [required: >=2.0.0,<2.4.0, installed: 2.3.1]
  - pyflakes [required: >=1.5.0,<1.6.0, installed: 1.5.0]

autopep8pycodestyle を使うようになっていた。

$ pipdeptree | grep -A 1 autopep8
autopep8==1.3.2
  - pycodestyle [required: >=2.3, installed: 2.3.1]

間接的に使っている場合には特に対応する必要はなさそうだ。

まとめ

  • pep8pycodestyle に名前が変わった
  • pep257pydocstyle に名前が変わった
  • flake8 とかで間接的に使っている分には特に対応する必要はない

参考

事の発端は、このチケットらしい。

Please rename this tool · Issue #466 · PyCQA/pycodestyle · GitHub

いじょう。

Python: ... (Ellipsis) は任意の処理を示すのにも便利かも

PEP 484 – Type Hints を読んで「なるほど、こういう使い方もあるのか」と気づかれたのでブログに書いておく。 尚、このエントリの内容を実行するには Python 3 以降が必要となる。

使った Python のバージョンは次の通り。

$ python --version
Python 3.6.1

Ellipsis について

Python 3 には Ellipsis というオブジェクトがある。 これはドットを三つ連続させたもので得られる。

>>> ...
Ellipsis

これの使いみちとしてはコンテナオブジェクトでスライスと共に用いられることが多い。 Ellipsis 自体の解説は以前こちらのエントリで紹介している。

blog.amedama.jp

任意の処理を示すためのコードについて

ところで、これまで概念的な説明をするときに書くコードも、なるべく実行できるように書くようにしていた。

例えば、関数ならこんな感じで書くことが多かったように思う。 ここには任意の処理が入りますよ、みたいなのを説明するのにはコメントを使って、中身は pass にするとか。

def f():
    # Do something
    pass

これは、コメントだけだと文法的に正しくないため。

>>> def f():
...     # Do something
...
  File "<stdin>", line 3

    ^
IndentationError: expected an indented block

あるいは docstring を利用して、こんな風にすることもできるかもしれない。

def f():
    """Do something"""

docstring は実のところ単なる文字列なので、それが式になることでこの場合は前述したようなエラーにならない。

任意の処理を … (Ellipsis) で表現する

上記でも伝わりさえすれば問題はないんだけど Ellipsis を使えばさらに分かりやすくなる気がする。

def f():
    # Do something
    ...

Ellipsis 単独でもちゃんとした式なので、上記は Python 3 の処理系がちゃんと解釈できる。

あるいは「この変数には何かが入るよ」みたいな処理も Ellipsis で表現できる。

>>> variable = ...

これも variable に Ellipsis を代入しているだけなので、もちろん有効な Python コードになる。

問題は … が Ellipsis オブジェクトになるのが Python 3 から、っていうところかな。 上記は Python 2 では使うことができない。

Python から Hadoop Streaming を使ってみる

今回は、任意のプログラミング言語から Apache Hadoop を使うことのできる Hadoop Streaming という機能を使ってみる。

通常、Hadoop を使って MapReduce のジョブを直接扱うときは Java を使ってマッパーとリデューサーを書くことになる。 ただ、ご存知の通り Java のソースコードというのは重厚長大で、なかなか読み書きがしやすいとは言いにくい。 そこで、任意のプログラミング言語、具体的には標準入出力を処理する実行ファイルさえあれば使える機能ができた。 それが Hadoop Streaming というもの。 この機能を使うことで低レイヤーな MapReduce の処理を、使い慣れたプログラミング言語を使うなど好きなやり方で記述できる。

ちなみに、今回のエントリでは事前に Apache Hadoop がセットアップされていることを前提に書いていく。 具体的には、次のエントリのようにして疑似分散モードないし完全分散モードで構築されていること。

blog.amedama.jp

また、使用するプログラミング言語については使い慣れた Python を使うことにした。 そして、題材とする MapReduce のアプリケーションはワードカウントにする。 これは、MapReduce のハローワールド的な位置づけのアプリケーションになっている。 具体的な処理としては、テキストファイルの中に含まれる単語の数をカウントするというもの。

使った環境については前述したエントリにもあるけど、次の通り。

$ cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)
$ uname -r
3.10.0-514.el7.x86_64
$ bin/hadoop version
Hadoop 2.8.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 91f2b7a13d1e97be65db92ddabc627cc29ac0009
Compiled by jdu on 2017-03-17T04:12Z
Compiled with protoc 2.5.0
From source with checksum 60125541c2b3e266cbf3becc5bda666
This command was run using /home/vagrant/hadoop-2.8.0/share/hadoop/common/hadoop-common-2.8.0.jar

Hadoop Streaming でワードカウントを実行する

まずはワードカウントの処理対象となるテキストファイルから用意しておこう。 各単語はシンプルにスペースで区切られている。 見たところHelloHadoop といった単語はたくさん含まれていそうだ。

$ cat << 'EOF' > /tmp/input1.txt
Hello Hadoop World!
This is the first Hadoop Streaming application!
EOF
$ cat << 'EOF' > /tmp/input2.txt
Hello Hello Hello
Hadoop Streaming application!
EOF

続いては上記のファイルを HDFS にコピーする。 ワーキングディレクトリについては、先のエントリでセットアップした Apache Hadoop のディレクトリ内にいることを想定している。

$ bin/hdfs dfs -put /tmp/input*.txt .

もし、次のようなエラーが出るときはユーザのホームディレクトリが HDFS 上に存在していないことを示している。

put: `.': No such file or directory: `hdfs://localhost:9000/user/vagrant'

そのときはホームディレクトリを用意しよう。

$ bin/hdfs dfs -mkdir -p /user/$(whoami)

次のようにファイルがコピーされていることを確認する。

$ bin/hdfs dfs -ls
Found 2 items
-rw-r--r--   1 vagrant supergroup         68 2017-05-20 01:09 input1.txt
-rw-r--r--   1 vagrant supergroup         48 2017-05-20 01:09 input2.txt

続いて、今回の本題である Hadoop Streaming 用のプログラムを用意する。 その前に Hadoop Streaming で使うプログラムの基礎知識について説明しておく。

まず、Hadoop Streaming では標準入出力を通して Apache Hadoop とユーザのプログラムがデータをやり取りする。 このとき、プログラムは標準入出力を通してやり取りする各行をキーバリューの含まれたものとして扱うことになる。 始めに入力から見ると、これは本当にただ行が標準入力から渡ってくるだけに過ぎない。 ユーザのプログラムは、それを適切にパースしてキーバリューに分解する。 そして、分解したキーバリューに対して何らかの処理を施した上で、最後は標準出力に書き出すことになる。 つまり、標準入出力さえ扱うことができれば、どんなプログラムであっても MapReduce の処理を記述できるということになる。 ただし、詳しくは後述するものの、その処理が並列分散実行される点については留意しておく必要がある。

さて、長くなってしまったけどそろそろ実例を見ていこう。 Hadoop Streaming ではマッパーとレデューサーを別々に用意する。 今回はワードカウントを題材にするので、そのプログラムを Python で書いてみた。

まずはマッパーから。 標準入力の内容をパースしてキーバリューにしたら、それを標準出力に書き出している。 キーとバリューのペアはタブを使って区切るのが望ましい。 なぜなら Hadoop Streaming がデフォルトでキーとバリューをタブ区切りとして扱うため。 もちろん、この区切り文字は変更できるんだけど、今回はデフォルトのままでいく。

$ cat << 'EOF' > /var/tmp/mapper.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
import sys


def map_(line):
    # 空白などで単語ごとに行を区切る
    words = re.split(r'\s', line)
    # 単語と出現回数 (ここでは常に1) のタプルにする
    return [(key, 1) for key in words if key]


def write(kvs):
    for key, value in kvs:
        # キーとバリューをタブ区切りで標準出力に書き出す
        message = '{0}\t{1}'.format(key, value)
        print(message)


def main():
    # 標準入力から処理対象の文字列を読み込む
    for line in sys.stdin:
        # タブ区切りのキーとバリューに分解する
        key_and_values = map_(line)
        # 標準出力に書き込んでいく
        write(key_and_values)


if __name__ == '__main__':
    main()
EOF

続いてはリデューサー。 標準入力に含まれるキーバリューをパースしたら、その内容を元に単語の数をカウントしている。

$ cat << 'EOF' > /var/tmp/reducer.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from collections import defaultdict


# 集計結果を入れる辞書
results = defaultdict(int)


def reduce_(line):
    # 入力された行をタブで区切る
    key, value = line.split('\t')
    # キーごとに出現回数をカウントする
    results[key] += int(value)


def main():
    # キーの出現回数をカウントする
    for line in sys.stdin:
        reduce_(line)

    # 集計結果をキーとバリューのタブ区切りで標準出力に書き出す
    for key, value in results.items():
        message = '{0}\t{1}'.format(key, value)
        print(message)


if __name__ == '__main__':
    main()
EOF

上記で作ったマッパーとリデューサーに実行権限をつけておく。 これは必須ではない。

$ chmod +x /var/tmp/mapper.py
$ chmod +x /var/tmp/reducer.py

これで準備が整った。 それでは早速 Hadoop Streaming を実行してみよう。 jar には Apache Hadoop に同梱されている Hadoop Streaming 用のものを指定する。 -files オプションで使用する実行ファイルを指定して、それを -mapper-reducer オプションで割り当てていく。 MapReduce ジョブの入出力については -input-output オプションで指定する。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output

無事にジョブが完了すると結果が output ディレクトリ以下に出力される。 内容を確認してみよう。

$ bin/hdfs dfs -cat output/*
application!    2
This    1
is  1
Hadoop  3
Streaming   2
World!  1
the 1
Hello   4
first   1

どうやら、ちゃんとワードカウントが動作しているようだ。

マップだけする (リデュースしない) パターン

場合によっては MapReduce で、マップだけを使ってリデュースはしたくないということもあるはず。 これは、単純に Apache Hadoop クラスタを並列分散実行するためだけに使うパターン。 このときは -D mapred.reduce.tasks=0 というようにリデュースの並列度を 0 に指定する。

実際に試してみよう。 このパターンではリデューサーの指定が必要ない。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -D mapred.reduce.tasks=0 \
    -files /var/tmp/mapper.py \
    -mapper mapper.py \
    -input input*.txt \
    -output output2

結果は次の通り。 マップ処理を通してキーバリュー形式になるだけで終わった。

$ bin/hdfs dfs -cat output2/*
Hello   1
Hadoop  1
World!  1
This    1
is  1
the 1
first   1
Hadoop  1
Streaming   1
application!    1
Hello   1
Hello   1
Hello   1
Hadoop  1
Streaming   1
application!    1

Hadoop Streaming を扱う上での注意点

Hadoop Streaming のプログラムを書く上での注意点として一つ気づいたことがある。 それは、マップ処理は並列分散処理されるのは当たり前としても、リデュース処理も並列分散処理されうるということ。 なので、リデュース処理の中で一つのノードで完結していることを想定したような処理は含めてはいけないはず。

例えば、次のリデューサーは最終的な出力結果が降順ソートされることを期待した処理が含まれている。

$ cat << 'EOF' > /var/tmp/reducer.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""リデューサーが並列実行されると上手く動かないので、こんな風には書かないこと"""

import sys
from collections import defaultdict


results = defaultdict(int)


def reduce_(line):
    key, value = line.split('\t')
    results[key] += int(value)


def main():
    for line in sys.stdin:
        reduce_(line)

    # キーの出現回数ごとに降順ソートする
    items = results.items()
    sorted_items = sorted(items, key=lambda x: x[1], reverse=True)

    for key, value in sorted_items:
        message = '{0}\t{1}'.format(key, value)
        print(message)


if __name__ == '__main__':
    main()
EOF

上記を実行してみよう。 まずはリデュース処理の並列度が 1 の場合から。 つまり、一つのノード上で全てのリデュース処理が実行される。

$ chmod +x /var/tmp/reducer.py
$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -D mapred.reduce.tasks=1 \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output3

この場合は結果がちゃんとソートされて出る。

$ bin/hdfs dfs -cat output3/*
Hello   4
Hadoop  3
application!    2
Streaming   2
This    1
is  1
World!  1
the 1
first   1

しかし、並列度を上げてみるとどうだろうか? 例えば、次のようにすると並列度が 10 になる。 つまり、マッパーが処理したデータが 10 のノードに散らばって実行されることになる。 ただし、今回に関しては疑似分散モードなので一つのノード上でリデュースのタスクが 10 できるだけ。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -D mapred.reduce.tasks=10 \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output4

同じことは -numReduceTasks オプションでも指定できる。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output4 \
    -numReduceTasks 10

実行結果を確認してみよう。 今度は見事に出力結果がソートされずバラバラになっている。 これはリデュース処理が並列実行されたことで起こった。 各々のリデュース単位では降順ソートされたものの、全体ではそれを連結したものになるので意味をなさない。

$ bin/hdfs dfs -cat output4/*
Hello   4
is  1
the 1
first   1
Hadoop  3
application!    2
World!  1
Streaming   2
This    1

上記を見て次に気になるのは、どうやってリデュース処理を各タスクに分解して割り振っているか、ということだろう。 リデュース処理に関しては、同じキーは必ず同じノード上に集めて実行しなきゃいけない。 つまり標準入出力に流れる行の中で、どの部分がキーなのかを Apache Hadoop は認識する必要があるということ。 これは前述した区切り文字と関係していて、デフォルトでは区切り文字がタブになっている。 そのため、今回は特に何も指定しなくても Apache Hadoop は標準入出力の中からキーを認識できた。 もし、これが異なる場合には Hadoop Streaming を実行する際に、それをオプションで伝えてやる必要がある。

Hadoop Streaming アプリケーションの開発について

ところで、Hadoop Streaming のアプリケーションはどうやって開発すれば良いだろうか。 上記のように Hadoop Streaming 経由で毎回起動するのは、正直めんどくさい。 そんなときは HDFS を使わずにローカルディスクを、Hadoop Streaming の代わりにパイプを使ってデバッグするやり方がある。

結局のところ Hadoop Streaming がやっている標準入出力同士をつなぎ合わせるという処理は Unix のパイプと同じだ。 HDFS と Hadoop クラスタで、それが並列分散実行されるという点を除けば、だけど。

そのため、並列度が 1 の挙動については次のようにして確かめることができる。

$ cat /tmp/input*.txt | /var/tmp/mapper.py  | /var/tmp/reducer.py
application!    2
This    1
is  1
Hadoop  3
Streaming   2
World!  1
the 1
Hello   4
first   1

リデュースしたくないときはパイプをつながなければいいだけ。

$ cat /tmp/input*.txt | /var/tmp/mapper.py
Hello   1
Hadoop  1
World!  1
This    1
is  1
the 1
first   1
Hadoop  1
Streaming   1
application!    1
Hello   1
Hello   1
Hello   1
Hadoop  1
Streaming   1
application!    1

上記のようにして、まずは手元で動作確認した上で Hadoop Streaming にかければ良い。 この段階で動作しないアプリケーションは Hadoop Streaming に持っていっても絶対に動作はしないだろう。

まとめ

今回は Apache Hadoop を任意のプログラミング言語から扱うことのできる Hadoop Streaming という機能を試してみた。 この機能を使えば Java を使いたくないという人でも Apache Hadoop の低レイヤーな MapReduce 処理を記述できる。

参考文献

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Python: データパイプライン構築用フレームワーク Luigi を使ってみる

最近になって、バッチ処理においてデータパイプラインを組むためのフレームワークとして Luigi というものがあることを知った。 これは、Spotify という音楽のストリーミングサービスを提供する会社が作ったものらしい。 似たような OSS としては他にも Apache Airflow がある。 こちらは民宿サービスを提供する Airbnb が作ったものだけど、最近 Apache に寄贈されてインキュベータープロジェクトとなった。

Luigi の特徴としては、バッチ処理に特化している点が挙げられる。 ただし、定期的にバッチ処理を実行するような機能はない。 そこは、代わりに cron や systemd timer を使ってやる必要がある。 また、本体もそうだけどデータパイプラインについても Python を使って書く必要がある。

今回は、そんな Luigi を一通り触ってみることにする。 使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195
$ python --version
Python 3.5.3

インストールする

インストールは Python のパッケージマネージャの pip を使ってさくっとできる。

$ pip install luigi
$ pip list --format=columns | grep luigi
luigi           2.6.1

ハローワールド

まず、Luigi における最も重要な概念としてタスクというものを知る必要がある。 タスクというのは、ユーザが Luigi にやらせたい何らかの仕事を細分化した最小単位となる。 そして、タスクを定義するには Task というクラスを継承したサブクラスを作る。

次のサンプルコードでは、最も単純なタスクを定義している。 このタスクでは run() メソッドをオーバーライドしてメッセージをプリントするようになっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


# Luigi で実行したい仕事は Task を継承したクラスにする
class Greet(luigi.Task):

    def run(self):
        """run() メソッドで具体的な処理を実行する"""
        print('Hello, World!')


def main():
    luigi.run()


if __name__ == '__main__':
    main()

上記のタスクを実行するには、次のようにする。 第二引数に渡している Greet が、実行したいタスク (のクラス名) を指している。 --local-scheduler オプションは、タスクのスケジューラをローカルモードに指定している。 スケジューラにはローカルとセントラルの二種類があるんだけど、詳しくは後ほど説明する。 とりあえず開発用途で使うならローカル、と覚えておけば良いかな。

$ python helloworld.py Greet --local-scheduler

実際に実行してみると、次のような出力が得られる。 実行サマリーとして Greet タスクが成功していることや、ログの中にメッセージが出力されていることが見て取れる。

$ python helloworld.py Greet --local-scheduler
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7082] Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) running   Greet()
Hello, World!
INFO: [pid 7082] Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

上記は、代わりに次のようにすることもできる。 このやり方では、まず python コマンドで luigi モジュールを実行して、そこ経由でタスクを実行している。

$ python -m luigi --module helloworld Greet --local-scheduler

このやり方の利点としては、パッケージ化されたインストール済みのタスクを実行できる点が挙げられる。 先ほどはカレントワーキングディレクトリ (CWD) にある Python ファイル (モジュール) を直接指定して実行していた。 それに対し、こちらは Python のパスにもとづいてモジュールが実行されるので、インストール済みのモジュールなら CWD の場所に関わらず実行できる。 そして、python コマンドのパスには CWD も含まれるので、先ほどと同じように実行できるというわけ。

もし、Python のモジュールとかがよく分からないというときは必要に応じて以下を参照してもらえると。 ようするに言いたいことは Python においてモジュールって呼ばれてるのは単なる Python ファイルのことだよ、という点。

blog.amedama.jp

また、もう一つのやり方として luigi コマンドを使うこともできる。 この場合も、実行するモジュールは --module オプションで指定する。 ただし、このやり方だと Python のパスに CWD が含まれない。 そのため CWD にあるモジュールを実行したいときは別途 PYTHONPATH 環境変数を指定する必要がある。

$ PYTHONPATH=. luigi --module helloworld Greet --local-scheduler

実行するタスクをモジュール内で指定する

とはいえ、毎回コマンドラインで実行するモジュールを指定するのも面倒だと思う。 そんなときは、次のようにして luigi.run() に実行するときのパラメータを指定してやると良さそう。 開発や検証用途なら、こうしておくと楽ちん。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def run(self):
        print('Hello, World!')


def main():
    # 起動したいタスクを指定して Luigi を実行する
    luigi.run(main_task_cls=Greet, local_scheduler=True)
    # あるいは
    # task = Greet()
    # luigi.build([task], local_scheduler=True)


if __name__ == '__main__':
    main()

上記のようにしてあればファイルを指定するだけで実行できるようになる。

$ python easyrun.py 
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7165] Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) running   Greet()
Hello, World!
INFO: [pid 7165] Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

ターゲットを指定する

Luigi において、もう一つ重要な概念がターゲットというもの。 これは、タスクを実行した結果を永続化する先を表している。 例えば、永続化する先はローカルディスクだったり Amazon S3 だったり HDFS だったりする。 タスクをチェーンしていく場合、このターゲットを介してデータをやり取りする。 つまり、一つ前のタスクが出力したターゲットの内容が、次のタスクの入力になるというわけ。

ちなみに、実は Luigi ではターゲットの指定がほとんど必須といってよいものになっている。 これまでの例では指定してなかったけど、それは単にすごくシンプルだったので何とかなっていたに過ぎない。 実際のところ、実行するときに次のような警告が出ていたのはターゲットを指定していなかったのが原因になっている。

/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()

次のサンプルコードでは、ターゲットとしてローカルディスクを指定している。 永続化する先としてローカルディスクを指定するときは LocalTarget というターゲットを使う。 ターゲットを指定するにはタスクのクラスに output() というメソッドをオーバーライドする。 そして、タスクの本体ともいえる run() メソッドでは output() メソッドで得られるターゲットに対して実行結果を書き込む。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        # Task の実行結果を保存する先を Target で指定する
        # LocalTarget はローカルディスクにファイルとして残す
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        # output() メソッドで保存先の Target を取得する
        out = self.output()
        # ファイルを開く
        with out.open('w') as f:
            # 実行結果を書き込む
            f.write('Hello, World!\n')


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

それでは、上記を実行してみよう。 今回は、先ほどは表示されていた警告が出ていない点に注目してほしい。

$ python output.py 
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7230] Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) running   Greet()
INFO: [pid 7230] Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

そして、実行すると greeting.txt というファイルがディレクトリに作られている。

$ cat greeting.txt
Hello, World!

このように Luigi ではタスクの実行結果を永続化する。

テスト用途としてメモリをターゲットにする

先ほどの例ではターゲットとしてローカルディスクを使った。 ちなみに、テスト用途としてならターゲットをメモリにすることもできる。

次のサンプルコードでは MockTarget を使うことで実行結果を保存する先をメモリにしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class Greet(luigi.Task):

    def output(self):
        # MockTarget はオンメモリにしか実行結果が残らない
        # 内部実装には io.Bytes が使われている
        # あくまでユニットテスト用途
        return MockTarget(self.__class__.__name__)

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

念のため、先ほど出力されたファイルを削除した上で上記を実行してみよう。

$ rm greeting.txt
$ python mock.py 
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7256] Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) running   Greet()
INFO: [pid 7256] Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

当然だけど、今度はファイルができない。

$ cat greeting.txt
cat: greeting.txt: No such file or directory

複数のタスクをチェーンする

これまでの例では一つのタスクを実行するだけだった。 現実世界の仕事では、タスクが一つだけということは滅多にないはず。 もちろん、色々なことをする巨大な一つのタスクを書くことも考えられるけど、それだと Luigi を使う魅力は半減してしまう。 なるべく、それ以上は細分化できないレベルまで仕事を小さくして、それをタスクに落とし込む。

次のサンプルコードでは Greet というタスクと Repeat というタスクをチェーンしている。 Repeat の実行には、まず Greet の実行が必要になる。 このような依存関係を表現するには、タスクのクラスに requires() というメソッドをオーバーライドする。 サンプルコードの内容としては Greet で出力された内容を Repeat で複数繰り返して出力する、というもの。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Repeat(luigi.Task):

    def requires(self):
        """Task の実行に必要な別の Task を指定する"""
        return Greet()

    def output(self):
        return luigi.LocalTarget('repeating.txt')

    def run(self):
        # requires() で指定した Task の実行結果 (Target) は input() メソッドで得られる
        input_ = self.input()
        output = self.output()

        with input_.open('r') as r, output.open('w') as w:
            # Greet が出力した内容を読み込む
            lines = r.readlines()

            # Greet の出力を 3 回繰り返し書き込む
            for _ in range(3):
                w.writelines(lines)


def main():
    luigi.run(main_task_cls=Repeat, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python requires.py 
DEBUG: Checking if Repeat() is complete
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Repeat__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) running   Greet()
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) running   Repeat()
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) done      Repeat()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Repeat__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 Greet()
    - 1 Repeat()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると、次のようにして実行結果のファイルが作られる。

$ cat greeting.txt 
Hello, World!
$ cat repeating.txt 
Hello, World!
Hello, World!
Hello, World!

ちなみに requires() メソッドをオーバーライドせずに依存関係を表現することもできるにはできる。 それは、次のように run() メソッドの中で直接タスクをインスタンス化して yield でターゲットを取得する方法。 これは、Dynamic dependencies と呼ばれている。 とはいえ、タスクの依存関係が分かりにくくなるので、使うのはなるべく避けた方が良い気がする。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Repeat(luigi.Task):

    def output(self):
        return luigi.LocalTarget('repeating.txt')

    def run(self):
        # requires() せずに直接 Task から yield するやり方もある
        input_ = yield Greet()
        output = self.output()

        with input_.open('r') as r, output.open('w') as w:
            lines = r.readlines()

            for _ in range(3):
                w.writelines(lines)


def main():
    luigi.run(main_task_cls=Repeat, local_scheduler=True)


if __name__ == '__main__':
    main()

また、依存するタスクは一つとは限らない。 そんなときは requires() メソッドの中でリストの形で依存するタスクを列挙する。 あるいは yield で一つずつ列挙していっても構わない。 ただし yield を使うやり方だと、後から依存タスクの実行を並列化したいときも、必ず直列に実行されるようになるらしい。 逆に言えば直列に実行することを担保したいときは yield を使うのが良さそう。 また、複数のタスクを集約して実行するためだけのタスクについては WrapperTask を継承すると良いらしい。

次のサンプルコードでは GreetEat そして Sleep というタスクを実行するためのタスクとして RequiresOnly を定義している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class OnMemoryTask(luigi.Task):

    def output(self):
        return MockTarget(self.__class__.__name__)


class Greet(OnMemoryTask):

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Eat(OnMemoryTask):

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Mog, Mog\n')


class Sleep(OnMemoryTask):

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Zzz...\n')


class RequiresOnly(luigi.WrapperTask):

    def requires(self):
        # 依存している Task は複数書ける
        return [Greet(), Eat(), Sleep()]
        # あるいは
        # yield Greet()
        # yield Eat()
        # yield Sleep()
        # としても良い


def main():
    luigi.run(main_task_cls=RequiresOnly, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。 タスクの並列度を上げるには --workers オプションを指定する。

$ python reqonly.py --workers 3
DEBUG: Checking if RequiresOnly() is complete
DEBUG: Checking if Greet() is complete
DEBUG: Checking if Eat() is complete
DEBUG: Checking if Sleep() is complete
INFO: Informed scheduler that task   RequiresOnly__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Sleep__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Eat__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 3 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7362] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   Eat()
DEBUG: 3 running tasks, waiting for next task to finish
INFO: [pid 7362] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      Eat()
INFO: [pid 7363] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   Greet()
INFO: [pid 7363] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      Greet()
INFO: Informed scheduler that task   Eat__99914b932b   has status   DONE
INFO: [pid 7364] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   Sleep()
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: Greet__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
DEBUG: Sleep__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: Sleep__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: [pid 7364] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      Sleep()
INFO: Informed scheduler that task   Sleep__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: RequiresOnly__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: [pid 7365] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   RequiresOnly()
INFO: [pid 7365] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      RequiresOnly()
INFO: Informed scheduler that task   RequiresOnly__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 ran successfully:
    - 1 Eat()
    - 1 Greet()
    - 1 RequiresOnly()
    - 1 Sleep()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

タスクにパラメータを受け取る

タスクが動作するときに色々なパラメータを受け取りたい、という場面も多いはず。 そんなときは *Parameter を使えば良い。 次のサンプルコードでは、前述した Repeat タスクで繰り返しの数をパラメータ化している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Repeat(luigi.Task):
    # Task の実行に必要な引数を Parameter として受け取る
    repeat_n = luigi.IntParameter(default=3)

    def requires(self):
        return Greet()

    def output(self):
        return luigi.LocalTarget('repeating.txt')

    def run(self):
        input_ = self.input()
        output = self.output()

        with input_.open('r') as r, output.open('w') as w:
            lines = r.readlines()

            # Parameter は Task のアトリビュートとして使える
            for _ in range(self.repeat_n):
                w.writelines(lines)


def main():
    luigi.run(main_task_cls=Repeat, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行する前に、先ほど生成されたファイルを削除しておく。 これは、実行結果のファイルがあるときは実行がスキップされるため。 この動作は何のターゲットを使っても基本的にそうなっている。

$ rm repeating.txt

実行してみよう。 パラメータはコマンドラインから渡す。 例えば、今回の例では --repeat-n オプションになる。

$ python params.py --repeat-n=5
DEBUG: Checking if Repeat(repeat_n=5) is complete
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Repeat_5_96577e160e   has status   PENDING
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7436] Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) running   Repeat(repeat_n=5)
INFO: [pid 7436] Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) done      Repeat(repeat_n=5)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Repeat_5_96577e160e   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 present dependencies were encountered:
    - 1 Greet()
* 1 ran successfully:
    - 1 Repeat(repeat_n=5)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

サマリーから Greet タスクはファイルがあるので実行されていないことが分かる。

Repeat タスクのターゲットは、パラメータに応じて再実行された。

$ cat repeating.txt 
Hello, World!
Hello, World!
Hello, World!
Hello, World!
Hello, World!

このように Luigi は実行結果があるときは実行をスキップするようになっている。 そのため、途中でタスクが失敗したときも原因を取り除いて再実行したとき最小限の処理でやり直すことができる。

特定期間のバッチ処理を実行する

例えば、あるバッチ処理が毎日決まった時間に実行されているとしよう。 それが、何らかの原因で失敗して、その日のうちに解決できなかったとする。 こんなときは、基本的に失敗した日の内容も後からリカバーしなきゃいけない。 ただ、当日以外のタスクを実行するというのは、そのように考えられて作っていないと意外と難しいもの。 Luigi だと、そういったユースケースもカバーしやすいように作られている。

例えば、次のようにして処理対象の日付を受け取って実行するようなタスクがあるとする。 こんな風に、処理対象の日付を指定するように作っておくのはバッチ処理でよく使われるパターンらしい。 また、生成されるターゲットのファイルに日付を含むようにしておくのもポイント。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class DailyGreet(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget('daily-target-{}.txt'.format(str(self.date)))

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


def main():
    luigi.run(main_task_cls=DailyGreet, local_scheduler=True)


if __name__ == '__main__':
    main()

ひとまず、通常通り実行してみることにしよう。

$ python daily.py --date=2017-5-13
DEBUG: Checking if DailyGreet(date=2017-05-13) is complete
INFO: Informed scheduler that task   DailyGreet_2017_05_13_284b40e6ab   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7481] Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) running   DailyGreet(date=2017-05-13)
INFO: [pid 7481] Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) done      DailyGreet(date=2017-05-13)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   DailyGreet_2017_05_13_284b40e6ab   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 DailyGreet(date=2017-05-13)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると、実行結果のファイルが日付付きで作られる。

$ cat daily-target-2017-05-13.txt 
Hello, World!

Luigi では、このように日付を含むタスクを期間指定で一気に片付けるような方法が用意されている。 具体的には、次のようにして daily モジュールの RangeDailyBase タスクを指定した上で、--of オプションで実行したいタスクを指定する。 その上で、実行する日付を --start--stop オプションで指定する。

$ python -m luigi --module daily RangeDailyBase --of DailyGreet --start 2017-05-01 --stop 2017-05-12 --local-scheduler
...(省略)...
===== Luigi Execution Summary =====

Scheduled 12 tasks of which:
* 12 ran successfully:
    - 11 DailyGreet(date=2017-05-01...2017-05-11)
    - 1 RangeDailyBase(...)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると、範囲内の日付でタスクが一気に実行される。

$ ls daily-target-2017-05-*
daily-target-2017-05-01.txt daily-target-2017-05-07.txt
daily-target-2017-05-02.txt daily-target-2017-05-08.txt
daily-target-2017-05-03.txt daily-target-2017-05-09.txt
daily-target-2017-05-04.txt daily-target-2017-05-10.txt
daily-target-2017-05-05.txt daily-target-2017-05-11.txt
daily-target-2017-05-06.txt daily-target-2017-05-13.txt

例えば、数日前から今日の日付までの範囲を指定して、先ほどの内容を cron などで指定しておくとする。 そうすれば、実行結果のあるものについてはスキップされることから、まだ完了していないものだけを実行できる。 もし一過性の問題で失敗していたとしても、翌日のバッチ処理などで自動的にリカバーされるというわけ。

組み込みのタスクを活用する

Luigi には、色々な外部コンポーネントやサービス、ライブラリと連携するための組み込みタスクが用意されている。 それらは contrib パッケージの中にあって、ざっと見ただけでも例えば Hadoop や Kubernetes などがある。

github.com

今回は一例として Python のオブジェクト・リレーショナルマッパーの一つである SQLAlchemy との連携を試してみる。

まずは SQLAlchemy をインストールしておく。

$ pip install sqlalchemy

次のサンプルコードでは UsersTask が出力する内容を SQLite3 のデータベースに保存している。 luigi.contrib.sqla.CopyToTable を継承したタスクは、依存するタスクからタブ区切りのターゲットを受け取って、内容を各カラムに保存していく。 今回はユーザ情報を模したテーブルを作るようにしてみた。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget
from luigi.contrib import sqla

from sqlalchemy import String
from sqlalchemy import Integer


class UsersTask(luigi.Task):

    def output(self):
        return MockTarget(self.__class__.__name__)

    def run(self):
        out = self.output()
        with out.open('w') as f:
            # 実行結果をタブ区切りで書き込んでいく
            f.write('Alice\t24\n')
            f.write('Bob\t30\n')
            f.write('Carol\t18\n')


# 実行結果を SQLAlchemy 経由で RDB に保存するための Task
class SQLATask(sqla.CopyToTable):
    # SQLAlchemy のテーブル定義
    columns = [
        (['name', String(64)], {'primary_key': True}),
        (['age', Integer()], {})
    ]
    # データベースへの接続 URL
    connection_string = 'sqlite:///users.db'
    # 保存するのに使うテーブル名
    table = 'users'

    def requires(self):
        # 実行結果がタブ区切りになっていれば、あとは依存先に指定するだけ
        return UsersTask()


def main():
    luigi.run(main_task_cls=SQLATask, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみる。

$ python sqla.py        
DEBUG: Checking if SQLATask() is complete
DEBUG: Checking if UsersTask() is complete
INFO: Informed scheduler that task   SQLATask__99914b932b   has status   PENDING
INFO: Informed scheduler that task   UsersTask__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) running   UsersTask()
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) done      UsersTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   UsersTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) running   SQLATask()
INFO: Running task copy to table for update id SQLATask__99914b932b for table users
INFO: Finished inserting 0 rows into SQLAlchemy target
INFO: Finished inserting rows into SQLAlchemy target
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) done      SQLATask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   SQLATask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 SQLATask()
    - 1 UsersTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると SQLite3 のデータベースができる。

$ file users.db 
users.db: SQLite 3.x database

中身を見るとユーザ情報が保存されている。

$ sqlite3 users.db "SELECT * FROM users"
Alice|24
Bob|30
Carol|18

セントラルスケジューラを使う

前述した通り Luigi にはローカルスケジューラとセントラルスケジューラのモードがある。 これまでの例はローカルスケジューラを使うものだった。 次はセントラルスケジューラを使ってみることにする。

まず、Luigi のセントラルスケジューラには複数のジョブが同時に実行されないよう調整する機能が備わっている。 そして、タスクの状態などを確認するための WebUI なども付属する。 ただし、スケジューラ自体にはタスクを実行する機能はなくて、あくまで上記の二つの機能だけが提供されている。 つまり、タスクの実行自体はセントラルスケジューラに接続したクライアント (ワーカー) の仕事になる。 また、繰り返しになるけど定期・繰り返し実行などの機能はないので cron や systemd timer を使うことになる。

セントラルスケジューラは luigid というコマンドで起動する。

$ luigid

これで TCP:8082 ポートで WebUI が立ち上がる。

$ open http://localhost:8082

試しにセントラルスケジューラを使った状態でタスクを実行してみよう。 WebUI に成功 (done) なタスクが増えるはず。

$ python helloworld.py Greet

その他、セントラルスケジューラを動作させるときの設定などについては、このページを参照する。

Using the Central Scheduler — Luigi 2.6.1 documentation

成功・失敗時のコールバックを使う

タスクが成功したり失敗したときに何かしたいというニーズはあると思う。 例えば失敗したとき Slack とかのチャットに通知を送るとか。

次のサンプルコードでは、タスクに on_success() メソッドをオーバーライドしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def run(self):
        print('Hello, World!')

    def on_success(self):
        """Task が成功したときのコールバック"""
        print('SUCCESS!')


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。 成功時のコールバックが呼び出されていることが分かる。

$ python event.py 
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8126] Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) running   Greet()
Hello, World!
SUCCESS!
INFO: [pid 8126] Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

より広範囲に、どのタスクの成功や失敗でも呼び出されるようにしたいときは次のようにする。 @luigi.Task.event_handler デコレータでコールバックを修飾した上で、呼び出すタイミングを引数で指定する。 今回は luigi.Event.SUCCESS を指定しているのでタスクが成功したときに呼び出される。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def run(self):
        print('Hello, World!')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def success_handler(task):
    """Task が成功したときのコールバック (汎用)"""
    print('SUCCESS: {}'.format(task))


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

こちらも実行してみよう。 コールバックが呼び出されていることが分かる。

$ python event2.py 
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8136] Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) running   Greet()
Hello, World!
INFO: [pid 8136] Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) done      Greet()
SUCCESS: Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

まとめ

今回は Python でバッチ処理を作るときデータパイプラインを構築するためのフレームワークである Luigi を使ってみた。 触ってみた感触としては部分的にクセはあるものの、なかなかシンプルな作りで用途にハマれば使いやすそうだ。

Mac OS X で Apache Spark を触ってみる

最近 Apache Spark について耳にすることが多い。 Apache Spark は、ビッグデータ処理における並列分散処理基盤を提供する OSS の一つ。 似たような用途としては Apache Hadoop も有名だけど、それよりも最大で 100 倍ほど高速に動作するんだとか。 高速に動作する理由としては、各ノードのメモリに乗り切るサイズのデータならディスクを介さずに扱える点が大きいらしい。

今回は、そんな Apache Spark を Mac OS X で軽く触ってみることにする。 本来であれば、用途的には複数のノードを用意して並列分散処理をさせるところだけど使うのは一つのノードだけ。 また Apache Spark を操作するには Java, Scala, Python のインターフェースがある。 その中でも、今回は Python のインターフェース (PySpark) を使ってみることにした。

環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195

インストール

まずは Apache Spark の動作に JRE が必要なのでインストールしておく。 インストールには Homebrew Cask を使うと楽できる。

$ brew cask install java

Apache Spark のインストールは Homebrew を使ってさくっといける。

$ brew install apache-spark

インタラクティブシェルから触ってみる

Apache Spark を Python から扱うには pyspark というコマンドを使う。 このコマンドを起動すると、Python から Apache Spark を扱う上で必要なパッケージなどが自動的にインポートされる。 それ以外については、特に普段使っている REPL と違いはないようだ。

$ pyspark
Python 2.7.10 (default, Feb  6 2017, 23:53:20) 
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/04 18:42:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/04 18:43:13 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/05/04 18:43:13 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/05/04 18:43:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.10 (default, Feb  6 2017 23:53:20)
SparkSession available as 'spark'.
>>> 

サイトパッケージの場所を調べてもシステムの Python の場所になっているし。

>>> from pip import locations
>>> locations.user_site
'/Users/amedama/Library/Python/2.7/lib/python/site-packages'

これは、どうやらデフォルトの python コマンドで起動されるものが使われているだけっぽい? 試しに virtualenv を使ってデフォルトが Python 3.5 になるようにしてみる。

$ python --version
Python 3.5.3

この状態で pyspark コマンドを実行すると、ちゃんと Python 3.5 が使われるようになった。 予想は当たっていたようだ。

$ pyspark    
Python 3.5.3 (default, Feb 26 2017, 01:47:55) 
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/04 19:29:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/04 19:30:00 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 3.5.3 (default, Feb 26 2017 01:47:55)
SparkSession available as 'spark'.
>>> 

インタラクティブシェルを終了するときは通常の Python の REPL と同じように Ctrl-D とか exit() 関数とかで。

>>> exit()

テキストファイルを処理してみる

それでは、次は実際に Apache Spark でテキストファイルを扱ってみることにしよう。 題材は Apache Spark の README ファイルにする。

$ brew install wget
$ wget https://raw.githubusercontent.com/apache/spark/master/README.md

あと、なんか操作していると随所で psutil 入れた方が良いよっていう警告が出るので入れておく。

$ pip install psutil

インタラクティブシェルを起動しよう。

$ pyspark

PySpark のインタラクティブシェルでは sc という SparkContext のインスタンスが処理の取っ掛かりになるみたい。 これは、あらかじめインタラクティブシェルを起動した時点で用意されている。

>>> sc
<pyspark.context.SparkContext object at 0x1068cf810>

まずは SparkContext#textFile() メソッドでテキストファイルを読み込む。

>>> textfile = sc.textFile("README.md")

これで得られるインスタンスは Resilient Distributed Dataset (RDD) と呼ばれる形式になっている。 基本的に Apache Spark では、この RDD という単位でデータを扱うことになる。

>>> textfile
README.md MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

この RDD には、あらかじめ色々な API が用意されている。 例えば RDD に含まれるデータの数は count() というメソッドで得られる。

>>> textfile.count()
103

他にも、データセットの最初の要素を取り出す first() だとか。

>>> textfile.first()
u'# Apache Spark'

データセットに含まれる要素全てを得るには collect() などを使う。

>>> textfile.collect()
[u'# Apache Spark', u'', u'Spark is a fast and general cluster computing system for Big Data. It provides', u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', u'supports general computation graphs for data analysis. It also supports a', u'rich set of higher-level tools including Spark SQL for SQL and DataFrames,', u'MLlib for machine learning, GraphX for graph processing,', u'and Spark Streaming for stream processing.', u'', u'<http://spark.apache.org/>', u'', u'', u'## Online Documentation', u'', u'You can find the latest Spark documentation, including a programming', u'guide, on the [project web page](http://spark.apache.org/documentation.html).', u'This README file only contains basic setup instructions.', u'', u'## Building Spark', u'', u'Spark is built using [Apache Maven](http://maven.apache.org/).', u'To build Spark and its example programs, run:', u'', u'    build/mvn -DskipTests clean package', u'', u'(You do not need to do this if you downloaded a pre-built package.)', u'', u'You can build Spark using more than one thread by using the -T option with Maven, see ["Parallel builds in Maven 3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).', u'More detailed documentation is available from the project site, at', u'["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).', u'', u'For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](http://spark.apache.org/developer-tools.html).', u'', u'## Interactive Scala Shell', u'', u'The easiest way to start using Spark is through the Scala shell:', u'', u'    ./bin/spark-shell', u'', u'Try the following command, which should return 1000:', u'', u'    scala> sc.parallelize(1 to 1000).count()', u'', u'## Interactive Python Shell', u'', u'Alternatively, if you prefer Python, you can use the Python shell:', u'', u'    ./bin/pyspark', u'', u'And run the following command, which should also return 1000:', u'', u'    >>> sc.parallelize(range(1000)).count()', u'', u'## Example Programs', u'', u'Spark also comes with several sample programs in the `examples` directory.', u'To run one of them, use `./bin/run-example <class> [params]`. For example:', u'', u'    ./bin/run-example SparkPi', u'', u'will run the Pi example locally.', u'', u'You can set the MASTER environment variable when running examples to submit', u'examples to a cluster. This can be a mesos:// or spark:// URL,', u'"yarn" to run on YARN, and "local" to run', u'locally with one thread, or "local[N]" to run locally with N threads. You', u'can also use an abbreviated class name if the class is in the `examples`', u'package. For instance:', u'', u'    MASTER=spark://host:7077 ./bin/run-example SparkPi', u'', u'Many of the example programs print usage help if no params are given.', u'', u'## Running Tests', u'', u'Testing first requires [building Spark](#building-spark). Once Spark is built, tests', u'can be run using:', u'', u'    ./dev/run-tests', u'', u'Please see the guidance on how to', u'[run tests for a module, or individual tests](http://spark.apache.org/developer-tools.html#individual-tests).', u'', u'## A Note About Hadoop Versions', u'', u'Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported', u'storage systems. Because the protocols have changed in different versions of', u'Hadoop, you must build Spark against the same version that your cluster runs.', u'', u'Please refer to the build documentation at', u'["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)', u'for detailed guidance on building for a particular distribution of Hadoop, including', u'building for particular Hive and Hive Thriftserver distributions.', u'', u'## Configuration', u'', u'Please refer to the [Configuration Guide](http://spark.apache.org/docs/latest/configuration.html)', u'in the online documentation for an overview on how to configure Spark.', u'', u'## Contributing', u'', u'Please review the [Contribution to Spark guide](http://spark.apache.org/contributing.html)', u'for information on how to get started contributing to the project.']

RDD に用意されている全ての API について知りたいときは、以下の公式ドキュメントを参照する感じで。

spark.apache.org

例として「Spark」という文字列が含まれる行だけを取り出してみよう。 このような要素には filter() メソッドが使える。

>>> filtered_rdd = textfile.filter(lambda line: u'Spark' in line)

処理の結果も、また RDD で得られる。 見たところ 20 行あるようだ。

>>> filtered_rdd.count()
20

要素を見ると、たしかにどの行にも「Spark」の文字列が含まれている。

>>> filtered_rdd.collect()
[u'# Apache Spark', u'Spark is a fast and general cluster computing system for Big Data. It provides', u'rich set of higher-level tools including Spark SQL for SQL and DataFrames,', u'and Spark Streaming for stream processing.', u'You can find the latest Spark documentation, including a programming', u'## Building Spark', u'Spark is built using [Apache Maven](http://maven.apache.org/).', u'To build Spark and its example programs, run:', u'You can build Spark using more than one thread by using the -T option with Maven, see ["Parallel builds in Maven 3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).', u'["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).', u'For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](http://spark.apache.org/developer-tools.html).', u'The easiest way to start using Spark is through the Scala shell:', u'Spark also comes with several sample programs in the `examples` directory.', u'    ./bin/run-example SparkPi', u'    MASTER=spark://host:7077 ./bin/run-example SparkPi', u'Testing first requires [building Spark](#building-spark). Once Spark is built, tests', u'Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported', u'Hadoop, you must build Spark against the same version that your cluster runs.', u'in the online documentation for an overview on how to configure Spark.', u'Please review the [Contribution to Spark guide](http://spark.apache.org/contributing.html)']

次は、ビッグデータ処理のハローワールドとも言えるワードカウントを試してみよう。 まずは、テキストの内容をスペースで区切る。 これは MapReduce アルゴリズムでいう Map に相当する。

>>> words = textfile.flatMap(lambda line: line.split())
>>> words.count()
495

まあ、このままだと空白で区切っただけなので単語ではないものも含まれちゃうけど。

>>> words.first()
u'#'

先ほどテキストを区切るのに flatMap() を使ったのは、ただの map() だとリストが複数含まれるデータセットになってしまうため。 リストをさらに開いた状態 (flat) にしておかないと扱いづらい。

>>> textfile.map(lambda line: line.split()).first()
[u'#', u'Apache', u'Spark']

続いて、各単語に対して出現頻度をカウントするために数字を添えてタプルにする。 これも Map 処理だね。

>>> words_tuple = words.map(lambda word: (word, 1))
>>> words_tuple.first()
(u'#', 1)

あとはキー (各単語) ごとに出現頻度をカウントする。 これが MapReduce でいう Reduce に相当する。 ここの処理では、タプル同士を足し算すると第二要素のカウンタが増えることになる。 この動作は Python の流儀からすると、ちょっと直感に反するね。

>>> words_count = words_tuple.reduceByKey(lambda a, b: a + b)

これで、それぞれの単語の出現頻度がカウントできた。

>>> words_count.collect()[:10]
[('guide,', 1), ('APIs', 1), ('optimized', 1), ('name', 1), ('storage', 1), ('developing', 1), ('It', 2), ('package.', 1), ('particular', 2), ('development', 1)]

一番登場する頻度が多いのはどれなのかを調べるために、カウンタの値にもとづいて降順ソートする。 どうやら「the」が一番多いらしい。

>>> words_count_sorted = words_count.sortBy(lambda t: t[1], False)
>>> words_count_sorted.collect()[:10]
[('the', 24), ('to', 17), ('Spark', 16), ('for', 12), ('and', 9), ('##', 9), ('a', 8), ('run', 7), ('on', 7), ('can', 7)]

まとめ

今回は Apache Spark を Python のインタラクティブシェルを通して軽く触ってみた。

Python3 エンジニア基礎認定試験を受けてみた

表題の通り、Python3 エンジニア基礎認定試験という民間の試験を受けてみた。

www.pythonic-exam.com

最近になって合格証書が届いたので、どんな感じだったか軽く書いてみる。

f:id:momijiame:20170409221053j:plain

受けるまでの経緯

Python の認定試験が始まるらしいということは以前から知っていたんだけど、続報を聞かないのでどうなっているのかなーと思ってた。 そんな折、友人と Python について話す機会があって、そこでベータ試験が始まっていることを教えてもらい受けてみることにした。 最初の方のベータ試験は受験料が無料だったみたいだけど、受けたのは最終ベータ試験ということで有料 (¥10,800) だった。 ちなみに、ベータ試験であっても合格すれば本試験に合格したのと同じ扱いになるらしい。

蛇足だけど、ベータ試験の場合は払った費用が受験料という名目ではなくて試験を主催してる団体への協賛金という扱いになるっぽい。 要するに、お金周りの動きのチェックってことなんだろうね。 あと、特典として Pythonic マグカップがもらえた。

f:id:momijiame:20170204155948j:plain:w300

受験しての所感

問題の内容は、特に引っかけという感じの設問もなく素直な印象を受けた。 普段 Python で実務をこなしてるような人なら、特に勉強しなくても受かるんじゃないだろうか。 一応、認定教材としては Python チュートリアルが挙げられている。 試験の問題は、この本の内容に沿って章ごとに一定の割合で出されるっぽい。

Pythonチュートリアル 第3版

Pythonチュートリアル 第3版

それと、合格証書と一緒に、こんな感じで何処でどう得点したか示された紙がもらえる。 得点の低かった標準ライブラリは盲点で、普段使わない機能とかが出てくると分からないね・・・。 別に使ったことなくても、そのときは公式ドキュメントを調べれば良いんだし!(言い訳)

f:id:momijiame:20170409221934j:plain

受験時には、問題の誤りや Pythonic じゃないと感じた点があったら問題用紙に記号を書き込んで教えてくれ、というようなアナウンスもあった。 これは、おそらくベータ試験だからの措置で本試験ではきっとなくなるんだろうと思う。 ちなみに、試験後に問題用紙は回収されるので持って帰ることはできなかった。 あと、受験したベータ試験ではマークシート方式の筆記試験だったけど、本試験が始まったら CBT になるっぽい?

まとめ

今回、Python3 エンジニア基礎認定試験のベータ試験を受けて合格した。 内容的には落とすための試験という感じではないので、基本的な文法や機能さえちゃんと理解していれば受かりそう。

Python: scikit-learn で決定木 (Decision Tree) を試してみる

今回は機械学習アルゴリズムの一つである決定木を scikit-learn で試してみることにする。 決定木は、その名の通り木構造のモデルとなっていて、分類問題ないし回帰問題を解くのに使える。 また、決定木自体はランダムフォレストのような、より高度なアルゴリズムのベースとなっている。

使うときの API は scikit-learn が抽象化しているので、まずは軽く触ってみるところから始めよう。 決定木がどんな構造を持ったモデルなのかは最後にグラフで示す。 また、決定木自体は回帰問題にも使えるけど、今回は分類問題だけにフォーカスしている。

使った環境は次の通り。

$ sw_vers    
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195
$ python --version
Python 3.5.3

下準備

まずは、今回のサンプルコードを動かすのに必要な Python のパッケージをインストールしておく。

$ pip install scipy scikit-learn matplotlib

アイリスデータセットを分類してみる

まずは定番のアイリス (あやめ) データセットを決定木で分類してみることにする。 といっても scikit-learn を使う限りは、分類器が違っても API は同じなので使用感は変わらない。

次のサンプルコードではアイリスデータセットに含まれる三種類の花の品種を決定木で分類している。 モデルの汎化性能は LOO 法を使って計算した。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from sklearn import datasets
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score


def main():
    # アイリスデータセットを読み込む
    dataset = datasets.load_iris()

    # 教師データとラベルデータを取り出す
    features = dataset.data
    targets = dataset.target

    # 判定したラベルデータを入れるリスト
    predicted_labels = []
    # LOO 法で汎化性能を調べる
    loo = LeaveOneOut()
    for train, test in loo.split(features):
        # 学習に使うデータ
        train_data = features[train]
        target_data = targets[train]

        # モデルを学習させる
        clf = DecisionTreeClassifier()
        clf.fit(train_data, target_data)

        # テストに使うデータを正しく判定できるか
        predicted_label = clf.predict(features[test])
        predicted_labels.append(predicted_label)

    # テストデータでの正解率 (汎化性能) を出力する
    score = accuracy_score(targets, predicted_labels)
    print(score)


if __name__ == '__main__':
    main()

上記を実行すると、次のような結果が得られる。 約 95.3% の汎化性能が得られた。 ただし、決定木はどんな木構造になるかが毎回異なるので汎化性能も微妙に異なってくる。

0.953333333333

ハイパーパラメータを調整する

機械学習アルゴリズムで、人間が調整してやらなきゃいけないパラメータのことをハイパーパラメータという。 決定木では、木構造の深さがモデルの複雑度を調整するためのハイパーパラメータになっている。 深いものはより複雑で、浅いものはより単純なモデルになる。

次のサンプルコードは、決定木の深さを指定した数に制限した状態での汎化性能を示すものになっている。 具体的な深さについては 1 ~ 20 を順番に試行している。 ちなみに、指定できるのは「最大の深さ」なので、できあがる木構造がそれよりも浅いということは十分にありうる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from matplotlib import pyplot as plt

from sklearn import datasets
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score


def main():
    dataset = datasets.load_iris()

    features = dataset.data
    targets = dataset.target

    # 調べる深さ
    MAX_DEPTH = 20
    depths = range(1, MAX_DEPTH)

    # 決定木の最大深度ごとに正解率を計算する
    accuracy_scores = []
    for depth in depths:

        predicted_labels = []
        loo = LeaveOneOut()
        for train, test in loo.split(features):
            train_data = features[train]
            target_data = targets[train]

            clf = DecisionTreeClassifier(max_depth=depth)
            clf.fit(train_data, target_data)

            predicted_label = clf.predict(features[test])
            predicted_labels.append(predicted_label)

        # 各深度での汎化性能を出力する
        score = accuracy_score(targets, predicted_labels)
        print('max depth={0}: {1}'.format(depth, score))

        accuracy_scores.append(score)

    # 最大深度ごとの正解率を折れ線グラフで可視化する
    X = list(depths)
    plt.plot(X, accuracy_scores)

    plt.xlabel('max depth')
    plt.ylabel('accuracy rate')
    plt.show()


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 前述した通り決定木がどんな木構造になるかは毎回異なるので、これも毎回微妙に異なるはず。

max depth=1: 0.3333333333333333
max depth=2: 0.9533333333333334
max depth=3: 0.9466666666666667
max depth=4: 0.9466666666666667
max depth=5: 0.9466666666666667
max depth=6: 0.9466666666666667
max depth=7: 0.9466666666666667
max depth=8: 0.94
max depth=9: 0.9533333333333334
max depth=10: 0.94
max depth=11: 0.9533333333333334
max depth=12: 0.9466666666666667
max depth=13: 0.9466666666666667
max depth=14: 0.94
max depth=15: 0.94
max depth=16: 0.9466666666666667
max depth=17: 0.96
max depth=18: 0.9466666666666667
max depth=19: 0.9466666666666667

同時に、次のような折れ線グラフが得られる。 どうやら、今回のケースでは最大の深さが 3 以上であれば、汎化性能はどれもそんなに変わらないようだ。 f:id:momijiame:20170425221114p:plain

どのように分類されているのか可視化してみる

先ほどは深さによって汎化性能がどのように変わってくるかを見てみた。 今回扱うデータセットでは 3 以上あれば汎化性能にはさほど大きな影響がないらしいことが分かった。 次は、木構造の深さ (つまりモデルの複雑度) によって分類のされ方がどのように変わるのかを見てみたい。

次のサンプルコードでは、二次元の散布図を元に分類される様子を見るために教師データを二次元に絞っている。 具体的には、データセットの教師データの中から「Petal length」と「Petal width」だけを取り出して使っている。 その上で、それぞれを x 軸と y 軸にプロットした。 また、同時にどの点がどの品種として分類されているかを背景に色付けしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np

import matplotlib.pyplot as plt

from sklearn import datasets
from sklearn.tree import DecisionTreeClassifier


def main():
    dataset = datasets.load_iris()

    features = dataset.data
    targets = dataset.target

    # Petal length と Petal width だけを特徴量として使う (二次元で図示したいので)
    petal_features = features[:, 2:]

    # 決定木の最大深度は制限しない
    clf = DecisionTreeClassifier()
    clf.fit(petal_features, targets)

    # 教師データの取りうる範囲 +-1 を計算する
    train_x_min = petal_features[:, 0].min() - 1
    train_y_min = petal_features[:, 1].min() - 1
    train_x_max = petal_features[:, 0].max() + 1
    train_y_max = petal_features[:, 1].max() + 1

    # 教師データの取りうる範囲でメッシュ状の座標を作る
    grid_interval = 0.2
    xx, yy = np.meshgrid(
        np.arange(train_x_min, train_x_max, grid_interval),
        np.arange(train_y_min, train_y_max, grid_interval),
    )

    # メッシュの座標を学習したモデルで判定させる
    Z = clf.predict(np.c_[xx.ravel(), yy.ravel()])
    # 各点の判定結果をグラフに描画する
    plt.contourf(xx, yy, Z.reshape(xx.shape), cmap=plt.cm.bone)

    # 教師データもプロットしておく
    for c in np.unique(targets):
        plt.scatter(petal_features[targets == c, 0],
                    petal_features[targets == c, 1])

    feature_names = dataset.feature_names
    plt.xlabel(feature_names[2])
    plt.ylabel(feature_names[3])
    plt.show()


if __name__ == '__main__':
    main()

このモデルについては木構造の深さを制限していない。

上記を実行すると、次のような散布図が得られる。

f:id:momijiame:20170425221614p:plain

次は、上記のサンプルコードに木構造の深さの制限を入れてみよう。 とりあえず最大の深さを 3 までに制限してみる。 前述した通り、こうしても汎化性能自体には大きな影響はないようだった。 分類のされ方には変化が出てくるだろうか?

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np

import matplotlib.pyplot as plt

from sklearn import datasets
from sklearn.tree import DecisionTreeClassifier


def main():
    dataset = datasets.load_iris()

    features = dataset.data
    targets = dataset.target

    petal_features = features[:, 2:]

    # 決定木の深さを 3 までに制限する
    clf = DecisionTreeClassifier(max_depth=3)
    clf.fit(petal_features, targets)

    train_x_min = petal_features[:, 0].min() - 1
    train_y_min = petal_features[:, 1].min() - 1
    train_x_max = petal_features[:, 0].max() + 1
    train_y_max = petal_features[:, 1].max() + 1

    grid_interval = 0.2
    xx, yy = np.meshgrid(
        np.arange(train_x_min, train_x_max, grid_interval),
        np.arange(train_y_min, train_y_max, grid_interval),
    )
    Z = clf.predict(np.c_[xx.ravel(), yy.ravel()])
    plt.contourf(xx, yy, Z.reshape(xx.shape), cmap=plt.cm.bone)

    for c in np.unique(targets):
        plt.scatter(petal_features[targets == c, 0],
                    petal_features[targets == c, 1])

    feature_names = dataset.feature_names
    plt.xlabel(feature_names[2])
    plt.ylabel(feature_names[3])
    plt.show()


if __name__ == '__main__':
    main()

上記を実行すると、次のような散布図が得られる。

f:id:momijiame:20170425222036p:plain

先ほどの例と比べてみよう。 オレンジ色の品種が緑色の品種のところに食い込んでいるところが、このケースでは正しく認識されなくなっている。 モデルがより単純になったと考えられるだろう。

木構造を可視化してみる

scikit-learn には決定木の構造を DOT 言語で出力する機能がある。 その機能を使って木構造を可視化してみることにしよう。

まずは DOT 言語を処理するために Graphviz をインストールする。

$ brew install graphviz

そして、次のように学習させたモデルから DecisionTreeClassifier#export_graphviz() メソッドで DOT 言語で書かれたファイルを出力させる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from sklearn import datasets
from sklearn.tree import DecisionTreeClassifier
from sklearn import tree


def main():
    dataset = datasets.load_iris()

    features = dataset.data
    targets = dataset.target

    # Petal length と Petal width だけを特徴量として使う
    petal_features = features[:, 2:]

    # モデルを学習させる
    clf = DecisionTreeClassifier(max_depth=3)
    clf.fit(petal_features, targets)

    # DOT 言語のフォーマットで決定木の形を出力する
    with open('iris-dtree.dot', mode='w') as f:
        tree.export_graphviz(clf, out_file=f)


if __name__ == '__main__':
    main()

これを Graphviz で画像データに変換する。

$ dot -T png iris-dtree.dot -o iris-dtree.png

すると、次のようなグラフが得られる。

f:id:momijiame:20170425223006p:plain

グラフでは、葉ノード以外が分類をするための分岐になっている。 これは、ようするに木構造が深くなるに従ってだんだんと対象を絞り込んでいっていることを意味する。 例えば、最初の分岐では x 軸が 2.45 未満のところで分岐している。 そして、左側の葉ノードは青色の品種が全て集まっていることが分かる。

まとめ

  • 今回は決定木を scikit-learn で試してみた
  • 決定木はランダムフォレストのようなアルゴリズムのベースとなっている
  • 決定木のモデルの複雑さは木構造の深さで制御する
  • 木構造の深さが浅くなるほど分類のされ方も単純になった

はじめてのパターン認識

はじめてのパターン認識