import numpy as np
from . import sort_nicely as sn
import os
import glob
[docs]def readfiles(meta):
"""Reads in the files saved in topdir + inputdir and saves them into a list.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The metadata object.
Returns
-------
meta : eureka.lib.readECF.MetaClass
The metadata object with added segment_list containing the sorted
data fits files.
"""
meta.segment_list = []
for fname in os.listdir(meta.inputdir):
if fname.endswith(meta.suffix + '.fits'):
meta.segment_list.append(meta.inputdir + fname)
meta.segment_list = np.array(sn.sort_nicely(meta.segment_list))
return meta
[docs]def trim(data, meta):
"""Removes the edges of the data arrays.
Parameters
----------
data : Xarray Dataset
The Dataset object.
meta : eureka.lib.readECF.MetaClass
The metadata object.
Returns
-------
subdata : Xarray Dataset
A new Dataset object with arrays that have been trimmed, depending on
xwindow and ywindow as set in the S3 ecf.
meta : eureka.lib.readECF.MetaClass
The metadata object.
"""
subdata = data.isel(y=np.arange(meta.ywindow[0], meta.ywindow[1]),
x=np.arange(meta.xwindow[0], meta.xwindow[1]))
meta.subny = meta.ywindow[1] - meta.ywindow[0]
meta.subnx = meta.xwindow[1] - meta.xwindow[0]
if hasattr(meta, 'diffmask'):
# Need to crop diffmask and variance from WFC3 as well
meta.subdiffmask.append(
meta.diffmask[-1][:, meta.ywindow[0]:meta.ywindow[1],
meta.xwindow[0]:meta.xwindow[1]])
# data.subvariance = np.copy(
# data.variance[:, meta.ywindow[0]:meta.ywindow[1],
# meta.xwindow[0]:meta.xwindow[1]])
# delattr(data, 'variance')
return subdata, meta
[docs]def check_nans(data, mask, log, name=''):
"""Checks where a data array has NaNs.
Parameters
----------
data : ndarray
a data array (e.g. data, err, dq, ...).
mask : ndarray
Input mask.
log : logedit.Logedit
The open log in which NaNs will be mentioned if existent.
name : str; optional
The name of the data array passed in (e.g. SUBDATA, SUBERR, SUBV0).
Defaults to ''.
Returns
-------
mask : ndarray
Output mask where 0 will be written where the input data array has NaNs
"""
num_nans = np.sum(np.isnan(data))
if num_nans > 0:
log.writelog(f" WARNING: {name} has {num_nans} NaNs. Your subregion "
f"may be off the edge of the detector subarray.\n"
"Masking NaN region and continuing, but you should really"
" stop and reconsider your choices.")
inan = np.where(np.isnan(data))
# subdata[inan] = 0
mask[inan] = 0
return mask
[docs]def makedirectory(meta, stage, counter=None, **kwargs):
"""Creates a directory for the current stage.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The metadata object.
stage : str
'S#' string denoting stage number (i.e. 'S3', 'S4').
counter : int; optional
The run number if you want to force a particular run number.
Defaults to None which automatically finds the run number.
**kwargs : dict
Additional key,value pairs to add to the folder name
(e.g. {'ap': 4, 'bg': 10}).
Returns
-------
run : int
The run number
"""
# This code allows the input and output files to be stored outside
# of the Eureka! folder
rootdir = os.path.join(meta.topdir, *meta.outputdir_raw.split(os.sep))
if rootdir[-1] != os.sep:
rootdir += os.sep
outputdir = rootdir+stage+'_'+meta.datetime+'_'+meta.eventlabel+'_run'
if counter is None:
counter = 1
while os.path.exists(outputdir+str(counter)):
counter += 1
outputdir += str(counter)+os.sep
else:
outputdir += str(counter)+os.sep
# Nest the different folders underneath one main folder for this run
for key, value in kwargs.items():
outputdir += key+str(value)+'_'
# Remove trailing _ if present
if outputdir[-1] == '_':
outputdir = outputdir[:-1]
# Add trailing slash
if outputdir[-1] != os.sep:
outputdir += os.sep
if not os.path.exists(outputdir):
try:
os.makedirs(outputdir)
except (PermissionError, OSError) as e:
# Raise a more helpful error message so that users know to update
# topdir in their ecf file
message = (f'You do not have the permissions to make the folder '
f'{outputdir}\nYour topdir is currently set to'
f'{meta.topdir}, but your user account is called '
f'{os.getenv("USER")}.\nYou likely need to update the '
f'topdir setting in your {stage} .ecf file.')
raise PermissionError(message) from e
if not os.path.exists(os.path.join(outputdir, "figs")):
os.makedirs(os.path.join(outputdir, "figs"))
return counter
[docs]def pathdirectory(meta, stage, run, old_datetime=None, **kwargs):
"""Finds the directory for the requested stage, run, and datetime
(or old_datetime).
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The metadata object.
stage : str
'S#' string denoting stage number (i.e. 'S3', 'S4')
run : int
run #, output from makedirectory function
old_datetime : str; optional
The date that a previous run was made (for looking up old data).
Defaults to None in which case meta.datetime is used instead.
**kwargs : dict
Additional key,value pairs to add to the folder name
(e.g. {'ap': 4, 'bg': 10}).
Returns
-------
path : str
Directory path for given parameters
"""
if old_datetime is not None:
datetime = old_datetime
else:
datetime = meta.datetime
# This code allows the input and output files to be stored outside
# of the Eureka! folder
rootdir = os.path.join(meta.topdir, *meta.outputdir_raw.split(os.sep))
if rootdir[-1] != os.sep:
rootdir += os.sep
outputdir = (rootdir+stage+'_'+datetime+'_'+meta.eventlabel+'_run' +
str(run)+os.sep)
for key, value in kwargs.items():
outputdir += key+str(value)+'_'
# Remove trailing _ if present
if outputdir[-1] == '_':
outputdir = outputdir[:-1]
# Add trailing slash
if outputdir[-1] != os.sep:
outputdir += os.sep
return outputdir
[docs]def find_fits(meta):
'''Locates S1 or S2 output FITS files if unable to find an metadata file.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The new meta object for the current stage processing.
Returns
-------
meta : eureka.lib.readECF.MetaClass
The meta object with the updated inputdir pointing to the location of
the input files to use.
Notes
-----
History:
- April 25, 2022 Taylor Bell
Initial version.
'''
fnames = glob.glob(meta.inputdir+'*'+meta.suffix + '.fits')
if len(fnames) == 0:
# There were no rateints files in that folder, so let's see if
# there are in children folders
fnames = glob.glob(meta.inputdir+'**'+os.sep+'*'+meta.suffix+'.fits',
recursive=True)
fnames = sn.sort_nicely(fnames)
if len(fnames) == 0:
# If the code can't find any of the reqested files, raise an error
# and give a helpful message
message = (f'Unable to find any "{meta.suffix}.fits" files in the '
f'inputdir: \n"{meta.inputdir}"!\nYou likely need to change'
f' the inputdir in {meta.filename} to point to the folder '
f'containing the "{meta.suffix}.fits" files.')
raise AssertionError(message)
folders = np.unique([os.sep.join(fname.split(os.sep)[:-1])
for fname in fnames])
if len(folders) >= 1:
# get the file with the latest modified time
folder = max(folders, key=os.path.getmtime)
if len(folders) > 1:
# There may be multiple runs - use the most recent but warn the user
print(f'WARNING: There are multiple folders containing '
f'"{meta.suffix}.fits" files in your inputdir:\n'
f'"{meta.inputdir}"\n'
f'Using the files in: \n{folder}\n'
f'and will consider aperture ranges listed there. If this '
f'metadata file is not a part\nof the run you intended, please '
f'provide a more precise folder for the metadata file.')
meta.inputdir = folder
meta.inputdir_raw = folder[len(meta.topdir):]
# Make sure there's a trailing slash at the end of the paths
if meta.inputdir[-1] != os.sep:
meta.inputdir += os.sep
return meta
[docs]def get_mad(meta, wave_1d, optspec, wave_min=None, wave_max=None):
"""Computes variation on median absolute deviation (MAD) using ediff1d
for 2D data.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
Unused. The metadata object.
wave_1d : ndarray
Wavelength array (nx) with trimmed edges depending on xwindow and
ywindow which have been set in the S3 ecf
optspec : ndarray
Optimally extracted spectra, 2D array (time, nx)
wave_min : float; optional
Minimum wavelength for binned lightcurves, as given in the S4 .ecf
file. Defaults to None which does not impose a lower limit.
wave_maxf : float; optional
Maximum wavelength for binned lightcurves, as given in the S4 .ecf
file. Defaults to None which does not impose an upper limit.
Returns
-------
mad : float
Single MAD value in ppm
"""
optspec = np.ma.masked_invalid(optspec)
n_int, nx = optspec.shape
if wave_min is not None:
iwmin = np.argmin(np.abs(wave_1d-wave_min))
else:
iwmin = 0
if wave_max is not None:
iwmax = np.argmin(np.abs(wave_1d-wave_max))
else:
iwmax = None
normspec = optspec / np.ma.mean(optspec, axis=0)
ediff = np.ma.zeros(n_int)
for m in range(n_int):
ediff[m] = get_mad_1d(normspec[m], iwmin, iwmax)
mad = np.ma.mean(ediff)
return mad
[docs]def get_mad_1d(data, ind_min=0, ind_max=-1):
"""Computes variation on median absolute deviation (MAD) using ediff1d
for 1D data.
Parameters
----------
data : ndarray
The array from which to calculate MAD.
int_min : int
Minimum index to consider.
ind_max : int
Maximum index to consider (excluding ind_max).
Returns
-------
mad : float
Single MAD value in ppm
"""
return 1e6 * np.ma.median(np.ma.abs(np.ma.ediff1d(data[ind_min:ind_max])))
[docs]def read_time(meta, data):
"""Read in a time CSV file instead of using the FITS time array.
Parameters
----------
meta : eureka.lib.readECF.MetaClass
The metadata object.
data : Xarray Dataset
The Dataset object with the fits data stored inside.
Returns
-------
time : ndarray
The time array stored in the meta.time_file CSV file.
"""
fname = os.path.join(meta.topdir,
os.sep.join(meta.time_file.split(os.sep)))
if meta.firstFile:
print(' Note: Using the time stamps from:\n'+fname)
time = np.loadtxt(fname).flatten()[data.attrs['intstart']-1:
data.attrs['intend']-1]
return time