CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: 中心化移動平均 (CMA: Centered Moving Average) について

以前から移動平均 (MA: Moving Average) という手法自体は知っていたけど、中心化移動平均 (CMA: Centered Moving Average) というものがあることは知らなかった。 一般的な移動平均である後方移動平均は、データの対応関係が原系列に対して遅れてしまう。 そこで、中心化移動平均という手法を使うことで遅れをなくすらしい。 この手法は、たとえば次のような用途でひとつのやり方として使われているようだ。

  • 不規則変動の除去
  • 季節変動の除去

使った環境は次のとおり。

$ sw_vers  
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V
Python 3.7.7

下準備

下準備として、必要なパッケージをインストールしておく。

$ pip install pandas seaborn

インストールできたら Python のインタプリタを起動する。

$ python

起動できたら、1 ~ 12 までの数字が入った Series のオブジェクトを作る。

>>> import pandas as pd
>>> x = pd.Series(range(1, 12 + 1))
>>> x
0      1
1      2
2      3
3      4
4      5
5      6
6      7
7      8
8      9
9     10
10    11
11    12
dtype: int64

上記のオブジェクトを使って移動平均の計算方法について学んでいく。

(後方) 移動平均 (MA: Moving Average)

はじめに、一般的な移動平均である後方移動平均から試す。 Pandas では rolling() メソッドを使うことで移動平均を計算できる。 以下では 3 点の要素で平均をとる 3 点移動平均を計算している。

>>> backward_3p_ma = x.rolling(window=3).mean()
>>> backward_3p_ma
0      NaN
1      NaN
2      2.0
3      3.0
4      4.0
5      5.0
6      6.0
7      7.0
8      8.0
9      9.0
10    10.0
11    11.0
dtype: float64

この計算では、たとえばインデックスで 2 に入る要素は、次のように計算される。

backward_3p_ma[2] = (x[0] + x[1] + x[2]) / 3

つまり、自分よりも後ろの値を使って平均を計算することになる。

この計算方法では、原系列との対応関係を考えたとき次のように 1 つ分の遅れが出る。

>>> pd.concat([x, backward_3p_ma], axis=1)
     0     1
0    1   NaN
1    2   NaN
2    3   2.0
3    4   3.0
4    5   4.0
5    6   5.0
6    7   6.0
7    8   7.0
8    9   8.0
9   10   9.0
10  11  10.0
11  12  11.0

中心化移動平均 (CMA: Centered Moving Average)

そこで、遅れの影響を取り除いたものが中心化移動平均と呼ばれるらしい。 たとえば、平均をとる要素数が奇数のときは、単に rolling() メソッドの center オプションを有効にするだけで良い。

>>> centered_3p_ma = x.rolling(window=3, center=True).mean()
>>> centered_3p_ma
0      NaN
1      2.0
2      3.0
3      4.0
4      5.0
5      6.0
6      7.0
7      8.0
8      9.0
9     10.0
10    11.0
11     NaN
dtype: float64

このオプションが有効だと、たとえばインデックスで 2 に入る要素は、次のように計算される。 つまり、自分の前後にある値を使って平均を計算することになる。

centered_3p_ma[2] = (x[1] + x[2] + x[3]) / 3

現系列との対応関係を確認すると、このやり方では遅れがなくなっている。

>>> pd.concat([x, centered_3p_ma], axis=1)
     0     1
0    1   NaN
1    2   2.0
2    3   3.0
3    4   4.0
4    5   5.0
5    6   6.0
6    7   7.0
7    8   8.0
8    9   9.0
9   10  10.0
10  11  11.0
11  12   NaN

ただし、このオプションは実装的には単に後方移動平均を計算した上で shift() しているだけに過ぎないらしい。 ようするに、こういうこと。

>>> backward_3p_ma_shifted = x.rolling(window=3).mean().shift(-1)
>>> pd.concat([x, backward_3p_ma_shifted], axis=1)
     0     1
0    1   NaN
1    2   2.0
2    3   3.0
3    4   4.0
4    5   5.0
5    6   6.0
6    7   7.0
7    8   8.0
8    9   9.0
9   10  10.0
10  11  11.0
11  12   NaN

そのため、平均をとる要素数が偶数のときに困ったことが起こる。

平均をとる要素数が偶数のときの問題点について

試しに、平均をとる要素数を 4 点に増やして、そのまま計算してみよう。

>>> centerd_4p_ma = x.rolling(window=4, center=True).mean() 
>>> centerd_4p_ma
0      NaN
1      NaN
2      2.5
3      3.5
4      4.5
5      5.5
6      6.5
7      7.5
8      8.5
9      9.5
10    10.5
11     NaN
dtype: float64

すると、計算結果に端数が出ている。

原系列と比較すると、対応関係に 0.5 の遅れがあることがわかる。

>>> pd.concat([x, centerd_4p_ma], axis=1)
     0     1
0    1   NaN
1    2   NaN
2    3   2.5
3    4   3.5
4    5   4.5
5    6   5.5
6    7   6.5
7    8   7.5
8    9   8.5
9   10   9.5
10  11  10.5
11  12   NaN

つまり、中心化移動平均では平均をとる要素数が偶数と奇数のときで計算方法を変えなければいけない。

要素数が偶数のときの計算方法

要素数が偶数のときの中心化移動平均は、計算が 2 段階に分かれている。 はじめに、後方移動平均をそのまま計算する。

>>> backward_4p_ma =x.rolling(window=4).mean()
>>> backward_4p_ma
0      NaN
1      NaN
2      NaN
3      2.5
4      3.5
5      4.5
6      5.5
7      6.5
8      7.5
9      8.5
10     9.5
11    10.5
dtype: float64

その上で、移動平均に使った要素数の半分だけデータをずらし、もう一度要素数 2 で平均を取り直す。

centerd_4p_ma = backward_4p_ma.shift(-2).rolling(window=2).mean()

原系列との対応関係を確認すると、遅れが解消していることがわかる。

>>> pd.concat([x, centerd_4p_ma], axis=1)
     0     1
0    1   NaN
1    2   NaN
2    3   3.0
3    4   4.0
4    5   5.0
5    6   6.0
6    7   7.0
7    8   8.0
8    9   9.0
9   10  10.0
10  11   NaN
11  12   NaN

別のデータで中心化移動平均を計算してみる

もうちょっとちゃんとしたデータでも計算してみよう。 次のサンプルコードでは、旅客機の乗客数の推移に対して 12 点で中心化移動平均を計算している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from calendar import month_name

import seaborn as sns
from matplotlib import pyplot as plt
import pandas as pd
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()


def main():
    # 航空機の乗客を記録したデータセットを読み込む
    df = sns.load_dataset('flights')
    # 月の名前を数字に直す
    month_name_mappings = {name: str(n).zfill(2) for n, name in
                           enumerate(month_name)}
    df['month'] = df['month'].apply(lambda x: month_name_mappings[x])
    # 年と月を結合したカラムを用意する
    df['year-month'] = df.year.astype(str) + '-' + df.month.astype(str)
    df['year-month'] = pd.to_datetime(df['year-month'], format='%Y-%m')

    # 原系列
    sns.lineplot(data=df, x='year-month', y='passengers', label='original')
    # 中心化移動平均
    df['passengers-cma'] = df['passengers'].rolling(window=12).mean().shift(-6).rolling(2).mean()
    sns.lineplot(data=df, x='year-month', y='passengers-cma', label='CMA')

    # グラフを表示する
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみる。

$ python flightscma.py

得られるグラフが次のとおり。

f:id:momijiame:20200401184339p:plain
旅客機の乗客数データに対して 12 点中心化移動平均を計算する

ここまでデータ点数が多いと、正直 0.5 の遅れとか言われてもよくわからないけど、これで計算できているはず。

エンタープライズ向けヘリウム充填 HDD が安く手に入ることがあるらしい

この製品は、一昨年あたりから「HDD ガチャ」という愛称で親しまれているようだ。 今回は、Amazon セールで安くなっていたので実際にガチャを引いてみることにした。 このエントリは、その備忘録になる。

HDD ガチャとは

Western Digital (WD) が販売している外付けハードディスク Elements Desktop シリーズの大容量モデル (8TB~) のこと。

この製品がガチャと呼ばれている理由は、8TB に関して内部の HDD に複数のモデルが使われているところらしい。 具体的には、次の 2 種類がこれまでに確認されている。

  • WD80EMAZ-00WJTA0
    • Ultrastar DC HC510 の廉価版
    • ヘリウム充填モデル
  • WD80EMAZ-00M9AA0
    • Ultrastar DC HC320 の廉価版

Ultrastar DC HC シリーズは、Western Digital に買収された HGST が製造しているエンタープライズ向けの製品になる。 つまり、この製品はコンシューマ向けにもかかわらず、内部にはエンタープライズ向けの HDD (の廉価版) が使われていることになる。 ようするに、企業のデータセンターの中でサーバに使うような高信頼モデル 1 ということ。

ガチャでいう「当たり」は HC510 の廉価版であるヘリウム充填モデルになる。 しかも、ウワサによると昨年あたりからの出荷分はすべて中身が「当たり」のヘリウム充填モデルになっているらしい。 また、10TB 以上に関してはヘリウム充填モデルしか確認されておらず、すべて「当たり」といえる。

そして、ガチャが楽しまれている理由は、使われている内部の HDD の珍しさからというわけではない。 何なら Ultrastar DC HC も、お金さえ出せば買うことができる 2 。 その主な理由は値段で、今回 Amazon のセールでは 8TB のモデルが約 1.7 万円になっていた。 また、セールでなくても約 2 万円で買うことができる。 そして、同じ容量だとコンシューマ向け NAS 用途の WD Red シリーズで約 2.5 万円する。

とはいえ、内部で使われているハードディスクは製品の仕様になっているわけではない。 ある時から異なるモデルが使われることも十分に考えられるため、実際のところは買ってみないとわからない。 そうした意味では、ガチャ要素はいまだ健在といえるだろう。

ガチャを引いてみた

そんなわけで、前置きが長くなったけど今回はガチャを引いてみた。

以下が届いた製品のパッケージで、至ってシンプル。

f:id:momijiame:20200328144335j:plain
製品のパッケージ

付属品は本体・電源アダプタ・USB ケーブルの 3 つ。

f:id:momijiame:20200328144615j:plain
製品の付属品

付属の USB ケーブルでつないで CrystalDiskInfo を確認した画面が次のとおり。 この型番は、ガチャでいう「当たり」のヘリウム充填モデルだ。

f:id:momijiame:20200328194002p:plain
CrystalDiskInfo

せっかくなので中身の HDD も確認してみよう。 筐体のカバーはツメで固定されているので、使っていないカードなどを差し込んでツメを外していく。

f:id:momijiame:20200328165626j:plain
隙間にカードなどを差し込んでツメを外す

開封すると、こんな感じ。 たしかに、Ultrastar DC HC の筐体に酷似しているようだ。

f:id:momijiame:20200328160420j:plain
中身の 3.5 インチハードディスク

注意点

この製品は、単なる USB 接続の外付けハードディスクとして使う分には何の懸念もない。 ただ単にエンタープライズ向けの製品が安く手に入るというだけ。 しかし、中のハードディスクを取り出して SATA で使うとき (殻割り) には、いくつかの注意点がある。

3.3V 問題

この製品は、通称 3.3V 問題と呼ばれる相性の問題がある。 そのため、使うマザーボードによっては別途ペリフェラル4ピン電源変換ケーブルを必要とする。

製品保証外になる

この製品には購入時から 2 年間の保証がついているけど、殻割りして使う分には当然ながら保証の対象外になる。 一般的な HDD は購入時に保証がついているので、その点は割り引いて考える必要があるはず。

参考

henjinkutsu.com

nyanshiba.hatenablog.com


  1. ただし、データセンター向けの製品が常に優れているというわけではない。騒音や発熱が大きかったり、動作温度領域が狭かったりと、家庭では扱いにくいこともある。

  2. 5 万円くらいする。

Python: 時系列データの交差検証と TimeSeriesSplit の改良について

一般的に、時系列データを扱うタスクでは過去のデータを使って未来のデータを予測することになる。 そのため、交差検証するときも過去のデータを使ってモデルを学習させた上で未来のデータを使って検証しなければいけない。 もし、未来のデータがモデルの学習データに混入すると、本来は利用できないデータにもとづいた楽観的な予測が得られてしまう。 今回は、そんな時系列データの交差検証と scikit-learn の TimeSeriesSplit の改良について書いてみる。

使った環境は次のとおり。

$ sw_vers           
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V                 
Python 3.8.1

下準備

あらかじめ、必要なパッケージをインストールしておく。

$ pip install scikit-learn pandas seaborn

scikit-learn の TimeSeriesSplit を使った時系列データの交差検証

scikit-learn には、時系列データの交差検証に使うための KFold 実装として TimeSeriesSplit というクラスがある。 このクラスは、データが時系列にもとづいてソートされているという前提でリークが起こらないようにデータを分割できる。

試しに、航空機の旅客数を記録したデータセットを使って動作を確かめてみよう。 以下にサンプルコードを示す。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from calendar import month_name

import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
from sklearn.model_selection import TimeSeriesSplit
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()


def main():
    # 航空機の旅客数を記録したデータセットを読み込む
    df = sns.load_dataset('flights')

    # 時系列のカラムを用意する
    month_name_mappings = {name: str(n).zfill(2) for n, name in
                           enumerate(month_name)}
    df['month'] = df['month'].apply(lambda x: month_name_mappings[x])
    df['year-month'] = df.year.astype(str) + '-' + df.month.astype(str)
    df['year-month'] = pd.to_datetime(df['year-month'], format='%Y-%m')

    # データの並び順を元に分割する
    folds = TimeSeriesSplit(n_splits=5)

    # 5 枚のグラフを用意する
    fig, axes = plt.subplots(5, 1, figsize=(12, 12))

    # 学習用のデータとテスト用のデータに分割するためのインデックス情報を得る
    for i, (train_index, test_index) in enumerate(folds.split(df)):
        # 生のインデックス
        print(f'index of train: {train_index}')
        print(f'index of test: {test_index}')
        print('----------')
        # 元のデータを描く
        sns.lineplot(data=df, x='year-month', y='passengers', ax=axes[i], label='original')
        # 学習用データを描く
        sns.lineplot(data=df.iloc[train_index], x='year-month', y='passengers', ax=axes[i], label='train')
        # テスト用データを描く
        sns.lineplot(data=df.iloc[test_index], x='year-month', y='passengers', ax=axes[i], label='test')

    # グラフを表示する
    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行する。 TimeSeriesSplit#split() から得られた添字が表示される。

$ python flights.py
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23]
index of test: [24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
index of test: [48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71]
index of test: [72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95]
index of test: [ 96  97  98  99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
 114 115 116 117 118 119]
----------
index of train: [  0   1   2   3   4   5   6   7   8   9  10  11  12  13  14  15  16  17
  18  19  20  21  22  23  24  25  26  27  28  29  30  31  32  33  34  35
  36  37  38  39  40  41  42  43  44  45  46  47  48  49  50  51  52  53
  54  55  56  57  58  59  60  61  62  63  64  65  66  67  68  69  70  71
  72  73  74  75  76  77  78  79  80  81  82  83  84  85  86  87  88  89
  90  91  92  93  94  95  96  97  98  99 100 101 102 103 104 105 106 107
 108 109 110 111 112 113 114 115 116 117 118 119]
index of test: [120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
 138 139 140 141 142 143]
----------

また、次のような折れ線グラフが得られる。

f:id:momijiame:20200327180723p:plain
TimeSeriesSplit を用いた分割

グラフを見ると、時系列で過去のデータが学習用、未来のデータが検証用として分割されていることがわかる。

TimeSeriesSplit の使いにくさについて

ただ、TimeSeriesSplit は使ってみると意外と面倒くさい。 なぜかというと、データが時系列にもとづいてソートされていないと使えないため。

たとえば、先ほどのサンプルコードに次のようなデータをシャッフルするコードを挿入してみよう。 こうすると、データが時系列順ではなくなる。

    # データの並び順をシャッフルする (時系列順ではなくなる)
    df = df.sample(frac=1.0, random_state=42)

コードを挿入した状態で実行してみよう。

$ python shuffledflights.py
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23]
index of test: [24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
index of test: [48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71]
index of test: [72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95]
index of test: [ 96  97  98  99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
 114 115 116 117 118 119]
----------
index of train: [  0   1   2   3   4   5   6   7   8   9  10  11  12  13  14  15  16  17
  18  19  20  21  22  23  24  25  26  27  28  29  30  31  32  33  34  35
  36  37  38  39  40  41  42  43  44  45  46  47  48  49  50  51  52  53
  54  55  56  57  58  59  60  61  62  63  64  65  66  67  68  69  70  71
  72  73  74  75  76  77  78  79  80  81  82  83  84  85  86  87  88  89
  90  91  92  93  94  95  96  97  98  99 100 101 102 103 104 105 106 107
 108 109 110 111 112 113 114 115 116 117 118 119]
index of test: [120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
 138 139 140 141 142 143]
----------

得られるグラフは次のとおり。 当たり前だけど、先ほどとは違って分割の仕方が時系列とは関係ないぐちゃぐちゃな状態になっている。 これでは、学習用データが検証用データよりも未来にあることがあるので、適切な交差検証ができない。

f:id:momijiame:20200327180824p:plain
TimeSeriesSplit を用いた分割 (時系列にもとづかないソート済み)

TimeSeriesSplit を改良してみる

そこで、TimeSeriesSplit を改良してみることにした。 具体的には、次のようなポイント。

  • DataFrame を受け取れる
  • 時系列にもとづいてソートされていなくても良い
  • 代わりに、特定のカラムを時系列の情報として指定できる

以下にサンプルコードを示す。 MovingWindowKFold というクラスがそれで、TimeSeriesSplit のラッパーとして振る舞うように作ってある。 MovingWindowKFold#split()DataFrame#iloc で使える添字のリストを返す。 DataFrame そのもののインデックスを返すことも考えたけど、まあそこは必要に応じて直してもらえば。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import sys
from calendar import month_name

import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
from sklearn.model_selection import TimeSeriesSplit
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()


class MovingWindowKFold(TimeSeriesSplit):
    """時系列情報が含まれるカラムでソートした iloc を返す KFold"""

    def __init__(self, ts_column, clipping=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 時系列データのカラムの名前
        self.ts_column = ts_column
        # 得られる添字のリストの長さを過去最小の Fold に揃えるフラグ
        self.clipping = clipping

    def split(self, X, *args, **kwargs):
        # 渡されるデータは DataFrame を仮定する
        assert isinstance(X, pd.DataFrame)

        # clipping が有効なときの長さの初期値
        train_fold_min_len, test_fold_min_len = sys.maxsize, sys.maxsize

        # 時系列のカラムを取り出す
        ts = X[self.ts_column]
        # 元々のインデックスを振り直して iloc として使える値 (0, 1, 2...) にする
        ts_df = ts.reset_index()
        # 時系列でソートする
        sorted_ts_df = ts_df.sort_values(by=self.ts_column)
        # スーパークラスのメソッドで添字を計算する
        for train_index, test_index in super().split(sorted_ts_df, *args, **kwargs):
            # 添字を元々の DataFrame の iloc として使える値に変換する
            train_iloc_index = sorted_ts_df.iloc[train_index].index
            test_iloc_index = sorted_ts_df.iloc[test_index].index

            if self.clipping:
                # TimeSeriesSplit.split() で返される Fold の大きさが徐々に大きくなることを仮定している
                train_fold_min_len = min(train_fold_min_len, len(train_iloc_index))
                test_fold_min_len = min(test_fold_min_len, len(test_iloc_index))

            yield list(train_iloc_index[-train_fold_min_len:]), list(test_iloc_index[-test_fold_min_len:])


def main():
    df = sns.load_dataset('flights')

    month_name_mappings = {name: str(n).zfill(2) for n, name in
                           enumerate(month_name)}
    df['month'] = df['month'].apply(lambda x: month_name_mappings[x])
    df['year-month'] = df.year.astype(str) + '-' + df.month.astype(str)
    df['year-month'] = pd.to_datetime(df['year-month'], format='%Y-%m')

    # データの並び順をシャッフルする
    df = df.sample(frac=1.0, random_state=42)

    # 特定のカラムを時系列としてソートした分割
    folds = MovingWindowKFold(ts_column='year-month', n_splits=5)

    fig, axes = plt.subplots(5, 1, figsize=(12, 12))

    # 元々のデータを時系列ソートした iloc が添字として得られる
    for i, (train_index, test_index) in enumerate(folds.split(df)):
        print(f'index of train: {train_index}')
        print(f'index of test: {test_index}')
        print('----------')
        sns.lineplot(data=df, x='year-month', y='passengers', ax=axes[i], label='original')
        sns.lineplot(data=df.iloc[train_index], x='year-month', y='passengers', ax=axes[i], label='train')
        sns.lineplot(data=df.iloc[test_index], x='year-month', y='passengers', ax=axes[i], label='test')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。 もちろん、先ほどと同じようにデータはランダムにシャッフルされている。

$ python movwin.py
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49]
index of test: [47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60]
----------
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60]
index of test: [113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
----------
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
index of test: [94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
----------
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139, 94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
index of test: [40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21]
----------
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139, 94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54, 40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21]
index of test: [95, 136, 99, 85, 29, 18, 115, 39, 123, 86, 130, 79, 6, 13, 127, 70, 108, 64, 120, 69, 59, 106, 67, 137]
----------

得られるグラフは次のとおり。 ランダムにシャッフルされたデータでも、ちゃんと時系列でデータが分割されていることがわかる。

f:id:momijiame:20200327181559p:plain
MovingWindowKFold を用いた分割

ちなみに、学習データを等幅に切りそろえる機能も用意した。 この手法に世間的にはなんて名前がついているのかわからないけど、とりあえず clipping オプションを有効にすれば使える。

    folds = MovingWindowKFold(ts_column='year-month', clipping=True, n_splits=5)

有効にした状態で実行するとこんな感じ。 リストの長さが揃っている。

$ python movwin.py
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49]
index of test: [47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60]
----------
index of train: [47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60]
index of test: [113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
----------
index of train: [113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
index of test: [94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
----------
index of train: [94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
index of test: [40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21]
----------
index of train: [40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21]
index of test: [95, 136, 99, 85, 29, 18, 115, 39, 123, 86, 130, 79, 6, 13, 127, 70, 108, 64, 120, 69, 59, 106, 67, 137]
----------

グラフにするとこうなる。

f:id:momijiame:20200327181632p:plain
MovingWindowKFold を用いた分割 (クリッピング)

時系列版 train_test_split() も作ってみる

あとは、データが大きかったり Nested CV する状況ならホールドアウトも必要だろうなーと思うので train_test_split() も作ってみた。 サンプルコードは次のとおり。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from calendar import month_name

import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
from pandas.plotting import register_matplotlib_converters
from sklearn.model_selection import train_test_split

register_matplotlib_converters()


def ts_train_test_split(df, ts_column, **options):
    """時系列情報が含まれるカラムでソートした iloc を返す Hold-Out"""
    # シャッフルしない
    options['shuffle'] = False
    # 時系列のカラムを取り出す
    ts = df[ts_column]
    # 元々のインデックスを振り直して iloc として使える値 (0, 1, 2...) にする
    ts_df = ts.reset_index()
    # 時系列でソートする
    sorted_ts_df = ts_df.sort_values(by=ts_column)
    # 添字を計算する
    train_index, test_index = train_test_split(sorted_ts_df.index, **options)
    return list(train_index), list(test_index)

def main():
    df = sns.load_dataset('flights')

    month_name_mappings = {name: str(n).zfill(2) for n, name in
                           enumerate(month_name)}
    df['month'] = df['month'].apply(lambda x: month_name_mappings[x])
    df['year-month'] = df.year.astype(str) + '-' + df.month.astype(str)
    df['year-month'] = pd.to_datetime(df['year-month'], format='%Y-%m')

    # データの並び順をシャッフルする
    df = df.sample(frac=1.0, random_state=42)

    # 学習データとテストデータに分割する
    train_index, test_index = ts_train_test_split(df, ts_column='year-month', test_size=0.33)

    # 添字
    print(f'index of train: {train_index}')
    print(f'index of test: {test_index}')

    # グラフに描いてみる
    sns.lineplot(data=df.iloc[train_index], x='year-month', y='passengers', label='train')
    sns.lineplot(data=df.iloc[test_index], x='year-month', y='passengers', label='test')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python tsholtout.py
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139, 94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
index of test: [40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21, 95, 136, 99, 85, 29, 18, 115, 39, 123, 86, 130, 79, 6, 13, 127, 70, 108, 64, 120, 69, 59, 106, 67, 137]

次のようなグラフが得られる。

f:id:momijiame:20200327181748p:plain
時系列データの Hold-Out 分割 (1)

train_test_split() 関数のラッパーとして動作するので、オプションはそのまま使える。 たとえば、検証用データの比率を 50% まで増やしてみよう。

    train_index, test_index = ts_train_test_split(df, ts_column='year-month', test_size=0.5)

上記のコードにしたモジュールを実行してみる。

$ python tsholtout.py
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
index of test: [94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54, 40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21, 95, 136, 99, 85, 29, 18, 115, 39, 123, 86, 130, 79, 6, 13, 127, 70, 108, 64, 120, 69, 59, 106, 67, 137]

得られるグラフは次のとおり。

f:id:momijiame:20200327181811p:plain
時系列データの Hold-Out 分割 (2)

いじょう。

Kaggleで勝つデータ分析の技術

Kaggleで勝つデータ分析の技術

Python: Luigi のパラメータ爆発問題について

Luigi は、Python を使って実装された、バッチ処理のパイプラインを扱うためのフレームワーク。 Luigi でパイプラインを定義するときは、基本的には個別のタスクを依存関係でつないでいくことになる。 このとき、扱う処理によってはパイプラインは長大になると共に扱うパラメータの数も増える。 そうすると、依存関係で上流にあるタスクに対して、どのようにパラメータを渡すか、という問題が生じる。

この問題は、公式のドキュメントではパラメータ爆発 (parameter explosion) と表現されている。

luigi.readthedocs.io

今回は、このパラメータ爆発問題を解決する方法について。 なお、基本的には上記のドキュメントに解決方法が書いてあるので、そちらを読むでも良い。

使った環境は次のとおり。

$ sw_vers           
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V                 
Python 3.8.1
$ pip list | grep -i luigi   
luigi           2.8.12 

下準備

事前の準備として、Luigi をインストールしておく。

$ pip install luigi

パラメータ爆発によって生じるパラメータのバケツリレー

まずは、パラメータ爆発によって何が引き起こされるのかについて解説する。 たとえば、タスクとして UpstreamTaskDownstreamTask があるとする。 DownstreamTaskUpstreamTask に依存していて、それぞれ動作に必要なパラメータがある。

このとき、愚直にパイプラインを定義すると、次のようなコードになる。 DownstreamTask では、UpstreamTask に必要なパラメータを二重に定義した上で、タスクを生成するときに渡している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""
    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()

    # 上流タスクのパラメータ
    # FIXME: 同じ内容を二重に定義してしまっている
    upstream_task_param = luigi.Parameter()

    def requires(self):
        """依存しているタスクを示すメソッド"""
        # FIXME: 依存しているタスクを生成するときにパラメータのバケツリレーが生じている
        yield UpstreamTask(self.upstream_task_param)


def main():
    # 実行したい下流タスクにすべてのパラメータを渡してバケツリレーする
    luigi.run(cmdline_args=['DownstreamTask',
                            '--downstream-task-param=downstream',
                            '--upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(downstream_task_param=downstream, upstream_task_param=upstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリーを見ると、ちゃんと DownstreamTask に渡したパラメータが UpstreamTask に渡されている。 たしかに、問題なく動いてはいる。 しかし、これではパラメータが二重管理になっているし書き間違いも起きやすい。 もし、上流のタスクが追加されたら下流のタスクはすべて修正が必要になってしまう。 タスクの依存関係とパラメータが少ないうちは何とかなっても、規模が大きくなれば早々に破綻するのは明らかだろう。

クラス名を指定してパラメータを渡す

ここからは、上記の問題を解決する方法について書いていく。 まず、ひとつ目のやり方はタスクの依存関係を示すときにパラメータを指定しないというもの。 こうすると、不足しているパラメータはコマンドラインやコンフィグファイル経由で渡すことになる。

以下にサンプルコードを示す。 先ほどとの違いは DownstreamTaskrequires() メソッドのところ。 メソッドで UpstreamTask のインスタンスを返すときに、パラメータをまったく指定していない。 代わりに、実行するときにコマンドラインからそれぞれのクラスの名前を指定してパラメータを渡している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""
    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()

    def requires(self):
        """依存しているタスクを示すメソッド"""
        # 依存しているタスクを生成するときにパラメータを渡さない
        # 代わりにコマンドラインの引数でクラス名を指定してパラメータを渡す
        yield UpstreamTask()


def main():
    # 実行するときにクラス名を指定することで上流タスクに直接パラメータを渡せる
    luigi.run(cmdline_args=['DownstreamTask',
                            '--DownstreamTask-downstream-task-param=downstream',
                            '--UpstreamTask-upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(downstream_task_param=downstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリからは、ピンポイントにそれぞれのクラスにパラメータが渡されていることがわかる。 ただし、この方法は下流にあるタスクが上流のタスクのパラメータを利用するときには使えない

@requires デコレータを使う

次に紹介するのは luigi.util.requires デコレータを使うやり方。 このデコレータを使ってタスクのクラスを修飾すると、自動的に必要なパラメータとメソッドを追加してくれる。

以下にサンプルコードを示す。 DownstreamTask@requires デコレータを使って UpstreamTask を引数に修飾している。 こうすると、指定したクラスで必要となるパラメータを自動的に追加すると共に、requires() メソッドでクラスを指定したのと同じことになる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.util import requires


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""

    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


# 他のタスクに依存していることをデコレータで示す
# 必要なパラメータも自動的に定義されるので二重管理がなくなる
@requires(UpstreamTask)
class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()


def main():
    luigi.run(cmdline_args=['DownstreamTask',
                            '--downstream-task-param=downstream',
                            '--upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(upstream_task_param=upstream, downstream_task_param=downstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリを見ると、最初のパターンと同じように UpstreamTask で必要とするパラメータが DownstreamTask にも重複して渡されている。 これなら下流のタスクでも上流のタスクのパラメータが利用できる。

@inherits デコレータを使う

もうひとつのやり方は @inherits デコレータを使うもので、アプローチは @requires デコレータと似ている。 ただし、こちらは本来の目的とはちょっと違うものを無理やり転用して使っている感じがある。

次のサンプルコードでは DownstreamTask@inherits デコレータで UpstreamTask を引数に修飾している。 こうすると DownstreamTaskUpstreamTask と同じパラメータを持つようになる。 ただし、@requires デコレータと違って依存関係はセットされない。 そのため、自分で requires() メソッドに必要なタスクを指定することになる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.util import inherits


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""

    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


@inherits(UpstreamTask)
class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()

    def requires(self):
        """依存しているタスクを示すメソッド"""
        # clone() することで、自身のパラメータを適用したタスクが得られる
        # self.clone_parent() でも良い
        return self.clone(UpstreamTask)


def main():
    luigi.run(cmdline_args=['DownstreamTask',
                            '--downstream-task-param=downstream',
                            '--upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(upstream_task_param=upstream, downstream_task_param=downstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリを見ると、ちゃんとパラメータを適用したタスクが実行されていることがわかる。

まとめ

今回は、パラメータ爆発問題を回避するいくつかの方法を試してみた。

  • タスクの依存関係を requires() メソッドで示すときにパラメータをブランクにする
  • @requires デコレータを使う
  • @inherits デコレータを使う

いじょう。

Linuxで動かしながら学ぶTCP/IPネットワーク入門

Linuxで動かしながら学ぶTCP/IPネットワーク入門

  • 作者:もみじあめ
  • 発売日: 2020/03/06
  • メディア: オンデマンド (ペーパーバック)

Python: Luigi のイベントハンドラを試してみる

今回は、Luigi でタスクの開始や成功・失敗などのときに発火するイベントハンドラを扱ってみる。 なお、Luigi はバッチ処理などのパイプラインを組むのに使われるソフトウェアのこと。 基本的な使い方については以下を参照してほしい。

blog.amedama.jp

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V        
Python 3.7.6

下準備

下準備として Luigi をインストールしておく。

$ pip install luigi

イベントハンドラを登録する

早速だけど以下にサンプルコードを示す。 Luigi では、デコレータを使ってイベントが発生したときに実行したい処理を登録できる。 サンプルコードでは、タスクが開始されたタイミングと成功・失敗したタイミングで実行される処理を登録している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleTask(luigi.Task):
    """サンプルのタスク"""

    def run(self):
        print(f'run: {self}')


@luigi.Task.event_handler(luigi.Event.START)
def on_start(task):
    """タスクを開始したときのハンドラ"""
    print(f'on_start: {task}')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    """タスクが成功したときのハンドラ"""
    print(f'on_success: {task}')


@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    """タスクが失敗したときのハンドラ"""
    print(f'on_failure: {task} with {exception}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTask, local_scheduler=True)


if __name__ == '__main__':
    main()

上記のサンプルコードを実行してみよう。 読みやすくなるように標準エラー出力は表示していない。

$ python evhandler.py 2>/dev/null
on_start: ExampleTask()
run: ExampleTask()
on_success: ExampleTask()

上記を見ると、ちゃんとタスクの開始と成功したタイミングでイベントハンドラが実行されていることがわかる。

意図的にタスクを失敗させてみる

試しに、意図的にタスクを失敗させてみよう。 次のサンプルコードではタスクで例外を上げている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleTask(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # 例外を上げる
        raise Exception('Oops!')


@luigi.Task.event_handler(luigi.Event.START)
def on_start(task):
    print(f'on_start: {task}')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    print(f'on_success: {task}')


@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    print(f'on_failure: {task} with {exception}')


def main():
    luigi.run(main_task_cls=ExampleTask, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python evhandler.py 2>/dev/null
on_start: ExampleTask()
run: ExampleTask()
on_failure: ExampleTask() with Oops!

今度は、失敗したときのイベントハンドラが実行されていることがわかる。 また、同時に例外オブジェクトもハンドラに渡されている。

一連の複数のタスクを実行してみる

続いては、依存関係をもった複数のタスクが実行されたときの挙動を確認しておく。 次のサンプルコードでは、ExampleTaskA と、それに依存した ExampleTaskB というタスクを定義している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class ExampleTaskA(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # モックの出力にメッセージを書き込む
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')

    def output(self):
        # オンメモリのモックを出力にする
        return MockTarget('mock')


class ExampleTaskB(luigi.Task):

    def requires(self):
        # このタスクの実行には事前に以下のタスクを実行する必要がある
        return ExampleTaskA()

    def run(self):
        print(f'run: {self}')


@luigi.Task.event_handler(luigi.Event.START)
def on_start(task):
    """タスクを開始したときのハンドラ"""
    print(f'on_start: {task}')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    """タスクが成功したときのハンドラ"""
    print(f'on_success: {task}')


@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    """タスクが失敗したときのハンドラ"""
    print(f'on_failure: {task} with {exception}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTaskB, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみる。

$ python evhandler.py 2>/dev/null
on_start: ExampleTaskA()
run: ExampleTaskA()
on_success: ExampleTaskA()
on_start: ExampleTaskB()
run: ExampleTaskB()
on_success: ExampleTaskB()

ちゃんと、それぞれのタスクが開始・成功したタイミングでイベントハンドラが実行されているようだ。

特定のタスクだけで実行したいハンドラを登録する

ここまでのイベントハンドラは、すべてのタスクに共通して実行されるものだった。 続いては、特定のタスクだけで実行されるイベントハンドラを登録してみる。

次のサンプルコードでは ExampleTaskB にだけタスクが成功したときに実行されるイベントハンドラを登録している。 Luigi の Task クラスには event_handler() というクラスメソッドが定義されていて、これを使ってタスクに対してイベントハンドラを登録できるようになっている。 そこで、定義したタスクの event_handler() メソッドを使えば、そのタスクだけで実行されるイベントハンドラを登録できる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class ExampleTaskA(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # モックの出力にメッセージを書き込む
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')

    def output(self):
        # オンメモリのモックを出力にする
        return MockTarget('mock')


class ExampleTaskB(luigi.Task):

    def requires(self):
        # このタスクの実行には事前に以下のタスクを実行する必要がある
        return ExampleTaskA()

    def run(self):
        print(f'run: {self}')


# 特定のタスクのイベントをハンドルしたいときはクラスメソッドの eventhandler() を使う
@ExampleTaskB.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    print(f'on_success: {task}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTaskB, local_scheduler=True)


if __name__ == '__main__':
    main()

あるいは、単純にタスクに各イベントハンドラがあらかじめ定義されているので、それをオーバーライドしても良い。 おそらく、ほとんどのユースケースではこちらを使えば問題ないはず。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class ExampleTaskA(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # モックの出力にメッセージを書き込む
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')

    def output(self):
        # オンメモリのモックを出力にする
        return MockTarget('mock')


class ExampleTaskB(luigi.Task):

    def requires(self):
        # このタスクの実行には事前に以下のタスクを実行する必要がある
        return ExampleTaskA()

    def run(self):
        print(f'run: {self}')

    # 特定のタスクのイベントをハンドルしたいときはメソッドをオーバーライドするだけ
    def on_success(task):
        print(f'on_success: {task}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTaskB, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python evhandler.py 2>/dev/null
run: ExampleTaskA()
run: ExampleTaskB()
on_success: ExampleTaskB()

すると、今度は ExampleTaskB の成功時だけイベントハンドラが実行されていることがわかる。

めでたしめでたし。

「Linuxで動かしながら学ぶTCP/IPネットワーク入門」という本を書きました

表題のとおり TCP/IP に関する本を書きました。 今回は、そのご紹介です!

今のところは Kindle 版が先行して販売されています。 一週間ほどで、紙の本も手に入るようになる見込みです。

(2020-03-06 追記) 紙の本が注文できるようになりました。

(2020-03-28 追記) Kindle 版には、特定の端末で閲覧できない問題を解消したコンテンツが配信されています。 以前に購入された方はコンテンツの更新をお試しください。

どんな本なの?

Linux を使って実際にネットワークを組んで動かしながら TCP/IP について学べる本です。 実際に手を動かすことで、より実践的で風化しにくい知識と技術を身につけることが本の目的です。

こんな人にオススメ

次のいずれかに当てはまるような方には、この本が参考になると思います。

  • ネットワークが専門ではない IT エンジニア、またはそれを志す学生さん
  • 他の TCP/IP に関する本を読んだことはあるけど、身についている実感が少ない
  • インターネットやインフラの技術についてよく知らないけど興味はある
  • ネットワークを気軽に組んで実験できる環境の作り方に興味がある

そして、この本を読んで試した後には、次のような効果が見込めます。

  • 「インターネットの本質を理解できた」という実感が得られる
  • ネットワークのトラブルシュートで見るべきポイントがわかる
  • なんとなく持っていたネットワークに対する苦手意識がなくなる
  • 「ネットワークがこういう状況では何が起こるんだろう」と思ったときに、試す環境が作れるようになる

執筆のきっかけ

執筆のきっかけは、実際に手を動かしながら TCP/IP について学べる本が世の中に必要なんじゃないか、と感じたことです。 TCP/IP という技術は、理屈について書いてある本はたくさんあるものの、実際に手を動かせるものとなると少なくなります。 さらに、手を動かすための環境構築までカバーした本となると、より限られます。

その原因は、TCP/IP を学ぶための環境構築が難しい点が挙げられると思います。 一般的に、ネットワークを組んで勉強しようとすると、複数台のコンピュータを用意する必要があるためです。 もちろん、仮想マシンを使えば物理的なコンピュータは 1 台で済みます。 ですが、それでもネットワークの構築という手間のかかる作業は残っています。

この本では、その問題を Linux の Network Namespace という機能を使って解決しています。 Network Namespace というのは、Docker といった Linux のコンテナ仮想化技術を支えている機能のひとつです。 Network Namespace を使うと、1 台の Linux マシンの中に、ネットワーク的にはシステムから独立した空間を作れます。 実験用のネットワークを作るのに必要な作業はいくつかのコマンドライン操作だけで、完了するまでには数秒もかかりません。

また、Linux を動かす環境には仮想マシンが使えるため、用意すべき物理的なコンピュータは今使っているラップトップで十分です。 もちろん、利用するソフトウェアはどれもフリーなものなので、追加で必要となる費用もありません。 読みながら試すときは Linux のコマンドライン操作が必要になりますが、あまり親しみのない人にも読んでもらえるように、付録には簡単にですが操作ガイドも用意しました。

どんなことが書いてあるの?

TCP/IP を理解する上で、重要なポイントをギュギュッと詰め込みました。 この本は、プロトコルのヘッダに含まれるすべてのフィールドを網羅的に解説していくような性格の本ではありません。 そのため、TCP/IP に詳しい方からすると「これは書いてないのか」と思う点もあるかもしれません。 ですが、この本を読んで「TCP/IP という技術の地図とコンパス」が頭に入れば、他の本も読みやすくなると思います。 また、書く内容を重要なポイントに絞っても、尚 B5 版で 200 ページを超えるボリュームになりました (付録含む)。

本書は、7 つの章に分かれています。 OSI 参照モデルでいうと L2 から L7 までを一冊でひととおりカバーしています。 説明の順序としては「L3 -> L2 -> L4 -> L7 -> NAT -> ソケットプログラミング」という流れです。 ざっくりと、どんなことを書いてあるのか次に示します。

  • 「TCP/IP とは」
    • この章では TCP/IP の概要とインターネットの動く仕組みについて学びます
    • インターネットでパケットがバケツリレーされる仕組みについて、実際に ping や traceroute コマンドを実行しながら説明を進めます
  • 「Network Namespace」
    • この章では Network Namespace を使って実験用ネットワークを組む方法を学びます
    • 「ルータなし → ルータあり → ルータ 2 台」という流れで、少しずつ複雑なネットワークを組みながら説明を進めます
    • ping を使ってネットワークの疎通を確認したり、パケットを tcpdump コマンドを使って観察します
  • 「イーサネット」
    • この章ではイーサネットが近隣までパケットを運ぶ仕組みについて学びます
    • MAC アドレスを使ってフレームが届く仕組みや、ブリッジ (Linux Network Bridge) でブロードキャストドメインを広げる方法を扱います
  • 「トランスポート層のプロトコル」
    • この章では、トランスポート層のプロトコルを使って、通信の種類を識別したり、信頼性のある通信を実現する方法を学びます
    • はじめに、UDP でポート番号を使って通信するアプリケーションを識別する仕組みについて扱います
    • その次に、TCP が信頼性のある通信を実現している仕組みを扱います
  • 「アプリケーション層のプロトコル」
    • この章では、TCP/IP の集大成としてインターネットでやり取りされる実用的な通信について学びます
    • 扱うプロトコルは HTTP、DNS、DHCP です
  • 「NAT」
    • この章では、IPv4 で組まれた家庭やオフィスなどのネットワークのほとんどで用いられている NAT という技術を学びます
    • はじめに、グローバルアドレスを節約するために用いられる Source NAT について扱います
    • その次に、「ポートを空ける」という表現でよく知られている Destination NAT について扱います
  • 「ソケットプログラミング」
    • この章では、TCP/IP をソフトウェアから扱う方法について学びます
    • プログラミング言語としては Python を使います
    • 取り扱う題材は、HTTP クライアント、エコーサーバ、独自の足し算バイナリプロトコルです

どんな人が書いたの?

著者は、とあるインターネット関連企業で働いているソフトウェアエンジニアです。 ネットワーク機器の開発で IPv6 プロトコルスタックを担当していた経験があります。 一時期は SDN (Software Defined Network) 関連のソフトウェアの研究開発や OSS コントリビューションを仕事としてやっていました。 また、時期によっては Web アプリケーションの開発もしていたので、L2 ~ L7 にかけてひととおりの開発経験があります。 なので、出自は「ソフトウェア寄りのネットワーク屋さん」といったところです。

なお、過去にブログで TCP/IP について扱って話題となったエントリだと、次のようなものがあります。

blog.amedama.jp

blog.amedama.jp

中身を軽く読んでみたい

本の雰囲気を確かめてから買いたいという方もいらっしゃるかと思うので、以下に立ち読み版の PDF をご用意しました。

drive.google.com

おわりに

私の好きな言葉のひとつに「特定の分野におけるプロフェッショナルとは、その分野でひととおりの失敗を経験した人のことをいう」という言葉があります。 ぜひ、この本を使って、たくさん失敗できる環境を手にしていただければ幸いです。

謝辞

この場を借りて、本の表紙を書いてくれた Asano Sonoko さん、技術的なレビューをしてくださった @ttsubo さんに心からお礼を申し上げます。 おかげさまで、プライベートで 2 年ほどかけて少しずつ書いた本を世に出すことができました。 本当にありがとうございました。

Python: Optuna を使って QWK の閾値を最適化してみる

最近、Twitter のタイムラインで QWK (Quadratic Weighted Kappa: 二次の重み付きカッパ係数) の最適化が話題になっていたので個人的に調べていた。 QWK は順序つきの多値分類問題を評価するための指標で、予測を大きく外すほど大きなペナルティが与えられるようになっている。 また、予測値の分布が結果に影響する点も特徴的で、この点が今回取り扱う最適化にも関係してくる。

QWK の最適化については、Kaggle 本と、その著者 @Maxwell_110 さんによる次のブログエントリが詳しい。 ようするに、真のラベルの分布に沿った形で予測しないと最適な結果が得られない、ということ。

Kaggleで勝つデータ分析の技術

Kaggleで勝つデータ分析の技術

note.com

Kaggle 本や、Web をざっと調べた限りでは scipy.optimize.minimize() 関数を使って Nelder Mead 法で最適化するのが一般的なようだった。

docs.scipy.org

今回は、せっかくなので Optuna を使って TPE (Tree-structured Parzen Estimator) で最適化してみた。

使った環境は次のとおり。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V                                                                
Python 3.7.6

下準備

はじめに、利用するパッケージをインストールしておく。

$ pip install scikit-learn optuna lightgbm pandas

その上で、Kaggle で開催された次のコンペのデータセットをダウンロードしておく。

www.kaggle.com

これは、前述した Kaggle 本で QWK の最適化が有効に働く例として挙げられているもの。 カレントワーキングディレクトリに、次のファイルがある状態にしておく。

$ ls | grep .zip
sample_submission.csv.zip
test.csv.zip
train.csv.zip

多値分類問題として解いてみる

そもそも、順序つきのラベルを予測するタスクの場合、多値分類問題として解く方法と回帰問題として解く方法がある。 まずは、ベーシックな考え方として多値分類問題として解く方法を試してみる。

早速だけど、サンプルコードは次のとおり。 LightGBM を使って多値分類問題として解いている。 目的関数は Multi LogLoss で、5-Fold CV で QWK について評価している。

#!/usr/bin/env python3

import numbers

import pandas as pd
import numpy as np
import lightgbm as lgb
from sklearn.model_selection import KFold
from sklearn.metrics import cohen_kappa_score


def lgb_custom_metric_qwk_multiclass(preds, data):
    """LightGBM のカスタムメトリックを計算する関数

    多値分類問題として解いた予測から QWK を計算する"""
    # 正解ラベル
    y_true = data.get_label()
    # ラベルの数
    num_labels = data.params['num_class']
    # 多値分類問題では本来は二次元配列が一次元配列になって提供される
    reshaped_preds = preds.reshape(num_labels, len(preds) // num_labels)
    # 最尤と判断したクラスを選ぶ 
    y_pred = np.argmax(reshaped_preds, axis=0)  # 最尤と判断したクラスを選ぶ
    # QWK を計算する
    return 'qwk', qwk(y_true, y_pred), True


def qwk(y_true, y_pred):
    """QWK (Quadratic Weighted Kappa) を計算する関数"""
    return cohen_kappa_score(y_true, y_pred, weights='quadratic')


def _numerical_only_df(df_):
    """数値のカラムだけ取り出す関数"""
    number_cols = [name for name, dtype in df_.dtypes.items()
                   if issubclass(dtype.type, numbers.Number)]
    numerical_df = df_[number_cols]
    return numerical_df


def main():
    # データセットを読み込む
    train_df = pd.read_csv('train.csv.zip')

    # 説明変数
    x_train = train_df.drop('Response', axis=1)
    # 簡単にするため、とりあえずすぐに使えるものだけ取り出して使う
    x_train = _numerical_only_df(x_train)

    # 目的変数
    y_train = train_df.Response.astype(float)
    # 目的変数はゼロからスタートにする
    y_train -= 1

    # LightGBM のデータセット表現にする
    lgb_train = lgb.Dataset(x_train, y_train)

    # 多値分類問題として解く
    lgbm_params = {
        'objective': 'multiclass',
        'metric': 'multi_logloss',
        'num_class': len(y_train.unique()),
        'first_metric_only': True,
        'verbose': -1,
    }

    # データ分割の方法
    folds = KFold(n_splits=5, shuffle=True, random_state=42)

    # 5-Fold CV で検証する
    result = lgb.cv(lgbm_params,
                    lgb_train,
                    num_boost_round=1000,
                    early_stopping_rounds=100,
                    folds=folds,
                    verbose_eval=10,
                    feval=lgb_custom_metric_qwk_multiclass,
                    seed=42,
                    )

    # early stop したラウンドでの QWK を出力する
    print(f'CV Mean QWK: {result["qwk-mean"][-1]}')


if __name__ == '__main__':
    main()

上記を実行した結果が次のとおり。

$ python qwkmc.py
[10]   cv_agg's multi_logloss: 1.37765 + 0.0076621   cv_agg's qwk: 0.442683 + 0.00532416
[20]   cv_agg's multi_logloss: 1.25563 + 0.0095355   cv_agg's qwk: 0.507798 + 0.00703211
[30]   cv_agg's multi_logloss: 1.20349 + 0.0100597   cv_agg's qwk: 0.519766 + 0.00748977
...(省略)...
[220]  cv_agg's multi_logloss: 1.14153 + 0.0126548   cv_agg's qwk: 0.560446 + 0.00516118
[230]  cv_agg's multi_logloss: 1.14198 + 0.0126144   cv_agg's qwk: 0.56074 + 0.00444787
[240]  cv_agg's multi_logloss: 1.14242 + 0.0130142   cv_agg's qwk: 0.561591 + 0.0046819
CV Mean QWK: 0.5574903141807788

交差検証では QWK の平均として約 0.5574 が得られている。

回帰問題として解いてみる

続いては、同じデータを回帰問題として解くパターンについて。 ラベルが順序つきであれば、回帰問題として解いた上で結果を clip + 四捨五入するような方法もある。

サンプルコードは次のとおり。 やっていることは同じで、解き方が違うだけ。

#!/usr/bin/env python3

import numbers

import numpy as np
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import cohen_kappa_score


def lgb_custom_metric_qwk_regression(preds, data):
    """LightGBM のカスタムメトリックを計算する関数

    回帰問題として解いた予測から QWK を計算する"""
    # 正解ラベル
    y_true = data.get_label()
    # 予測ラベル
    y_pred = np.clip(preds, 0, 7).round()  # 単純に予測値を clip して四捨五入する
    # QWK を計算する
    return 'qwk', qwk(y_true, y_pred), True


def qwk(y_true, y_pred):
    """QWK (Quadratic Weighted Kappa) を計算する関数"""
    return cohen_kappa_score(y_true, y_pred, weights='quadratic')


def _numerical_only_df(df_):
    """数値のカラムだけ取り出す関数"""
    number_cols = [name for name, dtype in df_.dtypes.items()
                   if issubclass(dtype.type, numbers.Number)]
    numerical_df = df_[number_cols]
    return numerical_df


def main():
    # データセットを読み込む
    train_df = pd.read_csv('train.csv.zip')

    # 説明変数
    x_train = train_df.drop('Response', axis=1)
    # 簡単にするため、とりあえずすぐに使えるものだけ取り出して使う
    x_train = _numerical_only_df(x_train)

    # 目的変数
    y_train = train_df.Response.astype(float)
    # 目的変数はゼロからスタートにする
    y_train -= 1

    # LightGBM のデータセット表現にする
    lgb_train = lgb.Dataset(x_train, y_train)

    # 回帰問題として解く
    lgbm_params = {
        'objective': 'regression',
        'metric': 'rmse',
        'first_metric_only': True,
        'verbose': -1,
    }

    # データ分割の方法
    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    # 5-Fold CV で検証する
    result = lgb.cv(lgbm_params,
                    lgb_train,
                    num_boost_round=1000,
                    early_stopping_rounds=100,
                    folds=folds,
                    verbose_eval=10,
                    feval=lgb_custom_metric_qwk_regression,
                    seed=42,
                    )

    # early stop したラウンドでの QWK を出力する
    print(f'CV Mean QWK: {result["qwk-mean"][-1]}')


if __name__ == '__main__':
    main()

上記の実行結果は次のとおり。

$ python qwkreg.py
[10]   cv_agg's rmse: 2.02046 + 0.0085424    cv_agg's qwk: 0.409782 + 0.0042456
[20]   cv_agg's rmse: 1.91647 + 0.0125972    cv_agg's qwk: 0.506518 + 0.00716283
[30]   cv_agg's rmse: 1.87532 + 0.0140473    cv_agg's qwk: 0.553375 + 0.00638559
...(省略)...
[220]  cv_agg's rmse: 1.83657 + 0.0177681    cv_agg's qwk: 0.607652 + 0.00776567
[230]  cv_agg's rmse: 1.83694 + 0.0177138    cv_agg's qwk: 0.608083 + 0.00775286
[240]  cv_agg's rmse: 1.83728 + 0.0176133    cv_agg's qwk: 0.608366 + 0.00780103
CV Mean QWK: 0.6064040204540543

交差検証では QWK の平均として約 0.6064 が得られている。 先ほどよりもスコアが改善している。

回帰問題として解いた上で OOF Prediction を最適化する

続いては、今回の本題である QWK の閾値の最適化について。 OOF で予測した値を、閾値を最適化することでスコアが改善するかどうか確認してみる。

サンプルコードは次のとおり。 OptunaRounder というクラスを導入して、OOF Prediction を最適化している。

#!/usr/bin/env python3

import numbers

import optuna
import pandas as pd
import numpy as np
import lightgbm as lgb
from sklearn.model_selection import KFold
from sklearn.metrics import cohen_kappa_score


class OptunaRounder:
    """Optuna を使って QWK の最適な閾値を探索するクラス"""

    def __init__(self, y_true, y_pred):
        # 真のラベル
        self.y_true = y_true
        # 予測したラベル
        self.y_pred = y_pred
        # ラベルの種類
        self.labels = np.unique(y_true)

    def __call__(self, trial):
        """最大化したい目的関数"""
        # 閾値を Define by run で追加していく
        thresholds = []
        # ラベルの数 - 1 が必要な閾値の数になる
        for i in range(len(self.labels) - 1):
            # 閾値の下限 (既存の最大 or ラベルの最小値)
            low = max(thresholds) if i > 0 else min(self.labels)
            # 閾値の上限 (ラベルの最大値)
            high = max(self.labels)
            # 閾値の候補を追加する
            t = trial.suggest_uniform(f't{i}', low, high)
            thresholds.append(t)

        # 閾値の候補を元に QWK を計算する
        opt_y_pred = self.adjust(self.y_pred, thresholds)
        return qwk(self.y_true, opt_y_pred)

    def adjust(self, y_pred, thresholds):
        """閾値にもとづいて予測を補正するメソッド"""
        opt_y_pred = pd.cut(y_pred,
                            [-np.inf] + thresholds + [np.inf],
                            labels=self.labels)
        return opt_y_pred


def lgb_custom_metric_qwk_regression(preds, data):
    """LightGBM のカスタムメトリックを計算する関数

    回帰問題として解いた予測から QWK を計算する"""
    # 正解ラベル
    y_true = data.get_label()
    # 予測ラベル
    y_pred = np.clip(preds, 0, 7).round()  # 単純に clip して四捨五入する
    # QWK を計算する
    return 'qwk', qwk(y_true, y_pred), True


def qwk(y_true, y_pred):
    """QWK (Quadratic Weighted Kappa) を計算する関数"""
    return cohen_kappa_score(y_true, y_pred, weights='quadratic')


class ModelExtractionCallback(object):
    """lightgbm.cv() から学習済みモデルを取り出すためのコールバックに使うクラス"""

    def __init__(self):
        self._model = None

    def __call__(self, env):
        # _CVBooster の参照を保持する
        self._model = env.model

    def _assert_called_cb(self):
        if self._model is None:
            # コールバックが呼ばれていないときは例外にする
            raise RuntimeError('callback has not called yet')

    @property
    def boosters_proxy(self):
        self._assert_called_cb()
        # Booster へのプロキシオブジェクトを返す
        return self._model

    @property
    def raw_boosters(self):
        self._assert_called_cb()
        # Booster のリストを返す
        return self._model.boosters

    @property
    def best_iteration(self):
        self._assert_called_cb()
        # Early stop したときの boosting round を返す
        return self._model.best_iteration


def _numerical_only_df(df_):
    """数値のカラムだけ取り出す関数"""
    number_cols = [name for name, dtype in df_.dtypes.items()
                   if issubclass(dtype.type, numbers.Number)]
    numerical_df = df_[number_cols]
    return numerical_df


def main():
    # データセットを読み込む
    train_df = pd.read_csv('train.csv.zip')

    # 説明変数
    x_train = train_df.drop('Response', axis=1)
    # 簡単にするため、とりあえずすぐに使えるものだけ取り出して使う
    x_train = _numerical_only_df(x_train)
    # Id 列をインデックスに設定する
    x_train = x_train.set_index('Id')

    # 目的変数
    y_train = train_df.Response.astype(float)
    # 目的変数はゼロからスタートにする
    y_train -= 1

    # LightGBM のデータセット表現にする
    lgb_train = lgb.Dataset(x_train, y_train)

    # 回帰問題として解く
    lgbm_params = {
        'objective': 'regression',
        'metric': 'rmse',
        'first_metric_only': True,
        'verbose': -1,
    }

    # データ分割の方法
    folds = KFold(n_splits=5, shuffle=True, random_state=42)

    # 学習済みモデルを取り出すためのコールバックを用意する
    extraction_cb = ModelExtractionCallback()
    callbacks = [
        extraction_cb,
    ]

    # 5-Fold CV で検証する
    result = lgb.cv(lgbm_params,
                    lgb_train,
                    num_boost_round=10000,
                    early_stopping_rounds=100,
                    folds=folds,
                    verbose_eval=10,
                    feval=lgb_custom_metric_qwk_regression,
                    callbacks=callbacks,
                    seed=42,
                    )

    # early stop したラウンドでの QWK を出力する
    print(f'CV Mean QWK: {result["qwk-mean"][-1]}')

    # 学習データを学習済みモデルを使って OOF で予測する
    boosters = extraction_cb.raw_boosters
    best_iteration = extraction_cb.best_iteration
    oof_y_pred = np.zeros_like(y_train)
    for (_, val_index), booster in zip(folds.split(x_train, y_train), boosters):
        x_val = x_train.iloc[val_index]
        oof_y_pred[val_index] = booster.predict(list(x_val.to_numpy()),
                                                num_iteration=best_iteration)

    # ひとまず clipping + 四捨五入する
    oof_y_pred = np.clip(oof_y_pred, 0, 7).round()

    # 最適化していない状態での OOF Prediction の QWK
    raw_oof_qwk = qwk(y_train, oof_y_pred)
    print(f'Raw OOF QWK: {raw_oof_qwk}')

    # Optuna を使って QWK の閾値を最適化する
    objective = OptunaRounder(y_train, oof_y_pred)

    # 探索に 100 秒かける
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, timeout=100)

    # 見つけた閾値
    best_thresholds = sorted(study.best_params.values())
    print(f'Optimized thresholds: {best_thresholds}')

    # 閾値を最適化した場合の QWK
    optimized_oof_y_pred = objective.adjust(oof_y_pred, best_thresholds)
    optimized_oof_qwk = qwk(y_train, optimized_oof_y_pred)
    print(f'Optimized OOF QWK: {optimized_oof_qwk}')

    # テスト用データを読み込む
    test_df = pd.read_csv('test.csv.zip')
    test_df = test_df.set_index('Id')

    # 数値のカラムだけ取り出す
    numerical_test_df = _numerical_only_df(test_df)

    # 学習済みモデルの Averaging で予測する
    boosters_proxy = extraction_cb.boosters_proxy
    test_y_preds = boosters_proxy.predict(numerical_test_df,
                                          num_iteration=best_iteration)
    test_y_pred_avg = np.mean(test_y_preds, axis=0)

    # サンプル提出ファイルを読み込む
    submission_df = pd.read_csv('sample_submission.csv.zip')
    submission_df = submission_df.set_index('Id')

    # 最適化していない予測値 (単純なクリッピングと四捨五入)
    submission_df.Response = np.clip(test_y_pred_avg, y_train.min(), y_train.max() + 1).round().astype(int) + 1
    submission_df.to_csv('unoptimized_submission.csv')

    # 最適化した予測値
    optimized_test_y_pred_avg = objective.adjust(test_y_pred_avg, best_thresholds)
    submission_df.Response = optimized_test_y_pred_avg.astype(int) + 1
    submission_df.to_csv('optimized_submission.csv')


if __name__ == '__main__':
    main()

上記を実行した結果は次のとおり。

$ python qwkregopt.py
[10]   cv_agg's rmse: 2.01959 + 0.0101767    cv_agg's qwk: 0.408957 + 0.00513284
[20]   cv_agg's rmse: 1.9165 + 0.008155  cv_agg's qwk: 0.507252 + 0.00192446
[30]   cv_agg's rmse: 1.87492 + 0.00797763   cv_agg's qwk: 0.553388 + 0.00129979
...(省略)...
[200]  cv_agg's rmse: 1.8374 + 0.00729844    cv_agg's qwk: 0.60726 + 0.00206979
[210]  cv_agg's rmse: 1.83764 + 0.00757892   cv_agg's qwk: 0.607434 + 0.00210639
[220]  cv_agg's rmse: 1.83771 + 0.00759351   cv_agg's qwk: 0.607882 + 0.00205622
CV Mean QWK: 0.6054069736940326
Raw OOF QWK: 0.605407593931218
[I 2020-03-01 12:39:28,349] Finished trial#0 resulted in value: 0.14760909846772208. Current best value is 0.14760909846772208 with parameters: {'t0': 0.6069999513322623, 't1': 6.071871351250824, 't2': 6.421127597260041, 't3': 6.6499083619322334, 't4': 6.825786239070231, 't5': 6.94896643994053, 't6': 6.988042174746154}.
[I 2020-03-01 12:39:28,461] Finished trial#1 resulted in value: 0.3273196067078864. Current best value is 0.3273196067078864 with parameters: {'t0': 1.3022593301640533, 't1': 3.3523582269835885, 't2': 5.775809715879871, 't3': 6.748066158840195, 't4': 6.813600862422318, 't5': 6.952492269479133, 't6': 6.986909664863027}.
[I 2020-03-01 12:39:28,580] Finished trial#2 resulted in value: 0.1845825834695184. Current best value is 0.3273196067078864 with parameters: {'t0': 1.3022593301640533, 't1': 3.3523582269835885, 't2': 5.775809715879871, 't3': 6.748066158840195, 't4': 6.813600862422318, 't5': 6.952492269479133, 't6': 6.986909664863027}.
...(省略)...
[I 2020-03-01 12:41:08,371] Finished trial#660 resulted in value: 0.3366380459371958. Current best value is 0.6461578531751463 with parameters: {'t0': 1.9366086554521658, 't1': 2.765496885821397, 't2': 3.711341767628701, 't3': 3.8616368179727982, 't4': 4.309923993071266, 't5': 5.4467521001848525, 't6': 5.753790447645578}.
Optimized thresholds: [1.9366086554521658, 2.765496885821397, 3.711341767628701, 3.8616368179727982, 4.309923993071266, 5.4467521001848525, 5.753790447645578]
Optimized OOF QWK: 0.6461578531751463

最適化する前の OOF Prediction の QWK が約 0.6054 なのに対して、閾値を最適化すると約 0.6461 と、大きくスコアが向上している。

手元の交差検証だけでは何か間違っている恐れがあるので、テストデータに対するサブミッションまでやってみた。 先ほどのモジュールを実行すると unoptimized_submission.csv という最適化前のファイルと、optimized_submission.csv という最適化後のファイルができる。 すると、最適化する前の Public LB / Private LB が 0.61931 / 0.62190 だったのに対して、最適化した後は 0.65801 / 0.66005 と、ちゃんと効果があることが確認できた。

所感など

QWK の最適化は、真のラベルの分布に依存している。 そのため、学習済みモデルを使って予測する対象が学習時の分布と異なる場合、最適化の効果が得られない。 この点から、Private データの分布が異なる場合には最適化が有効ではないと考えられる。

だとすると、Private データの分布が学習時と同じであるという確証が得られない限りは使うのは賭けになるのでは、という疑問が浮かんだ。 とはいえ、うまくいった場合の効果は絶大なので、ある程度の仮定のもとに使う、という選択肢は現実的なのかもしれない。

いじょう。

参考

Kaggleで勝つデータ分析の技術

Kaggleで勝つデータ分析の技術

www.kaggle.com

blog.amedama.jp

blog.amedama.jp