Skip to content
3 changes: 2 additions & 1 deletion cosipy/response/PointSourceResponse.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ def get_expectation(self, spectrum, polarization=None, flux=None):

# unpolarized weights
weights = np.full(pol_axis.nbins, (1. - polarization_level) / pol_axis.nbins)

# add polarized weights
polarization_bin_index = pol_axis.find_bin(polarization_angle * u.deg)
weights[polarization_bin_index] += polarization_level
weights *= self.axes['Pol'].nbins

contents = np.tensordot(weights, self.contents, axes=(0, self.axes.label_to_index('Pol')))

Expand Down
Binary file added cosipy/test_data/polarization_data_binned.hdf5
Binary file not shown.
14 changes: 14 additions & 0 deletions cosipy/test_data/polarization_data_mlm.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#----------#
# Data I/O:

data_file: "abc" # full path
ori_file: 'data3/ori.ori'
unbinned_output: 'hdf5' # 'fits' or 'hdf5'
time_bins: 1 # time bin size in seconds. Takes int or list of bin edges.
energy_bins: [200., 500., 1000., 2000., 10000.] # Takes list. Needs to match response.
phi_pix_size: 6 # binning of Compton scattering angle [deg]
nside: 1 # healpix binning of psi chi local
scheme: 'ring' # healpix binning of psi chi local
tmin: 0. # Min time cut in seconds.
tmax: 13. # Max time cut in seconds.
#----------#
65 changes: 56 additions & 9 deletions cosipy/threeml/COSILike.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

from cosipy.response.FullDetectorResponse import FullDetectorResponse
from cosipy.response.ExtendedSourceResponse import ExtendedSourceResponse
from cosipy.polarization.polarization_angle import PolarizationAngle
from cosipy.polarization.conventions import IAUPolarizationConvention, MEGAlibRelativeX, MEGAlibRelativeY, MEGAlibRelativeZ

from scoords import SpacecraftFrame, Attitude

Expand Down Expand Up @@ -59,9 +61,11 @@ class COSILike(PluginPrototype):
Full path to precomputed point source response in Galactic coordinates
earth_occ : bool, optional
Option to include Earth occultation in fit (default is True).
response_pa_convention : str, optional
Polarization reference convention of response ('RelativeX', 'RelativeY', or 'RelativeZ'). Required if response contains polarization angle axis
"""
def __init__(self, name, dr, data, bkg, sc_orientation,
nuisance_param=None, coordsys=None, precomputed_psr_file=None, earth_occ=True, **kwargs):
def __init__(self, name, dr, data, bkg, sc_orientation, nuisance_param=None, coordsys=None,
precomputed_psr_file=None, earth_occ=True, response_pa_convention=None, **kwargs):

# create the hash for the nuisance parameters. We have none for now.
self._nuisance_parameters = collections.OrderedDict()
Expand All @@ -72,7 +76,7 @@ def __init__(self, name, dr, data, bkg, sc_orientation,
# User inputs needed to compute the likelihood
self._name = name
self._rsp_path = dr
self._dr = FullDetectorResponse.open(dr)
self._dr = FullDetectorResponse.open(dr, pa_convention=response_pa_convention)
self._data = data
self._bkg = bkg
self._sc_orientation = sc_orientation
Expand Down Expand Up @@ -117,6 +121,24 @@ def __init__(self, name, dr, data, bkg, sc_orientation,
logger.info("... loading the pre-computed image response ...")
self.image_response = ExtendedSourceResponse.open(self.precomputed_psr_file)
logger.info("--> done")

if 'Pol' in self._dr.axes.labels:
self._response_pa_convention = response_pa_convention
if self._coordsys == 'spacecraftframe':
if self._response_pa_convention == 'RelativeX':
self._pa_convention = MEGAlibRelativeX(attitude=self._sc_orientation.get_attitude()[0])
elif self._response_pa_convention == 'RelativeY':
self._pa_convention = MEGAlibRelativeY(attitude=self._sc_orientation.get_attitude()[0])
elif self._response_pa_convention == 'RelativeZ':
self._pa_convention = MEGAlibRelativeZ(attitude=self._sc_orientation.get_attitude()[0])
else:
raise RuntimeError("Response convention must be 'RelativeX', 'RelativeY', or 'RelativeZ'")
elif self._coordsys == 'galactic':
self._pa_convention = IAUPolarizationConvention()
else:
raise RuntimeError("Unknown coordinate system")
else:
self._response_pa_convention = None

def set_model(self, model):
"""
Expand Down Expand Up @@ -215,7 +237,7 @@ def set_model(self, model):
dwell_time_map = self._get_dwell_time_map(coord)
self._psr[name] = self._dr.get_point_source_response(exposure_map=dwell_time_map)
elif self._coordsys == 'galactic':
scatt_map = self._get_scatt_map()
scatt_map = self._get_scatt_map(coord)
self._psr[name] = self._dr.get_point_source_response(coord=coord, scatt_map=scatt_map)
else:
raise RuntimeError("Unknown coordinate system")
Expand All @@ -225,11 +247,35 @@ def set_model(self, model):
# Get expectation for point sources:
for name,source in point_sources.items():

# Convolve with spectrum
# See also the Detector Response and Source Injector tutorials
spectrum = source.spectrum.main.shape
if hasattr(source.spectrum, 'main'):

spectrum = source.spectrum.main.shape
total_expectation = self._psr[name].get_expectation(spectrum)

else:

component_counter = 0

for item in source.spectrum.to_dict():

total_expectation = self._psr[name].get_expectation(spectrum)
spectrum = getattr(source.spectrum, item).shape

if not 'Pol' in self._dr.axes.labels:
this_expectation = self._psr[name].get_expectation(spectrum)
else:
if self._coordsys == 'spacecraftframe':
this_expectation = self._psr[name].get_expectation(spectrum, source.components[item].polarization)
elif self._coordsys == 'galactic':
this_expectation = self._psr[name].get_expectation(spectrum, source.components[item].polarization)
else:
raise RuntimeError("Unknown coordinate system")

if component_counter == 0:
total_expectation = this_expectation
else:
total_expectation += this_expectation

component_counter += 1

# Save expected counts for source:
self._expected_counts[name] = total_expectation.copy()
Expand Down Expand Up @@ -321,7 +367,8 @@ def _get_dwell_time_map(self, coord):

src_path = self._sc_orientation.get_target_in_sc_frame(coord)
dwell_time_map = self._sc_orientation.get_dwell_map(response = self._rsp_path,
src_path = src_path)
src_path = src_path,
pa_convention = self._response_pa_convention)

return dwell_time_map

Expand Down
Loading