Skip to content
Snippets Groups Projects

gwistat updates

Merged Rebecca Ewing requested to merge gwistat-updates into master
1 file
+ 21
3
Compare changes
  • Side-by-side
  • Inline
@@ -53,6 +53,7 @@ def parse_command_line():
class IFOStatusTracker(object):
def __init__(self, options):
logging.info('setting up ifo status tracker...')
self.output_url = options.output_url
self.last_reduce = None
@@ -81,11 +82,13 @@ class IFOStatusTracker(object):
bit = int(bit)
time = float(time)
state = 1 if (bit & 0b1 == 0b1) else 0
statechange = False
## track duration
if state == self.previous[ifo][1]:
self.duration[ifo] += time - self.previous[ifo][0]
elif self.previous[ifo][0]:
statechange = True
self.duration[ifo] = time - self.previous[ifo][0]
logging.debug(f'{ifo} h(t) state at {time} = {state} | duration = {self.duration[ifo]}')
@@ -99,7 +102,9 @@ class IFOStatusTracker(object):
self.last_reduce = time
## write to influx
if time - self.last_reduce >= 100.:
if time - self.last_reduce >= 50. or statechange:
if statechange:
logging.info(f'{ifo} h(t) state changed to {state} | time since last reduced = {time-self.last_reduce}')
self.last_reduce = int(utils.gps_now())
logging.debug(f'Reducing data to influx...')
@@ -125,10 +130,16 @@ def main():
channel_dict = {}
dirs_dict = {}
for channel in options.channel_name:
channel_dict.update({channel.split(':')[0]: channel.split(':')[1]})
ifo, channel_name = channel.split('=')
channel_dict.update({
ifo: channel_name
})
for dir in options.shared_memory_directory:
dirs_dict.update({dir.split(':')[0]: dir.split(':')[1]})
ifo, shm_dir = dir.split('=')
dirs_dict.update({
ifo: shm_dir
})
if not channel_dict.keys() == dirs_dict.keys():
raise Exception(f'List of IFOs provided with channels does not match list of IFOs provided with SHM directories.')
@@ -157,8 +168,15 @@ def main():
if not pipeline.set_state(Gst.State.PLAYING):
raise RuntimeError("pipeline failed to enter PLAYING state")
logging.info('starting...')
mainloop.run()
# done. always end with an error code so that dagman does
# not mark the job "done" and the job will be restarted when the dag is
# restarted.
#
sys.exit(1)
if __name__ == '__main__':
main()
Loading