今回は、Python のデータパイプライン構築用フレームワークの Luigi から、Amazon 以外が提供している S3 互換のオブジェクトストレージを利用する方法について書いてみる。 S3 互換のオブジェクトストレージとしては、ひとまず以下のエントリで紹介した MinIO をローカルホストで動かした。
使った環境は次のとおり。
$ sw_vers ProductName: macOS ProductVersion: 11.4 BuildVersion: 20F71 $ python -V Python 3.9.5 $ pip list | grep -i luigi luigi 3.0.3 $ minio --version minio version RELEASE.2021-06-17T00-10-46Z
もくじ
下準備
下準備として、MinIO と AWS CLI、それに Luigi と Boto3 をインストールしておく。 Boto3 は AWS を操作するための Python のクライアントライブラリで、Luigi で AWS 関連の処理をするときに必要となる。
$ brew install minio awscli $ pip install luigi boto3
デフォルトの設定で MinIO のサーバを起動する。
$ mkdir -p /tmp/minio $ minio server /tmp/minio
そして、テスト用のバケットを example-bucket
という名前で作っておく。
$ export AWS_ACCESS_KEY_ID=minioadmin $ export AWS_SECRET_ACCESS_KEY=minioadmin $ aws s3 --endpoint-url http://localhost:9000 mb s3://example-bucket
サンプルコード
早速だけど、以下にサンプルコードを示す。
サンプルコードでは、ExampleTask
というタスクを 1 つ定義している。
タスクの output()
メソッドを見ると分かるとおり、Luigi でタスクのターゲットを S3 にしたいときは luigi.contrib.s3.S3Target
を使えば良い。
そして、このタスクは実行すると s3://example-bucket/greet.txt
という URL にファイルを作る。
ファイルの中には `Hello, World!
という文字列が書き込まれる。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import luigi from luigi.contrib.s3 import S3Target class ExampleTask(luigi.Task): def run(self): # NOTE: バケットは自動で作られない点に注意する with self.output().open(mode='w') as out_fp: print('Hello, World!', file=out_fp) def output(self): return S3Target(path=f's3://example-bucket/greet.txt') if __name__ == '__main__': luigi.run(main_task_cls=ExampleTask, local_scheduler=True)
上記に適当な名前をつけて保存する。
ここでは例として example.py
という名前にした。
さて、問題は上記をそのまま実行すると、アクセス先が本家の AWS になってしまうところ。 どうにかしてローカルホストにアクセスしてもらわないといけない。
結論から先に述べると、Luigi の設定ファイルに [s3]
というセクションを作って、そこに設定を書けば良い。
前述したとおり、Luigi の S3 関連の処理は AWS SDK for Python (Boto3) に依存している。
[s3]
というセクションに定義したパラメータは、boto3.client()
を初期化するときの引数としてそのまま渡される。
つまり、ここでアクセス先や認証情報を変更できる。
$ cat << 'EOF' > luigi.cfg [s3] aws_access_key_id=minioadmin aws_secret_access_key=minioadmin use_ssl=False endpoint_url=http://localhost:9000 EOF
設定できたところでタスクを実行してみよう。
$ python example.py ... ===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 ran successfully: - 1 ExampleTask() This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary =====
結果を確認してみよう。 バケットを確認すると、ちゃんとオブジェクトができている。
$ aws --endpoint-url http://localhost:9000 s3 ls s3://example-bucket/ --recursive 2021-06-23 18:32:31 14 greet.txt
オブジェクトの中身を確認すると、ちゃんとメッセージが書き込まれていることがわかる。
$ aws --endpoint-url http://localhost:9000 s3 cp s3://example-bucket/greet.txt - Hello, World!
めでたしめでたし。