CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: SQLAlchemy で悲観的・楽観的排他制御を試す

以前、MySQL を使ってリレーショナルデータベースの悲観的・楽観的排他制御を試してみた。 今回は、その悲観的・楽観的排他制御を Python の O/R マッパーの SQLAlchemy で実際に使ってみることにする。

blog.amedama.jp

使用する MySQL のバージョンは次の通り。

$ mysql -u root -e "show variables like 'version'"
+---------------+--------+
| Variable_name | Value  |
+---------------+--------+
| version       | 5.6.26 |
+---------------+--------+

トランザクション分離レベルはデフォルトの REPEATABLE READ を使う。

$ mysql -u root -e "select @@tx_isolation"
+-----------------+
| @@tx_isolation  |
+-----------------+
| REPEATABLE-READ |
+-----------------+

SQLALchemy をインストールする

まずは PIP を使って SQLAlchemy をインストールする。 また、リレーショナルデータベースには MySQL を使うのでドライバの mysqlclient も同時に必要となる。

$ python --version
Python 3.5.0
$ pip install sqlalchemy mysqlclient

悲観的排他制御

まずは悲観的排他制御から試してみよう。 サンプルコードは次の通り。 全体の流れとしては、まず動作確認に使う行をひとつ追加した上で、それを複数のスレッドから更新している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import threading

from sqlalchemy.ext.declarative.api import declarative_base
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.sqltypes import BigInteger
from sqlalchemy.sql.sqltypes import Text
from sqlalchemy.engine import create_engine
from sqlalchemy.orm.session import sessionmaker
from sqlalchemy.exc import SQLAlchemyError


Base = declarative_base()


# 銀行口座を模したテーブル
class Account(Base):
    __tablename__ = 'accounts'

    # 主キー
    id = Column(BigInteger, primary_key=True)
    # 口座の持ち主
    name = Column(Text, nullable=False)
    # 残高
    cash = Column(BigInteger, nullable=False)


def _update(session, name, cash_delta, wait):
    '''
    口座の残高を更新する関数
    :param Session session: セッション
    :param str name: 口座の持ち主
    :param int cash_delta: 更新する金額
    :param int wait: 更新するまでのディレイ
    '''
    try:
        with session.begin(subtransactions=True):
            # SELECT 時に排他ロックを取得する
            query = session.query(Account).with_lockmode('update')
            # 口座の持ち主から絞り込む
            account = query.filter_by(name=name).one()
            # ディレイをかける
            time.sleep(wait)
            # 口座の残高を更新する
            account.cash += cash_delta
    except SQLAlchemyError:
        session.rollback()


def main():
    # データベースと接続する
    engine = create_engine(
        'mysql://root@localhost/test',
        echo=True,
    )

    # テーブルを作成する
    Base.metadata.create_all(engine)

    # セッション作成用のオブジェクトを作る
    # autocommit: セッションの中で必要に応じて自動で commit する
    # expire_on_commit: commit 時にセッションに紐付いたオブジェクトを expire する
    SessionMaker = sessionmaker(
        bind=engine,
        autocommit=True,
        expire_on_commit=False,
    )

    # セッションを作る
    session = SessionMaker()
    # 最初に行をひとつ追加しておく
    with session.begin(subtransactions=True):
        account = Account(name='foo', cash=100)
        session.add(account)

    # 口座に 50 を加算するスレッド (ディレイを 1 秒かける)
    t1 = threading.Thread(target=_update, args=(SessionMaker(), 'foo', 50, 1))
    # 口座に 100 を加算するスレッド (ディレイなし)
    t2 = threading.Thread(target=_update, args=(SessionMaker(), 'foo', 100, 0))

    # スレッドを開始する
    t1.start()
    t2.start()


if __name__ == '__main__':
    main()

ポイントとしてはクエリを取得する際に Query#with_lockmode('update') を使って排他ロックを取得している点だ。

上記に適当な名前を付けて実行してみよう。 ログを見て分かる通り先に排他ロックを取得したスレッドのトランザクションがコミットするまで、後のスレッドは処理がブロックしている。 そのため、適切に口座の残高が 100 + 50 + 100 = 250 に更新されている。

$ python pessimistic.py
...(省略)...
2015-09-28 06:09:18,683 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2015-09-28 06:09:18,683 INFO sqlalchemy.engine.base.Engine SELECT accounts.id AS accounts_id, accounts.name AS accounts_name, accounts.cash AS accounts_cash 
FROM accounts 
WHERE accounts.name = %s FOR UPDATE
2015-09-28 06:09:18,683 INFO sqlalchemy.engine.base.Engine ('foo',)
2015-09-28 06:09:18,684 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2015-09-28 06:09:18,685 INFO sqlalchemy.engine.base.Engine SELECT accounts.id AS accounts_id, accounts.name AS accounts_name, accounts.cash AS accounts_cash 
FROM accounts 
WHERE accounts.name = %s FOR UPDATE
2015-09-28 06:09:18,685 INFO sqlalchemy.engine.base.Engine ('foo',)
2015-09-28 06:09:19,686 INFO sqlalchemy.engine.base.Engine UPDATE accounts SET cash=%s WHERE accounts.id = %s
2015-09-28 06:09:19,686 INFO sqlalchemy.engine.base.Engine (150, 1)
2015-09-28 06:09:19,687 INFO sqlalchemy.engine.base.Engine COMMIT
2015-09-28 06:09:19,689 INFO sqlalchemy.engine.base.Engine UPDATE accounts SET cash=%s WHERE accounts.id = %s
2015-09-28 06:09:19,689 INFO sqlalchemy.engine.base.Engine (250, 1)
2015-09-28 06:09:19,690 INFO sqlalchemy.engine.base.Engine COMMIT

悲観的排他制御を行わない場合

先ほどのソースコードから Query#with_lockmode() を取り除いた上で実行してみよう。 尚、先ほどの実行でできたテーブルは削除する必要がある。

$ python pessimistic.py
...(省略)...
2015-09-28 06:17:39,922 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2015-09-28 06:17:39,922 INFO sqlalchemy.engine.base.Engine SELECT accounts.id AS accounts_id, accounts.name AS accounts_name, accounts.cash AS accounts_cash 
FROM accounts 
WHERE accounts.name = %s
2015-09-28 06:17:39,923 INFO sqlalchemy.engine.base.Engine ('foo',)
2015-09-28 06:17:39,924 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2015-09-28 06:17:39,925 INFO sqlalchemy.engine.base.Engine SELECT accounts.id AS accounts_id, accounts.name AS accounts_name, accounts.cash AS accounts_cash 
FROM accounts 
WHERE accounts.name = %s
2015-09-28 06:17:39,925 INFO sqlalchemy.engine.base.Engine ('foo',)
2015-09-28 06:17:39,927 INFO sqlalchemy.engine.base.Engine UPDATE accounts SET cash=%s WHERE accounts.id = %s
2015-09-28 06:17:39,927 INFO sqlalchemy.engine.base.Engine (200, 1)
2015-09-28 06:17:39,927 INFO sqlalchemy.engine.base.Engine COMMIT
2015-09-28 06:17:40,930 INFO sqlalchemy.engine.base.Engine UPDATE accounts SET cash=%s WHERE accounts.id = %s
2015-09-28 06:17:40,930 INFO sqlalchemy.engine.base.Engine (150, 1)
2015-09-28 06:17:40,931 INFO sqlalchemy.engine.base.Engine COMMIT

排他制御がかかっていないため、先にコミットしたトランザクションの内容は後からのコミットで上書きされてしまっていることが分かる。

楽観的排他制御

次に楽観的排他制御を試してみる。 サンプルコードは次の通り。 全体の流れは先ほどと変わっていない。 楽観的排他制御では更新対象が変更されていないことを確認するのにバージョン番号などを使うため、テーブルにそれ用のカラムが必要になる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import threading

from sqlalchemy.ext.declarative.api import declarative_base
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.sqltypes import BigInteger
from sqlalchemy.sql.sqltypes import Text
from sqlalchemy.engine import create_engine
from sqlalchemy.orm.session import sessionmaker
from sqlalchemy.exc import SQLAlchemyError


Base = declarative_base()


# 銀行口座を模したテーブル
class Account(Base):
    __tablename__ = 'accounts'

    # 主キー
    id = Column(BigInteger, primary_key=True)
    # バージョン番号
    version_id = Column(BigInteger, nullable=False)
    # 口座の持ち主
    name = Column(Text, nullable=False)
    # 残高
    cash = Column(BigInteger, nullable=False)

    # バージョン番号のカラムを登録しておく
    __mapper_args__ = {
        'version_id_col': version_id
    }


def _update(session, name, cash_delta, wait):
    '''
    口座の残高を更新する関数
    :param Session session: セッション
    :param str name: 口座の持ち主
    :param int cash_delta: 更新する金額
    :param int wait: 更新するまでのディレイ
    '''
    try:
        with session.begin(subtransactions=True):
            # SELECT 時に排他ロックを取得する
            query = session.query(Account)
            # 口座の持ち主から絞り込む
            account = query.filter_by(name=name).one()
            # ディレイをかける
            time.sleep(wait)
            # 口座の残高を更新する
            account.cash += cash_delta
    except SQLAlchemyError:
        session.rollback()


def main():
    # データベースと接続する
    engine = create_engine(
        'mysql://root@localhost/test',
        echo=True,
    )

    # テーブルを作成する
    Base.metadata.create_all(engine)

    # セッション作成用のオブジェクトを作る
    # autocommit: セッションの中で必要に応じて自動で commit する
    # expire_on_commit: commit 時にセッションに紐付いたオブジェクトを expire する
    SessionMaker = sessionmaker(
        bind=engine,
        autocommit=True,
        expire_on_commit=False,
    )

    # セッションを作る
    session = SessionMaker()
    # 最初に行をひとつ追加しておく
    with session.begin(subtransactions=True):
        account = Account(name='foo', cash=100)
        session.add(account)

    # 口座に 50 を加算するスレッド (ディレイを 1 秒かける)
    t1 = threading.Thread(target=_update, args=(SessionMaker(), 'foo', 50, 1))
    # 口座に 100 を加算するスレッド (ディレイなし)
    t2 = threading.Thread(target=_update, args=(SessionMaker(), 'foo', 100, 0))

    # スレッドを開始する
    t1.start()
    t2.start()


if __name__ == '__main__':
    main()

それでは実行してみよう。

$ python optimistic.py
...(省略)...
2015-09-28 06:29:10,309 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2015-09-28 06:29:10,309 INFO sqlalchemy.engine.base.Engine SELECT accounts.id AS accounts_id, accounts.version_id AS accounts_version_id, accounts.name AS accounts_name, accounts.cash AS accounts_cash 
FROM accounts 
WHERE accounts.name = %s
2015-09-28 06:29:10,310 INFO sqlalchemy.engine.base.Engine ('foo',)
2015-09-28 06:29:10,311 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2015-09-28 06:29:10,311 INFO sqlalchemy.engine.base.Engine SELECT accounts.id AS accounts_id, accounts.version_id AS accounts_version_id, accounts.name AS accounts_name, accounts.cash AS accounts_cash 
FROM accounts 
WHERE accounts.name = %s
2015-09-28 06:29:10,312 INFO sqlalchemy.engine.base.Engine ('foo',)
2015-09-28 06:29:10,313 INFO sqlalchemy.engine.base.Engine UPDATE accounts SET version_id=%s, cash=%s WHERE accounts.id = %s AND accounts.version_id = %s
2015-09-28 06:29:10,313 INFO sqlalchemy.engine.base.Engine (2, 200, 1, 1)
2015-09-28 06:29:10,313 INFO sqlalchemy.engine.base.Engine COMMIT
2015-09-28 06:29:11,317 INFO sqlalchemy.engine.base.Engine UPDATE accounts SET version_id=%s, cash=%s WHERE accounts.id = %s AND accounts.version_id = %s
2015-09-28 06:29:11,317 INFO sqlalchemy.engine.base.Engine (2, 150, 1, 1)
2015-09-28 06:29:11,318 INFO sqlalchemy.engine.base.Engine ROLLBACK

楽観的排他制御では自分が操作しようとしている行が別のトランザクションから変更されていないことをバージョン番号などを元に確認する。 もし、変更されていることが分かれば割り込みを防ぐために内容を破棄してロールバックすることになる。 上記のログからも適切に変更を検出してトランザクションをロールバックしていることがわかる。