feat: atomically update db archives

concurrent-repos
Jef Roosens 2024-05-29 15:04:20 +02:00
parent 0b1c8b640f
commit ecc33f0153
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
4 changed files with 67 additions and 75 deletions


@ -81,7 +81,7 @@ impl Cli {
let config = Config {
data_dir: self.data_dir.clone(),
};
let repo_manager = MetaRepoMgr::new(&self.data_dir.join("repos"));
let repo_manager = MetaRepoMgr::new(&self.data_dir.join("repos"), db.clone());
let global = Global {
config,


@ -30,11 +30,13 @@ impl RepoArchiveWriter {
.unwrap()?;
Ok(Self {
// In practice, the mutex is only ever used by one thread at a time. It's simply here so
// we can use spawn_blocking without issues.
ar: Arc::new(Mutex::new(ar)),
})
}
/// Set the current entry to be a new "files" list
/// Add either a "desc" or "files" entry to the archive
pub async fn add_entry<P: AsRef<Path>>(
&self,
full_name: &str,
@ -73,15 +75,4 @@ impl RepoArchiveWriter {
.unwrap()?,
)
}
//
///// Append the given line to the currently active entry
//pub async fn write_line(&self, line: &str) -> io::Result<()> {
// let line = String::from(line);
// let (tx, rx) = oneshot::channel();
//
// self.tx.send(Message::AppendLine(tx, line)).await;
//
// rx.await.unwrap()
//}
}
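
The comment above notes that the archive handle is wrapped in an Arc<Mutex<..>> purely so it can be moved into spawn_blocking. A minimal sketch of that pattern, assuming a hypothetical BlockingWriter with an append method standing in for the blocking libarchive-backed writer:

// Sketch of the Arc<Mutex> + spawn_blocking pattern; `BlockingWriter` and
// `append` are hypothetical stand-ins for the blocking libarchive writer.
use std::io;
use std::sync::{Arc, Mutex};

struct BlockingWriter;

impl BlockingWriter {
    fn append(&mut self, _entry_name: &str) -> io::Result<()> {
        // Blocking archive I/O would happen here.
        Ok(())
    }
}

async fn add_entry_sketch(ar: Arc<Mutex<BlockingWriter>>, name: String) -> io::Result<()> {
    // Move the handle into a blocking task so the async executor isn't stalled;
    // the mutex is only ever locked by this one task at a time.
    tokio::task::spawn_blocking(move || ar.lock().unwrap().append(&name))
        .await
        .unwrap()
}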


@ -12,19 +12,21 @@ pub const ANY_ARCH: &'static str = "any";
pub struct MetaRepoMgr {
repo_dir: PathBuf,
conn: DbConn,
}
impl MetaRepoMgr {
pub fn new<P: AsRef<Path>>(repo_dir: P) -> Self {
pub fn new<P: AsRef<Path>>(repo_dir: P, conn: DbConn) -> Self {
MetaRepoMgr {
repo_dir: repo_dir.as_ref().to_path_buf(),
conn,
}
}
/// Generate archive databases for all known architectures in the repository, including the
/// "any" architecture.
pub async fn generate_archives_all(&self, conn: &DbConn, repo: &str) -> Result<()> {
let repo = crate::db::query::repo::by_name(conn, repo).await?;
pub async fn generate_archives_all(&self, repo: &str) -> Result<()> {
let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
if repo.is_none() {
return Ok(());
@ -38,19 +40,19 @@ impl MetaRepoMgr {
.column(crate::db::package::Column::Arch)
.distinct()
.into_tuple::<String>()
.stream(conn)
.stream(&self.conn)
.await?;
while let Some(arch) = archs.next().await.transpose()? {
self.generate_archives(conn, &repo.name, &arch).await?;
self.generate_archives(&repo.name, &arch).await?;
}
Ok(())
}
/// Generate the archive databases for the given repository and architecture.
pub async fn generate_archives(&self, conn: &DbConn, repo: &str, arch: &str) -> Result<()> {
let repo = crate::db::query::repo::by_name(conn, repo).await?;
pub async fn generate_archives(&self, repo: &str, arch: &str) -> Result<()> {
let repo = crate::db::query::repo::by_name(&self.conn, repo).await?;
if repo.is_none() {
return Ok(());
@ -61,34 +63,25 @@ impl MetaRepoMgr {
let parent_dir = self.repo_dir.join(&repo.name);
tokio::fs::create_dir_all(&parent_dir).await?;
let ar_db =
archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.db.tar.gz", arch)))
.await?;
let ar_files =
archive::RepoArchiveWriter::open(parent_dir.join(format!("{}.files.tar.gz", arch)))
.await?;
let [tmp_ar_db_path, tmp_ar_files_path, files_tmp_file_path, desc_tmp_file_path] =
self.random_file_paths();
let ar_db = archive::RepoArchiveWriter::open(&tmp_ar_db_path).await?;
let ar_files = archive::RepoArchiveWriter::open(&tmp_ar_files_path).await?;
// Query all packages in the repo that have the given architecture or the "any"
// architecture
let mut pkgs = repo
.find_related(crate::db::Package)
.filter(db::package::Column::Arch.is_in([arch, ANY_ARCH]))
.stream(conn)
.stream(&self.conn)
.await?;
// Create two temp file paths to write our entries to
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
let files_tmp_file_path = self.repo_dir.join(uuid.to_string());
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
let desc_tmp_file_path = self.repo_dir.join(uuid.to_string());
while let Some(pkg) = pkgs.next().await.transpose()? {
let mut files_tmp_file = tokio::fs::File::create(&files_tmp_file_path).await?;
let mut desc_tmp_file = tokio::fs::File::create(&desc_tmp_file_path).await?;
package::write_files(conn, &mut files_tmp_file, &pkg).await?;
package::write_desc(conn, &mut desc_tmp_file, &pkg).await?;
package::write_files(&self.conn, &mut files_tmp_file, &pkg).await?;
package::write_desc(&self.conn, &mut desc_tmp_file, &pkg).await?;
let full_name = format!("{}-{}", pkg.name, pkg.version);
@ -107,6 +100,18 @@ impl MetaRepoMgr {
ar_db.close().await?;
ar_files.close().await?;
// Move the db archives to their respective places
tokio::fs::rename(
tmp_ar_db_path,
parent_dir.join(format!("{}.db.tar.gz", arch)),
)
.await?;
tokio::fs::rename(
tmp_ar_files_path,
parent_dir.join(format!("{}.files.tar.gz", arch)),
)
.await?;
// If this fails, there's no point in failing the whole function; additionally, if there
// were no packages in the repo, this fails anyway because the temp file doesn't exist
let _ = tokio::fs::remove_file(desc_tmp_file_path).await;
@ -116,12 +121,12 @@ impl MetaRepoMgr {
}
/// Remove the repo with the given name, if it existed
pub async fn remove_repo(&self, conn: &DbConn, repo: &str) -> Result<bool> {
let res = db::query::repo::by_name(conn, repo).await?;
pub async fn remove_repo(&self, repo: &str) -> Result<bool> {
let res = db::query::repo::by_name(&self.conn, repo).await?;
if let Some(repo_entry) = res {
// Remove repository from database
repo_entry.delete(conn).await?;
repo_entry.delete(&self.conn).await?;
// Remove files from file system
tokio::fs::remove_dir_all(self.repo_dir.join(repo)).await?;
@ -133,14 +138,14 @@ impl MetaRepoMgr {
}
/// Remove all packages with the given arch from the repository.
pub async fn remove_repo_arch(&self, conn: &DbConn, repo: &str, arch: &str) -> Result<bool> {
let repo = db::query::repo::by_name(conn, repo).await?;
pub async fn remove_repo_arch(&self, repo: &str, arch: &str) -> Result<bool> {
let repo = db::query::repo::by_name(&self.conn, repo).await?;
if let Some(repo) = repo {
let mut pkgs = repo
.find_related(db::Package)
.filter(db::package::Column::Arch.eq(arch))
.stream(conn)
.stream(&self.conn)
.await?;
while let Some(pkg) = pkgs.next().await.transpose()? {
@ -150,7 +155,7 @@ impl MetaRepoMgr {
.join(super::package::filename(&pkg));
tokio::fs::remove_file(path).await?;
pkg.delete(conn).await?;
pkg.delete(&self.conn).await?;
}
tokio::fs::remove_file(
@ -168,7 +173,7 @@ impl MetaRepoMgr {
// If we removed all "any" packages, we need to resync all databases
if arch == ANY_ARCH {
self.generate_archives_all(conn, &repo.name).await?;
self.generate_archives_all(&repo.name).await?;
}
Ok(true)
@ -177,17 +182,12 @@ impl MetaRepoMgr {
}
}
pub async fn remove_pkg(
&self,
conn: &DbConn,
repo: &str,
arch: &str,
name: &str,
) -> Result<bool> {
let repo = db::query::repo::by_name(conn, repo).await?;
pub async fn remove_pkg(&self, repo: &str, arch: &str, name: &str) -> Result<bool> {
let repo = db::query::repo::by_name(&self.conn, repo).await?;
if let Some(repo) = repo {
let pkg = db::query::package::by_fields(conn, repo.id, arch, name, None, None).await?;
let pkg =
db::query::package::by_fields(&self.conn, repo.id, arch, name, None, None).await?;
if let Some(pkg) = pkg {
// Remove package from database & file system
@ -197,12 +197,12 @@ impl MetaRepoMgr {
.join(super::package::filename(&pkg)),
)
.await?;
pkg.delete(conn).await?;
pkg.delete(&self.conn).await?;
if arch == ANY_ARCH {
self.generate_archives_all(conn, &repo.name).await?;
self.generate_archives_all(&repo.name).await?;
} else {
self.generate_archives(conn, &repo.name, arch).await?;
self.generate_archives(&repo.name, arch).await?;
}
Ok(true)
@ -216,13 +216,11 @@ impl MetaRepoMgr {
pub async fn add_pkg_from_reader<R: AsyncRead + std::marker::Unpin>(
&self,
conn: &DbConn,
reader: &mut R,
repo: &str,
) -> crate::Result<(String, String, String)> {
// Copy file contents to temporary path so libarchive can work with it
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
let path = self.repo_dir.join(uuid.to_string());
let [path] = self.random_file_paths();
let mut temp_file = tokio::fs::File::create(&path).await?;
tokio::io::copy(reader, &mut temp_file).await?;
@ -234,17 +232,17 @@ impl MetaRepoMgr {
.unwrap()?;
// Query the repo for its ID, or create it if it does not already exist
let res = db::query::repo::by_name(conn, &repo).await?;
let res = db::query::repo::by_name(&self.conn, &repo).await?;
let repo_id = if let Some(repo_entity) = res {
repo_entity.id
} else {
db::query::repo::insert(conn, repo, None).await?.id
db::query::repo::insert(&self.conn, repo, None).await?.id
};
// If the package already exists in the database, we remove it first
let res = db::query::package::by_fields(
conn,
&self.conn,
repo_id,
&pkg.info.arch,
&pkg.info.name,
@ -254,7 +252,7 @@ impl MetaRepoMgr {
.await?;
if let Some(entry) = res {
entry.delete(conn).await?;
entry.delete(&self.conn).await?;
}
let dest_pkg_path = self.repo_dir.join(repo).join(pkg.file_name());
@ -263,7 +261,7 @@ impl MetaRepoMgr {
let name = pkg.info.name.clone();
let version = pkg.info.version.clone();
let arch = pkg.info.arch.clone();
db::query::package::insert(conn, repo_id, pkg).await?;
db::query::package::insert(&self.conn, repo_id, pkg).await?;
// Move the package to its final resting place
tokio::fs::create_dir_all(dest_pkg_path.parent().unwrap()).await?;
@ -271,11 +269,19 @@ impl MetaRepoMgr {
// Synchronize archive databases
if arch == ANY_ARCH {
self.generate_archives_all(conn, repo).await?;
self.generate_archives_all(repo).await?;
} else {
self.generate_archives(conn, repo, &arch).await?;
self.generate_archives(repo, &arch).await?;
}
Ok((name, version, arch))
}
/// Generate paths to unique files that can be used as temporary files
pub fn random_file_paths<const C: usize>(&self) -> [PathBuf; C] {
std::array::from_fn(|_| {
let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
self.repo_dir.join(uuid.to_string())
})
}
}
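
Taken together, random_file_paths and the rename calls above implement a write-temp-then-rename flow: the .db.tar.gz and .files.tar.gz archives are built under random names and only moved over the served paths once they are complete. A minimal standalone sketch of that flow, with an illustrative archive name and contents (the real code builds the archive through RepoArchiveWriter rather than tokio::fs::write):

// Sketch of the write-temp-then-rename flow used by generate_archives; the
// archive name and contents here are illustrative only.
use std::path::Path;
use uuid::Uuid;

async fn replace_archive_sketch(repo_dir: &Path, contents: &[u8]) -> std::io::Result<()> {
    // Build the new archive under a random, unique name first.
    let uuid: uuid::fmt::Simple = Uuid::new_v4().into();
    let tmp_path = repo_dir.join(uuid.to_string());
    tokio::fs::write(&tmp_path, contents).await?;

    // Renaming over the final path replaces the archive in a single step (atomic
    // on the same POSIX filesystem), so a client downloading the database never
    // sees a half-written file.
    tokio::fs::rename(&tmp_path, repo_dir.join("x86_64.db.tar.gz")).await?;

    Ok(())
}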


@ -73,7 +73,7 @@ async fn post_package_archive(
.repo_manager
.write()
.await
.add_pkg_from_reader(&global.db, &mut body, &repo)
.add_pkg_from_reader(&mut body, &repo)
.await?;
tracing::info!(
@ -91,12 +91,7 @@ async fn delete_repo(
State(global): State<crate::Global>,
Path(repo): Path<String>,
) -> crate::Result<StatusCode> {
let repo_removed = global
.repo_manager
.write()
.await
.remove_repo(&global.db, &repo)
.await?;
let repo_removed = global.repo_manager.write().await.remove_repo(&repo).await?;
if repo_removed {
tracing::info!("Removed repository '{}'", repo);
@ -115,7 +110,7 @@ async fn delete_arch_repo(
.repo_manager
.write()
.await
.remove_repo_arch(&global.db, &repo, &arch)
.remove_repo_arch(&repo, &arch)
.await?;
if repo_removed {
@ -135,7 +130,7 @@ async fn delete_package(
.repo_manager
.write()
.await
.remove_pkg(&global.db, &repo, &arch, &pkg_name)
.remove_pkg(&repo, &arch, &pkg_name)
.await?;
if pkg_removed {
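
Because MetaRepoMgr now owns its DbConn, the handlers above only pass domain arguments. A minimal sketch of the resulting handler shape, reusing the Global and Result types from the hunks above; the status codes returned here are illustrative assumptions:

// Sketch of a simplified handler; `crate::Global` and `crate::Result` come from
// the hunks above, while the returned status codes are assumptions.
use axum::extract::{Path, State};
use axum::http::StatusCode;

async fn delete_repo_sketch(
    State(global): State<crate::Global>,
    Path(repo): Path<String>,
) -> crate::Result<StatusCode> {
    // No more `&global.db` argument: the manager uses its own connection.
    let repo_removed = global.repo_manager.write().await.remove_repo(&repo).await?;

    Ok(if repo_removed {
        StatusCode::OK
    } else {
        StatusCode::NOT_FOUND
    })
}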