From cac80ca3e4fa0090d972a5ab715e66b7a8fcb58e Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Mon, 17 Mar 2025 09:25:20 +0100 Subject: [PATCH 1/4] feat: modify entire sync group when updating subscriptions --- src/db/repository/subscription.rs | 329 ++++++++++++++++++------------ 1 file changed, 195 insertions(+), 134 deletions(-) diff --git a/src/db/repository/subscription.rs b/src/db/repository/subscription.rs index 5448380..ffd347a 100644 --- a/src/db/repository/subscription.rs +++ b/src/db/repository/subscription.rs @@ -18,6 +18,154 @@ impl From<(String, i64)> for gpodder::Subscription { } } +fn set_subscriptions_for_single_device( + conn: &mut SqliteConnection, + device_id: i64, + urls: &HashSet, + time_changed: i64, +) -> QueryResult<()> { + // https://github.com/diesel-rs/diesel/discussions/2826 + // SQLite doesn't support default on conflict set values, so we can't handle this using + // on conflict. Therefore, we instead calculate which URLs should be inserted and which + // updated, so we avoid conflicts. + let urls_in_db: HashSet = device_subscriptions::table + .select(device_subscriptions::podcast_url) + .filter(device_subscriptions::device_id.eq(device_id)) + .get_results(conn)? + .into_iter() + .collect(); + + // URLs originally in the database that are no longer in the list + let urls_to_delete = urls_in_db.difference(&urls); + + // URLs not in the database that are in the new list + let urls_to_insert = urls.difference(&urls_in_db); + + // URLs that are in both the database and the new list. For these, those marked as + // "deleted" in the database are updated so they're no longer deleted, with their + // timestamp updated. + let urls_to_update = urls.intersection(&urls_in_db); + + // Mark the URLs to delete as properly deleted + diesel::update( + device_subscriptions::table.filter( + device_subscriptions::device_id + .eq(device_id) + .and(device_subscriptions::podcast_url.eq_any(urls_to_delete)), + ), + ) + .set(( + device_subscriptions::deleted.eq(true), + device_subscriptions::time_changed.eq(time_changed), + )) + .execute(conn)?; + + // Update the existing deleted URLs that are reinserted as no longer deleted + diesel::update( + device_subscriptions::table.filter( + device_subscriptions::device_id + .eq(device_id) + .and(device_subscriptions::podcast_url.eq_any(urls_to_update)) + .and(device_subscriptions::deleted.eq(true)), + ), + ) + .set(( + device_subscriptions::deleted.eq(false), + device_subscriptions::time_changed.eq(time_changed), + )) + .execute(conn)?; + + // Insert the new values into the database + diesel::insert_into(device_subscriptions::table) + .values( + urls_to_insert + .into_iter() + .map(|url| db::NewDeviceSubscription { + device_id, + podcast_url: url.to_string(), + deleted: false, + time_changed, + }) + .collect::>(), + ) + .execute(conn)?; + + Ok(()) +} + +fn update_subscriptions_for_single_device( + conn: &mut SqliteConnection, + device_id: i64, + add: &HashSet, + remove: &HashSet, + time_changed: i64, +) -> QueryResult<()> { + let urls_in_db: HashSet = device_subscriptions::table + .select(device_subscriptions::podcast_url) + .filter(device_subscriptions::device_id.eq(device_id)) + .get_results(conn)? + .into_iter() + .collect(); + + // Subscriptions to remove are those that were already in the database and are now part + // of the removed list. Subscriptions that were never added in the first place don't + // need to be marked as deleted. We also only update those that aren't already marked + // as deleted. + let urls_to_delete = remove.intersection(&urls_in_db); + + diesel::update( + device_subscriptions::table.filter( + device_subscriptions::device_id + .eq(device_id) + .and(device_subscriptions::podcast_url.eq_any(urls_to_delete)) + .and(device_subscriptions::deleted.eq(false)), + ), + ) + .set(( + device_subscriptions::deleted.eq(true), + device_subscriptions::time_changed.eq(time_changed), + )) + .execute(conn)?; + + // Subscriptions to update are those that are already in the database, but are also in + // the added list. Only those who were originally marked as deleted get updated. + let urls_to_update = add.intersection(&urls_in_db); + + diesel::update( + device_subscriptions::table.filter( + device_subscriptions::device_id + .eq(device_id) + .and(device_subscriptions::podcast_url.eq_any(urls_to_update)) + .and(device_subscriptions::deleted.eq(true)), + ), + ) + .set(( + device_subscriptions::deleted.eq(false), + device_subscriptions::time_changed.eq(time_changed), + )) + .execute(conn)?; + + // Subscriptions to insert are those that aren't in the database and are part of the + // added list + let urls_to_insert = add.difference(&urls_in_db); + + diesel::insert_into(device_subscriptions::table) + .values( + urls_to_insert + .into_iter() + .map(|url| db::NewDeviceSubscription { + device_id, + podcast_url: url.to_string(), + deleted: false, + time_changed, + }) + .collect::>(), + ) + .execute(conn)?; + + Ok(()) +} + impl gpodder::SubscriptionRepository for SqliteRepository { fn subscriptions_for_user( &self, @@ -67,84 +215,33 @@ impl gpodder::SubscriptionRepository for SqliteRepository { time_changed: chrono::DateTime, ) -> Result<(), gpodder::AuthErr> { // TODO use a better timestamp - let timestamp = time_changed.timestamp(); + let time_changed = time_changed.timestamp(); + let urls: HashSet = urls.into_iter().collect(); self.pool.get()?.transaction(|conn| { - let device = devices::table - .select(db::Device::as_select()) + let (device_id, group_id) = devices::table + .select((devices::id, devices::sync_group_id)) .filter( devices::user_id .eq(user.id) .and(devices::device_id.eq(device_id)), ) - .get_result(conn)?; + .get_result::<(i64, Option)>(conn)?; - // https://github.com/diesel-rs/diesel/discussions/2826 - // SQLite doesn't support default on conflict set values, so we can't handle this using - // on conflict. Therefore, we instead calculate which URLs should be inserted and which - // updated, so we avoid conflicts. - let urls: HashSet = urls.into_iter().collect(); - let urls_in_db: HashSet = device_subscriptions::table - .select(device_subscriptions::podcast_url) - .filter(device_subscriptions::device_id.eq(device.id)) - .get_results(conn)? - .into_iter() - .collect(); + // If the device is part of a sync group, we need to perform the update on every device + // in the group + if let Some(group_id) = group_id { + let device_ids: Vec = devices::table + .filter(devices::sync_group_id.eq(group_id)) + .select(devices::id) + .get_results(conn)?; - // URLs originally in the database that are no longer in the list - let urls_to_delete = urls_in_db.difference(&urls); - - // URLs not in the database that are in the new list - let urls_to_insert = urls.difference(&urls_in_db); - - // URLs that are in both the database and the new list. For these, those marked as - // "deleted" in the database are updated so they're no longer deleted, with their - // timestamp updated. - let urls_to_update = urls.intersection(&urls_in_db); - - // Mark the URLs to delete as properly deleted - diesel::update( - device_subscriptions::table.filter( - device_subscriptions::device_id - .eq(device.id) - .and(device_subscriptions::podcast_url.eq_any(urls_to_delete)), - ), - ) - .set(( - device_subscriptions::deleted.eq(true), - device_subscriptions::time_changed.eq(timestamp), - )) - .execute(conn)?; - - // Update the existing deleted URLs that are reinserted as no longer deleted - diesel::update( - device_subscriptions::table.filter( - device_subscriptions::device_id - .eq(device.id) - .and(device_subscriptions::podcast_url.eq_any(urls_to_update)) - .and(device_subscriptions::deleted.eq(true)), - ), - ) - .set(( - device_subscriptions::deleted.eq(false), - device_subscriptions::time_changed.eq(timestamp), - )) - .execute(conn)?; - - // Insert the new values into the database - diesel::insert_into(device_subscriptions::table) - .values( - urls_to_insert - .into_iter() - .map(|url| db::NewDeviceSubscription { - device_id: device.id, - podcast_url: url.to_string(), - deleted: false, - time_changed: timestamp, - }) - .collect::>(), - ) - .execute(conn)?; + for device_id in device_ids { + set_subscriptions_for_single_device(conn, device_id, &urls, time_changed)?; + } + } else { + set_subscriptions_for_single_device(conn, device_id, &urls, time_changed)?; + } Ok::<_, diesel::result::Error>(()) })?; @@ -161,7 +258,7 @@ impl gpodder::SubscriptionRepository for SqliteRepository { time_changed: chrono::DateTime, ) -> Result<(), gpodder::AuthErr> { // TODO use a better timestamp - let timestamp = time_changed.timestamp(); + let time_changed = time_changed.timestamp(); // TODO URLs that are in both the added and removed lists will currently get "re-added", // meaning their change timestamp will be updated even though they haven't really changed. @@ -169,77 +266,41 @@ impl gpodder::SubscriptionRepository for SqliteRepository { let remove: HashSet<_> = remove.into_iter().collect(); self.pool.get()?.transaction(|conn| { - let device = devices::table - .select(db::Device::as_select()) + let (device_id, group_id) = devices::table + .select((devices::id, devices::sync_group_id)) .filter( devices::user_id .eq(user.id) .and(devices::device_id.eq(device_id)), ) - .get_result(conn)?; + .get_result::<(i64, Option)>(conn)?; - let urls_in_db: HashSet = device_subscriptions::table - .select(device_subscriptions::podcast_url) - .filter(device_subscriptions::device_id.eq(device.id)) - .get_results(conn)? - .into_iter() - .collect(); + // If the device is part of a sync group, we need to perform the update on every device + // in the group + if let Some(group_id) = group_id { + let device_ids: Vec = devices::table + .filter(devices::sync_group_id.eq(group_id)) + .select(devices::id) + .get_results(conn)?; - // Subscriptions to remove are those that were already in the database and are now part - // of the removed list. Subscriptions that were never added in the first place don't - // need to be marked as deleted. We also only update those that aren't already marked - // as deleted. - let urls_to_delete = remove.intersection(&urls_in_db); - - diesel::update( - device_subscriptions::table.filter( - device_subscriptions::device_id - .eq(device.id) - .and(device_subscriptions::podcast_url.eq_any(urls_to_delete)) - .and(device_subscriptions::deleted.eq(false)), - ), - ) - .set(( - device_subscriptions::deleted.eq(true), - device_subscriptions::time_changed.eq(timestamp), - )) - .execute(conn)?; - - // Subscriptions to update are those that are already in the database, but are also in - // the added list. Only those who were originally marked as deleted get updated. - let urls_to_update = add.intersection(&urls_in_db); - - diesel::update( - device_subscriptions::table.filter( - device_subscriptions::device_id - .eq(device.id) - .and(device_subscriptions::podcast_url.eq_any(urls_to_update)) - .and(device_subscriptions::deleted.eq(true)), - ), - ) - .set(( - device_subscriptions::deleted.eq(false), - device_subscriptions::time_changed.eq(timestamp), - )) - .execute(conn)?; - - // Subscriptions to insert are those that aren't in the database and are part of the - // added list - let urls_to_insert = add.difference(&urls_in_db); - - diesel::insert_into(device_subscriptions::table) - .values( - urls_to_insert - .into_iter() - .map(|url| db::NewDeviceSubscription { - device_id: device.id, - podcast_url: url.to_string(), - deleted: false, - time_changed: timestamp, - }) - .collect::>(), - ) - .execute(conn)?; + for device_id in device_ids { + update_subscriptions_for_single_device( + conn, + device_id, + &add, + &remove, + time_changed, + )?; + } + } else { + update_subscriptions_for_single_device( + conn, + device_id, + &add, + &remove, + time_changed, + )?; + } Ok::<_, diesel::result::Error>(()) })?; From efe08771b1eeb355e43b00fb64d99167394459ed Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Mon, 17 Mar 2025 10:14:13 +0100 Subject: [PATCH 2/4] feat: implement synchronize sync group in sqlite repository --- src/db/repository/device.rs | 53 +++++++++++++++++++++++++++++-- src/db/repository/subscription.rs | 43 +++++++++++++++---------- 2 files changed, 78 insertions(+), 18 deletions(-) diff --git a/src/db/repository/device.rs b/src/db/repository/device.rs index 38d5d4c..cd1a4d9 100644 --- a/src/db/repository/device.rs +++ b/src/db/repository/device.rs @@ -1,5 +1,7 @@ +use std::collections::HashSet; + use chrono::{DateTime, Utc}; -use diesel::prelude::*; +use diesel::{alias, dsl::not, prelude::*}; use super::SqliteRepository; use crate::{ @@ -185,7 +187,54 @@ impl gpodder::DeviceRepository for SqliteRepository { group_id: i64, time_changed: DateTime, ) -> Result<(), gpodder::AuthErr> { - todo!() + let time_changed = time_changed.timestamp(); + let conn = &mut self.pool.get()?; + + conn.transaction(|conn| { + let device_ids: Vec = devices::table + .filter(devices::sync_group_id.eq(group_id)) + .select(devices::id) + .get_results(conn)?; + + // For each device in the group, we get the list of subscriptions not yet in its own + // non-deleted list, and add it to the database + for device_id in device_ids.iter().copied() { + let d1 = alias!(device_subscriptions as d1); + + let own_subscriptions = d1 + .filter( + d1.field(device_subscriptions::device_id) + .eq(device_id) + .and(d1.field(device_subscriptions::deleted).eq(false)), + ) + .select(d1.field(device_subscriptions::podcast_url)); + + let urls_to_add = device_subscriptions::table + .select(device_subscriptions::podcast_url) + .filter( + device_subscriptions::device_id + .eq_any(device_ids.iter()) + .and(device_subscriptions::deleted.eq(false)) + .and(not( + device_subscriptions::podcast_url.eq_any(own_subscriptions) + )), + ) + .distinct() + .load_iter(conn)? + .collect::, _>>()?; + + super::subscription::insert_subscriptions_for_single_device( + conn, + device_id, + urls_to_add.iter(), + time_changed, + )?; + } + + Ok::<_, diesel::result::Error>(()) + })?; + + Ok(()) } fn devices_by_sync_group( diff --git a/src/db/repository/subscription.rs b/src/db/repository/subscription.rs index ffd347a..447a27d 100644 --- a/src/db/repository/subscription.rs +++ b/src/db/repository/subscription.rs @@ -93,7 +93,32 @@ fn set_subscriptions_for_single_device( Ok(()) } -fn update_subscriptions_for_single_device( +/// Add the given URLs to the device's list of subscriptions, meaning the URLs are truly inserted +/// into the database. This function assumes the list of URLs is already free of URLs that already +/// have a corresponding row in the database, so no conflict checks are performed. +pub fn insert_subscriptions_for_single_device<'a>( + conn: &mut SqliteConnection, + device_id: i64, + urls: impl Iterator, + time_changed: i64, +) -> QueryResult<()> { + diesel::insert_into(device_subscriptions::table) + .values( + urls.into_iter() + .map(|url| db::NewDeviceSubscription { + device_id, + podcast_url: url.to_string(), + deleted: false, + time_changed, + }) + .collect::>(), + ) + .execute(conn)?; + + Ok(()) +} + +pub fn update_subscriptions_for_single_device( conn: &mut SqliteConnection, device_id: i64, add: &HashSet, @@ -149,19 +174,7 @@ fn update_subscriptions_for_single_device( // added list let urls_to_insert = add.difference(&urls_in_db); - diesel::insert_into(device_subscriptions::table) - .values( - urls_to_insert - .into_iter() - .map(|url| db::NewDeviceSubscription { - device_id, - podcast_url: url.to_string(), - deleted: false, - time_changed, - }) - .collect::>(), - ) - .execute(conn)?; + insert_subscriptions_for_single_device(conn, device_id, urls_to_insert, time_changed)?; Ok(()) } @@ -214,7 +227,6 @@ impl gpodder::SubscriptionRepository for SqliteRepository { urls: Vec, time_changed: chrono::DateTime, ) -> Result<(), gpodder::AuthErr> { - // TODO use a better timestamp let time_changed = time_changed.timestamp(); let urls: HashSet = urls.into_iter().collect(); @@ -257,7 +269,6 @@ impl gpodder::SubscriptionRepository for SqliteRepository { remove: Vec, time_changed: chrono::DateTime, ) -> Result<(), gpodder::AuthErr> { - // TODO use a better timestamp let time_changed = time_changed.timestamp(); // TODO URLs that are in both the added and removed lists will currently get "re-added", From f42c708cc6c58ec688b48f9ad8bd552fd40e5614 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Mon, 17 Mar 2025 10:27:30 +0100 Subject: [PATCH 3/4] feat: implemented sync status update function in repository --- src/gpodder/repository.rs | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs index 62f860e..d197cd5 100644 --- a/src/gpodder/repository.rs +++ b/src/gpodder/repository.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use argon2::{Argon2, PasswordHash, PasswordVerifier}; use chrono::{DateTime, TimeDelta, Utc}; @@ -100,7 +100,30 @@ impl GpodderRepository { sync: Vec>, unsync: Vec<&str>, ) -> Result<(), AuthErr> { - todo!("perform diff devices to sync and unsync") + let now = Utc::now(); + let mut unsync: HashSet<&str> = HashSet::from_iter(unsync); + let original_unsync = unsync.clone(); + + for group in sync { + // We want to remove devices that are provided in both a sync group and the unsync + // category, as this does not make sense to perform. We use the original unsync array, + // as a device could be provided multiple times in different sync groups. + let (remove, remaining): (Vec<&str>, Vec<&str>) = + group.into_iter().partition(|d| original_unsync.contains(d)); + + for device_id in remove { + unsync.remove(device_id); + } + + let group_id = self.store.merge_sync_groups(user, remaining)?; + self.store.synchronize_sync_group(group_id, now)?; + } + + // Finally we unsync the remaining devices + self.store + .remove_from_sync_group(user, unsync.into_iter().collect())?; + + Ok(()) } pub fn devices_by_sync_group( From 0e543539cff24e26c0b6a7d6dbd86d8a86bbb117 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Mon, 17 Mar 2025 11:15:56 +0100 Subject: [PATCH 4/4] feat: implemented sync device API routes --- src/server/gpodder/advanced/mod.rs | 2 + src/server/gpodder/advanced/sync.rs | 91 +++++++++++++++++++++++++++++ src/server/gpodder/models.rs | 14 +++++ 3 files changed, 107 insertions(+) create mode 100644 src/server/gpodder/advanced/sync.rs diff --git a/src/server/gpodder/advanced/mod.rs b/src/server/gpodder/advanced/mod.rs index a1f596c..c9ca0cd 100644 --- a/src/server/gpodder/advanced/mod.rs +++ b/src/server/gpodder/advanced/mod.rs @@ -2,6 +2,7 @@ mod auth; mod devices; mod episodes; mod subscriptions; +mod sync; use axum::Router; @@ -13,4 +14,5 @@ pub fn router(ctx: Context) -> Router { .nest("/devices", devices::router(ctx.clone())) .nest("/subscriptions", subscriptions::router(ctx.clone())) .nest("/episodes", episodes::router(ctx.clone())) + .nest("/sync-devices", sync::router(ctx.clone())) } diff --git a/src/server/gpodder/advanced/sync.rs b/src/server/gpodder/advanced/sync.rs new file mode 100644 index 0000000..fc97869 --- /dev/null +++ b/src/server/gpodder/advanced/sync.rs @@ -0,0 +1,91 @@ +use axum::{ + extract::{Path, State}, + middleware, + routing::get, + Extension, Json, Router, +}; + +use crate::{ + gpodder, + server::{ + error::{AppError, AppResult}, + gpodder::{ + auth_middleware, + format::{Format, StringWithFormat}, + models::{SyncStatus, SyncStatusDelta}, + }, + Context, + }, +}; + +pub fn router(ctx: Context) -> Router { + Router::new() + .route( + "/{username}", + get(get_sync_status).post(post_sync_status_changes), + ) + .layer(middleware::from_fn_with_state(ctx.clone(), auth_middleware)) +} + +pub async fn get_sync_status( + State(ctx): State, + Path(username): Path, + Extension(user): Extension, +) -> AppResult> { + if username.format != Format::Json { + return Err(AppError::NotFound); + } + + if *username != user.username { + return Err(AppError::BadRequest); + } + + Ok( + tokio::task::spawn_blocking(move || ctx.store.devices_by_sync_group(&user)) + .await + .unwrap() + .map(|(not_synchronized, synchronized)| { + Json(SyncStatus { + synchronized, + not_synchronized, + }) + })?, + ) +} + +pub async fn post_sync_status_changes( + State(ctx): State, + Path(username): Path, + Extension(user): Extension, + Json(delta): Json, +) -> AppResult> { + if username.format != Format::Json { + return Err(AppError::NotFound); + } + + if *username != user.username { + return Err(AppError::BadRequest); + } + + Ok(tokio::task::spawn_blocking(move || { + ctx.store.update_device_sync_status( + &user, + delta + .synchronize + .iter() + .map(|v| v.iter().map(|s| s.as_ref()).collect()) + .collect(), + delta.stop_synchronize.iter().map(|s| s.as_ref()).collect(), + )?; + + ctx.store.devices_by_sync_group(&user) + }) + .await + .unwrap() + .map(|(not_synchronized, synchronized)| { + Json(SyncStatus { + synchronized, + not_synchronized, + }) + })?) +} diff --git a/src/server/gpodder/models.rs b/src/server/gpodder/models.rs index 6098a9f..20d459f 100644 --- a/src/server/gpodder/models.rs +++ b/src/server/gpodder/models.rs @@ -73,6 +73,20 @@ pub struct EpisodeAction { pub action: EpisodeActionType, } +#[derive(Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct SyncStatus { + pub synchronized: Vec>, + pub not_synchronized: Vec, +} + +#[derive(Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct SyncStatusDelta { + pub synchronize: Vec>, + pub stop_synchronize: Vec, +} + impl From for DeviceType { fn from(value: gpodder::DeviceType) -> Self { match value {