
Draft: Port to new resource source implementation

This ports the arrakis source to work with greg/sgn-ts!146

I couldn't figure out how to write a test for this despite trying for a while, but you can run the script below with SGNLOGLEVEL=DEBUG and it appears to process data (assuming the Arrakis server is running in mock mode).
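
Alternatively, debug output can probably be enabled from inside the script itself; this is only a sketch and assumes sgn emits its messages through Python's standard logging module (not verified against the library):

import logging

# Assumption: sgn/sgn_arrakis log via the standard logging module, so raising
# the root logger level should surface their debug messages.
logging.basicConfig(level=logging.DEBUG)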

#!/usr/bin/env python3
"""
SGN Pipeline to read H1:CAL-DELTAL_EXTERNAL_DQ channel from Arrakis mock server
in real-time
"""

import os
import sys

# Set the Arrakis server URL before importing
os.environ["ARRAKIS_SERVER"] = "grpc://localhost:31206"

from sgn import Pipeline
from sgn.subprocess import Parallelize
from sgnts.sinks import NullSeriesSink

from sgn_arrakis import ArrakisSource


def main():
    # Define the channel to read
    channels = ["H1:CAL-DELTAL_EXTERNAL_DQ"]

    print(f"Starting pipeline to read channels: {channels}")
    print("Press Ctrl+C to stop\n")

    # Create the Arrakis source to read from the mock server.
    # It connects to the server set via the ARRAKIS_SERVER environment variable above.
    # The mock server has data starting around GPS time 1435366385.
    source = ArrakisSource(
        source_pad_names=channels,
        start_time=1435366385,  # GPS time where mock data starts
        duration=10,  # Read 10 seconds of data
    )

    # Create a null sink to terminate the pipeline; replace it with a different
    # sink to actually process the data.
    sink = NullSeriesSink(
        sink_pad_names=channels,
        verbose=False,  # set to True to print information about received blocks
    )

    # Create the link map to connect source outputs to sink inputs
    link_map = {sink.snks[channel]: source.srcs[channel] for channel in channels}

    # Build the pipeline
    pipeline = Pipeline()
    pipeline.insert(source, sink, link_map=link_map)

    # Run the pipeline with Parallelize context manager (required for TSResourceSource)
    with Parallelize(pipeline) as parallelize:
        print("Pipeline is running and reading from mock server...")
        parallelize.run()

    print("Pipeline completed successfully")

    # Debug: list any threads still alive after the pipeline has finished, to
    # check that the source's worker threads shut down cleanly.
    import threading

    active_threads = threading.enumerate()
    print("Active threads:")
    for thread in active_threads:
        print(
            f"  - Name: {thread.name}, Alive: {thread.is_alive()}, "
            f"Daemon: {thread.daemon}"
        )


if __name__ == "__main__":
    main()
    sys.exit(0)
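
As a starting point for an eventual test, here is a rough pytest-style smoke test built from the same calls as the script above. It is only a sketch: it assumes a mock Arrakis server is already listening on grpc://localhost:31206, and the function name and 1-second duration are arbitrary choices, not part of the existing code.

import os

os.environ.setdefault("ARRAKIS_SERVER", "grpc://localhost:31206")

from sgn import Pipeline
from sgn.subprocess import Parallelize
from sgnts.sinks import NullSeriesSink

from sgn_arrakis import ArrakisSource


def test_arrakis_source_smoke():
    # Read one second of mock data and check that the pipeline runs to completion.
    channels = ["H1:CAL-DELTAL_EXTERNAL_DQ"]
    source = ArrakisSource(
        source_pad_names=channels,
        start_time=1435366385,  # GPS time where mock data starts
        duration=1,
    )
    sink = NullSeriesSink(sink_pad_names=channels, verbose=False)

    pipeline = Pipeline()
    pipeline.insert(
        source,
        sink,
        link_map={sink.snks[c]: source.srcs[c] for c in channels},
    )

    # If run() raises or hangs, the test fails; reaching this point is the "pass".
    with Parallelize(pipeline) as parallelize:
        parallelize.run()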
