CUBE SUGAR CONTAINER

技術系のこと書きます。

Apache Hive を JDBC 経由で操作する

以前、Apache Hive の環境構築についてこのブログで書いた。

blog.amedama.jp

上記では Hive を操作するのに、同梱されたシェルを使っていた。 今回は JDBC (Java Database Connectivity) を使って操作する例を示す。

環境については先ほど紹介したエントリで構築されていることが前提になっている。 具体的には、次の環境変数が設定されているところがポイント。

$ echo $HADOOP_CONF_DIR
/home/vagrant/hadoop-2.8.0/etc/hadoop
$ echo $HIVE_HOME
/home/vagrant/apache-hive-1.2.2-bin

HiveServer2 を起動する

ぶっちゃけ JDBC 経由で Hive を操作するのに必要なのは HiveServer2 を起動することだけ。

$ $HIVE_HOME/bin/hiveserver2

あるいは hive コマンドに --service オプションで hiveserver2 を指定しても起動できる。

$ $HIVE_HOME/bin/hive --service hiveserver2

これで TCP:10000 で JDBC 用のポートを Listen し始める。

$ sudo netstat -tlnp | grep 10000
tcp        0      0 0.0.0.0:10000           0.0.0.0:*               LISTEN      22315/java

beeline で動作を確認する

HiveServer2 が起動できたら、まずは Hive に同梱されている beeline というシェルを使って接続してみよう。

次のようにして接続する。

$ $HIVE_HOME/bin/beeline -u jdbc:hive2://master:10000 -n vagrant

すると、次のようにインタラクティブシェルが起動する。

0: jdbc:hive2://master:10000>

あとは、ここに Hive で実行したい SQL を入力していくだけ。

> SHOW DATABASES;
+----------------+--+
| database_name  |
+----------------+--+
| default        |
+----------------+--+
1 row selected (1.439 seconds)

次のような感じで普通に使える。

> CREATE TABLE users (name STRING);
No rows affected (0.372 seconds)
> SHOW TABLES;
+-----------+--+
| tab_name  |
+-----------+--+
| users     |
+-----------+--+
1 row selected (0.053 seconds)
> INSERT INTO users VALUES ("Alice");
...(snip)...
No rows affected (19.367 seconds)
> SELECT * FROM users;
+-------------+--+
| users.name  |
+-------------+--+
| Alice       |
+-------------+--+
1 row selected (0.192 seconds)
> DROP TABLE users;
No rows affected (1.088 seconds)

Apache Spark から使ってみる

試しに Apache Spark からも接続してみよう。

まずはバイナリをダウンロードしてきて展開する。

$ wget https://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz
$ tar xf spark-2.1.1-bin-hadoop2.7.tgz

環境変数 SPARK_HOME を設定しておく。

$ cat << 'EOF' >> ~/.bashrc
export SPARK_HOME=~/spark-2.1.1-bin-hadoop2.7
EOF
$ source ~/.bashrc

Apache Spark のディレクトリ内には Hive に JDBC で接続するための jar ファイルがある。 なので、特に新しい jar ファイルを用意しなくても接続できる。

$ find $SPARK_HOME/jars | grep hive-jdbc
/home/vagrant/spark-2.1.1-bin-hadoop2.7/jars/hive-jdbc-1.2.1.spark2.jar

クラスタマネージャに YARN を指定して Spark シェルを起動しよう。

$ $SPARK_HOME/bin/spark-shell --master yarn

Hive 用の JDBC ドライバをロードする。

scala> Class.forName("org.apache.hive.jdbc.HiveDriver");
res0: Class[_] = class org.apache.hive.jdbc.HiveDriver

SQL のコネクションを取得するための DriverManager をインポートする。

scala> import java.sql.DriverManager
import java.sql.DriverManager

先ほど beeline で使ったのと同じ URL を指定してコネクションを取得する。

scala> val connection = DriverManager.getConnection("jdbc:hive2://master:10000", "vagrant", "");
connection: java.sql.Connection = org.apache.hive.jdbc.HiveConnection@1a4564a2

そしてコネクションからステートメントを得る。

scala> val statement = connection.createStatement();
statement: java.sql.Statement = org.apache.hive.jdbc.HiveStatement@7a085d02

あとは、ここで SQL を実行すれば良い。 結果は ResultSet として得られる。

scala> val resultSet = statement.executeQuery("SHOW DATABASES");
resultSet: java.sql.ResultSet = org.apache.hive.jdbc.HiveQueryResultSet@3100b5cf

あとは ResultSet から結果を取り出す。

scala> resultSet.next()
res1: Boolean = true
scala> resultSet.getString("DATABASE_NAME")
res2: String = default

おまけ (SparkSQL)

とはいえ Apache Spark は SparkSQL があるので上記のように JDBC 経由で Hive を使わなくても SQL が実行できるっぽい。

まずは SparkSession をインポートしておく。

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

その上で Hive サポートを有効にした SparkSession を取得する。

scala> val sparkSession = SparkSession.builder.master("yarn").appName("SparkSession with Hive support").enableHiveSupport().getOrCreate()
17/06/22 20:59:35 WARN sql.SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@55ba4bff

あとは得られたセッション経由で SQL を実行するだけ。

scala> val df = sparkSession.sql("SHOW DATABASES")
df: org.apache.spark.sql.DataFrame = [databaseName: string]

結果が DataFrame で返ってくるから扱いやすい。

scala> df.show()
+------------+
|databaseName|
+------------+
|     default|
+------------+

まとめ

今回は HiveServer2 を起動することで Apache Hive を JDBC 経由で扱えるようにする方法について書いた。

プログラミング Hive

プログラミング Hive

  • 作者: Edward Capriolo,Dean Wampler,Jason Rutherglen,佐藤直生,嶋内翔,Sky株式会社玉川竜司
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2013/06/15
  • メディア: 大型本
  • この商品を含むブログ (3件) を見る

統計: 偏相関係数で擬似(無)相関の有無を調べる

以前、このブログでは共分散や相関係数について扱ったことがある。 共分散や相関係数というのは、二つの変数間に線形な関係があるかを調べる方法だった。

blog.amedama.jp

しかし、実はただの相関係数では「第三の変数」からの影響を受けてしまう場合がある。 それというのは、第三の変数の存在によって、あたかも相関しているように見える (疑似相関) あるいは相関していないように見える (疑似無相関) というもの。

これは実際の例がないと、なかなか分かりづらいものだと思うんだけど良い例があったので紹介してみる。 今回はプロ野球の打撃成績に潜む疑似無相関を偏相関係数であぶり出してみることにする。

データをスクレイピングする

ひとまずデータがないと話にならないので、まずはスクレイピングしてくるところから始める。

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.5
BuildVersion:   16F73
$ python --version
Python 3.6.1

今回は全て Python を使って処理する。 使うライブラリは pandasnumpy だけ。 ただし pandas をスクレイピングに使うときは別途依存ライブラリが必要になる。

$ pip install pandas beautifulsoup4 requests lxml html5lib

各年度の選手成績はプロ野球の公式サイトから得られる。

npb.jp

試しに規定打席に達した選手の打撃成績を取得してみよう。 pandas#read_html() にスクレイピングしたい URL を指定する。

>>> import pandas as pd
>>> url = 'http://npb.jp/bis/2016/stats/bat_c.html'
>>> tables = pd.read_html(url)

これだけでページに含まれるテーブルを上手いことパースして DataFrame に落とし込んでくれる。

>>> df_with_header = tables[0]  # 成績のテーブルを取り出す
>>> df_with_header
                            0       1    2     3    4    5    6    7    8   \
0   規定打席 :チーム試合数×3.1 (端数は四捨五入)     NaN  NaN   NaN  NaN  NaN  NaN  NaN  NaN
1                          順 位     選 手  打 率   試 合  打 席  打 数  得 点  安 打  二塁打
2                            1   角中 勝也  (ロ)  .339  143  607  525   74  178
3                            2   西川 遥輝  (日)  .314  138  593  493   76  155
4                            3   浅村 栄斗  (西)  .309  143  611  557   73  172
5                            4   糸井 嘉男  (オ)  .306  143  616  532   79  163
6                            5   柳田 悠岐  (ソ)  .306  120  536  428   82  131
7                            6   内川 聖一  (ソ)  .304  141  605  556   62  169
8                            7   秋山 翔吾  (西)  .296  143  671  578   98  171
9                            8    陽 岱鋼  (日)  .293  130  555  495   66  145
10                           9    中村 晃  (ソ)  .287  143  612  488   69  140
...

十年分のデータを取得する

先ほど実行した内容にもとづいて十年分のデータを取得してみよう。 これは、単年ではデータ点数が少なくてあまり信頼のおけるものではなくなるため。

今回扱うのは「安打数」と「三振数」と「打席数」なので、それを入れる変数を用意しておく。

>>> import numpy as np
>>> h = pd.Series(dtype=np.int64)
>>> bb = pd.Series(dtype=np.int64)
>>> ab = pd.Series(dtype=np.int64)

あとは上記の変数に十年分のセ・パのデータを追加していく。

>>> import itertools
>>> import time
>>> for year, league in itertools.product(range(2006, 2017), ['p', 'c']):
...     url = 'http://npb.jp/bis/{year}/stats/bat_{league}.html'.format(year=year, league=league)  # スクレイピング対象 の URL
...     tables = pandas.read_html(url)  # テーブルを抽出する
...     df_with_header = tables[0]  # 成績のテーブルを取り出す
...     features = df_with_header[2:]  # ヘッダを捨てる
...     h = h.append(features[8])  # 安打数のデータを取り出す
...     bb = bb.append(features[21])  # 三振数のデータを取り出す
...     ab = ab.append(features[5])  # 打席数のデータを取り出す
...     time.sleep(1)  # リクエスト間で負荷をかけないように少し待つ
...

これで 636 人分の選手データが集まった。

>>> len(h)
636
>>> len(bb)
636
>>> len(ab)
636

安打と三振の相関係数を計算する

データが揃ったので、次は相関係数を計算してみる。

仮説として、安打をたくさん打つ選手ほど三振の数は少ないのではないか?と考えて両者の相関係数を調べてみよう。 この仮説が正しければ安打と三振の間には負の相関があるはずだ。

それでは numpy#corrcoef() を使って相関行列を計算させてみよう。

>>> np.corrcoef(h, bb)  # 安打数と三振数の相関行列を計算する
array([[ 1.        ,  0.06108444],
       [ 0.06108444,  1.        ]])

安打と三振の相関係数は 0.06 なので、ほとんど相関がないといえる。 これだけ見ると、仮説は間違っていたのだろうか?と思える。

偏相関係数を計算する

しかし、実際には第三の変数によって疑似無相関になっている可能性がある。

そんなときは第三の変数の影響を無くした相関係数として「偏相関係数」を計算してみよう。 次の数式が偏相関係数を求めるためのもの。

 r_{yz_x} = \frac{r_{yz} - r_{xy} r_{xz}}{\sqrt{1 - r_{xy}^2} \sqrt{1 - r_{xz}^2}}

ここで  r_{ab} は変数 ab の相関係数を表している。 上記は第三の変数 x の影響を取り除いた yz の偏相関係数を求める式になっている。

調べたところ、どうやら Python の既知のライブラリには偏相関係数を計算する実装がないようだ。 なので、ひとまず自前で関数を用意した。

>>> import math
>>> def partial_corrcoef(x, y, z):
...     """第三の変数 x の影響を除いた y と z の相関係数 (偏相関係数) を求める関数"""
...     correlation_matrix = np.corrcoef((x, y, z))
...     r_xy = correlation_matrix[0, 1]
...     r_xz = correlation_matrix[0, 2]
...     r_yz = correlation_matrix[1, 2]
...     r_yz_x = (r_yz - r_xy * r_xz) / (math.sqrt(1 - r_xy ** 2) * math.sqrt(1 - r_xz ** 2))  # noqa
...     return r_yz_x
...

上記の関数を使って「安打数」と「三振数」の相関係数から「打席数」の影響を取り除いてみよう。

>>> partial_corrcoef(ab, h, bb)  # 打席数の影響を取り除いた安打数と三振数の相関係数を計算する
-0.31105000209505901

結果は -0.311 と「弱い負の相関」があることが分かった。 どうやら打席数という第三の変数が影響することで擬似無相関になっていたらしい。

まあ、実際には第三の変数を見つけることが大変だから一通り偏相関係数を求めた上で相関係数との変動の大小を比べるのが良いのかな。

まとめ

  • 単純な相関係数では第三の変数の影響により疑似相関・疑似無相関になっている恐れがある
  • 第三の変数の影響を取り除くには偏相関係数を計算すれば良い

SQL: 内部的なコードを人間に分かりやすいラベルに変換して表示する

RDB のスキーマには、たまに対応表などを参照しながらでないと分からないような内部的なコードが使われていることがある。 大抵はアプリケーションの中で変換して表示するだろうけど、これを直接 SELECT とかで確認しようとすると分かりにくい。 今回は、それを見やすくするためのテクニックについて。

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.5
BuildVersion:   16F73
$ mysqld --version
mysqld  Ver 5.7.18 for osx10.12 on x86_64 (Homebrew)
$ mysql --version
mysql  Ver 14.14 Distrib 5.7.18, for osx10.12 (x86_64) using  EditLine wrapper

まずは MySQL のインタラクティブシェルに入る。

$ mysql -u root

そしてサンプル用のテーブルを用意しておく。 内部的なコードに対応するのは users.position_code だ。

mysql> DROP TABLE IF EXISTS users;
Query OK, 0 rows affected, 1 warning (0.00 sec)
mysql> CREATE TABLE users (
    ->   id integer,
    ->   name varchar(255),
    ->   age integer,
    ->   position_code integer
    -> );
Query OK, 0 rows affected (0.04 sec)

そして、いくつかレコードを追加しておこう。

mysql> INSERT INTO users
    -> VALUES
    ->   (1, 'Alice', 20, 1),
    ->   (2, 'Bob', 25, 2),
    ->   (3, 'Carol', 30, 3);
Query OK, 3 rows affected (0.00 sec)
Records: 3  Duplicates: 0  Warnings: 0

こんな感じでデータが入る。 この状態では position_code に一体どんな意味があるのか分からない。

mysql> SELECT *
    -> FROM users;
+------+-------+------+---------------+
| id   | name  | age  | position_code |
+------+-------+------+---------------+
|    1 | Alice |   20 |             1 |
|    2 | Bob   |   25 |             2 |
|    3 | Carol |   30 |             3 |
+------+-------+------+---------------+
3 rows in set (0.00 sec)

そこで CASE ~ END を用いて分かりやすいラベルに変換する。 条件を WHEN で指定したら、対応する値を THEN で返せば良い。 どれにも該当しない場合には ELSE を使う。

mysql> SELECT
    ->   id,
    ->   name,
    ->   CASE
    ->     WHEN position_code = 1 THEN '一般職員'
    ->     WHEN position_code = 2 THEN '主任'
    ->     WHEN position_code = 3 THEN '課長'
    ->     ELSE '不明'
    ->   END as position
    -> FROM users;
+------+-------+--------------+
| id   | name  | position     |
+------+-------+--------------+
|    1 | Alice | 一般職員     |
|    2 | Bob   | 主任         |
|    3 | Carol | 課長         |
+------+-------+--------------+
3 rows in set (0.00 sec)

これで分かりやすくなった。

もちろんこのテクニックはコードの変換だけでなく特定の条件に一致するか、みたいな確認にも使える。 次の例は年齢が 30 を超えているかを misoji カラムに表示している。

mysql> SELECT
    ->   id,
    ->   name,
    ->   CASE
    ->     WHEN age >= 30 THEN 'YES'
    ->     ELSE 'NO'
    ->   END as misoji
    -> FROM users;
+------+-------+--------+
| id   | name  | misoji |
+------+-------+--------+
|    1 | Alice | NO     |
|    2 | Bob   | NO     |
|    3 | Carol | YES    |
+------+-------+--------+
3 rows in set (0.00 sec)

めでたしめでたし。

ビッグデータ分析・活用のためのSQLレシピ

ビッグデータ分析・活用のためのSQLレシピ

  • 作者: 加嵜長門,田宮直人,丸山弘詩
  • 出版社/メーカー: マイナビ出版
  • 発売日: 2017/03/27
  • メディア: 単行本(ソフトカバー)
  • この商品を含むブログを見る

ファイルから SQL を読み込む (MySQL, PostgreSQL, SQLite3)

色々な RDBMS で SQL の書かれたファイルを読み込む方法について調べた。

まずはSQL の書かれたテキストファイルを用意しておく。

$ cat << 'EOF' > sample.sql
DROP TABLE IF EXISTS users;
CREATE TABLE users (
  id integer,
  name varchar(255),
  age integer
);
INSERT INTO users
VALUES
  (1, 'Alice', 20),
  (2, 'Bob', 30),
  (3, 'Carol', 40);
EOF

こんな感じ。 users テーブルを作って、そこにレコードを追加している。

$ cat sample.sql
DROP TABLE IF EXISTS users;
CREATE TABLE users (
  id integer,
  name varchar(255),
  age integer
);
INSERT INTO users
VALUES
  (1, 'Alice', 20),
  (2, 'Bob', 30),
  (3, 'Carol', 40);

TL;DR

早見表

RDBMS コマンドライン インタラクティブシェル
MySQL mysql -D [database] < [filepath] source [filepath]
PostgreSQL psql -d [database] -f [filepath] \i [filepath]
SQLite3 sqlite3 [database] < [filepath] .read [filepath]

MySQL

まずは MySQL から。 使ったバージョンは次の通り。

$ mysqld --version
mysqld  Ver 5.7.18 for osx10.12 on x86_64 (Homebrew)

最初に、読み込む先となる sample データベースを用意しておく。

$ mysql -u root -e "CREATE DATABASE IF NOT EXISTS sample"

コマンドラインから読み込む

まずは通常のシェルから mysql コマンドを使って読み込む方法について。

mysql コマンドを使って読みこむにはリダイレクトを使うだけで良い。

$ mysql -u root -D sample < sample.sql

これだけでテーブルができてレコードが追加されている。

$ mysql -u root -D sample -e "SELECT * FROM users"
+------+-------+------+
| id   | name  | age  |
+------+-------+------+
|    1 | Alice |   20 |
|    2 | Bob   |   30 |
|    3 | Carol |   40 |
+------+-------+------+

シェルから読み込む

次に mysql コマンドのインタラクティブシェルに落ちて読み込む方法について。

$ mysql -u root -D sample

ひとまず、先ほど読み込んだ内容は一旦削除しておく。

mysql> DROP TABLE users;

mysql コマンドのシェルから SQL の書かれたファイルを読み込むには source コマンドを使う。

mysql> source sample.sql

これでファイルに書かれた SQL が読み込まれる。

mysql> SELECT * FROM users;
+------+-------+------+
| id   | name  | age  |
+------+-------+------+
|    1 | Alice |   20 |
|    2 | Bob   |   30 |
|    3 | Carol |   40 |
+------+-------+------+
3 rows in set (0.00 sec)

あるいは上記の代わりに \. を使っても構わない。

mysql> \. sample.sql

PostgreSQL

次に PostgreSQL の場合。 使ったバージョンは次の通り。

$ psql --version
psql (PostgreSQL) 9.6.3

まずは読み込む先のデータベースを用意しておく。

$ psql -c "DROP DATABASE sample"
$ psql -c "CREATE DATABASE sample"

コマンドラインから読み込む

psql コマンドで読み込む場合には、読ませたいファイルを -f オプションで指定する。

$ psql -d sample -f sample.sql

これで内容が読み込まれた。

$ psql -d sample -c "SELECT * FROM users"
 id | name  | age
----+-------+-----
  1 | Alice |  20
  2 | Bob   |  30
  3 | Carol |  40
(3 rows)

シェルから読み込む

次は psql コマンドのシェルに入って読み込む方法について。

まずはデータベースを指定してインタラクティブシェルに落ちる。

$ psql -d sample

先ほど作ったテーブルは一旦削除しておこう。

sample=# DROP TABLE users;

SQL の書かれたファイルを読み込むには \i を使う。

sample=# \i sample.sql

これで内容が読み込まれた。

sample=# SELECT * FROM users;
 id | name  | age
----+-------+-----
  1 | Alice |  20
  2 | Bob   |  30
  3 | Carol |  40
(3 rows)

SQLite3

次は SQLite3 の場合。 使ったバージョンは次の通り。

$ sqlite3 --version
3.16.0 2016-11-04 19:09:39 0e5ffd9123d6d2d2b8f3701e8a73cc98a3a7ff5f

コマンドラインから読み込む

SQLite3 でコマンドラインから読み込むにはデータベースのファイルを指定しながら MySQL と同様にリダイレクトを使うだけで良い。

$ sqlite3 sample.db < sample.sql

これで読み込まれた。

$ sqlite3 sample.db "SELECT * FROM users"
1|Alice|20
2|Bob|30
3|Carol|40

シェルから読み込む

次にインタラクティブシェルから読み込む方法について。 まずはデータベースのファイルを指定して sqlite3 コマンドを実行することでシェルに落ちる。

$ sqlite3 sample.db

先ほど作ったテーブルは一旦削除しておこう。

sqlite> DROP TABLE users;

SQLite3 のシェルでファイルを読み込むには .read を使う。

sqlite> .read sample.sql

これで内容が読み込まれた。

sqlite> SELECT * FROM users;
1|Alice|20
2|Bob|30
3|Carol|40

いじょう。

Apache Spark を完全分散モードの YARN クラスタで動かす

Apache Spark を使って複数ノードで分散並列処理をする場合、まずは動作させるためのクラスタマネージャを選ぶことになる。 Apache Spark では以下のクラスタマネージャに対応している。

  • Apache Spark 組み込み (これはスタンドアロンモードと呼ばれる)
  • Apache Hadoop YARN
  • Apache Mesos

今回は、その中で二番目の Apache Hadoop の提供する YARN を使ってみる。 また、なるべく実環境に近いものを作りたいので Apache Hadoop は完全分散モードを使うことにした。 そのため、まず前提として次のエントリを元に Hadoop クラスタが組まれていることが前提となる。

blog.amedama.jp

Apache Hadoop を設定する

Apache Spark のクラスタマネージャに YARN を使うときのポイントは次の環境変数が設定されていること。

$ echo $HADOOP_HOME
/home/vagrant/hadoop-2.8.0
$ echo $HADOOP_CONF_DIR
/home/vagrant/hadoop-2.8.0/etc/hadoop

また、上記の手順で構築した場合、メモリが少ないと Apache Spark を動作させたときエラーになってしまう。 これは、実行前のチェックによってマシンに積まれているメモリを一定の割合以上使うことができないため。 そこで、次のようにして設定ファイルを編集することで、そのチェックを無効にしてやる。

$ cat << 'EOF' > /tmp/yarn-site.xml.property
  <property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/yarn-site.xml.property
  /^$/d
' $HADOOP_HOME/etc/hadoop/yarn-site.xml

設定ファイルを編集したので、それを各ノードに配布する。 そして NodeManager のプロセスも設定を読み直すために再起動が必要になるのでプロセスを止める。

$ for node in node1 node2
do
  scp $HADOOP_HOME/etc/hadoop/yarn-site.xml $node:$HADOOP_HOME/etc/hadoop/
  ssh $node 'pkill -f NodeManager'
done

そして、改めて各ノードの NodeManager を起動してやる。

$ $HADOOP_HOME/sbin/start-yarn.sh
starting yarn daemons
resourcemanager running as process 19527. Stop it first.
node1: starting nodemanager, logging to /home/vagrant/hadoop-2.8.0/logs/yarn-vagrant-nodemanager-node1.out
node2: starting nodemanager, logging to /home/vagrant/hadoop-2.8.0/logs/yarn-vagrant-nodemanager-node2.out

あとはユーザのホームディレクトリを作成しておこう。

$ $HADOOP_HOME/bin/hdfs dfs -mkdir -p .

これで Hadoop クラスタ側の準備はできた。

Apache Spark を設定する

次に Apache Spark をインストールした上で設定する。

まずは公式サイトからバイナリをダウンロードして解凍する。

$ wget https://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz
$ tar xf spark-2.1.1-bin-hadoop2.7.tgz

上記のディレクトリを環境変数 SPARK_HOME に設定しておこう。

$ cat << 'EOF' >> ~/.bashrc
export SPARK_HOME=~/spark-2.1.1-bin-hadoop2.7
EOF
$ source ~/.bashrc

これだけで Apache Spark を使う準備は整った。

サンプルコードを動かしてみる

準備ができたので、試しに Apache Spark に同梱されているサンプルコードを動かしてみよう。 例えば、次のようにすると円周率を計算するプログラムを起動できる。

$ $SPARK_HOME/bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  $SPARK_HOME/examples/jars/spark-examples_*.jar \
  1000
...(snip)...
Pi is roughly 3.1417053514170536
...(snip)

エラーにならず円周率は大体 3.1417… ということが計算できた。

PySpark を使ってみる

次は Apache Spark を PySpark と呼ばれる Python で使えるインタラクティブシェルから使ってみる。 そして、分散並列処理のハローワールドとも言えるワードカウントを書いてみることにしよう。

ワードカウントの対象としては Apache Spark の README ファイルを使うことにした。 次のようにして、まずは HDFS にファイルをコピーする。

$ $HADOOP_HOME/bin/hdfs dfs -put $SPARK_HOME/README.md .

分散並列処理をするには、対象のファイルが全てのノードから参照できる状態にないといけない。 今回は HDFS にコピーしたけど Amazon S3 とか対応しているファイルシステムは色々とある。

pyspark コマンドを使って PySpark のインタラクティブシェルを起動しよう。 ここでポイントとなるのは --master オプションで yarn を指定すること。 これでローカルではなく YARN を使った分散実行がされるようになる。

$ $SPARK_HOME/bin/pyspark --master yarn
...(snip)...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.5 (default, Nov  6 2016 00:28:07)
SparkSession available as 'spark'.
>>>

HDFS に先ほどコピーしたファイルを読み込もう。 ファイルの読み込みは SparkContext#textFile() を使う。

>>> textfile = sc.textFile('hdfs://master:9000/user/vagrant/README.md')

これで HDFS のファイルのデータが各ノードに読み込まれた。

>>> textfile.first()
u'# Apache Spark'

まずはテキストを RDD#flatMap() を使ってスペースで分割する。 こうすると map した結果から得られたリストを展開してくれる。

>>> words = textfile.flatMap(lambda line: line.split(' '))

すると、こんな風になる。

>>> words.take(5)
[u'#', u'Apache', u'Spark', u'', u'Spark']

上記には空白のように意味のない単語も含まれるので、それを RDD#filter() で取り除く。

>>> valid_words = words.filter(lambda word: word)
>>> valid_words.take(5)
[u'#', u'Apache', u'Spark', u'Spark', u'is']

次に、各単語を RDD#map() を使ってキー・バリュー形式にする。 キーは単語でバリューは登場回数になる。

>>> keyvalues = valid_words.map(lambda word: (word, 1))
>>> keyvalues.first()
(u'#', 1)

最後に、各キーごとに RDD#reduceByKey() を使ってバリューを集計する。 これで各単語の出現回数が集計できる。

>>> word_count = keyvalues.reduceByKey(lambda a, b: a + b)
>>> word_count.take(3)
[(u'storage', 1), (u'"local"', 1), (u'including', 4)]

あとは集計結果を HDFS に書き出そう。

>>> word_count.saveAsTextFile('hdfs://master:9000/user/vagrant/output')

使い終わったらインタラクティブシェルから抜ける。

>>> exit()

HDFS に書き出された内容を確認してみよう。 どうやら、ちゃんと単語ごとの出現回数がカウントできているようだ。

$ $HADOOP_HOME/bin/hdfs dfs -cat output/*
(u'storage', 1)
(u'"local"', 1)
(u'including', 4)
...(snip)...
(u'<class>', 1)
(u'learning,', 1)
(u'latest', 1)

PySpark で Python スクリプトを実行する

先ほどはインタラクティブシェルを使って PySpark を使ったけど、もちろんスクリプトファイルから実行することもできる。

一旦、先ほど書き出した HDFS のディレクトリは削除しておこう。

$ $HADOOP_HOME/bin/hdfs dfs -rm -r -f output

次のスクリプトは、先ほどのワードカウントをスクリプトファイルにしたもの。 ポイントとしては、インタラクティブシェルでは最初からインスタンス化されていた SparkContext を自分で用意しなきゃいけないところ。 インタラクティブシェルではオプションで実行するモードを YARN にしていた代わりに、この場合は SparkConf で設定する必要がある。 とはいえ、他の部分は特に変わらない。

$ cat << 'EOF' > /var/tmp/wc.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkConf
from pyspark import SparkContext


def main():
    conf = SparkConf()
    conf.setMaster('yarn')
    conf.setAppName('WordCount')

    sc = SparkContext(conf=conf)

    textfile = sc.textFile('hdfs://master:9000/user/vagrant/README.md')

    words = textfile.flatMap(lambda line: line.split(' '))
    valid_words = words.filter(lambda word: word)
    keyvalues = valid_words.map(lambda word: (word, 1))
    word_count = keyvalues.reduceByKey(lambda a, b: a + b)

    word_count.saveAsTextFile('hdfs://master:9000/user/vagrant/output')


if __name__ == '__main__':
    main()
EOF

実行する

スクリプトファイルを実行するときは spark-submit コマンドを使う。 このとき --py-files オプションを使って実行に必要なスクリプトファイルを、あらかじめノードに配布しておく。

$ $SPARK_HOME/bin/spark-submit \
  --master yarn \
  --py-files /var/tmp/wc.py \
  /var/tmp/wc.py

上記の実行が上手くいったら結果を出力したディレクトリを確認してみよう。

$ $HADOOP_HOME/bin/hdfs dfs -cat output/*
(u'storage', 1)
(u'"local"', 1)
(u'including', 4)
...(snip)...
(u'<class>', 1)
(u'learning,', 1)
(u'latest', 1)

実行したアプリケーションの確認

ちなみに Apache Spark が YARN 経由で実行したアプリケーションの情報は Hadoop の管理画面から状態を確認できる。

http://192.168.33.10:8088/

上記クラスタの構築を自動化する

ちなみに上記を全て手動で構築するのは手間がかかるので Vagrantfile を書いてみた。 次のようにして実行する。 メモリを 2GB 積んだ仮想マシンを 3 台起動するので、最低でも 8GB できれば 16GB のメモリを積んだ物理マシンで実行してもらいたい。

$ git clone https://gist.github.com/fefb9831e9f032ef264d8d517df57cb4.git spark-on-yarn
$ cd spark-on-yarn
$ sh boot.sh

vagrant ssh コマンドでマスターサーバにログインできる。

$ vagrant ssh

いじょう。

参考

Apache Spark については次の本を読んで勉強してみた。

初めてのSpark

初めてのSpark

  • 作者: Holden Karau,Andy Konwinski,Patrick Wendell,Matei Zaharia,Sky株式会社玉川竜司
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2015/08/22
  • メディア: 大型本
  • この商品を含むブログ (4件) を見る

Python: pep8 は pycodestyle になったし pep257 は pydocstyle になった

意外とまだあんまり知られていないような気がしたので、このブログにも書いておく。

PEP8 と pep8 と pycodestyle

Python には PEP8 という有名なコーディングスタイルガイドラインがある。

www.python.org

そして、そのコーディングスタイルに沿ったコードになっているのかをチェックするツールとして pep8 というパッケージがあった。

pypi.python.org

過去形にするのは半分正しくなくて、上記のように今もある。 ただ、これは後方互換のために残されているだけで、もうバージョンアップはされないだろう。

今後は代わりに pycodestyle というパッケージを使うことになる。

pypi.python.org

これは単にパッケージとコマンドの名前が変わっただけ。 とはいえ、こちらはバージョンアップが続くので最新の PEP8 に追従していくしチェックできる範囲も増えていくはず。 (PEP に書かれている内容は必要に応じて更新される)

試しに使ってみることにしよう。 まずは Python のパッケージマネージャである pip でインストールする。

$ pip install pycodestyle

サンプルコードとして PEP8 違反がいくつか含まれるものを用意した。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# 改行が足りない
def greeting(name = 'World'):  # 余分なスペースがある
    print('Hello, {name}'.format(name=name))


def main():
  greeting()  # インデントが 4 スペースでない


if __name__ == '__main__':
    main()

# 改行が多い

pycodestyle コマンドで上記のコードをチェックしてみる。

$ pycodestyle sample.py
sample.py:5:1: E302 expected 2 blank lines, found 1
sample.py:5:18: E251 unexpected spaces around keyword / parameter equals
sample.py:5:20: E251 unexpected spaces around keyword / parameter equals
sample.py:10:3: E111 indentation is not a multiple of four
sample.py:17:1: W391 blank line at end of file

色々と PEP8 に準拠していない箇所が見つかった。

PEP257 と pep257 と pydocstyle

同じことが pep257pydocstyle にも起きている。

PEP257 は docstring のフォーマットを規定したドキュメントの名前を指している。

www.python.org

そして、PEP8 と同じように pep257 というチェックツールがあった。

pypi.python.org

そして、同じように pydocstyle という名前に変更されている。

pypi.python.org

こちらも試しに使ってみることにしよう。 pip でさくっと入る。

$ pip install pydocstyle

そして PEP257 に違反している箇所を含むサンプルコードを用意した。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# そもそも docstring がない


def greeting(name='World'):
    '''ダブルクォートを使っていない'''
    print('Hello, {name}'.format(name=name))


def main():
    """
        改行が多い
    """
    greeting()


if __name__ == '__main__':
    main()

pydocstyle コマンドで上記のコードをチェックしてみよう。

$ pydocstyle sample.py
sample.py:1 at module level:
        D100: Missing docstring in public module
sample.py:7 in public function `greeting`:
        D300: Use """triple double quotes""" (found '''-quotes)
sample.py:7 in public function `greeting`:
        D400: First line should end with a period (not '')
sample.py:12 in public function `main`:
        D200: One-line docstring should fit on one line with quotes (found 3)
sample.py:12 in public function `main`:
        D208: Docstring is over-indented
sample.py:12 in public function `main`:
        D400: First line should end with a period (not '')

PEP257 に違反している箇所が見つかった。

周辺ツールの対応

リネームされたパッケージを依存パッケージに持つ主要なパッケージはどうなっているかなー、というのも一応は確認しておく。 例えば flake8 とか autopep8 について pipdeptree で見てみよう。

$ pip install flake8 autopep8 pipdeptree

flake8pycodestyle を使うようになっている。

$ pipdeptree | grep -A 3 flake8
flake8==3.3.0
  - mccabe [required: <0.7.0,>=0.6.0, installed: 0.6.1]
  - pycodestyle [required: >=2.0.0,<2.4.0, installed: 2.3.1]
  - pyflakes [required: >=1.5.0,<1.6.0, installed: 1.5.0]

autopep8pycodestyle を使うようになっていた。

$ pipdeptree | grep -A 1 autopep8
autopep8==1.3.2
  - pycodestyle [required: >=2.3, installed: 2.3.1]

間接的に使っている場合には特に対応する必要はなさそうだ。

まとめ

  • pep8pycodestyle に名前が変わった
  • pep257pydocstyle に名前が変わった
  • flake8 とかで間接的に使っている分には特に対応する必要はない

参考

事の発端は、このチケットらしい。

Please rename this tool · Issue #466 · PyCQA/pycodestyle · GitHub

いじょう。

Python: ... (Ellipsis) は任意の処理を示すのにも便利かも

PEP 484 – Type Hints を読んで「なるほど、こういう使い方もあるのか」と気づかれたのでブログに書いておく。 尚、このエントリの内容を実行するには Python 3 以降が必要となる。

使った Python のバージョンは次の通り。

$ python --version
Python 3.6.1

Ellipsis について

Python 3 には Ellipsis というオブジェクトがある。 これはドットを三つ連続させたもので得られる。

>>> ...
Ellipsis

これの使いみちとしてはコンテナオブジェクトでスライスと共に用いられることが多い。 Ellipsis 自体の解説は以前こちらのエントリで紹介している。

blog.amedama.jp

任意の処理を示すためのコードについて

ところで、これまで概念的な説明をするときに書くコードも、なるべく実行できるように書くようにしていた。

例えば、関数ならこんな感じで書くことが多かったように思う。 ここには任意の処理が入りますよ、みたいなのを説明するのにはコメントを使って、中身は pass にするとか。

def f():
    # Do something
    pass

これは、コメントだけだと文法的に正しくないため。

>>> def f():
...     # Do something
...
  File "<stdin>", line 3

    ^
IndentationError: expected an indented block

あるいは docstring を利用して、こんな風にすることもできるかもしれない。

def f():
    """Do something"""

docstring は実のところ単なる文字列なので、それが式になることでこの場合は前述したようなエラーにならない。

任意の処理を … (Ellipsis) で表現する

上記でも伝わりさえすれば問題はないんだけど Ellipsis を使えばさらに分かりやすくなる気がする。

def f():
    # Do something
    ...

Ellipsis 単独でもちゃんとした式なので、上記は Python 3 の処理系がちゃんと解釈できる。

あるいは「この変数には何かが入るよ」みたいな処理も Ellipsis で表現できる。

>>> variable = ...

これも variable に Ellipsis を代入しているだけなので、もちろん有効な Python コードになる。

問題は … が Ellipsis オブジェクトになるのが Python 3 から、っていうところかな。 上記は Python 2 では使うことができない。