CUBE SUGAR CONTAINER

技術系のこと書きます。

CentOS7 で Apache Hive を使ってみる

今回は Apache Hadoop 上で動作する MapReduce アプリケーションの一つ Apache Hive を使ってみる。 Apache Hive を使うと Hadoop/HDFS の上で HiveQL という SQL のサブセットが使えるようになる。 実行したクエリは MapReduce のジョブに変換されて Hadoop クラスタで分散並列処理されることから高スループットが得られる。

ただし、MapReduce アプリケーションのご多分に漏れずレイテンシーはでかい。 ようするに一つ一つのクエリの実行自体には時間がかかってしまう。 また、一度追加したレコードについては基本的に更新したり削除することができない。 それらの特性から、オンライントランザクション処理 (OLTP) のような用途には全く向いていない。 代わりに、どんどん一方的にデータが蓄積されていくような状況で後からバッチとかで集計するような用途には向いていると思う。

そんな Apache Hive を今回は CentOS7 上に構築して使ってみる。 今回も Hadoop ディストリビューションは使わず、OSS のリリースパッケージをそのままインストールする。 その方が、おそらく挙動や設定について多くの知見が得られると思うので。 そして、Apache Hive は Apache Hadoop/HDFS 上で動作する。 そのため、今回使う環境は以下のエントリにもとづいて Apache Hadoop が疑似分散モードでセットアップ済みなことを想定する。

blog.amedama.jp

上記のエントリにもあるけど、今回使った環境は次の通り。

$ cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)
$ uname -r
3.10.0-514.16.1.el7.x86_64
$ bin/hadoop version
Hadoop 2.8.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 91f2b7a13d1e97be65db92ddabc627cc29ac0009
Compiled by jdu on 2017-03-17T04:12Z
Compiled with protoc 2.5.0
From source with checksum 60125541c2b3e266cbf3becc5bda666
This command was run using /home/vagrant/hadoop-2.8.0/share/hadoop/common/hadoop-common-2.8.0.jar

インストールする

インストール方法については次のドキュメントを参照した。 ただし、Apache Hive 2.x 系についてはここに書かれている内容だけではエラーになってしまった。 そのため公式サイト以外にも調べて必要な手順を足している。

GettingStarted - Apache Hive - Apache Software Foundation

AdminManual Installation - Apache Hive - Apache Software Foundation

下準備

まずはパッケージをダウンロードするために wget をインストールしておく。

$ sudo yum -y install wget

続いては環境変数 HADOOP_HOME に Apache Hadoop のインストール先ディレクトリを指定しておく。

$ export HADOOP_HOME=~/hadoop-2.8.0

続いては Apache Hive のパッケージをダウンロードする。 ミラーサイトは次の通りなので、好きなところを選ぼう。

www.apache.org

ただし、現状で Apache Hive には 1.x 系と 2.x 系という二つのメジャーバージョンが併用されている。 軽く調べた感じ、まだ 1.x 系の方が使われているっぽいかな? ただ、2.x 系には 1.x 系にない機能が色々とあるみたい。 ちなみに、どうやら Apache Hive は最新の安定版リリース以外ダウンロードサイトに置いていないようだ。

バージョン 1.x を使う場合

もしバージョン 1.x 系を使うときは、こうする。

$ wget http://ftp.riken.jp/net/apache/hive/hive-1.2.2/apache-hive-1.2.2-bin.tar.gz
$ tar xf apache-hive-1.2.2-bin.tar.gz
$ cd apache-hive-1.2.2-bin
バージョン 2.x を使う場合

バージョン 2.x 系を使うときは、こう。

$ wget http://ftp.riken.jp/net/apache/hive/hive-2.1.1/apache-hive-2.1.1-bin.tar.gz
$ tar xf apache-hive-2.1.1-bin.tar.gz
$ cd apache-hive-2.1.1-bin

セットアップする

どちらのメジャーバージョンを使うときも同じ手順でセットアップできるようだ。 ただし 2.x 系については 1.x 系で不要な操作も必ずしなければいけないようになっている。

まずは HDFS に Hive のデータ置き場を作る。

$ $HADOOP_HOME/bin/hadoop fs -mkdir -p /user/hive/warehouse
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse

同様にテンポラリディレクトリも必要なので作っておこう。

$ $HADOOP_HOME/bin/hadoop fs -mkdir -p /tmp
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp

続いては Apache Hive がメタデータを入れておく RDBMS を準備する。 これには MySQL とか PostgreSQL とかも使えるけど、デフォルトでは Apache Derby という Java で書かれた組み込みのものが使われる。

$ bin/schematool -dbType derby -initSchema --verbose
...(snip)...
Initialization script completed
schemaTool completed

これで下準備はおわり。

使ってみる

続いては、実際に Apache Hive を使ってみることにしよう。 それにはまず hive というコマンドを起動する。

$ bin/hive

すると、次のように HiveQL を入力するためのシェルが起動する。

hive>

ちなみに、このとき 2.x 系を使っていると次のような警告が出る。 どうやら 2.x 系では MapReduce で動作させるのは非推奨となっていて、代わりに Spark とか Tez の上で動かせってことらしい。 ここらへんはまだよく分かっていないけど、おそらくレイテンシーの大きさを改善するための試みな気がしている。

Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.

シェルの使い方

基本的には MySQL なんかのシェルを使うのと似たような感じで操作できる。 例えばデータベースの一覧を得るには SHOW DATABASES コマンドが使える。

hive> SHOW DATABASES;
OK
default
Time taken: 0.011 seconds, Fetched: 1 row(s)

今操作しているデータベースを表示したいときは、こうする。

hive> set hive.cli.print.current.db=true;

シェルの中に表示されるようになった。

hive (default)>

データベースを作る

データベースを新しく作るには CREATE DATABASE コマンドを使う。

hive (default)> CREATE DATABASE mydb;
OK
Time taken: 0.106 seconds

使うには USE コマンドを。 ここらへんは MySQL とほとんど同じ。

hive (default)> USE mydb;
OK
Time taken: 0.038 seconds

hive (mydb)>

テーブルを作る

続いてはテーブルを作ってみる。 ここも至って普通の SQL が使える。 代理キーを入れてみたけど、特に自動生成してくれるような機能はなさそうかな・・・?

hive (mydb)> CREATE TABLE users (
           >   id INT,
           >   name STRING,
           >   age INT
           > );
OK
Time taken: 0.068 seconds

SHOW TABLES で確認すると、ちゃんとテーブルができている。

hive (mydb)> SHOW TABLES;
OK
users
Time taken: 0.022 seconds, Fetched: 1 row(s)

続いてはレコードを追加してみる。 さっきまでの処理は単に RDB にメタデータを追加するだけで済んでいたのかすぐに終わったけど、この処理には結構かかる。 がっつり MapReduce のジョブに変換されているみたいだ。

hive (mydb)> INSERT INTO users VALUES (1, 'Alice', 20);
...(snip)...
Total MapReduce CPU Time Spent: 1 seconds 860 msec
OK
Time taken: 31.601 seconds

SELECT で追加されたレコードを確認してみよう。

hive (mydb)> SELECT * FROM users;
OK
1  Alice   20
Time taken: 0.22 seconds, Fetched: 1 row(s)

ちなみに、クエリがどう処理されているかを詳しく見るには EXPLAIN を先頭につけるらしい。

hive (mydb)> EXPLAIN SELECT * FROM users;
OK
STAGE DEPENDENCIES:
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        TableScan
          alias: users
          Statistics: Num rows: 1 Data size: 10 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: id (type: int), name (type: string), age (type: int)
            outputColumnNames: _col0, _col1, _col2
            Statistics: Num rows: 1 Data size: 10 Basic stats: COMPLETE Column stats: NONE
            ListSink

Time taken: 0.086 seconds, Fetched: 17 row(s)

続いては複数のテーブルを連結してみる。 試しにユーザのメールアドレスを想定したテーブルを作ってみよう。

hive (mydb)> CREATE TABLE emails (
           >   id INT,
           >   address STRING
           > );
OK
Time taken: 0.062 seconds

hive (mydb)> INSERT INTO emails VALUES (1, 'alice@example.jp');
...(snip)...
OK
Time taken: 18.881 seconds

テーブル同士を連結 (JOIN) するときも一般的な RDBMS と同じようにできる。 ただし、この処理も実行には時間がかかる。

hive (mydb)> SELECT *
           > FROM users
           > JOIN emails ON users.id = emails.id
           > WHERE name like 'Alice';
...(snip)...
OK
1  Alice   20 1  alice@example.jp
Time taken: 31.934 seconds, Fetched: 1 row(s)

ところで、これまで見てきたテーブルなどのデータは HDFS 上にどのようにして格納されているのだろう? 見ると、次のようにテーブルごとでディレクトリになってその中にファイルが入っている。

$ $HADOOP_HOME/bin/hdfs dfs -ls -R /user/hive/warehouse
drwxrwxr-x   - vagrant supergroup          0 2017-05-22 21:35 /user/hive/warehouse/mydb.db
drwxrwxr-x   - vagrant supergroup          0 2017-05-22 21:35 /user/hive/warehouse/mydb.db/emails
-rwxrwxr-x   1 vagrant supergroup         19 2017-05-22 21:35 /user/hive/warehouse/mydb.db/emails/000000_0
drwxrwxr-x   - vagrant supergroup          0 2017-05-22 21:33 /user/hive/warehouse/mydb.db/users
-rwxrwxr-x   1 vagrant supergroup         11 2017-05-22 21:33 /user/hive/warehouse/mydb.db/users/000000_0

さらにファイルの中身を見てみると・・・なんと、ただのテキストファイルになっている。 こんなデータ構造でも上手くいくのは Hadoop クラスタと HDFS の賜物といえるだろう。 HDFS で各ノードに分散配置された細切れのファイルを Hadoop クラスタが分散並列実行するために高いスループットが得られる。

$ $HADOOP_HOME/bin/hdfs dfs -cat /user/hive/warehouse/users/000000_0
1Alice20

テーブルを削除するには DROP TABLE を使う。

hive (mydb)> DROP TABLE users;
OK
Time taken: 0.946 seconds
hive (mydb)> DROP TABLE emails;
OK
Time taken: 0.113 seconds

テーブルを削除すればディレクトリごとファイルもなくなる。

$ $HADOOP_HOME/bin/hdfs dfs -ls -R /user/hive/warehouse
drwxrwxr-x   - vagrant supergroup          0 2017-05-22 12:39 /user/hive/warehouse/mydb.db

データ構造を CSV にする

先ほど見た通り Apache Hive が扱うデータ構造は、デフォルトではただのテキストファイルになっている。 このことから、データ構造を例えば CSV にすることもできる。

テーブルを作るときのオプションとしてカンマ区切りになるよう指定しよう。

hive (mydb)> CREATE TABLE users (
           >   id INT,
           >   name STRING,
           >   age INT
           > )
           > ROW FORMAT DELIMITED
           > FIELDS TERMINATED BY ','
           > STORED AS TEXTFILE;
OK
Time taken: 0.079 seconds

もちろん、こうしておけば外部から CSV を読み込むこともできる。 例えば、次のようなファイルを用意しよう。

$ cat << 'EOF' > /tmp/users.csv
1,Alice,20
2,Bob,30
3,Carol,10
EOF

そして Hive で LOAD DATA を使って、それを読み込む。

hive (mydb)> LOAD DATA LOCAL INPATH '/tmp/users.csv' INTO TABLE users;
Loading data to table mydb.users
Table mydb.users stats: [numFiles=1, totalSize=31]
OK
Time taken: 0.223 seconds

データがちゃんと読み込まれている。

hive (mydb)> SELECT * FROM users WHERE age < 25;
OK
1  Alice   20
3  Carol   10
Time taken: 0.141 seconds, Fetched: 2 row(s)

そして、HDFS 上のデータの中身もこの通り。

$ $HADOOP_HOME/bin/hdfs dfs -ls -R /user/hive/warehouse | grep users.csv
-rwxrwxr-x   1 vagrant supergroup         31 2017-05-22 21:40 /user/hive/warehouse/mydb.db/users/users.csv
$ $HADOOP_HOME/bin/hdfs dfs -cat /user/hive/warehouse/mydb.db/users/users.csv
1,Alice,20
2,Bob,30
3,Carol,10

Vagrantfile

上記のセットアップを手動でやるのは大変なので Vagrant で自動化してみた。 使い方は次の通り。

$ git clone https://gist.github.com/d89890c1e9874f5e15f8cff9311544a5.git hadoop-cluster-with-hive
$ cd hadoop-cluster-with-hive
$ sh boot.sh

あとはマスターノードにログインして色々とやる。

$ vagrant ssh master

まとめ

今回は CentOS7 に Apache Hive をインストールして使ってみた。 Apache Hive を使うことで Hadoop/HDFS 上でスケーラブルなデータストアを構築できる。 構築したデータストアは HiveQL という SQL のサブセットを通して操作できる。 HiveQL は、実行されると MapReduce のジョブに変換された上で Hadoop クラスタ上で並列分散実行される。 そのためレイテンシーは大きいものの高いスループットが期待できるようだ。

今回も次の本が理解の役に立った。

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Python から Hadoop Streaming を使ってみる

今回は、任意のプログラミング言語から Apache Hadoop を使うことのできる Hadoop Streaming という機能を使ってみる。

通常、Hadoop を使って MapReduce のジョブを直接扱うときは Java を使ってマッパーとリデューサーを書くことになる。 ただ、ご存知の通り Java のソースコードというのは重厚長大で、なかなか読み書きがしやすいとは言いにくい。 そこで、任意のプログラミング言語、具体的には標準入出力を処理する実行ファイルさえあれば使える機能ができた。 それが Hadoop Streaming というもの。 この機能を使うことで低レイヤーな MapReduce の処理を、使い慣れたプログラミング言語を使うなど好きなやり方で記述できる。

ちなみに、今回のエントリでは事前に Apache Hadoop がセットアップされていることを前提に書いていく。 具体的には、次のエントリのようにして疑似分散モードないし完全分散モードで構築されていること。

blog.amedama.jp

また、使用するプログラミング言語については使い慣れた Python を使うことにした。 そして、題材とする MapReduce のアプリケーションはワードカウントにする。 これは、MapReduce のハローワールド的な位置づけのアプリケーションになっている。 具体的な処理としては、テキストファイルの中に含まれる単語の数をカウントするというもの。

使った環境については前述したエントリにもあるけど、次の通り。

$ cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)
$ uname -r
3.10.0-514.el7.x86_64
$ bin/hadoop version
Hadoop 2.8.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 91f2b7a13d1e97be65db92ddabc627cc29ac0009
Compiled by jdu on 2017-03-17T04:12Z
Compiled with protoc 2.5.0
From source with checksum 60125541c2b3e266cbf3becc5bda666
This command was run using /home/vagrant/hadoop-2.8.0/share/hadoop/common/hadoop-common-2.8.0.jar

Hadoop Streaming でワードカウントを実行する

まずはワードカウントの処理対象となるテキストファイルから用意しておこう。 各単語はシンプルにスペースで区切られている。 見たところHelloHadoop といった単語はたくさん含まれていそうだ。

$ cat << 'EOF' > /tmp/input1.txt
Hello Hadoop World!
This is the first Hadoop Streaming application!
EOF
$ cat << 'EOF' > /tmp/input2.txt
Hello Hello Hello
Hadoop Streaming application!
EOF

続いては上記のファイルを HDFS にコピーする。 ワーキングディレクトリについては、先のエントリでセットアップした Apache Hadoop のディレクトリ内にいることを想定している。

$ bin/hdfs dfs -put /tmp/input*.txt .

もし、次のようなエラーが出るときはユーザのホームディレクトリが HDFS 上に存在していないことを示している。

put: `.': No such file or directory: `hdfs://localhost:9000/user/vagrant'

そのときはホームディレクトリを用意しよう。

$ bin/hdfs dfs -mkdir -p /user/$(whoami)

次のようにファイルがコピーされていることを確認する。

$ bin/hdfs dfs -ls
Found 2 items
-rw-r--r--   1 vagrant supergroup         68 2017-05-20 01:09 input1.txt
-rw-r--r--   1 vagrant supergroup         48 2017-05-20 01:09 input2.txt

続いて、今回の本題である Hadoop Streaming 用のプログラムを用意する。 その前に Hadoop Streaming で使うプログラムの基礎知識について説明しておく。

まず、Hadoop Streaming では標準入出力を通して Apache Hadoop とユーザのプログラムがデータをやり取りする。 このとき、プログラムは標準入出力を通してやり取りする各行をキーバリューの含まれたものとして扱うことになる。 始めに入力から見ると、これは本当にただ行が標準入力から渡ってくるだけに過ぎない。 ユーザのプログラムは、それを適切にパースしてキーバリューに分解する。 そして、分解したキーバリューに対して何らかの処理を施した上で、最後は標準出力に書き出すことになる。 つまり、標準入出力さえ扱うことができれば、どんなプログラムであっても MapReduce の処理を記述できるということになる。 ただし、詳しくは後述するものの、その処理が並列分散実行される点については留意しておく必要がある。

さて、長くなってしまったけどそろそろ実例を見ていこう。 Hadoop Streaming ではマッパーとレデューサーを別々に用意する。 今回はワードカウントを題材にするので、そのプログラムを Python で書いてみた。

まずはマッパーから。 標準入力の内容をパースしてキーバリューにしたら、それを標準出力に書き出している。 キーとバリューのペアはタブを使って区切るのが望ましい。 なぜなら Hadoop Streaming がデフォルトでキーとバリューをタブ区切りとして扱うため。 もちろん、この区切り文字は変更できるんだけど、今回はデフォルトのままでいく。

$ cat << 'EOF' > /var/tmp/mapper.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
import sys


def map_(line):
    # 空白などで単語ごとに行を区切る
    words = re.split(r'\s', line)
    # 単語と出現回数 (ここでは常に1) のタプルにする
    return [(key, 1) for key in words if key]


def write(kvs):
    for key, value in kvs:
        # キーとバリューをタブ区切りで標準出力に書き出す
        message = '{0}\t{1}'.format(key, value)
        print(message)


def main():
    # 標準入力から処理対象の文字列を読み込む
    for line in sys.stdin:
        # タブ区切りのキーとバリューに分解する
        key_and_values = map_(line)
        # 標準出力に書き込んでいく
        write(key_and_values)


if __name__ == '__main__':
    main()
EOF

続いてはリデューサー。 標準入力に含まれるキーバリューをパースしたら、その内容を元に単語の数をカウントしている。

$ cat << 'EOF' > /var/tmp/reducer.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from collections import defaultdict


# 集計結果を入れる辞書
results = defaultdict(int)


def reduce_(line):
    # 入力された行をタブで区切る
    key, value = line.split('\t')
    # キーごとに出現回数をカウントする
    results[key] += int(value)


def main():
    # キーの出現回数をカウントする
    for line in sys.stdin:
        reduce_(line)

    # 集計結果をキーとバリューのタブ区切りで標準出力に書き出す
    for key, value in results.items():
        message = '{0}\t{1}'.format(key, value)
        print(message)


if __name__ == '__main__':
    main()
EOF

上記で作ったマッパーとリデューサーに実行権限をつけておく。 これは必須ではない。

$ chmod +x /var/tmp/mapper.py
$ chmod +x /var/tmp/reducer.py

これで準備が整った。 それでは早速 Hadoop Streaming を実行してみよう。 jar には Apache Hadoop に同梱されている Hadoop Streaming 用のものを指定する。 -files オプションで使用する実行ファイルを指定して、それを -mapper-reducer オプションで割り当てていく。 MapReduce ジョブの入出力については -input-output オプションで指定する。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output

無事にジョブが完了すると結果が output ディレクトリ以下に出力される。 内容を確認してみよう。

$ bin/hdfs dfs -cat output/*
application!    2
This    1
is  1
Hadoop  3
Streaming   2
World!  1
the 1
Hello   4
first   1

どうやら、ちゃんとワードカウントが動作しているようだ。

マップだけする (リデュースしない) パターン

場合によっては MapReduce で、マップだけを使ってリデュースはしたくないということもあるはず。 これは、単純に Apache Hadoop クラスタを並列分散実行するためだけに使うパターン。 このときは -D mapred.reduce.tasks=0 というようにリデュースの並列度を 0 に指定する。

実際に試してみよう。 このパターンではリデューサーの指定が必要ない。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -D mapred.reduce.tasks=0 \
    -files /var/tmp/mapper.py \
    -mapper mapper.py \
    -input input*.txt \
    -output output2

結果は次の通り。 マップ処理を通してキーバリュー形式になるだけで終わった。

$ bin/hdfs dfs -cat output2/*
Hello   1
Hadoop  1
World!  1
This    1
is  1
the 1
first   1
Hadoop  1
Streaming   1
application!    1
Hello   1
Hello   1
Hello   1
Hadoop  1
Streaming   1
application!    1

Hadoop Streaming を扱う上での注意点

Hadoop Streaming のプログラムを書く上での注意点として一つ気づいたことがある。 それは、マップ処理は並列分散処理されるのは当たり前としても、リデュース処理も並列分散処理されうるということ。 なので、リデュース処理の中で一つのノードで完結していることを想定したような処理は含めてはいけないはず。

例えば、次のリデューサーは最終的な出力結果が降順ソートされることを期待した処理が含まれている。

$ cat << 'EOF' > /var/tmp/reducer.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""リデューサーが並列実行されると上手く動かないので、こんな風には書かないこと"""

import sys
from collections import defaultdict


results = defaultdict(int)


def reduce_(line):
    key, value = line.split('\t')
    results[key] += int(value)


def main():
    for line in sys.stdin:
        reduce_(line)

    # キーの出現回数ごとに降順ソートする
    items = results.items()
    sorted_items = sorted(items, key=lambda x: x[1], reverse=True)

    for key, value in sorted_items:
        message = '{0}\t{1}'.format(key, value)
        print(message)


if __name__ == '__main__':
    main()
EOF

上記を実行してみよう。 まずはリデュース処理の並列度が 1 の場合から。 つまり、一つのノード上で全てのリデュース処理が実行される。

$ chmod +x /var/tmp/reducer.py
$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -D mapred.reduce.tasks=1 \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output3

この場合は結果がちゃんとソートされて出る。

$ bin/hdfs dfs -cat output3/*
Hello   4
Hadoop  3
application!    2
Streaming   2
This    1
is  1
World!  1
the 1
first   1

しかし、並列度を上げてみるとどうだろうか? 例えば、次のようにすると並列度が 10 になる。 つまり、マッパーが処理したデータが 10 のノードに散らばって実行されることになる。 ただし、今回に関しては疑似分散モードなので一つのノード上でリデュースのタスクが 10 できるだけ。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -D mapred.reduce.tasks=10 \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output4

同じことは -numReduceTasks オプションでも指定できる。

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar \
    -files /var/tmp/mapper.py,/var/tmp/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input input*.txt \
    -output output4 \
    -numReduceTasks 10

実行結果を確認してみよう。 今度は見事に出力結果がソートされずバラバラになっている。 これはリデュース処理が並列実行されたことで起こった。 各々のリデュース単位では降順ソートされたものの、全体ではそれを連結したものになるので意味をなさない。

$ bin/hdfs dfs -cat output4/*
Hello   4
is  1
the 1
first   1
Hadoop  3
application!    2
World!  1
Streaming   2
This    1

上記を見て次に気になるのは、どうやってリデュース処理を各タスクに分解して割り振っているか、ということだろう。 リデュース処理に関しては、同じキーは必ず同じノード上に集めて実行しなきゃいけない。 つまり標準入出力に流れる行の中で、どの部分がキーなのかを Apache Hadoop は認識する必要があるということ。 これは前述した区切り文字と関係していて、デフォルトでは区切り文字がタブになっている。 そのため、今回は特に何も指定しなくても Apache Hadoop は標準入出力の中からキーを認識できた。 もし、これが異なる場合には Hadoop Streaming を実行する際に、それをオプションで伝えてやる必要がある。

Hadoop Streaming アプリケーションの開発について

ところで、Hadoop Streaming のアプリケーションはどうやって開発すれば良いだろうか。 上記のように Hadoop Streaming 経由で毎回起動するのは、正直めんどくさい。 そんなときは HDFS を使わずにローカルディスクを、Hadoop Streaming の代わりにパイプを使ってデバッグするやり方がある。

結局のところ Hadoop Streaming がやっている標準入出力同士をつなぎ合わせるという処理は Unix のパイプと同じだ。 HDFS と Hadoop クラスタで、それが並列分散実行されるという点を除けば、だけど。

そのため、並列度が 1 の挙動については次のようにして確かめることができる。

$ cat /tmp/input*.txt | /var/tmp/mapper.py  | /var/tmp/reducer.py
application!    2
This    1
is  1
Hadoop  3
Streaming   2
World!  1
the 1
Hello   4
first   1

リデュースしたくないときはパイプをつながなければいいだけ。

$ cat /tmp/input*.txt | /var/tmp/mapper.py
Hello   1
Hadoop  1
World!  1
This    1
is  1
the 1
first   1
Hadoop  1
Streaming   1
application!    1
Hello   1
Hello   1
Hello   1
Hadoop  1
Streaming   1
application!    1

上記のようにして、まずは手元で動作確認した上で Hadoop Streaming にかければ良い。 この段階で動作しないアプリケーションは Hadoop Streaming に持っていっても絶対に動作はしないだろう。

まとめ

今回は Apache Hadoop を任意のプログラミング言語から扱うことのできる Hadoop Streaming という機能を試してみた。 この機能を使えば Java を使いたくないという人でも Apache Hadoop の低レイヤーな MapReduce 処理を記述できる。

参考文献

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

CentOS7 で Apache Hadoop の疑似分散モードを使ってみる

Apache Hadoop はビッグデータ処理基盤を構築するための超有名なオープンソースソフトウェア。 Google の発表した論文を元にして MapReduce アルゴリズムと Hadoop Distributed File System (HDFS) が実装されている。 この Hadoop/HDFS を中心として Apache HiveApache HBase などのミドルウェアが動作する一大エコシステムが形成されている。 今回は、そんな Apache Hadoop を CentOS7 で使ってみることにする。

尚、Apache Hadoop 関連の OSS をインストールするときは、いくつかの会社が出しているディストリビューションを利用することが多い。 例えば Cloudera CDH や Hortonworks HDP など。 しかし、今回はそれらのディストリビューションを使うのではなく Apache が出しているパッケージをそのまま使ってみる。 その方が内部でどんなことをやっているのかとか、どのように設定すれば良いのか理解が深まるだろうと考えたため。

ちなみに、Apache Hadoop は以下に示す三つの動作モードがある。

  • ローカルモード
    • 一つのホストで動作する
    • HDFS を使わない
  • 疑似分散モード
    • 一つのホストで動作する
    • HDFS を使う
  • 完全分散モード
    • 複数のホストで動作する
    • HDFS を使う

今回は、その中でも疑似分散モードを試すことにした。 実運用に乗せる環境なら確実に完全分散モードになるけど、手元での検証用なら疑似分散モードでも十分に使えそうなので。 ちなみに HDFS というのは複数のホストに細切れにしたファイルを配布した上で、それを各ホストで並列処理できる仕組みを備えたファイルシステムになっている。

疑似分散モード、というよりローカルモードを含むシングルホストで動かすときに参照するドキュメントは次の通り。

Apache Hadoop 2.8.0 – Hadoop: Setting up a Single Node Cluster.

もし完全分散モードなら、このドキュメントを参照する。

Apache Hadoop 2.8.0 – Hadoop Cluster Setup

使った環境は次の通り。

$ cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)
$ uname -r
3.10.0-514.16.1.el7.x86_64

Java をインストールする

Hadoop 関連のソフトウェアは、その多くが JVM 言語で書かれている。 なので、まずは Java をインストールしておこう。 一応、動作確認が取れているバージョンはこのページにあった。

HadoopJavaVersions - Hadoop Wiki

OpenJDK だと 1.7 しか今のところ確認されていないっぽい。 とはいえ、まあ動かないなんてことはないだろうという考えで 1.8 を入れてみることにする。 jps コマンドが使いたいのでインストールするのを JDK にする。

$ sudo yum -y install java-1.8.0-openjdk-devel

java コマンドが使えることが確認する。

$ java -version
openjdk version "1.8.0_131"
OpenJDK Runtime Environment (build 1.8.0_131-b11)
OpenJDK 64-Bit Server VM (build 25.131-b11, mixed mode)

Apache Hadoop をローカルモードで動かす

まずは Apache Hadoop をダウンロードした上でローカルモードで動かしてみる。 というより、何も設定しない状態ではローカルモードとして動作する。

ひとまず最低限必要なパッケージを一通り入れておく。

$ sudo yum -y install openssh-clients rsync wget

次に Apache Hadoop をダウンロードする。

$ wget http://ftp.riken.jp/net/apache/hadoop/common/hadoop-2.8.0/hadoop-2.8.0.tar.gz

Apache Hadoop のミラーサイトは次の通り。 好きなところを選んでダウンロードしよう。

www.apache.org

ダウンロードできたら解凍する。

$ tar xf hadoop-2.8.0.tar.gz

解凍してできたディレクトリに移動しよう。

$ cd hadoop-2.8.0/

Hadoop を動かすには環境変数として JAVA_HOME が設定されている必要がある。 そのため、先ほどインストールした OpenJDK のディレクトリを指定する。

$ export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk

あるいは、起動時の環境変数を etc/hadoop/hadoop-env.sh で指定することもできる。 設定を永続化したいときは、こちらを使えば良さそう。

$ sed -i -e '
  s:^export JAVA_HOME=.*$:export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk:
' etc/hadoop/hadoop-env.sh

これで hadoop コマンドがエラーにならず実行できるようになる。

$ bin/hadoop version
Hadoop 2.8.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 91f2b7a13d1e97be65db92ddabc627cc29ac0009
Compiled by jdu on 2017-03-17T04:12Z
Compiled with protoc 2.5.0
From source with checksum 60125541c2b3e266cbf3becc5bda666
This command was run using /home/vagrant/hadoop-2.8.0/share/hadoop/common/hadoop-common-2.8.0.jar

いくつかサンプルプログラムを動かしてみよう。 これには hadoop コマンドに付属のサンプルプログラムの入った jar ファイルを指定する。 例えば、これは円周率を計算するもの。

$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.0.jar pi 10 100000
Number of Maps  = 10
Samples per Map = 100000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
...(snip)...
Job Finished in 1.526 seconds
Estimated value of Pi is 3.14155200000000000000

それ以外には、ディレクトリ内のファイルから特定の文字列を検索する grep とか。 ローカルモードでは HDFS を使わないので OS のファイルシステムを直接使うことになる。

検索対象として input ディレクトリの中に Hadoop の設定ファイルをコピーしておこう。

$ mkdir input
$ cp etc/hadoop/*.xml input

input ディレクトリの中にあるファイルから dfs から始まる言葉を探してみる。

$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.0.jar grep input output 'dfs[a-z.]+'

結果は output ディレクトリに出力される。 この例では dfsadmin という言葉が一つだけ見つかった。

$ cat output/*
1  dfsadmin

同じ文字列を使って設定ファイルを egrep すると、たしかに一つだけ見つかる。

$ egrep 'dfs[a-z.]+' etc/hadoop/*.xml
etc/hadoop/hadoop-policy.xml:    dfsadmin and mradmin commands to refresh the security policy in-effect.

疑似分散モード

続いては疑似分散モードで動かしてみる。 疑似分散モードでは一つのホストで完結しているもののファイルシステムとして HDFS を使うことになる。 これによって、より実運用に近づけた状態で動作確認ができる。

SSH でログインできるようにする

Hadoop は SSH 経由でホストを操作するので、あらかじめログインできるようにしておかないといけない。 そこで、パスワードなしの公開鍵のペアを作ってローカルホストに仕込んでおく。

$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
$ ssh-copy-id -i ~/.ssh/id_rsa.pub localhost

ログインできることを確認しておこう。

$ ssh localhost

設定ファイル: etc/hadoop/core-site.xml

続いては設定ファイルを編集していく。

まずは etc/hadoop/core-site.xml に接続先ファイルシステム (HDFS) の設定を仕込む。

$ cat << 'EOF' > /tmp/core-site.xml.property
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/core-site.xml.property
  /^$/d
' etc/hadoop/core-site.xml

こんな感じ。 これでローカルホストの HDFS に接続するようになる。

$ tail -n 6 etc/hadoop/core-site.xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

設定ファイル: etc/hadoop/hdfs-site.xml

続いては etc/hadoop/hdfs-site.xml で HDFS の設定をする。 ここでは、ファイルのレプリケーション数を 1 に設定する。 HDFS では冗長性を担保するために、複数のホストで同じデータを重複して持つ。 これが HDFS のレプリケーション数と呼ばれるもので、デフォルトでは 3 になっている。 シングルホストで動く疑似分散モードでは、レプリケーション数は 1 にするしかない。

$ cat << 'EOF' > /tmp/hdfs-site.xml.property
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/hdfs-site.xml.property
  /^$/d
' etc/hadoop/hdfs-site.xml

こんな感じになる。

$ tail -n 6 etc/hadoop/hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

HDFS をセットアップする

ここまで設定できたら HDFS をフォーマットする。

$ bin/hdfs namenode -format

そして、必要なサービス (デーモン) を起動する。

$ sbin/start-dfs.sh

上記を実行したら、ログにエラーメッセージなどが出ていないことを確認しておこう。

$ grep -ir ERROR logs/*

また、次のように jps を確認してネームノード、データノード、セカンダリネームノードが動作していれば良い。 ネームノードは HDFS のメタデータなどを保持するマスターサーバのこと。 データノードは、実際のデータを格納しているスレーブサーバのようなもの。 セカンダリネームノードはその名の通りネームノードのセカンダリ。

$ jps
28720 NameNode
28849 DataNode
29012 SecondaryNameNode
29129 Jps

ここまでで hdfs コマンドが使えるようになる。 このコマンドを経由して HDFS の操作を行う。 使い方は一般的な POSIX のファイルシステムと基本的には似通っている。 ひとまず作業用にホームディレクトリを作ってみよう。

$ bin/hdfs dfs -mkdir -p /user/$(whoami)

すると、こんな感じでディレクトリが作られる。

$ bin/hdfs dfs -ls -R /
drwxr-xr-x   - vagrant supergroup          0 2017-05-15 21:16 /user
drwxr-xr-x   - vagrant supergroup          0 2017-05-15 21:16 /user/vagrant

ちなみに、HDFS で操作した実体はもちろん OS のファイルシステム上に存在している。 デフォルトでは /tmp/hadoop-<username> に作られる。

$ ls /tmp/hadoop-$(whoami)

先ほどと同じように grep のサンプルプログラムを動かすために input ディレクトリを作る。 その上でローカルのファイルシステムにある設定ファイルを HDFS のディレクトリ内にコピーしよう。

$ bin/hdfs dfs -mkdir input
$ bin/hdfs dfs -put etc/hadoop/*.xml input

上記ができたらサンプルプログラムを実行する。

$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.0.jar grep input output 'dfs[a-z.]+'

実行が終わったら結果が出力されているディレクトリを HDFS からローカルのファイルシステムにコピーする。

$ bin/hdfs dfs -get output output2

結果を見ると、次のようになった。

$ cat output2/*
1  dfsadmin
1  dfs.replication

二つに増えているのは疑似分散モードで動かすための設定に dfs から始まる文字列が含まれていたため。

$ egrep 'dfs[a-z.]+' etc/hadoop/*.xml
etc/hadoop/hadoop-policy.xml:    dfsadmin and mradmin commands to refresh the security policy in-effect.
etc/hadoop/hdfs-site.xml:    <name>dfs.replication</name>

ちなみに、後片付けとして HDFS のサービス (デーモン) を停止するには、次のようにする。

$ sbin/stop-dfs.sh

YARN/MRv2 で動かす

実は Hadoop には一口に MapReduce といっても複数の実装がある。 それが、先ほど動かした MRv1 と、これから動かす YARN/MRv2 というもの。 まだ違いがよく分かっていないんだけど、YARN/MRv2 は MRv1 に存在した問題を解消するために作られたらしい。 主に、スケーラビリティや汎用性が改善されているっぽい。

設定ファイル: etc/hadoop/mapred-site.xml

ここからは、また設定ファイルを編集していく。

まずは設定ファイル etc/hadoop/mapred-site.xml のテンプレートをコピーしてくる。

$ cp etc/hadoop/mapred-site.xml{.template,}

その上で MapReduce のフレームワークとして YARN が使われるようにする。

$ cat << 'EOF' > /tmp/mapred-site.xml.property
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/mapred-site.xml.property
  /^$/d
' etc/hadoop/mapred-site.xml

こんな感じ。

$ tail -n 6 etc/hadoop/mapred-site.xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

設定ファイル: etc/hadoop/yarn-site.xml

次は YARN 自体の設定を etc/hadoop/yarn-site.xml に書き込む。

$ cat << 'EOF' > /tmp/yarn-site.xml.property
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/yarn-site.xml.property
  /^$/d
' etc/hadoop/yarn-site.xml

こうなる。

$ tail -n 7 etc/hadoop/yarn-site.xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
<!-- Site specific YARN configuration properties -->
</configuration>

あとは YARN のサービス (デーモン) を起動する。

$ sbin/start-yarn.sh

先ほどと同じようにログにエラーメッセージが出ていないか確認しておく。

$ grep -ir ERROR logs/*

次のようにリソースマネージャとノードマネージャが起動していれば良い。

$ jps
28720 NameNode
28849 DataNode
29602 Jps
29012 SecondaryNameNode
29194 ResourceManager
29309 NodeManager

YARN/MRv2 でサンプルプログラムを動かしてみる

YARN/MRv2 は MapReduce の実行フレームワークが異なるだけで基本的には MRv1 と同じプログラムが使えるらしい。

先ほどと同じようにサンプルプログラムを動かしてみよう。 一旦、さっき使ったディレクトリを削除して、改めて設定ファイルをコピーしておく。 今度は XML 以外のファイルも対象に含めてみる。

$ bin/hdfs dfs -rm -r -f input output
$ bin/hdfs dfs -put etc/hadoop input

grep のサンプルプログラムを実行する。 理由は分からないけど、今回はやたらと時間がかかった。 ただファイルが増えただけが理由なのか YARN/MRv2 を使っているせいなのか。

$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.0.jar grep input output 'dfs[a-z.]+'

終わったら実行結果を HDFS からローカルのファイルシステムにコピーする。

$ bin/hdfs dfs -get output output3

次のように dfs から始まる文字列が見つかった。

$ cat output3/*
6  dfs.audit.logger
4  dfs.class
3  dfs.logger
3  dfs.server.namenode.
2  dfs.audit.log.maxbackupindex
2  dfs.period
2  dfs.audit.log.maxfilesize
1  dfs.log
1  dfs.file
1  dfs.servers
1  dfsadmin
1  dfsmetrics.log
1  dfs.replication

後片付けするときは次のようにして YARN のサービス (デーモン) を停止する。

$ sbin/stop-yarn.sh

まとめ

今回は素の Apache Hadoop を CentOS7 の上で使ってみた。 動作モードとしてはシングルホスト上で HDFS と共に動作する疑似分散モードを選んだ。 実運用するなら完全分散モードになるけど、手元での動作検証用なら有用といえそうだ。 ビッグデータを扱う上で Apache Hadoop 関連のプロダクトは避けて通れないので、今後も知見を集めていきたい。

Apache Hadoop の概念や動作原理的な部分は、この本が分かりやすかった。 ただし、この本では発売時期的に MRv1 しか扱っておらず YARN/MRv2 については書かれていない。

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Python: データパイプライン構築用フレームワーク Luigi を使ってみる

最近になって、バッチ処理においてデータパイプラインを組むためのフレームワークとして Luigi というものがあることを知った。 これは、Spotify という音楽のストリーミングサービスを提供する会社が作ったものらしい。 似たような OSS としては他にも Apache Airflow がある。 こちらは民宿サービスを提供する Airbnb が作ったものだけど、最近 Apache に寄贈されてインキュベータープロジェクトとなった。

Luigi の特徴としては、バッチ処理に特化している点が挙げられる。 ただし、定期的にバッチ処理を実行するような機能はない。 そこは、代わりに cron や systemd timer を使ってやる必要がある。 また、本体もそうだけどデータパイプラインについても Python を使って書く必要がある。

今回は、そんな Luigi を一通り触ってみることにする。 使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195
$ python --version
Python 3.5.3

インストールする

インストールは Python のパッケージマネージャの pip を使ってさくっとできる。

$ pip install luigi
$ pip list --format=columns | grep luigi
luigi           2.6.1

ハローワールド

まず、Luigi における最も重要な概念としてタスクというものを知る必要がある。 タスクというのは、ユーザが Luigi にやらせたい何らかの仕事を細分化した最小単位となる。 そして、タスクを定義するには Task というクラスを継承したサブクラスを作る。

次のサンプルコードでは、最も単純なタスクを定義している。 このタスクでは run() メソッドをオーバーライドしてメッセージをプリントするようになっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


# Luigi で実行したい仕事は Task を継承したクラスにする
class Greet(luigi.Task):

    def run(self):
        """run() メソッドで具体的な処理を実行する"""
        print('Hello, World!')


def main():
    luigi.run()


if __name__ == '__main__':
    main()

上記のタスクを実行するには、次のようにする。 第二引数に渡している Greet が、実行したいタスク (のクラス名) を指している。 --local-scheduler オプションは、タスクのスケジューラをローカルモードに指定している。 スケジューラにはローカルとセントラルの二種類があるんだけど、詳しくは後ほど説明する。 とりあえず開発用途で使うならローカル、と覚えておけば良いかな。

$ python helloworld.py Greet --local-scheduler

実際に実行してみると、次のような出力が得られる。 実行サマリーとして Greet タスクが成功していることや、ログの中にメッセージが出力されていることが見て取れる。

$ python helloworld.py Greet --local-scheduler
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7082] Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) running   Greet()
Hello, World!
INFO: [pid 7082] Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=970230875, workers=1, host=macbookair.local, username=amedama, pid=7082) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

上記は、代わりに次のようにすることもできる。 このやり方では、まず python コマンドで luigi モジュールを実行して、そこ経由でタスクを実行している。

$ python -m luigi --module helloworld Greet --local-scheduler

このやり方の利点としては、パッケージ化されたインストール済みのタスクを実行できる点が挙げられる。 先ほどはカレントワーキングディレクトリ (CWD) にある Python ファイル (モジュール) を直接指定して実行していた。 それに対し、こちらは Python のパスにもとづいてモジュールが実行されるので、インストール済みのモジュールなら CWD の場所に関わらず実行できる。 そして、python コマンドのパスには CWD も含まれるので、先ほどと同じように実行できるというわけ。

もし、Python のモジュールとかがよく分からないというときは必要に応じて以下を参照してもらえると。 ようするに言いたいことは Python においてモジュールって呼ばれてるのは単なる Python ファイルのことだよ、という点。

blog.amedama.jp

また、もう一つのやり方として luigi コマンドを使うこともできる。 この場合も、実行するモジュールは --module オプションで指定する。 ただし、このやり方だと Python のパスに CWD が含まれない。 そのため CWD にあるモジュールを実行したいときは別途 PYTHONPATH 環境変数を指定する必要がある。

$ PYTHONPATH=. luigi --module helloworld Greet --local-scheduler

実行するタスクをモジュール内で指定する

とはいえ、毎回コマンドラインで実行するモジュールを指定するのも面倒だと思う。 そんなときは、次のようにして luigi.run() に実行するときのパラメータを指定してやると良さそう。 開発や検証用途なら、こうしておくと楽ちん。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def run(self):
        print('Hello, World!')


def main():
    # 起動したいタスクを指定して Luigi を実行する
    luigi.run(main_task_cls=Greet, local_scheduler=True)
    # あるいは
    # task = Greet()
    # luigi.build([task], local_scheduler=True)


if __name__ == '__main__':
    main()

上記のようにしてあればファイルを指定するだけで実行できるようになる。

$ python easyrun.py 
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7165] Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) running   Greet()
Hello, World!
INFO: [pid 7165] Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=224720963, workers=1, host=macbookair.local, username=amedama, pid=7165) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

ターゲットを指定する

Luigi において、もう一つ重要な概念がターゲットというもの。 これは、タスクを実行した結果を永続化する先を表している。 例えば、永続化する先はローカルディスクだったり Amazon S3 だったり HDFS だったりする。 タスクをチェーンしていく場合、このターゲットを介してデータをやり取りする。 つまり、一つ前のタスクが出力したターゲットの内容が、次のタスクの入力になるというわけ。

ちなみに、実は Luigi ではターゲットの指定がほとんど必須といってよいものになっている。 これまでの例では指定してなかったけど、それは単にすごくシンプルだったので何とかなっていたに過ぎない。 実際のところ、実行するときに次のような警告が出ていたのはターゲットを指定していなかったのが原因になっている。

/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()

次のサンプルコードでは、ターゲットとしてローカルディスクを指定している。 永続化する先としてローカルディスクを指定するときは LocalTarget というターゲットを使う。 ターゲットを指定するにはタスクのクラスに output() というメソッドをオーバーライドする。 そして、タスクの本体ともいえる run() メソッドでは output() メソッドで得られるターゲットに対して実行結果を書き込む。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        # Task の実行結果を保存する先を Target で指定する
        # LocalTarget はローカルディスクにファイルとして残す
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        # output() メソッドで保存先の Target を取得する
        out = self.output()
        # ファイルを開く
        with out.open('w') as f:
            # 実行結果を書き込む
            f.write('Hello, World!\n')


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

それでは、上記を実行してみよう。 今回は、先ほどは表示されていた警告が出ていない点に注目してほしい。

$ python output.py 
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7230] Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) running   Greet()
INFO: [pid 7230] Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=707987256, workers=1, host=macbookair.local, username=amedama, pid=7230) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

そして、実行すると greeting.txt というファイルがディレクトリに作られている。

$ cat greeting.txt
Hello, World!

このように Luigi ではタスクの実行結果を永続化する。

テスト用途としてメモリをターゲットにする

先ほどの例ではターゲットとしてローカルディスクを使った。 ちなみに、テスト用途としてならターゲットをメモリにすることもできる。

次のサンプルコードでは MockTarget を使うことで実行結果を保存する先をメモリにしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class Greet(luigi.Task):

    def output(self):
        # MockTarget はオンメモリにしか実行結果が残らない
        # 内部実装には io.Bytes が使われている
        # あくまでユニットテスト用途
        return MockTarget(self.__class__.__name__)

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

念のため、先ほど出力されたファイルを削除した上で上記を実行してみよう。

$ rm greeting.txt
$ python mock.py 
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7256] Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) running   Greet()
INFO: [pid 7256] Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=817197119, workers=1, host=macbookair.local, username=amedama, pid=7256) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

当然だけど、今度はファイルができない。

$ cat greeting.txt
cat: greeting.txt: No such file or directory

複数のタスクをチェーンする

これまでの例では一つのタスクを実行するだけだった。 現実世界の仕事では、タスクが一つだけということは滅多にないはず。 もちろん、色々なことをする巨大な一つのタスクを書くことも考えられるけど、それだと Luigi を使う魅力は半減してしまう。 なるべく、それ以上は細分化できないレベルまで仕事を小さくして、それをタスクに落とし込む。

次のサンプルコードでは Greet というタスクと Repeat というタスクをチェーンしている。 Repeat の実行には、まず Greet の実行が必要になる。 このような依存関係を表現するには、タスクのクラスに requires() というメソッドをオーバーライドする。 サンプルコードの内容としては Greet で出力された内容を Repeat で複数繰り返して出力する、というもの。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Repeat(luigi.Task):

    def requires(self):
        """Task の実行に必要な別の Task を指定する"""
        return Greet()

    def output(self):
        return luigi.LocalTarget('repeating.txt')

    def run(self):
        # requires() で指定した Task の実行結果 (Target) は input() メソッドで得られる
        input_ = self.input()
        output = self.output()

        with input_.open('r') as r, output.open('w') as w:
            # Greet が出力した内容を読み込む
            lines = r.readlines()

            # Greet の出力を 3 回繰り返し書き込む
            for _ in range(3):
                w.writelines(lines)


def main():
    luigi.run(main_task_cls=Repeat, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python requires.py 
DEBUG: Checking if Repeat() is complete
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Repeat__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) running   Greet()
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) running   Repeat()
INFO: [pid 7292] Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) done      Repeat()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Repeat__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=268804510, workers=1, host=macbookair.local, username=amedama, pid=7292) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 Greet()
    - 1 Repeat()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると、次のようにして実行結果のファイルが作られる。

$ cat greeting.txt 
Hello, World!
$ cat repeating.txt 
Hello, World!
Hello, World!
Hello, World!

ちなみに requires() メソッドをオーバーライドせずに依存関係を表現することもできるにはできる。 それは、次のように run() メソッドの中で直接タスクをインスタンス化して yield でターゲットを取得する方法。 これは、Dynamic dependencies と呼ばれている。 とはいえ、タスクの依存関係が分かりにくくなるので、使うのはなるべく避けた方が良い気がする。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Repeat(luigi.Task):

    def output(self):
        return luigi.LocalTarget('repeating.txt')

    def run(self):
        # requires() せずに直接 Task から yield するやり方もある
        input_ = yield Greet()
        output = self.output()

        with input_.open('r') as r, output.open('w') as w:
            lines = r.readlines()

            for _ in range(3):
                w.writelines(lines)


def main():
    luigi.run(main_task_cls=Repeat, local_scheduler=True)


if __name__ == '__main__':
    main()

また、依存するタスクは一つとは限らない。 そんなときは requires() メソッドの中でリストの形で依存するタスクを列挙する。 あるいは yield で一つずつ列挙していっても構わない。 ただし yield を使うやり方だと、後から依存タスクの実行を並列化したいときも、必ず直列に実行されるようになるらしい。 逆に言えば直列に実行することを担保したいときは yield を使うのが良さそう。 また、複数のタスクを集約して実行するためだけのタスクについては WrapperTask を継承すると良いらしい。

次のサンプルコードでは GreetEat そして Sleep というタスクを実行するためのタスクとして RequiresOnly を定義している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class OnMemoryTask(luigi.Task):

    def output(self):
        return MockTarget(self.__class__.__name__)


class Greet(OnMemoryTask):

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Eat(OnMemoryTask):

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Mog, Mog\n')


class Sleep(OnMemoryTask):

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Zzz...\n')


class RequiresOnly(luigi.WrapperTask):

    def requires(self):
        # 依存している Task は複数書ける
        return [Greet(), Eat(), Sleep()]
        # あるいは
        # yield Greet()
        # yield Eat()
        # yield Sleep()
        # としても良い


def main():
    luigi.run(main_task_cls=RequiresOnly, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。 タスクの並列度を上げるには --workers オプションを指定する。

$ python reqonly.py --workers 3
DEBUG: Checking if RequiresOnly() is complete
DEBUG: Checking if Greet() is complete
DEBUG: Checking if Eat() is complete
DEBUG: Checking if Sleep() is complete
INFO: Informed scheduler that task   RequiresOnly__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Sleep__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Eat__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 3 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7362] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   Eat()
DEBUG: 3 running tasks, waiting for next task to finish
INFO: [pid 7362] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      Eat()
INFO: [pid 7363] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   Greet()
INFO: [pid 7363] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      Greet()
INFO: Informed scheduler that task   Eat__99914b932b   has status   DONE
INFO: [pid 7364] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   Sleep()
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: Greet__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
DEBUG: Sleep__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: Sleep__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: [pid 7364] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      Sleep()
INFO: Informed scheduler that task   Sleep__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: RequiresOnly__99914b932b is currently run by worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360)
INFO: [pid 7365] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) running   RequiresOnly()
INFO: [pid 7365] Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) done      RequiresOnly()
INFO: Informed scheduler that task   RequiresOnly__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=629562084, workers=3, host=macbookair.local, username=amedama, pid=7360) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 ran successfully:
    - 1 Eat()
    - 1 Greet()
    - 1 RequiresOnly()
    - 1 Sleep()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

タスクにパラメータを受け取る

タスクが動作するときに色々なパラメータを受け取りたい、という場面も多いはず。 そんなときは *Parameter を使えば良い。 次のサンプルコードでは、前述した Repeat タスクで繰り返しの数をパラメータ化している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def output(self):
        return luigi.LocalTarget('greeting.txt')

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


class Repeat(luigi.Task):
    # Task の実行に必要な引数を Parameter として受け取る
    repeat_n = luigi.IntParameter(default=3)

    def requires(self):
        return Greet()

    def output(self):
        return luigi.LocalTarget('repeating.txt')

    def run(self):
        input_ = self.input()
        output = self.output()

        with input_.open('r') as r, output.open('w') as w:
            lines = r.readlines()

            # Parameter は Task のアトリビュートとして使える
            for _ in range(self.repeat_n):
                w.writelines(lines)


def main():
    luigi.run(main_task_cls=Repeat, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行する前に、先ほど生成されたファイルを削除しておく。 これは、実行結果のファイルがあるときは実行がスキップされるため。 この動作は何のターゲットを使っても基本的にそうなっている。

$ rm repeating.txt

実行してみよう。 パラメータはコマンドラインから渡す。 例えば、今回の例では --repeat-n オプションになる。

$ python params.py --repeat-n=5
DEBUG: Checking if Repeat(repeat_n=5) is complete
DEBUG: Checking if Greet() is complete
INFO: Informed scheduler that task   Repeat_5_96577e160e   has status   PENDING
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7436] Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) running   Repeat(repeat_n=5)
INFO: [pid 7436] Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) done      Repeat(repeat_n=5)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Repeat_5_96577e160e   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=344136773, workers=1, host=macbookair.local, username=amedama, pid=7436) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 present dependencies were encountered:
    - 1 Greet()
* 1 ran successfully:
    - 1 Repeat(repeat_n=5)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

サマリーから Greet タスクはファイルがあるので実行されていないことが分かる。

Repeat タスクのターゲットは、パラメータに応じて再実行された。

$ cat repeating.txt 
Hello, World!
Hello, World!
Hello, World!
Hello, World!
Hello, World!

このように Luigi は実行結果があるときは実行をスキップするようになっている。 そのため、途中でタスクが失敗したときも原因を取り除いて再実行したとき最小限の処理でやり直すことができる。

特定期間のバッチ処理を実行する

例えば、あるバッチ処理が毎日決まった時間に実行されているとしよう。 それが、何らかの原因で失敗して、その日のうちに解決できなかったとする。 こんなときは、基本的に失敗した日の内容も後からリカバーしなきゃいけない。 ただ、当日以外のタスクを実行するというのは、そのように考えられて作っていないと意外と難しいもの。 Luigi だと、そういったユースケースもカバーしやすいように作られている。

例えば、次のようにして処理対象の日付を受け取って実行するようなタスクがあるとする。 こんな風に、処理対象の日付を指定するように作っておくのはバッチ処理でよく使われるパターンらしい。 また、生成されるターゲットのファイルに日付を含むようにしておくのもポイント。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class DailyGreet(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget('daily-target-{}.txt'.format(str(self.date)))

    def run(self):
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')


def main():
    luigi.run(main_task_cls=DailyGreet, local_scheduler=True)


if __name__ == '__main__':
    main()

ひとまず、通常通り実行してみることにしよう。

$ python daily.py --date=2017-5-13
DEBUG: Checking if DailyGreet(date=2017-05-13) is complete
INFO: Informed scheduler that task   DailyGreet_2017_05_13_284b40e6ab   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7481] Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) running   DailyGreet(date=2017-05-13)
INFO: [pid 7481] Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) done      DailyGreet(date=2017-05-13)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   DailyGreet_2017_05_13_284b40e6ab   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=817061721, workers=1, host=macbookair.local, username=amedama, pid=7481) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 DailyGreet(date=2017-05-13)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると、実行結果のファイルが日付付きで作られる。

$ cat daily-target-2017-05-13.txt 
Hello, World!

Luigi では、このように日付を含むタスクを期間指定で一気に片付けるような方法が用意されている。 具体的には、次のようにして daily モジュールの RangeDailyBase タスクを指定した上で、--of オプションで実行したいタスクを指定する。 その上で、実行する日付を --start--stop オプションで指定する。

$ python -m luigi --module daily RangeDailyBase --of DailyGreet --start 2017-05-01 --stop 2017-05-12 --local-scheduler
...(省略)...
===== Luigi Execution Summary =====

Scheduled 12 tasks of which:
* 12 ran successfully:
    - 11 DailyGreet(date=2017-05-01...2017-05-11)
    - 1 RangeDailyBase(...)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると、範囲内の日付でタスクが一気に実行される。

$ ls daily-target-2017-05-*
daily-target-2017-05-01.txt daily-target-2017-05-07.txt
daily-target-2017-05-02.txt daily-target-2017-05-08.txt
daily-target-2017-05-03.txt daily-target-2017-05-09.txt
daily-target-2017-05-04.txt daily-target-2017-05-10.txt
daily-target-2017-05-05.txt daily-target-2017-05-11.txt
daily-target-2017-05-06.txt daily-target-2017-05-13.txt

例えば、数日前から今日の日付までの範囲を指定して、先ほどの内容を cron などで指定しておくとする。 そうすれば、実行結果のあるものについてはスキップされることから、まだ完了していないものだけを実行できる。 もし一過性の問題で失敗していたとしても、翌日のバッチ処理などで自動的にリカバーされるというわけ。

組み込みのタスクを活用する

Luigi には、色々な外部コンポーネントやサービス、ライブラリと連携するための組み込みタスクが用意されている。 それらは contrib パッケージの中にあって、ざっと見ただけでも例えば Hadoop や Kubernetes などがある。

github.com

今回は一例として Python のオブジェクト・リレーショナルマッパーの一つである SQLAlchemy との連携を試してみる。

まずは SQLAlchemy をインストールしておく。

$ pip install sqlalchemy

次のサンプルコードでは UsersTask が出力する内容を SQLite3 のデータベースに保存している。 luigi.contrib.sqla.CopyToTable を継承したタスクは、依存するタスクからタブ区切りのターゲットを受け取って、内容を各カラムに保存していく。 今回はユーザ情報を模したテーブルを作るようにしてみた。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget
from luigi.contrib import sqla

from sqlalchemy import String
from sqlalchemy import Integer


class UsersTask(luigi.Task):

    def output(self):
        return MockTarget(self.__class__.__name__)

    def run(self):
        out = self.output()
        with out.open('w') as f:
            # 実行結果をタブ区切りで書き込んでいく
            f.write('Alice\t24\n')
            f.write('Bob\t30\n')
            f.write('Carol\t18\n')


# 実行結果を SQLAlchemy 経由で RDB に保存するための Task
class SQLATask(sqla.CopyToTable):
    # SQLAlchemy のテーブル定義
    columns = [
        (['name', String(64)], {'primary_key': True}),
        (['age', Integer()], {})
    ]
    # データベースへの接続 URL
    connection_string = 'sqlite:///users.db'
    # 保存するのに使うテーブル名
    table = 'users'

    def requires(self):
        # 実行結果がタブ区切りになっていれば、あとは依存先に指定するだけ
        return UsersTask()


def main():
    luigi.run(main_task_cls=SQLATask, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみる。

$ python sqla.py        
DEBUG: Checking if SQLATask() is complete
DEBUG: Checking if UsersTask() is complete
INFO: Informed scheduler that task   SQLATask__99914b932b   has status   PENDING
INFO: Informed scheduler that task   UsersTask__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) running   UsersTask()
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) done      UsersTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   UsersTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) running   SQLATask()
INFO: Running task copy to table for update id SQLATask__99914b932b for table users
INFO: Finished inserting 0 rows into SQLAlchemy target
INFO: Finished inserting rows into SQLAlchemy target
INFO: [pid 7638] Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) done      SQLATask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   SQLATask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=923323852, workers=1, host=macbookair.local, username=amedama, pid=7638) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 SQLATask()
    - 1 UsersTask()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

すると SQLite3 のデータベースができる。

$ file users.db 
users.db: SQLite 3.x database

中身を見るとユーザ情報が保存されている。

$ sqlite3 users.db "SELECT * FROM users"
Alice|24
Bob|30
Carol|18

セントラルスケジューラを使う

前述した通り Luigi にはローカルスケジューラとセントラルスケジューラのモードがある。 これまでの例はローカルスケジューラを使うものだった。 次はセントラルスケジューラを使ってみることにする。

まず、Luigi のセントラルスケジューラには複数のジョブが同時に実行されないよう調整する機能が備わっている。 そして、タスクの状態などを確認するための WebUI なども付属する。 ただし、スケジューラ自体にはタスクを実行する機能はなくて、あくまで上記の二つの機能だけが提供されている。 つまり、タスクの実行自体はセントラルスケジューラに接続したクライアント (ワーカー) の仕事になる。 また、繰り返しになるけど定期・繰り返し実行などの機能はないので cron や systemd timer を使うことになる。

セントラルスケジューラは luigid というコマンドで起動する。

$ luigid

これで TCP:8082 ポートで WebUI が立ち上がる。

$ open http://localhost:8082

試しにセントラルスケジューラを使った状態でタスクを実行してみよう。 WebUI に成功 (done) なタスクが増えるはず。

$ python helloworld.py Greet

その他、セントラルスケジューラを動作させるときの設定などについては、このページを参照する。

Using the Central Scheduler — Luigi 2.6.1 documentation

成功・失敗時のコールバックを使う

タスクが成功したり失敗したときに何かしたいというニーズはあると思う。 例えば失敗したとき Slack とかのチャットに通知を送るとか。

次のサンプルコードでは、タスクに on_success() メソッドをオーバーライドしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def run(self):
        print('Hello, World!')

    def on_success(self):
        """Task が成功したときのコールバック"""
        print('SUCCESS!')


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。 成功時のコールバックが呼び出されていることが分かる。

$ python event.py 
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8126] Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) running   Greet()
Hello, World!
SUCCESS!
INFO: [pid 8126] Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) done      Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=727065743, workers=1, host=macbookair.local, username=amedama, pid=8126) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

より広範囲に、どのタスクの成功や失敗でも呼び出されるようにしたいときは次のようにする。 @luigi.Task.event_handler デコレータでコールバックを修飾した上で、呼び出すタイミングを引数で指定する。 今回は luigi.Event.SUCCESS を指定しているのでタスクが成功したときに呼び出される。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import luigi


class Greet(luigi.Task):

    def run(self):
        print('Hello, World!')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def success_handler(task):
    """Task が成功したときのコールバック (汎用)"""
    print('SUCCESS: {}'.format(task))


def main():
    luigi.run(main_task_cls=Greet, local_scheduler=True)


if __name__ == '__main__':
    main()

こちらも実行してみよう。 コールバックが呼び出されていることが分かる。

$ python event2.py 
DEBUG: Checking if Greet() is complete
/Users/amedama/.virtualenvs/py35/lib/python3.5/site-packages/luigi/worker.py:328: UserWarning: Task Greet() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   Greet__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8136] Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) running   Greet()
Hello, World!
INFO: [pid 8136] Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) done      Greet()
SUCCESS: Greet()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Greet__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=042982575, workers=1, host=macbookair.local, username=amedama, pid=8136) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 Greet()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

まとめ

今回は Python でバッチ処理を作るときデータパイプラインを構築するためのフレームワークである Luigi を使ってみた。 触ってみた感触としては部分的にクセはあるものの、なかなかシンプルな作りで用途にハマれば使いやすそうだ。

Mac OS X で Apache Spark を触ってみる

最近 Apache Spark について耳にすることが多い。 Apache Spark は、ビッグデータ処理における並列分散処理基盤を提供する OSS の一つ。 似たような用途としては Apache Hadoop も有名だけど、それよりも最大で 100 倍ほど高速に動作するんだとか。 高速に動作する理由としては、各ノードのメモリに乗り切るサイズのデータならディスクを介さずに扱える点が大きいらしい。

今回は、そんな Apache Spark を Mac OS X で軽く触ってみることにする。 本来であれば、用途的には複数のノードを用意して並列分散処理をさせるところだけど使うのは一つのノードだけ。 また Apache Spark を操作するには Java, Scala, Python のインターフェースがある。 その中でも、今回は Python のインターフェース (PySpark) を使ってみることにした。

環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195

インストール

まずは Apache Spark の動作に JRE が必要なのでインストールしておく。 インストールには Homebrew Cask を使うと楽できる。

$ brew cask install java

Apache Spark のインストールは Homebrew を使ってさくっといける。

$ brew install apache-spark

インタラクティブシェルから触ってみる

Apache Spark を Python から扱うには pyspark というコマンドを使う。 このコマンドを起動すると、Python から Apache Spark を扱う上で必要なパッケージなどが自動的にインポートされる。 それ以外については、特に普段使っている REPL と違いはないようだ。

$ pyspark
Python 2.7.10 (default, Feb  6 2017, 23:53:20) 
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/04 18:42:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/04 18:43:13 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/05/04 18:43:13 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/05/04 18:43:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.10 (default, Feb  6 2017 23:53:20)
SparkSession available as 'spark'.
>>> 

サイトパッケージの場所を調べてもシステムの Python の場所になっているし。

>>> from pip import locations
>>> locations.user_site
'/Users/amedama/Library/Python/2.7/lib/python/site-packages'

これは、どうやらデフォルトの python コマンドで起動されるものが使われているだけっぽい? 試しに virtualenv を使ってデフォルトが Python 3.5 になるようにしてみる。

$ python --version
Python 3.5.3

この状態で pyspark コマンドを実行すると、ちゃんと Python 3.5 が使われるようになった。 予想は当たっていたようだ。

$ pyspark    
Python 3.5.3 (default, Feb 26 2017, 01:47:55) 
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/04 19:29:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/04 19:30:00 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 3.5.3 (default, Feb 26 2017 01:47:55)
SparkSession available as 'spark'.
>>> 

インタラクティブシェルを終了するときは通常の Python の REPL と同じように Ctrl-D とか exit() 関数とかで。

>>> exit()

テキストファイルを処理してみる

それでは、次は実際に Apache Spark でテキストファイルを扱ってみることにしよう。 題材は Apache Spark の README ファイルにする。

$ brew install wget
$ wget https://raw.githubusercontent.com/apache/spark/master/README.md

あと、なんか操作していると随所で psutil 入れた方が良いよっていう警告が出るので入れておく。

$ pip install psutil

インタラクティブシェルを起動しよう。

$ pyspark

PySpark のインタラクティブシェルでは sc という SparkContext のインスタンスが処理の取っ掛かりになるみたい。 これは、あらかじめインタラクティブシェルを起動した時点で用意されている。

>>> sc
<pyspark.context.SparkContext object at 0x1068cf810>

まずは SparkContext#textFile() メソッドでテキストファイルを読み込む。

>>> textfile = sc.textFile("README.md")

これで得られるインスタンスは Resilient Distributed Dataset (RDD) と呼ばれる形式になっている。 基本的に Apache Spark では、この RDD という単位でデータを扱うことになる。

>>> textfile
README.md MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

この RDD には、あらかじめ色々な API が用意されている。 例えば RDD に含まれるデータの数は count() というメソッドで得られる。

>>> textfile.count()
103

他にも、データセットの最初の要素を取り出す first() だとか。

>>> textfile.first()
u'# Apache Spark'

データセットに含まれる要素全てを得るには collect() などを使う。

>>> textfile.collect()
[u'# Apache Spark', u'', u'Spark is a fast and general cluster computing system for Big Data. It provides', u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', u'supports general computation graphs for data analysis. It also supports a', u'rich set of higher-level tools including Spark SQL for SQL and DataFrames,', u'MLlib for machine learning, GraphX for graph processing,', u'and Spark Streaming for stream processing.', u'', u'<http://spark.apache.org/>', u'', u'', u'## Online Documentation', u'', u'You can find the latest Spark documentation, including a programming', u'guide, on the [project web page](http://spark.apache.org/documentation.html).', u'This README file only contains basic setup instructions.', u'', u'## Building Spark', u'', u'Spark is built using [Apache Maven](http://maven.apache.org/).', u'To build Spark and its example programs, run:', u'', u'    build/mvn -DskipTests clean package', u'', u'(You do not need to do this if you downloaded a pre-built package.)', u'', u'You can build Spark using more than one thread by using the -T option with Maven, see ["Parallel builds in Maven 3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).', u'More detailed documentation is available from the project site, at', u'["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).', u'', u'For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](http://spark.apache.org/developer-tools.html).', u'', u'## Interactive Scala Shell', u'', u'The easiest way to start using Spark is through the Scala shell:', u'', u'    ./bin/spark-shell', u'', u'Try the following command, which should return 1000:', u'', u'    scala> sc.parallelize(1 to 1000).count()', u'', u'## Interactive Python Shell', u'', u'Alternatively, if you prefer Python, you can use the Python shell:', u'', u'    ./bin/pyspark', u'', u'And run the following command, which should also return 1000:', u'', u'    >>> sc.parallelize(range(1000)).count()', u'', u'## Example Programs', u'', u'Spark also comes with several sample programs in the `examples` directory.', u'To run one of them, use `./bin/run-example <class> [params]`. For example:', u'', u'    ./bin/run-example SparkPi', u'', u'will run the Pi example locally.', u'', u'You can set the MASTER environment variable when running examples to submit', u'examples to a cluster. This can be a mesos:// or spark:// URL,', u'"yarn" to run on YARN, and "local" to run', u'locally with one thread, or "local[N]" to run locally with N threads. You', u'can also use an abbreviated class name if the class is in the `examples`', u'package. For instance:', u'', u'    MASTER=spark://host:7077 ./bin/run-example SparkPi', u'', u'Many of the example programs print usage help if no params are given.', u'', u'## Running Tests', u'', u'Testing first requires [building Spark](#building-spark). Once Spark is built, tests', u'can be run using:', u'', u'    ./dev/run-tests', u'', u'Please see the guidance on how to', u'[run tests for a module, or individual tests](http://spark.apache.org/developer-tools.html#individual-tests).', u'', u'## A Note About Hadoop Versions', u'', u'Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported', u'storage systems. Because the protocols have changed in different versions of', u'Hadoop, you must build Spark against the same version that your cluster runs.', u'', u'Please refer to the build documentation at', u'["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)', u'for detailed guidance on building for a particular distribution of Hadoop, including', u'building for particular Hive and Hive Thriftserver distributions.', u'', u'## Configuration', u'', u'Please refer to the [Configuration Guide](http://spark.apache.org/docs/latest/configuration.html)', u'in the online documentation for an overview on how to configure Spark.', u'', u'## Contributing', u'', u'Please review the [Contribution to Spark guide](http://spark.apache.org/contributing.html)', u'for information on how to get started contributing to the project.']

RDD に用意されている全ての API について知りたいときは、以下の公式ドキュメントを参照する感じで。

spark.apache.org

例として「Spark」という文字列が含まれる行だけを取り出してみよう。 このような要素には filter() メソッドが使える。

>>> filtered_rdd = textfile.filter(lambda line: u'Spark' in line)

処理の結果も、また RDD で得られる。 見たところ 20 行あるようだ。

>>> filtered_rdd.count()
20

要素を見ると、たしかにどの行にも「Spark」の文字列が含まれている。

>>> filtered_rdd.collect()
[u'# Apache Spark', u'Spark is a fast and general cluster computing system for Big Data. It provides', u'rich set of higher-level tools including Spark SQL for SQL and DataFrames,', u'and Spark Streaming for stream processing.', u'You can find the latest Spark documentation, including a programming', u'## Building Spark', u'Spark is built using [Apache Maven](http://maven.apache.org/).', u'To build Spark and its example programs, run:', u'You can build Spark using more than one thread by using the -T option with Maven, see ["Parallel builds in Maven 3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).', u'["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).', u'For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](http://spark.apache.org/developer-tools.html).', u'The easiest way to start using Spark is through the Scala shell:', u'Spark also comes with several sample programs in the `examples` directory.', u'    ./bin/run-example SparkPi', u'    MASTER=spark://host:7077 ./bin/run-example SparkPi', u'Testing first requires [building Spark](#building-spark). Once Spark is built, tests', u'Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported', u'Hadoop, you must build Spark against the same version that your cluster runs.', u'in the online documentation for an overview on how to configure Spark.', u'Please review the [Contribution to Spark guide](http://spark.apache.org/contributing.html)']

次は、ビッグデータ処理のハローワールドとも言えるワードカウントを試してみよう。 まずは、テキストの内容をスペースで区切る。 これは MapReduce アルゴリズムでいう Map に相当する。

>>> words = textfile.flatMap(lambda line: line.split())
>>> words.count()
495

まあ、このままだと空白で区切っただけなので単語ではないものも含まれちゃうけど。

>>> words.first()
u'#'

先ほどテキストを区切るのに flatMap() を使ったのは、ただの map() だとリストが複数含まれるデータセットになってしまうため。 リストをさらに開いた状態 (flat) にしておかないと扱いづらい。

>>> textfile.map(lambda line: line.split()).first()
[u'#', u'Apache', u'Spark']

続いて、各単語に対して出現頻度をカウントするために数字を添えてタプルにする。 これも Map 処理だね。

>>> words_tuple = words.map(lambda word: (word, 1))
>>> words_tuple.first()
(u'#', 1)

あとはキー (各単語) ごとに出現頻度をカウントする。 これが MapReduce でいう Reduce に相当する。 ここの処理では、タプル同士を足し算すると第二要素のカウンタが増えることになる。 この動作は Python の流儀からすると、ちょっと直感に反するね。

>>> words_count = words_tuple.reduceByKey(lambda a, b: a + b)

これで、それぞれの単語の出現頻度がカウントできた。

>>> words_count.collect()[:10]
[('guide,', 1), ('APIs', 1), ('optimized', 1), ('name', 1), ('storage', 1), ('developing', 1), ('It', 2), ('package.', 1), ('particular', 2), ('development', 1)]

一番登場する頻度が多いのはどれなのかを調べるために、カウンタの値にもとづいて降順ソートする。 どうやら「the」が一番多いらしい。

>>> words_count_sorted = words_count.sortBy(lambda t: t[1], False)
>>> words_count_sorted.collect()[:10]
[('the', 24), ('to', 17), ('Spark', 16), ('for', 12), ('and', 9), ('##', 9), ('a', 8), ('run', 7), ('on', 7), ('can', 7)]

まとめ

今回は Apache Spark を Python のインタラクティブシェルを通して軽く触ってみた。

Python3 エンジニア基礎認定試験を受けてみた

表題の通り、Python3 エンジニア基礎認定試験という民間の試験を受けてみた。

www.pythonic-exam.com

最近になって合格証書が届いたので、どんな感じだったか軽く書いてみる。

f:id:momijiame:20170409221053j:plain

受けるまでの経緯

Python の認定試験が始まるらしいということは以前から知っていたんだけど、続報を聞かないのでどうなっているのかなーと思ってた。 そんな折、友人と Python について話す機会があって、そこでベータ試験が始まっていることを教えてもらい受けてみることにした。 最初の方のベータ試験は受験料が無料だったみたいだけど、受けたのは最終ベータ試験ということで有料 (¥10,800) だった。 ちなみに、ベータ試験であっても合格すれば本試験に合格したのと同じ扱いになるらしい。

蛇足だけど、ベータ試験の場合は払った費用が受験料という名目ではなくて試験を主催してる団体への協賛金という扱いになるっぽい。 要するに、お金周りの動きのチェックってことなんだろうね。 あと、特典として Pythonic マグカップがもらえた。

f:id:momijiame:20170204155948j:plain:w300

受験しての所感

問題の内容は、特に引っかけという感じの設問もなく素直な印象を受けた。 普段 Python で実務をこなしてるような人なら、特に勉強しなくても受かるんじゃないだろうか。 一応、認定教材としては Python チュートリアルが挙げられている。 試験の問題は、この本の内容に沿って章ごとに一定の割合で出されるっぽい。

Pythonチュートリアル 第3版

Pythonチュートリアル 第3版

それと、合格証書と一緒に、こんな感じで何処でどう得点したか示された紙がもらえる。 得点の低かった標準ライブラリは盲点で、普段使わない機能とかが出てくると分からないね・・・。 別に使ったことなくても、そのときは公式ドキュメントを調べれば良いんだし!(言い訳)

f:id:momijiame:20170409221934j:plain

受験時には、問題の誤りや Pythonic じゃないと感じた点があったら問題用紙に記号を書き込んで教えてくれ、というようなアナウンスもあった。 これは、おそらくベータ試験だからの措置で本試験ではきっとなくなるんだろうと思う。 ちなみに、試験後に問題用紙は回収されるので持って帰ることはできなかった。 あと、受験したベータ試験ではマークシート方式の筆記試験だったけど、本試験が始まったら CBT になるっぽい?

まとめ

今回、Python3 エンジニア基礎認定試験のベータ試験を受けて合格した。 内容的には落とすための試験という感じではないので、基本的な文法や機能さえちゃんと理解していれば受かりそう。

Mac OS X で Apache Kafka を触ってみる

Apache Kafka は OSS の分散型メッセージングミドルウェア。 似た性質を持ったソフトウェアとしては ActiveMQRabbitMQ などが挙げられる。 ただし、ActiveMQ や RabbitMQ との大きな違いは、独自のバイナリプロトコルを用いてメッセージをやり取りするところ。 ActiveMQ や RabbitMQ は標準化された AMQPMQTT を扱う場合が多い。 独自プロトコルというと、なんだか未来が無さそうなイメージがあるけど、その逆で Kafka はビッグデータ処理の場面ではほぼデファクトの位置にあるようだ。

今回は、そんな Kafka を手元の Mac でさくっと試してみることにする。 使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195

インストールする

Kafka は Homebrew でインストールできる。 なので、まずは Homebrew をインストールしておく。 その上で Homebrew Cask についても使えるようにしておく。

$ /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
$ brew tap caskroom/cask

Kafka は Java と Scala を使って書かれている。 なので、まずは動作に必要な Java の処理系をインストールする。

$ brew cask install java

あとはお目当ての Kafka をインストールするだけ。

$ brew install kafka

このとき、依存パッケージとして Apache ZooKeeper もインストールされる。

サービスを動かす

続いてはインストールした Kafka を動作させる。 これには Homebrew のサービス機能を使うと楽ができる。

インストールした直後では Kafka と ZooKeeper は動いていない。

$ brew services list
Name      Status  User Plist
kafka     stopped
zookeeper stopped

そこで ZooKeeper と Kafka を順番に立ち上げていく。

$ brew services start zookeeper
$ brew services start kafka

両方のサービスが立ち上がればオッケー。

$ brew services list           
Name      Status  User    Plist
kafka     started amedama /Users/amedama/Library/LaunchAgents/homebrew.mxcl.kafka.plist
zookeeper started amedama /Users/amedama/Library/LaunchAgents/homebrew.mxcl.zookeeper.plist

ちなみに、上記の機能を使わなくても次のようにしてコマンドラインから起動することもできる。

$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
$ kafka-server-start /usr/local/etc/kafka/server.properties

もし、上手く立ち上がらないときは、それぞれのログを確認しよう。 ログはデフォルトで /usr/local/var/log に保存されている。

同梱されているコマンドで Kafka を操作してみる

Kafka には操作するためのコマンドラインツールが同梱されている。 今回は、それを使ってみる。

トピックを作る

まずは、動作確認用のトピックを作成する。 トピックは Pub/Sub 型のメッセージングミドルウェアによく登場する概念だ。 これは、例えば扱うメッセージの種類やコンポーネントごとに用意する。

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

--replication-factor オプションはクラスタ内でのメッセージのレプリケーション数を表している。 今回はシングルホスト構成なので必然的に 1 となる。 --partitions オプションはトピックでメッセージを分散処理するための機能の指定で、これを使った例については後ほど紹介する。

まずは、これで動作確認用の test トピックができた。

$ kafka-topics --list --zookeeper localhost:2181
test

メッセージを読み書きする

次に、作成したトピックに対してコンシューマ、つまりメッセージをトピックから読み出す存在を接続する。

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

続いて、作成したトピックに対してプロデューサ、つまりメッセージをトピックに書き込む存在を接続する。 ターミナルから、何か適当にメッセージを書き込んでみよう。

$ kafka-console-producer --broker-list localhost:9092 --topic test
Hello, World!

すると、先ほど接続したコンシューマに、プロデューサが書き込んだメッセージが送られる。

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
Hello, World!

ちなみに、Kafka のメッセージはデフォルトで永続化されている。 例えば、Ctrl-C で一旦コンシューマを停止させてから、もう一度立ち上げてみよう。

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
Hello, World!

ちゃんと、先ほど受信したのと同じ内容が表示された。 これは、Kafka を再起動しても同じ結果になるので試してみると面白い。

ちなみに、コマンドを --from-beginning オプションを付けずに実行すれば、途中から受信できる。

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test

複数のコンシューマでメッセージを分散処理する

これまでの例では、トピックを作るときにパーティションを 1 に指定した動作を試してきた。 次は、複数のコンピューマでメッセージを分散処理するために、トピックに複数のパーティーションを作ってみる。

今度はトピック greet をパーティーション数 2 で作成する。

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic greet
Created topic "greet".

続いては、作成したトピックにコンシューマを接続する。 このとき、自身が受信するパーティションを --partition オプションで指定する。 一つ目のターミナルでは、まずは 0 を指定しておく。

$ kafka-console-consumer --bootstrap-server localhost:9092 --partition 0 --topic greet --from-beginning

続いて別のターミナルを開いて、パーティーションが 1 のコンシューマを接続する。

$ kafka-console-consumer --bootstrap-server localhost:9092 --partition 1 --topic greet --from-beginning

次はトピックにプロデューサを接続して、適当にメッセージを送ってみよう。

$ kafka-console-producer --broker-list localhost:9092 --topic greet
Message1
Message2

すると、送ったメッセージがバラバラにコンシューマに届くことが分かる。 まず、最初のメッセージはパーティーション 0 のコンシューマに送られた。

$ kafka-console-consumer --bootstrap-server localhost:9092 --partition 0 --topic greet --from-beginning
Message1

そして、二番目のメッセージはパーティーション 1 のコンシューマに送られている。

$ kafka-console-consumer --bootstrap-server localhost:9092 --partition 1 --topic greet --from-beginning
Message2

このように、パーティーションの機能を使うことでメッセージを複数のコンシューマで分散処理できるようになっている。

まとめ

今回は Mac OS X を使って Apache Kafka を軽く触ってみた。 実運用に載せるなら色々と考えるところはあるけど、手元の検証用としてなら簡単に環境を用意して触れることが分かった。 Apache Kafka はビッグデータ処理のメッセージングミドルウェアとしてはデファクトの存在なので引き続き知見を貯めていきたい。