Skip to content

Multichannel

gwmock_signal.multichannel

Multi-channel GW strain stacks (fixed detector order).

DetectorStrainStack

Aligned strains for a fixed list of detectors (one GWpy series per row).

See docs/user_guide/multi-channel-strains.md (examples) and docs/api/multichannel/index.md (API reference).

Source code in src/gwmock_signal/multichannel/stack.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
class DetectorStrainStack:
    """Aligned strains for a fixed list of detectors (one GWpy series per row).

    See ``docs/user_guide/multi-channel-strains.md`` (examples) and
    ``docs/api/multichannel/index.md`` (API reference).
    """

    def __init__(self, detector_names: tuple[str, ...], channels: tuple[TimeSeries, ...]) -> None:
        """Prefer the ``from_mapping`` classmethod for public construction.

        Args:
            detector_names: Channel order; row ``i`` of ``data`` corresponds to
                ``detector_names[i]``.
            channels: Tuple of GWpy time series objects, one per detector.

        Raises:
            ValueError: If ``detector_names`` and ``channels`` have different lengths.
            ValueError: If ``detector_names`` is not unique.
            ValueError: If ``channels`` is empty.
            TypeError: If a channel is not a GWpy [`TimeSeries`](https://gwpy.github.io/docs/latest/api/gwpy.timeseries.TimeSeries/).
            ValueError: If the channels are not aligned on the same grid.
            ValueError: If the channels are not compatible (mismatched unit or sample rate).

        """
        if len(detector_names) != len(channels):
            raise ValueError("detector_names and channels must have the same length.")
        if len(set(detector_names)) != len(detector_names):
            raise ValueError("detector_names must be unique.")
        for i, s in enumerate(channels):
            if not isinstance(s, TimeSeries):
                raise TypeError(f"channels[{i}] must be gwpy.timeseries.TimeSeries, got {type(s)}")
        _validate_aligned_channels(channels)
        self._names = detector_names
        self._channels = channels

    @classmethod
    def from_mapping(
        cls,
        detector_names: Sequence[str],
        strains: Mapping[str, TimeSeries],
    ) -> DetectorStrainStack:
        """Build a stack in the given detector order; validate a single shared time grid.

        Args:
            detector_names: Channel order; row ``i`` of ``data`` corresponds to
                ``detector_names[i]``.
            strains: Mapping containing every listed name. Extra keys are ignored.

        Returns:
            New ``DetectorStrainStack`` instance.

        Raises:
            KeyError: If any ``detector_names`` entry is missing from ``strains``.
            ValueError: If channels are not aligned on the same grid.
            TypeError: If a strain is not a GWpy [`TimeSeries`](https://gwpy.github.io/docs/latest/api/gwpy.timeseries.TimeSeries/).
        """
        names = tuple(str(n) for n in detector_names)
        if not names:
            raise ValueError("detector_names must be non-empty.")
        missing = [n for n in names if n not in strains]
        if missing:
            raise KeyError(f"Missing strain keys for detectors: {missing}")
        channels = []
        for n in names:
            s = strains[n]
            if not isinstance(s, TimeSeries):
                raise TypeError(f"strains[{n!r}] must be gwpy.timeseries.TimeSeries, got {type(s)}")
            channels.append(s)
        ch_tuple = tuple(channels)
        _validate_aligned_channels(ch_tuple)
        return cls(names, ch_tuple)

    @property
    def detector_names(self) -> tuple[str, ...]:
        """IFO order used for stacking (immutable)."""
        return self._names

    @property
    def t0(self):
        """GPS start of the first sample (same for every channel)."""
        return self._channels[0].t0

    @property
    def sample_rate(self) -> Quantity:
        """Sample rate (GWpy quantity), identical for every channel.

        Returns:
            Sample rate of the first channel.
        """
        return self._channels[0].sample_rate

    @property
    def data(self) -> np.ndarray:
        """Strain samples shaped ``(n_detectors, n_samples)`` (C-contiguous copy).

        Returns:
            Strain samples shaped ``(n_detectors, n_samples)`` (C-contiguous copy).
        """
        rows = [np.asarray(s.value, dtype=float) for s in self._channels]
        return np.stack(rows, axis=0).copy()

    def __getitem__(self, key: int | str) -> TimeSeries:
        """Return one channel by index or detector name (same object as stored).

        Args:
            key: Index or detector name.

        Returns:
            GWpy TimeSeries object for the specified channel.

        Raises:
            KeyError: If the key is not found.
            TypeError: If the key is not an integer or string.
            TypeError: If the key is a boolean.
        """
        if isinstance(key, str):
            try:
                idx = self._names.index(key)
            except ValueError as exc:
                raise KeyError(key) from exc
            return self._channels[idx]
        if isinstance(key, bool):
            raise TypeError("index must be int or str, got bool")
        if isinstance(key, int):
            return self._channels[key]
        raise TypeError(f"index must be int or str, got {type(key)}")

    def __len__(self) -> int:
        """Number of detectors (channels).

        Returns:
            Number of detectors (channels).
        """
        return len(self._channels)

    def write(
        self,
        path: str | Path,
        format: Literal["gwf", "hdf5", "npy", "txt"] = "hdf5",  # noqa: A002
    ) -> None:
        """Write the stack to a file.

        Args:
            path: Output file path.
            format: Output format — one of ``'gwf'``, ``'hdf5'``, ``'npy'``,
                or ``'txt'``. Defaults to ``'hdf5'``.

        Raises:
            ValueError: If ``format`` is not recognised.

        Note:
            GWF writing requires an optional frame library
            (``python-ldas-tools-framecpp`` or ``framel``).  Install via your
            system package manager or conda.
        """
        path = Path(path)
        t0_val = float(self.t0.value)
        dt_val = 1.0 / float(self.sample_rate.value)
        channel_names = list(self._names)

        if format == "gwf":
            tsd = TimeSeriesDict()
            for name, ts in zip(self._names, self._channels, strict=True):
                ts_named = ts.copy()
                ts_named.name = name
                tsd[name] = ts_named
            tsd.write(str(path), format="gwf")

        elif format == "hdf5":
            with h5py.File(path, "w") as fh:
                # Preserve the detector/channel ordering explicitly since HDF5
                # group key iteration order is not guaranteed across environments.
                fh.attrs[_HDF5_STACK_ORDER_ATTR] = json.dumps(channel_names)
                for name, ts in zip(self._names, self._channels, strict=True):
                    ds = fh.create_dataset(name, data=np.asarray(ts.value, dtype=np.float64))
                    ds.attrs["t0"] = t0_val
                    ds.attrs["dt"] = dt_val
                    ds.attrs["unit"] = "strain"

        elif format == "npy":
            arr = np.stack([np.asarray(ts.value, dtype=np.float64) for ts in self._channels], axis=1)
            np.save(path, arr)
            sidecar = Path(path).with_suffix(".json")
            sidecar.write_text(json.dumps({"t0": t0_val, "dt": dt_val, "channels": channel_names}))

        elif format == "txt":
            arr = np.stack([np.asarray(ts.value, dtype=np.float64) for ts in self._channels], axis=1)
            header = f"t0={t0_val} dt={dt_val} channels={','.join(channel_names)}"
            np.savetxt(path, arr, header=header)

        else:
            raise ValueError(f"Unknown format {format!r}. Choose from 'gwf', 'hdf5', 'npy', 'txt'.")

    @classmethod
    def read(
        cls,
        path: str | Path,
        format: Literal["hdf5", "npy"],  # noqa: A002
    ) -> DetectorStrainStack:
        """Reconstruct a ``DetectorStrainStack`` from an HDF5 or npy file.

        Args:
            path: Input file path.  For ``'npy'`` format a JSON sidecar at
                ``<stem>.json`` must also be present.
            format: Input format — ``'hdf5'`` or ``'npy'``.

        Returns:
            Reconstructed ``DetectorStrainStack``.

        Raises:
            NotImplementedError: If ``format`` is ``'gwf'`` or ``'txt'``
                (write-only formats).
            ValueError: If ``format`` is not recognised.
        """
        path = Path(path)

        if format in ("gwf", "txt"):
            raise NotImplementedError(f"Reading format {format!r} is not yet supported.")

        if format == "hdf5":
            with h5py.File(path, "r") as fh:
                names = list(fh.keys())
                ordered_names = names

                order_raw = fh.attrs.get(_HDF5_STACK_ORDER_ATTR)
                if order_raw is not None:
                    if isinstance(order_raw, bytes):
                        order_raw = order_raw.decode("utf-8")
                    try:
                        candidate = json.loads(order_raw)
                    except (TypeError, ValueError):
                        candidate = None

                    if (
                        isinstance(candidate, list)
                        and all(isinstance(n, str) for n in candidate)
                        and len(set(candidate)) == len(candidate)
                        and set(candidate) == set(names)
                    ):
                        ordered_names = candidate

                channels = []
                for name in ordered_names:
                    ds = fh[name]
                    data = ds[...]
                    t0 = float(ds.attrs["t0"])
                    dt = float(ds.attrs["dt"])
                    channels.append(TimeSeries(data, t0=t0, dt=dt, unit="strain", name=name))
            return cls(tuple(ordered_names), tuple(channels))

        if format == "npy":
            arr = np.load(path)
            sidecar = path.with_suffix(".json")
            meta = json.loads(sidecar.read_text())
            t0 = float(meta["t0"])
            dt = float(meta["dt"])
            channel_names = meta["channels"]
            channels = [
                TimeSeries(arr[:, i], t0=t0, dt=dt, unit="strain", name=name) for i, name in enumerate(channel_names)
            ]
            return cls(tuple(channel_names), tuple(channels))

        raise ValueError(f"Unknown format {format!r}. Choose from 'hdf5', 'npy'.")

    def to_dict(self) -> dict[str, TimeSeries]:
        """Map detector name to GWpy series (same objects as ``__getitem__``).

        Returns:
            Mapping of detector name to GWpy series.
        """
        return dict(zip(self._names, self._channels, strict=True))

data property

Strain samples shaped (n_detectors, n_samples) (C-contiguous copy).

Returns:

Type Description
ndarray

Strain samples shaped (n_detectors, n_samples) (C-contiguous copy).

detector_names property

IFO order used for stacking (immutable).

sample_rate property

Sample rate (GWpy quantity), identical for every channel.

Returns:

Type Description
Quantity

Sample rate of the first channel.

t0 property

GPS start of the first sample (same for every channel).

__getitem__(key)

Return one channel by index or detector name (same object as stored).

Parameters:

Name Type Description Default
key int | str

Index or detector name.

required

Returns:

Type Description
TimeSeries

GWpy TimeSeries object for the specified channel.

Raises:

Type Description
KeyError

If the key is not found.

TypeError

If the key is not an integer or string.

TypeError

If the key is a boolean.

Source code in src/gwmock_signal/multichannel/stack.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def __getitem__(self, key: int | str) -> TimeSeries:
    """Return one channel by index or detector name (same object as stored).

    Args:
        key: Index or detector name.

    Returns:
        GWpy TimeSeries object for the specified channel.

    Raises:
        KeyError: If the key is not found.
        TypeError: If the key is not an integer or string.
        TypeError: If the key is a boolean.
    """
    if isinstance(key, str):
        try:
            idx = self._names.index(key)
        except ValueError as exc:
            raise KeyError(key) from exc
        return self._channels[idx]
    if isinstance(key, bool):
        raise TypeError("index must be int or str, got bool")
    if isinstance(key, int):
        return self._channels[key]
    raise TypeError(f"index must be int or str, got {type(key)}")

__init__(detector_names, channels)

Prefer the from_mapping classmethod for public construction.

Parameters:

Name Type Description Default
detector_names tuple[str, ...]

Channel order; row i of data corresponds to detector_names[i].

required
channels tuple[TimeSeries, ...]

Tuple of GWpy time series objects, one per detector.

required

Raises:

Type Description
ValueError

If detector_names and channels have different lengths.

ValueError

If detector_names is not unique.

ValueError

If channels is empty.

TypeError

If a channel is not a GWpy TimeSeries.

ValueError

If the channels are not aligned on the same grid.

ValueError

If the channels are not compatible (mismatched unit or sample rate).

Source code in src/gwmock_signal/multichannel/stack.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def __init__(self, detector_names: tuple[str, ...], channels: tuple[TimeSeries, ...]) -> None:
    """Prefer the ``from_mapping`` classmethod for public construction.

    Args:
        detector_names: Channel order; row ``i`` of ``data`` corresponds to
            ``detector_names[i]``.
        channels: Tuple of GWpy time series objects, one per detector.

    Raises:
        ValueError: If ``detector_names`` and ``channels`` have different lengths.
        ValueError: If ``detector_names`` is not unique.
        ValueError: If ``channels`` is empty.
        TypeError: If a channel is not a GWpy [`TimeSeries`](https://gwpy.github.io/docs/latest/api/gwpy.timeseries.TimeSeries/).
        ValueError: If the channels are not aligned on the same grid.
        ValueError: If the channels are not compatible (mismatched unit or sample rate).

    """
    if len(detector_names) != len(channels):
        raise ValueError("detector_names and channels must have the same length.")
    if len(set(detector_names)) != len(detector_names):
        raise ValueError("detector_names must be unique.")
    for i, s in enumerate(channels):
        if not isinstance(s, TimeSeries):
            raise TypeError(f"channels[{i}] must be gwpy.timeseries.TimeSeries, got {type(s)}")
    _validate_aligned_channels(channels)
    self._names = detector_names
    self._channels = channels

__len__()

Number of detectors (channels).

Returns:

Type Description
int

Number of detectors (channels).

Source code in src/gwmock_signal/multichannel/stack.py
169
170
171
172
173
174
175
def __len__(self) -> int:
    """Number of detectors (channels).

    Returns:
        Number of detectors (channels).
    """
    return len(self._channels)

from_mapping(detector_names, strains) classmethod

Build a stack in the given detector order; validate a single shared time grid.

Parameters:

Name Type Description Default
detector_names Sequence[str]

Channel order; row i of data corresponds to detector_names[i].

required
strains Mapping[str, TimeSeries]

Mapping containing every listed name. Extra keys are ignored.

required

Returns:

Type Description
DetectorStrainStack

New DetectorStrainStack instance.

Raises:

Type Description
KeyError

If any detector_names entry is missing from strains.

ValueError

If channels are not aligned on the same grid.

TypeError

If a strain is not a GWpy TimeSeries.

Source code in src/gwmock_signal/multichannel/stack.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
@classmethod
def from_mapping(
    cls,
    detector_names: Sequence[str],
    strains: Mapping[str, TimeSeries],
) -> DetectorStrainStack:
    """Build a stack in the given detector order; validate a single shared time grid.

    Args:
        detector_names: Channel order; row ``i`` of ``data`` corresponds to
            ``detector_names[i]``.
        strains: Mapping containing every listed name. Extra keys are ignored.

    Returns:
        New ``DetectorStrainStack`` instance.

    Raises:
        KeyError: If any ``detector_names`` entry is missing from ``strains``.
        ValueError: If channels are not aligned on the same grid.
        TypeError: If a strain is not a GWpy [`TimeSeries`](https://gwpy.github.io/docs/latest/api/gwpy.timeseries.TimeSeries/).
    """
    names = tuple(str(n) for n in detector_names)
    if not names:
        raise ValueError("detector_names must be non-empty.")
    missing = [n for n in names if n not in strains]
    if missing:
        raise KeyError(f"Missing strain keys for detectors: {missing}")
    channels = []
    for n in names:
        s = strains[n]
        if not isinstance(s, TimeSeries):
            raise TypeError(f"strains[{n!r}] must be gwpy.timeseries.TimeSeries, got {type(s)}")
        channels.append(s)
    ch_tuple = tuple(channels)
    _validate_aligned_channels(ch_tuple)
    return cls(names, ch_tuple)

read(path, format) classmethod

Reconstruct a DetectorStrainStack from an HDF5 or npy file.

Parameters:

Name Type Description Default
path str | Path

Input file path. For 'npy' format a JSON sidecar at <stem>.json must also be present.

required
format Literal['hdf5', 'npy']

Input format — 'hdf5' or 'npy'.

required

Returns:

Type Description
DetectorStrainStack

Reconstructed DetectorStrainStack.

Raises:

Type Description
NotImplementedError

If format is 'gwf' or 'txt' (write-only formats).

ValueError

If format is not recognised.

Source code in src/gwmock_signal/multichannel/stack.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
@classmethod
def read(
    cls,
    path: str | Path,
    format: Literal["hdf5", "npy"],  # noqa: A002
) -> DetectorStrainStack:
    """Reconstruct a ``DetectorStrainStack`` from an HDF5 or npy file.

    Args:
        path: Input file path.  For ``'npy'`` format a JSON sidecar at
            ``<stem>.json`` must also be present.
        format: Input format — ``'hdf5'`` or ``'npy'``.

    Returns:
        Reconstructed ``DetectorStrainStack``.

    Raises:
        NotImplementedError: If ``format`` is ``'gwf'`` or ``'txt'``
            (write-only formats).
        ValueError: If ``format`` is not recognised.
    """
    path = Path(path)

    if format in ("gwf", "txt"):
        raise NotImplementedError(f"Reading format {format!r} is not yet supported.")

    if format == "hdf5":
        with h5py.File(path, "r") as fh:
            names = list(fh.keys())
            ordered_names = names

            order_raw = fh.attrs.get(_HDF5_STACK_ORDER_ATTR)
            if order_raw is not None:
                if isinstance(order_raw, bytes):
                    order_raw = order_raw.decode("utf-8")
                try:
                    candidate = json.loads(order_raw)
                except (TypeError, ValueError):
                    candidate = None

                if (
                    isinstance(candidate, list)
                    and all(isinstance(n, str) for n in candidate)
                    and len(set(candidate)) == len(candidate)
                    and set(candidate) == set(names)
                ):
                    ordered_names = candidate

            channels = []
            for name in ordered_names:
                ds = fh[name]
                data = ds[...]
                t0 = float(ds.attrs["t0"])
                dt = float(ds.attrs["dt"])
                channels.append(TimeSeries(data, t0=t0, dt=dt, unit="strain", name=name))
        return cls(tuple(ordered_names), tuple(channels))

    if format == "npy":
        arr = np.load(path)
        sidecar = path.with_suffix(".json")
        meta = json.loads(sidecar.read_text())
        t0 = float(meta["t0"])
        dt = float(meta["dt"])
        channel_names = meta["channels"]
        channels = [
            TimeSeries(arr[:, i], t0=t0, dt=dt, unit="strain", name=name) for i, name in enumerate(channel_names)
        ]
        return cls(tuple(channel_names), tuple(channels))

    raise ValueError(f"Unknown format {format!r}. Choose from 'hdf5', 'npy'.")

to_dict()

Map detector name to GWpy series (same objects as __getitem__).

Returns:

Type Description
dict[str, TimeSeries]

Mapping of detector name to GWpy series.

Source code in src/gwmock_signal/multichannel/stack.py
306
307
308
309
310
311
312
def to_dict(self) -> dict[str, TimeSeries]:
    """Map detector name to GWpy series (same objects as ``__getitem__``).

    Returns:
        Mapping of detector name to GWpy series.
    """
    return dict(zip(self._names, self._channels, strict=True))

write(path, format='hdf5')

Write the stack to a file.

Parameters:

Name Type Description Default
path str | Path

Output file path.

required
format Literal['gwf', 'hdf5', 'npy', 'txt']

Output format — one of 'gwf', 'hdf5', 'npy', or 'txt'. Defaults to 'hdf5'.

'hdf5'

Raises:

Type Description
ValueError

If format is not recognised.

Note

GWF writing requires an optional frame library (python-ldas-tools-framecpp or framel). Install via your system package manager or conda.

Source code in src/gwmock_signal/multichannel/stack.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def write(
    self,
    path: str | Path,
    format: Literal["gwf", "hdf5", "npy", "txt"] = "hdf5",  # noqa: A002
) -> None:
    """Write the stack to a file.

    Args:
        path: Output file path.
        format: Output format — one of ``'gwf'``, ``'hdf5'``, ``'npy'``,
            or ``'txt'``. Defaults to ``'hdf5'``.

    Raises:
        ValueError: If ``format`` is not recognised.

    Note:
        GWF writing requires an optional frame library
        (``python-ldas-tools-framecpp`` or ``framel``).  Install via your
        system package manager or conda.
    """
    path = Path(path)
    t0_val = float(self.t0.value)
    dt_val = 1.0 / float(self.sample_rate.value)
    channel_names = list(self._names)

    if format == "gwf":
        tsd = TimeSeriesDict()
        for name, ts in zip(self._names, self._channels, strict=True):
            ts_named = ts.copy()
            ts_named.name = name
            tsd[name] = ts_named
        tsd.write(str(path), format="gwf")

    elif format == "hdf5":
        with h5py.File(path, "w") as fh:
            # Preserve the detector/channel ordering explicitly since HDF5
            # group key iteration order is not guaranteed across environments.
            fh.attrs[_HDF5_STACK_ORDER_ATTR] = json.dumps(channel_names)
            for name, ts in zip(self._names, self._channels, strict=True):
                ds = fh.create_dataset(name, data=np.asarray(ts.value, dtype=np.float64))
                ds.attrs["t0"] = t0_val
                ds.attrs["dt"] = dt_val
                ds.attrs["unit"] = "strain"

    elif format == "npy":
        arr = np.stack([np.asarray(ts.value, dtype=np.float64) for ts in self._channels], axis=1)
        np.save(path, arr)
        sidecar = Path(path).with_suffix(".json")
        sidecar.write_text(json.dumps({"t0": t0_val, "dt": dt_val, "channels": channel_names}))

    elif format == "txt":
        arr = np.stack([np.asarray(ts.value, dtype=np.float64) for ts in self._channels], axis=1)
        header = f"t0={t0_val} dt={dt_val} channels={','.join(channel_names)}"
        np.savetxt(path, arr, header=header)

    else:
        raise ValueError(f"Unknown format {format!r}. Choose from 'gwf', 'hdf5', 'npy', 'txt'.")

For usage examples, see User guide — Multichannel strain examples.