CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: 定期実行のアルゴリズムについて

今回は割と小ネタで、特定の処理を定期実行するようなプログラムを書く場合について考えてみる。 ただし、前提としてあくまで定期実行は Python の中で処理して cron 的なものには頼らないものとする。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.5
BuildVersion:   18F132
$ python -V       
Python 3.7.3

ダメなパターン: 定期実行の時間だけ単純に sleep する

最初に考えられるのは、定期実行したい間隔で time.sleep() のような関数を使ってインターバルを入れるというもの。 ただし、このパターンでは肝心の定期実行したい処理にかかる時間が考慮できていない。

以下のサンプルコードでは 3 秒ごとに定期実行しているつもりでいる。 しかし、肝心の定期実行したい処理には 2 秒かかっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from datetime import datetime
import time


def interval_task():
    """定期的に実行したい何か時間のかかる処理"""
    now = datetime.now()
    print(now.strftime('%H:%M:%S.%f'))
    # 実行に 2 秒くらいかかる
    time.sleep(2)


def schedule(interval_sec, callable_task, args=None, kwargs=None):
    """何らかの処理を定期的に実行する関数"""
    args = args or []
    kwargs = kwargs or {}
    while True:
        callable_task(*args, **kwargs)  # ここで時間を食う
        time.sleep(interval_sec)  # さらにスリープしてしまう


def main():
    # 3 秒ごとに実行している...つもり
    schedule(interval_sec=3, callable_task=interval_task)


if __name__ == '__main__':
    main()

上記を実行してみる。 表示を見て分かる通り 3 + 2 = 5 秒の間隔で時刻が表示されてしまっている。

$ python sched1.py 
11:24:16.698956
11:24:21.709522
11:24:26.710433
11:24:31.718227
...

ダメなパターン: 処理にかかる時間を開始・終了の前後で毎回計測して計算する

続いて考えられるのが、定期実行したい処理の前後で開始・終了時刻を計測してスリープする時間を補正するというもの。 ただし、この場合は定期実行したい処理以外にかかる処理時間が考慮できていない。

以下のサンプルコードでは定期実行したい処理の前後で時刻を計測してスリープする時間を補正している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from datetime import datetime
import time


def interval_task():
    """定期的に実行したい何か時間のかかる処理"""
    now = datetime.now()
    print(now.strftime('%H:%M:%S.%f'))
    # 実行に 2 秒くらいかかる
    time.sleep(2)


def schedule(interval_sec, callable_task, args=None, kwargs=None):
    """何らかの処理を定期的に実行する関数"""
    args = args or []
    kwargs = kwargs or {}
    while True:
        # 処理開始時間を取得する
        start_timing = datetime.now()

        callable_task(*args, **kwargs)

        # 処理完了時間を取得する
        end_timing = datetime.now()
        # 実行間隔との差分を取る
        time_delta_sec = (end_timing - start_timing).total_seconds()
        # スリープすべき時間を計算する
        sleep_sec = interval_sec - time_delta_sec

        time.sleep(max(sleep_sec, 0))


def main():
    # 3 秒ごとに実行される...と良いなあ
    schedule(interval_sec=3, callable_task=interval_task)


if __name__ == '__main__':
    main()

上記を実行してみると、だいたい 3 秒ごとに時刻が表示されるため一見すると上手くいっているように見える。 しかし、よく見ると 1 ~ 10 ミリ秒の単位は単調に時刻が増加していることが分かる。 これは、定期実行以外の処理にかかる時間が考慮できていないため、実際には間隔がわずかに長くなってしまっている。

$ python sched2.py 
11:25:41.820993
11:25:44.826223
11:25:47.831453
11:25:50.836683
11:25:53.840000
11:25:56.844355
11:25:59.849602
...

また、間隔が少し長くなる以外にも、もう一つの問題がある。 定期実行したい処理が実行間隔よりもかかってしまうと、スリープする時間がマイナスになってしまう。 そのため、スリープを全く入れなくても実際より実行間隔が長くなってしまう。 いわゆるバッチの突き抜けみたいな状態。

以下のサンプルコードでは実行間隔として 3 秒を意図しているにもかかわらず、実際には定期実行の処理には 4 秒かかる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from datetime import datetime
import time


def interval_task():
    """定期的に実行したい何か時間のかかる処理"""
    now = datetime.now()
    print(now.strftime('%H:%M:%S.%f'))
    # 実行に 4 秒くらいかかる
    time.sleep(4)


def schedule(interval_sec, callable_task, args=None, kwargs=None):
    """何らかの処理を定期的に実行する関数"""
    args = args or []
    kwargs = kwargs or {}
    while True:
        start_timing = datetime.now()

        # ここが意図した実行間隔よりも長くかかる (バッチの突き抜け)
        callable_task(*args, **kwargs)

        end_timing = datetime.now()
        time_delta_sec = (end_timing - start_timing).total_seconds()
        sleep_sec = interval_sec - time_delta_sec

        # sleep_sec が負の値になる
        time.sleep(max(sleep_sec, 0))


def main():
    schedule(interval_sec=3, callable_task=interval_task)


if __name__ == '__main__':
    main()

上記を実行すると、次のように本来意図した 3 秒ではなく 4 秒間隔で実行される。

$ python sched3.py 
11:26:52.933087
11:26:56.938491
11:27:00.943913
11:27:04.948757
11:27:08.949384

解決策: 特定の基準時刻を元にスリープする時間を補正しつつ別のスレッドで実行する

先ほどの問題点を解決するために二つの施策が必要となる。 まずひとつ目は一つの基準時刻を設けて、それを元にスリープする時刻を補正する。 これで、実行間隔がやや長くなってしまう問題が解決できる。 もうひとつは定期実行の処理を別のスレッドで実行することで、バッチの突き抜けを防止できる。

以下のサンプルコードでは特定の基準時刻を元にスリープする時間を補正している。 具体的には、処理の最初で取得した時刻から剰余演算でスリープすべき時間を計算する。 その上で定期実行の処理は別のスレッドを起動している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from datetime import datetime
import time
import threading


def interval_task():
    """定期的に実行したい何か時間のかかる処理"""
    now = datetime.now()
    print(now.strftime('%H:%M:%S.%f'))
    # 実行に 4 秒くらいかかる
    time.sleep(4)


def schedule(interval_sec, callable_task,
             args=None, kwargs=None):
    """何らかの処理を定期的に実行する関数"""
    args = args or []
    kwargs = kwargs or {}
    # 基準時刻を作る
    base_timing = datetime.now()
    while True:
        # 処理を別スレッドで実行する
        t = threading.Thread(target=callable_task,
                             args=args, kwargs=kwargs)
        t.start()

        # 基準時刻と現在時刻の剰余を元に、次の実行までの時間を計算する
        current_timing = datetime.now()
        elapsed_sec = (current_timing - base_timing).total_seconds()
        sleep_sec = interval_sec - (elapsed_sec % interval_sec)

        time.sleep(max(sleep_sec, 0))


def main():
    schedule(interval_sec=3, callable_task=interval_task)


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 定期実行の処理に 4 秒かかったとしても、正しく 3 秒間隔で処理が実行できていることが分かる。 また、ミリ秒単位についても単調増加していない。

$ python sched4.py 
11:27:38.941382
11:27:41.946648
11:27:44.942504
11:27:47.946613
11:27:50.942494
...

オプション: スレッドプールを使う

先ほどの処理では単純に定期実行の度にスレッドを起動していた。 しかし、スレッドの生成には時間的・空間的な計算量がかかる。 もし、オーバーヘッドを小さくしたり、メモリを過剰に使われたくないときはスレッドプールを利用することが検討できるはず。

以下のサンプルコードではワーカーが 10 のスレッドプールを使って先ほどと同じ処理を実行している。 これで、10 を越えるスレッドが同時に生成されることがなくなる。 また、一度作られたスレッドは再利用されるため時間的な計算量でもわずかながら有利になるはず。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from datetime import datetime
import time
from concurrent.futures import ThreadPoolExecutor


def interval_task():
    """定期的に実行したい何か時間のかかる処理"""
    now = datetime.now()
    print(now.strftime('%H:%M:%S.%f'))
    # 実行に 4 秒くらいかかる
    time.sleep(4)


def schedule(interval_sec, callable_task,
             args=None, kwargs=None,
             workers_n=10):
    """何らかの処理を定期的に実行する関数"""
    args = args or []
    kwargs = kwargs or {}
    base_timing = datetime.now()

    # 必要以上にスレッドが生成されないようにスレッドプールを使う
    with ThreadPoolExecutor(max_workers=workers_n) as executor:
        while True:
            future = executor.submit(callable_task,
                                     *args, **kwargs)

            current_timing = datetime.now()
            elapsed_sec = (current_timing - base_timing).total_seconds()
            sleep_sec = interval_sec - (elapsed_sec % interval_sec)

            time.sleep(max(sleep_sec, 0))


def main():
    schedule(interval_sec=3, callable_task=interval_task)


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。

$ python sched5.py 
11:30:30.000741
11:30:33.005632
11:30:36.002308
11:30:39.003651
11:30:42.002279
...

ただし、スレッドプールにはプールの上限に達したときにバッチの突き抜けが起こるという問題がある。 メモリが枯渇して OOM Killer に殺されるか、殺されないけど突き抜けるかは状況によって選ぶのが良いと思う。 とはいえメモリが枯渇するほどスケジュール実行のスレッドが生成される状況って、暴走しているような場合くらいな気もする?

以下のサンプルコードではワーカーの数を 1 に制限することで、意図的に突き抜けを起こるようにしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from datetime import datetime
import time
from concurrent.futures import ThreadPoolExecutor


def interval_task():
    """定期的に実行したい何か時間のかかる処理"""
    now = datetime.now()
    print(now.strftime('%H:%M:%S.%f'))
    # 実行に 4 秒くらいかかる
    time.sleep(4)


def schedule(interval_sec, callable_task,
             args=None, kwargs=None,
             workers_n=10):
    """何らかの処理を定期的に実行する関数"""
    args = args or []
    kwargs = kwargs or {}
    base_timing = datetime.now()

    with ThreadPoolExecutor(max_workers=workers_n) as executor:
        while True:
            future = executor.submit(callable_task,
                                     *args, **kwargs)

            current_timing = datetime.now()
            elapsed_sec = (current_timing - base_timing).total_seconds()
            sleep_sec = interval_sec - (elapsed_sec % interval_sec)

            time.sleep(max(sleep_sec, 0))


def main():
    # 並列度を 1 にして時間のかかる処理を 1 秒ごとに実行した場合
    # スレッドが空くまで待たされる
    schedule(interval_sec=1, callable_task=interval_task,
             workers_n=1)


if __name__ == '__main__':
    main()

上記を実行してみよう。 たしかに突き抜けて処理に 4 秒かかっていることが分かる。

$ python sched7.py 
11:32:48.013018
11:32:52.014658
11:32:56.018407
11:33:00.024064
11:33:04.029571
...

なお、巷にはスケジュール実行するためのライブラリも色々とあって、それらを使うことで色々と楽ができる。 ただし、意図通りに動作させるためには上記のような考慮点についてあらかじめ検討しておく必要がある。 また、リアルタイム OS でない限り今回用いたようなコードで正しく定期実行されるという保証は実のところないはず。

いじょう。