diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_event_plotter b/gstlal-inspiral/bin/gstlal_ll_inspiral_event_plotter index ef78f266f8b06516c8591ac50deb71ae9a335824..21851f3f55236e26c22cbae7f92277564b7a4ead 100755 --- a/gstlal-inspiral/bin/gstlal_ll_inspiral_event_plotter +++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_event_plotter @@ -94,6 +94,8 @@ def parse_command_line(): parser.add_option('--processing-cadence', type = 'float', default = 0.1, help = 'Rate at which the event plotter acquires and processes data. Default = 0.1 seconds.') parser.add_option('--request-timeout', type = 'float', default = 0.2, help = 'Timeout for requesting messages from a topic. Default = 0.2 seconds.') parser.add_option('--kafka-server', metavar = 'string', help = 'Sets the server url that the kafka topic is hosted on. Required.') + parser.add_option("--upload-topic", metavar = "string", help = "Sets the input kafka topic to get uploaded event info from. Required.") + parser.add_option("--ranking-stat-topic", metavar = "string", help = "Sets the input kafka topic to get ranking stat info from. Required.") parser.add_option('--gracedb-group', metavar = 'name', default = 'Test', help = 'Gracedb group to which to upload events (default is Test).') parser.add_option('--gracedb-pipeline', metavar = 'name', default = 'gstlal', help = 'Name of pipeline to provide in GracedB uploads (default is gstlal).') parser.add_option('--gracedb-search', metavar = 'name', default = 'LowMass', help = 'Name of search to provide in GracedB uploads (default is LowMass).') @@ -123,8 +125,8 @@ class EventPlotter(events.EventProcessor): def __init__(self, options): logging.info('setting up event plotter...') - self.upload_topic = f'gstlal.{options.tag}.uploads' - self.ranking_stat_topic = f'gstlal.{options.tag}.ranking_stat' + self.upload_topic = f'gstlal.{options.tag}.{options.upload_topic}' + self.ranking_stat_topic = f'gstlal.{options.tag}.{options.ranking_stat_topic}' events.EventProcessor.__init__( self, process_cadence=options.processing_cadence, diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_event_uploader b/gstlal-inspiral/bin/gstlal_ll_inspiral_event_uploader index 70675565ac677c8cc67bac34bb3e5872cace1512..9e1fc22184891a1cc7c1b51b753a3b3dd03fb99a 100755 --- a/gstlal-inspiral/bin/gstlal_ll_inspiral_event_uploader +++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_event_uploader @@ -77,7 +77,7 @@ def parse_command_line(): parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the event uploader acquires and processes data. Default = 0.1 seconds.") parser.add_option("--request-timeout", type = "float", default = 0.2, help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.") parser.add_option("--kafka-server", metavar = "string", help = "Sets the server url that the kafka topic is hosted on. Required.") - parser.add_option("--input-topic", metavar = "string", action="append", help = "Sets the input kafka topic(s). Required.") + parser.add_option("--input-topic", metavar = "string", help = "Sets the input kafka topic. Required.") parser.add_option("--gracedb-group", metavar = "name", default = "Test", help = "Gracedb group to which to upload events (default is Test).") parser.add_option("--gracedb-pipeline", metavar = "name", default = "gstlal", help = "Name of pipeline to provide in GracedB uploads (default is gstlal).") parser.add_option("--gracedb-search", metavar = "name", default = "LowMass", help = "Name of search to provide in GracedB uploads (default is LowMass).") @@ -107,7 +107,7 @@ class EventUploader(events.EventProcessor): request_timeout=options.request_timeout, num_messages=options.num_jobs, kafka_server=options.kafka_server, - input_topic=[f"gstlal.{options.tag}.{topic}" for topic in options.input_topic], + input_topic="gstlal.{options.tag}.{options.input_topic}", tag=options.tag ) @@ -135,9 +135,10 @@ class EventUploader(events.EventProcessor): self.events = OrderedDict() ### favored event settings + topic_prefix = "" if options.input_topic == "events" else "inj_" self.public_far_threshold = options.far_threshold / options.far_trials_factor - self.favored_event_topic = f'gstlal.{self.tag}.favored_events' - self.upload_topic = f'gstlal.{self.tag}.uploads' + self.favored_event_topic = f'gstlal.{self.tag}.{topic_prefix}favored_events' + self.upload_topic = f'gstlal.{self.tag}.{topic_prefix}uploads' ### heartbeat settings self.last_heartbeat = 0.0 @@ -566,6 +567,10 @@ if __name__ == '__main__': # parse arguments options, args = parse_command_line() + # check input topic + if not options.input_topic in ("events", "inj_events"): + raise Exception("Input topic should be either events or inj_events.") + # set up logging log_level = logging.DEBUG if options.verbose else logging.INFO logging.basicConfig(format = '%(asctime)s | event_uploader : %(levelname)s : %(message)s') diff --git a/gstlal-inspiral/python/dags/layers/inspiral.py b/gstlal-inspiral/python/dags/layers/inspiral.py index 0cfa4f4bdbc3a279b5a1b08e26cc9bf1010f9ee0..a8ff595ad7be8ab57c7459095eb1e4a0dd911270 100644 --- a/gstlal-inspiral/python/dags/layers/inspiral.py +++ b/gstlal-inspiral/python/dags/layers/inspiral.py @@ -1888,13 +1888,15 @@ def upload_pastro_layer(config, dag): retries=1000, ) + input_topics = ["uploads", "inj_uploads"] if config.upload.enable_injection_uploads else ["uploads"] + for model, options in config.pastro.items(): layer += Node( arguments = [ Option("kafka-server", config.services.kafka_server), Option("gracedb-service-url", config.upload.gracedb_service_url), Option("tag", config.tag), - Option("input-topic", "favored_events"), + Option("input-topic", input_topics), Option("model-name", model), Option("pastro-filename", options.upload_file), Option("pastro-model-file", options.mass_model), @@ -1918,17 +1920,23 @@ def plot_events_layer(config, dag): retries=1000, ) - layer += Node( - arguments = [ - Option("kafka-server", config.services.kafka_server), - Option("gracedb-group", config.upload.gracedb_group), - Option("gracedb-pipeline", config.upload.gracedb_pipeline), - Option("gracedb-search", config.upload.gracedb_search), - Option("gracedb-service-url", config.upload.gracedb_service_url), - Option("tag", config.tag), - Option("verbose"), - ], - ) + upload_topics = ["uploads", "inj_uploads"] if config.upload.enable_injection_uploads else ["uploads"] + ranking_stat_topics = ["ranking_stat", "inj_ranking_stat"] if config.upload.enable_injection_uploads else ["ranking_stat"] + + for upload_topic, ranking_stat_topic in zip(upload_topics, ranking_stat_topics): + layer += Node( + arguments = [ + Option("kafka-server", config.services.kafka_server), + Option("upload-topic", upload_topic), + Option("ranking-stat-topic", ranking_stat_topic), + Option("gracedb-group", config.upload.gracedb_group), + Option("gracedb-pipeline", config.upload.gracedb_pipeline), + Option("gracedb-search", config.upload.gracedb_search), + Option("gracedb-service-url", config.upload.gracedb_service_url), + Option("tag", config.tag), + Option("verbose"), + ], + ) dag.attach(layer)