feat: implement change timestamp for subscriptions set
This commit is contained in:
parent
2f0fe08f4c
commit
6d439783b5
7 changed files with 243 additions and 7 deletions
|
|
@ -1,3 +1,5 @@
|
|||
use std::collections::HashSet;
|
||||
|
||||
use diesel::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
|
|
@ -10,6 +12,8 @@ pub struct Subscription {
|
|||
pub id: i64,
|
||||
pub device_id: i64,
|
||||
pub url: String,
|
||||
pub time_changed: i64,
|
||||
pub deleted: bool,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Insertable)]
|
||||
|
|
@ -18,6 +22,8 @@ pub struct Subscription {
|
|||
pub struct NewSubscription {
|
||||
pub device_id: i64,
|
||||
pub url: String,
|
||||
pub time_changed: i64,
|
||||
pub deleted: bool,
|
||||
}
|
||||
|
||||
impl Subscription {
|
||||
|
|
@ -37,15 +43,76 @@ impl Subscription {
|
|||
.get_results(&mut pool.get()?)?)
|
||||
}
|
||||
|
||||
pub fn update_for_device(pool: &DbPool, device_id: i64, urls: Vec<String>) -> DbResult<()> {
|
||||
pub fn set_for_device(
|
||||
pool: &DbPool,
|
||||
device_id: i64,
|
||||
urls: Vec<String>,
|
||||
timestamp: i64,
|
||||
) -> DbResult<()> {
|
||||
pool.get()?.transaction(|conn| {
|
||||
diesel::delete(subscriptions::table.filter(subscriptions::device_id.eq(device_id)))
|
||||
.execute(conn)?;
|
||||
// https://github.com/diesel-rs/diesel/discussions/2826
|
||||
// SQLite doesn't support default on conflict set values, so we can't handle this using
|
||||
// on conflict. Therefore, we instead calculate which URLs should be inserted and which
|
||||
// updated, so we avoid conflicts.
|
||||
let urls: HashSet<String> = urls.into_iter().collect();
|
||||
let urls_in_db: HashSet<String> = subscriptions::table
|
||||
.select(subscriptions::url)
|
||||
.filter(subscriptions::device_id.eq(device_id))
|
||||
.get_results(&mut pool.get()?)?
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
// URLs originally in the database that are no longer in the list
|
||||
let urls_to_delete = urls_in_db.difference(&urls);
|
||||
|
||||
// URLs not in the database that are in the new list
|
||||
let urls_to_insert = urls.difference(&urls_in_db);
|
||||
|
||||
// URLs that are in both the database and the new list. For these, those marked as
|
||||
// "deleted" in the database are updated so they're no longer deleted, with their
|
||||
// timestamp updated.
|
||||
let urls_to_update = urls.intersection(&urls_in_db);
|
||||
|
||||
// Mark the URLs to delete as properly deleted
|
||||
diesel::update(
|
||||
subscriptions::table.filter(
|
||||
subscriptions::device_id
|
||||
.eq(device_id)
|
||||
.and(subscriptions::url.eq_any(urls_to_delete)),
|
||||
),
|
||||
)
|
||||
.set((
|
||||
subscriptions::deleted.eq(true),
|
||||
subscriptions::time_changed.eq(timestamp),
|
||||
))
|
||||
.execute(conn)?;
|
||||
|
||||
// Update the existing deleted URLs that are reinserted as no longer deleted
|
||||
diesel::update(
|
||||
subscriptions::table.filter(
|
||||
subscriptions::device_id
|
||||
.eq(device_id)
|
||||
.and(subscriptions::url.eq_any(urls_to_update))
|
||||
.and(subscriptions::deleted.eq(true)),
|
||||
),
|
||||
)
|
||||
.set((
|
||||
subscriptions::deleted.eq(false),
|
||||
subscriptions::time_changed.eq(timestamp),
|
||||
))
|
||||
.execute(conn)?;
|
||||
|
||||
// Insert the new values into the database
|
||||
diesel::insert_into(subscriptions::table)
|
||||
.values(
|
||||
urls.into_iter()
|
||||
.map(|url| NewSubscription { device_id, url })
|
||||
urls_to_insert
|
||||
.into_iter()
|
||||
.map(|url| NewSubscription {
|
||||
device_id,
|
||||
url: url.to_string(),
|
||||
deleted: false,
|
||||
time_changed: timestamp,
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.execute(conn)?;
|
||||
|
|
|
|||
|
|
@ -23,6 +23,8 @@ diesel::table! {
|
|||
id -> BigInt,
|
||||
device_id -> BigInt,
|
||||
url -> Text,
|
||||
time_changed -> BigInt,
|
||||
deleted -> Bool,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -85,8 +85,11 @@ pub async fn put_device_subscriptions(
|
|||
.insert(&ctx.pool)?
|
||||
};
|
||||
|
||||
Ok::<_, AppError>(db::Subscription::update_for_device(
|
||||
&ctx.pool, device.id, urls,
|
||||
Ok::<_, AppError>(db::Subscription::set_for_device(
|
||||
&ctx.pool,
|
||||
device.id,
|
||||
urls,
|
||||
chrono::Utc::now().timestamp(),
|
||||
)?)
|
||||
})
|
||||
.await
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue