Skip to content
Snippets Groups Projects

Plot horizon distance from ranking statistics

Merged ChiWai Chan requested to merge plot_psd_horizon into master
1 unresolved thread
1 file
+ 85
156
Compare changes
  • Side-by-side
  • Inline
@@ -191,91 +191,55 @@ def filter_layer(config, dag, ref_psd_cache, svd_bank_cache):
if config.condor.singularity_image:
common_opts.append(Option("disable-service-discovery"))
# if using file transfer, do not checkpoint or group SVD bins together
# NOTE: not decided whether this is optimal, and should be revisited
# if file transfer is disabled, checkpoint by grouping SVD bins together
# and enable local frame caching if there if more than one SVD bin per group
max_concurrency = 20
# checkpoint by grouping SVD bins together + enable local
# frame caching if there if more than one SVD bin per group
if config.condor.transfer_files:
max_concurrency = 10
else:
max_concurrency = 20
num_per_group = 1 + (len(config.svd.bins) // max_concurrency)
if not config.condor.transfer_files and num_per_group > 1:
if num_per_group > 1:
common_opts.append(Option("local-frame-caching"))
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
dist_stats = dist_stat_cache.groupby("ifo", "time", "bin")
for (ifo_combo, span), triggers in trigger_cache.groupby("ifo", "time").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
filter_opts = [
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
]
if config.source.frame_cache:
filter_opts.append(Option("frame-cache", config.source.frame_cache, track=False))
else:
filter_opts.extend([
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
Option("data-find-server", config.source.data_find_server),
])
filter_opts.extend(common_opts)
for trigger_group in triggers.chunked(num_per_group):
svd_bins = trigger_group.groupby("bin").keys()
thresholds = [calc_gate_threshold(config, svd_bin) for svd_bin in svd_bins]
these_opts = [Option("ht-gate-threshold", thresholds), *filter_opts]
svd_bank_files = dagutil.flatten(
[svd_banks[(ifo, svd_bin)].files for ifo in ifos for svd_bin in svd_bins]
)
dist_stat_files = dagutil.flatten(
[dist_stats[(ifo_combo, span, svd_bin)].files for svd_bin in svd_bins]
)
layer += Node(
arguments = these_opts,
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
Option("reference-psd", ref_psds[(ifo_combo, span)].files),
Option("time-slide-file", config.filter.time_slide_file),
Option("svd-bank", svd_bank_files),
],
outputs = [
Option("output", trigger_group.files),
Option("ranking-stat-output", dist_stat_files),
],
)
else:
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
dist_stats = dist_stat_cache.groupby("ifo", "time", "bin")
for (ifo_combo, span, svd_bin), triggers in trigger_cache.groupby("ifo", "time", "bin").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
filter_opts = [
Option("ht-gate-threshold", calc_gate_threshold(config, svd_bin)),
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
]
if config.source.frame_cache:
filter_opts.append(Option("frame-cache", config.source.frame_cache, track=False))
else:
filter_opts.extend([
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
Option("data-find-server", config.source.data_find_server),
])
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
dist_stats = dist_stat_cache.groupby("ifo", "time", "bin")
for (ifo_combo, span), triggers in trigger_cache.groupby("ifo", "time").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
filter_opts = [
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
]
if config.source.frame_cache:
filter_opts.append(Option("frame-cache", config.source.frame_cache, track=False))
else:
filter_opts.extend([
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
Option("data-find-server", config.source.data_find_server),
])
filter_opts.extend(common_opts)
filter_opts.extend(common_opts)
for trigger_group in triggers.chunked(num_per_group):
svd_bins = trigger_group.groupby("bin").keys()
thresholds = [calc_gate_threshold(config, svd_bin) for svd_bin in svd_bins]
these_opts = [Option("ht-gate-threshold", thresholds), *filter_opts]
svd_bank_files = dagutil.flatten(
[svd_banks[(ifo, svd_bin)].files for ifo in ifos for svd_bin in svd_bins]
)
dist_stat_files = dagutil.flatten(
[dist_stats[(ifo_combo, span, svd_bin)].files for svd_bin in svd_bins]
)
svd_bank_files = dagutil.flatten([svd_banks[(ifo, svd_bin)].files for ifo in ifos])
layer += Node(
arguments=filter_opts,
arguments = these_opts,
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
@@ -284,8 +248,8 @@ def filter_layer(config, dag, ref_psd_cache, svd_bank_cache):
Option("svd-bank", svd_bank_files),
],
outputs = [
Option("output", triggers.files),
Option("ranking-stat-output", dist_stats[(ifo_combo, span, svd_bin)].files),
Option("output", trigger_group.files),
Option("ranking-stat-output", dist_stat_files),
],
)
@@ -332,95 +296,60 @@ def filter_injections_layer(config, dag, ref_psd_cache, svd_bank_cache):
if config.condor.singularity_image:
common_opts.append(Option("disable-service-discovery"))
# if using file transfer, do not checkpoint or group SVD bins together
# NOTE: not decided whether this is optimal, and should be revisited
# if file transfer is disabled, checkpoint by grouping SVD bins together
# and enable local frame caching if there if more than one SVD bin per group
max_concurrency = 20
# checkpoint by grouping SVD bins together + enable local
# frame caching if there if more than one SVD bin per group
if config.condor.transfer_files:
max_concurrency = 10
else:
max_concurrency = 20
num_per_group = 1 + (len(config.svd.bins) // max_concurrency)
if not config.condor.transfer_files and num_per_group > 1:
if num_per_group > 1:
common_opts.append(Option("local-frame-caching"))
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
for (ifo_combo, span, inj_type), triggers in trigger_cache.groupby("ifo", "time", "subtype").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
for (ifo_combo, span, inj_type), triggers in trigger_cache.groupby("ifo", "time", "subtype").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
filter_opts = [
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
]
if config.source.frame_cache:
filter_opts.append(Option("frame-cache", config.source.frame_cache, track=False))
else:
filter_opts.extend([
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
Option("data-find-server", config.source.data_find_server),
])
filter_opts.extend(common_opts)
injection_file = config.filter.injections[inj_type.lower()]["file"]
for trigger_group in triggers.chunked(num_per_group):
svd_bins = trigger_group.groupby("bin").keys()
thresholds = [calc_gate_threshold(config, svd_bin) for svd_bin in svd_bins]
these_opts = [Option("ht-gate-threshold", thresholds), *filter_opts]
svd_bank_files = dagutil.flatten(
[svd_banks[(ifo, svd_bin)].files for ifo in ifos for svd_bin in svd_bins]
)
layer += Node(
arguments = these_opts,
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
Option("reference-psd", ref_psds[(ifo_combo, span)].files),
Option("time-slide-file", config.filter.time_slide_file),
Option("svd-bank", svd_bank_files),
Option("injections", injection_file),
],
outputs = Option("output", trigger_group.files),
)
else:
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
for (ifo_combo, span, svd_bin, inj_type), triggers in trigger_cache.groupby("ifo", "time", "bin", "subtype").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
filter_opts = [
Option("ht-gate-threshold", calc_gate_threshold(config, svd_bin)),
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
]
if config.source.frame_cache:
filter_opts.append(Option("frame-cache", config.source.frame_cache, track=False))
else:
filter_opts.extend([
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
Option("data-find-server", config.source.data_find_server),
])
filter_opts.extend(common_opts)
filter_opts = [
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
]
if config.source.frame_cache:
filter_opts.append(Option("frame-cache", config.source.frame_cache, track=False))
else:
filter_opts.extend([
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
Option("data-find-server", config.source.data_find_server),
])
filter_opts.extend(common_opts)
injection_file = config.filter.injections[inj_type.lower()]["file"]
svd_bank_files = dagutil.flatten([svd_banks[(ifo, svd_bin)].files for ifo in ifos])
injection_file = config.filter.injections[inj_type.lower()]["file"]
for trigger_group in triggers.chunked(num_per_group):
svd_bins = trigger_group.groupby("bin").keys()
thresholds = [calc_gate_threshold(config, svd_bin) for svd_bin in svd_bins]
these_opts = [Option("ht-gate-threshold", thresholds), *filter_opts]
svd_bank_files = dagutil.flatten(
[svd_banks[(ifo, svd_bin)].files for ifo in ifos for svd_bin in svd_bins]
)
layer += Node(
arguments=filter_opts,
arguments = these_opts,
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
Option("reference-psd", ref_psds[(ifo_combo, span)].files),
Option("time-slide-file", config.filter.injection_time_slide_file),
Option("time-slide-file", config.filter.time_slide_file),
Option("svd-bank", svd_bank_files),
Option("injections", injection_file),
],
outputs=Option("output", triggers.files),
outputs = Option("output", trigger_group.files),
)
dag.attach(layer)
Loading