CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: fasteners の便利な排他ロックを試す

Python には標準ライブラリとして、いくつか排他ロックの実装が用意されている。 例えば threading モジュールの Lock オブジェクトなどは、その代表といえる。

しかしながら、標準では用意されていないものもある。 例えばプロセス間の排他ロックやリードライトロックは標準ライブラリに用意されていない。 そのため、例えばもし自前でプロセス間の排他ロックを用意するとしたら fcntl モジュールなどを使って書くことになる。

とはいえ、排他ロックの実装というのはバグを作り込みやすい。 そもそも、マルチスレッドやマルチプロセスで競合しないプログラムを作ること自体が深く注意を払わなければできない作業だ。 その上、排他ロックの機構まで自前で用意するとしたら、二重でリスクを負うことになる。

fasteners

前置きが長くなったけど fasteners は前述したような排他ロックの機構を提供してくれるパッケージ。 今回は、このパッケージが用意している排他ロックを使ってみる。

公式ドキュメントは以下にある。

Welcome to Fasteners’s documentation! — Fasteners 0.14.1 documentation

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.11.6
BuildVersion:   15G1108
$ python --version
Python 3.5.2

インストール

fasteners はサードパーティ製のパッケージなので pip コマンドを使ってインストールする。

$ pip install fasteners

プロセス間の排他ロック

まず最初にプロセス間の排他ロックから。 これは、複数のプロセスが何らかのリソースを共有するシチュエーションで用いることになる。 複数のプロセスから共有リソースにアクセスを試みたときに、実際にアクセスすることができるプロセスは一つだけに制限するのに使う。 排他ロックを取得できたプロセスだけが、共有リソースにアクセスする権利を持つということ。

ここから始める検証では、テストのためにターミナルで Python の REPL をふたつ開いて実施しよう。 Python の REPL がふたつ、ということはそれぞれにプロセスが立ち上がるのでプロセス間の排他ロックの検証ができる。 ちなみに、ここで紹介するのはあくまでもマルチプロセスでの排他ロックなので、マルチスレッドでの排他ロックには使ってはいけない。

まず、ひとつ目の REPL で fasteners パッケージをインポートしよう。

>>> import fasteners

そして、プロセス間の排他ロックを司る InterProcessLock クラスをインスタンス化する。

>>> lock = fasteners.InterProcessLock('/var/tmp/lockfile')

このとき渡しているのは、ロックに用いるファイルを作るためのパスになっている。 つまり、各プロセスはこのロックファイルの状態を調べることで、別のプロセスが既にロックを取得していないかを確認し合うことになる。

そして、実際にロックをかけるには InterProcessLock#acquire() メソッドを呼び出す。 このメソッドはロックを取得できたか否かを真偽値で返してくる。

>>> lock.acquire()
True

上記では True が返っているので正常にロックが取れたことを意味している。

それでは、続いて別のターミナルで REPL を開いて同じようにロックの取得を試みてみよう。

>>> import fasteners
>>> lock = fasteners.InterProcessLock('/var/tmp/lockfile')

ふたつ目の REPL でもロックを取ろうとすると、そこでブロックする。 これは、ひとつ目の REPL が既にロックを取っているためだ。

>>> lock.acquire()
...

この状態で、ひとつ目の REPL でロックを開放してみよう。 ロックを開放するには InterProcessLock#release() メソッドを呼び出す。

>>> lock.release()

すると、ふたつ目の REPL でブロックしていた InterProcessLock#acquire() メソッドが返ってくるはず。

>>> lock.acquire()
True

返り値として True が返ってきていて、これでふたつ目の REPL にロックが移ったことが分かる。

後始末として、ひとまず、ふたつ目の REPL もロックを開放しておこう。

>>> lock.release()

ちなみに、ロックで怖いことの一つが誰かがロックを握ったまま手放さなくなることだ。 fasteners は内部的に fcntl モジュールを使って、そこらへんをケアしているので大丈夫そう。 試しに Python のプロセスを kill -KILL コマンドなどで強制終了してもロックはちゃんと開放される。

ブロックさせたくないとき

先ほどは別のプロセスがロックを取っている間は、別のプロセスがロックを取ろうとするとブロックしていた。 とはいえ、ロックを取ろうと試みて、もしだめなら別の仕事をするなりそのまま終了したいというような状況もあるはず。

そんなときは InterProcessLock#acquire() メソッドの引数である blocking に False を指定しよう。 こうすると、もしロックが取れなかったときは即座にリターンされる。

まず、ひとつ目の REPL でロックを取る。

>>> lock.acquire(blocking=False)
True

そして、ふたつ目の REPL でロックを取ろうと試みる。 このとき blocking に False を指定していると即座にリターンされる。 そして、返り値はロックが取れなかったことを表す False になっている。

>>> lock.acquire(blocking=False)
False

後片付けとしてロックを開放しておくのは忘れずに。

>>> lock.release()

コンテキストマネージャとして使う

ちなみに、ここまで使ってきた InterProcessLock クラスのインスタンスはコンテキストマネージャとしても動作する。 こうすると、コンテキストマネージャのブロック内がひとつのプロセスだけで実行されるようになる。 ただし、コンテキストマネージャとして使うときは blocking パラメータを渡すことはできないようだ。

>>> with fasteners.InterProcessLock('/var/tmp/lockfile'):
...     # do something
...     pass
...

この方がロックの開放漏れが起きづらくて良い。

上記のように使わないときは、必ずロックが開放されるように次のように書く必要があるはず。

>>> lock = fasteners.InterProcessLock('/var/tmp/lockfile')
>>> lock.acquire()
True
>>> try:
...     # do something
...     pass
... finally:
...     lock.release()
... 

デコレータとして使う

また InterProcessLock クラスのインスタンスはデコレータとしても使うことができる。 この場合はデコレータで修飾された関数のブロック内がロックを取得できたひとつのプロセスだけで実行されることになる。

>>> @fasteners.interprocess_locked('/var/tmp/lockfile')
... def do_something():
...     # do something
...     pass
...

ブロックを抜けるときは自動的にロックが開放されるので、これもロックの開放漏れが起こりづらいだろう。

リードライトロック

fasteners の、もうひとつの便利な排他ロックがリードライトロックだ。

通常の排他ロックの場合、実施する操作が読み込みだろうと書き込みだろうと関係ない。 ただ単に、排他ロックを取得できたスレッドないしプロセスだけが共有リソースにアクセスする権利を得る。 これは先ほど紹介したプロセス間の排他ロックも同様だった。

しかし、実際には読み込みだけは並列にアクセスできるようにしたい場面も多い。 なぜなら、読み込み操作は副作用のない処理だから。 副作用がなければ、複数のスレッドないしプロセスが並列にアクセスしても共有リソースが不整合を起こす心配はない。

そのため、リードライトロックでは、読み込みと書き込みが非対称な関係になっている。 読み取りについては複数のスレッドないしプロセスから並列でアクセスできる。 しかし、書き込みについてはひとつのスレッドないしプロセスだけで排他的に処理されるようになっている。

ちなみに、リードライトロック自体はスレッドやプロセスに関係のない概念となっている。 しかしながら fasteners のリードライトロックはスレッドにのみ対応している点には留意が必要だ。

また、リードライトロックを Python の標準ライブラリに追加しようという動きもあるようだ。 とはいえ、チケットを見る限り当初は Python 3.4 を目指して作業されていたものの、最近は進展が見られないようだ。

Issue 8800: add threading.RWLock - Python tracker

前置きが長くなったけど次に fasteners のリードライトロックを使ったサンプルプログラムを示す。 具体的にはリードライトロックを司る ReaderWriterLock クラスのインスタンスを使う。 ReaderWriterLock#read_lock() で読み込みロックを、ReaderWriterLock#write_lock() で書き込みロックを取得できる。 前述したように ReaderWriterLock#read_lock() は同時に複数のスレッドが取得できる。 それに対し ReaderWriterLock#write_lock() はひとつのスレッドだけが取得できる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import threading
import random

import fasteners


def reader(lock):
    # 読み込みロックをかける (読み込みだけは同時にできる)
    with lock.read_lock():
        thread_id = threading.get_ident()
        print('Thread {thread_id} is reading'.format(thread_id=thread_id))
        time.sleep(2)


def writer(lock):
    # 書き込みロックをかける (他の読み書きは排他制御される)
    with lock.write_lock():
        thread_id = threading.get_ident()
        print('Thread {thread_id} is writing'.format(thread_id=thread_id))
        time.sleep(4)


def main():
    lock = fasteners.ReaderWriterLock()

    # 読み込みスレッドを 6 つ用意する
    read_threads = [threading.Thread(target=reader, args=(lock,))
                    for _ in range(6)]
    # 書き込みスレッドを 4 つ用意する
    write_threads = [threading.Thread(target=writer, args=(lock,))
                     for _ in range(4)]

    # 一つにまとめてシャッフルする
    threads = read_threads + write_threads
    random.shuffle(threads)

    try:
        # スレッドを起動する
        for t in threads:
            t.start()
    finally:
        # 全てのスレッドが処理を終えるまで待つ
        for t in threads:
            t.join()


if __name__ == '__main__':
    main()

読み込みスレッドを 6 つ、書き込みスレッドを 4 つ用意して起動される順番をシャッフルした上で実行している。

上記を実行した結果の一例がこちら。 書き込みスレッドの処理はひとつずつ実行されるが、読み込みスレッドがロックを取得すると別の読み込みスレッドも続々とロックを取得して処理し始めることが分かる。

$ python rwlock.py
Thread 123145307557888 is writing
Thread 123145328578560 is reading
Thread 123145318068224 is reading
Thread 123145333833728 is reading
Thread 123145312813056 is reading
Thread 123145323323392 is reading
Thread 123145339088896 is reading
Thread 123145344344064 is writing
Thread 123145349599232 is writing
Thread 123145354854400 is writing

別のパターン。 こちらは最初にロックを取得したのが読み込みスレッドだったので、読み込みスレッドの処理が最初にいっぺんに実行されている。

$ python rwlock.py
Thread 123145307557888 is reading
Thread 123145312813056 is reading
Thread 123145323323392 is reading
Thread 123145328578560 is reading
Thread 123145333833728 is reading
Thread 123145349599232 is reading
Thread 123145318068224 is writing
Thread 123145339088896 is writing
Thread 123145344344064 is writing
Thread 123145354854400 is writing

ロックデコレータ

ここからは「そんなに便利か?」という感じではあるけど fasteners にあるロックデコレータの機能を紹介していく。

まずは @fasteners.locked デコレータを使うと、そのメソッドが排他に処理されるというもの。 これには、まずクラスの中で self._lock という名前でロックオブジェクトを用意しておく必要がある。 デコレータはメソッドからインスタンスのロックオブジェクトを取得して自動的にロックをかけてくれる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time

import fasteners


class SharedAmongThreads(object):
    '''スレッド間で共有されるオブジェクト'''

    def __init__(self):
        self._lock = threading.Lock()

    @fasteners.locked
    def have_to_thread_safe(self):
        '''マルチスレッドで呼び出されるためスレッドセーフである必要があるメソッド'''
        # do something
        thread_id = threading.get_ident()
        print('Thread {thread_id} is doing'.format(thread_id=thread_id))
        time.sleep(1)


def main():
    # スレッド間で共有するインスタンスを作る
    obj = SharedAmongThreads()

    # スレッド群を用意する
    threads = [threading.Thread(target=obj.have_to_thread_safe)
               for _ in range(5)]

    try:
        # スレッドを起動する
        for t in threads:
            t.start()
    finally:
        # 全てのスレッドの処理が終わるまで待つ
        for t in threads:
            t.join()


if __name__ == '__main__':
    main()

上記を実行した結果がこちら。 実際に実行してみると一秒ごとに処理が進んでいく様子が見て取れる。

$ python lockdeco.py
Thread 123145307557888 is doing
Thread 123145312813056 is doing
Thread 123145318068224 is doing
Thread 123145323323392 is doing
Thread 123145328578560 is doing

まあ、とはいえ fasteners の機能を使わなくてもコンテキストマネージャを使えば十分可読性は確保できそうな気がする。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time


class SharedAmongThreads(object):
    '''スレッド間で共有されるオブジェクト'''

    def __init__(self):
        self._lock = threading.Lock()

    def have_to_thread_safe(self):
        '''マルチスレッドで呼び出されるためスレッドセーフである必要があるメソッド'''
        with self._lock:
            # do something
            thread_id = threading.get_ident()
            print('Thread {thread_id} is doing'.format(thread_id=thread_id))
            time.sleep(1)


def main():
    # スレッド間で共有するインスタンスを作る
    obj = SharedAmongThreads()

    # スレッド群を用意する
    threads = [threading.Thread(target=obj.have_to_thread_safe)
               for _ in range(5)]

    try:
        # スレッドを起動する
        for t in threads:
            t.start()
    finally:
        # 全てのスレッドの処理が終わるまで待つ
        for t in threads:
            t.join()


if __name__ == '__main__':
    main()

同様に、リストの中にロックオブジェクトを複数入れておくと @fasteners.locked() デコレータで同時にロックを取得してくれる。 ロックオブジェクトに使うメンバの名前が _lock 以外のときはデコレータの lock 引数でメンバ名を指定する。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time

import fasteners


class SharedAmongThreads(object):
    '''スレッド間で共有されるオブジェクト'''

    def __init__(self):
        self._locks = [threading.Lock(), threading.Lock()]

    @fasteners.locked(lock='_locks')
    def have_to_thread_safe(self):
        '''マルチスレッドで呼び出されるためスレッドセーフである必要があるメソッド'''
        # do something
        thread_id = threading.get_ident()
        print('Thread {thread_id} is doing'.format(thread_id=thread_id))
        time.sleep(1)


def main():
    # スレッド間で共有するインスタンスを作る
    obj = SharedAmongThreads()

    # スレッド群を用意する
    threads = [threading.Thread(target=obj.have_to_thread_safe)
               for _ in range(5)]

    try:
        # スレッドを起動する
        for t in threads:
            t.start()
    finally:
        # 全てのスレッドの処理が終わるまで待つ
        for t in threads:
            t.join()


if __name__ == '__main__':
    main()

まあ、とはいえ上記も最近の Python ならコンテキストマネージャを横に並べるだけで事足りる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time


class SharedAmongThreads(object):
    '''スレッド間で共有されるオブジェクト'''

    def __init__(self):
        self._locks = [threading.Lock(), threading.Lock()]

    def have_to_thread_safe(self):
        '''マルチスレッドで呼び出されるためスレッドセーフである必要があるメソッド'''
        with self._locks[0], self._locks[1]:
            # do something
            thread_id = threading.get_ident()
            print('Thread {thread_id} is doing'.format(thread_id=thread_id))
            time.sleep(1)


def main():
    # スレッド間で共有するインスタンスを作る
    obj = SharedAmongThreads()

    # スレッド群を用意する
    threads = [threading.Thread(target=obj.have_to_thread_safe)
               for _ in range(5)]

    try:
        # スレッドを起動する
        for t in threads:
            t.start()
    finally:
        # 全てのスレッドの処理が終わるまで待つ
        for t in threads:
            t.join()


if __name__ == '__main__':
    main()

まとめ

  • fasteners を使うとプロセス間の排他ロックやリードライトロックといった便利な排他ロックが使える