diff --git a/ratinabox/contribs/FieldOfViewNeurons.py b/ratinabox/contribs/FieldOfViewNeurons.py deleted file mode 100644 index cc1c507..0000000 --- a/ratinabox/contribs/FieldOfViewNeurons.py +++ /dev/null @@ -1,280 +0,0 @@ -from ratinabox.Environment import Environment -from ratinabox.Agent import Agent -from ratinabox.Neurons import * -from ratinabox.utils import * - - -import matplotlib -from matplotlib.collections import EllipseCollection - -import copy -import numpy as np -import math -import types -from warnings import warn - - -class FieldOfViewNeurons(Neurons): - """FieldOfViewNeurons are collection of boundary vector cells or object vector cells organised so as to represent the local field of view i.e. what walls or objects the agent can "see" in the local vicinity. They work as follow: - - A "manifold" of boundary vector cells (BVCs) or object vector cells (OVCs) tiling the agents FoV is initialised. Users define the radius and angular extent of this manifold which determine the distances and angles available in the FoV. These default to a 180 degree FoV from distance of 0 to 20cm. Each point on this manifold is therefore defined by tuple (angle and radius), or (θ,r). Egocentric cells are initialised which tile this manifold (uniformly or growing with radius), the cell at position (θ,r) will fire with a preference for boundaries (in the case of BVCs) or objects (for OVCs) a distance r from the agent at an angle θ relative to the current heading direction (they are "egocentric"). Thus only if the part of the manifold where the cell sits crosses a wall or object, will that cell fire. Thus only the cells on the part of a manifold touching or close to a wall/object will fire. - - By default these are BVCs (the agent can only "see" walls), but if cell_type == 'OVC' then for each unique object type present in the Environment a manifold of these cells is created. So if there are two "types " of objects (red objects and blue objects) then one manifold will reveal the presence of red objects in the Agents FoV and the other blue objects. See Neurons.ObjectVectorCells for more information about how OVCs work. - - The tiling resolution (the spatial and angular tuning sizes of the smallest cells receptive fields) is given by the `spatial_resolution` param in the input dictionary. - - In order to visuale the manifold, we created a plotting function. First make a trajectory figure, then pass this into the plotting func: - >>> fig, ax = Ag.plot_trajectory() - >>> fig, ax = my_FoVNeurons.display_manifold(fig, ax) - or animate it with - >>> fig, ax = Ag.animate_trajectory(additional_plot_func=my_FoVNeurons.display_manifold) - - """ - - default_params = { - "FoV_distance": [0.0, 0.2], # min and max distances the agent can "see" - "FoV_angles": [ - 0, - 90, - ], # angluar FoV in degrees (will be symmetric on both sides, so give range in 0 (forwards) to 180 (backwards) - "spatial_resolution": 0.04, # resolution of each BVC tiling FoV - "manifold_function": "uniform", # whether all cells have "uniform" receptive field sizes or they grow ("hartley") with radius. - "cell_type": "BVC", # FoV neurons can either respond to local boundaries ("BVC") or objects ("OVC") - } - - def __init__(self, Agent, params={}): - - """This throws a deprecation warning on initialization.""" - warn(f'{self.__class__.__name__} will be deprecated. Please use the new class that can be imported using: from ratinabox.Neurons import FieldOfViewOVCs, FieldOfViewBVCs', DeprecationWarning, stacklevel=2) - - - self.params = copy.deepcopy(__class__.default_params) - self.params.update(params) - - # initialise the manifold of BVCs/OVCs tiling the Agents FoV - - if ( - type(self.params["manifold_function"]) is types.FunctionType - ): # allow users pass their own function which generates the manifold dictionary - manifold = self.params["manifold_function"]() - else: - manifold = self.get_manifold( - FoV_distance=self.params["FoV_distance"], - FoV_angles=self.params["FoV_angles"], - spatial_resolution=self.params["spatial_resolution"], - manifold_function=self.params["manifold_function"], - ) - self.manifold = manifold - - self.params["reference_frame"] = "egocentric" - - # initialise BVCs and set their distance/angular preferences and widths - if self.params["cell_type"] == "BVC": - self.params["n"] = manifold["n"] # one manifold, - super().__init__(Agent, self.params) - self.super = BoundaryVectorCells(Agent, self.params) - self.super.tuning_distances = manifold["mu_d"] - self.super.sigma_distances = manifold["sigma_d"] - self.super.tuning_angles = manifold["mu_theta"] - self.super.sigma_angles = manifold["sigma_theta"] - - # normalise the firing rates of the cells so their maximum fr is approximately given by max_fr - locs = self.Agent.Environment.discretise_environment(dx=0.04) - locs = locs.reshape(-1, locs.shape[-1]) - self.super.cell_fr_norm = np.ones(self.n) - self.super.cell_fr_norm = np.max( - self.get_state(evaluate_at=None, pos=locs), axis=1 - ) - - elif self.params["cell_type"] == "OVC": - unique_objects = np.unique(Agent.Environment.objects["object_types"]) - self.params["n"] = manifold["n"] * len( - unique_objects - ) # one manifold for each unique object type - super().__init__(Agent, self.params) - self.super = ObjectVectorCells(Agent, self.params) - for i, obj_id in enumerate(unique_objects): - # for each unique object type, make a manifold of neurons which respond to it at different angles/distances tiling the FOV - sid, eid = i * manifold["n"], (i + 1) * manifold["n"] - self.super.tuning_types[sid:eid] = obj_id * np.ones(manifold["n"]) - self.super.tuning_distances[sid:eid] = manifold["mu_d"] - self.super.sigma_distances[sid:eid] = manifold["sigma_d"] - self.super.tuning_angles[sid:eid] = manifold["mu_theta"] - self.super.sigma_angles[sid:eid] = manifold["sigma_theta"] - - def get_state(self, evaluate_at="agent", **kwargs): - return self.super.get_state(evaluate_at, **kwargs) - - def get_manifold( - self, - FoV_distance=[0.0, 0.2], - FoV_angles=[0, 90], - spatial_resolution=0.04, - manifold_function="uniform", - ): - """Returns the manifold of parameters for the B/OVCs tiling the Agents FoV. - - Args: - • FoV_distance (list): [min,max] distance from the agent to tile the manifold - • FoV_angles (list): [min,max] angular extent of the manifold in degrees - • spatial_resolution (float): size of each receptive field ("uniform") or the smallest receptive field ("hartley") - • manifold_function (str): "uniform" (all receptive fields the same size or "hartley" (further away receptive fields are larger in line with Hartley et al. 2000 eqn (1)) - - Returns: - • manifold (dict): a dictionary containing all details of the manifold of cells including - • "mu_d" : distance preferences - • "mu_theta" : angular preferences (radians) - • "sigma_d" : distance tunings - • "sigma_theta" : angular tunings (radians) - • "manifold_coords_polar" : coordinates in egocentric polar coords [d, theta] - • "manifold_coords_euclid" : coordinates in egocentric euclidean coords [x,y] - • "n" : number of cells on the manifold - - """ - FoV_angles_radians = [a * np.pi / 180 for a in FoV_angles] - (mu_d, mu_theta, sigma_d, sigma_theta) = ([], [], [], []) - - if manifold_function == "uniform": - dx = spatial_resolution - radii = np.arange(max(0.01, FoV_distance[0]), FoV_distance[1], dx) - for radius in radii: - dtheta = dx / radius - right_thetas = np.arange( - FoV_angles_radians[0] + dtheta / 2, - FoV_angles_radians[1], - dtheta, - ) - left_thetas = -right_thetas[::-1] - thetas = np.concatenate((left_thetas, right_thetas)) - for theta in thetas: - mu_d.append(radius) - mu_theta.append(theta) - sigma_d.append(spatial_resolution) - sigma_theta.append(spatial_resolution / radius) - - elif manifold_function == "hartley": - # Hartley model says that the spatial tuning width of a B/OVC should be proportional to the distance from the agent according to - # sigma_d = mu_d / beta + xi where beta := 12 and xi := 0.08 m are constants. - # In this case, however, we want to force the smallest cells to have sigma_d = spatial_resolution setting the constraint that xi = spatial_resolution - min_radius / beta - radius = max(0.01, FoV_distance[0]) - beta = 5 # smaller means larger rate of increase of cell size with radius - xi = spatial_resolution - radius / beta - while radius < FoV_distance[1]: - resolution = xi + radius / beta # spatial resolution of this row - dtheta = resolution / radius - if dtheta / 2 > FoV_angles_radians[1]: - right_thetas = np.array( - [FoV_angles_radians[0] + dtheta / 2] - ) # at least one cell in this row - else: - right_thetas = np.arange( - FoV_angles_radians[0] + dtheta / 2, - FoV_angles_radians[1], - dtheta, - ) - left_thetas = -right_thetas[::-1] - thetas = np.concatenate((left_thetas, right_thetas)) - for theta in thetas: - mu_d.append(radius) - mu_theta.append(theta) - sigma_d.append(resolution) - sigma_theta.append(resolution / radius) - # the radius of the next row of cells is found by solving the simultaneous equations: - # • r2 = r1 + sigma1/2 + sigma2/2 (the radius, i.e. sigma/2, of the second cell just touches the first cell) - # • sigma2 = r2/beta + xi (Hartleys rule) - # the solution is: r2 = (2*r1 + sigma1 +xi) / (2 - 1/beta) - radius = (2 * radius + resolution + xi) / (2 - 1 / beta) - mu_d, mu_theta, sigma_d, sigma_theta = ( - np.array(mu_d), - np.array(mu_theta), - np.array(sigma_d), - np.array(sigma_theta), - ) - manifold_coords_polar = np.array([mu_d, mu_theta]).T - manifold_coords_euclid = np.array( - [mu_d * np.sin(mu_theta), mu_d * np.cos(mu_theta)] - ).T - n = len(mu_d) - - manifold = { - "mu_d": mu_d, - "mu_theta": mu_theta, - "sigma_d": sigma_d, - "sigma_theta": sigma_theta, - "manifold_coords_polar": manifold_coords_polar, - "manifold_coords_euclid": manifold_coords_euclid, - "n": n, - } - - return manifold - - def display_manifold(self, fig=None, ax=None, t=None, object_type=0, **kwargs): - """Visualises the current firing rate of these cells relative to the Agent. Essentially this plots the "manifold" ontop of the Agent. Each cell is plotted as an ellipse where the alpha-value of its facecolor reflects the current firing rate (normalised against the approximate maximum firing rate for all cells, but, take this just as a visualisation) - - Args: - • fig, ax: the matplotlib fig, ax objects to plot on (if any), otherwise will plot the Environment - • t (float): time to plot at - • object_type (int): if self.cell_type=="OVC", which object type to plot - - Returns: - fig, ax: with the - """ - if t is None: - t = self.Agent.history["t"][-1] - t_id = np.argmin(np.abs(np.array(self.Agent.history["t"]) - t)) - - if fig is None and ax is None: - fig, ax = self.Agent.plot_trajectory(t_start=t - 10, t_end=t, **kwargs) - - pos = self.Agent.history["pos"][t_id] - head_direction = self.Agent.history["head_direction"][t_id] - head_direction_angle = (180 / np.pi) * ( - ratinabox.utils.get_angle(head_direction) - np.pi / 2 - ) # head direction angle (CCW from true North) - fr = self.history["firingrate"][t_id] - ego_y = head_direction / np.linalg.norm( - head_direction - ) # this is the "y" direction" in egocentric coords - ego_x = utils.rotate(ego_y, -np.pi / 2) - facecolor = self.color or "C1" - - if self.cell_type == "OVC": - sid, eid = int(object_type * self.manifold["n"]), int( - (object_type + 1) * self.manifold["n"] - ) - fr = fr[sid:eid] - facecolor = matplotlib.cm.get_cmap(self.Agent.Environment.object_colormap) - facecolor = facecolor( - object_type / (self.Agent.Environment.n_object_types - 1 + 1e-8) - ) - - # TODO redo this with ellipse collections - for i, coord in enumerate(self.manifold["manifold_coords_euclid"]): - [x, y] = coord - pos_of_cell = pos - x * ego_x + y * ego_y - facecolor = list(matplotlib.colors.to_rgba(facecolor)) - facecolor[-1] = max( - 0, min(1, fr[i] / (0.5 * self.super.max_fr)) - ) # set the alpha value - # circ = matplotlib.patches.Circle( - # (pos_of_cell[0], pos_of_cell[1]), - # radius=0.5 * self.spatial_resolution, - # linewidth=0.5, - # edgecolor="dimgrey", - # facecolor=facecolor, - # zorder=2.1, - # ) - ellipse = matplotlib.patches.Ellipse( - (pos_of_cell[0], pos_of_cell[1]), - width=self.manifold["sigma_theta"][i] * self.manifold["mu_d"][i], - height=self.manifold["sigma_d"][i], - angle=1.0 * head_direction_angle - + self.manifold["mu_theta"][i] * 180 / np.pi, - linewidth=0.5, - edgecolor="dimgrey", - facecolor=facecolor, - zorder=2.1, - ) - ax.add_patch(ellipse) - - return fig, ax diff --git a/ratinabox/contribs/NeuralNetworkNeurons.py b/ratinabox/contribs/NeuralNetworkNeurons.py deleted file mode 100644 index b486cb4..0000000 --- a/ratinabox/contribs/NeuralNetworkNeurons.py +++ /dev/null @@ -1,146 +0,0 @@ -from ratinabox.Environment import Environment -from ratinabox.Neurons import Neurons -import copy -import numpy as np - -import torch #pytorch, for the neural network -import torch.nn as nn -import warnings - - -class NeuralNetworkNeurons(Neurons): - """The NeuralNetworkNeurons class takes as inputs other RiaB Neurons classes and uses these to compute its own firing rates by concatenating the firing rates of all the inputs and passing them through a neural network model provided by the user. - - For example a NeuralNetworkNeurons could take the firing rates of a set of place cells and a set of grid cells as inputs and passes these through the deep neural network. This would go as follows: - - Env = Environment() - Ag = Agent(Env) - PCs = PlaceCells(Ag) - GCs = GridCells(Ag) - NNNs = NeuralNetworkNeurons(Ag, params={"input_layers": [PCs, GCs], "NeuralNetworkModule": }) - - Training the neural network: In order to maintain compatibility with other RiaB functionalities the .get_state() method returns a numpy array of firing rates detached from the pytorch graph (which cannot then be used for training). However, when called in the standard update()-loop it also saves an attribute called .firingrate_torch which is an attached pytorch tensor and can be used to take gradients for training etc., if this is needed. - - Users either provide a "NeuralNetworkModule" and no "n" (in which case self.n is set to be the output size of the NN) or "n" and no "NeuralNetworkModule" (in which case a default ReLU MLP with two hidden layers is used with self.n_out = self.n). If both are provided then a warning is raised and the user provided "n" is overwritten by the output size of the NN. - - Args: - input_layers (list): a list of input layers, these are RatInABox.Neurons classes and must all be provided at initialisation. - - NeuralNetworkModule (nn.Module): Any nn.Module that has a .forward() method. It is the users job to ensure that the input size of this network matches the total number of neurons in the passed input_layers. The number of neurons in the output of this network will serve as the number of neurons in this layer. The .forward(X) function must accept X as a torch array of shape (n_batch, n_in) and returns a 2D torch array of shape (n_batch, n_out). Note, however, most nn.Modules assume the first dimension is the batch dimension anyway. - """ - - default_params = { - "n":None, #number of neurons in this layer, must match output size of NeuralNetworkModule. Typically leave this as None and let it be set by the output size of the NeuralNetworkModule - "input_layers": [], # a list of input layers, each must be a ratinabox.Neurons class - "NeuralNetworkModule": None, #Any torch nn.Sequential or nn.Module with a .forward() method - "name": "NeuralNetworkNeurons",} - - def __init__(self,Agent,params={}): - self.Agent = Agent - self.params = copy.deepcopy(__class__.default_params) - self.params.update(params) - - super().__init__(Agent, self.params) - - assert isinstance(self.input_layers, list), "param['input_layers'] must be a list of Neurons." - assert len(self.input_layers) > 0, "No input layers have been provided. Hand them in in the params dictionary params['input_layers']=[list,of,inputs]" - - self.n_in = sum([layer.n for layer in self.input_layers]) - - # Check the number of output of the NeuralNetworkModule and self.n are compatible - # in order of preference... - if (self.n is None) and (self.NeuralNetworkModule is not None): - self.n = self.NeuralNetworkModule(torch.zeros(1,self.n_in)).shape[1] - - elif (self.n is not None) and (self.NeuralNetworkModule is None): - self.NeuralNetworkModule = MultiLayerPerceptron(n_in=self.n_in, n_out=self.n, n_hidden=[20,20]) - warnings.warn(f"No NeuralNetworkModule was provided so a default MLP with {self.n_in} inputs, {self.n} outputs and 2 hidden ReLU layers of size 20 will be used. Alternatively provide one in the params['NeuralNetworkModule']= and don't set 'n'.") - - elif (self.n is not None) and (self.NeuralNetworkModule is not None): - raise ValueError(f"You provided both 'n' and `NeuralNetworkModule` as parameters. These are mutually exclusive. Either provide 'NeuralNetworkModule' and no 'n' (the output size of the NeuralNetworkModule will be used as the number of neurons in this layer) or 'n' and no 'NeuralNetworkModule (a default MLP will be initialised)") - - elif (self.n is None) and (self.NeuralNetworkModule is None): - raise ValueError("You provided neither a 'NeuralNetworkModule' nor 'n' as parameters. Either provide 'NeuralNetworkModule' and no 'n' (the output size of the NeuralNetworkModule will be used as the number of neurons in this layer) or 'n' and no 'NeuralNetworkModule (a default MLP will be initialised)") - - - #Finally, check that the NeuralNetworkModule accepts the right sized inputs - try: - self.NeuralNetworkModule(torch.zeros(1,self.n_in)) - except: - raise ValueError(f"You provided inputs layers with a total of {self.n_in} neurons but the NeuralNetworkModule you provided does not accept inputs of size (1,{self.n_in}) so they are incompatible") - - return - - def get_state(self, evaluate_at="last", save_torch=False, **kwargs): - """Returns the firing rate of the NeuralNetworkNeurons. There are two stages: - - 1. First the input layer Neurons are queried for their firing rates. By default, this just takes the last firingrate (i.e. from the last time .update() was called on these Neurons). Alternatively `evaluate_at` and `kwargs` can be set in order to evaluate the inputs in some other way (e.g. at some other location: evaluate_at=None, pos=np.array([x,y])....or an array of all locations evaluate_all='all' etc.). All inputs are concatentated into a single array of shape (n_batch, n_in) where n_in is the total number of input neurons. n_batch is typical 1 (for a single location) but can be more if evaluating at multiple locations (e.g. evaluate_all='all'). - - 2. The concatenated inputs are passed through the NeuralNetworkModule to compute the firing rate of the firing rate of the input layers is established these are concatenated and passeed through the neural network to calculate the output. - - Note the function returns the firingrate as a detached numpy array (i.e. no gradients) but also saves it as an attribute self.firingrate_torch which is a torch tensor and can be used to take gradients. - - Args: - • evaluate_at (str, optional): If "last" (default) then the firing rate of the input layers is taken as the last firing rate (i.e. from the last time .update() was called on these Neurons). Alternatively, this can be set to None in which case the inputs are evaluated at some other location (e.g. evaluate_at=None, pos=np.array([x,y])....or an array of all locations evaluate_all='all' etc.). Defaults to "last". - • save_torch (bool, optional): If True then the torch tensor self.firingrate_torch is saved as an attribute and can be used for optimsation of the network. Defaults to False. - • kwargs: any additional arguments to be passed to the input layer's .get_state() method. - Returns: - firingrate: array of firing rates - """ - #Gets the inputs in the shape (n_batch, n_in) - if evaluate_at=="last": - inputs = np.concatenate([layer.firingrate for layer in self.input_layers]).reshape(1,-1) - else: - #else kick the can down the road and just run get_state on all inputs and concatenate these - # note the convention change of (n_batch, ) - inputs = np.concatenate([layer.get_state(evaluate_at, **kwargs) for layer in self.input_layers]).T - - inputs_torch = torch.Tensor(inputs.astype(np.float32)) - inputs_torch.requires_grad = True - firingrate_torch = self.NeuralNetworkModule(inputs_torch) - if save_torch: - self.firingrate_torch = firingrate_torch # <-- note the shape convention on this in (n_batch, n_neurons, the opposite of RiaB standard - - return firingrate_torch.detach().numpy().T - - - def update(self): - """Updates the neuron but sets save_torch arg to True so self.firingrate_torch will be saved giving user access to the computational graph""" - super().update(save_torch=True) - - - - - - - - - - - - - - - -#An example neural network which will act as the core neural network module at work within for the DeepNeuralNetworkNeurons class -class MultiLayerPerceptron(nn.Module): - """A generic ReLU neural network class, default used for the core function in NeuralNetworkNeurons. - Specify input size, output size and hidden layer sizes (a list). Biases are used by default. - - Args: - n_in (int, optional): The number of input neurons. Defaults to 20. - n_out (int, optional): The number of output neurons. Defaults to 1. - n_hidden (list, optional): A list of integers specifying the number of neurons in each hidden layer. Defaults to [20,20].""" - - def __init__(self, n_in=20, n_out=1, n_hidden=[20,20]): - nn.Module.__init__(self) - n = [n_in] + n_hidden + [n_out] - layers = nn.ModuleList() - for i in range(len(n)-1): - layers.append(nn.Linear(n[i],n[i+1])) - if i < len(n)-2: layers.append(nn.ReLU()) #add a ReLU after each hidden layer (but not the last) - self.net = nn.Sequential(*layers) - - def forward(self, X): - """Forward pass, X must be a torch tensor. Returns an (attached) torch tensor through which you can take gradients. """ - return self.net(X) \ No newline at end of file