Compare commits

...

15 Commits

19 changed files with 913 additions and 422 deletions

138
Cargo.lock generated
View File

@ -174,6 +174,15 @@ dependencies = [
"num-traits", "num-traits",
] ]
[[package]]
name = "atomic"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994"
dependencies = [
"bytemuck",
]
[[package]] [[package]]
name = "atomic-waker" name = "atomic-waker"
version = "1.1.2" version = "1.1.2"
@ -380,6 +389,12 @@ dependencies = [
"syn 1.0.109", "syn 1.0.109",
] ]
[[package]]
name = "bytemuck"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78834c15cb5d5efe3452d58b1e8ba890dd62d21907f867f383358198e56ebca5"
[[package]] [[package]]
name = "byteorder" name = "byteorder"
version = "1.5.0" version = "1.5.0"
@ -630,6 +645,20 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
[[package]]
name = "figment"
version = "0.10.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cb01cd46b0cf372153850f4c6c272d9cbea2da513e07538405148f95bd789f3"
dependencies = [
"atomic",
"pear",
"serde",
"toml",
"uncased",
"version_check",
]
[[package]] [[package]]
name = "flume" name = "flume"
version = "0.11.0" version = "0.11.0"
@ -1037,6 +1066,12 @@ dependencies = [
"syn 2.0.66", "syn 2.0.66",
] ]
[[package]]
name = "inlinable_string"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb"
[[package]] [[package]]
name = "is_terminal_polyfill" name = "is_terminal_polyfill"
version = "1.70.0" version = "1.70.0"
@ -1386,6 +1421,29 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
[[package]]
name = "pear"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdeeaa00ce488657faba8ebf44ab9361f9365a97bd39ffb8a60663f57ff4b467"
dependencies = [
"inlinable_string",
"pear_codegen",
"yansi",
]
[[package]]
name = "pear_codegen"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bab5b985dc082b345f812b7df84e1bef27e7207b39e448439ba8bd69c93f147"
dependencies = [
"proc-macro2",
"proc-macro2-diagnostics",
"quote",
"syn 2.0.66",
]
[[package]] [[package]]
name = "pem-rfc7468" name = "pem-rfc7468"
version = "0.7.0" version = "0.7.0"
@ -1478,7 +1536,7 @@ version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284"
dependencies = [ dependencies = [
"toml_edit", "toml_edit 0.21.1",
] ]
[[package]] [[package]]
@ -1514,6 +1572,19 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "proc-macro2-diagnostics"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
"version_check",
"yansi",
]
[[package]] [[package]]
name = "ptr_meta" name = "ptr_meta"
version = "0.1.4" version = "0.1.4"
@ -1657,11 +1728,13 @@ dependencies = [
"axum", "axum",
"chrono", "chrono",
"clap", "clap",
"figment",
"futures", "futures",
"http-body-util", "http-body-util",
"libarchive", "libarchive",
"sea-orm", "sea-orm",
"sea-orm-migration", "sea-orm-migration",
"sea-query",
"serde", "serde",
"sha256", "sha256",
"tokio", "tokio",
@ -2036,6 +2109,15 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "serde_spanned"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79e674e01f999af37c49f70a6ede167a8a60b2503e56c5599532a65baa5969a0"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "serde_urlencoded" name = "serde_urlencoded"
version = "0.7.1" version = "0.7.1"
@ -2623,11 +2705,26 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "toml"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f49eb2ab21d2f26bd6db7bf383edc527a7ebaee412d17af4d40fdccd442f335"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit 0.22.14",
]
[[package]] [[package]]
name = "toml_datetime" name = "toml_datetime"
version = "0.6.6" version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf" checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "toml_edit" name = "toml_edit"
@ -2637,7 +2734,20 @@ checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1"
dependencies = [ dependencies = [
"indexmap", "indexmap",
"toml_datetime", "toml_datetime",
"winnow", "winnow 0.5.40",
]
[[package]]
name = "toml_edit"
version = "0.22.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38"
dependencies = [
"indexmap",
"serde",
"serde_spanned",
"toml_datetime",
"winnow 0.6.13",
] ]
[[package]] [[package]]
@ -2762,6 +2872,15 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "uncased"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697"
dependencies = [
"version_check",
]
[[package]] [[package]]
name = "unicase" name = "unicase"
version = "2.7.0" version = "2.7.0"
@ -3128,6 +3247,15 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "winnow"
version = "0.6.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59b5e5f6c299a3c7890b876a2a587f3115162487e704907d9b6cd29473052ba1"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "wyz" name = "wyz"
version = "0.5.1" version = "0.5.1"
@ -3137,6 +3265,12 @@ dependencies = [
"tap", "tap",
] ]
[[package]]
name = "yansi"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
[[package]] [[package]]
name = "zerocopy" name = "zerocopy"
version = "0.7.34" version = "0.7.34"

View File

@ -10,10 +10,12 @@ authors = ["Jef Roosens"]
axum = { version = "0.7.5", features = ["http2", "macros"] } axum = { version = "0.7.5", features = ["http2", "macros"] }
chrono = { version = "0.4.26", features = ["serde"] } chrono = { version = "0.4.26", features = ["serde"] }
clap = { version = "4.3.12", features = ["env", "derive"] } clap = { version = "4.3.12", features = ["env", "derive"] }
figment = { version = "0.10.19", features = ["env", "toml"] }
futures = "0.3.28" futures = "0.3.28"
http-body-util = "0.1.1" http-body-util = "0.1.1"
libarchive = { path = "../libarchive" } libarchive = { path = "../libarchive" }
sea-orm-migration = "0.12.1" sea-orm-migration = "0.12.1"
sea-query = { version = "0.30.7", features = ["backend-postgres", "backend-sqlite"] }
serde = { version = "1.0.178", features = ["derive"] } serde = { version = "1.0.178", features = ["derive"] }
sha256 = "1.1.4" sha256 = "1.1.4"
tokio = { version = "1.29.1", features = ["full"] } tokio = { version = "1.29.1", features = ["full"] }

View File

@ -0,0 +1,17 @@
api_key = "test"
pkg_workers = 2
log_level = "rieterd=debug"
[fs]
type = "local"
data_dir = "./data"
[db]
type = "sqlite"
db_dir = "./data"
# [db]
# type = "postgres"
# host = "localhost"
# db = "rieter"
# user = "rieter"
# password = "rieter"

View File

@ -22,10 +22,10 @@ async fn get_repos(
Query(pagination): Query<pagination::Query>, Query(pagination): Query<pagination::Query>,
Query(filter): Query<db::query::repo::Filter>, Query(filter): Query<db::query::repo::Filter>,
) -> crate::Result<Json<PaginatedResponse<db::repo::Model>>> { ) -> crate::Result<Json<PaginatedResponse<db::repo::Model>>> {
let (total_pages, items) = let items =
db::query::repo::page(&global.db, pagination.per_page, pagination.page - 1, filter).await?; db::query::repo::page(&global.db, pagination.per_page, pagination.page - 1, filter).await?;
Ok(Json(pagination.res(total_pages, items))) Ok(Json(pagination.res(items)))
} }
async fn get_single_repo( async fn get_single_repo(
@ -44,11 +44,11 @@ async fn get_packages(
Query(pagination): Query<pagination::Query>, Query(pagination): Query<pagination::Query>,
Query(filter): Query<db::query::package::Filter>, Query(filter): Query<db::query::package::Filter>,
) -> crate::Result<Json<PaginatedResponse<db::package::Model>>> { ) -> crate::Result<Json<PaginatedResponse<db::package::Model>>> {
let (total_pages, pkgs) = let items =
db::query::package::page(&global.db, pagination.per_page, pagination.page - 1, filter) db::query::package::page(&global.db, pagination.per_page, pagination.page - 1, filter)
.await?; .await?;
Ok(Json(pagination.res(total_pages, pkgs))) Ok(Json(pagination.res(items)))
} }
async fn get_single_package( async fn get_single_package(

View File

@ -1,19 +1,19 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Deserialize)] #[derive(Deserialize)]
#[serde(default)]
pub struct Query { pub struct Query {
#[serde(default = "default_page")]
pub page: u64, pub page: u64,
#[serde(default = "default_per_page")]
pub per_page: u64, pub per_page: u64,
} }
impl Default for Query { fn default_page() -> u64 {
fn default() -> Self { 1
Query { }
page: 1,
per_page: 25, fn default_per_page() -> u64 {
} 25
}
} }
#[derive(Serialize)] #[derive(Serialize)]
@ -23,21 +23,15 @@ where
{ {
pub page: u64, pub page: u64,
pub per_page: u64, pub per_page: u64,
pub total_pages: u64,
pub count: usize, pub count: usize,
pub items: Vec<T>, pub items: Vec<T>,
} }
impl Query { impl Query {
pub fn res<T: for<'de> Serialize>( pub fn res<T: for<'de> Serialize>(self, items: Vec<T>) -> PaginatedResponse<T> {
self,
total_pages: u64,
items: Vec<T>,
) -> PaginatedResponse<T> {
PaginatedResponse { PaginatedResponse {
page: self.page, page: self.page,
per_page: self.per_page, per_page: self.per_page,
total_pages,
count: items.len(), count: items.len(),
items, items,
} }

View File

@ -1,92 +1,71 @@
use crate::{distro::MetaDistroMgr, Config, Global}; use crate::{Config, FsConfig, Global};
use std::{io, path::PathBuf, sync::Arc};
use axum::Router; use axum::Router;
use clap::Parser; use clap::Parser;
use sea_orm_migration::MigratorTrait; use sea_orm_migration::MigratorTrait;
use std::{io, path::PathBuf};
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
use tracing::debug;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Parser)] #[derive(Parser)]
#[command(author, version, about, long_about = None)] #[command(author, version, about, long_about = None)]
pub struct Cli { pub struct Cli {
/// Directory where repository metadata & SQLite database is stored
#[arg(env = "RIETER_DATA_DIR")]
pub data_dir: PathBuf,
/// API key to authenticate private routes with
#[arg(env = "RIETER_API_KEY")]
pub api_key: String,
/// Database connection URL; either sqlite:// or postgres://. Defaults to rieter.sqlite in the
/// data directory
#[arg(short, long, env = "RIETER_DATABASE_URL")]
pub database_url: Option<String>,
/// Port the server will listen on
#[arg( #[arg(
short, short,
long, long,
value_name = "PORT", env = "RIETER_CONFIG_FILE",
default_value_t = 8000, default_value = "./rieterd.toml"
env = "RIETER_PORT"
)] )]
pub port: u16, pub config_file: PathBuf,
/// Log levels for the tracing
#[arg(
long,
value_name = "LOG_LEVEL",
default_value = "tower_http=debug,rieterd=debug,sea_orm=debug",
env = "RIETER_LOG"
)]
pub log: String,
} }
impl Cli { impl Cli {
pub fn init_tracing(&self) { pub async fn run(&self) -> crate::Result<()> {
let config: Config = Config::figment(&self.config_file)
.extract()
.inspect_err(|e| tracing::error!("{}", e))?;
tracing_subscriber::registry() tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(self.log.clone())) .with(tracing_subscriber::EnvFilter::new(config.log_level.clone()))
.with(tracing_subscriber::fmt::layer()) .with(tracing_subscriber::fmt::layer())
.init(); .init();
}
pub async fn run(&self) -> crate::Result<()> { tracing::info!("Connecting to database");
self.init_tracing(); let db = crate::db::connect(&config.db).await?;
let db_url = if let Some(url) = &self.database_url {
url.clone()
} else {
format!(
"sqlite://{}?mode=rwc",
self.data_dir.join("rieter.sqlite").to_string_lossy()
)
};
debug!("Connecting to database with URL {}", db_url);
let mut options = sea_orm::ConnectOptions::new(db_url);
options.max_connections(16);
let db = sea_orm::Database::connect(options).await?;
crate::db::Migrator::up(&db, None).await?; crate::db::Migrator::up(&db, None).await?;
debug!("Successfully applied migrations"); let mgr = match &config.fs {
FsConfig::Local { data_dir } => {
let config = Config { crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await?
data_dir: self.data_dir.clone(), }
}; };
let mgr = MetaDistroMgr::new(&self.data_dir.join("distros"), db.clone()).await?; let mgr = Arc::new(mgr);
let global = Global { config, mgr, db }; for _ in 0..config.pkg_workers {
let clone = Arc::clone(&mgr);
tokio::spawn(async move { clone.pkg_parse_task().await });
}
let global = Global {
config: config.clone(),
mgr,
db,
};
// build our application with a single route // build our application with a single route
let app = Router::new() let app = Router::new()
.nest("/api", crate::api::router()) .nest("/api", crate::api::router())
.merge(crate::repo::router(&self.api_key)) .merge(crate::repo::router(&config.api_key))
.with_state(global) .with_state(global)
.layer(TraceLayer::new_for_http()); .layer(TraceLayer::new_for_http());
let domain: String = format!("0.0.0.0:{}", self.port).parse().unwrap(); let domain: String = format!("{}:{}", config.domain, config.port)
.parse()
.unwrap();
let listener = tokio::net::TcpListener::bind(domain).await?; let listener = tokio::net::TcpListener::bind(domain).await?;
// run it with hyper on localhost:3000 // run it with hyper on localhost:3000
Ok(axum::serve(listener, app.into_make_service()) Ok(axum::serve(listener, app.into_make_service())

View File

@ -0,0 +1,88 @@
use std::path::{Path, PathBuf};
use figment::{
providers::{Env, Format, Toml},
Figment,
};
use serde::Deserialize;
/// Storage backend selection for the `[fs]` section of the configuration.
///
/// The variant is picked by the section's `type` key, matched against the
/// lowercased variant name (e.g. `type = "local"`).
#[derive(Deserialize, Debug, Clone)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum FsConfig {
    /// Store data below `data_dir`.
    Local { data_dir: PathBuf },
}
/// Database backend selection for the `[db]` section of the configuration.
///
/// The variant is picked by the section's `type` key, matched against the
/// lowercased variant name (`"sqlite"` or `"postgres"`).
#[derive(Deserialize, Debug, Clone)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum DbConfig {
    /// SQLite database stored inside `db_dir`.
    Sqlite {
        /// Directory that holds the database file.
        db_dir: PathBuf,
        /// Connection pool size; falls back to
        /// `default_db_sqlite_max_connections` when omitted.
        #[serde(default = "default_db_sqlite_max_connections")]
        max_connections: u32,
    },
    /// PostgreSQL server reached over the network.
    Postgres {
        host: String,
        /// Falls back to `default_db_postgres_port` when omitted.
        #[serde(default = "default_db_postgres_port")]
        port: u16,
        user: String,
        password: String,
        /// Name of the database to connect to.
        db: String,
        /// Optional schema; an empty string (the default) means "not set".
        #[serde(default)]
        schema: String,
        /// Connection pool size; falls back to
        /// `default_db_postgres_max_connections` when omitted.
        #[serde(default = "default_db_postgres_max_connections")]
        max_connections: u32,
    },
}
/// Top-level server configuration, extracted from the figment built by
/// `Config::figment`.
///
/// Fields carrying `#[serde(default = "...")]` are optional in the
/// configuration sources; the remaining fields must be provided.
#[derive(Deserialize, Debug, Clone)]
pub struct Config {
/// API key handed to the repository router.
pub api_key: String,
/// Host part of the address the HTTP listener binds to.
#[serde(default = "default_domain")]
pub domain: String,
/// Port part of the address the HTTP listener binds to.
#[serde(default = "default_port")]
pub port: u16,
/// Filter directives passed to the tracing `EnvFilter`.
#[serde(default = "default_log_level")]
pub log_level: String,
/// Storage backend settings (`[fs]` section).
pub fs: FsConfig,
/// Database backend settings (`[db]` section).
pub db: DbConfig,
/// Number of package parsing tasks spawned at startup.
#[serde(default = "default_pkg_workers")]
pub pkg_workers: u32,
}
impl Config {
    /// Builds the layered configuration source for the server.
    ///
    /// Two providers are merged in order: first the TOML file at
    /// `config_file`, then environment variables carrying the `RIETER_`
    /// prefix. The caller is expected to call `extract()` on the result.
    pub fn figment(config_file: impl AsRef<Path>) -> Figment {
        let from_file = Toml::file(config_file);
        let from_env = Env::prefixed("RIETER_");

        Figment::new().merge(from_file).merge(from_env)
    }
}
/// Fallback for `Config::domain`: the wildcard IPv4 address.
fn default_domain() -> String {
    "0.0.0.0".to_owned()
}
/// Fallback for `Config::port`.
fn default_port() -> u16 {
    const PORT: u16 = 8000;
    PORT
}
/// Fallback for `Config::log_level`: debug output for `tower_http`,
/// `rieterd` and `sea_orm`.
fn default_log_level() -> String {
    "tower_http=debug,rieterd=debug,sea_orm=debug".to_owned()
}
/// Fallback for `DbConfig::Sqlite::max_connections`.
fn default_db_sqlite_max_connections() -> u32 {
    const MAX_CONNECTIONS: u32 = 16;
    MAX_CONNECTIONS
}
/// Fallback for `DbConfig::Postgres::port`.
fn default_db_postgres_port() -> u16 {
    const PORT: u16 = 5432;
    PORT
}
/// Fallback for `DbConfig::Postgres::max_connections`.
fn default_db_postgres_max_connections() -> u32 {
    const MAX_CONNECTIONS: u32 = 16;
    MAX_CONNECTIONS
}
/// Fallback for `Config::pkg_workers`: a single worker.
fn default_pkg_workers() -> u32 {
    const WORKERS: u32 = 1;
    WORKERS
}

View File

@ -4,6 +4,8 @@ use chrono::NaiveDateTime;
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::db::PackageState;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "package")] #[sea_orm(table_name = "package")]
pub struct Model { pub struct Model {
@ -24,6 +26,8 @@ pub struct Model {
pub pgp_sig_size: Option<i64>, pub pgp_sig_size: Option<i64>,
pub sha256_sum: String, pub sha256_sum: String,
pub compression: String, pub compression: String,
#[serde(skip_serializing)]
pub state: PackageState,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -81,7 +81,12 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Package::PgpSig).string_len(255)) .col(ColumnDef::new(Package::PgpSig).string_len(255))
.col(ColumnDef::new(Package::PgpSigSize).big_integer()) .col(ColumnDef::new(Package::PgpSigSize).big_integer())
.col(ColumnDef::new(Package::Sha256Sum).char_len(64).not_null()) .col(ColumnDef::new(Package::Sha256Sum).char_len(64).not_null())
.col(ColumnDef::new(Package::Compression).string_len(16).not_null()) .col(
ColumnDef::new(Package::Compression)
.string_len(16)
.not_null(),
)
.col(ColumnDef::new(Package::State).integer().not_null())
.foreign_key( .foreign_key(
ForeignKey::create() ForeignKey::create()
.name("fk-package-repo_id") .name("fk-package-repo_id")
@ -264,6 +269,7 @@ pub enum Package {
PgpSigSize, PgpSigSize,
Sha256Sum, Sha256Sum,
Compression, Compression,
State,
} }
#[derive(Iden)] #[derive(Iden)]

View File

@ -2,10 +2,12 @@ pub mod entities;
mod migrator; mod migrator;
pub mod query; pub mod query;
use crate::config::DbConfig;
pub use entities::{prelude::*, *}; pub use entities::{prelude::*, *};
pub use migrator::Migrator; pub use migrator::Migrator;
use sea_orm::{DeriveActiveEnum, EnumIter}; use sea_orm::{ConnectionTrait, Database, DbConn, DeriveActiveEnum, EnumIter};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
type Result<T> = std::result::Result<T, sea_orm::DbErr>; type Result<T> = std::result::Result<T, sea_orm::DbErr>;
@ -30,6 +32,17 @@ pub enum PackageRelatedEnum {
Optdepend, Optdepend,
} }
/// Lifecycle state of a row in the `package` table.
///
/// Persisted as an integer column; the numeric value of each variant is fixed
/// by its `num_value` attribute.
#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)]
#[sea_orm(rs_type = "i32", db_type = "Integer")]
pub enum PackageState {
/// State given to a package row when it is first inserted.
#[sea_orm(num_value = 0)]
PendingCommit,
/// State matched by the "committed only" variant of the newest-package
/// queries.
#[sea_orm(num_value = 1)]
Committed,
/// Package is marked for removal; such rows are excluded from syncs and
/// picked up by the stale-package queries.
#[sea_orm(num_value = 2)]
PendingDeletion,
}
#[derive(Serialize)] #[derive(Serialize)]
pub struct FullPackage { pub struct FullPackage {
#[serde(flatten)] #[serde(flatten)]
@ -39,3 +52,50 @@ pub struct FullPackage {
related: Vec<(PackageRelatedEnum, String)>, related: Vec<(PackageRelatedEnum, String)>,
files: Vec<String>, files: Vec<String>,
} }
/// Opens a database connection pool for the backend described by `conn`.
///
/// * `DbConfig::Sqlite` connects to `rieter.sqlite` inside `db_dir`
///   (`mode=rwc` in the URL) and then switches the database to WAL
///   journaling with `synchronous=NORMAL`.
/// * `DbConfig::Postgres` connects using a URL assembled from the individual
///   fields; a non-empty `schema` is appended as the `currentSchema` query
///   parameter.
///
/// # Errors
///
/// Returns an error when the connection cannot be established or when one of
/// the SQLite pragmas fails to execute.
pub async fn connect(conn: &DbConfig) -> crate::Result<DbConn> {
    match conn {
        DbConfig::Sqlite {
            db_dir,
            max_connections,
        } => {
            let url = format!(
                "sqlite://{}?mode=rwc",
                db_dir.join("rieter.sqlite").to_string_lossy()
            );
            // The builder methods return `&mut Self`; configuring a mutable
            // binding avoids cloning the options just to obtain an owned value.
            let mut options = sea_orm::ConnectOptions::new(url);
            options.max_connections(*max_connections);

            let db = Database::connect(options).await?;

            // synchronous=NORMAL still ensures database consistency with WAL mode, as per the docs
            // https://www.sqlite.org/pragma.html#pragma_synchronous
            //
            // NOTE(review): `synchronous` is a per-connection setting in SQLite, so this
            // statement presumably only affects the pooled connection that happens to run
            // it - confirm whether the other pool connections need it as well.
            db.execute_unprepared("PRAGMA journal_mode=WAL;").await?;
            db.execute_unprepared("PRAGMA synchronous=NORMAL;").await?;

            Ok(db)
        }
        DbConfig::Postgres {
            host,
            port,
            db,
            user,
            password,
            schema,
            max_connections,
        } => {
            // NOTE(review): the credentials are interpolated verbatim, so characters with a
            // special meaning in URLs (`@`, `:`, `/`, ...) have to be percent-encoded in the
            // configuration.
            let mut url = format!("postgres://{user}:{password}@{host}:{port}/{db}");

            if !schema.is_empty() {
                url.push_str("?currentSchema=");
                url.push_str(schema);
            }

            let mut options = sea_orm::ConnectOptions::new(url);
            options.max_connections(*max_connections);

            Ok(Database::connect(options).await?)
        }
    }
}

View File

@ -21,15 +21,14 @@ pub async fn page(
per_page: u64, per_page: u64,
page: u64, page: u64,
filter: Filter, filter: Filter,
) -> Result<(u64, Vec<distro::Model>)> { ) -> Result<Vec<distro::Model>> {
let paginator = Distro::find() let paginator = Distro::find()
.filter(filter) .filter(filter)
.order_by_asc(distro::Column::Id) .order_by_asc(distro::Column::Id)
.paginate(conn, per_page); .paginate(conn, per_page);
let repos = paginator.fetch_page(page).await?; let repos = paginator.fetch_page(page).await?;
let total_pages = paginator.num_pages().await?;
Ok((total_pages, repos)) Ok(repos)
} }
pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<distro::Model>> { pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<distro::Model>> {

View File

@ -1,6 +1,8 @@
use crate::db::{self, *}; use crate::db::{self, *};
use futures::Stream;
use sea_orm::{sea_query::IntoCondition, *}; use sea_orm::{sea_query::IntoCondition, *};
use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement};
use serde::Deserialize; use serde::Deserialize;
#[derive(Deserialize)] #[derive(Deserialize)]
@ -15,10 +17,7 @@ impl IntoCondition for Filter {
Condition::all() Condition::all()
.add_option(self.repo.map(|repo| package::Column::RepoId.eq(repo))) .add_option(self.repo.map(|repo| package::Column::RepoId.eq(repo)))
.add_option(self.arch.map(|arch| package::Column::Arch.eq(arch))) .add_option(self.arch.map(|arch| package::Column::Arch.eq(arch)))
.add_option( .add_option(self.name.map(|name| package::Column::Name.contains(name)))
self.name
.map(|name| package::Column::Name.like(format!("%{}%", name))),
)
} }
} }
@ -27,15 +26,29 @@ pub async fn page(
per_page: u64, per_page: u64,
page: u64, page: u64,
filter: Filter, filter: Filter,
) -> super::Result<(u64, Vec<package::Model>)> { ) -> crate::Result<Vec<package::Model>> {
let paginator = Package::find() let p2 = Alias::new("p2");
.filter(filter) let query = Query::select()
.order_by_asc(package::Column::Id) .columns(db::package::Column::iter().map(|c| (db::package::Entity, c)))
.paginate(conn, per_page); .from(db::package::Entity)
let packages = paginator.fetch_page(page).await?; .join_subquery(
let total_pages = paginator.num_pages().await?; JoinType::InnerJoin,
max_pkg_ids_query(true),
p2.clone(),
Expr::col((db::package::Entity, db::package::Column::Id))
.eq(Expr::col((p2.clone(), Alias::new("max_id")))),
)
.cond_where(filter)
.order_by((db::package::Entity, db::package::Column::Id), Order::Asc)
.to_owned();
let builder = conn.get_database_backend();
let sql = builder.build(&query);
Ok((total_pages, packages)) Ok(db::Package::find()
.from_raw_sql(sql)
.paginate(conn, per_page)
.fetch_page(page)
.await?)
} }
pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<package::Model>> { pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<package::Model>> {
@ -68,9 +81,17 @@ pub async fn delete_with_arch(conn: &DbConn, repo_id: i32, arch: &str) -> Result
.await .await
} }
pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Package) -> Result<()> { pub async fn insert(
conn: &DbConn,
repo_id: i32,
pkg: crate::repo::package::Package,
) -> Result<package::Model> {
let info = pkg.info; let info = pkg.info;
// Doing this manually is not the recommended way, but the generic error type of the
// transaction function didn't play well with my current error handling
let txn = conn.begin().await?;
let model = package::ActiveModel { let model = package::ActiveModel {
id: NotSet, id: NotSet,
repo_id: Set(repo_id), repo_id: Set(repo_id),
@ -88,9 +109,10 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
pgp_sig_size: Set(info.pgpsigsize), pgp_sig_size: Set(info.pgpsigsize),
sha256_sum: Set(info.sha256sum), sha256_sum: Set(info.sha256sum),
compression: Set(pkg.compression.extension().unwrap().to_string()), compression: Set(pkg.compression.extension().unwrap().to_string()),
state: Set(PackageState::PendingCommit),
}; };
let pkg_entry = model.insert(conn).await?; let pkg_entry = model.insert(&txn).await?;
// Insert all the related tables // Insert all the related tables
PackageLicense::insert_many(info.licenses.iter().map(|s| package_license::ActiveModel { PackageLicense::insert_many(info.licenses.iter().map(|s| package_license::ActiveModel {
@ -98,7 +120,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
name: Set(s.to_string()), name: Set(s.to_string()),
})) }))
.on_empty_do_nothing() .on_empty_do_nothing()
.exec(conn) .exec(&txn)
.await?; .await?;
PackageGroup::insert_many(info.groups.iter().map(|s| package_group::ActiveModel { PackageGroup::insert_many(info.groups.iter().map(|s| package_group::ActiveModel {
@ -106,7 +128,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
name: Set(s.to_string()), name: Set(s.to_string()),
})) }))
.on_empty_do_nothing() .on_empty_do_nothing()
.exec(conn) .exec(&txn)
.await?; .await?;
let related = info let related = info
@ -146,7 +168,7 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
name: Set(s.to_string()), name: Set(s.to_string()),
})) }))
.on_empty_do_nothing() .on_empty_do_nothing()
.exec(conn) .exec(&txn)
.await?; .await?;
PackageFile::insert_many(pkg.files.iter().map(|s| package_file::ActiveModel { PackageFile::insert_many(pkg.files.iter().map(|s| package_file::ActiveModel {
@ -154,10 +176,12 @@ pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Pack
path: Set(s.display().to_string()), path: Set(s.display().to_string()),
})) }))
.on_empty_do_nothing() .on_empty_do_nothing()
.exec(conn) .exec(&txn)
.await?; .await?;
Ok(()) txn.commit().await?;
Ok(pkg_entry)
} }
pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> { pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
@ -202,3 +226,138 @@ pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
Ok(None) Ok(None)
} }
} }
/// Row type produced by `stale_pkgs`: identifies one package row that is
/// eligible for removal.
#[derive(FromQueryResult)]
pub struct PkgToRemove {
/// Id of the repository the package belongs to.
pub repo_id: i32,
/// Id of the package row itself.
pub id: i32,
}
/// Builds a select statement yielding, for every `(repo_id, arch, name)`
/// group in the package table, the highest package id as `max_id`.
///
/// When `committed` is true, only rows in the `Committed` state take part in
/// the grouping.
fn max_pkg_ids_query(committed: bool) -> SelectStatement {
    let mut newest = Query::select();

    newest
        .from(db::package::Entity)
        .columns([
            db::package::Column::RepoId,
            db::package::Column::Arch,
            db::package::Column::Name,
        ])
        .expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
        .group_by_columns([
            db::package::Column::RepoId,
            db::package::Column::Arch,
            db::package::Column::Name,
        ]);

    if committed {
        let only_committed = db::package::Column::State.eq(db::PackageState::Committed);
        newest.cond_where(only_committed);
    }

    newest
}
/// Builds the query selecting every package that belongs in a sync of the
/// given repository and architecture.
///
/// A package row qualifies when it is the newest row (highest id, regardless
/// of state) for its `(repo_id, arch, name)` group, belongs to `repo`, has
/// either the requested `arch` or `crate::ANY_ARCH`, and is not pending
/// deletion. The query is only built here, not executed.
pub fn pkgs_to_sync(
    conn: &DbConn,
    repo: i32,
    arch: &str,
) -> SelectorRaw<SelectModel<package::Model>> {
    let p1 = Alias::new("p1");
    let p2 = Alias::new("p2");

    // Join condition: keep only rows whose id equals the group's maximum.
    let is_newest = Expr::col((p1.clone(), db::package::Column::Id))
        .eq(Expr::col((p2.clone(), Alias::new("max_id"))));

    let wanted = Condition::all()
        .add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo))
        .add(Expr::col((p1.clone(), db::package::Column::Arch)).is_in([arch, crate::ANY_ARCH]))
        .add(
            Expr::col((p1.clone(), db::package::Column::State))
                .ne(db::PackageState::PendingDeletion),
        );

    let mut stmt = Query::select();
    stmt.columns(db::package::Column::iter().map(|c| (p1.clone(), c)))
        .from_as(db::package::Entity, p1.clone())
        .join_subquery(
            JoinType::InnerJoin,
            max_pkg_ids_query(false),
            p2,
            is_newest,
        )
        .cond_where(wanted);

    let sql = conn.get_database_backend().build(&stmt);

    db::Package::find().from_raw_sql(sql)
}
fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
let mut query = Query::select()
.from_as(db::package::Entity, p1.clone())
.to_owned();
if include_repo {
query.columns([
(p1.clone(), db::package::Column::RepoId),
(p1.clone(), db::package::Column::Id),
]);
} else {
query.column((p1.clone(), db::package::Column::Id));
}
// We left join on the max pkgs query because a repository that has all its packages set to
// "pending deletion" doesn't show up in the query. These are also included with a where clause
// on the joined rows.
query
.join_subquery(
JoinType::LeftJoin,
max_pkg_ids_query(true),
p2.clone(),
Condition::all()
.add(
Expr::col((p1.clone(), db::package::Column::RepoId))
.eq(Expr::col((p2.clone(), db::package::Column::RepoId))),
)
.add(
Expr::col((p1.clone(), db::package::Column::Arch))
.eq(Expr::col((p2.clone(), db::package::Column::Arch))),
)
.add(
Expr::col((p1.clone(), db::package::Column::Name))
.eq(Expr::col((p2.clone(), db::package::Column::Name))),
),
)
.cond_where(
Condition::any()
.add(
Expr::col((p1.clone(), db::package::Column::Id))
.lt(Expr::col((p2.clone(), Alias::new("max_id")))),
)
.add(
Expr::col((p1.clone(), db::package::Column::State))
.eq(db::PackageState::PendingDeletion),
),
);
query
}
/// Builds the raw selector listing all stale packages as `PkgToRemove` rows
/// (repository id plus package id). The query is not executed here.
pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
    let stmt = stale_pkgs_query(true);
    let backend = conn.get_database_backend();

    PkgToRemove::find_by_statement(backend.build(&stmt))
}
/// Deletes every stale package row whose id is less than or equal to
/// `max_id`.
///
/// # Errors
///
/// Returns an error when the delete statement fails.
pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> {
    let within_bound = db::package::Column::Id.lte(max_id);
    let is_stale = db::package::Column::Id.in_subquery(stale_pkgs_query(false));

    db::Package::delete_many()
        .filter(within_bound)
        .filter(is_stale)
        .exec(conn)
        .await?;

    Ok(())
}

View File

@ -21,15 +21,14 @@ pub async fn page(
per_page: u64, per_page: u64,
page: u64, page: u64,
filter: Filter, filter: Filter,
) -> Result<(u64, Vec<repo::Model>)> { ) -> Result<Vec<repo::Model>> {
let paginator = Repo::find() let paginator = Repo::find()
.filter(filter) .filter(filter)
.order_by_asc(repo::Column::Id) .order_by_asc(repo::Column::Id)
.paginate(conn, per_page); .paginate(conn, per_page);
let repos = paginator.fetch_page(page).await?; let repos = paginator.fetch_page(page).await?;
let total_pages = paginator.num_pages().await?;
Ok((total_pages, repos)) Ok(repos)
} }
pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<repo::Model>> { pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<repo::Model>> {

View File

@ -1,70 +0,0 @@
use crate::{db, DistroMgr};
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use sea_orm::{DbConn, EntityTrait};
use tokio::sync::Mutex;
/// Registry of per-distro managers, keyed by distro name.
///
/// Cloning is cheap with respect to the registry itself: the map lives behind
/// an `Arc`, so all clones share the same set of managers.
#[derive(Clone)]
pub struct MetaDistroMgr {
// Directory under which each distro gets its own subdirectory.
distro_dir: PathBuf,
// Database connection handed to every created `DistroMgr`.
conn: DbConn,
// Managers created so far; guarded by an async mutex.
distros: Arc<Mutex<HashMap<String, Arc<DistroMgr>>>>,
}
impl MetaDistroMgr {
    /// Creates the registry rooted at `distro_dir` and loads a manager for
    /// every distro currently stored in the database.
    ///
    /// The directory (including any missing parents) is created when it does
    /// not exist yet.
    ///
    /// # Errors
    ///
    /// Fails when the directory cannot be created, the distros cannot be
    /// queried, or a `DistroMgr` cannot be constructed.
    pub async fn new<P: AsRef<Path>>(distro_dir: P, conn: DbConn) -> crate::Result<Self> {
        let distro_dir = distro_dir.as_ref().to_path_buf();

        // `create_dir_all` is a no-op for an existing directory, which avoids the
        // check-then-create race of `try_exists` + `create_dir` and also copes with
        // missing parent directories.
        tokio::fs::create_dir_all(&distro_dir).await?;

        let mut map: HashMap<String, Arc<DistroMgr>> = HashMap::new();

        for distro in db::Distro::find().all(&conn).await? {
            let mgr =
                DistroMgr::new(distro_dir.join(&distro.name), distro.id, conn.clone()).await?;

            map.insert(distro.name, Arc::new(mgr));
        }

        Ok(Self {
            distro_dir,
            conn,
            distros: Arc::new(Mutex::new(map)),
        })
    }

    /// Returns the manager registered under `distro`, if any.
    pub async fn get_mgr(&self, distro: &str) -> Option<Arc<DistroMgr>> {
        let map = self.distros.lock().await;

        map.get(distro).cloned()
    }

    /// Returns the manager registered under `distro`, inserting the distro
    /// into the database and creating its manager first when it is unknown.
    ///
    /// # Errors
    ///
    /// Fails when the database insert or the `DistroMgr` construction fails.
    pub async fn get_or_create_mgr(&self, distro: &str) -> crate::Result<Arc<DistroMgr>> {
        // The lock stays held until the new manager is registered, so concurrent callers
        // cannot both create the same distro.
        let mut map = self.distros.lock().await;

        if let Some(mgr) = map.get(distro) {
            return Ok(Arc::clone(mgr));
        }

        let distro = db::query::distro::insert(&self.conn, distro, None).await?;
        let mgr = Arc::new(
            DistroMgr::new(
                self.distro_dir.join(&distro.name),
                distro.id,
                self.conn.clone(),
            )
            .await?,
        );

        map.insert(distro.name, Arc::clone(&mgr));

        Ok(mgr)
    }
}

View File

@ -14,6 +14,8 @@ pub enum ServerError {
Db(sea_orm::DbErr), Db(sea_orm::DbErr),
Status(StatusCode), Status(StatusCode),
Archive(libarchive::error::ArchiveError), Archive(libarchive::error::ArchiveError),
Figment(figment::Error),
Unit,
} }
impl fmt::Display for ServerError { impl fmt::Display for ServerError {
@ -24,6 +26,8 @@ impl fmt::Display for ServerError {
ServerError::Status(status) => write!(fmt, "{}", status), ServerError::Status(status) => write!(fmt, "{}", status),
ServerError::Db(err) => write!(fmt, "{}", err), ServerError::Db(err) => write!(fmt, "{}", err),
ServerError::Archive(err) => write!(fmt, "{}", err), ServerError::Archive(err) => write!(fmt, "{}", err),
ServerError::Figment(err) => write!(fmt, "{}", err),
ServerError::Unit => Ok(()),
} }
} }
} }
@ -41,9 +45,10 @@ impl IntoResponse for ServerError {
ServerError::Db(sea_orm::DbErr::RecordNotFound(_)) => { ServerError::Db(sea_orm::DbErr::RecordNotFound(_)) => {
StatusCode::NOT_FOUND.into_response() StatusCode::NOT_FOUND.into_response()
} }
ServerError::Db(_) | ServerError::Archive(_) => { ServerError::Db(_)
StatusCode::INTERNAL_SERVER_ERROR.into_response() | ServerError::Archive(_)
} | ServerError::Figment(_)
| ServerError::Unit => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
} }
} }
} }
@ -83,3 +88,9 @@ impl From<libarchive::error::ArchiveError> for ServerError {
ServerError::Archive(err) ServerError::Archive(err)
} }
} }
impl From<figment::Error> for ServerError {
fn from(err: figment::Error) -> Self {
ServerError::Figment(err)
}
}

View File

@ -1,25 +1,23 @@
mod api; mod api;
mod cli; mod cli;
mod config;
pub mod db; pub mod db;
mod distro;
mod error; mod error;
mod repo; mod repo;
pub use config::{Config, DbConfig, FsConfig};
pub use error::{Result, ServerError}; pub use error::{Result, ServerError};
use repo::DistroMgr;
use std::sync::Arc;
use clap::Parser; use clap::Parser;
use std::path::PathBuf;
#[derive(Clone)] pub const ANY_ARCH: &'static str = "any";
pub struct Config {
data_dir: PathBuf,
}
#[derive(Clone)] #[derive(Clone)]
pub struct Global { pub struct Global {
config: Config, config: crate::config::Config,
mgr: distro::MetaDistroMgr, mgr: Arc<repo::RepoMgr>,
db: sea_orm::DbConn, db: sea_orm::DbConn,
} }

View File

@ -1,71 +1,107 @@
use super::{archive, package}; use super::{archive, package};
use crate::{db, error::Result}; use crate::db::{self, query::package::delete_stale_pkgs};
use std::path::{Path, PathBuf}; use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
};
use futures::StreamExt; use futures::StreamExt;
use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect}; use sea_orm::{
use tokio::io::AsyncRead; ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
};
use sea_query::{Alias, Expr, Query};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex, RwLock,
};
use uuid::Uuid; use uuid::Uuid;
pub const ANY_ARCH: &'static str = "any"; struct PkgQueueMsg {
repo: i32,
pub struct DistroMgr { path: PathBuf,
distro_dir: PathBuf,
distro_id: i32,
conn: DbConn,
} }
impl DistroMgr { /// A single instance of this struct orchestrates everything related to managing packages files on
pub async fn new<P: AsRef<Path>>(distro_dir: P, distro_id: i32, conn: DbConn) -> Result<Self> { /// disk for all repositories in the server
if !tokio::fs::try_exists(&distro_dir).await? { pub struct RepoMgr {
tokio::fs::create_dir(&distro_dir).await?; repos_dir: PathBuf,
conn: DbConn,
pkg_queue: (
UnboundedSender<PkgQueueMsg>,
Mutex<UnboundedReceiver<PkgQueueMsg>>,
),
repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
}
impl RepoMgr {
pub async fn new<P: AsRef<Path>>(repos_dir: P, conn: DbConn) -> crate::Result<Self> {
if !tokio::fs::try_exists(&repos_dir).await? {
tokio::fs::create_dir(&repos_dir).await?;
}
let (tx, rx) = unbounded_channel();
let mut repos = HashMap::new();
let repo_ids: Vec<i32> = db::Repo::find()
.select_only()
.column(db::repo::Column::Id)
.into_tuple()
.all(&conn)
.await?;
for id in repo_ids {
repos.insert(id, Default::default());
} }
Ok(Self { Ok(Self {
distro_dir: distro_dir.as_ref().to_path_buf(), repos_dir: repos_dir.as_ref().to_path_buf(),
distro_id,
conn, conn,
pkg_queue: (tx, Mutex::new(rx)),
repos: RwLock::new(repos),
}) })
} }
/// Generate archive databases for all known architectures in the repository, including the /// Generate archive databases for all known architectures in the repository, including the
/// "any" architecture. /// "any" architecture.
pub async fn generate_archives_all(&self, repo: &str) -> Result<()> { pub async fn sync_repo(&self, repo: i32) -> crate::Result<()> {
let repo = crate::db::query::repo::by_name(&self.conn, repo).await?; let lock = self
.repos
.read()
.await
.get(&repo)
.map(|(_, lock)| Arc::clone(lock));
if repo.is_none() { if lock.is_none() {
return Ok(()); return Ok(());
} }
let repo = repo.unwrap(); let lock = lock.unwrap();
let _guard = lock.lock().await;
let mut archs = repo let archs: Vec<String> = db::Package::find()
.find_related(crate::db::Package) .filter(db::package::Column::RepoId.eq(repo))
.select_only() .select_only()
.column(crate::db::package::Column::Arch) .column(db::package::Column::Arch)
.distinct() .distinct()
.into_tuple::<String>() .into_tuple()
.stream(&self.conn) .all(&self.conn)
.await?; .await?;
while let Some(arch) = archs.next().await.transpose()? { for arch in archs {
self.generate_archives(&repo.name, &arch).await?; self.generate_archives(repo, &arch).await?;
} }
Ok(()) Ok(())
} }
/// Generate the archive databases for the given repository and architecture. /// Generate the archive databases for the given repository and architecture.
pub async fn generate_archives(&self, repo: &str, arch: &str) -> Result<()> { async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
if repo.is_none() {
return Ok(());
}
let repo = repo.unwrap();
let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] = let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
self.random_file_paths(); self.random_file_paths();
let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?; let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?;
@ -73,13 +109,15 @@ impl DistroMgr {
// Query all packages in the repo that have the given architecture or the "any" // Query all packages in the repo that have the given architecture or the "any"
// architecture // architecture
let mut pkgs = repo let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch)
.find_related(crate::db::Package)
.filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
.stream(&self.conn) .stream(&self.conn)
.await?; .await?;
let mut commited_ids: Vec<i32> = Vec::new();
while let Some(pkg) = pkgs.next().await.transpose()? { while let Some(pkg) = pkgs.next().await.transpose()? {
commited_ids.push(pkg.id);
let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?; let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?; let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
@ -103,7 +141,7 @@ impl DistroMgr {
ar_db.close().await?; ar_db.close().await?;
ar_files.close().await?; ar_files.close().await?;
let repo_dir = self.distro_dir.join(&repo.name); let repo_dir = self.repos_dir.join(repo.to_string());
// Move the db archives to their respective places // Move the db archives to their respective places
tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?; tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?;
@ -113,176 +151,235 @@ impl DistroMgr {
) )
.await?; .await?;
// Only after we have successfully written everything to disk do we update the database.
// This order ensures any failure can be recovered, as the database is our single source of
// truth.
db::Package::update_many()
.col_expr(
db::package::Column::State,
Expr::value(db::PackageState::Committed),
)
.filter(db::package::Column::Id.is_in(commited_ids))
.exec(&self.conn)
.await?;
// If this fails there's no point in failing the function + if there were no packages in // If this fails there's no point in failing the function + if there were no packages in
// the repo, this fails anyway because the temp file doesn't exist // the repo, this fails anyway because the temp file doesn't exist
let _ = tokio::fs::remove_file(desc_tmp_file_path).await; let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
let _ = tokio::fs::remove_file(files_tmp_file_path).await; let _ = tokio::fs::remove_file(files_tmp_file_path).await;
tracing::info!("Package archives generated for repo {} ('{}')", repo, arch);
Ok(()) Ok(())
} }
/// Remove the repo with the given name, if it existed /// Clean any remaining old package files from the database and file system
pub async fn remove_repo(&self, repo: &str) -> Result<bool> { pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
let res = db::query::repo::by_name(&self.conn, repo).await?; let mut pkgs = db::query::package::stale_pkgs(&self.conn)
if let Some(repo_entry) = res {
// Remove repository from database
repo_entry.delete(&self.conn).await?;
// Remove files from file system
tokio::fs::remove_dir_all(self.distro_dir.join(repo)).await?;
Ok(true)
} else {
Ok(false)
}
}
/// Remove all packages from the repository with the given arch.
pub async fn remove_repo_arch(&self, repo: &str, arch: &str) -> Result<bool> {
let repo = db::query::repo::by_name(&self.conn, repo).await?;
if let Some(repo) = repo {
let mut pkgs = repo
.find_related(db::Package)
.filter(db::package::Column::Arch.eq(arch))
.stream(&self.conn) .stream(&self.conn)
.await?; .await?;
// Ids are monotonically increasing, so the max id suffices to know which packages to
// remove later
let mut max_id = -1;
let mut removed_pkgs = 0;
while let Some(pkg) = pkgs.next().await.transpose()? { while let Some(pkg) = pkgs.next().await.transpose()? {
let path = self // Failing to remove the package file isn't the biggest problem
.distro_dir let _ = tokio::fs::remove_file(
.join(&repo.name) self.repos_dir
.join(super::package::filename(&pkg)); .join(pkg.repo_id.to_string())
tokio::fs::remove_file(path).await?; .join(pkg.id.to_string()),
)
.await;
pkg.delete(&self.conn).await?; if pkg.id > max_id {
max_id = pkg.id;
} }
tokio::fs::remove_file( removed_pkgs += 1;
self.distro_dir }
.join(&repo.name)
.join(format!("{}.db.tar.gz", arch)), if removed_pkgs > 0 {
) db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
.await?; }
tokio::fs::remove_file(
self.distro_dir tracing::info!("Removed {removed_pkgs} stale package(s)");
.join(&repo.name)
.join(format!("{}.files.tar.gz", arch)), Ok(())
}
/// Worker loop that takes queued packages off the package queue and adds them to their
/// repository.
///
/// After each processed message the pending counter of the message's repository is
/// decremented. When that decrement takes the counter from 1 to 0, the repository is synced and
/// stale packages are removed. Errors from adding a package are logged; errors from the sync
/// and the cleanup are currently ignored.
///
/// The function returns once the queue is closed and drained.
pub async fn pkg_parse_task(&self) {
    loop {
        // Receive the next message and immediately drop the mutex afterwards. As long as the
        // queue is empty, this will lock the mutex. This is okay, as the mutex will be unlocked
        // as soon as a message is received, so another worker can pick up the mutex.
        let msg = {
            let mut recv = self.pkg_queue.1.lock().await;
            recv.recv().await
        };

        // `recv` yields `None` only when the channel is closed and empty. No message can
        // arrive after that, so stop instead of spinning on the closed channel.
        let Some(msg) = msg else {
            break;
        };

        // TODO better handle this error (retry if failure wasn't because the package is
        // faulty)
        let _ = self
            .add_pkg_from_path(msg.path, msg.repo)
            .await
            .inspect_err(|e| tracing::error!("{:?}", e));

        // `fetch_sub` returns the value held *before* the decrement; `None` means the repo is
        // not (or no longer) present in the map.
        let old = self
            .repos
            .read()
            .await
            .get(&msg.repo)
            .map(|n| n.0.fetch_sub(1, Ordering::SeqCst));

        // Every time the queue for a repo becomes empty, we run a sync job
        if old == Some(1) {
            // TODO error handling
            let _ = self.sync_repo(msg.repo).await;

            // TODO move this so that we only clean if entire queue is empty, not just
            // queue for specific repo
            let _ = self.remove_stale_pkgs().await;
        }
    }
}
/// Queue the package file at `path` for addition to the repository with id `repo`.
///
/// The pending counter of the repository is incremented if the repository is present in the
/// map; the message is queued either way.
///
/// # Panics
///
/// Panics if the message cannot be sent because the queue's receiving side is closed.
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
    // Count the package as pending *before* it becomes visible to the workers. If the message
    // were sent first, a worker could process it and decrement the counter before this
    // increment ran, wrapping the unsigned counter below zero and skipping the sync that is
    // triggered when the counter drops from 1 to 0.
    if let Some((pending, _)) = self.repos.read().await.get(&repo) {
        pending.fetch_add(1, Ordering::SeqCst);
    }

    self.pkg_queue
        .0
        .send(PkgQueueMsg { path, repo })
        .expect("package queue receiver is stored alongside the sender and is never closed");
}
pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
Ok(db::Repo::find()
.find_also_related(db::Distro)
.filter(
Condition::all()
.add(db::repo::Column::Name.eq(repo))
.add(db::distro::Column::Name.eq(distro)),
) )
.one(&self.conn)
.await
.map(|res| res.map(|(repo, _)| repo.id))?)
}
pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
let mut repos = self.repos.write().await;
let distro_id: Option<i32> = db::Distro::find()
.filter(db::distro::Column::Name.eq(distro))
.select_only()
.column(db::distro::Column::Id)
.into_tuple()
.one(&self.conn)
.await?; .await?;
// If we removed all "any" packages, we need to resync all databases let distro_id = if let Some(id) = distro_id {
if arch == ANY_ARCH { id
self.generate_archives_all(&repo.name).await?;
}
Ok(true)
} else { } else {
Ok(false) let new_distro = db::distro::ActiveModel {
} id: NotSet,
} name: Set(distro.to_string()),
description: NotSet,
};
pub async fn remove_pkg(&self, repo: &str, arch: &str, name: &str) -> Result<bool> { new_distro.insert(&self.conn).await?.id
let repo = db::query::repo::by_name(&self.conn, repo).await?; };
if let Some(repo) = repo { let repo_id: Option<i32> = db::Repo::find()
let pkg = .filter(db::repo::Column::DistroId.eq(distro_id))
db::query::package::by_fields(&self.conn, repo.id, arch, name, None, None).await?; .filter(db::repo::Column::Name.eq(repo))
.select_only()
if let Some(pkg) = pkg { .column(db::repo::Column::Id)
// Remove package from database & file system .into_tuple()
tokio::fs::remove_file( .one(&self.conn)
self.distro_dir
.join(&repo.name)
.join(super::package::filename(&pkg)),
)
.await?; .await?;
pkg.delete(&self.conn).await?;
if arch == ANY_ARCH { let repo_id = if let Some(id) = repo_id {
self.generate_archives_all(&repo.name).await?; id
} else { } else {
self.generate_archives(&repo.name, arch).await?; let new_repo = db::repo::ActiveModel {
id: NotSet,
distro_id: Set(distro_id),
name: Set(repo.to_string()),
description: NotSet,
};
let id = new_repo.insert(&self.conn).await?.id;
tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?;
repos.insert(id, Default::default());
id
};
Ok(repo_id)
} }
Ok(true) async fn add_pkg_from_path<P: AsRef<Path>>(&self, path: P, repo: i32) -> crate::Result<()> {
} else { let path_clone = path.as_ref().to_path_buf();
Ok(false)
}
} else {
Ok(false)
}
}
pub async fn add_pkg_from_reader<R: AsyncRead + std::marker::Unpin>(
&self,
reader: &mut R,
repo: &str,
) -> crate::Result<(String, String, String)> {
let [path] = self.random_file_paths();
let mut temp_file = tokio::fs::File::create(&path).await?;
tokio::io::copy(reader, &mut temp_file).await?;
let path_clone = path.clone();
let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone)) let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
.await .await
.unwrap()?; .unwrap()?;
let repo_dir = self.distro_dir.join(repo); // TODO prevent database from being updated but file failing to move to repo dir?
let pkg = db::query::package::insert(&self.conn, repo, pkg).await?;
let repo_id = if let Some(repo) = db::query::repo::by_name(&self.conn, &repo).await? { let dest_path = self
repo.id .repos_dir
} else { .join(repo.to_string())
tokio::fs::create_dir(&repo_dir).await?; .join(pkg.id.to_string());
tokio::fs::rename(path.as_ref(), dest_path).await?;
db::query::repo::insert(&self.conn, self.distro_id, repo, None) tracing::info!(
.await? "Added '{}-{}-{}' to repository {}",
.id pkg.name,
}; pkg.version,
pkg.arch,
repo,
);
// If the package already exists in the database, we remove it first Ok(())
let res = db::query::package::by_fields( }
&self.conn,
repo_id, pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
&pkg.info.arch, self.repos.write().await.remove(&repo);
&pkg.info.name, db::Repo::delete_by_id(repo).exec(&self.conn).await?;
None, let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await;
None,
Ok(())
}
/// Remove all packages in the repository that have a given arch. This method marks all
/// packages with the given architecture as "pending deletion", before performing a manual sync
/// & removal of stale packages.
pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
db::Package::update_many()
.col_expr(
db::package::Column::State,
Expr::value(db::PackageState::PendingDeletion),
) )
.filter(
Condition::all()
.add(db::package::Column::RepoId.eq(repo))
.add(db::package::Column::Arch.eq(arch)),
)
.exec(&self.conn)
.await?; .await?;
if let Some(entry) = res { self.sync_repo(repo).await?;
entry.delete(&self.conn).await?; self.remove_stale_pkgs().await?;
Ok(())
} }
let dest_pkg_path = repo_dir.join(pkg.file_name());
// Insert new package into database
let name = pkg.info.name.clone();
let version = pkg.info.version.clone();
let arch = pkg.info.arch.clone();
db::query::package::insert(&self.conn, repo_id, pkg).await?;
// Move the package to its final resting place
tokio::fs::rename(path, dest_pkg_path).await?;
// Synchronize archive databases
if arch == ANY_ARCH {
self.generate_archives_all(repo).await?;
} else {
self.generate_archives(repo, &arch).await?;
}
Ok((name, version, arch))
}
/// Generate a path to a unique file that can be used as a temporary file
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] { pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
std::array::from_fn(|_| { std::array::from_fn(|_| {
let uuid: uuid::fmt::Simple = Uuid::new_v4().into(); let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
self.distro_dir.join(uuid.to_string()) self.repos_dir.join(uuid.to_string())
}) })
} }
} }

View File

@ -2,7 +2,9 @@ mod archive;
mod manager; mod manager;
pub mod package; pub mod package;
pub use manager::DistroMgr; pub use manager::RepoMgr;
use crate::FsConfig;
use axum::{ use axum::{
body::Body, body::Body,
@ -47,15 +49,14 @@ async fn get_file(
Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>, Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>,
req: Request<Body>, req: Request<Body>,
) -> crate::Result<impl IntoResponse> { ) -> crate::Result<impl IntoResponse> {
let repo_dir = global if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? {
.config match global.config.fs {
.data_dir FsConfig::Local { data_dir } => {
.join("distros") let repo_dir = data_dir.join("repos").join(repo_id.to_string());
.join(&distro)
.join(&repo);
let file_name = let file_name = if file_name == format!("{}.db", repo)
if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) { || file_name == format!("{}.db.tar.gz", repo)
{
format!("{}.db.tar.gz", arch) format!("{}.db.tar.gz", arch)
} else if file_name == format!("{}.files", repo) } else if file_name == format!("{}.files", repo)
|| file_name == format!("{}.files.tar.gz", repo) || file_name == format!("{}.files.tar.gz", repo)
@ -65,87 +66,96 @@ async fn get_file(
file_name file_name
}; };
Ok(ServeFile::new(repo_dir.join(file_name)).oneshot(req).await) let path = repo_dir.join(file_name);
Ok(ServeFile::new(path).oneshot(req).await)
}
}
} else {
Err(StatusCode::NOT_FOUND.into())
}
} }
async fn post_package_archive( async fn post_package_archive(
State(global): State<crate::Global>, State(global): State<crate::Global>,
Path((distro, repo)): Path<(String, String)>, Path((distro, repo)): Path<(String, String)>,
body: Body, body: Body,
) -> crate::Result<()> { ) -> crate::Result<StatusCode> {
let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
let mgr = global.mgr.get_or_create_mgr(&distro).await?; let repo = global.mgr.get_or_create_repo(&distro, &repo).await?;
let (name, version, arch) = mgr.add_pkg_from_reader(&mut body, &repo).await?; let [tmp_path] = global.mgr.random_file_paths();
tracing::info!( let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
"Added '{}-{}' to repository '{}' ({})", tokio::io::copy(&mut body, &mut tmp_file).await?;
name,
version,
repo,
arch
);
Ok(()) global.mgr.queue_pkg(repo, tmp_path).await;
Ok(StatusCode::ACCEPTED)
} }
async fn delete_repo( async fn delete_repo(
State(global): State<crate::Global>, State(global): State<crate::Global>,
Path((distro, repo)): Path<(String, String)>, Path((distro, repo)): Path<(String, String)>,
) -> crate::Result<StatusCode> { ) -> crate::Result<StatusCode> {
if let Some(mgr) = global.mgr.get_mgr(&distro).await { if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
let repo_removed = mgr.remove_repo(&repo).await?; global.mgr.remove_repo(repo).await?;
if repo_removed { tracing::info!("Removed repository {repo}");
tracing::info!("Removed repository '{}'", repo);
Ok(StatusCode::OK) Ok(StatusCode::OK)
} else { } else {
Ok(StatusCode::NOT_FOUND) Ok(StatusCode::NOT_FOUND)
} }
} else {
Ok(StatusCode::NOT_FOUND)
}
} }
async fn delete_arch_repo( async fn delete_arch_repo(
State(global): State<crate::Global>, State(global): State<crate::Global>,
Path((distro, repo, arch)): Path<(String, String, String)>, Path((distro, repo, arch)): Path<(String, String, String)>,
) -> crate::Result<StatusCode> { ) -> crate::Result<StatusCode> {
if let Some(mgr) = global.mgr.get_mgr(&distro).await { if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?; global.mgr.remove_repo_arch(repo, &arch).await?;
if repo_removed { tracing::info!("Removed architecture '{arch}' from repository {repo}");
tracing::info!("Removed arch '{}' from repository '{}'", arch, repo);
Ok(StatusCode::OK) Ok(StatusCode::OK)
} else { } else {
Ok(StatusCode::NOT_FOUND) Ok(StatusCode::NOT_FOUND)
} }
} else { //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
Ok(StatusCode::NOT_FOUND) // let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
} //
// if repo_removed {
// tracing::info!("Removed arch '{}' from repository '{}'", arch, repo);
//
// Ok(StatusCode::OK)
// } else {
// Ok(StatusCode::NOT_FOUND)
// }
//} else {
// Ok(StatusCode::NOT_FOUND)
//}
} }
async fn delete_package( async fn delete_package(
State(global): State<crate::Global>, State(global): State<crate::Global>,
Path((distro, repo, arch, pkg_name)): Path<(String, String, String, String)>, Path((distro, repo, arch, pkg_name)): Path<(String, String, String, String)>,
) -> crate::Result<StatusCode> { ) -> crate::Result<StatusCode> {
if let Some(mgr) = global.mgr.get_mgr(&distro).await {
let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?;
if pkg_removed {
tracing::info!(
"Removed package '{}' ({}) from repository '{}'",
pkg_name,
arch,
repo
);
Ok(StatusCode::OK)
} else {
Ok(StatusCode::NOT_FOUND) Ok(StatusCode::NOT_FOUND)
} //if let Some(mgr) = global.mgr.get_mgr(&distro).await {
} else { // let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?;
Ok(StatusCode::NOT_FOUND) //
} // if pkg_removed {
// tracing::info!(
// "Removed package '{}' ({}) from repository '{}'",
// pkg_name,
// arch,
// repo
// );
//
// Ok(StatusCode::OK)
// } else {
// Ok(StatusCode::NOT_FOUND)
// }
//} else {
// Ok(StatusCode::NOT_FOUND)
//}
} }

View File

@ -323,7 +323,7 @@ pub async fn write_desc<W: AsyncWrite + std::marker::Unpin>(
pkg: &package::Model, pkg: &package::Model,
) -> crate::Result<()> { ) -> crate::Result<()> {
writer writer
.write_all(format!("%FILENAME%\n{}\n", filename(pkg)).as_bytes()) .write_all(format!("%FILENAME%\n{}\n", pkg.id).as_bytes())
.await?; .await?;
write_attribute(writer, "NAME", &pkg.name).await?; write_attribute(writer, "NAME", &pkg.name).await?;
@ -397,6 +397,8 @@ pub async fn write_desc<W: AsyncWrite + std::marker::Unpin>(
write_attribute(writer, key, &items.join("\n")).await?; write_attribute(writer, key, &items.join("\n")).await?;
} }
writer.flush().await?;
Ok(()) Ok(())
} }
@ -417,5 +419,7 @@ pub async fn write_files<W: AsyncWrite + std::marker::Unpin>(
.await?; .await?;
} }
writer.flush().await?;
Ok(()) Ok(())
} }