feat: granular locking for proper concurrent access to server process
							parent
							
								
									a51ff3937d
								
							
						
					
					
						commit
						b3d1cec078
					
				|  | @ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 | |||
| 
 | ||||
| * Export command no longer reads backups that do not contribute to the final | ||||
|   state | ||||
| * Running backups no longer block stdin input or shutdown | ||||
| 
 | ||||
| ## [0.3.1](https://git.rustybever.be/Chewing_Bever/alex/src/tag/0.3.1) | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,9 +1,6 @@ | |||
| mod config; | ||||
| 
 | ||||
| use std::{ | ||||
|     path::PathBuf, | ||||
|     sync::{Arc, Mutex}, | ||||
| }; | ||||
| use std::{path::PathBuf, sync::Arc}; | ||||
| 
 | ||||
| use clap::Args; | ||||
| use serde::{Deserialize, Serialize}; | ||||
|  | @ -45,11 +42,15 @@ pub struct RunArgs { | |||
|     pub xmx: Option<u64>, | ||||
| } | ||||
| 
 | ||||
| fn backups_thread(counter: Arc<Mutex<server::ServerProcess>>) { | ||||
| fn backups_thread(server: Arc<server::ServerProcess>) { | ||||
|     loop { | ||||
|         let next_scheduled_time = { | ||||
|             let server = counter.lock().unwrap(); | ||||
|             server.backups.next_scheduled_time().unwrap() | ||||
|             server | ||||
|                 .backups | ||||
|                 .read() | ||||
|                 .unwrap() | ||||
|                 .next_scheduled_time() | ||||
|                 .unwrap() | ||||
|         }; | ||||
| 
 | ||||
|         let now = chrono::offset::Utc::now(); | ||||
|  | @ -57,12 +58,8 @@ fn backups_thread(counter: Arc<Mutex<server::ServerProcess>>) { | |||
|             std::thread::sleep((next_scheduled_time - now).to_std().unwrap()); | ||||
|         } | ||||
| 
 | ||||
|         { | ||||
|             let mut server = counter.lock().unwrap(); | ||||
| 
 | ||||
|             // We explicitely ignore the error here, as we don't want the thread to fail
 | ||||
|             let _ = server.backup(); | ||||
|         } | ||||
|         // We explicitely ignore the error here, as we don't want the thread to fail
 | ||||
|         let _ = server.backup(); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|  | @ -90,7 +87,7 @@ impl RunCli { | |||
|             return Ok(()); | ||||
|         } | ||||
| 
 | ||||
|         let counter = Arc::new(Mutex::new(cmd.spawn()?)); | ||||
|         let counter = Arc::new(cmd.spawn()?); | ||||
| 
 | ||||
|         if !global.layers.is_empty() { | ||||
|             let clone = Arc::clone(&counter); | ||||
|  |  | |||
|  | @ -1,22 +1,21 @@ | |||
| use crate::backup::MetaManager; | ||||
| use crate::server::Metadata; | ||||
| use std::io::Write; | ||||
| use std::process::Child; | ||||
| use std::{io::Write, process::Child, sync::RwLock}; | ||||
| 
 | ||||
| use crate::{backup::MetaManager, server::Metadata}; | ||||
| 
 | ||||
| pub struct ServerProcess { | ||||
|     child: Child, | ||||
|     pub backups: MetaManager<Metadata>, | ||||
|     child: RwLock<Child>, | ||||
|     pub backups: RwLock<MetaManager<Metadata>>, | ||||
| } | ||||
| 
 | ||||
| impl ServerProcess { | ||||
|     pub fn new(manager: MetaManager<Metadata>, child: Child) -> ServerProcess { | ||||
|         ServerProcess { | ||||
|             child, | ||||
|             backups: manager, | ||||
|             child: RwLock::new(child), | ||||
|             backups: RwLock::new(manager), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pub fn send_command(&mut self, cmd: &str) -> std::io::Result<()> { | ||||
|     pub fn send_command(&self, cmd: &str) -> std::io::Result<()> { | ||||
|         match cmd.trim() { | ||||
|             "stop" | "exit" => self.stop()?, | ||||
|             "backup" => self.backup()?, | ||||
|  | @ -26,29 +25,34 @@ impl ServerProcess { | |||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     fn custom(&mut self, cmd: &str) -> std::io::Result<()> { | ||||
|         let mut stdin = self.child.stdin.as_ref().unwrap(); | ||||
|     fn custom(&self, cmd: &str) -> std::io::Result<()> { | ||||
|         let child = self.child.write().unwrap(); | ||||
|         let mut stdin = child.stdin.as_ref().unwrap(); | ||||
|         stdin.write_all(format!("{}\n", cmd.trim()).as_bytes())?; | ||||
|         stdin.flush()?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub fn stop(&mut self) -> std::io::Result<()> { | ||||
|     pub fn stop(&self) -> std::io::Result<()> { | ||||
|         self.custom("stop")?; | ||||
|         self.child.wait()?; | ||||
| 
 | ||||
|         self.child.write().unwrap().wait()?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub fn kill(&mut self) -> std::io::Result<()> { | ||||
|         self.child.kill() | ||||
|     pub fn kill(&self) -> std::io::Result<()> { | ||||
|         self.child.write().unwrap().kill() | ||||
|     } | ||||
| 
 | ||||
|     /// Perform a backup by disabling the server's save feature and flushing its data, before
 | ||||
|     /// creating an archive file.
 | ||||
|     pub fn backup(&mut self) -> std::io::Result<()> { | ||||
|         let layer_name = String::from(self.backups.next_scheduled_layer().unwrap()); | ||||
|     pub fn backup(&self) -> std::io::Result<()> { | ||||
|         // We explicitely lock this entire function to prevent parallel backups
 | ||||
|         let mut backups = self.backups.write().unwrap(); | ||||
| 
 | ||||
|         let layer_name = String::from(backups.next_scheduled_layer().unwrap()); | ||||
|         self.custom(&format!("say starting backup for layer '{}'", layer_name))?; | ||||
| 
 | ||||
|         // Make sure the server isn't modifying the files during the backup
 | ||||
|  | @ -60,7 +64,7 @@ impl ServerProcess { | |||
|         std::thread::sleep(std::time::Duration::from_secs(10)); | ||||
| 
 | ||||
|         let start_time = chrono::offset::Utc::now(); | ||||
|         let res = self.backups.perform_backup_cycle(); | ||||
|         let res = backups.perform_backup_cycle(); | ||||
| 
 | ||||
|         // The server's save feature needs to be enabled again even if the archive failed to create
 | ||||
|         self.custom("save-on")?; | ||||
|  |  | |||
|  | @ -1,6 +1,6 @@ | |||
| use std::{ | ||||
|     io, | ||||
|     sync::{atomic::AtomicBool, Arc, Mutex}, | ||||
|     sync::{atomic::AtomicBool, Arc}, | ||||
| }; | ||||
| 
 | ||||
| use signal_hook::{ | ||||
|  | @ -37,7 +37,7 @@ pub fn install_signal_handlers() -> io::Result<(Arc<AtomicBool>, SignalsInfo)> { | |||
| /// Loop that handles terminating signals as they come in.
 | ||||
| pub fn handle_signals( | ||||
|     signals: &mut SignalsInfo, | ||||
|     counter: Arc<Mutex<server::ServerProcess>>, | ||||
|     server: Arc<server::ServerProcess>, | ||||
| ) -> io::Result<()> { | ||||
|     let mut force = false; | ||||
| 
 | ||||
|  | @ -49,17 +49,15 @@ pub fn handle_signals( | |||
|         // This will currently not work, as the initial stop command will block the kill from
 | ||||
|         // happening.
 | ||||
|         if force { | ||||
|             let mut server = counter.lock().unwrap(); | ||||
|             return server.kill(); | ||||
|         } | ||||
|         // The stop command runs in a separate thread to avoid blocking the signal handling loop.
 | ||||
|         // After stopping the server, the thread terminates the process.
 | ||||
|         else { | ||||
|             let clone = Arc::clone(&counter); | ||||
|             let clone = Arc::clone(&server); | ||||
| 
 | ||||
|             std::thread::spawn(move || { | ||||
|                 let mut server = clone.lock().unwrap(); | ||||
|                 let _ = server.stop(); | ||||
|                 let _ = clone.stop(); | ||||
|                 std::process::exit(0); | ||||
|             }); | ||||
|         } | ||||
|  |  | |||
							
								
								
									
										17
									
								
								src/stdin.rs
								
								
								
								
							
							
						
						
									
										17
									
								
								src/stdin.rs
								
								
								
								
							|  | @ -1,11 +1,8 @@ | |||
| use std::{ | ||||
|     io, | ||||
|     sync::{Arc, Mutex}, | ||||
| }; | ||||
| use std::{io, sync::Arc}; | ||||
| 
 | ||||
| use crate::server; | ||||
| 
 | ||||
| pub fn handle_stdin(counter: Arc<Mutex<server::ServerProcess>>) { | ||||
| pub fn handle_stdin(server: Arc<server::ServerProcess>) { | ||||
|     let stdin = io::stdin(); | ||||
|     let input = &mut String::new(); | ||||
| 
 | ||||
|  | @ -16,13 +13,9 @@ pub fn handle_stdin(counter: Arc<Mutex<server::ServerProcess>>) { | |||
|             continue; | ||||
|         }; | ||||
| 
 | ||||
|         { | ||||
|             let mut server = counter.lock().unwrap(); | ||||
| 
 | ||||
|             if let Err(e) = server.send_command(input) { | ||||
|                 println!("{}", e); | ||||
|             }; | ||||
|         } | ||||
|         if let Err(e) = server.send_command(input) { | ||||
|             println!("{}", e); | ||||
|         }; | ||||
| 
 | ||||
|         if input.trim() == "stop" { | ||||
|             std::process::exit(0); | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue