Data Formats

TOAST has a built-in data format for storing compressed, time-ordered data on disk in one or more HDF5 files, with one file per observation. The operators below write and read this HDF5 format, and also export observations to and import them from SPT3G frame files.

toast.ops.SaveHDF5

Bases: Operator

Operator which saves observations to HDF5.

This creates a file for each observation.
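
A minimal usage sketch, assuming the Data container already holds named observations with detector data stored under the default "signal" and "flags" keys:

import toast
from toast.ops import SaveHDF5

comm = toast.Comm()           # world communicator, one process group by default
data = toast.Data(comm=comm)  # ... populate with named observations first ...

saver = SaveHDF5(
    volume="toast_out_hdf5",      # top-level output directory
    detdata=["signal", "flags"],  # detdata fields to write (assumed key names)
    verify=True,                  # load each file back and compare after writing
)
saver.apply(data)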

Source code in toast/ops/save_hdf5.py
@trait_docs
class SaveHDF5(Operator):
    """Operator which saves observations to HDF5.

    This creates a file for each observation.

    """

    # Class traits

    API = Int(0, help="Internal interface version for this operator")

    volume = Unicode(
        "toast_out_hdf5",
        allow_none=True,
        help="Top-level directory for the data volume",
    )

    # FIXME:  We should add a filtering mechanism here to dump a subset of
    # observations and / or detectors, as well as figure out subdirectory organization.

    meta = List([], allow_none=True, help="Only save this list of meta objects")

    detdata = List(
        [defaults.det_data, defaults.det_flags],
        help="Only save this list of detdata objects",
    )

    shared = List([], help="Only save this list of shared objects")

    intervals = List([], help="Only save this list of intervals objects")

    times = Unicode(defaults.times, help="Observation shared key for timestamps")

    config = Dict({}, help="Write this job config to the file")

    force_serial = Bool(
        False, help="Use serial HDF5 operations, even if parallel support available"
    )

    detdata_float32 = Bool(
        False, help="If True, convert any float64 detector data to float32 on write."
    )

    detdata_in_place = Bool(
        False,
        help="If True, all compressed detector data will be decompressed and written "
        "over the input data.",
    )

    compress_detdata = Bool(False, help="If True, use FLAC to compress detector signal")

    compress_precision = Int(
        None,
        allow_none=True,
        help="Number of significant digits to retain in detdata compression",
    )

    verify = Bool(False, help="If True, immediately load data back in and verify")

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @function_timer
    def _exec(self, data, detectors=None, **kwargs):
        log = Logger.get()

        if self.volume is None:
            msg = "You must set the volume trait prior to calling exec()"
            log.error(msg)
            raise RuntimeError(msg)

        # One process creates the top directory
        if data.comm.world_rank == 0:
            os.makedirs(self.volume, exist_ok=True)
        if data.comm.comm_world is not None:
            data.comm.comm_world.barrier()

        meta_fields = None
        if len(self.meta) > 0:
            meta_fields = list(self.meta)

        shared_fields = None
        if len(self.shared) > 0:
            shared_fields = list(self.shared)

        intervals_fields = None
        if len(self.intervals) > 0:
            intervals_fields = list(self.intervals)

        for ob in data.obs:
            # Observations must have a name for this to work
            if ob.name is None:
                raise RuntimeError(
                    "Observations must have a name in order to save to HDF5 format"
                )

            # Check to see if any detector data objects are temporary and have just
            # a partial list of detectors.  Delete these.

            for dd in list(ob.detdata.keys()):
                if ob.detdata[dd].detectors != ob.local_detectors:
                    del ob.detdata[dd]

            if len(self.detdata) > 0:
                detdata_fields = list(self.detdata)
            else:
                detdata_fields = list()

            if self.compress_detdata:
                # Add generic compression instructions to detdata fields
                for ifield, field in enumerate(detdata_fields):
                    if not isinstance(field, str):
                        # Assume user already supplied instructions for this field
                        continue
                    if "flag" in field:
                        # Flags are ZIP-compressed
                        detdata_fields[ifield] = (field, {"type": "gzip"})
                    else:
                        # Everything else is FLAC-compressed
                        detdata_fields[ifield] = (
                            field,
                            {
                                "type": "flac",
                                "level": 5,
                                "precision": self.compress_precision,
                            },
                        )

            outpath = save_hdf5(
                ob,
                self.volume,
                meta=meta_fields,
                detdata=detdata_fields,
                shared=shared_fields,
                intervals=intervals_fields,
                config=self.config,
                times=str(self.times),
                force_serial=self.force_serial,
                detdata_float32=self.detdata_float32,
                detdata_in_place=self.detdata_in_place,
            )

            log.info_rank(f"Wrote {outpath}", comm=data.comm.comm_group)

            if self.verify:
                # We are going to load the data back in, but first we need to make
                # a modified copy of the input observation for comparison.  This is
                # because we may have only a portion of the data on disk and we
                # might have also converted data to 32bit floats.

                loadpath = outpath

                if len(self.detdata) > 0:
                    # There might be compression info
                    if isinstance(detdata_fields[0], (tuple, list)):
                        verify_fields = [x[0] for x in detdata_fields]
                    else:
                        verify_fields = list(detdata_fields)
                else:
                    # We saved nothing
                    verify_fields = list()

                if self.detdata_float32:
                    # We want to duplicate everything *except* float64 detdata
                    # fields.
                    dup_detdata = list()
                    conv_detdata = list()
                    for fld in verify_fields:
                        if ob.detdata[fld].dtype == np.dtype(np.float64):
                            conv_detdata.append(fld)
                        else:
                            dup_detdata.append(fld)
                    original = ob.duplicate(
                        times=str(self.times),
                        meta=meta_fields,
                        shared=shared_fields,
                        detdata=dup_detdata,
                        intervals=intervals_fields,
                    )
                    for fld in conv_detdata:
                        if len(ob.detdata[fld].detector_shape) == 1:
                            sample_shape = None
                        else:
                            sample_shape = ob.detdata[fld].detector_shape[1:]
                        original.detdata.create(
                            fld,
                            sample_shape=sample_shape,
                            dtype=np.float32,
                            detectors=ob.detdata[fld].detectors,
                            units=ob.detdata[fld].units,
                        )
                        original.detdata[fld].data[:] = (
                            ob.detdata[fld].data[:].astype(np.float32)
                        )
                else:
                    # Duplicate detdata
                    original = ob.duplicate(
                        times=str(self.times),
                        meta=meta_fields,
                        shared=shared_fields,
                        detdata=verify_fields,
                        intervals=intervals_fields,
                    )

                compare = load_hdf5(
                    loadpath,
                    data.comm,
                    process_rows=ob.comm_col_size,
                    meta=meta_fields,
                    detdata=verify_fields,
                    shared=shared_fields,
                    intervals=intervals_fields,
                    force_serial=self.force_serial,
                )

                if not obs_approx_equal(compare, original):
                    msg = "Observation HDF5 verify failed:\n"
                    msg += f"Input = {original}\n"
                    msg += f"Loaded = {compare}"
                    log.error(msg)
                    raise RuntimeError(msg)

        return

    def _finalize(self, data, **kwargs):
        return

    def _requires(self):
        req = {
            "meta": list(),
            "detdata": list(),
            "intervals": list(),
            "shared": [
                self.times,
            ],
        }
        if self.meta is not None:
            req["meta"].extend(self.meta)
        if self.detdata is not None:
            req["detdata"].extend(self.detdata)
        if self.intervals is not None:
            req["intervals"].extend(self.intervals)
        if self.shared is not None:
            req["shared"].extend(self.shared)
        return req

    def _provides(self):
        return dict()
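
The compress_detdata branch above also shows the accepted form of compression instructions: each entry of the detdata trait may be either a bare field name or a (field, properties) tuple. A sketch of supplying explicit per-field instructions rather than letting the operator fill them in (field names are assumptions):

from toast.ops import SaveHDF5

saver = SaveHDF5(
    volume="toast_out_hdf5",
    detdata=[
        # FLAC-compress the signal, keeping 3 significant digits
        ("signal", {"type": "flac", "level": 5, "precision": 3}),
        # gzip-compress the detector flags
        ("flags", {"type": "gzip"}),
    ],
)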

Class traits

API = Int(0, help='Internal interface version for this operator')
compress_detdata = Bool(False, help='If True, use FLAC to compress detector signal')
compress_precision = Int(None, allow_none=True, help='Number of significant digits to retain in detdata compression')
config = Dict({}, help='Write this job config to the file')
detdata = List([defaults.det_data, defaults.det_flags], help='Only save this list of detdata objects')
detdata_float32 = Bool(False, help='If True, convert any float64 detector data to float32 on write.')
detdata_in_place = Bool(False, help='If True, all compressed detector data will be decompressed and written over the input data.')
force_serial = Bool(False, help='Use serial HDF5 operations, even if parallel support available')
intervals = List([], help='Only save this list of intervals objects')
meta = List([], allow_none=True, help='Only save this list of meta objects')
shared = List([], help='Only save this list of shared objects')
times = Unicode(defaults.times, help='Observation shared key for timestamps')
verify = Bool(False, help='If True, immediately load data back in and verify')
volume = Unicode('toast_out_hdf5', allow_none=True, help='Top-level directory for the data volume')


toast.ops.LoadHDF5

Bases: Operator

Operator which loads HDF5 data files into observations.

This operator expects a top-level volume directory.
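
A minimal usage sketch for reading a volume previously written by SaveHDF5 into an empty Data container (the detdata key name is an assumption):

import toast
from toast.ops import LoadHDF5

comm = toast.Comm()
data = toast.Data(comm=comm)

loader = LoadHDF5(
    volume="toast_out_hdf5",  # directory written by SaveHDF5
    detdata=["signal"],       # only read this detdata field (assumed key name)
)
loader.apply(data)
print(f"Loaded {len(data.obs)} observations")

Each process group loads its assigned observations and appends them to data.obs.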

Source code in toast/ops/load_hdf5.py
@trait_docs
class LoadHDF5(Operator):
    """Operator which loads HDF5 data files into observations.

    This operator expects a top-level volume directory.

    """

    # Class traits

    API = Int(0, help="Internal interface version for this operator")

    volume = Unicode(
        None, allow_none=True, help="Top-level directory containing the data volume"
    )

    pattern = Unicode(r"obs_.*_.*\.h5", help="Regexp pattern to match files against")

    files = List([], help="Override `volume` and load a list of files")

    # FIXME:  We should add a filtering mechanism here to load a subset of
    # observations and / or detectors, as well as figure out subdirectory organization.

    meta = List([], help="Only load this list of meta objects")

    detdata = List(
        [defaults.det_data, defaults.det_flags],
        help="Only load this list of detdata objects",
    )

    shared = List([], help="Only load this list of shared objects")

    intervals = List([], help="Only load this list of intervals objects")

    sort_by_size = Bool(
        False, help="If True, sort observations by size before load balancing"
    )

    process_rows = Int(
        None,
        allow_none=True,
        help="The size of the rectangular process grid in the detector direction.",
    )

    force_serial = Bool(
        False, help="Use serial HDF5 operations, even if parallel support available"
    )

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @function_timer
    def _exec(self, data, detectors=None, **kwargs):
        log = Logger.get()

        pattern = re.compile(self.pattern)

        filenames = None
        if len(self.files) > 0:
            if self.volume is not None:
                log.warning(
                    f'LoadHDF5: volume="{self.volume}" trait overridden by '
                    f"files={self.files}"
                )
            filenames = list(self.files)

        meta_fields = None
        if len(self.meta) > 0:
            meta_fields = list(self.meta)

        shared_fields = None
        if len(self.shared) > 0:
            shared_fields = list(self.shared)

        if len(self.detdata) > 0:
            detdata_fields = list(self.detdata)
        else:
            detdata_fields = list()

        intervals_fields = None
        if len(self.intervals) > 0:
            intervals_fields = list(self.intervals)

        # FIXME:  Eventually we will use the volume index / DB to select observations
        # and their sizes for a load-balanced assignment.  For now, we read this from
        # the file.

        # One global process computes the list of observations and their approximate
        # relative size.

        def _get_obs_samples(path):
            n_samp = 0
            with h5py.File(path, "r") as hf:
                n_samp = int(hf.attrs["observation_samples"])
            return n_samp

        obs_props = list()
        if data.comm.world_rank == 0:
            if filenames is None:
                filenames = []
                for root, dirs, files in os.walk(self.volume):
                    # Process top-level files
                    filenames += [
                        os.path.join(root, fname)
                        for fname in files
                        if pattern.search(fname) is not None
                    ]
                    # Also process sub-directories one level deep
                    for d in dirs:
                        for root2, dirs2, files2 in os.walk(os.path.join(root, d)):
                            filenames += [
                                os.path.join(root2, fname)
                                for fname in files2
                                if pattern.search(fname) is not None
                            ]
                    break

            for ofile in sorted(filenames):
                fsize = _get_obs_samples(ofile)
                obs_props.append((fsize, ofile))

        if self.sort_by_size:
            obs_props.sort(key=lambda x: x[0])
        else:
            obs_props.sort(key=lambda x: x[1])
        if data.comm.comm_world is not None:
            obs_props = data.comm.comm_world.bcast(obs_props, root=0)

        # Distribute observations among groups

        obs_sizes = [x[0] for x in obs_props]
        groupdist = distribute_discrete(obs_sizes, data.comm.ngroups)
        group_firstobs = groupdist[data.comm.group].offset
        group_numobs = groupdist[data.comm.group].n_elem

        # Every group loads its observations
        for obindx in range(group_firstobs, group_firstobs + group_numobs):
            obfile = obs_props[obindx][1]

            ob = load_hdf5(
                obfile,
                data.comm,
                process_rows=self.process_rows,
                meta=meta_fields,
                detdata=detdata_fields,
                shared=shared_fields,
                intervals=intervals_fields,
                force_serial=self.force_serial,
            )

            data.obs.append(ob)

        return

    def _finalize(self, data, **kwargs):
        return

    def _requires(self):
        return dict()

    def _provides(self):
        return dict()
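
As the directory walk in _exec() shows, the operator searches volume one subdirectory level deep for files matching pattern. Setting the files trait bypasses that search entirely; a sketch with hypothetical placeholder paths:

from toast.ops import LoadHDF5

loader = LoadHDF5(
    files=[
        # hypothetical placeholder paths
        "toast_out_hdf5/obs_example_A.h5",
        "toast_out_hdf5/obs_example_B.h5",
    ],
)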

Class traits

API = Int(0, help='Internal interface version for this operator')
detdata = List([defaults.det_data, defaults.det_flags], help='Only load this list of detdata objects')
files = List([], help='Override `volume` and load a list of files')
force_serial = Bool(False, help='Use serial HDF5 operations, even if parallel support available')
intervals = List([], help='Only load this list of intervals objects')
meta = List([], help='Only load this list of meta objects')
pattern = Unicode('obs_.*_.*\\.h5', help='Regexp pattern to match files against')
process_rows = Int(None, allow_none=True, help='The size of the rectangular process grid in the detector direction.')
shared = List([], help='Only load this list of shared objects')
sort_by_size = Bool(False, help='If True, sort observations by size before load balancing')
volume = Unicode(None, allow_none=True, help='Top-level directory containing the data volume')


toast.ops.SaveSpt3g

Bases: Operator

Operator which saves observations to SPT3G frame files.

This creates a directory for each observation, and writes framefiles with a target size.
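
A usage sketch, assuming the export_obs helper class from toast.spt3g supplies the obs_export callable (any callable with an export_rank attribute is accepted):

import toast
from toast.ops import SaveSpt3g
from toast.spt3g import export_obs  # assumed helper class

comm = toast.Comm()
data = toast.Data(comm=comm)  # ... populate with named observations first ...

saver = SaveSpt3g(
    directory="spt3g_data",
    framefile_mb=100.0,       # target frame file size
    obs_export=export_obs(),  # default construction here is an assumption
)
saver.apply(data)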

Source code in toast/ops/save_spt3g.py
@trait_docs
class SaveSpt3g(Operator):
    """Operator which saves observations to SPT3G frame files.

    This creates a directory for each observation, and writes framefiles with
    a target size.

    """

    # Class traits

    API = Int(0, help="Internal interface version for this operator")

    directory = Unicode("spt3g_data", help="Top-level export directory")

    framefile_mb = Float(100.0, help="Target frame file size in MB")

    gzip = Bool(False, help="If True, gzip compress the frame files")

    # FIXME:  We should add a filtering mechanism here to dump a subset of
    # observations and / or detectors.

    obs_export = Instance(
        klass=object,
        allow_none=True,
        help="Export class to create frames from an observation",
    )

    purge = Bool(False, help="If True, delete observation data as it is saved")

    @traitlets.validate("obs_export")
    def _check_export(self, proposal):
        ex = proposal["value"]
        if ex is not None:
            # Check that this class has an "export_rank" attribute and is callable.
            if not callable(ex):
                raise traitlets.TraitError("obs_export class must be callable")
            if not hasattr(ex, "export_rank"):
                raise traitlets.TraitError(
                    "obs_export class must have an export_rank attribute"
                )
        return ex

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        if not available:
            raise RuntimeError("spt3g is not available")

    @function_timer
    def _exec(self, data, detectors=None, **kwargs):
        log = Logger.get()

        # Check that the export class is set
        if self.obs_export is None:
            raise RuntimeError(
                "You must set the obs_export trait before calling exec()"
            )

        # One process creates the top directory
        if data.comm.world_rank == 0:
            os.makedirs(self.directory, exist_ok=True)
        if data.comm.comm_world is not None:
            data.comm.comm_world.barrier()

        for ob in data.obs:
            # Observations must have a name for this to work
            if ob.name is None:
                raise RuntimeError(
                    "Observations must have a name in order to save to SPT3G format"
                )
            ob_dir = os.path.join(self.directory, ob.name)
            if ob.comm.group_rank == 0:
                # Make observation directory.  This should NOT already exist.
                os.makedirs(ob_dir)
            if ob.comm.comm_group is not None:
                ob.comm.comm_group.barrier()

            # Export observation to frames
            frames = self.obs_export(ob)

            # If the export rank is set, then frames will be gathered to one
            # process and written.  Otherwise each process will write to
            # sequential, independent frame files.
            ex_rank = self.obs_export.export_rank
            if ex_rank is None:
                # All processes write independently
                emitter = frame_emitter(frames=frames)
                save_pipe = c3g.G3Pipeline()
                save_pipe.Add(emitter)
                fname = f"frames-{ob.comm.group_rank:04d}.g3"
                if self.gzip:
                    fname += ".gz"
                save_pipe.Add(
                    c3g.G3Writer,
                    filename=os.path.join(ob_dir, fname),
                )
                save_pipe.Run()
                del save_pipe
                del emitter
            else:
                # Gather frames to one process and write
                if ob.comm.group_rank == ex_rank:
                    emitter = frame_emitter(frames=frames)
                    save_pipe = c3g.G3Pipeline()
                    save_pipe.Add(emitter)
                    fpattern = "frames-%04u.g3"
                    if self.gzip:
                        fpattern += ".gz"
                    save_pipe.Add(
                        c3g.G3MultiFileWriter,
                        filename=os.path.join(ob_dir, fpattern),
                        size_limit=int(self.framefile_mb * 1024**2),
                    )
                    save_pipe.Run()
                    del save_pipe
                    del emitter

            if ob.comm.comm_group is not None:
                ob.comm.comm_group.barrier()

            if self.purge:
                ob.clear()

        if self.purge:
            data.obs.clear()
        return

    def _finalize(self, data, **kwargs):
        return

    def _requires(self):
        return dict()

    def _provides(self):
        return dict()

Class traits

API = Int(0, help='Internal interface version for this operator')
directory = Unicode('spt3g_data', help='Top-level export directory')
framefile_mb = Float(100.0, help='Target frame file size in MB')
gzip = Bool(False, help='If True, gzip compress the frame files')
obs_export = Instance(klass=object, allow_none=True, help='Export class to create frames from an observation')
purge = Bool(False, help='If True, delete observation data as it is saved')


toast.ops.LoadSpt3g

Bases: Operator

Operator which loads SPT3G data files into observations.

This operator expects a top-level data directory.
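
A usage sketch, assuming the import_obs helper class from toast.spt3g supplies the obs_import callable; it must have a non-None import_rank so that frames are read on a single process per group:

import toast
from toast.ops import LoadSpt3g
from toast.spt3g import import_obs  # assumed helper class

comm = toast.Comm()
data = toast.Data(comm=comm)

loader = LoadSpt3g(
    directory="spt3g_data",   # directory written by SaveSpt3g
    obs_import=import_obs(),  # default construction here is an assumption
)
loader.apply(data)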

Source code in toast/ops/load_spt3g.py
@trait_docs
class LoadSpt3g(Operator):
    """Operator which loads SPT3G data files into observations.

    This operator expects a top-level data directory.

    """

    # Class traits

    API = Int(0, help="Internal interface version for this operator")

    directory = Unicode(
        None, allow_none=True, help="Top-level directory of observations"
    )

    # FIXME:  We should add a filtering mechanism here to load a subset of
    # observations and / or detectors.

    obs_import = Instance(
        klass=object,
        allow_none=True,
        help="Import class to create observations from frame files",
    )

    @traitlets.validate("obs_import")
    def _check_import(self, proposal):
        im = proposal["value"]
        if im is not None:
            # Check that this class has an "import_rank" attribute and is callable.
            if not callable(im):
                raise traitlets.TraitError("obs_import class must be callable")
            if not hasattr(im, "import_rank"):
                raise traitlets.TraitError(
                    "obs_import class must have an import_rank attribute"
                )
        return im

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        if not available:
            raise RuntimeError("spt3g is not available")

    @function_timer
    def _exec(self, data, detectors=None, **kwargs):
        log = Logger.get()

        # Check that the import class is set
        if self.obs_import is None:
            raise RuntimeError(
                "You must set the obs_import trait before calling exec()"
            )

        # Find the process rank which will load all frames for a given group.
        im_rank = self.obs_import.import_rank
        if im_rank is None:
            raise RuntimeError(
                "The obs_import class must be configured to import frames on one process"
            )

        # FIXME:  we should add a check that self.obs_import.comm is the same as
        # the data.comm.comm_group communicator.

        # One global process computes the list of observations and their approximate
        # relative size (based on the total framefile sizes for each).

        obs_props = list()
        if data.comm.world_rank == 0:
            for root, dirs, files in os.walk(self.directory):
                for d in dirs:
                    framefiles = glob.glob(os.path.join(root, d, "*.g3"))
                    if len(framefiles) > 0:
                        # This is an observation directory
                        flist = list()
                        obtotal = 0
                        for ffile in sorted(framefiles):
                            fsize = os.path.getsize(ffile)
                            obtotal += fsize
                            flist.append(ffile)
                        obs_props.append((d, obtotal, flist))
                break
        obs_props.sort(key=lambda x: x[0])
        if data.comm.comm_world is not None:
            obs_props = data.comm.comm_world.bcast(obs_props, root=0)

        # Distribute observations among groups

        obs_sizes = [x[1] for x in obs_props]
        groupdist = distribute_discrete(obs_sizes, data.comm.ngroups)
        group_firstobs = groupdist[data.comm.group][0]
        group_numobs = groupdist[data.comm.group][1]

        # Every group creates its observations
        for obindx in range(group_firstobs, group_firstobs + group_numobs):
            obname = obs_props[obindx][0]
            obfiles = obs_props[obindx][2]

            collect = frame_collector()

            if data.comm.group_rank == im_rank:
                all_frames = list()
                # We are loading the frames
                for ffile in obfiles:
                    load_pipe = c3g.G3Pipeline()
                    load_pipe.Add(c3g.G3Reader(ffile))
                    load_pipe.Add(collect)
                    load_pipe.Run()

            # Convert collected frames to an observation
            ob = self.obs_import(collect.frames)
            data.obs.append(ob)
            del collect

        return

    def _finalize(self, data, **kwargs):
        return

    def _requires(self):
        return dict()

    def _provides(self):
        return dict()

Class traits

API = Int(0, help='Internal interface version for this operator')
directory = Unicode(None, allow_none=True, help='Top-level directory of observations')
obs_import = Instance(klass=object, allow_none=True, help='Import class to create observations from frame files')
