Source code for socs.agents.smurf_stream_simulator.agent

import argparse
import os
import time
from collections import deque
from enum import Enum
from os import environ

import numpy as np
import txaio

# For logging
txaio.use_twisted()

ON_RTD = os.environ.get('READTHEDOCS') == 'True'
if not ON_RTD:
    import so3g  # noqa: F401
    from ocs import ocs_agent, site_config
    from spt3g import core


class FlowControl(Enum):
    """Flow control enumeration."""
    ALIVE = 0
    START = 1
    END = 2
    CLEANSE = 3


# Could extend FlowControl, but this keeps this simulator only thing separate
SHUTDOWN = 'shutdown'


class StreamChannel:
    """Simulated SMuRF channel for stream testing.

    Uses np.random.normal to generate random Gaussian data.

    Parameters
    ----------
    mean : float
        Mean value of Gaussian to simulate data with
    stdev : float
        Standard deviation of Gaussian to simulate data

    """

    def __init__(self, mean, stdev):
        self.mean = mean
        self.stdev = stdev

    def read(self):
        """Read a value from the channel.

        Returns
        -------
        float
            Random, normally distributed, value

        """
        return np.random.normal(self.mean, self.stdev)


[docs] class SmurfStreamSimulator: """OCS Agent to simulate data streaming without connection to a SMuRF. Parameters ---------- agent : OCSAgent OCSAgent object which forms this Agent target_host : str Target remote host address port : int Port to send data over num_chans : int Number of channels to simulate stream_id : str Stream ID to put into G3Frames. Defaults to "stream_sim" Attributes ---------- agent : OCSAgent OCSAgent object which forms this Agent log : txaio.tx.Logger txaio logger ojbect, created by the OCSAgent target_host : str Target remote host address port : int Port to send data over writer : spt3g.core.G3NetworkSender G3 writer for sending frames to, in this case a G3NetworkSender is_streaming : bool flag to track if we're currently streaming, used in stop task to stop the streaming process. If stopped then keep alive flow control frames are still being sent. running_in_background : bool flag to track if the streaming process is running in the background, channels : list List of simulated channels to stream """ def __init__(self, agent, target_host="*", port=4536, num_chans=528, stream_id='stream_sim'): self.agent = agent self.log = agent.log self.target_host = target_host self.port = port self.stream_id = stream_id self.writer = None self.is_streaming = False self.running_in_background = False self.channels = [ StreamChannel(0, 1) for i in range(num_chans) ]
[docs] def start_background_streamer(self, session, params=None): """start_background_streamer(params=None) Process to run streaming process. A data stream is started automatically. It can be stopped and started by the start and stop tasks. Either way keep alive flow control frames are being sent. Parameters ---------- frame_rate : float, optional Frequency [Hz] at which G3Frames are sent over the network. Defaults to 1 frame pers sec. sample_rate : float, optional Sample rate [Hz] for each channel. Defaults to 10 Hz. """ if params is None: params = {} self.writer = core.G3NetworkSender(hostname=self.target_host, port=self.port) frame_rate = params.get('frame_rate', 1.) sample_rate = params.get('sample_rate', 10.) frame_num = 0 self.running_in_background = True # Control flags FIFO stack to keep Writer single threaded self.flags = deque([FlowControl.START]) while self.running_in_background: # Send START frame if next(iter(self.flags), None) is FlowControl.START: self._set_stream_on() # sends start flowcontrol self.is_streaming = True self.flags.popleft() print("stream running in background") self.log.debug("control flags: {f}", f=self.flags) # Send keep alive flow control frame f = core.G3Frame(core.G3FrameType.none) f['sostream_flowcontrol'] = FlowControl.ALIVE.value self.writer.Process(f) if self.is_streaming: frame_start = time.time() time.sleep(1. / frame_rate) frame_stop = time.time() times = np.arange(frame_start, frame_stop, 1. / sample_rate) f = core.G3Frame(core.G3FrameType.Scan) f['session_id'] = 0 f['frame_num'] = frame_num f['sostream_id'] = self.stream_id f['data'] = core.G3TimestreamMap() for i, chan in enumerate(self.channels): ts = core.G3Timestream([chan.read() for t in times]) ts.start = core.G3Time(frame_start * core.G3Units.sec) ts.stop = core.G3Time(frame_stop * core.G3Units.sec) f['data'][f"r{i:04}"] = ts self.writer.Process(f) self.log.info("Writing frame...") frame_num += 1 # Send END frame if next(iter(self.flags), None) is FlowControl.END: self._send_end_flowcontrol_frame() self._send_cleanse_flowcontrol_frame() self.is_streaming = False self.flags.popleft() else: # Don't send keep alive frames too quickly time.sleep(1) # Shutdown streamer if next(iter(self.flags), None) is SHUTDOWN: self.running_in_background = False self.flags.popleft() # Teardown writer self.writer.Close() self.writer = None return True, "Finished streaming"
# Not thread safe, be aware of where you're calling these def _send_start_flowcontrol_frame(self): """Send START flowcontrol frame.""" if self.writer is not None: self.log.info("Sending START flowcontrol frame") f = core.G3Frame(core.G3FrameType.none) f['sostream_flowcontrol'] = FlowControl.START.value self.writer.Process(f) # Send new Observation frame at start of observation f = core.G3Frame(core.G3FrameType.Observation) f['session_id'] = 0 f['start_time'] = time.time() f['sostream_id'] = self.stream_id self.writer.Process(f) def _send_end_flowcontrol_frame(self): """Send END flowcontrol frame.""" if self.writer is not None: self.log.info("Sending END flowcontrol frame") f = core.G3Frame(core.G3FrameType.none) f['sostream_flowcontrol'] = FlowControl.END.value self.writer.Process(f) def _send_cleanse_flowcontrol_frame(self): """Send CLEANSE flowcontrol frames.""" if self.writer is not None: self.log.info("Sending CLEANSE flowcontrol frame") f = core.G3Frame(core.G3FrameType.Observation) f['sostream_flowcontrol'] = FlowControl.CLEANSE.value self.writer.Process(f) f = core.G3Frame(core.G3FrameType.Wiring) f['sostream_flowcontrol'] = FlowControl.CLEANSE.value self.writer.Process(f) def _set_stream_on(self): """Private method that that isn't a "task". Starts the stream of actual data frames from the background streaming process. """ self._send_start_flowcontrol_frame() self.is_streaming = True # Thread safe
[docs] def stop_background_streamer(self, session, params=None): """stop_background_streamer(params=None) Stop method associated with start_background_streamer process. The design here sets a flag that is checked in the main background_streamer process, that way we keep the G3Writer writing from a single thread. """ if self.is_streaming: if FlowControl.END not in self.flags: self.flags.append(FlowControl.END) if SHUTDOWN not in self.flags: self.flags.append(SHUTDOWN) return True, "Stopping stream"
[docs] def set_stream_on(self, session, params=None): """set_stream_on(params=None) Task to start the stream of actual data frames from the background streaming process. The design here sets a flag that is checked in the main background_streamer process, that way we keep the G3Writer writing from a single thread. Parameters ---------- force : bool Whether to force a start frame or not, defaults to False """ if params is None: params = {} force = params.get('force', False) if force: self.log.debug("force starting stream") if not self.is_streaming or force: if FlowControl.START not in self.flags: self.flags.append(FlowControl.START) return True, "Started stream"
[docs] def set_stream_off(self, session, params=None): """set_stream_off(params=None) Task to stop the stream of actual data frames from the background streaming process. Keep alive flow control frames will still be sent. The design here sets a flag that is checked in the main background_streamer process, that way we keep the G3Writer writing from a single thread. Parameters ---------- force : bool Whether to force a start frame or not, defaults to False """ if params is None: params = {} force = params.get('force', False) if self.is_streaming or force: if FlowControl.END not in self.flags: self.flags.append(FlowControl.END) return True, "Stopped stream"
def make_parser(parser=None): """Build the argument parser for the Agent. Allows sphinx to automatically build documentation based on this function. """ if parser is None: parser = argparse.ArgumentParser() # Add options specific to this agent. pgroup = parser.add_argument_group("Agent Options") pgroup.add_argument("--auto-start", default=False, type=bool, help="Automatically start streaming at " + "Agent startup.") pgroup.add_argument("--target-host", default="*", help="Target remote host.") pgroup.add_argument("--port", default=50000, help="Port to listen on.") pgroup.add_argument("--num-chans", default=528, help="Number of detector channels to simulate.") pgroup.add_argument("--stream-id", default="stream_sim", help="Stream ID for the simulator.") return parser def main(args=None): # Start logging txaio.start_logging(level=environ.get("LOGLEVEL", "info")) parser = make_parser() args = site_config.parse_args(agent_class='SmurfStreamSimulator', parser=parser, args=args) agent, runner = ocs_agent.init_site_agent(args) sim = SmurfStreamSimulator(agent, target_host=args.target_host, port=int(args.port), num_chans=int(args.num_chans), stream_id=args.stream_id) agent.register_process('stream', sim.start_background_streamer, sim.stop_background_streamer, startup=bool(args.auto_start)) agent.register_task('start', sim.set_stream_on) agent.register_task('stop', sim.set_stream_off) runner.run(agent, auto_reconnect=True) if __name__ == '__main__': main()