CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: 集約特徴量を作るための scikit-learn Transformer 互換クラスの実装例について

ふと、集約特徴量を作るための scikit-learn Transformer 互換な実装を巷であまり見かけないなと思った。 そこで、自作しているものを公開してみる。

使った環境は次のとおり。

$ sw_vers          
ProductName:    macOS
ProductVersion: 12.4
BuildVersion:   21F79
$ python -V                                   
Python 3.9.12
$ pip list | egrep "(pandas|scikit-learn)"
pandas                        1.4.2
scikit-learn                  1.1.1

もくじ

集約特徴量とは

まずは、このエントリで集約特徴量と呼んでいるものについて説明する。 あらかじめ断っておくと、一般的に明確な呼称と定義があるわけではない。 Pandas で言えば、groupby() してから agg() して、それを元のデータフレームあるいは別のデータフレームとマージしたカラムのこと。 これによって、行方向の加工だけでは得られない、予測にとって有益な情報を列方向 (= 別の行) から抽出できる。

たとえば Count Encoding なんかも、集約特徴量の一種と言える。 agg() で計算する統計量が、出現回数 (count) というだけ。 今回は、そういった特定の統計量に特化させるのではなく、汎用に使えるものが欲しいよねという気持ちがある。

どうして必要なのか

まず、集約特徴量を作るのに、特に大した労力は必要ない。 前述したとおり、groupby() してから agg() した上で、データフレームを merge() するだけで得られる。 では、何故わざわざ scikit-learn の Transformer 互換の実装が必要になるのか。 それは、新規のデータの予測に使う場合を考えてのこと。

たとえば、あらかじめ学習データとテストデータが静的な CSV ファイルとして与えられる状況を考えてみる。 この場合、テストデータに集約特徴量を付与するには、両者を単純に連結して計算しちゃうことも考えられる。 統計量を計算する対象が目的変数でなければ、テストデータにも同じデータはあるはずなので、できてしまう。 しかし、このやり方ではまったく新しいデータに特徴量を付与したくなったときに都合が良くない。 なぜなら、まったく新しいデータに適用するには、既存のデータで計算した統計量を何処かに保存しておく必要があるため。 統計量の入ったデータフレームをバラバラと管理するのは汎用性がないので、できればやりたくない。

じゃあ、どこに保存しておくかというと、何らかのインスタンスのメンバに入っていると都合が良さそう。 そして、こういった場合によく用いられるのが scikit-learn の Transformer API を実装したクラス、ということになる。 scikit-learn の Transformer は、学習するだけの fit() メソッドと、適用するだけの transform() メソッドがある。 これによって、fit() で統計量を計算して、transform() でデータフレームとマージ、と処理が分けられる。

下準備

前置きが長くなったけど、ここからは実際にサンプルコードを示していく。 あらかじめ、必要なパッケージをインストールしておく。

$ pip install pandas scikit-learn

実装例

以下に集約特徴量を生成するための scikit-learn Transformer 互換の API を備えたクラスを示す。 使い方については後述する。

from __future__ import annotations

from typing import Callable
from typing import Iterable
from typing import Union

import pandas as pd
from sklearn.base import TransformerMixin
from sklearn.base import BaseEstimator


class AggregationEncoder(TransformerMixin, BaseEstimator):
    """集約特徴量を生成するための scikit-learn Transformer 互換 API を備えたクラス

    pandas の DataFrame が入力と出力になる
    """

    def __init__(
        self,
        group_keys: list[Union[str, Iterable[str]]],
        group_values: list[str],
        agg_methods: list[Union[str, Callable]],
    ):
        """

        :param group_keys: グループ化に使うカラム名
        :param group_values: 集約特徴量を計算するカラム名
        :param agg_methods: 計算する集約特徴量の種類 (統計量)
        """
        self.group_keys = group_keys
        self.group_values = group_values
        self.agg_methods = agg_methods

        # エンコードしたカラム名の一覧
        self.encoded_cols_: list[str] = []
        # Aggregation したデータフレームを記録しておく場所
        self._group_df_agg: list[tuple[Union[str, Iterable[str]], pd.DataFrame]] = []

    def fit(self, input_df: pd.DataFrame) -> TransformerMixin:
        """集約特徴量の生成に使う統計量を学習する

        :param input_df: 集約特徴量を生成するための統計量を求めるデータ
        """
        for agg_key in self.group_keys:
            for agg_method in self.agg_methods:

                # 集約方法の名前を作る
                if isinstance(agg_method, str):
                    # 文字列ならそのまま使う
                    agg_method_name = agg_method
                elif callable(agg_method):
                    # 呼び出し可能オブジェクトは名前を取り出して使う
                    agg_method_name = agg_method.__name__
                else:
                    raise ValueError(
                        "'agg_methods' must be a list of str or callable objects"
                    )

                for agg_value in self.group_values:
                    # 集約特徴量を作るために必要なカラムを求める
                    need_cols = [agg_value]
                    if isinstance(agg_key, str):
                        need_cols += [agg_key]
                    elif isinstance(agg_key, Iterable):
                        need_cols += list(agg_key)
                    else:
                        raise ValueError(
                            "'group_keys' must be a list of str or iterable objects"
                        )
                    # 集約特徴量を作るための計算
                    df_group_by = input_df[need_cols].groupby(agg_key)
                    df_agg = df_group_by[[agg_value]].agg(agg_method)

                    # 追加するカラムの名前を生成する
                    agg_key_name = (
                        agg_key if isinstance(agg_key, str) else "".join(agg_key)
                    )
                    # 名前の生成パターンが気に食わないときはココを変更する
                    col_name = "agg_{0}_{1}_groupby_{2}".format(
                        agg_method_name,
                        agg_value,
                        agg_key_name,
                    )
                    df_agg.columns = [col_name]

                    self.encoded_cols_.append(col_name)
                    self._group_df_agg.append((agg_key, df_agg))
        return self

    def transform(self, input_df: pd.DataFrame) -> pd.DataFrame:
        """集約特徴量を生成する

        :param input_df: 学習済みの集約特徴量を生成する対象となるデータ
        :return: 生成された集約特徴量
        """
        # 副作用が生じないようにコピーする
        new_df = input_df.copy()

        # 生成対象のデータフレームとマージしていく
        for group_key, df_agg in self._group_df_agg:
            new_df = pd.merge(
                new_df, df_agg, how="left", right_index=True, left_on=group_key
            )

        # 元々あったカラムは落とす
        new_df.drop(input_df.columns, axis=1, inplace=True)

        return new_df

    def fit_transform(self, input_df: pd.DataFrame) -> pd.DataFrame:
        """集約特徴量を学習して、同時に生成まで行う

        :param input_df: 集約特徴量を生成する対象のデータ
        :return: 生成された集約特徴量
        """
        self.fit(input_df)
        return self.transform(input_df)

    def get_params(self, deep=True) -> dict[str, object]:
        return {
            "group_keys": self.group_keys,
            "group_values": self.group_values,
            "agg_methods": self.agg_methods,
        }

上記を aggregation.py とか適当な名前で保存しておこう。

使ってみる

上記を利用するサンプルコードをいくつか書いてみよう。 上記のファイルがあるのと同じディレクトリで Python の REPL を立ち上げる。

$ python

基本的な使い方

たとえば、以下のような Pandas のデータフレームがあるとする。

>>> data = {
...     "group": ["A", "A", "A", "B", "B", "C"],
...     "value": [10, 20, 30, 40, 50, 60],
... }
>>> import pandas as pd
>>> df = pd.DataFrame(data)
>>> df
  group  value
0     A     10
1     A     20
2     A     30
3     B     40
4     B     50
5     C     60

上記から AggregationEncoder を使って集約特徴量を計算する準備をする。 まず、グルーピングのキーとして group_keys 引数にリストで group カラムを指定する。 同様に、バリューとして group_values 引数にリストで value カラムを指定する。 計算する統計量としては、agg_methods 引数で出現回数 (count) と平均 (mean) を指定した。

>>> from aggregation import AggregationEncoder
>>> encoder = AggregationEncoder(
...     group_keys=["group"],
...     group_values=["value"],
...     agg_methods=["count", "mean"]
... )

先ほどのデータフレームに対して fit_transform() メソッドを呼び出すと、次のとおり集約特徴量が生成される。 それぞれ、グループの出現回数と平均値になっている。

>>> encoder.fit_transform(df)
   agg_count_value_groupby_group  agg_mean_value_groupby_group
0                              3                          20.0
1                              3                          20.0
2                              3                          20.0
3                              2                          45.0
4                              2                          45.0
5                              1                          60.0

まったく新しいデータに適用する

前述したとおり、scikit-learn の Transformer にしたからには、新規のデータに適用したい。 次はそれを試してみよう。

以下のとおり、新しいデータフレームを用意する。 このデータフレームには、学習したデータには含まれていなかった Z という値も含まれている。

>>> data = {
...     "group": ["A", "B", "C", "Z"],
... }
>>> test_df = pd.DataFrame(data)
>>> test_df
  group
0     A
1     B
2     C
3     Z

先ほど学習させた AggregationEncodertransform() メソッドを呼び出してデータフレームを渡す。 すると、学習データの統計量を元に集約特徴量が生成されることがわかる。 初見の値については NaN になっている。

>>> encoder.transform(test_df)
   agg_count_value_groupby_group  agg_mean_value_groupby_group
0                            3.0                          20.0
1                            2.0                          45.0
2                            1.0                          60.0
3                            NaN                           NaN

特徴量を総当り的に作る

こういった特徴量は、総当り的にばーっと作って、とりあえずモデルに突っ込むというパターンが頻出する。 なので、そういったニーズも満たせるように作ってある。

たとえば、次のようにカテゴリ変数と連続変数が 2 つずつあるようなデータフレームを考える。

>>> import numpy as np
>>> data = {
...     "group1": ["A", "A", "A", "B", "B", "C"],
...     "group2": ["x", "y", "z", "x", "y", "z"],
...     "value1": [10, 20, 30, 40, 50, 60],
...     "value2": [np.nan, 500, 400, 300, 200, 100],
... }
>>> df = pd.DataFrame(data)
>>> df
  group1 group2  value1  value2
0      A      x      10     NaN
1      A      y      20   500.0
2      A      z      30   400.0
3      B      x      40   300.0
4      B      y      50   200.0
5      C      z      60   100.0

この場合は、group_keysgroup_values に計算したいカラムをリストで放り込んでおけばいい。

>>> encoder = AggregationEncoder(
...     group_keys=["group1", "group2"],
...     group_values=["value1", "value2"],
...     agg_methods=["min", "max"]
... )

あとは、「グルーピングのキー x グルーピングのバリュー x 特徴量の種類」の組み合わせで特徴量をばーっと作れる。 今回であれば「2 x 2 x 2 = 8」カラムになる。

>>> encoder.fit_transform(df)
   agg_min_value1_groupby_group1  agg_min_value2_groupby_group1  ...  agg_max_value1_groupby_group2  agg_max_value2_groupby_group2
0                             10                          400.0  ...                             40                          300.0
1                             10                          400.0  ...                             50                          500.0
2                             10                          400.0  ...                             60                          400.0
3                             40                          200.0  ...                             40                          300.0
4                             40                          200.0  ...                             50                          500.0
5                             60                          100.0  ...                             60                          400.0

[6 rows x 8 columns]

生成されるカラム名を取得する

組み合わせが増えると、どんなカラムができるんだっけ?と後から確認したくなる。 なので、生成されるカラムの名前を encoded_cols_ メンバで得られる。

>>> from pprint import pprint
>>> pprint(encoder.encoded_cols_)
['agg_min_value1_groupby_group1',
 'agg_min_value2_groupby_group1',
 'agg_max_value1_groupby_group1',
 'agg_max_value2_groupby_group1',
 'agg_min_value1_groupby_group2',
 'agg_min_value2_groupby_group2',
 'agg_max_value1_groupby_group2',
 'agg_max_value2_groupby_group2']

複数のキーでグルーピングしたい

複数のキーを使ってグルーピングしたい、という場合もあるはず。 そんなときは group_keys の要素にリストで複数のカラムを放り込む。

>>> encoder = AggregationEncoder(
...     group_keys=[["group1", "group2"]],
...     group_values=["value1"],
...     agg_methods=["mean"]
... )

上記であれば group1group2 の両方が一致するものを、同じグループとして扱う。 今回の指定とデータフレームだと、同じグループが存在しないので value1 の値がそのまま出てしまうけど。

>>> encoder.fit_transform(df)
   agg_mean_value1_groupby_group1group2
0                                  10.0
1                                  20.0
2                                  30.0
3                                  40.0
4                                  50.0
5                                  60.0

カスタマイズした統計量を計算する

既存の統計量だけでなく、自分なりの統計量を計算したいというニーズもある。 そんなときは、各グループで層化されたデータフレームを受け取って統計量を計算する関数を用意する。 以下では、たとえば中央値を計算している。 まあ、これは単に "median" を指定すれば良いだけなんだけど。

>>> def median(x):
...     return x.quantile(0.5)
... 

上記の関数を agg_methods に放り込めば良い。

>>> encoder = AggregationEncoder(
...     group_keys=["group1"],
...     group_values=["value1"],
...     agg_methods=[median]
... )

後は何も変わらない。カラム名の中には関数名が入る。

>>> encoder.fit_transform(df)
   agg_median_value1_groupby_group1
0                              20.0
1                              20.0
2                              20.0
3                              45.0
4                              45.0
5                              60.0

まとめ

今回は、集約特徴量を作るための scikit-learn Transoformer 互換クラスの実装例を紹介した。