2021-01-15 17:58:29 +01:00
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
from typing import Union, Dict
|
|
|
|
import skeleton
|
|
|
|
import os
|
2021-01-15 21:08:56 +01:00
|
|
|
from notifier import Notifier
|
|
|
|
import inspect
|
2021-01-15 17:58:29 +01:00
|
|
|
|
|
|
|
|
|
|
|
class Spec:
    """
    Base class for all other spec types.

    A spec couples a name with a destination directory, a backup limit, a
    file extension and an optional notifier. Subclasses are expected to
    implement backup(), restore() and to_dict(), which raise
    NotImplementedError here.
    """

    # Default configuration for this class. skeleton() merges the _SKEL
    # dicts of every class in the MRO, so subclasses only declare their own
    # additional keys.
    # NOTE(review): "backup_sucess" looks like a misspelling of
    # "backup_success"; it is kept verbatim because other modules may match
    # on this exact string -- confirm before renaming.
    _SKEL = {
        "destination": None,
        "limit": None,
        "notify": {
            "title": "Backup Notification",
            "events": ["backup_sucess"],
            "endpoint": None,
            "api_key": "",
        },
        "extension": "tar.gz",
    }

    def __init__(
        self,
        name: str,
        destination: Union[Path, str],
        limit: Union[int, None],
        extension: str,
        notify=None,
    ):
        """
        Args:
            name: name of the spec
            destination: directory where the backups shall reside
            limit: max amount of backups; None disables the cleanup done by
                remove_backups()
            extension: file extension of the backup files, without the
                leading dot
            notify: settings used to build the Notifier. A dict is passed
                as keyword arguments, any other sequence as positional
                arguments; a falsy value disables notifications.

        Raises:
            NotADirectoryError: if destination doesn't exist or isn't a
                directory.
        """

        self.name = name
        # Path() accepts both strings and existing path objects.
        self.destination = Path(destination)

        # Check existence of destination folder; is_dir() is already False
        # for paths that don't exist.
        if not self.destination.is_dir():
            raise NotADirectoryError(
                "{} doesn't exist or isn't a directory.".format(
                    self.destination
                )
            )

        self.limit = limit

        if not notify:
            self.notifier = None
        elif isinstance(notify, dict):
            # from_dict() hands over the "notify" mapping; star-unpacking a
            # dict would pass its key names instead of its values.
            # NOTE(review): assumes the Notifier parameter names match the
            # keys of _SKEL["notify"] -- confirm against notifier.Notifier.
            self.notifier = Notifier(**notify)
        else:
            self.notifier = Notifier(*notify)

        self.extension = extension

    @classmethod
    def skeleton(cls):
        """
        Return the merged _SKEL of cls and all of its base classes.

        The classes are passed to skeleton.merge() from the most basic one
        to cls itself; `object` (the last MRO entry) is left out.
        """

        hierarchy = inspect.getmro(cls)[:-1]

        return skeleton.merge(*[val._SKEL for val in reversed(hierarchy)])

    def remove_backups(self):
        """
        Remove the oldest backups once the limit has been reached.

        Files in the destination directory matching "*.<extension>" are
        ordered by modification time. As soon as at least `limit` of them
        exist, only the `limit - 1` newest ones are kept (presumably to
        leave room for the backup that is about to be created -- confirm
        with the callers). Nothing is removed when limit is None.
        """

        # _SKEL defaults the limit to None; comparing an int against None
        # would raise a TypeError, so treat it as "no limit".
        if self.limit is None:
            return

        files = sorted(
            self.destination.glob("*." + self.extension),
            key=os.path.getmtime,
            reverse=True,
        )

        # NOTE(review): for limit <= 0 the slice index below is negative
        # and counts from the end of the list, so only the oldest file(s)
        # are removed -- confirm whether such limits should be rejected.
        if len(files) >= self.limit:
            for path in files[self.limit - 1 :]:
                path.unlink()

    def backup(self):
        """Create a backup; must be implemented by subclasses."""
        raise NotImplementedError()

    def restore(self):
        """Restore a backup; must be implemented by subclasses."""
        raise NotImplementedError()

    @classmethod
    def from_dict(cls, name, obj: Dict, defaults: Dict) -> "Spec":
        """
        Create a spec from a configuration dict.

        Args:
            name: name of the spec
            obj: values for this spec; after merging, its keys are passed
                to the constructor as keyword arguments
            defaults: values layered on top of the class skeleton before
                obj is applied

        Returns:
            a new instance of cls
        """

        # Combine defaults with skeleton, creating new skeleton
        skel = skeleton.merge(cls.skeleton(), defaults)

        # Then, combine actual values with new skeleton
        obj = skeleton.merge_with_skeleton(obj, skel)

        return cls(name, **obj)

    def to_dict(self):
        """Serialize the spec to a dict; must be implemented by subclasses."""
        raise NotImplementedError()
|