Source code for mojito.lazy

"""
Lazy datasets
=============

This module provides lightweight wrappers that expose array-like, lazy access to
datasets without loading full arrays into memory.

The interface is defined by :class:`DatasetLike` and mimics that of h5py
datasets, allowing users to slice and access data on demand. Specific
implementations load the necessary data and apply transformations, such as
stacking, scaling, summing, etc., only for the requested slices.

.. code-block:: python

    xyz = LazyStackedDataset([ds1, ds2, ds3])

    # Loaded and computed only for this slice
    xyz_slice = xyz[100:200]

    # Writing is supported for LazyScaledDataset and LazyStackedDataset
    xyz[100:200] = new_data
    scaled = LazyScaledDataset(ds, scale_factor=2.0)
    scaled[50:100] = values  # Applies inverse scaling before writing

    combined_signal = LazySumDataset([ds1, ds2])
    combined_flags = LazyBooleanOrDataset([flags1, flags2])

    # Loaded and computed only for this slice (read-only)
    signal_slice = combined_signal[0:1024]
    flags_slice = combined_flags[0:1024]

Some operations also support writing. For example, :class:`LazyScaledDataset`
applies the inverse scaling operation before writing to the underlying dataset.

.. code-block:: python

    xyz = LazyScaledDataset(ds, scale_factor=2.0)
    xyz[100:200] = new_values  # Writes new_values / 2.0 to ds[100:200]

Dataset interface
-----------------

.. autoclass:: DatasetLike
    :members:
    :special-members: __getitem__, __setitem__
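
Any object exposing this interface can be passed to the lazy wrappers, and since
:class:`DatasetLike` is a runtime-checkable protocol, conformance can be tested
with ``isinstance``. The ``ArrayDataset`` class below is only an illustrative
sketch (it is not part of this module) wrapping an in-memory NumPy array.

.. code-block:: python

    import numpy as np

    class ArrayDataset:
        # Hypothetical in-memory dataset satisfying the DatasetLike protocol
        def __init__(self, array: np.ndarray) -> None:
            self._array = array

        @property
        def shape(self) -> tuple[int, ...]:
            return self._array.shape

        @property
        def ndim(self) -> int:
            return self._array.ndim

        @property
        def dtype(self) -> np.dtype:
            return self._array.dtype

        def __getitem__(self, key):
            return self._array[key]

        def __setitem__(self, key, value) -> None:
            self._array[key] = value

    ds = ArrayDataset(np.arange(10, dtype=np.float64))
    assert isinstance(ds, DatasetLike)  # runtime-checkable protocol check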

Operations on datasets
----------------------

.. autoclass:: LazyScaledDataset
    :members:
    :special-members: __getitem__, __setitem__

.. autoclass:: LazySumDataset
    :members:
    :special-members: __getitem__

.. autoclass:: LazyBooleanOrDataset
    :members:
    :special-members: __getitem__
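
Each of these wrappers itself satisfies the :class:`DatasetLike` interface, so
operations can be composed. A minimal sketch (``ds1`` and ``ds2`` stand for any
``DatasetLike`` objects):

.. code-block:: python

    # Lazily average two datasets: sum them, then scale the result by 0.5
    averaged = LazyScaledDataset(LazySumDataset([ds1, ds2]), scale_factor=0.5)

    # Only this slice is read from ds1 and ds2, summed, and scaled
    averaged_slice = averaged[0:1024]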

Stacking datasets
-----------------

.. autoclass:: LazyStackedDataset
    :members:
    :special-members: __getitem__, __setitem__
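
When a single integer index is given along the stacked axis, reads and writes
touch only the corresponding underlying dataset. A sketch assuming three
one-dimensional datasets and the default ``axis=-1`` (``ds_x``, ``ds_y``,
``ds_z`` and ``new_x`` are hypothetical placeholders):

.. code-block:: python

    xyz = LazyStackedDataset([ds_x, ds_y, ds_z], axis=-1)

    # Reads only from ds_y
    y = xyz[:, 1]

    # Writes only to ds_x (equivalent to ds_x[:] = new_x)
    xyz[:, 0] = new_x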

"""

from typing import Any, Protocol, Sequence, runtime_checkable

import numpy as np


@runtime_checkable
class DatasetLike(Protocol):
    """Protocol for (potentially lazy) datasets with NumPy-style access.

    Implementations must expose shape, dimensionality, dtype and support
    slicing through ``__getitem__`` similarly to ``h5py.Dataset``. Writing via
    ``__setitem__`` is optional and may not be supported by all
    implementations.
    """

    @property
    def shape(self) -> tuple[int, ...]:
        """Shape of the dataset."""
        ...

    @property
    def ndim(self) -> int:
        """Number of dimensions."""
        ...

    @property
    def dtype(self) -> np.dtype[Any]:
        """Data type of the dataset."""
        ...

    def __getitem__(self, *args: Any, **kwargs: Any) -> Any:
        """Return a sliced array."""
        ...

    def __setitem__(self, *args: Any, **kwargs: Any) -> None:
        """Write to the dataset.

        Raises
        ------
        NotImplementedError
            If writing is not supported by the implementation.
        """
        ...


class LazyScaledDataset:
    """Lazy-loaded dataset that applies scaling on slicing.

    This is useful to provide access to scaled datasets without loading all
    data into memory at once, similar to h5py datasets.

    Parameters
    ----------
    dataset
        HDF5 dataset to scale.
    scale_factor
        Scaling factor to apply to all values.
    """

    def __init__(self, dataset: DatasetLike, scale_factor: float = 1.0) -> None:
        self.dataset = dataset
        self.scale_factor = scale_factor

    @property
    def shape(self) -> tuple[int, ...]:
        """Shape of the dataset."""
        return self.dataset.shape

    @property
    def ndim(self) -> int:
        """Number of dimensions."""
        return self.dataset.ndim

    @property
    def dtype(self) -> np.dtype:
        """Data type of the dataset."""
        return self.dataset.dtype

    def __getitem__(self, key) -> Any:
        """Slice and scale the dataset lazily.

        Parameters
        ----------
        key
            Slicing key (integers, slices, lists, ellipsis).

        Returns
        -------
        NDArray
            Scaled sliced array from the dataset.
        """
        return self.dataset[key] * self.scale_factor

    def __setitem__(self, key: Any, value: Any) -> None:
        """Write scaled values to the dataset lazily.

        The inverse scaling operation is applied before writing to the
        underlying dataset.

        Parameters
        ----------
        key
            Slicing key (integers, slices, lists, ellipsis).
        value
            Data to write. Will be divided by ``scale_factor`` before writing.

        Raises
        ------
        ValueError
            If scale_factor is zero (inverse scaling undefined).
        """
        if self.scale_factor == 0:
            raise ValueError("Cannot write to dataset with scale_factor=0")
        self.dataset[key] = value / self.scale_factor


class LazySumDataset:
    """Lazy-loaded dataset that sums values across multiple datasets on slicing.

    This is useful for lazily combining multiple datasets via element-wise
    summation without loading all data into memory at once.

    Parameters
    ----------
    datasets
        Sequence of HDF5 datasets to sum.

    Raises
    ------
    ValueError
        If datasets list is empty.
    """

    def __init__(self, datasets: Sequence[DatasetLike]) -> None:
        self.datasets = list(datasets)
        if not self.datasets:
            raise ValueError("At least one dataset is required")

    @property
    def shape(self) -> tuple[int, ...]:
        """Shape of the dataset."""
        return self.datasets[0].shape

    @property
    def ndim(self) -> int:
        """Number of dimensions."""
        return len(self.shape)

    @property
    def dtype(self) -> np.dtype:
        """Data type of the dataset."""
        return self.datasets[0].dtype

    def __getitem__(self, key) -> Any:
        """Slice and sum all datasets lazily.

        Parameters
        ----------
        key
            Slicing key (integers, slices, lists, ellipsis).

        Returns
        -------
        NDArray
            Sum of sliced arrays from all datasets.
        """
        # Sum slices from all datasets
        slices = [ds[key] for ds in self.datasets]
        return np.sum(slices, axis=0)

    def __setitem__(self, *args: Any, **kwargs: Any) -> None:
        """Writing is not supported for summed datasets.

        Raises
        ------
        NotImplementedError
            Always, as there is no unambiguous way to distribute a write
            across multiple source datasets.
        """
        raise NotImplementedError


class LazyBooleanOrDataset:
    """Lazy-loaded dataset that applies Boolean OR across multiple datasets.

    This is useful for lazily combining flag datasets via element-wise Boolean
    OR without loading all data into memory at once.

    Parameters
    ----------
    datasets
        Sequence of HDF5 datasets to OR.

    Raises
    ------
    ValueError
        If datasets list is empty.
    """

    def __init__(self, datasets: Sequence[DatasetLike]) -> None:
        self.datasets = list(datasets)
        if not self.datasets:
            raise ValueError("At least one dataset is required")

    @property
    def shape(self) -> tuple[int, ...]:
        """Shape of the dataset."""
        return self.datasets[0].shape

    @property
    def ndim(self) -> int:
        """Number of dimensions."""
        return len(self.shape)

    @property
    def dtype(self) -> np.dtype:
        """Data type of the dataset."""
        return self.datasets[0].dtype

    def __getitem__(self, key) -> Any:
        """Slice and OR all datasets lazily.

        Parameters
        ----------
        key
            Slicing key (integers, slices, lists, ellipsis).

        Returns
        -------
        NDArray
            Boolean OR of sliced arrays from all datasets.
        """
        # OR slices from all datasets
        slices = [ds[key] for ds in self.datasets]
        return np.logical_or.reduce(slices)

    def __setitem__(self, *args: Any, **kwargs: Any) -> None:
        """Writing is not supported for Boolean OR datasets.

        Raises
        ------
        NotImplementedError
            Always, as the inverse operation of Boolean OR is undefined.
        """
        raise NotImplementedError


class LazyStackedDataset:
    """Lazy-loaded stacked dataset that does not load all data into memory.

    This is useful to provide access to stacked and normalized datasets
    without loading all data into memory at once, similar to h5py datasets.

    >>> stacked = LazyStackedDataset([ds1, ds2, ds3])
    >>> print(stacked.shape)
    (1000, 3)
    >>> data_slice = stacked[100:200]  # Loads only the requested slice
    >>> data_slice2 = stacked[100:200, ..., 0]  # Slicing along stacked axis

    Parameters
    ----------
    datasets
        Sequence of HDF5 datasets to stack.
    axis
        Axis along which to stack the datasets.

    Raises
    ------
    ValueError
        If ``axis`` is out of bounds for stacking.
    """

    def __init__(
        self,
        datasets: Sequence[DatasetLike],
        axis: int = -1,
    ) -> None:
        self.datasets = list(datasets)
        self._validate_datasets()
        self.axis = axis

        # Compute the shape of the stacked array
        shape = list(self.datasets[0].shape)
        if self.axis < -len(shape) - 1 or self.axis > len(shape):
            raise ValueError("Axis out of bounds for stacking")
        if self.axis < 0:
            shape.insert(len(shape) + self.axis + 1, len(datasets))
        else:
            shape.insert(self.axis, len(datasets))
        self._shape = tuple(shape)

    def _validate_datasets(self) -> None:
        """Make sure all datasets have the same shape and dtype.

        Raises
        ------
        ValueError
            If ``self.datasets`` is empty.
        ValueError
            If all elements in ``self.datasets`` do not have the same shape.
        ValueError
            If all elements in ``self.datasets`` do not have the same dtype.
        """
        # Make sure there is at least one dataset
        if not self.datasets:
            raise ValueError("At least one dataset is required")

        # Make sure all datasets have the same shape and dtype
        first_shape = self.datasets[0].shape
        first_dtype = self.datasets[0].dtype
        for ds in self.datasets:
            if ds.shape != first_shape:
                raise ValueError("All datasets must have the same shape")
            if ds.dtype != first_dtype:
                raise ValueError("All datasets must have the same dtype")

    @property
    def normalized_axis(self) -> int:
        """Normalize axis to a positive value."""
        axis = self.axis
        if axis < 0:
            axis = self.ndim + axis
        return axis

    @property
    def shape(self) -> tuple[int, ...]:
        """Shape of the stacked array."""
        return self._shape

    @property
    def ndim(self) -> int:
        """Number of dimensions of the stacked array."""
        return len(self._shape)

    @property
    def dtype(self) -> np.dtype:
        """Data type of the stacked array."""
        return self.datasets[0].dtype

    def __getitem__(self, key) -> Any:
        """Slice the stacked datasets on demand without loading all data.

        Supports slicing along all dimensions including the stacked axis. The
        stacked axis can be sliced using integers, slices, or lists of
        indices.

        Note that fancy indexing (i.e., using arrays of indices for multiple
        axes) for non-stacked axes is not supported due to a limitation in
        h5py. However, it is supported for a single non-stacked axis at a
        time. Insertion of None or np.newaxis is not supported.

        Parameters
        ----------
        key
            Slicing key, can include integers (Python int or Numpy integer
            scalars), slices, lists of indices, and ellipsis.

        Returns
        -------
        Any
            Sliced and stacked array.

        Raises
        ------
        IndexError
            If None or np.newaxis is used in the key.
        IndexError
            If too many indices are provided for the array.
        """
        # Convert key to a tuple
        if not isinstance(key, tuple):
            key = (key,)

        # Make sure None or np.newaxis are not used
        if any(k is None for k in key):
            raise IndexError("None or np.newaxis indexing is not supported")

        # Convert numpy scalar indices to Python scalars
        key = tuple(int(k) if isinstance(k, np.integer) else k for k in key)

        # Expand ellipsis in key
        if Ellipsis in key:
            ellipsis_index = key.index(Ellipsis)
            num_missing = self.ndim - (len(key) - 1)
            key = (
                key[:ellipsis_index]
                + (slice(None),) * num_missing
                + key[ellipsis_index + 1 :]
            )

        # Make sure key has the right number of dimensions
        if len(key) < self.ndim:
            key = key + (slice(None),) * (self.ndim - len(key))
        elif len(key) > self.ndim:
            raise IndexError("Too many indices for array")

        # Select datasets along the stacked axis
        stacked_axis = self.normalized_axis
        dataset_slice = key[stacked_axis]
        if isinstance(dataset_slice, int):
            dataset_indices = [dataset_slice]
        elif isinstance(dataset_slice, slice):
            dataset_slice_idx = dataset_slice.indices(len(self.datasets))
            dataset_indices = list(range(*dataset_slice_idx))
        else:
            dataset_indices = list(dataset_slice)

        # Prepare the slices for each dataset
        remaining_slices_list = list(key)
        del remaining_slices_list[stacked_axis]
        remaining_slices = tuple(remaining_slices_list)
        slices = [self.datasets[i][remaining_slices] for i in dataset_indices]

        # If only one dataset is selected, collapse the stacked axis
        if isinstance(dataset_slice, int):
            return slices[0]

        # Adjust stacked_axis to account for collapsed dimensions from integer
        # slicing: count how many integer indices appear before the stacked axis
        num_collapsed_before = sum(
            1 for i, idx in enumerate(key) if i < stacked_axis and isinstance(idx, int)
        )
        adjusted_stacked_axis = stacked_axis - num_collapsed_before

        # Stack the selected slices along the adjusted stacked axis
        stacked_array = np.stack(slices, axis=adjusted_stacked_axis)
        return stacked_array

    def __setitem__(self, key: Any, value: Any) -> None:
        """Write to stacked datasets on demand.

        The value is unstacked along the stacked axis and distributed to the
        appropriate underlying datasets.

        Parameters
        ----------
        key
            Slicing key, can include integers, slices, or lists of indices.
        value
            Data to write. Must be compatible with the sliced shape.

        Raises
        ------
        IndexError
            If None or np.newaxis is used in the key, or if too many indices
            are provided.
        ValueError
            If value shape is incompatible with the requested slice.
        """
        # Convert key to a tuple
        if not isinstance(key, tuple):
            key = (key,)

        # Make sure None or np.newaxis are not used
        if any(k is None for k in key):
            raise IndexError("None or np.newaxis indexing is not supported")

        # Convert numpy scalar indices to Python scalars
        key = tuple(int(k) if isinstance(k, np.integer) else k for k in key)

        # Expand ellipsis in key
        if Ellipsis in key:
            ellipsis_index = key.index(Ellipsis)
            num_missing = self.ndim - (len(key) - 1)
            key = (
                key[:ellipsis_index]
                + (slice(None),) * num_missing
                + key[ellipsis_index + 1 :]
            )

        # Make sure key has the right number of dimensions
        if len(key) < self.ndim:
            key = key + (slice(None),) * (self.ndim - len(key))
        elif len(key) > self.ndim:
            raise IndexError("Too many indices for array")

        # Convert value to array
        value = np.asarray(value)

        # Select datasets along the stacked axis
        stacked_axis = self.normalized_axis
        dataset_slice = key[stacked_axis]
        if isinstance(dataset_slice, int):
            dataset_indices = [dataset_slice]
        elif isinstance(dataset_slice, slice):
            dataset_slice_idx = dataset_slice.indices(len(self.datasets))
            dataset_indices = list(range(*dataset_slice_idx))
        else:
            dataset_indices = list(dataset_slice)

        # Prepare the slices for each dataset
        remaining_slices_list = list(key)
        del remaining_slices_list[stacked_axis]
        remaining_slices = tuple(remaining_slices_list)

        # If only one dataset is selected, write directly
        if isinstance(dataset_slice, int):
            self.datasets[dataset_indices[0]][remaining_slices] = value
        else:
            # Unstack the value along the stacked axis and write to each dataset.
            # Count how many integer indices appear before the stacked axis to
            # find where the stacked axis ends up in the value array
            num_collapsed_before = sum(
                1
                for i, idx in enumerate(key)
                if i < stacked_axis and isinstance(idx, int)
            )
            value_stacked_axis = stacked_axis - num_collapsed_before

            # Split value along the stacked axis and write to each dataset
            for i, ds_idx in enumerate(dataset_indices):
                value_slice = np.take(value, i, axis=value_stacked_axis)
                self.datasets[ds_idx][remaining_slices] = value_slice