CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: TensorFlow/Keras で Word2Vec の CBOW を実装してみる

(2021-02-04 追記): ニューラルネットワークのアーキテクチャで、出力側の Embedding が誤って Dense になっていた部分を修正した。

Word2Vec の CBOW (Continuous Bag-of-Words) は、単語の分散表現 (Word Embedding) を得るために用いられるニューラルネットワークのアーキテクチャのひとつ。 今回は、それを TensorFlow/Keras を使って実装してみた。 なお、ニューラルネットワークのアーキテクチャは、オライリーの「ゼロから作るDeep Learning ❷ ――自然言語処理編」を参考にしている。

ゼロから作るDeep Learning ❷ ―自然言語処理編

ゼロから作るDeep Learning ❷ ―自然言語処理編

  • 作者:斎藤 康毅
  • 発売日: 2018/07/21
  • メディア: 単行本(ソフトカバー)

ただし、あらかじめ断っておくと実用性はそれほどない。 実用上は gensim とかを使った方が学習は速いし、得られる分散表現も良い性能になるはず。

使った環境は次のとおり。

$ sw_vers
ProductName:    macOS
ProductVersion: 11.2
BuildVersion:   20D64
$ python -V       
Python 3.8.7
$ pip list | grep "tensorflow "
tensorflow             2.4.1

もくじ

下準備

あらかじめ、必要なパッケージをインストールしておく。

$ pip install tensorflow gensim scipy

また、学習するデータセットをダウンロードしておく。 今回は PTB (Penn Treebank) をコーパスに用いる。

$ wget https://raw.githubusercontent.com/tomsercu/lstm/master/data/ptb.train.txt
$ wget https://raw.githubusercontent.com/tomsercu/lstm/master/data/ptb.valid.txt

サンプルコード

早速だけど、以下にサンプルコードを示す。 ニューラルネットワークは SimpleContinuousBagOfWords という名前で実装した。 CBOW は、ある単語の左右に出現する単語から、その単語を推定するアーキテクチャになっている。 そのため、ネットワークに供給するデータは左右に出現する 2 つの単語になる。 そして、どんな単語が出るかを One-Hot 形式の予測値として出力している。 肝心の単語埋め込みは、ネットワーク内にある Embedding レイヤーの重み (Weights) として得られる。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import re
from itertools import count
from typing import Iterable
from typing import Iterator
from functools import partial

import numpy as np  # type: ignore
import tensorflow as tf  # type: ignore
from tensorflow.keras import Model  # type: ignore
from tensorflow.keras.layers import Embedding  # type: ignore
from tensorflow import Tensor  # type: ignore
from tensorflow.data import Dataset  # type: ignore
from tensorflow.keras.losses import CategoricalCrossentropy  # type: ignore
from tensorflow.keras.optimizers import Adam  # type: ignore
from tensorflow.keras.layers import Dense  # type: ignore
from tensorflow.keras.layers import Softmax  # type: ignore
from tensorflow.keras.layers import Layer # type: ignore
from tensorflow.keras.models import Sequential  # type: ignore
from tensorflow.keras.callbacks import Callback  # type: ignore
from gensim.test.utils import datapath  # type: ignore
from scipy.stats import pearsonr  # type: ignore


class OutputEmbedding(Layer):
    """出力側の単語埋め込み層"""

    def __init__(self, vocab_size: int):
        super().__init__()
        self.vocab_size = vocab_size

    def build(self, input_shape):
        # 埋め込み次元数
        self.embedding_size = input_shape[1]
        # 出力部分に使うので、通常の Embedding とは shape が転置している
        self.embedding = self.add_weight(shape=(self.embedding_size, self.vocab_size))

    def call(self, inputs, **kwargs):
        # 重みは最初から転置してあるので transpose する必要はない
        # (NxD) * (DxV) = (NxV)
        return tf.tensordot(inputs, self.embedding, axes=1)


class SimpleContinuousBagOfWords(Model):
    """Word2Vec の CBOW モデルを実装したクラス"""

    def __init__(self, vocab_size: int, embedding_size: int):
        super().__init__()

        # 入力側の単語埋め込み層
        self.embedding_layer = Embedding(input_dim=vocab_size,
                                         input_shape=(1, ),
                                         output_dim=embedding_size,
                                         name='word_embedding',
                                         )
        self.output_layer = Sequential([
            OutputEmbedding(vocab_size),
            Softmax(),
        ])

    def call(self, inputs: Tensor) -> Tensor:
        # 推定したい単語の左側にある単語のベクトルを取り出す
        left_context = inputs[:, 0]
        left_vector = self.embedding_layer(left_context)
        # 推定したい単語の右側にある単語のベクトルを取り出す
        right_context = inputs[:, 1]
        right_vector = self.embedding_layer(right_context)
        # 両方の単語に対応するベクトルを足して 2 で割る
        mixed_vector = (left_vector + right_vector) / 2
        # 出力側の Embedding と積を取ってから単語の出現確率にする
        prediction = self.output_layer(mixed_vector)
        return prediction


class WordSimilarity353Callback(Callback):
    """WordSim353 データセットを使って単語間の類似度を評価するコールバック"""

    def __init__(self, word_id_table: dict[str, int]):
        super().__init__()

        self.word_id_table = word_id_table
        self.model = None

        # 評価用データを読み込む
        self.eval_data = []
        wordsim_filepath = datapath('wordsim353.tsv')
        with open(wordsim_filepath, mode='r') as fp:
            # 最初の 2 行はヘッダなので読み飛ばす
            fp.readline()
            fp.readline()
            for line in fp:
                word1, word2, sim_score = line.strip().split('\t')
                self.eval_data.append((word1, word2, float(sim_score)))

    def set_model(self, model):
        self.model = model

    def _cosine_similarity(self, x, y):
        nx = x / np.sqrt(np.sum(x ** 2))
        ny = y / np.sqrt(np.sum(y ** 2))
        return np.dot(nx, ny)

    def on_epoch_end(self, epoch, logs=None):
        # モデルから学習させたレイヤーの重みを取り出す
        model_layers = {layer.name: layer for layer in self.model.layers}
        embedding_layer = model_layers['word_embedding']
        word_vectors = embedding_layer.weights[0].numpy()

        # 評価用データセットに含まれる単語間の類似度を計算する
        labels = []
        preds = []
        for word1, word2, sim_score in self.eval_data:
            # Out-of-Vocabulary な単語はスキップ
            if word1 not in self.word_id_table or word2 not in self.word_id_table:
                continue

            # コサイン類似度を計算する
            word1_vec = word_vectors[self.word_id_table[word1]]
            word2_vec = word_vectors[self.word_id_table[word2]]
            pred = self._cosine_similarity(word1_vec, word2_vec)
            preds.append(pred)
            # 正解ラベル
            labels.append(sim_score)

        # ピアソンの相関係数を求める
        r_score = pearsonr(labels, preds)[0]
        print(f'Pearson\'s r score with WordSim353: {r_score}')


def load_corpus(filepath: str) -> Iterator[str]:
    """テキストファイルからコーパスを読み出す"""
    tag_replace = re.compile('<.*?>')
    with open(filepath, mode='r') as fp:
        for line in fp:
            # コーパスに含まれる <unk> や <eos> は取り除きたい
            yield re.sub(tag_replace, '', line)


def sentences_to_words(sentences: Iterable[str], lower: bool = True) -> Iterator[list[str]]:
    """文章を単語に分割する"""
    for sentence in sentences:
        if lower:
            sentence = sentence.lower()
        words = re.split('\\W+', sentence)
        yield [word for word in words if len(word) > 0]  # 空文字は取り除く


def word_id_mappings(sentences: Iterable[Iterable[str]]) -> dict[str, int]:
    """単語を ID に変換する対応テーブルを作る"""
    counter = count(start=0)

    word_to_id = {}
    for sentence in sentences:
        for word in sentence:

            if word in word_to_id:
                # 登録済みの単語
                continue

            # 単語の識別子を採番する
            word_id = next(counter)
            word_to_id[word] = word_id

    return word_to_id


def words_to_ids(sentences: Iterable[list[str]], word_to_id: dict[str, int]) -> Iterator[list[int]]:
    # 単語を対応する ID に変換する
    for words in sentences:
        # NOTE: Out-of-Vocabulary への対応がない
        yield [word_to_id[word] for word in words]


def cosine_similarity_matrix(word_vectors: np.ndarray, eps: float = 1e-8) -> np.ndarray:
    """単語ベクトル間のコサイン類似度を計算する"""
    word_norm = np.sqrt(np.sum(word_vectors ** 2, axis=1)).reshape(word_vectors.shape[0], -1)
    normalized_word_vectors = word_vectors / (word_norm + eps)
    cs_matrix = np.dot(normalized_word_vectors, normalized_word_vectors.T)
    return cs_matrix


def most_similar_words(word_id: int, cs_matrix: np.ndarray, top_n: int = 5):
    """コサイン類似度が最も高い単語の ID を取り出す"""
    similarities = cs_matrix[word_id, :]
    similar_word_ids = np.argsort(similarities)[::-1]
    # 基本的にコサイン類似度が最も高い ID は自分自身になるはずなので先頭は取り除く
    return similar_word_ids[1: top_n + 1]


def extract_contexts(word_ids: Tensor, window_size: int = 1) -> Tensor:
    """コンテキストの単語をラベル形式で得る"""
    right_contexts = word_ids[window_size + 1:]
    left_contexts = word_ids[:-window_size - 1]
    contexts = tf.transpose([left_contexts, right_contexts])
    return contexts


def extract_targets(word_ids: Tensor, vocab_size: int, window_size: int = 1) -> Tensor:
    """ターゲットの単語を One-Hot 形式にする"""
    targets = word_ids[window_size:-window_size]
    # 正解との損失を計算するために One-Hot 表現にする
    one_hot_targets = tf.one_hot(targets, depth=vocab_size)
    return one_hot_targets


def dataset_pipeline(corpus_words: Iterator[list[str]], word_id_table: dict[str, int]) -> Dataset:
    """ターゲットとコンテキストを供給するパイプライン"""

    # ID に変換したコーパスを行ごとに読み出せるデータセット
    word_ids_ds = Dataset.from_generator(lambda: words_to_ids(corpus_words, word_id_table),
                                         tf.int32,
                                         output_shapes=[None])

    # コンテキストを抽出するパイプライン
    contexts_ds = word_ids_ds.map(extract_contexts,
                                  num_parallel_calls=tf.data.AUTOTUNE,
                                  # ターゲットと対応関係を作る必要があるので決定論的に動作させる
                                  deterministic=True)

    # ターゲットを抽出するパイプライン
    vocab_size = len(word_id_table.keys())
    f = partial(extract_targets, vocab_size=vocab_size)
    targets_ds = word_ids_ds.map(f,
                                 num_parallel_calls=tf.data.AUTOTUNE,
                                 # コンテキストと対応関係を作る必要があるので決定論的に動作させる
                                 deterministic=True)

    # 行単位で処理されているので flatten して結合する
    zipped_ds = Dataset.zip((contexts_ds.unbatch(), targets_ds.unbatch()))
    return zipped_ds


def count_words(corpus_words: Iterator[list[str]]) -> int:
    """コーパスに含まれる単語の数をカウントする"""
    return sum(len(words) for words in corpus_words)


def main():
    # Penn Treebank コーパスを読み込む
    train_sentences = load_corpus('ptb.train.txt')
    valid_sentences = load_corpus('ptb.valid.txt')

    # コーパスを単語に分割する
    train_corpus_words = list(sentences_to_words(train_sentences))
    valid_corpus_words = list(sentences_to_words(valid_sentences))

    # 単語 -> ID
    word_to_id = word_id_mappings(train_corpus_words)
    # コーパスの語彙数
    vocab_size = len(word_to_id.keys())

    # モデルとタスクを定義する
    EMBEDDING_SIZE = 50
    criterion = CategoricalCrossentropy()
    optimizer = Adam(learning_rate=1e-2)
    model = SimpleContinuousBagOfWords(vocab_size, EMBEDDING_SIZE)
    model.compile(optimizer=optimizer,
                  loss=criterion)

    # データセットを準備する
    TRAIN_BATCH_SIZE = 1024
    train_ds = dataset_pipeline(train_corpus_words, word_to_id)
    train_ds = train_ds.shuffle(buffer_size=512)
    train_ds = train_ds.repeat()
    train_ds = train_ds.batch(TRAIN_BATCH_SIZE)
    train_ds = train_ds.prefetch(buffer_size=tf.data.AUTOTUNE)
    # 検証用データはリピートしない
    VALID_BATCH_SIZE = 1024
    valid_ds = dataset_pipeline(valid_corpus_words, word_to_id)
    valid_ds = valid_ds.batch(VALID_BATCH_SIZE)
    valid_ds = valid_ds.prefetch(buffer_size=tf.data.AUTOTUNE)

    # 厳密に要素数を計算するのは面倒なので、ざっくり単語数の 2 倍をエポックということにしておく
    num_of_train_samples = count_words(train_corpus_words) * 2

    callbacks = [
        # WordSim353 データセットを使って単語間の類似度を相関係数で確認する
        WordSimilarity353Callback(word_to_id),
    ]
    # 学習する
    model.fit(train_ds,
              steps_per_epoch=num_of_train_samples // TRAIN_BATCH_SIZE,
              validation_data=valid_ds,
              epochs=5,
              callbacks=callbacks,
              verbose=1,
              )

    # モデルから学習させたレイヤーの重みを取り出す
    model_layers = {layer.name: layer for layer in model.layers}
    embedding_layer = model_layers['word_embedding']
    word_vectors = embedding_layer.weights[0].numpy()

    # 単語を表すベクトル間のコサイン類似度を計算する
    cs_matrix = cosine_similarity_matrix(word_vectors)
    # ID -> 単語
    id_to_word = {value: key for key, value in word_to_id.items()}

    # いくつか似ているベクトルを持った単語を確認してみる
    example_words = ['you', 'year', 'car', 'toyota']
    for target_word in example_words:
        # ID に変換した上で最も似ている単語とそのベクトルを取り出す
        print(f'The most similar words of "{target_word}"')
        target_word_id = word_to_id[target_word]
        most_similar_word_ids = most_similar_words(target_word_id, cs_matrix)
        # 単語とベクトルを表示する
        for rank, similar_word_id in enumerate(most_similar_word_ids, start=1):
            similar_word = id_to_word[similar_word_id]
            similarity = cs_matrix[target_word_id, similar_word_id]
            print(f'TOP {rank}: {similar_word} = {similarity}')
        print('-' * 50)


if __name__ == '__main__':
    main()

上記に名前をつけて保存したら実行してみよう。 CPU を使って学習すると、1 エポックにそれなりの時間がかかるけど、まあなんとかなるレベルのはず。 エポック間には、WordSim353 データセットを使って評価した、単語間の類似度がピアソンの相関係数として表示される。 学習が終わったら、いくつかの単語について、類似したベクトルを持つ単語を上位 5 件について表示させている。

$ python cbow.py

...

1651/1651 [==============================] - 181s 109ms/step - loss: 6.1526 - val_loss: 5.1804
Pearson's r score with WordSim353: 0.21794273571470427
Epoch 2/5
1651/1651 [==============================] - 182s 110ms/step - loss: 4.7424 - val_loss: 5.2353
Pearson's r score with WordSim353: 0.2428972583779604
Epoch 3/5
1651/1651 [==============================] - 172s 104ms/step - loss: 4.4591 - val_loss: 5.3129
Pearson's r score with WordSim353: 0.2583752228928214
Epoch 4/5
1651/1651 [==============================] - 160s 97ms/step - loss: 4.3708 - val_loss: 5.3941
Pearson's r score with WordSim353: 0.25454681209868246
Epoch 5/5
1651/1651 [==============================] - 4038s 2s/step - loss: 4.3027 - val_loss: 5.4816
Pearson's r score with WordSim353: 0.25160892813703517
The most similar words of "you"
TOP 1: we = 0.8296732306480408
TOP 2: they = 0.7543421983718872
TOP 3: i = 0.668086051940918
TOP 4: she = 0.5567612051963806
TOP 5: imbalances = 0.5095677375793457
--------------------------------------------------
The most similar words of "year"
TOP 1: week = 0.7451192140579224
TOP 2: month = 0.7027692794799805
TOP 3: day = 0.6059724688529968
TOP 4: spring = 0.5649460554122925
TOP 5: decade = 0.5465914011001587
--------------------------------------------------
The most similar words of "car"
TOP 1: seed = 0.6106906533241272
TOP 2: taxi = 0.6024419665336609
TOP 3: auto = 0.574679434299469
TOP 4: siemens = 0.5651793479919434
TOP 5: subscriber = 0.5451180934906006
--------------------------------------------------
The most similar words of "toyota"
TOP 1: ford = 0.6328699588775635
TOP 2: celebrity = 0.5823256373405457
TOP 3: humana = 0.5597572922706604
TOP 4: supermarkets = 0.5554667115211487
TOP 5: honda = 0.5467281937599182
--------------------------------------------------

上記をみると、学習データの損失はエポックを重ねるごとに小さくなっているが、検証データの損失は 2 エポック目以降から悪化している。 これは一般的には過学習している状態だが、WordSim353 を使った単語間類似度の評価は 2 エポック目も向上している。 また、ベストなエポックの結果ではないものの、最後に出力している類似の単語については、そこそこ納得感がある結果に思える。

ゼロから作るDeep Learning ❷ ―自然言語処理編

ゼロから作るDeep Learning ❷ ―自然言語処理編

  • 作者:斎藤 康毅
  • 発売日: 2018/07/21
  • メディア: 単行本(ソフトカバー)