Skip to content
Snippets Groups Projects
Commit 10936619 authored by Jameson Graef Rollins's avatar Jameson Graef Rollins
Browse files

implement resource exception handling in TSResourceSource

Resource exceptions were not actually being handled.  This reworks how data
is pulled from the in queue to check if there are exceptions in the resource
(indicated by the reworked self.exception_signal), and then exit the
pipeline if any exceptions are encountered.

tests are added to maintain test coverage over the modified parts of the code.
parent 7e906ceb
No related branches found
No related tags found
1 merge request!115TSResourceSource fixes
......@@ -2,6 +2,7 @@ from __future__ import annotations
import queue
import threading
import time as stime
from collections import deque
from dataclasses import dataclass
......@@ -806,7 +807,7 @@ class TSResourceSource(_TSSource):
"""Initialize the RealTimeDataSource class."""
if not self.__is_setup:
self.stop_thread = queue.Queue()
self.exception_queue = queue.Queue()
self.exception_event = threading.Event()
self.in_queue = {p: queue.Queue(self.__in_queue_length) for p in self.rsrcs}
self.out_queue = {p: deque() for p in self.rsrcs}
self.latest_buffer_properties = {p: None for p in self.rsrcs}
......@@ -838,9 +839,9 @@ class TSResourceSource(_TSSource):
break
except queue.Empty:
pass
except Exception as e:
print(e)
self.exception_queue.put(e)
except Exception:
self.exception_event.set()
raise
def __exit__(self):
self.stop()
......@@ -858,30 +859,47 @@ class TSResourceSource(_TSSource):
self.thread = None
self.__is_setup = False
def _flush_queue(self, q, timeout=0):
assert timeout >= 1, "timeouts can not be less than one second."
# how long do we block before checking for resource exceptions
# FIXME: is should we wait for more or less time?
sleep = 0.01 # seconds
# track start time so we know when timeout is reached
start_time = stime.time()
# now wait for the elements in the queue, checking for
# execeptions from the resource while we wait
while stime.time() - start_time < timeout:
# check to see if any exceptions have been raise by the
# resource, and stop the thread and exit is so.
if self.exception_event.is_set():
self.stop()
raise RuntimeError("exception raised in resource thread, aborting.")
if q.empty():
stime.sleep(sleep)
else:
break
else:
self.stop()
raise ValueError(f"could not read from resource after {timeout} seconds")
# drain the queue
out = []
while not q.empty():
out.append(q.get(block=False))
return out
def get_data_from_queue(self):
"""Retrieve data from the queue with a timeout."""
try:
for pad in self.out_queue:
# get at least one
if self.in_queue[pad].empty():
self.out_queue[pad].append(
self.in_queue[pad].get(timeout=self.in_queue_timeout)
)
# get the rest
while not self.in_queue[pad].empty():
self.out_queue[pad].append(self.in_queue[pad].get(0))
self.latest_buffer_properties[pad] = self.out_queue[pad][-1].properties
if self.first_buffer_properties[pad] is None:
self.first_buffer_properties[pad] = self.out_queue[pad][
0
].properties
# We should have a t0 now
if self.__end is None and self.duration is not None:
self.__end = self.t0 + self.duration
except queue.Empty:
raise ValueError(
"could not read from resource after {self.in_queue_timeout} seconds"
for pad in self.out_queue:
self.out_queue[pad] += self._flush_queue(
self.in_queue[pad],
timeout=self.in_queue_timeout,
)
self.latest_buffer_properties[pad] = self.out_queue[pad][-1].properties
if self.first_buffer_properties[pad] is None:
self.first_buffer_properties[pad] = self.out_queue[pad][0].properties
# We should have a t0 now
if self.__end is None and self.duration is not None:
self.__end = self.t0 + self.duration
def set_data(self, out_frame, pad):
"""This method will set data on out_frame based on the contents of the
......
#!/usr/bin/env python3
from dataclasses import dataclass
from sgn.apps import Pipeline
import time
import numpy
import pytest
from sgn.apps import Pipeline
from sgn.sources import SignalEOS
from sgnts.base import TSResourceSource
from sgnts.base.buffer import SeriesBuffer
from sgnts.base.offset import Offset
from sgnts.sinks import NullSeriesSink
from sgnts.utils import gpsnow
import numpy
import queue
from sgn.sources import SignalEOS
#
......@@ -20,6 +21,7 @@ from sgn.sources import SignalEOS
class DataServer:
block_duration: int = 2
simulate_skip_data: bool = False
simulate_hang: int = 0
description = {
"H1:FOO": {"rate": 2048, "sample-shape": ()},
......@@ -29,6 +31,7 @@ class DataServer:
def stream(self, channels, start=None, end=None):
assert not (set(channels) - set(self.description))
t0 = int(gpsnow()) - 1.0 if start is None else start
time.sleep(self.simulate_hang)
while True:
out = {}
if end is not None and t0 >= end:
......@@ -59,11 +62,13 @@ class DataServer:
class FakeLiveSource(TSResourceSource):
simulate_skip_data: bool = False
block_duration: int = 4
simulate_hang: int = 0
def __post_init__(self):
self.server = DataServer(
block_duration=self.block_duration,
simulate_skip_data=self.simulate_skip_data,
simulate_hang=self.simulate_hang,
)
super().__post_init__()
......@@ -71,18 +76,12 @@ class FakeLiveSource(TSResourceSource):
for stream in self.server.stream(self.srcs, self.start_time, self.end_time):
for channel, block in stream.items():
pad = self.srcs[channel]
buf = SeriesBuffer(
offset=Offset.fromsec(block["t0"]),
data=block["data"],
sample_rate=block["rate"],
)
self.in_queue[pad].put(buf)
try:
self.stop_thread.get(0)
break
except queue.Empty:
pass
yield pad, buf
def test_resource_source():
......@@ -110,5 +109,57 @@ def test_resource_source():
pipeline.run()
def test_resource_fail():
pipeline = Pipeline()
src = FakeLiveSource(
name="src",
source_pad_names=("H1:BAR",),
duration=10,
block_duration=4,
)
snk = NullSeriesSink(
name="snk",
sink_pad_names=("H1",),
verbose=True,
)
pipeline.insert(
src,
snk,
link_map={snk.snks["H1"]: src.srcs["H1:BAR"]},
)
with pytest.raises(RuntimeError):
pipeline.run()
def test_resource_hang():
pipeline = Pipeline()
src = FakeLiveSource(
name="src",
source_pad_names=("H1:FOO",),
duration=10,
block_duration=4,
simulate_hang=2,
in_queue_timeout=1,
)
snk = NullSeriesSink(
name="snk",
sink_pad_names=("H1",),
verbose=True,
)
pipeline.insert(
src,
snk,
link_map={snk.snks["H1"]: src.srcs["H1:FOO"]},
)
with pytest.raises(ValueError):
pipeline.run()
if __name__ == "__main__":
test_resource_source()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment