use std::collections::HashSet; use diesel::prelude::*; use super::SqliteRepository; use crate::{ db::{self, schema::*}, gpodder, }; impl gpodder::SubscriptionRepository for SqliteRepository { fn subscriptions_for_user( &self, user: &gpodder::User, ) -> Result, gpodder::AuthErr> { Ok(device_subscriptions::table .inner_join(devices::table) .filter(devices::user_id.eq(user.id)) .select(device_subscriptions::podcast_url) .distinct() .get_results(&mut self.pool.get()?)?) } fn subscriptions_for_device( &self, user: &gpodder::User, device_id: &str, ) -> Result, gpodder::AuthErr> { Ok(device_subscriptions::table .inner_join(devices::table) .filter( devices::user_id .eq(user.id) .and(devices::device_id.eq(device_id)), ) .select(device_subscriptions::podcast_url) .get_results(&mut self.pool.get()?)?) } fn set_subscriptions_for_device( &self, user: &gpodder::User, device_id: &str, urls: Vec, ) -> Result { // TODO use a better timestamp let timestamp = chrono::Utc::now().timestamp_millis(); self.pool.get()?.transaction(|conn| { let device = devices::table .select(db::Device::as_select()) .filter( devices::user_id .eq(user.id) .and(devices::device_id.eq(device_id)), ) .get_result(conn)?; // https://github.com/diesel-rs/diesel/discussions/2826 // SQLite doesn't support default on conflict set values, so we can't handle this using // on conflict. Therefore, we instead calculate which URLs should be inserted and which // updated, so we avoid conflicts. let urls: HashSet = urls.into_iter().collect(); let urls_in_db: HashSet = device_subscriptions::table .select(device_subscriptions::podcast_url) .filter(device_subscriptions::device_id.eq(device.id)) .get_results(conn)? .into_iter() .collect(); // URLs originally in the database that are no longer in the list let urls_to_delete = urls_in_db.difference(&urls); // URLs not in the database that are in the new list let urls_to_insert = urls.difference(&urls_in_db); // URLs that are in both the database and the new list. For these, those marked as // "deleted" in the database are updated so they're no longer deleted, with their // timestamp updated. let urls_to_update = urls.intersection(&urls_in_db); // Mark the URLs to delete as properly deleted diesel::update( device_subscriptions::table.filter( device_subscriptions::device_id .eq(device.id) .and(device_subscriptions::podcast_url.eq_any(urls_to_delete)), ), ) .set(( device_subscriptions::deleted.eq(true), device_subscriptions::time_changed.eq(timestamp), )) .execute(conn)?; // Update the existing deleted URLs that are reinserted as no longer deleted diesel::update( device_subscriptions::table.filter( device_subscriptions::device_id .eq(device.id) .and(device_subscriptions::podcast_url.eq_any(urls_to_update)) .and(device_subscriptions::deleted.eq(true)), ), ) .set(( device_subscriptions::deleted.eq(false), device_subscriptions::time_changed.eq(timestamp), )) .execute(conn)?; // Insert the new values into the database diesel::insert_into(device_subscriptions::table) .values( urls_to_insert .into_iter() .map(|url| db::NewDeviceSubscription { device_id: device.id, podcast_url: url.to_string(), deleted: false, time_changed: timestamp, }) .collect::>(), ) .execute(conn)?; Ok::<_, diesel::result::Error>(()) })?; Ok(timestamp + 1) } fn update_subscriptions_for_device( &self, user: &gpodder::User, device_id: &str, add: Vec, remove: Vec, ) -> Result { // TODO use a better timestamp let timestamp = chrono::Utc::now().timestamp_millis(); // TODO URLs that are in both the added and removed lists will currently get "re-added", // meaning their change timestamp will be updated even though they haven't really changed. let add: HashSet<_> = add.into_iter().collect(); let remove: HashSet<_> = remove.into_iter().collect(); self.pool.get()?.transaction(|conn| { let device = devices::table .select(db::Device::as_select()) .filter( devices::user_id .eq(user.id) .and(devices::device_id.eq(device_id)), ) .get_result(conn)?; let urls_in_db: HashSet = device_subscriptions::table .select(device_subscriptions::podcast_url) .filter(device_subscriptions::device_id.eq(device.id)) .get_results(conn)? .into_iter() .collect(); // Subscriptions to remove are those that were already in the database and are now part // of the removed list. Subscriptions that were never added in the first place don't // need to be marked as deleted. We also only update those that aren't already marked // as deleted. let urls_to_delete = remove.intersection(&urls_in_db); diesel::update( device_subscriptions::table.filter( device_subscriptions::device_id .eq(device.id) .and(device_subscriptions::podcast_url.eq_any(urls_to_delete)) .and(device_subscriptions::deleted.eq(false)), ), ) .set(( device_subscriptions::deleted.eq(true), device_subscriptions::time_changed.eq(timestamp), )) .execute(conn)?; // Subscriptions to update are those that are already in the database, but are also in // the added list. Only those who were originally marked as deleted get updated. let urls_to_update = add.intersection(&urls_in_db); diesel::update( device_subscriptions::table.filter( device_subscriptions::device_id .eq(device.id) .and(device_subscriptions::podcast_url.eq_any(urls_to_update)) .and(device_subscriptions::deleted.eq(true)), ), ) .set(( device_subscriptions::deleted.eq(false), device_subscriptions::time_changed.eq(timestamp), )) .execute(conn)?; // Subscriptions to insert are those that aren't in the database and are part of the // added list let urls_to_insert = add.difference(&urls_in_db); diesel::insert_into(device_subscriptions::table) .values( urls_to_insert .into_iter() .map(|url| db::NewDeviceSubscription { device_id: device.id, podcast_url: url.to_string(), deleted: false, time_changed: timestamp, }) .collect::>(), ) .execute(conn)?; Ok::<_, diesel::result::Error>(()) })?; Ok(timestamp + 1) } fn subscription_updates_for_device( &self, user: &gpodder::User, device_id: &str, since: i64, ) -> Result<(i64, Vec, Vec), gpodder::AuthErr> { let (mut timestamp, mut added, mut removed) = (0, Vec::new(), Vec::new()); let query = device_subscriptions::table .inner_join(devices::table) .filter( devices::user_id .eq(user.id) .and(devices::device_id.eq(device_id)) .and(device_subscriptions::time_changed.ge(since)), ) .select(db::DeviceSubscription::as_select()); for sub in query.load_iter(&mut self.pool.get()?)? { let sub = sub?; if sub.deleted { removed.push(sub.podcast_url); } else { added.push(sub.podcast_url); } timestamp = timestamp.max(sub.time_changed); } Ok((timestamp + 1, added, removed)) } }