(2021-02-04 追記): ニューラルネットワークのアーキテクチャで、出力側の Embedding が誤って Dense になっていた部分を修正した。
Word2Vec の CBOW (Continuous Bag-of-Words) は、単語の分散表現 (Word Embedding) を得るために用いられるニューラルネットワークのアーキテクチャのひとつ。 今回は、それを TensorFlow/Keras を使って実装してみた。 なお、ニューラルネットワークのアーキテクチャは、オライリーの「ゼロから作るDeep Learning ❷ ――自然言語処理編」を参考にしている。
ゼロから作るDeep Learning ❷ ―自然言語処理編
- 作者:斎藤 康毅
- 発売日: 2018/07/21
- メディア: 単行本(ソフトカバー)
ただし、あらかじめ断っておくと実用性はそれほどない。 実用上は gensim とかを使った方が学習は速いし、得られる分散表現も良い性能になるはず。
使った環境は次のとおり。
$ sw_vers ProductName: macOS ProductVersion: 11.2 BuildVersion: 20D64 $ python -V Python 3.8.7 $ pip list | grep "tensorflow " tensorflow 2.4.1
もくじ
下準備
あらかじめ、必要なパッケージをインストールしておく。
$ pip install tensorflow gensim scipy
また、学習するデータセットをダウンロードしておく。 今回は PTB (Penn Treebank) をコーパスに用いる。
$ wget https://raw.githubusercontent.com/tomsercu/lstm/master/data/ptb.train.txt $ wget https://raw.githubusercontent.com/tomsercu/lstm/master/data/ptb.valid.txt
サンプルコード
早速だけど、以下にサンプルコードを示す。
ニューラルネットワークは SimpleContinuousBagOfWords
という名前で実装した。
CBOW は、ある単語の左右に出現する単語から、その単語を推定するアーキテクチャになっている。
そのため、ネットワークに供給するデータは左右に出現する 2 つの単語になる。
そして、どんな単語が出るかを One-Hot 形式の予測値として出力している。
肝心の単語埋め込みは、ネットワーク内にある Embedding レイヤーの重み (Weights) として得られる。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import re from itertools import count from typing import Iterable from typing import Iterator from functools import partial import numpy as np # type: ignore import tensorflow as tf # type: ignore from tensorflow.keras import Model # type: ignore from tensorflow.keras.layers import Embedding # type: ignore from tensorflow import Tensor # type: ignore from tensorflow.data import Dataset # type: ignore from tensorflow.keras.losses import CategoricalCrossentropy # type: ignore from tensorflow.keras.optimizers import Adam # type: ignore from tensorflow.keras.layers import Dense # type: ignore from tensorflow.keras.layers import Softmax # type: ignore from tensorflow.keras.layers import Layer # type: ignore from tensorflow.keras.models import Sequential # type: ignore from tensorflow.keras.callbacks import Callback # type: ignore from gensim.test.utils import datapath # type: ignore from scipy.stats import pearsonr # type: ignore class OutputEmbedding(Layer): """出力側の単語埋め込み層""" def __init__(self, vocab_size: int): super().__init__() self.vocab_size = vocab_size def build(self, input_shape): # 埋め込み次元数 self.embedding_size = input_shape[1] # 出力部分に使うので、通常の Embedding とは shape が転置している self.embedding = self.add_weight(shape=(self.embedding_size, self.vocab_size)) def call(self, inputs, **kwargs): # 重みは最初から転置してあるので transpose する必要はない # (NxD) * (DxV) = (NxV) return tf.tensordot(inputs, self.embedding, axes=1) class SimpleContinuousBagOfWords(Model): """Word2Vec の CBOW モデルを実装したクラス""" def __init__(self, vocab_size: int, embedding_size: int): super().__init__() # 入力側の単語埋め込み層 self.embedding_layer = Embedding(input_dim=vocab_size, input_shape=(1, ), output_dim=embedding_size, name='word_embedding', ) self.output_layer = Sequential([ OutputEmbedding(vocab_size), Softmax(), ]) def call(self, inputs: Tensor) -> Tensor: # 推定したい単語の左側にある単語のベクトルを取り出す left_context = inputs[:, 0] left_vector = self.embedding_layer(left_context) # 推定したい単語の右側にある単語のベクトルを取り出す right_context = inputs[:, 1] right_vector = self.embedding_layer(right_context) # 両方の単語に対応するベクトルを足して 2 で割る mixed_vector = (left_vector + right_vector) / 2 # 出力側の Embedding と積を取ってから単語の出現確率にする prediction = self.output_layer(mixed_vector) return prediction class WordSimilarity353Callback(Callback): """WordSim353 データセットを使って単語間の類似度を評価するコールバック""" def __init__(self, word_id_table: dict[str, int]): super().__init__() self.word_id_table = word_id_table self.model = None # 評価用データを読み込む self.eval_data = [] wordsim_filepath = datapath('wordsim353.tsv') with open(wordsim_filepath, mode='r') as fp: # 最初の 2 行はヘッダなので読み飛ばす fp.readline() fp.readline() for line in fp: word1, word2, sim_score = line.strip().split('\t') self.eval_data.append((word1, word2, float(sim_score))) def set_model(self, model): self.model = model def _cosine_similarity(self, x, y): nx = x / np.sqrt(np.sum(x ** 2)) ny = y / np.sqrt(np.sum(y ** 2)) return np.dot(nx, ny) def on_epoch_end(self, epoch, logs=None): # モデルから学習させたレイヤーの重みを取り出す model_layers = {layer.name: layer for layer in self.model.layers} embedding_layer = model_layers['word_embedding'] word_vectors = embedding_layer.weights[0].numpy() # 評価用データセットに含まれる単語間の類似度を計算する labels = [] preds = [] for word1, word2, sim_score in self.eval_data: # Out-of-Vocabulary な単語はスキップ if word1 not in self.word_id_table or word2 not in self.word_id_table: continue # コサイン類似度を計算する word1_vec = word_vectors[self.word_id_table[word1]] word2_vec = word_vectors[self.word_id_table[word2]] pred = self._cosine_similarity(word1_vec, word2_vec) preds.append(pred) # 正解ラベル labels.append(sim_score) # ピアソンの相関係数を求める r_score = pearsonr(labels, preds)[0] print(f'Pearson\'s r score with WordSim353: {r_score}') def load_corpus(filepath: str) -> Iterator[str]: """テキストファイルからコーパスを読み出す""" tag_replace = re.compile('<.*?>') with open(filepath, mode='r') as fp: for line in fp: # コーパスに含まれる <unk> や <eos> は取り除きたい yield re.sub(tag_replace, '', line) def sentences_to_words(sentences: Iterable[str], lower: bool = True) -> Iterator[list[str]]: """文章を単語に分割する""" for sentence in sentences: if lower: sentence = sentence.lower() words = re.split('\\W+', sentence) yield [word for word in words if len(word) > 0] # 空文字は取り除く def word_id_mappings(sentences: Iterable[Iterable[str]]) -> dict[str, int]: """単語を ID に変換する対応テーブルを作る""" counter = count(start=0) word_to_id = {} for sentence in sentences: for word in sentence: if word in word_to_id: # 登録済みの単語 continue # 単語の識別子を採番する word_id = next(counter) word_to_id[word] = word_id return word_to_id def words_to_ids(sentences: Iterable[list[str]], word_to_id: dict[str, int]) -> Iterator[list[int]]: # 単語を対応する ID に変換する for words in sentences: # NOTE: Out-of-Vocabulary への対応がない yield [word_to_id[word] for word in words] def cosine_similarity_matrix(word_vectors: np.ndarray, eps: float = 1e-8) -> np.ndarray: """単語ベクトル間のコサイン類似度を計算する""" word_norm = np.sqrt(np.sum(word_vectors ** 2, axis=1)).reshape(word_vectors.shape[0], -1) normalized_word_vectors = word_vectors / (word_norm + eps) cs_matrix = np.dot(normalized_word_vectors, normalized_word_vectors.T) return cs_matrix def most_similar_words(word_id: int, cs_matrix: np.ndarray, top_n: int = 5): """コサイン類似度が最も高い単語の ID を取り出す""" similarities = cs_matrix[word_id, :] similar_word_ids = np.argsort(similarities)[::-1] # 基本的にコサイン類似度が最も高い ID は自分自身になるはずなので先頭は取り除く return similar_word_ids[1: top_n + 1] def extract_contexts(word_ids: Tensor, window_size: int = 1) -> Tensor: """コンテキストの単語をラベル形式で得る""" right_contexts = word_ids[window_size + 1:] left_contexts = word_ids[:-window_size - 1] contexts = tf.transpose([left_contexts, right_contexts]) return contexts def extract_targets(word_ids: Tensor, vocab_size: int, window_size: int = 1) -> Tensor: """ターゲットの単語を One-Hot 形式にする""" targets = word_ids[window_size:-window_size] # 正解との損失を計算するために One-Hot 表現にする one_hot_targets = tf.one_hot(targets, depth=vocab_size) return one_hot_targets def dataset_pipeline(corpus_words: Iterator[list[str]], word_id_table: dict[str, int]) -> Dataset: """ターゲットとコンテキストを供給するパイプライン""" # ID に変換したコーパスを行ごとに読み出せるデータセット word_ids_ds = Dataset.from_generator(lambda: words_to_ids(corpus_words, word_id_table), tf.int32, output_shapes=[None]) # コンテキストを抽出するパイプライン contexts_ds = word_ids_ds.map(extract_contexts, num_parallel_calls=tf.data.AUTOTUNE, # ターゲットと対応関係を作る必要があるので決定論的に動作させる deterministic=True) # ターゲットを抽出するパイプライン vocab_size = len(word_id_table.keys()) f = partial(extract_targets, vocab_size=vocab_size) targets_ds = word_ids_ds.map(f, num_parallel_calls=tf.data.AUTOTUNE, # コンテキストと対応関係を作る必要があるので決定論的に動作させる deterministic=True) # 行単位で処理されているので flatten して結合する zipped_ds = Dataset.zip((contexts_ds.unbatch(), targets_ds.unbatch())) return zipped_ds def count_words(corpus_words: Iterator[list[str]]) -> int: """コーパスに含まれる単語の数をカウントする""" return sum(len(words) for words in corpus_words) def main(): # Penn Treebank コーパスを読み込む train_sentences = load_corpus('ptb.train.txt') valid_sentences = load_corpus('ptb.valid.txt') # コーパスを単語に分割する train_corpus_words = list(sentences_to_words(train_sentences)) valid_corpus_words = list(sentences_to_words(valid_sentences)) # 単語 -> ID word_to_id = word_id_mappings(train_corpus_words) # コーパスの語彙数 vocab_size = len(word_to_id.keys()) # モデルとタスクを定義する EMBEDDING_SIZE = 50 criterion = CategoricalCrossentropy() optimizer = Adam(learning_rate=1e-2) model = SimpleContinuousBagOfWords(vocab_size, EMBEDDING_SIZE) model.compile(optimizer=optimizer, loss=criterion) # データセットを準備する TRAIN_BATCH_SIZE = 1024 train_ds = dataset_pipeline(train_corpus_words, word_to_id) train_ds = train_ds.shuffle(buffer_size=512) train_ds = train_ds.repeat() train_ds = train_ds.batch(TRAIN_BATCH_SIZE) train_ds = train_ds.prefetch(buffer_size=tf.data.AUTOTUNE) # 検証用データはリピートしない VALID_BATCH_SIZE = 1024 valid_ds = dataset_pipeline(valid_corpus_words, word_to_id) valid_ds = valid_ds.batch(VALID_BATCH_SIZE) valid_ds = valid_ds.prefetch(buffer_size=tf.data.AUTOTUNE) # 厳密に要素数を計算するのは面倒なので、ざっくり単語数の 2 倍をエポックということにしておく num_of_train_samples = count_words(train_corpus_words) * 2 callbacks = [ # WordSim353 データセットを使って単語間の類似度を相関係数で確認する WordSimilarity353Callback(word_to_id), ] # 学習する model.fit(train_ds, steps_per_epoch=num_of_train_samples // TRAIN_BATCH_SIZE, validation_data=valid_ds, epochs=5, callbacks=callbacks, verbose=1, ) # モデルから学習させたレイヤーの重みを取り出す model_layers = {layer.name: layer for layer in model.layers} embedding_layer = model_layers['word_embedding'] word_vectors = embedding_layer.weights[0].numpy() # 単語を表すベクトル間のコサイン類似度を計算する cs_matrix = cosine_similarity_matrix(word_vectors) # ID -> 単語 id_to_word = {value: key for key, value in word_to_id.items()} # いくつか似ているベクトルを持った単語を確認してみる example_words = ['you', 'year', 'car', 'toyota'] for target_word in example_words: # ID に変換した上で最も似ている単語とそのベクトルを取り出す print(f'The most similar words of "{target_word}"') target_word_id = word_to_id[target_word] most_similar_word_ids = most_similar_words(target_word_id, cs_matrix) # 単語とベクトルを表示する for rank, similar_word_id in enumerate(most_similar_word_ids, start=1): similar_word = id_to_word[similar_word_id] similarity = cs_matrix[target_word_id, similar_word_id] print(f'TOP {rank}: {similar_word} = {similarity}') print('-' * 50) if __name__ == '__main__': main()
上記に名前をつけて保存したら実行してみよう。 CPU を使って学習すると、1 エポックにそれなりの時間がかかるけど、まあなんとかなるレベルのはず。 エポック間には、WordSim353 データセットを使って評価した、単語間の類似度がピアソンの相関係数として表示される。 学習が終わったら、いくつかの単語について、類似したベクトルを持つ単語を上位 5 件について表示させている。
$ python cbow.py ... 1651/1651 [==============================] - 181s 109ms/step - loss: 6.1526 - val_loss: 5.1804 Pearson's r score with WordSim353: 0.21794273571470427 Epoch 2/5 1651/1651 [==============================] - 182s 110ms/step - loss: 4.7424 - val_loss: 5.2353 Pearson's r score with WordSim353: 0.2428972583779604 Epoch 3/5 1651/1651 [==============================] - 172s 104ms/step - loss: 4.4591 - val_loss: 5.3129 Pearson's r score with WordSim353: 0.2583752228928214 Epoch 4/5 1651/1651 [==============================] - 160s 97ms/step - loss: 4.3708 - val_loss: 5.3941 Pearson's r score with WordSim353: 0.25454681209868246 Epoch 5/5 1651/1651 [==============================] - 4038s 2s/step - loss: 4.3027 - val_loss: 5.4816 Pearson's r score with WordSim353: 0.25160892813703517 The most similar words of "you" TOP 1: we = 0.8296732306480408 TOP 2: they = 0.7543421983718872 TOP 3: i = 0.668086051940918 TOP 4: she = 0.5567612051963806 TOP 5: imbalances = 0.5095677375793457 -------------------------------------------------- The most similar words of "year" TOP 1: week = 0.7451192140579224 TOP 2: month = 0.7027692794799805 TOP 3: day = 0.6059724688529968 TOP 4: spring = 0.5649460554122925 TOP 5: decade = 0.5465914011001587 -------------------------------------------------- The most similar words of "car" TOP 1: seed = 0.6106906533241272 TOP 2: taxi = 0.6024419665336609 TOP 3: auto = 0.574679434299469 TOP 4: siemens = 0.5651793479919434 TOP 5: subscriber = 0.5451180934906006 -------------------------------------------------- The most similar words of "toyota" TOP 1: ford = 0.6328699588775635 TOP 2: celebrity = 0.5823256373405457 TOP 3: humana = 0.5597572922706604 TOP 4: supermarkets = 0.5554667115211487 TOP 5: honda = 0.5467281937599182 --------------------------------------------------
上記をみると、学習データの損失はエポックを重ねるごとに小さくなっているが、検証データの損失は 2 エポック目以降から悪化している。 これは一般的には過学習している状態だが、WordSim353 を使った単語間類似度の評価は 2 エポック目も向上している。 また、ベストなエポックの結果ではないものの、最後に出力している類似の単語については、そこそこ納得感がある結果に思える。
ゼロから作るDeep Learning ❷ ―自然言語処理編
- 作者:斎藤 康毅
- 発売日: 2018/07/21
- メディア: 単行本(ソフトカバー)
- 作者:もみじあめ
- 発売日: 2020/02/29
- メディア: Kindle版