import numpy as np
import re
import shutil
import os
from astroquery.mast import Observations
import astropy.io.fits as pf
from astropy.io import ascii
[docs]
def columnNames():
"""Print column names from MAST Observation table.
Notes
-----
History:
- June 2022 Kevin Stevenson
Initial version
"""
meta_table = Observations.get_metadata("observations")
for val in meta_table['Column Name']:
print(val)
return
[docs]
def writeTable_JWST(proposal_id, observation, visit, filename, format='csv'):
"""Write all products from specified visit to an ASCII file
Parameters
----------
proposal_id : str or int
JWST proposal/program ID (e.g., 1366).
observation : str or int
JWST observation number listed on the Visit Status Report (e.g., 2).
See www.stsci.edu/cgi-bin/get-visit-status?id=XXXXX&observatory=JWST,
where XXXXX is the proposal/program ID.
visit : str or int
JWST visit number listed on the Visit Status Report (e.g., 1).
See www.stsci.edu/cgi-bin/get-visit-status?id=XXXXX&observatory=JWST,
where XXXXX is the proposal/program ID.
filename : str; optional
The file format to use. Defaults to 'csv'.
Notes
-----
History:
- July 2022 Kevin Stevenson
Initial version
"""
# Convert to string
if type(proposal_id) is not str:
proposal_id = str(proposal_id).zfill(5)
if type(observation) is not str:
observation = str(observation).zfill(3)
if type(visit) is not str:
visit = str(visit).zfill(3)
# Specify obsid using wildcards
obsid = 'jw'+proposal_id+observation+visit+'*'
# Query MAST for requested visit
sci_table = Observations.query_criteria(proposal_id=proposal_id,
obs_id=obsid)
ascii.write(sci_table, filename, format=format)
return
[docs]
def login(mast_token=None):
"""Log into the MAST portal.
Parameters
----------
mast_token : str; optional
The token to authenticate the user. Default is None.
This can be generated at https://auth.mast.stsci.edu/token.
If not supplied, it will be prompted for if not in the keyring
or set via $MAST_API_TOKEN.
Notes
-----
History:
- July 2022 Kevin Stevenson
Initial version
"""
Observations.login(mast_token)
return
[docs]
def logout():
"""Log out of current MAST session.
Notes
-----
History:
- July 2022 Kevin Stevenson
Initial version
"""
Observations.logout()
return
[docs]
def downloadHST(proposal_id, visit, inst='WFC3', download_dir='.',
subgroup='IMA'):
"""Download observation visit number from specified proposal ID.
Parameters
----------
proposal_id : str or int
HST proposal/program ID (e.g., 13467).
visit : str or int
HST visit number listed on the Visit Status Report (e.g., 60).
See https://www.stsci.edu/cgi-bin/get-visit-status?id=XXXXX,
where XXXXX is the proposal/program ID.
inst : str; optional
HST instrument name, can be upper or lower case.
Supported options include: WFC3, STIS, COS, or FGS.
Defaults to 'WFC3'.
download_dir : str; optional
Temporary download directory will be 'download_dir'/mastDownload/...
Defaults to '.'.
subgroup : str; optional
FITS file type (usually IMA, sometimes FLT). Defaults to 'IMA'.
Returns
-------
result : AstroPy Table
The manifest of files downloaded.
Notes
-----
History:
- June 2022 Kevin Stevenson
Initial version
"""
# Convert to string
if type(proposal_id) is not str:
proposal_id = str(proposal_id).zfill(5)
if type(visit) is not str:
visit = str(visit).zfill(2)
# Determine instrument ID, as indicated by a single letter
# L=COS; I=WFC3; J=ACS; N=NICMOS; O=STIS; U=WFPC2; W=WFPC;
# X=FOC; Y=FOS; Z=GHRS; F=FGS; V=HSP;
if inst.casefold() == 'wfc3':
iid = 'i'
elif inst.casefold() == 'stis':
iid = 'o'
elif inst.casefold() == 'cos':
iid = 'l'
elif inst.casefold() == 'fgs':
iid = 'f'
else:
print(f"Unknown instrument: {inst}")
print("Supported options include: WFC3, STIS, COS, or FGS")
return None
# Specify obsid using wildcards
obsid = iid+'*'+visit+'*'
# Query MAST for requested visit
sci_table = Observations.query_criteria(proposal_id=proposal_id,
obs_id=obsid)
# AstroQuery doesn't support single character wildcards,
# so sometimes it identifies extra, unwanted files.
# Using regex to remove these unwanted files
counter = 0
for ii, val in enumerate(sci_table['obs_id']):
if not re.search('i...'+visit, val):
sci_table.remove_row(ii-counter)
# Increment counter to get index right
# when multiple files need to be removed
counter += 1
# Get product list
data_products_by_id = Observations.get_product_list(sci_table)
# Filter for IMA files
filtered = Observations.filter_products(
data_products_by_id, productSubGroupDescription=subgroup)
nimage = np.sum(filtered['dataproduct_type'] == 'image')
nspec = np.sum(filtered['dataproduct_type'] == 'spectrum')
print("Number of image products:", nimage)
print("Number of spectrum products:", nspec)
# Download data products
result = Observations.download_products(filtered, curl_flag=False,
download_dir=download_dir)
return result
[docs]
def filterJWST(proposal_id, observation, visit, calib_level, subgroup):
"""Find JWST data products by applying standard filters.
Parameters
----------
proposal_id : str or int
JWST proposal/program ID (e.g., 1366).
observation : str or int
JWST observation number listed on the Visit Status Report (e.g., 2).
See www.stsci.edu/cgi-bin/get-visit-status?id=XXXXX&observatory=JWST,
where XXXXX is the proposal/program ID.
visit : str or int
JWST visit number listed on the Visit Status Report (e.g., 2).
See www.stsci.edu/cgi-bin/get-visit-status?id=XXXXX&observatory=JWST,
where XXXXX is the proposal/program ID.
calib_level : list or int
Product Calibration Level (0 = raw, 1 = uncalibrated, 2 = calibrated,
3 = science product, 4 = contributed science product)
subgroup : str
FITS file type, varies by calib_level.
1: UNCAL, GS-ACQ1, GS-ACQ2, GS-FG, GS-ID, GS-TRACK
2: CAL, CALINTS, RATE, RATEINTS, X1DINTS, ANNNN_CRFINTS,
GS-ACQ1, GS-ACQ2, GS-FG, GS-ID, GS-TRACK, RAMP
3: X1DINTS, WHTLT
Returns
-------
table : AstroPy Table
The filtered table of data products.
Notes
-----
History:
- July 2022 Kevin Stevenson
Initial version
"""
# Convert to string
if type(proposal_id) is not str:
proposal_id = str(proposal_id).zfill(5)
if type(observation) is not str:
observation = str(observation).zfill(3)
if type(visit) is not str:
visit = str(visit).zfill(3)
if type(calib_level) is int:
calib_level = [calib_level]
# Specify obsid using wildcards, obs_id can come in two flavours
obsid = f'jw{proposal_id}{observation}{visit}_04*'
obsid2 = f'jw{proposal_id}-o{observation}_t{visit}*'
# Query MAST for requested visit
sci_table = Observations.query_criteria(proposal_id=proposal_id,
obs_id=obsid)
sci_table2 = Observations.query_criteria(proposal_id=proposal_id,
obs_id=obsid2)
table = []
if len(sci_table) > 0:
# Get product list
data_products_by_id = Observations.get_product_list(sci_table)
# Filter for desired files
table = Observations.filter_products(
data_products_by_id, productSubGroupDescription=subgroup,
calib_level=calib_level)
if len(sci_table2) > 0:
# Get product list
data_products_by_id2 = Observations.get_product_list(sci_table2)
# Filter for desired files
table2 = Observations.filter_products(
data_products_by_id2, productSubGroupDescription=subgroup,
calib_level=calib_level)
if len(sci_table) > 0 and len(sci_table2) > 0:
# Concatenate tables one row at a time, checking for uniqueness
for row in range(len(table2)):
if (~(table['obsID'] == table2['obsID'][row])).all():
table.add_row(table2[row])
elif len(sci_table2) > 0:
table = table2
print("Total number of data products:", len(table))
print("Number of data products with exclusive access:",
np.sum(table['dataRights'] == 'EXCLUSIVE_ACCESS'))
return table
[docs]
def consolidate(result, final_dir):
"""Consolidate downloaded files into a single directory
Parameters
----------
result : AstroPy Table
The manifest of files downloaded, returned from
mastDownload.download().
final_dir : str
Final destination of files.
Notes
-----
History:
- June 2022 Kevin Stevenson
Initial version
"""
# Create directory
if not os.path.exists(final_dir):
os.makedirs(final_dir)
# Move files
for path in result['Local Path']:
filename = path.split('/')[-1]
try:
shutil.move(path, os.path.join(final_dir, filename))
except:
print(f"File not found: {path}")
return
[docs]
def sortJWST(source_dir, target_dir, filetype):
"""
"""
# Create directory
if not os.path.exists(target_dir):
os.makedirs(target_dir)
# Move files
for filename in os.listdir(source_dir):
if filename.endswith(filetype):
shutil.move(os.path.join(source_dir, filename),
os.path.join(target_dir, filename))
return
[docs]
def sortHST(final_dir, sci_dir='sci', cal_dir='cal'):
"""Sort files into science and calibration subdirectories.
Parameters
----------
final_dir : str
Final destination of files.
sci_dir : str; optional
Name of science subdirectory within 'final_dir'.
Defaults to 'sci'.
cal_dir : str; optional
Name of calibration subdirectory within 'final_dir'.
Defaults to 'cal'.
Notes
-----
History:
- June 2022 Kevin Stevenson
Initial version
"""
# Create directories
if not os.path.exists(os.path.join(final_dir, sci_dir)):
os.makedirs(os.path.join(final_dir, sci_dir))
if not os.path.exists(os.path.join(final_dir, cal_dir)):
os.makedirs(os.path.join(final_dir, cal_dir))
# Move files
for filename in os.listdir(final_dir):
if filename.endswith('.fits'):
hdr = pf.getheader(os.path.join(final_dir, filename))
# If spectrum, move to science directory
if hdr['filter'].startswith('G'):
shutil.move(os.path.join(final_dir, filename),
os.path.join(final_dir, sci_dir, filename))
# If image, move to calibration directory
elif hdr['filter'].startswith('F'):
shutil.move(os.path.join(final_dir, filename),
os.path.join(final_dir, cal_dir, filename))
# Otherwise, leave file in current location
return
[docs]
def cleanup(download_dir='.'):
"""Remove empty folders from download directory.
Parameters
----------
download_dir : str; optional
Temporary download directory specified for mastDownload.download().
Defaults to '.'.
Notes
-----
History:
- June 2022 Kevin Stevenson
Initial version
"""
src_dir = os.path.join(download_dir, 'mastDownload')
for dirpath, _, _ in os.walk(src_dir, topdown=False):
try:
# Remove empty folder
os.rmdir(dirpath)
except OSError as ex:
# Report any non-empty folders
print(ex)
return