CUBE SUGAR CONTAINER

技術系のこと書きます。

CentOS7 で Apache Hadoop の疑似分散モードを使ってみる

Apache Hadoop はビッグデータ処理基盤を構築するための超有名なオープンソースソフトウェア。 Google の発表した論文を元にして MapReduce アルゴリズムと Hadoop Distributed File System (HDFS) が実装されている。 この Hadoop/HDFS を中心として Apache HiveApache HBase などのミドルウェアが動作する一大エコシステムが形成されている。 今回は、そんな Apache Hadoop を CentOS7 で使ってみることにする。

尚、Apache Hadoop 関連の OSS をインストールするときは、いくつかの会社が出しているディストリビューションを利用することが多い。 例えば Cloudera CDH や Hortonworks HDP など。 しかし、今回はそれらのディストリビューションを使うのではなく Apache が出しているパッケージをそのまま使ってみる。 その方が内部でどんなことをやっているのかとか、どのように設定すれば良いのか理解が深まるだろうと考えたため。

ちなみに、Apache Hadoop は以下に示す三つの動作モードがある。

  • ローカルモード
    • 一つのホストで動作する
    • HDFS を使わない
  • 疑似分散モード
    • 一つのホストで動作する
    • HDFS を使う
  • 完全分散モード
    • 複数のホストで動作する
    • HDFS を使う

今回は、その中でも疑似分散モードを試すことにした。 実運用に乗せる環境なら確実に完全分散モードになるけど、手元での検証用なら疑似分散モードでも十分に使えそうなので。 ちなみに HDFS というのは複数のホストに細切れにしたファイルを配布した上で、それを各ホストで並列処理できる仕組みを備えたファイルシステムになっている。

疑似分散モード、というよりローカルモードを含むシングルホストで動かすときに参照するドキュメントは次の通り。

Apache Hadoop 2.8.0 – Hadoop: Setting up a Single Node Cluster.

もし完全分散モードなら、このドキュメントを参照する。

Apache Hadoop 2.8.0 – Hadoop Cluster Setup

使った環境は次の通り。

$ cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)
$ uname -r
3.10.0-514.16.1.el7.x86_64

Java をインストールする

Hadoop 関連のソフトウェアは、その多くが JVM 言語で書かれている。 なので、まずは Java をインストールしておこう。 一応、動作確認が取れているバージョンはこのページにあった。

HadoopJavaVersions - Hadoop Wiki

OpenJDK だと 1.7 しか今のところ確認されていないっぽい。 とはいえ、まあ動かないなんてことはないだろうという考えで 1.8 を入れてみることにする。 jps コマンドが使いたいのでインストールするのを JDK にする。

$ sudo yum -y install java-1.8.0-openjdk-devel

java コマンドが使えることが確認する。

$ java -version
openjdk version "1.8.0_131"
OpenJDK Runtime Environment (build 1.8.0_131-b11)
OpenJDK 64-Bit Server VM (build 25.131-b11, mixed mode)

Apache Hadoop をローカルモードで動かす

まずは Apache Hadoop をダウンロードした上でローカルモードで動かしてみる。 というより、何も設定しない状態ではローカルモードとして動作する。

ひとまず最低限必要なパッケージを一通り入れておく。

$ sudo yum -y install openssh-clients rsync wget

次に Apache Hadoop をダウンロードする。

$ wget http://ftp.riken.jp/net/apache/hadoop/common/hadoop-2.8.0/hadoop-2.8.0.tar.gz

Apache Hadoop のミラーサイトは次の通り。 好きなところを選んでダウンロードしよう。

www.apache.org

ダウンロードできたら解凍する。

$ tar xf hadoop-2.8.0.tar.gz

解凍してできたディレクトリに移動しよう。

$ cd hadoop-2.8.0/

Hadoop を動かすには環境変数として JAVA_HOME が設定されている必要がある。 そのため、先ほどインストールした OpenJDK のディレクトリを指定する。

$ export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk

あるいは、起動時の環境変数を etc/hadoop/hadoop-env.sh で指定することもできる。 設定を永続化したいときは、こちらを使えば良さそう。

$ sed -i -e '
  s:^export JAVA_HOME=.*$:export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk:
' etc/hadoop/hadoop-env.sh

これで hadoop コマンドがエラーにならず実行できるようになる。

$ bin/hadoop version
Hadoop 2.8.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 91f2b7a13d1e97be65db92ddabc627cc29ac0009
Compiled by jdu on 2017-03-17T04:12Z
Compiled with protoc 2.5.0
From source with checksum 60125541c2b3e266cbf3becc5bda666
This command was run using /home/vagrant/hadoop-2.8.0/share/hadoop/common/hadoop-common-2.8.0.jar

いくつかサンプルプログラムを動かしてみよう。 これには hadoop コマンドに付属のサンプルプログラムの入った jar ファイルを指定する。 例えば、これは円周率を計算するもの。

$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.0.jar pi 10 100000
Number of Maps  = 10
Samples per Map = 100000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
...(snip)...
Job Finished in 1.526 seconds
Estimated value of Pi is 3.14155200000000000000

それ以外には、ディレクトリ内のファイルから特定の文字列を検索する grep とか。 ローカルモードでは HDFS を使わないので OS のファイルシステムを直接使うことになる。

検索対象として input ディレクトリの中に Hadoop の設定ファイルをコピーしておこう。

$ mkdir input
$ cp etc/hadoop/*.xml input

input ディレクトリの中にあるファイルから dfs から始まる言葉を探してみる。

$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.0.jar grep input output 'dfs[a-z.]+'

結果は output ディレクトリに出力される。 この例では dfsadmin という言葉が一つだけ見つかった。

$ cat output/*
1  dfsadmin

同じ文字列を使って設定ファイルを egrep すると、たしかに一つだけ見つかる。

$ egrep 'dfs[a-z.]+' etc/hadoop/*.xml
etc/hadoop/hadoop-policy.xml:    dfsadmin and mradmin commands to refresh the security policy in-effect.

疑似分散モード

続いては疑似分散モードで動かしてみる。 疑似分散モードでは一つのホストで完結しているもののファイルシステムとして HDFS を使うことになる。 これによって、より実運用に近づけた状態で動作確認ができる。

SSH でログインできるようにする

Hadoop は SSH 経由でホストを操作するので、あらかじめログインできるようにしておかないといけない。 そこで、パスワードなしの公開鍵のペアを作ってローカルホストに仕込んでおく。

$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
$ ssh-copy-id -i ~/.ssh/id_rsa.pub localhost

ログインできることを確認しておこう。

$ ssh localhost

設定ファイル: etc/hadoop/core-site.xml

続いては設定ファイルを編集していく。

まずは etc/hadoop/core-site.xml に接続先ファイルシステム (HDFS) の設定を仕込む。

$ cat << 'EOF' > /tmp/core-site.xml.property
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/core-site.xml.property
  /^$/d
' etc/hadoop/core-site.xml

こんな感じ。 これでローカルホストの HDFS に接続するようになる。

$ tail -n 6 etc/hadoop/core-site.xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

設定ファイル: etc/hadoop/hdfs-site.xml

続いては etc/hadoop/hdfs-site.xml で HDFS の設定をする。 ここでは、ファイルのレプリケーション数を 1 に設定する。 HDFS では冗長性を担保するために、複数のホストで同じデータを重複して持つ。 これが HDFS のレプリケーション数と呼ばれるもので、デフォルトでは 3 になっている。 シングルホストで動く疑似分散モードでは、レプリケーション数は 1 にするしかない。

$ cat << 'EOF' > /tmp/hdfs-site.xml.property
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/hdfs-site.xml.property
  /^$/d
' etc/hadoop/hdfs-site.xml

こんな感じになる。

$ tail -n 6 etc/hadoop/hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

HDFS をセットアップする

ここまで設定できたら HDFS をフォーマットする。

$ bin/hdfs namenode -format

そして、必要なサービス (デーモン) を起動する。

$ sbin/start-dfs.sh

上記を実行したら、ログにエラーメッセージなどが出ていないことを確認しておこう。

$ grep -ir ERROR logs/*

また、次のように jps を確認してネームノード、データノード、セカンダリネームノードが動作していれば良い。 ネームノードは HDFS のメタデータなどを保持するマスターサーバのこと。 データノードは、実際のデータを格納しているスレーブサーバのようなもの。 セカンダリネームノードはその名の通りネームノードのセカンダリ。

$ jps
28720 NameNode
28849 DataNode
29012 SecondaryNameNode
29129 Jps

ここまでで hdfs コマンドが使えるようになる。 このコマンドを経由して HDFS の操作を行う。 使い方は一般的な POSIX のファイルシステムと基本的には似通っている。 ひとまず作業用にホームディレクトリを作ってみよう。

$ bin/hdfs dfs -mkdir -p /user/$(whoami)

すると、こんな感じでディレクトリが作られる。

$ bin/hdfs dfs -ls -R /
drwxr-xr-x   - vagrant supergroup          0 2017-05-15 21:16 /user
drwxr-xr-x   - vagrant supergroup          0 2017-05-15 21:16 /user/vagrant

ちなみに、HDFS で操作した実体はもちろん OS のファイルシステム上に存在している。 デフォルトでは /tmp/hadoop-<username> に作られる。

$ ls /tmp/hadoop-$(whoami)

先ほどと同じように grep のサンプルプログラムを動かすために input ディレクトリを作る。 その上でローカルのファイルシステムにある設定ファイルを HDFS のディレクトリ内にコピーしよう。

$ bin/hdfs dfs -mkdir input
$ bin/hdfs dfs -put etc/hadoop/*.xml input

上記ができたらサンプルプログラムを実行する。

$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.0.jar grep input output 'dfs[a-z.]+'

実行が終わったら結果が出力されているディレクトリを HDFS からローカルのファイルシステムにコピーする。

$ bin/hdfs dfs -get output output2

結果を見ると、次のようになった。

$ cat output2/*
1  dfsadmin
1  dfs.replication

二つに増えているのは疑似分散モードで動かすための設定に dfs から始まる文字列が含まれていたため。

$ egrep 'dfs[a-z.]+' etc/hadoop/*.xml
etc/hadoop/hadoop-policy.xml:    dfsadmin and mradmin commands to refresh the security policy in-effect.
etc/hadoop/hdfs-site.xml:    <name>dfs.replication</name>

ちなみに、後片付けとして HDFS のサービス (デーモン) を停止するには、次のようにする。

$ sbin/stop-dfs.sh

YARN/MRv2 で動かす

実は Hadoop には一口に MapReduce といっても複数の実装がある。 それが、先ほど動かした MRv1 と、これから動かす YARN/MRv2 というもの。 まだ違いがよく分かっていないんだけど、YARN/MRv2 は MRv1 に存在した問題を解消するために作られたらしい。 主に、スケーラビリティや汎用性が改善されているっぽい。

設定ファイル: etc/hadoop/mapred-site.xml

ここからは、また設定ファイルを編集していく。

まずは設定ファイル etc/hadoop/mapred-site.xml のテンプレートをコピーしてくる。

$ cp etc/hadoop/mapred-site.xml{.template,}

その上で MapReduce のフレームワークとして YARN が使われるようにする。

$ cat << 'EOF' > /tmp/mapred-site.xml.property
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/mapred-site.xml.property
  /^$/d
' etc/hadoop/mapred-site.xml

こんな感じ。

$ tail -n 6 etc/hadoop/mapred-site.xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

設定ファイル: etc/hadoop/yarn-site.xml

次は YARN 自体の設定を etc/hadoop/yarn-site.xml に書き込む。

$ cat << 'EOF' > /tmp/yarn-site.xml.property
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/yarn-site.xml.property
  /^$/d
' etc/hadoop/yarn-site.xml

こうなる。

$ tail -n 7 etc/hadoop/yarn-site.xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
<!-- Site specific YARN configuration properties -->
</configuration>

あとは YARN のサービス (デーモン) を起動する。

$ sbin/start-yarn.sh

先ほどと同じようにログにエラーメッセージが出ていないか確認しておく。

$ grep -ir ERROR logs/*

次のようにリソースマネージャとノードマネージャが起動していれば良い。

$ jps
28720 NameNode
28849 DataNode
29602 Jps
29012 SecondaryNameNode
29194 ResourceManager
29309 NodeManager

YARN/MRv2 でサンプルプログラムを動かしてみる

YARN/MRv2 は MapReduce の実行フレームワークが異なるだけで基本的には MRv1 と同じプログラムが使えるらしい。

先ほどと同じようにサンプルプログラムを動かしてみよう。 一旦、さっき使ったディレクトリを削除して、改めて設定ファイルをコピーしておく。 今度は XML 以外のファイルも対象に含めてみる。

$ bin/hdfs dfs -rm -r -f input output
$ bin/hdfs dfs -put etc/hadoop input

grep のサンプルプログラムを実行する。 理由は分からないけど、今回はやたらと時間がかかった。 ただファイルが増えただけが理由なのか YARN/MRv2 を使っているせいなのか。

$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.0.jar grep input output 'dfs[a-z.]+'

終わったら実行結果を HDFS からローカルのファイルシステムにコピーする。

$ bin/hdfs dfs -get output output3

次のように dfs から始まる文字列が見つかった。

$ cat output3/*
6  dfs.audit.logger
4  dfs.class
3  dfs.logger
3  dfs.server.namenode.
2  dfs.audit.log.maxbackupindex
2  dfs.period
2  dfs.audit.log.maxfilesize
1  dfs.log
1  dfs.file
1  dfs.servers
1  dfsadmin
1  dfsmetrics.log
1  dfs.replication

後片付けするときは次のようにして YARN のサービス (デーモン) を停止する。

$ sbin/stop-yarn.sh

まとめ

今回は素の Apache Hadoop を CentOS7 の上で使ってみた。 動作モードとしてはシングルホスト上で HDFS と共に動作する疑似分散モードを選んだ。 実運用するなら完全分散モードになるけど、手元での動作検証用なら有用といえそうだ。 ビッグデータを扱う上で Apache Hadoop 関連のプロダクトは避けて通れないので、今後も知見を集めていきたい。

Apache Hadoop の概念や動作原理的な部分は、この本が分かりやすかった。 ただし、この本では発売時期的に MRv1 しか扱っておらず YARN/MRv2 については書かれていない。

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Python: データパイプライン構築用フレームワーク Luigi を使ってみる

最近になって、バッチ処理においてデータパイプラインを組むためのフレームワークとして Luigi というものがあることを知った。 これは、Spotify という音楽のストリーミングサービスを提供する会社が作ったものらしい。 似たような OSS としては他にも Apache Airflow がある。 こちらは民宿サービスを提供する Airbnb が作ったものだけど、最近 Apache に寄贈されてインキュベータープロジェクトとなった。

Luigi の特徴としては、バッチ処理に特化している点が挙げられる。 ただし、定期的にバッチ処理を実行するような機能はない。 そこは、代わりに cron や systemd timer を使ってやる必要がある。 また、本体もそうだけどデータパイプラインについても Python を使って書く必要がある。

今回は、そんな Luigi を一通り触ってみることにする。 使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195
$ python --version
Python 3.5.3

インストールする

インストールは Python のパッケージマネージャの pip を使ってさくっとできる。

$ pip install luigi
$ pip list --format=columns | grep luigi
luigi           2.6.1

ハローワールド

まず、Luigi における最も重要な概念としてタスクというものを知る必要がある。 タスクというのは、ユーザが Luigi にやらせたい何らかの仕事を細分化した最小単位となる。 そして、タスクを定義するには Task というクラスを継承したサブクラスを作る。

次のサンプルコードでは、最も単純なタスクを定義している。 このタスクでは run() メソッドをオーバーライドしてメッセージをプリントするようになっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


# Luigi で実行したい仕事は Task を継承したクラスにする
class Greet(luigi.Task):

    def run(self):
        """run() メソッドで具体的な処理を実行する"""
        print('Hello, World!')


def main():
    luigi.run()


if __name__ == '__main__':
    main()

上記のタスクを実行するには、次のようにする。 第二引数に渡している Greet が、実行したいタスク (のクラス名) を指している。 --local-scheduler オプションは、タスクのスケジューラをローカルモードに指定している。 スケジューラにはローカルとセントラルの二種類があるんだけど、詳しくは後ほど説明する。 とりあえず開発用途で使うならローカル、と覚えておけば良いかな。

$ python helloworld.py Greet --local-scheduler

実際に実行してみると、次のような出力が得られる。 実行サマリーとして Greet タスクが成功していることや、ログの中にメッセージが出力されていることが見て取れる。

$ python helloworld.py Greet --local-scheduler
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7082] Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) running   Greet()
Hello, World!
INFO: [pid 7082] Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

上記は、代わりに次のようにすることもできる。 このやり方では、まず python コマンドで luigi モジュールを実行して、そこ経由でタスクを実行している。

$ python -m luigi --module helloworld Greet --local-scheduler

このやり方の利点としては、パッケージ化されたインストール済みのタスクを実行できる点が挙げられる。 先ほどはカレントワーキングディレクトリ (CWD) にある Python ファイル (モジュール) を直接指定して実行していた。 それに対し、こちらは Python のパスにもとづいてモジュールが実行されるので、インストール済みのモジュールなら CWD の場所に関わらず実行できる。 そして、python コマンドのパスには CWD も含まれるので、先ほどと同じように実行できるというわけ。

もし、Python のモジュールとかがよく分からないというときは必要に応じて以下を参照してもらえると。 ようするに言いたいことは Python においてモジュールって呼ばれてるのは単なる Python ファイルのことだよ、という点。

blog.amedama.jp

また、もう一つのやり方として luigi コマンドを使うこともできる。 この場合も、実行するモジュールは --module オプションで指定する。 ただし、このやり方だと Python のパスに CWD が含まれない。 そのため CWD にあるモジュールを実行したいときは別途 PYTHONPATH 環境変数を指定する必要がある。

$ PYTHONPATH=. luigi --module helloworld Greet --local-scheduler

実行するタスクをモジュール内で指定する

とはいえ、毎回コマンドラインで実行するモジュールを指定するのも面倒だと思う。 そんなときは、次のようにして luigi.run() に実行するときのパラメータを指定してやると良さそう。 開発や検証用途なら、こうしておくと楽ちん。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def run(self):
        print('Hello, World!')


def main():
    # 起動したいタスクを指定して Luigi を実行する
    luigi.run(main_task_cls=Greet, local_scheduler=True)
    # あるいは
    # task = Greet()
    # luigi.build([task], local_scheduler=True)


if __name__ == '__main__':
    main()

上記のようにしてあればファイルを指定するだけで実行できるようになる。

$ python easyrun.py 
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7165] Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) running   Greet()
Hello, World!
INFO: [pid 7165] Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

ターゲットを指定する

Luigi において、もう一つ重要な概念がターゲットというもの。 これは、タスクを実行した結果を永続化する先を表している。 例えば、永続化する先はローカルディスクだったり Amazon S3 だったり HDFS だったりする。 タスクをチェーンしていく場合、このターゲットを介してデータをやり取りする。 つまり、一つ前のタスクが出力したターゲットの内容が、次のタスクの入力になるというわけ。

ちなみに、実は Luigi ではターゲットの指定がほとんど必須といってよいものになっている。 これまでの例では指定してなかったけど、それは単にすごくシンプルだったので何とかなっていたに過ぎない。 実際のところ、実行するときに次のような警告が出ていたのはターゲットを指定していなかったのが原因になっている。

/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()

次のサンプルコードでは、ターゲットとしてローカルディスクを指定している。 永続化する先としてローカルディスクを指定するときは LocalTarget というターゲットを使う。 ターゲットを指定するにはタスクのクラスに output() というメソッドをオーバーライドする。 そして、タスクの本体ともいえる run() メソッドでは output() メソッドで得られるターゲットに対して実行結果を書き込む。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        # Task の実行結果を保存する先を Target で指定する
        # LocalTarget はローカルディスクにファイルとして残す
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        # output() メソッドで保存先の Target を取得する
        out = self.output()
        # ファイルを開く
        with out.open('w') as f:
            # 実行結果を書き込む
            f.write('Hello, World!\n')


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

それでは、上記を実行してみよう。 今回は、先ほどは表示されていた警告が出ていない点に注目してほしい。

$ python output.py 
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7230] Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) running   Greet()
INFO: [pid 7230] Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

そして、実行すると greeting.txt というファイルがディレクトリに作られている。

$ cat greeting.txt
Hello, World!

このように Luigi ではタスクの実行結果を永続化する。

テスト用途としてメモリをターゲットにする

先ほどの例ではターゲットとしてローカルディスクを使った。 ちなみに、テスト用途としてならターゲットをメモリにすることもできる。

次のサンプルコードでは MockTarget を使うことで実行結果を保存する先をメモリにしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class Greet(luigi.Task):

    def output(self):
        # MockTarget はオンメモリにしか実行結果が残らない
        # 内部実装には io.Bytes が使われている
        # あくまでユニットテスト用途
        return MockTarget(self.__class__.__name__)

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

念のため、先ほど出力されたファイルを削除した上で上記を実行してみよう。

$ rm greeting.txt
$ python mock.py 
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7256] Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) running   Greet()
INFO: [pid 7256] Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

当然だけど、今度はファイルができない。

$ cat greeting.txt
cat: greeting.txt: No such file or directory

複数のタスクをチェーンする

これまでの例では一つのタスクを実行するだけだった。 現実世界の仕事では、タスクが一つだけということは滅多にないはず。 もちろん、色々なことをする巨大な一つのタスクを書くことも考えられるけど、それだと Luigi を使う魅力は半減してしまう。 なるべく、それ以上は細分化できないレベルまで仕事を小さくして、それをタスクに落とし込む。

次のサンプルコードでは Greet というタスクと Repeat というタスクをチェーンしている。 Repeat の実行には、まず Greet の実行が必要になる。 このような依存関係を表現するには、タスクのクラスに requires() というメソッドをオーバーライドする。 サンプルコードの内容としては Greet で出力された内容を Repeat で複数繰り返して出力する、というもの。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Repeat(luigi.Task):

    def requires(self):
        """Task の実行に必要な別の Task を指定する"""
        return Greet()

    def output(self):
        return luigi.LocalTarget('repeating.txt')

    def run(self):
        # requires() で指定した Task の実行結果 (Target) は input() メソッドで得られる
        input_ = self.input()
        output = self.output()

        with input_.open('r') as r, output.open('w') as w:
            # Greet が出力した内容を読み込む
            lines = r.readlines()

            # Greet の出力を 3 回繰り返し書き込む
            for _ in range(3):
                w.writelines(lines)


def main():
    luigi.run(main_task_cls=Repeat, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python requires.py 
DEBUG: Checking if Repeat() is complete
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Repeat__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) running   Greet()
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) running   Repeat()
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) done      Repeat()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Repeat__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 Greet()
    - 1 Repeat()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると、次のようにして実行結果のファイルが作られる。

$ cat greeting.txt 
Hello, World!
$ cat repeating.txt 
Hello, World!
Hello, World!
Hello, World!

ちなみに requires() メソッドをオーバーライドせずに依存関係を表現することもできるにはできる。 それは、次のように run() メソッドの中で直接タスクをインスタンス化して yield でターゲットを取得する方法。 これは、Dynamic dependencies と呼ばれている。 とはいえ、タスクの依存関係が分かりにくくなるので、使うのはなるべく避けた方が良い気がする。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Repeat(luigi.Task):

    def output(self):
        return luigi.LocalTarget('repeating.txt')

    def run(self):
        # requires() せずに直接 Task から yield するやり方もある
        input_ = yield Greet()
        output = self.output()

        with input_.open('r') as r, output.open('w') as w:
            lines = r.readlines()

            for _ in range(3):
                w.writelines(lines)


def main():
    luigi.run(main_task_cls=Repeat, local_scheduler=True)


if __name__ == '__main__':
    main()

また、依存するタスクは一つとは限らない。 そんなときは requires() メソッドの中でリストの形で依存するタスクを列挙する。 あるいは yield で一つずつ列挙していっても構わない。 ただし yield を使うやり方だと、後から依存タスクの実行を並列化したいときも、必ず直列に実行されるようになるらしい。 逆に言えば直列に実行することを担保したいときは yield を使うのが良さそう。 また、複数のタスクを集約して実行するためだけのタスクについては WrapperTask を継承すると良いらしい。

次のサンプルコードでは GreetEat そして Sleep というタスクを実行するためのタスクとして RequiresOnly を定義している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class OnMemoryTask(luigi.Task):

    def output(self):
        return MockTarget(self.__class__.__name__)


class Greet(OnMemoryTask):

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Eat(OnMemoryTask):

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Mog, Mog\n')


class Sleep(OnMemoryTask):

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Zzz...\n')


class RequiresOnly(luigi.WrapperTask):

    def requires(self):
        # 依存している Task は複数書ける
        return [Greet(), Eat(), Sleep()]
        # あるいは
        # yield Greet()
        # yield Eat()
        # yield Sleep()
        # としても良い


def main():
    luigi.run(main_task_cls=RequiresOnly, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。 タスクの並列度を上げるには --workers オプションを指定する。

$ python reqonly.py --workers 3
DEBUG: Checking if RequiresOnly() is complete
DEBUG: Checking if Greet() is complete
DEBUG: Checking if Eat() is complete
DEBUG: Checking if Sleep() is complete
INFO: Informed scheduler that task   RequiresOnly__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Sleep__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Eat__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 3 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7362] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   Eat()
DEBUG: 3 running tasks, waiting for next task to finish
INFO: [pid 7362] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      Eat()
INFO: [pid 7363] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   Greet()
INFO: [pid 7363] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      Greet()
INFO: Informed scheduler that task   Eat__99914b932b   has status   DONE
INFO: [pid 7364] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   Sleep()
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: Greet__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
DEBUG: Sleep__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: Sleep__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: [pid 7364] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      Sleep()
INFO: Informed scheduler that task   Sleep__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: RequiresOnly__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: [pid 7365] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   RequiresOnly()
INFO: [pid 7365] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      RequiresOnly()
INFO: Informed scheduler that task   RequiresOnly__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 ran successfully:
    - 1 Eat()
    - 1 Greet()
    - 1 RequiresOnly()
    - 1 Sleep()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

タスクにパラメータを受け取る

タスクが動作するときに色々なパラメータを受け取りたい、という場面も多いはず。 そんなときは *Parameter を使えば良い。 次のサンプルコードでは、前述した Repeat タスクで繰り返しの数をパラメータ化している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Repeat(luigi.Task):
    # Task の実行に必要な引数を Parameter として受け取る
    repeat_n = luigi.IntParameter(default=3)

    def requires(self):
        return Greet()

    def output(self):
        return luigi.LocalTarget('repeating.txt')

    def run(self):
        input_ = self.input()
        output = self.output()

        with input_.open('r') as r, output.open('w') as w:
            lines = r.readlines()

            # Parameter は Task のアトリビュートとして使える
            for _ in range(self.repeat_n):
                w.writelines(lines)


def main():
    luigi.run(main_task_cls=Repeat, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行する前に、先ほど生成されたファイルを削除しておく。 これは、実行結果のファイルがあるときは実行がスキップされるため。 この動作は何のターゲットを使っても基本的にそうなっている。

$ rm repeating.txt

実行してみよう。 パラメータはコマンドラインから渡す。 例えば、今回の例では --repeat-n オプションになる。

$ python params.py --repeat-n=5
DEBUG: Checking if Repeat(repeat_n=5) is complete
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Repeat_5_96577e160e   has status   PENDING
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7436] Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) running   Repeat(repeat_n=5)
INFO: [pid 7436] Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) done      Repeat(repeat_n=5)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Repeat_5_96577e160e   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 present dependencies were encountered:
    - 1 Greet()
* 1 ran successfully:
    - 1 Repeat(repeat_n=5)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

サマリーから Greet タスクはファイルがあるので実行されていないことが分かる。

Repeat タスクのターゲットは、パラメータに応じて再実行された。

$ cat repeating.txt 
Hello, World!
Hello, World!
Hello, World!
Hello, World!
Hello, World!

このように Luigi は実行結果があるときは実行をスキップするようになっている。 そのため、途中でタスクが失敗したときも原因を取り除いて再実行したとき最小限の処理でやり直すことができる。

特定期間のバッチ処理を実行する

例えば、あるバッチ処理が毎日決まった時間に実行されているとしよう。 それが、何らかの原因で失敗して、その日のうちに解決できなかったとする。 こんなときは、基本的に失敗した日の内容も後からリカバーしなきゃいけない。 ただ、当日以外のタスクを実行するというのは、そのように考えられて作っていないと意外と難しいもの。 Luigi だと、そういったユースケースもカバーしやすいように作られている。

例えば、次のようにして処理対象の日付を受け取って実行するようなタスクがあるとする。 こんな風に、処理対象の日付を指定するように作っておくのはバッチ処理でよく使われるパターンらしい。 また、生成されるターゲットのファイルに日付を含むようにしておくのもポイント。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class DailyGreet(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget('daily-target-{}.txt'.format(str(self.date)))

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


def main():
    luigi.run(main_task_cls=DailyGreet, local_scheduler=True)


if __name__ == '__main__':
    main()

ひとまず、通常通り実行してみることにしよう。

$ python daily.py --date=2017-5-13
DEBUG: Checking if DailyGreet(date=2017-05-13) is complete
INFO: Informed scheduler that task   DailyGreet_2017_05_13_284b40e6ab   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7481] Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) running   DailyGreet(date=2017-05-13)
INFO: [pid 7481] Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) done      DailyGreet(date=2017-05-13)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   DailyGreet_2017_05_13_284b40e6ab   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 DailyGreet(date=2017-05-13)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると、実行結果のファイルが日付付きで作られる。

$ cat daily-target-2017-05-13.txt 
Hello, World!

Luigi では、このように日付を含むタスクを期間指定で一気に片付けるような方法が用意されている。 具体的には、次のようにして daily モジュールの RangeDailyBase タスクを指定した上で、--of オプションで実行したいタスクを指定する。 その上で、実行する日付を --start--stop オプションで指定する。

$ python -m luigi --module daily RangeDailyBase --of DailyGreet --start 2017-05-01 --stop 2017-05-12 --local-scheduler
...(省略)...
===== Luigi Execution Summary =====

Scheduled 12 tasks of which:
* 12 ran successfully:
    - 11 DailyGreet(date=2017-05-01...2017-05-11)
    - 1 RangeDailyBase(...)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると、範囲内の日付でタスクが一気に実行される。

$ ls daily-target-2017-05-*
daily-target-2017-05-01.txt daily-target-2017-05-07.txt
daily-target-2017-05-02.txt daily-target-2017-05-08.txt
daily-target-2017-05-03.txt daily-target-2017-05-09.txt
daily-target-2017-05-04.txt daily-target-2017-05-10.txt
daily-target-2017-05-05.txt daily-target-2017-05-11.txt
daily-target-2017-05-06.txt daily-target-2017-05-13.txt

例えば、数日前から今日の日付までの範囲を指定して、先ほどの内容を cron などで指定しておくとする。 そうすれば、実行結果のあるものについてはスキップされることから、まだ完了していないものだけを実行できる。 もし一過性の問題で失敗していたとしても、翌日のバッチ処理などで自動的にリカバーされるというわけ。

組み込みのタスクを活用する

Luigi には、色々な外部コンポーネントやサービス、ライブラリと連携するための組み込みタスクが用意されている。 それらは contrib パッケージの中にあって、ざっと見ただけでも例えば Hadoop や Kubernetes などがある。

github.com

今回は一例として Python のオブジェクト・リレーショナルマッパーの一つである SQLAlchemy との連携を試してみる。

まずは SQLAlchemy をインストールしておく。

$ pip install sqlalchemy

次のサンプルコードでは UsersTask が出力する内容を SQLite3 のデータベースに保存している。 luigi.contrib.sqla.CopyToTable を継承したタスクは、依存するタスクからタブ区切りのターゲットを受け取って、内容を各カラムに保存していく。 今回はユーザ情報を模したテーブルを作るようにしてみた。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget
from luigi.contrib import sqla

from sqlalchemy import String
from sqlalchemy import Integer


class UsersTask(luigi.Task):

    def output(self):
        return MockTarget(self.__class__.__name__)

    def run(self):
        out = self.output()
        with out.open('w') as f:
            # 実行結果をタブ区切りで書き込んでいく
            f.write('Alice\t24\n')
            f.write('Bob\t30\n')
            f.write('Carol\t18\n')


# 実行結果を SQLAlchemy 経由で RDB に保存するための Task
class SQLATask(sqla.CopyToTable):
    # SQLAlchemy のテーブル定義
    columns = [
        (['name', String(64)], {'primary_key': True}),
        (['age', Integer()], {})
    ]
    # データベースへの接続 URL
    connection_string = 'sqlite:///users.db'
    # 保存するのに使うテーブル名
    table = 'users'

    def requires(self):
        # 実行結果がタブ区切りになっていれば、あとは依存先に指定するだけ
        return UsersTask()


def main():
    luigi.run(main_task_cls=SQLATask, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみる。

$ python sqla.py        
DEBUG: Checking if SQLATask() is complete
DEBUG: Checking if UsersTask() is complete
INFO: Informed scheduler that task   SQLATask__99914b932b   has status   PENDING
INFO: Informed scheduler that task   UsersTask__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) running   UsersTask()
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) done      UsersTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   UsersTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) running   SQLATask()
INFO: Running task copy to table for update id SQLATask__99914b932b for table users
INFO: Finished inserting 0 rows into SQLAlchemy target
INFO: Finished inserting rows into SQLAlchemy target
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) done      SQLATask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   SQLATask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 SQLATask()
    - 1 UsersTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると SQLite3 のデータベースができる。

$ file users.db 
users.db: SQLite 3.x database

中身を見るとユーザ情報が保存されている。

$ sqlite3 users.db "SELECT * FROM users"
Alice|24
Bob|30
Carol|18

セントラルスケジューラを使う

前述した通り Luigi にはローカルスケジューラとセントラルスケジューラのモードがある。 これまでの例はローカルスケジューラを使うものだった。 次はセントラルスケジューラを使ってみることにする。

まず、Luigi のセントラルスケジューラには複数のジョブが同時に実行されないよう調整する機能が備わっている。 そして、タスクの状態などを確認するための WebUI なども付属する。 ただし、スケジューラ自体にはタスクを実行する機能はなくて、あくまで上記の二つの機能だけが提供されている。 つまり、タスクの実行自体はセントラルスケジューラに接続したクライアント (ワーカー) の仕事になる。 また、繰り返しになるけど定期・繰り返し実行などの機能はないので cron や systemd timer を使うことになる。

セントラルスケジューラは luigid というコマンドで起動する。

$ luigid

これで TCP:8082 ポートで WebUI が立ち上がる。

$ open http://localhost:8082

試しにセントラルスケジューラを使った状態でタスクを実行してみよう。 WebUI に成功 (done) なタスクが増えるはず。

$ python helloworld.py Greet

その他、セントラルスケジューラを動作させるときの設定などについては、このページを参照する。

Using the Central Scheduler — Luigi 2.6.1 documentation

成功・失敗時のコールバックを使う

タスクが成功したり失敗したときに何かしたいというニーズはあると思う。 例えば失敗したとき Slack とかのチャットに通知を送るとか。

次のサンプルコードでは、タスクに on_success() メソッドをオーバーライドしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def run(self):
        print('Hello, World!')

    def on_success(self):
        """Task が成功したときのコールバック"""
        print('SUCCESS!')


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。 成功時のコールバックが呼び出されていることが分かる。

$ python event.py 
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8126] Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) running   Greet()
Hello, World!
SUCCESS!
INFO: [pid 8126] Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

より広範囲に、どのタスクの成功や失敗でも呼び出されるようにしたいときは次のようにする。 @luigi.Task.event_handler デコレータでコールバックを修飾した上で、呼び出すタイミングを引数で指定する。 今回は luigi.Event.SUCCESS を指定しているのでタスクが成功したときに呼び出される。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def run(self):
        print('Hello, World!')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def success_handler(task):
    """Task が成功したときのコールバック (汎用)"""
    print('SUCCESS: {}'.format(task))


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

こちらも実行してみよう。 コールバックが呼び出されていることが分かる。

$ python event2.py 
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8136] Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) running   Greet()
Hello, World!
INFO: [pid 8136] Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) done      Greet()
SUCCESS: Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

まとめ

今回は Python でバッチ処理を作るときデータパイプラインを構築するためのフレームワークである Luigi を使ってみた。 触ってみた感触としては部分的にクセはあるものの、なかなかシンプルな作りで用途にハマれば使いやすそうだ。

Mac OS X で Apache Spark を触ってみる

最近 Apache Spark について耳にすることが多い。 Apache Spark は、ビッグデータ処理における並列分散処理基盤を提供する OSS の一つ。 似たような用途としては Apache Hadoop も有名だけど、それよりも最大で 100 倍ほど高速に動作するんだとか。 高速に動作する理由としては、各ノードのメモリに乗り切るサイズのデータならディスクを介さずに扱える点が大きいらしい。

今回は、そんな Apache Spark を Mac OS X で軽く触ってみることにする。 本来であれば、用途的には複数のノードを用意して並列分散処理をさせるところだけど使うのは一つのノードだけ。 また Apache Spark を操作するには Java, Scala, Python のインターフェースがある。 その中でも、今回は Python のインターフェース (PySpark) を使ってみることにした。

環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195

インストール

まずは Apache Spark の動作に JRE が必要なのでインストールしておく。 インストールには Homebrew Cask を使うと楽できる。

$ brew cask install java

Apache Spark のインストールは Homebrew を使ってさくっといける。

$ brew install apache-spark

インタラクティブシェルから触ってみる

Apache Spark を Python から扱うには pyspark というコマンドを使う。 このコマンドを起動すると、Python から Apache Spark を扱う上で必要なパッケージなどが自動的にインポートされる。 それ以外については、特に普段使っている REPL と違いはないようだ。

$ pyspark
Python 2.7.10 (default, Feb  6 2017, 23:53:20) 
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/04 18:42:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/04 18:43:13 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/05/04 18:43:13 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/05/04 18:43:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.10 (default, Feb  6 2017 23:53:20)
SparkSession available as 'spark'.
>>> 

サイトパッケージの場所を調べてもシステムの Python の場所になっているし。

>>> from pip import locations
>>> locations.user_site
'/Users/amedama/Library/Python/2.7/lib/python/site-packages'

これは、どうやらデフォルトの python コマンドで起動されるものが使われているだけっぽい? 試しに virtualenv を使ってデフォルトが Python 3.5 になるようにしてみる。

$ python --version
Python 3.5.3

この状態で pyspark コマンドを実行すると、ちゃんと Python 3.5 が使われるようになった。 予想は当たっていたようだ。

$ pyspark    
Python 3.5.3 (default, Feb 26 2017, 01:47:55) 
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/04 19:29:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/04 19:30:00 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 3.5.3 (default, Feb 26 2017 01:47:55)
SparkSession available as 'spark'.
>>> 

インタラクティブシェルを終了するときは通常の Python の REPL と同じように Ctrl-D とか exit() 関数とかで。

>>> exit()

テキストファイルを処理してみる

それでは、次は実際に Apache Spark でテキストファイルを扱ってみることにしよう。 題材は Apache Spark の README ファイルにする。

$ brew install wget
$ wget https://raw.githubusercontent.com/apache/spark/master/README.md

あと、なんか操作していると随所で psutil 入れた方が良いよっていう警告が出るので入れておく。

$ pip install psutil

インタラクティブシェルを起動しよう。

$ pyspark

PySpark のインタラクティブシェルでは sc という SparkContext のインスタンスが処理の取っ掛かりになるみたい。 これは、あらかじめインタラクティブシェルを起動した時点で用意されている。

>>> sc
<pyspark.context.SparkContext object at 0x1068cf810>

まずは SparkContext#textFile() メソッドでテキストファイルを読み込む。

>>> textfile = sc.textFile("README.md")

これで得られるインスタンスは Resilient Distributed Dataset (RDD) と呼ばれる形式になっている。 基本的に Apache Spark では、この RDD という単位でデータを扱うことになる。

>>> textfile
README.md MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

この RDD には、あらかじめ色々な API が用意されている。 例えば RDD に含まれるデータの数は count() というメソッドで得られる。

>>> textfile.count()
103

他にも、データセットの最初の要素を取り出す first() だとか。

>>> textfile.first()
u'# Apache Spark'

データセットに含まれる要素全てを得るには collect() などを使う。

>>> textfile.collect()
[u'# Apache Spark', u'', u'Spark is a fast and general cluster computing system for Big Data. It provides', u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', u'supports general computation graphs for data analysis. It also supports a', u'rich set of higher-level tools including Spark SQL for SQL and DataFrames,', u'MLlib for machine learning, GraphX for graph processing,', u'and Spark Streaming for stream processing.', u'', u'<http://spark.apache.org/>', u'', u'', u'## Online Documentation', u'', u'You can find the latest Spark documentation, including a programming', u'guide, on the [project web page](http://spark.apache.org/documentation.html).', u'This README file only contains basic setup instructions.', u'', u'## Building Spark', u'', u'Spark is built using [Apache Maven](http://maven.apache.org/).', u'To build Spark and its example programs, run:', u'', u'    build/mvn -DskipTests clean package', u'', u'(You do not need to do this if you downloaded a pre-built package.)', u'', u'You can build Spark using more than one thread by using the -T option with Maven, see ["Parallel builds in Maven 3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).', u'More detailed documentation is available from the project site, at', u'["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).', u'', u'For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](http://spark.apache.org/developer-tools.html).', u'', u'## Interactive Scala Shell', u'', u'The easiest way to start using Spark is through the Scala shell:', u'', u'    ./bin/spark-shell', u'', u'Try the following command, which should return 1000:', u'', u'    scala> sc.parallelize(1 to 1000).count()', u'', u'## Interactive Python Shell', u'', u'Alternatively, if you prefer Python, you can use the Python shell:', u'', u'    ./bin/pyspark', u'', u'And run the following command, which should also return 1000:', u'', u'    >>> sc.parallelize(range(1000)).count()', u'', u'## Example Programs', u'', u'Spark also comes with several sample programs in the `examples` directory.', u'To run one of them, use `./bin/run-example <class> [params]`. For example:', u'', u'    ./bin/run-example SparkPi', u'', u'will run the Pi example locally.', u'', u'You can set the MASTER environment variable when running examples to submit', u'examples to a cluster. This can be a mesos:// or spark:// URL,', u'"yarn" to run on YARN, and "local" to run', u'locally with one thread, or "local[N]" to run locally with N threads. You', u'can also use an abbreviated class name if the class is in the `examples`', u'package. For instance:', u'', u'    MASTER=spark://host:7077 ./bin/run-example SparkPi', u'', u'Many of the example programs print usage help if no params are given.', u'', u'## Running Tests', u'', u'Testing first requires [building Spark](#building-spark). Once Spark is built, tests', u'can be run using:', u'', u'    ./dev/run-tests', u'', u'Please see the guidance on how to', u'[run tests for a module, or individual tests](http://spark.apache.org/developer-tools.html#individual-tests).', u'', u'## A Note About Hadoop Versions', u'', u'Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported', u'storage systems. Because the protocols have changed in different versions of', u'Hadoop, you must build Spark against the same version that your cluster runs.', u'', u'Please refer to the build documentation at', u'["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)', u'for detailed guidance on building for a particular distribution of Hadoop, including', u'building for particular Hive and Hive Thriftserver distributions.', u'', u'## Configuration', u'', u'Please refer to the [Configuration Guide](http://spark.apache.org/docs/latest/configuration.html)', u'in the online documentation for an overview on how to configure Spark.', u'', u'## Contributing', u'', u'Please review the [Contribution to Spark guide](http://spark.apache.org/contributing.html)', u'for information on how to get started contributing to the project.']

RDD に用意されている全ての API について知りたいときは、以下の公式ドキュメントを参照する感じで。

spark.apache.org

例として「Spark」という文字列が含まれる行だけを取り出してみよう。 このような要素には filter() メソッドが使える。

>>> filtered_rdd = textfile.filter(lambda line: u'Spark' in line)

処理の結果も、また RDD で得られる。 見たところ 20 行あるようだ。

>>> filtered_rdd.count()
20

要素を見ると、たしかにどの行にも「Spark」の文字列が含まれている。

>>> filtered_rdd.collect()
[u'# Apache Spark', u'Spark is a fast and general cluster computing system for Big Data. It provides', u'rich set of higher-level tools including Spark SQL for SQL and DataFrames,', u'and Spark Streaming for stream processing.', u'You can find the latest Spark documentation, including a programming', u'## Building Spark', u'Spark is built using [Apache Maven](http://maven.apache.org/).', u'To build Spark and its example programs, run:', u'You can build Spark using more than one thread by using the -T option with Maven, see ["Parallel builds in Maven 3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).', u'["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).', u'For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](http://spark.apache.org/developer-tools.html).', u'The easiest way to start using Spark is through the Scala shell:', u'Spark also comes with several sample programs in the `examples` directory.', u'    ./bin/run-example SparkPi', u'    MASTER=spark://host:7077 ./bin/run-example SparkPi', u'Testing first requires [building Spark](#building-spark). Once Spark is built, tests', u'Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported', u'Hadoop, you must build Spark against the same version that your cluster runs.', u'in the online documentation for an overview on how to configure Spark.', u'Please review the [Contribution to Spark guide](http://spark.apache.org/contributing.html)']

次は、ビッグデータ処理のハローワールドとも言えるワードカウントを試してみよう。 まずは、テキストの内容をスペースで区切る。 これは MapReduce アルゴリズムでいう Map に相当する。

>>> words = textfile.flatMap(lambda line: line.split())
>>> words.count()
495

まあ、このままだと空白で区切っただけなので単語ではないものも含まれちゃうけど。

>>> words.first()
u'#'

先ほどテキストを区切るのに flatMap() を使ったのは、ただの map() だとリストが複数含まれるデータセットになってしまうため。 リストをさらに開いた状態 (flat) にしておかないと扱いづらい。

>>> textfile.map(lambda line: line.split()).first()
[u'#', u'Apache', u'Spark']

続いて、各単語に対して出現頻度をカウントするために数字を添えてタプルにする。 これも Map 処理だね。

>>> words_tuple = words.map(lambda word: (word, 1))
>>> words_tuple.first()
(u'#', 1)

あとはキー (各単語) ごとに出現頻度をカウントする。 これが MapReduce でいう Reduce に相当する。 ここの処理では、タプル同士を足し算すると第二要素のカウンタが増えることになる。 この動作は Python の流儀からすると、ちょっと直感に反するね。

>>> words_count = words_tuple.reduceByKey(lambda a, b: a + b)

これで、それぞれの単語の出現頻度がカウントできた。

>>> words_count.collect()[:10]
[('guide,', 1), ('APIs', 1), ('optimized', 1), ('name', 1), ('storage', 1), ('developing', 1), ('It', 2), ('package.', 1), ('particular', 2), ('development', 1)]

一番登場する頻度が多いのはどれなのかを調べるために、カウンタの値にもとづいて降順ソートする。 どうやら「the」が一番多いらしい。

>>> words_count_sorted = words_count.sortBy(lambda t: t[1], False)
>>> words_count_sorted.collect()[:10]
[('the', 24), ('to', 17), ('Spark', 16), ('for', 12), ('and', 9), ('##', 9), ('a', 8), ('run', 7), ('on', 7), ('can', 7)]

まとめ

今回は Apache Spark を Python のインタラクティブシェルを通して軽く触ってみた。

Python3 エンジニア基礎認定試験を受けてみた

表題の通り、Python3 エンジニア基礎認定試験という民間の試験を受けてみた。

www.pythonic-exam.com

最近になって合格証書が届いたので、どんな感じだったか軽く書いてみる。

f:id:momijiame:20170409221053j:plain

受けるまでの経緯

Python の認定試験が始まるらしいということは以前から知っていたんだけど、続報を聞かないのでどうなっているのかなーと思ってた。 そんな折、友人と Python について話す機会があって、そこでベータ試験が始まっていることを教えてもらい受けてみることにした。 最初の方のベータ試験は受験料が無料だったみたいだけど、受けたのは最終ベータ試験ということで有料 (¥10,800) だった。 ちなみに、ベータ試験であっても合格すれば本試験に合格したのと同じ扱いになるらしい。

蛇足だけど、ベータ試験の場合は払った費用が受験料という名目ではなくて試験を主催してる団体への協賛金という扱いになるっぽい。 要するに、お金周りの動きのチェックってことなんだろうね。 あと、特典として Pythonic マグカップがもらえた。

f:id:momijiame:20170204155948j:plain:w300

受験しての所感

問題の内容は、特に引っかけという感じの設問もなく素直な印象を受けた。 普段 Python で実務をこなしてるような人なら、特に勉強しなくても受かるんじゃないだろうか。 一応、認定教材としては Python チュートリアルが挙げられている。 試験の問題は、この本の内容に沿って章ごとに一定の割合で出されるっぽい。

Pythonチュートリアル 第3版

Pythonチュートリアル 第3版

それと、合格証書と一緒に、こんな感じで何処でどう得点したか示された紙がもらえる。 得点の低かった標準ライブラリは盲点で、普段使わない機能とかが出てくると分からないね・・・。 別に使ったことなくても、そのときは公式ドキュメントを調べれば良いんだし!(言い訳)

f:id:momijiame:20170409221934j:plain

受験時には、問題の誤りや Pythonic じゃないと感じた点があったら問題用紙に記号を書き込んで教えてくれ、というようなアナウンスもあった。 これは、おそらくベータ試験だからの措置で本試験ではきっとなくなるんだろうと思う。 ちなみに、試験後に問題用紙は回収されるので持って帰ることはできなかった。 あと、受験したベータ試験ではマークシート方式の筆記試験だったけど、本試験が始まったら CBT になるっぽい?

まとめ

今回、Python3 エンジニア基礎認定試験のベータ試験を受けて合格した。 内容的には落とすための試験という感じではないので、基本的な文法や機能さえちゃんと理解していれば受かりそう。

Mac OS X で Apache Kafka を触ってみる

Apache Kafka は OSS の分散型メッセージングミドルウェア。 似た性質を持ったソフトウェアとしては ActiveMQRabbitMQ などが挙げられる。 ただし、ActiveMQ や RabbitMQ との大きな違いは、独自のバイナリプロトコルを用いてメッセージをやり取りするところ。 ActiveMQ や RabbitMQ は標準化された AMQPMQTT を扱う場合が多い。 独自プロトコルというと、なんだか未来が無さそうなイメージがあるけど、その逆で Kafka はビッグデータ処理の場面ではほぼデファクトの位置にあるようだ。

今回は、そんな Kafka を手元の Mac でさくっと試してみることにする。 使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195

インストールする

Kafka は Homebrew でインストールできる。 なので、まずは Homebrew をインストールしておく。 その上で Homebrew Cask についても使えるようにしておく。

$ /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
$ brew tap caskroom/cask

Kafka は Java と Scala を使って書かれている。 なので、まずは動作に必要な Java の処理系をインストールする。

$ brew cask install java

あとはお目当ての Kafka をインストールするだけ。

$ brew install kafka

このとき、依存パッケージとして Apache ZooKeeper もインストールされる。

サービスを動かす

続いてはインストールした Kafka を動作させる。 これには Homebrew のサービス機能を使うと楽ができる。

インストールした直後では Kafka と ZooKeeper は動いていない。

$ brew services list
Name      Status  User Plist
kafka     stopped
zookeeper stopped

そこで ZooKeeper と Kafka を順番に立ち上げていく。

$ brew services start zookeeper
$ brew services start kafka

両方のサービスが立ち上がればオッケー。

$ brew services list           
Name      Status  User    Plist
kafka     started amedama /Users/amedama/Library/LaunchAgents/homebrew.mxcl.kafka.plist
zookeeper started amedama /Users/amedama/Library/LaunchAgents/homebrew.mxcl.zookeeper.plist

ちなみに、上記の機能を使わなくても次のようにしてコマンドラインから起動することもできる。

$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
$ kafka-server-start /usr/local/etc/kafka/server.properties

もし、上手く立ち上がらないときは、それぞれのログを確認しよう。 ログはデフォルトで /usr/local/var/log に保存されている。

同梱されているコマンドで Kafka を操作してみる

Kafka には操作するためのコマンドラインツールが同梱されている。 今回は、それを使ってみる。

トピックを作る

まずは、動作確認用のトピックを作成する。 トピックは Pub/Sub 型のメッセージングミドルウェアによく登場する概念だ。 これは、例えば扱うメッセージの種類やコンポーネントごとに用意する。

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

--replication-factor オプションはクラスタ内でのメッセージのレプリケーション数を表している。 今回はシングルホスト構成なので必然的に 1 となる。 --partitions オプションはトピックでメッセージを分散処理するための機能の指定で、これを使った例については後ほど紹介する。

まずは、これで動作確認用の test トピックができた。

$ kafka-topics --list --zookeeper localhost:2181
test

メッセージを読み書きする

次に、作成したトピックに対してコンシューマ、つまりメッセージをトピックから読み出す存在を接続する。

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

続いて、作成したトピックに対してプロデューサ、つまりメッセージをトピックに書き込む存在を接続する。 ターミナルから、何か適当にメッセージを書き込んでみよう。

$ kafka-console-producer --broker-list localhost:9092 --topic test
Hello, World!

すると、先ほど接続したコンシューマに、プロデューサが書き込んだメッセージが送られる。

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
Hello, World!

ちなみに、Kafka のメッセージはデフォルトで永続化されている。 例えば、Ctrl-C で一旦コンシューマを停止させてから、もう一度立ち上げてみよう。

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
Hello, World!

ちゃんと、先ほど受信したのと同じ内容が表示された。 これは、Kafka を再起動しても同じ結果になるので試してみると面白い。

ちなみに、コマンドを --from-beginning オプションを付けずに実行すれば、途中から受信できる。

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test

複数のコンシューマでメッセージを分散処理する

これまでの例では、トピックを作るときにパーティションを 1 に指定した動作を試してきた。 次は、複数のコンピューマでメッセージを分散処理するために、トピックに複数のパーティーションを作ってみる。

今度はトピック greet をパーティーション数 2 で作成する。

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic greet
Created topic "greet".

続いては、作成したトピックにコンシューマを接続する。 このとき、自身が受信するパーティションを --partition オプションで指定する。 一つ目のターミナルでは、まずは 0 を指定しておく。

$ kafka-console-consumer --bootstrap-server localhost:9092 --partition 0 --topic greet --from-beginning

続いて別のターミナルを開いて、パーティーションが 1 のコンシューマを接続する。

$ kafka-console-consumer --bootstrap-server localhost:9092 --partition 1 --topic greet --from-beginning

次はトピックにプロデューサを接続して、適当にメッセージを送ってみよう。

$ kafka-console-producer --broker-list localhost:9092 --topic greet
Message1
Message2

すると、送ったメッセージがバラバラにコンシューマに届くことが分かる。 まず、最初のメッセージはパーティーション 0 のコンシューマに送られた。

$ kafka-console-consumer --bootstrap-server localhost:9092 --partition 0 --topic greet --from-beginning
Message1

そして、二番目のメッセージはパーティーション 1 のコンシューマに送られている。

$ kafka-console-consumer --bootstrap-server localhost:9092 --partition 1 --topic greet --from-beginning
Message2

このように、パーティーションの機能を使うことでメッセージを複数のコンシューマで分散処理できるようになっている。

まとめ

今回は Mac OS X を使って Apache Kafka を軽く触ってみた。 実運用に載せるなら色々と考えるところはあるけど、手元の検証用としてなら簡単に環境を用意して触れることが分かった。 Apache Kafka はビッグデータ処理のメッセージングミドルウェアとしてはデファクトの存在なので引き続き知見を貯めていきたい。

Python: scikit-learn で決定木 (Decision Tree) を試してみる

今回は機械学習アルゴリズムの一つである決定木を scikit-learn で試してみることにする。 決定木は、その名の通り木構造のモデルとなっていて、分類問題ないし回帰問題を解くのに使える。 また、決定木自体はランダムフォレストのような、より高度なアルゴリズムのベースとなっている。

使うときの API は scikit-learn が抽象化しているので、まずは軽く触ってみるところから始めよう。 決定木がどんな構造を持ったモデルなのかは最後にグラフで示す。 また、決定木自体は回帰問題にも使えるけど、今回は分類問題だけにフォーカスしている。

使った環境は次の通り。

$ sw_vers    
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195
$ python --version
Python 3.5.3

下準備

まずは、今回のサンプルコードを動かすのに必要な Python のパッケージをインストールしておく。

$ pip install scipy scikit-learn matplotlib

アイリスデータセットを分類してみる

まずは定番のアイリス (あやめ) データセットを決定木で分類してみることにする。 といっても scikit-learn を使う限りは、分類器が違っても API は同じなので使用感は変わらない。

次のサンプルコードではアイリスデータセットに含まれる三種類の花の品種を決定木で分類している。 モデルの汎化性能は LOO 法を使って計算した。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from sklearn import datasets
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score


def main():
    # アイリスデータセットを読み込む
    dataset = datasets.load_iris()

    # 教師データとラベルデータを取り出す
    features = dataset.data
    targets = dataset.target

    # 判定したラベルデータを入れるリスト
    predicted_labels = []
    # LOO 法で汎化性能を調べる
    loo = LeaveOneOut()
    for train, test in loo.split(features):
        # 学習に使うデータ
        train_data = features[train]
        target_data = targets[train]

        # モデルを学習させる
        clf = DecisionTreeClassifier()
        clf.fit(train_data, target_data)

        # テストに使うデータを正しく判定できるか
        predicted_label = clf.predict(features[test])
        predicted_labels.append(predicted_label)

    # テストデータでの正解率 (汎化性能) を出力する
    score = accuracy_score(targets, predicted_labels)
    print(score)


if __name__ == '__main__':
    main()

上記を実行すると、次のような結果が得られる。 約 95.3% の汎化性能が得られた。 ただし、決定木はどんな木構造になるかが毎回異なるので汎化性能も微妙に異なってくる。

0.953333333333

ハイパーパラメータを調整する

機械学習アルゴリズムで、人間が調整してやらなきゃいけないパラメータのことをハイパーパラメータという。 決定木では、木構造の深さがモデルの複雑度を調整するためのハイパーパラメータになっている。 深いものはより複雑で、浅いものはより単純なモデルになる。

次のサンプルコードは、決定木の深さを指定した数に制限した状態での汎化性能を示すものになっている。 具体的な深さについては 1 ~ 20 を順番に試行している。 ちなみに、指定できるのは「最大の深さ」なので、できあがる木構造がそれよりも浅いということは十分にありうる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from matplotlib import pyplot as plt

from sklearn import datasets
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score


def main():
    dataset = datasets.load_iris()

    features = dataset.data
    targets = dataset.target

    # 調べる深さ
    MAX_DEPTH = 20
    depths = range(1, MAX_DEPTH)

    # 決定木の最大深度ごとに正解率を計算する
    accuracy_scores = []
    for depth in depths:

        predicted_labels = []
        loo = LeaveOneOut()
        for train, test in loo.split(features):
            train_data = features[train]
            target_data = targets[train]

            clf = DecisionTreeClassifier(max_depth=depth)
            clf.fit(train_data, target_data)

            predicted_label = clf.predict(features[test])
            predicted_labels.append(predicted_label)

        # 各深度での汎化性能を出力する
        score = accuracy_score(targets, predicted_labels)
        print('max depth={0}: {1}'.format(depth, score))

        accuracy_scores.append(score)

    # 最大深度ごとの正解率を折れ線グラフで可視化する
    X = list(depths)
    plt.plot(X, accuracy_scores)

    plt.xlabel('max depth')
    plt.ylabel('accuracy rate')
    plt.show()


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 前述した通り決定木がどんな木構造になるかは毎回異なるので、これも毎回微妙に異なるはず。

max depth=1: 0.3333333333333333
max depth=2: 0.9533333333333334
max depth=3: 0.9466666666666667
max depth=4: 0.9466666666666667
max depth=5: 0.9466666666666667
max depth=6: 0.9466666666666667
max depth=7: 0.9466666666666667
max depth=8: 0.94
max depth=9: 0.9533333333333334
max depth=10: 0.94
max depth=11: 0.9533333333333334
max depth=12: 0.9466666666666667
max depth=13: 0.9466666666666667
max depth=14: 0.94
max depth=15: 0.94
max depth=16: 0.9466666666666667
max depth=17: 0.96
max depth=18: 0.9466666666666667
max depth=19: 0.9466666666666667

同時に、次のような折れ線グラフが得られる。 どうやら、今回のケースでは最大の深さが 3 以上であれば、汎化性能はどれもそんなに変わらないようだ。 f:id:momijiame:20170425221114p:plain

どのように分類されているのか可視化してみる

先ほどは深さによって汎化性能がどのように変わってくるかを見てみた。 今回扱うデータセットでは 3 以上あれば汎化性能にはさほど大きな影響がないらしいことが分かった。 次は、木構造の深さ (つまりモデルの複雑度) によって分類のされ方がどのように変わるのかを見てみたい。

次のサンプルコードでは、二次元の散布図を元に分類される様子を見るために教師データを二次元に絞っている。 具体的には、データセットの教師データの中から「Petal length」と「Petal width」だけを取り出して使っている。 その上で、それぞれを x 軸と y 軸にプロットした。 また、同時にどの点がどの品種として分類されているかを背景に色付けしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np

import matplotlib.pyplot as plt

from sklearn import datasets
from sklearn.tree import DecisionTreeClassifier


def main():
    dataset = datasets.load_iris()

    features = dataset.data
    targets = dataset.target

    # Petal length と Petal width だけを特徴量として使う (二次元で図示したいので)
    petal_features = features[:, 2:]

    # 決定木の最大深度は制限しない
    clf = DecisionTreeClassifier()
    clf.fit(petal_features, targets)

    # 教師データの取りうる範囲 +-1 を計算する
    train_x_min = petal_features[:, 0].min() - 1
    train_y_min = petal_features[:, 1].min() - 1
    train_x_max = petal_features[:, 0].max() + 1
    train_y_max = petal_features[:, 1].max() + 1

    # 教師データの取りうる範囲でメッシュ状の座標を作る
    grid_interval = 0.2
    xx, yy = np.meshgrid(
        np.arange(train_x_min, train_x_max, grid_interval),
        np.arange(train_y_min, train_y_max, grid_interval),
    )

    # メッシュの座標を学習したモデルで判定させる
    Z = clf.predict(np.c_[xx.ravel(), yy.ravel()])
    # 各点の判定結果をグラフに描画する
    plt.contourf(xx, yy, Z.reshape(xx.shape), cmap=plt.cm.bone)

    # 教師データもプロットしておく
    for c in np.unique(targets):
        plt.scatter(petal_features[targets == c, 0],
                    petal_features[targets == c, 1])

    feature_names = dataset.feature_names
    plt.xlabel(feature_names[2])
    plt.ylabel(feature_names[3])
    plt.show()


if __name__ == '__main__':
    main()

このモデルについては木構造の深さを制限していない。

上記を実行すると、次のような散布図が得られる。

f:id:momijiame:20170425221614p:plain

次は、上記のサンプルコードに木構造の深さの制限を入れてみよう。 とりあえず最大の深さを 3 までに制限してみる。 前述した通り、こうしても汎化性能自体には大きな影響はないようだった。 分類のされ方には変化が出てくるだろうか?

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np

import matplotlib.pyplot as plt

from sklearn import datasets
from sklearn.tree import DecisionTreeClassifier


def main():
    dataset = datasets.load_iris()

    features = dataset.data
    targets = dataset.target

    petal_features = features[:, 2:]

    # 決定木の深さを 3 までに制限する
    clf = DecisionTreeClassifier(max_depth=3)
    clf.fit(petal_features, targets)

    train_x_min = petal_features[:, 0].min() - 1
    train_y_min = petal_features[:, 1].min() - 1
    train_x_max = petal_features[:, 0].max() + 1
    train_y_max = petal_features[:, 1].max() + 1

    grid_interval = 0.2
    xx, yy = np.meshgrid(
        np.arange(train_x_min, train_x_max, grid_interval),
        np.arange(train_y_min, train_y_max, grid_interval),
    )
    Z = clf.predict(np.c_[xx.ravel(), yy.ravel()])
    plt.contourf(xx, yy, Z.reshape(xx.shape), cmap=plt.cm.bone)

    for c in np.unique(targets):
        plt.scatter(petal_features[targets == c, 0],
                    petal_features[targets == c, 1])

    feature_names = dataset.feature_names
    plt.xlabel(feature_names[2])
    plt.ylabel(feature_names[3])
    plt.show()


if __name__ == '__main__':
    main()

上記を実行すると、次のような散布図が得られる。

f:id:momijiame:20170425222036p:plain

先ほどの例と比べてみよう。 オレンジ色の品種が緑色の品種のところに食い込んでいるところが、このケースでは正しく認識されなくなっている。 モデルがより単純になったと考えられるだろう。

木構造を可視化してみる

scikit-learn には決定木の構造を DOT 言語で出力する機能がある。 その機能を使って木構造を可視化してみることにしよう。

まずは DOT 言語を処理するために Graphviz をインストールする。

$ brew install graphviz

そして、次のように学習させたモデルから DecisionTreeClassifier#export_graphviz() メソッドで DOT 言語で書かれたファイルを出力させる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from sklearn import datasets
from sklearn.tree import DecisionTreeClassifier
from sklearn import tree


def main():
    dataset = datasets.load_iris()

    features = dataset.data
    targets = dataset.target

    # Petal length と Petal width だけを特徴量として使う
    petal_features = features[:, 2:]

    # モデルを学習させる
    clf = DecisionTreeClassifier(max_depth=3)
    clf.fit(petal_features, targets)

    # DOT 言語のフォーマットで決定木の形を出力する
    with open('iris-dtree.dot', mode='w') as f:
        tree.export_graphviz(clf, out_file=f)


if __name__ == '__main__':
    main()

これを Graphviz で画像データに変換する。

$ dot -T png iris-dtree.dot -o iris-dtree.png

すると、次のようなグラフが得られる。

f:id:momijiame:20170425223006p:plain

グラフでは、葉ノード以外が分類をするための分岐になっている。 これは、ようするに木構造が深くなるに従ってだんだんと対象を絞り込んでいっていることを意味する。 例えば、最初の分岐では x 軸が 2.45 未満のところで分岐している。 そして、左側の葉ノードは青色の品種が全て集まっていることが分かる。

まとめ

  • 今回は決定木を scikit-learn で試してみた
  • 決定木はランダムフォレストのようなアルゴリズムのベースとなっている
  • 決定木のモデルの複雑さは木構造の深さで制御する
  • 木構造の深さが浅くなるほど分類のされ方も単純になった

はじめてのパターン認識

はじめてのパターン認識

Python: SQLAlchemy の生成する SQL をテストするパッケージを作ってみた

SQLAlchemy は Python でよく使われている O/R マッパーの一つ。 今回は、そんな SQLAlchemy が生成する SQL 文を確認するためのパッケージを作ってみたよ、という話。

具体的には、以下の sqlalchemy-profile というパッケージを作ってみた。 このエントリでは、なんでこんなものを作ったのかみたいな話をしてみる。

github.com

使った環境は次の通り。 ただし sqlalchemy-profile 自体はプラットフォームに依存せず Python 2.7, 3.3 ~ 3.6 に対応している。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195
$ python --version
Python 3.6.1

O/R マッパーについて

O/R マッパーというのは、プログラミング言語からリレーショナルデータベース (RDB) を良い感じに使うための機能ないしライブラリの総称。 プログラミング言語から RDB を操作するための SQL 文を直に扱ってしまうと、両者のパラダイムの違いから色々な問題が起こる。 この問題は、一般にインピーダンスミスマッチと呼ばれている。 そこで登場するのが O/R マッパーで、これを使うとプログラミング言語のオブジェクトを操作する形で RDB を操作できるようになる。

論よりソースということで、まずは SQLAlchemy の基本的な使い方から見てみよう。 その前に SQLAlchemy 自体をインストールしておく。

$ pip install sqlalchemy

そして次に示すのがサンプルコード。 ユーザ情報を模したモデルクラスを用意して、それを SQLite のオンメモリデータベースで永続化している。 この中には SQL 文が全く登場していないところがポイントとなる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from sqlalchemy.ext import declarative
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import Text
from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker

Base = declarative.declarative_base()


class User(Base):
    """SQLAlchemy のモデルクラス

    このクラスが RDB のテーブルと対応し、インスタンスはテーブルの一レコードに対応する
    ここではユーザの情報を格納するテーブルを模している"""
    __tablename__ = 'users'

    # テーブルの主キー
    id = Column(Integer, primary_key=True)
    # 名前を入れるカラム
    name = Column(Text, nullable=False)


def main():
    # データベースとの接続に使う情報
    # ここでは SQLite のオンメモリデータベースを使う
    # echo=True とすることで生成される SQL 文を確認できる
    engine = create_engine('sqlite:///', echo=True)
    # モデルの情報を元にテーブルを生成する
    Base.metadata.create_all(engine)
    # データベースとのセッションを確立する
    session_maker = sessionmaker(bind=engine)
    session = session_maker()

    # データベースのトランザクションを作る
    with session.begin(subtransactions=True):
        # レコードに対応するモデルのインスタンスを作る
        user = User(name='Alice')
        # そのインスタンスをセッションに追加する
        session.add(user)

    # トランザクションがコミットされてオブジェクトが RDB で永続化される

if __name__ == '__main__':
    main()

上記のサンプルコードでは生成される SQL 文を標準出力に表示するようにしている。 なので、実行するとこんな感じの出力が得られる。

2017-04-20 04:48:30,976 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
2017-04-20 04:48:30,976 INFO sqlalchemy.engine.base.Engine ()
2017-04-20 04:48:30,978 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1
2017-04-20 04:48:30,978 INFO sqlalchemy.engine.base.Engine ()
2017-04-20 04:48:30,980 INFO sqlalchemy.engine.base.Engine PRAGMA table_info("users")
2017-04-20 04:48:30,980 INFO sqlalchemy.engine.base.Engine ()
2017-04-20 04:48:30,982 INFO sqlalchemy.engine.base.Engine 
CREATE TABLE users (
    id INTEGER NOT NULL, 
    name TEXT NOT NULL, 
    PRIMARY KEY (id)
)


2017-04-20 04:48:30,983 INFO sqlalchemy.engine.base.Engine ()
2017-04-20 04:48:30,984 INFO sqlalchemy.engine.base.Engine COMMIT
2017-04-20 04:48:30,987 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2017-04-20 04:48:30,989 INFO sqlalchemy.engine.base.Engine INSERT INTO users (name) VALUES (?)
2017-04-20 04:48:30,989 INFO sqlalchemy.engine.base.Engine ('Alice',)

たしかに Python のオブジェクトを使うだけで RDB を操作できた。便利。 ただし、上記で使った生成した SQL 文を出力する機能はデバッグ用途なので普段は無効にされる場合が多い。

SQL が隠蔽されることのメリットとデメリット

先ほど見た通り O/R マッパーを使うと Python のオブジェクトを通して RDB を操作できるようになる。 これにはインピーダンスミスマッチの解消という多大なメリットがある反面、生成される SQL が隠蔽されるというデメリットもある。

例えば、直接 SQL を書くならそんな非効率なクエリは組まないよね・・・というような内容も、気をつけていないと生成されうる。 これは、典型的には N + 1 問題とか。 それを防ぐには、これまでだとコードから生成される SQL 文を推測したり、あるいは先ほどのようにして実際に目で見て確かめていた。 慣れてくるとどんな SQL 文が発行されるか分かってくるのと、実際に目で見て確かめるのは手間なので大体は前者になっている。

ただ、パフォーマンスチューニングの世界では、推測する前に測定せよという格言もある。 実際に生成される SQL 文を、ユニットテストで確認できるようになっているべきなのでは、という考えに至った。 それが、今回作ったパッケージ sqlalchemy-profile のモチベーションになっている。

ただし、どんな SQL 文が生成されるかは SQLAlchemy のアルゴリズム次第なので、気をつけないとテストのメンテナンス性が低下する恐れはあると思う。 これは、SQLAlchemy のバージョン変更とか、些細なモデルの構造変更でテストを修正する手間がかかるかも、ということ。 とはいえ、それはそれで生成される SQL が変更されたことにちゃんと気づけるのは大事じゃないかという感じでいる。

sqlalchemy-profile について

やっと本題に入るんだけど、前述した問題を解消すべく sqlalchemy-profile という Python のパッケージを作ってみた。 これを使うことで、SQLAlchemy が生成する SQL 文を確かめることができる。

Python のパッケージリポジトリである PyPI にも登録しておいた。 pypi.python.org

インストールは Python のパッケージマネージャの PIP からできる。

$ pip install sqlalchemy-profile

使い方

ここからは sqlalchemy-profile の具体的な使い方について見ていく。 シンプルなのでサンプルコードをいくつか見れば、すぐに分かってもらえると思う。 ちなみに、トラッキングしている SQL 文は今のところ INSERT, UPDATE, SELECT, DELETE の四つ。

以下のサンプルコードでは、最も基本的な使い方を示している。 まず、プロファイラとなる StatementProfiler には SQLAlchemy のデータベースとの接続情報を渡す。 そして、プロファイルしている期間中に実行された SQL 文を記録する、というもの。 ユニットテストで利用することを意図しているので、サンプルコードも Python の unittest モジュールを使うものにしてみた。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import unittest

from sqlalchemy import create_engine

from sqlalchemy_profile import StatementProfiler


class Test_RawExecute(unittest.TestCase):

    def test(self):
        # データベースとの接続を確立する
        engine = create_engine('sqlite:///')
        connection = engine.connect()

        # データベースとの接続情報を渡してプロファイラをインスタンス化する
        profiler = StatementProfiler(engine)
        # プロファイルを開始する
        profiler.start()

        # SQLAlchemy を使って RDB を操作する
        # ここでは、サンプルコードをシンプルにする目的で低レイヤーな API を使っている
        connection.execute('SELECT 1')
        connection.execute('SELECT 2')

        # プロファイルを停止する
        profiler.stop()

        # 実行された SQL 文の内容を確認する
        assert profiler.count == 2
        assert profiler.select == 2


if __name__ == '__main__':
    unittest.main()

上記では、分かりやすくするためにあえて SQLAlchemy の直接 SQL 文を扱う低レイヤーな API を使っている。

上記を実行するとテストがパスする。

$ python profile101.py 
.
----------------------------------------------------------------------
Ran 1 test in 0.020s

OK

このとき assert しているところの数値を変更すると、当然だけどテストは失敗するようになる。 想定していた SQL 文の数と、実際に発行された数が一致しないため。

$ python profile101.py
F
======================================================================
FAIL: test (__main__.Test_RawExecute)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "profile101.py", line 32, in test
    assert profiler.count == 1
AssertionError

----------------------------------------------------------------------
Ran 1 test in 0.017s

FAILED (failures=1)

O/R マッピングと共に使う

先ほどの例では、分かりやすさのためにあえて SQLAlchemy の直接 SQL 文を扱う低レイヤーな API を使っていた。 もちろん sqlalchemy-profile は O/R マッピングをしたコードでも動作するし、使い方については何も変わらない。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import unittest

from sqlalchemy.ext import declarative
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import Text
from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker

from sqlalchemy_profile import StatementProfiler

Base = declarative.declarative_base()


class _User(Base):
    """ユーザ情報を模したモデルクラス"""
    __tablename__ = 'users'

    # 主キー
    id = Column(Integer, primary_key=True)
    # 名前を格納するカラム
    name = Column(Text, nullable=False)


class Test_ORMapping(unittest.TestCase):

    def setUp(self):
        """テストが実行される前の下準備"""
        self.engine = create_engine('sqlite:///')
        Base.metadata.create_all(self.engine)
        self.session_maker = sessionmaker(bind=self.engine)

    def tearDown(self):
        """テストが実行された後の後始末"""
        Base.metadata.drop_all(self.engine)

    def test(self):
        session = self.session_maker()

        profiler = StatementProfiler(self.engine)
        profiler.start()

        # 以下のユーザを模したインスタンスを一通り CRUD していく
        user = _User(name='Alice')

        # INSERT
        with session.begin(subtransactions=True):
            session.add(user)

        # UPDATE
        with session.begin(subtransactions=True):
            user.name = 'Bob'

        # SELECT
        session.query(_User).all()

        # DELETE
        with session.begin(subtransactions=True):
            session.delete(user)

        profiler.stop()

        # SQL 文は各一回ずつ実行されているはず
        assert profiler.count == 4
        assert profiler.insert == 1
        assert profiler.update == 1
        assert profiler.select == 1
        assert profiler.delete == 1


if __name__ == '__main__':
    unittest.main()

with ステートメントと共に使う

これまでの例ではプロファイリング期間を start() メソッドと stop() メソッドで制御したけど、これは with でも代用できる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import unittest

from sqlalchemy import create_engine

from sqlalchemy_profile import StatementProfiler


class Test_WithStatement(unittest.TestCase):

    def test(self):
        engine = create_engine('sqlite:///')
        connection = engine.connect()

        # with ステートメントのスコープで実行された SQL 文を記録する
        with StatementProfiler(engine) as profiler:
            connection.execute('SELECT 1')
            connection.execute('SELECT 2')

        assert profiler.count == 2
        assert profiler.select == 2


if __name__ == '__main__':
    unittest.main()

デコレータと共に使う

with を使うのもめんどくさいなー、というときはテストメソッド自体をデコレータで修飾しちゃうような使い方もできる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import unittest

from sqlalchemy import create_engine

from sqlalchemy_profile import sqlprofile

ENGINE = create_engine('sqlite:///')


class Test_Decorator(unittest.TestCase):

    # ユニットテストのメソッドをデコレータで修飾する
    # メソッド内で実行されることが想定される SQL 文の数を指定する
    @sqlprofile(ENGINE, count=2, select=2)
    def test(self):
        connection = ENGINE.connect()

        connection.execute('SELECT 1')
        connection.execute('SELECT 2')


if __name__ == '__main__':
    unittest.main()

SQL 文の種類と順序まで確認したい

いやいや回数だけのアサーションとかアバウトすぎるでしょ、っていうときは StatementProfiler#sequence を使う。 これで INSERT, UPDATE, SELECT, DELETE が、どんな順番で実行されたかを確認できる。 中身は文字列で、それぞれの操作の頭文字が入っている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import unittest

from sqlalchemy.ext import declarative
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import Text
from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker

from sqlalchemy_profile import StatementProfiler

Base = declarative.declarative_base()


class _User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(Text, nullable=False)


class Test_ORMapping(unittest.TestCase):

    def setUp(self):
        self.engine = create_engine('sqlite:///')
        Base.metadata.create_all(self.engine)
        self.session_maker = sessionmaker(bind=self.engine)

    def tearDown(self):
        Base.metadata.drop_all(self.engine)

    def test(self):
        session = self.session_maker()

        profiler = StatementProfiler(self.engine)
        profiler.start()

        user = _User(name='Alice')

        # INSERT
        with session.begin(subtransactions=True):
            session.add(user)

        # UPDATE
        with session.begin(subtransactions=True):
            user.name = 'Bob'

        # SELECT
        session.query(_User).all()

        # DELETE
        with session.begin(subtransactions=True):
            session.delete(user)

        profiler.stop()

        # [I]NSERT -> [U]PDATE -> [S]ELECT -> [D]ELETE
        assert profiler.sequence == 'IUSD'


if __name__ == '__main__':
    unittest.main()

もちろんデコレータの API でも使える。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import unittest

from sqlalchemy import create_engine

from sqlalchemy_profile import sqlprofile

ENGINE = create_engine('sqlite:///')


class Test_Decorator(unittest.TestCase):

    # SELECT -> SELECT = SS
    @sqlprofile(ENGINE, seq='SS')
    def test(self):
        connection = ENGINE.connect()

        connection.execute('SELECT 1')
        connection.execute('SELECT 2')


if __name__ == '__main__':
    unittest.main()

もっと厳密にアサーションしたい

いやいや SQL 文の構造までもっと調べたいよ、というときは StatementProfiler#statementsStatementProfiler#statements_with_parameters を使う。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import unittest

from sqlalchemy.ext import declarative
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import Text
from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker

from sqlalchemy_profile import StatementProfiler

Base = declarative.declarative_base()


class _User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(Text, nullable=False)


class Test_ORMapping(unittest.TestCase):

    def setUp(self):
        self.engine = create_engine('sqlite:///')
        Base.metadata.create_all(self.engine)
        self.session_maker = sessionmaker(bind=self.engine)

    def tearDown(self):
        Base.metadata.drop_all(self.engine)

    def test(self):
        session = self.session_maker()

        profiler = StatementProfiler(self.engine)
        profiler.start()

        user = _User(name='Alice')

        # INSERT
        with session.begin(subtransactions=True):
            session.add(user)

        profiler.stop()

        assert profiler.count == 1
        assert profiler.insert == 1

        # 生の SQL 文を取得する
        print(profiler.statements)
        print(profiler.statements_with_parameters)


if __name__ == '__main__':
    unittest.main()

こんな感じ。

['INSERT INTO users (name) VALUES (?)']
[('INSERT INTO users (name) VALUES (?)', ('Alice',))]
.
----------------------------------------------------------------------
Ran 1 test in 0.019s

OK

こちらは、今のところデコレータの API では使えない。

まとめ

  • SQLAlchemy の生成する SQL 文を確認するための sqlalchemy-profile というパッケージを作ってみた
  • O/R マッピングをすると、生成される SQL 文をプログラマが把握しにくくなる
  • 非効率なクエリをコードや実行結果から目で見て確認するのは手間がかかる
  • sqlalchemy-profile を使うことで実行される SQL 文をユニットテストで確認できるようになる

もしかすると似たようなことができるパッケージが既にあるかも。