CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: pandas で対応関係を渡して値を変換する

例えばデータセットの中のカラムが文字列型なんかで入っていると、それを数値に直したくなることはよくあると思う。 今回はそれを pandas でやる方法について書く。 結論から先に書くと Series オブジェクトにある map() メソッドを使うと上手くいく。

使った環境は次の通り。

$ python -V
Python 3.6.4
$ pip list --format=columns | grep -i pandas
pandas             0.22.0

まずは pandas をインストールして Python の REPL を起動しておく。

$ pip install pandas
$ python

サンプルになる DataFrame オブジェクトを用意する。 各行には何かの商品のグレードと価格に関する情報が入っているイメージ。

>>> import pandas as pd
>>> data = [
...     (1, 'C', 1000),
...     (2, 'B', 2500),
...     (3, 'A', 5000),
...     (4, 'S', 10000),
... ]
>>> columns = ['id', 'grade', 'price']
>>> df = pd.DataFrame(data, columns=columns)
>>> df
   id grade  price
0   1     C   1000
1   2     B   2500
2   3     A   5000
3   4     S  10000

この中で grade カラムは文字列で情報が格納されているため扱いにくい。 そこで、このカラムに入っている内容を数値に変換したいと考える。

>>> df['grade']
0    C
1    B
2    A
3    S
Name: grade, dtype: object

対応関係については次のように S ~ C に対して数値を割り振ることにする。

>>> grade_mapping = {
...     'C': 1,
...     'B': 2,
...     'A': 3,
...     'S': 4,
... }

次の通り DataFrame オブジェクトの中で各カラムは Series オブジェクトとして管理されている。

>>> type(df['grade'])
<class 'pandas.core.series.Series'>

この場合、次のように変換したいカラムに対して Series#map() メソッドを使うと上手くいく。

>>> df['grade'].map(grade_mapping)
0    1
1    2
2    3
3    4
Name: grade, dtype: int64

変換後のカラムを元の DataFrame の別のカラムと一緒に使いたいときは、次のようにすると良い。 以下のコードでは既存の DataFramegrade_norm という名前で数値に正規化したカラムを追加している。

>>> df.assign(grade_norm = df['grade'].map(grade_mapping))
   id grade  price  grade_norm
0   1     C   1000           1
1   2     B   2500           2
2   3     A   5000           3
3   4     S  10000           4

上記は DataFrame を非破壊的に操作する (新しいオブジェクトを作る) やり方になる。 もし、既存の DataFrame を破壊的に更新したいときは次のように既存のカラムに代入してしまえば良い。

>>> df['grade'] = df['grade'].map(grade_mapping)
>>> df
   id  grade  price
0   1      1   1000
1   2      2   2500
2   3      3   5000
3   4      4  10000

いじょう。

めでたしめでたし。

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Python: pandas で特定の値がいくつあるか数える

今回は pandas で特定の値がいくつ数える方法について。 結論から先に書いてしまうと value_counts() という専用のメソッドがあるよ、という話。

使った環境は次の通り。

$ python -V
Python 3.6.4
$ pip list --format=columns | grep -i pandas
pandas             0.22.0

まずは pandas をインストールして Python の REPL を起動しておく。

$ pip install pandas
$ python

続いて、サンプル用のデータフレームを作っておく。 何かのグレードが英字一文字で格納されたデータのようだ。

>>> import pandas as pd
>>> data = [
...     "A",
...     "B",
...     "B",
...     "C",
... ]
>>> columns = ['grade']
>>> df = pd.DataFrame(data, columns=columns)

上記について特定の値がいくつあるか数えるとき、これまでは groupby() してから size() していた。

>>> df.groupby('grade').size()
grade
A    1
B    2
C    1
dtype: int64

もちろん、上記も間違いではないんだけど pandas の Series オブジェクトには value_counts() という専用のメソッドがある。

>>> df['grade'].value_counts()
B    2
A    1
C    1
Name: grade, dtype: int64

得られる結果はどちらも変わらない。 返り値は Series オブジェクトになる。

>>> s = df['grade'].value_counts()
>>> type(s)
<class 'pandas.core.series.Series'>

ちなみに、上記で得られた SeriesDataFrame に変換するには reset_index() メソッドを使うと便利っぽい。

>>> s.reset_index()
  index  grade
0     B      2
1     A      1
2     C      1
>>> type(s.reset_index())
<class 'pandas.core.frame.DataFrame'>

カラム名については name オプションで指定した方が良いかも。

>>> s.reset_index(name='count')
  index  count
0     B      2
1     A      1
2     C      1

いじょう。

めでたしめでたし。

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

PySpark のスクリプトファイルで引数を扱う

今回は Apache Spark の Python インターフェースである PySpark について。 PySpark では定型的な作業についてはスクリプトファイル (*.py) にまとめて spark-submit コマンドで実行することになる。 その際に、動作に必要な引数をさばく方法について。 結論から先に書いてしまうと spark-submit コマンドでスクリプトファイルの後ろにアプリケーション用の引数を渡せば良いだけ。

使った環境は次の通り。 Apache Spark は YARN を使って分散環境を構築してある。

$ cat /etc/redhat-release 
CentOS Linux release 7.4.1708 (Core) 
$ uname -r
3.10.0-693.17.1.el7.x86_64
$ pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/
                        
Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_161
Branch master
Compiled by user sameera on 2018-02-22T19:24:29Z
Revision a0d7949896e70f427e7f3942ff340c9484ff0aab
Url git@github.com:sameeragarwal/spark.git
Type --help for more information.
$ hadoop version
Hadoop 2.8.3
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r b3fe56402d908019d99af1f1f4fc65cb1d1436a2
Compiled by jdu on 2017-12-05T03:43Z
Compiled with protoc 2.5.0
From source with checksum 9ff4856d824e983fa510d3f843e3f19d
This command was run using /home/vagrant/hadoop-2.8.3/share/hadoop/common/hadoop-common-2.8.3.jar

Apache Spark で PySpark のスクリプトファイルを実行する

とりあえず、まずは引数とかに関係なく PySpark のスクリプトファイルを実行する方法から。 記述する上での注意点として、最も重要なのは pyspark コマンドで起動する REPL とは若干流儀が異なるということ。 具体的には REPL であれば最初から用意されているいくつかのインスタンス変数が用意されていない。 そのためスクリプトファイルの中でそれらを自分で作成しなければいけない。

以下のサンプルコードでは REPL であれば最初から用意されている SparkContextSparkSession のインスタンスを作っている。 そして、それらを作った上で SparkSQL を呼び出すという内容になっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession


def main():
    conf = SparkConf()
    conf.setAppName('example')
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    df = spark.sql('SELECT "Hello, World!" AS message')
    df.show()


if __name__ == '__main__':
    main()

この内容ではアプリケーションの引数がどうのとかは関係ない。

上記を適当な名前で保存して spark-submit コマンドで実行する。 今回はクラスタを YARN で管理しているので --master オプションに yarn を指定している。

$ spark-submit --master yarn example.py
...
+-------------+
|      message|
+-------------+
|Hello, World!|
+-------------+
...

うまくいけば上記のように SparkSQL で得られた DataFrame の内容がログの中に表示されるはず。

アプリケーションが動作するための引数を扱う

続いてはアプリケーションが動作するのに必要な引数をどうやって渡すかについて。 これについては spark-submit コマンドのヘルプを見れば、それについて書いてある。

$ spark-submit --help 2>&1 | head -n 1
Usage: spark-submit [options] <app jar | python file> [app arguments]

スクリプトファイルの後ろにアプリケーション固有の引数を書けば良いらしい。

上記を確認するために、以下のようなサンプルコードを用意した。 このコードでは Spark 固有の機能は使っておらず、ただ単に sys.argv を出力している。 これによって Python のモジュールに渡されてきた引数の内容を表示できる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function

import sys

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession


def main():
    conf = SparkConf()
    conf.setAppName('example')
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    print('args:', sys.argv)


if __name__ == '__main__':
    main()

上記のスクリプトファイルを spark-submit で実行してみよう。 スクリプトファイルの後ろにはアプリケーションの引数に見立てた文字列をいくつか追加しておく。

$ spark-submit --master yarn example.py foo bar baz
...
args: ['/home/vagrant/example.py', 'foo', 'bar', 'baz']
...

ちゃんと spark-submit 自体の引数とは別に sys.argv に引数が渡っていることがわかる。

アプリケーション固有の引数をパースする

アプリケーションへの引数の渡し方が分かったので、あとはパースしてアプリケーションの中で使うだけ。 次のサンプルコードでは argparse モジュールを使って引数をパースして SparkSQL の中に内容を埋め込んでいる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function

import argparse

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession


def main():
    conf = SparkConf()
    conf.setAppName('example')
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    parser = argparse.ArgumentParser(description='pyspark app args example')
    parser.add_argument('--message', type=str, required=True, help='show message')
    args = parser.parse_args()

    sql = 'SELECT "{msg}" AS message'.format(msg=args.message)
    df = spark.sql(sql)
    df.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。 上手くいけば表示される内容が引数で渡した内容になる。

$ spark-submit --master yarn example.py --message "Hello, PySpark"
...
+--------------+
|       message|
+--------------+
|Hello, PySpark|
+--------------+
...

ばっちり。

めでたしめでたし。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

PySpark の DataFrame を SparkSQL で操作する

Apache Spark には SQL の実行エンジンが組み込まれていて、そのインターフェースは SparkSQL と呼ばれている。 この機能を使うと Spark で主に扱われるデータ構造の DataFrame オブジェクトを SQL で操作できる。 今回は PySpark から DataFrame を SparkSQL で操作する方法について書いてみる。

使った環境は次の通り。 Spark は YARN の上で動作するように環境構築してある。 ただし、今回扱う範囲であれば別にスタンドアロンな環境でも動くはず。

$ cat /etc/redhat-release 
CentOS Linux release 7.4.1708 (Core) 
$ uname -r
3.10.0-693.17.1.el7.x86_64
$ hadoop version
Hadoop 2.8.3
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r b3fe56402d908019d99af1f1f4fc65cb1d1436a2
Compiled by jdu on 2017-12-05T03:43Z
Compiled with protoc 2.5.0
From source with checksum 9ff4856d824e983fa510d3f843e3f19d
This command was run using /home/vagrant/hadoop-2.8.3/share/hadoop/common/hadoop-common-2.8.3.jar
$ spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/
                        
Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_161
Branch 
Compiled by user felixcheung on 2017-11-24T23:19:45Z
Revision 
Url 
Type --help for more information.

まずは PySpark のシェルを起動しておこう。

$ pyspark --master yarn

サンプルの DataFrame を用意する

まずは操作対象のサンプルとなる DataFrmae を用意する。 ここでは RDD から作ることにした。 中身はユーザの名前と年齢が記録されたものになる。

>>> rdd = sc.parallelize([
...   ('Alice', 20),
...   ('Bob', 25),
...   ('Carol', 30),
...   ('Daniel', 30),
... ])

これで RDD ができた。

>>> rdd.collect()
[('Alice', 20), ('Bob', 25), ('Carol', 30), ('Daniel', 30)]

DataFrame には RDD と違ってスキーマがあるので、次はそれを定義する。

>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
>>> from pyspark.sql.types import IntegerType
>>> schema = StructType([
...   StructField('name', StringType(), False),
...   StructField('age', IntegerType(), False),
... ])

RDD とスキーマの両方が揃ったら DataFrame に変換しよう。

>>> df = spark.createDataFrame(rdd, schema)

この通り、ちゃんと DataFrame ができた。

>>> df.show()
+------+---+
|  name|age|
+------+---+
| Alice| 20|
|   Bob| 25|
| Carol| 30|
|Daniel| 30|
+------+---+

ちなみに、ちっこいデータであれば上記のようにする以外にも次のような作り方もある。 実は、これは正に SparkSQL の機能を使っている。

>>> df = spark.sql('SELECT "Hello, World!" AS message')
>>> df
DataFrame[message: string]
>>> df.show()
+-------------+
|      message|
+-------------+
|Hello, World!|
+-------------+

PySpark のシェルにデフォルトで用意されている spark 変数は SparkSession のインスタンスになっている。 このインスタンスを通して SparkSQL の機能が利用できる。

>>> spark
<pyspark.sql.session.SparkSession object at 0x7fc2959daba8>

SparkSQL で DataFrame を操作する

それでは、先ほど作成した DaraFrame を早速 SparkSQL を使って操作してみる。 それには、まず DataFrame#registerTempTable() というメソッドを使う。 このメソッドを使うと DataFrame を SparkSQL が操作するテーブルとして登録できる。

>>> df.registerTempTable('users')

登録したら、あとは SparkSession#sql() メソッド経由で SQL から先ほどの DataFrame の内容を操作できるようになる。 試しに DataFrame を元に登録したテーブルから全レコードを取り出してみよう。

>>> df = spark.sql('SELECT * FROM users')
>>> df.show()
+------+---+
|  name|age|
+------+---+
| Alice| 20|
|   Bob| 25|
| Carol| 30|
|Daniel| 30|
+------+---+

ばっちり取り出せた。

上記を見て何となく分かるかもしれないけど SparkSQL を使って得られる結果は、やっぱり DataFrame になっている。 ここでは WHERE 句を使って先ほどの DataFrame から絞り込んだ結果を、また新たな DataFrame として取得していることになる。

>>> df = spark.sql('SELECT * FROM users WHERE age < 30')
>>> df.show()
+-----+---+
| name|age|
+-----+---+
|Alice| 20|
|  Bob| 25|
+-----+---+

つまり SparkSQL を使って得た DataFrame をさらに登録して・・・という感じでアドホックに分析を進められる。 さっき取り出した 30 歳未満だけに限定したユーザの DataFrame をテーブルとして登録してみよう。

>>> df.registerTempTable('users_age_under_30')

この通り、ちゃんと登録できた。

>>> df = spark.sql('SELECT * FROM users_age_under_30')
>>> df.show()
+-----+---+
| name|age|
+-----+---+
|Alice| 20|
|  Bob| 25|
+-----+---+

テーブルの揮発性

ちなみに DataFrame#registerTempTable() で登録した内容はメソッド名の通り一次的なものになっている。 そのため PySpark のシェルを終了すると消える。

>>> exit()

一旦シェルを終了してから、再度 PySpark のシェルを立ち上げてテーブルを参照してみることにしよう。

$ pyspark --master yarn

この通り、そんなテーブルはないよと言われる。

>>> spark.sql('SELECT * FROM users').show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/vagrant/spark-2.2.1-bin-hadoop2.7/python/pyspark/sql/session.py", line 603, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/home/vagrant/spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/vagrant/spark-2.2.1-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Table or view not found: users; line 1 pos 14'

そのため、何度も使いたいものについては何らかの形で何処かに書き出しておく必要がある。 書き込む先はローカルのディスクなり HDFS なり S3 なり、自分の使っているファイルシステム上になる。

SparkSQL を使わない場合との比較

ちなみに DataFrame は、もちろん SparkSQL を使わずに操作することもできる。 次は SparkSQL を使う場合と使わない場合を軽く比較してみる。

まずは、改めて DataFrame を用意しておく。

>>> users_df = spark.sql('SELECT * FROM users')

PySpark のシェルを終了した場合にはもう一度作り直そう。

SparkSQL を使わずに DataFrame の操作をするときは pyspark.sql.functions を使うことが多いと思う。 とはいえパッケージ名からして sql が入っているんだけど。

>>> from pyspark.sql import functions as F

試しに年齢の平均値を出してみる。 DataFrame#select() で年齢のカラムに pyspark.sql.functions.avg() 関数を適用する。

>>> users_age_avg_df = df.select(F.avg(users_df.age))
>>> users_age_avg_df
DataFrame[avg(age): double]

確認すると、たしかに平均が計算できた。

>>> users_age_avg_df.show()
+--------+
|avg(age)|
+--------+
|   26.25|
+--------+

ちなみに、カラム名を指定するときは Column#alias() メソッドを使う。

>>> users_df.select(F.avg(users_df.age).alias('age_avg')).show()
+-------+
|age_avg|
+-------+
|  26.25|
+-------+

SparkSQL を使うときはこんな感じ。 特に説明は不要だと思う。

>>> spark.sql('SELECT AVG(age) AS age_avg FROM users').show()
+-------+
|age_avg|
+-------+
|  26.25|
+-------+

こうして見ると SparkSQL を使わない場合でも、それに至るまでに扱う API 自体は近いことが分かる。

次は年齢ごとの人数を数えてみよう。 まずは DataFrame#groupBy() メソッドで集約に使うカラムを指定する。

>>> gd = users_df.groupBy(users_df.age)

返ってくるのは GroupedData になる。

>>> gd
<pyspark.sql.group.GroupedData object at 0x1720ed0>

集約した結果をカウントするために GroupedData#count() メソッドを使うと DataFrame が返る。

>>> df = gd.count()
>>> df
DataFrame[age: int, count: bigint]

これで各年齢の人数が分かった。

>>> df.show()
+---+-----+                                                                     
|age|count|
+---+-----+
| 20|    1|
| 25|    1|
| 30|    2|
+---+-----+

SparkSQL を使うときはこんな感じ。 見慣れた SQL そのままだ。

>>> spark.sql('SELECT age, COUNT(1) AS count FROM users GROUP BY age').show()
+---+-----+                                                                     
|age|count|
+---+-----+
| 20|    1|
| 25|    1|
| 30|    2|
+---+-----+

まとめ

今回は PySpark の DataFrame オブジェクトを SparkSQL から操作してみた。 データエンジニアリングの世界では、色々なプログラミング言語や環境、そして API が登場する。 そうした中 Apache Spark ではデータ分析における共通言語ともいえる SQL を使ってデータ構造を操作する選択肢があることはありがたい。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Apache Hive の Vectorization 機能を試す

今回は Apache Hive の Vectorization 機能を使ってパフォーマンスが向上するか試してみる。 Apache Hive では、通常 HDFS に保存されたデータを一行ずつ処理する。 それに対し Vectorization 機能を使うと、状況は限られるものの複数行をまとめて処理できるようになる。 結論から先に書くと、機能を有効にすることで多少のパフォーマンス向上はあることが分かった。

使った環境は次の通り。

$ cat /etc/redhat-release 
CentOS Linux release 7.4.1708 (Core) 
$ uname -r
3.10.0-693.17.1.el7.x86_64
$ hive --version
Hive 2.3.2
Git git://stakiar-MBP.local/Users/stakiar/Desktop/scratch-space/apache-hive -r 857a9fd8ad725a53bd95c1b2d6612f9b1155f44d
Compiled by stakiar on Thu Nov 9 09:11:39 PST 2017
From source with checksum dc38920061a4eb32c4d15ebd5429ac8a
$ hadoop version
Hadoop 2.8.3
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r b3fe56402d908019d99af1f1f4fc65cb1d1436a2
Compiled by jdu on 2017-12-05T03:43Z
Compiled with protoc 2.5.0
From source with checksum 9ff4856d824e983fa510d3f843e3f19d
This command was run using /home/vagrant/hadoop-2.8.3/share/hadoop/common/hadoop-common-2.8.3.jar

ダミーデータを用意する

パフォーマンスを測るからには、それなりのデータ量が必要となるはずなのでダミーデータを生成する。 今回は Golang とフェイクデータを生成するパッケージの github.com/icrowley/fake を使った。

まずは Golang をインストールしてパッケージをダウンロードしておく。

$ sudo yum -y install golang git
$ go get github.com/icrowley/fake

以下のようなダミーデータ生成用のプログラムを用意した。

$ cat << 'EOF' > dummy_users.go
package main

import (
  "fmt"
  "strconv"
  "os"
  "math/rand"
  "time"
  "github.com/icrowley/fake"
)

func main() {
  rand.Seed(time.Now().UnixNano())

  if len(os.Args) < 2 {
    fmt.Println("Too few arguments")
    fmt.Printf("%s N\n", os.Args[0])
    os.Exit(1)
  }

  N, err := strconv.Atoi(os.Args[1])
  if err != nil {
    fmt.Printf("Please enter the integer: %s\n", N)
    os.Exit(1)
  }

  for i := 0; i < N; i++ {
    name := fake.FirstName()
    age := rand.Intn(100)
    fmt.Printf("%d,%s,%d\n", i, name, age)
  }
}
EOF

上記のプログラムをビルドする。

$ go build dummy_users.go 

実行すると ID と名前と年齢のダミーデータを CSV で出力する。

$ ./dummy_users 10
0,Gregory,74
1,Sharon,35
2,Beverly,22
3,Mark,25
4,Ralph,12
5,Douglas,63
6,Catherine,44
7,Joyce,84
8,Bruce,87
9,Kevin,3

とりあえず 1000 万件 (行) のデータを生成しておく。

$ ./dummy_users 10000000 > dummy_users.csv

次のように 168MB の CSV ができた。

$ wc -l dummy_users.csv 
10000000 dummy_users.csv
$ du -m dummy_users.csv 
168    dummy_users.csv

ダミーデータを Hive のテーブルに読み込む

続いてはダミーデータを Apache Hive のテーブルに読み込む。

まずは Hive の CLI を起動する。

$ hive

先ほど生成したダミーデータと一致するようなテーブルを作っておく。

hive> CREATE TABLE users (
    >   id BIGINT,
    >   name STRING,
    >   age INT
    > )
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > STORED AS TEXTFILE;
OK
Time taken: 1.243 seconds

あとは先ほど作った CSV をテーブルに読み込むだけ。

hive> LOAD DATA LOCAL INPATH 'dummy_users.csv' OVERWRITE INTO TABLE users;
Loading data to table default.users
OK
Time taken: 12.993 seconds

ORC フォーマットのテーブルにコピーする

Apache Hive の Vectorization 機能は、当初は ORC ファイルフォーマットだけに対応していた。 ただし Apache Hive のバージョン 2.1 以降では、それ以外のフォーマットもサポートされているらしい。 とはいえ、軽く触ったところ ORC の方がパフォーマンス向上が望めそうなので、そちらを使うことにする。

[HIVE-12878] Support Vectorization for TEXTFILE and other formats - ASF JIRA

まずは、先ほどとスキーマは同じでファイルフォーマットだけ ORC に変更したテーブルを用意する。

hive> CREATE TABLE users_orc (
    >   id BIGINT,
    >   name STRING,
    >   age INT
    > )
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > STORED AS ORC;
OK
Time taken: 0.089 seconds

あとは、元のテーブルからデータをコピーしてくるだけ。

hive> INSERT INTO TABLE users_orc SELECT * FROM users;
...
OK
Time taken: 47.313 seconds

Vectorization 機能を有効にしてみる

Vectorization 機能は hive.vectorized.execution.enabledhive.vectorized.execution.reduce.enabled で有効・無効を切り替える。 次のように、デフォルトでは無効化されている。

hive> set hive.vectorized.execution.enabled;
hive.vectorized.execution.enabled=true
hive> set hive.vectorized.execution.reduce.enabled;
hive.vectorized.execution.reduce.enabled=true

この項目を true にすれば機能が有効になる。

hive> set hive.vectorized.execution.enabled=true;
hive> set hive.vectorized.execution.reduce.enabled=true;

Vectorization 機能が使えない状況では単に設定が無視されるだけなので、とりあえず有効にしておいても良さそう。

機能の有無でパフォーマンスを比較する

Vectorization 機能は、比較的単純なクエリを実行するときに使えるらしい。 そこで、次のような年齢ごとのユーザ数を集計するクエリで比較してみよう。

hive> SELECT
    >   age,
    >   COUNT(1)
    > FROM users_orc
    > GROUP BY age;
...
0  100244
1  100033
2  99509
3  100067
4  99326
5  100026
...
95 99622
96 99675
97 100287
98 99751
99 100240

機能の切り替えを簡単にしたいので、設定を ~/.hiverc という設定ファイルに書き込むことにする。 ここに書いておくと Hive の CLI を起動するとき、自動でそこに記述されている内容を読み込んでくれる。

$ cat << 'EOF' > ~/.hiverc
set hive.vectorized.execution.enabled=false;
set hive.vectorized.execution.reduce.enabled=false;
EOF

まずは機能を無効にした状態で、次のように 10 回ほどクエリを実行したときの時間を測ってみた。

$ SQL='SELECT age, COUNT(1) FROM users_orc GROUP BY age'
$ for i in {1..10}; do hive -e "$SQL" 2>&1 | tail -n 1; done;
Time taken: 48.642 seconds, Fetched: 100 row(s)
Time taken: 46.033 seconds, Fetched: 100 row(s)
Time taken: 48.642 seconds, Fetched: 100 row(s)
Time taken: 47.711 seconds, Fetched: 100 row(s)
Time taken: 47.276 seconds, Fetched: 100 row(s)
Time taken: 48.677 seconds, Fetched: 100 row(s)
Time taken: 47.498 seconds, Fetched: 100 row(s)
Time taken: 46.391 seconds, Fetched: 100 row(s)
Time taken: 46.013 seconds, Fetched: 100 row(s)
Time taken: 48.593 seconds, Fetched: 100 row(s)

次は設定ファイルで機能を有効化する。

$ cat << 'EOF' > ~/.hiverc
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled=true;
EOF

先ほどと同じように実行時間を測ってみた結果が次の通り。 何だか、ほんのり速くなっているような?

$ for i in {1..10}; do hive -e "$SQL" 2>&1 | tail -n 1; done;
Time taken: 45.088 seconds, Fetched: 100 row(s)
Time taken: 43.822 seconds, Fetched: 100 row(s)
Time taken: 45.188 seconds, Fetched: 100 row(s)
Time taken: 45.157 seconds, Fetched: 100 row(s)
Time taken: 43.627 seconds, Fetched: 100 row(s)
Time taken: 45.167 seconds, Fetched: 100 row(s)
Time taken: 45.897 seconds, Fetched: 100 row(s)
Time taken: 45.647 seconds, Fetched: 100 row(s)
Time taken: 43.263 seconds, Fetched: 100 row(s)
Time taken: 44.603 seconds, Fetched: 100 row(s)

実行にかかった時間を比較する

なんだか、ほんのり速くなっているような感じはするけど確証がないので詳しく見ていくことにしよう。 これ以降の部分は、別に Apache Hive とは関係がないので、どういった環境を使っても構わない。

調査するために、まずは Python のサードパーティ製パッケージの scipynumpy をインストールしておく。

$ pip install scipy numpy

Python の REPL を起動する。

$ python

機能を有効にする前と後の実行時間を numpy の配列として用意する。

>>> import numpy as np
>>> before = np.array([
...     48.642,
...     46.033,
...     48.642,
...     47.711,
...     47.276,
...     48.677,
...     47.498,
...     46.391,
...     46.013,
...     48.593,
... ])
>>> after = np.array([
...     45.088,
...     43.822,
...     45.188,
...     45.157,
...     43.627,
...     45.167,
...     45.897,
...     45.647,
...     43.263,
...     44.603,
... ])

平均実行時間を比べると、機能を有効にしたときの方が 3 秒前後短いようだ。

>>> before.mean()
47.5476
>>> after.mean()
44.7459

ということで Vectorization 機能はパフォーマンスの向上に有効でした。 ...というのは、あまりにも短絡的なので、一応検定しておくことにする。

ウェルチの t 検定で、機能を有効にした方の平均が「有意に小さい」を対立仮説に「有意に小さくない」を帰無仮説として検定する。

>>> _, p = stats.ttest_ind(before, after, equal_var=False)
>>> p / 2
3.957745312666858e-06

p-value は非常に小さな値なので、仮に有意水準 1% でも余裕で帰無仮説は棄却され対立仮説を採用することになる。

最初、F 検定からの単純 t 検定を使っていたんだけど、どうやら最近は等分散とか関係なくウェルチの t 検定でええやんってことみたい。 ちなみに、最初の方法でも等分散かつ有意だった。

laboratoryofbiology.blogspot.jp

めでたしめでたし。

プログラミング Hive

プログラミング Hive

  • 作者: Edward Capriolo,Dean Wampler,Jason Rutherglen,佐藤直生,嶋内翔,Sky株式会社玉川竜司
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2013/06/15
  • メディア: 大型本
  • この商品を含むブログ (3件) を見る

統計学入門 (基礎統計学?)

統計学入門 (基礎統計学?)

Apache Hive で圧縮形式のデータを扱う

Apache Hive のテーブルを構成するデータは、デフォルトでは無圧縮になっている。 しかし、設定を変更することで圧縮形式のデータも扱うことができる。 そこで、今回は Apache Hive で圧縮形式のデータを扱ってみることにする。

データを圧縮することには、主に二つのメリットがある。 まず一つ目は HDFS 上のサイズが小さくなるのでディスク容量の節約になること。 そして二つ目こそ本命だけどサイズが小さくなるので読み出しにかかるディスク I/O の負荷も下げることができる。 Hadoop においてディスク I/O は最もボトルネックになりやすいところなので、これは重要となる。

使った環境は次の通り。

$ cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core) 
$ uname -r
3.10.0-693.11.1.el7.x86_64
$ hive --version
Hive 2.3.2
Git git://stakiar-MBP.local/Users/stakiar/Desktop/scratch-space/apache-hive -r 857a9fd8ad725a53bd95c1b2d6612f9b1155f44d
Compiled by stakiar on Thu Nov 9 09:11:39 PST 2017
From source with checksum dc38920061a4eb32c4d15ebd5429ac8a
$ hadoop version
Hadoop 2.8.3
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r b3fe56402d908019d99af1f1f4fc65cb1d1436a2
Compiled by jdu on 2017-12-05T03:43Z
Compiled with protoc 2.5.0
From source with checksum 9ff4856d824e983fa510d3f843e3f19d
This command was run using /home/vagrant/hadoop-2.8.3/share/hadoop/common/hadoop-common-2.8.3.jar

準備

まずは、ひとまず Hive の CLI を起動しておく。

$ hive

起動したら、次のように set 命令を使って設定を確認しておこう。 hive.exec.compress.output は出力結果を圧縮形式にするかを設定する項目になっている。 そして mapred.output.compression.codec は圧縮に使うコーデックを指定する項目になっている。

hive> set hive.exec.compress.output;
hive.exec.compress.output=false
hive> set mapred.output.compression.codec;
mapred.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec

上記のように、デフォルトでは出力結果を圧縮しないようになっている。

結果が圧縮されないことを確認したところで、まずは無圧縮のテーブルを作ってみよう。 以下の設定ではデータが CSV で保存されることになる。

hive> CREATE TABLE users (
    >   name STRING,
    >   age INT
    > )
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > STORED AS TEXTFILE;
OK
Time taken: 12.171 seconds

レコードを追加する。

hive> INSERT INTO TABLE users VALUES
    >   ("Alice", 20),
    >   ("Bob", 25),
    >   ("Carol", 30);
...
OK
Time taken: 40.852 seconds

この状態で、テーブルを構成するデータがどのように保存されているかをまずは確認しておこう。 テーブルの保存先は SHOW CREATE TABLE で確認できる。

hive> SHOW CREATE TABLE users;
OK
CREATE TABLE `users`(
  `name` string, 
  `age` int)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
WITH SERDEPROPERTIES ( 
  'field.delim'=',', 
  'serialization.format'=',') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://master:9000/user/hive/warehouse/users'
TBLPROPERTIES (
  'transient_lastDdlTime'='1518700567')
Time taken: 0.203 seconds, Fetched: 16 row(s)

上記の LOCATION が保存先を表している。

別のターミナルで hdfs dfs -ls コマンドを使って上記のパスを確認してみよう。

$ hdfs dfs -ls /user/hive/warehouse/users
Found 1 items
-rwxrwxr-x   2 vagrant supergroup         25 2018-02-15 13:16 /user/hive/warehouse/users/000000_0

ディレクトリの中には 000000_0 というファイルだけがあることが分かる。

hdfs dfs -cat コマンドを使って内容を確認してみよう。

$ hdfs dfs -cat /user/hive/warehouse/users/000000_0
Alice,20
Bob,25
Carol,30

すると、これが CSV ファイルで先ほど投入したレコードが入っていることが分かる。 デフォルトでは、このようにファイルが無圧縮で HDFS 上にそのまま保存されることになる。

テーブルを構成するデータを圧縮形式にする

続いてはテーブルを構成するデータを圧縮したものにしてみよう。 それには、前述した設定項目を編集する。

hive> set hive.exec.compress.output=true;
hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

ここではデータを GZIP で圧縮するように設定した。

先ほど作った無圧縮のテーブルから、新しいテーブルを作ってみる。 テーブル自体の設定自体は先ほどと変えておらず中身は CSV になるよう指定している。

hive> CREATE TABLE users_gzip
    >   ROW FORMAT DELIMITED
    >   FIELDS TERMINATED BY ','
    >   STORED AS TEXTFILE
    > AS SELECT * FROM users;
...
OK
Time taken: 28.895 seconds

先ほどと同じように保存先の HDFS を確認してみよう。

$ hdfs dfs -ls /user/hive/warehouse/users_gzip
Found 1 items
-rwxrwxr-x   2 vagrant supergroup         45 2018-02-15 13:22 /user/hive/warehouse/users_gzip/000000_0.gz

すると、今度はディレクトリの中にあるファイルに .gz という名前がついていることが分かる。

ファイルをローカルにダウンロードしてこよう。

$ hdfs dfs -get /user/hive/warehouse/users_gzip/000000_0.gz

file コマンドを使って形式を確認すると、ちゃんと GZIP ファイルとなっている。

$ file 000000_0.gz
000000_0.gz: gzip compressed data, from Unix

gunzip コマンドを使って圧縮ファイルを解凍してみよう。

$ gunzip 000000_0.gz

解凍したファイルを確認すると CSV ファイルになっていることが分かる。

$ cat 000000_0 
Alice,20
Bob,25
Carol,30

このように設定を切り替えることでテーブルを構成するデータを圧縮形式にできる。

圧縮形式によるサイズの違いを比べてみる

続いては圧縮形式を変えることでテーブルのサイズがどのように変化するか見てみる。 違いを確かめるには、ある程度のサイズが必要なのでダミーデータを作ることにした。

ダミーデータの生成には Python の faker を使うことにする。 pip コマンドを使ってインストールしよう。

$ sudo yum -y install epel-release
$ sudo yum -y install python-pip
$ sudo pip install faker

インストールができたら Python の REPL を起動する。

$ python

次のようにして 10 万件のダミーデータが入った CSV ファイルを作る。

>>> from faker import Faker
>>> fake = Faker()
>>> import random
>>> N = 100000
>>> with open('users.csv', 'w') as f:
...     for _ in range(N):
...         age = random.randint(0, 100)
...         name = fake.last_name()
...         f.write(name + ',' + str(age) + '\n')
...

こんな感じでダミーデータの入った CSV ファイルができる。

$ wc -l users.csv
100000 users.csv
$ head users.csv 
Moore,82
Jensen,40
Robinson,42
White,11
Atkinson,56
Small,17
Wilson,76
Johnson,64
Moody,85
Barnes,61

それを Hive のテーブルに取り込む。

hive> LOAD DATA LOCAL INPATH 'users.csv' OVERWRITE INTO TABLE users;
Loading data to table default.users
OK
Time taken: 1.026 seconds

上手くいけば次のように 10 万件のデータが見えるようになる。

hive> SELECT COUNT(1) FROM users;
...
OK
100000
Time taken: 39.95 seconds, Fetched: 1 row(s)

GZIP

まずは GZIP 形式から。 先ほど作ったテーブルは一旦消しておこう。

DROP TABLE users_gzip;

圧縮に使うコーデックとして GZIP を指定する。

hive> set hive.exec.compress.output=true;
hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

先ほど作った 10 万件のテーブルをコピーして新しいテーブルを作る。

hive> CREATE TABLE users_gzip
    >   ROW FORMAT DELIMITED
    >   FIELDS TERMINATED BY ','
    >   STORED AS TEXTFILE
    > AS SELECT * FROM users;
...
OK
Time taken: 27.286 seconds

この通りデータがコピーされた。

hive> SELECT COUNT(1) FROM users_gzip;
OK
100000
Time taken: 0.207 seconds, Fetched: 1 row(s)

テーブルのサイズは hdfs dfs -du コマンドを使って確認できる。

$ hdfs dfs -du -h /user/hive/warehouse | grep users
974.5 K  /user/hive/warehouse/users
335.9 K  /user/hive/warehouse/users_gzip

GZIP で圧縮するとテーブルのサイズが約 34% まで小さくなった。

BZIP2

同じことを BZIP2 でもやる。

hive> set hive.exec.compress.output=true;
hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec;
hive> CREATE TABLE users_bzip2
    >   ROW FORMAT DELIMITED
    >   FIELDS TERMINATED BY ','
    >   STORED AS TEXTFILE
    > AS SELECT * FROM users;
...
OK
Time taken: 27.634 seconds

Snappy

続いては Snappy で。

hive> set hive.exec.compress.output=true;
hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
hive> CREATE TABLE users_snappy
    >   ROW FORMAT DELIMITED
    >   FIELDS TERMINATED BY ','
    >   STORED AS TEXTFILE
    > AS SELECT * FROM users;
...
OK
Time taken: 26.979 seconds

LZ4

最後に LZ4 を。

hive> set hive.exec.compress.output=true;
hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.Lz4Codec;
hive> CREATE TABLE users_lz4
    >   ROW FORMAT DELIMITED
    >   FIELDS TERMINATED BY ','
    >   STORED AS TEXTFILE
    > AS SELECT * FROM users;
...
OK
Time taken: 26.433 seconds

圧縮率を比較する

これで一通り出揃ったのでサイズを確認してみよう。 ちなみに圧縮に使えるコーデックは Hadoop のバージョンによって異なる。

$ hdfs dfs -du -h /user/hive/warehouse | grep users
974.5 K  /user/hive/warehouse/users
228.6 K  /user/hive/warehouse/users_bzip2
335.9 K  /user/hive/warehouse/users_gzip
583.6 K  /user/hive/warehouse/users_lz4
603.8 K  /user/hive/warehouse/users_snappy

上記から圧縮したときのサイズは BZIP2 < GZIP < LZ4 < Snappy ということが分かった。 ただし、圧縮率の高いコーデックは解凍に時間がかかる。 つまり、ディスク I/O の負担は減るが、代わりに CPU の負担が増えることになる。

また、圧縮形式を決める上では、その形式が「スプリット可能」かについても注意する必要があるらしい。 スプリット可能というのは、大きなファイルを分割して複数のマッパー、リデューサーで処理できることを表している。 前述した圧縮形式の中では BZIP2 についてはスプリット可能、LZ4 と GZIP と Snappy はスプリット不能となっている。 スプリット不能な圧縮形式では、大きなファイルも一つのマッパー、リデューサーで処理しなければならない。 そのため、分散処理のメリットが薄れてしまう。

ただし、上記はあくまで「大きなファイルがある」ことが前提になっている。 つまり、テーブルを構成するファイルが全て適度な大きさになっていればスプリット不能でも問題にはならない。 また GZIP や Snappy といった圧縮形式がスプリット不能なのは、あくまでテキストファイルで保存する場合に限られるようだ。 後述する Sequence ファイルや ORC ファイルといったフォーマットで保存するなら、スプリット可能になるらしい。

一つのテーブルをまぜこぜの圧縮形式で構成してみる

ここまでやってきて、ちょっとした疑問が浮かぶ。 圧縮形式を Hive の設定項目で切り替えるということは、テーブル自体にはそのメタ情報が保存されていないことを意味する。 だとすれば、一つのテーブルを複数の圧縮形式が入り乱れた状態で作ることもできるのだろうか。 先に結論から書くと、これはできるようになっている。

検証用のテーブルを用意する。

hive> CREATE TABLE users_mixed (
    >   name STRING,
    >   age INT
    > )
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > STORED AS TEXTFILE;
OK
Time taken: 0.091 seconds

圧縮に使うコーデックの設定を切り替えながらレコードを追加する。

hive> set hive.exec.compress.intermediate=true;
hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
hive> INSERT INTO TABLE users_mixed VALUES ("Alice", 20);
...
hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
hive> INSERT INTO TABLE users_mixed VALUES ("Bob", 25);
...
hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec;
hive> INSERT INTO TABLE users_mixed VALUES ("Carol", 30);

レコードを追加した後、テーブルを構成するファイルを確認すると、次のような結果が得られる。 それぞれのファイルの末尾には異なる拡張子がついていて、複数の圧縮形式が混在している状況だ。

$ hdfs dfs -ls /user/hive/warehouse/users_mixed
Found 3 items
-rwxrwxr-x   2 vagrant supergroup         54 2018-02-15 13:49 /user/hive/warehouse/users_mixed/000000_0.bz2
-rwxrwxr-x   2 vagrant supergroup         27 2018-02-15 13:48 /user/hive/warehouse/users_mixed/000000_0.gz
-rwxrwxr-x   2 vagrant supergroup         19 2018-02-15 13:48 /user/hive/warehouse/users_mixed/000000_0.snappy

この状態であっても、テーブルからはちゃんとレコードを読み出せる。

hive> SELECT * FROM users_mixed;
OK
Carol   30
Bob 25
Alice   20
Time taken: 0.271 seconds, Fetched: 3 row(s)

どうしてこんなことができるかというと、拡張子からファイルの種類を自動的に推定して処理してくれるようになっているため。 ただし、この処理方法はデータを保存するフォーマットがテキストファイルの場合だから、という点に注意が必要となる。 後述する別のフォーマットでは、ファイルの種類に関する情報がファイル自体やテーブルのメタデータに書き込まれることになる。 そのため、まぜこぜにできない場合もある。

Sequence ファイルを圧縮してみる

ここまでの例では、全てファイルのフォーマットとしてテキストファイルを使っていた。 つまり、テーブル定義で STORED AS TEXTFILE としていた。 ここからは別のフォーマットを使った場合にどうなるか確認してみよう。 テキストファイルを使ったときとは、少し勝手が異なる場合があるようだ。

最初は Sequence ファイルというフォーマットを試してみよう。 このフォーマットはバイナリ形式になっている。 Sequence ファイルのフォーマットであれば、テキストファイルでスプリット不能だった圧縮形式でもスプリット可能になる。

まずは設定で GZIP を使った圧縮を有効にしておく。

hive> set hive.exec.compress.output=true;
hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

続いて Sequence ファイルを使ってデータを保存するようにしたテーブルを定義する。 具体的にはテーブルを定義する時点で STORED AS SEQUENCEFILE とする。

hive> CREATE TABLE users_sequence (
    >   name STRING,
    >   age INT
    > )
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > STORED AS SEQUENCEFILE;
OK
Time taken: 0.07 seconds

作成したテーブルにレコードを追加してみよう。

hive> INSERT INTO TABLE users_sequence VALUES ("Alice", 20);
...
OK
Time taken: 27.658 seconds

レコードが追加できたら HDFS 上でどのようになっているか確認する。

$ hdfs dfs -ls /user/hive/warehouse/users_sequence
Found 1 items
-rwxrwxr-x   2 vagrant supergroup        168 2018-02-15 14:07 /user/hive/warehouse/users_sequence/000000_0

すると、先ほどのテキストファイルの場合とは異なりファイルに .gz の拡張子がついていない。

バイナリフォーマットのファイルなので、ちょっと乱暴だけど内容をテキストとして表示してみよう。

$ hdfs dfs -cat /user/hive/warehouse/users_sequence/000000_0
SEQ"org.apache.hadoop.io.BytesWritableorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec41e?:???Qt?j!!?p??LN?123ݟ?   

すると、ファイルの中に圧縮に使ったコーデックの情報が含まれていることが分かる。 このように Sequence ファイルでは、ファイル自体に圧縮形式の情報が含まれることになる。 また、このやり方ならファイルを丸ごと圧縮する方法ではないためスプリット可能になる理由もうなずける。 ちゃんとレコードの区切りを考えて各パートごとに圧縮した情報をメタデータとしてファイルに残しておけるからだ。

ちなみに Sequence ファイルについては hdfs dfs -text コマンドで中身が確認できるようになっている。

$ hdfs dfs -text /user/hive/warehouse/users_sequence/000000_0
18/02/15 14:16:13 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
18/02/15 14:16:13 INFO compress.CodecPool: Got brand-new decompressor [.gz]
    Alice,20

一応 Snappy で圧縮したときの結果も確認しておこう。

hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
hive> INSERT INTO TABLE users_sequence VALUES ("Bob", 25);
...
OK
Time taken: 29.17 seconds

さっきと同じようにファイルの中に圧縮に使ったコーデックの情報が記録されていることが分かる。

$ hdfs dfs -cat /user/hive/warehouse/users_sequence/000000_0_copy_1
SEQ"org.apache.hadoop.io.BytesWritableorg.apache.hadoop.io.Text)org.apache.hadoop.io.compress.SnappyCodech???7?
                                                                                                               ??h?   Bob,25

ORC

次は ORC (The Optimized Row Columnar) ファイルについて。 ORC ファイルは Sequence ファイルと同じようにバイナリ形式のフォーマットになっている。 Hive に最適化されたカラム志向型のデータフォーマットになっており高速に動作する。 いくつかの資料を見ると、この ORC ファイルを使うのが Hive のベストプラクティスのようだ。

jp.hortonworks.com

LanguageManual ORC - Apache Hive - Apache Software Foundation

ORC ファイルを圧縮するときは、これまでとはまたちょっと勝手が違っている。 何かというと、テーブル自体のメタデータに圧縮形式が記録されるため。 つまり、これまで使ってきた設定項目の hive.exec.compress.output などの内容は無視されるので注意が必要となる。

それでは ORC ファイルでデータを保存するテーブルを定義しよう。 実は、特に指定はないけどこれだけでファイルが GZIP で圧縮されるようにデフォルト値がなっている。

hive> CREATE TABLE users_orc (
    >   name STRING,
    >   age INT
    > )
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > STORED AS ORC;
OK
Time taken: 0.143 seconds

レコードを追加してみよう。

hive> INSERT INTO TABLE users_orc VALUES ("Alice", 20);
...
OK
Time taken: 24.535 seconds

今回も、ファイルを確認すると拡張子には何もついていない。

$ hdfs dfs -ls /user/hive/warehouse/users_orc/
Found 1 items
-rwxrwxr-x   2 vagrant supergroup        295 2018-02-15 14:24 /user/hive/warehouse/users_orc/000000_0

中身を確認すると ORC という文字列が見えるため、どうやら確かに ORC ファイルのようだ。

$ hdfs dfs -cat /user/hive/warehouse/users_orc/000000_0
ORC
P8??be!1F%.Vǜ??T%??+


(((P
    AliceFPN(V?b?``???ь?`?IBH3?@? H???LlBL
                                                @??T???b?`
        ???`hg???`p?Q??`T??bbd?b?K?M?bNLOUb?`bfF+?+1F%.Vǜ??T%???`b??А`p?[??"!6
                                                                            (-0??ORC

続いては、どのように ORC ファイルのテーブルで圧縮形式を指定するか見ていこう。 その前に、一旦先ほど作ったテーブルは削除しておく。

hive> DROP TABLE users_orc;
OK
Time taken: 0.195 seconds

結論から先に書くと ORC ファイルでは TBLPROPERTIES という書式を用いて圧縮形式を指定する。 この中にキーバリュー形式で orc.compress に圧縮形式を指定する。 以下では SNAPPY を指定している。 次のクエリではダミーデータの入ったテーブルからレコードをコピーしている。

hive> CREATE TABLE users_orc_snappy
    >   ROW FORMAT DELIMITED
    >   FIELDS TERMINATED BY ','
    >   STORED AS ORC TBLPROPERTIES ("orc.compress" = "SNAPPY")
    > AS SELECT * FROM users;

同じように、指定なし、ZLIB、NONE を指定したものを作っていこう。

hive> CREATE TABLE users_orc
    >   ROW FORMAT DELIMITED
    >   FIELDS TERMINATED BY ','
    >   STORED AS ORC
    > AS SELECT * FROM users;
...
hive> CREATE TABLE users_orc_gzip
    >   ROW FORMAT DELIMITED
    >   FIELDS TERMINATED BY ','
    >   STORED AS ORC TBLPROPERTIES ("orc.compress" = "ZLIB")
    > AS SELECT * FROM users;
...
hive> CREATE TABLE users_orc_none
    >   ROW FORMAT DELIMITED
    >   FIELDS TERMINATED BY ','
    >   STORED AS ORC TBLPROPERTIES ("orc.compress" = "NONE")
    > AS SELECT * FROM users;

結果は次の通り。 指定しなかったものと GZIP が同じサイズになっており、デフォルトで圧縮が効いていることが確認できる。 今回のデータでは、圧縮をかけない NONE と Snappy のサイズの違いがほとんど出なかった。

$ hdfs dfs -du -h /user/hive/warehouse | grep users_orc
231.6 K  /user/hive/warehouse/users_orc
231.6 K  /user/hive/warehouse/users_orc_gzip
301.2 K  /user/hive/warehouse/users_orc_none
300.1 K  /user/hive/warehouse/users_orc_snappy

まとめ

今回は Apache Hive のテーブルを構成するファイルを圧縮する方法について扱った。 ファイルを圧縮すると、ディスク容量の節約やパフォーマンスの向上が見込める。 ただし、ファイルを圧縮するときは、その特性やファイルフォーマットとの相性について理解を深める必要もある。 例えば、テキストファイルでは圧縮形式によってスプリット可能・不能といった問題が出てくる。 また、フォーマットごとに圧縮形式の情報を何処に残すかが異なっていたり、使える形式についても違ったりすることも分かった。

プログラミング Hive

プログラミング Hive

  • 作者: Edward Capriolo,Dean Wampler,Jason Rutherglen,佐藤直生,嶋内翔,Sky株式会社玉川竜司
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2013/06/15
  • メディア: 大型本
  • この商品を含むブログ (3件) を見る

Docker コンテナのログを syslog でリモートホストに飛ばす

今回は Docker コンテナのログを syslog で別のホストに飛ばしてみることにする。 言うまでもなく、ロギングはシステムを運用する上で欠かせない要素の一つ。

Docker には、あらかじめ複数のロギングドライバが組み込まれていて、それらを使い分けることができる。 今回使うのはその中の一つで syslog ドライバという名前がついている。 このドライバで飛ばした Docker コンテナのログを syslog-ng で受け取ってファイルに書き出してみよう。 ただし、リモートホストといっても TCP 越しにログを飛ばすだけで単一のホスト上で組んでしまうことにする。

ロギングドライバについての詳細はこのドキュメントに記載されている。

docs.docker.com

syslog ドライバに限った話についてはこちらに書いてある。

docs.docker.com

使った環境は次の通り。

$ cat /etc/redhat-release 
CentOS Linux release 7.4.1708 (Core) 
$ uname -r
3.10.0-693.17.1.el7.x86_64
$ sudo docker version
Client:
 Version:         1.12.6
 API version:     1.24
 Package version: docker-1.12.6-71.git3e8e77d.el7.centos.1.x86_64
 Go version:      go1.8.3
 Git commit:      3e8e77d/1.12.6
 Built:           Tue Jan 30 09:17:00 2018
 OS/Arch:         linux/amd64

Server:
 Version:         1.12.6
 API version:     1.24
 Package version: docker-1.12.6-71.git3e8e77d.el7.centos.1.x86_64
 Go version:      go1.8.3
 Git commit:      3e8e77d/1.12.6
 Built:           Tue Jan 30 09:17:00 2018
 OS/Arch:         linux/amd64

syslog-ng をインストールする

まずはログを受ける syslog-ng をインストールする。 CentOS 7 の場合は EPEL を入れると yum でインストールできるようになる。

$ sudo yum -y install epel-release
$ sudo yum -y install syslog-ng

続いて syslog-ng の設定ファイルを用意する。 ここでは TCP/514 番ポートで受けた syslog を /var/log/docker/docker.log に書き出すように設定している。

$ cat << 'EOF' | sudo tee /etc/syslog-ng/conf.d/docker.conf > /dev/null
source s_remote_tcp {
  tcp(ip(0.0.0.0), port(514));
};

destination d_docker {
  file("/var/log/docker/docker.log");
};

log {
  source(s_remote_tcp);
  destination(d_docker);
};
EOF

設定ファイルに不備がないかをチェックしておこう。 以下のコマンドで出力が何も返ってこなければ問題ない。 もし設定ファイルの書式を間違えていると、その箇所を教えてくれる。

$ sudo syslog-ng -s

続いて、ログを書き出す先のディレクトリを作っておこう。

$ sudo mkdir -p /var/log/docker

これでログを受ける準備が整ったので syslog-ng のサービスを起動する。

$ sudo systemctl enable syslog-ng
$ sudo systemctl start syslog-ng

ちゃんと設定通り TCP/514 番ポートで Listen していることが見て取れる。

$ sudo ss -tlnp | grep 514
LISTEN     0      128          *:514                      *:*                   users:(("syslog-ng",pid=3438,fd=8))

Docker をインストールする

続いて Docker をインストールする。 Docker は標準リポジトリからでもインストールできる。 ちょっとバージョンが古いけどね。

$ sudo yum -y install docker

インストールできたら Docker のサービスを起動する。

$ sudo systemctl enable docker
$ sudo systemctl start docker

Docker コンテナのログを syslog で飛ばす

あとはDocker コンテナの起動時にドライバとそのオプションを設定するだけ。 以下のように --log-driver オプションに syslog を指定すると syslog ドライバを使うようになる。 さらに --log-opt オプションで syslog-address を指定すると目当てのプロトコルとポートにログを送れる。

$ sudo docker run \
  --log-driver=syslog \
  --log-opt syslog-address=tcp://localhost:514 \
  -it alpine \
  ls /

syslog-ng の出力先のファイルを確認すると、ちゃんと実行結果がログとして残されていることが分かる。

$ sudo cat /var/log/docker/docker.log 
Feb 13 13:41:53 127.0.0.1 docker/f825d108f7fa[3590]: bin    etc    lib    mnt    root   sbin   sys    usr
Feb 13 13:41:53 127.0.0.1 docker/f825d108f7fa[3590]: dev    home   media  proc   run    srv    tmp    var

念のため、もうちょっとログの量が多い場合も試しておこう。 以下では MySQL のコンテナを起動している。

$ sudo docker run \
  --log-driver=syslog \
  --log-opt syslog-address=tcp://localhost:514 \
  -e MYSQL_ROOT_PASSWORD=rootpasswd \
  -it mysql

syslog-ng の出力先を確認すると、ちゃんと MySQL のログが出力されていることが分かる。

$ sudo tail /var/log/docker/docker.log 
Feb 13 13:45:18 127.0.0.1 docker/6b6389751769[3590]: 2018-02-13T13:45:18.041553Z 0 [Warning] 'user' entry 'mysql.session@localhost' ignored in --skip-name-resolve mode.
Feb 13 13:45:18 127.0.0.1 docker/6b6389751769[3590]: 2018-02-13T13:45:18.041564Z 0 [Warning] 'user' entry 'mysql.sys@localhost' ignored in --skip-name-resolve mode.
Feb 13 13:45:18 127.0.0.1 docker/6b6389751769[3590]: 2018-02-13T13:45:18.041597Z 0 [Warning] 'db' entry 'performance_schema mysql.session@localhost' ignored in --skip-name-resolve mode.
Feb 13 13:45:18 127.0.0.1 docker/6b6389751769[3590]: 2018-02-13T13:45:18.041606Z 0 [Warning] 'db' entry 'sys mysql.sys@localhost' ignored in --skip-name-resolve mode.
Feb 13 13:45:18 127.0.0.1 docker/6b6389751769[3590]: 2018-02-13T13:45:18.041622Z 0 [Warning] 'proxies_priv' entry '@ root@localhost' ignored in --skip-name-resolve mode.
Feb 13 13:45:18 127.0.0.1 docker/6b6389751769[3590]: 2018-02-13T13:45:18.043787Z 0 [Warning] 'tables_priv' entry 'user mysql.session@localhost' ignored in --skip-name-resolve mode.
Feb 13 13:45:18 127.0.0.1 docker/6b6389751769[3590]: 2018-02-13T13:45:18.043806Z 0 [Warning] 'tables_priv' entry 'sys_config mysql.sys@localhost' ignored in --skip-name-resolve mode.
Feb 13 13:45:18 127.0.0.1 docker/6b6389751769[3590]: 2018-02-13T13:45:18.054543Z 0 [Note] Event Scheduler: Loaded 0 events
Feb 13 13:45:18 127.0.0.1 docker/6b6389751769[3590]: 2018-02-13T13:45:18.054868Z 0 [Note] mysqld: ready for connections.
Feb 13 13:45:18 127.0.0.1 docker/6b6389751769[3590]: Version: '5.7.21'  socket: '/var/run/mysqld/mysqld.sock'  port: 3306  MySQL Community Server (GPL)

ばっちり。

めでたしめでたし。