diff --git a/neurolib/utils/atlases.py b/neurolib/utils/atlases.py new file mode 100644 index 00000000..d2b4acce --- /dev/null +++ b/neurolib/utils/atlases.py @@ -0,0 +1,325 @@ +""" +Set of anatomical atlases for convenience. + +(c) neurolib-devs +""" + +import logging + + +class BaseAtlas: + """ + Simple base for anatomical atlases. Internal representation as dictionary + {node_num: "name"}. + """ + + name = "" + label = "" + + def __init__(self, atlas): + """ + :param atlas: anatomical atlas / parcellation + :type atlas: dict + """ + if sorted(atlas)[0] != 0: + logging.warning("Atlas doesn't start at 0, reindexing...") + reindexed = {} + for new_idx, old_idx in enumerate(sorted(atlas)): + reindexed[new_idx] = atlas[old_idx] + atlas = reindexed + self.atlas = atlas + + def __len__(self): + return len(self.atlas) + + def __getitem__(self, item): + return self.atlas[item] + + def __str__(self): + return f"{self.name} atlas with {self.no_rois} ROIs." + + @property + def node_names(self): + """ + Return node names in the correct order, i.e. sorted. + """ + return [self[key] for key in sorted(self.atlas)] + + @property + def no_rois(self): + """ + Return number of ROIs in the atlas. + """ + return len(self.atlas) + + def add_rois(self, extra_rois): + """ + Add additional ROIs to the atlas. + + :param extra_rois: ROIs to add to the atlas, must have unique keys + :type extra_rois: dict + """ + for key in extra_rois: + assert key not in self.atlas, f"Node {key} already exists" + self.atlas.update(extra_rois) + + def remove_rois(self, rois_to_remove, reindex=False): + """ + Remove some ROIs from the atlas. Optionally re-index. + + :param rois_to_remove: list of ROIs to remove + :type rois_to_remove: list + :param reindex: whether to reindex ROIs that are still in the atlas + :type reindex: bool + """ + for key in rois_to_remove: + res = self.atlas.pop(key, None) + if res is None: + logging.warning(f"Node {key} not found, doing nothing...") + if reindex: + reindexed = {} + # new indices as order in sorted old dict by keys + for new_idx, old_idx in enumerate(sorted(self.atlas)): + reindexed[new_idx] = self.atlas[old_idx] + self.atlas = reindexed + + +class AutomatedAnatomicalParcellation2(BaseAtlas): + """ + AAL2 atlas. + + Rolls ET, Joliot M, Tzourio-Mazoyer N (2015) Implementation of a new + parcellation of the orbitofrontal cortex in the automated anatomical + abeling atlas. NeuroImage 10.1016/j.neuroimage.2015.07.075. + """ + + name = "Automated anatomical Parcellation 2" + label = "AAL2" + + aal2 = { + 1: "Precentral_L", + 2: "Precentral_R", + 3: "Frontal_Sup_2_L", + 4: "Frontal_Sup_2_R", + 5: "Frontal_Mid_2_L", + 6: "Frontal_Mid_2_R", + 7: "Frontal_Inf_Oper_L", + 8: "Frontal_Inf_Oper_R", + 9: "Frontal_Inf_Tri_L", + 10: "Frontal_Inf_Tri_R", + 11: "Frontal_Inf_Orb_2_L", + 12: "Frontal_Inf_Orb_2_R", + 13: "Rolandic_Oper_L", + 14: "Rolandic_Oper_R", + 15: "Supp_Motor_Area_L", + 16: "Supp_Motor_Area_R", + 17: "Olfactory_L", + 18: "Olfactory_R", + 19: "Frontal_Sup_Medial_L", + 20: "Frontal_Sup_Medial_R", + 21: "Frontal_Med_Orb_L", + 22: "Frontal_Med_Orb_R", + 23: "Rectus_L", + 24: "Rectus_R", + 25: "OFCmed_L", + 26: "OFCmed_R", + 27: "OFCant_L", + 28: "OFCant_R", + 29: "OFCpost_L", + 30: "OFCpost_R", + 31: "OFClat_L", + 32: "OFClat_R", + 33: "Insula_L", + 34: "Insula_R", + 35: "Cingulate_Ant_L", + 36: "Cingulate_Ant_R", + 37: "Cingulate_Mid_L", + 38: "Cingulate_Mid_R", + 39: "Cingulate_Post_L", + 40: "Cingulate_Post_R", + 41: "Hippocampus_L", + 42: "Hippocampus_R", + 43: "ParaHippocampal_L", + 44: "ParaHippocampal_R", + 45: "Amygdala_L", + 46: "Amygdala_R", + 47: "Calcarine_L", + 48: "Calcarine_R", + 49: "Cuneus_L", + 50: "Cuneus_R", + 51: "Lingual_L", + 52: "Lingual_R", + 53: "Occipital_Sup_L", + 54: "Occipital_Sup_R", + 55: "Occipital_Mid_L", + 56: "Occipital_Mid_R", + 57: "Occipital_Inf_L", + 58: "Occipital_Inf_R", + 59: "Fusiform_L", + 60: "Fusiform_R", + 61: "Postcentral_L", + 62: "Postcentral_R", + 63: "Parietal_Sup_L", + 64: "Parietal_Sup_R", + 65: "Parietal_Inf_L", + 66: "Parietal_Inf_R", + 67: "SupraMarginal_L", + 68: "SupraMarginal_R", + 69: "Angular_L", + 70: "Angular_R", + 71: "Precuneus_L", + 72: "Precuneus_R", + 73: "Paracentral_Lobule_L", + 74: "Paracentral_Lobule_R", + 75: "Caudate_L", + 76: "Caudate_R", + 77: "Putamen_L", + 78: "Putamen_R", + 79: "Pallidum_L", + 80: "Pallidum_R", + 81: "Thalamus_L", + 82: "Thalamus_R", + 83: "Heschl_L", + 84: "Heschl_R", + 85: "Temporal_Sup_L", + 86: "Temporal_Sup_R", + 87: "Temporal_Pole_Sup_L", + 88: "Temporal_Pole_Sup_R", + 89: "Temporal_Mid_L", + 90: "Temporal_Mid_R", + 91: "Temporal_Pole_Mid_L", + 92: "Temporal_Pole_Mid_R", + 93: "Temporal_Inf_L", + 94: "Temporal_Inf_R", + 95: "Cerebelum_Crus1_L", + 96: "Cerebelum_Crus1_R", + 97: "Cerebelum_Crus2_L", + 98: "Cerebelum_Crus2_R", + 99: "Cerebelum_3_L", + 100: "Cerebelum_3_R", + 101: "Cerebelum_4_5_L", + 102: "Cerebelum_4_5_R", + 103: "Cerebelum_6_L", + 104: "Cerebelum_6_R", + 105: "Cerebelum_7b_L", + 106: "Cerebelum_7b_R", + 107: "Cerebelum_8_L", + 108: "Cerebelum_8_R", + 109: "Cerebelum_9_L", + 110: "Cerebelum_9_R", + 111: "Cerebelum_10_L", + 112: "Cerebelum_10_R", + 113: "Vermis_1_2", + 114: "Vermis_3", + 115: "Vermis_4_5", + 116: "Vermis_6", + 117: "Vermis_7", + 118: "Vermis_8", + 119: "Vermis_9", + 120: "Vermis_10", + } + + def __init__(self): + super().__init__(self.aal2) + # define convenience regions - indexing from 0!! + self.cerebellum = [region for region in range(94, 120)] + self.thalamus = [80, 81] + self.basal_ganglia = [region for region in range(74, 80)] + self.amygdala = [44, 45] + self.hippocampus = [region for region in range(40, 44)] + self.subcortical = ( + self.cerebellum + + self.thalamus + + self.basal_ganglia + + self.amygdala + + self.hippocampus + ) + + +class DesikanKilliany(BaseAtlas): + """ + Desikan-Killiany atlas. + + Desikan, R. S., Ségonne, F., Fischl, B., Quinn, B. T., Dickerson, B. C., + Blacker, D., et al. (2006). An automated labeling system for + subdividing the human cerebral cortex on MRI scans into gyral based + regions of interest. NeuroImage, 31(3), 968–980. + http://doi.org/10.1016/j.neuroimage.2006.01.021 + """ + + name = "Desikan-Killiany" + label = "DK" + + dk = { + 1: "Banks_superior_temporal_sulcus_L", + 35: "Banks_superior_temporal_sulcus_R", + 2: "Caudal_anterior_cingulate_cortex_L", + 36: "Caudal_anterior_cingulate_cortex_R", + 3: "Caudal_middle_frontal_gyrus_L", + 37: "Caudal_middle_frontal_gyrus_R", + 4: "Cuneus_cortex_L", + 38: "Cuneus_cortex_R", + 5: "Entorhinal_cortex_L", + 39: "Entorhinal_cortex_R", + 6: "Fusiform_gyrus_L", + 40: "Fusiform_gyrus_R", + 7: "Inferior_parietal_cortex_L", + 41: "Inferior_parietal_cortex_R", + 8: "Inferior_temporal_gyrus_L", + 42: "Inferior_temporal_gyrus_R", + 9: "Isthmus_cingulate_cortex_L", + 43: "Isthmus_cingulate_cortex_R", + 10: "Lateral_occipital_cortex_L", + 44: "Lateral_occipital_cortex_R", + 11: "Lateral_orbital_frontal_cortex_L", + 45: "Lateral_orbital_frontal_cortex_R", + 12: "Lingual_gyrus_L", + 46: "Lingual_gyrus_R", + 13: "Medial_orbital_frontal_cortex_L", + 47: "Medial_orbital_frontal_cortex_R", + 14: "Middle_temporal_gyrus_L", + 48: "Middle_temporal_gyrus_R", + 15: "Parahippocampal_gyrus_L", + 49: "Parahippocampal_gyrus_R", + 16: "Paracentral_lobule_L", + 50: "Paracentral_lobule_R", + 17: "Pars_opercularis_L", + 51: "Pars_opercularis_R", + 18: "Pars_orbitalis_L", + 52: "Pars_orbitalis_R", + 19: "Pars_triangularis_L", + 53: "Pars_triangularis_R", + 20: "Pericalcarine_cortex_L", + 54: "Pericalcarine_cortex_R", + 21: "Postcentral_gyrus_L", + 55: "Postcentral_gyrus_R", + 22: "Posterior_cingulate_cortex_L", + 56: "Posterior_cingulate_cortex_R", + 23: "Precentral_gyrus_L", + 57: "Precentral_gyrus_R", + 24: "Precuneus_cortex_L", + 58: "Precuneus_cortex_R", + 25: "Rostral_anterior_cingulate_cortex_L", + 59: "Rostral_anterior_cingulate_cortex_R", + 26: "Rostral_middle_frontal_gyrus_L", + 60: "Rostral_middle_frontal_gyrus_R", + 27: "Superior_frontal_cortex_L", + 61: "Superior_frontal_cortex_R", + 28: "Superior_parietal_cortex_L", + 62: "Superior_parietal_cortex_R", + 29: "Superior_temporal_gyrus_L", + 63: "Superior_temporal_gyrus_R", + 30: "Supramarginal_gyrus_L", + 64: "Supramarginal_gyrus_R", + 31: "Frontal_pole_L", + 65: "Frontal_pole_R", + 32: "Temporal_pole_L", + 66: "Temporal_pole_R", + 33: "Transverse_temporal_cortex_L", + 67: "Transverse_temporal_cortex_R", + 34: "Insula_L", + 68: "Insula_R", + } + + def __init__(self): + super().__init__(self.dk) diff --git a/neurolib/utils/connectivity.py b/neurolib/utils/connectivity.py new file mode 100644 index 00000000..da4e72e6 --- /dev/null +++ b/neurolib/utils/connectivity.py @@ -0,0 +1,449 @@ +""" +Set of tools for brain connectivities. + +(c) neurolib-devs + +# TODO do we do this upper file part? +""" + +import logging +from glob import glob + +import numpy as np + +import networkx as nx +from h5py import File +from scipy.io import loadmat + +from .atlases import BaseAtlas # TODO relative or absolute imports? + +MATLAB_EXT = ".mat" +HDF_EXT = ".h5" + + +class BaseConnectivity: + """ + Represents basic connectivity within the brain according to some atlas. + Implements basic methods. + """ + + connectivity_type = None + label = "" + + @classmethod + def dummy_single_node(cls, self_weight=1.0): + """ + Init class as dummy single node connectivity. + """ + + return cls(np.ones((1, 1)) * self_weight, False, False, False, None) + + @classmethod + def from_mat_file( + cls, + filename, + key=None, + nullify_diagonal=False, + symmetrize=False, + normalize=False, + atlas=None, + ): + """ + Init class from matlab `.mat` file. + """ + loaded_data = loadmat(filename) + if key is None: + key = [ + k for k in list(loaded_data.keys()) if not k.startswith("__") + ][0] + matrix = np.array(loaded_data[key]) + return cls(matrix, nullify_diagonal, symmetrize, normalize, atlas) + + @classmethod + def from_hdf_file( + cls, + filename, + key=None, + nullify_diagonal=False, + symmetrize=False, + normalize=False, + atlas=None, + ): + """ + Init class from hdf `.h5` file. + """ + loaded_data = File(filename, "r") + if key is None: + key = next(iter(loaded_data.keys())) + matrix = np.array(loaded_data[key]) + loaded_data.close() + return cls(matrix, nullify_diagonal, symmetrize, normalize, atlas) + + def __init__( + self, + matrix, + nullify_diagonal=False, + symmetrize=False, + normalize=False, + atlas=None, + ): + """ + Init class with optional preprocessing. + + :param matrix: connectivity matrix - should be 2D + :type matrix: np.ndarray + :param nullify_diagonal: whether to insert zeros at the diagonal, i.e. + at self-connections + :type nullify_diagonal: bool + :param symmetrize: whether to symmetrize the matrix + :type symmetrize: bool + :param normalize: whether to normalize the matrix to unit row and + column sum + :type normalize: bool + :param atlas: brain parcellation atlas, if known + :type atlas: str|tools.atlases.BaseAtlas|None + """ + assert matrix.ndim == 2, "Connectivity must be 2D" + assert ( + matrix.shape[0] == matrix.shape[1] + ), "Non-square matrices not supported" + self.matrix = matrix.astype(np.float) + # optional preprocess + if nullify_diagonal: + np.fill_diagonal(self.matrix, 0.0) + if symmetrize: + self.matrix = self.symmetrize_matrix(self.matrix) + if normalize: + self.matrix = self.normalize_matrix(self.matrix) + + self.graph = None + # if BaseAtlas, check number of nodes + if isinstance(atlas, BaseAtlas): + assert self.no_rois == atlas.no_rois + self.label += f" ({atlas.label})" + elif isinstance(atlas, str): + self.label += f" ({atlas})" + self.atlas = atlas + + def __str__(self): + return ( + f"{self.label} matrix with {self.no_rois} ROIs of " + f"{self.connectivity_type} type." + ) + + @property + def no_rois(self): + """ + Return number of ROIs. + """ + return self.matrix.shape[0] + + @property + def shape(self): + """ + Return full shape of the matrix. + """ + return self.matrix.shape + + @property + def triu_indices(self): + """ + Return upper triangle indices from connectivity matrix. + """ + return np.triu_indices(self.no_rois, k=1) + + @property + def triu_indices_w_diagonal(self): + """ + Return upper triangle indices including the diagonal from connectivity + matrix. + """ + return np.triu_indices(self.no_rois, k=0) + + @staticmethod + def symmetrize_matrix(matrix): + """ + Symmetrize single square matrix. + """ + return (matrix + matrix.transpose()) / 2.0 + + @staticmethod + def normalize_matrix(matrix): + """ + Normalize single square matrix. + """ + d = np.diag(np.power(np.sum(matrix, axis=0), -0.5)) + return np.dot(np.dot(d, matrix), d) + + def adjust_density(self, target_density): + """ + Adjust density of matrix such that it has target density. + + :param target_density: desired density + :type target_density: float + """ + assert ( + 0 < target_density < 1 + ), f"Density must be 0-1; got {target_density}" + # number of needed nonzero elements + nonzero_elements = self.no_rois * (self.no_rois - 1) * target_density + # get threshold element + threshold = np.sort(self.matrix.flatten())[::-1][int(nonzero_elements)] + # threshold the matrix in-place + self.matrix *= self.matrix > threshold + + def create_nx_graph(self, threshold=0.0, directed=False): + """ + Create a networkx DiGraph representation from matrix. + + :param threshold: whether to threshold adjacency matrix + :type threshold: float + :param directed: whether to get directed or undirected graph + :type directed: bool + """ + graph = nx.DiGraph(self.matrix * (self.matrix > threshold)) + if directed: + self.graph = graph + else: + self.graph = graph.to_undirected() + + +class ConnectivityEnsemble: + """ + Represents an ensemble of connectivites, typically from more subjects. + """ + + @classmethod + def from_folder( + cls, + path, + file_type, + connectivity_type, + key=None, + nullify_diagonal=False, + symmetrize=False, + normalize=False, + atlas=None, + ): + """ + Load connectivity ensemble of given type from folder. + + :param path: path to connectivities + :type path: str + :param file_type: type of files from which to load connectivities: + "mat" or "h5" + :param connectivity_type: type of connectivities + :type connectivity_type: str + """ + if connectivity_type == "structural": + base_type = StructuralConnectivity + elif connectivity_type == "functional": + base_type = FunctionalConnectivity + elif connectivity_type == "fiber_lengths": + base_type = FiberLengthConnectivity + elif connectivity_type == "delay": + base_type = DelayMatrix + else: + raise ValueError(f"Unknown connectivity type: {connectivity_type}") + + if file_type == "mat": + ext = MATLAB_EXT + loading_function = base_type.from_mat_file + elif file_type == "h5": + ext = HDF_EXT + loading_function = base_type.from_hdf_file + + connectivities = [] + for conn_file in glob(f"{path}/*{ext}"): + connectivities.append( + loading_function( + conn_file, + key=key, + nullify_diagonal=nullify_diagonal, + symmetrize=symmetrize, + normalize=normalize, + atlas=atlas, + ) + ) + + return cls(connectivities) + + def __init__(self, matrices): + """ + :param matrices: list of connectivites per subject + :type matrices: list + """ + assert all(isinstance(matrix, BaseConnectivity) for matrix in matrices) + shapes = set(matrix.shape for matrix in matrices) + assert len(shapes) == 1, f"Different shapes found: {shapes}" + self.grand_matrix = np.dstack( + [matrix.matrix for matrix in matrices] + ).transpose(2, 0, 1) + # save original matrices as a list for convenience + self.original_matrices = matrices + atlases = set(matrix.atlas for matrix in matrices) + if len(atlases) == 1: + self.atlas = matrices[0].atlas + else: + logging.warning( + f"Multiple atlases found: {atlases}, not sure what to do..." + ) + self.atlas = None + + def rebuild_grand_matrix(self): + """ + Rebuild grand matrix, e.g. when doing some operation on individual + matrices (stored in `original_matrices`) - any operation like this must + be done inplace on the list with orig. matrices. + """ + self.grand_matrix = np.dstack( + [matrix.matrix for matrix in self.original_matrices] + ).transpose(2, 0, 1) + + @property + def no_rois(self): + """ + Return number of ROIs. + """ + return self.grand_matrix.shape[1] + + @property + def no_subjects(self): + """ + Return number of subject. + """ + return self.grand_matrix.shape[0] + + @property + def mean_connectivity(self): + """ + Return mean connectivity matrix. + """ + return np.nanmean(self.grand_matrix, axis=0) + + @property + def median_connectivity(self): + """ + Return median connectivity matrix. + """ + return np.nanmedian(self.grand_matrix, axis=0) + + +class StructuralConnectivity(BaseConnectivity): + """ + Structural connectivity matrix, usually from DTI measurement. + """ + + connectivity_type = "structural" + label = "SC" + + def scale_to_relative(self): + """ + Scale SC matrix to relative strengths between 0 and 1. + """ + self.matrix /= np.max(self.matrix) + + +class FunctionalConnectivity(BaseConnectivity): + """ + Functional connectivity matrix. + """ + + connectivity_type = "functional" + label = "FC" + + @classmethod + def from_timeseries( + cls, timeseries, fc_type="corr", time_as_row=True, atlas=None + ): + """ + Init functional connectivity from timeseries. + + :param timeseries: timeseries for FC + :fc_type timeseries: np.ndarray + :param fc_type: type of functional connectivity, for now corr or cov + supported + :type fc_type: str + :param time_as_row: whether shape of the array is time x ROI or + transpossed + :type time_as_row: bool + :param atlas: brain parcellation atlas, if known + :type atlas: str|tools.atlases.BaseAtlas|None + """ + assert timeseries.ndim == 2, "Only 2D matrices supported" + if fc_type == "corr": + matrix = np.corrcoef(timeseries, rowvar=not time_as_row) + elif fc_type == "cov": + matrix = np.cov(timeseries, rowvar=not time_as_row) + else: + raise ValueError(f"Unknown FC type: {fc_type}") + + return cls(matrix, False, False, False, atlas=atlas) + + +class FiberLengthConnectivity(BaseConnectivity): + """ + Matrix representing fiber lengths in segments. + """ + + connectivity_type = "fiber_length" + label = "FibLen" + + def transform_to_delay_matrix(self, signal_velocity, segment_length=1.0): + """ + Transform fiber length matrix into delay matrix. Delay matrix unit is + ms. + + :param signal_velocity: signal velocity in m/s + :type signal_velocity: float + :param segment_length: length of a single segment in mm + :type segment_length: float + :return: initialized delay matrix with delays in ms + :rtype: `DelayMatrix` + """ + length_matrix = self.matrix * segment_length + if signal_velocity > 0.0: + matrix = length_matrix / signal_velocity + elif signal_velocity == 0.0: + matrix = np.zeros_like(self.matrix) + else: + raise ValueError("Cannot do negative signal velocity") + return DelayMatrix(matrix, False, False, False, atlas=self.atlas) + + +class DelayMatrix(BaseConnectivity): + """ + Matrix representing delay in ms. + """ + + connectivity_type = "delay" + label = "Delay" + + @property + def max_delay(self): + """ + Return max delay. + """ + return np.max(self.matrix) + + def unit_delay_diagonal(self, delay): + """ + Insert given delay at the diagonal. + + :param delay: delay on diagonal, hence self-connection, in ms + :type delay: float + """ + np.fill_diagonal(self.matrix, delay) + + def transform_to_multiples_dt(self, dt, dt_in_seconds=True): + """ + Transform delay matrix in ms to multiple of dts for simulation. + + :param dt: dt for the simulation, in ms or s + :type dt: float + :param dt_in_seconds: whether passed dt is in seconds + :type dt_in_seconds: bool + """ + if dt_in_seconds: + dt *= 1000.0 + self.matrix = np.around(self.matrix / dt).astype(int) diff --git a/requirements.txt b/requirements.txt index 4d55b40a..e7401309 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ numpy scipy pytest flake8 +networkx diff --git a/tests/test_models.py b/tests/models/test_models.py similarity index 61% rename from tests/test_models.py rename to tests/models/test_models.py index 4edb0b92..139d79e2 100644 --- a/tests/test_models.py +++ b/tests/models/test_models.py @@ -1,5 +1,3 @@ -import logging -import time import unittest from neurolib.models import aln, hopf @@ -12,30 +10,19 @@ class TestALNModel(unittest.TestCase): """ def test_single_node(self): - logging.info("\t > ALN: Testing single node ...") - start = time.time() - alnModel = aln.ALNModel() alnModel.params["duration"] = 2.0 * 1000 alnModel.params["sigma_ou"] = 0.1 # add some noise alnModel.run() - end = time.time() - logging.info("\t > Done in {:.2f} s".format(end - start)) def test_network(self): - logging.info("\t > ALN: Testing brain network (chunkwise integration and BOLD" " simulation) ...") - start = time.time() - ds = Dataset("gw") - alnModel = aln.ALNModel(Cmat=ds.Cmat, Dmat=ds.Dmat, simulateBOLD=True) # in ms, simulates for 5 minutes alnModel.params["duration"] = 10 * 1000 alnModel.run() - end = time.time() - logging.info("\t > Done in {:.2f} s".format(end - start)) class TestHopfModel(unittest.TestCase): @@ -44,19 +31,13 @@ class TestHopfModel(unittest.TestCase): """ def test_single_node(self): - logging.info("\t > Hopf: Testing single node ...") - start = time.time() hopfModel = hopf.HopfModel() hopfModel.params["duration"] = 2.0 * 1000 hopfModel.params["sigma_ou"] = 0.03 hopfModel.run() - end = time.time() - logging.info("\t > Done in {:.2f} s".format(end - start)) def test_network(self): - logging.info("\t > Hopf: Testing brain network (chunkwise integration and BOLD" " simulation) ...") - start = time.time() ds = Dataset("gw") hopfModel = hopf.HopfModel(Cmat=ds.Cmat, Dmat=ds.Dmat, simulateBOLD=True) hopfModel.params["w"] = 1.0 @@ -66,8 +47,6 @@ def test_network(self): hopfModel.params["K_gl"] = 0.6 hopfModel.run() - end = time.time() - logging.info("\t > Done in {:.2f} s".format(end - start)) if __name__ == "__main__": diff --git a/tests/utils/test_connectivity.py b/tests/utils/test_connectivity.py new file mode 100644 index 00000000..65ff6ef2 --- /dev/null +++ b/tests/utils/test_connectivity.py @@ -0,0 +1,266 @@ +""" +Set of basic tests for connectivity, i.e. connectome matrices. + +(c) neurolib-devs +""" + +import os +import unittest + +import numpy as np + +import networkx as nx +from neurolib.utils.connectivity import ( + BaseConnectivity, + ConnectivityEnsemble, + DelayMatrix, + FiberLengthConnectivity, + FunctionalConnectivity, + StructuralConnectivity, +) + +TEST_FOLDER = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "test_results" +) + + +class TestBaseConnectivity(unittest.TestCase): + """ + Basic tests for connectivity matrices. + """ + + dummy_matrix = np.array([[1.32, 0.7], [-0.3, 1.3]]) + symm_matrix = np.array([[1.32, 0.2], [0.2, 1.3]]) + norm_matrix = np.array([[1.29411765, 0.49009803], [-0.21004201, 0.65]]) + adj_density_matrix = np.array([[1.32, 0.0], [-0.0, 0.0]]) + + def test_init(self): + mat = BaseConnectivity(self.dummy_matrix) + self.assertEqual(mat.no_rois, 2) + self.assertEqual(mat.shape, (2, 2)) + np.testing.assert_equal(mat.matrix, self.dummy_matrix) + np.testing.assert_equal(mat.matrix[mat.triu_indices], np.array([0.7])) + np.testing.assert_equal( + mat.matrix[mat.triu_indices_w_diagonal], np.array([1.32, 0.7, 1.3]) + ) + + def test_load_from_file(self): + # set connectivity + mat = BaseConnectivity(self.dummy_matrix) + # load from mat file + from_file = BaseConnectivity.from_mat_file( + os.path.join(TEST_FOLDER, "dummy.mat"), key="mat_test" + ) + np.testing.assert_equal(mat.matrix, from_file.matrix) + # load from h5 file + from_file = BaseConnectivity.from_hdf_file( + os.path.join(TEST_FOLDER, "dummy.h5"), key="h5_test" + ) + np.testing.assert_equal(mat.matrix, from_file.matrix) + + def test_single_node(self): + weight = 8 + mat = BaseConnectivity.dummy_single_node(weight) + np.testing.assert_equal(mat.matrix, np.atleast_2d(weight)) + + def test_basic_functions(self): + mat = BaseConnectivity(self.dummy_matrix) + # assert basic functions + np.testing.assert_almost_equal( + mat.symmetrize_matrix(mat.matrix), self.symm_matrix + ) + np.testing.assert_almost_equal( + mat.normalize_matrix(mat.matrix), self.norm_matrix + ) + + def test_nx_graph(self): + # test graph creation + mat = BaseConnectivity(self.dummy_matrix, symmetrize=True) + mat.create_nx_graph() + graph = nx.Graph(mat.matrix) + self.assertTrue(isinstance(mat.graph, nx.Graph)) + # compare graphs using isomorphism + em = nx.algorithms.isomorphism.numerical_edge_match("weight", 1) + self.assertTrue(nx.is_isomorphic(graph, mat.graph, edge_match=em)) + + def test_adjust_density(self): + mat = BaseConnectivity(self.dummy_matrix) + mat.adjust_density(target_density=0.5) + np.testing.assert_equal(mat.matrix, self.adj_density_matrix) + + +class TestStructuralConnectivity(unittest.TestCase): + """ + Tests for structural connectivity matrix. + """ + + dummy_matrix = np.array([[1.32, 0.7], [-0.3, 1.3]]) + scaled_matrix = np.array([[1.0, 0.53030303], [-0.22727273, 0.98484848]]) + + def test_scale_to_relative(self): + mat = StructuralConnectivity(self.dummy_matrix) + mat.scale_to_relative() + np.testing.assert_almost_equal(mat.matrix, self.scaled_matrix) + + +class TestFunctionalConnectivity(unittest.TestCase): + """ + Tests for functional connectivity matrix. + """ + + means = [0.5, -0.5, 1.4] + cov_mat = np.array([[1.2, -0.7, 1.6], [-0.7, 0.5, 2.3], [1.6, 2.3, 0.83]]) + + def test_init_from_timeseries(self): + np.random.seed(42) + timeseries = np.random.multivariate_normal( + self.means, self.cov_mat, size=(10000) + ) + # covariance FC + fc_cov = FunctionalConnectivity.from_timeseries( + timeseries, time_as_row=True, fc_type="cov" + ) + # compute covariance as XX^T + x_tilde = timeseries - np.mean(timeseries, axis=0) + cov_computed = x_tilde.T.dot(x_tilde) / x_tilde.shape[0] + np.testing.assert_almost_equal(fc_cov.matrix, cov_computed, decimal=3) + # correlation FC + fc_corr = FunctionalConnectivity.from_timeseries( + timeseries, time_as_row=True, fc_type="corr" + ) + # compute correlation from cov + diag = np.diag(np.sqrt(np.diag(cov_computed))) # sqrt(diagonal) matrix + corr_computed = np.dot( + np.dot(np.linalg.inv(diag), cov_computed), np.linalg.inv(diag) + ) + np.testing.assert_almost_equal(fc_corr.matrix, corr_computed) + + +class TestFiberLengthConnectivity(unittest.TestCase): + """ + Tests for fiber lengths matrix. + """ + + fiber_mat = np.array([[0.0, 3.2, 12.3], [3.2, 0.0, 0.9], [12.3, 0.9, 0.0]]) + sig_velocity = 7.5 # m/s + + def test_transform_to_delay_matrix(self): + fib_conn = FiberLengthConnectivity(self.fiber_mat) + # signal velocity is 7.5m/s with each segment being 1mm long + delay_mat = fib_conn.transform_to_delay_matrix( + signal_velocity=self.sig_velocity, segment_length=1.0 + ) + self.assertEqual(delay_mat.connectivity_type, "delay") + np.testing.assert_equal( + delay_mat.matrix, self.fiber_mat / self.sig_velocity + ) + + +class TestDelayMatrix(unittest.TestCase): + """ + Tests for delay matrix. + """ + + fiber_mat = np.array([[0.0, 3.2, 12.3], [3.2, 0.0, 0.9], [12.3, 0.9, 0.0]]) + sig_velocity = 7.5 # m/s + diagonal_delay = 0.06 # ms + dt = 1e-6 # s + + def test_max_delay(self): + delay_mat = DelayMatrix(self.fiber_mat / self.sig_velocity) + # check maximal delay + self.assertTrue( + delay_mat.max_delay, np.max(self.fiber_mat / self.sig_velocity) + ) + + def test_unit_delay_diagonal(self): + delay_mat = DelayMatrix(self.fiber_mat / self.sig_velocity) + delay_mat.unit_delay_diagonal(self.diagonal_delay) + # check delay matrix diagonal + np.testing.assert_equal( + np.diag(delay_mat.matrix), + np.ones(self.fiber_mat.shape[0]) * self.diagonal_delay, + ) + + def test_transform_to_multiples_dt(self): + delay_mat = DelayMatrix(self.fiber_mat / self.sig_velocity) + delay_mat.transform_to_multiples_dt(dt=self.dt, dt_in_seconds=True) + np.testing.assert_equal( + delay_mat.matrix, + np.around( + (self.fiber_mat / self.sig_velocity) / (1000 * self.dt) + ).astype(np.int), + ) + + +class TestConnectivityEnsamble(unittest.TestCase): + """ + Basic tests for connectivity ensemble. + """ + + def test_load_from_folder(self): + for file_type, key in zip(["mat", "h5"], ["mat_test", "h5_test"]): + conn_ensemble = ConnectivityEnsemble.from_folder( + TEST_FOLDER, + file_type=file_type, + connectivity_type="structural", + key=key, + ) + self.assertEqual(conn_ensemble.no_rois, 2) + self.assertEqual(conn_ensemble.no_subjects, 1) + self.assertTrue( + all( + isinstance(conn, StructuralConnectivity) + for conn in conn_ensemble.original_matrices + ) + ) + + def test_aggregated_conns(self): + conn_ensemble = ConnectivityEnsemble.from_folder( + TEST_FOLDER, + file_type="mat", + connectivity_type="structural", + key="mat_test", + ) + # since we have only one subject, the mean is equal to that one subject + np.testing.assert_equal( + conn_ensemble.mean_connectivity, + conn_ensemble.original_matrices[0].matrix, + ) + np.testing.assert_equal( + conn_ensemble.median_connectivity, + conn_ensemble.original_matrices[0].matrix, + ) + + def test_rebuild_grand_matrix(self): + conn_ensemble = ConnectivityEnsemble.from_folder( + TEST_FOLDER, + file_type="mat", + connectivity_type="structural", + key="mat_test", + ) + self.assertEqual(conn_ensemble.no_subjects, 1) + # create new matrix by added 0.5 to existing one + new_matrix = conn_ensemble.original_matrices[0].matrix.copy() + 0.5 + # create structural connectivity out of it + new_conn = StructuralConnectivity(new_matrix) + # append to ensemble + conn_ensemble.original_matrices.append(new_conn) + # rebuild grand + conn_ensemble.rebuild_grand_matrix() + # check we have more subjects + self.assertEqual(conn_ensemble.no_subjects, 2) + # check mean and median + manual_conn = np.dstack( + [conn_ensemble.original_matrices[0].matrix, new_matrix] + ).transpose(2, 0, 1) + np.testing.assert_equal( + conn_ensemble.mean_connectivity, np.mean(manual_conn, axis=0) + ) + np.testing.assert_equal( + conn_ensemble.median_connectivity, np.median(manual_conn, axis=0) + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/utils/test_results/dummy.h5 b/tests/utils/test_results/dummy.h5 new file mode 100644 index 00000000..7e580939 Binary files /dev/null and b/tests/utils/test_results/dummy.h5 differ diff --git a/tests/utils/test_results/dummy.mat b/tests/utils/test_results/dummy.mat new file mode 100644 index 00000000..4467927c Binary files /dev/null and b/tests/utils/test_results/dummy.mat differ