diff --git a/gwcelery/tasks/alerts.py b/gwcelery/tasks/alerts.py index 41cb3b8d222b405b538dc852562bd47e65d7bae9..0f030fc2c63d5922f40fba1e7c59bc167ae53168 100644 --- a/gwcelery/tasks/alerts.py +++ b/gwcelery/tasks/alerts.py @@ -59,7 +59,8 @@ def _create_base_alert_dict(classification, superevent, alert_type, @gracedb.task(shared=False) -def _add_external_coinc_to_alert(alert_dict, superevent): +def _add_external_coinc_to_alert(alert_dict, superevent, + combined_skymap_filename): external_event = gracedb.get_event(superevent['em_type']) alert_dict['external_coinc'] = { @@ -74,6 +75,10 @@ def _add_external_coinc_to_alert(alert_dict, superevent): 'time_sky_position_coincidence_far': superevent['space_coinc_far'] } + if combined_skymap_filename: + alert_dict['external_coinc']['combined_skymap_filename'] = \ + combined_skymap_filename + return alert_dict @@ -131,7 +136,7 @@ def _send(self, alert_dict, skymap, brokerhost): @app.task(bind=True, ignore_result=True, queue='kafka', shared=False) def send(self, skymap_and_classification, superevent, alert_type, - raven_coinc=False): + raven_coinc=False, combined_skymap_filename=None): """Send an public alert to all currently connected kafka brokers. Parameters @@ -168,7 +173,8 @@ def send(self, skymap_and_classification, superevent, alert_type, if raven_coinc and alert_type != 'retraction': canvas = ( - _add_external_coinc_to_alert.s(alert_dict, superevent) + _add_external_coinc_to_alert.s(alert_dict, superevent, + combined_skymap_filename) | group( ( @@ -199,7 +205,8 @@ def _create_skymap_classification_tuple(skymap, classification): @app.task(shared=False, ignore_result=True) def download_skymap_and_send_alert(classification, superevent, alert_type, - skymap_filename=None, raven_coinc=False): + skymap_filename=None, raven_coinc=False, + combined_skymap_filename=False): """Wrapper for send function when caller has not already downloaded the skymap. diff --git a/gwcelery/tasks/external_skymaps.py b/gwcelery/tasks/external_skymaps.py index 64cc3280b5eb5a69b86e89cecd2f96316c9d31b5..d3e09521c12d064fcc631e33d69065252300eef4 100644 --- a/gwcelery/tasks/external_skymaps.py +++ b/gwcelery/tasks/external_skymaps.py @@ -1,11 +1,13 @@ """Create and upload external sky maps.""" from astropy import units as u from astropy.coordinates import ICRS, SkyCoord +import astropy_healpix as ah from astropy_healpix import HEALPix, pixel_resolution_to_nside from celery import group # import astropy.utils.data import numpy as np -from ligo.skymap.io import fits +from ligo.skymap.io import fits, read_sky_map, write_sky_map +from ligo.skymap.distance import parameters_to_marginal_moments from ligo.skymap.tool import ligo_skymap_combine import gcn import healpy as hp @@ -22,6 +24,7 @@ from ..util.tempfile import NamedTemporaryFile from ..import _version +@app.task(shared=False) def create_combined_skymap(se_id, ext_id): """Creates and uploads the combined LVC-Fermi skymap. @@ -30,29 +33,39 @@ def create_combined_skymap(se_id, ext_id): """ se_skymap_filename = get_skymap_filename(se_id) ext_skymap_filename = get_skymap_filename(ext_id) - new_skymap_filename = re.findall(r'(.*).fits.gz', se_skymap_filename)[0] + new_filename = \ + ('combined-ext.multiorder.fits' if '.multiorder.fits' in + se_skymap_filename else 'combined-ext.fits.gz') - # FIXME: put download functions in canvas - se_skymap = gracedb.download(se_skymap_filename, se_id) - ext_skymap = gracedb.download(ext_skymap_filename, ext_id) message = 'Combined LVC-external sky map using {0} and {1}'.format( se_skymap_filename, ext_skymap_filename) message_png = ( 'Mollweide projection of <a href="/api/events/{graceid}/files/' '{filename}">{filename}</a>').format( - graceid=se_id, filename=new_skymap_filename + '-ext.fits.gz') + graceid=ext_id, + filename=new_filename) ( - combine_skymaps.si(se_skymap, ext_skymap) + _download_skymaps.si( + se_skymap_filename, ext_skymap_filename, se_id, ext_id + ) + | + combine_skymaps.s() | group( - gracedb.upload.s(new_skymap_filename + '-ext.fits.gz', se_id, - message, ['sky_loc', 'public']), + gracedb.upload.s(new_filename, ext_id, + message, ['sky_loc', 'ext_coinc']), skymaps.plot_allsky.s() | - gracedb.upload.s(new_skymap_filename + '-ext.png', se_id, - message_png, ['sky_loc', 'ext_coinc', 'public']) + gracedb.upload.s('combined-ext.png', ext_id, + message_png, ['sky_loc', 'ext_coinc']) + ) + | + group( + gracedb.create_label.si('COMBINEDSKYMAP_READY', se_id), + + gracedb.create_label.si('COMBINEDSKYMAP_READY', ext_id) ) ).delay() @@ -67,10 +80,16 @@ def get_skymap_filename(graceid): """ gracedb_log = gracedb.get_log(graceid) if 'S' in graceid: + # Try first to get a multiordered sky map for message in reversed(gracedb_log): filename = message['filename'] if filename.endswith('.multiorder.fits'): return filename + # Try next to get a flattened sky map + for message in reversed(gracedb_log): + filename = message['filename'] + if filename.endswith('.fits.gz'): + return filename else: for message in reversed(gracedb_log): filename = message['filename'] @@ -81,17 +100,64 @@ def get_skymap_filename(graceid): @app.task(shared=False) -def combine_skymaps(skymap1filebytes, skymap2filebytes): +def _download_skymaps(se_filename, ext_filename, se_id, ext_id): + """Download both superevent and external sky map to be combined.""" + se_skymap = gracedb.download(se_filename, se_id) + ext_skymap = gracedb.download(ext_filename, ext_id) + return se_skymap, ext_skymap + + +@app.task(shared=False) +def combine_skymaps(skymapfilesbytes): """This task combines the two input skymaps, in this case the external trigger skymap and the LVC skymap and writes to a temporary output file. It then returns the contents of the file as a byte array. """ - with NamedTemporaryFile(mode='rb', suffix='.fits.gz') as combinedskymap, \ + skymap1filebytes, skymap2filebytes = skymapfilesbytes + with NamedTemporaryFile(mode='rb', suffix='.fits') as combinedskymap, \ NamedTemporaryFile(content=skymap1filebytes) as skymap1file, \ NamedTemporaryFile(content=skymap2filebytes) as skymap2file, \ handling_system_exit(): - ligo_skymap_combine.main([skymap1file.name, - skymap2file.name, combinedskymap.name]) + + # Determine whether GW sky map is multiordered or flat + gw_moc = '.multiorder.fits' in skymap1file.name + + if gw_moc: + # FIXME: Use method that regrids the combined sky map e.g. mhealpy + # Load sky maps + gw_sky = read_sky_map(skymap1file.name, moc=True) + ext_sky, ext_header = read_sky_map(skymap2file.name, moc=False) + + # Find ra/dec of each GW pixel + level, ipix = ah.uniq_to_level_ipix(gw_sky["UNIQ"]) + nsides = ah.level_to_nside(level) + areas = ah.nside_to_pixel_area(nsides) + ra_gw, dec_gw = ah.healpix_to_lonlat(ipix, nsides, order='nested') + + # Find corresponding external sky map indicies + ext_nside = ah.npix_to_nside(len(ext_sky)) + ext_ind = ah.lonlat_to_healpix( + ra_gw, dec_gw, ext_nside, + order='nested' if ext_header['nest'] else 'ring') + + # Weight GW prob density by external sky map probabilities + gw_sky['PROBDENSITY'] *= ext_sky[ext_ind] + gw_sky['PROBDENSITY'] /= \ + np.sum(gw_sky['PROBDENSITY'] * areas).value + + # Modify GW sky map with new data + distmean, diststd = parameters_to_marginal_moments( + gw_sky['PROBDENSITY'] * areas.value, + gw_sky['DISTMU'], gw_sky['DISTSIGMA']) + gw_sky.meta['distmean'], gw_sky.meta['diststd'] = distmean, diststd + gw_sky.meta['instruments'].update(ext_header['instruments']) + gw_sky.meta['HISTORY'].extend([ + '', 'The values were reweighted by using data from {}'.format( + list(ext_header['instruments'])[0])]) + write_sky_map(combinedskymap.name, gw_sky) + else: + ligo_skymap_combine.main([skymap1file.name, + skymap2file.name, combinedskymap.name]) return combinedskymap.read() diff --git a/gwcelery/tasks/external_triggers.py b/gwcelery/tasks/external_triggers.py index 9a1e951b99fa8bbe58befcf050f2575d12bc09f8..bde0edd969518c76d7417a0c8b111af0e83db75e 100644 --- a/gwcelery/tasks/external_triggers.py +++ b/gwcelery/tasks/external_triggers.py @@ -16,8 +16,7 @@ log = get_logger(__name__) REQUIRED_LABELS_BY_TASK = { - 'compare': {'SKYMAP_READY', 'EXT_SKYMAP_READY', 'EM_COINC'}, - 'combine': {'SKYMAP_READY', 'EXT_SKYMAP_READY', 'RAVEN_ALERT'} + 'compare': {'SKYMAP_READY', 'EXT_SKYMAP_READY', 'EM_COINC'} } """These labels should be present on an external event to consider it to be ready for sky map comparison. @@ -301,23 +300,23 @@ def handle_grb_igwn_alert(alert): if _skymaps_are_ready(alert['object'], alert['data']['name'], 'compare'): # if both sky maps present and a coincidence, compare sky maps - superevent_id, ext_ids = _get_superevent_ext_ids( - graceid, alert['object'], 'compare') + superevent_id, ext_id = _get_superevent_ext_ids( + graceid, alert['object'], 'combine') superevent = gracedb.get_superevent(superevent_id) - preferred_event_id = superevent['preferred_event'] - gw_group = gracedb.get_group(preferred_event_id) + gw_group = superevent['preferred_event_data']['group'] tl, th = raven._time_window(graceid, gw_group, [alert['object']['pipeline']], [alert['object']['search']]) - raven.raven_pipeline([alert['object']], superevent_id, superevent, - tl, th, gw_group) - if _skymaps_are_ready(alert['object'], alert['data']['name'], - 'combine'): - # if both sky maps present and a raven alert, create combined - # skymap - superevent_id, ext_id = _get_superevent_ext_ids( - graceid, alert['object'], 'combine') - external_skymaps.create_combined_skymap(superevent_id, ext_id) + # FIXME: both overlap integral and combined sky map could be + # done by the same function since they are so similar + group( + raven.raven_pipeline.si([alert['object']], superevent_id, + superevent, tl, th, gw_group), + + external_skymaps.create_combined_skymap.si( + superevent_id, ext_id + ) + ).delay() elif 'EM_COINC' in alert['object']['labels']: # if not complete, check if GW sky map; apply label to external # event if GW sky map diff --git a/gwcelery/tasks/orchestrator.py b/gwcelery/tasks/orchestrator.py index 60e3129376beb642973021d5a9c997426d54f8e2..efc6cec151595e7da66618d90ca1f9396b4a96d0 100644 --- a/gwcelery/tasks/orchestrator.py +++ b/gwcelery/tasks/orchestrator.py @@ -753,6 +753,8 @@ def earlywarning_preliminary_initial_update_alert( skymap_needed = (skymap_filename is None) em_bright_needed = (em_bright_filename is None) p_astro_needed = (p_astro_filename is None) + combined_skymap_needed = \ + {"RAVEN_ALERT", "COMBINEDSKYMAP_READY"}.issubset(set(labels)) if skymap_needed or em_bright_needed or p_astro_needed: for message in gracedb.get_log(superevent_id): t = message['tag_names'] @@ -774,6 +776,40 @@ def earlywarning_preliminary_initial_update_alert( and f.endswith('.json'): p_astro_filename = fv + if combined_skymap_needed: + # if both labels present, download sky map from external event + # FIXME: use file inheritance once available + ext_id = superevent['em_type'] + combined_skymap_filename = \ + ('combined-ext.multiorder.fits' if '.multiorder.fits' in + skymap_filename else 'combined-ext.fits.gz') + message = 'Combined LVC-external sky map using {0} and {1}'.format( + superevent_id, ext_id) + message_png = ( + 'Mollweide projection of <a href="/api/events/{graceid}/files/' + '{filename}">{filename}</a>').format( + graceid=superevent_id, + filename=combined_skymap_filename) + + combined_skymap_canvas = group( + gracedb.download.si(combined_skymap_filename, ext_id) + | + gracedb.upload.s( + combined_skymap_filename, superevent_id, + message, ['sky_loc', 'ext_coinc', 'public'] + ), + + gracedb.download.si('combined-ext.png', ext_id) + | + gracedb.upload.s( + 'combined-ext.png', superevent_id, + message_png, ['sky_loc', 'ext_coinc', 'public'] + ) + ) + else: + combined_skymap_filename = None + combined_skymap_canvas = identity.si() + if alert_type in {'earlywarning', 'preliminary', 'initial'}: if 'RAVEN_ALERT' in labels: circular_task = circulars.create_emcoinc_circular.si(superevent_id) @@ -810,14 +846,16 @@ def earlywarning_preliminary_initial_update_alert( skymap_filename=skymap_filename, internal=False, open_alert=True, - raven_coinc=('RAVEN_ALERT' in labels) + raven_coinc=('RAVEN_ALERT' in labels), + combined_skymap_filename=combined_skymap_filename ) kafka_alert_canvas = alerts.send.si( (skymap, em_bright, p_astro), superevent, alert_type, - raven_coinc=('RAVEN_ALERT' in labels) + raven_coinc=('RAVEN_ALERT' in labels), + combined_skymap_filename=combined_skymap_filename ) else: # Download em_bright and p_astro files here for voevent @@ -832,7 +870,8 @@ def earlywarning_preliminary_initial_update_alert( skymap_filename=skymap_filename, internal=False, open_alert=True, - raven_coinc=('RAVEN_ALERT' in labels) + raven_coinc=('RAVEN_ALERT' in labels), + combined_skymap_filename=combined_skymap_filename ) # The skymap has not been downloaded at this point, so we need to @@ -841,7 +880,8 @@ def earlywarning_preliminary_initial_update_alert( superevent, alert_type, skymap_filename=skymap_filename, - raven_coinc=('RAVEN_ALERT' in labels) + raven_coinc=('RAVEN_ALERT' in labels), + combined_skymap_filename=combined_skymap_filename ) download_andor_expose_group += [ @@ -864,6 +904,8 @@ def earlywarning_preliminary_initial_update_alert( ) canvas = ( + combined_skymap_canvas + | group(download_andor_expose_group) | group(voevent_canvas, kafka_alert_canvas) diff --git a/gwcelery/tests/data/gracedb_setrigger_log.json b/gwcelery/tests/data/gracedb_setrigger_log.json index 66149c1b0f7004b547a56a8be3356a95ba37e0a2..1a4c33fac7adf6bbebfadb0d4006de673c07c7ed 100644 --- a/gwcelery/tests/data/gracedb_setrigger_log.json +++ b/gwcelery/tests/data/gracedb_setrigger_log.json @@ -8,6 +8,6 @@ "filename": "bayestar.multiorder.fits", "file_version": 0, "tag_names": ["sky_loc", "public"], - "file": "https://gracedb-dev1.ligo.org/api/superevents/S170817/files/bayestar.fits.gz,0" + "file": "https://gracedb-dev1.ligo.org/api/superevents/S170817/files/bayestar.multiorder.fits,0" } ] diff --git a/gwcelery/tests/test_tasks_external_triggers.py b/gwcelery/tests/test_tasks_external_triggers.py index 661aa328a1d01d0384d0e3e8af6457922743dbc3..b99d859c50cf79cf277511f860f2ad2d50c9c3dd 100644 --- a/gwcelery/tests/test_tasks_external_triggers.py +++ b/gwcelery/tests/test_tasks_external_triggers.py @@ -260,12 +260,14 @@ def test_handle_create_skymap_label_from_superevent(mock_create_label, mock_create_label.assert_called_once_with('SKYMAP_READY', 'E1212') -@patch('gwcelery.tasks.gracedb.get_group', return_value='CBC') -@patch('gwcelery.tasks.raven.raven_pipeline') +@patch('gwcelery.tasks.raven.raven_pipeline.run') +@patch('gwcelery.tasks.external_skymaps.create_combined_skymap.run') @patch('gwcelery.tasks.gracedb.get_superevent', return_value={ 'superevent_id': 'S1234', - 'preferred_event': 'G1234' + 'preferred_event': 'G1234', + 'preferred_event_data': + {'group': 'CBC'} }) @patch('gwcelery.tasks.gracedb.get_event', return_value={ @@ -273,7 +275,8 @@ def test_handle_create_skymap_label_from_superevent(mock_create_label, 'group': 'CBC' }) def test_handle_skymap_comparison(mock_get_event, mock_get_superevent, - mock_raven_pipeline, mock_get_group): + mock_create_combined_skymap, + mock_raven_pipeline): alert = {"uid": "E1212", "alert_type": "label_added", "data": {"name": "SKYMAP_READY"}, @@ -289,7 +292,9 @@ def test_handle_skymap_comparison(mock_get_event, mock_get_superevent, external_triggers.handle_grb_igwn_alert(alert) mock_raven_pipeline.assert_called_once_with([alert['object']], 'S1234', {'superevent_id': 'S1234', - 'preferred_event': 'G1234'}, + 'preferred_event': 'G1234', + 'preferred_event_data': + {'group': 'CBC'}}, -5, 1, 'CBC') @@ -332,20 +337,31 @@ def test_handle_label_removed(mock_get_superevent, ) -@patch('gwcelery.tasks.external_skymaps.create_combined_skymap') -def test_handle_skymap_combine(mock_create_combined_skymap): +@patch('gwcelery.tasks.gracedb.get_superevent', + return_value={ + 'superevent_id': 'S1234', + 'preferred_event': 'G1234', + 'preferred_event_data': + {'group': 'CBC'} + }) +@patch('gwcelery.tasks.raven.raven_pipeline.run') +@patch('gwcelery.tasks.external_skymaps.create_combined_skymap.run') +def test_handle_skymap_combine(mock_create_combined_skymap, + mock_raven_pipeline, mock_get_superevent): alert = {"uid": "E1212", "alert_type": "label_added", - "data": {"name": "RAVEN_ALERT"}, + "data": {"name": "SKYMAP_READY"}, "object": { "graceid": "E1212", "group": "External", - "labels": ["EM_COINC", "EXT_SKYMAP_READY", "SKYMAP_READY", - "RAVEN_ALERT"], - "superevent": "S1234"} + "labels": ["EM_COINC", "EXT_SKYMAP_READY", "SKYMAP_READY"], + "superevent": "S1234", + "pipeline": "Fermi", + "search": "GRB"} } external_triggers.handle_grb_igwn_alert(alert) - mock_create_combined_skymap.assert_called_once_with('S1234', 'E1212') + mock_create_combined_skymap.assert_called_once_with( + 'S1234', 'E1212', gw_moc=True) @patch('gwcelery.tasks.detchar.dqr_json', return_value='dqrjson') diff --git a/gwcelery/tests/test_tasks_orchestrator.py b/gwcelery/tests/test_tasks_orchestrator.py index b949368c82cde76a6f6937a70c01938013171d64..5ee5f895eb7a8bc4e2e8f3bc9b0465bfbf0bc473 100644 --- a/gwcelery/tests/test_tasks_orchestrator.py +++ b/gwcelery/tests/test_tasks_orchestrator.py @@ -279,7 +279,8 @@ def test_handle_superevent_initial_alert(mock_create_initial_circular, 'S1234', 'initial', BBH=0.02, BNS=0.94, NSBH=0.03, ProbHasNS=0.0, ProbHasRemnant=0.0, Terrestrial=0.01, internal=False, open_alert=True, skymap_filename='foobar.multiorder.fits,0', skymap_type='foobar', - raven_coinc='RAVEN_ALERT' in labels) + raven_coinc='RAVEN_ALERT' in labels, + combined_skymap_filename=None) mock_alerts_send.assert_called_once_with(( superevent_initial_alert_download('foobar.multiorder.fits,0', 'S1234'), superevent_initial_alert_download('em_bright.json,0', 'S1234'),