Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
289 changes: 289 additions & 0 deletions src/neuronumba/observables/distance_rule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
from abc import abstractmethod
import numpy as np
from scipy.stats import binned_statistic
from scipy.optimize import curve_fit
from neuronumba.basic.attr import HasAttr, Attr


class DistanceRule(HasAttr):
    """
    Base class for distance-based connectivity rules.

    This class provides:
    - common attributes (e.g. NSTD, lambda_val)
    - utility methods for histogram computation and exponential fitting

    Subclasses must implement the `compute_Dist_Rule` method.

    Code by Giuseppe Pau, December 27, 2025
    Refactored by Gustavo Patow, January 13, 2025
    """

    # Number of standard deviations used as threshold for long-range connections
    NSTD = Attr(required=False, default=1)
    # Exponential decay parameter lambda (can be fitted or externally provided)
    lambda_val = Attr(required=False, default=None)

    def compute(self,
                cog_dist: np.ndarray,
                lambda_val: float = None):
        """
        Evaluate the distance rule for the given coordinates.

        Parameters
        ----------
        cog_dist : np.ndarray
            Coordinates of centers of gravity (N x 3).
        lambda_val : float, optional
            Exponential decay parameter. If None, self.lambda_val is used.

        Returns
        -------
        Whatever the subclass `compute_Dist_Rule` implementation returns.

        Raises
        ------
        ValueError
            If no lambda value is available (neither argument nor attribute).
        """
        if lambda_val is None:
            if self.lambda_val is None:
                raise ValueError(
                    "lambda_val is not set. Run fit_exponential or provide it explicitly."
                )
            lambda_val = self.lambda_val
        return self.compute_Dist_Rule(cog_dist, lambda_val)

    @abstractmethod
    def compute_Dist_Rule(self, cog_dist: np.ndarray, lambda_val: float):
        """
        Abstract method to compute a distance-based connectivity rule.
        """
        pass

    def compute_hist(self, C, rr, NR):
        """
        Compute statistics of a connectivity matrix binned by distance.

        Parameters
        ----------
        C : np.ndarray
            Connectivity matrix (e.g. SC or EDR).
        rr : np.ndarray
            Pairwise distance matrix.
        NR : int
            Number of distance bins.

        Returns
        -------
        means : np.ndarray
            Mean connectivity per distance bin (NaN for empty bins).
        stds : np.ndarray
            Standard deviation per distance bin.
        bin_edges : np.ndarray
            Distance bin edges (length NR + 1).
        maxs : np.ndarray
            Maximum connectivity value per bin.
        """
        rr_flat = rr.flatten()
        C_flat = C.flatten()

        # All three statistics share the same range so the bins stay aligned
        bin_range = (rr_flat.min(), rr_flat.max())

        means, bin_edges, _ = binned_statistic(
            rr_flat, C_flat,
            statistic='mean',
            bins=NR,
            range=bin_range
        )

        stds, _, _ = binned_statistic(
            rr_flat, C_flat,
            statistic='std',
            bins=NR,
            range=bin_range
        )

        maxs, _, _ = binned_statistic(
            rr_flat, C_flat,
            statistic='max',
            bins=NR,
            range=bin_range
        )

        return means, stds, bin_edges, maxs

    def fit_exponential(self, centers, means, start_index=24):
        """
        Fit an exponential decay to the binned connectivity profile.

        The model is:
            C(d) = A1 * exp(-lambda * d)

        The fitted lambda is also stored in `self.lambda_val`, so `compute`
        can subsequently be called without an explicit lambda (as promised
        by the error message raised in `compute`).

        Parameters
        ----------
        centers : np.ndarray
            Bin center distances.
        means : np.ndarray
            Mean connectivity values per bin.
        start_index : int
            Index from which the fit is performed (to ignore short distances).

        Returns
        -------
        lambda_fit : float
            Estimated exponential decay parameter lambda.
        """

        def expfunc(x, A1, A2):
            return A1 * np.exp(-A2 * x)

        xdata = np.asarray(centers[start_index:], dtype=float)
        ydata = np.asarray(means[start_index:], dtype=float)

        # binned_statistic yields NaN means for empty bins, and curve_fit
        # cannot handle NaN/inf input, so drop those points before fitting.
        valid = np.isfinite(xdata) & np.isfinite(ydata)
        xdata = xdata[valid]
        ydata = ydata[valid]

        # Initial guess and bounds for nonlinear fitting
        A0 = [0.15, 0.18]
        bounds = ([-100, -100], [100, 100])

        popt, _ = curve_fit(expfunc, xdata, ydata, p0=A0, bounds=bounds)

        # The second fitted parameter corresponds to lambda
        lambda_fit = popt[1]
        # Cache the fitted value for later calls to compute()
        self.lambda_val = lambda_fit
        return lambda_fit


class EDR_distance_rule(DistanceRule):
    """
    Exponential Distance Rule (EDR).

    Computes:
    - pairwise Euclidean distances
    - exponential decay connectivity matrix
    """
    def __init__(self, lambda_val: float):
        super().__init__(lambda_val=lambda_val)

    def compute_Dist_Rule(self,
                          cog_dist: np.ndarray,
                          lambda_val: float):
        """
        Compute distance matrix and EDR connectivity.

        Parameters
        ----------
        cog_dist : np.ndarray
            Coordinates of centers of gravity (N x 3).
        lambda_val : float
            Exponential decay parameter (resolution of a None value is
            handled by the base-class `compute` method, not here).

        Returns
        -------
        rr : np.ndarray
            Pairwise distance matrix.
        c_exp : np.ndarray
            Exponential distance rule connectivity matrix (diagonal set to 1).
        """
        # Vectorized pairwise Euclidean distances; replaces the original
        # O(N^2) Python double loop with a single NumPy broadcast.
        diffs = cog_dist[:, np.newaxis, :] - cog_dist[np.newaxis, :, :]
        rr = np.linalg.norm(diffs, axis=-1)

        c_exp = np.exp(-lambda_val * rr)
        np.fill_diagonal(c_exp, 1.0)

        return rr, c_exp


class EDR_LR_distance_rule(DistanceRule):
    """
    Extension of the EDR model including long-range connections (Clong). From
    Deco, Gustavo et al., Rare long-range cortical connections enhance human
    information processing, Current Biology, Volume 31, Issue 20, 4436 - 4448.e5
    DOI: 10.1016/j.cub.2021.07.064

    original code by Gustavo Deco, 2021

    Usage: call `compute_Clong` first (it caches its result on the instance),
    then `compute` / `compute_Dist_Rule`.
    """

    c_exp = Attr(dependant=True)
    rr = Attr(dependant=True)

    def __init__(self, sc=None):
        super().__init__()
        # Structural connectivity matrix, used to detect long-range outliers
        self.sc = sc
        # Long-range connectivity matrix, cached by compute_Clong()
        self.Clong = None

    def _init_dependant(self):
        super()._init_dependant()
        # First, we need the rr and Clong matrices

    def compute_Dist_Rule(self,
                          cog_dist: np.ndarray,
                          lambda_val: float):
        """
        Compute EDR + long-range connectivity matrix.

        Bug fix: the original body referenced undefined names `rr` and
        `Clong` (guaranteed NameError). The distance matrix is now derived
        from `cog_dist`, and Clong is taken from the result cached by
        `compute_Clong`.

        Parameters
        ----------
        cog_dist : np.ndarray
            Coordinates of centers of gravity (N x 3).
        lambda_val : float
            Exponential decay parameter.

        Returns
        -------
        rr : np.ndarray
            Pairwise distance matrix.
        EDR_Clong : np.ndarray
            Combined EDR + Clong connectivity matrix, clipped to [0, 1],
            with the diagonal set to 1.

        Raises
        ------
        ValueError
            If compute_Clong has not been run yet.
        """
        if self.Clong is None:
            raise ValueError("Clong is not available. Run compute_Clong first.")

        # Vectorized pairwise Euclidean distances between centers of gravity
        diffs = cog_dist[:, np.newaxis, :] - cog_dist[np.newaxis, :, :]
        rr = np.linalg.norm(diffs, axis=-1)

        C = np.exp(-lambda_val * rr)
        EDR_Clong = np.clip(C + self.Clong, 0, 1)
        np.fill_diagonal(EDR_Clong, 1.0)
        return rr, EDR_Clong

    def compute_Clong(self, rr, means, stds, bin_edges,
                      NRini, NRfin, DistRange=0, A1=None):
        """
        Compute the long-range connectivity matrix (Clong).

        A connection is considered long-range if:
        - its distance bin is between NRini and NRfin
        - its distance is greater than DistRange
        - its strength (self.sc) exceeds mean + NSTD * std within the bin

        The result is also cached in self.Clong for use by compute_Dist_Rule.

        Parameters
        ----------
        rr : np.ndarray
            Distance matrix.
        means, stds : np.ndarray
            Mean and standard deviation per distance bin.
        bin_edges : np.ndarray
            Distance bin edges.
        NRini, NRfin : int
            First and last bin indices (MATLAB-style, 1-based).
        DistRange : float
            Minimum distance threshold.
        A1 : float
            Normalization constant (from exponential fit).

        Returns
        -------
        Clong : np.ndarray
            Long-range connectivity matrix, normalized by A1.
        """
        if A1 is None:
            raise ValueError("A1 (from exponential fit) must be provided.")
        if self.sc is None:
            raise ValueError("A structural connectivity matrix (sc) is required.")

        # Convert MATLAB-style indices (1-based) to Python (0-based)
        NRini_py = NRini - 1
        NRfin_py = NRfin - 1

        bin_indices = np.digitize(rr, bin_edges) - 1

        # Clip so overflow bins (digitize can return len(bin_edges)) are safe
        # to use as indices; they are excluded by the bin-range mask below.
        safe_bins = np.clip(bin_indices, 0, len(means) - 1)
        thresholds = means[safe_bins] + self.NSTD * stds[safe_bins]

        # Vectorized selection; replaces the original O(N^2) Python loop.
        in_range = (bin_indices >= NRini_py) & (bin_indices <= NRfin_py)
        far_enough = rr > DistRange
        # NaN thresholds (empty bins) compare False, excluding those pairs,
        # exactly as the scalar comparison did in the loop version.
        is_outlier = self.sc > thresholds

        Clong = np.where(in_range & far_enough & is_outlier, self.sc, 0.0)
        Clong /= A1
        self.Clong = Clong
        return Clong
49 changes: 24 additions & 25 deletions src/neuronumba/observables/turbulence.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from neuronumba.basic.attr import Attr
from neuronumba.observables.base_observable import ObservableFMRI
from neuronumba.observables.distance_rule import DistanceRule, EDR_distance_rule
from neuronumba.tools import matlab_tricks


Expand All @@ -24,27 +25,25 @@ class Turbulence(ObservableFMRI):
Code by Gustavo Deco, 2020.
Translated by Marc Gregoris, May 21, 2024
Refactored by Gustavo Patow, June 9, 2024
Refactored by Giuseppe Pau, December 2025
"""

lambda_val = Attr(default=0.18, required=False)
cog_dist = Attr(required=True)
c_exp = Attr(dependant=True)
rr = Attr(dependant=True)
distance_rule = Attr(default=EDR_distance_rule(lambda_val=lambda_val), dependant=True)

# Ensure conceptual consistency: the model lambda must match the distance rule lambda
def _init_dependant(self):
super()._init_dependant()
self._compute_exp_law()

def _compute_exp_law(self):
N = self.cog_dist.shape[0]
# Compute the distance matrix
rr = np.zeros((N, N))
for i in range(N):
for j in range(N):
rr[i, j] = np.linalg.norm(self.cog_dist[i, :] - self.cog_dist[j, :])
# Build the exponential-distance matrix
c_exp = np.exp(-self.lambda_val * rr)
np.fill_diagonal(c_exp, 1)

if not isinstance(self.distance_rule, DistanceRule):
self.distance_rule = EDR_distance_rule(lambda_val=self.lambda_val)

self.distance_rule.lambda_val = self.lambda_val
rr, c_exp = self.distance_rule.compute(self.cog_dist)

self.rr = rr
self.c_exp = c_exp

Expand All @@ -57,7 +56,7 @@ def compute_turbulence(self, bold_signal):
# bold_signal (ndarray): Bold signal with shape (n_rois, n_time_samples)
n_rois, t_max = bold_signal.shape
# Initialization of results-storing data
enstrophy = np.zeros((n_rois, t_max))
entrophy = np.zeros((n_rois, t_max))
Phases = np.zeros((n_rois, t_max))

# Hilbert transform to calculate the instantaneous phases per node
Expand All @@ -69,25 +68,25 @@ def compute_turbulence(self, bold_signal):
# Calculate Kuramoto LOCAL order parameter for all nodes
for i in range(n_rois):
sumphases = np.nansum(np.tile(self.c_exp[i, :], (t_max,1)).T * np.exp(1j * Phases), axis=0) / np.nansum(self.c_exp[i, :])
enstrophy[i] = np.abs(sumphases) # Kuramoto local order parameter
entrophy[i] = np.abs(sumphases) # Kuramoto local order parameter

# Calculate Kuramoto global order parameter and metastability for all nodes
gKoP = np.nanmean(np.abs(np.sum(np.exp(1j * Phases), axis=0)) / n_rois) # Global Kuramoto parameter (synchronization) for all nodes
Meta = np.nanstd(np.abs(np.sum(np.exp(1j * Phases), axis=0)) / n_rois) # Global metastability for all nodes

R_spa_time = np.nanstd(enstrophy) # Amplitude turbulence (std of Kuramoto local order parameter across nodes and timepoints)
R_spa = np.nanstd(enstrophy, axis=1).T # Amplitude turbulence (std of Kuramoto local order parameter per timepoint across nodes)
R_time = np.nanstd(enstrophy, axis=0) # Amplitude turbulence (std of Kuramoto local order parameter per node across timepoints)
R_spa_time = np.nanstd(entrophy) # Amplitude turbulence (std of Kuramoto local order parameter across nodes and timepoints)
R_spa = np.nanstd(entrophy, axis=1).T # Amplitude turbulence (std of Kuramoto local order parameter per timepoint across nodes)
R_time = np.nanstd(entrophy, axis=0) # Amplitude turbulence (std of Kuramoto local order parameter per node across timepoints)
acf_spa = matlab_tricks.autocorr(R_spa, 100) # Autocorrelation of R in space
acf_time = matlab_tricks.autocorr(R_time, 100) # Autocorrelation of R in time

return {
'Rspatime': R_spa_time, # Amplitude turbulence
'Rspa': R_spa.T, # Amplitude turbulence across nodes per timepoint
'Rtime': R_time, # Amplitude turbulence across timepoints per node
'acfspa': acf_spa, # Autocorrelation of R across space
'acftime': acf_time, # Autocorrelation of R across time
'enstrophy': enstrophy, # Kuramoto local order parameter
'gKoP': gKoP, # Global Kuramoto parameter (synchronization)
'Meta': Meta # Global metastability
'R_spa_time': R_spa_time, # Amplitude turbulence
'R_spa': R_spa.T, # Amplitude turbulence across nodes per timepoint
'R_time': R_time, # Amplitude turbulence across timepoints per node
'acf_spa': acf_spa, # Autocorrelation of R across space
'acf_time': acf_time, # Autocorrelation of R across time
'entrophy': entrophy, # Kuramoto local order parameter
'gKoP': gKoP, # Global Kuramoto parameter (synchronization)
'Meta': Meta # Global metastability
}