"""
Lazy datasets
=============

This module provides lightweight wrappers that expose array-like, lazy access to
datasets without loading full arrays into memory.

The interface is defined by :class:`DatasetLike` and mimics that of h5py
datasets, allowing users to slice and access data on demand. Specific
implementations load the necessary data and apply transformations, such as
stacking, scaling, or summing, only for the requested slices.

.. code-block:: python

    xyz = LazyStackedDataset([ds1, ds2, ds3])

    # Loaded and computed only for this slice
    xyz_slice = xyz[100:200]

    # Writing is supported for LazyScaledDataset and LazyStackedDataset
    xyz[100:200] = new_data

    scaled = LazyScaledDataset(ds, scale_factor=2.0)
    scaled[50:100] = values  # Applies inverse scaling before writing

    combined_signal = LazySumDataset([ds1, ds2])
    combined_flags = LazyBooleanOrDataset([flags1, flags2])

    # Loaded and computed only for this slice (read-only)
    signal_slice = combined_signal[0:1024]
    flags_slice = combined_flags[0:1024]

Some operations also support writing. For example, :class:`LazyScaledDataset`
applies the inverse scaling operation before writing to the underlying dataset.

.. code-block:: python

    xyz = LazyScaledDataset(ds, scale_factor=2.0)
    xyz[100:200] = new_values  # Writes new_values / 2.0 to ds[100:200]

Dataset interface
-----------------

.. autoclass:: DatasetLike
    :members:
    :special-members: __getitem__, __setitem__

Operations on datasets
----------------------

.. autoclass:: LazyScaledDataset
    :members:
    :special-members: __getitem__, __setitem__

.. autoclass:: LazySumDataset
    :members:
    :special-members: __getitem__

.. autoclass:: LazyBooleanOrDataset
    :members:
    :special-members: __getitem__

Stacking datasets
-----------------

.. autoclass:: LazyStackedDataset
    :members:
    :special-members: __getitem__, __setitem__
"""
from typing import Any, Protocol, Sequence, runtime_checkable
import numpy as np
@runtime_checkable
class DatasetLike(Protocol):
"""Protocol for (potentially lazy) datasets with NumPy-style access.
Implementations must expose shape, dimensionality, dtype and support slicing
through ``__getitem__`` similarly to ``h5py.Dataset``. Writing via
``__setitem__`` is optional and may not be supported by all implementations.
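
    A plain ``numpy.ndarray`` already satisfies this protocol, which is handy
    for in-memory testing (a quick sketch):

    >>> import numpy as np
    >>> isinstance(np.zeros((10, 3)), DatasetLike)
    True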
"""
@property
def shape(self) -> tuple[int, ...]:
"""Shape of the dataset."""
...
@property
def ndim(self) -> int:
"""Number of dimensions."""
...
@property
def dtype(self) -> np.dtype[Any]:
"""Data type of the dataset."""
...
def __getitem__(self, *args: Any, **kwargs: Any) -> Any:
"""Return a sliced array."""
...
def __setitem__(self, *args: Any, **kwargs: Any) -> None:
"""Write to the dataset.
Raises
------
NotImplementedError
If writing is not supported by the implementation.
"""
...
class LazyScaledDataset:
"""Lazy-loaded dataset that applies scaling on slicing.
This is useful to provide access to scaled datasets without loading all data
into memory at once, similar to h5py datasets.
Parameters
----------
dataset
HDF5 dataset to scale.
scale_factor
Scaling factor to apply to all values.
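
    Examples
    --------
    A minimal sketch using an in-memory array as the underlying dataset:

    >>> import numpy as np
    >>> base = np.arange(4, dtype=float)
    >>> scaled = LazyScaledDataset(base, scale_factor=2.0)
    >>> scaled[1:3]
    array([2., 4.])
    >>> scaled[1:3] = np.array([10.0, 20.0])  # stores [5.0, 10.0] in base
    >>> base[1:3]
    array([ 5., 10.])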
"""
def __init__(self, dataset: DatasetLike, scale_factor: float = 1.0) -> None:
self.dataset = dataset
self.scale_factor = scale_factor
@property
def shape(self) -> tuple[int, ...]:
"""Shape of the dataset."""
return self.dataset.shape
@property
def ndim(self) -> int:
"""Number of dimensions."""
return self.dataset.ndim
@property
    def dtype(self) -> np.dtype[Any]:
"""Data type of the dataset."""
return self.dataset.dtype
    def __getitem__(self, key: Any) -> Any:
        """Slice and scale the dataset lazily.

        Parameters
        ----------
        key
            Slicing key (integers, slices, lists, ellipsis).

        Returns
        -------
        NDArray
            Scaled sliced array from the dataset.
"""
return self.dataset[key] * self.scale_factor
def __setitem__(self, key: Any, value: Any) -> None:
"""Write scaled values to the dataset lazily.
The inverse scaling operation is applied before writing to the
underlying dataset.
Parameters
----------
key
Slicing key (integers, slices, lists, ellipsis).
value
Data to write. Will be divided by ``scale_factor`` before writing.
Raises
------
ValueError
If scale_factor is zero (inverse scaling undefined).
"""
        if self.scale_factor == 0:
            raise ValueError("Cannot write to dataset with scale_factor=0")
        # Coerce to an array so list/tuple values divide elementwise
        self.dataset[key] = np.asarray(value) / self.scale_factor
class LazySumDataset:
"""Lazy-loaded dataset that sums values across multiple datasets on slicing.
This is useful for lazily combining multiple datasets via element-wise
summation without loading all data into memory at once.
Parameters
----------
datasets
Sequence of HDF5 datasets to sum.
Raises
------
ValueError
If datasets list is empty.
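
    Examples
    --------
    A minimal sketch using in-memory arrays as the underlying datasets:

    >>> import numpy as np
    >>> combined = LazySumDataset([np.ones(3), np.full(3, 2.0)])
    >>> combined[0:2]
    array([3., 3.])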
"""
def __init__(self, datasets: Sequence[DatasetLike]) -> None:
self.datasets = list(datasets)
if not self.datasets:
raise ValueError("At least one dataset is required")
@property
def shape(self) -> tuple[int, ...]:
"""Shape of the dataset."""
return self.datasets[0].shape
@property
def ndim(self) -> int:
"""Number of dimensions."""
return len(self.shape)
@property
    def dtype(self) -> np.dtype[Any]:
"""Data type of the dataset."""
return self.datasets[0].dtype
    def __getitem__(self, key: Any) -> Any:
        """Slice and sum all datasets lazily.

        Parameters
        ----------
        key
            Slicing key (integers, slices, lists, ellipsis).

        Returns
        -------
        NDArray
            Sum of sliced arrays from all datasets.
"""
# Sum slices from all datasets
slices = [ds[key] for ds in self.datasets]
return np.sum(slices, axis=0)
def __setitem__(self, *args: Any, **kwargs: Any) -> None:
"""Writing is not supported for summed datasets.
Raises
------
NotImplementedError
Always, as there is no unambiguous way to distribute a write across
multiple source datasets.
"""
raise NotImplementedError
class LazyBooleanOrDataset:
"""Lazy-loaded dataset that applies Boolean OR across multiple datasets.
This is useful for lazily combining flag datasets via element-wise Boolean
OR without loading all data into memory at once.
Parameters
----------
datasets
Sequence of HDF5 datasets to OR.
Raises
------
ValueError
If datasets list is empty.
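
    Examples
    --------
    A minimal sketch using in-memory boolean arrays:

    >>> import numpy as np
    >>> flags1 = np.array([True, False, False])
    >>> flags2 = np.array([False, False, True])
    >>> combined = LazyBooleanOrDataset([flags1, flags2])
    >>> combined[:]
    array([ True, False,  True])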
"""
def __init__(self, datasets: Sequence[DatasetLike]) -> None:
self.datasets = list(datasets)
if not self.datasets:
raise ValueError("At least one dataset is required")
@property
def shape(self) -> tuple[int, ...]:
"""Shape of the dataset."""
return self.datasets[0].shape
@property
def ndim(self) -> int:
"""Number of dimensions."""
return len(self.shape)
@property
    def dtype(self) -> np.dtype[Any]:
"""Data type of the dataset."""
return self.datasets[0].dtype
    def __getitem__(self, key: Any) -> Any:
        """Slice and OR all datasets lazily.

        Parameters
        ----------
        key
            Slicing key (integers, slices, lists, ellipsis).

        Returns
        -------
        NDArray
            Boolean OR of sliced arrays from all datasets.
"""
# OR slices from all datasets
slices = [ds[key] for ds in self.datasets]
return np.logical_or.reduce(slices)
def __setitem__(self, *args: Any, **kwargs: Any) -> None:
"""Writing is not supported for Boolean OR datasets.
Raises
------
NotImplementedError
Always, as the inverse operation of Boolean OR is undefined.
"""
raise NotImplementedError
class LazyStackedDataset:
"""Lazy-loaded stacked dataset that does not load all data into memory.
This is useful to provide access to stacked and normalized datasets without
loading all data into memory at once, similar to h5py datasets.
>>> stacked = LazyStackedDataset([ds1, ds2, ds3])
>>> print(stacked.shape)
(1000, 3)
>>> data_slice = stacked[100:200] # Loads only the requested slice
>>> data_slice2 = stacked[100:200, ..., 0] # Slicing along stacked axis
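
    Writing is also supported; an integer index on the stacked axis routes the
    write to a single underlying dataset (illustrative, assuming ``ds1`` is
    writable):

    >>> stacked[100:200, ..., 0] = data_slice2  # Writes back into ds1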

    Parameters
    ----------
    datasets
        Sequence of datasets to stack; any :class:`DatasetLike` works. All must
        share the same shape and dtype.
    axis
        Axis along which to stack the datasets.

    Raises
    ------
    ValueError
        If ``axis`` is out of bounds for stacking.
"""
def __init__(
self,
datasets: Sequence[DatasetLike],
axis: int = -1,
) -> None:
self.datasets = list(datasets)
self._validate_datasets()
self.axis = axis
# Compute the shape of the stacked array
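        # e.g. stacking three (1000,) datasets along axis=-1 yields (1000, 3)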
shape = list(self.datasets[0].shape)
if self.axis < -len(shape) - 1 or self.axis > len(shape):
raise ValueError("Axis out of bounds for stacking")
if self.axis < 0:
shape.insert(len(shape) + self.axis + 1, len(datasets))
else:
shape.insert(self.axis, len(datasets))
self._shape = tuple(shape)
def _validate_datasets(self) -> None:
"""Make sure all datasets have the same shape and dtype.
Raises
------
ValueError
If ``self.datasets`` is empty.
ValueError
If all elements in ``self.datasets`` do not have the same shape.
ValueError
If all elements in ``self.datasets`` do not have the same dtype.
"""
# Make sure there is at least one dataset
if not self.datasets:
raise ValueError("At least one dataset is required")
# Make sure all datasets have the same shape and dtype
first_shape = self.datasets[0].shape
first_dtype = self.datasets[0].dtype
for ds in self.datasets:
if ds.shape != first_shape:
raise ValueError("All datasets must have the same shape")
if ds.dtype != first_dtype:
raise ValueError("All datasets must have the same dtype")
@property
def normalized_axis(self) -> int:
"""Normalize axis to a positive value."""
axis = self.axis
if axis < 0:
axis = self.ndim + axis
return axis
@property
def shape(self) -> tuple[int, ...]:
"""Shape of the stacked array."""
return self._shape
@property
def ndim(self) -> int:
"""Number of dimensions of the stacked array."""
return len(self._shape)
@property
    def dtype(self) -> np.dtype[Any]:
"""Data type of the stacked array."""
return self.datasets[0].dtype
    def __getitem__(self, key: Any) -> Any:
        """Slice the stacked datasets on demand without loading all data.

        Supports slicing along all dimensions, including the stacked axis. The
        stacked axis can be sliced using integers, slices, or lists of indices.
        Note that fancy indexing (i.e., using arrays of indices for multiple
        axes) is not supported for non-stacked axes due to a limitation in
        h5py; it is, however, supported for a single non-stacked axis at a
        time. Insertion of ``None`` or ``np.newaxis`` is not supported.

        Parameters
        ----------
        key
            Slicing key; can include integers (Python ``int`` or NumPy integer
            scalars), slices, lists of indices, and ellipsis.

        Returns
        -------
        Any
            Sliced and stacked array.

        Raises
        ------
        IndexError
            If ``None`` or ``np.newaxis`` is used in the key.
        IndexError
            If too many indices are provided for the array.
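
        Examples
        --------
        A minimal sketch with two in-memory 1-D arrays as datasets:

        >>> import numpy as np
        >>> stacked = LazyStackedDataset([np.zeros(4), np.ones(4)], axis=-1)
        >>> stacked[0:2].shape  # a slice keeps the stacked axis
        (2, 2)
        >>> stacked[0:2, 1]  # an integer collapses the stacked axis
        array([1., 1.])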
"""
# Convert key to a tuple
if not isinstance(key, tuple):
key = (key,)
# Make sure None or np.newaxis are not used
if any(k is None for k in key):
raise IndexError("None or np.newaxis indexing is not supported")
# Convert numpy scalar indices to Python scalars
key = tuple(int(k) if isinstance(k, np.integer) else k for k in key)
# Expand ellipsis in key
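        # e.g. with ndim=3, key (0, ..., 1) expands to (0, slice(None), 1)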
if Ellipsis in key:
ellipsis_index = key.index(Ellipsis)
num_missing = self.ndim - (len(key) - 1)
key = (
key[:ellipsis_index]
+ (slice(None),) * num_missing
+ key[ellipsis_index + 1 :]
)
# Make sure key has the right number of dimensions
if len(key) < self.ndim:
key = key + (slice(None),) * (self.ndim - len(key))
elif len(key) > self.ndim:
raise IndexError("Too many indices for array")
# Select datasets along the stacked axis
stacked_axis = self.normalized_axis
dataset_slice = key[stacked_axis]
if isinstance(dataset_slice, int):
dataset_indices = [dataset_slice]
elif isinstance(dataset_slice, slice):
dataset_slice_idx = dataset_slice.indices(len(self.datasets))
dataset_indices = list(range(*dataset_slice_idx))
else:
dataset_indices = list(dataset_slice)
# Prepare the slices for each dataset
remaining_slices_list = list(key)
del remaining_slices_list[stacked_axis]
remaining_slices = tuple(remaining_slices_list)
slices = [self.datasets[i][remaining_slices] for i in dataset_indices]
# If only one dataset is selected, collapse the stacked axis
if isinstance(dataset_slice, int):
return slices[0]
# Adjust stacked_axis to account for collapsed dimensions from integer
# slicing: count how many integer indices appear before the stacked axis
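        # e.g. key (0, slice(None)) with stacked_axis=1 stacks at axis 0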
num_collapsed_before = sum(
1 for i, idx in enumerate(key) if i < stacked_axis and isinstance(idx, int)
)
adjusted_stacked_axis = stacked_axis - num_collapsed_before
# Stack the selected slices along the adjusted stacked axis
stacked_array = np.stack(slices, axis=adjusted_stacked_axis)
return stacked_array
    def __setitem__(self, key: Any, value: Any) -> None:
        """Write to stacked datasets on demand.

        The value is unstacked along the stacked axis and distributed to the
        appropriate underlying datasets.

        Parameters
        ----------
        key
            Slicing key; can include integers, slices, or lists of indices.
        value
            Data to write. Must be compatible with the sliced shape.

        Raises
        ------
        IndexError
            If ``None`` or ``np.newaxis`` is used in the key, or if too many
            indices are provided.
        ValueError
            If the shape of ``value`` is incompatible with the requested slice.
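
        Examples
        --------
        A minimal sketch with two in-memory 1-D arrays as datasets:

        >>> import numpy as np
        >>> ds1, ds2 = np.zeros(4), np.zeros(4)
        >>> stacked = LazyStackedDataset([ds1, ds2], axis=-1)
        >>> stacked[0:2] = np.array([[1.0, 2.0], [3.0, 4.0]])
        >>> ds2[0:2]  # each column of the value went to its own dataset
        array([2., 4.])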
"""
# Convert key to a tuple
if not isinstance(key, tuple):
key = (key,)
# Make sure None or np.newaxis are not used
if any(k is None for k in key):
raise IndexError("None or np.newaxis indexing is not supported")
# Convert numpy scalar indices to Python scalars
key = tuple(int(k) if isinstance(k, np.integer) else k for k in key)
# Expand ellipsis in key
if Ellipsis in key:
ellipsis_index = key.index(Ellipsis)
num_missing = self.ndim - (len(key) - 1)
key = (
key[:ellipsis_index]
+ (slice(None),) * num_missing
+ key[ellipsis_index + 1 :]
)
# Make sure key has the right number of dimensions
if len(key) < self.ndim:
key = key + (slice(None),) * (self.ndim - len(key))
elif len(key) > self.ndim:
raise IndexError("Too many indices for array")
# Convert value to array
value = np.asarray(value)
# Select datasets along the stacked axis
stacked_axis = self.normalized_axis
dataset_slice = key[stacked_axis]
if isinstance(dataset_slice, int):
dataset_indices = [dataset_slice]
elif isinstance(dataset_slice, slice):
dataset_slice_idx = dataset_slice.indices(len(self.datasets))
dataset_indices = list(range(*dataset_slice_idx))
else:
dataset_indices = list(dataset_slice)
# Prepare the slices for each dataset
remaining_slices_list = list(key)
del remaining_slices_list[stacked_axis]
remaining_slices = tuple(remaining_slices_list)
# If only one dataset is selected, write directly
if isinstance(dataset_slice, int):
self.datasets[dataset_indices[0]][remaining_slices] = value
else:
# Unstack the value along the stacked axis and write to each dataset
# Count how many integer indices appear before the stacked axis to find
# where the stacked axis ends up in the value array
num_collapsed_before = sum(
1
for i, idx in enumerate(key)
if i < stacked_axis and isinstance(idx, int)
)
value_stacked_axis = stacked_axis - num_collapsed_before
# Split value along the stacked axis and write to each dataset
for i, ds_idx in enumerate(dataset_indices):
value_slice = np.take(value, i, axis=value_stacked_axis)
self.datasets[ds_idx][remaining_slices] = value_slice