Source code for simba.data_processors.pybursts_calculator

### CODE MODIFID FROM PYBURST PACKAGE  https://pypi.org/project/pybursts/
import math

import numpy as np


[docs]def kleinberg_burst_detection(offsets: np.ndarray, s: float, gamma: float): """ Detect hierarchical bursts in a 1D sequence of event times using Kleinberg's two-state infinite-state automaton (modified from `pybursts <https://pypi.org/project/pybursts/>`_). Bursts are intervals where events arrive at a higher-than-baseline rate. The algorithm assigns each inter-event gap to a discrete *level* ``q``: level ``0`` is baseline, higher levels (1, 2, …) are progressively faster (higher-rate) bursts. Each level transition opens or closes a burst at that level, producing a hierarchy of nested bursts. .. note:: Private helper used by :class:`~simba.data_processors.kleinberg_calculator.KleinbergCalculator`. For an end-to-end pipeline (frame indices → bouts → bursts), use that class. :param np.ndarray offsets: 1D numeric array of event times (seconds, frame indices, or any monotonically meaningful unit). May be unsorted; sorted internally. Must have **strictly positive gaps** (no two events at the same time). :param float s: Base of the rate scale (``> 1``). The candidate rate at level ``j`` is ``s**j / mean_gap``, so larger ``s`` means levels grow farther apart and bursts must be more pronounced to reach higher levels. Common choice: ``s = 2``. :param float gamma: Cost of moving up one level (``>= 0``). Higher ``gamma`` penalizes rising into a burst, producing fewer / shorter bursts. Lower ``gamma`` makes the detector more sensitive. Common choice: ``gamma = 1``. :return: 2D ``np.ndarray`` of shape ``(N, 3)`` and ``dtype=object`` with one row per detected burst, columns ``[level, start_offset, end_offset]``: * ``level`` — integer burst level (``0`` is the baseline level, higher levels are nested faster bursts). * ``start_offset`` — value from ``offsets`` where this burst opens. * ``end_offset`` — value from ``offsets`` where this burst closes (inclusive of the last event in the run). For a single-event input, returns a single row ``[0, offsets[0], offsets[0]]``. :rtype: np.ndarray :raises ValueError: If ``offsets`` contains two or more events at the same time (zero gap). .. seealso:: :func:`~simba.data_processors.pybursts_calculator_numba.kleinberg_burst_detection` for Numba JIT-accelerated version. .. csv-table:: EXPECTED RUNTIMES :file: ../../docs/tables/kleinberg_burst_detection.csv :widths: 20, 15, 20, 20, 15 :align: center :header-rows: 1 :example: >>> import numpy as np >>> from simba.data_processors.pybursts_calculator import kleinberg_burst_detection >>> offsets = np.array([1.0, 1.1, 1.2, 5.0, 9.0, 9.05, 9.1]) >>> bursts = kleinberg_burst_detection(offsets=offsets, s=2.0, gamma=1.0) >>> bursts.shape[1] 3 """ offsets = np.array(offsets, dtype=object) if offsets.size == 1: bursts = np.array([0, offsets[0], offsets[0]], ndmin=2, dtype=object) return bursts offsets = np.sort(offsets) gaps = np.diff(offsets) if not np.all(gaps): raise ValueError("Input cannot contain events with zero time between!") T = np.sum(gaps) n = np.size(gaps) g_hat = T / n k = int(math.ceil(float(1 + math.log(T, s) + math.log(1 / np.amin(gaps), s)))) gamma_log_n = gamma * math.log(n) def tau(i, j): if i >= j: return 0 else: return (j - i) * gamma_log_n alpha_function = np.vectorize(lambda x: s**x / g_hat) alpha = alpha_function(np.arange(k)) log_alpha = np.log(alpha) C = np.repeat(float("inf"), k) C[0] = 0 q = np.empty((k, 0)) for t in range(n): C_prime = np.repeat(float("inf"), k) q_prime = np.empty((k, t + 1)) q_prime.fill(np.nan) for j in range(k): cost_function = np.vectorize(lambda x: C[x] + tau(x, j)) cost = cost_function(np.arange(0, k)) el = np.argmin(cost) C_prime[j] = cost[el] - log_alpha[j] + alpha[j] * gaps[t] if t > 0: q_prime[j, :t] = q[el, :] q_prime[j, t] = j + 1 C = C_prime q = q_prime j = np.argmin(C) q = q[j, :] prev_q = 0 N = 0 for t in range(n): if q[t] > prev_q: N = N + q[t] - prev_q prev_q = q[t] bursts = np.array( [np.repeat(np.nan, N), np.repeat(offsets[0], N), np.repeat(offsets[0], N)], ndmin=2, dtype=object, ).transpose() burst_counter = -1 prev_q = 0 stack = np.repeat(np.nan, N) stack_counter = -1 for t in range(n): if q[t] > prev_q: num_levels_opened = q[t] - prev_q for i in range(int(num_levels_opened)): burst_counter += 1 bursts[burst_counter, 0] = prev_q + i bursts[burst_counter, 1] = offsets[t] stack_counter += 1 stack[stack_counter] = burst_counter elif q[t] < prev_q: num_levels_closed = prev_q - q[t] for i in range(int(num_levels_closed)): bursts[int(stack[stack_counter]), 2] = offsets[t] stack_counter -= 1 prev_q = q[t] while stack_counter >= 0: bursts[int(stack[stack_counter]), 2] = offsets[n] stack_counter -= 1 return bursts
# # # if __name__ == "__main__": # import time # # np.random.seed(42) # sizes = [100, 1_000, 10_000, 100_000, 1_000_000] # s, gamma = 2.0, 0.3 # # for n in sizes: # offsets = np.sort(np.cumsum(np.random.exponential(scale=1.0, size=n))) # start = time.perf_counter() # result = kleinberg_burst_detection(offsets=offsets, s=s, gamma=gamma) # elapsed = time.perf_counter() - start # print(f"n={n:>10,} bursts={result.shape[0]:>6,} time={elapsed:.4f}s")