Commit ea1bcb44 authored by Jameson Graef Rollins's avatar Jameson Graef Rollins
Browse files

WIP more parallelization in data fetching

parent 1cf74672
......@@ -251,12 +251,16 @@ class DataStore(QtCore.QObject):
def add_channel(self, channel):
self._channels[channel] += 1
assert self._channels[channel] >= 0
# return if we already had reference to this channel
if self._channels[channel] > 1:
return
logging.debug((channel, self._channels[channel]))
# if initializing don't fill
if self.init:
return
# if online restart
elif self.threads.get('online'):
# FIXME: need to handle other online trend types
elif self.threads.get('raw-online'):
self.data_retrieve_done.connect(self.online)
self.terminate()
# else fill in missing data
......@@ -265,7 +269,7 @@ class DataStore(QtCore.QObject):
if not self.db[trend]:
continue
self.remote_cmd(
'fetch', 'full', trend,
trend, 'full',
self.db[trend].span,
channels=[channel])
......@@ -310,7 +314,7 @@ class DataStore(QtCore.QObject):
logging.debug("DATA ONLINE")
self.reset()
self.new_data.connect(self._online_backfill)
self.remote_cmd('iterate', 'online', 'raw', None)
self.remote_cmd('raw', 'online', None)
def _online_backfill(self):
self.new_data.disconnect(self._online_backfill)
......@@ -318,7 +322,7 @@ class DataStore(QtCore.QObject):
return
start, end = self.db['raw'].span
start -= self.lookback
self.remote_cmd('fetch', 'left', 'raw', (start, end))
self.remote_cmd('raw', 'extendleft', (start, end))
def update(self, start_end, trend):
# expand range to ints
......@@ -336,7 +340,7 @@ class DataStore(QtCore.QObject):
pend = min(pend, gpsnow() - 10)
if self.db[trend] is None:
self.remote_cmd('fetch', 'full', trend, (pstart, pend))
self.remote_cmd(trend, 'full', (pstart, pend))
else:
# emit what data we have (in case the caller is requesting
......@@ -352,29 +356,40 @@ class DataStore(QtCore.QObject):
end = int(end)
if pstart < start:
self.remote_cmd('fetch', 'left', trend, (pstart, start))
self.remote_cmd(trend, 'extendleft', (pstart, start))
if pend > end:
self.remote_cmd('fetch', 'right', trend, (end, pend))
self.remote_cmd(trend, 'extend', (end, pend))
##########
def remote_cmd(self, method, tid, trend, start_end=None, channels=None):
def remote_cmd(self, trend, cmd, start_end=None, channels=None):
self.init = False
# this is a thread ID used as a kind of primitive lock.
# the ID should be unique enough to block requests from similar
# action, e.g. command ('extend', 'online') on trend type.
tid = '{}-{}'.format(trend, cmd)
if tid in self.threads and self.threads[tid]:
logging.debug("DATA BUSY: {} {} {} {}".format(method, tid, trend, start_end))
logging.debug("DATA BUSY: {} {}".format(tid, start_end))
return
logging.debug("DATA RETRIEVE: {} {} {} {}".format(method, tid, trend, start_end))
logging.debug("DATA CMND: {} {}".format(tid, start_end))
if cmd == 'online':
desc = 'online '
else:
desc = ''
if trend == 'sec':
ctype = "second trend"
desc += "second trend"
elif trend == 'min':
ctype = "minute trend"
elif tid == 'online':
ctype = "online"
else:
ctype = "full"
desc += "minute trend"
elif trend == 'raw':
desc += "raw"
self.data_retrieve_start.emit("Retrieving {} data...".format(desc))
channels = channels or self.channels
self.data_retrieve_start.emit("Retrieving {} data...".format(ctype))
t = nds.NDSThread(method, tid, trend, channels, start_end)
t = nds.NDSThread(tid, trend, cmd, channels, start_end)
t.new_data.connect(self.remote_recv)
t.done.connect(self.remote_done)
t.start()
......@@ -382,23 +397,23 @@ class DataStore(QtCore.QObject):
@Slot('PyQt_PyObject')
def remote_recv(self, recv):
tid = recv[0]
trend = recv[1]
trend = recv[0]
cmd = recv[1]
buffers = recv[2]
logging.log(5, "DATA RECV {} {}".format(tid, trend))
logging.log(5, "DATA RECV: {} {}".format(trend, cmd))
# FIXME: should the NDS object just return this directly?
dbd = DataBufferDict(buffers, self._lookback)
if tid == 'full':
if cmd == 'full':
if not self.db[trend]:
self.db[trend] = dbd
else:
self.db[trend].update(dbd)
elif tid == 'online':
elif cmd == 'online':
if not self.db[trend]:
self.db[trend] = dbd
else:
self.db[trend].append(dbd)
elif tid == 'left':
elif cmd == 'extendleft':
try:
self.db[trend].extendleft(dbd)
except AssertionError:
......@@ -415,21 +430,21 @@ class DataStore(QtCore.QObject):
# that are likely to fail, we probably still want to
# catch this error during online mode.
logging.error(traceback.format_exc(0))
elif tid == 'right':
elif cmd == 'extend':
self.db[trend].extend(dbd)
self._emit_data(trend, online=tid=='online')
self._emit_data(trend, online=cmd=='online')
@Slot('PyQt_PyObject')
def remote_done(self, recv):
tid, trend, error = recv
logging.debug("DATA DONE: {} {} {}".format(tid, trend, error))
self.threads[tid] = None
tid, error = recv
logging.debug("DATA DONE: {} {}".format(tid, error))
if error:
error = "NDS error: {}".format(error)
logging.warning(error)
else:
error = None
# return if there are still outstanding threads
self.threads[tid] = None
# signal only if there are no outstanding threads
for tid, thread in self.threads.items():
if thread:
return
......
......@@ -148,14 +148,17 @@ class NDSThread(QtCore.QThread):
_type_map = {'raw': None, 'sec': 's', 'min': 'm'}
def __init__(self, method, tid, trend, channels, start_end=None):
assert method in ['fetch', 'iterate']
if method == 'fetch':
assert start_end is not None
def __init__(self, tid, trend, cmd, channels, start_end=None):
super(NDSThread, self).__init__()
self.method = method
self.tid = tid
self.trend = trend
self.cmd = cmd
if self.cmd == 'online':
self.method = 'iterate'
else:
self.method = 'fetch'
if self.method == 'fetch':
assert start_end is not None
self.channels = []
for chan in channels:
if trend == 'raw':
......@@ -177,10 +180,10 @@ class NDSThread(QtCore.QThread):
self._running = True
def emit_data(self, bufs):
self.new_data.emit((self.tid, self.trend, bufs))
self.new_data.emit((self.trend, self.cmd, bufs))
def emit_done(self, error=None):
self.done.emit((self.tid, self.trend, error))
self.done.emit((self.tid, error))
@property
def running(self):
......@@ -199,8 +202,8 @@ class NDSThread(QtCore.QThread):
if self.method == 'fetch':
try:
bufs = fetch(self.channels, self.start_end)
logging.log(5, "NDS fetched: {}.{:09}".format(bufs[0].gps_seconds, bufs[0].gps_nanoseconds))
self.emit_data(bufs)
logging.log(5, "NDS fetched: {} {}".format(bufs[0].gps_seconds, bufs[0].gps_nanoseconds))
except RuntimeError as e:
error = str(e).split('\n')[0]
......@@ -210,7 +213,7 @@ class NDSThread(QtCore.QThread):
if not self.running:
break
self.emit_data(bufs)
logging.log(5, "NDS iterate: {} {}".format(bufs[0].gps_seconds, bufs[0].gps_nanoseconds))
logging.log(5, "NDS iterate: {}.{:09}".format(bufs[0].gps_seconds, bufs[0].gps_nanoseconds))
if not self.running:
break
except RuntimeError as e:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment