¶by: Justin Ellis$^{1,2}$, Michele Vallisneri$^2$, Paul Baker$^1$, Steve Taylor$^2$
$^1$ Center for Gravitational Waves and Cosmology, West Virginia University
$^2$ Jet Propulsion Laboratory, California Institute of Technology
Code website: https://github.com/nanograv/enterprise
Documentation: https://enterprise.readthedocs.io/
is an Enhanced Numerical Toolbox Enabling a Robust PulsaR Inference SuitE (complaints about this backronym are to be directed to Steve Taylor)enterprise
really is a toolbox, not a set of black box functions and scripts!pep8
to be the base for all of your future pulsar timing residual modeling needs.enterprise
code architecture¶Signal
components which have their own Parameter
components is a SignalCollection
code architecture¶SignalCollection
that are combined to form a PTA
s are shared across pulsars Likelihood
s act on PTA
s. enterprise
¶class Signal(object):
"""Base class for Signal objects."""
def get_ndiag(self, params):
"""Returns the diagonal of the white noise vector `N`.
This method also supports block diagaonal sparse matrices.
return None
def get_delay(self, params):
"""Returns the waveform of a deterministic signal."""
return 0
def get_basis(self, params=None):
"""Returns the basis array of shape N_toa x N_basis."""
return None
def get_phi(self, params):
"""Returns a diagonal or full rank covaraince matrix
of the basis amplitudes."""
return None
def get_phiinv(self, params):
"""Returns inverse of the covaraince of basis amplitudes."""
return None
# define white noise parameters
efac = parameter.Uniform(0.1, 10)
log10_equad = parameter.Uniform(-10, -5)
log10_ecorr = parameter.Uniform(-10, -5)
# red noise parameters
log10_A = parameter.Uniform(-20, -11)
gamma = parameter.Uniform(0, 7)
# define selection by observing backend
selection = selections.Selection(selections.by_backend)
# define white noise signals (selection tells it to split based on backend)
ef = white_signals.MeasurementNoise(efac=efac, selection=selection)
eq = white_signals.EquadNoise(log10_equad=log10_equad, selection=selection)
ec = white_signals.EcorrKernelNoise(log10_ecorr=log10_ecorr,
# define powerlaw PSD and red noise signal
pl = utils.powerlaw(log10_A=log10_A, gamma=gamma)
rn = gp_signals.FourierBasisGP(pl, components=30)
# linear timing model
tm = gp_signals.TimingModel()
# total signal (this is some high tech shit right here!)
s = ef + eq + ec + tm + rn
# setup PTA
pta = signal_base.PTA([s(psr)])
print pta.params
["B1855+09_430_ASP_efac":Uniform(0.1,10), "B1855+09_430_ASP_log10_ecorr":Uniform(-10,-5), "B1855+09_430_ASP_log10_equad":Uniform(-10,-5), "B1855+09_430_PUPPI_efac":Uniform(0.1,10), "B1855+09_430_PUPPI_log10_ecorr":Uniform(-10,-5), "B1855+09_430_PUPPI_log10_equad":Uniform(-10,-5), "B1855+09_L-wide_ASP_efac":Uniform(0.1,10), "B1855+09_L-wide_ASP_log10_ecorr":Uniform(-10,-5), "B1855+09_L-wide_ASP_log10_equad":Uniform(-10,-5), "B1855+09_L-wide_PUPPI_efac":Uniform(0.1,10), "B1855+09_L-wide_PUPPI_log10_ecorr":Uniform(-10,-5), "B1855+09_L-wide_PUPPI_log10_equad":Uniform(-10,-5), "B1855+09_gamma":Uniform(0,7), "B1855+09_log10_A":Uniform(-20,-11)]
# random parameters for testing
xs = {p.name: p.sample() for p in pta.params}
# likelihood
print pta.get_lnlikelihood(xs)
# prior
print pta.get_lnprior(xs)
43167.2424803 -26.1887770544
# white noise parameters
# set them to constant here and we will input the noise values
# after the model is initialized
efac = parameter.Constant()
equad = parameter.Constant()
ecorr = parameter.Constant()
# red noise parameters
# LinearExp prior places uniform prior on A while sampling in log10_A
log10_A = parameter.LinearExp(-20, -11)
gamma = parameter.Uniform(0, 7)
# common red noise
# naming signals here tells enterprise that these parameters
# are shared across pulsars
log10_Agw = parameter.LinearExp(-18, -11)('log10_Agw')
gamma_gw = parameter.Constant(4.33)('gamma_gw')
# define selection by observing backend
selection = selections.Selection(selections.by_backend)
# define white noise signals
ef = white_signals.MeasurementNoise(efac=efac, selection=selection)
eq = white_signals.EquadNoise(log10_equad=equad, selection=selection)
ec = white_signals.EcorrKernelNoise(log10_ecorr=ecorr, selection=selection)
# get overall time span
tmin = np.min([p.toas.min() for p in psrs])
tmax = np.min([p.toas.max() for p in psrs])
Tspan = tmax - tmin
# define powerlaw PSD and red noise signal
pl = utils.powerlaw(log10_A=log10_A, gamma=gamma)
rn = gp_signals.FourierBasisGP(pl, components=30, Tspan=Tspan)
cpl = utils.powerlaw(log10_A=log10_Agw, gamma=gamma_gw)
orf = utils.hd_orf()
crn = gp_signals.FourierBasisCommonGP(cpl, orf, components=30, Tspan=Tspan)
# linear timing model
tm = gp_signals.TimingModel()
## Physical ephemeris model
eph = deterministic_signals.PhysicalEphemerisSignal(use_epoch_toas=True)
# total signal
s = ef + eq + ec + rn + crn + tm + eph
# setup PTA
pta = signal_base.PTA([s(p) for p in psrs])
# set fixed white noise parameters
for Gaussian Process signals that have a set of basis functions and a kernel.white_signals.WhiteNoise
for white noise signals with a specific variance.deterministic_signals.Deterministic
for determinisic signals that return a delay waveform.@signal_base.function
def scaled_tm_basis(Mmat):
"""Returns mean and variance scaled timing model
design matrix and weights to be used in prior
mn = Mmat.mean(axis=0)
sd = Mmat.std(axis=0)
ret = Mmat.copy()
ret[:, 1:] -= mn[1:]
ret[:, 1:] /= sd[None, 1:]
return ret, np.ones_like(mn)
def ridge_prior(weights, log10_variance=-14):
"""Return gaussian prior with variance parameter."""
return weights * 10**(log10_variance)
x nbasis
array and an nbasis
vector of weights.nbasis
array (diagonal of covariance matrix) or the full nbasis
x nbasis
covariance matrix.log10_variance = parameter.Uniform(-20, -10)
basis = scaled_tm_basis()
prior = ridge_prior(log10_variance=log10_variance)
ridge = gp_signals.BasisGP(prior, basis, name='ridge')
# define DM EQUAD variance function
def dmequad_ndiag(freqs, log10_dmequad=-8):
"""Reads in radio frequencies and DMEQUAD value and outputs variance."""
return np.ones_like(freqs) * (1400/freqs)**4 * 10**(2*log10_dmequad)
it reads freqs
directly from the Pulsar
object.log10_dmequad = parameter.Uniform(-10, -5)
dmvariance = dmequad_ndiag(log10_dmequad=log10_equad)
dmeq = white_signals.WhiteNoise(dmvariance)
def wavelet(toas, log10_A=-7, log10_Q=2, t0=55000, log10_f0=-7.5, phi0=0):
# convert units
A = 10**log10_A
Q = 10**log10_Q * const.day
t0 *= const.day
f0 = 10**log10_f0
wv = A * np.cos(2*np.pi*f0*(toas-t0)+phi0) * np.exp(-(toas-t0)**2/2/Q**2)
return wv
log10_A = parameter.Uniform(-10, -5)
log10_Q = parameter.Uniform(0, 3)
t0 = parameter.Uniform(53000, 58000)
log10_f0 = parameter.Uniform(-9, -6)
phi0 = parameter.Uniform(0, 2*np.pi)
wf = wavelet(log10_A=log10_A, log10_Q=log10_Q, t0=t0,
log10_f0=log10_f0, phi0=phi0)
wvlt = deterministic_signals.Deterministic(wf)
class if your signal is very different.FourierBasisGP
class is actually part of enterprise
but it works for an example of how to sublcass an existing Signal
.def FourierBasisGP(spectrum, components=20,
"""Convenience function to return a BasisGP class with a
fourier basis."""
# use fourier basis and use BasisGP
basis = utils.createfourierdesignmatrix_red(nmodes=components, Tspan=Tspan)
BaseClass = BasisGP(spectrum, basis, selection=selection)
class FourierBasisGP(BaseClass):
signal_type = 'basis'
signal_name = 'red noise'
return FourierBasisGP
example¶def WaveletSignal(log10_A, log10_Q, t0, log10_f0, phi0, use_epoch_toas=True):
"""Wavelet signal with option to use epoch TOAs"""
# setup wavelet signal and use Deterministic
wf = wavelet(log10_A=log10_A, log10_Q=log10_Q, t0=t0,
log10_f0=log10_f0, phi0=phi0)
BaseClass = deterministic_signals.Deterministic(wf, name='wavelet')
class WaveletSignal(BaseClass):
def __init__(self, psr):
super(WaveletSignal, self).__init__(psr)
if use_epoch_toas:
# get quantization matrix and calculate daily average TOAs
U, _ = utils.create_quantization_matrix(psr.toas, nmin=1)
self.uinds = utils.quant2ind(U)
avetoas = np.array([psr.toas[sc].mean() for sc in self.uinds])
# replace kwarg in waveform with avetoas instead of toas
def get_delay(self, params):
delay = self._wf[''](params=params)
if use_epoch_toas:
for slc, val in zip(self.uinds, delay):
self._delay[slc] = val
return self._delay
return delay
return WaveletSignal
s via Selection
arguments above.Selection
, WhiteNoise
, and Deterministic
support selection.def cut_half(toas):
"""Selection function to split by data segment"""
midpoint = (toas.max() + toas.min()) / 2
return dict(zip(['t1', 't2'], [toas <= midpoint, toas > midpoint]))
as a keyword argument in the signal and thats it!