Attention 탄생 배경
- Long-term Dependency: sequence to sequence 구조에서 encoder가 입력 sequence를 고정길이 context vector로 압축하는 과정에서 정보가 손실되어서 이전의 정보를 잊는다. -> attention의 등장 배경
- Attention은 문맥에 따라 집중할 단어를 결정하는 방식으로 문맥을 최대한 고려할 수 있는 방식이다.
Attention 이해하기
- 한글을 영어로 번역하는 task에서 Decoder의 현재 time step을 밑에 그림에서 <sos> 다음인 I 라고 가정해 보자.
- 기존의 방식 (Attention 없음): 디코더가 인코더로부터 전달받은 압축된 context 벡터를 기반으로 번역을 수행한다. 디코더는 타임스텝 I에서 어떤 한국어 단어와 관련이 있는지 문맥적으로 파악하기 어려울 수 있다. 즉, 고정된 context 벡터가 모든 소스 단어의 중요도를 충분히 반영하지 못할 수 있다.
- Attention방식: Attention 메커니즘을 적용하면, 디코더는 타임스텝 I 에서 각 소스 단어의 중요도를 동적으로 계산할 수 있다. 이는 디코더는 인코더의 출력에서 현재 타임스텝과 관련된 단어의 중요도를 평가하고, 이 중요도를 기반으로 번역을 수행하여 보다 정확한 번역 결과를 얻을 수 있다.
- Key = Value: Encoder의 hidden state
- Query : 현재 time step에서의 Decoder의 hidden state
- 예시로 설명:
- 번역을 하기 위해 Decoder에서 I 가들이 왔을 때 다음단어 ate를 예측해야 한다. 이때 I의 hidden state(Query)와 encoder의 모든 hidden state(Key)를 dot-product를 수행하여 Attention score를 구한다. 이후 softmax를 통과하여 Attention Distribution을 구한다. 여기서 Long-term dependency를 해결한 핵심이다. 즉, Decoder의 time step마다 Attention Distribution을 계산하기 때문에 각 time step마다 집중적으로 봐야 하는 단어가 바뀌기 때문이다. 이후 softmax 취한 결과와 encoder의 모든 hidden state(Value)를 Weighted sum을 하여 Context Vector를 생성한다. 이후 I의 hidden state와 concatenate 하여 선형변환(Linear)과 tanh을 적용한다. 그 벡터를 Decoder의 단어 사전 길이만큼 선형변환(Linear)을 거친 후 softmax를 취하여 가장 확률이 높은 단어를 예측하게 된다.
- 위와 같은 방법으로 <s> 토큰을 받아서 </s>가 나올 때까지 위 과정을 반복하면 encoder input으로 번역하고 싶은 문장을 넣으면 Decoder output으로 번역한 문장이 나오는 구조이다.
Attention 코드로 이해하기
- LSTM을 이용하여 Dot-product 대신 Bahdanau attention을 이용하여 Encoder, Decoder 구현
- torch의 차원에 유의하며 정리한다.
- 여기선 코드로 Attention이 어떻게 동작하는지 보기 위함으로 나머지는 생략
- Encoder
- layer 수는 2와 bidirectional을 사용한 LSTM구조이다. Encoder 구조는 평이하고 Decoder가 중요하다.
- pack_padded_sequence: 변동 길이의 시퀀스가 있을 때, 시퀀스의 길이가 다른 배치가 들어오면 이를 처리하기 위해 패딩(padding)을 사용한다. 하지만, RNN이 이 패딩 된 값도 함께 학습하려고 하기에 불필요한 연산이 발생한다. 이를 방지하기 위해 pack_padded_sequence를 사용하여 시퀀스의 실제 데이터만 처리하도록 한다.
- pad_packed_sequence: pack_padded_sequence로 RNN에서 처리된 데이터를 원래의 패딩 된 형식으로 복원합니다. 또한 각 시퀀스의 실제 길이도 함께 반환해 준다.
class Encoder(nn.Module):
def __init__(self, input_size, embedding_size, hidden_size, n_layers=1):
super(Encoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.n_layers = n_layers
self.embedding = nn.Embedding(input_size, embedding_size) # 단어사전 길이 input_size만큼 embedding_size다 미리 매핑함 즉 정의함
self.lstm = nn.LSTM(embedding_size, hidden_size, num_layers=n_layers, batch_first=True, bidirectional=True)
# hidden state 초기화
def init_hidden(self, inputs):
hidden = Variable(torch.zeros(self.n_layers * self.n_direction, inputs.size(0), self.hidden_size))
return hidden.cuda() if USE_CUDA else hidden
def init_weight(self):
self.embedding.weight = nn.init.xavier_uniform(self.embedding.weight)
self.lstm.weight_hh_l0 = nn.init.xavier_uniform(self.lstm.weight_hh_l0) # l0 첫번째 층
self.lstm.weight_ih_l0 = nn.init.xavier_uniform(self.lstm.weight_ih_l0)
def forward(self, inputs, input_lengths):
hidden = self.init_hidden(inputs)
# input_lengths 는 입력 시퀀스의 실제 길이를 담고 있는 리스트, 이 리스트는 패딩이 적용되기 전, 각 시퀀스가 실제로 가지는 길이를 기록
# hidden의 shape: (n_layers * n_direction, batch_size, hidden_size)
embedded = self.embedding(inputs) # inputs shape: (batch_size x sequence_length) -> embedded: (batch_size, sequence_length, embedding_size)
packed = torch.nn.utils.rnn.pack_padded_sequence(embedded, input_lengths, batch_first=True)
outputs, hidden = self.lstm(packed, hidden) # hidden의 shape: (n_layers * n_direction, batch_size, hidden_size)
outputs, output_lengths = torch.nn.utils.rnn.pad_packed_sequence(outputs,batch_first=True)
# output: (batch_size, max_sequence_length, n_direction * hidden_size), output_lengths: (batch_size,) 각 시퀀스의 원래 길이
hidden = hidden[-2:] # 양방향: (n_layers * n_direction, batch_size, hidden_size) # n_layers중 마지막 layer만 가져옴
return outputs, torch.cat([h for h in hidden], 1).unsqueeze(1)
# torch.cat 이후 hidden의 shape: (batch_size, hidden_size * n_direction) # 같은 batch차원에서 사용하기위해
# unsqueeze(1) 이후: (batch_size, 1, hidden_size * n_direction)
BATCH_SIZE = 32
EMBEDDING_SIZE = 300
HIDDEN_SIZE = 512
encoder = Encoder(
input_size=len(source2index),
embedding_size=EMBEDDING_SIZE,
hidden_size=HIDDEN_SIZE,
n_layers=3
).cuda()
Decoder
- nn.LSTM(embedding_size+hidden_size, hidden_size, n_layers, batch_first=True)
- encoder에서는 512 hidden_size과 bidirectional사용하였고 Decoder에서는 bidirectional사용 안 하기에 크기를 맞혀주기 위해 1024 hidden_size를 가진다.
- 입력으로 이전 time step의 context vector와 이전 time step에서 예측한 단어 embedded를 concat 해서 받기에 input_size는 embedding_size+hidden_size(1024)를 받는다.
- def Attention(self, hidden, encoder_outputs)
- 파라미터로 hidden(Query)(디코더의 현재 time step), encoder_outputs(Key, Value)를 받는다.
- attention score를 계산하기 위해 Key를 Linear에 통과시키고 Query와 행렬곱을 한다. -> attn_energies
- softmax를 통과하여 attention distribution을 구한다. 이후 value와 행렬곱을 통해 context를 구한다.
- forward
- <s>를 받아서 max_length까지 반복한다. 첫 시작을 예시로 들어 <s>를 받았다고 가정하자.
- <s>를 inputs으로 받고 임베딩을 거쳐서 embedded변수(batch, 1, embedding)에 할당된다. 디코더의 첫 시작은 encoder의 마지막 hidden state(b, 1, hidden(512)*n_direc(2))를 받고 이를 context에 할당한다. 이 둘을 concat 하여 LSTM에 input으로 준다.
- 이후 LSTM에서 나온 hidden과 encoder의 마지막 hidden state인 context를 concat 하여 Linear를 거쳐 디코더 사전 길이만큼 차원을 조절한다.
- 이후 softmax를 거친 후 가장 높은 확률을 가진 단어 인덱스를 임베딩하여 embedded를 업데이트한다.
- 앞서 정의한 Attention함수에 현재 Decoder의 time step hidden state와 encoder 전체 hidden state를 넘겨줘 Context를 업데이트한다. 여기서 Long-term-dependency를 해결한다.
- 이 과정을 반복한 후 최종 scores(batch*max_len, vocab_size)를 반환한다. 즉, 매 time step마다 전체 단어사전에서 확률 분포를 가진 값이다.
- <s>를 받아서 max_length까지 반복한다. 첫 시작을 예시로 들어 <s>를 받았다고 가정하자.
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
class Decoder(nn.Module):
# 초기화 함수
def __init__(self, input_size, embedding_size, hidden_size, n_layers=1, dropout_p=0.1):
# (len(target2index), EMBEDDING_SIZE, HIDDEN_SIZE * 2) 를 받음
super(Decoder, self).__init__()
self.hidden_size = hidden_size
self.n_layers = n_layers
# 임베딩 레이어
self.embedding = nn.Embedding(input_size, embedding_size)
# 드롭아웃 레이어
self.dropout = nn.Dropout(dropout_p)
# lstm 레이어, input_size = embedding_size + hidden_size
self.lstm = nn.LSTM(embedding_size + hidden_size, hidden_size, n_layers, batch_first=True)
# 선형 레이어
self.linear = nn.Linear(hidden_size * 2, input_size)
# Attention 레이어
self.attn = nn.Linear(self.hidden_size, self.hidden_size)
def init_hidden(self,inputs):
hidden = Variable(torch.zeros(self.n_layers, inputs.size(0), self.hidden_size))
return hidden.cuda() if USE_CUDA else hidden
def init_weight(self):
self.embedding.weight = nn.init.xavier_uniform(self.embedding.weight)
self.lstm.weight_hh_l0 = nn.init.xavier_uniform(self.lstm.weight_hh_l0)
self.lstm.weight_ih_l0 = nn.init.xavier_uniform(self.lstm.weight_ih_l0)
self.linear.weight = nn.init.xavier_uniform(self.linear.weight)
self.attn.weight = nn.init.xavier_uniform(self.attn.weight)
# Attention 메커니즘을 구현
def Attention(self, hidden, encoder_outputs):
"""
hidden : (1,batch, hidden_size)
encoder_outputs : (batch, max_seq_len, hidden_size)
hidden_size = 1024
Query : 디코더의 현재 hidden state (hidden)
Key : 인코더의 출력 (encoder_outputs)
Value : 인코더의 출력 (encoder_outputs), Key와 동일
"""
hidden = hidden[0].unsqueeze(2) # (1,batch, hidden_size) -> (batch, hidden_size,1)
batch_size = encoder_outputs.size(0) # b
max_len = encoder_outputs.size(1) # max_len
# attention 에너지를 계산
energies = self.attn(encoder_outputs.contiguous().view(batch_size * max_len, -1)) # Linear통과하기위해 (b, max_len, h)를 (b*max_len, h)로 바꿈
energies = energies.view(batch_size,max_len, -1) # (b, max_len, h)
attn_energies = energies.bmm(hidden).squeeze(2) # (b, max_len, h) * (b, h, 1) = (b, max_len)
# softmax를 사용하여 attention 가중치를 계산
alpha = F.softmax(attn_energies,1) # (b, max_len)
alpha = alpha.unsqueeze(1) # (b, 1, max_len)
# context 벡터를 계산
context = alpha.bmm(encoder_outputs) # (b, 1, max_len) * (b, max_len, h) = (b, 1, h)
return context, alpha
"""
attn_energies : query(hidden)와 key(encoder_outputs)의 유사도 계산 결과
F.softmax(attn_energies, 1) : attention score alpha를 계산하며, 이것이 각 타임스텝마다 value에 곱해지는 가중치
alpha : query와 key간의 유사도를 나태내는 attention weight
context = alpha.bmm(encoder_outputs)에서는 alpha (attention weight)를 value (encoder_outputs)에 곱해 최종 context vector를 계산
"""
# 순전파 함수
def forward(self, inputs, context, max_length, encoder_outputs, is_training=False):
"""
inputs : (batch,1) (시작 심볼, <s>)
context : (batch,1, hidden_size(512*2)) (마지막 인코더 은닉 상태)
max_length : 디코딩할 최대 길이
encoder_outputs : (batch, max_len, hidden_size(512*2))
input_size = decoder 사전 길이
"""
embedded = self.embedding(inputs) # 입력 단어의 임베딩을 계산
hidden = self.init_hidden(inputs)
if is_training:
embedded = self.dropout(embedded)
# embedded : (batch, 1, embedding_size)
decode = []
for i in range(max_length):
# LSTM의 입력으로 embedded와 context를 연결한 것을 사용
_, hidden = self.lstm(torch.cat((embedded, context), 2), hidden)
# torch.cat((embedded, context), 2) -> (batch, 1, hidden_size+embedding)
# hidden : (1, batch, hidden_size)
# hidden과 context를 concat하여 concated를 생성
concated = torch.cat((hidden, context.transpose(0, 1)), 2) # concated : (1, batch, hidden_size*2(2048))
# Linear를 통해 score를 계산
score = self.linear(concated.squeeze(0)) # (batch, hidden_size*2(2048)) -> (batch, input_size)
# score에 softmax를 적용하여 확률 분포를 얻음
softmaxed = F.log_softmax(score,1) # (batch, input_size)
# softmaxed를 decode 리스트에 추가
decode.append(softmaxed)
# softmaxed의 최대값 인덱스를 decoded에 저장
decoded = softmaxed.max(1)[1] # (batch,)
# decoded를 임베딩하여 embedded를 업데이트
embedded = self.embedding(decoded).unsqueeze(1) # (batch, 1, embedding_size)
# 훈련 단계에서만 드롭아웃 적용
if is_training:
embedded = self.dropout(embedded)
# attention을 사용하여 다음 context 벡터 업데이트
context, alpha = self.Attention(hidden, encoder_outputs)
scores = torch.cat(decode, 1) # (batch, max_len, input_size)
# scores의 크기를 변경 # nn.CrossEntropyLoss 함수는 모델의 예측 결과를 (N, C) 형태로 받음
return scores.view(inputs.size(0) * max_length, -1) # (batch*max_len, input_size)
'DL' 카테고리의 다른 글
Encoder Model BERT (0) | 2024.09.24 |
---|---|
Transformer 이해하기 (0) | 2024.09.23 |
[Generation] Diffusion Model의 이해 (DDIM, with GAN) (0) | 2024.08.26 |
[Generation] Diffusion Model의 이해 (DPM, DDPM) (2) | 2024.08.25 |
[Generation] Generative Adversarial Network (GAN) (1) | 2024.08.20 |