import torch.nn as nn
import torch.nn.functional as F
import torch
import torch.utils
from torch.utils.data import DataLoader
from .. import Model
from cetaceo.optimization import OptunaOptimizer
from cetaceo.data import BaseDataset
from typing import Dict, Tuple
class Net(nn.Module, Model):
r"""
Multi-layer perceptron (MLP) model built from fully connected layers.
Args:
layer_dim (list): Number of neurons in each layer, from the input layer to the output layer.
activation (callable): Activation function applied after each hidden layer. Default is F.tanh.
device (torch.device): Device to use for computation. Default is torch.device("cpu").
**kwargs: Additional keyword arguments.
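Examples:
Illustrative sketch (the layer sizes are arbitrary): a network mapping a
3-dimensional input through two hidden layers of 64 units to a scalar output.
>>> model = Net([3, 64, 64, 1])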
"""
def __init__(
self,
layer_dim,
activation=F.tanh,
device=torch.device("cpu"),
**kwargs,
):
super().__init__()
self.activation = activation
self.layer_dim = layer_dim
self.layers = nn.ModuleList()
self.device = device
for i in range(1, len(self.layer_dim)):
layer = nn.Linear(self.layer_dim[i - 1], self.layer_dim[i])
nn.init.xavier_uniform_(layer.weight) # Glorot initialization
nn.init.constant_(layer.bias, 0.0) # Initialize bias to 0
self.layers.append(layer)
self.to(device)
def forward(self, x):
for layer in self.layers[:-1]:
x = self.activation(layer(x))
x = self.layers[-1](x)
return x
def fit(
self,
train_dataset: BaseDataset,
eval_dataset=None,
epochs=100,
batch_size=32,
lr=0.001,
loss_fn=nn.MSELoss(),
optimizer_class=torch.optim.Adam,
print_rate=1,
**kwargs,
):
"""
Fit the model to the training data.
Args:
train_dataset (BaseDataset): The training dataset.
eval_dataset (Optional[BaseDataset]): The evaluation dataset. Default is None.
epochs (int): The number of epochs to train. Default is 100.
batch_size (int): The batch size. Default is 32.
lr (float): The learning rate. Default is 0.001.
loss_fn (nn.Module): The loss function. Default is nn.MSELoss().
optimizer_class (torch.optim.Optimizer): The optimizer class. Default is torch.optim.Adam.
print_rate (int): Print the loss every ``print_rate`` epochs; set to 0 to disable printing. Default is 1.
**kwargs: Additional keyword arguments.
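Examples:
Minimal sketch, assuming ``train_ds`` and ``eval_ds`` are ``BaseDataset``
instances compatible with the network's input and output dimensions:
>>> model = Net([3, 64, 64, 1])
>>> model.fit(train_ds, eval_dataset=eval_ds, epochs=50, batch_size=16, lr=1e-3)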
"""
# Create a dataloader for the training data
dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
eval_inputs, eval_targets = eval_dataset[:] if eval_dataset is not None else (
None,
None,
)
# Initialize optimizer
optimizer = optimizer_class(self.parameters(), lr=lr)
train_losses, eval_losses = [], []
# Training loop
for epoch in range(epochs):
self.train() # Set model to training mode
running_loss = 0.0
for inputs, targets in dataloader:
optimizer.zero_grad()
outputs = self.forward(inputs.to(self.device))
loss = loss_fn(outputs, targets.to(self.device))
loss.backward()
optimizer.step()
running_loss += loss.item()
avg_train_loss = running_loss / len(dataloader)
train_losses.append(avg_train_loss)
if print_rate != 0 and ((epoch + 1) % print_rate == 0):
print(f"Epoch {epoch+1}/{epochs}, Training Loss: {avg_train_loss}")
# Evaluation on test set if provided
if eval_dataset is not None:
self.eval() # Set model to evaluation mode
with torch.no_grad(): # Disable gradient computation
eval_outputs = self.forward(eval_inputs.to(self.device))
eval_loss = loss_fn(eval_outputs, eval_targets.to(self.device))
eval_losses.append(eval_loss.item())
if print_rate != 0 and ((epoch + 1) % print_rate == 0):
print(
f"Epoch {epoch+1}/{epochs}, Evaluation Loss: {eval_loss.item()}"
)
# return train_losses, eval_losses
def predict(self, X: BaseDataset, rescale_output=True):
"""
Predict the target values for the input data.
Args:
X (BaseDataset): The input data.
rescale_output (bool): Whether to rescale the output with the scaler of the dataset. Default is True.
Returns:
torch.Tensor: The predicted target values.
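Examples:
Sketch, assuming ``test_ds`` is a ``BaseDataset`` whose scaler was fitted on the targets:
>>> preds = model.predict(test_ds)  # rescaled with the dataset scaler
>>> raw_preds = model.predict(test_ds, rescale_output=False)  # raw torch.Tensor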
"""
self.eval()
inputs, _ = X[:]
with torch.no_grad():
preds = self(inputs.to(self.device))
if rescale_output:
preds = X.rescale_y(preds.cpu().detach().numpy())
return preds
def save(self, path):
"""
Save the model to a file.
Args:
path (str): The path to save the model.
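Examples:
>>> model.save("model.pth")  # the file path is illustrative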
"""
checkpoint = {"layer_dim": self.layer_dim, "state_dict": self.state_dict()}
torch.save(checkpoint, path)
@staticmethod
def load(path):
"""
Load the model from a file.
Args:
path (str): The path to load the model from.
Returns:
Model: The loaded model.
Examples:
>>> model = Net.load("model.pth")
>>> model.predict(X)
"""
checkpoint = torch.load(path, map_location=torch.device("cpu"))
model = Net(checkpoint["layer_dim"])
model.load_state_dict(checkpoint["state_dict"])
return model
# Note: if different optimizer classes need to be supported, one solution is to create as many
# optimization functions as optimizer classes (which are expected to be few) and check in
# create_optimized_model which one is being used.
@classmethod
def create_optimized_model(cls, train_dataset, eval_dataset, optuna_optimizer: OptunaOptimizer) -> Tuple[Model, Dict]:
"""
Create an optimized model using Optuna.
Args:
train_dataset (BaseDataset): The training dataset.
eval_dataset (Optional[BaseDataset]): The evaluation dataset.
optuna_optimizer (OptunaOptimizer): The optimizer to use for optimization.
Returns:
Tuple[Model, Dict]: A tuple containing the optimized model and the best parameters for training found by the optimizer
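Examples:
Illustrative sketch only: the search ranges are arbitrary and the
``OptunaOptimizer`` constructor arguments are assumptions, but
``n_layers`` and ``n_units`` must always be present in ``optimization_params``.
>>> optimizer = OptunaOptimizer(
...     optimization_params={
...         "n_layers": (1, 4),     # sampled as an int range
...         "n_units": (16, 256),   # sampled as an int range
...         "lr": (1e-4, 1e-2),     # sampled as a float range
...         "epochs": 100,          # fixed values are passed through unchanged
...     },
... )
>>> model, params = Net.create_optimized_model(train_ds, eval_ds, optimizer)
>>> model.fit(train_ds, eval_dataset=eval_ds, **params)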
"""
optimization_params = optuna_optimizer.optimization_params
input_dim, output_dim = train_dataset[0][0].shape[0], train_dataset[0][1].shape[0]
def optimization_function(trial) -> float:
training_params = {key : cls._get_optimizing_value(key, params, trial) for key, params in optimization_params.items()}
try:
hidden_layers = training_params['n_layers'] * [training_params["n_units"]]
except KeyError as e:
raise KeyError(f"{e.args[0]} must be in optimization_params")
layer_dim = [input_dim] + hidden_layers + [output_dim]
# calling the constructor with the updated parameters
model = cls(layer_dim, **training_params)
model.fit(
train_dataset,
eval_dataset=eval_dataset,
**training_params
)
y_pred = model.predict(eval_dataset, rescale_output=False)
_, y = eval_dataset[:]
return ((y_pred - y) ** 2).mean()
best_params = optuna_optimizer.optimize(
objective_function=optimization_function
)
# update params with best ones
print("Best parameters afer optimization:\n", best_params)
for param in best_params.keys():
if param in optimization_params:
optimization_params[param] = best_params[param]
print("Using these parameters:\n", optimization_params)
hidden_layers = optimization_params["n_layers"] * [
optimization_params["n_units"]
]
layers = [input_dim] + hidden_layers + [output_dim]
return cls(layers, **optimization_params), optimization_params
@staticmethod
def _get_optimizing_value(name, value, trial):
    # Hyperparameters given as a (low, high) tuple or list are sampled through the
    # Optuna trial; any other value is treated as fixed and returned unchanged.
    if isinstance(value, (tuple, list)):
        # Sample on a log scale when the range spans roughly three orders of magnitude
        # (guard against a zero lower bound to avoid dividing by zero).
        use_log = value[0] > 0 and value[1] / value[0] >= 1000
        if isinstance(value[0], int):
            return trial.suggest_int(name, value[0], value[1], log=use_log)
        elif isinstance(value[0], float):
            return trial.suggest_float(name, value[0], value[1], log=use_log)
    else:
        return value