Global Settings

The toast module has several global settings that can be configured at runtime with environment variables or by explicitly calling functions in the package.

toast

Time Ordered Astrophysics Scalable Tools (TOAST) is a software package designed to allow the processing of data from telescopes that acquire data as timestreams (rather than images).

Runtime behavior of this package can be controlled by setting the environment variables below before importing the package (see the example after this list):

TOAST_LOGLEVEL=

- Possible values are "VERBOSE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". Default is "INFO".
- Controls logging of both C++ and Python code.

TOAST_FUNCTIME=

- Values "1", "true", or "yes" will enable Python function timers in many parts of the code.

TOAST_GPU_OPENMP=

- Values "1", "true", or "yes" will enable runtime support for OpenMP target offload.
- Requires compile-time support for OpenMP 5.x features.

TOAST_GPU_MEM_GB=

- Value in GB of memory to use for OpenMP on each GPU.
- Conservative default is 2 GB.

TOAST_GPU_JAX=

- Values "1", "true", or "yes" will enable runtime support for JAX.
- Requires jax to be available / importable.

TOAST_GPU_HYBRID_PIPELINES=

- Values "0", "false", or "no" will disable runtime support for hybrid GPU pipelines.
- Requires TOAST_GPU_OPENMP or TOAST_GPU_JAX to be enabled.

OMP_NUM_THREADS=

- TOAST uses OpenMP threading in several places, and the concurrency is set by this standard environment variable.

OMP_TARGET_OFFLOAD=[MANDATORY | DISABLED | DEFAULT]

- If the TOAST_GPU_OPENMP environment variable is set, this standard OpenMP environment variable controls the offload behavior.

MPI_DISABLE=

- Any non-empty value will disable the try block that looks for mpi4py. Needed on some systems where mpi4py is available but does not work.
- The same variable also controls the pshmem package used by toast.

CUDA_MEMPOOL_FRACTION=

- If compiled with CUDA support (-DUSE_CUDA), create a memory pool that pre-allocates this fraction of the device memory allocated to each process.
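
These variables can also be set from Python, as long as this happens before the first import of the package. A minimal sketch (assuming toast is installed):

import os

# Must be set before the first "import toast"; the package reads these
# variables at import time.
os.environ["TOAST_LOGLEVEL"] = "DEBUG"  # verbose C++ and Python logging
os.environ["TOAST_FUNCTIME"] = "1"      # enable Python function timers
os.environ["MPI_DISABLE"] = "1"         # skip the mpi4py import attempt

import toast

print(toast.__version__)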

__version__ = env.version() module-attribute

env = Environment.get() module-attribute

interval_dtype = build_interval_dtype() module-attribute

relfile = os.path.join(thisdir, 'RELEASE') module-attribute

thisdir = os.path.abspath(os.path.dirname(__file__)) module-attribute

Comm

Bases: object

Class which represents a two-level hierarchy of MPI communicators.

A Comm object splits the full set of processes into groups of size groupsize. If groupsize does not divide evenly into the size of the given communicator, then some processes remain idle.

A Comm object stores several MPI communicators: The "world" communicator given here, which contains all processes to consider, a "group" communicator (one per group), and a "rank" communicator which contains the processes with the same group-rank across all groups.

This object also stores a "node" communicator containing all processes with access to the same shared memory, and a "node rank" communicator for processes with the same rank on a node. There is a node rank communicator for all nodes and also one for within the group.

Additionally, there is a mechanism for creating and caching row / column communicators for process grids within a group.

If MPI is not enabled, then all communicators are set to None. Additionally, there may be cases where MPI is enabled in the environment, but the user wishes to disable it when creating a Comm object. This can be done by passing MPI.COMM_SELF as the world communicator.

Parameters:

world (mpi4py.MPI.Comm): The MPI communicator containing all processes. Default: None.
groupsize (int): The size of each process group. Default: 0.
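
A minimal usage sketch (assuming toast is installed; the groupsize of 4 assumes an MPI run whose world size is divisible by 4):

import toast

# Default: use MPI.COMM_WORLD (if MPI is available) with a single group
# containing all processes.
comm = toast.Comm()

# Split the world communicator into groups of 4 processes each.
comm = toast.Comm(groupsize=4)
print(comm.ngroups, comm.group, comm.group_rank)

# Force serial operation even when MPI is available by passing COMM_SELF:
# from mpi4py import MPI
# comm = toast.Comm(world=MPI.COMM_SELF)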
Source code in toast/mpi.py
class Comm(object):
    """Class which represents a two-level hierarchy of MPI communicators.

    A Comm object splits the full set of processes into groups of size
    `groupsize`.  If groupsize does not divide evenly into the size of the given
    communicator, then some processes remain idle.

    A Comm object stores several MPI communicators:  The "world" communicator
    given here, which contains all processes to consider, a "group"
    communicator (one per group), and a "rank" communicator which contains the
    processes with the same group-rank across all groups.

    This object also stores a "node" communicator containing all processes with
    access to the same shared memory, and a "node rank" communicator for
    processes with the same rank on a node.  There is a node rank communicator
    for all nodes and also one for within the group.

    Additionally, there is a mechanism for creating and caching row / column
    communicators for process grids within a group.

    If MPI is not enabled, then all communicators are set to None.  Additionally,
    there may be cases where MPI is enabled in the environment, but the user wishes
    to disable it when creating a Comm object.  This can be done by passing
    MPI.COMM_SELF as the world communicator.

    Args:
        world (mpi4py.MPI.Comm): the MPI communicator containing all processes.
        groupsize (int): the size of each process group.

    """

    def __init__(self, world=None, groupsize=0):
        log = Logger.get()
        if world is None:
            if use_mpi:
                # Default is COMM_WORLD
                world = MPI.COMM_WORLD
            else:
                # MPI is disabled, leave world as None.
                pass
        else:
            if use_mpi:
                # We were passed a communicator to use. Check that it is
                # actually a communicator, otherwise fall back to COMM_WORLD.
                if not isinstance(world, MPI.Comm):
                    log.warning(
                        "Specified world communicator is not a valid "
                        "mpi4py.MPI.Comm object.  Using COMM_WORLD."
                    )
                    world = MPI.COMM_WORLD
            else:
                log.warning(
                    "World communicator specified even though "
                    "MPI is disabled.  Ignoring this constructor "
                    "argument."
                )
                world = None
            # Special case, MPI available but the user wants a serial
            # data object
            if world == MPI.COMM_SELF:
                world = None

        self._wcomm = world
        self._wrank = 0
        self._wsize = 1
        self._nodecomm = None
        self._noderankcomm = None
        self._nodeprocs = 1
        self._noderankprocs = 1
        if self._wcomm is not None:
            self._wrank = self._wcomm.rank
            self._wsize = self._wcomm.size
            self._nodecomm = self._wcomm.Split_type(MPI.COMM_TYPE_SHARED, 0)
            self._nodeprocs = self._nodecomm.size
            myworldnode = self._wrank // self._nodeprocs
            self._noderankcomm = self._wcomm.Split(self._nodecomm.rank, myworldnode)
            self._noderankprocs = self._noderankcomm.size

        self._gsize = groupsize

        if (self._gsize < 0) or (self._gsize > self._wsize):
            log.warning(
                "Invalid groupsize ({}).  Should be between {} "
                "and {}.  Using single process group instead.".format(
                    groupsize, 0, self._wsize
                )
            )
            self._gsize = 0

        if self._gsize == 0:
            self._gsize = self._wsize

        self._ngroups = self._wsize // self._gsize

        if self._ngroups * self._gsize != self._wsize:
            msg = (
                "World communicator size ({}) is not evenly divisible "
                "by requested group size ({}).".format(self._wsize, self._gsize)
            )
            log.error(msg)
            raise RuntimeError(msg)

        if self._gsize > self._nodeprocs:
            if self._gsize % self._nodeprocs != 0:
                msg = f"Group size of {self._gsize} is not a whole number of "
                msg += f"nodes (there are {self._nodeprocs} processes per node)"
                log.error(msg)
                raise RuntimeError(msg)
            self._gnodes = self._gsize // self._nodeprocs
        else:
            self._gnodes = 1
        self._group = self._wrank // self._gsize
        self._grank = self._wrank % self._gsize
        self._cleanup_group_comm = False

        self._gnodecomm = None
        self._gnoderankcomm = None
        self._gnodeprocs = 1
        if self._ngroups == 1:
            # We just have one group with all processes.
            self._gcomm = self._wcomm
            self._gnodecomm = self._nodecomm
            self._gnoderankcomm = self._noderankcomm
            self._rcomm = None
            self._gnoderankprocs = self._noderankprocs
        else:
            # We need to split the communicator.  This code is never executed
            # unless MPI is enabled and we have multiple groups.
            self._gcomm = self._wcomm.Split(self._group, self._grank)
            self._rcomm = self._wcomm.Split(self._grank, self._group)
            self._gnodecomm = self._gcomm.Split_type(MPI.COMM_TYPE_SHARED, 0)
            self._gnodeprocs = self._gnodecomm.size
            mygroupnode = self._grank // self._gnodeprocs
            self._gnoderankcomm = self._gcomm.Split(self._gnodecomm.rank, mygroupnode)
            self._gnoderankprocs = self._gnoderankcomm.size
            self._cleanup_group_comm = True

        msg = f"Comm on world rank {self._wrank}:\n"
        msg += f"  world comm = {self._wcomm} with {self._wsize} ranks\n"
        msg += (
            f"  intra-node comm = {self._nodecomm} ({self._nodeprocs} ranks per node)\n"
        )
        msg += f"  inter-node rank comm = {self._noderankcomm} ({self._noderankprocs} ranks)\n"
        msg += (
            f"  in group {self._group + 1} / {self._ngroups} with rank {self._grank}\n"
        )
        msg += f"  intra-group comm = {self._gcomm} ({self._gsize} ranks)\n"
        msg += f"  inter-group rank comm = {self._rcomm}\n"
        msg += f"  intra-node group comm = {self._gnodecomm} ({self._gnodeprocs} ranks per node)\n"
        msg += f"  inter-node group rank comm = {self._gnoderankcomm} ({self._noderankprocs} ranks)\n"
        log.verbose(msg)

        if self._gnoderankprocs != self._gnodes:
            msg = f"Number of group node rank procs ({self._gnoderankprocs}) does "
            msg += f"not match the number of nodes in a group ({self._gnodes})"
            log.error(msg)
            raise RuntimeError(msg)

        # Create a cache of row / column communicators for each group.  These can
        # then be re-used for observations with the same grid shapes.
        self._rowcolcomm = dict()

    def close(self):
        # Explicitly free communicators if needed.
        # Go through the cache of row / column grid communicators and free
        if hasattr(self, "_rowcolcomm"):
            for process_rows, comms in self._rowcolcomm.items():
                if comms["cleanup"]:
                    # We previously allocated new communicators for this grid.
                    # Free them now.
                    for subcomm in [
                        "row_rank_node",
                        "row_node",
                        "col_rank_node",
                        "col_node",
                        "row",
                        "col",
                    ]:
                        if comms[subcomm] is not None:
                            comms[subcomm].Free()
                            del comms[subcomm]
            del self._rowcolcomm
        # Optionally delete the group communicators if they were created.
        if hasattr(self, "_cleanup_group_comm") and self._cleanup_group_comm:
            self._gcomm.Free()
            self._rcomm.Free()
            self._gnodecomm.Free()
            self._gnoderankcomm.Free()
            del self._gcomm
            del self._rcomm
            del self._gnodecomm
            del self._gnoderankcomm
            del self._cleanup_group_comm
        # We always need to clean up the world node and node-rank communicators
        # if they exist
        if hasattr(self, "_noderankcomm") and self._noderankcomm is not None:
            self._noderankcomm.Free()
            del self._noderankcomm
        if hasattr(self, "_nodecomm") and self._nodecomm is not None:
            self._nodecomm.Free()
            del self._nodecomm
        return

    def __del__(self):
        self.close()

    @property
    def world_size(self):
        """The size of the world communicator."""
        return self._wsize

    @property
    def world_rank(self):
        """The rank of this process in the world communicator."""
        return self._wrank

    @property
    def ngroups(self):
        """The number of process groups."""
        return self._ngroups

    @property
    def group(self):
        """The group containing this process."""
        return self._group

    @property
    def group_size(self):
        """The size of the group containing this process."""
        return self._gsize

    @property
    def group_rank(self):
        """The rank of this process in the group communicator."""
        return self._grank

    # All the different types of relevant MPI communicators.

    @property
    def comm_world(self):
        """The world communicator."""
        return self._wcomm

    @property
    def comm_world_node(self):
        """The communicator shared by world processes on the same node."""
        return self._nodecomm

    @property
    def comm_world_node_rank(self):
        """The communicator shared by world processes with the same node rank across all nodes."""
        return self._noderankcomm

    @property
    def comm_group(self):
        """The communicator shared by processes within this group."""
        return self._gcomm

    @property
    def comm_group_rank(self):
        """The communicator shared by processes with the same group_rank."""
        return self._rcomm

    @property
    def comm_group_node(self):
        """The communicator shared by group processes on the same node."""
        return self._gnodecomm

    @property
    def comm_group_node_rank(self):
        """The communicator shared by group processes with the same node rank on nodes within the group."""
        return self._gnoderankcomm

    def comm_row_col(self, process_rows):
        """Return the row and column communicators for this group and grid shape.

        This function will create and / or return the communicators needed for
        a given process grid.  The return value is a dictionary with the following
        keys:

            - "row":  The row communicator.
            - "col":  The column communicator.
            - "row_node":  The node-local communicator within the row communicator
            - "col_node":  The node-local communicator within the col communicator
            - "row_rank_node":  The communicator across nodes among processes with
                the same node-rank within the row communicator.
            - "col_rank_node":  The communicator across nodes among processes with
                the same node-rank within the column communicator.

        Args:
            process_rows (int):  The number of rows in the process grid.

        Returns:
            (dict):  The communicators for this grid shape.

        """
        log = Logger.get()
        if process_rows not in self._rowcolcomm:
            # Does not exist yet, create it.
            if self._gcomm is None:
                if process_rows != 1:
                    msg = "MPI not in use, so only process_rows == 1 is allowed"
                    log.error(msg)
                    raise ValueError(msg)
                self._rowcolcomm[process_rows] = {
                    "row": None,
                    "col": None,
                    "row_node": None,
                    "row_rank_node": None,
                    "col_node": None,
                    "col_rank_node": None,
                    "cleanup": False,
                }
            else:
                if self._gcomm.size % process_rows != 0:
                    msg = f"The number of process_rows ({process_rows}) "
                    msg += f"does not divide evenly into the communicator "
                    msg += f"size ({self._gcomm.size})"
                    log.error(msg)
                    raise RuntimeError(msg)
                process_cols = self._gcomm.size // process_rows

                if process_rows == 1:
                    # We can re-use the group communicators as the grid row
                    # communicators
                    comm_row = self._gcomm
                    comm_row_node = self._gnodecomm
                    comm_row_rank_node = self._gnoderankcomm
                    comm_col = None
                    comm_col_node = None
                    comm_col_rank_node = None
                    cleanup = False
                elif process_cols == 1:
                    # We can re-use the group communicators as the grid column
                    # communicators
                    comm_col = self._gcomm
                    comm_col_node = self._gnodecomm
                    comm_col_rank_node = self._gnoderankcomm
                    comm_row = None
                    comm_row_node = None
                    comm_row_rank_node = None
                    cleanup = False
                else:
                    # We have to create new split communicators
                    col_rank = self._gcomm.rank // process_cols
                    row_rank = self._gcomm.rank % process_cols
                    comm_row = self._gcomm.Split(col_rank, row_rank)
                    comm_col = self._gcomm.Split(row_rank, col_rank)

                    # Node and node-rank comms for each row and col.
                    comm_row_node = comm_row.Split_type(MPI.COMM_TYPE_SHARED, 0)
                    row_nodeprocs = comm_row_node.size
                    row_node = comm_row.rank // row_nodeprocs
                    comm_row_rank_node = comm_row.Split(comm_row_node.rank, row_node)

                    comm_col_node = comm_col.Split_type(MPI.COMM_TYPE_SHARED, 0)
                    col_nodeprocs = comm_col_node.size
                    col_node = comm_col.rank // col_nodeprocs
                    comm_col_rank_node = comm_col.Split(comm_col_node.rank, col_node)
                    cleanup = True

                msg = f"Comm on world rank {self._wrank} create grid with {process_rows} rows:\n"
                msg += f"  row comm = {comm_row}\n"
                msg += f"  node comm = {comm_row_node}\n"
                msg += f"  node rank comm = {comm_row_rank_node}\n"
                msg += f"  col comm = {comm_col}\n"
                msg += f"  node comm = {comm_col_node}\n"
                msg += f"  node rank comm = {comm_col_rank_node}"
                log.verbose(msg)

                self._rowcolcomm[process_rows] = {
                    "row": comm_row,
                    "row_node": comm_row_node,
                    "row_rank_node": comm_row_rank_node,
                    "col": comm_col,
                    "col_node": comm_col_node,
                    "col_rank_node": comm_col_rank_node,
                    "cleanup": cleanup,
                }
        return self._rowcolcomm[process_rows]

    def __repr__(self):
        lines = [
            "  World MPI communicator = {}".format(self._wcomm),
            "  World MPI size = {}".format(self._wsize),
            "  World MPI rank = {}".format(self._wrank),
            "  Group MPI communicator = {}".format(self._gcomm),
            "  Group MPI size = {}".format(self._gsize),
            "  Group MPI rank = {}".format(self._grank),
            "  Rank MPI communicator = {}".format(self._rcomm),
        ]
        return "<toast.Comm\n{}\n>".format("\n".join(lines))

comm_group property

The communicator shared by processes within this group.

comm_group_node property

The communicator shared by group processes on the same node.

comm_group_node_rank property

The communicator shared by group processes with the same node rank on nodes within the group.

comm_group_rank property

The communicator shared by processes with the same group_rank.

comm_world property

The world communicator.

comm_world_node property

The communicator shared by world processes on the same node.

comm_world_node_rank property

The communicator shared by world processes with the same node rank across all nodes.

group property

The group containing this process.

group_rank property

The rank of this process in the group communicator.

group_size property

The size of the group containing this process.

ngroups property

The number of process groups.

world_rank property

The rank of this process in the world communicator.

world_size property

The size of the world communicator.

comm_row_col(process_rows)

Return the row and column communicators for this group and grid shape.

This function will create and / or return the communicators needed for a given process grid. The return value is a dictionary with the following keys:

- "row":  The row communicator.
- "col":  The column communicator.
- "row_node":  The node-local communicator within the row communicator
- "col_node":  The node-local communicator within the col communicator
- "row_rank_node":  The communicator across nodes among processes with
    the same node-rank within the row communicator.
- "col_rank_node":  The communicator across nodes among processes with
    the same node-rank within the column communicator.

Parameters:

process_rows (int): The number of rows in the process grid. Required.

Returns:

(dict): The communicators for this grid shape.
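
A hedged sketch (assuming toast is installed and an MPI run with a group size of 8; without MPI only process_rows == 1 is allowed):

import toast

comm = toast.Comm(groupsize=8)

# Request a 2 x 4 process grid within the group.
grid = comm.comm_row_col(process_rows=2)
comm_row = grid["row"]  # shared by the 4 processes in this grid row
comm_col = grid["col"]  # shared by the 2 processes in this grid column

# The communicators are cached, so calling again with the same grid
# shape returns the same dictionary.
assert comm.comm_row_col(2) is grid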

Source code in toast/mpi.py
def comm_row_col(self, process_rows):
    """Return the row and column communicators for this group and grid shape.

    This function will create and / or return the communicators needed for
    a given process grid.  The return value is a dictionary with the following
    keys:

        - "row":  The row communicator.
        - "col":  The column communicator.
        - "row_node":  The node-local communicator within the row communicator
        - "col_node":  The node-local communicator within the col communicator
        - "row_rank_node":  The communicator across nodes among processes with
            the same node-rank within the row communicator.
        - "col_rank_node":  The communicator across nodes among processes with
            the same node-rank within the column communicator.

    Args:
        process_rows (int):  The number of rows in the process grid.

    Returns:
        (dict):  The communicators for this grid shape.

    """
    log = Logger.get()
    if process_rows not in self._rowcolcomm:
        # Does not exist yet, create it.
        if self._gcomm is None:
            if process_rows != 1:
                msg = "MPI not in use, so only process_rows == 1 is allowed"
                log.error(msg)
                raise ValueError(msg)
            self._rowcolcomm[process_rows] = {
                "row": None,
                "col": None,
                "row_node": None,
                "row_rank_node": None,
                "col_node": None,
                "col_rank_node": None,
                "cleanup": False,
            }
        else:
            if self._gcomm.size % process_rows != 0:
                msg = f"The number of process_rows ({process_rows}) "
                msg += f"does not divide evenly into the communicator "
                msg += f"size ({self._gcomm.size})"
                log.error(msg)
                raise RuntimeError(msg)
            process_cols = self._gcomm.size // process_rows

            if process_rows == 1:
                # We can re-use the group communicators as the grid row
                # communicators
                comm_row = self._gcomm
                comm_row_node = self._gnodecomm
                comm_row_rank_node = self._gnoderankcomm
                comm_col = None
                comm_col_node = None
                comm_col_rank_node = None
                cleanup = False
            elif process_cols == 1:
                # We can re-use the group communicators as the grid column
                # communicators
                comm_col = self._gcomm
                comm_col_node = self._gnodecomm
                comm_col_rank_node = self._gnoderankcomm
                comm_row = None
                comm_row_node = None
                comm_row_rank_node = None
                cleanup = False
            else:
                # We have to create new split communicators
                col_rank = self._gcomm.rank // process_cols
                row_rank = self._gcomm.rank % process_cols
                comm_row = self._gcomm.Split(col_rank, row_rank)
                comm_col = self._gcomm.Split(row_rank, col_rank)

                # Node and node-rank comms for each row and col.
                comm_row_node = comm_row.Split_type(MPI.COMM_TYPE_SHARED, 0)
                row_nodeprocs = comm_row_node.size
                row_node = comm_row.rank // row_nodeprocs
                comm_row_rank_node = comm_row.Split(comm_row_node.rank, row_node)

                comm_col_node = comm_col.Split_type(MPI.COMM_TYPE_SHARED, 0)
                col_nodeprocs = comm_col_node.size
                col_node = comm_col.rank // col_nodeprocs
                comm_col_rank_node = comm_col.Split(comm_col_node.rank, col_node)
                cleanup = True

            msg = f"Comm on world rank {self._wrank} create grid with {process_rows} rows:\n"
            msg += f"  row comm = {comm_row}\n"
            msg += f"  node comm = {comm_row_node}\n"
            msg += f"  node rank comm = {comm_row_rank_node}\n"
            msg += f"  col comm = {comm_col}\n"
            msg += f"  node comm = {comm_col_node}\n"
            msg += f"  node rank comm = {comm_col_rank_node}"
            log.verbose(msg)

            self._rowcolcomm[process_rows] = {
                "row": comm_row,
                "row_node": comm_row_node,
                "row_rank_node": comm_row_rank_node,
                "col": comm_col,
                "col_node": comm_col_node,
                "col_rank_node": comm_col_rank_node,
                "cleanup": cleanup,
            }
    return self._rowcolcomm[process_rows]

Data

Bases: MutableMapping

Class which represents distributed data

A Data object contains a list of observations assigned to each process group in the Comm.

Parameters:

comm (toast.Comm): The toast Comm class for distributing the data. Default: Comm().
view (bool): If True, do not explicitly clear observation data on deletion. Default: False.
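
A minimal sketch (assuming toast is installed; the metadata key is hypothetical):

import toast

comm = toast.Comm()
data = toast.Data(comm=comm)

# Global metadata uses the MutableMapping interface.
data["telescope_name"] = "example"

# Observations are appended to the obs list, normally by data loading
# or simulation operators; it starts out empty.
print(len(data.obs), data["telescope_name"])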
Source code in toast/data.py
class Data(MutableMapping):
    """Class which represents distributed data

    A Data object contains a list of observations assigned to
    each process group in the Comm.

    Args:
        comm (:class:`toast.Comm`):  The toast Comm class for distributing the data.
        view (bool):  If True, do not explicitly clear observation data on deletion.

    """

    def __init__(self, comm=Comm(), view=False):
        self._comm = comm
        self._view = view
        self.obs = []
        """The list of observations.
        """
        self._internal = dict()

    def __getitem__(self, key):
        return self._internal[key]

    def __delitem__(self, key):
        del self._internal[key]

    def __setitem__(self, key, value):
        self._internal[key] = value

    def __iter__(self):
        return iter(self._internal)

    def __len__(self):
        return len(self._internal)

    def __repr__(self):
        val = "<Data with {} Observations:\n".format(len(self.obs))
        for ob in self.obs:
            val += "{}\n".format(ob)
        val += "Metadata:\n"
        val += "{}".format(self._internal)
        val += "\n>"
        return val

    def __del__(self):
        if hasattr(self, "obs"):
            self.clear()

    @property
    def comm(self):
        """The toast.Comm over which the data is distributed."""
        return self._comm

    def clear(self):
        """Clear the list of observations."""
        if not self._view:
            self.accel_clear()
            for ob in self.obs:
                ob.clear()
        self.obs.clear()
        if not self._view:
            self._internal.clear()
        return

    def all_local_detectors(self, selection=None, flagmask=0):
        """Get the superset of local detectors in all observations.

        This builds up the result from calling `select_local_detectors()` on
        all observations.

        Args:
            selection (list):  Only consider this list of detectors
            flagmask (int):  Apply this det_mask to the detector selection in
                each observation.

        Returns:
            (list):  The list of all local detectors across all observations.

        """
        all_dets = OrderedDict()
        for ob in self.obs:
            dets = ob.select_local_detectors(selection=selection, flagmask=flagmask)
            for d in dets:
                if d not in all_dets:
                    all_dets[d] = None
        return list(all_dets.keys())

    def detector_units(self, det_data):
        """Get the detector data units for a given field.

        This verifies that the specified detector data field has the same
        units in all observations where it occurs, and returns that unit.

        Args:
            det_data (str):  The detector data field.

        Returns:
            (Unit):  The unit used across all observations.

        """
        log = Logger.get()
        local_units = None
        for ob in self.obs:
            if det_data not in ob.detdata:
                continue
            ob_units = ob.detdata[det_data].units
            if local_units is None:
                local_units = ob_units
            else:
                if ob_units != local_units:
                    msg = f"obs {ob.name} detdata {det_data} units "
                    msg += f"{ob_units} != {local_units}"
                    log.error(msg)
                    raise RuntimeError(msg)
        if self.comm.comm_world is None:
            det_units = local_units
        else:
            det_units = self.comm.comm_world.gather(local_units, root=0)
            if self.comm.world_rank == 0:
                for dtu in det_units:
                    if dtu != local_units:
                        msg = f"observations have different units "
                        msg += f"{dtu} != {local_units}"
                        log.error(msg)
                        raise RuntimeError(msg)
            # We know that every process is the same now
            det_units = local_units
        return det_units

    def info(self, handle=None):
        """Print information about the distributed data.

        Information is written to the specified file handle.  Only the rank 0
        process writes.

        Args:
            handle (descriptor):  file descriptor supporting the write()
                method.  If None, use print().

        Returns:
            None

        """
        # Each process group gathers their output

        groupstr = ""
        procstr = ""

        gcomm = self._comm.comm_group
        wcomm = self._comm.comm_world
        rcomm = self._comm.comm_group_rank

        if wcomm is None:
            msg = "Data distributed over a single process (no MPI)"
            if handle is None:
                print(msg, flush=True)
            else:
                handle.write(msg)
        else:
            if wcomm.rank == 0:
                msg = "Data distributed over {} processes in {} groups\n".format(
                    self._comm.world_size, self._comm.ngroups
                )
                if handle is None:
                    print(msg, flush=True)
                else:
                    handle.write(msg)

        def _get_optional(k, dt):
            if k in dt:
                return dt[k]
            else:
                return None

        for ob in self.obs:
            if self._comm.group_rank == 0:
                groupstr = "{}{}\n".format(groupstr, str(ob))

        # The world rank 0 process collects output from all groups and
        # writes to the handle

        recvgrp = ""
        if self._comm.world_rank == 0:
            if handle is None:
                print(groupstr, flush=True)
            else:
                handle.write(groupstr)
        if wcomm is not None:
            for g in range(1, self._comm.ngroups):
                if wcomm.rank == 0:
                    recvgrp = rcomm.recv(source=g, tag=g)
                    if handle is None:
                        print(recvgrp, flush=True)
                    else:
                        handle.write(recvgrp)
                elif g == self._comm.group:
                    if gcomm.rank == 0:
                        rcomm.send(groupstr, dest=0, tag=g)
                wcomm.barrier()
        return

    def split(
        self,
        obs_index=False,
        obs_name=False,
        obs_uid=False,
        obs_session_name=False,
        obs_key=None,
        require_full=False,
    ):
        """Split the Data object.

        Create new Data objects that have views into unique subsets of the observations
        (the observations are not copied).  Only one "criteria" may be used to perform
        this splitting operation.  The observations may be split by index in the
        original list, by name, by UID, by session, or by the value of a specified key.

        The new Data objects are returned in a dictionary whose keys are the value of
        the selection criteria (index, name, uid, or value of the key).  Any observation
        that cannot be placed (because it is missing a name, uid or key) will be ignored
        and not added to any of the returned Data objects.  If the `require_full`
        parameter is set to True, such situations will raise an exception.

        Args:
            obs_index (bool):  If True, split by index in original list of observations.
            obs_name (bool):  If True, split by observation name.
            obs_uid (bool):  If True, split by observation UID.
            obs_session_name (bool):  If True, split by session name.
            obs_key (str):  Split by values of this observation key.
            require_full (bool):  If True, every observation must be placed in the
                output.

        Returns:
            (OrderedDict):  The dictionary of new Data objects.

        """
        log = Logger.get()
        check = (
            int(obs_index)
            + int(obs_name)
            + int(obs_uid)
            + int(obs_session_name)
            + int(obs_key is not None)
        )
        if check == 0 or check > 1:
            raise RuntimeError("You must specify exactly one split criteria")

        datasplit = OrderedDict()

        group_rank = self.comm.group_rank
        group_comm = self.comm.comm_group

        if obs_index:
            # Splitting by (unique) index
            for iob, ob in enumerate(self.obs):
                newdat = Data(comm=self._comm, view=True)
                newdat._internal = self._internal
                newdat.obs.append(ob)
                datasplit[iob] = newdat
        elif obs_name:
            # Splitting by (unique) name
            for iob, ob in enumerate(self.obs):
                if ob.name is None:
                    if require_full:
                        msg = f"require_full is True, but observation {iob} has no name"
                        log.error_rank(msg, comm=group_comm)
                        raise RuntimeError(msg)
                else:
                    newdat = Data(comm=self._comm, view=True)
                    newdat._internal = self._internal
                    newdat.obs.append(ob)
                    datasplit[ob.name] = newdat
        elif obs_uid:
            # Splitting by UID
            for iob, ob in enumerate(self.obs):
                if ob.uid is None:
                    if require_full:
                        msg = f"require_full is True, but observation {iob} has no UID"
                        log.error_rank(msg, comm=group_comm)
                        raise RuntimeError(msg)
                else:
                    newdat = Data(comm=self._comm, view=True)
                    newdat._internal = self._internal
                    newdat.obs.append(ob)
                    datasplit[ob.uid] = newdat
        elif obs_session_name:
            # Splitting by (non-unique) session name
            for iob, ob in enumerate(self.obs):
                if ob.session is None or ob.session.name is None:
                    if require_full:
                        msg = f"require_full is True, but observation {iob} has no session name"
                        log.error_rank(msg, comm=group_comm)
                        raise RuntimeError(msg)
                else:
                    sname = ob.session.name
                    if sname not in datasplit:
                        newdat = Data(comm=self._comm, view=True)
                        newdat._internal = self._internal
                        datasplit[sname] = newdat
                    datasplit[sname].obs.append(ob)
        elif obs_key is not None:
            # Splitting by arbitrary key.  Unlike name / uid which are built in to the
            # observation class, arbitrary keys might be modified in different ways
            # across all processes in a group.  For this reason, we do an additional
            # check for consistent values across the process group.
            for iob, ob in enumerate(self.obs):
                if obs_key not in ob:
                    if require_full:
                        msg = f"require_full is True, but observation {iob} has no key '{obs_key}'"
                        log.error_rank(msg, comm=group_comm)
                        raise RuntimeError(msg)
                else:
                    obs_val = ob[obs_key]
                    # Get the values from all processes in the group
                    group_vals = None
                    if group_comm is None:
                        group_vals = [obs_val]
                    else:
                        group_vals = group_comm.allgather(obs_val)
                    if group_vals.count(group_vals[0]) != len(group_vals):
                        msg = f"observation {iob}, key '{obs_key}' has inconsistent values across processes"
                        log.error_rank(msg, comm=group_comm)
                        raise RuntimeError(msg)
                    if obs_val not in datasplit:
                        newdat = Data(comm=self._comm, view=True)
                        newdat._internal = self._internal
                        datasplit[obs_val] = newdat
                    datasplit[obs_val].obs.append(ob)
        return datasplit

    def select(
        self,
        obs_index=None,
        obs_name=None,
        obs_uid=None,
        obs_session_name=None,
        obs_key=None,
        obs_val=None,
    ):
        """Create a new Data object with a subset of observations.

        The returned Data object just has a view of the original observations (they
        are not copied).

        The list of observations in the new Data object is a logical OR of the
        criteria passed in:
            * Index location in the original list of observations
            * Name of the observation
            * UID of the observation
            * Session of the observation
            * Existence of the specified dictionary key
            * Required value of the specified dictionary key

        Args:
            obs_index (int):  Observation location in the original list.
            obs_name (str):  The observation name or a compiled regular expression
                object to use for matching.
            obs_uid (int):  The observation UID to select.
            obs_session_name (str):  The name of the session.
            obs_key (str):  The observation dictionary key to examine.
            obs_val (str):  The required value of the observation dictionary key or a
                compiled regular expression object to use for matching.

        Returns:
            (Data):  A new Data object with references to the original metadata and
                a subset of observations.

        """
        log = Logger.get()
        if obs_val is not None and obs_key is None:
            raise RuntimeError("If you specify obs_val, you must also specify obs_key")

        group_rank = self.comm.group_rank
        group_comm = self.comm.comm_group

        new_data = Data(comm=self._comm, view=True)

        # Use a reference to the original metadata
        new_data._internal = self._internal

        for iob, ob in enumerate(self.obs):
            if obs_index is not None and obs_index == iob:
                new_data.obs.append(ob)
                continue
            if obs_name is not None and ob.name is not None:
                if isinstance(obs_name, re.Pattern):
                    if obs_name.match(ob.name) is not None:
                        new_data.obs.append(ob)
                        continue
                elif obs_name == ob.name:
                    new_data.obs.append(ob)
                    continue
            if obs_uid is not None and ob.uid is not None and obs_uid == ob.uid:
                new_data.obs.append(ob)
                continue
            if (
                obs_session_name is not None
                and ob.session is not None
                and obs_session_name == ob.session.name
            ):
                new_data.obs.append(ob)
                continue
            if obs_key is not None and obs_key in ob:
                # Get the values from all processes in the group and check
                # for consistency.
                group_vals = None
                if group_comm is None:
                    group_vals = [ob[obs_key]]
                else:
                    group_vals = group_comm.allgather(ob[obs_key])
                if group_vals.count(group_vals[0]) != len(group_vals):
                    msg = f"observation {iob}, key '{obs_key}' has inconsistent values across processes"
                    log.error_rank(msg, comm=group_comm)
                    raise RuntimeError(msg)

                if obs_val is None:
                    # We have the key, and are accepting any value
                    new_data.obs.append(ob)
                    continue
                elif isinstance(obs_val, re.Pattern):
                    if obs_val.match(ob[obs_key]) is not None:
                        # Matches our regex
                        new_data.obs.append(ob)
                        continue
                elif obs_val == ob[obs_key]:
                    new_data.obs.append(ob)
                    continue
        return new_data

    # Accelerator use

    def accel_create(self, names):
        """Create a set of data objects on the device.

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.  If the data already exists on the
        device then no action is taken.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        log = Logger.get()
        if not accel_enabled():
            log.verbose(f"accel_enabled is False, canceling accel_create.")
            return

        for ob in self.obs:
            for objname, objmgr in [
                ("detdata", ob.detdata),
                ("shared", ob.shared),
                ("intervals", ob.intervals),
            ]:
                for key in names[objname]:
                    if key not in objmgr:
                        msg = f"ob {ob.name} {objname} accel_create '{key}' "
                        msg += f"not present, ignoring"
                        log.verbose(msg)
                        continue
                    if objmgr.accel_exists(key):
                        msg = f"ob {ob.name} {objname}: accel_create '{key}'"
                        msg += f" already exists"
                        log.verbose(msg)
                    else:
                        log.verbose(f"ob {ob.name} {objname}: accel_create '{key}'")
                        objmgr.accel_create(key)

        for key in names["global"]:
            val = self._internal.get(key, None)
            if isinstance(val, AcceleratorObject):
                if not val.accel_exists():
                    log.verbose(f"Data accel_create: '{key}'")
                    val.accel_create(key)
                else:
                    log.verbose(f"Data accel_create: '{key}' already on device")
            else:
                log.verbose(
                    f"Data accel_create: '{key}' ({type(val)}) is not an AcceleratorObject"
                )

    def accel_update_device(self, names):
        """Copy a set of data objects to the device.

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        if not accel_enabled():
            return
        log = Logger.get()

        for ob in self.obs:
            for objname, objmgr in [
                ("detdata", ob.detdata),
                ("shared", ob.shared),
                ("intervals", ob.intervals),
            ]:
                for key in names[objname]:
                    if key not in objmgr:
                        msg = f"ob {ob.name} {objname} update_device key '{key}'"
                        msg += f" not present, ignoring"
                        log.verbose(msg)
                        continue
                    if not objmgr.accel_exists(key):
                        msg = f"ob {ob.name} {objname} update_device key '{key}'"
                        msg += f" does not exist on accelerator"
                        log.error(msg)
                        raise RuntimeError(msg)
                    if objmgr.accel_in_use(key):
                        msg = f"ob {ob.name} {objname}: skip update_device '{key}'"
                        msg += f" already in use"
                        log.verbose(msg)
                    else:
                        log.verbose(f"ob {ob.name} {objname}: update_device '{key}'")
                        objmgr.accel_update_device(key)

        for key in names["global"]:
            val = self._internal.get(key, None)
            if isinstance(val, AcceleratorObject):
                if val.accel_in_use():
                    msg = f"Skipping update_device for '{key}', "
                    msg += "device data in use"
                    log.verbose(msg)
                else:
                    log.verbose(f"Calling Data update_device for '{key}'")
                    val.accel_update_device()
            else:
                msg = f"Data accel_update_device: '{key}' ({type(val)}) "
                msg += "is not an AcceleratorObject"
                log.verbose(msg)

    def accel_update_host(self, names):
        """Copy a set of data objects to the host.

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        if not accel_enabled():
            return
        log = Logger.get()

        for ob in self.obs:
            for objname, objmgr in [
                ("detdata", ob.detdata),
                ("shared", ob.shared),
                ("intervals", ob.intervals),
            ]:
                for key in names[objname]:
                    if key not in objmgr:
                        msg = f"ob {ob.name} {objname} update_host key '{key}'"
                        msg += f" not present, ignoring"
                        log.verbose(msg)
                        continue
                    if not objmgr.accel_exists(key):
                        msg = f"ob {ob.name} {objname} update_host key '{key}'"
                        msg += f" does not exist on accelerator, ignoring"
                        log.verbose(msg)
                        continue
                    if not objmgr.accel_in_use(key):
                        msg = f"ob {ob.name} {objname}: skip update_host, '{key}'"
                        msg += f" already on host"
                        log.verbose(msg)
                    else:
                        log.verbose(f"ob {ob.name} {objname}: update_host '{key}'")
                        objmgr.accel_update_host(key)

        for key in names["global"]:
            val = self._internal.get(key, None)
            if isinstance(val, AcceleratorObject):
                if not val.accel_in_use():
                    msg = f"Skipping update_host for '{key}', "
                    msg += "host data already in use"
                    log.verbose(msg)
                else:
                    log.verbose(f"Calling Data update_host for '{key}'")
                    val.accel_update_host()
            else:
                msg = f"Data accel_update_host: '{key}' ({type(val)}) "
                msg += "is not an AcceleratorObject"
                log.verbose(msg)

    def accel_delete(self, names):
        """Delete a specific set of device objects

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        if not accel_enabled():
            return
        log = Logger.get()

        for ob in self.obs:
            for objname, objmgr in [
                ("detdata", ob.detdata),
                ("shared", ob.shared),
                ("intervals", ob.intervals),
            ]:
                for key in names[objname]:
                    if key not in objmgr:
                        msg = f"ob {ob.name} {objname} accel_delete key '{key}'"
                        msg += f" not present, ignoring"
                        log.verbose(msg)
                        continue
                    if objmgr.accel_exists(key):
                        log.verbose(f"ob {ob.name} {objname}: accel_delete '{key}'")
                        objmgr.accel_delete(key)
                    else:
                        msg = f"ob {ob.name} {objname}: accel_delete '{key}'"
                        msg += f" not present on device"
                        log.verbose(msg)

        for key in names["global"]:
            val = self._internal.get(key, None)
            if isinstance(val, AcceleratorObject):
                if val.accel_exists():
                    log.verbose(f"Calling Data accel_delete for '{key}'")
                    val.accel_delete()
            else:
                msg = f"Data accel_delete: '{key}' ({type(val)}) "
                msg += "is not an AcceleratorObject"
                log.verbose(msg)

    def accel_clear(self):
        """Delete all accelerator data."""
        if not accel_enabled():
            return
        log = Logger.get()

        for ob in self.obs:
            ob.accel_clear()

        for key, val in self._internal.items():
            if isinstance(val, AcceleratorObject):
                if val.accel_exists():
                    val.accel_delete()
            else:
                msg = f"Data accel_clear: '{key}' ({type(val)}) "
                msg += "is not an AcceleratorObject"
                log.verbose(msg)

comm property

The toast.Comm over which the data is distributed.

obs = [] instance-attribute

The list of observations.

accel_clear()

Delete all accelerator data.

Source code in toast/data.py
def accel_clear(self):
    """Delete all accelerator data."""
    if not accel_enabled():
        return
    log = Logger.get()

    for ob in self.obs:
        ob.accel_clear()

    for key, val in self._internal.items():
        if isinstance(val, AcceleratorObject):
            if val.accel_exists():
                val.accel_delete()
        else:
            msg = f"Data accel_clear: '{key}' ({type(val)}) "
            msg += "is not an AcceleratorObject"
            log.verbose(msg)

accel_create(names)

Create a set of data objects on the device.

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods. If the data already exists on the device then no action is taken.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/data.py
def accel_create(self, names):
    """Create a set of data objects on the device.

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.  If the data already exists on the
    device then no action is taken.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    log = Logger.get()
    if not accel_enabled():
        log.verbose(f"accel_enabled is False, canceling accel_create.")
        return

    for ob in self.obs:
        for objname, objmgr in [
            ("detdata", ob.detdata),
            ("shared", ob.shared),
            ("intervals", ob.intervals),
        ]:
            for key in names[objname]:
                if key not in objmgr:
                    msg = f"ob {ob.name} {objname} accel_create '{key}' "
                    msg += f"not present, ignoring"
                    log.verbose(msg)
                    continue
                if objmgr.accel_exists(key):
                    msg = f"ob {ob.name} {objname}: accel_create '{key}'"
                    msg += f" already exists"
                    log.verbose(msg)
                else:
                    log.verbose(f"ob {ob.name} {objname}: accel_create '{key}'")
                    objmgr.accel_create(key)

    for key in names["global"]:
        val = self._internal.get(key, None)
        if isinstance(val, AcceleratorObject):
            if not val.accel_exists():
                log.verbose(f"Data accel_create: '{key}'")
                val.accel_create(key)
            else:
                log.verbose(f"Data accel_create: '{key}' already on device")
        else:
            log.verbose(
                f"Data accel_create: '{key}' ({type(val)}) is not an AcceleratorObject"
            )
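
A minimal sketch of the expected dictionary format may help here. The field names below are hypothetical, and `data` is assumed to be an existing Data instance:

# One list of field names per category; this is the same layout returned
# by the Operator provides() / requires() methods.
names = {
    "detdata": ["signal"],    # per-detector timestream fields
    "shared": ["boresight"],  # fields shared across the process group
    "intervals": [],          # no interval lists in this example
    "global": [],             # no Data-level (global) objects
}

# Allocate the corresponding objects on the accelerator.  This is a no-op
# if accelerator support is disabled at runtime.
data.accel_create(names)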

accel_delete(names)

Delete a specific set of device objects.

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/data.py
def accel_delete(self, names):
    """Delete a specific set of device objects

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    if not accel_enabled():
        return
    log = Logger.get()

    for ob in self.obs:
        for objname, objmgr in [
            ("detdata", ob.detdata),
            ("shared", ob.shared),
            ("intervals", ob.intervals),
        ]:
            for key in names[objname]:
                if key not in objmgr:
                    msg = f"ob {ob.name} {objname} accel_delete key '{key}'"
                    msg += f" not present, ignoring"
                    log.verbose(msg)
                    continue
                if objmgr.accel_exists(key):
                    log.verbose(f"ob {ob.name} {objname}: accel_delete '{key}'")
                    objmgr.accel_delete(key)
                else:
                    msg = f"ob {ob.name} {objname}: accel_delete '{key}'"
                    msg += f" not present on device"
                    log.verbose(msg)

    for key in names["global"]:
        val = self._internal.get(key, None)
        if isinstance(val, AcceleratorObject):
            if val.accel_exists():
                log.verbose(f"Calling Data accel_delete for '{key}'")
                val.accel_delete()
        else:
            msg = f"Data accel_delete: '{key}' ({type(val)}) "
            msg += "is not an AcceleratorObject"
            log.verbose(msg)

accel_update_device(names)

Copy a set of data objects to the device.

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/data.py
def accel_update_device(self, names):
    """Copy a set of data objects to the device.

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    if not accel_enabled():
        return
    log = Logger.get()

    for ob in self.obs:
        for objname, objmgr in [
            ("detdata", ob.detdata),
            ("shared", ob.shared),
            ("intervals", ob.intervals),
        ]:
            for key in names[objname]:
                if key not in objmgr:
                    msg = f"ob {ob.name} {objname} update_device key '{key}'"
                    msg += f" not present, ignoring"
                    log.verbose(msg)
                    continue
                if not objmgr.accel_exists(key):
                    msg = f"ob {ob.name} {objname} update_device key '{key}'"
                    msg += f" does not exist on accelerator"
                    log.error(msg)
                    raise RuntimeError(msg)
                if objmgr.accel_in_use(key):
                    msg = f"ob {ob.name} {objname}: skip update_device '{key}'"
                    msg += f" already in use"
                    log.verbose(msg)
                else:
                    log.verbose(f"ob {ob.name} {objname}: update_device '{key}'")
                    objmgr.accel_update_device(key)

    for key in names["global"]:
        val = self._internal.get(key, None)
        if isinstance(val, AcceleratorObject):
            if val.accel_in_use():
                msg = f"Skipping update_device for '{key}', "
                msg += "device data in use"
                log.verbose(msg)
            else:
                log.verbose(f"Calling Data update_device for '{key}'")
                val.accel_update_device()
        else:
            msg = f"Data accel_update_device: '{key}' ({type(val)}) "
            msg += "is not an AcceleratorObject"
            log.verbose(msg)

accel_update_host(names)

Copy a set of data objects to the host.

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/data.py
def accel_update_host(self, names):
    """Copy a set of data objects to the host.

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    if not accel_enabled():
        return
    log = Logger.get()

    for ob in self.obs:
        for objname, objmgr in [
            ("detdata", ob.detdata),
            ("shared", ob.shared),
            ("intervals", ob.intervals),
        ]:
            for key in names[objname]:
                if key not in objmgr:
                    msg = f"ob {ob.name} {objname} update_host key '{key}'"
                    msg += f" not present, ignoring"
                    log.verbose(msg)
                    continue
                if not objmgr.accel_exists(key):
                    msg = f"ob {ob.name} {objname} update_host key '{key}'"
                    msg += f" does not exist on accelerator, ignoring"
                    log.verbose(msg)
                    continue
                if not objmgr.accel_in_use(key):
                    msg = f"ob {ob.name} {objname}: skip update_host, '{key}'"
                    msg += f" already on host"
                    log.verbose(msg)
                else:
                    log.verbose(f"ob {ob.name} {objname}: update_host '{key}'")
                    objmgr.accel_update_host(key)

    for key in names["global"]:
        val = self._internal.get(key, None)
        if isinstance(val, AcceleratorObject):
            if not val.accel_in_use():
                msg = f"Skipping update_host for '{key}', "
                msg += "host data already in use"
                log.verbose(msg)
            else:
                log.verbose(f"Calling Data update_host for '{key}'")
                val.accel_update_host()
        else:
            msg = f"Data accel_update_host: '{key}' ({type(val)}) "
            msg += "is not an AcceleratorObject"
            log.verbose(msg)
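
Taken together, accel_create(), accel_update_device(), accel_update_host() and accel_delete() support a simple round trip. A sketch under the same assumptions as above (hypothetical field name, `data` an existing Data instance):

names = {"detdata": ["signal"], "shared": [], "intervals": [], "global": []}

data.accel_create(names)         # allocate device copies (no-op if present)
data.accel_update_device(names)  # copy host data to the device
# ... run operators with accelerator support ...
data.accel_update_host(names)    # copy device results back to the host
data.accel_delete(names)         # free the device copies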

all_local_detectors(selection=None, flagmask=0)

Get the superset of local detectors in all observations.

This builds up the result from calling select_local_detectors() on all observations.

Parameters:

Name Type Description Default
selection list

Only consider this list of detectors

None
flagmask int

Apply this det_mask to the detector selection in each observation.

0

Returns:

Type Description
list

The list of all local detectors across all observations.

Source code in toast/data.py
def all_local_detectors(self, selection=None, flagmask=0):
    """Get the superset of local detectors in all observations.

    This builds up the result from calling `select_local_detectors()` on
    all observations.

    Args:
        selection (list):  Only consider this list of detectors
        flagmask (int):  Apply this det_mask to the detector selection in
            each observation.

    Returns:
        (list):  The list of all local detectors across all observations.

    """
    all_dets = OrderedDict()
    for ob in self.obs:
        dets = ob.select_local_detectors(selection=selection, flagmask=flagmask)
        for d in dets:
            if d not in all_dets:
                all_dets[d] = None
    return list(all_dets.keys())
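
A small usage sketch, assuming `data` is a populated Data instance (the detector names are hypothetical):

# All detectors that appear locally in any observation, with no
# flag-based cuts applied.
dets = data.all_local_detectors()

# The same, restricted to a subset of detector names.
subset = data.all_local_detectors(selection=["D0A", "D0B"])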

clear()

Clear the list of observations.

Source code in toast/data.py
def clear(self):
    """Clear the list of observations."""
    if not self._view:
        self.accel_clear()
        for ob in self.obs:
            ob.clear()
    self.obs.clear()
    if not self._view:
        self._internal.clear()
    return

detector_units(det_data)

Get the detector data units for a given field.

This verifies that the specified detector data field has the same units in all observations where it occurs, and returns that unit.

Parameters:

Name Type Description Default
det_data str

The detector data field.

required

Returns:

Type Description
Unit

The unit used across all observations.

Source code in toast/data.py
def detector_units(self, det_data):
    """Get the detector data units for a given field.

    This verifies that the specified detector data field has the same
    units in all observations where it occurs, and returns that unit.

    Args:
        det_data (str):  The detector data field.

    Returns:
        (Unit):  The unit used across all observations.

    """
    log = Logger.get()
    local_units = None
    for ob in self.obs:
        if det_data not in ob.detdata:
            continue
        ob_units = ob.detdata[det_data].units
        if local_units is None:
            local_units = ob_units
        else:
            if ob_units != local_units:
                msg = f"obs {ob.name} detdata {det_data} units "
                msg += f"{ob_units} != {local_units}"
                log.error(msg)
                raise RuntimeError(msg)
    if self.comm.comm_world is None:
        det_units = local_units
    else:
        det_units = self.comm.comm_world.gather(local_units, root=0)
        if self.comm.world_rank == 0:
            for dtu in det_units:
                if dtu != local_units:
                    msg = f"observations have different units "
                    msg += f"{dtu} != {local_units}"
                    log.error(msg)
                    raise RuntimeError(msg)
        # We know that every process is the same now
        det_units = local_units
    return det_units
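
A usage sketch, assuming `data` is a populated Data instance and "signal" is a hypothetical detector data field:

from astropy import units as u

# Raises RuntimeError if the field has inconsistent units anywhere.
units = data.detector_units("signal")
if units != u.K:
    print(f"'signal' is stored in {units}, not Kelvin")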

info(handle=None)

Print information about the distributed data.

Information is written to the specified file handle. Only the rank 0 process writes.

Parameters:

Name Type Description Default
handle descriptor

file descriptor supporting the write() method. If None, use print().

None

Returns:

Type Description

None

Source code in toast/data.py
def info(self, handle=None):
    """Print information about the distributed data.

    Information is written to the specified file handle.  Only the rank 0
    process writes.

    Args:
        handle (descriptor):  file descriptor supporting the write()
            method.  If None, use print().

    Returns:
        None

    """
    # Each process group gathers their output

    groupstr = ""
    procstr = ""

    gcomm = self._comm.comm_group
    wcomm = self._comm.comm_world
    rcomm = self._comm.comm_group_rank

    if wcomm is None:
        msg = "Data distributed over a single process (no MPI)"
        if handle is None:
            print(msg, flush=True)
        else:
            handle.write(msg)
    else:
        if wcomm.rank == 0:
            msg = "Data distributed over {} processes in {} groups\n".format(
                self._comm.world_size, self._comm.ngroups
            )
            if handle is None:
                print(msg, flush=True)
            else:
                handle.write(msg)

    def _get_optional(k, dt):
        if k in dt:
            return dt[k]
        else:
            return None

    for ob in self.obs:
        if self._comm.group_rank == 0:
            groupstr = "{}{}\n".format(groupstr, str(ob))

    # The world rank 0 process collects output from all groups and
    # writes to the handle

    recvgrp = ""
    if self._comm.world_rank == 0:
        if handle is None:
            print(groupstr, flush=True)
        else:
            handle.write(groupstr)
    if wcomm is not None:
        for g in range(1, self._comm.ngroups):
            if wcomm.rank == 0:
                recvgrp = rcomm.recv(source=g, tag=g)
                if handle is None:
                    print(recvgrp, flush=True)
                else:
                    handle.write(recvgrp)
            elif g == self._comm.group:
                if gcomm.rank == 0:
                    rcomm.send(groupstr, dest=0, tag=g)
            wcomm.barrier()
    return

select(obs_index=None, obs_name=None, obs_uid=None, obs_session_name=None, obs_key=None, obs_val=None)

Create a new Data object with a subset of observations.

The returned Data object just has a view of the original observations (they are not copied).

The list of observations in the new Data object is a logical OR of the criteria passed in:

* Index location in the original list of observations
* Name of the observation
* UID of the observation
* Session of the observation
* Existence of the specified dictionary key
* Required value of the specified dictionary key

Parameters:

Name Type Description Default
obs_index int

Observation location in the original list.

None
obs_name str

The observation name or a compiled regular expression object to use for matching.

None
obs_uid int

The observation UID to select.

None
obs_session_name str

The name of the session.

None
obs_key str

The observation dictionary key to examine.

None
obs_val str

The required value of the observation dictionary key or a compiled regular expression object to use for matching.

None

Returns:

Type Description
Data

A new Data object with references to the original metadata and a subset of observations.

Source code in toast/data.py
def select(
    self,
    obs_index=None,
    obs_name=None,
    obs_uid=None,
    obs_session_name=None,
    obs_key=None,
    obs_val=None,
):
    """Create a new Data object with a subset of observations.

    The returned Data object just has a view of the original observations (they
    are not copied).

    The list of observations in the new Data object is a logical OR of the
    criteria passed in:
        * Index location in the original list of observations
        * Name of the observation
        * UID of the observation
        * Session of the observation
        * Existence of the specified dictionary key
        * Required value of the specified dictionary key

    Args:
        obs_index (int):  Observation location in the original list.
        obs_name (str):  The observation name or a compiled regular expression
            object to use for matching.
        obs_uid (int):  The observation UID to select.
        obs_session_name (str):  The name of the session.
        obs_key (str):  The observation dictionary key to examine.
        obs_val (str):  The required value of the observation dictionary key or a
            compiled regular expression object to use for matching.

    Returns:
        (Data):  A new Data object with references to the original metadata and
            a subset of observations.

    """
    log = Logger.get()
    if obs_val is not None and obs_key is None:
        raise RuntimeError("If you specify obs_val, you must also specify obs_key")

    group_rank = self.comm.group_rank
    group_comm = self.comm.comm_group

    new_data = Data(comm=self._comm, view=True)

    # Use a reference to the original metadata
    new_data._internal = self._internal

    for iob, ob in enumerate(self.obs):
        if obs_index is not None and obs_index == iob:
            new_data.obs.append(ob)
            continue
        if obs_name is not None and ob.name is not None:
            if isinstance(obs_name, re.Pattern):
                if obs_name.match(ob.name) is not None:
                    new_data.obs.append(ob)
                    continue
            elif obs_name == ob.name:
                new_data.obs.append(ob)
                continue
        if obs_uid is not None and ob.uid is not None and obs_uid == ob.uid:
            new_data.obs.append(ob)
            continue
        if (
            obs_session_name is not None
            and ob.session is not None
            and obs_session_name == ob.session.name
        ):
            new_data.obs.append(ob)
            continue
        if obs_key is not None and obs_key in ob:
            # Get the values from all processes in the group and check
            # for consistency.
            group_vals = None
            if group_comm is None:
                group_vals = [ob[obs_key]]
            else:
                group_vals = group_comm.allgather(ob[obs_key])
            if group_vals.count(group_vals[0]) != len(group_vals):
                msg = f"observation {iob}, key '{obs_key}' has inconsistent values across processes"
                log.error_rank(msg, comm=group_comm)
                raise RuntimeError(msg)

            if obs_val is None:
                # We have the key, and are accepting any value
                new_data.obs.append(ob)
                continue
            elif isinstance(obs_val, re.Pattern):
                if obs_val.match(ob[obs_key]) is not None:
                    # Matches our regex
                    new_data.obs.append(ob)
                    continue
            elif obs_val == ob[obs_key]:
                new_data.obs.append(ob)
                continue
    return new_data
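
A usage sketch; `data` is assumed to be a populated Data instance, and the names and keys below are hypothetical:

import re

# Select a single observation by its position in the original list.
first = data.select(obs_index=0)

# Select by name with a compiled regular expression.
nights = data.select(obs_name=re.compile(r"obs_2023.*"))

# Select observations carrying a given metadata key / value pair.
deep = data.select(obs_key="field", obs_val="deep_patch")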

split(obs_index=False, obs_name=False, obs_uid=False, obs_session_name=False, obs_key=None, require_full=False)

Split the Data object.

Create new Data objects that have views into unique subsets of the observations (the observations are not copied). Only one "criteria" may be used to perform this splitting operation. The observations may be split by index in the original list, by name, by UID, by session, or by the value of a specified key.

The new Data objects are returned in a dictionary whose keys are the value of the selection criteria (index, name, uid, or value of the key). Any observation that cannot be placed (because it is missing a name, uid or key) will be ignored and not added to any of the returned Data objects. If the require_full parameter is set to True, such situations will raise an exception.

Parameters:

Name Type Description Default
obs_index bool

If True, split by index in original list of observations.

False
obs_name bool

If True, split by observation name.

False
obs_uid bool

If True, split by observation UID.

False
obs_session_name bool

If True, split by session name.

False
obs_key str

Split by values of this observation key.

None
require_full bool

If True, every observation must be placed in the output.

False

Returns:

Type Description
OrderedDict

The dictionary of new Data objects.

Source code in toast/data.py
def split(
    self,
    obs_index=False,
    obs_name=False,
    obs_uid=False,
    obs_session_name=False,
    obs_key=None,
    require_full=False,
):
    """Split the Data object.

    Create new Data objects that have views into unique subsets of the observations
    (the observations are not copied).  Only one "criteria" may be used to perform
    this splitting operation.  The observations may be split by index in the
    original list, by name, by UID, by session, or by the value of a specified key.

    The new Data objects are returned in a dictionary whose keys are the value of
    the selection criteria (index, name, uid, or value of the key).  Any observation
    that cannot be placed (because it is missing a name, uid or key) will be ignored
    and not added to any of the returned Data objects.  If the `require_full`
    parameter is set to True, such situations will raise an exception.

    Args:
        obs_index (bool):  If True, split by index in original list of observations.
        obs_name (bool):  If True, split by observation name.
        obs_uid (bool):  If True, split by observation UID.
        obs_session_name (bool):  If True, split by session name.
        obs_key (str):  Split by values of this observation key.
        require_full (bool):  If True, every observation must be placed in the
            output.

    Returns:
        (OrderedDict):  The dictionary of new Data objects.

    """
    log = Logger.get()
    check = (
        int(obs_index)
        + int(obs_name)
        + int(obs_uid)
        + int(obs_session_name)
        + int(obs_key is not None)
    )
    if check == 0 or check > 1:
        raise RuntimeError("You must specify exactly one split criteria")

    datasplit = OrderedDict()

    group_rank = self.comm.group_rank
    group_comm = self.comm.comm_group

    if obs_index:
        # Splitting by (unique) index
        for iob, ob in enumerate(self.obs):
            newdat = Data(comm=self._comm, view=True)
            newdat._internal = self._internal
            newdat.obs.append(ob)
            datasplit[iob] = newdat
    elif obs_name:
        # Splitting by (unique) name
        for iob, ob in enumerate(self.obs):
            if ob.name is None:
                if require_full:
                    msg = f"require_full is True, but observation {iob} has no name"
                    log.error_rank(msg, comm=group_comm)
                    raise RuntimeError(msg)
            else:
                newdat = Data(comm=self._comm, view=True)
                newdat._internal = self._internal
                newdat.obs.append(ob)
                datasplit[ob.name] = newdat
    elif obs_uid:
        # Splitting by UID
        for iob, ob in enumerate(self.obs):
            if ob.uid is None:
                if require_full:
                    msg = f"require_full is True, but observation {iob} has no UID"
                    log.error_rank(msg, comm=group_comm)
                    raise RuntimeError(msg)
            else:
                newdat = Data(comm=self._comm, view=True)
                newdat._internal = self._internal
                newdat.obs.append(ob)
                datasplit[ob.uid] = newdat
    elif obs_session_name:
        # Splitting by (non-unique) session name
        for iob, ob in enumerate(self.obs):
            if ob.session is None or ob.session.name is None:
                if require_full:
                    msg = f"require_full is True, but observation {iob} has no session name"
                    log.error_rank(msg, comm=group_comm)
                    raise RuntimeError(msg)
            else:
                sname = ob.session.name
                if sname not in datasplit:
                    newdat = Data(comm=self._comm, view=True)
                    newdat._internal = self._internal
                    datasplit[sname] = newdat
                datasplit[sname].obs.append(ob)
    elif obs_key is not None:
        # Splitting by arbitrary key.  Unlike name / uid which are built in to the
        # observation class, arbitrary keys might be modified in different ways
        # across all processes in a group.  For this reason, we do an additional
        # check for consistent values across the process group.
        for iob, ob in enumerate(self.obs):
            if obs_key not in ob:
                if require_full:
                    msg = f"require_full is True, but observation {iob} has no key '{obs_key}'"
                    log.error_rank(msg, comm=group_comm)
                    raise RuntimeError(msg)
            else:
                obs_val = ob[obs_key]
                # Get the values from all processes in the group
                group_vals = None
                if group_comm is None:
                    group_vals = [obs_val]
                else:
                    group_vals = group_comm.allgather(obs_val)
                if group_vals.count(group_vals[0]) != len(group_vals):
                    msg = f"observation {iob}, key '{obs_key}' has inconsistent values across processes"
                    log.error_rank(msg, comm=group_comm)
                    raise RuntimeError(msg)
                if obs_val not in datasplit:
                    newdat = Data(comm=self._comm, view=True)
                    newdat._internal = self._internal
                    datasplit[obs_val] = newdat
                datasplit[obs_val].obs.append(ob)
    return datasplit
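
A usage sketch; `data` is assumed to be a populated Data instance, and the "wafer" key is hypothetical:

# One Data view per session name; the dictionary keys are the names.
by_session = data.split(obs_session_name=True)

# One Data view per value of an observation metadata key, requiring
# that every observation carries the key.
by_wafer = data.split(obs_key="wafer", require_full=True)

for wafer, wdata in by_wafer.items():
    print(wafer, len(wdata.obs))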

Environment

Bases: pybind11_builtins.pybind11_object

Global runtime environment.

This singleton class provides a unified place to parse environment variables at runtime and to change global settings that impact the overall package.

__init__(*args, **kwargs) method descriptor

Initialize self. See help(type(self)) for accurate signature.

__repr__() method descriptor

repr(self: toast._libtoast.Environment) -> str

current_threads() method descriptor

current_threads(self: toast._libtoast.Environment) -> int

Return the current threading concurrency in use.

disable_function_timers() method descriptor

disable_function_timers(self: toast._libtoast.Environment) -> None

Explicitly disable function timers.

enable_function_timers() method descriptor

enable_function_timers(self: toast._libtoast.Environment) -> None

Explicitly enable function timers.

function_timers() method descriptor

function_timers(self: toast._libtoast.Environment) -> bool

Return True if function timing has been enabled.

get() method descriptor

get() -> toast._libtoast.Environment

Get a handle to the global environment class.

get_acc() method descriptor

get_acc(self: toast._libtoast.Environment) -> tuple

Get the OpenACC device properties.

Returns:

Type Description
tuple

The (num devices, proc per device, my device) integers.

log_level() method descriptor

log_level(self: toast._libtoast.Environment) -> str

Return the string of the current Logging level.

max_threads() method descriptor

max_threads(self: toast._libtoast.Environment) -> int

Returns the maximum number of threads used by compiled code.

set_acc() method descriptor

set_acc(self: toast._libtoast.Environment, n_acc_device: int, n_acc_proc_per_device: int, my_acc_device: int) -> None

Set the OpenACC device properties.

Parameters:

Name Type Description Default
n_acc_device int

The number of accelerator devices.

required
n_acc_proc_per_device int

The number of processes sharing each device.

required
my_acc_device int

The device to use for this process.

required

Returns:

Type Description

None

set_log_level() method descriptor

set_log_level(self: toast._libtoast.Environment, level: str) -> None

Set the Logging level.

Parameters:

Name Type Description Default
level str

one of DEBUG, INFO, WARNING, ERROR or CRITICAL.

required

Returns:

Type Description

None

set_threads() method descriptor

set_threads(self: toast._libtoast.Environment, nthread: int) -> None

Set the number of threads in use.

Parameters:

Name Type Description Default
nthread int

The number of threads to use.

required

Returns:

Type Description

None

signals() method descriptor

signals(self: toast._libtoast.Environment) -> list[str]

Return a list of the currently available signals.

tod_buffer_length() method descriptor

tod_buffer_length(self: toast._libtoast.Environment) -> int

Returns the number of samples to buffer for TOD operations.

version() method descriptor

version(self: toast._libtoast.Environment) -> str

Return the current source code version string.
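
A short usage sketch. The import path follows the toast._libtoast module shown in the signatures above, and the calls are the runtime analogues of the TOAST_LOGLEVEL and TOAST_FUNCTIME environment variables:

from toast._libtoast import Environment

env = Environment.get()        # handle to the global singleton
print(env.version())           # current source code version string
print(env.max_threads())       # maximum threads used by compiled code

env.set_log_level("DEBUG")     # runtime analogue of TOAST_LOGLEVEL
env.enable_function_timers()   # runtime analogue of TOAST_FUNCTIME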

Focalplane

Bases: object

Class representing the focalplane for one observation.

The detector_data Table may store arbitrary columns, but several are required. They include:

"name":  The detector name.
"quat":  Each row should be a 4-element numpy array.
"gamma":  If using a half wave plate, we need the rotation angle of the
    detector polarization orientation from the focalplane frame X-axis.

Some columns are optional:

"uid":  Unique integer ID for each detector.  Computed from detector name if
    not specified.
"pol_angle":  Quantity to specify the polarization angle.  Default assumes
    the polarization sensitive direction is aligned with the detector
    quaternion rotation.  Computed if not specified.
"pol_leakage":  Float value "epsilon" between 0-1.  Set to zero by default.
"pol_efficiency":  Float value "eta" = (1 - epsilon) / (1 + epsilon).  Set
    to one by default.
"fwhm":  Quantity with the nominal beam FWHM.  Used for plotting and for
    smoothing of simulated sky signal with PySM.
"bandcenter":  Quantity for the band center.  Used for bandpass integration
    with PySM simulations.
"bandwidth":  Quantity for width of the band.  Used for bandpass integration
    with PySM simulations.
"psd_net":  The detector sensitivity.  Quantity used to create a synthetic
    noise model with the DefaultNoiseModel operator.
"psd_fknee":  Quantity used to create a synthetic noise model with the
    DefaultNoiseModel operator.
"psd_fmin":  Quantity used to create a synthetic noise model with the
    DefaultNoiseModel operator.
"psd_alpha":  Quantity used to create a synthetic noise model with the
    DefaultNoiseModel operator.
"elevation_noise_a" and "elevation_noise_c":  Parameters of elevation scaling
    noise model: PSD_{out} = PSD_{ref} * (a / sin(el) + c)^2.  Only applicable
    to ground data.
"pwv_a0", "pwv_a1" and "pwv_a2":  quadratic fit of the NET modulation by
    PWV.  Only applicable to ground data.

Parameters:

Name Type Description Default
detector_data QTable

Table of detector properties.

None
field_of_view Quantity

Angular diameter of the focal plane. Used to increase the effective size of the focalplane when simulating atmosphere, etc. Will be calculated from the detector offsets by default.

None
sample_rate Quantity

The common (nominal) sample rate for all detectors.

None
thinfp int

Thin the focalplane by keeping only every thinfp-th detector pixel.

None
Source code in toast/instrument.py
class Focalplane(object):
    """Class representing the focalplane for one observation.

    The detector_data Table may store arbitrary columns, but several are required.
    They include:

        "name":  The detector name.
        "quat":  Each row should be a 4-element numpy array.
        "gamma":  If using a half wave plate, we need the rotation angle of the
            detector polarization orientation from the focalplane frame X-axis.

    Some columns are optional:

        "uid":  Unique integer ID for each detector.  Computed from detector name if
            not specified.
        "pol_angle":  Quantity to specify the polarization angle.  Default assumes
            the polarization sensitive direction is aligned with the detector
            quaternion rotation.  Computed if not specified.
        "pol_leakage":  Float value "epsilon" between 0-1.  Set to zero by default.
        "pol_efficiency":  Float value "eta" = (1 - epsilon) / (1 + epsilon).  Set
            to one by default.
        "fwhm":  Quantity with the nominal beam FWHM.  Used for plotting and for
            smoothing of simulated sky signal with PySM.
        "bandcenter":  Quantity for the band center.  Used for bandpass integration
            with PySM simulations.
        "bandwidth":  Quantity for width of the band.  Used for bandpass integration
            with PySM simulations.
        "psd_net":  The detector sensitivity.  Quantity used to create a synthetic
            noise model with the DefaultNoiseModel operator.
        "psd_fknee":  Quantity used to create a synthetic noise model with the
            DefaultNoiseModel operator.
        "psd_fmin":  Quantity used to create a synthetic noise model with the
            DefaultNoiseModel operator.
        "psd_alpha":  Quantity used to create a synthetic noise model with the
            DefaultNoiseModel operator.
        "elevation_noise_a" and "elevation_noise_c":  Parameters of elevation scaling
            noise model: PSD_{out} = PSD_{ref} * (a / sin(el) + c)^2.  Only applicable
            to ground data.
        "pwv_a0", "pwv_a1" and "pwv_a2":  quadratic fit of the NET modulation by
            PWV.  Only applicable to ground data.

    Args:
        detector_data (QTable):  Table of detector properties.
        field_of_view (Quantity):  Angular diameter of the focal plane.  Used to
            increase the effective size of the focalplane when simulating atmosphere,
            etc.  Will be calculated from the detector offsets by default.
        sample_rate (Quantity):  The common (nominal) sample rate for all detectors.
        thinfp (int):  Thin the focalplane by keeping only every thinfp-th detector pixel.

    """

    XAXIS, YAXIS, ZAXIS = np.eye(3)

    @function_timer
    def __init__(
        self,
        detector_data=None,
        field_of_view=None,
        sample_rate=None,
        thinfp=None,
    ):
        self.detector_data = detector_data
        self.field_of_view = field_of_view
        self.sample_rate = sample_rate
        self.thinfp = thinfp
        if detector_data is not None and len(detector_data) > 0:
            # We have some dets
            self._initialize()

    @function_timer
    def _initialize(self):
        log = Logger.get()

        if self.thinfp is not None:
            # Pick only every `thinfp` pixel on the focal plane.  The
            # division by two maps detector index to pixel index, assuming
            # two detectors per pixel.
            ndet = len(self.detector_data)
            for idet in range(ndet - 1, -1, -1):
                if int(idet // 2) % self.thinfp != 0:
                    del self.detector_data[idet]

        # Add UID if not given
        if "uid" not in self.detector_data.colnames:
            self.detector_data.add_column(
                Column(
                    name="uid", data=[name_UID(x["name"]) for x in self.detector_data]
                )
            )

        # Build index of detector to table row
        self._det_to_row = {y["name"]: x for x, y in enumerate(self.detector_data)}

        if self.field_of_view is None:
            self._compute_fov()
        self._get_pol_angles()
        self._get_pol_efficiency()
        self._get_bandpass()

    @function_timer
    def _get_bandpass(self):
        """Use the bandpass parameters to instantiate a bandpass model"""

        if "bandcenter" in self.detector_data.colnames:
            bandcenter = {}
            bandwidth = {}
            for row in self.detector_data:
                name = row["name"]
                bandcenter[name] = row["bandcenter"]
                bandwidth[name] = row["bandwidth"]
            self.bandpass = Bandpass(bandcenter, bandwidth)
        else:
            self.bandpass = None
        return

    @function_timer
    def _compute_fov(self):
        """Compute the field of view"""
        # Find the largest distance from the bore sight
        cosangs = list()
        for row in self.detector_data:
            quat = row["quat"]
            vec = qarray.rotate(quat, self.ZAXIS)
            cosangs.append(np.dot(self.ZAXIS, vec))
        mincos = np.amin(cosangs)
        # Add a very small margin to avoid numeric issues
        # in the atmospheric simulation
        self.field_of_view = 1.01 * 2.0 * np.arccos(mincos) * u.radian
        # If we just have boresight detectors, we will need to give this some non-zero
        # value.
        if self.field_of_view == 0:
            self.field_of_view = 1.0 * u.degree

    @function_timer
    def _get_pol_angles(self):
        """Get the detector polarization angles from the quaternions"""

        if "pol_angle" not in self.detector_data.colnames:
            n_rows = len(self.detector_data)
            self.detector_data.add_column(
                Column(name="pol_angle", length=n_rows, unit=u.radian)
            )
            for row in self.detector_data:
                quat = row["quat"]
                a = quat[3]
                d = quat[2]
                pol_angle = np.arctan2(2 * a * d, a**2 - d**2) % np.pi
                row["pol_angle"] = pol_angle * u.radian

    @function_timer
    def _get_pol_efficiency(self):
        """Get the polarization efficiency from polarization leakage or vice versa"""

        n_rows = len(self.detector_data)
        if ("pol_leakage" in self.detector_data.colnames) and (
            "pol_efficiency" in self.detector_data.colnames
        ):
            # Check that efficiency and leakage are consistent
            epsilon = self.detector_data["pol_leakage"]
            eta = self.detector_data["pol_efficiency"]
            np.testing.assert_allclose(
                eta,
            (1 - epsilon) / (1 + epsilon),
                rtol=1e-6,
                err_msg="inconsistent polarization leakage and efficiency",
            )
            return
        elif "pol_leakage" in self.detector_data.colnames:
            self.detector_data.add_column(
                Column(
                    name="pol_efficiency",
                    data=[(1 - x) / (1 + x) for x in self.detector_data["pol_leakage"]],
                )
            )
        elif "pol_efficiency" in self.detector_data.colnames:
            self.detector_data.add_column(
                Column(
                    name="pol_leakage",
                    data=[
                        (1 - x) / (1 + x) for x in self.detector_data["pol_efficiency"]
                    ],
                )
            )
        else:
            self.detector_data.add_column(
                Column(name="pol_efficiency", data=np.ones(n_rows))
            )
            self.detector_data.add_column(
                Column(name="pol_leakage", data=np.zeros(n_rows))
            )

    def __contains__(self, key):
        return key in self._det_to_row

    def __getitem__(self, key):
        return self.detector_data[self._det_to_row[key]]

    def __setitem__(self, key, value):
        if key not in self._det_to_row:
            msg = "cannot assign to non-existent detector '{}'".format(key)
            raise ValueError(msg)
        indx = self._det_to_row[key]
        if hasattr(value, "fields"):
            # numpy structured array
            if value.fields is None:
                raise ValueError("assignment value must be structured")
            for cname, ctype in value.fields.items():
                if cname not in self.detector_data.colnames:
                    msg = "assignment value element '{}' is not a det column".format(
                        cname
                    )
                    raise ValueError(msg)
                self.detector_data[indx][cname] = value[cname]
        elif hasattr(value, "colnames"):
            # table row
            for c in value.colnames:
                if c not in self.detector_data.colnames:
                    msg = "assignment value element '{}' is not a det column".format(c)
                    raise ValueError(msg)
                self.detector_data[indx][c] = value[c]
        else:
            # see if it is like a dictionary
            try:
                for k, v in value.items():
                    if k not in self.detector_data.colnames:
                        msg = (
                            "assignment value element '{}' is not a det column".format(
                                k
                            )
                        )
                        raise ValueError(msg)
                    self.detector_data[indx][k] = v
            except Exception:
                raise ValueError(
                    "assignment value must be a dictionary, Row, or structured array"
                )

    @property
    def detectors(self):
        return list(self._det_to_row.keys())

    @property
    def n_detectors(self):
        return len(self._det_to_row.keys())

    def keys(self):
        return self.detectors

    @function_timer
    def detector_groups(self, column):
        """Group detectors by a common value in one property.

        This returns a dictionary whose keys are the unique values of the specified
        detector_data column.  The values for each key are a list of detectors that
        have that value.  This can be useful for creating detector sets for data
        distribution or for considering detectors with correlations.

        Since the column values will be used for dictionary keys, the column must
        be a data type which is hashable.

        Args:
            column (str):  The detector_data column.

        Returns:
            (dict):  The detector names grouped by unique column values.

        """
        if column not in self.detector_data.colnames:
            raise RuntimeError(f"'{column}' is not a valid det data column")
        detgroups = dict()
        for d in self.detectors:
            indx = self._det_to_row[d]
            val = self.detector_data[column][indx]
            if val not in detgroups:
                detgroups[val] = list()
            detgroups[val].append(d)
        return detgroups

    def __repr__(self):
        value = "<Focalplane: {} detectors, sample_rate = {} Hz, FOV = {} deg, detectors = [".format(
            len(self.detector_data),
            self.sample_rate.to_value(u.Hz),
            self.field_of_view.to_value(u.degree),
        )
        value += "{} .. {}".format(self.detectors[0], self.detectors[-1])
        value += "]>"
        return value

    def __eq__(self, other):
        if self.sample_rate != other.sample_rate:
            return False
        if self.field_of_view != other.field_of_view:
            return False
        if self.detectors != other.detectors:
            return False
        if self.detector_data.colnames != other.detector_data.colnames:
            return False
        if not self.detector_data.values_equal(other.detector_data):
            return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def load_hdf5(self, handle, comm=None, **kwargs):
        """Load the focalplane from an HDF5 group.

        Args:
            handle (h5py.Group):  The group containing the "focalplane" dataset.
            comm (MPI.Comm):  If loading from a file, optional communicator to broadcast
                across.

        Returns:
            None

        """
        log = Logger.get()

        # Determine if we need to broadcast results.  This occurs if only one process
        # has the file open but the communicator has more than one process.
        need_bcast = hdf5_use_serial(handle, comm)

        if self.detector_data is not None:
            raise RuntimeError("Reading detector data over existing table")

        if handle is not None:
            self.detector_data = read_table_hdf5(handle, path="focalplane")

        if need_bcast and comm is not None:
            self.detector_data = comm.bcast(self.detector_data, root=0)

        # Only use the sampling rate recorded in the file if it was not
        # overridden in the constructor
        if self.sample_rate is None:
            self.sample_rate = self.detector_data.meta["sample_rate"]
        if self.field_of_view is None and "field_of_view" in self.detector_data.meta:
            self.field_of_view = self.detector_data.meta["field_of_view"]

        # Initialize other properties
        self._initialize()

        log.debug_rank(
            f"Focalplane has {len(self.detector_data)} detectors that span "
            f"{self.field_of_view.to_value(u.deg):.3f} degrees and are sampled at "
            f"{self.sample_rate.to_value(u.Hz)} Hz.",
            comm=comm,
        )

    def save_hdf5(self, handle, comm=None, **kwargs):
        """Save the focalplane to an HDF5 group.

        Args:
            handle (h5py.Group):  The parent group of the focalplane dataset.
            comm (MPI.Comm):  Optional communicator used for parallel writing
                of the file.

        Returns:
            None

        """
        self.detector_data.meta["sample_rate"] = self.sample_rate
        self.detector_data.meta["field_of_view"] = self.field_of_view
        table_write_parallel_hdf5(self.detector_data, handle, "focalplane", comm=comm)
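
A minimal construction sketch. The detector names and boresight-pointing quaternions below are hypothetical; only the required "name" and "quat" columns are supplied, so the optional columns receive their defaults:

import numpy as np
from astropy import units as u
from astropy.table import QTable

from toast.instrument import Focalplane

det_table = QTable(
    {
        "name": ["D0A", "D0B"],
        "quat": [np.array([0.0, 0.0, 0.0, 1.0]), np.array([0.0, 0.0, 0.0, 1.0])],
    }
)

fp = Focalplane(detector_data=det_table, sample_rate=10.0 * u.Hz)
print(fp.detectors)      # ['D0A', 'D0B']
print(fp.field_of_view)  # falls back to 1 degree for boresight-only detectors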

detector_groups(column)

Group detectors by a common value in one property.

This returns a dictionary whose keys are the unique values of the specified detector_data column. The values for each key are a list of detectors that have that value. This can be useful for creating detector sets for data distribution or for considering detectors with correlations.

Since the column values will be used for dictionary keys, the column must be a data type which is hashable.

Parameters:

Name Type Description Default
column str

The detector_data column.

required

Returns:

Type Description
dict

The detector names grouped by unique column values.

Source code in toast/instrument.py
@function_timer
def detector_groups(self, column):
    """Group detectors by a common value in one property.

    This returns a dictionary whose keys are the unique values of the specified
    detector_data column.  The values for each key are a list of detectors that
    have that value.  This can be useful for creating detector sets for data
    distribution or for considering detectors with correlations.

    Since the column values will be used for dictionary keys, the column must
    be a data type which is hashable.

    Args:
        column (str):  The detector_data column.

    Returns:
        (dict):  The detector names grouped by unique column values.

    """
    if column not in self.detector_data.colnames:
        raise RuntimeError(f"'{column}' is not a valid det data column")
    detgroups = dict()
    for d in self.detectors:
        indx = self._det_to_row[d]
        val = self.detector_data[column][indx]
        if val not in detgroups:
            detgroups[val] = list()
        detgroups[val].append(d)
    return detgroups
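
A usage sketch, assuming `fp` is a Focalplane whose detector table carries a hypothetical "band" column:

# Keys are the unique band values; values are lists of detector names.
bands = fp.detector_groups("band")
for band, dets in bands.items():
    print(band, len(dets))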

load_hdf5(handle, comm=None, **kwargs)

Load the focalplane from an HDF5 group.

Parameters:

Name Type Description Default
handle Group

The group containing the "focalplane" dataset.

required
comm Comm

If loading from a file, optional communicator to broadcast across.

None

Returns:

Type Description

None

Source code in toast/instrument.py
def load_hdf5(self, handle, comm=None, **kwargs):
    """Load the focalplane from an HDF5 group.

    Args:
        handle (h5py.Group):  The group containing the "focalplane" dataset.
        comm (MPI.Comm):  If loading from a file, optional communicator to broadcast
            across.

    Returns:
        None

    """
    log = Logger.get()

    # Determine if we need to broadcast results.  This occurs if only one process
    # has the file open but the communicator has more than one process.
    need_bcast = hdf5_use_serial(handle, comm)

    if self.detector_data is not None:
        raise RuntimeError("Reading detector data over existing table")

    if handle is not None:
        self.detector_data = read_table_hdf5(handle, path="focalplane")

    if need_bcast and comm is not None:
        self.detector_data = comm.bcast(self.detector_data, root=0)

    # Only use the sampling rate recorded in the file if it was not
    # overridden in the constructor
    if self.sample_rate is None:
        self.sample_rate = self.detector_data.meta["sample_rate"]
    if self.field_of_view is None and "field_of_view" in self.detector_data.meta:
        self.field_of_view = self.detector_data.meta["field_of_view"]

    # Initialize other properties
    self._initialize()

    log.debug_rank(
        f"Focalplane has {len(self.detector_data)} detectors that span "
        f"{self.field_of_view.to_value(u.deg):.3f} degrees and are sampled at "
        f"{self.sample_rate.to_value(u.Hz)} Hz.",
        comm=comm,
    )

save_hdf5(handle, comm=None, **kwargs)

Save the focalplane to an HDF5 group.

Parameters:

Name Type Description Default
handle Group

The parent group of the focalplane dataset.

required
comm Comm

Optional communicator used for parallel writing of the file.

None

Returns:

Type Description

None

Source code in toast/instrument.py
def save_hdf5(self, handle, comm=None, **kwargs):
    """Save the focalplane to an HDF5 group.

    Args:
        handle (h5py.Group):  The parent group of the focalplane dataset.
        comm (MPI.Comm):  Optional communicator used for parallel writing
            of the file.

    Returns:
        None

    """
    self.detector_data.meta["sample_rate"] = self.sample_rate
    self.detector_data.meta["field_of_view"] = self.field_of_view
    table_write_parallel_hdf5(self.detector_data, handle, "focalplane", comm=comm)

GlobalTimers

Bases: pybind11_builtins.pybind11_object

Global timer registry.

This singleton class stores timers that can be started / stopped anywhere in the code to accumulate the total time for different operations.

__init__(*args, **kwargs) method descriptor

Initialize self. See help(type(self)) for accurate signature.

clear_all() method descriptor

clear_all(self: toast._libtoast.GlobalTimers) -> None

Clear all global timers.

collect() method descriptor

collect(self: toast._libtoast.GlobalTimers) -> dict

Stop all timers and return the current state.

Returns:

Type Description
dict

A dictionary of Timers.

get() method descriptor

get() -> toast._libtoast.GlobalTimers

Get a handle to the singleton class.

is_running() method descriptor

is_running(self: toast._libtoast.GlobalTimers, name: str) -> bool

Is the specified timer running?

Parameters:

Name Type Description Default
name str

The name of the global timer.

required

Returns:

Type Description
bool

True if the timer is running, else False.

names() method descriptor

names(self: toast._libtoast.GlobalTimers) -> list[str]

Return the names of all currently registered timers.

Returns:

Type Description
list

The names of the timers.

report() method descriptor

report(self: toast._libtoast.GlobalTimers) -> None

Report results of all global timers to STDOUT.

seconds() method descriptor

seconds(self: toast._libtoast.GlobalTimers, name: str) -> float

Get the elapsed time for a timer.

The timer must be stopped.

Parameters:

Name Type Description Default
name str

The name of the global timer.

required

Returns:

Type Description
float

The elapsed time in seconds.

start() method descriptor

start(self: toast._libtoast.GlobalTimers, name: str) -> None

Start the specified timer.

If the named timer does not exist, it is first created before being started.

Parameters:

Name Type Description Default
name str

The name of the global timer.

required

Returns:

Type Description

None

stop() method descriptor

stop(self: toast._libtoast.GlobalTimers, name: str) -> None

Stop the specified timer.

The timer must already exist.

Parameters:

Name Type Description Default
name str

The name of the global timer.

required

Returns:

Type Description

None

stop_all() method descriptor

stop_all(self: toast._libtoast.GlobalTimers) -> None

Stop all global timers.
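
A minimal usage sketch, importing the bound class directly from toast._libtoast as documented above:

from toast._libtoast import GlobalTimers

gt = GlobalTimers.get()

gt.start("my_operation")   # created on first use, then started
# ... work to be timed ...
gt.stop("my_operation")    # the timer must already exist

print(gt.is_running("my_operation"))  # False
print(gt.seconds("my_operation"))     # elapsed seconds; timer must be stopped
gt.report()                           # summary of all timers to STDOUT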

GroundSite

Bases: Site

Site on the Earth.

This represents a fixed location on the Earth.

Parameters:

Name Type Description Default
name str

Site name

required
lat Quantity

Site latitude.

required
lon Quantity

Site longitude.

required
alt Quantity

Site altitude.

required
uid int

Unique identifier. If not specified, constructed from a hash of the site name.

None
weather Weather

Weather information for this site.

None
Source code in toast/instrument.py
class GroundSite(Site):
    """Site on the Earth.

    This represents a fixed location on the Earth.

    Args:
        name (str):  Site name
        lat (Quantity):  Site latitude.
        lon (Quantity):  Site longitude.
        alt (Quantity):  Site altitude.
        uid (int):  Unique identifier.  If not specified, constructed from a hash
            of the site name.
        weather (Weather):  Weather information for this site.
    """

    def __init__(self, name, lat, lon, alt, uid=None, weather=None):
        super().__init__(name, uid)
        self.earthloc = coord.EarthLocation.from_geodetic(lon, lat, height=alt)
        self.weather = weather

    def __repr__(self):
        value = "<GroundSite '{}' : uid = {}, lon = {}, lat = {}, alt = {}, weather = {}>".format(
            self.name,
            self.uid,
            self.earthloc.lon,
            self.earthloc.lat,
            self.earthloc.height,
            self.weather,
        )
        return value

    def __eq__(self, other):
        if self.name != other.name:
            return False
        if self.uid != other.uid:
            return False
        if not np.isclose(other.earthloc.lon, self.earthloc.lon):
            return False
        if not np.isclose(other.earthloc.lat, self.earthloc.lat):
            return False
        if not np.isclose(other.earthloc.height, self.earthloc.height):
            return False
        if self.weather != other.weather:
            return False
        return True

    def _position_velocity(self, times):
        # Compute data at 10 second intervals and interpolate.  If the timestamps are
        # more coarsely sampled than that, just compute those times directly.
        sparse_incr = 10.0
        do_interp = True
        if len(times) < 100 or (times[1] - times[0]) > sparse_incr:
            do_interp = False

        if do_interp:
            n_sparse = int((times[-1] - times[0]) / sparse_incr)
            sparse_times = np.linspace(times[0], times[-1], num=n_sparse, endpoint=True)
        else:
            n_sparse = len(times)
            sparse_times = times
        pos_x = np.zeros(n_sparse, np.float64)
        pos_y = np.zeros(n_sparse, np.float64)
        pos_z = np.zeros(n_sparse, np.float64)
        vel_x = np.zeros(n_sparse, np.float64)
        vel_y = np.zeros(n_sparse, np.float64)
        vel_z = np.zeros(n_sparse, np.float64)
        for i, t in enumerate(sparse_times):
            atime = astime.Time(t, format="unix")
            p, v = coord.get_body_barycentric_posvel("earth", atime)
            # FIXME:  apply translation from earth center to earth location.
            # itrs = self.earthloc.get_itrs(obstime)
            pm = p.xyz.to_value(u.kilometer)
            vm = v.xyz.to_value(u.kilometer / u.second)
            pos_x[i] = pm[0]
            pos_y[i] = pm[1]
            pos_z[i] = pm[2]
            vel_x[i] = vm[0]
            vel_y[i] = vm[1]
            vel_z[i] = vm[2]

        if do_interp:
            pos_x = np.interp(times, sparse_times, pos_x)
            pos_y = np.interp(times, sparse_times, pos_y)
            pos_z = np.interp(times, sparse_times, pos_z)
            vel_x = np.interp(times, sparse_times, vel_x)
            vel_y = np.interp(times, sparse_times, vel_y)
            vel_z = np.interp(times, sparse_times, vel_z)
        pos = np.stack([pos_x, pos_y, pos_z], axis=-1)
        vel = np.stack([vel_x, vel_y, vel_z], axis=-1)
        return pos, vel

    def _position(self, times):
        p, v = self._position_velocity(times)
        return p

    def _velocity(self, times):
        p, v = self._position_velocity(times)
        return v
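
A short construction sketch; the coordinates below are illustrative placeholders, not survey values:

import astropy.units as u

from toast.instrument import GroundSite

# Illustrative high-altitude site.
site = GroundSite(
    "Atacama",
    lat=-22.96 * u.degree,
    lon=-67.79 * u.degree,
    alt=5200.0 * u.meter,
)
print(site)  # <GroundSite 'Atacama' : uid = ..., lon = ..., lat = ..., ...>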

IntervalList

Bases: Sequence, AcceleratorObject

A list of Intervals that supports logical operations.

The timestamps define the valid local range of intervals. When constructing from intervals, timespans, or samplespans, the inputs are truncated to the allowed range given by the timestamps.

Parameters:

Name Type Description Default
timestamps array

Array of local sample times, required.

required
intervals list

An existing IntervalList or a raw intervals array.

None
timespans list

A list of tuples containing start and stop times.

None
samplespans list

A list of tuples containing first and last (inclusive) sample ranges.

None
Source code in toast/intervals.py
class IntervalList(Sequence, AcceleratorObject):
    """An list of Intervals which supports logical operations.

    The timestamps define the valid local range of intervals.  When constructing
    from intervals, timespans, or samplespans, the inputs are truncated to the
    allowed range given by the timestamps.

    Args:
        timestamps (array):  Array of local sample times, required.
        intervals (list):  An existing IntervalList or a raw intervals array.
        timespans (list):  A list of tuples containing start and stop times.
        samplespans (list):  A list of tuples containing first and last (inclusive)
            sample ranges.

    """

    def __init__(self, timestamps, intervals=None, timespans=None, samplespans=None):
        super().__init__()
        self.timestamps = timestamps
        if intervals is not None:
            # Construct intervals using timespans from the provided intervals
            if timespans is not None or samplespans is not None:
                raise RuntimeError(
                    "If constructing from intervals, other spans should be None"
                )
            if len(intervals) == 0:
                self.data = np.zeros(0, dtype=interval_dtype).view(np.recarray)
            else:
                timespans = [(x.start, x.stop) for x in intervals]
                indices, times = self._find_indices(timespans)
                self.data = np.array(
                    [
                        (time[0], time[1], ind[0], ind[1])
                        for (time, ind) in zip(times, indices)
                    ],
                    dtype=interval_dtype,
                ).view(np.recarray)
        elif timespans is not None:
            # Construct intervals using provided timespans
            if samplespans is not None:
                raise RuntimeError("Cannot construct from both time and sample spans")
            if len(timespans) == 0:
                self.data = np.zeros(0, dtype=interval_dtype).view(np.recarray)
            else:
                timespans = np.vstack(timespans)
                for i in range(len(timespans) - 1):
                    if np.isclose(timespans[i][1], timespans[i + 1][0], rtol=1e-12):
                        # Force nearly equal timestamps to match
                        timespans[i][1] = timespans[i + 1][0]
                    if timespans[i][1] > timespans[i + 1][0]:
                        raise RuntimeError("Timespans must be sorted and disjoint")
                indices, times = self._find_indices(timespans)
                self.data = np.array(
                    [
                        (time[0], time[1], ind[0], ind[1])
                        for (time, ind) in zip(times, indices)
                    ],
                    dtype=interval_dtype,
                ).view(np.recarray)
        elif samplespans is not None:
            # Construct intervals from sample ranges
            if len(samplespans) == 0:
                self.data = np.zeros(0, dtype=interval_dtype).view(np.recarray)
            else:
                for i in range(len(samplespans) - 1):
                    if samplespans[i][1] > samplespans[i + 1][0]:
                        raise RuntimeError("Sample spans must be sorted and disjoint")
                builder = list()
                for first, last in samplespans:
                    if last < 0 or first >= len(self.timestamps):
                        continue
                    if first < 0:
                        first = 0
                    if last > len(self.timestamps):
                        last = len(self.timestamps)
                    builder.append(
                        (self._sample_time(first), self._sample_time(last), first, last)
                    )
                self.data = np.array(builder, dtype=interval_dtype).view(np.recarray)
        else:
            # No data yet
            self.data = np.zeros(0, dtype=interval_dtype).view(np.recarray)

    def _sample_time(self, sample):
        nsample = len(self.timestamps)
        if sample < 0 or sample > nsample:
            msg = f"Invalid sample index: {sample} not in [0, {nsample}]"
            raise RuntimeError(msg)
        if sample == nsample:
            # Handle the end of the timestamps differently
            return self.timestamps[sample - 1]
        else:
            return self.timestamps[sample]

    def _find_indices(self, timespans):
        # Each interval covers all samples where the sample time meets:
        # interval.start <= self.timestamps AND self.timestamps < interval.stop
        # (open-ended interval)
        # with one exception: if the interval ends at the last timestamp, the
        # corresponding sample is included (closed interval)
        start_time, stop_time = np.vstack(timespans).T
        # Cut out timespans that do not overlap with the available timestamps
        good = np.logical_and(
            start_time < self.timestamps[-1], stop_time > self.timestamps[0]
        )
        start_time = start_time[good]
        stop_time = stop_time[good]
        start_indx = np.searchsorted(self.timestamps, start_time, side="left")
        stop_indx = np.searchsorted(self.timestamps, stop_time, side="left")
        # Include the last sample where the stop time matches the last time stamp
        nsample = len(self.timestamps)
        stop_indx[stop_indx == nsample - 1] = nsample
        times = list()
        samples = list()
        for start, stop, first, last in zip(
                start_time, stop_time, start_indx, stop_indx
        ):
            times.append((start, stop))
            samples.append((first, last))
        return samples, times

    def __getitem__(self, key):
        return self.data[key]

    def __delitem__(self, key):
        raise RuntimeError("Cannot delete individual elements from an IntervalList")

    def __contains__(self, item):
        for ival in self.data:
            if ival == item:
                return True
        return False

    def __iter__(self):
        return iter(self.data)

    def __len__(self):
        return len(self.data)

    def __repr__(self):
        return self.data.__repr__()

    def __eq__(self, other):
        if len(self.data) != len(other):
            return False
        if len(self.timestamps) != len(other.timestamps):
            return False
        # With the default np.isclose() tolerances, any timestamp comparison
        # passes.  Use sufficiently tight tolerances instead.
        if not np.isclose(self.timestamps[0], other.timestamps[0], rtol=1e-12) \
           or not np.isclose(self.timestamps[-1], other.timestamps[-1], rtol=1e-12):
            return False
        for s, o in zip(self.data, other.data):
            if s.first != o.first:
                return False
            if s.last != o.last:
                return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def simplify(self):
        if len(self.data) == 0:
            return
        propose = list()
        first = self.data[0].first
        last = self.data[0].last
        start = self.data[0].start
        stop = self.data[0].stop
        for i in range(1, len(self.data)):
            cur_first = self.data[i].first
            cur_last = self.data[i].last
            cur_start = self.data[i].start
            cur_stop = self.data[i].stop
            if cur_first == last:
                # This interval is contiguous with the previous one
                last = cur_last
                stop = cur_stop
            else:
                # There is a gap
                propose.append((start, stop, first, last))
                first = cur_first
                last = cur_last
                start = cur_start
                stop = cur_stop
        propose.append((start, stop, first, last))
        if len(propose) < len(self.data):
            # Need to update
            self.data = np.array(propose, dtype=interval_dtype).view(np.recarray)

    def __invert__(self):
        if len(self.data) == 0:
            return
        neg = list()
        # Handle range before first interval
        if not np.isclose(self.timestamps[0], self.data[0].start, rtol=1e-12):
            neg.append((self.timestamps[0], self.data[0].start, 0, self.data[0].first))
        for i in range(len(self.data) - 1):
            # Handle gaps between intervals
            cur_last = self.data[i].last
            cur_stop = self.data[i].stop
            next_first = self.data[i + 1].first
            next_start = self.data[i + 1].start
            if next_first != cur_last + 1:
                # There are some samples in between
                neg.append((cur_stop, next_start, cur_last, next_first))
        # Handle range after last interval
        if not np.isclose(self.timestamps[-1], self.data[-1].stop, rtol=1e-12):
            neg.append(
                (
                    self.data[-1].stop,
                    self.timestamps[-1],
                    self.data[-1].last,
                    len(self.timestamps),
                )
            )
        return IntervalList(
            self.timestamps,
            intervals=np.array(neg, dtype=interval_dtype).view(np.recarray),
        )

    def __and__(self, other):
        if len(self.timestamps) != len(other.timestamps):
            raise RuntimeError(
                "Cannot do AND operation on intervals with different timestamps"
            )
        if not np.isclose(self.timestamps[0], other.timestamps[0], rtol=1e-12) \
           or not np.isclose(self.timestamps[-1], other.timestamps[-1], rtol=1e-12):
            raise RuntimeError(
                "Cannot do AND operation on intervals with different timestamps"
            )
        if len(self.data) == 0 or len(other) == 0:
            return IntervalList(self.timestamps)
        result = list()
        curself = 0
        curother = 0

        # Walk both sequences, building up the intersection.
        while (curself < len(self.data)) and (curother < len(other)):
            start = max(self.data[curself].start, other[curother].start)
            stop = min(self.data[curself].stop, other[curother].stop)
            if start < stop:
                low = max(self.data[curself].first, other[curother].first)
                high = min(self.data[curself].last, other[curother].last)
                result.append((start, stop, low, high))
            if self.data[curself].stop < other[curother].stop:
                curself += 1
            else:
                curother += 1

        return IntervalList(
            self.timestamps,
            intervals=np.array(result, dtype=interval_dtype).view(np.recarray),
        )

    def __or__(self, other):
        if len(self.timestamps) != len(other.timestamps):
            raise RuntimeError(
                "Cannot do OR operation on intervals with different timestamps"
            )
        if not np.isclose(self.timestamps[0], other.timestamps[0], rtol=1e-12) \
           or not np.isclose(self.timestamps[-1], other.timestamps[-1], rtol=1e-12):
            raise RuntimeError(
                "Cannot do OR operation on intervals with different timestamps"
            )
        if len(self.data) == 0:
            return IntervalList(self.timestamps, intervals=other.data)
        elif len(other) == 0:
            return IntervalList(self.timestamps, intervals=self.data)

        result = list()
        res_first = None
        res_last = None
        res_start = None
        res_stop = None
        curself = 0
        curother = 0

        # Walk both sequences.
        done_self = False
        done_other = False
        while (not done_self) or (not done_other):
            next_ = None
            if done_self:
                next_ = other[curother]
                curother += 1
            elif done_other:
                next_ = self.data[curself]
                curself += 1
            else:
                if self.data[curself].first < other[curother].first:
                    next_ = self.data[curself]
                    curself += 1
                else:
                    next_ = other[curother]
                    curother += 1
            if curself >= len(self.data):
                done_self = True
            if curother >= len(other):
                done_other = True

            if res_first is None:
                res_first = next_.first
                res_last = next_.last
                res_start = next_.start
                res_stop = next_.stop
            else:
                # We use '<' here instead of '<=', so that intervals which are next to
                # each other (but not overlapping) are not combined.  If the combination
                # is desired, the simplify() method can be used.
                if next_.first < res_last:
                    # We overlap last interval
                    if next_.last > res_last:
                        # This interval extends beyond the last interval
                        res_last = next_.last
                        res_stop = next_.stop
                else:
                    # We have a break, close out previous interval and start a new one
                    result.append(
                        (
                            res_start,
                            res_stop,
                            res_first,
                            res_last,
                        )
                    )
                    res_first = next_.first
                    res_last = next_.last
                    res_start = next_.start
                    res_stop = next_.stop
        # Close out final interval
        result.append((res_start, res_stop, res_first, res_last))

        return IntervalList(
            self.timestamps,
            intervals=np.array(result, dtype=interval_dtype).view(np.recarray),
        )

    def _accel_exists(self):
        if use_accel_omp:
            return accel_data_present(self.data, self._accel_name)
        elif use_accel_jax:
            # specialised for the INTERVALS_JAX dtype
            return isinstance(self.data, INTERVALS_JAX)
        else:
            return False

    def _accel_create(self):
        if use_accel_omp:
            self.data = accel_data_create(self.data, self._accel_name)
        elif use_accel_jax:
            # specialised for the INTERVALS_JAX dtype
            # NOTE: this call is timed at the INTERVALS_JAX level
            self.data = INTERVALS_JAX(self.data)

    def _accel_update_device(self):
        if use_accel_omp:
            self.data = accel_data_update_device(self.data, self._accel_name)
        elif use_accel_jax:
            # specialised for the INTERVALS_JAX dtype
            # NOTE: this call is timed at the INTERVALS_JAX level
            self.data = INTERVALS_JAX(self.data)

    def _accel_update_host(self):
        if use_accel_omp:
            self.data = accel_data_update_host(self.data, self._accel_name)
        elif use_accel_jax:
            # specialised for the INTERVALS_JAX dtype
            # this moves the data back into a numpy array
            # NOTE: this call is timed at the INTERVALS_JAX level
            self.data = self.data.to_host()

    def _accel_delete(self):
        if use_accel_omp:
            self.data = accel_data_delete(self.data, self._accel_name)
        elif use_accel_jax and self._accel_exists():
            # Ensures data has been properly reset
            # if we observe that its type is still a GPU type
            # does NOT move data back from GPU
            self.data = self.data.host_data
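
A small sketch of building interval lists on a shared set of timestamps and combining them with the logical operators (both operands must be defined on identical timestamps):

import numpy as np

from toast.intervals import IntervalList

# One hundred samples at 1 Hz.
times = np.arange(100, dtype=np.float64)

scans = IntervalList(times, timespans=[(10.0, 30.0), (50.0, 70.0)])
good = IntervalList(times, samplespans=[(0, 60)])

both = scans & good    # intersection
either = scans | good  # union
gaps = ~scans          # complement within the timestamp range

for ival in both:
    print(ival.start, ival.stop, ival.first, ival.last)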

ObsMat

Bases: object

Observation Matrix class

Source code in toast/ops/obsmat.py
class ObsMat(object):
    """Observation Matrix class"""

    def __init__(self, filename=None):
        self.filename = filename
        self.matrix = None
        self.load()
        return

    @function_timer
    def load(self, filename=None):
        if filename is not None:
            self.filename = filename
        if self.filename is None:
            self.matrix = None
            self.nnz = 0
            self.nrow, self.ncol = 0, 0
            return
        self.matrix = scipy.sparse.load_npz(self.filename)
        self.nnz = self.matrix.nnz
        if self.nnz < 0:
            msg = f"Overflow in {self.filename}: nnz = {self.nnz}"
            raise RuntimeError(msg)
        self.nrow, self.ncol = self.matrix.shape
        return

    @function_timer
    def apply(self, map_in):
        nmap, npix = np.atleast_2d(map_in).shape
        npixtot = np.prod(map_in.shape)
        if npixtot != self.ncol:
            msg = f"Map is incompatible with the observation matrix. "
            msg += f"shape(matrix) = {self.matrix.shape}, shape(map) = {map_in.shape}"
            raise RuntimeError(msg)
        map_out = self.matrix.dot(map_in.ravel())
        if nmap != 1:
            map_out = map_out.reshape([nmap, -1])
        return map_out

    def sort_indices(self):
        self.matrix.sort_indices()

    @property
    def data(self):
        return self.matrix.data

    def __iadd__(self, other):
        if hasattr(other, "matrix"):
            self.matrix += other.matrix
        else:
            self.matrix += other
        return self

    def __imul__(self, other):
        if hasattr(other, "matrix"):
            self.matrix *= other.matrix
        else:
            self.matrix *= other
        return self
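
A minimal round-trip sketch, using an identity matrix written with scipy.sparse.save_npz as a stand-in for a real observation matrix:

import numpy as np
import scipy.sparse

from toast.ops.obsmat import ObsMat

# Identity stand-in: 3 maps x 12 pixels, flattened on both sides.
scipy.sparse.save_npz("obsmat.npz", scipy.sparse.identity(36, format="csr"))

obsmat = ObsMat("obsmat.npz")   # the constructor calls load()
map_in = np.random.default_rng(0).normal(size=(3, 12))
map_out = obsmat.apply(map_in)  # applied to the flattened map, reshaped back
print(map_out.shape)            # (3, 12)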

Observation

Bases: MutableMapping

Class representing the data for one observation.

An Observation stores information about data distribution across one or more MPI processes and is a container for four types of objects:

* Local detector data (unique to each process).
* Shared data that has one common copy for every node spanned by the
  observation.
* Intervals defining spans of data with some common characteristic.
* Other arbitrary small metadata.

Small metadata can be stored directly in the Observation using normal square bracket "[]" access to elements (an Observation is a dictionary). Groups of detector data (e.g. "signal", "flags", etc) can be accessed in the separate detector data dictionary (the "detdata" attribute). Shared data can be similarly stored in the "shared" attribute. Lists of intervals are accessed in the "intervals" attribute and data views can use any interval list to access subsets of detector and shared data.

Notes on distributed use with MPI

The detector data within an Observation is distributed among the processes in an MPI communicator. The processes in the communicator are arranged in a rectangular grid, with each process storing some number of detectors for a piece of time covered by the observation. The most common configuration (and the default) is to make this grid the size of the communicator in the "detector direction" and a size of one in the "sample direction":

MPI           det1  sample(0), sample(1), sample(2), ...., sample(N-1)
rank 0        det2  sample(0), sample(1), sample(2), ...., sample(N-1)
----------------------------------------------------------------------
MPI           det3  sample(0), sample(1), sample(2), ...., sample(N-1)
rank 1        det4  sample(0), sample(1), sample(2), ...., sample(N-1)

So each process has a subset of detectors for the whole span of the observation time. You can override this shape by setting the process_rows to something else. For example, process_rows=1 would result in this:

MPI rank 0                        |        MPI rank 1
----------------------------------+----------------------------
det1  sample(0), sample(1), ...,  |  ...., sample(N-1)
det2  sample(0), sample(1), ...,  |  ...., sample(N-1)
det3  sample(0), sample(1), ...,  |  ...., sample(N-1)
det4  sample(0), sample(1), ...,  |  ...., sample(N-1)

Parameters:

Name Type Description Default
comm Comm

The toast communicator containing information about the process group for this observation.

required
telescope Telescope

An instance of a Telescope object.

required
n_samples int

The total number of samples for this observation.

required
name str

(Optional) The observation name.

None
uid int

(Optional) The Unique ID for this observation. If not specified, the UID will be computed from a hash of the name.

None
session Session

The observing session that this observation is contained in or None.

None
detector_sets list

(Optional) List of lists containing detector names. These discrete detector sets are used to distribute detectors; a detector set will always be within a single row of the process grid. If None, every detector is a set of one.

None
sample_sets list

(Optional) List of lists of chunk sizes (integer numbers of samples). These discrete sample sets are used to distribute sample data. A sample set will always be within a single column of the process grid. If None, any distribution break in the sample direction will happen at an arbitrary place. The sum of all chunks must equal the total number of samples.

None
process_rows int

(Optional) The size of the rectangular process grid in the detector direction. This number must evenly divide into the size of comm. If not specified, defaults to the size of the communicator.

None
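
A serial construction sketch (no MPI), assuming the fake_hexagon_focalplane helper, a Telescope constructor that accepts focalplane and site keywords, and the create methods of the shared and detdata managers; names such as "scan_000" and "signal" are illustrative:

import numpy as np
import astropy.units as u

import toast
from toast.instrument import GroundSite, Telescope
from toast.instrument_sim import fake_hexagon_focalplane  # assumed demo helper

comm = toast.Comm()  # communicators are None when MPI is disabled

fp = fake_hexagon_focalplane(n_pix=7)
site = GroundSite(
    "TestSite", lat=-22.96 * u.degree, lon=-67.79 * u.degree, alt=5200.0 * u.meter
)
telescope = Telescope("TestTel", focalplane=fp, site=site)

obs = toast.Observation(comm, telescope, n_samples=1000, name="scan_000")

obs["weather_file"] = "atacama.h5"  # small metadata via dictionary access
# Assumed manager APIs for shared timestamps and per-detector signal:
obs.shared.create_column("times", shape=(obs.n_local_samples,), dtype=np.float64)
obs.detdata.create("signal", dtype=np.float64)

print(obs.local_detectors, obs.n_local_samples)
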
Source code in toast/observation.py
class Observation(MutableMapping):
    """Class representing the data for one observation.

    An Observation stores information about data distribution across one or more MPI
    processes and is a container for four types of objects:

        * Local detector data (unique to each process).
        * Shared data that has one common copy for every node spanned by the
          observation.
        * Intervals defining spans of data with some common characteristic.
        * Other arbitrary small metadata.

    Small metadata can be stored directly in the Observation using normal square
    bracket "[]" access to elements (an Observation is a dictionary).  Groups of
    detector data (e.g. "signal", "flags", etc) can be accessed in the separate
    detector data dictionary (the "detdata" attribute).  Shared data can be similarly
    stored in the "shared" attribute.  Lists of intervals are accessed in the
    "intervals" attribute and data views can use any interval list to access subsets
    of detector and shared data.

    **Notes on distributed use with MPI**

    The detector data within an Observation is distributed among the processes in an
    MPI communicator.  The processes in the communicator are arranged in a rectangular
    grid, with each process storing some number of detectors for a piece of time
    covered by the observation.  The most common configuration (and the default) is to
    make this grid the size of the communicator in the "detector direction" and a size
    of one in the "sample direction"::

        MPI           det1  sample(0), sample(1), sample(2), ...., sample(N-1)
        rank 0        det2  sample(0), sample(1), sample(2), ...., sample(N-1)
        ----------------------------------------------------------------------
        MPI           det3  sample(0), sample(1), sample(2), ...., sample(N-1)
        rank 1        det4  sample(0), sample(1), sample(2), ...., sample(N-1)

    So each process has a subset of detectors for the whole span of the observation
    time.  You can override this shape by setting the process_rows to something
    else.  For example, process_rows=1 would result in this::

        MPI rank 0                        |        MPI rank 1
        ----------------------------------+----------------------------
        det1  sample(0), sample(1), ...,  |  ...., sample(N-1)
        det2  sample(0), sample(1), ...,  |  ...., sample(N-1)
        det3  sample(0), sample(1), ...,  |  ...., sample(N-1)
        det4  sample(0), sample(1), ...,  |  ...., sample(N-1)


    Args:
        comm (toast.Comm):  The toast communicator containing information about the
            process group for this observation.
        telescope (Telescope):  An instance of a Telescope object.
        n_samples (int):  The total number of samples for this observation.
        name (str):  (Optional) The observation name.
        uid (int):  (Optional) The Unique ID for this observation.  If not specified,
            the UID will be computed from a hash of the name.
        session (Session):  The observing session that this observation is contained
            in or None.
        detector_sets (list):  (Optional) List of lists containing detector names.
            These discrete detector sets are used to distribute detectors; a detector
            set will always be within a single row of the process grid.  If None,
            every detector is a set of one.
        sample_sets (list):  (Optional) List of lists of chunk sizes (integer numbers of
            samples).  These discrete sample sets are used to distribute sample data.
            A sample set will always be within a single column of the process grid.  If
            None, any distribution break in the sample direction will happen at an
            arbitrary place.  The sum of all chunks must equal the total number of
            samples.
        process_rows (int):  (Optional) The size of the rectangular process grid
            in the detector direction.  This number must evenly divide into the size of
            comm.  If not specified, defaults to the size of the communicator.

    """

    view = ViewInterface()

    @function_timer
    def __init__(
        self,
        comm,
        telescope,
        n_samples,
        name=None,
        uid=None,
        session=None,
        detector_sets=None,
        sample_sets=None,
        process_rows=None,
    ):
        log = Logger.get()
        self._telescope = telescope
        self._name = name
        self._uid = uid
        self._session = session

        if self._uid is None and self._name is not None:
            self._uid = name_UID(self._name)

        if self._session is None:
            if self._name is not None:
                self._session = Session(
                    name=self._name,
                    uid=self._uid,
                    start=None,
                    end=None,
                )
        elif not isinstance(self._session, Session):
            raise RuntimeError("session should be a Session instance or None")

        self.dist = DistDetSamp(
            n_samples,
            self._telescope.focalplane.detectors,
            sample_sets,
            detector_sets,
            comm,
            process_rows,
        )

        # The internal metadata dictionary
        self._internal = dict()

        # Set up the data managers
        self.detdata = DetDataManager(self.dist)
        self.shared = SharedDataManager(self.dist)
        self.intervals = IntervalsManager(self.dist, n_samples)

        # Set up local per-detector cutting
        self._detflags = {x: int(0) for x in self.dist.dets[self.dist.comm.group_rank]}

    # Fully clear the observation

    def clear(self):
        self.view.clear()
        self.intervals.clear()
        self.detdata.clear()
        self.shared.clear()
        self._internal.clear()

    # General properties

    @property
    def telescope(self):
        """
        (Telescope):  The Telescope instance for this observation.
        """
        return self._telescope

    @property
    def name(self):
        """
        (str):  The name of the observation.
        """
        return self._name

    @property
    def uid(self):
        """
        (int):  The Unique ID for this observation.
        """
        return self._uid

    @property
    def session(self):
        """
        (Session):  The Session instance for this observation.
        """
        return self._session

    @property
    def comm(self):
        """
        (toast.Comm):  The overall communicator.
        """
        return self.dist.comm

    # The MPI communicator along the current row of the process grid

    @property
    def comm_row(self):
        """
        (mpi4py.MPI.Comm):  The communicator for processes in the same row (or None).
        """
        return self.dist.comm_row

    @property
    def comm_row_size(self):
        """
        (int): The number of processes in the row communicator.
        """
        return self.dist.comm_row_size

    @property
    def comm_row_rank(self):
        """
        (int): The rank of this process in the row communicator.
        """
        return self.dist.comm_row_rank

    # The MPI communicator along the current column of the process grid

    @property
    def comm_col(self):
        """
        (mpi4py.MPI.Comm):  The communicator for processes in the same column (or None).
        """
        return self.dist.comm_col

    @property
    def comm_col_size(self):
        """
        (int): The number of processes in the column communicator.
        """
        return self.dist.comm_col_size

    @property
    def comm_col_rank(self):
        """
        (int): The rank of this process in the column communicator.
        """
        return self.dist.comm_col_rank

    # Detector distribution

    @property
    def all_detectors(self):
        """
        (list): All detectors stored in this observation.
        """
        return self.dist.detectors

    @property
    def local_detectors(self):
        """
        (list): The detectors assigned to this process.
        """
        return self.dist.dets[self.dist.comm.group_rank]

    @property
    def local_detector_flags(self):
        """(dict): The local per-detector flags"""
        return self._detflags

    def update_local_detector_flags(self, vals):
        """Update the per-detector flagging.

        This does a bitwise OR with the existing flag values.

        Args:
            vals (dict):  The flag values for one or more detectors.

        Returns:
            None

        """
        ldets = set(self.local_detectors)
        for k, v in vals.items():
            if k not in ldets:
                msg = f"Cannot update per-detector flag for '{k}', which is"
                msg += " not a local detector"
                raise RuntimeError(msg)
            self._detflags[k] |= int(v)

    def set_local_detector_flags(self, vals):
        """Set the per-detector flagging.

        This resets the per-detector flags to the specified values.

        Args:
            vals (dict):  The flag values for one or more detectors.

        Returns:
            None

        """
        ldets = set(self.local_detectors)
        for k, v in vals.items():
            if k not in ldets:
                msg = f"Cannot set per-detector flag for '{k}', which is"
                msg += " not a local detector"
                raise RuntimeError(msg)
            self._detflags[k] = int(v)

    def select_local_detectors(
        self,
        selection=None,
        flagmask=0,
    ):
        """Get the local detectors assigned to this process.

        This takes the full list of local detectors and optionally prunes them
        by the specified selection and / or applies per-detector flags with
        the given mask.

        Args:
            selection (list):  Only return detectors in this set.
            flagmask (uint8):  Apply this mask to per-detector flags and only
                include detectors with a result of zero (good).

        Returns:
            (list):  The selected detectors.

        """
        if flagmask is None:
            good = set(self.local_detectors)
        else:
            good = set(
                [
                    x
                    for x in self.local_detectors
                    if (self.local_detector_flags[x] & flagmask) == 0
                ]
            )
        dets = list()
        if selection is None:
            for det in self.local_detectors:
                if det in good:
                    dets.append(det)
        else:
            sel_set = set(selection)
            for det in self.local_detectors:
                if (det in sel_set) and (det in good):
                    dets.append(det)
        # print(f"SELECT mask {int(flagmask)} {selection}: {dets}", flush=True)
        return dets

    # Detector set distribution

    @property
    def all_detector_sets(self):
        """
        (list):  The total list of detector sets for this observation.
        """
        return self.dist.detector_sets

    @property
    def local_detector_sets(self):
        """
        (list):  The detector sets assigned to this process (or None).
        """
        if self.dist.detector_sets is None:
            return None
        else:
            ds = list()
            for d in range(self.dist.det_sets[self.dist.comm.group_rank].n_elem):
                off = self.dist.det_sets[self.dist.comm.group_rank].offset
                ds.append(self.dist.detector_sets[off + d])
            return ds

    # Sample distribution

    @property
    def n_all_samples(self):
        """(int): the total number of samples in this observation."""
        return self.dist.samples

    @property
    def local_index_offset(self):
        """
        The first sample on this process, relative to the observation start.
        """
        return self.dist.samps[self.dist.comm.group_rank].offset

    @property
    def n_local_samples(self):
        """
        The number of local samples on this process.
        """
        return self.dist.samps[self.dist.comm.group_rank].n_elem

    # Sample set distribution

    @property
    def all_sample_sets(self):
        """
        (list):  The input full list of sample sets used in data distribution
        """
        return self.dist.sample_sets

    @property
    def local_sample_sets(self):
        """
        (list):  The sample sets assigned to this process (or None).
        """
        if self.dist.sample_sets is None:
            return None
        else:
            ss = list()
            for s in range(self.dist.samp_sets[self.dist.comm.group_rank].n_elem):
                off = self.dist.samp_sets[self.dist.comm.group_rank].offset
                ss.append(self.dist.sample_sets[off + s])
            return ss

    # Helper methods to check for the 2 most common cases, where data is
    # distributed either fully by detector or fully by sample.  Note that if there
    # is only one process then both conditions are true.

    @property
    def is_distributed_by_sample(self):
        if self.dist.comm_row_size == self.dist.comm.group_size:
            return True
        else:
            return False

    @property
    def is_distributed_by_detector(self):
        if self.dist.comm_col_size == self.dist.comm.group_size:
            return True
        else:
            return False

    # Mapping methods

    def __getitem__(self, key):
        return self._internal[key]

    def __delitem__(self, key):
        del self._internal[key]

    def __setitem__(self, key, value):
        self._internal[key] = value

    def __iter__(self):
        return iter(self._internal)

    def __len__(self):
        return len(self._internal)

    def __del__(self):
        if hasattr(self, "detdata"):
            self.detdata.clear()
        if hasattr(self, "shared"):
            self.shared.clear()

    def __repr__(self):
        val = "<Observation"
        val += f"\n  name = '{self.name}'"
        val += f"\n  uid = '{self.uid}'"
        if self.comm.comm_group is None:
            val += "  group has a single process (no MPI)"
        else:
            val += f"  group has {self.comm.group_size} processes"
        val += f"\n  telescope = {self._telescope.__repr__()}"
        val += f"\n  session = {self._session.__repr__()}"
        for k, v in self._internal.items():
            val += f"\n  {k} = {v}"
        val += f"\n  {self.n_all_samples} total samples ({self.n_local_samples} local)"
        val += f"\n  shared:  {self.shared}"
        val += f"\n  detdata:  {self.detdata}"
        val += f"\n  intervals:  {self.intervals}"
        val += "\n>"
        return val

    def __eq__(self, other):
        # Note that testing for equality is quite expensive, since it means testing all
        # metadata and also all detector, shared, and interval data.  This is mainly
        # used for unit tests.
        log = Logger.get()
        fail = 0
        if self.name != other.name:
            fail = 1
            log.verbose(
                f"Proc {self.comm.world_rank}:  Obs names {self.name} != {other.name}"
            )
        if self.uid != other.uid:
            fail = 1
            log.verbose(
                f"Proc {self.comm.world_rank}:  Obs uid {self.uid} != {other.uid}"
            )
        if self.telescope != other.telescope:
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs telescopes not equal")
        if self.session != other.session:
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs sessions not equal")
        if self.dist != other.dist:
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs distributions not equal")
        if set(self._internal.keys()) != set(other._internal.keys()):
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs metadata keys not equal")
        for k, v in self._internal.items():
            if v != other._internal[k]:
                feq = True
                try:
                    feq = np.allclose(v, other._internal[k])
                except Exception:
                    # Not floating point data
                    feq = False
                if not feq:
                    fail = 1
                    log.verbose(
                        f"Proc {self.comm.world_rank}:  Obs metadata[{k}]:  {v} != {other[k]}"
                    )
                    break
        if self.shared != other.shared:
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs shared data not equal")
        if self.detdata != other.detdata:
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs detdata not equal")
        if self.intervals != other.intervals:
            fail = 1
            log.verbose(f"Proc {self.comm.world_rank}:  Obs intervals not equal")
        if self.comm.comm_group is not None:
            fail = self.comm.comm_group.allreduce(fail, op=MPI.SUM)
        return fail == 0

    def __ne__(self, other):
        return not self.__eq__(other)

    def duplicate(
        self, times=None, meta=None, shared=None, detdata=None, intervals=None
    ):
        """Return a copy of the observation and all its data.

        The times field should be the name of the shared field containing timestamps.
        This is used when copying interval lists to the new observation so that these
        objects reference the timestamps within this observation (rather than the old
        one).  If this is not specified and some intervals exist, then an exception is
        raised.

        The meta, shared, detdata, and intervals list specifies which of those objects
        to copy to the new observation.  If these are None, then all objects are
        duplicated.

        Args:
            times (str):  The name of the timestamps shared field.
            meta (list):  List of metadata objects to copy, or None.
            shared (list):  List of shared objects to copy, or None.
            detdata (list):  List of detdata objects to copy, or None.
            intervals (list):  List of intervals objects to copy, or None.

        Returns:
            (Observation):  The new copy of the observation.

        """
        log = Logger.get()
        if times is None and len(self.intervals) > 0:
            msg = "You must specify the times field when duplicating observations "
            msg += "that have some intervals defined."
            log.error(msg)
            raise RuntimeError(msg)
        new_obs = Observation(
            self.dist.comm,
            self.telescope,
            self.n_all_samples,
            name=self.name,
            uid=self.uid,
            session=self.session,
            detector_sets=self.all_detector_sets,
            sample_sets=self.all_sample_sets,
            process_rows=self.dist.process_rows,
        )
        new_obs.set_local_detector_flags(self.local_detector_flags)
        for k, v in self._internal.items():
            if meta is None or k in meta:
                new_obs[k] = copy.deepcopy(v)
        for name, data in self.detdata.items():
            if detdata is None or name in detdata:
                new_obs.detdata[name] = data
        copy_shared = list()
        if times is not None:
            copy_shared.append(times)
        if shared is not None:
            copy_shared.extend(shared)
        for name, data in self.shared.items():
            if shared is None or name in copy_shared:
                # Create the object on the corresponding communicator in the new obs
                new_obs.shared.assign_mpishared(name, data, self.shared.comm_type(name))
        for name, data in self.intervals.items():
            if intervals is None or name in intervals:
                timespans = [(x.start, x.stop) for x in data]
                new_obs.intervals[name] = IntervalList(
                    new_obs.shared[times], timespans=timespans
                )
        return new_obs

    def memory_use(self):
        """Estimate the memory used by shared and detector data.

        This sums the memory used by the shared and detdata attributes and returns the
        total on all processes.  This function is blocking on the observation
        communicator.

        Returns:
            (int):  The number of bytes of memory used by timestream data.

        """
        # Get local memory from detector data
        local_mem = self.detdata.memory_use()

        # If there are many intervals, this could take up non-trivial space.  Add them
        # to the local total
        for iname, it in self.intervals.items():
            if len(it) > 0:
                local_mem += len(it) * interval_dtype.itemsize

        # Sum the aggregate local memory
        total = None
        if self.comm.comm_group is None:
            total = local_mem
        else:
            total = self.comm.comm_group.allreduce(local_mem, op=MPI.SUM)

        # The total shared memory use is already returned on every process by this
        # next function.
        total += self.shared.memory_use()
        return total

    # Redistribution

    @function_timer
    def redistribute(
        self,
        process_rows,
        times=None,
        override_sample_sets=False,
        override_detector_sets=False,
        return_global_intervals=False,
    ):
        """Take the currently allocated observation and redistribute in place.

        This changes the data distribution within the observation.  After
        re-assigning all detectors and samples, the currently allocated shared data
        objects and detector data objects are redistributed using the observation
        communicator.

        Args:
            process_rows (int):  The size of the new process grid in the detector
                direction.  This number must evenly divide into the size of the
                observation communicator.
            times (str):  The shared data field representing the timestamps.  This
                is used to recompute the intervals after redistribution.
            override_sample_sets (False, None or list):  If not False, override
                existing sample set boundaries in the redistributed data.
            override_detector_sets (False, None or list):  If not False, override
                existing detector set boundaries in the redistributed data.
            return_global_intervals (bool):  Return a list of global intervals for
                reference

        Returns:
            None or global_intervals

        """
        log = Logger.get()
        if process_rows == self.dist.process_rows:
            # Nothing to do!
            return

        if override_sample_sets == False:
            sample_sets = self.dist.sample_sets
        else:
            sample_sets = override_sample_sets

        if override_detector_sets == False:
            detector_sets = self.dist.detector_sets
        else:
            detector_sets = override_detector_sets

        # Get the total set of per-detector flags
        if self.comm_col_size == 1:
            all_det_flags = self.local_detector_flags
        else:
            pdflags = self.comm_col.gather(self.local_detector_flags, root=0)
            all_det_flags = None
            if self.comm_col_rank == 0:
                all_det_flags = dict()
                for pf in pdflags:
                    all_det_flags.update(pf)
            all_det_flags = self.comm_col.bcast(all_det_flags, root=0)

        # Create the new distribution
        new_dist = DistDetSamp(
            self.dist.samples,
            self._telescope.focalplane.detectors,
            sample_sets,
            detector_sets,
            self.dist.comm,
            process_rows,
        )

        # Do the actual redistribution
        new_shr_manager, new_det_manager, new_intervals_manager, global_intervals = (
            redistribute_data(
                self.dist,
                new_dist,
                self.shared,
                self.detdata,
                self.intervals,
                times=times,
                dbg=self.name,
            )
        )

        # Redistribute any metadata objects that support it.
        for k, v in self._internal.items():
            if hasattr(v, "redistribute"):
                v.redistribute(self.dist, new_dist)

        # Replace our distribution and data managers with the new ones.
        del self.dist
        self.dist = new_dist

        self.shared.clear()
        del self.shared
        self.shared = new_shr_manager

        self.detdata.clear()
        del self.detdata
        self.detdata = new_det_manager

        self.intervals.clear()
        del self.intervals
        self.intervals = new_intervals_manager

        # Restore detector flags for our new local detectors
        self._detflags = {x: int(0) for x in self.dist.dets[self.dist.comm.group_rank]}
        self.set_local_detector_flags(
            {x: all_det_flags[x] for x in self.local_detectors}
        )

        if return_global_intervals:
            global_intervals = self.dist.comm.comm_group.bcast(global_intervals)
            return global_intervals
        else:
            return

    # Accelerator use

    def accel_create(self, names):
        """Create a set of data objects on the device.

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        for key in names["detdata"]:
            self.detdata.accel_create(key)
        for key in names["shared"]:
            self.shared.accel_create(key)
        for key in names["intervals"]:
            self.intervals.accel_create(key)
        for key, val in self._internal.items():
            if isinstance(val, AcceleratorObject):
                if not val.accel_exists():
                    val.accel_create()

    def accel_update_device(self, names):
        """Copy data objects to the device.

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        for key in names["detdata"]:
            self.detdata.accel_update_device(key)
        for key in names["shared"]:
            self.shared.accel_update_device(key)
        for key in names["intervals"]:
            self.intervals.accel_update_device(key)
        for key, val in self._internal.items():
            if isinstance(val, AcceleratorObject):
                if not val.accel_in_use():
                    val.accel_update_device()

    def accel_update_host(self, names):
        """Copy data objects from the device.

        This takes a dictionary with the same format as those used by the Operator
        provides() and requires() methods.

        Args:
            names (dict):  Dictionary of lists.

        Returns:
            None

        """
        for key in names["detdata"]:
            self.detdata.accel_update_host(key)
        for key in names["shared"]:
            self.shared.accel_update_host(key)
        for key in names["intervals"]:
            self.intervals.accel_update_host(key)
        for key, val in self._internal.items():
            if isinstance(val, AcceleratorObject):
                if val.accel_in_use():
                    val.accel_update_host()

    def accel_clear(self):
        self.detdata.accel_clear()
        self.shared.accel_clear()
        self.intervals.accel_clear()
        for key, val in self._internal.items():
            if isinstance(val, AcceleratorObject):
                if val.accel_exists():
                    val.accel_delete()
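The names dictionary shared by accel_create(), accel_update_device(), and accel_update_host() maps each data category to a list of field names. A minimal sketch, where obs is an Observation and the field names "signal", "flags", "times", and "scanning" are hypothetical placeholders rather than fields guaranteed to exist:

names = {
    "detdata": ["signal", "flags"],
    "shared": ["times"],
    "intervals": ["scanning"],
}
obs.accel_create(names)         # allocate device copies
obs.accel_update_device(names)  # copy host to device
obs.accel_update_host(names)    # copy device back to host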

all_detector_sets property

(list): The total list of detector sets for this observation.

all_detectors property

(list): All detectors stored in this observation.

all_sample_sets property

(list): The full input list of sample sets used in data distribution.

comm property

(toast.Comm): The overall communicator.

comm_col property

(mpi4py.MPI.Comm): The communicator for processes in the same column (or None).

comm_col_rank property

(int): The rank of this process in the column communicator.

comm_col_size property

(int): The number of processes in the column communicator.

comm_row property

(mpi4py.MPI.Comm): The communicator for processes in the same row (or None).

comm_row_rank property

(int): The rank of this process in the row communicator.

comm_row_size property

(int): The number of processes in the row communicator.

local_detector_flags property

(dict): The local per-detector flags.

local_detector_sets property

(list): The detector sets assigned to this process (or None).

local_detectors property

(list): The detectors assigned to this process.

local_index_offset property

(int): The first sample on this process, relative to the observation start.

local_sample_sets property

(list): The sample sets assigned to this process (or None).

n_all_samples property

(int): the total number of samples in this observation.

n_local_samples property

(int): The number of local samples on this process.

name property

(str): The name of the observation.

session property

(Session): The Session instance for this observation.

telescope property

(Telescope): The Telescope instance for this observation.

uid property

(int): The Unique ID for this observation.

accel_create(names)

Create a set of data objects on the device.

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/observation.py
def accel_create(self, names):
    """Create a set of data objects on the device.

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    for key in names["detdata"]:
        self.detdata.accel_create(key)
    for key in names["shared"]:
        self.shared.accel_create(key)
    for key in names["intervals"]:
        self.intervals.accel_create(key)
    for key, val in self._internal.items():
        if isinstance(val, AcceleratorObject):
            if not val.accel_exists():
                val.accel_create()

accel_update_device(names)

Copy data objects to the device.

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/observation.py
def accel_update_device(self, names):
    """Copy data objects to the device.

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    for key in names["detdata"]:
        self.detdata.accel_update_device(key)
    for key in names["shared"]:
        self.shared.accel_update_device(key)
    for key in names["intervals"]:
        self.intervals.accel_update_device(key)
    for key, val in self._internal.items():
        if isinstance(val, AcceleratorObject):
            if not val.accel_in_use():
                val.accel_update_device()

accel_update_host(names)

Copy data objects from the device.

This takes a dictionary with the same format as those used by the Operator provides() and requires() methods.

Parameters:

Name Type Description Default
names dict

Dictionary of lists.

required

Returns:

Type Description

None

Source code in toast/observation.py
def accel_update_host(self, names):
    """Copy data objects from the device.

    This takes a dictionary with the same format as those used by the Operator
    provides() and requires() methods.

    Args:
        names (dict):  Dictionary of lists.

    Returns:
        None

    """
    for key in names["detdata"]:
        self.detdata.accel_update_host(key)
    for key in names["shared"]:
        self.shared.accel_update_host(key)
    for key in names["intervals"]:
        self.intervals.accel_update_host(key)
    for key, val in self._internal.items():
        if isinstance(val, AcceleratorObject):
            if val.accel_in_use():
                val.accel_update_host()

duplicate(times=None, meta=None, shared=None, detdata=None, intervals=None)

Return a copy of the observation and all its data.

The times field should be the name of the shared field containing timestamps. This is used when copying interval lists to the new observation so that those objects reference the timestamps within the new observation (rather than the old one). If this is not specified and some intervals exist, then an exception is raised.

The meta, shared, detdata, and intervals lists specify which of those objects to copy to the new observation. If these are None, then all objects are duplicated.

Parameters:

Name Type Description Default
times str

The name of the timestamps shared field.

None
meta list

List of metadata objects to copy, or None.

None
shared list

List of shared objects to copy, or None.

None
detdata list

List of detdata objects to copy, or None.

None
intervals list

List of intervals objects to copy, or None.

None

Returns:

Type Description
Observation

The new copy of the observation.

Source code in toast/observation.py
def duplicate(
    self, times=None, meta=None, shared=None, detdata=None, intervals=None
):
    """Return a copy of the observation and all its data.

    The times field should be the name of the shared field containing timestamps.
    This is used when copying interval lists to the new observation so that those
    objects reference the timestamps within the new observation (rather than the
    old one).  If this is not specified and some intervals exist, then an
    exception is raised.

    The meta, shared, detdata, and intervals lists specify which of those objects
    to copy to the new observation.  If these are None, then all objects are
    duplicated.

    Args:
        times (str):  The name of the timestamps shared field.
        meta (list):  List of metadata objects to copy, or None.
        shared (list):  List of shared objects to copy, or None.
        detdata (list):  List of detdata objects to copy, or None.
        intervals (list):  List of intervals objects to copy, or None.

    Returns:
        (Observation):  The new copy of the observation.

    """
    log = Logger.get()
    if times is None and len(self.intervals) > 0:
        msg = "You must specify the times field when duplicating observations "
        msg += "that have some intervals defined."
        log.error(msg)
        raise RuntimeError(msg)
    new_obs = Observation(
        self.dist.comm,
        self.telescope,
        self.n_all_samples,
        name=self.name,
        uid=self.uid,
        session=self.session,
        detector_sets=self.all_detector_sets,
        sample_sets=self.all_sample_sets,
        process_rows=self.dist.process_rows,
    )
    new_obs.set_local_detector_flags(self.local_detector_flags)
    for k, v in self._internal.items():
        if meta is None or k in meta:
            new_obs[k] = copy.deepcopy(v)
    for name, data in self.detdata.items():
        if detdata is None or name in detdata:
            new_obs.detdata[name] = data
    copy_shared = list()
    if times is not None:
        copy_shared.append(times)
    if shared is not None:
        copy_shared.extend(shared)
    for name, data in self.shared.items():
        if shared is None or name in copy_shared:
            # Create the object on the corresponding communicator in the new obs
            new_obs.shared.assign_mpishared(name, data, self.shared.comm_type(name))
    for name, data in self.intervals.items():
        if intervals is None or name in intervals:
            timespans = [(x.start, x.stop) for x in data]
            new_obs.intervals[name] = IntervalList(
                new_obs.shared[times], timespans=timespans
            )
    return new_obs
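A minimal usage sketch, assuming the observation stores timestamps in a shared field called "times" and has a detdata field "signal" (both names are placeholders):

new_obs = obs.duplicate(times="times", detdata=["signal"])
full_copy = obs.duplicate(times="times")  # all objects duplicated

Any fields named in shared are copied in addition to the times field itself.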

memory_use()

Estimate the memory used by shared and detector data.

This sums the memory used by the shared and detdata attributes and returns the total on all processes. This function is blocking on the observation communicator.

Returns:

Type Description
int

The number of bytes of memory used by timestream data.

Source code in toast/observation.py
def memory_use(self):
    """Estimate the memory used by shared and detector data.

    This sums the memory used by the shared and detdata attributes and returns the
    total on all processes.  This function is blocking on the observation
    communicator.

    Returns:
        (int):  The number of bytes of memory used by timestream data.

    """
    # Get local memory from detector data
    local_mem = self.detdata.memory_use()

    # If there are many intervals, this could take up non-trivial space.  Add them
    # to the local total
    for iname, it in self.intervals.items():
        if len(it) > 0:
            local_mem += len(it) * interval_dtype.itemsize

    # Sum the aggregate local memory
    total = None
    if self.comm.comm_group is None:
        total = local_mem
    else:
        total = self.comm.comm_group.allreduce(local_mem, op=MPI.SUM)

    # The total shared memory use is already returned on every process by this
    # next function.
    total += self.shared.memory_use()
    return total
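Because the total is reduced over the group communicator, every process receives the same value. A small sketch:

nbytes = obs.memory_use()  # blocking on the observation communicator
print(f"timestream data: {nbytes / 2**30:.2f} GB")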

redistribute(process_rows, times=None, override_sample_sets=False, override_detector_sets=False, return_global_intervals=False)

Take the currently allocated observation and redistribute in place.

This changes the data distribution within the observation. After re-assigning all detectors and samples, the currently allocated shared data objects and detector data objects are redistributed using the observation communicator.

Parameters:

Name Type Description Default
process_rows int

The size of the new process grid in the detector direction. This number must evenly divide into the size of the observation communicator.

required
times str

The shared data field representing the timestamps. This is used to recompute the intervals after redistribution.

None
override_sample_sets (False, None or list)

If not False, override existing sample set boundaries in the redistributed data.

False
override_detector_sets (False, None or list)

If not False, override existing detector set boundaries in the redistributed data.

False
return_global_intervals bool

Return a list of global intervals for reference

False

Returns:

Type Description

None or global_intervals

Source code in toast/observation.py
@function_timer
def redistribute(
    self,
    process_rows,
    times=None,
    override_sample_sets=False,
    override_detector_sets=False,
    return_global_intervals=False,
):
    """Take the currently allocated observation and redistribute in place.

    This changes the data distribution within the observation.  After
    re-assigning all detectors and samples, the currently allocated shared data
    objects and detector data objects are redistributed using the observation
    communicator.

    Args:
        process_rows (int):  The size of the new process grid in the detector
            direction.  This number must evenly divide into the size of the
            observation communicator.
        times (str):  The shared data field representing the timestamps.  This
            is used to recompute the intervals after redistribution.
        override_sample_sets (False, None or list):  If not False, override
            existing sample set boundaries in the redistributed data.
        override_detector_sets (False, None or list):  If not False, override
            existing detector set boundaries in the redistributed data.
        return_global_intervals (bool):  Return a list of global intervals for
            reference

    Returns:
        None or global_intervals

    """
    log = Logger.get()
    if process_rows == self.dist.process_rows:
        # Nothing to do!
        return

    if override_sample_sets == False:
        sample_sets = self.dist.sample_sets
    else:
        sample_sets = override_sample_sets

    if override_detector_sets == False:
        detector_sets = self.dist.detector_sets
    else:
        detector_sets = override_detector_sets

    # Get the total set of per-detector flags
    if self.comm_col_size == 1:
        all_det_flags = self.local_detector_flags
    else:
        pdflags = self.comm_col.gather(self.local_detector_flags, root=0)
        all_det_flags = None
        if self.comm_col_rank == 0:
            all_det_flags = dict()
            for pf in pdflags:
                all_det_flags.update(pf)
        all_det_flags = self.comm_col.bcast(all_det_flags, root=0)

    # Create the new distribution
    new_dist = DistDetSamp(
        self.dist.samples,
        self._telescope.focalplane.detectors,
        sample_sets,
        detector_sets,
        self.dist.comm,
        process_rows,
    )

    # Do the actual redistribution
    new_shr_manager, new_det_manager, new_intervals_manager, global_intervals = (
        redistribute_data(
            self.dist,
            new_dist,
            self.shared,
            self.detdata,
            self.intervals,
            times=times,
            dbg=self.name,
        )
    )

    # Redistribute any metadata objects that support it.
    for k, v in self._internal.items():
        if hasattr(v, "redistribute"):
            v.redistribute(self.dist, new_dist)

    # Replace our distribution and data managers with the new ones.
    del self.dist
    self.dist = new_dist

    self.shared.clear()
    del self.shared
    self.shared = new_shr_manager

    self.detdata.clear()
    del self.detdata
    self.detdata = new_det_manager

    self.intervals.clear()
    del self.intervals
    self.intervals = new_intervals_manager

    # Restore detector flags for our new local detectors
    self._detflags = {x: int(0) for x in self.dist.dets[self.dist.comm.group_rank]}
    self.set_local_detector_flags(
        {x: all_det_flags[x] for x in self.local_detectors}
    )

    if return_global_intervals:
        global_intervals = self.dist.comm.comm_group.bcast(global_intervals)
        return global_intervals
    else:
        return
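A usage sketch, again assuming a shared timestamp field named "times". Loosely, process_rows=1 gives every process all detectors for its slice of samples, while process_rows equal to the group size gives every process a subset of detectors with all samples:

obs.redistribute(1, times="times")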

select_local_detectors(selection=None, flagmask=0)

Get the local detectors assigned to this process.

This takes the full list of local detectors and optionally prunes them by the specified selection and / or applies per-detector flags with the given mask.

Parameters:

Name Type Description Default
selection list

Only return detectors in this set.

None
flagmask uint8

Apply this mask to per-detector flags and only include detectors with a result of zero (good).

0

Returns:

Type Description
list

The selected detectors.

Source code in toast/observation.py
def select_local_detectors(
    self,
    selection=None,
    flagmask=0,
):
    """Get the local detectors assigned to this process.

    This takes the full list of local detectors and optionally prunes them
    by the specified selection and / or applies per-detector flags with
    the given mask.

    Args:
        selection (list):  Only return detectors in this set.
        flagmask (uint8):  Apply this mask to per-detector flags and only
            include detectors with a result of zero (good).

    Returns:
        (list):  The selected detectors.

    """
    if flagmask is None:
        good = set(self.local_detectors)
    else:
        good = set(
            [
                x
                for x in self.local_detectors
                if (self.local_detector_flags[x] & flagmask) == 0
            ]
        )
    dets = list()
    if selection is None:
        for det in self.local_detectors:
            if det in good:
                dets.append(det)
    else:
        sel_set = set(selection)
        for det in self.local_detectors:
            if (det in sel_set) and (det in good):
                dets.append(det)
    # print(f"SELECT mask {int(flagmask)} {selection}: {dets}", flush=True)
    return dets
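A sketch of typical selections (the detector names are hypothetical):

dets = obs.select_local_detectors(flagmask=1)  # drop detectors with bit 0 set
dets = obs.select_local_detectors(selection=["D0A", "D0B"], flagmask=1)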

set_local_detector_flags(vals)

Set the per-detector flagging.

This resets the per-detector flags to the specified values.

Parameters:

Name Type Description Default
vals dict

The flag values for one or more detectors.

required

Returns:

Type Description

None

Source code in toast/observation.py
def set_local_detector_flags(self, vals):
    """Set the per-detector flagging.

    This resets the per-detector flags to the specified values.

    Args:
        vals (dict):  The flag values for one or more detectors.

    Returns:
        None

    """
    ldets = set(self.local_detectors)
    for k, v in vals.items():
        if k not in ldets:
            msg = f"Cannot set per-detector flag for '{k}', which is"
            msg += " not a local detector"
            raise RuntimeError(msg)
        self._detflags[k] = int(v)

update_local_detector_flags(vals)

Update the per-detector flagging.

This does a bitwise OR with the existing flag values.

Parameters:

Name Type Description Default
vals dict

The flag values for one or more detectors.

required

Returns:

Type Description

None

Source code in toast/observation.py
def update_local_detector_flags(self, vals):
    """Update the per-detector flagging.

    This does a bitwise OR with the existing flag values.

    Args:
        vals (dict):  The flag values for one or more detectors.

    Returns:
        None

    """
    ldets = set(self.local_detectors)
    for k, v in vals.items():
        if k not in ldets:
            msg = f"Cannot update per-detector flag for '{k}', which is"
            msg += " not a local detector"
            raise RuntimeError(msg)
        self._detflags[k] |= int(v)
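A sketch combining the two calls (the flag values are arbitrary example bitmasks):

obs.set_local_detector_flags({d: 0 for d in obs.local_detectors})  # reset all
obs.update_local_detector_flags({obs.local_detectors[0]: 2})       # OR in bit 1

Both calls raise RuntimeError if any key is not a local detector.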

PixelData

Bases: AcceleratorObject

Distributed map-domain data.

The distribution information is stored in a PixelDistribution instance passed to the constructor. Each process has local data stored in one or more "submaps".

Although multiple processes may have the same submap of data stored locally, only one process is considered the "owner". This ownership is used when serializing the data and when doing reductions in certain cases. Ownership can be set to either the lowest rank process which has the submap or to a balanced distribution.

Parameters:

Name Type Description Default
dist PixelDistribution

The distribution of submaps.

required
dtype dtype

A numpy-compatible dtype for each element of the data. The supported types are 1, 2, 4, and 8 byte signed and unsigned integers and 4 and 8 byte floating point numbers; complex values are not yet supported.

required
n_value int

The number of values per pixel.

1
units Unit

The units of the map data.

dimensionless_unscaled
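A minimal construction sketch. It assumes the PixelDistribution constructor accepts n_pix, n_submap, local_submaps, and comm arguments; consult the PixelDistribution documentation for the actual signature.

import numpy as np
from astropy import units as u

dist = PixelDistribution(
    n_pix=12 * 64**2,                # e.g. a HEALPix NSIDE=64 map
    n_submap=16,
    local_submaps=np.array([0, 3]),  # submaps hit by this process
    comm=None,                       # serial case
)
pix = PixelData(dist, np.float64, n_value=3, units=u.K)  # e.g. an IQU map
pix.data[:] = 0.0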
Source code in toast/pixels.py
class PixelData(AcceleratorObject):
    """Distributed map-domain data.

    The distribution information is stored in a PixelDistribution instance passed to
    the constructor.  Each process has local data stored in one or more "submaps".

    Although multiple processes may have the same submap of data stored locally, only
    one process is considered the "owner".  This ownership is used when serializing the
    data and when doing reductions in certain cases.  Ownership can be set to either
    the lowest rank process which has the submap or to a balanced distribution.

    Args:
        dist (PixelDistribution):  The distribution of submaps.
        dtype (numpy.dtype):  A numpy-compatible dtype for each element of the data.
            The supported types are 1, 2, 4, and 8 byte signed and unsigned
            integers and 4 and 8 byte floating point numbers.  Complex values
            are not yet supported.
        n_value (int):  The number of values per pixel.
        units (Unit):  The units of the map data.

    """

    def __init__(self, dist, dtype, n_value=1, units=u.dimensionless_unscaled):
        super().__init__()
        log = Logger.get()

        self._dist = dist
        self._n_value = n_value
        self._units = units

        # construct a new dtype in case the given parameter is a shortcut string
        ttype = np.dtype(dtype)

        self.storage_class = None
        if ttype.char == "b":
            self.storage_class = AlignedI8
        elif ttype.char == "B":
            self.storage_class = AlignedU8
        elif ttype.char == "h":
            self.storage_class = AlignedI16
        elif ttype.char == "H":
            self.storage_class = AlignedU16
        elif ttype.char == "i":
            self.storage_class = AlignedI32
        elif ttype.char == "I":
            self.storage_class = AlignedU32
        elif (ttype.char == "q") or (ttype.char == "l"):
            self.storage_class = AlignedI64
        elif (ttype.char == "Q") or (ttype.char == "L"):
            self.storage_class = AlignedU64
        elif ttype.char == "f":
            self.storage_class = AlignedF32
        elif ttype.char == "d":
            self.storage_class = AlignedF64
        elif ttype.char == "F":
            raise NotImplementedError("No support yet for complex numbers")
        elif ttype.char == "D":
            raise NotImplementedError("No support yet for complex numbers")
        else:
            msg = "Unsupported data typecode '{}'".format(ttype.char)
            log.error(msg)
            raise ValueError(msg)
        self._dtype = ttype

        self.mpitype = None
        self.mpibytesize = None
        if self._dist.comm is not None:
            self.mpibytesize, self.mpitype = mpi_data_type(self._dist.comm, self._dtype)

        self._shape = (
            self._dist.n_local_submap,
            self._dist.n_pix_submap,
            self._n_value,
        )
        self._flatshape = (
            self._dist.n_local_submap * self._dist.n_pix_submap * self._n_value
        )
        self._n_submap_value = self._dist.n_pix_submap * self._n_value

        self.raw = self.storage_class.zeros(self._flatshape)
        self.data = self.raw.array().reshape(self._shape)

        # Allreduce quantities
        self._all_comm_submap = None
        self._all_send = None
        self._all_send_raw = None
        self._all_recv = None
        self._all_recv_raw = None

        # Alltoallv quantities
        self._send_counts = None
        self._send_displ = None
        self._recv_counts = None
        self._recv_displ = None
        self._recv_locations = None
        self.receive = None
        self._receive_raw = None
        self.reduce_buf = None
        self._reduce_buf_raw = None

    def clear(self):
        """Delete the underlying memory.

        This will forcibly delete the C-allocated memory and invalidate all python
        references to this object.  DO NOT CALL THIS unless you are sure all references
        are no longer being used and you are about to delete the object.

        """
        if hasattr(self, "data"):
            # we keep the attribute to avoid errors in _accel_exists
            self.data = None
        if hasattr(self, "raw"):
            if self.accel_exists():
                self.accel_delete()
            if self.raw is not None:
                self.raw.clear()
            del self.raw
        if hasattr(self, "receive"):
            del self.receive
            if self._receive_raw is not None:
                self._receive_raw.clear()
            del self._receive_raw
        if hasattr(self, "reduce_buf"):
            del self.reduce_buf
            if self._reduce_buf_raw is not None:
                self._reduce_buf_raw.clear()
            del self._reduce_buf_raw
        if hasattr(self, "_all_send"):
            del self._all_send
            if self._all_send_raw is not None:
                self._all_send_raw.clear()
            del self._all_send_raw
        if hasattr(self, "_all_recv"):
            del self._all_recv
            if self._all_recv_raw is not None:
                self._all_recv_raw.clear()
            del self._all_recv_raw

    def __del__(self):
        self.clear()

    def reset(self):
        """Set memory to zero"""
        self.raw[:] = 0
        if self.accel_exists():
            self.accel_reset()

    @property
    def distribution(self):
        """(PixelDistribution): The distribution information."""
        return self._dist

    @property
    def dtype(self):
        """(numpy.dtype): The data type of the values."""
        return self._dtype

    @property
    def n_value(self):
        """(int): The number of non-zero values per pixel."""
        return self._n_value

    @property
    def units(self):
        """(Unit):  The map data units."""
        return self._units

    def update_units(self, new_units):
        """Update the units associated with the data."""
        self._units = new_units

    def __getitem__(self, key):
        return np.array(self.data[key], dtype=self._dtype, copy=False)

    def __delitem__(self, key):
        raise NotImplementedError("Cannot delete individual memory elements")
        return

    def __setitem__(self, key, value):
        self.data[key] = value

    def __iter__(self):
        return iter(self.data)

    def __len__(self):
        return len(self.data)

    def __repr__(self):
        val = (
            "<PixelData {} values per pixel, dtype = {}, units= {}, dist = {}>".format(
                self._n_value, self._dtype, self._units, self._dist
            )
        )
        return val

    def __eq__(self, other):
        if self.distribution != other.distribution:
            return False
        if self.dtype.char != other.dtype.char:
            return False
        if self.n_value != other.n_value:
            return False
        if not np.allclose(self.data, other.data):
            return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def duplicate(self):
        """Create a copy of the data with the same distribution.

        Returns:
            (PixelData):  A duplicate of the instance with copied data but the same
                distribution.

        """
        dup = PixelData(
            self.distribution, self.dtype, n_value=self.n_value, units=self._units
        )
        dup.data[:] = self.data
        return dup

    def comm_nsubmap(self, bytes):
        """Given a buffer size, compute the number of submaps to communicate.

        Args:
            bytes (int):  The number of bytes.

        Returns:
            (int):  The number of submaps in each buffer.

        """
        dbytes = self._dtype.itemsize
        nsub = int(bytes / (dbytes * self._n_submap_value))
        if nsub == 0:
            nsub = 1
        allsub = int(self._dist.n_pix / self._dist.n_pix_submap)
        if nsub > allsub:
            nsub = allsub
        return nsub

    @function_timer
    def setup_allreduce(self, n_submap):
        """Check that allreduce buffers exist and create them if needed."""
        # Allocate persistent send / recv buffers that only change if number of submaps
        # change.
        if self._all_comm_submap is not None:
            # We have already done a reduce...
            if n_submap == self._all_comm_submap:
                # Already allocated with correct size
                return
            else:
                # Delete the old buffers.
                del self._all_send
                self._all_send_raw.clear()
                del self._all_send_raw
                del self._all_recv
                self._all_recv_raw.clear()
                del self._all_recv_raw
        # Allocate with the new size
        self._all_comm_submap = n_submap
        self._all_recv_raw = self.storage_class.zeros(n_submap * self._n_submap_value)
        self._all_recv = self._all_recv_raw.array()
        self._all_send_raw = self.storage_class.zeros(n_submap * self._n_submap_value)
        self._all_send = self._all_send_raw.array()

    @function_timer
    def sync_allreduce(self, comm_bytes=10000000):
        """Perform a buffered allreduce of the data.

        Args:
            comm_bytes (int): The approximate message size to use.

        Returns:
            None.

        """
        if self.accel_in_use():
            msg = f"PixelData {self._accel_name} currently on accelerator"
            msg += " cannot do MPI communication"
            raise RuntimeError(msg)

        if self._dist.comm is None:
            return

        comm_submap = self.comm_nsubmap(comm_bytes)
        self.setup_allreduce(comm_submap)

        dist = self._dist
        nsub = dist.n_submap

        sendview = self._all_send.reshape(
            (comm_submap, dist.n_pix_submap, self._n_value)
        )

        recvview = self._all_recv.reshape(
            (comm_submap, dist.n_pix_submap, self._n_value)
        )

        owners = dist.submap_owners

        submap_off = 0
        ncomm = comm_submap

        gt = GlobalTimers.get()

        while submap_off < nsub:
            if submap_off + ncomm > nsub:
                ncomm = nsub - submap_off
            if np.sum(owners[submap_off : submap_off + ncomm]) != -ncomm:
                # At least one submap has some hits.  Do the allreduce.
                # Otherwise we would skip this buffer to avoid reducing a
                # bunch of zeros.
                for c in range(ncomm):
                    glob = submap_off + c
                    if glob in dist.local_submaps:
                        # copy our data in.
                        loc = dist.global_submap_to_local[glob]
                        sendview[c, :, :] = self.data[loc, :, :]

                gt.start("PixelData.sync_allreduce MPI Allreduce")
                dist.comm.Allreduce(self._all_send, self._all_recv, op=MPI.SUM)
                gt.stop("PixelData.sync_allreduce MPI Allreduce")

                for c in range(ncomm):
                    glob = submap_off + c
                    if glob in dist.local_submaps:
                        # copy the reduced data
                        loc = dist.global_submap_to_local[glob]
                        self.data[loc, :, :] = recvview[c, :, :]

                self._all_send.fill(0)
                self._all_recv.fill(0)

            submap_off += ncomm

        return

    @staticmethod
    def local_reduction(n_submap_value, receive_locations, receive, reduce_buf):
        # Locally reduce owned submaps
        for sm, locs in receive_locations.items():
            reduce_buf[:] = 0
            for lc in locs:
                reduce_buf += receive[lc : lc + n_submap_value]
            for lc in locs:
                receive[lc : lc + n_submap_value] = reduce_buf

    @function_timer
    def setup_alltoallv(self):
        """Check that alltoallv buffers exist and create them if needed."""
        if self._send_counts is None:
            if self.accel_in_use():
                msg = f"PixelData {self._accel_name} currently on accelerator"
                msg += " cannot do MPI communication"
                raise RuntimeError(msg)
            log = Logger.get()
            # Get the parameters in terms of submaps.
            (
                send_counts,
                send_displ,
                recv_counts,
                recv_displ,
                recv_locations,
            ) = self._dist.alltoallv_info

            # Pixel values per submap
            scale = self._n_submap_value

            # Scale these quantities by the submap size and the number of values per
            # pixel.

            self._send_counts = scale * np.array(send_counts, dtype=np.int32)
            self._send_displ = scale * np.array(send_displ, dtype=np.int32)
            self._recv_counts = scale * np.array(recv_counts, dtype=np.int32)
            self._recv_displ = scale * np.array(recv_displ, dtype=np.int32)
            self._recv_locations = dict()
            for sm, locs in recv_locations.items():
                self._recv_locations[sm] = scale * np.array(locs, dtype=np.int32)

            msg_rank = 0
            if self._dist.comm is not None:
                msg_rank = self._dist.comm.rank
            msg = f"setup_alltoallv[{msg_rank}]:\n"
            msg += f"  send_counts={self._send_counts} "
            msg += f"send_displ={self._send_displ}\n"
            msg += f"  recv_counts={self._recv_counts} "
            msg += f"recv_displ={self._recv_displ} "
            msg += f"recv_locations={self._recv_locations}"
            log.verbose(msg)

            # Allocate a persistent single-submap buffer
            self._reduce_buf_raw = self.storage_class.zeros(self._n_submap_value)
            self.reduce_buf = self._reduce_buf_raw.array()

            buf_check_fail = False
            try:
                if self._dist.comm is None:
                    # For this case, point the receive member to the original data.
                    # This will allow codes processing locally owned submaps to work
                    # transparently in the serial case.
                    self.receive = self.data.reshape((-1,))
                else:
                    # Check that our send and receive buffers do not exceed 32bit
                    # indices required by MPI
                    max_int = 2147483647
                    recv_buf_size = self._recv_displ[-1] + self._recv_counts[-1]
                    if recv_buf_size > max_int:
                        msg = "Proc {} Alltoallv receive buffer size exceeds max 32bit integer".format(
                            self._dist.comm.rank
                        )
                        log.error(msg)
                        buf_check_fail = True
                    if len(self.raw) > max_int:
                        msg = "Proc {} Alltoallv send buffer size exceeds max 32bit integer".format(
                            self._dist.comm.rank
                        )
                        log.error(msg)
                        buf_check_fail = True

                    # Allocate a persistent receive buffer
                    msg = f"{msg_rank}:  allocate receive buffer of "
                    msg += f"{recv_buf_size} elements"
                    log.verbose(msg)
                    self._receive_raw = self.storage_class.zeros(recv_buf_size)
                    self.receive = self._receive_raw.array()
            except:
                buf_check_fail = True
            if self._dist.comm is not None:
                buf_check_fail = self._dist.comm.allreduce(buf_check_fail, op=MPI.LOR)
            if buf_check_fail:
                msg = "alltoallv buffer setup failed on one or more processes"
                raise RuntimeError(msg)

    @function_timer
    def forward_alltoallv(self):
        """Communicate submaps into buffers on the owning process.

        On the first call, some initialization is done to compute send and receive
        displacements and counts.  A persistent receive buffer is allocated.  Submap
        data is sent to their owners simultaneously using alltoallv.

        Returns:
            None.

        """
        if self.accel_in_use():
            msg = f"PixelData {self._accel_name} currently on accelerator"
            msg += " cannot do MPI communication"
            raise RuntimeError(msg)

        log = Logger.get()
        gt = GlobalTimers.get()
        self.setup_alltoallv()

        if self._dist.comm is None:
            # No communication needed
            return

        # Gather owned submaps locally
        gt.start("PixelData.forward_alltoallv MPI Alltoallv")
        self._dist.comm.Alltoallv(
            [self.raw, self._send_counts, self._send_displ, self.mpitype],
            [self.receive, self._recv_counts, self._recv_displ, self.mpitype],
        )
        gt.stop("PixelData.forward_alltoallv MPI Alltoallv")
        return

    @function_timer
    def reverse_alltoallv(self):
        """Communicate submaps from the owning process back to all processes.

        Returns:
            None.

        """
        if self.accel_in_use():
            msg = f"PixelData {self._accel_name} currently on accelerator"
            msg += " cannot do MPI communication"
            raise RuntimeError(msg)

        gt = GlobalTimers.get()
        if self._dist.comm is None:
            # No communication needed
            return
        if self._send_counts is None:
            raise RuntimeError(
                "Cannot do reverse alltoallv before buffers have been setup"
            )

        # Scatter result back
        gt.start("PixelData.reverse_alltoallv MPI Alltoallv")
        self._dist.comm.Alltoallv(
            [self.receive, self._recv_counts, self._recv_displ, self.mpitype],
            [self.raw, self._send_counts, self._send_displ, self.mpitype],
        )
        gt.stop("PixelData.reverse_alltoallv MPI Alltoallv")
        return

    @function_timer
    def sync_alltoallv(self, local_func=None):
        """Perform operations on locally owned submaps using Alltoallv communication.

        On the first call, some initialization is done to compute send and receive
        displacements and counts.  A persistent receive buffer is allocated.  Submap
        data is sent to their owners simultaneously using alltoallv.  Each process does
        a local operation on their owned submaps before sending the result back with
        another alltoallv call.

        Args:
            local_func (function):  A function for processing the local submap data.

        Returns:
            None.

        """
        self.forward_alltoallv()

        if local_func is None:
            local_func = self.local_reduction

        # Run operation on locally owned submaps
        local_func(
            self._n_submap_value, self._recv_locations, self.receive, self.reduce_buf
        )

        self.reverse_alltoallv()
        return

    @function_timer
    def stats(self, comm_bytes=10000000):
        """Compute some simple statistics of the pixel data.

        The map should already be consistent across all processes with overlapping
        submaps.

        Args:
            comm_bytes (int): The approximate message size to use.

        Returns:
            (dict):  The computed properties on rank zero, None on other ranks.

        """
        if self.accel_in_use():
            msg = f"PixelData {self._accel_name} currently on accelerator"
            msg += " cannot do MPI communication"
            raise RuntimeError(msg)
        dist = self._dist
        nsub = dist.n_submap

        if dist.comm is None:
            return {
                "sum": [np.sum(self.data[:, :, x]) for x in range(self._n_value)],
                "mean": [np.mean(self.data[:, :, x]) for x in range(self._n_value)],
                "rms": [np.std(self.data[:, :, x]) for x in range(self._n_value)],
            }

        # The lowest rank with a locally-hit submap will contribute to the reduction
        local_hit_submaps = dist.comm.size * np.ones(nsub, dtype=np.int32)
        local_hit_submaps[dist.local_submaps] = dist.comm.rank
        hit_submaps = np.zeros_like(local_hit_submaps)
        dist.comm.Allreduce(local_hit_submaps, hit_submaps, op=MPI.MIN)
        del local_hit_submaps

        comm_submap = self.comm_nsubmap(comm_bytes)

        send_buf = np.zeros(
            comm_submap * dist.n_pix_submap * self._n_value,
            dtype=self._dtype,
        ).reshape((comm_submap, dist.n_pix_submap, self._n_value))

        recv_buf = None
        if dist.comm.rank == 0:
            # Alloc receive buffer
            recv_buf = np.zeros(
                comm_submap * dist.n_pix_submap * self._n_value,
                dtype=self._dtype,
            ).reshape((comm_submap, dist.n_pix_submap, self._n_value))
            # Variables for variance calc
            accum_sum = np.zeros(self._n_value, dtype=np.float64)
            accum_count = np.zeros(self._n_value, dtype=np.int64)
            accum_mean = np.zeros(self._n_value, dtype=np.float64)
            accum_var = np.zeros(self._n_value, dtype=np.float64)

        # Doing a two-pass variance calculation is faster than looping
        # over individual samples in python.

        submap_off = 0
        ncomm = comm_submap

        while submap_off < nsub:
            if submap_off + ncomm > nsub:
                ncomm = nsub - submap_off
            send_buf[:, :, :] = 0
            for sm in range(ncomm):
                abs_sm = submap_off + sm
                if hit_submaps[abs_sm] == dist.comm.rank:
                    # Contribute
                    loc = dist.global_submap_to_local[abs_sm]
                    send_buf[sm, :, :] = self.data[loc, :, :]

            dist.comm.Reduce(send_buf, recv_buf, op=MPI.SUM, root=0)

            if dist.comm.rank == 0:
                for sm in range(ncomm):
                    for v in range(self._n_value):
                        accum_sum[v] += np.sum(recv_buf[sm, :, v])
                        accum_count[v] += dist.n_pix_submap
            dist.comm.barrier()
            submap_off += ncomm

        if dist.comm.rank == 0:
            for v in range(self._n_value):
                accum_mean[v] = accum_sum[v] / accum_count[v]

        submap_off = 0
        ncomm = comm_submap

        while submap_off < nsub:
            if submap_off + ncomm > nsub:
                ncomm = nsub - submap_off
            send_buf[:, :, :] = 0
            for sm in range(ncomm):
                abs_sm = submap_off + sm
                if hit_submaps[abs_sm] == dist.comm.rank:
                    # Contribute
                    loc = dist.global_submap_to_local[abs_sm]
                    send_buf[sm, :, :] = self.data[loc, :, :]

            dist.comm.Reduce(send_buf, recv_buf, op=MPI.SUM, root=0)

            if dist.comm.rank == 0:
                for sm in range(ncomm):
                    for v in range(self._n_value):
                        accum_var[v] += np.sum(
                            (recv_buf[sm, :, v] - accum_mean[v]) ** 2
                        )
            dist.comm.barrier()
            submap_off += ncomm

        if dist.comm.rank == 0:
            return {
                "sum": [float(accum_sum[x]) for x in range(self._n_value)],
                "mean": [float(accum_mean[x]) for x in range(self._n_value)],
                "rms": [
                    np.sqrt(accum_var[x] / (accum_count[x] - 1))
                    for x in range(self._n_value)
                ],
            }
        else:
            return None

    @function_timer
    def broadcast_map(self, fdata, comm_bytes=10000000):
        """Distribute a map located on a single process.

        The root process takes a map in memory and distributes it.   Chunks of submaps
        are broadcast to all processes, and each process copies data to its local
        submaps.

        Args:
            fdata (array): The input data (only significant on process 0).
            comm_bytes (int): The approximate message size to use.

        Returns:
            None

        """
        if self.accel_in_use():
            msg = f"PixelData {self._accel_name} currently on accelerator"
            msg += " cannot do MPI communication"
            raise RuntimeError(msg)
        rank = 0
        if self._dist.comm is not None:
            rank = self._dist.comm.rank
        comm_submap = self.comm_nsubmap(comm_bytes)

        # we make the assumption that FITS binary tables are still stored in
        # blocks of 2880 bytes just like always...
        dbytes = self._dtype.itemsize
        rowbytes = self._n_value * dbytes
        optrows = int(2880 / rowbytes)

        # Get a tuple of all columns in the table.  We choose memmap here so
        # that we can (hopefully) read through all columns in chunks such that
        # we only ever have a couple FITS blocks in memory.
        if rank == 0:
            if self._n_value == 1:
                fdata = (fdata,)

        buf = np.zeros(
            comm_submap * self._dist.n_pix_submap * self._n_value, dtype=self._dtype
        )
        view = buf.reshape((comm_submap, self._dist.n_pix_submap, self._n_value))

        in_off = 0
        out_off = 0
        submap_off = 0

        rows = optrows
        while in_off < self._dist.n_pix:
            if in_off + rows > self._dist.n_pix:
                rows = self._dist.n_pix - in_off
            # is this the last block for this communication?
            islast = False
            copyrows = rows
            if out_off + rows > (comm_submap * self._dist.n_pix_submap):
                copyrows = (comm_submap * self._dist.n_pix_submap) - out_off
                islast = True

            if rank == 0:
                for col in range(self._n_value):
                    coloff = (out_off * self._n_value) + col
                    buf[
                        coloff : coloff + (copyrows * self._n_value) : self._n_value
                    ] = fdata[col][in_off : in_off + copyrows]

            out_off += copyrows
            in_off += copyrows

            if islast:
                if self._dist.comm is not None:
                    self._dist.comm.Bcast(buf, root=0)
                # loop over these submaps, and copy any that we are assigned
                for sm in range(submap_off, submap_off + comm_submap):
                    if sm in self._dist.local_submaps:
                        loc = self._dist.global_submap_to_local[sm]
                        self.data[loc, :, :] = view[sm - submap_off, :, :]
                out_off = 0
                submap_off += comm_submap
                buf.fill(0)
                islast = False

        # flush the remaining buffer

        if out_off > 0:
            if self._dist.comm is not None:
                self._dist.comm.Bcast(buf, root=0)
            # loop over these submaps, and copy any that we are assigned
            for sm in range(submap_off, submap_off + comm_submap):
                if sm in self._dist.local_submaps:
                    loc = self._dist.global_submap_to_local[sm]
                    self.data[loc, :, :] = view[sm - submap_off, :, :]
        return

    def _accel_exists(self):
        if use_accel_omp:
            return accel_data_present(self.raw, self._accel_name)
        elif use_accel_jax:
            return accel_data_present(self.data)
        else:
            return False

    def _accel_create(self, zero_out=False):
        if use_accel_omp:
            self.raw = accel_data_create(self.raw, self._accel_name, zero_out=zero_out)
        elif use_accel_jax:
            self.data = accel_data_create(self.data, zero_out=zero_out)

    def _accel_update_device(self):
        if use_accel_omp:
            self.raw = accel_data_update_device(self.raw, self._accel_name)
        elif use_accel_jax:
            self.data = accel_data_update_device(self.data)

    def _accel_update_host(self):
        if use_accel_omp:
            self.raw = accel_data_update_host(self.raw, self._accel_name)
        elif use_accel_jax:
            self.data = accel_data_update_host(self.data)

    def _accel_reset(self):
        if use_accel_omp:
            accel_data_reset(self.raw, self._accel_name)
        elif use_accel_jax:
            accel_data_reset(self.data)

    def _accel_delete(self):
        if use_accel_omp:
            self.raw = accel_data_delete(self.raw, self._accel_name)
        elif use_accel_jax:
            self.data = accel_data_delete(self.data)

distribution property

(PixelDistribution): The distribution information.

dtype property

(numpy.dtype): The data type of the values.

n_value property

(int): The number of non-zero values per pixel.

units property

(Unit): The map data units.

broadcast_map(fdata, comm_bytes=10000000)

Distribute a map located on a single process.

The root process takes a map in memory and distributes it. Chunks of submaps are broadcast to all processes, and each process copies data to its local submaps.

Parameters:

Name Type Description Default
fdata array

The input data (only significant on process 0).

required
comm_bytes int

The approximate message size to use.

10000000

Returns:

Type Description

None

Source code in toast/pixels.py
@function_timer
def broadcast_map(self, fdata, comm_bytes=10000000):
    """Distribute a map located on a single process.

    The root process takes a map in memory and distributes it.   Chunks of submaps
    are broadcast to all processes, and each process copies data to its local
    submaps.

    Args:
        fdata (array): The input data (only significant on process 0).
        comm_bytes (int): The approximate message size to use.

    Returns:
        None

    """
    if self.accel_in_use():
        msg = f"PixelData {self._accel_name} currently on accelerator"
        msg += " cannot do MPI communication"
        raise RuntimeError(msg)
    rank = 0
    if self._dist.comm is not None:
        rank = self._dist.comm.rank
    comm_submap = self.comm_nsubmap(comm_bytes)

    # we make the assumption that FITS binary tables are still stored in
    # blocks of 2880 bytes just like always...
    dbytes = self._dtype.itemsize
    rowbytes = self._n_value * dbytes
    optrows = int(2880 / rowbytes)

    # Get a tuple of all columns in the table.  We choose memmap here so
    # that we can (hopefully) read through all columns in chunks such that
    # we only ever have a couple FITS blocks in memory.
    if rank == 0:
        if self._n_value == 1:
            fdata = (fdata,)

    buf = np.zeros(
        comm_submap * self._dist.n_pix_submap * self._n_value, dtype=self._dtype
    )
    view = buf.reshape((comm_submap, self._dist.n_pix_submap, self._n_value))

    in_off = 0
    out_off = 0
    submap_off = 0

    rows = optrows
    while in_off < self._dist.n_pix:
        if in_off + rows > self._dist.n_pix:
            rows = self._dist.n_pix - in_off
        # is this the last block for this communication?
        islast = False
        copyrows = rows
        if out_off + rows > (comm_submap * self._dist.n_pix_submap):
            copyrows = (comm_submap * self._dist.n_pix_submap) - out_off
            islast = True

        if rank == 0:
            for col in range(self._n_value):
                coloff = (out_off * self._n_value) + col
                buf[
                    coloff : coloff + (copyrows * self._n_value) : self._n_value
                ] = fdata[col][in_off : in_off + copyrows]

        out_off += copyrows
        in_off += copyrows

        if islast:
            if self._dist.comm is not None:
                self._dist.comm.Bcast(buf, root=0)
            # loop over these submaps, and copy any that we are assigned
            for sm in range(submap_off, submap_off + comm_submap):
                if sm in self._dist.local_submaps:
                    loc = self._dist.global_submap_to_local[sm]
                    self.data[loc, :, :] = view[sm - submap_off, :, :]
            out_off = 0
            submap_off += comm_submap
            buf.fill(0)
            islast = False

    # flush the remaining buffer

    if out_off > 0:
        if self._dist.comm is not None:
            self._dist.comm.Bcast(buf, root=0)
        # loop over these submaps, and copy any that we are assigned
        for sm in range(submap_off, submap_off + comm_submap):
            if sm in self._dist.local_submaps:
                loc = self._dist.global_submap_to_local[sm]
                self.data[loc, :, :] = view[sm - submap_off, :, :]
    return
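
A minimal usage sketch (the names are illustrative: dist is an existing PixelDistribution, and zeros stand in for real map data; for n_value > 1 the input is one array per map column):

import numpy as np
from toast.pixels import PixelData

pix = PixelData(dist, np.float64, n_value=3)
full_map = None
if dist.comm is None or dist.comm.rank == 0:
    # One array of length n_pix per non-zero value (map column).
    full_map = tuple(np.zeros(dist.n_pix) for _ in range(3))
pix.broadcast_map(full_map)
# Every process now holds the submaps listed in dist.local_submaps.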

clear()

Delete the underlying memory.

This will forcibly delete the C-allocated memory and invalidate all python references to this object. DO NOT CALL THIS unless you are sure all references are no longer being used and you are about to delete the object.

Source code in toast/pixels.py
def clear(self):
    """Delete the underlying memory.

    This will forcibly delete the C-allocated memory and invalidate all python
    references to this object.  DO NOT CALL THIS unless you are sure all references
    are no longer being used and you are about to delete the object.

    """
    if hasattr(self, "data"):
        # we keep the attribute to avoid errors in _accel_exists
        self.data = None
    if hasattr(self, "raw"):
        if self.accel_exists():
            self.accel_delete()
        if self.raw is not None:
            self.raw.clear()
        del self.raw
    if hasattr(self, "receive"):
        del self.receive
        if self._receive_raw is not None:
            self._receive_raw.clear()
        del self._receive_raw
    if hasattr(self, "reduce_buf"):
        del self.reduce_buf
        if self._reduce_buf_raw is not None:
            self._reduce_buf_raw.clear()
        del self._reduce_buf_raw
    if hasattr(self, "_all_send"):
        del self._all_send
        if self._all_send_raw is not None:
            self._all_send_raw.clear()
        del self._all_send_raw
    if hasattr(self, "_all_recv"):
        del self._all_recv
        if self._all_recv_raw is not None:
            self._all_recv_raw.clear()
        del self._all_recv_raw

comm_nsubmap(bytes)

Given a buffer size, compute the number of submaps to communicate.

Parameters:

Name Type Description Default
bytes int

The number of bytes.

required

Returns:

Type Description
int

The number of submaps in each buffer.

Source code in toast/pixels.py
def comm_nsubmap(self, bytes):
    """Given a buffer size, compute the number of submaps to communicate.

    Args:
        bytes (int):  The number of bytes.

    Returns:
        (int):  The number of submaps in each buffer.

    """
    dbytes = self._dtype.itemsize
    nsub = int(bytes / (dbytes * self._n_submap_value))
    if nsub == 0:
        nsub = 1
    allsub = int(self._dist.n_pix / self._dist.n_pix_submap)
    if nsub > allsub:
        nsub = allsub
    return nsub
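
A worked example with illustrative values: with 8-byte (float64) pixels, n_value = 3 and 4096 pixels per submap, one submap occupies 8 * 3 * 4096 = 98304 bytes, so the default 10 MB budget yields 101 submaps per buffer (clamped to at least one and at most the total number of submaps):

dbytes = 8                                       # np.float64 itemsize
n_value = 3                                      # e.g. an IQU map
n_pix_submap = 4096
submap_bytes = dbytes * n_value * n_pix_submap   # 98304 bytes per submap
nsub = int(10000000 / submap_bytes)              # -> 101 submaps per buffer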

duplicate()

Create a copy of the data with the same distribution.

Returns:

Type Description
PixelData

A duplicate of the instance with copied data but the same distribution.

Source code in toast/pixels.py
def duplicate(self):
    """Create a copy of the data with the same distribution.

    Returns:
        (PixelData):  A duplicate of the instance with copied data but the same
            distribution.

    """
    dup = PixelData(
        self.distribution, self.dtype, n_value=self.n_value, units=self._units
    )
    dup.data[:] = self.data
    return dup
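
For example (assuming pix is an existing PixelData instance):

other = pix.duplicate()
other.data *= 2.0                               # does not modify pix.data
assert other.distribution is pix.distribution   # distribution is shared, not copied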

forward_alltoallv()

Communicate submaps into buffers on the owning process.

On the first call, some initialization is done to compute send and receive displacements and counts. A persistent receive buffer is allocated. Submap data is sent to their owners simultaneously using alltoallv.

Returns:

Type Description

None.

Source code in toast/pixels.py
@function_timer
def forward_alltoallv(self):
    """Communicate submaps into buffers on the owning process.

    On the first call, some initialization is done to compute send and receive
    displacements and counts.  A persistent receive buffer is allocated.  Submap
    data is sent to their owners simultaneously using alltoallv.

    Returns:
        None.

    """
    if self.accel_in_use():
        msg = f"PixelData {self._accel_name} currently on accelerator"
        msg += " cannot do MPI communication"
        raise RuntimeError(msg)

    log = Logger.get()
    gt = GlobalTimers.get()
    self.setup_alltoallv()

    if self._dist.comm is None:
        # No communication needed
        return

    # Gather owned submaps locally
    gt.start("PixelData.forward_alltoallv MPI Alltoallv")
    self._dist.comm.Alltoallv(
        [self.raw, self._send_counts, self._send_displ, self.mpitype],
        [self.receive, self._recv_counts, self._recv_displ, self.mpitype],
    )
    gt.stop("PixelData.forward_alltoallv MPI Alltoallv")
    return

reset()

Set memory to zero

Source code in toast/pixels.py
def reset(self):
    """Set memory to zero"""
    self.raw[:] = 0
    if self.accel_exists():
        self.accel_reset()

reverse_alltoallv()

Communicate submaps from the owning process back to all processes.

Returns:

Type Description

None.

Source code in toast/pixels.py
@function_timer
def reverse_alltoallv(self):
    """Communicate submaps from the owning process back to all processes.

    Returns:
        None.

    """
    if self.accel_in_use():
        msg = f"PixelData {self._accel_name} currently on accelerator"
        msg += " cannot do MPI communication"
        raise RuntimeError(msg)

    gt = GlobalTimers.get()
    if self._dist.comm is None:
        # No communication needed
        return
    if self._send_counts is None:
        raise RuntimeError(
            "Cannot do reverse alltoallv before buffers have been setup"
        )

    # Scatter result back
    gt.start("PixelData.reverse_alltoallv MPI Alltoallv")
    self._dist.comm.Alltoallv(
        [self.receive, self._recv_counts, self._recv_displ, self.mpitype],
        [self.raw, self._send_counts, self._send_displ, self.mpitype],
    )
    gt.stop("PixelData.reverse_alltoallv MPI Alltoallv")
    return

setup_allreduce(n_submap)

Check that allreduce buffers exist and create them if needed.

Source code in toast/pixels.py
@function_timer
def setup_allreduce(self, n_submap):
    """Check that allreduce buffers exist and create them if needed."""
    # Allocate persistent send / recv buffers that only change if number of submaps
    # change.
    if self._all_comm_submap is not None:
        # We have already done a reduce...
        if n_submap == self._all_comm_submap:
            # Already allocated with correct size
            return
        else:
            # Delete the old buffers.
            del self._all_send
            self._all_send_raw.clear()
            del self._all_send_raw
            del self._all_recv
            self._all_recv_raw.clear()
            del self._all_recv_raw
    # Allocate with the new size
    self._all_comm_submap = n_submap
    self._all_recv_raw = self.storage_class.zeros(n_submap * self._n_submap_value)
    self._all_recv = self._all_recv_raw.array()
    self._all_send_raw = self.storage_class.zeros(n_submap * self._n_submap_value)
    self._all_send = self._all_send_raw.array()

setup_alltoallv()

Check that alltoallv buffers exist and create them if needed.

Source code in toast/pixels.py
@function_timer
def setup_alltoallv(self):
    """Check that alltoallv buffers exist and create them if needed."""
    if self._send_counts is None:
        if self.accel_in_use():
            msg = f"PixelData {self._accel_name} currently on accelerator"
            msg += " cannot do MPI communication"
            raise RuntimeError(msg)
        log = Logger.get()
        # Get the parameters in terms of submaps.
        (
            send_counts,
            send_displ,
            recv_counts,
            recv_displ,
            recv_locations,
        ) = self._dist.alltoallv_info

        # Pixel values per submap
        scale = self._n_submap_value

        # Scale these quantites by the submap size and the number of values per
        # pixel.

        self._send_counts = scale * np.array(send_counts, dtype=np.int32)
        self._send_displ = scale * np.array(send_displ, dtype=np.int32)
        self._recv_counts = scale * np.array(recv_counts, dtype=np.int32)
        self._recv_displ = scale * np.array(recv_displ, dtype=np.int32)
        self._recv_locations = dict()
        for sm, locs in recv_locations.items():
            self._recv_locations[sm] = scale * np.array(locs, dtype=np.int32)

        msg_rank = 0
        if self._dist.comm is not None:
            msg_rank = self._dist.comm.rank
        msg = f"setup_alltoallv[{msg_rank}]:\n"
        msg += f"  send_counts={self._send_counts} "
        msg += f"send_displ={self._send_displ}\n"
        msg += f"  recv_counts={self._recv_counts} "
        msg += f"recv_displ={self._recv_displ} "
        msg += f"recv_locations={self._recv_locations}"
        log.verbose(msg)

        # Allocate a persistent single-submap buffer
        self._reduce_buf_raw = self.storage_class.zeros(self._n_submap_value)
        self.reduce_buf = self._reduce_buf_raw.array()

        buf_check_fail = False
        try:
            if self._dist.comm is None:
                # For this case, point the receive member to the original data.
                # This will allow codes processing locally owned submaps to work
                # transparently in the serial case.
                self.receive = self.data.reshape((-1,))
            else:
                # Check that our send and receive buffers do not exceed 32bit
                # indices required by MPI
                max_int = 2147483647
                recv_buf_size = self._recv_displ[-1] + self._recv_counts[-1]
                if recv_buf_size > max_int:
                    msg = "Proc {} Alltoallv receive buffer size exceeds max 32bit integer".format(
                        self._dist.comm.rank
                    )
                    log.error(msg)
                    buf_check_fail = True
                if len(self.raw) > max_int:
                    msg = "Proc {} Alltoallv send buffer size exceeds max 32bit integer".format(
                        self._dist.comm.rank
                    )
                    log.error(msg)
                    buf_check_fail = True

                # Allocate a persistent receive buffer
                msg = f"{msg_rank}:  allocate receive buffer of "
                msg += f"{recv_buf_size} elements"
                log.verbose(msg)
                self._receive_raw = self.storage_class.zeros(recv_buf_size)
                self.receive = self._receive_raw.array()
        except:
            buf_check_fail = True
        if self._dist.comm is not None:
            buf_check_fail = self._dist.comm.allreduce(buf_check_fail, op=MPI.LOR)
        if buf_check_fail:
            msg = "alltoallv buffer setup failed on one or more processes"
            raise RuntimeError(msg)

stats(comm_bytes=10000000)

Compute some simple statistics of the pixel data.

The map should already be consistent across all processes with overlapping submaps.

Parameters:

Name Type Description Default
comm_bytes int

The approximate message size to use.

10000000

Returns:

Type Description
dict

The computed properties on rank zero, None on other ranks.

Source code in toast/pixels.py
@function_timer
def stats(self, comm_bytes=10000000):
    """Compute some simple statistics of the pixel data.

    The map should already be consistent across all processes with overlapping
    submaps.

    Args:
        comm_bytes (int): The approximate message size to use.

    Returns:
        (dict):  The computed properties on rank zero, None on other ranks.

    """
    if self.accel_in_use():
        msg = f"PixelData {self._accel_name} currently on accelerator"
        msg += " cannot do MPI communication"
        raise RuntimeError(msg)
    dist = self._dist
    nsub = dist.n_submap

    if dist.comm is None:
        return {
            "sum": [np.sum(self.data[:, :, x]) for x in range(self._n_value)],
            "mean": [np.mean(self.data[:, :, x]) for x in range(self._n_value)],
            "rms": [np.std(self.data[:, :, x]) for x in range(self._n_value)],
        }

    # The lowest rank with a locally-hit submap will contribute to the reduction
    local_hit_submaps = dist.comm.size * np.ones(nsub, dtype=np.int32)
    local_hit_submaps[dist.local_submaps] = dist.comm.rank
    hit_submaps = np.zeros_like(local_hit_submaps)
    dist.comm.Allreduce(local_hit_submaps, hit_submaps, op=MPI.MIN)
    del local_hit_submaps

    comm_submap = self.comm_nsubmap(comm_bytes)

    send_buf = np.zeros(
        comm_submap * dist.n_pix_submap * self._n_value,
        dtype=self._dtype,
    ).reshape((comm_submap, dist.n_pix_submap, self._n_value))

    recv_buf = None
    if dist.comm.rank == 0:
        # Alloc receive buffer
        recv_buf = np.zeros(
            comm_submap * dist.n_pix_submap * self._n_value,
            dtype=self._dtype,
        ).reshape((comm_submap, dist.n_pix_submap, self._n_value))
        # Variables for variance calc
        accum_sum = np.zeros(self._n_value, dtype=np.float64)
        accum_count = np.zeros(self._n_value, dtype=np.int64)
        accum_mean = np.zeros(self._n_value, dtype=np.float64)
        accum_var = np.zeros(self._n_value, dtype=np.float64)

    # Doing a two-pass variance calculation is faster than looping
    # over individual samples in python.

    submap_off = 0
    ncomm = comm_submap

    while submap_off < nsub:
        if submap_off + ncomm > nsub:
            ncomm = nsub - submap_off
        send_buf[:, :, :] = 0
        for sm in range(ncomm):
            abs_sm = submap_off + sm
            if hit_submaps[abs_sm] == dist.comm.rank:
                # Contribute
                loc = dist.global_submap_to_local[abs_sm]
                send_buf[sm, :, :] = self.data[loc, :, :]

        dist.comm.Reduce(send_buf, recv_buf, op=MPI.SUM, root=0)

        if dist.comm.rank == 0:
            for sm in range(ncomm):
                for v in range(self._n_value):
                    accum_sum[v] += np.sum(recv_buf[sm, :, v])
                    accum_count[v] += dist.n_pix_submap
        dist.comm.barrier()
        submap_off += ncomm

    if dist.comm.rank == 0:
        for v in range(self._n_value):
            accum_mean[v] = accum_sum[v] / accum_count[v]

    submap_off = 0
    ncomm = comm_submap

    while submap_off < nsub:
        if submap_off + ncomm > nsub:
            ncomm = nsub - submap_off
        send_buf[:, :, :] = 0
        for sm in range(ncomm):
            abs_sm = submap_off + sm
            if hit_submaps[abs_sm] == dist.comm.rank:
                # Contribute
                loc = dist.global_submap_to_local[abs_sm]
                send_buf[sm, :, :] = self.data[loc, :, :]

        dist.comm.Reduce(send_buf, recv_buf, op=MPI.SUM, root=0)

        if dist.comm.rank == 0:
            for sm in range(ncomm):
                for v in range(self._n_value):
                    accum_var[v] += np.sum(
                        (recv_buf[sm, :, v] - accum_mean[v]) ** 2
                    )
        dist.comm.barrier()
        submap_off += ncomm

    if dist.comm.rank == 0:
        return {
            "sum": [float(accum_sum[x]) for x in range(self._n_value)],
            "mean": [float(accum_mean[x]) for x in range(self._n_value)],
            "rms": [
                np.sqrt(accum_var[x] / (accum_count[x] - 1))
                for x in range(self._n_value)
            ],
        }
    else:
        return None
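
A usage sketch; under MPI the dictionary is only returned on rank zero:

props = pix.stats()
if props is not None:
    # One entry per non-zero value, e.g. three entries for an IQU map.
    print(props["sum"], props["mean"], props["rms"])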

sync_allreduce(comm_bytes=10000000)

Perform a buffered allreduce of the data.

Parameters:

Name Type Description Default
comm_bytes int

The approximate message size to use.

10000000

Returns:

Type Description

None.

Source code in toast/pixels.py
@function_timer
def sync_allreduce(self, comm_bytes=10000000):
    """Perform a buffered allreduce of the data.

    Args:
        comm_bytes (int): The approximate message size to use.

    Returns:
        None.

    """
    if self.accel_in_use():
        msg = f"PixelData {self._accel_name} currently on accelerator"
        msg += " cannot do MPI communication"
        raise RuntimeError(msg)

    if self._dist.comm is None:
        return

    comm_submap = self.comm_nsubmap(comm_bytes)
    self.setup_allreduce(comm_submap)

    dist = self._dist
    nsub = dist.n_submap

    sendview = self._all_send.reshape(
        (comm_submap, dist.n_pix_submap, self._n_value)
    )

    recvview = self._all_recv.reshape(
        (comm_submap, dist.n_pix_submap, self._n_value)
    )

    owners = dist.submap_owners

    submap_off = 0
    ncomm = comm_submap

    gt = GlobalTimers.get()

    while submap_off < nsub:
        if submap_off + ncomm > nsub:
            ncomm = nsub - submap_off
        if np.sum(owners[submap_off : submap_off + ncomm]) != -ncomm:
            # At least one submap has some hits.  Do the allreduce.
            # Otherwise we would skip this buffer to avoid reducing a
            # bunch of zeros.
            for c in range(ncomm):
                glob = submap_off + c
                if glob in dist.local_submaps:
                    # copy our data in.
                    loc = dist.global_submap_to_local[glob]
                    sendview[c, :, :] = self.data[loc, :, :]

            gt.start("PixelData.sync_allreduce MPI Allreduce")
            dist.comm.Allreduce(self._all_send, self._all_recv, op=MPI.SUM)
            gt.stop("PixelData.sync_allreduce MPI Allreduce")

            for c in range(ncomm):
                glob = submap_off + c
                if glob in dist.local_submaps:
                    # copy the reduced data
                    loc = dist.global_submap_to_local[glob]
                    self.data[loc, :, :] = recvview[c, :, :]

            self._all_send.fill(0)
            self._all_recv.fill(0)

        submap_off += ncomm

    return
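
A typical accumulate-then-reduce sketch (the accumulation step is schematic):

pix.reset()
# ... each process accumulates samples into pix.data for its local submaps ...
pix.sync_allreduce()
# Overlapping submaps now hold identical global sums on every process.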

sync_alltoallv(local_func=None)

Perform operations on locally owned submaps using Alltoallv communication.

On the first call, some initialization is done to compute send and receive displacements and counts. A persistent receive buffer is allocated. Submap data is sent to their owners simultaneously using alltoallv. Each process does a local operation on their owned submaps before sending the result back with another alltoallv call.

Parameters:

Name Type Description Default
local_func function

A function for processing the local submap data.

None

Returns:

Type Description

None.

Source code in toast/pixels.py
@function_timer
def sync_alltoallv(self, local_func=None):
    """Perform operations on locally owned submaps using Alltoallv communication.

    On the first call, some initialization is done to compute send and receive
    displacements and counts.  A persistent receive buffer is allocated.  Submap
    data is sent to their owners simultaneously using alltoallv.  Each process does
    a local operation on their owned submaps before sending the result back with
    another alltoallv call.

    Args:
        local_func (function):  A function for processing the local submap data.

    Returns:
        None.

    """
    self.forward_alltoallv()

    if local_func is None:
        local_func = self.local_reduction

    # Run operation on locally owned submaps
    local_func(
        self._n_submap_value, self._recv_locations, self.receive, self.reduce_buf
    )

    self.reverse_alltoallv()
    return
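
A sketch of a custom local_func. The signature is inferred from the call above, and the body implements a plain sum over contributing processes, which is what the default local_reduction is expected to perform; this is an illustration, not the library's actual default:

def local_sum(n_submap_value, recv_locations, receive, reduce_buf):
    # For each owned submap, sum the copies received from all contributing
    # processes, then write the result back into every received location.
    for sm, locations in recv_locations.items():
        reduce_buf[:] = 0
        for off in locations:
            reduce_buf += receive[off : off + n_submap_value]
        for off in locations:
            receive[off : off + n_submap_value] = reduce_buf

pix.sync_alltoallv(local_func=local_sum)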

update_units(new_units)

Update the units associated with the data.

Source code in toast/pixels.py
def update_units(self, new_units):
    """Update the units associated with the data."""
    self._units = new_units

PixelDistribution

Bases: AcceleratorObject

Class representing the distribution of submaps.

This object is used to describe the properties of a pixelization scheme and which "submaps" are stored on each process. The size of the submap can be tuned to balance storage (smaller submap size means fewer wasted pixels stored) and ease of indexing (larger submap size means faster global-to-local pixel lookups).

Parameters:

Name Type Description Default
n_pix int

the total number of pixels.

None
n_submap int

the number of submaps to use.

1000
local_submaps array

the list of local submaps (integers).

None
comm Comm

The MPI communicator or None.

None
Source code in toast/pixels.py
class PixelDistribution(AcceleratorObject):
    """Class representing the distribution of submaps.

    This object is used to describe the properties of a pixelization scheme and which
    "submaps" are strored on each process.  The size of the submap can be tuned to
    balance storage (smaller submap size means fewer wasted pixels stored) and ease of
    indexing (larger submap size means faster global-to-local pixel lookups).

    Args:
        n_pix (int): the total number of pixels.
        n_submap (int): the number of submaps to use.
        local_submaps (array): the list of local submaps (integers).
        comm (mpi4py.MPI.Comm): The MPI communicator or None.

    """

    def __init__(self, n_pix=None, n_submap=1000, local_submaps=None, comm=None):
        super().__init__()
        self._n_pix = n_pix
        self._n_submap = n_submap
        if self._n_submap > self._n_pix:
            msg = "Cannot create a PixelDistribution with more submaps ({}) than pixels ({})".format(
                n_submap, n_pix
            )
            raise RuntimeError(msg)
        self._n_pix_submap = self._n_pix // self._n_submap
        if self._n_pix % self._n_submap != 0:
            self._n_pix_submap += 1

        self._local_submaps = local_submaps
        self._comm = comm

        self._n_local = 0
        self._glob2loc = AlignedI64.zeros(self._n_submap)
        self._glob2loc[:] = -1

        if self._local_submaps is not None and len(self._local_submaps) > 0:
            if np.max(self._local_submaps) > self._n_submap - 1:
                raise RuntimeError("local submap indices out of range")
            self._n_local = len(self._local_submaps)
            for ilocal_submap, iglobal_submap in enumerate(self._local_submaps):
                self._glob2loc[iglobal_submap] = ilocal_submap

        self._submap_owners = None
        self._owned_submaps = None
        self._alltoallv_info = None
        self._all_hit_submaps = None

    def __eq__(self, other):
        local_eq = True
        if self._n_pix != other._n_pix:
            local_eq = False
        if self._n_submap != other._n_submap:
            local_eq = False
        if self._n_pix_submap != other._n_pix_submap:
            local_eq = False
        if not np.array_equal(self._local_submaps, other._local_submaps):
            local_eq = False
        if self._comm is None and other._comm is not None:
            local_eq = False
        if self._comm is not None and other._comm is None:
            local_eq = False
        if self._comm is not None:
            comp = MPI.Comm.Compare(self._comm, other._comm)
            if comp not in (MPI.IDENT, MPI.CONGRUENT):
                local_eq = False
        return local_eq

    def __ne__(self, other):
        return not self.__eq__(other)

    def clear(self):
        """Delete the underlying memory.

        This will forcibly delete the C-allocated memory and invalidate all python
        references to this object.  DO NOT CALL THIS unless you are sure all references
        are no longer being used and you are about to delete the object.

        """
        if hasattr(self, "_glob2loc"):
            if self._glob2loc is not None:
                self._glob2loc.clear()
                del self._glob2loc

    def __del__(self):
        self.clear()

    @property
    def comm(self):
        """(mpi4py.MPI.Comm): The MPI communicator used (or None)"""
        return self._comm

    @property
    def n_pix(self):
        """(int): The global number of pixels."""
        return self._n_pix

    @property
    def n_pix_submap(self):
        """(int): The number of pixels in each submap."""
        return self._n_pix_submap

    @property
    def n_submap(self):
        """(int): The total number of submaps."""
        return self._n_submap

    @property
    def n_local_submap(self):
        """(int): The number of submaps stored on this process."""
        return self._n_local

    @property
    def local_submaps(self):
        """(array): The list of local submaps or None if process has no data."""
        return self._local_submaps

    @property
    def all_hit_submaps(self):
        """(array): The list of submaps local to atleast one process."""
        if self._all_hit_submaps is None:
            hits = np.zeros(self._n_submap)
            hits[self._local_submaps] += 1
            if self._comm is not None:
                self._comm.Allreduce(MPI.IN_PLACE, hits)
            self._all_hit_submaps = np.argwhere(hits != 0).ravel()
        return self._all_hit_submaps

    @property
    def global_submap_to_local(self):
        """(array): The mapping from global submap to local."""
        return self._glob2loc

    @function_timer
    def global_pixel_to_submap(self, gl):
        """Convert global pixel indices into the local submap and pixel.

        Args:
            gl (array): The global pixel numbers.

        Returns:
            (tuple):  A tuple of arrays containing the local submap index (int) and the
                pixel index local to that submap (int).

        """
        if len(gl) == 0:
            return (np.zeros_like(gl), np.zeros_like(gl))
        log = Logger.get()
        if np.max(gl) >= self._n_pix:
            msg = "Global pixel indices exceed the maximum for the pixelization"
            log.error(msg)
            raise RuntimeError(msg)
        return libtoast_global_to_local(gl, self._n_pix_submap, self._glob2loc)

    @function_timer
    def global_pixel_to_local(self, gl):
        """Convert global pixel indices into local pixel indices.

        Args:
            gl (array): The global pixel numbers.

        Returns:
            (array): The local raw (flat packed) buffer index for each pixel.

        """
        if len(gl) == 0:
            return np.zeros_like(gl)
        if np.max(gl) >= self._n_pix:
            log = Logger.get()
            msg = "Global pixel indices exceed the maximum for the pixelization"
            log.error(msg)
            raise RuntimeError(msg)
        local_sm, pixels = libtoast_global_to_local(
            gl, self._n_pix_submap, self._glob2loc
        )
        local_sm *= self._n_pix_submap
        pixels += local_sm
        return pixels

    def __repr__(self):
        val = "<PixelDistribution {} pixels, {} submaps, submap size = {}>".format(
            self._n_pix, self._n_submap, self._n_pix_submap
        )
        return val

    @property
    def submap_owners(self):
        """The owning process for every hit submap.

        This information is used in several other operations, including serializing
        PixelData objects to a single process and also communication needed for
        reducing data globally.
        """
        if self._submap_owners is not None:
            # Already computed
            return self._submap_owners

        self._submap_owners = np.empty(self._n_submap, dtype=np.int32)
        self._submap_owners[:] = -1

        if self._comm is None:
            # Trivial case
            if self._local_submaps is not None and len(self._local_submaps) > 0:
                self._submap_owners[self._local_submaps] = 0
        else:
            # Need to compute it.
            local_hit_submaps = np.zeros(self._n_submap, dtype=np.uint8)
            local_hit_submaps[self._local_submaps] = 1

            hit_submaps = None
            if self._comm.rank == 0:
                hit_submaps = np.zeros(self._n_submap, dtype=np.uint8)

            self._comm.Reduce(local_hit_submaps, hit_submaps, op=MPI.LOR, root=0)
            del local_hit_submaps

            if self._comm.rank == 0:
                total_hit_submaps = np.sum(hit_submaps.astype(np.int32))
                tdist = distribute_uniform(total_hit_submaps, self._comm.size)

                # The target number of submaps per process
                target = [x[1] for x in tdist]

                # Assign the submaps in rank order.  This ensures better load
                # distribution when serializing some operations and also reduces needed
                # memory copies when using Alltoallv.
                proc_offset = 0
                proc = 0
                for sm in range(self._n_submap):
                    if hit_submaps[sm] > 0:
                        self._submap_owners[sm] = proc
                        proc_offset += 1
                        if proc_offset >= target[proc]:
                            proc += 1
                            proc_offset = 0
                del hit_submaps

            self._comm.Bcast(self._submap_owners, root=0)
        return self._submap_owners

    @property
    def owned_submaps(self):
        """The submaps owned by this process."""
        if self._owned_submaps is not None:
            # Already computed
            return self._owned_submaps
        owners = self.submap_owners
        if self._comm is None:
            self._owned_submaps = np.array(
                [x for x, y in enumerate(owners) if y == 0], dtype=np.int32
            )
        else:
            self._owned_submaps = np.array(
                [x for x, y in enumerate(owners) if y == self._comm.rank],
                dtype=np.int32,
            )
        return self._owned_submaps

    @property
    def alltoallv_info(self):
        """Return the offset information for Alltoallv communication.

        This returns a tuple containing:
            - The send displacements for the Alltoallv submap gather
            - The send counts for the Alltoallv submap gather
            - The receive displacements for the Alltoallv submap gather
            - The receive counts for the Alltoallv submap gather
            - The locations in the receive buffer of each submap.

        """
        log = Logger.get()
        if self._alltoallv_info is not None:
            # Already computed
            return self._alltoallv_info

        owners = self.submap_owners
        our_submaps = self.owned_submaps

        send_counts = None
        send_displ = None
        recv_counts = None
        recv_displ = None
        recv_locations = None

        if self._comm is None:
            recv_counts = len(self._local_submaps) * np.ones(1, dtype=np.int32)
            recv_displ = np.zeros(1, dtype=np.int32)
            recv_locations = dict()
            for offset, sm in enumerate(self._local_submaps):
                recv_locations[sm] = np.array([offset], dtype=np.int32)
            send_counts = len(self._local_submaps) * np.ones(1, dtype=np.int32)
            send_displ = np.zeros(1, dtype=np.int32)
        else:
            # Compute the other "contributing" processes that have submaps which we own.
            # Also track the receive buffer offsets for each owned submap.
            send = [list() for x in range(self._comm.size)]
            for sm in self._local_submaps:
                # Tell the owner of this submap that we are a contributor
                send[owners[sm]].append(sm)
            recv = self._comm.alltoall(send)

            recv_counts = np.zeros(self._comm.size, dtype=np.int32)
            recv_displ = np.zeros(self._comm.size, dtype=np.int32)
            recv_locations = dict()

            offset = 0
            for proc, sms in enumerate(recv):
                recv_displ[proc] = offset
                for sm in sms:
                    if sm not in recv_locations:
                        recv_locations[sm] = list()
                    recv_locations[sm].append(offset)
                    recv_counts[proc] += 1
                    offset += 1

            for sm in list(recv_locations.keys()):
                recv_locations[sm] = np.array(recv_locations[sm], dtype=np.int32)

            # Compute the Alltoallv send offsets in terms of submaps
            send_counts = np.zeros(self._comm.size, dtype=np.int32)
            send_displ = np.zeros(self._comm.size, dtype=np.int32)
            offset = 0
            last_offset = 0
            last_own = -1
            for sm in self._local_submaps:
                if last_own != owners[sm]:
                    # Moving on to next owning process...
                    if last_own >= 0:
                        send_displ[last_own] = last_offset
                        last_offset = offset
                send_counts[owners[sm]] += 1
                offset += 1
                last_own = owners[sm]
            if last_own >= 0:
                # Finish up last process
                send_displ[last_own] = last_offset

        self._alltoallv_info = (
            send_counts,
            send_displ,
            recv_counts,
            recv_displ,
            recv_locations,
        )

        msg_rank = 0
        if self._comm is not None:
            msg_rank = self._comm.rank
        msg = f"alltoallv_info[{msg_rank}]:\n"
        msg += f"  send_counts={send_counts} "
        msg += f"send_displ={send_displ}\n"
        msg += f"  recv_counts={recv_counts} "
        msg += f"recv_displ={recv_displ} "
        msg += f"recv_locations={recv_locations}"
        log.verbose(msg)

        return self._alltoallv_info

    def _accel_exists(self):
        return accel_data_present(self._glob2loc, self._accel_name)

    def _accel_create(self):
        self._glob2loc = accel_data_create(self._glob2loc, self._accel_name)

    def _accel_update_device(self):
        self._glob2loc = accel_data_update_device(self._glob2loc, self._accel_name)

    def _accel_update_host(self):
        self._glob2loc = accel_data_update_host(self._glob2loc, self._accel_name)

    def _accel_reset(self):
        accel_data_reset(self._glob2loc, self._accel_name)

    def _accel_delete(self):
        self._glob2loc = accel_data_delete(self._glob2loc, self._accel_name)
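
A small construction sketch (serial case; the numbers are purely illustrative):

import numpy as np
from toast.pixels import PixelDistribution

dist = PixelDistribution(
    n_pix=12 * 64**2,                # e.g. a healpix NSIDE=64 map
    n_submap=192,
    local_submaps=np.array([3, 17]),
    comm=None,                       # or an mpi4py communicator
)
print(dist.n_pix_submap)             # 256
print(dist.n_local_submap)           # 2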

all_hit_submaps property

(array): The list of submaps local to at least one process.

alltoallv_info property

Return the offset information for Alltoallv communication.

This returns a tuple containing:
  • The send displacements for the Alltoallv submap gather
  • The send counts for the Alltoallv submap gather
  • The receive displacements for the Alltoallv submap gather
  • The receive counts for the Alltoallv submap gather
  • The locations in the receive buffer of each submap.

comm property

(mpi4py.MPI.Comm): The MPI communicator used (or None)

global_submap_to_local property

(array): The mapping from global submap to local.

local_submaps property

(array): The list of local submaps or None if process has no data.

n_local_submap property

(int): The number of submaps stored on this process.

n_pix property

(int): The global number of pixels.

n_pix_submap property

(int): The number of pixels in each submap.

n_submap property

(int): The total number of submaps.

owned_submaps property

The submaps owned by this process.

submap_owners property

The owning process for every hit submap.

This information is used in several other operations, including serializing PixelData objects to a single process and also communication needed for reducing data globally.

clear()

Delete the underlying memory.

This will forcibly delete the C-allocated memory and invalidate all python references to this object. DO NOT CALL THIS unless you are sure all references are no longer being used and you are about to delete the object.

Source code in toast/pixels.py
def clear(self):
    """Delete the underlying memory.

    This will forcibly delete the C-allocated memory and invalidate all python
    references to this object.  DO NOT CALL THIS unless you are sure all references
    are no longer being used and you are about to delete the object.

    """
    if hasattr(self, "_glob2loc"):
        if self._glob2loc is not None:
            self._glob2loc.clear()
            del self._glob2loc

global_pixel_to_local(gl)

Convert global pixel indices into local pixel indices.

Parameters:

Name Type Description Default
gl array

The global pixel numbers.

required

Returns:

Type Description
array

The local raw (flat packed) buffer index for each pixel.

Source code in toast/pixels.py
@function_timer
def global_pixel_to_local(self, gl):
    """Convert global pixel indices into local pixel indices.

    Args:
        gl (array): The global pixel numbers.

    Returns:
        (array): The local raw (flat packed) buffer index for each pixel.

    """
    if len(gl) == 0:
        return np.zeros_like(gl)
    if np.max(gl) >= self._n_pix:
        log = Logger.get()
        msg = "Global pixel indices exceed the maximum for the pixelization"
        log.error(msg)
        raise RuntimeError(msg)
    local_sm, pixels = libtoast_global_to_local(
        gl, self._n_pix_submap, self._glob2loc
    )
    local_sm *= self._n_pix_submap
    pixels += local_sm
    return pixels
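
Continuing the dist sketch above (submaps 3 and 17 are local, with 256 pixels per submap):

gl = np.array([3 * 256 + 5, 17 * 256 + 9])   # global pixels in submaps 3 and 17
print(dist.global_pixel_to_local(gl))        # [5, 265]: local submap * 256 + pixel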

global_pixel_to_submap(gl)

Convert global pixel indices into the local submap and pixel.

Parameters:

Name Type Description Default
gl array

The global pixel numbers.

required

Returns:

Type Description
tuple

A tuple of arrays containing the local submap index (int) and the pixel index local to that submap (int).

Source code in toast/pixels.py
@function_timer
def global_pixel_to_submap(self, gl):
    """Convert global pixel indices into the local submap and pixel.

    Args:
        gl (array): The global pixel numbers.

    Returns:
        (tuple):  A tuple of arrays containing the local submap index (int) and the
            pixel index local to that submap (int).

    """
    if len(gl) == 0:
        return (np.zeros_like(gl), np.zeros_like(gl))
    log = Logger.get()
    if np.max(gl) >= self._n_pix:
        msg = "Global pixel indices exceed the maximum for the pixelization"
        log.error(msg)
        raise RuntimeError(msg)
    return libtoast_global_to_local(gl, self._n_pix_submap, self._glob2loc)
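
Continuing the same sketch, the global pixels split into local submap indices and offsets within each submap:

sm, p = dist.global_pixel_to_submap(gl)
print(sm)   # [0, 1]: local indices of submaps 3 and 17
print(p)    # [5, 9]: pixel offsets within each submap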

SpaceSite

Bases: Site

Site with no atmosphere.

This represents a location beyond the Earth's atmosphere. In practice, this should be sub-classed for real satellite experiments.

Parameters:

Name Type Description Default
name str

Site name

required
uid int

Unique identifier. If not specified, constructed from a hash of the site name.

None
Source code in toast/instrument.py
class SpaceSite(Site):
    """Site with no atmosphere.

    This represents a location beyond the Earth's atmosphere.  In practice, this
    should be sub-classed for real satellite experiments.

    Args:
        name (str):  Site name
        uid (int):  Unique identifier.  If not specified, constructed from a hash
            of the site name.

    """

    def __init__(self, name, uid=None):
        super().__init__(name, uid)

    def __repr__(self):
        value = "<SpaceSite '{}' : uid = {}>".format(self.name, self.uid)
        return value

    def _position_velocity(self, times):
        # Compute data at 10 minute intervals and interpolate.  If the timestamps are
        # more coarsely sampled than that, just compute those times directly.
        sparse_incr = 600.0
        do_interp = True
        if len(times) < 100 or (times[1] - times[0]) > sparse_incr:
            do_interp = False

        if do_interp:
            n_sparse = 1 + int((times[-1] - times[0]) / sparse_incr)
            sparse_times = np.linspace(times[0], times[-1], num=n_sparse, endpoint=True)
        else:
            n_sparse = len(times)
            sparse_times = times
        pos_x = np.zeros(n_sparse, np.float64)
        pos_y = np.zeros(n_sparse, np.float64)
        pos_z = np.zeros(n_sparse, np.float64)
        vel_x = np.zeros(n_sparse, np.float64)
        vel_y = np.zeros(n_sparse, np.float64)
        vel_z = np.zeros(n_sparse, np.float64)
        for i, t in enumerate(sparse_times):
            atime = astime.Time(t, format="unix")
            # Get the satellite position and velocity in the equatorial frame (ICRS)
            p, v = coord.get_body_barycentric_posvel("earth", atime)
            # FIXME:  apply translation from earth center to L2.
            pm = p.xyz.to_value(u.kilometer)
            vm = v.xyz.to_value(u.kilometer / u.second)
            pos_x[i] = pm[0]
            pos_y[i] = pm[1]
            pos_z[i] = pm[2]
            vel_x[i] = vm[0]
            vel_y[i] = vm[1]
            vel_z[i] = vm[2]

        if do_interp:
            pos_x = np.interp(times, sparse_times, pos_x)
            pos_y = np.interp(times, sparse_times, pos_y)
            pos_z = np.interp(times, sparse_times, pos_z)
            vel_x = np.interp(times, sparse_times, vel_x)
            vel_y = np.interp(times, sparse_times, vel_y)
            vel_z = np.interp(times, sparse_times, vel_z)
        pos = np.stack([pos_x, pos_y, pos_z], axis=-1)
        vel = np.stack([vel_x, vel_y, vel_z], axis=-1)
        return pos, vel

    def _position(self, times):
        p, v = self._position_velocity(times)
        return p

    def _velocity(self, times):
        p, v = self._position_velocity(times)
        return v
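
A construction sketch. The private helper is called here only to illustrate the interpolated code path (more than 100 timestamps at sub-600-second spacing); real code would use the public Site interface or a subclass:

import numpy as np
from toast.instrument import SpaceSite

site = SpaceSite("L2")
times = 1672531200.0 + 10.0 * np.arange(1000)   # unix seconds, 10 s cadence
pos, vel = site._position_velocity(times)       # (1000, 3) arrays, km and km/s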

Telescope

Bases: object

Class representing telescope properties for one observation.

Parameters:

Name Type Description Default
name str

The name of the telescope.

required
uid int

The Unique ID of the telescope. If not specified, constructed from a hash of the telescope name.

None
focalplane Focalplane

The focalplane for this observation.

None
site Site

The site of the telescope for this observation.

None
Source code in toast/instrument.py
class Telescope(object):
    """Class representing telescope properties for one observation.

    Args:
        name (str):  The name of the telescope.
        uid (int):  The Unique ID of the telescope.  If not specified, constructed from
            a hash of the telescope name.
        focalplane (Focalplane):  The focalplane for this observation.
        site (Site):  The site of the telescope for this observation.

    """

    def __init__(self, name, uid=None, focalplane=None, site=None):
        self.name = name
        self.uid = uid
        if self.uid is None:
            self.uid = name_UID(name)
        if not isinstance(focalplane, Focalplane):
            raise RuntimeError("focalplane should be a Focalplane class instance")
        self.focalplane = focalplane
        if not isinstance(site, Site):
            raise RuntimeError("site should be a Site class instance")
        self.site = site

    def __repr__(self):
        value = "<Telescope '{}': uid = {}, site = {}, ".format(
            self.name,
            self.uid,
            self.site,
        )
        value += "focalplane = {}".format(self.focalplane.__repr__())
        value += ">"
        return value

    def __eq__(self, other):
        if self.name != other.name:
            return False
        if self.uid != other.uid:
            return False
        if self.site != other.site:
            return False
        if self.focalplane != other.focalplane:
            return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)
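
A construction sketch (assumes fp is an existing Focalplane instance and reuses site from the SpaceSite example above):

from toast.instrument import Telescope

tel = Telescope("SatTel", focalplane=fp, site=site)
print(tel.uid)   # derived from name_UID("SatTel") when uid is not given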

Timer

Bases: pybind11_builtins.pybind11_object

Simple timer class.

This class is just a timer that you can start / stop / clear and report the results. It tracks the elapsed time and the number of times it was started.

__getstate__() method descriptor

__getstate__(self: toast._libtoast.Timer) -> tuple

__init__() method descriptor

__init__(*args, **kwargs) Overloaded function.

  1. __init__(self: toast._libtoast.Timer) -> None

  2. __init__(self: toast._libtoast.Timer, init_time: float, init_calls: int) -> None

       Create the timer with some initial state.
    
       Used mainly when pickling / communicating the timer.  The timer is created
       in the "stopped" state.
    
       Args:
           init_time (float):  Initial elapsed seconds.
           init_calls (int):  Initial number of calls.
    

__repr__() method descriptor

__repr__(self: toast._libtoast.Timer) -> str

__setstate__() method descriptor

__setstate__(self: toast._libtoast.Timer, arg0: tuple) -> None

calls() method descriptor

calls(self: toast._libtoast.Timer) -> int

Return the number of calls.

Returns:

Type Description
int

The number of calls (if timer is stopped) else 0.

clear() method descriptor

clear(self: toast._libtoast.Timer) -> None

Clear the timer.

elapsed_seconds() method descriptor

elapsed_seconds(self: toast._libtoast.Timer) -> float

Return the elapsed seconds from a running timer without modifying the timer state.

Returns:

Type Description
float

The elapsed seconds (if timer is running).

is_running() method descriptor

is_running(self: toast._libtoast.Timer) -> bool

Is the timer running?

Returns:

Type Description
bool

True if the timer is running, else False.

report() method descriptor

report(self: toast._libtoast.Timer, message: str) -> None

Report results of the timer to STDOUT.

Parameters:

Name Type Description Default
message str

A message to prepend to the timing results.

required

Returns:

Type Description

None

report_clear() method descriptor

report_clear(self: toast._libtoast.Timer, message: str) -> None

Report results of the timer to STDOUT and clear the timer.

If the timer was running, it is stopped before reporting and clearing and then restarted. If the timer was stopped, then it is left in the stopped state after reporting and clearing.

Parameters:

Name Type Description Default
message str

A message to prepend to the timing results.

required

Returns:

Type Description

None

report_elapsed() method descriptor

report_elapsed(self: toast._libtoast.Timer, message: str) -> None

Report results of a running timer to STDOUT without modifying the timer state.

Parameters:

Name Type Description Default
message str

A message to prepend to the timing results.

required

Returns:

Type Description

None

seconds() method descriptor

seconds(self: toast._libtoast.Timer) -> float

Return the elapsed seconds.

Returns:

Type Description
float

The elapsed seconds, if the timer is stopped; otherwise -1.

start() method descriptor

start(self: toast._libtoast.Timer) -> None

Start the timer.

stop() method descriptor

stop(self: toast._libtoast.Timer) -> None

Stop the timer.
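
For reference, a minimal usage sketch based on the methods documented above (assuming Timer is importable from toast._libtoast, as in the signatures; the message strings are arbitrary):

from toast._libtoast import Timer

tm = Timer()
tm.start()
# ... timed work here ...
tm.report_elapsed("Partial work")   # peek and report without stopping
tm.stop()
print(tm.seconds(), tm.calls())     # elapsed seconds and call count (timer stopped)
tm.report_clear("Total work")       # report to STDOUT and reset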

Weather

Bases: object

Base class representing the weather for one observation.

This class returns the nominal weather properties for an observation. Currently these are constant, under the assumption that the weather changes slowly during a good science observation.

This base class can be used directly to hold values specified at construction.

Parameters:

Name Type Description Default
time datetime

A python date/time in UTC.

None
ice_water Quantity

Precipitable ice water.

None
liquid_water Quantity

Precipitable liquid water.

None
pwv Quantity

Precipitable water vapor.

None
humidity Quantity

Specific humidity at 10m altitude.

None
surface_pressure Quantity

Surface pressure.

None
surface_temperature Quantity

Surface temperature.

None
air_temperature Quantity

Air temperature at 10m altitude.

None
west_wind Quantity

Eastward moving wind at 10m altitude.

None
south_wind Quantity

Northward moving wind at 10m altitude.

None
Source code in toast/weather.py
class Weather(object):
    """Base class representing the weather for one observation.

    This class returns the nominal weather properties for an observation.  Currently
    these are constant, under the assumption that the weather changes slowly during a
    good science observation.

    This base class can be used directly to hold values specified at construction.

    Args:
        time (datetime):  A python date/time in UTC.
        ice_water (Quantity):  Precipitable ice water.
        liquid_water (Quantity):  Precipitable liquid water.
        pwv (Quantity):  Precipitable water vapor.
        humidity (Quantity):  Specific humidity at 10m altitude.
        surface_pressure (Quantity):  Surface pressure.
        surface_temperature (Quantity):  Surface temperature.
        air_temperature (Quantity):  Air temperature at 10m altitude.
        west_wind (Quantity):  Eastward moving wind at 10m altitude.
        south_wind (Quantity):  Northward moving wind at 10m altitude.

    """

    def __init__(
        self,
        time=None,
        ice_water=None,
        liquid_water=None,
        pwv=None,
        humidity=None,
        surface_pressure=None,
        surface_temperature=None,
        air_temperature=None,
        west_wind=None,
        south_wind=None,
    ):
        self._time_val = time
        self._ice_water_val = ice_water
        self._liquid_water_val = liquid_water
        self._pwv_val = pwv
        self._humidity_val = humidity
        self._surface_pressure_val = surface_pressure
        self._surface_temperature_val = surface_temperature
        self._air_temperature_val = air_temperature
        self._west_wind_val = west_wind
        self._south_wind_val = south_wind

    def __eq__(self, other):
        if self._time_val != other._time_val:
            return False
        if not np.isclose(self._ice_water_val.value, other._ice_water_val.value):
            return False
        if not np.isclose(self._liquid_water_val.value, other._liquid_water_val.value):
            return False
        if not np.isclose(self._pwv_val.value, other._pwv_val.value):
            return False
        if not np.isclose(self._humidity_val.value, other._humidity_val.value):
            return False
        if not np.isclose(
            self._surface_pressure_val.value, other._surface_pressure_val.value
        ):
            return False
        if not np.isclose(
            self._surface_temperature_val.value, other._surface_temperature_val.value
        ):
            return False
        if not np.isclose(
            self._air_temperature_val.value, other._air_temperature_val.value
        ):
            return False
        if not np.isclose(self._west_wind_val.value, other._west_wind_val.value):
            return False
        if not np.isclose(self._south_wind_val.value, other._south_wind_val.value):
            return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def _time(self):
        if self._time_val is None:
            raise NotImplementedError("Base class _time() called, but has no value")
        return self._time_val

    @property
    def time(self):
        """The time for these weather properties."""
        return self._time()

    def _ice_water(self):
        if self._ice_water_val is None:
            raise NotImplementedError(
                "Base class _ice_water() called, but has no value"
            )
        return self._ice_water_val

    @property
    def ice_water(self):
        """Total precipitable ice water [kg/m^2] (also [mm])."""
        return self._ice_water()

    def _liquid_water(self):
        if self._liquid_water_val is None:
            raise NotImplementedError(
                "Base class _liquid_water() called, but has no value"
            )
        return self._liquid_water_val

    @property
    def liquid_water(self):
        """Total precipitable liquid water [kg/m^2] (also [mm])."""
        return self._liquid_water()

    def _pwv(self):
        if self._pwv_val is None:
            raise NotImplementedError("Base class _pwv() called, but has no value")
        return self._pwv_val

    @property
    def pwv(self):
        """Total precipitable water vapor [kg/m^2] (also [mm])."""
        return self._pwv()

    def _humidity(self):
        if self._humidity_val is None:
            raise NotImplementedError("Base class _humidity() called, but has no value")
        return self._humidity_val

    @property
    def humidity(self):
        """10-meter specific humidity [kg/kg]"""
        return self._humidity()

    def _surface_pressure(self):
        if self._surface_pressure_val is None:
            raise NotImplementedError(
                "Base class _surface_pressure() called, but has no value"
            )
        return self._surface_pressure_val

    @property
    def surface_pressure(self):
        """Surface pressure [Pa]."""
        return self._surface_pressure()

    def _surface_temperature(self):
        if self._surface_temperature_val is None:
            raise NotImplementedError(
                "Base class _surface_temperature() called, but has no value"
            )
        return self._surface_temperature_val

    @property
    def surface_temperature(self):
        """Surface skin temperature [K]."""
        return self._surface_temperature()

    def _air_temperature(self):
        if self._air_temperature_val is None:
            raise NotImplementedError(
                "Base class _air_temperature() called, but has no value"
            )
        return self._air_temperature_val

    @property
    def air_temperature(self):
        """10-meter air temperature [K]."""
        return self._air_temperature()

    def _west_wind(self):
        if self._west_wind_val is None:
            raise NotImplementedError(
                "Base class _west_wind() called, but has no value"
            )
        return self._west_wind_val

    @property
    def west_wind(self):
        """10-meter eastward wind [m/s]."""
        return self._west_wind()

    def _south_wind(self):
        if self._south_wind_val is None:
            raise NotImplementedError(
                "Base class _south_wind() called, but has no value"
            )
        return self._south_wind_val

    @property
    def south_wind(self):
        """10-meter northward wind [m/s]."""
        return self._south_wind()

air_temperature property

10-meter air temperature [K].

humidity property

10-meter specific humidity [kg/kg]

ice_water property

Total precipitable ice water [kg/m^2] (also [mm]).

liquid_water property

Total precipitable liquid water [kg/m^2] (also [mm]).

pwv property

Total precipitable water vapor [kg/m^2] (also [mm]).

south_wind property

10-meter northward wind [m/s].

surface_pressure property

Surface pressure [Pa].

surface_temperature property

Surface skin temperature [K].

time property

The time for these weather properties.

west_wind property

10-meter eastward wind [m/s].
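
As a concrete illustration, a hedged sketch of constructing the base class directly; astropy.units is assumed for the Quantity arguments (as in the package source), and all values are arbitrary:

from datetime import datetime, timezone
import astropy.units as u
from toast.weather import Weather

wx = Weather(
    time=datetime(2025, 1, 1, tzinfo=timezone.utc),
    ice_water=0.0 * (u.kg / u.m**2),
    liquid_water=0.0 * (u.kg / u.m**2),
    pwv=1.5 * u.mm,
    humidity=0.002 * (u.kg / u.kg),
    surface_pressure=53000.0 * u.Pa,
    surface_temperature=270.0 * u.K,
    air_temperature=268.0 * u.K,
    west_wind=2.0 * (u.m / u.s),
    south_wind=1.0 * (u.m / u.s),
)
# Properties return the values given at construction
print(wx.pwv, wx.air_temperature)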

create_from_config(conf)

Instantiate classes in a configuration.

This iteratively instantiates classes defined in the configuration, replacing object names with references to those objects. References to other objects in the config are specified with the string '@config:' followed by a UNIX-style "path" where each element of the path is a dictionary key in the config. For example:

@config:/operators/pointing

Would reference an object at conf["operators"]["pointing"]. Object references like this only work if the target of the reference is a built-in type (str, float, int, etc) or a class derived from TraitConfig.

Parameters:

Name Type Description Default
conf dict

the configuration

required

Returns:

Type Description
SimpleNamespace

A namespace containing the sections and instantiated objects specified in the original config structure.

Source code in toast/traits.py
def create_from_config(conf):
    """Instantiate classes in a configuration.

    This iteratively instantiates classes defined in the configuration, replacing
    object names with references to those objects.  References to other objects in the
    config are specified with the string '@config:' followed by a UNIX-style "path"
    where each element of the path is a dictionary key in the config.  For example:

        @config:/operators/pointing

    Would reference an object at conf["operators"]["pointing"].  Object references like
    this only work if the target of the reference is a built-in type (str, float, int,
    etc) or a class derived from TraitConfig.

    Args:
        conf (dict):  the configuration

    Returns:
        (SimpleNamespace):  A namespace containing the sections and instantiated
            objects specified in the original config structure.

    """
    log = Logger.get()
    ref_prefix = "@config:"
    ref_pat = re.compile("^{}/(.*)".format(ref_prefix))

    def _get_node(path, tree):
        """Given the path as a list of keys, descend tree and return the node."""
        node = tree
        for elem in path:
            if isinstance(node, list):
                if not isinstance(elem, int):
                    msg = f"Node path {path}, element {elem} is not an integer."
                    msg += f" Cannot index list node {node}."
                    raise RuntimeError(msg)
                if elem >= len(node):
                    # We hit the end of our path, node does not yet exist.
                    return None
                node = node[elem]
            elif isinstance(node, dict):
                if elem not in node:
                    # We hit the end of our path, node does not yet exist.
                    return None
                node = node[elem]
            else:
                # We have hit a leaf node without getting to the end of our
                # path.  This means the node does not exist yet.
                return None
        return node

    def _dereference(value, tree):
        """If the object is a string with a path reference, return the object at
        that location in the tree.  Otherwise return the object.
        """
        if not isinstance(value, str):
            return value
        ref_mat = ref_pat.match(value)
        if ref_mat is not None:
            # This string contains a reference
            ref_path = ref_mat.group(1).split("/")
            return _get_node(ref_path, tree)
        else:
            # No reference, it is just a string
            return value

    def _insert_element(obj, parent_path, key, tree):
        """Insert object as a child of the parent path."""
        parent = _get_node(parent_path, tree)
        # print(f"{parent_path}: insert at {parent}: {obj}", flush=True)
        if parent is None:
            msg = f"Node {parent} at {parent_path} does not exist, cannot"
            msg += f" insert {obj}"
            raise RuntimeError(msg)
        elif isinstance(parent, list):
            parent.append(obj)
        elif isinstance(parent, dict):
            if key is None:
                msg = f"Node {parent} at {parent_path} cannot add {obj} without key"
                raise RuntimeError(msg)
            parent[key] = obj
        else:
            msg = f"Node {parent} at {parent_path} is not a list or dict"
            raise RuntimeError(msg)

    def _parse_string(value, parent_path, key, out):
        """Add a string to the output tree if it resolves."""
        obj = _dereference(value, out)
        # print(f"parse_string DEREF str {value} -> {obj}", flush=True)
        if obj is None:
            # Does not yet exist
            return 1
        # Add to output
        _insert_element(obj, parent_path, key, out)
        return 0

    def _parse_trait_value(obj, tree):
        """Recursively check trait value for references.
        Note that the input has already been tested for None values,
        so returning None from this function indicates that the
        trait value contains undefined references.
        """
        if isinstance(obj, str):
            temp = _dereference(obj, tree)
            # print(f"parse_trait DEREF str {obj} -> {temp}")
            return temp
        if isinstance(obj, list):
            ret = list()
            for it in obj:
                if it is None:
                    ret.append(None)
                else:
                    check = _parse_trait_value(it, tree)
                    if check is None:
                        return None
                    else:
                        ret.append(check)
            return ret
        if isinstance(obj, tuple):
            ret = list()
            for it in obj:
                if it is None:
                    ret.append(None)
                else:
                    check = _parse_trait_value(it, tree)
                    if check is None:
                        return None
                    else:
                        ret.append(check)
            return tuple(ret)
        if isinstance(obj, set):
            ret = set()
            for it in obj:
                if it is None:
                    ret.add(None)
                else:
                    check = _parse_trait_value(it, tree)
                    if check is None:
                        return None
                    else:
                        ret.add(check)
            return ret
        if isinstance(obj, dict):
            ret = dict()
            for k, v in obj.items():
                if v is None:
                    ret[k] = None
                else:
                    check = _parse_trait_value(v, tree)
                    if check is None:
                        return None
                    else:
                        ret[k] = check
            return ret
        # This must be some other scalar trait with no references
        return obj

    def _parse_traitconfig(value, parent_path, key, out):
        instance_name = None
        ctor = dict()
        for tname, tprops in value.items():
            if tname == "class":
                ctor["class"] = tprops
                continue
            ctor[tname] = dict()
            ctor[tname]["type"] = tprops["type"]
            tstring = tprops["value"]
            trait = string_to_trait(tstring)
            # print(f"{key} trait {tname} = {trait}", flush=True)
            if trait is None:
                ctor[tname]["value"] = None
                # print(f"{key} trait {tname} value = None", flush=True)
            else:
                check = _parse_trait_value(trait, out)
                # print(f"{key} trait {tname} value check = {check}", flush=True)
                if check is None:
                    # This trait contained unresolved references
                    # print(f"{key} trait {tname} value unresolved", flush=True)
                    return 1
                ctor[tname]["value"] = check
        # If we got this far, it means that we parsed all traits and can
        # instantiate the class.
        # print(f"{parent_path}|{key}: parse_tc ctor = {ctor}", flush=True)
        obj = TraitConfig.from_config(instance_name, ctor)
        # print(f"{parent_path}|{key}: parse_tc {obj}", flush=True)
        _insert_element(obj, parent_path, key, out)
        return 0

    def _parse_list(value, parent_path, key, out):
        parent = _get_node(parent_path, out)
        # print(f"{parent_path}: parse_list parent = {parent}", flush=True)
        _insert_element(list(), parent_path, key, out)
        child_path = list(parent_path)
        child_path.append(key)
        # print(f"{parent_path}:   parse_list child = {child_path}", flush=True)
        unresolved = 0
        for val in value:
            if isinstance(val, list):
                unresolved += _parse_list(val, child_path, None, out)
                # print(f"parse_list: after {val} unresolved = {unresolved}", flush=True)
            elif isinstance(val, dict):
                if "class" in val:
                    # This is a TraitConfig instance
                    unresolved += _parse_traitconfig(val, child_path, None, out)
                    # print(
                    #     f"parse_list: after {val} unresolved = {unresolved}", flush=True
                    # )
                else:
                    # Just a normal dictionary
                    unresolved += _parse_dict(val, child_path, None, out)
                    # print(
                    #     f"parse_list: after {val} unresolved = {unresolved}", flush=True
                    # )
            else:
                unresolved += _parse_string(val, child_path, None, out)
                # print(f"parse_list: after {val} unresolved = {unresolved}", flush=True)
        return unresolved

    def _parse_dict(value, parent_path, key, out):
        parent = _get_node(parent_path, out)
        # print(f"{parent_path}: parse_dict parent = {parent}", flush=True)
        _insert_element(OrderedDict(), parent_path, key, out)
        child_path = list(parent_path)
        child_path.append(key)
        # print(f"{parent_path}:   parse_dict child = {child_path}", flush=True)
        unresolved = 0
        for k, val in value.items():
            if isinstance(val, list):
                unresolved += _parse_list(val, child_path, k, out)
                # print(f"parse_dict: after {k} unresolved = {unresolved}", flush=True)
            elif isinstance(val, dict):
                if "class" in val:
                    # This is a TraitConfig instance
                    unresolved += _parse_traitconfig(val, child_path, k, out)
                    # print(
                    #     f"parse_dict: after {k} unresolved = {unresolved}", flush=True
                    # )
                else:
                    # Just a normal dictionary
                    unresolved += _parse_dict(val, child_path, k, out)
                    # print(
                    #     f"parse_dict: after {k} unresolved = {unresolved}", flush=True
                    # )
            else:
                unresolved += _parse_string(val, child_path, k, out)
                # print(f"parse_dict: after {k} unresolved = {unresolved}", flush=True)
        return unresolved

    # Iteratively instantiate objects

    out = OrderedDict()
    done = False
    last_unresolved = None

    it = 0
    while not done:
        # print("PARSE iter ", it)
        done = True
        unresolved = 0

        # Go through the top-level dictionary
        for topkey in list(conf.keys()):
            if isinstance(conf[topkey], str):
                out[topkey] = conf[topkey]
                continue
            if len(conf[topkey]) > 0:
                unresolved += _parse_dict(conf[topkey], list(), topkey, out)
                # print(f"PARSE {it}: {topkey} unresolved now {unresolved}", flush=True)

        if last_unresolved is not None:
            if unresolved == last_unresolved:
                msg = f"Cannot resolve all references ({unresolved} remaining)"
                msg += f" in the configuration"
                log.error(msg)
                raise RuntimeError(msg)
        last_unresolved = unresolved
        if unresolved > 0:
            done = False
        it += 1

    # Convert this recursively into a namespace for easy use
    root_temp = dict()
    for sect in list(out.keys()):
        if isinstance(out[sect], str):
            root_temp[sect] = out[sect]
        else:
            sect_ns = types.SimpleNamespace(**out[sect])
            root_temp[sect] = sect_ns
    out_ns = types.SimpleNamespace(**root_temp)
    return out_ns
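
A brief, hedged sketch of the intended workflow (the file name and object paths are hypothetical; load_config and create_from_config are assumed to be exported from the top-level toast package, as documented on this page):

import toast

# Load a configuration previously dumped by a pipeline script
conf = toast.load_config("pipeline.toml")

# Iteratively instantiate every TraitConfig object, resolving any
# '@config:/...' references between them
run = toast.create_from_config(conf)

# The result is a SimpleNamespace mirroring the config sections
pointing = run.operators.pointing  # hypothetical section / object names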

fake_hexagon_focalplane(n_pix=7, width=5.0 * u.degree, sample_rate=1.0 * u.Hz, epsilon=0.0, fwhm=10.0 * u.arcmin, bandcenter=150 * u.GHz, bandwidth=20 * u.GHz, psd_net=0.1 * u.K * np.sqrt(1 * u.second), psd_fmin=0.0 * u.Hz, psd_alpha=1.0, psd_fknee=0.05 * u.Hz, fwhm_sigma=0.0 * u.arcmin, bandcenter_sigma=0 * u.GHz, bandwidth_sigma=0 * u.GHz, random_seed=123456)

Create a simple focalplane model for testing.

This function creates a basic focalplane with hexagon-packed pixels, each with two orthogonal detectors. It is intended for unit tests, benchmarking, etc where a Focalplane is needed but the details are less important. In addition to nominal detector properties, this function adds other simulation-specific parameters to the metadata.

Parameters:

Name Type Description Default
n_pix int

The number of pixels with hexagonal packing (e.g. 1, 7, 19, 37, 61, etc).

7
width Quantity

The angular width of the focalplane field of view on the sky.

5.0 * degree
sample_rate Quantity

The sample rate for all detectors.

1.0 * Hz
epsilon float

The cross-polar response for all detectors.

0.0
fwhm Quantity

The beam FWHM

10.0 * arcmin
bandcenter Quantity

The detector band center.

150 * GHz
bandwidth Quantity

The detector band width.

20 * GHz
psd_net Quantity

The Noise Equivalent Temperature of each detector.

0.1 * K * sqrt(1 * second)
psd_fmin Quantity

The frequency below which to roll off the 1/f spectrum.

0.0 * Hz
psd_alpha float

The spectral slope.

1.0
psd_fknee Quantity

The 1/f knee frequency.

0.05 * Hz
fwhm_sigma Quantity

Draw random detector FWHM values from a normal distribution with this width.

0.0 * arcmin
bandcenter_sigma Quantity

Draw random bandcenter values from a normal distribution with this width.

0 * GHz
bandwidth_sigma Quantity

Draw random bandwidth values from a normal distribution with this width.

0 * GHz
random_seed int

The seed to use for numpy random.

123456

Returns:

Type Description
Focalplane

The fake focalplane.

Source code in toast/instrument_sim.py
def fake_hexagon_focalplane(
    n_pix=7,
    width=5.0 * u.degree,
    sample_rate=1.0 * u.Hz,
    epsilon=0.0,
    fwhm=10.0 * u.arcmin,
    bandcenter=150 * u.GHz,
    bandwidth=20 * u.GHz,
    psd_net=0.1 * u.K * np.sqrt(1 * u.second),
    psd_fmin=0.0 * u.Hz,
    psd_alpha=1.0,
    psd_fknee=0.05 * u.Hz,
    fwhm_sigma=0.0 * u.arcmin,
    bandcenter_sigma=0 * u.GHz,
    bandwidth_sigma=0 * u.GHz,
    random_seed=123456,
):
    """Create a simple focalplane model for testing.

    This function creates a basic focalplane with hexagon-packed pixels, each with
    two orthogonal detectors.  It is intended for unit tests, benchmarking, etc where
    a Focalplane is needed but the details are less important.  In addition to nominal
    detector properties, this function adds other simulation-specific parameters to
    the metadata.

    Args:
        n_pix (int):  The number of pixels with hexagonal packing
            (e.g. 1, 7, 19, 37, 61, etc).
        width (Quantity):  The angular width of the focalplane field of view on the sky.
        sample_rate (Quantity):  The sample rate for all detectors.
        epsilon (float):  The cross-polar response for all detectors.
        fwhm (Quantity):  The beam FWHM
        bandcenter (Quantity):  The detector band center.
        bandwidth (Quantity):  The detector band width.
        psd_net (Quantity):  The Noise Equivalent Temperature of each detector.
        psd_fmin (Quantity):  The frequency below which to roll off the 1/f spectrum.
        psd_alpha (float):  The spectral slope.
        psd_fknee (Quantity):  The 1/f knee frequency.
        fwhm_sigma (Quantity):  Draw random detector FWHM values from a normal
            distribution with this width.
        bandcenter_sigma (Quantity):  Draw random bandcenter values from a normal
            distribution with this width.
        bandwidth_sigma (Quantity):  Draw random bandwidth values from a normal
            distribution with this width.
        random_seed (int):  The seed to use for numpy random.

    Returns:
        (Focalplane):  The fake focalplane.

    """
    zaxis = np.array([0.0, 0.0, 1.0])
    center = None

    pol_A = hex_gamma_angles_qu(n_pix, offset=0.0 * u.degree)
    pol_B = hex_gamma_angles_qu(n_pix, offset=90.0 * u.degree)
    props_A = hex_layout(n_pix, width, "D", "A", pol_A, center=center)
    props_B = hex_layout(n_pix, width, "D", "B", pol_B, center=center)

    temp_data = dict(props_A)
    temp_data.update(props_B)

    # Sort by detector name so that detector pairs are together
    det_data = {x: temp_data[x] for x in sorted(temp_data.keys())}

    nrings = hex_nring(n_pix)

    n_det = len(det_data)

    nominal_freq = str(int(bandcenter.to_value(u.GHz)))
    det_names = [f"{x}-{nominal_freq}" for x in det_data.keys()]
    det_gamma = u.Quantity([det_data[x]["gamma"] for x in det_data.keys()], u.radian)

    det_table = QTable(
        [
            Column(name="name", data=det_names),
            Column(name="quat", data=[det_data[x]["quat"] for x in det_data.keys()]),
            Column(name="pol_leakage", length=n_det, unit=None),
            Column(name="psi_pol", length=n_det, unit=u.rad),
            Column(name="gamma", length=n_det, unit=u.rad),
            Column(name="fwhm", length=n_det, unit=u.arcmin),
            Column(name="psd_fmin", length=n_det, unit=u.Hz),
            Column(name="psd_fknee", length=n_det, unit=u.Hz),
            Column(name="psd_alpha", length=n_det, unit=None),
            Column(name="psd_net", length=n_det, unit=(u.K * np.sqrt(1.0 * u.second))),
            Column(name="bandcenter", length=n_det, unit=u.GHz),
            Column(name="bandwidth", length=n_det, unit=u.GHz),
            Column(
                name="pixel", data=[x.rstrip("A").rstrip("B") for x in det_data.keys()]
            ),
        ]
    )

    np.random.seed(random_seed)

    for idet, det in enumerate(det_data.keys()):
        det_table[idet]["pol_leakage"] = epsilon
        # psi_pol is the rotation from the PXX beam frame to the polarization
        # sensitive direction.
        if det.endswith("A"):
            det_table[idet]["psi_pol"] = 0 * u.rad
        else:
            det_table[idet]["psi_pol"] = np.pi / 2 * u.rad
        det_table[idet]["gamma"] = det_gamma[idet]
        det_table[idet]["fwhm"] = fwhm * (
            1 + np.random.randn() * fwhm_sigma.to_value(fwhm.unit)
        )
        det_table[idet]["bandcenter"] = bandcenter * (
            1 + np.random.randn() * bandcenter_sigma.to_value(bandcenter.unit)
        )
        det_table[idet]["bandwidth"] = bandwidth * (
            1 + np.random.randn() * bandwidth_sigma.to_value(bandcenter.unit)
        )
        det_table[idet]["psd_fmin"] = psd_fmin
        det_table[idet]["psd_fknee"] = psd_fknee
        det_table[idet]["psd_alpha"] = psd_alpha
        det_table[idet]["psd_net"] = psd_net

    return Focalplane(
        detector_data=det_table,
        sample_rate=sample_rate,
        field_of_view=1.1 * (width + 2 * fwhm),
    )
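
For example, a small focalplane for a unit test might be built like this (a sketch; the parameter values are arbitrary):

import astropy.units as u
from toast.instrument_sim import fake_hexagon_focalplane

fp = fake_hexagon_focalplane(
    n_pix=19,
    width=5.0 * u.degree,
    sample_rate=10.0 * u.Hz,
    fwhm=10.0 * u.arcmin,
)
# Two orthogonal detectors per hexagon-packed pixel
print(len(fp.detector_data))  # 38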

get_world()

Retrieve the default world communicator and its properties.

If MPI is enabled, this returns MPI.COMM_WORLD and the process rank and number of processes. If MPI is disabled, this returns None for the communicator, zero for the rank, and one for the number of processes.

Returns:

Type Description
tuple

The communicator, number of processes, and rank.

Source code in toast/mpi.py
def get_world():
    """Retrieve the default world communicator and its properties.

    If MPI is enabled, this returns MPI.COMM_WORLD and the process rank and number of
    processes.  If MPI is disabled, this returns None for the communicator, zero
    for the rank, and one for the number of processes.

    Returns:
        (tuple):  The communicator, number of processes, and rank.

    """
    rank = 0
    procs = 1
    world = None
    if use_mpi:
        world = MPI.COMM_WORLD
        rank = world.rank
        procs = world.size
    return world, procs, rank
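
A minimal sketch of typical use:

from toast.mpi import get_world

world, procs, rank = get_world()
if rank == 0:
    print(f"Running with {procs} process(es)")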

job_group_size(world_comm, job_args, schedule=None, obs_times=None, focalplane=None, num_dets=None, sample_rate=None, det_copies=2, full_pointing=False)

Given parameters about the job, estimate the best group size.

Using the provided information, this function determines the distribution of MPI processes across nodes and selects the group size to be a whole number of nodes with the following criteria:

- The nodes per group divides evenly into the total number of nodes.

- The nodes in one group have enough total memory to fit the largest
  observation.

- If possible, the observations are load balanced (in terms of memory) across
  the groups.

Args:

Returns:

Type Description
int

The best number of processes in a group.

Source code in toast/job.py
def job_group_size(
    world_comm,
    job_args,
    schedule=None,
    obs_times=None,
    focalplane=None,
    num_dets=None,
    sample_rate=None,
    det_copies=2,
    full_pointing=False,
):
    """Given parameters about the job, estimate the best group size.

    Using the provided information, this function determines the distribution of MPI
    processes across nodes and selects the group size to be a whole number of nodes
    with the following criteria:

        - The nodes per group divides evenly into the total number of nodes.

        - The nodes in one group have enough total memory to fit the largest
          observation.

        - If possible, the observations are load balanced (in terms of memory) across
          the groups.

    Args:

    Returns:
        (int):  The best number of processes in a group.

    """
    log = Logger.get()
    gb = 1024**3

    rank = 0
    procs = 1

    if world_comm is not None:
        rank = world_comm.rank
        procs = world_comm.size

    # If the user has manually specified the group size, just return that.
    if job_args.group_size is not None:
        if rank == 0:
            log.info(
                "Job plan: user-specified group size set to {}".format(
                    job_args.group_size
                )
            )
        return job_args.group_size

    n_det = None
    rate = None
    if focalplane is not None:
        n_det = len(focalplane.detector_data)
        rate = focalplane.sample_rate.to_value(u.Hz)
    else:
        if num_dets is None or sample_rate is None:
            msg = "specify either a focalplane or both the number of detectors and rate"
            log.error(msg)
            raise RuntimeError(msg)
        n_det = num_dets
        rate = sample_rate.to_value(u.Hz)

    obs_len = list()
    if schedule is not None:
        for sc in schedule.scans:
            obs_len.append((sc.stop - sc.start).total_seconds())
    elif obs_times is not None:
        for start, stop in obs_times:
            obs_len.append((stop - start).total_seconds())
    else:
        msg = "specify either a schedule or a list of observation start / stop tuples"
        log.error(msg)
        raise RuntimeError(msg)
    obs_len = list(sorted(obs_len, reverse=True))

    # Get memory available

    n_node, procs_per_node, min_avail, max_avail = job_size(world_comm)

    node_mem = min_avail
    if job_args.node_mem is not None:
        node_mem = job_args.node_mem

    def observation_mem(seconds, nodes):
        m = 0
        # Assume 2 copies of node-shared boresight pointing
        m += 2 * 4 * 8 * rate * seconds * nodes
        # Assume 8 byte floats per det sample
        m += det_copies * n_det * 8 * rate * seconds
        return m

    group_nodes = 1
    n_group = n_node // group_nodes
    group_mem = group_nodes * node_mem
    obs_mem = [observation_mem(x, group_nodes) for x in obs_len]

    if rank == 0:
        log.info(
            "Job has {} nodes each with {} processes ({} total)".format(
                n_node, procs_per_node, procs
            )
        )
        log.info(
            "Job nodes have {:02f}GB / {:02f}GB available memory (min / max)".format(
                min_avail / gb, max_avail / gb
            )
        )
        if job_args.node_mem is not None:
            log.info(
                "Job plan using user-specified node memory of {:02f}GB".format(
                    node_mem / gb
                )
            )
        else:
            log.info(
                "Job plan using minimum node memory of {:02f}GB".format(node_mem / gb)
            )
        log.info(
            "Job observation lengths = {} minutes / {} minutes (min / max)".format(
                int(obs_len[-1] / 60), int(obs_len[0] / 60)
            )
        )

    if rank == 0:
        log.verbose(
            "Job test {} nodes per group, largest obs = {:0.2f}GB".format(
                group_nodes, obs_mem[0] / gb
            )
        )

    while (group_nodes < n_node) and (
        (group_mem < obs_mem[0]) or (n_group > len(obs_len))
    ):
        # The group size cannot fit the largest observation
        # Increase to the next valid value.
        try_group = group_nodes + 1
        while (try_group < n_node) and (n_node % try_group != 0):
            try_group += 1
        group_nodes = try_group
        n_group = n_node // group_nodes
        group_mem = group_nodes * node_mem
        obs_mem = [observation_mem(x, group_nodes) for x in obs_len]
        if rank == 0:
            log.verbose(
                "Job test {} nodes per group, largest obs = {:0.2f}GB".format(
                    group_nodes, obs_mem[0] / gb
                )
            )

    if rank == 0:
        log.info("Job selecting {} nodes per group".format(group_nodes))

    total_mem = np.sum(obs_mem)
    if total_mem > n_node * node_mem:
        msg = "Sum of observation memory use ({:0.2f}GB) greater than job total ({:0.2f}GB)".format(
            total_mem / gb, (n_node * node_mem) / gb
        )
        log.error(msg)
        raise RuntimeError(msg)

    return group_nodes * procs_per_node
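
A hedged sketch of calling this function without a schedule or focalplane, using explicit observation times and detector counts (all values are arbitrary; per the source above, job_args only needs the group_size and node_mem attributes):

import types
from datetime import datetime, timedelta
import astropy.units as u
from toast.job import job_group_size
from toast.mpi import get_world

world, procs, rank = get_world()

# Leave both attributes None to let the function choose
job_args = types.SimpleNamespace(group_size=None, node_mem=None)

start = datetime(2025, 1, 1)
obs_times = [(start, start + timedelta(hours=1)) for _ in range(4)]

gsize = job_group_size(
    world,
    job_args,
    obs_times=obs_times,
    num_dets=38,
    sample_rate=10.0 * u.Hz,
)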

load_config(file, input=None, comm=None)

Load a config file in a supported format.

This loads the document into a config dictionary. If input is specified, the file contents are merged into this dictionary.

Parameters:

Name Type Description Default
file str

The file to load.

required
input dict

Append to this dictionary.

None

Returns:

Type Description
dict

The result.

Source code in toast/config/cli.py
def load_config(file, input=None, comm=None):
    """Load a config file in a supported format.

    This loads the document into a config dictionary.  If input is specified, the file
    contents are merged into this dictionary.

    Args:
        file (str):  The file to load.
        input (dict):  Append to this dictionary.

    Returns:
        (dict):  The result.

    """
    ret = None
    # If the file has a recognized extension, just use that instead of guessing.
    ext = os.path.splitext(file)[1]
    if ext in [".yml", ".yaml"]:
        return load_yaml(file, input=input, comm=comm)
    elif ext in [".json", ".jsn"]:
        return load_json(file, input=input, comm=comm)
    elif ext in [".toml", ".tml"]:
        return load_toml(file, input=input, comm=comm)
    else:
        try:
            ret = load_toml(file, input=input, comm=comm)
        except TOMLKitError:
            try:
                ret = load_json(file, input=input, comm=comm)
            except (ValueError, JSONDecodeError):
                ret = load_yaml(file, input=input, comm=comm)
        return ret
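
For instance, several files can be merged in order, with later files overriding earlier ones (file names are hypothetical; load_config is assumed to be exported at the package top level):

import toast

conf = toast.load_config("defaults.toml")
conf = toast.load_config("site_overrides.yaml", input=conf)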

parse_config(parser, operators=list(), templates=list(), prefix='', opts=None)

Load command line arguments associated with object properties.

This function:

* Adds a "--config" option to the parser which accepts multiple config file
  paths to load.

* Adds "--default_toml" and "--default_json" options to dump the default config
  options to files.

* Adds an option "--job_group_size" to provide the commonly used option for
  setting the group size of the TOAST communicator.

* Adds arguments for all object parameters in the defaults for the specified
  classes.

* Builds a config dictionary starting from the defaults, updating these using
  values from any config files, and then applies any overrides from the
  commandline.

Parameters:

Name Type Description Default
parser ArgumentParser

The argparse parser.

required
operators list

The operator instances to configure from files or commandline. These instances should have their "name" attribute set to something meaningful, since that name is used to construct the commandline options. An enable / disable option is created for each, which toggles the TraitConfig base class "disabled" trait.

list()
templates list

The template instances to add to the commandline. These instances should have their "name" attribute set to something meaningful, since that name is used to construct the commandline options. An enable / disable option is created for each, which toggles the TraitConfig base class "disabled" trait.

list()
prefix str

Optional string to prefix all options by.

''
opts list

If not None, parse arguments from this list instead of sys.argv.

None

Returns:

Type Description
tuple

The (config dictionary, args, jobargs) tuple. The args namespace contains all the remaining parameters after extracting the operator and template options, and jobargs holds the job-level options (group size and node memory).

Source code in toast/config/cli.py
def parse_config(
    parser,
    operators=list(),
    templates=list(),
    prefix="",
    opts=None,
):
    """Load command line arguments associated with object properties.

    This function:

        * Adds a "--config" option to the parser which accepts multiple config file
          paths to load.

        * Adds "--default_toml" and "--default_json" options to dump the default config
          options to files.

        * Adds a option "--job_group_size" to provide the commonly used option for
          setting the group size of the TOAST communicator.

        * Adds arguments for all object parameters in the defaults for the specified
          classes.

        * Builds a config dictionary starting from the defaults, updating these using
          values from any config files, and then applies any overrides from the
          commandline.

    Args:
        parser (ArgumentParser):  The argparse parser.
        operators (list):  The operator instances to configure from files or
            commandline.  These instances should have their "name" attribute set to
            something meaningful, since that name is used to construct the commandline
            options.  An enable / disable option is created for each, which toggles the
            TraitConfig base class "disabled" trait.
        templates (list):  The template instances to add to the commandline.
            These instances should have their "name" attribute set to something
            meaningful, since that name is used to construct the commandline options.
            An enable / disable option is created for each, which toggles the
            TraitConfig base class "disabled" trait.
        prefix (str):  Optional string to prefix all options by.
        opts (list):  If not None, parse arguments from this list instead of sys.argv.

    Returns:
        (tuple):  The (config dictionary, args).  The args namespace contains all the
            remaining parameters after extracting the operator and template options.

    """

    # The default configuration
    defaults_op = build_config(operators)
    defaults_tmpl = build_config(templates)

    # Add commandline overrides
    if len(operators) > 0:
        add_config_args(
            parser,
            defaults_op,
            "operators",
            ignore=["API"],
            prefix=prefix,
        )
    if len(templates) > 0:
        add_config_args(
            parser,
            defaults_tmpl,
            "templates",
            ignore=["API"],
            prefix=prefix,
        )

    # Combine all the defaults
    defaults = OrderedDict()
    defaults["operators"] = OrderedDict()
    defaults["templates"] = OrderedDict()
    if "operators" in defaults_op:
        defaults["operators"].update(defaults_op["operators"])
    if "templates" in defaults_tmpl:
        defaults["templates"].update(defaults_tmpl["templates"])

    # Add an option to load one or more config files.  These should have compatible
    # names for the operators used in defaults.
    parser.add_argument(
        "--config",
        type=str,
        required=False,
        nargs="+",
        help="One or more input config files.",
    )

    # Add options to dump default values
    parser.add_argument(
        "--defaults_toml",
        type=str,
        required=False,
        default=None,
        help="Dump default config values to a TOML file",
    )
    parser.add_argument(
        "--defaults_json",
        type=str,
        required=False,
        default=None,
        help="Dump default config values to a JSON file",
    )
    parser.add_argument(
        "--defaults_yaml",
        type=str,
        required=False,
        default=None,
        help="Dump default config values to a YAML file",
    )

    parser.add_argument(
        "--job_group_size",
        required=False,
        type=int,
        default=None,
        help="(Advanced) Size of the process groups",
    )

    parser.add_argument(
        "--job_node_mem",
        required=False,
        type=int,
        default=None,
        help="(Advanced) Override the detected memory per node in bytes",
    )

    # Parse commandline or list of options.
    if opts is None:
        opts = sys.argv[1:]
    args = parser.parse_args(args=opts)

    # Dump default config values.
    if args.defaults_toml is not None:
        dump_toml(args.defaults_toml, defaults)
    if args.defaults_json is not None:
        dump_json(args.defaults_json, defaults)
    if args.defaults_yaml is not None:
        dump_yaml(args.defaults_yaml, defaults)

    # Parse job args
    jobargs = types.SimpleNamespace(
        node_mem=args.job_node_mem,
        group_size=args.job_group_size,
    )
    del args.job_node_mem
    del args.job_group_size

    # Load any config files.  This overrides default values with config file contents.
    config = copy.deepcopy(defaults)

    if args.config is not None:
        for conf in args.config:
            config = load_config(conf, input=config)

    # Parse operator and template commandline options.  These override any config
    # file or default values.
    if len(operators) > 0:
        op_remaining = args_update_config(
            args, config, opts, "operators", prefix=prefix
        )
    else:
        op_remaining = args
    if len(templates) > 0:
        remaining = args_update_config(
            op_remaining, config, opts, "templates", prefix=prefix
        )
    else:
        remaining = op_remaining

    # Remove the options we created in this function
    del remaining.config
    del remaining.defaults_toml
    del remaining.defaults_json
    del remaining.defaults_yaml

    return config, remaining, jobargs
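
Putting it together, a hedged sketch of a small pipeline script (the operator class and the names used are hypothetical examples; parse_config and create_from_config are assumed to be exported at the package top level):

import argparse
import toast
import toast.ops

parser = argparse.ArgumentParser(description="Example pipeline")

# Operator instances must have meaningful names, since those names
# are used to construct the commandline options
operators = [toast.ops.SimNoise(name="sim_noise")]

config, args, jobargs = toast.parse_config(parser, operators=operators)

# Instantiate everything defined in the final config
job = toast.create_from_config(config)
noise_op = job.operators.sim_noise  # attribute named after the operator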

Accelerator Use

TOAST supports the use of JAX for offloading some operations. To enable this...