From b3d1cec078557a1f8f47517d3121471e55577fa9 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Sat, 12 Aug 2023 11:44:35 +0200 Subject: [PATCH] feat: granular locking for proper concurrent access to server process --- CHANGELOG.md | 1 + src/cli/run/mod.rs | 25 +++++++++++-------------- src/server/process.rs | 40 ++++++++++++++++++++++------------------ src/signals.rs | 10 ++++------ src/stdin.rs | 17 +++++------------ 5 files changed, 43 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 553fd69..e2e02ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Export command no longer reads backups that do not contribute to the final state +* Running backups no longer block stdin input or shutdown ## [0.3.1](https://git.rustybever.be/Chewing_Bever/alex/src/tag/0.3.1) diff --git a/src/cli/run/mod.rs b/src/cli/run/mod.rs index 21c6a0a..4a11692 100644 --- a/src/cli/run/mod.rs +++ b/src/cli/run/mod.rs @@ -1,9 +1,6 @@ mod config; -use std::{ - path::PathBuf, - sync::{Arc, Mutex}, -}; +use std::{path::PathBuf, sync::Arc}; use clap::Args; use serde::{Deserialize, Serialize}; @@ -45,11 +42,15 @@ pub struct RunArgs { pub xmx: Option, } -fn backups_thread(counter: Arc>) { +fn backups_thread(server: Arc) { loop { let next_scheduled_time = { - let server = counter.lock().unwrap(); - server.backups.next_scheduled_time().unwrap() + server + .backups + .read() + .unwrap() + .next_scheduled_time() + .unwrap() }; let now = chrono::offset::Utc::now(); @@ -57,12 +58,8 @@ fn backups_thread(counter: Arc>) { std::thread::sleep((next_scheduled_time - now).to_std().unwrap()); } - { - let mut server = counter.lock().unwrap(); - - // We explicitely ignore the error here, as we don't want the thread to fail - let _ = server.backup(); - } + // We explicitely ignore the error here, as we don't want the thread to fail + let _ = server.backup(); } } @@ -90,7 +87,7 @@ impl RunCli { return Ok(()); } - let counter = Arc::new(Mutex::new(cmd.spawn()?)); + let counter = Arc::new(cmd.spawn()?); if !global.layers.is_empty() { let clone = Arc::clone(&counter); diff --git a/src/server/process.rs b/src/server/process.rs index 7748c1d..e7ff3d5 100644 --- a/src/server/process.rs +++ b/src/server/process.rs @@ -1,22 +1,21 @@ -use crate::backup::MetaManager; -use crate::server::Metadata; -use std::io::Write; -use std::process::Child; +use std::{io::Write, process::Child, sync::RwLock}; + +use crate::{backup::MetaManager, server::Metadata}; pub struct ServerProcess { - child: Child, - pub backups: MetaManager, + child: RwLock, + pub backups: RwLock>, } impl ServerProcess { pub fn new(manager: MetaManager, child: Child) -> ServerProcess { ServerProcess { - child, - backups: manager, + child: RwLock::new(child), + backups: RwLock::new(manager), } } - pub fn send_command(&mut self, cmd: &str) -> std::io::Result<()> { + pub fn send_command(&self, cmd: &str) -> std::io::Result<()> { match cmd.trim() { "stop" | "exit" => self.stop()?, "backup" => self.backup()?, @@ -26,29 +25,34 @@ impl ServerProcess { Ok(()) } - fn custom(&mut self, cmd: &str) -> std::io::Result<()> { - let mut stdin = self.child.stdin.as_ref().unwrap(); + fn custom(&self, cmd: &str) -> std::io::Result<()> { + let child = self.child.write().unwrap(); + let mut stdin = child.stdin.as_ref().unwrap(); stdin.write_all(format!("{}\n", cmd.trim()).as_bytes())?; stdin.flush()?; Ok(()) } - pub fn stop(&mut self) -> std::io::Result<()> { + pub fn stop(&self) -> std::io::Result<()> { self.custom("stop")?; - self.child.wait()?; + + self.child.write().unwrap().wait()?; Ok(()) } - pub fn kill(&mut self) -> std::io::Result<()> { - self.child.kill() + pub fn kill(&self) -> std::io::Result<()> { + self.child.write().unwrap().kill() } /// Perform a backup by disabling the server's save feature and flushing its data, before /// creating an archive file. - pub fn backup(&mut self) -> std::io::Result<()> { - let layer_name = String::from(self.backups.next_scheduled_layer().unwrap()); + pub fn backup(&self) -> std::io::Result<()> { + // We explicitely lock this entire function to prevent parallel backups + let mut backups = self.backups.write().unwrap(); + + let layer_name = String::from(backups.next_scheduled_layer().unwrap()); self.custom(&format!("say starting backup for layer '{}'", layer_name))?; // Make sure the server isn't modifying the files during the backup @@ -60,7 +64,7 @@ impl ServerProcess { std::thread::sleep(std::time::Duration::from_secs(10)); let start_time = chrono::offset::Utc::now(); - let res = self.backups.perform_backup_cycle(); + let res = backups.perform_backup_cycle(); // The server's save feature needs to be enabled again even if the archive failed to create self.custom("save-on")?; diff --git a/src/signals.rs b/src/signals.rs index 5462a75..87e5c73 100644 --- a/src/signals.rs +++ b/src/signals.rs @@ -1,6 +1,6 @@ use std::{ io, - sync::{atomic::AtomicBool, Arc, Mutex}, + sync::{atomic::AtomicBool, Arc}, }; use signal_hook::{ @@ -37,7 +37,7 @@ pub fn install_signal_handlers() -> io::Result<(Arc, SignalsInfo)> { /// Loop that handles terminating signals as they come in. pub fn handle_signals( signals: &mut SignalsInfo, - counter: Arc>, + server: Arc, ) -> io::Result<()> { let mut force = false; @@ -49,17 +49,15 @@ pub fn handle_signals( // This will currently not work, as the initial stop command will block the kill from // happening. if force { - let mut server = counter.lock().unwrap(); return server.kill(); } // The stop command runs in a separate thread to avoid blocking the signal handling loop. // After stopping the server, the thread terminates the process. else { - let clone = Arc::clone(&counter); + let clone = Arc::clone(&server); std::thread::spawn(move || { - let mut server = clone.lock().unwrap(); - let _ = server.stop(); + let _ = clone.stop(); std::process::exit(0); }); } diff --git a/src/stdin.rs b/src/stdin.rs index 4a7f761..8317671 100644 --- a/src/stdin.rs +++ b/src/stdin.rs @@ -1,11 +1,8 @@ -use std::{ - io, - sync::{Arc, Mutex}, -}; +use std::{io, sync::Arc}; use crate::server; -pub fn handle_stdin(counter: Arc>) { +pub fn handle_stdin(server: Arc) { let stdin = io::stdin(); let input = &mut String::new(); @@ -16,13 +13,9 @@ pub fn handle_stdin(counter: Arc>) { continue; }; - { - let mut server = counter.lock().unwrap(); - - if let Err(e) = server.send_command(input) { - println!("{}", e); - }; - } + if let Err(e) = server.send_command(input) { + println!("{}", e); + }; if input.trim() == "stop" { std::process::exit(0);