Skip to content
Snippets Groups Projects
Commit 3218ad43 authored by Duncan Meacher's avatar Duncan Meacher
Browse files

gstlal_ll_inspiral_pipe: Added lvshm funtionality

parent bb73ba03
No related branches found
No related tags found
No related merge requests found
...@@ -65,62 +65,6 @@ from lal.utils import CacheEntry ...@@ -65,62 +65,6 @@ from lal.utils import CacheEntry
# lvalert_listen [style=filled, color=lightgrey, URL="https://www.lsc-group.phys.uwm.edu/daswg/docs/howto/lvalert-howto.html"]; # lvalert_listen [style=filled, color=lightgrey, URL="https://www.lsc-group.phys.uwm.edu/daswg/docs/howto/lvalert-howto.html"];
# } # }
# @enddot # @enddot
#
# ### Usage cases
#
# - Typical usage case
#
# ### Command line options
#
# "--psd-fft-length", metavar = "s", default = 32, type = "int", help = "FFT length, default 32s. Note that 50% will be used for zero-padding.")
# "--reference-psd", metavar = "filename", help = "Set the reference psd file.")
# "--bank-cache", metavar = "filenames", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc..")
# "--channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the channel to process (optional). The default is \"LSC-STRAIN\" for all detectors. Override with IFO=CHANNEL-NAME can be given multiple times")
# "--state-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the state channel to process (required).")
# "--dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the DQ channel to process (required).")
# "--framexmit-addr", metavar = "name", default=[], action = "append", help = "Set the framexmit address to process (required). IFO=ADDR:port can be given multiple times.")
# "--framexmit-iface", metavar = "name", default = "10.14.0.1", help = "Set the interface address to process (required). default 10.14.0.1")
# "--inj-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection channel to process (optional). IFO=CHANNEL-NAME can be given multiple times.")
# "--inj-state-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection state channel to process (required if --inj-channel-name set).")
# "--inj-dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection DQ channel to process (required if --inj-channel-name set).")
# "--inj-framexmit-addr", metavar = "name", default=[], action = "append", help = "Set the framexmit address to process for the injection stream (required if --inj-channel-name set). IFO=ADDR:port can be given multiple times.")
# "--inj-framexmit-iface", metavar = "name", default "10.14.0.1", action = "append", help = "Set the interface address to process for injections (required if --inj-channel-name set). default 10.14.0.1")
# "--ht-gate-threshold", metavar = "float", help = "Set the h(t) gate threshold to reject glitches", type="float")
# "--ht-gate-threshold-linear", metavar = "string", help = "Set the scale for h(t) gate threshold to reject glitches", type="string". set as mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max (example: --ht-gate-threshold-linear 0.8:12.0-45.0:100.0)
# "--max-jobs", metavar = "num", type = "int", help = "stop parsing the cache after reaching a certain number of jobs to limit what is submitted to the HTCondor pool")
# "--likelihood-cache", help = "set the cache containin likelihood files")
# "--zerolag-likelihood-cache", help = "set the cache containin zerolag likelihood files")
# "--marginalized-likelihood-file", help = "set the marginalized likelihood file, required")
# "--control-peak-time", default = 4, metavar = "secs", help = "set the control peak time, default 4")
# "--fir-stride", default = 4, metavar = "secs", help = "set the fir bank stride, default 4")
# "--thinca-interval", default = 10, metavar = "secs", help = "set the thinca interval, default 10")
# "--gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent")
# "--gracedb-search", default = "LowMass", help = "gracedb type, default LowMass")
# "--gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal")
# "--gracedb-group", default = "Test", help = "gracedb group, default Test")
# "--gracedb-service-url", default = "https://gracedb.ligo.org/api/", help = "GraceDb service url, default https://gracedb.ligo.org/api/")
# "--inj-gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent (for injection stream)")
# "--inj-gracedb-group", default = "Test", help = "gracedb group, default Test (for injection stream)")
# "--inj-gracedb-search", default = "LowMass", help = "gracedb type, default LowMass (for injection stream)")
# "--inj-gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal (for injection stream)")
# "--inj-gracedb-service-url", default = "https://simdb.cgca.uwm.edu/api/", help = "GraceDb service url, default https://simdb.cgca.uwm.edu/api/ (for injection stream)")
# "--data-source", metavar = "[lvshm|]", default = "lvshm", help = "Where to get the data from. Default lvshm")
# "--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
# "--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
# "--state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times")
# "--state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times")
# "--inj-state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
# "--inj-state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
# "--dq-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times")
# "--dq-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times")
# "--dq-state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
# "--inj-dq-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
# "--lvalert-listener-program", action = "append", default = [], metavar = "program", help = "set the programs to respond to lvalerts from this analysis, can be given multiple times")
# "--coincidence-threshold", metavar = "value", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005). The light-travel time between instruments will be added automatically in the coincidence test.")
# "--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihoood data and snapshot the trigger files.")
# "--non-inspiral-condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value can be given multiple times")
# "--inspiral-condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value for inspiral jobs can be given multiple times")
class lvalert_listen_job(inspiral_pipe.generic_job): class lvalert_listen_job(inspiral_pipe.generic_job):
""" """
...@@ -247,16 +191,15 @@ group.initial.rebalance.delay.ms=0 ...@@ -247,16 +191,15 @@ group.initial.rebalance.delay.ms=0
def parse_command_line(): def parse_command_line():
parser = OptionParser(description = __doc__) parser = OptionParser(description = __doc__)
# append all the datasource specific options
datasource.append_options(parser)
parser.add_option("--psd-fft-length", metavar = "s", default = 32, type = "int", help = "FFT length, default 32s. Note that 50% will be used for zero-padding.") parser.add_option("--psd-fft-length", metavar = "s", default = 32, type = "int", help = "FFT length, default 32s. Note that 50% will be used for zero-padding.")
parser.add_option("--reference-psd", metavar = "filename", help = "Set the reference psd file.") parser.add_option("--reference-psd", metavar = "filename", help = "Set the reference psd file.")
parser.add_option("--bank-cache", metavar = "filenames", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc..") parser.add_option("--bank-cache", metavar = "filenames", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc..")
parser.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).") parser.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).")
parser.add_option("--min-log-L", metavar = "log likelihood ratio", type = "float", help = "Discard candidates that get assigned log likelihood ratios below this threshold (default = keep all).") parser.add_option("--min-log-L", metavar = "log likelihood ratio", type = "float", help = "Discard candidates that get assigned log likelihood ratios below this threshold (default = keep all).")
parser.add_option("--channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the channel to process (optional). The default is \"LSC-STRAIN\" for all detectors. Override with IFO=CHANNEL-NAME can be given multiple times")
parser.add_option("--state-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the state channel to process (required).")
parser.add_option("--dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the DQ channel to process (required).")
parser.add_option("--framexmit-addr", metavar = "name", default=[], action = "append", help = "Set the framexmit address to process (required). IFO=ADDR:port can be given multiple times.")
parser.add_option("--framexmit-iface", metavar = "name", help = "Set the interface address to process (required).")
parser.add_option("--inj-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection channel to process for given mass bins (optional). 0000:0002:IFO1=CHANNEL-NAME1,IFO2=CHANNEL-NAME2 can be given multiple times.") parser.add_option("--inj-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection channel to process for given mass bins (optional). 0000:0002:IFO1=CHANNEL-NAME1,IFO2=CHANNEL-NAME2 can be given multiple times.")
parser.add_option("--inj-state-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection state channel to process (required if --inj-channel-name set).") parser.add_option("--inj-state-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection state channel to process (required if --inj-channel-name set).")
parser.add_option("--inj-dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection DQ channel to process (required if --inj-channel-name set).") parser.add_option("--inj-dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection DQ channel to process (required if --inj-channel-name set).")
...@@ -281,15 +224,10 @@ def parse_command_line(): ...@@ -281,15 +224,10 @@ def parse_command_line():
parser.add_option("--inj-gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal (for injection stream)") parser.add_option("--inj-gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal (for injection stream)")
parser.add_option("--inj-gracedb-group", default = "Test", help = "gracedb group, default Test (for injection stream)") parser.add_option("--inj-gracedb-group", default = "Test", help = "gracedb group, default Test (for injection stream)")
parser.add_option("--inj-gracedb-service-url", default = "https://simdb.cgca.uwm.edu/api/", help = "GraceDb service url, default https://simdb.cgca.uwm.edu/api/ (for injection stream)") parser.add_option("--inj-gracedb-service-url", default = "https://simdb.cgca.uwm.edu/api/", help = "GraceDb service url, default https://simdb.cgca.uwm.edu/api/ (for injection stream)")
parser.add_option("--data-source", metavar = "[lvshm|]", default = "lvshm", help = "Where to get the data from. Default lvshm")
parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).") parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes") parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
parser.add_option("--state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times")
parser.add_option("--state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times")
parser.add_option("--inj-state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)") parser.add_option("--inj-state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
parser.add_option("--inj-state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)") parser.add_option("--inj-state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
parser.add_option("--dq-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times")
parser.add_option("--dq-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times")
parser.add_option("--inj-dq-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)") parser.add_option("--inj-dq-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
parser.add_option("--inj-dq-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)") parser.add_option("--inj-dq-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
parser.add_option("--lvalert-listener-program", action = "append", default = [], metavar = "program", help = "set the programs to respond to lvalerts from this analysis, can be given multiple times") parser.add_option("--lvalert-listener-program", action = "append", default = [], metavar = "program", help = "set the programs to respond to lvalerts from this analysis, can be given multiple times")
...@@ -307,6 +245,12 @@ def parse_command_line(): ...@@ -307,6 +245,12 @@ def parse_command_line():
options, filenames = parser.parse_args() options, filenames = parser.parse_args()
#
# extract data source configuration
#
datasourceinfo = datasource.GWDataSourceInfo(options)
fail = "" fail = ""
for option in ("bank_cache",): for option in ("bank_cache",):
if getattr(options, option) is None: if getattr(options, option) is None:
...@@ -318,13 +262,16 @@ def parse_command_line(): ...@@ -318,13 +262,16 @@ def parse_command_line():
else: else:
inj_name_dict = {} inj_name_dict = {}
if options.data_source not in datasourceinfo.live_sources :
raise ValueError("datasource option not supported for online analysis. Only framexmit and lvshm are supported.")
#FIXME add consistency check? #FIXME add consistency check?
bankcache = inspiral_pipe.parse_cache_str(options.bank_cache) bankcache = inspiral_pipe.parse_cache_str(options.bank_cache)
channel_dict = datasource.channel_dict_from_channel_list(options.channel_name) channel_dict = datasourceinfo.channel_dict
state_channel_dict = datasource.channel_dict_from_channel_list(options.state_channel_name) state_channel_dict = datasourceinfo.state_channel_dict
dq_channel_dict = datasource.channel_dict_from_channel_list(options.dq_channel_name) dq_channel_dict = datasourceinfo.dq_channel_dict
framexmit_dict = datasource.framexmit_ports['CIT'] # set the default framexmit_dict = datasourceinfo.framexmit_addr
framexmit_dict.update(datasource.framexmit_dict_from_framexmit_list(options.framexmit_addr)) shared_memory_partition_dict = datasourceinfo.shm_part_dict
inj_channel_dict = datasource.channel_dict_from_channel_list_with_node_range(options.inj_channel_name) inj_channel_dict = datasource.channel_dict_from_channel_list_with_node_range(options.inj_channel_name)
inj_state_channel_dict = datasource.channel_dict_from_channel_list(options.inj_state_channel_name) inj_state_channel_dict = datasource.channel_dict_from_channel_list(options.inj_state_channel_name)
inj_dq_channel_dict = datasource.channel_dict_from_channel_list(options.inj_dq_channel_name) inj_dq_channel_dict = datasource.channel_dict_from_channel_list(options.inj_dq_channel_name)
...@@ -341,13 +288,13 @@ def parse_command_line(): ...@@ -341,13 +288,13 @@ def parse_command_line():
if not ( set(inj_channel_dict[nodes].keys()) == set(channel_dict.keys()) ): if not ( set(inj_channel_dict[nodes].keys()) == set(channel_dict.keys()) ):
raise ValueError("Either no injection jobs must be given or the injection and non-injection channels must be specified for the same set of detectors") raise ValueError("Either no injection jobs must be given or the injection and non-injection channels must be specified for the same set of detectors")
options.state_vector_on_off_dict = datasource.state_vector_on_off_dict_from_bit_lists(options.state_vector_on_bits, options.state_vector_off_bits) options.state_vector_on_off_dict = datasourceinfo.state_vector_on_off_bits
options.dq_vector_on_off_dict = datasource.state_vector_on_off_dict_from_bit_lists(options.dq_vector_on_bits, options.dq_vector_off_bits) options.dq_vector_on_off_dict = datasourceinfo.dq_vector_on_off_bits
options.likelihood_files = [CacheEntry(line).url for line in open(options.likelihood_cache)] options.likelihood_files = [CacheEntry(line).url for line in open(options.likelihood_cache)]
options.zerolag_likelihood_files = [CacheEntry(line).url for line in open(options.zerolag_likelihood_cache)] options.zerolag_likelihood_files = [CacheEntry(line).url for line in open(options.zerolag_likelihood_cache)]
return options, filenames, bankcache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict return options, filenames, bankcache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, shared_memory_partition_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict
# #
...@@ -355,7 +302,7 @@ def parse_command_line(): ...@@ -355,7 +302,7 @@ def parse_command_line():
# #
options, filenames, bank_cache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict = parse_command_line() options, filenames, bank_cache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, shared_memory_partition_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict = parse_command_line()
try: os.mkdir("logs") try: os.mkdir("logs")
except: pass except: pass
...@@ -413,8 +360,19 @@ for ifo in channel_dict: ...@@ -413,8 +360,19 @@ for ifo in channel_dict:
os.makedirs(outpath) os.makedirs(outpath)
except OSError: except OSError:
pass pass
inspiral_pipe.generic_node(dqJob, dag, [],
opts = {"psd-fft-length":options.psd_fft_length, # Data source dag options
if (options.data_source == "framexmit"):
datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict({ifo: framexmit_dict[ifo]}),
"framexmit-iface":options.framexmit_iface
}
else :
datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict({ifo: shared_memory_partition_dict[ifo]}),
"shared-memory-block-size":options.shared_memory_block_size,
"shared-memory-assumed-duration":options.shared_memory_assumed_duration
}
common_opts = {"psd-fft-length":options.psd_fft_length,
"channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: channel_dict[ifo]}), "channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: channel_dict[ifo]}),
"state-channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: state_channel_dict[ifo]}, opt = "state-channel-name"), "state-channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: state_channel_dict[ifo]}, opt = "state-channel-name"),
"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: dq_channel_dict[ifo]}, opt = "dq-channel-name"), "dq-channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: dq_channel_dict[ifo]}, opt = "dq-channel-name"),
...@@ -422,12 +380,11 @@ for ifo in channel_dict: ...@@ -422,12 +380,11 @@ for ifo in channel_dict:
"state-vector-off-bits":options.state_vector_off_bits, "state-vector-off-bits":options.state_vector_off_bits,
"dq-vector-on-bits":options.dq_vector_on_bits, "dq-vector-on-bits":options.dq_vector_on_bits,
"dq-vector-off-bits":options.dq_vector_off_bits, "dq-vector-off-bits":options.dq_vector_off_bits,
"framexmit-addr":datasource.framexmit_list_from_framexmit_dict({ifo: framexmit_dict[ifo]}),
"framexmit-iface":options.framexmit_iface,
"data-source":options.data_source, "data-source":options.data_source,
"out-path": outpath "out-path": outpath
} }
) common_opts.update(datasource_opts)
inspiral_pipe.generic_node(dqJob, dag, [], opts = common_opts)
# #
# loop over banks to run gstlal inspiral pre clustering and far computation # loop over banks to run gstlal inspiral pre clustering and far computation
...@@ -460,39 +417,51 @@ for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_gr ...@@ -460,39 +417,51 @@ for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_gr
if options.ht_gate_threshold is not None: if options.ht_gate_threshold is not None:
threshold_values = [options.ht_gate_threshold] * len(svd_banks.items()[0][1]) # Use the ht-gate-threshold value given threshold_values = [options.ht_gate_threshold] * len(svd_banks.items()[0][1]) # Use the ht-gate-threshold value given
# Data source dag options
if (options.data_source == "framexmit"):
datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict(framexmit_dict),
"framexmit-iface":options.framexmit_iface
}
else :
datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict(shared_memory_partition_dict, opt = "shared-memory-partition"),
"shared-memory-block-size":options.shared_memory_block_size,
"shared-memory-assumed-duration":options.shared_memory_assumed_duration
}
common_opts = {"psd-fft-length":options.psd_fft_length,
"reference-psd":options.reference_psd,
"ht-gate-threshold":threshold_values,
"channel-name":datasource.pipeline_channel_list_from_channel_dict(channel_dict),
"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(state_channel_dict, opt = "state-channel-name"),
"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(dq_channel_dict, opt = "dq-channel-name"),
"state-vector-on-bits":options.state_vector_on_bits,
"state-vector-off-bits":options.state_vector_off_bits,
"dq-vector-on-bits":options.dq_vector_on_bits,
"dq-vector-off-bits":options.dq_vector_off_bits,
"svd-bank":svd_bank_string,
"tmp-space":inspiral_pipe.condor_scratch_space(),
"track-psd":"",
"control-peak-time":options.control_peak_time,
"coincidence-threshold":options.coincidence_threshold,
"fir-stride":options.fir_stride,
"data-source":options.data_source,
"gracedb-far-threshold":options.gracedb_far_threshold,
"gracedb-group":options.gracedb_group,
"gracedb-pipeline":options.gracedb_pipeline,
"gracedb-search":options.gracedb_search,
"gracedb-service-url":options.gracedb_service_url,
"thinca-interval":options.thinca_interval,
"job-tag":jobTags[-1],
"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
"min-instruments":options.min_instruments,
"min-log-L":options.min_log_L,
"time-slide-file":options.time_slide_file,
"output-kafka-server": options.output_kafka_server
}
common_opts.update(datasource_opts)
inspNode = inspiral_pipe.generic_node(gstlalInspiralJob, dag, [], inspNode = inspiral_pipe.generic_node(gstlalInspiralJob, dag, [],
opts = {"psd-fft-length":options.psd_fft_length, opts = common_opts,
"reference-psd":options.reference_psd,
"ht-gate-threshold":threshold_values,
"channel-name":datasource.pipeline_channel_list_from_channel_dict(channel_dict),
"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(state_channel_dict, opt = "state-channel-name"),
"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(dq_channel_dict, opt = "dq-channel-name"),
"state-vector-on-bits":options.state_vector_on_bits,
"state-vector-off-bits":options.state_vector_off_bits,
"dq-vector-on-bits":options.dq_vector_on_bits,
"dq-vector-off-bits":options.dq_vector_off_bits,
"framexmit-addr":datasource.framexmit_list_from_framexmit_dict(framexmit_dict),
"framexmit-iface":options.framexmit_iface,
"svd-bank":svd_bank_string,
"tmp-space":inspiral_pipe.condor_scratch_space(),
"track-psd":"",
"control-peak-time":options.control_peak_time,
"coincidence-threshold":options.coincidence_threshold,
"fir-stride":options.fir_stride,
"data-source":options.data_source,
"gracedb-far-threshold":options.gracedb_far_threshold,
"gracedb-group":options.gracedb_group,
"gracedb-pipeline":options.gracedb_pipeline,
"gracedb-search":options.gracedb_search,
"gracedb-service-url":options.gracedb_service_url,
"thinca-interval":options.thinca_interval,
"job-tag":jobTags[-1],
"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
"min-instruments":options.min_instruments,
"min-log-L":options.min_log_L,
"time-slide-file":options.time_slide_file,
"output-kafka-server": options.output_kafka_server
},
input_files = { input_files = {
"ranking-stat-input":[likefile], "ranking-stat-input":[likefile],
"ranking-stat-pdf":options.marginalized_likelihood_file "ranking-stat-pdf":options.marginalized_likelihood_file
...@@ -511,38 +480,50 @@ for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_gr ...@@ -511,38 +480,50 @@ for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_gr
# do this in the future, this system was just the simplest to start # do this in the future, this system was just the simplest to start
# with # with
inj_jobTags.append("%04d" % (num_insp_nodes + 1000)) inj_jobTags.append("%04d" % (num_insp_nodes + 1000))
# Data source dag options
if (options.data_source == "framexmit"):
datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict({ifo: framexmit_dict[ifo]}),
"framexmit-iface":options.framexmit_iface
}
else :
datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict({ifo: shared_memory_partition_dict[ifo]}),
"shared-memory-block-size":options.shared_memory_block_size,
"shared-memory-assumed-duration":options.shared_memory_assumed_duration
}
common_opts = {"psd-fft-length":options.psd_fft_length,
"reference-psd":options.reference_psd,
"ht-gate-threshold":threshold_values,
"channel-name":datasource.pipeline_channel_list_from_channel_dict_with_node_range(inj_channel_dict, node = jobTags[-1]),
"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_state_channel_dict, opt = "state-channel-name"),
"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_dq_channel_dict, opt = "dq-channel-name"),
"state-vector-on-bits":options.inj_state_vector_on_bits,
"state-vector-off-bits":options.inj_state_vector_off_bits,
"dq-vector-on-bits":options.inj_dq_vector_on_bits,
"dq-vector-off-bits":options.inj_dq_vector_off_bits,
"svd-bank":svd_bank_string,
"tmp-space":inspiral_pipe.condor_scratch_space(),
"track-psd":"",
"control-peak-time":options.control_peak_time,
"coincidence-threshold":options.coincidence_threshold,
"fir-stride":options.fir_stride,
"data-source":options.data_source,
"gracedb-far-threshold":options.inj_gracedb_far_threshold,
"gracedb-group":options.inj_gracedb_group,
"gracedb-pipeline":options.inj_gracedb_pipeline,
"gracedb-search":options.inj_gracedb_search,
"gracedb-service-url":options.inj_gracedb_service_url,
"thinca-interval":options.thinca_interval,
"job-tag":inj_jobTags[-1],
"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
"min-instruments":options.min_instruments,
"min-log-L":options.min_log_L,
"time-slide-file":options.time_slide_file
}
common_opts.update(datasource_opts)
inspInjNode = inspiral_pipe.generic_node(gstlalInspiralInjJob, dag, [], inspInjNode = inspiral_pipe.generic_node(gstlalInspiralInjJob, dag, [],
opts = {"psd-fft-length":options.psd_fft_length, opts = common_opts,
"reference-psd":options.reference_psd,
"ht-gate-threshold":threshold_values,
"channel-name":datasource.pipeline_channel_list_from_channel_dict_with_node_range(inj_channel_dict, node = jobTags[-1]),
"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_state_channel_dict, opt = "state-channel-name"),
"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_dq_channel_dict, opt = "dq-channel-name"),
"state-vector-on-bits":options.inj_state_vector_on_bits,
"state-vector-off-bits":options.inj_state_vector_off_bits,
"dq-vector-on-bits":options.inj_dq_vector_on_bits,
"dq-vector-off-bits":options.inj_dq_vector_off_bits,
"framexmit-addr":datasource.framexmit_list_from_framexmit_dict(inj_framexmit_dict),
"framexmit-iface":options.inj_framexmit_iface,
"svd-bank":svd_bank_string,
"tmp-space":inspiral_pipe.condor_scratch_space(),
"track-psd":"",
"control-peak-time":options.control_peak_time,
"coincidence-threshold":options.coincidence_threshold,
"fir-stride":options.fir_stride,
"data-source":options.data_source,
"gracedb-far-threshold":options.inj_gracedb_far_threshold,
"gracedb-group":options.inj_gracedb_group,
"gracedb-pipeline":options.inj_gracedb_pipeline,
"gracedb-search":options.inj_gracedb_search,
"gracedb-service-url":options.inj_gracedb_service_url,
"thinca-interval":options.thinca_interval,
"job-tag":inj_jobTags[-1],
"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
"min-instruments":options.min_instruments,
"min-log-L":options.min_log_L,
"time-slide-file":options.time_slide_file
},
input_files = { input_files = {
"ranking-stat-input":[likefile], "ranking-stat-input":[likefile],
"ranking-stat-pdf":options.marginalized_likelihood_file "ranking-stat-pdf":options.marginalized_likelihood_file
...@@ -583,8 +564,6 @@ else: ...@@ -583,8 +564,6 @@ else:
pageNode = inspiral_pipe.generic_node(pageJob, dag, [], opts = {"directory":".", "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {}) pageNode = inspiral_pipe.generic_node(pageJob, dag, [], opts = {"directory":".", "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
if options.state_backup_destination: if options.state_backup_destination:
stateNode = inspiral_pipe.generic_node(stateJob, dag, [], opts = {}, input_files = {"":[options.state_backup_destination, options.marginalized_likelihood_file] + options.likelihood_files}, output_files = {}) stateNode = inspiral_pipe.generic_node(stateJob, dag, [], opts = {}, input_files = {"":[options.state_backup_destination, options.marginalized_likelihood_file] + options.likelihood_files}, output_files = {})
......
...@@ -444,7 +444,7 @@ class GWDataSourceInfo(object): ...@@ -444,7 +444,7 @@ class GWDataSourceInfo(object):
self.channel_dict = channel_dict_from_channel_list(options.channel_name) self.channel_dict = channel_dict_from_channel_list(options.channel_name)
## A dictionary for shared memory partition, e.g., {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"} ## A dictionary for shared memory partition, e.g., {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}
self.shm_part_dict = {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"} self.shm_part_dict = {"H1": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}
if options.shared_memory_partition is not None: if options.shared_memory_partition is not None:
self.shm_part_dict.update( channel_dict_from_channel_list(options.shared_memory_partition) ) self.shm_part_dict.update( channel_dict_from_channel_list(options.shared_memory_partition) )
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment