Compare commits
2 Commits
27afb3496d
...
97e42588ed
Author | SHA1 | Date |
---|---|---|
Jef Roosens | 97e42588ed | |
Jef Roosens | e17269ac3b |
|
@ -1,11 +1,17 @@
|
||||||
api_key = "test"
|
api_key = "test"
|
||||||
port = 8000
|
pkg_workers = 2
|
||||||
log_level = "tower_http=debug,rieterd=debug"
|
log_level = "rieterd=debug"
|
||||||
|
|
||||||
[fs]
|
[fs]
|
||||||
type = "locl"
|
type = "local"
|
||||||
data_dir = "./data"
|
data_dir = "./data"
|
||||||
|
|
||||||
[db]
|
[db]
|
||||||
type = "sqlite"
|
type = "sqlite"
|
||||||
db_dir = "./data"
|
db_dir = "./data"
|
||||||
|
# [db]
|
||||||
|
# type = "postgres"
|
||||||
|
# host = "localhost"
|
||||||
|
# db = "rieter"
|
||||||
|
# user = "rieter"
|
||||||
|
# password = "rieter"
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use crate::{distro::MetaDistroMgr, Config, Global};
|
use crate::{distro::MetaDistroMgr, Config, FsConfig, Global};
|
||||||
|
|
||||||
use std::{io, path::PathBuf, sync::Arc};
|
use std::{io, path::PathBuf, sync::Arc};
|
||||||
|
|
||||||
|
@ -12,13 +12,6 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||||
#[derive(Parser)]
|
#[derive(Parser)]
|
||||||
#[command(author, version, about, long_about = None)]
|
#[command(author, version, about, long_about = None)]
|
||||||
pub struct Cli {
|
pub struct Cli {
|
||||||
/// Directory where repository metadata & SQLite database is stored
|
|
||||||
#[arg(env = "RIETER_DATA_DIR")]
|
|
||||||
pub data_dir: PathBuf,
|
|
||||||
/// API key to authenticate private routes with
|
|
||||||
#[arg(env = "RIETER_API_KEY")]
|
|
||||||
pub api_key: String,
|
|
||||||
|
|
||||||
#[arg(
|
#[arg(
|
||||||
short,
|
short,
|
||||||
long,
|
long,
|
||||||
|
@ -26,89 +19,54 @@ pub struct Cli {
|
||||||
default_value = "./rieterd.toml"
|
default_value = "./rieterd.toml"
|
||||||
)]
|
)]
|
||||||
pub config_file: PathBuf,
|
pub config_file: PathBuf,
|
||||||
|
|
||||||
/// Database connection URL; either sqlite:// or postgres://. Defaults to rieter.sqlite in the
|
|
||||||
/// data directory
|
|
||||||
#[arg(short, long, env = "RIETER_DATABASE_URL")]
|
|
||||||
pub database_url: Option<String>,
|
|
||||||
/// Port the server will listen on
|
|
||||||
#[arg(
|
|
||||||
short,
|
|
||||||
long,
|
|
||||||
value_name = "PORT",
|
|
||||||
default_value_t = 8000,
|
|
||||||
env = "RIETER_PORT"
|
|
||||||
)]
|
|
||||||
pub port: u16,
|
|
||||||
/// Log levels for the tracing
|
|
||||||
#[arg(
|
|
||||||
long,
|
|
||||||
value_name = "LOG_LEVEL",
|
|
||||||
default_value = "tower_http=debug,rieterd=debug,sea_orm=debug",
|
|
||||||
env = "RIETER_LOG"
|
|
||||||
)]
|
|
||||||
pub log: String,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Cli {
|
impl Cli {
|
||||||
pub fn init_tracing(&self) {
|
pub async fn run(&self) -> crate::Result<()> {
|
||||||
|
let config: Config = Config::figment(&self.config_file)
|
||||||
|
.extract()
|
||||||
|
.inspect_err(|e| tracing::error!("{}", e))?;
|
||||||
|
|
||||||
tracing_subscriber::registry()
|
tracing_subscriber::registry()
|
||||||
.with(tracing_subscriber::EnvFilter::new(self.log.clone()))
|
.with(tracing_subscriber::EnvFilter::new(config.log_level.clone()))
|
||||||
.with(tracing_subscriber::fmt::layer())
|
.with(tracing_subscriber::fmt::layer())
|
||||||
.init();
|
.init();
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run(&self) -> crate::Result<()> {
|
tracing::info!("Connecting to database");
|
||||||
self.init_tracing();
|
let db = crate::db::connect(&config.db).await?;
|
||||||
|
|
||||||
//tracing::debug!("{:?}", &self.config_file);
|
|
||||||
//let new_config: crate::config::Config = crate::config::Config::figment(&self.config_file).extract().inspect_err(
|
|
||||||
// |e| tracing::error!("{}", e)
|
|
||||||
//)?;
|
|
||||||
//tracing::debug!("{:?}", new_config);
|
|
||||||
|
|
||||||
let db_url = if let Some(url) = &self.database_url {
|
|
||||||
url.clone()
|
|
||||||
} else {
|
|
||||||
format!(
|
|
||||||
"sqlite://{}?mode=rwc",
|
|
||||||
self.data_dir.join("rieter.sqlite").to_string_lossy()
|
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
debug!("Connecting to database with URL {}", db_url);
|
|
||||||
|
|
||||||
let mut options = sea_orm::ConnectOptions::new(db_url);
|
|
||||||
options.max_connections(16);
|
|
||||||
|
|
||||||
let db = sea_orm::Database::connect(options).await?;
|
|
||||||
crate::db::Migrator::up(&db, None).await?;
|
crate::db::Migrator::up(&db, None).await?;
|
||||||
|
|
||||||
debug!("Successfully applied migrations");
|
let mgr = match &config.fs {
|
||||||
|
FsConfig::Local { data_dir } => {
|
||||||
let config = Config {
|
crate::repo::RepoMgr::new(data_dir.join("repos"), db.clone()).await?
|
||||||
data_dir: self.data_dir.clone(),
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let mgr =
|
let mgr = Arc::new(mgr);
|
||||||
Arc::new(crate::repo::RepoMgr::new(&self.data_dir.join("repos"), db.clone()).await?);
|
|
||||||
|
|
||||||
for _ in 0..1 {
|
for _ in 0..config.pkg_workers {
|
||||||
let clone = Arc::clone(&mgr);
|
let clone = Arc::clone(&mgr);
|
||||||
|
|
||||||
tokio::spawn(async move { clone.pkg_parse_task().await });
|
tokio::spawn(async move { clone.pkg_parse_task().await });
|
||||||
}
|
}
|
||||||
|
|
||||||
let global = Global { config, mgr, db };
|
let global = Global {
|
||||||
|
config: config.clone(),
|
||||||
|
mgr,
|
||||||
|
db,
|
||||||
|
};
|
||||||
|
|
||||||
// build our application with a single route
|
// build our application with a single route
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
.nest("/api", crate::api::router())
|
.nest("/api", crate::api::router())
|
||||||
.merge(crate::repo::router(&self.api_key))
|
.merge(crate::repo::router(&config.api_key))
|
||||||
.with_state(global)
|
.with_state(global)
|
||||||
.layer(TraceLayer::new_for_http());
|
.layer(TraceLayer::new_for_http());
|
||||||
|
|
||||||
let domain: String = format!("0.0.0.0:{}", self.port).parse().unwrap();
|
let domain: String = format!("{}:{}", config.domain, config.port)
|
||||||
|
.parse()
|
||||||
|
.unwrap();
|
||||||
let listener = tokio::net::TcpListener::bind(domain).await?;
|
let listener = tokio::net::TcpListener::bind(domain).await?;
|
||||||
// run it with hyper on localhost:3000
|
// run it with hyper on localhost:3000
|
||||||
Ok(axum::serve(listener, app.into_make_service())
|
Ok(axum::serve(listener, app.into_make_service())
|
||||||
|
|
|
@ -6,34 +6,49 @@ use figment::{
|
||||||
};
|
};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
|
||||||
#[derive(Deserialize, Debug)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
#[serde(rename_all = "lowercase")]
|
#[serde(rename_all = "lowercase")]
|
||||||
#[serde(tag = "type")]
|
#[serde(tag = "type")]
|
||||||
pub enum FsConfig {
|
pub enum FsConfig {
|
||||||
Local { data_dir: PathBuf },
|
Local { data_dir: PathBuf },
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Debug)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
#[serde(rename_all = "lowercase")]
|
#[serde(rename_all = "lowercase")]
|
||||||
#[serde(tag = "type")]
|
#[serde(tag = "type")]
|
||||||
pub enum DbConfig {
|
pub enum DbConfig {
|
||||||
Sqlite {
|
Sqlite {
|
||||||
db_dir: PathBuf,
|
db_dir: PathBuf,
|
||||||
|
#[serde(default = "default_db_sqlite_max_connections")]
|
||||||
|
max_connections: u32,
|
||||||
},
|
},
|
||||||
Postgres {
|
Postgres {
|
||||||
host: String,
|
host: String,
|
||||||
|
#[serde(default = "default_db_postgres_port")]
|
||||||
|
port: u16,
|
||||||
user: String,
|
user: String,
|
||||||
password: String,
|
password: String,
|
||||||
|
db: String,
|
||||||
|
#[serde(default)]
|
||||||
|
schema: String,
|
||||||
|
#[serde(default = "default_db_postgres_max_connections")]
|
||||||
|
max_connections: u32,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Debug)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
api_key: String,
|
pub api_key: String,
|
||||||
port: u16,
|
#[serde(default = "default_domain")]
|
||||||
log_level: String,
|
pub domain: String,
|
||||||
fs: FsConfig,
|
#[serde(default = "default_port")]
|
||||||
db: DbConfig,
|
pub port: u16,
|
||||||
|
#[serde(default = "default_log_level")]
|
||||||
|
pub log_level: String,
|
||||||
|
pub fs: FsConfig,
|
||||||
|
pub db: DbConfig,
|
||||||
|
#[serde(default = "default_pkg_workers")]
|
||||||
|
pub pkg_workers: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
|
@ -43,3 +58,31 @@ impl Config {
|
||||||
.merge(Env::prefixed("RIETER_"))
|
.merge(Env::prefixed("RIETER_"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn default_domain() -> String {
|
||||||
|
String::from("0.0.0.0")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_port() -> u16 {
|
||||||
|
8000
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_log_level() -> String {
|
||||||
|
String::from("tower_http=debug,rieterd=debug,sea_orm=debug")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_db_sqlite_max_connections() -> u32 {
|
||||||
|
16
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_db_postgres_port() -> u16 {
|
||||||
|
5432
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_db_postgres_max_connections() -> u32 {
|
||||||
|
16
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_pkg_workers() -> u32 {
|
||||||
|
1
|
||||||
|
}
|
||||||
|
|
|
@ -2,10 +2,12 @@ pub mod entities;
|
||||||
mod migrator;
|
mod migrator;
|
||||||
pub mod query;
|
pub mod query;
|
||||||
|
|
||||||
|
use crate::config::DbConfig;
|
||||||
|
|
||||||
pub use entities::{prelude::*, *};
|
pub use entities::{prelude::*, *};
|
||||||
pub use migrator::Migrator;
|
pub use migrator::Migrator;
|
||||||
|
|
||||||
use sea_orm::{DeriveActiveEnum, EnumIter};
|
use sea_orm::{ConnectionTrait, Database, DbConn, DeriveActiveEnum, EnumIter};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, sea_orm::DbErr>;
|
type Result<T> = std::result::Result<T, sea_orm::DbErr>;
|
||||||
|
@ -50,3 +52,50 @@ pub struct FullPackage {
|
||||||
related: Vec<(PackageRelatedEnum, String)>,
|
related: Vec<(PackageRelatedEnum, String)>,
|
||||||
files: Vec<String>,
|
files: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn connect(conn: &DbConfig) -> crate::Result<DbConn> {
|
||||||
|
match conn {
|
||||||
|
DbConfig::Sqlite {
|
||||||
|
db_dir,
|
||||||
|
max_connections,
|
||||||
|
} => {
|
||||||
|
let url = format!(
|
||||||
|
"sqlite://{}?mode=rwc",
|
||||||
|
db_dir.join("rieter.sqlite").to_string_lossy()
|
||||||
|
);
|
||||||
|
let options = sea_orm::ConnectOptions::new(url)
|
||||||
|
.max_connections(*max_connections)
|
||||||
|
.to_owned();
|
||||||
|
|
||||||
|
let conn = Database::connect(options).await?;
|
||||||
|
|
||||||
|
// synchronous=NORMAL still ensures database consistency with WAL mode, as per the docs
|
||||||
|
// https://www.sqlite.org/pragma.html#pragma_synchronous
|
||||||
|
conn.execute_unprepared("PRAGMA journal_mode=WAL;").await?;
|
||||||
|
conn.execute_unprepared("PRAGMA synchronous=NORMAL;")
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(conn)
|
||||||
|
}
|
||||||
|
DbConfig::Postgres {
|
||||||
|
host,
|
||||||
|
port,
|
||||||
|
db,
|
||||||
|
user,
|
||||||
|
password,
|
||||||
|
schema,
|
||||||
|
max_connections,
|
||||||
|
} => {
|
||||||
|
let mut url = format!("postgres://{}:{}@{}:{}/{}", user, password, host, port, db);
|
||||||
|
|
||||||
|
if schema != "" {
|
||||||
|
url = format!("{url}?currentSchema={schema}");
|
||||||
|
}
|
||||||
|
|
||||||
|
let options = sea_orm::ConnectOptions::new(url)
|
||||||
|
.max_connections(*max_connections)
|
||||||
|
.to_owned();
|
||||||
|
Ok(Database::connect(options).await?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@ use crate::db::{self, *};
|
||||||
|
|
||||||
use futures::Stream;
|
use futures::Stream;
|
||||||
use sea_orm::{sea_query::IntoCondition, *};
|
use sea_orm::{sea_query::IntoCondition, *};
|
||||||
use sea_query::{Alias, Asterisk, Expr, Query, SelectStatement};
|
use sea_query::{Alias, Asterisk, Expr, IntoColumnRef, Query, SelectStatement};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
|
@ -222,8 +222,8 @@ pub struct PkgToRemove {
|
||||||
pub id: i32,
|
pub id: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn max_pkg_ids_query() -> SelectStatement {
|
fn max_pkg_ids_query(committed: bool) -> SelectStatement {
|
||||||
Query::select()
|
let mut query = Query::select()
|
||||||
.from(db::package::Entity)
|
.from(db::package::Entity)
|
||||||
.columns([
|
.columns([
|
||||||
db::package::Column::RepoId,
|
db::package::Column::RepoId,
|
||||||
|
@ -236,39 +236,29 @@ fn max_pkg_ids_query() -> SelectStatement {
|
||||||
db::package::Column::Arch,
|
db::package::Column::Arch,
|
||||||
db::package::Column::Name,
|
db::package::Column::Name,
|
||||||
])
|
])
|
||||||
.cond_where(
|
|
||||||
Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)),
|
|
||||||
)
|
|
||||||
.to_owned()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn pkgs_to_sync(
|
|
||||||
conn: &DbConn,
|
|
||||||
repo: i32,
|
|
||||||
arch: &str,
|
|
||||||
) -> SelectorRaw<SelectModel<package::Model>> {
|
|
||||||
let max_id_query = Query::select()
|
|
||||||
.columns([
|
|
||||||
db::package::Column::RepoId,
|
|
||||||
db::package::Column::Arch,
|
|
||||||
db::package::Column::Name,
|
|
||||||
])
|
|
||||||
.expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
|
|
||||||
.from(db::package::Entity)
|
|
||||||
.group_by_columns([
|
|
||||||
db::package::Column::RepoId,
|
|
||||||
db::package::Column::Arch,
|
|
||||||
db::package::Column::Name,
|
|
||||||
])
|
|
||||||
.to_owned();
|
.to_owned();
|
||||||
|
|
||||||
|
if committed {
|
||||||
|
query.cond_where(db::package::Column::State.eq(db::PackageState::Committed));
|
||||||
|
}
|
||||||
|
|
||||||
|
query
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Query that returns all packages that should be included in a sync for the given repository and
|
||||||
|
/// arch.
|
||||||
|
pub fn pkgs_to_sync(
|
||||||
|
conn: &DbConn,
|
||||||
|
repo: i32,
|
||||||
|
arch: &str,
|
||||||
|
) -> SelectorRaw<SelectModel<package::Model>> {
|
||||||
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
|
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
|
||||||
let query = Query::select()
|
let query = Query::select()
|
||||||
.column((p1.clone(), Asterisk))
|
.columns(db::package::Column::iter().map(|c| (p1.clone(), c)))
|
||||||
.from_as(db::package::Entity, p1.clone())
|
.from_as(db::package::Entity, p1.clone())
|
||||||
.join_subquery(
|
.join_subquery(
|
||||||
JoinType::InnerJoin,
|
JoinType::InnerJoin,
|
||||||
max_id_query,
|
max_pkg_ids_query(false),
|
||||||
p2.clone(),
|
p2.clone(),
|
||||||
Expr::col((p1.clone(), db::package::Column::Id))
|
Expr::col((p1.clone(), db::package::Column::Id))
|
||||||
.eq(Expr::col((p2.clone(), Alias::new("max_id")))),
|
.eq(Expr::col((p2.clone(), Alias::new("max_id")))),
|
||||||
|
@ -276,13 +266,13 @@ pub fn pkgs_to_sync(
|
||||||
.cond_where(
|
.cond_where(
|
||||||
Condition::all()
|
Condition::all()
|
||||||
.add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo))
|
.add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo))
|
||||||
.add(
|
|
||||||
Expr::col((p1.clone(), db::package::Column::State))
|
|
||||||
.ne(db::PackageState::PendingDeletion),
|
|
||||||
)
|
|
||||||
.add(
|
.add(
|
||||||
Expr::col((p1.clone(), db::package::Column::Arch))
|
Expr::col((p1.clone(), db::package::Column::Arch))
|
||||||
.is_in([arch, crate::ANY_ARCH]),
|
.is_in([arch, crate::ANY_ARCH]),
|
||||||
|
)
|
||||||
|
.add(
|
||||||
|
Expr::col((p1.clone(), db::package::Column::State))
|
||||||
|
.ne(db::PackageState::PendingDeletion),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.to_owned();
|
.to_owned();
|
||||||
|
@ -293,36 +283,10 @@ pub fn pkgs_to_sync(
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
|
fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
|
||||||
// In each repository, only one version of a package can exist for any given arch. Because ids
|
|
||||||
// are monotonically increasing, we know that the row that represents the actual package
|
|
||||||
// currently in the repository is the row with the largest id whose state is "committed". This
|
|
||||||
// query finds this id for each (repo, arch, name) tuple.
|
|
||||||
let mut max_id_query = Query::select();
|
|
||||||
max_id_query
|
|
||||||
.from(db::package::Entity)
|
|
||||||
.columns([
|
|
||||||
db::package::Column::RepoId,
|
|
||||||
db::package::Column::Arch,
|
|
||||||
db::package::Column::Name,
|
|
||||||
])
|
|
||||||
.expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
|
|
||||||
.group_by_columns([
|
|
||||||
db::package::Column::RepoId,
|
|
||||||
db::package::Column::Arch,
|
|
||||||
db::package::Column::Name,
|
|
||||||
])
|
|
||||||
.cond_where(
|
|
||||||
Condition::all().add(db::package::Column::State.eq(db::PackageState::Committed)),
|
|
||||||
);
|
|
||||||
|
|
||||||
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
|
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
|
||||||
let mut query = Query::select();
|
let mut query = Query::select()
|
||||||
|
.from_as(db::package::Entity, p1.clone())
|
||||||
// We then perform an inner join between the max id query above and the package table, where we
|
.to_owned();
|
||||||
// filter on rows whose id is less than their respective package's max id or whose state is set
|
|
||||||
// to "pending deletion". This gives us all rows in the database that correspond to packages
|
|
||||||
// that are no longer needed, and can thus be removed.
|
|
||||||
query.from_as(db::package::Entity, p1.clone());
|
|
||||||
|
|
||||||
if include_repo {
|
if include_repo {
|
||||||
query.columns([
|
query.columns([
|
||||||
|
@ -333,10 +297,13 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
|
||||||
query.column((p1.clone(), db::package::Column::Id));
|
query.column((p1.clone(), db::package::Column::Id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We left join on the max pkgs query because a repository that has all its packages set to
|
||||||
|
// "pending deletion" doesn't show up in the query. These are also included with a where clause
|
||||||
|
// on the joined rows.
|
||||||
query
|
query
|
||||||
.join_subquery(
|
.join_subquery(
|
||||||
JoinType::InnerJoin,
|
JoinType::LeftJoin,
|
||||||
max_id_query,
|
max_pkg_ids_query(true),
|
||||||
p2.clone(),
|
p2.clone(),
|
||||||
Condition::all()
|
Condition::all()
|
||||||
.add(
|
.add(
|
||||||
|
@ -359,11 +326,12 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
|
||||||
.lt(Expr::col((p2.clone(), Alias::new("max_id")))),
|
.lt(Expr::col((p2.clone(), Alias::new("max_id")))),
|
||||||
)
|
)
|
||||||
.add(
|
.add(
|
||||||
Expr::col((p1.clone(), db::package::Column::Id))
|
Expr::col((p1.clone(), db::package::Column::State))
|
||||||
.eq(db::PackageState::PendingDeletion),
|
.eq(db::PackageState::PendingDeletion),
|
||||||
),
|
),
|
||||||
)
|
);
|
||||||
.to_owned()
|
|
||||||
|
query
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
|
pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
|
||||||
|
|
|
@ -6,6 +6,7 @@ mod distro;
|
||||||
mod error;
|
mod error;
|
||||||
mod repo;
|
mod repo;
|
||||||
|
|
||||||
|
pub use config::{Config, DbConfig, FsConfig};
|
||||||
pub use error::{Result, ServerError};
|
pub use error::{Result, ServerError};
|
||||||
use repo::DistroMgr;
|
use repo::DistroMgr;
|
||||||
|
|
||||||
|
@ -14,14 +15,9 @@ use std::{path::PathBuf, sync::Arc};
|
||||||
|
|
||||||
pub const ANY_ARCH: &'static str = "any";
|
pub const ANY_ARCH: &'static str = "any";
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Config {
|
|
||||||
data_dir: PathBuf,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Global {
|
pub struct Global {
|
||||||
config: Config,
|
config: crate::config::Config,
|
||||||
mgr: Arc<repo::RepoMgr>,
|
mgr: Arc<repo::RepoMgr>,
|
||||||
db: sea_orm::DbConn,
|
db: sea_orm::DbConn,
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,42 +100,6 @@ impl RepoMgr {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Clean any remaining old package files from the database and file system
|
|
||||||
pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
|
|
||||||
let mut pkgs = db::query::package::stale_pkgs(&self.conn)
|
|
||||||
.stream(&self.conn)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// Ids are monotonically increasing, so the max id suffices to know which packages to
|
|
||||||
// remove later
|
|
||||||
let mut max_id = -1;
|
|
||||||
let mut removed_pkgs = 0;
|
|
||||||
|
|
||||||
while let Some(pkg) = pkgs.next().await.transpose()? {
|
|
||||||
// Failing to remove the package file isn't the biggest problem
|
|
||||||
let _ = tokio::fs::remove_file(
|
|
||||||
self.repos_dir
|
|
||||||
.join(pkg.repo_id.to_string())
|
|
||||||
.join(pkg.id.to_string()),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
if pkg.id > max_id {
|
|
||||||
max_id = pkg.id;
|
|
||||||
}
|
|
||||||
|
|
||||||
removed_pkgs += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if removed_pkgs > 0 {
|
|
||||||
db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
tracing::info!("Removed {removed_pkgs} stale package(s)");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Generate the archive databases for the given repository and architecture.
|
/// Generate the archive databases for the given repository and architecture.
|
||||||
async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
|
async fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
|
||||||
let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
|
let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
|
||||||
|
@ -209,6 +173,42 @@ impl RepoMgr {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Clean any remaining old package files from the database and file system
|
||||||
|
pub async fn remove_stale_pkgs(&self) -> crate::Result<()> {
|
||||||
|
let mut pkgs = db::query::package::stale_pkgs(&self.conn)
|
||||||
|
.stream(&self.conn)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Ids are monotonically increasing, so the max id suffices to know which packages to
|
||||||
|
// remove later
|
||||||
|
let mut max_id = -1;
|
||||||
|
let mut removed_pkgs = 0;
|
||||||
|
|
||||||
|
while let Some(pkg) = pkgs.next().await.transpose()? {
|
||||||
|
// Failing to remove the package file isn't the biggest problem
|
||||||
|
let _ = tokio::fs::remove_file(
|
||||||
|
self.repos_dir
|
||||||
|
.join(pkg.repo_id.to_string())
|
||||||
|
.join(pkg.id.to_string()),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
if pkg.id > max_id {
|
||||||
|
max_id = pkg.id;
|
||||||
|
}
|
||||||
|
|
||||||
|
removed_pkgs += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if removed_pkgs > 0 {
|
||||||
|
db::query::package::delete_stale_pkgs(&self.conn, max_id).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!("Removed {removed_pkgs} stale package(s)");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn pkg_parse_task(&self) {
|
pub async fn pkg_parse_task(&self) {
|
||||||
loop {
|
loop {
|
||||||
// Receive the next message and immediately drop the mutex afterwards. As long as the
|
// Receive the next message and immediately drop the mutex afterwards. As long as the
|
||||||
|
@ -248,7 +248,7 @@ impl RepoMgr {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
|
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
|
||||||
let _ = self.pkg_queue.0.send(PkgQueueMsg { path, repo });
|
self.pkg_queue.0.send(PkgQueueMsg { path, repo }).unwrap();
|
||||||
self.repos.read().await.get(&repo).inspect(|n| {
|
self.repos.read().await.get(&repo).inspect(|n| {
|
||||||
n.0.fetch_add(1, Ordering::SeqCst);
|
n.0.fetch_add(1, Ordering::SeqCst);
|
||||||
});
|
});
|
||||||
|
@ -291,6 +291,7 @@ impl RepoMgr {
|
||||||
};
|
};
|
||||||
|
|
||||||
let repo_id: Option<i32> = db::Repo::find()
|
let repo_id: Option<i32> = db::Repo::find()
|
||||||
|
.filter(db::repo::Column::DistroId.eq(distro_id))
|
||||||
.filter(db::repo::Column::Name.eq(repo))
|
.filter(db::repo::Column::Name.eq(repo))
|
||||||
.select_only()
|
.select_only()
|
||||||
.column(db::repo::Column::Id)
|
.column(db::repo::Column::Id)
|
||||||
|
|
|
@ -6,6 +6,8 @@ pub mod package;
|
||||||
pub use manager::DistroMgr;
|
pub use manager::DistroMgr;
|
||||||
pub use manager2::RepoMgr;
|
pub use manager2::RepoMgr;
|
||||||
|
|
||||||
|
use crate::FsConfig;
|
||||||
|
|
||||||
use axum::{
|
use axum::{
|
||||||
body::Body,
|
body::Body,
|
||||||
extract::{Path, State},
|
extract::{Path, State},
|
||||||
|
@ -50,14 +52,13 @@ async fn get_file(
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
) -> crate::Result<impl IntoResponse> {
|
) -> crate::Result<impl IntoResponse> {
|
||||||
if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? {
|
if let Some(repo_id) = global.mgr.get_repo(&distro, &repo).await? {
|
||||||
let repo_dir = global
|
match global.config.fs {
|
||||||
.config
|
FsConfig::Local { data_dir } => {
|
||||||
.data_dir
|
let repo_dir = data_dir.join("repos").join(repo_id.to_string());
|
||||||
.join("repos")
|
|
||||||
.join(repo_id.to_string());
|
|
||||||
|
|
||||||
let file_name =
|
let file_name = if file_name == format!("{}.db", repo)
|
||||||
if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) {
|
|| file_name == format!("{}.db.tar.gz", repo)
|
||||||
|
{
|
||||||
format!("{}.db.tar.gz", arch)
|
format!("{}.db.tar.gz", arch)
|
||||||
} else if file_name == format!("{}.files", repo)
|
} else if file_name == format!("{}.files", repo)
|
||||||
|| file_name == format!("{}.files.tar.gz", repo)
|
|| file_name == format!("{}.files.tar.gz", repo)
|
||||||
|
@ -69,6 +70,8 @@ async fn get_file(
|
||||||
|
|
||||||
let path = repo_dir.join(file_name);
|
let path = repo_dir.join(file_name);
|
||||||
Ok(ServeFile::new(path).oneshot(req).await)
|
Ok(ServeFile::new(path).oneshot(req).await)
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
Err(StatusCode::NOT_FOUND.into())
|
Err(StatusCode::NOT_FOUND.into())
|
||||||
}
|
}
|
||||||
|
@ -78,7 +81,7 @@ async fn post_package_archive(
|
||||||
State(global): State<crate::Global>,
|
State(global): State<crate::Global>,
|
||||||
Path((distro, repo)): Path<(String, String)>,
|
Path((distro, repo)): Path<(String, String)>,
|
||||||
body: Body,
|
body: Body,
|
||||||
) -> crate::Result<()> {
|
) -> crate::Result<StatusCode> {
|
||||||
let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
|
let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
|
||||||
let repo = global.mgr.get_or_create_repo(&distro, &repo).await?;
|
let repo = global.mgr.get_or_create_repo(&distro, &repo).await?;
|
||||||
let [tmp_path] = global.mgr.random_file_paths();
|
let [tmp_path] = global.mgr.random_file_paths();
|
||||||
|
@ -88,7 +91,7 @@ async fn post_package_archive(
|
||||||
|
|
||||||
global.mgr.queue_pkg(repo, tmp_path).await;
|
global.mgr.queue_pkg(repo, tmp_path).await;
|
||||||
|
|
||||||
Ok(())
|
Ok(StatusCode::ACCEPTED)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_repo(
|
async fn delete_repo(
|
||||||
|
@ -110,7 +113,15 @@ async fn delete_arch_repo(
|
||||||
State(global): State<crate::Global>,
|
State(global): State<crate::Global>,
|
||||||
Path((distro, repo, arch)): Path<(String, String, String)>,
|
Path((distro, repo, arch)): Path<(String, String, String)>,
|
||||||
) -> crate::Result<StatusCode> {
|
) -> crate::Result<StatusCode> {
|
||||||
|
if let Some(repo) = global.mgr.get_repo(&distro, &repo).await? {
|
||||||
|
global.mgr.remove_repo_arch(repo, &arch).await?;
|
||||||
|
|
||||||
|
tracing::info!("Removed architecture '{arch}' from repository {repo}");
|
||||||
|
|
||||||
|
Ok(StatusCode::OK)
|
||||||
|
} else {
|
||||||
Ok(StatusCode::NOT_FOUND)
|
Ok(StatusCode::NOT_FOUND)
|
||||||
|
}
|
||||||
//if let Some(mgr) = global.mgr.get_mgr(&distro).await {
|
//if let Some(mgr) = global.mgr.get_mgr(&distro).await {
|
||||||
// let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
|
// let repo_removed = mgr.remove_repo_arch(&repo, &arch).await?;
|
||||||
//
|
//
|
||||||
|
|
Loading…
Reference in New Issue