# Licensed under a 3-clause BSD style license - see LICENSE.rst
import inspect
from copy import deepcopy
from dataclasses import dataclass
from typing import Literal
import numpy as np
from astropy import units as u
from astropy.nddata import VarianceUncertainty, NDData
from specreduce.compat import SPECUTILS_LT_2, Spectrum
# Names exported by a star-import of this module.
__all__ = ["SpecreduceOperation"]
# Accepted values of the ``mask_treatment`` arguments; mirrors
# ``_ImageParser.implemented_mask_treatment_methods``.
MaskingOption = Literal[
"apply", "ignore", "propagate", "zero_fill", "nan_fill", "apply_mask_only", "apply_nan_only"
]
# Input types the image parser accepts: a plain array, an NDData (or subclass)
# instance, or a Quantity.
ImageLike = np.ndarray | NDData | u.Quantity
class _ImageParser:
"""
Coerces images from accepted formats to Spectrum objects for
internal use in specreduce's operation classes.
Fills any and all of uncertainty, mask, units, and spectral axis
that are missing in the provided image with generic values.
Accepted image types are:
- `~specutils.Spectrum1D` (preferred)
- `~astropy.nddata.ccddata.CCDData`
- `~astropy.nddata.ndddata.NDDData`
- `~astropy.units.quantity.Quantity`
- `~numpy.ndarray`
"""
# The '_valid_mask_treatment_methods' in the Background, Trace, and Extract
# classes is a subset of implemented methods.
implemented_mask_treatment_methods = (
"apply",
"ignore",
"propagate",
"zero_fill",
"nan_fill",
"apply_mask_only",
"apply_nan_only",
)
def _parse_image(
self, image: ImageLike, disp_axis: int = 1, mask_treatment: MaskingOption = "apply"
) -> Spectrum:
"""
Convert all accepted image types to a consistently formatted Spectrum object.
Parameters
----------
image : `~astropy.nddata.NDData`-like or array-like
The image to be parsed. If None, defaults to class' own
image attribute.
disp_axis
The index of the image's dispersion axis. Should not be
changed until operations can handle variable image
orientations.
mask_treatment
Specifies how to handle masked or non-finite values in the input image.
The accepted values are:
- ``apply``: The image remains unchanged, and any existing mask is combined\
with a mask derived from non-finite values.
- ``ignore``: The image remains unchanged, and any existing mask is dropped.
- ``propagate``: The image remains unchanged, and any masked or non-finite pixel\
causes the mask to extend across the entire cross-dispersion axis.
- ``zero_fill``: Pixels that are either masked or non-finite are replaced with 0.0,\
and the mask is dropped.
- ``nan_fill``: Pixels that are either masked or non-finite are replaced with nan,\
and the mask is dropped.
- ``apply_mask_only``: The image and mask are left unmodified.
- ``apply_nan_only``: The image is left unmodified, the old mask is dropped, and a\
new mask is created based on non-finite values.
Returns
-------
Spectrum
"""
# would be nice to handle (cross)disp_axis consistently across
# operations (public attribute? private attribute? argument only?) so
# it can be called from self instead of via kwargs...
if image is None:
# useful for Background's instance methods
return self.image
return self._get_data_from_image(image, disp_axis=disp_axis, mask_treatment=mask_treatment)
@staticmethod
def _get_data_from_image(
image: ImageLike, disp_axis: int = 1, mask_treatment: MaskingOption = "apply"
) -> Spectrum:
"""
Extract data array from various input types for `image`.
Parameters
----------
image
Input image from which data is extracted. This can be a 2D numpy
array, Quantity, or an NDData object.
disp_axis
The dispersion axis of the image.
mask_treatment
Specifies how to handle masked or non-finite values in the input image.
Returns
-------
Spectrum
"""
if isinstance(image, u.quantity.Quantity):
img = image.value
elif isinstance(image, np.ndarray):
img = image
else: # NDData, including CCDData and Spectrum
img = image.data
mask = getattr(image, "mask", None)
crossdisp_axis = (disp_axis + 1) % 2
# next, handle masked and non-finite data in image.
# A mask will be created from any non-finite image data, and combined
# with any additional 'mask' passed in. If image is being parsed within
# a specreduce operation that has 'mask_treatment' options, this will be
# handled as well. Note that input data may be modified if a fill value
# is chosen to handle masked data. The returned image will always have
# `image.mask` even if there are no non-finite or masked values.
img, mask = _ImageParser._mask_and_nonfinite_data_handling(
image=img, mask=mask, mask_treatment=mask_treatment, crossdisp_axis=crossdisp_axis
)
# mask (handled above) and uncertainty are set as None when they aren't
# specified upon creating a Spectrum object, so we must check whether
# these attributes are absent *and* whether they are present but set as None
if hasattr(image, "uncertainty"):
uncertainty = image.uncertainty
else:
uncertainty = VarianceUncertainty(np.ones(img.shape))
unit = getattr(image, "unit", u.Unit("DN"))
spectral_axis = getattr(image, "spectral_axis", np.arange(img.shape[disp_axis]) * u.pix)
if SPECUTILS_LT_2:
kwargs = {}
else:
kwargs = {"spectral_axis_index": img.ndim - 1}
img = Spectrum(
img * unit, spectral_axis=spectral_axis, uncertainty=uncertainty, mask=mask,
**kwargs
)
return img
@staticmethod
def _mask_and_nonfinite_data_handling(
image: ImageLike,
mask: ImageLike | None = None,
mask_treatment: str = "apply",
crossdisp_axis: int = 1,
) -> tuple[np.ndarray, np.ndarray]:
"""
Handle the treatment of masked and non-finite data.
All operations in Specreduce can take in a mask for the data as
part of the input NDData.
There are five options currently implemented for the treatment
of masked and non-finite data - apply, ignore, zero_fill, nan_fill,
apply_mask_only, and apply_nan_only. Depending on the routine,
all or a subset of these three options are valid.
Parameters
----------
image : array-like
The input image data array that may contain non-finite values.
mask : array-like of bool or None
An optional Boolean mask array. Non-finite values in the image will be added
to this mask.
mask_treatment
Specifies how to handle masked or non-finite values in the input image.
"""
if mask_treatment not in _ImageParser.implemented_mask_treatment_methods:
raise ValueError(
"'mask_treatment' must be one of "
f"{_ImageParser.implemented_mask_treatment_methods}"
)
if mask is not None and (mask.dtype not in (bool, int)):
raise ValueError("'mask' must be a boolean or integer array.")
match mask_treatment:
case "apply":
mask = mask | (~np.isfinite(image)) if mask is not None else ~np.isfinite(image)
case "ignore":
mask = np.zeros(image.shape, dtype=bool)
case "propagate":
if mask is None:
mask = ~np.isfinite(image)
else:
mask = mask | (~np.isfinite(image))
mask[:] = mask.any(axis=crossdisp_axis, keepdims=True)
case "zero_fill" | "nan_fill":
mask = mask | (~np.isfinite(image)) if mask is not None else ~np.isfinite(image)
image = deepcopy(image)
if mask_treatment == "zero_fill":
image[mask] = 0.0
else:
image[mask] = np.nan
mask[:] = False
case "apply_nan_only":
mask = ~np.isfinite(image)
case "apply_mask_only":
mask = mask.copy() if mask is not None else np.zeros(image.shape, dtype=bool)
if mask.all():
raise ValueError("Image is fully masked. Check for invalid values.")
return image, mask
@dataclass
class SpecreduceOperation(_ImageParser):
    """
    An operation to perform as part of a spectroscopic reduction pipeline.

    This class primarily exists to define the basic API for operations:
    parameters for the operation are provided at object creation,
    and then the operation object is called with the data objects required for
    the operation, which then *return* the data objects resulting from the
    operation.
    """

    def __call__(self):
        """Run the operation; subclasses must override this method."""
        raise NotImplementedError("__call__ on a SpecreduceOperation needs to be overridden")

    @classmethod
    def as_function(cls, *args, **kwargs):
        """
        Run this operation as a function.  Syntactic sugar for e.g.,
        ``Operation.as_function(arg1, arg2, keyword=value)`` maps to
        ``Operation(arg2, keyword=value)(arg1)`` (if the ``__call__`` of
        ``Operation`` has only one argument)

        Parameters
        ----------
        *args
            The leading positional arguments are forwarded to ``__call__``
            (as many as it declares besides ``self``); the remaining ones
            are passed to the class constructor.
        **kwargs
            Passed to the class constructor.

        Returns
        -------
        Whatever calling the constructed operation returns.

        Raises
        ------
        NotImplementedError
            If ``__call__`` takes ``*args``, because the number of call
            inputs can then not be determined.
        """
        argspec = inspect.getargs(cls.__call__.__code__)
        if argspec.varargs:
            raise NotImplementedError(
                "There is not a way to determine the "
                "number of inputs of a *args style "
                "operation"
            )

        # The first declared argument of __call__ is 'self'.
        ninputs = len(argspec.args) - 1
        callargs = args[:ninputs]
        noncallargs = args[ninputs:]

        op = cls(*noncallargs, **kwargs)
        return op(*callargs)