Source code for socs.agents.acti_camera.agent

import argparse
import os
import shutil
import time
from pathlib import Path

import requests
import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock

# For logging
txaio.use_twisted()


[docs] class ACTiCameraAgent: """Grab screenshots from ACTi cameras. Parameters ---------- agent : OCSAgent OCSAgent object which forms this Agent camera_addresses : list List of IP addresses (as strings) of cameras. user : str Username of cameras. password : str Password of cameras. Attributes ---------- agent : OCSAgent OCSAgent object which forms this Agent is_streaming : bool Tracks whether or not the agent is actively issuing requests to grab screenshots from cameras. Setting to false stops sending commands. log : txaio.tx.Logger txaio logger object, created by the OCSAgent """ def __init__(self, agent, camera_addresses, locations, user, password, resolutions=None): self.agent = agent self.is_streaming = False self.log = self.agent.log self.lock = TimeoutLock() # Default resolution if none provided if resolutions is None: resolutions = ['N640x480,100'] * len(camera_addresses) else: for i in range(len(camera_addresses)): if len(resolutions) <= i: resolutions.append('N640x480,100') self.cameras = [] for (location, address, resolution) in (zip(locations, camera_addresses, resolutions)): self.cameras.append({'location': location, 'address': address, 'resolution': resolution, 'connected': True}) self.user = user self.password = password agg_params = { 'frame_length': 10 * 60 # [sec] } self.agent.register_feed('cameras', record=True, agg_params=agg_params, buffer_time=0)
[docs] @ocs_agent.param('test_mode', default=False, type=bool) def acq(self, session, params=None): """acq(test_mode=False) **Process** - Grab screenshots from ACTi cameras. Parameters ---------- test_mode : bool, optional Run the Process loop only once. Meant only for testing. Default is False. Notes ----- The most recent data collected is stored in session.data in the structure:: >>> response.session['data'] # for each camera {'location1': {'location': 'location1', 'last_attempt': 1701983575.032506, 'connected': True, 'address': '10.10.10.41'}, 'location2': ... } """ pm = Pacemaker(1 / 60, quantize=False) self.is_streaming = True while self.is_streaming: # Use UTC timestamp = time.time() data = {} for camera in self.cameras: data[camera['location']] = {'location': camera['location']} self.log.info(f"Grabbing screenshot from {camera['location']}") payload = {'USER': self.user, 'PWD': self.password, 'SNAPSHOT': camera['resolution']} url = f"http://{camera['address']}/cgi-bin/encoder" # Format directory and filename ctime = int(timestamp) ctime_dir = int(str(timestamp)[:5]) Path(f"screenshots/{camera['location']}/{ctime_dir}").mkdir(parents=True, exist_ok=True) filename = f"screenshots/{camera['location']}/{ctime_dir}/{ctime}.jpg" latest_filename = f"screenshots/{camera['location']}/latest.jpg" # If no response from camera, update connection status and continue try: response = requests.get(url, params=payload, stream=True, timeout=5) except requests.exceptions.RequestException as e: self.log.error(f'{e}') self.log.info("Unable to get response from camera.") camera['connected'] = False data[camera['location']]['last_attempt'] = time.time() data[camera['location']]['connected'] = camera['connected'] continue camera['connected'] = True self.log.debug("Received screenshot from camera.") # Write screenshot to file and update latest file with open(filename, 'wb') as out_file: shutil.copyfileobj(response.raw, out_file) self.log.debug(f"Wrote {ctime}.jpg to /{camera['location']}/{ctime_dir}.") shutil.copy2(filename, latest_filename) self.log.debug(f"Updated latest.jpg in /{camera['location']}.") del response data[camera['location']]['last_attempt'] = time.time() data[camera['location']]['connected'] = camera['connected'] # Update session.data and publish to feed for camera in self.cameras: data[camera['location']]['address'] = camera['address'] session.data = data self.log.debug("{data}", data=session.data) message = { 'block_name': 'cameras', 'timestamp': timestamp, 'data': {} } for camera in self.cameras: message['data'][camera['location'] + "_connected"] = int(camera['connected']) session.app.publish_to_feed('cameras', message) self.log.debug("{msg}", msg=message) if params['test_mode']: break pm.sleep() return True, "Finished Recording"
def _stop_acq(self, session, params=None): """_stop_acq() **Task** - Stop task associated with acq process. """ if self.is_streaming: session.set_status('stopping') self.is_streaming = False return True, "Stopping Recording" else: return False, "Acq is not currently running"
def add_agent_args(parser=None): """ Build the argument parser for the Agent. Allows sphinx to automatically build documentation based on this function. """ if parser is None: parser = argparse.ArgumentParser() pgroup = parser.add_argument_group("Agent Options") pgroup.add_argument("--camera-addresses", nargs='+', type=str, help="List of camera IP addresses.") pgroup.add_argument("--locations", nargs='+', type=str, help="List of camera locations.") pgroup.add_argument("--resolutions", nargs='+', type=str, help="List of resolution and quality. Ex: N640x480,100 where 100 is quality %.") pgroup.add_argument("--user", help="Username of camera.") pgroup.add_argument("--password", help="Password of camera.") pgroup.add_argument("--mode", choices=['acq', 'test']) return parser def main(args=None): # Start logging txaio.start_logging(level=os.environ.get("LOGLEVEL", "info")) parser = add_agent_args() args = site_config.parse_args(agent_class='ACTiCameraAgent', parser=parser, args=args) if args.mode == 'acq': init_params = True elif args.mode == 'test': init_params = False agent, runner = ocs_agent.init_site_agent(args) p = ACTiCameraAgent(agent, camera_addresses=args.camera_addresses, locations=args.locations, resolutions=args.resolutions, user=args.user, password=args.password) agent.register_process("acq", p.acq, p._stop_acq, startup=init_params) runner.run(agent, auto_reconnect=True) if __name__ == "__main__": main()