CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Luigi から S3 互換のオブジェクトストレージを使う

今回は、Python のデータパイプライン構築用フレームワークの Luigi から、Amazon 以外が提供している S3 互換のオブジェクトストレージを利用する方法について書いてみる。 S3 互換のオブジェクトストレージとしては、ひとまず以下のエントリで紹介した MinIO をローカルホストで動かした。

blog.amedama.jp

使った環境は次のとおり。

$ sw_vers     
ProductName:    macOS
ProductVersion: 11.4
BuildVersion:   20F71
$ python -V
Python 3.9.5
$ pip list | grep -i luigi   
luigi           3.0.3
$ minio --version
minio version RELEASE.2021-06-17T00-10-46Z

もくじ

下準備

下準備として、MinIO と AWS CLI、それに Luigi と Boto3 をインストールしておく。 Boto3 は AWS を操作するための Python のクライアントライブラリで、Luigi で AWS 関連の処理をするときに必要となる。

$ brew install minio awscli
$ pip install luigi boto3

デフォルトの設定で MinIO のサーバを起動する。

$ mkdir -p /tmp/minio
$ minio server /tmp/minio

そして、テスト用のバケットを example-bucket という名前で作っておく。

$ export AWS_ACCESS_KEY_ID=minioadmin
$ export AWS_SECRET_ACCESS_KEY=minioadmin
$ aws s3 --endpoint-url http://localhost:9000 mb s3://example-bucket

サンプルコード

早速だけど、以下にサンプルコードを示す。 サンプルコードでは、ExampleTask というタスクを 1 つ定義している。 タスクの output() メソッドを見ると分かるとおり、Luigi でタスクのターゲットを S3 にしたいときは luigi.contrib.s3.S3Target を使えば良い。 そして、このタスクは実行すると s3://example-bucket/greet.txt という URL にファイルを作る。 ファイルの中には `Hello, World! という文字列が書き込まれる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import luigi
from luigi.contrib.s3 import S3Target


class ExampleTask(luigi.Task):

    def run(self):
        # NOTE: バケットは自動で作られない点に注意する
        with self.output().open(mode='w') as out_fp:
            print('Hello, World!', file=out_fp)

    def output(self):
        return S3Target(path=f's3://example-bucket/greet.txt')


if __name__ == '__main__':
    luigi.run(main_task_cls=ExampleTask,
              local_scheduler=True)

上記に適当な名前をつけて保存する。 ここでは例として example.py という名前にした。

さて、問題は上記をそのまま実行すると、アクセス先が本家の AWS になってしまうところ。 どうにかしてローカルホストにアクセスしてもらわないといけない。

結論から先に述べると、Luigi の設定ファイルに [s3] というセクションを作って、そこに設定を書けば良い。 前述したとおり、Luigi の S3 関連の処理は AWS SDK for Python (Boto3) に依存している。 [s3] というセクションに定義したパラメータは、boto3.client() を初期化するときの引数としてそのまま渡される。 つまり、ここでアクセス先や認証情報を変更できる。

$ cat << 'EOF' > luigi.cfg
[s3]
aws_access_key_id=minioadmin
aws_secret_access_key=minioadmin
use_ssl=False
endpoint_url=http://localhost:9000
EOF

設定できたところでタスクを実行してみよう。

$ python example.py

...

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 ExampleTask()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

結果を確認してみよう。 バケットを確認すると、ちゃんとオブジェクトができている。

$ aws --endpoint-url http://localhost:9000 s3 ls s3://example-bucket/ --recursive
2021-06-23 18:32:31         14 greet.txt

オブジェクトの中身を確認すると、ちゃんとメッセージが書き込まれていることがわかる。

$ aws --endpoint-url http://localhost:9000 s3 cp s3://example-bucket/greet.txt -
Hello, World!

めでたしめでたし。