CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Apache Parquet フォーマットを扱ってみる

今回は、最近知った Apache Parquet フォーマットというものを Python で扱ってみる。 これは、データエンジニアリングなどの領域でデータを永続化するのに使うフォーマットになっている。 具体的には、データセットの配布や異なるコンポーネント間でのデータ交換がユースケースとして考えられる。

これまで、同様のユースケースには CSV や Python の Pickle フォーマットが用いられていた。 ただ、CSV は行志向のフォーマットなので不要なカラムであっても必ず読まなければいけないという問題点がある。 また Pickle の場合は、それに加えて扱えるのが Python のコンポーネントに限られてしまう。

そこで登場するのが今回紹介する Apache Parquet フォーマットということらしい。 Apache Parquet フォーマットは Apache Hadoop エコシステムの一貫として開発されている。 カラム志向のファイルフォーマットになっていて、取り扱う上での時間計算量・空間計算量を減らすための工夫がなされているみたい。

今回試した環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.6
BuildVersion:   16G29
$ python --version
Python 3.6.3

下準備

Apache Parquet を使った永続化について説明をする前に、これまではどういった選択肢があったかについて見ていきたい。 内容としては pandas の DataFrame オブジェクトを色々なフォーマットで保存したり読み込んでみる。

そこで、まずは pandas をインストールしておこう。

$ pip install pandas

サンプルとなる DataFrame オブジェクトを用意する。

>>> import pandas as pd
>>> df = pd.DataFrame([('Alice', 15),
...                    ('Bob', 20),
...                    ('Carol', 25)],
...                    columns=['name', 'age'])
>>> df
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

CSV フォーマット

まず、最初の選択肢として考えられるのは CSV フォーマットを使ったもの。 これは、多くのデータセットの配布に用いられているものなので、ほとんどの人にとって馴染み深いものだと思う。

pandas では DataFrame オブジェクトに to_csv() メソッドがあるので、それを使って永続化する。 index オプションに False を指定しているのは pandas がつけたインデックスのカラムを出力させないため。

>>> df.to_csv('users.csv', index=False)

これだけでカンマで区切られたおなじみのファイルができあがる。

$ cat users.csv
name,age
Alice,15
Bob,20
Carol,25

CSV フォーマットのファイルから DataFrame オブジェクトを復元するには read_csv() 関数を使う。

>>> pd.read_csv('users.csv')
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

Pickle フォーマット

続いては Python のシリアライズ・デシリアライズ機構である pickle モジュールを用いたやり方。 Pickle については次のブログエントリに詳しく書いている。

blog.amedama.jp

pandas には DataFrame オブジェクトを Pickle フォーマットのファイルにシリアライズするためのメソッドとして to_pickle() がある。

>>> df.to_pickle('users.pickle')

上記のメソッドを使っても構わないし、もちろん自分で pickle モジュールを使ってシリアライズしても良い。

>>> import pickle
>>> with open('users.pickle', mode='wb') as f:
...     pickle.dump(df, f)
...

読み込むときは read_pickle() 関数が使える。

>>> pd.read_pickle('users.pickle')
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

もちろん、読み込みに関しても pickle モジュールを使って自分でデシリアライズしても構わない。

>>> with open('users.pickle', mode='rb') as f:
...     pickle.load(f)
...
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

ただ、実際のところは pandas の API を通して Pickle を扱うのがおすすめ。 なぜかというと、簡単にデータの圧縮ができるので。

例えば、次のように compression オプションにフォーマットを指定するだけで圧縮できる。

>>> df.to_pickle('users.pickle', compression='gzip')
>>> pd.read_pickle('users.pickle', compression='gzip')
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

Parquet フォーマット

さて、本題に入るまで長かったけど、ここからやっと Parquet フォーマットについて扱う。

pandas の DataFrame オブジェクトを Parquet フォーマットでやり取りするにはいくつかのライブラリがある。 例えば fastparquetpyarrow がある。 今回は、その両方を試してみることにした。

fastparquet

まずは fastparquet から。

何はともあれ fastparquet パッケージをインストールする。

$ pip install fastparquet
$ pip list --format=columns | grep fastparquet
fastparquet        0.1.3

pandas の DataFrame オブジェクトを fastparquet でシリアライズするときは write() 関数を使う。

>>> from fastparquet import write
>>> write('users.parquet', df)

デシリアライズには ParquetFile オブジェクトを作った上で to_pandas() メソッドで DataFrame オブジェクトに変換する。

>>> from fastparquet import ParquetFile
>>> pf = ParquetFile('users.parquet')
>>> pf.to_pandas()
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

ちなみにシリアライズするときに圧縮をかけることもできる。 例えば GZIP フォーマットで圧縮するときは次のようにする。

>>> write('users.parquet', df, compression='GZIP')
>>> pf = ParquetFile('users.parquet')
>>> pf.to_pandas()
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

pyarrow

続いては pyarrow を使ってみよう。 pyarrow は Apache Arrow プロジェクトの Python 実装という位置づけ。 Apache Arrow というのは、データエンジニアリングにおいてプログラミング言語などに依存しないメモリ上での共通のオブジェクト表現を実現するためのプロジェクト。 Apache Arrow のオブジェクトを永続化するために Apache Parquet フォーマットが使える。 pandas のオブジェクトを直接使うことはできないので、一旦 Apache Arrow のオブジェクトに変換することになる。

まずは pyarrow パッケージをインストールしておく。

$ pip install pyarrow
$ pip list --format=columns | grep pyarrow
pyarrow            0.7.1

まずは pandas の DataFrame オブジェクトを pyarrow で Parquet フォーマットにシリアライズする手順から。 前述した通り、これには一旦 pyarrow の Table オブジェクトに変換する必要がある。

>>> import pyarrow as pa
>>> table = pa.Table.from_pandas(df)

その上で parquet モジュールを使ってテーブルをファイルに書き出す。

>>> from pyarrow import parquet as pq
>>> pq.write_table(table, 'users.parquet')

デシリアライズするときも、まずはファイルの内容を Table オブジェクトとして読み込む。

>>> table = pq.read_table('users.parquet')

その上で Table オブジェクトを pandas の DataFrame オブジェクトに変換する。

>>> table.to_pandas()
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

Parquet ファイルを圧縮したいときは write_table() メソッドに compression オプションを指定する。 ファイルを read_table() 関数で読み込むときは、自動で圧縮状態を認識してくれるようなので指定はいらない。

>>> pq.write_table(table, 'users.parquet', compression='gzip')
>>> table = pq.read_table('users.parquet')
>>> table.to_pandas()
    name  age
0  Alice   15
1    Bob   20
2  Carol   25

各フォーマットでのファイルサイズの比較

一通りの使い方が紹介できたので、続いてはフォーマットごとのファイルサイズを比較してみる。

サンプルとして、アイリスデータセットをダウンロードしてくる。

$ wget https://raw.github.com/vincentarelbundock/Rdatasets/master/csv/datasets/iris.csv

見慣れたいつものあれ。

$ head iris.csv
"","Sepal.Length","Sepal.Width","Petal.Length","Petal.Width","Species"
"1",5.1,3.5,1.4,0.2,"setosa"
"2",4.9,3,1.4,0.2,"setosa"
"3",4.7,3.2,1.3,0.2,"setosa"
"4",4.6,3.1,1.5,0.2,"setosa"
"5",5,3.6,1.4,0.2,"setosa"
"6",5.4,3.9,1.7,0.4,"setosa"
"7",4.6,3.4,1.4,0.3,"setosa"
"8",5,3.4,1.5,0.2,"setosa"
"9",4.4,2.9,1.4,0.2,"setosa"

元のサイズは 8kB だった。

$ du -h iris.csv
8.0K   iris.csv

できれば、もうちょっと大きいものを例にした方が良いと思うんだけど、とりあえずということで。

配布するときは圧縮をかけることも考えられるので tar.gz にもしてみよう。

$ tar czf iris.tar.gz iris.csv
$ du -h iris.tar.gz
4.0K   iris.tar.gz

4kB ということで、だいたい半分になった。

これを、まずは pandas の DataFrame オブジェクトとして読み込んでおく。

>>> import pandas as pd
>>> df = pd.read_csv('iris.csv')

Pickle

まずは Pickle フォーマットで保存する。 種類としては無圧縮のものと GZIP 圧縮をかけたものの二つ用意する。

>>> df.to_pickle('iris.pickle')
>>> df.to_pickle('iris.gzip.pickle', compression='gzip')

サイズを確認すると無圧縮のものは元の CSV と変わらず 8kB で圧縮をかけたものは 4kB になった。

$ du -h iris.pickle
8.0K   iris.pickle
$ du -h iris.gzip.pickle
4.0K   iris.gzip.pickle

fastparquet

続いては fastparquet を使って Parquet フォーマットで保存する。 今回も無圧縮のものと GZIP で圧縮したものを用意する。

>>> from fastparquet import write
>>> write('iris.fp.parquet', df)
>>> write('iris.fp.gzip.parquet', df, compression='GZIP')

サイズを確認すると無圧縮のものについては 12kB となって、元の CSV よりもサイズが増えている。 GZIP で圧縮したものについては 4kB と、かろうじて Pickle 形式のサイズに並んだ。

$ du -h iris.fp.parquet
 12K    iris.fp.parquet
$ du -h iris.fp.gzip.parquet
4.0K   iris.fp.gzip.parquet

pyarrow

続いては pyarrow を使って Parquet フォーマットで保存する。

前述した通り pyarrow では Parquet で保存する前に、まずは一旦 Table オブジェクトにする。 これがファイルサイズにどう影響するのかは気になった。

>>> import pyarrow as pa
>>> table = pa.Table.from_pandas(df)

変換した Table オブジェクトを Parquet フォーマットで保存する。 これには pyarrow.parquet.write_table() 関数を使う。 この関数はデフォルトで snappy を使ってデータ圧縮をかけるので、もし圧縮しないなら明示的に 'none' を指定する。

>>> from pyarrow import parquet as pq
>>> pq.write_table(table, 'iris.pa.parquet', compression='none')
>>> pq.write_table(table, 'iris.pa.gzip.parquet', compression='gzip')

ファイルサイズを確認すると 8kB となっていて元の CSV ファイルと同じだった。 また GZIP フォーマットで圧縮をかけた場合にもサイズが減っていない点は不思議に感じた。

$ du -h iris.pa.parquet
8.0K   iris.pa.parquet
$ du -h iris.pa.gzip.parquet
8.0K   iris.pa.gzip.parquet

ファイルのハッシュは異なるので、少なくともバイナリの内容は変化しているはずなんだけど。

$ md5 iris.pa.parquet
MD5 (iris.pa.parquet) = a148d8a431ec8903df4db4b67c2901b9
$ md5 iris.pa.gzip.parquet
MD5 (iris.pa.gzip.parquet) = faa7b6d73915b43e3a479d8418eb8cd1

もしかすると Apache Arrow が扱うオブジェクトの時点で何らかの最適化が行われているのかもしれない。

ファイルサイズのまとめ

一通り見ていくと保存したときのファイルサイズに関しては、どれも大して差はないようだった。 Apache Parquet についても元の CSV と同じか、もう少し大きくなる程度みたいだ。

各フォーマットでの読み込み時間の比較

続いてはメモリ上のオブジェクトにロードするときにかかる時間を比べてみることにした。 これ自体は、ファイルサイズの違いとか扱うオブジェクトの違いとかにも影響を受ける。 なので、今回のケースではこんな感じでしたっていう参考程度かも。

まずは、特定の処理を 10000 回実行するときの時間を測るための関数 stopwatch() を次のように用意しておく。

>>> import time
>>> def stopwatch(f, n=10000):
...     t0 = time.time()
...     for _ in range(n):
...         f()
...     t1 = time.time()
...     return t1 - t0
...

CSV

まずは無圧縮の CSV を 10000 回読む時間を測ってみよう。

対象は pandas の read_csv() 関数になる。 先ほどの stopwatch() 関数で時間を測れるようにデータセット名を関数に部分適用したものを作る。

>>> import functools
>>> read_csv = functools.partial(pd.read_csv, 'iris.csv')

上記で作った read_csv() 関数を使って時間を測ってみよう。

>>> stopwatch(read_csv)
9.748593091964722

今回使った環境では 9.74 秒かかった。

Pickle

続いては Pickle フォーマットを試してみる。

さっきと同じようにパラメータを部分適用した関数を作る。 無圧縮のものと GZIP 圧縮をかけたもの両方で測る。

>>> import functools
>>> read_pickle = functools.partial(pd.read_pickle, 'iris.pickle')
>>> read_gzip_pickle = functools.partial(pd.read_pickle, 'iris.gzip.pickle', compression='gzip')

実行してみると無圧縮のものが 3.58 秒で GZIP 圧縮したものが 4.77 秒だった。 CSV を扱うときの半分くらいで読めたことになる。

>>> stopwatch(read_pickle)
3.5826637744903564
>>> stopwatch(read_gzip_pickle)
4.771883010864258

圧縮したものが少し遅いのは解凍するのに時間がかかったせいなのかな。 これくらいのサイズだとディスクから読み込む時間には大差がなかったということかも。 もっと大きなデータセットを扱う場合には、結果がまた違ってくると思う。

Parquet (fastparquet)

続いては fastparquet を使って保存した Parquet フォーマットを測ってみる。

>>> import functools
>>> from fastparquet import ParquetFile
>>> read_fp_parquet = lambda: ParquetFile('iris.fp.parquet').to_pandas()
>>> read_fp_gzip_parquet = lambda: ParquetFile('iris.fp.gzip.parquet').to_pandas()

時間を測ってみよう。

>>> stopwatch(read_fp_parquet)
40.765795946121216
>>> stopwatch(read_fp_gzip_parquet)
50.68596577644348

ちょっとこれは予想外だったんだけど無圧縮のもので 40 秒、圧縮をかけたものでは 50 秒もかかった。 どこにボトルネックがあるのかは分からないけど時間がかかりすぎている。

Parquet (pyarrow)

続いては pyarrow を使って保存した Parquet フォーマットを測ってみる。

>>> import functools
>>> from pyarrow import parquet as pq
>>> read_pa_parquet = lambda: functools.partial(pq.read_table, 'iris.pa.parquet')().to_pandas()
>>> read_pa_gzip_parquet = lambda: functools.partial(pq.read_table, 'iris.pa.gzip.parquet')().to_pandas()

時間を測ってみよう。

>>> stopwatch(read_pa_parquet)
10.029934883117676
>>> stopwatch(read_pa_gzip_parquet)
10.79091191291809

pyarrow に関しては圧縮をかけてもかけなくても 10 秒前後となった。 CSV を扱うときと似たような時間になっている。

せっかくのカラム志向フォーマットなので、特定のカラムだけを読むような場合についても試してみよう。 データセットの中から Sepal.Length だけを読み出してみる。

>>> read_pa_parquet_sl = lambda: functools.partial(pq.read_table, 'iris.pa.parquet', columns=['Sepal.Length'])().to_pandas()
>>> read_pa_gzip_parquet_sl = lambda: functools.partial(pq.read_table, 'iris.pa.gzip.parquet', columns=['Sepal.Length'])().to_pandas()

時間を測ってみよう。

>>> stopwatch(read_pa_parquet_sl)
4.239685297012329
>>> stopwatch(read_pa_gzip_parquet_sl)
4.328531980514526

読み込むカラムを絞ると 4.3 秒前後と明確に時間が短くなった。

読み込み時間のまとめ

今回扱った環境では Pickle フォーマットの読み込みが早かった。 CSV と Parquet (pyarrow) がそれに続く感じ。 Parquet (fastparquet) は何が原因かまでは分からないものの、ちょっと時間がかかりすぎている。

Parquet (pyarrow) については読み込むカラムを制限すると明確に時間が短くなるのが面白かった。 ここらへんは、さすがカラム志向フォーマットという感じだろう。 取り扱うカラムがデータセットの一部だけ、という場面でこの特性は有利に働くと思われる。

全体のまとめ

今回は Apache Parquet フォーマットの Python 実装を試して、その特性を調べてみた。 今のところ Python でしか扱わないのであれば Pickle フォーマットで十分かなあという気もする。 とはいえデータセットが大きくて取り扱うカラムが一部だけという場面では pyarrow も使えそうだった。

pyarrow に関しては Apache Arrow という観点からも今後は使いやすくなっていくかもしれない。 Apache Arrow ではオブジェクトのメモリ上での表現方法の共通化についても狙ってきている。 つまり、今回でいえば Table オブジェクトが異なる言語やソフトウェアでも同様に扱えるようになっていくはず。 今回はデータの互換性という観点では見てこなかったけど、そこについても将来性があるということだ。