refactor: split backup and alex into separate crates; set up workspace
Parent: d23227dd0b
Commit: abafd9a28c
26 changed files with 1494 additions and 33 deletions
backup/src/delta.rs (new file, 162 additions)
@@ -0,0 +1,162 @@
use std::{borrow::Borrow, fmt};

use serde::{Deserialize, Serialize};

use super::State;

/// Represents the changes relative to the previous backup
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Delta {
    /// What files were added/modified in each part of the tarball.
    pub added: State,
    /// What files were removed in this backup, in comparison to the previous backup. For full
    /// backups, this will always be empty, as they do not consider previous backups.
    /// The map stores a separate list for each top-level directory, as the contents of these
    /// directories can come from different source directories.
    pub removed: State,
}

impl Delta {
    pub fn new() -> Self {
        Self {
            added: State::new(),
            removed: State::new(),
        }
    }

    /// Returns whether the delta is empty by checking whether both its added and removed states
    /// return true for their `is_empty`.
    #[allow(dead_code)]
    pub fn is_empty(&self) -> bool {
        self.added.is_empty() && self.removed.is_empty()
    }

    /// Calculate the union of this delta with another delta.
    ///
    /// The union of two deltas is a delta that produces the same state as if you were to apply
    /// both deltas in-order. Note that this operation is not commutative.
    pub fn union(&self, delta: &Self) -> Self {
        let mut out = self.clone();

        for (dir, added) in delta.added.iter() {
            // Files that were removed in the current state, but added in the new state, are no
            // longer removed
            if let Some(orig_removed) = out.removed.get_mut(dir) {
                orig_removed.retain(|k| !added.contains(k));
            }

            // Newly added files are added to the state as well
            if let Some(orig_added) = out.added.get_mut(dir) {
                orig_added.extend(added.iter().cloned());
            } else {
                out.added.insert(dir.clone(), added.clone());
            }
        }

        for (dir, removed) in delta.removed.iter() {
            // Files that were originally added, but now deleted, are removed from the added list
            if let Some(orig_added) = out.added.get_mut(dir) {
                orig_added.retain(|k| !removed.contains(k));
            }

            // Newly removed files are added to the state as well
            if let Some(orig_removed) = out.removed.get_mut(dir) {
                orig_removed.extend(removed.iter().cloned());
            } else {
                out.removed.insert(dir.clone(), removed.clone());
            }
        }

        out
    }

    /// Calculate the difference between this delta and the other delta.
    ///
    /// The difference simply means removing all adds and removes that are also performed in the
    /// other delta.
    pub fn difference(&self, other: &Self) -> Self {
        let mut out = self.clone();

        for (dir, added) in out.added.iter_mut() {
            // If files are added in the other delta, we don't add them in this delta
            if let Some(other_added) = other.added.get(dir) {
                added.retain(|k| !other_added.contains(k));
            };
        }

        for (dir, removed) in out.removed.iter_mut() {
            // If files are removed in the other delta, we don't remove them in this delta either
            if let Some(other_removed) = other.removed.get(dir) {
                removed.retain(|k| !other_removed.contains(k));
            }
        }

        out
    }

    /// Calculate the strict difference between this delta and the other delta.
    ///
    /// The strict difference is a difference where all operations that would be overwritten by the
    /// other delta are also removed (a.k.a. adding a file after removing it, or vice versa)
    pub fn strict_difference(&self, other: &Self) -> Self {
        let mut out = self.difference(other);

        for (dir, added) in out.added.iter_mut() {
            // Remove additions that are removed in the other delta
            if let Some(other_removed) = other.removed.get(dir) {
                added.retain(|k| !other_removed.contains(k));
            }
        }

        for (dir, removed) in out.removed.iter_mut() {
            // Remove removals that are re-added in the other delta
            if let Some(other_added) = other.added.get(dir) {
                removed.retain(|k| !other_added.contains(k));
            }
        }

        out
    }

    /// Given a chain of deltas, ordered from last to first, calculate the "contribution" for each
    /// delta.
    ///
    /// The contribution of a delta in a given chain is defined as the parts of the state produced
    /// by this chain that are actually provided by this delta. This comes down to calculating the
    /// strict difference of this delta and all of its successive deltas.
    pub fn contributions<I>(deltas: I) -> Vec<State>
    where
        I: IntoIterator,
        I::Item: Borrow<Delta>,
    {
        let mut contributions: Vec<State> = Vec::new();

        let mut deltas = deltas.into_iter();

        if let Some(first_delta) = deltas.next() {
            // From last to first, we calculate the strict difference of the delta with the union
            // of all its following deltas. The list of added files of this difference is the
            // contribution for that delta.
            contributions.push(first_delta.borrow().added.clone());
            let mut union_future = first_delta.borrow().clone();

            for delta in deltas {
                contributions.push(delta.borrow().strict_difference(&union_future).added);
                union_future = union_future.union(delta.borrow());
            }
        }

        // contributions.reverse();

        contributions
    }
}

impl fmt::Display for Delta {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let added_count: usize = self.added.values().map(|s| s.len()).sum();
        let removed_count: usize = self.removed.values().map(|s| s.len()).sum();

        write!(f, "+{}-{}", added_count, removed_count)
    }
}
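The union semantics above are easiest to verify with a small test. A minimal sketch (a hypothetical `tests` module inside delta.rs, since `Delta` is not re-exported from the crate root; directory and file names are illustrative):

#[cfg(test)]
mod tests {
    use std::{collections::HashSet, path::PathBuf};

    use super::*;

    #[test]
    fn union_cancels_removals() {
        let dir = PathBuf::from("worlds");

        // The first delta removes a.txt, the second one adds it back
        let mut first = Delta::new();
        first
            .removed
            .insert(dir.clone(), HashSet::from([PathBuf::from("a.txt")]));

        let mut second = Delta::new();
        second
            .added
            .insert(dir.clone(), HashSet::from([PathBuf::from("a.txt")]));

        let union = first.union(&second);

        // Applying both in order leaves a.txt present, so the union no longer removes it
        assert!(union.removed.get(&dir).unwrap().is_empty());
        assert!(union.added.get(&dir).unwrap().contains(&PathBuf::from("a.txt")));
    }
}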
backup/src/io_ext.rs (new file, 43 additions)
@@ -0,0 +1,43 @@
use std::io::{self, Write};

/// Wrapper around the Write trait that counts how many bytes have been written in total.
/// Heavily inspired by https://stackoverflow.com/a/42189386
pub struct CountingWrite<W> {
    inner: W,
    count: usize,
}

impl<W> CountingWrite<W>
where
    W: Write,
{
    pub fn new(writer: W) -> Self {
        Self {
            inner: writer,
            count: 0,
        }
    }

    pub fn bytes_written(&self) -> usize {
        self.count
    }
}

impl<W> Write for CountingWrite<W>
where
    W: Write,
{
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let res = self.inner.write(buf);

        if let Ok(count) = res {
            self.count += count;
        }

        res
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
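A quick sanity check of the byte counting (a sketch of a hypothetical in-crate test; `CountingWrite` is private to the crate):

#[cfg(test)]
mod tests {
    use std::io::Write;

    use super::CountingWrite;

    #[test]
    fn counts_all_bytes() {
        // A Vec<u8> works as an in-memory sink
        let mut w = CountingWrite::new(Vec::new());

        w.write_all(b"hello").unwrap();
        w.write_all(b" world").unwrap();

        // Both writes went through the wrapper, so the total is tracked
        assert_eq!(w.bytes_written(), 11);
    }
}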
backup/src/lib.rs (new file, 319 additions)
@@ -0,0 +1,319 @@
mod delta;
mod io_ext;
pub mod manager;
mod path;
mod state;

use std::{
    collections::HashSet,
    fmt,
    fs::File,
    io,
    path::{Path, PathBuf},
};

use chrono::Utc;
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use serde::{Deserialize, Serialize};

use delta::Delta;
pub use manager::Manager;
pub use manager::ManagerConfig;
pub use manager::MetaManager;
use path::PathExt;
pub use state::State;

const BYTE_SUFFIXES: [&str; 5] = ["B", "KiB", "MiB", "GiB", "TiB"];

pub fn other(msg: &str) -> io::Error {
    io::Error::new(io::ErrorKind::Other, msg)
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum BackupType {
    Full,
    Incremental,
}

/// Represents a successful backup
#[derive(Serialize, Deserialize, Debug)]
pub struct Backup<T: Clone> {
    /// When the backup was started (also corresponds to the name)
    pub start_time: chrono::DateTime<Utc>,
    /// When the backup finished
    pub end_time: chrono::DateTime<Utc>,
    pub size: usize,
    /// Type of the backup
    pub type_: BackupType,
    pub delta: Delta,
    /// Additional metadata that can be associated with a given backup
    pub metadata: Option<T>,
}

impl Backup<()> {
    pub const FILENAME_FORMAT: &str = "%Y-%m-%d_%H-%M-%S.tar.gz";

    /// Return the path to a backup file by formatting its start time.
    pub fn path<P: AsRef<Path>>(backup_dir: P, start_time: chrono::DateTime<Utc>) -> PathBuf {
        let backup_dir = backup_dir.as_ref();

        let filename = format!("{}", start_time.format(Self::FILENAME_FORMAT));
        backup_dir.join(filename)
    }

    /// Extract an archive.
    ///
    /// # Arguments
    ///
    /// * `archive_path` - Path to the archive to extract
    /// * `dirs` - list of tuples `(path_in_tar, dst_dir)` with `dst_dir` the directory on-disk
    /// where the files stored under `path_in_tar` inside the tarball should be extracted to.
    pub fn extract_archive<P: AsRef<Path>>(
        archive_path: P,
        dirs: &Vec<(PathBuf, PathBuf)>,
    ) -> io::Result<()> {
        let tar_gz = File::open(archive_path)?;
        let enc = GzDecoder::new(tar_gz);
        let mut ar = tar::Archive::new(enc);

        // Unpack each file by matching it with one of the destination directories and extracting
        // it to the right path
        for entry in ar.entries()? {
            let mut entry = entry?;
            let entry_path_in_tar = entry.path()?.to_path_buf();

            for (path_in_tar, dst_dir) in dirs {
                if entry_path_in_tar.starts_with(path_in_tar) {
                    let dst_path =
                        dst_dir.join(entry_path_in_tar.strip_prefix(path_in_tar).unwrap());

                    // Ensure all parent directories are present
                    std::fs::create_dir_all(dst_path.parent().unwrap())?;

                    entry.unpack(dst_path)?;

                    break;
                }
            }
        }

        Ok(())
    }
}

impl<T: Clone> Backup<T> {
    /// Set the backup's metadata.
    pub fn set_metadata(&mut self, metadata: T) {
        self.metadata = Some(metadata);
    }

    /// Create a new Full backup, populated with the given directories.
    ///
    /// # Arguments
    ///
    /// * `backup_dir` - Directory to store the archive in
    /// * `dirs` - list of tuples `(path_in_tar, src_dir)` with `path_in_tar` the directory name
    /// under which `src_dir`'s contents should be stored in the archive
    ///
    /// # Returns
    ///
    /// The `Backup` instance describing this new backup.
    pub fn create<P: AsRef<Path>>(
        backup_dir: P,
        dirs: &Vec<(PathBuf, PathBuf)>,
    ) -> io::Result<Self> {
        let start_time = chrono::offset::Utc::now();

        let path = Backup::path(backup_dir, start_time);
        let tar_gz = io_ext::CountingWrite::new(File::create(path)?);
        let enc = GzEncoder::new(tar_gz, Compression::default());
        let mut ar = tar::Builder::new(enc);

        let mut delta = Delta::new();

        for (dir_in_tar, src_dir) in dirs {
            let mut added_files: HashSet<PathBuf> = HashSet::new();

            for entry in src_dir.read_dir_recursive()?.ignored("cache").files() {
                let path = entry?.path();
                let stripped = path.strip_prefix(src_dir).unwrap();

                ar.append_path_with_name(&path, dir_in_tar.join(stripped))?;
                added_files.insert(stripped.to_path_buf());
            }

            delta.added.insert(dir_in_tar.to_path_buf(), added_files);
        }

        let mut enc = ar.into_inner()?;

        // The docs recommend running try_finish before unwrapping using finish
        enc.try_finish()?;
        let tar_gz = enc.finish()?;

        Ok(Backup {
            type_: BackupType::Full,
            start_time,
            end_time: chrono::Utc::now(),
            size: tar_gz.bytes_written(),
            delta,
            metadata: None,
        })
    }

    /// Create a new Incremental backup from the given state, populated with the given directories.
    ///
    /// # Arguments
    ///
    /// * `previous_state` - State the file system was in during the previous backup in the chain
    /// * `previous_start_time` - Start time of the previous backup; used to filter files
    /// * `backup_dir` - Directory to store the archive in
    /// * `dirs` - list of tuples `(path_in_tar, src_dir)` with `path_in_tar` the directory name
    /// under which `src_dir`'s contents should be stored in the archive
    ///
    /// # Returns
    ///
    /// The `Backup` instance describing this new backup.
    pub fn create_from<P: AsRef<Path>>(
        previous_state: State,
        previous_start_time: chrono::DateTime<Utc>,
        backup_dir: P,
        dirs: &Vec<(PathBuf, PathBuf)>,
    ) -> io::Result<Self> {
        let start_time = chrono::offset::Utc::now();

        let path = Backup::path(backup_dir, start_time);
        let tar_gz = io_ext::CountingWrite::new(File::create(path)?);
        let enc = GzEncoder::new(tar_gz, Compression::default());
        let mut ar = tar::Builder::new(enc);

        let mut delta = Delta::new();

        for (dir_in_tar, src_dir) in dirs {
            let mut all_files: HashSet<PathBuf> = HashSet::new();
            let mut added_files: HashSet<PathBuf> = HashSet::new();

            for entry in src_dir.read_dir_recursive()?.ignored("cache").files() {
                let path = entry?.path();
                let stripped = path.strip_prefix(src_dir).unwrap();

                if !path.not_modified_since(previous_start_time) {
                    ar.append_path_with_name(&path, dir_in_tar.join(stripped))?;
                    added_files.insert(stripped.to_path_buf());
                }

                all_files.insert(stripped.to_path_buf());
            }

            delta.added.insert(dir_in_tar.clone(), added_files);

            if let Some(previous_files) = previous_state.get(dir_in_tar) {
                delta.removed.insert(
                    dir_in_tar.to_path_buf(),
                    previous_files.difference(&all_files).cloned().collect(),
                );
            }
        }

        let mut enc = ar.into_inner()?;

        // The docs recommend running try_finish before unwrapping using finish
        enc.try_finish()?;
        let tar_gz = enc.finish()?;

        Ok(Backup {
            type_: BackupType::Incremental,
            start_time,
            end_time: chrono::Utc::now(),
            size: tar_gz.bytes_written(),
            delta,
            metadata: None,
        })
    }

    /// Restore the backup by extracting its contents to the respective directories.
    ///
    /// # Arguments
    ///
    /// * `backup_dir` - Backup directory where the file is stored
    /// * `dirs` - list of tuples `(path_in_tar, dst_dir)` with `dst_dir` the directory on-disk
    /// where the files stored under `path_in_tar` inside the tarball should be extracted to.
    pub fn restore<P: AsRef<Path>>(
        &self,
        backup_dir: P,
        dirs: &Vec<(PathBuf, PathBuf)>,
    ) -> io::Result<()> {
        let backup_path = Backup::path(backup_dir, self.start_time);
        Backup::extract_archive(backup_path, dirs)?;

        // Remove any files that this backup's delta marks as removed
        for (path_in_tar, dst_dir) in dirs {
            if let Some(removed) = self.delta.removed.get(path_in_tar) {
                for path in removed {
                    let dst_path = dst_dir.join(path);
                    std::fs::remove_file(dst_path)?;
                }
            }
        }

        Ok(())
    }

    pub fn open<P: AsRef<Path>>(&self, backup_dir: P) -> io::Result<tar::Archive<GzDecoder<File>>> {
        let path = Backup::path(backup_dir, self.start_time);
        let tar_gz = File::open(path)?;
        let enc = GzDecoder::new(tar_gz);
        Ok(tar::Archive::new(enc))
    }

    /// Open this backup's archive and append all its files that are part of the provided state to
    /// the archive file.
    pub fn append<P: AsRef<Path>>(
        &self,
        backup_dir: P,
        state: &State,
        ar: &mut tar::Builder<GzEncoder<File>>,
    ) -> io::Result<()> {
        let mut own_ar = self.open(backup_dir)?;

        for entry in own_ar.entries()? {
            let entry = entry?;
            let entry_path_in_tar = entry.path()?.to_path_buf();

            if state.contains(&entry_path_in_tar) {
                let header = entry.header().clone();
                ar.append(&header, entry)?;
            }
        }

        Ok(())
    }
}

impl<T: Clone> fmt::Display for Backup<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let letter = match self.type_ {
            BackupType::Full => 'F',
            BackupType::Incremental => 'I',
        };

        // Pretty-print size
        // If your backup is a petabyte or larger, this will crash and you need to re-evaluate your
        // life choices
        let index = self.size.ilog(1024) as usize;
        let size = self.size as f64 / (1024.0_f64.powi(index as i32));
        let duration = self.end_time - self.start_time;

        write!(
            f,
            "{} ({}, {}m{}s, {:.2}{}, {})",
            self.start_time.format(Backup::FILENAME_FORMAT),
            letter,
            duration.num_seconds() / 60,
            duration.num_seconds() % 60,
            size,
            BYTE_SUFFIXES[index],
            self.delta
        )
    }
}
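Putting the public pieces together, a full backup could be created roughly like this (a sketch: the crate name `backup` and the `backups` and `data/worlds` paths are assumptions, and both directories must already exist):

use std::{io, path::PathBuf};

fn main() -> io::Result<()> {
    // (path_in_tar, src_dir): store the contents of data/worlds under "worlds" in the tarball
    let dirs = vec![(PathBuf::from("worlds"), PathBuf::from("data/worlds"))];

    // Writes a .tar.gz named after the start time into ./backups; `()` is used as metadata type
    let backup = backup::Backup::<()>::create("backups", &dirs)?;

    // The Display impl prints name, type, duration, size and delta
    println!("{}", backup);

    Ok(())
}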
backup/src/manager/config.rs (new file, 46 additions)
@@ -0,0 +1,46 @@
use std::{error::Error, fmt, str::FromStr};

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ManagerConfig {
    pub name: String,
    pub frequency: u32,
    pub chains: u64,
    pub chain_len: u64,
}

#[derive(Debug)]
pub struct ParseManagerConfigErr;

impl Error for ParseManagerConfigErr {}

impl fmt::Display for ParseManagerConfigErr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "parse manager config err")
    }
}

impl FromStr for ManagerConfig {
    type Err = ParseManagerConfigErr;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let splits: Vec<&str> = s.split(',').collect();

        if let [name, frequency, chains, chain_len] = splits[..] {
            let name: String = name.parse().map_err(|_| ParseManagerConfigErr)?;
            let frequency: u32 = frequency.parse().map_err(|_| ParseManagerConfigErr)?;
            let chains: u64 = chains.parse().map_err(|_| ParseManagerConfigErr)?;
            let chain_len: u64 = chain_len.parse().map_err(|_| ParseManagerConfigErr)?;

            Ok(ManagerConfig {
                name,
                chains,
                chain_len,
                frequency,
            })
        } else {
            Err(ParseManagerConfigErr)
        }
    }
}
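The `FromStr` impl accepts a comma-separated `name,frequency,chains,chain_len` string. A sketch (the values are illustrative; per `MetaManager::add`, `frequency` is interpreted as minutes):

use backup::ManagerConfig;

fn main() {
    // Format: name,frequency,chains,chain_len
    let config: ManagerConfig = "daily,60,4,7".parse().unwrap();

    assert_eq!(config.name, "daily");
    assert_eq!(config.frequency, 60); // minutes between backups
    assert_eq!(config.chains, 4); // finished chains to keep
    assert_eq!(config.chain_len, 7); // backups per chain
}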
backup/src/manager/meta.rs (new file, 149 additions)
@@ -0,0 +1,149 @@
use std::{
    collections::HashMap,
    io,
    path::{Path, PathBuf},
};

use chrono::Utc;
use serde::{Deserialize, Serialize};

use super::{Manager, ManagerConfig};

/// Manages a collection of backup layers, allowing them to be utilized as a single object.
pub struct MetaManager<T>
where
    T: Clone + Serialize + for<'de> Deserialize<'de> + std::fmt::Debug,
{
    backup_dir: PathBuf,
    dirs: Vec<(PathBuf, PathBuf)>,
    default_metadata: T,
    managers: HashMap<String, Manager<T>>,
}

impl<T> MetaManager<T>
where
    T: Clone + Serialize + for<'de> Deserialize<'de> + std::fmt::Debug,
{
    pub fn new<P: Into<PathBuf>>(
        backup_dir: P,
        dirs: Vec<(PathBuf, PathBuf)>,
        default_metadata: T,
    ) -> Self {
        MetaManager {
            backup_dir: backup_dir.into(),
            dirs,
            default_metadata,
            managers: HashMap::new(),
        }
    }

    /// Add a new manager to track, initializing it first.
    pub fn add(&mut self, config: &ManagerConfig) -> io::Result<()> {
        // Backup dir itself should exist, but we control its contents, so we can create
        // separate directories for each layer
        let path = self.backup_dir.join(&config.name);

        // If the directory already exists, that's okay
        match std::fs::create_dir(&path) {
            Ok(()) => (),
            Err(e) => match e.kind() {
                io::ErrorKind::AlreadyExists => (),
                _ => return Err(e),
            },
        };

        let mut manager = Manager::new(
            path,
            self.dirs.clone(),
            self.default_metadata.clone(),
            config.chain_len,
            config.chains,
            chrono::Duration::minutes(config.frequency.into()),
        );
        manager.load()?;
        self.managers.insert(config.name.clone(), manager);

        Ok(())
    }

    /// Convenience wrapper that calls `add` for each config.
    pub fn add_all(&mut self, configs: &Vec<ManagerConfig>) -> io::Result<()> {
        for config in configs {
            self.add(config)?;
        }

        Ok(())
    }

    /// Return the name of the next scheduled layer, if one or more managers are present.
    pub fn next_scheduled_layer(&self) -> Option<&str> {
        self.managers
            .iter()
            .min_by_key(|(_, m)| m.next_scheduled_time())
            .map(|(k, _)| k.as_str())
    }

    /// Return the earliest scheduled time for the underlying managers.
    pub fn next_scheduled_time(&self) -> Option<chrono::DateTime<Utc>> {
        self.managers
            .values()
            .map(|m| m.next_scheduled_time())
            .min()
    }

    /// Perform a backup cycle for the earliest scheduled manager.
    pub fn perform_backup_cycle(&mut self) -> io::Result<()> {
        if let Some(manager) = self
            .managers
            .values_mut()
            .min_by_key(|m| m.next_scheduled_time())
        {
            manager.create_backup()?;
            manager.remove_old_backups()
        } else {
            Ok(())
        }
    }

    /// Create a manual backup for a specific layer
    pub fn create_backup(&mut self, layer: &str) -> Option<io::Result<()>> {
        if let Some(manager) = self.managers.get_mut(layer) {
            let mut res = manager.create_backup();

            if res.is_ok() {
                res = manager.remove_old_backups();
            }

            Some(res)
        } else {
            None
        }
    }

    /// Restore a backup for a specific layer
    pub fn restore_backup(
        &self,
        layer: &str,
        start_time: chrono::DateTime<Utc>,
        dirs: &Vec<(PathBuf, PathBuf)>,
    ) -> Option<io::Result<()>> {
        self.managers
            .get(layer)
            .map(|manager| manager.restore_backup(start_time, dirs))
    }

    pub fn export_backup<P: AsRef<Path>>(
        &self,
        layer: &str,
        start_time: chrono::DateTime<Utc>,
        output_path: P,
    ) -> Option<io::Result<()>> {
        self.managers
            .get(layer)
            .map(|manager| manager.export_backup(start_time, output_path))
    }

    pub fn managers(&self) -> &HashMap<String, Manager<T>> {
        &self.managers
    }
}
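A sketch of driving a `MetaManager` (the layer name and paths are illustrative, and the `backups` directory is assumed to exist):

use std::{io, path::PathBuf};

use backup::MetaManager;

fn main() -> io::Result<()> {
    let dirs = vec![(PathBuf::from("worlds"), PathBuf::from("data/worlds"))];

    // `()` satisfies the metadata bounds (Clone + Serialize + Deserialize + Debug)
    let mut meta: MetaManager<()> = MetaManager::new("backups", dirs, ());

    // One layer: a backup every 60 minutes, keep 7 chains of 24 backups each
    meta.add(&"hourly,60,7,24".parse().unwrap())?;

    // The manager with the earliest scheduled time runs next
    println!("next layer: {:?}", meta.next_scheduled_layer());

    Ok(())
}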
backup/src/manager/mod.rs (new file, 253 additions)
@@ -0,0 +1,253 @@
mod config;
mod meta;

use std::{
    fs::{File, OpenOptions},
    io,
    path::{Path, PathBuf},
};

use chrono::{SubsecRound, Utc};
use flate2::{write::GzEncoder, Compression};
use serde::{Deserialize, Serialize};

use super::{Backup, BackupType, Delta, State};
use crate::other;
pub use config::ManagerConfig;
pub use meta::MetaManager;

/// Manages a single backup layer consisting of one or more chains of backups.
pub struct Manager<T>
where
    T: Clone + Serialize + for<'de> Deserialize<'de> + std::fmt::Debug,
{
    backup_dir: PathBuf,
    dirs: Vec<(PathBuf, PathBuf)>,
    default_metadata: T,
    chain_len: u64,
    chains_to_keep: u64,
    frequency: chrono::Duration,
    chains: Vec<Vec<Backup<T>>>,
}

impl<T> Manager<T>
where
    T: Clone + Serialize + for<'de> Deserialize<'de> + std::fmt::Debug,
{
    const METADATA_FILE: &str = "alex.json";

    pub fn new<P: Into<PathBuf>>(
        backup_dir: P,
        dirs: Vec<(PathBuf, PathBuf)>,
        metadata: T,
        chain_len: u64,
        chains_to_keep: u64,
        frequency: chrono::Duration,
    ) -> Self {
        Self {
            backup_dir: backup_dir.into(),
            dirs,
            default_metadata: metadata,
            chain_len,
            chains_to_keep,
            frequency,
            chains: Vec::new(),
        }
    }

    /// Create a new backup, either full or incremental, depending on the state of the current
    /// chain.
    pub fn create_backup(&mut self) -> io::Result<()> {
        // We start a new chain if the current chain is complete, or if there isn't a first chain
        // yet
        if let Some(current_chain) = self.chains.last() {
            let current_chain_len: u64 = current_chain.len().try_into().unwrap();

            if current_chain_len >= self.chain_len {
                self.chains.push(Vec::new());
            }
        } else {
            self.chains.push(Vec::new());
        }

        let current_chain = self.chains.last_mut().unwrap();

        let mut backup = if !current_chain.is_empty() {
            let previous_backup = current_chain.last().unwrap();
            let previous_state = State::from(current_chain.iter().map(|b| &b.delta));

            Backup::create_from(
                previous_state,
                previous_backup.start_time,
                &self.backup_dir,
                &self.dirs,
            )?
        } else {
            Backup::create(&self.backup_dir, &self.dirs)?
        };

        backup.set_metadata(self.default_metadata.clone());

        current_chain.push(backup);

        self.save()?;

        Ok(())
    }

    /// Delete all backups associated with outdated chains, and forget those chains.
    pub fn remove_old_backups(&mut self) -> io::Result<()> {
        let chains_to_store: usize = self.chains_to_keep.try_into().unwrap();

        if chains_to_store < self.chains.len() {
            let mut remove_count: usize = self.chains.len() - chains_to_store;

            // We only count finished chains towards the list of stored chains
            let chain_len: usize = self.chain_len.try_into().unwrap();
            if self.chains.last().unwrap().len() < chain_len {
                remove_count -= 1;
            }

            for chain in self.chains.drain(..remove_count) {
                for backup in chain {
                    let path = Backup::path(&self.backup_dir, backup.start_time);
                    std::fs::remove_file(path)?;
                }
            }

            self.save()?;
        }

        Ok(())
    }

    /// Write the in-memory state to disk.
    pub fn save(&self) -> io::Result<()> {
        let json_file = File::create(self.backup_dir.join(Self::METADATA_FILE))?;
        serde_json::to_writer(json_file, &self.chains)?;

        Ok(())
    }

    /// Overwrite the in-memory state with the on-disk state.
    pub fn load(&mut self) -> io::Result<()> {
        let json_file = match File::open(self.backup_dir.join(Self::METADATA_FILE)) {
            Ok(f) => f,
            Err(e) => {
                // Don't error out if the file isn't there, it will be created when necessary
                if e.kind() == io::ErrorKind::NotFound {
                    self.chains = Vec::new();

                    return Ok(());
                } else {
                    return Err(e);
                }
            }
        };

        self.chains = serde_json::from_reader(json_file)?;

        Ok(())
    }

    /// Calculate the next time a backup should be created. If no backup has been created yet, it
    /// will return now.
    pub fn next_scheduled_time(&self) -> chrono::DateTime<Utc> {
        self.chains
            .last()
            .and_then(|last_chain| last_chain.last())
            .map(|last_backup| last_backup.start_time + self.frequency)
            .unwrap_or_else(chrono::offset::Utc::now)
    }

    /// Search for a chain containing a backup with the specified start time.
    ///
    /// # Returns
    ///
    /// A tuple (chain, index) with index being the index of the found backup in the returned
    /// chain.
    fn find(&self, start_time: chrono::DateTime<Utc>) -> Option<(&Vec<Backup<T>>, usize)> {
        for chain in &self.chains {
            if let Some(index) = chain
                .iter()
                .position(|b| b.start_time.trunc_subsecs(0) == start_time)
            {
                return Some((chain, index));
            }
        }

        None
    }

    /// Restore the backup with the given start time by restoring its chain up to and including the
    /// backup, in order.
    pub fn restore_backup(
        &self,
        start_time: chrono::DateTime<Utc>,
        dirs: &Vec<(PathBuf, PathBuf)>,
    ) -> io::Result<()> {
        self.find(start_time)
            .ok_or_else(|| other("Unknown layer."))
            .and_then(|(chain, index)| {
                for backup in chain.iter().take(index + 1) {
                    backup.restore(&self.backup_dir, dirs)?;
                }

                Ok(())
            })
    }

    /// Export the backup with the given start time as a new full archive.
    pub fn export_backup<P: AsRef<Path>>(
        &self,
        start_time: chrono::DateTime<Utc>,
        output_path: P,
    ) -> io::Result<()> {
        self.find(start_time)
            .ok_or_else(|| other("Unknown layer."))
            .and_then(|(chain, index)| {
                match chain[index].type_ {
                    // A full backup is simply copied to the output path
                    BackupType::Full => std::fs::copy(
                        Backup::path(&self.backup_dir, chain[index].start_time),
                        output_path,
                    )
                    .map(|_| ()),
                    // Incremental backups are exported one by one according to their contribution
                    BackupType::Incremental => {
                        let contributions = Delta::contributions(
                            chain.iter().take(index + 1).map(|b| &b.delta).rev(),
                        );

                        let tar_gz = OpenOptions::new()
                            .write(true)
                            .create(true)
                            .open(output_path.as_ref())?;
                        let enc = GzEncoder::new(tar_gz, Compression::default());
                        let mut ar = tar::Builder::new(enc);

                        // We only need to consider backups that have a non-empty contribution.
                        // This allows us to skip reading backups that have been completely
                        // overwritten by their successors anyways.
                        for (contribution, backup) in contributions
                            .iter()
                            .rev()
                            .zip(chain.iter().take(index + 1))
                            .filter(|(contribution, _)| !contribution.is_empty())
                        {
                            println!("{}", &backup);
                            backup.append(&self.backup_dir, contribution, &mut ar)?;
                        }

                        let mut enc = ar.into_inner()?;
                        enc.try_finish()
                    }
                }
            })
    }

    /// Get a reference to the underlying chains
    pub fn chains(&self) -> &Vec<Vec<Backup<T>>> {
        &self.chains
    }
}
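A single layer could also be driven directly (a sketch with assumed paths; `chrono` must be a dependency of the calling crate):

use std::{io, path::PathBuf};

use backup::Manager;

fn main() -> io::Result<()> {
    let dirs = vec![(PathBuf::from("worlds"), PathBuf::from("data/worlds"))];

    // Chains of 7 backups (1 full + 6 incremental), keep 4 finished chains,
    // and schedule a backup every hour
    let mut manager: Manager<()> = Manager::new(
        "backups/daily",
        dirs,
        (),
        7,
        4,
        chrono::Duration::hours(1),
    );

    // Pick up any state left by a previous run
    manager.load()?;

    manager.create_backup()?;
    manager.remove_old_backups()?;

    Ok(())
}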
backup/src/path.rs (new file, 149 additions)
@@ -0,0 +1,149 @@
use std::{
    collections::HashSet,
    ffi::OsString,
    fs::{self, DirEntry},
    io,
    path::{Path, PathBuf},
};

use chrono::{Local, Utc};

pub struct ReadDirRecursive {
    ignored: HashSet<OsString>,
    read_dir: fs::ReadDir,
    dir_stack: Vec<PathBuf>,
    files_only: bool,
}

impl ReadDirRecursive {
    /// Start the iterator for a new directory
    pub fn start<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        let path = path.as_ref();
        let read_dir = path.read_dir()?;

        Ok(ReadDirRecursive {
            ignored: HashSet::new(),
            read_dir,
            dir_stack: Vec::new(),
            files_only: false,
        })
    }

    pub fn ignored<S: Into<OsString>>(mut self, s: S) -> Self {
        self.ignored.insert(s.into());

        self
    }

    pub fn files(mut self) -> Self {
        self.files_only = true;

        self
    }

    /// Tries to populate the `read_dir` field with a new `ReadDir` instance to consume.
    fn next_read_dir(&mut self) -> io::Result<bool> {
        if let Some(path) = self.dir_stack.pop() {
            self.read_dir = path.read_dir()?;

            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Convenience method to add a new directory to the stack.
    fn push_entry(&mut self, entry: &io::Result<DirEntry>) {
        if let Ok(entry) = entry {
            if entry.path().is_dir() {
                self.dir_stack.push(entry.path());
            }
        }
    }

    /// Determine whether an entry should be returned by the iterator.
    fn should_return(&self, entry: &io::Result<DirEntry>) -> bool {
        if let Ok(entry) = entry {
            let mut res = !self.ignored.contains(&entry.file_name());

            // Please just let me combine these already
            if self.files_only {
                if let Ok(file_type) = entry.file_type() {
                    res = res && file_type.is_file();
                }
                // We couldn't determine if it's a file, so we don't return it
                else {
                    res = false;
                }
            }

            res
        } else {
            true
        }
    }
}

impl Iterator for ReadDirRecursive {
    type Item = io::Result<DirEntry>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // First, we try to consume the current directory's items
            while let Some(entry) = self.read_dir.next() {
                self.push_entry(&entry);

                if self.should_return(&entry) {
                    return Some(entry);
                }
            }

            // If we get an error while setting up a new directory, we return it; otherwise we
            // keep trying to consume the directories
            match self.next_read_dir() {
                Ok(true) => (),
                // There are no more directories to traverse, so the iterator is done
                Ok(false) => return None,
                Err(e) => return Some(Err(e)),
            }
        }
    }
}

pub trait PathExt {
    /// Confirm whether the file has not been modified since the given timestamp.
    ///
    /// This function will only return true if it can determine with certainty that the file hasn't
    /// been modified.
    ///
    /// # Args
    ///
    /// * `timestamp` - Timestamp to compare modified time with
    ///
    /// # Returns
    ///
    /// True if the file has not been modified for sure, false otherwise.
    fn not_modified_since(&self, timestamp: chrono::DateTime<Utc>) -> bool;

    /// An extension of `read_dir` that recursively walks the entire underlying directory
    /// structure.
    fn read_dir_recursive(&self) -> io::Result<ReadDirRecursive>;
}

impl PathExt for Path {
    fn not_modified_since(&self, timestamp: chrono::DateTime<Utc>) -> bool {
        self.metadata()
            .and_then(|m| m.modified())
            .map(|last_modified| {
                let t: chrono::DateTime<Utc> = last_modified.into();
                let t = t.with_timezone(&Local);

                t < timestamp
            })
            .unwrap_or(false)
    }

    fn read_dir_recursive(&self) -> io::Result<ReadDirRecursive> {
        ReadDirRecursive::start(self)
    }
}
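A sketch of how the builder-style iterator is meant to be used (as a hypothetical in-crate helper, since `PathExt` is not exported; the "cache" filter mirrors the call sites in lib.rs):

use std::{io, path::Path};

use crate::path::PathExt;

/// Count regular files under `dir`, skipping any entry named "cache".
fn count_files(dir: &Path) -> io::Result<usize> {
    let mut count = 0;

    for entry in dir.read_dir_recursive()?.ignored("cache").files() {
        // Propagate I/O errors encountered during traversal
        entry?;
        count += 1;
    }

    Ok(count)
}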
backup/src/state.rs (new file, 98 additions)
@@ -0,0 +1,98 @@
use std::{
    borrow::Borrow,
    collections::{HashMap, HashSet},
    ops::{Deref, DerefMut},
    path::{Path, PathBuf},
};

use serde::{Deserialize, Serialize};

use crate::Delta;

/// Struct that represents the current state of a backup. This struct acts as a smart pointer
/// around a HashMap.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct State(HashMap<PathBuf, HashSet<PathBuf>>);

impl State {
    pub fn new() -> Self {
        State(HashMap::new())
    }

    /// Apply the delta to the current state.
    pub fn apply(&mut self, delta: &Delta) {
        // First we add new files, then we remove the old ones
        for (dir, added) in delta.added.iter() {
            if let Some(current) = self.0.get_mut(dir) {
                current.extend(added.iter().cloned());
            } else {
                self.0.insert(dir.clone(), added.clone());
            }
        }

        for (dir, removed) in delta.removed.iter() {
            if let Some(current) = self.0.get_mut(dir) {
                current.retain(|k| !removed.contains(k));
            }
        }
    }

    /// Returns whether the provided relative path is part of the given state.
    pub fn contains<P: AsRef<Path>>(&self, path: P) -> bool {
        let path = path.as_ref();

        self.0.iter().any(|(dir, files)| {
            path.starts_with(dir) && files.contains(path.strip_prefix(dir).unwrap())
        })
    }

    /// Returns whether the state is empty.
    ///
    /// Note that this does not necessarily mean that the state does not contain any sets, but
    /// rather that any sets that it does contain are also empty.
    pub fn is_empty(&self) -> bool {
        self.0.values().all(|s| s.is_empty())
    }
}

impl<T> From<T> for State
where
    T: IntoIterator,
    T::Item: Borrow<Delta>,
{
    fn from(deltas: T) -> Self {
        let mut state = State::new();

        for delta in deltas {
            state.apply(delta.borrow());
        }

        state
    }
}

impl AsRef<HashMap<PathBuf, HashSet<PathBuf>>> for State {
    fn as_ref(&self) -> &HashMap<PathBuf, HashSet<PathBuf>> {
        &self.0
    }
}

impl Deref for State {
    type Target = HashMap<PathBuf, HashSet<PathBuf>>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for State {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl Default for State {
    fn default() -> Self {
        Self::new()
    }
}
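A short sketch of the `State` API from the outside (illustrative paths; `State` derefs to its inner map, while its own `contains` matches tarball-relative paths):

use std::{collections::HashSet, path::PathBuf};

use backup::State;

fn main() {
    let mut state = State::new();

    // State derefs to HashMap<PathBuf, HashSet<PathBuf>>, so the map API is available directly
    state.insert(
        PathBuf::from("worlds"),
        HashSet::from([PathBuf::from("region/r.0.0.mca")]),
    );

    // `contains` matches paths as they appear inside the tarball
    assert!(state.contains("worlds/region/r.0.0.mca"));
    assert!(!state.contains("worlds/region/r.0.1.mca"));
    assert!(!state.is_empty());
}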