Files
MediaNCognition/hw4/code/model.py
2024-05-27 00:01:48 +08:00

356 lines
18 KiB
Python

# ========================================================
# Media and Cognition
# Homework 4 Sequence Modeling
# model.py - Model definition
# Student ID: 2022010639
# Name: Yixuan Gao
# Tsinghua University
# (C) Copyright 2024
# ========================================================
# Import required libraries
############################################################
import math
import torch
import torch.nn as nn
from torch.nn import functional as F
import numpy as np
############################################################
# Define the GELU activation function used in OpenAI GPT
############################################################
def gelu(z):
    """
    Tanh approximation of the Gaussian Error Linear Unit (GELU) activation.

    Reference: Gaussian Error Linear Units (GELU) paper: https://arxiv.org/abs/1606.08415
        gelu(z) = 0.5 * z * (1 + tanh[sqrt(2/pi) * (z + 0.044715 * z^3)])

    Args:
        z: input tensor; the activation is applied element-wise
    Returns:
        a tensor with the same shape as `z`
    """
    # inner polynomial: z + 0.044715 * z^3
    cubic = z + 0.044715 * torch.pow(z, 3.0)
    # gate in (0, 2): 1 + tanh(sqrt(2/pi) * cubic)
    gate = 1.0 + torch.tanh(math.sqrt(2.0 / math.pi) * cubic)
    return 0.5 * z * gate
############################################################
# Define the Multi-Head SelfAttention module
############################################################
class SelfAttention(nn.Module):
    """
    Causal (masked) multi-head self-attention.

    Args:
        embed_dim: size of the input and output feature dimension; must be divisible by `num_head`
        num_head: number of attention heads
        dropout: dropout ratio applied to the attention weights and to the projected output
    Raises:
        ValueError: if `embed_dim` is not divisible by `num_head`
    """

    def __init__(self, embed_dim, num_head, dropout):
        super().__init__()
        # fail early: otherwise head_dim is silently floored and forward() dies in reshape()
        if embed_dim % num_head != 0:
            raise ValueError(
                "embed_dim (%d) must be divisible by num_head (%d)" % (embed_dim, num_head)
            )
        # define three linear layers for q, k, v generation separately
        self.q_layer = nn.Linear(embed_dim, embed_dim)
        self.k_layer = nn.Linear(embed_dim, embed_dim)
        self.v_layer = nn.Linear(embed_dim, embed_dim)
        # define the projection layer for output
        self.proj_layer = nn.Linear(embed_dim, embed_dim)
        # define the dropout layers for attention and output calculation
        self.attn_drop = nn.Dropout(dropout)
        self.proj_drop = nn.Dropout(dropout)
        self.num_head = num_head
        self.head_dim = embed_dim // num_head

    def _split_heads(self, t, batch_size, seq_len):
        # (batch_size, seq_len, num_heads * head_dim) -> (batch_size, num_heads, seq_len, head_dim)
        return t.reshape(batch_size, seq_len, self.num_head, self.head_dim).transpose(1, 2)

    def forward(self, x):
        """
        Args:
            x: input of shape (batch_size, seq_len, embed_dim)
        Returns:
            result: attention output of shape (batch_size, seq_len, embed_dim)
            attn: attention weights of shape (batch_size, num_heads, seq_len, seq_len),
                  taken after self.attn_drop() has been applied
        """
        batch_size, seq_len, _ = x.shape

        # Step 1: project the input to q, k, v and split them into heads
        q = self._split_heads(self.q_layer(x), batch_size, seq_len)
        k = self._split_heads(self.k_layer(x), batch_size, seq_len)
        v = self._split_heads(self.v_layer(x), batch_size, seq_len)

        # Step 2: scaled dot-product scores: attn = qk^T / sqrt(head_dim)
        # math.sqrt gives a plain Python scalar, so no tensor is allocated on every call
        attn = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)

        # Step 3: causal mask. Position i may only attend to positions <= i, so every
        # entry strictly above the main diagonal is set to -inf before the softmax.
        attn_mask = torch.triu(torch.ones(seq_len, seq_len, device=x.device), diagonal=1).bool()
        attn = attn.masked_fill(attn_mask, float('-inf'))

        # Step 4: normalize over the key dimension, then apply dropout
        attn = torch.softmax(attn, dim=-1)
        attn = self.attn_drop(attn)

        # Step 5: weighted sum of values: (batch_size, num_heads, seq_len, head_dim)
        out = torch.matmul(attn, v)

        # Step 6: concatenate the heads back to (batch_size, seq_len, num_heads * head_dim)
        out = out.transpose(1, 2).reshape(batch_size, seq_len, self.num_head * self.head_dim)

        # Step 7: output projection followed by dropout
        result = self.proj_drop(self.proj_layer(out))

        # return the final results `result` and attention weights `attn`
        return result, attn
############################################################
# Define the feed forward network (FFN)
############################################################
class FFN(nn.Module):
    """
    Feed forward network: Linear -> GELU -> Linear -> Dropout.

    Args:
        embed_dim: input and output feature dimension
        feedforward_dim: hidden feature dimension between the two linear layers
        dropout: dropout ratio applied to the output
    """

    def __init__(self, embed_dim, feedforward_dim, dropout):
        super().__init__()
        # expand to the hidden dimension, then project back to embed_dim
        self.fc1 = nn.Linear(embed_dim, feedforward_dim)
        self.fc2 = nn.Linear(feedforward_dim, embed_dim)
        self.drop = nn.Dropout(dropout)

    def forward(self, x):
        """Apply the network to the last dimension of `x` and return the result."""
        hidden = gelu(self.fc1(x))
        return self.drop(self.fc2(hidden))
############################################################
# Define the TransformerLayer
############################################################
class TransformerLayer(nn.Module):
    """
    One pre-norm transformer layer: LayerNorm -> SelfAttention, then LayerNorm -> FFN.

    Args:
        embed_dim: feature dimension of the layer input and output
        num_head: number of attention heads passed to SelfAttention
        feedforward_dim: hidden dimension passed to FFN
        dropout: dropout ratio passed to SelfAttention and FFN
        no_res: if True, both residual connections are skipped
    """

    def __init__(self, embed_dim, num_head, feedforward_dim, dropout, no_res):
        super().__init__()
        self.norm1 = nn.LayerNorm(embed_dim)
        self.attn = SelfAttention(embed_dim, num_head, dropout)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.ffn = FFN(embed_dim, feedforward_dim, dropout)
        self.no_res = no_res  # True disables the residual connections

    def forward(self, x):
        """
        Args:
            x: layer input
        Returns:
            out: layer output
            attn: attention weights returned by self.attn()
        """
        # attention sub-layer on the normalized input
        attn_out, attn = self.attn(self.norm1(x))
        if self.no_res:
            hidden = attn_out
        else:
            hidden = attn_out + x

        # feed forward sub-layer on the normalized attention output
        ffn_out = self.ffn(self.norm2(hidden))
        if self.no_res:
            return ffn_out, attn
        return hidden + ffn_out, attn
############################################################
# Define the GPT module
############################################################
class GPT(nn.Module):
    """
    GPT-style auto-regressive language model: token (+ position) embeddings, a stack of
    TransformerLayer modules, a final LayerNorm and a linear head tied to the token embedding.
    """

    def __init__(self, vocab_size, max_seq_len, num_layer, embed_dim, num_head, feedforward_dim, dropout, no_res=False, no_pos=False):
        '''
        vocab_size: the size of vocabulary
        max_seq_len: the maximum length of input texts
        num_layer: the number of transformer layers
        embed_dim: the embedding dimension
        num_head: the number of heads in Multi-Head Self Attention
        feedforward_dim: the dimension in the feed forward network
        dropout: dropout ratio
        no_res: if True, residual connections in the transformer layers are disabled
        no_pos: if True, position embeddings are not added to the token embeddings
        '''
        super().__init__()
        self.num_layer = num_layer
        self.max_seq_len = max_seq_len
        self.no_pos = no_pos
        # Define Embedding Layer to transfer input text tokens and positions to embeddings
        self.word_token_embedding = nn.Embedding(vocab_size, embed_dim)
        self.word_pos_embedding = nn.Embedding(max_seq_len, embed_dim)
        self.drop = nn.Dropout(dropout)
        # Define the transformer layers
        self.transformer = nn.ModuleList([TransformerLayer(embed_dim, num_head, feedforward_dim, dropout, no_res) for _ in range(num_layer)])
        # Define the head layer to predict output
        self.norm = nn.LayerNorm(embed_dim)
        self.language_model_head = nn.Linear(embed_dim, vocab_size, bias=False)
        # Weight tying improves the performance of language models by tying (sharing) the
        # weights of the embedding and softmax layers.
        # Reference: https://paperswithcode.com/method/weight-tying
        self.word_token_embedding.weight = self.language_model_head.weight
        self.init_weights()

    def init_weights(self):
        """Initialize Linear/Embedding weights with N(0, 0.02) and Linear biases with zeros."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                torch.nn.init.normal_(m.weight, mean=0.0, std=0.02)
                if m.bias is not None:
                    torch.nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Embedding):
                torch.nn.init.normal_(m.weight, mean=0.0, std=0.02)
        # apply special scaled init to the residual projections, per GPT-2 paper
        for pn, p in self.named_parameters():
            if pn.endswith('proj_layer.weight'):
                torch.nn.init.normal_(p, mean=0.0, std=0.02/math.sqrt(2 * self.num_layer))

    def forward(self, word_idx, targets=None):
        """
        Args:
            word_idx: LongTensor of token indices with shape (batch_size, seq_len)
            targets: optional tensor of target token indices, one per position of `word_idx`
        Returns:
            (logits, loss) if `targets` is given, otherwise (logits, attention_weights), where
            `attention_weights` is a list holding the attention weights of every transformer layer
        """
        batch_size, seq_len = word_idx.shape

        # token embeddings, plus position embeddings unless they are disabled
        x = self.word_token_embedding(word_idx)
        if not self.no_pos:
            # position sequence [0, 1, ..., seq_len-1]; only looked up when it is actually used
            pos = torch.arange(seq_len, dtype=torch.long, device=word_idx.device)
            x = x + self.word_pos_embedding(pos)
        # apply dropout to the input embeddings
        x = self.drop(x)

        # run the transformer layers and collect the attention weights of each layer
        attention_weights = list()
        for layer in self.transformer:
            x, attn = layer(x)
            attention_weights.append(attn)

        # final normalization and prediction head
        # Note: no softmax here since it is included in the cross entropy loss function
        x = self.norm(x)
        logits = self.language_model_head(x)

        # return logits and loss or attention weights
        if targets is not None:
            # targets equal to 0 do not contribute to the loss
            # NOTE(review): presumably 0 is the padding index; confirm against the data pipeline
            loss = torch.nn.functional.cross_entropy(logits.reshape(-1, logits.size(-1)), targets.reshape(-1), ignore_index=0)
            return logits, loss
        assert isinstance(attention_weights, list), "attention_weights must be a list, please check whether to append the attention weights of all transformer layers into it!"
        return logits, attention_weights

    def configure_optimizers(self, weight_decay):
        """
        Separate all parameters of the model into two buckets: those that will experience
        weight decay for regularization (Linear weights) and those that won't (biases, and
        LayerNorm/Embedding weights).

        Args:
            weight_decay: weight decay value assigned to the decayed bucket
        Returns:
            a list of two parameter-group dicts with keys "params" and "weight_decay".
            Note that this is NOT an optimizer object; the caller is expected to build
            the optimizer from these groups.
        """
        # separate out all parameters to those that will and won't experience regularizing weight decay
        decay = set()
        no_decay = set()
        whitelist_weight_modules = (nn.Linear, )
        blacklist_weight_modules = (nn.LayerNorm, torch.nn.Embedding)
        for mn, m in self.named_modules():
            for pn, p in m.named_parameters():
                fpn = '%s.%s' % (mn, pn) if mn else pn  # full param name
                # random note: because named_modules and named_parameters are recursive
                # we will see the same tensors p many many times. but doing it this way
                # allows us to know which parent module any tensor p belongs to...
                if pn.endswith('bias'):
                    # all biases will not be decayed
                    no_decay.add(fpn)
                elif pn.endswith('weight') and isinstance(m, whitelist_weight_modules):
                    # weights of whitelist modules will be weight decayed
                    decay.add(fpn)
                elif pn.endswith('weight') and isinstance(m, blacklist_weight_modules):
                    # weights of blacklist modules will NOT be weight decayed
                    no_decay.add(fpn)
        # subtle: 'word_token_embedding.weight' and 'language_model_head.weight' are tied, so
        # they appear in the no_decay and decay sets respectively after the loop above.
        # named_parameters() does not return duplicates, so below the shared tensor is only
        # key'd by 'word_token_embedding.weight'. Remove 'language_model_head.weight' from the
        # decay set so the tensor is optimized once, via the embedding name, and not decayed.
        decay.remove('language_model_head.weight')
        # validate that we considered every parameter
        param_dict = {pn: p for pn, p in self.named_parameters()}
        inter_params = decay & no_decay
        union_params = decay | no_decay
        assert len(inter_params) == 0, "parameters %s made it into both decay/no_decay sets!" % (str(inter_params), )
        assert len(param_dict.keys() - union_params) == 0, "parameters %s were not separated into either decay/no_decay set!" \
            % (str(param_dict.keys() - union_params), )
        # create the parameter groups (sorted by name so the order is deterministic)
        optim_groups = [
            {"params": [param_dict[pn] for pn in sorted(decay)], "weight_decay": weight_decay},
            {"params": [param_dict[pn] for pn in sorted(no_decay)], "weight_decay": 0.0},
        ]
        return optim_groups

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
        """
        Take a conditioning sequence of indices idx (LongTensor of shape (b,t)) and complete
        the sequence max_new_tokens times, feeding the predictions back into the model each time.
        Most likely you'll want to make sure to be in model.eval() mode of operation for this.

        Args:
            idx: LongTensor of shape (b, t) with the conditioning token indices
            max_new_tokens: number of tokens to sample and append
            temperature: the logits of the last position are divided by this value before sampling
            top_k: if not None, only the `top_k` largest logits are kept for sampling
        Returns:
            numpy array with the conditioning tokens followed by the sampled tokens, after
            `squeeze()` has removed all dimensions of size 1
        """
        for _ in range(max_new_tokens):
            # if the sequence context is growing too long we must crop it at max_seq_len:
            # the position embedding table only has max_seq_len entries, so a longer input
            # would index it out of range
            idx_cond = idx if idx.size(1) <= self.max_seq_len else idx[:, -self.max_seq_len:]
            # forward the model to get the logits for the index in the sequence
            logits, _ = self(idx_cond)
            # pluck the logits at the final step and scale by desired temperature
            logits = logits[:, -1, :] / temperature
            # optionally crop the logits to only the top k options
            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                logits[logits < v[:, [-1]]] = -float('Inf')
            # apply softmax to convert logits to (normalized) probabilities
            probs = F.softmax(logits, dim=-1)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1)
            # append sampled index to the running sequence and continue
            idx = torch.cat((idx, idx_next), dim=1)
        return idx.squeeze().cpu().numpy()
############################################################
# Named hyper-parameter presets; each entry holds keyword arguments for the model:
# number of layers, embedding dimension, number of heads, feed forward dimension
# (4x the embedding dimension in every preset) and dropout ratio.
GPTConfig = {
    'mygpt': {
        'num_layer': 4,
        'embed_dim': 128,
        'num_head': 4,
        'feedforward_dim': 4 * 128,
        'dropout': 0.0,
    },
    'gpt2-mini': {
        'num_layer': 6,
        'embed_dim': 384,
        'num_head': 6,
        'feedforward_dim': 4 * 384,
        'dropout': 0.2,
    },
    'gpt2': {
        'num_layer': 12,
        'embed_dim': 768,
        'num_head': 12,
        'feedforward_dim': 4 * 768,
        'dropout': 0.2,
    },
}