CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: TensorFlow/Keras で Word2Vec の SGNS を実装してみる

以前のエントリで、Word2Vec の CBOW (ContinuousBagOfWords) モデルを TensorFlow/Keras で実装した。 CBOW は、コンテキスト (周辺語) からターゲット (入力語) を推定する多値分類のタスクが考え方のベースになっている。

blog.amedama.jp

今回扱うのは、CBOW と対を成すモデルの Skip Gram をベースにした SGNS (Skip Gram with Negative Sampling) になる。 Skip Gram では、CBOW とは反対にターゲット (入力語) からコンテキスト (周辺語) を推定する多値分類のタスクを扱う。 ただし、with Negative Sampling と付くことで、タスクを多値分類から二値分類にして計算量を削減している。 SGNS では、ターゲットとコンテキストを入力にして、それらが共起 (Co-occurrence) するか否かを推定することになる。 コーパスを処理して実際に共起する単語ペアを正例、出現頻度を元にランダムにサンプルした単語ペアを共起していない負例としてモデルに与える。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.2.1
BuildVersion:   20D74
$ python -V  
Python 3.8.7

下準備

まずは、必要なパッケージをインストールする。

$ pip install tensorflow gensim scipy tqdm

そして、コーパスとして PTB (Penn Treebank) データセットをダウンロードしておく。

$ wget https://raw.githubusercontent.com/tomsercu/lstm/master/data/ptb.train.txt

サンプルコード

早速だけど、サンプルコードを以下に示す。 いくらかマジックナンバーがコードに残ってしまっていて、あんまりキレイではないかも。 各エポックの終了時には、WordSim353 データセットを使って単語間類似度で単語埋め込みを評価している。 また、学習が終わった後には、いくつかの単語で類似する単語や類推語の結果を確認している。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import re
from itertools import count
from typing import Iterable
from typing import Iterator
from functools import reduce
from functools import partial
from collections import Counter

import numpy as np  # type: ignore
import tensorflow as tf  # type: ignore
from tensorflow.keras import Model  # type: ignore
from tensorflow.keras.layers import Embedding  # type: ignore
from tensorflow import Tensor  # type: ignore
from tensorflow.data import Dataset  # type: ignore
from tensorflow.keras.optimizers import Adam  # type: ignore
from tensorflow.keras.layers import Dense  # type: ignore
from tensorflow.keras.models import Sequential  # type: ignore
from tensorflow.keras.callbacks import Callback  # type: ignore
from tensorflow.keras.layers import Dot  # type: ignore
from tensorflow.keras.layers import Flatten  # type: ignore
from tensorflow.keras.losses import BinaryCrossentropy  # type: ignore
from gensim.test.utils import datapath  # type: ignore
from scipy.stats import pearsonr  # type: ignore
from tqdm import tqdm  # type: ignore


class SkipGramWithNegativeSampling(Model):
    """Word2Vec の SGNS モデルを実装したクラス"""

    def __init__(self, vocab_size: int, embedding_size: int):
        super().__init__()

        # ターゲット (入力語) の埋め込み
        self.target_embedding = Embedding(input_dim=vocab_size,
                                          input_shape=(1, ),
                                          output_dim=embedding_size,
                                          name='word_embedding',
                                          )
        # コンテキスト (周辺語) の埋め込み
        self.context_embedding = Embedding(input_dim=vocab_size,
                                           input_shape=(1, ),
                                           output_dim=embedding_size,
                                           name='context_embedding',
                                           )

        self.dot = Dot(axes=1)
        self.output_layer = Sequential([
            Flatten(),
            Dense(1, activation='sigmoid'),
        ])

    def call(self, inputs: Tensor) -> Tensor:
        # ターゲットのベクトルを取り出す
        target_label = inputs[:, 0]
        target_vector = self.target_embedding(target_label)
        # コンテキストのベクトルを取り出す
        context_label = inputs[:, 1]
        context_vector = self.context_embedding(context_label)
        # ターゲットとコンテキストの内積を計算する
        x = self.dot([target_vector, context_vector])
        # 共起したか・していないかを二値の確率にする
        prediction = self.output_layer(x)
        return prediction


def cosine_similarity_one_to_one(x, y):
    """1:1 のコサイン類似度"""
    nx = x / np.sqrt(np.sum(x ** 2))
    ny = y / np.sqrt(np.sum(y ** 2))
    return np.dot(nx, ny)


class WordSimilarity353Callback(Callback):
    """WordSim353 データセットを使って単語間の類似度を評価するコールバック"""

    def __init__(self, word_id_table: dict[str, int]):
        super().__init__()

        self.word_id_table = word_id_table
        self.model = None

        # 評価用データを読み込む
        self.eval_data = []
        wordsim_filepath = datapath('wordsim353.tsv')
        with open(wordsim_filepath, mode='r') as fp:
            # 最初の 2 行はヘッダなので読み飛ばす
            fp.readline()
            fp.readline()
            for line in fp:
                word1, word2, sim_score = line.strip().split('\t')
                self.eval_data.append((word1, word2, float(sim_score)))

    def set_model(self, model):
        self.model = model

    def on_epoch_end(self, epoch, logs=None):
        # モデルから学習させたレイヤーの重みを取り出す
        model_layers = {layer.name: layer for layer in self.model.layers}
        embedding_layer = model_layers['word_embedding']
        word_vectors = embedding_layer.weights[0].numpy()

        # 評価用データセットに含まれる単語間の類似度を計算する
        labels = []
        preds = []
        for word1, word2, sim_score in self.eval_data:
            # Out-of-Vocabulary な単語はスキップ
            if word1 not in self.word_id_table or word2 not in self.word_id_table:
                continue

            # コサイン類似度を計算する
            word1_vec = word_vectors[self.word_id_table[word1]]
            word2_vec = word_vectors[self.word_id_table[word2]]
            pred = cosine_similarity_one_to_one(word1_vec, word2_vec)
            preds.append(pred)
            # 正解ラベル
            labels.append(sim_score)

        # ピアソンの相関係数を求める
        r_score = pearsonr(labels, preds)[0]
        print(f'Pearson\'s r score with WordSim353: {r_score}')


def load_corpus(filepath: str) -> Iterator[str]:
    """テキストファイルからコーパスを読み出す"""
    with open(filepath, mode='r') as fp:
        for line in fp:
            # 改行コードは取り除く
            yield line.rstrip()


def sentences_to_words(sentences: Iterable[str], lower: bool = True) -> Iterator[list[str]]:
    """文章を単語に分割する"""
    for sentence in sentences:
        if lower:
            sentence = sentence.lower()
        words = re.split('\\W+', sentence)
        yield [word for word in words if len(word) > 0]  # 空文字は取り除く


def word_id_mappings(sentences: Iterable[Iterable[str]]) -> dict[str, int]:
    """単語を ID に変換する対応テーブルを作る"""
    counter = count(start=0)

    word_to_id = {}
    for sentence in sentences:
        for word in sentence:

            if word in word_to_id:
                # 登録済みの単語はスキップする
                continue

            # 単語の識別子を採番する
            word_id = next(counter)
            word_to_id[word] = word_id

    return word_to_id


def words_to_ids(sentences: Iterable[list[str]], word_to_id: dict[str, int]) -> Iterator[list[int]]:
    # 単語を対応するインデックスに変換する
    for words in sentences:
        # NOTE: Out-of-Vocabulary への対応がない
        yield [word_to_id[word] for word in words]


def extract_contexts(word_ids: Tensor, window_size: int) -> Tensor:
    """コンテキストの単語をラベル形式で得る"""
    target_ids = word_ids[:-window_size]
    context_ids = word_ids[window_size:]
    # ウィンドウサイズ分ずらした Tensor 同士をくっつける
    co_occurrences = tf.transpose([target_ids, context_ids])
    # 逆順でも共起したのは同じ
    reversed_co_occurrences = tf.transpose([context_ids, target_ids])
    concat_co_occurrences = tf.concat([co_occurrences,
                                       reversed_co_occurrences],
                                      axis=0)
    # ラベル (正例なので 1)
    labels = tf.ones_like(concat_co_occurrences[:, 0],
                          dtype=tf.int8)
    return concat_co_occurrences, labels


def positive_pipeline(ds: Dataset, window_size: int) -> Dataset:
    """正例を供給するパイプライン"""

    ctx_ds_list = []
    for window in range(1, window_size + 1):
        partialed = partial(extract_contexts, window_size=window)
        # ウィンドウサイズごとに共起した単語を抽出する
        mapped_ds = ds.map(partialed,
                           num_parallel_calls=tf.data.AUTOTUNE,
                           deterministic=False)
        ctx_ds_list.append(mapped_ds)

    # すべての Dataset をつなげる
    context_ds = reduce(lambda l_ds, r_ds: l_ds.concatenate(r_ds),
                        ctx_ds_list)
    return context_ds


def word_frequency(sentences: Iterable[Iterable[str]], word_id_table: dict[str, int]) -> dict[str, int]:
    """単語の出現頻度を調べる"""
    counter = Counter(word for words in sentences for word in words)
    id_count = {word_id_table[word]: count for word, count in counter.items()}
    # ID 順でソートされた出現頻度
    sorted_freq = np.array([count for _, count in sorted(id_count.items(), key=lambda x: x[0])],
                           dtype=np.int32)
    return sorted_freq


def noisy_word_pairs(word_proba: list[float], eps: float = 1e-6) -> Iterator[Tensor]:
    """単語の出現頻度を元にネガティブサンプルの単語ペアを生成するジェネレータ関数"""
    p = tf.constant(word_proba) + eps
    logits = tf.math.log([p, p])
    while True:
        word_pair = tf.random.categorical(logits, num_samples=2**12)
        word_pair_t = tf.transpose(word_pair)
        # ラベル (負例なので 0)
        labels = tf.zeros_like(word_pair_t[:, 0], dtype=tf.int8)
        yield word_pair_t, labels


def negative_pipeline(sentences: Iterable[Iterable[str]], word_id_table: dict[str, int]) -> Dataset:
    """負例を供給するパイプライン"""
    # 単語の出現頻度からサンプリングテーブルを求める
    word_freq = word_frequency(sentences, word_id_table)
    word_proba = word_freq / np.sum(word_freq)
    # 0.75 乗することで、出現頻度の低い単語をちょっとだけ選ばれやすくする
    ADJUST_FACTOR = 0.75
    adjusted_word_proba = np.power(word_proba, ADJUST_FACTOR)
    adjusted_word_proba /= np.sum(adjusted_word_proba)
    # 単語の出現頻度を元にノイジーワードペアを生成する
    negative_ds = Dataset.from_generator(lambda: noisy_word_pairs(adjusted_word_proba),
                                         (tf.int32, tf.int8))

    return negative_ds


def batched_concat(pos_tensor: Tensor, neg_tensor: Tensor) -> Tensor:
    """正例と負例を直列に結合する関数"""
    pos_word_pairs, pos_labels = pos_tensor
    neg_word_pairs, neg_labels = neg_tensor
    word_pairs = tf.concat((pos_word_pairs, neg_word_pairs), axis=0)
    labels = tf.concat((pos_labels, neg_labels), axis=0)
    return word_pairs, labels


def skip_grams_with_negative_sampling_dataset(positive_ds: Dataset,
                                              negative_ds: Dataset,
                                              negative_sampling_ratio: int):
    """データセットで共起した単語ペアを正例、出現頻度を元にランダムに選んだ単語ペアを負例として供給するパイプライン"""
    positive_batch_size = 1024  # 正例の供給単位
    batched_pos_ds = positive_ds.unbatch().batch(positive_batch_size)
    batched_neg_ds = negative_ds.unbatch().batch(positive_batch_size * negative_sampling_ratio)
    zipped_ds = tf.data.Dataset.zip((batched_pos_ds, batched_neg_ds))
    concat_ds = zipped_ds.map(batched_concat,
                              num_parallel_calls=tf.data.AUTOTUNE,
                              deterministic=False).unbatch()
    # バッチサイズ単位でシャッフルする
    shuffle_buffer_size = positive_batch_size * (negative_sampling_ratio + 1)
    shuffled_ds = concat_ds.shuffle(buffer_size=shuffle_buffer_size)
    return shuffled_ds


def cosine_similarity_matrix(word_vectors: np.ndarray, eps: float = 1e-8) -> np.ndarray:
    """N:N のコサイン類似度を計算する"""
    word_norm = np.sqrt(np.sum(word_vectors ** 2, axis=1)).reshape(word_vectors.shape[0], -1)
    normalized_word_vectors = word_vectors / (word_norm + eps)
    cs_matrix = np.dot(normalized_word_vectors, normalized_word_vectors.T)
    return cs_matrix


def most_similar_words(similarities: np.ndarray, top_n: int = 5):
    """コサイン類似度が最も高い単語の ID を得る"""
    similar_word_ids = np.argsort(similarities)[::-1]
    top_n_word_ids = similar_word_ids[:top_n]
    top_n_word_sims = similarities[similar_word_ids][:top_n]
    return zip(top_n_word_ids, top_n_word_sims)


def cosine_similarity_one_to_many(word_vector: np.ndarray,
                                  word_vectors: np.ndarray,
                                  eps: float = 1e-8):
    """1:N のコサイン類似度"""
    normalized_word_vector = word_vector / np.sqrt(np.sum(word_vector ** 2))
    word_norm = np.sqrt(np.sum(word_vectors ** 2, axis=1)).reshape(word_vectors.shape[0], -1)
    normalized_word_vectors = word_vectors / (word_norm + eps)
    return np.dot(normalized_word_vector, normalized_word_vectors.T)


def main():
    # Penn Treebank コーパスを読み込む
    train_sentences = load_corpus('ptb.train.txt')

    # コーパスを単語に分割する
    train_corpus_words = list(sentences_to_words(train_sentences))

    # 単語に ID を振る
    word_to_id = word_id_mappings(train_corpus_words)

    # コーパスの語彙数
    vocab_size = len(word_to_id.keys())

    # データセットを準備する
    # ID に変換したコーパスを行ごとに読み出せるデータセット
    train_word_ids_ds = Dataset.from_generator(lambda: words_to_ids(train_corpus_words, word_to_id),
                                               tf.int32,
                                               output_shapes=[None])

    # 共起したと判断する単語の距離
    CONTEXT_WINDOW_SIZE = 5
    positive_ds = positive_pipeline(train_word_ids_ds, window_size=CONTEXT_WINDOW_SIZE)
    negative_ds = negative_pipeline(train_corpus_words, word_to_id)

    # 正例に対する負例の比率 (一般的に 5 ~ 10)
    NEGATIVE_SAMPLING_RATIO = 5
    train_ds = skip_grams_with_negative_sampling_dataset(positive_ds,
                                                         negative_ds,
                                                         NEGATIVE_SAMPLING_RATIO)

    # モデルとタスクを定義する
    EMBEDDING_SIZE = 100  # 埋め込み次元数
    criterion = BinaryCrossentropy()
    optimizer = Adam(learning_rate=1e-2)
    model = SkipGramWithNegativeSampling(vocab_size, EMBEDDING_SIZE)
    model.compile(optimizer=optimizer,
                  loss=criterion,
                  )

    # データセットを準備する
    TRAIN_BATCH_SIZE = 2 ** 14
    train_ds = train_ds.batch(TRAIN_BATCH_SIZE)
    train_ds = train_ds.prefetch(buffer_size=tf.data.AUTOTUNE)
    train_ds = train_ds.cache()

    print('caching train data...')
    num_of_steps_per_epoch = sum(1 for _ in tqdm(train_ds))
    print(f'{num_of_steps_per_epoch=}')
    train_ds = train_ds.repeat()

    callbacks = [
        # WordSim353 データセットを使って単語間の類似度を相関係数で確認する
        WordSimilarity353Callback(word_to_id),
    ]
    # 学習する
    model.fit(train_ds,
              steps_per_epoch=num_of_steps_per_epoch,
              epochs=5,
              callbacks=callbacks,
              verbose=1,
              )

    # モデルから学習させたレイヤーの重みを取り出す
    model_layers = {layer.name: layer for layer in model.layers}
    embedding_layer = model_layers['word_embedding']
    word_vectors = embedding_layer.weights[0].numpy()

    # 単語を表すベクトル間のコサイン類似度を計算する
    cs_matrix = cosine_similarity_matrix(word_vectors)
    # ID -> 単語
    id_to_word = {value: key for key, value in word_to_id.items()}

    # いくつか似ているベクトルを持った単語を確認してみる
    example_words = ['you', 'year', 'car', 'toyota']
    for target_word in example_words:
        # ID に変換した上で最も似ている単語とそのベクトルを取り出す
        print(f'The most similar words of "{target_word}"')
        target_word_id = word_to_id[target_word]
        similarities = cs_matrix[target_word_id, :]
        top_n_most_similars = most_similar_words(similarities, top_n=6)
        # 先頭は自分自身になるので取り除く
        next(top_n_most_similars)
        # 単語と類似度を表示する
        for rank, (similar_word_id, similarity) in enumerate(top_n_most_similars, start=1):
            similar_word = id_to_word[similar_word_id]
            print(f'TOP {rank}: {similar_word} = {similarity}')
        print('-' * 50)

    # いくつか類推語を確認してみる
    analogies = [
        ('king', 'man', 'woman'),
        ('took', 'take', 'go'),
        ('cars', 'car', 'child'),
        ('better', 'good', 'bad'),
    ]
    for word1, word2, word3 in analogies:
        print(f'The most similar words of "{word1}" - "{word2}" + "{word3}"')
        word1_vec = word_vectors[word_to_id[word1]]
        word2_vec = word_vectors[word_to_id[word2]]
        word3_vec = word_vectors[word_to_id[word3]]
        new_vec = word1_vec - word2_vec + word3_vec
        similarities = cosine_similarity_one_to_many(new_vec, word_vectors)
        top_n_most_similars = most_similar_words(similarities)
        # 単語と類似度を表示する
        for rank, (similar_word_id, similarity) in enumerate(top_n_most_similars, start=1):
            similar_word = id_to_word[similar_word_id]
            print(f'TOP {rank}: {similar_word} = {similarity}')
        print('-' * 50)


if __name__ == '__main__':
    main()

上記を実行してみよう。 コーパスの前処理に結構時間がかかる。 今回使った環境では 1 分 47 秒かかった。 学習に関しては、WordSim353 データセットを使った内省的評価で 4 エポック目にはサチる感じ。

$ python sgns.py
caching train data...
2802it [01:47, 26.01it/s]
num_of_steps_per_epoch=2802
Epoch 1/5
2802/2802 [==============================] - 50s 18ms/step - loss: 0.3845
Pearson's r score with WordSim353: 0.2879142572631919
Epoch 2/5
2802/2802 [==============================] - 47s 17ms/step - loss: 0.3412
Pearson's r score with WordSim353: 0.367370159567898
Epoch 3/5
2802/2802 [==============================] - 48s 17ms/step - loss: 0.3307
Pearson's r score with WordSim353: 0.3898624474454972
Epoch 4/5
2802/2802 [==============================] - 48s 17ms/step - loss: 0.3248
Pearson's r score with WordSim353: 0.39416965929977094
Epoch 5/5
2802/2802 [==============================] - 48s 17ms/step - loss: 0.3211
Pearson's r score with WordSim353: 0.39503500234447125
The most similar words of "you"
TOP 1: your = 0.6509251594543457
TOP 2: i = 0.6414255499839783
TOP 3: we = 0.569475531578064
TOP 4: re = 0.5692735314369202
TOP 5: someone = 0.5565952658653259
--------------------------------------------------
The most similar words of "year"
TOP 1: earlier = 0.5841510891914368
TOP 2: month = 0.5817509293556213
TOP 3: period = 0.572060763835907
TOP 4: ago = 0.5633273720741272
TOP 5: last = 0.5298276543617249
--------------------------------------------------
The most similar words of "car"
TOP 1: cars = 0.6105561852455139
TOP 2: luxury = 0.5986034870147705
TOP 3: truck = 0.563898503780365
TOP 4: ford = 0.5133273005485535
TOP 5: auto = 0.5039612054824829
--------------------------------------------------
The most similar words of "toyota"
TOP 1: infiniti = 0.6949869394302368
TOP 2: honda = 0.6433103084564209
TOP 3: mazda = 0.6296555995941162
TOP 4: lexus = 0.6275536417961121
TOP 5: motor = 0.6175971627235413
--------------------------------------------------
The most similar words of "king" - "man" + "woman"
TOP 1: king = 0.7013309001922607
TOP 2: woman = 0.6033017039299011
TOP 3: burger = 0.4819037914276123
TOP 4: md = 0.46715104579925537
TOP 5: egg = 0.45565301179885864
--------------------------------------------------
The most similar words of "took" - "take" + "go"
TOP 1: took = 0.6121359467506409
TOP 2: go = 0.6096295714378357
TOP 3: stands = 0.4905664920806885
TOP 4: hammack = 0.45641931891441345
TOP 5: refuge = 0.4478893578052521
--------------------------------------------------
The most similar words of "cars" - "car" + "child"
TOP 1: child = 0.8112377524375916
TOP 2: women = 0.5026379823684692
TOP 3: cars = 0.4889577627182007
TOP 4: patients = 0.4796864092350006
TOP 5: custody = 0.47176921367645264
--------------------------------------------------
The most similar words of "better" - "good" + "bad"
TOP 1: bad = 0.6880872249603271
TOP 2: better = 0.510699987411499
TOP 3: involved = 0.45395123958587646
TOP 4: serious = 0.4192639887332916
TOP 5: hardest = 0.41672468185424805
--------------------------------------------------

CBOW のときは WordSim353 の評価が 0.25 前後だったことを考えると、今回の 0.39 前後という結果はかなり良く見える。 ただし、CBOW の実験ではコンテキストウィンドウサイズが 1 だったのに対し、上記の SGNS では 5 を使っている。 同じコンテキストウィンドウサイズの 1 に揃えると、評価指標は 0.28 前後まで落ちる。

学習が終わってから確認している類似語は、CBOW のときと同じでなかなか良い感じに見える。 一方で、類推語の方はほとんど上手くいっていない。 類推語を解ける位の単語埋め込みを学習するには、もっと大きなコーパスが必要なのだろうか?

ゼロから作るDeep Learning ❷ ―自然言語処理編

ゼロから作るDeep Learning ❷ ―自然言語処理編

  • 作者:斎藤 康毅
  • 発売日: 2018/07/21
  • メディア: 単行本(ソフトカバー)

Python: TensorFlow の Dataset API を試す

ニューラルネットワークの並列計算には、今や GPU や TPU を使うのが一般的になっている。 一方で、それらのデバイスにデータを供給する部分がボトルネックにならないよう気をつけなければいけない。 具体的には、デバイスが計算している最中に、次に計算するデータを用意しておく必要がある。 今回は、TensorFlow で効率的なデータ供給を実現するために用意された Dataset API を試してみる。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.2.1
BuildVersion:   20D74
$ python -V             
Python 3.8.7
$ pip list | grep "tensorflow "
tensorflow             2.4.1

もくじ

下準備

あらかじめ、TensorFlow をインストールしておく。

$ pip install tensorflow

インストールできたら Python の REPL を起動する。

$ python

そして、TensorFlow をインポートする。

>>> import tensorflow as tf

Dataset API について

Dataset API を使うと、TensorFlow/Keras で定義したモデルに対して、効率的に Tensor を供給するためのパイプライン処理が記述できる。 たとえば、tensorflow.keras.Modelfit() メソッドには NumPy 配列や Tensor オブジェクト以外にも、Dataset というオブジェクトを渡すことができる。

www.tensorflow.org

Dataset オブジェクトの作り方

Dataset オブジェクトはいくつかの作り方がある。

Dataset.range()

一番シンプルなのが Dataset.range() を使うやり方かな。 これはおそらくパイプラインをデバッグするときに活躍する。

>>> ds = tf.data.Dataset.range(1, 6)
>>> ds
<RangeDataset shapes: (), types: tf.int64>

Dataset オブジェクトはイテラブルなので iter() 関数を使うとイテレータが返ってくる。

>>> it = iter(ds)
>>> it
<tensorflow.python.data.ops.iterator_ops.OwnedIterator object at 0x14cd26d30>

イテレータに対して next() を使うと、Tensor オブジェクトが得られる。

>>> next(it)
<tf.Tensor: shape=(), dtype=int64, numpy=1>
>>> next(it)
<tf.Tensor: shape=(), dtype=int64, numpy=2>
>>> next(it)
<tf.Tensor: shape=(), dtype=int64, numpy=3>

中身を一通り確認したいときは as_numpy_iterator() メソッドを使うのが楽かな。

>>> list(ds.as_numpy_iterator())
[1, 2, 3, 4, 5]

Dataset.from_tensor_slices()

既存の配列や辞書なんかから作りたいときは Dataset.from_tensor_slices() を使うと良い。 たとえば、配列から作るときはこんな感じ。

>>> slices = tf.data.Dataset.from_tensor_slices([2, 3, 5, 7, 11])
>>> list(slices.as_numpy_iterator())
[2, 3, 5, 7, 11]

多次元のときはタプルか配列でネストする。

>>> slices = tf.data.Dataset.from_tensor_slices([(1, 1), (2, 3), (5, 8), (13, 21)])
>>> list(slices.as_numpy_iterator())
[array([1, 1], dtype=int32), array([2, 3], dtype=int32), array([5, 8], dtype=int32), array([13, 21], dtype=int32)]

単純な Tensor だけでなく、辞書にネストした Tensor を要素として返すこともできるらしい。

>>> elements = {
...     'x': ([1, 4, 7], [2, 5, 8]),
...     'y': [3, 6, 9],
... }
>>> dict_ds = tf.data.Dataset.from_tensor_slices(elements)
>>> list(dict_ds.as_numpy_iterator())
[{'x': (1, 2), 'y': 3}, {'x': (4, 5), 'y': 6}, {'x': (7, 8), 'y': 9}]

Dataset.from_tensor()

既存の Tensor を流用する API としては Dataset.from_tensor() もある。 こちらは各 Tensor の形状が一致していなくてもエラーにならない。

>>> t1 = tf.constant([1, 2, 3])
>>> t2 = tf.constant([4, 5])
>>> ts_ds = tf.data.Dataset.from_tensors((t1, t2))
>>> list(ts_ds.as_numpy_iterator())
[(array([1, 2, 3], dtype=int32), array([4, 5], dtype=int32))]

Dataset.from_generator()

割とよく使いそうなのが Dataset.from_generator() で、これはジェネレータを元に Dataset オブジェクトが作れる。

何か適当にジェネレータ関数を用意する。

>>> def g():
...     yield 1
...     yield 2
...     yield 3
... 

あとはそれを元に作れる。 ジェネレータをそのまま渡すと、一回読み出して終わりになってしまうので lambda 式と組み合わせるのが良いと思う。

>>> g_ds = tf.data.Dataset.from_generator(lambda: g(), output_types=tf.uint32)
>>> list(g_ds.as_numpy_iterator())
[1, 2, 3]

多次元だったりデータ型が複合的なときはこんな感じ。

>>> def g():
...     yield ('a', 1)
...     yield ('b', 2)
...     yield ('c', 3)
... 
>>> g_ds = tf.data.Dataset.from_generator(lambda: g(), output_types=(tf.string, tf.uint32))
>>> list(g_ds.as_numpy_iterator())
[(b'a', 1), (b'b', 2), (b'c', 3)]

Dataset.list_files()

ファイルを元に Dataset オブジェクトを作りたいときは Dataset.list_files() を使うのが便利そう。

たとえばカレンとワーキングディレクトリに、次のようにテキストファイルを用意しておく。

$ echo "Hello, World" > greet.txt
$ echo "Konnichiwa, Sekai" > aisatsu.txt

Dataset.list_files() にファイル名のパターンを入れると、それに該当するファイル名が抽出される。

>>> files_ds = tf.data.Dataset.list_files('*.txt')
>>> list(files_ds.as_numpy_iterator())
[b'./aisatsu.txt', b'./greet.txt']

例えば、これを後述する map() とかで処理すると並列化が楽にできる。

基本的な情報を得る

Dataset オブジェクトには、基本的な情報を得るためのアトリビュートがいくつかある。

Dataset#elementspec

たとえば、得られるデータの形状や型は elementspec というプロパティで確認できる。

>>> ds.element_spec
TensorSpec(shape=(), dtype=tf.int64, name=None)

Dataset#cardinality()

サイズが固定だったり、いくつかの条件を満たすときは cardinality() というメソッドで要素の種類が得られる。

>>> ds.cardinality()
<tf.Tensor: shape=(), dtype=int64, numpy=5>

len()

Dataset オブジェクトのサイズも、あらかじめ分かっているときに関しては得られる。

>>> len(ds)
5

とはいえ、加工するとすぐに得られなくなるので使い所はなかなか難しい。

>>> len(ds.repeat())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/amedama/.virtualenvs/py38/lib/python3.8/site-packages/tensorflow/python/data/ops/dataset_ops.py", line 452, in __len__
    raise TypeError("dataset length is infinite.")
TypeError: dataset length is infinite.

データセットを加工する

ここでは、Dataset オブジェクトを加工する方法について書く。 加工には、関数型プログラミング的なインターフェースが用意されている。

Dataset#filter()

特定の条件を満たす要素だけを残したいときには filter() メソッドを使う。 以下は偶数の要素だけを抽出する例。

>>> even_ds = ds.filter(lambda x: x % 2 == 0)
>>> list(even_ds.as_numpy_iterator())
[2, 4]

Dataset#reduce()

要素を集約したいときは reduce() メソッドが使える。 以下は要素をすべて足していく例。

>>> ds.reduce(initial_state=tf.Variable(0, dtype=tf.int64),
...           reduce_func=lambda cumulo, current: cumulo + current)
<tf.Tensor: shape=(), dtype=int64, numpy=15>

Dataset#map()

すべての要素に処理を適用したいときは map() メソッドを使う。 以下はすべての要素を 2 倍する例。

>>> double_ds = ds.map(lambda x: x * 2)
>>> list(double_ds.as_numpy_iterator())
[2, 4, 6, 8, 10]

map() メソッドは処理を並列化できる。 並列度は num_parallel_calls オプションで指定する。 また、順序を保持する必要があるかを deterministic オプションで指定する。 以下は、4 並列で順序を保持しない場合の例。

>>> double_ds = ds.map(lambda x: x * 2, num_parallel_calls=4, deterministic=False)
>>> list(double_ds.as_numpy_iterator())
[4, 2, 6, 8, 10]

24 の順番が入れ替わっていることがわかる。 パフォーマンスを追い求めるときは deterministic=False が推奨されている。

また、並列度をランタイムで良い感じに決めてほしいときは tf.data.AUTOTUNE という定数を指定すれば良いっぽい。

>>> ds.map(lambda x: x * 2, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

Dataset#flat_map()

要素が入れ子構造になっているときに、開きながら処理したいときは flat_map() を使うと良い。 例えば、次のような 2 次元の要素を返す Dataset があったとする。

>>> nested_ds = tf.data.Dataset.from_tensor_slices([[1, 2], [3, 4], [5, 6]])
>>> list(nested_ds.as_numpy_iterator())
[array([1, 2], dtype=int32), array([3, 4], dtype=int32), array([5, 6], dtype=int32)]

ただ単に flatten したいなら、次のように関数の中で Dataset オブジェクトを返すようにする。

>>> flatten_ds = nested_ds.flat_map(lambda x: tf.data.Dataset.from_tensor_slices(x))
>>> list(flatten_ds.as_numpy_iterator())
[1, 2, 3, 4, 5, 6]

flatten して、さらに各要素に処理をしたいなら、メソッドチェーンしてこんな感じ。 もちろん、flatten した Dataset オブジェクトに map() メソッドを呼んでも良いんだけど。

>> flatten_double_ds = nested_ds.flat_map(lambda x: tf.data.Dataset.from_tensor_slices(x).map(lambda x: x * 2))
>>> list(flatten_double_ds.as_numpy_iterator())
[2, 4, 6, 8, 10, 12]

Dataset#interleave()

flat_map() の並列化版とでもいえるようなのが interleave() メソッド。

先ほどと同じように、ただ flatten するような処理をしてみよう。

>>> flatten_ds = nested_ds.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x))
>>> list(flatten_ds.as_numpy_iterator())
[1, 3, 5, 2, 4, 6]

すると、要素の順番が入れ替わっていることがわかる。 これは、デフォルトで処理が並列化されているため。

interleave() と同じ結果にしたいときは cycle_length1 に指定する必要がある。

>>> flatten_ds = nested_ds.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x), cycle_length=1)
>>> list(flatten_ds.as_numpy_iterator())
[1, 2, 3, 4, 5, 6]

cycle_length って何よ?って話になるんだけど、これは Dataset オブジェクトをキューのように処理する (仮想的な) Consumer の数を指している。 例えば、cycle_length2 のときは、キューが交互に処理されていくということ。 もちろん、新しい Dataset オブジェクトに要素が積まれる順番は早いもの勝ち。 試しに cycle_length=2 で処理してみよう。

>>> flatten_ds = nested_ds.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x), cycle_length=2)
>>> list(flatten_ds.as_numpy_iterator())
[1, 3, 2, 4, 5, 6]

要素の順番が入れ替わった。

一度に処理する要素の数も block_length で指定できる。 試しに 2 を指定してみよう。

>>> flatten_ds = nested_ds.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x), cycle_length=2, block_length=2)
>>> list(flatten_ds.as_numpy_iterator())
[1, 2, 3, 4, 5, 6]

結果からは分かりにくいけど、上記は [1, 2] -> [3, 4] -> [5, 6] という単位で処理されているはず。

ちょっとややこしいのは num_parallel_calls っていうオプションもあるところ。 ドキュメントによると、こちらのオプションに tf.data.AUTOTUNE を指定しておけば cycle_length は自動的に最大の並列度になるらしい。

>>> flatten_ds = nested_ds.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x), num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)
>>> list(flatten_ds.as_numpy_iterator())
[1, 3, 5, 2, 4, 6]

要するに、以下のように使えば複数のファイルを並列で処理できて良い感じってことみたい。 複数のテキストファイルに書かれている文章を、一つの Dataset オブジェクトとしてまとめる例。

>>> files_ds = tf.data.Dataset.list_files('*.txt')
>>> lines_ds = files_ds.interleave(lambda filepath: tf.data.TextLineDataset(filepath), num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)
>>> list(lines_ds.as_numpy_iterator())
[b'Konnichiwa, Sekai', b'Hello, World']

いきなり tf.data.TextLineDataset が登場してるけど、これはテキストファイルを処理するために用意された Dataset オブジェクトの子クラス。 ファイルに書かれたテキストを一行ずつ読み出せる。 こういった、特定の処理に特化したクラスもいくつか用意されているが、ここでは詳しく扱わない。

Dataset#shuffle()

要素の順番を意図的に入れ替えたいときは shuffle() メソッドを使う。 ただし、ストリーミング的にデータを処理するので、バッファ単位でシャッフルすることになる。

バッファというのは、いわばキューのようなもので、そのキューの中で要素がランダムにシャッフルされる。 たとえば buffer_size1 なら、要素の順番は決して入れ替わらない。

>>> list(ds.shuffle(buffer_size=1).as_numpy_iterator())
[1, 2, 3, 4, 5]

buffer_size2 なら、要素の順番が入れ替わる。 ただし、最初の方にある要素が最後の方までいくのには、相当な運が必要になる。 なぜならシャッフルの処理で、そのままの位置にあっては取り出されてしまうため、連続で後ろに移動する必要がある。

>>> list(ds.shuffle(buffer_size=2).as_numpy_iterator())
[2, 1, 3, 5, 4]

そのため、バッファのサイズが大きいほど、より広い範囲で値が入れ替わりやすくなる。

>>> list(ds.shuffle(buffer_size=10).as_numpy_iterator())
[2, 1, 4, 3, 5]

Dataset#enumerate()

組み込み関数の enumerate() と同じ概念だけど、enumerate() メソッドを使うと連番の添字を付与できる。

>> enum_ds = ds.enumerate(start=100)
>>> list(enum_ds.as_numpy_iterator())
[(100, 1), (101, 2), (102, 3), (103, 4), (104, 5)]

Dataset#window()

window() メソッドを使うと、指定した数だけずらした要素が取れる。 活躍するとしたら、自然言語処理の分布仮説に関わる処理をするところかな。

例えば、1 つずらした要素を 2 つずつ得たい場合は次のようにする。

>>> shifted_ds = ds.window(size=2, shift=1)

window() メソッドはちょっとクセがあって、そのままだと as_numpy_iterator() メソッドが実行できない。

>>> shifted_ds.as_numpy_iterator()
Traceback (most recent call last):
...
TypeError: Dataset.as_numpy_iterator() does not support datasets containing <class 'tensorflow.python.data.ops.dataset_ops.DatasetV2'>

これは、Dataset オブジェクトから返る要素自体が Dataset オブジェクトになっているため。 なので、こんな感じで確認することになる。

>>> for window in shifted_ds:
...     print(list(window.as_numpy_iterator()))
... 
[1, 2]
[2, 3]
[3, 4]
[4, 5]
[5]

上記を見てわかるとおり、そのままだと末尾に size に満たない要素が得られてしまう。 これを避けるためには drop_remainder を有効にする。

>>> shifted_ds = ds.window(size=2, shift=1, drop_remainder=True)
>>> for window in shifted_ds:
...     print(list(window.as_numpy_iterator()))
... 
[1, 2]
[2, 3]
[3, 4]
[4, 5]

データ供給の効率を向上させる

ここでは Dataset を扱う上で、データ供給を効率的に行うために必要な処理を書いていく。 なお、処理を並列化する num_parallel_calls みたいなオプションについては、既にいくつかのメソッドで紹介した。

Dataset#prefetch()

Dataset オブジェクトの各要素は、イテレータから要素を取り出そうとするタイミングで評価される。 パイプラインが長大だとレイテンシが増加するので、prefetch() メソッドを使うことで先に読み込んでおくことができる。

>>> prefetched_ds = ds.prefetch(buffer_size=5)
>>> list(prefetched_ds.as_numpy_iterator())
[1, 2, 3, 4, 5]

Dataset#cache()

cache() メソッドを使うと、一度呼び出した Dataset オブジェクトの要素をファイルにキャッシュできる。

要素をシャッフルする加工が入ったパイプラインを使って動作を確認してみよう。 そのままだと、毎回シャッフルの処理が評価されるため得られる結果が異なる。

>>> shuffled_ds = ds.shuffle(buffer_size=10)
>>> list(shuffled_ds.as_numpy_iterator())
[4, 1, 2, 5, 3]
>>> list(shuffled_ds.as_numpy_iterator())
[5, 1, 4, 3, 2]

一方で cache() すると、一旦要素をすべて読み出し尽くした Dataset オブジェクトからは何度読んでも同じ順番で要素が返ってくる。

>>> cached_ds = shuffled_ds.cache()
>>> list(cached_ds.as_numpy_iterator())
[5, 1, 3, 4, 2]
>>> list(cached_ds.as_numpy_iterator())
[5, 1, 3, 4, 2]

ランダムな処理ではデータの多様性が失われる恐れはあるものの、時間のかかる処理がパイプラインに含まれるときには有効なはず。 cache() メソッドはお手軽な一方で、もうちょっと真面目にやるなら TFRecord 形式で永続化した方が良いかもしれない。

www.tensorflow.org

Dataset#shard()

データベースの負荷分散技術としてもよく知られているシャーディング。 shard() メソッドを使うと、データセットを複数に分割できる。 たぶん、分散学習とか複数のデバイスで学習するときに使われるんだと思う。

例えば、以下はシャード数に 2 を指定してデータセットを分割した場合。

>>> ds1of2 = ds.shard(num_shards=2, index=0)
>>> ds2of2 = ds.shard(num_shards=2, index=1)

確認すると、データが交互に分けられている。

>>> list(ds1of2.as_numpy_iterator())
[1, 3, 5]
>>> list(ds2of2.as_numpy_iterator())
[2, 4]

Dataset#batch()

現在のニューラルネットワークの学習で一般的に用いられるミニバッチ勾配降下法では、複数の要素をミニバッチという単位で読み込む。 batch() メソッドを使うと、Dataset オブジェクトからミニバッチの単位毎にデータを読み込めるようになる。

>>> batched_ds = ds.batch(3)
>>> list(batched_ds.as_numpy_iterator())
[array([1, 2, 3]), array([4, 5])]

drop_remainder を有効にすると、バッチサイズに満たない残りの要素は読み出されなくなる。

>>> batched_ds = ds.batch(3, drop_remainder=True)
>>> list(batched_ds.as_numpy_iterator())
[array([1, 2, 3])]

ちなみに batch() メソッドを適用した Dataset オブジェクトは、要素がミニバッチの単位で処理される点に注意が必要となる。 たとえば、prefetch() メソッドを呼んだときはバッファが 1 つ消費される毎にミニバッチが 1 回読み込まれることになる。

Dataset#padded_batch()

Dataset オブジェクトが要素毎に shape が異なる Tensor を返す場合には、padded_batch() を使うと揃えることができる。

例えば、以下のような感じで各要素の次元が異なる場合を考える。 自然言語処理とかで文の長さが違うとかかな。

>>> mixed_shape_ds = ds.map(lambda x: tf.fill([x], x))
>>> list(mixed_shape_ds.as_numpy_iterator())
[array([1]), array([2, 2]), array([3, 3, 3]), array([4, 4, 4, 4]), array([5, 5, 5, 5, 5])]

例えば 2 つずつ要素を取り出す場合で試してみる。

>>> padded_ds = mixed_shape_ds.padded_batch(2)

取り出される要素を確認すると、各バッチ毎に次元数が揃っていることがわかる。

>>> from pprint import pprint
>>> pprint(list(padded_ds.as_numpy_iterator()))
[array([[1, 0],
       [2, 2]]),
 array([[3, 3, 3, 0],
       [4, 4, 4, 4]]),
 array([[5, 5, 5, 5, 5]])]

padded_shapes オプションを使えば、明示的にパディングするサイズを指定することもできる。

>>> padded_ds = mixed_shape_ds.padded_batch(2, padded_shapes=(10))
>>> pprint(list(padded_ds.as_numpy_iterator()))
[array([[1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [2, 2, 0, 0, 0, 0, 0, 0, 0, 0]]),
 array([[3, 3, 3, 0, 0, 0, 0, 0, 0, 0],
       [4, 4, 4, 4, 0, 0, 0, 0, 0, 0]]),
 array([[5, 5, 5, 5, 5, 0, 0, 0, 0, 0]])]

パディングに使う値は padding_values オプションで指定できる。

>>> padded_ds = mixed_shape_ds.padded_batch(2, padded_shapes=(10), padding_values=tf.constant(-1, dtype=tf.int64))
>>> pprint(list(padded_ds.as_numpy_iterator()))
[array([[ 1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
       [ 2,  2, -1, -1, -1, -1, -1, -1, -1, -1]]),
 array([[ 3,  3,  3, -1, -1, -1, -1, -1, -1, -1],
       [ 4,  4,  4,  4, -1, -1, -1, -1, -1, -1]]),
 array([[ 5,  5,  5,  5,  5, -1, -1, -1, -1, -1]])]

バッチサイズに満たない要素を切り捨てる場合に drop_remainder オプションを使うのは batch() メソッドと同様。

>>> padded_ds = mixed_shape_ds.padded_batch(2, padded_shapes=(10), padding_values=tf.constant(-1, dtype=tf.int64), drop_remainder=True)
>>> pprint(list(padded_ds.as_numpy_iterator()))
[array([[ 1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
       [ 2,  2, -1, -1, -1, -1, -1, -1, -1, -1]]),
 array([[ 3,  3,  3, -1, -1, -1, -1, -1, -1, -1],
       [ 4,  4,  4,  4, -1, -1, -1, -1, -1, -1]])]

Dataset#unbatch()

batch() メソッドとは反対に、各要素単位で読み込めるように戻すのが unbatch() メソッドになる。 パフォーマンス向上などを目的として、パイプラインの中にバッチ単位で処理する箇所があると、意外と使うことになる。

>>> batched_ds = ds.batch(3)
>>> unbatched_ds = batched_ds.unbatch()
>>> list(unbatched_ds.as_numpy_iterator())
[1, 2, 3, 4, 5]

ちなみに入れ子になった構造を flatten するときにも使えたりする。

>>> nested_ds = tf.data.Dataset.from_tensor_slices([[1, 2], [3, 4], [5, 6]])
>>> list(nested_ds.unbatch().as_numpy_iterator())
[1, 2, 3, 4, 5, 6]

複数のデータセットをまとめる

パイプラインによっては、複数のデータセットをひとまとめにして扱いたいことがある。

Dataset#concatenate()

複数の Dataset オブジェクトを直列につなげたいときは concatenate() メソッドを使う。

>>> ds1 = tf.data.Dataset.range(0, 5)
>>> ds2 = tf.data.Dataset.range(5, 10)
>>> concat_ds = ds1.concatenate(ds2)
>>> list(concat_ds.as_numpy_iterator())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Dataset.zip()

concatenate() とは反対に、並列にまとめたいときは zip() が使える。

>>> ds1 = tf.data.Dataset.range(0, 5)
>>> ds2 = tf.data.Dataset.range(5, 10)
>>> zip_ds = tf.data.Dataset.zip((ds1, ds2))
>>> list(zip_ds.as_numpy_iterator())
[(0, 5), (1, 6), (2, 7), (3, 8), (4, 9)]

3 つ以上をまとめることもできる。 また、要素数に違いがあるときは短いものに合わせられる。

>>> ds3 = tf.data.Dataset.range(10, 19)
>>> zip_ds = tf.data.Dataset.zip((ds1, ds2, ds3))
>>> list(zip_ds.as_numpy_iterator())
[(0, 5, 10), (1, 6, 11), (2, 7, 12), (3, 8, 13), (4, 9, 14)]

まとめる Dataset オブジェクトは shape が異なっていても問題ない。

>>> zip_ds = tf.data.Dataset.zip((ds1, ds2, ds3.batch(2)))
>>> list(zip_ds.as_numpy_iterator())
[(0, 5, array([10, 11])), (1, 6, array([12, 13])), (2, 7, array([14, 15])), (3, 8, array([16, 17])), (4, 9, array([18]))]

その他の処理

その他の分類が難しい処理たち。

Dataset#repeat()

repeat() メソッドを使うと、Dataset オブジェクトが返す内容を繰り返すようにできる。

例えば、2 回繰り返すようにしてみる。

>>> repeat2_ds = ds.repeat(2)
>>> list(repeat2_ds.as_numpy_iterator())
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]

繰り返しの回数を明示的に指定しない場合には無限ループになる。

>>> inf_loop_ds = ds.repeat()
>>> [next(it).numpy() for _ in range(20)]
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5]

長さやカーディナリティは取れなくなる。

>>> len(inf_loop_ds)
Traceback (most recent call last):
...
TypeError: dataset length is infinite.
>>> inf_loop_ds.cardinality().numpy()
-1

Dataset#apply()

Dataset オブジェクトを加工するための一連の処理を、一度に適用するのに Dataset#apply() メソッドが使える。

例えば、偶数の要素だけを残して値を 2 倍にする処理を考える。 そのためには、次のように Dataset オブジェクトを受け取って、処理を適用した Dataset オブジェクトを返す関数を定義する。

>>> def pipeline(ds: tf.data.Dataset) -> tf.data.Dataset:
...     odd_ds = ds.filter(lambda x: x % 2 != 0)
...     double_ds = odd_ds.map(lambda x: x * 2)
...     return double_ds
... 

あとは、関数を引数にして apply() メソッドを呼ぶだけ。

>>> applied_ds = ds.apply(pipeline)
>>> list(applied_ds.as_numpy_iterator())
[2, 6, 10]

ただ、これってメソッドチェーンの形で処理が書けるだけなので、どこに嬉しさがあるのかはあんまりよくわからない。

>>> list(pipeline(ds).as_numpy_iterator())
[2, 6, 10]

Dataset#take()

take() メソッドを使うと、先頭の要素だけを返す Dataset オブジェクトを作れる。

>>> head_ds = ds.take(2)
>>> list(head_ds.as_numpy_iterator())
[1, 2]

Dataset#skip()

take() メソッドとは反対に skip() メソッドを使うと、先頭の要素を読み飛ばす Dataset オブジェクトが作れる。

>>> tail_ds = ds.skip(2)
>>> list(tail_ds.as_numpy_iterator())
[3, 4, 5]

参考

www.tensorflow.org

www.tensorflow.org

www.tensorflow.org

Python: NumPy の empty() / zeros() を呼び出した直後は物理メモリの使用量が増えない

表題のとおりなんだけど、NumPy の empty()zeros() は呼び出した直後はメモリの RSS (Resident Set Size) が増えない。 ようするに、呼び出した直後は配列に物理メモリが割り当てられていない、ということ。 今回は、そのせいでちょっとハマったのでメモとして詳細について書き残しておく。 ただし、この事象が起こるのは仮想記憶に COW (Copy-On-Write) の機構を備えた OS、という条件はある。

使った環境は次のとおり。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.2 LTS
Release:    20.04
Codename:   focal
$ uname -r
5.4.0-1036-gcp

TL; DR

  • NumPy の empty() / zeros() は内部的に calloc(3) / malloc(3) を呼び出している
    • 仮想記憶の COW (Copy-On-Write) が働くので、実際にメモリの書き込みが生じないと物理メモリが割り当てられない
  • OS がメモリの管理にページング方式を採用していると、使った領域の分だけ物理メモリが増えていく
    • そのため zeros() で作ったスパースな配列を操作していると突然物理メモリが爆発して戸惑う
  • 一方で ones()zeros_like()malloc(3) した上でメモリに書き込みが生じるため最初から物理メモリが割り当てられる
  • 最初から必要な物理メモリを見積もりたいときは zeros() は使わず empty() した上で 0 を書き込むと良い

もくじ

下準備

今回は NumPy のソースコードをデバッガで追いかけるので、ちょっと準備の手数が多い。

まずは必要なパッケージをインストールしておく。

$ sudo apt-get -y install git python3-venv gdb

Python の仮想環境を作ってアクティベートする。

$ python3 -m venv example
$ source example/bin/activate

Python の REPL からメモリの情報を取りたいので psutil をインストールしておく。

$ pip3 install psutil

NumPy のリポジトリをクローンして、リリース版のタグをチェックアウトする。

$ git clone https://github.com/numpy/numpy.git
$ cd numpy/
$ git checkout -b v1.20.1 tags/v1.20.1

依存パッケージをインストールしたら、デバッグ用の情報を残して拡張モジュールをビルドする。

$ pip3 install -r test_requirements.txt
$ CFLAGS='-Wall -O0 -g' python3 setup.py build_ext -i

あとは開発モードでインストールする。

$ pip3 install -e .

Python の REPL を起動したら準備完了。

$ python3

実験してみる

それでは、実際に NumPy の empty()zeros() を呼び出して仮想メモリと物理メモリの使用状況を確認してみよう。

まずは、仮想メモリと物理メモリの使用状況を表示する関数を定義する。

>>> import psutil
>>> def print_memory_usage():
...     process = psutil.Process()
...     mem_info = process.memory_info()
...     print(f'vms={mem_info.vms // 1024 // 1024}MB rss={mem_info.rss // 1024 // 1024}MB')
...

定義したら NumPy をインポートする。

>>> import numpy as np

メモリの初期状態はこんな感じ。

>>> print_memory_usage()
vms=44MB rss=26MB

zeros() を使って配列を作ってみよう。

>>> x = np.zeros((10_000, 10_000))
>>> print_memory_usage()
vms=807MB rss=26MB

すると、仮想メモリの使用量は増えているが、物理メモリの使用量は増えていないことがわかる。

一旦、配列を削除しよう。 すると、仮想メモリの使用量も減る。

>>> del x
>>> print_memory_usage()
vms=44MB rss=26MB

empty() も同じように試してみる。

>>> x = np.empty((10_000, 10_000))
>>> print_memory_usage()
vms=807MB rss=26MB

こちらも、同じように仮想メモリの使用量しか増えない。

では、ones() はどうか?

>>> del x
>>> print_memory_usage()
vms=44MB rss=26MB
>>> x = np.ones((10_000, 10_000))
>>> print_memory_usage()
vms=807MB rss=789MB

なんと、これは物理メモリまで増えるのだ。

同じように zeros_like() も、呼び出した直後に物理メモリの割り当てが生じる。

>>> del x
>>> x = np.empty((10_000, 10_000))
>>> print_memory_usage()
vms=807MB rss=26MB
>>> y = np.zeros_like(x)
>>> print_memory_usage()
vms=1570MB rss=789MB

ハマった状況について

この振る舞いによって、実際に自分がハマったパターンについて説明しておく。 zeros() を使って初期化したスパースな配列を操作していたところ、途中までは何も問題がなかった。 しかし、配列に対して np.sum() を使ったところ物理メモリが突如として大量に必要となってプロセスが落ちた。

もうお分かりだと思うけど、そもそも初期化した時点で物理メモリに収まらないサイズの配列だった。 しかし、スパースだったが故に、途中まで必要とする物理メモリが少なかった。 今回であれば np.sum() という、およそメモリを大量に消費することなどなさそうな関数でプロセスが死んでデバッグに苦労した。

問題の回避策

問題を回避するには物理メモリを割り当ててやれば良い。 つまり、初期化した後に自分で値を書き込む。 例えば、次のように empty() で初期化した上で自分で 0 を書き込んでやる。

>>> del x
>>> del y
>>> print_memory_usage()
vms=44MB rss=26MB
>>> x = np.empty((10_000, 10_000))
>>> print_memory_usage()
vms=807MB rss=26MB
>>> x[...] = 0
>>> print_memory_usage()
vms=807MB rss=789MB

ソースコードを追いかけてみる

ここからは、実際に NumPy のソースコードを追いかけてみよう。

一旦、Python のインタプリタのプロセスは立ち上げ直しておく。

$ python3

そして、改めて NumPy をインポートしよう。

>>> import numpy as np

プロセス ID を確認する。

>>> import os
>>> os.getpid()
14592

別のターミナルで gdb を起動する。

$ sudo gdb

Python の REPL プロセスをアタッチする。

(gdb) attach 14592

NumPy で配列のメモリを確保している箇所にブレークポイントを打つ。

(gdb) break PyDataMem_NEW_ZEROED
Breakpoint 1 at 0x7f4c524c9010: file numpy/core/src/multiarray/alloc.c, line 265.

REPL に制御を戻す。

(gdb) c
Continuing.

REPL で zeros() を呼び出す。

>>> x = np.zeros((100, 100))

ブレークポイントに到達するのでバックトレースを表示する。

Breakpoint 1, PyDataMem_NEW_ZEROED (size=size@entry=80000, elsize=elsize@entry=1) at numpy/core/src/multiarray/alloc.c:265
265    {
(gdb) bt
#0  PyDataMem_NEW_ZEROED (size=size@entry=80000, elsize=elsize@entry=1) at numpy/core/src/multiarray/alloc.c:265
#1  0x00007f4c524c90ba in npy_alloc_cache_zero (sz=80000) at numpy/core/src/multiarray/alloc.c:155
#2  0x00007f4c52500486 in PyArray_NewFromDescr_int (subtype=<optimized out>, descr=0x7f4c527c1940 <DOUBLE_Descr>, nd=nd@entry=2, dims=dims@entry=0x13c1da0, 
    strides=strides@entry=0x0, data=data@entry=0x0, flags=<optimized out>, obj=<optimized out>, base=<optimized out>, zeroed=<optimized out>, allow_emptystring=<optimized out>)
    at numpy/core/src/multiarray/ctors.c:826
#3  0x00007f4c525047ee in PyArray_Zeros (nd=2, dims=0x13c1da0, type=<optimized out>, is_f_order=0) at numpy/core/src/multiarray/ctors.c:2833
#4  0x00007f4c5258dc33 in array_zeros (__NPY_UNUSED_TAGGEDignored=<optimized out>, args=0x7f4c52a894c0, kwds=0x0) at numpy/core/src/multiarray/multiarraymodule.c:2045
#5  0x00000000005f4249 in PyCFunction_Call ()
#6  0x00000000005f46d6 in _PyObject_MakeTpCall ()
#7  0x0000000000570936 in _PyEval_EvalFrameDefault ()
#8  0x000000000056955a in _PyEval_EvalCodeWithName ()
#9  0x000000000068c4a7 in PyEval_EvalCode ()
#10 0x000000000067bc91 in ?? ()
#11 0x000000000067bd0f in ?? ()
#12 0x00000000004a3291 in ?? ()
#13 0x00000000004a4305 in PyRun_InteractiveLoopFlags ()
#14 0x000000000067e059 in PyRun_AnyFileExFlags ()
#15 0x00000000004ee716 in ?? ()
#16 0x00000000006b63bd in Py_BytesMain ()
#17 0x00007f4c530000b3 in __libc_start_main (main=0x4eea30 <main>, argc=1, argv=0x7ffdff0db088, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, 
    stack_end=0x7ffdff0db078) at ../csu/libc-start.c:308
#18 0x00000000005fa4de in _start ()

元は numpy/core/src/multiarray/multiarraymodule.c::array_zeros() から呼び出されていることがわかる。

github.com

行き着く先がブレークポイントを打った関数で、中で calloc(3) している。

github.com

同様に empty() に関しては PyDataMem_NEW() 経由で malloc(3) が呼ばれるっぽい。

github.com

じゃあ何で ones()zeros_like() は物理メモリが増えるかというと、これはデバッガを使うまでもなくわかる。 まず、ones() に関しては empty() で初期化した配列に 1 をコピーしているから。

github.com

同じように zeros_like()empty_like() で作った配列に 0 をコピーしている。

github.com

理由さえ分かってしまえば何ということはない話だった。

Python: TensorFlow2 の自動微分を試してみる

今回は、TensorFlow2 のプリミティブな API を使って、自動微分と勾配法で計算グラフを最適化する方法が気になったので試してみた。 普段は Keras (tf.keras) を使ったミニバッチ学習をすることが多いけど、データのサイズが小さければバッチ学習で解く選択肢もあると思うので。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.2
BuildVersion:   20D64
$ python -V     
Python 3.8.7
$ pip list | grep "tensorflow "
tensorflow             2.4.1

もくじ

下準備

あらかじめ、TensorFlow をインストールしておく。

$ pip install tensorflow

サンプルコード

早速だけど以下にサンプルコードを示す。 今回は単純すぎる例だけど x0^2 + x_1^2 を最小化してみた。 TensorFlow2 では GradientTape というコンテキストマネージャの中で Variable を操作すると、計算グラフが構築されて自動微分ができるようだ。 勾配さえ計算できてしまえば、あとはそれをオプティマイザに放り込んでパラメータを更新するだけ。 オプティマイザは特に何を使っても良いけどシンプルな例なので SGD を使った。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import tensorflow as tf  # type: ignore


def objective(params: tf.Variable) -> tf.Tensor:
    """最小化したい目的関数"""
    # x_0^2 + x_1^2
    loss = params[0] ** 2 + params[1] ** 2
    return loss


@tf.function
def training_step(params: tf.Variable, optimizer: tf.keras.optimizers.Optimizer) -> None:
    # 自動微分する
    with tf.GradientTape(watch_accessed_variables=False) as t:
        # 勾配を計算するために追跡が必要なパラメータを登録する
        t.watch(params)  # watch_accessed_variables=True のときは不要
        # 計算グラフを構築する
        loss = objective(params)
    # 勾配を計算する
    grads = t.gradient(loss, params)
    # 勾配を使ってパラメータを更新する
    optimizer.apply_gradients([(grads, params)])
    # 現在のパラメータを出力する
    tf.print(params)


def main():
    # 定数で初期化した変数を用意する
    tensor = tf.constant([1., 4.], dtype=tf.float32)
    params = tf.Variable(tensor, trainable=True)

    # SGD で最適化する
    optimizer = tf.keras.optimizers.SGD(learning_rate=1e-1)

    # イテレーションを回す
    for _ in range(20):
        training_step(params, optimizer)

    # 結果を出力する
    print(objective(params))


if __name__ == '__main__':
    main()

上記を保存して実行してみる。 20 回イテレーションを回して、それぞれでパラメータの値を出力している。

$ python tfautograd.py 

...

[0.8 3.2]
[0.64 2.56]
[0.511999965 2.04799986]
[0.40959996 1.63839984]
[0.327679962 1.31071985]
[0.26214397 1.04857588]
[0.209715173 0.838860691]
[0.167772144 0.671088576]
[0.134217709 0.536870837]
[0.107374169 0.429496676]
[0.0858993381 0.343597353]
[0.068719469 0.274877876]
[0.0549755767 0.219902307]
[0.0439804606 0.175921842]
[0.0351843685 0.140737474]
[0.0281474944 0.112589978]
[0.0225179959 0.0900719836]
[0.0180143975 0.0720575899]
[0.0144115184 0.0576460734]
[0.0115292147 0.0461168587]
tf.Tensor(0.0022596875, shape=(), dtype=float32)

上記をみると、ちゃんとパラメータの値が更新されて、最終的な結果が約 0.002 と小さくなっていることがわかる。

いじょう。

参考

www.tensorflow.org

Python: TensorFlow/Keras で Word2Vec の CBOW を実装してみる

(2021-02-04 追記): ニューラルネットワークのアーキテクチャで、出力側の Embedding が誤って Dense になっていた部分を修正した。

Word2Vec の CBOW (Continuous Bag-of-Words) は、単語の分散表現 (Word Embedding) を得るために用いられるニューラルネットワークのアーキテクチャのひとつ。 今回は、それを TensorFlow/Keras を使って実装してみた。 なお、ニューラルネットワークのアーキテクチャは、オライリーの「ゼロから作るDeep Learning ❷ ――自然言語処理編」を参考にしている。

ゼロから作るDeep Learning ❷ ―自然言語処理編

ゼロから作るDeep Learning ❷ ―自然言語処理編

  • 作者:斎藤 康毅
  • 発売日: 2018/07/21
  • メディア: 単行本(ソフトカバー)

ただし、あらかじめ断っておくと実用性はそれほどない。 実用上は gensim とかを使った方が学習は速いし、得られる分散表現も良い性能になるはず。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.2
BuildVersion:   20D64
$ python -V       
Python 3.8.7
$ pip list | grep "tensorflow "
tensorflow             2.4.1

もくじ

下準備

あらかじめ、必要なパッケージをインストールしておく。

$ pip install tensorflow gensim scipy

また、学習するデータセットをダウンロードしておく。 今回は PTB (Penn Treebank) をコーパスに用いる。

$ wget https://raw.githubusercontent.com/tomsercu/lstm/master/data/ptb.train.txt
$ wget https://raw.githubusercontent.com/tomsercu/lstm/master/data/ptb.valid.txt

サンプルコード

早速だけど、以下にサンプルコードを示す。 ニューラルネットワークは SimpleContinuousBagOfWords という名前で実装した。 CBOW は、ある単語の左右に出現する単語から、その単語を推定するアーキテクチャになっている。 そのため、ネットワークに供給するデータは左右に出現する 2 つの単語になる。 そして、どんな単語が出るかを One-Hot 形式の予測値として出力している。 肝心の単語埋め込みは、ネットワーク内にある Embedding レイヤーの重み (Weights) として得られる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import re
from itertools import count
from typing import Iterable
from typing import Iterator
from functools import partial

import numpy as np  # type: ignore
import tensorflow as tf  # type: ignore
from tensorflow.keras import Model  # type: ignore
from tensorflow.keras.layers import Embedding  # type: ignore
from tensorflow import Tensor  # type: ignore
from tensorflow.data import Dataset  # type: ignore
from tensorflow.keras.losses import CategoricalCrossentropy  # type: ignore
from tensorflow.keras.optimizers import Adam  # type: ignore
from tensorflow.keras.layers import Dense  # type: ignore
from tensorflow.keras.layers import Softmax  # type: ignore
from tensorflow.keras.layers import Layer # type: ignore
from tensorflow.keras.models import Sequential  # type: ignore
from tensorflow.keras.callbacks import Callback  # type: ignore
from gensim.test.utils import datapath  # type: ignore
from scipy.stats import pearsonr  # type: ignore


class OutputEmbedding(Layer):
    """出力側の単語埋め込み層"""

    def __init__(self, vocab_size: int):
        super().__init__()
        self.vocab_size = vocab_size

    def build(self, input_shape):
        # 埋め込み次元数
        self.embedding_size = input_shape[1]
        # 出力部分に使うので、通常の Embedding とは shape が転置している
        self.embedding = self.add_weight(shape=(self.embedding_size, self.vocab_size))

    def call(self, inputs, **kwargs):
        # 重みは最初から転置してあるので transpose する必要はない
        # (NxD) * (DxV) = (NxV)
        return tf.tensordot(inputs, self.embedding, axes=1)


class SimpleContinuousBagOfWords(Model):
    """Word2Vec の CBOW モデルを実装したクラス"""

    def __init__(self, vocab_size: int, embedding_size: int):
        super().__init__()

        # 入力側の単語埋め込み層
        self.embedding_layer = Embedding(input_dim=vocab_size,
                                         input_shape=(1, ),
                                         output_dim=embedding_size,
                                         name='word_embedding',
                                         )
        self.output_layer = Sequential([
            OutputEmbedding(vocab_size),
            Softmax(),
        ])

    def call(self, inputs: Tensor) -> Tensor:
        # 推定したい単語の左側にある単語のベクトルを取り出す
        left_context = inputs[:, 0]
        left_vector = self.embedding_layer(left_context)
        # 推定したい単語の右側にある単語のベクトルを取り出す
        right_context = inputs[:, 1]
        right_vector = self.embedding_layer(right_context)
        # 両方の単語に対応するベクトルを足して 2 で割る
        mixed_vector = (left_vector + right_vector) / 2
        # 出力側の Embedding と積を取ってから単語の出現確率にする
        prediction = self.output_layer(mixed_vector)
        return prediction


class WordSimilarity353Callback(Callback):
    """WordSim353 データセットを使って単語間の類似度を評価するコールバック"""

    def __init__(self, word_id_table: dict[str, int]):
        super().__init__()

        self.word_id_table = word_id_table
        self.model = None

        # 評価用データを読み込む
        self.eval_data = []
        wordsim_filepath = datapath('wordsim353.tsv')
        with open(wordsim_filepath, mode='r') as fp:
            # 最初の 2 行はヘッダなので読み飛ばす
            fp.readline()
            fp.readline()
            for line in fp:
                word1, word2, sim_score = line.strip().split('\t')
                self.eval_data.append((word1, word2, float(sim_score)))

    def set_model(self, model):
        self.model = model

    def _cosine_similarity(self, x, y):
        nx = x / np.sqrt(np.sum(x ** 2))
        ny = y / np.sqrt(np.sum(y ** 2))
        return np.dot(nx, ny)

    def on_epoch_end(self, epoch, logs=None):
        # モデルから学習させたレイヤーの重みを取り出す
        model_layers = {layer.name: layer for layer in self.model.layers}
        embedding_layer = model_layers['word_embedding']
        word_vectors = embedding_layer.weights[0].numpy()

        # 評価用データセットに含まれる単語間の類似度を計算する
        labels = []
        preds = []
        for word1, word2, sim_score in self.eval_data:
            # Out-of-Vocabulary な単語はスキップ
            if word1 not in self.word_id_table or word2 not in self.word_id_table:
                continue

            # コサイン類似度を計算する
            word1_vec = word_vectors[self.word_id_table[word1]]
            word2_vec = word_vectors[self.word_id_table[word2]]
            pred = self._cosine_similarity(word1_vec, word2_vec)
            preds.append(pred)
            # 正解ラベル
            labels.append(sim_score)

        # ピアソンの相関係数を求める
        r_score = pearsonr(labels, preds)[0]
        print(f'Pearson\'s r score with WordSim353: {r_score}')


def load_corpus(filepath: str) -> Iterator[str]:
    """テキストファイルからコーパスを読み出す"""
    tag_replace = re.compile('<.*?>')
    with open(filepath, mode='r') as fp:
        for line in fp:
            # コーパスに含まれる <unk> や <eos> は取り除きたい
            yield re.sub(tag_replace, '', line)


def sentences_to_words(sentences: Iterable[str], lower: bool = True) -> Iterator[list[str]]:
    """文章を単語に分割する"""
    for sentence in sentences:
        if lower:
            sentence = sentence.lower()
        words = re.split('\\W+', sentence)
        yield [word for word in words if len(word) > 0]  # 空文字は取り除く


def word_id_mappings(sentences: Iterable[Iterable[str]]) -> dict[str, int]:
    """単語を ID に変換する対応テーブルを作る"""
    counter = count(start=0)

    word_to_id = {}
    for sentence in sentences:
        for word in sentence:

            if word in word_to_id:
                # 登録済みの単語
                continue

            # 単語の識別子を採番する
            word_id = next(counter)
            word_to_id[word] = word_id

    return word_to_id


def words_to_ids(sentences: Iterable[list[str]], word_to_id: dict[str, int]) -> Iterator[list[int]]:
    # 単語を対応する ID に変換する
    for words in sentences:
        # NOTE: Out-of-Vocabulary への対応がない
        yield [word_to_id[word] for word in words]


def cosine_similarity_matrix(word_vectors: np.ndarray, eps: float = 1e-8) -> np.ndarray:
    """単語ベクトル間のコサイン類似度を計算する"""
    word_norm = np.sqrt(np.sum(word_vectors ** 2, axis=1)).reshape(word_vectors.shape[0], -1)
    normalized_word_vectors = word_vectors / (word_norm + eps)
    cs_matrix = np.dot(normalized_word_vectors, normalized_word_vectors.T)
    return cs_matrix


def most_similar_words(word_id: int, cs_matrix: np.ndarray, top_n: int = 5):
    """コサイン類似度が最も高い単語の ID を取り出す"""
    similarities = cs_matrix[word_id, :]
    similar_word_ids = np.argsort(similarities)[::-1]
    # 基本的にコサイン類似度が最も高い ID は自分自身になるはずなので先頭は取り除く
    return similar_word_ids[1: top_n + 1]


def extract_contexts(word_ids: Tensor, window_size: int = 1) -> Tensor:
    """コンテキストの単語をラベル形式で得る"""
    right_contexts = word_ids[window_size + 1:]
    left_contexts = word_ids[:-window_size - 1]
    contexts = tf.transpose([left_contexts, right_contexts])
    return contexts


def extract_targets(word_ids: Tensor, vocab_size: int, window_size: int = 1) -> Tensor:
    """ターゲットの単語を One-Hot 形式にする"""
    targets = word_ids[window_size:-window_size]
    # 正解との損失を計算するために One-Hot 表現にする
    one_hot_targets = tf.one_hot(targets, depth=vocab_size)
    return one_hot_targets


def dataset_pipeline(corpus_words: Iterator[list[str]], word_id_table: dict[str, int]) -> Dataset:
    """ターゲットとコンテキストを供給するパイプライン"""

    # ID に変換したコーパスを行ごとに読み出せるデータセット
    word_ids_ds = Dataset.from_generator(lambda: words_to_ids(corpus_words, word_id_table),
                                         tf.int32,
                                         output_shapes=[None])

    # コンテキストを抽出するパイプライン
    contexts_ds = word_ids_ds.map(extract_contexts,
                                  num_parallel_calls=tf.data.AUTOTUNE,
                                  # ターゲットと対応関係を作る必要があるので決定論的に動作させる
                                  deterministic=True)

    # ターゲットを抽出するパイプライン
    vocab_size = len(word_id_table.keys())
    f = partial(extract_targets, vocab_size=vocab_size)
    targets_ds = word_ids_ds.map(f,
                                 num_parallel_calls=tf.data.AUTOTUNE,
                                 # コンテキストと対応関係を作る必要があるので決定論的に動作させる
                                 deterministic=True)

    # 行単位で処理されているので flatten して結合する
    zipped_ds = Dataset.zip((contexts_ds.unbatch(), targets_ds.unbatch()))
    return zipped_ds


def count_words(corpus_words: Iterator[list[str]]) -> int:
    """コーパスに含まれる単語の数をカウントする"""
    return sum(len(words) for words in corpus_words)


def main():
    # Penn Treebank コーパスを読み込む
    train_sentences = load_corpus('ptb.train.txt')
    valid_sentences = load_corpus('ptb.valid.txt')

    # コーパスを単語に分割する
    train_corpus_words = list(sentences_to_words(train_sentences))
    valid_corpus_words = list(sentences_to_words(valid_sentences))

    # 単語 -> ID
    word_to_id = word_id_mappings(train_corpus_words)
    # コーパスの語彙数
    vocab_size = len(word_to_id.keys())

    # モデルとタスクを定義する
    EMBEDDING_SIZE = 50
    criterion = CategoricalCrossentropy()
    optimizer = Adam(learning_rate=1e-2)
    model = SimpleContinuousBagOfWords(vocab_size, EMBEDDING_SIZE)
    model.compile(optimizer=optimizer,
                  loss=criterion)

    # データセットを準備する
    TRAIN_BATCH_SIZE = 1024
    train_ds = dataset_pipeline(train_corpus_words, word_to_id)
    train_ds = train_ds.shuffle(buffer_size=512)
    train_ds = train_ds.repeat()
    train_ds = train_ds.batch(TRAIN_BATCH_SIZE)
    train_ds = train_ds.prefetch(buffer_size=tf.data.AUTOTUNE)
    # 検証用データはリピートしない
    VALID_BATCH_SIZE = 1024
    valid_ds = dataset_pipeline(valid_corpus_words, word_to_id)
    valid_ds = valid_ds.batch(VALID_BATCH_SIZE)
    valid_ds = valid_ds.prefetch(buffer_size=tf.data.AUTOTUNE)

    # 厳密に要素数を計算するのは面倒なので、ざっくり単語数の 2 倍をエポックということにしておく
    num_of_train_samples = count_words(train_corpus_words) * 2

    callbacks = [
        # WordSim353 データセットを使って単語間の類似度を相関係数で確認する
        WordSimilarity353Callback(word_to_id),
    ]
    # 学習する
    model.fit(train_ds,
              steps_per_epoch=num_of_train_samples // TRAIN_BATCH_SIZE,
              validation_data=valid_ds,
              epochs=5,
              callbacks=callbacks,
              verbose=1,
              )

    # モデルから学習させたレイヤーの重みを取り出す
    model_layers = {layer.name: layer for layer in model.layers}
    embedding_layer = model_layers['word_embedding']
    word_vectors = embedding_layer.weights[0].numpy()

    # 単語を表すベクトル間のコサイン類似度を計算する
    cs_matrix = cosine_similarity_matrix(word_vectors)
    # ID -> 単語
    id_to_word = {value: key for key, value in word_to_id.items()}

    # いくつか似ているベクトルを持った単語を確認してみる
    example_words = ['you', 'year', 'car', 'toyota']
    for target_word in example_words:
        # ID に変換した上で最も似ている単語とそのベクトルを取り出す
        print(f'The most similar words of "{target_word}"')
        target_word_id = word_to_id[target_word]
        most_similar_word_ids = most_similar_words(target_word_id, cs_matrix)
        # 単語とベクトルを表示する
        for rank, similar_word_id in enumerate(most_similar_word_ids, start=1):
            similar_word = id_to_word[similar_word_id]
            similarity = cs_matrix[target_word_id, similar_word_id]
            print(f'TOP {rank}: {similar_word} = {similarity}')
        print('-' * 50)


if __name__ == '__main__':
    main()

上記に名前をつけて保存したら実行してみよう。 CPU を使って学習すると、1 エポックにそれなりの時間がかかるけど、まあなんとかなるレベルのはず。 エポック間には、WordSim353 データセットを使って評価した、単語間の類似度がピアソンの相関係数として表示される。 学習が終わったら、いくつかの単語について、類似したベクトルを持つ単語を上位 5 件について表示させている。

$ python cbow.py

...

1651/1651 [==============================] - 181s 109ms/step - loss: 6.1526 - val_loss: 5.1804
Pearson's r score with WordSim353: 0.21794273571470427
Epoch 2/5
1651/1651 [==============================] - 182s 110ms/step - loss: 4.7424 - val_loss: 5.2353
Pearson's r score with WordSim353: 0.2428972583779604
Epoch 3/5
1651/1651 [==============================] - 172s 104ms/step - loss: 4.4591 - val_loss: 5.3129
Pearson's r score with WordSim353: 0.2583752228928214
Epoch 4/5
1651/1651 [==============================] - 160s 97ms/step - loss: 4.3708 - val_loss: 5.3941
Pearson's r score with WordSim353: 0.25454681209868246
Epoch 5/5
1651/1651 [==============================] - 4038s 2s/step - loss: 4.3027 - val_loss: 5.4816
Pearson's r score with WordSim353: 0.25160892813703517
The most similar words of "you"
TOP 1: we = 0.8296732306480408
TOP 2: they = 0.7543421983718872
TOP 3: i = 0.668086051940918
TOP 4: she = 0.5567612051963806
TOP 5: imbalances = 0.5095677375793457
--------------------------------------------------
The most similar words of "year"
TOP 1: week = 0.7451192140579224
TOP 2: month = 0.7027692794799805
TOP 3: day = 0.6059724688529968
TOP 4: spring = 0.5649460554122925
TOP 5: decade = 0.5465914011001587
--------------------------------------------------
The most similar words of "car"
TOP 1: seed = 0.6106906533241272
TOP 2: taxi = 0.6024419665336609
TOP 3: auto = 0.574679434299469
TOP 4: siemens = 0.5651793479919434
TOP 5: subscriber = 0.5451180934906006
--------------------------------------------------
The most similar words of "toyota"
TOP 1: ford = 0.6328699588775635
TOP 2: celebrity = 0.5823256373405457
TOP 3: humana = 0.5597572922706604
TOP 4: supermarkets = 0.5554667115211487
TOP 5: honda = 0.5467281937599182
--------------------------------------------------

上記をみると、学習データの損失はエポックを重ねるごとに小さくなっているが、検証データの損失は 2 エポック目以降から悪化している。 これは一般的には過学習している状態だが、WordSim353 を使った単語間類似度の評価は 2 エポック目も向上している。 また、ベストなエポックの結果ではないものの、最後に出力している類似の単語については、そこそこ納得感がある結果に思える。

ゼロから作るDeep Learning ❷ ―自然言語処理編

ゼロから作るDeep Learning ❷ ―自然言語処理編

  • 作者:斎藤 康毅
  • 発売日: 2018/07/21
  • メディア: 単行本(ソフトカバー)

Python: 正の相互情報量 (PPMI) と特異値分解 (SVD) を使った単語の分散表現

(2021-02-02 追記): 共起行列の計算を NumPy の Integer array indexing を使った実装にした

オライリーの「ゼロから作るDeep Learning ❷ ――自然言語処理編」を読んでいる。 この中に、カウントベースで計算する初歩的な単語の分散表現が紹介されていて、なかなか面白かった。 単語の分散表現を得る方法は色々とあるけど、カウントベースは単語の出現数を元にした比較的シンプルなもの。 今回は、正の相互情報量 (Positive Pointwise Mutual Information; PPMI) と特異値分解 (Singular Value Decomposition; SVD) を使って単語の分散表現を獲得する例を実装してみた。 書籍に記載されている内容を参考にしつつも、自分なりのコードにしてある。 オリジナルとの差分で大きいのは、正の相互情報量と単語間のコサイン類似度を計算する部分を NumPy のベクトル演算にしたところかな。

使った環境は次のとおり。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.15.7
BuildVersion:   19H114
$ python -V               
Python 3.8.7

もくじ

下準備

あらかじめ、必要なパッケージをインストールしておく。

$ pip install scikit-learn

コーパスは、前述の書籍の中で用いられていた「Penn Treebank (PTB)」を使う。 こちらも、あらかじめダウンロードしておこう。

$ wget https://raw.githubusercontent.com/tomsercu/lstm/master/data/ptb.train.txt

サンプルコード

以下に実装したサンプルコードを示す。 コメントもつけてあるけど、サンプルコードでは次のような流れで処理している。

  • PTB コーパスを読み込む
  • 単語を ID に変換する
    • 語彙 (Vocabulary) を構築する
  • 共起行列を計算する
    • 文章の中で、ある単語と別の単語が共に用いられた回数を表している
      • つまり文の中で近い位置に登場した、ということ
  • 正の相互情報量を計算する
    • ある単語と別の単語が共に用いられる度合いを表している
  • 特異値分解して単語の分散表現を得る
    • 相互情報量を複数の行列に分解すると共に次元圧縮をかける
  • いくつかの単語について類似の分散表現を持った単語を調べる
    • 分散表現の近さをコサイン類似度で測る
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import re
from itertools import count
from typing import Iterable
from typing import Iterator

import numpy as np  # type: ignore
from sklearn.utils.extmath import randomized_svd  # type: ignore


def load_corpus(filepath: str) -> Iterator[str]:
    """テキストファイルからコーパスを読み出す"""
    tag_replace = re.compile('<.*?>')
    with open(filepath, mode='r') as fp:
        for line in fp:
            # コーパスに含まれる <unk> や <eos> は取り除きたい
            yield re.sub(tag_replace, '', line)


def sentences_to_words(sentences: Iterable[str], lower: bool = True) -> Iterator[list[str]]:
    """文章を単語に分割する"""
    for sentence in sentences:
        if lower:
            sentence = sentence.lower()
        words = re.split('\\W+', sentence)
        yield [word for word in words if len(word) > 0]  # 空文字は取り除く


def word_id_mappings(sentences: Iterable[Iterable[str]]) -> dict[str, int]:
    """単語を ID に変換する対応テーブルを作る"""
    counter = count(start=0)

    word_to_id = {}
    for sentence in sentences:
        for word in sentence:

            if word in word_to_id:
                # 登録済みの単語
                continue

            # 単語の識別子を採番する
            word_id = next(counter)
            word_to_id[word] = word_id

    return word_to_id


def words_to_ids(sentences: Iterable[list[str]], word_to_id: dict[str, int]) -> Iterator[list[int]]:
    # 単語を対応する ID に変換する
    for words in sentences:
        # NOTE: Out-of-Vocabulary への対応がない
        yield [word_to_id[word] for word in words]


def co_occurence_matrix(corpus_ids: Iterable[list[int]], vocab_size: int, window_size: int) -> np.ndarray:
    """共起行列 (Co-occurence Matrix) を計算する"""
    co_matrix = np.zeros((vocab_size, vocab_size), dtype=np.int32)

    for word_ids in corpus_ids:
        for context_len in range(1, window_size + 1):
            # 元の ID 列と、コンテキスト分をずらした ID 列を用意する
            base = word_ids[context_len:]
            shifted = word_ids[:-context_len]
            # 元の列とずらした列の対応を取れば、共起した単語を Integer array indexing で処理できる
            co_matrix[base, shifted] += 1
            co_matrix[shifted, base] += 1

    return co_matrix


def positive_pointwise_mutual_information(co_matrix: np.ndarray, eps: float = 1e-8):
    # 正の相互情報量 (Positive Pointwise Mutual Information) を計算する
    total_words = np.sum(co_matrix)
    word_count = np.sum(co_matrix, axis=0)

    # PPMI = max(PMI, 0)
    # PMI = log2(P(x, y) / (P(x) * P(y)) = log2(C(x, y) * N / (C(x) * C(y)))
    # P(x) = C(x) / N
    # P(x, y) = C(x, y) / N

    # ベクトル化した計算
    ppmi_matrix = co_matrix.copy().astype(np.float32)
    ppmi_matrix *= total_words
    ppmi_matrix /= word_count * word_count.reshape(word_count.shape[0], -1) + eps
    ppmi_matrix = np.log2(ppmi_matrix + eps)
    ppmi_matrix = np.clip(ppmi_matrix, 0., None)

    return ppmi_matrix


def cosine_similarity_matrix(word_vectors: np.ndarray, eps: float = 1e-8) -> np.ndarray:
    """単語ベクトル間のコサイン類似度を計算する"""
    word_norm = np.sqrt(np.sum(word_vectors ** 2, axis=1)).reshape(word_vectors.shape[0], -1)
    normalized_word_vectors = word_vectors / (word_norm + eps)
    cs_matrix = np.dot(normalized_word_vectors, normalized_word_vectors.T)
    return cs_matrix


def most_similar_words(word_id: int, cs_matrix: np.ndarray, top_n: int = 5):
    """コサイン類似度が最も高い単語の ID を取り出す"""
    similarities = cs_matrix[word_id, :]
    similar_word_ids = np.argsort(similarities)[::-1]
    # 基本的にコサイン類似度が最も高い ID は自分自身になるはずなので先頭は取り除く
    return similar_word_ids[1: top_n + 1]


def main():
    # Penn Treebank コーパスを読み込む
    corpus_sentences = load_corpus('ptb.train.txt')

    # コーパスを単語に分割する
    corpus_words = list(sentences_to_words(corpus_sentences))

    # 単語 -> ID
    word_to_id = word_id_mappings(corpus_words)

    # コーパスの語彙数
    vocab_size = len(word_to_id.keys())

    # コーパスの単語を ID に変換する
    corpus_ids = words_to_ids(corpus_words, word_to_id)

    # 共起行列を計算する
    co_matrix = co_occurence_matrix(corpus_ids, vocab_size, window_size=2)

    # PPMI を計算する
    ppmi_matrix = positive_pointwise_mutual_information(co_matrix)

    # 特異値分解 (Singular Value Decomposition; SVD) する
    # 特異値の大きな次元を基準に先頭を取り出すことで次元圧縮する
    word_vectors, _, _ = randomized_svd(ppmi_matrix, n_components=100)

    # 単語を表すベクトル間のコサイン類似度を計算する
    cs_matrix = cosine_similarity_matrix(word_vectors)

    # ID -> 単語
    id_to_word = {value: key for key, value in word_to_id.items()}

    # いくつか似ているベクトルを持った単語を確認してみる
    example_words = ['you', 'year', 'car', 'toyota']
    for target_word in example_words:
        # ID に変換した上で最も似ている単語とそのベクトルを取り出す
        print(f'The most similar words of "{target_word}"')
        target_word_id = word_to_id[target_word]
        most_similar_word_ids = most_similar_words(target_word_id, cs_matrix)
        # 単語とベクトルを表示する
        for rank, similar_word_id in enumerate(most_similar_word_ids, start=1):
            similar_word = id_to_word[similar_word_id]
            similarity = cs_matrix[target_word_id, similar_word_id]
            print(f'TOP {rank}: {similar_word} = {similarity}')
        print('-' * 50)


if __name__ == '__main__':
    main()

上記を適当な名前のモジュールとして保存した上で実行してみよう。 サンプルコードでは「you」「year」「car」「toyota」について類似の分散表現を持つ単語を調べている。

$ python pmisvd.py      
The most similar words of "you"
TOP 1: we = 0.7093088626861572
TOP 2: i = 0.699989378452301
TOP 3: yourself = 0.5935164093971252
TOP 4: anybody = 0.5696209073066711
TOP 5: ll = 0.537631094455719
--------------------------------------------------
The most similar words of "year"
TOP 1: month = 0.727214515209198
TOP 2: earlier = 0.6935610771179199
TOP 3: period = 0.6458954215049744
TOP 4: next = 0.6107259392738342
TOP 5: fiscal = 0.5819855332374573
--------------------------------------------------
The most similar words of "car"
TOP 1: auto = 0.706072986125946
TOP 2: luxury = 0.6855229735374451
TOP 3: vehicle = 0.48040351271629333
TOP 4: cars = 0.46684157848358154
TOP 5: truck = 0.4660819172859192
--------------------------------------------------
The most similar words of "toyota"
TOP 1: motor = 0.722280740737915
TOP 2: motors = 0.6682790517807007
TOP 3: nissan = 0.6666470170021057
TOP 4: honda = 0.6519786715507507
TOP 5: lexus = 0.6209456920623779
--------------------------------------------------

見ると、意味的に近い単語が類似した分散表現を持つように学習できていることがわかる。 「toyota」の類似語に「motor」と「motors」が別々に出てきてしまっているのはステミング (Stemming) の処理が入っていないため。

サンプルコードで共起行列を作るのに使ったウィンドウサイズは 2 ということを前提にして考えたとき、次のような感想を抱いた。

  • そんなに共起するか?っていう「you」と「we」や「i」が類似した分散表現になっているのは面白い
  • 「toyota」の類似語に「nissan」や「honda」があるところには納得感がある
    • 同じ文章の中で並べられることも多いだろうし

ところで、書籍の中には共起行列で特定の単語に対応する行ベクトルもカウントベースの分散表現の一種とあった。 個人的にその認識はなかったので、読んだときには「なるほど」と感じた。

ゼロから作るDeep Learning ❷ ―自然言語処理編

ゼロから作るDeep Learning ❷ ―自然言語処理編

  • 作者:斎藤 康毅
  • 発売日: 2018/07/21
  • メディア: 単行本(ソフトカバー)

Python: REPL に複数行をペーストしたときの挙動が変わって困った件について

表題のとおりなんだけど、最近 Python の REPL に複数行のコードをペーストしたときの挙動が以前と変わってしまい困っていた。 その Python というのは、具体的には Homebrew でインストールしたものや、Pyenv を使ってソースコードからビルドしたもの。

使っている環境は次のとおり。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.15.7
BuildVersion:   19H114
$ brew info readline | head -n 1
readline: stable 8.1 (bottled) [keg-only]
$ pyenv --version
pyenv 1.2.22

もくじ

TL;DR

結論から先に述べると、これは Python のビルドに使った GNU Readline のオプションが関係している。 具体的には、バージョン 8.1 から enable-bracketed-paste というオプションのデフォルト値が off から on に変更されたらしい。

そのため、問題のワークアラウンドとしては、このオプションの値を明示的に指定すれば良い。 たとえば、以下のようなコマンドで GNU Readline の設定ファイルを作ろう。

$ echo "set enable-bracketed-paste off" >> ~/.inputrc

これで、以前と同じ挙動になる。

問題についてもうちょっと詳しく

たとえば、次のような 2 行をコピーアンドペーストしたいときを想定する。

a = 1
b = 2

以前の REPL であれば、次のように別々の行として認識された。

>>> a = 1
>>> b = 2

それが、問題のある環境では以下のように SyntaxError 例外になってしまう。

>>> a = 1
b = 2
  File "<stdin>", line 1
    a = 1
b = 2
         ^
SyntaxError: multiple statements found while compiling a single statement

複数行を一度にペーストできないのは REPL として使い勝手が良くない。

上記の問題が起こる環境には、バージョンは問わず割と最近にビルドしたもの、という共通点があった。 そのため、GNU Readline が怪しいという当たりはついていたので、そこを重点的に調べていったところ以下のページにたどり着いた。

github.com

どうやら GNU Readline のバージョン 8.1 から、enable-bracketed-paste というオプションがデフォルトで on になったらしい。 その影響を受けて、複数行のペーストが上記のような挙動に変わってしまった。

最近の端末エミュレータには、Bracketed Paste Mode というものがある。 これは、通常の文字の入力とペーストによる入力を区別できるようにすることを意図して追加された機能らしい。 そして、GNU Readline はコマンドラインの編集操作に関する機能を提供するライブラリとなっている。 これまで、GNU Readline はペースト時のデフォルトの設定として Bracketed Paste Mode を無効にしていたようだ。 それが、8.1 からデフォルトの設定が変更になって、今回の問題が生じた、ということらしい。

それはそれとして、Python の REPL が Bracketed Paste に対応するという話もあると思う。 対応していると、空行の入ったスクリプトのコードをそのまま入れてもエラーにならないという嬉しさがあるはず。 ただ、調べたところ以下のチケットはあったが現状であまり話は前に進んでいないようだ。

bugs.python.org

いじょう。