import argparse
import os
import subprocess
import time
import traceback
import sqlalchemy
import txaio
from ocs import ocs_agent, site_config
from socs.db.suprsync import SupRsyncFileHandler, SupRsyncFilesManager
[docs]
class SupRsync:
"""
Agent to rsync files to a remote (or local) destination, verify successful
transfer, and delete local files after a specified amount of time.
Parameters
--------------
agent : OCSAgent
OCS agent object
args : Namespace
Namespace with parsed arguments
Attributes
------------
agent : OCSAgent
OCS agent object
log : txaio.tx.Logger
txaio logger object, created by the OCSAgent
archive_name : string
Name of the managed archive. Sets which files in the suprsync db should
be copied.
ssh_host : str, optional
Remote host to copy data to. If None, will copy data locally.
ssh_key : str, optional
ssh-key to use to access the ssh host.
remote_basedir : path
Base directory on the destination server to copy files to
db_path : path
Path of the sqlite db to monitor
delete_after : float
Seconds after which this agent will delete successfully copied files.
cmd_timeout : float
Time (sec) for which cmds run on the remote will timeout
copy_timeout : float
Time (sec) after which a copy command will timeout
"""
def __init__(self, agent: ocs_agent.OCSAgent, args: argparse.Namespace) -> None:
self.agent = agent
self.instance_id = args.instance_id
self.log = txaio.make_logger()
self.archive_name = args.archive_name
self.ssh_host = args.ssh_host
self.ssh_key = args.ssh_key
self.remote_basedir = args.remote_basedir
self.db_path = args.db_path
self.delete_after = args.delete_local_after
self.max_copy_attempts = args.max_copy_attempts
self.running = False
self.cmd_timeout = args.cmd_timeout
self.copy_timeout = args.copy_timeout
self.files_per_batch = args.files_per_batch
self.sleep_time = args.sleep_time
self.compression = args.compression
self.bwlimit = args.bwlimit
self.suprsync_file_root = args.suprsync_file_root
self.db_echo: bool = args.db_echo
self.db_pool_size: int = args.db_pool_size
self.db_pool_max_overflow: int = args.db_pool_max_overflow
self.chmod = args.chmod
# Feed for counting transfer errors, loop iterations.
self.agent.register_feed('transfer_stats',
record=True,
agg_params={
'exclude_aggregator': True,
})
self.agent.register_feed('archive_stats', record=True)
[docs]
def run(self, session, params=None):
"""run()
**Process** - Main run process for the SupRsync agent. Continuosly
checks the suprsync db checking for files that need to be handled.
Notes:
The session data object contains info about recent
transfers::
>>> response.session['data']
{
"activity": "idle",
"timestamp": 1661284493.5398622,
"last_copy": {
"start_time": 1661284493.5333128,
"files": [],
"stop_time": 1661284493.5398622
},
"archive_stats": {
"smurf": {
"finalized_until": 1687797424.652119,
"num_files": 3,
"uncopied_files": 0,
"last_file_added": "/path/to/file",
"last_file_copied": "/path/to/file"
}
},
"counters": {
"iterations": 1,
"copies": 0,
"errors_timeout": 0,
"errors_nonzero": 0,
"errors_sqlite": 0
},
}
"""
# DB: These two objects don't establish a DB session on creation, just
# within their respective methods.
srfm = SupRsyncFilesManager(
self.db_path, create_all=True, echo=self.db_echo,
pool_size=self.db_pool_size, max_overflow=self.db_pool_max_overflow
)
handler = SupRsyncFileHandler(
srfm, self.archive_name, self.remote_basedir, ssh_host=self.ssh_host,
ssh_key=self.ssh_key, cmd_timeout=self.cmd_timeout,
copy_timeout=self.copy_timeout, compression=self.compression,
bwlimit=self.bwlimit, chmod=self.chmod
)
self.running = True
# Note this will also be stored directly in session.data
counters = {
'iterations': 0,
'copies': 0,
'errors_timeout': 0,
'errors_nonzero': 0,
'errors_sqlite': 0,
}
session.data = {
'activity': 'idle',
'last_copy': {},
'counters': counters,
}
next_feed_update = 0
# update tcdirs every six-hours
last_tcdir_update = 0
tcdir_update_interval = 6 * 3600
while self.running:
counters['iterations'] += 1
op = {'start_time': time.time()}
# Copy files to remote, check remote md5sums.
try:
session.data['activity'] = 'copying'
# DB: Begins a session, queries, and writes when done.
op['files'] = handler.copy_files(max_copy_attempts=self.max_copy_attempts,
num_files=self.files_per_batch)
counters['copies'] += len(op['files'])
session.degraded = False
except subprocess.TimeoutExpired as e:
self.log.error("Timeout when copying files! {e}", e=e)
op['error'] = 'timed out'
counters['errors_timeout'] += 1
except subprocess.CalledProcessError as e:
self.log.error("rsync returned non-zero exit code! {e}", e=e)
op['error'] = 'nonzero exit'
counters['errors_nonzero'] += 1
except sqlalchemy.exc.OperationalError as e: # database is locked
session.degraded = True
counters['errors_sqlite'] += 1
self.log.warn(f"Operational error: {e}")
self.log.debug("{e}", e=traceback.format_exc())
self.log.info("Waiting 5 seconds before continuing...")
time.sleep(5)
continue
now = time.time()
# Record 5-digit timecode dirs in DB. (Doesn't actually create any
# directories on disk.)
if now - last_tcdir_update > tcdir_update_interval:
# add timecode-dirs for all files from the last week
self.log.info("Creating timecode dirs for recent files.....")
try:
# DB: Queries w/occasional writes when a new dir is found.
srfm.create_all_timecode_dirs(
self.archive_name, min_ctime=now - (7 * 24 * 3600)
)
self.log.info("Finished creating tcdirs")
last_tcdir_update = now
session.degraded = False
except sqlalchemy.exc.OperationalError as e: # database is locked
session.degraded = True
counters['errors_sqlite'] += 1
self.log.warn(f"Operational error: {e}")
self.log.debug("{e}", e=traceback.format_exc())
self.log.info("Waiting 5 seconds before continuing...")
time.sleep(5)
continue
# Compute archive statistics.
try:
# DB: Query only.
archive_stats = srfm.get_archive_stats(self.archive_name)
session.degraded = False
except sqlalchemy.exc.OperationalError as e: # database is locked
session.degraded = True
counters['errors_sqlite'] += 1
self.log.warn(f"Operational error: {e}")
self.log.debug("{e}", e=traceback.format_exc())
self.log.info("Waiting 5 seconds before continuing...")
time.sleep(5)
continue
if archive_stats is not None:
self.agent.publish_to_feed('archive_stats', {
'block_name': self.archive_name,
'timestamp': now,
'data': archive_stats
})
session.data['archive_stats'] = archive_stats
op['stop_time'] = now
session.data['last_copy'] = op
session.data['timestamp'] = now
if now >= next_feed_update:
self.agent.publish_to_feed('transfer_stats', {
'block_name': 'block0',
'timestamp': now,
'data': counters})
next_feed_update = now + 10 * 60
# Delete transferred files from disk after specified time.
if self.delete_after is not None:
session.data['activity'] = 'deleting'
try:
# DB: Begins a session, queries, and writes.
handler.delete_files(self.delete_after)
session.degraded = False
except sqlalchemy.exc.OperationalError as e: # database is locked
session.degraded = True
counters['errors_sqlite'] += 1
self.log.warn(f"Operational error: {e}")
self.log.debug("{e}", e=traceback.format_exc())
self.log.info("Waiting 5 seconds before continuing...")
time.sleep(5)
continue
# After handling files, update the timecode dirs
try:
# DB: Mostly queries, w/occasional writes.
srfm.update_all_timecode_dirs(
self.archive_name, self.suprsync_file_root, self.instance_id)
session.degraded = False
except sqlalchemy.exc.OperationalError as e: # database is locked
session.degraded = True
counters['errors_sqlite'] += 1
self.log.warn(f"Operational error: {e}")
self.log.debug("{e}", e=traceback.format_exc())
self.log.info("Waiting 5 seconds before continuing...")
time.sleep(5)
continue
session.data['activity'] = 'idle'
time.sleep(self.sleep_time)
return True, "Stopped run process"
def _stop(self, session, params=None):
self.running = False
session.set_status('stopping')
def make_parser(parser=None):
if parser is None:
parser = argparse.ArgumentParser()
pgroup = parser.add_argument_group('Agent Options')
pgroup.add_argument('--archive-name', required=True, type=str,
help="Name of managed archive. Determines which files "
"should be copied")
pgroup.add_argument('--remote-basedir', required=True, type=str,
help="Base directory on the remote server where files "
"will be copied")
pgroup.add_argument('--db-path', required=True, type=str,
help="Path to the suprsync sqlite db")
pgroup.add_argument('--ssh-host', type=str, default=None,
help="Remote host to copy files to (e.g. "
"'<user>@<host>'). If None, will copy files locally")
pgroup.add_argument('--ssh-key', type=str,
help="Path to ssh-key needed to access remote host")
pgroup.add_argument('--delete-local-after', type=float,
help="Time (sec) after which this agent will delete "
"local copies of successfully transfered files. "
"If None, will not delete files.")
pgroup.add_argument('--max-copy-attempts', type=int, default=5,
help="Number of failed copy attempts before the agent "
"will stop trying to copy a file")
pgroup.add_argument('--copy-timeout', type=float,
help="Time (sec) before the rsync command will timeout")
pgroup.add_argument('--cmd-timeout', type=float,
help="Time (sec) before remote commands will timeout")
pgroup.add_argument('--files-per-batch', type=int,
help="Number of files to copy over per batch. Default "
"is None, which will copy over all available files.")
pgroup.add_argument('--sleep-time', type=float, default=60,
help="Time to sleep (sec) in between copy iterations")
pgroup.add_argument('--compression', action='store_true', default=False,
help="Activate gzip on data transfer (rsync -z)")
pgroup.add_argument('--bwlimit', type=str, default=None,
help="Bandwidth limit arg (passed through to rsync)")
pgroup.add_argument('--suprsync-file-root', type=str, required=True,
help="Local path where agent will write suprsync files")
pgroup.add_argument('--db-echo', action='store_true', help="Echos db queries")
pgroup.add_argument(
'--db-pool-size', type=int, default=5,
help="Number of connections to the suprsync db to keep open inside the "
"connection pool"
)
pgroup.add_argument(
'--db-pool-max-overflow', type=int, default=10,
help="Number of connections to allow in the overflow pool."
)
pgroup.add_argument('--chmod', type=str, default="g+rwX,o+rX",
help="Comma-separated chmod strings to apply to file permissions "
"on transfer. Defaults to making sure files are group-writeable "
"and world-readable.")
return parser
def main(args=None):
parser = make_parser()
args = site_config.parse_args('SupRsync',
parser=parser,
args=args)
txaio.start_logging(level=os.environ.get("LOGLEVEL", "info"))
agent, runner = ocs_agent.init_site_agent(args)
suprsync = SupRsync(agent, args)
agent.register_process('run', suprsync.run, suprsync._stop, startup=True)
runner.run(agent, auto_reconnect=True)
if __name__ == '__main__':
main()