Commit 6a456115 authored by Duncan Meacher

gstlal_etg_pipe: Set segment GPS times to be integer multiples of output file cadence

parent ed81d7f3
@@ -47,6 +47,7 @@ from gstlal import dagparts as gstlaldagparts
from gstlal import datasource
from gstlal import multichannel_datasource
from gstlal import idq_multirate_datasource
from gstlal import idq_aggregator
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
pass
@@ -57,6 +58,38 @@ lsctables.use_in(LIGOLWContentHandler)
# get a dictionary of all the segments
#
def breakupseg(seg, maxextent, overlap):
if maxextent <= 0:
raise ValueError, "maxextent must be positive, not %s" % repr(maxextent)
# Simple case of only one segment
if abs(seg) < maxextent:
return segments.segmentlist([seg])
# adjust maxextent so that segments are divided roughly equally
maxextent = max(int(abs(seg) / (int(abs(seg)) // int(maxextent) + 1)), overlap)
maxextent = int(math.ceil(abs(seg) / math.ceil(abs(seg) / maxextent)))
end = seg[1]
seglist = segments.segmentlist()
while abs(seg):
if (seg[0] + maxextent + overlap) < end:
# Round down segment gps end time to integer multiple of cadence.
seglist.append(segments.segment(seg[0], idq_aggregator.floor_div(int(seg[0]) + maxextent + overlap, options.cadence)))
seg = segments.segment(seglist[-1][1] - overlap, seg[1])
else:
seglist.append(segments.segment(seg[0], end))
break
return seglist
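
The rounding added here can be illustrated with a minimal, self-contained sketch. The local floor_div below is a hypothetical stand-in for idq_aggregator.floor_div, assumed to return the largest integer multiple of the cadence not exceeding its argument; the GPS times and extents are made up:

def floor_div(x, n):
    # hypothetical stand-in for idq_aggregator.floor_div
    return (int(x) // int(n)) * int(n)

cadence = 32                 # output file cadence in seconds
seg_start = 1187000123       # made-up segment start
maxextent = 4000
overlap = 512

raw_end = seg_start + maxextent + overlap      # 1187004635
rounded_end = floor_div(raw_end, cadence)      # 1187004608, an integer multiple of 32
assert rounded_end % cadence == 0 and rounded_end <= raw_end
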
def breakupsegs(seglist, maxextent, overlap):
newseglist = segments.segmentlist()
for bigseg in seglist:
newseglist.extend(breakupseg(bigseg, maxextent, overlap))
return newseglist
def analysis_segments(ifo, allsegs, boundary_seg, max_template_length = 30):
segsdict = segments.segmentlistdict()
# 512 seconds for the whitener to settle + the maximum template_length
@@ -66,7 +99,8 @@ def analysis_segments(ifo, allsegs, boundary_seg, max_template_length = 30):
segsdict[ifo] = segments.segmentlist([boundary_seg])
segsdict[ifo] = segsdict[ifo].protract(start_pad)
segsdict[ifo] = gstlaldagparts.breakupsegs(segsdict[ifo], segment_length, start_pad)
# FIXME revert to gstlaldagparts.breakupsegs and remove above two functions when we no longer write to ascii.
segsdict[ifo] = breakupsegs(segsdict[ifo], segment_length, start_pad)
if not segsdict[ifo]:
del segsdict[ifo]
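
For context, the protraction above can be sketched in isolation. This is a minimal illustration with made-up GPS times, assuming the segments module used by this script is glue.segments; start_pad is 512 s of whitener settling time plus the maximum template length:

from glue import segments   # assumed to be the segments module used by this script

max_template_length = 30
start_pad = 512 + max_template_length                     # whitener settling + template length

boundary_seg = segments.segment(1187000000, 1187100000)   # made-up analysis boundary
seglist = segments.segmentlist([boundary_seg]).protract(start_pad)
# each end of the boundary segment is pushed outwards by start_pad seconds:
# seglist == [segment(1186999458, 1187100542)]
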
@@ -82,10 +116,11 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, channe
total_rates = 0
outstr = ""
n_channels = 0
n_cpu = 0
n_process = 0
trig_start = options.gps_start_time
write_channel_list = True
# Loop over all channels to determine number of streams and minimum number of processes needed
# Loop over all channels to determine number of threads and minimum number of processes needed
for ii, channel in enumerate(channels,1):
samp_rate = data_source_info.channel_dict[channel]['fsamp']
max_samp_rate = min(2048, int(samp_rate))
@@ -93,20 +128,24 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, channe
n_rates = int(numpy.log2(max_samp_rate/min_samp_rate) + 1)
cumsum_rates += n_rates
total_rates += n_rates
if cumsum_rates >= options.streams or ii == len(data_source_info.channel_dict.keys()):
n_cpu += 1
if cumsum_rates >= options.threads or ii == len(data_source_info.channel_dict.keys()):
n_process += 1
cumsum_rates = 0
# Create more even distribution of channels across minimum number of processes
n_streams = math.ceil(total_rates / n_cpu)
n_threads = math.ceil(total_rates / n_process)
if options.verbose:
print "Total streams =", total_rates
print "Total jobs needed =", n_cpu
print "Evenly distributed streams per job =", int(n_streams)
print "Total threads =", total_rates
print "Total jobs needed =", n_process
print "Evenly distributed threads per job =", int(n_threads)
for seg in segsdict[ifo]:
cumsum_rates = 0
out_index = 0
channel_list = []
for ii, channel in enumerate(channels,1):
n_channels += 1
@@ -116,15 +155,16 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, channe
n_rates = int(numpy.log2(max_samp_rate/min_samp_rate) + 1)
cumsum_rates += n_rates
outstr = outstr + channel + ":" + str(int(samp_rate))
channel_list.append(channel + " " + str(int(samp_rate)))
# Adds channel to current process
if cumsum_rates < n_streams and ii < len(data_source_info.channel_dict.keys()):
if cumsum_rates < n_threads and ii < len(data_source_info.channel_dict.keys()):
outstr = outstr + " --channel-name="
# Finalise each process once number of streams passes threshold
if cumsum_rates >= n_streams or ii == len(data_source_info.channel_dict.keys()):
# Finalise each process once number of threads passes threshold
if cumsum_rates >= n_threads or ii == len(data_source_info.channel_dict.keys()):
out_index += 1
outpath = options.out_path + "/gstlal_etg/gstlal_etg_%04d/%i-%i" %(out_index, int(trig_start), int(seg[1])-int(trig_start))
outpath = options.out_path + "/gstlal_etg/gstlal_etg_%04d" %(out_index)
etg_nodes[channel] = \
inspiral_pipe.generic_node(gstlalETGJob, dag, parent_nodes = parent_nodes,
opts = {"gps-start-time":int(seg[0]),
@@ -136,19 +176,28 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, channe
"mismatch":options.mismatch,
"qhigh":options.qhigh,
"cadence":options.cadence,
#"triggers-from-dataframe":"",
"disable-web-service":""
},
input_files = {"frame-cache":options.frame_cache},
output_files = {"out-path":outpath}
)
if options.verbose:
print "Job %04d, number of channels = %3d, number of streams = %4d" %(out_index, n_channels, cumsum_rates)
if options.verbose and write_channel_list is True :
print "Job %04d, number of channels = %3d, number of threads = %4d" %(out_index, n_channels, cumsum_rates)
if write_channel_list is True :
listpath = options.out_path + "/gstlal_etg/channel_lists/channel_list_%04d.txt" %(out_index)
f = open(listpath,'w')
for channel_out in channel_list:
f.write(channel_out+'\n')
f.close()
cumsum_rates = 0
outstr = ""
n_channels = 0
channel_list = []
trig_start = int(seg[1])
write_channel_list = False
return etg_nodes
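
The per-job channel lists written above contain one "<channel name> <sample rate>" pair per line, matching the format of the full channel list included in this commit. A hedged read-back sketch (the path and channel name are illustrative only):

listpath = "gstlal_etg/channel_lists/channel_list_0001.txt"   # illustrative path
channel_dict = {}
with open(listpath) as f:
    for line in f:
        channel, samp_rate = line.split()
        channel_dict[channel] = int(samp_rate)
# e.g. channel_dict.get("H1:LSC-DARM_IN1_DQ") == 16384 if that channel landed in this job
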
@@ -166,13 +215,13 @@ def parse_command_line():
# trigger generation options
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
parser.add_option("--triggers-from-dataframe", action = "store_true", default = False,
help = "If set, will output iDQ-compatible triggers to disk straight from dataframe once every cadence")
help = "If set, will output iDQ-compatible triggers to disk straight from dataframe once every cadence")
parser.add_option("--disable-web-service", action = "store_true", help = "If set, disables web service that allows monitoring of PSDS of aux channels.")
parser.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_TRIGGERS", help = "Set the filename description in which to save the output.")
parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.")
parser.add_option("-m", "--mismatch", type = "float", default = 0.2, help = "Mismatch between templates, mismatch = 1 - minimal match. Default = 0.2.")
parser.add_option("-q", "--qhigh", type = "float", default = 20, help = "Q high value for half sine-gaussian waveforms. Default = 20.")
parser.add_option("-s", "--streams", type = "float", default = 100, help = "Number of streams to process per node. Default = 100.")
parser.add_option("-t", "--threads", type = "float", default = 100, help = "Number of threads to process per node. Default = 100.")
parser.add_option("-l", "--latency", action = "store_true", help = "Print latency to output ascii file. Temporary.")
parser.add_option("--save-hdf", action = "store_true", default = False, help = "If set, will save hdf5 files to disk straight from dataframe once every cadence")
parser.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .")
@@ -186,7 +235,6 @@ def parse_command_line():
return options, filenames
#
# Useful variables
#
@@ -195,6 +243,10 @@ options, filenames = parse_command_line()
output_dir = "plots"
listdir = os.path.join(options.out_path, "gstlal_etg/channel_lists")
if not os.path.exists(listdir):
os.makedirs(listdir)
#
#
#
@@ -238,4 +290,3 @@ etg_nodes = etg_node_gen(gstlalETGJob, dag, [], segsdict, ifo, options, channels
dag.write_sub_files()
dag.write_dag()
dag.write_script()
#dag.write_cache()
@@ -9,22 +9,18 @@ CONDOR_COMMANDS:=--condor-command=accounting_group=$(ACCOUNTING_TAG) --condor-co
# Triggering parameters #
#########################
# The GPS start time for analysis
#START = 1176638000
#START = 1176630000
SEG_PAD = 1000
# The GPS start time for analysis
START = 1187000000
FSTART=$(shell echo $$((${START}-${SEG_PAD})))
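# FSTART is the analysis start padded backwards by SEG_PAD; it is used for the
# frame-cache and segment queries below, while the dag itself starts at START.
# A quick check of the value (purely illustrative Python):
#
#     SEG_PAD = 1000
#     START = 1187000000
#     FSTART = START - SEG_PAD
#     print(FSTART)   # 1186999000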
# The GPS end time for analysis
#STOP = 1176639000
#STOP = 1176640000
#STOP = 1188000000
STOP = 1187100000
OUTPATH = $(PWD)
# Number of streams (N_channels x N_rates_per_channel) that each processor will analyse
N_STREAMS = 500
# Number of threads (N_channels x N_rates_per_channel) that each cpu will process
N_THREADS = 400
MISMATCH = 0.2
QHIGH = 40
@@ -54,27 +50,17 @@ SEGMENT_MIN_LENGTH = 512
FRAME_TYPE=R
#######################
# GSTLAL VETO Options #
#######################
# Vetoes file names
# Obtain veto definer from here: https://code.pycbc.phy.syr.edu/detchar/veto-definitions/blob/master/cbc/O2/
# As of commit on August 30, 2017: "Added new version of L1, H1 earthquake flag"
COMMIT = 5449edf6bf96fbd428add6551a51397bc5777f11
VETODEF = H1L1-HOFT_C00_O2_CBC.xml
#################
# Web directory #
#################
# A user tag for the run
TAG = O2_C00
#TAG = O2_C00
# Run number
RUN = run_1
#RUN = run_1
# A web directory for output (note difference between cit+uwm and Atlas)
# cit & uwm
WEBDIR = ~/public_html/observing/$(TAG)/$(START)-$(STOP)-$(RUN)
#WEBDIR = ~/public_html/observing/$(TAG)/$(START)-$(STOP)-$(RUN)
# Atlas
#WEBDIR = ~/WWW/LSC/testing/$(TAG)/$(START)-$(STOP)-test_dag-$(RUN)
@@ -93,36 +79,39 @@ dag : frame.cache plots channel_list.txt segments.xml.gz
--gps-start-time $(START) \
--gps-end-time $(STOP) \
--frame-cache frame.cache \
--frame-segments-file segments.xml.gz \
--channel-list channel_list.txt \
--out-path $(OUTPATH) \
--streams $(N_STREAMS)\
--threads $(N_THREADS)\
--mismatch $(MISMATCH) \
--qhigh $(QHIGH) \
$(CONDOR_COMMANDS) \
--request-cpu 2 \
--request-memory 5GB \
--request-memory 11GB \
--verbose \
--disable-web-service
# --web-dir $(WEBDIR) \
full_channel_list.txt : frame.cache
FrChannels $$(head -n 1 $^ | awk '{ print $$5}' | sed -e "s@file://localhost@@g") > $@
# FIXME Determine channel list automatically.
#full_channel_list.txt : frame.cache
# FrChannels $$(head -n 1 $^ | awk '{ print $$5}' | sed -e "s@file://localhost@@g") > $@
# Produce segments file
segments.xml.gz : frame.cache
ligolw_segment_query_dqsegdb --segment-url=${SEG_SERVER} -q --gps-start-time ${START} --gps-end-time ${STOP} --include-segments=$(LIGO_SEGMENTS) --result-name=datasegments > $@
ligolw_segment_query_dqsegdb --segment-url=${SEG_SERVER} -q --gps-start-time ${FSTART} --gps-end-time ${STOP} --include-segments=$(LIGO_SEGMENTS) --result-name=datasegments > $@
ligolw_cut --delete-column segment:segment_def_cdb --delete-column segment:creator_db --delete-column segment_definer:insertion_time $@
gstlal_segments_trim --trim $(SEGMENT_TRIM) --gps-start-time $(START) --gps-end-time $(STOP) --min-length $(SEGMENT_MIN_LENGTH) --output $@ $@
gstlal_segments_trim --trim $(SEGMENT_TRIM) --gps-start-time $(FSTART) --gps-end-time $(STOP) --min-length $(SEGMENT_MIN_LENGTH) --output $@ $@
frame.cache :
# FIXME force the observatory column to actually be instrument
if [[ ${CLUSTER} == *"ligo-wa.caltech.edu" ]] ; then \
gw_data_find -o H -t H1_$(FRAME_TYPE) -l -s $(START) -e $(STOP) --url-type file -O $@ ; \
gw_data_find -o H -t H1_$(FRAME_TYPE) -l -s $(FSTART) -e $(STOP) --url-type file -O $@ ; \
elif [[ ${CLUSTER} == *"ligo-la.caltech.edu" ]] ; then \
gw_data_find -o L -t L1_$(FRAME_TYPE) -l -s $(START) -e $(STOP) --url-type file -O $@ ; \
gw_data_find -o L -t L1_$(FRAME_TYPE) -l -s $(FSTART) -e $(STOP) --url-type file -O $@ ; \
fi
# FIXME Add webpages once we have output
# Make webpage directory and copy files across
#$(WEBDIR) : $(MAKEFILE_LIST)
# mkdir -p $(WEBDIR)/OPEN-BOX
H1:CAL-CS_CARM_DELTAF_DQ 16384
H1:CAL-CS_LINE_SUM_DQ 16384
H1:CAL-DARM_CTRL_WHITEN_OUT_DBL_DQ 16384
H1:CAL-DARM_ERR_WHITEN_OUT_DBL_DQ 16384
H1:CAL-DELTAL_EXTERNAL_DQ 16384
H1:CAL-DELTAL_RESIDUAL_DBL_DQ 16384
H1:CAL-PCALX_FPGA_DTONE_IN1_DQ 16384
H1:CAL-PCALX_IRIGB_OUT_DQ 16384
H1:CAL-PCALX_RX_PD_OUT_DQ 16384
H1:CAL-PCALX_TX_PD_OUT_DQ 16384
H1:CAL-PCALY_EXC_SUM_DQ 16384
H1:CAL-PCALY_FPGA_DTONE_IN1_DQ 16384
H1:CAL-PCALY_IRIGB_OUT_DQ 16384
H1:CAL-PCALY_RX_PD_OUT_DQ 16384
H1:CAL-PCALY_TX_PD_OUT_DQ 16384
H1:IMC-F_OUT_DQ 16384
H1:IMC-I_OUT_DQ 16384
H1:LSC-ASAIR_A_RF45_I_ERR_DQ 16384
H1:LSC-ASAIR_A_RF45_Q_ERR_DQ 16384
H1:LSC-DARM_IN1_DQ 16384
H1:LSC-DARM_OUT_DQ 16384
H1:LSC-MCL_IN1_DQ 16384
H1:LSC-MCL_OUT_DQ 16384
H1:LSC-MICH_IN1_DQ 16384
H1:LSC-MICH_OUT_DQ 16384
H1:LSC-MOD_RF45_AM_AC_OUT_DQ 16384
H1:LSC-PRCL_IN1_DQ 16384
H1:LSC-PRCL_OUT_DQ 16384
H1:LSC-REFLAIR_A_RF45_I_ERR_DQ 16384
H1:LSC-REFLAIR_A_RF45_Q_ERR_DQ 16384
H1:LSC-REFLAIR_A_RF9_I_ERR_DQ 16384
H1:LSC-REFLAIR_A_RF9_Q_ERR_DQ 16384
H1:LSC-REFL_SERVO_CTRL_OUT_DQ 16384
H1:LSC-REFL_SERVO_ERR_OUT_DQ 16384
H1:LSC-SRCL_IN1_DQ 16384
H1:LSC-SRCL_OUT_DQ 16384
H1:OMC-DCPD_NORM_OUT_DQ 16384
H1:OMC-DCPD_NULL_OUT_DQ 16384
H1:OMC-DCPD_SUM_OUT_DQ 16384
H1:OMC-LSC_DITHER_OUT_DQ 16384
H1:OMC-LSC_I_OUT_DQ 16384
H1:OMC-PZT1_MON_AC_OUT_DQ 16384
H1:OMC-PZT2_MON_AC_OUT_DQ 16384
H1:PEM-CS_ACC_BEAMTUBE_MCTUBE_Y_DQ 16384
H1:PEM-CS_ACC_BSC1_ITMY_Y_DQ 16384
H1:PEM-CS_ACC_BSC3_ITMX_X_DQ 16384
H1:PEM-CS_ACC_HAM6_OMC_Z_DQ 16384
H1:PEM-CS_ACC_PSL_PERISCOPE_X_DQ 16384
H1:PEM-CS_ADC_4_30_16K_OUT_DQ 16384
H1:PEM-CS_ADC_4_31_16K_OUT_DQ 16384
H1:PEM-CS_MIC_EBAY_RACKS_DQ 16384
H1:PEM-CS_MIC_LVEA_BS_DQ 16384
H1:PEM-CS_MIC_LVEA_HAM7_DQ 16384
H1:PEM-CS_MIC_LVEA_INPUTOPTICS_DQ 16384
H1:PEM-CS_MIC_LVEA_OUTPUTOPTICS_DQ 16384
H1:PEM-CS_MIC_LVEA_VERTEX_DQ 16384
H1:PEM-CS_MIC_LVEA_XMANSPOOL_DQ 16384
H1:PEM-CS_MIC_LVEA_YMANSPOOL_DQ 16384
H1:PEM-CS_MIC_PSL_CENTER_DQ 16384
H1:PEM-CS_RADIO_EBAY_NARROWBAND_1_DQ 16384
H1:PEM-CS_RADIO_EBAY_NARROWBAND_2_DQ 16384
H1:PEM-CS_RADIO_LVEA_NARROWBAND_1_DQ 16384
H1:PEM-CS_RADIO_LVEA_NARROWBAND_2_DQ 16384
H1:PEM-CS_RADIO_ROOF1_BROADBAND_DQ 16384
H1:PEM-CS_RADIO_ROOF2_BROADBAND_DQ 16384
H1:PEM-CS_RADIO_ROOF3_BROADBAND_DQ 16384
H1:PEM-CS_RADIO_ROOF4_BROADBAND_DQ 16384
H1:PEM-EX_ACC_BSC9_ETMX_Y_DQ 16384
H1:PEM-EX_MIC_EBAY_RACKS_DQ 16384
H1:PEM-EX_MIC_VEA_MINUSX_DQ 16384
H1:PEM-EX_MIC_VEA_PLUSX_DQ 16384
H1:PEM-EY_ACC_BSC10_ETMY_X_DQ 16384
H1:PEM-EY_MIC_EBAY_RACKS_DQ 16384
H1:PEM-EY_MIC_VEA_MINUSY_DQ 16384
H1:PEM-EY_MIC_VEA_PLUSY_DQ 16384
H1:PSL-FSS_FAST_MON_OUT_DQ 16384
H1:PSL-FSS_MIXER_OUT_DQ 16384
H1:PSL-FSS_PC_MON_OUT_DQ 16384
H1:PSL-FSS_TPD_DC_OUT_DQ 16384
H1:PSL-ILS_HV_MON_OUT_DQ 16384
H1:PSL-ILS_MIXER_OUT_DQ 16384
H1:PSL-ISS_AOM_DRIVER_MON_OUT_DQ 16384
H1:PSL-ISS_PDA_REL_OUT_DQ 16384
H1:PSL-ISS_PDB_REL_OUT_DQ 16384
H1:PSL-OSC_PD_AMP_DC_OUT_DQ 16384
H1:PSL-OSC_PD_BP_DC_OUT_DQ 16384
H1:PSL-OSC_PD_INT_DC_OUT_DQ 16384
H1:PSL-OSC_PD_ISO_DC_OUT_DQ 16384
H1:PSL-PMC_HV_MON_OUT_DQ 16384
H1:PSL-PMC_MIXER_OUT_DQ 16384
H1:PSL-PWR_HPL_DC_OUT_DQ 16384
H1:PEM-CS_MAG_EBAY_LSCRACK_X_DQ 8192
H1:PEM-CS_MAG_EBAY_LSCRACK_Y_DQ 8192
H1:PEM-CS_MAG_EBAY_LSCRACK_Z_DQ 8192
H1:PEM-CS_MAG_EBAY_SUSRACK_X_DQ 8192
H1:PEM-CS_MAG_EBAY_SUSRACK_Y_DQ 8192
H1:PEM-CS_MAG_EBAY_SUSRACK_Z_DQ 8192
H1:PEM-CS_MAG_LVEA_INPUTOPTICS_X_DQ 8192
H1:PEM-CS_MAG_LVEA_INPUTOPTICS_Y_DQ 8192
H1:PEM-CS_MAG_LVEA_INPUTOPTICS_Z_DQ 8192