import numpy as np
import os
import glob
from astropy.io import fits
from . import sort_nicely as sn
[docs]def readfiles(meta):
"""Reads in the files saved in topdir + inputdir and saves them into a list.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The metadata object.
Returns
-------
meta : eureka.lib.readECF.MetaClass
The metadata object with added segment_list containing the sorted
data fits files.
"""
meta.segment_list = []
# Look for files in the input directory
for fname in glob.glob(meta.inputdir+'*'+meta.suffix+'.fits'):
meta.segment_list.append(fname)
# Need to allow for separated sci and cal directories for WFC3
if len(meta.segment_list) == 0:
# Add files from the sci directory if present
if not hasattr(meta, 'sci_dir') or meta.sci_dir is None:
meta.sci_dir = 'sci'
sci_path = os.path.join(meta.inputdir, meta.sci_dir)+os.sep
for fname in glob.glob(sci_path+'*'+meta.suffix+'.fits'):
meta.segment_list.append(fname)
# Add files from the cal directory if present
if not hasattr(meta, 'cal_dir') or meta.cal_dir is None:
meta.cal_dir = 'cal'
cal_path = os.path.join(meta.inputdir, meta.cal_dir)+os.sep
for fname in glob.glob(cal_path+'*'+meta.suffix+'.fits'):
meta.segment_list.append(fname)
with fits.open(meta.segment_list[-1]) as hdulist:
# Figure out which instrument we are using
meta.inst = hdulist[0].header['INSTRUME'].lower()
meta.segment_list = np.array(sn.sort_nicely(meta.segment_list))
return meta
[docs]def trim(data, meta):
"""Removes the edges of the data arrays.
Parameters
----------
data : Xarray Dataset
The Dataset object.
meta : eureka.lib.readECF.MetaClass
The metadata object.
Returns
-------
subdata : Xarray Dataset
A new Dataset object with arrays that have been trimmed, depending on
xwindow and ywindow as set in the S3 ecf.
meta : eureka.lib.readECF.MetaClass
The metadata object.
"""
subdata = data.isel(y=np.arange(meta.ywindow[0], meta.ywindow[1]),
x=np.arange(meta.xwindow[0], meta.xwindow[1]))
meta.subny = meta.ywindow[1] - meta.ywindow[0]
meta.subnx = meta.xwindow[1] - meta.xwindow[0]
if meta.inst == 'wfc3':
subdata['guess'] = subdata.guess - meta.ywindow[0]
return subdata, meta
[docs]def check_nans(data, mask, log, name=''):
"""Checks where a data array has NaNs.
Parameters
----------
data : ndarray
a data array (e.g. data, err, dq, ...).
mask : ndarray
Input mask.
log : logedit.Logedit
The open log in which NaNs will be mentioned if existent.
name : str; optional
The name of the data array passed in (e.g. SUBDATA, SUBERR, SUBV0).
Defaults to ''.
Returns
-------
mask : ndarray
Output mask where 0 will be written where the input data array has NaNs
"""
num_nans = np.sum(np.isnan(data.values))
if num_nans > 0:
log.writelog(f" WARNING: {name} has {num_nans} NaNs. Your subregion "
f"may be off the edge of the detector subarray.\n"
f" Masking NaN region and continuing, but you should "
f"really stop and reconsider your choices.")
inan = np.where(np.isnan(data))
# subdata[inan] = 0
mask[inan] = 0
return mask
[docs]def makedirectory(meta, stage, counter=None, **kwargs):
"""Creates a directory for the current stage.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The metadata object.
stage : str
'S#' string denoting stage number (i.e. 'S3', 'S4').
counter : int; optional
The run number if you want to force a particular run number.
Defaults to None which automatically finds the run number.
**kwargs : dict
Additional key,value pairs to add to the folder name
(e.g. {'ap': 4, 'bg': 10}).
Returns
-------
run : int
The run number
"""
# This code allows the input and output files to be stored outside
# of the Eureka! folder
rootdir = os.path.join(meta.topdir, *meta.outputdir_raw.split(os.sep))
if rootdir[-1] != os.sep:
rootdir += os.sep
outputdir = rootdir+stage+'_'+meta.datetime+'_'+meta.eventlabel+'_run'
if counter is None:
counter = 1
while os.path.exists(outputdir+str(counter)):
counter += 1
outputdir += str(counter)+os.sep
else:
outputdir += str(counter)+os.sep
# Nest the different folders underneath one main folder for this run
for key, value in kwargs.items():
outputdir += key+str(value)+'_'
# Remove trailing _ if present
if outputdir[-1] == '_':
outputdir = outputdir[:-1]
# Add trailing slash
if outputdir[-1] != os.sep:
outputdir += os.sep
if not os.path.exists(outputdir):
try:
os.makedirs(outputdir)
except (PermissionError, OSError) as e:
# Raise a more helpful error message so that users know to update
# topdir in their ecf file
message = (f'You do not have the permissions to make the folder '
f'{outputdir}\nYour topdir is currently set to'
f'{meta.topdir}, but your user account is called '
f'{os.getenv("USER")}.\nYou likely need to update the '
f'topdir setting in your {stage} .ecf file.')
raise PermissionError(message) from e
if not os.path.exists(os.path.join(outputdir, "figs")):
os.makedirs(os.path.join(outputdir, "figs"))
return counter
[docs]def pathdirectory(meta, stage, run, old_datetime=None, **kwargs):
"""Finds the directory for the requested stage, run, and datetime
(or old_datetime).
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The metadata object.
stage : str
'S#' string denoting stage number (i.e. 'S3', 'S4')
run : int
run #, output from makedirectory function
old_datetime : str; optional
The date that a previous run was made (for looking up old data).
Defaults to None in which case meta.datetime is used instead.
**kwargs : dict
Additional key,value pairs to add to the folder name
(e.g. {'ap': 4, 'bg': 10}).
Returns
-------
path : str
Directory path for given parameters
"""
if old_datetime is not None:
datetime = old_datetime
else:
datetime = meta.datetime
# This code allows the input and output files to be stored outside
# of the Eureka! folder
rootdir = os.path.join(meta.topdir, *meta.outputdir_raw.split(os.sep))
if rootdir[-1] != os.sep:
rootdir += os.sep
outputdir = (rootdir+stage+'_'+datetime+'_'+meta.eventlabel+'_run' +
str(run)+os.sep)
for key, value in kwargs.items():
outputdir += key+str(value)+'_'
# Remove trailing _ if present
if outputdir[-1] == '_':
outputdir = outputdir[:-1]
# Add trailing slash
if outputdir[-1] != os.sep:
outputdir += os.sep
return outputdir
[docs]def find_fits(meta):
'''Locates S1 or S2 output FITS files if unable to find an metadata file.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The new meta object for the current stage processing.
Returns
-------
meta : eureka.lib.readECF.MetaClass
The meta object with the updated inputdir pointing to the location of
the input files to use.
Notes
-----
History:
- April 25, 2022 Taylor Bell
Initial version.
'''
fnames = glob.glob(meta.inputdir+'*'+meta.suffix + '.fits')
if len(fnames) == 0:
# There were no rateints files in that folder, so let's see if
# there are in children folders
fnames = glob.glob(meta.inputdir+'**'+os.sep+'*'+meta.suffix+'.fits',
recursive=True)
fnames = sn.sort_nicely(fnames)
if len(fnames) == 0:
# If the code can't find any of the reqested files, raise an error
# and give a helpful message
message = (f'Unable to find any "{meta.suffix}.fits" files in the '
f'inputdir: \n"{meta.inputdir}"!\nYou likely need to change'
f' the inputdir in {meta.filename} to point to the folder '
f'containing the "{meta.suffix}.fits" files.')
raise AssertionError(message)
folders = np.unique([os.sep.join(fname.split(os.sep)[:-1])
for fname in fnames])
if len(folders) >= 1:
# get the file with the latest modified time
folder = max(folders, key=os.path.getmtime)
if len(folders) > 1:
# There may be multiple runs - use the most recent but warn the user
print(f'WARNING: There are multiple folders containing '
f'"{meta.suffix}.fits" files in your inputdir:\n'
f'"{meta.inputdir}"\n'
f'Using the files in: \n{folder}\n'
f'and will consider aperture ranges listed there. If this '
f'metadata file is not a part\nof the run you intended, please '
f'provide a more precise folder for the metadata file.')
meta.inputdir = folder
meta.inputdir_raw = folder[len(meta.topdir):]
# Make sure there's a trailing slash at the end of the paths
if meta.inputdir[-1] != os.sep:
meta.inputdir += os.sep
return meta
[docs]def normalize_spectrum(meta, optspec, opterr=None, optmask=None):
"""Normalize a spectrum by its temporal mean.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The new meta object for the current stage processing.
optspec : ndarray
The spectrum to normalize.
opterr : ndarray, optional
The noise array to normalize using optspec, by default None.
optmask : ndarray (1D), optional
A mask array to use if optspec is not a masked array. Defaults to None
in which case only the invalid values of optspec will be masked.
Returns
-------
normspec
The normalized spectrum.
normerr : ndarray, optional
The normalized error. Only returned if opterr is not none.
"""
normspec = np.ma.masked_invalid(np.ma.copy(optspec))
normspec = np.ma.masked_where(optmask, normspec)
if opterr is not None:
normerr = np.ma.masked_invalid(np.ma.copy(opterr))
normerr = np.ma.masked_where(np.ma.getmaskarray(normspec), normerr)
# Normalize the spectrum
if meta.inst == 'wfc3':
scandir = np.repeat(meta.scandir, meta.nreads)
for p in range(2):
iscans = np.where(scandir == p)[0]
if len(iscans) > 0:
for r in range(meta.nreads):
if opterr is not None:
normerr[iscans[r::meta.nreads]] /= np.ma.mean(
normspec[iscans[r::meta.nreads]], axis=0)
normspec[iscans[r::meta.nreads]] /= np.ma.mean(
normspec[iscans[r::meta.nreads]], axis=0)
else:
if opterr is not None:
normerr = normerr/np.ma.mean(normspec, axis=0)
normspec = normspec/np.ma.mean(normspec, axis=0)
if opterr is not None:
return normspec, normerr
else:
return normspec
[docs]def get_mad(meta, log, wave_1d, optspec, optmask=None,
wave_min=None, wave_max=None):
"""Computes variation on median absolute deviation (MAD) using ediff1d
for 2D data.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
Unused. The metadata object.
log : logedit.Logedit
The current log.
wave_1d : ndarray
Wavelength array (nx) with trimmed edges depending on xwindow and
ywindow which have been set in the S3 ecf
optspec : ndarray
Optimally extracted spectra, 2D array (time, nx)
optmask : ndarray (1D), optional
A mask array to use if optspec is not a masked array. Defaults to None
in which case only the invalid values of optspec will be masked.
wave_min : float; optional
Minimum wavelength for binned lightcurves, as given in the S4 .ecf
file. Defaults to None which does not impose a lower limit.
wave_maxf : float; optional
Maximum wavelength for binned lightcurves, as given in the S4 .ecf
file. Defaults to None which does not impose an upper limit.
Returns
-------
mad : float
Single MAD value in ppm
"""
optspec = np.ma.masked_invalid(optspec)
optspec = np.ma.masked_where(optmask, optspec)
if wave_min is not None:
iwmin = np.argmin(np.abs(wave_1d-wave_min))
else:
iwmin = 0
if wave_max is not None:
iwmax = np.argmin(np.abs(wave_1d-wave_max))
else:
iwmax = None
# Normalize the spectrum
normspec = normalize_spectrum(meta, optspec[:, iwmin:iwmax])
# Compute the MAD
n_int = normspec.shape[0]
ediff = np.ma.zeros(n_int)
for m in range(n_int):
ediff[m] = get_mad_1d(normspec[m])
if meta.inst == 'wfc3':
scandir = np.repeat(meta.scandir, meta.nreads)
# Compute the MAD for each scan direction
for p in range(2):
iscans = np.where(scandir == p)[0]
if len(iscans) > 0:
mad = np.ma.mean(ediff[iscans])
log.writelog(f"Scandir {p} MAD = {int(np.round(mad))} ppm")
setattr(meta, f'mad_scandir{p}', mad)
return np.ma.mean(ediff)
[docs]def get_mad_1d(data, ind_min=0, ind_max=-1):
"""Computes variation on median absolute deviation (MAD) using ediff1d
for 1D data.
Parameters
----------
data : ndarray
The array from which to calculate MAD.
int_min : int
Minimum index to consider.
ind_max : int
Maximum index to consider (excluding ind_max).
Returns
-------
mad : float
Single MAD value in ppm
"""
return 1e6 * np.ma.median(np.ma.abs(np.ma.ediff1d(data[ind_min:ind_max])))
[docs]def read_time(meta, data, log):
"""Read in a time CSV file instead of using the FITS time array.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The metadata object.
data : Xarray Dataset
The Dataset object with the fits data stored inside.
log : logedit.Logedit
The current log.
Returns
-------
time : ndarray
The time array stored in the meta.time_file CSV file.
"""
fname = os.path.join(meta.topdir,
os.sep.join(meta.time_file.split(os.sep)))
if meta.firstFile:
log.writelog(' Note: Using the time stamps from:\n '+fname)
time = np.loadtxt(fname).flatten()[data.attrs['intstart']:
data.attrs['intend']-1]
return time
[docs]def manmask(data, meta, log):
'''Manually mask input bad pixels.
Parameters
----------
data : Xarray Dataset
The Dataset object.
meta : eureka.lib.readECF.MetaClass
The metadata object.
log : logedit.Logedit
The current log.
Returns
-------
data : Xarray Dataset
The updated Dataset object with requested pixels masked.
'''
log.writelog(" Masking manually identified bad pixels...",
mute=(not meta.verbose))
for i in range(len(meta.manmask)):
colstart, colend, rowstart, rowend = meta.manmask[i]
data['mask'][rowstart:rowend, colstart:colend] = 0
return data