Source code for comet.connectivity

import random
import numpy as np
from tqdm import tqdm
from tqdm_joblib import tqdm_joblib
from matplotlib import pyplot as plt
from typing import Literal, Union
from abc import ABCMeta, abstractmethod
from joblib import Parallel, delayed
from threadpoolctl import threadpool_limits
from statsmodels.stats.weightstats import DescrStatsW
from pycwt import cwt, Morlet
from hmmlearn import hmm
from ksvd import ApproximateKSVD
from sklearn.cluster import KMeans
from sklearn.metrics import mutual_info_score
from sklearn.covariance import LedoitWolf
from scipy.stats import zscore
from scipy.spatial import distance
from scipy.signal import windows, hilbert
from scipy.linalg import eigh, solve, det, inv
from scipy.optimize import minimize

"""
SECTION: Class template for all dynamic functional connectivity methods.
         New methods should be implemented as child classes.
"""
[docs] class ConnectivityMethod(metaclass=ABCMeta): """ Base class for all dynamic functional connectivity methods. Attributes ---------- time_series : np.ndarray Time series data. T : int Number of timepoints. P : int Number of parcels. diagonal : int or float Value to set on the diagonal of connectivity matrices. fisher_z : bool Whether to apply Fisher z-transformation. tril : bool Whether to return only the lower triangle of the matrices. """ def __init__(self, time_series, diagonal=0, fisher_z=False, tril=False): """ Initializes the ConnectivityMethod with the given parameters. Parameters ---------- time_series : np.ndarray The input time series data. diagonal : int or float, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. """ self.time_series = time_series self.diagonal = diagonal self.fisher_z = fisher_z self.tril = tril # Shape checks and conversion to array if isinstance(self.time_series, list) or (isinstance(self.time_series, np.ndarray) and self.time_series.dtype == object): if len(self.time_series) == 0: raise ValueError("time_series is empty.") seq = [np.asarray(x, dtype="float32") for x in self.time_series] shapes = {a.shape for a in seq} if not all(a.ndim == 2 for a in seq): raise ValueError("All subject time series must be 2D arrays (T, P).") if len(shapes) != 1: raise ValueError(f"All subject time series must have the same shape (T, P); got {shapes}.") self.time_series = np.stack(seq, axis=0) # (S, T, P) # Case B: proper ndarray → just ensure float32 elif isinstance(self.time_series, np.ndarray): self.time_series = self.time_series.astype("float32", copy=False) else: raise ValueError("time_series must be a 2D ndarray, 3D ndarray, or a list of 2D ndarrays.") # Prepare the data and create some variables self.time_series = self.time_series.astype("float32") if np.ndim(self.time_series) == 3 : self.n_subjects = self.time_series.shape[0] self.T = self.time_series.shape[1] # Timepoints self.P = self.time_series.shape[2] # Parcels (brain regions) self.time_series3D = self.time_series # Reshape the data to 2D by stacking all subjects (n_subjects * T, P) self.time_series = self.time_series.reshape(-1, self.time_series.shape[-1]) elif np.ndim(self.time_series) == 2: self.n_subjects = 1 self.T = self.time_series.shape[0] self.P = self.time_series.shape[1] self.time_series3D = self.time_series.reshape(1, self.T, self.P) else: raise ValueError("Input data must be either a 2D array, 3D array, or a list of 2D arrays.") return # All subclasses must implement an estimate() method and should use postproc() for simple post-processing
[docs] @abstractmethod def estimate(self): """ Abstract method to compute the connectivity matrix. This method should be implemented in each child class. Raises ------ NotImplementedError If the method is not implemented in the child class. """ raise NotImplementedError("This method should be implemented in each child class.")
[docs] def postproc(self): """ Post-process the connectivity matrix with optional Fisher z-transformation, z-standardization, diagonal setting, and lower triangle extraction. Returns ------- np.ndarray The post-processed connectivity matrix. """ if hasattr(self, "dfc"): dfc = self.dfc elif hasattr(self, "fc"): dfc = self.fc else: raise AttributeError("No connectivity estimates available.") # Fisher z-transformation if self.fisher_z: dfc = np.clip(dfc, -1 + np.finfo(float).eps, 1 - np.finfo(float).eps) dfc = np.arctanh(dfc) # Set main diagonal if len(dfc.shape) == 3: for estimate in range(dfc.shape[2]): np.fill_diagonal(dfc[:,:, estimate], self.diagonal) else: np.fill_diagonal(dfc, self.diagonal) # Get lower triangle to save space if self.tril: if len(dfc.shape) == 3: mask = np.tril(dfc[:, :, 0], k=-1) != 0 dfc = np.array([matrix_2d[mask] for matrix_2d in dfc.transpose(2, 0, 1)]) else: mask = np.tril(dfc, k=-1) != 0 dfc = dfc[mask] return dfc
""" SECTION: Continuous dFC methods """
[docs] class SlidingWindow(ConnectivityMethod): """ Sliding Window connectivity method. This is the most widely used dynamic functional connectivity method. It involves sliding a window over the data. Covariance is estimated for each windowed section. Parameters ---------- time_series : np.ndarray The input time series data. windowsize : int, optional Size of the sliding window. Default is 29. stepsize : int, optional Step size for sliding the window. Default is 1. shape : {'rectangular', 'gaussian', 'hamming'}, optional Shape of the window. Default is 'rectangular'. std : float, optional Standard deviation for the Gaussian window. Default is 10. diagonal : int, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. """ name = "CONT Sliding Window" def __init__(self, time_series: np.ndarray, windowsize: int = 29, stepsize: int = 1, shape: Literal["rectangular", "gaussian", "hamming"] = "rectangular", std: float = 10, diagonal: int = 0, fisher_z: bool = False, tril: bool = False): super().__init__(time_series, diagonal, fisher_z, tril) self.windowsize = windowsize self.stepsize = stepsize self.shape = shape self.std = std self.N_estimates = (self.T - self.windowsize) // self.stepsize + 1 # N possible estimates given the window and step size self.dfc = np.full((self.P,self.P, self.N_estimates), np.nan) if not self.windowsize <= self.T: raise ValueError("windowsize is larger than time series") def _weights(self): """ Create weights for sliding window. The window shape can be: - rectangular - gaussian - hamming Returns ------- np.ndarray Weights for each windowed section. """ weights_init = np.zeros(self.T) if self.shape == 'rectangular': weights_init[0:self.windowsize] = np.ones(self.windowsize) if self.shape == 'gaussian': weights_init[0:self.windowsize] = windows.gaussian(self.windowsize, self.std) if self.shape == 'hamming': weights_init[0:self.windowsize] = windows.hamming(self.windowsize) weights = np.array([np.roll(weights_init, i) for i in range(0, self.T + 1 - self.windowsize, self.stepsize)]) return weights
[docs] def centers(self): """ Calculate the central index of each window so dynamic functional connectivity (dFC) estimates can be related to the original time series. Returns ------- np.ndarray Central index of each window. """ centers = np.arange(self.windowsize // 2, self.T - self.windowsize // 2, self.stepsize) return centers
[docs] def estimate(self): """ Calculate sliding window correlation. Returns ------- np.ndarray Dynamic functional connectivity as a PxPxN array. """ weights = self._weights() for estimate in range(self.N_estimates): self.dfc[:,:,estimate] = DescrStatsW(self.time_series, weights[estimate, :]).corrcoef self.dfc = self.postproc() return self.dfc
[docs] class Jackknife(ConnectivityMethod): """ Jackknife correlation method. Parameters ---------- time_series : np.ndarray The input time series data. windowsize : int, optional Size of the sliding window. Default is 1. stepsize : int, optional Step size for sliding the window. Default is 1. diagonal : int, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. References ---------- Richter CG, Thompson WH, Bosman CA, Fries P. A jackknife approach to quantifying single-trial correlation between covariance-based metrics undefined on a single-trial basis. https://doi.org/10.1016/j.neuroimage.2015.04.040 """ name = "CONT Jackknife Correlation" def __init__(self, time_series: np.ndarray, windowsize: int = 1, stepsize: int = 1, diagonal: int = 0, fisher_z: bool = False, tril: bool = False): super().__init__(time_series, diagonal, fisher_z, tril) self.windowsize = windowsize self.stepsize = stepsize self.N_estimates = (self.T - self.windowsize) // self.stepsize + 1 # N possible estimates given the window size self.dfc = np.full((self.P,self.P, self.N_estimates), np.nan) if not self.windowsize <= self.T: raise ValueError("windowsize is larger than time series") def _weights(self): """ Create logical weight vectors for jackknife correlation. Example: 01111 -> 10111 -> 11011 -> 11101 -> 11110 Returns ------- np.ndarray Boolean weights for each windowed section. """ weights_init = np.ones(self.T) weights_init[0:self.windowsize] = 0 weights = np.array([np.roll(weights_init, i) for i in range(0, self.T + 1 - self.windowsize, self.stepsize)]) return weights.astype(bool)
[docs] def centers(self): """ Calculate the central index of each window so dynamic functional connectivity (dFC) estimates can be related to the original time series. Returns ------- np.ndarray Central index of each window. """ centers = np.arange(self.windowsize // 2, self.T - self.windowsize // 2, self.stepsize) return centers
[docs] def estimate(self): """ Calculate jackknife correlation. Returns ------- np.ndarray Dynamic functional connectivity as a PxPxN array. """ weights = self._weights() for estimate in range(self.N_estimates): ts_jackknife = self.time_series[weights[estimate,:],:] self.dfc[:,:,estimate] = np.corrcoef(ts_jackknife.T) * -1 # correlation estimation and sign inversion self.dfc = self.postproc() return self.dfc
[docs] class SpatialDistance(ConnectivityMethod): """ Spatial Distance connectivity method. Parameters ---------- time_series : np.ndarray The input time series data. dist : {'euclidean', 'cosine', 'cityblock'}, optional Type of distance metric to use. Default is 'euclidean'. diagonal : int, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. References ---------- William Hedley Thompson, Per Brantefors, Peter Fransson. From static to temporal network theory: Applications to functional brain connectivity. https://doi.org/10.1162/NETN_a_00011 """ name = "CONT Spatial Distance" def __init__(self, time_series: np.ndarray, dist: Literal["euclidean", "cosine", "cityblock"] = "euclidean", diagonal: int = 0, fisher_z: bool = False, tril: bool = False): super().__init__(time_series, diagonal, fisher_z, tril) self.distance = self._distance_functions(dist) self.N_estimates = self.T self.dfc = np.full((self.P,self.P, self.N_estimates), np.nan) def _distance_functions(self, dist): """ Get the distance function based on the specified metric. Parameters ---------- dist : str The type of distance metric to use. Returns ------- function The corresponding distance function. """ options = {'euclidean': distance.euclidean, 'cosine' : distance.cosine, 'cityblock': distance.cityblock} return options[dist] def _weights(self): """ Calculate adjacency matrix for spatial distance. Returns ------- np.ndarray The spatial distance adjacency matrix. """ weights = np.array([self.distance(self.time_series[n, :], self.time_series[t, :]) for n in np.arange(0, self.T) for t in np.arange(0, self.T)]) weights = np.reshape(weights, [self.T, self.T]) np.fill_diagonal(weights, np.nan) weights = 1 / weights weights = (weights - np.nanmin(weights)) / (np.nanmax(weights) - np.nanmin(weights)) np.fill_diagonal(weights, 1) return weights
[docs] def estimate(self): """ Calculate spatial distance correlation. Returns ------- np.ndarray Dynamic functional connectivity as a PxPxN array. """ weights = self._weights() # in this case this is the distance matrix for estimate in range(self.N_estimates): self.dfc[:,:,estimate] = DescrStatsW(self.time_series, weights[estimate, :]).corrcoef self.dfc = self.postproc() return self.dfc
[docs] class TemporalDerivatives(ConnectivityMethod): """ Multiplication of Temporal Derivatives connectivity method. Parameters ---------- time_series : np.ndarray The input time series data. windowsize : int, optional Size of the sliding window. Default is 7. diagonal : int, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. References ---------- Shine JM, Koyejo O, Bell PT, Gorgolewski KJ, Gilat M, Poldrack RA. Estimation of dynamic functional connectivity using Multiplication of Temporal Derivatives. https://doi.org/10.1016/j.neuroimage.2015.07.064. """ name = "CONT Multiplication of Temporal Derivatives" def __init__(self, time_series: np.ndarray, windowsize: int = 7, diagonal: int = 0, fisher_z: bool = False, tril: bool = False): super().__init__(time_series, diagonal, fisher_z, tril) self.windowsize = windowsize self.N_estimates = self.T - self.windowsize self.dfc = np.full((self.P,self.P, self.N_estimates), np.nan) if not self.windowsize <= self.T: raise ValueError("windowsize is larger than time series")
[docs] def centers(self): """ Calculate the central index of each window so dynamic functional connectivity (dFC) estimates can be related to the original time series. Returns ------- np.ndarray Central index of each window. """ centers = np.arange(self.windowsize // 2 + 1, self.T - self.windowsize // 2) return centers
[docs] def estimate(self): """ Calculate multiplication of temporal derivatives. Returns ------- np.ndarray Dynamic functional connectivity as a PxPxN array. """ derivatives = self.time_series[1:, :] - self.time_series[:-1, :] derivatives = derivatives / np.std(derivatives, axis=0) coupling = np.array([derivatives[:, i] * derivatives[:, j] for i in range(self.P) for j in range(self.P)]) # multiplicative coupling # Convolve with rectangular kernel for smoothing kernel = np.ones(self.windowsize) / self.windowsize smooth_coupling = np.full((self.P * self.P, self.N_estimates), np.nan) for i in range(self.P * self.P): smooth_coupling[i,:] = np.convolve(coupling[i,:], kernel, mode='valid') self.dfc = np.reshape(smooth_coupling, [self.P, self.P, self.N_estimates]) #reshape into PxPxN connectivity matrix self.dfc = self.postproc() return self.dfc
[docs] class FlexibleLeastSquares(ConnectivityMethod): """ Flexible Least Squares connectivity method. This implementation estimates dynamic functional connectivity using the Flexible Least Squares (FLS) algorithm as described in the DynamicBC toolbox. For each source region i, the FLS linear system is built once and solved against all target time series simultaneously. Parallel execution across regions is supported via joblib. Parameters ---------- time_series : np.ndarray The input time series data of shape (T, P), where T is the number of time points and P the number of regions. standardizeData : bool, optional Whether to standardize the time series column-wise before estimation. Default is True. mu : float, optional Regularization parameter controlling the smoothness of betas. Default is 100.0. num_cores : int, optional Number of CPU cores to use for parallel processing. Set to 1 for single-core execution. Default is 4. progress_bar : bool, optional Whether to display a progress bar during estimation. Default is True. diagonal : int, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation in post-processing. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. Attributes ---------- dfc : np.ndarray The dynamic connectivity estimates of shape (P, P, T). N_estimates : int Number of estimates (equals the number of time points T). References ---------- Liao, W., Wu, G. R., Xu, Q., Ji, G. J., Zhang, Z., Zang, Y. F., & Lu, G. (2014). DynamicBC: a MATLAB toolbox for dynamic brain connectome analysis. Brain Connectivity, 4(10), 780–790. https://doi.org/10.1089/brain.2014.0253 """ name = "CONT Flexible Least Squares" def __init__(self, time_series: np.ndarray, standardizeData: bool = True, mu: float = 100.0, num_cores: int = 4, progress_bar: bool = True, diagonal: int = 0, fisher_z: bool = False, tril: bool = False): super().__init__(time_series, diagonal, fisher_z, tril) self.standardizeData = bool(standardizeData) self.mu = float(mu) self.num_cores = int(num_cores) self.progress_bar = bool(progress_bar) self.N_estimates = self.T self.dfc = np.full((self.P, self.P, self.N_estimates), np.nan) def _betas(self, X: np.ndarray, Y: np.ndarray) -> np.ndarray: """ Compute FLS betas for one predictor X against all targets Y. Parameters ---------- X : np.ndarray Predictor column of shape (T, 1) for a fixed source region i. Y : np.ndarray Full time series of shape (T, P), all target regions. Returns ------- np.ndarray Betas of shape (P, T) for the given source i to all targets. """ N, K = X.shape G = np.zeros((N * K, N)) A = np.zeros((N * K, N * K)) mui = self.mu * np.eye(K) ind = np.arange(K) for t in range(N): G[ind, t] = X[t, :] if t == 0: Ai = X[t, :].T @ X[t, :] + mui A[ind, ind] = Ai if N > 1: A[ind, ind + K] = -mui elif t != N - 1: Ai = X[t, :].T @ X[t, :] + 2 * mui A[ind, ind] = Ai A[ind, ind + K] = -mui A[ind, ind - K] = -mui else: Ai = X[t, :].T @ X[t, :] + mui A[ind, ind] = Ai A[ind, ind - K] = -mui ind += K RHS = G.T @ Y with threadpool_limits(limits=1): betas = solve(A, RHS, assume_a='sym') # (T, P) return betas.T # (P, T) @staticmethod def _compute_i(i: int, ts: np.ndarray, mu: float): """ Worker function to compute betas for a single source region i. Parameters ---------- i : int Source region index. ts : np.ndarray Standardized time series, shape (T, P). mu : float Regularization parameter. Returns ------- tuple (i, beta_i) where beta_i has shape (P, T). """ N = ts.shape[0] X = ts[:, i].reshape(N, 1) K = 1 G = np.zeros((N * K, N)) A = np.zeros((N * K, N * K)) mui = mu * np.eye(K) ind = np.arange(K) for t in range(N): G[ind, t] = X[t, :] if t == 0: Ai = X[t, :].T @ X[t, :] + mui A[ind, ind] = Ai if N > 1: A[ind, ind + K] = -mui elif t != N - 1: Ai = X[t, :].T @ X[t, :] + 2 * mui A[ind, ind] = Ai A[ind, ind + K] = -mui A[ind, ind - K] = -mui else: Ai = X[t, :].T @ X[t, :] + mui A[ind, ind] = Ai A[ind, ind - K] = -mui ind += K RHS = G.T @ ts with threadpool_limits(limits=1): betas = solve(A, RHS, assume_a='sym') return i, betas.T
[docs] def estimate(self) -> np.ndarray: """ Estimate dynamic functional connectivity using FLS. Returns ------- np.ndarray Dynamic connectivity as a (P, P, T) array, symmetrized and post-processed according to class settings. """ # Standardize if requested if self.standardizeData: mu = np.mean(self.time_series, axis=0, keepdims=True) sd = np.std(self.time_series, axis=0, keepdims=True) sd = np.where(sd == 0, 1.0, sd) self.time_series = (self.time_series - mu) / sd T, P = self.time_series.shape beta = np.zeros((P, P, T), dtype=float) if self.num_cores == 1: rng = tqdm(range(P), desc="FLS (single core)") if self.progress_bar else range(P) for i in rng: beta[i, :, :] = self._betas(self.time_series[:, i].reshape(T, 1), self.time_series) else: if self.progress_bar: with tqdm_joblib(total=P, desc=f"FLS ({self.num_cores} cores)"): results = Parallel(n_jobs=self.num_cores, prefer="processes", batch_size=1)( delayed(FlexibleLeastSquares._compute_i)(i, self.time_series, self.mu) for i in range(P) ) else: results = Parallel(n_jobs=self.num_cores, prefer="processes", batch_size=1)( delayed(FlexibleLeastSquares._compute_i)(i, self.time_series, self.mu) for i in range(P) ) for i, beta_i in results: beta[i, :, :] = beta_i self.dfc = 0.5 * (beta + beta.transpose(1, 0, 2)) self.dfc = self.postproc() return self.dfc
[docs] class PhaseSynchronization(ConnectivityMethod): """ Instantaneous Phase Synchronization methods. Parameters ---------- time_series : np.ndarray The input time series data. method : {'crp', 'phcoh', 'teneto'}, optional The phase synchrony method to use. Default is 'crp'. diagonal : int, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. References ---------- Honari, H., Choe, A. S., & Lindquist, M. A. (2021). Evaluating phase synchronization methods in fMRI: A comparison study and new approaches. NeuroImage, 228, 117704. https://doi.org/10.1016/j.neuroimage.2020.117704 """ name = "CONT Phase Synchronization" def __init__(self, time_series: np.ndarray, method: Literal["crp", "phcoh", "teneto"] = "crp", diagonal: int = 0, fisher_z: bool = False, tril: bool = False): super().__init__(time_series, diagonal, fisher_z, tril) self.N_estimates = self.T self.dfc = np.full((self.P,self.P, self.N_estimates), np.nan) self.method = method
[docs] def estimate(self): analytic = hilbert(self.time_series, axis=0) phi = np.angle(np.asarray(analytic)) # (N, P) N, P = phi.shape # All pairwise phase diffs for all times in one shot: (N, P, P) dphi = phi[:, :, None] - phi[:, None, :] if self.method == "crp": M = np.cos(dphi) # (N, P, P) elif self.method == "phcoh": M = 1 - np.abs(np.sin(dphi)) elif self.method == "teneto": M = 1 - np.sin(np.abs(dphi) / 2) else: raise ValueError(f"Unknown estimation method: {self.method}") ips = np.moveaxis(M, 0, 2) # -> (P, P, N) # Enforce symmetry (in case of tiny asymmetries) self.dfc = 0.5 * (ips + ips.transpose(1, 0, 2)) self.dfc = self.postproc() return self.dfc
[docs] class LeiDA(ConnectivityMethod): """ Leading Eigenvector Dynamics. Parameters ---------- time_series : np.ndarray The input time series data. flip_eigenvectors : bool, optional The sign of each leading eigenvector is adjusted so that the majority of its elements are negative. This ensures consistent orientation of eigenvectors across time points and subjects. Default is True. diagonal : int, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. References ---------- Cabral, J., Vidaurre, D., Marques, P., Magalhães, R., Silva Moreira, P., Miguel Soares, J., ... & Kringelbach, M. L. (2017). Cognitive performance in healthy older adults relates to spontaneous switching between states of functional connectivity during rest. Scientific reports, 7(1), 5135. https://doi.org/10.1038/s41598-017-05425-7 Olsen, A. S., Lykkebo-Valløe, A., Ozenne, B., Madsen, M. K., Stenbæk, D. S., Armand, S., ... & Fisher, P. M. (2022). Psilocybin modulation of time-varying functional connectivity is associated with plasma psilocin and subjective effects. Neuroimage, 264, 119716. https://doi.org/10.1016/j.neuroimage.2022.119716 Vohryzek, J., Deco, G., Cessac, B., Kringelbach, M. L., & Cabral, J. (2020). Ghost attractors in spontaneous brain activity: Recurrent excursions into functionally-relevant BOLD phase-locking states. Frontiers in systems neuroscience, 14, 20. https://doi.org/10.3389/fnsys.2020.00020 """ name = "CONT Leading Eigenvector Dynamics" def __init__(self, time_series: np.ndarray, flip_eigenvectors: bool = True, diagonal: int = 0, fisher_z: bool = False, tril: bool = False): super().__init__(time_series, diagonal, fisher_z, tril) self.N_estimates = self.T self.dfc = np.full((self.P,self.P, self.N_estimates), np.nan) self.flip_eigenvectors = flip_eigenvectors self.V1 = np.full((self.N_estimates, self.P), np.nan)
[docs] def estimate(self): """ Calculate Leading Eigenvector Dynamics Analysis (LeiDA). The leading eigenvectors are saved in as the V1 attribute. Returns ------- np.ndarray Dynamic functional connectivity as a PxPxN array. """ analytic_signal = hilbert(self.time_series, axis=0) instantaneous_phase = np.angle(np.asarray(analytic_signal)) # Compute the leading eigenvector for each time point for n in range(self.N_estimates): cohmat = np.cos(np.subtract.outer(instantaneous_phase[n,:], instantaneous_phase[n,:])) # Compute coherence matrix _, eigenvectors = eigh(cohmat) # Compute the eigenvectors (ignore eigenvalues) V1 = eigenvectors[:, -1] # The leading eigenvector is the one corresponding to the largest eigenvalue if self.flip_eigenvectors: # Make sure the largest component is negative if np.mean(V1 > 0) > 0.5: V1 = -V1 self.dfc[:, :, n] = np.outer(V1, V1) self.V1[n, :] = V1 self.dfc = self.postproc() return self.dfc
[docs] class WaveletCoherence(ConnectivityMethod): """ Instantaneous Wavelet Coherence. Parameters ---------- time_series : np.ndarray The input time series data. method : {'weighted'}, optional The method to use for calculating wavelet coherence. Default is 'weighted'. TR : float, optional Repetition time of the data. Default is 0.72. fmin : float, optional Minimum frequency for wavelet transform. Default is 0.007. fmax : float, optional Maximum frequency for wavelet transform. Default is 0.15. n_scales : int, optional Number of scales for wavelet transform. Default is 15. drop_scales : int, optional Number of scales to drop from the edges. Default is 2. drop_timepoints : int, optional Number of time points to drop from the edges. Default is 50. diagonal : int, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. References ---------- Jacob Billings, Manish Saggar, Jaroslav Hlinka, Shella Keilholz, Giovanni Petri; Simplicial and topological descriptions of human brain dynamics. Network Neuroscience 2021; 5 (2): 549–568. https://doi.org/10.1162/netn_a_00190 """ name = "CONT Wavelet Coherence" def __init__(self, time_series: np.ndarray, method: Literal["weighted"] = "weighted", TR: float = 0.72, fmin: float = 0.007, fmax: float = 0.15, n_scales: int = 15, drop_scales: int = 2, drop_timepoints: int = 50, progress_bar: bool = True, diagonal: int = 0, fisher_z: bool = False, tril: bool = False): super().__init__(time_series, diagonal, fisher_z, tril) self.N_estimates = self.T self.dfc = np.full((self.P,self.P, self.N_estimates), np.nan) self.method = method self.TR = TR self.fmin = fmin self.fmax = fmax self.n_scales = n_scales self.drop_scales = drop_scales self.drop_timepoints = drop_timepoints self.progress_bar = progress_bar self.iwc = None
[docs] def estimate(self): """ Calculate instantaneous wavelet coherence. Returns ------- np.ndarray Dynamic functional connectivity as a PxPxN array. """ # Time series dimensions P = self.time_series.shape[1] T = self.time_series.shape[0] # Initial parameters dt = self.TR # TR of the data s0 = 1 / self.fmax # Smallest scale of the wavelet J = self.n_scales - 1 # Scales range from s0 up to s0 * 2**(J * dj), which gives a total of (J + 1) scales dj = np.log2(self.fmax / self.fmin) / J # Spacing between discrete scales to achieve the desired frequency range mother_wavelet = Morlet(6) # Morlet wavelet with omega_0 = 6 self.iwc = np.full((P, P, J+1, T), np.nan) # resulting wavelet coherence matrices W_list = [] S_list = [] for i in range(P): # Calculate continuous wavelet transform y_normal = (self.time_series[:,i] - self.time_series[:,i].mean()) / self.time_series[:,i].std() W, s, _, _, _, _ = cwt(y_normal, dt, dj=dj, s0=s0, J=J, wavelet=mother_wavelet) scales = np.ones([1, y_normal.size]) * s[:, None] S = mother_wavelet.smooth(np.abs(W) ** 2 / scales, dt, dj, s) # Smooth the wavelet spectra W_list.append(W) S_list.append(S) y_normal = (self.time_series[:,0] - self.time_series[:,0].mean()) / self.time_series[:,0].std() _, s1, _, _, _, _ = cwt(y_normal, dt, dj=dj, s0=s0, J=J, wavelet=mother_wavelet) scales = np.ones([1, y_normal.size]) * s1[:, None] total_pairs = P * (P + 1) // 2 pair_iter = ((i, j) for i in range(P) for j in range(i, P)) for i, j in tqdm(pair_iter, total=total_pairs, disable=not self.progress_bar, desc="Wavelet coherence", dynamic_ncols=True): W1 = W_list[i]; W2 = W_list[j] S1 = S_list[i]; S2 = S_list[j] # Calculate wavelet coherence W12 = W1 * np.conj(W2) S12 = mother_wavelet.smooth(W12 / scales, dt, dj, s1) WCT = np.abs(S12) ** 2 / (S1 * S2) self.iwc[i, j, :, :] = WCT self.iwc[j, i, :, :] = WCT if self.method == "weighted": CWP = np.abs(W1 * np.conj(W2)) CWP = CWP[self.drop_scales:-self.drop_scales, self.drop_timepoints:-self.drop_timepoints] WCT2 = WCT[self.drop_scales:-self.drop_scales, self.drop_timepoints:-self.drop_timepoints] cross_power = CWP / np.sum(CWP, axis=0) vals = 1 - (cross_power * WCT2).sum(axis=0) self.dfc[i, j, self.drop_timepoints:-self.drop_timepoints] = vals self.dfc[j, i, self.drop_timepoints:-self.drop_timepoints] = vals else: raise NotImplementedError("Other methods not yet implemented") # Get rid of empty time points self.dfc = self.dfc[:,:, self.drop_timepoints:-self.drop_timepoints] self.dfc = self.postproc() return self.dfc
[docs] class DCC(ConnectivityMethod): """ Dynamic Conditional Correlation (DCC) as described by Lindquist et al. (2014). Parameters ---------- time_series : np.ndarray The input time series data. num_cores : int, optional Number of CPU cores to use for parallel processing. Default is 16. standardizeData : bool, optional Whether to standardize the time series data. Default is True. diagonal : int, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. References ---------- Lindquist, M. A., Xu, Y., Nebel, M. B., & Caffo, B. S. (2014). Evaluating dynamic bivariate correlations in resting-state fMRI: a comparison study and a new approach. NeuroImage, 101, 531-546. https://doi.org/10.1016/j.neuroimage.2014.06.052 """ name = "CONT Dynamic Conditional Correlation" def __init__(self, time_series: np.ndarray, num_cores: int = 16, standardizeData: bool = True, progress_bar: bool = True, diagonal: int = 0, fisher_z: bool = False, tril: bool = False): super().__init__(time_series, diagonal, fisher_z, tril) self.N_estimates = self.T self.dfc = np.full((self.P,self.P, self.N_estimates), np.nan) self.standardizeData = standardizeData self.progress_bar = progress_bar self.num_cores = num_cores self.H = None # dynamic conditional covariance tensor self.Theta = None # GARCH(1,1) parameters self.X = None # DCC parameters def _loglikelihood_garch11(self, theta, data): """ Calculates a real number proportional to the negative log-likelihood of the GARCH(1,1) model Parameters ---------- theta : 1-by-3 vector GARCH(1,1) parameter vector data: 1-by-n vector one dimensional time series Returns ------- output : float real number proportional to the negative log-likelihood (to be minimized) """ T = len(data) h = np.zeros_like(data) h[0] = np.mean(data ** 2) output = 0 eps = 1e-15 for t in range(1,T): # h[t-1] neds to be > 0 and extremely small values can cause an overflow if h[t-1] > eps: output += data[t-1] ** 2 / h[t-1] + np.log(h[t-1]) h[t] = theta[0] + theta[1] * data[t-1] ** 2 + theta[2] * h[t-1] return output def _rToEpsilon(self, r, theta): """Calculates the standardized residual vector and standardized conditional volatility vector (eq. 25) Parameters ---------- r: 1-by-n vector one dimensional time series theta : 1-by-3 vector fitted GARCH(1,1) parameters Returns ------- epsilon : n-by-1 np.ndarray standardized residual vector d : 1-by-n np.ndarray estimated conditional volatility vector """ T = len(r) epsilon = np.zeros_like(r) d = np.zeros_like(r) d[0] = np.mean(r ** 2) for t in range(T-1): epsilon[t] = r[t] / np.sqrt(d[t]) d[t+1] = theta[0] + theta[1] * r[t] ** 2 + theta[2] * d[t] epsilon[T-1] = r[T-1] / np.sqrt(d[T-1]) return epsilon, d def _epsilonToR(self, epsilon, theta): """ Calculates the time-varying conditional correlation matrices from standardized returns Parameters ---------- epsilon : T-by-N matrix standardized residual time series theta : 1-by-2 vector dcc parameter vector Returns ------- R : N-by-N-by-T np.ndarray conditional correlation matrix """ T, N = epsilon.shape S2 = np.corrcoef(epsilon.T) SS = S2 * (1 - np.sum(theta)) Q = S2 R = np.zeros((N, N, T)) for t in range(T): temp = epsilon[t, :] * np.sqrt(np.diag(Q)) Q = SS + theta[0] * np.outer(temp, temp) + theta[1] * Q R[:, :, t] = np.diag(1. / np.sqrt(np.diag(Q))) @ Q @ np.diag(1. / np.sqrt(np.diag(Q))) return R def _LcOriginal(self, epsilon, theta): """ Calculates the correlation component of the log-likelihood (eq. 29) Parameters ---------- epsilon : T-by-N matrix standardized residual time series theta : 1-by-2 vector dcc parameter vector Returns ------- output : float real number proportional to the correlation component in negative log-likelihood (to be minimized) """ T, _ = epsilon.shape S2 = np.cov(epsilon, rowvar=False) SS = S2 * (1 - sum(theta)) Q = S2.copy() R = np.diag(1/np.sqrt(np.diag(Q))) @ Q @ np.diag(1/np.sqrt(np.diag(Q))) output = 0 for t in range(T): output += np.log(det(R)) + epsilon[t, :] @ inv(R) @ epsilon[t, :] temp = epsilon[t, :] * np.sqrt(np.diag(Q)) Q = SS + theta[0] * np.outer(temp, temp) + theta[1] * Q R = np.diag(1/np.sqrt(np.diag(Q))) @ Q @ np.diag(1/np.sqrt(np.diag(Q))) return output def _compute_garch(self, n): """ Fit a univariate GARCH(1,1) model to a time series and calculate standardized residuals and conditional volatilities. Parameters ---------- n : int Index of the time series to be processed. Returns ------- tuple - np.ndarray : Fitted GARCH(1,1) parameters. - np.ndarray : Standardized residuals. - np.ndarray : Estimated conditional volatilities. """ ts_n = np.ascontiguousarray(self.time_series[:, n]) constraints = {'type': 'ineq', 'fun': lambda x: np.array([1 - x[0] - x[1] - x[2], x[0], x[1], x[2]])} bounds = ((0, 1), (0, 1), (0, 1)) theta0 = (0.25, 0.25, 0.25) res = minimize(self._loglikelihood_garch11, theta0, args=(ts_n,), constraints=constraints, bounds=bounds) ep, d = self._rToEpsilon(ts_n, res.x) return res.x, ep, d
[docs] def estimate(self): """ DCC algorithm Parameters ---------- theta : T-by-N matrix fMRI time series data Returns ------- R : N*N*T np.ndarray estimated dynamic conditional correlation tensor """ T, N = self.time_series.shape # T timepoints x N parcels ts = self.time_series - np.mean(self.time_series, axis=0) # Demean # Initialize output data H = np.zeros((N,N,T)) # conditional covariance matrices R = np.zeros((N,N,T)) # conditional correlation matrices Theta = np.zeros((N,3)) # GARCH(1,1) parameters # Initialize intermediate parameters epsilon = np.zeros_like(ts) D = np.zeros_like(ts) # Fit a univariate GARCH process for each n if getattr(self, "num_cores", 1) == 1: it = tqdm(range(N), desc="DCC GARCH (1 core)") if getattr(self, "progress_bar", True) else range(N) results = [self._compute_garch(n) for n in it] else: cm = tqdm_joblib(total=N, desc=f"DCC GARCH ({self.num_cores} cores)") if getattr(self, "progress_bar", True) else None gen = (delayed(self._compute_garch)(n) for n in range(N)) if cm: with cm: results = Parallel(n_jobs=self.num_cores, prefer="processes", batch_size=1)(gen) else: results = Parallel(n_jobs=self.num_cores, prefer="processes", batch_size=1)(gen) for n, (theta, ep, d) in enumerate(results): Theta[n, :] = theta epsilon[:, n] = ep D[:, n] = d # Estimate parameter X for dynamic correlation matrix constraints = {'type': 'ineq', 'fun': lambda x: np.array([1 - x[0] - x[1], x[0], x[1]])} bounds = ((0, 1), (0, 1)) x0 = (0.25, 0.25) for attempt in range(5): try: res = minimize(lambda x: self._LcOriginal(epsilon, x), x0, constraints=constraints, bounds=bounds) break except Exception as e: print(f"Exception: {e}") print(f"Attempt {attempt + 1} failed, trying new random initial values") x0 = (random.uniform(0.25, 0.5), random.uniform(0.25, 0.5)) if attempt == 4: print("All attempts failed.") X = res.x # Time-varying conditional correlation matrices R = self._epsilonToR(epsilon, X) # Time-varying conditional covariance matrices for t in tqdm(range(T), disable=not self.progress_bar, desc="Conditional covariance matrices", dynamic_ncols=True): H[:,:,t] = np.diag(np.sqrt(D[t,:])) @ R[:,:,t] @ np.diag(np.sqrt(D[t,:])) self.dfc = R self.dfc = self.postproc() self.H = H self.Theta = Theta self.X = X return self.dfc
[docs] class EdgeConnectivity(ConnectivityMethod): """ Edge-centric connectivity method. Parameters ---------- time_series : np.ndarray The input time series data. method : string, optional The specific connectivity to calculate. Default is "eTS". - eTS: returns the edge time series (edges x time) - eFC: returns the edge functional connectivity (edges x edges x time). standardizeData : bool, optional Whether to standardize the time series data. Default is True. vlim : float, optional Limit for plotting in the GUI (not used in the method itself). Default is 3. References ---------- Faskowitz, J., Esfahlani, F. Z., Jo, Y., Sporns, O., & Betzel, R. F. (2020). Edge-centric functional network representations of human cerebral cortex reveal overlapping system-level architecture. Nature neuroscience, 23(12), 1644–1654. DOI: https://doi.org/10.1038/s41593-020-00719-y """ name = "CONT Edge-centric Connectivity" def __init__(self, time_series: np.ndarray, method: Literal["eTS", "eFC"] = "eTS", standardizeData: bool = True, labels: list = None, vlim: float = 3): super().__init__(time_series, 0, False, False) self.method = method self.standardizeData = standardizeData self.labels = labels self.edge_labels = None self.u = None # Row indices of the upper triangle of the connectivity matrix self.v = None # Column indices of the upper triangle of the connectivity matrix
[docs] def estimate(self): """ Calculate edge-centric connectivity (eTS or eFC). Returns ------- np.ndarray Dynamic functional connectivity depending on the method. For eTS: Time x Edge array. For eFC: Edge x Edge x Time array. """ z = zscore(self.time_series, axis=0, ddof=1) if self.standardizeData else self.time_series self.u, self.v = np.triu_indices(self.time_series.shape[1], k=1) print(self.u, self.v) a = np.multiply(z[:, self.u], z[:, self.v]) # edge time series # If labels are provided we can create edge names if isinstance(self.labels, list): self.edge_labels = [f"{self.labels[i]}{self.labels[j]}" for i, j in zip(self.u, self.v)] if self.method == "eTS": return a b = a.T @ a # Inner product c = np.sqrt(np.diag(b)) # Square root of the diagonal elements (variance) to normalize d = np.outer(c, c) # Normalization matrix eFC = b / d # Element-wise division to get the correlation matrix self.dfc = eFC self.dfc = self.postproc() return self.dfc
""" SECTION: State based dFC methods """
[docs] class SlidingWindowClustering(ConnectivityMethod): """ Siding window clustering (SWC) state-based connectivity (2-level clustering). References ---------- Torabi, M., Mitsis, G. D., & Poline, J. B. (2024). On the variability of dynamic functional connectivity assessment methods. GigaScience, 13, giae009. https://doi.org/10.1093/gigascience/giae009 Parameters ---------- time_series : list or np.ndarray The input time series data. n_states : int, optional Number of states for the method. Default is 5. subject_clusters : int, optional Number of clusters for the first level clustering. Default is 5. windowsize : int, optional Size of the sliding window. Default is 29. shape : str, optional Shape of the window. Default is "gaussian". std : float, optional Standard deviation for gaussian window. Default is 10. stepsize : int, optional Step size for the sliding window. Default is 15. """ name = "STATE Sliding Window Clustering" def __init__(self, time_series: Union[np.ndarray, list], n_states: int = 5, subject_clusters: int = 5, windowsize: int = 29, shape: Literal["rectangular", "gaussian", "hamming"] = "gaussian", std: float = 10, stepsize: int = 15, random_state: int | None = None, n_init: int = 50, progress_bar: bool = True): super().__init__(time_series, 0, False, False) self.n_states = n_states self.windowsize = windowsize self.shape = shape self.std = std self.stepsize = stepsize self.random_state = random_state self.n_init = n_init self.progress_bar = progress_bar self.subject_clusters = subject_clusters self.N_estimates = (self.T - self.windowsize) // self.stepsize + 1
[docs] def vec2mat(self, F, N): T = F.shape[0] C = np.zeros((T, N, N), dtype=F.dtype) iu = np.triu_indices(N, k=1) C[:, iu[0], iu[1]] = F # Assign vectorized values to upper triangles C = C + np.transpose(C, (0, 2, 1)) + np.eye(N) # Symmetrize and set main diagonal return C
[docs] def mat2vec(self, C_t): if C_t.ndim == 2: # 2D square matrix return C_t[np.triu_indices_from(C_t, k=1)] elif C_t.ndim == 3: # 3D array, C_t is a stack of square matrices. idx = np.triu_indices_from(C_t[0], k=1) return C_t[:, idx[0], idx[1]] else: raise ValueError("Input must be a 2D or 3D array.")
[docs] def estimate(self): """ Estimate state-based connectivity Returns ------- np.ndarray State time course (n_subjects x T) np.ndarray Connectivity states (P x P x n_states) """ FCS_1st_level = None SW_dFC = None for i in tqdm(range(self.n_subjects), disable=not self.progress_bar, desc="Sliding Window Clustering", dynamic_ncols=True): # Sliding window subject_ts = self.time_series3D[i, :, :] sw = SlidingWindow(time_series=subject_ts, windowsize=self.windowsize, stepsize=self.stepsize, shape=self.shape, std=self.std, diagonal=1) dfc = sw.estimate() dfc = np.moveaxis(dfc, -1, 0) F = self.mat2vec(dfc) # First level clustering kmeans_ = KMeans(n_clusters=self.subject_clusters, random_state=self.random_state, n_init=self.n_init).fit(F) F_cent = kmeans_.cluster_centers_ FCS = self.vec2mat(F_cent, N=self.P) if FCS_1st_level is None: FCS_1st_level = FCS else: FCS_1st_level = np.concatenate((FCS_1st_level, FCS), axis=0) if SW_dFC is None: SW_dFC = dfc else: SW_dFC = np.concatenate((SW_dFC, dfc), axis=0) # Second level clustering F = self.mat2vec(FCS_1st_level) kmeans_ = KMeans(n_clusters=self.n_states, n_init=self.n_init, random_state=self.random_state).fit(F) F_cent = kmeans_.cluster_centers_ self.states = self.vec2mat(F_cent, N=self.P) self.states = self.states.transpose(1, 2, 0) self.state_tc = kmeans_.predict(self.mat2vec(SW_dFC)) self.state_tc = self.state_tc.reshape(self.n_subjects, len(self.state_tc)//self.n_subjects) return self.state_tc, self.states
[docs] class KSVD(ConnectivityMethod): """ Windowless state-based connectivity based on K-SVD. References ---------- Torabi, M., Mitsis, G. D., & Poline, J. B. (2024). On the variability of dynamic functional connectivity assessment methods. GigaScience, 13, giae009. https://doi.org/10.1093/gigascience/giae009 Rubinstein, R., Zibulevsky, M., & Elad, M. (2008). Efficient implementation of the K-SVD algorithm using batch orthogonal matching pursuit. Cs Technion, 40(8), 1-15. Parameters ---------- time_series : list or np.ndarray The input time series data. n_states : int, optional Number of states for the method. Default is 5. """ name = "STATE K-SVD" def __init__(self, time_series: Union[np.ndarray, list], n_states: int = 5): super().__init__(time_series, 0, False, False) self.n_states = n_states self.N_estimates = self.n_subjects * self.T self.state_tc = np.zeros(self.N_estimates) self.states = np.full((self.P,self.P, self.n_states), np.nan)
[docs] def estimate(self): """ Estimate state-based connectivity Returns ------- np.ndarray State time course (n_subjects x T) np.ndarray Connectivity states (P x P x n_states) """ # Estimate states aksvd = ApproximateKSVD(n_components=self.n_states, transform_n_nonzero_coefs=1) dictionary = np.asarray(aksvd.fit(self.time_series).components_) gamma = aksvd.transform(self.time_series) # Build state matrices: v_k v_k^T for each atom k (P x P x K) self.states = np.einsum('kp,kq->pqk', dictionary, dictionary) # Assign each time sample to a state and reshape to (n_subjects, T) idx = np.argmax(np.abs(gamma), axis=1) # (N,) self.state_tc = idx.reshape(self.n_subjects, self.T) return self.state_tc, self.states
[docs] class CoactivationPatterns(ConnectivityMethod): """ Co-activation patterns (state-based connectivity). References ---------- Torabi, M., Mitsis, G. D., & Poline, J. B. (2024). On the variability of dynamic functional connectivity assessment methods. GigaScience, 13, giae009. https://doi.org/10.1093/gigascience/giae009 Parameters ---------- time_series : list or np.ndarray The input time series data. n_states : int, optional Number of states for the method. Default is 5. subject_clusters : int, optional Number of subject clusters. Default is 5. """ name = "STATE Co-activation Patterns" def __init__(self, time_series: Union[np.ndarray, list], n_states: int = 5, subject_clusters: int = 5, random_state: int | None = None, n_init: int = 50, progress_bar: bool = True): super().__init__(time_series, 0, False, False) self.N_estimates = self.T * time_series.shape[-1] if isinstance(time_series, np.ndarray) else self.T * len(time_series) self.n_states = n_states self.subject_clusters = subject_clusters self.random_state = random_state self.n_init = n_init self.progress_bar = progress_bar
[docs] def cluster_ts(self, act, n_clusters): kmeans = KMeans(n_clusters=n_clusters, n_init=self.n_init, random_state=self.random_state).fit(act) centroids = kmeans.cluster_centers_ return centroids, kmeans
[docs] def estimate(self): """ Estimate state-based connectivity Returns ------- np.ndarray State time course (n_subjects x T) np.ndarray Connectivity states (P x P x n_states) """ center_1st_level = None for subject in tqdm(range(self.n_subjects), disable=not self.progress_bar, desc="CAP", dynamic_ncols=True): ts = self.time_series3D[subject, :,:] if ts.shape[0] < self.subject_clusters: print(f"Number of subject-level clusters changed to {ts.shape[0]} as they cannot be more than time samples.") self.subject_clusters = ts.shape[0] centroids, _ = self.cluster_ts(act=ts, n_clusters=self.subject_clusters) if center_1st_level is None: center_1st_level = centroids else: center_1st_level = np.concatenate((center_1st_level, centroids), axis=0) group_centroids, kmeans= self.cluster_ts(act=center_1st_level, n_clusters=self.n_states) self.states = np.full((self.P,self.P, self.n_states), np.nan) for i, group_centroid in enumerate(group_centroids): self.states[:,:,i] = np.multiply(group_centroid[:,np.newaxis], group_centroid[np.newaxis,:]) self.state_tc = kmeans.predict(self.time_series) self.state_tc = self.state_tc.reshape(self.n_subjects, self.T) return self.state_tc, self.states
[docs] class ContinuousHMM(ConnectivityMethod): """ Continuous hidden markov model (state-based connectivity). References ---------- Torabi, M., Mitsis, G. D., & Poline, J. B. (2024). On the variability of dynamic functional connectivity assessment methods. GigaScience, 13, giae009. https://doi.org/10.1093/gigascience/giae009 Parameters ---------- time_series : list or np.ndarray The input time series data. n_states : int, optional Number of states for the method. Default is 5. hmm_iter : int, optional Number of iterations for the HMM. Default is 20. """ name = "STATE Continuous Hidden Markov Model" def __init__(self, time_series: Union[np.ndarray, list], n_states: int = 5, hmm_iter: int = 20, progress_bar: bool = True): super().__init__(time_series, 0, False, False) self.N_estimates = self.T * time_series.shape[-1] if isinstance(time_series, np.ndarray) else self.T * len(time_series) self.n_states = n_states self.hmm_iter = hmm_iter self.progress_bar = progress_bar
[docs] def estimate(self): """ Estimate state-based connectivity Returns ------- np.ndarray State time course (n_subjects x T) np.ndarray Connectivity states (P x P x n_states) """ models, scores = [], [] for _ in tqdm(range(self.hmm_iter), disable=not self.progress_bar, desc="Continuous HMM", dynamic_ncols=True): model = hmm.GaussianHMM(n_components=self.n_states, covariance_type="full") model.fit(self.time_series) models.append(model) score = model.score(self.time_series) scores.append(score) hmm_model = models[np.argmax(scores)] self.states = hmm_model.covars_ self.states = self.states.transpose(1, 2, 0) self.state_tc = hmm_model.predict(self.time_series) self.state_tc = self.state_tc.reshape(self.n_subjects, self.T) return self.state_tc, self.states
[docs] class DiscreteHMM(ConnectivityMethod): """ Discrete hidden markov model (state-based connectivity). References ---------- Torabi, M., Mitsis, G. D., & Poline, J. B. (2024). On the variability of dynamic functional connectivity assessment methods. GigaScience, 13, giae009. https://doi.org/10.1093/gigascience/giae009 Parameters ---------- time_series : list or np.ndarray The input time series data. n_states : int, optional Number of states for the method. Default is 5. state_ratio : float, optional Ratio of states to use for clustering. Default is 3/5. subject_clusters : int, optional Number of subject clusters. Default is 5. windowsize : int, optional Size of the sliding window. Default is 29. shape : str, optional Shape of the window. Default is "gaussian". std : float, optional Standard deviation for gaussian window. Default is 10. stepsize : float, optional Step size for the sliding window. Default is 15. hmm_iter : int, optional Number of iterations for the HMM. Default is 20. """ name = "STATE Discrete Hidden Markov Model" def __init__(self, time_series: Union[np.ndarray, list], n_states: int = 5, state_ratio: float = 3/5, subject_clusters: int = 5, windowsize: int = 29, shape: Literal["rectangular", "gaussian", "hamming"] = "gaussian", std: float = 10, stepsize: int = 15, hmm_iter: int = 20, progress_bar: bool = True): super().__init__(time_series, 0, False, False) self.time_series = self.time_series3D self.n_states = n_states self.state_ratio = state_ratio self.subject_clusters = subject_clusters self.windowsize = windowsize self.shape = shape self.std = std self.stepsize = stepsize self.hmm_iter = hmm_iter self.progress_bar = progress_bar self.N_estimates = (self.T - self.windowsize) // self.stepsize + 1
[docs] def estimate(self): """ Estimate state-based connectivity Returns ------- np.ndarray State time course (n_subjects x T) np.ndarray Connectivity states (P x P x n_states) """ # Run sliding window clustering n_cluster_states = int(self.n_states * self.state_ratio) state_tc, states = SlidingWindowClustering(self.time_series, n_states=n_cluster_states, subject_clusters=self.subject_clusters, windowsize=self.windowsize, shape=self.shape, stepsize=self.stepsize).estimate() states = states.transpose(2, 0, 1) SWC_dFC = states[state_tc.flatten()] state_tc = state_tc.reshape(-1, 1) # Fit the categorical HMM models, scores = [], [] for i in tqdm(range(self.hmm_iter), disable=not self.progress_bar, desc="Discrete HMM", dynamic_ncols=True): model = hmm.CategoricalHMM(n_components=self.n_states) model.fit(state_tc) models.append(model) score = model.score(state_tc) scores.append(score) # Select the best model and get the states/connectivity estimates hmm_model = models[np.argmax(scores)] self.state_tc = hmm_model.predict(state_tc) self.states = np.full((self.P, self.P, self.n_states), np.nan) for i in range(self.n_states): ids = np.array([int(state == i) for state in self.state_tc]) self.states[:,:,i] = np.average(SWC_dFC, weights=ids, axis=0) self.state_tc = self.state_tc.reshape(self.n_subjects, self.N_estimates) return self.state_tc, self.states
""" SECTION: Static FC methods """
[docs] class Static_Pearson(ConnectivityMethod): """ Static functional connectivity method using Pearson correlation. Parameters ---------- time_series : np.ndarray The input time series data. shrinkage : str, optional Shrinkage for covariance estimation. Can be None or Ledoit-Wolf. Default is None. diagonal : int, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. """ name = "STATIC Pearson Correlation" def __init__(self, time_series: np.ndarray, cov_estimator: Literal[None, "LedoitWolf"] = None, diagonal: int = 0, fisher_z: bool = False, tril: bool = False): self.fc = None self.cov_estimator = cov_estimator super().__init__(time_series, diagonal, fisher_z, tril)
[docs] def estimate(self): """ Estimate the functional connectivity. Returns ------- np.ndarray Static functional connectivity matrix. """ if self.cov_estimator == "LedoitWolf": C = LedoitWolf().fit(self.time_series).covariance_ else: C = np.cov(self.time_series.T) D = np.diag(1/np.sqrt(np.diag(C))) self.fc = D @ C @ D self.fc = self.postproc() return self.fc
[docs] class Static_Partial(ConnectivityMethod): """ Static functional connectivity method using partial correlation. Parameters ---------- time_series : np.ndarray The input time series data. cov_estimator : str, optional Shrinkage for covariance estimation. Can be None or Ledoit-Wolf. Default is None. diagonal : int, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. """ name = "STATIC Partial Correlation" def __init__(self, time_series: np.ndarray, cov_estimator: Literal["LedoitWolf", None] = None, diagonal: int = 0, fisher_z: bool = False, tril: bool = False): self.fc = None self.cov_estimator = cov_estimator super().__init__(time_series, diagonal, fisher_z, tril)
[docs] def estimate(self): """ Estimate the functional connectivity. Returns ------- np.ndarray Static functional connectivity matrix. """ if self.cov_estimator == "LedoitWolf": C = LedoitWolf().fit(self.time_series).covariance_ else: C = np.cov(self.time_series.T) P = np.linalg.inv(C) D = np.diag(1/np.sqrt(np.diag(P))) self.fc = -D @ P @ D self.fc = self.postproc() return self.fc
[docs] class Static_Mutual_Info(ConnectivityMethod): """ Static functional connectivity method using mutual information. Parameters ---------- time_series : np.ndarray The input time series data. num_bins : int, optional Number of bins to use for the mutual information calculation. Default is 10. diagonal : int, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. """ name = "STATIC Mutual Information" def __init__(self, time_series: np.ndarray, num_bins: int = 10, diagonal: int = 0, fisher_z: bool = False, tril: bool = False): self.fc = None self.num_bins = num_bins super().__init__(time_series, diagonal, fisher_z, tril)
[docs] def estimate(self): """ Estimate the functional connectivity. Returns ------- np.ndarray Static functional connectivity matrix. """ if self.num_bins is None: raise ValueError("Number of bins must be specified for mutual information method") binned_data = np.zeros_like(self.time_series, dtype=int) # Determine the bin edges and bin the data for each time series for i in range(self.P): bin_edges = np.histogram_bin_edges(self.time_series[:, i], bins=self.num_bins) binned_data[:, i] = np.digitize(self.time_series[:, i], bins=bin_edges, right=False) self.fc = np.zeros((self.P, self.P)) for i in range(self.P): for j in range(i + 1, self.P): mi = mutual_info_score(binned_data[:, i], binned_data[:, j]) self.fc[i, j] = mi self.fc[j, i] = mi self.fc = self.postproc() return self.fc
[docs] class Static_Covariance(ConnectivityMethod): """ Static functional connectivity method using covariance. Parameters ---------- time_series : np.ndarray The input time series data. cov_estimator : str, optional Shrinkage for covariance estimation. Can be None or Ledoit-Wolf. Default is None. diagonal : int, optional Value to set on the diagonal of connectivity matrices. Default is 0. fisher_z : bool, optional Whether to apply Fisher z-transformation. Default is False. tril : bool, optional Whether to return only the lower triangle of the matrices. Default is False. """ name = "STATIC Covariance" def __init__(self, time_series: np.ndarray, cov_estimator: Literal["LedoitWolf", None] = None, diagonal: int = 0, fisher_z: bool = False, tril: bool = False): self.fc = None self.cov_estimator = cov_estimator super().__init__(time_series, diagonal, fisher_z, tril)
[docs] def estimate(self): """ Estimate the functional connectivity. Returns ------- np.ndarray Static functional connectivity matrix. """ if self.cov_estimator == "LedoitWolf": self.fc = LedoitWolf().fit(self.time_series).covariance_ else: self.fc = np.cov(self.time_series.T) self.fc = self.postproc() return self.fc