# Source code for pyreco.custom_models

import numpy as np
from abc import ABC
from typing import Union
import copy
import multiprocessing
from functools import partial

from pyreco.layers import (
    Layer,
    InputLayer,
    ReservoirLayer,
    ReadoutLayer,
)
from pyreco.optimizers import Optimizer, assign_optimizer
from pyreco.metrics import assign_metric
from pyreco.node_selector import NodeSelector
from pyreco.initializer import NetworkInitializer
from pyreco.utils_networks import rename_nodes_after_removal


# def sample_random_nodes(total_nodes: int, fraction: float):
#     """
#     Select a subset of randomly chosen nodes.

#     Args:
#     total_nodes (int): Total number of available nodes.
#     fraction (float): Fraction of nodes to select.

#     Returns:
#     np.ndarray: Array of randomly selected node indices.
#     """
#     return np.random.choice(
#         total_nodes, size=int(total_nodes * fraction), replace=False
#     )


# def discard_transients_indices(n_batches, n_timesteps, transients):
#     indices_to_remove = []
#     for i in range(n_batches * n_timesteps):
#         t = i % n_timesteps  # Current timestep within the batch
#         if t < transients:
#             indices_to_remove.append(i)
#     return indices_to_remove


class CustomModel(ABC):
    """
    Abstract base class for custom reservoir computing models.

    Mimics the TensorFlow model API, e.g. layers are attached via
    ``model.add(...)``. A model consists of an input layer, a reservoir
    layer and a readout layer.
    """

    def __init__(self):
        """Set up an empty model: no layers, default hyperparameters."""
        # Layers (assigned later via .add())
        self.input_layer: InputLayer
        self.reservoir_layer: ReservoirLayer
        self.readout_layer: ReadoutLayer

        # Hyperparameters (configured via .compile())
        self.metrics = []
        self.metrics_fun = []
        self.optimizer: Optimizer
        self.discard_transients = 0

        # Derived attributes, filled during compile/fit
        self.num_trainable_weights: int
        self.num_nodes: int
        self.num_time_in: int
        self.num_time_out: int
        self.num_states_in: int
        self.num_states_out: int
[docs] def add(self, layer: Layer): """ Add a layer to the model. Is type-sensitive and will assign the layer to the correct attribute. Args: layer (Layer): Layer to be added to the model. """ # Sanity check for the correct shape of the input argument layer if not isinstance(layer, Layer): raise TypeError( "The layer must be an instance of the Layer class or its subclasses." ) # assign the layer to the correct attribute if isinstance(layer, InputLayer): self.input_layer = layer elif issubclass(type(layer), ReservoirLayer): self.reservoir_layer = layer elif isinstance(layer, ReadoutLayer): self.readout_layer = layer else: raise ValueError("Unsupported layer type.")
# TODO: the following method should be implemented in the CustomModel class # def _set_readin_nodes(self, nodes: Union[list, np.ndarray] = None):
[docs] def compile( self, optimizer: str = "ridge", metrics: Union[list, str] = None, discard_transients: int = 0, ): """ Configure the model for training. Args: optimizer (str): Name of the optimizer. metrics (list): List of metric names. discard_transients (int): Number of initial transient timesteps to discard. """ # sanity checks for inputs if not isinstance(discard_transients, int): raise TypeError("discard_transients must be a positive integer!") elif discard_transients < 0: raise ValueError("discard_transients must be >=0") if metrics is None: metrics = ["mse"] # check consistency of layers, data shapes etc. # TODO: do we have input, reservoir and readout layer? # TODO: are all shapes correct on input and output side? # TODO: let the user specify the reservoir initialization method # set the metrics (like in TensorFlow) self._set_metrics(metrics) # set the optimizer that will find the readout weights self._set_optimizer(optimizer) # set number of transients to discard (warmup phase) self.discard_transients = int(discard_transients) # copy some layer properties to the model level for easier access self.num_states_in = self.input_layer.n_states self.num_states_out = self.readout_layer.n_states self.num_nodes = self.reservoir_layer.nodes # Sample the input connections: create W_in read-in weight matrix self._connect_input_to_reservoir() # check for dependency injection here! # Set initial states of the reservoir # TODO: let the user specify the reservoir initialization method self._initialize_network(method="random_normal") # Select readout nodes according to the fraction specified by the user in the readout layer. By default, randomly sample nodes. User can also provide a list of nodes to use for readout. self._set_readout_nodes() # flag all layers as compiled (required for later manipulation from outside) self.input_layer._is_compiled = True self.reservoir_layer._is_compiled = True self.readout_layer._is_compiled = True
[docs] def fit(self, x: np.ndarray, y: np.ndarray, n_init: int = 1, store_states: bool = False ) -> dict: """ RC training with batch processing. """ # sanity checks for inputs if not isinstance(x, np.ndarray) or not isinstance(y, np.ndarray): raise TypeError("Input and target data must be numpy arrays.") if x.ndim != 3 or y.ndim != 3: raise ValueError( "Input and target data must have 3 dimensions (n_batch, n_timesteps, n_features)." ) if x.shape[0] != y.shape[0]: raise ValueError( "Input and target data must have the same number of samples (first dimension)." ) if x.shape[1] < y.shape[1]: raise ValueError( "Input data must have at least as many timesteps as the target data." ) if not isinstance(n_init, int): raise TypeError("Number of initializations must be an integer (positive).") if n_init < 1: raise ValueError("Number of initializations must be at least 1.") if not isinstance(store_states, bool): raise TypeError("store_states must be a boolean.") # Extract shapes of inputs and outputs, expected to be 3D arrays # of shape [n_batch, n_timesteps, n_features] n_batch, n_time_in = x.shape[0], x.shape[1] n_time_out, n_states_out = y.shape[-2], y.shape[-1] n_nodes = self.reservoir_layer.nodes # discard transients (warmup phase). This is done by removing the first n_transients timesteps from the reservoir states. # Hence, the targets can have a maximum of (t_in - t_discard) steps, before we have to cut also from the targets # If the number of t_out steps is even smaller, we will discard more steps from the reservoir states self.num_transients_to_remove = 0 if self.discard_transients >= n_time_in: raise ValueError( f"Number of transients to discard ({self.discard_transients}) must be smaller than the number of time steps in the input data ({n_time_in})." ) if (n_time_in - self.discard_transients) < n_time_out: print( f"Discarding {self.discard_transients} time steps will reduce the number of output time steps to {n_time_in-self.discard_transients}. 
The given targets had {n_time_out} time steps." ) # cut first steps from the targets to match the desired warmup phase y = y[:, self.discard_transients :, :] n_time_out = n_time_in - self.discard_transients self.num_transients_to_remove = self.discard_transients if (n_time_in - self.discard_transients) > n_time_out: # enlarge the number of transients to discard to match the output shape self.num_transients_to_remove = n_time_in - n_time_out print( f"discarding {self.num_transients_to_remove} reservoir states to match the number of time steps on the output." ) # update some class attributes that depend on the training data self.num_time_in = n_time_in self.num_states_in = x.shape[-1] self.num_time_out = n_time_out self.num_states_out = n_states_out # Pre-allocate arrays for storing results n_R0 = np.zeros((n_init, n_nodes)) n_weights = np.zeros( (n_init, len(self.readout_layer.readout_nodes), n_states_out) ) n_scores = np.zeros(n_init) n_res_states = [] if store_states else None # Get metric functions that scores the model performance metric_fun = ( self.metrics_fun[0] if self.metrics_fun else assign_metric("mean_squared_error") ) # Batch process multiple reservoir initializations for i in range(n_init): if n_init > 1: print(f"initialization {i}/{n_init}: computing reservoir states") # train the model on the given data reservoir_states, _ = self._train_model(x=x, y=y) # Tracking the variation of initial conditions n_R0[i] = self.reservoir_layer.initial_res_states n_weights[i] = self.readout_layer.weights n_scores[i] = metric_fun(y, self.predict(x=x)) if store_states: n_res_states.append(reservoir_states) # Select best initialization idx_optimal = np.argmin(n_scores) self.reservoir_layer.set_initial_state(n_R0[idx_optimal]) self.readout_layer.weights = n_weights[idx_optimal] # Update trainable weights count self.num_trainable_weights = self.reservoir_layer.weights.size # Build history dictionary history = { "init_res_states": n_R0, "readout_weights": n_weights, 
"train_scores": n_scores, } if store_states: history["res_states"] = n_res_states return history
[docs] def predict(self, x: np.ndarray) -> np.ndarray: """ Make predictions for given input. Args: x (np.ndarray): Input data of shape [n_batch, n_timestep, n_states] one_shot (bool): If True, don't re-initialize reservoir between samples. Returns: np.ndarray: Predictions of shape [n_batch, n_timestep, n_states] """ # makes prediction for given input (single-step prediction) # expects inputs of shape [n_batch, n_timestep, n_states] # returns predictions in shape of [n_batch, n_timestep, n_states] # one_shot = True will *not* re-initialize the reservoir from sample to sample. Introduces a dependency on the # sequence by which the samples are given # TODO: external function that is going to check the input dimensionality # and raise an error if shape is not correct # Compute reservoir states. Returns reservoir states of shape # [n_batch, n_timesteps+1, n_nodes] # (n_timesteps+1 because we also store the initial state) reservoir_states = self.compute_reservoir_state(x) # discard the given transients from the reservoir states, incl. initial reservoir state. Should give the size of (n_batch, n_time_out, n_nodes) del_mask = np.arange(0, self.num_transients_to_remove + 1) reservoir_states = np.delete(reservoir_states, del_mask, axis=1) # Masking non-readout nodes: if the user specified to not use all nodes for output, we can get rid of the non-readout node states reservoir_states = reservoir_states[:, :, self.readout_layer.readout_nodes] # make predictions y = R * W_out, W_out has a shape of [n_out, N] y_pred = np.einsum( "bik,jk->bij", reservoir_states, self.readout_layer.weights.T ) return y_pred
[docs] def evaluate( self, x: np.ndarray, y: np.ndarray, metrics: Union[str, list, None] = None ) -> tuple: """ Evaluate metrics on predictions made for input data. Args: x (np.ndarray): Input data of shape [n_batch, n_timesteps, n_states] y (np.ndarray): Target data of shape [n_batch, n_timesteps_out, n_states_out] metrics (Union[str, list, None], optional): List of metric names or a single metric name. If None, use metrics from .compile() Returns: tuple: Metric values """ # evaluate metrics on predictions made for input data # expects: x of shape [n_batch, n_timesteps, n_states] # expects: y of shape [n_batch, n_timesteps_out, n_states_out] # depends on self.metrics = metrics from .compile() # returns float, if multiple metrics, then in given order (TODO: implement this) if ( metrics is None ): # user did not specify metric, take the one(s) given to .compile() metrics = self.metrics if type(metrics) is str: # make sure that we are working with lists of strings metrics = [metrics] # self.metrics_available = ['mse', 'mae # # eval_metrics = self.metrics + metrics # combine from .compile and user specified # eval_metrics = list(set(eval_metrics)) # removes potential duplicates # get metric function handle from the list of metrics specified as str metric_funs = [assign_metric(m) for m in metrics] # make predictions y_pred = self.predict(x=x) # remove the time steps that were discarded during training (transient removal, generic seq-to-seq modeling) n_time_out = y_pred.shape[-2] y = y[:, -n_time_out:, :] # if self.discard_transients > 0: # y = y[:, self.discard_transients :, :] # get metric values metric_values = [] for _metric_fun in metric_funs: metric_values.append(float(_metric_fun(y, y_pred))) return metric_values
def _train_model(self, x: np.ndarray, y: np.ndarray): """ Train the model with a single reservoir initialization. Args: x (np.ndarray): Input data of shape [n_batch, n_timesteps, n_states] y (np.ndarray): Target data of shape [n_batch, n_timesteps, n_states] Returns: dict: History of the training process """ # extract shapes n_batch = x.shape[0] n_time_out, n_states_out = y.shape[1], y.shape[2] # Compute reservoir states. This is the most time-consuming part of the training process. # returns reservoir states of shape [n_batch, n_timesteps+1, n_nodes] # (n_timesteps+1 because we also store the initial state) reservoir_states = self.compute_reservoir_state(x) # discard the requested transients from the reservoir states, incl. initial reservoir state. Should give the size of (n_batch, n_time_out, n_nodes) # self.num_transients_to_remove del_mask = np.arange(0, self.num_transients_to_remove + 1) # del_mask = np.arange(0, self.discard_transients + 1) reservoir_states = np.delete(reservoir_states, del_mask, axis=1) # # now select only the reservoir states that are needed for the output # reservoir_states = reservoir_states[:, -self.num_time_out :, :] # Masking non-readout nodes: if the user specified to not use all nodes for output, we can get rid of the non-readout node states # TODO: check if the readout nodes are in the correct order!!! 
reservoir_states = reservoir_states[:, :, self.readout_layer.readout_nodes] # Training: Solve regression problem y = R^T * W_out # First reshape reservoir states and targets such that we regress across # all batches and time steps b = y.reshape(n_batch * n_time_out, n_states_out) num_readout = len(self.readout_layer.readout_nodes) A = reservoir_states.reshape(n_batch * n_time_out, num_readout) # Solve the regression problem b = Ax to find readout weights self.readout_layer.weights = self.optimizer.solve(A=A, b=b) # is there is only a single system state to predict, we need to add that dim # TODO: move this to the sanity checks and add an artificial dimension prior to fitting! if self.readout_layer.weights.ndim == 1: self.readout_layer.weights = np.expand_dims( self.readout_layer.weights, axis=-1 ) return reservoir_states, self.readout_layer.weights
[docs] def remove_reservoir_nodes(self, nodes: list): # removes specific nodes from the reservoir matrix, and # deletes accordingly the relevant readin weights and readout weights # print( # f"removing nodes {nodes} from the reservoir. You need to retrain the model!" # ) if not isinstance(nodes, list): raise TypeError("Nodes must be provided as a list of indices.") if np.max(nodes) > self.reservoir_layer.nodes: raise ValueError( f"Node index {np.max(nodes)} exceeds the number of nodes {self.reservoir_layer.nodes} in the reservoir." ) if np.min(nodes) < 0: raise ValueError("Node index must be positive.") if len(nodes) >= self.reservoir_layer.nodes: raise ValueError("You cannot remove all nodes from the reservoir.") # 1. remove nodes from the reservoir layer self.reservoir_layer.remove_nodes(nodes) # 2. remove nodes from the read-in weights matrix of shape [num_nodes, num_states_in] self.input_layer.remove_nodes(nodes) # 2.b update input-receiving nodes in reservoir layer # Find non-zero rows non_zero_rows = np.all(self.input_layer.weights != 0, axis=1) # Get the indices of zero rows non_zero_row_indices = np.where(non_zero_rows)[0] self.reservoir_layer.input_receiving_nodes = non_zero_row_indices # 3. remove nodes from the list of readout-nodes in the readout layer # update the indices in the readout.readout_nodes list self.readout_layer.readout_nodes = rename_nodes_after_removal( original_nodes=self.readout_layer.readout_nodes, removed_nodes=nodes ) self.readout_layer.update_layer_properties()
# TODO: any more attributes to change here? """ The setter methods are used to set the parameters of the model. """ def _set_readin_weights(self, weights: Union[list, np.ndarray]): """ Set the read-in weights matrix. Args: weights (Union[list, np.ndarray]): Read-in weights matrix. """ # some sanity checks # type check if not isinstance(weights, np.ndarray) and not isinstance(weights, list): raise TypeError("Read-in weights matrix has to be a numpy array or a list.") if isinstance(weights, list): # convert to np.array if list weights = np.array(weights) # read-in weights matrix has to have the shape [n_nodes, n_states_in] if weights.shape != (self.reservoir_layer.nodes, self.num_states_in): raise ValueError( f"Read-in weights matrix has to have the shape [n_nodes, n_states_in], i.e. {self.reservoir_layer.nodes}, {self.num_states_in}]" ) # set the read-in weights in the input layer self.input_layer.weights = weights def _set_readout_nodes(self, nodes: Union[list, np.ndarray] = None): """ Sets the nodes that will be linked to the output. Args: nodes (Union[list, np.ndarray], optional): Specific nodes to use for readout provided as indices. If None, randomly sample nodes. """ if nodes is None: selector = NodeSelector( total_nodes=self.reservoir_layer.nodes, strategy="random_uniform_wo_repl", ) nodes = selector.select_nodes(fraction=self.readout_layer.fraction_out) # set the readout nodes in the readout layer self.readout_layer.readout_nodes = nodes # sorted(nodes) def _set_optimizer(self, optimizer: Union[str, Optimizer]): """ Sets the optimizer that will find the readout weights. Args: optimizer (Union[str, Optimizer]): Name of the optimizer or an Optimizer instance. """ self.optimizer = assign_optimizer(optimizer) def _set_metrics(self, metrics: Union[list, str]): """ Sets the metric(s) for model evaluation. Args: metrics (Union[list, str]): List of metric names or a single metric name. 
""" if isinstance(metrics, str): # only single metric given self.metrics = [metrics] else: self.metrics = metrics # if metrics is a list of strings. # assign the metric functions (callable) according to the metric names self.metrics_fun = [] # has been initialized, we reset it here for metric in self.metrics: self.metrics_fun.append(assign_metric(metric)) def _set_init_states(self, init_states: Union[list, np.ndarray]): """ Sets the initial states of the reservoir nodes. Args: init_states (np.ndarray, optional): Array of initial states. If None, sample initial states using the specified method. """ # set the initial states in the reservoir layer self.reservoir_layer.set_initial_state(r_init=init_states) def _initialize_network(self, method: str = "random_normal"): """ Initialize the reservoir states. Args: method (str, optional): Method for sampling initial states. """ num_nodes = self.reservoir_layer.nodes initializer = NetworkInitializer(method=method) init_states = initializer.gen_initial_states(num_nodes) self._set_init_states(init_states=init_states) def _connect_input_to_reservoir(self): """ Wire input layer with reservoir layer. Creates a random matrix of shape [nodes x n_states_in], i.e. number of reservoir nodes x state dimension of input. If no full connection is desired, a fraction of nodes will be selected according to the fraction_input parameter of the reservoir layer. 
""" # generate random input connection matrix [nodes, n_states_in] net_generator = NetworkInitializer(method="random_normal") full_input_weights = net_generator.gen_initial_states( shape=(self.num_nodes, self.num_states_in) ) # select read-in node indices according to the fraction specified by the user node_selector = NodeSelector( total_nodes=self.num_nodes, strategy="random_uniform_wo_repl", ) # select the fraction of nodes that are input nodes [nodes, n_states_in] input_receiving_nodes = node_selector.select_nodes( fraction=self.reservoir_layer.fraction_input ) self.reservoir_layer.input_receiving_nodes = input_receiving_nodes node_mask = np.ones_like(full_input_weights) node_mask[input_receiving_nodes] = 0 # set the input layer weight matrix self._set_readin_weights(weights=(full_input_weights * node_mask))
[docs] def compute_reservoir_state(self, x: np.ndarray) -> np.ndarray: """ Vectorized computation of reservoir states with batch processing. Args: x (np.ndarray): Input data of shape [n_batch, n_timesteps, n_states] Returns: np.ndarray: Reservoir states of shape [(n_batch * n_timesteps), N] """ # sanity checks for inputs if not isinstance(x, np.ndarray): raise TypeError("Input data must be a numpy array.") if x.ndim != 3: raise ValueError( "Input data must have 3 dimensions (n_batch, n_timesteps, n_states)." ) # Extract shapes and parameters n_batch, n_time = x.shape[0], x.shape[1] # get local variables for easier access num_nodes = self.reservoir_layer.nodes activation = self.reservoir_layer.activation_fun alpha = self.reservoir_layer.leakage_rate # leakage rate A = self.reservoir_layer.weights # reservoir weight matrix (adjacency matrix) W_in = self.input_layer.weights # read-in weight matrix # We will compute the reservoir states for all time steps in the first sample, # then reset the reservoir state to the initial values, and proceed with the # next sample. This makes sure to have no data leakage between samples. # Pre-allocate reservoir state matrix and initialize with initial states states = np.zeros((n_batch, n_time + 1, num_nodes)) states[:, 0] = self.reservoir_layer.initial_res_states # vectorized computation of reservoir states over time steps # 1. compute dot(W_in, x) for all time steps across all batches # (can be done before the loop as it does not depend on the reservoir states) input_contrib = np.einsum( "ij,btj->bti", W_in, x ) # shape [n_batch, n_time, n_nodes] # 2. 
now step through time to compute reservoir states for t in range(n_time): # compute dot(A, r(t)) for all batches reservoir_contrib = np.einsum("ij,bj->bi", A, states[:, t]) # now compute r(t+1) = (1-alpha) * r(t) + alpha * g(r(t) * A + x * W_in) states[:, t + 1] = (1 - alpha) * states[:, t] + alpha * activation( reservoir_contrib + input_contrib[:, t] ) # flatten reservoir states along batch dimension: # [(n_batch * n_timesteps), num_nodes] # states[:, 1:].reshape(-1, num_nodes) return states
[docs] def fit_evolve(self, X: np.ndarray, y: np.ndarray): # build an evolving reservoir computer: performance-dependent node addition and removal history = None return history
# # @abstractmethod # def get_params(self, deep=True): # """ # Get parameters for scikit-learn compatibility. # Args: # deep (bool): If True, return a deep copy of parameters. # Returns: # dict: Dictionary of model parameters. # """ # # needed for scikit-learn compatibility # return { # "input_layer": self.input_layer, # "reservoir_layer": self.reservoir_layer, # "readout_layer": self.readout_layer, # } # @abstractmethod
[docs] def save(self, path: str): """ Store the model to disk. Args: path (str): Path to save the model. """ # store the model to disk raise NotImplementedError("Method not implemented yet.")
[docs] def plot(self, path: str): """ Print the model to some figure file. Args: path (str): Path to save the figure. """ # print the model to some figure file raise NotImplementedError("Method not implemented yet.")
[docs] def set_hp(self, **kwargs): """ Set one or more reservoir hyperparameters. Supported kwargs: spec_rad, leakage_rate, activation """ supported_hps = {"spec_rad", "leakage_rate", "activation"} unsupported = set(kwargs) - supported_hps if unsupported: raise ValueError(f"Unsupported hyperparameter(s): {', '.join(unsupported)}") if 'spec_rad' in kwargs: self.reservoir_layer.set_spec_rad(kwargs['spec_rad']) if 'leakage_rate' in kwargs: self.reservoir_layer.set_leakage_rate(kwargs['leakage_rate']) if 'activation' in kwargs: self.reservoir_layer.set_activation(kwargs['activation'])
# Add more as needed
[docs] def get_hp(self, *args): """ Get one or more reservoir hyperparameters. If no args, returns all. Usage: get_hp('spec_rad', 'activation') or get_hp() """ all_hps = { 'spec_rad': self.reservoir_layer.get_spec_rad(), 'leakage_rate': self.reservoir_layer.get_leakage_rate(), 'activation': self.reservoir_layer.get_activation(), # Add more as needed } if args: return {hp: all_hps[hp] for hp in args if hp in all_hps} else: return all_hps
class RC(CustomModel):  # the non-autonomous version
    """
    Non-autonomous version of the reservoir computer.
    """

    def __init__(self):
        # No RC-specific arguments yet; defer entirely to the base class.
        super().__init__()
class AutoRC(CustomModel):
    """
    Autonomous version of the reservoir computer.
    """

    def __init__(self):
        # NOTE(review): deliberately does not call super().__init__() in the
        # original — confirm whether base initialization is intended here.
        pass

    def predict_ar(self, X: np.ndarray, n_steps: int = 10):
        """
        Perform auto-regressive prediction (time series forecasting).

        Args:
            X (np.ndarray): Initial input data.
            n_steps (int): Number of steps to predict into the future.

        Returns:
            np.ndarray: Predicted future states (not implemented yet).
        """
        pass
class HybridRC(CustomModel):
    """
    Hybrid version of the reservoir computer.
    """

    def __init__(self):
        # NOTE(review): deliberately does not call super().__init__() in the
        # original — confirm whether base initialization is intended here.
        pass
if __name__ == "__main__":
    # smoke-test entry point
    print("hello")