diff --git a/src/db/repository/device.rs b/src/db/repository/device.rs index cd1a4d9..38d5d4c 100644 --- a/src/db/repository/device.rs +++ b/src/db/repository/device.rs @@ -1,7 +1,5 @@ -use std::collections::HashSet; - use chrono::{DateTime, Utc}; -use diesel::{alias, dsl::not, prelude::*}; +use diesel::prelude::*; use super::SqliteRepository; use crate::{ @@ -187,54 +185,7 @@ impl gpodder::DeviceRepository for SqliteRepository { group_id: i64, time_changed: DateTime, ) -> Result<(), gpodder::AuthErr> { - let time_changed = time_changed.timestamp(); - let conn = &mut self.pool.get()?; - - conn.transaction(|conn| { - let device_ids: Vec = devices::table - .filter(devices::sync_group_id.eq(group_id)) - .select(devices::id) - .get_results(conn)?; - - // For each device in the group, we get the list of subscriptions not yet in its own - // non-deleted list, and add it to the database - for device_id in device_ids.iter().copied() { - let d1 = alias!(device_subscriptions as d1); - - let own_subscriptions = d1 - .filter( - d1.field(device_subscriptions::device_id) - .eq(device_id) - .and(d1.field(device_subscriptions::deleted).eq(false)), - ) - .select(d1.field(device_subscriptions::podcast_url)); - - let urls_to_add = device_subscriptions::table - .select(device_subscriptions::podcast_url) - .filter( - device_subscriptions::device_id - .eq_any(device_ids.iter()) - .and(device_subscriptions::deleted.eq(false)) - .and(not( - device_subscriptions::podcast_url.eq_any(own_subscriptions) - )), - ) - .distinct() - .load_iter(conn)? - .collect::, _>>()?; - - super::subscription::insert_subscriptions_for_single_device( - conn, - device_id, - urls_to_add.iter(), - time_changed, - )?; - } - - Ok::<_, diesel::result::Error>(()) - })?; - - Ok(()) + todo!() } fn devices_by_sync_group( diff --git a/src/db/repository/subscription.rs b/src/db/repository/subscription.rs index 447a27d..5448380 100644 --- a/src/db/repository/subscription.rs +++ b/src/db/repository/subscription.rs @@ -18,167 +18,6 @@ impl From<(String, i64)> for gpodder::Subscription { } } -fn set_subscriptions_for_single_device( - conn: &mut SqliteConnection, - device_id: i64, - urls: &HashSet, - time_changed: i64, -) -> QueryResult<()> { - // https://github.com/diesel-rs/diesel/discussions/2826 - // SQLite doesn't support default on conflict set values, so we can't handle this using - // on conflict. Therefore, we instead calculate which URLs should be inserted and which - // updated, so we avoid conflicts. - let urls_in_db: HashSet = device_subscriptions::table - .select(device_subscriptions::podcast_url) - .filter(device_subscriptions::device_id.eq(device_id)) - .get_results(conn)? - .into_iter() - .collect(); - - // URLs originally in the database that are no longer in the list - let urls_to_delete = urls_in_db.difference(&urls); - - // URLs not in the database that are in the new list - let urls_to_insert = urls.difference(&urls_in_db); - - // URLs that are in both the database and the new list. For these, those marked as - // "deleted" in the database are updated so they're no longer deleted, with their - // timestamp updated. - let urls_to_update = urls.intersection(&urls_in_db); - - // Mark the URLs to delete as properly deleted - diesel::update( - device_subscriptions::table.filter( - device_subscriptions::device_id - .eq(device_id) - .and(device_subscriptions::podcast_url.eq_any(urls_to_delete)), - ), - ) - .set(( - device_subscriptions::deleted.eq(true), - device_subscriptions::time_changed.eq(time_changed), - )) - .execute(conn)?; - - // Update the existing deleted URLs that are reinserted as no longer deleted - diesel::update( - device_subscriptions::table.filter( - device_subscriptions::device_id - .eq(device_id) - .and(device_subscriptions::podcast_url.eq_any(urls_to_update)) - .and(device_subscriptions::deleted.eq(true)), - ), - ) - .set(( - device_subscriptions::deleted.eq(false), - device_subscriptions::time_changed.eq(time_changed), - )) - .execute(conn)?; - - // Insert the new values into the database - diesel::insert_into(device_subscriptions::table) - .values( - urls_to_insert - .into_iter() - .map(|url| db::NewDeviceSubscription { - device_id, - podcast_url: url.to_string(), - deleted: false, - time_changed, - }) - .collect::>(), - ) - .execute(conn)?; - - Ok(()) -} - -/// Add the given URLs to the device's list of subscriptions, meaning the URLs are truly inserted -/// into the database. This function assumes the list of URLs is already free of URLs that already -/// have a corresponding row in the database, so no conflict checks are performed. -pub fn insert_subscriptions_for_single_device<'a>( - conn: &mut SqliteConnection, - device_id: i64, - urls: impl Iterator, - time_changed: i64, -) -> QueryResult<()> { - diesel::insert_into(device_subscriptions::table) - .values( - urls.into_iter() - .map(|url| db::NewDeviceSubscription { - device_id, - podcast_url: url.to_string(), - deleted: false, - time_changed, - }) - .collect::>(), - ) - .execute(conn)?; - - Ok(()) -} - -pub fn update_subscriptions_for_single_device( - conn: &mut SqliteConnection, - device_id: i64, - add: &HashSet, - remove: &HashSet, - time_changed: i64, -) -> QueryResult<()> { - let urls_in_db: HashSet = device_subscriptions::table - .select(device_subscriptions::podcast_url) - .filter(device_subscriptions::device_id.eq(device_id)) - .get_results(conn)? - .into_iter() - .collect(); - - // Subscriptions to remove are those that were already in the database and are now part - // of the removed list. Subscriptions that were never added in the first place don't - // need to be marked as deleted. We also only update those that aren't already marked - // as deleted. - let urls_to_delete = remove.intersection(&urls_in_db); - - diesel::update( - device_subscriptions::table.filter( - device_subscriptions::device_id - .eq(device_id) - .and(device_subscriptions::podcast_url.eq_any(urls_to_delete)) - .and(device_subscriptions::deleted.eq(false)), - ), - ) - .set(( - device_subscriptions::deleted.eq(true), - device_subscriptions::time_changed.eq(time_changed), - )) - .execute(conn)?; - - // Subscriptions to update are those that are already in the database, but are also in - // the added list. Only those who were originally marked as deleted get updated. - let urls_to_update = add.intersection(&urls_in_db); - - diesel::update( - device_subscriptions::table.filter( - device_subscriptions::device_id - .eq(device_id) - .and(device_subscriptions::podcast_url.eq_any(urls_to_update)) - .and(device_subscriptions::deleted.eq(true)), - ), - ) - .set(( - device_subscriptions::deleted.eq(false), - device_subscriptions::time_changed.eq(time_changed), - )) - .execute(conn)?; - - // Subscriptions to insert are those that aren't in the database and are part of the - // added list - let urls_to_insert = add.difference(&urls_in_db); - - insert_subscriptions_for_single_device(conn, device_id, urls_to_insert, time_changed)?; - - Ok(()) -} - impl gpodder::SubscriptionRepository for SqliteRepository { fn subscriptions_for_user( &self, @@ -227,33 +66,85 @@ impl gpodder::SubscriptionRepository for SqliteRepository { urls: Vec, time_changed: chrono::DateTime, ) -> Result<(), gpodder::AuthErr> { - let time_changed = time_changed.timestamp(); - let urls: HashSet = urls.into_iter().collect(); + // TODO use a better timestamp + let timestamp = time_changed.timestamp(); self.pool.get()?.transaction(|conn| { - let (device_id, group_id) = devices::table - .select((devices::id, devices::sync_group_id)) + let device = devices::table + .select(db::Device::as_select()) .filter( devices::user_id .eq(user.id) .and(devices::device_id.eq(device_id)), ) - .get_result::<(i64, Option)>(conn)?; + .get_result(conn)?; - // If the device is part of a sync group, we need to perform the update on every device - // in the group - if let Some(group_id) = group_id { - let device_ids: Vec = devices::table - .filter(devices::sync_group_id.eq(group_id)) - .select(devices::id) - .get_results(conn)?; + // https://github.com/diesel-rs/diesel/discussions/2826 + // SQLite doesn't support default on conflict set values, so we can't handle this using + // on conflict. Therefore, we instead calculate which URLs should be inserted and which + // updated, so we avoid conflicts. + let urls: HashSet = urls.into_iter().collect(); + let urls_in_db: HashSet = device_subscriptions::table + .select(device_subscriptions::podcast_url) + .filter(device_subscriptions::device_id.eq(device.id)) + .get_results(conn)? + .into_iter() + .collect(); - for device_id in device_ids { - set_subscriptions_for_single_device(conn, device_id, &urls, time_changed)?; - } - } else { - set_subscriptions_for_single_device(conn, device_id, &urls, time_changed)?; - } + // URLs originally in the database that are no longer in the list + let urls_to_delete = urls_in_db.difference(&urls); + + // URLs not in the database that are in the new list + let urls_to_insert = urls.difference(&urls_in_db); + + // URLs that are in both the database and the new list. For these, those marked as + // "deleted" in the database are updated so they're no longer deleted, with their + // timestamp updated. + let urls_to_update = urls.intersection(&urls_in_db); + + // Mark the URLs to delete as properly deleted + diesel::update( + device_subscriptions::table.filter( + device_subscriptions::device_id + .eq(device.id) + .and(device_subscriptions::podcast_url.eq_any(urls_to_delete)), + ), + ) + .set(( + device_subscriptions::deleted.eq(true), + device_subscriptions::time_changed.eq(timestamp), + )) + .execute(conn)?; + + // Update the existing deleted URLs that are reinserted as no longer deleted + diesel::update( + device_subscriptions::table.filter( + device_subscriptions::device_id + .eq(device.id) + .and(device_subscriptions::podcast_url.eq_any(urls_to_update)) + .and(device_subscriptions::deleted.eq(true)), + ), + ) + .set(( + device_subscriptions::deleted.eq(false), + device_subscriptions::time_changed.eq(timestamp), + )) + .execute(conn)?; + + // Insert the new values into the database + diesel::insert_into(device_subscriptions::table) + .values( + urls_to_insert + .into_iter() + .map(|url| db::NewDeviceSubscription { + device_id: device.id, + podcast_url: url.to_string(), + deleted: false, + time_changed: timestamp, + }) + .collect::>(), + ) + .execute(conn)?; Ok::<_, diesel::result::Error>(()) })?; @@ -269,7 +160,8 @@ impl gpodder::SubscriptionRepository for SqliteRepository { remove: Vec, time_changed: chrono::DateTime, ) -> Result<(), gpodder::AuthErr> { - let time_changed = time_changed.timestamp(); + // TODO use a better timestamp + let timestamp = time_changed.timestamp(); // TODO URLs that are in both the added and removed lists will currently get "re-added", // meaning their change timestamp will be updated even though they haven't really changed. @@ -277,41 +169,77 @@ impl gpodder::SubscriptionRepository for SqliteRepository { let remove: HashSet<_> = remove.into_iter().collect(); self.pool.get()?.transaction(|conn| { - let (device_id, group_id) = devices::table - .select((devices::id, devices::sync_group_id)) + let device = devices::table + .select(db::Device::as_select()) .filter( devices::user_id .eq(user.id) .and(devices::device_id.eq(device_id)), ) - .get_result::<(i64, Option)>(conn)?; + .get_result(conn)?; - // If the device is part of a sync group, we need to perform the update on every device - // in the group - if let Some(group_id) = group_id { - let device_ids: Vec = devices::table - .filter(devices::sync_group_id.eq(group_id)) - .select(devices::id) - .get_results(conn)?; + let urls_in_db: HashSet = device_subscriptions::table + .select(device_subscriptions::podcast_url) + .filter(device_subscriptions::device_id.eq(device.id)) + .get_results(conn)? + .into_iter() + .collect(); - for device_id in device_ids { - update_subscriptions_for_single_device( - conn, - device_id, - &add, - &remove, - time_changed, - )?; - } - } else { - update_subscriptions_for_single_device( - conn, - device_id, - &add, - &remove, - time_changed, - )?; - } + // Subscriptions to remove are those that were already in the database and are now part + // of the removed list. Subscriptions that were never added in the first place don't + // need to be marked as deleted. We also only update those that aren't already marked + // as deleted. + let urls_to_delete = remove.intersection(&urls_in_db); + + diesel::update( + device_subscriptions::table.filter( + device_subscriptions::device_id + .eq(device.id) + .and(device_subscriptions::podcast_url.eq_any(urls_to_delete)) + .and(device_subscriptions::deleted.eq(false)), + ), + ) + .set(( + device_subscriptions::deleted.eq(true), + device_subscriptions::time_changed.eq(timestamp), + )) + .execute(conn)?; + + // Subscriptions to update are those that are already in the database, but are also in + // the added list. Only those who were originally marked as deleted get updated. + let urls_to_update = add.intersection(&urls_in_db); + + diesel::update( + device_subscriptions::table.filter( + device_subscriptions::device_id + .eq(device.id) + .and(device_subscriptions::podcast_url.eq_any(urls_to_update)) + .and(device_subscriptions::deleted.eq(true)), + ), + ) + .set(( + device_subscriptions::deleted.eq(false), + device_subscriptions::time_changed.eq(timestamp), + )) + .execute(conn)?; + + // Subscriptions to insert are those that aren't in the database and are part of the + // added list + let urls_to_insert = add.difference(&urls_in_db); + + diesel::insert_into(device_subscriptions::table) + .values( + urls_to_insert + .into_iter() + .map(|url| db::NewDeviceSubscription { + device_id: device.id, + podcast_url: url.to_string(), + deleted: false, + time_changed: timestamp, + }) + .collect::>(), + ) + .execute(conn)?; Ok::<_, diesel::result::Error>(()) })?; diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs index d197cd5..62f860e 100644 --- a/src/gpodder/repository.rs +++ b/src/gpodder/repository.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, sync::Arc}; +use std::sync::Arc; use argon2::{Argon2, PasswordHash, PasswordVerifier}; use chrono::{DateTime, TimeDelta, Utc}; @@ -100,30 +100,7 @@ impl GpodderRepository { sync: Vec>, unsync: Vec<&str>, ) -> Result<(), AuthErr> { - let now = Utc::now(); - let mut unsync: HashSet<&str> = HashSet::from_iter(unsync); - let original_unsync = unsync.clone(); - - for group in sync { - // We want to remove devices that are provided in both a sync group and the unsync - // category, as this does not make sense to perform. We use the original unsync array, - // as a device could be provided multiple times in different sync groups. - let (remove, remaining): (Vec<&str>, Vec<&str>) = - group.into_iter().partition(|d| original_unsync.contains(d)); - - for device_id in remove { - unsync.remove(device_id); - } - - let group_id = self.store.merge_sync_groups(user, remaining)?; - self.store.synchronize_sync_group(group_id, now)?; - } - - // Finally we unsync the remaining devices - self.store - .remove_from_sync_group(user, unsync.into_iter().collect())?; - - Ok(()) + todo!("perform diff devices to sync and unsync") } pub fn devices_by_sync_group( diff --git a/src/server/gpodder/advanced/mod.rs b/src/server/gpodder/advanced/mod.rs index c9ca0cd..a1f596c 100644 --- a/src/server/gpodder/advanced/mod.rs +++ b/src/server/gpodder/advanced/mod.rs @@ -2,7 +2,6 @@ mod auth; mod devices; mod episodes; mod subscriptions; -mod sync; use axum::Router; @@ -14,5 +13,4 @@ pub fn router(ctx: Context) -> Router { .nest("/devices", devices::router(ctx.clone())) .nest("/subscriptions", subscriptions::router(ctx.clone())) .nest("/episodes", episodes::router(ctx.clone())) - .nest("/sync-devices", sync::router(ctx.clone())) } diff --git a/src/server/gpodder/advanced/sync.rs b/src/server/gpodder/advanced/sync.rs deleted file mode 100644 index fc97869..0000000 --- a/src/server/gpodder/advanced/sync.rs +++ /dev/null @@ -1,91 +0,0 @@ -use axum::{ - extract::{Path, State}, - middleware, - routing::get, - Extension, Json, Router, -}; - -use crate::{ - gpodder, - server::{ - error::{AppError, AppResult}, - gpodder::{ - auth_middleware, - format::{Format, StringWithFormat}, - models::{SyncStatus, SyncStatusDelta}, - }, - Context, - }, -}; - -pub fn router(ctx: Context) -> Router { - Router::new() - .route( - "/{username}", - get(get_sync_status).post(post_sync_status_changes), - ) - .layer(middleware::from_fn_with_state(ctx.clone(), auth_middleware)) -} - -pub async fn get_sync_status( - State(ctx): State, - Path(username): Path, - Extension(user): Extension, -) -> AppResult> { - if username.format != Format::Json { - return Err(AppError::NotFound); - } - - if *username != user.username { - return Err(AppError::BadRequest); - } - - Ok( - tokio::task::spawn_blocking(move || ctx.store.devices_by_sync_group(&user)) - .await - .unwrap() - .map(|(not_synchronized, synchronized)| { - Json(SyncStatus { - synchronized, - not_synchronized, - }) - })?, - ) -} - -pub async fn post_sync_status_changes( - State(ctx): State, - Path(username): Path, - Extension(user): Extension, - Json(delta): Json, -) -> AppResult> { - if username.format != Format::Json { - return Err(AppError::NotFound); - } - - if *username != user.username { - return Err(AppError::BadRequest); - } - - Ok(tokio::task::spawn_blocking(move || { - ctx.store.update_device_sync_status( - &user, - delta - .synchronize - .iter() - .map(|v| v.iter().map(|s| s.as_ref()).collect()) - .collect(), - delta.stop_synchronize.iter().map(|s| s.as_ref()).collect(), - )?; - - ctx.store.devices_by_sync_group(&user) - }) - .await - .unwrap() - .map(|(not_synchronized, synchronized)| { - Json(SyncStatus { - synchronized, - not_synchronized, - }) - })?) -} diff --git a/src/server/gpodder/models.rs b/src/server/gpodder/models.rs index 20d459f..6098a9f 100644 --- a/src/server/gpodder/models.rs +++ b/src/server/gpodder/models.rs @@ -73,20 +73,6 @@ pub struct EpisodeAction { pub action: EpisodeActionType, } -#[derive(Serialize)] -#[serde(rename_all = "kebab-case")] -pub struct SyncStatus { - pub synchronized: Vec>, - pub not_synchronized: Vec, -} - -#[derive(Deserialize)] -#[serde(rename_all = "kebab-case")] -pub struct SyncStatusDelta { - pub synchronize: Vec>, - pub stop_synchronize: Vec, -} - impl From for DeviceType { fn from(value: gpodder::DeviceType) -> Self { match value {