CUBE SUGAR CONTAINER

技術系のこと書きます。

SQL: UNION を使ってテーブルを縦に連結する

今回は SQL の UNION を使ってみる。

試した環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.5
BuildVersion:   16F73
$ mysql --version
mysql  Ver 15.1 Distrib 10.2.6-MariaDB, for osx10.12 (x86_64) using readline 5.1

典型的な使い方

例えばスキーマ設計において、パフォーマンスの向上などを目的として同じカラムを持ったテーブルが複数あったりすることがある。 特定の月のデータを入れるテーブルとか、あるいは毎月新しいテーブルを作っていくとか、そんな感じ。

次のサンプルでは同じカラムを持ったテーブルが日付で分かれている。

> CREATE TABLE purchases_20170627 (
    ->   purchase_id INTEGER,
    ->   user_id VARCHAR(255)
    -> );
Query OK, 0 rows affected (0.04 sec)

> CREATE TABLE purchases_20170628 (
    ->   purchase_id INTEGER,
    ->   user_id VARCHAR(255)
    -> );
Query OK, 0 rows affected (0.02 sec)

上記のテーブルに、それぞれサンプルのレコードを追加しておこう。

> INSERT INTO
    ->   purchases_20170627
    -> VALUES
    ->   (1, 'Alice'),
    ->   (2, 'Bob'),
    ->   (3, 'Carol');
Query OK, 3 rows affected (0.00 sec)
Records: 3  Duplicates: 0  Warnings: 0

> INSERT INTO
    ->   purchases_20170628
    -> VALUES
    ->   (4, 'Alice'),
    ->   (5, 'Bob');
Query OK, 2 rows affected (0.00 sec)
Records: 2  Duplicates: 0  Warnings: 0

さて、上記のようなテーブルを一つのクエリで処理したいとする。 そんなときこそ今回扱う UNION の出番といえる。 UNION では複数の SELECT 文を繋げて扱うことができる。 もちろん同じカラムという前提はあるけど。

次のクエリでは先ほどの二つのテーブルの内容をまとめて取得している。

> SELECT
    ->   '20170627' AS date,
    ->   purchase_id,
    ->   user_id
    -> FROM purchases_20170627
    -> UNION ALL
    -> SELECT
    ->   '20170628' AS date,
    ->   purchase_id,
    ->   user_id
    -> FROM purchases_20170628;
+----------+-------------+---------+
| date     | purchase_id | user_id |
+----------+-------------+---------+
| 20170627 |           1 | Alice   |
| 20170627 |           2 | Bob     |
| 20170627 |           3 | Carol   |
| 20170628 |           4 | Alice   |
| 20170628 |           5 | Bob     |
+----------+-------------+---------+
5 rows in set (0.00 sec)

同じカラムを持った内容をどんどん縦に繋げていくイメージ。

擬似的なテーブルを作るのに使う

続いては UNION を使って擬似的なテーブルを作るやり方について。 例えば SELECT 文を UNION で繋いでいけば CREATE TABLE ... しなくても擬似的なテーブルが用意できる。

次のクエリでは WITH と共に使うことで擬似的に作ったテーブルの内容を表示している。

> WITH
    -> device_types AS (
    ->   SELECT 1 AS device_type, 'LB' AS device_name
    ->   UNION ALL
    ->   SELECT 2 AS device_type, 'L2SW' AS device_name
    ->   UNION ALL
    ->   SELECT 3 AS device_type, 'L3SW' AS device_name
    -> )
    -> SELECT
    ->   *
    -> FROM device_types;
+-------------+-------------+
| device_type | device_name |
+-------------+-------------+
|           1 | LB          |
|           2 | L2SW        |
|           3 | L3SW        |
+-------------+-------------+
3 rows in set (0.00 sec)

ちなみに、これまでは UNION ALL を使っていたけど、重複する内容を省きたいときは UNION DISTINC を使う。

> WITH
    -> device_types AS (
    ->   SELECT 1 AS device_type, 'LB' AS device_name
    ->   UNION DISTINCT
    ->   SELECT 1 AS device_type, 'LB' AS device_name
    ->   UNION DISTINCT
    ->   SELECT 3 AS device_type, 'L3SW' AS device_name
    -> )
    -> SELECT
    ->   *
    -> FROM device_types;
+-------------+-------------+
| device_type | device_name |
+-------------+-------------+
|           1 | LB          |
|           3 | L3SW        |
+-------------+-------------+
2 rows in set (0.00 sec)

同じ内容を持ったレコードが削除されている。

UNION で作った擬似的なテーブルも CREATE TABLE ... で作ったテーブルと同じように扱うことができる。 例えば、次のように擬似的なテーブルと関連するようなテーブルを作っておく。

> CREATE TABLE devices(
    ->   id INTEGER,
    ->   type INTEGER
    -> );
Query OK, 0 rows affected (0.02 sec)

> INSERT INTO
    ->   devices
    -> VALUES
    ->   (1, 1),
    ->   (2, 1),
    ->   (3, 2),
    ->   (4, 3);
Query OK, 4 rows affected (0.00 sec)
Records: 4  Duplicates: 0  Warnings: 0

そして両者を JOIN してみよう。

> WITH
    -> device_types AS (
    ->   SELECT 1 AS device_type, 'LB' AS device_name
    ->   UNION ALL
    ->   SELECT 2 AS device_type, 'L2SW' AS device_name
    ->   UNION ALL
    ->   SELECT 3 AS device_type, 'L3SW' AS device_name
    -> )
    -> SELECT
    ->   devices.id,
    ->   device_types.device_name AS device_type
    -> FROM devices
    -> JOIN device_types
    -> ON devices.type = device_types.device_type;
+------+-------------+
| id   | device_type |
+------+-------------+
|    1 | LB          |
|    2 | LB          |
|    3 | L2SW        |
|    4 | L3SW        |
+------+-------------+
4 rows in set (0.01 sec)

ちゃんと上手く JOIN できた。

まあ上記くらいの内容なら擬似的なテーブルを作るよりも CASE を使った方が楽ちんかな。

> SELECT
    ->   devices.id,
    ->   CASE
    ->     WHEN devices.type = 1 THEN 'LB'
    ->     WHEN devices.type = 2 THEN 'L2SW'
    ->     WHEN devices.type = 3 THEN 'L3SW'
    ->   END AS device_type
    -> FROM devices;
+------+-------------+
| id   | device_type |
+------+-------------+
|    1 | LB          |
|    2 | LB          |
|    3 | L2SW        |
|    4 | L3SW        |
+------+-------------+
4 rows in set (0.00 sec)

再帰クエリの中で使う

UNION のもう一つの重要な使い方として再帰クエリ (WITH RECURSIVE) の中での用法がある。 これは再帰的にクエリを実行して得られた内容を結合するのに UNION を使うということ。 再帰クエリについては以下に詳しく書いた。

blog.amedama.jp

ちなみに MySQL 5.7 には再帰クエリが実装されていないけど MariaDB 10.2 なら使える。

まとめ

今回は UNION を使う場面について見てみた。

ビッグデータ分析・活用のためのSQLレシピ

ビッグデータ分析・活用のためのSQLレシピ

  • 作者: 加嵜長門,田宮直人,丸山弘詩
  • 出版社/メーカー: マイナビ出版
  • 発売日: 2017/03/27
  • メディア: 単行本(ソフトカバー)
  • この商品を含むブログを見る

SQL:2003 のウィンドウ関数を MariaDB 10.2 で試す

今回は SQL:2003 の規格で追加されたウィンドウ関数を使ってみる。 この機能を使うとカラムをグループ化して集約関数を使うのが楽になる。

使った環境は次の通り。

$ sw_vers                    
ProductName:    Mac OS X
ProductVersion: 10.12.5
BuildVersion:   16F73
$ mysql --version
mysql  Ver 15.1 Distrib 10.2.6-MariaDB, for osx10.12 (x86_64) using readline 5.1

MariaDB では 10.2.0 からウィンドウ関数が使えるようになっている。

Window Functions - MariaDB Knowledge Base

インストールは Homebrew を使えばさくっといける。

$ brew install mariadb
$ brew services start mariadb

ちなみに MySQL 5.7 ではウィンドウ関数がまだ実装されていない。

サンプル用のデータを用意する

まずは MariaDB のシェルに入る。

$ mysql -u root

サンプル用のデータを入れるためのデータベースを作成する。

> DROP DATABASE IF EXISTS sample;
Query OK, 0 rows affected, 2 warnings (0.01 sec)
> CREATE DATABASE sample;
Query OK, 1 row affected (0.00 sec)
> USE sample
Database changed

サンプル用のテーブルを作る。 今回はボードゲームのレーティングを示すテーブルっぽいものにした。 テーブル名はプレイヤーの方が良かったかな。

> DROP TABLE IF EXISTS users;
Query OK, 0 rows affected, 1 warning (0.00 sec)
> CREATE TABLE users (
    ->   name VARCHAR(255),
    ->   category VARCHAR(255),
    ->   rate INTEGER
    -> );
Query OK, 0 rows affected (0.02 sec)

なんか適当にレコードを追加しておく。 ゲームのカテゴリごとにプレイヤーとレーティングが格納されている。

> INSERT INTO
    ->   users
    -> VALUES
    ->   ('Alice', 'Shogi', 2000),
    ->   ('Bob', 'Igo', 1800),
    ->   ('Carol', 'Shogi', 1800),
    ->   ('Daniel', 'Igo', 1600);
Query OK, 4 rows affected (0.00 sec)
Records: 4  Duplicates: 0  Warnings: 0

なんかこんな感じのデータができた。

> SELECT
    ->   *
    -> FROM users;
+--------+----------+------+
| name   | category | rate |
+--------+----------+------+
| Alice  | Shogi    | 2000 |
| Bob    | Igo      | 1800 |
| Carol  | Shogi    | 1800 |
| Daniel | Igo      | 1600 |
+--------+----------+------+
4 rows in set (0.00 sec)

GROUP BY と共に集約関数を使う

まずはオーソドックスな GROUP BY と一緒に集約関数を使うやり方を見てみる。

カテゴリごとのレーティングを AVG() 関数を使って計算してみよう。

> SELECT
    ->   category,
    ->   AVG(rate) AS avg
    -> FROM users
    -> GROUP BY category;
+----------+-----------+
| category | avg       |
+----------+-----------+
| Igo      | 1700.0000 |
| Shogi    | 1900.0000 |
+----------+-----------+
2 rows in set (0.00 sec)

まあ、見慣れた感じだ。

ちなみに GROUP BY を使うと基本的には SELECT で表示できるカラムが限定される。 具体的には GROUP BY で指定したものか、あるいは集約関数を適用したものしか使うことができない。 次の例では GROUP BY で指定しているわけでも集約関数を適用したわけでもない name カラムを SELECT に指定している。 これはエラーにはなっていないものの、平均しているのに特定のレコードの内容 (name) が表示されていて、あまり意味をなしていない。

> SELECT
    ->   name,
    ->   category,
    ->   AVG(rate) AS avg
    -> FROM users
    -> GROUP BY category;
+-------+----------+-----------+
| name  | category | avg       |
+-------+----------+-----------+
| Bob   | Igo      | 1700.0000 |
| Alice | Shogi    | 1900.0000 |
+-------+----------+-----------+
2 rows in set (0.00 sec)

ウィンドウ関数を使う

それでは続いてウィンドウ関数を使うパターンを紹介する。 ウィンドウ関数では GROUP BY の代わりに集約関数の後に OVER() をつける。

次の例では先ほどと同じようにカテゴリごとのレーティングの平均を計算している。 異なるのは GROUP BY の代わりにウィンドウ関数を使っているところ。 OVER() には PARTITION BY でグループ化するカラムを指定する。

> SELECT
    ->   name,
    ->   category,
    ->   AVG(rate) OVER(PARTITION BY category) AS avg
    -> FROM users;
+--------+----------+-----------+
| name   | category | avg       |
+--------+----------+-----------+
| Alice  | Shogi    | 1900.0000 |
| Bob    | Igo      | 1700.0000 |
| Carol  | Shogi    | 1900.0000 |
| Daniel | Igo      | 1700.0000 |
+--------+----------+-----------+
4 rows in set (0.01 sec)

ウィンドウ関数を使う場合は全てのレコードに対して集約関数の結果が表示されている。 また、もちろん AVG() だけでなく SUM()COUNT() といった集約関数でも同じように使える。

RANK/ROW_NUMBER/DENSE_RANK

また、ウィンドウ関数では新たに使える集約関数が増えている。 例えば RANK() を使うと特定のカラムの内容に応じて順位をつけたりできる。 次の例では rate の内容に応じてソートした上で、それに順位をつけている。

> SELECT
    ->   name,
    ->   rate,
    ->   RANK() OVER(ORDER BY rate DESC) AS rank
    -> FROM users;
+--------+------+------+
| name   | rate | rank |
+--------+------+------+
| Alice  | 2000 |    1 |
| Bob    | 1800 |    2 |
| Carol  | 1800 |    2 |
| Daniel | 1600 |    4 |
+--------+------+------+
4 rows in set (0.00 sec)

ちなみに、同じ値のときでも異なる番号を振りたいときは ROW_NUMBER() を使う。

> SELECT
    ->   name,
    ->   rate,
    ->   ROW_NUMBER() OVER(ORDER BY rate DESC) AS rank
    -> FROM users;
+--------+------+------+
| name   | rate | rank |
+--------+------+------+
| Alice  | 2000 |    1 |
| Bob    | 1800 |    2 |
| Carol  | 1800 |    3 |
| Daniel | 1600 |    4 |
+--------+------+------+
4 rows in set (0.00 sec)

同じ順位があったとき、その分の順位を飛ばさないようにするには DENSE_RANK() を使う。

> SELECT
    ->   name,
    ->   rate,
    ->   DENSE_RANK() OVER(ORDER BY rate DESC) AS rank
    -> FROM users;
+--------+------+------+
| name   | rate | rank |
+--------+------+------+
| Alice  | 2000 |    1 |
| Bob    | 1800 |    2 |
| Carol  | 1800 |    2 |
| Daniel | 1600 |    3 |
+--------+------+------+
4 rows in set (0.01 sec)

PARTITION BY と ORDER BY を組み合わせて使う

先ほどの例だとカテゴリの違いを無視して順位付けをしたので、ちょっと不自然だったかもしれない。 グループ化した上で順位をつけたいときは、次のように PARTITION BYORDER BY を組み合わせて使う。

> SELECT
    ->   name,
    ->   category,
    ->   rate,
    ->   RANK() OVER(PARTITION BY category ORDER BY rate DESC) AS rank
    -> FROM users;
+--------+----------+------+------+
| name   | category | rate | rank |
+--------+----------+------+------+
| Alice  | Shogi    | 2000 |    1 |
| Bob    | Igo      | 1800 |    1 |
| Carol  | Shogi    | 1800 |    2 |
| Daniel | Igo      | 1600 |    2 |
+--------+----------+------+------+
4 rows in set (0.00 sec)

LAG/LEAD

それ以外にも LAG()LEAD() を使うと現在のレコードの前後のレコードが取得できる。

> SELECT
    ->   name,
    ->   category,
    ->   rate,
    ->   -- 一つ前のレコードを得る
    ->   LAG(name) OVER(ORDER BY rate DESC) AS prev1,
    ->   -- 二つ前のレコードを得る
    ->   LAG(name, 2) OVER(ORDER BY rate DESC) AS prev2,
    ->   -- 一つ先のレコードを得る
    ->   LEAD(name) OVER(ORDER BY rate DESC) AS next1,
    ->   -- 二つ先のレコードを得る
    ->   LEAD(name, 2) OVER(ORDER BY rate DESC) AS next2
    -> FROM users;
+--------+----------+------+-------+-------+--------+--------+
| name   | category | rate | prev1 | prev2 | next1  | next2  |
+--------+----------+------+-------+-------+--------+--------+
| Alice  | Shogi    | 2000 | NULL  | NULL  | Bob    | Carol  |
| Bob    | Igo      | 1800 | Alice | NULL  | Carol  | Daniel |
| Carol  | Shogi    | 1800 | Bob   | Alice | Daniel | NULL   |
| Daniel | Igo      | 1600 | Carol | Bob   | NULL   | NULL   |
+--------+----------+------+-------+-------+--------+--------+
4 rows in set (0.00 sec)

ROWS … で集計する範囲を指定する

また、集約関数の対象となる範囲を ROWS ... で指定できる。

例えば、次のようにすると現在のレコードの前後一つずつを SUM() で合計できる。 PRECEDING の前に指定するのが前いくつ分を対象にするか。 そして FOLLOWING の前に指定するのが後ろいくつ分を対象にするかになる。

> SELECT
    ->   name,
    ->   category,
    ->   rate,
    ->   -- 前後一つずつのレートを合計する
    ->   SUM(rate) OVER(
    ->     ORDER BY rate DESC
    ->     ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
    ->   ) AS sum_neighbor
    -> FROM users;
+--------+----------+------+--------------+
| name   | category | rate | sum_neighbor |
+--------+----------+------+--------------+
| Alice  | Shogi    | 2000 | 3800         |
| Bob    | Igo      | 1800 | 5600         |
| Carol  | Shogi    | 1800 | 5200         |
| Daniel | Igo      | 1600 | 3400         |
+--------+----------+------+--------------+
4 rows in set (0.00 sec)

変数となっているところを UNBOUNDED にすると無制限になる。

> SELECT
    ->   name,
    ->   category,
    ->   rate,
    ->   SUM(rate) OVER(
    ->     ORDER BY rate DESC
    ->     ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ->   ) AS sum_neighbor
    -> FROM users;
+--------+----------+------+--------------+
| name   | category | rate | sum_neighbor |
+--------+----------+------+--------------+
| Alice  | Shogi    | 2000 | 7200         |
| Bob    | Igo      | 1800 | 7200         |
| Carol  | Shogi    | 1800 | 7200         |
| Daniel | Igo      | 1600 | 7200         |
+--------+----------+------+--------------+
4 rows in set (0.00 sec)

上手く使えば、こんな感じで累積の値を簡単に集計できる。

> SELECT
    ->   name,
    ->   category,
    ->   rate,
    ->   SUM(rate) OVER(
    ->     ORDER BY rate DESC
    ->     ROWS BETWEEN UNBOUNDED PRECEDING AND 0 FOLLOWING
    ->   ) AS cumulative_sum
    -> FROM users;
+--------+----------+------+----------------+
| name   | category | rate | cumulative_sum |
+--------+----------+------+----------------+
| Alice  | Shogi    | 2000 | 2000           |
| Bob    | Igo      | 1800 | 3800           |
| Carol  | Shogi    | 1800 | 5600           |
| Daniel | Igo      | 1600 | 7200           |
+--------+----------+------+----------------+
4 rows in set (0.00 sec)

この ROWS ... の指定をしないと挙動が変わってしまう集約関数もあるようなので気をつけたい。

> SELECT
    ->   name,
    ->   category,
    ->   rate,
    ->   SUM(rate) OVER(
    ->     ORDER BY rate DESC
    ->   ) AS sum_neighbor
    -> FROM users;
+--------+----------+------+--------------+
| name   | category | rate | sum_neighbor |
+--------+----------+------+--------------+
| Alice  | Shogi    | 2000 | 2000         |
| Bob    | Igo      | 1800 | 5600         |
| Carol  | Shogi    | 1800 | 5600         |
| Daniel | Igo      | 1600 | 7200         |
+--------+----------+------+--------------+
4 rows in set (0.00 sec)

FIRST_VALUE/LAST_VALUE

例えば最初に登場する要素と最後に登場する要素を取得する FIRST_VALUE()LAST_VALUE()ROWS ... の指定がいるようだ。

> SELECT
    ->   name,
    ->   category,
    ->   rate,
    ->   -- レートでソートして一番最初にくる内容を表示する
    ->   FIRST_VALUE(name) OVER(
    ->     ORDER BY rate DESC
    ->     ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ->   ) AS first,
    ->   -- レートでソートして一番最後にくる内容を表示する
    ->   LAST_VALUE(name) OVER(
    ->     ORDER BY rate DESC
    ->     ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ->   ) AS last
    -> FROM users;
+--------+----------+------+-------+--------+
| name   | category | rate | first | last   |
+--------+----------+------+-------+--------+
| Alice  | Shogi    | 2000 | Alice | Daniel |
| Bob    | Igo      | 1800 | Alice | Daniel |
| Carol  | Shogi    | 1800 | Alice | Daniel |
| Daniel | Igo      | 1600 | Alice | Daniel |
+--------+----------+------+-------+--------+
4 rows in set (0.00 sec)

ROWS ... をコメントアウトすると、次のように意図した挙動にならない。

> SELECT
    ->   name,
    ->   category,
    ->   rate,
    ->   -- ROWS ... を入れないと上手く動作しない場合がある
    ->   FIRST_VALUE(name) OVER(
    ->     ORDER BY rate DESC
    ->     -- ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ->   ) AS first,
    ->   -- ROWS ... を入れないと上手く動作しない場合がある
    ->   LAST_VALUE(name) OVER(
    ->     ORDER BY rate DESC
    ->     -- ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ->   ) AS last
    -> FROM users;
+--------+----------+------+-------+--------+
| name   | category | rate | first | last   |
+--------+----------+------+-------+--------+
| Alice  | Shogi    | 2000 | Alice | Alice  |
| Bob    | Igo      | 1800 | Alice | Carol  |
| Carol  | Shogi    | 1800 | Alice | Carol  |
| Daniel | Igo      | 1600 | Alice | Daniel |
+--------+----------+------+-------+--------+
4 rows in set (0.00 sec)

まとめ

SQL:2003 の規格で追加されたウィンドウ関数を使うとグループ化して集約関数を適用するのが楽にできる。 ただし、集約関数の適用範囲を決める ROWS ... の挙動には注意が必要となる。

ビッグデータ分析・活用のためのSQLレシピ

ビッグデータ分析・活用のためのSQLレシピ

  • 作者: 加嵜長門,田宮直人,丸山弘詩
  • 出版社/メーカー: マイナビ出版
  • 発売日: 2017/03/27
  • メディア: 単行本(ソフトカバー)
  • この商品を含むブログを見る

Apache Hive を JDBC 経由で操作する

以前、Apache Hive の環境構築についてこのブログで書いた。

blog.amedama.jp

上記では Hive を操作するのに、同梱されたシェルを使っていた。 今回は JDBC (Java Database Connectivity) を使って操作する例を示す。

環境については先ほど紹介したエントリで構築されていることが前提になっている。 具体的には、次の環境変数が設定されているところがポイント。

$ echo $HADOOP_CONF_DIR
/home/vagrant/hadoop-2.8.0/etc/hadoop
$ echo $HIVE_HOME
/home/vagrant/apache-hive-1.2.2-bin

HiveServer2 を起動する

ぶっちゃけ JDBC 経由で Hive を操作するのに必要なのは HiveServer2 を起動することだけ。

$ $HIVE_HOME/bin/hiveserver2

あるいは hive コマンドに --service オプションで hiveserver2 を指定しても起動できる。

$ $HIVE_HOME/bin/hive --service hiveserver2

これで TCP:10000 で JDBC 用のポートを Listen し始める。

$ sudo netstat -tlnp | grep 10000
tcp        0      0 0.0.0.0:10000           0.0.0.0:*               LISTEN      22315/java

beeline で動作を確認する

HiveServer2 が起動できたら、まずは Hive に同梱されている beeline というシェルを使って接続してみよう。

次のようにして接続する。

$ $HIVE_HOME/bin/beeline -u jdbc:hive2://master:10000 -n vagrant

すると、次のようにインタラクティブシェルが起動する。

0: jdbc:hive2://master:10000>

あとは、ここに Hive で実行したい SQL を入力していくだけ。

> SHOW DATABASES;
+----------------+--+
| database_name  |
+----------------+--+
| default        |
+----------------+--+
1 row selected (1.439 seconds)

次のような感じで普通に使える。

> CREATE TABLE users (name STRING);
No rows affected (0.372 seconds)
> SHOW TABLES;
+-----------+--+
| tab_name  |
+-----------+--+
| users     |
+-----------+--+
1 row selected (0.053 seconds)
> INSERT INTO users VALUES ("Alice");
...(snip)...
No rows affected (19.367 seconds)
> SELECT * FROM users;
+-------------+--+
| users.name  |
+-------------+--+
| Alice       |
+-------------+--+
1 row selected (0.192 seconds)
> DROP TABLE users;
No rows affected (1.088 seconds)

Apache Spark から使ってみる

試しに Apache Spark からも接続してみよう。

まずはバイナリをダウンロードしてきて展開する。

$ wget https://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz
$ tar xf spark-2.1.1-bin-hadoop2.7.tgz

環境変数 SPARK_HOME を設定しておく。

$ cat << 'EOF' >> ~/.bashrc
export SPARK_HOME=~/spark-2.1.1-bin-hadoop2.7
EOF
$ source ~/.bashrc

Apache Spark のディレクトリ内には Hive に JDBC で接続するための jar ファイルがある。 なので、特に新しい jar ファイルを用意しなくても接続できる。

$ find $SPARK_HOME/jars | grep hive-jdbc
/home/vagrant/spark-2.1.1-bin-hadoop2.7/jars/hive-jdbc-1.2.1.spark2.jar

クラスタマネージャに YARN を指定して Spark シェルを起動しよう。

$ $SPARK_HOME/bin/spark-shell --master yarn

Hive 用の JDBC ドライバをロードする。

scala> Class.forName("org.apache.hive.jdbc.HiveDriver");
res0: Class[_] = class org.apache.hive.jdbc.HiveDriver

SQL のコネクションを取得するための DriverManager をインポートする。

scala> import java.sql.DriverManager
import java.sql.DriverManager

先ほど beeline で使ったのと同じ URL を指定してコネクションを取得する。

scala> val connection = DriverManager.getConnection("jdbc:hive2://master:10000", "vagrant", "");
connection: java.sql.Connection = org.apache.hive.jdbc.HiveConnection@1a4564a2

そしてコネクションからステートメントを得る。

scala> val statement = connection.createStatement();
statement: java.sql.Statement = org.apache.hive.jdbc.HiveStatement@7a085d02

あとは、ここで SQL を実行すれば良い。 結果は ResultSet として得られる。

scala> val resultSet = statement.executeQuery("SHOW DATABASES");
resultSet: java.sql.ResultSet = org.apache.hive.jdbc.HiveQueryResultSet@3100b5cf

あとは ResultSet から結果を取り出す。

scala> resultSet.next()
res1: Boolean = true
scala> resultSet.getString("DATABASE_NAME")
res2: String = default

おまけ (SparkSQL)

とはいえ Apache Spark は SparkSQL があるので上記のように JDBC 経由で Hive を使わなくても SQL が実行できるっぽい。

まずは SparkSession をインポートしておく。

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

その上で Hive サポートを有効にした SparkSession を取得する。

scala> val sparkSession = SparkSession.builder.master("yarn").appName("SparkSession with Hive support").enableHiveSupport().getOrCreate()
17/06/22 20:59:35 WARN sql.SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@55ba4bff

あとは得られたセッション経由で SQL を実行するだけ。

scala> val df = sparkSession.sql("SHOW DATABASES")
df: org.apache.spark.sql.DataFrame = [databaseName: string]

結果が DataFrame で返ってくるから扱いやすい。

scala> df.show()
+------------+
|databaseName|
+------------+
|     default|
+------------+

まとめ

今回は HiveServer2 を起動することで Apache Hive を JDBC 経由で扱えるようにする方法について書いた。

プログラミング Hive

プログラミング Hive

  • 作者: Edward Capriolo,Dean Wampler,Jason Rutherglen,佐藤直生,嶋内翔,Sky株式会社玉川竜司
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2013/06/15
  • メディア: 大型本
  • この商品を含むブログ (3件) を見る

統計: 偏相関係数で擬似(無)相関の有無を調べる

以前、このブログでは共分散や相関係数について扱ったことがある。 共分散や相関係数というのは、二つの変数間に線形な関係があるかを調べる方法だった。

blog.amedama.jp

しかし、実はただの相関係数では「第三の変数」からの影響を受けてしまう場合がある。 それというのは、第三の変数の存在によって、あたかも相関しているように見える (疑似相関) あるいは相関していないように見える (疑似無相関) というもの。

これは実際の例がないと、なかなか分かりづらいものだと思うんだけど良い例があったので紹介してみる。 今回はプロ野球の打撃成績に潜む疑似無相関を偏相関係数であぶり出してみることにする。

データをスクレイピングする

ひとまずデータがないと話にならないので、まずはスクレイピングしてくるところから始める。

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.5
BuildVersion:   16F73
$ python --version
Python 3.6.1

今回は全て Python を使って処理する。 使うライブラリは pandasnumpy だけ。 ただし pandas をスクレイピングに使うときは別途依存ライブラリが必要になる。

$ pip install pandas beautifulsoup4 requests lxml html5lib

各年度の選手成績はプロ野球の公式サイトから得られる。

npb.jp

試しに規定打席に達した選手の打撃成績を取得してみよう。 pandas#read_html() にスクレイピングしたい URL を指定する。

>>> import pandas as pd
>>> url = 'http://npb.jp/bis/2016/stats/bat_c.html'
>>> tables = pd.read_html(url)

これだけでページに含まれるテーブルを上手いことパースして DataFrame に落とし込んでくれる。

>>> df_with_header = tables[0]  # 成績のテーブルを取り出す
>>> df_with_header
                            0       1    2     3    4    5    6    7    8   \
0   規定打席 :チーム試合数×3.1 (端数は四捨五入)     NaN  NaN   NaN  NaN  NaN  NaN  NaN  NaN
1                          順 位     選 手  打 率   試 合  打 席  打 数  得 点  安 打  二塁打
2                            1   角中 勝也  (ロ)  .339  143  607  525   74  178
3                            2   西川 遥輝  (日)  .314  138  593  493   76  155
4                            3   浅村 栄斗  (西)  .309  143  611  557   73  172
5                            4   糸井 嘉男  (オ)  .306  143  616  532   79  163
6                            5   柳田 悠岐  (ソ)  .306  120  536  428   82  131
7                            6   内川 聖一  (ソ)  .304  141  605  556   62  169
8                            7   秋山 翔吾  (西)  .296  143  671  578   98  171
9                            8    陽 岱鋼  (日)  .293  130  555  495   66  145
10                           9    中村 晃  (ソ)  .287  143  612  488   69  140
...

十年分のデータを取得する

先ほど実行した内容にもとづいて十年分のデータを取得してみよう。 これは、単年ではデータ点数が少なくてあまり信頼のおけるものではなくなるため。

今回扱うのは「安打数」と「三振数」と「打席数」なので、それを入れる変数を用意しておく。

>>> import numpy as np
>>> h = pd.Series(dtype=np.int64)
>>> bb = pd.Series(dtype=np.int64)
>>> ab = pd.Series(dtype=np.int64)

あとは上記の変数に十年分のセ・パのデータを追加していく。

>>> import itertools
>>> import time
>>> for year, league in itertools.product(range(2006, 2017), ['p', 'c']):
...     url = 'http://npb.jp/bis/{year}/stats/bat_{league}.html'.format(year=year, league=league)  # スクレイピング対象 の URL
...     tables = pandas.read_html(url)  # テーブルを抽出する
...     df_with_header = tables[0]  # 成績のテーブルを取り出す
...     features = df_with_header[2:]  # ヘッダを捨てる
...     h = h.append(features[8])  # 安打数のデータを取り出す
...     bb = bb.append(features[21])  # 三振数のデータを取り出す
...     ab = ab.append(features[5])  # 打席数のデータを取り出す
...     time.sleep(1)  # リクエスト間で負荷をかけないように少し待つ
...

これで 636 人分の選手データが集まった。

>>> len(h)
636
>>> len(bb)
636
>>> len(ab)
636

安打と三振の相関係数を計算する

データが揃ったので、次は相関係数を計算してみる。

仮説として、安打をたくさん打つ選手ほど三振の数は少ないのではないか?と考えて両者の相関係数を調べてみよう。 この仮説が正しければ安打と三振の間には負の相関があるはずだ。

それでは numpy#corrcoef() を使って相関行列を計算させてみよう。

>>> np.corrcoef(h, bb)  # 安打数と三振数の相関行列を計算する
array([[ 1.        ,  0.06108444],
       [ 0.06108444,  1.        ]])

安打と三振の相関係数は 0.06 なので、ほとんど相関がないといえる。 これだけ見ると、仮説は間違っていたのだろうか?と思える。

偏相関係数を計算する

しかし、実際には第三の変数によって疑似無相関になっている可能性がある。

そんなときは第三の変数の影響を無くした相関係数として「偏相関係数」を計算してみよう。 次の数式が偏相関係数を求めるためのもの。

 r_{yz_x} = \frac{r_{yz} - r_{xy} r_{xz}}{\sqrt{1 - r_{xy}^2} \sqrt{1 - r_{xz}^2}}

ここで  r_{ab} は変数 ab の相関係数を表している。 上記は第三の変数 x の影響を取り除いた yz の偏相関係数を求める式になっている。

調べたところ、どうやら Python の既知のライブラリには偏相関係数を計算する実装がないようだ。 なので、ひとまず自前で関数を用意した。

>>> import math
>>> def partial_corrcoef(x, y, z):
...     """第三の変数 x の影響を除いた y と z の相関係数 (偏相関係数) を求める関数"""
...     correlation_matrix = np.corrcoef((x, y, z))
...     r_xy = correlation_matrix[0, 1]
...     r_xz = correlation_matrix[0, 2]
...     r_yz = correlation_matrix[1, 2]
...     r_yz_x = (r_yz - r_xy * r_xz) / (math.sqrt(1 - r_xy ** 2) * math.sqrt(1 - r_xz ** 2))  # noqa
...     return r_yz_x
...

上記の関数を使って「安打数」と「三振数」の相関係数から「打席数」の影響を取り除いてみよう。

>>> partial_corrcoef(ab, h, bb)  # 打席数の影響を取り除いた安打数と三振数の相関係数を計算する
-0.31105000209505901

結果は -0.311 と「弱い負の相関」があることが分かった。 どうやら打席数という第三の変数が影響することで擬似無相関になっていたらしい。

まあ、実際には第三の変数を見つけることが大変だから一通り偏相関係数を求めた上で相関係数との変動の大小を比べるのが良いのかな。

まとめ

  • 単純な相関係数では第三の変数の影響により疑似相関・疑似無相関になっている恐れがある
  • 第三の変数の影響を取り除くには偏相関係数を計算すれば良い

SQL: 内部的なコードを人間に分かりやすいラベルに変換して表示する

RDB のスキーマには、たまに対応表などを参照しながらでないと分からないような内部的なコードが使われていることがある。 大抵はアプリケーションの中で変換して表示するだろうけど、これを直接 SELECT とかで確認しようとすると分かりにくい。 今回は、それを見やすくするためのテクニックについて。

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.5
BuildVersion:   16F73
$ mysqld --version
mysqld  Ver 5.7.18 for osx10.12 on x86_64 (Homebrew)
$ mysql --version
mysql  Ver 14.14 Distrib 5.7.18, for osx10.12 (x86_64) using  EditLine wrapper

まずは MySQL のインタラクティブシェルに入る。

$ mysql -u root

そしてサンプル用のテーブルを用意しておく。 内部的なコードに対応するのは users.position_code だ。

mysql> DROP TABLE IF EXISTS users;
Query OK, 0 rows affected, 1 warning (0.00 sec)
mysql> CREATE TABLE users (
    ->   id integer,
    ->   name varchar(255),
    ->   age integer,
    ->   position_code integer
    -> );
Query OK, 0 rows affected (0.04 sec)

そして、いくつかレコードを追加しておこう。

mysql> INSERT INTO users
    -> VALUES
    ->   (1, 'Alice', 20, 1),
    ->   (2, 'Bob', 25, 2),
    ->   (3, 'Carol', 30, 3);
Query OK, 3 rows affected (0.00 sec)
Records: 3  Duplicates: 0  Warnings: 0

こんな感じでデータが入る。 この状態では position_code に一体どんな意味があるのか分からない。

mysql> SELECT *
    -> FROM users;
+------+-------+------+---------------+
| id   | name  | age  | position_code |
+------+-------+------+---------------+
|    1 | Alice |   20 |             1 |
|    2 | Bob   |   25 |             2 |
|    3 | Carol |   30 |             3 |
+------+-------+------+---------------+
3 rows in set (0.00 sec)

そこで CASE ~ END を用いて分かりやすいラベルに変換する。 条件を WHEN で指定したら、対応する値を THEN で返せば良い。 どれにも該当しない場合には ELSE を使う。

mysql> SELECT
    ->   id,
    ->   name,
    ->   CASE
    ->     WHEN position_code = 1 THEN '一般職員'
    ->     WHEN position_code = 2 THEN '主任'
    ->     WHEN position_code = 3 THEN '課長'
    ->     ELSE '不明'
    ->   END as position
    -> FROM users;
+------+-------+--------------+
| id   | name  | position     |
+------+-------+--------------+
|    1 | Alice | 一般職員     |
|    2 | Bob   | 主任         |
|    3 | Carol | 課長         |
+------+-------+--------------+
3 rows in set (0.00 sec)

これで分かりやすくなった。

もちろんこのテクニックはコードの変換だけでなく特定の条件に一致するか、みたいな確認にも使える。 次の例は年齢が 30 を超えているかを misoji カラムに表示している。

mysql> SELECT
    ->   id,
    ->   name,
    ->   CASE
    ->     WHEN age >= 30 THEN 'YES'
    ->     ELSE 'NO'
    ->   END as misoji
    -> FROM users;
+------+-------+--------+
| id   | name  | misoji |
+------+-------+--------+
|    1 | Alice | NO     |
|    2 | Bob   | NO     |
|    3 | Carol | YES    |
+------+-------+--------+
3 rows in set (0.00 sec)

めでたしめでたし。

ビッグデータ分析・活用のためのSQLレシピ

ビッグデータ分析・活用のためのSQLレシピ

  • 作者: 加嵜長門,田宮直人,丸山弘詩
  • 出版社/メーカー: マイナビ出版
  • 発売日: 2017/03/27
  • メディア: 単行本(ソフトカバー)
  • この商品を含むブログを見る

ファイルから SQL を読み込む (MySQL, PostgreSQL, SQLite3)

色々な RDBMS で SQL の書かれたファイルを読み込む方法について調べた。

まずはSQL の書かれたテキストファイルを用意しておく。

$ cat << 'EOF' > sample.sql
DROP TABLE IF EXISTS users;
CREATE TABLE users (
  id integer,
  name varchar(255),
  age integer
);
INSERT INTO users
VALUES
  (1, 'Alice', 20),
  (2, 'Bob', 30),
  (3, 'Carol', 40);
EOF

こんな感じ。 users テーブルを作って、そこにレコードを追加している。

$ cat sample.sql
DROP TABLE IF EXISTS users;
CREATE TABLE users (
  id integer,
  name varchar(255),
  age integer
);
INSERT INTO users
VALUES
  (1, 'Alice', 20),
  (2, 'Bob', 30),
  (3, 'Carol', 40);

TL;DR

早見表

RDBMS コマンドライン インタラクティブシェル
MySQL mysql -D [database] < [filepath] source [filepath]
PostgreSQL psql -d [database] -f [filepath] \i [filepath]
SQLite3 sqlite3 [database] < [filepath] .read [filepath]

MySQL

まずは MySQL から。 使ったバージョンは次の通り。

$ mysqld --version
mysqld  Ver 5.7.18 for osx10.12 on x86_64 (Homebrew)

最初に、読み込む先となる sample データベースを用意しておく。

$ mysql -u root -e "CREATE DATABASE IF NOT EXISTS sample"

コマンドラインから読み込む

まずは通常のシェルから mysql コマンドを使って読み込む方法について。

mysql コマンドを使って読みこむにはリダイレクトを使うだけで良い。

$ mysql -u root -D sample < sample.sql

これだけでテーブルができてレコードが追加されている。

$ mysql -u root -D sample -e "SELECT * FROM users"
+------+-------+------+
| id   | name  | age  |
+------+-------+------+
|    1 | Alice |   20 |
|    2 | Bob   |   30 |
|    3 | Carol |   40 |
+------+-------+------+

シェルから読み込む

次に mysql コマンドのインタラクティブシェルに落ちて読み込む方法について。

$ mysql -u root -D sample

ひとまず、先ほど読み込んだ内容は一旦削除しておく。

mysql> DROP TABLE users;

mysql コマンドのシェルから SQL の書かれたファイルを読み込むには source コマンドを使う。

mysql> source sample.sql

これでファイルに書かれた SQL が読み込まれる。

mysql> SELECT * FROM users;
+------+-------+------+
| id   | name  | age  |
+------+-------+------+
|    1 | Alice |   20 |
|    2 | Bob   |   30 |
|    3 | Carol |   40 |
+------+-------+------+
3 rows in set (0.00 sec)

あるいは上記の代わりに \. を使っても構わない。

mysql> \. sample.sql

PostgreSQL

次に PostgreSQL の場合。 使ったバージョンは次の通り。

$ psql --version
psql (PostgreSQL) 9.6.3

まずは読み込む先のデータベースを用意しておく。

$ psql -c "DROP DATABASE sample"
$ psql -c "CREATE DATABASE sample"

コマンドラインから読み込む

psql コマンドで読み込む場合には、読ませたいファイルを -f オプションで指定する。

$ psql -d sample -f sample.sql

これで内容が読み込まれた。

$ psql -d sample -c "SELECT * FROM users"
 id | name  | age
----+-------+-----
  1 | Alice |  20
  2 | Bob   |  30
  3 | Carol |  40
(3 rows)

シェルから読み込む

次は psql コマンドのシェルに入って読み込む方法について。

まずはデータベースを指定してインタラクティブシェルに落ちる。

$ psql -d sample

先ほど作ったテーブルは一旦削除しておこう。

sample=# DROP TABLE users;

SQL の書かれたファイルを読み込むには \i を使う。

sample=# \i sample.sql

これで内容が読み込まれた。

sample=# SELECT * FROM users;
 id | name  | age
----+-------+-----
  1 | Alice |  20
  2 | Bob   |  30
  3 | Carol |  40
(3 rows)

SQLite3

次は SQLite3 の場合。 使ったバージョンは次の通り。

$ sqlite3 --version
3.16.0 2016-11-04 19:09:39 0e5ffd9123d6d2d2b8f3701e8a73cc98a3a7ff5f

コマンドラインから読み込む

SQLite3 でコマンドラインから読み込むにはデータベースのファイルを指定しながら MySQL と同様にリダイレクトを使うだけで良い。

$ sqlite3 sample.db < sample.sql

これで読み込まれた。

$ sqlite3 sample.db "SELECT * FROM users"
1|Alice|20
2|Bob|30
3|Carol|40

シェルから読み込む

次にインタラクティブシェルから読み込む方法について。 まずはデータベースのファイルを指定して sqlite3 コマンドを実行することでシェルに落ちる。

$ sqlite3 sample.db

先ほど作ったテーブルは一旦削除しておこう。

sqlite> DROP TABLE users;

SQLite3 のシェルでファイルを読み込むには .read を使う。

sqlite> .read sample.sql

これで内容が読み込まれた。

sqlite> SELECT * FROM users;
1|Alice|20
2|Bob|30
3|Carol|40

いじょう。

Apache Spark を完全分散モードの YARN クラスタで動かす

Apache Spark を使って複数ノードで分散並列処理をする場合、まずは動作させるためのクラスタマネージャを選ぶことになる。 Apache Spark では以下のクラスタマネージャに対応している。

  • Apache Spark 組み込み (これはスタンドアロンモードと呼ばれる)
  • Apache Hadoop YARN
  • Apache Mesos

今回は、その中で二番目の Apache Hadoop の提供する YARN を使ってみる。 また、なるべく実環境に近いものを作りたいので Apache Hadoop は完全分散モードを使うことにした。 そのため、まず前提として次のエントリを元に Hadoop クラスタが組まれていることが前提となる。

blog.amedama.jp

Apache Hadoop を設定する

Apache Spark のクラスタマネージャに YARN を使うときのポイントは次の環境変数が設定されていること。

$ echo $HADOOP_HOME
/home/vagrant/hadoop-2.8.0
$ echo $HADOOP_CONF_DIR
/home/vagrant/hadoop-2.8.0/etc/hadoop

また、上記の手順で構築した場合、メモリが少ないと Apache Spark を動作させたときエラーになってしまう。 これは、実行前のチェックによってマシンに積まれているメモリを一定の割合以上使うことができないため。 そこで、次のようにして設定ファイルを編集することで、そのチェックを無効にしてやる。

$ cat << 'EOF' > /tmp/yarn-site.xml.property
  <property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
  </property>
EOF
$ sed -i -e '
  /^<configuration>$/r /tmp/yarn-site.xml.property
  /^$/d
' $HADOOP_HOME/etc/hadoop/yarn-site.xml

設定ファイルを編集したので、それを各ノードに配布する。 そして NodeManager のプロセスも設定を読み直すために再起動が必要になるのでプロセスを止める。

$ for node in node1 node2
do
  scp $HADOOP_HOME/etc/hadoop/yarn-site.xml $node:$HADOOP_HOME/etc/hadoop/
  ssh $node 'pkill -f NodeManager'
done

そして、改めて各ノードの NodeManager を起動してやる。

$ $HADOOP_HOME/sbin/start-yarn.sh
starting yarn daemons
resourcemanager running as process 19527. Stop it first.
node1: starting nodemanager, logging to /home/vagrant/hadoop-2.8.0/logs/yarn-vagrant-nodemanager-node1.out
node2: starting nodemanager, logging to /home/vagrant/hadoop-2.8.0/logs/yarn-vagrant-nodemanager-node2.out

あとはユーザのホームディレクトリを作成しておこう。

$ $HADOOP_HOME/bin/hdfs dfs -mkdir -p .

これで Hadoop クラスタ側の準備はできた。

Apache Spark を設定する

次に Apache Spark をインストールした上で設定する。

まずは公式サイトからバイナリをダウンロードして解凍する。

$ wget https://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz
$ tar xf spark-2.1.1-bin-hadoop2.7.tgz

上記のディレクトリを環境変数 SPARK_HOME に設定しておこう。

$ cat << 'EOF' >> ~/.bashrc
export SPARK_HOME=~/spark-2.1.1-bin-hadoop2.7
EOF
$ source ~/.bashrc

これだけで Apache Spark を使う準備は整った。

サンプルコードを動かしてみる

準備ができたので、試しに Apache Spark に同梱されているサンプルコードを動かしてみよう。 例えば、次のようにすると円周率を計算するプログラムを起動できる。

$ $SPARK_HOME/bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  $SPARK_HOME/examples/jars/spark-examples_*.jar \
  1000
...(snip)...
Pi is roughly 3.1417053514170536
...(snip)

エラーにならず円周率は大体 3.1417… ということが計算できた。

PySpark を使ってみる

次は Apache Spark を PySpark と呼ばれる Python で使えるインタラクティブシェルから使ってみる。 そして、分散並列処理のハローワールドとも言えるワードカウントを書いてみることにしよう。

ワードカウントの対象としては Apache Spark の README ファイルを使うことにした。 次のようにして、まずは HDFS にファイルをコピーする。

$ $HADOOP_HOME/bin/hdfs dfs -put $SPARK_HOME/README.md .

分散並列処理をするには、対象のファイルが全てのノードから参照できる状態にないといけない。 今回は HDFS にコピーしたけど Amazon S3 とか対応しているファイルシステムは色々とある。

pyspark コマンドを使って PySpark のインタラクティブシェルを起動しよう。 ここでポイントとなるのは --master オプションで yarn を指定すること。 これでローカルではなく YARN を使った分散実行がされるようになる。

$ $SPARK_HOME/bin/pyspark --master yarn
...(snip)...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.5 (default, Nov  6 2016 00:28:07)
SparkSession available as 'spark'.
>>>

HDFS に先ほどコピーしたファイルを読み込もう。 ファイルの読み込みは SparkContext#textFile() を使う。

>>> textfile = sc.textFile('hdfs://master:9000/user/vagrant/README.md')

これで HDFS のファイルのデータが各ノードに読み込まれた。

>>> textfile.first()
u'# Apache Spark'

まずはテキストを RDD#flatMap() を使ってスペースで分割する。 こうすると map した結果から得られたリストを展開してくれる。

>>> words = textfile.flatMap(lambda line: line.split(' '))

すると、こんな風になる。

>>> words.take(5)
[u'#', u'Apache', u'Spark', u'', u'Spark']

上記には空白のように意味のない単語も含まれるので、それを RDD#filter() で取り除く。

>>> valid_words = words.filter(lambda word: word)
>>> valid_words.take(5)
[u'#', u'Apache', u'Spark', u'Spark', u'is']

次に、各単語を RDD#map() を使ってキー・バリュー形式にする。 キーは単語でバリューは登場回数になる。

>>> keyvalues = valid_words.map(lambda word: (word, 1))
>>> keyvalues.first()
(u'#', 1)

最後に、各キーごとに RDD#reduceByKey() を使ってバリューを集計する。 これで各単語の出現回数が集計できる。

>>> word_count = keyvalues.reduceByKey(lambda a, b: a + b)
>>> word_count.take(3)
[(u'storage', 1), (u'"local"', 1), (u'including', 4)]

あとは集計結果を HDFS に書き出そう。

>>> word_count.saveAsTextFile('hdfs://master:9000/user/vagrant/output')

使い終わったらインタラクティブシェルから抜ける。

>>> exit()

HDFS に書き出された内容を確認してみよう。 どうやら、ちゃんと単語ごとの出現回数がカウントできているようだ。

$ $HADOOP_HOME/bin/hdfs dfs -cat output/*
(u'storage', 1)
(u'"local"', 1)
(u'including', 4)
...(snip)...
(u'<class>', 1)
(u'learning,', 1)
(u'latest', 1)

PySpark で Python スクリプトを実行する

先ほどはインタラクティブシェルを使って PySpark を使ったけど、もちろんスクリプトファイルから実行することもできる。

一旦、先ほど書き出した HDFS のディレクトリは削除しておこう。

$ $HADOOP_HOME/bin/hdfs dfs -rm -r -f output

次のスクリプトは、先ほどのワードカウントをスクリプトファイルにしたもの。 ポイントとしては、インタラクティブシェルでは最初からインスタンス化されていた SparkContext を自分で用意しなきゃいけないところ。 インタラクティブシェルではオプションで実行するモードを YARN にしていた代わりに、この場合は SparkConf で設定する必要がある。 とはいえ、他の部分は特に変わらない。

$ cat << 'EOF' > /var/tmp/wc.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkConf
from pyspark import SparkContext


def main():
    conf = SparkConf()
    conf.setMaster('yarn')
    conf.setAppName('WordCount')

    sc = SparkContext(conf=conf)

    textfile = sc.textFile('hdfs://master:9000/user/vagrant/README.md')

    words = textfile.flatMap(lambda line: line.split(' '))
    valid_words = words.filter(lambda word: word)
    keyvalues = valid_words.map(lambda word: (word, 1))
    word_count = keyvalues.reduceByKey(lambda a, b: a + b)

    word_count.saveAsTextFile('hdfs://master:9000/user/vagrant/output')


if __name__ == '__main__':
    main()
EOF

実行する

スクリプトファイルを実行するときは spark-submit コマンドを使う。 このとき --py-files オプションを使って実行に必要なスクリプトファイルを、あらかじめノードに配布しておく。

$ $SPARK_HOME/bin/spark-submit \
  --master yarn \
  --py-files /var/tmp/wc.py \
  /var/tmp/wc.py

上記の実行が上手くいったら結果を出力したディレクトリを確認してみよう。

$ $HADOOP_HOME/bin/hdfs dfs -cat output/*
(u'storage', 1)
(u'"local"', 1)
(u'including', 4)
...(snip)...
(u'<class>', 1)
(u'learning,', 1)
(u'latest', 1)

実行したアプリケーションの確認

ちなみに Apache Spark が YARN 経由で実行したアプリケーションの情報は Hadoop の管理画面から状態を確認できる。

http://192.168.33.10:8088/

上記クラスタの構築を自動化する

ちなみに上記を全て手動で構築するのは手間がかかるので Vagrantfile を書いてみた。 次のようにして実行する。 メモリを 2GB 積んだ仮想マシンを 3 台起動するので、最低でも 8GB できれば 16GB のメモリを積んだ物理マシンで実行してもらいたい。

$ git clone https://gist.github.com/fefb9831e9f032ef264d8d517df57cb4.git spark-on-yarn
$ cd spark-on-yarn
$ sh boot.sh

vagrant ssh コマンドでマスターサーバにログインできる。

$ vagrant ssh

いじょう。

参考

Apache Spark については次の本を読んで勉強してみた。

初めてのSpark

初めてのSpark

  • 作者: Holden Karau,Andy Konwinski,Patrick Wendell,Matei Zaharia,Sky株式会社玉川竜司
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2015/08/22
  • メディア: 大型本
  • この商品を含むブログ (4件) を見る