CUBE SUGAR CONTAINER

技術系のこと書きます。

PySpark: 時刻と文字列を相互に変換する (DataFrame / Spark SQL)

今回は Apache Spark のインターフェースの一つである PySpark で時刻と文字列を相互に変換する方法について扱う。 PySpark にはいくつかの API があるけど、その中でも DataFrame と Spark SQL を使った方法について紹介する。

使った環境は次の通り。

$ pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/
                        
Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_161
Branch 
Compiled by user felixcheung on 2017-11-24T23:19:45Z
Revision 
Url 
Type --help for more information.

下準備

まずは時刻情報を文字列で表した RDD オブジェクトを用意する。 Apache Spark では主に RDD と DataFrame という二つのオブジェクトを中心にデータを操作する。 その中でも RDD はスキーマレスなオブジェクトになっている。

>>> rdd = sc.parallelize([
...   ('01/Jan/2016 10:13:16',),
...   ('02/Feb/2017 11:14:17',),
...   ('03/Mar/2018 12:15:18',)
... ])

時刻の形式は英語圏にありがちなフォーマットにした。

RDD を DataFrame に変換するためにスキーマ情報を定義する。

>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
>>> schema = StructType([
...   StructField('dt_str', StringType(), False),
... ])

RDD とスキーマを元に DataFrame を生成する。

>>> df = spark.createDataFrame(rdd, schema)
>>> df.show()
+--------------------+
|              dt_str|
+--------------------+
|01/Jan/2016 10:13:16|
|02/Feb/2017 11:14:17|
|03/Mar/2018 12:15:18|
+--------------------+

これで準備ができた。

string から timestamp への変換 (Spark 2.2 ~)

Apache Spark 2.2 以降では、文字列の時刻情報をタイムスタンプに変換する関数として to_timestamp() が用意されている。 この関数は例えば以下のようにして使う。

>>> from pyspark.sql.functions import to_timestamp
>>> df.select(to_timestamp(df.dt_str, 'dd/MMM/yyyy HH:mm:ss').alias('parsed_dt')).show()
+-------------------+
|          parsed_dt|
+-------------------+
|2016-01-01 10:13:16|
|2017-02-02 11:14:17|
|2018-03-03 12:15:18|
+-------------------+

変換した DataFrame の型を調べると、ちゃんと timestamp 型になっていることが分かる。

>>> df.select(to_timestamp(df.dt_str, 'dd/MMM/yyyy HH:mm:ss').alias('parsed_dt')).dtypes
[('parsed_dt', 'timestamp')]

string から timestamp への変換 (Spark 1.5 ~)

先ほど使った API はかなり新しいので使えない環境もあるかと思う。 そこで Apache Spark 1.5 以降であれば次のように unix_timestamp() 関数と cast() メソッドを組み合わせると良い。 ようするに、一旦 UNIX タイムにした上でそれを timestamp として解釈させるということ。

>>> from pyspark.sql.functions import unix_timestamp
>>> df.select(unix_timestamp(df.dt_str, 'dd/MMM/yyyy HH:mm:ss').cast('timestamp').alias('parsed_dt')).show()
+-------------------+
|          parsed_dt|
+-------------------+
|2016-01-01 10:13:16|
|2017-02-02 11:14:17|
|2018-03-03 12:15:18|
+-------------------+

変換した後の DataFrame の型を調べると、ちゃんと timestamp 型が使われていることが分かる。

>>> df.select(unix_timestamp(df.dt_str, 'dd/MMM/yyyy HH:mm:ss').cast('timestamp').alias('parsed_dt')).dtypes
[('parsed_dt', 'timestamp')]

timestamp 型に変換すると、次のように filter() メソッドで範囲指定なんかができる。

>>> parsed_df = df.select(to_timestamp(df.dt_str, 'dd/MMM/yyyy HH:mm:ss').alias('parsed_dt'))
>>> parsed_df.filter('parsed_dt > "2017"').show()
+-------------------+
|          parsed_dt|
+-------------------+
|2017-02-02 11:14:17|
|2018-03-03 12:15:18|
+-------------------+

string から timestamp への変換 (Spark SQL)

ちなみに上記の変換は DataFrame API を使う以外に Spark SQL を使うこともできる。 Spark SQL を使うと、文字通り SQL を通して各種データを操作できる。

Spark SQL を使うには、まず DataFrame を registerTempTable() メソッドを使ってテーブルとして扱えるようにする。

>>> df.registerTempTable('datetimes')

すると SparkSession オブジェクトの sql() メソッドで、上記で登録したテーブルを SQL から触れるようになる。

>>> spark.sql('''
... SELECT
...   CAST(from_unixtime(unix_timestamp(dt_str, 'dd/MMM/yyyy HH:mm:ss')) AS timestamp) AS parsed_dt
... FROM datetimes
... ''').show()
+-------------------+
|          parsed_dt|
+-------------------+
|2016-01-01 10:13:16|
|2017-02-02 11:14:17|
|2018-03-03 12:15:18|
+-------------------+

timestamp を string に変換する (Spark 1.5 ~)

続いては、これまでとは逆にタイムスタンプを文字列に変換してみよう。

タイムスタンプから文字列の変換には Apache Spark 1.5 以降であれば date_format() 関数が使える。

>>> from pyspark.sql.functions import date_format
>>> parsed_df.select(date_format(parsed_df.parsed_dt, 'dd/MMM/yyyy HH:mm:ss').alias('dt_str')).show()
+--------------------+
|              dt_str|
+--------------------+
|01/Jan/2016 10:13:16|
|02/Feb/2017 11:14:17|
|03/Mar/2018 12:15:18|
+--------------------+

変換後の型情報を確認すると、ちゃんと文字列になっていることが分かる。

>>> parsed_df.select(date_format(parsed_df.parsed_dt, 'dd/MMM/yyyy HH:mm:ss').alias('dt_str')).dtypes
[('dt_str', 'string')]

ちなみに、凝ったフォーマットが不要であれば以下のように文字列にキャストしてしまうだけでも事足りる。

>>> parsed_df.select(parsed_df.parsed_dt.cast('string').alias('dt_str')).show()
+-------------------+
|             dt_str|
+-------------------+
|2016-01-01 10:13:16|
|2017-02-02 11:14:17|
|2018-03-03 12:15:18|
+-------------------+

この場合でも、ちゃんと文字列に変換されている。

>>> parsed_df.select(parsed_df.parsed_dt.cast('string').alias('dt_str')).dtypes
[('dt_str', 'string')]

timestamp を string に変換する (Spark SQL)

上記の操作は、もちろん Spark SQL を使ってもできる。 先ほどと同じように、まずは DataFrame を registerTempTable() メソッドでテーブルとして扱えるようにする。

>>> parsed_df.registerTempTable('datetimes')

あとは SparkSession オブジェクト経由で、型を変換する SELECT 文を書けばいいだけ。

>>> spark.sql('''
... SELECT
...   date_format(parsed_dt, 'dd/MMM/yyyy HH:mm:ss') AS dt_str
... FROM datetimes
... ''').show()
+--------------------+
|              dt_str|
+--------------------+
|01/Jan/2016 10:13:16|
|02/Feb/2017 11:14:17|
|03/Mar/2018 12:15:18|
+--------------------+

いじょう。

めでたしめでたし。

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム