Source code for models.pinn.simple_mlp

import torch.nn as nn
import torch.nn.functional as F
import torch
from torch.utils.data import DataLoader

from .. import Model
from cetaceo.optimization import OptunaOptimizer
from cetaceo.data import BaseDataset
from typing import Dict, Tuple


class Net(nn.Module, Model):
    r"""
    Multi-layer perceptron (MLP) model.

    Args:
        layer_dim (list): List containing the number of neurons in each layer.
        activation (function): Activation function to use. Default is F.tanh.
        device (torch.device): Device to use for computation. Default is torch.device("cpu").
        **kwargs: Additional keyword arguments.
    """

    def __init__(
        self,
        layer_dim,
        activation=F.tanh,
        device=torch.device("cpu"),
        **kwargs,
    ):
        super().__init__()
        self.activation = activation
        self.layer_dim = layer_dim
        self.layers = nn.ModuleList()
        self.device = device
        for i in range(1, len(self.layer_dim)):
            layer = nn.Linear(self.layer_dim[i - 1], self.layer_dim[i])
            nn.init.xavier_uniform_(layer.weight)  # Glorot initialization
            nn.init.constant_(layer.bias, 0.0)  # Initialize bias to 0
            self.layers.append(layer)
        self.to(device)
    def forward(self, x):
        for layer in self.layers[:-1]:
            x = self.activation(layer(x))
        x = self.layers[-1](x)
        return x
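    # Example (illustrative sketch): a network mapping 2 input features to 1 output
    # through two hidden layers of 64 units each.
    #
    #     net = Net([2, 64, 64, 1])
    #     out = net(torch.rand(16, 2))  # forward pass on a batch of 16 samples -> shape (16, 1)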
    def fit(
        self,
        train_dataset: BaseDataset,
        eval_dataset=None,
        epochs=100,
        batch_size=32,
        lr=0.001,
        loss_fn=nn.MSELoss(),
        optimizer_class=torch.optim.Adam,
        print_rate=1,
        **kwargs,
    ):
        """
        Fit the model to the training data.

        Args:
            train_dataset (BaseDataset): The training dataset.
            eval_dataset (Optional[BaseDataset]): The evaluation dataset. Default is None.
            epochs (int): The number of epochs to train. Default is 100.
            batch_size (int): The batch size. Default is 32.
            lr (float): The learning rate. Default is 0.001.
            loss_fn (nn.Module): The loss function. Default is nn.MSELoss().
            optimizer_class (torch.optim.Optimizer): The optimizer class. Default is torch.optim.Adam.
            print_rate (int): The rate at which to print the training loss. Default is 1.
            **kwargs: Additional keyword arguments.
        """
        # Create dataloader for the training data
        dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        eval_inputs, eval_targets = (
            eval_dataset[:] if eval_dataset is not None else (None, None)
        )

        # Initialize optimizer
        optimizer = optimizer_class(self.parameters(), lr=lr)

        train_losses, eval_losses = [], []

        # Training loop
        for epoch in range(epochs):
            self.train()  # Set model to training mode
            running_loss = 0.0
            for inputs, targets in dataloader:
                optimizer.zero_grad()
                outputs = self.forward(inputs.to(self.device))
                loss = loss_fn(outputs, targets.to(self.device))
                loss.backward()
                optimizer.step()
                running_loss += loss.item()

            avg_train_loss = running_loss / len(dataloader)
            train_losses.append(avg_train_loss)
            if print_rate != 0 and ((epoch + 1) % print_rate == 0):
                print(f"Epoch {epoch+1}/{epochs}, Training Loss: {avg_train_loss}")

            # Evaluation on the evaluation set, if provided
            if eval_dataset is not None:
                self.eval()  # Set model to evaluation mode
                with torch.no_grad():  # Disable gradient computation
                    eval_outputs = self.forward(eval_inputs.to(self.device))
                    eval_loss = loss_fn(eval_outputs, eval_targets.to(self.device))
                    eval_losses.append(eval_loss.item())
                if print_rate != 0 and ((epoch + 1) % print_rate == 0):
                    print(
                        f"Epoch {epoch+1}/{epochs}, Evaluation Loss: {eval_loss.item()}"
                    )

        # return train_losses, eval_losses
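    # Example (illustrative sketch; `train_ds` and `eval_ds` are assumed to be
    # BaseDataset instances yielding (input, target) tensor pairs):
    #
    #     net.fit(train_ds, eval_dataset=eval_ds, epochs=200, batch_size=64, lr=1e-3, print_rate=10)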
    def predict(self, X: BaseDataset, rescale_output=True):
        """
        Predict the target values for the input data.

        Args:
            X (BaseDataset): The input data.
            rescale_output (bool): Whether to rescale the output with the scaler of the dataset.
                Default is True.

        Returns:
            torch.Tensor: The predicted target values.
        """
        self.eval()
        inputs, _ = X[:]
        with torch.no_grad():
            preds = self(inputs.to(self.device))

        if rescale_output:
            preds = X.rescale_y(preds.cpu().detach().numpy())

        return preds
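    # Example (illustrative; `eval_ds` is an assumed BaseDataset with a rescale_y scaler):
    #
    #     preds = net.predict(eval_ds)                      # rescaled numpy array in original units
    #     raw = net.predict(eval_ds, rescale_output=False)  # torch.Tensor on self.device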
    def save(self, path):
        """
        Save the model to a file.

        Args:
            path (str): The path to save the model.
        """
        checkpoint = {"layer_dim": self.layer_dim, "state_dict": self.state_dict()}
        torch.save(checkpoint, path)
    @staticmethod
    def load(path):
        """
        Load the model from a file.

        Args:
            path (str): The path to load the model from.

        Returns:
            Model: The loaded model.

        Examples:
            >>> model = Net.load("model.pth")
            >>> model.predict(X)
        """
        checkpoint = torch.load(path, map_location=torch.device("cpu"))
        model = Net(checkpoint["layer_dim"])
        model.load_state_dict(checkpoint["state_dict"])
        return model
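    # Example round trip (illustrative). Note that only `layer_dim` and the weights
    # are stored, so a non-default activation has to be set again after loading.
    #
    #     net.save("mlp.pth")
    #     restored = Net.load("mlp.pth")  # loaded on CPU; move with restored.to(device)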
    # If different optimizers need to be used, one solution is to create as many
    # optimization functions as optimizer classes are needed (which are expected to
    # be few) and check in create_optimized_model which optimizer is being used.
    @classmethod
    def create_optimized_model(
        cls, train_dataset, eval_dataset, optuna_optimizer: OptunaOptimizer
    ) -> Tuple[Model, Dict]:
        """
        Create an optimized model using Optuna.

        Args:
            train_dataset (BaseDataset): The training dataset.
            eval_dataset (Optional[BaseDataset]): The evaluation dataset.
            optuna_optimizer (OptunaOptimizer): The optimizer to use for optimization.

        Returns:
            Tuple[Model, Dict]: A tuple containing the optimized model and the best
                training parameters found by the optimizer.
        """
        optimization_params = optuna_optimizer.optimization_params
        input_dim, output_dim = (
            train_dataset[0][0].shape[0],
            train_dataset[0][1].shape[0],
        )

        def optimization_function(trial) -> float:
            training_params = {
                key: cls._get_optimizing_value(key, params, trial)
                for key, params in optimization_params.items()
            }
            try:
                hidden_layers = training_params["n_layers"] * [training_params["n_units"]]
            except KeyError as e:
                raise KeyError(f"{e.args[0]} must be in optimization_params")
            layer_dim = [input_dim] + hidden_layers + [output_dim]

            # Call the constructor with the updated parameters
            model = cls(layer_dim, **training_params)
            model.fit(train_dataset, eval_dataset=eval_dataset, **training_params)

            y_pred = model.predict(eval_dataset, rescale_output=False)
            _, y = eval_dataset[:]
            return ((y_pred - y) ** 2).mean()

        best_params = optuna_optimizer.optimize(
            objective_function=optimization_function
        )

        # Update params with the best ones
        print("Best parameters after optimization:\n", best_params)
        for param in best_params.keys():
            if param in optimization_params:
                optimization_params[param] = best_params[param]
        print("Using these parameters:\n", optimization_params)

        hidden_layers = optimization_params["n_layers"] * [optimization_params["n_units"]]
        layers = [input_dim] + hidden_layers + [output_dim]

        return cls(layers, **optimization_params), optimization_params
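    # Example (illustrative sketch): only `optimization_params` and `optimize()` of
    # OptunaOptimizer are used above; the constructor call shown here is hypothetical.
    #
    #     params = {
    #         "n_layers": (1, 4),      # tuples are treated as search ranges
    #         "n_units": (16, 256),
    #         "lr": (1e-4, 1e-1),
    #         "epochs": 200,           # fixed values are passed through unchanged
    #     }
    #     optimizer = OptunaOptimizer(optimization_params=params)  # hypothetical signature
    #     model, best_params = Net.create_optimized_model(train_ds, eval_ds, optimizer)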
    @staticmethod
    def _get_optimizing_value(name, value, trial):
        # Tuple/list values are treated as (low, high) search ranges; a span of three
        # orders of magnitude or more is sampled on a log scale. Any other value is
        # returned unchanged as a fixed parameter.
        if isinstance(value, (tuple, list)):
            use_log = value[1] / value[0] >= 1000
            if isinstance(value[0], int):
                return trial.suggest_int(name, value[0], value[1], log=use_log)
            elif isinstance(value[0], float):
                return trial.suggest_float(name, value[0], value[1], log=use_log)
        else:
            return value
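    # How the helper maps optimization_params entries (illustrative):
    #     (1, 4)       -> trial.suggest_int(name, 1, 4)
    #     (1e-4, 1e-1) -> trial.suggest_float(name, 1e-4, 1e-1, log=True)  # span >= 1000x
    #     0.2          -> returned unchanged (fixed parameter)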