人間の思考を彩るのは言語です。言語自体の表現力が乏しいと思考は制限されてしまいます。
つまり、多様な言語から情報をインプットすることで、思考が豊かになります。ただ、一つの言語を学ぶのには多大なリソースを使います。それならば、機械に翻訳してもらおうということで、昨今は翻訳システムの実用化が進んでいます。今後さらに、翻訳サービスが増えていくいことは間違いないでしょう。そこで、今回は翻訳モデルのベースとなっているAttention付きSeq2Seqについて学びます。
今回実行するAttention付きSes2Seqは、PyTorchのチュートリアルにもあります。以下のチュートリアルのまま実行すればよいですが、モデルの内部でどのような処理が行われているか理解できなかったので、このブログで自分なりにまとめました。
Language Translation with TorchText — PyTorch Tutorials 1.7.1 documentation
モデルを構築するために作ったファイルはここ(Git hub)に保存してあります。
翻訳データを取得
翻訳モデルを作るためには、異なる言語の1対1のデータが無ければいけません。torchtextでは、翻訳モデルを構築するためのデータが予め用意されており、簡単に使うことができます。それでは、このデータの使い方に関して見ていきましょう。
まず必要なモジュールを先にインポートします。
必要なモジュールのインポート
from pathlib import Path import pandas as pd import numpy as np import random from typing import Tuple import math import time import torch import torch.nn as nn import torch.optim as optim import torch.nn.functional as F from torch import Tensor from torchtext.datasets import Multi30k from torchtext.data import Field, BucketIterator
torchtextの翻訳データを取得
翻訳データを取得しますが、言語を使った予測モデルを構築するのは大変だと感じる方が多いと思います。そのように感じる理由として、言語データを使ってモデルを学習させるために行う工程が多く、やり方が多様だからだと思います。それをなるべく簡易にするために作られたのがtorchtextです。torchtextモジュールにはFieldクラスがあり、Fieldクラスでは読み込んだデータに施す前処理とその前処理の結果を管理してくれます。これにより一つのクラスに前処理関数とデータが収まるため、とっちらかなくなります。
今回は翻訳データとして、spaCyでトークンに分割されたデータを使います。そのため、まずはspaCyをインストールし、spaCyの言語データをインストールします。spaCyはpipを使って簡単に入れられます。
pip install -U spacy
次に、spaCyでトークン化された翻訳データの英語とドイツ語をインストールします。
python -m spacy download en python -m spacy download de
インストールが終われば、翻訳データが入ったインスタンスを作っていきますが、その前に文章を単語に分割するためのFieldインスタンスを先に定義します。下記のコードでは、SRCインスタンスが入力、TRGインスタンスが出力データになります。下記を見て分かる通り、今回作成するのはドイツ語から英語に翻訳するモデルです。
SRC = Field( tokenize = "spacy", tokenizer_language="de", init_token = '<sos>', eos_token = '<eos>', lower = True ) TRG = Field( tokenize = "spacy", tokenizer_language="en", init_token = '<sos>', eos_token = '<eos>', lower = True )
それでは、翻訳データを取得します。Multi30kのデータは翻訳機械学習用のデータセットであり、指定した言語のデータを取得することができます。下記では、ドイツ語と英語を指定し、fileds引数に上記で指定した文章を分割して小文字に変換するインスタンスを代入しています。
train_data, valid_data, test_data = Multi30k.splits( exts = ('.de', '.en'), fields = (SRC, TRG) )
取得したデータの確認
取得したデータには、「Fieldクラス」と文章を格納する「Example」クラスのインスタンスが入っています。「Field」クラスは先程定義したものです。
train_data.fields
入力データと出力データの両方のfiledが格納されていることがわかります。次に、文章データを確認します。取得したデータには、Exampleクラスがexamplesにリストとして格納されており、Example一つ一つに入力・出力用の単語に分割された文章が入っています。
examples = train_data.examples example = examples[0] src = example.src trg = example.trg print(src) print(trg)
単語のインデックス辞書の作成
文章を単語に適切に分割したデータを習得することができましたが、このままでは学習データとして扱えません。学習データとして扱うためには、数値ベクトルに変換しなければいけず(Embedding)、そのためには各単語をインデックスに変換する必要があります。各単語をインデックスに変換するために、単語とインデックスが対応した辞書を作ります。以下のコマンドにより、train_dataのsrc、trgそれぞれに対して、単語の出現費度が高いものからインデックスをはった辞書ができあがります。このときmin_freq引数で、最小の出現頻度のしきい値を設けています。
SRC.build_vocab(train_data, min_freq = 2) TRG.build_vocab(train_data, min_freq = 2)
文章をインデックス化&バッチサイズ化
単語とインデックスの辞書を作成したので、データをすべてインデックスに変換していきます。これは、BucketIterator.splits関数を使えば簡単にでき、更に、指定したバッチサイズでバッチ化もしてくれます。
BATCH_SIZE = 128
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
(train_data, valid_data, test_data),
batch_size = BATCH_SIZE
)
上記のコードによって単語がインデックス化されたかを確認します。学習用データの最初のバッチの中身を見てみましょう。
batch = iter(train_iterator).__next__() print(batch.src) print(batch.trg)
各単語がインデックスに変換されており、データのサイズが文章の長さ×バッチサイズになっています。文章の長さが行方向になっているのは、RNNのモデルで時系列を加味して予測するために、各文章の1行目から処理していくからです。
以上で学習データの作成は終わりです。次からは翻訳モデルを構築する話に移ります。
翻訳モデルの構築
翻訳モデルでは、2つのRNNを組み合わせて使います。今回は更にAttentionモデルを組み合わせて精度を上げています。Attentionモデルは、Seq2Seqが長い系列でも予測できるように、どの単語に注力するかを確率的に表すモデルです。Attention付きSeq2Seqモデルのアーキテクチャーは ここに書かれています。
複数のモデルが組み合わさっている場合は、まずモデルの全体像を捉えておくことが大事です。
上図を見て分かる通り、入力データ(本来はインデックス化されたデータ)は、Encoderモデルで変換され、その変換されたものがDecoderモデルで翻訳されています。AttentionモデルはDecoderモデルの中に含まれています。
Encoderモデル
各モデルを定義します。ただ、モデルを定義しているコードだけを見てもちんぷんかんぷんなので、各レイヤーで作られた行列の遷移を見て処理の流れを理解していきます。まずは、入力データを真っ先に受け取るEncoderモデルを定義し、処理フローを理解します。
class Encoder(nn.Module): def __init__(self, input_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout): super().__init__() self.input_dim = input_dim self.emb_dim = emb_dim self.enc_hid_dim = enc_hid_dim self.dec_hid_dim = dec_hid_dim self.dropout = dropout self.embedding = nn.Embedding(input_dim, emb_dim) self.rnn = nn.GRU( emb_dim, enc_hid_dim, bidirectional = True ) self.fc = nn.Linear(enc_hid_dim * 2, dec_hid_dim) self.dropout = nn.Dropout(dropout) def forward(self, src): embedded = self.dropout(self.embedding(src)) outputs, hidden = self.rnn(embedded) hidden = torch.tanh(self.fc( torch.cat(( hidden[-2, :, :], hidden[-1, :, :]), dim = 1 ) )) return outputs, hidden
このコードを見ただけではどのような処理が行われているかわからないので、モデルの各レイヤーでの出力行列の大きさに着目して、処理フローを描いてみます。
上図を見て分かる通り、入力データはEmbeddingされたのち、rnnレイヤーでoutputsとhiddenの2つの行列が作られます。outputsはこのままEncoderモデルの出力の1つになり、hiddenはfcレイヤーで重みがけをした後にもう一つの出力になります。Encoderモデルはこれで完成です。
Attentionレイヤー
次に、Attentionモデルを見ていきます。このモデルはEncoderのoutputsとhidden(2文字目からはdecoder_hidden)から注視すべき単語を確率的に表すモデルです。このモデルの予測値はencoder_outputsと積和され、注視する単語のベクトルにより重みが与えられるようにします。
class Attention(nn.Module): def __init__(self, enc_hid_dim, dec_hid_dim, attn_dim): super().__init__() self.enc_hid_dim = enc_hid_dim self.dec_hid_dim = dec_hid_dim self.attn_in = (enc_hid_dim * 2) + dec_hid_dim self.attn = nn.Linear(self.attn_in, attn_dim) def forward(self, decoder_hidden, encoder_outputs): src_len = encoder_outputs.shape[0] repeated_decoder_hidden = decoder_hidden.unsqueeze(1).repeat( 1, src_len, 1 ) encoder_outputs = encoder_outputs.permute(1, 0, 2) energy = torch.tanh(self.attn( torch.cat(( repeated_decoder_hidden, encoder_outputs ), dim = 2 ))) attention = torch.sum(energy, dim=2) return F.softmax(attention, dim=1)
Attentionモデルのニューラルネットのレイヤーは1層だけなので、そこまで複雑な処理ではないです。
上図を見て分かる通り、EncoderモデルのRNNのoutputsとhidden(2文字目の予測からはDecoderモデルのRNNのhidden)を入力行列として使います。それを適切に行列変換したあと結合し、attnレイヤーで重みがけしています。そして、その後に、各単語で重みを合算したあと、softmax関数で全体が1になるように調整しています。
Decoderレイヤー
今回このレイヤーを理解するのが一番大変だと思います。このモデルは、Encoderモデルの出力行列と時系列予測のそれまでの結果を利用して、順々に翻訳文字を予測していきます。処理の途中でAttentionモデルとRNNが使われているため、複雑な処理に見えてしまいます。とりあえず、Decoderモデルのコードを見ます。
class Decoder(nn.Module): def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout, attention): super().__init__() self.emb_dim = emb_dim self.enc_hid_dim = enc_hid_dim self.dec_hid_dim = dec_hid_dim self.output_dim = output_dim self.dropout = dropout self.attention = attention self.embedding = nn.Embedding(output_dim, emb_dim) self.rnn = nn.GRU( (enc_hid_dim * 2) + emb_dim, dec_hid_dim ) self.out = nn.Linear(self.attention.attn_in + emb_dim, output_dim) self.dropout = nn.Dropout(dropout) def _weighted_encoder_rep(self, decoder_hidden, encoder_outputs): a = self.attention(decoder_hidden, encoder_outputs) a = a.unsqueeze(1) encoder_outputs = encoder_outputs.permute(1, 0, 2) weighted_encoder_rep = torch.bmm(a, encoder_outputs) weighted_encoder_rep = weighted_encoder_rep.permute(1, 0, 2) return weighted_encoder_rep def forward(self, input, decoder_hidden, encoder_outputs): input = input.unsqueeze(0) embedded = self.dropout(self.embedding(input)) weighted_encoder_rep = self._weighted_encoder_rep( decoder_hidden, encoder_outputs ) rnn_input = torch.cat((embedded, weighted_encoder_rep), dim = 2) output, decoder_hidden = self.rnn(rnn_input, decoder_hidden.unsqueeze(0)) embedded = embedded.squeeze(0) output = output.squeeze(0) weighted_encoder_rep = weighted_encoder_rep.squeeze(0) output = self.out( torch.cat(( output, weighted_encoder_rep, embedded ), dim = 1) ) return output, decoder_hidden.squeeze(0)
内部で関数定義もしているため、やはり複雑ですね。それでは、各レイヤーの出力行列に注目して、図にしてみます。
大きな処理の流れとしては、対象の文字までの予測結果をRNNに流し込むのと、EncoderのRNNのoutputsからAttentionモデルで注視すべき単語を確率で表していることです。その後に、次の文字の予測のための各文字のインデックスの確率を算出しています。この確率が最大のインデックスの文字が翻訳文字になります。
Seq2Seq モデルの合体
今回必要なモデルの定義は、上記3つで終わりました。これらを組み合わせて一つのモデルにします。
class Seq2Seq(nn.Module): def __init__(self, encoder, decoder): super().__init__() self.encoder = encoder self.decoder = decoder def forward(self, src, trg, teacher_forcing_ratio=0.5): batch_size = src.shape[1] max_len = trg.shape[0] trg_vocab_size = self.decoder.output_dim outputs = torch.zeros( max_len, batch_size, trg_vocab_size ) encoder_outputs, hidden = self.encoder(src) # first input to the decoder is the <sos> token output = trg[0,:] for t in range(1, max_len): output, hidden = self.decoder( output, hidden, encoder_outputs ) outputs[t] = output teacher_force = random.random() < teacher_forcing_ratio top1 = output.max(1)[1] output = (trg[t] if teacher_force else top1) return outputs
これで今回必要とする翻訳モデルの定義は終わりです。このクラスでは、これまで定義したモデルを組み合わせているだけなので、複雑な処理はないです。Decoderモデルでは、予測した文字を次の文字を予測するために使用しています。forループで囲まれているところがその処理です。
モデルの構築に必要な関数の定義
上記まででモデル構築の骨格になるクラス定義は完成しました。ここでは、モデル実行時に必要なパラメータの初期化関数とパラメータの要素数を合計する関数を定義します。
パラメータの初期化関数
def init_weights(m): for name, param in m.named_parameters(): if 'weight' in name: nn.init.normal_(param.data, mean=0, std=0.01) else: nn.init.constant_(param.data, 0)
要素数を合計する関数
def count_parameters(model): return sum(p.numel() for p in model.parameters() if p.requires_grad)
翻訳モデルのインスタンスを作る
これまでのコードで翻訳データと翻訳モデルのクラス定義が終わりました。なので、定義したクラスの引数に合わせてインスタンスを作成すれば、いよいよ翻訳モデルを学習することができます。それでは、翻訳モデルのインスタンスを作りましょう。
翻訳モデルのハイパーパラメータの定義
INPUT_DIM = len(SRC.vocab) # 入力データの単語数 OUTPUT_DIM = len(TRG.vocab) # 出力データの単語数 ENC_EMB_DIM = 32 # Encoder用のembeddingの次元数 DEC_EMB_DIM = 32 # Decoder用のembeddingの次元数 ENC_HID_DIM = 64 # Encoder用の隠れ層の次元数 DEC_HID_DIM = 64 # Decoder用の隠れ層の次元数 ATTN_DIM = 8 # Attentionの隠れ層の次元数 ENC_DROPOUT = 0.5 # Encoder用のDropout確率 DEC_DROPOUT = 0.5 # Decoder用のDropout確率
翻訳モデルのインスタンスの作成
enc = Encoder( input_dim=INPUT_DIM, emb_dim=ENC_EMB_DIM, enc_hid_dim=ENC_HID_DIM, dec_hid_dim=DEC_HID_DIM, dropout=ENC_DROPOUT ) attn = Attention( enc_hid_dim=ENC_HID_DIM, dec_hid_dim=DEC_HID_DIM, attn_dim=ATTN_DIM ) dec = Decoder( output_dim=OUTPUT_DIM, emb_dim=DEC_EMB_DIM, enc_hid_dim=ENC_HID_DIM, dec_hid_dim=DEC_HID_DIM, dropout=DEC_DROPOUT, attention=attn ) model = Seq2Seq( encoder=enc, decoder=dec ) model.apply(init_weights) optimizer = optim.Adam(model.parameters())
上記のコードでモデルのインスタンスの作成は完成です。modelインスタンスがAttention付きSeq2Seqモデルになっています。このインスタンスにデータを与えることで、翻訳できるように学習していきます。ここまで長い道のりでしたが、ついにモデルの学習まで進むことができました。
翻訳モデルの学習
翻訳モデルのインスタンスができたので、学習データを与えてモデルを賢くしていきましょう。そのために、モデルを学習する関数とモデルの精度を評価する関数を定義します。
モデルを学習する関数
各バッチごとに引数criterionに定義した損失関数に基づいて、backward関数でバックプロパゲーションをしています。modelインスタンスに翻訳モデルが収まっているため、train関数内の処理はスッキリしています。
def train(model, iterator, optimizer, criterion, clip): # モデルを学習モードにする model.train() epoch_loss = 0 for _, batch in enumerate(iterator): src = batch.src trg = batch.trg optimizer.zero_grad() output = model(src, trg) output = output[1:].view(-1, output.shape[-1]) trg = trg[1:].view(-1) loss = criterion(output, trg) loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), clip) optimizer.step() epoch_loss += loss.item() return epoch_loss / len(iterator)
モデルを評価する関数
次にモデルを評価する関数を定義します。train関数と違うところは、model.eval()関数でモデルを確認モードにし、torch.no_grad()でパラメータの保存をやめるようにししてます。なぜ、torch.no_grad()が必要かを補足すると、Pytorchではモデルの学習時、forward関数において勾配計算用のパラメータを保存してバックプロパゲーションの計算の高速化を図っており、これをモデルの評価の際には行わないようにするためです。
def evaluate(model, iterator, criterion): # 確認モードに切り替える(Dropoutを行わないなどの切り替え) model.eval() epoch_loss = 0 with torch.no_grad(): for _, batch in enumerate(iterator): src = batch.src trg = batch.trg output = model(src, trg, 0) #turn off teacher forcing output = output[1:].view(-1, output.shape[-1]) trg = trg[1:].view(-1) loss = criterion(output, trg) epoch_loss += loss.item() return epoch_loss / len(iterator)
翻訳モデル学習時間の整形関数
モデルの学習に要した時間を分と秒に分けるための関数です。
def epoch_time(start_time, end_time): elapsed_time = end_time - start_time elapsed_mins = int(elapsed_time / 60) elapsed_secs = int(elapsed_time - (elapsed_mins * 60)) return elapsed_mins, elapsed_secs
長かったですが、遂にモデルの学習までたどり着きました。モデルのインスタンスを作り、学習用/評価用ともに関数化しているので、学習を行う下記のコードも処理の流れはシンプルです。
N_EPOCHS = 30 CLIP = 1 PAD_IDX = TRG.vocab.stoi['<pad>'] criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX) best_valid_loss = float('inf') train_loss_list = [] valid_loss_list = [] for epoch in range(N_EPOCHS): start_time = time.time() train_loss = train( model, train_iterator, optimizer, criterion, CLIP ) valid_loss = evaluate( model, valid_iterator, criterion ) end_time = time.time() epoch_mins, epoch_secs = epoch_time(start_time, end_time) print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s') print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}') print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}') train_loss_list.append(train_loss) valid_loss_list.append(valid_loss) test_loss = evaluate(model, test_iterator, criterion) print(f'| Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f} |')
上図のように、1epochずつ学習していきます。
損失値の推移の確認
学習時と評価時の損失値をデータフレームにして、seabornモジュールでも、ローカルでも扱えるようにしています。
result = pd.DataFrame({ 'epoch' : np.arange(1, len(train_loss_list)+1), 'train_loss' : train_loss_list, 'valid_loss' : valid_loss_list }) result.to_csv('model_result.csv', index=False)
上記の結果を見て分かる通り、学習するにつれ学習用データ・確認用データの両方において損失値が減少していることがわかります。もう少し学習回数を増やしても精度が上がりそうですね。
参考文献
今回のテーマは翻訳モデルを作る際の基礎的なお話でしたが、pytorchを使って姿勢推定や動画分類などに関してはこちらの本が非常にわかりやすかったです。