This post records the issues I encountered while porting a model from Keras to PyTorch. First, the structure of the CNN model is introduced. Then the Keras code is shown. Next, a partial implementation in pure PyTorch is given, and finally the implementation in PyTorch Lightning is presented.
Several days ago, my first research paper was published online. The research aimed to develop calibration models of UV spectra using CNNs, and it showed that the choice of loss function can have a significant impact on the performance metrics: CNN models trained with MAPE or MAE as the loss function predicted the spectral samples more accurately than those trained with MSE.
I used Keras to conduct the experiments at that time. Recently I have been learning PyTorch, so I tried to implement the models with it.
CNN model structure
The model consists of 7 layers.
Conv: the convolutional layer; BN: the batch normalization layer; ReLU: the activation function.
Each spectrum vector (2016 points) is fed into the input layer, and the features are extracted by the convolutional layer. The batch normalization layer standardizes the data, which removes the need to preprocess the spectra and also allows faster convergence. The dropout layer helps the model avoid overfitting.
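Since the goal of the post is a PyTorch port, here is a compact preview of the same stack as an nn.Sequential. This is only a minimal sketch: it assumes 2016-point spectra and the hyperparameters used later (filters=2, kernel_size=11, units=32), and it mirrors the layer order of the PyTorch code further down.
import torch.nn as nn

# Conv -> ReLU -> Flatten -> BatchNorm -> Dropout -> Dense -> ReLU -> Dense(1)
cnn_sketch = nn.Sequential(
    nn.Conv1d(1, 2, kernel_size=11),        # feature extraction on the raw spectrum
    nn.ReLU(),
    nn.Flatten(),
    nn.BatchNorm1d((2016 - 11 + 1) * 2),    # standardizes the flattened features
    nn.Dropout(0.1),                        # helps avoid overfitting
    nn.Linear((2016 - 11 + 1) * 2, 32),
    nn.ReLU(),
    nn.Linear(32, 1),                       # one predicted value per spectrum
)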
Keras code
1. Import packages
import os
import pandas as pd
import tensorflow as tf
from tensorflow import keras  # the version of tensorflow is 2.1.0
from tensorflow.keras.layers import (Dense, Conv1D, Flatten, Activation, BatchNormalization, Dropout)
from tensorflow.keras import Sequential, regularizers, optimizers
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
# enable memory growth so TensorFlow does not grab all GPU memory at once
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
2. Prepare data
data = pd.read_csv("./data.csv")
# dataset split
train = data[data['split'] == "Calibration"]
val = data[data['split'] == "Validation"]
test = data[data['split'] == "Test"]
# columns 5 onward hold the spectral variables; column 3 holds the target value
Xtrain, ytrain = train.iloc[:, 5:], train.iloc[:, 3]
Xval, yval = val.iloc[:, 5:], val.iloc[:, 3]
Xtest, ytest = test.iloc[:, 5:], test.iloc[:, 3]
Xtrain, ytrain = Xtrain.values, ytrain.values
Xval, yval = Xval.values, yval.values
Xtest, ytest = Xtest.values, ytest.values
# reshape the feature arrays to (samples, wavelengths, 1) for the Conv1D input
Xtrain = Xtrain.reshape(Xtrain.shape[0], Xtrain.shape[1], 1)
Xval = Xval.reshape(Xval.shape[0], Xval.shape[1], 1)
Xtest = Xtest.reshape(Xtest.shape[0], Xtest.shape[1], 1)
3. Build model and create callbacks
# Function: build the CNN model with different hyperparameters (filters, kernel_size, l2, dropout_ratio, units).
def build_model(filters=8, kernel_size=200, l2=0.0001, dropout_ratio=0.2, units=32):
    model = Sequential()
    # the convolutional layer with ReLU activation
    model.add(
        Conv1D(filters=filters,
               kernel_size=kernel_size,
               strides=1,
               activation='relu',
               padding='same',
               input_shape=(Xtrain.shape[1], 1),
               kernel_initializer='VarianceScaling',
               kernel_regularizer=regularizers.l2(l2)))
    model.add(Flatten())
    # batch normalization, ReLU activation and dropout
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dropout(dropout_ratio))
    # the fully connected layer
    model.add(Dense(units, kernel_initializer='VarianceScaling', kernel_regularizer=regularizers.l2(l2)))
    # the output layer
    model.add(Dense(1))
    return model
# build model
model = build_model(filters=2, kernel_size=11, l2=0.1, dropout_ratio=0.1, units=32)
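At this point it can be useful to check the layer shapes and parameter counts, which Keras prints directly:
model.summary()  # prints each layer's output shape and parameter count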
optimizer = optimizers.Adam(lr=0.0001)
# use earlystop to avoid overfitting and to save training time
earlystop = keras.callbacks.EarlyStopping(patience=500)
# use checkpoint to save the best model while training
os.makedirs("./best", exist_ok=True)
weight_path = "./best/weights_best.hdf5"
checkpoint = keras.callbacks.ModelCheckpoint(weight_path, monitor='val_loss', verbose=0, save_best_only=True)
# define the loss function: 'mse'/'mape'/'mae'
model.compile(optimizer=optimizer, loss='mse', metrics=['mse'])
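The MAE and MAPE variants compared in the paper presumably differ only in this compile step; for example, switching to mean absolute error:
# alternative: train with mean absolute error instead of mse
# model.compile(optimizer=optimizer, loss='mae', metrics=['mae'])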
4. Train the model
# model training process
history = model.fit(
    x=Xtrain,
    y=ytrain,
    batch_size=64,  # mini-batch size
    epochs=1000,
    verbose=0,
    validation_data=(Xval, yval),
    callbacks=[earlystop, checkpoint])
del history
# load the best model weights during training for predictions
model.load_weights(weight_path)
ypred_test = model.predict(Xtest)
ypred_train = model.predict(Xtrain)
ypred_val = model.predict(Xval)
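The sklearn metrics imported in step 1 can then be used to score these predictions; a minimal sketch for the test split:
# flatten the (n, 1) Keras output before scoring
rmse_test = mean_squared_error(ytest, ypred_test.ravel()) ** 0.5
mae_test = mean_absolute_error(ytest, ypred_test.ravel())
r2_test = r2_score(ytest, ypred_test.ravel())
print(f"test RMSE: {rmse_test:.4f}, MAE: {mae_test:.4f}, R2: {r2_test:.4f}")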
Pure PyTorch code
1. Import packages
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
# use the GPU if one is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
2. Prepare data
data = pd.read_csv("./data.csv")
# dataset split
train = data[data['split'] == "Calibration"]
val = data[data['split'] == "Validation"]
test = data[data['split'] == "Test"]
Xtrain, ytrain = train.iloc[:, 5:], train.iloc[:, 3]
Xval, yval = val.iloc[:, 5:], val.iloc[:, 3]
Xtest, ytest = test.iloc[:, 5:], test.iloc[:, 3]
Xtrain, ytrain = Xtrain.values, ytrain.values
Xval, yval = Xval.values, yval.values
Xtest, ytest = Xtest.values, ytest.values
# PyTorch's Conv1d expects input of shape (batch, channels, length)
Xtrain = Xtrain.reshape(-1, 1, 2016)
Xval = Xval.reshape(-1, 1, 2016)
Xtest = Xtest.reshape(-1, 1, 2016)
ytrain = ytrain.reshape(-1, 1)
yval = yval.reshape(-1, 1)
ytest = ytest.reshape(-1, 1)
# transform the numpy arrays to tensors
Xtrain_t = torch.from_numpy(Xtrain).type(torch.float).to(device)
Xval_t = torch.from_numpy(Xval).type(torch.float).to(device)
Xtest_t = torch.from_numpy(Xtest).type(torch.float).to(device)
ytrain_t = torch.from_numpy(ytrain).type(torch.float).to(device)
yval_t = torch.from_numpy(yval).type(torch.float).to(device)
ytest_t = torch.from_numpy(ytest).type(torch.float).to(device)
# create datasets and dataloaders
dataset_train = TensorDataset(Xtrain_t, ytrain_t)
dataset_val = TensorDataset(Xval_t, yval_t)
dataset_test = TensorDataset(Xtest_t, ytest_t)
# note: Keras' fit() shuffles the training data each epoch by default; use shuffle=True here to match that behaviour
train_loader = DataLoader(dataset_train, batch_size=64, shuffle=False)
val_loader = DataLoader(dataset_val, batch_size=64, shuffle=False)
test_loader = DataLoader(dataset_test, batch_size=64, shuffle=False)
3. Define the model
class CNN(nn.Module):
    def __init__(self, filters=2, kernel_size=11, units=32):
        super().__init__()
        self.conv = nn.Conv1d(1, filters, kernel_size)
        self.flatten = nn.Flatten()
        self.bn = nn.BatchNorm1d((2016 - kernel_size + 1) * filters)
        self.dropout = nn.Dropout(0.1)
        self.fc1 = nn.Linear((2016 - kernel_size + 1) * filters, units)
        self.fc2 = nn.Linear(units, 1)
        # weight initialization mimicking VarianceScaling in Keras (std = sqrt(1 / fan_in)); biases set to 0
        nn.init.normal_(self.conv.weight, std=torch.sqrt(torch.tensor(1 / 2016)))
        self.conv.bias.data.fill_(0)
        nn.init.normal_(self.fc1.weight, std=torch.sqrt(torch.tensor(1 / ((2016 - kernel_size + 1) * filters))))
        self.fc1.bias.data.fill_(0)
        nn.init.normal_(self.fc2.weight, std=torch.sqrt(torch.tensor(1 / units)))
        self.fc2.bias.data.fill_(0)

    def forward(self, x):
        x = self.conv(x)
        x = F.relu(x)
        x = self.flatten(x)
        x = self.bn(x)
        x = self.dropout(x)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        return x

model = CNN(2, 11)
model = model.to(device)
There are two points that need to be addressed.
One is the weight initialization. In Keras, we used the VarianceScaling method to initialize the weights and zeros to initialize the biases. In PyTorch, however, the default initialization is not VarianceScaling (the details are described in the PyTorch docs), so we have to implement the initialization by hand.
Indeed, initialization is the key to the transfer: I found that if the weights are initialized differently, the hyperparameters tuned in Keras are not suitable for the PyTorch code.
The implementation here is still not exactly the same as in Keras: VarianceScaling samples from a truncated normal distribution, while the code above uses an untruncated normal. The results are still satisfactory, though.
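If the truncation matters, a closer imitation is possible with torch.nn.init.trunc_normal_ (available in recent PyTorch releases). Below is a minimal sketch assuming fan-in scaling, as in Keras' default VarianceScaling; the helper name variance_scaling_ is my own, not part of either library.
import torch.nn as nn

def variance_scaling_(tensor):
    # fan_in is in_features for Linear weights, in_channels * kernel_size for Conv1d weights
    fan_in = tensor.shape[1] if tensor.dim() == 2 else tensor.shape[1] * tensor.shape[2]
    std = (1.0 / fan_in) ** 0.5
    # sample from a normal truncated at +/- 2 std, roughly like Keras' truncated normal
    nn.init.trunc_normal_(tensor, mean=0.0, std=std, a=-2 * std, b=2 * std)
    return tensor

# usage inside __init__, e.g.:
# variance_scaling_(self.conv.weight)
# variance_scaling_(self.fc1.weight)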
The other point is weight decay in PyTorch. While for standard SGD, L2 regularization can be replaced by weight decay through reparameterization, the Adam optimizer is somewhat different; in addition, PyTorch's weight_decay applies to every parameter passed to the optimizer, whereas the Keras model only regularized the kernel weights. The weight decay value here was not carefully chosen, but the performance of the PyTorch and Keras models turned out to be quite similar.
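One way to get closer to the Keras setup is to restrict the decay to the layers that carried a kernel_regularizer there (the conv layer and the first dense layer), leaving the biases, BatchNorm and output layer unregularized. A minimal sketch, as an alternative to the plain optimizer construction in the next step; the 0.1 value is just the setting used below, not a tuned choice:
decay, no_decay = [], []
for name, param in model.named_parameters():
    # only conv.weight and fc1.weight had an L2 kernel_regularizer in the Keras model
    if name.endswith("weight") and ("conv" in name or "fc1" in name):
        decay.append(param)
    else:
        no_decay.append(param)

optimizer = torch.optim.Adam(
    [{"params": decay, "weight_decay": 0.1},
     {"params": no_decay, "weight_decay": 0.0}],
    lr=1e-4)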
4. Train the model
# training function: one pass over the training set
def train(model, train_loader, loss_fn, optimizer):
    size = len(train_loader.dataset)  # number of training samples, for the progress printout
    for idx, (X, y) in enumerate(train_loader):
        optimizer.zero_grad()
        pred = model(X)
        loss = loss_fn(pred, y)
        loss.backward()
        optimizer.step()
        if idx % 50 == 0:
            loss_item, current = loss.item(), idx * len(X)
            print(f"loss: {loss_item:>7f} [{current:>5d}/{size:>5d}]")

# create the loss function and optimizer once, outside the epoch loop,
# so that Adam's running moment estimates are not reset every epoch
loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=0.1)
epochs = 2000
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(model, train_loader, loss_fn, optimizer)
model.eval()
with torch.no_grad():
    y_pred = model(Xtrain_t)
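To compare with the Keras run, the same RMSE metric can be computed on all three splits; a small sketch reusing the tensors built in step 2:
# RMSE per split (model.eval() was already called above)
with torch.no_grad():
    for name, X, y in [("train", Xtrain_t, ytrain_t),
                       ("val", Xval_t, yval_t),
                       ("test", Xtest_t, ytest_t)]:
        rmse = torch.sqrt(F.mse_loss(model(X), y))
        print(f"{name} RMSE: {rmse.item():.4f}")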
PyTorch Lightning code
Creating callbacks in pure PyTorch is not as convenient as in Keras, but PyTorch Lightning makes it easy to use them.
The data preparation code is the same as in the pure PyTorch version above, and the model definition is also very similar.
1. Import packages
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.callbacks import ModelCheckpoint
2. Define the model
class LitCNN(pl.LightningModule):
    def __init__(self, filters=2, kernel_size=11, units=32):
        super().__init__()
        self.save_hyperparameters()  # record filters/kernel_size/units so load_from_checkpoint can rebuild the model
        self.conv = nn.Conv1d(1, filters, kernel_size)
        self.flatten = nn.Flatten()
        self.bn = nn.BatchNorm1d((2016 - kernel_size + 1) * filters)
        self.dropout = nn.Dropout(0.1)
        self.fc1 = nn.Linear((2016 - kernel_size + 1) * filters, units)
        self.fc2 = nn.Linear(units, 1)
        # same VarianceScaling-style initialization as in the pure PyTorch model
        nn.init.normal_(self.conv.weight, std=torch.sqrt(torch.tensor(1 / 2016)))
        self.conv.bias.data.fill_(0)
        nn.init.normal_(self.fc1.weight, std=torch.sqrt(torch.tensor(1 / ((2016 - kernel_size + 1) * filters))))
        self.fc1.bias.data.fill_(0)
        nn.init.normal_(self.fc2.weight, std=torch.sqrt(torch.tensor(1 / units)))
        self.fc2.bias.data.fill_(0)

    def forward(self, x):
        x = self.conv(x)
        x = F.relu(x)
        x = self.flatten(x)
        x = self.bn(x)
        x = self.dropout(x)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        return x

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=1e-4, weight_decay=0.1)
        return optimizer

    def training_step(self, batch, batch_idx):
        X, y = batch
        ypred = self(X)
        loss = F.mse_loss(ypred, y)
        self.log("train_RMSE", torch.sqrt(loss))
        return loss

    def validation_step(self, batch, batch_idx):
        X, y = batch
        ypred = self(X)
        loss = F.mse_loss(ypred, y)
        self.log("val_RMSE", torch.sqrt(loss))
        return loss

    def test_step(self, batch, batch_idx):
        X, y = batch
        ypred = self(X)
        loss = F.mse_loss(ypred, y)
        self.log("test_RMSE", torch.sqrt(loss))
        return loss

model_ckpt = LitCNN(2, 11)
3. Create callbacks
# earlystop (note the min_delta should be reasonable)
earlystop = EarlyStopping(monitor='val_RMSE', min_delta=0.002, patience=500, mode='min')
# checkpoint (save the best model)
checkpoint = ModelCheckpoint(
    monitor="val_RMSE",
    dirpath="./checkpoint/",
    filename="best",
    save_top_k=1,
    mode="min",
    every_n_epochs=1,
    save_on_train_epoch_end=True,
    verbose=True
)
4. Train the model
trainer_ckpt = pl.Trainer(gpus=1, max_epochs=2000, progress_bar_refresh_rate=10,
callbacks=[earlystop, checkpoint])
trainer_ckpt.fit(model_ckpt, train_loader, val_loader)
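Alternatively, the test_step defined in LitCNN can be run through the trainer itself; a minimal sketch (the dataloader argument name varies a little between Lightning versions, so it is passed positionally here):
trainer_ckpt.test(model_ckpt, test_loader)  # logs test_RMSE computed in test_step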
# evaluate the last state
model_ckpt = model_ckpt.to(device)  # make sure the model sits on the same device as the tensors
model_ckpt.eval()
with torch.no_grad():
    y_pred_train = model_ckpt(Xtrain_t)
    y_pred_val = model_ckpt(Xval_t)
    y_pred_test = model_ckpt(Xtest_t)
print(torch.sqrt(F.mse_loss(y_pred_train, ytrain_t)))
print(torch.sqrt(F.mse_loss(y_pred_val, yval_t)))
print(torch.sqrt(F.mse_loss(y_pred_test, ytest_t)))
# evaluate the best state saved by the checkpoint callback (reported as RMSE for comparison with the last state)
model_loaded = LitCNN.load_from_checkpoint(checkpoint.best_model_path)
model_loaded = model_loaded.to(device)  # load_from_checkpoint returns a CPU model by default
model_loaded.eval()
with torch.no_grad():
    y_pred_train = model_loaded(Xtrain_t)
    y_pred_val = model_loaded(Xval_t)
    y_pred_test = model_loaded(Xtest_t)
print(torch.sqrt(F.mse_loss(y_pred_train, ytrain_t)))
print(torch.sqrt(F.mse_loss(y_pred_val, yval_t)))
print(torch.sqrt(F.mse_loss(y_pred_test, ytest_t)))