Source code for eureka.S5_lightcurve_fitting.models.GPModel

import numpy as np
import george
from george import kernels
import celerite

from .Model import Model
from ..likelihood import update_uncertainty
from ...lib.split_channels import split

# tinygp is not supported yet
    import tinygp
except ModuleNotFoundError:
    # tinygp isn't supported yet, so don't throw an exception if it
    # isn't installed

[docs] class GPModel(Model): """Model for Gaussian Process (GP)""" def __init__(self, kernel_classes, kernel_inputs, lc, gp_code='george', normalize=False, useHODLR=False, **kwargs): """Initialize the GP model. Parameters ---------- kernel_classes : list The types of GP kernels to use. kernel_inputs : list The names of the GP kernel inputs. lc : eureka.S5_lightcurve_fitting.lightcurve The current lightcurve object. gp_code : str; optional Type GP package to use from ('george', 'celerite'), by default 'george'. normalize : bool; optional If True, normalize the covariate by mean subtracting it and dividing by the standard deviation. By default, False. useHODLR : bool; optional If True, use george's HODLRSolver instead of the default solver. Only relevant if gp_code is 'george'. By default, False. **kwargs : dict Additional parameters to pass to eureka.S5_lightcurve_fitting.models.Model.__init__(). Can pass in the parameters, longparamlist, nchan, and paramtitles arguments here. """ # Inherit from Model class super().__init__(**kwargs) # Define model type (physical, systematic, other) self.modeltype = 'GP' # Get GP parameters self.gp_code_name = gp_code self.normalize = normalize self.kernel_types = kernel_classes self.kernel_input_names = kernel_inputs self.kernel_inputs = None self.useHODLR = useHODLR self.nkernels = len(kernel_classes) self.flux = lc.flux = np.ones_like(self.flux) self.unc = lc.unc self.unc_fit = lc.unc_fit self.time = lc.time if self.gp_code_name == 'celerite': if self.nkernels > 1: raise AssertionError('Celerite cannot compute multi-' 'dimensional GPs. Please choose a ' 'different GP code') elif self.kernel_types[0] != 'Matern32': raise AssertionError('Our celerite implementation currently ' 'only supports a Matern32 kernel.') # Update coefficients self.coeffs = np.zeros((self.nchannel_fitted, self.nkernels, 2)) self._parse_coeffs() def _parse_coeffs(self): """Convert dict of coefficients into an array.""" # Parse parameters as coefficients for c in range(self.nchannel_fitted): if self.nchannel_fitted > 1: chan = self.fitted_channels[c] else: chan = 0 if chan == 0: chankey = '' else: chankey = f'_{chan}' for i, par in enumerate(['A', 'm']): for k in range(self.nkernels): if k == 0: kernelkey = '' else: kernelkey = str(k) try: index = f'{par}{kernelkey}{chankey}' self.coeffs[c, k, i] = self.parameters.dict[index][0] except KeyError: pass
[docs] def update(self, newparams, **kwargs): # Inherit from Model class super().update(newparams, **kwargs) self.unc_fit = update_uncertainty(newparams, self.nints, self.unc, self.freenames)
[docs] def eval(self, fit, channel=None, gp=None, **kwargs): """Compute GP with the given parameters Parameters ---------- fit : ndarray The rest of the current model evaluated. channel : int; optional If not None, only consider one of the channels. Defaults to None. gp : celerite.GP, george.GP, or tinygp.GaussianProcess; optional The current GP object. **kwargs : dict Must pass in the time array here if not already set. Returns ------- lcfinal : ndarray Predicted systematics model """ if channel is None: nchan = self.nchannel_fitted channels = self.fitted_channels else: nchan = 1 channels = [channel, ] # Get the time if self.time is None: self.time = kwargs.get('time') = fit lcfinal = np.array([]) for c in range(nchan): if self.nchannel_fitted > 1: chan = channels[c] # get flux and uncertainties for current channel flux, unc_fit = split([self.flux, self.unc_fit], self.nints, chan) if channel is None: fit = split([, ], self.nints, chan)[0] else: # If only a specific channel is being evaluated, then only # that channel's fitted mode will be passed in fit = else: chan = 0 # get flux and uncertainties for current channel flux = self.flux fit = unc_fit = self.unc_fit residuals = flux-fit time = self.time if self.multwhite: # Split the arrays that have lengths of the original time axis time = split([time, ], self.nints, chan)[0] # Create the GP object with current parameters if gp is None: gp = self.setup_GP(c) if self.gp_code_name == 'george': gp.compute(self.kernel_inputs[chan].T, unc_fit) mu = gp.predict(residuals, self.kernel_inputs[chan].T, return_cov=False) elif self.gp_code_name == 'celerite': gp.compute(self.kernel_inputs[chan][0], yerr=unc_fit) mu = gp.predict(residuals) elif self.gp_code_name == 'tinygp': cond_gp = gp.condition(residuals, noise=unc_fit).gp mu = cond_gp.loc lcfinal = np.append(lcfinal, mu) return lcfinal
[docs] def setup_inputs(self): """Setting up kernel inputs as array and standardizing them if asked. For details on the benefits of normalization, see e.g. Evans et al. 2017. """ self.kernel_inputs = [] for c in range(self.nchannel_fitted): if self.nchannel_fitted > 1: chan = self.fitted_channels[c] else: chan = 0 kernel_inputs_channel = [] for name in self.kernel_input_names: if name == 'time': x = np.copy(self.time) else: # add more input options here raise ValueError('Currently, only GPs as a function of ' 'time are supported, but you have ' 'specified a GP as a function of ' f'{name}.') if self.multwhite: x = split([x, ], self.nints, chan)[0] if self.normalize: x = (x-x.mean())/x.std() kernel_inputs_channel.append(x) kernel_inputs_channel = np.array(kernel_inputs_channel) # Need to reshape if there is only one covariate if len(kernel_inputs_channel.shape) == 1: kernel_inputs_channel = kernel_inputs_channel[np.newaxis] self.kernel_inputs.append(kernel_inputs_channel)
[docs] def setup_GP(self, c=0): """Set up GP kernels and GP object. Parameters ---------- c : int; optional The current channel index. Defaults to 0. Returns ------- celerite.GP, george.GP, or tinygp.GaussianProcess The GP object to use for this fit. """ if self.kernel_inputs is None: self.setup_inputs() # get the kernel which is the sum of the individual kernel functions kernel = self.get_kernel(self.kernel_types[0], 0, c) for k in range(1, self.nkernels): kernel += self.get_kernel(self.kernel_types[k], k, c) # Make the gp object if self.gp_code_name == 'george': if self.useHODLR: solver = george.solvers.HODLRSolver else: solver = None gp = george.GP(kernel, mean=0, fit_mean=False, solver=solver) elif self.gp_code_name == 'celerite': gp = celerite.GP(kernel, mean=0, fit_mean=False) elif self.gp_code_name == 'tinygp': if self.nchannel_fitted > 1: chan = self.fitted_channels[c] else: chan = 0 gp = tinygp.GaussianProcess(kernel, self.kernel_inputs[chan].T, mean=0) return gp
[docs] def get_kernel(self, kernel_name, k, c=0): """Get individual kernels. Parameters ---------- kernel_name : str The name of the kernel to get. k : int The kernel number. c : int; optional The channel index, by default 0. Returns ------- kernel The requested george, celerite, or tinygp kernel. Raises ------ AssertionError Celerite currently only supports a Matern32 kernel. """ if self.gp_code_name == 'george': # get metric and amplitude for the current kernel and channel amp = np.exp(self.coeffs[c, k, 0]*2) # Want exp(sigma)^2 metric = np.exp(self.coeffs[c, k, 1]*2) if kernel_name == 'Matern32': kernel = amp*kernels.Matern32Kernel( metric, ndim=self.nkernels, axes=k) elif kernel_name == 'ExpSquared': kernel = amp*kernels.ExpSquaredKernel( metric, ndim=self.nkernels, axes=k) elif kernel_name == 'RationalQuadratic': kernel = amp*kernels.RationalQuadraticKernel( log_alpha=1, metric=metric, ndim=self.nkernels, axes=k) elif kernel_name == 'Exp': kernel = amp*kernels.ExpKernel( metric, ndim=self.nkernels, axes=k) else: raise AssertionError(f'The kernel {kernel_name} is not in the ' 'currently supported list of kernels for ' 'george which includes:\nMatern32, ' 'ExpSquared, RationalQuadratic, Exp.') elif self.gp_code_name == 'celerite': # get metric and amplitude for the current kernel and channel amp = self.coeffs[c, k, 0] metric = self.coeffs[c, k, 1] kernel = celerite.terms.Matern32Term(log_sigma=0, log_rho=metric) # Setting the amplitude kernel *= celerite.terms.RealTerm(log_a=amp, log_c=-10) elif self.gp_code_name == 'tinygp': # get metric and amplitude for the current kernel and channel amp = np.exp(self.coeffs[c, k, 0]*2) # Want exp(sigma)^2 metric = np.exp(self.coeffs[c, k, 1]*2) if kernel_name == 'Matern32': kernel = amp*tinygp.kernels.Matern32(metric) elif kernel_name == 'ExpSquared': kernel = amp*tinygp.kernels.ExpSquared(metric) elif kernel_name == 'RationalQuadratic': kernel = amp*tinygp.kernels.RationalQuadratic(alpha=1, scale=metric) elif kernel_name == 'Exp': kernel = amp*tinygp.kernels.Exp(metric) else: raise AssertionError(f'The kernel {kernel_name} is not in the ' 'currently supported list of kernels for ' 'tinygp which includes:\nMatern32, ' 'ExpSquared, RationalQuadratic, Exp.') return kernel
[docs] def loglikelihood(self, fit, channel=None): """Compute log likelihood of GP Parameters ---------- fit : ndarray The fitted model. channel : int; optional If not None, only consider one of the channels. Defaults to None. Returns ------- float log likelihood of the GP evaluated by george/tinygp/celerite """ if channel is None: nchan = self.nchannel_fitted channels = self.fitted_channels else: nchan = 1 channels = [channel, ] # update uncertainty = fit logL = [] for c in np.arange(nchan): if self.nchannel_fitted > 1: chan = channels[c] # get flux and uncertainties for current channel flux, fit, unc_fit = split([self.flux,, self.unc_fit], self.nints, chan) else: chan = 0 # get flux and uncertainties for current channel flux = self.flux fit = unc_fit = self.unc_fit residuals = flux-fit # set up GP with current parameters gp = self.setup_GP(c) if self.gp_code_name == 'george': gp.compute(self.kernel_inputs[chan].T, unc_fit) logL_temp = gp.lnlikelihood(residuals, quiet=True) elif self.gp_code_name == 'celerite': gp.compute(self.kernel_inputs[chan][0], yerr=unc_fit) logL_temp = gp.log_likelihood(residuals) elif self.gp_code_name == 'tinygp': cond = gp.condition(residuals, diag=unc_fit) logL_temp = cond.log_probability logL.append(logL_temp) return sum(logL)