Compare commits
8 Commits
dev
...
feat/mirro
Author | SHA1 | Date |
---|---|---|
Jef Roosens | 64d9df2e18 | |
Jef Roosens | aa0aae41ab | |
Jef Roosens | d38fd5ca74 | |
Jef Roosens | 986162e926 | |
Jef Roosens | d39205b653 | |
Jef Roosens | cbb04a40e0 | |
Jef Roosens | f761e3b36d | |
Jef Roosens | 4225ce3471 |
|
@ -24,6 +24,9 @@ steps:
|
||||||
secrets:
|
secrets:
|
||||||
- minio_access_key
|
- minio_access_key
|
||||||
- minio_secret_key
|
- minio_secret_key
|
||||||
|
when:
|
||||||
|
branch: dev
|
||||||
|
event: push
|
||||||
|
|
||||||
publish-rel:
|
publish-rel:
|
||||||
image: 'curlimages/curl'
|
image: 'curlimages/curl'
|
||||||
|
|
|
@ -25,19 +25,3 @@ steps:
|
||||||
when:
|
when:
|
||||||
branch: dev
|
branch: dev
|
||||||
event: push
|
event: push
|
||||||
|
|
||||||
release:
|
|
||||||
image: 'woodpeckerci/plugin-docker-buildx'
|
|
||||||
secrets:
|
|
||||||
- 'docker_username'
|
|
||||||
- 'docker_password'
|
|
||||||
settings:
|
|
||||||
registry: 'git.rustybever.be'
|
|
||||||
repo: 'git.rustybever.be/chewing_bever/rieter'
|
|
||||||
auto_tag: true
|
|
||||||
platforms: [ 'linux/amd64' ]
|
|
||||||
build_args_from_env:
|
|
||||||
- 'CI_COMMIT_SHA'
|
|
||||||
mtu: 1300
|
|
||||||
when:
|
|
||||||
event: tag
|
|
||||||
|
|
|
@ -7,8 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
|
|
||||||
## [Unreleased](https://git.rustybever.be/Chewing_Bever/rieter/src/branch/dev)
|
## [Unreleased](https://git.rustybever.be/Chewing_Bever/rieter/src/branch/dev)
|
||||||
|
|
||||||
## [0.1.0](https://git.rustybever.be/Chewing_Bever/rieter/src/tag/0.1.0)
|
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
||||||
* Functional repository server
|
* Functional repository server
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,9 +1,10 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
members = [
|
members = [
|
||||||
|
'migration',
|
||||||
'server',
|
'server',
|
||||||
'libarchive',
|
'libarchive',
|
||||||
'libarchive3-sys'
|
'libarchive3-sys'
|
||||||
]
|
, "entity"]
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
lto = "fat"
|
lto = "fat"
|
||||||
|
|
|
@ -0,0 +1,8 @@
|
||||||
|
[package]
|
||||||
|
name = "entity"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
sea-orm = "0.12.15"
|
||||||
|
serde = { version = "1.0.204", features = ["derive"] }
|
|
@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
|
||||||
pub struct Model {
|
pub struct Model {
|
||||||
#[sea_orm(primary_key)]
|
#[sea_orm(primary_key)]
|
||||||
pub id: i32,
|
pub id: i32,
|
||||||
|
#[sea_orm(unique)]
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub description: Option<String>,
|
pub description: Option<String>,
|
||||||
}
|
}
|
|
@ -9,3 +9,4 @@ pub mod package_group;
|
||||||
pub mod package_license;
|
pub mod package_license;
|
||||||
pub mod package_related;
|
pub mod package_related;
|
||||||
pub mod repo;
|
pub mod repo;
|
||||||
|
pub mod repo_mirror;
|
|
@ -1,11 +1,8 @@
|
||||||
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1
|
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1
|
||||||
|
|
||||||
use chrono::NaiveDateTime;
|
|
||||||
use sea_orm::entity::prelude::*;
|
use sea_orm::entity::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::db::PackageState;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
|
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
|
||||||
#[sea_orm(table_name = "package")]
|
#[sea_orm(table_name = "package")]
|
||||||
pub struct Model {
|
pub struct Model {
|
||||||
|
@ -20,14 +17,13 @@ pub struct Model {
|
||||||
pub c_size: i64,
|
pub c_size: i64,
|
||||||
pub description: Option<String>,
|
pub description: Option<String>,
|
||||||
pub url: Option<String>,
|
pub url: Option<String>,
|
||||||
pub build_date: NaiveDateTime,
|
pub build_date: DateTime,
|
||||||
pub packager: Option<String>,
|
pub packager: Option<String>,
|
||||||
pub pgp_sig: Option<String>,
|
pub pgp_sig: Option<String>,
|
||||||
pub pgp_sig_size: Option<i64>,
|
pub pgp_sig_size: Option<i64>,
|
||||||
pub sha256_sum: String,
|
pub sha256_sum: String,
|
||||||
pub compression: String,
|
pub compression: String,
|
||||||
#[serde(skip_serializing)]
|
pub state: crate::PackageState,
|
||||||
pub state: PackageState,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
|
@ -3,15 +3,13 @@
|
||||||
use sea_orm::entity::prelude::*;
|
use sea_orm::entity::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::db::PackageRelatedEnum;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
|
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
|
||||||
#[sea_orm(table_name = "package_related")]
|
#[sea_orm(table_name = "package_related")]
|
||||||
pub struct Model {
|
pub struct Model {
|
||||||
#[sea_orm(primary_key, auto_increment = false)]
|
#[sea_orm(primary_key, auto_increment = false)]
|
||||||
pub package_id: i32,
|
pub package_id: i32,
|
||||||
#[sea_orm(primary_key, auto_increment = false)]
|
#[sea_orm(primary_key, auto_increment = false)]
|
||||||
pub r#type: PackageRelatedEnum,
|
pub r#type: crate::PackageRelatedEnum,
|
||||||
#[sea_orm(primary_key, auto_increment = false)]
|
#[sea_orm(primary_key, auto_increment = false)]
|
||||||
pub name: String,
|
pub name: String,
|
||||||
}
|
}
|
|
@ -7,3 +7,4 @@ pub use super::package_group::Entity as PackageGroup;
|
||||||
pub use super::package_license::Entity as PackageLicense;
|
pub use super::package_license::Entity as PackageLicense;
|
||||||
pub use super::package_related::Entity as PackageRelated;
|
pub use super::package_related::Entity as PackageRelated;
|
||||||
pub use super::repo::Entity as Repo;
|
pub use super::repo::Entity as Repo;
|
||||||
|
pub use super::repo_mirror::Entity as RepoMirror;
|
|
@ -9,8 +9,10 @@ pub struct Model {
|
||||||
#[sea_orm(primary_key)]
|
#[sea_orm(primary_key)]
|
||||||
pub id: i32,
|
pub id: i32,
|
||||||
pub distro_id: i32,
|
pub distro_id: i32,
|
||||||
|
#[sea_orm(unique)]
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub description: Option<String>,
|
pub description: Option<String>,
|
||||||
|
pub r#type: crate::RepoType,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
|
@ -25,6 +27,8 @@ pub enum Relation {
|
||||||
Distro,
|
Distro,
|
||||||
#[sea_orm(has_many = "super::package::Entity")]
|
#[sea_orm(has_many = "super::package::Entity")]
|
||||||
Package,
|
Package,
|
||||||
|
#[sea_orm(has_many = "super::repo_mirror::Entity")]
|
||||||
|
RepoMirror,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Related<super::distro::Entity> for Entity {
|
impl Related<super::distro::Entity> for Entity {
|
||||||
|
@ -39,4 +43,10 @@ impl Related<super::package::Entity> for Entity {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Related<super::repo_mirror::Entity> for Entity {
|
||||||
|
fn to() -> RelationDef {
|
||||||
|
Relation::RepoMirror.def()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl ActiveModelBehavior for ActiveModel {}
|
impl ActiveModelBehavior for ActiveModel {}
|
|
@ -0,0 +1,33 @@
|
||||||
|
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1
|
||||||
|
|
||||||
|
use sea_orm::entity::prelude::*;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
|
||||||
|
#[sea_orm(table_name = "repo_mirror")]
|
||||||
|
pub struct Model {
|
||||||
|
#[sea_orm(primary_key)]
|
||||||
|
pub id: i32,
|
||||||
|
pub repo_id: i32,
|
||||||
|
pub url: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
|
pub enum Relation {
|
||||||
|
#[sea_orm(
|
||||||
|
belongs_to = "super::repo::Entity",
|
||||||
|
from = "Column::RepoId",
|
||||||
|
to = "super::repo::Column::Id",
|
||||||
|
on_update = "NoAction",
|
||||||
|
on_delete = "Cascade"
|
||||||
|
)]
|
||||||
|
Repo,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Related<super::repo::Entity> for Entity {
|
||||||
|
fn to() -> RelationDef {
|
||||||
|
Relation::Repo.def()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ActiveModelBehavior for ActiveModel {}
|
|
@ -0,0 +1,46 @@
|
||||||
|
pub mod entity;
|
||||||
|
pub use entity::prelude::*;
|
||||||
|
pub use entity::*;
|
||||||
|
|
||||||
|
use sea_orm::{DeriveActiveEnum, EnumIter};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(EnumIter, DeriveActiveEnum, Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||||
|
#[sea_orm(rs_type = "i32", db_type = "Integer")]
|
||||||
|
#[serde(rename_all = "lowercase")]
|
||||||
|
pub enum PackageRelatedEnum {
|
||||||
|
#[sea_orm(num_value = 0)]
|
||||||
|
Conflicts,
|
||||||
|
#[sea_orm(num_value = 1)]
|
||||||
|
Replaces,
|
||||||
|
#[sea_orm(num_value = 2)]
|
||||||
|
Provides,
|
||||||
|
#[sea_orm(num_value = 3)]
|
||||||
|
Depend,
|
||||||
|
#[sea_orm(num_value = 4)]
|
||||||
|
Makedepend,
|
||||||
|
#[sea_orm(num_value = 5)]
|
||||||
|
Checkdepend,
|
||||||
|
#[sea_orm(num_value = 6)]
|
||||||
|
Optdepend,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)]
|
||||||
|
#[sea_orm(rs_type = "i32", db_type = "Integer")]
|
||||||
|
pub enum PackageState {
|
||||||
|
#[sea_orm(num_value = 0)]
|
||||||
|
PendingCommit,
|
||||||
|
#[sea_orm(num_value = 1)]
|
||||||
|
Committed,
|
||||||
|
#[sea_orm(num_value = 2)]
|
||||||
|
PendingDeletion,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)]
|
||||||
|
#[sea_orm(rs_type = "i32", db_type = "Integer")]
|
||||||
|
pub enum RepoType {
|
||||||
|
#[sea_orm(num_value = 0)]
|
||||||
|
Regular,
|
||||||
|
#[sea_orm(num_value = 1)]
|
||||||
|
FullMirror,
|
||||||
|
}
|
|
@ -5,6 +5,6 @@ pub mod write;
|
||||||
|
|
||||||
pub use archive::{
|
pub use archive::{
|
||||||
Entry, ExtractOption, ExtractOptions, Handle, ReadCompression, ReadFilter, ReadFormat,
|
Entry, ExtractOption, ExtractOptions, Handle, ReadCompression, ReadFilter, ReadFormat,
|
||||||
WriteFilter, WriteFormat,
|
WriteFilter, WriteFormat, FileType,
|
||||||
};
|
};
|
||||||
pub use error::Result;
|
pub use error::Result;
|
||||||
|
|
|
@ -6,9 +6,10 @@ pub use builder::Builder;
|
||||||
|
|
||||||
use crate::archive::Handle;
|
use crate::archive::Handle;
|
||||||
use crate::ReadFilter;
|
use crate::ReadFilter;
|
||||||
use entries::Entries;
|
pub use entries::{Entries, ReadEntry};
|
||||||
use libarchive3_sys::ffi;
|
use libarchive3_sys::ffi;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
pub use file::FileReader;
|
||||||
|
|
||||||
// Represents a read view of an archive
|
// Represents a read view of an archive
|
||||||
pub trait Archive: Handle + Sized {
|
pub trait Archive: Handle + Sized {
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
[package]
|
||||||
|
name = "migration"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
publish = false
|
||||||
|
|
||||||
|
[lib]
|
||||||
|
name = "migration"
|
||||||
|
path = "src/lib.rs"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
async-std = { version = "1", features = ["attributes", "tokio1"] }
|
||||||
|
|
||||||
|
[dependencies.sea-orm-migration]
|
||||||
|
version = "0.12.0"
|
||||||
|
features = [
|
||||||
|
"sqlx-postgres",
|
||||||
|
"sqlx-sqlite",
|
||||||
|
"runtime-tokio-rustls",
|
||||||
|
]
|
|
@ -0,0 +1,41 @@
|
||||||
|
# Running Migrator CLI
|
||||||
|
|
||||||
|
- Generate a new migration file
|
||||||
|
```sh
|
||||||
|
cargo run -- generate MIGRATION_NAME
|
||||||
|
```
|
||||||
|
- Apply all pending migrations
|
||||||
|
```sh
|
||||||
|
cargo run
|
||||||
|
```
|
||||||
|
```sh
|
||||||
|
cargo run -- up
|
||||||
|
```
|
||||||
|
- Apply first 10 pending migrations
|
||||||
|
```sh
|
||||||
|
cargo run -- up -n 10
|
||||||
|
```
|
||||||
|
- Rollback last applied migrations
|
||||||
|
```sh
|
||||||
|
cargo run -- down
|
||||||
|
```
|
||||||
|
- Rollback last 10 applied migrations
|
||||||
|
```sh
|
||||||
|
cargo run -- down -n 10
|
||||||
|
```
|
||||||
|
- Drop all tables from the database, then reapply all migrations
|
||||||
|
```sh
|
||||||
|
cargo run -- fresh
|
||||||
|
```
|
||||||
|
- Rollback all applied migrations, then reapply all migrations
|
||||||
|
```sh
|
||||||
|
cargo run -- refresh
|
||||||
|
```
|
||||||
|
- Rollback all applied migrations
|
||||||
|
```sh
|
||||||
|
cargo run -- reset
|
||||||
|
```
|
||||||
|
- Check the status of all migrations
|
||||||
|
```sh
|
||||||
|
cargo run -- status
|
||||||
|
```
|
|
@ -0,0 +1,16 @@
|
||||||
|
pub use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
mod m20230730_000001_create_repo_tables;
|
||||||
|
mod m20240716_184104_repo_mirrors;
|
||||||
|
|
||||||
|
pub struct Migrator;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigratorTrait for Migrator {
|
||||||
|
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
|
||||||
|
vec![
|
||||||
|
Box::new(m20230730_000001_create_repo_tables::Migration),
|
||||||
|
Box::new(m20240716_184104_repo_mirrors::Migration),
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,74 @@
|
||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
#[derive(DeriveMigrationName)]
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.alter_table(
|
||||||
|
Table::alter()
|
||||||
|
.table(Repo::Table)
|
||||||
|
.add_column(
|
||||||
|
ColumnDef::new(Repo::Type)
|
||||||
|
.integer()
|
||||||
|
.not_null()
|
||||||
|
.default(0),
|
||||||
|
)
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
manager
|
||||||
|
.create_table(
|
||||||
|
Table::create()
|
||||||
|
.table(RepoMirror::Table)
|
||||||
|
.col(
|
||||||
|
ColumnDef::new(RepoMirror::Id)
|
||||||
|
.integer()
|
||||||
|
.not_null()
|
||||||
|
.auto_increment()
|
||||||
|
.primary_key()
|
||||||
|
)
|
||||||
|
.col(ColumnDef::new(RepoMirror::RepoId).integer().not_null())
|
||||||
|
.col(ColumnDef::new(RepoMirror::Url).string().not_null())
|
||||||
|
.foreign_key(
|
||||||
|
ForeignKey::create()
|
||||||
|
.name("fk-repo_mirror-repo-id")
|
||||||
|
.from(RepoMirror::Table, RepoMirror::RepoId)
|
||||||
|
.to(Repo::Table, Repo::Id)
|
||||||
|
.on_delete(ForeignKeyAction::Cascade),
|
||||||
|
)
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
manager
|
||||||
|
.drop_table(Table::drop().table(RepoMirror::Table).to_owned())
|
||||||
|
.await?;
|
||||||
|
manager
|
||||||
|
.alter_table(Table::alter().drop_column(Repo::Type).to_owned())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Iden)]
|
||||||
|
pub enum Repo {
|
||||||
|
Table,
|
||||||
|
Id,
|
||||||
|
Type,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Iden)]
|
||||||
|
pub enum RepoMirror {
|
||||||
|
Table,
|
||||||
|
Id,
|
||||||
|
RepoId,
|
||||||
|
Url,
|
||||||
|
}
|
|
@ -0,0 +1,6 @@
|
||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
#[async_std::main]
|
||||||
|
async fn main() {
|
||||||
|
cli::run_cli(migration::Migrator).await;
|
||||||
|
}
|
|
@ -15,7 +15,6 @@ futures = "0.3.28"
|
||||||
http-body-util = "0.1.1"
|
http-body-util = "0.1.1"
|
||||||
libarchive = { path = "../libarchive" }
|
libarchive = { path = "../libarchive" }
|
||||||
regex = "1.10.5"
|
regex = "1.10.5"
|
||||||
sea-orm-migration = "0.12.1"
|
|
||||||
sea-query = { version = "0.30.7", features = ["backend-postgres", "backend-sqlite"] }
|
sea-query = { version = "0.30.7", features = ["backend-postgres", "backend-sqlite"] }
|
||||||
serde = { version = "1.0.178", features = ["derive"] }
|
serde = { version = "1.0.178", features = ["derive"] }
|
||||||
sha256 = "1.1.4"
|
sha256 = "1.1.4"
|
||||||
|
@ -26,6 +25,9 @@ tower-http = { version = "0.5.2", features = ["fs", "trace", "auth"] }
|
||||||
tracing = "0.1.37"
|
tracing = "0.1.37"
|
||||||
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
|
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
|
||||||
uuid = { version = "1.4.0", features = ["v4"] }
|
uuid = { version = "1.4.0", features = ["v4"] }
|
||||||
|
migration = { path = "../migration" }
|
||||||
|
entity = { path = "../entity" }
|
||||||
|
reqwest = { version = "0.12.5", features = ["stream"] }
|
||||||
|
|
||||||
[dependencies.sea-orm]
|
[dependencies.sea-orm]
|
||||||
version = "0.12.1"
|
version = "0.12.1"
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
api_key = "test"
|
api_key = "test"
|
||||||
pkg_workers = 2
|
|
||||||
log_level = "rieterd=debug"
|
log_level = "rieterd=debug"
|
||||||
|
|
||||||
|
[repo]
|
||||||
|
sync_workers = 2
|
||||||
|
async_workers = 1
|
||||||
|
|
||||||
[fs]
|
[fs]
|
||||||
type = "local"
|
type = "local"
|
||||||
data_dir = "./data"
|
data_dir = "./data"
|
||||||
|
|
|
@ -36,6 +36,14 @@ pub enum DbConfig {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
|
pub struct RepoConfig {
|
||||||
|
#[serde(default = "default_repo_sync_workers")]
|
||||||
|
pub sync_workers: u32,
|
||||||
|
#[serde(default = "default_repo_async_workers")]
|
||||||
|
pub async_workers: u32,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Debug, Clone)]
|
#[derive(Deserialize, Debug, Clone)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub api_key: String,
|
pub api_key: String,
|
||||||
|
@ -47,8 +55,7 @@ pub struct Config {
|
||||||
pub log_level: String,
|
pub log_level: String,
|
||||||
pub fs: FsConfig,
|
pub fs: FsConfig,
|
||||||
pub db: DbConfig,
|
pub db: DbConfig,
|
||||||
#[serde(default = "default_pkg_workers")]
|
pub repo: RepoConfig,
|
||||||
pub pkg_workers: u32,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
|
@ -83,6 +90,10 @@ fn default_db_postgres_max_connections() -> u32 {
|
||||||
16
|
16
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_pkg_workers() -> u32 {
|
fn default_repo_sync_workers() -> u32 {
|
||||||
|
1
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_repo_async_workers() -> u32 {
|
||||||
1
|
1
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +0,0 @@
|
||||||
use sea_orm_migration::prelude::*;
|
|
||||||
|
|
||||||
pub struct Migrator;
|
|
||||||
|
|
||||||
mod m20230730_000001_create_repo_tables;
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl MigratorTrait for Migrator {
|
|
||||||
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
|
|
||||||
vec![Box::new(m20230730_000001_create_repo_tables::Migration)]
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,58 +1,81 @@
|
||||||
pub mod entities;
|
|
||||||
mod migrator;
|
|
||||||
pub mod query;
|
pub mod query;
|
||||||
|
|
||||||
use crate::config::DbConfig;
|
use crate::config::DbConfig;
|
||||||
|
|
||||||
pub use entities::{prelude::*, *};
|
use sea_orm::{
|
||||||
pub use migrator::Migrator;
|
ActiveModelTrait,
|
||||||
|
ActiveValue::{NotSet, Set},
|
||||||
use sea_orm::{ConnectionTrait, Database, DbConn, DeriveActiveEnum, EnumIter};
|
ConnectionTrait, Database, DbConn, EntityTrait, TransactionTrait,
|
||||||
|
};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, sea_orm::DbErr>;
|
type Result<T> = std::result::Result<T, sea_orm::DbErr>;
|
||||||
|
|
||||||
#[derive(EnumIter, DeriveActiveEnum, Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
|
|
||||||
#[sea_orm(rs_type = "i32", db_type = "Integer")]
|
|
||||||
#[serde(rename_all = "lowercase")]
|
|
||||||
pub enum PackageRelatedEnum {
|
|
||||||
#[sea_orm(num_value = 0)]
|
|
||||||
Conflicts,
|
|
||||||
#[sea_orm(num_value = 1)]
|
|
||||||
Replaces,
|
|
||||||
#[sea_orm(num_value = 2)]
|
|
||||||
Provides,
|
|
||||||
#[sea_orm(num_value = 3)]
|
|
||||||
Depend,
|
|
||||||
#[sea_orm(num_value = 4)]
|
|
||||||
Makedepend,
|
|
||||||
#[sea_orm(num_value = 5)]
|
|
||||||
Checkdepend,
|
|
||||||
#[sea_orm(num_value = 6)]
|
|
||||||
Optdepend,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)]
|
|
||||||
#[sea_orm(rs_type = "i32", db_type = "Integer")]
|
|
||||||
pub enum PackageState {
|
|
||||||
#[sea_orm(num_value = 0)]
|
|
||||||
PendingCommit,
|
|
||||||
#[sea_orm(num_value = 1)]
|
|
||||||
Committed,
|
|
||||||
#[sea_orm(num_value = 2)]
|
|
||||||
PendingDeletion,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
pub struct FullPackage {
|
pub struct FullPackage {
|
||||||
#[serde(flatten)]
|
#[serde(flatten)]
|
||||||
entry: package::Model,
|
entry: entity::package::Model,
|
||||||
licenses: Vec<String>,
|
licenses: Vec<String>,
|
||||||
groups: Vec<String>,
|
groups: Vec<String>,
|
||||||
related: Vec<(PackageRelatedEnum, String)>,
|
related: Vec<(entity::PackageRelatedEnum, String)>,
|
||||||
files: Vec<String>,
|
files: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
#[serde(tag = "type")]
|
||||||
|
pub enum RepoType {
|
||||||
|
Regular,
|
||||||
|
FullMirror { mirrors: Vec<String> },
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
pub struct NewRepo {
|
||||||
|
distro_id: i32,
|
||||||
|
name: String,
|
||||||
|
description: Option<String>,
|
||||||
|
#[serde(flatten)]
|
||||||
|
r#type: RepoType,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NewRepo {
|
||||||
|
pub async fn insert(self, conn: &DbConn) -> crate::Result<entity::repo::Model> {
|
||||||
|
let txn = conn.begin().await?;
|
||||||
|
|
||||||
|
let repo_type = match self.r#type {
|
||||||
|
RepoType::Regular => entity::RepoType::Regular,
|
||||||
|
RepoType::FullMirror { .. } => entity::RepoType::FullMirror,
|
||||||
|
};
|
||||||
|
|
||||||
|
let repo = entity::repo::ActiveModel {
|
||||||
|
id: NotSet,
|
||||||
|
distro_id: Set(self.distro_id),
|
||||||
|
name: Set(self.name),
|
||||||
|
description: Set(self.description),
|
||||||
|
r#type: Set(repo_type),
|
||||||
|
};
|
||||||
|
let model = repo.insert(conn).await?;
|
||||||
|
|
||||||
|
match self.r#type {
|
||||||
|
RepoType::Regular => {}
|
||||||
|
RepoType::FullMirror { mirrors } => {
|
||||||
|
entity::RepoMirror::insert_many(mirrors.into_iter().map(|url| {
|
||||||
|
entity::repo_mirror::ActiveModel {
|
||||||
|
id: NotSet,
|
||||||
|
repo_id: Set(model.id),
|
||||||
|
url: Set(url),
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.on_empty_do_nothing()
|
||||||
|
.exec(&txn)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
txn.commit().await?;
|
||||||
|
Ok(model)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn connect(conn: &DbConfig) -> crate::Result<DbConn> {
|
pub async fn connect(conn: &DbConfig) -> crate::Result<DbConn> {
|
||||||
match conn {
|
match conn {
|
||||||
DbConfig::Sqlite {
|
DbConfig::Sqlite {
|
||||||
|
|
|
@ -1,6 +1,11 @@
|
||||||
use crate::db::*;
|
use crate::db::Result;
|
||||||
|
|
||||||
use sea_orm::{sea_query::IntoCondition, *};
|
use entity::{distro, prelude::Distro};
|
||||||
|
use sea_orm::{
|
||||||
|
sea_query::IntoCondition, ActiveModelTrait, ColumnTrait, Condition, DbConn, EntityTrait,
|
||||||
|
NotSet, PaginatorTrait, QueryFilter, QueryOrder, Set,
|
||||||
|
};
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
pub struct Filter {
|
pub struct Filter {
|
||||||
|
@ -21,7 +26,7 @@ pub async fn page(
|
||||||
per_page: u64,
|
per_page: u64,
|
||||||
page: u64,
|
page: u64,
|
||||||
filter: Filter,
|
filter: Filter,
|
||||||
) -> Result<Vec<distro::Model>> {
|
) -> Result<Vec<entity::distro::Model>> {
|
||||||
let paginator = Distro::find()
|
let paginator = Distro::find()
|
||||||
.filter(filter)
|
.filter(filter)
|
||||||
.order_by_asc(distro::Column::Id)
|
.order_by_asc(distro::Column::Id)
|
||||||
|
|
|
@ -1,7 +1,16 @@
|
||||||
use crate::db::{self, *};
|
use crate::db::{FullPackage, Result};
|
||||||
|
|
||||||
use sea_orm::{sea_query::IntoCondition, *};
|
use entity::{
|
||||||
use sea_query::{Alias, Expr, Query, SelectStatement};
|
package, package_file, package_group, package_license, package_related,
|
||||||
|
prelude::{Package, PackageFile, PackageGroup, PackageLicense, PackageRelated},
|
||||||
|
PackageRelatedEnum, PackageState,
|
||||||
|
};
|
||||||
|
use sea_orm::{
|
||||||
|
sea_query::IntoCondition, ActiveModelTrait, ColumnTrait, Condition, ConnectionTrait, DbConn,
|
||||||
|
DeleteResult, EntityTrait, FromQueryResult, Iterable, ModelTrait, NotSet, PaginatorTrait,
|
||||||
|
QueryFilter, QuerySelect, SelectModel, SelectorRaw, Set, TransactionTrait,
|
||||||
|
};
|
||||||
|
use sea_query::{Alias, Expr, JoinType, Order, Query, SelectStatement};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
|
||||||
/// How many fields may be inserted at once into the database.
|
/// How many fields may be inserted at once into the database.
|
||||||
|
@ -31,22 +40,22 @@ pub async fn page(
|
||||||
) -> crate::Result<Vec<package::Model>> {
|
) -> crate::Result<Vec<package::Model>> {
|
||||||
let p2 = Alias::new("p2");
|
let p2 = Alias::new("p2");
|
||||||
let query = Query::select()
|
let query = Query::select()
|
||||||
.columns(db::package::Column::iter().map(|c| (db::package::Entity, c)))
|
.columns(package::Column::iter().map(|c| (package::Entity, c)))
|
||||||
.from(db::package::Entity)
|
.from(package::Entity)
|
||||||
.join_subquery(
|
.join_subquery(
|
||||||
JoinType::InnerJoin,
|
JoinType::InnerJoin,
|
||||||
max_pkg_ids_query(true),
|
max_pkg_ids_query(true),
|
||||||
p2.clone(),
|
p2.clone(),
|
||||||
Expr::col((db::package::Entity, db::package::Column::Id))
|
Expr::col((package::Entity, package::Column::Id))
|
||||||
.eq(Expr::col((p2.clone(), Alias::new("max_id")))),
|
.eq(Expr::col((p2.clone(), Alias::new("max_id")))),
|
||||||
)
|
)
|
||||||
.cond_where(filter)
|
.cond_where(filter)
|
||||||
.order_by((db::package::Entity, db::package::Column::Id), Order::Asc)
|
.order_by((package::Entity, package::Column::Id), Order::Asc)
|
||||||
.to_owned();
|
.to_owned();
|
||||||
let builder = conn.get_database_backend();
|
let builder = conn.get_database_backend();
|
||||||
let sql = builder.build(&query);
|
let sql = builder.build(&query);
|
||||||
|
|
||||||
Ok(db::Package::find()
|
Ok(Package::find()
|
||||||
.from_raw_sql(sql)
|
.from_raw_sql(sql)
|
||||||
.paginate(conn, per_page)
|
.paginate(conn, per_page)
|
||||||
.fetch_page(page)
|
.fetch_page(page)
|
||||||
|
@ -213,7 +222,7 @@ pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
|
||||||
.into_tuple()
|
.into_tuple()
|
||||||
.all(conn)
|
.all(conn)
|
||||||
.await?;
|
.await?;
|
||||||
let related: Vec<(db::PackageRelatedEnum, String)> = entry
|
let related: Vec<(PackageRelatedEnum, String)> = entry
|
||||||
.find_related(PackageRelated)
|
.find_related(PackageRelated)
|
||||||
.select_only()
|
.select_only()
|
||||||
.columns([package_related::Column::Type, package_related::Column::Name])
|
.columns([package_related::Column::Type, package_related::Column::Name])
|
||||||
|
@ -248,22 +257,22 @@ pub struct PkgToRemove {
|
||||||
|
|
||||||
fn max_pkg_ids_query(committed: bool) -> SelectStatement {
|
fn max_pkg_ids_query(committed: bool) -> SelectStatement {
|
||||||
let mut query = Query::select()
|
let mut query = Query::select()
|
||||||
.from(db::package::Entity)
|
.from(package::Entity)
|
||||||
.columns([
|
.columns([
|
||||||
db::package::Column::RepoId,
|
package::Column::RepoId,
|
||||||
db::package::Column::Arch,
|
package::Column::Arch,
|
||||||
db::package::Column::Name,
|
package::Column::Name,
|
||||||
])
|
])
|
||||||
.expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
|
.expr_as(package::Column::Id.max(), Alias::new("max_id"))
|
||||||
.group_by_columns([
|
.group_by_columns([
|
||||||
db::package::Column::RepoId,
|
package::Column::RepoId,
|
||||||
db::package::Column::Arch,
|
package::Column::Arch,
|
||||||
db::package::Column::Name,
|
package::Column::Name,
|
||||||
])
|
])
|
||||||
.to_owned();
|
.to_owned();
|
||||||
|
|
||||||
if committed {
|
if committed {
|
||||||
query.cond_where(db::package::Column::State.eq(db::PackageState::Committed));
|
query.cond_where(package::Column::State.eq(PackageState::Committed));
|
||||||
}
|
}
|
||||||
|
|
||||||
query
|
query
|
||||||
|
@ -278,47 +287,44 @@ pub fn pkgs_to_sync(
|
||||||
) -> SelectorRaw<SelectModel<package::Model>> {
|
) -> SelectorRaw<SelectModel<package::Model>> {
|
||||||
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
|
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
|
||||||
let query = Query::select()
|
let query = Query::select()
|
||||||
.columns(db::package::Column::iter().map(|c| (p1.clone(), c)))
|
.columns(package::Column::iter().map(|c| (p1.clone(), c)))
|
||||||
.from_as(db::package::Entity, p1.clone())
|
.from_as(package::Entity, p1.clone())
|
||||||
.join_subquery(
|
.join_subquery(
|
||||||
JoinType::InnerJoin,
|
JoinType::InnerJoin,
|
||||||
max_pkg_ids_query(false),
|
max_pkg_ids_query(false),
|
||||||
p2.clone(),
|
p2.clone(),
|
||||||
Expr::col((p1.clone(), db::package::Column::Id))
|
Expr::col((p1.clone(), package::Column::Id))
|
||||||
.eq(Expr::col((p2.clone(), Alias::new("max_id")))),
|
.eq(Expr::col((p2.clone(), Alias::new("max_id")))),
|
||||||
)
|
)
|
||||||
.cond_where(
|
.cond_where(
|
||||||
Condition::all()
|
Condition::all()
|
||||||
.add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo))
|
.add(Expr::col((p1.clone(), package::Column::RepoId)).eq(repo))
|
||||||
|
.add(Expr::col((p1.clone(), package::Column::Arch)).is_in([arch, crate::ANY_ARCH]))
|
||||||
.add(
|
.add(
|
||||||
Expr::col((p1.clone(), db::package::Column::Arch))
|
Expr::col((p1.clone(), package::Column::State))
|
||||||
.is_in([arch, crate::ANY_ARCH]),
|
.ne(PackageState::PendingDeletion),
|
||||||
)
|
|
||||||
.add(
|
|
||||||
Expr::col((p1.clone(), db::package::Column::State))
|
|
||||||
.ne(db::PackageState::PendingDeletion),
|
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.to_owned();
|
.to_owned();
|
||||||
let builder = conn.get_database_backend();
|
let builder = conn.get_database_backend();
|
||||||
let sql = builder.build(&query);
|
let sql = builder.build(&query);
|
||||||
|
|
||||||
db::Package::find().from_raw_sql(sql)
|
Package::find().from_raw_sql(sql)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
|
fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
|
||||||
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
|
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
|
||||||
let mut query = Query::select()
|
let mut query = Query::select()
|
||||||
.from_as(db::package::Entity, p1.clone())
|
.from_as(package::Entity, p1.clone())
|
||||||
.to_owned();
|
.to_owned();
|
||||||
|
|
||||||
if include_repo {
|
if include_repo {
|
||||||
query.columns([
|
query.columns([
|
||||||
(p1.clone(), db::package::Column::RepoId),
|
(p1.clone(), package::Column::RepoId),
|
||||||
(p1.clone(), db::package::Column::Id),
|
(p1.clone(), package::Column::Id),
|
||||||
]);
|
]);
|
||||||
} else {
|
} else {
|
||||||
query.column((p1.clone(), db::package::Column::Id));
|
query.column((p1.clone(), package::Column::Id));
|
||||||
}
|
}
|
||||||
|
|
||||||
// We left join on the max pkgs query because a repository that has all its packages set to
|
// We left join on the max pkgs query because a repository that has all its packages set to
|
||||||
|
@ -331,27 +337,27 @@ fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
|
||||||
p2.clone(),
|
p2.clone(),
|
||||||
Condition::all()
|
Condition::all()
|
||||||
.add(
|
.add(
|
||||||
Expr::col((p1.clone(), db::package::Column::RepoId))
|
Expr::col((p1.clone(), package::Column::RepoId))
|
||||||
.eq(Expr::col((p2.clone(), db::package::Column::RepoId))),
|
.eq(Expr::col((p2.clone(), package::Column::RepoId))),
|
||||||
)
|
)
|
||||||
.add(
|
.add(
|
||||||
Expr::col((p1.clone(), db::package::Column::Arch))
|
Expr::col((p1.clone(), package::Column::Arch))
|
||||||
.eq(Expr::col((p2.clone(), db::package::Column::Arch))),
|
.eq(Expr::col((p2.clone(), package::Column::Arch))),
|
||||||
)
|
)
|
||||||
.add(
|
.add(
|
||||||
Expr::col((p1.clone(), db::package::Column::Name))
|
Expr::col((p1.clone(), package::Column::Name))
|
||||||
.eq(Expr::col((p2.clone(), db::package::Column::Name))),
|
.eq(Expr::col((p2.clone(), package::Column::Name))),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.cond_where(
|
.cond_where(
|
||||||
Condition::any()
|
Condition::any()
|
||||||
.add(
|
.add(
|
||||||
Expr::col((p1.clone(), db::package::Column::Id))
|
Expr::col((p1.clone(), package::Column::Id))
|
||||||
.lt(Expr::col((p2.clone(), Alias::new("max_id")))),
|
.lt(Expr::col((p2.clone(), Alias::new("max_id")))),
|
||||||
)
|
)
|
||||||
.add(
|
.add(
|
||||||
Expr::col((p1.clone(), db::package::Column::State))
|
Expr::col((p1.clone(), package::Column::State))
|
||||||
.eq(db::PackageState::PendingDeletion),
|
.eq(PackageState::PendingDeletion),
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -367,9 +373,9 @@ pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> {
|
pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> {
|
||||||
Ok(db::Package::delete_many()
|
Ok(Package::delete_many()
|
||||||
.filter(db::package::Column::Id.lte(max_id))
|
.filter(package::Column::Id.lte(max_id))
|
||||||
.filter(db::package::Column::Id.in_subquery(stale_pkgs_query(false)))
|
.filter(package::Column::Id.in_subquery(stale_pkgs_query(false)))
|
||||||
.exec(conn)
|
.exec(conn)
|
||||||
.await
|
.await
|
||||||
.map(|_| ())?)
|
.map(|_| ())?)
|
||||||
|
|
|
@ -1,6 +1,11 @@
|
||||||
use crate::db::*;
|
use crate::db::Result;
|
||||||
|
|
||||||
use sea_orm::{sea_query::IntoCondition, *};
|
use entity::{prelude::Repo, repo};
|
||||||
|
use sea_orm::{
|
||||||
|
sea_query::IntoCondition, ActiveModelTrait, ColumnTrait, Condition, DbConn, EntityTrait,
|
||||||
|
NotSet, PaginatorTrait, QueryFilter, QueryOrder, Set,
|
||||||
|
};
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
pub struct Filter {
|
pub struct Filter {
|
||||||
|
@ -47,12 +52,14 @@ pub async fn insert(
|
||||||
distro_id: i32,
|
distro_id: i32,
|
||||||
name: &str,
|
name: &str,
|
||||||
description: Option<&str>,
|
description: Option<&str>,
|
||||||
|
r#type: entity::RepoType,
|
||||||
) -> Result<repo::Model> {
|
) -> Result<repo::Model> {
|
||||||
let model = repo::ActiveModel {
|
let model = repo::ActiveModel {
|
||||||
id: NotSet,
|
id: NotSet,
|
||||||
distro_id: Set(distro_id),
|
distro_id: Set(distro_id),
|
||||||
name: Set(String::from(name)),
|
name: Set(String::from(name)),
|
||||||
description: Set(description.map(String::from)),
|
description: Set(description.map(String::from)),
|
||||||
|
r#type: Set(r#type),
|
||||||
};
|
};
|
||||||
|
|
||||||
model.insert(conn).await
|
model.insert(conn).await
|
||||||
|
|
|
@ -15,6 +15,7 @@ pub enum ServerError {
|
||||||
Status(StatusCode),
|
Status(StatusCode),
|
||||||
Archive(libarchive::error::ArchiveError),
|
Archive(libarchive::error::ArchiveError),
|
||||||
Figment(figment::Error),
|
Figment(figment::Error),
|
||||||
|
Reqwest(reqwest::Error),
|
||||||
Unit,
|
Unit,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,6 +28,7 @@ impl fmt::Display for ServerError {
|
||||||
ServerError::Db(err) => write!(fmt, "{}", err),
|
ServerError::Db(err) => write!(fmt, "{}", err),
|
||||||
ServerError::Archive(err) => write!(fmt, "{}", err),
|
ServerError::Archive(err) => write!(fmt, "{}", err),
|
||||||
ServerError::Figment(err) => write!(fmt, "{}", err),
|
ServerError::Figment(err) => write!(fmt, "{}", err),
|
||||||
|
ServerError::Reqwest(err) => write!(fmt, "{}", err),
|
||||||
ServerError::Unit => Ok(()),
|
ServerError::Unit => Ok(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -48,6 +50,7 @@ impl IntoResponse for ServerError {
|
||||||
ServerError::Db(_)
|
ServerError::Db(_)
|
||||||
| ServerError::Archive(_)
|
| ServerError::Archive(_)
|
||||||
| ServerError::Figment(_)
|
| ServerError::Figment(_)
|
||||||
|
| ServerError::Reqwest(_)
|
||||||
| ServerError::Unit => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
|
| ServerError::Unit => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -94,3 +97,9 @@ impl From<figment::Error> for ServerError {
|
||||||
ServerError::Figment(err)
|
ServerError::Figment(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<reqwest::Error> for ServerError {
|
||||||
|
fn from(err: reqwest::Error) -> Self {
|
||||||
|
ServerError::Reqwest(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -12,7 +12,7 @@ pub use error::{Result, ServerError};
|
||||||
use std::{io, path::PathBuf};
|
use std::{io, path::PathBuf};
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use sea_orm_migration::MigratorTrait;
|
use migration::MigratorTrait;
|
||||||
use tokio::runtime;
|
use tokio::runtime;
|
||||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result<Global> {
|
||||||
|
|
||||||
tracing::info!("Connecting to database");
|
tracing::info!("Connecting to database");
|
||||||
let db = rt.block_on(crate::db::connect(&config.db))?;
|
let db = rt.block_on(crate::db::connect(&config.db))?;
|
||||||
rt.block_on(crate::db::Migrator::up(&db, None))?;
|
rt.block_on(migration::Migrator::up(&db, None))?;
|
||||||
|
|
||||||
let repo = match &config.fs {
|
let repo = match &config.fs {
|
||||||
FsConfig::Local { data_dir } => {
|
FsConfig::Local { data_dir } => {
|
||||||
|
@ -60,7 +60,8 @@ fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result<Global> {
|
||||||
data_dir.join("repos"),
|
data_dir.join("repos"),
|
||||||
db.clone(),
|
db.clone(),
|
||||||
rt.clone(),
|
rt.clone(),
|
||||||
config.pkg_workers,
|
config.repo.sync_workers,
|
||||||
|
config.repo.async_workers,
|
||||||
)?
|
)?
|
||||||
//rt.block_on(crate::repo::RepoMgr::new(
|
//rt.block_on(crate::repo::RepoMgr::new(
|
||||||
// data_dir.join("repos"),
|
// data_dir.join("repos"),
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
use crate::{
|
||||||
|
db,
|
||||||
|
repo::{archive, package, AsyncCommand, SharedState},
|
||||||
|
};
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
io,
|
||||||
|
path::PathBuf,
|
||||||
|
sync::{atomic::Ordering, Arc},
|
||||||
|
};
|
||||||
|
|
||||||
|
use uuid::Uuid;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
pub struct AsyncActor {
|
||||||
|
state: Arc<SharedState>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncActor {
|
||||||
|
pub fn new(state: &Arc<SharedState>) -> Self {
|
||||||
|
Self {
|
||||||
|
state: Arc::clone(state)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
|
||||||
|
std::array::from_fn(|_| {
|
||||||
|
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
|
||||||
|
self.state.repos_dir.join(uuid.to_string())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(self) {
|
||||||
|
while let Some(msg) = {
|
||||||
|
let mut rx = self.state.async_queue.1.lock().await;
|
||||||
|
rx.recv().await
|
||||||
|
} {
|
||||||
|
match msg {
|
||||||
|
AsyncCommand::DownloadDbArchive(repo_id, url) => {
|
||||||
|
match self.download_file(url).await {
|
||||||
|
Ok(path) => todo!("schedule parse db archive"),
|
||||||
|
Err(err) => {
|
||||||
|
todo!("reschedule unless max retries reached")
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn download_file(&self, url: reqwest::Url) -> crate::Result<PathBuf> {
|
||||||
|
let [path] = self.random_file_paths();
|
||||||
|
|
||||||
|
let mut stream = self.state.client.get(url).send().await?.bytes_stream();
|
||||||
|
let mut f = tokio::fs::File::create(&path).await?;
|
||||||
|
|
||||||
|
while let Some(chunk) = stream.next().await.transpose()? {
|
||||||
|
f.write_all(&chunk).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(path)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,5 @@
|
||||||
|
mod sync;
|
||||||
|
mod r#async;
|
||||||
|
|
||||||
|
pub use sync::Actor;
|
||||||
|
pub use r#async::AsyncActor;
|
|
@ -1,5 +1,7 @@
|
||||||
use super::{archive, package, Command, SharedState};
|
use crate::{
|
||||||
use crate::db;
|
db,
|
||||||
|
repo::{archive, package, Command, SharedState},
|
||||||
|
};
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
|
@ -20,10 +22,10 @@ pub struct Actor {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Actor {
|
impl Actor {
|
||||||
pub fn new(rt: runtime::Handle, state: Arc<SharedState>) -> Self {
|
pub fn new(rt: runtime::Handle, state: &Arc<SharedState>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
rt,
|
rt,
|
||||||
state: Arc::clone(&state),
|
state: Arc::clone(state),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,7 +39,7 @@ impl Actor {
|
||||||
/// Run the main actor loop
|
/// Run the main actor loop
|
||||||
pub fn run(self) {
|
pub fn run(self) {
|
||||||
while let Some(msg) = {
|
while let Some(msg) = {
|
||||||
let mut rx = self.state.rx.lock().unwrap();
|
let mut rx = self.state.sync_queue.1.lock().unwrap();
|
||||||
rx.blocking_recv()
|
rx.blocking_recv()
|
||||||
} {
|
} {
|
||||||
match msg {
|
match msg {
|
||||||
|
@ -99,10 +101,10 @@ impl Actor {
|
||||||
|
|
||||||
if let Some(_guard) = repos.get(&repo).map(|n| n.1.lock()) {
|
if let Some(_guard) = repos.get(&repo).map(|n| n.1.lock()) {
|
||||||
let archs: Vec<String> = self.rt.block_on(
|
let archs: Vec<String> = self.rt.block_on(
|
||||||
db::Package::find()
|
entity::Package::find()
|
||||||
.filter(db::package::Column::RepoId.eq(repo))
|
.filter(entity::package::Column::RepoId.eq(repo))
|
||||||
.select_only()
|
.select_only()
|
||||||
.column(db::package::Column::Arch)
|
.column(entity::package::Column::Arch)
|
||||||
.distinct()
|
.distinct()
|
||||||
.into_tuple()
|
.into_tuple()
|
||||||
.all(&self.state.conn),
|
.all(&self.state.conn),
|
||||||
|
@ -171,12 +173,12 @@ impl Actor {
|
||||||
|
|
||||||
// Update the state for the newly committed packages
|
// Update the state for the newly committed packages
|
||||||
self.rt.block_on(
|
self.rt.block_on(
|
||||||
db::Package::update_many()
|
entity::Package::update_many()
|
||||||
.col_expr(
|
.col_expr(
|
||||||
db::package::Column::State,
|
entity::package::Column::State,
|
||||||
Expr::value(db::PackageState::Committed),
|
Expr::value(entity::PackageState::Committed),
|
||||||
)
|
)
|
||||||
.filter(db::package::Column::Id.is_in(committed_ids))
|
.filter(entity::package::Column::Id.is_in(committed_ids))
|
||||||
.exec(&self.state.conn),
|
.exec(&self.state.conn),
|
||||||
)?;
|
)?;
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use crate::db;
|
|
||||||
use std::{
|
use std::{
|
||||||
io::Write,
|
io::Write,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
|
@ -69,7 +68,7 @@ impl RepoArchivesWriter {
|
||||||
Ok(ar.append_path(&mut ar_entry, src_path)?)
|
Ok(ar.append_path(&mut ar_entry, src_path)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn append_pkg(&mut self, pkg: &db::package::Model) -> crate::Result<()> {
|
pub fn append_pkg(&mut self, pkg: &entity::package::Model) -> crate::Result<()> {
|
||||||
self.write_desc(&self.tmp_paths[0], pkg)?;
|
self.write_desc(&self.tmp_paths[0], pkg)?;
|
||||||
self.write_files(&self.tmp_paths[1], pkg)?;
|
self.write_files(&self.tmp_paths[1], pkg)?;
|
||||||
|
|
||||||
|
@ -85,7 +84,11 @@ impl RepoArchivesWriter {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generate a "files" archive entry for the package in the given path
|
/// Generate a "files" archive entry for the package in the given path
|
||||||
fn write_files(&self, path: impl AsRef<Path>, pkg: &db::package::Model) -> crate::Result<()> {
|
fn write_files(
|
||||||
|
&self,
|
||||||
|
path: impl AsRef<Path>,
|
||||||
|
pkg: &entity::package::Model,
|
||||||
|
) -> crate::Result<()> {
|
||||||
let mut f = std::io::BufWriter::new(std::fs::File::create(path)?);
|
let mut f = std::io::BufWriter::new(std::fs::File::create(path)?);
|
||||||
|
|
||||||
writeln!(f, "%FILES%")?;
|
writeln!(f, "%FILES%")?;
|
||||||
|
@ -93,7 +96,7 @@ impl RepoArchivesWriter {
|
||||||
let (tx, mut rx) = mpsc::channel(1);
|
let (tx, mut rx) = mpsc::channel(1);
|
||||||
|
|
||||||
let conn = self.conn.clone();
|
let conn = self.conn.clone();
|
||||||
let query = pkg.find_related(db::PackageFile);
|
let query = pkg.find_related(entity::prelude::PackageFile);
|
||||||
|
|
||||||
self.rt.spawn(async move {
|
self.rt.spawn(async move {
|
||||||
match query.stream(&conn).await {
|
match query.stream(&conn).await {
|
||||||
|
@ -121,7 +124,11 @@ impl RepoArchivesWriter {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_desc(&self, path: impl AsRef<Path>, pkg: &db::package::Model) -> crate::Result<()> {
|
fn write_desc(
|
||||||
|
&self,
|
||||||
|
path: impl AsRef<Path>,
|
||||||
|
pkg: &entity::package::Model,
|
||||||
|
) -> crate::Result<()> {
|
||||||
let mut f = std::io::BufWriter::new(std::fs::File::create(path)?);
|
let mut f = std::io::BufWriter::new(std::fs::File::create(path)?);
|
||||||
|
|
||||||
let filename = format!(
|
let filename = format!(
|
||||||
|
@ -147,9 +154,9 @@ impl RepoArchivesWriter {
|
||||||
}
|
}
|
||||||
|
|
||||||
let groups: Vec<String> = self.rt.block_on(
|
let groups: Vec<String> = self.rt.block_on(
|
||||||
pkg.find_related(db::PackageGroup)
|
pkg.find_related(entity::prelude::PackageGroup)
|
||||||
.select_only()
|
.select_only()
|
||||||
.column(db::package_group::Column::Name)
|
.column(entity::package_group::Column::Name)
|
||||||
.into_tuple()
|
.into_tuple()
|
||||||
.all(&self.conn),
|
.all(&self.conn),
|
||||||
)?;
|
)?;
|
||||||
|
@ -165,9 +172,9 @@ impl RepoArchivesWriter {
|
||||||
}
|
}
|
||||||
|
|
||||||
let licenses: Vec<String> = self.rt.block_on(
|
let licenses: Vec<String> = self.rt.block_on(
|
||||||
pkg.find_related(db::PackageLicense)
|
pkg.find_related(entity::prelude::PackageLicense)
|
||||||
.select_only()
|
.select_only()
|
||||||
.column(db::package_license::Column::Name)
|
.column(entity::package_license::Column::Name)
|
||||||
.into_tuple()
|
.into_tuple()
|
||||||
.all(&self.conn),
|
.all(&self.conn),
|
||||||
)?;
|
)?;
|
||||||
|
@ -186,21 +193,21 @@ impl RepoArchivesWriter {
|
||||||
}
|
}
|
||||||
|
|
||||||
let related = [
|
let related = [
|
||||||
("REPLACES", db::PackageRelatedEnum::Replaces),
|
("REPLACES", entity::PackageRelatedEnum::Replaces),
|
||||||
("CONFLICTS", db::PackageRelatedEnum::Conflicts),
|
("CONFLICTS", entity::PackageRelatedEnum::Conflicts),
|
||||||
("PROVIDES", db::PackageRelatedEnum::Provides),
|
("PROVIDES", entity::PackageRelatedEnum::Provides),
|
||||||
("DEPENDS", db::PackageRelatedEnum::Depend),
|
("DEPENDS", entity::PackageRelatedEnum::Depend),
|
||||||
("OPTDEPENDS", db::PackageRelatedEnum::Optdepend),
|
("OPTDEPENDS", entity::PackageRelatedEnum::Optdepend),
|
||||||
("MAKEDEPENDS", db::PackageRelatedEnum::Makedepend),
|
("MAKEDEPENDS", entity::PackageRelatedEnum::Makedepend),
|
||||||
("CHECKDEPENDS", db::PackageRelatedEnum::Checkdepend),
|
("CHECKDEPENDS", entity::PackageRelatedEnum::Checkdepend),
|
||||||
];
|
];
|
||||||
|
|
||||||
for (key, attr) in related.into_iter() {
|
for (key, attr) in related.into_iter() {
|
||||||
let items: Vec<String> = self.rt.block_on(
|
let items: Vec<String> = self.rt.block_on(
|
||||||
pkg.find_related(db::PackageRelated)
|
pkg.find_related(entity::prelude::PackageRelated)
|
||||||
.filter(db::package_related::Column::Type.eq(attr))
|
.filter(entity::package_related::Column::Type.eq(attr))
|
||||||
.select_only()
|
.select_only()
|
||||||
.column(db::package_related::Column::Name)
|
.column(entity::package_related::Column::Name)
|
||||||
.into_tuple()
|
.into_tuple()
|
||||||
.all(&self.conn),
|
.all(&self.conn),
|
||||||
)?;
|
)?;
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use super::{Command, SharedState};
|
use super::{Command, SharedState};
|
||||||
use crate::db;
|
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
|
@ -31,13 +30,24 @@ impl Handle {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
|
pub async fn register_repo(&self, repo_id: i32) -> crate::Result<()> {
|
||||||
let mut repos = self.state.repos.write().await;
|
let repo_dir = self.state.repos_dir.join(repo_id.to_string());
|
||||||
|
|
||||||
let distro_id: Option<i32> = db::Distro::find()
|
if !tokio::fs::try_exists(repo_dir).await? {
|
||||||
.filter(db::distro::Column::Name.eq(distro))
|
tokio::fs::create_dir(self.state.repos_dir.join(repo_id.to_string())).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut repos = self.state.repos.write().await;
|
||||||
|
repos.insert(repo_id, Default::default());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
|
||||||
|
let distro_id: Option<i32> = entity::Distro::find()
|
||||||
|
.filter(entity::distro::Column::Name.eq(distro))
|
||||||
.select_only()
|
.select_only()
|
||||||
.column(db::distro::Column::Id)
|
.column(entity::distro::Column::Id)
|
||||||
.into_tuple()
|
.into_tuple()
|
||||||
.one(&self.state.conn)
|
.one(&self.state.conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -45,7 +55,7 @@ impl Handle {
|
||||||
let distro_id = if let Some(id) = distro_id {
|
let distro_id = if let Some(id) = distro_id {
|
||||||
id
|
id
|
||||||
} else {
|
} else {
|
||||||
let new_distro = db::distro::ActiveModel {
|
let new_distro = entity::distro::ActiveModel {
|
||||||
id: NotSet,
|
id: NotSet,
|
||||||
name: Set(distro.to_string()),
|
name: Set(distro.to_string()),
|
||||||
description: NotSet,
|
description: NotSet,
|
||||||
|
@ -54,11 +64,11 @@ impl Handle {
|
||||||
new_distro.insert(&self.state.conn).await?.id
|
new_distro.insert(&self.state.conn).await?.id
|
||||||
};
|
};
|
||||||
|
|
||||||
let repo_id: Option<i32> = db::Repo::find()
|
let repo_id: Option<i32> = entity::Repo::find()
|
||||||
.filter(db::repo::Column::DistroId.eq(distro_id))
|
.filter(entity::repo::Column::DistroId.eq(distro_id))
|
||||||
.filter(db::repo::Column::Name.eq(repo))
|
.filter(entity::repo::Column::Name.eq(repo))
|
||||||
.select_only()
|
.select_only()
|
||||||
.column(db::repo::Column::Id)
|
.column(entity::repo::Column::Id)
|
||||||
.into_tuple()
|
.into_tuple()
|
||||||
.one(&self.state.conn)
|
.one(&self.state.conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -66,16 +76,16 @@ impl Handle {
|
||||||
let repo_id = if let Some(id) = repo_id {
|
let repo_id = if let Some(id) = repo_id {
|
||||||
id
|
id
|
||||||
} else {
|
} else {
|
||||||
let new_repo = db::repo::ActiveModel {
|
let new_repo = entity::repo::ActiveModel {
|
||||||
id: NotSet,
|
id: NotSet,
|
||||||
distro_id: Set(distro_id),
|
distro_id: Set(distro_id),
|
||||||
name: Set(repo.to_string()),
|
name: Set(repo.to_string()),
|
||||||
description: NotSet,
|
description: NotSet,
|
||||||
|
r#type: Set(entity::RepoType::Regular),
|
||||||
};
|
};
|
||||||
let id = new_repo.insert(&self.state.conn).await?.id;
|
let id = new_repo.insert(&self.state.conn).await?.id;
|
||||||
|
|
||||||
tokio::fs::create_dir(self.state.repos_dir.join(id.to_string())).await?;
|
self.register_repo(id).await?;
|
||||||
repos.insert(id, Default::default());
|
|
||||||
|
|
||||||
id
|
id
|
||||||
};
|
};
|
||||||
|
@ -84,12 +94,12 @@ impl Handle {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
|
pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
|
||||||
Ok(db::Repo::find()
|
Ok(entity::Repo::find()
|
||||||
.find_also_related(db::Distro)
|
.find_also_related(entity::Distro)
|
||||||
.filter(
|
.filter(
|
||||||
Condition::all()
|
Condition::all()
|
||||||
.add(db::repo::Column::Name.eq(repo))
|
.add(entity::repo::Column::Name.eq(repo))
|
||||||
.add(db::distro::Column::Name.eq(distro)),
|
.add(entity::distro::Column::Name.eq(distro)),
|
||||||
)
|
)
|
||||||
.one(&self.state.conn)
|
.one(&self.state.conn)
|
||||||
.await
|
.await
|
||||||
|
@ -98,7 +108,9 @@ impl Handle {
|
||||||
|
|
||||||
pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
|
pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
|
||||||
self.state.repos.write().await.remove(&repo);
|
self.state.repos.write().await.remove(&repo);
|
||||||
db::Repo::delete_by_id(repo).exec(&self.state.conn).await?;
|
entity::Repo::delete_by_id(repo)
|
||||||
|
.exec(&self.state.conn)
|
||||||
|
.await?;
|
||||||
let _ = tokio::fs::remove_dir_all(self.state.repos_dir.join(repo.to_string())).await;
|
let _ = tokio::fs::remove_dir_all(self.state.repos_dir.join(repo.to_string())).await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -108,15 +120,15 @@ impl Handle {
|
||||||
/// packages with the given architecture as "pending deletion", before performing a manual sync
|
/// packages with the given architecture as "pending deletion", before performing a manual sync
|
||||||
/// & removal of stale packages.
|
/// & removal of stale packages.
|
||||||
pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
|
pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
|
||||||
db::Package::update_many()
|
entity::Package::update_many()
|
||||||
.col_expr(
|
.col_expr(
|
||||||
db::package::Column::State,
|
entity::package::Column::State,
|
||||||
Expr::value(db::PackageState::PendingDeletion),
|
Expr::value(entity::PackageState::PendingDeletion),
|
||||||
)
|
)
|
||||||
.filter(
|
.filter(
|
||||||
Condition::all()
|
Condition::all()
|
||||||
.add(db::package::Column::RepoId.eq(repo))
|
.add(entity::package::Column::RepoId.eq(repo))
|
||||||
.add(db::package::Column::Arch.eq(arch)),
|
.add(entity::package::Column::Arch.eq(arch)),
|
||||||
)
|
)
|
||||||
.exec(&self.state.conn)
|
.exec(&self.state.conn)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -128,17 +140,25 @@ impl Handle {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
|
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
|
||||||
self.state.tx.send(Command::ParsePkg(repo, path)).unwrap();
|
self.state
|
||||||
|
.sync_queue
|
||||||
|
.0
|
||||||
|
.send(Command::ParsePkg(repo, path))
|
||||||
|
.unwrap();
|
||||||
self.state.repos.read().await.get(&repo).inspect(|n| {
|
self.state.repos.read().await.get(&repo).inspect(|n| {
|
||||||
n.0.fetch_add(1, Ordering::SeqCst);
|
n.0.fetch_add(1, Ordering::SeqCst);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn queue_sync(&self, repo: i32) {
|
async fn queue_sync(&self, repo: i32) {
|
||||||
self.state.tx.send(Command::SyncRepo(repo)).unwrap();
|
self.state
|
||||||
|
.sync_queue
|
||||||
|
.0
|
||||||
|
.send(Command::SyncRepo(repo))
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn queue_clean(&self) {
|
async fn queue_clean(&self) {
|
||||||
self.state.tx.send(Command::Clean).unwrap();
|
self.state.sync_queue.0.send(Command::Clean).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
mod parser;
|
||||||
|
|
||||||
|
pub use parser::{DbArchiveEntry, DbArchiveParser};
|
|
@ -0,0 +1,75 @@
|
||||||
|
use std::{
|
||||||
|
io::{self, BufRead},
|
||||||
|
path::Path,
|
||||||
|
};
|
||||||
|
|
||||||
|
use libarchive::{
|
||||||
|
read::{Archive, Builder, Entries, FileReader, ReadEntry},
|
||||||
|
Entry,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct DbArchiveParser<'a, T: 'a + Archive> {
|
||||||
|
entries: Entries<'a, T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DbArchiveEntry {
|
||||||
|
name: String,
|
||||||
|
version: String,
|
||||||
|
filename: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T: Archive> DbArchiveParser<'a, T> {
|
||||||
|
pub fn new(ar: &'a mut T) -> Self {
|
||||||
|
Self {
|
||||||
|
entries: Entries::new(ar),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse a given entry. If the entry's not a regular file, the function returns None.
|
||||||
|
fn parse_entry(entry: ReadEntry<'a, T>) -> io::Result<DbArchiveEntry> {
|
||||||
|
let reader = io::BufReader::new(entry);
|
||||||
|
let mut lines = reader.lines();
|
||||||
|
|
||||||
|
let mut name: Option<String> = None;
|
||||||
|
let mut version: Option<String> = None;
|
||||||
|
let mut filename: Option<String> = None;
|
||||||
|
|
||||||
|
while let Some(line) = lines.next().transpose()? {
|
||||||
|
match line.as_str() {
|
||||||
|
"%NAME%" => name = lines.next().transpose()?,
|
||||||
|
"%VERSION%" => version = lines.next().transpose()?,
|
||||||
|
"%FILENAME%" => filename = lines.next().transpose()?,
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if name.is_some() && version.is_some() && filename.is_some() {
|
||||||
|
Ok(DbArchiveEntry {
|
||||||
|
name: name.unwrap(),
|
||||||
|
version: version.unwrap(),
|
||||||
|
filename: filename.unwrap(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
Err(io::Error::other("Missing fields in entry file"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T: Archive> Iterator for DbArchiveParser<'a, T> {
|
||||||
|
type Item = io::Result<DbArchiveEntry>;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
while let Some(entry) = self.entries.next() {
|
||||||
|
match entry {
|
||||||
|
Ok(entry) => {
|
||||||
|
if entry.filetype() == libarchive::FileType::RegularFile {
|
||||||
|
return Some(Self::parse_entry(entry));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => return Some(Err(err.into())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,13 +1,12 @@
|
||||||
mod actor;
|
mod actor;
|
||||||
mod archive;
|
mod archive;
|
||||||
mod handle;
|
mod handle;
|
||||||
|
mod mirror;
|
||||||
pub mod package;
|
pub mod package;
|
||||||
|
|
||||||
pub use actor::Actor;
|
pub use actor::{Actor, AsyncActor};
|
||||||
pub use handle::Handle;
|
pub use handle::Handle;
|
||||||
|
|
||||||
use crate::db;
|
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
|
@ -29,30 +28,36 @@ pub enum Command {
|
||||||
Clean,
|
Clean,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum AsyncCommand {
|
||||||
|
DownloadDbArchive(i32, reqwest::Url)
|
||||||
|
}
|
||||||
|
|
||||||
type RepoState = (AtomicU32, Arc<Mutex<()>>);
|
type RepoState = (AtomicU32, Arc<Mutex<()>>);
|
||||||
|
|
||||||
pub struct SharedState {
|
pub struct SharedState {
|
||||||
pub repos_dir: PathBuf,
|
pub repos_dir: PathBuf,
|
||||||
pub conn: DbConn,
|
pub conn: DbConn,
|
||||||
pub rx: Mutex<UnboundedReceiver<Command>>,
|
pub sync_queue: (UnboundedSender<Command>, Mutex<UnboundedReceiver<Command>>),
|
||||||
pub tx: UnboundedSender<Command>,
|
pub async_queue: (
|
||||||
|
UnboundedSender<AsyncCommand>,
|
||||||
|
tokio::sync::Mutex<UnboundedReceiver<AsyncCommand>>,
|
||||||
|
),
|
||||||
pub repos: RwLock<HashMap<i32, RepoState>>,
|
pub repos: RwLock<HashMap<i32, RepoState>>,
|
||||||
|
pub client: reqwest::Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SharedState {
|
impl SharedState {
|
||||||
pub fn new(
|
pub fn new(repos_dir: impl AsRef<Path>, conn: DbConn) -> Self {
|
||||||
repos_dir: impl AsRef<Path>,
|
|
||||||
conn: DbConn,
|
|
||||||
repos: HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>,
|
|
||||||
) -> Self {
|
|
||||||
let (tx, rx) = unbounded_channel();
|
let (tx, rx) = unbounded_channel();
|
||||||
|
let (async_tx, async_rx) = unbounded_channel();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
repos_dir: repos_dir.as_ref().to_path_buf(),
|
repos_dir: repos_dir.as_ref().to_path_buf(),
|
||||||
conn,
|
conn,
|
||||||
rx: Mutex::new(rx),
|
sync_queue: (tx, Mutex::new(rx)),
|
||||||
tx,
|
async_queue: (async_tx, tokio::sync::Mutex::new(async_rx)),
|
||||||
repos: RwLock::new(repos),
|
repos: RwLock::new(Default::default()),
|
||||||
|
client: reqwest::Client::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -61,30 +66,38 @@ pub fn start(
|
||||||
repos_dir: impl AsRef<Path>,
|
repos_dir: impl AsRef<Path>,
|
||||||
conn: DbConn,
|
conn: DbConn,
|
||||||
rt: runtime::Handle,
|
rt: runtime::Handle,
|
||||||
actors: u32,
|
sync_actors: u32,
|
||||||
|
async_actors: u32,
|
||||||
) -> crate::Result<Handle> {
|
) -> crate::Result<Handle> {
|
||||||
std::fs::create_dir_all(repos_dir.as_ref())?;
|
std::fs::create_dir_all(repos_dir.as_ref())?;
|
||||||
|
|
||||||
let mut repos = HashMap::new();
|
|
||||||
let repo_ids: Vec<i32> = rt.block_on(
|
let repo_ids: Vec<i32> = rt.block_on(
|
||||||
db::Repo::find()
|
entity::prelude::Repo::find()
|
||||||
.select_only()
|
.select_only()
|
||||||
.column(db::repo::Column::Id)
|
.column(entity::repo::Column::Id)
|
||||||
.into_tuple()
|
.into_tuple()
|
||||||
.all(&conn),
|
.all(&conn),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
for id in repo_ids {
|
let state = Arc::new(SharedState::new(repos_dir, conn));
|
||||||
repos.insert(id, Default::default());
|
|
||||||
}
|
|
||||||
|
|
||||||
let state = Arc::new(SharedState::new(repos_dir, conn, repos));
|
for _ in 0..sync_actors {
|
||||||
|
let actor = Actor::new(rt.clone(), &state);
|
||||||
for _ in 0..actors {
|
|
||||||
let actor = Actor::new(rt.clone(), Arc::clone(&state));
|
|
||||||
|
|
||||||
std::thread::spawn(|| actor.run());
|
std::thread::spawn(|| actor.run());
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Handle::new(&state))
|
for _ in 0..async_actors {
|
||||||
|
let actor = AsyncActor::new(&state);
|
||||||
|
|
||||||
|
rt.spawn(actor.run());
|
||||||
|
}
|
||||||
|
|
||||||
|
let handle = Handle::new(&state);
|
||||||
|
|
||||||
|
for id in repo_ids {
|
||||||
|
rt.block_on(handle.register_repo(id))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(handle)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
use crate::db::entities::package;
|
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
fmt, fs,
|
fmt, fs,
|
||||||
io::{self, BufRead, BufReader, Read},
|
io::{self, BufRead, BufReader, Read},
|
||||||
|
@ -11,7 +9,6 @@ use libarchive::{
|
||||||
read::{Archive, Builder},
|
read::{Archive, Builder},
|
||||||
Entry, ReadFilter,
|
Entry, ReadFilter,
|
||||||
};
|
};
|
||||||
use sea_orm::ActiveValue::Set;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Package {
|
pub struct Package {
|
||||||
|
@ -194,26 +191,3 @@ impl Package {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Package> for package::ActiveModel {
|
|
||||||
fn from(pkg: Package) -> Self {
|
|
||||||
let info = pkg.info;
|
|
||||||
|
|
||||||
package::ActiveModel {
|
|
||||||
base: Set(info.base),
|
|
||||||
name: Set(info.name),
|
|
||||||
version: Set(info.version),
|
|
||||||
arch: Set(info.arch),
|
|
||||||
size: Set(info.size),
|
|
||||||
c_size: Set(info.csize),
|
|
||||||
description: Set(info.description),
|
|
||||||
url: Set(info.url),
|
|
||||||
build_date: Set(info.build_date),
|
|
||||||
packager: Set(info.packager),
|
|
||||||
pgp_sig: Set(info.pgpsig),
|
|
||||||
pgp_sig_size: Set(info.pgpsigsize),
|
|
||||||
sha256_sum: Set(info.sha256sum),
|
|
||||||
..Default::default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -5,13 +5,14 @@ use pagination::PaginatedResponse;
|
||||||
|
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::{Path, Query, State},
|
extract::{Path, Query, State},
|
||||||
|
http::StatusCode,
|
||||||
routing::get,
|
routing::get,
|
||||||
Json, Router,
|
Json, Router,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn router() -> Router<crate::Global> {
|
pub fn router() -> Router<crate::Global> {
|
||||||
Router::new()
|
Router::new()
|
||||||
.route("/repos", get(get_repos))
|
.route("/repos", get(get_repos).post(post_repo))
|
||||||
.route("/repos/:id", get(get_single_repo))
|
.route("/repos/:id", get(get_single_repo))
|
||||||
.route("/packages", get(get_packages))
|
.route("/packages", get(get_packages))
|
||||||
.route("/packages/:id", get(get_single_package))
|
.route("/packages/:id", get(get_single_package))
|
||||||
|
@ -21,7 +22,7 @@ async fn get_repos(
|
||||||
State(global): State<crate::Global>,
|
State(global): State<crate::Global>,
|
||||||
Query(pagination): Query<pagination::Query>,
|
Query(pagination): Query<pagination::Query>,
|
||||||
Query(filter): Query<db::query::repo::Filter>,
|
Query(filter): Query<db::query::repo::Filter>,
|
||||||
) -> crate::Result<Json<PaginatedResponse<db::repo::Model>>> {
|
) -> crate::Result<Json<PaginatedResponse<entity::repo::Model>>> {
|
||||||
let items =
|
let items =
|
||||||
db::query::repo::page(&global.db, pagination.per_page, pagination.page - 1, filter).await?;
|
db::query::repo::page(&global.db, pagination.per_page, pagination.page - 1, filter).await?;
|
||||||
|
|
||||||
|
@ -31,7 +32,7 @@ async fn get_repos(
|
||||||
async fn get_single_repo(
|
async fn get_single_repo(
|
||||||
State(global): State<crate::Global>,
|
State(global): State<crate::Global>,
|
||||||
Path(id): Path<i32>,
|
Path(id): Path<i32>,
|
||||||
) -> crate::Result<Json<db::repo::Model>> {
|
) -> crate::Result<Json<entity::repo::Model>> {
|
||||||
let repo = db::query::repo::by_id(&global.db, id)
|
let repo = db::query::repo::by_id(&global.db, id)
|
||||||
.await?
|
.await?
|
||||||
.ok_or(axum::http::StatusCode::NOT_FOUND)?;
|
.ok_or(axum::http::StatusCode::NOT_FOUND)?;
|
||||||
|
@ -39,11 +40,21 @@ async fn get_single_repo(
|
||||||
Ok(Json(repo))
|
Ok(Json(repo))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn post_repo(
|
||||||
|
State(global): State<crate::Global>,
|
||||||
|
Json(repo): Json<crate::db::NewRepo>,
|
||||||
|
) -> crate::Result<(StatusCode, Json<entity::repo::Model>)> {
|
||||||
|
let model = repo.insert(&global.db).await?;
|
||||||
|
global.repo.register_repo(model.id).await?;
|
||||||
|
|
||||||
|
Ok((StatusCode::CREATED, Json(model)))
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_packages(
|
async fn get_packages(
|
||||||
State(global): State<crate::Global>,
|
State(global): State<crate::Global>,
|
||||||
Query(pagination): Query<pagination::Query>,
|
Query(pagination): Query<pagination::Query>,
|
||||||
Query(filter): Query<db::query::package::Filter>,
|
Query(filter): Query<db::query::package::Filter>,
|
||||||
) -> crate::Result<Json<PaginatedResponse<db::package::Model>>> {
|
) -> crate::Result<Json<PaginatedResponse<entity::package::Model>>> {
|
||||||
let items =
|
let items =
|
||||||
db::query::package::page(&global.db, pagination.per_page, pagination.page - 1, filter)
|
db::query::package::page(&global.db, pagination.per_page, pagination.page - 1, filter)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
Loading…
Reference in New Issue