Compare commits

..

2 Commits

Author SHA1 Message Date
Jef Roosens 159d3de72b
Added recovery flag 2021-02-25 10:59:59 +01:00
Jef Roosens 33d8c301e8
Removed shortened stack trace 2021-02-25 10:54:54 +01:00
25 changed files with 115 additions and 657 deletions

View File

@ -1,8 +0,0 @@
# vim: ft=cfg
[flake8]
max-complexity = 7
docstring-convention=google
# Required to be compatible with black
extend-ignore = E203,W503
inline-quotes = double

3
.gitignore vendored
View File

@ -1,6 +1,3 @@
__pycache__/ __pycache__/
.venv/ .venv/
backup_tool backup_tool
*.egg-info/
.tox/
.coverage

View File

@ -1,31 +0,0 @@
matrix:
PYTHON_VERSION:
- 3.6
- 3.7
- 3.8
- 3.9
pipeline:
test:
group: test
image: python:${PYTHON_VERSION}-alpine
pull: true
commands:
- pip install -e .[test]
- pytest --cov=app --cov-fail-under=90 tests/
test-pypy:
group: test
image: pypy:3-7-slim
pull: true
commands:
- pip install -e .[test]
- pytest --cov=app --cov-fail-under=90 tests/
lint:
image: python:3.6-alpine
commands:
- apk update && apk add --no-cache build-base
- pip install -e .[lint]
- black --check setup.py app
- flake8 setup.py app

View File

@ -1,28 +1,25 @@
# =====CONFIG===== # =====CONFIG=====
PYTHON := python3.6 # Devop environment runs in 3.8
VENV := .venv PYTHON=python3.8
# =====RECIPES===== # =====RECIPES=====
# Create the virtual environment .venv/bin/activate: requirements.txt requirements-dev.txt
$(VENV)/bin/activate: setup.py '$(PYTHON)' -m venv .venv
'$(PYTHON)' -m venv '$(VENV)' .venv/bin/pip install -r requirements.txt -r requirements-dev.txt
'$(VENV)/bin/pip' install -e .[develop]
venv: $(VENV)/bin/activate venv: .venv/bin/activate
.PHONY: venv .PHONY: venv
# Format the codebase using Black
format: venv format: venv
@ '$(VENV)/bin/black' setup.py app @ .venv/bin/black app/*.py app/**/*.py
.PHONY: format .PHONY: format
# Remove any temporary files
clean: clean:
@ rm -rf '$(VENV)' .tox backup_tool rm -rf .venv
rm backup_tool
.PHONY: clean .PHONY: clean
# Pack the package into a zipfile
backup_tool: backup_tool:
@ cd app && \ @ cd app && \
zip -r ../app.zip * \ zip -r ../app.zip * \
@ -34,17 +31,5 @@ backup_tool:
app: backup_tool app: backup_tool
.PHONY: app .PHONY: app
# Install the app
install: app install: app
cp backup_tool /usr/local/bin cp backup_tool /usr/local/bin
.PHONY: install
# We can't force the develop to have all the versions locally, so
# the local tests only include python3.6
test: venv tox.ini
@ '$(VENV)/bin/tox' -e py36
.PHONY: test
lint: venv
@ '$(VENV)/bin/tox' -e lint
.PHONY: lint

View File

@ -1 +0,0 @@
"""Module containing all app code."""

View File

@ -1,63 +1,62 @@
"""The main entrypoint of the program."""
import argparse import argparse
import sys import sys
from parser import read_specs_file from parser import read_specs_file
def except_hook(ext_type, value, traceback): # Define parser
"""Make errors not show the stracktrace to stdout. parser = argparse.ArgumentParser(
Todo:
* Replace this with proper error handling
"""
sys.stderr.write("{}: {}\n".format(ext_type.__name__, value))
# sys.excepthook = except_hook
if __name__ == "__main__":
# Define parser
parser = argparse.ArgumentParser(
description="Backup directories and Docker volumes." description="Backup directories and Docker volumes."
) )
parser.add_argument(
parser.add_argument(
"-f", "-f",
"--file", "--file",
action="append", action="append",
dest="file", dest="file",
required=True, required=True,
help="File containing spec definitions.", help="File containing spec definitions.",
) )
parser.add_argument(
parser.add_argument(
"-j", "-j",
"--json", "--json",
action="store_const", action="store_const",
const=True, const=True,
default=False, default=False,
help="Print out the parsed specs as JSON and exit", help="Print out the parsed specs as JSON and exit",
) )
parser.add_argument(
parser.add_argument(
"-r",
"--recover",
action="append",
nargs=2,
metavar=("SPEC", "BACKUP"),
dest="recovers",
help="Recover the given spec; requires two arguments",
)
parser.add_argument(
"spec", nargs="*", help="The specs to process. Defaults to all." "spec", nargs="*", help="The specs to process. Defaults to all."
) )
# Parse arguments # Parse arguments
args = parser.parse_args() args = parser.parse_args()
specs = sum((read_specs_file(path) for path in args.file), []) specs = sum([read_specs_file(path) for path in args.file], [])
# Filter specs if needed # Filter specs if needed
if args.spec: if args.spec:
specs = list(filter(lambda s: s.name in args.spec, specs)) specs = list(filter(lambda s: s.name in args.spec, specs))
# Dump parsed data as json # Dump parsed data as json
if args.json: if args.json:
import json import json
# TODO replace this with error handling system
print(json.dumps([spec.to_dict() for spec in specs], indent=4)) print(json.dumps([spec.to_dict() for spec in specs], indent=4))
elif not specs: else:
# TODO replace this with error handling system # Run the backups
if not specs:
print("No specs, exiting.") print("No specs, exiting.")
sys.exit(0) sys.exit(0)

View File

@ -1,55 +0,0 @@
"""Common exceptions raised by the program."""
from typing import Union, List
class InvalidKeyError(Exception):
"""Thrown when a config file contains an invalid key."""
def __init__(self, keys: Union[str, List[str]]):
"""Create a new InvalidKeyError object with the given key.
Args:
keys: the invalid key(s)
"""
if type(keys) == str:
keys = [keys]
self.message = "Invalid key(s): {}".format(", ".join(keys))
super().__init__()
class MissingKeyError(Exception):
"""Thrown when a required key is missing from a config."""
def __init__(self, keys: Union[str, List[str]]):
"""Create a new MissingKeyError object with the given key.
Args:
keys: the invalid key(s)
"""
if type(keys) == str:
keys = [keys]
self.message = "Missing key(s): {}".format(", ".join(keys))
super().__init__()
class InvalidValueError(Exception):
"""Thrown when a key contains an invalid value."""
def __init__(self, key: str, expected: str, actual: str):
"""Create a new InvalidValueError given the arguments.
Args:
key: the key containing the invalid value
expected: name of the expected type
actual: name of the actual type
"""
self.message = (
f"Invalid value for key {key}: expected {expected}, "
f"got {actual}"
)
super().__init__()

View File

@ -1,109 +0,0 @@
"""This module contains the logging module."""
from typing import Union
from pathlib import Path
from datetime import datetime
import sys
class Logger:
"""A logger class that logs, ya get the point."""
LOG_LEVELS = [
"debug",
"info",
"warning",
"error",
"critical",
]
"""The log levels' names.
When used as arguments, the counting starts at 1
instead of 0.
"""
def __init__(
self,
log_file: Union[Path, str] = None,
append: bool = True,
stdout: bool = True,
log_level: int = 3,
):
"""Initialize a new Logger object.
Args:
log_file: path to a log file. If any of the folders within the log
file's path don't exist, they will get created. If no value is
specified, no log file is created.
append: wether or not to append to the existing file or overwrite
it. If False, the original file gets deleted during init.
stdout: wether or not to log to stdout as well
log_level: the minimum level to log
"""
self.log_file = Path(log_file) if log_file else None
self.stdout = stdout
self.log_level = log_level
# Remove the original log file
if not append:
self.log_file.unlink(missing_ok=True)
def custom(self, message: str, header: str = None):
"""Log a message given a header and a message.
If a header is provided (aka truthy), the final form of the messsage
wil be:
`[YYYY-MM-DD HH:MM:SS][header] message`
Otherwise, it's just:
`[YYYY-MM-DD HH:MM:SS] message`
Args:
message: the message to display
header: the header to add to the message
"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_message = f"[{timestamp}] {message}\n"
if header:
log_message = f"[{timestamp}][{header}] {message}\n"
if self.log_file:
self.log_file.write_text(log_message)
if self.stdout:
sys.stdout.write(log_message)
def log(self, level: int, message: str):
"""Log a message with a specific level.
Args:
level: log level (index in the LOG_LEVELS variable)
message: the message to log
"""
if level < self.log_level:
return
level_name = self.LOG_LEVELS[level - 1].upper()
self.custom(level_name, message)
def debug(self, message: str):
"""Log a debug message."""
self.log(1, message)
def info(self, message: str):
"""Log an info message."""
self.log(2, message)
def warning(self, message: str):
"""Log a warning message."""
self.log(3, message)
def error(self, message: str):
"""Log an error message."""
self.log(4, message)
def critical(self, message: str):
"""Log a critical message."""
self.log(5, message)

View File

@ -1,12 +1,9 @@
"""Module handling IFTTT notifications."""
from typing import List from typing import List
import os import os
import requests import requests
class Notifier: class Notifier:
"""A notifier object that can send IFTTT notifications."""
# (positive, negative) # (positive, negative)
_EVENTS = { _EVENTS = {
"backup": ( "backup": (
@ -18,40 +15,24 @@ class Notifier:
"Couldn't restore {name}.", "Couldn't restore {name}.",
), ),
} }
"""The message content for a given event."""
# Placeholder # Placeholder
def __init__( def __init__(
self, title: str, events: List[str], endpoint: str, api_key: str = None self, title: str, events: List[str], endpoint: str, api_key: str = None
): ):
"""Initialize a new Notifier object.
Args:
title: the notification title to use
events: the event types that should trigger a notification (should
be one of the keys in _EVENTS).
endpoint: IFTTT endpoint name
api_key: your IFTTT API key. If not provided, it will be read from
the IFTTT_API_KEY environment variable.
Todo:
* Read the API key on init
"""
self.title = title self.title = title
self.events = events self.events = events
self.endpoint = endpoint self.endpoint = endpoint
self.api_key = api_key self.api_key = api_key
def notify(self, category: str, name: str, status_code: int): def notify(self, category: str, name: str, status_code: int):
"""Send an IFTTT notification. """
Args: Args:
category: type of notify (should be one of the keys in _EVENTS). category: type of notify (e.g. backup or restore)
Only if the category was passed during initialization will the
notification be sent.
name: name of the spec name: name of the spec
status_code: exit code of the command status_code: exit code of the command
""" """
event = "{}_{}".format( event = "{}_{}".format(
category, "success" if status_code == 0 else "failure" category, "success" if status_code == 0 else "failure"
) )

View File

@ -1,4 +1,3 @@
"""Handles parsing a config file from disk."""
import yaml import yaml
from pathlib import Path from pathlib import Path
from typing import List, Union from typing import List, Union
@ -7,18 +6,9 @@ import skeleton
def read_specs_file(path: Union[str, Path]) -> List[Spec]: def read_specs_file(path: Union[str, Path]) -> List[Spec]:
"""Read a config file and merge it with the skeleton.
Args:
path: path to the yaml config file
Returns:
A list of specs, parsed from the config.
"""
with open(path, "r") as yaml_file: with open(path, "r") as yaml_file:
data = yaml.safe_load(yaml_file) data = yaml.safe_load(yaml_file)
# NOTE shouldn't this be defined as a top-level variable?
categories = [ categories = [
("directories", DirectorySpec), ("directories", DirectorySpec),
("volumes", VolumeSpec), ("volumes", VolumeSpec),
@ -33,7 +23,6 @@ def read_specs_file(path: Union[str, Path]) -> List[Spec]:
# Check what defaults are present # Check what defaults are present
defaults = {} defaults = {}
if data.get("defaults"): if data.get("defaults"):
if data["defaults"].get("all"): if data["defaults"].get("all"):
defaults = skeleton.merge(defaults, data["defaults"]["all"]) defaults = skeleton.merge(defaults, data["defaults"]["all"])

View File

@ -1,25 +1,21 @@
"""Handles merging with the skeleton config."""
from typing import Dict from typing import Dict
from .exceptions import InvalidKeyError, MissingKeyError
class InvalidKeyError(Exception):
def __init__(self, key):
self.message = "Invalid key: {}".format(key)
super().__init__(key)
class MissingKeyError(Exception):
def __init__(self, key):
self.message = "Missing key: {}".format(key)
super().__init__(key)
def merge(*dicts: [Dict]) -> Dict: def merge(*dicts: [Dict]) -> Dict:
"""Merge multiple dicts into one.
It reads the dicts from left to right, always preferring the "right"
dictionary's values. Therefore, the dictionaries should be sorted from
least important to most important (e.g. a default values skeleton should be
to the left of a dict of selected values).
Args:
dicts: the dictionaries to merge
Returns:
a new dictionary representing the merged dictionaries
Todo:
* Make sure an infinite loop is not possible
"""
# Base cases # Base cases
if len(dicts) == 0: if len(dicts) == 0:
return {} return {}
@ -48,28 +44,15 @@ def merge(*dicts: [Dict]) -> Dict:
def merge_with_skeleton(data: Dict, skel: Dict) -> Dict: def merge_with_skeleton(data: Dict, skel: Dict) -> Dict:
"""Merge a dictionary with a skeleton containing default values.
The skeleton not only defines what the default values are, but also
enforces a certain shape. This allows us to define a config file using a
dictionary and parse it.
Args:
data: dictionary containing the selected config values
skel: dictionary containing the skeleton (aka the def)
Returns:
a new dictionary representing the two merged dictionaries
Todo:
* Check if an infinite loop is possible
* Split info less complex functions
""" """
# First, check for illegal keys Compare a dict with a given skeleton dict, and fill in default values where
invalid_keys = list(filter(lambda k: k not in skel, data)) needed.
"""
if invalid_keys: # First, check for illegal keys
raise InvalidKeyError(invalid_keys) for key in data:
if key not in skel:
raise InvalidKeyError(key)
# Then, check the default values # Then, check the default values
for key, value in skel.items(): for key, value in skel.items():
@ -83,7 +66,6 @@ def merge_with_skeleton(data: Dict, skel: Dict) -> Dict:
# Error if value is not same type as default value # Error if value is not same type as default value
elif type(data[key]) != type(value) and value is not None: elif type(data[key]) != type(value) and value is not None:
# TODO make this error message more verbose
raise TypeError("Invalid value type") raise TypeError("Invalid value type")
# Recurse into dicts # Recurse into dicts

View File

@ -1,7 +1,4 @@
"""Parent module for the various spec types."""
from .spec import Spec from .spec import Spec
from .directory import DirectorySpec from .directory import DirectorySpec
from .volume import VolumeSpec from .volume import VolumeSpec
from .container import ContainerSpec from .container import ContainerSpec
__all__ = ["Spec", "DirectorySpec", "VolumeSpec", "ContainerSpec"]

View File

@ -1,4 +1,3 @@
"""Module defining a container-based spec."""
from .spec import Spec from .spec import Spec
from typing import Union from typing import Union
from pathlib import Path from pathlib import Path
@ -7,10 +6,11 @@ import subprocess
class ContainerSpec(Spec): class ContainerSpec(Spec):
"""Spec for backing up via a container.""" """
A spec for backing up via a container.
"""
_SKEL = {"container": None, "command": None, "mountpoint": "/from"} _SKEL = {"container": None, "command": None, "mountpoint": "/from"}
"""The skeleton for the ContainerSpec config."""
def __init__( def __init__(
self, self,
@ -23,21 +23,6 @@ class ContainerSpec(Spec):
mountpoint: str, mountpoint: str,
notify=None, notify=None,
): ):
"""Create a new ContainerSpec object.
Args:
name: name of the spec (used as an identifier)
container: the Docker container to back up
destination: where to store the backups (gets created if
non-existent)
limit: max amount of backups to keep
command: command to run inside the container. This command should
perform a specified backup and output this data to stdout. This
output then gets piped to a backup file.
extension: the extension of the backup files.
mountpoint: I don't actually know, this never gets used
notify: notifier object (may be None)
"""
super().__init__(name, destination, limit, extension, notify) super().__init__(name, destination, limit, extension, notify)
self.container = container self.container = container
@ -45,7 +30,6 @@ class ContainerSpec(Spec):
self.command = command self.command = command
def backup(self): def backup(self):
"""Create a new backup."""
# Remove excess backups # Remove excess backups
self.remove_backups() self.remove_backups()

View File

@ -1,4 +1,3 @@
"""Module defining a directory-based spec."""
from .spec import Spec from .spec import Spec
from pathlib import Path from pathlib import Path
from typing import Union from typing import Union
@ -7,7 +6,9 @@ from datetime import datetime
class DirectorySpec(Spec): class DirectorySpec(Spec):
"""A spec for backing up a local directory.""" """
A spec for backing up a local directory.
"""
_SKEL = { _SKEL = {
"source": None, "source": None,
@ -24,17 +25,6 @@ class DirectorySpec(Spec):
extension: str, extension: str,
notify=None, notify=None,
): ):
"""Initialize a new DirectorySpec object.
Args:
name: name of the spec
source: what directory to back up
destination: where to store the backup
limit: how many backups to keep
command: what command to use to create the backup
extension: extension of the backup files
notify: a Notifier object that handles sending notifications
"""
super().__init__(name, destination, limit, extension, notify) super().__init__(name, destination, limit, extension, notify)
self.source = source if type(source) == Path else Path(source) self.source = source if type(source) == Path else Path(source)
@ -48,7 +38,6 @@ class DirectorySpec(Spec):
self.command = command self.command = command
def backup(self): def backup(self):
"""Create a new backup."""
# Remove excess backups # Remove excess backups
self.remove_backups() self.remove_backups()

View File

@ -1,4 +1,4 @@
"""This module contains the base Spec class.""" from pathlib import Path
from typing import Union, Dict from typing import Union, Dict
import skeleton import skeleton
import os import os
@ -7,7 +7,9 @@ import inspect
class Spec: class Spec:
"""Base class for all other spec types.""" """
Base class for all other spec types.
"""
_SKEL = { _SKEL = {
"destination": None, "destination": None,
@ -29,26 +31,23 @@ class Spec:
extension: str, extension: str,
notify=None, notify=None,
): ):
"""Initialize a new Spec object. """
This initializer usually gets called by a subclass's init instead of
directly.
Args: Args:
name: name of the spec name: name of the spec
destination: directory where the backups shall reside destination: directory where the backups shall reside
limit: max amount of backups limit: max amount of backups
extension: file extension of the backup files notifier: notifier object
notify: notifier object to send IFTT notifications
""" """
self.name = name self.name = name
self.destination = Path(destination) self.destination = (
destination if type(destination) == Path else Path(destination)
)
# Create destination if non-existent # Create destination if non-existent
try: try:
self.destination.mkdir(parents=True, exist_ok=True) self.destination.mkdir(parents=True, exist_ok=True)
# TODO just make this some checks in advance
except FileExistsError: except FileExistsError:
raise NotADirectoryError( raise NotADirectoryError(
"{} already exists, but isn't a directory.".format( "{} already exists, but isn't a directory.".format(
@ -61,24 +60,16 @@ class Spec:
self.extension = extension self.extension = extension
@classmethod @classmethod
def skeleton(cls: "Spec") -> Dict: def skeleton(cls):
"""Return the skeleton for the given class.
It works by inspecting the inheritance tree and merging the skeleton
for each of the parents.
Args:
cls: the class to get the skeleton for
Returns:
a dictionary containing the skeleton
"""
return skeleton.merge( return skeleton.merge(
*[val._SKEL for val in reversed(inspect.getmro(cls)[:-1])] *[val._SKEL for val in reversed(inspect.getmro(cls)[:-1])]
) )
def remove_backups(self): def remove_backups(self):
"""Remove all backups exceeding the limit.""" """
Remove all backups exceeding the limit
"""
files = sorted( files = sorted(
self.destination.glob("*." + self.extension), self.destination.glob("*." + self.extension),
key=os.path.getmtime, key=os.path.getmtime,
@ -90,22 +81,13 @@ class Spec:
path.unlink() path.unlink()
def backup(self): def backup(self):
"""Create a new backup.
This function should be implemented by the subclasses.
"""
raise NotImplementedError() raise NotImplementedError()
def restore(self): def restore(self):
"""Restore a given backup (NOT IMPLEMENTED).
This function should be implemented by the subclasses.
"""
raise NotImplementedError() raise NotImplementedError()
@classmethod @classmethod
def from_dict(cls, name, obj: Dict, defaults: Dict) -> "Spec": def from_dict(cls, name, obj: Dict, defaults: Dict) -> "Spec":
"""Create the class given a dictionary (e.g. from a config)."""
# Combine defaults with skeleton, creating new skeleton # Combine defaults with skeleton, creating new skeleton
skel = skeleton.merge(cls.skeleton(), defaults) skel = skeleton.merge(cls.skeleton(), defaults)
@ -115,8 +97,4 @@ class Spec:
return cls(name, **obj) return cls(name, **obj)
def to_dict(self): def to_dict(self):
"""Export the class as a dictionary.
This function should be imnplemented by the subclasses.
"""
raise NotImplementedError() raise NotImplementedError()

View File

@ -1,4 +1,3 @@
"""Module defining a Docker volume-based spec."""
from .spec import Spec from .spec import Spec
from typing import Union from typing import Union
from pathlib import Path from pathlib import Path
@ -7,7 +6,9 @@ import subprocess
class VolumeSpec(Spec): class VolumeSpec(Spec):
"""A spec for backing up a Docker volume.""" """
A spec for backing up a Docker volume.
"""
_SKEL = { _SKEL = {
"volume": None, "volume": None,
@ -26,18 +27,6 @@ class VolumeSpec(Spec):
extension: str, extension: str,
notify=None, notify=None,
): ):
"""Initialize a new VolumeSpec object.
Args:
name: name of the spec
volume: Docker volume to back up
image: base image to use to run backup command
destination: where to store the backup files
limit: max backup files to keep
command: backup command to run within the base image
extension: file extension of the backup files
notify: Notifier object
"""
super().__init__(name, destination, limit, extension, notify) super().__init__(name, destination, limit, extension, notify)
self.volume = volume self.volume = volume
@ -45,7 +34,6 @@ class VolumeSpec(Spec):
self.command = command self.command = command
def backup(self): def backup(self):
"""Create a new backup."""
# Remove excess backups # Remove excess backups
self.remove_backups() self.remove_backups()
@ -54,10 +42,8 @@ class VolumeSpec(Spec):
datetime.now().strftime("%Y-%m-%d_%H-%M-%S"), self.extension datetime.now().strftime("%Y-%m-%d_%H-%M-%S"), self.extension
) )
base_cmd = "docker run --rm -v '{}:/from' -v '{}:/to' -w /from '{}' {}"
process = subprocess.run( process = subprocess.run(
base_cmd.format( "docker run --rm -v '{}:/from' -v '{}:/to' -w /from '{}' {}".format(
self.volume, self.volume,
self.destination, self.destination,
self.image, self.image,

View File

@ -1,3 +0,0 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
}

View File

@ -0,0 +1,10 @@
# Language server
jedi==0.18.0
# Linting & Formatting
black==20.8b1
flake8==3.8.4
# Testing
tox==3.21.1
pytest==6.2.1

View File

@ -1,25 +0,0 @@
[options.extras_require]
# Used to run the tests inside the CICD pipeline
ci =
tox==3.23.1
# Used inside Tox for running tests
test =
pytest==6.2.4
pytest-cov==2.12.1
# Used inside tox for linting
lint =
black==20.8b1
flake8==3.9.2
flake8-bugbear==21.4.3
flake8-comprehensions==3.5.0
flake8-docstrings==1.6.0
flake8-print==4.0.0
flake8-quotes==3.2.0
# Required for the developer
develop =
%(ci)s
%(lint)s
jedi==0.18.0

View File

@ -1,10 +0,0 @@
from setuptools import setup
setup(
name="backup-tool",
version="0.1.0",
author="Jef Roosens",
description="A utility to simply backing up services.",
# TODO add license
packages=["app", "tests"],
)

View File

@ -1,69 +0,0 @@
"""Tests for the skeleton module."""
from app.skeleton import merge
def test_merge_empty():
"""Test correct response for an empty merge."""
assert merge() == {}
def test_merge_single():
"""Test merge command with a single input."""
assert merge({}) == {}
dic = {"test": "value", "test2": "value2"}
assert merge(dic) == dic
def test_merge_double_no_overlap():
"""Test merge command with two non-overlapping inputs."""
d1 = {"test": "value", "test2": "value2"}
d2 = {"test3": "value3"}
d_res = {"test": "value", "test2": "value2", "test3": "value3"}
assert merge(d1, d2) == d_res
def test_merge_double_overlap():
"""Test merge command with two overlapping inputs."""
d1 = {"test": "value", "test2": "value2"}
d2 = {"test2": "value3"}
d_res = {"test": "value", "test2": "value3"}
assert merge(d1, d2) == d_res
def test_merge_triple_no_overlap():
"""Test merge command with three non-overlapping inputs.
This test tells us that the recursion works.
"""
d1 = {"test": "value", "test2": "value2"}
d2 = {"test3": "value3"}
d3 = {"test4": "value4"}
d_res = {
"test": "value",
"test2": "value2",
"test3": "value3",
"test4": "value4",
}
assert merge(d1, d2, d3) == d_res
def test_merge_triple_overlap():
"""Test merge command with three overlapping inputs.
This test tells us that the recursion works.
"""
d1 = {"test": "value", "test2": "value2"}
d2 = {"test3": "value3"}
d3 = {"test2": "value4"}
d_res = {
"test": "value",
"test2": "value4",
"test3": "value3",
}
assert merge(d1, d2, d3) == d_res

View File

@ -1,23 +0,0 @@
"""Tests for the logger module."""
from app.logger import Logger
from datetime import datetime
def test_custom_stdout(capfd):
"""Test the custom command."""
logger = Logger()
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.custom("a message", header="cewl")
out, _ = capfd.readouterr()
assert out == f"[{timestamp}][cewl] a message\n"
def test_log_stdout(capfd):
"""Test the log command with several levels."""
logger = Logger()
# TODO

View File

@ -1,70 +0,0 @@
"""Tests wether the skeleton merge works."""
from app.skeleton import merge_with_skeleton
from app.exceptions import InvalidKeyError, MissingKeyError
import pytest
def test_single_invalid_key():
"""Tests wether an InvalidKeyError is correctly thrown for a single key."""
data = {
"test": 1,
"test2": "test"
}
skel = {
"test": None,
}
with pytest.raises(InvalidKeyError) as e_info:
merge_with_skeleton(data, skel)
assert e_info.value.message == "Invalid key(s): test2"
def test_multiple_invalid_keys():
"""Tests wether an InvalidKeyError is thrown for multiple keys."""
data = {
"test": 1,
"test2": "test",
"test3": "test",
}
skel = {
"test": None,
}
with pytest.raises(InvalidKeyError) as e_info:
merge_with_skeleton(data, skel)
assert e_info.value.message == "Invalid key(s): test2, test3"
def test_single_missing_key():
"""Tests wether a MissingKeyError is correctly thrown for a single key."""
data = {
"test": 1,
}
skel = {
"test": None,
"test2": None,
}
with pytest.raises(MissingKeyError) as e_info:
merge_with_skeleton(data, skel)
assert e_info.value.message == "Missing key(s): test2"
def test_multiple_missing_keys():
"""Tests wether a MissingKeyError is correctly thrown for multiple keys."""
data = {
"test": 1,
}
skel = {
"test": None,
"test2": None,
"test3": None,
}
with pytest.raises(MissingKeyError) as e_info:
merge_with_skeleton(data, skel)
assert e_info.value.message == "Missing key(s): test2, test3"

15
tox.ini
View File

@ -1,15 +0,0 @@
[tox]
envlist = py36,py37,pypy37,py38,py39,lint
[testenv]
deps = .[test]
commands =
pytest
pytest --cov=app --cov-fail-under=90 tests/
[testenv:lint]
basepython = python3.6
deps = .[lint]
commands =
black --check setup.py app
flake8 setup.py app