CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: 正の相互情報量 (PPMI) と特異値分解 (SVD) を使った単語の分散表現

(2021-02-02 追記): 共起行列の計算を NumPy の Integer array indexing を使った実装にした

オライリーの「ゼロから作るDeep Learning ❷ ――自然言語処理編」を読んでいる。 この中に、カウントベースで計算する初歩的な単語の分散表現が紹介されていて、なかなか面白かった。 単語の分散表現を得る方法は色々とあるけど、カウントベースは単語の出現数を元にした比較的シンプルなもの。 今回は、正の相互情報量 (Positive Pointwise Mutual Information; PPMI) と特異値分解 (Singular Value Decomposition; SVD) を使って単語の分散表現を獲得する例を実装してみた。 書籍に記載されている内容を参考にしつつも、自分なりのコードにしてある。 オリジナルとの差分で大きいのは、正の相互情報量と単語間のコサイン類似度を計算する部分を NumPy のベクトル演算にしたところかな。

使った環境は次のとおり。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.15.7
BuildVersion:   19H114
$ python -V               
Python 3.8.7

もくじ

下準備

あらかじめ、必要なパッケージをインストールしておく。

$ pip install scikit-learn

コーパスは、前述の書籍の中で用いられていた「Penn Treebank (PTB)」を使う。 こちらも、あらかじめダウンロードしておこう。

$ wget https://raw.githubusercontent.com/tomsercu/lstm/master/data/ptb.train.txt

サンプルコード

以下に実装したサンプルコードを示す。 コメントもつけてあるけど、サンプルコードでは次のような流れで処理している。

  • PTB コーパスを読み込む
  • 単語を ID に変換する
    • 語彙 (Vocabulary) を構築する
  • 共起行列を計算する
    • 文章の中で、ある単語と別の単語が共に用いられた回数を表している
      • つまり文の中で近い位置に登場した、ということ
  • 正の相互情報量を計算する
    • ある単語と別の単語が共に用いられる度合いを表している
  • 特異値分解して単語の分散表現を得る
    • 相互情報量を複数の行列に分解すると共に次元圧縮をかける
  • いくつかの単語について類似の分散表現を持った単語を調べる
    • 分散表現の近さをコサイン類似度で測る
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import re
from itertools import count
from typing import Iterable
from typing import Iterator

import numpy as np  # type: ignore
from sklearn.utils.extmath import randomized_svd  # type: ignore


def load_corpus(filepath: str) -> Iterator[str]:
    """テキストファイルからコーパスを読み出す"""
    tag_replace = re.compile('<.*?>')
    with open(filepath, mode='r') as fp:
        for line in fp:
            # コーパスに含まれる <unk> や <eos> は取り除きたい
            yield re.sub(tag_replace, '', line)


def sentences_to_words(sentences: Iterable[str], lower: bool = True) -> Iterator[list[str]]:
    """文章を単語に分割する"""
    for sentence in sentences:
        if lower:
            sentence = sentence.lower()
        words = re.split('\\W+', sentence)
        yield [word for word in words if len(word) > 0]  # 空文字は取り除く


def word_id_mappings(sentences: Iterable[Iterable[str]]) -> dict[str, int]:
    """単語を ID に変換する対応テーブルを作る"""
    counter = count(start=0)

    word_to_id = {}
    for sentence in sentences:
        for word in sentence:

            if word in word_to_id:
                # 登録済みの単語
                continue

            # 単語の識別子を採番する
            word_id = next(counter)
            word_to_id[word] = word_id

    return word_to_id


def words_to_ids(sentences: Iterable[list[str]], word_to_id: dict[str, int]) -> Iterator[list[int]]:
    # 単語を対応する ID に変換する
    for words in sentences:
        # NOTE: Out-of-Vocabulary への対応がない
        yield [word_to_id[word] for word in words]


def co_occurence_matrix(corpus_ids: Iterable[list[int]], vocab_size: int, window_size: int) -> np.ndarray:
    """共起行列 (Co-occurence Matrix) を計算する"""
    co_matrix = np.zeros((vocab_size, vocab_size), dtype=np.int32)

    for word_ids in corpus_ids:
        for context_len in range(1, window_size + 1):
            # 元の ID 列と、コンテキスト分をずらした ID 列を用意する
            base = word_ids[context_len:]
            shifted = word_ids[:-context_len]
            # 元の列とずらした列の対応を取れば、共起した単語を Integer array indexing で処理できる
            co_matrix[base, shifted] += 1
            co_matrix[shifted, base] += 1

    return co_matrix


def positive_pointwise_mutual_information(co_matrix: np.ndarray, eps: float = 1e-8):
    # 正の相互情報量 (Positive Pointwise Mutual Information) を計算する
    total_words = np.sum(co_matrix)
    word_count = np.sum(co_matrix, axis=0)

    # PPMI = max(PMI, 0)
    # PMI = log2(P(x, y) / (P(x) * P(y)) = log2(C(x, y) * N / (C(x) * C(y)))
    # P(x) = C(x) / N
    # P(x, y) = C(x, y) / N

    # ベクトル化した計算
    ppmi_matrix = co_matrix.copy().astype(np.float32)
    ppmi_matrix *= total_words
    ppmi_matrix /= word_count * word_count.reshape(word_count.shape[0], -1) + eps
    ppmi_matrix = np.log2(ppmi_matrix + eps)
    ppmi_matrix = np.clip(ppmi_matrix, 0., None)

    return ppmi_matrix


def cosine_similarity_matrix(word_vectors: np.ndarray, eps: float = 1e-8) -> np.ndarray:
    """単語ベクトル間のコサイン類似度を計算する"""
    word_norm = np.sqrt(np.sum(word_vectors ** 2, axis=1)).reshape(word_vectors.shape[0], -1)
    normalized_word_vectors = word_vectors / (word_norm + eps)
    cs_matrix = np.dot(normalized_word_vectors, normalized_word_vectors.T)
    return cs_matrix


def most_similar_words(word_id: int, cs_matrix: np.ndarray, top_n: int = 5):
    """コサイン類似度が最も高い単語の ID を取り出す"""
    similarities = cs_matrix[word_id, :]
    similar_word_ids = np.argsort(similarities)[::-1]
    # 基本的にコサイン類似度が最も高い ID は自分自身になるはずなので先頭は取り除く
    return similar_word_ids[1: top_n + 1]


def main():
    # Penn Treebank コーパスを読み込む
    corpus_sentences = load_corpus('ptb.train.txt')

    # コーパスを単語に分割する
    corpus_words = list(sentences_to_words(corpus_sentences))

    # 単語 -> ID
    word_to_id = word_id_mappings(corpus_words)

    # コーパスの語彙数
    vocab_size = len(word_to_id.keys())

    # コーパスの単語を ID に変換する
    corpus_ids = words_to_ids(corpus_words, word_to_id)

    # 共起行列を計算する
    co_matrix = co_occurence_matrix(corpus_ids, vocab_size, window_size=2)

    # PPMI を計算する
    ppmi_matrix = positive_pointwise_mutual_information(co_matrix)

    # 特異値分解 (Singular Value Decomposition; SVD) する
    # 特異値の大きな次元を基準に先頭を取り出すことで次元圧縮する
    word_vectors, _, _ = randomized_svd(ppmi_matrix, n_components=100)

    # 単語を表すベクトル間のコサイン類似度を計算する
    cs_matrix = cosine_similarity_matrix(word_vectors)

    # ID -> 単語
    id_to_word = {value: key for key, value in word_to_id.items()}

    # いくつか似ているベクトルを持った単語を確認してみる
    example_words = ['you', 'year', 'car', 'toyota']
    for target_word in example_words:
        # ID に変換した上で最も似ている単語とそのベクトルを取り出す
        print(f'The most similar words of "{target_word}"')
        target_word_id = word_to_id[target_word]
        most_similar_word_ids = most_similar_words(target_word_id, cs_matrix)
        # 単語とベクトルを表示する
        for rank, similar_word_id in enumerate(most_similar_word_ids, start=1):
            similar_word = id_to_word[similar_word_id]
            similarity = cs_matrix[target_word_id, similar_word_id]
            print(f'TOP {rank}: {similar_word} = {similarity}')
        print('-' * 50)


if __name__ == '__main__':
    main()

上記を適当な名前のモジュールとして保存した上で実行してみよう。 サンプルコードでは「you」「year」「car」「toyota」について類似の分散表現を持つ単語を調べている。

$ python pmisvd.py      
The most similar words of "you"
TOP 1: we = 0.7093088626861572
TOP 2: i = 0.699989378452301
TOP 3: yourself = 0.5935164093971252
TOP 4: anybody = 0.5696209073066711
TOP 5: ll = 0.537631094455719
--------------------------------------------------
The most similar words of "year"
TOP 1: month = 0.727214515209198
TOP 2: earlier = 0.6935610771179199
TOP 3: period = 0.6458954215049744
TOP 4: next = 0.6107259392738342
TOP 5: fiscal = 0.5819855332374573
--------------------------------------------------
The most similar words of "car"
TOP 1: auto = 0.706072986125946
TOP 2: luxury = 0.6855229735374451
TOP 3: vehicle = 0.48040351271629333
TOP 4: cars = 0.46684157848358154
TOP 5: truck = 0.4660819172859192
--------------------------------------------------
The most similar words of "toyota"
TOP 1: motor = 0.722280740737915
TOP 2: motors = 0.6682790517807007
TOP 3: nissan = 0.6666470170021057
TOP 4: honda = 0.6519786715507507
TOP 5: lexus = 0.6209456920623779
--------------------------------------------------

見ると、意味的に近い単語が類似した分散表現を持つように学習できていることがわかる。 「toyota」の類似語に「motor」と「motors」が別々に出てきてしまっているのはステミング (Stemming) の処理が入っていないため。

サンプルコードで共起行列を作るのに使ったウィンドウサイズは 2 ということを前提にして考えたとき、次のような感想を抱いた。

  • そんなに共起するか?っていう「you」と「we」や「i」が類似した分散表現になっているのは面白い
  • 「toyota」の類似語に「nissan」や「honda」があるところには納得感がある
    • 同じ文章の中で並べられることも多いだろうし

ところで、書籍の中には共起行列で特定の単語に対応する行ベクトルもカウントベースの分散表現の一種とあった。 個人的にその認識はなかったので、読んだときには「なるほど」と感じた。

ゼロから作るDeep Learning ❷ ―自然言語処理編

ゼロから作るDeep Learning ❷ ―自然言語処理編

  • 作者:斎藤 康毅
  • 発売日: 2018/07/21
  • メディア: 単行本(ソフトカバー)