"""Contains VideoFile interface and its implementations for reading video files."""
# License: BSD-3-Clause
# Copyright (c) 2014 BioMag Laboratory, Helsinki University Central Hospital
# Copyright (c) 2025 Aalto University
import logging
import struct
from abc import ABC, abstractmethod
import cv2
import numpy as np
import numpy.typing as npt
from ._helsinki_videomeg_file_utils import UnknownVersionError, read_block_attributes
logger = logging.getLogger(__name__)
class VideoFile(ABC):
"""Container that holds a video file and provides method to read frames from it."""
@abstractmethod
def __del__(self) -> None:
"""Ensure the video file is released when the object is deleted."""
pass
@abstractmethod
def __enter__(self) -> "VideoFile":
"""Enter the runtime context with opened video file."""
pass
@abstractmethod
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Exit the runtime context and release the video file."""
pass
@abstractmethod
def get_frame_at(self, frame_idx: int) -> npt.NDArray[np.uint8] | None:
"""Read a specific frame from the video file.
Parameters
----------
frame_idx : int
Index of the frame to read.
Returns
-------
npt.NDArray[np.uint8] | None
The frame as a NumPy array of shape (height, width, 3) or None if the frame
cannot be read. The color format is RGB and the frame is in row-major order.
"""
pass
@abstractmethod
def close(self) -> None:
"""Release the video file."""
pass
@property
@abstractmethod
def frame_count(self) -> int:
"""Return the number of frames in the video file."""
pass
@property
@abstractmethod
def fps(self) -> float:
"""Return the frames per second of the video file."""
pass
@property
@abstractmethod
def frame_width(self) -> int:
"""Return the width of the video frames in pixels."""
pass
@property
@abstractmethod
def frame_height(self) -> int:
"""Return the height of the video frames in pixels."""
pass
@property
@abstractmethod
def fname(self) -> str:
"""Return the full path to the video file."""
pass
@property
def duration(self) -> float:
"""Return the possibly estimated duration of the video in seconds."""
return self.frame_count / self.fps if self.fps > 0 else 0.0
def print_stats(self) -> None:
"""Print statistics about the video file."""
print(f"Stats for video {self.fname}:")
print(f" - Frame count: {self.frame_count}")
print(f" - FPS: {self.fps:.2f}")
print(f" - Duration: {self.duration:.2f} seconds")
print(f" - Frame size: {self.frame_width}x{self.frame_height}")
[docs]
class VideoFileCV2(VideoFile):
"""Video file reader for video files supported by OpenCV.
Parameters
----------
fname : str
Full path to the video file to be read.
"""
def __init__(self, fname: str) -> None:
self._fname = fname
# Capture the video file for processing
self._cap = cv2.VideoCapture(fname)
if not self._cap.isOpened():
raise ValueError(f"Could not open video file: {fname}")
# Matches to cv2.CAP_PROP_POS_FRAMES and tells the index of the next
# frame to be read
self._next_frame_idx = 0
def __del__(self) -> None:
"""Ensure the video capture object is released when the object is deleted."""
self.close()
def __enter__(self) -> "VideoFileCV2":
"""Enter the runtime context for the video file."""
return self
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Exit the runtime context and release the video file."""
self.close()
[docs]
def close(self) -> None:
"""Release the video capture object."""
if hasattr(self, "_cap") and self._cap.isOpened():
self._cap.release()
[docs]
def get_frame_at(self, frame_idx: int):
"""Read a specific frame from the video file.
Parameters
----------
frame_idx : int
Index of the frame to read.
Returns
-------
npt.NDArray[np.uint8] | None
The frame as a NumPy array of shape (height, width, 3) or None if the frame
cannot be read. The color format is RGB and the frame is in row-major order.
"""
if not self._cap.isOpened():
raise ValueError("Trying to read from a closed video file.")
if frame_idx < 0 or frame_idx >= self.frame_count:
logger.debug(f"Frame index out of bounds: {frame_idx}, returning None.")
return None
# Only alter the next frame to be read if necessary.
# This increases performance when reading frames sequentially.
if frame_idx != self._next_frame_idx:
self._set_next_frame(frame_idx)
return self._read_next_frame()
@property
def frame_count(self) -> int:
return int(self._cap.get(cv2.CAP_PROP_FRAME_COUNT))
@property
def fps(self) -> float:
return self._cap.get(cv2.CAP_PROP_FPS)
@property
def frame_width(self) -> int:
return int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH))
@property
def frame_height(self) -> int:
return int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
@property
def fname(self) -> str:
return self._fname
def _read_next_frame(self) -> npt.NDArray[np.uint8] | None:
"""Read the next frame from the video file."""
if not self._cap.isOpened():
raise ValueError("Trying to read from a closed video file.")
ret, frame = self._cap.read()
if not ret:
# End of video?
return None
self._next_frame_idx += 1
# Convert the frame from BGR to RGB format
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return frame.astype(np.uint8, copy=False)
def _set_next_frame(self, frame_idx: int) -> None:
"""Set the next frame to be read from the video file."""
if frame_idx < 0 or frame_idx >= self.frame_count:
raise ValueError(f"Frame index out of bounds: {frame_idx}")
self._next_frame_idx = frame_idx
self._cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
# --- Code below is adapted from PyVideoMEG project ---
[docs]
class VideoFileHelsinkiVideoMEG(VideoFile):
"""Video file reader for video files in Helsinki VideoMEG project format.
Frame timestamps in milliseconds are stored in the `timestamps_ms` attribute,
individual frames can be accessed using the `get_frame_at` method.
Parameters
----------
fname : str
Full path to the video file to be read.
magic_str : str
Magic string that should be at the beginning of video file.
"""
def __init__(
self, fname: str, magic_str: str = "HELSINKI_VIDEO_MEG_PROJECT_VIDEO_FILE"
) -> None:
self._file_name = fname
self._file = open(fname, "rb")
if not self._file.read(len(magic_str)) == magic_str.encode("utf-8"):
self._file.close()
raise ValueError(
f"File {fname} does not start with the expected "
f"magic string: {magic_str}."
)
self._version = struct.unpack("I", self._file.read(4))[0]
logger.debug(f"Video file version: {self._version}")
# Parse site id and is_sender attribute from the file depending on the version.
if self._version == 0:
pass # nothing to parse
elif self._version == 1 or self._version == 2:
self.site_id = -1
self.is_sender = -1
elif self._version == 3:
self.site_id, self.is_sender = struct.unpack("BB", self._file.read(2))
else:
self._file.close()
raise UnknownVersionError(self._version)
# Get the file size.
begin_data = self._file.tell()
self._file.seek(0, 2)
end_data = self._file.tell()
self._file.seek(begin_data, 0)
# For each frame, store timestamp and pointer to the frame data on disk.
timestamps_list = []
self._frame_ptrs = [] # List of tuples (offset, size) for each frame
while self._file.tell() < end_data: # we did not reach end of file
ts, sz, total_sz = read_block_attributes(self._file, self._version)
timestamps_list.append(ts)
self._frame_ptrs.append((self._file.tell(), sz))
assert self._file.tell() + sz <= end_data
self._file.seek(sz, 1)
# Convert timestamps to numpy array
self.timestamps_ms = np.array(timestamps_list, dtype=np.float64)
self._nframes = len(timestamps_list)
# Use first frame to determine width and height
first_frame = self.get_frame_at(0)
if first_frame is None:
self._file.close()
raise ValueError("Could not read the first frame of the video.")
self._frame_width = first_frame.shape[1]
self._frame_height = first_frame.shape[0]
self._fps = self._estimate_fps(estimate_with="mean")
def __del__(self) -> None:
"""Ensure the video file is closed when the object is deleted."""
self.close()
def __enter__(self) -> "VideoFileHelsinkiVideoMEG":
"""Enter the runtime context with opened video file."""
return self
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Exit the runtime context and close the video file."""
self.close()
[docs]
def close(self) -> None:
"""Close the video file."""
if hasattr(self, "_file") and not self._file.closed:
try:
self._file.close()
except Exception as e:
logger.warning(f"Error closing video file {self._file_name}: {e}")
[docs]
def get_frame_at(self, frame_idx: int) -> npt.NDArray[np.uint8] | None:
"""Read a specific frame from the video file.
Parameters
----------
frame_idx : int
Index of the frame to read.
Returns
-------
npt.NDArray[np.uint8] | None
The frame as a NumPy array of shape (height, width, 3) or None if the frame
cannot be read. The color format is RGB and the frame is in row-major order.
"""
if self._file.closed:
raise ValueError("Trying to read from a closed video file.")
if frame_idx < 0 or frame_idx >= self._nframes:
logger.debug(f"Frame index out of bounds: {frame_idx}, returning None.")
return None
offset, sz = self._frame_ptrs[frame_idx]
self._file.seek(offset)
frame_bytes = self._file.read(sz)
frame_arr = np.frombuffer(frame_bytes, dtype=np.uint8)
frame = cv2.imdecode(frame_arr, cv2.IMREAD_COLOR)
if frame is None:
logger.error(
f"Could not decode frame at index {frame_idx}, returning None."
)
return None
# Convert the frame from BGR to RGB format
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return frame.astype(np.uint8, copy=False)
@property
def frame_count(self) -> int:
return self._nframes
@property
def fps(self) -> float:
"""Return the ESTIMATED frames per second of the video file."""
return self._fps
@property
def frame_width(self) -> int:
return self._frame_width
@property
def frame_height(self) -> int:
return self._frame_height
@property
def fname(self) -> str:
return self._file_name
@property
def duration(self) -> float:
"""Return the duration of the video in seconds.
This overrides the base class method to provide a more accurate
duration that is based on the timestamps of the frames.
"""
if self._nframes < 2:
return 0.0
# Duration is the difference between the last and first timestamp in seconds
return (self.timestamps_ms[-1] - self.timestamps_ms[0]) / 1000.0
[docs]
def print_stats(self) -> None:
"""Print statistics about the video file."""
# Override the base class method to include additional stats about timestamps.
super().print_stats()
print(
f" - Timestamp range: {self.timestamps_ms[0] / 1000:.2f} s to "
f"{self.timestamps_ms[-1] / 1000:.2f} s"
)
print(
" - Timestamp mean interval: "
f"{(np.diff(self.timestamps_ms).mean()):.2f} ms"
)
def _estimate_fps(self, estimate_with: str = "mean") -> float:
"""Estimate frames per second (FPS) based on timestamps."""
if self._nframes < 2:
return 0
ts_in_seconds = self.timestamps_ms / 1000
time_diff = np.diff(ts_in_seconds)
if estimate_with == "mean":
avg_time_diff = np.mean(time_diff)
elif estimate_with == "median":
avg_time_diff = np.median(time_diff)
else:
raise ValueError(f"Unknown estimation method: {estimate_with}")
if avg_time_diff <= 0:
raise ValueError(
f"Average time difference is non-positive: {avg_time_diff}. "
"Cannot estimate FPS."
)
return float(1 / avg_time_diff)