Source code for models.KANupm_v4_2

import torch
import torch.nn as nn
import time
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
import numpy as np
import os
from cetaceo.data import BaseDataset
import psutil
[docs] class KANupm(nn.Module):
    r"""
    KAN (Kolmogorov-Arnold Network) model for regression tasks.

    This model is based on https://arxiv.org/abs/2404.19756, inspired by the
    Kolmogorov-Arnold representation theorem.

    Args:
        ninput (int): The number of input features.
        noutput (int): The number of output features.
        nlayers (int): The number of hidden layers.
        hidden_neur (int): The number of neurons in the hidden layers.
        layer_type (nn.Module): The layer class used to build the model, e.g.
            ``JacobiLayer``, ``ChebyLayer_v2``, ``ChebyLayer_v3`` or ``TaylorLayer``.
        name_model (str): The name of the model.
        dropout_p (float, optional): The dropout probability (default: ``0.0``).
        device (torch.device, optional): The device where the model is loaded
            (default: ``cpu``).
        **layer_kwargs: Additional keyword arguments to pass to the layer type. For example,
            the order of the Taylor series or the degree of the Chebyshev polynomial.
    """

    def __init__(
        self,
        ninput: int,
        noutput: int,
        nlayers: int,
        hidden_neur: int,
        layer_type,
        name_model: str,
        id="kan",
        dropout_p: float = 0,
        device: torch.device = torch.device("cpu"),
        **layer_kwargs,
    ):
        super(KANupm, self).__init__()
        self.ninput = ninput
        self.noutput = noutput
        self.nlayers = nlayers
        self.hidden_neur = hidden_neur
        self.layer_type = layer_type
        self.name_model = name_model
        self.id = id
        self.dropout_p = dropout_p
        self.device = device

        # Hidden layers with dropout
        hidden_layers = []
        for _ in range(nlayers):
            hidden_layers.append(layer_type(hidden_neur, hidden_neur, **layer_kwargs))
            hidden_layers.append(nn.Dropout(p=dropout_p))
        self.kan_layers = nn.ModuleList(hidden_layers)

        # Input and output layers
        self.input = layer_type(ninput, hidden_neur, **layer_kwargs)
        self.output = layer_type(hidden_neur, noutput, **layer_kwargs)

        self.to(self.device)

        print(f"Creating model KAN: {self.name_model}")
        keys_print = ['ninput', 'noutput', 'nlayers', 'hidden_neur', 'layer_type', 'dropout_p']
        for key in keys_print:
            print(f"  {key}: {getattr(self, key)}")  # getattr retrieves the attribute value
[docs]    def forward(self, x):
        x = self.input(x)
        for layer in self.kan_layers:
            x = layer(x)
        x = self.output(x)
        return x
[docs]    def fit(
        self,
        train_dataset,
        eval_dataset,
        epochs,
        batch_size,
        lr,
        scheduler_type="StepLR",   # Selects the learning-rate scheduler
        lr_kwargs=None,            # Scheduler-specific parameters
        print_eval_rate=2,
        criterion=nn.MSELoss(),
        save_logs_path=None,
    ):
        r"""
        Train the model on the given datasets, with support for several strategies
        of dynamic learning-rate adjustment.

        Args:
            train_dataset (Dataset): Training dataset.
            eval_dataset (Dataset): Evaluation dataset.
            epochs (int): Total number of epochs to train the model.
            batch_size (int): Batch size for training and evaluation.
            lr (float): Initial learning rate for the optimizer.
            scheduler_type (str, optional): Type of scheduler used to adjust the learning
                rate dynamically. Available options:
                - "StepLR": reduces the learning rate every fixed number of epochs.
                - "ReduceLROnPlateau": reduces the learning rate when the loss stops improving.
                - "OneCycleLR": follows a single learning-rate cycle over the whole training run.
                (Default: "StepLR").
            lr_kwargs (dict, optional): Dictionary with parameters specific to the selected
                scheduler. Examples:
                - For StepLR: {"step_size": int, "gamma": float}.
                - For ReduceLROnPlateau: {"mode": str, "factor": float, "patience": int}.
                - For OneCycleLR: {"anneal_strategy": str, "div_factor": float}.
                (Default: `{}`).
            print_eval_rate (int, optional): How often the model is evaluated, in epochs.
                If set to 0, no periodic evaluation is performed. (Default: 2).
            criterion (nn.Module, optional): Loss function to optimize.
                (Default: `nn.MSELoss()`).
            save_logs_path (str, optional): Path where the training and evaluation losses
                are saved as `.npy` files. If `None`, nothing is saved. (Default: `None`).

        Returns:
            None: Trains the model and, optionally, saves the losses.
""" if lr_kwargs is None: lr_kwargs = {} # Por defecto, sin parámetros adicionales start_n_time = time.time() print(' ') print(f"TRAINNING MODEL {self.name_model}") print(' ') print('Conditions:') print(f' epochs: {epochs}') print(f' batch size: 2**{np.log2(batch_size)}') print(f' scheduler: {scheduler_type}') print(f' criterion: {criterion}') print(f' save_path: {save_logs_path}') print(' ') train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=10, pin_memory=True ) test_loader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False, num_workers=10, pin_memory=True ) train_losses = torch.tensor([], device=self.device) test_losses = torch.tensor([], device=self.device) optimizer = optim.Adam(self.parameters(), lr=lr) if scheduler_type == "StepLR": scheduler = torch.optim.lr_scheduler.StepLR(optimizer, **lr_kwargs) elif scheduler_type == "ReduceLROnPlateau": scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, **lr_kwargs) elif scheduler_type == "OneCycleLR": scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=lr, steps_per_epoch=len(train_loader), epochs=epochs, **lr_kwargs) else: raise ValueError(f"Unknown scheduler_type: {scheduler_type}") # scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=lr_scheduler_step, gamma=lr_gamma) if scheduler_type == "ReduceLROnPlateau": for epoch in range(epochs): self.train() train_loss = 0.0 for inputs, targets in train_loader: inputs, targets = inputs.float().to(self.device), targets.float().to(self.device) optimizer.zero_grad() #with torch.autograd.detect_anomaly(): outputs = self(inputs) loss = criterion(outputs, targets.unsqueeze(1))#.unsqueeze(2)) loss.backward() #torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) optimizer.step() train_loss += loss.item() train_loss /= len(train_loader) train_losses = torch.cat((train_losses, torch.tensor([train_loss], dtype=torch.float64, device=self.device))) scheduler.step(train_loss) # Usa la pérdida de entrenamiento if (epoch + 1) % print_eval_rate == 0: self.eval() test_loss = 0.0 with torch.no_grad(): for inputs, targets in test_loader: inputs, targets = inputs.float().to(self.device), targets.float().to(self.device) outputs = self(inputs) loss = criterion(outputs, targets.unsqueeze(1))#.unsqueeze(2)) test_loss += loss.item() test_loss /= len(test_loader) test_losses = torch.cat((test_losses, torch.tensor([test_loss], dtype=torch.float64, device=self.device))) current_lr = optimizer.param_groups[0]['lr'] if torch.cuda.is_available(): mem_used = torch.cuda.memory_allocated() / (1024 ** 3) # Uso de memoria en GB else: mem = psutil.virtual_memory() mem_used = mem.used / (1024 ** 3) # Uso de memoria RAM en GB print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4e}, Test Loss: {test_loss:.4e}, ' f'LR: {current_lr:.2e}, MEM: {mem_used:.2f} GB') if scheduler_type == "OneCycleLR": for epoch in range(epochs): self.train() train_loss = 0.0 for inputs, targets in train_loader: inputs, targets = inputs.float().to(self.device), targets.float().to(self.device) optimizer.zero_grad() outputs = self(inputs) loss = criterion(outputs, targets.unsqueeze(1)) loss.backward() optimizer.step() scheduler.step() # OneCycleLR se ajusta por batch train_loss += loss.item() train_loss /= len(train_loader) train_losses = torch.cat((train_losses, torch.tensor([train_loss], dtype=torch.float64, device=self.device))) if (epoch + 1) % print_eval_rate == 0: self.eval() test_loss = 0.0 with torch.no_grad(): for inputs, targets in 
test_loader: inputs, targets = inputs.float().to(self.device), targets.float().to(self.device) outputs = self(inputs) loss = criterion(outputs, targets.unsqueeze(1)) test_loss += loss.item() test_loss /= len(test_loader) test_losses = torch.cat((test_losses, torch.tensor([test_loss], dtype=torch.float64, device=self.device))) # Obtener el LR actual y memoria current_lr = optimizer.param_groups[0]['lr'] if torch.cuda.is_available(): mem_used = torch.cuda.memory_allocated() / (1024 ** 3) # GPU memory en GB else: mem = psutil.virtual_memory() mem_used = mem.used / (1024 ** 3) # CPU memory en GB print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4e}, Test Loss: {test_loss:.4e}, ' f'LR: {current_lr:.2e}, MEM: {mem_used:.2f} GB') else: for epoch in range(epochs): self.train() train_loss = 0.0 for inputs, targets in train_loader: inputs, targets = inputs.float().to(self.device), targets.float().to(self.device) optimizer.zero_grad() #with torch.autograd.detect_anomaly(): outputs = self(inputs) loss = criterion(outputs, targets.unsqueeze(1))#.unsqueeze(2)) loss.backward() #torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) optimizer.step() train_loss += loss.item() train_loss /= len(train_loader) train_losses = torch.cat((train_losses, torch.tensor([train_loss], dtype=torch.float64, device=self.device))) scheduler.step() if (epoch + 1) % print_eval_rate == 0: self.eval() test_loss = 0.0 with torch.no_grad(): for inputs, targets in test_loader: inputs, targets = inputs.float().to(self.device), targets.float().to(self.device) outputs = self(inputs) loss = criterion(outputs, targets.unsqueeze(1))#.unsqueeze(2)) test_loss += loss.item() test_loss /= len(test_loader) test_losses = torch.cat((test_losses, torch.tensor([test_loss], dtype=torch.float64, device=self.device))) current_lr = optimizer.param_groups[0]['lr'] if torch.cuda.is_available(): mem_used = torch.cuda.memory_allocated() / (1024 ** 3) # Uso de memoria en GB else: mem = psutil.virtual_memory() mem_used = mem.used / (1024 ** 3) # Uso de memoria RAM en GB print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4e}, Test Loss: {test_loss:.4e}, ' f'LR: {current_lr:.2e}, MEM: {mem_used:.2f} GB') if save_logs_path is not None: train_losses_np = train_losses.cpu().numpy() test_losses_np = test_losses.cpu().numpy() np.save(save_logs_path + "/train_losses_" + self.name_model + ".npy", train_losses_np) np.save(save_logs_path + "/test_losses_" + self.name_model + ".npy", test_losses_np) end_n_time = time.time() self.print_hours(start_n_time, end_n_time)
        # print(f'Time for {self.name_model}: {(end_n_time - start_n_time)/60:.2f} minutes')
[docs]    def exam(
        self,
        data: torch.Tensor,
        **kwargs,
    ):
        """
        Takes a tensor with test inputs and reference outputs and returns a numpy array
        of predictions together with an array of the true values for comparison.

        Args:
            data (torch.Tensor): Tensor with (ninput + noutput) columns and n rows,
                where n is the number of cases to evaluate.

        Returns:
            Tuple[np.ndarray, np.ndarray]: Predictions and true target values.
        """
        assert data.shape[1] == (self.ninput + self.noutput), \
            f"The data tensor must have {self.ninput + self.noutput} columns (ninput + noutput)."

        # Split the data into inputs (ninput columns) and outputs (noutput columns)
        inputs = data[:, :self.ninput]
        outputs = data[:, self.ninput:]

        # Wrap the inputs and outputs in a PyTorch Dataset
        dataset = MakeDataset_kan(inputs, outputs)

        dataloader_params = {
            "batch_size": 2**6,
            "shuffle": False,
            "num_workers": 5,
            "pin_memory": True,
        }
        for key in dataloader_params.keys():
            if key in kwargs:
                dataloader_params[key] = kwargs[key]

        loader = DataLoader(dataset, **dataloader_params)

        outs = []
        targ = []
        self.to('cpu')
        self.eval()
        with torch.no_grad():
            for inputs, targets in loader:
                outputs = self(inputs)
                outs.extend(outputs.numpy())
                targ.extend(targets.numpy())

        outs = np.array(outs)
        targ = np.array(targ)
        return outs, targ
[docs]    def predict(
        self,
        X: BaseDataset,
        rescale_output: bool = True,
        return_targets: bool = False,
        **kwargs,
    ):
        r"""
        Predict the target values for the input data. The dataset is loaded into a
        DataLoader with the provided keyword arguments. The model is set to evaluation
        mode and the predictions are made using the input data. The output can be
        rescaled with the dataset scaler.

        Args:
            X (BaseDataset): The dataset whose target values are to be predicted using the input data.
            rescale_output (bool): Whether to rescale the output with the scaler of the dataset (default: ``True``).
            return_targets (bool): Whether to also return the true target values (default: ``False``).
            kwargs (dict, optional): Additional keyword arguments to pass to the DataLoader.
                Can be used to set the parameters of the DataLoader (see the PyTorch documentation at
                https://pytorch.org/docs/stable/data.html#torch.utils.data.DataLoader):

                - batch_size (int, optional): Batch size (default: ``32``).
                - shuffle (bool, optional): Shuffle the data (default: ``False``).
                - num_workers (int, optional): Number of workers to use (default: ``0``).
                - pin_memory (bool, optional): Pin memory (default: ``True``).

        Returns:
            Tuple[np.ndarray, np.ndarray]: The predictions and, if ``return_targets`` is ``True``,
            the true target values.
        """
        dataloader_params = {
            "batch_size": 2**5,
            "shuffle": False,
            "num_workers": 0,
            "pin_memory": True,
        }
        for key in dataloader_params.keys():
            if key in kwargs:
                dataloader_params[key] = kwargs[key]

        predict_dataloader = DataLoader(X, **dataloader_params)

        total_rows = len(predict_dataloader.dataset)
        num_columns = self.noutput
        all_predictions = np.empty((total_rows, num_columns))
        all_targets = np.empty((total_rows, num_columns))

        self.eval()
        start_idx = 0
        with torch.no_grad():
            for x, y in predict_dataloader:
                output = self(x.to(self.device))
                batch_size = x.size(0)
                end_idx = start_idx + batch_size
                all_predictions[start_idx:end_idx, :] = output.cpu().numpy()
                all_targets[start_idx:end_idx, :] = y.cpu().numpy()
                start_idx = end_idx

        if rescale_output:
            all_predictions = X.rescale_y(np.array(all_predictions))
            all_targets = X.rescale_y(np.array(all_targets))

        if return_targets:
            return all_predictions, all_targets
        else:
            return all_predictions
[docs]    def save(
        self,
        path: str,
    ):
        r"""
        Save the model to a checkpoint file.

        Args:
            path (str): Path to the directory where the model is saved. The checkpoint
                file is named ``<name_model>.pth`` and contains the model hyperparameters
                and state dict.
        """
        checkpoint = {
            "ninput": self.ninput,
            "noutput": self.noutput,
            "nlayers": self.nlayers,
            "hidden_neur": self.hidden_neur,
            "layer_type": self.layer_type,
            "name_model": self.name_model,
            "id": self.id,
            "dropout": self.dropout_p,
            "device": self.device,
            "state_dict": self.state_dict(),
        }

        if isinstance(self.input, TaylorLayer):
            checkpoint['order'] = self.input.order
        elif hasattr(self.input, 'degree'):
            checkpoint['degree'] = self.input.degree

        if not os.path.exists(path):
            os.makedirs(path)
        filename = f'{self.name_model}.pth'
        path = os.path.join(path, filename)

        torch.save(checkpoint, path)
[docs]    @classmethod
    def load(cls, path: str, device: torch.device = torch.device("cpu")):
        """
        Loads a model from a checkpoint file.

        Args:
            path (str): Path to the checkpoint file.
            device (torch.device): Device where the model is loaded (default: cpu).

        Returns:
            model (KANupm): The loaded KAN model with the trained weights.
        """
        print('Loading model...')
        checkpoint = torch.load(path, map_location=device)

        if checkpoint['layer_type'] == TaylorLayer:
            order = checkpoint['order']
            layer_kwargs = {'order': order}
        else:
            degree = checkpoint['degree']
            layer_kwargs = {'degree': degree}

        model = cls(
            ninput=checkpoint['ninput'],
            noutput=checkpoint['noutput'],
            nlayers=checkpoint['nlayers'],
            hidden_neur=checkpoint['hidden_neur'],
            layer_type=checkpoint['layer_type'],
            name_model=checkpoint['name_model'],
            id=checkpoint['id'],
            dropout_p=checkpoint['dropout'],
            device=device,
            **layer_kwargs,  # Layer-specific arguments
        )
        model.load_state_dict(checkpoint['state_dict'])

        print(f"Loaded model KAN: {checkpoint['name_model']}")
        keys_print = ['ninput', 'noutput', 'nlayers', 'hidden_neur', 'layer_type', 'dropout']
        for key in keys_print:
            print(f"  {key}: {checkpoint[key]}")
        return model
[docs]    def print_hours(self, start, end):
        """
        Takes two time stamps and prints the elapsed interval in HH:MM:SS format.
        """
        total = end - start
        hours, remainder = divmod(total, 3600)
        minutes, seconds = divmod(remainder, 60)
        print(f'Time for {self.name_model}: {int(hours):02}:{int(minutes):02}:{int(seconds):02}')
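# Illustrative usage sketch (not part of the original module). It builds a small
# KANupm on synthetic data, using ChebyLayer_v2 and MakeDataset_kan defined further
# down in this module; every size and hyperparameter below is an arbitrary placeholder.
def _example_kanupm_usage():
    # Model: 4 input features -> 2 hidden KAN layers of 16 neurons -> 1 output,
    # with degree-3 Chebyshev layers (degree is forwarded through **layer_kwargs).
    model = KANupm(
        ninput=4,
        noutput=1,
        nlayers=2,
        hidden_neur=16,
        layer_type=ChebyLayer_v2,
        name_model="kan_demo",
        dropout_p=0.1,
        degree=3,
    )

    # Synthetic regression data: targets are 1-D because fit() unsqueezes them
    # to (batch, 1) before computing the loss.
    x_train, y_train = torch.rand(256, 4), torch.rand(256)
    x_eval, y_eval = torch.rand(64, 4), torch.rand(64)
    train_ds = MakeDataset_kan(x_train, y_train)
    eval_ds = MakeDataset_kan(x_eval, y_eval)

    # Train with a StepLR schedule; lr_kwargs carries the scheduler parameters.
    model.fit(
        train_ds,
        eval_ds,
        epochs=4,
        batch_size=32,
        lr=1e-3,
        scheduler_type="StepLR",
        lr_kwargs={"step_size": 2, "gamma": 0.5},
        print_eval_rate=2,
    )

    # exam() takes a tensor with ninput + noutput columns and returns predictions
    # and true values as numpy arrays.
    preds, truth = model.exam(torch.cat([x_eval, y_eval.unsqueeze(1)], dim=1))
    return preds, truth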
[docs] class ChebyLayer_v3(nn.Module):
    def __init__(self, input_dim, output_dim, degree):
        super(ChebyLayer_v3, self).__init__()
        self.inputdim = input_dim
        self.outdim = output_dim
        self.degree = degree

        self.cheby_coeffs = nn.Parameter(torch.empty(input_dim, output_dim, degree + 1))
        nn.init.normal_(self.cheby_coeffs, mean=0.0, std=1 / (input_dim * (degree + 1)))
        self.register_buffer("arange", torch.arange(0, degree + 1, 1))
[docs]    def forward(self, x):
        # Since Chebyshev polynomials are defined on [-1, 1],
        # x can be normalized to [-1, 1] using tanh:
        # x = torch.tanh(x)
        # View and repeat input degree + 1 times
        x = x.view((-1, self.inputdim, 1)).expand(
            -1, -1, self.degree + 1
        )  # shape = (batch_size, inputdim, self.degree + 1)
        # Apply acos
        x = x.acos()
        # Multiply by arange [0 .. degree]
        x *= self.arange
        # Apply cos
        x = x.cos()
        # Compute the Chebyshev interpolation
        y = torch.einsum(
            "bid,iod->bo", x, self.cheby_coeffs
        )  # shape = (batch_size, outdim)
        y = y.view(-1, self.outdim)
        return y
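# Illustrative check (not part of the original module): ChebyLayer_v3 builds the
# Chebyshev basis through the identity T_n(x) = cos(n*arccos(x)), which only holds
# for x in [-1, 1]. The sketch verifies the first few basis functions against their
# explicit polynomial forms; sample points and tolerance are arbitrary.
def _check_chebyshev_identity():
    x = torch.linspace(-0.99, 0.99, steps=101)
    n = torch.arange(0, 4).float()
    # Basis exactly as computed inside ChebyLayer_v3.forward
    basis = torch.cos(n.view(1, -1) * torch.acos(x).view(-1, 1))   # shape (101, 4)
    explicit = torch.stack([
        torch.ones_like(x),        # T_0(x) = 1
        x,                         # T_1(x) = x
        2 * x**2 - 1,              # T_2(x) = 2x^2 - 1
        4 * x**3 - 3 * x,          # T_3(x) = 4x^3 - 3x
    ], dim=1)
    assert torch.allclose(basis, explicit, atol=1e-4)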
[docs] class ChebyLayer_v2(nn.Module):
    def __init__(self, input_dim, output_dim, degree):
        super(ChebyLayer_v2, self).__init__()
        self.inputdim = input_dim
        self.outdim = output_dim
        self.degree = degree

        self.cheby_coeffs = nn.Parameter(torch.empty(input_dim, output_dim, degree + 1))
        nn.init.normal_(self.cheby_coeffs, mean=0.0, std=1 / (input_dim * (degree + 1)))
[docs]    def forward(self, x):
        x = torch.reshape(x, (-1, self.inputdim))  # shape = (batch_size, inputdim)
        # Since Chebyshev polynomials are defined on [-1, 1],
        # x can be normalized to [-1, 1] using tanh:
        # x = torch.tanh(x)
        # Initialize Chebyshev polynomial tensors
        cheby = torch.ones(x.shape[0], self.inputdim, self.degree + 1, device=x.device)
        if self.degree > 0:
            cheby[:, :, 1] = x
        for i in range(2, self.degree + 1):
            cheby[:, :, i] = 2 * x * cheby[:, :, i - 1].clone() - cheby[:, :, i - 2].clone()
        # Compute the Chebyshev interpolation
        y = torch.einsum('bid,iod->bo', cheby, self.cheby_coeffs)  # shape = (batch_size, outdim)
        y = y.view(-1, self.outdim)
        return y
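# Illustrative comparison (not part of the original module): ChebyLayer_v2 evaluates
# the Chebyshev basis through the recurrence T_0 = 1, T_1 = x,
# T_n(x) = 2x*T_{n-1}(x) - T_{n-2}(x), while ChebyLayer_v3 uses the closed form
# cos(n*arccos(x)). With shared coefficients and inputs inside [-1, 1] the two
# layers should agree; the sizes and tolerance below are arbitrary.
def _compare_cheby_layers():
    torch.manual_seed(0)
    layer_v2 = ChebyLayer_v2(input_dim=3, output_dim=2, degree=4)
    layer_v3 = ChebyLayer_v3(input_dim=3, output_dim=2, degree=4)
    layer_v3.cheby_coeffs.data.copy_(layer_v2.cheby_coeffs.data)

    x = torch.empty(8, 3).uniform_(-0.95, 0.95)
    with torch.no_grad():
        assert torch.allclose(layer_v2(x), layer_v3(x), atol=1e-4)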
[docs] class ChebyLayer_ant(nn.Module):
    def __init__(self, input_dim, output_dim, degree):
        super(ChebyLayer_ant, self).__init__()
        self.inputdim = input_dim
        self.outdim = output_dim
        self.degree = degree

        self.cheby_coeffs = nn.Parameter(torch.empty(input_dim, output_dim, degree + 1))
        nn.init.normal_(self.cheby_coeffs, mean=0.0, std=1 / (input_dim * (degree + 1)))
[docs]    def forward(self, x):
        x = torch.reshape(x, (-1, self.inputdim))  # shape = (batch_size, inputdim)
        # Since Chebyshev polynomials are defined on [-1, 1],
        # x can be normalized to [-1, 1] using tanh:
        # x = torch.tanh(x)
        # Initialize Chebyshev polynomial tensors
        cheby = torch.ones(x.shape[0], self.inputdim, self.degree + 1, device=x.device)
        if self.degree > 0:
            cheby[:, :, 1] = x
        for i in range(2, self.degree + 1):
            cheby[:, :, i] = 2 * x * cheby[:, :, i - 1].clone() - cheby[:, :, i - 2].clone()
        # Compute the Chebyshev interpolation
        y = torch.einsum('bid,iod->bo', cheby, self.cheby_coeffs)  # shape = (batch_size, outdim)
        y = y.view(-1, self.outdim)
        return y
[docs] class JacobiLayer(nn.Module):
    def __init__(self, input_dim, output_dim, degree, a=1.0, b=1.0):
        super(JacobiLayer, self).__init__()
        self.inputdim = input_dim
        self.outdim = output_dim
        self.a = a
        self.b = b
        self.degree = degree

        self.jacobi_coeffs = nn.Parameter(torch.empty(input_dim, output_dim, degree + 1))
        nn.init.normal_(self.jacobi_coeffs, mean=0.0, std=1 / (input_dim * (degree + 1)))
[docs]    def forward(self, x):
        x = torch.reshape(x, (-1, self.inputdim))  # shape = (batch_size, inputdim)
        # Since Jacobi polynomials are defined on [-1, 1],
        # x can be normalized to [-1, 1] using tanh:
        # x = torch.tanh(x)
        # Initialize Jacobi polynomial tensors
        jacobi = torch.ones(x.shape[0], self.inputdim, self.degree + 1, device=x.device)
        if self.degree > 0:
            # degree 0: jacobi[:, :, 0] = 1 (already initialized)
            # degree 1: jacobi[:, :, 1] = ((a - b) + (a + b + 2) * x) / 2
            jacobi[:, :, 1] = ((self.a - self.b) + (self.a + self.b + 2) * x) / 2
        for i in range(2, self.degree + 1):
            # Three-term recurrence for the Jacobi polynomials
            theta_k = (2*i + self.a + self.b) * (2*i + self.a + self.b - 1) / (2*i * (i + self.a + self.b))
            theta_k1 = (2*i + self.a + self.b - 1) * (self.a*self.a - self.b*self.b) / (2*i * (i + self.a + self.b) * (2*i + self.a + self.b - 2))
            theta_k2 = (i + self.a - 1) * (i + self.b - 1) * (2*i + self.a + self.b) / (i * (i + self.a + self.b) * (2*i + self.a + self.b - 2))
            jacobi[:, :, i] = (theta_k * x + theta_k1) * jacobi[:, :, i - 1].clone() - theta_k2 * jacobi[:, :, i - 2].clone()
        # Compute the Jacobi interpolation
        y = torch.einsum('bid,iod->bo', jacobi, self.jacobi_coeffs)  # shape = (batch_size, outdim)
        y = y.view(-1, self.outdim)
        return y
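# Illustrative check (not part of the original module): with a = b = 0 the Jacobi
# recurrence above reduces to the Legendre polynomials. The sketch isolates the
# degree-2 basis function by setting a single coefficient to 1 and checks it
# against P_2(x) = (3x^2 - 1) / 2; layer sizes and tolerance are arbitrary.
def _check_jacobi_reduces_to_legendre():
    layer = JacobiLayer(input_dim=1, output_dim=1, degree=2, a=0.0, b=0.0)
    with torch.no_grad():
        layer.jacobi_coeffs.zero_()
        layer.jacobi_coeffs[0, 0, 2] = 1.0   # keep only the degree-2 term

        x = torch.linspace(-1.0, 1.0, steps=11).view(-1, 1)
        expected = (3 * x**2 - 1) / 2        # Legendre P_2
        assert torch.allclose(layer(x), expected, atol=1e-5)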
[docs] class TaylorLayer(nn.Module):
    def __init__(self, input_dim, out_dim, order, addbias=True):
        super(TaylorLayer, self).__init__()
        self.input_dim = input_dim
        self.out_dim = out_dim
        self.order = order
        self.addbias = addbias

        self.coeffs = nn.Parameter(torch.randn(out_dim, input_dim, order) * 0.01)
        if self.addbias:
            self.bias = nn.Parameter(torch.zeros(1, out_dim))
[docs]    def forward(self, x):
        shape = x.shape
        outshape = shape[0:-1] + (self.out_dim,)
        x = torch.reshape(x, (-1, self.input_dim))
        x_expanded = x.unsqueeze(1).expand(-1, self.out_dim, -1)

        y = torch.zeros((x.shape[0], self.out_dim), device=x.device)
        for i in range(self.order):
            term = (x_expanded ** i) * self.coeffs[:, :, i]
            y += term.sum(dim=-1)

        if self.addbias:
            y += self.bias

        y = torch.reshape(y, outshape)
        return y
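# Illustrative check (not part of the original module): TaylorLayer computes, for each
# output o, y_o = bias_o + sum_i sum_{k=0}^{order-1} coeffs[o, i, k] * x_i**k, i.e. a
# truncated power series in every input feature (k starts at 0, so `order` counts the
# constant term). The sketch compares the layer against a manual evaluation of the
# same series; sizes are arbitrary.
def _check_taylor_layer():
    torch.manual_seed(0)
    layer = TaylorLayer(input_dim=2, out_dim=3, order=4)
    x = torch.rand(5, 2)

    with torch.no_grad():
        powers = torch.stack([x**k for k in range(4)], dim=-1)            # (5, 2, 4)
        manual = torch.einsum('bik,oik->bo', powers, layer.coeffs) + layer.bias
        assert torch.allclose(layer(x), manual, atol=1e-5)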
[docs] class MakeDataset_kan(Dataset):
    def __init__(self, inputs: torch.Tensor, outputs: torch.Tensor):
        """
        Initializes the dataset with the input and output data.

        Args:
            inputs (torch.Tensor): Input tensor (n cases, ninput columns).
            outputs (torch.Tensor): Output tensor (n cases, noutput columns).
        """
        self.inputs = inputs
        self.outputs = outputs

    def __len__(self):
        """
        Returns the number of cases in the dataset.

        Returns:
            int: Number of rows in the data.
        """
        return len(self.inputs)

    def __getitem__(self, idx):
        """
        Returns the (input, output) pair at index `idx`.

        Args:
            idx (int): Index of the case to access.

        Returns:
            Tuple[torch.Tensor, torch.Tensor]: (input, output) pair at the given index.
        """
        x = self.inputs[idx]
        y = self.outputs[idx]
        return x, y
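# Illustrative sketch (not part of the original module): MakeDataset_kan simply pairs
# an input tensor with an output tensor row by row, which is also what KANupm.exam
# relies on internally. The shapes below are arbitrary placeholders.
def _example_make_dataset():
    inputs = torch.rand(100, 4)    # 100 cases, 4 input features
    outputs = torch.rand(100, 1)   # 100 cases, 1 target column
    dataset = MakeDataset_kan(inputs, outputs)

    loader = DataLoader(dataset, batch_size=32, shuffle=False)
    x_batch, y_batch = next(iter(loader))
    assert x_batch.shape == (32, 4) and y_batch.shape == (32, 1)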