import numpy as np
import pandas as pd
from scipy.signal import find_peaks
from scipy.integrate import odeint
from scipy.optimize import curve_fit
import matplotlib.pyplot as plt
import plotly.graph_objs as go
from ..sleep.sleep import Roenneberg, Oakley, Crespo, Scripps, Sadeh, Cole_Kripke
[docs]
class Model:
"""
Numerically integrates a system of ordinary differential equations (ODEs), whose evolution may
depend on external inputs (e.g., light) and initial conditions. Time points and input data are
either provided directly or extracted from a pandas.Series with a DatetimeIndex.
Attributes
----------
initial_conditions : numpy.ndarray
Array of initial conditions for the model states.
model_states : numpy.ndarray or None
Array containing the model state trajectories after integration.
data : pandas.Series, optional
Input data series with a DatetimeIndex, where the index specifies the time points and
the values represent the input variable (e.g., light intensity). Time and value arrays
are extracted from this series.
time : numpy.ndarray, optional
Array of time points (in hours); must be monotonically increasing.
inputs : numpy.ndarray, optional
Array of input values (e.g., light intensity) corresponding to each time point.
"""
def __init__(self, initial_conditions, data=None, time=None, inputs=None):
self.initial_conditions = initial_conditions
self.model_states = None
self.data = data
# Extract time from data index
if time is None or inputs is None:
if data is not None:
self.time = np.asarray(
(data.index - data.index.min()).total_seconds() / 3600
)
self.inputs = np.asarray(data.values)
else:
raise ValueError(
"Must provide either light time series (data) or input and time."
)
else:
self.time = time
self.inputs = inputs
[docs]
def initialize_model_states(self):
"""
Initializes the model states by numerically integrating the system equations.
This method runs the model integration using the current initial conditions,
input values, and time vector, and stores the result in `self.model_states`.
"""
self.model_states = self.integrate()
[docs]
def integrate(self, light_vector=None, time_vector=None, initial_condition=None):
"""
Numerically integrates a system of ordinary differential equations (ODEs).
This method uses SciPy `odeint` function to simulate the model dynamics over time, given a set of
initial conditions and external inputs (e.g., light intensity). By default, it uses the class attributes
`self.inputs`, `self.time`, and `self.initial_condition`, but alternative arrays can be provided.
The input and time vectors must be of the same length, and the system's equations must be defined
by the `derivative`method (which should be implemented within subclass of the Model class).
Parameters
----------
light_vector (numpy.ndarray, optional):
Array of input values (typically, light intensity).
time_vector (numpy.ndarray, optional):
Array of time points (typically, time in hours).
initial_condition (numpy.ndarray, optional):
Array of initial conditions.
Returns
-------
numpy.ndarray:
Simulated state trajectories over the specified time vector.
"""
light_vector = light_vector if light_vector is not None else self.inputs
time_vector = time_vector if time_vector is not None else self.time
initial_condition = (
initial_condition
if initial_condition is not None
else self.initial_conditions
)
# Define a function to include light input
def system_with_light(state, t):
# light_input = light_vector[min(int(t), len(light_vector) - 1)]
# index = np.where(time_vector <= t)[0][-1]
index = np.searchsorted(time_vector, t, side="right")
index = min(index, len(time_vector) - 1)
light_input = light_vector[index]
return self.derivative(t, state, light_input)
# Use odeint to integrate the system
solution = odeint(system_with_light, initial_condition, time_vector)
return solution
[docs]
def get_initial_conditions(
self, loop_number, data=None, light_vector=None, time_vector=None
):
"""
Attempts to equilibrate the model's initial conditions by repeated simulation given a light and time vector.
This method iteratively integrates the model for a user-specified number of cycles to assess whether equilibrium
is achieved, as indicated by the stabilization of the predicted DLMO (Dim Light Melatonin Onset).
Parameters
----------
loop_number (int):
Number of consecutive simulation cycles performed to assess whether the state trajectory of the circadian
system's state variables converges to equilibrium under the given input conditions.
data (pandas.Series, optional):
Input data series with a DatetimeIndex. The index represents time points and the values represent input
data (e.g., light intensity). The time and input arrays are extracted from this series.
light_vector (numpy.ndarray, optional):
Array of input values (typically, light intensity).
time_vector (numpy.ndarray, optional):
Array of time points (typically, time in hours).
Returns
----------
numpy.ndarray:
The final state of the model after attempting to reach equilibrium (either an equilibrated solution or last
simulated state).
"""
# Extract time from light index
if time_vector is None or light_vector is None:
if data is not None:
time_vector = np.asarray(
(data.index - data.index.min()).total_seconds() / 3600
)
light_vector = np.asarray(data.values)
else:
raise ValueError("Must provide either light series or input and time.")
else:
time_vector = time_vector
light_vector = light_vector
# Initialize with default initial conditions
initial_condition = self.initial_conditions
# List to collect dlmos
dlmos = []
for i in range(loop_number):
# Calculate the solution for the models
solution = self.integrate(
initial_condition=initial_condition,
light_vector=light_vector,
time_vector=time_vector,
)
# Set current model states the solution
self.model_states = solution
# Calculate dlmos from the solution
dlmos.append(self.dlmos())
# Update initial_condition for the next iteration
initial_condition = solution[-1]
# Exit early if entrainment is detected
if i > 1 and np.isclose(dlmos[-1][-1], dlmos[-2][-1], atol=1e-3):
# Print number of loops required for entrainment
# print(f"The model entrained after {i} loops.")
# Update model initial conditions to entrained state
self.initial_conditions = solution[-1]
# Return entrained model solution
return solution[-1]
# Non-entrainment message(free-running rhythm)
# print(
# "The model did not entrain due to insufficient loops or unentrainable light schedule."
# )
# Return unentrained model solution
self.initial_conditions = solution[-1]
return solution[-1]
[docs]
def dlmos(self):
"""
Calculates the predicted Dim Light Melatonin Onset (DLMO) time points using a fixed,
pre-specified offset (default: seven hours).
Returns
-------
numpy.ndarray:
Array of time points (in hours) at which DLMO is expected to occur.
"""
return self.cbt() - self.cbt_to_dlmo
[docs]
def plot(self, states=False, dlmo=False, cbtmin=False):
# Create a new plotly figure
fig = go.Figure()
if states:
# Calculate number of states available
states = self.model_states.shape[1]
match self.model_name:
case "Forger":
labels = ["x", "xc", "Light drive"]
case "Jewett":
labels = ["x", "xc", "Light drive"]
case "HannaySP":
labels = ["Amplitude", "Phase", "Light drive"]
case "HannayTP":
labels = ["Ventral Amplitude",
"Dorsal Amplitude",
"Ventral Phase",
"Dorsal Phase",
"Light Drive"
]
case "Hilaire07":
labels = ["x", "xc", "Light drive"]
case "Skeldon23":
labels = ["x", "xc", "Light drive", "Sleep pressure"]
case "Breslow13":
labels = ["x",
"xc",
"Light Drive",
"Pineal melatonin",
"Plasma melatonin",
"Exogenous melatonin"
]
# Iterate over states and plot them
for i in range(states):
fig.add_trace(
go.Scatter(
x=self.data.index.astype(str),
y=self.model_states[:, i],
name=f"{labels[i]}",
)
)
fig.update_layout(
title="Model States",
xaxis=dict(title="Time"),
yaxis=dict(title="Model States"),
)
return fig
if dlmo:
# Plot daily predicted DLMO
fig.add_trace(
go.Scatter(
x=pd.Series(self.data.index.date.astype(str)).unique(),
y=self.dlmos() % 24,
name="Predicted DLMO",
)
)
fig.update_layout(
title="Predicted DLMO",
xaxis=dict(title="Day"),
yaxis=dict(title="DLMO time"),
)
return fig
if cbtmin:
# Plot daily predicted DLMO
fig.add_trace(
go.Scatter(
x=pd.Series(self.data.index.date.astype(str)).unique(),
y=self.cbt() % 24,
name="Predicted CBTmin",
)
)
fig.update_layout(
title="Predicted CBTmin",
xaxis=dict(title="Day"),
yaxis=dict(title="CBTmin time"),
)
return fig
[docs]
class Forger(Model):
"""
Implements the mathematical model of human circadian rhythms developed by Forger, Jewett and Kronauer [1].
The formalism includes a representation of the biochemical conversion of the light signal into a drive on
the circadian pacemaker, which is modeled as a van der Pol oscillator. This cubic model is characterized
by three state variables: x, xc and n. While n can be interpreted as the proportion of activated photoreceptors,
at a given time, x and xc cannot directly be mapped to specific physiological mechanisms. Instead, x and xc are
used to predict biologically meaningful quantities, such as the core body temperature minimum (CBTmin).
Our implementation closely follows the approach of the `circadian`package by Arcascope [2]. However, we use the
more powerful LSODA integrator (via SciPy's `odeint`) for numerical integration, enabling the integration of the
system using more complex light trajectories.
Attributes
----------
taux : float
Intrinsic period of the oscillator (hours).
mu : float
stiffness of the van der Pol oscillator.
g : float
Light sensitivity scaling parameter.
alpha_0 : float
Baseline forward rate constant governing the photon-driven activation
(or depletion) of photoreceptors. This parameter scales the rate at which
incident light converts available photoreceptors into an active or
“used” state.
beta : float
Backward rate constant governing photoreceptor recovery or regeneration.
This parameter controls the rate at which used photoreceptors return to
the available state, representing dark adaptation or biochemical
recovery processes.
p : float
Power-law exponent for light-dependent photoreceptor activation. This
parameter determines how sensitively the forward rate constant α scales
with light intensity.
i0 : float
Reference light intensity used to normalize the input light intensity vector I.
k : float
Coupling coefficient scaling the influence of the photic drive B on the
circadian pacemaker. It modulates the sensitivity of the oscillator to light.
cbt_to_dlmo : float
Time offset (in hours) from CBTmin to DLMO.
initial_conditions : numpy.ndarray
State vector at the start of simulation (default: [-0.0843259, -1.09607546, 0.45584306]).
model_states : numpy.ndarray
Integrated state trajectories of the model.
time : numpy.ndarray
Array of time points for simulation.
inputs : numpy.ndarray
Array of input values (e.g., light intensity) over time.
hilaire_correction : bool
Whether to use Hilaire's formula to compute alpha. Default is False.
Methods
-------
derivative(t, state, light)
Computes the derivatives of the state variables at a given time and light input.
amplitude()
Calculates the amplitude of the oscillator from integrated states.
phase()
Calculates the phase angle of the oscillator from integrated states.
cbt()
Identifies the timing of core body temperature minima from integrated states.
References
----------
[1] Forger DB, Jewett ME, Kronauer RE. A Simpler Model of the Human Circadian Pacemaker.
Journal of Biological Rhythms. 1999;14(6):533-538. doi:10.1177/074873099129000867
[2] Tavella, F., Hannay, K., & Walch, O. (2023). Arcascope/circadian: Refactoring of readers
and metrics modules, Zenodo, v1.0.2. https://doi.org/10.5281/zenodo.8206871
"""
def __init__(
self,
data=None,
inputs=None,
time=None,
taux=24.2,
mu=0.23,
g=33.75,
alpha_0=0.05,
beta=0.0075,
p=0.50,
i0=9500.0,
k=0.55,
cbt_to_dlmo=7.0,
initial_condition=None,
hilaire_correction=False,
):
if inputs is None or time is None:
super().__init__(
data=data,
initial_conditions=np.array([-0.0843259, -1.09607546, 0.45584306]),
)
else:
super().__init__(
inputs=inputs,
time=time,
initial_conditions=np.array([-0.0843259, -1.09607546, 0.45584306]),
)
# Check for a scenario in which a initial condition is provided
if initial_condition is not None:
self.initial_conditions = initial_condition
# self.initial_conditions = np.array([-0.0843259, -1.09607546, 0.45584306])
# self.inputs = inputs
# self.time = time
self.model_name = self.__class__.__name__
self.taux = taux
self.mu = mu
self.g = g
self.alpha_0 = alpha_0
self.beta = beta
self.p = p
self.i0 = i0
self.k = k
self.cbt_to_dlmo = cbt_to_dlmo
# self.model_states = self.integrate()
self.hilaire_correction = hilaire_correction
self.initialize_model_states()
[docs]
def derivative(self, t, state, light):
"""
Computes the derivatives of the state variables at a given time and light input.
Parameters
----------
t : float
Current simulation time (in hours).
state : numpy.ndarray
Current values of the state variables at time t.
light : float
Light intensity input (in lux) at time t.
Returns
-------
numpy.ndarray
Derivatives at time t.
"""
x = state[0]
xc = state[1]
n = state[2]
if self.hilaire_correction:
# note this correction on the alpha term (important to test Forger's model
# better approximates Hannay's under the ModelComparer framework)
alpha = self.alpha_0 * np.power(light / self.i0, self.p) * (light / (light + 100))
else:
alpha = self.alpha_0 * pow((light / self.i0), self.p)
Bhat = self.g * (1.0 - n) * alpha * (1 - 0.4 * x) * (1 - 0.4 * xc)
mu_term = self.mu * (xc - 4.0 / 3.0 * pow(xc, 3.0))
taux_term = pow(24.0 / (0.99669 * self.taux), 2.0) + self.k * Bhat
dydt = np.zeros_like(state)
dydt[0] = np.pi / 12.0 * (xc + Bhat)
dydt[1] = np.pi / 12.0 * (mu_term - x * taux_term)
dydt[2] = 60.0 * (alpha * (1.0 - n) - self.beta * n)
return dydt
[docs]
def amplitude(self):
"""
Amplitude of the oscillator computed from the integrated state trajectory.
Returns
-------
numpy.ndarray
Amplitude at each time point: sqrt(x^2 + xc^2).
"""
# Integrate model and extract amplitude
x = self.model_states[:, 0]
xc = self.model_states[:, 1]
return np.sqrt(x**2 + xc**2)
[docs]
def phase(self):
"""
Returns the phase angle of the oscillator computed from the integrated state trajectory.
Returns
-------
numpy.ndarray
Phase angle (radians) at each time point: arctangent(xc/x).
"""
# Integrate model and extract phase
x = self.model_states[:, 0]
# Multiplying xc makes the phase move clockwise
xc = -1.0 * self.model_states[:, 1]
# Observe that np.angle(x + complex(0,1) * xc) == np.atan2(xc,x)
# The y (in this case, xc) appears first in the atan2 formula
return np.angle(x + complex(0, 1) * xc)
# return np.atan2(xc,x)
[docs]
def cbt(self):
"""
Time points corresponding to the predicted core bod temperature minima (CBTmin), derived from the
state variable x.
Returns
-------
numpy.ndarray
Array of time points (in hours) where minima of x occur, corresponding to the CBTmin.
"""
# Calculate time step (dt) between consecutive time points
dt = np.diff(self.time)[0]
# Invert cos(x) to turn the minima into maxima (peaks)
inverted_x = -1.0 * self.model_states[:, 0]
# Identify the indices where the minima occur
cbt_min_indices, _ = find_peaks(inverted_x, distance=np.ceil(13.0 / dt))
# Use the previous indices to find the cbtmin times
cbtmin_times = self.time[cbt_min_indices]
# if you want to know in clock time, just do cbtmin_times % 24
return cbtmin_times
[docs]
class Jewett(Model):
"""
Implements a refined version of the Forger, Jewett and Kronauer (FJK) model of human circadian rhythms,
containing a nonlinearity of degree seven [1]. Compared to the FJK model implemented in the Forger subclass,
the revised model recovers strength more slowly when the rhythm is very weak (low amplitude), but recovers
faster once it is close to a stable rhythm (high amplitude).
This model is characterized by three state variables: x, xc and n. While n can be interpreted as the proportion
of activated photoreceptors, at a given time, x and xc cannot directly be mapped to specific physiological
mechanisms. Instead, x and xc are used to predict biologically meaningful quantities, such as the core body
temperature minimum (CBTmin).
Our implementation closely follows the approach of the `circadian`package by Arcascope [2]. However, we use the
more powerful LSODA integrator (via SciPy's `odeint`) for numerical integration, enabling the integration of the
system using more complex light trajectories.
Attributes
----------
taux : float
Intrinsic period of the oscillator (hours).
mu : float
Stiffness of the van der Pol oscillator.
g : float
Light sensitivity scaling parameter.
alpha_0 : float
Baseline forward rate constant governing the photon-driven activation
(or depletion) of photoreceptors. This parameter scales the rate at which
incident light converts available photoreceptors into an active or
“used” state.
beta : float
Backward rate constant governing photoreceptor recovery or regeneration.
This parameter controls the rate at which used photoreceptors return to
the available state, representing dark adaptation or biochemical
recovery processes.
p : float
Power-law exponent for light-dependent photoreceptor activation. This
parameter determines how sensitively the forward rate constant α scales
with light intensity.
i0 : float
Reference light intensity used to normalize the input light intensity vector I.
k : float
Coupling coefficient scaling the influence of the photic drive B on the
circadian pacemaker. It modulates the sensitivity of the oscillator to light.
q : float
Coefficient governing an additional effect of light whereby, while light
is present, it acts to shorten the intrinsic period of the pacemaker in
addition to shifting its phase and amplitude.
phi_ref : float
Reference phase offset relating the timing of the model state minimum
(x_min) to the timing of core body temperature minimum (CBT_min).
cbt_to_dlmo : float
Time offset (in hours) from CBTmin to DLMO.
initial_conditions : numpy.ndarray
State vector at the start of simulation (default: [-0.10097101, -1.21985662, 0.50529415]).
model_states : numpy.ndarray
Integrated state trajectories of the model.
time : numpy.ndarray
Array of time points for simulation.
inputs : numpy.ndarray
Array of input values (e.g., light intensity) over time.
Methods
-------
derivative(t, state, light)
Computes the derivatives of the state variables at a given time and light input.
amplitude()
Calculates the amplitude of the oscillator from integrated states.
phase()
Calculates the phase angle of the oscillator from integrated states.
cbt()
Identifies the timing of core body temperature minima from integrated states.
References
----------
[1] Jewett ME, Forger DB, Kronauer RE. Revised Limit Cycle Oscillator Model of Human Circadian
Pacemaker. Journal of Biological Rhythms. 1999;14(6):493-500. https://doi.org/10.1177/074873049901400608
[2] Tavella, F., Hannay, K., & Walch, O. (2023). Arcascope/circadian: Refactoring of readers
and metrics modules, Zenodo, v1.0.2. https://doi.org/10.5281/zenodo.8206871
"""
def __init__(
self,
data=None,
inputs=None,
time=None,
taux=24.2,
mu=0.13,
g=19.875,
beta=0.013,
k=0.55,
q=1.0 / 3.0,
i0=9500,
p=0.6,
alpha_0=0.16,
phi_ref=0.8,
cbt_to_dlmo=7.0,
initial_condition=None,
):
if inputs is None or time is None:
super().__init__(
data=data,
initial_conditions=np.array([-0.10097101, -1.21985662, 0.50529415]),
)
else:
super().__init__(
inputs=inputs,
time=time,
initial_conditions=np.array([-0.10097101, -1.21985662, 0.50529415]),
)
# Check for a scenario in which a initial condition is provided
if initial_condition is not None:
self.initial_conditions = initial_condition
# self.initial_conditions= np.array([-0.10097101, -1.21985662, 0.50529415])
# self.inputs = inputs
# self.time = time
self.model_name = self.__class__.__name__
self.taux = taux
self.mu = mu
self.g = g
self.beta = beta
self.k = k
self.q = q
self.i0 = i0
self.p = p
self.alpha_0 = alpha_0
self.phi_ref = phi_ref
self.cbt_to_dlmo = cbt_to_dlmo
# self.model_states = self.integrate()
self.initialize_model_states()
[docs]
def derivative(self, t, state, light):
"""
Computes the derivatives of the state variables at a given time and light input.
Parameters
----------
t : float
Current simulation time (in hours).
state : numpy.ndarray
Current values of the state variables at time t.
light : float
Light intensity input (in lux) at time t.
Returns
-------
numpy.ndarray
Derivatives at time t.
"""
x = state[0]
xc = state[1]
n = state[2]
alpha = self.alpha_0 * (light / self.i0) ** self.p
Bhat = self.g * alpha * (1 - n) * (1 - 0.4 * x) * (1 - 0.4 * xc)
mu_term = self.mu * (1.0 / 3.0 * x + 4.0 / 3.0 * x**3 - 256.0 / 105.0 * x**7)
taux_term = pow(24.0 / (0.99729 * self.taux), 2) + self.k * Bhat
dydt = np.zeros_like(state)
dydt[0] = np.pi / 12 * (xc + mu_term + Bhat)
dydt[1] = np.pi / 12 * (self.q * Bhat * xc - x * taux_term)
dydt[2] = 60.0 * (alpha * (1 - n) - self.beta * n)
return dydt
[docs]
def amplitude(self):
"""
Amplitude of the oscillator computed from the integrated state trajectory.
Returns
-------
numpy.ndarray
Amplitude at each time point: sqrt(x^2 + xc^2).
"""
# Integrate model and extract amplitude
x = self.model_states[:, 0]
xc = self.model_states[:, 1]
return np.sqrt(x**2 + xc**2)
[docs]
def phase(self):
"""
Returns the phase angle of the oscillator computed from the integrated state trajectory.
Returns
-------
numpy.ndarray
Phase angle (radians) at each time point: arctangent(xc/x).
"""
# Integrate model and extract phase
x = self.model_states[:, 0]
# Multiplying xc makes the phase move clockwise
xc = -1.0 * self.model_states[:, 1]
# Observe that np.angle(x + complex(0,1) * xc) == np.atan2(xc,x)
# The y (in this case, xc) appears first in the atan2 formula
return np.angle(x + complex(0, 1) * xc)
# return np.atan2(xc,x)
[docs]
def cbt(self):
"""
Time points corresponding to the predicted core bod temperature minima (CBTmin), derived from the
state variable x.
Returns
-------
numpy.ndarray
Array of time points (in hours) where minima of x occur, corresponding to the CBTmin.
"""
# Calculate time step (dt) between consecutive time points
dt = np.diff(self.time)[0]
# Invert cos(x) to turn the minima into maxima (peaks)
inverted_x = -1.0 * self.model_states[:, 0]
# Identify the indices where the minima occur
cbt_min_indices, _ = find_peaks(inverted_x, distance=np.ceil(13.0 / dt))
# Use the previous indices to find the cbtmin times
cbtmin_times = self.time[cbt_min_indices]
# if you want to know in clock time, just do cbtmin_times % 24
return cbtmin_times
[docs]
class HannaySP(Model):
"""
Implements the Hannay Single-Population (SP) model of human circadian rhythms [1]. It describes a population of
weakly-coupled oscillators using a formalism with three state variables: R (collective amplitude), Psi (collective
phase), and n (proportion of light receptors used). In contrast to the FJK formalism, all three state variables are
directly biologically interpretable. The model is derived from the mathematical description of the rhythm within
individual cells in the suprachiasmatic nucleus (SCN) of the hypothalamus, from which a coherent behavior emerges.
Our implementation closely follows the approach of the `circadian`package by Arcascope [2]. However, we use the
more powerful LSODA integrator (via SciPy's `odeint`) for numerical integration, enabling the integration of the
system using more complex light trajectories.
Attributes
----------
tau : float
Intrinsic period of the oscillator (hours).
k : float
Average coupling strength between clock neurons.
gamma : float
Width (dispersion) of the intrinsic frequency distribution across the
population of clock neurons, reflecting variability between neurons.
beta : float
Backward rate constant governing photoreceptor recovery or regeneration.
This parameter controls the rate at which used photoreceptors return to
the available state, representing dark adaptation or biochemical
recovery processes.
a1 : float
Coefficient that scales the first harmonic component of the single-cell phase-response curve
a2 : float
Coefficient that scales the first harmonic component of the single-cell phase-response curve
betal1 : float
Phase offset for first harmonic component of the single-cell phase-response curve (radians).
betal2 : float
Phase offset for second harmonic component of the single-cell phase-response curve (radians).
sigma : float
Factor scaling the effect of light input on the circadian pacemaker (B(t)).
g : float
Light sensitivity scaling parameter.
alpha_0 : float
Baseline forward rate constant governing the photon-driven activation
(or depletion) of photoreceptors. This parameter scales the rate at which
incident light converts available photoreceptors into an active or
“used” state.
delta : float
Backward rate constant governing photoreceptor recovery or regeneration.
This parameter controls the rate at which used photoreceptors return to
the available state, representing dark adaptation or biochemical
recovery processes.
p : float
Power-law exponent for light-dependent photoreceptor activation. This
parameter determines how sensitively the forward rate constant α scales
with light intensity.
i0 : float
Reference light intensity used to normalize the input light intensity vector I.
cbt_to_dlmo : float
Time offset (in hours) from CBTmin to DLMO.
initial_conditions : numpy.ndarray
State vector at the start of simulation (default: [0.82041911, 1.71383697, 0.52318122]).
model_states : numpy.ndarray
Integrated state trajectories of the model.
time : numpy.ndarray
Array of time points for simulation.
inputs : numpy.ndarray
Array of input values (e.g., light intensity) over time.
Methods
-------
derivative(t, state, light)
Computes the derivatives of the state variables at a given time and light input.
amplitude()
Returns the collective rhythm amplitude (R) over time.
phase()
Returns the collective phase (Psi) as a wrapped angle over time.
cbt()
Identifies the timing of core body temperature minima from the phase trajectory.
References
----------
[1] Hannay KM, Booth V, Forger DB. Macroscopic Models for Human Circadian Rhythms.
Journal of Biological Rhythms. 2019;34(6):658-671. https://doi.org/10.1177/0748730419878298
[2] Tavella, F., Hannay, K., & Walch, O. (2023). Arcascope/circadian: Refactoring of readers
and metrics modules, Zenodo, v1.0.2. https://doi.org/10.5281/zenodo.8206871
"""
def __init__(
self,
data=None,
inputs=None,
time=None,
tau=23.84,
k=0.06358,
gamma=0.024,
beta=-0.09318,
a1=0.3855,
a2=0.1977,
betal1=-0.0026,
betal2=-0.957756,
sigma=0.0400692,
g=33.75,
alpha_0=0.05,
delta=0.0075,
p=1.5,
i0=9325.0,
cbt_to_dlmo=7.0,
initial_condition=None,
):
if inputs is None or time is None:
super().__init__(
data=data,
initial_conditions=np.array([0.82041911, 1.71383697, 0.52318122]),
)
else:
super().__init__(
inputs=inputs,
time=time,
initial_conditions=np.array([0.82041911, 1.71383697, 0.52318122]),
)
# Check for a scenario in which a initial condition is provided
if initial_condition is not None:
self.initial_conditions = initial_condition
# self.initial_conditions = np.array([0.82041911, 1.71383697, 0.52318122])
# self.inputs = inputs
# self.time = time
self.model_name = self.__class__.__name__
self.tau = tau
self.k = k
self.gamma = gamma
self.beta = beta
self.a1 = a1
self.a2 = a2
self.betal1 = betal1
self.betal2 = betal2
self.sigma = sigma
self.g = g
self.alpha_0 = alpha_0
self.delta = delta
self.p = p
self.i0 = i0
self.cbt_to_dlmo = cbt_to_dlmo
# self.model_states = self.integrate()
self.initialize_model_states()
[docs]
def derivative(self, t, state, light):
"""
Computes the derivatives of the state variables at a given time and light input.
Parameters
----------
t : float
Current simulation time (in hours).
state : numpy.ndarray
Current values of the state variables at time t.
light : float
Light intensity input (in lux) at time t.
Returns
-------
numpy.ndarray
Derivatives at time t.
"""
R = state[0]
Psi = state[1]
n = state[2]
alpha = self.alpha_0 * pow(light, self.p) / (pow(light, self.p) + self.i0)
Bhat = self.g * (1.0 - n) * alpha
A1_term_amp = (
self.a1 * 0.5 * Bhat * (1.0 - pow(R, 4.0)) * np.cos(Psi + self.betal1)
)
A2_term_amp = (
self.a2
* 0.5
* Bhat
* R
* (1.0 - pow(R, 8.0))
* np.cos(2.0 * Psi + self.betal2)
)
LightAmp = A1_term_amp + A2_term_amp
A1_term_phase = (
self.a1 * Bhat * 0.5 * (pow(R, 3.0) + 1.0 / R) * np.sin(Psi + self.betal1)
)
A2_term_phase = (
self.a2 * Bhat * 0.5 * (1.0 + pow(R, 8.0)) * np.sin(2.0 * Psi + self.betal2)
)
LightPhase = self.sigma * Bhat - A1_term_phase - A2_term_phase
dydt = np.zeros_like(state)
dydt[0] = (
-1.0 * self.gamma * R
+ self.k * np.cos(self.beta) / 2.0 * R * (1.0 - pow(R, 4.0))
+ LightAmp
)
dydt[1] = (
2 * np.pi / self.tau
+ self.k / 2.0 * np.sin(self.beta) * (1 + pow(R, 4.0))
+ LightPhase
)
dydt[2] = 60.0 * (alpha * (1.0 - n) - self.delta * n)
return dydt
[docs]
def amplitude(self):
"""
Collective rhythm amplitude (R) of the oscillator population.
Returns
-------
numpy.ndarray
Amplitude (R) at each time point.
"""
# Integrate model and extract collective rhythm amplitude (r)
return self.model_states[:, 0]
[docs]
def phase(self):
"""
Collective ventral phase (Psi) of the oscillator population, wrapped to [-pi,pi].
Returns
-------
numpy.ndarray
Phase angle (radians) at each time point.
"""
# Integrate model and extract collective phase (phi)
phi = self.model_states[:, 1]
x = np.cos(phi)
y = np.sin(phi)
return np.angle(x + complex(0, 1) * y)
[docs]
def cbt(self):
"""
Time points corresponding to the predicted core bod temperature minima (CBTmin), derived from the
collective phase trajectory.
Returns
-------
numpy.ndarray
Array of time points (in hours) where minima of cos(Psi) occur, corresponding to the CBTmin.
"""
# Calculate time step (dt) between consecutive time points
dt = np.diff(self.time)[0]
# Invert cos(x) to turn the minima into maxima (peaks)
inverted_x = -np.cos(self.model_states[:, 1])
# Identify the indices where the minima occur
cbt_min_indices, _ = find_peaks(inverted_x, distance=np.ceil(13.0 / dt))
# Use the previous indices to find the cbtmin times
cbtmin_times = self.time[cbt_min_indices]
# if you want to know in clock time, just do cbtmin_times % 24
return cbtmin_times
[docs]
class HannayTP(Model):
"""
Implements the Hannay Two-Population (TP) model of human circadian rhythms [1]. The neuroanatomy of the
suprachiasmatic nucleus (SCN) in the hypothalamus distinguishes two primary populations: the ventral cluster,
which receives most light input, and the dorsal cluster. To better reflect this neurophysiological organization,
Hannay et al. extended the Single Population (SP) model to explicitly represent both the ventral and dorsal
groups, resulting in the TP model. Similar to the HannaySP, all five state variables are directly biologically
interpretable. The model is also derived from the mathematical description of the rhythm within individual cells,
from which a coherent behavior emerges.
Our implementation closely follows the approach of the `circadian`package by Arcascope [2]. However, we use the
more powerful LSODA integrator (via SciPy's `odeint`) for numerical integration, enabling the integration of the
system using more complex light trajectories.
Attributes
----------
tauv : float
Intrinsic period of the ventral oscillator (hours).
taud : float
Intrinsic period of the dorsal oscillator (hours).
kvv : float
Average coupling strength within the ventral oscillator population.
kdd : float
Average coupling strength within the dorsal oscillator population.
kvd : float
Average strength from ventral to dorsal population.
kdv : float
Average strength from dorsal to ventral population.
gamma : float
Width (dispersion) of the intrinsic frequency distribution across the
population of clock neurons, reflecting variability between neurons.
a1 : float
Coefficient that scales the first harmonic component of the single-cell phase-response curve
a2 : float
Coefficient that scales the first harmonic component of the single-cell phase-response curve
betal1 : float
Phase offset for first harmonic component of the single-cell phase-response curve (radians).
betal2 : float
Phase offset for second harmonic component of the single-cell phase-response curve (radians).
sigma : float
Factor scaling the effect of light input on the circadian pacemaker (B(t)).
g : float
Light sensitivity scaling parameter.
alpha_0 : float
Baseline forward rate constant governing the photon-driven activation
(or depletion) of photoreceptors. This parameter scales the rate at which
incident light converts available photoreceptors into an active or
“used” state.
delta : float
Backward rate constant governing photoreceptor recovery or regeneration.
This parameter controls the rate at which used photoreceptors return to
the available state, representing dark adaptation or biochemical
recovery processes.
p : float
Power-law exponent for light-dependent photoreceptor activation. This
parameter determines how sensitively the forward rate constant α scales
with light intensity.
i0 : float
Reference light intensity used to normalize the input light intensity vector I.
cbt_to_dlmo : float
Time offset (in hours) from CBTmin to DLMO.
initial_conditions : numpy.ndarray
State vector at the start of simulation (default: [0.82423745, 0.82304996, 1.75233424, 1.863457, 0.52318122]).
model_states : numpy.ndarray
Integrated state trajectories of the model.
time : numpy.ndarray
Array of time points for simulation.
inputs : numpy.ndarray
Array of input values (e.g., light intensity) over time.
Methods
-------
derivative(t, state, light)
Computes the derivatives of the state variables at a given time and light input.
amplitude()
Returns the collective ventral rhythm amplitude (Rv) over time.
phase()
Returns the collective ventral phase (Psiv) as a wrapped angle over time.
cbt()
Identifies the timing of core body temperature minima based on the ventral phase trajectory.
References
----------
[1] Hannay KM, Booth V, Forger DB. Macroscopic Models for Human Circadian Rhythms.
Journal of Biological Rhythms. 2019;34(6):658-671. https://doi.org/10.1177/0748730419878298
[2] Tavella, F., Hannay, K., & Walch, O. (2023). Arcascope/circadian: Refactoring of readers
and metrics modules, Zenodo, v1.0.2. https://doi.org/10.5281/zenodo.8206871
"""
def __init__(
self,
data=None,
inputs=None,
time=None,
tauv=24.25,
taud=24.0,
kvv=0.05,
kdd=0.04,
kvd=0.05,
kdv=0.01,
gamma=0.024,
a1=0.440068,
a2=0.159136,
betal=0.06452,
betal2=-1.38935,
sigma=0.0477375,
g=33.75,
alpha_0=0.05,
delta=0.0075,
p=1.5,
i0=9325.0,
cbt_to_dlmo=7.0,
initial_condition=None,
):
if inputs is None or time is None:
super().__init__(
data=data,
initial_conditions=np.array(
[0.82423745, 0.82304996, 1.75233424, 1.863457, 0.52318122]
),
)
else:
super().__init__(
inputs=inputs,
time=time,
initial_conditions=np.array(
[0.82423745, 0.82304996, 1.75233424, 1.863457, 0.52318122]
),
)
# Check for a scenario in which a initial condition is provided
if initial_condition is not None:
self.initial_conditions = initial_condition
# self.initial_conditions = np.array([0.82423745, 0.82304996, 1.75233424, 1.863457, 0.52318122])
# self.inputs = inputs
# self.time = time
self.model_name = self.__class__.__name__
self.tauv = tauv
self.taud = taud
self.kvv = kvv
self.kdd = kdd
self.kvd = kvd
self.kdv = kdv
self.gamma = gamma
self.a1 = a1
self.a2 = a2
self.betal = betal
self.betal2 = betal2
self.sigma = sigma
self.g = g
self.alpha_0 = alpha_0
self.delta = delta
self.p = p
self.i0 = i0
self.cbt_to_dlmo = cbt_to_dlmo
# self.model_states = self.integrate()
self.initialize_model_states()
[docs]
def derivative(self, t, state, light):
"""
Computes the derivatives of the state variables at a given time and light input.
Parameters
----------
t : float
Current simulation time (in hours).
state : numpy.ndarray
Current values of the state variables at time t.
light : float
Light intensity input (in lux) at time t.
Returns
-------
numpy.ndarray
Derivatives at time t.
"""
Rv = state[0]
Rd = state[1]
Psiv = state[2]
Psid = state[3]
n = state[4]
alpha = self.alpha_0 * pow(light, self.p) / (pow(light, self.p) + self.i0)
Bhat = self.g * (1.0 - n) * alpha
A1_term_amp = (
self.a1 * 0.5 * Bhat * (1.0 - pow(Rv, 4.0)) * np.cos(Psiv + self.betal)
)
A2_term_amp = (
self.a2
* 0.5
* Bhat
* Rv
* (1.0 - pow(Rv, 8.0))
* np.cos(2.0 * Psiv + self.betal2)
)
LightAmp = A1_term_amp + A2_term_amp
A1_term_phase = (
self.a1 * Bhat * 0.5 * (pow(Rv, 3.0) + 1.0 / Rv) * np.sin(Psiv + self.betal)
)
A2_term_phase = (
self.a2
* Bhat
* 0.5
* (1.0 + pow(Rv, 8.0))
* np.sin(2.0 * Psiv + self.betal2)
)
LightPhase = self.sigma * Bhat - A1_term_phase - A2_term_phase
dydt = np.zeros_like(state)
dydt[0] = (
-self.gamma * Rv
+ self.kvv / 2.0 * Rv * (1 - pow(Rv, 4.0))
+ self.kdv / 2.0 * Rd * (1 - pow(Rv, 4.0)) * np.cos(Psid - Psiv)
+ LightAmp
)
dydt[1] = (
-self.gamma * Rd
+ self.kdd / 2.0 * Rd * (1 - pow(Rd, 4.0))
+ self.kvd / 2.0 * Rv * (1.0 - pow(Rd, 4.0)) * np.cos(Psid - Psiv)
)
dydt[2] = (
2.0 * np.pi / self.tauv
+ self.kdv / 2.0 * Rd * (pow(Rv, 3.0) + 1.0 / Rv) * np.sin(Psid - Psiv)
+ LightPhase
)
dydt[3] = 2.0 * np.pi / self.taud - self.kvd / 2.0 * Rv * (
pow(Rd, 3.0) + 1.0 / Rd
) * np.sin(Psid - Psiv)
dydt[4] = 60.0 * (alpha * (1.0 - n) - self.delta * n)
return dydt
[docs]
def amplitude(self):
"""
Collective ventral rhythm amplitude (Rv) of the oscillator population.
Returns
-------
numpy.ndarray
Amplitude Rv at each time point.
"""
# Integrate model and extract collective rhythm amplitude (r)
return self.model_states[:, 0]
[docs]
def phase(self):
"""
Collective ventral phase (Psiv) of the oscillator population, wrapped to [-pi,pi].
Returns
-------
numpy.ndarray
Phase angle (radians) at each time point.
"""
# Integrate model and extract collective phase (phi)
phi = self.model_states[:, 2]
x = np.cos(phi)
y = np.sin(phi)
return np.angle(x + complex(0, 1) * y)
[docs]
def cbt(self):
"""
Time points corresponding to the predicted core bod temperature minima (CBTmin), derived from the
ventral phase trajectory.
Returns
-------
numpy.ndarray
Array of time points (in hours) where minima of cos(Psiv) occur, corresponding to the CBTmin.
"""
# Calculate time step (dt) between consecutive time points
dt = np.diff(self.time)[0]
# Invert cos(x) to turn the minima into maxima (peaks)
inverted_x = -np.cos(self.model_states[:, 2])
# Identify the indices where the minima occur
cbt_min_indices, _ = find_peaks(inverted_x, distance=np.ceil(13.0 / dt))
# Use the previous indices to find the cbtmin times
cbtmin_times = self.time[cbt_min_indices]
# if you want to know in clock time, just do cbtmin_times % 24
return cbtmin_times
[docs]
class ESRI:
"""
Compute the Entrainment Signal Regularity Index (ESRI) for a given light schedule.
The ESRI quantifies the strength and regularity of a light schedule by measuring how clustered
the phases of a simulated circadian oscillator become after a fixed-duration exposure. A high
ESRI indicates strong, regular entrainment (phases converge to a similar value), whereas a low
ESRI indicates weak or irregular entrainment.
The computation follows this procedure:
1. Slide a window of length `window_size_days` across the input time series in steps of
`esri_time_step_hours`.
2. For each window start time, initialize a HannaySP model with a small amplitude and phase.
3. Simulate the oscillator under the window's light input.
4. Record the final amplitude as the ESRI value for that window.
Parameters
----------
time : array_like
Time points (in hours) corresponding to the provided light intensities.
inputs : array_like
Light intensity values (e.g., lux) at each time point.
window_size_days : float, optional
Size of the sliding window in days over which ESRI is computed (default: 4.0).
esri_time_step_hours : float, optional
Step size in hours for sliding the window (default: 1.0).
initial_amplitude : float, optional
Initial amplitude for the HannaySP model; should be low to represent maximal phase dispersion
(default: 0.1).
midnight_phase : float, optional
Phase (in radians) corresponding to circadian midnight (default: 1.65238233).
Attributes
----------
window_size : float
Window length in days.
esri_time_step : float
Step size in hours for sliding the window.
initial_amplitude : float
Initial oscillator amplitude for each simulation.
midnight_phase : float
Phase offset for circadian midnight (radians).
time_vector : ndarray
Time points (hours) used for ESRI calculation.
light_vector : ndarray
Light intensity values corresponding to `time_vector`.
raw_values : pandas.DataFrame
DataFrame indexed by window start time (hours) with column `esri` for computed values.
mean : float
Mean ESRI value across all windows.
std : float
Standard deviation of ESRI values across all windows.
"""
def __init__(
self,
data=None,
time=None,
inputs=None,
window_size_days=4.0,
esri_time_step_hours=1.0,
initial_amplitude=0.1,
midnight_phase=1.65238233,
):
# Set parameter values for ESRI calculation
self.window_size = window_size_days
self.esri_time_step = esri_time_step_hours
self.initial_amplitude = initial_amplitude
self.midnight_phase = midnight_phase
# Extract time and light vector
if time is None or inputs is None:
if data is not None:
self.time_vector = np.asarray(
(data.index - data.index.min()).total_seconds() / 3600
)
self.light_vector = np.asarray(data.values)
else:
raise ValueError(
"Must provide either light time series (data) or input and time."
)
else:
self.time_vector = time
self.light_vector = inputs
# Calculate ESRI array, mean and standard deviation
self.raw_values = self.calculate()
self.mean = self.raw_values["esri"].mean()
self.std = self.raw_values["esri"].std()
[docs]
def calculate(self):
"""
Calculates the ESRI time points and corresponding regularity values.
Returns:
--------
pandas.DataFrame
Dataframe indexed by window start time (hours) with column:
esri : float
Final oscillator amplitude (ESRI value) for each window start time.
"""
# Determine the model's time step from the provided time vector
model_time_step = np.diff(self.time_vector)[0]
# Create an array of time points corresponding to the movement of the sliding window
esri_time = np.arange(
self.time_vector[0],
self.time_vector[-1] - self.window_size * 24,
self.esri_time_step,
)
# Array to store esri values for each time point in esri_time
esri_values = np.zeros_like(esri_time)
# Move the sliding window to calculate all ESRI values
for i, t in enumerate(esri_time):
# Initial phase in radians for the current posiition of the sliding window
current_phase_init = self.midnight_phase + (np.mod(t, 24.0) * np.pi / 12)
# Define initial condition based on initial amplitude, current_phase_init and n
model_initial_condition = np.array(
[self.initial_amplitude, current_phase_init, 0.0]
)
# Generate array with model time points to later compute the corresponding light intensity values by linear interpolation
model_time_points = np.arange(
t, t + self.window_size * 24.0, model_time_step
)
# Using linear interpolation, calculate light intensity value at the specified model time points, based on the time and light vector.
linterp_light_vector = np.interp(
model_time_points, self.time_vector, self.light_vector
)
# Calculate the trajetory using the linterp_light_vector and the model_time_points
model = HannaySP(
inputs=linterp_light_vector,
time=model_time_points,
initial_condition=model_initial_condition,
k=0.0,
gamma=0.0,
)
# model amplitude at the end of the simulation
esri_values[i] = model.model_states[-1, 0]
# Any negative values are replaced with NaN
esri_values[esri_values < 0] = np.nan
# Pack the time point and the corresponding esri value into a dataframe
esri_df = pd.DataFrame({"time": esri_time, "esri": esri_values})
# Set 'time' column as the index of the dataframe
esri_df = esri_df.set_index("time")
return esri_df
[docs]
class ModelComparer:
"""
Framework for mapping state variables between circadian rhythm models.
The ModelComparer provides a framework for relating the state variables
of the Forger model to the state variables of the HannaySP model. It enables
a direct comparison between the two models and facilitates a more interpretable,
physiologically grounded understanding of the Forger model states by expressing
them as functions of the circadian phase predicted by the HannaySP model.
Parameters
----------
data : pandas.Series or pandas.DataFrame, optional
Time-indexed light exposure data. If provided, the time vector is
inferred from the index (in hours since the first sample), and the
light input is taken from the values.
inputs : array-like, optional
Light input time series. Must be provided together with `time` if
`data` is not supplied.
time : array-like, optional
Time vector (in hours). Must be provided together with `inputs` if
`data` is not supplied.
equilibrate : bool, default False
If True, both models are equilibrated prior to simulation by repeatedly
looping the input light profile to estimate stable initial conditions.
loop_number : int, default 10
Number of light-profile repetitions used during equilibration.
a1, a2 : float, default 1.0
Amplitude parameters for the trigonometric mapping of the Forger state
variables.
p1, p2 : float, default 1.0
Phase scaling parameters for the trigonometric mapping.
m1, m2 : float, default 0.0
Offset parameters for the trigonometric mapping.
Attributes
----------
time_vector : ndarray
Time vector (hours).
light_vector : ndarray
Light input time series.
forger : Forger
Instance of the Forger model.
hannay : HannaySP
Instance of the HannaySP model.
x, xc : ndarray
State variables of the Forger model.
phase_vector : ndarray
Collective phase of the HannaySP model.
predicted_x, predicted_xc : ndarray or None
Predicted Forger state variables obtained from the HannaySP phase.
"""
def __init__(
self,
data=None,
inputs=None,
time=None,
equilibrate=False,
loop_number=10,
hilaire_correction=True,
a1=1.0,
a2=1.0,
p1=1.0,
p2=1.0,
m1=0.0,
m2=0.0,
):
# Translation params
self.a1 = a1
self.a2 = a2
self.p1 = p1
self.p2 = p2
self.m1 = m1
self.m2 = m2
# Extract time from data index
if time is None or inputs is None:
if data is not None:
self.time_vector = np.asarray(
(data.index - data.index.min()).total_seconds() / 3600
)
self.light_vector = np.asarray(data.values)
else:
raise ValueError(
"Must provide either light time series (data) or input and time."
)
else:
self.time_vector = time
self.light_vector = inputs
# Instantiate forger model
self.forger = Forger(
inputs=self.light_vector,
time=self.time_vector,
hilaire_correction=hilaire_correction
)
if equilibrate:
# First, calculate initial conditions based on the light and time vector
ics = self.forger.get_initial_conditions(
loop_number=loop_number,
light_vector=self.light_vector,
time_vector=self.time_vector,
)
# Second, integrate the equilibrated model and use last state as initial conditions
new_ics = self.forger.integrate(
light_vector=self.light_vector,
time_vector=self.time_vector,
initial_condition=ics,
)[-1]
# Third, use the calculated initial conditions to define current model states
self.forger.model_states = self.forger.integrate(
light_vector=self.light_vector,
time_vector=self.time_vector,
initial_condition=ics,
)
# Instantiate hannay model
self.hannay = HannaySP(inputs=self.light_vector, time=self.time_vector)
if equilibrate:
# First, calculate initial conditions based on the light and time vector
ics = self.hannay.get_initial_conditions(
loop_number=50,
light_vector=self.light_vector,
time_vector=self.time_vector,
)
# Second, integrate the equilibrated model and use last state as initial conditions
new_ics = self.hannay.integrate(
light_vector=self.light_vector,
time_vector=self.time_vector,
initial_condition=ics,
)[-1]
# Third, use the calculated initial conditions to define current model states
self.hannay.model_states = self.hannay.integrate(
light_vector=self.light_vector,
time_vector=self.time_vector,
initial_condition=new_ics,
)
# Store state variables of the forger model
self.x = self.forger.model_states[:, 0]
self.xc = self.forger.model_states[:, 1]
# Store the collective phase of Hannay's model
self.phase_vector = self.hannay.model_states[:, 1]
# Predicted x and xc attributes
self.predicted_x = None
self.predicted_xc = None
[docs]
def predict_forger(self, change_params=True):
"""
Predict Forger state variables using the collective phase predicted
by the HannaySP model.
Expresses the Forger state variables (`x`, `xc`) as explicit functions
of the collective phase predicted by the HannaySP model.
Parameters
----------
change_params: bool, default True
If True, stores the predicted values for the `x` and `xc` state variables
in the attributes `predicted_x` and `predicted_xc`.
Returns
-------
predicted_x : ndarray
Predicted values of the Forger model state variable `x`
predicted_xc : ndarray
Predicted values of the Forger model state variable `xc`
"""
# Predict state variable x in FJK model (either Forger or Jewett)
predicted_x = np.array(
[self.a1 * np.cos(self.p1 * phase) + self.m1 for phase in self.phase_vector]
)
# Predict state variable xc in FJK model (either Forger or Jewett)
predicted_xc = np.array(
[
-1.0 * self.a2 * np.sin(self.p2 * phase) + self.m2
for phase in self.phase_vector
]
)
# Define the predicted_x and predicted_xc attributes
if change_params:
self.predicted_x = predicted_x
self.predicted_xc = predicted_xc
# Return predictions for x and xc as a tuple
return predicted_x, predicted_xc
[docs]
def linearize_phase(self, change_params=False):
"""
Linearize the phase of the Forger model state variables.
Fits a first-degree polynomial to the phase trajectory of the HannaySP
and optionally replaces the stored phase vector with its linear approximation.
Parameters
----------
change_params: bool, default False
If True, replaces `phase_vector` with the linearized phase.
Returns
-------
linear_phase : ndarray
Linear approximation of the collective phase.
"""
# Attempt to fit a straight line (a polynomial of degree 1) to the collective phase
coefficients = np.polyfit(self.time_vector, self.phase_vector, 1)
slope, intercept = coefficients
if change_params == True:
self.phase_vector = slope * self.time_vector + intercept
return slope * self.time_vector + intercept
[docs]
def find_optimal_params(self, change_params=True):
"""
Estimate the optimal mapping parameters between HannaySP and Forger states.
Uses non-linear least squares regression to estimate amplitude, offset, and
phase-scaling parameters for the mapping between the HannaySP phase and the
Forger model state variables `x` and `xc`.
Parameters
----------
change_params : bool, default True
If True, updates the mapping parameters stored in the object.
Returns
-------
params : ndarray, shape (2, 3)
Estimated parameters for `x` and `xc` mappings:
[[a1, m1, p1],
[a2, m2, p2]]
"""
# Define a mapping function for the state variable x in the FJK model
def map_x(phase, A1, M1, P1):
return A1 * np.cos(P1 * phase) + M1
# Define a mapping function for the state variable xc in the FJK model
def map_xc(phase, A2, M2, P2):
return -1.0 * A2 * np.sin(P2 * phase) + M2
# Initial values for A1 and A2, M1 and M2, and P1 and P2
initial_guess = [1.0, 0.0, 1.0]
def calculate_params(map_function, initial_guess, vector_to_predict, phase):
# Find optimal parameters using non-linear least squares regression
optimal_params, covariance = curve_fit(
map_function, phase, vector_to_predict, p0=initial_guess
)
return optimal_params
# Calculate A1, M1, P1, A2, M2 and P2
a1, m1, p1 = calculate_params(map_x, initial_guess, self.x, self.phase_vector)
a2, m2, p2 = calculate_params(map_xc, initial_guess, self.xc, self.phase_vector)
# Change the attributes of the object
if change_params:
self.a1 = a1
self.a2 = a2
self.m1 = m1
self.m2 = m2
self.p1 = p1
self.p2 = p2
# Return a 2 * 3 matrix with parameters
return np.array([[a1, m1, p1], [a2, m2, p2]])
[docs]
def rmse(self):
"""
Compute the root-mean-square error (RMSE) of the predicted Forger model states.
Returns
-------
rmse_x : float
RMSE between observed and predicted `x`.
rmse_xc : float
RMSE between observed and predicted `xc`.
Notes
-------
Requires `predict_forger` to be called beforehand with
`change_params=True`.
"""
rmse_x = np.sqrt(np.mean((self.x - self.predicted_x) ** 2))
rmse_xc = np.sqrt(np.mean((self.xc - self.predicted_xc) ** 2))
return rmse_x, rmse_xc
[docs]
def cumulative_rmse(self):
"""
Compute cumulative RMSE for the predicted Forger model states.
Returns
-------
crmse_x : ndarray
Cumulative RMSE between observed and predicted `x`.
crmse_xc : ndarray
Cumulative RMSE for state variable `xc`.
Notes
-------
Useful for assessing how prediction error evolves over time.
Requires predicted values to be available.
"""
error_x = (self.x - self.predicted_x) ** 2
error_xc = (self.xc - self.predicted_xc) ** 2
crmse_x = np.sqrt(np.cumsum(error_x) / np.arange(1, len(self.x) + 1))
crmse_xc = np.sqrt(np.cumsum(error_xc) / np.arange(1, len(self.xc) + 1))
return crmse_x, crmse_xc
[docs]
def half_life_crmse(self):
"""
Fit an exponential decay function to the cumulative RMSE curves and compute
the half life times of the RMSE for both the `x` and `xc` values.
The half-life represents the time required for the prediction error to decrease
by 50%, and can be used as an index of how readily a given light schedule entrains
the circadian pacemaker.
Returns
-------
half_life_x : float
Half-decay time for the `x` state variable.
half_life_xc : float
Half-decay time for the `xc` state variable.
"""
# Compute the cumulative rmse for x and xc state variables
crmse_x, crmse_xc = self.cumulative_rmse()
# Define exponential decay function
def exponential_decay(t, A, k, C):
return A * np.exp(-k * t) + C
# Obtain optimal parameters to fit exponential decay
def optimal_params(crmse):
return curve_fit(
exponential_decay, # function
self.time_vector, # xdata
crmse, # ydata
p0=[
crmse[0] - crmse[-1], # Initial guess for the amplitude A
0.01, # k: slow decay guess
crmse[-1], # C: asymptotic RMSE value
], # Initial guess for the parameters
maxfev=10000, # Iterations for curve fitting
)[0]
# Unpack optimal params for `x` and `xc`
a_x, k_x, c_x = optimal_params(crmse_x)
a_xc, k_xc, c_xc = optimal_params(crmse_xc)
# Calculate the half-life for state variables
half_life_x = np.inf if k_x <= 0 else np.log(2) / k_x
half_life_xc = np.inf if k_xc <= 0 else np.log(2) / k_xc
# Return the half-life for `x` and `xc` prediction
return half_life_x, half_life_xc
[docs]
class Hilaire07(Model):
"""
St. Hilaire et al. (2007) human circadian pacemaker model.
This model extends earlier light-driven circadian oscillator models (e.g.,
Kronauer / Forger formulations) by including a non-photic component representing
behavioral modulation (sleep/wake state).
Our implementation closely follows the approach of the `circadian`package by Arcascope [2]. However, we use the
more powerful LSODA integrator (via SciPy's `odeint`) for numerical integration, enabling the integration of the
system using more complex light trajectories.
Model inputs
------------
The system is driven by:
- Light intensity (lux)
- Sleep/rest stat (binary; 1=sleep/rest, 0=wake)
Sleep state may be computed automatically from actigraphy data using
one of the supported sleep scoring algorithms available in circStudio
(e.g., Cole-Kripke, Roenneberg), or supplied directly by the user.
Methods
-------
derivative(t, state, light)
Differential equation system of the oscillator.
amplitude()
Calculates the amplitude of the oscillator from integrated states.
phase()
Calculates the phase angle of the oscillator from integrated states.
cbt()
Identifies the timing of core body temperature minima from integrated states.
Parameters
----------
data : pandas.Series, optional
Time-indexed light data. Required if `inputs` and `time` are not explicitly provided.
sleep_algo : str or pandas.Series, optional
Sleep scoring algorithm to use (default: "Roenneberg").
Alternatively, a precomputed binary sleep series may be supplied.
inputs : array-like, optional
Light input.
time : array-like, optional
Time input.
taux : float
Intrinsic circadian period (hours).
g, k, mu, beta, q, rho, i0, p, a0 : float
Model parameters governing light response, non-photic drive,
and oscillator dynamics.
phi_xcx : float
Polar phase angle (radians) between the state variables x and x_c.
phi_ref : float
Fixed phase offset (radians) used to convert internal oscillator phase
to clock time.
cbt_to_dlmo : float
Phase relationship between CBT minimum and DLMO (hours).
initial_condition : array-like, optional
Initial state vector [-0.0480751, -1.22504441, 0.51854818].
References
----------
[1] St. Hilaire, M. A., Klerman, E. B., Khalsa, S. B. S., Wright, K. P., Czeisler, C. A., & Kronauer, R. E. (2007).
Addition of a non-photic component to a light-based mathematical model of the human circadian pacemaker.
Journal of Theoretical Biology, 247(4), 583–599. https://doi.org/10.1016/j.jtbi.2007.04.001
[2] Tavella, F., Hannay, K., & Walch, O. (2023). Arcascope/circadian: Refactoring of readers
and metrics modules, Zenodo, v1.0.2. https://doi.org/10.5281/zenodo.8206871
"""
def __init__(
self,
data=None,
sleep_algo='Roenneberg',
inputs=None,
time=None,
taux=24.2,
g=37.0,
k=0.55,
mu=0.13,
beta=0.007,
q=1.0 / 3.0,
rho=0.032,
i0=9500.0,
p=0.5,
a0=0.1,
phi_xcx=-2.98,
phi_ref=0.97,
cbt_to_dlmo=7.0,
initial_condition=None,
):
if inputs is None or time is None:
super().__init__(
data=data,
initial_conditions=np.array([-0.0480751, -1.22504441, 0.51854818]),
)
else:
super().__init__(
inputs=inputs,
time=time,
initial_conditions=np.array([-0.0480751, -1.22504441, 0.51854818]),
)
# Check for a scenario in which a initial condition is provided
if initial_condition is not None:
self.initial_conditions = initial_condition
self.model_name = self.__class__.__name__
self.taux = taux
self.g = g
self.k = k
self.mu = mu
self.beta = beta
self.q = q
self.rho = rho
self.i0 = i0
self.p = p
self.a0 = a0
self.phi_xcx = phi_xcx
self.phi_ref = phi_ref
self.cbt_to_dlmo = cbt_to_dlmo
self.sleep = self.sleep_vector(algo=sleep_algo)
self.initialize_model_states() if self.sleep is not None else None
[docs]
def sleep_vector(self, algo='Roenneberg', **kwargs):
"""
Return a binary sleep/rest series align to self.data.
This method either:
* Runs one of circStudio's supported sleep-detection algorithms
on self.data or
* Returns a user-supplied sleep series directly.
The output is always a pandas Series indexed by the same DatetimeIndex
as the input data (or by the inex of the user-supplied series).
Parameters
----------
algo : str or pandas.Series, optional
Sleep scoring method to use or pandas Series to use as sleep vector
- If a string, must be one of:
{"Roenneberg", "Crespo", "Oakley", "Scripps", "Sadeh", "Cole_Kripke"}.
- If a pandas Series, it is assumed to already represent a sleep/rest
vector (binary) and is returned unchanged. This is useful when the
user has computed sleep elsewhere or wants full control
kwargs
Additional keyword arguments passed through the selected algorithm.
For example, you can override algorithm default parameters such as
thresholds, windows, or rescoring options.
Returns
-------
pandas.Series
Binary sleep/rest classification time series.
Convention: values are expected to encode sleep/rest as 1 and wake as 0.
If an external algorithm uses the opposite convention, convert it before
passing it here.
Raises
------
ValueError
If self.data is not indexed by a DatetimeIndex (needed to infer the sampling
interval), or if algo is an unknown algorithm.
"""
if not isinstance(self.data.index, pd.DatetimeIndex):
raise ValueError("Data must have a DatetimeIndex.")
# Estimate sampling interval (minutes)
dt = self.data.index.to_series().diff().dropna().median()
freq_minutes = dt.total_seconds() / 60
match algo:
case 'Roenneberg':
return Roenneberg(data=self.data, plot=False, **kwargs)
case 'Crespo':
return Crespo(
data=self.data,
frequency=freq_minutes,
plot=False,
**kwargs
)
case 'Oakley':
return Oakley(data=self.data, **kwargs)
case 'Scripps':
scripps = Scripps(data=self.data, **kwargs)
return scripps
case 'Sadeh':
return Sadeh(data=self.data, **kwargs)
case 'Cole_Kripke':
return Cole_Kripke(data=self.data, **kwargs)
case _:
if isinstance(algo, pd.Series):
return algo
raise ValueError(f'Unknown algo {algo}')
[docs]
def derivative(self, t, state, input):
"""
Computes the derivatives of the state variables at a given time, sleep and light input.
Parameters
----------
t : float
Current simulation time (in hours).
state : numpy.ndarray
Current values of the state variables at time t.
input: tuple
Contains light intensity input (in lux) at time t and the corresponding wake state.
Returns
-------
numpy.ndarray
Derivatives at time t.
"""
x = state[0]
xc = state[1]
n = state[2]
light = input
#wake = self.sleep
# note this correction on the alpha term (important to test Forger's model
# better approximates Hannay's under the ModelComparer framework)
alpha = self.a0 * np.power(light / self.i0, self.p) * (light / (light + 100))
bhat = self.g * (1 - n) * alpha * (1 - (0.4 * x)) * (1 - 0.4 * xc)
# Sigma is set to one if rest/sleep
idx = np.searchsorted(self.time, t, side="right") - 1
idx = max(0, min(idx, len(self.time) - 1))
wake = bool(self.sleep.iloc[idx] if hasattr(self.sleep, "iloc") else self.sleep[idx])
sigma = 1.0 if wake else 0.0
# Calculate psi_cx
c = t % 24
cbtmin = self.phi_xcx + self.phi_ref
cbtminlocal = (cbtmin * 24.0) / (2 * np.pi)
psi_cx = (c - cbtminlocal) % 24
# Calculate Ns
if 16.5 < psi_cx < 21.0:
nsh = self.rho * (1.0 / 3.0)
else:
nsh = self.rho * ((1.0 / 3.0) - sigma)
ns = nsh * (1 - np.tanh(10.0 * x))
mu_term = self.mu * (
(1.0 / 3.0) * x
+ (4.0 / 3.0) * np.power(x, 3.0)
- (256.0 / 105.0) * np.power(x, 7.0)
)
taux_term = np.power((24.0 / (0.99729 * self.taux)), 2.0) + self.k * bhat
# Differential equation system
dydt = np.zeros_like(state)
dydt[0] = np.pi / 12.0 * (xc + mu_term + bhat + ns)
dydt[1] = np.pi / 12.0 * (self.q * bhat * xc - x * taux_term)
dydt[2] = 60.0 * (alpha * (1.0 - n) - self.beta * n)
return dydt
[docs]
def amplitude(self):
"""
Amplitude of the oscillator computed from the integrated state trajectory.
Returns
-------
numpy.ndarray
Amplitude at each time point: sqrt(x^2 + xc^2).
"""
# Integrate model and extract amplitude
x = self.model_states[:, 0]
xc = self.model_states[:, 1]
return np.sqrt(x**2 + xc**2)
[docs]
def phase(self):
"""
Returns the phase angle of the oscillator computed from the integrated state trajectory.
Returns
-------
numpy.ndarray
Phase angle (radians) at each time point: arctangent(xc/x).
"""
# Integrate model and extract phase
x = self.model_states[:, 0]
# Multiplying xc makes the phase move clockwise
xc = -1.0 * self.model_states[:, 1]
# Observe that np.angle(x + complex(0,1) * xc) == np.atan2(xc,x)
# The y (in this case, xc) appears first in the atan2 formula
return np.angle(x + complex(0, 1) * xc)
# return np.atan2(xc,x)
[docs]
def cbt(self):
"""
Time points corresponding to the predicted core bod temperature minima (CBTmin), derived from the
state variable x.
Returns
-------
numpy.ndarray
Array of time points (in hours) where minima of x occur, corresponding to the CBTmin.
"""
# Calculate time step (dt) between consecutive time points
dt = np.diff(self.time)[0]
# Invert cos(x) to turn the minima into maxima (peaks)
inverted_x = -1.0 * self.model_states[:, 0]
# Identify the indices where the minima occur
cbt_min_indices, _ = find_peaks(inverted_x, distance=np.ceil(13.0 / dt))
# Use the previous indices to find the cbtmin times
cbtmin_times = self.time[cbt_min_indices] + self.phi_ref
# if you want to know in clock time, just do cbtmin_times % 24
return cbtmin_times
[docs]
class Breslow13(Model):
"""
Breslow et al. (2013) circadian model with explicit melatonin dynamics.
This model extends classical light-driven limit-cycle oscillator models of the human circadian pacemaker by
incorporating a multi-compartment melatonin system. In addition to the core state variables (x, x_c, n),
it models pineal, plasma, and exogenous melatonin concentrations.
Our implementation closely follows the approach of the `circadian`package by Arcascope [2]. However, we use the
more powerful LSODA integrator (via SciPy's `odeint`) for numerical integration, enabling the integration of the
system using more complex light trajectories.
State variables
---------------
x : Core oscillator state variable
x_c : Complementary oscillator variable (quadrature component)
n : Phototransduction state
h1 : Pineal melatonin concentration
h2 : Plasma melatonin concentration
h3 : Exogenous melatonin compartment
Attributes
----------
h1_threshold : float
Lower bound used to prevent negative pineal melatonin production.
tauc : float
Intrinsic circadian period (hours).
phi_on : float
Phase angle (radians) at which melatonin synthesis is activated.
phi_off : float
Phase angle (radians) at which melatonin synthesis is deactivated.
phi_ref : float
Phase offset (radians) used when converting internal oscillator
phase to clock time (e.g., CBTmin timing).
cbt_to_dlmo : float
Time offset (in hours) from CBTmin to DLMO.
initial_conditions : numpy.ndarray
State vector at the start of simulation (default: [-0.219, -1.22, 0.519, 0.0, 0.0, 0.0]).
model_states : numpy.ndarray
Integrated state trajectories of the model.
time : numpy.ndarray
Array of time points for simulation.
inputs : numpy.ndarray
Array of input values (e.g., light intensity and wake) over time.
Methods
-------
derivative(t, state, light)
Computes the derivatives of the state variables at a given time and light input.
amplitude()
Calculates the amplitude of the oscillator from integrated states.
phase()
Calculates the phase angle of the oscillator from integrated states.
cbt()
Identifies the timing of core body temperature minima from integrated states.
References
----------
[1] Breslow ER, Phillips AJ, Huang JM, St Hilaire MA, Klerman EB. A mathematical model of the circadian
phase-shifting effects of exogenous melatonin. J Biol Rhythms. 2013 Feb;28(1):79-89.
doi: 10.1177/0748730412468081. PMID: 23382594; PMCID: PMC3733227.
[2] Tavella, F., Hannay, K., & Walch, O. (2023). Arcascope/circadian: Refactoring of readers
and metrics modules, Zenodo, v1.0.2. https://doi.org/10.5281/zenodo.8206871
"""
def __init__(
self,
data=None,
inputs=None,
time=None,
h1_threshold=0.001,
k=0.55,
i0=9500.0,
i1=100.0,
alpha_0=0.1,
beta=0.007,
p=0.5,
r=15.36,
g=37.0,
b=0.4,
gamma=0.13,
kappa=12.0 / np.pi,
tauc=24.1,
f=0.99729,
m=7.0,
eta=0.04,
xi=0.54,
beta_ip=0.000783 * 3600.0,
beta_cp=0.000335 * 3600.0,
beta_ap=0.000162 * 3600.0,
a=0.0010422 * 3600.0,
phi_on=6.113 - 2 * np.pi,
phi_off=4.352 - 2 * np.pi,
delta=600.0,
mmax=0.019513,
hstat=861.0,
sigma=50.0,
phi_ref=0.97,
cbt_to_dlmo=7.0,
initial_condition=None,
):
if inputs is None or time is None:
super().__init__(
data=data,
initial_conditions=np.array([-0.219, -1.22, 0.519, 0.0, 0.0, 0.0]),
)
else:
super().__init__(
inputs=inputs,
time=time,
initial_conditions=np.array([-0.219, -1.22, 0.519, 0.0, 0.0, 0.0]),
)
# Check for a scenario in which a initial condition is provided
if initial_condition is not None:
self.initial_conditions = initial_condition
self.model_name = self.__class__.__name__
self.h1_threshold = h1_threshold
self.k = k
self.i0 = i0
self.i1 = i1
self.alpha_0 = alpha_0
self.beta = beta
self.p = p
self.r = r
self.g = g
self.b = b
self.gamma = gamma
self.kappa = kappa
self.tauc = tauc
self.f = f
self.m = m
self.eta = eta
self.xi = xi
self.beta_ip = beta_ip
self.beta_cp = beta_cp
self.beta_ap = beta_ap
self.a = a
self.phi_on = phi_on
self.phi_off = phi_off
self.delta = delta
self.mmax = mmax
self.hstat = hstat
self.sigma = sigma
self.phi_ref = phi_ref
self.cbt_to_dlmo = cbt_to_dlmo
self.initialize_model_states()
[docs]
def avoid_negative_h1(self, h1, b):
if h1 < self.h1_threshold and (1.0 - self.m * b) < 0:
return 0.0
else:
return 1.0
[docs]
def melatonin_drive(self, h2):
exp_factor = 1.0 + np.exp((self.hstat - h2) / self.sigma)
return self.mmax / exp_factor
[docs]
def derivative(self, t, state, input):
"""
Computes the derivatives of the state variables at a given time and light input.
Parameters
----------
t : float
Current simulation time (in hours).
state : numpy.ndarray
Current values of the state variables at time t.
input: float
Contains light intensity input (in lux) at time t and the corresponding wake state.
Returns
-------
numpy.ndarray
Derivatives at time t.
"""
x = state[0]
xc = state[1]
n = state[2]
h1 = state[3] # pineal melatonin
h2 = state[4] # plasma melatonin
h3 = state[5] # exogenous melatonin
light = input
alpha = (
self.alpha_0 * ((light / self.i0) ** self.p) * (light / (light + self.i1))
)
b = self.g * alpha * (1.0 - n) * (1.0 - self.b * x) * (1.0 - self.b * xc)
gamma_term = self.gamma * (x / 3.0 + 4.0 / 3.0 * x**3 - 256.0 / 105.0 * x**7)
tauc_term = pow(24.0 / (self.f * self.tauc), 2) + self.k * b
m = self.melatonin_drive(h2)
s_h1_b = self.avoid_negative_h1(h1, b)
phase = np.arctan2(
x, xc
) # note this is the opposite to all other models (here xc is on the x-axis)
if self.phi_on > phase > self.phi_off:
activation = 1.0 - np.exp(
-self.delta * np.mod(self.phi_on - phase, 2 * np.pi)
)
normalization = 1.0 - np.exp(
-self.delta * np.mod(self.phi_on - self.phi_off, 2 * np.pi)
)
a = self.a * activation / normalization
else:
a = self.a * np.exp(-self.r * np.mod(self.phi_on - self.phi_off, 2 * np.pi))
# Differential equation system
dydt = np.zeros_like(state)
dydt[0] = (1.0 / self.kappa) * (xc + gamma_term + b - self.eta * m)
dydt[1] = (1.0 / self.kappa) * (b * xc / 3.0 - x * tauc_term - self.xi * m)
dydt[2] = 60.0 * (alpha * (1 - n) - self.beta * n) # typo on paper missing 60.0
dydt[3] = -self.beta_ip * h1 + a * (1.0 - self.m * b) * s_h1_b
dydt[4] = self.beta_ip * h1 - self.beta_cp * h2 + self.beta_ap * h3
dydt[5] = -self.beta_ap * h3
return dydt
[docs]
def amplitude(self):
"""
Amplitude of the oscillator computed from the integrated state trajectory.
Returns
-------
numpy.ndarray
Amplitude at each time point: sqrt(x^2 + xc^2).
"""
# Integrate model and extract amplitude
x = self.model_states[:, 0]
xc = self.model_states[:, 1]
return np.sqrt(x**2 + xc**2)
[docs]
def phase(self):
"""
Returns the phase angle of the oscillator computed from the integrated state trajectory.
Returns
-------
numpy.ndarray
Phase angle (radians) at each time point: arctangent(xc/x).
"""
# Integrate model and extract phase
x = self.model_states[:, 0]
# Multiplying xc makes the phase move clockwise
xc = -1.0 * self.model_states[:, 1]
# Observe that np.angle(x + complex(0,1) * xc) == np.atan2(xc,x)
# The y (in this case, xc) appears first in the atan2 formula
return np.angle(x + complex(0, 1) * xc)
# return np.atan2(xc,x)
[docs]
def cbt(self):
"""
Time points corresponding to the predicted core bod temperature minima (CBTmin), derived from the
state variable x.
Returns
-------
numpy.ndarray
Array of time points (in hours) where minima of x occur, corresponding to the CBTmin.
"""
# Calculate time step (dt) between consecutive time points
dt = np.diff(self.time)[0]
# Invert cos(x) to turn the minima into maxima (peaks)
inverted_x = -1.0 * self.model_states[:, 0]
# Identify the indices where the minima occur
cbt_min_indices, _ = find_peaks(inverted_x, distance=np.ceil(13.0 / dt))
# Use the previous indices to find the cbtmin times
cbtmin_times = self.time[cbt_min_indices] + self.phi_ref
# if you want to know in clock time, just do cbtmin_times % 24
return cbtmin_times
[docs]
class Skeldon23:
"""
Skeldon et al. (2023) model. It simulates circadian phase and sleep/wake patterns from a light time series.
This model takes a light exposure (lux over time) and simulates:
- A circadian rhythm signal (internal clock dynamics)
- A sleep-pressure signal (builds up during wake, dissipates during sleep)
- A predicted sleep/wake state (wake=0, sleep=1).
The user does not need to provide a sleep series. The model generates sleep/wake internally by applying a switching
rule based on sleep pressure and circadian timing.
Our implementation closely follows the approach of the `circadian`package by Arcascope [2]. However, we use the
more powerful LSODA integrator (via SciPy's `odeint`) for numerical integration, enabling the integration of the
system using more complex light trajectories.
Attributes
----------
data : pandas.Series, optional
Time-indexed light intensity series (lux) with a DatetimeIndex.
forced_wakeup_weekday_only : bool, optional
If True, apply forced wake-up only on weekdays according to the author's rule.
forced_wakeup_light_threshold : float or None, optional
If provided, forces wake (s=0) whenever light exceeds this threshold.
mu, chi, h0, delta, ca : float
Sleep homeostasis and switching parameters.
tauc, f, g, p, k, b, gamma, alpha_0, beta, i0, kappa : float
Circadian oscillator and phototransduction parameters.
Light is transformed into a photic drive via alpha(light), then b(t).
c20, alpha21, alpha22, beta21, beta22, beta23 : float
Parameters for circadian modulation term C(x, x_c) that shifts sleep thresholds.
s0: float, optional
Initial sleep state (0 = wake, 1 = sleep). Default is 0.
cbt_to_dlmo : float
Time offset (in hours) from CBTmin to DLMO.
initial_conditions : numpy.ndarray
State vector at the start of simulation (default: [0.23995682, -1.1547196, 0.50529415, 12.83846474]).
model_states : numpy.ndarray
Integrated state trajectories of the model.
time : numpy.ndarray
Array of time points for simulation.
inputs : numpy.ndarray
Array of input values (e.g., light intensity) over time.
Methods
-------
derivative(t, state, light)
Computes the derivatives of the state variables at a given time and light input.
amplitude()
Calculates the amplitude of the oscillator from integrated states.
phase()
Calculates the phase angle of the oscillator from integrated states.
cbt()
Identifies the timing of core body temperature minima from integrated states.
References
----------
[1] Skeldon, A. C., Garcia, T. R., Cleator, S. F., Monica, C. della, Ravindran, K. K. G., Revell,
V. L., & Dijk, D.-J. (2023). Method to determine whether sleep phenotypes are driven by endogenous
circadian rhythms or environmental light by combining longitudinal data and personalised mathematical
models. PLOS Computational Biology, 19(12), e1011743. https://doi.org/10.1371/journal.pcbi.1011743
[2] Tavella, F., Hannay, K., & Walch, O. (2023). Arcascope/circadian: Refactoring of readers
and metrics modules, Zenodo, v1.0.2. https://doi.org/10.5281/zenodo.8206871
"""
def __init__(
self,
data=None,
inputs=None,
time=None,
mu=17.78,
chi=45.0,
h0=13.0,
delta=1.0,
ca=1.72,
tauc=24.2,
f=0.99669,
g=19.9,
p=0.6,
k=0.55,
b=0.4,
gamma=0.23,
alpha_0=0.16,
beta=0.013,
i0=9500.0,
kappa=24.0 / (2.0 * np.pi),
c20=0.7896,
alpha21=-0.3912,
alpha22=0.7583,
beta21=-0.4442,
beta22=0.0250,
beta23=-0.9647,
s0=0.0,
forced_wakeup_light_threshold=None,
forced_wakeup_weekday_only=False,
cbt_to_dlmo=7.0,
initial_condition=np.array([0.23995682, -1.1547196, 0.50529415, 12.83846474]),
):
self.sleep_state = None
self.received_light = None
self.model_name = self.__class__.__name__
self.initial_conditions = initial_condition
self.model_states = None
self.data = data
# -------------------------------------
# Input handling logic (time and light)
# -------------------------------------
# Extract time from data index
if time is None or inputs is None:
if data is not None:
self.time = np.asarray(
(data.index - data.index.min()).total_seconds() / 3600
)
self.inputs = np.asarray(data.values)
else:
raise ValueError(
"Must provide either light time series (data) or input and time."
)
else:
self.time = time
self.inputs = inputs
# ----------------
# Model parameters
# ----------------
# Sleep/wake regulation parameters
self.mu = mu
self.chi = chi
self.h0 = h0
self.delta = delta
self.ca = ca
# Light/circadian parameters
self.tauc = tauc
self.f = f
self.g = g
self.p = p
self.k = k
self.b = b
self.gamma = gamma
self.alpha_0 = alpha_0
self.beta = beta
self.i0 = i0
self.kappa = kappa
# Parameters that perform circadian modulatation of wakefulness
self.c20 = c20
self.alpha21 = alpha21
self.alpha22 = alpha22
self.beta21 = beta21
self.beta22 = beta22
self.beta23 = beta23
# Sleep state (0 for wake, 1 for sleep) - first sleep state given by user
self.current_sleep_state = s0
# Forced wake up
self.forced_wakeup_light_threshold = forced_wakeup_light_threshold
self.forced_wakeup_weekday_only = forced_wakeup_weekday_only
self.cbt_to_dlmo = cbt_to_dlmo
[docs]
def dlmos(self):
"""
Calculates the predicted Dim Light Melatonin Onset (DLMO) time points using a fixed,
pre-specified offset (default: seven hours).
Returns
-------
numpy.ndarray:
Array of time points (in hours) at which DLMO is expected to occur.
"""
return self.cbt() - self.cbt_to_dlmo
[docs]
def get_initial_conditions(
self, loop_number, data=None, light_vector=None, time_vector=None
):
"""
Attempts to equilibrate the model's initial conditions by repeated simulation.
Works with the piecewise odeint integrator that returns:
(time_hours, state_trajectory, sleep_state_trajectory)
Returns
-------
np.ndarray
Final continuous state [x, xc, n, h] after entrainment attempt.
"""
# Extract time from light index
if time_vector is None or light_vector is None:
if data is not None:
time_vector = np.asarray(
(data.index - data.index.min()).total_seconds() / 3600
)
light_vector = np.asarray(data.values)
else:
raise ValueError("Must provide either light series or input and time.")
else:
time_vector = np.asarray(time_vector, dtype=float)
light_vector = np.asarray(light_vector, dtype=float)
# Save existing inputs/time to prevent being permanently overwritten
old_time = getattr(self, "time", None)
old_inputs = getattr(self, "inputs", None)
# Initialize with default initial conditions
initial_condition = np.asarray(self.initial_conditions, dtype=float)
# List to collect dlmos
dlmos = []
try:
for i in range(loop_number):
# Calculate the solution for the models
self.time = time_vector
self.inputs = light_vector
# Run one full cycle simulation starting from current initial_conditions
time_hours, state_trajectory, sleep_state_trajectory = (
self.integrate_piecewise_odeint(initial_state=initial_condition)
)
self.model_states = state_trajectory
# Compute dlmos from the solution
dlmos.append(self.dlmos())
# Update initial_condition for the next iteration
initial_condition = state_trajectory[-1]
# Exit early if entrainment is detected
if i > 1 and np.isclose(dlmos[-1][-1], dlmos[-2][-1], atol=1e-3):
# Print number of loops required for entrainment
# print(f"The model entrained after {i} loops.")
# Update model initial conditions to entrained state
self.initial_conditions = state_trajectory[-1]
# Return entrained model solution
return state_trajectory[-1]
# Return unentrained model solution
self.initial_conditions = state_trajectory[-1]
return state_trajectory[-1]
finally:
# Restore whatever was previously on the model
self.time = old_time
self.inputs = old_inputs
# ---------------------------------------------------
# Switching between awake and asleep (discrete steps)
# ---------------------------------------------------
def _circadian_modulation_C(self, x, xc):
"""
Circadian modulation term C(x,xc) used in the sleep switching threshold
"""
linear_term = self.c20 + self.alpha21 * xc + self.alpha22 * x
quadratic_term = (
self.beta21 * xc * xc + self.beta22 * xc * x + self.beta23 * x * x
)
return linear_term + quadratic_term
def _is_weekday(self, t_hours: float) -> bool:
"""
Author's weekday rule: treat t=0 as start of a 7-day cycle;
weekday if (t/24) % 7 < 5 (5 is Saturday)
"""
return ((t_hours / 24.0) % 7) < 5
[docs]
def update_sleep_state(
self, t: float, state: np.ndarray, light_input: float
) -> float:
"""
Authors' post-step sleep-state update rule.
This method should be called exactly once per external time step, after
integrating the continuous ODE across that step.
"""
x = state[0]
xc = state[1]
h = state[3]
s = float(self.current_sleep_state)
if s not in (0.0, 1.0):
raise ValueError("current_sleep_state must be 0 or 1")
c = self._circadian_modulation_C(x, xc)
new_s = s
if s == 0.0:
# wake -> sleep
h_plus = self.h0 + 0.5 * self.delta + self.ca * c
if h >= h_plus:
new_s = 1.0
else:
# sleep -> wake
h_minus = self.h0 - 0.5 * self.delta + self.ca * c
if h <= h_minus:
new_s = 0.0
# Forced wake-up by light (mirror authors' behavior)
if self.forced_wakeup_light_threshold is not None:
weekday_ok = True
if self.forced_wakeup_weekday_only:
weekday_ok = self._is_weekday(t)
if weekday_ok and (light_input > self.forced_wakeup_light_threshold):
new_s = 0.0
self.current_sleep_state = new_s
return new_s
# -----------------------------
# Integration step using odeint
# -----------------------------
[docs]
def integrate_piecewise_odeint(self, initial_state=None):
"""
Integrate the Skeldon23 sleep/circadian model over time.
This model contains:
- Continuous variables (circadian + sleep pressure)
- A discrete sleep state (awake=0, asleep=1)
Because sleep state is discrete, we:
1) Keep sleep state fixed within each time interval
2) Integrate the continuous equations across that interval
3) Update sleep state at the end of the interval
Returns
-------
time_hours : np.ndarray
Time values in hours.
state_trajectory : np.ndarray
Continuous state history [x, xc, n, h].
sleep_state_history : np.ndarray
Sleep/wake state over time (0=wake, 1=sleep).
"""
# Convert time and light input to numpy arrays
time_hours = np.asarray(self.time, dtype=float)
light_input = np.asarray(self.inputs)
# Initial continuous state
if initial_state is None:
current_state = np.asarray(self.initial_conditions, dtype=float)
else:
current_state = np.asarray(initial_state, dtype=float)
# Number of timepoints in the total state trajectory
n_timepoints = len(time_hours)
# Store full trajectory of state variables
state_trajectory = np.zeros((n_timepoints, 4), dtype=float)
# Store full trajectory of sleep states (0=wake,1=sleep)
sleep_state_trajectory = np.zeros(n_timepoints, dtype=float)
# Set initial values
state_trajectory[0] = current_state
sleep_state_trajectory[0] = self.current_sleep_state
# Loop over each time interval
for i in range(n_timepoints - 1):
# Define start and end times
start_time = float(time_hours[i])
end_time = float(time_hours[i + 1])
# Assume that light is constant within this interval
interval_light = float(light_input[i])
# Freeze sleep state during this interval
sleep_state_fixed = float(self.current_sleep_state)
def derivative(state, tt):
"""
Model master equations.
"""
# Model state equations (no direct physiological interpretation)
x = state[0]
xc = state[1]
# Photic drive
n = state[2]
# Sleep pressure
h = state[3]
# Assume that light only affects the system when awake
light = (1.0 - sleep_state_fixed) * interval_light
# Convert light into biological drive
if light <= 0:
alpha = 0.0
else:
alpha = self.alpha_0 * (light / self.i0) ** self.p
# Photic input to the circadian system
b = (
self.g
* (1.0 - n)
* alpha
* (1.0 - self.b * x)
* (1.0 - self.b * xc)
)
# Oscillator dynamics
gamma_term = self.gamma * (xc - 4.0 / 3.0 * xc**3)
tauc_term = (24.0 / (self.f * self.tauc)) ** 2 + self.k * b
# Define derivatives
dydt = np.zeros_like(state)
dydt[0] = (1.0 / self.kappa) * (xc + b)
dydt[1] = (1.0 / self.kappa) * (gamma_term - x * tauc_term)
dydt[2] = 60.0 * (alpha * (1.0 - n) - self.beta * n)
# Sleep pressure increases when awake, decreases when asleep
dydt[3] = (1.0 / self.chi) * (-h + (1.0 - sleep_state_fixed) * self.mu)
return dydt
# Integrate system across interval
t_interval = [start_time, end_time]
solution = odeint(derivative, state_trajectory[i], t_interval)
new_state = solution[-1]
state_trajectory[i + 1] = new_state
# Update sleep state AFTER integration
new_sleep_state = self.update_sleep_state(
end_time, new_state, interval_light
)
sleep_state_trajectory[i + 1] = new_sleep_state
return time_hours, state_trajectory, sleep_state_trajectory
[docs]
def amplitude(self):
"""
Amplitude of the oscillator computed from the integrated state trajectory.
Returns
-------
numpy.ndarray
Amplitude at each time point: sqrt(x^2 + xc^2).
"""
# Integrate model and extract amplitude
x = self.model_states[:, 0]
xc = self.model_states[:, 1]
return np.sqrt(x**2 + xc**2)
[docs]
def phase(self):
"""
Returns the phase angle of the oscillator computed from the integrated state trajectory.
Returns
-------
numpy.ndarray
Phase angle (radians) at each time point: arctangent(xc/x).
"""
# Integrate model and extract phase
x = self.model_states[:, 0]
# Multiplying xc makes the phase move clockwise
xc = -1.0 * self.model_states[:, 1]
# Observe that np.angle(x + complex(0,1) * xc) == np.atan2(xc,x)
# The y (in this case, xc) appears first in the atan2 formula
return np.angle(x + complex(0, 1) * xc)
# return np.atan2(xc,x)
[docs]
def cbt(self):
"""
Time points corresponding to the predicted core bod temperature minima (CBTmin), derived from the
state variable x.
Returns
-------
numpy.ndarray
Array of time points (in hours) where minima of x occur, corresponding to the CBTmin.
"""
# Calculate time step (dt) between consecutive time points
dt = np.diff(self.time)[0]
# Invert cos(x) to turn the minima into maxima (peaks)
inverted_x = -1.0 * self.model_states[:, 0]
# Identify the indices where the minima occur
cbt_min_indices, _ = find_peaks(inverted_x, distance=np.ceil(13.0 / dt))
# Use the previous indices to find the cbtmin times
cbtmin_times = self.time[cbt_min_indices]
# if you want to know in clock time, just do cbtmin_times % 24
return cbtmin_times
[docs]
def plot(self, states=False, dlmo=False, cbtmin=False):
# Create a new plotly figure
fig = go.Figure()
if states:
# Calculate number of states available
states = self.model_states.shape[1]
labels = ["x", "xc", "Light drive", "Sleep pressure"]
# Iterate over states and plot them
for i in range(states):
fig.add_trace(
go.Scatter(
x=self.data.index.astype(str),
y=self.model_states[:, i],
name=f"{labels[i]}",
)
)
fig.update_layout(
title="Model States",
xaxis=dict(title="Time"),
yaxis=dict(title="Model States"),
)
return fig
if dlmo:
# Plot daily predicted DLMO
fig.add_trace(
go.Scatter(
x=pd.Series(self.data.index.date.astype(str)).unique(),
y=self.dlmos() % 24,
name="Predicted DLMO",
)
)
fig.update_layout(
title="Predicted DLMO",
xaxis=dict(title="Day"),
yaxis=dict(title="DLMO time"),
)
return fig
if cbtmin:
# Plot daily predicted DLMO
fig.add_trace(
go.Scatter(
x=pd.Series(self.data.index.date.astype(str)).unique(),
y=self.cbt() % 24,
name="Predicted CBTmin",
)
)
fig.update_layout(
title="Predicted CBTmin",
xaxis=dict(title="Day"),
yaxis=dict(title="CBTmin time"),
)
return fig
[docs]
def run(self, initial_state=None):
t, states, sleep = self.integrate_piecewise_odeint(initial_state=initial_state)
self.model_states = states
self.sleep_state = sleep
self.received_light = self.inputs
return t, states, sleep
[docs]
def main():
# Parameters for the light schedule
total_days = 10 # Number of days
light_on_hours = 16 # Hours lights are on
light_off_hours = 8 # Hours lights are off
bins_per_hour = 6 # 10-minute bins per hour
# bins_per_hour = 10
# bins_per_hour = 1
# Total bins for light on and off periods
light_on_bins = light_on_hours * bins_per_hour
light_off_bins = light_off_hours * bins_per_hour
# Generate random light levels for the light-on period (e.g., between 100 and 800 lux)
np.random.seed(42)
light_on_variation = np.random.randint(low=900, high=901, size=light_on_bins)
# Create the daily schedule: light-off period (0 lux) followed by light-on period
daily_schedule = np.concatenate([np.zeros(light_off_bins), light_on_variation])
# Repeat for the total number of days
light = np.tile(daily_schedule, total_days)
dt = 10 / 60 # 10 minutes in hours
# dt = 1
# dt=1/10 # for 10 bins/h
time = np.arange(0, len(light) * dt, dt)
# SKELDON
skeldon = Skeldon23(inputs=light, time=time)
initial = skeldon.get_initial_conditions(
loop_number=10, light_vector=light, time_vector=time
)
t, states, sleep = skeldon.integrate_piecewise_odeint(initial_state=initial)
x = states[:, 0]
xc = states[:, 1]
n = states[:, 2]
h = states[:, 3]
plt.figure(figsize=(15, 8))
plt.plot(t, x)
plt.ylabel("var")
plt.show()
return None
# SECTION FOR COMPARING CIRCADIAN MODELS (FORGER AND HANNAY)
comparison = ModelComparer(inputs=light, time=time, equilibrate=True)
comparison.linearize_phase(change_params=True)
comparison.find_optimal_params(change_params=True)
comparison.predict_forger(change_params=True)
# comparison.error(change_params=True)
print(comparison.rmse())
# max_error_x, min_error_x, error_band_x, magnitude_x = comparison.error_stats()[0]
# max_error_xc, min_error_xc, error_band_xc, magnitude_xc = comparison.error_stats()[1]
# SECTION WITH GENERAL COMMANDS
# hannay = HannaySP(inputs=light, time=time)
# forger = HannaySP(inputs=light, time=time)
# esri = ESRI(inputs=light, time=time)
# esri.calculate()
# start = time.time()
# r = forger.model_states[:,1]
# states = list(r)
# r = jewett.integrate()[:,0]
# r = hannay.amplitude()
# r = hannay.phase()
# x = forger.integrate()[:,1]
# print(hannay.dlmos() % 24)
# phi = hannay.integrate()[:, 1]
# predicted_x = hannay.predict_forger()[0]
# hannay.initial_conditions = np.array([1,2,3])
# print(forger.equilibrate(100, light_vector, time_vector))
# print(hannay.equilibrate(100, light_vector, time_vector))
# end = time.time()
# REGION FOR PLOTS
# plt.figure(figsize=(18, 8))
# plt.plot(phi)
# plt.xlabel("Time")
# plt.ylabel("Phi (state variable)")
# plt.title("State Evolution Over Time")
# plt.legend()
# plt.grid(True)
# plt.show()
# plt.figure()
# plt.plot(comparison.predicted_x, label="predicted_x", color="black", alpha=0.5)
# plt.plot(comparison.predicted_x, label="predicted_x", color="blue", alpha=0.9)
# peaks,_= find_peaks(comparison.error_x)
# for peak in peaks:
# print(f'{comparison.error_x[peak]:.3f}')
# plt.plot(comparison.error_x, label="Error", color="pink", alpha=0.9)
# plt.axhline(y=max_error_x, color="green", linestyle="--", label="Max Error")
# plt.axhline(y=min_error_x, color="purple", linestyle="--", label="Min error")
# plt.text(
# x=len(comparison.x) // 2,
# y=(max_error_x + min_error_x) / 2,
# s=f"Error band width {error_band_x:.2f}",
# color="orange",
# ha="center",
# va="center",
# fontsize=12,
# backgroundcolor="white",
# )
# plt.xlabel("Time")
# plt.ylabel("State variable")
# plt.title("State Evolution Over Time")
# plt.legend()
# plt.grid(True)
# print(f"Magnitude of the error in predicting x = {magnitude_x}")
# plt.figure()
# plt.plot(comparison.predicted_xc, label="predicted_xc", color="black", alpha=0.5)
# plt.plot(comparison.xc, label="observed_xc", color="blue", alpha=0.9)
# plt.plot(comparison.error_xc, label="Error", color="pink", alpha=0.9)
# plt.axhline(y=max_error_xc, color="green", linestyle="--", label="Max Error")
# plt.axhline(y=min_error_xc, color="purple", linestyle="--", label="Min error")
# plt.text(
# x=len(comparison.xc) // 2,
# y=(max_error_xc + min_error_xc) / 2,
# s=f"Error band width {error_band_xc:.2f}",
# color="orange",
# ha="center",
# va="center",
# fontsize=12,
# backgroundcolor="white",
# )
# plt.xlabel("Time")
# plt.ylabel("State variable")
# plt.title("State Evolution Over Time")
# plt.legend()
# plt.grid(True)
# print(f"Magnitude of the error in predicting x = {magnitude_xc}")
# plt.figure(figsize=(18,8))
# plt.plot(comparison.error_x, label='Error', color='pink', alpha=0.9)
# plt.xlabel("Time")
# plt.ylabel("Error in x prediction")
# plt.title("x Error Evolution Over Time")
# plt.legend()
# plt.grid(True)
# plt.figure(figsize=(18,8))
# plt.plot(comparison.error_xc, label='Error', color='pink', alpha=0.9)
# plt.xlabel("Time")
# plt.ylabel("Error in xc prediction")
# plt.title("xc Error Evolution Over Time")
# plt.legend()
# plt.grid(True)
# plt.figure()
# plt.plot(r, label=f'State r', color='blue', alpha=0.7)
# plt.xlabel("Time")
# plt.ylabel(f"Model state r")
# plt.title(f"Model State r Evolution Over Time")
# plt.legend()
# plt.grid(True)
# plt.show()
# print(f"Execution time is {end-start} seconds.")
if __name__ == "__main__":
main()