enterprise
¶by: Justin Ellis$^{1,2}$, Michele Vallisneri$^2$, Paul Baker$^1$, Steve Taylor$^2$
$^1$ Center for Gravitational Waves and Cosmology, West Virginia University
$^2$ Jet Propulsion Laboratory, California Institute of Technology
Code website: https://github.com/nanograv/enterprise
Documentation: https://enterprise.readthedocs.io/
shared/enterprise_tutorial
enterprise
?¶enterprise
is an Enhanced Numerical Toolbox Enabling a Robust PulsaR Inference SuitE (complaints about this backronym are to be directed to Steve Taylor)enterprise
really is a toolbox, not a set of black box functions and scripts!pep8
tests.enterprise
?¶PAL2
/NX01
/piccard
code?PINT
compatibility.enterprise
to be the base for all of your future pulsar timing residual modeling needs.enterprise
code architecture¶Signal
components which have their own Parameter
s.Signal
components is a SignalCollection
.enterprise
code architecture¶SignalCollection
that are combined to form a PTA
.Signal
s are shared across pulsars Likelihood
s act on PTA
s. enterprise
Signal
¶class Signal(object):
"""Base class for Signal objects."""
def get_ndiag(self, params):
"""Returns the diagonal of the white noise vector `N`.
This method also supports block diagaonal sparse matrices.
"""
return None
def get_delay(self, params):
"""Returns the waveform of a deterministic signal."""
return 0
def get_basis(self, params=None):
"""Returns the basis array of shape N_toa x N_basis."""
return None
def get_phi(self, params):
"""Returns a diagonal or full rank covaraince matrix
of the basis amplitudes."""
return None
def get_phiinv(self, params):
"""Returns inverse of the covaraince of basis amplitudes."""
return None
# define white noise parameters
efac = parameter.Uniform(0.1, 10)
log10_equad = parameter.Uniform(-10, -5)
log10_ecorr = parameter.Uniform(-10, -5)
# red noise parameters
log10_A = parameter.Uniform(-20, -11)
gamma = parameter.Uniform(0, 7)
# define selection by observing backend
selection = selections.Selection(selections.by_backend)
# define white noise signals (selection tells it to split based on backend)
ef = white_signals.MeasurementNoise(efac=efac, selection=selection)
eq = white_signals.EquadNoise(log10_equad=log10_equad, selection=selection)
ec = white_signals.EcorrKernelNoise(log10_ecorr=log10_ecorr,
selection=selection)
# define powerlaw PSD and red noise signal
pl = utils.powerlaw(log10_A=log10_A, gamma=gamma)
rn = gp_signals.FourierBasisGP(pl, components=30)
# linear timing model
tm = gp_signals.TimingModel()
# total signal (this is some high tech shit right here!)
s = ef + eq + ec + tm + rn
# setup PTA
pta = signal_base.PTA([s(psr)])
print pta.params
["B1855+09_430_ASP_efac":Uniform(0.1,10), "B1855+09_430_ASP_log10_ecorr":Uniform(-10,-5), "B1855+09_430_ASP_log10_equad":Uniform(-10,-5), "B1855+09_430_PUPPI_efac":Uniform(0.1,10), "B1855+09_430_PUPPI_log10_ecorr":Uniform(-10,-5), "B1855+09_430_PUPPI_log10_equad":Uniform(-10,-5), "B1855+09_L-wide_ASP_efac":Uniform(0.1,10), "B1855+09_L-wide_ASP_log10_ecorr":Uniform(-10,-5), "B1855+09_L-wide_ASP_log10_equad":Uniform(-10,-5), "B1855+09_L-wide_PUPPI_efac":Uniform(0.1,10), "B1855+09_L-wide_PUPPI_log10_ecorr":Uniform(-10,-5), "B1855+09_L-wide_PUPPI_log10_equad":Uniform(-10,-5), "B1855+09_gamma":Uniform(0,7), "B1855+09_log10_A":Uniform(-20,-11)]
# random parameters for testing
xs = {p.name: p.sample() for p in pta.params}
# likelihood
print pta.get_lnlikelihood(xs)
# prior
print pta.get_lnprior(xs)
43167.2424803 -26.1887770544
# white noise parameters
# set them to constant here and we will input the noise values
# after the model is initialized
efac = parameter.Constant()
equad = parameter.Constant()
ecorr = parameter.Constant()
# red noise parameters
# LinearExp prior places uniform prior on A while sampling in log10_A
log10_A = parameter.LinearExp(-20, -11)
gamma = parameter.Uniform(0, 7)
# common red noise
# naming signals here tells enterprise that these parameters
# are shared across pulsars
log10_Agw = parameter.LinearExp(-18, -11)('log10_Agw')
gamma_gw = parameter.Constant(4.33)('gamma_gw')
# define selection by observing backend
selection = selections.Selection(selections.by_backend)
# define white noise signals
ef = white_signals.MeasurementNoise(efac=efac, selection=selection)
eq = white_signals.EquadNoise(log10_equad=equad, selection=selection)
ec = white_signals.EcorrKernelNoise(log10_ecorr=ecorr, selection=selection)
# get overall time span
tmin = np.min([p.toas.min() for p in psrs])
tmax = np.min([p.toas.max() for p in psrs])
Tspan = tmax - tmin
# define powerlaw PSD and red noise signal
pl = utils.powerlaw(log10_A=log10_A, gamma=gamma)
rn = gp_signals.FourierBasisGP(pl, components=30, Tspan=Tspan)
# GWB
cpl = utils.powerlaw(log10_A=log10_Agw, gamma=gamma_gw)
orf = utils.hd_orf()
crn = gp_signals.FourierBasisCommonGP(cpl, orf, components=30, Tspan=Tspan)
# linear timing model
tm = gp_signals.TimingModel()
## Physical ephemeris model
eph = deterministic_signals.PhysicalEphemerisSignal(use_epoch_toas=True)
# total signal
s = ef + eq + ec + rn + crn + tm + eph
# setup PTA
pta = signal_base.PTA([s(p) for p in psrs])
# set fixed white noise parameters
pta.set_default_params(setpars)
gp_signals.BasisGP
for Gaussian Process signals that have a set of basis functions and a kernel.white_signals.WhiteNoise
for white noise signals with a specific variance.deterministic_signals.Deterministic
for determinisic signals that return a delay waveform.@signal_base.function
def scaled_tm_basis(Mmat):
"""Returns mean and variance scaled timing model
design matrix and weights to be used in prior
function."""
mn = Mmat.mean(axis=0)
sd = Mmat.std(axis=0)
ret = Mmat.copy()
ret[:, 1:] -= mn[1:]
ret[:, 1:] /= sd[None, 1:]
return ret, np.ones_like(mn)
@signal_base.function
def ridge_prior(weights, log10_variance=-14):
"""Return gaussian prior with variance parameter."""
return weights * 10**(log10_variance)
ntoa
x nbasis
array and an nbasis
vector of weights.nbasis
array (diagonal of covariance matrix) or the full nbasis
x nbasis
covariance matrix.log10_variance = parameter.Uniform(-20, -10)
basis = scaled_tm_basis()
prior = ridge_prior(log10_variance=log10_variance)
ridge = gp_signals.BasisGP(prior, basis, name='ridge')
# define DM EQUAD variance function
@signal_base.function
def dmequad_ndiag(freqs, log10_dmequad=-8):
"""Reads in radio frequencies and DMEQUAD value and outputs variance."""
return np.ones_like(freqs) * (1400/freqs)**4 * 10**(2*log10_dmequad)
enterprise
Function
it reads freqs
directly from the Pulsar
object.log10_dmequad = parameter.Uniform(-10, -5)
dmvariance = dmequad_ndiag(log10_dmequad=log10_equad)
dmeq = white_signals.WhiteNoise(dmvariance)
@signal_base.function
def wavelet(toas, log10_A=-7, log10_Q=2, t0=55000, log10_f0=-7.5, phi0=0):
# convert units
A = 10**log10_A
Q = 10**log10_Q * const.day
t0 *= const.day
f0 = 10**log10_f0
wv = A * np.cos(2*np.pi*f0*(toas-t0)+phi0) * np.exp(-(toas-t0)**2/2/Q**2)
return wv
log10_A = parameter.Uniform(-10, -5)
log10_Q = parameter.Uniform(0, 3)
t0 = parameter.Uniform(53000, 58000)
log10_f0 = parameter.Uniform(-9, -6)
phi0 = parameter.Uniform(0, 2*np.pi)
wf = wavelet(log10_A=log10_A, log10_Q=log10_Q, t0=t0,
log10_f0=log10_f0, phi0=phi0)
wvlt = deterministic_signals.Deterministic(wf)
Signal
class if your signal is very different.FourierBasisGP
class is actually part of enterprise
but it works for an example of how to sublcass an existing Signal
.def FourierBasisGP(spectrum, components=20,
selection=selections.Selection(selections.no_selection),
Tspan=None):
"""Convenience function to return a BasisGP class with a
fourier basis."""
# use fourier basis and use BasisGP
basis = utils.createfourierdesignmatrix_red(nmodes=components, Tspan=Tspan)
BaseClass = BasisGP(spectrum, basis, selection=selection)
class FourierBasisGP(BaseClass):
signal_type = 'basis'
signal_name = 'red noise'
return FourierBasisGP
Deterministic
example¶def WaveletSignal(log10_A, log10_Q, t0, log10_f0, phi0, use_epoch_toas=True):
"""Wavelet signal with option to use epoch TOAs"""
# setup wavelet signal and use Deterministic
wf = wavelet(log10_A=log10_A, log10_Q=log10_Q, t0=t0,
log10_f0=log10_f0, phi0=phi0)
BaseClass = deterministic_signals.Deterministic(wf, name='wavelet')
class WaveletSignal(BaseClass):
def __init__(self, psr):
super(WaveletSignal, self).__init__(psr)
if use_epoch_toas:
# get quantization matrix and calculate daily average TOAs
U, _ = utils.create_quantization_matrix(psr.toas, nmin=1)
self.uinds = utils.quant2ind(U)
avetoas = np.array([psr.toas[sc].mean() for sc in self.uinds])
# replace kwarg in waveform with avetoas instead of toas
self._wf[''].add_kwarg(toas=avetoas)
@signal_base.cache_call('delay_params')
def get_delay(self, params):
delay = self._wf[''](params=params)
if use_epoch_toas:
for slc, val in zip(self.uinds, delay):
self._delay[slc] = val
return self._delay
else:
return delay
return WaveletSignal
Signal
s via Selection
s¶selection
arguments above.Selection
functionBasisGP
, WhiteNoise
, and Deterministic
Signals
support selection.def cut_half(toas):
"""Selection function to split by data segment"""
midpoint = (toas.max() + toas.min()) / 2
return dict(zip(['t1', 't2'], [toas <= midpoint, toas > midpoint]))
selection=selections.Selection(cut_half)
as a keyword argument in the signal and thats it!