diff --git a/src/cli/serve.rs b/src/cli/serve.rs index e1d0fa3..664a1e0 100644 --- a/src/cli/serve.rs +++ b/src/cli/serve.rs @@ -9,7 +9,6 @@ pub fn serve(config: &crate::config::Config) -> u8 { let repo = db::SqliteRepository::from(pool); let ctx = server::Context { - repo: repo.clone(), store: crate::gpodder::GpodderRepository::new(repo), }; let app = server::app(ctx); diff --git a/src/db/repository/episode_action.rs b/src/db/repository/episode_action.rs index 3f66bc9..5a45cd2 100644 --- a/src/db/repository/episode_action.rs +++ b/src/db/repository/episode_action.rs @@ -1,4 +1,4 @@ -use chrono::DateTime; +use chrono::{DateTime, Utc}; use diesel::prelude::*; use super::SqliteRepository; @@ -26,7 +26,7 @@ impl From for db::NewEpisodeAction { podcast_url: value.podcast, episode_url: value.episode, time_changed: 0, - timestamp: value.timestamp.map(|t| t.and_utc().timestamp()), + timestamp: value.timestamp.map(|t| t.timestamp()), action, started, position, @@ -58,7 +58,8 @@ impl From<(Option, db::EpisodeAction)> for gpodder::EpisodeAction { // SAFETY the input to the from_timestamp function is always the result of a // previous timestamp() function call, which is guaranteed to be each other's // reverse - .map(|ts| DateTime::from_timestamp(ts, 0).unwrap().naive_utc()), + .map(|ts| DateTime::from_timestamp(ts, 0).unwrap()), + time_changed: DateTime::from_timestamp(db_action.time_changed, 0).unwrap(), device: device_id, action, } @@ -70,8 +71,9 @@ impl gpodder::EpisodeActionRepository for SqliteRepository { &self, user: &gpodder::User, actions: Vec, - ) -> Result { - let time_changed = chrono::Utc::now().timestamp(); + time_changed: DateTime, + ) -> Result<(), gpodder::AuthErr> { + let time_changed = time_changed.timestamp(); // TODO optimize this query // 1. The lookup for a device could be replaced with a subquery, although Diesel seems to @@ -99,17 +101,18 @@ impl gpodder::EpisodeActionRepository for SqliteRepository { Ok::<_, diesel::result::Error>(()) })?; - Ok(time_changed + 1) + Ok(()) } fn episode_actions_for_user( &self, user: &gpodder::User, - since: Option, + since: Option>, podcast: Option, device: Option, aggregated: bool, - ) -> Result<(i64, Vec), gpodder::AuthErr> { + ) -> Result, gpodder::AuthErr> { + let since = since.map(|ts| ts.timestamp()).unwrap_or(0); let conn = &mut self.pool.get()?; let mut query = episode_actions::table @@ -117,7 +120,7 @@ impl gpodder::EpisodeActionRepository for SqliteRepository { .filter( episode_actions::user_id .eq(user.id) - .and(episode_actions::time_changed.ge(since.unwrap_or(0))), + .and(episode_actions::time_changed.ge(since)), ) .select(( devices::device_id.nullable(), @@ -157,16 +160,11 @@ impl gpodder::EpisodeActionRepository for SqliteRepository { query.get_results(conn)? }; - let max_timestamp = db_actions - .iter() - .map(|(_, a)| a.time_changed) - .max() - .unwrap_or(0); let actions = db_actions .into_iter() .map(gpodder::EpisodeAction::from) .collect(); - Ok((max_timestamp + 1, actions)) + Ok(actions) } } diff --git a/src/gpodder/mod.rs b/src/gpodder/mod.rs index 18f23b2..0e89cb9 100644 --- a/src/gpodder/mod.rs +++ b/src/gpodder/mod.rs @@ -1,6 +1,7 @@ pub mod models; mod repository; +use chrono::{DateTime, Utc}; pub use models::*; pub use repository::GpodderRepository; @@ -11,9 +12,15 @@ pub enum AuthErr { Other(Box), } -pub trait Store: AuthStore + DeviceRepository + SubscriptionRepository {} +pub trait Store: + AuthStore + DeviceRepository + SubscriptionRepository + EpisodeActionRepository +{ +} -impl Store for T where T: AuthStore + DeviceRepository + SubscriptionRepository {} +impl Store for T where + T: AuthStore + DeviceRepository + SubscriptionRepository + EpisodeActionRepository +{ +} pub trait AuthRepository { /// Validate the given session ID and return its user. @@ -78,7 +85,7 @@ pub trait SubscriptionRepository { user: &User, device_id: &str, urls: Vec, - time_changed: chrono::DateTime, + time_changed: DateTime, ) -> Result<(), AuthErr>; /// Update the list of subscriptions for a device by adding and removing the given URLs @@ -88,7 +95,7 @@ pub trait SubscriptionRepository { device_id: &str, add: Vec, remove: Vec, - time_changed: chrono::DateTime, + time_changed: DateTime, ) -> Result<(), AuthErr>; /// Returns the changes in subscriptions since the given timestamp. @@ -96,22 +103,26 @@ pub trait SubscriptionRepository { &self, user: &User, device_id: &str, - since: chrono::DateTime, - ) -> Result<(chrono::DateTime, Vec, Vec), AuthErr>; + since: DateTime, + ) -> Result<(DateTime, Vec, Vec), AuthErr>; } pub trait EpisodeActionRepository { /// Insert the given episode actions into the datastore. - fn add_episode_actions(&self, user: &User, actions: Vec) - -> Result; + fn add_episode_actions( + &self, + user: &User, + actions: Vec, + time_changed: DateTime, + ) -> Result<(), AuthErr>; /// Retrieve the list of episode actions for the given user. fn episode_actions_for_user( &self, user: &User, - since: Option, + since: Option>, podcast: Option, device: Option, aggregated: bool, - ) -> Result<(i64, Vec), AuthErr>; + ) -> Result, AuthErr>; } diff --git a/src/gpodder/models.rs b/src/gpodder/models.rs index 2ecb004..1c9e550 100644 --- a/src/gpodder/models.rs +++ b/src/gpodder/models.rs @@ -1,4 +1,4 @@ -use chrono::{DateTime, NaiveDateTime, Utc}; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; #[derive(Clone)] @@ -52,7 +52,8 @@ pub enum EpisodeActionType { pub struct EpisodeAction { pub podcast: String, pub episode: String, - pub timestamp: Option, + pub timestamp: Option>, + pub time_changed: DateTime, #[serde(default)] pub device: Option, #[serde(flatten)] diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs index e0d9d12..9913c9d 100644 --- a/src/gpodder/repository.rs +++ b/src/gpodder/repository.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use argon2::{Argon2, PasswordHash, PasswordVerifier}; +use chrono::{DateTime, TimeDelta, Utc}; use rand::Rng; use super::{models, AuthErr, Store}; @@ -26,9 +27,7 @@ impl GpodderRepository { .ok_or(AuthErr::UnknownSession)?; // Expired sessions still in the database are considered removed - if chrono::Utc::now() - session.last_seen - > chrono::TimeDelta::new(MAX_SESSION_AGE, 0).unwrap() - { + if Utc::now() - session.last_seen > TimeDelta::new(MAX_SESSION_AGE, 0).unwrap() { Err(AuthErr::UnknownSession) } else { Ok(session) @@ -57,7 +56,7 @@ impl GpodderRepository { pub fn create_session(&self, user: &models::User) -> Result { let session = models::Session { id: rand::thread_rng().gen(), - last_seen: chrono::Utc::now(), + last_seen: Utc::now(), user: user.clone(), }; @@ -100,13 +99,13 @@ impl GpodderRepository { user: &models::User, device_id: &str, urls: Vec, - ) -> Result, AuthErr> { - let time_changed = chrono::Utc::now(); + ) -> Result, AuthErr> { + let time_changed = Utc::now(); self.store .set_subscriptions_for_device(user, device_id, urls, time_changed)?; - Ok(time_changed + chrono::TimeDelta::seconds(1)) + Ok(time_changed + TimeDelta::seconds(1)) } pub fn update_subscriptions_for_device( @@ -115,29 +114,55 @@ impl GpodderRepository { device_id: &str, add: Vec, remove: Vec, - ) -> Result, AuthErr> { - let time_changed = chrono::Utc::now(); + ) -> Result, AuthErr> { + let time_changed = Utc::now(); self.store .update_subscriptions_for_device(user, device_id, add, remove, time_changed)?; - Ok(time_changed + chrono::TimeDelta::seconds(1)) + Ok(time_changed + TimeDelta::seconds(1)) } pub fn subscription_updates_for_device( &self, user: &models::User, device_id: &str, - since: chrono::DateTime, - ) -> Result<(chrono::DateTime, Vec, Vec), AuthErr> { + since: DateTime, + ) -> Result<(DateTime, Vec, Vec), AuthErr> { let (max_time_changed, added, removed) = self .store .subscription_updates_for_device(user, device_id, since)?; - Ok(( - max_time_changed + chrono::TimeDelta::seconds(1), - added, - removed, - )) + Ok((max_time_changed + TimeDelta::seconds(1), added, removed)) + } + + pub fn add_episode_actions( + &self, + user: &models::User, + actions: Vec, + ) -> Result, AuthErr> { + let time_changed = Utc::now(); + + self.store + .add_episode_actions(user, actions, time_changed)?; + + Ok(time_changed + TimeDelta::seconds(1)) + } + + pub fn episode_actions_for_user( + &self, + user: &models::User, + since: Option>, + podcast: Option, + device: Option, + aggregated: bool, + ) -> Result<(DateTime, Vec), AuthErr> { + let now = chrono::Utc::now(); + let actions = self + .store + .episode_actions_for_user(user, since, podcast, device, aggregated)?; + let max_time_changed = actions.iter().map(|a| a.time_changed).max().unwrap_or(now); + + Ok((max_time_changed + TimeDelta::seconds(1), actions)) } } diff --git a/src/server/gpodder/advanced/episodes.rs b/src/server/gpodder/advanced/episodes.rs index b862b9c..22839c0 100644 --- a/src/server/gpodder/advanced/episodes.rs +++ b/src/server/gpodder/advanced/episodes.rs @@ -4,10 +4,11 @@ use axum::{ routing::post, Extension, Json, Router, }; +use chrono::DateTime; use serde::{Deserialize, Serialize}; use crate::{ - gpodder::{self, EpisodeActionRepository}, + gpodder, server::{ error::{AppError, AppResult}, gpodder::{ @@ -43,12 +44,12 @@ async fn post_episode_actions( } Ok( - tokio::task::spawn_blocking(move || ctx.repo.add_episode_actions(&user, actions)) + tokio::task::spawn_blocking(move || ctx.store.add_episode_actions(&user, actions)) .await .unwrap() - .map(|timestamp| { + .map(|time_changed| { Json(UpdatedUrlsResponse { - timestamp, + timestamp: time_changed.timestamp(), update_urls: Vec::new(), }) })?, @@ -84,10 +85,15 @@ async fn get_episode_actions( return Err(AppError::BadRequest); } + let since = filter + .since + .map(|ts| DateTime::from_timestamp(ts, 0)) + .flatten(); + Ok(tokio::task::spawn_blocking(move || { - ctx.repo.episode_actions_for_user( + ctx.store.episode_actions_for_user( &user, - filter.since, + since, filter.podcast, filter.device, filter.aggregated, @@ -95,5 +101,10 @@ async fn get_episode_actions( }) .await .unwrap() - .map(|(timestamp, actions)| Json(EpisodeActionsResponse { timestamp, actions }))?) + .map(|(ts, actions)| { + Json(EpisodeActionsResponse { + timestamp: ts.timestamp(), + actions, + }) + })?) } diff --git a/src/server/mod.rs b/src/server/mod.rs index 0a88fd6..93ac8d3 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -6,7 +6,6 @@ use tower_http::trace::TraceLayer; #[derive(Clone)] pub struct Context { - pub repo: crate::db::SqliteRepository, pub store: crate::gpodder::GpodderRepository, }