-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscaling_compare.py
241 lines (196 loc) · 9.18 KB
/
scaling_compare.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
#from image_pipe.py
#SingleObsImageMaker
from __future__ import absolute_import, division, print_function, unicode_literals
import numpy as np
from astropy.units import Quantity
from astropy.table import Table
from astropy.coordinates import Angle
from astropy.convolution import Tophat2DKernel
from gammapy.utils.energy import EnergyBounds
from gammapy.stats import significance
from gammapy.background import fill_acceptance_image
from gammapy.image import SkyImage, SkyImageList
__all__ = ['SingleObsImageMaker']
class SingleObsImageMaker(object):
"""Compute images for one observation.
The computed images are stored in a ``images`` attribute of
type `~gammapy.image.SkyImageList` with the following keys:
* ``counts`` : Counts
* ``bkg`` : Background model
* ``exposure`` : Exposure
* ``excess`` : Excess
* ``significance`` : Significance
Parameters
----------
obs : `~gammapy.data.DataStoreObservation`
Observation data
empty_image : `~gammapy.image.SkyImage`
Reference image
energy_band : `~gammapy.utils.energy.Energy`
Energy band selection
offset_band : `astropy.coordinates.Angle`
Offset band selection
exclusion_mask : `~gammapy.image.SkyImage`
Exclusion mask
ncounts_min : int
Minimum counts required for the observation (TODO: used how?)
save_bkg_scale: bool
True if you want to save the normalisation of the bkg computed outside the exlusion region in a Table
"""
def __init__(self, obs, empty_image,
energy_band, offset_band, exclusion_mask=None, ncounts_min=0, save_bkg_scale=True):
# Select the events in the given energy and offset range
self.energy_band = energy_band
self.offset_band = offset_band
events = obs.events
self.obs_id = events.table.meta["OBS_ID"]
events = events.select_energy(self.energy_band)
self.events = events.select_offset(self.offset_band)
self.images = SkyImageList()
self.empty_image = empty_image
self.header = self.empty_image.to_image_hdu().header
if exclusion_mask:
exclusion_mask.name = 'exclusion'
self.images['exclusion'] = exclusion_mask
self.ncounts_min = ncounts_min
self.aeff = obs.aeff
self.edisp = obs.edisp
self.psf = obs.psf
self.bkg = obs.bkg
self.obs_center = obs.pointing_radec
self.livetime = obs.observation_live_time_duration
self.save_bkg_scale = save_bkg_scale
if self.save_bkg_scale:
self.table_bkg_scale = Table(names=["OBS_ID", "bkg_scale", "N_counts"])
def counts_image(self):
"""Fill the counts image for the events of one observation."""
self.images['counts'] = SkyImage.empty_like(self.empty_image, name='counts')
if len(self.events.table) > self.ncounts_min:
self.images['counts'].fill_events(self.events)
def bkg_image(self, bkg_norm=True):
"""
Make the background image for one observation from a bkg model.
Parameters
----------
bkg_norm : bool
If true, apply the scaling factor from the number of counts
outside the exclusion region to the bkg image
"""
bkg_image = SkyImage.empty_like(self.empty_image)
table = self.bkg.acceptance_curve_in_energy_band(energy_band=self.energy_band)
center = self.obs_center.galactic
bkg_hdu = fill_acceptance_image(self.header, center, table["offset"], table["Acceptance"], self.offset_band[1])
bkg_image.data = Quantity(bkg_hdu.data, table["Acceptance"].unit) * bkg_image.solid_angle() * self.livetime
bkg_image.data = bkg_image.data.decompose()
bkg_image.data = bkg_image.data.value
if bkg_norm:
scale, counts = self.background_norm_factor(self.images["counts"], bkg_image)
bkg_image.data = scale * bkg_image.data
if self.save_bkg_scale:
self.table_bkg_scale.add_row([self.obs_id, scale, counts])
self.images["bkg"] = bkg_image
def make_1d_expected_counts(self, spectral_index=2.3, for_integral_flux=False, eref=None):
"""Compute the 1D exposure table for one observation for an offset table.
Parameters
----------
spectral_index : float
Assumed power-law spectral index
for_integral_flux : bool
True if you want that the total excess / exposure gives the integrated flux
eref: `~gammapy.utils.energy.Energy`
Reference energy at which you want to compute the exposure. Default is the log center of the energy band of
the image.
Returns
-------
table : `astropy.table.Table`
Two columns: offset in the FOV "theta" and expected counts "npred"
"""
energy = EnergyBounds.equal_log_spacing(self.energy_band[0].value, self.energy_band[1].value, 100,
self.energy_band.unit)
energy_band = energy.bands
energy_bin = energy.log_centers
if not eref:
eref = EnergyBounds(self.energy_band).log_centers
spectrum = (energy_bin / eref) ** (-spectral_index)
offset = Angle(np.linspace(self.offset_band[0].value, self.offset_band[1].value, 10), self.offset_band.unit)
arf = self.aeff.data.evaluate(offset=offset, energy=energy_bin).T
npred = np.sum(arf * spectrum * energy_band, axis=1)
npred *= self.livetime
if for_integral_flux:
norm = np.sum(spectrum * energy_band)
npred /= norm
table = Table()
table['theta'] = offset
table['npred'] = npred
return table
def exposure_image(self, spectral_index=2.3, for_integral_flux=False):
r"""Compute the exposure image for one observation.
Excess/exposure will give the differential flux at the energy Eref at the middle of the ``self.energy_band``
If ``for_integral_flux`` is true, it will give the integrated flux over the ``self.energy_band``
Exposure is define as follow:
.. math ::
EXPOSURE = \int_{E_1}^{E_2} A(E) \phi(E) * T \, dE
with ``T`` the observation livetime, ``A(E)`` the effective area,
the energy integration range :math:`[E_1,E_2]` given by ``self.energy_range``
and assuming a power law for the flux :math:`\phi(E) = \phi_{Eref} \times \frac{E}{E_{ref}}^{\gamma}`
with :math:`\gamma` the spectral index of the assumed power law.
If ``for_integral_flux`` is true,
:math:`EXPOSURE = \int_{E_1}^{E_2} A(E) \phi_{E} * T \, dE / \int \phi_{E} \, dE`
Parameters
----------
spectral_index : float
Assumed power-law spectral index
for_integral_flux : bool
True if you want that the total excess / exposure gives the integrated flux
"""
from scipy.interpolate import interp1d
# TODO: should be re-implemented using the make_exposure_cube function
table = self.make_1d_expected_counts(spectral_index, for_integral_flux)
exposure = SkyImage.empty_like(self.empty_image, unit=table["npred"].unit)
# calculate pixel offset from center (in world coordinates)
coord = exposure.coordinates()
offset = coord.separation(self.obs_center)
# Interpolate for the offset of each pixel
f = interp1d(table["theta"], table["npred"], bounds_error=False, fill_value=0)
exposure.data = f(offset)
exposure.data[offset >= self.offset_band[1]] = 0
self.images["exposure"] = exposure
self.images["exposure"] = 0.0
def background_norm_factor(self, counts, bkg):
"""Determine the scaling factor to apply to the background image.
Compares the events in the counts images and the bkg image outside the exclusion images.
Parameters
----------
counts : `~gammapy.image.SkyImage`
counts image
bkg : `~gammapy.image.SkyImage`
bkg image
Returns
-------
scale : float
scaling factor between the counts and the bkg images outside the exclusion region.
"""
counts_sum = np.sum(counts.data * self.images['exclusion'].data)
bkg_sum = np.sum(bkg.data * self.images['exclusion'].data)
scale = counts_sum / bkg_sum
#return -999, -999
return scale, counts_sum
def significance_image(self, radius):
"""Make the significance image from the counts and bkg images.
Parameters
----------
radius : float
Disk radius in pixels.
"""
image = SkyImage.empty_like(self.empty_image)
disk = Tophat2DKernel(radius)
disk.normalize('peak')
counts = self.images["counts"].convolve(disk.array)
bkg = self.images["bkg"].convolve(disk.array)
image.data = significance(counts.data, bkg.data)
self.images["significance"] = image
def excess_image(self):
"""Compute excess between counts and bkg image."""
total_excess = SkyImage.empty_like(self.empty_image)
total_excess.data = self.images["counts"].data - self.images["bkg"].data
self.images["excess"] = total_excess