Started full rewrite; wrote first DirectorySpec

recovery-function
Jef Roosens 2021-01-15 17:58:29 +01:00
parent 75b5b5b316
commit bb33f7cbbc
9 changed files with 247 additions and 248 deletions

View File

@ -8,7 +8,7 @@ PYTHON=python3.8
'$(PYTHON)' -m venv .venv
.venv/bin/pip install -r requirements.txt -r requirements-dev.txt
venv: .venv/bin/python
venv: .venv/bin/activate
.PHONY: venv
format: venv

View File

@ -1,6 +1,6 @@
import argparse
import sys
from specs import parse_specs_file
from parser import read_specs_file
# This just displays the error type and message, not the stack trace
@ -20,6 +20,7 @@ parser.add_argument(
"--file",
action="append",
dest="file",
required=True,
help="File containing spec definitions.",
)
parser.add_argument(
@ -28,7 +29,7 @@ parser.add_argument(
action="store_const",
const=True,
default=False,
help="Print out the parsed specs as JSON " "and exit",
help="Print out the parsed specs as JSON and exit",
)
parser.add_argument(
"spec", nargs="*", help="The specs to process. Defaults to all."
@ -36,11 +37,11 @@ parser.add_argument(
# Parse arguments
args = parser.parse_args()
specs = sum([parse_specs_file(path) for path in args.file], [])
specs = sum([read_specs_file(path) for path in args.file], [])
# Filter specs if needed
if args.spec:
specs = filter(lambda s: s.name in args.spec, specs)
specs = list(filter(lambda s: s.name in args.spec, specs))
# Dump parsed data as json
if args.json:
@ -49,7 +50,10 @@ if args.json:
print(json.dumps([spec.to_dict() for spec in specs], indent=4))
else:
pass
# Run the backups
# for spec in specs:
# spec.backup()
if not specs:
print("No specs, exiting.")
sys.exit(0)
for spec in specs:
spec.backup()

36
app/parser.py 100644
View File

@ -0,0 +1,36 @@
from __future__ import annotations
import yaml
from pathlib import Path
from typing import List, Union
from specs import Spec, DirectorySpec
def read_specs_file(path: Union[str, Path]) -> List[Spec]:
    """
    Parse a YAML specs file into a list of Spec objects.

    Args:
        path: path to the YAML specs file

    Returns:
        A list of parsed specs, one per entry in the file's "specs" section.
    """
    with open(path, "r") as yaml_file:
        # BUG FIX: safe_load() picks its own loader; passing Loader= here
        # raises TypeError (that keyword belongs to yaml.load()).
        data = yaml.safe_load(yaml_file)

    # Map each category key in the config file to the class that handles it
    categories = [("directories", DirectorySpec)]

    specs = []

    for key, spec_cls in categories:
        if key not in data["specs"]:
            continue

        # Check what defaults are present: "all" first, then the
        # category-specific ones, so the latter can override the former.
        defaults = []

        if data.get("defaults"):
            if data["defaults"].get("all"):
                defaults.append(data["defaults"]["all"])

            if data["defaults"].get(key):
                defaults.append(data["defaults"][key])

        specs.extend(
            [
                spec_cls.from_dict(name, spec, *defaults)
                for name, spec in data["specs"][key].items()
            ]
        )

    return specs

47
app/skeleton.py 100644
View File

@ -0,0 +1,47 @@
from typing import Dict
class InvalidKeyError(Exception):
    """Raised when a dict contains a key its skeleton does not define."""

    def __init__(self, key):
        super().__init__(key)
        # Human-readable description; the offending key itself lives in args
        self.message = "Invalid key: {}".format(key)
class MissingKeyError(Exception):
    """Raised when a required key (no default value) is absent."""

    def __init__(self, key):
        super().__init__(key)
        # Human-readable description; the missing key itself lives in args
        self.message = "Missing key: {}".format(key)
def combine(data: Dict, skel: Dict) -> Dict:
    """
    Compare a dict with a given skeleton dict, and fill in default values
    where needed.

    Args:
        data: dict to validate; modified in place
        skel: skeleton dict; a value of None marks a required key

    Returns:
        The (mutated) data dict, with defaults filled in.

    Raises:
        InvalidKeyError: if data contains a key not present in skel
        MissingKeyError: if a required key (default None) is absent
        TypeError: if a present value's type differs from its default's
    """
    # First, check for illegal keys
    for key in data:
        if key not in skel:
            raise InvalidKeyError(key)

    # Then, check the default values
    for key, value in skel.items():
        if key not in data:
            # Raise error if there's no default value
            if value is None:
                raise MissingKeyError(key)

            # Replace with default value
            data[key] = value

        # Error if value is not same type as default value
        elif type(data[key]) != type(value) and value is not None:
            raise TypeError("Invalid value type")

        # Recurse into dicts
        # BUG FIX: the recursive call still used the pre-rename name
        # combine_with_skeleton, which raised NameError on nested dicts.
        elif type(value) == dict:
            data[key] = combine(data[key], value)

    return data

View File

@ -1,2 +1,2 @@
from .specs import Spec
from .parser import parse_specs_file
from .spec import Spec
from .directory import DirectorySpec

View File

@ -0,0 +1,64 @@
import shlex
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Union

from .spec import Spec
class DirectorySpec(Spec):
    """
    A spec for backing up a local directory.
    """

    # Skeleton of a directory spec config; None marks a required key.
    # NOTE(review): Spec.from_dict reads cls.__SKEL, which name mangling
    # resolves against Spec, not this class — verify the subclass skeleton
    # is actually picked up.
    __SKEL = {
        "name": None,
        "source": None,
        "destination": None,
        "limit": None,
        "notifier": None,
        # BUG FIX: restored the {filename} placeholder that backup()
        # supplies; the archive was otherwise written with a literal name.
        "command": "tar -czf '{destination}/{filename}' .",
        "extension": "tar.gz",
    }

    def __init__(
        self,
        name: str,
        source: Union[str, Path],
        destination: Union[str, Path],
        limit: int,
        command: str,
        extension: str,
        notifier=None,
    ):
        """
        Args:
            name: name of the spec
            source: directory to back up
            destination: directory where the backups shall reside
            limit: max amount of backups to keep
            command: backup command template; may reference {destination}
                and {filename}
            extension: file extension of the produced archives
            notifier: notifier object, or None to skip notifications

        Raises:
            NotADirectoryError: if source doesn't exist or isn't a directory
        """
        super().__init__(name, destination, limit, extension, notifier)

        self.source = source if type(source) == Path else Path(source)

        # Check existence of source directory
        if not self.source.exists() or not self.source.is_dir():
            raise NotADirectoryError(
                "{} doesn't exist or isn't a directory.".format(self.source)
            )

        self.command = command

    def backup(self):
        """Prune excess backups, run the backup command, notify the result."""
        # Remove excess backups
        self.remove_backups()

        # Timestamped name for the new archive
        filename = "{}.{}".format(
            datetime.now().strftime("%Y-%m-%d_%H-%M-%S"), self.extension
        )

        # TODO add logging stuff
        # BUG FIX: subprocess.run() was given one command string without
        # shell=True, which fails on POSIX; split it into an argv list
        # instead (shlex honours the quoting in the template).
        process = subprocess.run(
            shlex.split(
                self.command.format(
                    destination=self.destination,
                    filename=filename,
                )
            ),
            cwd=self.source,
        )

        if self.notifier:
            self.notifier.notify(process.returncode)

View File

@ -1,96 +0,0 @@
import yaml
from pathlib import Path
from specs import Spec
from typing import List, Dict
class InvalidKeyError(Exception):
    """Raised when a spec config contains a key its skeleton doesn't define."""

    def __init__(self, key):
        # BUG FIX: the message was assigned to a dead local variable;
        # store it on the instance so callers can read it.
        self.message = "Invalid key: {}".format(key)
        super().__init__(key)
class MissingKeyError(Exception):
    """Raised when a required key is absent from a spec config."""

    def __init__(self, key):
        # BUG FIX: the message was assigned to a dead local variable;
        # store it on the instance so callers can read it.
        self.message = "Missing key: {}".format(key)
        super().__init__(key)
def parse_specs_file(path: Path) -> List[Spec]:
    """
    Parse a YAML file defining backup specs.

    Args:
        path: path to the specs file

    Returns:
        A list of specs
    """
    # Skeleton of a spec config; a None value means the key has no default
    # value and must be defined explicitly.
    spec_skel = {
        "source": None,
        "destination": None,
        "limit": None,
        "volume": False,
        "notify": {"title": "Backup Notification", "events": ["failure"]},
    }

    # Read YAML file
    # NOTE(review): yaml.Loader can construct arbitrary Python objects;
    # consider yaml.safe_load for config files.
    with open(path, "r") as yaml_file:
        data = yaml.load(yaml_file, Loader=yaml.Loader)

    # Check specs section exists
    if "specs" not in data:
        raise MissingKeyError("specs")

    # Allow for default notify settings
    if "notify" in data:
        spec_skel["notify"] = data["notify"]

    # Validate each spec against the skeleton and build the spec objects
    return [
        Spec.from_dict(name, combine_with_skeleton(spec, spec_skel))
        for name, spec in data["specs"].items()
    ]
def combine_with_skeleton(data: Dict, skel: Dict) -> Dict:
    """
    Compare a dict with a given skeleton dict, and fill in default values
    where needed.
    """
    # Reject any key the skeleton doesn't know about
    unknown = [k for k in data if k not in skel]

    if unknown:
        raise InvalidKeyError(unknown[0])

    # Fill in defaults and validate the values that are present
    for key, default in skel.items():
        if key not in data:
            # A None default marks a required key
            if default is None:
                raise MissingKeyError(key)

            data[key] = default

        elif type(data[key]) != type(default) and default is not None:
            # Present, but with a different type than the default value
            raise TypeError("Invalid value type")

        elif type(default) == dict:
            # Nested section: validate it recursively
            data[key] = combine_with_skeleton(data[key], default)

    return data

86
app/specs/spec.py 100644
View File

@ -0,0 +1,86 @@
from __future__ import annotations
from pathlib import Path
from typing import Union, Dict
import skeleton
import os
class Spec:
    """
    Base class for all other spec types.
    """

    # Skeleton of a spec config; a None value marks a required key.
    __SKEL = {
        "name": None,
        "destination": None,
        "limit": None,
        "notifier": None,
        "extension": "tar.gz",
    }

    def __init__(
        self,
        name: str,
        destination: Union[Path, str],
        limit: int,
        extension: str,
        notifier=None,
    ):
        """
        Args:
            name: name of the spec
            destination: directory where the backups shall reside
            limit: max amount of backups
            extension: file extension of the backup files
            notifier: notifier object

        Raises:
            NotADirectoryError: if destination doesn't exist or isn't a
                directory
        """
        self.name = name
        self.destination = (
            destination if type(destination) == Path else Path(destination)
        )

        # Check existence of destination folder
        if not self.destination.exists() or not self.destination.is_dir():
            raise NotADirectoryError(
                "{} doesn't exist or isn't a directory.".format(
                    self.destination
                )
            )

        self.limit = limit
        self.notifier = notifier
        self.extension = extension

    def remove_backups(self):
        """
        Remove all backups exceeding the limit.

        Keeps the limit - 1 most recent backups, leaving room for one new
        backup to be created afterwards.
        """
        # BUG FIX: glob(self.extension) only matches a file literally named
        # e.g. "tar.gz"; match every file carrying that extension instead.
        files = sorted(
            self.destination.glob("*.{}".format(self.extension)),
            key=os.path.getmtime,
            reverse=True,
        )

        if len(files) >= self.limit:
            for path in files[self.limit - 1 :]:
                path.unlink()

    def backup(self):
        # Subclasses implement the actual backup strategy
        raise NotImplementedError()

    def restore(self):
        # Subclasses implement the actual restore strategy
        raise NotImplementedError()

    @classmethod
    def from_dict(cls, name, obj: Dict, *defaults: Dict) -> Spec:
        """
        Build a spec from a config dict, layering the given defaults on top
        of the class skeleton.

        Args:
            name: name of the spec (its key in the config file)
            obj: the spec's config dict
            defaults: default dicts, applied in order

        Returns:
            An instance of cls.
        """
        # BUG FIX: cls.__SKEL is name-mangled to _Spec__SKEL, which ignores
        # a subclass's own skeleton; look the subclass's attribute up by its
        # mangled name, falling back to the base skeleton.
        skel = getattr(cls, "_{}__SKEL".format(cls.__name__), cls.__SKEL)

        # The name is passed separately, so it must not be required (or
        # accepted) inside the config dict itself.
        skel = {key: value for key, value in skel.items() if key != "name"}

        # Combine defaults with skeleton, creating new skeleton
        for default in defaults:
            # BUG FIX: this passed the whole defaults tuple into combine()
            # instead of the current default dict.
            skel = skeleton.combine(default, skel)

        # Then, combine actual values with new skeleton
        obj = skeleton.combine(obj, skel)

        return cls(name, **obj)

View File

@ -1,142 +0,0 @@
from datetime import datetime
import requests
import os
class Spec:
    """
    Base class for backup specs: holds the destination, the backup limit
    and the notification settings shared by all spec types.
    """

    # Base class has no skeleton
    __SKELETON = {}

    def __init__(self, name, destination, limit, title, events=None):
        """
        Args:
            name: name of the spec
            destination: directory where the backups reside
            limit: max amount of backups to keep
            title: title used for notifications
            events: outcomes to notify about ("failure"/"success")
        """
        self.name = name
        # NOTE(review): Path is used here but this module only imports
        # datetime, requests and os — looks like a missing
        # "from pathlib import Path"; confirm.
        self.destination = Path(destination)
        self.limit = limit
        self.title = title
        # Fresh list per instance, avoiding a shared mutable default
        self.events = [] if events is None else events

    def to_dict(self):
        """Serialize this spec back into its config-file dict shape."""
        return {
            "name": self.name,
            "destination": str(self.destination),
            "limit": self.limit,
            "notify": {"title": self.title, "events": self.events},
        }

    def backup(self):
        # Subclasses implement the actual backup strategy
        raise NotImplementedError()

    def remove_redundant(self):
        """
        Delete the oldest tarballs so at most limit - 1 remain, leaving
        room for one new backup in the destination directory.
        """
        tarballs = sorted(
            self.destination.glob("*.tar.gz"),
            key=os.path.getmtime,
            reverse=True,
        )

        if len(tarballs) >= self.limit:
            for path in tarballs[self.limit - 1 :]:
                path.unlink()

    def notify(self, status_code):
        """
        Send an IFTTT phone notification about a backup result, if the
        matching event type is enabled and an API key is configured.

        Args:
            status_code: exit status of the backup command; non-zero
                means failure
        """
        if status_code:
            if "failure" not in self.events:
                return

            message = "backup for {} failed.".format(self.name)

        else:
            if "success" not in self.events:
                return

            message = "backup for {} succeeded.".format(self.name)

        # Read API key from env vars
        try:
            key = os.environ["IFTTT_API_KEY"]

        # Don't send notification if there's not API key defined
        except KeyError:
            return

        url = "https://maker.ifttt.com/trigger/{}/with/key/{}".format(
            "phone_notifications", key
        )
        data = {"value1": self.title, "value2": message}

        requests.post(url, data=data)

    def get_filename(self):
        """Return a timestamped tarball filename for a new backup."""
        return "{}_{}.tar.gz".format(
            self.name, datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        )

    @staticmethod
    def from_dict(name, data) -> "Specification":
        # Dispatch on the "volume" flag: Docker volume vs plain directory.
        # NOTE(review): the "Specification" return annotation matches no
        # class in this module — probably meant "Spec".
        if data.get("volume", False):
            return VolumeSpec.from_dict(name, data)

        return DirSpec.from_dict(name, data)

    @staticmethod
    def from_file(path: str):
        """
        Parse every spec defined in the given YAML file.

        NOTE(review): yaml is used here but never imported in this module —
        confirm the import wasn't lost.
        """
        with open(path, "r") as yaml_file:
            data = yaml.load(yaml_file, Loader=yaml.Loader)

        return [
            Spec.from_dict(name, info) for name, info in data["specs"].items()
        ]
class DirSpec(Spec):
    """Spec that backs up a plain local directory."""

    def __init__(self, name, source, destination, limit, title, events=None):
        super().__init__(name, destination, limit, title, events)
        self.source = Path(source)

    def backup(self):
        """Prune old tarballs, archive the source and notify the result."""
        self.remove_redundant()

        target = self.destination / self.get_filename()
        command = "tar -C '{}' -czf '{}' -- .".format(self.source, target)
        self.notify(os.system(command))

    @staticmethod
    def from_dict(name, data):
        """Build a DirSpec from a parsed config dict."""
        notify = data["notify"]

        return DirSpec(
            name,
            data["source"],
            data["destination"],
            data["limit"],
            notify["title"],
            notify["events"],
        )
class VolumeSpec(Spec):
    """Spec that backs up a Docker volume via a throwaway Alpine container."""

    def __init__(self, name, volume, destination, limit, title, events=None):
        super().__init__(name, destination, limit, title, events)
        self.volume = volume

    def backup(self):
        """Prune old tarballs, archive the volume and notify the result."""
        # BUG FIX: prune excess backups first, as DirSpec.backup does
        self.remove_redundant()

        status_code = os.system(
            "docker run --rm -v '{}:/from' -v '{}:/to' alpine:latest "
            "tar -C /from -czf '/to/{}' -- .".format(
                self.volume, self.destination, self.get_filename()
            )
        )

        # BUG FIX: the status code was computed but never reported
        self.notify(status_code)

    @staticmethod
    def from_dict(name, data):
        """Build a VolumeSpec from a parsed config dict."""
        return VolumeSpec(
            name,
            # NOTE(review): the volume name is read from the "source" key;
            # verify this matches the config format (DirSpec uses the same
            # key for its directory).
            data["source"],
            data["destination"],
            data["limit"],
            data["notify"]["title"],
            data["notify"]["events"],
        )