Source code for socs.db.suprsync

import os
import subprocess
import tempfile
import time
from contextlib import nullcontext
from functools import wraps

import txaio
import yaml
from sqlalchemy import (Boolean, Column, Float, ForeignKey, Integer, String,
                        asc, create_engine, text)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from socs.util import get_md5sum

TABLE_VERSION = 0
txaio.use_twisted()


Base = declarative_base()

# Number of days after which a timecode directory will be marked as complete if
# the subsequent timecode dir has not been created.
DAYS_TO_COMPLETE_TCDIR = 1


[docs] class TimecodeDir(Base): """ Table for information about 'timecode' directories. These are directories in particular archives such as 'smurf' that we care about tracking whether or not they've completed syncing. Timecode directories must start with a 5-digit time-code. Attributes --------------- timecode: int Timecode for directory. Must be 5 digits, and will be roughly 1 a day. archive_name : str Archive the directory is in. completed : bool True if we expect no more files to be added to this directory. synced : bool True if all files in this directory have been synced to the remote. finalized : bool True if the 'finalization' file has been written and added to the db. finalize_file_id : int ID for the SupRsyncFile object that is the finalization file for this timecode dir. """ __tablename__ = f"timecode_dirs_v{TABLE_VERSION}" id = Column(Integer, primary_key=True) timecode = Column(Integer, nullable=False) archive_name = Column(String, nullable=False) completed = Column(Boolean, default=False) synced = Column(Boolean, default=False) finalized = Column(Boolean, default=False) finalize_file_id = Column(Integer, ForeignKey(f"supersync_v{TABLE_VERSION}.id"))
[docs] class SupRsyncFile(Base): """ Files table utilized by the SupRsync agent. Attributes ---------- local_path : String Absolute path of the local file to be copied local_md5sum : String locally calculated checksum archive_name : String Name of the archive, i.e. `timestreams` or `smurf`. Each archive is managed by its own SupRsync instance, so they can be copied to different base-dirs or hosts. remote_path : String Path of the file on the remote server relative to the base-dir. specified in the SupRsync agent config. remote_md5sum : String, optional Md5sum calculated on remote machine timestamp : Float Timestamp that file was added to db copied : Float, optional Time at which file was transfered removed : Float, optional Time at which file was removed from local server. failed_copy_attempts : Int Number of failed copy attempts deletable : Bool Whether file should be deleted after copying ignore : Bool If true, file will be ignored by SupRsync agent and not included in `finalized_until`. """ __tablename__ = f"supersync_v{TABLE_VERSION}" id = Column(Integer, primary_key=True) local_path = Column(String, nullable=False) local_md5sum = Column(String, nullable=False) archive_name = Column(String, nullable=False) remote_path = Column(String, nullable=False) timestamp = Column(Float, nullable=False) remote_md5sum = Column(String) copied = Column(Float) removed = Column(Float) failed_copy_attempts = Column(Integer, default=0) deletable = Column(Boolean, default=True) ignore = Column(Boolean, default=False) def __str__(self): excl = ('_sa_adapter', '_sa_instance_state') d = { k: v for k, v in vars(self).items() if not k.startswith('_') and not any(hasattr(v, a) for a in excl) } s = "SupRsyncFile:\n" s += "\n".join([ f" {k}: {v}" for k, v in d.items() ]) return s
[docs] def split_path(path): """Splits path into a list where each element is a subdirectory""" return os.path.normpath(path).strip('/').split('/')
[docs] def check_timecode(file: SupRsyncFile): """ Tries to extract timecode from the remote path. If it fails, returns None. """ split = split_path(file.remote_path) try: timecode = int(split[0]) if len(str(timecode)) != 5: # Timecode must be 5 digits raise ValueError("Timecode not 5 digits") return timecode except ValueError: return None
[docs] def create_file(local_path, remote_path, archive_name, local_md5sum=None, timestamp=None, deletable=True): """ Creates SupRsyncFiles object. Args ---- local_path : String or Path Absolute path of the local file to be copied remote_path : String Path of the file on the remote server relative to the base-dir. specified in the SupRsync agent config. archive_name : String Name of the archive, i.e. `timestreams` or `smurf`. Each archive is managed by its own SupRsync instance, so they can be copied to different base-dirs or hosts. local_md5sum : String, optional locally calculated checksum. If not specified, will calculate md5sum automatically. timestamp: Timestamp of file. If None is specified, will use the current time. deletable : bool If true, can be deleted by suprsync agent """ local_path = str(local_path) remote_path = str(remote_path) if local_md5sum is None: local_md5sum = get_md5sum(local_path) if timestamp is None: timestamp = time.time() file = SupRsyncFile( local_path=local_path, local_md5sum=local_md5sum, remote_path=remote_path, archive_name=archive_name, timestamp=timestamp ) if deletable is not None: file.deletable = deletable return file
[docs] def beginsession(f): """A decorator to wrap instance methods with a context manager for beginning and properly closing SQLAlchemy sessions within the SupRsyncFilesManager. This is to facilitate automatic clean up of sessions in methods that interact with the database. """ @wraps(f) def wrapper(self, *args, session=None, **kwargs): if session is None: cm = self.Session.begin() else: cm = nullcontext(session) with cm as context_session: kwargs.update({'session': context_session}) return f(self, *args, **kwargs) return wrapper
[docs] class SupRsyncFilesManager: """ Helper class for accessing and adding entries to the SupRsync files database. Args ----- db_path : path path to sqlite db create_all : bool Create table if it hasn't been generated yet. echo : bool If true, writes sql statements to stdout """ def __init__( self, db_path: str, create_all: bool = True, echo: bool = False, pool_size: int = 5, max_overflow: int = 10 ) -> None: self.log = txaio.make_logger() db_path = os.path.abspath(db_path) if not os.path.exists(os.path.dirname(db_path)): os.makedirs(os.path.dirname(db_path)) self._engine = create_engine( f'sqlite:///{db_path}', echo=echo, pool_size=pool_size, max_overflow=max_overflow, ) # Use WAL to reduce concurrency issues with self._engine.connect() as conn: result = conn.execute(text("PRAGMA journal_mode=WAL")) self.log.info(f"SQLite journal_mode is: {result.scalar()}") self.Session = sessionmaker(bind=self._engine) if create_all: Base.metadata.create_all(self._engine)
[docs] @beginsession def get_archive_stats(self, archive_name, session=None): files = session.query(SupRsyncFile).filter( SupRsyncFile.archive_name == archive_name, ).order_by(asc(SupRsyncFile.timestamp)).all() finalized_until = None num_files_to_copy = 0 last_file_added = '' last_file_copied = '' for f in files: last_file_added = f.local_path if (f.local_md5sum == f.remote_md5sum): last_file_copied = f.local_path if (not f.ignore) and (f.local_md5sum != f.remote_md5sum): num_files_to_copy += 1 if finalized_until is None and not (f.ignore): if f.local_md5sum != f.remote_md5sum: finalized_until = f.timestamp - 1 # There are no more uncopied files that aren't ignored if finalized_until is None: finalized_until = time.time() stats = { 'finalized_until': finalized_until, 'num_files': len(files), 'uncopied_files': num_files_to_copy, 'last_file_added': last_file_added, 'last_file_copied': last_file_copied, } return stats
[docs] @beginsession def get_finalized_until(self, archive_name, session=None): """ Returns a timetamp for which all files preceding are either successfully copied, or ignored. If all files are copied, returns the current time. Args ------ archive_name : String Archive name to get finalized_until for session : sqlalchemy session SQLAlchemy session to use. If none is passed, will create a new session """ query = session.query(SupRsyncFile).filter( SupRsyncFile.archive_name == archive_name, ).order_by(asc(SupRsyncFile.timestamp)) for file in query.all(): if file.ignore: continue if file.local_md5sum != file.remote_md5sum: return file.timestamp - 1 else: return time.time()
[docs] @beginsession def add_file(self, local_path, remote_path, archive_name, local_md5sum=None, timestamp=None, session=None, deletable=True): """ Adds file to the SupRsyncFiles table. Args ---- local_path : String Absolute path of the local file to be copied remote_path : String Path of the file on the remote server relative to the base-dir. specified in the SupRsync agent config. archive_name : String Name of the archive, i.e. `timestreams` or `smurf`. Each archive is managed by its own SupRsync instance, so they can be copied to different base-dirs or hosts. local_md5sum : String, optional locally calculated checksum. If not specified, will calculate md5sum automatically. session : sqlalchemy session Session to use to add the SupRsyncFile. If None, will create a new session and commit afterwards. deletable : bool If true, can be deleted by suprsync agent """ file = create_file(local_path, remote_path, archive_name, local_md5sum=local_md5sum, timestamp=timestamp, deletable=deletable) self._add_file_tcdir(file, session) session.add(file) return file
[docs] @beginsession def get_copyable_files(self, archive_name, session=None, max_copy_attempts=None, num_files=None): """ Gets all SupRsyncFiles that are copyable, meaning they satisfy: - local and remote md5sums do not match - Failed copy attempts is below the max number of attempts Args ---- archive_name : string Name of archive to get files from session : sqlalchemy session Session to use to get files. If none is specified, one will be created. You need to specify this if you wish to change file data and commit afterwards. max_copy_attempts : int Max number of failed copy atempts num_files : int Number of files to return """ if max_copy_attempts is None: max_copy_attempts = 2**10 query = session.query(SupRsyncFile).filter( SupRsyncFile.removed == None, # noqa: E711 SupRsyncFile.archive_name == archive_name, SupRsyncFile.failed_copy_attempts < max_copy_attempts, SupRsyncFile.ignore == False, # noqa: E712 ) files = [] for f in query.all(): if f.local_md5sum != f.remote_md5sum: files.append(f) if num_files is not None: if len(files) >= num_files: break return files
[docs] @beginsession def get_deletable_files(self, archive_name, delete_after, session=None): """ Gets all files that are deletable, meaning that the local and remote md5sums match, and they have existed longer than ``delete_after`` seconds. Args ----- archive_name : str Name of archive to pull files from delete_after : float Time since creation (in seconds) for which it's ok to delete files. session : sqlalchemy session Session to use to query files. """ query = session.query(SupRsyncFile).filter( SupRsyncFile.removed == None, # noqa: E711 SupRsyncFile.archive_name == archive_name, SupRsyncFile.deletable, ) files = [] now = time.time() for f in query.all(): if f.local_md5sum == f.remote_md5sum: if now > f.timestamp + delete_after: files.append(f) return files
[docs] @beginsession def get_known_files(self, archive_name, session=None, min_ctime=None): """Gets all files. This can be used to help avoid double-registering files. Args ----- archive_name : str Name of archive to pull files from session : sqlalchemy session Session to use to query files. min_ctime : float, optional minimum ctime to use when querying files. """ if min_ctime is None: min_ctime = 0 query = session.query(SupRsyncFile).filter( SupRsyncFile.archive_name == archive_name, SupRsyncFile.timestamp > min_ctime, ).order_by(asc(SupRsyncFile.timestamp)) return list(query.all())
def _add_file_tcdir(self, file: SupRsyncFile, session): """ Creates and adds a TimecodeDir for a file if possible. This will attempt to extract the timecode from the remote filename, and will create a new TimecodeDir if it doesn't already exist. """ # print(file.remote_path) tc = check_timecode(file) if tc is None: return None tcdir = session.query(TimecodeDir).filter( TimecodeDir.timecode == tc, TimecodeDir.archive_name == file.archive_name, ).one_or_none() if tcdir is not None: return tcdir tcdir = TimecodeDir(timecode=tc, archive_name=file.archive_name) session.add(tcdir) return tcdir
[docs] def create_all_timecode_dirs(self, archive_name, min_ctime=None): with self.Session.begin() as session: files = self.get_known_files( archive_name, session=session, min_ctime=min_ctime) for file in files: self._add_file_tcdir(file, session)
[docs] def update_all_timecode_dirs(self, archive_name, file_root, sync_id): with self.Session.begin() as session: tcdirs = session.query(TimecodeDir).filter( TimecodeDir.archive_name == archive_name, ).all() for tcdir in tcdirs: self._update_tcdir(tcdir, session, file_root, sync_id)
def _update_tcdir(self, tcdir, session, file_root, sync_id): """ Takes the next series of actions for a timecode dir object. - If we expect no more files to be added to the tc dir, marks it as complete - If all files in the tc dir have been synced, marks it as synced - If the tc dir is synced and not finalized, creates the finalization file and marks as finalized. """ if tcdir.finalized: return now = time.time() if not tcdir.completed: all_tcs = session.query(TimecodeDir.timecode).all() for tc, in all_tcs: if tc > tcdir.timecode: # Mark as complete if there's a timecode after this one tcdir.completed = True break else: # No timecodes after this one. Mark after complete if we are # over a full day away. if (now // 1e5 - tcdir.timecode) > DAYS_TO_COMPLETE_TCDIR: tcdir.completed = True # Gets all files in this tcdir files = session.query(SupRsyncFile).filter( SupRsyncFile.remote_path.like(f'{tcdir.timecode}/%') ).all() if tcdir.completed and not tcdir.synced: for f in files: if f.local_md5sum != f.remote_md5sum: break # File is not synced properly else: tcdir.synced = True if tcdir.synced and not tcdir.finalized: # Finalize file # Get subdirs this suprsync instance is responsible for subdirs = set() for f in files: split = split_path(f.remote_path) if len(split) > 2: subdirs.add(split[1]) tcdir_summary = { 'timecode': tcdir.timecode, 'num_files': len(files), 'subdirs': list(subdirs), 'finalized_at': now, 'finalized_until': self.get_finalized_until(tcdir.archive_name), 'archive_name': tcdir.archive_name, 'instance_id': sync_id } tc = int(now // 1e5) timestamp = int(now) fname = f'{timestamp}_{tcdir.archive_name}_{tcdir.timecode}_finalized.yaml' finalize_local_path = os.path.join( file_root, str(tc), sync_id, fname, ) finalize_remote_path = os.path.join( str(tc), 'suprsync', sync_id, fname ) os.makedirs(os.path.dirname(finalize_local_path), exist_ok=True) with open(finalize_local_path, 'w') as f: yaml.dump(tcdir_summary, f) file = self.add_file( finalize_local_path, finalize_remote_path, tcdir.archive_name, session=session, timestamp=now ) session.add(file) session.flush() tcdir.finalized = True tcdir.finalize_file_id = file.id
[docs] class SupRsyncFileHandler: """ Helper class to handle files in the suprsync db and copy them to their dest / delete them if enough time has passed. """ def __init__(self, file_manager, archive_name, remote_basedir, ssh_host=None, ssh_key=None, cmd_timeout=None, copy_timeout=None, compression=None, bwlimit=None, chmod=None): self.srfm = file_manager self.archive_name = archive_name self.ssh_host = ssh_host self.ssh_key = ssh_key self.remote_basedir = remote_basedir self.log = txaio.make_logger() self.cmd_timeout = cmd_timeout self.copy_timeout = copy_timeout self.compression = compression self.bwlimit = bwlimit self.chmod = chmod
[docs] def run_on_remote(self, cmd, timeout=None): """ Runs a command on the remote server or locally if ssh_host is None. Parameters ----------- cmd : list Command to be run """ _cmd = [] if self.ssh_host is not None: _cmd += ['ssh', self.ssh_host] if self.ssh_key is not None: _cmd.extend(['-i', self.ssh_key]) _cmd += cmd if timeout is None: timeout = self.cmd_timeout self.log.debug(f"Running: {' '.join(_cmd)}") res = subprocess.run(_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout, check=True) if res.stderr: self.log.error("stderr for cmd: {cmd}\n{err}", cmd=_cmd, err=res.stderr.decode()) return res
[docs] def copy_files(self, max_copy_attempts=None, num_files=None): """ Copies a batch of files, and computes remote md5sums. Args ---- max_copy_attempts : int Max number of failed copy atempts num_files : int Number of files to return Returns ------- copy_attempts : list of (str, bool) Each entry of the list provides the path to the copied file, and a bool indicating wheter the remote md5sum matched. """ output = [] with self.srfm.Session.begin() as session: files = self.srfm.get_copyable_files( self.archive_name, max_copy_attempts=max_copy_attempts, num_files=num_files, session=session ) if not files: return [] if self.ssh_host is not None: dest = self.ssh_host + ':' + self.remote_basedir else: dest = self.remote_basedir # Creates temp directory with remote dir structure of symlinks for # rsync to copy. file_map = {} remote_paths = [] with tempfile.TemporaryDirectory() as tmp_dir: self.log.info("Copying files:") for file in files: self.log.info(f"- {file.local_path}") tmp_path = os.path.join(tmp_dir, file.remote_path) os.makedirs(os.path.dirname(tmp_path), exist_ok=True) if not os.path.exists(file.local_path): self.log.warn("Cannot find file {path}", path=file.local_path) file.failed_copy_attempts += 1 continue if os.path.exists(tmp_path): self.log.warn("Temp file {path} already exists!", path=tmp_path) file.failed_copy_attempts += 1 continue os.symlink(file.local_path, tmp_path) remote_path = os.path.normpath( os.path.join(self.remote_basedir, file.remote_path) ) remote_paths.append(remote_path) file_map[remote_path] = file cmd = ['rsync', '-Lrt'] if self.chmod: cmd += ['-p', f'--chmod={self.chmod}'] if self.compression: cmd.append('-z') if self.bwlimit: cmd.append(f'--bwlimit={self.bwlimit}') if self.ssh_key is not None: cmd.extend(['--rsh', f'ssh -i {self.ssh_key}']) cmd.extend([tmp_dir + '/', dest]) self.log.debug(f"Running: {' '.join(cmd)}") subprocess.run(cmd, check=True, timeout=self.copy_timeout) # Mark all files as 'copied' to remote. for file in files: file.copied = time.time() self.log.info("Checksumming on remote.") res = self.run_on_remote(['md5sum'] + remote_paths) for line in res.stdout.decode().split('\n'): split = line.split() # If file cannot be found, line will say: # "md5sum: file: No such file or directory if len(split) != 2: continue md5sum, path = line.split() key = os.path.normpath(path) if key in file_map: file_map[key].remote_md5sum = md5sum for file in files: md5_ok = (file.remote_md5sum == file.local_md5sum) output.append((file.local_path, md5_ok)) if not md5_ok: file.failed_copy_attempts += 1 self.log.info( f"Copy failed for file {file.local_path}! " f"(copy attempts: {file.failed_copy_attempts})" ) self.log.info(f"Local md5: {file.local_md5sum}, " f"remote_md5: {file.remote_md5sum}") self.log.info("Copy session complete.") return output
[docs] def delete_files(self, delete_after): """ Gets deletable files, deletes them, and updates file info Args ----- delete_after : float Time since creation (in seconds) for which it's ok to delete files. """ with self.srfm.Session.begin() as session: files = self.srfm.get_deletable_files( self.archive_name, delete_after, session=session ) for file in files: if os.path.exists(file.local_path): try: self.log.info(f"Removing file {file.local_path}") os.remove(file.local_path) file.removed = time.time() except PermissionError: self.log.error( f"Permission error: Could not remove {file.local_path}" ) else: self.log.warn( f"File {file.local_path} no longer exists! " "Updating remove time to be 0" ) file.removed = 0