API Reference

The flacarray package consists of a primary class (FlacArray) plus a variety of helper functions.

Compressed Array Representation

The FlacArray class stores a compressed representation of an N-dimensional array whose last dimension consists of "streams" of numbers to be compressed.

flacarray.FlacArray

FLAC compressed array representation.

This class holds a compressed representation of an N-dimensional array. The final (fastest changing) dimension is the axis along which the data is compressed. Each of the vectors in this last dimension is called a "stream" here. The leading dimensions of the original matrix form an array of these streams.

Internally, the data is stored as a contiguous concatenation of the bytes from these compressed streams. A separate array contains the starting byte of each stream in the overall bytes array. The shape of this array of starting bytes corresponds to the shape of the leading, uncompressed dimensions of the original array.
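
This byte layout can be sketched in pure numpy, with zlib standing in for the FLAC encoder (an illustrative substitute only, not what the library uses internally):

```python
import zlib
import numpy as np

data = np.arange(12, dtype=np.int32).reshape(3, 4)  # 3 streams of length 4

# Compress each stream separately and record where its bytes start.
chunks = []
starts = np.zeros(data.shape[:-1], dtype=np.int64)
nbytes = np.zeros(data.shape[:-1], dtype=np.int64)
offset = 0
for i in range(data.shape[0]):
    blob = zlib.compress(data[i].tobytes())
    starts[i] = offset
    nbytes[i] = len(blob)
    chunks.append(blob)
    offset += len(blob)
compressed = b"".join(chunks)

# Any single stream can be decoded independently from its byte range.
i = 1
blob = compressed[starts[i] : starts[i] + nbytes[i]]
restored = np.frombuffer(zlib.decompress(blob), dtype=np.int32)
assert np.array_equal(restored, data[i])
```

The starts array has the shape of the leading dimensions, which is what allows slices of the leading dimensions to be decompressed without touching the other streams.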

The input data is converted to 32-bit integers. The "quanta" value is used for floating point data conversion and represents the floating point increment corresponding to a single integer step. If quanta is None, each stream is scaled independently based on its data range. If quanta is a scalar, all streams are scaled with the same value. If quanta is an array, it specifies the scaling independently for each stream.

Alternatively, if "precision" is provided, each data vector is scaled to retain the prescribed number of significant digits when converting to integers.
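
As an illustrative sketch (not the library's exact internals), the quanta-based conversion of one floating point stream looks like this:

```python
import numpy as np

rng = np.random.default_rng(42)
stream = rng.normal(loc=10.0, scale=1.0, size=1000)

# Explicit quanta: one integer step equals this floating point increment.
quanta = 1.0e-6
offset = stream.mean()
ints = np.round((stream - offset) / quanta).astype(np.int32)

# Reconstruction is accurate to better than one quantum.
restored = ints.astype(np.float64) * quanta + offset
assert np.max(np.abs(restored - stream)) < quanta
```

Choosing `precision` instead of `quanta` simply lets the library derive an equivalent scaling from the data range and the requested number of significant digits.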

The following rules specify the data conversion that is performed depending on the input type:

  • int32: No conversion.

  • int64: Subtract the integer closest to the mean, then truncate to the lower 32 bits, checking that the high-order bits were zero.

  • float32: Subtract the mean and scale the data by the quanta value (see above), then round to the nearest 32-bit integer.

  • float64: Subtract the mean and scale the data by the quanta value (see above), then round to the nearest 32-bit integer.

After conversion to 32-bit integers, each stream's data is separately compressed into a sequence of FLAC bytes, which is appended to the byte stream. The byte offset of each stream is recorded.
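
For example, the int64 rule above can be sketched as follows (illustrative only; the library performs the equivalent range check internally):

```python
import numpy as np

stream = np.array([5_000_000_000, 5_000_000_100, 4_999_999_950], dtype=np.int64)

# Subtract the integer closest to the mean so values fit in 32 bits.
offset = np.int64(np.round(stream.mean()))
shifted = stream - offset

# Verify the high 32 bits carry no information before truncating.
assert np.all(shifted == shifted.astype(np.int32)), "data range exceeds 32 bits"
ints = shifted.astype(np.int32)

# The round trip is lossless for in-range int64 data.
assert np.array_equal(ints.astype(np.int64) + offset, stream)
```

If the shifted values do not fit in 32 bits, the conversion fails rather than silently losing data.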

A FlacArray is only constructed directly when making a copy. Use the class methods to create FlacArrays from numpy arrays or on-disk representations.

Parameters:

Name    Type        Description                                  Default
other   FlacArray   Construct a copy of the input FlacArray.     required
Source code in flacarray/array.py
class FlacArray:
    """FLAC compressed array representation.

    This class holds a compressed representation of an N-dimensional array.  The final
    (fastest changing) dimension is the axis along which the data is compressed.  Each
    of the vectors in this last dimension is called a "stream" here.  The leading
    dimensions of the original matrix form an array of these streams.

    Internally, the data is stored as a contiguous concatenation of the bytes from
    these compressed streams.  A separate array contains the starting byte of each
    stream in the overall bytes array.  The shape of the starting array corresponds
    to the shape of the leading, un-compressed dimensions of the original array.

    The input data is converted to 32bit integers.  The "quanta" value is used
    for floating point data conversion and represents the floating point increment
    for a single integer value.  If quanta is None, each stream is scaled independently
    based on its data range.  If quanta is a scalar, all streams are scaled with the
    same value.  If quanta is an array, it specifies the scaling independently for each
    stream.

    Alternatively, if "precision" is provided, each data vector is scaled to retain
    the prescribed number of significant digits when converting to integers.

    The following rules specify the data conversion that is performed depending on
    the input type:

    * int32:  No conversion.

    * int64:  Subtract the integer closest to the mean, then truncate to lower
        32 bits, and check that the higher bits were zero.

    * float32:  Subtract the mean and scale data based on the quanta value (see
        above).  Then round to nearest 32bit integer.

    * float64:  Subtract the mean and scale data based on the quanta value (see
        above).  Then round to nearest 32bit integer.

    After conversion to 32bit integers, each stream's data is separately compressed
    into a sequence of FLAC bytes, which is appended to the bytestream.  The offset in
    bytes for each stream is recorded.

    A FlacArray is only constructed directly when making a copy.  Use the class methods
    to create FlacArrays from numpy arrays or on-disk representations.

    Args:
        other (FlacArray):  Construct a copy of the input FlacArray.

    """

    def __init__(
        self,
        other,
        shape=None,
        global_shape=None,
        compressed=None,
        stream_starts=None,
        stream_nbytes=None,
        stream_offsets=None,
        stream_gains=None,
        mpi_comm=None,
        mpi_dist=None,
    ):
        if other is not None:
            # We are copying an existing object, make sure we have an
            # independent copy.
            self._shape = copy.deepcopy(other._shape)
            self._global_shape = copy.deepcopy(other._global_shape)
            self._compressed = copy.deepcopy(other._compressed)
            self._stream_starts = copy.deepcopy(other._stream_starts)
            self._stream_nbytes = copy.deepcopy(other._stream_nbytes)
            self._stream_offsets = copy.deepcopy(other._stream_offsets)
            self._stream_gains = copy.deepcopy(other._stream_gains)
            self._mpi_dist = copy.deepcopy(other._mpi_dist)
            # MPI communicators can be limited in number and expensive to create.
            self._mpi_comm = other._mpi_comm
        else:
            # This form of constructor is used in the class methods where we
            # have already created these arrays for use by this instance.
            self._shape = shape
            self._global_shape = global_shape
            self._compressed = compressed
            self._stream_starts = stream_starts
            self._stream_nbytes = stream_nbytes
            self._stream_offsets = stream_offsets
            self._stream_gains = stream_gains
            self._mpi_comm = mpi_comm
            self._mpi_dist = mpi_dist
        self._init_params()

    def _init_params(self):
        self._local_nbytes = self._compressed.nbytes
        (
            self._global_nbytes,
            self._global_proc_nbytes,
            self._global_stream_starts,
        ) = global_bytes(self._local_nbytes, self._stream_starts, self._mpi_comm)
        self._stream_size = self._shape[-1]
        self._leading_shape = self._stream_starts.shape
        self._local_nstreams = np.prod(self._leading_shape)
        if len(self._global_shape) == 1:
            self._global_leading_shape = (1,)
        else:
            self._global_leading_shape = self._global_shape[:-1]
        self._global_nstreams = np.prod(self._global_leading_shape)
        # For reference, record the type of the original data.
        if self._stream_offsets is not None:
            if self._stream_gains is not None:
                # This is floating point data
                if self._stream_gains.dtype == np.dtype(np.float64):
                    self._typestr = "float64"
                else:
                    self._typestr = "float32"
            else:
                # This is int64 data
                self._typestr = "int64"
        else:
            self._typestr = "int32"

    # Shapes of decompressed array

    @property
    def shape(self):
        """The shape of the local, uncompressed array."""
        return self._shape

    @property
    def global_shape(self):
        """The global shape of array across any MPI communicator."""
        return self._global_shape

    @property
    def leading_shape(self):
        """The local shape of leading uncompressed dimensions."""
        return self._leading_shape

    @property
    def global_leading_shape(self):
        """The global shape of leading uncompressed dimensions across all processes."""
        return self._global_leading_shape

    @property
    def stream_size(self):
        """The uncompressed length of each stream."""
        return self._shape[-1]

    # Properties of the compressed data

    @property
    def nbytes(self):
        """The total number of bytes used by compressed data on the local process."""
        return self._local_nbytes

    @property
    def global_nbytes(self):
        """The sum of total bytes used by compressed data across all processes."""
        return self._global_nbytes

    @property
    def global_process_nbytes(self):
        """The bytes used by compressed data on each process."""
        return self._global_proc_nbytes

    @property
    def nstreams(self):
        """The number of local streams (product of entries of `leading_shape`)"""
        return self._local_nstreams

    @property
    def global_nstreams(self):
        """Number of global streams (product of entries of `global_leading_shape`)"""
        return self._global_nstreams

    @property
    def compressed(self):
        """The concatenated raw bytes of all streams on the local process."""
        return self._compressed

    @property
    def stream_starts(self):
        """The array of starting bytes for each stream on the local process."""
        return self._stream_starts

    @property
    def stream_nbytes(self):
        """The array of nbytes for each stream on the local process."""
        return self._stream_nbytes

    @property
    def global_stream_starts(self):
        """The array of starting bytes within the global compressed data."""
        return self._global_stream_starts

    @property
    def global_stream_nbytes(self):
        """The array of nbytes within the global compressed data."""
        return self._global_stream_nbytes

    @property
    def stream_offsets(self):
        """The value subtracted from each stream during conversion to int32."""
        return self._stream_offsets

    @property
    def stream_gains(self):
        """The gain factor for each stream during conversion to int32."""
        return self._stream_gains

    @property
    def mpi_comm(self):
        """The MPI communicator over which the array is distributed."""
        return self._mpi_comm

    @property
    def mpi_dist(self):
        """The range of the leading dimension assigned to each MPI process."""
        return self._mpi_dist

    def _keep_view(self, key):
        if len(key) != len(self._leading_shape):
            raise ValueError("view size does not match leading dimensions")
        view = np.zeros(self._leading_shape, dtype=bool)
        view[key] = True
        return view

    def __getitem__(self, key):
        """Decompress a slice of data on the fly."""
        first = None
        last = None
        keep = None
        if isinstance(key, tuple):
            # We are slicing on multiple dimensions
            if len(key) == len(self._shape):
                # Slicing on the sample dimension too
                keep = self._keep_view(key[:-1])
                samp_key = key[-1]
                if isinstance(samp_key, slice):
                    # A slice
                    if samp_key.step is not None and samp_key.step != 1:
                        raise ValueError("Only stride==1 supported on stream slices")
                    first = samp_key.start
                    last = samp_key.stop
                elif isinstance(samp_key, (int, np.integer)):
                    # Just a scalar
                    first = samp_key
                    last = samp_key + 1
                else:
                    raise ValueError(
                        "Only contiguous slices supported on the stream dimension"
                    )
            else:
                # Only slicing the leading dimensions
                vw = list(key)
                vw.extend(
                    [slice(None) for x in range(len(self._leading_shape) - len(key))]
                )
                keep = self._keep_view(tuple(vw))
        else:
            # We are slicing / indexing only the leading dimension
            vw = [slice(None) for x in range(len(self._leading_shape))]
            vw[0] = key
            keep = self._keep_view(tuple(vw))

        arr, _ = array_decompress_slice(
            self._compressed,
            self._stream_size,
            self._stream_starts,
            self._stream_nbytes,
            stream_offsets=self._stream_offsets,
            stream_gains=self._stream_gains,
            keep=keep,
            first_stream_sample=first,
            last_stream_sample=last,
        )
        return arr

    def __delitem__(self, key):
        raise RuntimeError("Cannot delete individual streams")

    def __setitem__(self, key, value):
        raise RuntimeError("Cannot modify individual byte streams")

    def __repr__(self):
        rank = 0
        mpistr = ""
        if self._mpi_comm is not None:
            rank = self._mpi_comm.rank
            mpistr = f" | Rank {rank:04d} "
            mpistr += f"{self._mpi_dist[rank][0]}-"
            mpistr += f"{self._mpi_dist[rank][1] - 1} |"
        rep = f"<FlacArray{mpistr} {self._typestr} "
        rep += f"shape={self._shape} bytes={self._local_nbytes}>"
        return rep

    def __eq__(self, other):
        if self._shape != other._shape:
            log.debug(f"other shape {other._shape} != {self._shape}")
            return False
        if self._global_shape != other._global_shape:
            msg = f"other global_shape {other._global_shape} != {self._global_shape}"
            log.debug(msg)
            return False
        if not np.array_equal(self._stream_starts, other._stream_starts):
            msg = f"other starts {other._stream_starts} != {self._stream_starts}"
            log.debug(msg)
            return False
        if not np.array_equal(self._compressed, other._compressed):
            msg = f"other compressed {other._compressed} != {self._compressed}"
            log.debug(msg)
            return False
        if self._stream_offsets is None:
            if other._stream_offsets is not None:
                log.debug("other stream_offsets not None, self is None")
                return False
        else:
            if other._stream_offsets is None:
                log.debug("other stream_offsets is None, self is not None")
                return False
            else:
                if not np.allclose(self._stream_offsets, other._stream_offsets):
                    msg = f"other stream_offsets {other._stream_offsets} != "
                    msg += f"{self._stream_offsets}"
                    log.debug(msg)
                    return False
        if self._stream_gains is None:
            if other._stream_gains is not None:
                log.debug("other stream_gains not None, self is None")
                return False
        else:
            if other._stream_gains is None:
                log.debug("other stream_gains is None, self is not None")
                return False
            else:
                if not np.allclose(self._stream_gains, other._stream_gains):
                    msg = f"other stream_gains {other._stream_gains} != "
                    msg += f"{self._stream_gains}"
                    log.debug(msg)
                    return False
        return True

    def to_array(
        self, keep=None, stream_slice=None, keep_indices=False, use_threads=False
    ):
        """Decompress local data into a numpy array.

        This uses the compressed representation to reconstruct a normal numpy
        array.  The returned data type will be either int32, int64, float32, or
        float64 depending on the original data type.

        If `stream_slice` is specified, the returned array will have only that
        range of samples in the final dimension.

        If `keep` is specified, this should be a boolean array with the same shape
        as the leading dimensions of the original array.  True values in this array
        indicate that the stream should be kept.

        If `keep` is specified, the returned array WILL NOT have the same shape as
        the original.  Instead it will be a 2D array of decompressed streams: the
        streams corresponding to True values in the `keep` mask.

        If `keep_indices` is True and `keep` is specified, then a tuple of two values
        is returned.  The first is the array of decompressed streams.  The second is
        a list of tuples, each of which specifies the indices of the stream in the
        original array.

        Args:
            keep (array):  Bool array of streams to keep in the decompression.
            stream_slice (slice):  A python slice with step size of one, indicating
                the sample range to extract from each stream.
            keep_indices (bool):  If True, also return the original indices of the
                streams.
            use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
                This is only beneficial for large arrays.

        """
        first_samp = None
        last_samp = None
        if stream_slice is not None:
            if stream_slice.step is not None and stream_slice.step != 1:
                raise RuntimeError(
                    "Only stream slices with a step size of 1 are supported"
                )
            first_samp = stream_slice.start
            last_samp = stream_slice.stop

        arr, indices = array_decompress_slice(
            self._compressed,
            self._stream_size,
            self._stream_starts,
            self._stream_nbytes,
            stream_offsets=self._stream_offsets,
            stream_gains=self._stream_gains,
            keep=keep,
            first_stream_sample=first_samp,
            last_stream_sample=last_samp,
            use_threads=use_threads,
        )
        if keep is not None and keep_indices:
            return (arr, indices)
        else:
            return arr

    @classmethod
    def from_array(
        cls, arr, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False
    ):
        """Construct a FlacArray from a numpy ndarray.

        Args:
            arr (numpy.ndarray):  The input array data.
            level (int):  Compression level (0-8).
            quanta (float, array):  For floating point data, the floating point
                increment of each 32bit integer value.  Optionally an iterable of
                increments, one per stream.
            precision (int, array):  Number of significant digits to retain in
                float-to-int conversion.  Alternative to `quanta`.  Optionally an
                iterable of values, one per stream.
            mpi_comm (MPI.Comm):  If specified, the input array is assumed to be
                distributed across the communicator at the leading dimension.  The
                local piece of the array is passed in on each process.
            use_threads (bool):  If True, use OpenMP threads to parallelize encoding.
                This is only beneficial for large arrays.

        Returns:
            (FlacArray):  A newly constructed FlacArray.

        """
        # Get the global shape of the array
        global_props = global_array_properties(arr.shape, mpi_comm=mpi_comm)
        global_shape = global_props["shape"]
        mpi_dist = global_props["dist"]

        # Compress our local piece of the array
        compressed, starts, nbytes, offsets, gains = array_compress(
            arr,
            level=level,
            quanta=quanta,
            precision=precision,
            use_threads=use_threads,
        )

        return FlacArray(
            None,
            shape=arr.shape,
            global_shape=global_shape,
            compressed=compressed,
            stream_starts=starts,
            stream_nbytes=nbytes,
            stream_offsets=offsets,
            stream_gains=gains,
            mpi_comm=mpi_comm,
            mpi_dist=mpi_dist,
        )

    def write_hdf5(self, hgrp):
        """Write data to an HDF5 Group.

        The internal object properties are written to an open HDF5 group.  If you
        wish to use MPI I/O to write data to the group, then you must be using an MPI
        enabled h5py and you should pass in a valid handle to the group on all
        processes.

        If the `FlacArray` is distributed over an MPI communicator, but the h5py
        implementation does not support MPI I/O, then all data will be communicated
        to the rank zero process for writing.  In this case, the `hgrp` argument should
        be None except on the root process.

        Args:
            hgrp (h5py.Group):  The open Group for writing.

        Returns:
            None

        """
        hdf5_write_compressed(
            hgrp,
            self._leading_shape,
            self._global_leading_shape,
            self._stream_size,
            self._stream_starts,
            self._global_stream_starts,
            self._stream_nbytes,
            self._stream_offsets,
            self._stream_gains,
            self._compressed,
            self._compressed.nbytes,
            self._global_nbytes,
            self._global_proc_nbytes,
            self._mpi_comm,
            self._mpi_dist,
        )

    @classmethod
    def read_hdf5(
        cls,
        hgrp,
        keep=None,
        mpi_comm=None,
        mpi_dist=None,
    ):
        """Construct a FlacArray from an HDF5 Group.

        This function loads all information about the array from an HDF5 group.  If
        `mpi_comm` is specified, the created array is distributed over that
        communicator.  If you also wish to use MPI I/O to read data from the group,
        then you must be using an MPI-enabled h5py and you should pass in a valid
        handle to the group on all processes.

        If `mpi_dist` is specified, it should be an iterable with the number of leading
        dimension elements assigned to each process.  If None, the leading dimension
        will be distributed uniformly.

        If `keep` is specified, this should be a boolean array with the same shape
        as the leading dimensions of the original array.  True values in this array
        indicate that the stream should be kept.

        If `keep` is specified, the returned array WILL NOT have the same shape as
        the original.  Instead it will be a 2D array of decompressed streams: the
        streams corresponding to True values in the `keep` mask.

        Args:
            hgrp (h5py.Group):  The open Group for reading.
            keep (array):  Bool array of streams to keep in the decompression.
            mpi_comm (MPI.Comm):  If specified, the communicator over which to
                distribute the leading dimension.
            mpi_dist (array):  If specified, assign blocks of these sizes to processes
                when distributing the leading dimension.

        Returns:
            (FlacArray):  A newly constructed FlacArray.

        """
        (
            local_shape,
            global_shape,
            compressed,
            stream_starts,
            stream_nbytes,
            stream_offsets,
            stream_gains,
            mpi_dist,
            keep_indices,
        ) = hdf5_read_compressed(
            hgrp,
            keep=keep,
            mpi_comm=mpi_comm,
            mpi_dist=mpi_dist,
        )

        return FlacArray(
            None,
            shape=local_shape,
            global_shape=global_shape,
            compressed=compressed,
            stream_starts=stream_starts,
            stream_nbytes=stream_nbytes,
            stream_offsets=stream_offsets,
            stream_gains=stream_gains,
            mpi_comm=mpi_comm,
            mpi_dist=mpi_dist,
        )

    def write_zarr(self, zgrp):
        """Write data to a Zarr Group.

        The internal object properties are written to an open zarr group.

        If the `FlacArray` is distributed over an MPI communicator, then all data will
        be communicated to the rank zero process for writing.  In this case, the `zgrp`
        argument should be None except on the root process.

        Args:
            zgrp (zarr.Group):  The open Group for writing.

        Returns:
            None

        """
        zarr_write_compressed(
            zgrp,
            self._leading_shape,
            self._global_leading_shape,
            self._stream_size,
            self._stream_starts,
            self._global_stream_starts,
            self._stream_nbytes,
            self._stream_offsets,
            self._stream_gains,
            self._compressed,
            self._compressed.nbytes,
            self._global_nbytes,
            self._global_proc_nbytes,
            self._mpi_comm,
            self._mpi_dist,
        )

    @classmethod
    def read_zarr(
        cls,
        zgrp,
        keep=None,
        mpi_comm=None,
        mpi_dist=None,
    ):
        """Construct a FlacArray from a Zarr Group.

        This function loads all information about the array from a zarr group.  If
        `mpi_comm` is specified, the created array is distributed over that
        communicator.

        If `mpi_dist` is specified, it should be an iterable with the number of leading
        dimension elements assigned to each process.  If None, the leading dimension
        will be distributed uniformly.

        If `keep` is specified, this should be a boolean array with the same shape
        as the leading dimensions of the original array.  True values in this array
        indicate that the stream should be kept.

        If `keep` is specified, the returned array WILL NOT have the same shape as
        the original.  Instead it will be a 2D array of decompressed streams: the
        streams corresponding to True values in the `keep` mask.

        Args:
            zgrp (zarr.Group):  The open Group for reading.
            keep (array):  Bool array of streams to keep in the decompression.
            mpi_comm (MPI.Comm):  If specified, the communicator over which to
                distribute the leading dimension.
            mpi_dist (array):  If specified, assign blocks of these sizes to processes
                when distributing the leading dimension.

        Returns:
            (FlacArray):  A newly constructed FlacArray.

        """
        (
            local_shape,
            global_shape,
            compressed,
            stream_starts,
            stream_nbytes,
            stream_offsets,
            stream_gains,
            mpi_dist,
            keep_indices,
        ) = zarr_read_compressed(
            zgrp,
            keep=keep,
            mpi_comm=mpi_comm,
            mpi_dist=mpi_dist,
        )

        return FlacArray(
            None,
            shape=local_shape,
            global_shape=global_shape,
            compressed=compressed,
            stream_starts=stream_starts,
            stream_nbytes=stream_nbytes,
            stream_offsets=stream_offsets,
            stream_gains=stream_gains,
            mpi_comm=mpi_comm,
            mpi_dist=mpi_dist,
        )

compressed property

The concatenated raw bytes of all streams on the local process.

global_leading_shape property

The global shape of leading uncompressed dimensions across all processes.

global_nbytes property

The sum of total bytes used by compressed data across all processes.

global_nstreams property

Number of global streams (product of entries of global_leading_shape)

global_process_nbytes property

The bytes used by compressed data on each process.

global_shape property

The global shape of array across any MPI communicator.

global_stream_nbytes property

The array of nbytes within the global compressed data.

global_stream_starts property

The array of starting bytes within the global compressed data.

leading_shape property

The local shape of leading uncompressed dimensions.

mpi_comm property

The MPI communicator over which the array is distributed.

mpi_dist property

The range of the leading dimension assigned to each MPI process.

nbytes property

The total number of bytes used by compressed data on the local process.

nstreams property

The number of local streams (product of entries of leading_shape)

shape property

The shape of the local, uncompressed array.

stream_gains property

The gain factor for each stream during conversion to int32.

stream_nbytes property

The array of nbytes for each stream on the local process.

stream_offsets property

The value subtracted from each stream during conversion to int32.

stream_size property

The uncompressed length of each stream.

stream_starts property

The array of starting bytes for each stream on the local process.

__getitem__(key)

Decompress a slice of data on the fly.
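
Internally, indexing of the leading dimensions is translated into a boolean "keep" mask over the streams, so only the selected streams are decompressed. A rough numpy sketch of that translation (illustrative only, using a hypothetical leading shape):

```python
import numpy as np

leading_shape = (4, 3)   # leading dimensions of a hypothetical (4, 3, 1000) array

# A key such as arr[1:3, 2] becomes a boolean mask over the streams.
key = (slice(1, 3), 2)
keep = np.zeros(leading_shape, dtype=bool)
keep[key] = True

# Only the two selected streams would be decompressed.
assert keep.sum() == 2
assert keep[1, 2] and keep[2, 2]
```

A trailing index on the final (stream) dimension must be a scalar or a contiguous slice with step 1, since FLAC streams cannot be decoded with a stride.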


from_array(arr, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False) classmethod

Construct a FlacArray from a numpy ndarray.

Parameters:

arr (numpy.ndarray): The input array data. Required.
level (int): Compression level (0-8). Default: 5.
quanta (float or array): For floating point data, the floating point increment of each 32bit integer value. Optionally an iterable of increments, one per stream. Default: None.
precision (int or array): Number of significant digits to retain in float-to-int conversion. Alternative to quanta. Optionally an iterable of values, one per stream. Default: None.
mpi_comm (MPI.Comm): If specified, the input array is assumed to be distributed across the communicator at the leading dimension. The local piece of the array is passed in on each process. Default: None.
use_threads (bool): If True, use OpenMP threads to parallelize compression. This is only beneficial for large arrays. Default: False.

Returns:

FlacArray: A newly constructed FlacArray.

Source code in flacarray/array.py
@classmethod
def from_array(
    cls, arr, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False
):
    """Construct a FlacArray from a numpy ndarray.

    Args:
        arr (numpy.ndarray):  The input array data.
        level (int):  Compression level (0-8).
        quanta (float, array):  For floating point data, the floating point
            increment of each 32bit integer value.  Optionally an iterable of
            increments, one per stream.
        precision (int, array):  Number of significant digits to retain in
            float-to-int conversion.  Alternative to `quanta`.  Optionally an
            iterable of values, one per stream.
        mpi_comm (MPI.Comm):  If specified, the input array is assumed to be
            distributed across the communicator at the leading dimension.  The
            local piece of the array is passed in on each process.
        use_threads (bool):  If True, use OpenMP threads to parallelize compression.
            This is only beneficial for large arrays.

    Returns:
        (FlacArray):  A newly constructed FlacArray.

    """
    # Get the global shape of the array
    global_props = global_array_properties(arr.shape, mpi_comm=mpi_comm)
    global_shape = global_props["shape"]
    mpi_dist = global_props["dist"]

    # Compress our local piece of the array
    compressed, starts, nbytes, offsets, gains = array_compress(
        arr,
        level=level,
        quanta=quanta,
        precision=precision,
        use_threads=use_threads,
    )

    return FlacArray(
        None,
        shape=arr.shape,
        global_shape=global_shape,
        compressed=compressed,
        stream_starts=starts,
        stream_nbytes=nbytes,
        stream_offsets=offsets,
        stream_gains=gains,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
    )

read_hdf5(hgrp, keep=None, mpi_comm=None, mpi_dist=None) classmethod

Construct a FlacArray from an HDF5 Group.

This function loads all information about the array from an HDF5 group. If mpi_comm is specified, the created array is distributed over that communicator. If you also wish to use MPI I/O to read data from the group, then you must be using an MPI-enabled h5py and you should pass in a valid handle to the group on all processes.

If mpi_dist is specified, it should be an iterable with the number of leading dimension elements assigned to each process. If None, the leading dimension will be distributed uniformly.

If keep is specified, this should be a boolean array with the same shape as the leading dimensions of the original array. True values in this array indicate that the stream should be kept.

If keep is specified, the returned array WILL NOT have the same shape as the original. Instead, it will be a 2D array of decompressed streams: the streams corresponding to True values in the keep mask.

Parameters:

hgrp (h5py.Group): The open Group for reading. Required.
keep (array): Bool array of streams to keep in the decompression. Default: None.
mpi_comm (MPI.Comm): If specified, the communicator over which to distribute the leading dimension. Default: None.
mpi_dist (array): If specified, assign blocks of these sizes to processes when distributing the leading dimension. Default: None.

Returns:

FlacArray: A newly constructed FlacArray.

Source code in flacarray/array.py
@classmethod
def read_hdf5(
    cls,
    hgrp,
    keep=None,
    mpi_comm=None,
    mpi_dist=None,
):
    """Construct a FlacArray from an HDF5 Group.

    This function loads all information about the array from an HDF5 group.  If
    `mpi_comm` is specified, the created array is distributed over that
    communicator.  If you also wish to use MPI I/O to read data from the group,
    then you must be using an MPI-enabled h5py and you should pass in a valid
    handle to the group on all processes.

    If `mpi_dist` is specified, it should be an iterable with the number of leading
    dimension elements assigned to each process.  If None, the leading dimension
    will be distributed uniformly.

    If `keep` is specified, this should be a boolean array with the same shape
    as the leading dimensions of the original array.  True values in this array
    indicate that the stream should be kept.

    If `keep` is specified, the returned array WILL NOT have the same shape as
    the original.  Instead it will be a 2D array of decompressed streams- the
    streams corresponding to True values in the `keep` mask.

    Args:
        hgrp (h5py.Group):  The open Group for reading.
        keep (array):  Bool array of streams to keep in the decompression.
        mpi_comm (MPI.Comm):  If specified, the communicator over which to
            distribute the leading dimension.
        mpi_dist (array):  If specified, assign blocks of these sizes to processes
            when distributing the leading dimension.

    Returns:
        (FlacArray):  A newly constructed FlacArray.

    """
    (
        local_shape,
        global_shape,
        compressed,
        stream_starts,
        stream_nbytes,
        stream_offsets,
        stream_gains,
        mpi_dist,
        keep_indices,
    ) = hdf5_read_compressed(
        hgrp,
        keep=keep,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
    )

    return FlacArray(
        None,
        shape=local_shape,
        global_shape=global_shape,
        compressed=compressed,
        stream_starts=stream_starts,
        stream_nbytes=stream_nbytes,
        stream_offsets=stream_offsets,
        stream_gains=stream_gains,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
    )

read_zarr(zgrp, keep=None, mpi_comm=None, mpi_dist=None) classmethod

Construct a FlacArray from a Zarr Group.

This function loads all information about the array from a zarr group. If mpi_comm is specified, the created array is distributed over that communicator.

If mpi_dist is specified, it should be an iterable with the number of leading dimension elements assigned to each process. If None, the leading dimension will be distributed uniformly.

If keep is specified, this should be a boolean array with the same shape as the leading dimensions of the original array. True values in this array indicate that the stream should be kept.

If keep is specified, the returned array WILL NOT have the same shape as the original. Instead, it will be a 2D array of decompressed streams: the streams corresponding to True values in the keep mask.

Parameters:

zgrp (zarr.Group): The open Group for reading. Required.
keep (array): Bool array of streams to keep in the decompression. Default: None.
mpi_comm (MPI.Comm): If specified, the communicator over which to distribute the leading dimension. Default: None.
mpi_dist (array): If specified, assign blocks of these sizes to processes when distributing the leading dimension. Default: None.

Returns:

FlacArray: A newly constructed FlacArray.

Source code in flacarray/array.py
@classmethod
def read_zarr(
    cls,
    zgrp,
    keep=None,
    mpi_comm=None,
    mpi_dist=None,
):
    """Construct a FlacArray from a Zarr Group.

    This function loads all information about the array from a zarr group.  If
    `mpi_comm` is specified, the created array is distributed over that
    communicator.

    If `mpi_dist` is specified, it should be an iterable with the number of leading
    dimension elements assigned to each process.  If None, the leading dimension
    will be distributed uniformly.

    If `keep` is specified, this should be a boolean array with the same shape
    as the leading dimensions of the original array.  True values in this array
    indicate that the stream should be kept.

    If `keep` is specified, the returned array WILL NOT have the same shape as
    the original.  Instead it will be a 2D array of decompressed streams- the
    streams corresponding to True values in the `keep` mask.

    Args:
        zgrp (zarr.Group):  The open Group for reading.
        keep (array):  Bool array of streams to keep in the decompression.
        mpi_comm (MPI.Comm):  If specified, the communicator over which to
            distribute the leading dimension.
        mpi_dist (array):  If specified, assign blocks of these sizes to processes
            when distributing the leading dimension.

    Returns:
        (FlacArray):  A newly constructed FlacArray.

    """
    (
        local_shape,
        global_shape,
        compressed,
        stream_starts,
        stream_nbytes,
        stream_offsets,
        stream_gains,
        mpi_dist,
        keep_indices,
    ) = zarr_read_compressed(
        zgrp,
        keep=keep,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
    )

    return FlacArray(
        None,
        shape=local_shape,
        global_shape=global_shape,
        compressed=compressed,
        stream_starts=stream_starts,
        stream_nbytes=stream_nbytes,
        stream_offsets=stream_offsets,
        stream_gains=stream_gains,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
    )

to_array(keep=None, stream_slice=None, keep_indices=False, use_threads=False)

Decompress local data into a numpy array.

This uses the compressed representation to reconstruct a normal numpy array. The returned data type will be either int32, int64, float32, or float64 depending on the original data type.

If stream_slice is specified, the returned array will have only that range of samples in the final dimension.

If keep is specified, this should be a boolean array with the same shape as the leading dimensions of the original array. True values in this array indicate that the stream should be kept.

If keep is specified, the returned array WILL NOT have the same shape as the original. Instead, it will be a 2D array of decompressed streams: the streams corresponding to True values in the keep mask.

If keep_indices is True and keep is specified, then a tuple of two values is returned. The first is the array of decompressed streams. The second is a list of tuples, each of which specifies the indices of the stream in the original array.

Parameters:

keep (array): Bool array of streams to keep in the decompression. Default: None.
stream_slice (slice): A python slice with step size of one, indicating the sample range to extract from each stream. Default: None.
keep_indices (bool): If True, also return the original indices of the streams. Default: False.
use_threads (bool): If True, use OpenMP threads to parallelize decoding. This is only beneficial for large arrays. Default: False.

Source code in flacarray/array.py
def to_array(
    self, keep=None, stream_slice=None, keep_indices=False, use_threads=False
):
    """Decompress local data into a numpy array.

    This uses the compressed representation to reconstruct a normal numpy
    array.  The returned data type will be either int32, int64, float32, or
    float64 depending on the original data type.

    If `stream_slice` is specified, the returned array will have only that
    range of samples in the final dimension.

    If `keep` is specified, this should be a boolean array with the same shape
    as the leading dimensions of the original array.  True values in this array
    indicate that the stream should be kept.

    If `keep` is specified, the returned array WILL NOT have the same shape as
    the original.  Instead it will be a 2D array of decompressed streams- the
    streams corresponding to True values in the `keep` mask.

    If `keep_indices` is True and `keep` is specified, then a tuple of two values
    is returned.  The first is the array of decompressed streams.  The second is
    a list of tuples, each of which specifies the indices of the stream in the
    original array.

    Args:
        keep (array):  Bool array of streams to keep in the decompression.
        stream_slice (slice):  A python slice with step size of one, indicating
            the sample range to extract from each stream.
        keep_indices (bool):  If True, also return the original indices of the
            streams.
        use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
            This is only beneficial for large arrays.

    """
    first_samp = None
    last_samp = None
    if stream_slice is not None:
        if stream_slice.step is not None and stream_slice.step != 1:
            raise RuntimeError(
                "Only stream slices with a step size of 1 are supported"
            )
        first_samp = stream_slice.start
        last_samp = stream_slice.stop

    arr, indices = array_decompress_slice(
        self._compressed,
        self._stream_size,
        self._stream_starts,
        self._stream_nbytes,
        stream_offsets=self._stream_offsets,
        stream_gains=self._stream_gains,
        keep=keep,
        first_stream_sample=first_samp,
        last_stream_sample=last_samp,
        use_threads=use_threads,
    )
    if keep is not None and keep_indices:
        return (arr, indices)
    else:
        return arr

write_hdf5(hgrp)

Write data to an HDF5 Group.

The internal object properties are written to an open HDF5 group. If you wish to use MPI I/O to write data to the group, then you must be using an MPI-enabled h5py and you should pass in a valid handle to the group on all processes.

If the FlacArray is distributed over an MPI communicator, but the h5py implementation does not support MPI I/O, then all data will be communicated to the rank zero process for writing. In this case, the hgrp argument should be None except on the root process.

Parameters:

hgrp (h5py.Group): The open Group for writing. Required.

Returns:

None

Source code in flacarray/array.py
def write_hdf5(self, hgrp):
    """Write data to an HDF5 Group.

    The internal object properties are written to an open HDF5 group.  If you
    wish to use MPI I/O to write data to the group, then you must be using an MPI
    enabled h5py and you should pass in a valid handle to the group on all
    processes.

    If the `FlacArray` is distributed over an MPI communicator, but the h5py
    implementation does not support MPI I/O, then all data will be communicated
    to the rank zero process for writing.  In this case, the `hgrp` argument should
    be None except on the root process.

    Args:
        hgrp (h5py.Group):  The open Group for writing.

    Returns:
        None

    """
    hdf5_write_compressed(
        hgrp,
        self._leading_shape,
        self._global_leading_shape,
        self._stream_size,
        self._stream_starts,
        self._global_stream_starts,
        self._stream_nbytes,
        self._stream_offsets,
        self._stream_gains,
        self._compressed,
        self._compressed.nbytes,
        self._global_nbytes,
        self._global_proc_nbytes,
        self._mpi_comm,
        self._mpi_dist,
    )

write_zarr(zgrp)

Write data to a Zarr Group.

The internal object properties are written to an open zarr group.

If the FlacArray is distributed over an MPI communicator, then all data will be communicated to the rank zero process for writing. In this case, the zgrp argument should be None except on the root process.

Parameters:

zgrp (zarr.Group): The open Group for writing. Required.

Returns:

None

Source code in flacarray/array.py
def write_zarr(self, zgrp):
    """Write data to an Zarr Group.

    The internal object properties are written to an open zarr group.

    If the `FlacArray` is distributed over an MPI communicator, then all data will
    be communicated to the rank zero process for writing.  In this case, the `zgrp`
    argument should be None except on the root process.

    Args:
        zgrp (zarr.Group):  The open Group for writing.

    Returns:
        None

    """
    zarr_write_compressed(
        zgrp,
        self._leading_shape,
        self._global_leading_shape,
        self._stream_size,
        self._stream_starts,
        self._global_stream_starts,
        self._stream_nbytes,
        self._stream_offsets,
        self._stream_gains,
        self._compressed,
        self._compressed.nbytes,
        self._global_nbytes,
        self._global_proc_nbytes,
        self._mpi_comm,
        self._mpi_dist,
    )

Direct I/O

Sometimes code has no need to store compressed arrays in memory. Instead, it may be desirable to have full arrays in memory and compressed arrays on disk. In those situations, you can use several helper functions to write and read numpy arrays directly to / from files.

HDF5

You can write to / read from an h5py Group using functions in the hdf5 submodule.

flacarray.hdf5.write_array(arr, hgrp, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False)

Compress a numpy array and write to an HDF5 group.

This function is useful if you do not need to access the compressed array in memory and only wish to write it directly to HDF5. The input array is compressed and then the write_compressed() function is called.

Parameters:

arr (array): The input numpy array. Required.
hgrp (h5py.Group): The Group to use. Required.
level (int): Compression level (0-8). Default: 5.
quanta (float or array): For floating point data, the floating point increment of each 32bit integer value. Optionally an iterable of increments, one per stream. Default: None.
precision (int or array): Number of significant digits to retain in float-to-int conversion. Alternative to quanta. Optionally an iterable of values, one per stream. Default: None.
mpi_comm (MPI.Comm): If specified, the input array is assumed to be distributed across the communicator at the leading dimension. The local piece of the array is passed in on each process. Default: None.
use_threads (bool): If True, use OpenMP threads to parallelize compression. This is only beneficial for large arrays. Default: False.

Returns:

None

Source code in flacarray/hdf5.py
@function_timer
def write_array(
    arr, hgrp, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False
):
    """Compress a numpy array and write to an HDF5 group.

    This function is useful if you do not need to access the compressed array in memory
    and only wish to write it directly to HDF5.  The input array is compressed and then
    the `write_compressed()` function is called.

    Args:
        arr (array):  The input numpy array.
        hgrp (h5py.Group):  The Group to use.
        level (int):  Compression level (0-8).
        quanta (float, array):  For floating point data, the floating point
            increment of each 32bit integer value.  Optionally an iterable of
            increments, one per stream.
        precision (int, array):  Number of significant digits to retain in
            float-to-int conversion.  Alternative to `quanta`.  Optionally an
            iterable of values, one per stream.
        mpi_comm (MPI.Comm):  If specified, the input array is assumed to be
            distributed across the communicator at the leading dimension.  The
            local piece of the array is passed in on each process.
        use_threads (bool):  If True, use OpenMP threads to parallelize compression.
            This is only beneficial for large arrays.

    Returns:
        None

    """
    if not have_hdf5:
        raise RuntimeError("h5py is not importable, cannot write to HDF5")

    # Get the global shape of the array
    global_props = global_array_properties(arr.shape, mpi_comm=mpi_comm)
    global_shape = global_props["shape"]
    mpi_dist = global_props["dist"]

    # Compress our local piece of the array
    compressed, starts, nbytes, offsets, gains = array_compress(
        arr, level=level, quanta=quanta, precision=precision, use_threads=use_threads
    )

    local_nbytes = compressed.nbytes
    global_nbytes, global_proc_bytes, global_starts = global_bytes(
        local_nbytes, starts, mpi_comm
    )
    stream_size = arr.shape[-1]
    leading_shape = starts.shape
    if len(global_shape) == 1:
        global_leading_shape = (1,)
    else:
        global_leading_shape = global_shape[:-1]

    write_compressed(
        hgrp,
        leading_shape,
        global_leading_shape,
        stream_size,
        starts,
        global_starts,
        nbytes,
        offsets,
        gains,
        compressed,
        local_nbytes,
        global_nbytes,
        global_proc_bytes,
        mpi_comm,
        mpi_dist,
    )

flacarray.hdf5.read_array(hgrp, keep=None, stream_slice=None, keep_indices=False, mpi_comm=None, mpi_dist=None, use_threads=False)

Load a numpy array from compressed HDF5.

This function is useful if you do not need to store a compressed representation of the array in memory. Each stream will be read individually from the file and the desired slice decompressed. This avoids storing the full compressed data.

This function acts as a dispatch to the correct version of the reading function. The function is selected based on the format version string in the data.

If stream_slice is specified, the returned array will have only that range of samples in the final dimension.

If keep is specified, this should be a boolean array with the same shape as the leading dimensions of the original array. True values in this array indicate that the stream should be kept.

If keep is specified, the returned array WILL NOT have the same shape as the original. Instead, it will be a 2D array of decompressed streams: the streams corresponding to True values in the keep mask.

If keep_indices is True and keep is specified, then an additional list is returned containing the indices of each stream that was kept.

Parameters:

hgrp (h5py.Group): The group to read. Required.
keep (array): Bool array of streams to keep in the decompression. Default: None.
stream_slice (slice): A python slice with step size of one, indicating the sample range to extract from each stream. Default: None.
keep_indices (bool): If True, also return the original indices of the streams. Default: False.
mpi_comm (MPI.Comm): The optional MPI communicator over which to distribute the leading dimension of the array. Default: None.
mpi_dist (list): The optional list of tuples specifying the first / last element of the leading dimension to assign to each process. Default: None.
use_threads (bool): If True, use OpenMP threads to parallelize decoding. This is only beneficial for large arrays. Default: False.

Returns:

array: The loaded and decompressed data OR the array and the kept indices.

Source code in flacarray/hdf5.py
@function_timer
def read_array(
    hgrp,
    keep=None,
    stream_slice=None,
    keep_indices=False,
    mpi_comm=None,
    mpi_dist=None,
    use_threads=False,
):
    """Load a numpy array from compressed HDF5.

    This function is useful if you do not need to store a compressed representation
    of the array in memory.  Each stream will be read individually from the file and
    the desired slice decompressed.  This avoids storing the full compressed data.

    This function acts as a dispatch to the correct version of the reading function.
    The function is selected based on the format version string in the data.

    If `stream_slice` is specified, the returned array will have only that
    range of samples in the final dimension.

    If `keep` is specified, this should be a boolean array with the same shape
    as the leading dimensions of the original array.  True values in this array
    indicate that the stream should be kept.

    If `keep` is specified, the returned array WILL NOT have the same shape as
    the original.  Instead it will be a 2D array of decompressed streams- the
    streams corresponding to True values in the `keep` mask.

    If `keep_indices` is True and `keep` is specified, then an additional list
    is returned containing the indices of each stream that was kept.

    Args:
        hgrp (h5py.Group):  The group to read.
        keep (array):  Bool array of streams to keep in the decompression.
        stream_slice (slice):  A python slice with step size of one, indicating
            the sample range to extract from each stream.
        keep_indices (bool):  If True, also return the original indices of the
            streams.
        mpi_comm (MPI.Comm):  The optional MPI communicator over which to distribute
            the leading dimension of the array.
        mpi_dist (list):  The optional list of tuples specifying the first / last
            element of the leading dimension to assign to each process.
        use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
            This is only beneficial for large arrays.

    Returns:
        (array):  The loaded and decompressed data OR the array and the kept indices.

    """
    if not have_hdf5:
        raise RuntimeError("h5py is not importable, cannot write to HDF5")

    format_version = None
    if hgrp is not None:
        if "flacarray_format_version" in hgrp.attrs:
            format_version = hgrp.attrs["flacarray_format_version"]
    if mpi_comm is not None:
        format_version = mpi_comm.bcast(format_version, root=0)
    if format_version is None:
        raise RuntimeError("h5py Group does not contain a FlacArray")

    mod_name = f".hdf5_load_v{format_version}"
    mod = importlib.import_module(mod_name, package="flacarray")
    read_func = getattr(mod, "read_array")
    return read_func(
        hgrp,
        keep=keep,
        stream_slice=stream_slice,
        keep_indices=keep_indices,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
        use_threads=use_threads,
    )

Zarr

You can write to / read from a Zarr hierarchy Group using functions in the zarr submodule.

flacarray.zarr.write_array(arr, zgrp, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False)

Compress a numpy array and write to a Zarr group.

This function is useful if you do not need to access the compressed array in memory and only wish to write it directly to Zarr files. The input array is compressed and then the write_compressed() function is called.

Parameters:

arr (array): The input numpy array. Required.
zgrp (zarr.Group): The Group to use. Required.
level (int): Compression level (0-8). Default: 5.
quanta (float or array): For floating point data, the floating point increment of each 32bit integer value. Optionally an iterable of increments, one per stream. Default: None.
precision (int or array): Number of significant digits to retain in float-to-int conversion. Alternative to quanta. Optionally an iterable of values, one per stream. Default: None.
mpi_comm (MPI.Comm): If specified, the input array is assumed to be distributed across the communicator at the leading dimension. The local piece of the array is passed in on each process. Default: None.
use_threads (bool): If True, use OpenMP threads to parallelize compression. This is only beneficial for large arrays. Default: False.

Returns:

None

Source code in flacarray/zarr.py
@function_timer
def write_array(
    arr, zgrp, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False
):
    """Compress a numpy array and write to a Zarr group.

    This function is useful if you do not need to access the compressed array in memory
    and only wish to write it directly to Zarr files.  The input array is compressed
    and then the `write_compressed()` function is called.

    Args:
        arr (array):  The input numpy array.
        zgrp (zarr.Group):  The Group to use.
        level (int):  Compression level (0-8).
        quanta (float, array):  For floating point data, the floating point
            increment of each 32bit integer value.  Optionally an iterable of
            increments, one per stream.
        precision (int, array):  Number of significant digits to retain in
            float-to-int conversion.  Alternative to `quanta`.  Optionally an
            iterable of values, one per stream.
        mpi_comm (MPI.Comm):  If specified, the input array is assumed to be
            distributed across the communicator at the leading dimension.  The
            local piece of the array is passed in on each process.
        use_threads (bool):  If True, use OpenMP threads to parallelize encoding.
            This is only beneficial for large arrays.

    Returns:
        None

    """
    if not have_zarr:
        raise RuntimeError("zarr is not importable, cannot write to zarr.Group")

    # Get the global shape of the array
    global_props = global_array_properties(arr.shape, mpi_comm=mpi_comm)
    global_shape = global_props["shape"]
    mpi_dist = global_props["dist"]

    # Compress our local piece of the array
    compressed, starts, nbytes, offsets, gains = array_compress(
        arr, level=level, quanta=quanta, precision=precision, use_threads=use_threads
    )

    local_nbytes = compressed.nbytes
    global_nbytes, global_proc_bytes, global_starts = global_bytes(
        local_nbytes, starts, mpi_comm
    )
    stream_size = arr.shape[-1]
    leading_shape = starts.shape
    if len(global_shape) == 1:
        global_leading_shape = (1,)
    else:
        global_leading_shape = global_shape[:-1]

    write_compressed(
        zgrp,
        leading_shape,
        global_leading_shape,
        stream_size,
        starts,
        global_starts,
        nbytes,
        offsets,
        gains,
        compressed,
        local_nbytes,
        global_nbytes,
        global_proc_bytes,
        mpi_comm,
        mpi_dist,
    )

flacarray.zarr.read_array(zgrp, keep=None, stream_slice=None, keep_indices=False, mpi_comm=None, mpi_dist=None, use_threads=False)

Load a numpy array from a compressed Zarr group.

This function is useful if you do not need to store a compressed representation of the array in memory. Each stream will be read individually from the file and the desired slice decompressed. This avoids storing the full compressed data.

This function acts as a dispatch to the correct version of the reading function. The function is selected based on the format version string in the data.

If stream_slice is specified, the returned array will have only that range of samples in the final dimension.

If keep is specified, this should be a boolean array with the same shape as the leading dimensions of the original array. True values in this array indicate that the stream should be kept.

If keep is specified, the returned array WILL NOT have the same shape as the original. Instead it will be a 2D array of decompressed streams: the streams corresponding to True values in the keep mask.

If keep_indices is True and keep is specified, then an additional list is returned containing the indices of each stream that was kept.

Parameters:

Name Type Description Default
zgrp Group

The group to read.

required
keep array

Bool array of streams to keep in the decompression.

None
stream_slice slice

A python slice with step size of one, indicating the sample range to extract from each stream.

None
keep_indices bool

If True, also return the original indices of the streams.

False
mpi_comm Comm

The optional MPI communicator over which to distribute the leading dimension of the array.

None
mpi_dist list

The optional list of tuples specifying the first / last element of the leading dimension to assign to each process.

None
use_threads bool

If True, use OpenMP threads to parallelize decoding. This is only beneficial for large arrays.

False

Returns:

Type Description
array

The loaded and decompressed data OR the array and the kept indices.

Source code in flacarray/zarr.py
@function_timer
def read_array(
    zgrp,
    keep=None,
    stream_slice=None,
    keep_indices=False,
    mpi_comm=None,
    mpi_dist=None,
    use_threads=False,
):
    """Load a numpy array from a compressed Zarr group.

    This function is useful if you do not need to store a compressed representation
    of the array in memory.  Each stream will be read individually from the file and
    the desired slice decompressed.  This avoids storing the full compressed data.

    This function acts as a dispatch to the correct version of the reading function.
    The function is selected based on the format version string in the data.

    If `stream_slice` is specified, the returned array will have only that
    range of samples in the final dimension.

    If `keep` is specified, this should be a boolean array with the same shape
    as the leading dimensions of the original array.  True values in this array
    indicate that the stream should be kept.

    If `keep` is specified, the returned array WILL NOT have the same shape as
    the original.  Instead it will be a 2D array of decompressed streams: the
    streams corresponding to True values in the `keep` mask.

    If `keep_indices` is True and `keep` is specified, then an additional list
    is returned containing the indices of each stream that was kept.

    Args:
        zgrp (zarr.Group):  The group to read.
        keep (array):  Bool array of streams to keep in the decompression.
        stream_slice (slice):  A python slice with step size of one, indicating
            the sample range to extract from each stream.
        keep_indices (bool):  If True, also return the original indices of the
            streams.
        mpi_comm (MPI.Comm):  The optional MPI communicator over which to distribute
            the leading dimension of the array.
        mpi_dist (list):  The optional list of tuples specifying the first / last
            element of the leading dimension to assign to each process.
        use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
            This is only beneficial for large arrays.

    Returns:
        (array):  The loaded and decompressed data OR the array and the kept indices.

    """
    if not have_zarr:
        raise RuntimeError("zarr is not importable, cannot read from a Zarr Group")

    format_version = None
    if zgrp is not None:
        if "flacarray_format_version" in zgrp.attrs:
            format_version = zgrp.attrs["flacarray_format_version"]
    if mpi_comm is not None:
        format_version = mpi_comm.bcast(format_version, root=0)
    if format_version is None:
        raise RuntimeError("Zarr Group does not contain a FlacArray")

    mod_name = f".zarr_load_v{format_version}"
    mod = importlib.import_module(mod_name, package="flacarray")
    read_func = getattr(mod, "read_array")
    return read_func(
        zgrp,
        keep=keep,
        stream_slice=stream_slice,
        keep_indices=keep_indices,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
        use_threads=use_threads,
    )

Interactive Tools

The flacarray.demo submodule contains a few helper functions that are not imported by default. You will need to have optional dependencies (matplotlib) installed to use the visualization tools. For testing, it is convenient to generate arrays consisting of random timestreams with some structure. The create_fake_data function can be used for this.

flacarray.demo.create_fake_data(shape, sigma=1.0, dtype=np.float64)

Source code in flacarray/demo.py
def create_fake_data(shape, sigma=1.0, dtype=np.float64):
    flatshape = np.prod(shape)
    stream_size = shape[-1]
    leading_shape = shape[:-1]
    leading_shape_ext = leading_shape + (1,)

    rng = np.random.default_rng(seed=123456789)

    # Construct a random DC level for each stream that is +/- 5 sigma
    dc = 5 * sigma * (rng.random(size=leading_shape_ext) - 0.5)

    # Construct a simple low frequency waveform (assume 1Hz sampling)
    wave = np.zeros(stream_size, dtype=dtype)
    t = np.arange(stream_size)
    minf = 5 / stream_size
    for freq, amp in zip([3 * minf, minf], [2 * sigma, 6 * sigma]):
        wave[:] += amp * np.sin(2 * np.pi * freq * t)

    # Initialize all streams to a scaled version of this waveform plus the DC level
    scale = rng.random(size=leading_shape_ext)
    leading_slc = tuple([slice(None) for x in leading_shape])
    data = np.empty(shape, dtype=dtype)
    data[leading_slc] = dc
    data[leading_slc] += scale * wave

    # Add some Gaussian random noise to each stream
    data[:] += rng.normal(0.0, sigma, flatshape).reshape(shape)

    return data

Most data arrays in practice have 2 or 3 dimensions. If the number of streams is relatively small, then an uncompressed array can be plotted with the plot_data function.

flacarray.demo.plot_data(data, keep=None, stream_slc=slice(None), file=None)

Source code in flacarray/demo.py
def plot_data(data, keep=None, stream_slc=slice(None), file=None):
    # We only import matplotlib if we are actually going to make some plots.
    # This is not a required package.
    import matplotlib.pyplot as plt

    if len(data.shape) > 3:
        raise NotImplementedError("Can only plot 1D and 2D arrays of streams")

    if len(data.shape) == 1:
        plot_rows = 1
        plot_cols = 1
    elif len(data.shape) == 2:
        plot_rows = data.shape[0]
        plot_cols = 1
    else:
        plot_rows = data.shape[1]
        plot_cols = data.shape[0]

    fig_dpi = 100
    fig_width = 6 * plot_cols
    fig_height = 4 * plot_rows
    fig = plt.figure(figsize=(fig_width, fig_height), dpi=fig_dpi)
    if len(data.shape) == 1:
        # Single stream
        ax = fig.add_subplot(1, 1, 1, aspect="auto")
        ax.plot(data[stream_slc])
    elif len(data.shape) == 2:
        # 1-D array of streams, plot vertically
        for iplot in range(data.shape[0]):
            ax = fig.add_subplot(plot_rows, 1, iplot + 1, aspect="auto")
            ax.plot(data[iplot, stream_slc])
    else:
        # 2-D array of streams, plot in a grid
        for row in range(plot_rows):
            for col in range(plot_cols):
                slc = (col, row, stream_slc)
                ax = fig.add_subplot(
                    plot_rows, plot_cols, row * plot_cols + col + 1, aspect="auto"
                )
                ax.plot(data[slc], color="black")
    if file is None:
        plt.show()
    else:
        plt.savefig(file)
        plt.close()

Low-Level Tools

For specialized use cases, you can also work directly with the compressed bytestream and auxiliary arrays and convert to / from numpy arrays.

flacarray.compress.array_compress(arr, level=5, quanta=None, precision=None, use_threads=False)

Compress a numpy array with optional floating point conversion.

If arr is an int32 array, the returned stream offsets and gains will be None. If arr is an int64 array, the stream offsets will be the integer value subtracted when converting to int32. Both float32 and float64 data will have floating point offset and gain arrays returned.

Parameters:

Name Type Description Default
arr ndarray

The input array data.

required
level int

Compression level (0-8).

5
quanta (float, array)

For floating point data, the floating point increment of each 32bit integer value. Optionally an array of increments, one per stream.

None
precision (int, array)

Number of significant digits to retain in float-to-int conversion. Alternative to quanta. Optionally an iterable of values, one per stream.

None
use_threads bool

If True, use OpenMP threads to parallelize encoding. This is only beneficial for large arrays.

False

Returns:

Type Description
tuple

The (compressed bytes, stream starts, stream_nbytes, stream offsets, stream gains)

Source code in flacarray/compress.py
@function_timer
def array_compress(arr, level=5, quanta=None, precision=None, use_threads=False):
    """Compress a numpy array with optional floating point conversion.

    If `arr` is an int32 array, the returned stream offsets and gains will be None.
    If `arr` is an int64 array, the stream offsets will be the integer value subtracted
    when converting to int32.  Both float32 and float64 data will have floating point
    offset and gain arrays returned.

    Args:
        arr (numpy.ndarray):  The input array data.
        level (int):  Compression level (0-8).
        quanta (float, array):  For floating point data, the floating point
            increment of each 32bit integer value.  Optionally an array of
            increments, one per stream.
        precision (int, array):  Number of significant digits to retain in
            float-to-int conversion.  Alternative to `quanta`.  Optionally an
            iterable of values, one per stream.
        use_threads (bool):  If True, use OpenMP threads to parallelize encoding.
            This is only beneficial for large arrays.

    Returns:
        (tuple): The (compressed bytes, stream starts, stream_nbytes, stream offsets,
            stream gains)

    """
    if arr.size == 0:
        raise ValueError("Cannot compress a zero-sized array!")
    leading_shape = arr.shape[:-1]

    if quanta is not None:
        if precision is not None:
            raise RuntimeError("Cannot set both quanta and precision")
        try:
            nq = len(quanta)
            # This is an array
            if quanta.shape != leading_shape:
                msg = "If not a scalar, quanta must have the same shape as the "
                msg += "leading dimensions of the array"
                raise ValueError(msg)
            dquanta = quanta.astype(arr.dtype)
        except TypeError:
            # This is a scalar, applied to all detectors
            dquanta = quanta * np.ones(leading_shape, dtype=arr.dtype)
    else:
        dquanta = None

    if arr.dtype == np.dtype(np.int32):
        (compressed, starts, nbytes) = encode_flac(arr, level, use_threads=use_threads)
        return (compressed, starts, nbytes, None, None)
    elif arr.dtype == np.dtype(np.int64):
        idata, ioff = int64_to_int32(arr)
        (compressed, starts, nbytes) = encode_flac(
            idata, level, use_threads=use_threads
        )
        return (compressed, starts, nbytes, ioff, None)
    elif arr.dtype == np.dtype(np.float64) or arr.dtype == np.dtype(np.float32):
        idata, foff, gains = float_to_int32(arr, quanta=dquanta, precision=precision)
        (compressed, starts, nbytes) = encode_flac(
            idata, level, use_threads=use_threads
        )
        return (compressed, starts, nbytes, foff, gains)
    else:
        raise ValueError(f"Unsupported data type '{arr.dtype}'")

flacarray.decompress.array_decompress(compressed, stream_size, stream_starts, stream_nbytes, stream_offsets=None, stream_gains=None, first_stream_sample=None, last_stream_sample=None, use_threads=False)

Decompress a FLAC encoded array and restore original data type.

If stream_gains is specified, the output data will be float32 and stream_offsets must also be provided. If stream_gains is not specified, but stream_offsets is, then the returned data will be int64. If neither offsets nor gains are specified, the decompressed int32 array is returned.

To decompress a subset of samples in all streams, specify the first_stream_sample and last_stream_sample values. None values or negative values disable this feature.

Parameters:

Name Type Description Default
compressed array

The array of compressed bytes.

required
stream_size int

The length of the decompressed final dimension.

required
stream_starts array

The array of starting bytes in the bytestream.

required
stream_nbytes array

The array of number of bytes in each stream.

required
stream_offsets array

The array of offsets, one per stream.

None
stream_gains array

The array of gains, one per stream.

None
first_stream_sample int

The first sample of every stream to decompress.

None
last_stream_sample int

The last sample of every stream to decompress.

None
use_threads bool

If True, use OpenMP threads to parallelize decoding. This is only beneficial for large arrays.

False

Returns:

Type Description
array

The output array.

Source code in flacarray/decompress.py
@function_timer
def array_decompress(
    compressed,
    stream_size,
    stream_starts,
    stream_nbytes,
    stream_offsets=None,
    stream_gains=None,
    first_stream_sample=None,
    last_stream_sample=None,
    use_threads=False,
):
    """Decompress a FLAC encoded array and restore original data type.

    If `stream_gains` is specified, the output data will be float32 and `stream_offsets`
    must also be provided.  If `stream_gains` is not specified, but `stream_offsets` is,
    then the returned data will be int64.  If neither offsets nor gains are specified,
    the decompressed int32 array is returned.

    To decompress a subset of samples in all streams, specify the `first_stream_sample`
    and `last_stream_sample` values.  None values or negative values disable this
    feature.

    Args:
        compressed (array):  The array of compressed bytes.
        stream_size (int):  The length of the decompressed final dimension.
        stream_starts (array):  The array of starting bytes in the bytestream.
        stream_nbytes (array):  The array of number of bytes in each stream.
        stream_offsets (array):  The array of offsets, one per stream.
        stream_gains (array):  The array of gains, one per stream.
        first_stream_sample (int):  The first sample of every stream to decompress.
        last_stream_sample (int):  The last sample of every stream to decompress.
        use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
            This is only beneficial for large arrays.

    Returns:
        (array): The output array.

    """
    arr, _ = array_decompress_slice(
        compressed,
        stream_size,
        stream_starts,
        stream_nbytes,
        stream_offsets=stream_offsets,
        stream_gains=stream_gains,
        keep=None,
        first_stream_sample=first_stream_sample,
        last_stream_sample=last_stream_sample,
        use_threads=use_threads,
    )
    return arr

flacarray.decompress.array_decompress_slice(compressed, stream_size, stream_starts, stream_nbytes, stream_offsets=None, stream_gains=None, keep=None, first_stream_sample=None, last_stream_sample=None, use_threads=False)

Decompress a slice of a FLAC encoded array and restore original data type.

If stream_gains is specified, the output data will be float32 and stream_offsets must also be provided. If stream_gains is not specified, but stream_offsets is, then the returned data will be int64. If neither offsets nor gains are specified, the decompressed int32 array is returned.

To decompress a subset of samples in all streams, specify the first_stream_sample and last_stream_sample values. None values or negative values disable this feature.

To decompress a subset of streams, pass a boolean array to the keep argument. This should have the same shape as the starts array. Only streams with a True value in the keep array will be decompressed.

If the keep array is specified, the output tuple will contain the 2D array of streams that were kept, as well as a list of tuples indicating the original array indices for each stream in the output. If the keep array is None, the output tuple will contain an array with the original N-dimensional leading array shape and the trailing number of samples. The second element of the tuple will be None.

Parameters:

Name Type Description Default
compressed array

The array of compressed bytes.

required
stream_size int

The length of the decompressed final dimension.

required
stream_starts array

The array of starting bytes in the bytestream.

required
stream_nbytes array

The array of number of bytes in each stream.

required
stream_offsets array

The array of offsets, one per stream.

None
stream_gains array

The array of gains, one per stream.

None
keep array

Bool array of streams to keep in the decompression.

None
first_stream_sample int

The first sample of every stream to decompress.

None
last_stream_sample int

The last sample of every stream to decompress.

None
use_threads bool

If True, use OpenMP threads to parallelize decoding. This is only beneficial for large arrays.

False

Returns:

Type Description
tuple

The (output array, list of stream indices).

Source code in flacarray/decompress.py
@function_timer
def array_decompress_slice(
    compressed,
    stream_size,
    stream_starts,
    stream_nbytes,
    stream_offsets=None,
    stream_gains=None,
    keep=None,
    first_stream_sample=None,
    last_stream_sample=None,
    use_threads=False,
):
    """Decompress a slice of a FLAC encoded array and restore original data type.

    If `stream_gains` is specified, the output data will be float32 and `stream_offsets`
    must also be provided.  If `stream_gains` is not specified, but `stream_offsets` is,
    then the returned data will be int64.  If neither offsets nor gains are specified,
    the decompressed int32 array is returned.
    the decompressed int32 array is returned.

    To decompress a subset of samples in all streams, specify the `first_stream_sample`
    and `last_stream_sample` values.  None values or negative values disable this
    feature.

    To decompress a subset of streams, pass a boolean array to the `keep` argument.
    This should have the same shape as the `starts` array.  Only streams with a True
    value in the `keep` array will be decompressed.

    If the `keep` array is specified, the output tuple will contain the 2D array of
    streams that were kept, as well as a list of tuples indicating the original array
    indices for each stream in the output.  If the `keep` array is None, the output
    tuple will contain an array with the original N-dimensional leading array shape
    and the trailing number of samples.  The second element of the tuple will be None.

    Args:
        compressed (array):  The array of compressed bytes.
        stream_size (int):  The length of the decompressed final dimension.
        stream_starts (array):  The array of starting bytes in the bytestream.
        stream_nbytes (array):  The array of number of bytes in each stream.
        stream_offsets (array):  The array of offsets, one per stream.
        stream_gains (array):  The array of gains, one per stream.
        keep (array):  Bool array of streams to keep in the decompression.
        first_stream_sample (int):  The first sample of every stream to decompress.
        last_stream_sample (int):  The last sample of every stream to decompress.
        use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
            This is only beneficial for large arrays.

    Returns:
        (tuple): The (output array, list of stream indices).

    """
    if first_stream_sample is None:
        first_stream_sample = -1
    if last_stream_sample is None:
        last_stream_sample = -1

    starts, nbytes, indices = keep_select(keep, stream_starts, stream_nbytes)
    offsets = select_keep_indices(stream_offsets, indices)
    gains = select_keep_indices(stream_gains, indices)

    if stream_offsets is not None:
        if stream_gains is not None:
            # This is floating point data
            idata = decode_flac(
                compressed,
                starts,
                nbytes,
                stream_size,
                first_sample=first_stream_sample,
                last_sample=last_stream_sample,
                use_threads=use_threads,
            )
            arr = int32_to_float(idata, offsets, gains)
        else:
            # This is int64 data
            idata = decode_flac(
                compressed,
                starts,
                nbytes,
                stream_size,
                first_sample=first_stream_sample,
                last_sample=last_stream_sample,
                use_threads=use_threads,
            )
            ext_shape = offsets.shape + (1,)
            arr = idata.astype(np.int64) + offsets.reshape(ext_shape)
    else:
        if stream_gains is not None:
            raise RuntimeError(
                "When specifying gains, you must also provide the offsets"
            )
        # This is int32 data
        arr = decode_flac(
            compressed,
            starts,
            nbytes,
            stream_size,
            first_sample=first_stream_sample,
            last_sample=last_stream_sample,
            use_threads=use_threads,
        )
    return (arr, indices)