Compare commits

..

8 Commits

Author SHA1 Message Date
Jef Roosens 64d9df2e18
feat(repo): implement file downloads
ci/woodpecker/push/build Pipeline failed Details
ci/woodpecker/push/lint Pipeline failed Details
2024-07-21 22:42:06 +02:00
Jef Roosens aa0aae41ab
feat(repo): implement async workers
ci/woodpecker/push/build Pipeline failed Details
ci/woodpecker/push/lint Pipeline failed Details
2024-07-21 22:00:38 +02:00
Jef Roosens d38fd5ca74
feat(repo): write db archive parser 2024-07-21 21:10:13 +02:00
Jef Roosens 986162e926
refactor(repo): use register command for start 2024-07-21 19:25:03 +02:00
Jef Roosens d39205b653
feat: add POST api route for creating mirror repos
ci/woodpecker/push/build Pipeline failed Details
ci/woodpecker/push/lint Pipeline failed Details
2024-07-21 13:40:06 +02:00
Jef Roosens cbb04a40e0
feat: added repo mirrors migration
ci/woodpecker/push/build Pipeline was successful Details
ci/woodpecker/push/lint Pipeline failed Details
2024-07-18 22:40:31 +02:00
Jef Roosens f761e3b36d
refactor: move database entities into separate crate
ci/woodpecker/push/build Pipeline was successful Details
ci/woodpecker/push/lint Pipeline was successful Details
2024-07-16 20:38:43 +02:00
Jef Roosens 4225ce3471
refactor: moved migrations to own crate 2024-07-15 21:48:26 +02:00
45 changed files with 1497 additions and 273 deletions

View File

@ -24,6 +24,9 @@ steps:
secrets: secrets:
- minio_access_key - minio_access_key
- minio_secret_key - minio_secret_key
when:
branch: dev
event: push
publish-rel: publish-rel:
image: 'curlimages/curl' image: 'curlimages/curl'

View File

@ -25,19 +25,3 @@ steps:
when: when:
branch: dev branch: dev
event: push event: push
release:
image: 'woodpeckerci/plugin-docker-buildx'
secrets:
- 'docker_username'
- 'docker_password'
settings:
registry: 'git.rustybever.be'
repo: 'git.rustybever.be/chewing_bever/rieter'
auto_tag: true
platforms: [ 'linux/amd64' ]
build_args_from_env:
- 'CI_COMMIT_SHA'
mtu: 1300
when:
event: tag

View File

@ -7,8 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased](https://git.rustybever.be/Chewing_Bever/rieter/src/branch/dev) ## [Unreleased](https://git.rustybever.be/Chewing_Bever/rieter/src/branch/dev)
## [0.1.0](https://git.rustybever.be/Chewing_Bever/rieter/src/tag/0.1.0)
### Added ### Added
* Functional repository server * Functional repository server

799
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,9 +1,10 @@
[workspace] [workspace]
members = [ members = [
'migration',
'server', 'server',
'libarchive', 'libarchive',
'libarchive3-sys' 'libarchive3-sys'
] , "entity"]
[profile.release] [profile.release]
lto = "fat" lto = "fat"

View File

@ -0,0 +1,8 @@
[package]
name = "entity"
version = "0.1.0"
edition = "2021"
[dependencies]
sea-orm = "0.12.15"
serde = { version = "1.0.204", features = ["derive"] }

View File

@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
pub struct Model { pub struct Model {
#[sea_orm(primary_key)] #[sea_orm(primary_key)]
pub id: i32, pub id: i32,
#[sea_orm(unique)]
pub name: String, pub name: String,
pub description: Option<String>, pub description: Option<String>,
} }

View File

@ -9,3 +9,4 @@ pub mod package_group;
pub mod package_license; pub mod package_license;
pub mod package_related; pub mod package_related;
pub mod repo; pub mod repo;
pub mod repo_mirror;

View File

@ -1,11 +1,8 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1 //! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1
use chrono::NaiveDateTime;
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::db::PackageState;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "package")] #[sea_orm(table_name = "package")]
pub struct Model { pub struct Model {
@ -20,14 +17,13 @@ pub struct Model {
pub c_size: i64, pub c_size: i64,
pub description: Option<String>, pub description: Option<String>,
pub url: Option<String>, pub url: Option<String>,
pub build_date: NaiveDateTime, pub build_date: DateTime,
pub packager: Option<String>, pub packager: Option<String>,
pub pgp_sig: Option<String>, pub pgp_sig: Option<String>,
pub pgp_sig_size: Option<i64>, pub pgp_sig_size: Option<i64>,
pub sha256_sum: String, pub sha256_sum: String,
pub compression: String, pub compression: String,
#[serde(skip_serializing)] pub state: crate::PackageState,
pub state: PackageState,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -3,15 +3,13 @@
use sea_orm::entity::prelude::*; use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::db::PackageRelatedEnum;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "package_related")] #[sea_orm(table_name = "package_related")]
pub struct Model { pub struct Model {
#[sea_orm(primary_key, auto_increment = false)] #[sea_orm(primary_key, auto_increment = false)]
pub package_id: i32, pub package_id: i32,
#[sea_orm(primary_key, auto_increment = false)] #[sea_orm(primary_key, auto_increment = false)]
pub r#type: PackageRelatedEnum, pub r#type: crate::PackageRelatedEnum,
#[sea_orm(primary_key, auto_increment = false)] #[sea_orm(primary_key, auto_increment = false)]
pub name: String, pub name: String,
} }

View File

@ -7,3 +7,4 @@ pub use super::package_group::Entity as PackageGroup;
pub use super::package_license::Entity as PackageLicense; pub use super::package_license::Entity as PackageLicense;
pub use super::package_related::Entity as PackageRelated; pub use super::package_related::Entity as PackageRelated;
pub use super::repo::Entity as Repo; pub use super::repo::Entity as Repo;
pub use super::repo_mirror::Entity as RepoMirror;

View File

@ -9,8 +9,10 @@ pub struct Model {
#[sea_orm(primary_key)] #[sea_orm(primary_key)]
pub id: i32, pub id: i32,
pub distro_id: i32, pub distro_id: i32,
#[sea_orm(unique)]
pub name: String, pub name: String,
pub description: Option<String>, pub description: Option<String>,
pub r#type: crate::RepoType,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@ -25,6 +27,8 @@ pub enum Relation {
Distro, Distro,
#[sea_orm(has_many = "super::package::Entity")] #[sea_orm(has_many = "super::package::Entity")]
Package, Package,
#[sea_orm(has_many = "super::repo_mirror::Entity")]
RepoMirror,
} }
impl Related<super::distro::Entity> for Entity { impl Related<super::distro::Entity> for Entity {
@ -39,4 +43,10 @@ impl Related<super::package::Entity> for Entity {
} }
} }
impl Related<super::repo_mirror::Entity> for Entity {
fn to() -> RelationDef {
Relation::RepoMirror.def()
}
}
impl ActiveModelBehavior for ActiveModel {} impl ActiveModelBehavior for ActiveModel {}

View File

@ -0,0 +1,33 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "repo_mirror")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub repo_id: i32,
pub url: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::repo::Entity",
from = "Column::RepoId",
to = "super::repo::Column::Id",
on_update = "NoAction",
on_delete = "Cascade"
)]
Repo,
}
impl Related<super::repo::Entity> for Entity {
fn to() -> RelationDef {
Relation::Repo.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

46
entity/src/lib.rs 100644
View File

@ -0,0 +1,46 @@
pub mod entity;
pub use entity::prelude::*;
pub use entity::*;
use sea_orm::{DeriveActiveEnum, EnumIter};
use serde::{Deserialize, Serialize};
#[derive(EnumIter, DeriveActiveEnum, Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[sea_orm(rs_type = "i32", db_type = "Integer")]
#[serde(rename_all = "lowercase")]
pub enum PackageRelatedEnum {
#[sea_orm(num_value = 0)]
Conflicts,
#[sea_orm(num_value = 1)]
Replaces,
#[sea_orm(num_value = 2)]
Provides,
#[sea_orm(num_value = 3)]
Depend,
#[sea_orm(num_value = 4)]
Makedepend,
#[sea_orm(num_value = 5)]
Checkdepend,
#[sea_orm(num_value = 6)]
Optdepend,
}
#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)]
#[sea_orm(rs_type = "i32", db_type = "Integer")]
pub enum PackageState {
#[sea_orm(num_value = 0)]
PendingCommit,
#[sea_orm(num_value = 1)]
Committed,
#[sea_orm(num_value = 2)]
PendingDeletion,
}
#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)]
#[sea_orm(rs_type = "i32", db_type = "Integer")]
pub enum RepoType {
#[sea_orm(num_value = 0)]
Regular,
#[sea_orm(num_value = 1)]
FullMirror,
}

View File

@ -5,6 +5,6 @@ pub mod write;
pub use archive::{ pub use archive::{
Entry, ExtractOption, ExtractOptions, Handle, ReadCompression, ReadFilter, ReadFormat, Entry, ExtractOption, ExtractOptions, Handle, ReadCompression, ReadFilter, ReadFormat,
WriteFilter, WriteFormat, WriteFilter, WriteFormat, FileType,
}; };
pub use error::Result; pub use error::Result;

View File

@ -6,9 +6,10 @@ pub use builder::Builder;
use crate::archive::Handle; use crate::archive::Handle;
use crate::ReadFilter; use crate::ReadFilter;
use entries::Entries; pub use entries::{Entries, ReadEntry};
use libarchive3_sys::ffi; use libarchive3_sys::ffi;
use std::path::Path; use std::path::Path;
pub use file::FileReader;
// Represents a read view of an archive // Represents a read view of an archive
pub trait Archive: Handle + Sized { pub trait Archive: Handle + Sized {

View File

@ -0,0 +1,20 @@
[package]
name = "migration"
version = "0.1.0"
edition = "2021"
publish = false
[lib]
name = "migration"
path = "src/lib.rs"
[dependencies]
async-std = { version = "1", features = ["attributes", "tokio1"] }
[dependencies.sea-orm-migration]
version = "0.12.0"
features = [
"sqlx-postgres",
"sqlx-sqlite",
"runtime-tokio-rustls",
]

View File

@ -0,0 +1,41 @@
# Running Migrator CLI
- Generate a new migration file
```sh
cargo run -- generate MIGRATION_NAME
```
- Apply all pending migrations
```sh
cargo run
```
```sh
cargo run -- up
```
- Apply first 10 pending migrations
```sh
cargo run -- up -n 10
```
- Rollback last applied migrations
```sh
cargo run -- down
```
- Rollback last 10 applied migrations
```sh
cargo run -- down -n 10
```
- Drop all tables from the database, then reapply all migrations
```sh
cargo run -- fresh
```
- Rollback all applied migrations, then reapply all migrations
```sh
cargo run -- refresh
```
- Rollback all applied migrations
```sh
cargo run -- reset
```
- Check the status of all migrations
```sh
cargo run -- status
```

View File

@ -0,0 +1,16 @@
pub use sea_orm_migration::prelude::*;
mod m20230730_000001_create_repo_tables;
mod m20240716_184104_repo_mirrors;
pub struct Migrator;
#[async_trait::async_trait]
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![
Box::new(m20230730_000001_create_repo_tables::Migration),
Box::new(m20240716_184104_repo_mirrors::Migration),
]
}
}

View File

@ -0,0 +1,74 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Repo::Table)
.add_column(
ColumnDef::new(Repo::Type)
.integer()
.not_null()
.default(0),
)
.to_owned(),
)
.await?;
manager
.create_table(
Table::create()
.table(RepoMirror::Table)
.col(
ColumnDef::new(RepoMirror::Id)
.integer()
.not_null()
.auto_increment()
.primary_key()
)
.col(ColumnDef::new(RepoMirror::RepoId).integer().not_null())
.col(ColumnDef::new(RepoMirror::Url).string().not_null())
.foreign_key(
ForeignKey::create()
.name("fk-repo_mirror-repo-id")
.from(RepoMirror::Table, RepoMirror::RepoId)
.to(Repo::Table, Repo::Id)
.on_delete(ForeignKeyAction::Cascade),
)
.to_owned(),
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(RepoMirror::Table).to_owned())
.await?;
manager
.alter_table(Table::alter().drop_column(Repo::Type).to_owned())
.await?;
Ok(())
}
}
#[derive(Iden)]
pub enum Repo {
Table,
Id,
Type,
}
#[derive(Iden)]
pub enum RepoMirror {
Table,
Id,
RepoId,
Url,
}

View File

@ -0,0 +1,6 @@
use sea_orm_migration::prelude::*;
#[async_std::main]
async fn main() {
cli::run_cli(migration::Migrator).await;
}

View File

@ -15,7 +15,6 @@ futures = "0.3.28"
http-body-util = "0.1.1" http-body-util = "0.1.1"
libarchive = { path = "../libarchive" } libarchive = { path = "../libarchive" }
regex = "1.10.5" regex = "1.10.5"
sea-orm-migration = "0.12.1"
sea-query = { version = "0.30.7", features = ["backend-postgres", "backend-sqlite"] } sea-query = { version = "0.30.7", features = ["backend-postgres", "backend-sqlite"] }
serde = { version = "1.0.178", features = ["derive"] } serde = { version = "1.0.178", features = ["derive"] }
sha256 = "1.1.4" sha256 = "1.1.4"
@ -26,6 +25,9 @@ tower-http = { version = "0.5.2", features = ["fs", "trace", "auth"] }
tracing = "0.1.37" tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
uuid = { version = "1.4.0", features = ["v4"] } uuid = { version = "1.4.0", features = ["v4"] }
migration = { path = "../migration" }
entity = { path = "../entity" }
reqwest = { version = "0.12.5", features = ["stream"] }
[dependencies.sea-orm] [dependencies.sea-orm]
version = "0.12.1" version = "0.12.1"

View File

@ -1,7 +1,10 @@
api_key = "test" api_key = "test"
pkg_workers = 2
log_level = "rieterd=debug" log_level = "rieterd=debug"
[repo]
sync_workers = 2
async_workers = 1
[fs] [fs]
type = "local" type = "local"
data_dir = "./data" data_dir = "./data"

View File

@ -36,6 +36,14 @@ pub enum DbConfig {
}, },
} }
#[derive(Deserialize, Debug, Clone)]
pub struct RepoConfig {
#[serde(default = "default_repo_sync_workers")]
pub sync_workers: u32,
#[serde(default = "default_repo_async_workers")]
pub async_workers: u32,
}
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct Config { pub struct Config {
pub api_key: String, pub api_key: String,
@ -47,8 +55,7 @@ pub struct Config {
pub log_level: String, pub log_level: String,
pub fs: FsConfig, pub fs: FsConfig,
pub db: DbConfig, pub db: DbConfig,
#[serde(default = "default_pkg_workers")] pub repo: RepoConfig,
pub pkg_workers: u32,
} }
impl Config { impl Config {
@ -83,6 +90,10 @@ fn default_db_postgres_max_connections() -> u32 {
16 16
} }
fn default_pkg_workers() -> u32 { fn default_repo_sync_workers() -> u32 {
1
}
fn default_repo_async_workers() -> u32 {
1 1
} }

View File

@ -1,12 +0,0 @@
use sea_orm_migration::prelude::*;
pub struct Migrator;
mod m20230730_000001_create_repo_tables;
#[async_trait::async_trait]
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![Box::new(m20230730_000001_create_repo_tables::Migration)]
}
}

View File

@ -1,58 +1,81 @@
pub mod entities;
mod migrator;
pub mod query; pub mod query;
use crate::config::DbConfig; use crate::config::DbConfig;
pub use entities::{prelude::*, *}; use sea_orm::{
pub use migrator::Migrator; ActiveModelTrait,
ActiveValue::{NotSet, Set},
use sea_orm::{ConnectionTrait, Database, DbConn, DeriveActiveEnum, EnumIter}; ConnectionTrait, Database, DbConn, EntityTrait, TransactionTrait,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
type Result<T> = std::result::Result<T, sea_orm::DbErr>; type Result<T> = std::result::Result<T, sea_orm::DbErr>;
#[derive(EnumIter, DeriveActiveEnum, Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[sea_orm(rs_type = "i32", db_type = "Integer")]
#[serde(rename_all = "lowercase")]
pub enum PackageRelatedEnum {
#[sea_orm(num_value = 0)]
Conflicts,
#[sea_orm(num_value = 1)]
Replaces,
#[sea_orm(num_value = 2)]
Provides,
#[sea_orm(num_value = 3)]
Depend,
#[sea_orm(num_value = 4)]
Makedepend,
#[sea_orm(num_value = 5)]
Checkdepend,
#[sea_orm(num_value = 6)]
Optdepend,
}
#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)]
#[sea_orm(rs_type = "i32", db_type = "Integer")]
pub enum PackageState {
#[sea_orm(num_value = 0)]
PendingCommit,
#[sea_orm(num_value = 1)]
Committed,
#[sea_orm(num_value = 2)]
PendingDeletion,
}
#[derive(Serialize)] #[derive(Serialize)]
pub struct FullPackage { pub struct FullPackage {
#[serde(flatten)] #[serde(flatten)]
entry: package::Model, entry: entity::package::Model,
licenses: Vec<String>, licenses: Vec<String>,
groups: Vec<String>, groups: Vec<String>,
related: Vec<(PackageRelatedEnum, String)>, related: Vec<(entity::PackageRelatedEnum, String)>,
files: Vec<String>, files: Vec<String>,
} }
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum RepoType {
Regular,
FullMirror { mirrors: Vec<String> },
}
#[derive(Serialize, Deserialize)]
pub struct NewRepo {
distro_id: i32,
name: String,
description: Option<String>,
#[serde(flatten)]
r#type: RepoType,
}
impl NewRepo {
pub async fn insert(self, conn: &DbConn) -> crate::Result<entity::repo::Model> {
let txn = conn.begin().await?;
let repo_type = match self.r#type {
RepoType::Regular => entity::RepoType::Regular,
RepoType::FullMirror { .. } => entity::RepoType::FullMirror,
};
let repo = entity::repo::ActiveModel {
id: NotSet,
distro_id: Set(self.distro_id),
name: Set(self.name),
description: Set(self.description),
r#type: Set(repo_type),
};
let model = repo.insert(conn).await?;
match self.r#type {
RepoType::Regular => {}
RepoType::FullMirror { mirrors } => {
entity::RepoMirror::insert_many(mirrors.into_iter().map(|url| {
entity::repo_mirror::ActiveModel {
id: NotSet,
repo_id: Set(model.id),
url: Set(url),
}
}))
.on_empty_do_nothing()
.exec(&txn)
.await?;
}
}
txn.commit().await?;
Ok(model)
}
}
pub async fn connect(conn: &DbConfig) -> crate::Result<DbConn> { pub async fn connect(conn: &DbConfig) -> crate::Result<DbConn> {
match conn { match conn {
DbConfig::Sqlite { DbConfig::Sqlite {

View File

@ -1,6 +1,11 @@
use crate::db::*; use crate::db::Result;
use sea_orm::{sea_query::IntoCondition, *}; use entity::{distro, prelude::Distro};
use sea_orm::{
sea_query::IntoCondition, ActiveModelTrait, ColumnTrait, Condition, DbConn, EntityTrait,
NotSet, PaginatorTrait, QueryFilter, QueryOrder, Set,
};
use serde::Deserialize;
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct Filter { pub struct Filter {
@ -21,7 +26,7 @@ pub async fn page(
per_page: u64, per_page: u64,
page: u64, page: u64,
filter: Filter, filter: Filter,
) -> Result<Vec<distro::Model>> { ) -> Result<Vec<entity::distro::Model>> {
let paginator = Distro::find() let paginator = Distro::find()
.filter(filter) .filter(filter)
.order_by_asc(distro::Column::Id) .order_by_asc(distro::Column::Id)

View File

@ -1,7 +1,16 @@
use crate::db::{self, *}; use crate::db::{FullPackage, Result};
use sea_orm::{sea_query::IntoCondition, *}; use entity::{
use sea_query::{Alias, Expr, Query, SelectStatement}; package, package_file, package_group, package_license, package_related,
prelude::{Package, PackageFile, PackageGroup, PackageLicense, PackageRelated},
PackageRelatedEnum, PackageState,
};
use sea_orm::{
sea_query::IntoCondition, ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn,
DeleteResult, EntityTrait, FromQueryResult, Iterable, ModelTrait, NotSet, PaginatorTrait,
QueryFilter, QuerySelect, SelectModel, SelectorRaw, Set, TransactionTrait,
};
use sea_query::{Alias, Expr, JoinType, Order, Query, SelectStatement};
use serde::Deserialize; use serde::Deserialize;
/// How many fields may be inserted at once into the database. /// How many fields may be inserted at once into the database.
@ -31,22 +40,22 @@ pub async fn page(
) -> crate::Result<Vec<package::Model>> { ) -> crate::Result<Vec<package::Model>> {
let p2 = Alias::new("p2"); let p2 = Alias::new("p2");
let query = Query::select() let query = Query::select()
.columns(db::package::Column::iter().map(|c| (db::package::Entity, c))) .columns(package::Column::iter().map(|c| (package::Entity, c)))
.from(db::package::Entity) .from(package::Entity)
.join_subquery( .join_subquery(
JoinType::InnerJoin, JoinType::InnerJoin,
max_pkg_ids_query(true), max_pkg_ids_query(true),
p2.clone(), p2.clone(),
Expr::col((db::package::Entity, db::package::Column::Id)) Expr::col((package::Entity, package::Column::Id))
.eq(Expr::col((p2.clone(), Alias::new("max_id")))), .eq(Expr::col((p2.clone(), Alias::new("max_id")))),
) )
.cond_where(filter) .cond_where(filter)
.order_by((db::package::Entity, db::package::Column::Id), Order::Asc) .order_by((package::Entity, package::Column::Id), Order::Asc)
.to_owned(); .to_owned();
let builder = conn.get_database_backend(); let builder = conn.get_database_backend();
let sql = builder.build(&query); let sql = builder.build(&query);
Ok(db::Package::find() Ok(Package::find()
.from_raw_sql(sql) .from_raw_sql(sql)
.paginate(conn, per_page) .paginate(conn, per_page)
.fetch_page(page) .fetch_page(page)
@ -213,7 +222,7 @@ pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
.into_tuple() .into_tuple()
.all(conn) .all(conn)
.await?; .await?;
let related: Vec<(db::PackageRelatedEnum, String)> = entry let related: Vec<(PackageRelatedEnum, String)> = entry
.find_related(PackageRelated) .find_related(PackageRelated)
.select_only() .select_only()
.columns([package_related::Column::Type, package_related::Column::Name]) .columns([package_related::Column::Type, package_related::Column::Name])
@ -248,22 +257,22 @@ pub struct PkgToRemove {
fn max_pkg_ids_query(committed: bool) -> SelectStatement { fn max_pkg_ids_query(committed: bool) -> SelectStatement {
let mut query = Query::select() let mut query = Query::select()
.from(db::package::Entity) .from(package::Entity)
.columns([ .columns([
db::package::Column::RepoId, package::Column::RepoId,
db::package::Column::Arch, package::Column::Arch,
db::package::Column::Name, package::Column::Name,
]) ])
.expr_as(db::package::Column::Id.max(), Alias::new("max_id")) .expr_as(package::Column::Id.max(), Alias::new("max_id"))
.group_by_columns([ .group_by_columns([
db::package::Column::RepoId, package::Column::RepoId,
db::package::Column::Arch, package::Column::Arch,
db::package::Column::Name, package::Column::Name,
]) ])
.to_owned(); .to_owned();
if committed { if committed {
query.cond_where(db::package::Column::State.eq(db::PackageState::Committed)); query.cond_where(package::Column::State.eq(PackageState::Committed));
} }
query query
@ -278,47 +287,44 @@ pub fn pkgs_to_sync(
) -> SelectorRaw<SelectModel<package::Model>> { ) -> SelectorRaw<SelectModel<package::Model>> {
let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
let query = Query::select() let query = Query::select()
.columns(db::package::Column::iter().map(|c| (p1.clone(), c))) .columns(package::Column::iter().map(|c| (p1.clone(), c)))
.from_as(db::package::Entity, p1.clone()) .from_as(package::Entity, p1.clone())
.join_subquery( .join_subquery(
JoinType::InnerJoin, JoinType::InnerJoin,
max_pkg_ids_query(false), max_pkg_ids_query(false),
p2.clone(), p2.clone(),
Expr::col((p1.clone(), db::package::Column::Id)) Expr::col((p1.clone(), package::Column::Id))
.eq(Expr::col((p2.clone(), Alias::new("max_id")))), .eq(Expr::col((p2.clone(), Alias::new("max_id")))),
) )
.cond_where( .cond_where(
Condition::all() Condition::all()
.add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo)) .add(Expr::col((p1.clone(), package::Column::RepoId)).eq(repo))
.add(Expr::col((p1.clone(), package::Column::Arch)).is_in([arch, crate::ANY_ARCH]))
.add( .add(
Expr::col((p1.clone(), db::package::Column::Arch)) Expr::col((p1.clone(), package::Column::State))
.is_in([arch, crate::ANY_ARCH]), .ne(PackageState::PendingDeletion),
)
.add(
Expr::col((p1.clone(), db::package::Column::State))
.ne(db::PackageState::PendingDeletion),
), ),
) )
.to_owned(); .to_owned();
let builder = conn.get_database_backend(); let builder = conn.get_database_backend();
let sql = builder.build(&query); let sql = builder.build(&query);
db::Package::find().from_raw_sql(sql) Package::find().from_raw_sql(sql)
} }
fn stale_pkgs_query(include_repo: bool) -> SelectStatement { fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
let (p1, p2) = (Alias::new("p1"), Alias::new("p2")); let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
let mut query = Query::select() let mut query = Query::select()
.from_as(db::package::Entity, p1.clone()) .from_as(package::Entity, p1.clone())
.to_owned(); .to_owned();
if include_repo { if include_repo {
query.columns([ query.columns([
(p1.clone(), db::package::Column::RepoId), (p1.clone(), package::Column::RepoId),
(p1.clone(), db::package::Column::Id), (p1.clone(), package::Column::Id),
]); ]);
} else { } else {
query.column((p1.clone(), db::package::Column::Id)); query.column((p1.clone(), package::Column::Id));
} }
// We left join on the max pkgs query because a repository that has all its packages set to // We left join on the max pkgs query because a repository that has all its packages set to
@ -331,27 +337,27 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
p2.clone(), p2.clone(),
Condition::all() Condition::all()
.add( .add(
Expr::col((p1.clone(), db::package::Column::RepoId)) Expr::col((p1.clone(), package::Column::RepoId))
.eq(Expr::col((p2.clone(), db::package::Column::RepoId))), .eq(Expr::col((p2.clone(), package::Column::RepoId))),
) )
.add( .add(
Expr::col((p1.clone(), db::package::Column::Arch)) Expr::col((p1.clone(), package::Column::Arch))
.eq(Expr::col((p2.clone(), db::package::Column::Arch))), .eq(Expr::col((p2.clone(), package::Column::Arch))),
) )
.add( .add(
Expr::col((p1.clone(), db::package::Column::Name)) Expr::col((p1.clone(), package::Column::Name))
.eq(Expr::col((p2.clone(), db::package::Column::Name))), .eq(Expr::col((p2.clone(), package::Column::Name))),
), ),
) )
.cond_where( .cond_where(
Condition::any() Condition::any()
.add( .add(
Expr::col((p1.clone(), db::package::Column::Id)) Expr::col((p1.clone(), package::Column::Id))
.lt(Expr::col((p2.clone(), Alias::new("max_id")))), .lt(Expr::col((p2.clone(), Alias::new("max_id")))),
) )
.add( .add(
Expr::col((p1.clone(), db::package::Column::State)) Expr::col((p1.clone(), package::Column::State))
.eq(db::PackageState::PendingDeletion), .eq(PackageState::PendingDeletion),
), ),
); );
@ -367,9 +373,9 @@ pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
} }
pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> { pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> {
Ok(db::Package::delete_many() Ok(Package::delete_many()
.filter(db::package::Column::Id.lte(max_id)) .filter(package::Column::Id.lte(max_id))
.filter(db::package::Column::Id.in_subquery(stale_pkgs_query(false))) .filter(package::Column::Id.in_subquery(stale_pkgs_query(false)))
.exec(conn) .exec(conn)
.await .await
.map(|_| ())?) .map(|_| ())?)

View File

@ -1,6 +1,11 @@
use crate::db::*; use crate::db::Result;
use sea_orm::{sea_query::IntoCondition, *}; use entity::{prelude::Repo, repo};
use sea_orm::{
sea_query::IntoCondition, ActiveModelTrait, ColumnTrait, Condition, DbConn, EntityTrait,
NotSet, PaginatorTrait, QueryFilter, QueryOrder, Set,
};
use serde::Deserialize;
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct Filter { pub struct Filter {
@ -47,12 +52,14 @@ pub async fn insert(
distro_id: i32, distro_id: i32,
name: &str, name: &str,
description: Option<&str>, description: Option<&str>,
r#type: entity::RepoType,
) -> Result<repo::Model> { ) -> Result<repo::Model> {
let model = repo::ActiveModel { let model = repo::ActiveModel {
id: NotSet, id: NotSet,
distro_id: Set(distro_id), distro_id: Set(distro_id),
name: Set(String::from(name)), name: Set(String::from(name)),
description: Set(description.map(String::from)), description: Set(description.map(String::from)),
r#type: Set(r#type),
}; };
model.insert(conn).await model.insert(conn).await

View File

@ -15,6 +15,7 @@ pub enum ServerError {
Status(StatusCode), Status(StatusCode),
Archive(libarchive::error::ArchiveError), Archive(libarchive::error::ArchiveError),
Figment(figment::Error), Figment(figment::Error),
Reqwest(reqwest::Error),
Unit, Unit,
} }
@ -27,6 +28,7 @@ impl fmt::Display for ServerError {
ServerError::Db(err) => write!(fmt, "{}", err), ServerError::Db(err) => write!(fmt, "{}", err),
ServerError::Archive(err) => write!(fmt, "{}", err), ServerError::Archive(err) => write!(fmt, "{}", err),
ServerError::Figment(err) => write!(fmt, "{}", err), ServerError::Figment(err) => write!(fmt, "{}", err),
ServerError::Reqwest(err) => write!(fmt, "{}", err),
ServerError::Unit => Ok(()), ServerError::Unit => Ok(()),
} }
} }
@ -48,6 +50,7 @@ impl IntoResponse for ServerError {
ServerError::Db(_) ServerError::Db(_)
| ServerError::Archive(_) | ServerError::Archive(_)
| ServerError::Figment(_) | ServerError::Figment(_)
| ServerError::Reqwest(_)
| ServerError::Unit => StatusCode::INTERNAL_SERVER_ERROR.into_response(), | ServerError::Unit => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
} }
} }
@ -94,3 +97,9 @@ impl From<figment::Error> for ServerError {
ServerError::Figment(err) ServerError::Figment(err)
} }
} }
impl From<reqwest::Error> for ServerError {
fn from(err: reqwest::Error) -> Self {
ServerError::Reqwest(err)
}
}

View File

@ -12,7 +12,7 @@ pub use error::{Result, ServerError};
use std::{io, path::PathBuf}; use std::{io, path::PathBuf};
use clap::Parser; use clap::Parser;
use sea_orm_migration::MigratorTrait; use migration::MigratorTrait;
use tokio::runtime; use tokio::runtime;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@ -52,7 +52,7 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result<Global> {
tracing::info!("Connecting to database"); tracing::info!("Connecting to database");
let db = rt.block_on(crate::db::connect(&config.db))?; let db = rt.block_on(crate::db::connect(&config.db))?;
rt.block_on(crate::db::Migrator::up(&db, None))?; rt.block_on(migration::Migrator::up(&db, None))?;
let repo = match &config.fs { let repo = match &config.fs {
FsConfig::Local { data_dir } => { FsConfig::Local { data_dir } => {
@ -60,7 +60,8 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result<Global> {
data_dir.join("repos"), data_dir.join("repos"),
db.clone(), db.clone(),
rt.clone(), rt.clone(),
config.pkg_workers, config.repo.sync_workers,
config.repo.async_workers,
)? )?
//rt.block_on(crate::repo::RepoMgr::new( //rt.block_on(crate::repo::RepoMgr::new(
// data_dir.join("repos"), // data_dir.join("repos"),

View File

@ -0,0 +1,64 @@
use crate::{
db,
repo::{archive, package, AsyncCommand, SharedState},
};
use std::{
io,
path::PathBuf,
sync::{atomic::Ordering, Arc},
};
use uuid::Uuid;
use futures::StreamExt;
use tokio::io::AsyncWriteExt;
pub struct AsyncActor {
state: Arc<SharedState>,
}
impl AsyncActor {
pub fn new(state: &Arc<SharedState>) -> Self {
Self {
state: Arc::clone(state)
}
}
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
std::array::from_fn(|_| {
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
self.state.repos_dir.join(uuid.to_string())
})
}
pub async fn run(self) {
while let Some(msg) = {
let mut rx = self.state.async_queue.1.lock().await;
rx.recv().await
} {
match msg {
AsyncCommand::DownloadDbArchive(repo_id, url) => {
match self.download_file(url).await {
Ok(path) => todo!("schedule parse db archive"),
Err(err) => {
todo!("reschedule unless max retries reached")
},
}
}
}
}
}
pub async fn download_file(&self, url: reqwest::Url) -> crate::Result<PathBuf> {
let [path] = self.random_file_paths();
let mut stream = self.state.client.get(url).send().await?.bytes_stream();
let mut f = tokio::fs::File::create(&path).await?;
while let Some(chunk) = stream.next().await.transpose()? {
f.write_all(&chunk).await?;
}
Ok(path)
}
}

View File

@ -0,0 +1,5 @@
mod sync;
mod r#async;
pub use sync::Actor;
pub use r#async::AsyncActor;

View File

@ -1,5 +1,7 @@
use super::{archive, package, Command, SharedState}; use crate::{
use crate::db; db,
repo::{archive, package, Command, SharedState},
};
use std::{ use std::{
path::PathBuf, path::PathBuf,
@ -20,10 +22,10 @@ pub struct Actor {
} }
impl Actor { impl Actor {
pub fn new(rt: runtime::Handle, state: Arc<SharedState>) -> Self { pub fn new(rt: runtime::Handle, state: &Arc<SharedState>) -> Self {
Self { Self {
rt, rt,
state: Arc::clone(&state), state: Arc::clone(state),
} }
} }
@ -37,7 +39,7 @@ impl Actor {
/// Run the main actor loop /// Run the main actor loop
pub fn run(self) { pub fn run(self) {
while let Some(msg) = { while let Some(msg) = {
let mut rx = self.state.rx.lock().unwrap(); let mut rx = self.state.sync_queue.1.lock().unwrap();
rx.blocking_recv() rx.blocking_recv()
} { } {
match msg { match msg {
@ -99,10 +101,10 @@ impl Actor {
if let Some(_guard) = repos.get(&repo).map(|n| n.1.lock()) { if let Some(_guard) = repos.get(&repo).map(|n| n.1.lock()) {
let archs: Vec<String> = self.rt.block_on( let archs: Vec<String> = self.rt.block_on(
db::Package::find() entity::Package::find()
.filter(db::package::Column::RepoId.eq(repo)) .filter(entity::package::Column::RepoId.eq(repo))
.select_only() .select_only()
.column(db::package::Column::Arch) .column(entity::package::Column::Arch)
.distinct() .distinct()
.into_tuple() .into_tuple()
.all(&self.state.conn), .all(&self.state.conn),
@ -171,12 +173,12 @@ impl Actor {
// Update the state for the newly committed packages // Update the state for the newly committed packages
self.rt.block_on( self.rt.block_on(
db::Package::update_many() entity::Package::update_many()
.col_expr( .col_expr(
db::package::Column::State, entity::package::Column::State,
Expr::value(db::PackageState::Committed), Expr::value(entity::PackageState::Committed),
) )
.filter(db::package::Column::Id.is_in(committed_ids)) .filter(entity::package::Column::Id.is_in(committed_ids))
.exec(&self.state.conn), .exec(&self.state.conn),
)?; )?;

View File

@ -1,4 +1,3 @@
use crate::db;
use std::{ use std::{
io::Write, io::Write,
path::{Path, PathBuf}, path::{Path, PathBuf},
@ -69,7 +68,7 @@ impl RepoArchivesWriter {
Ok(ar.append_path(&mut ar_entry, src_path)?) Ok(ar.append_path(&mut ar_entry, src_path)?)
} }
pub fn append_pkg(&mut self, pkg: &db::package::Model) -> crate::Result<()> { pub fn append_pkg(&mut self, pkg: &entity::package::Model) -> crate::Result<()> {
self.write_desc(&self.tmp_paths[0], pkg)?; self.write_desc(&self.tmp_paths[0], pkg)?;
self.write_files(&self.tmp_paths[1], pkg)?; self.write_files(&self.tmp_paths[1], pkg)?;
@ -85,7 +84,11 @@ impl RepoArchivesWriter {
} }
/// Generate a "files" archive entry for the package in the given path /// Generate a "files" archive entry for the package in the given path
fn write_files(&self, path: impl AsRef<Path>, pkg: &db::package::Model) -> crate::Result<()> { fn write_files(
&self,
path: impl AsRef<Path>,
pkg: &entity::package::Model,
) -> crate::Result<()> {
let mut f = std::io::BufWriter::new(std::fs::File::create(path)?); let mut f = std::io::BufWriter::new(std::fs::File::create(path)?);
writeln!(f, "%FILES%")?; writeln!(f, "%FILES%")?;
@ -93,7 +96,7 @@ impl RepoArchivesWriter {
let (tx, mut rx) = mpsc::channel(1); let (tx, mut rx) = mpsc::channel(1);
let conn = self.conn.clone(); let conn = self.conn.clone();
let query = pkg.find_related(db::PackageFile); let query = pkg.find_related(entity::prelude::PackageFile);
self.rt.spawn(async move { self.rt.spawn(async move {
match query.stream(&conn).await { match query.stream(&conn).await {
@ -121,7 +124,11 @@ impl RepoArchivesWriter {
Ok(()) Ok(())
} }
fn write_desc(&self, path: impl AsRef<Path>, pkg: &db::package::Model) -> crate::Result<()> { fn write_desc(
&self,
path: impl AsRef<Path>,
pkg: &entity::package::Model,
) -> crate::Result<()> {
let mut f = std::io::BufWriter::new(std::fs::File::create(path)?); let mut f = std::io::BufWriter::new(std::fs::File::create(path)?);
let filename = format!( let filename = format!(
@ -147,9 +154,9 @@ impl RepoArchivesWriter {
} }
let groups: Vec<String> = self.rt.block_on( let groups: Vec<String> = self.rt.block_on(
pkg.find_related(db::PackageGroup) pkg.find_related(entity::prelude::PackageGroup)
.select_only() .select_only()
.column(db::package_group::Column::Name) .column(entity::package_group::Column::Name)
.into_tuple() .into_tuple()
.all(&self.conn), .all(&self.conn),
)?; )?;
@ -165,9 +172,9 @@ impl RepoArchivesWriter {
} }
let licenses: Vec<String> = self.rt.block_on( let licenses: Vec<String> = self.rt.block_on(
pkg.find_related(db::PackageLicense) pkg.find_related(entity::prelude::PackageLicense)
.select_only() .select_only()
.column(db::package_license::Column::Name) .column(entity::package_license::Column::Name)
.into_tuple() .into_tuple()
.all(&self.conn), .all(&self.conn),
)?; )?;
@ -186,21 +193,21 @@ impl RepoArchivesWriter {
} }
let related = [ let related = [
("REPLACES", db::PackageRelatedEnum::Replaces), ("REPLACES", entity::PackageRelatedEnum::Replaces),
("CONFLICTS", db::PackageRelatedEnum::Conflicts), ("CONFLICTS", entity::PackageRelatedEnum::Conflicts),
("PROVIDES", db::PackageRelatedEnum::Provides), ("PROVIDES", entity::PackageRelatedEnum::Provides),
("DEPENDS", db::PackageRelatedEnum::Depend), ("DEPENDS", entity::PackageRelatedEnum::Depend),
("OPTDEPENDS", db::PackageRelatedEnum::Optdepend), ("OPTDEPENDS", entity::PackageRelatedEnum::Optdepend),
("MAKEDEPENDS", db::PackageRelatedEnum::Makedepend), ("MAKEDEPENDS", entity::PackageRelatedEnum::Makedepend),
("CHECKDEPENDS", db::PackageRelatedEnum::Checkdepend), ("CHECKDEPENDS", entity::PackageRelatedEnum::Checkdepend),
]; ];
for (key, attr) in related.into_iter() { for (key, attr) in related.into_iter() {
let items: Vec<String> = self.rt.block_on( let items: Vec<String> = self.rt.block_on(
pkg.find_related(db::PackageRelated) pkg.find_related(entity::prelude::PackageRelated)
.filter(db::package_related::Column::Type.eq(attr)) .filter(entity::package_related::Column::Type.eq(attr))
.select_only() .select_only()
.column(db::package_related::Column::Name) .column(entity::package_related::Column::Name)
.into_tuple() .into_tuple()
.all(&self.conn), .all(&self.conn),
)?; )?;

View File

@ -1,5 +1,4 @@
use super::{Command, SharedState}; use super::{Command, SharedState};
use crate::db;
use std::{ use std::{
path::PathBuf, path::PathBuf,
@ -31,13 +30,24 @@ impl Handle {
}) })
} }
pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> { pub async fn register_repo(&self, repo_id: i32) -> crate::Result<()> {
let mut repos = self.state.repos.write().await; let repo_dir = self.state.repos_dir.join(repo_id.to_string());
let distro_id: Option<i32> = db::Distro::find() if !tokio::fs::try_exists(repo_dir).await? {
.filter(db::distro::Column::Name.eq(distro)) tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?;
}
let mut repos = self.state.repos.write().await;
repos.insert(repo_id, Default::default());
Ok(())
}
pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
let distro_id: Option<i32> = entity::Distro::find()
.filter(entity::distro::Column::Name.eq(distro))
.select_only() .select_only()
.column(db::distro::Column::Id) .column(entity::distro::Column::Id)
.into_tuple() .into_tuple()
.one(&self.state.conn) .one(&self.state.conn)
.await?; .await?;
@ -45,7 +55,7 @@ impl Handle {
let distro_id = if let Some(id) = distro_id { let distro_id = if let Some(id) = distro_id {
id id
} else { } else {
let new_distro = db::distro::ActiveModel { let new_distro = entity::distro::ActiveModel {
id: NotSet, id: NotSet,
name: Set(distro.to_string()), name: Set(distro.to_string()),
description: NotSet, description: NotSet,
@ -54,11 +64,11 @@ impl Handle {
new_distro.insert(&self.state.conn).await?.id new_distro.insert(&self.state.conn).await?.id
}; };
let repo_id: Option<i32> = db::Repo::find() let repo_id: Option<i32> = entity::Repo::find()
.filter(db::repo::Column::DistroId.eq(distro_id)) .filter(entity::repo::Column::DistroId.eq(distro_id))
.filter(db::repo::Column::Name.eq(repo)) .filter(entity::repo::Column::Name.eq(repo))
.select_only() .select_only()
.column(db::repo::Column::Id) .column(entity::repo::Column::Id)
.into_tuple() .into_tuple()
.one(&self.state.conn) .one(&self.state.conn)
.await?; .await?;
@ -66,16 +76,16 @@ impl Handle {
let repo_id = if let Some(id) = repo_id { let repo_id = if let Some(id) = repo_id {
id id
} else { } else {
let new_repo = db::repo::ActiveModel { let new_repo = entity::repo::ActiveModel {
id: NotSet, id: NotSet,
distro_id: Set(distro_id), distro_id: Set(distro_id),
name: Set(repo.to_string()), name: Set(repo.to_string()),
description: NotSet, description: NotSet,
r#type: Set(entity::RepoType::Regular),
}; };
let id = new_repo.insert(&self.state.conn).await?.id; let id = new_repo.insert(&self.state.conn).await?.id;
tokio::fs::create_dir(self.state.repos_dir.join(id.to_string())).await?; self.register_repo(id).await?;
repos.insert(id, Default::default());
id id
}; };
@ -84,12 +94,12 @@ impl Handle {
} }
pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> { pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
Ok(db::Repo::find() Ok(entity::Repo::find()
.find_also_related(db::Distro) .find_also_related(entity::Distro)
.filter( .filter(
Condition::all() Condition::all()
.add(db::repo::Column::Name.eq(repo)) .add(entity::repo::Column::Name.eq(repo))
.add(db::distro::Column::Name.eq(distro)), .add(entity::distro::Column::Name.eq(distro)),
) )
.one(&self.state.conn) .one(&self.state.conn)
.await .await
@ -98,7 +108,9 @@ impl Handle {
pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> { pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
self.state.repos.write().await.remove(&repo); self.state.repos.write().await.remove(&repo);
db::Repo::delete_by_id(repo).exec(&self.state.conn).await?; entity::Repo::delete_by_id(repo)
.exec(&self.state.conn)
.await?;
let _ = tokio::fs::remove_dir_all(self.state.repos_dir.join(repo.to_string())).await; let _ = tokio::fs::remove_dir_all(self.state.repos_dir.join(repo.to_string())).await;
Ok(()) Ok(())
@ -108,15 +120,15 @@ impl Handle {
/// packages with the given architecture as "pending deletion", before performing a manual sync /// packages with the given architecture as "pending deletion", before performing a manual sync
/// & removal of stale packages. /// & removal of stale packages.
pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> { pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
db::Package::update_many() entity::Package::update_many()
.col_expr( .col_expr(
db::package::Column::State, entity::package::Column::State,
Expr::value(db::PackageState::PendingDeletion), Expr::value(entity::PackageState::PendingDeletion),
) )
.filter( .filter(
Condition::all() Condition::all()
.add(db::package::Column::RepoId.eq(repo)) .add(entity::package::Column::RepoId.eq(repo))
.add(db::package::Column::Arch.eq(arch)), .add(entity::package::Column::Arch.eq(arch)),
) )
.exec(&self.state.conn) .exec(&self.state.conn)
.await?; .await?;
@ -128,17 +140,25 @@ impl Handle {
} }
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) { pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
self.state.tx.send(Command::ParsePkg(repo, path)).unwrap(); self.state
.sync_queue
.0
.send(Command::ParsePkg(repo, path))
.unwrap();
self.state.repos.read().await.get(&repo).inspect(|n| { self.state.repos.read().await.get(&repo).inspect(|n| {
n.0.fetch_add(1, Ordering::SeqCst); n.0.fetch_add(1, Ordering::SeqCst);
}); });
} }
async fn queue_sync(&self, repo: i32) { async fn queue_sync(&self, repo: i32) {
self.state.tx.send(Command::SyncRepo(repo)).unwrap(); self.state
.sync_queue
.0
.send(Command::SyncRepo(repo))
.unwrap();
} }
async fn queue_clean(&self) { async fn queue_clean(&self) {
self.state.tx.send(Command::Clean).unwrap(); self.state.sync_queue.0.send(Command::Clean).unwrap();
} }
} }

View File

@ -0,0 +1,3 @@
mod parser;
pub use parser::{DbArchiveEntry, DbArchiveParser};

View File

@ -0,0 +1,75 @@
use std::{
io::{self, BufRead},
path::Path,
};
use libarchive::{
read::{Archive, Builder, Entries, FileReader, ReadEntry},
Entry,
};
pub struct DbArchiveParser<'a, T: 'a + Archive> {
entries: Entries<'a, T>,
}
pub struct DbArchiveEntry {
name: String,
version: String,
filename: String,
}
impl<'a, T: Archive> DbArchiveParser<'a, T> {
pub fn new(ar: &'a mut T) -> Self {
Self {
entries: Entries::new(ar),
}
}
// parse a given entry. If the entry's not a regular file, the function returns None.
fn parse_entry(entry: ReadEntry<'a, T>) -> io::Result<DbArchiveEntry> {
let reader = io::BufReader::new(entry);
let mut lines = reader.lines();
let mut name: Option<String> = None;
let mut version: Option<String> = None;
let mut filename: Option<String> = None;
while let Some(line) = lines.next().transpose()? {
match line.as_str() {
"%NAME%" => name = lines.next().transpose()?,
"%VERSION%" => version = lines.next().transpose()?,
"%FILENAME%" => filename = lines.next().transpose()?,
_ => {}
}
}
if name.is_some() && version.is_some() && filename.is_some() {
Ok(DbArchiveEntry {
name: name.unwrap(),
version: version.unwrap(),
filename: filename.unwrap(),
})
} else {
Err(io::Error::other("Missing fields in entry file"))
}
}
}
impl<'a, T: Archive> Iterator for DbArchiveParser<'a, T> {
type Item = io::Result<DbArchiveEntry>;
fn next(&mut self) -> Option<Self::Item> {
while let Some(entry) = self.entries.next() {
match entry {
Ok(entry) => {
if entry.filetype() == libarchive::FileType::RegularFile {
return Some(Self::parse_entry(entry));
}
}
Err(err) => return Some(Err(err.into())),
}
}
None
}
}

View File

@ -1,13 +1,12 @@
mod actor; mod actor;
mod archive; mod archive;
mod handle; mod handle;
mod mirror;
pub mod package; pub mod package;
pub use actor::Actor; pub use actor::{Actor, AsyncActor};
pub use handle::Handle; pub use handle::Handle;
use crate::db;
use std::{ use std::{
collections::HashMap, collections::HashMap,
path::{Path, PathBuf}, path::{Path, PathBuf},
@ -29,30 +28,36 @@ pub enum Command {
Clean, Clean,
} }
pub enum AsyncCommand {
DownloadDbArchive(i32, reqwest::Url)
}
type RepoState = (AtomicU32, Arc<Mutex<()>>); type RepoState = (AtomicU32, Arc<Mutex<()>>);
pub struct SharedState { pub struct SharedState {
pub repos_dir: PathBuf, pub repos_dir: PathBuf,
pub conn: DbConn, pub conn: DbConn,
pub rx: Mutex<UnboundedReceiver<Command>>, pub sync_queue: (UnboundedSender<Command>, Mutex<UnboundedReceiver<Command>>),
pub tx: UnboundedSender<Command>, pub async_queue: (
UnboundedSender<AsyncCommand>,
tokio::sync::Mutex<UnboundedReceiver<AsyncCommand>>,
),
pub repos: RwLock<HashMap<i32, RepoState>>, pub repos: RwLock<HashMap<i32, RepoState>>,
pub client: reqwest::Client,
} }
impl SharedState { impl SharedState {
pub fn new( pub fn new(repos_dir: impl AsRef<Path>, conn: DbConn) -> Self {
repos_dir: impl AsRef<Path>,
conn: DbConn,
repos: HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>,
) -> Self {
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
let (async_tx, async_rx) = unbounded_channel();
Self { Self {
repos_dir: repos_dir.as_ref().to_path_buf(), repos_dir: repos_dir.as_ref().to_path_buf(),
conn, conn,
rx: Mutex::new(rx), sync_queue: (tx, Mutex::new(rx)),
tx, async_queue: (async_tx, tokio::sync::Mutex::new(async_rx)),
repos: RwLock::new(repos), repos: RwLock::new(Default::default()),
client: reqwest::Client::new(),
} }
} }
} }
@ -61,30 +66,38 @@ pub fn start(
repos_dir: impl AsRef<Path>, repos_dir: impl AsRef<Path>,
conn: DbConn, conn: DbConn,
rt: runtime::Handle, rt: runtime::Handle,
actors: u32, sync_actors: u32,
async_actors: u32,
) -> crate::Result<Handle> { ) -> crate::Result<Handle> {
std::fs::create_dir_all(repos_dir.as_ref())?; std::fs::create_dir_all(repos_dir.as_ref())?;
let mut repos = HashMap::new();
let repo_ids: Vec<i32> = rt.block_on( let repo_ids: Vec<i32> = rt.block_on(
db::Repo::find() entity::prelude::Repo::find()
.select_only() .select_only()
.column(db::repo::Column::Id) .column(entity::repo::Column::Id)
.into_tuple() .into_tuple()
.all(&conn), .all(&conn),
)?; )?;
for id in repo_ids { let state = Arc::new(SharedState::new(repos_dir, conn));
repos.insert(id, Default::default());
}
let state = Arc::new(SharedState::new(repos_dir, conn, repos)); for _ in 0..sync_actors {
let actor = Actor::new(rt.clone(), &state);
for _ in 0..actors {
let actor = Actor::new(rt.clone(), Arc::clone(&state));
std::thread::spawn(|| actor.run()); std::thread::spawn(|| actor.run());
} }
Ok(Handle::new(&state)) for _ in 0..async_actors {
let actor = AsyncActor::new(&state);
rt.spawn(actor.run());
}
let handle = Handle::new(&state);
for id in repo_ids {
rt.block_on(handle.register_repo(id))?;
}
Ok(handle)
} }

View File

@ -1,5 +1,3 @@
use crate::db::entities::package;
use std::{ use std::{
fmt, fs, fmt, fs,
io::{self, BufRead, BufReader, Read}, io::{self, BufRead, BufReader, Read},
@ -11,7 +9,6 @@ use libarchive::{
read::{Archive, Builder}, read::{Archive, Builder},
Entry, ReadFilter, Entry, ReadFilter,
}; };
use sea_orm::ActiveValue::Set;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Package { pub struct Package {
@ -194,26 +191,3 @@ impl Package {
) )
} }
} }
impl From<Package> for package::ActiveModel {
fn from(pkg: Package) -> Self {
let info = pkg.info;
package::ActiveModel {
base: Set(info.base),
name: Set(info.name),
version: Set(info.version),
arch: Set(info.arch),
size: Set(info.size),
c_size: Set(info.csize),
description: Set(info.description),
url: Set(info.url),
build_date: Set(info.build_date),
packager: Set(info.packager),
pgp_sig: Set(info.pgpsig),
pgp_sig_size: Set(info.pgpsigsize),
sha256_sum: Set(info.sha256sum),
..Default::default()
}
}
}

View File

@ -5,13 +5,14 @@ use pagination::PaginatedResponse;
use axum::{ use axum::{
extract::{Path, Query, State}, extract::{Path, Query, State},
http::StatusCode,
routing::get, routing::get,
Json, Router, Json, Router,
}; };
pub fn router() -> Router<crate::Global> { pub fn router() -> Router<crate::Global> {
Router::new() Router::new()
.route("/repos", get(get_repos)) .route("/repos", get(get_repos).post(post_repo))
.route("/repos/:id", get(get_single_repo)) .route("/repos/:id", get(get_single_repo))
.route("/packages", get(get_packages)) .route("/packages", get(get_packages))
.route("/packages/:id", get(get_single_package)) .route("/packages/:id", get(get_single_package))
@ -21,7 +22,7 @@ async fn get_repos(
State(global): State<crate::Global>, State(global): State<crate::Global>,
Query(pagination): Query<pagination::Query>, Query(pagination): Query<pagination::Query>,
Query(filter): Query<db::query::repo::Filter>, Query(filter): Query<db::query::repo::Filter>,
) -> crate::Result<Json<PaginatedResponse<db::repo::Model>>> { ) -> crate::Result<Json<PaginatedResponse<entity::repo::Model>>> {
let items = let items =
db::query::repo::page(&global.db, pagination.per_page, pagination.page - 1, filter).await?; db::query::repo::page(&global.db, pagination.per_page, pagination.page - 1, filter).await?;
@ -31,7 +32,7 @@ async fn get_repos(
async fn get_single_repo( async fn get_single_repo(
State(global): State<crate::Global>, State(global): State<crate::Global>,
Path(id): Path<i32>, Path(id): Path<i32>,
) -> crate::Result<Json<db::repo::Model>> { ) -> crate::Result<Json<entity::repo::Model>> {
let repo = db::query::repo::by_id(&global.db, id) let repo = db::query::repo::by_id(&global.db, id)
.await? .await?
.ok_or(axum::http::StatusCode::NOT_FOUND)?; .ok_or(axum::http::StatusCode::NOT_FOUND)?;
@ -39,11 +40,21 @@ async fn get_single_repo(
Ok(Json(repo)) Ok(Json(repo))
} }
async fn post_repo(
State(global): State<crate::Global>,
Json(repo): Json<crate::db::NewRepo>,
) -> crate::Result<(StatusCode, Json<entity::repo::Model>)> {
let model = repo.insert(&global.db).await?;
global.repo.register_repo(model.id).await?;
Ok((StatusCode::CREATED, Json(model)))
}
async fn get_packages( async fn get_packages(
State(global): State<crate::Global>, State(global): State<crate::Global>,
Query(pagination): Query<pagination::Query>, Query(pagination): Query<pagination::Query>,
Query(filter): Query<db::query::package::Filter>, Query(filter): Query<db::query::package::Filter>,
) -> crate::Result<Json<PaginatedResponse<db::package::Model>>> { ) -> crate::Result<Json<PaginatedResponse<entity::package::Model>>> {
let items = let items =
db::query::package::page(&global.db, pagination.per_page, pagination.page - 1, filter) db::query::package::page(&global.db, pagination.per_page, pagination.page - 1, filter)
.await?; .await?;