Simplify parallelize and subprocess API
- Parallelize context manager now runs automatically within pipeline.run()
- Propagate errors from worker processes in parallelized elements
- Use worker_process() with explicit parameters instead of sub_process_internal() with a worker_argdict for parallelized elements:
Before:
@dataclass
class MyProcessingElement(ParallelizeTransformElement):
multiplier: int = 2
def __post_init__(self):
super().__post_init__()
# Pass parameters to worker via argdict
self.worker_argdict = {"multiplier": self.multiplier}
def pull(self, pad, frame):
self.in_queue.put(frame)
if frame.EOS:
self.at_eos = True
@staticmethod
def sub_process_internal(**kwargs):
"""Worker runs as static method with kwargs"""
inq = kwargs["inq"]
outq = kwargs["outq"]
worker_stop = kwargs["worker_stop"]
multiplier = kwargs.get("worker_argdict", {}).get("multiplier", 1)
try:
if worker_stop.is_set():
return
frame = inq.get(timeout=0.1)
if frame and not frame.EOS:
frame.data *= multiplier
outq.put(frame)
except Empty:
pass
def new(self, pad):
return self.out_queue.get()
After:
@dataclass
class MyProcessingElement(ParallelizeTransformElement):
multiplier: int = 2 # Instance attributes become worker parameters automatically
def pull(self, pad, frame):
self.in_queue.put(frame)
if frame.EOS:
self.at_eos = True
@staticmethod
def worker_process(context: WorkerContext, multiplier: int):
"""Worker runs as static method with WorkerContext"""
try:
if context.should_stop():
return
frame = context.input_queue.get(timeout=0.1)
if frame and not frame.EOS:
frame.data *= multiplier
context.output_queue.put(frame)
except queue.Empty:
pass
def new(self, pad):
return self.out_queue.get()
Some elements need to save state across function calls. That state can be stored and read back through the worker context, e.g. context.state["key"] = value.
Also moved all the subprocess tests into a single test file and removed/consolidated redundant or low-value-add tests. Test coverage is unchanged.
Edited by Patrick Godwin