"""
We will have an abstract Layer class, from which the following layers inherit:
- InputLayer
- ReservoirLayer
- RandomReservoir
- RecurrenceReservoir
- EvolvedReservoir
- ReadoutLayer
"""
from abc import ABC, abstractmethod
import numpy as np
import networkx as nx
from pyreco.utils_networks import (
gen_ER_graph,
compute_density,
get_num_nodes,
compute_spec_rad,
remove_nodes_from_graph,
set_spec_rad
)
# implements the abstract base class
[docs]
class Layer(ABC):
@abstractmethod
def __init__(self):
self.weights = None # every layer will have some weights (trainable or not)
self.name: str = "layer"
self._is_compiled: bool = False # flag to check if the overall model is compiled
pass
[docs]
@abstractmethod
def update_layer_properties(self):
pass
[docs]
class ReadoutLayer(Layer):
def __init__(self, output_shape, fraction_out=1.0):
# expects output_shape = (n_timesteps, n_states)
super().__init__()
self.output_shape: tuple = output_shape
self.n_time = output_shape[0]
self.n_states = output_shape[1]
self.fraction_out: float = (
fraction_out # fraction of connections to the reservoir
)
self.name = "readout_layer"
self.readout_nodes = [] # list of nodes that are linked to output
[docs]
def remove_nodes(self, nodes: list):
# remove nodes from the readout layer, when the user wants to delete specific reservoir nodes
# # delete entries that equal the nodes to be removed
# idx_del = np.where(np.isin(self.readout_nodes, nodes))[0]
# self.readout_nodes = np.delete(self.readout_nodes, idx_del)
# self.update_layer_properties()
raise NotImplementedError("This method is not yet implemented.")
[docs]
def update_layer_properties(self):
# updates the properties of the readout layer
self.fraction_out = len(self.readout_nodes) / self.weights.shape[0]
pass
[docs]
class ReservoirLayer(Layer): # subclass for the specific reservoir layers
def __init__(
self,
nodes,
density,
activation,
leakage_rate,
fraction_input,
init_res_sampling,
seed: int = 42,
):
super().__init__()
self.nodes: int = nodes # number of reservoir nodes
self.density: float = density
self.spec_rad = None
self.activation = activation
self.leakage_rate = leakage_rate
self.name = "reservoir_layer"
self.fraction_input = fraction_input
self.weights = None # the adjacency matrix / reservoir network
self.input_receiving_nodes = None
# initial reservoir state (will be set later)
self.initial_res_states = None
self.init_res_sampling = init_res_sampling
[docs]
def activation_fun(self, x: np.ndarray):
if self.activation == "sigmoid":
return 1 / (1 + np.exp(-x))
elif self.activation == "tanh":
return (np.exp(x) - np.exp(-x)) / (np.exp(x) + np.exp(-x))
else:
raise (ValueError(f"unknown activation function {self.activation}!"))
[docs]
def set_weights(self, network: np.ndarray):
# set reservoir network (adjacency matrix) from outside.
# Updates all related parameters.
# MUST be having the same shape as the existing reservoir network.
if not isinstance(network, np.ndarray):
raise TypeError("Network must be a numpy array.")
if network.ndim != 2:
raise ValueError("Network must be a 2D numpy array.")
if network.shape[0] != network.shape[1]:
raise ValueError("Network must be square (same number of rows and columns).")
# for now, we only accept matrices that have the same shape as the existing reservoir
if self._is_compiled and (network.shape != self.weights.shape):
raise ValueError(f"Network must have the same shape as the existing reservoir ({self.weights.shape}).")
# overwrite the existing adjacency matrix
self.weights = network
# update related reservoir properties (density, spectral radius, etc.)
self.update_layer_properties()
[docs]
def set_initial_state(self, r_init: np.ndarray):
# assigns an initial state to each of the reservoir nodes
if r_init.shape[0] != self.nodes:
raise (
ValueError(
"initial reservoir state does not match the number of nodes in the reservoir!"
)
)
self.initial_res_states = r_init
[docs]
def remove_nodes(self, nodes: list):
"""
Remove specified nodes from the reservoir network.
Parameters:
nodes (list): A list of indices representing the nodes to be removed.
1. Removes the specified nodes from the adjacency matrix.
2. Updates the reservoir properties including the number of nodes, density, and spectral radius.
"""
# remove nodes from the reservoir network
if not isinstance(nodes, list):
raise TypeError("Nodes must be provided as a list of indices.")
if np.max(nodes) > self.nodes:
raise ValueError("Node index exceeds the number of nodes in the reservoir.")
if np.min(nodes) < 0:
raise ValueError("Node index must be positive.")
# 1. remove nodes from the adjacency matrix
self.weights = remove_nodes_from_graph(graph=self.weights, nodes=nodes)
# 2. update reservoir properties
self.update_layer_properties()
# 3. update the initial reservoir state
self.initial_res_states = np.delete(self.initial_res_states, nodes, axis=0)
# # 4. update the info about input-receiving nodes
# self.input_receiving_nodes = np.delete(self.input_receiving_nodes, nodes)
[docs]
def update_layer_properties(self):
"""
Updates the reservoir properties including the number of nodes, density, and spectral radius.
This method should be called after any changes to the reservoir network.
"""
self.nodes = get_num_nodes(self.weights)
self.density = compute_density(self.weights)
self.spec_rad = compute_spec_rad(self.weights)
[docs]
def get_spec_rad(self):
return self.spec_rad
[docs]
def set_spec_rad(self, value):
""" Sets the spectral radius of the reservoir network.
Parameters:
- value (float): The desired spectral radius.
Raises:
- ValueError: If the value is not positive or if the weights are not set."""
# check that the value is positive and that the weights are set
if value <= 0:
raise ValueError("Spectral radius must be positive")
if self.weights is not None:
self.weights = set_spec_rad(self.weights, value)
else:
raise ValueError("Weights must be set before changing spectral radius.")
# check that the weights re-scaling was successful
_new_spec_rad = compute_spec_rad(self.weights)
if np.isclose(value, _new_spec_rad, atol=1e-5):
self.spec_rad = _new_spec_rad
else:
raise ValueError(
f"Failed to set spectral radius within tolerance 1e-5. Expected {value}, got {_new_spec_rad}."
)
[docs]
def get_leakage_rate(self):
return self.leakage_rate
[docs]
def set_leakage_rate(self, value):
if not (0 < value <= 1):
raise ValueError("Leakage rate must be in (0, 1]")
self.leakage_rate = value
[docs]
def get_activation(self):
return self.activation
[docs]
def set_activation(self, value):
if value not in ["tanh", "sigmoid"]:
raise ValueError("Activation must be 'tanh' or 'sigmoid'")
self.activation = value
[docs]
class RandomReservoirLayer(ReservoirLayer):
def __init__(
self,
nodes,
density: float = 0.1,
activation: str = "tanh",
leakage_rate: float = 0.5,
fraction_input: float = 0.8,
spec_rad: float = 0.9,
init_res_sampling="random_normal",
seed=None,
):
# Call the parent class's __init__ method
super().__init__(
nodes=nodes,
density=density,
activation=activation,
leakage_rate=leakage_rate,
fraction_input=fraction_input,
init_res_sampling=init_res_sampling,
seed=seed,
)
# initialize subclass-specific attributes
self.seed = seed
self.spec_rad = spec_rad
# generate a random ER graph using networkx
self.weights = gen_ER_graph(
nodes=nodes,
density=density,
spec_rad=self.spec_rad,
directed=True,
seed=seed,
)
# update the spectral radius and the density to the actual values
self.update_layer_properties()
# check that the graph actually satisfies the spectral radius and
# density requested by the user
_spec_rad = compute_spec_rad(self.weights)
_density = compute_density(self.weights)
if np.isclose(self.spec_rad, _spec_rad, atol=1e-5):
self.spec_rad = _spec_rad
else:
raise ValueError(
f"Failed to set spectral radius within tolerance 1e-5. Expected {self.spec_rad}, got {_spec_rad}."
)
if np.isclose(self.density, _density, atol=1e-5):
self.density = _density
else:
raise ValueError(
f"Failed to set density within tolerance 1e-5. Expected {self.density}, got {_density}."
)
#def update_layer_properties(self):
#Updates the reservoir properties including the number of nodes, density, and spectral radius.
#self.nodes = get_num_nodes(self.weights)
#self.density = compute_density(self.weights)
# self.spec_rad = compute_spec_rad(self.weights)
# class ReccurrenceLayer(ReservoirLayer):
# # To Do: accept a random seed
# def __init__(self, nodes, density, activation: str = 'tanh', leakage_rate: float = 0.2):
# # Call the parent class's __init__ method
# super().__init__(nodes, density, activation, leakage_rate)
#
# # Initialize subclass-specific attributes
# # https://pyts.readthedocs.io/en/stable/generated/pyts.image.RecurrencePlot.html#pyts.image.RecurrencePlot
# # https://tocsy.pik-potsdam.de/pyunicorn.php
#