-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathminiProject1.py
More file actions
170 lines (137 loc) · 7.69 KB
/
miniProject1.py
File metadata and controls
170 lines (137 loc) · 7.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
from dlc_practical_prologue import generate_pair_sets
from miniProject1Modules import CNN, BCMLP, MLP, AuxMLP
import torch
import torch.nn as nn
import torch.optim as optim
import csv
def train_model(model = "cnn",weight_sharing = False, aux_loss = False, num_epochs = 25, mini_batch_size = 25):
    """Train a digit-pair comparison network on 1000 MNIST image pairs.

    A feature extractor (CNN or MLP) maps each 14x14 image of a pair to a
    feature vector; a binary-classifier MLP (BCMLP) applied to the two
    concatenated vectors predicts the pair's target.  Optionally an
    auxiliary MLP is trained with a digit-classification loss on each
    feature vector.

    Parameters:
        model (str): 'cnn' or 'mlp' -- which feature extractor to use.
        weight_sharing (bool): if True, a single extractor processes both
            images of each pair; otherwise two independent extractors.
        aux_loss (bool): if True, add an auxiliary digit-class loss weighted
            by lambda_aux = 0.5.
        num_epochs (int): number of passes over the training set.
        mini_batch_size (int): batch size; assumed to divide the training
            set size of 1000 (narrow() would raise otherwise).

    Returns:
        (model_1, model_2, bcmlp, aux_mlp): the trained modules for image 1
        and image 2 (the same object when weight_sharing is True), the
        binary classifier, and the auxiliary MLP (None when aux_loss is
        False).

    Raises:
        ValueError: if `model` is neither 'cnn' nor 'mlp'.  (The original
        code fell through both branches and crashed later with a NameError
        on `params`.)
    """
    if model not in ('cnn', 'mlp'):
        raise ValueError("model must be 'cnn' or 'mlp', got %r" % (model,))
    # The test split is unused during training, so it is discarded here.
    train_input, train_target, train_classes, _, _, _ = generate_pair_sets(1000)
    # Shuffle the training set ONCE before training (note: the data is NOT
    # re-shuffled at each epoch).
    rand_indices = torch.randperm(1000)
    train_input = train_input[rand_indices]
    train_target = train_target[rand_indices]
    train_classes = train_classes[rand_indices]
    bcmlp = BCMLP()
    if model == 'cnn':
        # Conv layers need an explicit (single) channel dimension per image.
        train_input = train_input.view(-1, 2, 1, 14, 14).float()
    extractor = CNN if model == 'cnn' else MLP
    if weight_sharing:
        # One extractor processes both images -> its parameters appear once.
        model_1 = model_2 = extractor()
        params = list(model_1.parameters()) + list(bcmlp.parameters())
    else:
        # Two independent extractors, one per image position in the pair.
        model_1 = extractor()
        model_2 = extractor()
        params = list(model_1.parameters()) + list(model_2.parameters()) + list(bcmlp.parameters())
    aux_MLP = None
    if aux_loss:
        train_classes = train_classes.view(-1, 2, 1)
        lambda_aux = 0.5  # weight of the auxiliary loss relative to the main loss
        # CrossEntropyLoss applies log-softmax itself, so the aux MLP emits raw logits.
        aux_criterion = nn.CrossEntropyLoss()
        aux_MLP = AuxMLP()
        params += list(aux_MLP.parameters())
    # Main objective: binary cross entropy on the pair target (logits in, so
    # no explicit sigmoid in the model head).
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.SGD(params, lr=0.001)
    for epoch in range(num_epochs):  # loop over the dataset multiple times
        running_loss = 0.0
        for i in range(0, train_input.size(0), mini_batch_size):
            optimizer.zero_grad()
            # One feature vector per image of the pair (size 16 per the modules).
            x1 = model_1(train_input[:, 0].narrow(0, i, mini_batch_size))
            x2 = model_2(train_input[:, 1].narrow(0, i, mini_batch_size))
            if aux_loss:
                # Auxiliary digit-class logits, one set per image.
                aux_x1 = aux_MLP(x1)
                aux_x2 = aux_MLP(x2)
            # The binary classifier sees both feature vectors side by side and
            # reduces them to a single logit per pair.
            output = bcmlp(torch.cat((x1, x2), 1)).view(-1)
            loss = criterion(output, train_target.narrow(0, i, mini_batch_size).to(torch.float32))
            if aux_loss:
                loss2 = aux_criterion(aux_x1, train_classes[:, 0].narrow(0, i, mini_batch_size).view(-1))
                loss3 = aux_criterion(aux_x2, train_classes[:, 1].narrow(0, i, mini_batch_size).view(-1))
                loss += lambda_aux * (loss2 + loss3)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
    print('Finished Training')
    # With weight sharing, model_1 and model_2 are the same object, so the
    # original four-way return collapses to a single statement.
    return model_1, model_2, bcmlp, aux_MLP
def predict_output(sample, target, trained_model1, trained_model2, trained_bcmlp, trained_aux ):
    """Return True iff the trained pair classifier predicts `target` correctly.

    Parameters:
        sample: tensor holding the two images of one pair (sample[0] and
            sample[1] are fed to the respective extractors).
        target: 0/1 scalar tensor -- ground-truth label of the pair.
        trained_model1, trained_model2: trained feature extractors for the
            first and second image (the same object under weight sharing).
        trained_bcmlp: trained binary classifier over the concatenated features.
        trained_aux: unused here; kept so the signature matches the tuple
            returned by train_model.

    Returns:
        bool: True when the thresholded prediction matches the target.
    """
    feat1 = trained_model1.forward(sample[0])
    feat2 = trained_model2.forward(sample[1])
    logit = trained_bcmlp.forward(torch.concat((feat1, feat2), 1))[0][0]
    # sigmoid(logit) > 0.5 is exactly logit > 0 (sigmoid(0) == 0.5), so the
    # sigmoid call -- originally evaluated up to four times -- is unnecessary.
    prediction = 1 if logit.item() > 0 else 0
    return prediction == target.item()
# Run the full experiment grid (architecture x weight sharing x aux loss,
# 10 runs each) and record one CSV row per run in 'output.txt'.
# newline='' is required when handing a file to csv.writer, otherwise the
# writer's '\r\n' terminators get doubled on Windows.
with open('output.txt', 'w', newline='') as f:
    writer = csv.writer(f)
    # Header goes through the same csv writer as the data rows so quoting
    # and line endings stay consistent (it was a raw f.write before).
    writer.writerow(['Model', 'WeightSharing', 'AuxLoss', 'RunNb', 'Score'])
    # Loop over the two architectures.
    for chosen_model in ['cnn', 'mlp']:
        # Loop over the two possibilities for weight sharing: with and without.
        for weight_sharing_active in [True, False]:
            # Loop over the two possibilities for auxiliary loss: with or without.
            for aux_loss_active in [True, False]:
                # 10 independent runs per configuration to estimate variance.
                for k in range(10):
                    trained_model1, trained_model2, trained_bcmlp, trained_aux = train_model(
                        model=chosen_model,
                        weight_sharing=weight_sharing_active,
                        aux_loss=aux_loss_active,
                        num_epochs=25)
                    # Evaluate on a freshly generated, independent test set.
                    train_input, train_target, train_classes, test_input, test_target, test_classes = generate_pair_sets(1000)
                    if chosen_model == 'cnn':
                        # Conv layers expect an explicit channel dimension.
                        test_input = test_input.view(-1, 2, 1, 14, 14).float()
                    n_correct = 0
                    for j in range(len(test_input)):
                        if predict_output(test_input[j], test_target[j], trained_model1,
                                          trained_model2, trained_bcmlp, trained_aux):
                            n_correct += 1
                    # Accuracy over the whole test set (the old n_false counter
                    # was redundant: n_correct + n_false == len(test_input)).
                    score = n_correct / len(test_input)
                    print(str(100 * score) + '% of correct answers', k + 1, 'th run')
                    writer.writerow([chosen_model, str(weight_sharing_active),
                                     str(aux_loss_active), str(k), str(score)])