読者です 読者をやめる 読者になる 読者になる

CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: concurrent.futures を使った並行・並列処理

Python

Python の concurrent.futures はバージョン 3.2 で追加された並行・並列処理用のパッケージ。 似たようなパッケージにはこれまでにも threading や multiprocessing があったんだけど、これはそれよりも高レベルの API になっている。 デフォルトでスレッド・プロセスプールが使えたり、マルチスレッドとマルチプロセスがほとんどコードを変えずに使い分けられるメリットがある。

下準備

使う Python のバージョンが 3.2 未満のときは PyPI にあるバックポート版のパッケージをインストールする必要がある。

$ pip install futures

ただし、今回使う環境は Python 3.5 なので関係ない。

$ python --version
Python 3.5.1
$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.11.3
BuildVersion:   15D21

シングルスレッドの場合

今回は扱うものが並行・並列処理なので、計算量がたくさん必要になるものを題材とする。 試しに、いくつかの大きな数を素数か判定するプログラムをシングルスレッドで書いてみよう。 アルゴリズムとしては単純な試し割り法を使った。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import math


def _is_prime(n):
    """数値が素数か判定する関数"""
    if n < 2:
        return False

    if n == 2:
        return True

    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False

    return True


def _calc():
    # 調査対象の数値
    values = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419,
    ]
    for target in values:
        # 素数かチェックして結果を表示する
        result = _is_prime(target)
        msg = '{n}: {prime}'.format(n=target, prime=result)
        print(msg)


def main():
    import time
    start = time.time()
    _calc()
    end = time.time()
    # 計算にかかった時間を表示する
    elapsed_time = end - start
    msg = 'Time: {elapsed_time} sec'.format(elapsed_time=elapsed_time)
    print(msg)


if __name__ == '__main__':
    main()

これを実行してみると、それぞれの数をリストに収まっている順番で判定していくことになる。

$ python singlethread.py
112272535095293: True
112582705942171: True
112272535095293: True
115280095190773: True
115797848077099: True
1099726899285419: False
Time: 3.3549559116363525 sec

シングルスレッド版は 3.35 秒かかった。

マルチスレッドの場合

次は先ほどのサンプルをマルチスレッドにしてみよう。 ソースコードの細かい説明はコメントで行っている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import math
from concurrent import futures


def _is_prime(n):
    """数値が素数か判定する関数"""
    if n < 2:
        return False

    if n == 2:
        return True

    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False

    return True


def _calc():
    # 調査対象の数値
    values = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419,
    ]
    with futures.ThreadPoolExecutor() as executor:
        # 並行処理する対象とそれに対応する値 (処理の引数) を辞書で用意する
        mappings = {executor.submit(_is_prime, n): n for n in values}
        # futures.as_completed() は処理がおわったものから結果を返していくジェネレータ
        for future in futures.as_completed(mappings):
            # 完了した処理に対応する引数を辞書から取得する
            target = mappings[future]
            # 処理結果を取得する
            result = future.result()
            # 結果を表示する
            msg = '{n}: {prime}'.format(n=target, prime=result)
            print(msg)


def main():
    import time
    start = time.time()
    _calc()
    end = time.time()
    # 計算にかかった時間を表示する
    elapsed_time = end - start
    msg = 'Time: {elapsed_time} sec'.format(elapsed_time=elapsed_time)
    print(msg)


if __name__ == '__main__':
    main()

次も同じように実行してみよう。

$ python multithread.py
1099726899285419: False
112582705942171: True
112272535095293: True
112272535095293: True
115280095190773: True
115797848077099: True
Time: 3.3795218467712402 sec

先ほどとかかっている時間はほとんど変わっていない。 マルチスレッドにしたのに何故かと不思議に思われるかもしれない。 ただ、これにはちゃんとした原因がある。 CPython には GIL (Global Interpretor Lock) というものがあって、マルチスレッドにしても同時に実行できるスレッドはひとつだけだからだ。 むしろスレッドの生成コストの分だけ遅くなっても何ら不思議ではない。 とはいえ同時に実行できるスレッドがひとつなだけで、処理は「並行」に進められている。 その証拠に、先ほどとは数が出力されている順番が異なることに気づくことだろう。

マルチプロセスの場合

次は先ほどのサンプルをマルチプロセスにした場合。 なんと修正するところはたったひとつ。 先ほど使った ThreadPoolExecutor を ProcessPoolExecutor に変えるだけ。 これでマルチスレッドの実行がマルチプロセスでの実行に変わる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import math
from concurrent import futures


def _is_prime(n):
    """数値が素数か判定する関数"""
    if n < 2:
        return False

    if n == 2:
        return True

    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False

    return True


def _calc():
    # 調査対象の数値
    values = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419,
    ]
    with futures.ProcessPoolExecutor() as executor:
        # 並行処理する対象とそれに対応する値 (処理の引数) を辞書で用意する
        mappings = {executor.submit(_is_prime, n): n for n in values}
        # futures.as_completed() は処理がおわったものから結果を返していくジェネレータ
        for future in futures.as_completed(mappings):
            # 完了した処理に対応する引数を辞書から取得する
            target = mappings[future]
            # 処理結果を取得する
            result = future.result()
            # 結果を表示する
            msg = '{n}: {prime}'.format(n=target, prime=result)
            print(msg)


def main():
    import time
    start = time.time()
    _calc()
    end = time.time()
    # 計算にかかった時間を表示する
    elapsed_time = end - start
    msg = 'Time: {elapsed_time} sec'.format(elapsed_time=elapsed_time)
    print(msg)


if __name__ == '__main__':
    main()

同じように実行してみよう。

$ python multiprocess.py
112272535095293: True
112582705942171: True
112272535095293: True
115280095190773: True
1099726899285419: False
115797848077099: True
Time: 1.3142600059509277 sec

今度は複数のプロセスで「並列」に処理が進んだから実行時間が短縮されている。

まとめ

今回は Python の標準ライブラリ concurrent.futures を使った並行・並列処理について紹介した。 このライブラリを使うとマルチスレッドとマルチプロセスでほとんどソースコードを変えずに使い分けることができる。 ただし、マルチプロセスを使うときはひとつだけ注意が必要だ。 マルチプロセスの場合、プロセス間でのオブジェクトのやり取りは Pickle にもとづいて行われる。 そのため、並列化する関数の引数と返り値には Pickle 化できないオブジェクトを使うことができない。 Pickle についてはこのブログで以前に書いたことがあるのであわせて読んでもらいたい。

blog.amedama.jp