feat: better concurrent uploads with limited parallel parsing

concurrent-repos
Jef Roosens 2024-06-09 23:04:45 +02:00
parent fa6de9b035
commit 97612e1af6
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
5 changed files with 96 additions and 49 deletions
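In short, the new upload flow: incoming archives are streamed to a temporary file right away, while the expensive package parsing is gated behind a tokio::sync::Semaphore so only a bounded number of parses run at once (this commit uses a single permit). Below is a minimal, self-contained sketch of that pattern; the parse_pkg stand-in, the permit count, and the file names are illustrative, not taken from this commit.

use std::{path::PathBuf, sync::Arc};
use tokio::sync::Semaphore;

// Illustrative stand-in for the blocking package parse (libarchive in the real code).
fn parse_pkg(path: PathBuf) -> std::io::Result<String> {
    Ok(format!("parsed {}", path.display()))
}

#[tokio::main]
async fn main() {
    // One permit mirrors pkg_sema in this commit: parses run strictly one at a
    // time, while any number of uploads can still be buffered to disk concurrently.
    let pkg_sema = Arc::new(Semaphore::new(1));

    let handles: Vec<_> = (0..4)
        .map(|i| {
            let sema = Arc::clone(&pkg_sema);
            tokio::spawn(async move {
                // Hold a permit for the duration of the parse.
                let _permit = sema.acquire().await.unwrap();
                // The parse itself blocks, so hand it to the blocking thread pool.
                tokio::task::spawn_blocking(move || {
                    parse_pkg(PathBuf::from(format!("pkg-{i}.tar.zst")))
                })
                .await
                .unwrap()
            })
        })
        .collect();

    for handle in handles {
        let parsed = handle.await.unwrap();
        println!("{parsed:?}");
    }
}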

Cargo.lock (generated)

@@ -1662,6 +1662,7 @@ dependencies = [
"libarchive",
"sea-orm",
"sea-orm-migration",
"sea-query",
"serde",
"sha256",
"tokio",

Cargo.toml

@@ -14,6 +14,7 @@ futures = "0.3.28"
http-body-util = "0.1.1"
libarchive = { path = "../libarchive" }
sea-orm-migration = "0.12.1"
+ sea-query = { version = "0.30.7", features = ["backend-postgres", "backend-sqlite"] }
serde = { version = "1.0.178", features = ["derive"] }
sha256 = "1.1.4"
tokio = { version = "1.29.1", features = ["full"] }


@@ -7,19 +7,27 @@ use std::{
};
use futures::StreamExt;
- use sea_orm::{ColumnTrait, DbConn, ModelTrait, QueryFilter, QuerySelect};
- use tokio::{io::AsyncRead, sync::Mutex};
+ use sea_orm::{
+     ActiveModelTrait, ColumnTrait, DbConn, EntityTrait, ModelTrait, QueryFilter, QuerySelect,
+     Related, RelationTrait, Set, TransactionTrait,
+ };
+ use sea_query::{Expr, Query};
+ use tokio::{
+     io::AsyncRead,
+     sync::{Mutex, Semaphore},
+ };
use uuid::Uuid;
pub const ANY_ARCH: &'static str = "any";
pub const REPOS_DIR: &'static str = "repos";
- pub const QUEUE_DIR: &'static str = "queue";
pub struct DistroMgr {
distro_dir: PathBuf,
distro_id: i32,
conn: DbConn,
- lock: Arc<Mutex<()>>,
+ repo_lock: Arc<Mutex<()>>,
+ sync_lock: Arc<Mutex<()>>,
+ pkg_sema: Arc<Semaphore>,
}
impl DistroMgr {
@@ -34,23 +42,21 @@ impl DistroMgr {
tokio::fs::create_dir(repos_dir).await?;
}
- let queue_dir = distro_dir.as_ref().join(QUEUE_DIR);
- if !tokio::fs::try_exists(&queue_dir).await? {
-     tokio::fs::create_dir(queue_dir).await?;
- }
Ok(Self {
distro_dir: distro_dir.as_ref().to_path_buf(),
distro_id,
conn,
- lock: Arc::new(Mutex::new(())),
+ repo_lock: Arc::new(Mutex::new(())),
+ sync_lock: Arc::new(Mutex::new(())),
+ pkg_sema: Arc::new(Semaphore::new(1)),
})
}
/// Generate archive databases for all known architectures in the repository, including the
/// "any" architecture.
- pub async fn generate_archives_all(&self, repo: &str) -> Result<()> {
+ pub async fn sync_repo(&self, repo: &str) -> Result<()> {
+ let _guard = self.sync_lock.lock().await;
let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
if repo.is_none() {
@@ -60,31 +66,23 @@ impl DistroMgr {
let repo = repo.unwrap();
let mut archs = repo
- .find_related(crate::db::Package)
+ .find_related(db::Package)
.select_only()
- .column(crate::db::package::Column::Arch)
+ .column(db::package::Column::Arch)
.distinct()
.into_tuple::<String>()
.stream(&self.conn)
.await?;
while let Some(arch) = archs.next().await.transpose()? {
- self.generate_archives(&repo.name, &arch).await?;
+ self.generate_archives(&repo, &arch).await?;
}
Ok(())
}
/// Generate the archive databases for the given repository and architecture.
- pub async fn generate_archives(&self, repo: &str, arch: &str) -> Result<()> {
-     let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
-     if repo.is_none() {
-         return Ok(());
-     }
-     let repo = repo.unwrap();
+ async fn generate_archives(&self, repo: &db::repo::Model, arch: &str) -> Result<()> {
let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
self.random_file_paths();
let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?;
@@ -95,10 +93,23 @@ impl DistroMgr {
let mut pkgs = repo
.find_related(crate::db::Package)
.filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
+ .filter(
+     db::package::Column::Id.in_subquery(
+         Query::select()
+             .expr(db::package::Column::Id.max())
+             .from(db::package::Entity)
+             .group_by_columns([db::package::Column::Arch, db::package::Column::Name])
+             .to_owned(),
+     ),
+ )
.stream(&self.conn)
.await?;
+ let mut committed_ids: Vec<i32> = Vec::new();
while let Some(pkg) = pkgs.next().await.transpose()? {
+ committed_ids.push(pkg.id);
let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
@@ -132,16 +143,34 @@ impl DistroMgr {
)
.await?;
+ // Only after everything has been written to disk successfully do we update the database.
+ // This ordering ensures any failure can be recovered from, as the database is our single
+ // source of truth.
+ db::Package::update_many()
+     .col_expr(
+         db::package::Column::State,
+         Expr::value(db::PackageState::Committed),
+     )
+     .filter(db::package::Column::Id.is_in(committed_ids))
+     .exec(&self.conn)
+     .await?;
+ // Removing the temp files is best-effort: a failure here shouldn't fail the function, and
+ // if the repo contained no packages, removal fails anyway because the files were never
+ // created.
+ let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
+ let _ = tokio::fs::remove_file(files_tmp_file_path).await;
tracing::info!(
"Package archives generated for '{}' ('{}')",
&repo.name,
arch
);
Ok(())
}
async fn get_or_create_repo(&self, repo: &str) -> Result<db::repo::Model> {
- let _guard = self.lock.lock().await;
+ let _guard = self.repo_lock.lock().await;
if let Some(repo) = db::query::repo::by_name(&self.conn, repo).await? {
Ok(repo)
@@ -205,7 +234,7 @@ impl DistroMgr {
// If we removed all "any" packages, we need to resync all databases
if arch == ANY_ARCH {
- self.generate_archives_all(&repo.name).await?;
+ self.sync_repo(&repo.name).await?;
}
Ok(true)
@@ -231,11 +260,11 @@ impl DistroMgr {
.await?;
pkg.delete(&self.conn).await?;
- if arch == ANY_ARCH {
-     self.generate_archives_all(&repo.name).await?;
- } else {
-     self.generate_archives(&repo.name, arch).await?;
- }
+ //if arch == ANY_ARCH {
+ //    self.sync_repo(&repo.name).await?;
+ //} else {
+ //    self.generate_archives(&repo.name, arch).await?;
+ //}
Ok(true)
} else {
@@ -246,26 +275,33 @@ impl DistroMgr {
}
}
- pub async fn add_pkg_from_reader<R: AsyncRead + std::marker::Unpin>(
+ pub async fn add_pkg_from_path<P: AsRef<Path>>(
&self,
- reader: &mut R,
+ path: P,
repo: &str,
) -> crate::Result<(String, String, String)> {
- let [tmp_file_path] = self.random_file_paths();
- let mut temp_file = tokio::fs::File::create(&tmp_file_path).await?;
+ let _guard = self.pkg_sema.acquire().await.unwrap();
- tokio::io::copy(reader, &mut temp_file).await?;
- let path_clone = tmp_file_path.clone();
+ let path_clone = path.as_ref().to_path_buf();
let pkg = tokio::task::spawn_blocking(move || package::Package::open(path_clone))
.await
.unwrap()?;
let repo = self.get_or_create_repo(repo).await?;
+ // TODO prevent database from being updated but file failing to move to repo dir?
let pkg = db::query::package::insert(&self.conn, repo.id, pkg).await?;
- let queue_path = self.distro_dir.join(QUEUE_DIR).join(pkg.id.to_string());
- tokio::fs::rename(tmp_file_path, queue_path).await?;
+ let queue_path = self.distro_dir.join(&repo.name).join(pkg.id.to_string());
+ tokio::fs::rename(path.as_ref(), queue_path).await?;
+ tracing::info!(
+     "Added '{}-{}' to repository '{}' ({})",
+     pkg.name,
+     pkg.version,
+     repo.name,
+     pkg.arch
+ );
+ // If the package already exists in the database, we remove it first
+ //let res = db::query::package::by_fields(
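For reference, the in_subquery filter added to generate_archives earlier in this file keeps only the row with the highest id per (arch, name) pair, i.e. the newest upload of each package. Below is a standalone sea-query sketch of just that subquery; the bare "package" table and column aliases are assumptions standing in for the SeaORM entity columns used in the real code.

use sea_query::{Alias, Expr, Query, SqliteQueryBuilder};

fn main() {
    // Newest row per (arch, name): group and take the maximum id.
    let subquery = Query::select()
        .expr(Expr::col(Alias::new("id")).max())
        .from(Alias::new("package"))
        .group_by_columns([Alias::new("arch"), Alias::new("name")])
        .to_owned();

    // Prints: SELECT MAX("id") FROM "package" GROUP BY "arch", "name"
    println!("{}", subquery.to_string(SqliteQueryBuilder));
}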


@@ -75,15 +75,24 @@ async fn post_package_archive(
) -> crate::Result<()> {
let mut body = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
let mgr = global.mgr.get_or_create_mgr(&distro).await?;
- let (name, version, arch) = mgr.add_pkg_from_reader(&mut body, &repo).await?;
+ let [tmp_path] = mgr.random_file_paths();
- tracing::info!(
-     "Added '{}-{}' to repository '{}' ({})",
-     name,
-     version,
-     repo,
-     arch
- );
+ let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
+ tokio::io::copy(&mut body, &mut tmp_file).await?;
+ tokio::spawn(async move { mgr.add_pkg_from_path(tmp_path, &repo).await });
+ //let (name, version, arch) = mgr.add_pkg_from_path(&mut body, &repo).await?;
+ //
+ //tracing::info!(
+ //    "Added '{}-{}' to repository '{}' ({})",
+ //    name,
+ //    version,
+ //    repo,
+ //    arch
+ //);
+ //tokio::spawn(async move { mgr.sync_repo(&repo).await });
Ok(())
}
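Note that the new handler's tokio::spawn discards the spawned task's Result, so a failed import is silent unless add_pkg_from_path logs it internally. A possible variant that surfaces the error at the call site, reusing the names from the diff above (the tracing::error! call is an assumed addition, not part of this commit):

tokio::spawn(async move {
    // Log failures from the detached import task instead of dropping them.
    if let Err(err) = mgr.add_pkg_from_path(tmp_path, &repo).await {
        tracing::error!("failed to add package to repo '{}': {:?}", repo, err);
    }
});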


@@ -323,7 +323,7 @@ pub async fn write_desc<W: AsyncWrite + std::marker::Unpin>(
pkg: &package::Model,
) -> crate::Result<()> {
writer
.write_all(format!("%FILENAME%\n{}\n", filename(pkg)).as_bytes())
.write_all(format!("%FILENAME%\n{}\n", pkg.id).as_bytes())
.await?;
write_attribute(writer, "NAME", &pkg.name).await?;