Skip to content
Snippets Groups Projects
Commit 5acd15c3 authored by Chad Hanna's avatar Chad Hanna
Browse files

Merge branch 'mr-resource-event' into 'main'

TSResourceSource updates

See merge request !120
parents 9d3209d4 8f9ae46e
No related branches found
No related tags found
1 merge request!120TSResourceSource updates
Pipeline #718808 passed
......@@ -693,6 +693,7 @@ class TSSource(_TSSource):
@dataclass
class TSResourceSource(_TSSource):
"""Source class that is entirely data driven by an external resource.
The resource will gather data in a separate thread. The user
must implement the get_data() method and probably doesn't
need to implement any other methods. The get_data()
......@@ -705,9 +706,8 @@ class TSResourceSource(_TSSource):
with additional metadata e.g., "time_ns", "sample_rate", and "data".
An implementation of get_data() might look like this:
def get_data(self):
for stream in arrakis.stream(tuple(self.srcs)):
for stream in server.stream(tuple(self.srcs)):
for channel, block in stream.items():
pad = self.srcs[channel]
buf = SeriesBuffer(
......@@ -715,27 +715,20 @@ class TSResourceSource(_TSSource):
data=block.data,
sample_rate=block.channel.sample_rate,
)
self.in_queue[pad].put(buf)
try:
self.stop_thread.get(0)
break
except queue.Empty:
pass
There are also two additional queues. One "stop_thread" should be checked
to see if the thread should end. The other "exception_queue" should
be populated with exceptions from this thread.
yield pad, buf
start_time: Optional[int] = None
- If None, implies should start at now and is a real-time server
duration: Optional[int] = None
- If None, go on forever
Args:
start_time: Optional[int] = None
- If None, implies should start at now and is a real-time
server
duration: Optional[int] = None
- If None, go on forever
in_queue_timeout: int = 60
- How long to wait for a buffer from the resource before
timing out with a fatal error. This needs to be longer
than the duration of buffers coming from a real-time
server or it will hang.
in_queue_timeout: int = 60
- How long to wait for a buffer from the resource before timing out with
a fatal error. This needs to be longer than the duration of buffers
coming from a real-time server or it will hang.
"""
start_time: Optional[int] = None
......@@ -800,7 +793,7 @@ class TSResourceSource(_TSSource):
def setup(self):
"""Initialize the RealTimeDataSource class."""
if not self.__is_setup:
self.stop_thread = queue.Queue()
self.stop_event = threading.Event()
self.exception_event = threading.Event()
self.in_queue = {p: queue.Queue(self.__in_queue_length) for p in self.rsrcs}
self.out_queue = {p: deque() for p in self.rsrcs}
......@@ -828,11 +821,8 @@ class TSResourceSource(_TSSource):
try:
for pad, buf in self.get_data():
self.in_queue[pad].put(buf)
try:
self.stop_thread.get(0)
if self.stop_event.is_set():
break
except queue.Empty:
pass
except Exception:
self.exception_event.set()
raise
......@@ -847,7 +837,7 @@ class TSResourceSource(_TSSource):
self.thread.start()
def stop(self):
self.stop_thread.put(True)
self.stop_event.set()
if self.thread and self.thread.is_alive():
self.thread.join()
self.thread = None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment