Commit 404708ba authored by Patrick Godwin

add gstlal_snax_workflow_online with online layers

parent 14240dbd
Makefile.am:
@@ -11,4 +11,5 @@ dist_bin_SCRIPTS = \
     gstlal_snax_sink \
     gstlal_snax_synchronize \
     gstlal_snax_whiten \
-    gstlal_snax_workflow
+    gstlal_snax_workflow \
+    gstlal_snax_workflow_online
gstlal_snax_workflow:
@@ -25,7 +25,6 @@ from gstlal.snax.dags import DAG
 
 parser = argparse.ArgumentParser()
 parser.add_argument("-c", "--config", help="Sets the path to read configuration from.")
-parser.add_argument("-w", "--workflow", default="offline", help="Sets the type of workflow to run.")
 
 # load config
 args = parser.parse_args()
@@ -36,10 +35,10 @@ dag = DAG(config)
 dag.create_log_dir()
 
 # generate dag layers
-features = dag.extract_features()
-dag.combine_features(features)
+features = dag.extract()
+dag.combine(features)
 
 # write dag/script to disk
-dag_name = f"snax_{args.workflow}_dag"
+dag_name = "snax_offline_dag"
 dag.write_dag(f"{dag_name}.dag")
 dag.write_script(f"{dag_name}.sh")
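
With the --workflow option removed, each entry point now hardcodes its dag name. A hedged usage sketch, with hypothetical config file names:

    gstlal_snax_workflow --config offline.yml          # writes snax_offline_dag.dag / .sh
    gstlal_snax_workflow_online --config online.yml    # writes snax_online_dag.dag / .sh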
gstlal_snax_workflow_online (new file):
#!/usr/bin/env python3
#
# Copyright (C) 2020 Patrick Godwin (patrick.godwin@ligo.org)
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import argparse

from gstlal.snax.config import Config
from gstlal.snax.dags import DAG

parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", help="Sets the path to read configuration from.")

# load config
args = parser.parse_args()
config = Config.load(args.config)

# create dag
dag = DAG(config)
dag.create_log_dir()

# generate dag layers
features = dag.extract_online()
dag.synchronize(features)
dag.save(features)

# write dag/script to disk
dag_name = "snax_online_dag"
dag.write_dag(f"{dag_name}.dag")
dag.write_script(f"{dag_name}.sh")
config.py:
@@ -37,6 +37,8 @@ class Config(BaseConfig):
             self.metrics = dotdict(replace_keys(kwargs["metrics"]))
         if "services" in kwargs:
             self.services = dotdict(replace_keys(kwargs["services"]))
+        if "stream" in kwargs:
+            self.stream = dotdict(replace_keys(kwargs["stream"]))
 
         self.channels = multichannel_datasource.channel_dict_from_channel_ini(**self.source)
         self.channel_groups = multichannel_datasource.partition_channels_to_equal_subsets(
@@ -45,5 +47,4 @@ class Config(BaseConfig):
             self.source.get("min_sample_rate", 32),
             self.source.get("max_sample_rate", 4096),
         )
-        #self.channel_groups = [[pair[1] for pair in subset] for subset in subsets]
         self.channel_bins = [f"{i:04d}" for i, _ in enumerate(self.channel_groups)]
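
The online layers read two top-level config sections beyond what the offline workflow uses. A minimal sketch of the keyword arguments Config would receive in __init__, assuming replace_keys maps dashed file keys to the underscored attributes read in dags.py; only the key names come from this diff, the values are placeholders:

    config_kwargs = {
        "services": {
            "kafka-server": "kafka.example.com:9092",  # hypothetical broker address
        },
        "stream": {
            "kafka-partition": 0,            # exposed as config.stream.kafka_partition
            "kafka-topic": "snax_features",  # exposed as config.stream.kafka_topic
            "processing-cadence": 0.1,       # placeholder values
            "request-timeout": 0.2,
            "latency-timeout": 10,
        },
    }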
dags.py:
@@ -102,7 +102,6 @@ def combine_features_layer(config, dag, feature_cache):
             Option("start-time", start + 512),  # FIXME: don't hardcode
             Option("end-time", end),
             Option("instrument", config.ifo),
-            Option("basename", "SNAX_FEATURES"),
             Option("tag", "offline"),
         ],
         inputs = [
@@ -116,6 +115,119 @@ def combine_features_layer(config, dag, feature_cache):
    return combined_feature_cache
def extract_features_online_layer(config, dag):
    layer = Layer(
        "gstlal_snax_extract",
        requirements={"request_cpus": 2, "request_memory": 8000, **config.condor.submit},
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )

    # set up datasource options
    if config.source.data_source == "framexmit":
        datasource_args = [
            Option("data-source", "framexmit"),
            Option("framexmit-addr", f"{config.ifo}={config.source.framexmit_addr[config.ifo]}"),
            Option("framexmit-iface", config.source.framexmit_iface),
        ]
    elif config.source.data_source == "lvshm":
        datasource_args = [
            Option("data-source", "lvshm"),
            Option("shared-memory-partition", f"{config.ifo}={config.source.shared_memory_partition[config.ifo]}"),
            Option("shared-memory-block-size", config.source.shared_memory_block_size),
            Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
        ]
    else:
        raise ValueError(f"data source = {config.source.data_source} not valid for online jobs")

    common_args = [
        Option("sample-rate", config.features.sample_rate),
        Option("mismatch", config.features.mismatch),
        Option("waveform", config.features.waveform),
        Option("q-high", config.features.q_high),
        Option("psd-fft-length", config.psd.fft_length),
        Option("kafka-server", config.services.kafka_server),
        Option("kafka-partition", config.stream.kafka_partition),
        Option("kafka-topic", config.stream.kafka_topic),
    ]

    feature_cache = DataCache.generate(
        DataType.SNAX_FEATURES,
        config.all_ifos,
        svd_bins=config.channel_bins
    )

    for subset, features in feature_cache.groupby("bin").items():
        channel_subset = config.channel_groups[int(subset)]
        channel_names = format_channel_names(config.channels, channel_subset)
        layer += Node(
            arguments = [
                Option("job-id", subset),
                Option("channel-name", channel_names),
                *common_args,
                *datasource_args,
            ]
        )

    dag.attach(layer)
    return feature_cache
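
Each Node above covers one channel subset, with the job id taken from the zero-padded channel_bins labels. A hedged illustration of the command line a single node renders to, assuming each Option becomes a double-dashed flag; the flag names come from the diff, every value below is a placeholder:

    gstlal_snax_extract \
        --job-id 0000 \
        --channel-name <formatted channel names> \
        --sample-rate 2048 \
        --mismatch 0.05 \
        --waveform sine_gaussian \
        --q-high 40 \
        --psd-fft-length 8 \
        --kafka-server kafka.example.com:9092 \
        --kafka-partition 0 \
        --kafka-topic snax_features \
        --data-source lvshm \
        --shared-memory-partition H1=LHO_Data \
        --shared-memory-block-size 4096 \
        --shared-memory-assumed-duration 1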
def synchronize_features_layer(config, dag, feature_cache):
    layer = Layer(
        "gstlal_snax_synchronize",
        requirements={"request_cpus": 1, "request_memory": 4000, **config.condor.submit},
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )

    layer += Node(
        arguments = [
            Option("tag", config.tag),
            Option("num-topics", len(feature_cache.groupby("bin").keys())),
            Option("kafka-server", config.services.kafka_server),
            Option("processing-cadence", config.stream.processing_cadence),
            Option("request-timeout", config.stream.request_timeout),
            Option("latency-timeout", config.stream.latency_timeout),
            Option("input-topic-basename", config.stream.kafka_topic),
            Option("output-topic-basename", f"synchronizer_{config.tag}")
        ]
    )

    dag.attach(layer)


def save_features_layer(config, dag, feature_cache):
    layer = Layer(
        "gstlal_snax_sink",
        requirements={"request_cpus": 1, "request_memory": 4000, **config.condor.submit},
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )

    layer += Node(
        arguments = [
            Option("tag", config.tag),
            Option("instrument", config.ifo),
            Option("kafka-server", config.services.kafka_server),
            Option("input-topic-basename", f"synchronizer_{config.tag}"),
            Option("processing-cadence", config.stream.processing_cadence),
            Option("request-timeout", config.stream.request_timeout),
            Option("features-path", config.output.directory),
            Option("channel-list", config.source.channel_list),
            Option("safety-include", config.source.safety_include),
            Option("unsafe-channel-include", config.source.unsafe_channel_include),
            Option("waveform", config.features.waveform),
            Option("sample-rate", config.features.sample_rate),
            Option("write-cadence", config.output.dataset_cadence),
            Option("persist-cadence", config.output.file_cadence),
        ]
    )

    dag.attach(layer)
def format_channel_names(channel_dict, channels):
    """
    given a channel dict and a list of channels, format
@@ -127,6 +239,9 @@ def format_channel_names(channel_dict, channels):
 @plugins.register
 def layers():
     return {
-        "extract_features": extract_features_layer,
-        "combine_features": combine_features_layer,
+        "extract": extract_features_layer,
+        "combine": combine_features_layer,
+        "extract_online": extract_features_online_layer,
+        "synchronize": synchronize_features_layer,
+        "save": save_features_layer,
     }
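
The renamed offline entries (extract, combine) and the three new online entries match the method names the workflow scripts call on DAG (dag.extract(), dag.extract_online(), dag.synchronize(features), dag.save(features)). A minimal sketch of how such a plugin registry could be bound to DAG methods; this is an illustration, not the actual gstlal.snax.dags implementation:

    from typing import Callable, Dict

    _layers: Dict[str, Callable] = {}

    def register(layers_hook):
        # collect the name -> layer-function mapping a plugin exposes
        _layers.update(layers_hook())
        return layers_hook

    class DAG:
        def __init__(self, config):
            self.config = config

        def __getattr__(self, name):
            # resolve e.g. dag.extract_online(...) to the registered layer
            # function, passing config and the dag itself first, matching
            # the layer signatures above
            try:
                layer_func = _layers[name]
            except KeyError:
                raise AttributeError(name)
            def call(*args, **kwargs):
                return layer_func(self.config, self, *args, **kwargs)
            return call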