CUBE SUGAR CONTAINER

技術系のこと書きます。

PySpark のスクリプトファイルで引数を扱う

今回は Apache Spark の Python インターフェースである PySpark について。 PySpark では定型的な作業についてはスクリプトファイル (*.py) にまとめて spark-submit コマンドで実行することになる。 その際に、動作に必要な引数をさばく方法について。 結論から先に書いてしまうと spark-submit コマンドでスクリプトファイルの後ろにアプリケーション用の引数を渡せば良いだけ。

使った環境は次の通り。 Apache Spark は YARN を使って分散環境を構築してある。

$ cat /etc/redhat-release 
CentOS Linux release 7.4.1708 (Core) 
$ uname -r
3.10.0-693.17.1.el7.x86_64
$ pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/
                        
Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_161
Branch master
Compiled by user sameera on 2018-02-22T19:24:29Z
Revision a0d7949896e70f427e7f3942ff340c9484ff0aab
Url git@github.com:sameeragarwal/spark.git
Type --help for more information.
$ hadoop version
Hadoop 2.8.3
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r b3fe56402d908019d99af1f1f4fc65cb1d1436a2
Compiled by jdu on 2017-12-05T03:43Z
Compiled with protoc 2.5.0
From source with checksum 9ff4856d824e983fa510d3f843e3f19d
This command was run using /home/vagrant/hadoop-2.8.3/share/hadoop/common/hadoop-common-2.8.3.jar

Apache Spark で PySpark のスクリプトファイルを実行する

とりあえず、まずは引数とかに関係なく PySpark のスクリプトファイルを実行する方法から。 記述する上での注意点として、最も重要なのは pyspark コマンドで起動する REPL とは若干流儀が異なるということ。 具体的には REPL であれば最初から用意されているいくつかのインスタンス変数が用意されていない。 そのためスクリプトファイルの中でそれらを自分で作成しなければいけない。

以下のサンプルコードでは REPL であれば最初から用意されている SparkContextSparkSession のインスタンスを作っている。 そして、それらを作った上で SparkSQL を呼び出すという内容になっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession


def main():
    conf = SparkConf()
    conf.setAppName('example')
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    df = spark.sql('SELECT "Hello, World!" AS message')
    df.show()


if __name__ == '__main__':
    main()

この内容ではアプリケーションの引数がどうのとかは関係ない。

上記を適当な名前で保存して spark-submit コマンドで実行する。 今回はクラスタを YARN で管理しているので --master オプションに yarn を指定している。

$ spark-submit --master yarn example.py
...
+-------------+
|      message|
+-------------+
|Hello, World!|
+-------------+
...

うまくいけば上記のように SparkSQL で得られた DataFrame の内容がログの中に表示されるはず。

アプリケーションが動作するための引数を扱う

続いてはアプリケーションが動作するのに必要な引数をどうやって渡すかについて。 これについては spark-submit コマンドのヘルプを見れば、それについて書いてある。

$ spark-submit --help 2>&1 | head -n 1
Usage: spark-submit [options] <app jar | python file> [app arguments]

スクリプトファイルの後ろにアプリケーション固有の引数を書けば良いらしい。

上記を確認するために、以下のようなサンプルコードを用意した。 このコードでは Spark 固有の機能は使っておらず、ただ単に sys.argv を出力している。 これによって Python のモジュールに渡されてきた引数の内容を表示できる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function

import sys

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession


def main():
    conf = SparkConf()
    conf.setAppName('example')
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    print('args:', sys.argv)


if __name__ == '__main__':
    main()

上記のスクリプトファイルを spark-submit で実行してみよう。 スクリプトファイルの後ろにはアプリケーションの引数に見立てた文字列をいくつか追加しておく。

$ spark-submit --master yarn example.py foo bar baz
...
args: ['/home/vagrant/example.py', 'foo', 'bar', 'baz']
...

ちゃんと spark-submit 自体の引数とは別に sys.argv に引数が渡っていることがわかる。

アプリケーション固有の引数をパースする

アプリケーションへの引数の渡し方が分かったので、あとはパースしてアプリケーションの中で使うだけ。 次のサンプルコードでは argparse モジュールを使って引数をパースして SparkSQL の中に内容を埋め込んでいる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function

import argparse

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession


def main():
    conf = SparkConf()
    conf.setAppName('example')
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    parser = argparse.ArgumentParser(description='pyspark app args example')
    parser.add_argument('--message', type=str, required=True, help='show message')
    args = parser.parse_args()

    sql = 'SELECT "{msg}" AS message'.format(msg=args.message)
    df = spark.sql(sql)
    df.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。 上手くいけば表示される内容が引数で渡した内容になる。

$ spark-submit --master yarn example.py --message "Hello, PySpark"
...
+--------------+
|       message|
+--------------+
|Hello, PySpark|
+--------------+
...

ばっちり。

めでたしめでたし。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Sparkによる実践データ解析 ―大規模データのための機械学習事例集