import torch
import torch.nn as nn
import time
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
import numpy as np
import os
from cetaceo.data import BaseDataset
class KANupm(nn.Module):
def __init__(
self,
ninput: int,
noutput: int,
nlayers: int,
hidden_neur: int,
layer_type,
name_model: str,
id="kan",
dropout_p:float =0,
device: torch.device = torch.device("cpu"),
**layer_kwargs):
super(KANupm, self).__init__()
self.ninput = ninput
self.noutput = noutput
self.nlayers = nlayers
self.hidden_neur = hidden_neur
self.layer_type = layer_type
self.name_model = name_model
self.id = id
self.dropout_p = dropout_p
self.device = device
        # Hidden layers, each followed by dropout
hidden_layers = []
for _ in range(nlayers):
hidden_layers.append(layer_type(hidden_neur, hidden_neur, **layer_kwargs))
hidden_layers.append(nn.Dropout(p=dropout_p))
self.kan_layers = nn.ModuleList(hidden_layers)
        # Input and output layers
self.input = layer_type(ninput, hidden_neur, **layer_kwargs)
self.output = layer_type(hidden_neur, noutput, **layer_kwargs)
self.to(self.device)
def forward(self, x):
x = self.input(x)
for layer in self.kan_layers:
x = layer(x)
x = self.output(x)
return x
def fit(self,
train_dataset,
eval_dataset,
epochs,
batch,
lr,
lr_gamma,
lr_scheduler_step,
print_eval_rate=2,
criterion=nn.MSELoss(),
folder_save=None):
start_n_time = time.time()
train_loader = DataLoader(train_dataset, batch_size=batch, shuffle=True)
test_loader = DataLoader(eval_dataset, batch_size=batch, shuffle=False)
        train_losses = torch.tensor([], dtype=torch.float64, device=self.device)
        test_losses = torch.tensor([], dtype=torch.float64, device=self.device)
optimizer = optim.Adam(self.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=lr_scheduler_step, gamma=lr_gamma)
for epoch in range(epochs):
self.train()
train_loss = 0.0
for inputs, targets in train_loader:
inputs, targets = inputs.float().to(self.device), targets.float().to(self.device)
optimizer.zero_grad()
outputs = self(inputs)
                # targets arrive as a 1-D batch, so add a feature axis to match the (batch, noutput) output
                loss = criterion(outputs, targets.unsqueeze(1))
loss.backward()
optimizer.step()
train_loss += loss.item()
train_loss /= len(train_loader)
train_losses = torch.cat((train_losses, torch.tensor([train_loss], dtype=torch.float64, device=self.device)))
scheduler.step()
if (epoch + 1) % print_eval_rate == 0:
self.eval()
test_loss = 0.0
with torch.no_grad():
for inputs, targets in test_loader:
inputs, targets = inputs.float().to(self.device), targets.float().to(self.device)
outputs = self(inputs)
loss = criterion(outputs, targets.unsqueeze(1))
test_loss += loss.item()
test_loss /= len(test_loader)
test_losses = torch.cat((test_losses, torch.tensor([test_loss], dtype=torch.float64, device=self.device)))
print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')
if folder_save is not None:
train_losses_np = train_losses.cpu().numpy()
test_losses_np = test_losses.cpu().numpy()
            np.save(os.path.join(folder_save, f"train_losses_{self.name_model}.npy"), train_losses_np)
            np.save(os.path.join(folder_save, f"test_losses_{self.name_model}.npy"), test_losses_np)
end_n_time = time.time()
        print(f'Training time for {self.name_model}: {(end_n_time - start_n_time)/60:.2f} minutes')
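    # A minimal call sketch (the dataset variables and every hyperparameter value
    # below are illustrative assumptions, not library defaults):
    #
    #   model.fit(train_dataset, eval_dataset, epochs=100, batch=32, lr=1e-3,
    #             lr_gamma=0.9, lr_scheduler_step=10, print_eval_rate=10,
    #             folder_save="results/")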
def exam(self,
data: torch.Tensor,
**kwargs,
):
"""
Recibe un tensor con datos de test y resultados. Devuelve un array de numpy de predicciones y otro de valores verdaderos para comparar.
Args:
data (torch.Tensor): Tensor con (ninput + noutput) columnas y n filas, siendo n los casos a evaluar.
Returns:
Torch Dataset: Dataset con ninputs como columnas de entrada y noutputs como columnas de salida.
"""
        assert data.shape[1] == (self.ninput + self.noutput), \
            f"The data tensor must have {self.ninput + self.noutput} columns (ninput + noutput)."
        # Split the data into inputs (first ninput columns) and outputs (remaining noutput columns)
inputs = data[:, :self.ninput]
outputs = data[:, self.ninput:]
        # Wrap the inputs and outputs in a PyTorch Dataset
dataset = MakeDataset_kan(inputs, outputs)
dataloader_params = {
"batch_size": 2**5,
"shuffle": False,
"num_workers": 0,
"pin_memory": True,
}
for key in dataloader_params.keys():
if key in kwargs:
dataloader_params[key] = kwargs[key]
loader = DataLoader(dataset, **dataloader_params)
        outs = []
        targ = []
        self.eval()
        with torch.no_grad():
            for inputs, targets in loader:
                inputs, targets = inputs.to(self.device), targets.to(self.device)
                outputs = self(inputs)
                # Move to CPU before converting, so this also works on GPU devices
                outs.extend(outputs.cpu().numpy())
                targ.extend(targets.cpu().numpy())
        outs = np.array(outs)
        targ = np.array(targ)
        return outs, targ
def predict(
self,
X: BaseDataset,
rescale_output: bool = True,
return_targets: bool = False,
**kwargs,
):
r"""
Predict the target values for the input data. The dataset is loaded to a DataLoader with the provided keyword arguments.
The model is set to evaluation mode and the predictions are made using the input data. The output can be rescaled using
the dataset scaler.
Args:
X (BaseDataset): The dataset whose target values are to be predicted using the input data.
            rescale_output (bool): Whether to rescale the output with the scaler of the dataset (default: ``True``).
            return_targets (bool): Whether to also return the true target values (default: ``False``).
            kwargs (dict, optional): Additional keyword arguments to pass to the DataLoader. Can be used to set the parameters of the DataLoader (see PyTorch documentation at https://pytorch.org/docs/stable/data.html#torch.utils.data.DataLoader):
                - batch_size (int, optional): Batch size (default: ``32``).
                - shuffle (bool, optional): Shuffle the data (default: ``False``).
                - num_workers (int, optional): Number of workers to use (default: ``0``).
                - pin_memory (bool, optional): Pin memory (default: ``True``).
Returns:
Tuple [np.ndarray, np.ndarray]: The predictions and the true target values.
"""
dataloader_params = {
"batch_size": 2**5,
"shuffle": False,
"num_workers": 0,
"pin_memory": True,
}
for key in dataloader_params.keys():
if key in kwargs:
dataloader_params[key] = kwargs[key]
predict_dataloader = DataLoader(X, **dataloader_params)
total_rows = len(predict_dataloader.dataset)
num_columns = self.noutput
all_predictions = np.empty((total_rows, num_columns))
all_targets = np.empty((total_rows, num_columns))
self.eval()
start_idx = 0
with torch.no_grad():
for x, y in predict_dataloader:
output = self(x.to(self.device))
batch_size = x.size(0)
end_idx = start_idx + batch_size
all_predictions[start_idx:end_idx, :] = output.cpu().numpy()
all_targets[start_idx:end_idx, :] = y.cpu().numpy()
start_idx = end_idx
        if rescale_output:
            all_predictions = X.rescale_y(all_predictions)
            all_targets = X.rescale_y(all_targets)
if return_targets:
return all_predictions, all_targets
else:
return all_predictions
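    # A minimal call sketch (the dataset variable and the batch_size override are
    # illustrative assumptions):
    #
    #   preds, targets = model.predict(test_dataset, rescale_output=True,
    #                                  return_targets=True, batch_size=64)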
def save(
self,
path: str,
):
checkpoint = {
"ninput": self.ninput,
"noutput": self.noutput,
"nlayers": self.nlayers,
"hidden_neur": self.hidden_neur,
"layer_type": self.layer_type,
"name_model": self.name_model,
"id": self.id,
"dropout": self.dropout_p,
"device": self.device,
"state_dict": self.state_dict(),
}
if isinstance(self.input, TaylorLayer):
checkpoint['order'] = self.input.order
elif hasattr(self.input, 'degree'):
checkpoint['degree'] = self.input.degree
        os.makedirs(path, exist_ok=True)
        filename = f'{self.name_model}.pth'
        path = os.path.join(path, filename)
        torch.save(checkpoint, path)
@classmethod
def load(cls, path: str, device: torch.device = torch.device("cpu")):
"""
Cargar el modelo desde un archivo checkpoint.
Args:
path (str): Ruta del archivo de checkpoint.
device (torch.device): El dispositivo donde cargar el modelo.
Returns:
KANupm: El modelo cargado con los pesos restaurados.
"""
print('Loading model...')
checkpoint = torch.load(path, map_location=device)
if checkpoint['layer_type'] == TaylorLayer:
order = checkpoint['order']
layer_kwargs = {'order': order}
else:
degree = checkpoint['degree']
layer_kwargs = {'degree': degree}
model = cls(
ninput=checkpoint['ninput'],
noutput=checkpoint['noutput'],
            nlayers=checkpoint['nlayers'],
hidden_neur=checkpoint['hidden_neur'],
layer_type=checkpoint['layer_type'],
name_model=checkpoint['name_model'],
id=checkpoint['id'],
dropout_p=checkpoint['dropout'],
device=device,
            **layer_kwargs  # Forward the layer-specific arguments
)
model.load_state_dict(checkpoint['state_dict'])
print(f"Loaded model KAN: {checkpoint['name_model']}")
keys_print=['ninput', 'noutput', 'nlayers', 'hidden_neur', 'layer_type', 'dropout']
for key in keys_print:
print(f" {key}: {checkpoint[key]}")
return model
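    # Save/load round trip, as a sketch (the checkpoint folder and file name are
    # illustrative assumptions; save() writes <path>/<name_model>.pth):
    #
    #   model.save("checkpoints/")
    #   model = KANupm.load("checkpoints/my_model.pth")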
class ChebyLayer(nn.Module):
def __init__(self, input_dim, output_dim, degree):
super(ChebyLayer, self).__init__()
self.inputdim = input_dim
self.outdim = output_dim
self.degree = degree
self.cheby_coeffs = nn.Parameter(torch.empty(input_dim, output_dim, degree + 1))
nn.init.normal_(self.cheby_coeffs, mean=0.0, std=1 / (input_dim * (degree + 1)))
self.register_buffer("arange", torch.arange(0, degree + 1, 1))
def forward(self, x):
        # Chebyshev polynomials are defined on [-1, 1]; inputs are assumed to already
        # lie in that range (acos returns NaN outside it). The tanh normalization is
        # left disabled here:
        # x = torch.tanh(x)
# View and repeat input degree + 1 times
x = x.view((-1, self.inputdim, 1)).expand(
-1, -1, self.degree + 1
) # shape = (batch_size, inputdim, self.degree + 1)
# Apply acos
x = x.acos()
# Multiply by arange [0 .. degree]
x *= self.arange
# Apply cos
x = x.cos()
        # Compute the Chebyshev expansion
y = torch.einsum(
"bid,iod->bo", x, self.cheby_coeffs
) # shape = (batch_size, outdim)
y = y.view(-1, self.outdim)
return y
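# Note: ChebyLayer evaluates Chebyshev polynomials of the first kind through the
# identity T_n(x) = cos(n * arccos(x)), valid for x in [-1, 1]: after expanding x to
# (batch_size, inputdim, degree + 1), the acos / multiply-by-arange / cos sequence
# yields T_n(x) for n = 0..degree in a single vectorized pass.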
class ChebyLayer_v2(nn.Module):
def __init__(self, input_dim, output_dim, degree):
super(ChebyLayer_v2, self).__init__()
self.inputdim = input_dim
self.outdim = output_dim
self.degree = degree
self.cheby_coeffs = nn.Parameter(torch.empty(input_dim, output_dim, degree + 1))
nn.init.normal_(self.cheby_coeffs, mean=0.0, std=1/(input_dim * (degree + 1)))
def forward(self, x):
x = torch.reshape(x, (-1, self.inputdim)) # shape = (batch_size, inputdim)
        # Chebyshev polynomials are defined on [-1, 1]; inputs are assumed to already
        # be normalized. The tanh normalization is left disabled here:
        # x = torch.tanh(x)
# Initialize Chebyshev polynomial tensors
cheby = torch.ones(x.shape[0], self.inputdim, self.degree + 1, device=x.device)
if self.degree > 0:
cheby[:, :, 1] = x
for i in range(2, self.degree + 1):
cheby[:, :, i] = 2 * x * cheby[:, :, i - 1].clone() - cheby[:, :, i - 2].clone()
        # Compute the Chebyshev expansion
y = torch.einsum('bid,iod->bo', cheby, self.cheby_coeffs) # shape = (batch_size, outdim)
y = y.view(-1, self.outdim)
return y
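# Note: ChebyLayer_v2 builds the same basis as ChebyLayer, but through the stable
# three-term recurrence T_0(x) = 1, T_1(x) = x, T_n(x) = 2x T_{n-1}(x) - T_{n-2}(x),
# which, unlike the arccos form, remains defined for inputs outside [-1, 1].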
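# Note: ChebyLayer_ant below is a line-for-line duplicate of ChebyLayer_v2,
# presumably an earlier revision kept under its old name for compatibility.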
class ChebyLayer_ant(nn.Module):
def __init__(self, input_dim, output_dim, degree):
super(ChebyLayer_ant, self).__init__()
self.inputdim = input_dim
self.outdim = output_dim
self.degree = degree
self.cheby_coeffs = nn.Parameter(torch.empty(input_dim, output_dim, degree + 1))
nn.init.normal_(self.cheby_coeffs, mean=0.0, std=1/(input_dim * (degree + 1)))
def forward(self, x):
x = torch.reshape(x, (-1, self.inputdim)) # shape = (batch_size, inputdim)
        # Chebyshev polynomials are defined on [-1, 1]; inputs are assumed to already
        # be normalized. The tanh normalization is left disabled here:
        # x = torch.tanh(x)
# Initialize Chebyshev polynomial tensors
cheby = torch.ones(x.shape[0], self.inputdim, self.degree + 1, device=x.device)
if self.degree > 0:
cheby[:, :, 1] = x
for i in range(2, self.degree + 1):
cheby[:, :, i] = 2 * x * cheby[:, :, i - 1].clone() - cheby[:, :, i - 2].clone()
        # Compute the Chebyshev expansion
y = torch.einsum('bid,iod->bo', cheby, self.cheby_coeffs) # shape = (batch_size, outdim)
y = y.view(-1, self.outdim)
return y
class JacobiLayer(nn.Module):
def __init__(self, input_dim, output_dim, degree, a=1.0, b=1.0):
super(JacobiLayer, self).__init__()
self.inputdim = input_dim
self.outdim = output_dim
self.a = a
self.b = b
self.degree = degree
self.jacobi_coeffs = nn.Parameter(torch.empty(input_dim, output_dim, degree + 1))
nn.init.normal_(self.jacobi_coeffs, mean=0.0, std=1/(input_dim * (degree + 1)))
def forward(self, x):
x = torch.reshape(x, (-1, self.inputdim)) # shape = (batch_size, inputdim)
        # Jacobi polynomials are defined on [-1, 1]; inputs are assumed to already
        # be normalized. The tanh normalization is left disabled here:
        # x = torch.tanh(x)
        # Initialize Jacobi polynomial tensors
jacobi = torch.ones(x.shape[0], self.inputdim, self.degree + 1, device=x.device)
        if self.degree > 0:  # the degree-0 term is the constant 1, already initialized above
            # P_1^{(a,b)}(x) = ((a - b) + (a + b + 2) x) / 2
            jacobi[:, :, 1] = ((self.a-self.b) + (self.a+self.b+2) * x) / 2
for i in range(2, self.degree + 1):
theta_k = (2*i+self.a+self.b)*(2*i+self.a+self.b-1) / (2*i*(i+self.a+self.b))
theta_k1 = (2*i+self.a+self.b-1)*(self.a*self.a-self.b*self.b) / (2*i*(i+self.a+self.b)*(2*i+self.a+self.b-2))
theta_k2 = (i+self.a-1)*(i+self.b-1)*(2*i+self.a+self.b) / (i*(i+self.a+self.b)*(2*i+self.a+self.b-2))
                jacobi[:, :, i] = (theta_k * x + theta_k1) * jacobi[:, :, i - 1].clone() - theta_k2 * jacobi[:, :, i - 2].clone()
        # Compute the Jacobi expansion
y = torch.einsum('bid,iod->bo', jacobi, self.jacobi_coeffs) # shape = (batch_size, outdim)
y = y.view(-1, self.outdim)
return y
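# Note: theta_k, theta_k1 and theta_k2 above implement the standard three-term
# recurrence for Jacobi polynomials P_n^{(a,b)},
#   2n(n+a+b)(2n+a+b-2) P_n(x) = (2n+a+b-1) [(2n+a+b)(2n+a+b-2) x + a^2 - b^2] P_{n-1}(x)
#                                - 2(n+a-1)(n+b-1)(2n+a+b) P_{n-2}(x),
# normalized by the leading factor; with a = b = 0 it reduces to the Legendre recurrence.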
class TaylorLayer(nn.Module):
def __init__(self, input_dim, out_dim, order, addbias=True):
super(TaylorLayer, self).__init__()
self.input_dim = input_dim
self.out_dim = out_dim
self.order = order
self.addbias = addbias
self.coeffs = nn.Parameter(torch.randn(out_dim, input_dim, order) * 0.01)
if self.addbias:
self.bias = nn.Parameter(torch.zeros(1, out_dim))
def forward(self, x):
shape = x.shape
outshape = shape[0:-1] + (self.out_dim,)
x = torch.reshape(x, (-1, self.input_dim))
x_expanded = x.unsqueeze(1).expand(-1, self.out_dim, -1)
y = torch.zeros((x.shape[0], self.out_dim), device=x.device)
for i in range(self.order):
term = (x_expanded ** i) * self.coeffs[:, :, i]
y += term.sum(dim=-1)
if self.addbias:
y += self.bias
y = torch.reshape(y, outshape)
return y
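# Note: TaylorLayer computes a learnable truncated power series per output,
#   y_o = bias_o + sum_i sum_{k=0}^{order-1} coeffs[o, i, k] * x_i**k,
# so the exponents run from 0 to order - 1: ``order`` counts terms, not the
# highest power.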
class MakeDataset_kan(Dataset):
def __init__(self,
inputs: torch.Tensor,
outputs: torch.Tensor):
"""
Inicializa el dataset con los datos de entrada y salida.
Args:
inputs (torch.Tensor): Tensor de entradas (n casos, ninput columnas).
outputs (torch.Tensor): Tensor de salidas (n casos, noutput columnas).
"""
self.inputs = inputs
self.outputs = outputs
def __len__(self):
"""
Retorna el número de casos en el dataset.
Returns:
int: Número de filas en los datos.
"""
return len(self.inputs)
def __getitem__(self, idx):
"""
Retorna un par (input, output) en el índice `idx`.
Args:
idx (int): Índice del caso que se quiere acceder.
Returns:
Tuple[torch.Tensor, torch.Tensor]: Par (input, output) en el índice dado.
"""
x = self.inputs[idx]
y = self.outputs[idx]
return x, y
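

# A minimal end-to-end sketch, assuming the module above is used as-is. The
# synthetic data, layer choice and every hyperparameter value below are
# illustrative assumptions, not recommended settings.
if __name__ == "__main__":
    torch.manual_seed(0)

    # Synthetic regression task: 3 inputs in [-1, 1], one scalar target per row
    X_data = torch.rand(512, 3) * 2 - 1
    y_data = X_data.sum(dim=1)  # shape (512,), matching the unsqueeze(1) in fit()
    train_ds = MakeDataset_kan(X_data[:384], y_data[:384])
    eval_ds = MakeDataset_kan(X_data[384:], y_data[384:])

    model = KANupm(
        ninput=3,
        noutput=1,
        nlayers=2,
        hidden_neur=16,
        layer_type=ChebyLayer_v2,  # recurrence-based, so safe outside [-1, 1]
        name_model="demo_kan",
        degree=4,  # forwarded to ChebyLayer_v2 through **layer_kwargs
    )
    model.fit(train_ds, eval_ds, epochs=20, batch=32, lr=1e-3,
              lr_gamma=0.9, lr_scheduler_step=5, print_eval_rate=5)

    # exam() expects a single tensor with ninput + noutput columns
    test_block = torch.cat([X_data[384:], y_data[384:].unsqueeze(1)], dim=1)
    preds, trues = model.exam(test_block)
    print("predictions:", preds.shape, "targets:", trues.shape)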