Source code for mne_videobrowser.raw_timestamp_computaton

"""Contains function for extracting timestamps from a trigger channel."""

# License: BSD-3-Clause
# Copyright (c) 2014 BioMag Laboratory, Helsinki University Central Hospital
# Copyright (c) 2025 Aalto University

import logging

import mne
import numpy as np
import numpy.typing as npt

# Parameters for detecting timestamps. Should match the parameters used for
# generating the timing sequence
_BASELINE = 5  # seconds
_TRAIN_INTRVL = 10  # seconds
_TRAIN_STEP = 0.015  # seconds
_NBITS = 43  # including the parity bit

logger = logging.getLogger(__name__)


[docs] def compute_raw_timestamps( raw: mne.io.Raw, timing_channel: str ) -> npt.NDArray[np.float64]: """Get the timestamps from raw data having Helsinki VideoMEG timing channel. Parameters ---------- raw : mne.io.Raw The raw data object. timing_channel : str Channel name string for the timing channel. Returns ------- NDArray[np.float64] Array of timestamps corresponding to each sample in the raw data. """ timing_data = raw.get_data(picks=timing_channel, return_times=False) assert isinstance(timing_data, np.ndarray), ( "With return_times=False, timing data should be an ndarray" ) timing_data = timing_data.squeeze() # remove the channel dimension return _comp_tstamps(timing_data, raw.info["sfreq"])
def _read_timestamp(dtrigs, cur, step, nbits): """Read and decode one timestamp. Return the timestamp on success or -1 otherwise. """ ts = 0 parity = False if cur + nbits >= len(dtrigs): logger.warning("end of input reached before all the bits read") return -1 # Read the bits for i in range(nbits): # check the interval between the two triggers if (dtrigs[cur + i + 1] < step * 1.5) or (dtrigs[cur + i + 1] > step * 4.5): logger.warning("invalid interval between two triggers") return -1 # check whether the next bit is 0 or 1 if dtrigs[cur + i + 1] > step * 3: parity = not parity if i < nbits - 1: # don't read the parity bit into the timestamp ts = ts + 2**i if parity: logger.warning("parity check failed") return -1 else: return ts def _comp_tstamps_1bit(inp, sfreq) -> npt.NDArray[np.float64]: """Extract timestamps from a "normal" (not composite) trigger channel. Parameters ---------- inp - vector of samples for the trigger channel sfreq - sampling frequency Return the vector of the same length as inp, containing timestamps for each entry of inp. For detecting timestamps use parameters in the beginning of the file. Assume that the input values are either 0 or 1. TODO: this function does not handle the boundary case for the first train of pulses correctly. This is because there is no trigger before the train and there will be no dtrigs value before the first trigger of the train. Thus the first pulse train will always be ignored. It would be neat to fix this. """ THRESH = 0.5 # input should be a 1-d vector assert inp.ndim == 1 # find all triggers (threshold crossings) trigs = np.where((inp[:-1] < THRESH) & (inp[1:] > THRESH))[0] + 1 # iterate over all timestamp candidates samps = [] tss = [] dtrigs = np.diff(trigs) for i in np.where(dtrigs > _BASELINE * sfreq)[0]: ts = _read_timestamp(dtrigs, i, _TRAIN_STEP * sfreq, _NBITS) if ts != -1: samps.append(trigs[i + 1]) tss.append(ts) # do some sanity checking if len(tss) < 2: raise Exception("Less than 2 timestamps found") if len(tss) * _TRAIN_INTRVL * sfreq < len(inp) * 0.1: raise Exception("Too few timestamps detected") # fit timestamps to samples with linear regression p = np.polyfit(samps, tss, 1) data_tstamps = np.polyval(p, np.arange(len(inp))) errs = np.abs(np.polyval(p, samps) - tss) if data_tstamps.dtype != np.float64: logger.warning( f"Data type of raw data timestamps is {data_tstamps.dtype}, " "converting to float64" ) data_tstamps = data_tstamps.astype(np.float64) logger.info( f"Raw timestamp computation: regression fit errors (abs): mean {errs.mean():f}," f" median {np.median(errs):f}, max {errs.max():f}" ) return data_tstamps def _comp_tstamps(inp, sfreq) -> npt.NDArray[np.float64]: """Extract timestamps from a trigger channel. Parameters ---------- inp - vector of samples for the trigger channel sfreq - sampling frequency Check individual bits of the inp to see whether any of them contains timing information. Return the vector of the same length as inp, containing timestamps for each entry of inp. """ if inp.min() < 0: raise ValueError("Negative values in composite? channel") # Try different bits until succeeding or running out of the bits while inp.max() > 0: try: return _comp_tstamps_1bit(inp % 2, sfreq) except Exception: inp = inp // 2 raise Exception("No timing information found")