import shlex
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Union

from .spec import Spec


class ContainerSpec(Spec):
    """
    A spec for backing up via a container.

    The backup is produced by running ``command`` inside ``container`` with
    ``docker exec`` and redirecting its standard output to a timestamped
    file under the destination directory.
    """

    _SKEL = {"container": None, "command": None, "mountpoint": "/from"}

    def __init__(
        self,
        name: str,
        container: str,
        destination: Union[str, Path],
        limit: int,
        command: str,
        extension: str,
        mountpoint: str,
        notify=None,
    ):
        """
        Create a container backup spec.

        :param name: name of the spec; passed on to ``Spec``
        :param container: container the backup command is executed in
        :param destination: directory the backup file is written to
        :param limit: passed on to ``Spec`` (presumably the maximum number
            of backups to keep; confirm against ``Spec``)
        :param command: command line run inside the container; its stdout
            becomes the backup file
        :param extension: extension given to the backup file
        :param mountpoint: stored on the instance; not used by the code
            visible in this class
        :param notify: passed on to ``Spec``
        """
        super().__init__(name, destination, limit, extension, notify)

        self.container = container
        self.mountpoint = mountpoint
        self.command = command

    def backup(self):
        """
        Remove excess backups, then write a new timestamped backup file.

        If a notifier is configured, it is told the exit code of the
        backup command.
        """
        # Remove excess backups
        self.remove_backups()

        # Run actual backup command
        filename = "{}.{}".format(
            datetime.now().strftime("%Y-%m-%d_%H-%M-%S"), self.extension
        )
        target = "{}/{}".format(self.destination, filename)

        # The container name and target path are escaped with shlex.quote so
        # that quotes or other shell metacharacters in them can neither break
        # the redirect nor inject extra commands. self.command is deliberately
        # left unquoted: it is a command line the shell has to split.
        # NOTE(review): "-t" allocates a pseudo-TTY, which may alter line
        # endings in the captured output; confirm it is actually wanted.
        shell_command = "docker exec -t {} {} > {}".format(
            shlex.quote(str(self.container)),
            self.command,
            shlex.quote(target),
        )
        process = subprocess.run(shell_command, shell=True)

        if self.notifier:
            self.notifier.notify("backup", self.name, process.returncode)