diff --git a/BayesWaveUtils/bayeswave_pipe/bayeswave_pipe_utils.py b/BayesWaveUtils/bayeswave_pipe/bayeswave_pipe_utils.py index 9817d7d51ceb55c888e53a075eeef7c71ad81dba..34cb9b556f512f196766226faf67e62448155432 100644 --- a/BayesWaveUtils/bayeswave_pipe/bayeswave_pipe_utils.py +++ b/BayesWaveUtils/bayeswave_pipe/bayeswave_pipe_utils.py @@ -304,6 +304,21 @@ class eventTrigger: self.hv_time_lag = hv_time_lag self.lv_time_lag = lv_time_lag self.total_time_lag = total_time_lag + + # Keep track of actual GPS times in each IFO + ifo_list = ast.literal_eval(cp.get('input', 'ifo-list')) + self.perIFO_trigtime = {} + if 'H1' in ifo_list: + self.perIFO_trigtime['H1'] = trigger_time - total_time_lag + if 'L1' in ifo_list: + self.perIFO_trigtime['L1'] = trigger_time - hl_time_lag + if 'V1' in ifo_list: + if 'H1' in ifo_list: + self.perIFO_trigtime['V1'] = trigger_time - hv_time_lag + elif 'L1' in ifo_list: + self.perIFO_trigtime['V1'] = trigger_time - lv_time_lag + + self.trigger_frequency = trigger_frequency # Variable sample rate / window length [fixed TF volume] @@ -1203,7 +1218,11 @@ class bayeswaveJob(pipeline.CondorDAGJob, pipeline.AnalysisJob): # if cp.getboolean('condor', 'transfer-files'): - transferstring='datafind,$(macrooutputDir)' + # See if this works for individual caches + if cp.getboolean('condor','copy-frames'): + transferstring = '$(macrooutputDir)/datafind,$(macrooutputDir)' + else: + transferstring='datafind,$(macrooutputDir)' # Generate a script for PreCmd to setup directory structure if condor_precommand: @@ -1379,6 +1398,7 @@ class bayeswaveNode(pipeline.CondorDAGNode, pipeline.AnalysisNode): self.frames = ",".join(self.frames) self.add_macro('macroframes', self.frames) + # TODO: would it be cleaner to have a single time slide variable as a dictionary? def set_L1_timeslide(self, L1_timeslide): self.add_var_opt('L1-timeslide', L1_timeslide) self.L1_timeslide = L1_timeslide diff --git a/BayesWaveUtils/scripts/bayeswave_pipe b/BayesWaveUtils/scripts/bayeswave_pipe index 6ec846786b5ac8b265c1e0f3c8e4832a10f31524..94fc4f9011f3d756793ee41bb3ccd420f3a169ce 100755 --- a/BayesWaveUtils/scripts/bayeswave_pipe +++ b/BayesWaveUtils/scripts/bayeswave_pipe @@ -574,14 +574,14 @@ seglens = [trigger.seglen for trigger in trigger_list.triggers] if cp.has_option('input','gps-start-time'): gps_start_time = cp.getint('input','gps-start-time') -else: +else: #time slides trigtime = min(trigger_times) - (max(np.absolute(hl_lag_times))+25.0) seg, _ = job_times(trigtime, max(seglens), psdlen, padding) gps_start_time = seg[0] if cp.has_option('input','gps-end-time'): gps_end_time = cp.getint('input','gps-end-time') -else: +else: #time slides trigtime = max(trigger_times) + (max(np.absolute(hl_lag_times))+25.0) seg,_ = job_times(trigtime, max(seglens), psdlen, padding) gps_end_time = seg[1] @@ -981,12 +981,19 @@ for t,trigger in enumerate(trigger_list.triggers): #---------------------------------------- # Check job times fall within available data - job_segment, psd_start = job_times(trigger.trigger_time, trigger.seglen, + # Define job segment per ifo for time slides + print("lags etc",trigger.perIFO_trigtime) + job_segment = {} + psd_start = {} + for ifo in ifo_list: + job_segment[ifo], psd_start[ifo] = job_times(trigger.perIFO_trigtime[ifo], trigger.seglen, psdlen, padding) + # job_segment, psd_start = job_times(trigger.trigger_time, trigger.seglen, + # psdlen, padding) for ifo in ifo_list: - job_in_segments = [seg.__contains__(job_segment) \ + job_in_segments = [seg.__contains__(job_segment[ifo]) \ for seg in segmentList[ifo]] if not any(job_in_segments): @@ -997,7 +1004,7 @@ for t,trigger in enumerate(trigger_list.triggers): bad_job['seglen']=trigger.seglen bad_job['psdlen']=psdlen bad_job['padding']=padding - bad_job['job_segment']=job_segment + bad_job['job_segment']=job_segment[ifo] bad_job['data_segments']=segmentList[ifo] unanalyzeable_jobs.append(bad_job) @@ -1007,6 +1014,7 @@ for t,trigger in enumerate(trigger_list.triggers): print(bad_job, file=sys.stderr) break + #MM is this an orphan else? else: if 'H1' in ifo_list: @@ -1023,17 +1031,31 @@ for t,trigger in enumerate(trigger_list.triggers): if not cp.getboolean('datafind','sim-data'): - # + # MM note to self: this is within a loop over triggers # Identify frames associated with this job if opts.copy_frames: + custom_cache={} for ifo in ifo_list: - frame_idx = [seg.intersects(job_segment) for seg in frameSegs[ifo]] + cache_file = os.path.join(datafind_dir, + '{ifo}.cache'.format(ifo=ifo)) + with open(cache_file) as cache_entries: + cache_entries = cache_entries.readlines() + # cache_entries = np.loadtxt(cache_file, dtype=str) + # cache_entries = np.readlines(cache_file) + print("job_segment=",job_segment[ifo]) + frame_idx = [seg.intersects(job_segment[ifo]) for seg in frameSegs[ifo]] if cp.getboolean('condor','scitoken-auth'): transferFrames[ifo] = [f"igwn+{frame}" for f,frame in enumerate(framePaths[ifo]) if frame_idx[f]] else: transferFrames[ifo] = [frame for f,frame in - enumerate(framePaths[ifo]) if frame_idx[f]] + enumerate(framePaths[ifo]) if frame_idx[f]] + custom_cache[ifo] = [e for ii,e in enumerate(cache_entries)if frame_idx[ii]] + # custom_cache[ifo] = [cache_entries[f] for f,frame in + # enumerate(framePaths[ifo]) if frame_idx[f]] + print("custom cache =", custom_cache[ifo]) + # for yyy in custom_cache[ifo][0]: + # print(f"{yyy}") # Make output directory for this trigger outputDir = 'trigtime_' + str('%.9f'%trigger.trigger_time) + '_' + \ @@ -1042,7 +1064,23 @@ for t,trigger in enumerate(trigger_list.triggers): #str(float(trigger.hv_time_lag)) #+ str(uuid.uuid4()) outputDir = os.path.join(outputDir) if not os.path.exists(outputDir): os.makedirs(outputDir) + if not os.path.exists(os.path.join(outputDir,'datafind')): os.makedirs(os.path.join(outputDir,'datafind')) + # Try to make custom cache files dump_job_info(outputDir, trigger) + if opts.copy_frames: + for ifo in ifo_list: + cachefile = os.path.join(outputDir,'datafind','{ifo}.cache'.format(ifo=ifo)) + print("new cache file =",cachefile) + new_cache = open(cachefile, 'w') + + for xx in custom_cache[ifo]: + new_cache.write(f"{xx}") + # new_cache.writelines(custom_cache[ifo]) + new_cache.close() + + # A little hacky, but need to get single psd_start for BW command line + dummy, psd_start = job_times(trigger.trigger_time, trigger.seglen, + psdlen, padding) # ------------------------------------------------------------------ # BAYESLINE MEDIAN PSD NODES @@ -1211,6 +1249,9 @@ for t,trigger in enumerate(trigger_list.triggers): elif 'L1' in ifo_list and 'V1' in ifo_list: bayeswave_node.set_V1_timeslide(trigger.lv_time_lag) bayeswave_post_node.set_V1_timeslide(trigger.lv_time_lag) + + # bayeswave_node.set_perIFO_trigtime(ifo_list, trigger.trigger_time, trigger.total_time_lag, trigger.hl_time_lag, trigger.hv_time_lag) + # print("per IFO time=",bayeswave_node.perIFO_trigtime) if cp.has_option('bayeswave_options','BW-inject'): bayeswave_post_node.set_BW_event(trigger.BW_event)