CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: gensim で学習済み単語ベクトル表現を扱ってみる

Python で自然言語処理を扱うためのパッケージのひとつに gensim がある。 今回は、gensim で学習済み単語ベクトル表現 (Pre-trained Word Vectors) を使った Word Embedding を試してみた。 Word Embedding というのは単語 (Word) をベクトル表現の特徴量にする手法のこと。 ごく単純な One-Hot Encoding から、ニューラルネットワークの学習時の重みを元にしたものまで様々な手法が知られている。

今回は、その中でも Facebook の公開している fastText と呼ばれる学習済み単語ベクトル表現を使うことにした。 学習に使われているコーパスは Web クローラのデータと Wikipedia があるようだけど、とりあえずクローラの方にしてみる。 詳細については以下を参照のこと。

fasttext.cc

使った環境は次のとおり。

$ sw_vers           
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V             
Python 3.7.7

下準備

下準備として、まずは公開されている学習済み単語ベクトル表現をダウンロードしておく。 次の URL からは Web クローラで収集した日本語コーパスを使って、単語を 300 次元のベクトルに埋め込むためのモデルが得られる。 このファイルは 1GB ほどあるので割と時間がかかる。

$ wget https://dl.fbaipublicfiles.com/fasttext/vectors-crawl/cc.ja.300.vec.gz

なお、fastText の Python パッケージを使って、次のようにしても良い。 この機能は今のところリポジトリの HEAD にしか入っていないけど、次のリリースでは含まれるはず。

$ pip install -U git+https://github.com/facebookresearch/fastText.git
$ python -c "from fasttext import util; util.download_model('ja')"

そして、gensim パッケージをインストールしておく。

$ pip install gensim

インストールできたら、Python の REPL を起動しておく。

$ python

gensim から fastText の学習済み単語ベクトル表現を使う

はじめに gensim をインポートする。

>>> import gensim

先ほどダウンロードしたファイルを gensim で読み込む。 これで単語をベクトル表現に変換するためのモデルができる。 なお、この操作にはかなり時間がかかるので気長に待つ必要がある。 だいたい 10 分くらい 1

>>> model = gensim.models.KeyedVectors.load_word2vec_format('cc.ja.300.vec.gz', binary=False)
>>> model
<gensim.models.keyedvectors.Word2VecKeyedVectors object at 0x127797310>

このモデルのざっくりした使い方は以下のドキュメントに記載されている。 ただ、ソースコードと見比べていくとイマイチ更新が追いついていない雰囲気も伺える。

radimrehurek.com

モデルの読み込みが終わったら、早速単語をベクトル表現にしてみよう。 試しに「猫」という単語をベクトル表現にすると、次のようになる。 モデルには辞書ライクなインターフェースがあるので、ブラケットを使ってアクセスする。

>>> model['猫']
array([-2.618e-01, -7.520e-02, -1.930e-02,  2.088e-01, -3.005e-01,
        1.936e-01, -1.561e-01, -3.540e-02,  1.220e-01,  2.718e-01,
        7.460e-02,  1.356e-01,  2.299e-01,  1.851e-01, -2.684e-01,
...(snip)...
       -9.260e-02, -8.890e-02,  1.143e-01, -3.381e-01, -1.913e-01,
        8.160e-02, -4.420e-02, -2.405e-01, -2.170e-02, -1.062e-01,
        3.230e-02, -2.380e-02,  1.860e-02, -2.750e-02, -1.900e-01],
      dtype=float32)

確認すると、この単語ベクトルはちゃんと 300 次元ある。

>>> model['猫'].shape
(300,)

学習した単語のボキャブラリーには vocab という名前でアクセスできる。 このモデルでは 200 万の単語を学習しているようだ。

>>> list(model.vocab.keys())[:10]
['の', '、', '。', 'に', 'は', 'が', 'を', 'て', 'た', 'で']
>>> len(model.vocab.keys())
2000000

most_similar() メソッドを使うと、特定の単語に似ている単語が得られる。

>>> from pprint import pprint
>>> pprint(model.most_similar('猫', topn=10))
[('ネコ', 0.8059155941009521),
 ('ねこ', 0.7272598147392273),
 ('子猫', 0.720253586769104),
 ('仔猫', 0.7062687873840332),
 ('ニャンコ', 0.7058036923408508),
 ('野良猫', 0.7030349969863892),
 ('犬', 0.6505385041236877),
 ('ミケ', 0.6356303691864014),
 ('野良ねこ', 0.6340526342391968),
 ('飼猫', 0.6265145540237427)]

単語の類似度は similarity() メソッドで得られる。 たとえば、「猫」と「犬」の方が「猫」と「人」よりも単語としては似ているようだ。

>>> model.similarity('猫', '犬')
0.65053856
>>> model.similarity('猫', '人')
0.23371725

ところで、Word2vec なんかだと、まるで単語の意味を理解しているかのように単語ベクトル間の足し引きができるとかよく言われている。 このモデルではどんなもんだろうか。

ありがちな例として「王様」から「男」を抜いて「女」を足してみよう。 女王になるみたいな説明、よく見る。

>>> new_vec = model['王様'] - model['男'] + model['女']

similar_by_vector() メソッドを使って得られたベクトルに似ている単語を確認してみる。

>>> pprint(model.similar_by_vector(new_vec))
[('王様', 0.8916897773742676),
 ('女王', 0.527921199798584),
 ('ラジオキッズ', 0.5255386829376221),
 ('王さま', 0.5226017236709595),
 ('王妃', 0.5000214576721191),
 ('裸', 0.487439900636673),
 ('タプチム', 0.4832267761230469),
 ('アンナ・レオノーウェンズ', 0.4807651937007904),
 ('ゲムケン', 0.48058977723121643),
 ('お姫様', 0.4792743921279907)]

最も似ている単語は変換前の単語ということになってしまった。 変換前の単語は取り除くことにするのかな? だとしたら、次に「女王」が得られているけど。

そもそも元の単語ベクトルに類似した単語は次のとおり。

>>> pprint(model.similar_by_word('王様'))
[('王さま', 0.5732064247131348),
 ('ラジオキッズ', 0.5712020993232727),
 ('女王', 0.5381238460540771),
 ('ゲムケン', 0.5354433059692383),
 ('裸', 0.5288593769073486),
 ('ブランチ', 0.5283758640289307),
 ('王', 0.5277552604675293),
 ('タプチム', 0.5203500986099243),
 ('乃女', 0.5120265483856201),
 ('王妃', 0.5055921077728271)]

学習済み単語ベクトル表現の読み込みが遅い問題について

一般的に、公開されている学習済み単語ベクトル表現は、特定の言語や環境に依存しないフォーマットになっているようだ。 そのため、gensim に限らず読み込む際には結構重たいパース処理が必要になっている。 この点は、一般的な機械学習において CSV などのデータを読み込むときのテクニックを応用すれば高速化が見込めそうだ。

具体的には Python であれば Pickle を使って直列化・非直列化する。 まずは pickle パッケージを読み込む。

>>> import pickle

ファイル名を指定してモデルをストレージに直列化する。

>>> with open('gensim-kvecs.cc.ja.300.vec.pkl', mode='wb') as fp:
...     pickle.dump(model, fp)
... 

直列化したら、あとは使いたいタイミングで非直列化するだけ。 ストレージのスループット次第だけど、それでも 10 分待たされるようなことはないはず。

>>> with open('gensim-kvecs.cc.ja.300.vec.pkl', mode='rb') as fp:
...     model = pickle.load(fp)
... 

ただし、直列化されない内部的なキャッシュもあるようで一発目の呼び出しではちょっとモタつく。 とはいえ、もちろん 10 分とか待たされるようなことはない。

いじょう。


  1. 今の実装には明確な処理のボトルネックがあって、それを解消すると 1/3 くらいの時間に短縮できる。

Python: statsmodels で時系列データを基本成分に分解する

時系列データを扱うとき、原系列が傾向変動・季節変動・不規則変動という基本成分の合成で成り立っていると捉えることがある。 傾向変動は中長期的な増加・減少といった変化であり、季節変動は例えば 1 ヶ月や 1 年といった周期的な変化を指している。 不規則変動は、前者 2 つに当てはまらない変化で、誤差変動と特異的変動に分けて考える場合もあるようだ。

原系列が基本成分の合成と考える場合でも、捉え方として加法モデルと乗法モデルにさらに分かれる。 まず、加法モデルでは傾向変動  T(t) と季節変動  S(t)、誤差変動  I(t) の和を原系列  O(t) と捉える。 この考え方では、傾向変動の大きさに関係なく一定の季節変動が加えられる。

 O(t) = T(t) + S(t) + I(t)

一方で、乗法モデルでは積と捉える。 つまり、傾向変動が大きくなれば、それに比例して季節変動も大きくなる、という考え方。

 O(t) = T(t) \times S(t) \times I(t)

これらは扱うデータやタスクによって使い分ける必要があるらしい。 今回は、Python の statsmodels を使って原系列を基本成分に分解してみる。

使った環境は次のとおり。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V                      
Python 3.7.7

下準備

まずは、必要なパッケージをインストールしておく。

$ pip install statsmodels seaborn

現系列を基本成分に分解する

現系列を基本成分に分解するには seasonal_decompose() 関数を使う。 次のサンプルコードでは、旅客機の乗客数のデータを分解してみた。 なお、モデルとしては乗法モデルを仮定している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pandas as pd
import seaborn as sns
from statsmodels import api as sm
from matplotlib import pyplot as plt
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()


def main():
    # 旅客機の乗客数のデータセットを読み込む
    df = sns.load_dataset('flights')

    df['year-month'] = df.month.astype(str) + ', ' + df.year.astype(str)
    df['year-month'] = pd.to_datetime(df['year-month'], format='%B, %Y')
    df = df.set_index('year-month')

    # 時系列データを傾向変動・季節変動・残差に分解する
    decompose_result = sm.tsa.seasonal_decompose(df['passengers'],
                                                 # 乗法モデルを仮定する
                                                 model='multiplicative')

    # これでもグラフが描ける
    # decompose_result.plot()

    # 描画する領域を用意する
    fig, axes = plt.subplots(nrows=4, ncols=1, figsize=(8, 8), sharex=True)

    # 原系列
    axes[0].set_title('Observed')
    axes[0].plot(decompose_result.observed)

    # 傾向変動
    axes[1].set_title('Trend')
    axes[1].plot(decompose_result.trend)

    # 季節変動
    axes[2].set_title('Seasonal')
    axes[2].plot(decompose_result.seasonal)

    # 残差 (不規則変動 = 誤差変動 + 特異的変動)
    axes[3].set_title('Residual')
    axes[3].plot(decompose_result.resid)

    # グラフを表示する
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python decompose.py

次のようなグラフが得られる。 上から原系列、傾向変動、季節変動、不規則変動 (残差) となっている。

f:id:momijiame:20200406232249p:plain
原系列を基本成分に分解する

注意すべきポイントとして、seasonal_decompose() 関数では傾向変動と不規則変動は端の要素が NaN になってしまうようだ。

原系列と傾向変動で偏自己相関を見比べてみる

時系列データに周期性、つまり季節成分が含まれるか調べる方法のひとつに、偏自己相関を調べるというのがある。 今回は、分解する前の原系列と、分解した後の傾向変動について、自己相関と偏自己相関を調べて見比べてみよう。 自己相関では、ある時点のデータ  O(t) には 1 つ前のデータ  O(t - 1) が関係している可能性がある。 そして、1 つ前のデータには、さらにその 1 つ前のデータが、というような積み重ねの影響を受けている恐れがある。 偏自己相関は、そのような積み重ねの影響を消去することを目的とした統計量になっている。

次のサンプルコードでは、分解する前の原系列と分解した後の傾向変動について自己相関と偏自己相関を計算してグラフにプロットしている。 なお、データに NaN があると自己相関がうまく計算できないようなので、傾向変動からは NaN を除外している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pandas as pd
import seaborn as sns
from statsmodels import api as sm
from matplotlib import pyplot as plt
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()


def main():
    # 旅客機の乗客数のデータセットを読み込む
    df = sns.load_dataset('flights')

    df['year-month'] = df.month.astype(str) + ', ' + df.year.astype(str)
    df['year-month'] = pd.to_datetime(df['year-month'], format='%B, %Y')
    df = df.set_index('year-month')

    # 時系列データを傾向変動・季節変動・残差に分解する
    decompose_result = sm.tsa.seasonal_decompose(df['passengers'],
                                                 # 乗法モデルを仮定する
                                                 model='multiplicative')

    _, axes = plt.subplots(nrows=2, ncols=2, figsize=(8, 8))

    # 原系列の ACF
    sm.tsa.graphics.plot_acf(df['passengers'], ax=axes[0][0])
    # 原系列の PACF
    sm.tsa.graphics.plot_pacf(df['passengers'], ax=axes[1][0])

    # 傾向変動の ACF
    sm.tsa.graphics.plot_acf(decompose_result.trend.dropna(), ax=axes[0][1])
    # 傾向変動の PACF
    sm.tsa.graphics.plot_pacf(decompose_result.trend.dropna(), ax=axes[1][1])

    # グラフを表示する
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python decompacf.py

次のようなグラフが得られる。 左側が原系列、右側が傾向変動について計算したもの。 青い帯は「相関がない」を帰無仮説とした 95% 信頼区間を表している。 つまり、帯の外にある値は 5% の有意水準で相関があることになる。

f:id:momijiame:20200406233714p:plain
原系列と傾向変動の自己相関 (ACF) と偏自己相関 (PACF)

原系列の偏自己相関では、ところどころに相関があることがわかる。 それに対して、傾向変動の偏自己相関では、ラグ 1 以外に相関がある箇所は見当たらない。 つまり、季節変動を取り除くことができていることになるようだ。

いじょう。

Python: 中心化移動平均 (CMA: Centered Moving Average) について

以前から移動平均 (MA: Moving Average) という手法自体は知っていたけど、中心化移動平均 (CMA: Centered Moving Average) というものがあることは知らなかった。 一般的な移動平均である後方移動平均は、データの対応関係が原系列に対して遅れてしまう。 そこで、中心化移動平均という手法を使うことで遅れをなくすらしい。 この手法は、たとえば次のような用途でひとつのやり方として使われているようだ。

  • 不規則変動の除去
  • 季節変動の除去

使った環境は次のとおり。

$ sw_vers  
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V
Python 3.7.7

下準備

下準備として、必要なパッケージをインストールしておく。

$ pip install pandas seaborn

インストールできたら Python のインタプリタを起動する。

$ python

起動できたら、1 ~ 12 までの数字が入った Series のオブジェクトを作る。

>>> import pandas as pd
>>> x = pd.Series(range(1, 12 + 1))
>>> x
0      1
1      2
2      3
3      4
4      5
5      6
6      7
7      8
8      9
9     10
10    11
11    12
dtype: int64

上記のオブジェクトを使って移動平均の計算方法について学んでいく。

(後方) 移動平均 (MA: Moving Average)

はじめに、一般的な移動平均である後方移動平均から試す。 Pandas では rolling() メソッドを使うことで移動平均を計算できる。 以下では 3 点の要素で平均をとる 3 点移動平均を計算している。

>>> backward_3p_ma = x.rolling(window=3).mean()
>>> backward_3p_ma
0      NaN
1      NaN
2      2.0
3      3.0
4      4.0
5      5.0
6      6.0
7      7.0
8      8.0
9      9.0
10    10.0
11    11.0
dtype: float64

この計算では、たとえばインデックスで 2 に入る要素は、次のように計算される。

backward_3p_ma[2] = (x[0] + x[1] + x[2]) / 3

つまり、自分よりも後ろの値を使って平均を計算することになる。

この計算方法では、原系列との対応関係を考えたとき次のように 1 つ分の遅れが出る。

>>> pd.concat([x, backward_3p_ma], axis=1)
     0     1
0    1   NaN
1    2   NaN
2    3   2.0
3    4   3.0
4    5   4.0
5    6   5.0
6    7   6.0
7    8   7.0
8    9   8.0
9   10   9.0
10  11  10.0
11  12  11.0

中心化移動平均 (CMA: Centered Moving Average)

そこで、遅れの影響を取り除いたものが中心化移動平均と呼ばれるらしい。 たとえば、平均をとる要素数が奇数のときは、単に rolling() メソッドの center オプションを有効にするだけで良い。

>>> centered_3p_ma = x.rolling(window=3, center=True).mean()
>>> centered_3p_ma
0      NaN
1      2.0
2      3.0
3      4.0
4      5.0
5      6.0
6      7.0
7      8.0
8      9.0
9     10.0
10    11.0
11     NaN
dtype: float64

このオプションが有効だと、たとえばインデックスで 2 に入る要素は、次のように計算される。 つまり、自分の前後にある値を使って平均を計算することになる。

centered_3p_ma[2] = (x[1] + x[2] + x[3]) / 3

現系列との対応関係を確認すると、このやり方では遅れがなくなっている。

>>> pd.concat([x, centered_3p_ma], axis=1)
     0     1
0    1   NaN
1    2   2.0
2    3   3.0
3    4   4.0
4    5   5.0
5    6   6.0
6    7   7.0
7    8   8.0
8    9   9.0
9   10  10.0
10  11  11.0
11  12   NaN

ただし、このオプションは実装的には単に後方移動平均を計算した上で shift() しているだけに過ぎないらしい。 ようするに、こういうこと。

>>> backward_3p_ma_shifted = x.rolling(window=3).mean().shift(-1)
>>> pd.concat([x, backward_3p_ma_shifted], axis=1)
     0     1
0    1   NaN
1    2   2.0
2    3   3.0
3    4   4.0
4    5   5.0
5    6   6.0
6    7   7.0
7    8   8.0
8    9   9.0
9   10  10.0
10  11  11.0
11  12   NaN

そのため、平均をとる要素数が偶数のときに困ったことが起こる。

平均をとる要素数が偶数のときの問題点について

試しに、平均をとる要素数を 4 点に増やして、そのまま計算してみよう。

>>> centerd_4p_ma = x.rolling(window=4, center=True).mean() 
>>> centerd_4p_ma
0      NaN
1      NaN
2      2.5
3      3.5
4      4.5
5      5.5
6      6.5
7      7.5
8      8.5
9      9.5
10    10.5
11     NaN
dtype: float64

すると、計算結果に端数が出ている。

原系列と比較すると、対応関係に 0.5 の遅れがあることがわかる。

>>> pd.concat([x, centerd_4p_ma], axis=1)
     0     1
0    1   NaN
1    2   NaN
2    3   2.5
3    4   3.5
4    5   4.5
5    6   5.5
6    7   6.5
7    8   7.5
8    9   8.5
9   10   9.5
10  11  10.5
11  12   NaN

つまり、中心化移動平均では平均をとる要素数が偶数と奇数のときで計算方法を変えなければいけない。

要素数が偶数のときの計算方法

要素数が偶数のときの中心化移動平均は、計算が 2 段階に分かれている。 はじめに、後方移動平均をそのまま計算する。

>>> backward_4p_ma =x.rolling(window=4).mean()
>>> backward_4p_ma
0      NaN
1      NaN
2      NaN
3      2.5
4      3.5
5      4.5
6      5.5
7      6.5
8      7.5
9      8.5
10     9.5
11    10.5
dtype: float64

その上で、移動平均に使った要素数の半分だけデータをずらし、もう一度要素数 2 で平均を取り直す。

centerd_4p_ma = backward_4p_ma.shift(-2).rolling(window=2).mean()

原系列との対応関係を確認すると、遅れが解消していることがわかる。

>>> pd.concat([x, centerd_4p_ma], axis=1)
     0     1
0    1   NaN
1    2   NaN
2    3   3.0
3    4   4.0
4    5   5.0
5    6   6.0
6    7   7.0
7    8   8.0
8    9   9.0
9   10  10.0
10  11   NaN
11  12   NaN

別のデータで中心化移動平均を計算してみる

もうちょっとちゃんとしたデータでも計算してみよう。 次のサンプルコードでは、旅客機の乗客数の推移に対して 12 点で中心化移動平均を計算している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from calendar import month_name

import seaborn as sns
from matplotlib import pyplot as plt
import pandas as pd
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()


def main():
    # 航空機の乗客を記録したデータセットを読み込む
    df = sns.load_dataset('flights')
    # 月の名前を数字に直す
    month_name_mappings = {name: str(n).zfill(2) for n, name in
                           enumerate(month_name)}
    df['month'] = df['month'].apply(lambda x: month_name_mappings[x])
    # 年と月を結合したカラムを用意する
    df['year-month'] = df.year.astype(str) + '-' + df.month.astype(str)
    df['year-month'] = pd.to_datetime(df['year-month'], format='%Y-%m')

    # 原系列
    sns.lineplot(data=df, x='year-month', y='passengers', label='original')
    # 中心化移動平均
    df['passengers-cma'] = df['passengers'].rolling(window=12).mean().shift(-6).rolling(2).mean()
    sns.lineplot(data=df, x='year-month', y='passengers-cma', label='CMA')

    # グラフを表示する
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみる。

$ python flightscma.py

得られるグラフが次のとおり。

f:id:momijiame:20200401184339p:plain
旅客機の乗客数データに対して 12 点中心化移動平均を計算する

ここまでデータ点数が多いと、正直 0.5 の遅れとか言われてもよくわからないけど、これで計算できているはず。

エンタープライズ向けヘリウム充填 HDD が安く手に入ることがあるらしい

この製品は、一昨年あたりから「HDD ガチャ」や「HDD おみくじ」といった愛称で親しまれているようだ。 今回は、Amazon セールで安くなっていたので実際にガチャを引いてみることにした。 このエントリは、その備忘録になる。

HDD ガチャ / HDD おみくじとは

Western Digital (WD) が販売している外付けハードディスク Elements Desktop シリーズの大容量モデル (8TB~) のこと。

この製品がガチャやおみくじと呼ばれている理由は、8TB に関して内部の HDD に複数のモデルが使われているところらしい。 具体的には、次の 2 種類がこれまでに確認されている。

  • WD80EMAZ-00WJTA0
    • Ultrastar DC HC510 の廉価版というウワサ
      • 回転速度は 7200rpm ではなく 5400rpm
    • ヘリウム充填モデル
  • WD80EMAZ-00M9AA0
    • Ultrastar DC HC320 の廉価版というウワサ
      • 回転速度は 7200rpm ではなく 5400rpm

Ultrastar DC HC シリーズは、Western Digital に買収された HGST が製造しているエンタープライズ向けの製品になる。 つまり、ウワサが本当であれば、この製品はコンシューマ向けにもかかわらず、内部にはエンタープライズ向けの HDD (の廉価版) が使われていることになる。 ようするに、企業のデータセンターの中でサーバに使うような高信頼モデル 1 ということ。

ガチャでいう「当たり」は HC510 の廉価版であるヘリウム充填モデルになる。 しかも、ウワサによると昨年あたりからの出荷分はすべて中身が「当たり」のヘリウム充填モデルになっているらしい。 また、10TB 以上に関してはヘリウム充填モデルしか確認されておらず、すべて「当たり」といえる。

そして、ガチャが楽しまれている理由は、使われている内部の HDD の珍しさからというわけではない。 何なら Ultrastar DC HC も、お金さえ出せば買うことができる 2 。 その主な理由は値段で、今回 Amazon のセールでは 8TB のモデルが約 1.7 万円になっていた。 また、セールでなくても約 2 万円で買うことができる。 そして、同じ容量だとコンシューマ向け NAS 用途の WD Red シリーズで約 2.5 万円する。

とはいえ、内部で使われているハードディスクは製品の仕様になっているわけではない。 ある時から異なるモデルが使われることも十分に考えられるため、実際のところは買ってみないとわからない。 そうした意味では、ガチャ要素はいまだ健在といえるだろう。

ガチャを引いてみた

そんなわけで、前置きが長くなったけど今回はガチャを引いてみた。

以下が届いた製品のパッケージで、至ってシンプル。

f:id:momijiame:20200328144335j:plain
製品のパッケージ

付属品は本体・電源アダプタ・USB ケーブルの 3 つ。

f:id:momijiame:20200328144615j:plain
製品の付属品

付属の USB ケーブルでつないで CrystalDiskInfo を確認した画面が次のとおり。 この型番は、ガチャでいう「当たり」のヘリウム充填モデルだ。

f:id:momijiame:20200328194002p:plain
CrystalDiskInfo

せっかくなので中身の HDD も確認してみよう。 筐体のカバーはツメで固定されているので、使っていないカードなどを差し込んでツメを外していく。

f:id:momijiame:20200328165626j:plain
隙間にカードなどを差し込んでツメを外す

開封すると、こんな感じ。 たしかに、ヘリウム充填モデルの筐体だ。

f:id:momijiame:20200328160420j:plain
中身の 3.5 インチハードディスク

注意点

この製品は、単なる USB 接続の外付けハードディスクとして使う分には何の懸念もない。 ただ単にエンタープライズ向けの製品が安く手に入るというだけ。 しかし、中のハードディスクを取り出して SATA で使うとき (殻割り) には、いくつかの注意点がある。

3.3V 問題

この製品は、通称 3.3V 問題と呼ばれる相性の問題がある。 そのため、使うマザーボードによっては別途ペリフェラル4ピン電源変換ケーブルを必要とする。

製品保証外になる

この製品には購入時から 2 年間の保証がついているけど、殻割りして使う分には当然ながら保証の対象外になる。 一般的な HDD は購入時に保証がついているので、その点は割り引いて考える必要があるはず。

(2020-04-18 追記)

その後、殻割りして自宅の NAS (Synology DS218+) のストレージになった。

f:id:momijiame:20200329191354j:plain
殻割り

(2020-07-08 追記)

少し前のセールから、以下のようなレビューが報告されている。

  • ヘリウム非充填モデル (WD80EMAZ-00M9AA0) がまた含まれるようになった
  • ヘリウム充填モデルの型番が WD80EMAZ-00WJTA0 から WD80EZAZ-11TDBA0 に変わった
    • ただし R/N は US7SAL080 で変更なし

参考

henjinkutsu.com

nyanshiba.hatenablog.com


  1. ただし、データセンター向けの製品が常に優れているというわけではない。騒音や発熱が大きかったり、動作温度領域が狭かったりと、家庭では扱いにくいこともある。

  2. 5 万円くらいする。

Python: 時系列データの交差検証と TimeSeriesSplit の改良について

一般的に、時系列データを扱うタスクでは過去のデータを使って未来のデータを予測することになる。 そのため、交差検証するときも過去のデータを使ってモデルを学習させた上で未来のデータを使って検証しなければいけない。 もし、未来のデータがモデルの学習データに混入すると、本来は利用できないデータにもとづいた楽観的な予測が得られてしまう。 今回は、そんな時系列データの交差検証と scikit-learn の TimeSeriesSplit の改良について書いてみる。

使った環境は次のとおり。

$ sw_vers           
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V                 
Python 3.8.1

下準備

あらかじめ、必要なパッケージをインストールしておく。

$ pip install scikit-learn pandas seaborn

scikit-learn の TimeSeriesSplit を使った時系列データの交差検証

scikit-learn には、時系列データの交差検証に使うための KFold 実装として TimeSeriesSplit というクラスがある。 このクラスは、データが時系列にもとづいてソートされているという前提でリークが起こらないようにデータを分割できる。

試しに、航空機の旅客数を記録したデータセットを使って動作を確かめてみよう。 以下にサンプルコードを示す。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from calendar import month_name

import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
from sklearn.model_selection import TimeSeriesSplit
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()


def main():
    # 航空機の旅客数を記録したデータセットを読み込む
    df = sns.load_dataset('flights')

    # 時系列のカラムを用意する
    month_name_mappings = {name: str(n).zfill(2) for n, name in
                           enumerate(month_name)}
    df['month'] = df['month'].apply(lambda x: month_name_mappings[x])
    df['year-month'] = df.year.astype(str) + '-' + df.month.astype(str)
    df['year-month'] = pd.to_datetime(df['year-month'], format='%Y-%m')

    # データの並び順を元に分割する
    folds = TimeSeriesSplit(n_splits=5)

    # 5 枚のグラフを用意する
    fig, axes = plt.subplots(5, 1, figsize=(12, 12))

    # 学習用のデータとテスト用のデータに分割するためのインデックス情報を得る
    for i, (train_index, test_index) in enumerate(folds.split(df)):
        # 生のインデックス
        print(f'index of train: {train_index}')
        print(f'index of test: {test_index}')
        print('----------')
        # 元のデータを描く
        sns.lineplot(data=df, x='year-month', y='passengers', ax=axes[i], label='original')
        # 学習用データを描く
        sns.lineplot(data=df.iloc[train_index], x='year-month', y='passengers', ax=axes[i], label='train')
        # テスト用データを描く
        sns.lineplot(data=df.iloc[test_index], x='year-month', y='passengers', ax=axes[i], label='test')

    # グラフを表示する
    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行する。 TimeSeriesSplit#split() から得られた添字が表示される。

$ python flights.py
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23]
index of test: [24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
index of test: [48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71]
index of test: [72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95]
index of test: [ 96  97  98  99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
 114 115 116 117 118 119]
----------
index of train: [  0   1   2   3   4   5   6   7   8   9  10  11  12  13  14  15  16  17
  18  19  20  21  22  23  24  25  26  27  28  29  30  31  32  33  34  35
  36  37  38  39  40  41  42  43  44  45  46  47  48  49  50  51  52  53
  54  55  56  57  58  59  60  61  62  63  64  65  66  67  68  69  70  71
  72  73  74  75  76  77  78  79  80  81  82  83  84  85  86  87  88  89
  90  91  92  93  94  95  96  97  98  99 100 101 102 103 104 105 106 107
 108 109 110 111 112 113 114 115 116 117 118 119]
index of test: [120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
 138 139 140 141 142 143]
----------

また、次のような折れ線グラフが得られる。

f:id:momijiame:20200327180723p:plain
TimeSeriesSplit を用いた分割

グラフを見ると、時系列で過去のデータが学習用、未来のデータが検証用として分割されていることがわかる。

TimeSeriesSplit の使いにくさについて

ただ、TimeSeriesSplit は使ってみると意外と面倒くさい。 なぜかというと、データが時系列にもとづいてソートされていないと使えないため。

たとえば、先ほどのサンプルコードに次のようなデータをシャッフルするコードを挿入してみよう。 こうすると、データが時系列順ではなくなる。

    # データの並び順をシャッフルする (時系列順ではなくなる)
    df = df.sample(frac=1.0, random_state=42)

コードを挿入した状態で実行してみよう。

$ python shuffledflights.py
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23]
index of test: [24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
index of test: [48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71]
index of test: [72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95]
index of test: [ 96  97  98  99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
 114 115 116 117 118 119]
----------
index of train: [  0   1   2   3   4   5   6   7   8   9  10  11  12  13  14  15  16  17
  18  19  20  21  22  23  24  25  26  27  28  29  30  31  32  33  34  35
  36  37  38  39  40  41  42  43  44  45  46  47  48  49  50  51  52  53
  54  55  56  57  58  59  60  61  62  63  64  65  66  67  68  69  70  71
  72  73  74  75  76  77  78  79  80  81  82  83  84  85  86  87  88  89
  90  91  92  93  94  95  96  97  98  99 100 101 102 103 104 105 106 107
 108 109 110 111 112 113 114 115 116 117 118 119]
index of test: [120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
 138 139 140 141 142 143]
----------

得られるグラフは次のとおり。 当たり前だけど、先ほどとは違って分割の仕方が時系列とは関係ないぐちゃぐちゃな状態になっている。 これでは、学習用データが検証用データよりも未来にあることがあるので、適切な交差検証ができない。

f:id:momijiame:20200327180824p:plain
TimeSeriesSplit を用いた分割 (時系列にもとづかないソート済み)

TimeSeriesSplit を改良してみる

そこで、TimeSeriesSplit を改良してみることにした。 具体的には、次のようなポイント。

  • DataFrame を受け取れる
  • 時系列にもとづいてソートされていなくても良い
  • 代わりに、特定のカラムを時系列の情報として指定できる

以下にサンプルコードを示す。 MovingWindowKFold というクラスがそれで、TimeSeriesSplit のラッパーとして振る舞うように作ってある。 MovingWindowKFold#split()DataFrame#iloc で使える添字のリストを返す。 DataFrame そのもののインデックスを返すことも考えたけど、まあそこは必要に応じて直してもらえば。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import sys
from calendar import month_name

import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
from sklearn.model_selection import TimeSeriesSplit
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()


class MovingWindowKFold(TimeSeriesSplit):
    """時系列情報が含まれるカラムでソートした iloc を返す KFold"""

    def __init__(self, ts_column, clipping=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 時系列データのカラムの名前
        self.ts_column = ts_column
        # 得られる添字のリストの長さを過去最小の Fold に揃えるフラグ
        self.clipping = clipping

    def split(self, X, *args, **kwargs):
        # 渡されるデータは DataFrame を仮定する
        assert isinstance(X, pd.DataFrame)

        # clipping が有効なときの長さの初期値
        train_fold_min_len, test_fold_min_len = sys.maxsize, sys.maxsize

        # 時系列のカラムを取り出す
        ts = X[self.ts_column]
        # 元々のインデックスを振り直して iloc として使える値 (0, 1, 2...) にする
        ts_df = ts.reset_index()
        # 時系列でソートする
        sorted_ts_df = ts_df.sort_values(by=self.ts_column)
        # スーパークラスのメソッドで添字を計算する
        for train_index, test_index in super().split(sorted_ts_df, *args, **kwargs):
            # 添字を元々の DataFrame の iloc として使える値に変換する
            train_iloc_index = sorted_ts_df.iloc[train_index].index
            test_iloc_index = sorted_ts_df.iloc[test_index].index

            if self.clipping:
                # TimeSeriesSplit.split() で返される Fold の大きさが徐々に大きくなることを仮定している
                train_fold_min_len = min(train_fold_min_len, len(train_iloc_index))
                test_fold_min_len = min(test_fold_min_len, len(test_iloc_index))

            yield list(train_iloc_index[-train_fold_min_len:]), list(test_iloc_index[-test_fold_min_len:])


def main():
    df = sns.load_dataset('flights')

    month_name_mappings = {name: str(n).zfill(2) for n, name in
                           enumerate(month_name)}
    df['month'] = df['month'].apply(lambda x: month_name_mappings[x])
    df['year-month'] = df.year.astype(str) + '-' + df.month.astype(str)
    df['year-month'] = pd.to_datetime(df['year-month'], format='%Y-%m')

    # データの並び順をシャッフルする
    df = df.sample(frac=1.0, random_state=42)

    # 特定のカラムを時系列としてソートした分割
    folds = MovingWindowKFold(ts_column='year-month', n_splits=5)

    fig, axes = plt.subplots(5, 1, figsize=(12, 12))

    # 元々のデータを時系列ソートした iloc が添字として得られる
    for i, (train_index, test_index) in enumerate(folds.split(df)):
        print(f'index of train: {train_index}')
        print(f'index of test: {test_index}')
        print('----------')
        sns.lineplot(data=df, x='year-month', y='passengers', ax=axes[i], label='original')
        sns.lineplot(data=df.iloc[train_index], x='year-month', y='passengers', ax=axes[i], label='train')
        sns.lineplot(data=df.iloc[test_index], x='year-month', y='passengers', ax=axes[i], label='test')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。 もちろん、先ほどと同じようにデータはランダムにシャッフルされている。

$ python movwin.py
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49]
index of test: [47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60]
----------
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60]
index of test: [113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
----------
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
index of test: [94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
----------
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139, 94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
index of test: [40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21]
----------
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139, 94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54, 40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21]
index of test: [95, 136, 99, 85, 29, 18, 115, 39, 123, 86, 130, 79, 6, 13, 127, 70, 108, 64, 120, 69, 59, 106, 67, 137]
----------

得られるグラフは次のとおり。 ランダムにシャッフルされたデータでも、ちゃんと時系列でデータが分割されていることがわかる。

f:id:momijiame:20200327181559p:plain
MovingWindowKFold を用いた分割

ちなみに、学習データを等幅に切りそろえる機能も用意した。 この手法に世間的にはなんて名前がついているのかわからないけど、とりあえず clipping オプションを有効にすれば使える。

    folds = MovingWindowKFold(ts_column='year-month', clipping=True, n_splits=5)

有効にした状態で実行するとこんな感じ。 リストの長さが揃っている。

$ python movwin.py
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49]
index of test: [47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60]
----------
index of train: [47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60]
index of test: [113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
----------
index of train: [113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
index of test: [94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
----------
index of train: [94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
index of test: [40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21]
----------
index of train: [40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21]
index of test: [95, 136, 99, 85, 29, 18, 115, 39, 123, 86, 130, 79, 6, 13, 127, 70, 108, 64, 120, 69, 59, 106, 67, 137]
----------

グラフにするとこうなる。

f:id:momijiame:20200327181632p:plain
MovingWindowKFold を用いた分割 (クリッピング)

時系列版 train_test_split() も作ってみる

あとは、データが大きかったり Nested CV する状況ならホールドアウトも必要だろうなーと思うので train_test_split() も作ってみた。 サンプルコードは次のとおり。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from calendar import month_name

import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
from pandas.plotting import register_matplotlib_converters
from sklearn.model_selection import train_test_split

register_matplotlib_converters()


def ts_train_test_split(df, ts_column, **options):
    """時系列情報が含まれるカラムでソートした iloc を返す Hold-Out"""
    # シャッフルしない
    options['shuffle'] = False
    # 時系列のカラムを取り出す
    ts = df[ts_column]
    # 元々のインデックスを振り直して iloc として使える値 (0, 1, 2...) にする
    ts_df = ts.reset_index()
    # 時系列でソートする
    sorted_ts_df = ts_df.sort_values(by=ts_column)
    # 添字を計算する
    train_index, test_index = train_test_split(sorted_ts_df.index, **options)
    return list(train_index), list(test_index)

def main():
    df = sns.load_dataset('flights')

    month_name_mappings = {name: str(n).zfill(2) for n, name in
                           enumerate(month_name)}
    df['month'] = df['month'].apply(lambda x: month_name_mappings[x])
    df['year-month'] = df.year.astype(str) + '-' + df.month.astype(str)
    df['year-month'] = pd.to_datetime(df['year-month'], format='%Y-%m')

    # データの並び順をシャッフルする
    df = df.sample(frac=1.0, random_state=42)

    # 学習データとテストデータに分割する
    train_index, test_index = ts_train_test_split(df, ts_column='year-month', test_size=0.33)

    # 添字
    print(f'index of train: {train_index}')
    print(f'index of test: {test_index}')

    # グラフに描いてみる
    sns.lineplot(data=df.iloc[train_index], x='year-month', y='passengers', label='train')
    sns.lineplot(data=df.iloc[test_index], x='year-month', y='passengers', label='test')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python tsholtout.py
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139, 94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
index of test: [40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21, 95, 136, 99, 85, 29, 18, 115, 39, 123, 86, 130, 79, 6, 13, 127, 70, 108, 64, 120, 69, 59, 106, 67, 137]

次のようなグラフが得られる。

f:id:momijiame:20200327181748p:plain
時系列データの Hold-Out 分割 (1)

train_test_split() 関数のラッパーとして動作するので、オプションはそのまま使える。 たとえば、検証用データの比率を 50% まで増やしてみよう。

    train_index, test_index = ts_train_test_split(df, ts_column='year-month', test_size=0.5)

上記のコードにしたモジュールを実行してみる。

$ python tsholtout.py
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
index of test: [94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54, 40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21, 95, 136, 99, 85, 29, 18, 115, 39, 123, 86, 130, 79, 6, 13, 127, 70, 108, 64, 120, 69, 59, 106, 67, 137]

得られるグラフは次のとおり。

f:id:momijiame:20200327181811p:plain
時系列データの Hold-Out 分割 (2)

いじょう。

Kaggleで勝つデータ分析の技術

Kaggleで勝つデータ分析の技術

Python: Luigi のパラメータ爆発問題について

Luigi は、Python を使って実装された、バッチ処理のパイプラインを扱うためのフレームワーク。 Luigi でパイプラインを定義するときは、基本的には個別のタスクを依存関係でつないでいくことになる。 このとき、扱う処理によってはパイプラインは長大になると共に扱うパラメータの数も増える。 そうすると、依存関係で上流にあるタスクに対して、どのようにパラメータを渡すか、という問題が生じる。

この問題は、公式のドキュメントではパラメータ爆発 (parameter explosion) と表現されている。

luigi.readthedocs.io

今回は、このパラメータ爆発問題を解決する方法について。 なお、基本的には上記のドキュメントに解決方法が書いてあるので、そちらを読むでも良い。

使った環境は次のとおり。

$ sw_vers           
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V                 
Python 3.8.1
$ pip list | grep -i luigi   
luigi           2.8.12 

下準備

事前の準備として、Luigi をインストールしておく。

$ pip install luigi

パラメータ爆発によって生じるパラメータのバケツリレー

まずは、パラメータ爆発によって何が引き起こされるのかについて解説する。 たとえば、タスクとして UpstreamTaskDownstreamTask があるとする。 DownstreamTaskUpstreamTask に依存していて、それぞれ動作に必要なパラメータがある。

このとき、愚直にパイプラインを定義すると、次のようなコードになる。 DownstreamTask では、UpstreamTask に必要なパラメータを二重に定義した上で、タスクを生成するときに渡している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""
    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()

    # 上流タスクのパラメータ
    # FIXME: 同じ内容を二重に定義してしまっている
    upstream_task_param = luigi.Parameter()

    def requires(self):
        """依存しているタスクを示すメソッド"""
        # FIXME: 依存しているタスクを生成するときにパラメータのバケツリレーが生じている
        yield UpstreamTask(self.upstream_task_param)


def main():
    # 実行したい下流タスクにすべてのパラメータを渡してバケツリレーする
    luigi.run(cmdline_args=['DownstreamTask',
                            '--downstream-task-param=downstream',
                            '--upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(downstream_task_param=downstream, upstream_task_param=upstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリーを見ると、ちゃんと DownstreamTask に渡したパラメータが UpstreamTask に渡されている。 たしかに、問題なく動いてはいる。 しかし、これではパラメータが二重管理になっているし書き間違いも起きやすい。 もし、上流のタスクが追加されたら下流のタスクはすべて修正が必要になってしまう。 タスクの依存関係とパラメータが少ないうちは何とかなっても、規模が大きくなれば早々に破綻するのは明らかだろう。

クラス名を指定してパラメータを渡す

ここからは、上記の問題を解決する方法について書いていく。 まず、ひとつ目のやり方はタスクの依存関係を示すときにパラメータを指定しないというもの。 こうすると、不足しているパラメータはコマンドラインやコンフィグファイル経由で渡すことになる。

以下にサンプルコードを示す。 先ほどとの違いは DownstreamTaskrequires() メソッドのところ。 メソッドで UpstreamTask のインスタンスを返すときに、パラメータをまったく指定していない。 代わりに、実行するときにコマンドラインからそれぞれのクラスの名前を指定してパラメータを渡している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""
    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()

    def requires(self):
        """依存しているタスクを示すメソッド"""
        # 依存しているタスクを生成するときにパラメータを渡さない
        # 代わりにコマンドラインの引数でクラス名を指定してパラメータを渡す
        yield UpstreamTask()


def main():
    # 実行するときにクラス名を指定することで上流タスクに直接パラメータを渡せる
    luigi.run(cmdline_args=['DownstreamTask',
                            '--DownstreamTask-downstream-task-param=downstream',
                            '--UpstreamTask-upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(downstream_task_param=downstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリからは、ピンポイントにそれぞれのクラスにパラメータが渡されていることがわかる。 ただし、この方法は下流にあるタスクが上流のタスクのパラメータを利用するときには使えない

@requires デコレータを使う

次に紹介するのは luigi.util.requires デコレータを使うやり方。 このデコレータを使ってタスクのクラスを修飾すると、自動的に必要なパラメータとメソッドを追加してくれる。

以下にサンプルコードを示す。 DownstreamTask@requires デコレータを使って UpstreamTask を引数に修飾している。 こうすると、指定したクラスで必要となるパラメータを自動的に追加すると共に、requires() メソッドでクラスを指定したのと同じことになる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.util import requires


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""

    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


# 他のタスクに依存していることをデコレータで示す
# 必要なパラメータも自動的に定義されるので二重管理がなくなる
@requires(UpstreamTask)
class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()


def main():
    luigi.run(cmdline_args=['DownstreamTask',
                            '--downstream-task-param=downstream',
                            '--upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(upstream_task_param=upstream, downstream_task_param=downstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリを見ると、最初のパターンと同じように UpstreamTask で必要とするパラメータが DownstreamTask にも重複して渡されている。 これなら下流のタスクでも上流のタスクのパラメータが利用できる。

@inherits デコレータを使う

もうひとつのやり方は @inherits デコレータを使うもので、アプローチは @requires デコレータと似ている。 ただし、こちらは本来の目的とはちょっと違うものを無理やり転用して使っている感じがある。

次のサンプルコードでは DownstreamTask@inherits デコレータで UpstreamTask を引数に修飾している。 こうすると DownstreamTaskUpstreamTask と同じパラメータを持つようになる。 ただし、@requires デコレータと違って依存関係はセットされない。 そのため、自分で requires() メソッドに必要なタスクを指定することになる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.util import inherits


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""

    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


@inherits(UpstreamTask)
class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()

    def requires(self):
        """依存しているタスクを示すメソッド"""
        # clone() することで、自身のパラメータを適用したタスクが得られる
        # self.clone_parent() でも良い
        return self.clone(UpstreamTask)


def main():
    luigi.run(cmdline_args=['DownstreamTask',
                            '--downstream-task-param=downstream',
                            '--upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(upstream_task_param=upstream, downstream_task_param=downstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリを見ると、ちゃんとパラメータを適用したタスクが実行されていることがわかる。

まとめ

今回は、パラメータ爆発問題を回避するいくつかの方法を試してみた。

  • タスクの依存関係を requires() メソッドで示すときにパラメータをブランクにする
  • @requires デコレータを使う
  • @inherits デコレータを使う

いじょう。

Linuxで動かしながら学ぶTCP/IPネットワーク入門

Linuxで動かしながら学ぶTCP/IPネットワーク入門

  • 作者:もみじあめ
  • 発売日: 2020/03/06
  • メディア: オンデマンド (ペーパーバック)

Python: Luigi のイベントハンドラを試してみる

今回は、Luigi でタスクの開始や成功・失敗などのときに発火するイベントハンドラを扱ってみる。 なお、Luigi はバッチ処理などのパイプラインを組むのに使われるソフトウェアのこと。 基本的な使い方については以下を参照してほしい。

blog.amedama.jp

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V        
Python 3.7.6

下準備

下準備として Luigi をインストールしておく。

$ pip install luigi

イベントハンドラを登録する

早速だけど以下にサンプルコードを示す。 Luigi では、デコレータを使ってイベントが発生したときに実行したい処理を登録できる。 サンプルコードでは、タスクが開始されたタイミングと成功・失敗したタイミングで実行される処理を登録している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleTask(luigi.Task):
    """サンプルのタスク"""

    def run(self):
        print(f'run: {self}')


@luigi.Task.event_handler(luigi.Event.START)
def on_start(task):
    """タスクを開始したときのハンドラ"""
    print(f'on_start: {task}')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    """タスクが成功したときのハンドラ"""
    print(f'on_success: {task}')


@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    """タスクが失敗したときのハンドラ"""
    print(f'on_failure: {task} with {exception}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTask, local_scheduler=True)


if __name__ == '__main__':
    main()

上記のサンプルコードを実行してみよう。 読みやすくなるように標準エラー出力は表示していない。

$ python evhandler.py 2>/dev/null
on_start: ExampleTask()
run: ExampleTask()
on_success: ExampleTask()

上記を見ると、ちゃんとタスクの開始と成功したタイミングでイベントハンドラが実行されていることがわかる。

意図的にタスクを失敗させてみる

試しに、意図的にタスクを失敗させてみよう。 次のサンプルコードではタスクで例外を上げている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleTask(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # 例外を上げる
        raise Exception('Oops!')


@luigi.Task.event_handler(luigi.Event.START)
def on_start(task):
    print(f'on_start: {task}')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    print(f'on_success: {task}')


@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    print(f'on_failure: {task} with {exception}')


def main():
    luigi.run(main_task_cls=ExampleTask, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python evhandler.py 2>/dev/null
on_start: ExampleTask()
run: ExampleTask()
on_failure: ExampleTask() with Oops!

今度は、失敗したときのイベントハンドラが実行されていることがわかる。 また、同時に例外オブジェクトもハンドラに渡されている。

一連の複数のタスクを実行してみる

続いては、依存関係をもった複数のタスクが実行されたときの挙動を確認しておく。 次のサンプルコードでは、ExampleTaskA と、それに依存した ExampleTaskB というタスクを定義している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class ExampleTaskA(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # モックの出力にメッセージを書き込む
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')

    def output(self):
        # オンメモリのモックを出力にする
        return MockTarget('mock')


class ExampleTaskB(luigi.Task):

    def requires(self):
        # このタスクの実行には事前に以下のタスクを実行する必要がある
        return ExampleTaskA()

    def run(self):
        print(f'run: {self}')


@luigi.Task.event_handler(luigi.Event.START)
def on_start(task):
    """タスクを開始したときのハンドラ"""
    print(f'on_start: {task}')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    """タスクが成功したときのハンドラ"""
    print(f'on_success: {task}')


@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    """タスクが失敗したときのハンドラ"""
    print(f'on_failure: {task} with {exception}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTaskB, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみる。

$ python evhandler.py 2>/dev/null
on_start: ExampleTaskA()
run: ExampleTaskA()
on_success: ExampleTaskA()
on_start: ExampleTaskB()
run: ExampleTaskB()
on_success: ExampleTaskB()

ちゃんと、それぞれのタスクが開始・成功したタイミングでイベントハンドラが実行されているようだ。

特定のタスクだけで実行したいハンドラを登録する

ここまでのイベントハンドラは、すべてのタスクに共通して実行されるものだった。 続いては、特定のタスクだけで実行されるイベントハンドラを登録してみる。

次のサンプルコードでは ExampleTaskB にだけタスクが成功したときに実行されるイベントハンドラを登録している。 Luigi の Task クラスには event_handler() というクラスメソッドが定義されていて、これを使ってタスクに対してイベントハンドラを登録できるようになっている。 そこで、定義したタスクの event_handler() メソッドを使えば、そのタスクだけで実行されるイベントハンドラを登録できる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class ExampleTaskA(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # モックの出力にメッセージを書き込む
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')

    def output(self):
        # オンメモリのモックを出力にする
        return MockTarget('mock')


class ExampleTaskB(luigi.Task):

    def requires(self):
        # このタスクの実行には事前に以下のタスクを実行する必要がある
        return ExampleTaskA()

    def run(self):
        print(f'run: {self}')


# 特定のタスクのイベントをハンドルしたいときはクラスメソッドの eventhandler() を使う
@ExampleTaskB.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    print(f'on_success: {task}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTaskB, local_scheduler=True)


if __name__ == '__main__':
    main()

あるいは、単純にタスクに各イベントハンドラがあらかじめ定義されているので、それをオーバーライドしても良い。 おそらく、ほとんどのユースケースではこちらを使えば問題ないはず。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class ExampleTaskA(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # モックの出力にメッセージを書き込む
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')

    def output(self):
        # オンメモリのモックを出力にする
        return MockTarget('mock')


class ExampleTaskB(luigi.Task):

    def requires(self):
        # このタスクの実行には事前に以下のタスクを実行する必要がある
        return ExampleTaskA()

    def run(self):
        print(f'run: {self}')

    # 特定のタスクのイベントをハンドルしたいときはメソッドをオーバーライドするだけ
    def on_success(task):
        print(f'on_success: {task}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTaskB, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python evhandler.py 2>/dev/null
run: ExampleTaskA()
run: ExampleTaskB()
on_success: ExampleTaskB()

すると、今度は ExampleTaskB の成功時だけイベントハンドラが実行されていることがわかる。

めでたしめでたし。