8
8
import numpy as np
9
9
from load_data import get_dataset
10
10
11
+ import torch .optim as optim
12
+ from time_series_transformer import TimeSeriesTransformer
13
+ from time_series_transformer_batchnorm import TSTransformerEncoder
14
+ from utils import get_dataloaders
11
15
12
# Positional Encodings
class FixedPositionalEncoding(nn.Module):
    """Sinusoidal (non-trainable) positional encoding added to the input,
    followed by dropout.

    The table is sliced by ``x.size(1)``, i.e. dim 1 is treated as the
    position axis.  NOTE(review): the encoder permutes inputs to
    (seq, batch, feat) before calling this — confirm the intended axis.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Precompute the sinusoid table once; registered as a buffer so it
        # follows the module's device/dtype but is never trained.
        positions = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        freqs = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        table = torch.zeros(max_len, d_model)
        table[:, 0::2] = torch.sin(positions * freqs)   # even channels: sine
        table[:, 1::2] = torch.cos(positions * freqs)   # odd channels: cosine
        self.register_buffer('pe', table.unsqueeze(0))

    def forward(self, x):
        # Add the first x.size(1) position vectors, then apply dropout.
        return self.dropout(x + self.pe[:, :x.size(1), :])
29
-
30
class LearnedPositionalEncoding(nn.Module):
    """Trainable positional encoding: one learned vector per position,
    added to the input and followed by dropout.

    The table is sliced by ``x.size(1)`` (dim 1 is treated as the position
    axis), mirroring FixedPositionalEncoding.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        # Learnable table, initialised from a standard normal distribution.
        self.pe = nn.Parameter(torch.randn(1, max_len, d_model))

    def forward(self, x):
        augmented = x + self.pe[:, :x.size(1), :]
        return self.dropout(augmented)
39
-
40
def get_pos_encoder(pos_encoding):
    """Map an encoding name to its class (not an instance).

    Accepts 'fixed' or 'learned'; raises ValueError for anything else.
    """
    encoders = {
        'fixed': FixedPositionalEncoding,
        'learned': LearnedPositionalEncoding,
    }
    try:
        return encoders[pos_encoding]
    except KeyError:
        raise ValueError(f"Unknown positional encoding type: {pos_encoding}") from None
47
-
48
- # Activation Function
49
- def _get_activation_fn (activation ):
50
- if activation == "relu" :
51
- return F .relu
52
- elif activation == "gelu" :
53
- return F .gelu
54
- else :
55
- raise ValueError (f"Invalid activation function: { activation } " )
56
-
57
# Custom Transformer Encoder Layer with Batch Normalization
class TransformerBatchNormEncoderLayer(nn.Module):
    """Transformer encoder layer that normalises with BatchNorm1d instead of
    the usual LayerNorm.

    Inputs follow the nn.MultiheadAttention default layout
    (seq_len, batch, d_model); each BatchNorm1d call transposes to
    (batch, d_model, seq_len) because BatchNorm1d normalises over the
    channel dimension, then transposes back.
    """

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation="gelu"):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        # Position-wise feed-forward sublayer.
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.BatchNorm1d(d_model)
        self.norm2 = nn.BatchNorm1d(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

        self.activation = _get_activation_fn(activation)

    @staticmethod
    def _batchnorm(bn, x):
        # (seq, batch, d_model) -> (batch, d_model, seq) for BatchNorm1d,
        # then back to (seq, batch, d_model).
        y = bn(x.transpose(0, 1).transpose(1, 2))
        return y.transpose(1, 2).transpose(0, 1)

    def forward(self, src, src_mask=None, src_key_padding_mask=None, **kwargs):
        # Self-attention sublayer with residual connection, then BatchNorm.
        attn_out = self.self_attn(src, src, src, attn_mask=src_mask,
                                  key_padding_mask=src_key_padding_mask)[0]
        src = self._batchnorm(self.norm1, src + self.dropout1(attn_out))
        # Feed-forward sublayer with residual connection, then BatchNorm.
        ff_out = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = self._batchnorm(self.norm2, src + self.dropout2(ff_out))
        return src
84
-
85
# Main Transformer Encoder Model
class TSTransformerEncoder(nn.Module):
    """Transformer encoder over multivariate time series.

    Projects feat_dim features into d_model, adds a positional encoding,
    runs a stack of encoder layers (LayerNorm or BatchNorm variants), and
    projects back to feat_dim per timestep.
    """

    def __init__(self, feat_dim, max_len, d_model, n_heads, num_layers, dim_feedforward, dropout=0.1,
                 pos_encoding='fixed', activation='gelu', norm='BatchNorm', freeze=False):
        super().__init__()

        self.max_len = max_len
        self.d_model = d_model
        self.n_heads = n_heads
        self.feat_dim = feat_dim

        # freeze=True zeroes dropout inside the positional encoding and
        # encoder layers (but not the final dropout1 below).
        eff_dropout = dropout * (1.0 - freeze)

        self.project_inp = nn.Linear(feat_dim, d_model)
        self.pos_enc = get_pos_encoder(pos_encoding)(d_model, dropout=eff_dropout, max_len=max_len)

        if norm == 'LayerNorm':
            layer = TransformerEncoderLayer(d_model, self.n_heads, dim_feedforward,
                                            eff_dropout, activation=activation)
        else:
            layer = TransformerBatchNormEncoderLayer(d_model, self.n_heads, dim_feedforward,
                                                     eff_dropout, activation=activation)

        self.transformer_encoder = nn.TransformerEncoder(layer, num_layers)
        self.output_layer = nn.Linear(d_model, feat_dim)
        self.act = _get_activation_fn(activation)
        self.dropout1 = nn.Dropout(dropout)

    def forward(self, X, padding_masks):
        # (batch, seq, feat) -> (seq, batch, feat) for the encoder stack.
        inp = X.permute(1, 0, 2)
        inp = self.project_inp(inp) * math.sqrt(self.d_model)
        inp = self.pos_enc(inp)
        # padding_masks marks valid positions; nn.TransformerEncoder expects
        # True = position to ignore, hence the inversion.
        out = self.transformer_encoder(inp, src_key_padding_mask=~padding_masks)
        out = self.act(out)
        out = out.permute(1, 0, 2)
        out = self.dropout1(out)
        return self.output_layer(out)
123
16
124
17
ds_list = ["UniMiB SHAR" ,
125
18
"UCI HAR" ,
@@ -136,54 +29,90 @@ def forward(self, X, padding_masks):
136
29
# Wrap the arrays as tensor datasets; the actual loaders come from the
# project's get_dataloaders helper below.
train_dataset = torch.utils.data.TensorDataset(
    torch.tensor(X_train, dtype=torch.float32),
    torch.tensor(y_train, dtype=torch.long),
)
val_dataset = torch.utils.data.TensorDataset(
    torch.tensor(X_valid, dtype=torch.float32),
    torch.tensor(y_valid, dtype=torch.long),
)

train_loader, valid_loader, test_loader = get_dataloaders(
    X_train, y_train, X_valid, y_valid, X_test, y_test
)
141
33
142
- # Training Function
143
- def train_model (model , train_loader , criterion , optimizer , num_epochs ):
144
- model .train ()
34
+
35
# Model hyper-parameters derived from the training data.
input_timesteps = X_train.shape[1]
in_channels = X_train.shape[2]
patch_size = 16
embedding_dim = 128
num_classes = len(torch.unique(y_train))

# Positional encoding shared by both models: 'fixed' or 'learned'.
pos_encoding_type = 'fixed'

# Two architectures under comparison: a patch-based transformer and a
# BatchNorm transformer encoder.
model1 = TimeSeriesTransformer(input_timesteps, in_channels, patch_size,
                               embedding_dim, pos_encoding=pos_encoding_type,
                               num_classes=num_classes)
model2 = TSTransformerEncoder(feat_dim=in_channels, max_len=input_timesteps,
                              d_model=embedding_dim, n_heads=8, num_layers=6,
                              dim_feedforward=128, norm='BatchNorm',
                              pos_encoding=pos_encoding_type,
                              num_classes=num_classes)

# One optimizer per model; a single shared loss.
optimizer1 = optim.Adam(model1.parameters(), lr=0.001)
optimizer2 = optim.Adam(model2.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
52
+
53
+ # Training function
54
def train_model(model, optimizer, train_loader, valid_loader, num_epochs=10, criterion=None):
    """Train `model` with per-epoch validation; return it loaded with the
    weights of the best validation-accuracy epoch.

    Args:
        model: nn.Module called as ``model(inputs)`` returning class logits.
        optimizer: optimizer over ``model.parameters()``.
        train_loader: iterable of (inputs, labels) training batches.
        valid_loader: iterable of (inputs, labels) validation batches.
        num_epochs: number of full passes over train_loader.
        criterion: loss function; defaults to ``nn.CrossEntropyLoss()``
            (matching the module-level criterion previously relied upon).

    Returns:
        The same ``model`` instance, with the best epoch's weights loaded
        (unchanged if no epoch ran or validation accuracy never exceeded 0).
    """
    from copy import deepcopy  # local import keeps this function self-contained

    if criterion is None:
        criterion = nn.CrossEntropyLoss()

    best_acc = 0.0
    best_model_wts = None
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        running_corrects = 0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels.long())
            loss.backward()
            optimizer.step()

            # Accumulate per-sample loss and correct predictions.
            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            running_corrects += torch.sum(preds == labels.data)

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = running_corrects.double() / len(train_loader.dataset)

        # Validation pass (no gradients).
        model.eval()
        val_running_loss = 0.0
        val_running_corrects = 0
        with torch.no_grad():
            for inputs, labels in valid_loader:
                outputs = model(inputs)
                loss = criterion(outputs, labels.long())

                val_running_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                val_running_corrects += torch.sum(preds == labels.data)

        val_loss = val_running_loss / len(valid_loader.dataset)
        val_acc = val_running_corrects.double() / len(valid_loader.dataset)

        print(f'Epoch {epoch}/{num_epochs - 1}, Loss: {epoch_loss:.4f}, Acc: {epoch_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

        if val_acc > best_acc:
            best_acc = val_acc
            # BUG FIX: state_dict() returns live references into the model's
            # tensors, so later epochs silently overwrite the snapshot.
            # Deep-copy to actually preserve the best weights.
            best_model_wts = deepcopy(model.state_dict())

    # BUG FIX: if no epoch ever improved (e.g. num_epochs == 0),
    # load_state_dict(None) would raise — keep current weights instead.
    if best_model_wts is not None:
        model.load_state_dict(best_model_wts)
    return model
98
+
99
# Train both models with the same schedule.
num_epochs = 10

print("Training Model 1...")
best_model1 = train_model(model1, optimizer1, train_loader, valid_loader, num_epochs)

print("Training Model 2...")
best_model2 = train_model(model2, optimizer2, train_loader, valid_loader, num_epochs)
106
+
107
# Evaluate both models on test data
def evaluate_model(model, test_loader):
    """Print `model`'s accuracy over test_loader (nothing is returned)."""
    model.eval()
    correct = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            correct += torch.sum(preds == labels.data)

    test_acc = correct.double() / len(test_loader.dataset)
    print(f'Test Acc: {test_acc:.4f}')
0 commit comments