Skip to content
Snippets Groups Projects
Commit 6c1713d6 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

allow frame caching/checkpointing if file transfer is disabled for inspiral jobs

parent cebff3a4
No related branches found
No related tags found
1 merge request: !41 DAG Workflow Overhaul + OSG DAG support
......@@ -190,37 +190,89 @@ def filter_layer(config, dag, ref_psd_cache, svd_bank_cache):
if config.condor.singularity_image:
common_opts.append(Option("disable-service-discovery"))
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
dist_stats = dist_stat_cache.groupby("ifo", "time", "bin")
for (ifo_combo, span, svd_bin), triggers in trigger_cache.groupby("ifo", "time", "bin").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
filter_opts = [
Option("ht-gate-threshold", calc_gate_threshold(config, svd_bin)),
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
]
filter_opts.extend(common_opts)
# if using file transfer, do not checkpoint or group SVD bins together
# NOTE: not decided whether this is optimal, and should be revisited
# if file transfer is disabled, checkpoint by grouping SVD bins together
# and enable local frame caching if there is more than one SVD bin per group
max_concurrency = 40
num_per_group = 1 + (len(config.svd.bins) // max_concurrency)
if not config.condor.transfer_files and num_per_group > 1:
common_opts.append(Option("local-frame-caching"))
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
dist_stats = dist_stat_cache.groupby("ifo", "time", "bin")
for (ifo_combo, span), triggers in trigger_cache.groupby("ifo", "time").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
filter_opts = [
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
]
filter_opts.extend(common_opts)
for trigger_group in triggers.chunked(num_per_group):
svd_bins = trigger_group.groupby("bin").keys()
thresholds = [calc_gate_threshold(config, svd_bin) for svd_bin in svd_bins]
these_opts = [Option("ht-gate-threshold", thresholds), *filter_opts]
svd_bank_files = dagutil.flatten(
[svd_banks[(ifo, svd_bin)].files for ifo in ifos for svd_bin in svd_bins]
)
dist_stat_files = dagutil.flatten(
[dist_stats[(ifo_combo, span, svd_bin)].files for svd_bin in svd_bins]
)
layer += Node(
arguments = these_opts,
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
Option("reference-psd", ref_psds[(ifo_combo, span)].files),
Option("time-slide-file", config.filter.time_slide_file),
Option("svd-bank", svd_bank_files),
],
outputs = [
Option("output", trigger_group.files),
Option("ranking-stat-output", dist_stat_files),
],
)
else:
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
dist_stats = dist_stat_cache.groupby("ifo", "time", "bin")
for (ifo_combo, span, svd_bin), triggers in trigger_cache.groupby("ifo", "time", "bin").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
filter_opts = [
Option("ht-gate-threshold", calc_gate_threshold(config, svd_bin)),
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
]
filter_opts.extend(common_opts)
svd_bank_files = dagutil.flatten([svd_banks[(ifo, svd_bin)].files for ifo in ifos])
layer += Node(
arguments=filter_opts,
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
Option("reference-psd", ref_psds[(ifo_combo, span)].files),
Option("time-slide-file", config.filter.time_slide_file),
Option("svd-bank", svd_bank_files),
],
outputs = [
Option("output", triggers.files),
Option("ranking-stat-output", dist_stats[(ifo_combo, span, svd_bin)].files),
],
)
svd_bank_files = dagutil.flatten([svd_banks[(ifo, svd_bin)].files for ifo in ifos])
layer += Node(
arguments=filter_opts,
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
Option("reference-psd", ref_psds[(ifo_combo, span)].files),
Option("time-slide-file", config.filter.time_slide_file),
Option("svd-bank", svd_bank_files),
],
outputs = [
Option("output", triggers.files),
Option("ranking-stat-output", dist_stats[(ifo_combo, span, svd_bin)].files),
],
)
dag.attach(layer)
......@@ -266,36 +318,83 @@ def filter_injections_layer(config, dag, ref_psd_cache, svd_bank_cache):
if config.condor.singularity_image:
common_opts.append(Option("disable-service-discovery"))
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
for (ifo_combo, span, svd_bin, inj_type), triggers in trigger_cache.groupby("ifo", "time", "bin", "subtype").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
# if using file transfer, do not checkpoint or group SVD bins together
# NOTE: not decided whether this is optimal, and should be revisited
# if file transfer is disabled, checkpoint by grouping SVD bins together
# and enable local frame caching if there is more than one SVD bin per group
max_concurrency = 40
num_per_group = 1 + (len(config.svd.bins) // max_concurrency)
if not config.condor.transfer_files and num_per_group > 1:
common_opts.append(Option("local-frame-caching"))
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
for (ifo_combo, span, inj_type), triggers in trigger_cache.groupby("ifo", "time", "subtype").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
filter_opts = [
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
]
filter_opts.extend(common_opts)
injection_file = config.filter.injections[inj_type.lower()]["file"]
for trigger_group in triggers.chunked(num_per_group):
svd_bins = trigger_group.groupby("bin").keys()
thresholds = [calc_gate_threshold(config, svd_bin) for svd_bin in svd_bins]
these_opts = [Option("ht-gate-threshold", thresholds), *filter_opts]
svd_bank_files = dagutil.flatten(
[svd_banks[(ifo, svd_bin)].files for ifo in ifos for svd_bin in svd_bins]
)
layer += Node(
arguments = these_opts,
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
Option("reference-psd", ref_psds[(ifo_combo, span)].files),
Option("time-slide-file", config.filter.time_slide_file),
Option("svd-bank", svd_bank_files),
Option("injections", injection_file),
],
outputs = Option("output", trigger_group.files),
)
else:
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
for (ifo_combo, span, svd_bin, inj_type), triggers in trigger_cache.groupby("ifo", "time", "bin", "subtype").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
filter_opts = [
Option("ht-gate-threshold", calc_gate_threshold(config, svd_bin)),
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
]
filter_opts.extend(common_opts)
filter_opts = [
Option("ht-gate-threshold", calc_gate_threshold(config, svd_bin)),
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
]
filter_opts.extend(common_opts)
svd_bank_files = dagutil.flatten([svd_banks[(ifo, svd_bin)].files for ifo in ifos])
injection_file = config.filter.injections[inj_type.lower()]["file"]
svd_bank_files = dagutil.flatten([svd_banks[(ifo, svd_bin)].files for ifo in ifos])
injection_file = config.filter.injections[inj_type.lower()]["file"]
layer += Node(
arguments=filter_opts,
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
Option("reference-psd", ref_psds[(ifo_combo, span)].files),
Option("time-slide-file", config.filter.injection_time_slide_file),
Option("svd-bank", svd_bank_files),
Option("injections", injection_file),
],
outputs=Option("output", triggers.files),
)
layer += Node(
arguments=filter_opts,
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
Option("reference-psd", ref_psds[(ifo_combo, span)].files),
Option("time-slide-file", config.filter.injection_time_slide_file),
Option("svd-bank", svd_bank_files),
Option("injections", injection_file),
],
outputs=Option("output", triggers.files),
)
dag.attach(layer)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment