CUBE SUGAR CONTAINER

技術系のこと書きます。

エンタープライズ向けヘリウム充填 HDD が安く手に入ることがあるらしい

この製品は、一昨年あたりから「HDD ガチャ」や「HDD おみくじ」といった愛称で親しまれているようだ。 今回は、Amazon セールで安くなっていたので実際にガチャを引いてみることにした。 このエントリは、その備忘録になる。

HDD ガチャ / HDD おみくじとは

Western Digital (WD) が販売している外付けハードディスク Elements Desktop シリーズの大容量モデル (8TB~) のこと。

この製品がガチャやおみくじと呼ばれている理由は、8TB に関して内部の HDD に複数のモデルが使われているところらしい。 具体的には、次の 2 種類がこれまでに確認されている。

  • WD80EMAZ-00WJTA0
    • Ultrastar DC HC510 の廉価版というウワサ
      • 回転速度は 7200rpm ではなく 5400rpm
    • ヘリウム充填モデル
  • WD80EMAZ-00M9AA0
    • Ultrastar DC HC320 の廉価版というウワサ
      • 回転速度は 7200rpm ではなく 5400rpm

Ultrastar DC HC シリーズは、Western Digital に買収された HGST が製造しているエンタープライズ向けの製品になる。 つまり、ウワサが本当であれば、この製品はコンシューマ向けにもかかわらず、内部にはエンタープライズ向けの HDD (の廉価版) が使われていることになる。 ようするに、企業のデータセンターの中でサーバに使うような高信頼モデル 1 ということ。

ガチャでいう「当たり」は HC510 の廉価版であるヘリウム充填モデルになる。 しかも、ウワサによると昨年あたりからの出荷分はすべて中身が「当たり」のヘリウム充填モデルになっているらしい。 また、10TB 以上に関してはヘリウム充填モデルしか確認されておらず、すべて「当たり」といえる。

そして、ガチャが楽しまれている理由は、使われている内部の HDD の珍しさからというわけではない。 何なら Ultrastar DC HC も、お金さえ出せば買うことができる 2 。 その主な理由は値段で、今回 Amazon のセールでは 8TB のモデルが約 1.7 万円になっていた。 また、セールでなくても約 2 万円で買うことができる。 そして、同じ容量だとコンシューマ向け NAS 用途の WD Red シリーズで約 2.5 万円する。

とはいえ、内部で使われているハードディスクは製品の仕様になっているわけではない。 ある時から異なるモデルが使われることも十分に考えられるため、実際のところは買ってみないとわからない。 そうした意味では、ガチャ要素はいまだ健在といえるだろう。

ガチャを引いてみた

そんなわけで、前置きが長くなったけど今回はガチャを引いてみた。

以下が届いた製品のパッケージで、至ってシンプル。

f:id:momijiame:20200328144335j:plain
製品のパッケージ

付属品は本体・電源アダプタ・USB ケーブルの 3 つ。

f:id:momijiame:20200328144615j:plain
製品の付属品

付属の USB ケーブルでつないで CrystalDiskInfo を確認した画面が次のとおり。 この型番は、ガチャでいう「当たり」のヘリウム充填モデルだ。

f:id:momijiame:20200328194002p:plain
CrystalDiskInfo

せっかくなので中身の HDD も確認してみよう。 筐体のカバーはツメで固定されているので、使っていないカードなどを差し込んでツメを外していく。

f:id:momijiame:20200328165626j:plain
隙間にカードなどを差し込んでツメを外す

開封すると、こんな感じ。 たしかに、ヘリウム充填モデルの筐体だ。

f:id:momijiame:20200328160420j:plain
中身の 3.5 インチハードディスク

注意点

この製品は、単なる USB 接続の外付けハードディスクとして使う分には何の懸念もない。 ただ単にエンタープライズ向けの製品が安く手に入るというだけ。 しかし、中のハードディスクを取り出して SATA で使うとき (殻割り) には、いくつかの注意点がある。

3.3V 問題

この製品は、通称 3.3V 問題と呼ばれる相性の問題がある。 そのため、使うマザーボードによっては別途ペリフェラル4ピン電源変換ケーブルを必要とする。

製品保証外になる

この製品には購入時から 2 年間の保証がついているけど、殻割りして使う分には当然ながら保証の対象外になる。 一般的な HDD は購入時に保証がついているので、その点は割り引いて考える必要があるはず。

(2020-04-18 追記)

その後、殻割りして自宅の NAS (Synology DS218+) のストレージになった。

f:id:momijiame:20200329191354j:plain
殻割り

(2020-07-08 追記)

少し前のセールから、以下のようなレビューが報告されている。

  • ヘリウム非充填モデル (WD80EMAZ-00M9AA0) がまた含まれるようになった
  • ヘリウム充填モデルの型番が WD80EMAZ-00WJTA0 から WD80EZAZ-11TDBA0 に変わった
    • ただし R/N は US7SAL080 で変更なし

参考

henjinkutsu.com

nyanshiba.hatenablog.com


  1. ただし、データセンター向けの製品が常に優れているというわけではない。騒音や発熱が大きかったり、動作温度領域が狭かったりと、家庭では扱いにくいこともある。

  2. 5 万円くらいする。

Python: 時系列データの交差検証と TimeSeriesSplit の改良について

一般的に、時系列データを扱うタスクでは過去のデータを使って未来のデータを予測することになる。 そのため、交差検証するときも過去のデータを使ってモデルを学習させた上で未来のデータを使って検証しなければいけない。 もし、未来のデータがモデルの学習データに混入すると、本来は利用できないデータにもとづいた楽観的な予測が得られてしまう。 今回は、そんな時系列データの交差検証と scikit-learn の TimeSeriesSplit の改良について書いてみる。

使った環境は次のとおり。

$ sw_vers           
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V                 
Python 3.8.1

下準備

あらかじめ、必要なパッケージをインストールしておく。

$ pip install scikit-learn pandas seaborn

scikit-learn の TimeSeriesSplit を使った時系列データの交差検証

scikit-learn には、時系列データの交差検証に使うための KFold 実装として TimeSeriesSplit というクラスがある。 このクラスは、データが時系列にもとづいてソートされているという前提でリークが起こらないようにデータを分割できる。

試しに、航空機の旅客数を記録したデータセットを使って動作を確かめてみよう。 以下にサンプルコードを示す。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from calendar import month_name

import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
from sklearn.model_selection import TimeSeriesSplit
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()


def main():
    # 航空機の旅客数を記録したデータセットを読み込む
    df = sns.load_dataset('flights')

    # 時系列のカラムを用意する
    month_name_mappings = {name: str(n).zfill(2) for n, name in
                           enumerate(month_name)}
    df['month'] = df['month'].apply(lambda x: month_name_mappings[x])
    df['year-month'] = df.year.astype(str) + '-' + df.month.astype(str)
    df['year-month'] = pd.to_datetime(df['year-month'], format='%Y-%m')

    # データの並び順を元に分割する
    folds = TimeSeriesSplit(n_splits=5)

    # 5 枚のグラフを用意する
    fig, axes = plt.subplots(5, 1, figsize=(12, 12))

    # 学習用のデータとテスト用のデータに分割するためのインデックス情報を得る
    for i, (train_index, test_index) in enumerate(folds.split(df)):
        # 生のインデックス
        print(f'index of train: {train_index}')
        print(f'index of test: {test_index}')
        print('----------')
        # 元のデータを描く
        sns.lineplot(data=df, x='year-month', y='passengers', ax=axes[i], label='original')
        # 学習用データを描く
        sns.lineplot(data=df.iloc[train_index], x='year-month', y='passengers', ax=axes[i], label='train')
        # テスト用データを描く
        sns.lineplot(data=df.iloc[test_index], x='year-month', y='passengers', ax=axes[i], label='test')

    # グラフを表示する
    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行する。 TimeSeriesSplit#split() から得られた添字が表示される。

$ python flights.py
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23]
index of test: [24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
index of test: [48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71]
index of test: [72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95]
index of test: [ 96  97  98  99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
 114 115 116 117 118 119]
----------
index of train: [  0   1   2   3   4   5   6   7   8   9  10  11  12  13  14  15  16  17
  18  19  20  21  22  23  24  25  26  27  28  29  30  31  32  33  34  35
  36  37  38  39  40  41  42  43  44  45  46  47  48  49  50  51  52  53
  54  55  56  57  58  59  60  61  62  63  64  65  66  67  68  69  70  71
  72  73  74  75  76  77  78  79  80  81  82  83  84  85  86  87  88  89
  90  91  92  93  94  95  96  97  98  99 100 101 102 103 104 105 106 107
 108 109 110 111 112 113 114 115 116 117 118 119]
index of test: [120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
 138 139 140 141 142 143]
----------

また、次のような折れ線グラフが得られる。

f:id:momijiame:20200327180723p:plain
TimeSeriesSplit を用いた分割

グラフを見ると、時系列で過去のデータが学習用、未来のデータが検証用として分割されていることがわかる。

TimeSeriesSplit の使いにくさについて

ただ、TimeSeriesSplit は使ってみると意外と面倒くさい。 なぜかというと、データが時系列にもとづいてソートされていないと使えないため。

たとえば、先ほどのサンプルコードに次のようなデータをシャッフルするコードを挿入してみよう。 こうすると、データが時系列順ではなくなる。

    # データの並び順をシャッフルする (時系列順ではなくなる)
    df = df.sample(frac=1.0, random_state=42)

コードを挿入した状態で実行してみよう。

$ python shuffledflights.py
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23]
index of test: [24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
index of test: [48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71]
index of test: [72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95]
----------
index of train: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95]
index of test: [ 96  97  98  99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
 114 115 116 117 118 119]
----------
index of train: [  0   1   2   3   4   5   6   7   8   9  10  11  12  13  14  15  16  17
  18  19  20  21  22  23  24  25  26  27  28  29  30  31  32  33  34  35
  36  37  38  39  40  41  42  43  44  45  46  47  48  49  50  51  52  53
  54  55  56  57  58  59  60  61  62  63  64  65  66  67  68  69  70  71
  72  73  74  75  76  77  78  79  80  81  82  83  84  85  86  87  88  89
  90  91  92  93  94  95  96  97  98  99 100 101 102 103 104 105 106 107
 108 109 110 111 112 113 114 115 116 117 118 119]
index of test: [120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
 138 139 140 141 142 143]
----------

得られるグラフは次のとおり。 当たり前だけど、先ほどとは違って分割の仕方が時系列とは関係ないぐちゃぐちゃな状態になっている。 これでは、学習用データが検証用データよりも未来にあることがあるので、適切な交差検証ができない。

f:id:momijiame:20200327180824p:plain
TimeSeriesSplit を用いた分割 (時系列にもとづかないソート済み)

TimeSeriesSplit を改良してみる

そこで、TimeSeriesSplit を改良してみることにした。 具体的には、次のようなポイント。

  • DataFrame を受け取れる
  • 時系列にもとづいてソートされていなくても良い
  • 代わりに、特定のカラムを時系列の情報として指定できる

以下にサンプルコードを示す。 MovingWindowKFold というクラスがそれで、TimeSeriesSplit のラッパーとして振る舞うように作ってある。 MovingWindowKFold#split()DataFrame#iloc で使える添字のリストを返す。 DataFrame そのもののインデックスを返すことも考えたけど、まあそこは必要に応じて直してもらえば。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import sys
from calendar import month_name

import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
from sklearn.model_selection import TimeSeriesSplit
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()


class MovingWindowKFold(TimeSeriesSplit):
    """時系列情報が含まれるカラムでソートした iloc を返す KFold"""

    def __init__(self, ts_column, clipping=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 時系列データのカラムの名前
        self.ts_column = ts_column
        # 得られる添字のリストの長さを過去最小の Fold に揃えるフラグ
        self.clipping = clipping

    def split(self, X, *args, **kwargs):
        # 渡されるデータは DataFrame を仮定する
        assert isinstance(X, pd.DataFrame)

        # clipping が有効なときの長さの初期値
        train_fold_min_len, test_fold_min_len = sys.maxsize, sys.maxsize

        # 時系列のカラムを取り出す
        ts = X[self.ts_column]
        # 元々のインデックスを振り直して iloc として使える値 (0, 1, 2...) にする
        ts_df = ts.reset_index()
        # 時系列でソートする
        sorted_ts_df = ts_df.sort_values(by=self.ts_column)
        # スーパークラスのメソッドで添字を計算する
        for train_index, test_index in super().split(sorted_ts_df, *args, **kwargs):
            # 添字を元々の DataFrame の iloc として使える値に変換する
            train_iloc_index = sorted_ts_df.iloc[train_index].index
            test_iloc_index = sorted_ts_df.iloc[test_index].index

            if self.clipping:
                # TimeSeriesSplit.split() で返される Fold の大きさが徐々に大きくなることを仮定している
                train_fold_min_len = min(train_fold_min_len, len(train_iloc_index))
                test_fold_min_len = min(test_fold_min_len, len(test_iloc_index))

            yield list(train_iloc_index[-train_fold_min_len:]), list(test_iloc_index[-test_fold_min_len:])


def main():
    df = sns.load_dataset('flights')

    month_name_mappings = {name: str(n).zfill(2) for n, name in
                           enumerate(month_name)}
    df['month'] = df['month'].apply(lambda x: month_name_mappings[x])
    df['year-month'] = df.year.astype(str) + '-' + df.month.astype(str)
    df['year-month'] = pd.to_datetime(df['year-month'], format='%Y-%m')

    # データの並び順をシャッフルする
    df = df.sample(frac=1.0, random_state=42)

    # 特定のカラムを時系列としてソートした分割
    folds = MovingWindowKFold(ts_column='year-month', n_splits=5)

    fig, axes = plt.subplots(5, 1, figsize=(12, 12))

    # 元々のデータを時系列ソートした iloc が添字として得られる
    for i, (train_index, test_index) in enumerate(folds.split(df)):
        print(f'index of train: {train_index}')
        print(f'index of test: {test_index}')
        print('----------')
        sns.lineplot(data=df, x='year-month', y='passengers', ax=axes[i], label='original')
        sns.lineplot(data=df.iloc[train_index], x='year-month', y='passengers', ax=axes[i], label='train')
        sns.lineplot(data=df.iloc[test_index], x='year-month', y='passengers', ax=axes[i], label='test')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。 もちろん、先ほどと同じようにデータはランダムにシャッフルされている。

$ python movwin.py
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49]
index of test: [47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60]
----------
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60]
index of test: [113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
----------
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
index of test: [94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
----------
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139, 94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
index of test: [40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21]
----------
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139, 94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54, 40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21]
index of test: [95, 136, 99, 85, 29, 18, 115, 39, 123, 86, 130, 79, 6, 13, 127, 70, 108, 64, 120, 69, 59, 106, 67, 137]
----------

得られるグラフは次のとおり。 ランダムにシャッフルされたデータでも、ちゃんと時系列でデータが分割されていることがわかる。

f:id:momijiame:20200327181559p:plain
MovingWindowKFold を用いた分割

ちなみに、学習データを等幅に切りそろえる機能も用意した。 この手法に世間的にはなんて名前がついているのかわからないけど、とりあえず clipping オプションを有効にすれば使える。

    folds = MovingWindowKFold(ts_column='year-month', clipping=True, n_splits=5)

有効にした状態で実行するとこんな感じ。 リストの長さが揃っている。

$ python movwin.py
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49]
index of test: [47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60]
----------
index of train: [47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60]
index of test: [113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
----------
index of train: [113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
index of test: [94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
----------
index of train: [94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
index of test: [40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21]
----------
index of train: [40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21]
index of test: [95, 136, 99, 85, 29, 18, 115, 39, 123, 86, 130, 79, 6, 13, 127, 70, 108, 64, 120, 69, 59, 106, 67, 137]
----------

グラフにするとこうなる。

f:id:momijiame:20200327181632p:plain
MovingWindowKFold を用いた分割 (クリッピング)

時系列版 train_test_split() も作ってみる

あとは、データが大きかったり Nested CV する状況ならホールドアウトも必要だろうなーと思うので train_test_split() も作ってみた。 サンプルコードは次のとおり。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from calendar import month_name

import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns
from pandas.plotting import register_matplotlib_converters
from sklearn.model_selection import train_test_split

register_matplotlib_converters()


def ts_train_test_split(df, ts_column, **options):
    """時系列情報が含まれるカラムでソートした iloc を返す Hold-Out"""
    # シャッフルしない
    options['shuffle'] = False
    # 時系列のカラムを取り出す
    ts = df[ts_column]
    # 元々のインデックスを振り直して iloc として使える値 (0, 1, 2...) にする
    ts_df = ts.reset_index()
    # 時系列でソートする
    sorted_ts_df = ts_df.sort_values(by=ts_column)
    # 添字を計算する
    train_index, test_index = train_test_split(sorted_ts_df.index, **options)
    return list(train_index), list(test_index)

def main():
    df = sns.load_dataset('flights')

    month_name_mappings = {name: str(n).zfill(2) for n, name in
                           enumerate(month_name)}
    df['month'] = df['month'].apply(lambda x: month_name_mappings[x])
    df['year-month'] = df.year.astype(str) + '-' + df.month.astype(str)
    df['year-month'] = pd.to_datetime(df['year-month'], format='%Y-%m')

    # データの並び順をシャッフルする
    df = df.sample(frac=1.0, random_state=42)

    # 学習データとテストデータに分割する
    train_index, test_index = ts_train_test_split(df, ts_column='year-month', test_size=0.33)

    # 添字
    print(f'index of train: {train_index}')
    print(f'index of test: {test_index}')

    # グラフに描いてみる
    sns.lineplot(data=df.iloc[train_index], x='year-month', y='passengers', label='train')
    sns.lineplot(data=df.iloc[test_index], x='year-month', y='passengers', label='test')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python tsholtout.py
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139, 94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54]
index of test: [40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21, 95, 136, 99, 85, 29, 18, 115, 39, 123, 86, 130, 79, 6, 13, 127, 70, 108, 64, 120, 69, 59, 106, 67, 137]

次のようなグラフが得られる。

f:id:momijiame:20200327181748p:plain
時系列データの Hold-Out 分割 (1)

train_test_split() 関数のラッパーとして動作するので、オプションはそのまま使える。 たとえば、検証用データの比率を 50% まで増やしてみよう。

    train_index, test_index = ts_train_test_split(df, ts_column='year-month', test_size=0.5)

上記のコードにしたモジュールを実行してみる。

$ python tsholtout.py
index of train: [42, 128, 98, 91, 27, 72, 96, 80, 87, 26, 34, 20, 5, 88, 141, 53, 33, 92, 9, 1, 138, 116, 56, 49, 47, 48, 28, 16, 44, 125, 61, 30, 119, 65, 78, 76, 32, 124, 93, 55, 45, 110, 37, 81, 52, 25, 103, 60, 113, 75, 101, 10, 129, 71, 100, 24, 4, 117, 111, 121, 41, 105, 68, 122, 15, 7, 8, 51, 57, 17, 82, 139]
index of test: [94, 19, 135, 118, 63, 77, 11, 107, 58, 66, 2, 84, 43, 73, 46, 134, 114, 83, 112, 109, 142, 35, 12, 54, 40, 3, 31, 132, 14, 97, 143, 131, 102, 104, 140, 126, 89, 74, 22, 36, 62, 23, 90, 50, 133, 0, 38, 21, 95, 136, 99, 85, 29, 18, 115, 39, 123, 86, 130, 79, 6, 13, 127, 70, 108, 64, 120, 69, 59, 106, 67, 137]

得られるグラフは次のとおり。

f:id:momijiame:20200327181811p:plain
時系列データの Hold-Out 分割 (2)

いじょう。

Kaggleで勝つデータ分析の技術

Kaggleで勝つデータ分析の技術

Python: Luigi のパラメータ爆発問題について

Luigi は、Python を使って実装された、バッチ処理のパイプラインを扱うためのフレームワーク。 Luigi でパイプラインを定義するときは、基本的には個別のタスクを依存関係でつないでいくことになる。 このとき、扱う処理によってはパイプラインは長大になると共に扱うパラメータの数も増える。 そうすると、依存関係で上流にあるタスクに対して、どのようにパラメータを渡すか、という問題が生じる。

この問題は、公式のドキュメントではパラメータ爆発 (parameter explosion) と表現されている。

luigi.readthedocs.io

今回は、このパラメータ爆発問題を解決する方法について。 なお、基本的には上記のドキュメントに解決方法が書いてあるので、そちらを読むでも良い。

使った環境は次のとおり。

$ sw_vers           
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V                 
Python 3.8.1
$ pip list | grep -i luigi   
luigi           2.8.12 

下準備

事前の準備として、Luigi をインストールしておく。

$ pip install luigi

パラメータ爆発によって生じるパラメータのバケツリレー

まずは、パラメータ爆発によって何が引き起こされるのかについて解説する。 たとえば、タスクとして UpstreamTaskDownstreamTask があるとする。 DownstreamTaskUpstreamTask に依存していて、それぞれ動作に必要なパラメータがある。

このとき、愚直にパイプラインを定義すると、次のようなコードになる。 DownstreamTask では、UpstreamTask に必要なパラメータを二重に定義した上で、タスクを生成するときに渡している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""
    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()

    # 上流タスクのパラメータ
    # FIXME: 同じ内容を二重に定義してしまっている
    upstream_task_param = luigi.Parameter()

    def requires(self):
        """依存しているタスクを示すメソッド"""
        # FIXME: 依存しているタスクを生成するときにパラメータのバケツリレーが生じている
        yield UpstreamTask(self.upstream_task_param)


def main():
    # 実行したい下流タスクにすべてのパラメータを渡してバケツリレーする
    luigi.run(cmdline_args=['DownstreamTask',
                            '--downstream-task-param=downstream',
                            '--upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(downstream_task_param=downstream, upstream_task_param=upstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリーを見ると、ちゃんと DownstreamTask に渡したパラメータが UpstreamTask に渡されている。 たしかに、問題なく動いてはいる。 しかし、これではパラメータが二重管理になっているし書き間違いも起きやすい。 もし、上流のタスクが追加されたら下流のタスクはすべて修正が必要になってしまう。 タスクの依存関係とパラメータが少ないうちは何とかなっても、規模が大きくなれば早々に破綻するのは明らかだろう。

クラス名を指定してパラメータを渡す

ここからは、上記の問題を解決する方法について書いていく。 まず、ひとつ目のやり方はタスクの依存関係を示すときにパラメータを指定しないというもの。 こうすると、不足しているパラメータはコマンドラインやコンフィグファイル経由で渡すことになる。

以下にサンプルコードを示す。 先ほどとの違いは DownstreamTaskrequires() メソッドのところ。 メソッドで UpstreamTask のインスタンスを返すときに、パラメータをまったく指定していない。 代わりに、実行するときにコマンドラインからそれぞれのクラスの名前を指定してパラメータを渡している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""
    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()

    def requires(self):
        """依存しているタスクを示すメソッド"""
        # 依存しているタスクを生成するときにパラメータを渡さない
        # 代わりにコマンドラインの引数でクラス名を指定してパラメータを渡す
        yield UpstreamTask()


def main():
    # 実行するときにクラス名を指定することで上流タスクに直接パラメータを渡せる
    luigi.run(cmdline_args=['DownstreamTask',
                            '--DownstreamTask-downstream-task-param=downstream',
                            '--UpstreamTask-upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(downstream_task_param=downstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリからは、ピンポイントにそれぞれのクラスにパラメータが渡されていることがわかる。 ただし、この方法は下流にあるタスクが上流のタスクのパラメータを利用するときには使えない

@requires デコレータを使う

次に紹介するのは luigi.util.requires デコレータを使うやり方。 このデコレータを使ってタスクのクラスを修飾すると、自動的に必要なパラメータとメソッドを追加してくれる。

以下にサンプルコードを示す。 DownstreamTask@requires デコレータを使って UpstreamTask を引数に修飾している。 こうすると、指定したクラスで必要となるパラメータを自動的に追加すると共に、requires() メソッドでクラスを指定したのと同じことになる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.util import requires


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""

    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


# 他のタスクに依存していることをデコレータで示す
# 必要なパラメータも自動的に定義されるので二重管理がなくなる
@requires(UpstreamTask)
class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()


def main():
    luigi.run(cmdline_args=['DownstreamTask',
                            '--downstream-task-param=downstream',
                            '--upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(upstream_task_param=upstream, downstream_task_param=downstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリを見ると、最初のパターンと同じように UpstreamTask で必要とするパラメータが DownstreamTask にも重複して渡されている。 これなら下流のタスクでも上流のタスクのパラメータが利用できる。

@inherits デコレータを使う

もうひとつのやり方は @inherits デコレータを使うもので、アプローチは @requires デコレータと似ている。 ただし、こちらは本来の目的とはちょっと違うものを無理やり転用して使っている感じがある。

次のサンプルコードでは DownstreamTask@inherits デコレータで UpstreamTask を引数に修飾している。 こうすると DownstreamTaskUpstreamTask と同じパラメータを持つようになる。 ただし、@requires デコレータと違って依存関係はセットされない。 そのため、自分で requires() メソッドに必要なタスクを指定することになる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.util import inherits


class ExampleCommonTask(luigi.Task):
    """ターゲットファイルを生成しない例示用のタスク"""

    # タスクの実行が完了したかを示すフラグ
    done = False

    def run(self):
        """タスクが実行されたときに呼ばれるメソッド"""
        print(f'run: {self}')
        # 実行されたらタスクが完了したことにする
        self.done = True

    def complete(self):
        """タスク完了のチェックに使われるメソッド"""
        return self.done


class UpstreamTask(ExampleCommonTask):
    """他のタスクから依存されているタスク"""

    # 上流タスクのパラメータ
    upstream_task_param = luigi.Parameter()


@inherits(UpstreamTask)
class DownstreamTask(ExampleCommonTask):
    """他のタスクに依存しているタスク"""

    # タスク固有のパラメータ
    downstream_task_param = luigi.Parameter()

    def requires(self):
        """依存しているタスクを示すメソッド"""
        # clone() することで、自身のパラメータを適用したタスクが得られる
        # self.clone_parent() でも良い
        return self.clone(UpstreamTask)


def main():
    luigi.run(cmdline_args=['DownstreamTask',
                            '--downstream-task-param=downstream',
                            '--upstream-task-param=upstream',
                            '--local-scheduler'])


if __name__ == '__main__':
    main()

上記をファイルに保存して実行してみよう。

$ python example.py
...(省略)...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DownstreamTask(upstream_task_param=upstream, downstream_task_param=downstream)
    - 1 UpstreamTask(upstream_task_param=upstream)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

実行サマリを見ると、ちゃんとパラメータを適用したタスクが実行されていることがわかる。

まとめ

今回は、パラメータ爆発問題を回避するいくつかの方法を試してみた。

  • タスクの依存関係を requires() メソッドで示すときにパラメータをブランクにする
  • @requires デコレータを使う
  • @inherits デコレータを使う

いじょう。

Linuxで動かしながら学ぶTCP/IPネットワーク入門

Linuxで動かしながら学ぶTCP/IPネットワーク入門

  • 作者:もみじあめ
  • 発売日: 2020/03/06
  • メディア: オンデマンド (ペーパーバック)

Python: Luigi のイベントハンドラを試してみる

今回は、Luigi でタスクの開始や成功・失敗などのときに発火するイベントハンドラを扱ってみる。 なお、Luigi はバッチ処理などのパイプラインを組むのに使われるソフトウェアのこと。 基本的な使い方については以下を参照してほしい。

blog.amedama.jp

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V        
Python 3.7.6

下準備

下準備として Luigi をインストールしておく。

$ pip install luigi

イベントハンドラを登録する

早速だけど以下にサンプルコードを示す。 Luigi では、デコレータを使ってイベントが発生したときに実行したい処理を登録できる。 サンプルコードでは、タスクが開始されたタイミングと成功・失敗したタイミングで実行される処理を登録している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleTask(luigi.Task):
    """サンプルのタスク"""

    def run(self):
        print(f'run: {self}')


@luigi.Task.event_handler(luigi.Event.START)
def on_start(task):
    """タスクを開始したときのハンドラ"""
    print(f'on_start: {task}')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    """タスクが成功したときのハンドラ"""
    print(f'on_success: {task}')


@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    """タスクが失敗したときのハンドラ"""
    print(f'on_failure: {task} with {exception}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTask, local_scheduler=True)


if __name__ == '__main__':
    main()

上記のサンプルコードを実行してみよう。 読みやすくなるように標準エラー出力は表示していない。

$ python evhandler.py 2>/dev/null
on_start: ExampleTask()
run: ExampleTask()
on_success: ExampleTask()

上記を見ると、ちゃんとタスクの開始と成功したタイミングでイベントハンドラが実行されていることがわかる。

意図的にタスクを失敗させてみる

試しに、意図的にタスクを失敗させてみよう。 次のサンプルコードではタスクで例外を上げている。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi


class ExampleTask(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # 例外を上げる
        raise Exception('Oops!')


@luigi.Task.event_handler(luigi.Event.START)
def on_start(task):
    print(f'on_start: {task}')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    print(f'on_success: {task}')


@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    print(f'on_failure: {task} with {exception}')


def main():
    luigi.run(main_task_cls=ExampleTask, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python evhandler.py 2>/dev/null
on_start: ExampleTask()
run: ExampleTask()
on_failure: ExampleTask() with Oops!

今度は、失敗したときのイベントハンドラが実行されていることがわかる。 また、同時に例外オブジェクトもハンドラに渡されている。

一連の複数のタスクを実行してみる

続いては、依存関係をもった複数のタスクが実行されたときの挙動を確認しておく。 次のサンプルコードでは、ExampleTaskA と、それに依存した ExampleTaskB というタスクを定義している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class ExampleTaskA(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # モックの出力にメッセージを書き込む
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')

    def output(self):
        # オンメモリのモックを出力にする
        return MockTarget('mock')


class ExampleTaskB(luigi.Task):

    def requires(self):
        # このタスクの実行には事前に以下のタスクを実行する必要がある
        return ExampleTaskA()

    def run(self):
        print(f'run: {self}')


@luigi.Task.event_handler(luigi.Event.START)
def on_start(task):
    """タスクを開始したときのハンドラ"""
    print(f'on_start: {task}')


@luigi.Task.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    """タスクが成功したときのハンドラ"""
    print(f'on_success: {task}')


@luigi.Task.event_handler(luigi.Event.FAILURE)
def on_failure(task, exception):
    """タスクが失敗したときのハンドラ"""
    print(f'on_failure: {task} with {exception}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTaskB, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみる。

$ python evhandler.py 2>/dev/null
on_start: ExampleTaskA()
run: ExampleTaskA()
on_success: ExampleTaskA()
on_start: ExampleTaskB()
run: ExampleTaskB()
on_success: ExampleTaskB()

ちゃんと、それぞれのタスクが開始・成功したタイミングでイベントハンドラが実行されているようだ。

特定のタスクだけで実行したいハンドラを登録する

ここまでのイベントハンドラは、すべてのタスクに共通して実行されるものだった。 続いては、特定のタスクだけで実行されるイベントハンドラを登録してみる。

次のサンプルコードでは ExampleTaskB にだけタスクが成功したときに実行されるイベントハンドラを登録している。 Luigi の Task クラスには event_handler() というクラスメソッドが定義されていて、これを使ってタスクに対してイベントハンドラを登録できるようになっている。 そこで、定義したタスクの event_handler() メソッドを使えば、そのタスクだけで実行されるイベントハンドラを登録できる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class ExampleTaskA(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # モックの出力にメッセージを書き込む
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')

    def output(self):
        # オンメモリのモックを出力にする
        return MockTarget('mock')


class ExampleTaskB(luigi.Task):

    def requires(self):
        # このタスクの実行には事前に以下のタスクを実行する必要がある
        return ExampleTaskA()

    def run(self):
        print(f'run: {self}')


# 特定のタスクのイベントをハンドルしたいときはクラスメソッドの eventhandler() を使う
@ExampleTaskB.event_handler(luigi.Event.SUCCESS)
def on_success(task):
    print(f'on_success: {task}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTaskB, local_scheduler=True)


if __name__ == '__main__':
    main()

あるいは、単純にタスクに各イベントハンドラがあらかじめ定義されているので、それをオーバーライドしても良い。 おそらく、ほとんどのユースケースではこちらを使えば問題ないはず。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import luigi
from luigi.mock import MockTarget


class ExampleTaskA(luigi.Task):

    def run(self):
        print(f'run: {self}')
        # モックの出力にメッセージを書き込む
        out = self.output()
        with out.open('w') as f:
            f.write('Hello, World!\n')

    def output(self):
        # オンメモリのモックを出力にする
        return MockTarget('mock')


class ExampleTaskB(luigi.Task):

    def requires(self):
        # このタスクの実行には事前に以下のタスクを実行する必要がある
        return ExampleTaskA()

    def run(self):
        print(f'run: {self}')

    # 特定のタスクのイベントをハンドルしたいときはメソッドをオーバーライドするだけ
    def on_success(task):
        print(f'on_success: {task}')


def main():
    # 最終的に実行したいタスクを指定して開始する
    luigi.run(main_task_cls=ExampleTaskB, local_scheduler=True)


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python evhandler.py 2>/dev/null
run: ExampleTaskA()
run: ExampleTaskB()
on_success: ExampleTaskB()

すると、今度は ExampleTaskB の成功時だけイベントハンドラが実行されていることがわかる。

めでたしめでたし。

「Linuxで動かしながら学ぶTCP/IPネットワーク入門」という本を書きました

表題のとおり TCP/IP に関する本を書きました。 今回は、そのご紹介です!

どんな本なの?

Linux を使って実際にネットワークを組んで動かしながら TCP/IP について学べる本です。 実際に手を動かすことで、より実践的で風化しにくい知識と技術を身につけることが本の目的です。

こんな人にオススメ

次のいずれかに当てはまるような方には、この本が参考になると思います。

  • ネットワークが専門ではない IT エンジニア、またはそれを志す学生さん
  • 他の TCP/IP に関する本を読んだことはあるけど、身についている実感が少ない
  • インターネットやインフラの技術についてよく知らないけど興味はある
  • ネットワークを気軽に組んで実験できる環境の作り方に興味がある

そして、この本を読んで試した後には、次のような効果が見込めます。

  • 「インターネットの本質を理解できた」という実感が得られる
  • ネットワークのトラブルシュートで見るべきポイントがわかる
  • なんとなく持っていたネットワークに対する苦手意識がなくなる
  • 「ネットワークがこういう状況では何が起こるんだろう」と思ったときに、試す環境が作れるようになる

執筆のきっかけ

執筆のきっかけは、実際に手を動かしながら TCP/IP について学べる本が世の中に必要なんじゃないか、と感じたことです。 TCP/IP という技術は、理屈について書いてある本はたくさんあるものの、実際に手を動かせるものとなると少なくなります。 さらに、手を動かすための環境構築までカバーした本となると、より限られます。

その原因は、TCP/IP を学ぶための環境構築が難しい点が挙げられると思います。 一般的に、ネットワークを組んで勉強しようとすると、複数台のコンピュータを用意する必要があるためです。 もちろん、仮想マシンを使えば物理的なコンピュータは 1 台で済みます。 ですが、それでもネットワークの構築という手間のかかる作業は残っています。

この本では、その問題を Linux の Network Namespace という機能を使って解決しています。 Network Namespace というのは、Docker といった Linux のコンテナ仮想化技術を支えている機能のひとつです。 Network Namespace を使うと、1 台の Linux マシンの中に、ネットワーク的にはシステムから独立した空間を作れます。 実験用のネットワークを作るのに必要な作業はいくつかのコマンドライン操作だけで、完了するまでには数秒もかかりません。

また、Linux を動かす環境には仮想マシンが使えるため、用意すべき物理的なコンピュータは今使っているラップトップで十分です。 もちろん、利用するソフトウェアはどれもフリーなものなので、追加で必要となる費用もありません。 読みながら試すときは Linux のコマンドライン操作が必要になりますが、あまり親しみのない人にも読んでもらえるように、付録には簡単にですが操作ガイドも用意しました。

どんなことが書いてあるの?

TCP/IP を理解する上で、重要なポイントをギュギュッと詰め込みました。 この本は、プロトコルのヘッダに含まれるすべてのフィールドを網羅的に解説していくような性格の本ではありません。 そのため、TCP/IP に詳しい方からすると「これは書いてないのか」と思う点もあるかもしれません。 ですが、この本を読んで「TCP/IP という技術の地図とコンパス」が頭に入れば、他の本も読みやすくなると思います。 また、書く内容を重要なポイントに絞っても、尚 B5 版で 200 ページを超えるボリュームになりました (付録含む)。

本書は、7 つの章に分かれています。 OSI 参照モデルでいうと L2 から L7 までを一冊でひととおりカバーしています。 説明の順序としては「L3 -> L2 -> L4 -> L7 -> NAT -> ソケットプログラミング」という流れです。 ざっくりと、どんなことを書いてあるのか次に示します。

  • 「TCP/IP とは」
    • この章では TCP/IP の概要とインターネットの動く仕組みについて学びます
    • インターネットでパケットがバケツリレーされる仕組みについて、実際に ping や traceroute コマンドを実行しながら説明を進めます
  • 「Network Namespace」
    • この章では Network Namespace を使って実験用ネットワークを組む方法を学びます
    • 「ルータなし → ルータあり → ルータ 2 台」という流れで、少しずつ複雑なネットワークを組みながら説明を進めます
    • ping を使ってネットワークの疎通を確認したり、パケットを tcpdump コマンドを使って観察します
  • 「イーサネット」
    • この章ではイーサネットが近隣までパケットを運ぶ仕組みについて学びます
    • MAC アドレスを使ってフレームが届く仕組みや、ブリッジ (Linux Network Bridge) でブロードキャストドメインを広げる方法を扱います
  • 「トランスポート層のプロトコル」
    • この章では、トランスポート層のプロトコルを使って、通信の種類を識別したり、信頼性のある通信を実現する方法を学びます
    • はじめに、UDP でポート番号を使って通信するアプリケーションを識別する仕組みについて扱います
    • その次に、TCP が信頼性のある通信を実現している仕組みを扱います
  • 「アプリケーション層のプロトコル」
    • この章では、TCP/IP の集大成としてインターネットでやり取りされる実用的な通信について学びます
    • 扱うプロトコルは HTTP、DNS、DHCP です
  • 「NAT」
    • この章では、IPv4 で組まれた家庭やオフィスなどのネットワークのほとんどで用いられている NAT という技術を学びます
    • はじめに、グローバルアドレスを節約するために用いられる Source NAT について扱います
    • その次に、「ポートを空ける」という表現でよく知られている Destination NAT について扱います
  • 「ソケットプログラミング」
    • この章では、TCP/IP をソフトウェアから扱う方法について学びます
    • プログラミング言語としては Python を使います
    • 取り扱う題材は、HTTP クライアント、エコーサーバ、独自の足し算バイナリプロトコルです

どんな人が書いたの?

著者は、とあるインターネット関連企業で働いているソフトウェアエンジニアです。 ネットワーク機器の開発で IPv6 プロトコルスタックを担当していた経験があります。 一時期は SDN (Software Defined Network) 関連のソフトウェアの研究開発や OSS コントリビューションを仕事としてやっていました。 また、時期によっては Web アプリケーションの開発もしていたので、L2 ~ L7 にかけてひととおりの開発経験があります。 なので、出自は「ソフトウェア寄りのネットワーク屋さん」といったところです。

なお、過去にブログで TCP/IP について扱って話題となったエントリだと、次のようなものがあります。

blog.amedama.jp

blog.amedama.jp

中身を軽く読んでみたい

本の雰囲気を確かめてから買いたいという方もいらっしゃるかと思うので、以下に立ち読み版の PDF をご用意しました。

drive.google.com

おわりに

私の好きな言葉のひとつに「特定の分野におけるプロフェッショナルとは、その分野でひととおりの失敗を経験した人のことをいう」という言葉があります。 ぜひ、この本を使って、たくさん失敗できる環境を手にしていただければ幸いです。

謝辞

この場を借りて、本の表紙を書いてくれた Asano Sonoko さん、技術的なレビューをしてくださった @ttsubo さんに心からお礼を申し上げます。 おかげさまで、プライベートで 2 年ほどかけて少しずつ書いた本を世に出すことができました。 本当にありがとうございました。

Python: Optuna を使って QWK の閾値を最適化してみる

最近、Twitter のタイムラインで QWK (Quadratic Weighted Kappa: 二次の重み付きカッパ係数) の最適化が話題になっていたので個人的に調べていた。 QWK は順序つきの多値分類問題を評価するための指標で、予測を大きく外すほど大きなペナルティが与えられるようになっている。 また、予測値の分布が結果に影響する点も特徴的で、この点が今回取り扱う最適化にも関係してくる。

QWK の最適化については、Kaggle 本と、その著者 @Maxwell_110 さんによる次のブログエントリが詳しい。 ようするに、真のラベルの分布に沿った形で予測しないと最適な結果が得られない、ということ。

Kaggleで勝つデータ分析の技術

Kaggleで勝つデータ分析の技術

note.com

Kaggle 本や、Web をざっと調べた限りでは scipy.optimize.minimize() 関数を使って Nelder Mead 法で最適化するのが一般的なようだった。

docs.scipy.org

今回は、せっかくなので Optuna を使って TPE (Tree-structured Parzen Estimator) で最適化してみた。

使った環境は次のとおり。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G3020
$ python -V                                                                
Python 3.7.6

下準備

はじめに、利用するパッケージをインストールしておく。

$ pip install scikit-learn optuna lightgbm pandas

その上で、Kaggle で開催された次のコンペのデータセットをダウンロードしておく。

www.kaggle.com

これは、前述した Kaggle 本で QWK の最適化が有効に働く例として挙げられているもの。 カレントワーキングディレクトリに、次のファイルがある状態にしておく。

$ ls | grep .zip
sample_submission.csv.zip
test.csv.zip
train.csv.zip

多値分類問題として解いてみる

そもそも、順序つきのラベルを予測するタスクの場合、多値分類問題として解く方法と回帰問題として解く方法がある。 まずは、ベーシックな考え方として多値分類問題として解く方法を試してみる。

早速だけど、サンプルコードは次のとおり。 LightGBM を使って多値分類問題として解いている。 目的関数は Multi LogLoss で、5-Fold CV で QWK について評価している。

#!/usr/bin/env python3

import numbers

import pandas as pd
import numpy as np
import lightgbm as lgb
from sklearn.model_selection import KFold
from sklearn.metrics import cohen_kappa_score


def lgb_custom_metric_qwk_multiclass(preds, data):
    """LightGBM のカスタムメトリックを計算する関数

    多値分類問題として解いた予測から QWK を計算する"""
    # 正解ラベル
    y_true = data.get_label()
    # ラベルの数
    num_labels = data.params['num_class']
    # 多値分類問題では本来は二次元配列が一次元配列になって提供される
    reshaped_preds = preds.reshape(num_labels, len(preds) // num_labels)
    # 最尤と判断したクラスを選ぶ 
    y_pred = np.argmax(reshaped_preds, axis=0)  # 最尤と判断したクラスを選ぶ
    # QWK を計算する
    return 'qwk', qwk(y_true, y_pred), True


def qwk(y_true, y_pred):
    """QWK (Quadratic Weighted Kappa) を計算する関数"""
    return cohen_kappa_score(y_true, y_pred, weights='quadratic')


def _numerical_only_df(df_):
    """数値のカラムだけ取り出す関数"""
    number_cols = [name for name, dtype in df_.dtypes.items()
                   if issubclass(dtype.type, numbers.Number)]
    numerical_df = df_[number_cols]
    return numerical_df


def main():
    # データセットを読み込む
    train_df = pd.read_csv('train.csv.zip')

    # 説明変数
    x_train = train_df.drop('Response', axis=1)
    # 簡単にするため、とりあえずすぐに使えるものだけ取り出して使う
    x_train = _numerical_only_df(x_train)

    # 目的変数
    y_train = train_df.Response.astype(float)
    # 目的変数はゼロからスタートにする
    y_train -= 1

    # LightGBM のデータセット表現にする
    lgb_train = lgb.Dataset(x_train, y_train)

    # 多値分類問題として解く
    lgbm_params = {
        'objective': 'multiclass',
        'metric': 'multi_logloss',
        'num_class': len(y_train.unique()),
        'first_metric_only': True,
        'verbose': -1,
    }

    # データ分割の方法
    folds = KFold(n_splits=5, shuffle=True, random_state=42)

    # 5-Fold CV で検証する
    result = lgb.cv(lgbm_params,
                    lgb_train,
                    num_boost_round=1000,
                    early_stopping_rounds=100,
                    folds=folds,
                    verbose_eval=10,
                    feval=lgb_custom_metric_qwk_multiclass,
                    seed=42,
                    )

    # early stop したラウンドでの QWK を出力する
    print(f'CV Mean QWK: {result["qwk-mean"][-1]}')


if __name__ == '__main__':
    main()

上記を実行した結果が次のとおり。

$ python qwkmc.py
[10]   cv_agg's multi_logloss: 1.37765 + 0.0076621   cv_agg's qwk: 0.442683 + 0.00532416
[20]   cv_agg's multi_logloss: 1.25563 + 0.0095355   cv_agg's qwk: 0.507798 + 0.00703211
[30]   cv_agg's multi_logloss: 1.20349 + 0.0100597   cv_agg's qwk: 0.519766 + 0.00748977
...(省略)...
[220]  cv_agg's multi_logloss: 1.14153 + 0.0126548   cv_agg's qwk: 0.560446 + 0.00516118
[230]  cv_agg's multi_logloss: 1.14198 + 0.0126144   cv_agg's qwk: 0.56074 + 0.00444787
[240]  cv_agg's multi_logloss: 1.14242 + 0.0130142   cv_agg's qwk: 0.561591 + 0.0046819
CV Mean QWK: 0.5574903141807788

交差検証では QWK の平均として約 0.5574 が得られている。

回帰問題として解いてみる

続いては、同じデータを回帰問題として解くパターンについて。 ラベルが順序つきであれば、回帰問題として解いた上で結果を clip + 四捨五入するような方法もある。

サンプルコードは次のとおり。 やっていることは同じで、解き方が違うだけ。

#!/usr/bin/env python3

import numbers

import numpy as np
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import cohen_kappa_score


def lgb_custom_metric_qwk_regression(preds, data):
    """LightGBM のカスタムメトリックを計算する関数

    回帰問題として解いた予測から QWK を計算する"""
    # 正解ラベル
    y_true = data.get_label()
    # 予測ラベル
    y_pred = np.clip(preds, 0, 7).round()  # 単純に予測値を clip して四捨五入する
    # QWK を計算する
    return 'qwk', qwk(y_true, y_pred), True


def qwk(y_true, y_pred):
    """QWK (Quadratic Weighted Kappa) を計算する関数"""
    return cohen_kappa_score(y_true, y_pred, weights='quadratic')


def _numerical_only_df(df_):
    """数値のカラムだけ取り出す関数"""
    number_cols = [name for name, dtype in df_.dtypes.items()
                   if issubclass(dtype.type, numbers.Number)]
    numerical_df = df_[number_cols]
    return numerical_df


def main():
    # データセットを読み込む
    train_df = pd.read_csv('train.csv.zip')

    # 説明変数
    x_train = train_df.drop('Response', axis=1)
    # 簡単にするため、とりあえずすぐに使えるものだけ取り出して使う
    x_train = _numerical_only_df(x_train)

    # 目的変数
    y_train = train_df.Response.astype(float)
    # 目的変数はゼロからスタートにする
    y_train -= 1

    # LightGBM のデータセット表現にする
    lgb_train = lgb.Dataset(x_train, y_train)

    # 回帰問題として解く
    lgbm_params = {
        'objective': 'regression',
        'metric': 'rmse',
        'first_metric_only': True,
        'verbose': -1,
    }

    # データ分割の方法
    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    # 5-Fold CV で検証する
    result = lgb.cv(lgbm_params,
                    lgb_train,
                    num_boost_round=1000,
                    early_stopping_rounds=100,
                    folds=folds,
                    verbose_eval=10,
                    feval=lgb_custom_metric_qwk_regression,
                    seed=42,
                    )

    # early stop したラウンドでの QWK を出力する
    print(f'CV Mean QWK: {result["qwk-mean"][-1]}')


if __name__ == '__main__':
    main()

上記の実行結果は次のとおり。

$ python qwkreg.py
[10]   cv_agg's rmse: 2.02046 + 0.0085424    cv_agg's qwk: 0.409782 + 0.0042456
[20]   cv_agg's rmse: 1.91647 + 0.0125972    cv_agg's qwk: 0.506518 + 0.00716283
[30]   cv_agg's rmse: 1.87532 + 0.0140473    cv_agg's qwk: 0.553375 + 0.00638559
...(省略)...
[220]  cv_agg's rmse: 1.83657 + 0.0177681    cv_agg's qwk: 0.607652 + 0.00776567
[230]  cv_agg's rmse: 1.83694 + 0.0177138    cv_agg's qwk: 0.608083 + 0.00775286
[240]  cv_agg's rmse: 1.83728 + 0.0176133    cv_agg's qwk: 0.608366 + 0.00780103
CV Mean QWK: 0.6064040204540543

交差検証では QWK の平均として約 0.6064 が得られている。 先ほどよりもスコアが改善している。

回帰問題として解いた上で OOF Prediction を最適化する

続いては、今回の本題である QWK の閾値の最適化について。 OOF で予測した値を、閾値を最適化することでスコアが改善するかどうか確認してみる。

サンプルコードは次のとおり。 OptunaRounder というクラスを導入して、OOF Prediction を最適化している。

#!/usr/bin/env python3

import numbers

import optuna
import pandas as pd
import numpy as np
import lightgbm as lgb
from sklearn.model_selection import KFold
from sklearn.metrics import cohen_kappa_score


class OptunaRounder:
    """Optuna を使って QWK の最適な閾値を探索するクラス"""

    def __init__(self, y_true, y_pred):
        # 真のラベル
        self.y_true = y_true
        # 予測したラベル
        self.y_pred = y_pred
        # ラベルの種類
        self.labels = np.unique(y_true)

    def __call__(self, trial):
        """最大化したい目的関数"""
        # 閾値を Define by run で追加していく
        thresholds = []
        # ラベルの数 - 1 が必要な閾値の数になる
        for i in range(len(self.labels) - 1):
            # 閾値の下限 (既存の最大 or ラベルの最小値)
            low = max(thresholds) if i > 0 else min(self.labels)
            # 閾値の上限 (ラベルの最大値)
            high = max(self.labels)
            # 閾値の候補を追加する
            t = trial.suggest_uniform(f't{i}', low, high)
            thresholds.append(t)

        # 閾値の候補を元に QWK を計算する
        opt_y_pred = self.adjust(self.y_pred, thresholds)
        return qwk(self.y_true, opt_y_pred)

    def adjust(self, y_pred, thresholds):
        """閾値にもとづいて予測を補正するメソッド"""
        opt_y_pred = pd.cut(y_pred,
                            [-np.inf] + thresholds + [np.inf],
                            labels=self.labels)
        return opt_y_pred


def lgb_custom_metric_qwk_regression(preds, data):
    """LightGBM のカスタムメトリックを計算する関数

    回帰問題として解いた予測から QWK を計算する"""
    # 正解ラベル
    y_true = data.get_label()
    # 予測ラベル
    y_pred = np.clip(preds, 0, 7).round()  # 単純に clip して四捨五入する
    # QWK を計算する
    return 'qwk', qwk(y_true, y_pred), True


def qwk(y_true, y_pred):
    """QWK (Quadratic Weighted Kappa) を計算する関数"""
    return cohen_kappa_score(y_true, y_pred, weights='quadratic')


class ModelExtractionCallback(object):
    """lightgbm.cv() から学習済みモデルを取り出すためのコールバックに使うクラス"""

    def __init__(self):
        self._model = None

    def __call__(self, env):
        # _CVBooster の参照を保持する
        self._model = env.model

    def _assert_called_cb(self):
        if self._model is None:
            # コールバックが呼ばれていないときは例外にする
            raise RuntimeError('callback has not called yet')

    @property
    def boosters_proxy(self):
        self._assert_called_cb()
        # Booster へのプロキシオブジェクトを返す
        return self._model

    @property
    def raw_boosters(self):
        self._assert_called_cb()
        # Booster のリストを返す
        return self._model.boosters

    @property
    def best_iteration(self):
        self._assert_called_cb()
        # Early stop したときの boosting round を返す
        return self._model.best_iteration


def _numerical_only_df(df_):
    """数値のカラムだけ取り出す関数"""
    number_cols = [name for name, dtype in df_.dtypes.items()
                   if issubclass(dtype.type, numbers.Number)]
    numerical_df = df_[number_cols]
    return numerical_df


def main():
    # データセットを読み込む
    train_df = pd.read_csv('train.csv.zip')

    # 説明変数
    x_train = train_df.drop('Response', axis=1)
    # 簡単にするため、とりあえずすぐに使えるものだけ取り出して使う
    x_train = _numerical_only_df(x_train)
    # Id 列をインデックスに設定する
    x_train = x_train.set_index('Id')

    # 目的変数
    y_train = train_df.Response.astype(float)
    # 目的変数はゼロからスタートにする
    y_train -= 1

    # LightGBM のデータセット表現にする
    lgb_train = lgb.Dataset(x_train, y_train)

    # 回帰問題として解く
    lgbm_params = {
        'objective': 'regression',
        'metric': 'rmse',
        'first_metric_only': True,
        'verbose': -1,
    }

    # データ分割の方法
    folds = KFold(n_splits=5, shuffle=True, random_state=42)

    # 学習済みモデルを取り出すためのコールバックを用意する
    extraction_cb = ModelExtractionCallback()
    callbacks = [
        extraction_cb,
    ]

    # 5-Fold CV で検証する
    result = lgb.cv(lgbm_params,
                    lgb_train,
                    num_boost_round=10000,
                    early_stopping_rounds=100,
                    folds=folds,
                    verbose_eval=10,
                    feval=lgb_custom_metric_qwk_regression,
                    callbacks=callbacks,
                    seed=42,
                    )

    # early stop したラウンドでの QWK を出力する
    print(f'CV Mean QWK: {result["qwk-mean"][-1]}')

    # 学習データを学習済みモデルを使って OOF で予測する
    boosters = extraction_cb.raw_boosters
    best_iteration = extraction_cb.best_iteration
    oof_y_pred = np.zeros_like(y_train)
    for (_, val_index), booster in zip(folds.split(x_train, y_train), boosters):
        x_val = x_train.iloc[val_index]
        oof_y_pred[val_index] = booster.predict(list(x_val.to_numpy()),
                                                num_iteration=best_iteration)

    # ひとまず clipping + 四捨五入する
    oof_y_pred = np.clip(oof_y_pred, 0, 7).round()

    # 最適化していない状態での OOF Prediction の QWK
    raw_oof_qwk = qwk(y_train, oof_y_pred)
    print(f'Raw OOF QWK: {raw_oof_qwk}')

    # Optuna を使って QWK の閾値を最適化する
    objective = OptunaRounder(y_train, oof_y_pred)

    # 探索に 100 秒かける
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, timeout=100)

    # 見つけた閾値
    best_thresholds = sorted(study.best_params.values())
    print(f'Optimized thresholds: {best_thresholds}')

    # 閾値を最適化した場合の QWK
    optimized_oof_y_pred = objective.adjust(oof_y_pred, best_thresholds)
    optimized_oof_qwk = qwk(y_train, optimized_oof_y_pred)
    print(f'Optimized OOF QWK: {optimized_oof_qwk}')

    # テスト用データを読み込む
    test_df = pd.read_csv('test.csv.zip')
    test_df = test_df.set_index('Id')

    # 数値のカラムだけ取り出す
    numerical_test_df = _numerical_only_df(test_df)

    # 学習済みモデルの Averaging で予測する
    boosters_proxy = extraction_cb.boosters_proxy
    test_y_preds = boosters_proxy.predict(numerical_test_df,
                                          num_iteration=best_iteration)
    test_y_pred_avg = np.mean(test_y_preds, axis=0)

    # サンプル提出ファイルを読み込む
    submission_df = pd.read_csv('sample_submission.csv.zip')
    submission_df = submission_df.set_index('Id')

    # 最適化していない予測値 (単純なクリッピングと四捨五入)
    submission_df.Response = np.clip(test_y_pred_avg, y_train.min(), y_train.max() + 1).round().astype(int) + 1
    submission_df.to_csv('unoptimized_submission.csv')

    # 最適化した予測値
    optimized_test_y_pred_avg = objective.adjust(test_y_pred_avg, best_thresholds)
    submission_df.Response = optimized_test_y_pred_avg.astype(int) + 1
    submission_df.to_csv('optimized_submission.csv')


if __name__ == '__main__':
    main()

上記を実行した結果は次のとおり。

$ python qwkregopt.py
[10]   cv_agg's rmse: 2.01959 + 0.0101767    cv_agg's qwk: 0.408957 + 0.00513284
[20]   cv_agg's rmse: 1.9165 + 0.008155  cv_agg's qwk: 0.507252 + 0.00192446
[30]   cv_agg's rmse: 1.87492 + 0.00797763   cv_agg's qwk: 0.553388 + 0.00129979
...(省略)...
[200]  cv_agg's rmse: 1.8374 + 0.00729844    cv_agg's qwk: 0.60726 + 0.00206979
[210]  cv_agg's rmse: 1.83764 + 0.00757892   cv_agg's qwk: 0.607434 + 0.00210639
[220]  cv_agg's rmse: 1.83771 + 0.00759351   cv_agg's qwk: 0.607882 + 0.00205622
CV Mean QWK: 0.6054069736940326
Raw OOF QWK: 0.605407593931218
[I 2020-03-01 12:39:28,349] Finished trial#0 resulted in value: 0.14760909846772208. Current best value is 0.14760909846772208 with parameters: {'t0': 0.6069999513322623, 't1': 6.071871351250824, 't2': 6.421127597260041, 't3': 6.6499083619322334, 't4': 6.825786239070231, 't5': 6.94896643994053, 't6': 6.988042174746154}.
[I 2020-03-01 12:39:28,461] Finished trial#1 resulted in value: 0.3273196067078864. Current best value is 0.3273196067078864 with parameters: {'t0': 1.3022593301640533, 't1': 3.3523582269835885, 't2': 5.775809715879871, 't3': 6.748066158840195, 't4': 6.813600862422318, 't5': 6.952492269479133, 't6': 6.986909664863027}.
[I 2020-03-01 12:39:28,580] Finished trial#2 resulted in value: 0.1845825834695184. Current best value is 0.3273196067078864 with parameters: {'t0': 1.3022593301640533, 't1': 3.3523582269835885, 't2': 5.775809715879871, 't3': 6.748066158840195, 't4': 6.813600862422318, 't5': 6.952492269479133, 't6': 6.986909664863027}.
...(省略)...
[I 2020-03-01 12:41:08,371] Finished trial#660 resulted in value: 0.3366380459371958. Current best value is 0.6461578531751463 with parameters: {'t0': 1.9366086554521658, 't1': 2.765496885821397, 't2': 3.711341767628701, 't3': 3.8616368179727982, 't4': 4.309923993071266, 't5': 5.4467521001848525, 't6': 5.753790447645578}.
Optimized thresholds: [1.9366086554521658, 2.765496885821397, 3.711341767628701, 3.8616368179727982, 4.309923993071266, 5.4467521001848525, 5.753790447645578]
Optimized OOF QWK: 0.6461578531751463

最適化する前の OOF Prediction の QWK が約 0.6054 なのに対して、閾値を最適化すると約 0.6461 と、大きくスコアが向上している。

手元の交差検証だけでは何か間違っている恐れがあるので、テストデータに対するサブミッションまでやってみた。 先ほどのモジュールを実行すると unoptimized_submission.csv という最適化前のファイルと、optimized_submission.csv という最適化後のファイルができる。 すると、最適化する前の Public LB / Private LB が 0.61931 / 0.62190 だったのに対して、最適化した後は 0.65801 / 0.66005 と、ちゃんと効果があることが確認できた。

所感など

QWK の最適化は、真のラベルの分布に依存している。 そのため、学習済みモデルを使って予測する対象が学習時の分布と異なる場合、最適化の効果が得られない。 この点から、Private データの分布が異なる場合には最適化が有効ではないと考えられる。

だとすると、Private データの分布が学習時と同じであるという確証が得られない限りは使うのは賭けになるのでは、という疑問が浮かんだ。 とはいえ、うまくいった場合の効果は絶大なので、ある程度の仮定のもとに使う、という選択肢は現実的なのかもしれない。

いじょう。

参考

Kaggleで勝つデータ分析の技術

Kaggleで勝つデータ分析の技術

www.kaggle.com

blog.amedama.jp

blog.amedama.jp

Linux の Network Namespace と radvd / dnsmasq で IPv6 SLAAC (+RDNSS) を試す

今回は、Linux の Network Namespace と radvd / dnsmasq を使って IPv6 の SLAAC を試してみる。 IPv6 では、アドレスの自動設定にいくつかのやり方がある。 SLAAC というのは、そのひとつで RFC 4862 で定義されている IPv6 Stateless Address Autoconfiguration のことを指す。 SLAAC では ICMPv6 NDP (Neighbor Discovery Protocol) の RA (Router Advertisement) というメッセージでアドレスとデフォルトルートを設定する。 その上で、RFC 8415 で定義されている Stateless DHCPv6 というプロトコルを使って DNS や NTP サーバを設定する。 なお、現在では RFC 8106 で定義されている RDNSS というオプションを使うことで、RA 単独でも DNS サーバを設定することができる。

使った環境は次のとおり。

$ cat /etc/lsb-release 
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=18.04
DISTRIB_CODENAME=bionic
DISTRIB_DESCRIPTION="Ubuntu 18.04.4 LTS"
$ uname -r
4.15.0-76-generic
$ dpkg -l | egrep "(radvd|dnsmasq|isc-dhcp-client)"
ii  dnsmasq                         2.79-1                              all          Small caching DNS proxy and DHCP/TFTP server
ii  dnsmasq-base                    2.79-1                              amd64        Small caching DNS proxy and DHCP/TFTP server
ii  isc-dhcp-client                 4.3.5-3ubuntu7.1                    amd64        DHCP client for automatically obtaining an IP address
ii  radvd                           1:2.16-3                            amd64        Router Advertisement Daemon

もくじ

作るネットワーク

用意するネットワークの物理的な構成は次のとおり。 白抜きになっている箱が Network Namespace を表している。 また、端点をもった線は veth インターフェイスを表している。

ネットワークの物理的な構成

ネットワークの論理的な構成は次のとおり。

ネットワークの論理的な構成

実験では host のアドレスやデフォルトルートを設定することになる。

下準備

下準備として、必要なパッケージをインストールしておく。

$ sudo apt-get -y install radvd dnsmasq isc-dhcp-client tcpdump

ネットワークを作る

まずは Network Namespace を作る。

$ sudo ip netns add host
$ sudo ip netns add router

そして、Network Namespace 同士をつなぐ veth インターフェイスを作る。

$ sudo ip link add ht-veth0 type veth peer name gw-veth0

作ったインターフェイスを Network Namespace に所属させる。

$ sudo ip link set ht-veth0 netns host
$ sudo ip link set gw-veth0 netns router

デフォルトでは EUI-64 を使ってアドレスの下位 64 ビットが生成されるため、わかりやすいように MAC アドレスを変更しておく。

$ sudo ip netns exec host ip link set dev ht-veth0 address 00:00:5E:00:53:01
$ sudo ip netns exec router ip link set dev gw-veth0 address 00:00:5E:00:53:02

veth インターフェイスの状態を UP に設定する。

$ sudo ip netns exec host ip link set ht-veth0 up
$ sudo ip netns exec router ip link set gw-veth0 up

router の方にリンクローカルアドレス (fe80::1 と、グローバルアドレスを模したドキュメンテーションアドレス (2001:db8::1) を付与しておく。

$ sudo ip netns exec router ip address add fe80::1/64 dev gw-veth0
$ sudo ip netns exec router ip address add 2001:db8::1/64 dev gw-veth0

次のようにアドレスが付与された。

$ sudo ip netns exec router ip address show gw-veth0
16: gw-veth0@if17: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default qlen 1000
    link/ether 00:00:5e:00:53:02 brd ff:ff:ff:ff:ff:ff link-netnsid 0
    inet6 2001:db8::1/64 scope global 
       valid_lft forever preferred_lft forever
    inet6 fe80::1/64 scope link 
       valid_lft forever preferred_lft forever
    inet6 fe80::200:5eff:fe00:5302/64 scope link 
       valid_lft forever preferred_lft forever

あとは router の方で IPv6 のルーティングを有効にしておく。 ちなみに、このカーネルパラメータのフラグを立てておかないと radvd が起動しない。

$ sudo ip netns exec router sysctl -w net.ipv6.conf.all.forwarding=1

radvd

はじめに、radvd から試してみる。 このプログラムは ICMPv6 NDP の RA を送ることができる。

はじめに、radvd の設定ファイルを用意する。 次の設定で、プレフィックスとして 2001:db8::/64 を広告しつつ、デフォルトルートを 2001:db8::1 に向けることができる。

cat << 'EOF' > radvd.conf
interface gw-veth0 {

    # 定期的にルータ広告を送る
    AdvSendAdvert on;

    # SLAAC でアドレスの自動生成に使うプレフィックスを広告する
    prefix 2001:db8::/64 { };

};
EOF

通信を観察するために tcpdump をしかけておく。

$ sudo ip netns exec host tcpdump -tnlvv -i ht-veth0 ip6
tcpdump: listening on ht-veth0, link-type EN10MB (Ethernet), capture size 262144 bytes

準備ができたら、用意した設定ファイルを使って radvd を起動する。

$ sudo ip netns exec router radvd -C radvd.con

すると、次のように prefix オプションを含む RA メッセージが出る。

$ sudo ip netns exec host tcpdump -tnlvv -i ht-veth0 ip6
tcpdump: listening on ht-veth0, link-type EN10MB (Ethernet), capture size 262144 bytes
IP6 (flowlabel 0xa72b0, hlim 255, next-header ICMPv6 (58) payload length: 56) fe80::1 > ff02::1: [icmp6 sum ok] ICMP6, router advertisement, length 56
    hop limit 64, Flags [none], pref medium, router lifetime 1800s, reachable time 0ms, retrans timer 0ms
      prefix info option (3), length 32 (4): 2001:db8::/64, Flags [onlink, auto], valid time 86400s, pref. time 14400s
        0x0000:  40c0 0001 5180 0000 3840 0000 0000 2001
        0x0010:  0db8 0000 0000 0000 0000 0000 0000
      source link-address option (1), length 8 (1): 00:00:5e:00:53:02
        0x0000:  0000 5e00 5302

host のインターフェイスは、RA を受信して prefix オプションを使ってアドレスを自動設定する。

$ sudo ip netns exec host ip address show dynamic ht-veth0
11: ht-veth0@if10: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default qlen 1000
    link/ether 00:00:5e:00:53:01 brd ff:ff:ff:ff:ff:ff link-netnsid 1
    inet6 2001:db8::200:5eff:fe00:5301/64 scope global dynamic mngtmpaddr 
       valid_lft 86379sec preferred_lft 14379sec

また、同時にルータ広告を送ってきたリンクローカルアドレスにデフォルトルートを設定する。

$ sudo ip netns exec host ip -6 route show
2001:db8::/64 dev ht-veth0 proto kernel metric 256 expires 86388sec pref medium
fe80::/64 dev ht-veth0 proto kernel metric 256 pref medium
default via fe80::1 dev ht-veth0 proto ra metric 1024 expires 1788sec hoplimit 64 pref medium

RDNSS の設定を追加してみる

RA の基本的な動作が確認できたので、つづいては RDNSS の設定を追加してパケットを観察してみる。

radvd の設定ファイルに RDNSS の設定を追加する。 配布する DNS サーバのアドレスは 2001:db8::dead:beef に指定した。

$ cat << 'EOF' > radvd.conf
interface gw-veth0 {

    # 定期的にルータ広告を送る
    AdvSendAdvert on;

    # SLAAC でアドレスの自動生成に使うプレフィックスを広告する
    prefix 2001:db8::/64 { };

    # RDNSS (RFC 8106) で DNS サーバを広告する
    RDNSS 2001:db8::dead:beef { };
};
EOF

radvd のプロセスに SIGHUP を送って設定ファイルを読み直させる。

$ sudo kill -HUP $(cat /var/run/radvd.pid)

すると、次のとおり RDNSS オプションを含む RA が観察できた。

$ sudo ip netns exec host tcpdump -tnlvv -i ht-veth0 ip6
tcpdump: listening on ht-veth0, link-type EN10MB (Ethernet), capture size 262144 bytes

...(snip)...

IP6 (flowlabel 0xa72b0, hlim 255, next-header ICMPv6 (58) payload length: 80) fe80::1 > ff02::1: [icmp6 sum ok] ICMP6, router advertisement, length 80
    hop limit 64, Flags [none], pref medium, router lifetime 1800s, reachable time 0ms, retrans timer 0ms
      prefix info option (3), length 32 (4): 2001:db8::/64, Flags [onlink, auto], valid time 86400s, pref. time 14400s
        0x0000:  40c0 0001 5180 0000 3840 0000 0000 2001
        0x0010:  0db8 0000 0000 0000 0000 0000 0000
      rdnss option (25), length 24 (3):  lifetime 600s, addr: 2001:db8::dead:beef
        0x0000:  0000 0000 0258 2001 0db8 0000 0000 0000
        0x0010:  0000 dead beef
      source link-address option (1), length 8 (1): 00:00:5e:00:53:02
        0x0000:  0000 5e00 5302

dnsmasq (RA + Stateless DHCPv6)

つづいては dnsmasq を使って RA + Stateless DHCPv6 のパターンを検証してみる。

どうやら dnsmasq には RA を送る機能もあるようなので、いったん radvd のプロセスは kill しておく。

$ sudo kill $(cat /var/run/radvd.pid)

あらためて tcpdump をしかけておく。

$ sudo ip netns exec host tcpdump -tnlvv -i ht-veth0 ip6

そして、dnsmasq を起動する。 --enable-ra オプションが RA を送る指定になっている。

$ sudo ip netns exec router dnsmasq \
  --enable-ra \
  --dhcp-range=::,constructor:gw-veth0,ra-stateless \
  --dhcp-option=option6:dns-server,[2001:db8::dead:beef] \
  --dhcp-option=option6:ntp-server,[2001:db8::dead:beef] \
  --no-daemon

準備ができたら isc-dhcp の dhclient を起動する。 -6 オプションと -S オプションを組み合わせることで IPv6 SLAAC のモードになる。

$ sudo ip netns exec host dhclient -6 -S ht-veth0

tcpdump のターミナルを確認すると、次のとおり RA と Stateless DHCPv6 のパケットがやり取りされていることがわかる。 なお、RA にはデフォルトで RDNSS オプションが付与されるらしい。 まあ、たしかに Stateless DHCPv6 と同じ DNS サーバのアドレスを配布するなら、オプションがあっても副作用はとくにないのかな?

$ sudo ip netns exec host tcpdump -tnlvv -i ht-veth0 ip6
tcpdump: listening on ht-veth0, link-type EN10MB (Ethernet), capture size 262144 bytes

... (snip) ...

IP6 (class 0xc0, flowlabel 0xa72b0, hlim 255, next-header ICMPv6 (58) payload length: 88) fe80::1 > ff02::1: [icmp6 sum ok] ICMP6, router advertisement, length 88
    hop limit 64, Flags [other stateful], pref medium, router lifetime 1800s, reachable time 0ms, retrans timer 0ms
      prefix info option (3), length 32 (4): 2001:db8::/64, Flags [onlink, auto], valid time 3600s, pref. time 3600s
        0x0000:  40c0 0000 0e10 0000 0e10 0000 0000 2001
        0x0010:  0db8 0000 0000 0000 0000 0000 0000
      mtu option (5), length 8 (1):  1500
        0x0000:  0000 0000 05dc
      source link-address option (1), length 8 (1): 00:00:5e:00:53:02
        0x0000:  0000 5e00 5302
      rdnss option (25), length 24 (3):  lifetime 3600s, addr: 2001:db8::dead:beaf
        0x0000:  0000 0000 0e10 2001 0db8 0000 0000 0000
        0x0010:  0000 dead beaf

IP6 (flowlabel 0x5f40a, hlim 1, next-header UDP (17) payload length: 48) fe80::200:5eff:fe00:5301.546 > ff02::1:2.547: [bad udp cksum 0xafc9 -> 0x2ffe!] dhcp6 inf-req (xid=29221d (client-ID hwaddr/time type 1 time 634459916 00005e005301) (option-request DNS-server DNS-search-list Client-FQDN SNTP-servers) (elapsed-time 0))

IP6 (class 0xc0, flowlabel 0x48785, hlim 64, next-header UDP (17) payload length: 76) fe80::1.547 > fe80::200:5eff:fe00:5301.546: [bad udp cksum 0xaf61 -> 0x705b!] dhcp6 reply (xid=29221d (client-ID hwaddr/time type 1 time 634459916 00005e005301) (server-ID hwaddr/time type 1 time 634459172 00005e005302) (DNS-server 2001:db8::dead:beaf) (lifetime 3600))

めでたしめでたし。

参考文献

tools.ietf.org

tools.ietf.org

tools.ietf.org

linux.die.net

linux.die.net

linux.die.net