diff --git a/server/src/db/query/mod.rs b/server/src/db/query/mod.rs
index f0a809b..9eb7954 100644
--- a/server/src/db/query/mod.rs
+++ b/server/src/db/query/mod.rs
@@ -1,5 +1,3 @@
 pub mod distro;
 pub mod package;
 pub mod repo;
-
-type Result<T> = std::result::Result<T, sea_orm::DbErr>;
diff --git a/server/src/db/query/package.rs b/server/src/db/query/package.rs
index bfdad73..ad9d74a 100644
--- a/server/src/db/query/package.rs
+++ b/server/src/db/query/package.rs
@@ -1,8 +1,7 @@
 use crate::db::{self, *};
 
-use futures::Stream;
 use sea_orm::{sea_query::IntoCondition, *};
-use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement};
+use sea_query::{Alias, Expr, Query, SelectStatement};
 use serde::Deserialize;
 
 #[derive(Deserialize)]
diff --git a/server/src/main.rs b/server/src/main.rs
index 274d419..33865b5 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -8,7 +8,7 @@ mod repo;
 pub use config::{Config, DbConfig, FsConfig};
 pub use error::{Result, ServerError};
 
-use std::{io, path::PathBuf, sync::Arc};
+use std::{io, path::PathBuf};
 
 use axum::Router;
 use tower_http::trace::TraceLayer;
diff --git a/server/src/repo/actor.rs b/server/src/repo/actor.rs
index 04289d1..57f1b93 100644
--- a/server/src/repo/actor.rs
+++ b/server/src/repo/actor.rs
@@ -1,4 +1,4 @@
-use super::{archive, package, Handle};
+use super::{archive, package};
 use crate::db;
 
 use std::{
@@ -11,11 +11,8 @@ use std::{
 };
 
 use futures::StreamExt;
-use sea_orm::{
-    ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
-    ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
-};
-use sea_query::{Alias, Expr, Query};
+use sea_orm::{ColumnTrait, DbConn, EntityTrait, QueryFilter, QuerySelect};
+use sea_query::Expr;
 use tokio::{
     runtime,
     sync::{
diff --git a/server/src/repo/archive.rs b/server/src/repo/archive.rs
index 39b3b82..ad08a67 100644
--- a/server/src/repo/archive.rs
+++ b/server/src/repo/archive.rs
@@ -1,6 +1,6 @@
 use crate::db;
 use std::{
-    io::{self, Write},
+    io::Write,
     path::{Path, PathBuf},
 };
 
diff --git a/server/src/repo/handle.rs b/server/src/repo/handle.rs
index 9e63e81..4cec237 100644
--- a/server/src/repo/handle.rs
+++ b/server/src/repo/handle.rs
@@ -4,21 +4,15 @@ use crate::db;
 use std::{
     collections::HashMap,
     path::{Path, PathBuf},
-    sync::{
-        atomic::{AtomicU32, Ordering},
-        Arc, Mutex, RwLock,
-    },
+    sync::{atomic::Ordering, Arc},
 };
 
 use sea_orm::{
-    ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
-    ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
-};
-use sea_query::{Alias, Expr, Query};
-use tokio::{
-    runtime,
-    sync::mpsc::{unbounded_channel, UnboundedSender},
+    ActiveModelTrait, ColumnTrait, Condition, DbConn, EntityTrait, NotSet, QueryFilter,
+    QuerySelect, Set,
 };
+use sea_query::Expr;
+use tokio::runtime;
 use uuid::Uuid;
 
 #[derive(Clone)]
diff --git a/server/src/repo/manager.rs b/server/src/repo/manager.rs
deleted file mode 100644
index e4f0581..0000000
--- a/server/src/repo/manager.rs
+++ /dev/null
@@ -1,385 +0,0 @@
-use super::{archive, package};
-use crate::db::{self, query::package::delete_stale_pkgs};
-
-use std::{
-    collections::HashMap,
-    path::{Path, PathBuf},
-    sync::{
-        atomic::{AtomicU32, Ordering},
-        Arc,
-    },
-};
-
-use futures::StreamExt;
-use sea_orm::{
-    ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
-    ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
-};
-use sea_query::{Alias, Expr, Query};
-use tokio::sync::{
-    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
-    Mutex, RwLock,
-};
-use uuid::Uuid;
-
-struct PkgQueueMsg {
-    repo: i32,
-    path: PathBuf,
-}
-
-/// A single instance of this struct orchestrates everything related to managing packages files on
-/// disk for all repositories in the server
-pub struct RepoMgr {
-    repos_dir: PathBuf,
-    conn: DbConn,
-    pkg_queue: (
-        UnboundedSender<PkgQueueMsg>,
-        Mutex<UnboundedReceiver<PkgQueueMsg>>,
-    ),
-    repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
-}
-
-impl RepoMgr {
-    pub async fn new<P: AsRef<Path>>(repos_dir: P, conn: DbConn) -> crate::Result<Self> {
-        if !tokio::fs::try_exists(&repos_dir).await? {
-            tokio::fs::create_dir(&repos_dir).await?;
-        }
-
-        let (tx, rx) = unbounded_channel();
-
-        let mut repos = HashMap::new();
-        let repo_ids: Vec<i32> = db::Repo::find()
-            .select_only()
-            .column(db::repo::Column::Id)
-            .into_tuple()
-            .all(&conn)
-            .await?;
-
-        for id in repo_ids {
-            repos.insert(id, Default::default());
-        }
-
-        Ok(Self {
-            repos_dir: repos_dir.as_ref().to_path_buf(),
-            conn,
-            pkg_queue: (tx, Mutex::new(rx)),
-            repos: RwLock::new(repos),
-        })
-    }
-
-    /// Generate archive databases for all known architectures in the repository, including the
-    /// "any" architecture.
-    pub async fn sync_repo(&self, repo: i32) -> crate::Result<()> {
-        let lock = self
-            .repos
-            .read()
-            .await
-            .get(&repo)
-            .map(|(_, lock)| Arc::clone(lock));
-
-        if lock.is_none() {
-            return Ok(());
-        }
-
-        let lock = lock.unwrap();
-        let _guard = lock.lock().await;
-
-        let archs: Vec<String> = db::Package::find()
-            .filter(db::package::Column::RepoId.eq(repo))
-            .select_only()
-            .column(db::package::Column::Arch)
-            .distinct()
-            .into_tuple()
-            .all(&self.conn)
-            .await?;
-
-        for arch in archs {
-            self.generate_archives(repo, &arch).await?;
-        }
-
-        Ok(())
-    }
-
-    /// Generate the archive databases for the given repository and architecture.
-    async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
-        let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
-            self.random_file_paths();
-        let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?;
-        let ar_files = archive::RepoArchiveWriter::open(&tmp_ar_files_path).await?;
-
-        // Query all packages in the repo that have the given architecture or the "any"
-        // architecture
-        let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch)
-            .stream(&self.conn)
-            .await?;
-
-        let mut commited_ids: Vec<i32> = Vec::new();
-
-        while let Some(pkg) = pkgs.next().await.transpose()? {
-            commited_ids.push(pkg.id);
-
-            let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
-            let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
-
-            package::write_files(&self.conn, &mut files_tmp_file, &pkg).await?;
-            package::write_desc(&self.conn, &mut desc_tmp_file, &pkg).await?;
-
-            let full_name = format!("{}-{}", pkg.name, pkg.version);
-
-            ar_db
-                .add_entry(&full_name, &desc_tmp_file_path, true)
-                .await?;
-            ar_files
-                .add_entry(&full_name, &desc_tmp_file_path, true)
-                .await?;
-            ar_files
-                .add_entry(&full_name, &files_tmp_file_path, false)
-                .await?;
-        }
-
-        // Cleanup
-        ar_db.close().await?;
-        ar_files.close().await?;
-
-        let repo_dir = self.repos_dir.join(repo.to_string());
-
-        // Move the db archives to their respective places
-        tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?;
-        tokio::fs::rename(
-            tmp_ar_files_path,
-            repo_dir.join(format!("{}.files.tar.gz", arch)),
-        )
-        .await?;
-
-        // Only after we have successfully written everything to disk do we update the database.
-        // This order ensures any failure can be recovered, as the database is our single source of
-        // truth.
-        db::Package::update_many()
-            .col_expr(
-                db::package::Column::State,
-                Expr::value(db::PackageState::Committed),
-            )
-            .filter(db::package::Column::Id.is_in(commited_ids))
-            .exec(&self.conn)
-            .await?;
-
-        // If this fails there's no point in failing the function + if there were no packages in
-        // the repo, this fails anyway because the temp file doesn't exist
-        let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
-        let _ = tokio::fs::remove_file(files_tmp_file_path).await;
-
-        tracing::info!("Package archives generated for repo {} ('{}')", repo, arch);
-
-        Ok(())
-    }
-
-    /// Clean any remaining old package files from the database and file system
-    pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
-        let mut pkgs = db::query::package::stale_pkgs(&self.conn)
-            .stream(&self.conn)
-            .await?;
-
-        // Ids are monotonically increasing, so the max id suffices to know which packages to
-        // remove later
-        let mut max_id = -1;
-        let mut removed_pkgs = 0;
-
-        while let Some(pkg) = pkgs.next().await.transpose()? {
-            // Failing to remove the package file isn't the biggest problem
-            let _ = tokio::fs::remove_file(
-                self.repos_dir
-                    .join(pkg.repo_id.to_string())
-                    .join(pkg.id.to_string()),
-            )
-            .await;
-
-            if pkg.id > max_id {
-                max_id = pkg.id;
-            }
-
-            removed_pkgs += 1;
-        }
-
-        if removed_pkgs > 0 {
-            db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
-        }
-
-        tracing::info!("Removed {removed_pkgs} stale package(s)");
-
-        Ok(())
-    }
-
-    pub async fn pkg_parse_task(&self) {
-        loop {
-            // Receive the next message and immediately drop the mutex afterwards. As long as the
-            // quue is empty, this will lock the mutex. This is okay, as the mutex will be unlocked
-            // as soon as a message is received, so another worker can pick up the mutex.
-            let msg = {
-                let mut recv = self.pkg_queue.1.lock().await;
-                recv.recv().await
-            };
-
-            if let Some(msg) = msg {
-                // TODO better handle this error (retry if failure wasn't because the package is
-                // faulty)
-                let _ = self
-                    .add_pkg_from_path(msg.path, msg.repo)
-                    .await
-                    .inspect_err(|e| tracing::error!("{:?}", e));
-
-                let old = self
-                    .repos
-                    .read()
-                    .await
-                    .get(&msg.repo)
-                    .map(|n| n.0.fetch_sub(1, Ordering::SeqCst));
-
-                // Every time the queue for a repo becomes empty, we run a sync job
-                if old == Some(1) {
-                    // TODO error handling
-                    let _ = self.sync_repo(msg.repo).await;
-
-                    // TODO move this so that we only clean if entire queue is empty, not just
-                    // queue for specific repo
-                    let _ = self.remove_stale_pkgs().await;
-                }
-            }
-        }
-    }
-
-    pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
-        self.pkg_queue.0.send(PkgQueueMsg { path, repo }).unwrap();
-        self.repos.read().await.get(&repo).inspect(|n| {
-            n.0.fetch_add(1, Ordering::SeqCst);
-        });
-    }
-
-    pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
-        Ok(db::Repo::find()
-            .find_also_related(db::Distro)
-            .filter(
-                Condition::all()
-                    .add(db::repo::Column::Name.eq(repo))
-                    .add(db::distro::Column::Name.eq(distro)),
-            )
-            .one(&self.conn)
-            .await
-            .map(|res| res.map(|(repo, _)| repo.id))?)
-    }
-
-    pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
-        let mut repos = self.repos.write().await;
-
-        let distro_id: Option<i32> = db::Distro::find()
-            .filter(db::distro::Column::Name.eq(distro))
-            .select_only()
-            .column(db::distro::Column::Id)
-            .into_tuple()
-            .one(&self.conn)
-            .await?;
-
-        let distro_id = if let Some(id) = distro_id {
-            id
-        } else {
-            let new_distro = db::distro::ActiveModel {
-                id: NotSet,
-                name: Set(distro.to_string()),
-                description: NotSet,
-            };
-
-            new_distro.insert(&self.conn).await?.id
-        };
-
-        let repo_id: Option<i32> = db::Repo::find()
-            .filter(db::repo::Column::DistroId.eq(distro_id))
-            .filter(db::repo::Column::Name.eq(repo))
-            .select_only()
-            .column(db::repo::Column::Id)
-            .into_tuple()
-            .one(&self.conn)
-            .await?;
-
-        let repo_id = if let Some(id) = repo_id {
-            id
-        } else {
-            let new_repo = db::repo::ActiveModel {
-                id: NotSet,
-                distro_id: Set(distro_id),
-                name: Set(repo.to_string()),
-                description: NotSet,
-            };
-            let id = new_repo.insert(&self.conn).await?.id;
-
-            tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?;
-            repos.insert(id, Default::default());
-
-            id
-        };
-
-        Ok(repo_id)
-    }
-
-    async fn add_pkg_from_path<P: AsRef<Path>>(&self, path: P, repo: i32) -> crate::Result<()> {
-        let path_clone = path.as_ref().to_path_buf();
-        let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
-            .await
-            .unwrap()?;
-
-        // TODO prevent database from being updated but file failing to move to repo dir?
-        let pkg = db::query::package::insert(&self.conn, repo, pkg).await?;
-
-        let dest_path = self
-            .repos_dir
-            .join(repo.to_string())
-            .join(pkg.id.to_string());
-        tokio::fs::rename(path.as_ref(), dest_path).await?;
-
-        tracing::info!(
-            "Added '{}-{}-{}' to repository {}",
-            pkg.name,
-            pkg.version,
-            pkg.arch,
-            repo,
-        );
-
-        Ok(())
-    }
-
-    pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
-        self.repos.write().await.remove(&repo);
-        db::Repo::delete_by_id(repo).exec(&self.conn).await?;
-        let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await;
-
-        Ok(())
-    }
-
-    /// Remove all packages in the repository that have a given arch. This method marks all
-    /// packages with the given architecture as "pending deletion", before performing a manual sync
-    /// & removal of stale packages.
-    pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
-        db::Package::update_many()
-            .col_expr(
-                db::package::Column::State,
-                Expr::value(db::PackageState::PendingDeletion),
-            )
-            .filter(
-                Condition::all()
-                    .add(db::package::Column::RepoId.eq(repo))
-                    .add(db::package::Column::Arch.eq(arch)),
-            )
-            .exec(&self.conn)
-            .await?;
-
-        self.sync_repo(repo).await?;
-        self.remove_stale_pkgs().await?;
-
-        Ok(())
-    }
-
-    pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
-        std::array::from_fn(|_| {
-            let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
-            self.repos_dir.join(uuid.to_string())
-        })
-    }
-}
diff --git a/server/src/repo/package.rs b/server/src/repo/package.rs
index df98559..70466ba 100644
--- a/server/src/repo/package.rs
+++ b/server/src/repo/package.rs
@@ -1,19 +1,17 @@
-use crate::db::{self, entities::package, PackageRelatedEnum};
+use crate::db::entities::package;
 
 use std::{
     fmt, fs,
-    io::{self, BufRead, BufReader, BufWriter, Read, Write},
+    io::{self, BufRead, BufReader, Read},
     path::{Path, PathBuf},
 };
 
 use chrono::NaiveDateTime;
-use futures::StreamExt;
 use libarchive::{
     read::{Archive, Builder},
     Entry, ReadFilter,
 };
-use sea_orm::{ActiveValue::Set, ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
-use tokio::io::{AsyncWrite, AsyncWriteExt};
+use sea_orm::ActiveValue::Set;
 
 const IGNORED_FILES: [&str; 5] = [".BUILDINFO", ".INSTALL", ".MTREE", ".PKGINFO", ".CHANGELOG"];
 
@@ -204,74 +202,6 @@ impl Package {
             self.compression.extension().unwrap()
         )
     }
-
-    /// Write the formatted desc file to the provided writer
-    pub fn write_desc<W: Write>(&self, w: &mut W) -> io::Result<()> {
-        // We write a lot of small strings to the writer, so wrapping it in a BufWriter is
-        // beneficial
-        let mut w = BufWriter::new(w);
-
-        let info = &self.info;
-
-        writeln!(w, "%FILENAME%\n{}", self.file_name())?;
-
-        let mut write = |key: &str, value: &str| {
-            if !value.is_empty() {
-                writeln!(w, "\n%{}%\n{}", key, value)
-            } else {
-                Ok(())
-            }
-        };
-
-        write("NAME", &info.name)?;
-        write("BASE", &info.base)?;
-        write("VERSION", &info.version)?;
-
-        if let Some(ref description) = info.description {
-            write("DESC", description)?;
-        }
-        write("GROUPS", &info.groups.join("\n"))?;
-        write("CSIZE", &info.csize.to_string())?;
-        write("ISIZE", &info.size.to_string())?;
-
-        write("SHA256SUM", &info.sha256sum)?;
-
-        if let Some(ref url) = info.url {
-            write("URL", url)?;
-        }
-
-        write("LICENSE", &info.licenses.join("\n"))?;
-        write("ARCH", &info.arch)?;
-        write("BUILDDATE", &info.build_date.timestamp().to_string())?;
-
-        if let Some(ref packager) = info.packager {
-            write("PACKAGER", packager)?;
-        }
-
-        write("REPLACES", &info.replaces.join("\n"))?;
-        write("CONFLICTS", &info.conflicts.join("\n"))?;
-        write("PROVIDES", &info.provides.join("\n"))?;
-        write("DEPENDS", &info.depends.join("\n"))?;
-        write("OPTDEPENDS", &info.optdepends.join("\n"))?;
-        write("MAKEDEPENDS", &info.makedepends.join("\n"))?;
-        write("CHECKDEPENDS", &info.checkdepends.join("\n"))?;
-
-        Ok(())
-    }
-
-    pub fn write_files<W: Write>(&self, w: &mut W) -> io::Result<()> {
-        // We write a lot of small strings to the writer, so wrapping it in a BufWriter is
-        // beneficial
-        let mut w = BufWriter::new(w);
-
-        writeln!(w, "%FILES%")?;
-
-        for file in &self.files {
-            writeln!(w, "{}", file.to_string_lossy())?;
-        }
-
-        Ok(())
-    }
 }
 
 impl From<Package> for package::ActiveModel {
@@ -303,123 +233,3 @@ pub fn filename(pkg: &package::Model) -> String {
         pkg.name, pkg.version, pkg.arch, pkg.compression
     )
 }
-
-async fn write_attribute<W: AsyncWrite + Unpin>(
-    writer: &mut W,
-    key: &str,
-    value: &str,
-) -> io::Result<()> {
-    if !value.is_empty() {
-        let s = format!("\n%{}%\n{}\n", key, value);
-        writer.write_all(s.as_bytes()).await?;
-    }
-
-    Ok(())
-}
-
-pub async fn write_desc<W: AsyncWrite + Unpin>(
-    conn: &DbConn,
-    writer: &mut W,
-    pkg: &package::Model,
-) -> crate::Result<()> {
-    writer
-        .write_all(format!("%FILENAME%\n{}\n", pkg.id).as_bytes())
-        .await?;
-
-    write_attribute(writer, "NAME", &pkg.name).await?;
-    write_attribute(writer, "BASE", &pkg.base).await?;
-    write_attribute(writer, "VERSION", &pkg.version).await?;
-
-    if let Some(ref description) = pkg.description {
-        write_attribute(writer, "DESC", description).await?;
-    }
-
-    let groups: Vec<String> = pkg
-        .find_related(db::PackageGroup)
-        .select_only()
-        .column(db::package_group::Column::Name)
-        .into_tuple()
-        .all(conn)
-        .await?;
-    write_attribute(writer, "GROUPS", &groups.join("\n")).await?;
-
-    write_attribute(writer, "CSIZE", &pkg.c_size.to_string()).await?;
-    write_attribute(writer, "ISIZE", &pkg.size.to_string()).await?;
-    write_attribute(writer, "SHA256SUM", &pkg.sha256_sum).await?;
-
-    if let Some(ref url) = pkg.url {
-        write_attribute(writer, "URL", url).await?;
-    }
-
-    let licenses: Vec<String> = pkg
-        .find_related(db::PackageLicense)
-        .select_only()
-        .column(db::package_license::Column::Name)
-        .into_tuple()
-        .all(conn)
-        .await?;
-    write_attribute(writer, "LICENSE", &licenses.join("\n")).await?;
-
-    write_attribute(writer, "ARCH", &pkg.arch).await?;
-
-    // TODO build date
-    write_attribute(
-        writer,
-        "BUILDDATE",
-        &pkg.build_date.and_utc().timestamp().to_string(),
-    )
-    .await?;
-
-    if let Some(ref packager) = pkg.packager {
-        write_attribute(writer, "PACKAGER", packager).await?;
-    }
-
-    let related = [
-        ("REPLACES", PackageRelatedEnum::Replaces),
-        ("CONFLICTS", PackageRelatedEnum::Conflicts),
-        ("PROVIDES", PackageRelatedEnum::Provides),
-        ("DEPENDS", PackageRelatedEnum::Depend),
-        ("OPTDEPENDS", PackageRelatedEnum::Optdepend),
-        ("MAKEDEPENDS", PackageRelatedEnum::Makedepend),
-        ("CHECKDEPENDS", PackageRelatedEnum::Checkdepend),
-    ];
-
-    for (key, attr) in related.into_iter() {
-        let items: Vec<String> = pkg
-            .find_related(db::PackageRelated)
-            .filter(db::package_related::Column::Type.eq(attr))
-            .select_only()
-            .column(db::package_related::Column::Name)
-            .into_tuple()
-            .all(conn)
-            .await?;
-
-        write_attribute(writer, key, &items.join("\n")).await?;
-    }
-
-    writer.flush().await?;
-
-    Ok(())
-}
-
-pub async fn write_files<W: AsyncWrite + Unpin>(
-    conn: &DbConn,
-    writer: &mut W,
-    pkg: &package::Model,
-) -> crate::Result<()> {
-    let line = "%FILES%\n";
-    writer.write_all(line.as_bytes()).await?;
-
-    // Generate the files list for the package
-    let mut files = pkg.find_related(db::PackageFile).stream(conn).await?;
-
-    while let Some(file) = files.next().await.transpose()? {
-        writer
-            .write_all(format!("{}\n", file.path).as_bytes())
-            .await?;
-    }
-
-    writer.flush().await?;
-
-    Ok(())
-}
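
Note on the removed writers: the deleted `write_attribute`/`write_desc` helpers in
server/src/repo/package.rs emit entries in the pacman sync-database "desc" format, where
each non-empty field is written as a %KEY% header followed by its value. A minimal,
self-contained sketch of that formatting rule (synchronous, with made-up package values;
not code from this repository):

    use std::io::{self, Write};

    // Mirrors the deleted `write_attribute`: emit a field only when its value is
    // non-empty, formatted as "\n%KEY%\nvalue\n".
    fn write_attribute<W: Write>(writer: &mut W, key: &str, value: &str) -> io::Result<()> {
        if !value.is_empty() {
            write!(writer, "\n%{}%\n{}\n", key, value)?;
        }
        Ok(())
    }

    fn main() -> io::Result<()> {
        let mut out = Vec::new();

        // Each entry starts with a %FILENAME% block; the deleted async code used the
        // package's database id as the file name.
        write!(out, "%FILENAME%\n{}\n", 42)?;
        write_attribute(&mut out, "NAME", "example-pkg")?;
        write_attribute(&mut out, "VERSION", "1.0.0-1")?;
        write_attribute(&mut out, "GROUPS", "")?; // empty values are skipped entirely

        print!("{}", String::from_utf8(out).expect("valid UTF-8"));
        Ok(())
    }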