Compare commits
No commits in common. "76395afb10b08db0557be5430c492950bd27e38e" and "f9518d6b7de2bb78e1e69d638ea505c248c0101d" have entirely different histories.
76395afb10
...
f9518d6b7d
19 changed files with 419 additions and 910 deletions
138
Cargo.lock
generated
138
Cargo.lock
generated
|
|
@ -174,15 +174,6 @@ dependencies = [
|
|||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-waker"
|
||||
version = "1.1.2"
|
||||
|
|
@ -389,12 +380,6 @@ dependencies = [
|
|||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bytemuck"
|
||||
version = "1.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "78834c15cb5d5efe3452d58b1e8ba890dd62d21907f867f383358198e56ebca5"
|
||||
|
||||
[[package]]
|
||||
name = "byteorder"
|
||||
version = "1.5.0"
|
||||
|
|
@ -645,20 +630,6 @@ version = "2.1.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
|
||||
|
||||
[[package]]
|
||||
name = "figment"
|
||||
version = "0.10.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8cb01cd46b0cf372153850f4c6c272d9cbea2da513e07538405148f95bd789f3"
|
||||
dependencies = [
|
||||
"atomic",
|
||||
"pear",
|
||||
"serde",
|
||||
"toml",
|
||||
"uncased",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "flume"
|
||||
version = "0.11.0"
|
||||
|
|
@ -1066,12 +1037,6 @@ dependencies = [
|
|||
"syn 2.0.66",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inlinable_string"
|
||||
version = "0.1.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb"
|
||||
|
||||
[[package]]
|
||||
name = "is_terminal_polyfill"
|
||||
version = "1.70.0"
|
||||
|
|
@ -1421,29 +1386,6 @@ version = "1.0.15"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
|
||||
|
||||
[[package]]
|
||||
name = "pear"
|
||||
version = "0.2.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bdeeaa00ce488657faba8ebf44ab9361f9365a97bd39ffb8a60663f57ff4b467"
|
||||
dependencies = [
|
||||
"inlinable_string",
|
||||
"pear_codegen",
|
||||
"yansi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pear_codegen"
|
||||
version = "0.2.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4bab5b985dc082b345f812b7df84e1bef27e7207b39e448439ba8bd69c93f147"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"proc-macro2-diagnostics",
|
||||
"quote",
|
||||
"syn 2.0.66",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pem-rfc7468"
|
||||
version = "0.7.0"
|
||||
|
|
@ -1536,7 +1478,7 @@ version = "3.1.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284"
|
||||
dependencies = [
|
||||
"toml_edit 0.21.1",
|
||||
"toml_edit",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
@ -1572,19 +1514,6 @@ dependencies = [
|
|||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2-diagnostics"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.66",
|
||||
"version_check",
|
||||
"yansi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ptr_meta"
|
||||
version = "0.1.4"
|
||||
|
|
@ -1728,13 +1657,11 @@ dependencies = [
|
|||
"axum",
|
||||
"chrono",
|
||||
"clap",
|
||||
"figment",
|
||||
"futures",
|
||||
"http-body-util",
|
||||
"libarchive",
|
||||
"sea-orm",
|
||||
"sea-orm-migration",
|
||||
"sea-query",
|
||||
"serde",
|
||||
"sha256",
|
||||
"tokio",
|
||||
|
|
@ -2109,15 +2036,6 @@ dependencies = [
|
|||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_spanned"
|
||||
version = "0.6.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "79e674e01f999af37c49f70a6ede167a8a60b2503e56c5599532a65baa5969a0"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_urlencoded"
|
||||
version = "0.7.1"
|
||||
|
|
@ -2705,26 +2623,11 @@ dependencies = [
|
|||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml"
|
||||
version = "0.8.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6f49eb2ab21d2f26bd6db7bf383edc527a7ebaee412d17af4d40fdccd442f335"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"serde_spanned",
|
||||
"toml_datetime",
|
||||
"toml_edit 0.22.14",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml_datetime"
|
||||
version = "0.6.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml_edit"
|
||||
|
|
@ -2734,20 +2637,7 @@ checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1"
|
|||
dependencies = [
|
||||
"indexmap",
|
||||
"toml_datetime",
|
||||
"winnow 0.5.40",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml_edit"
|
||||
version = "0.22.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38"
|
||||
dependencies = [
|
||||
"indexmap",
|
||||
"serde",
|
||||
"serde_spanned",
|
||||
"toml_datetime",
|
||||
"winnow 0.6.13",
|
||||
"winnow",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
@ -2872,15 +2762,6 @@ version = "1.17.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
|
||||
|
||||
[[package]]
|
||||
name = "uncased"
|
||||
version = "0.9.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697"
|
||||
dependencies = [
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicase"
|
||||
version = "2.7.0"
|
||||
|
|
@ -3247,15 +3128,6 @@ dependencies = [
|
|||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winnow"
|
||||
version = "0.6.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "59b5e5f6c299a3c7890b876a2a587f3115162487e704907d9b6cd29473052ba1"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wyz"
|
||||
version = "0.5.1"
|
||||
|
|
@ -3265,12 +3137,6 @@ dependencies = [
|
|||
"tap",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "yansi"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy"
|
||||
version = "0.7.34"
|
||||
|
|
|
|||
|
|
@ -10,12 +10,10 @@ authors = ["Jef Roosens"]
|
|||
axum = { version = "0.7.5", features = ["http2", "macros"] }
|
||||
chrono = { version = "0.4.26", features = ["serde"] }
|
||||
clap = { version = "4.3.12", features = ["env", "derive"] }
|
||||
figment = { version = "0.10.19", features = ["env", "toml"] }
|
||||
futures = "0.3.28"
|
||||
http-body-util = "0.1.1"
|
||||
libarchive = { path = "../libarchive" }
|
||||
sea-orm-migration = "0.12.1"
|
||||
sea-query = { version = "0.30.7", features = ["backend-postgres", "backend-sqlite"] }
|
||||
serde = { version = "1.0.178", features = ["derive"] }
|
||||
sha256 = "1.1.4"
|
||||
tokio = { version = "1.29.1", features = ["full"] }
|
||||
|
|
|
|||
|
|
@ -1,17 +0,0 @@
|
|||
api_key = "test"
|
||||
pkg_workers = 2
|
||||
log_level = "rieterd=debug"
|
||||
|
||||
[fs]
|
||||
type = "local"
|
||||
data_dir = "./data"
|
||||
|
||||
[db]
|
||||
type = "sqlite"
|
||||
db_dir = "./data"
|
||||
# [db]
|
||||
# type = "postgres"
|
||||
# host = "localhost"
|
||||
# db = "rieter"
|
||||
# user = "rieter"
|
||||
# password = "rieter"
|
||||
|
|
@ -22,10 +22,10 @@ async fn get_repos(
|
|||
Query(pagination): Query<pagination::Query>,
|
||||
Query(filter): Query<db::query::repo::Filter>,
|
||||
) -> crate::Result<Json<PaginatedResponse<db::repo::Model>>> {
|
||||
let items =
|
||||
let (total_pages, items) =
|
||||
db::query::repo::page(&global.db, pagination.per_page, pagination.page - 1, filter).await?;
|
||||
|
||||
Ok(Json(pagination.res(items)))
|
||||
Ok(Json(pagination.res(total_pages, items)))
|
||||
}
|
||||
|
||||
async fn get_single_repo(
|
||||
|
|
@ -44,11 +44,11 @@ async fn get_packages(
|
|||
Query(pagination): Query<pagination::Query>,
|
||||
Query(filter): Query<db::query::package::Filter>,
|
||||
) -> crate::Result<Json<PaginatedResponse<db::package::Model>>> {
|
||||
let items =
|
||||
let (total_pages, pkgs) =
|
||||
db::query::package::page(&global.db, pagination.per_page, pagination.page - 1, filter)
|
||||
.await?;
|
||||
|
||||
Ok(Json(pagination.res(items)))
|
||||
Ok(Json(pagination.res(total_pages, pkgs)))
|
||||
}
|
||||
|
||||
async fn get_single_package(
|
||||
|
|
|
|||
|
|
@ -1,19 +1,19 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct Query {
|
||||
#[serde(default = "default_page")]
|
||||
pub page: u64,
|
||||
#[serde(default = "default_per_page")]
|
||||
pub per_page: u64,
|
||||
}
|
||||
|
||||
fn default_page() -> u64 {
|
||||
1
|
||||
impl Default for Query {
|
||||
fn default() -> Self {
|
||||
Query {
|
||||
page: 1,
|
||||
per_page: 25,
|
||||
}
|
||||
}
|
||||
|
||||
fn default_per_page() -> u64 {
|
||||
25
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
|
@ -23,15 +23,21 @@ where
|
|||
{
|
||||
pub page: u64,
|
||||
pub per_page: u64,
|
||||
pub total_pages: u64,
|
||||
pub count: usize,
|
||||
pub items: Vec<T>,
|
||||
}
|
||||
|
||||
impl Query {
|
||||
pub fn res<T: for<'de> Serialize>(self, items: Vec<T>) -> PaginatedResponse<T> {
|
||||
pub fn res<T: for<'de> Serialize>(
|
||||
self,
|
||||
total_pages: u64,
|
||||
items: Vec<T>,
|
||||
) -> PaginatedResponse<T> {
|
||||
PaginatedResponse {
|
||||
page: self.page,
|
||||
per_page: self.per_page,
|
||||
total_pages,
|
||||
count: items.len(),
|
||||
items,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,71 +1,92 @@
|
|||
use crate::{Config, FsConfig, Global};
|
||||
|
||||
use std::{io, path::PathBuf, sync::Arc};
|
||||
use crate::{distro::MetaDistroMgr, Config, Global};
|
||||
|
||||
use axum::Router;
|
||||
use clap::Parser;
|
||||
use sea_orm_migration::MigratorTrait;
|
||||
use std::{io, path::PathBuf};
|
||||
use tower_http::trace::TraceLayer;
|
||||
use tracing::debug;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
pub struct Cli {
|
||||
/// Directory where repository metadata & SQLite database is stored
|
||||
#[arg(env = "RIETER_DATA_DIR")]
|
||||
pub data_dir: PathBuf,
|
||||
/// API key to authenticate private routes with
|
||||
#[arg(env = "RIETER_API_KEY")]
|
||||
pub api_key: String,
|
||||
|
||||
/// Database connection URL; either sqlite:// or postgres://. Defaults to rieter.sqlite in the
|
||||
/// data directory
|
||||
#[arg(short, long, env = "RIETER_DATABASE_URL")]
|
||||
pub database_url: Option<String>,
|
||||
/// Port the server will listen on
|
||||
#[arg(
|
||||
short,
|
||||
long,
|
||||
env = "RIETER_CONFIG_FILE",
|
||||
default_value = "./rieterd.toml"
|
||||
value_name = "PORT",
|
||||
default_value_t = 8000,
|
||||
env = "RIETER_PORT"
|
||||
)]
|
||||
pub config_file: PathBuf,
|
||||
pub port: u16,
|
||||
/// Log levels for the tracing
|
||||
#[arg(
|
||||
long,
|
||||
value_name = "LOG_LEVEL",
|
||||
default_value = "tower_http=debug,rieterd=debug,sea_orm=debug",
|
||||
env = "RIETER_LOG"
|
||||
)]
|
||||
pub log: String,
|
||||
}
|
||||
|
||||
impl Cli {
|
||||
pub async fn run(&self) -> crate::Result<()> {
|
||||
let config: Config = Config::figment(&self.config_file)
|
||||
.extract()
|
||||
.inspect_err(|e| tracing::error!("{}", e))?;
|
||||
|
||||
pub fn init_tracing(&self) {
|
||||
tracing_subscriber::registry()
|
||||
.with(tracing_subscriber::EnvFilter::new(config.log_level.clone()))
|
||||
.with(tracing_subscriber::EnvFilter::new(self.log.clone()))
|
||||
.with(tracing_subscriber::fmt::layer())
|
||||
.init();
|
||||
}
|
||||
|
||||
tracing::info!("Connecting to database");
|
||||
let db = crate::db::connect(&config.db).await?;
|
||||
pub async fn run(&self) -> crate::Result<()> {
|
||||
self.init_tracing();
|
||||
|
||||
let db_url = if let Some(url) = &self.database_url {
|
||||
url.clone()
|
||||
} else {
|
||||
format!(
|
||||
"sqlite://{}?mode=rwc",
|
||||
self.data_dir.join("rieter.sqlite").to_string_lossy()
|
||||
)
|
||||
};
|
||||
|
||||
debug!("Connecting to database with URL {}", db_url);
|
||||
|
||||
let mut options = sea_orm::ConnectOptions::new(db_url);
|
||||
options.max_connections(16);
|
||||
|
||||
let db = sea_orm::Database::connect(options).await?;
|
||||
crate::db::Migrator::up(&db, None).await?;
|
||||
|
||||
let mgr = match &config.fs {
|
||||
FsConfig::Local { data_dir } => {
|
||||
crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await?
|
||||
}
|
||||
debug!("Successfully applied migrations");
|
||||
|
||||
let config = Config {
|
||||
data_dir: self.data_dir.clone(),
|
||||
};
|
||||
|
||||
let mgr = Arc::new(mgr);
|
||||
let mgr = MetaDistroMgr::new(&self.data_dir.join("distros"), db.clone()).await?;
|
||||
|
||||
for _ in 0..config.pkg_workers {
|
||||
let clone = Arc::clone(&mgr);
|
||||
|
||||
tokio::spawn(async move { clone.pkg_parse_task().await });
|
||||
}
|
||||
|
||||
let global = Global {
|
||||
config: config.clone(),
|
||||
mgr,
|
||||
db,
|
||||
};
|
||||
let global = Global { config, mgr, db };
|
||||
|
||||
// build our application with a single route
|
||||
let app = Router::new()
|
||||
.nest("/api", crate::api::router())
|
||||
.merge(crate::repo::router(&config.api_key))
|
||||
.merge(crate::repo::router(&self.api_key))
|
||||
.with_state(global)
|
||||
.layer(TraceLayer::new_for_http());
|
||||
|
||||
let domain: String = format!("{}:{}", config.domain, config.port)
|
||||
.parse()
|
||||
.unwrap();
|
||||
let domain: String = format!("0.0.0.0:{}", self.port).parse().unwrap();
|
||||
let listener = tokio::net::TcpListener::bind(domain).await?;
|
||||
// run it with hyper on localhost:3000
|
||||
Ok(axum::serve(listener, app.into_make_service())
|
||||
|
|
|
|||
|
|
@ -1,88 +0,0 @@
|
|||
use std::path::{Path, PathBuf};
|
||||
|
||||
use figment::{
|
||||
providers::{Env, Format, Toml},
|
||||
Figment,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
#[serde(tag = "type")]
|
||||
pub enum FsConfig {
|
||||
Local { data_dir: PathBuf },
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
#[serde(tag = "type")]
|
||||
pub enum DbConfig {
|
||||
Sqlite {
|
||||
db_dir: PathBuf,
|
||||
#[serde(default = "default_db_sqlite_max_connections")]
|
||||
max_connections: u32,
|
||||
},
|
||||
Postgres {
|
||||
host: String,
|
||||
#[serde(default = "default_db_postgres_port")]
|
||||
port: u16,
|
||||
user: String,
|
||||
password: String,
|
||||
db: String,
|
||||
#[serde(default)]
|
||||
schema: String,
|
||||
#[serde(default = "default_db_postgres_max_connections")]
|
||||
max_connections: u32,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct Config {
|
||||
pub api_key: String,
|
||||
#[serde(default = "default_domain")]
|
||||
pub domain: String,
|
||||
#[serde(default = "default_port")]
|
||||
pub port: u16,
|
||||
#[serde(default = "default_log_level")]
|
||||
pub log_level: String,
|
||||
pub fs: FsConfig,
|
||||
pub db: DbConfig,
|
||||
#[serde(default = "default_pkg_workers")]
|
||||
pub pkg_workers: u32,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub fn figment(config_file: impl AsRef<Path>) -> Figment {
|
||||
Figment::new()
|
||||
.merge(Toml::file(config_file))
|
||||
.merge(Env::prefixed("RIETER_"))
|
||||
}
|
||||
}
|
||||
|
||||
fn default_domain() -> String {
|
||||
String::from("0.0.0.0")
|
||||
}
|
||||
|
||||
fn default_port() -> u16 {
|
||||
8000
|
||||
}
|
||||
|
||||
fn default_log_level() -> String {
|
||||
String::from("tower_http=debug,rieterd=debug,sea_orm=debug")
|
||||
}
|
||||
|
||||
fn default_db_sqlite_max_connections() -> u32 {
|
||||
16
|
||||
}
|
||||
|
||||
fn default_db_postgres_port() -> u16 {
|
||||
5432
|
||||
}
|
||||
|
||||
fn default_db_postgres_max_connections() -> u32 {
|
||||
16
|
||||
}
|
||||
|
||||
fn default_pkg_workers() -> u32 {
|
||||
1
|
||||
}
|
||||
|
|
@ -4,8 +4,6 @@ use chrono::NaiveDateTime;
|
|||
use sea_orm::entity::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::db::PackageState;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
|
||||
#[sea_orm(table_name = "package")]
|
||||
pub struct Model {
|
||||
|
|
@ -26,8 +24,6 @@ pub struct Model {
|
|||
pub pgp_sig_size: Option<i64>,
|
||||
pub sha256_sum: String,
|
||||
pub compression: String,
|
||||
#[serde(skip_serializing)]
|
||||
pub state: PackageState,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
|
|
|
|||
|
|
@ -81,12 +81,7 @@ impl MigrationTrait for Migration {
|
|||
.col(ColumnDef::new(Package::PgpSig).string_len(255))
|
||||
.col(ColumnDef::new(Package::PgpSigSize).big_integer())
|
||||
.col(ColumnDef::new(Package::Sha256Sum).char_len(64).not_null())
|
||||
.col(
|
||||
ColumnDef::new(Package::Compression)
|
||||
.string_len(16)
|
||||
.not_null(),
|
||||
)
|
||||
.col(ColumnDef::new(Package::State).integer().not_null())
|
||||
.col(ColumnDef::new(Package::Compression).string_len(16).not_null())
|
||||
.foreign_key(
|
||||
ForeignKey::create()
|
||||
.name("fk-package-repo_id")
|
||||
|
|
@ -269,7 +264,6 @@ pub enum Package {
|
|||
PgpSigSize,
|
||||
Sha256Sum,
|
||||
Compression,
|
||||
State,
|
||||
}
|
||||
|
||||
#[derive(Iden)]
|
||||
|
|
|
|||
|
|
@ -2,12 +2,10 @@ pub mod entities;
|
|||
mod migrator;
|
||||
pub mod query;
|
||||
|
||||
use crate::config::DbConfig;
|
||||
|
||||
pub use entities::{prelude::*, *};
|
||||
pub use migrator::Migrator;
|
||||
|
||||
use sea_orm::{ConnectionTrait, Database, DbConn, DeriveActiveEnum, EnumIter};
|
||||
use sea_orm::{DeriveActiveEnum, EnumIter};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
type Result<T> = std::result::Result<T, sea_orm::DbErr>;
|
||||
|
|
@ -32,17 +30,6 @@ pub enum PackageRelatedEnum {
|
|||
Optdepend,
|
||||
}
|
||||
|
||||
#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)]
|
||||
#[sea_orm(rs_type = "i32", db_type = "Integer")]
|
||||
pub enum PackageState {
|
||||
#[sea_orm(num_value = 0)]
|
||||
PendingCommit,
|
||||
#[sea_orm(num_value = 1)]
|
||||
Committed,
|
||||
#[sea_orm(num_value = 2)]
|
||||
PendingDeletion,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct FullPackage {
|
||||
#[serde(flatten)]
|
||||
|
|
@ -52,50 +39,3 @@ pub struct FullPackage {
|
|||
related: Vec<(PackageRelatedEnum, String)>,
|
||||
files: Vec<String>,
|
||||
}
|
||||
|
||||
pub async fn connect(conn: &DbConfig) -> crate::Result<DbConn> {
|
||||
match conn {
|
||||
DbConfig::Sqlite {
|
||||
db_dir,
|
||||
max_connections,
|
||||
} => {
|
||||
let url = format!(
|
||||
"sqlite://{}?mode=rwc",
|
||||
db_dir.join("rieter.sqlite").to_string_lossy()
|
||||
);
|
||||
let options = sea_orm::ConnectOptions::new(url)
|
||||
.max_connections(*max_connections)
|
||||
.to_owned();
|
||||
|
||||
let conn = Database::connect(options).await?;
|
||||
|
||||
// synchronous=NORMAL still ensures database consistency with WAL mode, as per the docs
|
||||
// https://www.sqlite.org/pragma.html#pragma_synchronous
|
||||
conn.execute_unprepared("PRAGMA journal_mode=WAL;").await?;
|
||||
conn.execute_unprepared("PRAGMA synchronous=NORMAL;")
|
||||
.await?;
|
||||
|
||||
Ok(conn)
|
||||
}
|
||||
DbConfig::Postgres {
|
||||
host,
|
||||
port,
|
||||
db,
|
||||
user,
|
||||
password,
|
||||
schema,
|
||||
max_connections,
|
||||
} => {
|
||||
let mut url = format!("postgres://{}:{}@{}:{}/{}", user, password, host, port, db);
|
||||
|
||||
if schema != "" {
|
||||
url = format!("{url}?currentSchema={schema}");
|
||||
}
|
||||
|
||||
let options = sea_orm::ConnectOptions::new(url)
|
||||
.max_connections(*max_connections)
|
||||
.to_owned();
|
||||
Ok(Database::connect(options).await?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,14 +21,15 @@ pub async fn page(
|
|||
per_page: u64,
|
||||
page: u64,
|
||||
filter: Filter,
|
||||
) -> Result<Vec<distro::Model>> {
|
||||
) -> Result<(u64, Vec<distro::Model>)> {
|
||||
let paginator = Distro::find()
|
||||
.filter(filter)
|
||||
.order_by_asc(distro::Column::Id)
|
||||
.paginate(conn, per_page);
|
||||
let repos = paginator.fetch_page(page).await?;
|
||||
let total_pages = paginator.num_pages().await?;
|
||||
|
||||
Ok(repos)
|
||||
Ok((total_pages, repos))
|
||||
}
|
||||
|
||||
pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<distro::Model>> {
|
||||
|
|
|
|||
|
|
@ -1,8 +1,6 @@
|
|||
use crate::db::{self, *};
|
||||
|
||||
use futures::Stream;
|
||||
use sea_orm::{sea_query::IntoCondition, *};
|
||||
use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement};
|
||||
use serde::Deserialize;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
|
|
@ -17,7 +15,10 @@ impl IntoCondition for Filter {
|
|||
Condition::all()
|
||||
.add_option(self.repo.map(|repo| package::Column::RepoId.eq(repo)))
|
||||
.add_option(self.arch.map(|arch| package::Column::Arch.eq(arch)))
|
||||
.add_option(self.name.map(|name| package::Column::Name.contains(name)))
|
||||
.add_option(
|
||||
self.name
|
||||
.map(|name| package::Column::Name.like(format!("%{}%", name))),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -26,29 +27,15 @@ pub async fn page(
|
|||
per_page: u64,
|
||||
page: u64,
|
||||
filter: Filter,
|
||||
) -> crate::Result<Vec<package::Model>> {
|
||||
let p2 = Alias::new("p2");
|
||||
let query = Query::select()
|
||||
.columns(db::package::Column::iter().map(|c| (db::package::Entity, c)))
|
||||
.from(db::package::Entity)
|
||||
.join_subquery(
|
||||
JoinType::InnerJoin,
|
||||
max_pkg_ids_query(true),
|
||||
p2.clone(),
|
||||
Expr::col((db::package::Entity, db::package::Column::Id))
|
||||
.eq(Expr::col((p2.clone(), Alias::new("max_id")))),
|
||||
)
|
||||
.cond_where(filter)
|
||||
.order_by((db::package::Entity, db::package::Column::Id), Order::Asc)
|
||||
.to_owned();
|
||||
let builder = conn.get_database_backend();
|
||||
let sql = builder.build(&query);
|
||||
) -> super::Result<(u64, Vec<package::Model>)> {
|
||||
let paginator = Package::find()
|
||||
.filter(filter)
|
||||
.order_by_asc(package::Column::Id)
|
||||
.paginate(conn, per_page);
|
||||
let packages = paginator.fetch_page(page).await?;
|
||||
let total_pages = paginator.num_pages().await?;
|
||||
|
||||
Ok(db::Package::find()
|
||||
.from_raw_sql(sql)
|
||||
.paginate(conn, per_page)
|
||||
.fetch_page(page)
|
||||
.await?)
|
||||
Ok((total_pages, packages))
|
||||
}
|
||||
|
||||
pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<package::Model>> {
|
||||
|
|
@ -81,17 +68,9 @@ pub async fn delete_with_arch(conn: &DbConn, repo_id: i32, arch: &str) -> Result
|
|||
.await
|
||||
}
|
||||
|
||||
pub async fn insert(
|
||||
conn: &DbConn,
|
||||
repo_id: i32,
|
||||
pkg: crate::repo::package::Package,
|
||||
) -> Result<package::Model> {
|
||||
pub async fn insert(conn: &DbConn, repo_id: i32, pkg: crate::repo::package::Package) -> Result<()> {
|
||||
let info = pkg.info;
|
||||
|
||||
// Doing this manually is not the recommended way, but the generic error type of the
|
||||
// transaction function didn't play well with my current error handling
|
||||
let txn = conn.begin().await?;
|
||||
|
||||
let model = package::ActiveModel {
|
||||
id: NotSet,
|
||||
repo_id: Set(repo_id),
|
||||
|
|
@ -109,10 +88,9 @@ pub async fn insert(
|
|||
pgp_sig_size: Set(info.pgpsigsize),
|
||||
sha256_sum: Set(info.sha256sum),
|
||||
compression: Set(pkg.compression.extension().unwrap().to_string()),
|
||||
state: Set(PackageState::PendingCommit),
|
||||
};
|
||||
|
||||
let pkg_entry = model.insert(&txn).await?;
|
||||
let pkg_entry = model.insert(conn).await?;
|
||||
|
||||
// Insert all the related tables
|
||||
PackageLicense::insert_many(info.licenses.iter().map(|s| package_license::ActiveModel {
|
||||
|
|
@ -120,7 +98,7 @@ pub async fn insert(
|
|||
name: Set(s.to_string()),
|
||||
}))
|
||||
.on_empty_do_nothing()
|
||||
.exec(&txn)
|
||||
.exec(conn)
|
||||
.await?;
|
||||
|
||||
PackageGroup::insert_many(info.groups.iter().map(|s| package_group::ActiveModel {
|
||||
|
|
@ -128,7 +106,7 @@ pub async fn insert(
|
|||
name: Set(s.to_string()),
|
||||
}))
|
||||
.on_empty_do_nothing()
|
||||
.exec(&txn)
|
||||
.exec(conn)
|
||||
.await?;
|
||||
|
||||
let related = info
|
||||
|
|
@ -168,7 +146,7 @@ pub async fn insert(
|
|||
name: Set(s.to_string()),
|
||||
}))
|
||||
.on_empty_do_nothing()
|
||||
.exec(&txn)
|
||||
.exec(conn)
|
||||
.await?;
|
||||
|
||||
PackageFile::insert_many(pkg.files.iter().map(|s| package_file::ActiveModel {
|
||||
|
|
@ -176,12 +154,10 @@ pub async fn insert(
|
|||
path: Set(s.display().to_string()),
|
||||
}))
|
||||
.on_empty_do_nothing()
|
||||
.exec(&txn)
|
||||
.exec(conn)
|
||||
.await?;
|
||||
|
||||
txn.commit().await?;
|
||||
|
||||
Ok(pkg_entry)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
|
||||
|
|
@ -226,138 +202,3 @@ pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
|
|||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromQueryResult)]
|
||||
pub struct PkgToRemove {
|
||||
pub repo_id: i32,
|
||||
pub id: i32,
|
||||
}
|
||||
|
||||
fn max_pkg_ids_query(committed: bool) -> SelectStatement {
|
||||
let mut query = Query::select()
|
||||
.from(db::package::Entity)
|
||||
.columns([
|
||||
db::package::Column::RepoId,
|
||||
db::package::Column::Arch,
|
||||
db::package::Column::Name,
|
||||
])
|
||||
.expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
|
||||
.group_by_columns([
|
||||
db::package::Column::RepoId,
|
||||
db::package::Column::Arch,
|
||||
db::package::Column::Name,
|
||||
])
|
||||
.to_owned();
|
||||
|
||||
if committed {
|
||||
query.cond_where(db::package::Column::State.eq(db::PackageState::Committed));
|
||||
}
|
||||
|
||||
query
|
||||
}
|
||||
|
||||
/// Query that returns all packages that should be included in a sync for the given repository and
|
||||
/// arch.
|
||||
pub fn pkgs_to_sync(
|
||||
conn: &DbConn,
|
||||
repo: i32,
|
||||
arch: &str,
|
||||
) -> SelectorRaw<SelectModel<package::Model>> {
|
||||
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
|
||||
let query = Query::select()
|
||||
.columns(db::package::Column::iter().map(|c| (p1.clone(), c)))
|
||||
.from_as(db::package::Entity, p1.clone())
|
||||
.join_subquery(
|
||||
JoinType::InnerJoin,
|
||||
max_pkg_ids_query(false),
|
||||
p2.clone(),
|
||||
Expr::col((p1.clone(), db::package::Column::Id))
|
||||
.eq(Expr::col((p2.clone(), Alias::new("max_id")))),
|
||||
)
|
||||
.cond_where(
|
||||
Condition::all()
|
||||
.add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo))
|
||||
.add(
|
||||
Expr::col((p1.clone(), db::package::Column::Arch))
|
||||
.is_in([arch, crate::ANY_ARCH]),
|
||||
)
|
||||
.add(
|
||||
Expr::col((p1.clone(), db::package::Column::State))
|
||||
.ne(db::PackageState::PendingDeletion),
|
||||
),
|
||||
)
|
||||
.to_owned();
|
||||
let builder = conn.get_database_backend();
|
||||
let sql = builder.build(&query);
|
||||
|
||||
db::Package::find().from_raw_sql(sql)
|
||||
}
|
||||
|
||||
fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
|
||||
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
|
||||
let mut query = Query::select()
|
||||
.from_as(db::package::Entity, p1.clone())
|
||||
.to_owned();
|
||||
|
||||
if include_repo {
|
||||
query.columns([
|
||||
(p1.clone(), db::package::Column::RepoId),
|
||||
(p1.clone(), db::package::Column::Id),
|
||||
]);
|
||||
} else {
|
||||
query.column((p1.clone(), db::package::Column::Id));
|
||||
}
|
||||
|
||||
// We left join on the max pkgs query because a repository that has all its packages set to
|
||||
// "pending deletion" doesn't show up in the query. These are also included with a where clause
|
||||
// on the joined rows.
|
||||
query
|
||||
.join_subquery(
|
||||
JoinType::LeftJoin,
|
||||
max_pkg_ids_query(true),
|
||||
p2.clone(),
|
||||
Condition::all()
|
||||
.add(
|
||||
Expr::col((p1.clone(), db::package::Column::RepoId))
|
||||
.eq(Expr::col((p2.clone(), db::package::Column::RepoId))),
|
||||
)
|
||||
.add(
|
||||
Expr::col((p1.clone(), db::package::Column::Arch))
|
||||
.eq(Expr::col((p2.clone(), db::package::Column::Arch))),
|
||||
)
|
||||
.add(
|
||||
Expr::col((p1.clone(), db::package::Column::Name))
|
||||
.eq(Expr::col((p2.clone(), db::package::Column::Name))),
|
||||
),
|
||||
)
|
||||
.cond_where(
|
||||
Condition::any()
|
||||
.add(
|
||||
Expr::col((p1.clone(), db::package::Column::Id))
|
||||
.lt(Expr::col((p2.clone(), Alias::new("max_id")))),
|
||||
)
|
||||
.add(
|
||||
Expr::col((p1.clone(), db::package::Column::State))
|
||||
.eq(db::PackageState::PendingDeletion),
|
||||
),
|
||||
);
|
||||
|
||||
query
|
||||
}
|
||||
|
||||
pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
|
||||
let query = stale_pkgs_query(true);
|
||||
let builder = conn.get_database_backend();
|
||||
let sql = builder.build(&query);
|
||||
|
||||
PkgToRemove::find_by_statement(sql)
|
||||
}
|
||||
|
||||
pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> {
|
||||
Ok(db::Package::delete_many()
|
||||
.filter(db::package::Column::Id.lte(max_id))
|
||||
.filter(db::package::Column::Id.in_subquery(stale_pkgs_query(false)))
|
||||
.exec(conn)
|
||||
.await
|
||||
.map(|_| ())?)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,14 +21,15 @@ pub async fn page(
|
|||
per_page: u64,
|
||||
page: u64,
|
||||
filter: Filter,
|
||||
) -> Result<Vec<repo::Model>> {
|
||||
) -> Result<(u64, Vec<repo::Model>)> {
|
||||
let paginator = Repo::find()
|
||||
.filter(filter)
|
||||
.order_by_asc(repo::Column::Id)
|
||||
.paginate(conn, per_page);
|
||||
let repos = paginator.fetch_page(page).await?;
|
||||
let total_pages = paginator.num_pages().await?;
|
||||
|
||||
Ok(repos)
|
||||
Ok((total_pages, repos))
|
||||
}
|
||||
|
||||
pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<repo::Model>> {
|
||||
|
|
|
|||
70
server/src/distro.rs
Normal file
70
server/src/distro.rs
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
use crate::{db, DistroMgr};
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use sea_orm::{DbConn, EntityTrait};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MetaDistroMgr {
|
||||
distro_dir: PathBuf,
|
||||
conn: DbConn,
|
||||
distros: Arc<Mutex<HashMap<String, Arc<DistroMgr>>>>,
|
||||
}
|
||||
|
||||
impl MetaDistroMgr {
|
||||
pub async fn new<P: AsRef<Path>>(distro_dir: P, conn: DbConn) -> crate::Result<Self> {
|
||||
if !tokio::fs::try_exists(&distro_dir).await? {
|
||||
tokio::fs::create_dir(&distro_dir).await?;
|
||||
}
|
||||
|
||||
let distro_dir = distro_dir.as_ref().to_path_buf();
|
||||
let mut map: HashMap<String, Arc<DistroMgr>> = HashMap::new();
|
||||
|
||||
let distros = db::Distro::find().all(&conn).await?;
|
||||
|
||||
for distro in distros {
|
||||
let mgr =
|
||||
DistroMgr::new(distro_dir.join(&distro.name), distro.id, conn.clone()).await?;
|
||||
map.insert(distro.name, Arc::new(mgr));
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
distro_dir,
|
||||
conn,
|
||||
distros: Arc::new(Mutex::new(map)),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_mgr(&self, distro: &str) -> Option<Arc<DistroMgr>> {
|
||||
let map = self.distros.lock().await;
|
||||
|
||||
map.get(distro).map(|mgr| Arc::clone(mgr))
|
||||
}
|
||||
|
||||
pub async fn get_or_create_mgr(&self, distro: &str) -> crate::Result<Arc<DistroMgr>> {
|
||||
let mut map = self.distros.lock().await;
|
||||
|
||||
if let Some(mgr) = map.get(distro) {
|
||||
Ok(Arc::clone(mgr))
|
||||
} else {
|
||||
let distro = db::query::distro::insert(&self.conn, distro, None).await?;
|
||||
|
||||
let mgr = Arc::new(
|
||||
DistroMgr::new(
|
||||
self.distro_dir.join(&distro.name),
|
||||
distro.id,
|
||||
self.conn.clone(),
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
map.insert(distro.name, Arc::clone(&mgr));
|
||||
|
||||
Ok(mgr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -14,8 +14,6 @@ pub enum ServerError {
|
|||
Db(sea_orm::DbErr),
|
||||
Status(StatusCode),
|
||||
Archive(libarchive::error::ArchiveError),
|
||||
Figment(figment::Error),
|
||||
Unit,
|
||||
}
|
||||
|
||||
impl fmt::Display for ServerError {
|
||||
|
|
@ -26,8 +24,6 @@ impl fmt::Display for ServerError {
|
|||
ServerError::Status(status) => write!(fmt, "{}", status),
|
||||
ServerError::Db(err) => write!(fmt, "{}", err),
|
||||
ServerError::Archive(err) => write!(fmt, "{}", err),
|
||||
ServerError::Figment(err) => write!(fmt, "{}", err),
|
||||
ServerError::Unit => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -45,10 +41,9 @@ impl IntoResponse for ServerError {
|
|||
ServerError::Db(sea_orm::DbErr::RecordNotFound(_)) => {
|
||||
StatusCode::NOT_FOUND.into_response()
|
||||
}
|
||||
ServerError::Db(_)
|
||||
| ServerError::Archive(_)
|
||||
| ServerError::Figment(_)
|
||||
| ServerError::Unit => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
|
||||
ServerError::Db(_) | ServerError::Archive(_) => {
|
||||
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -88,9 +83,3 @@ impl From<libarchive::error::ArchiveError> for ServerError {
|
|||
ServerError::Archive(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<figment::Error> for ServerError {
|
||||
fn from(err: figment::Error) -> Self {
|
||||
ServerError::Figment(err)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,23 +1,25 @@
|
|||
mod api;
|
||||
mod cli;
|
||||
mod config;
|
||||
pub mod db;
|
||||
mod distro;
|
||||
mod error;
|
||||
mod repo;
|
||||
|
||||
pub use config::{Config, DbConfig, FsConfig};
|
||||
pub use error::{Result, ServerError};
|
||||
|
||||
use std::sync::Arc;
|
||||
use repo::DistroMgr;
|
||||
|
||||
use clap::Parser;
|
||||
use std::path::PathBuf;
|
||||
|
||||
pub const ANY_ARCH: &'static str = "any";
|
||||
#[derive(Clone)]
|
||||
pub struct Config {
|
||||
data_dir: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Global {
|
||||
config: crate::config::Config,
|
||||
mgr: Arc<repo::RepoMgr>,
|
||||
config: Config,
|
||||
mgr: distro::MetaDistroMgr,
|
||||
db: sea_orm::DbConn,
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,107 +1,71 @@
|
|||
use super::{archive, package};
|
||||
use crate::db::{self, query::package::delete_stale_pkgs};
|
||||
use crate::{db, error::Result};
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::{Path, PathBuf},
|
||||
sync::{
|
||||
atomic::{AtomicU32, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use futures::StreamExt;
|
||||
use sea_orm::{
|
||||
ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn, EntityTrait, JoinType,
|
||||
ModelTrait, NotSet, QueryFilter, QuerySelect, Related, RelationTrait, Set, TransactionTrait,
|
||||
};
|
||||
use sea_query::{Alias, Expr, Query};
|
||||
use tokio::sync::{
|
||||
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
|
||||
Mutex, RwLock,
|
||||
};
|
||||
use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
|
||||
use tokio::io::AsyncRead;
|
||||
use uuid::Uuid;
|
||||
|
||||
struct PkgQueueMsg {
|
||||
repo: i32,
|
||||
path: PathBuf,
|
||||
}
|
||||
pub const ANY_ARCH: &'static str = "any";
|
||||
|
||||
/// A single instance of this struct orchestrates everything related to managing packages files on
|
||||
/// disk for all repositories in the server
|
||||
pub struct RepoMgr {
|
||||
repos_dir: PathBuf,
|
||||
pub struct DistroMgr {
|
||||
distro_dir: PathBuf,
|
||||
distro_id: i32,
|
||||
conn: DbConn,
|
||||
pkg_queue: (
|
||||
UnboundedSender<PkgQueueMsg>,
|
||||
Mutex<UnboundedReceiver<PkgQueueMsg>>,
|
||||
),
|
||||
repos: RwLock<HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>>,
|
||||
}
|
||||
|
||||
impl RepoMgr {
|
||||
pub async fn new<P: AsRef<Path>>(repos_dir: P, conn: DbConn) -> crate::Result<Self> {
|
||||
if !tokio::fs::try_exists(&repos_dir).await? {
|
||||
tokio::fs::create_dir(&repos_dir).await?;
|
||||
}
|
||||
|
||||
let (tx, rx) = unbounded_channel();
|
||||
|
||||
let mut repos = HashMap::new();
|
||||
let repo_ids: Vec<i32> = db::Repo::find()
|
||||
.select_only()
|
||||
.column(db::repo::Column::Id)
|
||||
.into_tuple()
|
||||
.all(&conn)
|
||||
.await?;
|
||||
|
||||
for id in repo_ids {
|
||||
repos.insert(id, Default::default());
|
||||
impl DistroMgr {
|
||||
pub async fn new<P: AsRef<Path>>(distro_dir: P, distro_id: i32, conn: DbConn) -> Result<Self> {
|
||||
if !tokio::fs::try_exists(&distro_dir).await? {
|
||||
tokio::fs::create_dir(&distro_dir).await?;
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
repos_dir: repos_dir.as_ref().to_path_buf(),
|
||||
distro_dir: distro_dir.as_ref().to_path_buf(),
|
||||
distro_id,
|
||||
conn,
|
||||
pkg_queue: (tx, Mutex::new(rx)),
|
||||
repos: RwLock::new(repos),
|
||||
})
|
||||
}
|
||||
|
||||
/// Generate archive databases for all known architectures in the repository, including the
|
||||
/// "any" architecture.
|
||||
pub async fn sync_repo(&self, repo: i32) -> crate::Result<()> {
|
||||
let lock = self
|
||||
.repos
|
||||
.read()
|
||||
.await
|
||||
.get(&repo)
|
||||
.map(|(_, lock)| Arc::clone(lock));
|
||||
pub async fn generate_archives_all(&self, repo: &str) -> Result<()> {
|
||||
let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
|
||||
|
||||
if lock.is_none() {
|
||||
if repo.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let lock = lock.unwrap();
|
||||
let _guard = lock.lock().await;
|
||||
let repo = repo.unwrap();
|
||||
|
||||
let archs: Vec<String> = db::Package::find()
|
||||
.filter(db::package::Column::RepoId.eq(repo))
|
||||
let mut archs = repo
|
||||
.find_related(crate::db::Package)
|
||||
.select_only()
|
||||
.column(db::package::Column::Arch)
|
||||
.column(crate::db::package::Column::Arch)
|
||||
.distinct()
|
||||
.into_tuple()
|
||||
.all(&self.conn)
|
||||
.into_tuple::<String>()
|
||||
.stream(&self.conn)
|
||||
.await?;
|
||||
|
||||
for arch in archs {
|
||||
self.generate_archives(repo, &arch).await?;
|
||||
while let Some(arch) = archs.next().await.transpose()? {
|
||||
self.generate_archives(&repo.name, &arch).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Generate the archive databases for the given repository and architecture.
|
||||
async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
|
||||
pub async fn generate_archives(&self, repo: &str, arch: &str) -> Result<()> {
|
||||
let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
|
||||
|
||||
if repo.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let repo = repo.unwrap();
|
||||
|
||||
let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
|
||||
self.random_file_paths();
|
||||
let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?;
|
||||
|
|
@ -109,15 +73,13 @@ impl RepoMgr {
|
|||
|
||||
// Query all packages in the repo that have the given architecture or the "any"
|
||||
// architecture
|
||||
let mut pkgs = db::query::package::pkgs_to_sync(&self.conn, repo, arch)
|
||||
let mut pkgs = repo
|
||||
.find_related(crate::db::Package)
|
||||
.filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
|
||||
.stream(&self.conn)
|
||||
.await?;
|
||||
|
||||
let mut commited_ids: Vec<i32> = Vec::new();
|
||||
|
||||
while let Some(pkg) = pkgs.next().await.transpose()? {
|
||||
commited_ids.push(pkg.id);
|
||||
|
||||
let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
|
||||
let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
|
||||
|
||||
|
|
@ -141,7 +103,7 @@ impl RepoMgr {
|
|||
ar_db.close().await?;
|
||||
ar_files.close().await?;
|
||||
|
||||
let repo_dir = self.repos_dir.join(repo.to_string());
|
||||
let repo_dir = self.distro_dir.join(&repo.name);
|
||||
|
||||
// Move the db archives to their respective places
|
||||
tokio::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch))).await?;
|
||||
|
|
@ -151,235 +113,176 @@ impl RepoMgr {
|
|||
)
|
||||
.await?;
|
||||
|
||||
// Only after we have successfully written everything to disk do we update the database.
|
||||
// This order ensures any failure can be recovered, as the database is our single source of
|
||||
// truth.
|
||||
db::Package::update_many()
|
||||
.col_expr(
|
||||
db::package::Column::State,
|
||||
Expr::value(db::PackageState::Committed),
|
||||
)
|
||||
.filter(db::package::Column::Id.is_in(commited_ids))
|
||||
.exec(&self.conn)
|
||||
.await?;
|
||||
|
||||
// If this fails there's no point in failing the function + if there were no packages in
|
||||
// the repo, this fails anyway because the temp file doesn't exist
|
||||
let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
|
||||
let _ = tokio::fs::remove_file(files_tmp_file_path).await;
|
||||
|
||||
tracing::info!("Package archives generated for repo {} ('{}')", repo, arch);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Clean any remaining old package files from the database and file system
|
||||
pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
|
||||
let mut pkgs = db::query::package::stale_pkgs(&self.conn)
|
||||
/// Remove the repo with the given name, if it existed
|
||||
pub async fn remove_repo(&self, repo: &str) -> Result<bool> {
|
||||
let res = db::query::repo::by_name(&self.conn, repo).await?;
|
||||
|
||||
if let Some(repo_entry) = res {
|
||||
// Remove repository from database
|
||||
repo_entry.delete(&self.conn).await?;
|
||||
|
||||
// Remove files from file system
|
||||
tokio::fs::remove_dir_all(self.distro_dir.join(repo)).await?;
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove all packages from the repository with the given arch.
|
||||
pub async fn remove_repo_arch(&self, repo: &str, arch: &str) -> Result<bool> {
|
||||
let repo = db::query::repo::by_name(&self.conn, repo).await?;
|
||||
|
||||
if let Some(repo) = repo {
|
||||
let mut pkgs = repo
|
||||
.find_related(db::Package)
|
||||
.filter(db::package::Column::Arch.eq(arch))
|
||||
.stream(&self.conn)
|
||||
.await?;
|
||||
|
||||
// Ids are monotonically increasing, so the max id suffices to know which packages to
|
||||
// remove later
|
||||
let mut max_id = -1;
|
||||
let mut removed_pkgs = 0;
|
||||
|
||||
while let Some(pkg) = pkgs.next().await.transpose()? {
|
||||
// Failing to remove the package file isn't the biggest problem
|
||||
let _ = tokio::fs::remove_file(
|
||||
self.repos_dir
|
||||
.join(pkg.repo_id.to_string())
|
||||
.join(pkg.id.to_string()),
|
||||
let path = self
|
||||
.distro_dir
|
||||
.join(&repo.name)
|
||||
.join(super::package::filename(&pkg));
|
||||
tokio::fs::remove_file(path).await?;
|
||||
|
||||
pkg.delete(&self.conn).await?;
|
||||
}
|
||||
|
||||
tokio::fs::remove_file(
|
||||
self.distro_dir
|
||||
.join(&repo.name)
|
||||
.join(format!("{}.db.tar.gz", arch)),
|
||||
)
|
||||
.await;
|
||||
|
||||
if pkg.id > max_id {
|
||||
max_id = pkg.id;
|
||||
}
|
||||
|
||||
removed_pkgs += 1;
|
||||
}
|
||||
|
||||
if removed_pkgs > 0 {
|
||||
db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
|
||||
}
|
||||
|
||||
tracing::info!("Removed {removed_pkgs} stale package(s)");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn pkg_parse_task(&self) {
|
||||
loop {
|
||||
// Receive the next message and immediately drop the mutex afterwards. As long as the
|
||||
// quue is empty, this will lock the mutex. This is okay, as the mutex will be unlocked
|
||||
// as soon as a message is received, so another worker can pick up the mutex.
|
||||
let msg = {
|
||||
let mut recv = self.pkg_queue.1.lock().await;
|
||||
recv.recv().await
|
||||
};
|
||||
|
||||
if let Some(msg) = msg {
|
||||
// TODO better handle this error (retry if failure wasn't because the package is
|
||||
// faulty)
|
||||
let _ = self
|
||||
.add_pkg_from_path(msg.path, msg.repo)
|
||||
.await
|
||||
.inspect_err(|e| tracing::error!("{:?}", e));
|
||||
|
||||
let old = self
|
||||
.repos
|
||||
.read()
|
||||
.await
|
||||
.get(&msg.repo)
|
||||
.map(|n| n.0.fetch_sub(1, Ordering::SeqCst));
|
||||
|
||||
// Every time the queue for a repo becomes empty, we run a sync job
|
||||
if old == Some(1) {
|
||||
// TODO error handling
|
||||
let _ = self.sync_repo(msg.repo).await;
|
||||
|
||||
// TODO move this so that we only clean if entire queue is empty, not just
|
||||
// queue for specific repo
|
||||
let _ = self.remove_stale_pkgs().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
|
||||
self.pkg_queue.0.send(PkgQueueMsg { path, repo }).unwrap();
|
||||
self.repos.read().await.get(&repo).inspect(|n| {
|
||||
n.0.fetch_add(1, Ordering::SeqCst);
|
||||
});
|
||||
}
|
||||
|
||||
pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
|
||||
Ok(db::Repo::find()
|
||||
.find_also_related(db::Distro)
|
||||
.filter(
|
||||
Condition::all()
|
||||
.add(db::repo::Column::Name.eq(repo))
|
||||
.add(db::distro::Column::Name.eq(distro)),
|
||||
.await?;
|
||||
tokio::fs::remove_file(
|
||||
self.distro_dir
|
||||
.join(&repo.name)
|
||||
.join(format!("{}.files.tar.gz", arch)),
|
||||
)
|
||||
.one(&self.conn)
|
||||
.await
|
||||
.map(|res| res.map(|(repo, _)| repo.id))?)
|
||||
}
|
||||
|
||||
pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
|
||||
let mut repos = self.repos.write().await;
|
||||
|
||||
let distro_id: Option<i32> = db::Distro::find()
|
||||
.filter(db::distro::Column::Name.eq(distro))
|
||||
.select_only()
|
||||
.column(db::distro::Column::Id)
|
||||
.into_tuple()
|
||||
.one(&self.conn)
|
||||
.await?;
|
||||
|
||||
let distro_id = if let Some(id) = distro_id {
|
||||
id
|
||||
} else {
|
||||
let new_distro = db::distro::ActiveModel {
|
||||
id: NotSet,
|
||||
name: Set(distro.to_string()),
|
||||
description: NotSet,
|
||||
};
|
||||
|
||||
new_distro.insert(&self.conn).await?.id
|
||||
};
|
||||
|
||||
let repo_id: Option<i32> = db::Repo::find()
|
||||
.filter(db::repo::Column::DistroId.eq(distro_id))
|
||||
.filter(db::repo::Column::Name.eq(repo))
|
||||
.select_only()
|
||||
.column(db::repo::Column::Id)
|
||||
.into_tuple()
|
||||
.one(&self.conn)
|
||||
.await?;
|
||||
|
||||
let repo_id = if let Some(id) = repo_id {
|
||||
id
|
||||
} else {
|
||||
let new_repo = db::repo::ActiveModel {
|
||||
id: NotSet,
|
||||
distro_id: Set(distro_id),
|
||||
name: Set(repo.to_string()),
|
||||
description: NotSet,
|
||||
};
|
||||
let id = new_repo.insert(&self.conn).await?.id;
|
||||
|
||||
tokio::fs::create_dir(self.repos_dir.join(id.to_string())).await?;
|
||||
repos.insert(id, Default::default());
|
||||
|
||||
id
|
||||
};
|
||||
|
||||
Ok(repo_id)
|
||||
// If we removed all "any" packages, we need to resync all databases
|
||||
if arch == ANY_ARCH {
|
||||
self.generate_archives_all(&repo.name).await?;
|
||||
}
|
||||
|
||||
async fn add_pkg_from_path<P: AsRef<Path>>(&self, path: P, repo: i32) -> crate::Result<()> {
|
||||
let path_clone = path.as_ref().to_path_buf();
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn remove_pkg(&self, repo: &str, arch: &str, name: &str) -> Result<bool> {
|
||||
let repo = db::query::repo::by_name(&self.conn, repo).await?;
|
||||
|
||||
if let Some(repo) = repo {
|
||||
let pkg =
|
||||
db::query::package::by_fields(&self.conn, repo.id, arch, name, None, None).await?;
|
||||
|
||||
if let Some(pkg) = pkg {
|
||||
// Remove package from database & file system
|
||||
tokio::fs::remove_file(
|
||||
self.distro_dir
|
||||
.join(&repo.name)
|
||||
.join(super::package::filename(&pkg)),
|
||||
)
|
||||
.await?;
|
||||
pkg.delete(&self.conn).await?;
|
||||
|
||||
if arch == ANY_ARCH {
|
||||
self.generate_archives_all(&repo.name).await?;
|
||||
} else {
|
||||
self.generate_archives(&repo.name, arch).await?;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn add_pkg_from_reader<R: AsyncRead + std::marker::Unpin>(
|
||||
&self,
|
||||
reader: &mut R,
|
||||
repo: &str,
|
||||
) -> crate::Result<(String, String, String)> {
|
||||
let [path] = self.random_file_paths();
|
||||
let mut temp_file = tokio::fs::File::create(&path).await?;
|
||||
|
||||
tokio::io::copy(reader, &mut temp_file).await?;
|
||||
|
||||
let path_clone = path.clone();
|
||||
let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
|
||||
.await
|
||||
.unwrap()?;
|
||||
|
||||
// TODO prevent database from being updated but file failing to move to repo dir?
|
||||
let pkg = db::query::package::insert(&self.conn, repo, pkg).await?;
|
||||
let repo_dir = self.distro_dir.join(repo);
|
||||
|
||||
let dest_path = self
|
||||
.repos_dir
|
||||
.join(repo.to_string())
|
||||
.join(pkg.id.to_string());
|
||||
tokio::fs::rename(path.as_ref(), dest_path).await?;
|
||||
let repo_id = if let Some(repo) = db::query::repo::by_name(&self.conn, &repo).await? {
|
||||
repo.id
|
||||
} else {
|
||||
tokio::fs::create_dir(&repo_dir).await?;
|
||||
|
||||
tracing::info!(
|
||||
"Added '{}-{}-{}' to repository {}",
|
||||
pkg.name,
|
||||
pkg.version,
|
||||
pkg.arch,
|
||||
repo,
|
||||
);
|
||||
db::query::repo::insert(&self.conn, self.distro_id, repo, None)
|
||||
.await?
|
||||
.id
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
|
||||
self.repos.write().await.remove(&repo);
|
||||
db::Repo::delete_by_id(repo).exec(&self.conn).await?;
|
||||
let _ = tokio::fs::remove_dir_all(self.repos_dir.join(repo.to_string())).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove all packages in the repository that have a given arch. This method marks all
|
||||
/// packages with the given architecture as "pending deletion", before performing a manual sync
|
||||
/// & removal of stale packages.
|
||||
pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
|
||||
db::Package::update_many()
|
||||
.col_expr(
|
||||
db::package::Column::State,
|
||||
Expr::value(db::PackageState::PendingDeletion),
|
||||
// If the package already exists in the database, we remove it first
|
||||
let res = db::query::package::by_fields(
|
||||
&self.conn,
|
||||
repo_id,
|
||||
&pkg.info.arch,
|
||||
&pkg.info.name,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.filter(
|
||||
Condition::all()
|
||||
.add(db::package::Column::RepoId.eq(repo))
|
||||
.add(db::package::Column::Arch.eq(arch)),
|
||||
)
|
||||
.exec(&self.conn)
|
||||
.await?;
|
||||
|
||||
self.sync_repo(repo).await?;
|
||||
self.remove_stale_pkgs().await?;
|
||||
|
||||
Ok(())
|
||||
if let Some(entry) = res {
|
||||
entry.delete(&self.conn).await?;
|
||||
}
|
||||
|
||||
let dest_pkg_path = repo_dir.join(pkg.file_name());
|
||||
|
||||
// Insert new package into database
|
||||
let name = pkg.info.name.clone();
|
||||
let version = pkg.info.version.clone();
|
||||
let arch = pkg.info.arch.clone();
|
||||
db::query::package::insert(&self.conn, repo_id, pkg).await?;
|
||||
|
||||
// Move the package to its final resting place
|
||||
tokio::fs::rename(path, dest_pkg_path).await?;
|
||||
|
||||
// Synchronize archive databases
|
||||
if arch == ANY_ARCH {
|
||||
self.generate_archives_all(repo).await?;
|
||||
} else {
|
||||
self.generate_archives(repo, &arch).await?;
|
||||
}
|
||||
|
||||
Ok((name, version, arch))
|
||||
}
|
||||
|
||||
/// Generate a path to a unique file that can be used as a temporary file
|
||||
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
|
||||
std::array::from_fn(|_| {
|
||||
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
|
||||
self.repos_dir.join(uuid.to_string())
|
||||
self.distro_dir.join(uuid.to_string())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,9 +2,7 @@ mod archive;
|
|||
mod manager;
|
||||
pub mod package;
|
||||
|
||||
pub use manager::RepoMgr;
|
||||
|
||||
use crate::FsConfig;
|
||||
pub use manager::DistroMgr;
|
||||
|
||||
use axum::{
|
||||
body::Body,
|
||||
|
|
@ -49,14 +47,15 @@ async fn get_file(
|
|||
Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>,
|
||||
req: Request<Body>,
|
||||
) -> crate::Result<impl IntoResponse> {
|
||||
if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? {
|
||||
match global.config.fs {
|
||||
FsConfig::Local { data_dir } => {
|
||||
let repo_dir = data_dir.join("repos").join(repo_id.to_string());
|
||||
let repo_dir = global
|
||||
.config
|
||||
.data_dir
|
||||
.join("distros")
|
||||
.join(&distro)
|
||||
.join(&repo);
|
||||
|
||||
let file_name = if file_name == format!("{}.db", repo)
|
||||
|| file_name == format!("{}.db.tar.gz", repo)
|
||||
{
|
||||
let file_name =
|
||||
if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) {
|
||||
format!("{}.db.tar.gz", arch)
|
||||
} else if file_name == format!("{}.files", repo)
|
||||
|| file_name == format!("{}.files.tar.gz", repo)
|
||||
|
|
@ -66,96 +65,87 @@ async fn get_file(
|
|||
file_name
|
||||
};
|
||||
|
||||
let path = repo_dir.join(file_name);
|
||||
Ok(ServeFile::new(path).oneshot(req).await)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Err(StatusCode::NOT_FOUND.into())
|
||||
}
|
||||
Ok(ServeFile::new(repo_dir.join(file_name)).oneshot(req).await)
|
||||
}
|
||||
|
||||
async fn post_package_archive(
|
||||
State(global): State<crate::Global>,
|
||||
Path((distro, repo)): Path<(String, String)>,
|
||||
body: Body,
|
||||
) -> crate::Result<StatusCode> {
|
||||
) -> crate::Result<()> {
|
||||
let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
|
||||
let repo = global.mgr.get_or_create_repo(&distro, &repo).await?;
|
||||
let [tmp_path] = global.mgr.random_file_paths();
|
||||
let mgr = global.mgr.get_or_create_mgr(&distro).await?;
|
||||
let (name, version, arch) = mgr.add_pkg_from_reader(&mut body, &repo).await?;
|
||||
|
||||
let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
|
||||
tokio::io::copy(&mut body, &mut tmp_file).await?;
|
||||
tracing::info!(
|
||||
"Added '{}-{}' to repository '{}' ({})",
|
||||
name,
|
||||
version,
|
||||
repo,
|
||||
arch
|
||||
);
|
||||
|
||||
global.mgr.queue_pkg(repo, tmp_path).await;
|
||||
|
||||
Ok(StatusCode::ACCEPTED)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_repo(
|
||||
State(global): State<crate::Global>,
|
||||
Path((distro, repo)): Path<(String, String)>,
|
||||
) -> crate::Result<StatusCode> {
|
||||
if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
|
||||
global.mgr.remove_repo(repo).await?;
|
||||
if let Some(mgr) = global.mgr.get_mgr(&distro).await {
|
||||
let repo_removed = mgr.remove_repo(&repo).await?;
|
||||
|
||||
tracing::info!("Removed repository {repo}");
|
||||
if repo_removed {
|
||||
tracing::info!("Removed repository '{}'", repo);
|
||||
|
||||
Ok(StatusCode::OK)
|
||||
} else {
|
||||
Ok(StatusCode::NOT_FOUND)
|
||||
}
|
||||
} else {
|
||||
Ok(StatusCode::NOT_FOUND)
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_arch_repo(
|
||||
State(global): State<crate::Global>,
|
||||
Path((distro, repo, arch)): Path<(String, String, String)>,
|
||||
) -> crate::Result<StatusCode> {
|
||||
if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
|
||||
global.mgr.remove_repo_arch(repo, &arch).await?;
|
||||
if let Some(mgr) = global.mgr.get_mgr(&distro).await {
|
||||
let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
|
||||
|
||||
tracing::info!("Removed architecture '{arch}' from repository {repo}");
|
||||
if repo_removed {
|
||||
tracing::info!("Removed arch '{}' from repository '{}'", arch, repo);
|
||||
|
||||
Ok(StatusCode::OK)
|
||||
} else {
|
||||
Ok(StatusCode::NOT_FOUND)
|
||||
}
|
||||
//if let Some(mgr) = global.mgr.get_mgr(&distro).await {
|
||||
// let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
|
||||
//
|
||||
// if repo_removed {
|
||||
// tracing::info!("Removed arch '{}' from repository '{}'", arch, repo);
|
||||
//
|
||||
// Ok(StatusCode::OK)
|
||||
// } else {
|
||||
// Ok(StatusCode::NOT_FOUND)
|
||||
// }
|
||||
//} else {
|
||||
// Ok(StatusCode::NOT_FOUND)
|
||||
//}
|
||||
} else {
|
||||
Ok(StatusCode::NOT_FOUND)
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_package(
|
||||
State(global): State<crate::Global>,
|
||||
Path((distro, repo, arch, pkg_name)): Path<(String, String, String, String)>,
|
||||
) -> crate::Result<StatusCode> {
|
||||
if let Some(mgr) = global.mgr.get_mgr(&distro).await {
|
||||
let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?;
|
||||
|
||||
if pkg_removed {
|
||||
tracing::info!(
|
||||
"Removed package '{}' ({}) from repository '{}'",
|
||||
pkg_name,
|
||||
arch,
|
||||
repo
|
||||
);
|
||||
|
||||
Ok(StatusCode::OK)
|
||||
} else {
|
||||
Ok(StatusCode::NOT_FOUND)
|
||||
//if let Some(mgr) = global.mgr.get_mgr(&distro).await {
|
||||
// let pkg_removed = mgr.remove_pkg(&repo, &arch, &pkg_name).await?;
|
||||
//
|
||||
// if pkg_removed {
|
||||
// tracing::info!(
|
||||
// "Removed package '{}' ({}) from repository '{}'",
|
||||
// pkg_name,
|
||||
// arch,
|
||||
// repo
|
||||
// );
|
||||
//
|
||||
// Ok(StatusCode::OK)
|
||||
// } else {
|
||||
// Ok(StatusCode::NOT_FOUND)
|
||||
// }
|
||||
//} else {
|
||||
// Ok(StatusCode::NOT_FOUND)
|
||||
//}
|
||||
}
|
||||
} else {
|
||||
Ok(StatusCode::NOT_FOUND)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -323,7 +323,7 @@ pub async fn write_desc<W: AsyncWrite + std::marker::Unpin>(
|
|||
pkg: &package::Model,
|
||||
) -> crate::Result<()> {
|
||||
writer
|
||||
.write_all(format!("%FILENAME%\n{}\n", pkg.id).as_bytes())
|
||||
.write_all(format!("%FILENAME%\n{}\n", filename(pkg)).as_bytes())
|
||||
.await?;
|
||||
|
||||
write_attribute(writer, "NAME", &pkg.name).await?;
|
||||
|
|
@ -397,8 +397,6 @@ pub async fn write_desc<W: AsyncWrite + std::marker::Unpin>(
|
|||
write_attribute(writer, key, &items.join("\n")).await?;
|
||||
}
|
||||
|
||||
writer.flush().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -419,7 +417,5 @@ pub async fn write_files<W: AsyncWrite + std::marker::Unpin>(
|
|||
.await?;
|
||||
}
|
||||
|
||||
writer.flush().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue