CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: map/filter 処理を並列化する

今回は Python でリストなんかへの map/filter 処理をマルチプロセスで並列化する方法について。 この説明だけだと、なんのこっちゃという感じだと思うので詳しくは後述する。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.6
BuildVersion:   16G29
$ python --version
Python 3.6.3

下準備

Python の REPL を使って説明していくので、まずは起動する。

$ python

まずは、サンプルのデータとして 0 から 9 までの数字が入ったリストの data を用意しておく。

>>> list(range(10))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> data = list(range(10))

map/filter

まず map 処理というのは、具体的には次のようなもの。 リストなどの全ての要素に対して同じ処理を適用して返ってきた内容で新しいリストなどのオブジェクトを作ること。 次のサンプルコードでは map 処理を使って各要素を 2 倍している。

>>> [i * 2 for i in data]
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> list(map(lambda x: x * 2, data))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

ちなみに、可読性の面から言って内包表記を使う方が望ましい。

そして filter 処理というのは次のようにリストなどの中から特定の条件に合致するものだけを取り出して新しいオブジェクトを作ること。 次のサンプルコードでは filter 処理を使って偶数だけを取り出している。

>>> [i for i in data if i % 2 == 0]
[0, 2, 4, 6, 8]
>>> list(filter(lambda x: x % 2 == 0, data))
[0, 2, 4, 6, 8]

上記の処理は、そのままだとシングルプロセス・シングルスレッドで実行される。 もちろんリストに含まれるデータ数が少なかったり、それぞれに適用する処理が軽ければ、それでも問題はない。 ただ、データ数が多くて適用する処理も多い場合には、これが全体のボトルネックになることがある。

concurrent map

まずは並列化した map 処理から示す。

マルチプロセスの処理を扱いたいので、まずは multiprocessing モジュールをインポートする。

>>> from multiprocessing import Pool
>>> import multiprocessing

続いては map 処理で適用する処理を関数として定義しておく。

>>> def double(x):
...     return x * 2
...

あとは multiprocessing.Pool#map() メソッドを使って関数をリストの各要素に適用してやる。

>>> with Pool(multiprocessing.cpu_count()) as pool:
...     pool.map(double, data)
...
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

これで map 処理がマルチプロセスで並列化できる。

ちなみに multiprocessing.Pool がコンテキストマネージャとして使えるのは Python 3 以降らしい。 なので Python 2 であれば、次のようにして後始末をする。 具体的には、使い終わったら multiprocessing.Pool#close() メソッドを呼ぶ。

>>> pool = Pool(multiprocessing.cpu_count())
>>> try:
...     pool.map(double, data)
... finally:
...     pool.close()
...
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

これを忘れるとプロセスプールが使うメモリが開放されない。

concurrent filter

続いては filter 処理の並列化について。

まずは filter 処理に使う関数を定義しておく。

>>> def is_even(x):
...     return x % 2 == 0
...

並列化する場合は次のようにする。 さっきの map 処理よりも、ちょっと難しいかもしれない。

>>> with Pool(multiprocessing.cpu_count()) as pool:
...     [i for i, keep in zip(data, pool.map(is_even, data)) if keep]
...
[0, 2, 4, 6, 8]

上記の処理を分解しながら見ていく。 まず、最初のポイントは pool.map(is_even, data) のところで、先ほど用意した is_even() をリストに適用している。 その部分だけを取り出すと、こんな感じ。

>>> with Pool(multiprocessing.cpu_count()) as pool:
...     pool.map(is_even, data)
... 
[True, False, True, False, True, False, True, False, True, False]

続いては、それで得られたリストと元のリストを zip() 関数でタプルを要素にもったイテレータにしている。

>>> with Pool(multiprocessing.cpu_count()) as pool:
...     list(zip(data, pool.map(is_even, data)))
... 
[(0, True), (1, False), (2, True), (3, False), (4, True), (5, False), (6, True), (7, False), (8, True), (9, False)]

あとは、タプルのうち二番目の要素が残すべきかを示すフラグになっているのでリスト内包表記の if を使って除外すれば良い。

>>> with Pool(multiprocessing.cpu_count()) as pool:
...     [i for i, keep in zip(data, pool.map(is_even, data)) if keep]
...
[0, 2, 4, 6, 8]

そうした意味では、除外する処理そのものはシングルスレッドになっている。 マルチプロセスで並列化されているのは、除外すべきか否かを判定する処理ということになる。

まとめ

今回はリストなどに対する map/filter 処理をマルチプロセスで並列化する方法について見てきた。

ちなみに、上記のコードを使って自分のタスクが本当に速くなるかはあらかじめ確認した方が良い。 なぜかというと、マルチプロセスでの並列化にはプロセス生成やプロセス間通信のオーバーヘッドがあるため。 特に後者に支払うコストがでかい。 上記のようにデータ数が少ない、適用する処理が軽いという場合には確実にオーバーヘッドの方が大きくなる。

適用できる場面としては、まず前提としてデータ数が多くて適用する処理が重いこと。 さらに、実際に適用した上でパフォーマンスを測ってみて速くなることが確認できる場合のみ、ということになる。 ちなみに自分で試した限りだとログのパース処理なんかにはかなり効いた。