API Reference

The flacarray package consists of a primary class (FlacArray) plus a variety of helper functions.

Compressed Array Representation

The FlacArray class stores a compressed representation of an N-dimensional array whose last dimension consists of "streams" of numbers to be compressed.

flacarray.FlacArray

FLAC compressed array representation.

This class holds a compressed representation of an N-dimensional array. The final (fastest changing) dimension is the axis along which the data is compressed. Each of the vectors in this last dimension is called a "stream" here. The leading dimensions of the original matrix form an array of these streams.

Internally, the data is stored as a contiguous concatenation of the bytes from these compressed streams. A separate array contains the starting byte of each stream in the overall bytes array. The shape of the starting array corresponds to the shape of the leading, un-compressed dimensions of the original array.
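The byte layout described above can be modeled in a few lines. This sketch is illustrative only: it uses zlib as a stand-in for the FLAC encoder, but keeps the same three ingredients as the class (the concatenated bytes, the starting byte of each stream, and the byte count of each stream):

```python
import zlib
import numpy as np

# Original array: leading shape (2, 3) of streams, each 100 samples long.
arr = np.arange(2 * 3 * 100, dtype=np.int32).reshape((2, 3, 100))

# Compress each stream independently (zlib here; FLAC in the real class).
blobs = [zlib.compress(stream.tobytes()) for stream in arr.reshape((-1, 100))]
nbytes = np.array([len(b) for b in blobs])

# Starting byte of each stream within the concatenation; the starts array
# is reshaped to the leading (uncompressed) dimensions of the original.
starts = np.concatenate([[0], np.cumsum(nbytes)[:-1]]).reshape((2, 3))
compressed = b"".join(blobs)

# Recover stream (1, 2) directly from its byte range.
beg = starts[1, 2]
end = beg + nbytes.reshape((2, 3))[1, 2]
restored = np.frombuffer(zlib.decompress(compressed[beg:end]), dtype=np.int32)
assert np.array_equal(restored, arr[1, 2])
```

This is why random access to a single stream is cheap: only its byte range needs to be decompressed.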

If the input data is 32bit or 64bit integers, each stream in the array is compressed directly with FLAC.

If the input data is 32bit or 64bit floating point numbers, then you must specify exactly one of either quanta or precision when calling from_array(). For floating point data, the mean of each stream is computed and rounded to the nearest whole quanta. This "offset" per stream is recorded and subtracted from the stream. The offset-subtracted stream data is then rescaled and rounded to integers (int32 or int64, depending on the bit width of the input array).

If quanta is specified, the data is rescaled by 1 / quanta. The quanta may either be a scalar applied to all streams, or an array of values, one per stream. If instead the precision (integer number of decimal places) is specified, it is converted to a quanta by dividing the stream RMS by 10^{precision}. Like quanta, the precision may be specified as a single value for all streams, or as an array of values, one per stream.
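As a sketch of the float conversion just described (plain NumPy, not the flacarray API; whether the RMS is computed before or after offset subtraction is an implementation detail assumed here):

```python
import numpy as np

# Hypothetical example stream: a noisy float64 signal.
rng = np.random.default_rng(42)
stream = rng.normal(loc=5.0, scale=1.0, size=1000)

# Per the text: precision is converted to a quanta by dividing the
# stream RMS by 10^precision.  Assume RMS of the mean-subtracted data.
precision = 3
rms = np.sqrt(np.mean((stream - np.mean(stream)) ** 2))
quanta = rms / 10**precision

# The offset is the stream mean rounded to the nearest whole quanta.
offset = quanta * np.round(np.mean(stream) / quanta)

# Subtract the offset, rescale by 1 / quanta, round to 64bit integers.
ints = np.round((stream - offset) / quanta).astype(np.int64)

# Rounding to the nearest quanta bounds the error at half a quanta.
restored = ints * quanta + offset
assert np.max(np.abs(restored - stream)) <= 0.5 * quanta + 1e-12
```

The integer array is what actually gets handed to the FLAC encoder; the offset and gain (1 / quanta) are stored alongside it so the float data can be reconstructed.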

If you choose a quanta value close to machine epsilon (e.g. 1e-7 for 32bit or 1e-16 for 64bit data), the amount of compression will be negligible but the result will be nearly lossless. Compression of floating point data should not be done blindly: consider the underlying precision of the data you are working with in order to achieve the best compression possible.

The following rules summarize the data conversion that is performed depending on the input type:

  • int32: No conversion. Compressed to single channel FLAC bytestream.

  • int64: No conversion. Compressed to 2-channel (stereo) FLAC bytestream.

  • float32: Subtract the offset per stream and scale data based on the quanta value or precision (see above). Then round to nearest 32bit integer.

  • float64: Subtract the offset per stream and scale data based on the quanta value or precision (see above). Then round to nearest 64bit integer.

After conversion to integers, each stream's data is separately compressed into a sequence of FLAC bytes, which is appended to the bytestream. The offset in bytes for each stream is recorded.

A FlacArray is only constructed directly when making a copy. Use the class methods to create FlacArrays from numpy arrays or on-disk representations.

Parameters:

other (FlacArray, required): Construct a copy of the input FlacArray.
Source code in flacarray/array.py
class FlacArray:
    """FLAC compressed array representation.

    This class holds a compressed representation of an N-dimensional array.  The final
    (fastest changing) dimension is the axis along which the data is compressed.  Each
    of the vectors in this last dimension is called a "stream" here.  The leading
    dimensions of the original matrix form an array of these streams.

    Internally, the data is stored as a contiguous concatenation of the bytes from
    these compressed streams.  A separate array contains the starting byte of each
    stream in the overall bytes array.  The shape of the starting array corresponds
    to the shape of the leading, un-compressed dimensions of the original array.

    If the input data is 32bit or 64bit integers, each stream in the array is
    compressed directly with FLAC.

    If the input data is 32bit or 64bit floating point numbers, then you **must**
    specify exactly one of either quanta or precision when calling `from_array()`.  For
    floating point data, the mean of each stream is computed and rounded to the nearest
    whole quanta.  This "offset" per stream is recorded and subtracted from the
    stream.  The offset-subtracted stream data is then rescaled and rounded to
    integers (int32 or int64 depending on the bit width of the input array).  If
    `quanta` is specified, the data is rescaled by 1 / quanta.  The quanta may either
    be a scalar applied to all streams, or an array of values, one per stream.  If
    instead the precision (integer number of decimal places) is specified, this is
    converted to a quanta by dividing the stream RMS by `10^{precision}`.  Similar to
    quanta, the precision may be specified as a single value for all streams, or as an
    array of values, one per stream.

    If you choose a quanta value that is close to machine epsilon (e.g. 1e-7 for 32bit
    or 1e-16 for 64bit), then the amount of compression will be negligible but the
    result will be nearly lossless.  Compression of floating point data should not be
    done blindly:  consider the underlying precision of the data you are working with
    in order to achieve the best compression possible.

    The following rules summarize the data conversion that is performed depending on
    the input type:

    * int32:  No conversion.  Compressed to single channel FLAC bytestream.

    * int64:  No conversion.  Compressed to 2-channel (stereo) FLAC bytestream.

    * float32:  Subtract the offset per stream and scale data based on the quanta value
        or precision (see above).  Then round to nearest 32bit integer.

    * float64:  Subtract the offset per stream and scale data based on the quanta value
        or precision (see above).  Then round to nearest 64bit integer.

    After conversion to integers, each stream's data is separately compressed into a
    sequence of FLAC bytes, which is appended to the bytestream.  The offset in bytes
    for each stream is recorded.

    A FlacArray is only constructed directly when making a copy.  Use the class methods
    to create FlacArrays from numpy arrays or on-disk representations.

    Args:
        other (FlacArray):  Construct a copy of the input FlacArray.

    """

    def __init__(
        self,
        other,
        shape=None,
        global_shape=None,
        compressed=None,
        dtype=None,
        stream_starts=None,
        stream_nbytes=None,
        stream_offsets=None,
        stream_gains=None,
        mpi_comm=None,
        mpi_dist=None,
    ):
        if other is not None:
            # We are copying an existing object, make sure we have an
            # independent copy.
            self._shape = copy.deepcopy(other._shape)
            self._global_shape = copy.deepcopy(other._global_shape)
            self._compressed = copy.deepcopy(other._compressed)
            self._dtype = np.dtype(other._dtype)
            self._stream_starts = copy.deepcopy(other._stream_starts)
            self._stream_nbytes = copy.deepcopy(other._stream_nbytes)
            self._stream_offsets = copy.deepcopy(other._stream_offsets)
            self._stream_gains = copy.deepcopy(other._stream_gains)
            self._mpi_dist = copy.deepcopy(other._mpi_dist)
            # MPI communicators can be limited in number and expensive to create.
            self._mpi_comm = other._mpi_comm
        else:
            # This form of constructor is used in the class methods where we
            # have already created these arrays for use by this instance.
            self._shape = shape
            self._global_shape = global_shape
            self._compressed = compressed
            self._dtype = np.dtype(dtype)
            self._stream_starts = stream_starts
            self._stream_nbytes = stream_nbytes
            self._stream_offsets = stream_offsets
            self._stream_gains = stream_gains
            self._mpi_comm = mpi_comm
            self._mpi_dist = mpi_dist
        self._init_params()

    def _init_params(self):
        # The input `_shape` parameter is the original shape when the instance
        # was created from an array or read from disk.  In the case of a single
        # stream, this tracks the user intentions about whether to flatten the
        # leading dimension.  We also track the "local shape", which is the same,
        # but which always keeps the leading dimension.
        if len(self._shape) == 1:
            self._flatten_single = True
            self._local_shape = (1, self._shape[0])
        else:
            self._flatten_single = False
            self._local_shape = self._shape

        self._local_nbytes = self._compressed.nbytes
        (
            self._global_nbytes,
            self._global_proc_nbytes,
            self._global_stream_starts,
        ) = global_bytes(self._local_nbytes, self._stream_starts, self._mpi_comm)
        self._leading_shape = self._local_shape[:-1]
        self._global_leading_shape = self._global_shape[:-1]
        self._stream_size = self._local_shape[-1]
        # Total number of streams, locally and globally.
        self._local_nstreams = int(np.prod(self._leading_shape))
        self._global_nstreams = int(np.prod(self._global_leading_shape))

        # For reference, record the type string of the original data.
        self._typestr = self._dtype_str(self._dtype)
        # Track whether we have 32bit or 64bit data
        self._is_int64 = self._dtype == np.dtype(np.int64) or self._dtype == np.dtype(
            np.float64
        )

    @staticmethod
    def _dtype_str(dt):
        if dt == np.dtype(np.float64):
            return "float64"
        elif dt == np.dtype(np.float32):
            return "float32"
        elif dt == np.dtype(np.int64):
            return "int64"
        elif dt == np.dtype(np.int32):
            return "int32"
        else:
            msg = f"Unsupported dtype '{dt}'"
            raise RuntimeError(msg)

    # Shapes of decompressed array

    @property
    def shape(self):
        """The shape of the local, uncompressed array."""
        return self._shape

    @property
    def global_shape(self):
        """The global shape of array across any MPI communicator."""
        return self._global_shape

    @property
    def leading_shape(self):
        """The local shape of leading uncompressed dimensions."""
        return self._leading_shape

    @property
    def global_leading_shape(self):
        """The global shape of leading uncompressed dimensions across all processes."""
        return self._global_leading_shape

    @property
    def stream_size(self):
        """The uncompressed length of each stream."""
        return self._stream_size

    # Properties of the compressed data

    @property
    def nbytes(self):
        """The total number of bytes used by compressed data on the local process."""
        return self._local_nbytes

    @property
    def global_nbytes(self):
        """The sum of total bytes used by compressed data across all processes."""
        return self._global_nbytes

    @property
    def global_process_nbytes(self):
        """The bytes used by compressed data on each process."""
        return self._global_proc_nbytes

    @property
    def nstreams(self):
        """The number of local streams (product of entries of `leading_shape`)"""
        return self._local_nstreams

    @property
    def global_nstreams(self):
        """Number of global streams (product of entries of `global_leading_shape`)"""
        return self._global_nstreams

    @property
    def compressed(self):
        """The concatenated raw bytes of all streams on the local process."""
        return self._compressed

    @property
    def stream_starts(self):
        """The array of starting bytes for each stream on the local process."""
        return self._stream_starts

    @property
    def stream_nbytes(self):
        """The array of nbytes for each stream on the local process."""
        return self._stream_nbytes

    @property
    def global_stream_starts(self):
        """The array of starting bytes within the global compressed data."""
        return self._global_stream_starts

    @property
    def global_stream_nbytes(self):
        """The array of nbytes within the global compressed data."""
        return self._global_stream_nbytes

    @property
    def stream_offsets(self):
        """The value subtracted from each stream during conversion to int32."""
        return self._stream_offsets

    @property
    def stream_gains(self):
        """The gain factor for each stream during conversion to int32."""
        return self._stream_gains

    @property
    def mpi_comm(self):
        """The MPI communicator over which the array is distributed."""
        return self._mpi_comm

    @property
    def mpi_dist(self):
        """The range of the leading dimension assigned to each MPI process."""
        return self._mpi_dist

    @property
    def dtype(self):
        """The dtype of the uncompressed array."""
        return self._dtype

    @property
    def typestr(self):
        """A string representation of the original data type."""
        return self._typestr

    # __getitem__ slicing / decompression on the fly and associated
    # helper functions.

    def _slice_nelem(self, slc, dim):
        """Get the number of elements in a slice."""
        start, stop, step = slc.indices(dim)
        # len(range(...)) counts the elements correctly for any step.
        return len(range(start, stop, step))

    def _keep_view(self, key):
        """Convert leading-shape key to bool array."""
        if len(key) != len(self._leading_shape):
            msg = f"keep_view {key} does not match leading "
            msg += f"dimensions {len(self._leading_shape)}"
            raise ValueError(msg)
        view = np.zeros(self._leading_shape, dtype=bool)
        view[key] = True
        return view

    def _get_full_key(self, key):
        """Process the incoming key so that it covers all dimensions.

        Args:
            key (tuple):  The input key consisting of an integer or a tuple
                of slices and / or integers.

        Result:
            (tuple):  The full key.

        """
        ndim = len(self._local_shape)
        full_key = list()
        if self._flatten_single:
            # Our array is a single stream with flattened shape.  The user
            # supplied key should only contain the sample axis.
            if isinstance(key, tuple):
                # It better have length == 1...
                if len(key) != 1:
                    msg = f"Slice key {key} is not valid for single, "
                    msg += "flattened stream."
                    raise ValueError(msg)
                full_key = [0, key[0]]
            else:
                # Single element, compress sample dimension
                full_key = [0, key]
        else:
            if isinstance(key, tuple):
                for axis, axkey in enumerate(key):
                    full_key.append(axkey)
            else:
                full_key.append(key)

        if len(full_key) > ndim:
            msg = f"Invalid slice key {key}, too many dimensions"
            raise ValueError(msg)

        # Fill in remaining dimensions
        filled = len(full_key)
        full_key.extend([slice(None) for x in range(len(self._local_shape) - filled)])
        return full_key

    def _get_leading_axes(self, full_key):
        """Process the leading axes.

        Args:
            full_key (tuple):  The full-rank selection key.

        Returns:
            (tuple):  The (leading_shape, keep array).

        """
        leading_shape = list()
        keep_slice = list()

        if self._flatten_single:
            # Our array is a single stream with flattened shape.
            keep_slice = [0,]
        else:
            for axis, axkey in enumerate(full_key[:-1]):
                if not isinstance(axkey, (int, np.integer)):
                    # Some kind of slice, do not compress this dimension.
                    nslc = self._slice_nelem(axkey, self._local_shape[axis])
                    leading_shape.append(nslc)
                else:
                    # Check for validity
                    if axkey < 0 or axkey >= self._local_shape[axis]:
                        # Insert a zero-length dimension so that a zero-length
                        # array is returned in the calling code.
                        leading_shape.append(0)
                    else:
                        # This dimension is a single element and will be
                        # compressed.
                        pass
                keep_slice.append(axkey)
        leading_shape = tuple(leading_shape)
        keep_slice = tuple(keep_slice)
        if len(keep_slice) == 0:
            keep = None
        else:
            keep = self._keep_view(keep_slice)
        return leading_shape, keep

    def _get_sample_axis(self, full_key):
        """Process any slicing of the stream axis.

        Args:
            full_key (tuple):  The full-rank selection key.

        Returns:
            (tuple):  The (first, last, sample_shape).

        """
        sample_key = full_key[-1]
        if sample_key is None:
            return (0, self._stream_size, (self._stream_size,))
        if isinstance(sample_key, slice):
            start, stop, step = sample_key.indices(self._stream_size)
            if step != 1:
                msg = "Only stride==1 supported on stream slices"
                raise ValueError(msg)
            if stop - start <= 0:
                # No samples
                return (0, 0, (0,))
            return (start, stop, (stop-start,))
        elif isinstance(sample_key, (int, np.integer)):
            # Just a scalar
            return (sample_key, sample_key + 1, ())
        else:
            msg = "Stream dimension supports contiguous slices or single indices."
            raise ValueError(msg)

    def __getitem__(self, raw_key):
        """Decompress a slice of data on the fly.

        Args:
            raw_key (tuple):  A tuple of slices or integers.

        Returns:
            (array):  The decompressed array slice.

        """
        # Get the key for all dimensions
        key = self._get_full_key(raw_key)

        # Compute the output leading shape and keep array
        leading_shape, keep = self._get_leading_axes(key)

        # Compute sample axis slice
        first, last, sample_shape = self._get_sample_axis(key)

        full_shape = leading_shape + sample_shape
        if len(full_shape) == 0:
            n_total = 0
        else:
            n_total = np.prod(full_shape)
        if n_total == 0:
            # At least one dimension was zero, return empty array
            return np.zeros(full_shape, dtype=self._dtype)
        else:
            arr, strm_indices = array_decompress_slice(
                self._compressed,
                self._stream_size,
                self._stream_starts,
                self._stream_nbytes,
                stream_offsets=self._stream_offsets,
                stream_gains=self._stream_gains,
                keep=keep,
                first_stream_sample=first,
                last_stream_sample=last,
                is_int64=self._is_int64,
            )
            return arr.reshape(full_shape)

    def __delitem__(self, key):
        raise RuntimeError("Cannot delete individual streams")

    def __setitem__(self, key, value):
        raise RuntimeError("Cannot modify individual byte streams")

    def __repr__(self):
        rank = 0
        mpistr = ""
        if self._mpi_comm is not None:
            rank = self._mpi_comm.rank
            mpistr = f" | Rank {rank:04d} "
            mpistr += f"{self._mpi_dist[rank][0]}-"
            mpistr += f"{self._mpi_dist[rank][1] - 1} |"
        rep = f"<FlacArray{mpistr} {self._typestr} "
        rep += f"shape={self._shape} bytes={self._local_nbytes}>"
        return rep

    def __eq__(self, other):
        if self._shape != other._shape:
            log.debug(f"other shape {other._shape} != {self._shape}")
            return False
        if self._dtype != other._dtype:
            log.debug(f"other dtype {other._dtype} != {self._dtype}")
            return False
        if self._global_shape != other._global_shape:
            msg = f"other global_shape {other._global_shape} != {self._global_shape}"
            log.debug(msg)
            return False
        if not np.array_equal(self._stream_starts, other._stream_starts):
            msg = f"other starts {other._stream_starts} != {self._stream_starts}"
            log.debug(msg)
            return False
        if not np.array_equal(self._compressed, other._compressed):
            msg = f"other compressed {other._compressed} != {self._compressed}"
            log.debug(msg)
            return False
        if self._stream_offsets is None:
            if other._stream_offsets is not None:
                log.debug("other stream_offsets not None, self is None")
                return False
        else:
            if other._stream_offsets is None:
                log.debug("other stream_offsets is None, self is not None")
                return False
            else:
                if not np.allclose(self._stream_offsets, other._stream_offsets):
                    msg = f"other stream_offsets {other._stream_offsets} != "
                    msg += f"{self._stream_offsets}"
                    log.debug(msg)
                    return False
        if self._stream_gains is None:
            if other._stream_gains is not None:
                log.debug("other stream_gains not None, self is None")
                return False
        else:
            if other._stream_gains is None:
                log.debug("other stream_gains is None, self is not None")
                return False
            else:
                if not np.allclose(self._stream_gains, other._stream_gains):
                    msg = f"other stream_gains {other._stream_gains} != "
                    msg += f"{self._stream_gains}"
                    log.debug(msg)
                    return False
        return True

    def to_array(
        self,
        keep=None,
        stream_slice=None,
        keep_indices=False,
        use_threads=False,
    ):
        """Decompress local data into a numpy array.

        This uses the compressed representation to reconstruct a normal numpy
        array.  The returned data type will be either int32, int64, float32, or
        float64 depending on the original data type.

        If `stream_slice` is specified, the returned array will have only that
        range of samples in the final dimension.

        If `keep` is specified, this should be a boolean array with the same shape
        as the leading dimensions of the original array.  True values in this array
        indicate that the stream should be kept.

        If `keep` is specified, the returned array WILL NOT have the same shape as
        the original.  Instead it will be a 2D array of decompressed streams: the
        streams corresponding to True values in the `keep` mask.

        If `keep_indices` is True and `keep` is specified, then a tuple of two values
        is returned.  The first is the array of decompressed streams.  The second is
        a list of tuples, each of which specifies the indices of the stream in the
        original array.

        Args:
            keep (array):  Bool array of streams to keep in the decompression.
            stream_slice (slice):  A python slice with step size of one, indicating
                the sample range to extract from each stream.
            keep_indices (bool):  If True, also return the original indices of the
                streams.
            use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
                This is only beneficial for large arrays.

        """
        first_samp = None
        last_samp = None
        if stream_slice is not None:
            if stream_slice.step is not None and stream_slice.step != 1:
                raise RuntimeError(
                    "Only stream slices with a step size of 1 are supported"
                )
            first_samp = stream_slice.start
            last_samp = stream_slice.stop

        arr, indices = array_decompress_slice(
            self._compressed,
            self._stream_size,
            self._stream_starts,
            self._stream_nbytes,
            stream_offsets=self._stream_offsets,
            stream_gains=self._stream_gains,
            keep=keep,
            first_stream_sample=first_samp,
            last_stream_sample=last_samp,
            is_int64=self._is_int64,
            use_threads=use_threads,
            no_flatten=(not self._flatten_single),
        )
        if keep is not None and keep_indices:
            return (arr, indices)
        else:
            return arr

    @classmethod
    def from_array(
        cls, arr, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False
    ):
        """Construct a FlacArray from a numpy ndarray.

        Args:
            arr (numpy.ndarray):  The input array data.
            level (int):  Compression level (0-8).
            quanta (float, array):  For floating point data, the floating point
                increment of each 32bit integer value.  Optionally an iterable of
                increments, one per stream.
            precision (int, array):  Number of significant digits to retain in
                float-to-int conversion.  Alternative to `quanta`.  Optionally an
                iterable of values, one per stream.
            mpi_comm (MPI.Comm):  If specified, the input array is assumed to be
                distributed across the communicator at the leading dimension.  The
                local piece of the array is passed in on each process.
            use_threads (bool):  If True, use OpenMP threads to parallelize encoding.
                This is only beneficial for large arrays.

        Returns:
            (FlacArray):  A newly constructed FlacArray.

        """
        # Get the global shape of the array
        global_props = global_array_properties(arr.shape, mpi_comm=mpi_comm)

compressed property

The concatenated raw bytes of all streams on the local process.

dtype property

The dtype of the uncompressed array.

global_leading_shape property

The global shape of leading uncompressed dimensions across all processes.

global_nbytes property

The total number of bytes used by compressed data across all processes.

global_nstreams property

The number of global streams (product of the entries of global_leading_shape).

global_process_nbytes property

The number of bytes of compressed data on each process.

global_shape property

The global shape of the array across the MPI communicator.

global_stream_nbytes property

The array of byte counts for each stream within the global compressed data.

global_stream_starts property

The array of starting bytes within the global compressed data.

leading_shape property

The local shape of leading uncompressed dimensions.

mpi_comm property

The MPI communicator over which the array is distributed.

mpi_dist property

The range of the leading dimension assigned to each MPI process.

nbytes property

The total number of bytes used by compressed data on the local process.

nstreams property

The number of local streams (product of the entries of leading_shape).

shape property

The shape of the local, uncompressed array.

stream_gains property

The gain factor for each stream during conversion to integers (int32 or int64).

stream_nbytes property

The array of nbytes for each stream on the local process.

stream_offsets property

The value subtracted from each stream during conversion to integers (int32 or int64).

stream_size property

The uncompressed length of each stream.

stream_starts property

The array of starting bytes for each stream on the local process.

typestr property

A string representation of the original data type.

__getitem__(raw_key)

Decompress a slice of data on the fly.

Parameters:

    raw_key (tuple):  A tuple of slices or integers.  [required]

Returns:

    (array):  The decompressed array slice.

Source code in flacarray/array.py
def __getitem__(self, raw_key):
    """Decompress a slice of data on the fly.

    Args:
        raw_key (tuple):  A tuple of slices or integers.

    Returns:
        (array):  The decompressed array slice.

    """
    # Get the key for all dimensions
    key = self._get_full_key(raw_key)

    # Compute the output leading shape and keep array
    leading_shape, keep = self._get_leading_axes(key)

    # Compute sample axis slice
    first, last, sample_shape = self._get_sample_axis(key)

    full_shape = leading_shape + sample_shape
    if len(full_shape) == 0:
        n_total = 0
    else:
        n_total = np.prod(full_shape)
    if n_total == 0:
        # At least one dimension was zero, return empty array
        return np.zeros(full_shape, dtype=self._dtype)
    else:
        arr, strm_indices = array_decompress_slice(
            self._compressed,
            self._stream_size,
            self._stream_starts,
            self._stream_nbytes,
            stream_offsets=self._stream_offsets,
            stream_gains=self._stream_gains,
            keep=keep,
            first_stream_sample=first,
            last_stream_sample=last,
            is_int64=self._is_int64,
        )
        return arr.reshape(full_shape)

from_array(arr, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False) classmethod

Construct a FlacArray from a numpy ndarray.

Parameters:

    arr (numpy.ndarray):  The input array data.  [required]
    level (int):  Compression level (0-8).  [default: 5]
    quanta (float, array):  For floating point data, the floating point
        increment of each 32bit integer value.  Optionally an iterable of
        increments, one per stream.  [default: None]
    precision (int, array):  Number of significant digits to retain in
        float-to-int conversion.  Alternative to quanta.  Optionally an
        iterable of values, one per stream.  [default: None]
    mpi_comm (MPI.Comm):  If specified, the input array is assumed to be
        distributed across the communicator at the leading dimension.  The
        local piece of the array is passed in on each process.  [default: None]
    use_threads (bool):  If True, use OpenMP threads to parallelize encoding.
        This is only beneficial for large arrays.  [default: False]

Returns:

    (FlacArray):  A newly constructed FlacArray.

Source code in flacarray/array.py
@classmethod
def from_array(
    cls, arr, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False
):
    """Construct a FlacArray from a numpy ndarray.

    Args:
        arr (numpy.ndarray):  The input array data.
        level (int):  Compression level (0-8).
        quanta (float, array):  For floating point data, the floating point
            increment of each 32bit integer value.  Optionally an iterable of
            increments, one per stream.
        precision (int, array):  Number of significant digits to retain in
            float-to-int conversion.  Alternative to `quanta`.  Optionally an
            iterable of values, one per stream.
        mpi_comm (MPI.Comm):  If specified, the input array is assumed to be
            distributed across the communicator at the leading dimension.  The
            local piece of the array is passed in on each process.
        use_threads (bool):  If True, use OpenMP threads to parallelize encoding.
            This is only beneficial for large arrays.

    Returns:
        (FlacArray):  A newly constructed FlacArray.

    """
    # Get the global shape of the array
    global_props = global_array_properties(arr.shape, mpi_comm=mpi_comm)
    global_shape = global_props["shape"]
    mpi_dist = global_props["dist"]

    # Compress our local piece of the array
    compressed, starts, nbytes, offsets, gains = array_compress(
        arr,
        level=level,
        quanta=quanta,
        precision=precision,
        use_threads=use_threads,
    )

    return FlacArray(
        None,
        shape=arr.shape,
        global_shape=global_shape,
        compressed=compressed,
        dtype=arr.dtype,
        stream_starts=starts,
        stream_nbytes=nbytes,
        stream_offsets=offsets,
        stream_gains=gains,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
    )

read_hdf5(hgrp, keep=None, mpi_comm=None, mpi_dist=None, no_flatten=False) classmethod

Construct a FlacArray from an HDF5 Group.

This function loads all information about the array from an HDF5 group. If mpi_comm is specified, the created array is distributed over that communicator. If you also wish to use MPI I/O to read data from the group, then you must be using an MPI-enabled h5py and you should pass in a valid handle to the group on all processes.

If mpi_dist is specified, it should be an iterable with the number of leading dimension elements assigned to each process. If None, the leading dimension will be distributed uniformly.

If keep is specified, this should be a boolean array with the same shape as the leading dimensions of the original array. True values in this array indicate that the stream should be kept.

If keep is specified, the returned array WILL NOT have the same shape as the original. Instead, it will be a 2D array of decompressed streams: the streams corresponding to True values in the keep mask.

Parameters:

    hgrp (h5py.Group):  The open Group for reading.  [required]
    keep (array):  Bool array of streams to keep in the decompression.
        [default: None]
    mpi_comm (MPI.Comm):  If specified, the communicator over which to
        distribute the leading dimension.  [default: None]
    mpi_dist (array):  If specified, assign blocks of these sizes to processes
        when distributing the leading dimension.  [default: None]
    no_flatten (bool):  If True, for single-stream arrays, leave the leading
        dimension of (1,) in the result.  [default: False]

Returns:

    (FlacArray):  A newly constructed FlacArray.

Source code in flacarray/array.py
@classmethod
def read_hdf5(
    cls,
    hgrp,
    keep=None,
    mpi_comm=None,
    mpi_dist=None,
    no_flatten=False,
):
    """Construct a FlacArray from an HDF5 Group.

    This function loads all information about the array from an HDF5 group.  If
    `mpi_comm` is specified, the created array is distributed over that
    communicator.  If you also wish to use MPI I/O to read data from the group,
    then you must be using an MPI-enabled h5py and you should pass in a valid
    handle to the group on all processes.

    If `mpi_dist` is specified, it should be an iterable with the number of leading
    dimension elements assigned to each process.  If None, the leading dimension
    will be distributed uniformly.

    If `keep` is specified, this should be a boolean array with the same shape
    as the leading dimensions of the original array.  True values in this array
    indicate that the stream should be kept.

    If `keep` is specified, the returned array WILL NOT have the same shape as
    the original.  Instead it will be a 2D array of decompressed streams- the
    streams corresponding to True values in the `keep` mask.

    Args:
        hgrp (h5py.Group):  The open Group for reading.
        keep (array):  Bool array of streams to keep in the decompression.
        mpi_comm (MPI.Comm):  If specified, the communicator over which to
            distribute the leading dimension.
        mpi_dist (array):  If specified, assign blocks of these sizes to processes
            when distributing the leading dimension.
        no_flatten (bool):  If True, for single-stream arrays, leave the leading
            dimension of (1,) in the result.

    Returns:
        (FlacArray):  A newly constructed FlacArray.

    """
    (
        local_shape,
        global_shape,
        compressed,
        n_channels,
        stream_starts,
        stream_nbytes,
        stream_offsets,
        stream_gains,
        mpi_dist,
        keep_indices,
    ) = hdf5_read_compressed(
        hgrp,
        keep=keep,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
    )

    dt = compressed_dtype(n_channels, stream_offsets, stream_gains)

    if (len(local_shape) == 2 and local_shape[0] == 1) and not no_flatten:
        # Flatten
        shape = (local_shape[1],)
    else:
        shape = local_shape

    return FlacArray(
        None,
        shape=shape,
        global_shape=global_shape,
        compressed=compressed,
        dtype=dt,
        stream_starts=stream_starts,
        stream_nbytes=stream_nbytes,
        stream_offsets=stream_offsets,
        stream_gains=stream_gains,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
    )

read_zarr(zgrp, keep=None, mpi_comm=None, mpi_dist=None, no_flatten=False) classmethod

Construct a FlacArray from a Zarr Group.

This function loads all information about the array from a zarr group. If mpi_comm is specified, the created array is distributed over that communicator.

If mpi_dist is specified, it should be an iterable with the number of leading dimension elements assigned to each process. If None, the leading dimension will be distributed uniformly.

If keep is specified, this should be a boolean array with the same shape as the leading dimensions of the original array. True values in this array indicate that the stream should be kept.

If keep is specified, the returned array WILL NOT have the same shape as the original. Instead, it will be a 2D array of decompressed streams: the streams corresponding to True values in the keep mask.

Parameters:

    zgrp (zarr.Group):  The open Group for reading.  [required]
    keep (array):  Bool array of streams to keep in the decompression.
        [default: None]
    mpi_comm (MPI.Comm):  If specified, the communicator over which to
        distribute the leading dimension.  [default: None]
    mpi_dist (array):  If specified, assign blocks of these sizes to processes
        when distributing the leading dimension.  [default: None]
    no_flatten (bool):  If True, for single-stream arrays, leave the leading
        dimension of (1,) in the result.  [default: False]

Returns:

    (FlacArray):  A newly constructed FlacArray.

Source code in flacarray/array.py
@classmethod
def read_zarr(
    cls,
    zgrp,
    keep=None,
    mpi_comm=None,
    mpi_dist=None,
    no_flatten=False,
):
    """Construct a FlacArray from a Zarr Group.

    This function loads all information about the array from a zarr group.  If
    `mpi_comm` is specified, the created array is distributed over that
    communicator.

    If `mpi_dist` is specified, it should be an iterable with the number of leading
    dimension elements assigned to each process.  If None, the leading dimension
    will be distributed uniformly.

    If `keep` is specified, this should be a boolean array with the same shape
    as the leading dimensions of the original array.  True values in this array
    indicate that the stream should be kept.

    If `keep` is specified, the returned array WILL NOT have the same shape as
    the original.  Instead it will be a 2D array of decompressed streams- the
    streams corresponding to True values in the `keep` mask.

    Args:
        zgrp (zarr.Group):  The open Group for reading.
        keep (array):  Bool array of streams to keep in the decompression.
        mpi_comm (MPI.Comm):  If specified, the communicator over which to
            distribute the leading dimension.
        mpi_dist (array):  If specified, assign blocks of these sizes to processes
            when distributing the leading dimension.
        no_flatten (bool):  If True, for single-stream arrays, leave the leading
            dimension of (1,) in the result.

    Returns:
        (FlacArray):  A newly constructed FlacArray.

    """
    (
        local_shape,
        global_shape,
        compressed,
        n_channels,
        stream_starts,
        stream_nbytes,
        stream_offsets,
        stream_gains,
        mpi_dist,
        keep_indices,
    ) = zarr_read_compressed(
        zgrp,
        keep=keep,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
    )

    dt = compressed_dtype(n_channels, stream_offsets, stream_gains)

    if (len(local_shape) == 2 and local_shape[0] == 1) and not no_flatten:
        # Flatten
        shape = (local_shape[1],)
    else:
        shape = local_shape

    return FlacArray(
        None,
        shape=shape,
        global_shape=global_shape,
        compressed=compressed,
        dtype=dt,
        stream_starts=stream_starts,
        stream_nbytes=stream_nbytes,
        stream_offsets=stream_offsets,
        stream_gains=stream_gains,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
    )

to_array(keep=None, stream_slice=None, keep_indices=False, use_threads=False)

Decompress local data into a numpy array.

This uses the compressed representation to reconstruct a normal numpy array. The returned data type will be either int32, int64, float32, or float64 depending on the original data type.

If stream_slice is specified, the returned array will have only that range of samples in the final dimension.

If keep is specified, this should be a boolean array with the same shape as the leading dimensions of the original array. True values in this array indicate that the stream should be kept.

If keep is specified, the returned array WILL NOT have the same shape as the original. Instead, it will be a 2D array of decompressed streams: the streams corresponding to True values in the keep mask.

If keep_indices is True and keep is specified, then a tuple of two values is returned. The first is the array of decompressed streams. The second is a list of tuples, each of which specifies the indices of the stream in the original array.

Parameters:

    keep (array):  Bool array of streams to keep in the decompression.
        [default: None]
    stream_slice (slice):  A python slice with step size of one, indicating
        the sample range to extract from each stream.  [default: None]
    keep_indices (bool):  If True, also return the original indices of the
        streams.  [default: False]
    use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
        This is only beneficial for large arrays.  [default: False]
Source code in flacarray/array.py
def to_array(
    self,
    keep=None,
    stream_slice=None,
    keep_indices=False,
    use_threads=False,
):
    """Decompress local data into a numpy array.

    This uses the compressed representation to reconstruct a normal numpy
    array.  The returned data type will be either int32, int64, float32, or
    float64 depending on the original data type.

    If `stream_slice` is specified, the returned array will have only that
    range of samples in the final dimension.

    If `keep` is specified, this should be a boolean array with the same shape
    as the leading dimensions of the original array.  True values in this array
    indicate that the stream should be kept.

    If `keep` is specified, the returned array WILL NOT have the same shape as
    the original.  Instead it will be a 2D array of decompressed streams- the
    streams corresponding to True values in the `keep` mask.

    If `keep_indices` is True and `keep` is specified, then a tuple of two values
    is returned.  The first is the array of decompressed streams.  The second is
    a list of tuples, each of which specifies the indices of the stream in the
    original array.

    Args:
        keep (array):  Bool array of streams to keep in the decompression.
        stream_slice (slice):  A python slice with step size of one, indicating
            the sample range to extract from each stream.
        keep_indices (bool):  If True, also return the original indices of the
            streams.
        use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
            This is only beneficial for large arrays.

    """
    first_samp = None
    last_samp = None
    if stream_slice is not None:
        if stream_slice.step is not None and stream_slice.step != 1:
            raise RuntimeError(
                "Only stream slices with a step size of 1 are supported"
            )
        first_samp = stream_slice.start
        last_samp = stream_slice.stop

    arr, indices = array_decompress_slice(
        self._compressed,
        self._stream_size,
        self._stream_starts,
        self._stream_nbytes,
        stream_offsets=self._stream_offsets,
        stream_gains=self._stream_gains,
        keep=keep,
        first_stream_sample=first_samp,
        last_stream_sample=last_samp,
        is_int64=self._is_int64,
        use_threads=use_threads,
        no_flatten=(not self._flatten_single),
    )
    if keep is not None and keep_indices:
        return (arr, indices)
    else:
        return arr

write_hdf5(hgrp)

Write data to an HDF5 Group.

The internal object properties are written to an open HDF5 group. If you wish to use MPI I/O to write data to the group, then you must be using an MPI-enabled h5py and you should pass in a valid handle to the group on all processes.

If the FlacArray is distributed over an MPI communicator, but the h5py implementation does not support MPI I/O, then all data will be communicated to the rank zero process for writing. In this case, the hgrp argument should be None except on the root process.

Parameters:

    hgrp (h5py.Group):  The open Group for writing.  [required]

Returns:

    None

Source code in flacarray/array.py
def write_hdf5(self, hgrp):
    """Write data to an HDF5 Group.

    The internal object properties are written to an open HDF5 group.  If you
    wish to use MPI I/O to write data to the group, then you must be using an MPI
    enabled h5py and you should pass in a valid handle to the group on all
    processes.

    If the `FlacArray` is distributed over an MPI communicator, but the h5py
    implementation does not support MPI I/O, then all data will be communicated
    to the rank zero process for writing.  In this case, the `hgrp` argument should
    be None except on the root process.

    Args:
        hgrp (h5py.Group):  The open Group for writing.

    Returns:
        None

    """
    if self._is_int64:
        n_channels = 2
    else:
        n_channels = 1

    hdf5_write_compressed(
        hgrp,
        self._leading_shape,
        self._global_leading_shape,
        self._stream_size,
        self._stream_starts,
        self._global_stream_starts,
        self._stream_nbytes,
        self._stream_offsets,
        self._stream_gains,
        self._compressed,
        n_channels,
        self._compressed.nbytes,
        self._global_nbytes,
        self._global_proc_nbytes,
        self._mpi_comm,
        self._mpi_dist,
    )

write_zarr(zgrp)

Write data to a Zarr Group.

The internal object properties are written to an open zarr group.

If the FlacArray is distributed over an MPI communicator, then all data will be communicated to the rank zero process for writing. In this case, the zgrp argument should be None except on the root process.

Parameters:

    zgrp (zarr.Group):  The open Group for writing.  [required]

Returns:

    None

Source code in flacarray/array.py
def write_zarr(self, zgrp):
    """Write data to an Zarr Group.

    The internal object properties are written to an open zarr group.

    If the `FlacArray` is distributed over an MPI communicator, then all data will
    be communicated to the rank zero process for writing.  In this case, the `zgrp`
    argument should be None except on the root process.

    Args:
        zgrp (zarr.Group):  The open Group for writing.

    Returns:
        None

    """
    if self._is_int64:
        n_channels = 2
    else:
        n_channels = 1
    zarr_write_compressed(
        zgrp,
        self._leading_shape,
        self._global_leading_shape,
        self._stream_size,
        self._stream_starts,
        self._global_stream_starts,
        self._stream_nbytes,
        self._stream_offsets,
        self._stream_gains,
        self._compressed,
        n_channels,
        self._compressed.nbytes,
        self._global_nbytes,
        self._global_proc_nbytes,
        self._mpi_comm,
        self._mpi_dist,
    )

Direct I/O

Sometimes code has no need to store compressed arrays in memory. Instead, it may be desirable to have full arrays in memory and compressed arrays on disk. In those situations, you can use several helper functions to write and read numpy arrays directly to / from files.

HDF5

You can write to / read from an h5py Group using functions in the hdf5 submodule.

flacarray.hdf5.write_array(arr, hgrp, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False)

Compress a numpy array and write to an HDF5 group.

This function is useful if you do not need to access the compressed array in memory and only wish to write it directly to HDF5. The input array is compressed and then the write_compressed() function is called.

If the input array is int32 or int64, the compression is lossless, and the compressed bytes and ancillary data are written to datasets within the output group. If the array is float32 or float64, either the quanta or precision must be specified. See the discussion in the FlacArray class documentation about how the offsets and gains are computed for a given quanta. The offsets and gains are also written as datasets within the output group.

Parameters:

    arr (array):  The input numpy array.  [required]
    hgrp (h5py.Group):  The Group to use.  [required]
    level (int):  Compression level (0-8).  [default: 5]
    quanta (float, array):  For floating point data, the floating point
        increment of each 32bit integer value.  Optionally an iterable of
        increments, one per stream.  [default: None]
    precision (int, array):  Number of significant digits to retain in
        float-to-int conversion.  Alternative to quanta.  Optionally an
        iterable of values, one per stream.  [default: None]
    mpi_comm (MPI.Comm):  If specified, the input array is assumed to be
        distributed across the communicator at the leading dimension.  The
        local piece of the array is passed in on each process.  [default: None]
    use_threads (bool):  If True, use OpenMP threads to parallelize encoding.
        This is only beneficial for large arrays.  [default: False]

Returns:

    None

Source code in flacarray/hdf5.py
@function_timer
def write_array(
    arr, hgrp, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False
):
    """Compress a numpy array and write to an HDF5 group.

    This function is useful if you do not need to access the compressed array in memory
    and only wish to write it directly to HDF5.  The input array is compressed and then
    the `write_compressed()` function is called.

    If the input array is int32 or int64, the compression is lossless and the compressed
    bytes and ancillary data is written to datasets within the output group.  If the
    array is float32 or float64, either the `quanta` or `precision` must be specified.
    See discussion in the `FlacArray` class documentation about how the offsets and
    gains are computed for a given quanta.  The offsets and gains are also written as
    datasets within the output group.

    Args:
        arr (array):  The input numpy array.
        hgrp (h5py.Group):  The Group to use.
        level (int):  Compression level (0-8).
        quanta (float, array):  For floating point data, the floating point
            increment of each 32bit integer value.  Optionally an iterable of
            increments, one per stream.
        precision (int, array):  Number of significant digits to retain in
            float-to-int conversion.  Alternative to `quanta`.  Optionally an
            iterable of values, one per stream.
        mpi_comm (MPI.Comm):  If specified, the input array is assumed to be
            distributed across the communicator at the leading dimension.  The
            local piece of the array is passed in on each process.
        use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
            This is only beneficial for large arrays.

    Returns:
        None

    """
    if not have_hdf5:
        raise RuntimeError("h5py is not importable, cannot write to HDF5")

    # Get the global shape of the array
    global_props = global_array_properties(arr.shape, mpi_comm=mpi_comm)
    global_shape = global_props["shape"]
    mpi_dist = global_props["dist"]

    # Get the number of channels
    if arr.dtype == np.dtype(np.int64) or arr.dtype == np.dtype(np.float64):
        n_channels = 2
    else:
        n_channels = 1

    # Compress our local piece of the array
    compressed, starts, nbytes, offsets, gains = array_compress(
        arr, level=level, quanta=quanta, precision=precision, use_threads=use_threads
    )

    local_nbytes = compressed.nbytes
    global_nbytes, global_proc_bytes, global_starts = global_bytes(
        local_nbytes, starts, mpi_comm
    )
    stream_size = arr.shape[-1]

    if len(arr.shape) == 1:
        leading_shape = (1,)
    else:
        leading_shape = arr.shape[:-1]
    global_leading_shape = global_shape[:-1]

    write_compressed(
        hgrp,
        leading_shape,
        global_leading_shape,
        stream_size,
        starts,
        global_starts,
        nbytes,
        offsets,
        gains,
        compressed,
        n_channels,
        local_nbytes,
        global_nbytes,
        global_proc_bytes,
        mpi_comm,
        mpi_dist,
    )
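The float-to-int conversion that `write_array` performs before FLAC encoding (described in the `FlacArray` class documentation) can be sketched in plain Python. The helper names below are hypothetical, not part of the flacarray API, and assume a single scalar quanta applied to one stream:

```python
def quantize_stream(stream, quanta):
    """Sketch of the per-stream float-to-int conversion for a scalar
    quanta: the stream mean is rounded to the nearest whole quanta and
    recorded as the offset, then the offset-subtracted data is scaled
    by the gain (1 / quanta) and rounded to integers."""
    mean = sum(stream) / len(stream)
    offset = round(mean / quanta) * quanta
    gain = 1.0 / quanta
    ints = [round((x - offset) * gain) for x in stream]
    return ints, offset, gain


def dequantize_stream(ints, offset, gain):
    """Invert the conversion; each sample is recovered to within
    half of the quanta."""
    return [i / gain + offset for i in ints]
```

The round-trip error per sample is bounded by half the quanta, which is the lossy part of compressing floating point data.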

flacarray.hdf5.read_array(hgrp, keep=None, stream_slice=None, keep_indices=False, mpi_comm=None, mpi_dist=None, use_threads=False)

Load a numpy array from compressed HDF5.

This function is useful if you do not need to store a compressed representation of the array in memory. Each stream will be read individually from the file and the desired slice decompressed. This avoids storing the full compressed data.

This function acts as a dispatch to the correct version of the reading function. The function is selected based on the format version string in the data.

If stream_slice is specified, the returned array will have only that range of samples in the final dimension.

If keep is specified, this should be a boolean array with the same shape as the leading dimensions of the original array. True values in this array indicate that the stream should be kept.

If keep is specified, the returned array WILL NOT have the same shape as the original. Instead it will be a 2D array of decompressed streams: the streams corresponding to True values in the keep mask.

If keep_indices is True and keep is specified, then an additional list is returned containing the indices of each stream that was kept.

Parameters:

Name Type Description Default
hgrp Group

The group to read.

required
keep array

Bool array of streams to keep in the decompression.

None
stream_slice slice

A python slice with step size of one, indicating the sample range to extract from each stream.

None
keep_indices bool

If True, also return the original indices of the streams.

False
mpi_comm Comm

The optional MPI communicator over which to distribute the leading dimension of the array.

None
mpi_dist list

The optional list of tuples specifying the first / last element of the leading dimension to assign to each process.

None
use_threads bool

If True, use OpenMP threads to parallelize decoding. This is only beneficial for large arrays.

False

Returns:

Type Description
array

The loaded and decompressed data OR the array and the kept indices.

Source code in flacarray/hdf5.py
@function_timer
def read_array(
    hgrp,
    keep=None,
    stream_slice=None,
    keep_indices=False,
    mpi_comm=None,
    mpi_dist=None,
    use_threads=False,
):
    """Load a numpy array from compressed HDF5.

    This function is useful if you do not need to store a compressed representation
    of the array in memory.  Each stream will be read individually from the file and
    the desired slice decompressed.  This avoids storing the full compressed data.

    This function acts as a dispatch to the correct version of the reading function.
    The function is selected based on the format version string in the data.

    If `stream_slice` is specified, the returned array will have only that
    range of samples in the final dimension.

    If `keep` is specified, this should be a boolean array with the same shape
    as the leading dimensions of the original array.  True values in this array
    indicate that the stream should be kept.

    If `keep` is specified, the returned array WILL NOT have the same shape as
    the original.  Instead it will be a 2D array of decompressed streams- the
    streams corresponding to True values in the `keep` mask.

    If `keep_indices` is True and `keep` is specified, then an additional list
    is returned containing the indices of each stream that was kept.

    Args:
        hgrp (h5py.Group):  The group to read.
        keep (array):  Bool array of streams to keep in the decompression.
        stream_slice (slice):  A python slice with step size of one, indicating
            the sample range to extract from each stream.
        keep_indices (bool):  If True, also return the original indices of the
            streams.
        mpi_comm (MPI.Comm):  The optional MPI communicator over which to distribute
            the leading dimension of the array.
        mpi_dist (list):  The optional list of tuples specifying the first / last
            element of the leading dimension to assign to each process.
        use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
            This is only beneficial for large arrays.

    Returns:
        (array):  The loaded and decompressed data OR the array and the kept indices.

    """
    if not have_hdf5:
        raise RuntimeError("h5py is not importable, cannot read from HDF5")

    format_version = None
    if hgrp is not None:
        if "flacarray_format_version" in hgrp.attrs:
            format_version = hgrp.attrs["flacarray_format_version"]
    if mpi_comm is not None:
        format_version = mpi_comm.bcast(format_version, root=0)
    if format_version is None:
        raise RuntimeError("h5py Group does not contain a FlacArray")

    mod_name = f".hdf5_load_v{format_version}"
    mod = importlib.import_module(mod_name, package="flacarray")
    read_func = getattr(mod, "read_array")
    return read_func(
        hgrp,
        keep=keep,
        stream_slice=stream_slice,
        keep_indices=keep_indices,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
        use_threads=use_threads,
        no_flatten=False,
    )
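The effect of the `keep` mask and `keep_indices` flag can be illustrated without the library. This is a hypothetical pure-Python sketch of the selection semantics described above, for a 1-D array of streams:

```python
def select_streams(streams, keep, keep_indices=False):
    """Return only the streams whose keep-mask entry is True, stacked
    as a 2-D list, optionally together with their original indices."""
    kept = [s for s, flag in zip(streams, keep) if flag]
    if keep_indices:
        indices = [i for i, flag in enumerate(keep) if flag]
        return kept, indices
    return kept
```

In the real function the kept streams are decoded from the file individually, so masked-out streams never need to be decompressed at all.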

Zarr

You can write to / read from a zarr hierarchy Group using functions in the zarr submodule.

flacarray.zarr.write_array(arr, zgrp, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False)

Compress a numpy array and write to a Zarr group.

This function is useful if you do not need to access the compressed array in memory and only wish to write it directly to Zarr files. The input array is compressed and then the write_compressed() function is called.

If the input array is int32 or int64, the compression is lossless and the compressed bytes and ancillary data is written to datasets within the output group. If the array is float32 or float64, either the quanta or precision must be specified. See discussion in the FlacArray class documentation about how the offsets and gains are computed for a given quanta. The offsets and gains are also written as datasets within the output group.

Parameters:

Name Type Description Default
arr array

The input numpy array.

required
zgrp Group

The Group to use.

required
level int

Compression level (0-8).

5
quanta (float, array)

For floating point data, the floating point increment of each 32bit integer value. Optionally an iterable of increments, one per stream.

None
precision (int, array)

Number of significant digits to retain in float-to-int conversion. Alternative to quanta. Optionally an iterable of values, one per stream.

None
mpi_comm Comm

If specified, the input array is assumed to be distributed across the communicator at the leading dimension. The local piece of the array is passed in on each process.

None
use_threads bool

If True, use OpenMP threads to parallelize decoding. This is only beneficial for large arrays.

False

Returns:

Type Description

None

Source code in flacarray/zarr.py
@function_timer
def write_array(
    arr, zgrp, level=5, quanta=None, precision=None, mpi_comm=None, use_threads=False
):
    """Compress a numpy array and write to a Zarr group.

    This function is useful if you do not need to access the compressed array in memory
    and only wish to write it directly to Zarr files.  The input array is compressed
    and then the `write_compressed()` function is called.

    If the input array is int32 or int64, the compression is lossless and the compressed
    bytes and ancillary data is written to datasets within the output group.  If the
    array is float32 or float64, either the `quanta` or `precision` must be specified.
    See discussion in the `FlacArray` class documentation about how the offsets and
    gains are computed for a given quanta.  The offsets and gains are also written as
    datasets within the output group.

    Args:
        arr (array):  The input numpy array.
        zgrp (zarr.Group):  The Group to use.
        level (int):  Compression level (0-8).
        quanta (float, array):  For floating point data, the floating point
            increment of each 32bit integer value.  Optionally an iterable of
            increments, one per stream.
        precision (int, array):  Number of significant digits to retain in
            float-to-int conversion.  Alternative to `quanta`.  Optionally an
            iterable of values, one per stream.
        mpi_comm (MPI.Comm):  If specified, the input array is assumed to be
            distributed across the communicator at the leading dimension.  The
            local piece of the array is passed in on each process.
        use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
            This is only beneficial for large arrays.

    Returns:
        None

    """
    if not have_zarr:
        raise RuntimeError("zarr is not importable, cannot write to zarr.Group")

    # Get the global shape of the array
    global_props = global_array_properties(arr.shape, mpi_comm=mpi_comm)
    global_shape = global_props["shape"]
    mpi_dist = global_props["dist"]

    # Get the number of channels
    if arr.dtype == np.dtype(np.int64) or arr.dtype == np.dtype(np.float64):
        n_channels = 2
    else:
        n_channels = 1

    # Compress our local piece of the array
    compressed, starts, nbytes, offsets, gains = array_compress(
        arr, level=level, quanta=quanta, precision=precision, use_threads=use_threads
    )

    local_nbytes = compressed.nbytes
    global_nbytes, global_proc_bytes, global_starts = global_bytes(
        local_nbytes, starts, mpi_comm
    )
    stream_size = arr.shape[-1]

    if len(arr.shape) == 1:
        leading_shape = (1,)
    else:
        leading_shape = arr.shape[:-1]
    global_leading_shape = global_shape[:-1]

    write_compressed(
        zgrp,
        leading_shape,
        global_leading_shape,
        stream_size,
        starts,
        global_starts,
        nbytes,
        offsets,
        gains,
        compressed,
        n_channels,
        local_nbytes,
        global_nbytes,
        global_proc_bytes,
        mpi_comm,
        mpi_dist,
    )
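When `precision` is given instead of `quanta`, the `FlacArray` documentation states that the quanta is derived by dividing the stream RMS by 10^precision. A minimal sketch of that conversion (the function name is illustrative, not a flacarray API):

```python
import math


def precision_to_quanta(stream, precision):
    """Derive a per-stream quanta from a decimal precision:
    the stream RMS divided by 10**precision."""
    rms = math.sqrt(sum(x * x for x in stream) / len(stream))
    return rms / 10**precision
```

A larger precision therefore means a smaller quanta, finer rounding, and integer streams that compress less well.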

flacarray.zarr.read_array(zgrp, keep=None, stream_slice=None, keep_indices=False, mpi_comm=None, mpi_dist=None, use_threads=False, no_flatten=False)

Load a numpy array from a compressed Zarr group.

This function is useful if you do not need to store a compressed representation of the array in memory. Each stream will be read individually from the file and the desired slice decompressed. This avoids storing the full compressed data.

This function acts as a dispatch to the correct version of the reading function. The function is selected based on the format version string in the data.

If stream_slice is specified, the returned array will have only that range of samples in the final dimension.

If keep is specified, this should be a boolean array with the same shape as the leading dimensions of the original array. True values in this array indicate that the stream should be kept.

If keep is specified, the returned array WILL NOT have the same shape as the original. Instead it will be a 2D array of decompressed streams: the streams corresponding to True values in the keep mask.

If keep_indices is True and keep is specified, then an additional list is returned containing the indices of each stream that was kept.

Parameters:

Name Type Description Default
zgrp Group

The group to read.

required
keep array

Bool array of streams to keep in the decompression.

None
stream_slice slice

A python slice with step size of one, indicating the sample range to extract from each stream.

None
keep_indices bool

If True, also return the original indices of the streams.

False
mpi_comm Comm

The optional MPI communicator over which to distribute the leading dimension of the array.

None
mpi_dist list

The optional list of tuples specifying the first / last element of the leading dimension to assign to each process.

None
use_threads bool

If True, use OpenMP threads to parallelize decoding. This is only beneficial for large arrays.

False
no_flatten bool

If True, for single-stream arrays, leave the leading dimension of (1,) in the result.

False

Returns:

Type Description
array

The loaded and decompressed data OR the array and the kept indices.

Source code in flacarray/zarr.py
@function_timer
def read_array(
    zgrp,
    keep=None,
    stream_slice=None,
    keep_indices=False,
    mpi_comm=None,
    mpi_dist=None,
    use_threads=False,
    no_flatten=False,
):
    """Load a numpy array from a compressed Zarr group.

    This function is useful if you do not need to store a compressed representation
    of the array in memory.  Each stream will be read individually from the file and
    the desired slice decompressed.  This avoids storing the full compressed data.

    This function acts as a dispatch to the correct version of the reading function.
    The function is selected based on the format version string in the data.

    If `stream_slice` is specified, the returned array will have only that
    range of samples in the final dimension.

    If `keep` is specified, this should be a boolean array with the same shape
    as the leading dimensions of the original array.  True values in this array
    indicate that the stream should be kept.

    If `keep` is specified, the returned array WILL NOT have the same shape as
    the original.  Instead it will be a 2D array of decompressed streams- the
    streams corresponding to True values in the `keep` mask.

    If `keep_indices` is True and `keep` is specified, then an additional list
    is returned containing the indices of each stream that was kept.

    Args:
        zgrp (zarr.Group):  The group to read.
        keep (array):  Bool array of streams to keep in the decompression.
        stream_slice (slice):  A python slice with step size of one, indicating
            the sample range to extract from each stream.
        keep_indices (bool):  If True, also return the original indices of the
            streams.
        mpi_comm (MPI.Comm):  The optional MPI communicator over which to distribute
            the leading dimension of the array.
        mpi_dist (list):  The optional list of tuples specifying the first / last
            element of the leading dimension to assign to each process.
        use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
            This is only beneficial for large arrays.
        no_flatten (bool):  If True, for single-stream arrays, leave the leading
            dimension of (1,) in the result.

    Returns:
        (array):  The loaded and decompressed data OR the array and the kept indices.

    """
    if not have_zarr:
        raise RuntimeError("zarr is not importable, cannot read from a Zarr Group")

    format_version = None
    if zgrp is not None:
        if "flacarray_format_version" in zgrp.attrs:
            format_version = zgrp.attrs["flacarray_format_version"]
    if mpi_comm is not None:
        format_version = mpi_comm.bcast(format_version, root=0)
    if format_version is None:
        raise RuntimeError("Zarr Group does not contain a FlacArray")

    mod_name = f".zarr_load_v{format_version}"
    mod = importlib.import_module(mod_name, package="flacarray")
    read_func = getattr(mod, "read_array")
    return read_func(
        zgrp,
        keep=keep,
        stream_slice=stream_slice,
        keep_indices=keep_indices,
        mpi_comm=mpi_comm,
        mpi_dist=mpi_dist,
        use_threads=use_threads,
        no_flatten=no_flatten,
    )

Interactive Tools

The flacarray.demo submodule contains a few helper functions that are not imported by default. You will need to have optional dependencies (matplotlib) installed to use the visualization tools. For testing, it is convenient to generate arrays consisting of random timestreams with some structure. The create_fake_data function can be used for this.

flacarray.demo.create_fake_data(local_shape, sigma=1.0, dtype=np.float64, seed=123456789, comm=None, dc_sigma=5)

Create fake random data for testing.

This is a helper function to generate some random data for testing. If sigma is None, uniform random values are returned. If sigma is not None, samples drawn from a Gaussian distribution are returned.

If comm is not None, the data is created on one process and then pieces are distributed among the processes.

Parameters:

Name Type Description Default
local_shape tuple

The local shape of the data on this process.

required
sigma float

The width of the distribution or None.

1.0
dtype dtype

The data type of the returned array.

float64
seed int

The optional seed for np.random.

123456789
comm Comm

The MPI communicator or None.

None

dc_sigma float

The width, in units of sigma, of the random DC offset applied to each stream, or None for no offset.

5

Returns:

Type Description
tuple

(The random data on the local process, MPI distribution).

Source code in flacarray/demo.py
def create_fake_data(
    local_shape, sigma=1.0, dtype=np.float64, seed=123456789, comm=None, dc_sigma=5,
):
    """Create fake random data for testing.

    This is a helper function to generate some random data for testing.
    If `sigma` is None, uniform random values are returned.  If `sigma` is not None,
    samples drawn from a Gaussian distribution are returned.

    If `comm` is not None, the data is created on one process and then pieces are
    distributed among the processes.

    Args:
        local_shape (tuple):  The local shape of the data on this process.
        sigma (float):  The width of the distribution or None.
        dtype (np.dtype):  The data type of the returned array.
        seed (int):  The optional seed for np.random.
        comm (MPI.Comm):  The MPI communicator or None.
        dc_sigma (float):  The width, in units of sigma, of the random DC
            offset applied to each stream, or None for no offset.

    Returns:
        (tuple):  (The random data on the local process, MPI distribution).

    """
    if comm is None:
        rank = 0
    else:
        rank = comm.rank

    # Get the global array properties
    gprops = global_array_properties(local_shape, comm)
    shape = gprops["shape"]
    mpi_dist = gprops["dist"]

    flatshape = np.prod(shape)
    stream_size = shape[-1]
    leading_shape = shape[:-1]
    leading_shape_ext = leading_shape + (1,)

    rng = np.random.default_rng(seed=seed)
    global_data = None
    if rank == 0:
        if sigma is None:
            # Uniform randoms. Verify that we can fully encode the high / low
            # values by setting a few samples to those extremes.
            if dtype == np.dtype(np.int64) or dtype == np.dtype(np.int32):
                low = np.iinfo(dtype).min
                high = np.iinfo(dtype).max
                flat_data = rng.integers(
                    low=low, high=high, size=flatshape, dtype=np.int64
                ).astype(dtype)
            else:
                low = np.finfo(dtype).min
                high = np.finfo(dtype).max
                flat_data = rng.uniform(
                    low=low, high=high, size=flatshape
                ).astype(dtype)
            flat_data[0] = low
            flat_data[1] = high
            global_data = flat_data.reshape(shape)
        else:
            # Construct a random DC level for each stream.
            if dc_sigma is None:
                dc = 0
            else:
                dc = dc_sigma * sigma * (rng.random(size=leading_shape_ext) - 0.5)

            # Construct a simple low frequency waveform (assume 1Hz sampling)
            wave = np.zeros(stream_size, dtype=dtype)
            t = np.arange(stream_size)
            minf = 5 / stream_size
            for freq, amp in zip([3 * minf, minf], [2 * sigma, 6 * sigma]):
                wave[:] += amp * np.sin(2 * np.pi * freq * t)

            # Initialize all streams to a scaled version of this waveform plus
            # the DC level
            scale = rng.random(size=leading_shape_ext)
            global_data = np.empty(shape, dtype=dtype)
            if len(leading_shape) == 0:
                global_data[:] = dc
                global_data[:] += scale * wave
            else:
                leading_slc = tuple([slice(None) for x in leading_shape])
                global_data[leading_slc] = dc
                global_data[leading_slc] += scale * wave

            # Add some Gaussian random noise to each stream
            global_data[:] += rng.normal(0.0, sigma, flatshape).reshape(shape)
    if comm is not None:
        global_data = comm.bcast(global_data, root=0)

    # Extract our local piece of the global data
    if len(leading_shape) == 0 or (len(leading_shape) == 1 and leading_shape[0] == 1):
        data = global_data
    else:
        local_start = mpi_dist[rank][0]
        local_stop = mpi_dist[rank][1]
        local_slice = [slice(local_start, local_stop, 1)]
        local_slice.extend([slice(None) for x in shape[1:]])
        local_slice = tuple(local_slice)
        data = global_data[local_slice]
    if len(data.shape) == 2 and data.shape[0] == 1:
        data = data.reshape((-1))

    return data, mpi_dist
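The structured part of the fake signal built above is just the sum of two low-frequency sine components. A standalone sketch of that waveform (pure Python, mirroring the loop in `create_fake_data`):

```python
import math


def fake_waveform(stream_size, sigma=1.0):
    """Rebuild the low-frequency waveform from create_fake_data:
    assuming 1 Hz sampling, sine components at frequencies minf and
    3 * minf, where minf = 5 / stream_size, with amplitudes 6 * sigma
    and 2 * sigma respectively."""
    minf = 5 / stream_size
    wave = [0.0] * stream_size
    for freq, amp in ((3 * minf, 2 * sigma), (minf, 6 * sigma)):
        for t in range(stream_size):
            wave[t] += amp * math.sin(2 * math.pi * freq * t)
    return wave
```

Each stream then gets a random scale factor, a random DC level, and Gaussian noise added on top of this common waveform.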

Most data arrays in practice have 2 or 3 dimensions. If the number of streams is relatively small, then an uncompressed array can be plotted with the plot_data function.

flacarray.demo.plot_data(data, keep=None, stream_slc=slice(None), file=None)

Source code in flacarray/demo.py
def plot_data(data, keep=None, stream_slc=slice(None), file=None):
    # We only import matplotlib if we are actually going to make some plots.
    # This is not a required package.
    import matplotlib.pyplot as plt

    if len(data.shape) > 3:
        raise NotImplementedError("Can only plot 1D and 2D arrays of streams")

    if len(data.shape) == 1:
        plot_rows = 1
        plot_cols = 1
    elif len(data.shape) == 2:
        plot_rows = data.shape[0]
        plot_cols = 1
    else:
        plot_rows = data.shape[1]
        plot_cols = data.shape[0]

    fig_dpi = 100
    fig_width = 6 * plot_cols
    fig_height = 4 * plot_rows
    fig = plt.figure(figsize=(fig_width, fig_height), dpi=fig_dpi)
    if len(data.shape) == 1:
        # Single stream
        ax = fig.add_subplot(1, 1, 1, aspect="auto")
        ax.plot(data[stream_slc])
    elif len(data.shape) == 2:
        # 1-D array of streams, plot vertically
        for iplot in range(data.shape[0]):
            ax = fig.add_subplot(plot_rows, 1, iplot + 1, aspect="auto")
            ax.plot(data[iplot, stream_slc])
    else:
        # 2-D array of streams, plot in a grid
        for row in range(plot_rows):
            for col in range(plot_cols):
                slc = (col, row, stream_slc)
                ax = fig.add_subplot(
                    plot_rows, plot_cols, row * plot_cols + col + 1, aspect="auto"
                )
                ax.plot(data[slc], color="black")
    if file is None:
        plt.show()
    else:
        plt.savefig(file)
        plt.close()
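In the 2-D branch above, panels are placed using matplotlib's row-major subplot numbering. The index arithmetic can be checked without matplotlib:

```python
def subplot_index(row, col, plot_cols):
    # add_subplot numbers panels row-major starting at 1; this mirrors
    # the expression used in plot_data's 2-D branch.
    return row * plot_cols + col + 1
```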

Low-Level Tools

For specialized use cases, you can also work directly with the compressed bytestream and auxiliary arrays and convert to / from numpy arrays.

flacarray.compress.array_compress(arr, level=5, quanta=None, precision=None, use_threads=False)

Compress a numpy array with optional floating point conversion.

If arr is an int32 array, the returned stream offsets and gains will be None. If arr is an int64 array, the returned stream offsets and gains will be None and the calling code is responsible for tracking that the compressed bytes are associated with a 64bit stream.

If the input array is float32 or float64, exactly one of quanta or precision must be specified. Both float32 and float64 data will have floating point offset and gain arrays returned. See discussion in the FlacArray class documentation about how the offsets and gains are computed for a given quanta.

The shape of the returned auxiliary arrays (starts, nbytes, etc) will have a shape corresponding to the leading shape of the input array. If the input array is a single stream, the returned auxiliary information will be arrays with a single element.

Parameters:

Name Type Description Default
arr ndarray

The input array data.

required
level int

Compression level (0-8).

5
quanta (float, array)

For floating point data, the floating point increment of each integer value. Optionally an array of increments, one per stream.

None
precision (int, array)

Number of significant digits to retain in float-to-int conversion. Alternative to quanta. Optionally an iterable of values, one per stream.

None
use_threads bool

If True, use OpenMP threads to parallelize decoding. This is only beneficial for large arrays.

False

Returns:

Type Description
tuple

The (compressed bytes, stream starts, stream_nbytes, stream offsets, stream gains)

Source code in flacarray/compress.py
@function_timer
def array_compress(arr, level=5, quanta=None, precision=None, use_threads=False):
    """Compress a numpy array with optional floating point conversion.

    If `arr` is an int32 array, the returned stream offsets and gains will be None.
    If `arr` is an int64 array, the returned stream offsets and gains will be None and
    the calling code is responsible for tracking that the compressed bytes are
    associated with a 64bit stream.

    If the input array is float32 or float64, exactly one of quanta or precision
    must be specified.  Both float32 and float64 data will have floating point offset
    and gain arrays returned.  See discussion in the `FlacArray` class documentation
    about how the offsets and gains are computed for a given quanta.

    The shape of the returned auxiliary arrays (starts, nbytes, etc) will have a shape
    corresponding to the leading shape of the input array.  If the input array is a
    single stream, the returned auxiliary information will be arrays with a single
    element.

    Args:
        arr (numpy.ndarray):  The input array data.
        level (int):  Compression level (0-8).
        quanta (float, array):  For floating point data, the floating point
            increment of each integer value.  Optionally an array of increments,
            one per stream.
        precision (int, array):  Number of significant digits to retain in
            float-to-int conversion.  Alternative to `quanta`.  Optionally an
            iterable of values, one per stream.
        use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
            This is only beneficial for large arrays.

    Returns:
        (tuple): The (compressed bytes, stream starts, stream_nbytes, stream offsets,
            stream gains)

    """
    if arr.size == 0:
        raise ValueError("Cannot compress a zero-sized array!")
    leading_shape = arr.shape[:-1]

    if arr.dtype == np.dtype(np.float32) or arr.dtype == np.dtype(np.float64):
        # Floating point data
        if quanta is None and precision is None:
            msg = f"Compressing floating point data ('{arr.dtype}') "
            msg += "requires specifying either quanta or precision."
            raise RuntimeError(msg)
        if quanta is not None:
            if precision is not None:
                raise RuntimeError("Cannot set both quanta and precision")
            try:
                # len() raises TypeError for scalar quanta
                len(quanta)
                # This is an array
                if quanta.shape != leading_shape:
                    msg = "If not a scalar, quanta must have the same shape as the "
                    msg += "leading dimensions of the array"
                    raise ValueError(msg)
                dquanta = quanta.astype(arr.dtype)
            except TypeError:
                # This is a scalar, applied to all detectors
                dquanta = quanta * np.ones(leading_shape, dtype=arr.dtype)
        else:
            # We are using precision instead
            dquanta = None
        idata, foff, gains = float_to_int(arr, quanta=dquanta, precision=precision)
        (compressed, starts, nbytes) = encode_flac(
            idata, level, use_threads=use_threads
        )
        return (compressed, starts, nbytes, foff, gains)
    elif arr.dtype == np.dtype(np.int32) or arr.dtype == np.dtype(np.int64):
        # Integer data
        (compressed, starts, nbytes) = encode_flac(arr, level, use_threads=use_threads)
        return (compressed, starts, nbytes, None, None)
    else:
        raise ValueError(f"Unsupported data type '{arr.dtype}'")

flacarray.decompress.array_decompress(compressed, stream_size, stream_starts, stream_nbytes, stream_offsets=None, stream_gains=None, first_stream_sample=None, last_stream_sample=None, is_int64=False, use_threads=False, no_flatten=False)

Decompress a FLAC encoded array and restore original data type.

If both stream_gains and stream_offsets are specified, the output will be floating point data. If neither is specified, the output will be integer data. It is an error to specify only one of those options.

The compressed byte stream might contain either int32 or int64 data, and the calling code is responsible for tracking this. The is_int64 parameter should be set to True if the byte stream contains 64bit integers.

To decompress a subset of samples in all streams, specify the first_stream_sample and last_stream_sample values. None values or negative values disable this feature.

Parameters:

Name Type Description Default
compressed array

The array of compressed bytes.

required
stream_size int

The length of the decompressed final dimension.

required
stream_starts array

The array of starting bytes in the bytestream.

required
stream_nbytes array

The array of number of bytes in each stream.

required
stream_offsets array

The array of offsets, one per stream.

None
stream_gains array

The array of gains, one per stream.

None
first_stream_sample int

The first sample of every stream to decompress.

None
last_stream_sample int

The last sample of every stream to decompress.

None
is_int64 bool

If True, the compressed stream contains 64bit integers.

False
use_threads bool

If True, use OpenMP threads to parallelize decoding. This is only beneficial for large arrays.

False
no_flatten bool

If True, for single-stream arrays, leave the leading dimension of (1,) in the result.

False

Returns:

Type Description
array

The output array.

Source code in flacarray/decompress.py
@function_timer
def array_decompress(
    compressed,
    stream_size,
    stream_starts,
    stream_nbytes,
    stream_offsets=None,
    stream_gains=None,
    first_stream_sample=None,
    last_stream_sample=None,
    is_int64=False,
    use_threads=False,
    no_flatten=False,
):
    """Decompress a FLAC encoded array and restore original data type.

    If both `stream_gains` and `stream_offsets` are specified, the output will be
    floating point data.  If neither is specified, the output will be integer data.
    It is an error to specify only one of those options.

    The compressed byte stream might contain either int32 or int64 data, and the calling
    code is responsible for tracking this.  The `is_int64` parameter should be set to
    True if the byte stream contains 64bit integers.

    To decompress a subset of samples in all streams, specify the `first_stream_sample`
    and `last_stream_sample` values.  None values or negative values disable this
    feature.

    Args:
        compressed (array):  The array of compressed bytes.
        stream_size (int):  The length of the decompressed final dimension.
        stream_starts (array):  The array of starting bytes in the bytestream.
        stream_nbytes (array):  The array of number of bytes in each stream.
        stream_offsets (array):  The array of offsets, one per stream.
        stream_gains (array):  The array of gains, one per stream.
        first_stream_sample (int):  The first sample of every stream to decompress.
        last_stream_sample (int):  The last sample of every stream to decompress.
        is_int64 (bool):  If True, the compressed stream contains 64bit integers.
        use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
            This is only beneficial for large arrays.
        no_flatten (bool):  If True, for single-stream arrays, leave the leading
            dimension of (1,) in the result.

    Returns:
        (array): The output array.

    """
    arr, _ = array_decompress_slice(
        compressed,
        stream_size,
        stream_starts,
        stream_nbytes,
        stream_offsets=stream_offsets,
        stream_gains=stream_gains,
        keep=None,
        first_stream_sample=first_stream_sample,
        last_stream_sample=last_stream_sample,
        is_int64=is_int64,
        use_threads=use_threads,
        no_flatten=no_flatten,
    )
    return arr

flacarray.decompress.array_decompress_slice(compressed, stream_size, stream_starts, stream_nbytes, stream_offsets=None, stream_gains=None, keep=None, first_stream_sample=None, last_stream_sample=None, is_int64=False, use_threads=False, no_flatten=False)

Decompress a slice of a FLAC encoded array and restore original data type.

If both stream_gains and stream_offsets are specified, the output will be floating point data. If neither is specified, the output will be integer data. It is an error to specify only one of those options.

The compressed byte stream might contain either int32 or int64 data, and the calling code is responsible for tracking this. The is_int64 parameter should be set to True if the byte stream contains 64bit integers.

To decompress a subset of samples in all streams, specify the first_stream_sample and last_stream_sample values. None values or negative values disable this feature.

To decompress a subset of streams, pass a boolean array to the keep argument. This should have the same shape as the starts array. Only streams with a True value in the keep array will be decompressed.

If the keep array is specified, the output tuple will contain the 2D array of streams that were kept, as well as a list of tuples indicating the original array indices for each stream in the output. If the keep array is None, the output tuple will contain an array with the original N-dimensional leading array shape and the trailing number of samples. The second element of the tuple will be None.

Parameters:

Name Type Description Default
compressed array

The array of compressed bytes.

required
stream_size int

The length of the decompressed final dimension.

required
stream_starts array

The array of starting bytes in the bytestream.

required
stream_nbytes array

The array of number of bytes in each stream.

required
stream_offsets array

The array of offsets, one per stream.

None
stream_gains array

The array of gains, one per stream.

None
keep array

Bool array of streams to keep in the decompression.

None
first_stream_sample int

The first sample of every stream to decompress.

None
last_stream_sample int

The last sample of every stream to decompress.

None
is_int64 bool

If True, the compressed stream contains 64bit integers.

False
use_threads bool

If True, use OpenMP threads to parallelize decoding. This is only beneficial for large arrays.

False
no_flatten bool

If True, for single-stream arrays, leave the leading dimension of (1,) in the result.

False

Returns:

Type Description
tuple

The (output array, list of stream indices).

Source code in flacarray/decompress.py
@function_timer
def array_decompress_slice(
    compressed,
    stream_size,
    stream_starts,
    stream_nbytes,
    stream_offsets=None,
    stream_gains=None,
    keep=None,
    first_stream_sample=None,
    last_stream_sample=None,
    is_int64=False,
    use_threads=False,
    no_flatten=False,
):
    """Decompress a slice of a FLAC encoded array and restore original data type.

    If both `stream_gains` and `stream_offsets` are specified, the output will be
    floating point data.  If neither is specified, the output will be integer data.
    It is an error to specify only one of those options.

    The compressed byte stream might contain either int32 or int64 data, and the calling
    code is responsible for tracking this.  The `is_int64` parameter should be set to
    True if the byte stream contains 64bit integers.

    To decompress a subset of samples in all streams, specify the `first_stream_sample`
    and `last_stream_sample` values.  None values or negative values disable this
    feature.

    To decompress a subset of streams, pass a boolean array to the `keep` argument.
    This should have the same shape as the `starts` array.  Only streams with a True
    value in the `keep` array will be decompressed.

    If the `keep` array is specified, the output tuple will contain the 2D array of
    streams that were kept, as well as a list of tuples indicating the original array
    indices for each stream in the output.  If the `keep` array is None, the output
    tuple will contain an array with the original N-dimensional leading array shape
    and the trailing number of samples.  The second element of the tuple will be None.

    Args:
        compressed (array):  The array of compressed bytes.
        stream_size (int):  The length of the decompressed final dimension.
        stream_starts (array):  The array of starting bytes in the bytestream.
        stream_nbytes (array):  The array of number of bytes in each stream.
        stream_offsets (array):  The array of offsets, one per stream.
        stream_gains (array):  The array of gains, one per stream.
        keep (array):  Bool array of streams to keep in the decompression.
        first_stream_sample (int):  The first sample of every stream to decompress.
        last_stream_sample (int):  The last sample of every stream to decompress.
        is_int64 (bool):  If True, the compressed stream contains 64bit integers.
        use_threads (bool):  If True, use OpenMP threads to parallelize decoding.
            This is only beneficial for large arrays.
        no_flatten (bool):  If True, for single-stream arrays, leave the leading
            dimension of (1,) in the result.

    Returns:
        (tuple): The (output array, list of stream indices).

    """
    if first_stream_sample is None:
        first_stream_sample = -1
    if last_stream_sample is None:
        last_stream_sample = -1

    # If we have one stream, ensure that our auxiliary data are arrays
    is_scalar = False
    if (
        not isinstance(stream_starts, np.ndarray) or
        (len(stream_starts.shape) == 1 and stream_starts.shape[0] == 1)
    ):
        # This is a scalar
        is_scalar = True
        stream_starts = ensure_one_element(stream_starts, np.int64)
        stream_nbytes = ensure_one_element(stream_nbytes, np.int64)
        if stream_offsets is not None:
            # We have float values
            if is_int64:
                stream_offsets = ensure_one_element(stream_offsets, np.float64)
                stream_gains = ensure_one_element(stream_gains, np.float64)
            else:
                stream_offsets = ensure_one_element(stream_offsets, np.float32)
                stream_gains = ensure_one_element(stream_gains, np.float32)

    starts, nbytes, indices = keep_select(keep, stream_starts, stream_nbytes)
    offsets = select_keep_indices(stream_offsets, indices)
    gains = select_keep_indices(stream_gains, indices)

    if stream_offsets is not None:
        if stream_gains is not None:
            # This is floating point data.
            idata = decode_flac(
                compressed,
                starts,
                nbytes,
                stream_size,
                first_sample=first_stream_sample,
                last_sample=last_stream_sample,
                use_threads=use_threads,
                is_int64=is_int64,
            )
            arr = int_to_float(idata, offsets, gains)
        else:
            raise RuntimeError(
                "When specifying offsets, you must also provide the gains"
            )
    else:
        if stream_gains is not None:
            raise RuntimeError(
                "When specifying gains, you must also provide the offsets"
            )
        # This is integer data
        arr = decode_flac(
            compressed,
            starts,
            nbytes,
            stream_size,
            first_sample=first_stream_sample,
            last_sample=last_stream_sample,
            use_threads=use_threads,
            is_int64=is_int64,
        )
    if is_scalar and not no_flatten:
        return (arr.reshape((-1)), indices)
    else:
        return (arr, indices)