Source code for specreduce.extract
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import warnings
from dataclasses import dataclass, field
import numpy as np
from astropy import units as u
from astropy.modeling import Model, models, fitting
from astropy.nddata import NDData, VarianceUncertainty
from numpy import ndarray
from scipy.integrate import trapezoid
from scipy.interpolate import RectBivariateSpline
from specreduce.compat import Spectrum
from specreduce.core import SpecreduceOperation, ImageLike, MaskingOption
from specreduce.tracing import Trace, FlatTrace
__all__ = ["BoxcarExtract", "HorneExtract", "OptimalExtract"]
def _get_boxcar_weights(center, hwidth, npix):
"""
Compute weights given an aperture center, half width,
and number of pixels.
Based on `get_boxcar_weights()` from a JDAT Notebook by Karl Gordon:
https://github.com/spacetelescope/jdat_notebooks/blob/main/notebooks/MIRI_LRS_spectral_extraction/miri_lrs_spectral_extraction.ipynb
Parameters
----------
center : float, required
The index of the aperture's center pixel on the larger image's
cross-dispersion axis.
hwidth : float, required
Half of the aperture's width in the cross-dispersion direction.
npix : float, required
The number of pixels in the larger image's cross-dispersion
axis.
Returns
-------
weights : `~numpy.ndarray`
A 2D image with weights assigned to pixels that fall within the
defined aperture.
"""
weights = np.zeros(npix)
if hwidth == 0:
# the logic below would return all zeros anyway, so we might as well save the time
# (negative widths should be avoided by earlier logic!)
return weights
if center - hwidth > npix - 0.5 or center + hwidth < -0.5:
# entire window is out-of-bounds
return weights
lower_edge = max(-0.5, center - hwidth) # where -0.5 is lower bound of the image
upper_edge = min(center + hwidth, npix - 0.5) # where npix-0.5 is upper bound of the image
# let's avoid recomputing the round repeatedly
int_round_lower_edge = int(round(lower_edge))
int_round_upper_edge = int(round(upper_edge))
# inner pixels that get full weight
# the round in conjunction with the +1 handles the half-pixel "offset",
# the upper bound doesn't have the +1 because array slicing is inclusive on the lower index and
# exclusive on the upper-index
# NOTE: round(-0.5) == 0, which is helpful here for the case where lower_edge == -0.5
weights[int_round_lower_edge + 1 : int_round_upper_edge] = 1
# handle edge pixels (for cases where an edge pixel is fully weighted, this will set it again,
# but should still compute a weight of 1). By using N:N+1, we avoid index errors if the edge
# is outside the image bounds, but we do need to avoid negative indices, which would count
# from the end of the array.
if int_round_lower_edge >= 0:
weights[int_round_lower_edge : int_round_lower_edge + 1] = (
round(lower_edge) + 0.5 - lower_edge
)
weights[int_round_upper_edge : int_round_upper_edge + 1] = upper_edge - (
round(upper_edge) - 0.5
)
return weights
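# A minimal sketch of how the partial-pixel weighting above behaves. The helper
# name `_example_boxcar_weights` and the numbers used are illustrative
# assumptions, not part of specreduce.
def _example_boxcar_weights():
    # an aperture centred at pixel 3.3 with half-width 2.5 on a 10-pixel axis
    weights = _get_boxcar_weights(center=3.3, hwidth=2.5, npix=10)
    # interior pixels get weight 1, the two edge pixels get fractional weights
    # (0.7 and 0.3 here), and the total equals the aperture width, 2 * hwidth
    assert np.isclose(weights.sum(), 5.0)
    return weights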
def _ap_weight_image(trace, width, disp_axis, crossdisp_axis, image_shape):
"""
Create a weight image that defines the desired extraction aperture.
Based on `ap_weight_images()` from a JDAT Notebook by Karl Gordon:
https://github.com/spacetelescope/jdat_notebooks/blob/main/notebooks/MIRI_LRS_spectral_extraction/miri_lrs_spectral_extraction.ipynb
Parameters
----------
trace : `~specreduce.tracing.Trace`, required
trace object
width : float, required
width of extraction aperture in pixels
disp_axis : int, required
dispersion axis
crossdisp_axis : int, required
cross-dispersion axis
image_shape : tuple with 2 elements, required
size (shape) of image
Returns
-------
wimage : `~numpy.ndarray`
a 2D weight image defining the aperture
"""
wimage = np.zeros(image_shape)
hwidth = 0.5 * width
image_sizes = image_shape[crossdisp_axis]
# loop in dispersion direction and compute weights.
for i in range(image_shape[disp_axis]):
# TODO trace must handle transposed data (disp_axis == 0)
# pass trace.trace.data[i] to avoid any mask if part of the regions is out-of-bounds
# ArrayTrace can have nonfinite or masked data in trace, and this will fail,
# so figure out how to handle that...
wimage[:, i] = _get_boxcar_weights(trace.trace.data[i], hwidth, image_sizes)
return wimage
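# A short sketch of building an aperture-weight image for a flat trace. The
# helper name `_example_ap_weight_image` and the array sizes are illustrative
# assumptions, not part of specreduce.
def _example_ap_weight_image():
    image = np.ones((10, 20))  # (cross-dispersion, dispersion)
    trace = FlatTrace(image, 4.0)  # trace held at row 4
    wimage = _ap_weight_image(trace, width=5, disp_axis=1, crossdisp_axis=0,
                              image_shape=image.shape)
    # the weights in every column sum to the aperture width (5 pixels here)
    assert np.allclose(wimage.sum(axis=0), 5.0)
    return wimage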
@dataclass
class BoxcarExtract(SpecreduceOperation):
"""
Standard boxcar extraction along a trace.
Example: ::
trace = FlatTrace(image, trace_pos)
extract = BoxcarExtract(image, trace)
spectrum = extract(width=width)
Parameters
----------
image
image with 2-D spectral image data
trace_object
trace object
width
width of extraction aperture in pixels
disp_axis
dispersion axis
crossdisp_axis
cross-dispersion axis
mask_treatment
Specifies how to handle masked or non-finite values in the input image.
The accepted values are:
- ``apply``: The image remains unchanged, and any existing mask is combined\
with a mask derived from non-finite values.
- ``ignore``: The image remains unchanged, and any existing mask is dropped.
- ``propagate``: The image remains unchanged, and any masked or non-finite pixel\
causes the mask to extend across the entire cross-dispersion axis.
- ``zero_fill``: Pixels that are either masked or non-finite are replaced with 0.0,\
and the mask is dropped.
- ``nan_fill``: Pixels that are either masked or non-finite are replaced with nan,\
and the mask is dropped.
- ``apply_mask_only``: The image and mask are left unmodified.
- ``apply_nan_only``: The image is left unmodified, the old mask is dropped, and a\
new mask is created based on non-finite values.
Returns
-------
spec : `~specutils.Spectrum1D`
The extracted 1d spectrum expressed in DN and pixel units
"""
image: ImageLike
trace_object: Trace
width: float = 5
disp_axis: int = 1
crossdisp_axis: int = 0
# TODO: should disp_axis and crossdisp_axis be defined in the Trace object?
mask_treatment: MaskingOption = "apply"
_valid_mask_treatment_methods = (
"apply",
"ignore",
"propagate",
"zero_fill",
"nan_fill",
"apply_mask_only",
"apply_nan_only",
)
@property
def spectrum(self):
return self.__call__()
def __call__(
self,
image: ImageLike | None = None,
trace: Trace | None = None,
width: float | None = None,
disp_axis: int | None = None,
crossdisp_axis: int | None = None,
) -> Spectrum:
"""
Extract the 1D spectrum using the boxcar method.
Parameters
----------
image
The image with 2-D spectral image data
trace
The trace object
width
The width of extraction aperture in pixels
disp_axis
The dispersion axis
crossdisp_axis
The cross-dispersion axis
Returns
-------
spec
The extracted 1d spectrum with flux expressed in the same
units as the input image, or u.DN, and pixel units
"""
image = image if image is not None else self.image
trace = trace if trace is not None else self.trace_object
width = width if width is not None else self.width
disp_axis = disp_axis if disp_axis is not None else self.disp_axis
cdisp_axis = crossdisp_axis if crossdisp_axis is not None else self.crossdisp_axis
if width <= 0:
raise ValueError("The window width must be positive")
self.image = self._parse_image(
image, disp_axis=disp_axis, mask_treatment=self.mask_treatment
)
# Spectrum extraction
# ===================
# Assign no weight to non-finite pixels outside the window. Non-finite pixels inside
# the window will be propagated to the sum if mask treatment is either ``ignore`` or
# ``propagate`` or excluded if the chosen mask treatment option is ``apply``. In the
# latter case, the flux is calculated as the average of the non-masked pixels inside
# the window multiplied by the window width.
window_weights = _ap_weight_image(trace, width, disp_axis, cdisp_axis, self.image.shape)
if self.mask_treatment == "apply":
image_cleaned = np.where(~self.image.mask, self.image.data * window_weights, 0.0)
weights = np.where(~self.image.mask, window_weights, 0.0)
spectrum = (
image_cleaned.sum(axis=cdisp_axis)
/ weights.sum(axis=cdisp_axis)
* window_weights.sum(axis=cdisp_axis)
)
else:
image_windowed = np.where(window_weights, self.image.data * window_weights, 0.0)
spectrum = np.sum(image_windowed, axis=cdisp_axis)
return Spectrum(spectrum * self.image.unit, spectral_axis=self.image.spectral_axis)
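# A minimal sketch of the ``apply`` mask treatment described in the comments
# above: a non-finite pixel inside the window is excluded, and the remaining
# in-window pixels are averaged and rescaled to the full window width. The
# helper name `_example_boxcar_apply_mask` and the synthetic image are
# illustrative assumptions, not part of specreduce.
def _example_boxcar_apply_mask():
    image = np.full((11, 30), 2.0)  # flat source of 2 counts per pixel
    image[5, 10] = np.nan  # one bad pixel on the trace
    extract = BoxcarExtract(image, FlatTrace(image, 5.0), mask_treatment="apply")
    spectrum = extract(width=5)
    # each column recovers width * 2 counts, including the column with the bad pixel
    assert np.allclose(spectrum.flux.value, 10.0)
    return spectrum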
@dataclass
class HorneExtract(SpecreduceOperation):
"""
Perform a Horne (a.k.a. optimal) extraction on a two-dimensional
spectrum.
There are two options for fitting the spatial profile used for
extraction. By default, a single 1D Gaussian is fit to the co-added
cross-dispersion profile and used as a uniform profile across the spectrum.
Alternatively, the ``interpolated_profile`` option may be chosen: the spatial
profile is then sampled in bins along the dispersion axis (10 by default,
configurable through ``spatial_profile``) and interpolated between to produce
a smoothly varying, empirical spatial profile across the spectrum.
If using the Gaussian option for the spatial profile, a background profile
may be fit (but not subtracted) simultaneously with the source profile. By
default, this is done with a 2nd degree polynomial. If using the
``interpolated_profile`` option, the background model must be set to None.
Parameters
----------
image : `~astropy.nddata.NDData`-like or array-like, required
The input 2D spectrum from which to extract a source. An
NDData object must specify uncertainty and a mask. An array
requires use of the ``variance``, ``mask``, & ``unit`` arguments.
trace_object : `~specreduce.tracing.Trace`, required
The associated 1D trace object created for the 2D image.
disp_axis : int, optional
The index of the image's dispersion axis. [default: 1]
crossdisp_axis : int, optional
The index of the image's cross-dispersion axis. [default: 0]
bkgrd_prof : `~astropy.modeling.Model` or None, optional
A model for the image's background flux when using the ``gaussian``
spatial profile. If ``spatial_profile`` is set to ``gaussian``, it defaults
to ``models.Polynomial1D(2)``. Note that the ``interpolated_profile`` option
does not support a background model, so ``bkgrd_prof`` must be left as ``None``.
spatial_profile : str or dict, optional
The shape of the object profile. The first option is 'gaussian', which fits
a single 1D gaussian to the average cross-dispersion profile and applies it
uniformly along the spectrum. The other option is 'interpolated_profile';
when this option is used, the profile is sampled in bins and these samples
are interpolated between to construct a continuously varying, empirical
spatial profile for extraction. If this option is passed in as a string
(i.e., spatial_profile='interpolated_profile'), the default number of bins
(10) and degree of interpolation (linear in x and y) are used. To set these
parameters, pass in a dictionary with the keys
'n_bins_interpolated_profile' (which accepts an integer number of bins) and
'interp_degree_interpolated_profile' (which accepts an int, or a tuple of
ints for the x and y degrees, respectively).
[default: gaussian]
variance : `~numpy.ndarray`, optional
(Only used if ``image`` is not an NDData object.)
The associated variances for each pixel in the image. Must
have the same dimensions as ``image``. If all zeros, the variance
will be ignored and treated as all ones. If any zeros, those
elements will be excluded via masking. If any negative values,
an error will be raised. [default: None]
mask : `~numpy.ndarray`, optional
(Only used if ``image`` is not an NDData object.)
Whether to mask each pixel in the image. Must have the same
dimensions as ``image``. If blank, all non-NaN pixels are
unmasked. [default: None]
unit : `~astropy.units.Unit` or str, optional
(Only used if ``image`` is not an NDData object.)
The associated unit for the data in ``image``. If blank,
fluxes are interpreted in DN. [default: None]
"""
image: NDData
trace_object: Trace
bkgrd_prof: None | Model = None
spatial_profile: str | dict = "gaussian"
variance: np.ndarray = field(default=None)
mask: np.ndarray = field(default=None)
unit: np.ndarray = field(default=None)
disp_axis: int = 1
crossdisp_axis: int = 0
# TODO: should disp_axis and crossdisp_axis be defined in the Trace object?
@property
def spectrum(self):
return self.__call__()
def _parse_image(self, image, variance=None, mask=None, unit=None, disp_axis=1):
"""
Convert all accepted image types to a consistently formatted
Spectrum object.
HorneExtract needs its own version of this method because it is
more stringent in its requirements for input images. The extra
arguments are needed to handle cases where these parameters were
specified as arguments and those where they came as attributes
of the image object.
Parameters
----------
image : `~astropy.nddata.NDData`-like or array-like, required
The image to be parsed. If None, defaults to class' own
image attribute.
variance : `~numpy.ndarray`, optional
(Only used if ``image`` is not an NDData object.)
The associated variances for each pixel in the image. Must
have the same dimensions as ``image``. If all zeros, the variance
will be ignored and treated as all ones. If any zeros, those
elements will be excluded via masking. If any negative values,
an error will be raised.
mask : `~numpy.ndarray`, optional
(Only used if ``image`` is not an NDData object.)
Whether to mask each pixel in the image. Must have the same
dimensions as ``image``. If blank, all non-NaN pixels are
unmasked.
unit : `~astropy.units.Unit` or str, optional
(Only used if ``image`` is not an NDData object.)
The associated unit for the data in ``image``. If blank,
fluxes are interpreted in DN.
disp_axis : int, optional
The index of the image's dispersion axis. Should not be
changed until operations can handle variable image
orientations. [default: 1]
"""
if isinstance(image, np.ndarray):
img = image
elif isinstance(image, u.quantity.Quantity):
img = image.value
else: # NDData, including CCDData and Spectrum
img = image.data
# mask is set as None when not specified upon creating a Spectrum
# object, so we must check whether it is absent *and* whether it's
# present but set as None
if getattr(image, "mask", None) is not None:
mask = image.mask
elif mask is not None:
pass
else:
# if user provides no mask at all, don't mask anywhere
mask = np.zeros_like(img)
if img.shape != mask.shape:
raise ValueError("image and mask shapes must match.")
# Process uncertainties, converting to variances when able and throwing
# an error when uncertainties are missing or less easily converted
if hasattr(image, "uncertainty") and image.uncertainty is not None:
if image.uncertainty.uncertainty_type == "var":
variance = image.uncertainty.array
elif image.uncertainty.uncertainty_type == "std":
warnings.warn(
"image NDData object's uncertainty "
"interpreted as standard deviation. if "
"incorrect, use VarianceUncertainty when "
"assigning image object's uncertainty."
)
variance = image.uncertainty.array**2
elif image.uncertainty.uncertainty_type == "ivar":
variance = 1 / image.uncertainty.array
else:
# other options are InverseUncertainty and UnknownUncertainty
raise ValueError(
"image NDData object has unexpected "
"uncertainty type. instead, try "
"VarianceUncertainty or StdDevUncertainty."
)
elif hasattr(image, "uncertainty") and image.uncertainty is None:
# ignore variance arg to focus on updating NDData object
raise ValueError("image NDData object lacks uncertainty")
else:
if variance is None:
raise ValueError(
"if image is a numpy or Quantity array, a "
"variance must be specified. consider "
"wrapping it into one object by instead "
"passing an NDData image."
)
elif image.shape != variance.shape:
raise ValueError("image and variance shapes must match")
if np.any(variance < 0):
raise ValueError("variance must be fully positive")
if np.all(variance == 0):
# technically would result in infinities, but since they're all
# zeros, we can override ones to simulate an unweighted case
variance = np.ones_like(variance)
if np.any(variance == 0):
# exclude such elements by editing the input mask
mask[variance == 0] = True
# replace the variances to avoid a divide by zero warning
variance[variance == 0] = np.nan
variance = VarianceUncertainty(variance)
unit = getattr(image, "unit", u.Unit(unit) if unit is not None else u.Unit("DN"))
spectral_axis = getattr(image, "spectral_axis", np.arange(img.shape[disp_axis]) * u.pix)
return Spectrum(img * unit, spectral_axis=spectral_axis, uncertainty=variance, mask=mask)
def _fit_gaussian_spatial_profile(
self, img: ndarray, disp_axis: int, crossdisp_axis: int, or_mask: ndarray, bkgrd_prof: Model
):
"""Fit a 1D Gaussian spatial profile to spectrum in `img`.
Fits an 1D Gaussian profile to spectrum in `img`. Takes the weighted mean
of ``img`` along the cross-dispersion axis all ignoring masked pixels
(i.e, takes the mean of each row for a horizontal trace). A Background model
(optional) is fit simultaneously. Returns an `astropy.model.Gaussian1D` (or
compound model, if `bkgrd_prof` is supplied) fit to data.
"""
nrows = img.shape[crossdisp_axis]
xd_pixels = np.arange(nrows)
# co-add signal in each image row, ignore masked pixels
coadd = np.ma.masked_array(img, mask=or_mask).mean(disp_axis)
# use the value of the brightest co-added row as an initial guess for the Gaussian
# amplitude, and the location of that row as an initial guess for the mean
gauss_prof = models.Gaussian1D(amplitude=coadd.max(), mean=coadd.argmax(), stddev=2)
# Fit extraction kernel (Gaussian + background model) to coadded rows
# with combined model (must exclude masked indices manually;
# LevMarLSQFitter does not)
if bkgrd_prof is not None:
ext_prof = gauss_prof + bkgrd_prof
else:
# add a trivial constant model so attribute names are the same
ext_prof = gauss_prof + models.Const1D(0, fixed={"amplitude": True})
with warnings.catch_warnings():
warnings.simplefilter("ignore")
fitter = fitting.LMLSQFitter()
fit_ext_kernel = fitter(ext_prof, xd_pixels[~coadd.mask], coadd.compressed())
return fit_ext_kernel
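# A brief illustrative sketch (kept in comments) of the fit performed above on
# a synthetic co-added profile; the numbers are assumptions, not part of
# specreduce:
#
#     xd_pixels = np.arange(31)
#     coadd = 50 * np.exp(-0.5 * ((xd_pixels - 15) / 2.0) ** 2) + 3.0
#     ext_prof = (models.Gaussian1D(amplitude=coadd.max(), mean=coadd.argmax(), stddev=2)
#                 + models.Polynomial1D(2))
#     fit = fitting.LMLSQFitter()(ext_prof, xd_pixels, coadd)
#     # fit.mean_0 ~ 15 and fit.stddev_0 ~ 2 recover the input profile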
def _fit_spatial_profile(
self,
img: ndarray,
disp_axis: int,
crossdisp_axis: int,
mask: ndarray,
n_bins: int,
kx: int,
ky: int,
) -> RectBivariateSpline:
"""
Fit a spatial profile by sampling the median profile along the dispersion direction.
This method extracts the spatial profile from an input spectrum by binning
the data along the dispersion axis. It calculates the median profile for each bin,
normalizes it, and then interpolates between these profiles to create a smooth
2D representation of the spatial profile. The resulting interpolator object can be
used to evaluate the spatial profile at any coordinate within the bounds of the data.
Parameters
----------
img
The 2D array of spectral data to process.
disp_axis
The image axis corresponding to the dispersion direction.
crossdisp_axis
The image axis corresponding to the cross-dispersion direction.
mask
A boolean mask array with the same shape as the image. Values of ``True``
in the mask indicate invalid data points to be ignored during computation.
n_bins
The number of bins to use along the dispersion axis for sampling
the median spatial profile.
kx
The degree of the spline along the dispersion axis.
ky
The degree of the spline along the cross-dispersion axis.
Returns
-------
RectBivariateSpline
Interpolator object that provides a smoothed 2D spatial profile.
"""
img = np.where(~mask, img, np.nan)
nrows = img.shape[crossdisp_axis]
ncols = img.shape[disp_axis]
samples = np.zeros((n_bins, nrows))
sample_locs = np.linspace(0, ncols - 1, n_bins + 1, dtype=int)
bin_centers = [
(sample_locs[i] + sample_locs[i + 1]) // 2 for i in range(len(sample_locs) - 1)
]
for i in range(n_bins):
bin_median = np.nanmedian(img[:, sample_locs[i] : sample_locs[i + 1]], axis=disp_axis)
samples[i, :] = bin_median / bin_median.sum()
return RectBivariateSpline(x=bin_centers, y=np.arange(nrows), z=samples, kx=kx, ky=ky)
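# A brief illustrative sketch (kept in comments) of the binned-median sampling
# and spline interpolation performed above; the bin count, spline degrees, and
# synthetic image are assumptions, not part of specreduce:
#
#     rows, cols = np.arange(31)[:, np.newaxis], np.arange(200)
#     img = np.exp(-0.5 * ((rows - 15) / (2.0 + cols / 200.0)) ** 2)
#     edges = np.linspace(0, 199, 11, dtype=int)
#     centers = [(edges[i] + edges[i + 1]) // 2 for i in range(10)]
#     samples = np.zeros((10, 31))
#     for i in range(10):
#         med = np.nanmedian(img[:, edges[i]:edges[i + 1]], axis=1)
#         samples[i] = med / med.sum()
#     spline = RectBivariateSpline(x=centers, y=np.arange(31), z=samples, kx=1, ky=1)
#     # spline(100, np.arange(31))[0] gives the normalised profile at column 100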
def __call__(
self,
image=None,
trace_object=None,
disp_axis=None,
crossdisp_axis=None,
bkgrd_prof=None,
spatial_profile=None,
n_bins_interpolated_profile=None,
interp_degree_interpolated_profile=None,
variance=None,
mask=None,
unit=None,
):
"""
Run the Horne calculation on a region of an image and extract a 1D spectrum.
Parameters
----------
image : `~astropy.nddata.NDData`-like or array-like, required
The input 2D spectrum from which to extract a source. An
NDData object must specify uncertainty and a mask. An array
requires use of the ``variance``, ``mask``, & ``unit`` arguments.
trace_object : `~specreduce.tracing.Trace`, required
The associated 1D trace object created for the 2D image.
disp_axis : int, optional
The index of the image's dispersion axis.
crossdisp_axis : int, optional
The index of the image's cross-dispersion axis.
bkgrd_prof : `~astropy.modeling.Model`, optional
A model for the image's background flux when using the ``gaussian``
spatial profile. If ``spatial_profile`` is set to ``gaussian``, it defaults
to ``models.Polynomial1D(2)``. Note that the ``interpolated_profile`` option
does not support a background model, so ``bkgrd_prof`` must be left as ``None``.
spatial_profile : str or dict, optional
The shape of the object profile. The first option is 'gaussian', which fits
a single 1D gaussian to the average cross-dispersion profile and applies it
uniformly along the spectrum. The other option is 'interpolated_profile';
when this option is used, the profile is sampled in bins and these samples
are interpolated between to construct a continuously varying, empirical
spatial profile for extraction. If this option is passed in as a string
(i.e., spatial_profile='interpolated_profile'), the default number of bins
(10) and degree of interpolation (linear in x and y) are used. To set these
parameters, pass in a dictionary with the keys
'n_bins_interpolated_profile' (which accepts an integer number of bins) and
'interp_degree_interpolated_profile' (which accepts an int, or a tuple of
ints for the x and y degrees, respectively).
[default: gaussian]
variance : `~numpy.ndarray`, optional
(Only used if ``image`` is not an NDData object.)
The associated variances for each pixel in the image. Must
have the same dimensions as ``image``. If all zeros, the variance
will be ignored and treated as all ones. If any zeros, those
elements will be excluded via masking. If any negative values,
an error will be raised.
mask : `~numpy.ndarray`, optional
(Only used if ``image`` is not an NDData object.)
Whether to mask each pixel in the image. Must have the same
dimensions as ``image``. If blank, all non-NaN pixels are
unmasked.
unit : `~astropy.units.Unit` or str, optional
(Only used if ``image`` is not an NDData object.)
The associated unit for the data in ``image``. If blank,
fluxes are interpreted in DN.
Returns
-------
spec_1d : `~specutils.Spectrum1D`
The final, Horne extracted 1D spectrum.
"""
image = image if image is not None else self.image
trace_object = trace_object if trace_object is not None else self.trace_object
disp_axis = disp_axis if disp_axis is not None else self.disp_axis
crossdisp_axis = crossdisp_axis if crossdisp_axis is not None else self.crossdisp_axis
bkgrd_prof = bkgrd_prof if bkgrd_prof is not None else self.bkgrd_prof
profile = spatial_profile if spatial_profile is not None else self.spatial_profile
variance = variance if variance is not None else self.variance
mask = mask if mask is not None else self.mask
unit = unit if unit is not None else self.unit
profile_choices = ("gaussian", "interpolated_profile")
if not isinstance(profile, (str, dict)):
raise ValueError("spatial_profile must be a string or dictionary.")
if isinstance(profile, str):
profile = dict(name=profile)
profile_type = profile["name"].lower()
if profile_type not in profile_choices:
raise ValueError("spatial_profile must be one of" f"{', '.join(profile_choices)}")
n_bins_interpolated_profile = profile.get("n_bins_interpolated_profile", 10)
interp_degree_interpolated_profile = profile.get("interp_degree_interpolated_profile", 1)
if bkgrd_prof is None and profile_type == "gaussian":
bkgrd_prof = models.Polynomial1D(2)
self.image = self._parse_image(image, variance, mask, unit, disp_axis)
variance = self.image.uncertainty.represent_as(VarianceUncertainty).array
mask = self.image.mask.astype(bool) | (~np.isfinite(self.image.data))
unit = self.image.unit
img = self.image.data
ncross = img.shape[crossdisp_axis]
ndisp = img.shape[disp_axis]
# If the trace is not flat, shift the rows in each column
# so the image is aligned along the trace:
if not isinstance(trace_object, FlatTrace):
img = _align_along_trace(
img, trace_object.trace, disp_axis=disp_axis, crossdisp_axis=crossdisp_axis
)
if profile_type == "gaussian":
fit_ext_kernel = self._fit_gaussian_spatial_profile(
img, disp_axis, crossdisp_axis, mask, bkgrd_prof
)
if isinstance(trace_object, FlatTrace):
mean_cross_pix = trace_object.trace
else:
mean_cross_pix = np.broadcast_to(ncross // 2, ndisp)
else: # interpolated_profile
# determine interpolation degree from input and make tuple if int
# this can also be moved to another method to parse the input
# 'spatial_profile' arg, eventually
if isinstance(interp_degree_interpolated_profile, int):
kx = ky = interp_degree_interpolated_profile
else: # if input is tuple of ints
if not isinstance(interp_degree_interpolated_profile, tuple):
raise ValueError(
"``interp_degree_interpolated_profile`` must be ",
"an integer or tuple of integers.",
)
if not all(isinstance(x, int) for x in interp_degree_interpolated_profile):
raise ValueError(
"``interp_degree_interpolated_profile`` must be ",
"an integer or tuple of integers.",
)
kx, ky = interp_degree_interpolated_profile
interp_spatial_prof = self._fit_spatial_profile(
img, disp_axis, crossdisp_axis, mask, n_bins_interpolated_profile, kx, ky
)
# add private attribute to save fit profile. should this be public?
self._interp_spatial_prof = interp_spatial_prof
xd_pixels = np.arange(ncross)
kernel_vals = np.zeros(img.shape)
norms = np.full(ndisp, np.nan)
valid = ~mask
if profile_type == "gaussian":
norms[:] = fit_ext_kernel.amplitude_0 * fit_ext_kernel.stddev_0 * np.sqrt(2 * np.pi)
for idisp in range(ndisp):
if not np.any(valid[:, idisp]):
continue
if profile_type == "gaussian":
fit_ext_kernel.mean_0 = mean_cross_pix[idisp]
fitted_col = fit_ext_kernel(xd_pixels)
kernel_vals[:, idisp] = fitted_col
else:
fitted_col = interp_spatial_prof(idisp, xd_pixels)
kernel_vals[:, idisp] = fitted_col
norms[idisp] = trapezoid(fitted_col, dx=1)[0]
with np.errstate(divide="ignore", invalid="ignore"):
num = np.sum(np.where(valid, img * kernel_vals / variance, 0.0), axis=crossdisp_axis)
den = np.sum(np.where(valid, kernel_vals**2 / variance, 0.0), axis=crossdisp_axis)
extraction = (num / den) * norms
return Spectrum(extraction * unit, spectral_axis=self.image.spectral_axis)
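# A minimal end-to-end sketch of a Horne extraction on a synthetic flat-trace
# image with uniform variance. The helper name `_example_horne_extraction`, the
# source parameters, and the tolerance are illustrative assumptions, not part
# of specreduce.
def _example_horne_extraction():
    rows = np.arange(31)[:, np.newaxis]
    img = 100 * np.exp(-0.5 * ((rows - 15) / 2.0) ** 2) * np.ones((1, 50))
    extract = HorneExtract(img, FlatTrace(img, 15.0),
                           variance=np.ones_like(img), unit=u.DN)
    spectrum = extract()  # default Gaussian profile with polynomial background
    # each flux sample approximates the column integral of the Gaussian profile,
    # amplitude * stddev * sqrt(2 * pi)
    assert np.allclose(spectrum.flux.value, 100 * 2 * np.sqrt(2 * np.pi), rtol=0.05)
    return spectrum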
def _align_along_trace(img, trace_array, disp_axis=1, crossdisp_axis=0):
"""
Given an arbitrary trace ``trace_array`` (an `~numpy.ndarray`), roll
each column of ``img`` so that the pixels nearest the trace land in
the central row of the image's spatial dimension.
"""
# TODO: this workflow does not support extraction for >2D spectra
if not (disp_axis == 1 and crossdisp_axis == 0):
# take the transpose to ensure the rows are the cross-disp axis:
img = img.T
n_rows, n_cols = img.shape
# index grids of all rows and columns, in their original order
rows = np.broadcast_to(np.arange(n_rows)[:, None], img.shape)
cols = np.broadcast_to(np.arange(n_cols), img.shape)
# we want to "roll" each column so that the trace sits in
# the central row of the final image
shifts = trace_array.astype(int) - n_rows // 2
# we wrap the indices so we don't index out of bounds
shifted_rows = np.mod(rows + shifts[None, :], n_rows)
return img[shifted_rows, cols]
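# A quick sketch of the column-rolling above: a tilted trace is shifted so the
# traced pixel in every column lands in the central row of the output. The
# helper name `_example_align_along_trace` and the array sizes are illustrative
# assumptions, not part of specreduce.
def _example_align_along_trace():
    img = np.zeros((15, 40))
    trace_array = np.linspace(3, 11, 40)  # trace tilted from row 3 to row 11
    img[trace_array.astype(int), np.arange(40)] = 1.0
    aligned = _align_along_trace(img, trace_array, disp_axis=1, crossdisp_axis=0)
    # after alignment, every column's bright pixel sits in the central row (7)
    assert np.all(aligned[15 // 2, :] == 1.0)
    return aligned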
@dataclass
class OptimalExtract(HorneExtract):
"""
An alias for `HorneExtract`.
"""
__doc__ += HorneExtract.__doc__
pass