Skip to content

Simplify parallelize and subprocess API

  • Parallelize context manager now runs automatically within pipeline.run()
  • Propagate errors from worker process in parallelize elements
  • Use worker_process() with explicit parameters instead of sub_process_internal() and a worker_argdict for parallelized elements:

Before:

  @dataclass
  class MyProcessingElement(ParallelizeTransformElement):
      multiplier: int = 2

      def __post_init__(self):
          super().__post_init__()
          # Pass parameters to worker via argdict
          self.worker_argdict = {"multiplier": self.multiplier}

      def pull(self, pad, frame):
          self.in_queue.put(frame)
          if frame.EOS:
              self.at_eos = True

      @staticmethod
      def sub_process_internal(**kwargs):
          """Worker runs as static method with kwargs"""
          inq = kwargs["inq"]
          outq = kwargs["outq"]
          worker_stop = kwargs["worker_stop"]
          multiplier = kwargs.get("worker_argdict", {}).get("multiplier", 1)

          try:
              if worker_stop.is_set():
                  return
              frame = inq.get(timeout=0.1)
              if frame and not frame.EOS:
                  frame.data *= multiplier
                  outq.put(frame)
          except Empty:
              pass

      def new(self, pad):
          return self.out_queue.get()

After:

  @dataclass
  class MyProcessingElement(ParallelizeTransformElement):
      multiplier: int = 2  # Instance attributes become worker parameters automatically

      def pull(self, pad, frame):
          self.in_queue.put(frame)
          if frame.EOS:
              self.at_eos = True

      @staticmethod
      def worker_process(context: WorkerContext, multiplier: int):
          """Worker runs as static method with WorkerContext"""
          try:
              if context.should_stop():
                  return
              frame = context.input_queue.get(timeout=0.1)
              if frame and not frame.EOS:
                  frame.data *= multiplier
                  context.output_queue.put(frame)
          except queue.Empty:
              pass

      def new(self, pad):
          return self.out_queue.get()

Some elements need to save state across calls to the worker function. This state can be stored and accessed through the worker context, e.g. context.state["key"] = value.

Also moved all the subprocess tests into a single test file and removed or consolidated redundant and low-value tests. Test coverage is unchanged.

Edited by Patrick Godwin

Merge request reports

Loading