feat: chunk large database inserts
parent
d375df0ff4
commit
e3b0f4f0a1
|
@ -4,6 +4,9 @@ use sea_orm::{sea_query::IntoCondition, *};
|
||||||
use sea_query::{Alias, Expr, Query, SelectStatement};
|
use sea_query::{Alias, Expr, Query, SelectStatement};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
/// Maximum number of rows sent to the database in a single bulk insert statement.
|
||||||
|
const PACKAGE_INSERT_LIMIT: usize = 1000;
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
pub struct Filter {
|
pub struct Filter {
|
||||||
repo: Option<i32>,
|
repo: Option<i32>,
|
||||||
|
@ -160,23 +163,34 @@ pub async fn insert(
|
||||||
.iter()
|
.iter()
|
||||||
.map(|s| (PackageRelatedEnum::Optdepend, s)),
|
.map(|s| (PackageRelatedEnum::Optdepend, s)),
|
||||||
);
|
);
|
||||||
|
let related = crate::util::Chunked::new(related, PACKAGE_INSERT_LIMIT);
|
||||||
|
|
||||||
PackageRelated::insert_many(related.map(|(t, s)| package_related::ActiveModel {
|
for chunk in related {
|
||||||
|
PackageRelated::insert_many(
|
||||||
|
chunk
|
||||||
|
.into_iter()
|
||||||
|
.map(|(t, s)| package_related::ActiveModel {
|
||||||
package_id: Set(pkg_entry.id),
|
package_id: Set(pkg_entry.id),
|
||||||
r#type: Set(t),
|
r#type: Set(t),
|
||||||
name: Set(s.to_string()),
|
name: Set(s.to_string()),
|
||||||
}))
|
}),
|
||||||
|
)
|
||||||
.on_empty_do_nothing()
|
.on_empty_do_nothing()
|
||||||
.exec(&txn)
|
.exec(&txn)
|
||||||
.await?;
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
PackageFile::insert_many(pkg.files.iter().map(|s| package_file::ActiveModel {
|
let files = crate::util::Chunked::new(pkg.files, PACKAGE_INSERT_LIMIT);
|
||||||
|
|
||||||
|
for chunk in files {
|
||||||
|
PackageFile::insert_many(chunk.into_iter().map(|s| package_file::ActiveModel {
|
||||||
package_id: Set(pkg_entry.id),
|
package_id: Set(pkg_entry.id),
|
||||||
path: Set(s.display().to_string()),
|
path: Set(s.display().to_string()),
|
||||||
}))
|
}))
|
||||||
.on_empty_do_nothing()
|
.on_empty_do_nothing()
|
||||||
.exec(&txn)
|
.exec(&txn)
|
||||||
.await?;
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
txn.commit().await?;
|
txn.commit().await?;
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ mod config;
|
||||||
pub mod db;
|
pub mod db;
|
||||||
mod error;
|
mod error;
|
||||||
mod repo;
|
mod repo;
|
||||||
|
mod util;
|
||||||
mod web;
|
mod web;
|
||||||
|
|
||||||
pub use config::{Config, DbConfig, FsConfig};
|
pub use config::{Config, DbConfig, FsConfig};
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
/// Iterator adaptor that groups the items of an inner iterator into `Vec`s of
/// at most `chunk_size` elements.
///
/// Every yielded chunk holds exactly `chunk_size` items, except possibly the
/// last one, which holds whatever remained. An empty chunk is never yielded.
#[derive(Debug, Clone)]
pub struct Chunked<I> {
    /// The underlying iterator the chunks are drawn from.
    iter: I,
    /// Upper bound on the length of each yielded chunk; always non-zero.
    chunk_size: usize,
}

impl<I: Iterator> Chunked<I> {
    /// Wraps `into` so that it is consumed in chunks of at most `chunk_size` items.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is zero. A zero-sized chunk can never make
    /// progress: the adaptor would report exhaustion immediately and silently
    /// discard every item of `into`.
    pub fn new<T: IntoIterator<IntoIter = I>>(into: T, chunk_size: usize) -> Self {
        assert!(chunk_size != 0, "chunk_size must be non-zero");

        Self {
            iter: into.into_iter(),
            chunk_size,
        }
    }
}

// https://users.rust-lang.org/t/how-to-breakup-an-iterator-into-chunks/87915/5
impl<I: Iterator> Iterator for Chunked<I> {
    type Item = Vec<I::Item>;

    /// Pulls up to `chunk_size` items from the inner iterator and returns them
    /// as one chunk, or `None` once the inner iterator yields nothing more.
    fn next(&mut self) -> Option<Self::Item> {
        // `by_ref` lets `take` borrow the inner iterator instead of consuming
        // it, so the next call continues where this one stopped.
        let chunk: Vec<I::Item> = self.iter.by_ref().take(self.chunk_size).collect();

        if chunk.is_empty() {
            None
        } else {
            Some(chunk)
        }
    }

    /// Number of chunks still to come, derived from the inner iterator's hint
    /// by rounding the remaining item count up to whole chunks.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // `chunk_size` is non-zero (checked in `new`), so the division is safe.
        let size = self.chunk_size;
        let chunks_for = |items: usize| items / size + usize::from(items % size != 0);

        let (lower, upper) = self.iter.size_hint();
        (chunks_for(lower), upper.map(chunks_for))
    }
}
|
Loading…
Reference in New Issue