
Processing Model

The user controls the workflow at the highest level, typically with a small Python script or notebook (some examples are included). Such scripts use TOAST functionality to distribute the data and then call built-in or custom operators to simulate and/or process the timestream data.

toast.ops.Operator

Bases: TraitConfig

Base class for Operators.

An operator has methods that work with a toast.dist.Data object. This base class defines the common interface and some shared helper methods.

Source code in toast/ops/operator.py
class Operator(TraitConfig):
    """Base class for Operators.

    An operator has methods which work with a toast.dist.Data object.  This base class
    defines some interfaces and also some common helper methods.

    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def _exec(self, data, detectors=None, **kwargs):
        raise NotImplementedError("Fell through to Operator base class")

    @function_timer_stackskip
    def exec(self, data, detectors=None, **kwargs):
        """Perform operations on a Data object.

        If a list of detectors is specified, only process these detectors.  Any extra
        kwargs are passed to the derived class internal method.

        Accelerator use:  If the derived class supports OpenMP target offload and all the required
        data objects exist on the device, then the `_exec()` method will be called
        with the "use_accel=True" option.  Any operator that returns "True" from its
        _supports_accel() method should also accept the "use_accel" keyword argument.

        Args:
            data (toast.Data):  The distributed data.
            detectors (list):  A list of detector names or indices.  If None, this
                indicates a list of all detectors.

        Returns:
            None

        """
        log = Logger.get()
        if self.enabled:
            self._exec(
                data,
                detectors=detectors,
                **kwargs,
            )
        else:
            if data.comm.world_rank == 0:
                msg = f"Operator {self.name} is disabled, skipping call to exec()"
                log.debug(msg)

    def _finalize(self, data, **kwargs):
        raise NotImplementedError("Fell through to Operator base class")

    @function_timer_stackskip
    def finalize(self, data, **kwargs):
        """Perform any final operations / communication.

        A call to this function indicates that all calls to the 'exec()' method are
        complete, and the operator should perform any final actions.  Any extra
        kwargs are passed to the derived class internal method.

        Args:
            data (toast.Data):  The distributed data.

        Returns:
            (value):  None or an Operator-dependent result.

        """
        log = Logger.get()
        if self.enabled:
            msg = f"Calling finalize() for operator {self.name}"
            log.verbose(msg)
            return self._finalize(data, **kwargs)
        else:
            if data.comm.world_rank == 0:
                msg = f"Operator {self.name} is disabled, skipping call to finalize()"
                log.debug(msg)

    @function_timer_stackskip
    def apply(self, data, detectors=None, **kwargs):
        """Run exec() and finalize().

        This is a convenience wrapper that calls exec() exactly once with an optional
        detector list and then immediately calls finalize().  This is really only
        useful when working interactively to save a bit of typing.  When a `Pipeline`
        is calling other operators it will always use exec() and finalize() explicitly.

        After calling this, any future calls to exec() may produce unexpected results,
        since finalize() has already been called.

        Args:
            data (toast.Data):  The distributed data.
            detectors (list):  A list of detector names or indices.  If None, this
                indicates a list of all detectors.

        Returns:
            (value):  None or an Operator-dependent result.

        """
        self.exec(data, detectors=detectors, **kwargs)
        return self.finalize(data, **kwargs)

    @function_timer_stackskip
    def load_exec(self, data, detectors=None, **kwargs):
        """Perform operations on a Data object that is not yet in memory.

        In some cases, the full detector data across multiple observations is too
        large to fit in memory.  This method calls exec() one observation at a time
        and looks for an attribute named "loader" in each observation.  If this
        exists, it should be an instance of a Loader class that defines 2 methods
        that can be called like this:

            load(Observation)
            unload(Observation)

        These should populate and clear any DetectorData in the observation.  The
        experiment-specific code which defines and instantiates the Loader class
        should ensure that any metadata needed to create and read the detector data
        is either contained in the Loader instance or in the Observation data or
        metadata.

        All kwargs are passed to the underlying call to exec().

        Args:
            data (toast.Data):  The distributed data.

        Returns:
            None

        """
        log = Logger.get()
        if self.enabled:
            for iobs, obs in enumerate(data.obs):
                unload = False
                if hasattr(obs, "loader"):
                    obs.loader.load(obs)
                    unload = True
                temp_data = data.select(obs_index=iobs)
                self.exec(temp_data, detectors=detectors, **kwargs)
                del temp_data
                if unload:
                    obs.loader.unload(obs)
        else:
            if data.comm.world_rank == 0:
                msg = f"Operator {self.name} is disabled, skipping call to load_exec()"
                log.debug(msg)

    @function_timer_stackskip
    def load_apply(self, data, detectors=None, **kwargs):
        """Run load_exec() and finalize().

        This is a convenience wrapper that calls load_exec() once and then immediately
        calls finalize().  Note that operator finalize methods should not rely on the
        existence of any detector data.

        After calling this, any future calls to exec() or load_exec() may produce
        unexpected results, since finalize() has already been called.

        All kwargs are passed to load_exec() and finalize().

        Args:
            data (toast.Data):  The distributed data.

        Returns:
            (value):  None or an Operator-dependent result.

        """
        self.load_exec(data, detectors=detectors, **kwargs)
        return self.finalize(data, **kwargs)

    def _requires(self):
        raise NotImplementedError("Fell through to Operator base class")
        return dict()

    def requires(self):
        """Dictionary of Observation keys directly used by this Operator.
        Including optional keys that will be created by the operator if they do not exist.

        This dictionary should have 5 keys, each containing a list of "global",
        "metadata", "detdata", "shared", and "intervals" fields.  Global keys are
        contained in the top-level data object.  Metadata keys are those contained
        in the primary observation dictionary.  Detdata, shared, and intervals keys are
        those contained in the "detdata", "shared", and "intervals" observation
        attributes.

        Returns:
            (dict):  The keys in the Observation dictionary required by the operator.

        """
        # Ensure that all keys exist
        req = self._requires()
        for key in ["global", "meta", "detdata", "shared", "intervals"]:
            if key not in req:
                req[key] = list()
        # All operators use an implied interval list of the full sample range
        if None not in req["intervals"]:
            req["intervals"].append(None)
        return req

    def _provides(self):
        raise NotImplementedError("Fell through to Operator base class")
        return dict()

    def provides(self):
        """Dictionary of Observation keys created or modified by this Operator.

        This dictionary should have 5 keys, each containing a list of "global",
        "metadata", "detdata", "shared", and "intervals" fields.  Global keys are
        contained in the top-level data object.  Metadata keys are those contained
        in the primary observation dictionary.  Detdata, shared, and intervals keys are
        those contained in the "detdata", "shared", and "intervals" observation
        attributes.

        Returns:
            (dict):  The keys in the Observation dictionary that will be created
                or modified.

        """
        # Ensure that all keys exist
        prov = self._provides()
        for key in ["global", "meta", "detdata", "shared", "intervals"]:
            if key not in prov:
                prov[key] = list()
        return prov

    @classmethod
    def get_class_config_path(cls):
        return "/operators/{}".format(cls.__qualname__)

    def get_config_path(self):
        if self.name is None:
            return None
        return "/operators/{}".format(self.name)

    @classmethod
    def get_class_config(cls, input=None):
        """Return a dictionary of the default traits of an Operator class.

        This returns a new or appended dictionary.  The class instance properties are
        contained in a dictionary found in result["operators"][cls.name].

        If the specified named location in the input config already exists then an
        exception is raised.

        Args:
            input (dict):  The optional input dictionary to update.

        Returns:
            (dict):  The created or updated dictionary.

        """
        return super().get_class_config(section="operators", input=input)

    def get_config(self, input=None):
        """Return a dictionary of the current traits of an Operator *instance*.

        This returns a new or appended dictionary.  The operator instance properties are
        contained in a dictionary found in result["operators"][self.name].

        If the specified named location in the input config already exists then an
        exception is raised.

        Args:
            input (dict):  The optional input dictionary to update.

        Returns:
            (dict):  The created or updated dictionary.

        """
        return super().get_config(section="operators", input=input)

    @classmethod
    def translate(cls, props):
        """Given a config dictionary, modify it to match the current API."""
        # For operators, the derived classes should implement this method as needed
        # and then call super().translate(props) to trigger this method.  Here we strip
        # the 'API' key from the config.
        props = super().translate(props)
        if "API" in props:
            del props["API"]
        return props
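The base class routes the public `exec()` and `finalize()` calls to the private `_exec()` and `_finalize()` hooks. A derived class overrides those hooks, along with `_requires()` and `_provides()`. The sketch below mirrors that split without importing TOAST. It makes these assumptions:

* `OperatorSketch` is a hypothetical stand-in for the real base class, with no traits, logging, or MPI.
* `ScaleSketch` is a hypothetical derived operator.
* A plain dict of detector timestreams replaces the Data object.

```python
class OperatorSketch:
    """Hypothetical stand-in for toast.ops.Operator (no traits, logging or MPI)."""

    def __init__(self, name=None, enabled=True):
        self.name = name
        self.enabled = enabled

    def exec(self, data, detectors=None, **kwargs):
        # Public entry point: dispatch to the derived hook only when enabled.
        if self.enabled:
            self._exec(data, detectors=detectors, **kwargs)

    def finalize(self, data, **kwargs):
        if self.enabled:
            return self._finalize(data, **kwargs)
        return None

    def _exec(self, data, detectors=None, **kwargs):
        raise NotImplementedError("Fell through to Operator base class")

    def _finalize(self, data, **kwargs):
        raise NotImplementedError("Fell through to Operator base class")


class ScaleSketch(OperatorSketch):
    """Hypothetical derived operator that multiplies detector timestreams."""

    def __init__(self, factor=2.0, **kwargs):
        super().__init__(**kwargs)
        self.factor = factor

    def _exec(self, data, detectors=None, **kwargs):
        # Here 'data' is a plain dict {detector_name: list_of_samples}.
        dets = list(data) if detectors is None else detectors
        for det in dets:
            data[det] = [x * self.factor for x in data[det]]

    def _finalize(self, data, **kwargs):
        return None  # nothing to reduce for this operator

    def _requires(self):
        return {"detdata": ["signal"]}

    def _provides(self):
        return {"detdata": ["signal"]}


data = {"d0": [1.0, 2.0], "d1": [3.0]}
ScaleSketch(factor=10.0, name="scale").exec(data, detectors=["d0"])
ScaleSketch(enabled=False).exec(data)  # disabled: data is left untouched
print(data)  # {'d0': [10.0, 20.0], 'd1': [3.0]}
```

A derived class only fills in the hooks. The enabled check and, in the real class, the timing and logging stay in the shared public methods.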

__init__(**kwargs)

Source code in toast/ops/operator.py
def __init__(self, **kwargs):
    super().__init__(**kwargs)

_exec(data, detectors=None, **kwargs)

Source code in toast/ops/operator.py
def _exec(self, data, detectors=None, **kwargs):
    raise NotImplementedError("Fell through to Operator base class")

_finalize(data, **kwargs)

Source code in toast/ops/operator.py
def _finalize(self, data, **kwargs):
    raise NotImplementedError("Fell through to Operator base class")

_provides()

Source code in toast/ops/operator.py
def _provides(self):
    raise NotImplementedError("Fell through to Operator base class")
    return dict()

_requires()

Source code in toast/ops/operator.py
def _requires(self):
    raise NotImplementedError("Fell through to Operator base class")
    return dict()

apply(data, detectors=None, **kwargs)

Run exec() and finalize().

This is a convenience wrapper that calls exec() exactly once with an optional detector list and then immediately calls finalize(). This is really only useful when working interactively to save a bit of typing. When a Pipeline is calling other operators it will always use exec() and finalize() explicitly.

After calling this, any future calls to exec() may produce unexpected results, since finalize() has already been called.

Parameters:

Name       Type  Default   Description
data       Data  required  The distributed data.
detectors  list  None      A list of detector names or indices. If None, this indicates a list of all detectors.

Returns:

Type   Description
value  None or an Operator-dependent result.

Source code in toast/ops/operator.py
@function_timer_stackskip
def apply(self, data, detectors=None, **kwargs):
    """Run exec() and finalize().

    This is a convenience wrapper that calls exec() exactly once with an optional
    detector list and then immediately calls finalize().  This is really only
    useful when working interactively to save a bit of typing.  When a `Pipeline`
    is calling other operators it will always use exec() and finalize() explicitly.

    After calling this, any future calls to exec() may produce unexpected results,
    since finalize() has already been called.

    Args:
        data (toast.Data):  The distributed data.
        detectors (list):  A list of detector names or indices.  If None, this
            indicates a list of all detectors.

    Returns:
        (value):  None or an Operator-dependent result.

    """
    self.exec(data, detectors=detectors, **kwargs)
    return self.finalize(data, **kwargs)
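The difference between `apply()` and the explicit calls a `Pipeline` makes shows up with an operator that accumulates state across `exec()` calls. In the sketch below, `CountSamplesSketch` is a hypothetical operator, and plain dicts stand in for the data. It only mirrors the call pattern described above.

```python
class CountSamplesSketch:
    """Hypothetical operator: exec() accumulates, finalize() reports a result."""

    def __init__(self):
        self.total = 0

    def exec(self, data, detectors=None):
        # data: dict {detector_name: samples}; None selects all detectors.
        dets = list(data) if detectors is None else detectors
        self.total += sum(len(data[det]) for det in dets)

    def finalize(self, data):
        return self.total

    def apply(self, data, detectors=None):
        # Same shape as Operator.apply(): exactly one exec(), then finalize().
        self.exec(data, detectors=detectors)
        return self.finalize(data)


chunks = [{"d0": [0.0] * 4}, {"d0": [0.0] * 6, "d1": [0.0] * 2}]

# Pipeline style: exec() once per chunk, finalize() once at the end.
pipeline_op = CountSamplesSketch()
for chunk in chunks:
    pipeline_op.exec(chunk)
print(pipeline_op.finalize(None))  # 12

# Interactive shortcut for a single call.
print(CountSamplesSketch().apply(chunks[1], detectors=["d1"]))  # 2
```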

exec(data, detectors=None, **kwargs)

Perform operations on a Data object.

If a list of detectors is specified, only process these detectors. Any extra kwargs are passed to the derived class internal method.

Accelerator use: If the derived class supports OpenMP target offload and all the required data objects exist on the device, then the _exec() method will be called with the "use_accel=True" option. Any operator that returns "True" from its _supports_accel() method should also accept the "use_accel" keyword argument.

Parameters:

Name       Type  Default   Description
data       Data  required  The distributed data.
detectors  list  None      A list of detector names or indices. If None, this indicates a list of all detectors.

Returns:

None

Source code in toast/ops/operator.py
@function_timer_stackskip
def exec(self, data, detectors=None, **kwargs):
    """Perform operations on a Data object.

    If a list of detectors is specified, only process these detectors.  Any extra
    kwargs are passed to the derived class internal method.

    Accelerator use:  If the derived class supports OpenMP target offload and all the required
    data objects exist on the device, then the `_exec()` method will be called
    with the "use_accel=True" option.  Any operator that returns "True" from its
    _supports_accel() method should also accept the "use_accel" keyword argument.

    Args:
        data (toast.Data):  The distributed data.
        detectors (list):  A list of detector names or indices.  If None, this
            indicates a list of all detectors.

    Returns:
        None

    """
    log = Logger.get()
    if self.enabled:
        self._exec(
            data,
            detectors=detectors,
            **kwargs,
        )
    else:
        if data.comm.world_rank == 0:
            msg = f"Operator {self.name} is disabled, skipping call to exec()"
            log.debug(msg)
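The sketch below illustrates three behaviors of `exec()`:

* A detector list of `None` means all detectors.
* Extra keyword arguments are forwarded to `_exec()`.
* A disabled operator skips the call entirely.

`ExecSketch` is hypothetical. Its `_exec()` accepts a `use_accel` keyword, as the docstring asks of accelerator-aware operators, but the sketch does not model any offload logic.

```python
class ExecSketch:
    """Hypothetical operator that records how its _exec() hook was called."""

    def __init__(self, enabled=True):
        self.enabled = enabled
        self.calls = []

    def _supports_accel(self):
        # Hypothetical: an offload-capable operator would return True here.
        return False

    def _exec(self, data, detectors=None, use_accel=False, **kwargs):
        # Accepting 'use_accel' keeps the signature compatible with callers
        # that pass it.
        dets = sorted(data) if detectors is None else list(detectors)
        self.calls.append((dets, use_accel, kwargs))

    def exec(self, data, detectors=None, **kwargs):
        # Mirrors Operator.exec(): forward everything only when enabled.
        if self.enabled:
            self._exec(data, detectors=detectors, **kwargs)


data = {"d1": [], "d0": []}
op = ExecSketch()
op.exec(data)                              # None -> all detectors
op.exec(data, detectors=["d1"], tag="x")   # extra kwargs reach _exec()
disabled = ExecSketch(enabled=False)
disabled.exec(data)                        # disabled -> no call recorded
print(op.calls)
# [(['d0', 'd1'], False, {}), (['d1'], False, {'tag': 'x'})]
```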

finalize(data, **kwargs)

Perform any final operations / communication.

A call to this function indicates that all calls to the 'exec()' method are complete, and the operator should perform any final actions. Any extra kwargs are passed to the derived class internal method.

Parameters:

Name  Type  Default   Description
data  Data  required  The distributed data.

Returns:

Type   Description
value  None or an Operator-dependent result.

Source code in toast/ops/operator.py
@function_timer_stackskip
def finalize(self, data, **kwargs):
    """Perform any final operations / communication.

    A call to this function indicates that all calls to the 'exec()' method are
    complete, and the operator should perform any final actions.  Any extra
    kwargs are passed to the derived class internal method.

    Args:
        data (toast.Data):  The distributed data.

    Returns:
        (value):  None or an Operator-dependent result.

    """
    log = Logger.get()
    if self.enabled:
        msg = f"Calling finalize() for operator {self.name}"
        log.verbose(msg)
        return self._finalize(data, **kwargs)
    else:
        if data.comm.world_rank == 0:
            msg = f"Operator {self.name} is disabled, skipping call to finalize()"
            log.debug(msg)
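A common reason for a separate `finalize()` step is that per-call partial results must be combined once all `exec()` calls are done. The hypothetical `BinSketch` below shows this pattern. Its `exec()` accumulates per-pixel sums and hit counts, and its `finalize()` turns them into a binned average. In a distributed run, that second step is where the cross-process communication would happen; this sketch is serial and uses no MPI.

```python
from collections import defaultdict


class BinSketch:
    """Hypothetical operator: exec() accumulates, finalize() produces a result."""

    def __init__(self, enabled=True):
        self.enabled = enabled
        self._sum = defaultdict(float)
        self._hits = defaultdict(int)

    def exec(self, data, detectors=None):
        # data: dict with parallel "pixels" and "signal" lists.
        if not self.enabled:
            return
        for pix, val in zip(data["pixels"], data["signal"]):
            self._sum[pix] += val
            self._hits[pix] += 1

    def finalize(self, data):
        if not self.enabled:
            return None
        # A distributed version would reduce sums and hits across processes
        # here before dividing.
        return {pix: self._sum[pix] / self._hits[pix] for pix in sorted(self._hits)}


op = BinSketch()
op.exec({"pixels": [0, 1, 1], "signal": [2.0, 4.0, 6.0]})
op.exec({"pixels": [0], "signal": [4.0]})
print(op.finalize(None))  # {0: 3.0, 1: 5.0}
```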

get_class_config(input=None) classmethod

Return a dictionary of the default traits of an Operator class.

This returns a new or appended dictionary. The class instance properties are contained in a dictionary found in result["operators"][cls.name].

If the specified named location in the input config already exists then an exception is raised.

Parameters:

Name   Type  Default  Description
input  dict  None     The optional input dictionary to update.

Returns:

Type  Description
dict  The created or updated dictionary.

Source code in toast/ops/operator.py
@classmethod
def get_class_config(cls, input=None):
    """Return a dictionary of the default traits of an Operator class.

    This returns a new or appended dictionary.  The class instance properties are
    contained in a dictionary found in result["operators"][cls.name].

    If the specified named location in the input config already exists then an
    exception is raised.

    Args:
        input (dict):  The optional input dictionary to update.

    Returns:
        (dict):  The created or updated dictionary.

    """
    return super().get_class_config(section="operators", input=input)
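The sketch below illustrates the nesting rule described for `get_class_config()` and `get_config()`:

* Properties land under `result["operators"][<name>]`.
* An existing entry at that location causes an error.

`add_operator_config` is a hypothetical helper, not part of TOAST. The real work is done by the `TraitConfig` parent class, and the choice of `RuntimeError` is an assumption, since the docstring only says "an exception is raised".

```python
def add_operator_config(name, traits, input=None):
    """Hypothetical helper mimicking the documented nesting and collision rule."""
    result = dict() if input is None else input
    section = result.setdefault("operators", dict())
    if name in section:
        # Assumed exception type; the documentation does not specify one.
        raise RuntimeError(f"/operators/{name} already exists in the config")
    section[name] = dict(traits)
    return result


conf = add_operator_config("scale", {"enabled": True, "factor": 2.0})
conf = add_operator_config("binner", {"enabled": True}, input=conf)
print(sorted(conf["operators"]))  # ['binner', 'scale']

try:
    add_operator_config("scale", {"enabled": False}, input=conf)
except RuntimeError as err:
    print(err)  # /operators/scale already exists in the config
```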

get_class_config_path() classmethod

Source code in toast/ops/operator.py
@classmethod
def get_class_config_path(cls):
    return "/operators/{}".format(cls.__qualname__)

get_config(input=None)

Return a dictionary of the current traits of an Operator instance.

This returns a new or appended dictionary. The operator instance properties are contained in a dictionary found in result["operators"][self.name].

If the specified named location in the input config already exists then an exception is raised.

Parameters:

Name   Type  Default  Description
input  dict  None     The optional input dictionary to update.

Returns:

Type  Description
dict  The created or updated dictionary.

Source code in toast/ops/operator.py
def get_config(self, input=None):
    """Return a dictionary of the current traits of an Operator *instance*.

    This returns a new or appended dictionary.  The operator instance properties are
    contained in a dictionary found in result["operators"][self.name].

    If the specified named location in the input config already exists then an
    exception is raised.

    Args:
        input (dict):  The optional input dictionary to update.

    Returns:
        (dict):  The created or updated dictionary.

    """
    return super().get_config(section="operators", input=input)

get_config_path()

Source code in toast/ops/operator.py
def get_config_path(self):
    if self.name is None:
        return None
    return "/operators/{}".format(self.name)
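The two path methods differ in what they key on:

* The class path uses the class's `__qualname__`.
* The instance path uses the instance's `name` and is `None` when no name is set.

The sketch below copies both method bodies onto a hypothetical stand-in class. `MyFilter` is an invented subclass name.

```python
class PathSketch:
    """Hypothetical stand-in reproducing the two path methods shown above."""

    def __init__(self, name=None):
        self.name = name

    @classmethod
    def get_class_config_path(cls):
        return "/operators/{}".format(cls.__qualname__)

    def get_config_path(self):
        if self.name is None:
            return None
        return "/operators/{}".format(self.name)


class MyFilter(PathSketch):  # hypothetical operator class
    pass


print(MyFilter.get_class_config_path())             # /operators/MyFilter
print(MyFilter(name="filter_a").get_config_path())  # /operators/filter_a
print(MyFilter().get_config_path())                 # None
```

Keying instances by name rather than class means two differently configured instances of the same operator class can occupy separate config locations.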

load_apply(data, detectors=None, **kwargs)

Run load_exec() and finalize().

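This is a convenience wrapper that calls load_exec() once and then immediately calls finalize(). Note that operator finalize methods should not rely on the existence of any detector data, since load_exec() unloads the detector data of each observation that has a loader before moving on to the next one.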

After calling this, any future calls to exec() or load_exec() may produce unexpected results, since finalize() has already been called.

All kwargs are passed to load_exec() and finalize().

Parameters:

Name  Type  Default   Description
data  Data  required  The distributed data.

Returns:

Type   Description
value  None or an Operator-dependent result.

Source code in toast/ops/operator.py
@function_timer_stackskip
def load_apply(self, data, detectors=None, **kwargs):
    """Run load_exec() and finalize().

    This is a convenience wrapper that calls load_exec() once and then immediately
    calls finalize().  Note that operator finalize methods should not rely on the
    existence of any detector data.

    After calling this, any future calls to exec() or load_exec() may produce
    unexpected results, since finalize() has already been called.

    All kwargs are passed to load_exec() and finalize().

    Args:
        data (toast.Data):  The distributed data.

    Returns:
        (value):  None or an Operator-dependent result.

    """
    self.load_exec(data, detectors=detectors, **kwargs)
    return self.finalize(data, **kwargs)

load_exec(data, detectors=None, **kwargs)

Perform operations on a Data object that is not yet in memory.

In some cases, the full detector data across multiple observations is too large to fit in memory. This method calls exec() one observation at a time and looks for an attribute named "loader" in each observation. If this exists, it should be an instance of a Loader class that defines 2 methods that can be called like this:

load(Observation)
unload(Observation)

These should populate and clear any DetectorData in the observation. The experiment-specific code which defines and instantiates the Loader class should ensure that any metadata needed to create and read the detector data is either contained in the Loader instance or in the Observation data or metadata.

All kwargs are passed to the underlying call to exec().

Parameters:

Name  Type  Default   Description
data  Data  required  The distributed data.

Returns:

None

Source code in toast/ops/operator.py
@function_timer_stackskip
def load_exec(self, data, detectors=None, **kwargs):
    """Perform operations on a Data object that is not yet in memory.

    In some cases, the full detector data across multiple observations is too
    large to fit in memory.  This method calls exec() one observation at a time
    and looks for an attribute named "loader" in each observation.  If this
    exists, it should be an instance of a Loader class that defines 2 methods
    that can be called like this:

        load(Observation)
        unload(Observation)

    These should populate and clear any DetectorData in the observation.  The
    experiment-specific code which defines and instantiates the Loader class
    should ensure that any metadata needed to create and read the detector data
    is either contained in the Loader instance or in the Observation data or
    metadata.

    All kwargs are passed to the underlying call to exec().

    Args:
        data (toast.Data):  The distributed data.

    Returns:
        None

    """
    log = Logger.get()
    if self.enabled:
        for iobs, obs in enumerate(data.obs):
            unload = False
            if hasattr(obs, "loader"):
                obs.loader.load(obs)
                unload = True
            temp_data = data.select(obs_index=iobs)
            self.exec(temp_data, detectors=detectors, **kwargs)
            del temp_data
            if unload:
                obs.loader.unload(obs)
    else:
        if data.comm.world_rank == 0:
            msg = f"Operator {self.name} is disabled, skipping call to load_exec()"
            log.debug(msg)
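The loop in `load_exec()` keeps at most one observation's detector data in memory. The sketch below reproduces that load / process / unload shape. `ObservationSketch`, `ListLoader`, and `load_exec_sketch` are all hypothetical stand-ins. The real method also builds a one-observation Data view with `data.select(obs_index=iobs)`, which is replaced here by passing the observation directly.

```python
class ObservationSketch:
    """Hypothetical observation with a detdata dict and an optional loader."""

    def __init__(self, name, loader=None):
        self.name = name
        self.detdata = dict()
        if loader is not None:
            self.loader = loader  # attribute exists only when a loader is set


class ListLoader:
    """Hypothetical Loader: populates and clears detector data on demand."""

    def __init__(self, samples):
        self.samples = samples

    def load(self, obs):
        obs.detdata["signal"] = list(self.samples)

    def unload(self, obs):
        obs.detdata.clear()


def load_exec_sketch(observations, exec_one):
    # Same loop shape as Operator.load_exec(): load, process one, unload.
    for obs in observations:
        unload = False
        if hasattr(obs, "loader"):
            obs.loader.load(obs)
            unload = True
        exec_one(obs)  # stands in for self.exec(data.select(obs_index=iobs))
        if unload:
            obs.loader.unload(obs)


totals = dict()


def sum_signal(obs):
    totals[obs.name] = sum(obs.detdata["signal"])


observations = [
    ObservationSketch("obs0", ListLoader([1.0, 2.0])),
    ObservationSketch("obs1", ListLoader([3.0, 4.0])),
]
load_exec_sketch(observations, sum_signal)
print(totals)                                 # {'obs0': 3.0, 'obs1': 7.0}
print([obs.detdata for obs in observations])  # [{}, {}]
```

As in the original loop, there is no `try`/`finally`, so an exception raised during processing would leave that observation loaded.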

provides()

Dictionary of Observation keys created or modified by this Operator.

This dictionary should have 5 keys ("global", "meta", "detdata", "shared", and "intervals"), each containing a list of field names. Global keys are contained in the top-level data object. Meta keys are those contained in the primary observation dictionary. Detdata, shared, and intervals keys are those contained in the "detdata", "shared", and "intervals" observation attributes.

Returns:

Type  Description
dict  The keys in the Observation dictionary that will be created or modified.

Source code in toast/ops/operator.py
def provides(self):
    """Dictionary of Observation keys created or modified by this Operator.

    This dictionary should have 5 keys, each containing a list of "global",
    "metadata", "detdata", "shared", and "intervals" fields.  Global keys are
    contained in the top-level data object.  Metadata keys are those contained
    in the primary observation dictionary.  Detdata, shared, and intervals keys are
    those contained in the "detdata", "shared", and "intervals" observation
    attributes.

    Returns:
        (dict):  The keys in the Observation dictionary that will be created
            or modified.

    """
    # Ensure that all keys exist
    prov = self._provides()
    for key in ["global", "meta", "detdata", "shared", "intervals"]:
        if key not in prov:
            prov[key] = list()
    return prov
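One possible use of the `provides()` and `requires()` dictionaries is to check whether the operators earlier in a sequence produce everything a later operator needs. The checker below is hypothetical and is not a TOAST function. The field names in the example are illustrative only. Because `requires()` also lists optional keys that an operator creates itself when they are missing, this check is conservative: it may flag names that would in fact be created.

```python
KEYS = ["global", "meta", "detdata", "shared", "intervals"]


def normalize(keys):
    # Same idea as provides()/requires(): every category is present.
    for key in KEYS:
        if key not in keys:
            keys[key] = list()
    return keys


def missing_inputs(upstream_provides, downstream_requires):
    """Hypothetical check: names required downstream but provided by no upstream operator."""
    available = {key: set() for key in KEYS}
    for prov in upstream_provides:
        prov = normalize(dict(prov))  # shallow copy, caller's dict untouched
        for key in KEYS:
            available[key].update(prov[key])
    req = normalize(dict(downstream_requires))
    missing = dict()
    for key in KEYS:
        # None denotes the implied full-sample-range interval list.
        absent = [n for n in req[key] if n is not None and n not in available[key]]
        if absent:
            missing[key] = absent
    return missing


upstream = [{"shared": ["times", "boresight"]}, {"detdata": ["signal"]}]
downstream = {"shared": ["boresight"], "detdata": ["signal", "flags"], "intervals": [None]}
print(missing_inputs(upstream, downstream))  # {'detdata': ['flags']}
```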

requires()

Dictionary of Observation keys directly used by this Operator, including optional keys that the operator will create if they do not exist.

This dictionary should have 5 keys ("global", "meta", "detdata", "shared", and "intervals"), each containing a list of field names. Global keys are contained in the top-level data object. Meta keys are those contained in the primary observation dictionary. Detdata, shared, and intervals keys are those contained in the "detdata", "shared", and "intervals" observation attributes.

Returns:

Type  Description
dict  The keys in the Observation dictionary required by the operator.

Source code in toast/ops/operator.py
def requires(self):
    """Dictionary of Observation keys directly used by this Operator.
    Including optional keys that will be created by the operator if they do not exist.

    This dictionary should have 5 keys, each containing a list of "global",
    "metadata", "detdata", "shared", and "intervals" fields.  Global keys are
    contained in the top-level data object.  Metadata keys are those contained
    in the primary observation dictionary.  Detdata, shared, and intervals keys are
    those contained in the "detdata", "shared", and "intervals" observation
    attributes.

    Returns:
        (dict):  The keys in the Observation dictionary required by the operator.

    """
    # Ensure that all keys exist
    req = self._requires()
    for key in ["global", "meta", "detdata", "shared", "intervals"]:
        if key not in req:
            req[key] = list()
    # All operators use an implied interval list of the full sample range
    if None not in req["intervals"]:
        req["intervals"].append(None)
    return req
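The post-processing in `requires()` does two things. It guarantees that all five categories are present, and it appends `None`, standing for the implied full-sample-range interval list, without duplicating it. The function below copies that logic into a standalone helper so its effect can be seen on a plain dict. `normalize_requires` is a hypothetical name, not a TOAST function.

```python
KEYS = ["global", "meta", "detdata", "shared", "intervals"]


def normalize_requires(req):
    """Mirror of the post-processing done by Operator.requires()."""
    for key in KEYS:
        if key not in req:
            req[key] = list()
    # None stands for the implied interval list covering the full sample range.
    if None not in req["intervals"]:
        req["intervals"].append(None)
    return req  # the input dict is modified in place and returned


req = normalize_requires({"detdata": ["signal"], "intervals": ["scanning"]})
print(req["intervals"])                         # ['scanning', None]
print(sorted(req))                              # ['detdata', 'global', 'intervals', 'meta', 'shared']
print(normalize_requires(dict())["intervals"])  # [None]
```

Calling the helper a second time on the same dict leaves it unchanged, because the `None` check prevents a duplicate entry.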

translate(props) classmethod

Given a config dictionary, modify it to match the current API.

Source code in toast/ops/operator.py
@classmethod
def translate(cls, props):
    """Given a config dictionary, modify it to match the current API."""
    # For operators, the derived classes should implement this method as needed
    # and then call super().translate(props) to trigger this method.  Here we strip
    # the 'API' key from the config.
    props = super().translate(props)
    if "API" in props:
        del props["API"]
    return props
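The comment in `translate()` describes a chaining convention. A derived operator rewrites its own legacy keys, then calls `super().translate(props)` so the base class can strip the "API" key. The sketch below follows that convention with hypothetical classes and trait names (`old_name`, `new_name`). Unlike the real method, the base sketch does not itself call the `TraitConfig` parent.

```python
class TranslateBaseSketch:
    """Hypothetical stand-in for the base behavior shown above."""

    @classmethod
    def translate(cls, props):
        # The real method first calls TraitConfig.translate(); omitted here.
        if "API" in props:
            del props["API"]
        return props


class RenamedTraitSketch(TranslateBaseSketch):
    """Hypothetical derived operator whose trait 'old_name' became 'new_name'."""

    @classmethod
    def translate(cls, props):
        if "old_name" in props:
            props["new_name"] = props.pop("old_name")
        # Chain to the parent so the 'API' key is still stripped.
        return super().translate(props)


print(RenamedTraitSketch.translate({"API": 0, "old_name": 5, "enabled": True}))
# {'enabled': True, 'new_name': 5}
```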