Source code for mojito.sampling
"""
Time and frequency grids
========================
This module provides helper classes to handle time and frequency sampling
attributes in Mojito L1 files.
.. autoclass:: UniformTimeSampling
:members:
.. autoclass:: LogUniformFrequencySampling
:members:
"""
import logging
from math import ceil, floor
from typing import Self
import numpy as np
from attrs import field, frozen
from attrs.validators import gt
from h5py import Group
from numpy.typing import NDArray
logger = logging.getLogger(__name__)
[docs]
@frozen(kw_only=True)
class UniformTimeSampling:
"""Helper class to handle sampling attributes in Mojito L1 files."""
[docs]
@classmethod
def from_h5_group(cls, group: Group) -> Self:
"""Create an instance from an HDF5 group.
Parameters
----------
group
HDF5 group containing the sampling attributes.
Returns
-------
TimeSampling
Instance populated with the group's attributes.
"""
t0_attr = group.attrs["t0"]
dt_attr = group.attrs["dt"]
size_attr = group.attrs["size"]
assert isinstance(t0_attr, np.number)
assert isinstance(dt_attr, np.number)
assert isinstance(size_attr, np.integer)
return cls(
t0=float(t0_attr),
dt=float(dt_attr),
size=int(size_attr),
)
t0: float
"""Start time of the sampling [s]."""
dt: float = field(validator=gt(0))
"""Time step of the sampling [s]."""
size: int = field(validator=gt(0))
"""Number of samples in the time series."""
@property
def fs(self) -> float:
"""Sampling frequency [Hz]."""
return 1.0 / self.dt
@property
def duration(self) -> float:
"""Duration of the time series [s].
This is defined as ``size * dt``. Note that the last timestamp is
not ``t0 + duration``, but ``t0 + (size - 1) * dt``.
"""
return self.size * self.dt
[docs]
def tmax(self, elapsed: bool = False) -> float:
"""Return the last timestamp.
The last timestamp is defined as the last element of the array returned
by :meth:`t`. Note that this is not ``t0 + duration``.
If ``elapsed`` is False, it corresponds to ``t0 + (size - 1) * dt``. If
``elapsed`` is True, the time origin is ignored and the last timestamp
is ``(size - 1) * dt``.
Parameters
----------
elapsed
If True, ignore ``t0``.
Returns
-------
Last timestamp [s].
"""
tmax = (self.size - 1) * self.dt
if not elapsed:
tmax += self.t0
return tmax
[docs]
def t(
self,
idx: slice | None = None,
*,
elapsed: bool = False,
) -> NDArray:
"""Return the uniform array of times.
The time grid is defined as ``t0 + i * dt`` for ``i`` in
``range(size)``.
Use the ``slice`` parameter to return a sliced version of the time grid.
This is useful for long datasets, as this will only allocate the
selected timestamps in memory. For example:
>>> sampling = UniformTimeSampling(t0=1.5, dt=0.5, size=10)
>>> sampling.t(slice(2, 7, 2))
array([2.5, 3.5, 4.5])
If ``elapsed`` is True, the time grid is defined as ``i * dt`` for ``i``
in ``range(size)``. This can be useful to preserve numerical precision.
Parameters
----------
idx
Slice selecting which timestamps to return.
elapsed
If True, return elapsed time grid starting at 0.
Returns
-------
NDArray
Array of times [s]. Shape is ``(size,)`` if ``idx`` is None,
otherwise it matches the selected slice length.
"""
t0 = 0 if elapsed else self.t0
if idx is None:
indices = np.arange(self.size, dtype=float)
else:
start_idx, stop_idx, step_idx = idx.indices(self.size)
indices = np.arange(start_idx, stop_idx, step_idx, dtype=float)
return t0 + indices * self.dt
[docs]
def slice_between(self, tmin: float | None, tmax: float | None) -> slice:
r"""Return a slice that contains the interval [tmin, tmax].
We guarantee that any time ``t`` such that ``tmin <= t <= tmax`` is
included in the returned slice.
.. code-block:: python
sampling = UniformTimeSampling(t0=0.0, dt=0.1, size=100)
time_array = sampling.t()
# Pick a random time between 2.3s and 5.7s
tmin = 2.3
tmax = 5.7
random_t = np.random.uniform(tmin, tmax)
# Slice the time array
time_slice = sampling.slice_between(tmin, tmax)
selected_times = time_array[time_slice]
# We guarantee that random_t is in selected_times
assert random_t in selected_times
.. warning::
Because floating-point arithmetics is not exact, we extend the slice
so that it will always contain ``tmin`` and ``tmax``. As a result,
the returned slice can be larger than ``(tmax - tmin) / dt``. Note
that it is at most ``2 * self.dt`` larger.
Parameters
----------
tmin
Lower bound of the time range [s]. If None, the slice starts at the
beginning of the time series.
tmax
Upper bound of the time range [s]. If None, the slice ends at the
end of the time series.
Returns
-------
A slice instance such that all times ``t`` with ``tmin <= t <=
tmax`` are included in ``self.t()[slice]``.
Raises
------
ValueError
If ``tmin < self.t0``, or if ``tmax > self.tmax()``.
"""
if tmin is not None and tmin < self.t0:
raise ValueError(
"tmin must be larger or equal to self.t0"
f"(got {tmin} while self.t0 is {self.t0})"
)
if tmax is not None and tmax > self.tmax():
raise ValueError(
"tmax must be larger or equal to self.tmax()"
f"(got {tmin} while self.tmax() is {self.tmax()})"
)
if tmin is None:
idx_min = None
else:
idx_min = floor((tmin - self.t0 - 1e-15 * self.dt) / self.dt)
idx_min = max(0, idx_min)
if tmax is None:
idx_max = None
else:
idx_max = ceil((tmax - self.t0 + 1e-15 * self.dt) / self.dt) + 1
idx_max = min(self.size, idx_max)
return slice(idx_min, idx_max)
[docs]
@frozen(kw_only=True)
class LogUniformFrequencySampling:
"""Helper class to handle logarithmic frequency sampling in Mojito L1 files.
Frequencies must be positive here.
"""
[docs]
@classmethod
def from_h5_group(cls, group: Group) -> Self:
"""Create an instance from an HDF5 group.
Parameters
----------
group
HDF5 group containing the frequency sampling attributes.
Returns
-------
LogUniformFrequencySampling
Instance populated with the group's attributes.
"""
fmin_attr = group.attrs["fmin"]
fmax_attr = group.attrs["fmax"]
size_attr = group.attrs["size"]
assert isinstance(fmin_attr, np.number)
assert isinstance(fmax_attr, np.number)
assert isinstance(size_attr, np.integer)
return cls(
fmin=float(fmin_attr),
fmax=float(fmax_attr),
size=int(size_attr),
)
fmin: float = field(validator=gt(0))
"""Minimum frequency [Hz]."""
fmax: float = field(validator=gt(0))
"""Maximum frequency [Hz]."""
size: int = field(validator=gt(0))
"""Number of frequencies."""
def __attrs_post_init__(self) -> None:
"""Post-initialization checks."""
if self.fmax <= self.fmin:
raise ValueError("fmax must be greater than fmin.")
[docs]
def f(self, idx: slice | None = None) -> NDArray:
r"""Return the log-uniform array of frequencies.
The frequency grid is defined as :math:`f_i = 10^{F_i}` with :math:`F_i
= \log_{10}(f_\mathrm{min}) + i \Delta \log_{10}(f)`, for :math:`i`
ranging from ``0`` to ``size - 1`` and :math:`\Delta \log_{10}(f) =
(\log_{10}(f_\mathrm{max}) - \log_{10}(f_\mathrm{min})) / (\text{size} -
1)`.
Use the ``idx`` parameter to return a sliced version of the frequency
grid. This is useful for long datasets, as this will only allocate the
selected frequencies in memory. For example:
>>> sampling = LogUniformFrequencySampling(fmin=1.0, fmax=100.0, size=10)
>>> sampling.f(slice(2, 7, 2))
array([ 2.7825594 , 7.74263683, 21.5443469 ])
Parameters
----------
idx
Slice selecting which frequencies to return.
Returns
-------
NDArray
Array of frequencies [Hz]. Shape is ``(size,)`` if ``idx`` is
None, otherwise it matches the selected slice length.
"""
if idx is None:
indices = np.arange(self.size, dtype=float)
else:
start_idx, stop_idx, step_idx = idx.indices(self.size)
indices = np.arange(start_idx, stop_idx, step_idx, dtype=float)
log_fmin = np.log10(self.fmin)
if self.size == 1:
log_f = np.full_like(indices, log_fmin)
else:
delta_log_f = (np.log10(self.fmax) - log_fmin) / (self.size - 1)
log_f = log_fmin + indices * delta_log_f
return np.power(10.0, log_f)