Maintenance will be performed on git.ligo.org, chat.ligo.org, containers.ligo.org, and docs.ligo.org tomorrow, 2020/08/04, starting at approximately 9am PDT. It is expected to take around 15 minutes and there will be a short period of downtime towards the end of the maintenance window. Please direct any comments, questions or concerns to computing-help@ligo.org.

Commit 4c853b8b authored by Jameson Rollins's avatar Jameson Rollins

single thread: move CAS processing out of separate thread into main daemon thread thread

This is a fairly large overhaul of the original threading model that had
the pcaspy portable channel access server in a separate thread.  We here
bring the cas back in to the main thread, and process it's select loop at
the top of the daemon main loop.

Digging into the seg faults [0] with an instrumented address sanitizer
versions of python and pcaspy we discovered that pcaspy is not thread safe.
When run in a separate thread pcaspy was producing a strncpy
heap-use-after-free when pushing the unit CTRL sub-record (gdb showed that
it was in strncpy of the string "seconds", where the only use of "seconds"
in guardian is in unit sub-record for EXECTIME).  Attempts to wrap guardian's
reading and writing to cas driver records was not sufficient, and cas does
not provide any hooks to locks for it's access either.  So why not try to
get rid of the separate cas thread altogether...

The cas.process() is moved in to the dead time at the top of the main daemon
loop, where it was waiting for the clock tick. This has the side affect of actually
synchonizing to the wall clock microsecond step (good?).

The rest of the patch is just cleanup to remove all the old thread/lock
stuff that is no longer needed, and to simplify the relevant interfaces.

[0] https://alog.ligo-wa.caltech.edu/aLOG/index.php?callRep=40765
parent 6a553dc4
......@@ -26,11 +26,10 @@ class CADriver(pcaspy.Driver):
super(CADriver, self).__init__()
self._state_index = {}
self._request_event = request_event
self.lock = threading.Lock()
##########
# these methods are for the external interface
# NOTE: CALLER MUST LOCK (and updatePV after set)
# NOTE: caller must updatePV after set
def __getitem__(self, channel):
value = self.getParam(channel)
......@@ -57,12 +56,6 @@ class CADriver(pcaspy.Driver):
##########
# these methods are for the internal interface, e.g. CA clients
# (locking handled)
# def read(self, channel):
# with self.lock:
# return self.getParam(channel)
def write(self, channel, value):
# NOTE: for enum records the value here is the numeric value,
......@@ -80,23 +73,21 @@ class CADriver(pcaspy.Driver):
and value >= len(guarddb[channel]['enums']):
return False
params = []
if channel in ['REQUEST', 'REQUEST_ENUM']:
if channel == 'REQUEST':
# reject invalid state requests
if value not in self._state_index.keys():
return False
if value in guarddb['REQUEST_ENUM']['enums']:
params.append(('REQUEST_ENUM', request_enum_index(value)))
self.setParam('REQUEST_ENUM', request_enum_index(value))
svalue = value
elif channel == 'REQUEST_ENUM':
params.append(('REQUEST_ENUM', value))
self.setParam('REQUEST_ENUM', value)
svalue = guarddb['REQUEST_ENUM']['enums'][value]
params.append(('REQUEST', svalue))
params.append(('REQUEST_S', svalue))
params.append(('REQUEST_N', self._state_index[svalue]))
self.setParam('REQUEST', svalue)
self.setParam('REQUEST_S', svalue)
self.setParam('REQUEST_N', self._state_index[svalue])
self._request_event.set()
elif channel == 'MODE':
......@@ -106,49 +97,30 @@ class CADriver(pcaspy.Driver):
# specifying manager
if mode == 'MANAGED':
return False
params.append(('MODE', value))
self.setParam('MODE', value)
# make sure manager channel is unset if switching away
# from managed mode
if mode in ['AUTO', 'MANUAL']:
params.append(('MANAGER', ''))
self.setParam('MANAGER', '')
elif channel == 'MANAGER':
# reject setting manager to empty string
if value == '':
return False
params.append(('MANAGER', value))
params.append(('MODE', guarddb['MODE']['enums'].index('MANAGED')))
self.setParam('MANAGER', value)
self.setParam('MODE', guarddb['MODE']['enums'].index('MANAGED'))
else:
params.append((channel, value))
# HACK: the setParam method sets pv flag=True, which indicates
# that the value needs to be writen out on the next
# updatePVs(). This is usually not needed during write(),
# since the values being written are already externally
# up-to-date when write() is executed. However, since we're
# updating auxilliary records during write() (e.g. state
# readbacks), we need to run updatePVs() for those aux
# channels to be updated. This then causes the write()
# channels to be double updated, which in turn causes
# connected clients to see two updates. Setting flag=False
# for the original write channel prevents this double update
# from happening. This should probably be fixed upstream.
# UPDATE: this does not seem to work in pcaspy 0.7, and
# instead causes channels to not show updates properly
#self.pvDB[channel].flag = False
with self.lock:
for param in params:
self.setParam(*param)
self.updatePVs()
self.setParam(channel, value)
self.updatePVs()
return True
class CAServer(threading.Thread):
class CAServer(object):
def __init__(self, system_name):
super(CAServer, self).__init__()
self.prefix = const.CAS_PREFIX_FMT.format(IFO=const.IFO, SYSTEM=system_name)
self._ready_event = threading.Event()
self._request_event = threading.Event()
self._server = pcaspy.SimpleServer()
#self._server.setDebugLevel(4)
......@@ -165,28 +137,24 @@ class CAServer(threading.Thread):
def _loaddb(self):
self._server.createPV(self.prefix, guarddb)
########################################
def __getitem__(self, channel):
with self._driver.lock:
return self._driver[channel]
def process(self, timeout):
"""Server process select loop
Returns after tproc seconds.
def __setitem__(self, channel, value):
with self._driver.lock:
self._driver[channel] = value
self._driver.updatePVs()
"""
self._server.process(timeout)
########################################
def get_many(self, channels):
"""Get multiple values from CAS.
def __getitem__(self, channel):
return self._driver[channel]
"""
chanvals = []
with self._driver.lock:
for channel in channels:
chanvals.append((channel, self._driver[channel]))
return chanvals
def __setitem__(self, channel, value):
self._driver[channel] = value
self._driver.updatePVs()
def update_state_index(self, index, init=False):
......@@ -198,11 +166,10 @@ class CAServer(threading.Thread):
return
# update all guardstate channels, in case the state:index
# mapping has changed
with self._driver.lock:
for channel, entry in guarddb.items():
if entry.get('guardstate', False):
self._driver[channel] = self._driver[channel]
self._driver.updatePVs()
for channel, entry in guarddb.items():
if entry.get('guardstate', False):
self._driver[channel] = self._driver[channel]
self._driver.updatePVs()
def update_request_enum(self, enum, init=False):
......@@ -214,28 +181,11 @@ class CAServer(threading.Thread):
if enum == guarddb['REQUEST_ENUM']['enums']:
return False
guarddb['REQUEST_ENUM']['enums'] = enum
with self._driver.lock:
self._driver.setParamEnums('REQUEST_ENUM', enum)
if not init:
# update request enum, in case it's changed
request = self._driver.getParam('REQUEST')
if request in guarddb['REQUEST_ENUM']['enums']:
self._driver.setParam('REQUEST_ENUM', request_enum_index(request))
self._driver.updatePVs()
self._driver.setParamEnums('REQUEST_ENUM', enum)
if not init:
# update request enum, in case it's changed
request = self._driver.getParam('REQUEST')
if request in guarddb['REQUEST_ENUM']['enums']:
self._driver.setParam('REQUEST_ENUM', request_enum_index(request))
self._driver.updatePVs()
return True
########################################
def start(self):
super(CAServer, self).start()
self._ready_event.wait()
def stop(self):
self._running = False
def run(self):
self._ready_event.set()
while self._running:
self._server.process(const.TIME_STEP)
import time
import threading
class Clock(threading.Thread):
"""daemon run clock.
Caller should call wait() method to be released on every cycle.
"""
def __init__(self, time_step):
super(Clock, self).__init__()
self._time_step = time_step
self.daemon = True
# clock cycle condition object
self._cv = threading.Condition()
self._cv.daemon = True
# run flag
self._running = True
def stop(self):
self._running = False
def run(self):
while self._running:
time.sleep(self._time_step)
with self._cv:
self._cv.notify_all()
def wait(self):
"""Wait for the next tick of the timer"""
with self._cv:
self._cv.wait()
......@@ -13,7 +13,8 @@ CAS_PREFIX_FMT = '{IFO}:GRD-{SYSTEM}_'
# main daemon step time in Hz
CPS = int(os.getenv('GUARD_CPS', 16))
TIME_STEP = 1./CPS
if 1000000 % CPS != 0:
raise ValueError("CPS does not correspond to an integer number of microseconds")
# how long to wait, in seconds, on REDIRECT before terminating worker
REDIRECT_TIMEOUT = 1
......
......@@ -27,7 +27,6 @@ from ._version import __version__
from . import const
from .system import GuardSystemError
from .db import guarddb, Database
from .clock import Clock
from .worker import Worker
############################################################
......@@ -44,7 +43,6 @@ class Daemon(object):
initial_mode='AUTO',
initial_state=None,
initial_request=None,
time_step=const.TIME_STEP,
redirect_timeout=const.REDIRECT_TIMEOUT,
single_shot=False,
archive=None,
......@@ -69,9 +67,6 @@ class Daemon(object):
if self.single_shot:
self.log.info("single-shot mode")
# initialize clock thread
self.clock = Clock(time_step)
# worker subprocess
self.worker = None
......@@ -185,34 +180,6 @@ guardian version: {}
########################################
def clock_start(self):
self.log.debug("starting run clock...")
self.clock.start()
self.log.debug("run clock started")
def clock_terminate(self):
self.log.debug("terminating run clock...")
self.clock.stop()
self.clock.join()
self.log.debug("run clock terminated")
########################################
def cas_start(self):
if self.cas:
self.log.debug("starting cas...")
self.cas.start()
self.log.debug("cas started")
def cas_terminate(self):
if self.cas:
self.log.debug("terminating cas...")
self.cas.stop()
self.cas.join()
self.log.debug("cas terminated")
########################################
def worker_init(self):
self.log.debug("initializing worker...")
snapshot_file = None
......@@ -389,8 +356,6 @@ guardian version: {}
sd_notify("STOPPING=1")
self.log.info("stopping daemon...")
self.worker_terminate()
self.clock_terminate()
self.cas_terminate()
self.log.info("daemon stopped.")
########################################
......@@ -409,11 +374,15 @@ guardian version: {}
# holds time at which to reset log level
loglevel_reset_time = None
# start the channel access server thread
self.cas_start()
# start the main run clock
self.clock_start()
# use cas.process as sleep if available
if self.cas:
sleep = self.cas.process
else:
sleep = time.sleep
# handle times in int usec
time_step = int(1000000/const.CPS)
# start on the next integer second
next_step = int(time.time() + 1) * 1000000
# start the worker subprocess
self.worker_init()
......@@ -425,7 +394,18 @@ guardian version: {}
########################################
while True:
self.clock.wait()
# sleep/process till next cycle
while int(time.time()*1000000) < next_step:
# FIXME: this sleep time affects the load put on the system. If
# this is set too low the process will be in R more than S,
# which jacks the system load up considerably. If it's set too
# high, there's more skew in the total loop processing time.
# The main part of the loop below usually completely in less
# than 5ms, so there should be ~58ms in this sleep/process loop.
# A 5ms sleep here seems to be a reasonable compromise.
sleep(0.005)
next_step += time_step
self.log.log(5, '====== start cycle ======')
sd_notify("WATCHDOG=1")
......
......@@ -101,7 +101,7 @@ guarddb = {
'prec': 4,
'lolo': -1,
'low': -1,
'high': const.TIME_STEP,
'high': 1./const.CPS,
'hihi': 1,
'log_changes': False,
'archive': True,
......@@ -122,7 +122,7 @@ guarddb = {
'prec': 4,
'lolo': -1,
'low': -1,
'high': const.TIME_STEP,
'high': 1./const.CPS,
'hihi': 1,
'log_changes': False,
},
......@@ -325,7 +325,9 @@ class Database(object):
# CAS interaction
def update(self):
"""Update the db from the cas."""
"""Update the db from the cas.
"""
if not self._cas:
return
......@@ -333,11 +335,16 @@ class Database(object):
self.request_event = True
self._cas._request_event.clear()
for channel, value in self._cas.get_many(self._writable):
self._db[channel] = value
for channel in self._writable:
self._db[channel] = self._cas[channel]
def update_system(self, system, init=False):
"""Update system definition
E.g. state index and request state enum
"""
if not self._cas:
return
index = system.state_index_dict()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment