import numpy as np
import os
import glob
from astropy.io import fits
from scipy.interpolate import griddata
from scipy.ndimage import zoom
from scipy.stats import binned_statistic
import multiprocessing as mp
from tqdm import tqdm
from . import sort_nicely as sn
from .naninterp1d import naninterp1d
from .citations import CITATIONS
# populate common imports for current stage
COMMON_IMPORTS = np.array([
["astropy", "eureka", "h5py", "jwst", "numpy", ],
["astropy", "eureka", "h5py", "jwst", "matplotlib", ],
["astraeus", "astropy", "crds", "eureka", "h5py", "jwst",
"matplotlib", "numpy", "scipy", "xarray", ],
["astraeus", "astropy", "eureka", "h5py", "matplotlib", "numpy",
"pandas", "scipy", "xarray", ],
["astraeus", "astropy", "eureka", "h5py", "matplotlib", "numpy",
"pandas", "scipy", "xarray", ],
["astraeus", "astropy", "eureka", "h5py", "matplotlib", "numpy",
"pandas", "xarray", ],
], dtype=object)
[docs]
def readfiles(meta):
"""Read in the files saved in topdir + inputdir and save them to a list.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The metadata object.
Returns
-------
meta : eureka.lib.readECF.MetaClass
The metadata object with added segment_list containing the sorted
data fits files.
"""
meta.segment_list = []
# Look for files in the input directory
for fname in glob.glob(meta.inputdir+'*'+meta.suffix+'.fits'):
if not ignore_nonscience(fname):
meta.segment_list.append(fname)
# Need to allow for separated sci and cal directories for WFC3
if len(meta.segment_list) == 0:
# Add files from the sci directory if present
if not hasattr(meta, 'sci_dir') or meta.sci_dir is None:
meta.sci_dir = 'sci'
sci_path = os.path.join(meta.inputdir, meta.sci_dir)+os.sep
for fname in glob.glob(sci_path+'*'+meta.suffix+'.fits'):
if not ignore_nonscience(fname):
meta.segment_list.append(fname)
# Add files from the cal directory if present
if not hasattr(meta, 'cal_dir') or meta.cal_dir is None:
meta.cal_dir = 'cal'
cal_path = os.path.join(meta.inputdir, meta.cal_dir)+os.sep
for fname in glob.glob(cal_path+'*'+meta.suffix+'.fits'):
if not ignore_nonscience(fname):
meta.segment_list.append(fname)
meta.num_data_files = len(meta.segment_list)
if meta.num_data_files == 0:
raise AssertionError(f'Unable to find any "{meta.suffix}.fits" files '
f'in the inputdir: \n"{meta.inputdir}"!\n'
f'You likely need to change the inputdir in '
f'{meta.filename} to point to the folder '
f'containing the "{meta.suffix}.fits" files.')
meta.segment_list = np.array(sn.sort_nicely(meta.segment_list))
meta = get_inst(meta, meta.segment_list[-1])
return meta
[docs]
def ignore_nonscience(filename):
"""Determine whether a file is a TA-related file that should be ignored.
Parameters
----------
filename : str
The fully qualified path to a FITS file.
Returns
-------
bool
True if the specified file is a TA-related file that should be ignored.
False otherwise.
"""
with fits.open(filename) as hdulist:
try:
nonscience = (hdulist[0].header['EXP_TYPE'] in
['MIR_TACQ', 'MIR_TACONFIRM',
'NRC_TACQ', 'NRC_TACONFIRM',
'NRS_WATA', 'NRS_MSATA',
'NRS_BOTA', 'NRS_TASLIT',
'NRS_TACQ', 'NRS_CONFIRM',
'NRS_TACONFIRM', 'NRS_VERIFY',
'NIS_TACQ', 'NIS_TACONFIRM'])
except KeyError:
# HST data doesn't have EXP_TYPE, and we shouldn't need to worry
# about removing non-science data for HST anyway
nonscience = False
return nonscience
[docs]
def get_inst(meta, file):
"""Get the instrument used to collect a FITS file.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The metadata object.
file : str
The filename of a FITS file.
Returns
-------
meta : eureka.lib.readECF.MetaClass
The updated metadata object with meta.inst set (if not previously
set) to a lowercase string specifying the utilized instrument and with
meta.photometry set (if not previously set) to a boolean specifying
whether or not the data is photometric.
"""
with fits.open(file) as hdulist:
# Figure out which instrument we are using
meta.inst = getattr(meta, 'inst',
hdulist[0].header['INSTRUME'].lower())
if meta.inst != 'wfc3':
# Also figure out which pipeline we need to use
# (spectra or images)
exp_type = getattr(meta, 'exp_type',
hdulist[0].header['EXP_TYPE'])
if 'IMAGE' in exp_type:
# EXP_TYPE header is either MIR_IMAGE, NRC_IMAGE,
# NRC_TSIMAGE, NIS_IMAGE, or NRS_IMAGING
meta.photometry = getattr(meta, 'photometry', True)
else:
# EXP_TYPE doesn't say image, so it should be a spectrum
# (or someone is putting weird files into Eureka!)
meta.photometry = getattr(meta, 'photometry', False)
else:
# WFC3 photometry isn't supported
meta.photometry = getattr(meta, 'photometry', False)
return meta
[docs]
def trim(data, meta):
"""Removes the edges of the data arrays.
Parameters
----------
data : Xarray Dataset
The Dataset object.
meta : eureka.lib.readECF.MetaClass
The metadata object.
Returns
-------
subdata : Xarray Dataset
A new Dataset object with arrays that have been trimmed, depending on
xwindow and ywindow as set in the S3 ecf.
meta : eureka.lib.readECF.MetaClass
The metadata object.
"""
subdata = data.isel(y=np.arange(meta.ywindow[0], meta.ywindow[1]),
x=np.arange(meta.xwindow[0], meta.xwindow[1]))
meta.subny = meta.ywindow[1] - meta.ywindow[0]
meta.subnx = meta.xwindow[1] - meta.xwindow[0]
if meta.inst == 'wfc3':
subdata['guess'] = subdata.guess - meta.ywindow[0]
return subdata, meta
[docs]
def manual_clip(lc, meta, log, channel=0):
"""Manually clip integrations along time axis.
Parameters
----------
lc : Xarray Dataset
The Dataset object containing light curve and time data.
meta : eureka.lib.readECF.MetaClass
The current metadata object.
log : logedit.Logedit
The open log in which notes from this step can be added.
channel : int; optional
The current channel for multwhite fits.
Returns
-------
lc : Xarray Dataset
The updated Dataset object containing light curve and time data
with the requested integrations removed.
meta : eureka.lib.readECF.MetaClass
The updated metadata object.
log : logedit.Logedit
The updated log.
"""
log.writelog('Manually removing data points from meta.manual_clip...',
mute=(not meta.verbose))
meta.manual_clip = np.array(meta.manual_clip)
if len(meta.manual_clip.shape) == 1:
# The user didn't quite enter things right, so reshape
meta.manual_clip = meta.manual_clip[np.newaxis]
if meta.multwhite and len(meta.manual_clip.shape) == 2:
# The user didn't quite enter things right, so reshape
meta.manual_clip = np.repeat(meta.manual_clip[np.newaxis],
len(meta.inputdirlist)+1, axis=0)
if meta.multwhite:
manual_clip_temp = meta.manual_clip[channel]
else:
manual_clip_temp = meta.manual_clip
# Figure out which indices are being clipped
time_bool = np.ones(len(lc.data.time), dtype=bool)
for inds in manual_clip_temp:
time_bool[inds[0]:inds[1]] = False
time_inds = np.arange(len(lc.data.time))[time_bool]
# Remove the requested integrations
lc = lc.isel(time=time_inds)
return meta, lc, log
[docs]
def check_nans(data, mask, log, name=''):
"""Checks where a data-like array is invalid (contains NaNs or infs).
Parameters
----------
data : ndarray
a data-like array (e.g. data, err, dq, ...).
mask : ndarray
Input boolean mask, where mask is applied where True.
log : logedit.Logedit
The open log in which NaNs/Infs will be mentioned, if existent.
name : str; optional
The name of the data array passed in (e.g. SUBDATA, SUBERR, SUBV0).
Defaults to ''.
Returns
-------
mask : ndarray
Output mask where 0 will be written where the input data array has NaNs
or infs.
"""
data = np.ma.masked_invalid(data)
data = np.ma.masked_where(mask, data)
masked = np.ma.getmaskarray(data)
inan = np.where(masked)
num_nans = np.sum(masked)
num_pixels = np.size(data)
perc_nans = 100*num_nans/num_pixels
if num_nans > 0 and name == 'wavelength':
log.writelog(f" WARNING: Your {name} array has {num_nans} NaNs, which"
f" are outside of the wavelength solution. You should "
f"consider removing indices {inan} as their data quality "
f"may be poor.")
elif num_nans > 0:
log.writelog(f" {name} has {num_nans} NaNs/infs, which is "
f"{perc_nans:.2f}% of all pixels.")
mask[inan] = True
if perc_nans > 10:
log.writelog(" WARNING: Your region of interest may be off the edge "
"of the detector subarray. Masking NaN/inf regions and "
"continuing, but you should really stop and reconsider "
"your choices.")
return mask
[docs]
def makedirectory(meta, stage, counter=None, **kwargs):
"""Creates a directory for the current stage.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The metadata object.
stage : str
'S#' string denoting stage number (i.e. 'S3', 'S4').
counter : int; optional
The run number if you want to force a particular run number.
Defaults to None which automatically finds the run number.
**kwargs : dict
Additional key,value pairs to add to the folder name
(e.g. {'ap': 4, 'bg': 10}).
Returns
-------
run : int
The run number
"""
# This code allows the input and output files to be stored outside
# of the Eureka! folder
rootdir = os.path.join(meta.topdir, *meta.outputdir_raw.split(os.sep))
if rootdir[-1] != os.sep:
rootdir += os.sep
outputdir = rootdir+stage+'_'+meta.datetime+'_'+meta.eventlabel+'_run'
if counter is None:
counter = 1
while os.path.exists(outputdir+str(counter)):
counter += 1
outputdir += str(counter)+os.sep
else:
outputdir += str(counter)+os.sep
# Nest the different folders underneath one main folder for this run
for key, value in kwargs.items():
outputdir += key+str(value)+'_'
# Remove trailing _ if present
if outputdir[-1] == '_':
outputdir = outputdir[:-1]
# Add trailing slash
if outputdir[-1] != os.sep:
outputdir += os.sep
if not os.path.exists(outputdir):
try:
os.makedirs(outputdir)
except (PermissionError, OSError) as e:
# Raise a more helpful error message so that users know to update
# topdir in their ecf file
message = (f'You do not have the permissions to make the folder '
f'{outputdir}\nYour topdir is currently set to'
f'{meta.topdir}, but your user account is called '
f'{os.getenv("USER")}.\nYou likely need to update the '
f'topdir setting in your {stage} .ecf file.')
raise PermissionError(message) from e
if not os.path.exists(os.path.join(outputdir, "figs")):
os.makedirs(os.path.join(outputdir, "figs"))
return counter
[docs]
def pathdirectory(meta, stage, run, old_datetime=None, **kwargs):
"""Finds the directory for the requested stage, run, and datetime
(or old_datetime).
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The metadata object.
stage : str
'S#' string denoting stage number (i.e. 'S3', 'S4')
run : int
run #, output from makedirectory function
old_datetime : str; optional
The date that a previous run was made (for looking up old data).
Defaults to None in which case meta.datetime is used instead.
**kwargs : dict
Additional key,value pairs to add to the folder name
(e.g. {'ap': 4, 'bg': 10}).
Returns
-------
path : str
Directory path for given parameters
"""
if old_datetime is not None:
datetime = old_datetime
else:
datetime = meta.datetime
# This code allows the input and output files to be stored outside
# of the Eureka! folder
rootdir = os.path.join(meta.topdir, *meta.outputdir_raw.split(os.sep))
if rootdir[-1] != os.sep:
rootdir += os.sep
outputdir = (rootdir+stage+'_'+datetime+'_'+meta.eventlabel+'_run' +
str(run)+os.sep)
for key, value in kwargs.items():
outputdir += key+str(value)+'_'
# Remove trailing _ if present
if outputdir[-1] == '_':
outputdir = outputdir[:-1]
# Add trailing slash
if outputdir[-1] != os.sep:
outputdir += os.sep
return outputdir
[docs]
def find_fits(meta):
'''Locates S1 or S2 output FITS files if unable to find an metadata file.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The new meta object for the current stage processing.
Returns
-------
meta : eureka.lib.readECF.MetaClass
The meta object with the updated inputdir pointing to the location of
the input files to use.
'''
fnames = glob.glob(meta.inputdir+'*'+meta.suffix + '.fits')
if len(fnames) == 0:
# There were no rateints files in that folder, so let's see if
# there are in children folders
fnames = glob.glob(meta.inputdir+'**'+os.sep+'*'+meta.suffix+'.fits',
recursive=True)
if len(fnames) == 0:
# If the code can't find any of the reqested files, raise an error
# and give a helpful message
message = (f'Unable to find any "{meta.suffix}.fits" files in the '
f'inputdir: \n"{meta.inputdir}"!\nYou likely need to change'
f' the inputdir in {meta.filename} to point to the folder '
f'containing the "{meta.suffix}.fits" files.')
raise AssertionError(message)
folders = np.unique([os.sep.join(fname.split(os.sep)[:-1])
for fname in fnames])
if len(folders) >= 1:
# get the file with the latest modified time
folder = max(folders, key=os.path.getmtime)
if len(folders) > 1:
# There may be multiple runs - use the most recent but warn the user
print(f'WARNING: There are multiple folders containing '
f'"{meta.suffix}.fits" files in your inputdir:\n'
f'"{meta.inputdir}"\n'
f'Using the files in: \n{folder}\n'
f'and will consider aperture ranges listed there. If this '
f'metadata file is not a part\nof the run you intended, please '
f'provide a more precise folder for the metadata file.')
meta.inputdir = folder
meta.inputdir_raw = folder[len(meta.topdir):]
# Make sure there's a trailing slash at the end of the paths
if meta.inputdir[-1] != os.sep:
meta.inputdir += os.sep
# Only get the relevant files
# Now also protecting against nested folders and against non-science files
relevant_fnames = [fname for fname in fnames if
# check that the file is in the relevant folder
(folder == os.sep.join(fname.split(os.sep)[:-1])
# check that the file is not a TA-related file
and not ignore_nonscience(fname))]
meta = get_inst(meta, relevant_fnames[-1])
return meta
[docs]
def binData(data, mask=None, nbin=100, err=False):
"""Temporally bin data for easier visualization.
Parameters
----------
data : ndarray (1D)
The data to temporally bin.
mask : ndarray (1D); optional
A boolean array of bad values (marked with True) that should be masked.
Defaults to None, in which case only non-finite values will be masked.
nbin : int; optional
The number of bins there should be. By default 100.
err : bool; optional
If True, divide the binned data by sqrt(N) to get the error on the
mean. By default False.
Returns
-------
binned : ndarray
The binned data.
"""
if mask is None:
mask = ~np.isfinite(data)
# Make a copy for good measure
data = np.ma.masked_where(mask, data)
# Make sure there's a whole number of bins
data = data[:nbin*int(len(data)/nbin)]
# Bin data
binned = np.ma.mean(data.reshape(nbin, -1), axis=1)
if err:
binned /= np.sqrt(int(len(data)/nbin))
return binned
[docs]
def binData_time(data, time, mask=None, nbin=100, err=False):
"""Temporally bin data for easier visualization.
Parameters
----------
data : ndarray (1D)
The data to temporally bin.
time : ndarray (1D)
The time axis along which to bin
mask : ndarray (1D); optional
A boolean array of bad values (marked with True) that should be masked.
Defaults to None, in which case only non-finite values will be masked.
nbin : int, optional
The number of bins there should be. By default 100.
err : bool, optional
If True, divide the binned data by sqrt(N) to get the error on the
mean. By default False.
Returns
-------
binned : ndarray
The binned data.
"""
if mask is None:
mask = ~np.isfinite(data)
# Make a copy for good measure
data = np.ma.masked_where(mask, data, copy=True)
# Binned_statistic will copy data without keeping it a masked array
# so we have to manually remove invalid points
if (type(data.mask) is np.bool_):
# Only good data
# np.ma.maskarray doesn't work with np.bool_ objects
good_time = time
good_data = data
else:
good_time = time[~np.ma.getmaskarray(data)]
good_data = data[~np.ma.getmaskarray(data)]
binned, _, _ = binned_statistic(good_time, good_data,
statistic='mean',
bins=nbin)
if err:
binned_count, _, _ = binned_statistic(good_time, good_data,
statistic='count',
bins=nbin)
binned /= np.sqrt(binned_count)
# Need to mask invalid data in case there is an empty bin
# (leading to divide by zero)
return np.ma.masked_invalid(binned)
[docs]
def normalize_spectrum(meta, optspec, opterr=None, optmask=None, scandir=None):
"""Normalize a spectrum by its temporal mean.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The new meta object for the current stage processing.
optspec : ndarray
The spectrum to normalize.
opterr : ndarray; optional
The noise array to normalize using optspec, by default None.
optmask : ndarray (1D); optional
A boolean mask array to use if optspec is not a masked array. Defaults
to None in which case only the invalid values of optspec will be
masked. Will mask the values where the mask value is set to True.
scandir : ndarray; optional
For HST spatial scanning mode, 0=forward scan and 1=reverse scan.
Defaults to None which is fine for JWST data, but must be provided
for HST data (can be all zero values if not spatial scanning mode).
Returns
-------
normspec
The normalized spectrum.
normerr : ndarray; optional
The normalized error. Only returned if opterr is not none.
"""
normspec = np.ma.masked_invalid(np.ma.copy(optspec))
normspec = np.ma.masked_where(optmask, normspec)
if opterr is not None:
normerr = np.ma.masked_invalid(np.ma.copy(opterr))
normerr = np.ma.masked_where(np.ma.getmaskarray(normspec), normerr)
normerr = np.ma.abs(normerr)
# Normalize the spectrum
if meta.inst == 'wfc3':
for p in range(2):
iscans = np.atleast_1d(scandir == p).nonzero()[0]
if len(iscans) > 0:
for r in range(meta.nreads):
normFactor = np.ma.abs(np.ma.mean(
normspec[iscans[r::meta.nreads]], axis=0))
normspec[iscans[r::meta.nreads]] /= normFactor
if opterr is not None:
normerr[iscans[r::meta.nreads]] /= normFactor
else:
normFactor = np.ma.abs(np.ma.mean(normspec, axis=0))
normspec /= normFactor
if opterr is not None:
normerr /= normFactor
if opterr is not None:
return normspec, normerr
else:
return normspec
[docs]
def get_mad(meta, log, wave_1d, optspec, optmask=None,
wave_min=None, wave_max=None, scandir=None):
"""Computes variation on median absolute deviation (MAD) using ediff1d
for 2D data.
The computed MAD is the average MAD along the time axis. In
otherwords, the MAD is computed in the time direction for each
wavelength, and then the returned value is the average of those MAD
values.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
Unused. The metadata object.
log : logedit.Logedit
The current log.
wave_1d : ndarray
Wavelength array (nx) with trimmed edges depending on xwindow and
ywindow which have been set in the S3 ecf.
optspec : ndarray
Optimally extracted spectra, 2D array (time, nx)
optmask : ndarray (1D); optional
A boolean mask array to use if optspec is not a masked array. Defaults
to None in which case only the invalid values of optspec will be
masked. Will mask the values where the mask value is set to True.
wave_min : float; optional
Minimum wavelength for binned lightcurves, as given in the S4 .ecf
file. Defaults to None which does not impose a lower limit.
wave_max : float; optional
Maximum wavelength for binned lightcurves, as given in the S4 .ecf
file. Defaults to None which does not impose an upper limit.
scandir : ndarray; optional
For HST spatial scanning mode, 0=forward scan and 1=reverse scan.
Defaults to None which is fine for JWST data, but must be provided
for HST data (can be all zero values if not spatial scanning mode).
Returns
-------
mad : float
Single MAD value in ppm
"""
# Make sure wavelengths are in ascending order
if wave_1d[0] > wave_1d[-1]:
wave_1d = wave_1d[::-1]
optspec = optspec[::-1]
optmask = optmask[::-1]
optspec = np.ma.masked_invalid(optspec)
optspec = np.ma.masked_where(optmask, optspec)
if wave_min is not None:
iwmin = np.nanargmin(np.abs(wave_1d-wave_min))
else:
iwmin = 0
if wave_max is not None:
iwmax = np.nanargmin(np.abs(wave_1d-wave_max))
else:
iwmax = None
# Normalize the spectrum
normspec = normalize_spectrum(meta, optspec[:, iwmin:iwmax],
optmask=optmask[:, iwmin:iwmax],
scandir=scandir)
if meta.inst == 'wfc3':
# Setup 1D MAD arrays
n_wav = normspec.shape[1]
ediff = np.ma.zeros((2, n_wav))
# Compute the MAD for each scan direction
for p in range(2):
iscans = np.atleast_1d(scandir == p).nonzero()[0]
if len(iscans) > 0:
# Compute the MAD
for m in range(n_wav):
ediff[p, m] = get_mad_1d(normspec[iscans, m])
mad = np.ma.mean(ediff[p])
log.writelog(f"Scandir {p} MAD = {int(np.round(mad))} ppm")
setattr(meta, f'mad_scandir{p}', mad)
if np.all(scandir == scandir[0]):
# Only scanned in one direction, so get rid of the other
ediff = ediff[scandir[0]]
else:
# Collapse the MAD along the scan direction
ediff = np.mean(ediff, axis=0)
else:
# Setup 1D MAD array
n_wav = normspec.shape[1]
ediff = np.ma.zeros(n_wav)
# Compute the MAD
for m in range(n_wav):
ediff[m] = get_mad_1d(normspec[:, m])
return np.ma.mean(ediff)
[docs]
def get_mad_1d(data, ind_min=0, ind_max=None):
"""Computes variation on median absolute deviation (MAD) using ediff1d
for 1D data.
Parameters
----------
data : ndarray
The array from which to calculate MAD.
int_min : int
Minimum index to consider.
ind_max : int
Maximum index to consider (excluding ind_max).
Returns
-------
mad : float
Single MAD value in ppm
"""
return 1e6 * np.ma.median(np.ma.abs(np.ma.ediff1d(data[ind_min:ind_max])))
[docs]
def read_time(meta, data, log):
"""Read in a time CSV file instead of using the FITS time array.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The metadata object.
data : Xarray Dataset
The Dataset object with the fits data stored inside.
log : logedit.Logedit
The current log.
Returns
-------
time : ndarray
The time array stored in the meta.time_file CSV file.
"""
fname = os.path.join(meta.topdir,
os.sep.join(meta.time_file.split(os.sep)))
if meta.firstFile:
log.writelog(' Note: Using the time stamps from:\n '+fname)
time = np.loadtxt(fname).flatten()[data.attrs['intstart']:
data.attrs['intend']-1]
return time
[docs]
def manmask(data, meta, log):
'''Manually mask input bad pixels specified through meta.manmask.
Parameters
----------
data : Xarray Dataset
The Dataset object.
meta : eureka.lib.readECF.MetaClass
The metadata object.
log : logedit.Logedit
The current log.
Returns
-------
data : Xarray Dataset
The updated Dataset object with requested pixels masked.
'''
log.writelog(" Masking manually identified bad pixels...",
mute=(not meta.verbose))
for i in range(len(meta.manmask)):
colstart, colend, rowstart, rowend = meta.manmask[i]
data['mask'][:, rowstart:rowend+1, colstart:colend+1] = True
return data
# PHOTOMETRY
[docs]
def interp_masked(data, meta, log):
"""
Interpolates masked pixels.
Based on the example here:
https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.griddata.html
Parameters
----------
data : Xarray Dataset
The Dataset object.
meta : eureka.lib.readECF.MetaClass
The metadata object.
log : logedit.Logedit
The current log.
Returns
-------
data : Xarray Dataset
The updated Dataset object with requested pixels masked.
"""
nx = data.flux.values.shape[2]
ny = data.flux.values.shape[1]
grid_x, grid_y = np.mgrid[0:ny-1:complex(0, ny), 0:nx-1:complex(0, nx)]
if meta.interp_method not in ['nearest', 'linear', 'cubic']:
message = (f'Your method for interpolation "{meta.interp_method}" '
'is not supported! Please choose between None, nearest, '
'linear or cubic.')
log.writelog(message, mute=(not meta.verbose))
raise ValueError(message)
# Write interpolated values
def writeInterploated(arg):
grid_z, i = arg
data.flux.values[i] = grid_z
return
message = ' Interpolating masked values'
log.writelog(message+'...', mute=True)
if meta.ncpu == 1:
# Only 1 CPU
iterfn = range(len(data.time))
if meta.verbose:
iterfn = tqdm(iterfn, desc=message)
for i in iterfn:
writeInterploated(interp_masked_helper(
data.flux.values[i], data.mask.values[i], grid_x, grid_y,
meta.interp_method, i))
else:
# Multiple CPUs
pool = mp.Pool(meta.ncpu)
jobs = [pool.apply_async(func=interp_masked_helper,
args=(data.flux.values[i],
data.mask.values[i],
grid_x, grid_y,
meta.interp_method, i,),
callback=writeInterploated)
for i in range(len(data.time))]
pool.close()
iterfn = jobs
if meta.verbose:
iterfn = tqdm(iterfn, desc=message)
for job in iterfn:
job.get()
return data
[docs]
def interp_masked_helper(flux, mask, grid_x, grid_y, interp_method, i):
"""A helper function to do bad-pixel intepolation when multiprocessing.
Parameters
----------
flux : ndarray (2D)
A 2D float array.
mask : ndarray
A 2D boolean array, where True values are masked.
grid_x : ndarray
The x position for every index.
grid_y : ndarray
The y position for every index.
interp_method : str
One of ['linear', 'nearest', 'cubic']. For more details, read the
documentation for scipy.interpolate.griddata.
i : int
The current integration number.
Returns
-------
grid_z : ndarray
The input flux array with bad values interpolated over.
i : int
The same input i, to be used when multiprocessing.
"""
if not np.any(mask):
# Don't bother trying to replace bad values if there are none
return flux, i
# flux values of good pixels
values = flux[~mask]
# x,y positions of good pixels
points = np.array(np.where(~mask)).T
# Use scipy.interpolate.griddata to interpolate
grid_z = griddata(points, values, (grid_x, grid_y), method=interp_method)
# Replace flux with interpolated values
return grid_z, i
[docs]
def phot_arrays(data):
"""Setting up arrays for the photometry routine.
These arrays will be populated by the returns coming from centerdriver.py
and apphot.py
Parameters
----------
data : Xarray Dataset
The Dataset object.
Returns
-------
data : Xarray Dataset
The updated Dataset object with new arrays where the
outputs from the photometry routine will be saved in.
"""
keys = ['centroid_x', 'centroid_y', 'centroid_sx', 'centroid_sy',
'aplev', 'aperr', 'nappix', 'skylev', 'skyerr', 'nskypix',
'nskyideal', 'status', 'betaper']
for key in keys:
data[key] = (['time'], np.zeros_like(data.time))
data['aplev'].attrs['flux_units'] = data.flux.attrs['flux_units']
data['aplev'].attrs['time_units'] = data.flux.attrs['time_units']
data['aperr'].attrs['flux_units'] = data.flux.attrs['flux_units']
data['aperr'].attrs['time_units'] = data.flux.attrs['time_units']
return data
[docs]
def make_citations(meta, stage=None):
"""Store relevant citation information in the current meta file.
Searches through imported libraries and current ECF parameters for
terms that match BibTeX entries in citations.py. Every entry that
matches gets added to a bibliography field in the meta file.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The current metadata object.
mods : array-like
Array of strings containing the currently installed modules.
stage: integer
The integer number of the current stage (1,2,3,4,5,6)
"""
# get common modules for the current stage
module_cites = COMMON_IMPORTS[stage-1]
# in S5, extract fitting methods/myfuncs to grab citations
other_cites = []
# check for nircam photometry in S3
if stage == 3:
if hasattr(meta, 'inst') and hasattr(meta, "photometry"):
if meta.photometry and meta.inst == "nircam":
other_cites = other_cites + ["nircam_photometry"]
if stage == 5:
# concat non-lsq fit methods (emcee/dynesty) to the citation list
if "emcee" in meta.fit_method:
other_cites = other_cites + ["emcee"]
if "dynesty" in meta.fit_method:
other_cites = other_cites + ["dynesty"]
if "exoplanet" in meta.fit_method:
other_cites = other_cites + ["exoplanet"]
# check if batman or GP is being used for transit/eclipse modeling
if "batman_tr" in meta.run_myfuncs or "batman_ecl" in meta.run_myfuncs:
other_cites.append("batman")
if "catwoman_tr" in meta.run_myfuncs:
other_cites.append("catwoman")
if "fleck_tr" in meta.run_myfuncs:
other_cites.append("fleck")
if "GP" in meta.run_myfuncs:
if hasattr(meta, "GP_package"):
other_cites.append(meta.GP_package)
# I set the instrument in the relevant bits of S1/2, so I don't think this
# should really be necessary. Taylor's boilerplate for later
if not hasattr(meta, 'inst'):
valid = False
insts = ['miri', 'nirspec', 'nircam', 'niriss', 'wfc3']
while not valid:
inst = input('Which JWST/HST instrument are you using? \
(leave blank for none): ').lower()
if inst != '':
if inst in insts:
# The entered instrument was valid, so continue
meta.inst = inst
valid = True
else:
# No instrument, so just continue
valid = True
if not valid:
# The entered instrument was not valid, so explain, ask again
print(f'The instrument {inst} is not a valid instrument. \
Please choose from {insts}.')
# make sure instrument is in citation list
other_cites.append(meta.inst)
# get all new citations together
current_cites = np.union1d(module_cites, other_cites)
# check if meta has existing list of citations/bibitems, if it does, make
# sure we include imports from previous stages in our citations
prev_cites = []
if hasattr(meta, 'citations'):
prev_cites = meta.citations
all_cites = np.union1d(current_cites, prev_cites).tolist()
# make sure everything in meta citation list can be added to bibliography
for entry in all_cites:
if entry not in CITATIONS.keys():
all_cites.remove(entry)
# store everything in the meta object
meta.citations = all_cites
meta.bibliography = [CITATIONS[entry] for entry in meta.citations]
[docs]
def supersample(data, expand, type, axis=1):
"""Apply subpixel interpolation to the given arrays in the cross-disperion
direction.
Parameters
----------
data : ND array
Array of values to be super-sampled.
expand : int
Super-sampling factor along the given axis.
type : str
Options are: data, err, dq, or wave.
axis : int, Optional
Axis along which interpolation is performed (default is 1).
Returns
-------
zdata : ND array
The updated array at higher resolution
"""
# Build array of expansion factors
# e.g., [1, 5, 1] for axis=1 and expand=5
ndim = np.ndim(data)
expand_seq = np.ones(ndim, dtype=int)
expand_seq[axis] = expand
# SciPy's zoom can't handle NaNs, so let's replace them via interpolation
if ndim == 3:
for i in range(data.shape[0]):
for j in range(data.shape[1]):
data[i, j] = naninterp1d(data[i, j])
if ndim == 2:
for i in range(data.shape[0]):
data[i] = naninterp1d(data[i])
# Apply linear interpolation along axis
if type == 'flux':
# Divide by 'expand' to conserve flux/variance
zdata = zoom(data, expand_seq, order=1, mode='nearest')/expand
elif type == 'err':
# Divide by 'sqrt(expand)'' to conserve uncertainty
zdata = zoom(data, expand_seq, order=1, mode='nearest')/np.sqrt(expand)
elif type == 'cal':
# Apply same dq flag, gain values, etc to all super-sampled pixels
zdata = np.repeat(data, expand, axis=axis)
elif type == 'wave':
zdata = zoom(data, expand_seq, order=1, mode='nearest')
else:
print(f"Type {type} not supported. Must be one of flux, err, cal, " +
"or wave. No super-sampling applied.")
zdata = data
return zdata
[docs]
def load_attrs_from_xarray(data):
"""Function to load attrs from xarray file, ensuring
that potential conversions performed in add_meta_to_xarray()
are reversed.
Parameters
----------
data : Xarray Dataset
The updated Dataset object, meta parameters can be accessed in
data.attrs.
Returns
-------
attrs : dict
Dictionary of attributes saved to the Xarray.
"""
attrs = data.attrs
for attr in attrs.keys():
# Convert None strings back to None
if isinstance(attrs[attr], str) and attrs[attr] == 'None':
attrs[attr] = None
# Restructure bibliography correctly
if attr == 'bibliography':
attrs[attr] = [citations.split('_ENDOFCITATION_')
for citations in attrs[attr]]
return attrs
[docs]
def get_unexpanded_hws(expand, spec_hw_val, bg_hw_val):
"""Get the unexpanded half-width values for the spectrum and background.
Parameters
----------
expand : int
The super-sampling factor.
spec_hw_val : float
The half-width value for the spectrum.
bg_hw_val : float
The half-width value for the background.
Returns
-------
spec_hw_val_unexpanded : float
The unexpanded half-width value for the spectrum.
bg_hw_val_unexpanded : float
The unexpanded half-width value for the background.
"""
if not isinstance(bg_hw_val, str):
# Only divide if value is not a string (spectroscopic modes)
if isinstance(bg_hw_val, float):
bg_hw_val /= expand
else:
bg_hw_val //= expand
if isinstance(spec_hw_val, float):
spec_hw_val /= expand
else:
spec_hw_val //= expand
return spec_hw_val, bg_hw_val