Source code for eureka.S5_lightcurve_fitting.models.Model

import numpy as np
import matplotlib.pyplot as plt
import copy
import os

from ..utils import COLORS
from ...lib.readEPF import Parameters
from ...lib.split_channels import split


[docs] class Model: def __init__(self, **kwargs): """Create a model instance. Parameters ---------- **kwargs : dict Parameters to set in the Model object. Any parameter named log will not be loaded into the Model object as Logedit objects cannot be pickled which is required for multiprocessing. """ # Set up default model attributes self.name = kwargs.get('name', 'New Model') self.nchannel = kwargs.get('nchannel', 1) self.nchannel_fitted = kwargs.get('nchannel_fitted', 1) self.fitted_channels = kwargs.get('fitted_channels', [0, ]) self.multwhite = kwargs.get('multwhite') self.nints = kwargs.get('nints') self.fitter = kwargs.get('fitter', None) self.time = kwargs.get('time', None) self.time_units = kwargs.get('time_units', 'BMJD_TDB') self.flux = kwargs.get('flux', None) self.freenames = kwargs.get('freenames', None) self._parameters = kwargs.get('parameters', Parameters()) self.longparamlist = kwargs.get('longparamlist', None) self.paramtitles = kwargs.get('paramtitles', None) self.modeltype = kwargs.get('modeltype', None) self.fmt = kwargs.get('fmt', None) # Store the arguments as attributes for arg, val in kwargs.items(): if arg != 'log': setattr(self, arg, val) def __mul__(self, other): """Multiply model components to make a combined model. Parameters ---------- other : eureka.S5_lightcurve_fitting.models.Model The model to multiply. Returns ------- eureka.S5_lightcurve_fitting.models.CompositeModel The combined model. """ # Make sure it is the right type attrs = ['flux', 'time'] if not all([hasattr(other, attr) for attr in attrs]): raise TypeError('Only another Model instance may be multiplied.') # Combine the model parameters too parameters = self.parameters + other.parameters if self.paramtitles is None: paramtitles = other.paramtitles elif other.paramtitles is not None: paramtitles = self.paramtitles.append(other.paramtitles) else: paramtitles = self.paramtitles return CompositeModel([copy.copy(self), other], parameters=parameters, paramtitles=paramtitles) @property def flux(self): """A getter for the flux.""" return self._flux @flux.setter def flux(self, flux_array): """A setter for the flux Parameters ---------- flux_array : sequence The flux array """ # Check the type if not isinstance(flux_array, (np.ndarray, tuple, list, type(None))): raise TypeError("flux axis must be a tuple, list, or numpy array.") # Set the array self._flux = np.ma.masked_array(flux_array) @property def time(self): """A getter for the time""" return self._time @time.setter def time(self, time_array): """A setter for the time""" # Check the type if not isinstance(time_array, (np.ndarray, tuple, list, type(None))): raise TypeError("Time axis must be a tuple, list, or numpy array.") # Set the array # self._time = np.array(time_array) self._time = np.ma.masked_array(time_array) @property def parameters(self): """A getter for the parameters.""" return self._parameters @parameters.setter def parameters(self, params): """A setter for the parameters.""" # Process if it is a parameters file if isinstance(params, str) and os.path.isfile(params): params = Parameters(params) # Or a Parameters instance if (params is not None) and (type(params).__name__ != Parameters.__name__): raise TypeError("'params' argument must be a JSON file, " "ascii file, or parameters.Parameters instance.") # Set the parameters attribute self._parameters = params
[docs] def interp(self, new_time, nints, **kwargs): """Evaluate the model over a different time array. Parameters ---------- new_time : sequence The time array. nints : list The number of integrations for each channel, for the new time array. **kwargs : dict Additional parameters to pass to self.eval(). """ # Save the current values old_time = copy.deepcopy(self.time) old_nints = copy.deepcopy(self.nints) # Evaluate the model on the new time array self.time = new_time self.nints = nints interp_flux = self.eval(**kwargs) # Reset the old values self.time = old_time self.nints = old_nints return interp_flux
[docs] def update(self, newparams, **kwargs): """Update the model with new parameter values. Parameters ---------- newparams : ndarray New parameter values. **kwargs : dict Unused by the base eureka.S5_lightcurve_fitting.models.Model class. """ for val, arg in zip(newparams, self.freenames): # For now, the dict and Parameter are separate self.parameters.dict[arg][0] = val getattr(self.parameters, arg).value = val self._parse_coeffs() return
def _parse_coeffs(self): """A placeholder function to do any additional processing when calling update. """ return
[docs] def plot(self, components=False, ax=None, draw=False, color='blue', zorder=np.inf, share=False, chan=0, **kwargs): """Plot the model. Parameters ---------- components : bool; optional Plot all model components. ax : Matplotlib Axes; optional The figure axes to plot on. draw : bool; optional Whether or not to display the plot. Defaults to False. color : str; optional The color to use for the plot. Defaults to 'blue'. zorder : numeric; optional The zorder for the plot. Defaults to np.inf. share : bool; optional Whether or not this model is a shared model. Defaults to False. chan : int; optional The current channel number. Detaults to 0. **kwargs : dict Additional parameters to pass to plot and self.eval(). """ # Make the figure if ax is None: fig = plt.figure(5103, figsize=(8, 6)) ax = fig.gca() # Plot the model label = self.fitter if self.name != 'New Model': label += ': '+self.name if not share: channel = 0 else: channel = chan model = self.eval(channel=channel, incl_GP=True, **kwargs) time = self.time if self.multwhite: # Split the arrays that have lengths of the original time axis time = split([time, ], self.nints, chan)[0] ax.plot(time, model, '.', ls='', ms=1, label=label, color=color, zorder=zorder) if components and self.components is not None: for component in self.components: component.plot(self.time, ax=ax, draw=False, color=next(COLORS), zorder=zorder, share=share, chan=chan, **kwargs) # Format axes ax.set_xlabel(str(self.time_units)) ax.set_ylabel('Flux') if draw: fig.show() else: return
[docs] class CompositeModel(Model): """A class to create composite models.""" def __init__(self, models, **kwargs): """Initialize the composite model. Parameters ---------- models : sequence The list of models. **kwargs : dict Additional parameters to pass to eureka.S5_lightcurve_fitting.models.Model.__init__(). """ # Store the models self.components = models # Inherit from Model class super().__init__(**kwargs) self.GP = False for component in self.components: if component.modeltype == 'GP': self.GP = True @property def freenames(self): """A getter for the freenames.""" return self._freenames @freenames.setter def freenames(self, freenames): """A setter for the freenames.""" # Update the components' freenames for component in self.components: component.freenames = freenames # Set the freenames attribute self._freenames = freenames
[docs] def eval(self, incl_GP=False, channel=None, **kwargs): """Evaluate the model components. Parameters ---------- incl_GP : bool; optional Whether or not to include the GP's predictions in the evaluated model predictions. channel : int; optional If not None, only consider one of the channels. Defaults to None. **kwargs : dict Must pass in the time array here if not already set. Returns ------- flux : ndarray The evaluated model predictions at the times self.time. """ # Get the time if self.time is None: self.time = kwargs.get('time') if channel is None: nchan = self.nchannel_fitted else: nchan = 1 if self.multwhite: time = self.time if channel is not None: # Split the arrays that have lengths of the original time axis time = split([time, ], self.nints, channel)[0] flux = np.ones(len(time)) else: flux = np.ones(len(self.time)*nchan) # Evaluate flux of each component for component in self.components: if component.time is None: component.time = self.time if component.modeltype != 'GP': flux *= component.eval(channel=channel, **kwargs) if incl_GP: flux += self.GPeval(flux, channel=channel, **kwargs) return flux
[docs] def syseval(self, channel=None, incl_GP=False, **kwargs): """Evaluate the systematic model components only. Parameters ---------- channel : int; optional If not None, only consider one of the channels. Defaults to None. incl_GP : bool; optional Whether or not to include the GP's predictions in the evaluated model predictions. **kwargs : dict Must pass in the time array here if not already set. Returns ------- flux : ndarray The evaluated systematics model predictions at the times self.time. """ # Get the time if self.time is None: self.time = kwargs.get('time') if channel is None: nchan = self.nchannel_fitted else: nchan = 1 if self.multwhite: time = self.time if channel is not None: # Split the arrays that have lengths of the original time axis time = split([time, ], self.nints, channel)[0] flux = np.ones(len(time)) else: flux = np.ones(len(self.time)*nchan) # Evaluate flux at each component for component in self.components: if component.modeltype == 'systematic': if component.time is None: component.time = self.time flux *= component.eval(channel=channel, **kwargs) if incl_GP: flux += self.GPeval(flux, channel=channel, **kwargs) return flux
[docs] def GPeval(self, fit, channel=None, **kwargs): """Evaluate the GP model components only. Parameters ---------- fit : ndarray The model predictions (excluding the GP). channel : int; optional If not None, only consider one of the channels. Defaults to None. **kwargs : dict Must pass in the time array here if not already set. Returns ------- flux : ndarray The evaluated GP model predictions at the times self.time. """ # Get the time if self.time is None: self.time = kwargs.get('time') if channel is None: nchan = self.nchannel_fitted else: nchan = 1 if self.multwhite: time = self.time if channel is not None: # Split the arrays that have lengths of the original time axis time = split([time, ], self.nints, channel)[0] flux = np.zeros(len(time)) else: flux = np.zeros(len(self.time)*nchan) # Evaluate flux for component in self.components: if component.modeltype == 'GP': flux = component.eval(fit, channel=channel, **kwargs) return flux
[docs] def physeval(self, interp=False, channel=None, **kwargs): """Evaluate the physical model components only. Parameters ---------- interp : bool; optional Whether to uniformly sample in time or just use the self.time time points. Defaults to False. channel : int; optional If not None, only consider one of the channels. Defaults to None. **kwargs : dict Must pass in the time array here if not already set. Returns ------- flux : ndarray The evaluated physical model predictions at the times self.time if interp==False, else at evenly spaced times between self.time[0] and self.time[-1] with spacing self.time[1]-self.time[0]. new_time : ndarray The time values at which flux has been computed. nints_interp : list The number of time points per lightcurve for each lightcurve (after interpolation if interp is True). """ # Get the time if self.time is None: self.time = kwargs.get('time') if channel is None: nchan = self.nchannel_fitted channels = self.fitted_channels else: nchan = 1 channels = [channel] if interp: if self.multwhite: new_time = [] nints_interp = [] for chan in channels: # Split the arrays that have lengths of # the original time axis time = split([self.time, ], self.nints, chan)[0] dt = time[1]-time[0] steps = int(np.round((time[-1]-time[0])/dt+1)) nints_interp.append(steps) new_time.extend(np.linspace(time[0], time[-1], steps, endpoint=True)) new_time = np.array(new_time) else: time = self.time dt = time[1]-time[0] steps = int(np.round((time[-1]-time[0])/dt+1)) nints_interp = np.ones(nchan)*steps new_time = np.linspace(time[0], time[-1], steps, endpoint=True) else: new_time = self.time if self.multwhite and channel is not None: # Split the arrays that have lengths of the original time axis new_time = split([new_time, ], self.nints, channel)[0] nints_interp = self.nints # Setup the flux array if self.multwhite: flux = np.ones(len(new_time)) else: flux = np.ones(len(new_time)*nchan) # Evaluate flux at each component for component in self.components: if component.modeltype == 'physical': if component.time is None: component.time = self.time if interp: flux *= component.interp(new_time, nints_interp, **kwargs) else: flux *= component.eval(channel=channel, **kwargs) return flux, new_time, nints_interp
[docs] def update(self, newparams, **kwargs): """Update parameters in the model components. Parameters ---------- newparams : ndarray New parameter values. **kwargs : dict Additional parameters to pass to eureka.S5_lightcurve_fitting.models.Model.update(). """ for component in self.components: component.update(newparams, **kwargs)