-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimulation_as.py
205 lines (160 loc) · 7.44 KB
/
simulation_as.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# Licensed under a 3 - clause BSD style license - see LICENSE.rst
# my code to include background
from __future__ import absolute_import, division, print_function, unicode_literals
from collections import OrderedDict
import logging
import astropy.units as u
from regions import CircleSkyRegion
from ..utils.random import get_random_state
from ..utils.energy import EnergyBounds
from .utils import CountsPredictor
from .core import PHACountsSpectrum
from .observation import SpectrumObservation, SpectrumObservationList
from ..background.reflected import ReflectedRegionsBackgroundEstimator
from ..image import SkyImage
from ..irf import EnergyDispersion, EnergyDispersion2D, EffectiveAreaTable, EffectiveAreaTable2D
__all__ = [
'SimulationRealBkg'
]
log = logging.getLogger(__name__)
class SimulationRealBkg(object):
"""Simulate `~gammapy.spectrum.SpectrumObservation` with a real background observation.
Call the ReflectRegionsBackgroundEstimator to estimate background counts/spectra.
IMPORTANT: Works only for point like IRFs (or very extended sources)
Parameters
----------
livetime : `~astropy.units.Quantity`, optional [if not specified, takes the observation duration
Livetime
source_model : `~gammapy.spectrum.models.SpectralModel`
Source model
background_model : `~gammapy.spectrum.models.SpectralModel`, optional
Background model
alpha : float, optional
Exposure ratio between source and background
obsrun: a real observation run to extract the background from.
"""
def __init__(self, source_model, obsrun, obspos, livetime=None):
self.source_model = source_model # the model used during simulation
self.obsrun=obsrun # the observation run
self.obslist=[obsrun,] #patchy
self.obspos=obspos #the source observed
self.offset = SkyCoord.separation(obsrun.pointing_radec,src_obs) #calculate the offset
self.aeff = obsrun.aeff.to_effective_area_table(offset=self.offset)
self.edisp = obsrun.edisp.to_energy_dispersion(offset=self.offset)
if livetime == None:
self.src_livetime = obsrun.observation_live_time_duration
else:
self.src_livetime = livetime
self.bkg_livetime = obsrun.observation_live_time_duration
self.on_vector = None
self.off_vector = None
self.obs = None
self.result = SpectrumObservationList()
@property
def npred_source(self):
"""Predicted source `~gammapy.spectrum.CountsSpectrum`.
Calls :func:`gammapy.spectrum.utils.CountsPredictor`.
Same function as before
"""
predictor = CountsPredictor(livetime=self.livetime,
aeff=self.aeff,
edisp=self.edisp,
model=self.source_model)
predictor.run()
return predictor.npred
def run(self, seed):
"""Simulate `~gammapy.spectrum.SpectrumObservationList`.
The seeds will be set as observation ID.
Previously produced results will be overwritten.
Parameters
----------
seed : array of ints
Random number generator seeds
"""
self.reset()
n_obs = len(seed)
log.info("Simulating {} observations".format(n_obs))
for counter, current_seed in enumerate(seed):
progress = ((counter + 1) / n_obs) * 100
if progress % 10 == 0:
log.info("Progress : {} %".format(progress))
self.simulate_obs(seed=current_seed, obs_id=current_seed)
self.result.append(self.obs)
def reset(self):
"""Clear all results."""
self.result = SpectrumObservationList()
self.obs = None
self.on_vector = None
self.off_vector = None
def simulate_obs(self, obs_id, seed='random-seed'):
"""Simulate one `~gammapy.spectrum.SpectrumObservation`.
The result is stored as ``obs`` attribute
Parameters
----------
obs_id : int
Observation identifier
seed : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`}
see :func:~`gammapy.utils.random.get_random_state`
"""
random_state = get_random_state(seed)
self.simulate_source_counts(random_state)
obs = SpectrumObservation(on_vector=self.on_vector,
off_vector=self.off_vector,
aeff=self.aeff,
edisp=self.edisp)
self.simulate_background_counts()
obs.obs_id = obs_id
self.obs = obs
def simulate_source_counts(self, rand):
"""Simulate source `~gammapy.spectrum.PHACountsSpectrum`.
Source counts are added to the on vector.
Parameters
----------
rand: `~numpy.random.RandomState`
random state
"""
on_counts = rand.poisson(self.npred_source.data.data.value)
on_vector = PHACountsSpectrum(energy_lo=self.e_reco.lower_bounds,
energy_hi=self.e_reco.upper_bounds,
data=on_counts,
backscal=1,
meta=self._get_meta())
on_vector.livetime = self.src_livetime
self.on_vector = on_vector
def estimate_reflected(self, EXCLUSION_FILE = EXCLUSION_FILE, size=size):
#just extracts the reflected background
on_size=0.11 * u.deg #0.11 for point source cuts...
allsky_mask = SkyImage.read(EXCLUSION_FILE)
exclusion_mask = allsky_mask.cutout(position=obspos,size=size)
on_region=CircleSkyRegion(self.obspos,on_size)
background_estimator = ReflectedRegionsBackgroundEstimator(obs_list=obslist, on_region=on_region, exclusion_mask = exclusion_mask)
background_estimator.run()
return background_estimator.result[0]
def simulate_background_counts(self):
bkg_res=estimate_reflected(EXCLUSION_FILE ='$GAMMAPY_EXTRA/datasets/exclusion_masks/tevcat_exclusion.fits',size=Angle('6 deg'))
a_off=bkg_res.a_off
a_on = bkg_res.a_on
nOFF= len(bkg_res.off_events.table)
alpha=float(a_on)/float(a_off)
nbkg=alpha*nOFF # number of background events in on region
nbkg = np.random.poisson(nbkg)
idxON=np.random.choice(np.arange(nOFF),nbkg,replace=False)
bkg_ev=bkg_res.off_events.table[idxON] #background events in on region
bkg_hist,edge=np.histogram(bkg_ev["ENERGY"],sim.on_vector.energy.bins)
# Add background to on_vector
self.on_vector.data.data += bkg_hist * u.ct
# Create off vector
off_events=bkg_res.off_events.table
off_events.remove_rows(idxON) # should these be removed ?
alpha_new=float(a_on)/float(a_off - 1)
off_counts,edge=np.histogram(off_events["ENERGY"],sim.on_vector.energy.bins)
off_vector = PHACountsSpectrum(energy_lo=sim.on_vector.energy.lo,
energy_hi=sim.on_vector.energy.hi,
data=off_counts,
backscal=1. / alpha_new, is_bkg=True, meta=self._get_meta())
self.off_vector = off_vector
def _get_meta(self):
"""Meta info added to simulated counts spectra."""
meta = OrderedDict()
meta['CREATOR'] = self.__class__.__name__
return meta