Compare commits

..

No commits in common. "dev" and "repo-db" have entirely different histories.
dev ... repo-db

57 changed files with 2301 additions and 3070 deletions

View File

@ -2,5 +2,3 @@ target/
.git/
server/data/
server/test.db/
Dockerfile
docker-compose.yml

View File

@ -1,39 +0,0 @@
matrix:
PLATFORM:
- 'linux/amd64'
platform: ${PLATFORM}
when:
branch: [main, dev]
event: [push, tag]
steps:
build:
image: 'git.rustybever.be/chewing_bever/rieter-builder:1.79-alpine3.19'
commands:
- cargo build --verbose --release
- '[ "$(readelf -d target/release/rieterd | grep NEEDED | wc -l)" = 0 ]'
publish-dev:
image: 'git.rustybever.be/chewing_bever/rieter-builder:1.79-alpine3.19'
commands:
- apk add --no-cache minio-client
- mcli alias set rb 'https://s3.rustybever.be' "$MINIO_ACCESS_KEY" "$MINIO_SECRET_KEY"
- mcli cp target/release/rieterd "rb/rieter/commits/$CI_COMMIT_SHA/rieterd-$(echo '${PLATFORM}' | sed 's:/:-:g')"
secrets:
- minio_access_key
- minio_secret_key
publish-rel:
image: 'curlimages/curl'
commands:
- >
curl -s --fail
--user "Chewing_Bever:$GITEA_PASSWORD"
--upload-file target/release/rieterd
https://git.rustybever.be/api/packages/Chewing_Bever/generic/rieter/"${CI_COMMIT_TAG}"/rieterd-"$(echo '${PLATFORM}' | sed 's:/:-:g')"
secrets:
- gitea_password
when:
event: tag

View File

@ -1,21 +1,13 @@
platform: 'linux/amd64'
branches:
exclude: [main]
when:
branch:
exclude: [dev, main]
event: push
steps:
pipeline:
build:
image: 'git.rustybever.be/chewing_bever/rieter-builder:1.79-alpine3.19'
image: 'rust:1.70-alpine3.18'
commands:
- apk add --no-cache build-base libarchive libarchive-dev
- cargo build --verbose
# Binaries, even debug ones, should be statically compiled
- '[ "$(readelf -d target/debug/rieterd | grep NEEDED | wc -l)" = 0 ]'
when:
event: [push]
# Clippy also performs a full build, so putting it here saves the CI a
# lot of work
clippy:
image: 'git.rustybever.be/chewing_bever/rieter-builder:1.79-alpine3.19'
commands:
- cargo clippy -- --no-deps -Dwarnings

View File

@ -0,0 +1,13 @@
platform: 'linux/amd64'
branches:
exclude: [main]
pipeline:
clippy:
image: 'rust:1.70-alpine3.18'
commands:
- rustup component add clippy
- cargo clippy -- --no-deps -Dwarnings
when:
event: [push]

View File

@ -1,43 +0,0 @@
platform: 'linux/amd64'
when:
branch: [main, dev]
event: [push, tag]
depends_on:
- build-rel
steps:
dev:
image: 'woodpeckerci/plugin-docker-buildx'
secrets:
- 'docker_username'
- 'docker_password'
settings:
registry: 'git.rustybever.be'
repo: 'git.rustybever.be/chewing_bever/rieter'
tags:
- 'dev'
platforms: [ 'linux/amd64' ]
build_args_from_env:
- 'CI_COMMIT_SHA'
mtu: 1300
when:
branch: dev
event: push
release:
image: 'woodpeckerci/plugin-docker-buildx'
secrets:
- 'docker_username'
- 'docker_password'
settings:
registry: 'git.rustybever.be'
repo: 'git.rustybever.be/chewing_bever/rieter'
auto_tag: true
platforms: [ 'linux/amd64' ]
build_args_from_env:
- 'CI_COMMIT_SHA'
mtu: 1300
when:
event: tag

View File

@ -1,12 +1,13 @@
platform: 'linux/amd64'
when:
branch:
branches:
exclude: [main]
event: push
steps:
pipeline:
lint:
image: 'git.rustybever.be/chewing_bever/rieter-builder:1.79-alpine3.19'
image: 'rust:1.70-alpine3.18'
commands:
- rustup component add rustfmt
- cargo fmt -- --check
when:
event: [push]

View File

@ -7,15 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased](https://git.rustybever.be/Chewing_Bever/rieter/src/branch/dev)
## [0.1.0](https://git.rustybever.be/Chewing_Bever/rieter/src/tag/0.1.0)
### Added
* Server
* Functional repository server
* Supports any number of repositories, grouped into distros, each
supporting any number of architectures
* Repository & package information available using JSON REST API
* Queueing system with configurable number of workers for resilient
concurrency
* TOML configuration file
* SQLite & Postgres support
* Serve packages from any number of repositories & architectures
* Publish packages to and delete packages from repositories using HTTP
requests
* Packages of architecture "any" are part of every architecture's
database
* Bearer authentication for private routes

1492
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,16 +1,31 @@
FROM git.rustybever.be/chewing_bever/rieter-builder:1.79-alpine3.19 AS builder
FROM rust:1.70-alpine3.18 AS builder
ARG TARGETPLATFORM
ARG CI_COMMIT_SHA
ARG DI_VER=1.2.5
WORKDIR /app
# RUN apk add --no-cache \
# build-base \
# curl \
# make \
# unzip \
# pkgconf \
# openssl openssl-libs-static openssl-dev \
# libarchive-static libarchive-dev \
# zlib-static zlib-dev \
# bzip2-static bzip2-dev \
# xz-static xz-dev \
# expat-static expat-dev \
# zstd-static zstd-dev \
# lz4-static lz4-dev \
# acl-static acl-dev
RUN apk add --no-cache \
build-base \
curl \
make \
unzip \
pkgconf
pkgconf \
libarchive libarchive-dev
# Build dumb-init
RUN curl -Lo - "https://github.com/Yelp/dumb-init/archive/refs/tags/v${DI_VER}.tar.gz" | tar -xzf - && \
@ -21,19 +36,29 @@ RUN curl -Lo - "https://github.com/Yelp/dumb-init/archive/refs/tags/v${DI_VER}.t
COPY . .
RUN curl \
--fail \
-o rieterd \
"https://s3.rustybever.be/rieter/commits/${CI_COMMIT_SHA}/rieterd-$(echo "${TARGETPLATFORM}" | sed 's:/:-:g')" && \
chmod +x rieterd
# ENV LIBARCHIVE_STATIC=1 \
# LIBARCHIVE_LIB_DIR=/usr/lib \
# LIBARCHIVE_INCLUDE_DIR=/usr/include \
# LIBARCHIVE_LDFLAGS='-lssl -lcrypto -L/lib -lz -lbz2 -llzma -lexpat -lzstd -llz4'
# LIBARCHIVE_LDFLAGS='-L/usr/lib -lz -lbz2 -llzma -lexpat -lzstd -llz4 -lsqlite3'
RUN cargo build --release && \
du -h target/release/rieterd && \
readelf -d target/release/rieterd && \
chmod +x target/release/rieterd
# [ "$(readelf -d target/debug/rieterd | grep NEEDED | wc -l)" = 0 ] && \
# chmod +x target/debug/rieterd
FROM alpine:3.19
FROM alpine:3.18
WORKDIR /app
RUN apk add --no-cache \
libarchive
COPY --from=builder /app/dumb-init /bin/dumb-init
COPY --from=builder /app/rieterd /bin/rieterd
WORKDIR /data
COPY --from=builder /app/target/debug/rieterd /bin/rieterd
ENTRYPOINT ["/bin/dumb-init", "--"]
CMD ["/bin/rieterd"]

View File

@ -1,38 +1,2 @@
# Rieter
# rieter
Rieter is both a Pacman repository server, as well as a build system for Pacman
packages.
## Goals
### Repository server
My first goal for this project is to create a convenient all-round repository
server implementation that could be used for everything from self-hosting a
local repository to managing an entire distribution's package repository. It
should be easy to deploy, lightweight, and work with any distribution. It
should support any number of repositories and packages, and work with any
package architecture.
The repositories can be populated by manually uploading packages to the server
(e.g. from a CI build), or by mirroring already existing repositories. The
mirroring feature in particular makes it trivial to set up a new mirror for a
distribution, as the server would take care of keeping the mirror up-to-date.
Another usecase for this would be creating a local mirror of your
distribution's repositories, which can greatly reduce your update times
depending on your internet connection.
Most users however don't need a full copy of a distro's package repository, so
Rieter also provides a "smart mirror" mode. In this mode, a Rieter instance
only syncs packages that have been requested before, e.g. from a previous
system update. This way, your updates will still be a lot faster as the
required packages are cached, but packages you don't use don't get stored,
saving you a lot of storage space.
### Build system
The second goal is to create an easy-to-use build system for Pacman packages.
This could for example be used to automatically build AUR packages and publish
them to one of your repositories. This can greatly reduce update times, as you
no longer need to build AUR packages locally, as this automatically happens "in
the cloud".

View File

@ -1,20 +0,0 @@
# Command to build and push builder image (change tags as necessary):
# docker buildx build -f build.Dockerfile -t git.rustybever.be/chewing_bever/rieter-builder:1.79-alpine3.19 --platform linux/amd64,linux/arm64 --push .
FROM rust:1.79-alpine3.19
# Dependencies required to statically compile libarchive and libsqlite3
RUN apk add --no-cache \
build-base \
libarchive-static libarchive-dev \
zlib-static \
openssl-libs-static \
bzip2-static \
xz-static \
expat-static \
zstd-static \
lz4-static \
acl-static && \
rustup component add clippy rustfmt
# Tell the libarchive3-sys package to statically link libarchive
ENV LIBARCHIVE_STATIC=1

View File

@ -1,17 +0,0 @@
version: '3.4'
services:
app:
image: 'rieterd:latest'
ports:
- '8000:8000'
environment:
- 'RIETER_API_KEY=test'
- 'RIETER_DATABASE_URL=postgres://rieter:rieter@db:5432/rieter'
db:
image: 'postgres:15.3-alpine'
environment:
- 'POSTGRES_DB=rieter'
- 'POSTGRES_USER=rieter'
- 'POSTGRES_PASSWORD=rieter'

View File

@ -64,11 +64,11 @@ impl ReadFilter {
pub fn extension(&self) -> Option<&str> {
match self {
ReadFilter::None => Some(""),
ReadFilter::Gzip => Some("gz"),
ReadFilter::Bzip2 => Some("bz2"),
ReadFilter::Lzma => Some("lzma"),
ReadFilter::Xz => Some("xz"),
ReadFilter::Zstd => Some("zst"),
ReadFilter::Gzip => Some(".gz"),
ReadFilter::Bzip2 => Some(".bz2"),
ReadFilter::Lzma => Some(".lzma"),
ReadFilter::Xz => Some(".xz"),
ReadFilter::Zstd => Some(".zst"),
_ => None,
}
}
@ -386,7 +386,6 @@ pub enum ExtractOption {
ClearNoChangeFFlags,
}
#[derive(Default)]
pub struct ExtractOptions {
pub flags: i32,
}
@ -421,3 +420,9 @@ impl ExtractOptions {
self
}
}
impl Default for ExtractOptions {
fn default() -> ExtractOptions {
ExtractOptions { flags: 0 }
}
}

View File

@ -78,7 +78,7 @@ impl Builder {
ffi::archive_read_support_filter_program_signature(
self.handle_mut(),
c_prog.as_ptr(),
mem::transmute::<std::option::Option<extern "C" fn()>, *const std::ffi::c_void>(cb),
mem::transmute(cb),
size,
)
}

View File

@ -11,8 +11,6 @@ pub struct Builder {
consumed: bool,
}
unsafe impl Send for Builder {}
impl Builder {
pub fn new() -> Self {
Builder::default()

View File

@ -2,12 +2,10 @@ use super::WriteEntry;
use crate::error::ArchiveError;
use crate::Entry;
use crate::Handle;
use core::ffi::c_void;
use libarchive3_sys::ffi;
use std::fs;
use std::io;
use std::io::Read;
use std::io::Write;
use std::path::Path;
pub struct FileWriter {
@ -15,8 +13,6 @@ pub struct FileWriter {
closed: bool,
}
unsafe impl Send for FileWriter {}
impl Handle for FileWriter {
unsafe fn handle(&self) -> *const ffi::Struct_archive {
self.handle as *const _
@ -35,22 +31,11 @@ impl FileWriter {
}
}
/// Append the given entry to the archive. After successfully calling this function, writing to
/// the archive now writes to this entry.
pub fn append_entry(&mut self, entry: &mut WriteEntry) -> crate::Result<()> {
unsafe {
match ffi::archive_write_header(self.handle_mut(), entry.entry_mut()) {
ffi::ARCHIVE_OK => Ok(()),
_ => Err(ArchiveError::from(self as &dyn Handle)),
}
}
}
pub fn append_data<R: Read>(&mut self, entry: &mut WriteEntry, r: &mut R) -> crate::Result<()> {
unsafe {
match ffi::archive_write_header(self.handle_mut(), entry.entry_mut()) {
ffi::ARCHIVE_OK => (),
_ => return Err(ArchiveError::from(self as &dyn Handle)),
_ => return Err(ArchiveError::from(self as &dyn Handle).into()),
}
}
@ -74,7 +59,7 @@ impl FileWriter {
// Negative values signal errors
if res < 0 {
return Err(ArchiveError::from(self as &dyn Handle));
return Err(ArchiveError::from(self as &dyn Handle).into());
}
written += usize::try_from(res).unwrap();
@ -122,23 +107,3 @@ impl Drop for FileWriter {
}
}
}
impl Write for FileWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let res = unsafe {
ffi::archive_write_data(self.handle_mut(), buf.as_ptr() as *const c_void, buf.len())
} as isize;
if res < 0 {
Err(ArchiveError::from(self as &dyn Handle).into())
} else {
// Unwrap is safe as we check if the value is negative in the if statement
Ok(res.try_into().unwrap())
}
}
fn flush(&mut self) -> io::Result<()> {
// Libarchive doesn't seem to provide a flush mechanism
Ok(())
}
}

View File

@ -3,15 +3,12 @@ mod file;
use crate::Entry;
pub use builder::Builder;
pub use file::FileWriter;
use libarchive3_sys::ffi;
pub struct WriteEntry {
entry: *mut ffi::Struct_archive_entry,
}
unsafe impl Send for WriteEntry {}
impl WriteEntry {
pub fn new() -> Self {
let entry = unsafe { ffi::archive_entry_new() };
@ -30,12 +27,6 @@ impl Entry for WriteEntry {
}
}
impl Default for WriteEntry {
fn default() -> Self {
Self::new()
}
}
impl Drop for WriteEntry {
fn drop(&mut self) {
unsafe { ffi::archive_entry_free(self.entry_mut()) }

View File

@ -4,7 +4,3 @@
DYLD_LIBRARY_PATH=/Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/lib
xcode-select --install
# 64-bit timestamps
`time_t` has been replaced with `i64` as Musl no longer supports 32-bit `time_t` values.

View File

@ -1,6 +1,35 @@
extern crate pkg_config;
use std::env;
fn main() {
pkg_config::Config::new()
.atleast_version("3")
.probe("libarchive")
.unwrap();
let lib_dir = env::var("LIBARCHIVE_LIB_DIR").ok();
let include_dir = env::var("LIBARCHIVE_INCLUDE_DIR").ok();
if lib_dir.is_some() && include_dir.is_some() {
println!("cargo:rustc-flags=-L native={}", lib_dir.unwrap());
println!("cargo:include={}", include_dir.unwrap());
let mode = match env::var_os("LIBARCHIVE_STATIC") {
Some(_) => "static",
None => "dylib",
};
println!("cargo:rustc-flags=-l {0}=archive", mode);
if mode == "static" {
if let Ok(ldflags) = env::var("LIBARCHIVE_LDFLAGS") {
for token in ldflags.split_whitespace() {
if token.starts_with("-L") {
println!("cargo:rustc-flags=-L native={}", token.replace("-L", ""));
} else if token.starts_with("-l") {
println!("cargo:rustc-flags=-l static={}", token.replace("-l", ""));
}
}
}
}
} else {
match pkg_config::find_library("libarchive") {
Ok(_) => (),
Err(msg) => panic!("Unable to locate libarchive, err={:?}", msg),
}
}
}

View File

@ -294,10 +294,14 @@ extern "C" {
) -> c_int;
pub fn archive_read_extract_set_progress_callback(
arg1: *mut Struct_archive,
_progress_func: ::std::option::Option<unsafe extern "C" fn(arg1: *mut c_void)>,
_progress_func: ::std::option::Option<unsafe extern "C" fn(arg1: *mut c_void) -> ()>,
_user_data: *mut c_void,
);
pub fn archive_read_extract_set_skip_file(arg1: *mut Struct_archive, arg2: i64, arg3: i64);
) -> ();
pub fn archive_read_extract_set_skip_file(
arg1: *mut Struct_archive,
arg2: i64,
arg3: i64,
) -> ();
pub fn archive_read_close(arg1: *mut Struct_archive) -> c_int;
pub fn archive_read_free(arg1: *mut Struct_archive) -> c_int;
pub fn archive_read_finish(arg1: *mut Struct_archive) -> c_int;
@ -439,7 +443,7 @@ extern "C" {
arg3: ::std::option::Option<
unsafe extern "C" fn(arg1: *mut c_void, arg2: *const c_char, arg3: i64) -> i64,
>,
arg4: ::std::option::Option<unsafe extern "C" fn(arg1: *mut c_void)>,
arg4: ::std::option::Option<unsafe extern "C" fn(arg1: *mut c_void) -> ()>,
) -> c_int;
pub fn archive_write_disk_set_user_lookup(
arg1: *mut Struct_archive,
@ -447,7 +451,7 @@ extern "C" {
arg3: ::std::option::Option<
unsafe extern "C" fn(arg1: *mut c_void, arg2: *const c_char, arg3: i64) -> i64,
>,
arg4: ::std::option::Option<unsafe extern "C" fn(arg1: *mut c_void)>,
arg4: ::std::option::Option<unsafe extern "C" fn(arg1: *mut c_void) -> ()>,
) -> c_int;
pub fn archive_write_disk_gid(arg1: *mut Struct_archive, arg2: *const c_char, arg3: i64)
-> i64;
@ -471,7 +475,7 @@ extern "C" {
arg3: ::std::option::Option<
unsafe extern "C" fn(arg1: *mut c_void, arg2: i64) -> *const c_char,
>,
arg4: ::std::option::Option<unsafe extern "C" fn(arg1: *mut c_void)>,
arg4: ::std::option::Option<unsafe extern "C" fn(arg1: *mut c_void) -> ()>,
) -> c_int;
pub fn archive_read_disk_set_uname_lookup(
arg1: *mut Struct_archive,
@ -479,7 +483,7 @@ extern "C" {
arg3: ::std::option::Option<
unsafe extern "C" fn(arg1: *mut c_void, arg2: i64) -> *const c_char,
>,
arg4: ::std::option::Option<unsafe extern "C" fn(arg1: *mut c_void)>,
arg4: ::std::option::Option<unsafe extern "C" fn(arg1: *mut c_void) -> ()>,
) -> c_int;
pub fn archive_read_disk_open(arg1: *mut Struct_archive, arg2: *const c_char) -> c_int;
pub fn archive_read_disk_open_w(arg1: *mut Struct_archive, arg2: *const wchar_t) -> c_int;
@ -498,7 +502,7 @@ extern "C" {
arg1: *mut Struct_archive,
arg2: *mut c_void,
arg3: *mut Struct_archive_entry,
),
) -> (),
>,
_client_data: *mut c_void,
) -> c_int;
@ -525,9 +529,10 @@ extern "C" {
pub fn archive_error_string(arg1: *mut Struct_archive) -> *const c_char;
pub fn archive_format_name(arg1: *mut Struct_archive) -> *const c_char;
pub fn archive_format(arg1: *mut Struct_archive) -> c_int;
pub fn archive_clear_error(arg1: *mut Struct_archive);
pub fn archive_set_error(arg1: *mut Struct_archive, _err: c_int, fmt: *const c_char, ...);
pub fn archive_copy_error(dest: *mut Struct_archive, src: *mut Struct_archive);
pub fn archive_clear_error(arg1: *mut Struct_archive) -> ();
pub fn archive_set_error(arg1: *mut Struct_archive, _err: c_int, fmt: *const c_char, ...)
-> ();
pub fn archive_copy_error(dest: *mut Struct_archive, src: *mut Struct_archive) -> ();
pub fn archive_file_count(arg1: *mut Struct_archive) -> c_int;
pub fn archive_match_new() -> *mut Struct_archive;
pub fn archive_match_free(arg1: *mut Struct_archive) -> c_int;
@ -585,7 +590,7 @@ extern "C" {
pub fn archive_match_include_time(
arg1: *mut Struct_archive,
_flag: c_int,
_sec: i64,
_sec: time_t,
_nsec: c_long,
) -> c_int;
pub fn archive_match_include_date(
@ -625,16 +630,16 @@ extern "C" {
pub fn archive_match_include_gname_w(arg1: *mut Struct_archive, arg2: *const wchar_t) -> c_int;
pub fn archive_entry_clear(arg1: *mut Struct_archive_entry) -> *mut Struct_archive_entry;
pub fn archive_entry_clone(arg1: *mut Struct_archive_entry) -> *mut Struct_archive_entry;
pub fn archive_entry_free(arg1: *mut Struct_archive_entry);
pub fn archive_entry_free(arg1: *mut Struct_archive_entry) -> ();
pub fn archive_entry_new() -> *mut Struct_archive_entry;
pub fn archive_entry_new2(arg1: *mut Struct_archive) -> *mut Struct_archive_entry;
pub fn archive_entry_atime(arg1: *mut Struct_archive_entry) -> i64;
pub fn archive_entry_atime(arg1: *mut Struct_archive_entry) -> time_t;
pub fn archive_entry_atime_nsec(arg1: *mut Struct_archive_entry) -> c_long;
pub fn archive_entry_atime_is_set(arg1: *mut Struct_archive_entry) -> c_int;
pub fn archive_entry_birthtime(arg1: *mut Struct_archive_entry) -> i64;
pub fn archive_entry_birthtime(arg1: *mut Struct_archive_entry) -> time_t;
pub fn archive_entry_birthtime_nsec(arg1: *mut Struct_archive_entry) -> c_long;
pub fn archive_entry_birthtime_is_set(arg1: *mut Struct_archive_entry) -> c_int;
pub fn archive_entry_ctime(arg1: *mut Struct_archive_entry) -> i64;
pub fn archive_entry_ctime(arg1: *mut Struct_archive_entry) -> time_t;
pub fn archive_entry_ctime_nsec(arg1: *mut Struct_archive_entry) -> c_long;
pub fn archive_entry_ctime_is_set(arg1: *mut Struct_archive_entry) -> c_int;
pub fn archive_entry_dev(arg1: *mut Struct_archive_entry) -> dev_t;
@ -646,7 +651,7 @@ extern "C" {
arg1: *mut Struct_archive_entry,
arg2: *mut c_ulong,
arg3: *mut c_ulong,
);
) -> ();
pub fn archive_entry_fflags_text(arg1: *mut Struct_archive_entry) -> *const c_char;
pub fn archive_entry_gid(arg1: *mut Struct_archive_entry) -> i64;
pub fn archive_entry_gname(arg1: *mut Struct_archive_entry) -> *const c_char;
@ -657,7 +662,7 @@ extern "C" {
pub fn archive_entry_ino64(arg1: *mut Struct_archive_entry) -> i64;
pub fn archive_entry_ino_is_set(arg1: *mut Struct_archive_entry) -> c_int;
pub fn archive_entry_mode(arg1: *mut Struct_archive_entry) -> mode_t;
pub fn archive_entry_mtime(arg1: *mut Struct_archive_entry) -> i64;
pub fn archive_entry_mtime(arg1: *mut Struct_archive_entry) -> time_t;
pub fn archive_entry_mtime_nsec(arg1: *mut Struct_archive_entry) -> c_long;
pub fn archive_entry_mtime_is_set(arg1: *mut Struct_archive_entry) -> c_int;
pub fn archive_entry_nlink(arg1: *mut Struct_archive_entry) -> c_uint;
@ -677,17 +682,33 @@ extern "C" {
pub fn archive_entry_uid(arg1: *mut Struct_archive_entry) -> i64;
pub fn archive_entry_uname(arg1: *mut Struct_archive_entry) -> *const c_char;
pub fn archive_entry_uname_w(arg1: *mut Struct_archive_entry) -> *const wchar_t;
pub fn archive_entry_set_atime(arg1: *mut Struct_archive_entry, arg2: i64, arg3: c_long);
pub fn archive_entry_unset_atime(arg1: *mut Struct_archive_entry);
pub fn archive_entry_set_birthtime(arg1: *mut Struct_archive_entry, arg2: i64, arg3: c_long);
pub fn archive_entry_unset_birthtime(arg1: *mut Struct_archive_entry);
pub fn archive_entry_set_ctime(arg1: *mut Struct_archive_entry, arg2: i64, arg3: c_long);
pub fn archive_entry_unset_ctime(arg1: *mut Struct_archive_entry);
pub fn archive_entry_set_dev(arg1: *mut Struct_archive_entry, arg2: dev_t);
pub fn archive_entry_set_devmajor(arg1: *mut Struct_archive_entry, arg2: dev_t);
pub fn archive_entry_set_devminor(arg1: *mut Struct_archive_entry, arg2: dev_t);
pub fn archive_entry_set_filetype(arg1: *mut Struct_archive_entry, arg2: c_uint);
pub fn archive_entry_set_fflags(arg1: *mut Struct_archive_entry, arg2: c_ulong, arg3: c_ulong);
pub fn archive_entry_set_atime(
arg1: *mut Struct_archive_entry,
arg2: time_t,
arg3: c_long,
) -> ();
pub fn archive_entry_unset_atime(arg1: *mut Struct_archive_entry) -> ();
pub fn archive_entry_set_birthtime(
arg1: *mut Struct_archive_entry,
arg2: time_t,
arg3: c_long,
) -> ();
pub fn archive_entry_unset_birthtime(arg1: *mut Struct_archive_entry) -> ();
pub fn archive_entry_set_ctime(
arg1: *mut Struct_archive_entry,
arg2: time_t,
arg3: c_long,
) -> ();
pub fn archive_entry_unset_ctime(arg1: *mut Struct_archive_entry) -> ();
pub fn archive_entry_set_dev(arg1: *mut Struct_archive_entry, arg2: dev_t) -> ();
pub fn archive_entry_set_devmajor(arg1: *mut Struct_archive_entry, arg2: dev_t) -> ();
pub fn archive_entry_set_devminor(arg1: *mut Struct_archive_entry, arg2: dev_t) -> ();
pub fn archive_entry_set_filetype(arg1: *mut Struct_archive_entry, arg2: c_uint) -> ();
pub fn archive_entry_set_fflags(
arg1: *mut Struct_archive_entry,
arg2: c_ulong,
arg3: c_ulong,
) -> ();
pub fn archive_entry_copy_fflags_text(
arg1: *mut Struct_archive_entry,
arg2: *const c_char,
@ -696,60 +717,79 @@ extern "C" {
arg1: *mut Struct_archive_entry,
arg2: *const wchar_t,
) -> *const wchar_t;
pub fn archive_entry_set_gid(arg1: *mut Struct_archive_entry, arg2: i64);
pub fn archive_entry_set_gname(arg1: *mut Struct_archive_entry, arg2: *const c_char);
pub fn archive_entry_copy_gname(arg1: *mut Struct_archive_entry, arg2: *const c_char);
pub fn archive_entry_copy_gname_w(arg1: *mut Struct_archive_entry, arg2: *const wchar_t);
pub fn archive_entry_set_gid(arg1: *mut Struct_archive_entry, arg2: i64) -> ();
pub fn archive_entry_set_gname(arg1: *mut Struct_archive_entry, arg2: *const c_char) -> ();
pub fn archive_entry_copy_gname(arg1: *mut Struct_archive_entry, arg2: *const c_char) -> ();
pub fn archive_entry_copy_gname_w(arg1: *mut Struct_archive_entry, arg2: *const wchar_t) -> ();
pub fn archive_entry_update_gname_utf8(
arg1: *mut Struct_archive_entry,
arg2: *const c_char,
) -> c_int;
pub fn archive_entry_set_hardlink(arg1: *mut Struct_archive_entry, arg2: *const c_char);
pub fn archive_entry_copy_hardlink(arg1: *mut Struct_archive_entry, arg2: *const c_char);
pub fn archive_entry_copy_hardlink_w(arg1: *mut Struct_archive_entry, arg2: *const wchar_t);
pub fn archive_entry_set_hardlink(arg1: *mut Struct_archive_entry, arg2: *const c_char) -> ();
pub fn archive_entry_copy_hardlink(arg1: *mut Struct_archive_entry, arg2: *const c_char) -> ();
pub fn archive_entry_copy_hardlink_w(
arg1: *mut Struct_archive_entry,
arg2: *const wchar_t,
) -> ();
pub fn archive_entry_update_hardlink_utf8(
arg1: *mut Struct_archive_entry,
arg2: *const c_char,
) -> c_int;
pub fn archive_entry_set_ino(arg1: *mut Struct_archive_entry, arg2: i64);
pub fn archive_entry_set_ino64(arg1: *mut Struct_archive_entry, arg2: i64);
pub fn archive_entry_set_link(arg1: *mut Struct_archive_entry, arg2: *const c_char);
pub fn archive_entry_copy_link(arg1: *mut Struct_archive_entry, arg2: *const c_char);
pub fn archive_entry_copy_link_w(arg1: *mut Struct_archive_entry, arg2: *const wchar_t);
pub fn archive_entry_set_ino(arg1: *mut Struct_archive_entry, arg2: i64) -> ();
pub fn archive_entry_set_ino64(arg1: *mut Struct_archive_entry, arg2: i64) -> ();
pub fn archive_entry_set_link(arg1: *mut Struct_archive_entry, arg2: *const c_char) -> ();
pub fn archive_entry_copy_link(arg1: *mut Struct_archive_entry, arg2: *const c_char) -> ();
pub fn archive_entry_copy_link_w(arg1: *mut Struct_archive_entry, arg2: *const wchar_t) -> ();
pub fn archive_entry_update_link_utf8(
arg1: *mut Struct_archive_entry,
arg2: *const c_char,
) -> c_int;
pub fn archive_entry_set_mode(arg1: *mut Struct_archive_entry, arg2: mode_t);
pub fn archive_entry_set_mtime(arg1: *mut Struct_archive_entry, arg2: i64, arg3: c_long);
pub fn archive_entry_unset_mtime(arg1: *mut Struct_archive_entry);
pub fn archive_entry_set_nlink(arg1: *mut Struct_archive_entry, arg2: c_uint);
pub fn archive_entry_set_pathname(arg1: *mut Struct_archive_entry, arg2: *const c_char);
pub fn archive_entry_copy_pathname(arg1: *mut Struct_archive_entry, arg2: *const c_char);
pub fn archive_entry_copy_pathname_w(arg1: *mut Struct_archive_entry, arg2: *const wchar_t);
pub fn archive_entry_set_mode(arg1: *mut Struct_archive_entry, arg2: mode_t) -> ();
pub fn archive_entry_set_mtime(
arg1: *mut Struct_archive_entry,
arg2: time_t,
arg3: c_long,
) -> ();
pub fn archive_entry_unset_mtime(arg1: *mut Struct_archive_entry) -> ();
pub fn archive_entry_set_nlink(arg1: *mut Struct_archive_entry, arg2: c_uint) -> ();
pub fn archive_entry_set_pathname(arg1: *mut Struct_archive_entry, arg2: *const c_char) -> ();
pub fn archive_entry_copy_pathname(arg1: *mut Struct_archive_entry, arg2: *const c_char) -> ();
pub fn archive_entry_copy_pathname_w(
arg1: *mut Struct_archive_entry,
arg2: *const wchar_t,
) -> ();
pub fn archive_entry_update_pathname_utf8(
arg1: *mut Struct_archive_entry,
arg2: *const c_char,
) -> c_int;
pub fn archive_entry_set_perm(arg1: *mut Struct_archive_entry, arg2: mode_t);
pub fn archive_entry_set_rdev(arg1: *mut Struct_archive_entry, arg2: dev_t);
pub fn archive_entry_set_rdevmajor(arg1: *mut Struct_archive_entry, arg2: dev_t);
pub fn archive_entry_set_rdevminor(arg1: *mut Struct_archive_entry, arg2: dev_t);
pub fn archive_entry_set_size(arg1: *mut Struct_archive_entry, arg2: i64);
pub fn archive_entry_unset_size(arg1: *mut Struct_archive_entry);
pub fn archive_entry_copy_sourcepath(arg1: *mut Struct_archive_entry, arg2: *const c_char);
pub fn archive_entry_copy_sourcepath_w(arg1: *mut Struct_archive_entry, arg2: *const wchar_t);
pub fn archive_entry_set_symlink(arg1: *mut Struct_archive_entry, arg2: *const c_char);
pub fn archive_entry_copy_symlink(arg1: *mut Struct_archive_entry, arg2: *const c_char);
pub fn archive_entry_copy_symlink_w(arg1: *mut Struct_archive_entry, arg2: *const wchar_t);
pub fn archive_entry_set_perm(arg1: *mut Struct_archive_entry, arg2: mode_t) -> ();
pub fn archive_entry_set_rdev(arg1: *mut Struct_archive_entry, arg2: dev_t) -> ();
pub fn archive_entry_set_rdevmajor(arg1: *mut Struct_archive_entry, arg2: dev_t) -> ();
pub fn archive_entry_set_rdevminor(arg1: *mut Struct_archive_entry, arg2: dev_t) -> ();
pub fn archive_entry_set_size(arg1: *mut Struct_archive_entry, arg2: i64) -> ();
pub fn archive_entry_unset_size(arg1: *mut Struct_archive_entry) -> ();
pub fn archive_entry_copy_sourcepath(
arg1: *mut Struct_archive_entry,
arg2: *const c_char,
) -> ();
pub fn archive_entry_copy_sourcepath_w(
arg1: *mut Struct_archive_entry,
arg2: *const wchar_t,
) -> ();
pub fn archive_entry_set_symlink(arg1: *mut Struct_archive_entry, arg2: *const c_char) -> ();
pub fn archive_entry_copy_symlink(arg1: *mut Struct_archive_entry, arg2: *const c_char) -> ();
pub fn archive_entry_copy_symlink_w(
arg1: *mut Struct_archive_entry,
arg2: *const wchar_t,
) -> ();
pub fn archive_entry_update_symlink_utf8(
arg1: *mut Struct_archive_entry,
arg2: *const c_char,
) -> c_int;
pub fn archive_entry_set_uid(arg1: *mut Struct_archive_entry, arg2: i64);
pub fn archive_entry_set_uname(arg1: *mut Struct_archive_entry, arg2: *const c_char);
pub fn archive_entry_copy_uname(arg1: *mut Struct_archive_entry, arg2: *const c_char);
pub fn archive_entry_copy_uname_w(arg1: *mut Struct_archive_entry, arg2: *const wchar_t);
pub fn archive_entry_set_uid(arg1: *mut Struct_archive_entry, arg2: i64) -> ();
pub fn archive_entry_set_uname(arg1: *mut Struct_archive_entry, arg2: *const c_char) -> ();
pub fn archive_entry_copy_uname(arg1: *mut Struct_archive_entry, arg2: *const c_char) -> ();
pub fn archive_entry_copy_uname_w(arg1: *mut Struct_archive_entry, arg2: *const wchar_t) -> ();
pub fn archive_entry_update_uname_utf8(
arg1: *mut Struct_archive_entry,
arg2: *const c_char,
@ -757,7 +797,7 @@ extern "C" {
// pub fn archive_entry_stat(arg1: *mut Struct_archive_entry) -> *const Struct_stat;
// pub fn archive_entry_copy_stat(arg1: *mut Struct_archive_entry,
// arg2: *const Struct_stat)
// ;
// -> ();
pub fn archive_entry_mac_metadata(
arg1: *mut Struct_archive_entry,
arg2: *mut size_t,
@ -766,8 +806,8 @@ extern "C" {
arg1: *mut Struct_archive_entry,
arg2: *const c_void,
arg3: size_t,
);
pub fn archive_entry_acl_clear(arg1: *mut Struct_archive_entry);
) -> ();
pub fn archive_entry_acl_clear(arg1: *mut Struct_archive_entry) -> ();
pub fn archive_entry_acl_add_entry(
arg1: *mut Struct_archive_entry,
arg2: c_int,
@ -808,13 +848,13 @@ extern "C" {
pub fn archive_entry_acl_text(arg1: *mut Struct_archive_entry, arg2: c_int) -> *const c_char;
pub fn archive_entry_acl_count(arg1: *mut Struct_archive_entry, arg2: c_int) -> c_int;
pub fn archive_entry_acl(arg1: *mut Struct_archive_entry) -> *mut Struct_archive_acl;
pub fn archive_entry_xattr_clear(arg1: *mut Struct_archive_entry);
pub fn archive_entry_xattr_clear(arg1: *mut Struct_archive_entry) -> ();
pub fn archive_entry_xattr_add_entry(
arg1: *mut Struct_archive_entry,
arg2: *const c_char,
arg3: *const c_void,
arg4: size_t,
);
) -> ();
pub fn archive_entry_xattr_count(arg1: *mut Struct_archive_entry) -> c_int;
pub fn archive_entry_xattr_reset(arg1: *mut Struct_archive_entry) -> c_int;
pub fn archive_entry_xattr_next(
@ -823,8 +863,12 @@ extern "C" {
arg3: *mut *const c_void,
arg4: *mut size_t,
) -> c_int;
pub fn archive_entry_sparse_clear(arg1: *mut Struct_archive_entry);
pub fn archive_entry_sparse_add_entry(arg1: *mut Struct_archive_entry, arg2: i64, arg3: i64);
pub fn archive_entry_sparse_clear(arg1: *mut Struct_archive_entry) -> ();
pub fn archive_entry_sparse_add_entry(
arg1: *mut Struct_archive_entry,
arg2: i64,
arg3: i64,
) -> ();
pub fn archive_entry_sparse_count(arg1: *mut Struct_archive_entry) -> c_int;
pub fn archive_entry_sparse_reset(arg1: *mut Struct_archive_entry) -> c_int;
pub fn archive_entry_sparse_next(
@ -836,13 +880,13 @@ extern "C" {
pub fn archive_entry_linkresolver_set_strategy(
arg1: *mut Struct_archive_entry_linkresolver,
arg2: c_int,
);
pub fn archive_entry_linkresolver_free(arg1: *mut Struct_archive_entry_linkresolver);
) -> ();
pub fn archive_entry_linkresolver_free(arg1: *mut Struct_archive_entry_linkresolver) -> ();
pub fn archive_entry_linkify(
arg1: *mut Struct_archive_entry_linkresolver,
arg2: *mut *mut Struct_archive_entry,
arg3: *mut *mut Struct_archive_entry,
);
) -> ();
pub fn archive_entry_partial_links(
res: *mut Struct_archive_entry_linkresolver,
links: *mut c_uint,

View File

@ -7,22 +7,18 @@ authors = ["Jef Roosens"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
axum = { version = "0.7.5", features = ["http2", "macros"] }
axum = { version = "0.6.18", features = ["http2"] }
chrono = { version = "0.4.26", features = ["serde"] }
clap = { version = "4.3.12", features = ["env", "derive"] }
figment = { version = "0.10.19", features = ["env", "toml"] }
futures = "0.3.28"
http-body-util = "0.1.1"
libarchive = { path = "../libarchive" }
regex = "1.10.5"
sea-orm-migration = "0.12.1"
sea-query = { version = "0.30.7", features = ["backend-postgres", "backend-sqlite"] }
serde = { version = "1.0.178", features = ["derive"] }
sha256 = "1.1.4"
tokio = { version = "1.29.1", features = ["full"] }
tokio-util = { version = "0.7.8", features = ["io"] }
tower = { version = "0.4.13", features = ["make"] }
tower-http = { version = "0.5.2", features = ["fs", "trace", "auth"] }
tower-http = { version = "0.4.1", features = ["fs", "trace", "auth"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
uuid = { version = "1.4.0", features = ["v4"] }
@ -34,6 +30,5 @@ features = [
"sqlx-postgres",
"runtime-tokio-rustls",
"macros",
"with-chrono",
"debug-print"
"with-chrono"
]

View File

@ -1,17 +0,0 @@
api_key = "test"
pkg_workers = 2
log_level = "rieterd=debug"
[fs]
type = "local"
data_dir = "./data"
[db]
type = "sqlite"
db_dir = "./data"
# [db]
# type = "postgres"
# host = "localhost"
# db = "rieter"
# user = "rieter"
# password = "rieter"

View File

@ -1,13 +1,13 @@
mod pagination;
use crate::db;
use axum::extract::{Path, Query, State};
use axum::routing::get;
use axum::Json;
use axum::Router;
use pagination::PaginatedResponse;
use axum::{
extract::{Path, Query, State},
routing::get,
Json, Router,
};
use crate::db;
pub fn router() -> Router<crate::Global> {
Router::new()
@ -20,19 +20,24 @@ pub fn router() -> Router<crate::Global> {
async fn get_repos(
State(global): State<crate::Global>,
Query(pagination): Query<pagination::Query>,
Query(filter): Query<db::query::repo::Filter>,
) -> crate::Result<Json<PaginatedResponse<db::repo::Model>>> {
let items =
db::query::repo::page(&global.db, pagination.per_page, pagination.page - 1, filter).await?;
Ok(Json(pagination.res(items)))
let (total_pages, repos) = global
.db
.repos(
pagination.per_page.unwrap_or(25),
pagination.page.unwrap_or(1) - 1,
)
.await?;
Ok(Json(pagination.res(total_pages, repos)))
}
async fn get_single_repo(
State(global): State<crate::Global>,
Path(id): Path<i32>,
) -> crate::Result<Json<db::repo::Model>> {
let repo = db::query::repo::by_id(&global.db, id)
let repo = global
.db
.repo(id)
.await?
.ok_or(axum::http::StatusCode::NOT_FOUND)?;
@ -42,20 +47,25 @@ async fn get_single_repo(
async fn get_packages(
State(global): State<crate::Global>,
Query(pagination): Query<pagination::Query>,
Query(filter): Query<db::query::package::Filter>,
) -> crate::Result<Json<PaginatedResponse<db::package::Model>>> {
let items =
db::query::package::page(&global.db, pagination.per_page, pagination.page - 1, filter)
let (total_pages, pkgs) = global
.db
.packages(
pagination.per_page.unwrap_or(25),
pagination.page.unwrap_or(1) - 1,
)
.await?;
Ok(Json(pagination.res(items)))
Ok(Json(pagination.res(total_pages, pkgs)))
}
async fn get_single_package(
State(global): State<crate::Global>,
Path(id): Path<i32>,
) -> crate::Result<Json<crate::db::FullPackage>> {
let entry = db::query::package::full(&global.db, id)
let entry = global
.db
.full_package(id)
.await?
.ok_or(axum::http::StatusCode::NOT_FOUND)?;

View File

@ -0,0 +1,38 @@
use serde::{Deserialize, Serialize};
pub const DEFAULT_PAGE: u64 = 0;
pub const DEFAULT_PER_PAGE: u64 = 25;
#[derive(Deserialize)]
pub struct Query {
pub page: Option<u64>,
pub per_page: Option<u64>,
}
#[derive(Serialize)]
pub struct PaginatedResponse<T>
where
T: for<'de> Serialize,
{
pub page: u64,
pub per_page: u64,
pub total_pages: u64,
pub count: usize,
pub items: Vec<T>,
}
impl Query {
pub fn res<T: for<'de> Serialize>(
self,
total_pages: u64,
items: Vec<T>,
) -> PaginatedResponse<T> {
PaginatedResponse {
page: self.page.unwrap_or(DEFAULT_PAGE),
per_page: self.per_page.unwrap_or(DEFAULT_PER_PAGE),
total_pages,
count: items.len(),
items,
}
}
}

View File

@ -1,15 +1,102 @@
use std::path::PathBuf;
use crate::repo::RepoGroupManager;
use crate::{Config, Global};
use axum::extract::FromRef;
use axum::Router;
use clap::Parser;
use std::io;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use tower_http::trace::TraceLayer;
use tracing::debug;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
pub struct Cli {
/// Directory where package archives will be stored
pub pkg_dir: PathBuf,
/// Directory where repository metadata & SQLite database is stored
pub data_dir: PathBuf,
/// API key to authenticate private routes with
pub api_key: String,
/// Database connection URL; either sqlite:// or postgres://. Defaults to rieter.sqlite in the
/// data directory
#[arg(short, long)]
pub database_url: Option<String>,
/// Port the server will listen on
#[arg(short, long, value_name = "PORT", default_value_t = 8000)]
pub port: u16,
/// Log levels for the tracing
#[arg(
short,
long,
env = "RIETER_CONFIG_FILE",
default_value = "./rieterd.toml"
value_name = "LOG_LEVEL",
default_value = "tower_http=debug,rieterd=debug"
)]
pub config_file: PathBuf,
pub log: String,
}
impl FromRef<Global> for Arc<RwLock<RepoGroupManager>> {
fn from_ref(global: &Global) -> Self {
Arc::clone(&global.repo_manager)
}
}
impl Cli {
pub fn init_tracing(&self) {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(self.log.clone()))
.with(tracing_subscriber::fmt::layer())
.init();
}
pub async fn run(&self) -> crate::Result<()> {
self.init_tracing();
let db_url = if let Some(url) = &self.database_url {
url.clone()
} else {
format!(
"sqlite://{}",
self.data_dir.join("rieter.sqlite").to_string_lossy()
)
};
debug!("Connecting to database with URL {}", db_url);
let db = crate::db::RieterDb::connect(db_url).await?;
// let db = crate::db::init("postgres://rieter:rieter@localhost:5432/rieter")
// .await
// .unwrap();
let config = Config {
data_dir: self.data_dir.clone(),
repo_dir: self.data_dir.join("repos"),
pkg_dir: self.pkg_dir.clone(),
api_key: self.api_key.clone(),
};
let repo_manager = RepoGroupManager::new(&config.repo_dir, &self.pkg_dir);
let global = Global {
config,
repo_manager: Arc::new(RwLock::new(repo_manager)),
db,
};
// build our application with a single route
let app = Router::new()
.nest("/api", crate::api::router())
.merge(crate::repo::router(&self.api_key))
.with_state(global)
.layer(TraceLayer::new_for_http());
// run it with hyper on localhost:3000
Ok(
axum::Server::bind(&format!("0.0.0.0:{}", self.port).parse().unwrap())
.serve(app.into_make_service())
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
)
}
}

View File

@ -1,88 +0,0 @@
use std::path::{Path, PathBuf};
use figment::{
providers::{Env, Format, Toml},
Figment,
};
use serde::Deserialize;
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
#[serde(tag = "type")]
pub enum FsConfig {
Local { data_dir: PathBuf },
}
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "lowercase")]
#[serde(tag = "type")]
pub enum DbConfig {
Sqlite {
db_dir: PathBuf,
#[serde(default = "default_db_sqlite_max_connections")]
max_connections: u32,
},
Postgres {
host: String,
#[serde(default = "default_db_postgres_port")]
port: u16,
user: String,
password: String,
db: String,
#[serde(default)]
schema: String,
#[serde(default = "default_db_postgres_max_connections")]
max_connections: u32,
},
}
#[derive(Deserialize, Debug, Clone)]
pub struct Config {
pub api_key: String,
#[serde(default = "default_domain")]
pub domain: String,
#[serde(default = "default_port")]
pub port: u16,
#[serde(default = "default_log_level")]
pub log_level: String,
pub fs: FsConfig,
pub db: DbConfig,
#[serde(default = "default_pkg_workers")]
pub pkg_workers: u32,
}
impl Config {
pub fn figment(config_file: impl AsRef<Path>) -> Figment {
Figment::new()
.merge(Toml::file(config_file))
.merge(Env::prefixed("RIETER_"))
}
}
fn default_domain() -> String {
String::from("0.0.0.0")
}
fn default_port() -> u16 {
8000
}
fn default_log_level() -> String {
String::from("tower_http=debug,rieterd=debug,sea_orm=debug")
}
fn default_db_sqlite_max_connections() -> u32 {
16
}
fn default_db_postgres_port() -> u16 {
5432
}
fn default_db_postgres_max_connections() -> u32 {
16
}
fn default_pkg_workers() -> u32 {
1
}

View File

@ -0,0 +1,61 @@
use super::RieterDb;
use sea_orm::{DbBackend, DbErr, ExecResult, QueryResult, Statement};
use std::{future::Future, pin::Pin};
// Allows RieterDb objects to be passed to ORM functions
impl sea_orm::ConnectionTrait for RieterDb {
fn get_database_backend(&self) -> DbBackend {
self.conn.get_database_backend()
}
fn execute<'life0, 'async_trait>(
&'life0 self,
stmt: Statement,
) -> Pin<Box<dyn Future<Output = std::result::Result<ExecResult, DbErr>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
{
self.conn.execute(stmt)
}
fn execute_unprepared<'life0, 'life1, 'async_trait>(
&'life0 self,
sql: &'life1 str,
) -> Pin<Box<dyn Future<Output = std::result::Result<ExecResult, DbErr>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
{
self.conn.execute_unprepared(sql)
}
fn query_one<'life0, 'async_trait>(
&'life0 self,
stmt: Statement,
) -> Pin<
Box<
dyn Future<Output = std::result::Result<Option<QueryResult>, DbErr>>
+ Send
+ 'async_trait,
>,
>
where
Self: 'async_trait,
'life0: 'async_trait,
{
self.conn.query_one(stmt)
}
fn query_all<'life0, 'async_trait>(
&'life0 self,
stmt: Statement,
) -> Pin<
Box<
dyn Future<Output = std::result::Result<Vec<QueryResult>, DbErr>> + Send + 'async_trait,
>,
>
where
Self: 'async_trait,
'life0: 'async_trait,
{
self.conn.query_all(stmt)
}
}

View File

@ -1,27 +0,0 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "distro")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub name: String,
pub description: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::repo::Entity")]
Repo,
}
impl Related<super::repo::Entity> for Entity {
fn to() -> RelationDef {
Relation::Repo.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View File

@ -2,10 +2,12 @@
pub mod prelude;
pub mod distro;
pub mod package;
pub mod package_conflicts;
pub mod package_depends;
pub mod package_file;
pub mod package_group;
pub mod package_license;
pub mod package_related;
pub mod package_provides;
pub mod package_replaces;
pub mod repo;

View File

@ -1,11 +1,8 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1
use chrono::NaiveDateTime;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
use crate::db::PackageState;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "package")]
pub struct Model {
@ -20,26 +17,29 @@ pub struct Model {
pub c_size: i64,
pub description: Option<String>,
pub url: Option<String>,
pub build_date: NaiveDateTime,
pub build_date: DateTime,
pub packager: Option<String>,
pub pgp_sig: Option<String>,
pub pgp_sig_size: Option<i64>,
pub sha256_sum: String,
pub compression: String,
#[serde(skip_serializing)]
pub state: PackageState,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::package_conflicts::Entity")]
PackageConflicts,
#[sea_orm(has_many = "super::package_depends::Entity")]
PackageDepends,
#[sea_orm(has_many = "super::package_file::Entity")]
PackageFile,
#[sea_orm(has_many = "super::package_group::Entity")]
PackageGroup,
#[sea_orm(has_many = "super::package_license::Entity")]
PackageLicense,
#[sea_orm(has_many = "super::package_related::Entity")]
PackageRelated,
#[sea_orm(has_many = "super::package_provides::Entity")]
PackageProvides,
#[sea_orm(has_many = "super::package_replaces::Entity")]
PackageReplaces,
#[sea_orm(
belongs_to = "super::repo::Entity",
from = "Column::RepoId",
@ -50,6 +50,18 @@ pub enum Relation {
Repo,
}
impl Related<super::package_conflicts::Entity> for Entity {
fn to() -> RelationDef {
Relation::PackageConflicts.def()
}
}
impl Related<super::package_depends::Entity> for Entity {
fn to() -> RelationDef {
Relation::PackageDepends.def()
}
}
impl Related<super::package_file::Entity> for Entity {
fn to() -> RelationDef {
Relation::PackageFile.def()
@ -68,9 +80,15 @@ impl Related<super::package_license::Entity> for Entity {
}
}
impl Related<super::package_related::Entity> for Entity {
impl Related<super::package_provides::Entity> for Entity {
fn to() -> RelationDef {
Relation::PackageRelated.def()
Relation::PackageProvides.def()
}
}
impl Related<super::package_replaces::Entity> for Entity {
fn to() -> RelationDef {
Relation::PackageReplaces.def()
}
}

View File

@ -0,0 +1,33 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "package_conflicts")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub package_id: i32,
#[sea_orm(primary_key, auto_increment = false)]
pub value: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::package::Entity",
from = "Column::PackageId",
to = "super::package::Column::Id",
on_update = "NoAction",
on_delete = "Cascade"
)]
Package,
}
impl Related<super::package::Entity> for Entity {
fn to() -> RelationDef {
Relation::Package.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View File

@ -3,17 +3,15 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
use crate::db::PackageRelatedEnum;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "package_related")]
#[sea_orm(table_name = "package_depends")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub package_id: i32,
#[sea_orm(primary_key, auto_increment = false)]
pub r#type: PackageRelatedEnum,
pub r#type: crate::db::PackageDepend,
#[sea_orm(primary_key, auto_increment = false)]
pub name: String,
pub value: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -9,7 +9,7 @@ pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub package_id: i32,
#[sea_orm(primary_key, auto_increment = false)]
pub path: String,
pub value: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -9,7 +9,7 @@ pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub package_id: i32,
#[sea_orm(primary_key, auto_increment = false)]
pub name: String,
pub value: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -9,7 +9,7 @@ pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub package_id: i32,
#[sea_orm(primary_key, auto_increment = false)]
pub name: String,
pub value: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -0,0 +1,33 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "package_provides")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub package_id: i32,
#[sea_orm(primary_key, auto_increment = false)]
pub value: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::package::Entity",
from = "Column::PackageId",
to = "super::package::Column::Id",
on_update = "NoAction",
on_delete = "Cascade"
)]
Package,
}
impl Related<super::package::Entity> for Entity {
fn to() -> RelationDef {
Relation::Package.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View File

@ -0,0 +1,33 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "package_replaces")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub package_id: i32,
#[sea_orm(primary_key, auto_increment = false)]
pub value: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::package::Entity",
from = "Column::PackageId",
to = "super::package::Column::Id",
on_update = "NoAction",
on_delete = "Cascade"
)]
Package,
}
impl Related<super::package::Entity> for Entity {
fn to() -> RelationDef {
Relation::Package.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View File

@ -1,9 +1,11 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1
pub use super::distro::Entity as Distro;
pub use super::package::Entity as Package;
pub use super::package_conflicts::Entity as PackageConflicts;
pub use super::package_depends::Entity as PackageDepends;
pub use super::package_file::Entity as PackageFile;
pub use super::package_group::Entity as PackageGroup;
pub use super::package_license::Entity as PackageLicense;
pub use super::package_related::Entity as PackageRelated;
pub use super::package_provides::Entity as PackageProvides;
pub use super::package_replaces::Entity as PackageReplaces;
pub use super::repo::Entity as Repo;

View File

@ -8,31 +8,17 @@ use serde::{Deserialize, Serialize};
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub distro_id: i32,
#[sea_orm(unique)]
pub name: String,
pub description: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::distro::Entity",
from = "Column::DistroId",
to = "super::distro::Column::Id",
on_update = "NoAction",
on_delete = "Cascade"
)]
Distro,
#[sea_orm(has_many = "super::package::Entity")]
Package,
}
impl Related<super::distro::Entity> for Entity {
fn to() -> RelationDef {
Relation::Distro.def()
}
}
impl Related<super::package::Entity> for Entity {
fn to() -> RelationDef {
Relation::Package.def()

View File

@ -11,27 +11,6 @@ impl MigrationName for Migration {
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Distro::Table)
.col(
ColumnDef::new(Distro::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(
ColumnDef::new(Distro::Name)
.string()
.not_null()
.unique_key(),
)
.col(ColumnDef::new(Distro::Description).string())
.to_owned(),
)
.await?;
manager
.create_table(
Table::create()
@ -43,16 +22,8 @@ impl MigrationTrait for Migration {
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Repo::DistroId).integer().not_null())
.col(ColumnDef::new(Repo::Name).string().not_null().unique_key())
.col(ColumnDef::new(Repo::Description).string())
.foreign_key(
ForeignKey::create()
.name("fk-repo-distro_id")
.from(Repo::Table, Repo::DistroId)
.to(Distro::Table, Distro::Id)
.on_delete(ForeignKeyAction::Cascade),
)
.to_owned(),
)
.await?;
@ -81,12 +52,6 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Package::PgpSig).string_len(255))
.col(ColumnDef::new(Package::PgpSigSize).big_integer())
.col(ColumnDef::new(Package::Sha256Sum).char_len(64).not_null())
.col(
ColumnDef::new(Package::Compression)
.string_len(16)
.not_null(),
)
.col(ColumnDef::new(Package::State).integer().not_null())
.foreign_key(
ForeignKey::create()
.name("fk-package-repo_id")
@ -107,14 +72,14 @@ impl MigrationTrait for Migration {
.not_null(),
)
.col(
ColumnDef::new(PackageLicense::Name)
ColumnDef::new(PackageLicense::Value)
.string_len(255)
.not_null(),
)
.primary_key(
Index::create()
.col(PackageLicense::PackageId)
.col(PackageLicense::Name),
.col(PackageLicense::Value),
)
.foreign_key(
ForeignKey::create()
@ -132,14 +97,14 @@ impl MigrationTrait for Migration {
.table(PackageGroup::Table)
.col(ColumnDef::new(PackageGroup::PackageId).integer().not_null())
.col(
ColumnDef::new(PackageGroup::Name)
ColumnDef::new(PackageGroup::Value)
.string_len(255)
.not_null(),
)
.primary_key(
Index::create()
.col(PackageGroup::PackageId)
.col(PackageGroup::Name),
.col(PackageGroup::Value),
)
.foreign_key(
ForeignKey::create()
@ -154,28 +119,119 @@ impl MigrationTrait for Migration {
manager
.create_table(
Table::create()
.table(PackageRelated::Table)
.table(PackageReplaces::Table)
.col(
ColumnDef::new(PackageRelated::PackageId)
ColumnDef::new(PackageReplaces::PackageId)
.integer()
.not_null(),
)
.col(ColumnDef::new(PackageRelated::Type).integer().not_null())
.col(
ColumnDef::new(PackageRelated::Name)
ColumnDef::new(PackageReplaces::Value)
.string_len(255)
.not_null(),
)
.primary_key(
Index::create()
.col(PackageRelated::PackageId)
.col(PackageRelated::Type)
.col(PackageRelated::Name),
.col(PackageReplaces::PackageId)
.col(PackageReplaces::Value),
)
.foreign_key(
ForeignKey::create()
.name("fk-package_replaces-package_id")
.from(PackageReplaces::Table, PackageReplaces::PackageId)
.to(Package::Table, Package::Id)
.on_delete(ForeignKeyAction::Cascade),
)
.to_owned(),
)
.await?;
manager
.create_table(
Table::create()
.table(PackageConflicts::Table)
.col(
ColumnDef::new(PackageConflicts::PackageId)
.integer()
.not_null(),
)
.col(
ColumnDef::new(PackageConflicts::Value)
.string_len(255)
.not_null(),
)
.primary_key(
Index::create()
.col(PackageConflicts::PackageId)
.col(PackageConflicts::Value),
)
.foreign_key(
ForeignKey::create()
.name("fk-package_conflicts-package_id")
.from(PackageConflicts::Table, PackageConflicts::PackageId)
.to(Package::Table, Package::Id)
.on_delete(ForeignKeyAction::Cascade),
)
.to_owned(),
)
.await?;
manager
.create_table(
Table::create()
.table(PackageProvides::Table)
.col(
ColumnDef::new(PackageProvides::PackageId)
.integer()
.not_null(),
)
.col(
ColumnDef::new(PackageProvides::Value)
.string_len(255)
.not_null(),
)
.primary_key(
Index::create()
.col(PackageProvides::PackageId)
.col(PackageProvides::Value),
)
.foreign_key(
ForeignKey::create()
.name("fk-package_provides-package_id")
.from(PackageProvides::Table, PackageProvides::PackageId)
.to(Package::Table, Package::Id)
.on_delete(ForeignKeyAction::Cascade),
)
.to_owned(),
)
.await?;
manager
.create_table(
Table::create()
.table(PackageDepends::Table)
.col(
ColumnDef::new(PackageDepends::PackageId)
.integer()
.not_null(),
)
.col(
ColumnDef::new(PackageDepends::Type)
.string_len(6)
.not_null(),
)
.col(
ColumnDef::new(PackageDepends::Value)
.string_len(255)
.not_null(),
)
.primary_key(
Index::create()
.col(PackageDepends::PackageId)
.col(PackageDepends::Type)
.col(PackageDepends::Value),
)
.foreign_key(
ForeignKey::create()
.name("fk-package_depends-package_id")
.from(PackageRelated::Table, PackageRelated::PackageId)
.from(PackageDepends::Table, PackageDepends::PackageId)
.to(Package::Table, Package::Id)
.on_delete(ForeignKeyAction::Cascade),
)
@ -187,11 +243,15 @@ impl MigrationTrait for Migration {
Table::create()
.table(PackageFile::Table)
.col(ColumnDef::new(PackageFile::PackageId).integer().not_null())
.col(ColumnDef::new(PackageFile::Path).string_len(255).not_null())
.col(
ColumnDef::new(PackageFile::Value)
.string_len(255)
.not_null(),
)
.primary_key(
Index::create()
.col(PackageFile::PackageId)
.col(PackageFile::Path),
.col(PackageFile::Value),
)
.foreign_key(
ForeignKey::create()
@ -216,7 +276,16 @@ impl MigrationTrait for Migration {
.drop_table(Table::drop().table(PackageGroup::Table).to_owned())
.await?;
manager
.drop_table(Table::drop().table(PackageRelated::Table).to_owned())
.drop_table(Table::drop().table(PackageReplaces::Table).to_owned())
.await?;
manager
.drop_table(Table::drop().table(PackageConflicts::Table).to_owned())
.await?;
manager
.drop_table(Table::drop().table(PackageProvides::Table).to_owned())
.await?;
manager
.drop_table(Table::drop().table(PackageDepends::Table).to_owned())
.await?;
manager
.drop_table(Table::drop().table(PackageFile::Table).to_owned())
@ -226,26 +295,14 @@ impl MigrationTrait for Migration {
.await?;
manager
.drop_table(Table::drop().table(Repo::Table).to_owned())
.await?;
manager
.drop_table(Table::drop().table(Distro::Table).to_owned())
.await
}
}
#[derive(Iden)]
pub enum Distro {
Table,
Id,
Name,
Description,
}
#[derive(Iden)]
pub enum Repo {
Table,
Id,
DistroId,
Name,
Description,
}
@ -268,35 +325,54 @@ pub enum Package {
PgpSig,
PgpSigSize,
Sha256Sum,
Compression,
State,
}
#[derive(Iden)]
pub enum PackageLicense {
Table,
PackageId,
Name,
Value,
}
#[derive(Iden)]
pub enum PackageGroup {
Table,
PackageId,
Name,
Value,
}
#[derive(Iden)]
pub enum PackageRelated {
pub enum PackageReplaces {
Table,
PackageId,
Value,
}
#[derive(Iden)]
pub enum PackageConflicts {
Table,
PackageId,
Value,
}
#[derive(Iden)]
pub enum PackageProvides {
Table,
PackageId,
Value,
}
#[derive(Iden)]
pub enum PackageDepends {
Table,
PackageId,
Type,
Name,
Value,
}
#[derive(Iden)]
pub enum PackageFile {
Table,
PackageId,
Path,
Value,
}

View File

@ -1,46 +1,31 @@
mod conn;
pub mod entities;
mod migrator;
pub mod query;
use crate::config::DbConfig;
use sea_orm::{
ActiveModelTrait, ColumnTrait, ConnectOptions, Database, DatabaseConnection, DeleteResult,
DeriveActiveEnum, EntityTrait, EnumIter, InsertResult, ModelTrait, NotSet, PaginatorTrait,
QueryFilter, QueryOrder, Set,
};
use sea_orm_migration::MigratorTrait;
use serde::{Deserialize, Serialize};
pub use entities::{prelude::*, *};
pub use migrator::Migrator;
use sea_orm::{ConnectionTrait, Database, DbConn, DeriveActiveEnum, EnumIter};
use serde::{Deserialize, Serialize};
use migrator::Migrator;
type Result<T> = std::result::Result<T, sea_orm::DbErr>;
#[derive(EnumIter, DeriveActiveEnum, Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[sea_orm(rs_type = "i32", db_type = "Integer")]
#[serde(rename_all = "lowercase")]
pub enum PackageRelatedEnum {
#[sea_orm(num_value = 0)]
Conflicts,
#[sea_orm(num_value = 1)]
Replaces,
#[sea_orm(num_value = 2)]
Provides,
#[sea_orm(num_value = 3)]
#[sea_orm(rs_type = "String", db_type = "String(Some(6))")]
pub enum PackageDepend {
#[sea_orm(string_value = "depend")]
Depend,
#[sea_orm(num_value = 4)]
Makedepend,
#[sea_orm(num_value = 5)]
Checkdepend,
#[sea_orm(num_value = 6)]
Optdepend,
}
#[derive(EnumIter, DeriveActiveEnum, Deserialize, Serialize, PartialEq, Eq, Clone, Debug)]
#[sea_orm(rs_type = "i32", db_type = "Integer")]
pub enum PackageState {
#[sea_orm(num_value = 0)]
PendingCommit,
#[sea_orm(num_value = 1)]
Committed,
#[sea_orm(num_value = 2)]
PendingDeletion,
#[sea_orm(string_value = "make")]
Make,
#[sea_orm(string_value = "check")]
Check,
#[sea_orm(string_value = "opt")]
Opt,
}
#[derive(Serialize)]
@ -49,53 +34,261 @@ pub struct FullPackage {
entry: package::Model,
licenses: Vec<String>,
groups: Vec<String>,
related: Vec<(PackageRelatedEnum, String)>,
replaces: Vec<String>,
provides: Vec<String>,
depends: Vec<(PackageDepend, String)>,
files: Vec<String>,
}
pub async fn connect(conn: &DbConfig) -> crate::Result<DbConn> {
match conn {
DbConfig::Sqlite {
db_dir,
max_connections,
} => {
let url = format!(
"sqlite://{}?mode=rwc",
db_dir.join("rieter.sqlite").to_string_lossy()
);
let options = sea_orm::ConnectOptions::new(url)
.max_connections(*max_connections)
.to_owned();
#[derive(Clone, Debug)]
pub struct RieterDb {
pub conn: DatabaseConnection,
}
let conn = Database::connect(options).await?;
impl RieterDb {
pub async fn connect<C: Into<ConnectOptions>>(opt: C) -> Result<Self> {
let db = Database::connect(opt).await?;
// synchronous=NORMAL still ensures database consistency with WAL mode, as per the docs
// https://www.sqlite.org/pragma.html#pragma_synchronous
conn.execute_unprepared("PRAGMA journal_mode=WAL;").await?;
conn.execute_unprepared("PRAGMA synchronous=NORMAL;")
Migrator::up(&db, None).await?;
Ok(Self { conn: db })
}
pub async fn repos(&self, per_page: u64, page: u64) -> Result<(u64, Vec<repo::Model>)> {
let paginator = Repo::find()
.order_by_asc(repo::Column::Id)
.paginate(&self.conn, per_page);
let repos = paginator.fetch_page(page).await?;
let total_pages = paginator.num_pages().await?;
Ok((total_pages, repos))
}
pub async fn repo(&self, id: i32) -> Result<Option<repo::Model>> {
repo::Entity::find_by_id(id).one(&self.conn).await
}
pub async fn repo_by_name(&self, name: &str) -> Result<Option<repo::Model>> {
Repo::find()
.filter(repo::Column::Name.eq(name))
.one(&self.conn)
.await
}
pub async fn insert_repo(
&self,
name: &str,
description: Option<&str>,
) -> Result<InsertResult<repo::ActiveModel>> {
let model = repo::ActiveModel {
id: NotSet,
name: Set(String::from(name)),
description: Set(description.map(String::from)),
};
Repo::insert(model).exec(&self.conn).await
}
pub async fn packages(&self, per_page: u64, page: u64) -> Result<(u64, Vec<package::Model>)> {
let paginator = Package::find()
.order_by_asc(package::Column::Id)
.paginate(&self.conn, per_page);
let packages = paginator.fetch_page(page).await?;
let total_pages = paginator.num_pages().await?;
Ok((total_pages, packages))
}
pub async fn package(&self, id: i32) -> Result<Option<package::Model>> {
package::Entity::find_by_id(id).one(&self.conn).await
}
pub async fn package_by_fields(
&self,
repo_id: i32,
name: &str,
version: Option<&str>,
arch: &str,
) -> Result<Option<package::Model>> {
let mut query = Package::find()
.filter(package::Column::RepoId.eq(repo_id))
.filter(package::Column::Name.eq(name))
.filter(package::Column::Arch.eq(arch));
if let Some(version) = version {
query = query.filter(package::Column::Version.eq(version));
}
query.one(&self.conn).await
}
pub async fn delete_packages_with_arch(
&self,
repo_id: i32,
arch: &str,
) -> Result<DeleteResult> {
Package::delete_many()
.filter(package::Column::RepoId.eq(repo_id))
.filter(package::Column::Arch.eq(arch))
.exec(&self.conn)
.await
}
pub async fn insert_package(
&self,
repo_id: i32,
pkg: crate::repo::package::Package,
) -> Result<()> {
let info = pkg.info;
let model = package::ActiveModel {
id: NotSet,
repo_id: Set(repo_id),
base: Set(info.base),
name: Set(info.name),
version: Set(info.version),
arch: Set(info.arch),
size: Set(info.size),
c_size: Set(info.csize),
description: Set(info.description),
url: Set(info.url),
build_date: Set(info.build_date),
packager: Set(info.packager),
pgp_sig: Set(info.pgpsig),
pgp_sig_size: Set(info.pgpsigsize),
sha256_sum: Set(info.sha256sum),
};
let pkg_entry = model.insert(&self.conn).await?;
// Insert all the related tables
PackageLicense::insert_many(info.licenses.iter().map(|s| package_license::ActiveModel {
package_id: Set(pkg_entry.id),
value: Set(s.to_string()),
}))
.on_empty_do_nothing()
.exec(self)
.await?;
Ok(conn)
}
DbConfig::Postgres {
host,
port,
db,
user,
password,
schema,
max_connections,
} => {
let mut url = format!("postgres://{}:{}@{}:{}/{}", user, password, host, port, db);
PackageGroup::insert_many(info.groups.iter().map(|s| package_group::ActiveModel {
package_id: Set(pkg_entry.id),
value: Set(s.to_string()),
}))
.on_empty_do_nothing()
.exec(self)
.await?;
if !schema.is_empty() {
url = format!("{url}?currentSchema={schema}");
PackageReplaces::insert_many(info.replaces.iter().map(|s| package_replaces::ActiveModel {
package_id: Set(pkg_entry.id),
value: Set(s.to_string()),
}))
.on_empty_do_nothing()
.exec(self)
.await?;
PackageConflicts::insert_many(info.conflicts.iter().map(|s| {
package_conflicts::ActiveModel {
package_id: Set(pkg_entry.id),
value: Set(s.to_string()),
}
}))
.on_empty_do_nothing()
.exec(self)
.await?;
PackageProvides::insert_many(info.provides.iter().map(|s| package_provides::ActiveModel {
package_id: Set(pkg_entry.id),
value: Set(s.to_string()),
}))
.on_empty_do_nothing()
.exec(self)
.await?;
PackageFile::insert_many(pkg.files.iter().map(|s| package_file::ActiveModel {
package_id: Set(pkg_entry.id),
value: Set(s.display().to_string()),
}))
.on_empty_do_nothing()
.exec(self)
.await?;
let deps = info
.depends
.iter()
.map(|d| (PackageDepend::Depend, d))
.chain(info.makedepends.iter().map(|d| (PackageDepend::Make, d)))
.chain(info.checkdepends.iter().map(|d| (PackageDepend::Check, d)))
.chain(info.optdepends.iter().map(|d| (PackageDepend::Opt, d)))
.map(|(t, s)| package_depends::ActiveModel {
package_id: Set(pkg_entry.id),
r#type: Set(t),
value: Set(s.to_string()),
});
PackageDepends::insert_many(deps)
.on_empty_do_nothing()
.exec(self)
.await?;
Ok(())
}
let options = sea_orm::ConnectOptions::new(url)
.max_connections(*max_connections)
.to_owned();
Ok(Database::connect(options).await?)
pub async fn full_package(&self, id: i32) -> Result<Option<FullPackage>> {
if let Some(entry) = self.package(id).await? {
let licenses = entry
.find_related(PackageLicense)
.all(self)
.await?
.into_iter()
.map(|e| e.value)
.collect();
let groups = entry
.find_related(PackageGroup)
.all(self)
.await?
.into_iter()
.map(|e| e.value)
.collect();
let replaces = entry
.find_related(PackageReplaces)
.all(self)
.await?
.into_iter()
.map(|e| e.value)
.collect();
let provides = entry
.find_related(PackageProvides)
.all(self)
.await?
.into_iter()
.map(|e| e.value)
.collect();
let depends = entry
.find_related(PackageDepends)
.all(self)
.await?
.into_iter()
.map(|e| (e.r#type, e.value))
.collect();
let files = entry
.find_related(PackageFile)
.all(self)
.await?
.into_iter()
.map(|e| e.value)
.collect();
Ok(Some(FullPackage {
entry,
licenses,
groups,
replaces,
provides,
depends,
files,
}))
} else {
Ok(None)
}
}
}

View File

@ -1,46 +0,0 @@
use crate::db::*;
use sea_orm::{sea_query::IntoCondition, *};
#[derive(Deserialize)]
pub struct Filter {
name: Option<String>,
}
impl IntoCondition for Filter {
fn into_condition(self) -> Condition {
Condition::all().add_option(
self.name
.map(|name| distro::Column::Name.like(format!("%{}%", name))),
)
}
}
pub async fn page(
conn: &DbConn,
per_page: u64,
page: u64,
filter: Filter,
) -> Result<Vec<distro::Model>> {
let paginator = Distro::find()
.filter(filter)
.order_by_asc(distro::Column::Id)
.paginate(conn, per_page);
let repos = paginator.fetch_page(page).await?;
Ok(repos)
}
pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<distro::Model>> {
distro::Entity::find_by_id(id).one(conn).await
}
pub async fn insert(conn: &DbConn, name: &str, description: Option<&str>) -> Result<distro::Model> {
let model = distro::ActiveModel {
id: NotSet,
name: Set(String::from(name)),
description: Set(description.map(String::from)),
};
model.insert(conn).await
}

View File

@ -1,3 +0,0 @@
pub mod distro;
pub mod package;
pub mod repo;

View File

@ -1,376 +0,0 @@
use crate::db::{self, *};
use sea_orm::{sea_query::IntoCondition, *};
use sea_query::{Alias, Expr, Query, SelectStatement};
use serde::Deserialize;
/// How many fields may be inserted at once into the database.
const PACKAGE_INSERT_LIMIT: usize = 1000;
#[derive(Deserialize)]
pub struct Filter {
repo: Option<i32>,
arch: Option<String>,
name: Option<String>,
}
impl IntoCondition for Filter {
fn into_condition(self) -> Condition {
Condition::all()
.add_option(self.repo.map(|repo| package::Column::RepoId.eq(repo)))
.add_option(self.arch.map(|arch| package::Column::Arch.eq(arch)))
.add_option(self.name.map(|name| package::Column::Name.contains(name)))
}
}
pub async fn page(
conn: &DbConn,
per_page: u64,
page: u64,
filter: Filter,
) -> crate::Result<Vec<package::Model>> {
let p2 = Alias::new("p2");
let query = Query::select()
.columns(db::package::Column::iter().map(|c| (db::package::Entity, c)))
.from(db::package::Entity)
.join_subquery(
JoinType::InnerJoin,
max_pkg_ids_query(true),
p2.clone(),
Expr::col((db::package::Entity, db::package::Column::Id))
.eq(Expr::col((p2.clone(), Alias::new("max_id")))),
)
.cond_where(filter)
.order_by((db::package::Entity, db::package::Column::Id), Order::Asc)
.to_owned();
let builder = conn.get_database_backend();
let sql = builder.build(&query);
Ok(db::Package::find()
.from_raw_sql(sql)
.paginate(conn, per_page)
.fetch_page(page)
.await?)
}
pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<package::Model>> {
package::Entity::find_by_id(id).one(conn).await
}
pub async fn by_fields(
conn: &DbConn,
repo_id: i32,
name: &str,
version: &str,
arch: &str,
compression: &str,
) -> Result<Option<package::Model>> {
let cond = Condition::all()
.add(package::Column::RepoId.eq(repo_id))
.add(package::Column::Name.eq(name))
.add(package::Column::Arch.eq(arch))
.add(package::Column::Version.eq(version))
.add(package::Column::Compression.eq(compression));
Package::find().filter(cond).one(conn).await
}
pub async fn delete_with_arch(conn: &DbConn, repo_id: i32, arch: &str) -> Result<DeleteResult> {
Package::delete_many()
.filter(package::Column::RepoId.eq(repo_id))
.filter(package::Column::Arch.eq(arch))
.exec(conn)
.await
}
pub async fn insert(
conn: &DbConn,
repo_id: i32,
pkg: crate::repo::package::Package,
) -> Result<package::Model> {
let info = pkg.info;
// Doing this manually is not the recommended way, but the generic error type of the
// transaction function didn't play well with my current error handling
let txn = conn.begin().await?;
let model = package::ActiveModel {
id: NotSet,
repo_id: Set(repo_id),
base: Set(info.base),
name: Set(info.name),
version: Set(info.version),
arch: Set(info.arch),
size: Set(info.size),
c_size: Set(info.csize),
description: Set(info.description),
url: Set(info.url),
build_date: Set(info.build_date),
packager: Set(info.packager),
pgp_sig: Set(info.pgpsig),
pgp_sig_size: Set(info.pgpsigsize),
sha256_sum: Set(info.sha256sum),
compression: Set(pkg.compression.extension().unwrap().to_string()),
state: Set(PackageState::PendingCommit),
};
let pkg_entry = model.insert(&txn).await?;
// Insert all the related tables
PackageLicense::insert_many(info.licenses.iter().map(|s| package_license::ActiveModel {
package_id: Set(pkg_entry.id),
name: Set(s.to_string()),
}))
.on_empty_do_nothing()
.exec(&txn)
.await?;
PackageGroup::insert_many(info.groups.iter().map(|s| package_group::ActiveModel {
package_id: Set(pkg_entry.id),
name: Set(s.to_string()),
}))
.on_empty_do_nothing()
.exec(&txn)
.await?;
let related = info
.conflicts
.iter()
.map(|s| (PackageRelatedEnum::Conflicts, s))
.chain(
info.replaces
.iter()
.map(|s| (PackageRelatedEnum::Replaces, s)),
)
.chain(
info.provides
.iter()
.map(|s| (PackageRelatedEnum::Provides, s)),
)
.chain(info.depends.iter().map(|s| (PackageRelatedEnum::Depend, s)))
.chain(
info.makedepends
.iter()
.map(|s| (PackageRelatedEnum::Makedepend, s)),
)
.chain(
info.checkdepends
.iter()
.map(|s| (PackageRelatedEnum::Checkdepend, s)),
)
.chain(
info.optdepends
.iter()
.map(|s| (PackageRelatedEnum::Optdepend, s)),
);
let related = crate::util::Chunked::new(related, PACKAGE_INSERT_LIMIT);
for chunk in related {
PackageRelated::insert_many(
chunk
.into_iter()
.map(|(t, s)| package_related::ActiveModel {
package_id: Set(pkg_entry.id),
r#type: Set(t),
name: Set(s.to_string()),
}),
)
.on_empty_do_nothing()
.exec(&txn)
.await?;
}
let files = crate::util::Chunked::new(pkg.files, PACKAGE_INSERT_LIMIT);
for chunk in files {
PackageFile::insert_many(chunk.into_iter().map(|s| package_file::ActiveModel {
package_id: Set(pkg_entry.id),
path: Set(s.display().to_string()),
}))
.on_empty_do_nothing()
.exec(&txn)
.await?;
}
txn.commit().await?;
Ok(pkg_entry)
}
pub async fn full(conn: &DbConn, id: i32) -> Result<Option<FullPackage>> {
if let Some(entry) = by_id(conn, id).await? {
let licenses: Vec<String> = entry
.find_related(PackageLicense)
.select_only()
.column(package_license::Column::Name)
.into_tuple()
.all(conn)
.await?;
let groups: Vec<String> = entry
.find_related(PackageGroup)
.select_only()
.column(package_group::Column::Name)
.into_tuple()
.all(conn)
.await?;
let related: Vec<(db::PackageRelatedEnum, String)> = entry
.find_related(PackageRelated)
.select_only()
.columns([package_related::Column::Type, package_related::Column::Name])
.into_tuple()
.all(conn)
.await?;
let files: Vec<String> = entry
.find_related(PackageFile)
.select_only()
.column(package_file::Column::Path)
.into_tuple()
.all(conn)
.await?;
Ok(Some(FullPackage {
entry,
licenses,
groups,
related,
files,
}))
} else {
Ok(None)
}
}
#[derive(FromQueryResult)]
pub struct PkgToRemove {
pub repo_id: i32,
pub id: i32,
}
fn max_pkg_ids_query(committed: bool) -> SelectStatement {
let mut query = Query::select()
.from(db::package::Entity)
.columns([
db::package::Column::RepoId,
db::package::Column::Arch,
db::package::Column::Name,
])
.expr_as(db::package::Column::Id.max(), Alias::new("max_id"))
.group_by_columns([
db::package::Column::RepoId,
db::package::Column::Arch,
db::package::Column::Name,
])
.to_owned();
if committed {
query.cond_where(db::package::Column::State.eq(db::PackageState::Committed));
}
query
}
/// Query that returns all packages that should be included in a sync for the given repository and
/// arch.
pub fn pkgs_to_sync(
conn: &DbConn,
repo: i32,
arch: &str,
) -> SelectorRaw<SelectModel<package::Model>> {
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
let query = Query::select()
.columns(db::package::Column::iter().map(|c| (p1.clone(), c)))
.from_as(db::package::Entity, p1.clone())
.join_subquery(
JoinType::InnerJoin,
max_pkg_ids_query(false),
p2.clone(),
Expr::col((p1.clone(), db::package::Column::Id))
.eq(Expr::col((p2.clone(), Alias::new("max_id")))),
)
.cond_where(
Condition::all()
.add(Expr::col((p1.clone(), db::package::Column::RepoId)).eq(repo))
.add(
Expr::col((p1.clone(), db::package::Column::Arch))
.is_in([arch, crate::ANY_ARCH]),
)
.add(
Expr::col((p1.clone(), db::package::Column::State))
.ne(db::PackageState::PendingDeletion),
),
)
.to_owned();
let builder = conn.get_database_backend();
let sql = builder.build(&query);
db::Package::find().from_raw_sql(sql)
}
fn stale_pkgs_query(include_repo: bool) -> SelectStatement {
let (p1, p2) = (Alias::new("p1"), Alias::new("p2"));
let mut query = Query::select()
.from_as(db::package::Entity, p1.clone())
.to_owned();
if include_repo {
query.columns([
(p1.clone(), db::package::Column::RepoId),
(p1.clone(), db::package::Column::Id),
]);
} else {
query.column((p1.clone(), db::package::Column::Id));
}
// We left join on the max pkgs query because a repository that has all its packages set to
// "pending deletion" doesn't show up in the query. These are also included with a where clause
// on the joined rows.
query
.join_subquery(
JoinType::LeftJoin,
max_pkg_ids_query(true),
p2.clone(),
Condition::all()
.add(
Expr::col((p1.clone(), db::package::Column::RepoId))
.eq(Expr::col((p2.clone(), db::package::Column::RepoId))),
)
.add(
Expr::col((p1.clone(), db::package::Column::Arch))
.eq(Expr::col((p2.clone(), db::package::Column::Arch))),
)
.add(
Expr::col((p1.clone(), db::package::Column::Name))
.eq(Expr::col((p2.clone(), db::package::Column::Name))),
),
)
.cond_where(
Condition::any()
.add(
Expr::col((p1.clone(), db::package::Column::Id))
.lt(Expr::col((p2.clone(), Alias::new("max_id")))),
)
.add(
Expr::col((p1.clone(), db::package::Column::State))
.eq(db::PackageState::PendingDeletion),
),
);
query
}
pub fn stale_pkgs(conn: &DbConn) -> SelectorRaw<SelectModel<PkgToRemove>> {
let query = stale_pkgs_query(true);
let builder = conn.get_database_backend();
let sql = builder.build(&query);
PkgToRemove::find_by_statement(sql)
}
pub async fn delete_stale_pkgs(conn: &DbConn, max_id: i32) -> crate::Result<()> {
Ok(db::Package::delete_many()
.filter(db::package::Column::Id.lte(max_id))
.filter(db::package::Column::Id.in_subquery(stale_pkgs_query(false)))
.exec(conn)
.await
.map(|_| ())?)
}

View File

@ -1,59 +0,0 @@
use crate::db::*;
use sea_orm::{sea_query::IntoCondition, *};
#[derive(Deserialize)]
pub struct Filter {
name: Option<String>,
}
impl IntoCondition for Filter {
fn into_condition(self) -> Condition {
Condition::all().add_option(
self.name
.map(|name| repo::Column::Name.like(format!("%{}%", name))),
)
}
}
pub async fn page(
conn: &DbConn,
per_page: u64,
page: u64,
filter: Filter,
) -> Result<Vec<repo::Model>> {
let paginator = Repo::find()
.filter(filter)
.order_by_asc(repo::Column::Id)
.paginate(conn, per_page);
let repos = paginator.fetch_page(page).await?;
Ok(repos)
}
pub async fn by_id(conn: &DbConn, id: i32) -> Result<Option<repo::Model>> {
repo::Entity::find_by_id(id).one(conn).await
}
pub async fn by_name(conn: &DbConn, name: &str) -> Result<Option<repo::Model>> {
Repo::find()
.filter(repo::Column::Name.eq(name))
.one(conn)
.await
}
pub async fn insert(
conn: &DbConn,
distro_id: i32,
name: &str,
description: Option<&str>,
) -> Result<repo::Model> {
let model = repo::ActiveModel {
id: NotSet,
distro_id: Set(distro_id),
name: Set(String::from(name)),
description: Set(description.map(String::from)),
};
model.insert(conn).await
}

View File

@ -1,9 +1,8 @@
use std::{error::Error, fmt, io};
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use std::error::Error;
use std::fmt;
use std::io;
pub type Result<T> = std::result::Result<T, ServerError>;
@ -13,9 +12,6 @@ pub enum ServerError {
Axum(axum::Error),
Db(sea_orm::DbErr),
Status(StatusCode),
Archive(libarchive::error::ArchiveError),
Figment(figment::Error),
Unit,
}
impl fmt::Display for ServerError {
@ -25,9 +21,6 @@ impl fmt::Display for ServerError {
ServerError::Axum(err) => write!(fmt, "{}", err),
ServerError::Status(status) => write!(fmt, "{}", status),
ServerError::Db(err) => write!(fmt, "{}", err),
ServerError::Archive(err) => write!(fmt, "{}", err),
ServerError::Figment(err) => write!(fmt, "{}", err),
ServerError::Unit => Ok(()),
}
}
}
@ -45,10 +38,7 @@ impl IntoResponse for ServerError {
ServerError::Db(sea_orm::DbErr::RecordNotFound(_)) => {
StatusCode::NOT_FOUND.into_response()
}
ServerError::Db(_)
| ServerError::Archive(_)
| ServerError::Figment(_)
| ServerError::Unit => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
ServerError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
}
}
}
@ -82,15 +72,3 @@ impl From<sea_orm::DbErr> for ServerError {
ServerError::Db(err)
}
}
impl From<libarchive::error::ArchiveError> for ServerError {
fn from(err: libarchive::error::ArchiveError) -> Self {
ServerError::Archive(err)
}
}
impl From<figment::Error> for ServerError {
fn from(err: figment::Error) -> Self {
ServerError::Figment(err)
}
}

View File

@ -1,100 +1,32 @@
mod api;
mod cli;
mod config;
pub mod db;
mod error;
mod repo;
mod util;
mod web;
pub use config::{Config, DbConfig, FsConfig};
pub use error::{Result, ServerError};
use std::{io, path::PathBuf};
use clap::Parser;
use sea_orm_migration::MigratorTrait;
use tokio::runtime;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
pub use error::{Result, ServerError};
use repo::RepoGroupManager;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
pub const ANY_ARCH: &str = "any";
pub const PKG_FILENAME_REGEX: &str = "^([a-z0-9@._+-]+)-((?:[0-9]+:)?[a-zA-Z0-9@._+]+-[0-9]+)-([a-zA-z0-9_]+).pkg.tar.([a-zA-Z0-9]+)$";
#[derive(Clone)]
pub struct Config {
data_dir: PathBuf,
repo_dir: PathBuf,
pkg_dir: PathBuf,
api_key: String,
}
#[derive(Clone)]
pub struct Global {
config: crate::config::Config,
repo: repo::Handle,
db: sea_orm::DbConn,
pkg_filename_re: regex::Regex,
config: Config,
repo_manager: Arc<RwLock<RepoGroupManager>>,
db: db::RieterDb,
}
fn main() -> crate::Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let handle = rt.handle();
#[tokio::main]
async fn main() -> crate::Result<()> {
let cli = cli::Cli::parse();
let global = setup(handle, cli.config_file)?;
handle.block_on(run(global))
}
fn setup(rt: &runtime::Handle, config_file: PathBuf) -> crate::Result<Global> {
let config: Config = Config::figment(config_file)
.extract()
.inspect_err(|e| tracing::error!("{}", e))?;
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(config.log_level.clone()))
.with(tracing_subscriber::fmt::layer())
.init();
tracing::info!("Connecting to database");
let db = rt.block_on(crate::db::connect(&config.db))?;
rt.block_on(crate::db::Migrator::up(&db, None))?;
let repo = match &config.fs {
FsConfig::Local { data_dir } => {
crate::repo::start(
data_dir.join("repos"),
db.clone(),
rt.clone(),
config.pkg_workers,
)?
//rt.block_on(crate::repo::RepoMgr::new(
// data_dir.join("repos"),
// db.clone(),
//))?
//RepoHandle::start(data_dir.join("repos"), db.clone(), config.pkg_workers, rt.clone())?
}
};
//let mgr = Arc::new(mgr);
//
//for _ in 0..config.pkg_workers {
// let clone = Arc::clone(&mgr);
//
// rt.spawn(async move { clone.pkg_parse_task().await });
//}
Ok(Global {
config: config.clone(),
repo,
db,
pkg_filename_re: regex::Regex::new(PKG_FILENAME_REGEX).unwrap(),
})
}
async fn run(global: Global) -> crate::Result<()> {
let domain: String = format!("{}:{}", &global.config.domain, global.config.port)
.parse()
.unwrap();
let listener = tokio::net::TcpListener::bind(domain).await?;
let app = web::router(global);
// run it with hyper on localhost:3000
Ok(axum::serve(listener, app.into_make_service())
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?)
cli.run().await
}

View File

@ -1,245 +0,0 @@
use super::{archive, package, Command, SharedState};
use crate::db;
use std::{
path::PathBuf,
sync::{atomic::Ordering, Arc},
};
use futures::StreamExt;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use sea_query::Expr;
use tokio::{runtime, sync::mpsc};
use uuid::Uuid;
/// The actor is responsible for mutating the repositories. They receive their commands their
/// messages and process these commands in both a synchronous and asynchronous way.
pub struct Actor {
rt: runtime::Handle,
state: Arc<SharedState>,
}
impl Actor {
pub fn new(rt: runtime::Handle, state: Arc<SharedState>) -> Self {
Self {
rt,
state: Arc::clone(&state),
}
}
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
std::array::from_fn(|_| {
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
self.state.repos_dir.join(uuid.to_string())
})
}
/// Run the main actor loop
pub fn run(self) {
while let Some(msg) = {
let mut rx = self.state.rx.lock().unwrap();
rx.blocking_recv()
} {
match msg {
Command::ParsePkg(repo, path) => {
let _ = self.parse_pkg(repo, path);
if self
.state
.repos
.blocking_read()
.get(&repo)
.map(|n| n.0.load(Ordering::SeqCst))
== Some(0)
{
let _ = self.sync_repo(repo);
let _ = self.clean();
}
}
Command::SyncRepo(repo) => {
let _ = self.sync_repo(repo);
}
Command::Clean => {
let _ = self.clean();
}
}
}
}
/// Parse a queued package for the given repository.
fn parse_pkg(&self, repo: i32, path: PathBuf) -> crate::Result<()> {
let pkg = package::Package::open(&path)?;
let pkg = self
.rt
.block_on(db::query::package::insert(&self.state.conn, repo, pkg))?;
let dest_path = self
.state
.repos_dir
.join(repo.to_string())
.join(pkg.id.to_string());
std::fs::rename(path, dest_path)?;
tracing::info!(
"Added '{}-{}-{}' to repository {}",
pkg.name,
pkg.version,
pkg.arch,
repo,
);
self.state.repos.blocking_read().get(&repo).inspect(|n| {
n.0.fetch_sub(1, Ordering::SeqCst);
});
Ok(())
}
fn sync_repo(&self, repo: i32) -> crate::Result<()> {
let repos = self.state.repos.blocking_read();
if let Some(_guard) = repos.get(&repo).map(|n| n.1.lock()) {
let archs: Vec<String> = self.rt.block_on(
db::Package::find()
.filter(db::package::Column::RepoId.eq(repo))
.select_only()
.column(db::package::Column::Arch)
.distinct()
.into_tuple()
.all(&self.state.conn),
)?;
for arch in archs {
self.generate_archives(repo, &arch)?;
}
}
Ok(())
}
fn generate_archives(&self, repo: i32, arch: &str) -> crate::Result<()> {
let [tmp_ar_db_path, tmp_ar_files_path] = self.random_file_paths();
let mut ars = archive::RepoArchivesWriter::new(
&tmp_ar_db_path,
&tmp_ar_files_path,
self.random_file_paths(),
&self.rt,
&self.state.conn,
)?;
let (tx, mut rx) = mpsc::channel(1);
let conn = self.state.conn.clone();
let query = db::query::package::pkgs_to_sync(&self.state.conn, repo, arch);
// sea_orm needs its connections to be dropped inside an async context, so we spawn a task
// that streams the responses to the synchronous context via message passing
self.rt.spawn(async move {
match query.stream(&conn).await {
Ok(mut stream) => {
while let Some(res) = stream.next().await {
let is_err = res.is_err();
let _ = tx.send(res).await;
if is_err {
return;
}
}
}
Err(err) => {
let _ = tx.send(Err(err)).await;
}
}
});
let mut committed_ids: Vec<i32> = Vec::new();
while let Some(pkg) = rx.blocking_recv().transpose()? {
committed_ids.push(pkg.id);
ars.append_pkg(&pkg)?;
}
ars.close()?;
// Move newly generated package archives to their correct place
let repo_dir = self.state.repos_dir.join(repo.to_string());
std::fs::rename(tmp_ar_db_path, repo_dir.join(format!("{}.db.tar.gz", arch)))?;
std::fs::rename(
tmp_ar_files_path,
repo_dir.join(format!("{}.files.tar.gz", arch)),
)?;
// Update the state for the newly committed packages
self.rt.block_on(
db::Package::update_many()
.col_expr(
db::package::Column::State,
Expr::value(db::PackageState::Committed),
)
.filter(db::package::Column::Id.is_in(committed_ids))
.exec(&self.state.conn),
)?;
tracing::info!("Package archives generated for repo {} ('{}')", repo, arch);
Ok(())
}
fn clean(&self) -> crate::Result<()> {
let (tx, mut rx) = mpsc::channel(1);
let conn = self.state.conn.clone();
let query = db::query::package::stale_pkgs(&self.state.conn);
// sea_orm needs its connections to be dropped inside an async context, so we spawn a task
// that streams the responses to the synchronous context via message passing
self.rt.spawn(async move {
match query.stream(&conn).await {
Ok(mut stream) => {
while let Some(res) = stream.next().await {
let is_err = res.is_err();
let _ = tx.send(res).await;
if is_err {
return;
}
}
}
Err(err) => {
let _ = tx.send(Err(err)).await;
}
}
});
// Ids are monotonically increasing, so the max id suffices to know which packages to
// remove later
let mut max_id = -1;
let mut removed_pkgs = 0;
while let Some(pkg) = rx.blocking_recv().transpose()? {
// Failing to remove the package file isn't the biggest problem
let _ = std::fs::remove_file(
self.state
.repos_dir
.join(pkg.repo_id.to_string())
.join(pkg.id.to_string()),
);
if pkg.id > max_id {
max_id = pkg.id;
}
removed_pkgs += 1;
}
if removed_pkgs > 0 {
self.rt.block_on(db::query::package::delete_stale_pkgs(
&self.state.conn,
max_id,
))?;
}
tracing::info!("Cleaned up {removed_pkgs} old package(s)");
Ok(())
}
}

View File

@ -1,224 +0,0 @@
use crate::db;
use std::{
io::Write,
path::{Path, PathBuf},
};
use futures::StreamExt;
use libarchive::{
write::{Builder, FileWriter, WriteEntry},
Entry, WriteFilter, WriteFormat,
};
use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
use tokio::{runtime, sync::mpsc};
pub struct RepoArchivesWriter {
ar_db: FileWriter,
ar_files: FileWriter,
rt: runtime::Handle,
conn: DbConn,
tmp_paths: [PathBuf; 2],
}
impl RepoArchivesWriter {
pub fn new(
ar_db_path: impl AsRef<Path>,
ar_files_path: impl AsRef<Path>,
tmp_paths: [impl AsRef<Path>; 2],
rt: &runtime::Handle,
conn: &sea_orm::DbConn,
) -> crate::Result<Self> {
let ar_db = Self::open_ar(ar_db_path)?;
let ar_files = Self::open_ar(ar_files_path)?;
Ok(Self {
ar_db,
ar_files,
rt: rt.clone(),
conn: conn.clone(),
tmp_paths: [
tmp_paths[0].as_ref().to_path_buf(),
tmp_paths[1].as_ref().to_path_buf(),
],
})
}
fn open_ar(path: impl AsRef<Path>) -> crate::Result<FileWriter> {
let mut builder = Builder::new();
builder.add_filter(WriteFilter::Gzip)?;
builder.set_format(WriteFormat::PaxRestricted)?;
Ok(builder.open_file(path)?)
}
fn append_entry(
ar: &mut FileWriter,
src_path: impl AsRef<Path>,
dest_path: impl AsRef<Path>,
) -> crate::Result<()> {
let metadata = std::fs::metadata(&src_path)?;
let file_size = metadata.len();
let mut ar_entry = WriteEntry::new();
ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
ar_entry.set_pathname(dest_path);
ar_entry.set_mode(0o100644);
ar_entry.set_size(file_size.try_into().unwrap());
Ok(ar.append_path(&mut ar_entry, src_path)?)
}
pub fn append_pkg(&mut self, pkg: &db::package::Model) -> crate::Result<()> {
self.write_desc(&self.tmp_paths[0], pkg)?;
self.write_files(&self.tmp_paths[1], pkg)?;
let full_name = format!("{}-{}", pkg.name, pkg.version);
let dest_desc_path = format!("{}/desc", full_name);
let dest_files_path = format!("{}/files", full_name);
Self::append_entry(&mut self.ar_db, &self.tmp_paths[0], &dest_desc_path)?;
Self::append_entry(&mut self.ar_files, &self.tmp_paths[0], &dest_desc_path)?;
Self::append_entry(&mut self.ar_files, &self.tmp_paths[1], &dest_files_path)?;
Ok(())
}
/// Generate a "files" archive entry for the package in the given path
fn write_files(&self, path: impl AsRef<Path>, pkg: &db::package::Model) -> crate::Result<()> {
let mut f = std::io::BufWriter::new(std::fs::File::create(path)?);
writeln!(f, "%FILES%")?;
let (tx, mut rx) = mpsc::channel(1);
let conn = self.conn.clone();
let query = pkg.find_related(db::PackageFile);
self.rt.spawn(async move {
match query.stream(&conn).await {
Ok(mut stream) => {
while let Some(res) = stream.next().await {
let is_err = res.is_err();
let _ = tx.send(res).await;
if is_err {
return;
}
}
}
Err(err) => {
let _ = tx.send(Err(err)).await;
}
}
});
while let Some(file) = rx.blocking_recv().transpose()? {
writeln!(f, "{}", file.path)?;
}
f.flush()?;
Ok(())
}
fn write_desc(&self, path: impl AsRef<Path>, pkg: &db::package::Model) -> crate::Result<()> {
let mut f = std::io::BufWriter::new(std::fs::File::create(path)?);
let filename = format!(
"{}-{}-{}.pkg.tar.{}",
pkg.name, pkg.version, pkg.arch, pkg.compression
);
writeln!(f, "%FILENAME%\n{}", filename)?;
let mut write_attr = |k: &str, v: &str| {
if !v.is_empty() {
writeln!(f, "\n%{}%\n{}", k, v)
} else {
Ok(())
}
};
write_attr("NAME", &pkg.name)?;
write_attr("BASE", &pkg.base)?;
write_attr("VERSION", &pkg.version)?;
if let Some(ref desc) = pkg.description {
write_attr("DESC", desc)?;
}
let groups: Vec<String> = self.rt.block_on(
pkg.find_related(db::PackageGroup)
.select_only()
.column(db::package_group::Column::Name)
.into_tuple()
.all(&self.conn),
)?;
write_attr("GROUPS", &groups.join("\n"))?;
write_attr("CSIZE", &pkg.c_size.to_string())?;
write_attr("ISIZE", &pkg.size.to_string())?;
write_attr("SHA256SUM", &pkg.sha256_sum)?;
if let Some(ref url) = pkg.url {
write_attr("URL", url)?;
}
let licenses: Vec<String> = self.rt.block_on(
pkg.find_related(db::PackageLicense)
.select_only()
.column(db::package_license::Column::Name)
.into_tuple()
.all(&self.conn),
)?;
write_attr("LICENSE", &licenses.join("\n"))?;
write_attr("ARCH", &pkg.arch)?;
// TODO build date
write_attr(
"BUILDDATE",
&pkg.build_date.and_utc().timestamp().to_string(),
)?;
if let Some(ref packager) = pkg.packager {
write_attr("PACKAGER", packager)?;
}
let related = [
("REPLACES", db::PackageRelatedEnum::Replaces),
("CONFLICTS", db::PackageRelatedEnum::Conflicts),
("PROVIDES", db::PackageRelatedEnum::Provides),
("DEPENDS", db::PackageRelatedEnum::Depend),
("OPTDEPENDS", db::PackageRelatedEnum::Optdepend),
("MAKEDEPENDS", db::PackageRelatedEnum::Makedepend),
("CHECKDEPENDS", db::PackageRelatedEnum::Checkdepend),
];
for (key, attr) in related.into_iter() {
let items: Vec<String> = self.rt.block_on(
pkg.find_related(db::PackageRelated)
.filter(db::package_related::Column::Type.eq(attr))
.select_only()
.column(db::package_related::Column::Name)
.into_tuple()
.all(&self.conn),
)?;
write_attr(key, &items.join("\n"))?;
}
f.flush()?;
Ok(())
}
pub fn close(&mut self) -> crate::Result<()> {
self.ar_db.close()?;
self.ar_files.close()?;
let _ = std::fs::remove_file(&self.tmp_paths[0]);
let _ = std::fs::remove_file(&self.tmp_paths[1]);
Ok(())
}
}

View File

@ -1,144 +0,0 @@
use super::{Command, SharedState};
use crate::db;
use std::{
path::PathBuf,
sync::{atomic::Ordering, Arc},
};
use sea_orm::{
ActiveModelTrait, ColumnTrait, Condition, EntityTrait, NotSet, QueryFilter, QuerySelect, Set,
};
use sea_query::Expr;
use uuid::Uuid;
#[derive(Clone)]
pub struct Handle {
state: Arc<SharedState>,
}
impl Handle {
pub fn new(state: &Arc<SharedState>) -> Self {
Self {
state: Arc::clone(state),
}
}
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
std::array::from_fn(|_| {
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
self.state.repos_dir.join(uuid.to_string())
})
}
pub async fn get_or_create_repo(&self, distro: &str, repo: &str) -> crate::Result<i32> {
let mut repos = self.state.repos.write().await;
let distro_id: Option<i32> = db::Distro::find()
.filter(db::distro::Column::Name.eq(distro))
.select_only()
.column(db::distro::Column::Id)
.into_tuple()
.one(&self.state.conn)
.await?;
let distro_id = if let Some(id) = distro_id {
id
} else {
let new_distro = db::distro::ActiveModel {
id: NotSet,
name: Set(distro.to_string()),
description: NotSet,
};
new_distro.insert(&self.state.conn).await?.id
};
let repo_id: Option<i32> = db::Repo::find()
.filter(db::repo::Column::DistroId.eq(distro_id))
.filter(db::repo::Column::Name.eq(repo))
.select_only()
.column(db::repo::Column::Id)
.into_tuple()
.one(&self.state.conn)
.await?;
let repo_id = if let Some(id) = repo_id {
id
} else {
let new_repo = db::repo::ActiveModel {
id: NotSet,
distro_id: Set(distro_id),
name: Set(repo.to_string()),
description: NotSet,
};
let id = new_repo.insert(&self.state.conn).await?.id;
tokio::fs::create_dir(self.state.repos_dir.join(id.to_string())).await?;
repos.insert(id, Default::default());
id
};
Ok(repo_id)
}
pub async fn get_repo(&self, distro: &str, repo: &str) -> crate::Result<Option<i32>> {
Ok(db::Repo::find()
.find_also_related(db::Distro)
.filter(
Condition::all()
.add(db::repo::Column::Name.eq(repo))
.add(db::distro::Column::Name.eq(distro)),
)
.one(&self.state.conn)
.await
.map(|res| res.map(|(repo, _)| repo.id))?)
}
pub async fn remove_repo(&self, repo: i32) -> crate::Result<()> {
self.state.repos.write().await.remove(&repo);
db::Repo::delete_by_id(repo).exec(&self.state.conn).await?;
let _ = tokio::fs::remove_dir_all(self.state.repos_dir.join(repo.to_string())).await;
Ok(())
}
/// Remove all packages in the repository that have a given arch. This method marks all
/// packages with the given architecture as "pending deletion", before performing a manual sync
/// & removal of stale packages.
pub async fn remove_repo_arch(&self, repo: i32, arch: &str) -> crate::Result<()> {
db::Package::update_many()
.col_expr(
db::package::Column::State,
Expr::value(db::PackageState::PendingDeletion),
)
.filter(
Condition::all()
.add(db::package::Column::RepoId.eq(repo))
.add(db::package::Column::Arch.eq(arch)),
)
.exec(&self.state.conn)
.await?;
self.queue_sync(repo).await;
self.queue_clean().await;
Ok(())
}
pub async fn queue_pkg(&self, repo: i32, path: PathBuf) {
self.state.tx.send(Command::ParsePkg(repo, path)).unwrap();
self.state.repos.read().await.get(&repo).inspect(|n| {
n.0.fetch_add(1, Ordering::SeqCst);
});
}
async fn queue_sync(&self, repo: i32) {
self.state.tx.send(Command::SyncRepo(repo)).unwrap();
}
async fn queue_clean(&self) {
self.state.tx.send(Command::Clean).unwrap();
}
}

View File

@ -0,0 +1,311 @@
use super::package::Package;
use libarchive::write::{Builder, WriteEntry};
use libarchive::{Entry, WriteFilter, WriteFormat};
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
pub const ANY_ARCH: &str = "any";
/// Overarching abstraction that orchestrates updating the repositories stored on the server
pub struct RepoGroupManager {
repo_dir: PathBuf,
pkg_dir: PathBuf,
}
fn parse_pkg_filename(file_name: &str) -> (String, &str, &str, &str) {
let name_parts = file_name.split('-').collect::<Vec<_>>();
let name = name_parts[..name_parts.len() - 3].join("-");
let version = name_parts[name_parts.len() - 3];
let release = name_parts[name_parts.len() - 2];
let (arch, _) = name_parts[name_parts.len() - 1].split_once('.').unwrap();
(name, version, release, arch)
}
impl RepoGroupManager {
pub fn new<P1: AsRef<Path>, P2: AsRef<Path>>(repo_dir: P1, pkg_dir: P2) -> Self {
RepoGroupManager {
repo_dir: repo_dir.as_ref().to_path_buf(),
pkg_dir: pkg_dir.as_ref().to_path_buf(),
}
}
pub fn sync(&mut self, repo: &str, arch: &str) -> io::Result<()> {
let subrepo_path = self.repo_dir.join(repo).join(arch);
let mut ar_db = Builder::new();
ar_db.add_filter(WriteFilter::Gzip)?;
ar_db.set_format(WriteFormat::PaxRestricted)?;
let mut ar_files = Builder::new();
ar_files.add_filter(WriteFilter::Gzip)?;
ar_files.set_format(WriteFormat::PaxRestricted)?;
let mut ar_db = ar_db.open_file(subrepo_path.join(format!("{}.db.tar.gz", repo)))?;
let mut ar_files =
ar_files.open_file(subrepo_path.join(format!("{}.files.tar.gz", repo)))?;
// All architectures should also include the "any" architecture, except for the "any"
// architecture itself.
let repo_any_dir = self.repo_dir.join(repo).join(ANY_ARCH);
let any_entries_iter = if arch != ANY_ARCH && repo_any_dir.try_exists()? {
Some(repo_any_dir.read_dir()?)
} else {
None
}
.into_iter()
.flatten();
for entry in subrepo_path.read_dir()?.chain(any_entries_iter) {
let entry = entry?;
if entry.file_type()?.is_dir() {
// The desc file needs to be added to both archives
let path_in_tar = PathBuf::from(entry.file_name()).join("desc");
let src_path = entry.path().join("desc");
let metadata = src_path.metadata()?;
let mut ar_entry = WriteEntry::new();
ar_entry.set_pathname(&path_in_tar);
// These small text files will definitely fit inside an i64
ar_entry.set_size(metadata.len().try_into().unwrap());
ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
ar_entry.set_mode(0o100644);
ar_db.append_path(&mut ar_entry, &src_path)?;
ar_files.append_path(&mut ar_entry, src_path)?;
// The files file is only required in the files database
let path_in_tar = PathBuf::from(entry.file_name()).join("files");
let src_path = entry.path().join("files");
let metadata = src_path.metadata()?;
let mut ar_entry = WriteEntry::new();
ar_entry.set_filetype(libarchive::archive::FileType::RegularFile);
ar_entry.set_pathname(&path_in_tar);
ar_entry.set_mode(0o100644);
// These small text files will definitely fit inside an i64
ar_entry.set_size(metadata.len().try_into().unwrap());
ar_files.append_path(&mut ar_entry, src_path)?;
}
}
ar_db.close()?;
ar_files.close()?;
Ok(())
}
/// Synchronize all present architectures' db archives in the given repository.
pub fn sync_all(&mut self, repo: &str) -> io::Result<()> {
for entry in self.repo_dir.join(repo).read_dir()? {
let entry = entry?;
if entry.file_type()?.is_dir() {
self.sync(repo, &entry.file_name().to_string_lossy())?;
}
}
Ok(())
}
pub fn add_pkg_from_path<P: AsRef<Path>>(
&mut self,
repo: &str,
path: P,
) -> io::Result<Package> {
let pkg = Package::open(&path)?;
self.add_pkg(repo, &pkg)?;
// After successfully adding the package, we move it to the packages directory
let dest_pkg_path = self
.pkg_dir
.join(repo)
.join(&pkg.info.arch)
.join(pkg.file_name());
fs::create_dir_all(dest_pkg_path.parent().unwrap())?;
fs::rename(&path, dest_pkg_path)?;
Ok(pkg)
}
/// Add a package to the given repo, returning to what architectures the package was added.
pub fn add_pkg(&mut self, repo: &str, pkg: &Package) -> io::Result<()> {
// TODO
// * if arch is "any", check if package doesn't already exist for other architecture
// * if arch isn't "any", check if package doesn't already exist for "any" architecture
// We first remove any existing version of the package
self.remove_pkg(repo, &pkg.info.arch, &pkg.info.name, false)?;
// Write the `desc` and `files` metadata files to disk
let metadata_dir = self
.repo_dir
.join(repo)
.join(&pkg.info.arch)
.join(format!("{}-{}", pkg.info.name, pkg.info.version));
fs::create_dir_all(&metadata_dir)?;
let mut desc_file = fs::File::create(metadata_dir.join("desc"))?;
pkg.write_desc(&mut desc_file)?;
let mut files_file = fs::File::create(metadata_dir.join("files"))?;
pkg.write_files(&mut files_file)?;
// If a package of type "any" is added, we need to update every existing database
if pkg.info.arch == ANY_ARCH {
self.sync_all(repo)?;
} else {
self.sync(repo, &pkg.info.arch)?;
}
Ok(())
}
pub fn remove_repo(&mut self, repo: &str) -> io::Result<bool> {
let repo_dir = self.repo_dir.join(repo);
if !repo_dir.exists() {
Ok(false)
} else {
fs::remove_dir_all(&repo_dir)?;
fs::remove_dir_all(self.pkg_dir.join(repo))?;
Ok(true)
}
}
pub fn remove_repo_arch(&mut self, repo: &str, arch: &str) -> io::Result<bool> {
let sub_path = PathBuf::from(repo).join(arch);
let repo_dir = self.repo_dir.join(&sub_path);
if !repo_dir.exists() {
return Ok(false);
}
fs::remove_dir_all(&repo_dir)?;
fs::remove_dir_all(self.pkg_dir.join(sub_path))?;
// Removing the "any" architecture updates all other repositories
if arch == ANY_ARCH {
self.sync_all(repo)?;
}
Ok(true)
}
pub fn remove_pkg(
&mut self,
repo: &str,
arch: &str,
pkg_name: &str,
sync: bool,
) -> io::Result<bool> {
let repo_arch_dir = self.repo_dir.join(repo).join(arch);
if !repo_arch_dir.exists() {
return Ok(false);
}
for entry in repo_arch_dir.read_dir()? {
let entry = entry?;
// Make sure we skip the archive files
if !entry.metadata()?.is_dir() {
continue;
}
let file_name = entry.file_name();
let file_name = file_name.to_string_lossy();
// The directory name should only contain the name of the package. The last two parts
// when splitting on a dash are the pkgver and pkgrel, so we trim those
let name_parts = file_name.split('-').collect::<Vec<_>>();
let name = name_parts[..name_parts.len() - 2].join("-");
if name == pkg_name {
fs::remove_dir_all(entry.path())?;
// Also remove the old package archive
let repo_arch_pkg_dir = self.pkg_dir.join(repo).join(arch);
repo_arch_pkg_dir.read_dir()?.try_for_each(|res| {
res.and_then(|entry: fs::DirEntry| {
let file_name = entry.file_name();
let file_name = file_name.to_string_lossy();
let (name, _, _, _) = parse_pkg_filename(&file_name);
if name == pkg_name {
fs::remove_file(entry.path())
} else {
Ok(())
}
})
})?;
if sync {
if arch == ANY_ARCH {
self.sync_all(repo)?;
} else {
self.sync(repo, arch)?;
}
}
return Ok(true);
}
}
Ok(false)
}
/// Wrapper around `remove_pkg` that accepts a path relative to the package directory to a
/// package archive.
pub fn remove_pkg_from_path<P: AsRef<Path>>(
&mut self,
path: P,
sync: bool,
) -> io::Result<Option<(String, String, String, String)>> {
let path = path.as_ref();
let components: Vec<_> = path.iter().collect();
if let [repo, _arch, file_name] = components[..] {
let full_path = self.pkg_dir.join(path);
if full_path.try_exists()? {
let file_name = file_name.to_string_lossy();
let (name, version, release, arch) = parse_pkg_filename(&file_name);
let metadata_dir_name = format!("{}-{}-{}", name, version, release);
// Remove package archive and entry in database
fs::remove_file(full_path)?;
fs::remove_dir_all(self.repo_dir.join(repo).join(arch).join(metadata_dir_name))?;
if sync {
if arch == ANY_ARCH {
self.sync_all(&repo.to_string_lossy())?;
} else {
self.sync(&repo.to_string_lossy(), arch)?;
}
}
Ok(Some((
name,
version.to_string(),
release.to_string(),
arch.to_string(),
)))
} else {
Ok(None)
}
} else {
Ok(None)
}
}
}

View File

@ -1,90 +1,256 @@
mod actor;
mod archive;
mod handle;
mod manager;
pub mod package;
pub use actor::Actor;
pub use handle::Handle;
pub use manager::RepoGroupManager;
use crate::db;
use std::path::PathBuf;
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{atomic::AtomicU32, Arc, Mutex},
use axum::body::Body;
use axum::extract::{BodyStream, Path, State};
use axum::http::Request;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{delete, post};
use axum::Router;
use futures::StreamExt;
use sea_orm::ModelTrait;
use std::sync::Arc;
use tokio::{fs, io::AsyncWriteExt};
use tower::util::ServiceExt;
use tower_http::services::{ServeDir, ServeFile};
use tower_http::validate_request::ValidateRequestHeaderLayer;
use uuid::Uuid;
const DB_FILE_EXTS: [&str; 4] = [".db", ".files", ".db.tar.gz", ".files.tar.gz"];
pub fn router(api_key: &str) -> Router<crate::Global> {
Router::new()
.route(
"/:repo",
post(post_package_archive)
.delete(delete_repo)
.route_layer(ValidateRequestHeaderLayer::bearer(api_key)),
)
.route(
"/:repo/:arch",
delete(delete_arch_repo).route_layer(ValidateRequestHeaderLayer::bearer(api_key)),
)
// Routes added after the layer do not get that layer applied, so the GET requests will not
// be authorized
.route(
"/:repo/:arch/:filename",
delete(delete_package)
.route_layer(ValidateRequestHeaderLayer::bearer(api_key))
.get(get_file),
)
}
/// Serve the package archive files and database archives. If files are requested for an
/// architecture that does not have any explicit packages, a repository containing only "any" files
/// is returned.
async fn get_file(
State(global): State<crate::Global>,
Path((repo, arch, mut file_name)): Path<(String, String, String)>,
req: Request<Body>,
) -> crate::Result<impl IntoResponse> {
let repo_dir = global.config.repo_dir.join(&repo).join(&arch);
let repo_exists = tokio::fs::try_exists(&repo_dir).await?;
let res = if DB_FILE_EXTS.iter().any(|ext| file_name.ends_with(ext)) {
// Append tar extension to ensure we find the file
if !file_name.ends_with(".tar.gz") {
file_name.push_str(".tar.gz");
};
use sea_orm::{DbConn, EntityTrait, QuerySelect};
use tokio::{
runtime,
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
RwLock,
},
if repo_exists {
ServeFile::new(repo_dir.join(file_name)).oneshot(req).await
} else {
let path = global
.config
.repo_dir
.join(repo)
.join(manager::ANY_ARCH)
.join(file_name);
ServeFile::new(path).oneshot(req).await
}
} else {
let any_file = global
.config
.pkg_dir
.join(repo)
.join(manager::ANY_ARCH)
.join(file_name);
if repo_exists {
ServeDir::new(global.config.pkg_dir)
.fallback(ServeFile::new(any_file))
.oneshot(req)
.await
} else {
ServeFile::new(any_file).oneshot(req).await
}
};
pub enum Command {
ParsePkg(i32, PathBuf),
SyncRepo(i32),
Clean,
Ok(res)
}
type RepoState = (AtomicU32, Arc<Mutex<()>>);
async fn post_package_archive(
State(global): State<crate::Global>,
Path(repo): Path<String>,
mut body: BodyStream,
) -> crate::Result<()> {
// We first stream the uploaded file to disk
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
let path = global.config.pkg_dir.join(uuid.to_string());
let mut f = fs::File::create(&path).await?;
pub struct SharedState {
pub repos_dir: PathBuf,
pub conn: DbConn,
pub rx: Mutex<UnboundedReceiver<Command>>,
pub tx: UnboundedSender<Command>,
pub repos: RwLock<HashMap<i32, RepoState>>,
while let Some(chunk) = body.next().await {
f.write_all(&chunk?).await?;
}
impl SharedState {
pub fn new(
repos_dir: impl AsRef<Path>,
conn: DbConn,
repos: HashMap<i32, (AtomicU32, Arc<Mutex<()>>)>,
) -> Self {
let (tx, rx) = unbounded_channel();
let clone = Arc::clone(&global.repo_manager);
let path_clone = path.clone();
let repo_clone = repo.clone();
let res = tokio::task::spawn_blocking(move || {
clone
.write()
.unwrap()
.add_pkg_from_path(&repo_clone, &path_clone)
})
.await?;
Self {
repos_dir: repos_dir.as_ref().to_path_buf(),
conn,
rx: Mutex::new(rx),
tx,
repos: RwLock::new(repos),
match res {
// Insert the newly added package into the database
Ok(pkg) => {
tracing::info!("Added '{}' to repository '{}'", pkg.file_name(), repo);
// Query the repo for its ID, or create it if it does not already exist
let res = global.db.repo_by_name(&repo).await?;
let repo_id = if let Some(repo_entity) = res {
repo_entity.id
} else {
global.db.insert_repo(&repo, None).await?.last_insert_id
};
// If the package already exists in the database, we remove it first
let res = global
.db
.package_by_fields(repo_id, &pkg.info.name, None, &pkg.info.arch)
.await?;
if let Some(entry) = res {
entry.delete(&global.db).await?;
}
global.db.insert_package(repo_id, pkg).await?;
Ok(())
}
// Remove the uploaded file and return the error
Err(err) => {
tokio::fs::remove_file(path).await?;
Err(err.into())
}
}
}
pub fn start(
repos_dir: impl AsRef<Path>,
conn: DbConn,
rt: runtime::Handle,
actors: u32,
) -> crate::Result<Handle> {
std::fs::create_dir_all(repos_dir.as_ref())?;
async fn delete_repo(
State(global): State<crate::Global>,
Path(repo): Path<String>,
) -> crate::Result<StatusCode> {
let clone = Arc::clone(&global.repo_manager);
let mut repos = HashMap::new();
let repo_ids: Vec<i32> = rt.block_on(
db::Repo::find()
.select_only()
.column(db::repo::Column::Id)
.into_tuple()
.all(&conn),
)?;
let repo_clone = repo.clone();
let repo_removed =
tokio::task::spawn_blocking(move || clone.write().unwrap().remove_repo(&repo_clone))
.await??;
for id in repo_ids {
repos.insert(id, Default::default());
if repo_removed {
let res = global.db.repo_by_name(&repo).await?;
if let Some(repo_entry) = res {
repo_entry.delete(&global.db).await?;
}
let state = Arc::new(SharedState::new(repos_dir, conn, repos));
tracing::info!("Removed repository '{}'", repo);
for _ in 0..actors {
let actor = Actor::new(rt.clone(), Arc::clone(&state));
std::thread::spawn(|| actor.run());
Ok(StatusCode::OK)
} else {
Ok(StatusCode::NOT_FOUND)
}
}
Ok(Handle::new(&state))
async fn delete_arch_repo(
State(global): State<crate::Global>,
Path((repo, arch)): Path<(String, String)>,
) -> crate::Result<StatusCode> {
let clone = Arc::clone(&global.repo_manager);
let arch_clone = arch.clone();
let repo_clone = repo.clone();
let repo_removed = tokio::task::spawn_blocking(move || {
clone
.write()
.unwrap()
.remove_repo_arch(&repo_clone, &arch_clone)
})
.await??;
if repo_removed {
let res = global.db.repo_by_name(&repo).await?;
if let Some(repo_entry) = res {
global
.db
.delete_packages_with_arch(repo_entry.id, &arch)
.await?;
}
tracing::info!("Removed architecture '{}' from repository '{}'", arch, repo);
Ok(StatusCode::OK)
} else {
Ok(StatusCode::NOT_FOUND)
}
}
async fn delete_package(
State(global): State<crate::Global>,
Path((repo, arch, file_name)): Path<(String, String, String)>,
) -> crate::Result<StatusCode> {
let clone = Arc::clone(&global.repo_manager);
let path = PathBuf::from(&repo).join(arch).join(&file_name);
let res = tokio::task::spawn_blocking(move || {
clone.write().unwrap().remove_pkg_from_path(path, true)
})
.await??;
if let Some((name, version, release, arch)) = res {
let res = global.db.repo_by_name(&repo).await?;
if let Some(repo_entry) = res {
let res = global
.db
.package_by_fields(
repo_entry.id,
&name,
Some(&format!("{}-{}", version, release)),
&arch,
)
.await?;
if let Some(entry) = res {
entry.delete(&global.db).await?;
}
}
tracing::info!("Removed '{}' from repository '{}'", file_name, repo);
Ok(StatusCode::OK)
} else {
Ok(StatusCode::NOT_FOUND)
}
}

View File

@ -1,17 +1,15 @@
use chrono::NaiveDateTime;
use libarchive::read::{Archive, Builder};
use libarchive::{Entry, ReadFilter};
use sea_orm::ActiveValue::Set;
use std::fmt;
use std::fs;
use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use crate::db::entities::package;
use std::{
fmt, fs,
io::{self, BufRead, BufReader, Read},
path::{Path, PathBuf},
};
use chrono::NaiveDateTime;
use libarchive::{
read::{Archive, Builder},
Entry, ReadFilter,
};
use sea_orm::ActiveValue::Set;
const IGNORED_FILES: [&str; 5] = [".BUILDINFO", ".INSTALL", ".MTREE", ".PKGINFO", ".CHANGELOG"];
#[derive(Debug, Clone)]
pub struct Package {
@ -48,18 +46,18 @@ pub struct PkgInfo {
}
#[derive(Debug, PartialEq, Eq)]
pub enum InvalidPkgInfoError {
Size,
BuildDate,
PgpSigSize,
pub enum ParsePkgInfoError {
InvalidSize,
InvalidBuildDate,
InvalidPgpSigSize,
}
impl fmt::Display for InvalidPkgInfoError {
impl fmt::Display for ParsePkgInfoError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
Self::Size => "invalid size",
Self::BuildDate => "invalid build date",
Self::PgpSigSize => "invalid pgp sig size",
Self::InvalidSize => "invalid size",
Self::InvalidBuildDate => "invalid build date",
Self::InvalidPgpSigSize => "invalid pgp sig size",
};
write!(f, "{}", s)
@ -67,7 +65,7 @@ impl fmt::Display for InvalidPkgInfoError {
}
impl PkgInfo {
pub fn extend<S: AsRef<str>>(&mut self, line: S) -> Result<(), InvalidPkgInfoError> {
pub fn extend<S: AsRef<str>>(&mut self, line: S) -> Result<(), ParsePkgInfoError> {
let line = line.as_ref();
if !line.starts_with('#') {
@ -77,21 +75,26 @@ impl PkgInfo {
"pkgbase" => self.base = value.to_string(),
"pkgver" => self.version = value.to_string(),
"pkgdesc" => self.description = Some(value.to_string()),
"size" => self.size = value.parse().map_err(|_| InvalidPkgInfoError::Size)?,
"size" => {
self.size = value.parse().map_err(|_| ParsePkgInfoError::InvalidSize)?
}
"url" => self.url = Some(value.to_string()),
"arch" => self.arch = value.to_string(),
"builddate" => {
let seconds: i64 =
value.parse().map_err(|_| InvalidPkgInfoError::BuildDate)?;
self.build_date = chrono::DateTime::from_timestamp_millis(seconds * 1000)
.ok_or(InvalidPkgInfoError::BuildDate)?
.naive_utc();
let seconds: i64 = value
.parse()
.map_err(|_| ParsePkgInfoError::InvalidBuildDate)?;
self.build_date = NaiveDateTime::from_timestamp_millis(seconds * 1000)
.ok_or(ParsePkgInfoError::InvalidBuildDate)?
}
"packager" => self.packager = Some(value.to_string()),
"pgpsig" => self.pgpsig = Some(value.to_string()),
"pgpsigsize" => {
self.pgpsigsize =
Some(value.parse().map_err(|_| InvalidPkgInfoError::PgpSigSize)?)
self.pgpsigsize = Some(
value
.parse()
.map_err(|_| ParsePkgInfoError::InvalidPgpSigSize)?,
)
}
"group" => self.groups.push(value.to_string()),
"license" => self.licenses.push(value.to_string()),
@ -151,9 +154,11 @@ impl Package {
let entry = entry?;
let path_name = entry.pathname();
if !path_name.starts_with('.') {
if !IGNORED_FILES.iter().any(|p| p == &path_name) {
files.push(PathBuf::from(path_name));
} else if path_name == ".PKGINFO" {
}
if path_name == ".PKGINFO" {
info = Some(PkgInfo::parse(entry)?);
}
}
@ -188,11 +193,79 @@ impl Package {
// This unwrap should be safe, because we only allow passing through compressions with
// known file extensions
format!(
"{}.pkg.tar.{}",
"{}.pkg.tar{}",
self.full_name(),
self.compression.extension().unwrap()
)
}
/// Write the formatted desc file to the provided writer
pub fn write_desc<W: Write>(&self, w: &mut W) -> io::Result<()> {
// We write a lot of small strings to the writer, so wrapping it in a BufWriter is
// beneficial
let mut w = BufWriter::new(w);
let info = &self.info;
writeln!(w, "%FILENAME%\n{}", self.file_name())?;
let mut write = |key: &str, value: &str| {
if !value.is_empty() {
writeln!(w, "\n%{}%\n{}", key, value)
} else {
Ok(())
}
};
write("NAME", &info.name)?;
write("BASE", &info.base)?;
write("VERSION", &info.version)?;
if let Some(ref description) = info.description {
write("DESC", description)?;
}
write("GROUPS", &info.groups.join("\n"))?;
write("CSIZE", &info.csize.to_string())?;
write("ISIZE", &info.size.to_string())?;
write("SHA256SUM", &info.sha256sum)?;
if let Some(ref url) = info.url {
write("URL", url)?;
}
write("LICENSE", &info.licenses.join("\n"))?;
write("ARCH", &info.arch)?;
write("BUILDDATE", &info.build_date.timestamp().to_string())?;
if let Some(ref packager) = info.packager {
write("PACKAGER", packager)?;
}
write("REPLACES", &info.replaces.join("\n"))?;
write("CONFLICTS", &info.conflicts.join("\n"))?;
write("PROVIDES", &info.provides.join("\n"))?;
write("DEPENDS", &info.depends.join("\n"))?;
write("OPTDEPENDS", &info.optdepends.join("\n"))?;
write("MAKEDEPENDS", &info.makedepends.join("\n"))?;
write("CHECKDEPENDS", &info.checkdepends.join("\n"))?;
Ok(())
}
pub fn write_files<W: Write>(&self, w: &mut W) -> io::Result<()> {
// We write a lot of small strings to the writer, so wrapping it in a BufWriter is
// beneficial
let mut w = BufWriter::new(w);
writeln!(w, "%FILES%")?;
for file in &self.files {
writeln!(w, "{}", file.to_string_lossy())?;
}
Ok(())
}
}
impl From<Package> for package::ActiveModel {

View File

@ -1,23 +0,0 @@
pub struct Chunked<I> {
iter: I,
chunk_size: usize,
}
impl<I: Iterator> Chunked<I> {
pub fn new<T: IntoIterator<IntoIter = I>>(into: T, chunk_size: usize) -> Self {
Self {
iter: into.into_iter(),
chunk_size,
}
}
}
// https://users.rust-lang.org/t/how-to-breakup-an-iterator-into-chunks/87915/5
impl<I: Iterator> Iterator for Chunked<I> {
type Item = Vec<I::Item>;
fn next(&mut self) -> Option<Self::Item> {
Some(self.iter.by_ref().take(self.chunk_size).collect())
.filter(|chunk: &Vec<_>| !chunk.is_empty())
}
}

View File

@ -1,39 +0,0 @@
use serde::{Deserialize, Serialize};
#[derive(Deserialize)]
pub struct Query {
#[serde(default = "default_page")]
pub page: u64,
#[serde(default = "default_per_page")]
pub per_page: u64,
}
fn default_page() -> u64 {
1
}
fn default_per_page() -> u64 {
25
}
#[derive(Serialize)]
pub struct PaginatedResponse<T>
where
T: for<'de> Serialize,
{
pub page: u64,
pub per_page: u64,
pub count: usize,
pub items: Vec<T>,
}
impl Query {
pub fn res<T: for<'de> Serialize>(self, items: Vec<T>) -> PaginatedResponse<T> {
PaginatedResponse {
page: self.page,
per_page: self.per_page,
count: items.len(),
items,
}
}
}

View File

@ -1,13 +0,0 @@
mod api;
mod repo;
use axum::Router;
use tower_http::trace::TraceLayer;
pub fn router(global: crate::Global) -> Router {
Router::new()
.nest("/api", api::router())
.merge(repo::router(&global.config.api_key))
.with_state(global)
.layer(TraceLayer::new_for_http())
}

View File

@ -1,126 +0,0 @@
use crate::{db, FsConfig};
use axum::{
body::Body,
extract::{Path, State},
http::{Request, StatusCode},
response::IntoResponse,
routing::{delete, get, post},
Router,
};
use futures::TryStreamExt;
use tokio_util::io::StreamReader;
use tower::util::ServiceExt;
use tower_http::{services::ServeFile, validate_request::ValidateRequestHeaderLayer};
pub fn router(api_key: &str) -> Router<crate::Global> {
Router::new()
.route(
"/:distro/:repo",
post(post_package_archive)
.delete(delete_repo)
.route_layer(ValidateRequestHeaderLayer::bearer(api_key)),
)
.route(
"/:distro/:repo/:arch",
delete(delete_arch_repo).route_layer(ValidateRequestHeaderLayer::bearer(api_key)),
)
// Routes added after the layer do not get that layer applied, so the GET requests will not
// be authorized
.route("/:distro/:repo/:arch/:filename", get(get_file))
}
/// Serve the package archive files and database archives. If files are requested for an
/// architecture that does not have any explicit packages, a repository containing only "any" files
/// is returned.
async fn get_file(
State(global): State<crate::Global>,
Path((distro, repo, arch, file_name)): Path<(String, String, String, String)>,
req: Request<Body>,
) -> crate::Result<impl IntoResponse> {
if let Some(repo_id) = global.repo.get_repo(&distro, &repo).await? {
let file_name =
if file_name == format!("{}.db", repo) || file_name == format!("{}.db.tar.gz", repo) {
format!("{}.db.tar.gz", arch)
} else if file_name == format!("{}.files", repo)
|| file_name == format!("{}.files.tar.gz", repo)
{
format!("{}.files.tar.gz", arch)
} else if let Some(m) = global.pkg_filename_re.captures(&file_name) {
// SAFETY: these unwraps cannot fail if the RegEx matched successfully
db::query::package::by_fields(
&global.db,
repo_id,
m.get(1).unwrap().as_str(),
m.get(2).unwrap().as_str(),
m.get(3).unwrap().as_str(),
m.get(4).unwrap().as_str(),
)
.await?
.ok_or(StatusCode::NOT_FOUND)?
.id
.to_string()
} else {
return Err(StatusCode::NOT_FOUND.into());
};
match global.config.fs {
FsConfig::Local { data_dir } => {
let path = data_dir
.join("repos")
.join(repo_id.to_string())
.join(file_name);
Ok(ServeFile::new(path).oneshot(req).await)
}
}
} else {
Err(StatusCode::NOT_FOUND.into())
}
}
async fn post_package_archive(
State(global): State<crate::Global>,
Path((distro, repo)): Path<(String, String)>,
body: Body,
) -> crate::Result<StatusCode> {
let repo_id = global.repo.get_or_create_repo(&distro, &repo).await?;
let [tmp_path] = global.repo.random_file_paths();
let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
tokio::io::copy(&mut body, &mut tmp_file).await?;
global.repo.queue_pkg(repo_id, tmp_path).await;
Ok(StatusCode::ACCEPTED)
}
async fn delete_repo(
State(global): State<crate::Global>,
Path((distro, repo)): Path<(String, String)>,
) -> crate::Result<StatusCode> {
if let Some(repo) = global.repo.get_repo(&distro, &repo).await? {
global.repo.remove_repo(repo).await?;
tracing::info!("Removed repository {repo}");
Ok(StatusCode::OK)
} else {
Ok(StatusCode::NOT_FOUND)
}
}
async fn delete_arch_repo(
State(global): State<crate::Global>,
Path((distro, repo, arch)): Path<(String, String, String)>,
) -> crate::Result<StatusCode> {
if let Some(repo) = global.repo.get_repo(&distro, &repo).await? {
global.repo.remove_repo_arch(repo, &arch).await?;
tracing::info!("Removed architecture '{arch}' from repository {repo}");
Ok(StatusCode::OK)
} else {
Ok(StatusCode::NOT_FOUND)
}
}