Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 241 additions & 0 deletions linear_transformer_unconstrained.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
###########################################
# This file contains the following:
# 1. Linear Transformer Model
# 2. Function for clipping gradient
# 3. Function for generating random data
#
# The notation for linear attention follows
# the paper at https://arxiv.org/pdf/2306.00297.pdf
###########################################


import torch
from torch import nn
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Definition of a single linear attention unit for linear-regression data
# P is the value matrix
# Q is the product of key,query matrices
# the dimensions of the input are
# B: batch-size of prompts
# N: context length (excluding query)
# d: covariate dimension
# P,Q are d x d matrices
# Z is a B x (N+1) + (d+1) matrix
# Output is also B x (N+1) + (d+1)

# For linear attention, activation = None
# For standard attention, activation(x) = torch.nn.functional.softmax(x, dim = 2)
# For ReLU attention, activation(x) = torch.nn.relu(x)
def attention(P,Q,Z, activation = None):
B= Z.shape[0]
N = Z.shape[1]-1
d = Z.shape[2]-1
P_full = P
Q_full = Q
A = torch.eye(N+1).to(device)
A[N,N] = 0
Attn = torch.einsum('BNi, ij, BMj -> BNM', (Z,Q_full,Z))
if activation is not None:
Attn = activation(Attn)
key = torch.einsum('ij, BNj -> BNi', (P_full,Z))
Output = torch.einsum('BNM,ML, BLi -> BNi', (Attn,A,key))
return Output /N


# The Linear Transformer module
# n_layer denotes the number of layers
# n_head denotes the number of heads. In most of our experiments, n_head = 1
# d denotes the dimension of covariates
# var denotes the variance of initialization. It needs to be sufficiently small, but exact value is not important
# allparam: contains all the parameters, has dimension n_layer x n_head x 2 x d x d
# For example
# - P matrix at layer i, head j is allparam[i,j,0,:,:]
# - Q matrix at layer i, head j is allparam[i,j,1,:,:]
class Transformer_F(nn.Module):
def __init__(self, n_layer, n_head, d, var):
super(Transformer_F, self).__init__()
self.register_parameter('allparam', torch.nn.Parameter(torch.zeros(n_layer, n_head, 2, d+1, d+1)))
with torch.no_grad():
self.allparam.normal_(0,var)
self.n_layer = n_layer
self.n_head = n_head

def forward(self, Z):
for i in range(self.n_layer):
Zi = Z
residues = 0
# the forwarad map of each layer is given by F(Z) = Z + attention(Z)
for j in range(self.n_head):
Pij = self.allparam[i,j,0,:,:]
Qij = self.allparam[i,j,1,:,:]
residues = residues + attention(Pij,Qij,Zi)
Z = Zi + residues
return Z

#enforces top-left-dxd-block sparsity on p
def zero_p(self):
for i in range(self.n_layer):
for j in range(self.n_head):
with torch.no_grad():
self.allparam[i,j,0,:,:].zero_()

# evaluate the loss of model, given data (Z,y)
def in_context_loss(model, Z, y):
N = Z.shape[1]-1
d = Z.shape[2]-1
output = model(Z)
diff = output[:,N,d]+y
loss = ((diff)**2).mean()
return loss

# generate random data for linear regression
# mode: distribution of samples to generate. Currently supports 'normal', 'gamma', 'sphere'
# N: number of context examples
# d: dimension of covariates
# For gamma distribution:
# - shape_k: shape parameter of gamma distribution (unused otherwise)
# - scale parameter: hard coded so that when shape_k = 5/2 and d=5, the generated data is standard normal
def generate_data(mode='normal',N=20,d=1,B=1000,shape_k=0.1, U=None, D=None):
W= torch.FloatTensor(B, d).normal_(0,1).cuda()
X = torch.FloatTensor(B, N, d).normal_(0, 1).cuda()
X_test = torch.FloatTensor(B,1,d).normal_(0, 1).cuda()

if U is not None:
U = U.cuda()
D = D.cuda()
W= torch.FloatTensor(B, d).normal_(0,1).cuda()
W = torch.mm(W,torch.inverse(D))
W = torch.mm(W,U.t())

if mode =='sphere':
X.div_(X.norm(p=2,dim=2)[:,:,None])
X_test.div_(X_test.norm(p=2,dim=2)[:,:,None])
elif mode == 'gamma':
# random gamma scaling for X
gamma_scales = np.random.gamma(shape=shape_k, scale=(10/shape_k)**(0.5), size=[B,N])
gamma_scales = torch.Tensor(gamma_scales).cuda()
gamma_scales = gamma_scales.sqrt()
# random gamma scaling for X_test
gamma_test_scales = np.random.gamma(shape=shape_k, scale=(10/shape_k)**(0.5), size=[B,1])
gamma_test_scales = torch.Tensor(gamma_test_scales).cuda()
gamma_test_scales = gamma_test_scales.sqrt()
# normalize to unit norm
X.div_(X.norm(p=2,dim=2)[:,:,None])
X_test.div_(X_test.norm(p=2,dim=2)[:,:,None])
# scale by gamma
X.mul_(gamma_scales[:,:,None])
X_test.mul_(gamma_test_scales[:,:,None])
elif mode =='normal':
assert True
elif mode == 'relu':
return generate_data_relu(N=N, d=d, B=B, hidden_dim=d)
elif mode == 'mlp':
generate_data_mlp(N=N, d=d, B=B, hidden_dim=d)
else:
assert False

if U is not None:
X = torch.einsum('ij, jk, BNk -> BNi', (U,D,X))
X_test = torch.einsum('ij, jk, BNk -> BNi', (U,D,X_test))

y = torch.einsum('bi,bni->bn', (W, X)).unsqueeze(2)
y_zero = torch.zeros(B,1,1).cuda()
y_test = torch.einsum('bi,bni->bn', (W, X_test)).squeeze(1)
X_comb= torch.cat([X,X_test],dim=1)
y_comb= torch.cat([y,y_zero],dim=1)
Z= torch.cat([X_comb,y_comb],dim=2)
return Z.to(device),y_test.to(device)

def generate_data_inplace(Z, U=None, D=None):


B = Z.shape[0]
N = Z.shape[1]-1
d = Z.shape[2]-1
X = Z[:,:,0:-1]
X.normal_(0, 1).cuda()
W= torch.FloatTensor(B, d).normal_(0,1).cuda()
if U is not None:
U = U.cuda()
D = D.cuda()
W = torch.mm(W,torch.inverse(D))
W = torch.mm(W,U.t())
Z[:,:,0:-1] = torch.einsum('ij, jk, BNk -> BNi', (U,D,X))

Z[:,:,-1] = torch.einsum('bi,bni->bn', (W, Z[:,:,0:-1])) #y update
y_test = Z[:,-1,-1].detach().clone()
Z[:,-1,-1].zero_()
return Z.to(device),y_test.to(device)

def generate_data_sine(N=10, B=1000):
# Sample amplitude a and phase p for each task
a = torch.FloatTensor(B).uniform_(0.1, 5).cuda()
p = torch.FloatTensor(B).uniform_(0, math.pi).cuda()

X = torch.FloatTensor(B, N).uniform_(-5, 5).cuda()

Y = a.unsqueeze(1) * torch.sin(p.unsqueeze(1) + X)

X = X.unsqueeze(-1)
Y = Y.unsqueeze(-1)

return X, Y

def generate_data_relu(mode='normal', N=20, d=1, B=1000, shape_k=0.1, U=None, D=None, hidden_dim=100):
# Generate random input data
X = torch.FloatTensor(B, N, d).normal_(0, 1).to(device)
X_test = torch.FloatTensor(B, 1, d).normal_(0, 1).to(device)

# Additional transformations if mode is 'sphere' or 'gamma' [Similar to the existing generate_data function]

# Define a 1-hidden layer ReLU network
model = nn.Sequential(
nn.Linear(d, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
).to(device)
model[0].weight.data.normal_(0, 0.1)
model[2].weight.data.normal_(0, 0.1)

# Generate y values using the ReLU network
y = model(X.view(-1, d)).view(B, N, 1)
y_test = model(X_test.view(-1, d)).view(B, 1).squeeze(1)

y_zero = torch.zeros(B, 1, 1).to(device)
X_comb = torch.cat([X, X_test], dim=1)
y_comb = torch.cat([y, y_zero], dim=1)
Z = torch.cat([X_comb, y_comb], dim=2)

return Z, y_test

def generate_data_mlp(N=20, d=1, B=1000, hidden_dim=100):
# Generate random input data
X = torch.FloatTensor(B, N, d).normal_(0, 1).to(device)
X_test = torch.FloatTensor(B, 1, d).normal_(0, 1).to(device)

# Additional transformations if mode is 'sphere' or 'gamma' [Similar to the existing generate_data function]

# Define a 1-hidden layer ReLU network
model = nn.Sequential(
nn.Linear(d, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, d)
).to(device)
model[0].weight.data.normal_(0, 1)
model[2].weight.data.normal_(0, 1)

X_MLP = model(X.view(-1, d)).view(B, N, d)
X_test_MLP = model(X_test.view(-1, d)).view(B, 1, d)

W = torch.FloatTensor(B, d).normal_(0,1).cuda()
y = torch.einsum('bi,bni->bn', (W, X_MLP)).unsqueeze(2)
y_zero = torch.zeros(B,1,1).cuda()
y_test = torch.einsum('bi,bni->bn', (W, X_test_MLP)).squeeze(1)
X_comb= torch.cat([X_MLP,X_test_MLP],dim=1)
y_comb= torch.cat([y,y_zero],dim=1)
Z= torch.cat([X_comb,y_comb],dim=2)

return Z, y_test
Loading