Source code for socs.agents.http_camera.agent

import argparse
import json
import os
import shutil
import time
from pathlib import Path

import requests
import txaio
import yaml
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock
# Disable unverified HTTPS warnings (https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings)
from urllib3.exceptions import InsecureRequestWarning, ReadTimeoutError

requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

# For logging
txaio.use_twisted()


[docs] class HTTPCameraAgent: """Grab screenshots from HTTP cameras. Parameters ---------- agent : OCSAgent OCSAgent object which forms this Agent config_file : str Config file path relative to OCS_CONFIG_DIR Attributes ---------- agent : OCSAgent OCSAgent object which forms this Agent is_streaming : bool Tracks whether or not the agent is actively issuing requests to grab screenshots from cameras. Setting to false stops sending commands. log : txaio.tx.Logger txaio logger object, created by the OCSAgent """ def __init__(self, agent, config_file, renew_token=2700): self.agent = agent self.is_streaming = False self.log = self.agent.log self.lock = TimeoutLock() file_path = os.path.join(os.environ['OCS_CONFIG_DIR'], config_file) with open(file_path, 'r') as f: self.config = yaml.safe_load(f) self.renew_token = renew_token agg_params = { 'frame_length': 10 * 60 # [sec] } self.agent.register_feed('cameras', record=True, agg_params=agg_params, buffer_time=0)
[docs] @ocs_agent.param('test_mode', default=False, type=bool) def acq(self, session, params=None): """acq(test_mode=False) **Process** - Grab screenshots from HTTP cameras. Parameters ---------- test_mode : bool, optional Run the Process loop only once. Meant only for testing. Default is False. Notes ----- The most recent data collected is stored in session.data in the structure:: >>> response.session['data'] # for each camera {'location1': {'location': 'location1', 'last_attempt': 1701983575.032506, 'connected': True, 'address': '10.10.10.41'}, 'location2': ... } """ pm = Pacemaker(1 / 60, quantize=False) self.is_streaming = True while self.is_streaming: pm.sleep() # Use UTC timestamp = time.time() data = {} for camera in self.config['cameras']: data[camera['location']] = {'location': camera['location']} self.log.info(f"Grabbing screenshot from {camera['location']}") if camera['brand'] == 'reolink': token = camera.get('token', None) token_ts = camera.get('token_ts', 0) # Token lease time is 1hr. expired = (timestamp - token_ts) > self.renew_token if token is None or expired: login_url = f"https://{camera['address']}/api.cgi?cmd=Login" login_payload = [{"cmd": "Login", "param": {"User": {"Version": 0, "userName": camera['user'], "password": camera['password']}}}] try: resp = requests.post(login_url, data=json.dumps(login_payload), verify=False) except (requests.exceptions.ConnectionError, requests.exceptions.RequestException, ReadTimeoutError) as e: self.log.error(f'{e}') self.log.info("Unable to get response from camera.") data[camera['location']]['last_attempt'] = time.time() data[camera['location']]['connected'] = False continue rdata = resp.json() value = rdata[0].get('value', None) if value is None: self.log.info("Unable to get token. Max number of tokens used.") data[camera['location']]['last_attempt'] = time.time() data[camera['location']]['connected'] = False continue camera['token'] = value['Token']['name'] camera['token_ts'] = timestamp payload = {'cmd': "Snap", 'channel': "0", 'rs': "flsYJfZgM6RTB_os", 'token': camera['token']} url = f"https://{camera['address']}/cgi-bin/api.cgi" elif camera['brand'] == 'acti': payload = {'USER': camera['user'], 'PWD': camera['password'], 'SNAPSHOT': camera.get('resolution', 'N640x480,100')} url = f"http://{camera['address']}/cgi-bin/encoder" else: self.log.info(f"{camera['brand']} is an unsupported camera brand. Skipping this config block.") self.config['cameras'].remove(camera) continue # Format directory and filename ctime = int(timestamp) ctime_dir = int(str(timestamp)[:5]) Path(f"screenshots/{camera['location']}/{ctime_dir}").mkdir(parents=True, exist_ok=True) filename = f"screenshots/{camera['location']}/{ctime_dir}/{ctime}.jpg" latest_filename = f"screenshots/{camera['location']}/latest.jpg" # If no response from camera, update connection status and continue try: if camera['brand'] == 'reolink': response = requests.get(url, params=payload, stream=True, timeout=5, verify=False) elif camera['brand'] == 'acti': response = requests.get(url, params=payload, stream=True, timeout=5) except (requests.exceptions.ConnectionError, requests.exceptions.RequestException, ReadTimeoutError) as e: self.log.error(f'{e}') self.log.info("Unable to get response from camera.") data[camera['location']]['last_attempt'] = time.time() data[camera['location']]['connected'] = False continue self.log.debug("Received screenshot from camera.") # Write screenshot to file and update latest file try: with open(filename, 'wb') as out_file: for chunk in response.iter_content(chunk_size=8192): if chunk: out_file.write(chunk) out_file.flush() os.fsync(out_file.fileno()) self.log.debug(f"Wrote {ctime}.jpg to /{camera['location']}/{ctime_dir}.") except (requests.exceptions.ConnectionError, requests.exceptions.RequestException, ReadTimeoutError) as e: self.log.error(f'{e}') self.log.info("Timeout occurred while writing to file.") data[camera['location']]['last_attempt'] = time.time() data[camera['location']]['connected'] = False continue except Exception as e: self.log.error(f'{e}') self.log.info("Unexpected error occurred while writing to file.") data[camera['location']]['last_attempt'] = time.time() data[camera['location']]['connected'] = False continue finally: response.close() shutil.copy2(filename, latest_filename) self.log.debug(f"Updated latest.jpg in /{camera['location']}.") data[camera['location']]['last_attempt'] = time.time() data[camera['location']]['connected'] = True # Update session.data and publish to feed for camera in self.config['cameras']: data[camera['location']]['address'] = camera['address'] session.data = data self.log.debug("{data}", data=session.data) message = { 'block_name': 'cameras', 'timestamp': timestamp, 'data': {} } for camera in self.config['cameras']: message['data'][camera['location'] + "_connected"] = int(data[camera['location']]['connected']) session.app.publish_to_feed('cameras', message) self.log.debug("{msg}", msg=message) if params['test_mode']: break return True, "Finished Recording"
def _stop_acq(self, session, params=None): """_stop_acq() **Task** - Stop task associated with acq process. """ if self.is_streaming: session.set_status('stopping') self.is_streaming = False return True, "Stopping Recording" else: return False, "Acq is not currently running"
def add_agent_args(parser=None): """ Build the argument parser for the Agent. Allows sphinx to automatically build documentation based on this function. """ if parser is None: parser = argparse.ArgumentParser() pgroup = parser.add_argument_group("Agent Options") pgroup.add_argument("--config-file", type=str, help="Config file path relative to OCS_CONFIG_DIR") pgroup.add_argument("--mode", choices=['acq', 'test']) pgroup.add_argument("--renew-token", type=int, default=2700, help="Renew API token after this amount of seconds. Used for Reolink cameras.") return parser def main(args=None): # Start logging txaio.start_logging(level=os.environ.get("LOGLEVEL", "info")) parser = add_agent_args() args = site_config.parse_args(agent_class='HTTPCameraAgent', parser=parser, args=args) if args.mode == 'acq': init_params = True elif args.mode == 'test': init_params = False agent, runner = ocs_agent.init_site_agent(args) p = HTTPCameraAgent(agent, config_file=args.config_file, renew_token=args.renew_token) agent.register_process("acq", p.acq, p._stop_acq, startup=init_params) runner.run(agent, auto_reconnect=True) if __name__ == "__main__": main()