CUBE SUGAR CONTAINER

技術系のこと書きます。

battery で Apple Silicon な MacBook のバッテリー充電に上限を設ける

リチウムイオン電池を使ったバッテリーは、一般的に残量が 0% や 100% の付近にあると劣化が進みやすい。 また、充電サイクルの回数が増える毎に、少しずつではあるが着実に劣化していく。 バッテリーが劣化すると、設計上の容量よりも電気を蓄える力が落ちて機器の稼働時間の短縮につながる。

最近の macOS には、バッテリーの寿命を延ばす目的で「バッテリー充電の最適化」という機能がある。 これは、機器の使われ方をデバイスが学習して、なるべく残量が 100% の状態を維持し続けないようにするもの。 この機能を有効にしていると、たまに充電が 80% 前後でしばらく止まることがある。 しかし、この機能を使ってもユーザが明示的に充電を止めることはできない。

そこで、今回は battery というアプリケーションを紹介する。 このアプリケーションを使うと Apple Silicon な MacBook で明示的にバッテリーの充電に上限を設定できる。

github.com

充電中に指定したバッテリーの残量まで到達すると、充電が自動で停止してそれ以上は増えない。 また、充電が停止している間も動作用の電源はケーブルから供給され続ける。 そのため、バッテリーの充電サイクルがむやみに増加することもない。 結果として、バッテリーの寿命を伸ばせる可能性がある。

もくじ

下準備

battery のインストール方法はいくつかある。 その中でも Homebrew からインストールするのが、おそらく一番カンタンだと思う。

$ brew install --cask battery

battery を実行する

battery は CUI と GUI の両方のアプリケーションがある。

GUI で実行する

GUI のアプリケーションは /Applications 以下にインストールされる。

$ open /Applications/battery.app

アプリケーションを起動すると、ツールバーにバッテリーのアイコンが現れる。 この状態で、自動でバッテリーの充電に 80% の上限が設定される。 設定は SMC (System Management Controller) に書き込まれるため、マシンを再起動しても設定はそのままになるようだ。

CUI で実行する

CUI に関しては battery というコマンドがインストールされる。 このコマンドで充電される上限を設定することもできる。 というより GUI は、この CUI の単なるラッパーのようだ。

たとえば battery status コマンドでバッテリーの充電状態を確認できたりもする。 以下は 80% に到達したことで充電が停止した際のログ。

$ battery status
10/15/23-17:19:32 - Battery at 80% (attached; remaining), smc charging disabled
10/15/23-17:19:32 - Your battery is currently being maintained at 80%

上限の数値を変更したいときは battery maintain サブコマンドを使う。

$ battery maintain 70

上記を実行すると GUI で実行しているアプリの表示も変わるはず。

ちなみに上記のコマンドは、内部的に smc-command という実装を使っているようだ。 これは Mac の SMC に対して特定のキー・バリューを書き込むもの。

github.com

なお、SMC に書き込みをするとファンの回転数の制御などもできる。 そのため、上記のリポジトリには「使い方によってはマシンに不可逆なダメージを与えうるので何があっても自己責任だよ」という注意書きがある。

まとめ

今回は battery というアプリケーションを使って Apple Silicon な MacBook のバッテリー充電に上限を設定する方法を書いた。

Python: tarfile で tar ファイルを圧縮・展開する

Python の標準ライブラリには tarfile というモジュールがある。 このモジュールを使うと tar 形式で複数のファイルをまとめることができる。 また tarfile モジュールは gzip や bzip2 といった形式の圧縮・展開もサポートしている。 今回は、そんな tarfile モジュールで、利用する場面の多い tar 形式でまとめて gzip 形式で圧縮したファイル (tar.gz) を扱ってみる。

使った環境は次のとおり。

$ sw_vers
ProductName:        macOS
ProductVersion:     13.5
BuildVersion:       22G74
$ python -V
Python 3.10.12
$ tar --version
bsdtar 3.5.3 - libarchive 3.5.3 zlib/1.2.11 liblzma/5.0.5 bz2lib/1.0.8

もくじ

tar ファイルを展開する

まずは extraction.txt というテキストファイルが 1 つ入った tar.gz ファイルを用意する。 ファイルは tar(1) コマンドを使って作成して、名前は extraction.tar.gz にした。

$ echo "Hello, World" > extraction.txt
$ tar czvf extraction.tar.gz extraction.txt 
a extraction.txt
$ file extraction.tar.gz 
extraction.tar.gz: gzip compressed data, last modified: Tue Aug 20 03:03:04 2023, from Unix, original size modulo 2^32 2048

上記のファイルを Python の tarfile モジュールで展開してみる。 以下のサンプルコードでは、アーカイブに含まれるファイルのバイト列を読み取ってファイル名と共に出力している。 tarfile.open() 関数は、モードに "r" を指定すると自動的に圧縮アルゴリズムを読み取って展開の処理をしてくれる。

"""任意の圧縮方式を使った tar ファイルを展開するサンプルコード"""

import tarfile


def main():
    filepath = "extraction.tar.gz"
    with tarfile.open(filepath, mode="r") as tar:
        # アーカイブに含まれるファイルの情報を一覧で取得する
        members: list[tarfile.TarInfo] = tar.getmembers()
        for tar_info in members:
            # ファイル名または TarInfo を指定してファイルオブジェクトにアクセスできる
            with tar.extractfile(tar_info) as file_fp:
                # ファイルの内容を出力する
                print(f"{tar_info.name}: {file_fp.read()}")


if __name__ == "__main__":
    main()

上記を実行してみよう。

$ python extraction.py
extraction.txt: b'Hello, World\n'

ちゃんと含まれている extraction.txt に "Hello, World" という文字列に解釈できるバイト列が含まれることが確認できた。

tar ファイルを作成する

続いては tar ファイルを作成するサンプルコードを示す。 tarfile モジュールの API は、すでにファイルシステムに存在するファイルを扱うものが多い。 しかし、オンメモリのデータをアーカイブにすることも、もちろんできる。 以下では io.BytesIO を使ってファイルライクオブジェクトを作成し、それをアーカイブに含めている。 アーカイブは compression.tar.gz という名前で、アーカイブに含まれるファイル名は compression.txt にした。 なお、gzip 形式で圧縮する場合には tarfile.open() 関数の mode 引数に "w:gz" を指定する。

"""gzip 形式で圧縮した tar ファイルを作成するサンプルコード"""

import tarfile
import io


def main():
    # アーカイブに含めるファイルの内容を用意する
    file_buffer = io.BytesIO()
    file_buffer.write("Hello, World\n".encode("ascii"))
    file_buffer.seek(0)

    # "w:gz" モードでファイルを開くことで gzip で圧縮した tar ファイルになる
    filepath = "compression.tar.gz"
    with tarfile.open(filepath, mode="w:gz") as tar:
        # アーカイブに含めるファイルの名前を指定する
        tar_info = tarfile.TarInfo(name="compression.txt")
        # アーカイブに含めるファイルのサイズを指定する
        tar_info.size = len(file_buffer.getvalue())
        # アーカイブにファイルを追加する
        tar.addfile(tar_info, fileobj=file_buffer)


if __name__ == "__main__":
    main()

上記を実行してみよう。

$ python compression.py

すると compression.tar.gz という名前でアーカイブのファイルができる。

$ file compression.tar.gz 
compression.tar.gz: gzip compressed data, was "compression.tar", last modified: Tue Aug 20 10:50:49 2023, max compression, original size modulo 2^32 10240

tar(1) コマンドを使って展開してみよう。

$ tar zxvf compression.tar.gz              
x compression.txt

すると compression.txt というファイルができる。 内容を確認してみよう。

$ cat compression.txt        
Hello, World

ちゃんとサンプルコードで使った文字列が書き込まれている。

ちなみに、今回はアーカイブの直下にファイルを配置した。 もし、展開したときにファイルをディレクトリに入れたいときは TarInfo の引数 name にスラッシュを含めよう。

まとめ

今回は Python の tarfile モジュールを使って tar ファイルを圧縮・展開してみた。

参考

docs.python.org

GCP: Cloud Functions で Cloud Storage にオブジェクトを保存する

今回は Google Cloud の Cloud Functions で実行した処理の中で Cloud Storage にオブジェクトを保存する方法について。 Cloud Functions で実行した何らかの処理の成果物を保存する先として Cloud Storage を使うイメージになる。

操作は、基本的に Google Cloud SDK の gcloud コマンドから実施する。 なお、操作の対象となる Google Cloud API が有効化されていない場合には、別途有効化するかを確認する表示が出ることもある。

使った環境は次のとおり。

$ sw_vers              
ProductName:        macOS
ProductVersion:     13.5
BuildVersion:       22G74
$ gcloud version                            
Google Cloud SDK 442.0.0
bq 2.0.96
core 2023.08.04
gcloud-crc32c 1.0.0
gsutil 5.25

下準備

まずは Google Cloud SDK をインストールしておく。

$ brew install google-cloud-sdk

そして、利用したいアカウントにログインする。

$ gcloud auth login

必要に応じて、デフォルトで操作する対象のプロジェクトを指定する。

$ gcloud config set project <project-id>

Cloud Pub/Sub のトピックを作成する

Cloud Functions のトリガーとして Cloud Pub/Sub を使いたい。 そこで、まずはトピックを作成しておく。 ここでは example-pubsub-topic という名前にした。

$ gcloud pubsub topics create example-pubsub-topic

Cloud Storage のバケットを作成する

続いて Cloud Storage にオブジェクトを保存する先となるバケットを作成する。 バケットに付ける名前は、Cloud Storage のシステム全体でユニークな必要がある。

今回はユーザ名や日付などから、ユニークになりやすい組み合わせでサンプルとなるバケットの名前を作った。 別にこれでなくとも誰かが使っているバケットの名前と衝突しなければ何でも構わない。

$ UNIQUE_BUCKET_NAME=$(echo $(whoami)-example-bucket-$(date +%Y%m%d)) 
$ echo $UNIQUE_BUCKET_NAME  
amedama-example-bucket-20230806

バケットは gcloud storage buckets create コマンドで作成する。 基本的に Cloud Storage の操作をするときは、対象を URI で指定する。 その際、スキーマとして先頭に gs:// をつける必要がある。 また、--location オプションを指定しない場合はマルチリージョンで us にバケットが作成される。

$ gcloud storage buckets create \
    gs://${UNIQUE_BUCKET_NAME} \
    --location=asia

Cloud Functions を作成する

続いては Cloud Storage のバケットにオブジェクトを作成する Cloud Functions を作成する。

まずは必要なファイルを入れるディレクトリを用意する。 ここでは helloworld という名前で作成した。

$ mkdir helloworld

続いて Cloud Functions の本体となる処理を Python のモジュールとして作成する。 ポイントは google.cloud.storage パッケージを使って Cloud Storage にオブジェクトを作成しているところ。 バケットの名前は BUCKET_NAME という環境変数から取得するように作ってある。 なお、環境変数が指定されていない場合のエラーハンドリングは省略してある。

$ cat << 'EOF' > helloworld/main.py
import logging
import os
from datetime import datetime

from google.cloud import logging as gcloud_logging
from google.cloud import storage as gcloud_storage


gcloud_logging.Client().setup_logging()
LOG = logging.getLogger(__name__)


def main(event, context):
    gcs_client = gcloud_storage.Client()
    bucket_name = os.environ.get("BUCKET_NAME")
    bucket = gcs_client.bucket(bucket_name)

    now = datetime.now()
    file_name = now.strftime("%Y-%m-%d/%H:%M:%S.log")
    blob = bucket.blob(file_name)

    timestamp = now.strftime("%Y%m%d%H%M%S")
    file_data = f"executed: {timestamp}\n"
    blob.upload_from_string(file_data)

    LOG.info(
        "successfully saved to %s in %s",
        file_name,
        bucket_name
    )

EOF

オブジェクトには Cloud Functions が実行された時刻をタイムスタンプとして書き込んでいる。

動作に必要なパッケージは requirements.txt に記述する。

$ cat << 'EOF' > helloworld/requirements.txt
google-cloud-logging
google-cloud-storage
EOF

上記で次のようなファイル構成が作られる。

$ tree helloworld 
helloworld
├── main.py
└── requirements.txt

1 directory, 2 files

Cloud Functions をデプロイする

先ほどのディレクトリを元に Cloud Functions をデプロイするには gcloud functions deploy コマンドを使う。 このとき --set-env-vars オプションを使って環境変数を設定できる。 バケットの名前として BUCKET_NAME を忘れずに指定しよう。 また --trigger-topic を指定することで、指定した Cloud Pub/Sub のトピックにメッセージが到着した際に処理がトリガーされる。

$ gcloud functions deploy helloworld \
  --gen2 \
  --no-allow-unauthenticated \
  --runtime python310 \
  --memory 128Mi \
  --region asia-northeast1 \
  --trigger-topic example-pubsub-topic \
  --source helloworld \
  --entry-point main \
  --set-env-vars BUCKET_NAME=${UNIQUE_BUCKET_NAME}

うまくいけば gcloud functions list にデプロイした Cloud Functions が表示される。

$ gcloud functions list
NAME        STATE   TRIGGER                      REGION      ENVIRONMENT
helloworld  ACTIVE  topic: example-pubsub-topic  asia-northeast1  2nd gen

また gcloud functions logs read コマンドでエラーなどが出ていないかも確認しておこう。

$ gcloud functions logs read helloworld --region asia-northeast1
LEVEL  NAME        TIME_UTC                 LOG
I      helloworld  2023-08-06 14:56:35.116  Default STARTUP TCP probe succeeded after 1 attempt for container "helloworld-1" on port 8080.

正常にデプロイできたら Cloud Pub/Sub にメッセージを送信して Cloud Functions を起動しよう。

$ gcloud pubsub topics publish projects/$(gcloud config get-value project)/topics/example-pubsub-topic --message "-"

少ししたら、また gcloud functions logs read コマンドを実行してログを確認しよう。 うまくいけば次のようなログが出力される。

$ gcloud functions logs read helloworld --region asia-northeast1                                                         
LEVEL  NAME        TIME_UTC                 LOG
I      helloworld  2023-08-06 14:59:00.192  successfully saved to 2023-08-06/14:58:57.log in amedama-example-bucket-20230806
I      helloworld  2023-08-06 14:59:00.191
I      helloworld  2023-08-06 14:58:57.341
I      helloworld  2023-08-06 14:56:35.116  Default STARTUP TCP probe succeeded after 1 attempt for container "helloworld-1" on port 8080.

Cloud Storage の内容を確認する

Cloud Functions から作成された Cloud Storage のオブジェクトを確認しよう。

gcloud storage ls -r コマンドを使うことでバケットの中身を再帰的に表示できる。

$ gcloud storage ls -r gs://${UNIQUE_BUCKET_NAME}
gs://amedama-example-bucket-20230806/:

gs://amedama-example-bucket-20230806/2023-08-06/:
gs://amedama-example-bucket-20230806/2023-08-06/14:58:57.log

オブジェクトの内容は gcloud storage cat コマンドで確認できる。

$ gcloud storage cat gs://${UNIQUE_BUCKET_NAME}/2023-08-06/14:58:57.log 
executed: 20230806145857

どうやら意図した通りに作成されているようだ。

後片付けする

動作の確認が終わったら後片付けをしよう。

まずは Cloud Storage のオブジェクトとバケットを削除する。 gcloud storage rm -r を使うとバケットの中身とバケット自体が再帰的に削除できる。

$ gcloud storage rm -r gs://${UNIQUE_BUCKET_NAME}

続いて gcloud functions delete コマンドを使って Cloud Functions の関数を削除する。

$ gcloud functions delete helloworld --region asia-northeast1

そして gcloud pubsub topics delete コマンドを使って Cloud Pub/Sub のトピックを削除する。

$ gcloud pubsub topics delete example-pubsub-topic

また、Cloud Functions ではデプロイしたファイル群などを以下のような名前で Cloud Storage に保存している。

gcf-v2-sources-<project-id>-<region>
gcf-v2-uploads-<project-id>-<region>

これらも気になる場合には先ほどと同じ要領で削除しておこう。

$ gcloud storage buckets list | grep name
$ gcloud storage rm -r gs://<bucket-name> 

まとめ

今回は Cloud Functions を使って Cloud Storage にオブジェクトを保存する方法を確認した。

Python: lhafile で LZH フォーマットの圧縮ファイルを展開する

一昔前の日本では、ファイルの圧縮に LZH フォーマットがよく使われていた。 今ではほとんど使われることが無くなったとはいえ、しぶとく生き残っているシステムもある。 今回は、そうしたシステムからダウンロードしたファイルを Python の lhafile で展開する方法について書く。

使った環境は次のとおり。

$ sw_vers
ProductName:        macOS
ProductVersion:     13.5
BuildVersion:       22G74
$ uname -srm
Darwin 22.6.0 arm64
$ lha --version
LHa for UNIX version 1.14i-ac20050924p1 (arm-apple-darwin22.1.0)
$ python -V
Python 3.10.12
$ pip list | grep lhafile
lhafile         0.3.0

もくじ

下準備

LZH フォーマットの圧縮ファイルを用意するために lha をインストールしておく。

$ brew install lha

そして、本題となる lhafile もインストールしておく。

$ pip install lhafile

圧縮ファイルを用意する

まずは展開のサンプルに使う圧縮ファイルを用意したい。

まずは greet.txt という名前でテキストファイルを作成する。

$ echo "Hello, World" > greet.txt

上記のファイルを lha(1) で圧縮する。

$ lha a greet.lzh greet.txt 
greet.txt   - Frozen(100%)o

これで greet.lzh という名前で圧縮ファイルができた。

$ file greet.lzh                       
greet.lzh:   LHarc 1.x/ARX archive data  [lh0], 'U' OS

lhafile で展開する

ここからは Python の REPL を使って動作を見ていく。

$ python

まずは lhafile モジュールから LhaFile クラスをインポートする。

>>> from lhafile import LhaFile

先ほどのファイル名を指定してクラスをインスタンス化しよう。

>>> lha_file = LhaFile("greet.lzh")

圧縮ファイルに含まれるファイル名のリストは LhaFile#namelist() メソッドで得られる。

>>> lha_file.namelist()
['greet.txt']

圧縮ファイルに含まれるファイルの内容を展開したいときは LhaFile#read() メソッドを使う。 先ほど得られたファイル名を指定することで、展開したファイルのバイト列が得られる。

>>> lha_file.read("greet.txt")
b'Hello, World\n'

バイト列さえ得られれば、あとは好きに処理すれば良い。 たとえば、またファイルとして書き出してみよう。

>>> with open("/tmp/greet.txt", mode="wb") as fp:
...     fp.write(lha_file.read("greet.txt"))
... 
13

書き出したファイルの中身を確認すると、ちゃんと圧縮前の内容が得られる。

$ cat /tmp/greet.txt      
Hello, World

いじょう。

参考

github.com

GCP: Cloud Functions を Cloud Scheduler から定期実行する

何らかの処理を定期的に実行したくなる場面は多い。 トラディショナルなやり方であれば、仮想マシンを用意して cron などで処理を呼び出すと思う。 もちろん、それでも良いんだけど、よりシンプルに実装したい気持ちが出てくる。 具体的にはマシンの管理をなくした、いわゆるサーバレス・コンピューティングで楽がしたくなる。

Google Cloud であれば、このようなニーズに対して以下のサービスを組み合わせるのが良いようだ。

  • Cloud Functions

    • サーバレスで特定の処理 (関数) を呼び出すためのサービス
  • Cloud Scheduler

    • フルマネージドな cron ジョブを提供するサービス
  • Cloud Pub/Sub

    • 非同期のスケーラブルなメッセージングを提供するサービス

利用の流れは次のとおり。 まず、Cloud Functions で定期的に実行したい何らかの処理を定義する。 その際、Cloud Pub/Sub にメッセージが到達したタイミングで処理が実行されるように設定する。 そして、Cloud Scheduler から特定のタイミングで Cloud Pub/Sub にメッセージを送ることになる。

今回は、サービスを組み合わせて 1 分ごとに Cloud Functions を実行させてみよう。 操作は、基本的に Google Cloud SDK の gcloud コマンドから実施する。 なお、操作の対象となる Google Cloud API が有効化されていない場合には、別途有効化するかを確認する表示が出ることもある。

使った環境は次のとおり。

$ sw_vers              
ProductName:        macOS
ProductVersion:     13.5
BuildVersion:       22G74
$ gcloud version             
Google Cloud SDK 441.0.0
bq 2.0.95
core 2023.07.28
gcloud-crc32c 1.0.0
gsutil 5.25

もくじ

下準備

まずは下準備として Google Cloud SDK をインストールしておく。

$ brew install google-cloud-sdk  

gcloud コマンドが使えるようになるのでログインする。

$ gcloud auth login

操作するプロジェクトを指定する。 なお、プロジェクトはあらかじめ作成しておく。

$ gcloud config set project <project-name>

Cloud Pub/Sub のトピックを作成する

まずは Cloud Scheduler と Cloud Functions の間をつなぐ Cloud Pub/Sub のトピックを作成する。 名前は分かりやすければ何でも構わない。 今回は example-pubsub-topic という名前にした。

$ gcloud pubsub topics create example-pubsub-topic

Cloud Functions を Python で作成する

続いて Cloud Functions を作成する。 プログラミング言語として今回は Python を利用する。

まずは必要なファイル一式を収めるディレクトリを作成する。 ここでは helloworld という名前にした。

$ mkdir helloworld

続いて肝心の実行される処理を定義する。 以下では main.py というモジュール名で作成している。 処理の本体は main() 関数で、この関数がイベントハンドラとして呼び出される。 ただし、中身の処理はログを 1 行出力しているだけ。 なお、ログの出力先を Cloud Logging にするために google-cloud-logging パッケージを利用している。

$ cat << 'EOF' > helloworld/main.py
import logging

from google.cloud import logging as gcloud_logging


# Cloud Logging にログを出力できるようセットアップする
gcloud_logging.Client().setup_logging()
LOG = logging.getLogger(__name__)


def main(event, context):
    # Cloud Functions のイベントハンドラでログを出力する
    LOG.info("Hello, World!")

EOF

処理が定義できたら必要なパッケージを requirements.txt で定義する。 ここでは、先ほどの処理の中で利用していた google-cloud-logging をインストールしている。

$ cat << 'EOF' > helloworld/requirements.txt
google-cloud-logging
EOF

ここまでで、ディレクトリの構成は次のようになっている。

$ tree helloworld 
helloworld
├── main.py
└── requirements.txt

1 directory, 2 files

これで Cloud Functions に必要な準備が整った。

Cloud Functions をデプロイする

続いて Cloud Functions をデプロイする。 デプロイには gcloud functions deploy コマンドを使う。

以下では helloworld という名前で Cloud Functions をデプロイしている。

$ gcloud functions deploy helloworld \
  --gen2 \
  --no-allow-unauthenticated \
  --runtime python310 \
  --memory 128Mi \
  --region asia-east1 \
  --trigger-topic example-pubsub-topic \
  --source helloworld \
  --entry-point main

オプションについては次のような意味になる。

  • --gen2

    • 現在 (2023-08) の Cloud Functions には第 1 世代と第 2 世代があり、後者を利用するために指定している
  • --no-allow-unauthenticated

    • 任意のユーザが呼び出しできないように指定している
  • --runtime python310

    • Python 3.10 / Ubuntu 22.04 LTS の環境で実行されるように指定している
  • --memory 128Mi

    • ランタイムが利用できるメモリのサイズを指定している
  • --region asia-east1

    • デプロイ先のリージョンを指定している
  • --trigger-topic example-pubsub-topic

    • メッセージが到着した際に実行されるトピックを指定している
  • --source helloworld

    • デプロイするディレクトリを指定している
  • --entry-point main

    • イベントハンドラの関数名を指定している

デプロイが成功すると gcloud functions list コマンドで確認できる。 もしエラーになったときはログなどから原因を調査する。

$ gcloud functions list
NAME        STATE   TRIGGER                      REGION      ENVIRONMENT
helloworld  ACTIVE  topic: example-pubsub-topic  asia-east1  2nd gen

デプロイされた時点で Cloud Functions のログが gcloud functions logs read コマンドで確認できる。

$ gcloud functions logs read helloworld --region asia-east1
LEVEL    NAME        TIME_UTC                 LOG
I        helloworld  2023-08-06 15:48:33.154  Default STARTUP TCP probe succeeded after 1 attempt for container "helloworld-1" on port 8080.

デプロイできたら、手動で Cloud Pub/Sub にメッセージを送って Cloud Functions を実行してみる。

$ gcloud pubsub topics publish projects/$(gcloud config get-value project)/topics/example-pubsub-topic --message "-"

うまくいけばメッセージの到着によって Cloud Functions が実行される。 次のようにログが出力されることを確認しよう。 なお、空白のログは Cloud Functions が起動されたことを示しているらしい。 空白のログが 2 回出るのはデプロイした後の初回の起動時だけのようだ。

$ gcloud functions logs read helloworld --region asia-east1
LEVEL    NAME        TIME_UTC                 LOG
I        helloworld  2023-08-06 15:50:14.490  Hello, World!
I        helloworld  2023-08-06 15:50:14.395
I        helloworld  2023-08-06 15:50:14.260
I        helloworld  2023-08-06 15:48:33.154  Default STARTUP TCP probe succeeded after 1 attempt for container "helloworld-1" on port 8080.

これで Cloud Functions が Cloud Pub/Sub のメッセージが到着した際に想定通り実行されることが確認できた。

Cloud Scheduler のジョブを作成する

最後に Cloud Scheduler を設定する。 まず、現在のジョブを gcloud scheduler jobs list で確認する。 ここでは何も設定されていない。

$ gcloud scheduler jobs list --location=asia-east1
Listed 0 items.

続いて Cloud Scheduler のジョブを作成する。 ジョブのタイプとして pubsub を指定する。 ここではジョブの名前に helloworld を指定している。

$ gcloud scheduler jobs create pubsub helloworld \
    --location asia-east1 \
    --schedule "* * * * *" \
    --topic "projects/$(gcloud config get-value project)/topics/example-pubsub-topic" \
    --message-body "-"

指定しているオプションの意味は次のとおり。

  • --location asia-east1

    • ジョブを作成するリージョンを指定している
  • --schedule "* * * * *"

    • UNIX cron の書式でスケジュールが実行されるタイミングを指定している
  • --topic "projects/$(gcloud config get-value project)/topics/example-pubsub-topic"

    • メッセージを送信する Cloud Pub/Sub のトピックを指定している
  • --message-body "-"

    • メッセージの内容を指定している (今回、中身は使っていないためダミー)

ちなみに UNIX cron の書式は以下のようなサービスで確認すると効率が良い。

crontab.guru

うまくいけば次のようにジョブが作成される。 これで 1 分ごとに Cloud Pub/Sub にメッセージが送信される。

$ gcloud scheduler jobs list --location=asia-east1
ID          LOCATION    SCHEDULE (TZ)        TARGET_TYPE  STATE
helloworld  asia-east1  * * * * * (Etc/UTC)  Pub/Sub      ENABLED

少し待って Cloud Functions のログを確認してみよう。 次のように 1 分ごとに処理が実行されてログが出力されていれば上手くいっている。

$ gcloud functions logs read helloworld --region asia-east1 | head -n 6 
LEVEL    NAME        TIME_UTC                 LOG
I        helloworld  2023-08-06 15:58:03.741  Hello, World!
I        helloworld  2023-08-06 15:58:03.718
I        helloworld  2023-08-06 15:57:04.491  Hello, World!
I        helloworld  2023-08-06 15:57:04.427
I        helloworld  2023-08-06 15:50:14.490  Hello, World!

後片付け

動作の確認が終わったら後片付けしよう。

まずは Cloud Scheduler のジョブを削除する。

$ gcloud scheduler jobs delete helloworld --location=asia-east1

続いて Cloud Functions の定義を削除する。

$ gcloud functions delete helloworld --region asia-east1

そして Cloud Pub/Sub のトピックを削除する。

$ gcloud pubsub topics delete example-pubsub-topic

また、Cloud Functions のファイル群は Cloud Storage にアップロードされる。 今回の構成であれば、次のような名前のバケットが作成されているはず。

  • gcf-v2-sources-<project-id>-<region>
  • gcf-v2-uploads-<project-id>-<region>

そこで、まずは次のようにバケットを確認する。

$ gcloud storage buckets list | grep name

バケットが確認できたら、次のようにして削除する。

$ gcloud storage rm -r gs://<bucket-name> 

いじょう。

まとめ

今回は Cloud Functions を Cloud Scheduler から定期実行する方法を試してみた。 また、一連の操作は基本的に Google Cloud SDK の CLI で実施した。

Python: TrueSkill が収束する様子を眺めてみる

TrueSkill は 2 人以上のプレイヤーまたはチームが対戦して勝敗を決める競技において、プレイヤーの実力を数値にする手法のひとつ。 TrueSkill は Microsoft が開発して特許や商標を保持している。 そのため、アルゴリズムを商用で利用するためには同社からライセンスを受ける必要がある。 なんでも Xbox のゲームでプレイヤーの実力を数値化して、適正なマッチングをするために使われているらしい。 同種のレーティングアルゴリズムとして有名なイロレーティング (Elo Rating) に比べると、次のようなメリットがある。

  • 1 vs 1 以外の競技にも使える
  • レーティングの収束が早い
  • レーティングの不確実性が得られる

なお、今回のエントリは以下のレーティングアルゴリズムを TrueSkill にしたバージョンとなっている。

blog.amedama.jp

使った環境は次のとおり。

$ sw_vers
ProductName:        macOS
ProductVersion:     13.5
BuildVersion:       22G74
$ python -V             
Python 3.10.12
$ pip list | egrep -i "(matplotlib|trueskill|numpy)"
matplotlib      3.7.2
numpy           1.25.1
trueskill       0.4.5

もくじ

下準備

$ pip install trueskill matplotlib

サンプルコード

これから示すサンプルコードは 2 つのフェーズに分かれている。

  1. 理論上のイロレーティングを与えたプレイヤーからモンテカルロ法で擬似的な対戦データを作成するフェーズ
  2. 作成した擬似的な対戦データから TrueSkill のレーティングの推移を計算して収束する様子を観察するフェーズ

まず 1. については、あらかじめ特定の値でイロレーティングを指定したプレイヤー同士をランダムに対戦させて、その勝率が疑似乱数を上回るかどうかで作成する。 この点は、イロレーティングが理論上の対戦勝率を計算できる点を利用している。

そして 2. については、1. で作成した疑似データを使って、各プレイヤーが初期値の状態から TrueSkill のレーティングを更新していく。 このとき、疑似データが十分にあれば、理論上のレーティングへと収束していくはずである。

疑似データは Alice, Bob, Charlie, David, Eve, Frank という名前をつけた 6 人のプレイヤーから生成する。 6 人のプレイヤーは各ラウンドでランダムにシャッフルされて 2 人ずつ対戦する。 対戦した際の勝者は、前述したとおり事前に規定したイロレーティングと疑似乱数にもとづいて計算される。

以下にサンプルコードを示す。 各プレイヤーは 200 刻みで理論上のイロレーティングを付与している。 そして、500 回のラウンドを実施する。 各ラウンドは 3 回の対戦が含まれるため、全体で 1,500 回の対戦が生じる。 TrueSkill の計算については、Python の trueskill パッケージを利用した。

import logging
import random

import numpy as np
from matplotlib import pyplot as plt
from trueskill import Rating
from trueskill import rate_1vs1


LOG = logging.getLogger(__name__)


class Player:
    """プレイヤーを表現したクラス"""

    def __init__(self, rating=1500):
        self.rating = rating

    def win_proba(self, other_player: "Player") -> float:
        """他のプレイヤーに勝利する確率を計算するメソッド"""
        return 1. / (10. ** ((other_player.rating - self.rating) / 400.) + 1.)


def simulated_match(player_a: Player, player_b: Player) -> bool:
    """モンテカルロ法でプレイヤー同士を対決させる

    :returns: シミュレーションで player_a が勝利したかを表す真偽値
    """
    player_a_win_ratio = player_a.win_proba(player_b)
    return random.random() < player_a_win_ratio


def main():
    # ログレベルを設定する
    logging.basicConfig(level=logging.INFO)

    # 乱数シードを固定する
    random.seed(42)

    # プレイヤーの理論上のレーティング
    ideal_players = {
        "Alice": Player(2000),
        "Bob": Player(1800),
        "Charlie": Player(1600),
        "David": Player(1400),
        "Eve": Player(1200),
        "Frank": Player(1000),
    }

    # モンテカルロ法で対戦履歴を生成する
    match_results = []
    for _ in range(500):
        # プレイヤーをシャッフルする
        shuffled_player_names = random.sample(
            list(ideal_players.keys()),
            len(ideal_players),
        )
        while len(shuffled_player_names) > 0:
            # シャッフルした結果から 1 vs 1 を取り出していく
            player_a_name, player_b_name = shuffled_player_names.pop(), shuffled_player_names.pop()
            player_a, player_b = ideal_players[player_a_name], ideal_players[player_b_name]
            # レーティングとランダムな値を元に対戦結果を求める
            win_player_name = player_a_name if simulated_match(player_a, player_b) else player_b_name
            match_results.append((player_a_name, player_b_name, win_player_name))
            # 対戦成績をログに出力する
            LOG.info(
                "match %s vs %s, winner: %s",
                player_a_name,
                player_b_name,
                win_player_name,
            )

    # プレイヤーのレーティング履歴
    simulated_players = {
        "Alice": [Rating()],
        "Bob": [Rating()],
        "Charlie": [Rating()],
        "David": [Rating()],
        "Eve": [Rating()],
        "Frank": [Rating()],
    }

    # 対戦成績を 1 件ずつ処理する
    for player_a_name, player_b_name, win_player_name in match_results:
        # 勝利プレイヤーと敗北プレイヤーの名前を取り出す
        winner_name = win_player_name
        loser_name = player_b_name if win_player_name == player_a_name else player_a_name

        # 履歴で最後 (最新) のレーティングを取り出す
        winner = simulated_players[winner_name][-1]
        loser = simulated_players[loser_name][-1]

        # 対戦成績を元にレーティングを更新する
        updated_winner, updated_loser = rate_1vs1(winner, loser)

        # 履歴の末尾に追加する
        simulated_players[winner_name].append(updated_winner)
        simulated_players[loser_name].append(updated_loser)

        # レーティングの更新をログに出力する
        LOG.info(
            "winner %s %.3f (%.2f) -> %.3f (%.2f), loser %s %.3f (%.2f) -> %.3f (%.2f)",
            winner_name,
            winner.mu,
            updated_winner.sigma,
            updated_winner.mu,
            updated_winner.sigma,
            loser_name,
            loser.mu,
            loser.sigma,
            updated_loser.mu,
            updated_loser.sigma,
        )

    # 最終的なレーティングをログに出力する
    for player_name, player_history in simulated_players.items():
        LOG.info(
            "player %s %.3f (%.2f)",
            player_name,
            player_history[-1].mu,
            player_history[-1].sigma,
        )

    # レーティングの推移を折れ線グラフで可視化する
    fig, ax = plt.subplots(1, 1)
    for player_name, player_history in simulated_players.items():
        # ミュー (推定されたレーティング) を折れ線グラフで図示する
        player_mu_history = np.array([player.mu for player in player_history])
        ax.plot(
            player_mu_history,
            label=player_name,
        )
        # シグマ (不確実性) を半透明の領域で折れ線の上下に図示する
        player_sigma_history = np.array([player.sigma for player in player_history])
        ax.fill_between(
            range(1, len(player_history) + 1),
            player_mu_history - player_sigma_history,
            player_mu_history + player_sigma_history,
            alpha=0.3,
        )
    ax.set_title(f"TrueSkill Rating History")
    ax.set_xlabel("Rounds")
    ax.set_ylabel("Rating")
    ax.grid()
    ax.legend()
    plt.show()


if __name__ == "__main__":
    main()

上記を実行しよう。 すると、最初に擬似的な対戦成績がログとして出力される。 その次に TrueSkill のレーティングが更新される様子がログとして出力される。 最後に、最終的なレーティングを出力している。

$ python ts.py 
INFO:__main__:match David vs Bob, winner: Bob
INFO:__main__:match Charlie vs Eve, winner: Charlie
INFO:__main__:match Alice vs Frank, winner: Alice
...(省略)...
INFO:__main__:winner Bob 31.327 (0.95) -> 31.351 (0.95), loser David 22.064 (0.89) -> 22.043 (0.89)
INFO:__main__:winner Alice 36.249 (1.08) -> 36.279 (1.08), loser Charlie 26.857 (0.90) -> 26.835 (0.90)
INFO:__main__:winner Eve 18.549 (0.94) -> 18.623 (0.94), loser Frank 14.695 (1.03) -> 14.606 (1.03)
INFO:__main__:player Alice 36.279 (1.08)
INFO:__main__:player Bob 31.351 (0.95)
INFO:__main__:player Charlie 26.835 (0.90)
INFO:__main__:player David 22.043 (0.89)
INFO:__main__:player Eve 18.623 (0.94)
INFO:__main__:player Frank 14.606 (1.03)

上記でカッコ内の数値はレーティングの不確実さを表している。 言いかえると、値が小さいほど、そのレーティングの数値を信頼できる。

また、実行が完了すると次のような折れ線グラフが得られる。 これは、各プレイヤーのレーティングの推移を示している。 網掛けの部分はレーティングの不確実性を表している。

TrueSkill の収束する様子

上記から 100 ラウンドほどで収束している様子が確認できる。 これは、先のエントリで確認したイロレーティングが収束するラウンドよりも少ない。

なお、イロレーティングと TrueSkill ではデフォルトで使用される平均的なプレイヤーの数値が異なる。 一般にイロレーティングでは 1500 を使う一方で、今回利用した TrueSkill の実装では 25 になっている。 そのため擬似的な対戦データを生成するのに使った理論上のイロレーティングの数値と TrueSkill の数値が大きく異なっている。

両者を比較しやすいようにイロレーティングの数値を TrueSkill の数値に換算してみよう。 まず、今回はプレイヤーのイロレーティングの理論値を次のようにした。

>>> import numpy as np
>>> elo_ratings = np.array([2000, 1800, 1600, 1400, 1200, 1000])

レーティングの数値から平均を引いて標準偏差で割ることで標準化する。 これでレーティングは平均が 0 で標準偏差が 1 になる。

>>> normalized_ratings = (elo_ratings - elo_ratings.mean()) / elo_ratings.std()
>>> normalized_ratings
array([ 1.46385011,  0.87831007,  0.29277002, -0.29277002, -0.87831007,
       -1.46385011])

次に標準化した数値に TrueSkill で使われている標準偏差をかけて平均を足す。 これで先ほどのイロレーティングの数値が TrueSkill の数値に換算できるはず。

>>> trueskill_ratings = normalized_ratings * 8.333333333333334 + 25
>>> trueskill_ratings
array([37.19875091, 32.31925055, 27.43975018, 22.56024982, 17.68074945,
       12.80124909])

先ほど実行した結果で、最終的に収束したレーティングの数値と比べると近いことが確認できる。

めでたしめでたし。

Python: イロレーティングが収束する様子を眺めてみる

イロレーティング (Elo Rating) は 2 人のプレイヤーが対戦して勝敗を決める競技において、プレイヤーの実力を数値にする手法のひとつ 1。 歴史のある古典的な手法だけど、現在でも様々な競技のレーティングに用いられている。 今回は、そんなイロレーティングをモンテカルロ法で作成した擬似的な対戦データを元に計算してみる。

使った環境は次のとおり。

$ sw_vers
ProductName:        macOS
ProductVersion:     13.5
BuildVersion:       22G74
$ python -V             
Python 3.10.12
$ pip list | grep -i matplotlib
matplotlib      3.7.2

もくじ

下準備

レーティングの推移を可視化するために、あらかじめ Matplotlib をインストールしておく。

$ pip install matplotlib

サンプルコード

これから示すサンプルコードは 2 つのフェーズに分かれている。

  1. 理論上のイロレーティングを与えたプレイヤーからモンテカルロ法で擬似的な対戦データを作成するフェーズ
  2. 作成した擬似的な対戦データからイロレーティングの推移を計算して収束する様子を観察するフェーズ

まず、1. についてはイロレーティングが理論上の対戦勝率を計算できる点を利用している。 あらかじめ特定の値でイロレーティングを指定したプレイヤー同士をランダムに対戦させて、その勝率が疑似乱数を上回るかどうかで作成する。

そして 2. については、1. で作成した疑似データを使って、各プレイヤーが初期値の状態からイロレーティングの数値を更新していく。 このとき、疑似データが十分にあれば、理論上のイロレーティングに収束していくはずである。

なお 1. の疑似データは Alice, Bob, Charlie, David, Eve, Frank という名前をつけた 6 人のプレイヤーから生成する。 6 人のプレイヤーは各ラウンドでランダムにシャッフルされて 2 人ずつ対戦する。 対戦した際の勝者は、前述したとおり事前に規定したイロレーティングと疑似乱数にもとづいて計算される。

以下にサンプルコードを示す。 各プレイヤーは 200 刻みで理論上のイロレーティングを付与している。 そして、500 回のラウンドを実施する。 各ラウンドは 3 回の対戦が含まれるため、全体で 1,500 回の対戦が生じる。

import logging
import random

from matplotlib import pyplot as plt


LOG = logging.getLogger(__name__)

# レーティングが更新される大きさを表す K ファクター
K_FACTOR = 32


class Player:
    """プレイヤーを表現したクラス"""

    def __init__(self, rating=1500):
        self.rating = rating

    def win_proba(self, other_player: "Player") -> float:
        """他のプレイヤーに勝利する確率を計算するメソッド"""
        return 1. / (10. ** ((other_player.rating - self.rating) / 400.) + 1.)


def simulated_match(player_a: Player, player_b: Player) -> bool:
    """モンテカルロ法でプレイヤー同士を対決させる

    :returns: シミュレーションで player_a が勝利したかを表す真偽値
    """
    player_a_win_ratio = player_a.win_proba(player_b)
    return random.random() < player_a_win_ratio


def update_rating(winner: Player, loser: Player) -> tuple[Player, Player]:
    """対戦成績を元にレーティングを更新する"""
    new_winner_rating = winner.rating + K_FACTOR * loser.win_proba(winner)
    new_loser_rating = loser.rating - K_FACTOR * loser.win_proba(winner)
    return Player(new_winner_rating), Player(new_loser_rating)


def main():
    # ログレベルを設定する
    logging.basicConfig(level=logging.INFO)

    # 乱数シードを固定する
    random.seed(42)

    # プレイヤーの理論上のレーティング
    ideal_players = {
        "Alice": Player(2000),
        "Bob": Player(1800),
        "Charlie": Player(1600),
        "David": Player(1400),
        "Eve": Player(1200),
        "Frank": Player(1000),
    }

    # モンテカルロ法で対戦履歴を生成する
    match_results = []
    for _ in range(500):
        # プレイヤーをシャッフルする
        shuffled_player_names = random.sample(
            list(ideal_players.keys()),
            len(ideal_players),
        )
        while len(shuffled_player_names) > 0:
            # シャッフルした結果から 1 vs 1 を取り出していく
            player_a_name, player_b_name = shuffled_player_names.pop(), shuffled_player_names.pop()
            player_a, player_b = ideal_players[player_a_name], ideal_players[player_b_name]
            # レーティングとランダムな値を元に対戦結果を求める
            win_player_name = player_a_name if simulated_match(player_a, player_b) else player_b_name
            match_results.append((player_a_name, player_b_name, win_player_name))
            # 対戦成績をログに出力する
            LOG.info(
                "match %s vs %s, winner: %s",
                player_a_name,
                player_b_name,
                win_player_name,
            )

    # プレイヤーのレーティング履歴
    simulated_players = {
        "Alice": [Player()],
        "Bob": [Player()],
        "Charlie": [Player()],
        "David": [Player()],
        "Eve": [Player()],
        "Frank": [Player()],
    }

    # 対戦成績を 1 件ずつ処理する
    for player_a_name, player_b_name, win_player_name in match_results:
        # 勝利プレイヤーと敗北プレイヤーの名前を取り出す
        winner_name = win_player_name
        loser_name = player_b_name if win_player_name == player_a_name else player_a_name

        # 履歴で最後 (最新) のレーティングを取り出す
        winner = simulated_players[winner_name][-1]
        loser = simulated_players[loser_name][-1]

        # 対戦成績を元にレーティングを更新する
        updated_winner, updated_loser = update_rating(winner, loser)

        # 履歴の末尾に追加する
        simulated_players[winner_name].append(updated_winner)
        simulated_players[loser_name].append(updated_loser)

        # レーティングの更新をログに出力する
        LOG.info(
            "winner %s %d -> %d (%d), loser %s %d -> %d (%d)",
            winner_name,
            winner.rating,
            updated_winner.rating,
            updated_winner.rating - winner.rating,
            loser_name,
            loser.rating,
            updated_loser.rating,
            updated_loser.rating - loser.rating,
        )

    # 最終的なレーティングをログに出力する
    for player_name, player_history in simulated_players.items():
        LOG.info("player %s %d", player_name, player_history[-1].rating)

    # レーティングの推移を折れ線グラフで可視化する
    fig, ax = plt.subplots(1, 1)
    for player_name, player_history in simulated_players.items():
        player_rating_history = [player.rating for player in player_history]
        ax.plot(player_rating_history, label=player_name)
    ax.set_title(f"Elo Rating History (K={K_FACTOR})")
    ax.set_xlabel("Rounds")
    ax.set_ylabel("Rating")
    ax.grid()
    ax.legend()
    plt.show()


if __name__ == "__main__":
    main()

上記を実行してみよう。 最初に出力されるログがモンテカルロ法で作成した擬似的な対戦データで、次に出力されるのが対戦成績にもとづいてイロレーティングが更新される様子になる。

$ python elo.py         
INFO:__main__:match David vs Bob, winner: Bob
INFO:__main__:match Charlie vs Eve, winner: Charlie
INFO:__main__:match Alice vs Frank, winner: Alice
...(省略)...
INFO:__main__:winner Bob 1808 -> 1811 (2), loser David 1357 -> 1355 (-2)
INFO:__main__:winner Alice 2062 -> 2064 (1), loser Charlie 1584 -> 1583 (-1)
INFO:__main__:winner Eve 1170 -> 1179 (9), loser Frank 1015 -> 1005 (-9)
INFO:__main__:player Alice 2064
INFO:__main__:player Bob 1811
INFO:__main__:player Charlie 1583
INFO:__main__:player David 1355
INFO:__main__:player Eve 1179
INFO:__main__:player Frank 1005

また、実行すると次のような折れ線グラフが得られる。

イロレーティングの収束する様子

上記から、理論上のレーティングに収束するまで大体 200 ラウンドほど要していることが確認できる。

収束を早くしたい場合には、レーティングを計算する際の定数 K (サンプルコードの K_FACTOR) を大きくすれば良い。 しかし、大きくすると今度はレーティングの変化も大きくなるため値が安定しにくくなるというトレードオフがある。 このトレードオフを緩和するために、最初の頃の対戦では定数を大きくしておいて、その後は小さくしていくようなテクニックもあるようだ。

いじょう。


  1. 1 対 1 の対戦でさえあれば、複数人のチームであっても構わない