Data Containers

TOAST uses several key classes for storing time-ordered data, instrument properties, and pixel-domain data. Both the time-ordered and pixel-domain data can be fully distributed across the nodes of a cluster or supercomputer.

Data Object

toast.mpi.Comm

Bases: object

Class which represents a two-level hierarchy of MPI communicators.

A Comm object splits the full set of processes into groups of size groupsize. If groupsize does not divide evenly into the size of the given communicator, then some processes remain idle.

A Comm object stores several MPI communicators: The "world" communicator given here, which contains all processes to consider, a "group" communicator (one per group), and a "rank" communicator which contains the processes with the same group-rank across all groups.

This object also stores a "node" communicator containing all processes with access to the same shared memory, and a "node rank" communicator for processes with the same rank on a node. There is a node rank communicator for all nodes and also one for within the group.

Additionally, there is a mechanism for creating and caching row / column communicators for process grids within a group.

If MPI is not enabled, then all communicators are set to None. Additionally, there may be cases where MPI is enabled in the environment, but the user wishes to disable it when creating a Comm object. This can be done by passing MPI.COMM_SELF as the world communicator.
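
For illustration, here is a minimal sketch of both construction modes. It assumes the toast package is installed and that use_mpi and MPI are importable from toast.mpi, as in the source shown below:

# Sketch: constructing a toast.Comm with and without MPI.
from toast.mpi import Comm, use_mpi

# Default: uses MPI.COMM_WORLD when MPI is enabled; otherwise every
# communicator attribute is None.
comm = Comm()
print(comm.world_rank, comm.world_size)

# Force a serial data layout even when MPI is available by passing
# MPI.COMM_SELF as the world communicator.
if use_mpi:
    from toast.mpi import MPI
    serial = Comm(world=MPI.COMM_SELF)
    print(serial.comm_world)  # None: COMM_SELF disables MPI for this object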

Parameters:

    world (mpi4py.MPI.Comm): the MPI communicator containing all processes. Default: None
    groupsize (int): the size of each process group. Default: 0
Source code in toast/mpi.py
class Comm(object):
    """Class which represents a two-level hierarchy of MPI communicators.

    A Comm object splits the full set of processes into groups of size
    `groupsize`.  If groupsize does not divide evenly into the size of the given
    communicator, then some processes remain idle.

    A Comm object stores several MPI communicators:  The "world" communicator
    given here, which contains all processes to consider, a "group"
    communicator (one per group), and a "rank" communicator which contains the
    processes with the same group-rank across all groups.

    This object also stores a "node" communicator containing all processes with
    access to the same shared memory, and a "node rank" communicator for
    processes with the same rank on a node.  There is a node rank communicator
    for all nodes and also one for within the group.

    Additionally, there is a mechanism for creating and caching row / column
    communicators for process grids within a group.

    If MPI is not enabled, then all communicators are set to None.  Additionally,
    there may be cases where MPI is enabled in the environment, but the user wishes
    to disable it when creating a Comm object.  This can be done by passing
    MPI.COMM_SELF as the world communicator.

    Args:
        world (mpi4py.MPI.Comm): the MPI communicator containing all processes.
        groupsize (int): the size of each process group.

    """

    def __init__(self, world=None, groupsize=0):
        log = Logger.get()
        if world is None:
            if use_mpi:
                # Default is COMM_WORLD
                world = MPI.COMM_WORLD
            else:
                # MPI is disabled, leave world as None.
                pass
        else:
            if use_mpi:
                # We were passed a communicator to use. Check that it is
                # actually a communicator, otherwise fall back to COMM_WORLD.
                if not isinstance(world, MPI.Comm):
                    log.warning(
                        "Specified world communicator is not a valid "
                        "mpi4py.MPI.Comm object.  Using COMM_WORLD."
                    )
                    world = MPI.COMM_WORLD
            else:
                log.warning(
                    "World communicator specified even though "
                    "MPI is disabled.  Ignoring this constructor "
                    "argument."
                )
                world = None
            # Special case, MPI available but the user wants a serial
            # data object
            if world == MPI.COMM_SELF:
                world = None

        self._wcomm = world
        self._wrank = 0
        self._wsize = 1
        self._nodecomm = None
        self._noderankcomm = None
        self._nodeprocs = 1
        self._noderankprocs = 1
        if self._wcomm is not None:
            self._wrank = self._wcomm.rank
            self._wsize = self._wcomm.size
            self._nodecomm = self._wcomm.Split_type(MPI.COMM_TYPE_SHARED, 0)
            self._nodeprocs = self._nodecomm.size
            myworldnode = self._wrank // self._nodeprocs
            self._noderankcomm = self._wcomm.Split(self._nodecomm.rank, myworldnode)
            self._noderankprocs = self._noderankcomm.size

        self._gsize = groupsize

        if (self._gsize < 0) or (self._gsize > self._wsize):
            log.warning(
                "Invalid groupsize ({}).  Should be between {} "
                "and {}.  Using single process group instead.".format(
                    groupsize, 0, self._wsize
                )
            )
            self._gsize = 0

        if self._gsize == 0:
            self._gsize = self._wsize

        self._ngroups = self._wsize // self._gsize

        if self._ngroups * self._gsize != self._wsize:
            msg = (
                "World communicator size ({}) is not evenly divisible "
                "by requested group size ({}).".format(self._wsize, self._gsize)
            )
            log.error(msg)
            raise RuntimeError(msg)

        if self._gsize > self._nodeprocs:
            if self._gsize % self._nodeprocs != 0:
                msg = f"Group size of {self._gsize} is not a whole number of "
                msg += f"nodes (there are {self._nodeprocs} processes per node)"
                log.error(msg)
                raise RuntimeError(msg)
            self._gnodes = self._gsize // self._nodeprocs
        else:
            self._gnodes = 1
        self._group = self._wrank // self._gsize
        self._grank = self._wrank % self._gsize
        self._cleanup_group_comm = False

        self._gnodecomm = None
        self._gnoderankcomm = None
        self._gnodeprocs = 1
        if self._ngroups == 1:
            # We just have one group with all processes.
            self._gcomm = self._wcomm
            self._gnodecomm = self._nodecomm
            self._gnoderankcomm = self._noderankcomm
            self._rcomm = None
            self._gnoderankprocs = self._noderankprocs
        else:
            # We need to split the communicator.  This code is never executed
            # unless MPI is enabled and we have multiple groups.
            self._gcomm = self._wcomm.Split(self._group, self._grank)
            self._rcomm = self._wcomm.Split(self._grank, self._group)
            self._gnodecomm = self._gcomm.Split_type(MPI.COMM_TYPE_SHARED, 0)
            self._gnodeprocs = self._gnodecomm.size
            mygroupnode = self._grank // self._gnodeprocs
            self._gnoderankcomm = self._gcomm.Split(self._gnodecomm.rank, mygroupnode)
            self._gnoderankprocs = self._gnoderankcomm.size
            self._cleanup_group_comm = True

        msg = f"Comm on world rank {self._wrank}:\n"
        msg += f"  world comm = {self._wcomm} with {self._wsize} ranks\n"
        msg += (
            f"  intra-node comm = {self._nodecomm} ({self._nodeprocs} ranks per node)\n"
        )
        msg += f"  inter-node rank comm = {self._noderankcomm} ({self._noderankprocs} ranks)\n"
        msg += (
            f"  in group {self._group + 1} / {self._ngroups} with rank {self._grank}\n"
        )
        msg += f"  intra-group comm = {self._gcomm} ({self._gsize} ranks)\n"
        msg += f"  inter-group rank comm = {self._rcomm}\n"
        msg += f"  intra-node group comm = {self._gnodecomm} ({self._gnodeprocs} ranks per node)\n"
        msg += f"  inter-node group rank comm = {self._gnoderankcomm} ({self._noderankprocs} ranks)\n"
        log.verbose(msg)

        if self._gnoderankprocs != self._gnodes:
            msg = f"Number of group node rank procs ({self._gnoderankprocs}) does "
            msg += f"not match the number of nodes in a group ({self._gnodes})"
            log.error(msg)
            raise RuntimeError(msg)

        # Create a cache of row / column communicators for each group.  These can
        # then be re-used for observations with the same grid shapes.
        self._rowcolcomm = dict()

    def close(self):
        # Explicitly free communicators if needed.
        # Go through the cache of row / column grid communicators and free
        if hasattr(self, "_rowcolcomm"):
            for process_rows, comms in self._rowcolcomm.items():
                if comms["cleanup"]:
                    # We previously allocated new communicators for this grid.
                    # Free them now.
                    for subcomm in [
                        "row_rank_node",
                        "row_node",
                        "col_rank_node",
                        "col_node",
                        "row",
                        "col",
                    ]:
                        if comms[subcomm] is not None:
                            comms[subcomm].Free()
                            del comms[subcomm]
            del self._rowcolcomm
        # Optionally delete the group communicators if they were created.
        if hasattr(self, "_cleanup_group_comm") and self._cleanup_group_comm:
            self._gcomm.Free()
            self._rcomm.Free()
            self._gnodecomm.Free()
            self._gnoderankcomm.Free()
            del self._gcomm
            del self._rcomm
            del self._gnodecomm
            del self._gnoderankcomm
            del self._cleanup_group_comm
        # We always need to clean up the world node and node-rank communicators
        # if they exist
        if hasattr(self, "_noderankcomm") and self._noderankcomm is not None:
            self._noderankcomm.Free()
            del self._noderankcomm
        if hasattr(self, "_nodecomm") and self._nodecomm is not None:
            self._nodecomm.Free()
            del self._nodecomm
        return

    def __del__(self):
        self.close()

    @property
    def world_size(self):
        """The size of the world communicator."""
        return self._wsize

    @property
    def world_rank(self):
        """The rank of this process in the world communicator."""
        return self._wrank

    @property
    def ngroups(self):
        """The number of process groups."""
        return self._ngroups

    @property
    def group(self):
        """The group containing this process."""
        return self._group

    @property
    def group_size(self):
        """The size of the group containing this process."""
        return self._gsize

    @property
    def group_rank(self):
        """The rank of this process in the group communicator."""
        return self._grank

    # All the different types of relevant MPI communicators.

    @property
    def comm_world(self):
        """The world communicator."""
        return self._wcomm

    @property
    def comm_world_node(self):
        """The communicator shared by world processes on the same node."""
        return self._nodecomm

    @property
    def comm_world_node_rank(self):
        """The communicator shared by world processes with the same node rank across all nodes."""
        return self._noderankcomm

    @property
    def comm_group(self):
        """The communicator shared by processes within this group."""
        return self._gcomm

    @property
    def comm_group_rank(self):
        """The communicator shared by processes with the same group_rank."""
        return self._rcomm

    @property
    def comm_group_node(self):
        """The communicator shared by group processes on the same node."""
        return self._gnodecomm

    @property
    def comm_group_node_rank(self):
        """The communicator shared by group processes with the same node rank on nodes within the group."""
        return self._gnoderankcomm

    def comm_row_col(self, process_rows):
        """Return the row and column communicators for this group and grid shape.

        This function will create and / or return the communicators needed for
        a given process grid.  The return value is a dictionary with the following
        keys:

            - "row":  The row communicator.
            - "col":  The column communicator.
            - "row_node":  The node-local communicator within the row communicator
            - "col_node":  The node-local communicator within the col communicator
            - "row_rank_node":  The communicator across nodes among processes with
                the same node-rank within the row communicator.
            - "col_rank_node":  The communicator across nodes among processes with
                the same node-rank within the column communicator.

        Args:
            process_rows (int):  The number of rows in the process grid.

        Returns:
            (dict):  The communicators for this grid shape.

        """
        if process_rows not in self._rowcolcomm:
            # Does not exist yet, create it.
            if self._gcomm is None:
                if process_rows != 1:
                    msg = "MPI not in use, so only process_rows == 1 is allowed"
                    log.error(msg)
                    raise ValueError(msg)
                self._rowcolcomm[process_rows] = {
                    "row": None,
                    "col": None,
                    "row_node": None,
                    "row_rank_node": None,
                    "col_node": None,
                    "col_rank_node": None,
                    "cleanup": False,
                }
            else:
                if self._gcomm.size % process_rows != 0:
                    msg = f"The number of process_rows ({process_rows}) "
                    msg += f"does not divide evenly into the communicator "
                    msg += f"size ({self._gcomm.size})"
                    log.error(msg)
                    raise RuntimeError(msg)
                process_cols = self._gcomm.size // process_rows

                if process_rows == 1:
                    # We can re-use the group communicators as the grid column
                    # communicators
                    comm_row = self._gcomm
                    comm_row_node = self._gnodecomm
                    comm_row_rank_node = self._gnoderankcomm
                    comm_col = None
                    comm_col_node = None
                    comm_col_rank_node = None
                    cleanup = False
                elif process_cols == 1:
                    # We can re-use the group communicators as the grid row
                    # communicators
                    comm_col = self._gcomm
                    comm_col_node = self._gnodecomm
                    comm_col_rank_node = self._gnoderankcomm
                    comm_row = None
                    comm_row_node = None
                    comm_row_rank_node = None
                    cleanup = False
                else:
                    # We have to create new split communicators
                    col_rank = self._gcomm.rank // process_cols
                    row_rank = self._gcomm.rank % process_cols
                    comm_row = self._gcomm.Split(col_rank, row_rank)
                    comm_col = self._gcomm.Split(row_rank, col_rank)

                    # Node and node-rank comms for each row and col.
                    comm_row_node = comm_row.Split_type(MPI.COMM_TYPE_SHARED, 0)
                    row_nodeprocs = comm_row_node.size
                    row_node = comm_row.rank // row_nodeprocs
                    comm_row_rank_node = comm_row.Split(comm_row_node.rank, row_node)

                    comm_col_node = comm_col.Split_type(MPI.COMM_TYPE_SHARED, 0)
                    col_nodeprocs = comm_col_node.size
                    col_node = comm_col.rank // col_nodeprocs
                    comm_col_rank_node = comm_col.Split(comm_col_node.rank, col_node)
                    cleanup = True

                msg = f"Comm on world rank {self._wrank} create grid with {process_rows} rows:\n"
                msg += f"  row comm = {comm_row}\n"
                msg += f"  node comm = {comm_row_node}\n"
                msg += f"  node rank comm = {comm_row_rank_node}\n"
                msg += f"  col comm = {comm_col}\n"
                msg += f"  node comm = {comm_col_node}\n"
                msg += f"  node rank comm = {comm_col_rank_node}"
                log.verbose(msg)

                self._rowcolcomm[process_rows] = {
                    "row": comm_row,
                    "row_node": comm_row_node,
                    "row_rank_node": comm_row_rank_node,
                    "col": comm_col,
                    "col_node": comm_col_node,
                    "col_rank_node": comm_col_rank_node,
                    "cleanup": cleanup,
                }
        return self._rowcolcomm[process_rows]

    def __repr__(self):
        lines = [
            "  World MPI communicator = {}".format(self._wcomm),
            "  World MPI size = {}".format(self._wsize),
            "  World MPI rank = {}".format(self._wrank),
            "  Group MPI communicator = {}".format(self._gcomm),
            "  Group MPI size = {}".format(self._gsize),
            "  Group MPI rank = {}".format(self._grank),
            "  Rank MPI communicator = {}".format(self._rcomm),
        ]
        return "<toast.Comm\n{}\n>".format("\n".join(lines))

_cleanup_group_comm = False instance-attribute

_gcomm = self._wcomm instance-attribute

_gnodecomm = None instance-attribute

_gnodeprocs = 1 instance-attribute

_gnoderankcomm = None instance-attribute

_gnoderankprocs = self._noderankprocs instance-attribute

_gnodes = self._gsize // self._nodeprocs instance-attribute

_grank = self._wrank % self._gsize instance-attribute

_group = self._wrank // self._gsize instance-attribute

_gsize = groupsize instance-attribute

_ngroups = self._wsize // self._gsize instance-attribute

_nodecomm = None instance-attribute

_nodeprocs = 1 instance-attribute

_noderankcomm = None instance-attribute

_noderankprocs = 1 instance-attribute

_rcomm = None instance-attribute

_rowcolcomm = dict() instance-attribute

_wcomm = world instance-attribute

_wrank = 0 instance-attribute

_wsize = 1 instance-attribute

comm_group property

The communicator shared by processes within this group.

comm_group_node property

The communicator shared by group processes on the same node.

comm_group_node_rank property

The communicator shared by group processes with the same node rank on nodes within the group.

comm_group_rank property

The communicator shared by processes with the same group_rank.

comm_world property

The world communicator.

comm_world_node property

The communicator shared by world processes on the same node.

comm_world_node_rank property

The communicator shared by world processes with the same node rank across all nodes.

group property

The group containing this process.

group_rank property

The rank of this process in the group communicator.

group_size property

The size of the group containing this process.

ngroups property

The number of process groups.

world_rank property

The rank of this process in the world communicator.

world_size property

The size of the world communicator.
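
As an illustration, a short sketch that reports the two-level layout from every process using the properties above; the groupsize of 2 is arbitrary and the script is assumed to be launched under MPI (e.g. mpirun -np 4):

# Sketch: print the process layout described by the properties above.
from toast.mpi import Comm

comm = Comm(groupsize=2)  # arbitrary group size for illustration
print(
    f"world rank {comm.world_rank}/{comm.world_size}: "
    f"group {comm.group} of {comm.ngroups}, "
    f"group rank {comm.group_rank}/{comm.group_size}",
    flush=True,
)

# The underlying MPI communicators (all None when MPI is disabled):
group_comm = comm.comm_group      # processes within this group
rank_comm = comm.comm_group_rank  # same group rank across all groups
node_comm = comm.comm_world_node  # processes sharing a node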

__del__()

Source code in toast/mpi.py
def __del__(self):
    self.close()

__init__(world=None, groupsize=0)

Source code in toast/mpi.py
def __init__(self, world=None, groupsize=0):
    log = Logger.get()
    if world is None:
        if use_mpi:
            # Default is COMM_WORLD
            world = MPI.COMM_WORLD
        else:
            # MPI is disabled, leave world as None.
            pass
    else:
        if use_mpi:
            # We were passed a communicator to use. Check that it is
            # actually a communicator, otherwise fall back to COMM_WORLD.
            if not isinstance(world, MPI.Comm):
                log.warning(
                    "Specified world communicator is not a valid "
                    "mpi4py.MPI.Comm object.  Using COMM_WORLD."
                )
                world = MPI.COMM_WORLD
        else:
            log.warning(
                "World communicator specified even though "
                "MPI is disabled.  Ignoring this constructor "
                "argument."
            )
            world = None
        # Special case, MPI available but the user wants a serial
        # data object
        if world == MPI.COMM_SELF:
            world = None

    self._wcomm = world
    self._wrank = 0
    self._wsize = 1
    self._nodecomm = None
    self._noderankcomm = None
    self._nodeprocs = 1
    self._noderankprocs = 1
    if self._wcomm is not None:
        self._wrank = self._wcomm.rank
        self._wsize = self._wcomm.size
        self._nodecomm = self._wcomm.Split_type(MPI.COMM_TYPE_SHARED, 0)
        self._nodeprocs = self._nodecomm.size
        myworldnode = self._wrank // self._nodeprocs
        self._noderankcomm = self._wcomm.Split(self._nodecomm.rank, myworldnode)
        self._noderankprocs = self._noderankcomm.size

    self._gsize = groupsize

    if (self._gsize < 0) or (self._gsize > self._wsize):
        log.warning(
            "Invalid groupsize ({}).  Should be between {} "
            "and {}.  Using single process group instead.".format(
                groupsize, 0, self._wsize
            )
        )
        self._gsize = 0

    if self._gsize == 0:
        self._gsize = self._wsize

    self._ngroups = self._wsize // self._gsize

    if self._ngroups * self._gsize != self._wsize:
        msg = (
            "World communicator size ({}) is not evenly divisible "
            "by requested group size ({}).".format(self._wsize, self._gsize)
        )
        log.error(msg)
        raise RuntimeError(msg)

    if self._gsize > self._nodeprocs:
        if self._gsize % self._nodeprocs != 0:
            msg = f"Group size of {self._gsize} is not a whole number of "
            msg += f"nodes (there are {self._nodeprocs} processes per node)"
            log.error(msg)
            raise RuntimeError(msg)
        self._gnodes = self._gsize // self._nodeprocs
    else:
        self._gnodes = 1
    self._group = self._wrank // self._gsize
    self._grank = self._wrank % self._gsize
    self._cleanup_group_comm = False

    self._gnodecomm = None
    self._gnoderankcomm = None
    self._gnodeprocs = 1
    if self._ngroups == 1:
        # We just have one group with all processes.
        self._gcomm = self._wcomm
        self._gnodecomm = self._nodecomm
        self._gnoderankcomm = self._noderankcomm
        self._rcomm = None
        self._gnoderankprocs = self._noderankprocs
    else:
        # We need to split the communicator.  This code is never executed
        # unless MPI is enabled and we have multiple groups.
        self._gcomm = self._wcomm.Split(self._group, self._grank)
        self._rcomm = self._wcomm.Split(self._grank, self._group)
        self._gnodecomm = self._gcomm.Split_type(MPI.COMM_TYPE_SHARED, 0)
        self._gnodeprocs = self._gnodecomm.size
        mygroupnode = self._grank // self._gnodeprocs
        self._gnoderankcomm = self._gcomm.Split(self._gnodecomm.rank, mygroupnode)
        self._gnoderankprocs = self._gnoderankcomm.size
        self._cleanup_group_comm = True

    msg = f"Comm on world rank {self._wrank}:\n"
    msg += f"  world comm = {self._wcomm} with {self._wsize} ranks\n"
    msg += (
        f"  intra-node comm = {self._nodecomm} ({self._nodeprocs} ranks per node)\n"
    )
    msg += f"  inter-node rank comm = {self._noderankcomm} ({self._noderankprocs} ranks)\n"
    msg += (
        f"  in group {self._group + 1} / {self._ngroups} with rank {self._grank}\n"
    )
    msg += f"  intra-group comm = {self._gcomm} ({self._gsize} ranks)\n"
    msg += f"  inter-group rank comm = {self._rcomm}\n"
    msg += f"  intra-node group comm = {self._gnodecomm} ({self._gnodeprocs} ranks per node)\n"
    msg += f"  inter-node group rank comm = {self._gnoderankcomm} ({self._noderankprocs} ranks)\n"
    log.verbose(msg)

    if self._gnoderankprocs != self._gnodes:
        msg = f"Number of group node rank procs ({self._gnoderankprocs}) does "
        msg += f"not match the number of nodes in a group ({self._gnodes})"
        log.error(msg)
        raise RuntimeError(msg)

    # Create a cache of row / column communicators for each group.  These can
    # then be re-used for observations with the same grid shapes.
    self._rowcolcomm = dict()

__repr__()

Source code in toast/mpi.py
def __repr__(self):
    lines = [
        "  World MPI communicator = {}".format(self._wcomm),
        "  World MPI size = {}".format(self._wsize),
        "  World MPI rank = {}".format(self._wrank),
        "  Group MPI communicator = {}".format(self._gcomm),
        "  Group MPI size = {}".format(self._gsize),
        "  Group MPI rank = {}".format(self._grank),
        "  Rank MPI communicator = {}".format(self._rcomm),
    ]
    return "<toast.Comm\n{}\n>".format("\n".join(lines))

close()

Source code in toast/mpi.py
def close(self):
    # Explicitly free communicators if needed.
    # Go through the cache of row / column grid communicators and free
    if hasattr(self, "_rowcolcomm"):
        for process_rows, comms in self._rowcolcomm.items():
            if comms["cleanup"]:
                # We previously allocated new communicators for this grid.
                # Free them now.
                for subcomm in [
                    "row_rank_node",
                    "row_node",
                    "col_rank_node",
                    "col_node",
                    "row",
                    "col",
                ]:
                    if comms[subcomm] is not None:
                        comms[subcomm].Free()
                        del comms[subcomm]
        del self._rowcolcomm
    # Optionally delete the group communicators if they were created.
    if hasattr(self, "_cleanup_group_comm") and self._cleanup_group_comm:
        self._gcomm.Free()
        self._rcomm.Free()
        self._gnodecomm.Free()
        self._gnoderankcomm.Free()
        del self._gcomm
        del self._rcomm
        del self._gnodecomm
        del self._gnoderankcomm
        del self._cleanup_group_comm
    # We always need to clean up the world node and node-rank communicators
    # if they exist
    if hasattr(self, "_noderankcomm") and self._noderankcomm is not None:
        self._noderankcomm.Free()
        del self._noderankcomm
    if hasattr(self, "_nodecomm") and self._nodecomm is not None:
        self._nodecomm.Free()
        del self._nodecomm
    return

comm_row_col(process_rows)

Return the row and column communicators for this group and grid shape.

This function will create and / or return the communicators needed for a given process grid. The return value is a dictionary with the following keys:

- "row":  The row communicator.
- "col":  The column communicator.
- "row_node":  The node-local communicator within the row communicator
- "col_node":  The node-local communicator within the col communicator
- "row_rank_node":  The communicator across nodes among processes with
    the same node-rank within the row communicator.
- "col_rank_node":  The communicator across nodes among processes with
    the same node-rank within the column communicator.

Parameters:

    process_rows (int): The number of rows in the process grid. Required.

Returns:

    dict: The communicators for this grid shape.
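
For example, a brief sketch of requesting the grid communicators for a two-row process grid; it assumes MPI is in use and that the group size is divisible by two:

# Sketch: fetch (and cache) row / column communicators for a 2-row grid.
from toast.mpi import Comm

comm = Comm()
grid = comm.comm_row_col(2)  # 2 process rows, group_size // 2 columns

comm_row = grid["row"]  # processes in the same grid row (may be None)
comm_col = grid["col"]  # processes in the same grid column (may be None)

# Repeated calls with the same shape return the cached dictionary.
assert comm.comm_row_col(2) is grid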

Source code in toast/mpi.py
def comm_row_col(self, process_rows):
    """Return the row and column communicators for this group and grid shape.

    This function will create and / or return the communicators needed for
    a given process grid.  The return value is a dictionary with the following
    keys:

        - "row":  The row communicator.
        - "col":  The column communicator.
        - "row_node":  The node-local communicator within the row communicator
        - "col_node":  The node-local communicator within the col communicator
        - "row_rank_node":  The communicator across nodes among processes with
            the same node-rank within the row communicator.
        - "col_rank_node":  The communicator across nodes among processes with
            the same node-rank within the column communicator.

    Args:
        process_rows (int):  The number of rows in the process grid.

    Returns:
        (dict):  The communicators for this grid shape.

    """
    if process_rows not in self._rowcolcomm:
        # Does not exist yet, create it.
        if self._gcomm is None:
            if process_rows != 1:
                msg = "MPI not in use, so only process_rows == 1 is allowed"
                log.error(msg)
                raise ValueError(msg)
            self._rowcolcomm[process_rows] = {
                "row": None,
                "col": None,
                "row_node": None,
                "row_rank_node": None,
                "col_node": None,
                "col_rank_node": None,
                "cleanup": False,
            }
        else:
            if self._gcomm.size % process_rows != 0:
                msg = f"The number of process_rows ({process_rows}) "
                msg += f"does not divide evenly into the communicator "
                msg += f"size ({self._gcomm.size})"
                log.error(msg)
                raise RuntimeError(msg)
            process_cols = self._gcomm.size // process_rows

            if process_rows == 1:
                # We can re-use the group communicators as the grid column
                # communicators
                comm_row = self._gcomm
                comm_row_node = self._gnodecomm
                comm_row_rank_node = self._gnoderankcomm
                comm_col = None
                comm_col_node = None
                comm_col_rank_node = None
                cleanup = False
            elif process_cols == 1:
                # We can re-use the group communicators as the grid row
                # communicators
                comm_col = self._gcomm
                comm_col_node = self._gnodecomm
                comm_col_rank_node = self._gnoderankcomm
                comm_row = None
                comm_row_node = None
                comm_row_rank_node = None
                cleanup = False
            else:
                # We have to create new split communicators
                col_rank = self._gcomm.rank // process_cols
                row_rank = self._gcomm.rank % process_cols
                comm_row = self._gcomm.Split(col_rank, row_rank)
                comm_col = self._gcomm.Split(row_rank, col_rank)

                # Node and node-rank comms for each row and col.
                comm_row_node = comm_row.Split_type(MPI.COMM_TYPE_SHARED, 0)
                row_nodeprocs = comm_row_node.size
                row_node = comm_row.rank // row_nodeprocs
                comm_row_rank_node = comm_row.Split(comm_row_node.rank, row_node)

                comm_col_node = comm_col.Split_type(MPI.COMM_TYPE_SHARED, 0)
                col_nodeprocs = comm_col_node.size
                col_node = comm_col.rank // col_nodeprocs
                comm_col_rank_node = comm_col.Split(comm_col_node.rank, col_node)
                cleanup = True

            msg = f"Comm on world rank {self._wrank} create grid with {process_rows} rows:\n"
            msg += f"  row comm = {comm_row}\n"
            msg += f"  node comm = {comm_row_node}\n"
            msg += f"  node rank comm = {comm_row_rank_node}\n"
            msg += f"  col comm = {comm_col}\n"
            msg += f"  node comm = {comm_col_node}\n"
            msg += f"  node rank comm = {comm_col_rank_node}"
            log.verbose(msg)

            self._rowcolcomm[process_rows] = {
                "row": comm_row,
                "row_node": comm_row_node,
                "row_rank_node": comm_row_rank_node,
                "col": comm_col,
                "col_node": comm_col_node,
                "col_rank_node": comm_col_rank_node,
                "cleanup": cleanup,
            }
    return self._rowcolcomm[process_rows]

toast.Data

Bases: MutableMapping

Class which represents distributed data

A Data object contains a list of observations assigned to each process group in the Comm.

Parameters:

    comm (toast.Comm): The toast Comm class for distributing the data. Default: Comm()
    view (bool): If True, do not explicitly clear observation data on deletion. Default: False
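
A brief sketch of creating a Data container and using its mapping interface for global metadata; constructing observations is elided here since it requires an instrument model:

# Sketch: build a Data container on top of a toast.Comm.
from toast import Comm, Data

comm = Comm()
data = Data(comm=comm)

# Observations are appended to the obs list (construction elided):
# data.obs.append(obs)

# The Data object itself acts as a dictionary of global metadata.
data["my_metadata"] = 1.0            # arbitrary example key
print(len(data), list(data))         # MutableMapping interface
print(data.comm.ngroups)             # the Comm used for distribution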
Source code in toast/data.py
class Data(MutableMapping):
    """Class which represents distributed data

    A Data object contains a list of observations assigned to
    each process group in the Comm.

    Args:
        comm (:class:`toast.Comm`):  The toast Comm class for distributing the data.
        view (bool):  If True, do not explicitly clear observation data on deletion.

    """

    def __init__(self, comm=Comm(), view=False):
        self._comm = comm
        self._view = view
        self.obs = []
        """The list of observations.
        """
        self._internal = dict()

    def __getitem__(self, key):
        return self._internal[key]

    def __delitem__(self, key):
        del self._internal[key]

    def __setitem__(self, key, value):
        self._internal[key] = value

    def __iter__(self):
        return iter(self._internal)

    def __len__(self):
        return len(self._internal)

    def __repr__(self):
        val = "<Data with {} Observations:\n".format(len(self.obs))
        for ob in self.obs:
            val += "{}\n".format(ob)
        val += "Metadata:\n"
        val += "{}".format(self._internal)
        val += "\n>"
        return val

    def __del__(self):
        if hasattr(self, "obs"):
            self.clear()

    @property
    def comm(self):
        """The toast.Comm over which the data is distributed."""
        return self._comm

    def clear(self):
        """Clear the list of observations."""
        if not self._view:
            self.accel_clear()
            for ob in self.obs:
                ob.clear()
        self.obs.clear()
        if not self._view:
            self._internal.clear()
        return

    def all_local_detectors(self, selection=None, flagmask=0):
        """Get the superset of local detectors in all observations.

        This builds up the result from calling `select_local_detectors()` on
        all observations.

        Args:
            selection (list):  Only consider this list of detectors
            flagmask (int):  Apply this det_mask to the detector selection in
                each observation.

        Returns:
            (list):  The list of all local detectors across all observations.

        """
        all_dets = OrderedDict()
        for ob in self.obs:
            dets = ob.select_local_detectors(selection=selection, flagmask=flagmask)
            for d in dets:
                if d not in all_dets:
                    all_dets[d] = None
        return list(all_dets.keys())

    def detector_units(self, det_data):
        """Get the detector data units for a given field.

        This verifies that the specified detector data field has the same
        units in all observations where it occurs, and returns that unit.

        Args:
            det_data (str):  The detector data field.

        Returns:
            (Unit):  The unit used across all observations.

        """
        log = Logger.get()
        local_units = None
        for ob in self.obs:
            if det_data not in ob.detdata:
                continue
            ob_units = ob.detdata[det_data].units
            if local_units is None:
                local_units = ob_units
            else:
                if ob_units != local_units:
                    msg = f"obs {ob.name} detdata {det_data} units "
                    msg += f"{ob_units} != {local_units}"
                    log.error(msg)
                    raise RuntimeError(msg)
        if self.comm.comm_world is None:
            det_units = local_units
        else:
            det_units = self.comm.comm_world.gather(local_units, root=0)
            if self.comm.world_rank == 0:
                for dtu in det_units:
                    if dtu != local_units:
                        msg = f"observations have different units "
                        msg += f"{dtu} != {local_units}"
                        log.error(msg)
                        raise RuntimeError(msg)
            # We know that every process is the same now
            det_units = local_units
        return det_units

    def info(self, handle=None):
        """Print information about the distributed data.

        Information is written to the specified file handle.  Only the rank 0
        process writes.

        Args:
            handle (descriptor):  file descriptor supporting the write()
                method.  If None, use print().

        Returns:
            None

        """
        # Each process group gathers their output

        groupstr = ""
        procstr = ""

        gcomm = self._comm.comm_group
        wcomm = self._comm.comm_world
        rcomm = self._comm.comm_group_rank

        if wcomm is None:
            msg = "Data distributed over a single process (no MPI)"
            if handle is None:
                print(msg, flush=True)
            else:
                handle.write(msg)
        else:
            if wcomm.rank == 0:
                msg = "Data distributed over {} processes in {} groups\n".format(
                    self._comm.world_size, self._comm.ngroups
                )
                if handle is None:
                    print(msg, flush=True)
                else:
                    handle.write(msg)

        def _get_optional(k, dt):
            if k in dt:
                return dt[k]
            else:
                return None

        for ob in self.obs:
            if self._comm.group_rank == 0:
                groupstr = "{}{}\n".format(groupstr, str(ob))

        # The world rank 0 process collects output from all groups and
        # writes to the handle

        recvgrp = ""
        if self._comm.world_rank == 0:
            if handle is None:
                print(groupstr, flush=True)
            else:
                handle.write(groupstr)
        if wcomm is not None:
            for g in range(1, self._comm.ngroups):
                if wcomm.rank == 0:
                    recvgrp = rcomm.recv(source=g, tag=g)
                    if handle is None:
                        print(recvgrp, flush=True)
                    else:
                        handle.write(recvgrp)
                elif g == self._comm.group:
                    if gcomm.rank == 0:
                        rcomm.send(groupstr, dest=0, tag=g)
                wcomm.barrier()
        return

    def split(
        self,
        obs_index=False,
        obs_name=False,
        obs_uid=False,
        obs_session_name=False,
        obs_key=None,
        require_full=False,
    ):
        """Split the Data object.

        Create new Data objects that have views into unique subsets of the observations
        (the observations are not copied).  Only one "criteria" may be used to perform
        this splitting operation.  The observations may be split by index in the
        original list, by name, by UID, by session, or by the value of a specified key.

        The new Data objects are returned in a dictionary whose keys are the value of
        the selection criteria (index, name, uid, or value of the key).  Any observation
        that cannot be placed (because it is missing a name, uid or key) will be ignored
        and not added to any of the returned Data objects.  If the `require_full`
        parameter is set to True, such situations will raise an exception.

        Args:
            obs_index (bool):  If True, split by index in original list of observations.
            obs_name (bool):  If True, split by observation name.
            obs_uid (bool):  If True, split by observation UID.
            obs_session_name (bool):  If True, split by session name.
            obs_key (str):  Split by values of this observation key.
            require_full (bool):  If True, every observation must be placed in the
                output.

        Returns:
            (OrderedDict):  The dictionary of new Data objects.

        """
        log = Logger.get()
        check = (
            int(obs_index)
            + int(obs_name)
            + int(obs_uid)
            + int(obs_session_name)
            + int(obs_key is not None)
        )
        if check == 0 or check > 1:
            raise RuntimeError("You must specify exactly one split criteria")

        datasplit = OrderedDict()

        group_rank = self.comm.group_rank
        group_comm = self.comm.comm_group

        if obs_index:
            # Splitting by (unique) index
            for iob, ob in enumerate(self.obs):
                newdat = Data(comm=self._comm, view=True)
                newdat._internal = self._internal
                newdat.obs.append(ob)
                datasplit[iob] = newdat
        elif obs_name:
            # Splitting by (unique) name
            for iob, ob in enumerate(self.obs):
                if ob.name is None:
                    if require_full:
                        msg = f"require_full is True, but observation {iob} has no name"
                        log.error_rank(msg, comm=group_comm)
                        raise RuntimeError(msg)
                else:
                    newdat = Data(comm=self._comm, view=True)
                    newdat._internal = self._internal
                    newdat.obs.append(ob)
                    datasplit[ob.name] = newdat
        elif obs_uid:
            # Splitting by UID
            for iob, ob in enumerate(self.obs):
                if ob.uid is None:
                    if require_full:
                        msg = f"require_full is True, but observation {iob} has no UID"
                        log.error_rank(msg, comm=group_comm)
                        raise RuntimeError(msg)
                else:
                    newdat = Data(comm=self._comm, view=True)
                    newdat._internal = self._internal
                    newdat.obs.append(ob)
                    datasplit[ob.uid] = newdat
        elif obs_session_name:
            # Splitting by (non-unique) session name
            for iob, ob in enumerate(self.obs):
                if ob.session is None or ob.session.name is None:
                    if require_full:
                        msg = f"require_full is True, but observation {iob} has no session name"
                        log.error_rank(msg, comm=group_comm)
                        raise RuntimeError(msg)
                else:
                    sname = ob.session.name
                    if sname not in datasplit:
                        newdat = Data(comm=self._comm, view=True)
                        newdat._internal = self._internal
                        datasplit[sname] = newdat
                    datasplit[sname].obs.append(ob)
        elif obs_key is not None:
            # Splitting by arbitrary key.  Unlike name / uid which are built it to the
            # observation class, arbitrary keys might be modified in different ways
            # across all processes in a group.  For this reason, we do an additional
            # check for consistent values across the process group.
            for iob, ob in enumerate(self.obs):
                if obs_key not in ob:
                    if require_full:
                        msg = f"require_full is True, but observation {iob} has no key '{obs_key}'"
                        log.error_rank(msg, comm=group_comm)
                        raise RuntimeError(msg)
                else:
                    obs_val = ob[obs_key]
                    # Get the values from all processes in the group
                    group_vals = None
                    if group_comm is None:
                        group_vals = [obs_val]
                    else:
                        group_vals = group_comm.allgather(obs_val)
                    if group_vals.count(group_vals[0]) != len(group_vals):
                        msg = f"observation {iob}, key '{obs_key}' has inconsistent values across processes"
                        log.error_rank(msg, comm=group_comm)
                        raise RuntimeError(msg)
                    if obs_val not in datasplit:
                        newdat = Data(comm=self._comm, view=True)
                        newdat._internal = self._internal
                        datasplit[obs_val] = newdat
                    datasplit[obs_val].obs.append(ob)
        return datasplit

    def select(
        self,
        obs_index=None,
        obs_name=None,
        obs_uid=None,
        obs_session_name=None,
        obs_key=None,
        obs_val=None,
    ):
        """Create a new Data object with a subset of observations.

        The returned Data object just has a view of the original observations (they
        are not copied).

        The list of observations in the new Data object is a logical OR of the
        criteria passed in:
            * Index location in the original list of observations
            * Name of the observation
            * UID of the observation
            * Session of the observation
            * Existence of the specified dictionary key
            * Required value of the specified dictionary key

        Args:
            obs_index (int):  Observation location in the original list.
            obs_name (str):  The observation name or a compiled regular expression
                object to use for matching.
            obs_uid (int):  The observation UID to select.
            obs_session_name (str):  The name of the session.
            obs_key (str):  The observation dictionary key to examine.
            obs_val (str):  The required value of the observation dictionary key or a
                compiled regular expression object to use for matching.

        Returns:
            (Data):  A new Data object with references to the orginal metadata and
                a subset of observations.

        """
        log = Logger.get()
        if obs_val is not None and obs_key is None:
            raise RuntimeError("If you specify obs_val, you must also specify obs_key")

        group_rank = self.comm.group_rank
        group_comm = self.comm.comm_group

        new_data = Data(comm=self._comm, view=True)

        # Use a reference to the original metadata
        new_data._internal = self._internal

        for iob, ob in enumerate(self.obs):
            if obs_index is not None and obs_index == iob:
                new_data.obs.append(ob)
                continue
            if obs_name is not None and ob.name is not None:
                if isinstance(obs_name, re.Pattern):
                    if obs_name.match(ob.name) is not None:
                        new_data.obs.append(ob)
                        continue
                elif obs_name == ob.name:
                    new_data.obs.append(ob)
                    continue
            if obs_uid is not None and ob.uid is not None and obs_uid == ob.uid:
                new_data.obs.append(ob)
                continue
            if (
                obs_session_name is not None
                and ob.session is not None
                and obs_session_name == ob.session.name
            ):
                new_data.obs.append(ob)
                continue
            if obs_key is not None and obs_key in ob:
                # Get the values from all processes in the group and check
                # for consistency.
                group_vals = None
                if group_comm is None:
                    group_vals = [ob[obs_key]]
                else:
                    group_vals = group_comm.allgather(ob[obs_key])
                if group_vals.count(group_vals[0]) != len(group_vals):
                    msg = f"observation {iob}, key '{obs_key}' has inconsistent values across processes"
                    log.error_rank(msg, comm=group_comm)
                    raise RuntimeError(msg)

                if obs_val is None:
                    # We have the key, and are accepting any value
                    new_data.obs.append(ob)
                    continue
                elif isinstance(obs_val, re.Pattern):
                    if obs_val.match(ob[obs_key]) is not None:
                        # Matches our regex
                        new_data.obs.append(ob)
                        continue
                elif obs_val == ob[obs_key]:
                    new_data.obs.append(ob)
                    continue
        return new_data

    # Accelerator use

    def accel_create(self, names):
        """Create a set of data objects on the device.

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.  If the data already exists on the
        device then no action is taken.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        log = Logger.get()
        if not accel_enabled():
            log.verbose(f"accel_enabled is False, canceling accel_create.")
            return

        for ob in self.obs:
            for objname, objmgr in [
                ("detdata", ob.detdata),
                ("shared", ob.shared),
                ("intervals", ob.intervals),
            ]:
                for key in names[objname]:
                    if key not in objmgr:
                        msg = f"ob {ob.name} {objname} accel_create '{key}' "
                        msg += f"not present, ignoring"
                        log.verbose(msg)
                        continue
                    if objmgr.accel_exists(key):
                        msg = f"ob {ob.name} {objname}: accel_create '{key}'"
                        msg += f" already exists"
                        log.verbose(msg)
                    else:
                        log.verbose(f"ob {ob.name} {objname}: accel_create '{key}'")
                        objmgr.accel_create(key)

        for key in names["global"]:
            val = self._internal.get(key, None)
            if isinstance(val, AcceleratorObject):
                if not val.accel_exists():
                    log.verbose(f"Data accel_create: '{key}'")
                    val.accel_create(key)
                else:
                    log.verbose(f"Data accel_create: '{key}' already on device")
            else:
                log.verbose(
                    f"Data accel_create: '{key}' ({type(val)}) is not an AcceleratorObject"
                )

    def accel_update_device(self, names):
        """Copy a set of data objects to the device.

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        if not accel_enabled():
            return
        log = Logger.get()

        for ob in self.obs:
            for objname, objmgr in [
                ("detdata", ob.detdata),
                ("shared", ob.shared),
                ("intervals", ob.intervals),
            ]:
                for key in names[objname]:
                    if key not in objmgr:
                        msg = f"ob {ob.name} {objname} update_device key '{key}'"
                        msg += f" not present, ignoring"
                        log.verbose(msg)
                        continue
                    if not objmgr.accel_exists(key):
                        msg = f"ob {ob.name} {objname} update_device key '{key}'"
                        msg += f" does not exist on accelerator"
                        log.error(msg)
                        raise RuntimeError(msg)
                    if objmgr.accel_in_use(key):
                        msg = f"ob {ob.name} {objname}: skip update_device '{key}'"
                        msg += f" already in use"
                        log.verbose(msg)
                    else:
                        log.verbose(f"ob {ob.name} {objname}: update_device '{key}'")
                        objmgr.accel_update_device(key)

        for key in names["global"]:
            val = self._internal.get(key, None)
            if isinstance(val, AcceleratorObject):
                if val.accel_in_use():
                    msg = f"Skipping update_device for '{key}', "
                    msg += "device data in use"
                    log.verbose(msg)
                else:
                    log.verbose(f"Calling Data update_device for '{key}'")
                    val.accel_update_device()
            else:
                msg = f"Data accel_update_device: '{key}' ({type(val)}) "
                msg += "is not an AcceleratorObject"
                log.verbose(msg)

    def accel_update_host(self, names):
        """Copy a set of data objects to the host.

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        if not accel_enabled():
            return
        log = Logger.get()

        for ob in self.obs:
            for objname, objmgr in [
                ("detdata", ob.detdata),
                ("shared", ob.shared),
                ("intervals", ob.intervals),
            ]:
                for key in names[objname]:
                    if key not in objmgr:
                        msg = f"ob {ob.name} {objname} update_host key '{key}'"
                        msg += f" not present, ignoring"
                        log.verbose(msg)
                        continue
                    if not objmgr.accel_exists(key):
                        msg = f"ob {ob.name} {objname} update_host key '{key}'"
                        msg += f" does not exist on accelerator, ignoring"
                        log.verbose(msg)
                        continue
                    if not objmgr.accel_in_use(key):
                        msg = f"ob {ob.name} {objname}: skip update_host, '{key}'"
                        msg += f" already on host"
                        log.verbose(msg)
                    else:
                        log.verbose(f"ob {ob.name} {objname}: update_host '{key}'")
                        objmgr.accel_update_host(key)

        for key in names["global"]:
            val = self._internal.get(key, None)
            if isinstance(val, AcceleratorObject):
                if not val.accel_in_use():
                    msg = f"Skipping update_host for '{key}', "
                    msg += "host data already in use"
                    log.verbose(msg)
                else:
                    log.verbose(f"Calling Data update_host for '{key}'")
                    val.accel_update_host()
            else:
                msg = f"Data accel_update_host: '{key}' ({type(val)}) "
                msg += "is not an AcceleratorObject"
                log.verbose(msg)

    def accel_delete(self, names):
        """Delete a specific set of device objects

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        if not accel_enabled():
            return
        log = Logger.get()

        for ob in self.obs:
            for objname, objmgr in [
                ("detdata", ob.detdata),
                ("shared", ob.shared),
                ("intervals", ob.intervals),
            ]:
                for key in names[objname]:
                    if key not in objmgr:
                        msg = f"ob {ob.name} {objname} accel_delete key '{key}'"
                        msg += f" not present, ignoring"
                        log.verbose(msg)
                        continue
                    if objmgr.accel_exists(key):
                        log.verbose(f"ob {ob.name} {objname}: accel_delete '{key}'")
                        objmgr.accel_delete(key)
                    else:
                        msg = f"ob {ob.name} {objname}: accel_delete '{key}'"
                        msg += f" not present on device"
                        log.verbose(msg)

        for key in names["global"]:
            val = self._internal.get(key, None)
            if isinstance(val, AcceleratorObject):
                if val.accel_exists():
                    log.verbose(f"Calling Data accel_delete for '{key}'")
                    val.accel_delete()
            else:
                msg = f"Data accel_delete: '{key}' ({type(val)}) "
                msg += "is not an AcceleratorObject"
                log.verbose(msg)

    def accel_clear(self):
        """Delete all accelerator data."""
        if not accel_enabled():
            return
        log = Logger.get()

        for ob in self.obs:
            ob.accel_clear()

        for key, val in self._internal.items():
            if isinstance(val, AcceleratorObject):
                if val.accel_exists():
                    val.accel_delete()
            else:
                msg = f"Data accel_clear: '{key}' ({type(val)}) "
                msg += "is not an AcceleratorObject"
                log.verbose(msg)

_comm = comm instance-attribute

_internal = dict() instance-attribute

_view = view instance-attribute

comm property

The toast.Comm over which the data is distributed.

obs = [] instance-attribute

The list of observations.
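
A minimal usage sketch of the container interface, assuming only what is documented here: the Data object acts as a dictionary for global metadata, while observations live in the obs list. The metadata key below is hypothetical.

from toast.data import Data
from toast.mpi import Comm

# Construct an empty container; with MPI disabled all communicators are None.
data = Data(comm=Comm())

# Global metadata uses dictionary-style access.
data["noise_model_version"] = 2  # hypothetical key

# Observations are held in the .obs list; it starts empty and is filled by
# appending Observation instances.
for ob in data.obs:
    print(ob.name, ob.n_local_samples)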

__del__()

Source code in toast/data.py
def __del__(self):
    if hasattr(self, "obs"):
        self.clear()

__delitem__(key)

Source code in toast/data.py
def __delitem__(self, key):
    del self._internal[key]

__getitem__(key)

Source code in toast/data.py
def __getitem__(self, key):
    return self._internal[key]

__init__(comm=Comm(), view=False)

Source code in toast/data.py
def __init__(self, comm=Comm(), view=False):
    self._comm = comm
    self._view = view
    self.obs = []
    """The list of observations.
    """
    self._internal = dict()

__iter__()

Source code in toast/data.py
def __iter__(self):
    return iter(self._internal)

__len__()

Source code in toast/data.py
def __len__(self):
    return len(self._internal)

__repr__()

Source code in toast/data.py
def __repr__(self):
    val = "<Data with {} Observations:\n".format(len(self.obs))
    for ob in self.obs:
        val += "{}\n".format(ob)
    val += "Metadata:\n"
    val += "{}".format(self._internal)
    val += "\n>"
    return val

__setitem__(key, value)

Source code in toast/data.py
def __setitem__(self, key, value):
    self._internal[key] = value

accel_clear()

Delete all accelerator data.

Source code in toast/data.py
def accel_clear(self):
    """Delete all accelerator data."""
    if not accel_enabled():
        return
    log = Logger.get()

    for ob in self.obs:
        ob.accel_clear()

    for key, val in self._internal.items():
        if isinstance(val, AcceleratorObject):
            if val.accel_exists():
                val.accel_delete()
        else:
            msg = f"Data accel_clear: '{key}' ({type(val)}) "
            msg += "is not an AcceleratorObject"
            log.verbose(msg)

accel_create(names)

Create a set of data objects on the device.

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods. If the data already exists on the device then no action is taken.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/data.py
def accel_create(self, names):
    """Create a set of data objects on the device.

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.  If the data already exists on the
    device then no action is taken.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    log = Logger.get()
    if not accel_enabled():
        log.verbose(f"accel_enabled is False, canceling accel_create.")
        return

    for ob in self.obs:
        for objname, objmgr in [
            ("detdata", ob.detdata),
            ("shared", ob.shared),
            ("intervals", ob.intervals),
        ]:
            for key in names[objname]:
                if key not in objmgr:
                    msg = f"ob {ob.name} {objname} accel_create '{key}' "
                    msg += f"not present, ignoring"
                    log.verbose(msg)
                    continue
                if objmgr.accel_exists(key):
                    msg = f"ob {ob.name} {objname}: accel_create '{key}'"
                    msg += f" already exists"
                    log.verbose(msg)
                else:
                    log.verbose(f"ob {ob.name} {objname}: accel_create '{key}'")
                    objmgr.accel_create(key)

    for key in names["global"]:
        val = self._internal.get(key, None)
        if isinstance(val, AcceleratorObject):
            if not val.accel_exists():
                log.verbose(f"Data accel_create: '{key}'")
                val.accel_create(key)
            else:
                log.verbose(f"Data accel_create: '{key}' already on device")
        else:
            log.verbose(
                f"Data accel_create: '{key}' ({type(val)}) is not an AcceleratorObject"
            )
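
As a usage sketch (not taken from the TOAST sources), the names dictionary is expected to contain all four keys iterated above, "global", "detdata", "shared", and "intervals", even when some of the lists are empty. The field names below are hypothetical.

# Hypothetical field names used for illustration.
names = {
    "global": [],
    "detdata": ["signal"],
    "shared": ["times"],
    "intervals": ["scanning"],
}
# Allocate device copies of any of these objects not already on the device.
data.accel_create(names)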

accel_delete(names)

Delete a specific set of device objects

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/data.py
def accel_delete(self, names):
    """Delete a specific set of device objects

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    if not accel_enabled():
        return
    log = Logger.get()

    for ob in self.obs:
        for objname, objmgr in [
            ("detdata", ob.detdata),
            ("shared", ob.shared),
            ("intervals", ob.intervals),
        ]:
            for key in names[objname]:
                if key not in objmgr:
                    msg = f"ob {ob.name} {objname} accel_delete key '{key}'"
                    msg += f" not present, ignoring"
                    log.verbose(msg)
                    continue
                if objmgr.accel_exists(key):
                    log.verbose(f"ob {ob.name} {objname}: accel_delete '{key}'")
                    objmgr.accel_delete(key)
                else:
                    msg = f"ob {ob.name} {objname}: accel_delete '{key}'"
                    msg += f" not present on device"
                    log.verbose(msg)

    for key in names["global"]:
        val = self._internal.get(key, None)
        if isinstance(val, AcceleratorObject):
            if val.accel_exists():
                log.verbose(f"Calling Data accel_delete for '{key}'")
                val.accel_delete()
        else:
            msg = f"Data accel_delete: '{key}' ({type(val)}) "
            msg += "is not an AcceleratorObject"
            log.verbose(msg)

accel_update_device(names)

Copy a set of data objects to the device.

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/data.py
def accel_update_device(self, names):
    """Copy a set of data objects to the device.

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    if not accel_enabled():
        return
    log = Logger.get()

    for ob in self.obs:
        for objname, objmgr in [
            ("detdata", ob.detdata),
            ("shared", ob.shared),
            ("intervals", ob.intervals),
        ]:
            for key in names[objname]:
                if key not in objmgr:
                    msg = f"ob {ob.name} {objname} update_device key '{key}'"
                    msg += f" not present, ignoring"
                    log.verbose(msg)
                    continue
                if not objmgr.accel_exists(key):
                    msg = f"ob {ob.name} {objname} update_device key '{key}'"
                    msg += f" does not exist on accelerator"
                    log.error(msg)
                    raise RuntimeError(msg)
                if objmgr.accel_in_use(key):
                    msg = f"ob {ob.name} {objname}: skip update_device '{key}'"
                    msg += f" already in use"
                    log.verbose(msg)
                else:
                    log.verbose(f"ob {ob.name} {objname}: update_device '{key}'")
                    objmgr.accel_update_device(key)

    for key in names["global"]:
        val = self._internal.get(key, None)
        if isinstance(val, AcceleratorObject):
            if val.accel_in_use():
                msg = f"Skipping update_device for '{key}', "
                msg += "device data in use"
                log.verbose(msg)
            else:
                log.verbose(f"Calling Data update_device for '{key}'")
                val.accel_update_device()
        else:
            msg = f"Data accel_update_device: '{key}' ({type(val)}) "
            msg += "is not an AcceleratorObject"
            log.verbose(msg)

accel_update_host(names)

Copy a set of data objects to the host.

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/data.py
def accel_update_host(self, names):
    """Copy a set of data objects to the host.

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    if not accel_enabled():
        return
    log = Logger.get()

    for ob in self.obs:
        for objname, objmgr in [
            ("detdata", ob.detdata),
            ("shared", ob.shared),
            ("intervals", ob.intervals),
        ]:
            for key in names[objname]:
                if key not in objmgr:
                    msg = f"ob {ob.name} {objname} update_host key '{key}'"
                    msg += f" not present, ignoring"
                    log.verbose(msg)
                    continue
                if not objmgr.accel_exists(key):
                    msg = f"ob {ob.name} {objname} update_host key '{key}'"
                    msg += f" does not exist on accelerator, ignoring"
                    log.verbose(msg)
                    continue
                if not objmgr.accel_in_use(key):
                    msg = f"ob {ob.name} {objname}: skip update_host, '{key}'"
                    msg += f" already on host"
                    log.verbose(msg)
                else:
                    log.verbose(f"ob {ob.name} {objname}: update_host '{key}'")
                    objmgr.accel_update_host(key)

    for key in names["global"]:
        val = self._internal.get(key, None)
        if isinstance(val, AcceleratorObject):
            if not val.accel_in_use():
                msg = f"Skipping update_host for '{key}', "
                msg += "host data already in use"
                log.verbose(msg)
            else:
                log.verbose(f"Calling Data update_host for '{key}'")
                val.accel_update_host()
        else:
            msg = f"Data accel_update_host: '{key}' ({type(val)}) "
            msg += "is not an AcceleratorObject"
            log.verbose(msg)
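
Taken together with accel_create() and accel_delete(), the two copy methods form a simple offload cycle. A hedged sketch, reusing the hypothetical names dictionary shown earlier:

data.accel_create(names)           # allocate device copies
data.accel_update_device(names)    # stage host data onto the device
# ... run device-capable operators here ...
data.accel_update_host(names)      # copy results back to the host
data.accel_delete(names)           # release the device copies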

all_local_detectors(selection=None, flagmask=0)

Get the superset of local detectors in all observations.

This builds up the result from calling select_local_detectors() on all observations.

Parameters:

Name Type Description Default
selection list

Only consider this list of detectors

None
flagmask int

Apply this det_mask to the detector selection in each observation.

0

Returns:

Type Description
list

The list of all local detectors across all observations.

Source code in toast/data.py
def all_local_detectors(self, selection=None, flagmask=0):
    """Get the superset of local detectors in all observations.

    This builds up the result from calling `select_local_detectors()` on
    all observations.

    Args:
        selection (list):  Only consider this list of detectors
        flagmask (int):  Apply this det_mask to the detector selection in
            each observation.

    Returns:
        (list):  The list of all local detectors across all observations.

    """
    all_dets = OrderedDict()
    for ob in self.obs:
        dets = ob.select_local_detectors(selection=selection, flagmask=flagmask)
        for d in dets:
            if d not in all_dets:
                all_dets[d] = None
    return list(all_dets.keys())
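
For example (a sketch; the mask value is hypothetical), one can gather every detector assigned to this process whose per-detector flags pass a given mask:

# Detectors are included only if (flags & flagmask) == 0 in each observation.
good_dets = data.all_local_detectors(flagmask=1)
print(f"{len(good_dets)} unflagged detectors across all observations")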

clear()

Clear the list of observations.

Source code in toast/data.py
def clear(self):
    """Clear the list of observations."""
    if not self._view:
        self.accel_clear()
        for ob in self.obs:
            ob.clear()
    self.obs.clear()
    if not self._view:
        self._internal.clear()
    return

detector_units(det_data)

Get the detector data units for a given field.

This verifies that the specified detector data field has the same units in all observations where it occurs, and returns that unit.

Parameters:

Name Type Description Default
det_data str

The detector data field.

required

Returns:

Type Description
Unit

The unit used across all observations.

Source code in toast/data.py
def detector_units(self, det_data):
    """Get the detector data units for a given field.

    This verifies that the specified detector data field has the same
    units in all observations where it occurs, and returns that unit.

    Args:
        det_data (str):  The detector data field.

    Returns:
        (Unit):  The unit used across all observations.

    """
    log = Logger.get()
    local_units = None
    for ob in self.obs:
        if det_data not in ob.detdata:
            continue
        ob_units = ob.detdata[det_data].units
        if local_units is None:
            local_units = ob_units
        else:
            if ob_units != local_units:
                msg = f"obs {ob.name} detdata {det_data} units "
                msg += f"{ob_units} != {local_units}"
                log.error(msg)
                raise RuntimeError(msg)
    if self.comm.comm_world is None:
        det_units = local_units
    else:
        det_units = self.comm.comm_world.gather(local_units, root=0)
        if self.comm.world_rank == 0:
            for dtu in det_units:
                if dtu != local_units:
                    msg = f"observations have different units "
                    msg += f"{dtu} != {local_units}"
                    log.error(msg)
                    raise RuntimeError(msg)
        # We know that every process is the same now
        det_units = local_units
    return det_units
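
A short sketch of looking up the units of a hypothetical "signal" detector data field:

# Raises RuntimeError if "signal" has different units in different observations.
units = data.detector_units("signal")
print(f"signal is stored in units of {units}")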

info(handle=None)

Print information about the distributed data.

Information is written to the specified file handle. Only the rank 0 process writes.

Parameters:

Name Type Description Default
handle descriptor

file descriptor supporting the write() method. If None, use print().

None

Returns:

Type Description

None

Source code in toast/data.py
def info(self, handle=None):
    """Print information about the distributed data.

    Information is written to the specified file handle.  Only the rank 0
    process writes.

    Args:
        handle (descriptor):  file descriptor supporting the write()
            method.  If None, use print().

    Returns:
        None

    """
    # Each process group gathers their output

    groupstr = ""
    procstr = ""

    gcomm = self._comm.comm_group
    wcomm = self._comm.comm_world
    rcomm = self._comm.comm_group_rank

    if wcomm is None:
        msg = "Data distributed over a single process (no MPI)"
        if handle is None:
            print(msg, flush=True)
        else:
            handle.write(msg)
    else:
        if wcomm.rank == 0:
            msg = "Data distributed over {} processes in {} groups\n".format(
                self._comm.world_size, self._comm.ngroups
            )
            if handle is None:
                print(msg, flush=True)
            else:
                handle.write(msg)

    def _get_optional(k, dt):
        if k in dt:
            return dt[k]
        else:
            return None

    for ob in self.obs:
        if self._comm.group_rank == 0:
            groupstr = "{}{}\n".format(groupstr, str(ob))

    # The world rank 0 process collects output from all groups and
    # writes to the handle

    recvgrp = ""
    if self._comm.world_rank == 0:
        if handle is None:
            print(groupstr, flush=True)
        else:
            handle.write(groupstr)
    if wcomm is not None:
        for g in range(1, self._comm.ngroups):
            if wcomm.rank == 0:
                recvgrp = rcomm.recv(source=g, tag=g)
                if handle is None:
                    print(recvgrp, flush=True)
                else:
                    handle.write(recvgrp)
            elif g == self._comm.group:
                if gcomm.rank == 0:
                    rcomm.send(groupstr, dest=0, tag=g)
            wcomm.barrier()
    return
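
A small usage sketch, writing the summary either to stdout or to a file handle; every process should call info(), but only the world rank 0 process writes:

# Print the distribution summary.
data.info()

# Or write the same summary to any object with a write() method.
with open("data_layout.txt", "w") as f:
    data.info(handle=f)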

select(obs_index=None, obs_name=None, obs_uid=None, obs_session_name=None, obs_key=None, obs_val=None)

Create a new Data object with a subset of observations.

The returned Data object just has a view of the original observations (they are not copied).

The list of observations in the new Data object is a logical OR of the criteria passed in:

* Index location in the original list of observations
* Name of the observation
* UID of the observation
* Session of the observation
* Existence of the specified dictionary key
* Required value of the specified dictionary key

Parameters:

Name Type Description Default
obs_index int

Observation location in the original list.

None
obs_name str

The observation name or a compiled regular expression object to use for matching.

None
obs_uid int

The observation UID to select.

None
obs_session_name str

The name of the session.

None
obs_key str

The observation dictionary key to examine.

None
obs_val str

The required value of the observation dictionary key or a compiled regular expression object to use for matching.

None

Returns:

Type Description
Data

A new Data object with references to the original metadata and a subset of observations.

Source code in toast/data.py
def select(
    self,
    obs_index=None,
    obs_name=None,
    obs_uid=None,
    obs_session_name=None,
    obs_key=None,
    obs_val=None,
):
    """Create a new Data object with a subset of observations.

    The returned Data object just has a view of the original observations (they
    are not copied).

    The list of observations in the new Data object is a logical OR of the
    criteria passed in:
        * Index location in the original list of observations
        * Name of the observation
        * UID of the observation
        * Session of the observation
        * Existence of the specified dictionary key
        * Required value of the specified dictionary key

    Args:
        obs_index (int):  Observation location in the original list.
        obs_name (str):  The observation name or a compiled regular expression
            object to use for matching.
        obs_uid (int):  The observation UID to select.
        obs_session_name (str):  The name of the session.
        obs_key (str):  The observation dictionary key to examine.
        obs_val (str):  The required value of the observation dictionary key or a
            compiled regular expression object to use for matching.

    Returns:
        (Data):  A new Data object with references to the original metadata and
            a subset of observations.

    """
    log = Logger.get()
    if obs_val is not None and obs_key is None:
        raise RuntimeError("If you specify obs_val, you must also specify obs_key")

    group_rank = self.comm.group_rank
    group_comm = self.comm.comm_group

    new_data = Data(comm=self._comm, view=True)

    # Use a reference to the original metadata
    new_data._internal = self._internal

    for iob, ob in enumerate(self.obs):
        if obs_index is not None and obs_index == iob:
            new_data.obs.append(ob)
            continue
        if obs_name is not None and ob.name is not None:
            if isinstance(obs_name, re.Pattern):
                if obs_name.match(ob.name) is not None:
                    new_data.obs.append(ob)
                    continue
            elif obs_name == ob.name:
                new_data.obs.append(ob)
                continue
        if obs_uid is not None and ob.uid is not None and obs_uid == ob.uid:
            new_data.obs.append(ob)
            continue
        if (
            obs_session_name is not None
            and ob.session is not None
            and obs_session_name == ob.session.name
        ):
            new_data.obs.append(ob)
            continue
        if obs_key is not None and obs_key in ob:
            # Get the values from all processes in the group and check
            # for consistency.
            group_vals = None
            if group_comm is None:
                group_vals = [ob[obs_key]]
            else:
                group_vals = group_comm.allgather(ob[obs_key])
            if group_vals.count(group_vals[0]) != len(group_vals):
                msg = f"observation {iob}, key '{obs_key}' has inconsistent values across processes"
                log.error_rank(msg, comm=group_comm)
                raise RuntimeError(msg)

            if obs_val is None:
                # We have the key, and are accepting any value
                new_data.obs.append(ob)
                continue
            elif isinstance(obs_val, re.Pattern):
                if obs_val.match(ob[obs_key]) is not None:
                    # Matches our regex
                    new_data.obs.append(ob)
                    continue
            elif obs_val == ob[obs_key]:
                new_data.obs.append(ob)
                continue
    return new_data
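
Two usage sketches with hypothetical observation names and metadata keys; the criteria within a single call are combined with a logical OR:

import re

# Observations whose (hypothetical) names start with "survey_".
subset = data.select(obs_name=re.compile(r"^survey_.*"))

# Observations that simply carry a (hypothetical) "calibration" key, any value.
cal = data.select(obs_key="calibration")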

split(obs_index=False, obs_name=False, obs_uid=False, obs_session_name=False, obs_key=None, require_full=False)

Split the Data object.

Create new Data objects that have views into unique subsets of the observations (the observations are not copied). Only one "criteria" may be used to perform this splitting operation. The observations may be split by index in the original list, by name, by UID, by session, or by the value of a specified key.

The new Data objects are returned in a dictionary whose keys are the value of the selection criteria (index, name, uid, or value of the key). Any observation that cannot be placed (because it is missing a name, uid or key) will be ignored and not added to any of the returned Data objects. If the require_full parameter is set to True, such situations will raise an exception.

Parameters:

Name Type Description Default
obs_index bool

If True, split by index in original list of observations.

False
obs_name bool

If True, split by observation name.

False
obs_uid bool

If True, split by observation UID.

False
obs_session_name bool

If True, split by session name.

False
obs_key str

Split by values of this observation key.

None
require_full bool

If True, every observation must be placed in the output.

False

Returns:

Type Description
OrderedDict

The dictionary of new Data objects.

Source code in toast/data.py
def split(
    self,
    obs_index=False,
    obs_name=False,
    obs_uid=False,
    obs_session_name=False,
    obs_key=None,
    require_full=False,
):
    """Split the Data object.

    Create new Data objects that have views into unique subsets of the observations
    (the observations are not copied).  Only one "criteria" may be used to perform
    this splitting operation.  The observations may be split by index in the
    original list, by name, by UID, by session, or by the value of a specified key.

    The new Data objects are returned in a dictionary whose keys are the value of
    the selection criteria (index, name, uid, or value of the key).  Any observation
    that cannot be placed (because it is missing a name, uid or key) will be ignored
    and not added to any of the returned Data objects.  If the `require_full`
    parameter is set to True, such situations will raise an exception.

    Args:
        obs_index (bool):  If True, split by index in original list of observations.
        obs_name (bool):  If True, split by observation name.
        obs_uid (bool):  If True, split by observation UID.
        obs_session_name (bool):  If True, split by session name.
        obs_key (str):  Split by values of this observation key.
        require_full (bool):  If True, every observation must be placed in the
            output.

    Returns:
        (OrderedDict):  The dictionary of new Data objects.

    """
    log = Logger.get()
    check = (
        int(obs_index)
        + int(obs_name)
        + int(obs_uid)
        + int(obs_session_name)
        + int(obs_key is not None)
    )
    if check == 0 or check > 1:
        raise RuntimeError("You must specify exactly one split criteria")

    datasplit = OrderedDict()

    group_rank = self.comm.group_rank
    group_comm = self.comm.comm_group

    if obs_index:
        # Splitting by (unique) index
        for iob, ob in enumerate(self.obs):
            newdat = Data(comm=self._comm, view=True)
            newdat._internal = self._internal
            newdat.obs.append(ob)
            datasplit[iob] = newdat
    elif obs_name:
        # Splitting by (unique) name
        for iob, ob in enumerate(self.obs):
            if ob.name is None:
                if require_full:
                    msg = f"require_full is True, but observation {iob} has no name"
                    log.error_rank(msg, comm=group_comm)
                    raise RuntimeError(msg)
            else:
                newdat = Data(comm=self._comm, view=True)
                newdat._internal = self._internal
                newdat.obs.append(ob)
                datasplit[ob.name] = newdat
    elif obs_uid:
        # Splitting by UID
        for iob, ob in enumerate(self.obs):
            if ob.uid is None:
                if require_full:
                    msg = f"require_full is True, but observation {iob} has no UID"
                    log.error_rank(msg, comm=group_comm)
                    raise RuntimeError(msg)
            else:
                newdat = Data(comm=self._comm, view=True)
                newdat._internal = self._internal
                newdat.obs.append(ob)
                datasplit[ob.uid] = newdat
    elif obs_session_name:
        # Splitting by (non-unique) session name
        for iob, ob in enumerate(self.obs):
            if ob.session is None or ob.session.name is None:
                if require_full:
                    msg = f"require_full is True, but observation {iob} has no session name"
                    log.error_rank(msg, comm=group_comm)
                    raise RuntimeError(msg)
            else:
                sname = ob.session.name
                if sname not in datasplit:
                    newdat = Data(comm=self._comm, view=True)
                    newdat._internal = self._internal
                    datasplit[sname] = newdat
                datasplit[sname].obs.append(ob)
    elif obs_key is not None:
        # Splitting by arbitrary key.  Unlike name / uid which are built into the
        # observation class, arbitrary keys might be modified in different ways
        # across all processes in a group.  For this reason, we do an additional
        # check for consistent values across the process group.
        for iob, ob in enumerate(self.obs):
            if obs_key not in ob:
                if require_full:
                    msg = f"require_full is True, but observation {iob} has no key '{obs_key}'"
                    log.error_rank(msg, comm=group_comm)
                    raise RuntimeError(msg)
            else:
                obs_val = ob[obs_key]
                # Get the values from all processes in the group
                group_vals = None
                if group_comm is None:
                    group_vals = [obs_val]
                else:
                    group_vals = group_comm.allgather(obs_val)
                if group_vals.count(group_vals[0]) != len(group_vals):
                    msg = f"observation {iob}, key '{obs_key}' has inconsistent values across processes"
                    log.error_rank(msg, comm=group_comm)
                    raise RuntimeError(msg)
                if obs_val not in datasplit:
                    newdat = Data(comm=self._comm, view=True)
                    newdat._internal = self._internal
                    datasplit[obs_val] = newdat
                datasplit[obs_val].obs.append(ob)
    return datasplit
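
A sketch of two common splits; the "wafer" metadata key is hypothetical:

# One Data view per observing session.
by_session = data.split(obs_session_name=True)

# One Data view per value of a metadata key, requiring every observation
# to carry the key.
by_wafer = data.split(obs_key="wafer", require_full=True)
for wafer, wdata in by_wafer.items():
    print(wafer, len(wdata.obs))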

Observation

toast.observation.Observation

Bases: MutableMapping

Class representing the data for one observation.

An Observation stores information about data distribution across one or more MPI processes and is a container for four types of objects:

* Local detector data (unique to each process).
* Shared data that has one common copy for every node spanned by the
  observation.
* Intervals defining spans of data with some common characteristic.
* Other arbitrary small metadata.

Small metadata can be stored directly in the Observation using normal square bracket "[]" access to elements (an Observation is a dictionary). Groups of detector data (e.g. "signal", "flags", etc) can be accessed in the separate detector data dictionary (the "detdata" attribute). Shared data can be similarly stored in the "shared" attribute. Lists of intervals are accessed in the "intervals" attribute and data views can use any interval list to access subsets of detector and shared data.

Notes on distributed use with MPI

The detector data within an Observation is distributed among the processes in an MPI communicator. The processes in the communicator are arranged in a rectangular grid, with each process storing some number of detectors for a piece of time covered by the observation. The most common configuration (and the default) is to make this grid the size of the communicator in the "detector direction" and a size of one in the "sample direction":

MPI           det1  sample(0), sample(1), sample(2), ...., sample(N-1)
rank 0        det2  sample(0), sample(1), sample(2), ...., sample(N-1)
----------------------------------------------------------------------
MPI           det3  sample(0), sample(1), sample(2), ...., sample(N-1)
rank 1        det4  sample(0), sample(1), sample(2), ...., sample(N-1)

So each process has a subset of detectors for the whole span of the observation time. You can override this shape by setting process_rows to something else. For example, process_rows=1 would result in this:

MPI rank 0                        |        MPI rank 1
----------------------------------+----------------------------
det1  sample(0), sample(1), ...,  |  ...., sample(N-1)
det2  sample(0), sample(1), ...,  |  ...., sample(N-1)
det3  sample(0), sample(1), ...,  |  ...., sample(N-1)
det4  sample(0), sample(1), ...,  |  ...., sample(N-1)

Parameters:

Name Type Description Default
comm Comm

The toast communicator containing information about the process group for this observation.

required
telescope Telescope

An instance of a Telescope object.

required
n_samples int

The total number of samples for this observation.

required
name str

(Optional) The observation name.

None
uid int

(Optional) The Unique ID for this observation. If not specified, the UID will be computed from a hash of the name.

None
session Session

The observing session that this observation is contained in or None.

None
detector_sets list

(Optional) List of lists containing detector names. These discrete detector sets are used to distribute detectors; a detector set will always be within a single row of the process grid. If None, every detector is a set of one.

None
sample_sets list

(Optional) List of lists of chunk sizes (integer numbers of samples). These discrete sample sets are used to distribute sample data. A sample set will always be within a single column of the process grid. If None, any distribution break in the sample direction will happen at an arbitrary place. The sum of all chunks must equal the total number of samples.

None
process_rows int

(Optional) The size of the rectangular process grid in the detector direction. This number must evenly divide into the size of comm. If not specified, defaults to the size of the communicator.

None
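
Before the full source below, a construction sketch using the documented signature; toast_comm and telescope are assumed to already exist (a toast.Comm and a Telescope instance), and the observation name is hypothetical:

from toast.observation import Observation

ob = Observation(
    toast_comm,
    telescope,
    n_samples=100_000,
    name="obs_2025_001",
    process_rows=1,  # distribute purely by sample (second diagram above)
)
ob["weather"] = "good"  # small metadata via dictionary-style access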
Source code in toast/observation.py
class Observation(MutableMapping):
    """Class representing the data for one observation.

    An Observation stores information about data distribution across one or more MPI
    processes and is a container for four types of objects:

        * Local detector data (unique to each process).
        * Shared data that has one common copy for every node spanned by the
          observation.
        * Intervals defining spans of data with some common characteristic.
        * Other arbitrary small metadata.

    Small metadata can be stored directly in the Observation using normal square
    bracket "[]" access to elements (an Observation is a dictionary).  Groups of
    detector data (e.g. "signal", "flags", etc) can be accessed in the separate
    detector data dictionary (the "detdata" attribute).  Shared data can be similarly
    stored in the "shared" attribute.  Lists of intervals are accessed in the
    "intervals" attribute and data views can use any interval list to access subsets
    of detector and shared data.

    **Notes on distributed use with MPI**

    The detector data within an Observation is distributed among the processes in an
    MPI communicator.  The processes in the communicator are arranged in a rectangular
    grid, with each process storing some number of detectors for a piece of time
    covered by the observation.  The most common configuration (and the default) is to
    make this grid the size of the communicator in the "detector direction" and a size
    of one in the "sample direction"::

        MPI           det1  sample(0), sample(1), sample(2), ...., sample(N-1)
        rank 0        det2  sample(0), sample(1), sample(2), ...., sample(N-1)
        ----------------------------------------------------------------------
        MPI           det3  sample(0), sample(1), sample(2), ...., sample(N-1)
        rank 1        det4  sample(0), sample(1), sample(2), ...., sample(N-1)

    So each process has a subset of detectors for the whole span of the observation
    time.  You can override this shape by setting the process_rows to something
    else.  For example, process_rows=1 would result in this::

        MPI rank 0                        |        MPI rank 1
        ----------------------------------+----------------------------
        det1  sample(0), sample(1), ...,  |  ...., sample(N-1)
        det2  sample(0), sample(1), ...,  |  ...., sample(N-1)
        det3  sample(0), sample(1), ...,  |  ...., sample(N-1)
        det4  sample(0), sample(1), ...,  |  ...., sample(N-1)


    Args:
        comm (toast.Comm):  The toast communicator containing information about the
            process group for this observation.
        telescope (Telescope):  An instance of a Telescope object.
        n_samples (int):  The total number of samples for this observation.
        name (str):  (Optional) The observation name.
        uid (int):  (Optional) The Unique ID for this observation.  If not specified,
            the UID will be computed from a hash of the name.
        session (Session):  The observing session that this observation is contained
            in or None.
        detector_sets (list):  (Optional) List of lists containing detector names.
            These discrete detector sets are used to distribute detectors- a detector
            set will always be within a single row of the process grid.  If None,
            every detector is a set of one.
        sample_sets (list):  (Optional) List of lists of chunk sizes (integer numbers of
            samples).  These discrete sample sets are used to distribute sample data.
            A sample set will always be within a single column of the process grid.  If
            None, any distribution break in the sample direction will happen at an
            arbitrary place.  The sum of all chunks must equal the total number of
            samples.
        process_rows (int):  (Optional) The size of the rectangular process grid
            in the detector direction.  This number must evenly divide into the size of
            comm.  If not specified, defaults to the size of the communicator.

    """

    view = ViewInterface()

    @function_timer
    def __init__(
        self,
        comm,
        telescope,
        n_samples,
        name=None,
        uid=None,
        session=None,
        detector_sets=None,
        sample_sets=None,
        process_rows=None,
    ):
        log = Logger.get()
        self._telescope = telescope
        self._name = name
        self._uid = uid
        self._session = session

        if self._uid is None and self._name is not None:
            self._uid = name_UID(self._name)

        if self._session is None:
            if self._name is not None:
                self._session = Session(
                    name=self._name,
                    uid=self._uid,
                    start=None,
                    end=None,
                )
        elif not isinstance(self._session, Session):
            raise RuntimeError("session should be a Session instance or None")

        self.dist = DistDetSamp(
            n_samples,
            self._telescope.focalplane.detectors,
            sample_sets,
            detector_sets,
            comm,
            process_rows,
        )

        # The internal metadata dictionary
        self._internal = dict()

        # Set up the data managers
        self.detdata = DetDataManager(self.dist)
        self.shared = SharedDataManager(self.dist)
        self.intervals = IntervalsManager(self.dist, n_samples)

        # Set up local per-detector cutting
        self._detflags = {x: int(0) for x in self.dist.dets[self.dist.comm.group_rank]}

    # Fully clear the observation

    def clear(self):
        self.view.clear()
        self.intervals.clear()
        self.detdata.clear()
        self.shared.clear()
        self._internal.clear()

    # General properties

    @property
    def telescope(self):
        """
        (Telescope):  The Telescope instance for this observation.
        """
        return self._telescope

    @property
    def name(self):
        """
        (str):  The name of the observation.
        """
        return self._name

    @property
    def uid(self):
        """
        (int):  The Unique ID for this observation.
        """
        return self._uid

    @property
    def session(self):
        """
        (Session):  The Session instance for this observation.
        """
        return self._session

    @property
    def comm(self):
        """
        (toast.Comm):  The overall communicator.
        """
        return self.dist.comm

    # The MPI communicator along the current row of the process grid

    @property
    def comm_row(self):
        """
        (mpi4py.MPI.Comm):  The communicator for processes in the same row (or None).
        """
        return self.dist.comm_row

    @property
    def comm_row_size(self):
        """
        (int): The number of processes in the row communicator.
        """
        return self.dist.comm_row_size

    @property
    def comm_row_rank(self):
        """
        (int): The rank of this process in the row communicator.
        """
        return self.dist.comm_row_rank

    # The MPI communicator along the current column of the process grid

    @property
    def comm_col(self):
        """
        (mpi4py.MPI.Comm):  The communicator for processes in the same column (or None).
        """
        return self.dist.comm_col

    @property
    def comm_col_size(self):
        """
        (int): The number of processes in the column communicator.
        """
        return self.dist.comm_col_size

    @property
    def comm_col_rank(self):
        """
        (int): The rank of this process in the column communicator.
        """
        return self.dist.comm_col_rank

    # Detector distribution

    @property
    def all_detectors(self):
        """
        (list): All detectors stored in this observation.
        """
        return self.dist.detectors

    @property
    def local_detectors(self):
        """
        (list): The detectors assigned to this process.
        """
        return self.dist.dets[self.dist.comm.group_rank]

    @property
    def local_detector_flags(self):
        """(dict): The local per-detector flags"""
        return self._detflags

    def update_local_detector_flags(self, vals):
        """Update the per-detector flagging.

        This does a bitwise OR with the existing flag values.

        Args:
            vals (dict):  The flag values for one or more detectors.

        Returns:
            None

        """
        ldets = set(self.local_detectors)
        for k, v in vals.items():
            if k not in ldets:
                msg = f"Cannot update per-detector flag for '{k}', which is"
                msg += " not a local detector"
                raise RuntimeError(msg)
            self._detflags[k] |= int(v)

    def set_local_detector_flags(self, vals):
        """Set the per-detector flagging.

        This resets the per-detector flags to the specified values.

        Args:
            vals (dict):  The flag values for one or more detectors.

        Returns:
            None

        """
        ldets = set(self.local_detectors)
        for k, v in vals.items():
            if k not in ldets:
                msg = f"Cannot set per-detector flag for '{k}', which is"
                msg += " not a local detector"
                raise RuntimeError(msg)
            self._detflags[k] = int(v)

    def select_local_detectors(
        self,
        selection=None,
        flagmask=0,
    ):
        """Get the local detectors assigned to this process.

        This takes the full list of local detectors and optionally prunes them
        by the specified selection and / or applies per-detector flags with
        the given mask.

        Args:
            selection (list):  Only return detectors in this set.
            flagmask (uint8):  Apply this mask to per-detector flags and only
                include detectors with a result of zero (good).

        Returns:
            (list):  The selected detectors.

        """
        if flagmask is None:
            good = set(self.local_detectors)
        else:
            good = set(
                [
                    x
                    for x in self.local_detectors
                    if (self.local_detector_flags[x] & flagmask) == 0
                ]
            )
        dets = list()
        if selection is None:
            for det in self.local_detectors:
                if det in good:
                    dets.append(det)
        else:
            sel_set = set(selection)
            for det in self.local_detectors:
                if (det in sel_set) and (det in good):
                    dets.append(det)
        # print(f"SELECT mask {int(flagmask)} {selection}: {dets}", flush=True)
        return dets

    # Detector set distribution

    @property
    def all_detector_sets(self):
        """
        (list):  The total list of detector sets for this observation.
        """
        return self.dist.detector_sets

    @property
    def local_detector_sets(self):
        """
        (list):  The detector sets assigned to this process (or None).
        """
        if self.dist.detector_sets is None:
            return None
        else:
            ds = list()
            for d in range(self.dist.det_sets[self.dist.comm.group_rank].n_elem):
                off = self.dist.det_sets[self.dist.comm.group_rank].offset
                ds.append(self.dist.detector_sets[off + d])
            return ds

    # Sample distribution

    @property
    def n_all_samples(self):
        """(int): the total number of samples in this observation."""
        return self.dist.samples

    @property
    def local_index_offset(self):
        """
        The first sample on this process, relative to the observation start.
        """
        return self.dist.samps[self.dist.comm.group_rank].offset

    @property
    def n_local_samples(self):
        """
        The number of local samples on this process.
        """
        return self.dist.samps[self.dist.comm.group_rank].n_elem

    # Sample set distribution

    @property
    def all_sample_sets(self):
        """
        (list):  The input full list of sample sets used in data distribution
        """
        return self.dist.sample_sets

    @property
    def local_sample_sets(self):
        """
        (list):  The sample sets assigned to this process (or None).
        """
        if self.dist.sample_sets is None:
            return None
        else:
            ss = list()
            for s in range(self.dist.samp_sets[self.dist.comm.group_rank].n_elem):
                off = self.dist.samp_sets[self.dist.comm.group_rank].offset
                ss.append(self.dist.sample_sets[off + s])
            return ss

    # Helper methods to check for the 2 most common cases, where data is
    # distributed either fully by detector or fully by sample.  Note that if there
    # is only one process then both conditions are true.

    @property
    def is_distributed_by_sample(self):
        if self.dist.comm_row_size == self.dist.comm.group_size:
            return True
        else:
            return False

    @property
    def is_distributed_by_detector(self):
        if self.dist.comm_col_size == self.dist.comm.group_size:
            return True
        else:
            return False

    # Mapping methods

    def __getitem__(self, key):
        return self._internal[key]

    def __delitem__(self, key):
        del self._internal[key]

    def __setitem__(self, key, value):
        self._internal[key] = value

    def __iter__(self):
        return iter(self._internal)

    def __len__(self):
        return len(self._internal)

    def __del__(self):
        if hasattr(self, "detdata"):
            self.detdata.clear()
        if hasattr(self, "shared"):
            self.shared.clear()

    def __repr__(self):
        val = "<Observation"
        val += f"\n  name = '{self.name}'"
        val += f"\n  uid = '{self.uid}'"
        if self.comm.comm_group is None:
            val += "  group has a single process (no MPI)"
        else:
            val += f"  group has {self.comm.group_size} processes"
        val += f"\n  telescope = {self._telescope.__repr__()}"
        val += f"\n  session = {self._session.__repr__()}"
        for k, v in self._internal.items():
            val += f"\n  {k} = {v}"
        val += f"\n  {self.n_all_samples} total samples ({self.n_local_samples} local)"
        val += f"\n  shared:  {self.shared}"
        val += f"\n  detdata:  {self.detdata}"
        val += f"\n  intervals:  {self.intervals}"
        val += "\n>"
        return val

    def __eq__(self, other):
        # Note that testing for equality is quite expensive, since it means testing all
        # metadata and also all detector, shared, and interval data.  This is mainly
        # used for unit tests.
        log = Logger.get()
        fail = 0
        if self.name != other.name:
            fail = 1
            log.verbose(
                f"Proc {self.comm.world_rank}:  Obs names {self.name} != {other.name}"
            )
        if self.uid != other.uid:
            fail = 1
            log.verbose(
                f"Proc {self.comm.world_rank}:  Obs uid {self.uid} != {other.uid}"
            )
        if self.telescope != other.telescope:
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs telescopes not equal")
        if self.session != other.session:
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs sessions not equal")
        if self.dist != other.dist:
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs distributions not equal")
        if set(self._internal.keys()) != set(other._internal.keys()):
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs metadata keys not equal")
        for k, v in self._internal.items():
            if v != other._internal[k]:
                feq = True
                try:
                    feq = np.allclose(v, other._internal[k])
                except Exception:
                    # Not floating point data
                    feq = False
                if not feq:
                    fail = 1
                    log.verbose(
                        f"Proc {self.comm.world_rank}:  Obs metadata[{k}]:  {v} != {other[k]}"
                    )
                    break
        if self.shared != other.shared:
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs shared data not equal")
        if self.detdata != other.detdata:
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs detdata not equal")
        if self.intervals != other.intervals:
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs intervals not equal")
        if self.comm.comm_group is not None:
            fail = self.comm.comm_group.allreduce(fail, op=MPI.SUM)
        return fail == 0

    def __ne__(self, other):
        return not self.__eq__(other)

    def duplicate(
        self, times=None, meta=None, shared=None, detdata=None, intervals=None
    ):
        """Return a copy of the observation and all its data.

        The times field should be the name of the shared field containing timestamps.
        This is used when copying interval lists to the new observation so that these
        objects reference the timestamps within the new observation (rather than the
        old one).  If this is not specified and some intervals exist, then an
        exception is raised.

        The meta, shared, detdata, and intervals lists specify which of those objects
        to copy to the new observation.  If these are None, then all objects are
        duplicated.

        Args:
            times (str):  The name of the timestamps shared field.
            meta (list):  List of metadata objects to copy, or None.
            shared (list):  List of shared objects to copy, or None.
            detdata (list):  List of detdata objects to copy, or None.
            intervals (list):  List of intervals objects to copy, or None.

        Returns:
            (Observation):  The new copy of the observation.

        """
        log = Logger.get()
        if times is None and len(self.intervals) > 0:
            msg = "You must specify the times field when duplicating observations "
            msg += "that have some intervals defined."
            log.error(msg)
            raise RuntimeError(msg)
        new_obs = Observation(
            self.dist.comm,
            self.telescope,
            self.n_all_samples,
            name=self.name,
            uid=self.uid,
            session=self.session,
            detector_sets=self.all_detector_sets,
            sample_sets=self.all_sample_sets,
            process_rows=self.dist.process_rows,
        )
        new_obs.set_local_detector_flags(self.local_detector_flags)
        for k, v in self._internal.items():
            if meta is None or k in meta:
                new_obs[k] = copy.deepcopy(v)
        for name, data in self.detdata.items():
            if detdata is None or name in detdata:
                new_obs.detdata[name] = data
        copy_shared = list()
        if times is not None:
            copy_shared.append(times)
        if shared is not None:
            copy_shared.extend(shared)
        for name, data in self.shared.items():
            if shared is None or name in copy_shared:
                # Create the object on the corresponding communicator in the new obs
                new_obs.shared.assign_mpishared(name, data, self.shared.comm_type(name))
        for name, data in self.intervals.items():
            if intervals is None or name in intervals:
                timespans = [(x.start, x.stop) for x in data]
                new_obs.intervals[name] = IntervalList(
                    new_obs.shared[times], timespans=timespans
                )
        return new_obs

    def memory_use(self):
        """Estimate the memory used by shared and detector data.

        This sums the memory used by the shared and detdata attributes and returns the
        total on all processes.  This function is blocking on the observation
        communicator.

        Returns:
            (int):  The number of bytes of memory used by timestream data.

        """
        # Get local memory from detector data
        local_mem = self.detdata.memory_use()

        # If there are many intervals, this could take up non-trivial space.  Add them
        # to the local total
        for iname, it in self.intervals.items():
            if len(it) > 0:
                local_mem += len(it) * interval_dtype.itemsize

        # Sum the aggregate local memory
        total = None
        if self.comm.comm_group is None:
            total = local_mem
        else:
            total = self.comm.comm_group.allreduce(local_mem, op=MPI.SUM)

        # The total shared memory use is already returned on every process by this
        # next function.
        total += self.shared.memory_use()
        return total

    # Redistribution

    @function_timer
    def redistribute(
        self,
        process_rows,
        times=None,
        override_sample_sets=False,
        override_detector_sets=False,
        return_global_intervals=False,
    ):
        """Take the currently allocated observation and redistribute in place.

        This changes the data distribution within the observation.  After
        re-assigning all detectors and samples, the currently allocated shared data
        objects and detector data objects are redistributed using the observation
        communicator.

        Args:
            process_rows (int):  The size of the new process grid in the detector
                direction.  This number must evenly divide into the size of the
                observation communicator.
            times (str):  The shared data field representing the timestamps.  This
                is used to recompute the intervals after redistribution.
            override_sample_sets (False, None or list):  If not False, override
                existing sample set boundaries in the redistributed data.
            override_detector_sets (False, None or list):  If not False, override
                existing detector set boundaries in the redistributed data.
            return_global_intervals (bool):  Return a list of global intervals for
                reference

        Returns:
            None or global_intervals

        """
        log = Logger.get()
        if process_rows == self.dist.process_rows:
            # Nothing to do!
            return

        if override_sample_sets == False:
            sample_sets = self.dist.sample_sets
        else:
            sample_sets = override_sample_sets

        if override_detector_sets == False:
            detector_sets = self.dist.detector_sets
        else:
            detector_sets = override_detector_sets

        # Get the total set of per-detector flags
        if self.comm_col_size == 1:
            all_det_flags = self.local_detector_flags
        else:
            pdflags = self.comm_col.gather(self.local_detector_flags, root=0)
            all_det_flags = None
            if self.comm_col_rank == 0:
                all_det_flags = dict()
                for pf in pdflags:
                    all_det_flags.update(pf)
            all_det_flags = self.comm_col.bcast(all_det_flags, root=0)

        # Create the new distribution
        new_dist = DistDetSamp(
            self.dist.samples,
            self._telescope.focalplane.detectors,
            sample_sets,
            detector_sets,
            self.dist.comm,
            process_rows,
        )

        # Do the actual redistribution
        new_shr_manager, new_det_manager, new_intervals_manager, global_intervals = (
            redistribute_data(
                self.dist,
                new_dist,
                self.shared,
                self.detdata,
                self.intervals,
                times=times,
                dbg=self.name,
            )
        )

        # Redistribute any metadata objects that support it.
        for k, v in self._internal.items():
            if hasattr(v, "redistribute"):
                v.redistribute(self.dist, new_dist)

        # Replace our distribution and data managers with the new ones.
        del self.dist
        self.dist = new_dist

        self.shared.clear()
        del self.shared
        self.shared = new_shr_manager

        self.detdata.clear()
        del self.detdata
        self.detdata = new_det_manager

        self.intervals.clear()
        del self.intervals
        self.intervals = new_intervals_manager

        # Restore detector flags for our new local detectors
        self._detflags = {x: int(0) for x in self.dist.dets[self.dist.comm.group_rank]}
        self.set_local_detector_flags(
            {x: all_det_flags[x] for x in self.local_detectors}
        )

        if return_global_intervals:
            global_intervals = self.dist.comm.comm_group.bcast(global_intervals)
            return global_intervals
        else:
            return

    # Accelerator use

    def accel_create(self, names):
        """Create a set of data objects on the device.

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        for key in names["detdata"]:
            self.detdata.accel_create(key)
        for key in names["shared"]:
            self.shared.accel_create(key)
        for key in names["intervals"]:
            self.intervals.accel_create(key)
        for key, val in self._internal.items():
            if isinstance(val, AcceleratorObject):
                if not val.accel_exists():
                    val.accel_create()

    def accel_update_device(self, names):
        """Copy data objects to the device.

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        for key in names["detdata"]:
            self.detdata.accel_update_device(key)
        for key in names["shared"]:
            self.shared.accel_update_device(key)
        for key in names["intervals"]:
            self.intervals.accel_update_device(key)
        for key, val in self._internal.items():
            if isinstance(val, AcceleratorObject):
                if not val.accel_in_use():
                    val.accel_update_device()

    def accel_update_host(self, names):
        """Copy data objects from the device.

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        for key in names["detdata"]:
            self.detdata.accel_update_host(key)
        for key in names["shared"]:
            self.shared.accel_update_host(key)
        for key in names["intervals"]:
            self.intervals.accel_update_host(key)
        for key, val in self._internal.items():
            if isinstance(val, AcceleratorObject):
                if val.accel_in_use():
                    val.accel_update_host()

    def accel_clear(self):
        self.detdata.accel_clear()
        self.shared.accel_clear()
        self.intervals.accel_clear()
        for key, val in self._internal.items():
            if isinstance(val, AcceleratorObject):
                if val.accel_exists():
                    val.accel_delete()

_detflags = {x: int(0) for x in self.dist.dets[self.dist.comm.group_rank]} instance-attribute

_internal = dict() instance-attribute

_name = name instance-attribute

_session = session instance-attribute

_telescope = telescope instance-attribute

_uid = uid instance-attribute

all_detector_sets property

(list): The total list of detector sets for this observation.

all_detectors property

(list): All detectors stored in this observation.

all_sample_sets property

(list): The full list of sample sets used in the data distribution.

comm property

(toast.Comm): The overall communicator.

comm_col property

(mpi4py.MPI.Comm): The communicator for processes in the same column (or None).

comm_col_rank property

(int): The rank of this process in the column communicator.

comm_col_size property

(int): The number of processes in the column communicator.

comm_row property

(mpi4py.MPI.Comm): The communicator for processes in the same row (or None).

comm_row_rank property

(int): The rank of this process in the row communicator.

comm_row_size property

(int): The number of processes in the row communicator.

detdata = DetDataManager(self.dist) instance-attribute

dist = DistDetSamp(n_samples, self._telescope.focalplane.detectors, sample_sets, detector_sets, comm, process_rows) instance-attribute

intervals = IntervalsManager(self.dist, n_samples) instance-attribute

is_distributed_by_detector property

is_distributed_by_sample property

local_detector_flags property

(dict): The local per-detector flags

local_detector_sets property

(list): The detector sets assigned to this process (or None).

local_detectors property

(list): The detectors assigned to this process.

local_index_offset property

The first sample on this process, relative to the observation start.

local_sample_sets property

(list): The sample sets assigned to this process (or None).

n_all_samples property

(int): the total number of samples in this observation.

n_local_samples property

The number of local samples on this process.

name property

(str): The name of the observation.

session property

(Session): The Session instance for this observation.

shared = SharedDataManager(self.dist) instance-attribute

telescope property

(Telescope): The Telescope instance for this observation.

uid property

(int): The Unique ID for this observation.

view = ViewInterface() class-attribute instance-attribute

__del__()

Source code in toast/observation.py
def __del__(self):
    if hasattr(self, "detdata"):
        self.detdata.clear()
    if hasattr(self, "shared"):
        self.shared.clear()

__delitem__(key)

Source code in toast/observation.py
def __delitem__(self, key):
    del self._internal[key]

__eq__(other)

Source code in toast/observation.py
def __eq__(self, other):
    # Note that testing for equality is quite expensive, since it means testing all
    # metadata and also all detector, shared, and interval data.  This is mainly
    # used for unit tests.
    log = Logger.get()
    fail = 0
    if self.name != other.name:
        fail = 1
        log.verbose(
            f"Proc {self.comm.world_rank}:  Obs names {self.name} != {other.name}"
        )
    if self.uid != other.uid:
        fail = 1
        log.verbose(
            f"Proc {self.comm.world_rank}:  Obs uid {self.uid} != {other.uid}"
        )
    if self.telescope != other.telescope:
        fail = 1
        log.verbose(f"Proc {self.comm.world_rank}:  Obs telescopes not equal")
    if self.session != other.session:
        fail = 1
        log.verbose(f"Proc {self.comm.world_rank}:  Obs sessions not equal")
    if self.dist != other.dist:
        fail = 1
        log.verbose(f"Proc {self.comm.world_rank}:  Obs distributions not equal")
    if set(self._internal.keys()) != set(other._internal.keys()):
        fail = 1
        log.verbose(f"Proc {self.comm.world_rank}:  Obs metadata keys not equal")
    for k, v in self._internal.items():
        if v != other._internal[k]:
            feq = True
            try:
                feq = np.allclose(v, other._internal[k])
            except Exception:
                # Not floating point data
                feq = False
            if not feq:
                fail = 1
                log.verbose(
                    f"Proc {self.comm.world_rank}:  Obs metadata[{k}]:  {v} != {other[k]}"
                )
                break
    if self.shared != other.shared:
        fail = 1
        log.verbose(f"Proc {self.comm.world_rank}:  Obs shared data not equal")
    if self.detdata != other.detdata:
        fail = 1
        log.verbose(f"Proc {self.comm.world_rank}:  Obs detdata not equal")
    if self.intervals != other.intervals:
        fail = 1
        log.verbose(f"Proc {self.comm.world_rank}:  Obs intervals not equal")
    if self.comm.comm_group is not None:
        fail = self.comm.comm_group.allreduce(fail, op=MPI.SUM)
    return fail == 0

__getitem__(key)

Source code in toast/observation.py
def __getitem__(self, key):
    return self._internal[key]

__init__(comm, telescope, n_samples, name=None, uid=None, session=None, detector_sets=None, sample_sets=None, process_rows=None)

Source code in toast/observation.py
@function_timer
def __init__(
    self,
    comm,
    telescope,
    n_samples,
    name=None,
    uid=None,
    session=None,
    detector_sets=None,
    sample_sets=None,
    process_rows=None,
):
    log = Logger.get()
    self._telescope = telescope
    self._name = name
    self._uid = uid
    self._session = session

    if self._uid is None and self._name is not None:
        self._uid = name_UID(self._name)

    if self._session is None:
        if self._name is not None:
            self._session = Session(
                name=self._name,
                uid=self._uid,
                start=None,
                end=None,
            )
    elif not isinstance(self._session, Session):
        raise RuntimeError("session should be a Session instance or None")

    self.dist = DistDetSamp(
        n_samples,
        self._telescope.focalplane.detectors,
        sample_sets,
        detector_sets,
        comm,
        process_rows,
    )

    # The internal metadata dictionary
    self._internal = dict()

    # Set up the data managers
    self.detdata = DetDataManager(self.dist)
    self.shared = SharedDataManager(self.dist)
    self.intervals = IntervalsManager(self.dist, n_samples)

    # Set up local per-detector cutting
    self._detflags = {x: int(0) for x in self.dist.dets[self.dist.comm.group_rank]}
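
For orientation, here is a minimal construction sketch. It assumes mpi4py is available and that telescope is an existing toast.instrument.Telescope with a populated focalplane; the sample count and observation name are illustrative only.

# Construction sketch (not taken from the TOAST sources).
from toast.mpi import MPI, Comm
from toast.observation import Observation

toast_comm = Comm(world=MPI.COMM_WORLD)   # one group containing all processes
obs = Observation(
    toast_comm,
    telescope,                            # assumed to exist already
    n_samples=100000,
    name="scan_001",
    process_rows=toast_comm.group_size,   # one row per process: distribute by detector
)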

__iter__()

Source code in toast/observation.py
def __iter__(self):
    return iter(self._internal)

__len__()

Source code in toast/observation.py
def __len__(self):
    return len(self._internal)

__ne__(other)

Source code in toast/observation.py
def __ne__(self, other):
    return not self.__eq__(other)

__repr__()

Source code in toast/observation.py
def __repr__(self):
    val = "<Observation"
    val += f"\n  name = '{self.name}'"
    val += f"\n  uid = '{self.uid}'"
    if self.comm.comm_group is None:
        val += "  group has a single process (no MPI)"
    else:
        val += f"  group has {self.comm.group_size} processes"
    val += f"\n  telescope = {self._telescope.__repr__()}"
    val += f"\n  session = {self._session.__repr__()}"
    for k, v in self._internal.items():
        val += f"\n  {k} = {v}"
    val += f"\n  {self.n_all_samples} total samples ({self.n_local_samples} local)"
    val += f"\n  shared:  {self.shared}"
    val += f"\n  detdata:  {self.detdata}"
    val += f"\n  intervals:  {self.intervals}"
    val += "\n>"
    return val

__setitem__(key, value)

Source code in toast/observation.py
def __setitem__(self, key, value):
    self._internal[key] = value
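
These mapping methods let an Observation behave like a dictionary for arbitrary per-observation metadata. A small sketch, assuming obs is an existing Observation:

# Store, iterate over, and remove metadata entries.
obs["site_temperature_K"] = 270.0
obs["weather_tag"] = "clear"

for key in obs:              # iterates over the metadata keys only
    print(key, obs[key])

del obs["weather_tag"]
print(len(obs))              # number of metadata entries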

accel_clear()

Source code in toast/observation.py
def accel_clear(self):
    self.detdata.accel_clear()
    self.shared.accel_clear()
    self.intervals.accel_clear()
    for key, val in self._internal.items():
        if isinstance(val, AcceleratorObject):
            if val.accel_exists():
                val.accel_delete()

accel_create(names)

Create a set of data objects on the device.

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/observation.py
def accel_create(self, names):
    """Create a set of data objects on the device.

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    for key in names["detdata"]:
        self.detdata.accel_create(key)
    for key in names["shared"]:
        self.shared.accel_create(key)
    for key in names["intervals"]:
        self.intervals.accel_create(key)
    for key, val in self._internal.items():
        if isinstance(val, AcceleratorObject):
            if not val.accel_exists():
                val.accel_create()
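
The names dictionary uses the same keys as Operator requires() / provides(). A hedged sketch, assuming obs is an Observation with a detdata field "signal" and a shared field "times" (illustrative names) and that TOAST was built with accelerator support:

# Allocate device copies of a few named objects.
names = {
    "detdata": ["signal"],
    "shared": ["times"],
    "intervals": [],
}
obs.accel_create(names)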

accel_update_device(names)

Copy data objects to the device.

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/observation.py
def accel_update_device(self, names):
    """Copy data objects to the device.

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    for key in names["detdata"]:
        self.detdata.accel_update_device(key)
    for key in names["shared"]:
        self.shared.accel_update_device(key)
    for key in names["intervals"]:
        self.intervals.accel_update_device(key)
    for key, val in self._internal.items():
        if isinstance(val, AcceleratorObject):
            if not val.accel_in_use():
                val.accel_update_device()

accel_update_host(names)

Copy data objects from the device.

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/observation.py
def accel_update_host(self, names):
    """Copy data objects from the device.

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    for key in names["detdata"]:
        self.detdata.accel_update_host(key)
    for key in names["shared"]:
        self.shared.accel_update_host(key)
    for key in names["intervals"]:
        self.intervals.accel_update_host(key)
    for key, val in self._internal.items():
        if isinstance(val, AcceleratorObject):
            if val.accel_in_use():
                val.accel_update_host()
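
A typical round trip, sketched under the same assumptions as the accel_create() example above, pushes data to the device before device-enabled operators run and pulls the results back afterwards:

# `names` is the same dictionary passed to accel_create() above.
obs.accel_update_device(names)   # host -> device
# ... run device-enabled operators on obs ...
obs.accel_update_host(names)     # device -> host
obs.accel_clear()                # free the device copies when finished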

clear()

Source code in toast/observation.py
def clear(self):
    self.view.clear()
    self.intervals.clear()
    self.detdata.clear()
    self.shared.clear()
    self._internal.clear()

duplicate(times=None, meta=None, shared=None, detdata=None, intervals=None)

Return a copy of the observation and all its data.

The times field should be the name of the shared field containing timestamps. This is used when copying interval lists to the new observation so that these objects reference the timestamps within the new observation (rather than the old one). If this is not specified and some intervals exist, then an exception is raised.

The meta, shared, detdata, and intervals lists specify which of those objects to copy to the new observation. If these are None, then all objects are duplicated.

Parameters:

Name Type Description Default
times str

The name of the timestamps shared field.

None
meta list

List of metadata objects to copy, or None.

None
shared list

List of shared objects to copy, or None.

None
detdata list

List of detdata objects to copy, or None.

None
intervals list

List of intervals objects to copy, or None.

None

Returns:

Type Description
Observation

The new copy of the observation.

Source code in toast/observation.py
def duplicate(
    self, times=None, meta=None, shared=None, detdata=None, intervals=None
):
    """Return a copy of the observation and all its data.

    The times field should be the name of the shared field containing timestamps.
    This is used when copying interval lists to the new observation so that these
    objects reference the timestamps within the new observation (rather than the
    old one).  If this is not specified and some intervals exist, then an
    exception is raised.

    The meta, shared, detdata, and intervals lists specify which of those objects
    to copy to the new observation.  If these are None, then all objects are
    duplicated.

    Args:
        times (str):  The name of the timestamps shared field.
        meta (list):  List of metadata objects to copy, or None.
        shared (list):  List of shared objects to copy, or None.
        detdata (list):  List of detdata objects to copy, or None.
        intervals (list):  List of intervals objects to copy, or None.

    Returns:
        (Observation):  The new copy of the observation.

    """
    log = Logger.get()
    if times is None and len(self.intervals) > 0:
        msg = "You must specify the times field when duplicating observations "
        msg += "that have some intervals defined."
        log.error(msg)
        raise RuntimeError(msg)
    new_obs = Observation(
        self.dist.comm,
        self.telescope,
        self.n_all_samples,
        name=self.name,
        uid=self.uid,
        session=self.session,
        detector_sets=self.all_detector_sets,
        sample_sets=self.all_sample_sets,
        process_rows=self.dist.process_rows,
    )
    new_obs.set_local_detector_flags(self.local_detector_flags)
    for k, v in self._internal.items():
        if meta is None or k in meta:
            new_obs[k] = copy.deepcopy(v)
    for name, data in self.detdata.items():
        if detdata is None or name in detdata:
            new_obs.detdata[name] = data
    copy_shared = list()
    if times is not None:
        copy_shared.append(times)
    if shared is not None:
        copy_shared.extend(shared)
    for name, data in self.shared.items():
        if shared is None or name in copy_shared:
            # Create the object on the corresponding communicator in the new obs
            new_obs.shared.assign_mpishared(name, data, self.shared.comm_type(name))
    for name, data in self.intervals.items():
        if intervals is None or name in intervals:
            timespans = [(x.start, x.stop) for x in data]
            new_obs.intervals[name] = IntervalList(
                new_obs.shared[times], timespans=timespans
            )
    return new_obs
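
As a usage sketch (the field names "times" and "signal" are illustrative), one might copy only the timestamps and a single detector data field, or duplicate everything:

# Assumes `obs` has a shared field "times" and a detdata field "signal".
light_copy = obs.duplicate(
    times="times",
    meta=[],             # copy no metadata entries
    shared=["times"],    # timestamps only
    detdata=["signal"],  # one detector data field
    intervals=None,      # None copies all interval lists
)
full_copy = obs.duplicate(times="times")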

memory_use()

Estimate the memory used by shared and detector data.

This sums the memory used by the shared and detdata attributes and returns the total on all processes. This function is blocking on the observation communicator.

Returns:

Type Description
int

The number of bytes of memory used by timestream data.

Source code in toast/observation.py
def memory_use(self):
    """Estimate the memory used by shared and detector data.

    This sums the memory used by the shared and detdata attributes and returns the
    total on all processes.  This function is blocking on the observation
    communicator.

    Returns:
        (int):  The number of bytes of memory used by timestream data.

    """
    # Get local memory from detector data
    local_mem = self.detdata.memory_use()

    # If there are many intervals, this could take up non-trivial space.  Add them
    # to the local total
    for iname, it in self.intervals.items():
        if len(it) > 0:
            local_mem += len(it) * interval_dtype.itemsize

    # Sum the aggregate local memory
    total = None
    if self.comm.comm_group is None:
        total = local_mem
    else:
        total = self.comm.comm_group.allreduce(local_mem, op=MPI.SUM)

    # The total shared memory use is already returned on every process by this
    # next function.
    total += self.shared.memory_use()
    return total
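
Because the same total is returned on every process, it is convenient to report it from a single rank. A brief sketch, assuming obs is an existing Observation:

nbytes = obs.memory_use()        # blocking on the observation communicator
if obs.comm.group_rank == 0:
    print(f"{obs.name}: {nbytes / 2**20:.1f} MB of timestream data")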

redistribute(process_rows, times=None, override_sample_sets=False, override_detector_sets=False, return_global_intervals=False)

Take the currently allocated observation and redistribute in place.

This changes the data distribution within the observation. After re-assigning all detectors and samples, the currently allocated shared data objects and detector data objects are redistributed using the observation communicator.

Parameters:

Name Type Description Default
process_rows int

The size of the new process grid in the detector direction. This number must evenly divide into the size of the observation communicator.

required
times str

The shared data field representing the timestamps. This is used to recompute the intervals after redistribution.

None
override_sample_sets (False, None or list)

If not False, override existing sample set boundaries in the redistributed data.

False
override_detector_sets (False, None or list)

If not False, override existing detector set boundaries in the redistributed data.

False
return_global_intervals bool

Return a list of global intervals for reference

False

Returns:

Type Description

None or global_intervals

Source code in toast/observation.py
@function_timer
def redistribute(
    self,
    process_rows,
    times=None,
    override_sample_sets=False,
    override_detector_sets=False,
    return_global_intervals=False,
):
    """Take the currently allocated observation and redistribute in place.

    This changes the data distribution within the observation.  After
    re-assigning all detectors and samples, the currently allocated shared data
    objects and detector data objects are redistributed using the observation
    communicator.

    Args:
        process_rows (int):  The size of the new process grid in the detector
            direction.  This number must evenly divide into the size of the
            observation communicator.
        times (str):  The shared data field representing the timestamps.  This
            is used to recompute the intervals after redistribution.
        override_sample_sets (False, None or list):  If not False, override
            existing sample set boundaries in the redistributed data.
        override_detector_sets (False, None or list):  If not False, override
            existing detector set boundaries in the redistributed data.
        return_global_intervals (bool):  Return a list of global intervals for
            reference

    Returns:
        None or global_intervals

    """
    log = Logger.get()
    if process_rows == self.dist.process_rows:
        # Nothing to do!
        return

    if override_sample_sets == False:
        sample_sets = self.dist.sample_sets
    else:
        sample_sets = override_sample_sets

    if override_detector_sets == False:
        detector_sets = self.dist.detector_sets
    else:
        detector_sets = override_detector_sets

    # Get the total set of per-detector flags
    if self.comm_col_size == 1:
        all_det_flags = self.local_detector_flags
    else:
        pdflags = self.comm_col.gather(self.local_detector_flags, root=0)
        all_det_flags = None
        if self.comm_col_rank == 0:
            all_det_flags = dict()
            for pf in pdflags:
                all_det_flags.update(pf)
        all_det_flags = self.comm_col.bcast(all_det_flags, root=0)

    # Create the new distribution
    new_dist = DistDetSamp(
        self.dist.samples,
        self._telescope.focalplane.detectors,
        sample_sets,
        detector_sets,
        self.dist.comm,
        process_rows,
    )

    # Do the actual redistribution
    new_shr_manager, new_det_manager, new_intervals_manager, global_intervals = (
        redistribute_data(
            self.dist,
            new_dist,
            self.shared,
            self.detdata,
            self.intervals,
            times=times,
            dbg=self.name,
        )
    )

    # Redistribute any metadata objects that support it.
    for k, v in self._internal.items():
        if hasattr(v, "redistribute"):
            v.redistribute(self.dist, new_dist)

    # Replace our distribution and data managers with the new ones.
    del self.dist
    self.dist = new_dist

    self.shared.clear()
    del self.shared
    self.shared = new_shr_manager

    self.detdata.clear()
    del self.detdata
    self.detdata = new_det_manager

    self.intervals.clear()
    del self.intervals
    self.intervals = new_intervals_manager

    # Restore detector flags for our new local detectors
    self._detflags = {x: int(0) for x in self.dist.dets[self.dist.comm.group_rank]}
    self.set_local_detector_flags(
        {x: all_det_flags[x] for x in self.local_detectors}
    )

    if return_global_intervals:
        global_intervals = self.dist.comm.comm_group.bcast(global_intervals)
        return global_intervals
    else:
        return
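
For example, here is a sketch of temporarily switching to a purely sample-distributed layout and then restoring the original process grid; it assumes the timestamps live in a shared field named "times":

original_rows = obs.dist.process_rows

obs.redistribute(1, times="times")   # one process row: distributed by sample
assert obs.is_distributed_by_sample
# ... operations that need every detector locally (for a slice of samples) ...

obs.redistribute(original_rows, times="times")   # restore the previous layout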

select_local_detectors(selection=None, flagmask=0)

Get the local detectors assigned to this process.

This takes the full list of local detectors and optionally prunes them by the specified selection and / or applies per-detector flags with the given mask.

Parameters:

Name Type Description Default
selection list

Only return detectors in this set.

None
flagmask uint8

Apply this mask to per-detector flags and only include detectors with a result of zero (good).

0

Returns:

Type Description
list

The selected detectors.

Source code in toast/observation.py
def select_local_detectors(
    self,
    selection=None,
    flagmask=0,
):
    """Get the local detectors assigned to this process.

    This takes the full list of local detectors and optionally prunes them
    by the specified selection and / or applies per-detector flags with
    the given mask.

    Args:
        selection (list):  Only return detectors in this set.
        flagmask (uint8):  Apply this mask to per-detector flags and only
            include detectors with a result of zero (good).

    Returns:
        (list):  The selected detectors.

    """
    if flagmask is None:
        good = set(self.local_detectors)
    else:
        good = set(
            [
                x
                for x in self.local_detectors
                if (self.local_detector_flags[x] & flagmask) == 0
            ]
        )
    dets = list()
    if selection is None:
        for det in self.local_detectors:
            if det in good:
                dets.append(det)
    else:
        sel_set = set(selection)
        for det in self.local_detectors:
            if (det in sel_set) and (det in good):
                dets.append(det)
    # print(f"SELECT mask {int(flagmask)} {selection}: {dets}", flush=True)
    return dets
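
A short usage sketch (the detector names and mask values are illustrative):

all_local = obs.select_local_detectors()              # every local detector
unflagged = obs.select_local_detectors(flagmask=255)  # drop detectors with any flag bit set
subset = obs.select_local_detectors(
    selection=["D0A", "D0B"],   # hypothetical detector names
    flagmask=1,                 # additionally exclude detectors with bit 0 set
)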

set_local_detector_flags(vals)

Set the per-detector flagging.

This resets the per-detector flags to the specified values.

Parameters:

Name Type Description Default
vals dict

The flag values for one or more detectors.

required

Returns:

Type Description

None

Source code in toast/observation.py
def set_local_detector_flags(self, vals):
    """Set the per-detector flagging.

    This resets the per-detector flags to the specified values.

    Args:
        vals (dict):  The flag values for one or more detectors.

    Returns:
        None

    """
    ldets = set(self.local_detectors)
    for k, v in vals.items():
        if k not in ldets:
            msg = f"Cannot set per-detector flag for '{k}', which is"
            msg += " not a local detector"
            raise RuntimeError(msg)
        self._detflags[k] = int(v)
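
A brief sketch (the detector names are hypothetical); flags can only be set for detectors that are local to the calling process:

obs.set_local_detector_flags({"D0A": 0, "D0B": 1})
print(obs.local_detector_flags)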

update_local_detector_flags(vals)

Update the per-detector flagging.

This does a bitwise OR with the existing flag values.

Parameters:

Name Type Description Default
vals dict

The flag values for one or more detectors.

required

Returns:

Type Description

None

Source code in toast/observation.py
def update_local_detector_flags(self, vals):
    """Update the per-detector flagging.

    This does a bitwise OR with the existing flag values.

    Args:
        vals (dict):  The flag values for one or more detectors.

    Returns:
        None

    """
    ldets = set(self.local_detectors)
    for k, v in vals.items():
        if k not in ldets:
            msg = f"Cannot update per-detector flag for '{k}', which is"
            msg += " not a local detector"
            raise RuntimeError(msg)
        self._detflags[k] |= int(v)
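
In contrast to set_local_detector_flags(), this accumulates flag bits. A sketch with a hypothetical detector name:

obs.set_local_detector_flags({"D0A": 0b01})
obs.update_local_detector_flags({"D0A": 0b10})
# The values are OR-ed, so obs.local_detector_flags["D0A"] is now 0b11.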

Each Observation has its own instrument model (see the Instrument Model section).