use std::collections::HashSet; use chrono::DateTime; use diesel::prelude::*; use super::SqliteRepository; use crate::{ db::{self, schema::*}, gpodder, }; impl From<(String, i64)> for gpodder::Subscription { fn from((url, ts): (String, i64)) -> Self { Self { url, time_changed: DateTime::from_timestamp(ts, 0).unwrap(), } } } fn set_subscriptions_for_single_device( conn: &mut SqliteConnection, device_id: i64, urls: &HashSet, time_changed: i64, ) -> QueryResult<()> { // https://github.com/diesel-rs/diesel/discussions/2826 // SQLite doesn't support default on conflict set values, so we can't handle this using // on conflict. Therefore, we instead calculate which URLs should be inserted and which // updated, so we avoid conflicts. let urls_in_db: HashSet = device_subscriptions::table .select(device_subscriptions::podcast_url) .filter(device_subscriptions::device_id.eq(device_id)) .get_results(conn)? .into_iter() .collect(); // URLs originally in the database that are no longer in the list let urls_to_delete = urls_in_db.difference(&urls); // URLs not in the database that are in the new list let urls_to_insert = urls.difference(&urls_in_db); // URLs that are in both the database and the new list. For these, those marked as // "deleted" in the database are updated so they're no longer deleted, with their // timestamp updated. let urls_to_update = urls.intersection(&urls_in_db); // Mark the URLs to delete as properly deleted diesel::update( device_subscriptions::table.filter( device_subscriptions::device_id .eq(device_id) .and(device_subscriptions::podcast_url.eq_any(urls_to_delete)), ), ) .set(( device_subscriptions::deleted.eq(true), device_subscriptions::time_changed.eq(time_changed), )) .execute(conn)?; // Update the existing deleted URLs that are reinserted as no longer deleted diesel::update( device_subscriptions::table.filter( device_subscriptions::device_id .eq(device_id) .and(device_subscriptions::podcast_url.eq_any(urls_to_update)) .and(device_subscriptions::deleted.eq(true)), ), ) .set(( device_subscriptions::deleted.eq(false), device_subscriptions::time_changed.eq(time_changed), )) .execute(conn)?; // Insert the new values into the database diesel::insert_into(device_subscriptions::table) .values( urls_to_insert .into_iter() .map(|url| db::NewDeviceSubscription { device_id, podcast_url: url.to_string(), deleted: false, time_changed, }) .collect::>(), ) .execute(conn)?; Ok(()) } /// Add the given URLs to the device's list of subscriptions, meaning the URLs are truly inserted /// into the database. This function assumes the list of URLs is already free of URLs that already /// have a corresponding row in the database, so no conflict checks are performed. pub fn insert_subscriptions_for_single_device<'a>( conn: &mut SqliteConnection, device_id: i64, urls: impl Iterator, time_changed: i64, ) -> QueryResult<()> { diesel::insert_into(device_subscriptions::table) .values( urls.into_iter() .map(|url| db::NewDeviceSubscription { device_id, podcast_url: url.to_string(), deleted: false, time_changed, }) .collect::>(), ) .execute(conn)?; Ok(()) } pub fn update_subscriptions_for_single_device( conn: &mut SqliteConnection, device_id: i64, add: &HashSet, remove: &HashSet, time_changed: i64, ) -> QueryResult<()> { let urls_in_db: HashSet = device_subscriptions::table .select(device_subscriptions::podcast_url) .filter(device_subscriptions::device_id.eq(device_id)) .get_results(conn)? .into_iter() .collect(); // Subscriptions to remove are those that were already in the database and are now part // of the removed list. Subscriptions that were never added in the first place don't // need to be marked as deleted. We also only update those that aren't already marked // as deleted. let urls_to_delete = remove.intersection(&urls_in_db); diesel::update( device_subscriptions::table.filter( device_subscriptions::device_id .eq(device_id) .and(device_subscriptions::podcast_url.eq_any(urls_to_delete)) .and(device_subscriptions::deleted.eq(false)), ), ) .set(( device_subscriptions::deleted.eq(true), device_subscriptions::time_changed.eq(time_changed), )) .execute(conn)?; // Subscriptions to update are those that are already in the database, but are also in // the added list. Only those who were originally marked as deleted get updated. let urls_to_update = add.intersection(&urls_in_db); diesel::update( device_subscriptions::table.filter( device_subscriptions::device_id .eq(device_id) .and(device_subscriptions::podcast_url.eq_any(urls_to_update)) .and(device_subscriptions::deleted.eq(true)), ), ) .set(( device_subscriptions::deleted.eq(false), device_subscriptions::time_changed.eq(time_changed), )) .execute(conn)?; // Subscriptions to insert are those that aren't in the database and are part of the // added list let urls_to_insert = add.difference(&urls_in_db); insert_subscriptions_for_single_device(conn, device_id, urls_to_insert, time_changed)?; Ok(()) } impl gpodder::SubscriptionRepository for SqliteRepository { fn subscriptions_for_user( &self, user: &gpodder::User, ) -> Result, gpodder::AuthErr> { Ok(device_subscriptions::table .inner_join(devices::table) .filter(devices::user_id.eq(user.id)) .select(( device_subscriptions::podcast_url, device_subscriptions::time_changed, )) .distinct() .get_results::<(String, i64)>(&mut self.pool.get()?)? .into_iter() .map(Into::into) .collect()) } fn subscriptions_for_device( &self, user: &gpodder::User, device_id: &str, ) -> Result, gpodder::AuthErr> { Ok(device_subscriptions::table .inner_join(devices::table) .filter( devices::user_id .eq(user.id) .and(devices::device_id.eq(device_id)), ) .select(( device_subscriptions::podcast_url, device_subscriptions::time_changed, )) .get_results::<(String, i64)>(&mut self.pool.get()?)? .into_iter() .map(Into::into) .collect()) } fn set_subscriptions_for_device( &self, user: &gpodder::User, device_id: &str, urls: Vec, time_changed: chrono::DateTime, ) -> Result<(), gpodder::AuthErr> { let time_changed = time_changed.timestamp(); let urls: HashSet = urls.into_iter().collect(); self.pool.get()?.transaction(|conn| { let (device_id, group_id) = devices::table .select((devices::id, devices::sync_group_id)) .filter( devices::user_id .eq(user.id) .and(devices::device_id.eq(device_id)), ) .get_result::<(i64, Option)>(conn)?; // If the device is part of a sync group, we need to perform the update on every device // in the group if let Some(group_id) = group_id { let device_ids: Vec = devices::table .filter(devices::sync_group_id.eq(group_id)) .select(devices::id) .get_results(conn)?; for device_id in device_ids { set_subscriptions_for_single_device(conn, device_id, &urls, time_changed)?; } } else { set_subscriptions_for_single_device(conn, device_id, &urls, time_changed)?; } Ok::<_, diesel::result::Error>(()) })?; Ok(()) } fn update_subscriptions_for_device( &self, user: &gpodder::User, device_id: &str, add: Vec, remove: Vec, time_changed: chrono::DateTime, ) -> Result<(), gpodder::AuthErr> { let time_changed = time_changed.timestamp(); // TODO URLs that are in both the added and removed lists will currently get "re-added", // meaning their change timestamp will be updated even though they haven't really changed. let add: HashSet<_> = add.into_iter().collect(); let remove: HashSet<_> = remove.into_iter().collect(); self.pool.get()?.transaction(|conn| { let (device_id, group_id) = devices::table .select((devices::id, devices::sync_group_id)) .filter( devices::user_id .eq(user.id) .and(devices::device_id.eq(device_id)), ) .get_result::<(i64, Option)>(conn)?; // If the device is part of a sync group, we need to perform the update on every device // in the group if let Some(group_id) = group_id { let device_ids: Vec = devices::table .filter(devices::sync_group_id.eq(group_id)) .select(devices::id) .get_results(conn)?; for device_id in device_ids { update_subscriptions_for_single_device( conn, device_id, &add, &remove, time_changed, )?; } } else { update_subscriptions_for_single_device( conn, device_id, &add, &remove, time_changed, )?; } Ok::<_, diesel::result::Error>(()) })?; Ok(()) } fn subscription_updates_for_device( &self, user: &gpodder::User, device_id: &str, since: chrono::DateTime, ) -> Result<(Vec, Vec), gpodder::AuthErr> { let since = since.timestamp(); let (mut added, mut removed) = (Vec::new(), Vec::new()); let query = device_subscriptions::table .inner_join(devices::table) .filter( devices::user_id .eq(user.id) .and(devices::device_id.eq(device_id)) .and(device_subscriptions::time_changed.ge(since)), ) .select(db::DeviceSubscription::as_select()); for sub in query.load_iter(&mut self.pool.get()?)? { let sub = sub?; if sub.deleted { removed.push(gpodder::Subscription { url: sub.podcast_url, time_changed: DateTime::from_timestamp(sub.time_changed, 0).unwrap(), }); } else { added.push(gpodder::Subscription { url: sub.podcast_url, time_changed: DateTime::from_timestamp(sub.time_changed, 0).unwrap(), }); } } Ok((added, removed)) } }