Skip to content
Snippets Groups Projects
Commit cbaadb92 authored by Jonathan Hanks's avatar Jonathan Hanks
Browse files

Updated dc_cas.py, and added more pv outputs to zmq_threads.

dc_cas.py will auto restart its IOC if it notices a change in the list
of PVs.  It can now also process multiple messages in an iteration, to
speed up running through a backlog.  It still only exports the most
recent message to EPICS.

Fixed some alarm parameters in zmq_threads pv export and added some
PVs to monitor the count of endpoints that contributed data during the
last second.

git-svn-id: https://redoubt.ligo-wa.caltech.edu/svn/advLigoRTS/trunk@4744 6dcd42c9-f523-4c6d-aada-af552506706e
parent 70e71e9f
No related branches found
No related tags found
No related merge requests found
......@@ -31,17 +31,17 @@ class State(object):
return
self.buf += os.read(f, 1024)
def process_message(self):
def process_single_message(self):
# no point in looking for a message if there is not enough data
# to do the header
if len(self.buf) < self.min_header_len:
return
return False
# look for the message boundary
offset = self.buf.find(self.msg_border)
if offset < 0:
# we are mid message, get rid of all the data except for len(self.msg_border)
self.buf = self.buf[:-len(self.msg_border)]
return
return False
elif offset > 0:
self.buf = self.buf[offset:]
# we are pretty sure at this point that the header starts at self.buf[0]
......@@ -49,15 +49,35 @@ class State(object):
if msg_len > 1024*1024:
# assume it is junk if the length is > 1MB, push past the message border and try again next time
self.buf = self.buf[4:]
return
return False
if len(self.buf) >= msg_len + self.min_header_len:
# we have a message
try:
self.messages.append(json.loads(self.buf[self.min_header_len:self.min_header_len + msg_len]))
except:
# malformed message, skip the header and move on
self.buf = self.buf[4:]
return
return False
self.buf = self.buf[self.min_header_len + msg_len:]
return True
return False
def process_messages(self):
while self.process_single_message():
pass
def reset_epics_if_needed(self):
if self.epics is None:
return
if len(self.messages) == 0:
return
msg = self.messages[-1]
expected_pvs = set(self.epics['db'].keys())
received_pvs = set([pv['name'] for pv in msg['pvs']])
if expected_pvs != received_pvs:
del self.epics['srv']
self.epics = None
def initialize_epics_if_needed(self):
if not self.epics is None:
......@@ -96,7 +116,7 @@ class State(object):
msg = self.messages[-1]
drv = self.epics['drv']
for pv in msg['pvs']:
print("Received message: {0}".format(msg))
# print("Received message: {0}".format(msg))
if pv['name'] in self.epics['db']:
drv.setParam(pv['name'], pv['value'])
drv.updatePVs()
......@@ -112,7 +132,8 @@ def main():
state = State()
while True:
state.injest_data(fd)
state.process_message()
state.process_messages()
state.reset_epics_if_needed()
state.initialize_epics_if_needed()
state.epics_loop()
finally:
......
......@@ -396,6 +396,10 @@ main(int argc, char **argv)
volatile int mean_cycle_time = 0;
volatile int pv_dcu_count = 0;
volatile int pv_total_datablock_size = 0;
volatile int endpoint_min_count = 1 << 30;
volatile int endpoint_max_count = 0;
volatile int endpoint_mean_count = 0;
int cur_endpoint_ready_count;
int n_cycle_time = 0;
int mytotaldcu = 0;
char *zbuffer;
......@@ -448,8 +452,8 @@ main(int argc, char **argv)
&pv_dcu_count,
120,
115,
0,
115,
0,
},
{
......@@ -457,10 +461,38 @@ main(int argc, char **argv)
&pv_total_datablock_size,
100*1024*1024,
0,
90*1024*1024,
1*1024*1024,
0,
}
1*1024*1024,
},
{
"ENDPOINT_MIN_COUNT",
&endpoint_min_count,
32,
0,
30,
1,
},
{
"ENDPOINT_MAX_COUNT",
&endpoint_max_count,
32,
0,
30,
1,
},
{
"ENDPOINT_MEAN_COUNT",
&endpoint_mean_count,
32,
0,
30,
1,
}
};
if (pv_debug_pipe_name)
{
......@@ -524,9 +556,11 @@ main(int argc, char **argv)
ifoDataBlock = (daq_multi_dcu_data_t *)nextData;
zbuffer = (char *)nextData + header_size;
cur_endpoint_ready_count = 0;
// Loop over all data buffers received from FE computers
for(ii=0;ii<nsys;ii++) {
if(dataRdy[ii]) {
++cur_endpoint_ready_count;
int myc = mxDataBlockSingle[ii].header.dcuTotalModels;
// For each model, copy over data header information
for(jj=0;jj<myc;jj++) {
......@@ -564,6 +598,13 @@ main(int argc, char **argv)
dc_datablock_size += mydbs;
}
}
if (cur_endpoint_ready_count < endpoint_min_count) {
endpoint_min_count = cur_endpoint_ready_count;
}
if (cur_endpoint_ready_count > endpoint_max_count) {
endpoint_max_count = cur_endpoint_ready_count;
}
endpoint_mean_count += cur_endpoint_ready_count;
// Write total data block size to shared memory header
ifoDataBlock->header.fullDataBlockSize = dc_datablock_size;
// Write total dcu count to shared memory header
......@@ -578,6 +619,7 @@ main(int argc, char **argv)
pv_dcu_count = mytotaldcu;
pv_total_datablock_size = dc_datablock_size;
mean_cycle_time = (n_cycle_time > 0 ? mean_cycle_time/n_cycle_time : 1<<31);
endpoint_mean_count = (n_cycle_time > 0 ? endpoint_mean_count/n_cycle_time : 1<<31);
send_pv_update(pv_debug_pipe, pv_prefix, pvs, sizeof(pvs)/sizeof(pvs[0]));
if (do_verbose)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment