Source code for models.KANupm_v8

import torch
import torch.nn as nn
import time
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
import numpy as np
import os
from cetaceo.data import BaseDataset
import psutil
import matplotlib.pyplot as plt
class KANupm(nn.Module):
    r"""
    KAN (Kolmogorov-Arnold Network) model for regression tasks.

    This model is based on https://arxiv.org/abs/2404.19756 and is inspired by the
    Kolmogorov-Arnold representation theorem.

    Args:
        ninput (int): The number of input features.
        noutput (int): The number of output features.
        nlayers (int): The number of hidden layers.
        hidden_neur (int): The number of neurons in the hidden layers.
        layer_type (nn.Module): The type of layer to use in the model, e.g. ``JacobiLayer``
            or one of the ``ChebyLayer`` variants defined in this module.
        model_name (str): The name of the model.
        dropout_p (float, optional): The dropout probability (default: ``0.0``).
        device (torch.device, optional): The device where the model is loaded
            (default: ``torch.device("cpu")``).
        intro (bool, optional): Whether to print a summary of the model on creation
            (default: ``True``).
        degree (int, optional): Degree of the polynomial expansion passed to the layer type,
            e.g. the degree of the Chebyshev polynomial (default: ``5``).
    """

    def __init__(
        self,
        ninput: int,
        noutput: int,
        nlayers: int,
        hidden_neur: int,
        layer_type,
        model_name: str,
        dropout_p: float = 0.0,
        device: torch.device = torch.device("cpu"),
        intro: bool = True,
        degree: int = 5,
    ):
        super(KANupm, self).__init__()
        self.ninput = ninput
        self.noutput = noutput
        self.nlayers = nlayers
        self.hidden_neur = hidden_neur
        self.layer_type = layer_type
        self.model_name = model_name
        self.dropout_p = dropout_p
        self.device = device
        self.degree = degree
        self.intro = intro

        hidden_layers = []
        for i in range(nlayers):
            hidden_layers.append(layer_type(hidden_neur, hidden_neur, degree))
            hidden_layers.append(nn.Dropout(p=dropout_p))
        self.kan_layers = nn.ModuleList(hidden_layers)
        self.input = layer_type(ninput, hidden_neur, degree)
        self.output = layer_type(hidden_neur, noutput, degree)
        self.to(self.device)

        if intro:
            print(f"Creating model KAN: {self.model_name} - v8.0")
            keys_print = ['ninput', 'noutput', 'nlayers', 'hidden_neur',
                          'layer_type', 'dropout_p', 'device']
            for key in keys_print:
                print(f"    {key}: {getattr(self, key)}")
            print(f'    total_size (trainable params): {self.count_trainable_params()}')
    def forward(self, x):
        x = self.input(x)
        for layer in self.kan_layers:
            x = layer(x)
        x = self.output(x)
        return x
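# --- Example (illustrative sketch, not part of the module) -------------------------------------
# Build a small KANupm with Chebyshev layers and run a forward pass on random data. The sizes and
# the model name below are arbitrary example values; ChebyLayer_v3 is defined later in this module.
example_model = KANupm(
    ninput=4,
    noutput=2,
    nlayers=2,
    hidden_neur=16,
    layer_type=ChebyLayer_v3,   # any layer class below with signature (in_dim, out_dim, degree)
    model_name="kan_example",
    dropout_p=0.0,
    degree=5,
    intro=False,
)
x = torch.rand(8, 4) * 2 - 1    # ChebyLayer_v3 uses acos, so inputs should already lie in [-1, 1]
y = example_model(x)            # shape: (8, 2)
print(y.shape)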
    def fit(
        self,
        train_dataset,
        eval_dataset,
        batch_size: int = 32,
        epochs: int = 100,
        lr: float = 0.001,
        optimizer=optim.Adam,
        scheduler_type="StepLR",
        opti_kwargs={},
        lr_kwargs={},
        print_eval_rate: int = 2,
        loss_fn=nn.MSELoss(),
        save_logs_path=None,
        intro: bool = True,
        max_norm_grad=float("inf"),
        **kwargs,
    ):
        r"""
        Train the model using the provided training dataset. The model is trained with the
        selected optimizer, learning rate and learning-rate scheduler.

        Args:
            train_dataset: The training dataset.
            eval_dataset: The evaluation dataset.
            batch_size (int): The batch size.
            epochs (int): The number of epochs to train the model.
            lr (float): The (initial) learning rate for the optimizer.
            optimizer (torch.optim, optional): The optimizer to use. All PyTorch optimizers except
                AdaDelta are available (default: ``optim.Adam``).
            scheduler_type (str, optional): Type of scheduler used to dynamically adjust the
                learning rate. Available options:
                - "StepLR": Decreases the learning rate after a specified number of steps.
                - "ReduceLROnPlateau": Reduces the learning rate when the loss stops improving.
                - "OneCycleLR": Adjusts the learning rate following a single cycle throughout training.
                (Default: ``"StepLR"``).
            opti_kwargs (dict, optional): Additional keyword arguments passed to the optimizer
                (default: ``{}``).
            lr_kwargs (dict, optional): Dictionary with specific parameters for the selected
                scheduler. Examples:
                - For StepLR: {"step_size": int (e.g. (epochs_per_step * len(train_dataset)) // batch_size), "gamma": float}.
                - For ReduceLROnPlateau: {"mode": str, "factor": float, "patience": int}.
                - For OneCycleLR: {"anneal_strategy": str, "div_factor": float}.
                (Default: ``{}``).
            print_eval_rate (int, optional): The model will be evaluated every ``print_eval_rate``
                epochs and the losses will be printed. If set to 0, no evaluations are performed
                (default: ``2``).
            loss_fn (torch.nn.Module, optional): The loss function to be optimized
                (default: ``nn.MSELoss()``).
            save_logs_path (str, optional): Path to save the training and evaluation losses as a
                `.npy` file. If set to ``None``, no logs are saved (default: ``None``).
            intro (bool, optional): Whether to print model training information (default: ``True``).
            max_norm_grad (float, optional): The maximum gradient norm allowed. If set to
                ``float('inf')``, no restriction is applied (default: ``float('inf')``).
            kwargs (dict, optional): Additional keyword arguments passed to the DataLoader
                (see https://pytorch.org/docs/stable/data.html#torch.utils.data.DataLoader):
                - batch_size (int, optional): Batch size (default: ``32``).
                - shuffle (bool, optional): Whether to shuffle the data (default: ``True``).
                - num_workers (int, optional): Number of workers for data loading (default: ``0``).
                - pin_memory (bool, optional): Whether to use pinned memory (default: ``True``).
""" start_n_time = time.time() if intro: print(" ") print(f"TRAINNING MODEL {self.model_name} - v8.0") print(" ") print("Conditions:") print(f" epochs: {epochs}") print(f" batch size: 2**{int(np.log2(batch_size))}") print(f" optimizer: {optimizer}") print(f" scheduler: {scheduler_type}") print(f" loss_fn: {loss_fn}") print(f" save_path: {save_logs_path}") print(" ") print("Scheduler conditions:") for key, value in sorted(lr_kwargs.items()): if isinstance(value, dict): print(f" {key}:") for subkey, subvalue in sorted(value.items()): print(f" {subkey}: {subvalue}") else: print(f" {key}: {value}") print(" ") dataloader_params = { "batch_size": batch_size, "shuffle": True, "num_workers": 0, "pin_memory": True, } for key in dataloader_params.keys(): if key in kwargs: dataloader_params[key] = kwargs[key] train_loader = DataLoader(train_dataset, **dataloader_params) test_loader = DataLoader(eval_dataset, **dataloader_params) train_losses = torch.tensor([], device=self.device) test_losses = torch.tensor([], device=self.device) loss_iterations_train = [] loss_iterations_test = [] optimizer = optimizer(self.parameters(), lr=lr, **opti_kwargs) current_lr_vec = [] grad_norms = [] if scheduler_type == "StepLR": scheduler = torch.optim.lr_scheduler.StepLR(optimizer, **lr_kwargs) elif scheduler_type == "ReduceLROnPlateau": scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( optimizer, **lr_kwargs ) elif scheduler_type == "OneCycleLR": scheduler = torch.optim.lr_scheduler.OneCycleLR( optimizer, max_lr=lr, steps_per_epoch=len(train_loader)+1, epochs=1, **lr_kwargs, ) else: raise ValueError(f"Unprogrammed scheduler_type: {scheduler_type}") if hasattr(self, "optimizer_state_dict"): self.optimizer.load_state_dict(self.optimizer_state_dict) del self.optimizer_state_dict if hasattr(self, "scheduler_state_dict"): self.scheduler.load_state_dict(self.scheduler_state_dict) del self.scheduler_state_dict for epoch in range(epochs): self.train() train_loss = 0.0 def closure(): optimizer.zero_grad() outputs = self(inputs) loss = loss_fn(outputs, targets.reshape(outputs.shape)) loss.backward() torch.nn.utils.clip_grad_norm_( self.parameters(), max_norm=max_norm_grad ) loss_iterations_train.append(loss.item()) return loss for inputs, targets in train_loader: inputs, targets = ( inputs.float().to(self.device), targets.float().to(self.device), ) train_loss += optimizer.step(closure).item() total_norm = torch.norm( torch.stack([p.grad.norm() for p in self.parameters() if p.grad is not None]) ) grad_norms.append(total_norm.item()) if scheduler_type != "ReduceLROnPlateau": scheduler.step() current_lr = optimizer.param_groups[0]["lr"] current_lr_vec.append(current_lr) train_loss /= len(train_loader) train_losses = torch.cat( ( train_losses, torch.tensor([train_loss], dtype=torch.float64, device=self.device), ) ) if scheduler_type == "ReduceLROnPlateau": scheduler.step(train_loss) if (epoch + 1) % print_eval_rate == 0: self.eval() test_loss = 0.0 with torch.no_grad(): for inputs, targets in test_loader: inputs, targets = ( inputs.float().to(self.device), targets.float().to(self.device), ) outputs = self(inputs) loss = loss_fn(outputs, targets.reshape(outputs.shape)) loss_iterations_test.append(loss.item()) test_loss += loss.item() test_loss /= len(test_loader) test_losses = torch.cat( ( test_losses, torch.tensor( [test_loss], dtype=torch.float64, device=self.device ), ) ) current_lr = optimizer.param_groups[0]["lr"] current_lr_vec.append(current_lr) if torch.cuda.is_available(): mem_used = torch.cuda.memory_allocated() / ( 
1024**2 ) # Uso de memoria en MB else: mem = psutil.virtual_memory() mem_used = mem.used / (1024**2) # Uso de memoria RAM en MB print( f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4e}, Test Loss: {test_loss:.4e}, " f"LR: {current_lr:.2e}, MEM: {mem_used:.2f} MB, Grad_norm: {total_norm:.2e}", ) # Check for NaN in test_loss and break if detected if torch.isnan(torch.tensor(test_loss)): print(f"Stopping training at epoch {epoch + 1} due to NaN in test loss.") results = { "train_loss": train_losses.cpu().numpy(), "test_loss": test_losses.cpu().numpy(), "lr": np.array(current_lr_vec), "loss_iterations_train": np.array(loss_iterations_train), "loss_iterations_test": np.array(loss_iterations_test), "grad_norms": np.array(grad_norms), 'check': [False] } if save_logs_path is not None: if os.path.isdir(save_logs_path): print(f"Printing losses on path {save_logs_path}") else: print("Path not found. Printing losses on local folder (.)") save_logs_path = '.' if os.path.isfile(save_logs_path + f"training_results_{self.model_name}.npy"): results_old = np.load(save_logs_path + f"training_results_{self.model_name}.npy", allow_pickle=True).item() for key in results.keys(): if key != 'check': results[key]=np.concatenate((results[key], results_old[key]), axis=0) else: results[key].extend(results_old[key][:]) print("Updating previous data in file" + save_logs_path + f"training_results_{self.model_name}.npy") else: pass ## PUEDO INTRODUCIR AQUÍ QUE SI EXISTE UN ARCHIVO DEL MISMO NOMBRE QUE CONCATENE LOS RESULTADOS. ## CUIDADO SI RESULTA QUE EL DICCIONARIO DE AHORA TIENE MÁS CLAVES QUE EL ANTERIOR, AUNQUE NO DEBERÍA PUES ES LA MISMA VERSIÓN DE MODELO Y ENTRENAMIENTO. print(f"Training results saved at {save_logs_path}training_results_{self.model_name}.npy") np.save(save_logs_path + f"/training_results_{self.model_name}.npy", results) end_n_time = time.time() self.print_hours(start_n_time, end_n_time, epochs=None) return results results = { "train_loss": train_losses.cpu().numpy(), "test_loss": test_losses.cpu().numpy(), "lr": np.array(current_lr_vec), "loss_iterations_train": np.array(loss_iterations_train), "loss_iterations_test": np.array(loss_iterations_test), "grad_norms": np.array(grad_norms), "check": [True], } if save_logs_path is not None: if os.path.isdir(save_logs_path): print(f"Printing losses on path {save_logs_path}") else: print("Path not found. Printing losses on local folder (.)") save_logs_path = '.' if os.path.isfile(save_logs_path + f"training_results_{self.model_name}.npy"): results_old = np.load(save_logs_path + f"training_results_{self.model_name}.npy", allow_pickle=True).item() for key in results.keys(): if key != 'check': results[key]=np.concatenate((results[key], results_old[key]), axis=0) else: results[key].extend(results_old[key][:]) print("Updating previous data in file" + save_logs_path + f"training_results_{self.model_name}.npy") else: pass np.save(save_logs_path + f"training_results_{self.model_name}.npy", results) print(f"Training results saved at {save_logs_path}training_results_{self.model_name}.npy") end_n_time = time.time() self.print_hours(start_n_time, end_n_time, epochs) return results
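# --- Example (illustrative sketch, not part of the module) -------------------------------------
# Typical training call. It assumes `train_ds` and `eval_ds` are datasets returning
# (input, target) pairs, e.g. built with MakeDataset_kan; all hyperparameter values are
# arbitrary examples.
results = example_model.fit(
    train_dataset=train_ds,
    eval_dataset=eval_ds,
    batch_size=32,
    epochs=20,
    lr=1e-3,
    scheduler_type="StepLR",
    lr_kwargs={"step_size": 100, "gamma": 0.9},   # step_size is measured in optimizer steps
    print_eval_rate=2,
    save_logs_path=None,                          # set a directory to persist the .npy log
)
print(results["train_loss"][-1], results["test_loss"][-1])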
    def fit_scored(
        self,
        train_dataset,
        eval_dataset,
        batch_size: int = 32,
        epochs: int = 100,
        lr: float = 0.001,
        optimizer=optim.Adam,
        scheduler_type="StepLR",
        opti_kwargs={},
        lr_kwargs={},
        print_eval_rate: int = 2,
        save_logs_path=None,
        intro: bool = True,
        max_norm_grad=float("inf"),
        **kwargs,
    ):
        r"""
        Train the model using a scored training dataset of (input, score, target) triples.
        The training loss is a score-weighted p-norm (``NormPLoss``) whose exponent ``p`` is
        adapted during training from the evaluation statistics (``StatisticalLoss``).

        Args:
            train_dataset: The training dataset. Must be built with ``MakeDatasetScored_kan``.
            eval_dataset: The evaluation dataset. Must be built with ``MakeDatasetScored_kan``.
            batch_size (int): The batch size.
            epochs (int): The number of epochs to train the model.
            lr (float): The (initial) learning rate for the optimizer.
            optimizer (torch.optim, optional): The optimizer to use. All PyTorch optimizers except
                AdaDelta are available (default: ``optim.Adam``).
            scheduler_type (str, optional): Type of scheduler used to dynamically adjust the
                learning rate. Available options:
                - "StepLR": Decreases the learning rate after a specified number of steps.
                - "ReduceLROnPlateau": Reduces the learning rate when the loss stops improving.
                - "OneCycleLR": Adjusts the learning rate following a single cycle throughout training.
                (Default: ``"StepLR"``).
            opti_kwargs (dict, optional): Additional keyword arguments passed to the optimizer
                (default: ``{}``).
            lr_kwargs (dict, optional): Dictionary with specific parameters for the selected
                scheduler. Examples:
                - For StepLR: {"step_size": int, "gamma": float}.
                - For ReduceLROnPlateau: {"mode": str, "factor": float, "patience": int}.
                - For OneCycleLR: {"anneal_strategy": str, "div_factor": float}.
                (Default: ``{}``).
            print_eval_rate (int, optional): The model will be evaluated every ``print_eval_rate``
                epochs and the losses will be printed. If set to 0, no evaluations are performed
                (default: ``2``).
            save_logs_path (str, optional): Path to save the training and evaluation losses as a
                `.npy` file. If set to ``None``, no logs are saved (default: ``None``).
            intro (bool, optional): Whether to print model training information (default: ``True``).
            max_norm_grad (float, optional): The maximum gradient norm allowed. If set to
                ``float('inf')``, no restriction is applied (default: ``float('inf')``).
            kwargs (dict, optional): Additional keyword arguments passed to the DataLoader
                (see https://pytorch.org/docs/stable/data.html#torch.utils.data.DataLoader):
                - batch_size (int, optional): Batch size (default: ``32``).
                - shuffle (bool, optional): Whether to shuffle the data (default: ``True``).
                - num_workers (int, optional): Number of workers for data loading (default: ``10``).
                - pin_memory (bool, optional): Whether to use pinned memory (default: ``True``).
""" start_n_time = time.time() if intro: print(" ") print(f"TRAINNING MODEL {self.model_name} - Scored - v6.1") print(" ") print("Conditions:") print(f" epochs: {epochs}") # print(f" batch size: 2**{int(np.log2(batch_size))}") print(f" optimizer: {optimizer}") print(f" scheduler: {scheduler_type}") print(f" save_path: {save_logs_path}") print(" ") print("Scheduler conditions:") for key, value in sorted(lr_kwargs.items()): if isinstance(value, dict): print(f" {key}:") for subkey, subvalue in sorted(value.items()): print(f" {subkey}: {subvalue}") else: print(f" {key}: {value}") print(" ") dataloader_params = { "batch_size": batch_size, "shuffle": True, "num_workers": 10, "pin_memory": True, } for key in dataloader_params.keys(): if key in kwargs: dataloader_params[key] = kwargs[key] train_loader = DataLoader( train_dataset, # batch_sampler=sampler, **dataloader_params) test_loader = DataLoader( eval_dataset, # batch_sampler=sampler, **dataloader_params) # train_loader = DataLoader(train_dataset, **dataloader_params) # test_loader = DataLoader(eval_dataset, **dataloader_params) train_losses = torch.tensor([], device=self.device) test_losses = torch.tensor([], device=self.device) loss_iterations_train = [] loss_iterations_test = [] var_loss_iterations_test = [] optimizer = optimizer(self.parameters(), lr=lr, **opti_kwargs) current_lr_vec = [] grad_norms = [] p_values = [] if scheduler_type == "StepLR": scheduler = torch.optim.lr_scheduler.StepLR(optimizer, **lr_kwargs) elif scheduler_type == "ReduceLROnPlateau": scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( optimizer, **lr_kwargs ) elif scheduler_type == "OneCycleLR": scheduler = torch.optim.lr_scheduler.OneCycleLR( optimizer, max_lr=lr, steps_per_epoch=len(train_loader), epochs=epochs, **lr_kwargs, ) else: raise ValueError(f"Unprogrammed scheduler_type: {scheduler_type}") if hasattr(self, "optimizer_state_dict"): self.optimizer.load_state_dict(self.optimizer_state_dict) del self.optimizer_state_dict if hasattr(self, "scheduler_state_dict"): self.scheduler.load_state_dict(self.scheduler_state_dict) del self.scheduler_state_dict loss_fn_train = NormPLoss() loss_fn_test = StatisticalLoss() current_p = 2 p_values.append(current_p) for epoch in range(epochs): self.train() train_loss = 0.0 def closure(): optimizer.zero_grad() outputs = self(inputs) loss = loss_fn_train(outputs, scores, targets, p=current_p) loss.backward() torch.nn.utils.clip_grad_norm_( self.parameters(), max_norm=max_norm_grad ) loss_iterations_train.append(loss.item()) return loss for inputs, scores, targets in train_loader: inputs, scores, targets = ( inputs.float().to(self.device), scores.float().to(self.device), targets.float().to(self.device), ) train_loss += optimizer.step(closure).item() ### INTRODUCIR UN P ACORDE AL ERROR TEST QUE ME SALGA total_norm = torch.norm( torch.stack([param.grad.norm() for param in self.parameters() if param.grad is not None]) ) grad_norms.append(total_norm.item()) if scheduler_type != "ReduceLROnPlateau": scheduler.step() current_lr = optimizer.param_groups[0]["lr"] current_lr_vec.append(current_lr) train_loss /= len(train_loader) train_losses = torch.cat( ( train_losses, torch.tensor( [train_loss], dtype=torch.float64,device=self.device ), ) ) if scheduler_type == "ReduceLROnPlateau": scheduler.step(train_loss) if (epoch + 1) % print_eval_rate == 0: self.eval() test_loss = 0.0 with torch.no_grad(): for inputs, _, targets in test_loader: inputs, targets = ( inputs.float().to(self.device), targets.float().to(self.device), ) outputs = 
self(inputs) #miniestufdio estadistico loss, var_loss = loss_fn_test(outputs, targets) loss_iterations_test.append(loss.item()) var_loss_iterations_test.append(var_loss.item()) test_loss += loss.item() var_loss += var_loss.item() test_loss /= len(test_loader) var_loss /= len(test_loader) current_p = min(10, max(1, 2 + (var_loss - test_loss))) p_values.append(current_p) test_losses = torch.cat( ( test_losses, torch.tensor( [test_loss], dtype=torch.float64, device=self.device ), ) ) current_lr = optimizer.param_groups[0]["lr"] current_lr_vec.append(current_lr) if torch.cuda.is_available(): mem_used = torch.cuda.memory_allocated() / ( 1024**2 ) # Uso de memoria en MB else: mem = psutil.virtual_memory() mem_used = mem.used / (1024**2) # Uso de memoria RAM en MB print( f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4e}, Test Loss: {test_loss:.4e}, " f"LR: {current_lr:.2e}, MEM: {mem_used:.2f} MB, Grad_norm: {total_norm:.2e}", ) # Check for NaN in test_loss and break if detected if torch.isnan(torch.tensor(test_loss)): print(f"Stopping training at epoch {epoch + 1} due to NaN in test loss.") results = { "train_loss": train_losses.cpu().numpy(), "test_loss": test_losses.cpu().numpy(), "lr": np.array(current_lr_vec), "loss_iterations_train": np.array(loss_iterations_train), "loss_iterations_test": np.array(loss_iterations_test), "var_loss_iterations_test": np.array(var_loss_iterations_test), "grad_norms": np.array(grad_norms), 'check': [False] } if save_logs_path is not None: if os.path.isdir(save_logs_path): print(f"Printing losses on path {save_logs_path}") else: print("Path not found. Printing losses on local folder (.)") save_logs_path = '.' if os.path.isfile(save_logs_path + f"training_results_{self.model_name}.npy"): results_old = np.load(save_logs_path + f"training_results_{self.model_name}.npy", allow_pickle=True).item() for key in results.keys(): if key != 'check': results[key]=np.concatenate((results[key], results_old[key]), axis=0) else: results[key].extend(results_old[key][:]) print("Updating previous data in file" + save_logs_path + f"training_results_{self.model_name}.npy") else: pass ## PUEDO INTRODUCIR AQUÍ QUE SI EXISTE UN ARCHIVO DEL MISMO NOMBRE QUE CONCATENE LOS RESULTADOS. ## CUIDADO SI RESULTA QUE EL DICCIONARIO DE AHORA TIENE MÁS CLAVES QUE EL ANTERIOR, AUNQUE NO DEBERÍA PUES ES LA MISMA VERSIÓN DE MODELO Y ENMTRENAMIENTO. print(f"Training results saved at {save_logs_path}training_results_{self.model_name}.npy") np.save(save_logs_path + f"/training_results_{self.model_name}.npy", results) end_n_time = time.time() self.print_hours(start_n_time, end_n_time, epochs=None) return results results = { "train_loss": train_losses.cpu().numpy(), "test_loss": test_losses.cpu().numpy(), "lr": np.array(current_lr_vec), "loss_iterations_train": np.array(loss_iterations_train), "loss_iterations_test": np.array(loss_iterations_test), "var_loss_iterations_test": np.array(var_loss_iterations_test), "grad_norms": np.array(grad_norms), "check": [True], } if save_logs_path is not None: if os.path.isdir(save_logs_path): print(f"Printing losses on path {save_logs_path}") else: print("Path not found. Printing losses on local folder (.)") save_logs_path = '.' 
if os.path.isfile(save_logs_path + f"training_results_{self.model_name}.npy"): results_old = np.load(save_logs_path + f"training_results_{self.model_name}.npy", allow_pickle=True).item() for key in results.keys(): if key != 'check': results[key]=np.concatenate((results[key], results_old[key]), axis=0) else: results[key].extend(results_old[key][:]) print("Updating previous data in file" + save_logs_path + f"training_results_{self.model_name}.npy") else: pass np.save(save_logs_path + f"training_results_{self.model_name}.npy", results) print(f"Training results saved at {save_logs_path}training_results_{self.model_name}.npy") end_n_time = time.time() self.print_hours(start_n_time, end_n_time, epochs) return results
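# --- Example (illustrative sketch, not part of the module) -------------------------------------
# Scored-training call. `scored_train_ds` and `scored_eval_ds` are assumed to be built with
# MakeDatasetScored_kan, so each item is an (input, score, target) triple; all values below are
# arbitrary examples.
results = example_model.fit_scored(
    train_dataset=scored_train_ds,
    eval_dataset=scored_eval_ds,
    batch_size=64,
    epochs=10,
    lr=1e-3,
    scheduler_type="OneCycleLR",
    lr_kwargs={"anneal_strategy": "cos", "div_factor": 25.0},
    print_eval_rate=2,
)
# The exponent of the score-weighted p-norm is adapted at each evaluation as
#   p = clip(2 + (mean variance of |error| - mean of |error|), 1, 10),
# so noisier evaluation errors push training towards higher-order norms.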
    def exam(
        self,
        data: dict,
        **kwargs,
    ):
        """
        Takes a test set with data, results and the scaling references, and returns a NumPy array
        of predictions and another of true values for comparison. ONLY WORKS WITH "JARAIZ"-VERSION
        DATA.

        Args:
            data (dict): Data dictionary with the keys:
                - tensor (torch.Tensor): Tensor with the raw data.
                - scaled (torch.Tensor): Tensor with the scaled data.
                - mins (torch.Tensor): 1D tensor with the minimum values of the data columns.
                - maxs (torch.Tensor): 1D tensor with the maximum values of the data columns.
                - info (dict): Dictionary with additional information about the data.
            kwargs (dict, optional): Additional keyword arguments to pass to the DataLoader
                (see https://pytorch.org/docs/stable/data.html#torch.utils.data.DataLoader):
                - batch_size (int, optional): Batch size (default: ``2**6``).
                - shuffle (bool, optional): Shuffle the data (default: ``False``).
                - num_workers (int, optional): Number of workers to use (default: ``5``).
                - pin_memory (bool, optional): Pin memory (default: ``True``).

        Returns:
            mse: Mean squared error between the predictions and the true results.
            outs: Model predictions, rescaled back to physical units with ``mins``/``maxs``.
            targ: Reference values, rescaled back to physical units with ``mins``/``maxs``.
        """
        assert data['scaled'].shape[1] == (self.ninput + self.noutput), \
            f"The data tensor must have {self.ninput + self.noutput} columns (ninput + noutput)."

        mins = data['mins'].detach().numpy()
        maxs = data['maxs'].detach().numpy()

        # Wrap the scaled data in a PyTorch Dataset
        dataset = MakeDataset_kan(tensor_data=data)

        dataloader_params = {
            "batch_size": 2**6,
            "shuffle": False,
            "num_workers": 5,
            "pin_memory": True,
        }
        for key in dataloader_params.keys():
            if key in kwargs:
                dataloader_params[key] = kwargs[key]
        loader = DataLoader(dataset, **dataloader_params)

        outs = []
        targ = []
        self.eval()
        self.to(self.device)
        with torch.no_grad():
            for inputs, targets in loader:
                outputs = self(inputs.to(self.device))
                outs.extend(outputs)
                targ.extend(targets)

        outs = np.array([out.cpu().numpy() for out in outs])
        targ = np.array([tar.cpu().numpy() for tar in targ])

        assert outs.shape == targ.shape, \
            f"Shape mismatch between predictions and targets: {outs.shape} vs {targ.shape}"
        assert mins.shape[0] == maxs.shape[0], "mins and maxs must have the same shape"
        assert mins.shape[0] == (self.ninput + self.noutput), \
            f"mins/maxs shape mismatch: expected {self.ninput + self.noutput}, got {mins.shape[0]}"

        # Undo the min-max scaling of the output columns
        outs = mins[self.ninput:] + (maxs[self.ninput:] - mins[self.ninput:]) * outs
        targ = mins[self.ninput:] + (maxs[self.ninput:] - mins[self.ninput:]) * targ
        mse = np.mean((outs - targ) ** 2)

        return mse, outs, targ
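# --- Worked example (illustrative values only) --------------------------------------------------
# The inverse min-max scaling used by exam(): a scaled value x_s in [0, 1] maps back to
# x = min + (max - min) * x_s.
mins_out = np.array([0.0, -5.0])       # example minima of two output columns
maxs_out = np.array([10.0, 5.0])       # example maxima of two output columns
scaled_pred = np.array([[0.25, 0.5]])  # example scaled model output
physical_pred = mins_out + (maxs_out - mins_out) * scaled_pred
# physical_pred -> [[2.5, 0.0]]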
[docs] def predict( self, X: BaseDataset, rescale_output: bool = True, return_targets: bool = False, **kwargs, ): r""" Predict the target values for the input data. The dataset is loaded to a DataLoader with the provided keyword arguments. The model is set to evaluation mode and the predictions are made using the input data. The output can be rescaled using the dataset scaler. Args: X (BaseDataset): The dataset whose target values are to be predicted using the input data. rescale_output (bool): Whether to rescale the output with the scaler of the dataset (default: ``True``). kwargs (dict, optional): Additional keyword arguments to pass to the DataLoader. Can be used to set the parameters of the DataLoader (see PyTorch documentation at https://pytorch.org/docs/stable/data.html#torch.utils.data.DataLoader): - batch_size (int, optional): Batch size (default: ``32``). - shuffle (bool, optional): Shuffle the data (default: ``True``). - num_workers (int, optional): Number of workers to use (default: ``0``). - pin_memory (bool, optional): Pin memory (default: ``True``). Returns: Tuple [np.ndarray, np.ndarray]: The predictions and the true target values. """ dataloader_params = { "batch_size": 2**5, "shuffle": False, "num_workers": 0, "pin_memory": True, } for key in dataloader_params.keys(): if key in kwargs: dataloader_params[key] = kwargs[key] predict_dataloader = DataLoader(X, **dataloader_params) total_rows = len(predict_dataloader.dataset) num_columns = self.noutput all_predictions = np.empty((total_rows, num_columns)) all_targets = np.empty((total_rows, num_columns)) self.eval() start_idx = 0 with torch.no_grad(): for x, y in predict_dataloader: output = self(x.to(self.device)) batch_size = x.size(0) end_idx = start_idx + batch_size all_predictions[start_idx:end_idx, :] = output.cpu().numpy() all_targets[start_idx:end_idx, :] = y.cpu().numpy() start_idx = end_idx if rescale_output: all_predictions = X.rescale_y(np.array(all_predictions)) all_targets = X.rescale_y(np.array(all_targets)) if return_targets: return all_predictions, all_targets else: return all_predictions
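# --- Example (illustrative sketch, not part of the module) -------------------------------------
# Prediction call. `test_ds` is assumed to be a cetaceo BaseDataset that yields (x, y) pairs and
# exposes a rescale_y() method, as predict() expects.
preds, targets = example_model.predict(
    test_ds,
    rescale_output=True,
    return_targets=True,
    batch_size=64,
    shuffle=False,
)
rmse = np.sqrt(np.mean((preds - targets) ** 2))
print(f"RMSE on test set: {rmse:.4e}")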
[docs] def define_checkpoint(self): checkpoint = { "ninput": self.ninput, "noutput": self.noutput, "nlayers": self.nlayers, "hidden_neur": self.hidden_neur, "layer_type": self.layer_type, "model_name": self.model_name, "dropout_p": self.dropout_p, "degree": self.degree, "state_dict": self.state_dict(), } return checkpoint
    def save(
        self,
        path: str,
        version: int = 0
    ):
        r"""
        Save the model to a checkpoint file.

        Args:
            path (str): Directory where the model is saved. The checkpoint file is named after
                the model (``<model_name>.pth``) and the directory is created if it does not exist.
            version (int): An integer describing the model's version. Zero means no version
                (currently not used when building the filename).
        """
        checkpoint = self.define_checkpoint()
        if not os.path.exists(path):
            os.makedirs(path)
        filename = f'{self.model_name}' + '.pth'
        path = os.path.join(path, filename)
        torch.save(checkpoint, path)
    @classmethod
    def load(
        cls,
        path: str,
        device: torch.device = torch.device("cpu")
    ):
        """
        Loads a model from a checkpoint file.

        Args:
            path (str): Path to the checkpoint file.
            device (torch.device): Device where the model is loaded (default: cpu).

        Returns:
            model (KANupm): The loaded KAN model with the trained weights.
        """
        print('Loading model...')
        checkpoint = torch.load(path, map_location=device)
        state_dict = checkpoint['state_dict']
        if 'device' in checkpoint:
            del checkpoint['device']
        del checkpoint['state_dict']
        model = cls(device=device, **checkpoint)
        model.load_state_dict(state_dict)
        print(f"Loaded model KAN: {checkpoint['model_name']}")
        keys_print = ['ninput', 'noutput', 'nlayers', 'hidden_neur', 'layer_type', 'dropout_p', 'degree']
        for key in keys_print:
            print(f"    {key}: {checkpoint[key]}")
        return model
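# --- Example (illustrative sketch, not part of the module) -------------------------------------
# Checkpoint round trip: save() writes <path>/<model_name>.pth with the fields from
# define_checkpoint(), and load() rebuilds the model from them. Paths are arbitrary examples.
example_model.save("./checkpoints")                       # creates ./checkpoints/kan_example.pth
restored = KANupm.load("./checkpoints/kan_example.pth",
                       device=torch.device("cpu"))
with torch.no_grad():
    x = torch.rand(4, restored.ninput) * 2 - 1
    assert torch.allclose(example_model(x), restored(x))  # same architecture and weights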
    def print_hours(self, start, end, epochs=None):
        """
        Takes two time stamps and prints the elapsed interval in hh:mm:ss format, together with
        the average training speed per epoch when ``epochs`` is given.
        """
        total = end - start
        hours, remainder = divmod(total, 3600)
        minutes, seconds = divmod(remainder, 60)
        if epochs is not None:
            vel = total / epochs
            if vel < 60:
                print(f'Training speed for {self.model_name}: {vel:.2f} sec/epoch')
            elif vel < 3600:
                print(f'Training speed for {self.model_name}: {vel/60:.2f} min/epoch')
            else:
                print(f'Training speed for {self.model_name}: {vel/3600:.2f} hours/epoch')
        print(f'Total time for {self.model_name}: {int(hours):02}:{int(minutes):02}:{int(seconds):02}')
    def count_trainable_params(self):
        """
        Returns the total number of trainable parameters of the model.

        Returns:
            int: Number of trainable parameters.
        """
        return sum(p.numel() for p in self.parameters() if p.requires_grad)
[docs] def get_weights( self, layer_idx: int, neuron_idx: int, print_info: bool = False, ploted: bool = False ): """ Retrieve the weights of a specific neuron in a given layer. Args: layer_idx (int): The index of the layer to inspect. neuron_idx (int): The index of the neuron in that layer. print_info (bool): If True, prints weight information. ploted (bool): If True, plots the corresponding Chebyshev function. Returns: neuron_weights (torch.Tensor): Tensor containing the weights. """ if layer_idx == 0: # Input layer layer = self.input layer_name = "Input Layer" elif layer_idx == self.nlayers + 1: # Output layer layer = self.output layer_name = "Output Layer" else: # Hidden layers if layer_idx * 2 >= len(self.kan_layers): # Each layer has dropout raise ValueError(f"Layer index {layer_idx} is out of range.") layer = self.kan_layers[layer_idx * 2] # Get the actual Chebyshev layer layer_name = f"Hidden Layer {layer_idx}" # Check if neuron index is valid if neuron_idx >= layer.cheby_coeffs.shape[1]: # Out of bounds raise ValueError(f"Neuron index {neuron_idx} exceeds layer size {layer.cheby_coeffs.shape[1]}.") # Extract weights for this neuron neuron_weights = layer.cheby_coeffs[:, neuron_idx, :].mean(dim=0) # Take mean across input_dim if print_info: print(f"Location: {layer_name}, Neuron: {neuron_idx}") print(f"Layer Shape: {neuron_weights.shape}") print(f"Weights for Neuron {neuron_idx}:\n{neuron_weights}") if ploted: # Create tensor for x values in range [-1, 1] x = torch.linspace(-1, 1, steps=100, device=neuron_weights.device) # Initialize Chebyshev polynomials T = torch.ones((100, neuron_weights.shape[0]), device=neuron_weights.device) if neuron_weights.shape[0] > 1: T[:, 1] = x # T1(x) = x for i in range(2, neuron_weights.shape[0]): # Generate higher-order Chebyshev polynomials T[:, i] = 2 * x * T[:, i - 1] - T[:, i - 2] # Tn(x) = 2xTn-1(x) - Tn-2(x) # Compute the final function using tensor operations y = T @ neuron_weights # Weighted sum of Chebyshev basis functions fig, ax = plt.subplots(figsize=(10, 6)) # Plot final function ax.plot(x.cpu(), y.detach().cpu(), label="Chebyshev Function (Final)", color="black", linewidth=2) # Plot individual Chebyshev basis functions for i in range(neuron_weights.shape[0]): ax.plot(x.cpu(), (T[:, i] * neuron_weights[i]).detach().cpu(), linestyle="--", alpha=0.6, label=f"T{i}(x) * w{i}") ax.set_title(f"Chebyshev Polynomial Expansion for Neuron {neuron_idx} in {layer_name}") ax.set_xlabel("x (normalized input)") ax.set_ylabel("Function Output") ax.legend() ax.grid(True) plt.show() return neuron_weights
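# --- Example (illustrative sketch, not part of the module) -------------------------------------
# Inspecting one neuron. This assumes the model was built with a Chebyshev layer type, since
# get_weights() reads the layer's cheby_coeffs attribute.
w = example_model.get_weights(layer_idx=0, neuron_idx=0, print_info=True, ploted=False)
# The returned vector holds the (input-averaged) coefficients w_i of the neuron's expansion
#   f(x) = sum_i w_i * T_i(x),  with T_i the Chebyshev polynomials on [-1, 1].
x = torch.linspace(-1, 1, 5)
f = sum(wi * torch.cos(i * torch.acos(x)) for i, wi in enumerate(w))  # same expansion via T_i(x) = cos(i*acos(x))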
    def plot_structure(
        self,
        save_path: str = None,
        labels: bool = False,
        figsize: tuple = (12, 6),
    ):
        """
        Visualizes the KAN model as a graph using igraph.

        Args:
            save_path (str, optional): Directory where the figure is saved as
                ``structure_<model_name>.jpg``. If ``None``, the figure is only displayed.
            labels (bool, optional): Whether to label each neuron in the plot (default: ``False``).
            figsize (tuple, optional): Size of the matplotlib figure (default: ``(12, 6)``).

        Returns:
            None (displays the graph).
        """
        # NOTE: some finishing touches are still pending.
        import igraph as ig

        layer_sizes = [self.ninput] + [self.hidden_neur] * self.nlayers + [self.noutput]

        # Create graph
        g = ig.Graph(directed=True)

        # Assign neuron positions
        positions = {}
        layers = len(layer_sizes)
        for layer, num_neurons in enumerate(layer_sizes):
            for neuron in range(num_neurons):
                g.add_vertex(name=f"L{layer}_N{neuron}")
                positions[f"L{layer}_N{neuron}"] = (layer, -neuron)  # X: layer index, Y: neuron index (negative for order)

        # Add edges (connections between layers): input -> hidden layers -> output
        for layer in range(layers - 1):
            prev_layer_size = layer_sizes[layer]
            curr_layer_size = layer_sizes[layer + 1]
            for prev_neuron in range(prev_layer_size):
                for curr_neuron in range(curr_layer_size):
                    g.add_edge(f"L{layer}_N{prev_neuron}", f"L{layer+1}_N{curr_neuron}")

        # Plot
        fig, ax = plt.subplots(figsize=figsize)
        layout = [positions[v["name"]] for v in g.vs]  # Extract positions
        vertex_label = [v["name"] for v in g.vs] if labels else []
        ig.plot(
            g, target=ax, layout=layout, vertex_size=30, vertex_label=vertex_label,
            edge_color="gray", vertex_label_dist=-1.2, vertex_label_size=12,
            edge_size=20, vertex_color="lightblue"
        )
        plt.title(f"Structure of KAN Model: {self.model_name}")
        if save_path is not None:
            if os.path.isdir(save_path):
                print(f"Saving structure figure on path {save_path}")
            else:
                print("Path not found. Saving structure figure on local folder (.)")
                save_path = '.'
            fig.savefig(save_path + f"/structure_{self.model_name}.jpg")
        plt.show()
[docs] class SineLayer(nn.Module): def __init__(self, input_size, output_size, sigma=1.0): super(SineLayer, self).__init__() self.linear = nn.Linear(input_size, output_size) self.sigma = sigma self.init_weights()
[docs] def init_weights(self): nn.init.normal_(self.linear.weight, mean=0.0, std=self.sigma) nn.init.zeros_(self.linear.bias)
[docs] def forward(self, x): return torch.sin( self.linear(x))
[docs] class ChebyLayer_v3(nn.Module): def __init__(self, input_dim, output_dim, degree): super(ChebyLayer_v3, self).__init__() self.inputdim = input_dim self.outdim = output_dim self.degree = degree self.cheby_coeffs = nn.Parameter(torch.empty(input_dim, output_dim, degree + 1)) nn.init.normal_(self.cheby_coeffs, mean=0.0, std=1 / (input_dim * (degree + 1))) self.register_buffer("arange", torch.arange(0, degree + 1, 1))
[docs] def forward(self, x): # Since Chebyshev polynomial is defined in [-1, 1] # We need to normalize x to [-1, 1] using tanh #x = torch.tanh(x) # View and repeat input degree + 1 times x = x.view((-1, self.inputdim, 1)).expand( -1, -1, self.degree + 1 ) # shape = (batch_size, inputdim, self.degree + 1) # Apply acos x = x.acos() # Multiply by arange [0 .. degree] x *= self.arange # Apply cos x = x.cos() # Compute the Chebyshev interpolation y = torch.einsum( "bid,iod->bo", x, self.cheby_coeffs ) # shape = (batch_size, outdim) y = y.view(-1, self.outdim) return y
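# --- Illustrative check (not part of the module) -------------------------------------------------
# The trigonometric identity behind ChebyLayer_v3: on [-1, 1] the Chebyshev polynomials satisfy
# T_n(x) = cos(n * arccos(x)). Note that the tanh normalization is commented out above, so the
# layer expects inputs that already lie in [-1, 1].
x = torch.linspace(-0.99, 0.99, 7)
t2_trig = torch.cos(2 * torch.acos(x))   # T_2 via the identity used in forward()
t2_poly = 2 * x**2 - 1                   # T_2 written as a polynomial
assert torch.allclose(t2_trig, t2_poly, atol=1e-6)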
[docs] class ChebyLayer_v2(nn.Module): def __init__(self, input_dim, output_dim, degree): super(ChebyLayer_v2, self).__init__() self.inputdim = input_dim self.outdim = output_dim self.degree = degree self.cheby_coeffs = nn.Parameter(torch.empty(input_dim, output_dim, degree + 1)) nn.init.normal_(self.cheby_coeffs, mean=0.0, std=1/(input_dim * (degree + 1)))
[docs] def forward(self, x): x = torch.reshape(x, (-1, self.inputdim)) # shape = (batch_size, inputdim) # Since Chebyshev polynomial is defined in [-1, 1] # We need to normalize x to [-1, 1] using tanh #x = torch.tanh(x) # Initialize Chebyshev polynomial tensors cheby = torch.ones(x.shape[0], self.inputdim, self.degree + 1, device=x.device) if self.degree > 0: cheby[:, :, 1] = x for i in range(2, self.degree + 1): cheby[:, :, i] = 2 * x * cheby[:, :, i - 1].clone() - cheby[:, :, i - 2].clone() # Compute the Chebyshev interpolation y = torch.einsum('bid,iod->bo', cheby, self.cheby_coeffs) # shape = (batch_size, outdim) y = y.view(-1, self.outdim) return y
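# --- Illustrative check (not part of the module) -------------------------------------------------
# With identical coefficients, the recurrence-based ChebyLayer_v2 and the acos-based ChebyLayer_v3
# compute the same mapping for inputs inside [-1, 1].
torch.manual_seed(0)
layer_v2 = ChebyLayer_v2(input_dim=3, output_dim=2, degree=4)
layer_v3 = ChebyLayer_v3(input_dim=3, output_dim=2, degree=4)
layer_v3.cheby_coeffs.data.copy_(layer_v2.cheby_coeffs.data)  # share the same coefficients
x = torch.rand(5, 3) * 2 - 1
assert torch.allclose(layer_v2(x), layer_v3(x), atol=1e-5)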
[docs] class ChebyLayer_ant(nn.Module): def __init__(self, input_dim, output_dim, degree): super(ChebyLayer_ant, self).__init__() self.inputdim = input_dim self.outdim = output_dim self.degree = degree self.cheby_coeffs = nn.Parameter(torch.empty(input_dim, output_dim, degree + 1)) nn.init.normal_(self.cheby_coeffs, mean=0.0, std=1 / (input_dim * (degree + 1))) self.register_buffer("arange", torch.arange(0, degree + 1, 1))
[docs] def forward(self, x): # Since Chebyshev polynomial is defined in [-1, 1] # We need to normalize x to [-1, 1] using tanh # x = torch.tanh(x) # View and repeat input degree + 1 times x = x.view((-1, self.inputdim, 1)).expand( -1, -1, self.degree + 1 ) # shape = (batch_size, inputdim, self.degree + 1) # Apply acos x = x.acos() # Multiply by arange [0 .. degree] x *= self.arange # Apply cos x = x.cos() # Compute the Chebyshev interpolation y = torch.einsum( "bid,iod->bo", x, self.cheby_coeffs ) # shape = (batch_size, outdim) y = y.view(-1, self.outdim) return y
[docs] class JacobiLayer(nn.Module): def __init__(self, input_dim, output_dim, degree, a=1.0, b=1.0): super(JacobiLayer, self).__init__() self.inputdim = input_dim self.outdim = output_dim self.a = a self.b = b self.degree = degree self.jacobi_coeffs = nn.Parameter(torch.empty(input_dim, output_dim, degree + 1)) nn.init.normal_(self.jacobi_coeffs, mean=0.0, std=1/(input_dim * (degree + 1)))
    def forward(self, x):
        x = torch.reshape(x, (-1, self.inputdim))  # shape = (batch_size, inputdim)
        # Since the Jacobi polynomials are defined on [-1, 1],
        # x may need to be normalized to [-1, 1], e.g. with tanh
        # x = torch.tanh(x)

        # Initialize Jacobi polynomial tensors
        jacobi = torch.ones(x.shape[0], self.inputdim, self.degree + 1, device=x.device)
        if self.degree > 0:
            # degree 0: jacobi[:, :, 0] = 1 (already initialized); degree 1: linear term below
            jacobi[:, :, 1] = ((self.a - self.b) + (self.a + self.b + 2) * x) / 2
        for i in range(2, self.degree + 1):
            # Three-term recurrence for the Jacobi polynomials P_i^(a,b)(x)
            theta_k = (2*i + self.a + self.b) * (2*i + self.a + self.b - 1) / (2*i * (i + self.a + self.b))
            theta_k1 = (2*i + self.a + self.b - 1) * (self.a*self.a - self.b*self.b) / (2*i * (i + self.a + self.b) * (2*i + self.a + self.b - 2))
            theta_k2 = (i + self.a - 1) * (i + self.b - 1) * (2*i + self.a + self.b) / (i * (i + self.a + self.b) * (2*i + self.a + self.b - 2))
            jacobi[:, :, i] = (theta_k * x + theta_k1) * jacobi[:, :, i - 1].clone() - theta_k2 * jacobi[:, :, i - 2].clone()

        # Compute the Jacobi interpolation
        y = torch.einsum('bid,iod->bo', jacobi, self.jacobi_coeffs)  # shape = (batch_size, outdim)
        y = y.view(-1, self.outdim)
        return y
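# --- Illustrative check (not part of the module) -------------------------------------------------
# Sanity check of the recurrence: with a = b = 0 the Jacobi polynomials reduce to the Legendre
# polynomials. For i = 2 the coefficients become theta_k = 1.5, theta_k1 = 0.0, theta_k2 = 0.5,
# which gives P_2(x) = 1.5*x*P_1(x) - 0.5*P_0(x) = (3x^2 - 1)/2.
x = torch.linspace(-1, 1, 5)
p0 = torch.ones_like(x)
p1 = x                                  # ((a-b) + (a+b+2)x)/2 with a = b = 0
p2 = (1.5 * x + 0.0) * p1 - 0.5 * p0    # one recurrence step
assert torch.allclose(p2, (3 * x**2 - 1) / 2, atol=1e-6)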
[docs] class TaylorLayer(nn.Module): def __init__(self, input_dim, out_dim, degree, addbias=True): super(TaylorLayer, self).__init__() self.input_dim = input_dim self.out_dim = out_dim self.degree = degree self.addbias = addbias self.coeffs = nn.Parameter(torch.randn(out_dim, input_dim, degree) * 0.01) if self.addbias: self.bias = nn.Parameter(torch.zeros(1, out_dim))
[docs] def forward(self, x): shape = x.shape outshape = shape[0:-1] + (self.out_dim,) x = torch.reshape(x, (-1, self.input_dim)) x_expanded = x.unsqueeze(1).expand(-1, self.out_dim, -1) y = torch.zeros((x.shape[0], self.out_dim), device=x.device) for i in range(self.degree): term = (x_expanded ** i) * self.coeffs[:, :, i] y += term.sum(dim=-1) if self.addbias: y += self.bias y = torch.reshape(y, outshape) return y
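# --- Illustrative check (not part of the module) -------------------------------------------------
# What TaylorLayer computes: for each output it sums a truncated power series over every input,
# y_o = bias_o + sum_i sum_{k < degree} coeffs[o, i, k] * x_i**k. Note that `degree` counts the
# number of terms, so the highest power used is degree - 1.
layer_t = TaylorLayer(input_dim=1, out_dim=1, degree=3, addbias=False)
with torch.no_grad():
    layer_t.coeffs[0, 0, :] = torch.tensor([1.0, 2.0, 3.0])   # 1 + 2x + 3x^2
x = torch.tensor([[0.5]])
expected = 1.0 + 2.0 * 0.5 + 3.0 * 0.5**2                      # = 2.75
assert torch.allclose(layer_t(x), torch.tensor([[expected]]))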
class MakeDataset_kan(Dataset):
    def __init__(self, tensor_data):
        """
        Initializes the dataset with the data generated by create_final_tensor.

        Args:
            tensor_data (dict): Dictionary with the keys:
                - 'scaled' (torch.Tensor): Tensor of normalized inputs.
                - 'tensor' (torch.Tensor): Original tensor (used to obtain coordinates).
                - 'info' (dict): Dictionary with additional information about the data.
        """
        self.inputs = tensor_data['scaled'][:, :tensor_data['info']['ninputs']]
        self.outputs = tensor_data['scaled'][:, -tensor_data['info']['noutputs']:]

    def __len__(self):
        """
        Returns the number of cases in the dataset.

        Returns:
            int: Number of rows in the data.
        """
        return len(self.inputs)

    def __getitem__(self, idx):
        """
        Returns the (input, output) pair at index `idx`.

        Args:
            idx (int): Index of the case to access.

        Returns:
            Tuple[torch.Tensor, torch.Tensor]: (input, output) pair at the given index.
        """
        x = self.inputs[idx]
        y = self.outputs[idx]
        return x, y
class MakeDatasetScored_kan(Dataset):
    def __init__(
        self,
        tensor_data
    ):
        """
        Initializes the dataset with the data generated by create_final_tensor_scored.

        Args:
            tensor_data (dict): Dictionary with the keys:
                - 'scaled' (torch.Tensor): Tensor of normalized inputs.
                - 'score' (torch.Tensor): Tensor of output scores.
                - 'info' (dict): Dictionary with additional information about the data.
                - 'tensor' (torch.Tensor): Original tensor (used to obtain coordinates).
        """
        self.inputs = tensor_data['scaled'][:, :tensor_data['info']['ninputs']]
        self.score = tensor_data['score']
        self.outputs = tensor_data['scaled'][:, -tensor_data['info']['noutputs']:]
        self.points = tensor_data['tensor'][:, :3]

    def __len__(self):
        """
        Returns the number of cases in the dataset.
        """
        return len(self.inputs)

    def __getitem__(self, idx):
        """
        Returns the (input, score, output) triple at index `idx`.

        Args:
            idx (int): Index of the case.

        Returns:
            Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: (input, score, output).
        """
        x = self.inputs[idx]
        s = self.score[idx]
        y = self.outputs[idx]
        return x, s, y
class MakeDatasetScored_kan_ant(Dataset):
    def __init__(self, inputs: torch.Tensor, score: torch.Tensor, outputs: torch.Tensor):
        """
        Initializes the dataset with the input and output data.

        Args:
            inputs (torch.Tensor): Input tensor (n cases, ninput columns).
            score (torch.Tensor): Tensor of output scores (n cases, noutput columns).
            outputs (torch.Tensor): Output tensor (n cases, noutput columns).
        """
        self.inputs = inputs
        self.score = score
        self.outputs = outputs

    def __len__(self):
        """
        Returns the number of cases in the dataset.

        Returns:
            int: Number of rows in the data.
        """
        return len(self.inputs)

    def __getitem__(self, idx):
        """
        Returns the (input, score, output) triple at index `idx`.

        Args:
            idx (int): Index of the case to access.

        Returns:
            Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: (input, score, output) triple at the given index.
        """
        x = self.inputs[idx]
        s = self.score[idx]
        y = self.outputs[idx]
        # Drop an unnecessary trailing dimension in the outputs, if present
        if y.ndim > 1 and y.shape[-1] == 1:
            y = y.squeeze(-1)
        return x, s, y
class NormPLoss(nn.Module):
    def __init__(self):
        """
        Initializes the p-norm based loss function.
        """
        super(NormPLoss, self).__init__()
    def forward(self, input, score, target, p=2):
        """
        Computes the p-norm of the loss vector weighted by `score`.

        Args:
            input (torch.Tensor): Model predictions.
            score (torch.Tensor): Importance weights.
            target (torch.Tensor): True values.
            p (float, optional): Order of the p-norm (default: ``2``).

        Returns:
            torch.Tensor: Loss based on the p-norm.
        """
        # p-norm with score as the weighting
        loss_vector = score * torch.abs(input - target)
        loss = torch.sum(loss_vector**p) ** (1 / p)
        return loss
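# --- Worked example (illustrative values only) --------------------------------------------------
# With scores s = [1, 2] and absolute errors |x - y| = [0.5, 0.25], the weighted p-norm for p = 2 is
#   ((1*0.5)**2 + (2*0.25)**2) ** 0.5 = (0.25 + 0.25) ** 0.5 ≈ 0.7071.
criterion = NormPLoss()
pred = torch.tensor([1.5, 2.25])
truth = torch.tensor([1.0, 2.0])
weights = torch.tensor([1.0, 2.0])
print(criterion(pred, weights, truth, p=2))   # tensor(0.7071)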
class StatisticalLoss(nn.Module):
    def __init__(self):
        """
        Initializes the loss function that returns the mean and variance of the absolute error.
        """
        super(StatisticalLoss, self).__init__()
    def forward(self, input, target):
        """
        Computes the mean and the variance of the absolute errors between `input` and `target`.

        Args:
            input (torch.Tensor): Model predictions.
            target (torch.Tensor): True values.

        Returns:
            tuple: (mean of the error, variance of the error)
        """
        error = torch.abs(input - target)            # absolute error
        mean_error = torch.mean(error)               # mean of the error
        var_error = torch.var(error, unbiased=True)  # variance of the error
        return mean_error, var_error