diff --git a/src/db/repository/subscription.rs b/src/db/repository/subscription.rs index febe9dc..6ab8fe5 100644 --- a/src/db/repository/subscription.rs +++ b/src/db/repository/subscription.rs @@ -42,9 +42,10 @@ impl gpodder::SubscriptionRepository for SqliteRepository { user: &gpodder::User, device_id: &str, urls: Vec, - ) -> Result { + time_changed: chrono::DateTime, + ) -> Result<(), gpodder::AuthErr> { // TODO use a better timestamp - let timestamp = chrono::Utc::now().timestamp(); + let timestamp = time_changed.timestamp(); self.pool.get()?.transaction(|conn| { let device = devices::table @@ -126,7 +127,7 @@ impl gpodder::SubscriptionRepository for SqliteRepository { Ok::<_, diesel::result::Error>(()) })?; - Ok(timestamp + 1) + Ok(()) } fn update_subscriptions_for_device( @@ -135,9 +136,10 @@ impl gpodder::SubscriptionRepository for SqliteRepository { device_id: &str, add: Vec, remove: Vec, - ) -> Result { + time_changed: chrono::DateTime, + ) -> Result<(), gpodder::AuthErr> { // TODO use a better timestamp - let timestamp = chrono::Utc::now().timestamp_millis(); + let timestamp = time_changed.timestamp(); // TODO URLs that are in both the added and removed lists will currently get "re-added", // meaning their change timestamp will be updated even though they haven't really changed. @@ -220,16 +222,18 @@ impl gpodder::SubscriptionRepository for SqliteRepository { Ok::<_, diesel::result::Error>(()) })?; - Ok(timestamp + 1) + Ok(()) } fn subscription_updates_for_device( &self, user: &gpodder::User, device_id: &str, - since: i64, - ) -> Result<(i64, Vec, Vec), gpodder::AuthErr> { - let (mut timestamp, mut added, mut removed) = (0, Vec::new(), Vec::new()); + since: chrono::DateTime, + ) -> Result<(chrono::DateTime, Vec, Vec), gpodder::AuthErr> { + let since = since.timestamp(); + + let (mut added, mut removed) = (Vec::new(), Vec::new()); let query = device_subscriptions::table .inner_join(devices::table) @@ -241,6 +245,8 @@ impl gpodder::SubscriptionRepository for SqliteRepository { ) .select(db::DeviceSubscription::as_select()); + let mut max_time: chrono::DateTime = chrono::DateTime::::MIN_UTC; + for sub in query.load_iter(&mut self.pool.get()?)? { let sub = sub?; @@ -250,9 +256,9 @@ impl gpodder::SubscriptionRepository for SqliteRepository { added.push(sub.podcast_url); } - timestamp = timestamp.max(sub.time_changed); + max_time = max_time.max(chrono::DateTime::from_timestamp(sub.time_changed, 0).unwrap()); } - Ok((timestamp + 1, added, removed)) + Ok((max_time, added, removed)) } } diff --git a/src/gpodder/mod.rs b/src/gpodder/mod.rs index eb6daa2..18f23b2 100644 --- a/src/gpodder/mod.rs +++ b/src/gpodder/mod.rs @@ -11,9 +11,9 @@ pub enum AuthErr { Other(Box), } -pub trait Store: AuthStore + DeviceRepository {} +pub trait Store: AuthStore + DeviceRepository + SubscriptionRepository {} -impl Store for T where T: AuthStore + DeviceRepository {} +impl Store for T where T: AuthStore + DeviceRepository + SubscriptionRepository {} pub trait AuthRepository { /// Validate the given session ID and return its user. @@ -78,7 +78,8 @@ pub trait SubscriptionRepository { user: &User, device_id: &str, urls: Vec, - ) -> Result; + time_changed: chrono::DateTime, + ) -> Result<(), AuthErr>; /// Update the list of subscriptions for a device by adding and removing the given URLs fn update_subscriptions_for_device( @@ -87,15 +88,16 @@ pub trait SubscriptionRepository { device_id: &str, add: Vec, remove: Vec, - ) -> Result; + time_changed: chrono::DateTime, + ) -> Result<(), AuthErr>; /// Returns the changes in subscriptions since the given timestamp. fn subscription_updates_for_device( &self, user: &User, device_id: &str, - since: i64, - ) -> Result<(i64, Vec, Vec), AuthErr>; + since: chrono::DateTime, + ) -> Result<(chrono::DateTime, Vec, Vec), AuthErr>; } pub trait EpisodeActionRepository { diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs index 9314369..e0d9d12 100644 --- a/src/gpodder/repository.rs +++ b/src/gpodder/repository.rs @@ -82,4 +82,62 @@ impl GpodderRepository { ) -> Result<(), AuthErr> { self.store.update_device_info(user, device_id, patch) } + + pub fn subscriptions_for_device( + &self, + user: &models::User, + device_id: &str, + ) -> Result, AuthErr> { + self.store.subscriptions_for_device(user, device_id) + } + + pub fn subscriptions_for_user(&self, user: &models::User) -> Result, AuthErr> { + self.store.subscriptions_for_user(user) + } + + pub fn set_subscriptions_for_device( + &self, + user: &models::User, + device_id: &str, + urls: Vec, + ) -> Result, AuthErr> { + let time_changed = chrono::Utc::now(); + + self.store + .set_subscriptions_for_device(user, device_id, urls, time_changed)?; + + Ok(time_changed + chrono::TimeDelta::seconds(1)) + } + + pub fn update_subscriptions_for_device( + &self, + user: &models::User, + device_id: &str, + add: Vec, + remove: Vec, + ) -> Result, AuthErr> { + let time_changed = chrono::Utc::now(); + + self.store + .update_subscriptions_for_device(user, device_id, add, remove, time_changed)?; + + Ok(time_changed + chrono::TimeDelta::seconds(1)) + } + + pub fn subscription_updates_for_device( + &self, + user: &models::User, + device_id: &str, + since: chrono::DateTime, + ) -> Result<(chrono::DateTime, Vec, Vec), AuthErr> { + let (max_time_changed, added, removed) = self + .store + .subscription_updates_for_device(user, device_id, since)?; + + Ok(( + max_time_changed + chrono::TimeDelta::seconds(1), + added, + removed, + )) + } } diff --git a/src/server/gpodder/advanced/subscriptions.rs b/src/server/gpodder/advanced/subscriptions.rs index 98d464a..dbb8d28 100644 --- a/src/server/gpodder/advanced/subscriptions.rs +++ b/src/server/gpodder/advanced/subscriptions.rs @@ -7,7 +7,7 @@ use axum::{ use serde::Deserialize; use crate::{ - gpodder::{self, SubscriptionRepository}, + gpodder, server::{ error::{AppError, AppResult}, gpodder::{ @@ -43,14 +43,14 @@ pub async fn post_subscription_changes( } Ok(tokio::task::spawn_blocking(move || { - ctx.repo + ctx.store .update_subscriptions_for_device(&user, &id, delta.add, delta.remove) }) .await .unwrap() - .map(|timestamp| { + .map(|time_changed| { Json(UpdatedUrlsResponse { - timestamp, + timestamp: time_changed.timestamp(), update_urls: Vec::new(), }) })?) @@ -76,17 +76,18 @@ pub async fn get_subscription_changes( return Err(AppError::BadRequest); } + let since = chrono::DateTime::from_timestamp(query.since, 0).ok_or(AppError::BadRequest)?; + Ok(tokio::task::spawn_blocking(move || { - ctx.repo - .subscription_updates_for_device(&user, &id, query.since) + ctx.store.subscription_updates_for_device(&user, &id, since) }) .await .unwrap() - .map(|(timestamp, add, remove)| { + .map(|(next_time_changed, add, remove)| { Json(SubscriptionDeltaResponse { add, remove, - timestamp, + timestamp: next_time_changed.timestamp(), }) })?) } diff --git a/src/server/gpodder/simple/subscriptions.rs b/src/server/gpodder/simple/subscriptions.rs index 4e6f7c6..4f6266f 100644 --- a/src/server/gpodder/simple/subscriptions.rs +++ b/src/server/gpodder/simple/subscriptions.rs @@ -6,7 +6,7 @@ use axum::{ }; use crate::{ - gpodder::{self, SubscriptionRepository}, + gpodder, server::{ error::{AppError, AppResult}, gpodder::{auth_middleware, format::StringWithFormat}, @@ -34,7 +34,7 @@ pub async fn get_device_subscriptions( } Ok( - tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_device(&user, &id)) + tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_device(&user, &id)) .await .unwrap() .map(Json)?, @@ -51,7 +51,7 @@ pub async fn get_user_subscriptions( } Ok( - tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_user(&user)) + tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_user(&user)) .await .unwrap() .map(Json)?, @@ -68,12 +68,10 @@ pub async fn put_device_subscriptions( return Err(AppError::BadRequest); } - Ok( - tokio::task::spawn_blocking(move || { - ctx.repo.set_subscriptions_for_device(&user, &id, urls) - }) - .await - .unwrap() - .map(|_| ())?, - ) + Ok(tokio::task::spawn_blocking(move || { + ctx.store.set_subscriptions_for_device(&user, &id, urls) + }) + .await + .unwrap() + .map(|_| ())?) }