CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: PyTorch の MLP で Iris データセットを分類してみる

ニューラルネットワークのことが何も分からないので少しずつでも慣れていきたい。 そのためには、とにかくたくさんのコードを読んで書くしかないと思う。 一環として、今回はこれ以上ないほど簡単なタスクを解くコードを書いてみる。 具体的には、シンプルな MLP (Multi Layer Perceptron) を使って Iris データセットの多値分類タスクを解く。 実用性はないけど PyTorch のやり方は一通り詰まっている感じがする。

使った環境は次のとおり。

$ sw_vers 
ProductName:        macOS
ProductVersion:     14.5
BuildVersion:       23F79
$ python -V        
Python 3.11.9
$ pip list | egrep "(torch|scikit-learn)"
scikit-learn      1.5.0
torch             2.3.0

もくじ

下準備

下準備として PyTorch と scikit-learn をインストールしておく。

$ pip install torch scikit-learn

サンプルコード

早速だけど以下にサンプルコードを示す。 説明は適宜コメントを入れている。 やっていることに対して行数が多い印象は受ける。 とはいえ、生の PyTorch を使う場合はこんな感じなんだろう。

#!/usr/bin/env python3

"""PyTorch を使って Iris データセットを分類する
モデルには 3 層 MLP を使用する
"""

import random

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn import datasets
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset


def set_seed(seed):
    """シード値を設定する"""
    random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


class OnMemoryDataset(Dataset):
    """オンメモリ上の NumPy / Tensor 配列を使用するデータセット"""

    def __init__(self, x, y):
        self.x = torch.from_numpy(x) if not torch.is_tensor(x) else x
        self.y = torch.from_numpy(y) if not torch.is_tensor(y) else y
        self.x = self.x.to(torch.float32)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]


class MLPClassifier(nn.Module):
    """3 層 MLP の分類器"""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(4, 100)
        self.fc2 = nn.Linear(100, 50)
        self.fc3 = nn.Linear(50, 3)

    def forward(self, batch):
        out = self.fc1(batch)
        out = F.relu(out)
        out = self.fc2(out)
        out = F.relu(out)
        out = self.fc3(out)
        return out


def evaluate(model, dataloader, device, criterion):
    """評価・検証用データに対する損失や評価指標を求める関数"""
    # モデルを評価モードにする
    model.eval()
    all_labels = []
    all_preds = []
    running_loss = 0.0
    # 勾配は必要ない
    with torch.no_grad():
        # DataLoader からバッチを読み込む
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            # 損失を求める
            loss = criterion(outputs, labels)
            running_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())

    # 損失、Precision、Recall、F-1 スコアを求める
    average_loss = running_loss / len(dataloader)
    precision = precision_score(all_labels, all_preds, average="macro", zero_division=0)
    recall = recall_score(all_labels, all_preds, average="macro")
    f1 = f1_score(all_labels, all_preds, average="macro")

    return average_loss, precision, recall, f1


def acceleration_device(force=None):
    """環境によって計算に使うデバイスを切り替える関数"""
    if force is not None:
        return force
    if torch.cuda.is_available():
        return "cuda"
    if torch.backends.mps.is_available():
        return "mps"
    return "cpu"


def main():
    # 乱数シードを指定する
    set_seed(42)

    # Iris データセットを読み込む
    x, y = datasets.load_iris(return_X_y=True)

    # 学習データと評価データに分割する
    train_x, test_x, train_y, test_y = train_test_split(
        x,
        y,
        shuffle=True,
        random_state=42,
        test_size=0.2,
    )

    # 学習データと検証データに分割する
    train_x, valid_x, train_y, valid_y = train_test_split(
        train_x,
        train_y,
        shuffle=True,
        random_state=42,
        test_size=0.3,
    )

    # DataLoader で読み込むバッチサイズ
    batch_size = 64

    # 学習データ
    train_dataset = OnMemoryDataset(
        train_x,
        train_y,
    )
    train_dataloader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
    )
    # 検証データ
    valid_dataset = OnMemoryDataset(
        valid_x,
        valid_y,
    )
    valid_dataloader = DataLoader(
        valid_dataset,
        batch_size=batch_size,
        shuffle=False,
    )
    # 評価データ
    test_dataset = OnMemoryDataset(
        test_x,
        test_y,
    )
    test_dataloader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
    )

    # モデル
    model = MLPClassifier()
    # 損失関数
    criterion = nn.CrossEntropyLoss()
    # オプティマイザ
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    # 計算に用いるデバイス
    device = acceleration_device()
    model = model.to(device)
    print(f"Device: {device}")

    # 最大エポック数
    num_epochs = 1000
    # ログを出力するバッチ間隔
    log_interval = -1
    # 改善が見られなかった場合に停止する Early Stopping のエポック数
    early_stopping_patience = 10
    # Early Stopping に使うカウンタ
    early_stopping_patience_counter = 0
    # Early Stopping に使う検証データに対する損失
    early_stopping_best_val_loss = float("inf")

    # エポックを回す
    for epoch in range(num_epochs):
        # モデルを学習モードにする
        model.train()
        # エポック単位で計算する損失
        running_loss = 0.0
        # DataLoader からバッチを読み込む
        for batch_idx, (inputs, labels) in enumerate(train_dataloader):
            # デバイスに転送する
            inputs, labels = inputs.to(device), labels.to(device)
            # 勾配を初期化する
            optimizer.zero_grad()
            # 出力を得る
            outputs = model(inputs)
            # 損失を求める
            loss = criterion(outputs, labels)
            # 勾配を求める
            loss.backward()
            # 最適化する
            optimizer.step()
            # バッチの損失を足す
            running_loss += loss.item()

            if log_interval == -1:
                continue

            # 指定されたバッチ数の間隔でログを出力する
            if (batch_idx + 1) % log_interval == 0:
                print(
                    f"Epoch [{epoch + 1}/{num_epochs}], Batch [{batch_idx + 1}/{len(train_dataloader)}], Training Loss: {loss.item():.4f}"
                )

        # エポックでの学習損失を求める
        print(
            f"Epoch [{epoch + 1}/{num_epochs}], Training Loss: {running_loss / len(train_dataloader):.4f}"
        )

        # 検証データに対する損失や評価指標をログに出力する
        val_loss, val_precision, val_recall, val_f1 = evaluate(
            model, valid_dataloader, device, criterion
        )
        print(
            f"Epoch [{epoch + 1}/{num_epochs}], Validation Loss: {val_loss:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, F1 Score: {val_f1:.4f}"
        )

        # 検証データの損失を元に Early Stopping の処理をする
        if val_loss < early_stopping_best_val_loss:
            early_stopping_best_val_loss = val_loss
            early_stopping_patience_counter = 0
        else:
            early_stopping_patience_counter += 1

        # 条件を満たしたら学習のループを抜ける
        if early_stopping_patience_counter >= early_stopping_patience:
            print(f"Early stopping at epoch {epoch + 1}")
            break

    # 学習が完了した
    print("Training Finished")

    # 評価データに対する損失や評価指標をログに出力する
    test_loss, test_precision, test_recall, test_f1 = evaluate(
        model, test_dataloader, device, criterion
    )
    print(
        f"Test Set Evaluation - Loss: {test_loss:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, F1 Score: {test_f1:.4f}"
    )


if __name__ == "__main__":
    main()

上記を実行してみよう。

$ python irismlp.py
Device: mps
Epoch [1/1000], Training Loss: 1.0881
Epoch [1/1000], Validation Loss: 1.0115, Precision: 0.1389, Recall: 0.3333, F1 Score: 0.1961
Epoch [2/1000], Training Loss: 1.0227
Epoch [2/1000], Validation Loss: 0.9958, Precision: 0.4691, Recall: 0.3667, F1 Score: 0.2536
Epoch [3/1000], Training Loss: 0.9711
Epoch [3/1000], Validation Loss: 0.9820, Precision: 0.4744, Recall: 0.6667, F1 Score: 0.5315
...(省略)...
Epoch [95/1000], Training Loss: 0.0695
Epoch [95/1000], Validation Loss: 0.1670, Precision: 0.9286, Recall: 0.9333, F1 Score: 0.9230
Epoch [96/1000], Training Loss: 0.0726
Epoch [96/1000], Validation Loss: 0.1752, Precision: 0.9286, Recall: 0.9333, F1 Score: 0.9230
Epoch [97/1000], Training Loss: 0.0694
Epoch [97/1000], Validation Loss: 0.1925, Precision: 0.9286, Recall: 0.9333, F1 Score: 0.9230
Early stopping at epoch 97
Training Finished
Test Set Evaluation - Loss: 0.0939, Precision: 1.0000, Recall: 1.0000, F1 Score: 1.0000

最終的に評価用のデータに対して高い評価指標が得られている。

いじょう。