Source code for accvlab.on_demand_video_decoder._internal.shared_gop_store

# Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
SharedGopStore: Cross-process shared GOP store backed by POSIX SharedMemory.

Workers :meth:`SharedGopStore.put` GOP packet data and pass lightweight
:class:`GopRef` references through the DataLoader IPC queue.  The main
process calls :meth:`SharedGopStore.get_batch` to read references as
zero-copy numpy views.

Synchronization uses file-based locking (``flock``) which works across
``spawn``'d DataLoader workers (unlike ``multiprocessing.Lock``).

Usage::

    # Main process -- create before spawning DataLoader workers
    store = SharedGopStore.create(capacity=120, store_id=0)

    # Worker -- attach to existing store
    store = SharedGopStore.attach(capacity=120, store_id=0)
    ref = store.lookup(video_path, frame_id)
    if ref is None:
        data = load_from_disk(...)
        ref = store.put(video_path, first_frame_id, gop_len, data)
    # Pass ``ref`` (lightweight) through DataLoader IPC queue

    # Main process -- get batch of GOP data, then clean orphans
    arrays = store.get_batch(refs)

    # Main process -- shutdown
    store.cleanup()
"""

import fcntl
import glob as glob_mod
import hashlib
import os
from multiprocessing import shared_memory
from typing import List, Optional

import numpy as np

from .types import GopRef

# ---------------------------------------------------------------------------
# Metadata table layout
# ---------------------------------------------------------------------------

# One record of the shared metadata table.  The dtype is built without
# ``align=True``, so the fields are packed back to back and the item size is
# the plain sum of the field widths: 8 + 4 + 4 + 4 + 1 + 3 + 8 + 64 = 96.
# 96 bytes per entry -- fits in L1 cache for capacity < 500 (< 48 KB total).
ENTRY_DTYPE = np.dtype(
    [
        ('video_path_hash', np.uint64),  # deterministic MD5 hash of video_path
        ('first_frame_id', np.int32),  # first frame ID of this GOP
        ('gop_len', np.int32),  # number of frames in this GOP
        ('data_size', np.int32),  # actual GOP data size in bytes (int32: at most 2**31 - 1)
        ('state', np.uint8),  # 0=FREE, 1=USED
        ('_pad', np.uint8, (3,)),  # alignment padding (puts access_tick at byte offset 24)
        ('access_tick', np.int64),  # monotonic counter for LRU ordering
        ('shm_name', 'S64'),  # SharedMemory name for the data block (at most 64 bytes)
    ]
)

# Values stored in the ``state`` field of an entry.  A zero-filled table
# therefore consists entirely of FREE entries.
_STATE_FREE = np.uint8(0)
_STATE_USED = np.uint8(1)

# SharedMemory name prefix used for GOP data blocks.
_SHM_PREFIX = "gs"

# Private key to prevent direct instantiation of SharedGopStore.
# Mirrors the pattern used by ``CachedGopDecoder`` in ``_internal/decoder.py``.
# Compared by identity (``is``), so only code holding this exact object passes.
_CREATION_KEY = object()


def _hash_video_path(video_path: str) -> np.uint64:
    """Map a video path to a 64-bit key that is identical in every process.

    The key is the first 8 bytes of the path's MD5 digest, read as a
    little-endian unsigned integer.  Python's built-in ``hash()`` is not
    usable here: it is salted per process whenever ``PYTHONHASHSEED != 0``,
    so ``spawn``'d DataLoader workers would disagree on the value.
    """
    md5_bytes = hashlib.md5(video_path.encode()).digest()
    low64 = int.from_bytes(md5_bytes[:8], byteorder='little')
    return np.uint64(low64)


class SharedGopStore:
    """Cross-process shared GOP store backed by POSIX SharedMemory.

    Stores GOP packet data in per-GOP SharedMemory blocks.  A small
    SharedMemory block holds the metadata table (index).  File-based locking
    (``flock``) provides cross-process safety under ``spawn`` mode.

    **Capacity sizing:** ``capacity`` must exceed the maximum number of GOPs
    that can be "in flight" (queued in the DataLoader + being consumed by the
    training loop)::

        min_capacity > (prefetch_factor * num_workers + 1) * batch_size * num_cameras

    A recommended formula is ``batch_size * num_cameras * 10``.

    Note:
        Do not instantiate this class directly.  Use :meth:`create` (main
        process, before spawning workers) or :meth:`attach` (worker
        processes) instead — these factories manage shared-memory creation
        and tear-down correctly.

    Args:
        capacity: Maximum number of GOPs to cache.
        store_id: Unique identifier (typically ``LOCAL_RANK``).
        _create: Internal flag -- use :meth:`create` / :meth:`attach`.

    Raises:
        RuntimeError: If called directly instead of via :meth:`create` /
            :meth:`attach`.
    """

    def __init__(self, capacity: int, store_id: int, _create: bool, *, _key=None):
        if _key is not _CREATION_KEY:
            raise RuntimeError(
                "SharedGopStore cannot be instantiated directly. "
                "Use SharedGopStore.create() (main process) or "
                "SharedGopStore.attach() (workers) instead."
            )

        self.capacity = capacity
        self.store_id = store_id
        self._is_creator = _create

        # --- Metadata SharedMemory ---
        meta_name = f"gs_meta_{store_id}"
        meta_size = capacity * ENTRY_DTYPE.itemsize
        if _create:
            _cleanup_stale_shm(meta_name)
            self._meta_shm = shared_memory.SharedMemory(name=meta_name, create=True, size=meta_size)
            # Zero-initialize all entries (state=FREE)
            np.frombuffer(self._meta_shm.buf, dtype=np.uint8)[:] = 0
        else:
            try:
                self._meta_shm = shared_memory.SharedMemory(name=meta_name, create=False)
            except FileNotFoundError as err:
                raise FileNotFoundError(
                    f"SharedGopStore with store_id={store_id} not found. \n"
                    f"Call SharedGopStore.create() in the main process first."
                ) from err
        # Structured view over the shared table; writes are visible to every
        # process attached to the same block.
        self._entries = np.ndarray(capacity, dtype=ENTRY_DTYPE, buffer=self._meta_shm.buf)

        # --- File-based lock (works across spawn'd processes) ---
        self._lock_path = f"/dev/shm/gs_lock_{store_id}"
        if _create:
            # Create (or truncate) the lock file; its content is never used.
            with open(self._lock_path, 'w'):
                pass
        self._lock_fd = os.open(self._lock_path, os.O_RDWR)

        # --- Monotonic tick counter in a tiny SharedMemory ---
        tick_name = f"gs_tick_{store_id}"
        if _create:
            _cleanup_stale_shm(tick_name)
            self._tick_shm = shared_memory.SharedMemory(name=tick_name, create=True, size=8)
            np.frombuffer(self._tick_shm.buf, dtype=np.int64)[:] = 0
        else:
            self._tick_shm = shared_memory.SharedMemory(name=tick_name, create=False)
        self._tick_arr = np.ndarray(1, dtype=np.int64, buffer=self._tick_shm.buf)

        # --- Per-process handle cache (not shared) ---
        self._local_shm_handles = {}

        # --- Stats (per-process, not shared) ---
        self._hits = 0
        self._misses = 0
        self._puts = 0
        self._evictions = 0

    # ------------------------------------------------------------------ #
    #  Factory class methods
    # ------------------------------------------------------------------ #

    @classmethod
    def create(cls, capacity: int, store_id: int = 0) -> 'SharedGopStore':
        """Allocate a new store.  Call from main process before spawning workers.

        Args:
            capacity: Max number of GOPs to cache.
            store_id: Unique identifier (typically ``LOCAL_RANK``).
        """
        return cls(capacity=capacity, store_id=store_id, _create=True, _key=_CREATION_KEY)

    @classmethod
    def attach(cls, capacity: int, store_id: int = 0) -> 'SharedGopStore':
        """Attach to an existing store.  Call from worker processes.

        Args:
            capacity: Number of table entries to map.  NOTE(review): this is
                presumably expected to equal the value passed to
                :meth:`create`; nothing here verifies it.
            store_id: Identifier of the store to attach to.

        Raises:
            FileNotFoundError: If the store has not been created yet.
        """
        return cls(capacity=capacity, store_id=store_id, _create=False, _key=_CREATION_KEY)

    # ------------------------------------------------------------------ #
    #  Locking helpers (flock)
    # ------------------------------------------------------------------ #

    def _lock(self):
        """Take the exclusive ``flock`` on the store's lock file (blocking)."""
        fcntl.flock(self._lock_fd, fcntl.LOCK_EX)

    def _unlock(self):
        """Release the ``flock`` taken by :meth:`_lock`."""
        fcntl.flock(self._lock_fd, fcntl.LOCK_UN)

    def _next_tick(self) -> int:
        """Increment and return the monotonic tick counter.

        Not truly atomic across processes without the lock, but the tick
        is only used for LRU ordering -- a rare duplicate is harmless.
        """
        self._tick_arr[0] += 1
        return int(self._tick_arr[0])

    def _release_lock_file(self, unlink: bool) -> None:
        """Close the lock-file descriptor and optionally remove the file.

        Safe to call repeatedly: the descriptor is forgotten (set to ``-1``)
        once closed, so a later call can neither fail on it nor close an
        unrelated descriptor that happens to reuse the same number.  The
        unlink is attempted independently of whether the close succeeded.

        Args:
            unlink: Also delete the lock file from the file system.
        """
        fd, self._lock_fd = self._lock_fd, -1
        if fd >= 0:
            try:
                os.close(fd)
            except OSError:
                pass
        if unlink:
            try:
                os.unlink(self._lock_path)
            except OSError:
                pass

    # ------------------------------------------------------------------ #
    #  Worker API
    # ------------------------------------------------------------------ #

    def _find_ref(self, video_path: str, frame_id: int) -> Optional["GopRef"]:
        """Scan the table for a USED entry covering *frame_id*.

        Refreshes the entry's LRU tick on a match but leaves the hit/miss
        counters alone, so it can be shared by :meth:`lookup` and the
        double-check inside :meth:`put`.

        Returns:
            A :class:`GopRef` for the first matching entry, else ``None``.
        """
        vp_hash = _hash_video_path(video_path)
        for i in range(self.capacity):
            e = self._entries[i]  # structured scalar: a view into the shared table
            if (
                e['state'] == _STATE_USED
                and e['video_path_hash'] == vp_hash
                and e['first_frame_id'] <= frame_id < e['first_frame_id'] + e['gop_len']
            ):
                e['access_tick'] = self._next_tick()
                return GopRef(
                    shm_name=e['shm_name'].decode(),
                    data_size=int(e['data_size']),
                    first_frame_id=int(e['first_frame_id']),
                    gop_len=int(e['gop_len']),
                )
        return None

    def lookup(self, video_path: str, frame_id: int) -> Optional["GopRef"]:
        """Lock-free lookup for a cached GOP containing *frame_id*.

        The table is scanned without taking ``flock``; the intended worst
        case is a stale miss (one extra disk read).

        NOTE(review): an entry that is being rewritten by a concurrent
        :meth:`put` could in principle be observed half-updated -- confirm
        whether that window matters for callers.

        Args:
            video_path: Path of the video the frame belongs to.
            frame_id: Frame index to look for.

        Returns:
            A :class:`GopRef` on hit, ``None`` on miss.  Updates the
            per-process hit/miss counters.
        """
        ref = self._find_ref(video_path, frame_id)
        if ref is None:
            self._misses += 1
        else:
            self._hits += 1
        return ref

    def put(self, video_path: str, first_frame_id: int, gop_len: int, data: np.ndarray) -> "GopRef":
        """Store GOP packet data and return a :class:`GopRef`.

        Holds ``flock`` during eviction + insertion to guarantee atomicity.
        Performs a double-check after acquiring the lock (another worker may
        have inserted while we waited).  The double-check does not touch the
        hit/miss counters, so a miss followed by ``put`` counts once.

        Args:
            video_path: Path of the video the GOP belongs to.
            first_frame_id: Frame index of the first frame in the GOP.
            gop_len: Number of frames in the GOP.
            data: Packet data; its raw bytes (``data.tobytes()``) are copied
                into a new SharedMemory block.

        Returns:
            Reference to the stored (or already present) GOP.
        """
        self._lock()
        try:
            # Double-check: another worker may have inserted while we waited
            existing = self._find_ref(video_path, first_frame_id)
            if existing is not None:
                return existing

            # Content-addressed naming -- same GOP always gets the same name.
            vp_hash = _hash_video_path(video_path)
            shm_name = f"{_SHM_PREFIX}_{self.store_id}_{vp_hash}_{first_frame_id}"
            data_size = int(data.nbytes)

            # Clean up if same GOP was previously cached then evicted
            _cleanup_stale_shm(shm_name)

            # SharedMemory rejects size 0, so an empty GOP still gets a
            # one-byte block; ``data_size`` records the real length.
            shm = shared_memory.SharedMemory(name=shm_name, create=True, size=max(data_size, 1))
            try:
                shm.buf[:data_size] = data.tobytes()
            finally:
                shm.close()  # close handle; shm persists until unlink

            # Pick the slot only after the block exists, so a failed
            # allocation does not evict a cached GOP for nothing.
            slot_idx = self._find_free_or_evict()

            # Write metadata entry
            entry = self._entries[slot_idx]
            entry['video_path_hash'] = vp_hash
            entry['first_frame_id'] = first_frame_id
            entry['gop_len'] = gop_len
            entry['data_size'] = data_size
            entry['access_tick'] = self._next_tick()
            entry['shm_name'] = shm_name.encode()
            # Set state last (acts as publish barrier for lock-free readers)
            entry['state'] = _STATE_USED

            self._puts += 1
            return GopRef(
                shm_name=shm_name,
                data_size=data_size,
                first_frame_id=first_frame_id,
                gop_len=gop_len,
            )
        finally:
            self._unlock()

    # ------------------------------------------------------------------ #
    #  Main Process API
    # ------------------------------------------------------------------ #

    def read(self, ref: "GopRef") -> np.ndarray:
        """Zero-copy uint8 numpy view of GOP data in shared memory.

        Caches ``SharedMemory`` handles per-process to avoid repeated
        ``shm_open()`` system calls.

        Args:
            ref: Reference produced by :meth:`lookup` or :meth:`put`.

        Returns:
            A ``uint8`` array of length ``ref.data_size``.  If the block no
            longer exists, a ``RuntimeWarning`` is issued and a freshly
            allocated zero array of the same length is returned instead.
        """
        shm_name = ref.shm_name
        if shm_name not in self._local_shm_handles:
            try:
                shm = shared_memory.SharedMemory(name=shm_name, create=False)
            except FileNotFoundError:
                import warnings

                warnings.warn(
                    f"SharedGopStore: shm block '{shm_name}' not found. "
                    f"This means a GOP was evicted before the main process "
                    f"could read it. Increase store capacity "
                    f"(current: {self.capacity}) — it must exceed "
                    f"(prefetch_factor * num_workers + 1) * batch_size * num_cameras. "
                    f"Returning zeros as fallback.",
                    RuntimeWarning,
                    stacklevel=2,
                )
                return np.zeros(ref.data_size, dtype=np.uint8)
            self._local_shm_handles[shm_name] = shm
        else:
            shm = self._local_shm_handles[shm_name]
        return np.frombuffer(shm.buf[: ref.data_size], dtype=np.uint8)

    def get_batch(self, refs: List["GopRef"]) -> List[np.ndarray]:
        """Read a batch of GOPs from shared memory (zero-copy).

        Call once per training iteration from the main process.
        Holds ``flock`` during the entire operation so that no worker
        can evict a block while handles are being opened.  After opening,
        orphaned shm blocks (evicted but not yet unlinked) are cleaned up.

        Args:
            refs: Flat list of :class:`GopRef` from DataLoader workers.

        Returns:
            List of zero-copy uint8 numpy views, same order as *refs*.
        """
        self._lock()
        try:
            result = [self.read(ref) for ref in refs]
            self._unlink_orphans()
            return result
        finally:
            self._unlock()

    # ------------------------------------------------------------------ #
    #  Stats
    # ------------------------------------------------------------------ #

    def get_stats(self) -> dict:
        """Per-process cache statistics.

        Returns:
            Dict with ``hits``, ``misses``, ``hit_rate`` (0.0 when nothing
            was looked up yet), ``puts``, ``evictions`` and ``pool_usage``
            (``"<used slots>/<capacity>"``, read from the shared table).
        """
        total = self._hits + self._misses
        hit_rate = self._hits / total if total > 0 else 0.0
        used_slots = int(np.count_nonzero(self._entries['state'] == _STATE_USED))
        return {
            'hits': self._hits,
            'misses': self._misses,
            'hit_rate': hit_rate,
            'puts': self._puts,
            'evictions': self._evictions,
            'pool_usage': f"{used_slots}/{self.capacity}",
        }

    def reset_stats(self) -> None:
        """Reset per-process statistics counters."""
        self._hits = 0
        self._misses = 0
        self._puts = 0
        self._evictions = 0

    # ------------------------------------------------------------------ #
    #  Internal helpers
    # ------------------------------------------------------------------ #

    def _find_free_or_evict(self) -> int:
        """Find a FREE slot, or evict the LRU entry.  Must hold flock.

        Returns:
            Index of the lowest-numbered FREE slot if one exists; otherwise
            the index of the entry with the smallest ``access_tick`` (first
            one on ties), which is marked FREE before returning.
        """
        free_slots = np.flatnonzero(self._entries['state'] == _STATE_FREE)
        if free_slots.size:
            return int(free_slots[0])

        # Evict the LRU slot.  Do NOT unlink the shm block here -- workers
        # don't know which blocks the main process still needs.  The main
        # process cleans up orphans in get_batch() -> _unlink_orphans().
        victim = int(np.argmin(self._entries['access_tick']))
        self._entries[victim]['state'] = _STATE_FREE
        self._evictions += 1
        return victim

    def _unlink_orphans(self):
        """Unlink shm blocks that were evicted but not yet cleaned up.

        Called under flock by :meth:`get_batch`.  Scans ``/dev/shm`` for
        blocks belonging to this store that are no longer in the metadata
        table.
        """
        # Active shm names from metadata table
        used = self._entries['state'] == _STATE_USED
        active_names = {raw.decode() for raw in self._entries['shm_name'][used]}
        active_names.discard('')

        # Scan /dev/shm and unlink orphans
        prefix = f"{_SHM_PREFIX}_{self.store_id}_"
        for path in glob_mod.glob(f"/dev/shm/{prefix}*"):
            name = os.path.basename(path)
            if name in active_names:
                continue
            # Close our cached handle if any
            cached = self._local_shm_handles.pop(name, None)
            if cached is not None:
                _force_close_shm(cached)
            # Unlink the orphan (no-op if it vanished in the meantime)
            _cleanup_stale_shm(name)

    # ------------------------------------------------------------------ #
    #  Lifecycle
    # ------------------------------------------------------------------ #

    def cleanup(self) -> None:
        """Unlink **all** SharedMemory blocks and the lock file.

        Call from the main process on shutdown.  The metadata block, the
        tick block and the lock file are only removed by the process that
        created the store; the lock-file descriptor is always closed.  Safe
        to call after :meth:`close` or more than once.
        """
        # Close local handles (may have live numpy views -> _force_close_shm)
        for shm in self._local_shm_handles.values():
            _force_close_shm(shm)
        self._local_shm_handles.clear()

        # Glob-clean ALL shm blocks for this store (catches orphans)
        prefix = f"{_SHM_PREFIX}_{self.store_id}_"
        for path in glob_mod.glob(f"/dev/shm/{prefix}*"):
            _cleanup_stale_shm(os.path.basename(path))

        # Unlink meta and tick SharedMemory
        _force_close_shm(self._meta_shm)
        _force_close_shm(self._tick_shm)
        if self._is_creator:
            for owned in (self._meta_shm, self._tick_shm):
                try:
                    owned.unlink()
                except FileNotFoundError:
                    pass

        # Close and unlink are attempted independently, so an already closed
        # descriptor can no longer leave the lock file behind.
        self._release_lock_file(unlink=self._is_creator)

    def close(self) -> None:
        """Close SharedMemory handles **without** unlinking.

        Call from worker processes before exit.
        """
        for shm in self._local_shm_handles.values():
            _force_close_shm(shm)
        self._local_shm_handles.clear()
        _force_close_shm(self._meta_shm)
        _force_close_shm(self._tick_shm)
        self._release_lock_file(unlink=False)
# ---------------------------------------------------------------------------
# Module-private helpers
# ---------------------------------------------------------------------------


def _force_close_shm(shm) -> None:
    """Close a SharedMemory handle, suppressing BufferError from live numpy views.

    When ``np.frombuffer(shm.buf)`` creates a zero-copy view, the numpy
    array holds a reference to the underlying mmap.  ``shm.close()`` then
    raises ``BufferError: cannot close exported pointers exist``.

    We catch this, then clear the internal ``_mmap`` and ``_buf``
    attributes so that ``SharedMemory.__del__`` (called later by GC)
    does not re-raise the same error, and call ``close()`` once more.  With
    the buffer detached, that second call only releases what is left on the
    handle (on POSIX, its file descriptor) instead of leaving it open until
    the handle is garbage-collected.  The actual backing memory is freed by
    the OS when the last numpy view is garbage-collected.

    Args:
        shm: The ``SharedMemory`` handle to close.  Any other error raised
            while closing is deliberately swallowed (best-effort teardown).
    """
    try:
        shm.close()
        return
    except BufferError:
        # Prevent __del__ from retrying and printing the same traceback.
        shm._buf = None
        shm._mmap = None
    except Exception:
        return

    # Second pass: buffer and mmap are detached, so this cannot hit the
    # BufferError again and just finishes releasing the handle.
    try:
        shm.close()
    except Exception:
        pass


def _cleanup_stale_shm(name: str) -> None:
    """Remove a SharedMemory block left over from a previous run.

    Attaches to the block called *name*, closes the handle and unlinks the
    block.  Does nothing if no such block exists.

    Args:
        name: SharedMemory name of the block to remove.
    """
    try:
        stale = shared_memory.SharedMemory(name=name, create=False)
    except FileNotFoundError:
        return
    stale.close()
    try:
        stale.unlink()
    except FileNotFoundError:
        # Removed by someone else between attach and unlink.
        pass