
Data Formats

TOAST has a built-in data format for storing compressed, time-ordered data on disk in one or more HDF5 files.

toast.ops.SaveHDF5

Bases: Operator

Operator which saves observations to HDF5.

This creates a file for each observation. Detector data compression can be enabled by specifying a tuple for each item in the detdata list. The first item in the tuple is the field name. The second item is either None or a dictionary of FLAC compression properties. Allowed compression parameters are:

"level": (int) the compression level
"quanta": (float) the quantization value, only for floating point data
"precision": (float) the fixed precision, only for floating point data

For integer data, an empty dictionary may be passed, and FLAC compression will use the default level (5). Floating point data must specify either the quanta or the precision parameter.
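
A minimal usage sketch of the per-field compression syntax (this assumes an existing toast Data container named data, detector data fields named "signal" and "flags", and the standard Operator.apply() entry point; the quanta value is purely illustrative):

import toast.ops

save = toast.ops.SaveHDF5(
    volume="toast_out_hdf5",
    detdata=[
        # Floating point field: must provide either "quanta" or "precision"
        ("signal", {"level": 5, "quanta": 1.0e-7}),
        # Integer field: an empty dict enables FLAC at the default level (5)
        ("flags", {}),
    ],
)
save.apply(data)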

Source code in toast/ops/save_hdf5.py
@trait_docs
class SaveHDF5(Operator):
    """Operator which saves observations to HDF5.

    This creates a file for each observation.  Detector data compression can be enabled
    by specifying a tuple for each item in the detdata list.  The first item in the
    tuple is the field name.  The second item is either None, or a dictionary of FLAC
    compression properties.  Allowed compression parameters are:

        "level": (int) the compression level
        "quanta": (float) the quantization value, only for floating point data
        "precision": (float) the fixed precision, only for floating point data

    For integer data, an empty dictionary may be passed, and FLAC compression
    will use the default level (5).  Floating point data *must* specify either the
    quanta or precision parameters.

    """

    # Class traits

    API = Int(0, help="Internal interface version for this operator")

    volume = Unicode(
        "toast_out_hdf5",
        allow_none=True,
        help="Top-level directory for the data volume",
    )

    volume_index = Unicode(
        "DEFAULT",
        allow_none=True,
        help=(
            "Path to index file.  None disables use of index.  "
            "'DEFAULT' uses the default VolumeIndex name at the top of the volume."
        ),
    )

    volume_index_fields = Dict(
        {}, help="Extra observation metadata to index.  See `VolumeIndex.append()`."
    )

    session_dirs = Bool(
        False, help="Organize observation files in session sub-directories"
    )

    unix_time_dirs = Bool(
        False, help="If True, organize files in 5 digit, time-prefix sub-directories"
    )

    # FIXME:  We should add a filtering mechanism here to dump a subset of
    # observations and / or detectors, as well as figure out subdirectory organization.

    meta = List([], allow_none=True, help="Only save this list of meta objects")

    detdata = List(
        [defaults.det_data, defaults.det_flags],
        help="Only save this list of detdata objects",
    )

    shared = List([], help="Only save this list of shared objects")

    intervals = List([], help="Only save this list of intervals objects")

    times = Unicode(defaults.times, help="Observation shared key for timestamps")

    config = Dict({}, help="Write this job config to the file")

    force_serial = Bool(
        False, help="Use serial HDF5 operations, even if parallel support available"
    )

    detdata_float32 = Bool(
        False, help="(Deprecated) Specify the per-field compression parameters."
    )

    detdata_in_place = Bool(
        False,
        help="If True, all compressed detector data will be decompressed and written "
        "over the input data.",
    )

    compress_detdata = Bool(
        False, help="(Deprecated) Specify the per-field compression parameters"
    )

    compress_precision = Int(
        None,
        allow_none=True,
        help="(Deprecated) Specify the per-field compression parameters",
    )

    verify = Bool(False, help="If True, immediately load data back in and verify")

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @function_timer
    def _exec(self, data, detectors=None, **kwargs):
        log = Logger.get()

        if self.volume is None:
            msg = "You must set the volume trait prior to calling exec()"
            log.error(msg)
            raise RuntimeError(msg)

        # Warn for deprecated traits that will be removed eventually.

        if self.detdata_float32:
            msg = "The detdata_float32 option is deprecated.  Instead, specify"
            msg = " a compression quanta / precision that is appropriate for"
            msg = " each detdata field."
            warnings.warn(msg, DeprecationWarning)

        if self.compress_detdata:
            msg = "The compress_detdata option is deprecated.  Instead, specify"
            msg = " a compression quanta / precision that is appropriate for"
            msg = " each detdata field."
            warnings.warn(msg, DeprecationWarning)

        if self.compress_precision is not None:
            msg = "The compress_precision option is deprecated.  Instead, specify"
            msg = " a compression quanta / precision that is appropriate for"
            msg = " each detdata field."
            warnings.warn(msg, DeprecationWarning)

        # One process creates the top directory
        if data.comm.world_rank == 0:
            os.makedirs(self.volume, exist_ok=True)
        if data.comm.comm_world is not None:
            data.comm.comm_world.barrier()

        # Index for the volume, if we are using it.
        if self.volume_index is None:
            vindx = None
        elif self.volume_index == "DEFAULT":
            vindx = VolumeIndex(os.path.join(self.volume, VolumeIndex.default_name))
        else:
            vindx = VolumeIndex(self.volume_index)

        meta_fields = None
        if len(self.meta) > 0:
            meta_fields = list(self.meta)

        shared_fields = None
        if len(self.shared) > 0:
            shared_fields = list(self.shared)

        intervals_fields = None
        if len(self.intervals) > 0:
            intervals_fields = list(self.intervals)

        if len(self.detdata) > 0:
            detdata_fields = list(self.detdata)
        else:
            detdata_fields = list()

        # Handle parsing of deprecated global compression options.  All
        # new code should specify the FLAC compression parameters per
        # field.
        for ifield, field in enumerate(detdata_fields):
            if not isinstance(field, str):
                # User already specified compression parameters
                continue
            cprops = {"level": 5}
            if self.compress_detdata:
                # Try to guess what to do.
                if "flag" not in field:
                    # Might be float data
                    if self.compress_precision is None:
                        # Compress to 32bit floats
                        cprops["quanta"] = np.finfo(np.float32).eps
                    else:
                        cprops["precision"] = self.compress_precision
                detdata_fields[ifield] = (field, cprops)
            elif self.detdata_float32:
                # Implement this truncation as just compression to 32bit float
                # precision
                cprops["quanta"] = np.finfo(np.float32).eps
                detdata_fields[ifield] = (field, cprops)
            else:
                # No compression
                detdata_fields[ifield] = (field, None)

        for ob in data.obs:
            # Observations must have a name for this to work
            if ob.name is None:
                raise RuntimeError(
                    "Observations must have a name in order to save to HDF5 format"
                )

            # Check to see if any detector data objects are temporary and have just
            # a partial list of detectors.  Delete these.
            for dd in list(ob.detdata.keys()):
                if ob.detdata[dd].detectors != ob.local_detectors:
                    del ob.detdata[dd]

            time_prefix = None
            if self.unix_time_dirs:
                time_prefix = f"{ob.session.start.timestamp()}"[:5]

            if self.session_dirs:
                if time_prefix is None:
                    outdir = os.path.join(self.volume, ob.session.name)
                else:
                    outdir = os.path.join(self.volume, time_prefix, ob.session.name)
                # One process creates the top directory
                if data.comm.group_rank == 0:
                    os.makedirs(outdir, exist_ok=True)
                if data.comm.comm_group is not None:
                    data.comm.comm_group.barrier()
            else:
                if time_prefix is None:
                    outdir = self.volume
                else:
                    outdir = os.path.join(self.volume, time_prefix)

            outpath = save_hdf5(
                ob,
                outdir,
                meta=meta_fields,
                detdata=detdata_fields,
                shared=shared_fields,
                intervals=intervals_fields,
                config=self.config,
                times=str(self.times),
                force_serial=self.force_serial,
                detdata_in_place=self.detdata_in_place,
            )

            log.info_rank(f"Wrote {outpath}", comm=data.comm.comm_group)

            # Add this observation to the volume index
            if vindx is not None:
                rel_path = os.path.relpath(outpath, start=self.volume)
                if len(self.volume_index_fields) == 0:
                    index_fields = None
                else:
                    index_fields = self.volume_index_fields
                msg = f"Add {ob.name} to index with extra fields {index_fields}"
                log.verbose_rank(msg, comm=data.comm.comm_group)
                vindx.append(ob, rel_path, indexfields=index_fields)

            if self.verify:
                # We are going to load the data back in, but first we need to make
                # a modified copy of the input observation for comparison.  This is
                # because we may have only a portion of the data on disk and we
                # might have also converted data to 32bit floats.

                loadpath = outpath

                if len(self.detdata) > 0:
                    # There might be compression info
                    if isinstance(detdata_fields[0], (tuple, list)):
                        verify_fields = [x[0] for x in detdata_fields]
                    else:
                        verify_fields = list(detdata_fields)
                else:
                    # We saved nothing
                    verify_fields = list()

                original = ob.duplicate(
                    times=str(self.times),
                    meta=meta_fields,
                    shared=shared_fields,
                    detdata=verify_fields,
                    intervals=intervals_fields,
                )

                compare = load_hdf5(
                    loadpath,
                    data.comm,
                    process_rows=ob.comm_col_size,
                    meta=meta_fields,
                    detdata=verify_fields,
                    shared=shared_fields,
                    intervals=intervals_fields,
                    force_serial=self.force_serial,
                )

                failed = 0
                fail_msg = None
                if not original.__eq__(compare, approx=True):
                    fail_msg = "Observation HDF5 verify failed:\n"
                    fail_msg += f"Input = {original}\n"
                    fail_msg += f"Loaded = {compare}\n"
                    fail_msg += f"Input signal[0] = {original.detdata['signal'][0]}\n"
                    fail_msg += f"Loaded signal[0] = {compare.detdata['signal'][0]}"
                    log.error(fail_msg)
                    failed = 1
                if data.comm.comm_group is not None:
                    failed = data.comm.comm_group.allreduce(failed, op=MPI.SUM)
                if failed:
                    raise RuntimeError(fail_msg)

        return

    def _finalize(self, data, **kwargs):
        return

    def _requires(self):
        req = {
            "meta": list(),
            "detdata": list(),
            "intervals": list(),
            "shared": [
                self.times,
            ],
        }
        if self.meta is not None:
            req["meta"].extend(self.meta)
        if self.detdata is not None:
            req["detdata"].extend(self.detdata)
        if self.intervals is not None:
            req["intervals"].extend(self.intervals)
        if self.shared is not None:
            req["shared"].extend(self.shared)
        return req

    def _provides(self):
        return dict()

API = Int(0, help='Internal interface version for this operator') class-attribute instance-attribute

compress_detdata = Bool(False, help='(Deprecated) Specify the per-field compression parameters') class-attribute instance-attribute

compress_precision = Int(None, allow_none=True, help='(Deprecated) Specify the per-field compression parameters') class-attribute instance-attribute

config = Dict({}, help='Write this job config to the file') class-attribute instance-attribute

detdata = List([defaults.det_data, defaults.det_flags], help='Only save this list of detdata objects') class-attribute instance-attribute

detdata_float32 = Bool(False, help='(Deprecated) Specify the per-field compression parameters.') class-attribute instance-attribute

detdata_in_place = Bool(False, help='If True, all compressed detector data will be decompressed and written over the input data.') class-attribute instance-attribute

force_serial = Bool(False, help='Use serial HDF5 operations, even if parallel support available') class-attribute instance-attribute

intervals = List([], help='Only save this list of intervals objects') class-attribute instance-attribute

meta = List([], allow_none=True, help='Only save this list of meta objects') class-attribute instance-attribute

session_dirs = Bool(False, help='Organize observation files in session sub-directories') class-attribute instance-attribute

shared = List([], help='Only save this list of shared objects') class-attribute instance-attribute

times = Unicode(defaults.times, help='Observation shared key for timestamps') class-attribute instance-attribute

unix_time_dirs = Bool(False, help='If True, organize files in 5 digit, time-prefix sub-directories') class-attribute instance-attribute

verify = Bool(False, help='If True, immediately load data back in and verify') class-attribute instance-attribute

volume = Unicode('toast_out_hdf5', allow_none=True, help='Top-level directory for the data volume') class-attribute instance-attribute

volume_index = Unicode('DEFAULT', allow_none=True, help="Path to index file. None disables use of index. 'DEFAULT' uses the default VolumeIndex name at the top of the volume.") class-attribute instance-attribute

volume_index_fields = Dict({}, help='Extra observation metadata to index. See `VolumeIndex.append()`.') class-attribute instance-attribute

__init__(**kwargs)

Source code in toast/ops/save_hdf5.py
def __init__(self, **kwargs):
    super().__init__(**kwargs)

_exec(data, detectors=None, **kwargs)

Source code in toast/ops/save_hdf5.py
@function_timer
def _exec(self, data, detectors=None, **kwargs):
    log = Logger.get()

    if self.volume is None:
        msg = "You must set the volume trait prior to calling exec()"
        log.error(msg)
        raise RuntimeError(msg)

    # Warn for deprecated traits that will be removed eventually.

    if self.detdata_float32:
        msg = "The detdata_float32 option is deprecated.  Instead, specify"
        msg = " a compression quanta / precision that is appropriate for"
        msg = " each detdata field."
        warnings.warn(msg, DeprecationWarning)

    if self.compress_detdata:
        msg = "The compress_detdata option is deprecated.  Instead, specify"
        msg = " a compression quanta / precision that is appropriate for"
        msg = " each detdata field."
        warnings.warn(msg, DeprecationWarning)

    if self.compress_precision is not None:
        msg = "The compress_precision option is deprecated.  Instead, specify"
        msg = " a compression quanta / precision that is appropriate for"
        msg = " each detdata field."
        warnings.warn(msg, DeprecationWarning)

    # One process creates the top directory
    if data.comm.world_rank == 0:
        os.makedirs(self.volume, exist_ok=True)
    if data.comm.comm_world is not None:
        data.comm.comm_world.barrier()

    # Index for the volume, if we are using it.
    if self.volume_index is None:
        vindx = None
    elif self.volume_index == "DEFAULT":
        vindx = VolumeIndex(os.path.join(self.volume, VolumeIndex.default_name))
    else:
        vindx = VolumeIndex(self.volume_index)

    meta_fields = None
    if len(self.meta) > 0:
        meta_fields = list(self.meta)

    shared_fields = None
    if len(self.shared) > 0:
        shared_fields = list(self.shared)

    intervals_fields = None
    if len(self.intervals) > 0:
        intervals_fields = list(self.intervals)

    if len(self.detdata) > 0:
        detdata_fields = list(self.detdata)
    else:
        detdata_fields = list()

    # Handle parsing of deprecated global compression options.  All
    # new code should specify the FLAC compression parameters per
    # field.
    for ifield, field in enumerate(detdata_fields):
        if not isinstance(field, str):
            # User already specified compression parameters
            continue
        cprops = {"level": 5}
        if self.compress_detdata:
            # Try to guess what to do.
            if "flag" not in field:
                # Might be float data
                if self.compress_precision is None:
                    # Compress to 32bit floats
                    cprops["quanta"] = np.finfo(np.float32).eps
                else:
                    cprops["precision"] = self.compress_precision
            detdata_fields[ifield] = (field, cprops)
        elif self.detdata_float32:
            # Implement this truncation as just compression to 32bit float
            # precision
            cprops["quanta"] = np.finfo(np.float32).eps
            detdata_fields[ifield] = (field, cprops)
        else:
            # No compression
            detdata_fields[ifield] = (field, None)

    for ob in data.obs:
        # Observations must have a name for this to work
        if ob.name is None:
            raise RuntimeError(
                "Observations must have a name in order to save to HDF5 format"
            )

        # Check to see if any detector data objects are temporary and have just
        # a partial list of detectors.  Delete these.
        for dd in list(ob.detdata.keys()):
            if ob.detdata[dd].detectors != ob.local_detectors:
                del ob.detdata[dd]

        time_prefix = None
        if self.unix_time_dirs:
            time_prefix = f"{ob.session.start.timestamp()}"[:5]

        if self.session_dirs:
            if time_prefix is None:
                outdir = os.path.join(self.volume, ob.session.name)
            else:
                outdir = os.path.join(self.volume, time_prefix, ob.session.name)
            # One process creates the top directory
            if data.comm.group_rank == 0:
                os.makedirs(outdir, exist_ok=True)
            if data.comm.comm_group is not None:
                data.comm.comm_group.barrier()
        else:
            if time_prefix is None:
                outdir = self.volume
            else:
                outdir = os.path.join(self.volume, time_prefix)

        outpath = save_hdf5(
            ob,
            outdir,
            meta=meta_fields,
            detdata=detdata_fields,
            shared=shared_fields,
            intervals=intervals_fields,
            config=self.config,
            times=str(self.times),
            force_serial=self.force_serial,
            detdata_in_place=self.detdata_in_place,
        )

        log.info_rank(f"Wrote {outpath}", comm=data.comm.comm_group)

        # Add this observation to the volume index
        if vindx is not None:
            rel_path = os.path.relpath(outpath, start=self.volume)
            if len(self.volume_index_fields) == 0:
                index_fields = None
            else:
                index_fields = self.volume_index_fields
            msg = f"Add {ob.name} to index with extra fields {index_fields}"
            log.verbose_rank(msg, comm=data.comm.comm_group)
            vindx.append(ob, rel_path, indexfields=index_fields)

        if self.verify:
            # We are going to load the data back in, but first we need to make
            # a modified copy of the input observation for comparison.  This is
            # because we may have only a portion of the data on disk and we
            # might have also converted data to 32bit floats.

            loadpath = outpath

            if len(self.detdata) > 0:
                # There might be compression info
                if isinstance(detdata_fields[0], (tuple, list)):
                    verify_fields = [x[0] for x in detdata_fields]
                else:
                    verify_fields = list(detdata_fields)
            else:
                # We saved nothing
                verify_fields = list()

            original = ob.duplicate(
                times=str(self.times),
                meta=meta_fields,
                shared=shared_fields,
                detdata=verify_fields,
                intervals=intervals_fields,
            )

            compare = load_hdf5(
                loadpath,
                data.comm,
                process_rows=ob.comm_col_size,
                meta=meta_fields,
                detdata=verify_fields,
                shared=shared_fields,
                intervals=intervals_fields,
                force_serial=self.force_serial,
            )

            failed = 0
            fail_msg = None
            if not original.__eq__(compare, approx=True):
                fail_msg = "Observation HDF5 verify failed:\n"
                fail_msg += f"Input = {original}\n"
                fail_msg += f"Loaded = {compare}\n"
                fail_msg += f"Input signal[0] = {original.detdata['signal'][0]}\n"
                fail_msg += f"Loaded signal[0] = {compare.detdata['signal'][0]}"
                log.error(fail_msg)
                failed = 1
            if data.comm.comm_group is not None:
                failed = data.comm.comm_group.allreduce(failed, op=MPI.SUM)
            if failed:
                raise RuntimeError(fail_msg)

    return

_finalize(data, **kwargs)

Source code in toast/ops/save_hdf5.py
def _finalize(self, data, **kwargs):
    return

_provides()

Source code in toast/ops/save_hdf5.py
def _provides(self):
    return dict()

_requires()

Source code in toast/ops/save_hdf5.py
def _requires(self):
    req = {
        "meta": list(),
        "detdata": list(),
        "intervals": list(),
        "shared": [
            self.times,
        ],
    }
    if self.meta is not None:
        req["meta"].extend(self.meta)
    if self.detdata is not None:
        req["detdata"].extend(self.detdata)
    if self.intervals is not None:
        req["intervals"].extend(self.intervals)
    if self.shared is not None:
        req["shared"].extend(self.shared)
    return req

toast.ops.LoadHDF5

Bases: Operator

Operator which loads HDF5 data files into observations.

Selected observation files are distributed across process groups in a load-balanced way, using the number of samples and (if an index is used) the number of valid detectors in each observation.

The list of observation files can be built in multiple ways:

  • If files is specified, that is used and volume is ignored.

  • If volume_select is specified, it should contain the conditional portion of the SQL select command (e.g. "where X < Y"). This will be used to extract the observation metadata needed to load files.

  • If volume_select is not specified then all observations are chosen (either from the volume index if specified, or using the filesystem). This full list of observations then has the regex pattern applied to the basename of each observation file.
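
A minimal sketch of these three ways to build the file list (this assumes an existing toast Data container named data and the standard Operator.apply() entry point; the paths and selection string are purely illustrative):

import toast.ops

# 1. Explicit file list: the volume trait is ignored.
load = toast.ops.LoadHDF5(files=["obs_0001.h5", "obs_0002.h5"])

# 2. SQL selection applied to the observation table of the volume index
#    (which contains name, path, samples, and valid_dets columns).
load = toast.ops.LoadHDF5(
    volume="toast_out_hdf5",
    volume_select="where samples > 100000",
)

# 3. All observations in the volume, filtered by a regex on each basename.
load = toast.ops.LoadHDF5(volume="toast_out_hdf5", pattern=r".*\.h5")

load.apply(data)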

Source code in toast/ops/load_hdf5.py
@trait_docs
class LoadHDF5(Operator):
    """Operator which loads HDF5 data files into observations.

    Selected observation files are distributed across process groups in a load-balanced
    way, using the number of samples and (if using an index), the number of valid
    detectors in each observation.

    The list of observation files can be built in multiple ways:

    - If `files` is specified, that is used and `volume` is ignored.

    - If `volume_select` is specified, it should contain the conditional portion of the
      SQL select command (e.g. "where X < Y").  This will be used to extract the
      observation metadata needed to load files.

    - If `volume_select` is not specified then all observations are chosen (either
      from the volume index if specified, or using the filesystem).  This full list
      of observations then has the regex `pattern` applied to the basename of each
      observation file.

    """

    # Class traits

    API = Int(0, help="Internal interface version for this operator")

    volume = Unicode(
        None, allow_none=True, help="Top-level directory containing the data volume"
    )

    volume_index = Unicode(
        "DEFAULT",
        allow_none=True,
        help=(
            "Path to index file.  None disables use of index.  "
            "'DEFAULT' uses the default VolumeIndex name at the top of the volume."
        ),
    )

    volume_select = Unicode(
        None,
        allow_none=True,
        help="SQL selection string applied to the observation table of the index.",
    )

    pattern = Unicode(
        r".*\.h5",
        help="Regexp pattern to match files against (if not using volume_select).",
    )

    files = List(
        [],
        help="Override `volume` and load a list of files",
    )

    meta = List([], help="Only load this list of meta objects")

    detdata = List(
        [defaults.det_data, defaults.det_flags],
        help="Only load this list of detdata objects",
    )

    det_select = Dict(
        {}, help="Keep a subset of detectors whose focalplane columns match."
    )

    shared = List([], help="Only load this list of shared objects")

    intervals = List([], help="Only load this list of intervals objects")

    sort_by_size = Bool(
        False, help="If True, sort observations by size before load balancing"
    )

    process_rows = Int(
        None,
        allow_none=True,
        help="The size of the rectangular process grid in the detector direction.",
    )

    force_serial = Bool(
        False, help="Use serial HDF5 operations, even if parallel support available"
    )

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @function_timer
    def _exec(self, data, detectors=None, **kwargs):
        meta_fields = None
        if len(self.meta) > 0:
            meta_fields = list(self.meta)

        shared_fields = None
        if len(self.shared) > 0:
            shared_fields = list(self.shared)

        if len(self.detdata) > 0:
            detdata_fields = list(self.detdata)
        else:
            detdata_fields = list()

        intervals_fields = None
        if len(self.intervals) > 0:
            intervals_fields = list(self.intervals)

        # Get our list of observation files and relative sizes for load-balancing
        obs_props = self._get_obs_props(data.comm.comm_world)

        # Distribute observations among groups
        obs_sizes = [x[0] for x in obs_props]
        groupdist = distribute_discrete(obs_sizes, data.comm.ngroups)
        group_firstobs = groupdist[data.comm.group].offset
        group_numobs = groupdist[data.comm.group].n_elem

        # Every group loads its observations
        for obindx in range(group_firstobs, group_firstobs + group_numobs):
            obfile = obs_props[obindx][1]

            ob = load_hdf5(
                obfile,
                data.comm,
                process_rows=self.process_rows,
                meta=meta_fields,
                detdata=detdata_fields,
                shared=shared_fields,
                intervals=intervals_fields,
                force_serial=self.force_serial,
                det_select=self.det_select,
            )

            data.obs.append(ob)

        return

    def _get_obs_samples(self, path):
        """Helper function to extract the number of samples in each observation."""
        n_samp = 0
        with h5py.File(path, "r") as hf:
            n_samp = int(hf.attrs["observation_samples"])
        return n_samp

    def _get_obs_props(self, comm):
        """Query the index or filesystem for observations and their properties."""
        log = Logger.get()
        if comm is None:
            rank = 0
        else:
            rank = comm.rank

        # Index for the volume, if we are using it.
        if self.volume_index is None or len(self.files) > 0:
            # Either we are loading a list of files or disabling use of the index
            vindx = None
        else:
            if self.volume_index == "DEFAULT":
                index_path = os.path.join(self.volume, VolumeIndex.default_name)
            else:
                index_path = self.volume_index
            index_exists = False
            if rank == 0:
                if os.path.isfile(index_path):
                    index_exists = True
            if comm is not None:
                index_exists = comm.bcast(index_exists, root=0)
            if index_exists:
                vindx = VolumeIndex(index_path)
            else:
                msg = f"Volume index '{index_path}' does not exist, scanning filesystem for observations."
                log.warning_rank(msg, comm=comm)
                vindx = None

        # If using the index, the fields we are querying
        select_prefix = "select name, path, samples, valid_dets from "
        select_prefix += VolumeIndex.obs_table
        select_prefix += " "

        obs_props = None
        if rank == 0:
            # Perform all filesystem and index queries on one process.
            obs_props = list()
            if len(self.files) > 0:
                # The user gave an explicit list of files.  We use the number of
                # samples for load-balancing.
                if self.volume is not None:
                    log.warning(
                        f'LoadHDF5: volume="{self.volume}" trait overridden by '
                        f"files={self.files}"
                    )
                for ofile in self.files:
                    fsize = self._get_obs_samples(ofile)
                    obs_props.append((fsize, ofile))
                msg = "LoadHDF5 using specified file list with sizes: "
                msg += f"{obs_props}"
                log.verbose(msg)
            else:
                # We are using the volume trait.  Get the list of relative file paths
                # for each observation.
                if self.volume is None:
                    msg = "Either the volume or a list of files must be specified"
                    log.error(msg)
                    raise RuntimeError(msg)
                if self.volume_select is None:
                    # We are selecting all observations, and matching a regex.
                    if vindx is None:
                        # We have no index, just check the filesystem.  This is slow
                        # for many observations.  Use the number of samples for load
                        # balancing.
                        rel_files = VolumeIndex.find_observations(
                            self.volume, pattern_str=self.pattern
                        )
                        for rfile in rel_files:
                            full_path = os.path.join(self.volume, rfile)
                            fsize = self._get_obs_samples(full_path)
                            obs_props.append((fsize, full_path))
                        msg = "LoadHDF5 using volume with NO index and matching"
                        msg += f" filename pattern '{self.pattern}' found sizes: "
                        msg += f"{obs_props}"
                        log.verbose(msg)
                    else:
                        # We are using the index.  Get the full list of obs and then
                        # apply the regex.  We use the number of valid detector-samples
                        # for load balancing.
                        pattern = re.compile(self.pattern)
                        all_obs = vindx.select(select_prefix)
                        for oname, opath, osamples, odets in all_obs:
                            if pattern.search(opath) is not None:
                                sz = osamples * odets
                                full_path = os.path.join(self.volume, opath)
                                obs_props.append((sz, full_path))
                        msg = "LoadHDF5 using volume WITH index and matching"
                        msg += f" filename pattern '{self.pattern}' found sizes: "
                        msg += f"{obs_props}"
                        log.verbose(msg)
                else:
                    # Using a custom selection with the index.  Use the number of
                    # valid detector-samples for load balancing.
                    sel_str = f"{select_prefix}{self.volume_select}"
                    sel_obs = vindx.select(sel_str)
                    for oname, opath, osamples, odets in sel_obs:
                        sz = osamples * odets
                        full_path = os.path.join(self.volume, opath)
                        obs_props.append((sz, full_path))
                    msg = f"LoadHDF5 using volume with query '{self.volume_select}'"
                    msg += f" found sizes: {obs_props}"
                    log.verbose(msg)
            if self.sort_by_size:
                obs_props.sort(key=lambda x: x[0])
            else:
                obs_props.sort(key=lambda x: x[1])
        if comm is not None:
            obs_props = comm.bcast(obs_props, root=0)
        return obs_props

    def _finalize(self, data, **kwargs):
        return

    def _requires(self):
        return dict()

    def _provides(self):
        return dict()

API = Int(0, help='Internal interface version for this operator') class-attribute instance-attribute

det_select = Dict({}, help='Keep a subset of detectors whose focalplane columns match.') class-attribute instance-attribute

detdata = List([defaults.det_data, defaults.det_flags], help='Only load this list of detdata objects') class-attribute instance-attribute

files = List([], help='Override `volume` and load a list of files') class-attribute instance-attribute

force_serial = Bool(False, help='Use serial HDF5 operations, even if parallel support available') class-attribute instance-attribute

intervals = List([], help='Only load this list of intervals objects') class-attribute instance-attribute

meta = List([], help='Only load this list of meta objects') class-attribute instance-attribute

pattern = Unicode('.*\\.h5', help='Regexp pattern to match files against (if not using volume_select).') class-attribute instance-attribute

process_rows = Int(None, allow_none=True, help='The size of the rectangular process grid in the detector direction.') class-attribute instance-attribute

shared = List([], help='Only load this list of shared objects') class-attribute instance-attribute

sort_by_size = Bool(False, help='If True, sort observations by size before load balancing') class-attribute instance-attribute

volume = Unicode(None, allow_none=True, help='Top-level directory containing the data volume') class-attribute instance-attribute

volume_index = Unicode('DEFAULT', allow_none=True, help="Path to index file. None disables use of index. 'DEFAULT' uses the default VolumeIndex name at the top of the volume.") class-attribute instance-attribute

volume_select = Unicode(None, allow_none=True, help='SQL selection string applied to the observation table of the index.') class-attribute instance-attribute

__init__(**kwargs)

Source code in toast/ops/load_hdf5.py
def __init__(self, **kwargs):
    super().__init__(**kwargs)

_exec(data, detectors=None, **kwargs)

Source code in toast/ops/load_hdf5.py
@function_timer
def _exec(self, data, detectors=None, **kwargs):
    meta_fields = None
    if len(self.meta) > 0:
        meta_fields = list(self.meta)

    shared_fields = None
    if len(self.shared) > 0:
        shared_fields = list(self.shared)

    if len(self.detdata) > 0:
        detdata_fields = list(self.detdata)
    else:
        detdata_fields = list()

    intervals_fields = None
    if len(self.intervals) > 0:
        intervals_fields = list(self.intervals)

    # Get our list of observation files and relative sizes for load-balancing
    obs_props = self._get_obs_props(data.comm.comm_world)

    # Distribute observations among groups
    obs_sizes = [x[0] for x in obs_props]
    groupdist = distribute_discrete(obs_sizes, data.comm.ngroups)
    group_firstobs = groupdist[data.comm.group].offset
    group_numobs = groupdist[data.comm.group].n_elem

    # Every group loads its observations
    for obindx in range(group_firstobs, group_firstobs + group_numobs):
        obfile = obs_props[obindx][1]

        ob = load_hdf5(
            obfile,
            data.comm,
            process_rows=self.process_rows,
            meta=meta_fields,
            detdata=detdata_fields,
            shared=shared_fields,
            intervals=intervals_fields,
            force_serial=self.force_serial,
            det_select=self.det_select,
        )

        data.obs.append(ob)

    return

_finalize(data, **kwargs)

Source code in toast/ops/load_hdf5.py
def _finalize(self, data, **kwargs):
    return

_get_obs_props(comm)

Query the index or filesystem for observations and their properties.

Source code in toast/ops/load_hdf5.py
def _get_obs_props(self, comm):
    """Query the index or filesystem for observations and their properties."""
    log = Logger.get()
    if comm is None:
        rank = 0
    else:
        rank = comm.rank

    # Index for the volume, if we are using it.
    if self.volume_index is None or len(self.files) > 0:
        # Either we are loading a list of files or disabling use of the index
        vindx = None
    else:
        if self.volume_index == "DEFAULT":
            index_path = os.path.join(self.volume, VolumeIndex.default_name)
        else:
            index_path = self.volume_index
        index_exists = False
        if rank == 0:
            if os.path.isfile(index_path):
                index_exists = True
        if comm is not None:
            index_exists = comm.bcast(index_exists, root=0)
        if index_exists:
            vindx = VolumeIndex(index_path)
        else:
            msg = f"Volume index '{index_path}' does not exist, scanning filesystem for observations."
            log.warning_rank(msg, comm=comm)
            vindx = None

    # If using the index, the fields we are querying
    select_prefix = "select name, path, samples, valid_dets from "
    select_prefix += VolumeIndex.obs_table
    select_prefix += " "

    obs_props = None
    if rank == 0:
        # Perform all filesystem and index queries on one process.
        obs_props = list()
        if len(self.files) > 0:
            # The user gave an explicit list of files.  We use the number of
            # samples for load-balancing.
            if self.volume is not None:
                log.warning(
                    f'LoadHDF5: volume="{self.volume}" trait overridden by '
                    f"files={self.files}"
                )
            for ofile in self.files:
                fsize = self._get_obs_samples(ofile)
                obs_props.append((fsize, ofile))
            msg = "LoadHDF5 using specified file list with sizes: "
            msg += f"{obs_props}"
            log.verbose(msg)
        else:
            # We are using the volume trait.  Get the list of relative file paths
            # for each observation.
            if self.volume is None:
                msg = "Either the volume or a list of files must be specified"
                log.error(msg)
                raise RuntimeError(msg)
            if self.volume_select is None:
                # We are selecting all observations, and matching a regex.
                if vindx is None:
                    # We have no index, just check the filesystem.  This is slow
                    # for many observations.  Use the number of samples for load
                    # balancing.
                    rel_files = VolumeIndex.find_observations(
                        self.volume, pattern_str=self.pattern
                    )
                    for rfile in rel_files:
                        full_path = os.path.join(self.volume, rfile)
                        fsize = self._get_obs_samples(full_path)
                        obs_props.append((fsize, full_path))
                    msg = "LoadHDF5 using volume with NO index and matching"
                    msg += f" filename pattern '{self.pattern}' found sizes: "
                    msg += f"{obs_props}"
                    log.verbose(msg)
                else:
                    # We are using the index.  Get the full list of obs and then
                    # apply the regex.  We use the number of valid detector-samples
                    # for load balancing.
                    pattern = re.compile(self.pattern)
                    all_obs = vindx.select(select_prefix)
                    for oname, opath, osamples, odets in all_obs:
                        if pattern.search(opath) is not None:
                            sz = osamples * odets
                            full_path = os.path.join(self.volume, opath)
                            obs_props.append((sz, full_path))
                    msg = "LoadHDF5 using volume WITH index and matching"
                    msg += f" filename pattern '{self.pattern}' found sizes: "
                    msg += f"{obs_props}"
                    log.verbose(msg)
            else:
                # Using a custom selection with the index.  Use the number of
                # valid detector-samples for load balancing.
                sel_str = f"{select_prefix}{self.volume_select}"
                sel_obs = vindx.select(sel_str)
                for oname, opath, osamples, odets in sel_obs:
                    sz = osamples * odets
                    full_path = os.path.join(self.volume, opath)
                    obs_props.append((sz, full_path))
                msg = f"LoadHDF5 using volume with query '{self.volume_select}'"
                msg += f" found sizes: {obs_props}"
                log.verbose(msg)
        if self.sort_by_size:
            obs_props.sort(key=lambda x: x[0])
        else:
            obs_props.sort(key=lambda x: x[1])
    if comm is not None:
        obs_props = comm.bcast(obs_props, root=0)
    return obs_props

_get_obs_samples(path)

Helper function to extract the number of samples in each observation.

Source code in toast/ops/load_hdf5.py
def _get_obs_samples(self, path):
    """Helper function to extract the number of samples in each observation."""
    n_samp = 0
    with h5py.File(path, "r") as hf:
        n_samp = int(hf.attrs["observation_samples"])
    return n_samp

_provides()

Source code in toast/ops/load_hdf5.py
def _provides(self):
    return dict()

_requires()

Source code in toast/ops/load_hdf5.py
def _requires(self):
    return dict()

toast.ops.SaveSpt3g

Bases: Operator

Operator which saves observations to SPT3G frame files.

This creates a directory for each observation and writes frame files with a target size.
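
A minimal sketch, assuming spt3g is installed, an existing toast Data container named data, the standard Operator.apply() entry point, and a hypothetical exporter class MyExporter that is callable and exposes an export_rank attribute (as the obs_export trait requires):

import toast.ops

save = toast.ops.SaveSpt3g(
    directory="spt3g_data",
    framefile_mb=100.0,       # target frame file size in MB
    gzip=True,                # gzip-compress the frame files
    obs_export=MyExporter(),  # hypothetical exporter; must define export_rank
)
save.apply(data)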

Source code in toast/ops/save_spt3g.py
@trait_docs
class SaveSpt3g(Operator):
    """Operator which saves observations to SPT3G frame files.

    This creates a directory for each observation, and writes framefiles with
    a target size.

    """

    # Class traits

    API = Int(0, help="Internal interface version for this operator")

    directory = Unicode("spt3g_data", help="Top-level export directory")

    framefile_mb = Float(100.0, help="Target frame file size in MB")

    gzip = Bool(False, help="If True, gzip compress the frame files")

    # FIXME:  We should add a filtering mechanism here to dump a subset of
    # observations and / or detectors.

    obs_export = Instance(
        klass=object,
        allow_none=True,
        help="Export class to create frames from an observation",
    )

    purge = Bool(False, help="If True, delete observation data as it is saved")

    @traitlets.validate("obs_export")
    def _check_export(self, proposal):
        ex = proposal["value"]
        if ex is not None:
            # Check that this class has an "export_rank" attribute and is callable.
            if not callable(ex):
                raise traitlets.TraitError("obs_export class must be callable")
            if not hasattr(ex, "export_rank"):
                raise traitlets.TraitError(
                    "obs_export class must have an export_rank attribute"
                )
        return ex

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        if not available:
            raise RuntimeError("spt3g is not available")

    @function_timer
    def _exec(self, data, detectors=None, **kwargs):
        log = Logger.get()

        # Check that the export class is set
        if self.obs_export is None:
            raise RuntimeError(
                "You must set the obs_export trait before calling exec()"
            )

        # One process creates the top directory
        if data.comm.world_rank == 0:
            os.makedirs(self.directory, exist_ok=True)
        if data.comm.comm_world is not None:
            data.comm.comm_world.barrier()

        for ob in data.obs:
            # Observations must have a name for this to work
            if ob.name is None:
                raise RuntimeError(
                    "Observations must have a name in order to save to SPT3G format"
                )
            ob_dir = os.path.join(self.directory, ob.name)
            if ob.comm.group_rank == 0:
                # Make observation directory.  This should NOT already exist.
                os.makedirs(ob_dir)
            if ob.comm.comm_group is not None:
                ob.comm.comm_group.barrier()

            # Export observation to frames
            frames = self.obs_export(ob)

            # If the export rank is set, then frames will be gathered to one
            # process and written.  Otherwise each process will write to
            # sequential, independent frame files.
            ex_rank = self.obs_export.export_rank
            if ex_rank is None:
                # All processes write independently
                emitter = frame_emitter(frames=frames)
                save_pipe = c3g.G3Pipeline()
                save_pipe.Add(emitter)
                fname = f"frames-{ob.comm.group_rank:04d}.g3"
                if self.gzip:
                    fname += ".gz"
                save_pipe.Add(
                    c3g.G3Writer,
                    filename=os.path.join(ob_dir, fname),
                )
                save_pipe.Run()
                del save_pipe
                del emitter
            else:
                # Gather frames to one process and write
                if ob.comm.group_rank == ex_rank:
                    emitter = frame_emitter(frames=frames)
                    save_pipe = c3g.G3Pipeline()
                    save_pipe.Add(emitter)
                    fpattern = "frames-%04u.g3"
                    if self.gzip:
                        fpattern += ".gz"
                    save_pipe.Add(
                        c3g.G3MultiFileWriter,
                        filename=os.path.join(ob_dir, fpattern),
                        size_limit=int(self.framefile_mb * 1024**2),
                    )
                    save_pipe.Run()
                    del save_pipe
                    del emitter

            if ob.comm.comm_group is not None:
                ob.comm.comm_group.barrier()

            if self.purge:
                ob.clear()

        if self.purge:
            data.obs.clear()
        return

    def _finalize(self, data, **kwargs):
        return

    def _requires(self):
        return dict()

    def _provides(self):
        return dict()

API = Int(0, help='Internal interface version for this operator') class-attribute instance-attribute

directory = Unicode('spt3g_data', help='Top-level export directory') class-attribute instance-attribute

framefile_mb = Float(100.0, help='Target frame file size in MB') class-attribute instance-attribute

gzip = Bool(False, help='If True, gzip compress the frame files') class-attribute instance-attribute

obs_export = Instance(klass=object, allow_none=True, help='Export class to create frames from an observation') class-attribute instance-attribute

purge = Bool(False, help='If True, delete observation data as it is saved') class-attribute instance-attribute

__init__(**kwargs)

Source code in toast/ops/save_spt3g.py
def __init__(self, **kwargs):
    super().__init__(**kwargs)
    if not available:
        raise RuntimeError("spt3g is not available")

_check_export(proposal)

Source code in toast/ops/save_spt3g.py
@traitlets.validate("obs_export")
def _check_export(self, proposal):
    ex = proposal["value"]
    if ex is not None:
        # Check that this class has an "export_rank" attribute and is callable.
        if not callable(ex):
            raise traitlets.TraitError("obs_export class must be callable")
        if not hasattr(ex, "export_rank"):
            raise traitlets.TraitError(
                "obs_export class must have an export_rank attribute"
            )
    return ex

_exec(data, detectors=None, **kwargs)

Source code in toast/ops/save_spt3g.py
@function_timer
def _exec(self, data, detectors=None, **kwargs):
    log = Logger.get()

    # Check that the export class is set
    if self.obs_export is None:
        raise RuntimeError(
            "You must set the obs_export trait before calling exec()"
        )

    # One process creates the top directory
    if data.comm.world_rank == 0:
        os.makedirs(self.directory, exist_ok=True)
    if data.comm.comm_world is not None:
        data.comm.comm_world.barrier()

    for ob in data.obs:
        # Observations must have a name for this to work
        if ob.name is None:
            raise RuntimeError(
                "Observations must have a name in order to save to SPT3G format"
            )
        ob_dir = os.path.join(self.directory, ob.name)
        if ob.comm.group_rank == 0:
            # Make observation directory.  This should NOT already exist.
            os.makedirs(ob_dir)
        if ob.comm.comm_group is not None:
            ob.comm.comm_group.barrier()

        # Export observation to frames
        frames = self.obs_export(ob)

        # If the export rank is set, then frames will be gathered to one
        # process and written.  Otherwise each process will write to
        # sequential, independent frame files.
        ex_rank = self.obs_export.export_rank
        if ex_rank is None:
            # All processes write independently
            emitter = frame_emitter(frames=frames)
            save_pipe = c3g.G3Pipeline()
            save_pipe.Add(emitter)
            fname = f"frames-{ob.comm.group_rank:04d}.g3"
            if self.gzip:
                fname += ".gz"
            save_pipe.Add(
                c3g.G3Writer,
                filename=os.path.join(ob_dir, fname),
            )
            save_pipe.Run()
            del save_pipe
            del emitter
        else:
            # Gather frames to one process and write
            if ob.comm.group_rank == ex_rank:
                emitter = frame_emitter(frames=frames)
                save_pipe = c3g.G3Pipeline()
                save_pipe.Add(emitter)
                fpattern = "frames-%04u.g3"
                if self.gzip:
                    fpattern += ".gz"
                save_pipe.Add(
                    c3g.G3MultiFileWriter,
                    filename=os.path.join(ob_dir, fpattern),
                    size_limit=int(self.framefile_mb * 1024**2),
                )
                save_pipe.Run()
                del save_pipe
                del emitter

        if ob.comm.comm_group is not None:
            ob.comm.comm_group.barrier()

        if self.purge:
            ob.clear()

    if self.purge:
        data.obs.clear()
    return
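
A rough sketch of the resulting on-disk layout (observation names are hypothetical; the file names follow the patterns used in _exec above):

# spt3g_data/                      <- the `directory` trait
#     <observation name>/
#         frames-0000.g3[.gz]      # one file per process when export_rank is
#         frames-0001.g3[.gz]      # None, or a sequence of size-limited files
#         ...                      # from G3MultiFileWriter when it is set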

_finalize(data, **kwargs)

Source code in toast/ops/save_spt3g.py
def _finalize(self, data, **kwargs):
    return

_provides()

Source code in toast/ops/save_spt3g.py
def _provides(self):
    return dict()

_requires()

Source code in toast/ops/save_spt3g.py
def _requires(self):
    return dict()

toast.ops.LoadSpt3g

Bases: Operator

Operator which loads SPT3G data files into observations.

This operator expects a top-level data directory.

Source code in toast/ops/load_spt3g.py
@trait_docs
class LoadSpt3g(Operator):
    """Operator which loads SPT3G data files into observations.

    This operator expects a top-level data directory.

    """

    # Class traits

    API = Int(0, help="Internal interface version for this operator")

    directory = Unicode(
        None, allow_none=True, help="Top-level directory of observations"
    )

    # FIXME:  We should add a filtering mechanism here to load a subset of
    # observations and / or detectors.

    obs_import = Instance(
        klass=object,
        allow_none=True,
        help="Import class to create observations from frame files",
    )

    @traitlets.validate("obs_import")
    def _check_import(self, proposal):
        im = proposal["value"]
        if im is not None:
            # Check that this class has an "import_rank" attribute and is callable.
            if not callable(im):
                raise traitlets.TraitError("obs_import class must be callable")
            if not hasattr(im, "import_rank"):
                raise traitlets.TraitError(
                    "obs_import class must have an import_rank attribute"
                )
        return im

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        if not available:
            raise RuntimeError("spt3g is not available")

    @function_timer
    def _exec(self, data, detectors=None, **kwargs):
        log = Logger.get()

        # Check that the import class is set
        if self.obs_import is None:
            raise RuntimeError(
                "You must set the obs_import trait before calling exec()"
            )

        # Find the process rank which will load all frames for a given group.
        im_rank = self.obs_import.import_rank
        if im_rank is None:
            raise RuntimeError(
                "The obs_import class must be configured to import frames on one process"
            )

        # FIXME:  we should add a check that self.obs_import.comm is the same as
        # the data.comm.comm_group communicator.

        # One global process computes the list of observations and their approximate
        # relative size (based on the total framefile sizes for each).

        obs_props = list()
        if data.comm.world_rank == 0:
            for root, dirs, files in os.walk(self.directory):
                for d in dirs:
                    framefiles = glob.glob(os.path.join(root, d, "*.g3"))
                    if len(framefiles) > 0:
                        # This is an observation directory
                        flist = list()
                        obtotal = 0
                        for ffile in sorted(framefiles):
                            fsize = os.path.getsize(ffile)
                            obtotal += fsize
                            flist.append(ffile)
                        obs_props.append((d, obtotal, flist))
                break
        obs_props.sort(key=lambda x: x[0])
        if data.comm.comm_world is not None:
            obs_props = data.comm.comm_world.bcast(obs_props, root=0)

        # Distribute observations among groups

        obs_sizes = [x[1] for x in obs_props]
        groupdist = distribute_discrete(obs_sizes, data.comm.ngroups)
        group_firstobs = groupdist[data.comm.group][0]
        group_numobs = groupdist[data.comm.group][1]

        # Every group creates its observations
        for obindx in range(group_firstobs, group_firstobs + group_numobs):
            obname = obs_props[obindx][0]
            obfiles = obs_props[obindx][2]

            collect = frame_collector()

            if data.comm.group_rank == im_rank:
                all_frames = list()
                # We are loading the frames
                for ffile in obfiles:
                    load_pipe = c3g.G3Pipeline()
                    load_pipe.Add(c3g.G3Reader(ffile))
                    load_pipe.Add(collect)
                    load_pipe.Run()

            # Convert collected frames to an observation
            ob = self.obs_import(collect.frames)
            data.obs.append(ob)
            del collect

        return

    def _finalize(self, data, **kwargs):
        return

    def _requires(self):
        return dict()

    def _provides(self):
        return dict()

API = Int(0, help='Internal interface version for this operator')

directory = Unicode(None, allow_none=True, help='Top-level directory of observations')

obs_import = Instance(klass=object, allow_none=True, help='Import class to create observations from frame files')
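
A minimal usage sketch (the import class MyObsImport and the data variable are hypothetical): obs_import must be callable, expose an import_rank attribute that is not None, and build an observation from a list of frames.  The directory should contain one subdirectory per observation holding its .g3 frame files, matching the layout written by SaveSpt3g.

import toast.ops

class MyObsImport:
    # Hypothetical import class: called with the list of frames read from one
    # observation directory, it returns a toast Observation.
    import_rank = 0  # frames are read on this rank of each group; must not be None

    def __call__(self, frames):
        # ... construct and return an Observation from the frames ...
        ...

load = toast.ops.LoadSpt3g(
    directory="spt3g_data",
    obs_import=MyObsImport(),
)
load.apply(data)  # appends the imported observations to data.obs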

__init__(**kwargs)

Source code in toast/ops/load_spt3g.py
def __init__(self, **kwargs):
    super().__init__(**kwargs)
    if not available:
        raise RuntimeError("spt3g is not available")

_check_import(proposal)

Source code in toast/ops/load_spt3g.py
@traitlets.validate("obs_import")
def _check_import(self, proposal):
    im = proposal["value"]
    if im is not None:
        # Check that this class has an "import_rank" attribute and is callable.
        if not callable(im):
            raise traitlets.TraitError("obs_import class must be callable")
        if not hasattr(im, "import_rank"):
            raise traitlets.TraitError(
                "obs_import class must have an import_rank attribute"
            )
    return im

_exec(data, detectors=None, **kwargs)

Source code in toast/ops/load_spt3g.py
@function_timer
def _exec(self, data, detectors=None, **kwargs):
    log = Logger.get()

    # Check that the import class is set
    if self.obs_import is None:
        raise RuntimeError(
            "You must set the obs_import trait before calling exec()"
        )

    # Find the process rank which will load all frames for a given group.
    im_rank = self.obs_import.import_rank
    if im_rank is None:
        raise RuntimeError(
            "The obs_import class must be configured to import frames on one process"
        )

    # FIXME:  we should add a check that self.obs_import.comm is the same as
    # the data.comm.comm_group communicator.

    # One global process computes the list of observations and their approximate
    # relative size (based on the total framefile sizes for each).

    obs_props = list()
    if data.comm.world_rank == 0:
        for root, dirs, files in os.walk(self.directory):
            for d in dirs:
                framefiles = glob.glob(os.path.join(root, d, "*.g3"))
                if len(framefiles) > 0:
                    # This is an observation directory
                    flist = list()
                    obtotal = 0
                    for ffile in sorted(framefiles):
                        fsize = os.path.getsize(ffile)
                        obtotal += fsize
                        flist.append(ffile)
                    obs_props.append((d, obtotal, flist))
            break
    obs_props.sort(key=lambda x: x[0])
    if data.comm.comm_world is not None:
        obs_props = data.comm.comm_world.bcast(obs_props, root=0)

    # Distribute observations among groups

    obs_sizes = [x[1] for x in obs_props]
    groupdist = distribute_discrete(obs_sizes, data.comm.ngroups)
    group_firstobs = groupdist[data.comm.group][0]
    group_numobs = groupdist[data.comm.group][1]

    # Every group creates its observations
    for obindx in range(group_firstobs, group_firstobs + group_numobs):
        obname = obs_props[obindx][0]
        obfiles = obs_props[obindx][2]

        collect = frame_collector()

        if data.comm.group_rank == im_rank:
            all_frames = list()
            # We are loading the frames
            for ffile in obfiles:
                load_pipe = c3g.G3Pipeline()
                load_pipe.Add(c3g.G3Reader(ffile))
                load_pipe.Add(collect)
                load_pipe.Run()

        # Convert collected frames to an observation
        ob = self.obs_import(collect.frames)
        data.obs.append(ob)
        del collect

    return
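
To clarify the distribution step above: each observation is weighted by the total size of its frame files, and the list is split among process groups with distribute_discrete, so every group loads a roughly equal volume of data.  A small illustrative sketch (the sizes are made up, and the import path assumes the function lives in toast.dist):

from toast.dist import distribute_discrete

# Hypothetical total .g3 sizes (in bytes) for four observations.
obs_sizes = [120_000, 40_000, 80_000, 60_000]

# Split among two process groups.  Each entry gives the index of the first
# observation and the number of observations assigned to that group.
groupdist = distribute_discrete(obs_sizes, 2)
for group, dist in enumerate(groupdist):
    first, num = dist[0], dist[1]
    print(f"group {group}: observations {first} .. {first + num - 1}")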

_finalize(data, **kwargs)

Source code in toast/ops/load_spt3g.py
def _finalize(self, data, **kwargs):
    return

_provides()

Source code in toast/ops/load_spt3g.py
def _provides(self):
    return dict()

_requires()

Source code in toast/ops/load_spt3g.py
def _requires(self):
    return dict()