From 78a274e01f4f942d2f7ef280328baa778de6e92d Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 16:12:46 +0100 Subject: [PATCH 01/14] feat: add last_seen field to sessions --- src/db/models/session.rs | 15 ++++++++++----- src/db/repository/auth.rs | 1 + src/db/schema.rs | 1 + 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/db/models/session.rs b/src/db/models/session.rs index 10ee4a1..273550a 100644 --- a/src/db/models/session.rs +++ b/src/db/models/session.rs @@ -11,16 +11,21 @@ use crate::db::{schema::*, DbPool, DbResult}; pub struct Session { pub id: i64, pub user_id: i64, + pub last_seen: i64, } impl Session { - pub fn new_for_user(pool: &DbPool, user_id: i64) -> DbResult { + pub fn new_for_user(pool: &DbPool, user_id: i64, last_seen: i64) -> DbResult { let id: i64 = rand::thread_rng().gen(); - Ok(Self { id, user_id } - .insert_into(sessions::table) - .returning(Self::as_returning()) - .get_result(&mut pool.get()?)?) + Ok(Self { + id, + user_id, + last_seen, + } + .insert_into(sessions::table) + .returning(Self::as_returning()) + .get_result(&mut pool.get()?)?) } pub fn user_from_id(pool: &DbPool, id: i64) -> DbResult> { diff --git a/src/db/repository/auth.rs b/src/db/repository/auth.rs index ff30f09..453c1f5 100644 --- a/src/db/repository/auth.rs +++ b/src/db/repository/auth.rs @@ -77,6 +77,7 @@ impl gpodder::AuthRepository for SqliteRepository { let session_id = db::Session { id, user_id: user.id, + last_seen: chrono::Utc::now().timestamp(), } .insert_into(sessions::table) .returning(sessions::id) diff --git a/src/db/schema.rs b/src/db/schema.rs index fe21dfe..2f597f1 100644 --- a/src/db/schema.rs +++ b/src/db/schema.rs @@ -41,6 +41,7 @@ diesel::table! { sessions (id) { id -> BigInt, user_id -> BigInt, + last_seen -> BigInt, } } From 54a723f803150d1060bac6636d8d4c6f7318b25a Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 18:28:40 +0100 Subject: [PATCH 02/14] refactor: moved auth business logic outside of db using store abstraction --- src/cli/serve.rs | 4 +- src/db/repository/auth.rs | 60 +++++++++++++++++++++++ src/gpodder/mod.rs | 16 +++++++ src/gpodder/models.rs | 9 +++- src/gpodder/repository.rs | 74 +++++++++++++++++++++++++++++ src/server/gpodder/advanced/auth.rs | 23 ++++----- src/server/gpodder/mod.rs | 13 ++--- src/server/mod.rs | 1 + 8 files changed, 179 insertions(+), 21 deletions(-) create mode 100644 src/gpodder/repository.rs diff --git a/src/cli/serve.rs b/src/cli/serve.rs index a0ff0fe..1183814 100644 --- a/src/cli/serve.rs +++ b/src/cli/serve.rs @@ -6,9 +6,11 @@ pub fn serve(config: &crate::config::Config) -> u8 { tracing::info!("Initializing database and running migrations"); let pool = db::initialize_db(config.data_dir.join(crate::DB_FILENAME), true).unwrap(); + let repo = db::SqliteRepository::from(pool); let ctx = server::Context { - repo: db::SqliteRepository::from(pool), + repo: repo.clone(), + store: crate::gpodder::GpodderRepository::new(Box::new(repo)), }; let app = server::app(ctx); diff --git a/src/db/repository/auth.rs b/src/db/repository/auth.rs index 453c1f5..e65f046 100644 --- a/src/db/repository/auth.rs +++ b/src/db/repository/auth.rs @@ -1,3 +1,4 @@ +use chrono::DateTime; use diesel::prelude::*; use rand::Rng; @@ -19,6 +20,16 @@ impl From for gpodder::AuthErr { } } +impl From for gpodder::User { + fn from(value: db::User) -> Self { + Self { + id: value.id, + username: value.username, + password_hash: value.password_hash, + } + } +} + impl gpodder::AuthRepository for SqliteRepository { fn validate_credentials( &self, @@ -35,6 +46,7 @@ impl gpodder::AuthRepository for SqliteRepository { Ok(gpodder::User { id: user.id, username: user.username, + password_hash: user.password_hash, }) } else { Err(gpodder::AuthErr::InvalidPassword) @@ -54,6 +66,7 @@ impl gpodder::AuthRepository for SqliteRepository { Ok(user) => Ok(gpodder::User { id: user.id, username: user.username, + password_hash: user.password_hash, }), Err(diesel::result::Error::NotFound) => Err(gpodder::AuthErr::UnknownSession), Err(err) => Err(gpodder::AuthErr::Other(Box::new(err))), @@ -88,6 +101,7 @@ impl gpodder::AuthRepository for SqliteRepository { gpodder::User { id: user.id, username: user.username, + password_hash: user.password_hash, }, )) } else { @@ -122,3 +136,49 @@ impl gpodder::AuthRepository for SqliteRepository { } } } + +impl gpodder::AuthStore for SqliteRepository { + fn get_user(&self, username: &str) -> Result, AuthErr> { + Ok(users::table + .select(db::User::as_select()) + .filter(users::username.eq(username)) + .first(&mut self.pool.get()?) + .optional()? + .map(gpodder::User::from)) + } + + fn get_session(&self, session_id: i64) -> Result, AuthErr> { + match sessions::table + .inner_join(users::table) + .filter(sessions::id.eq(session_id)) + .select((db::Session::as_select(), db::User::as_select())) + .get_result(&mut self.pool.get()?) + { + Ok((session, user)) => Ok(Some(gpodder::Session { + id: session.id, + last_seen: DateTime::from_timestamp(session.last_seen, 0).unwrap(), + user: user.into(), + })), + Err(err) => Err(AuthErr::from(err)), + } + } + + fn remove_session(&self, session_id: i64) -> Result<(), AuthErr> { + Ok( + diesel::delete(sessions::table.filter(sessions::id.eq(session_id))) + .execute(&mut self.pool.get()?) + .map(|_| ())?, + ) + } + + fn insert_session(&self, session: &gpodder::Session) -> Result<(), AuthErr> { + Ok(db::Session { + id: session.id, + user_id: session.user.id, + last_seen: session.last_seen.timestamp(), + } + .insert_into(sessions::table) + .execute(&mut self.pool.get()?) + .map(|_| ())?) + } +} diff --git a/src/gpodder/mod.rs b/src/gpodder/mod.rs index 4ae79d3..567badf 100644 --- a/src/gpodder/mod.rs +++ b/src/gpodder/mod.rs @@ -1,6 +1,8 @@ pub mod models; +mod repository; pub use models::*; +pub use repository::GpodderRepository; pub enum AuthErr { UnknownSession, @@ -27,6 +29,20 @@ pub trait AuthRepository { fn remove_session(&self, username: &str, session_id: i64) -> Result<(), AuthErr>; } +pub trait AuthStore { + /// Retrieve the session with the given session ID + fn get_session(&self, session_id: i64) -> Result, AuthErr>; + + /// Retrieve the user with the given username + fn get_user(&self, username: &str) -> Result, AuthErr>; + + /// Create a new session for a user with the given session ID + fn insert_session(&self, session: &Session) -> Result<(), AuthErr>; + + /// Remove the session with the given session ID + fn remove_session(&self, session_id: i64) -> Result<(), AuthErr>; +} + pub trait DeviceRepository { /// Return all devices associated with the user fn devices_for_user(&self, user: &User) -> Result, AuthErr>; diff --git a/src/gpodder/models.rs b/src/gpodder/models.rs index 39a615c..2ecb004 100644 --- a/src/gpodder/models.rs +++ b/src/gpodder/models.rs @@ -1,10 +1,11 @@ -use chrono::NaiveDateTime; +use chrono::{DateTime, NaiveDateTime, Utc}; use serde::{Deserialize, Serialize}; #[derive(Clone)] pub struct User { pub id: i64, pub username: String, + pub password_hash: String, } #[derive(Serialize, Deserialize)] @@ -57,3 +58,9 @@ pub struct EpisodeAction { #[serde(flatten)] pub action: EpisodeActionType, } + +pub struct Session { + pub id: i64, + pub last_seen: DateTime, + pub user: User, +} diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs new file mode 100644 index 0000000..a6fb0b1 --- /dev/null +++ b/src/gpodder/repository.rs @@ -0,0 +1,74 @@ +use std::sync::Arc; + +use argon2::{Argon2, PasswordHash, PasswordVerifier}; +use rand::Rng; + +use super::{models, AuthErr, AuthStore}; + +const MAX_SESSION_AGE: i64 = 60 * 60 * 24 * 7; + +type Store = dyn AuthStore + Send + Sync; + +#[derive(Clone)] +pub struct GpodderRepository { + store: Arc, +} + +impl GpodderRepository { + pub fn new(store: Box) -> Self { + Self { + store: Arc::from(store), + } + } + + pub fn validate_session(&self, session_id: i64) -> Result { + let session = self + .store + .get_session(session_id)? + .ok_or(AuthErr::UnknownSession)?; + + // Expired sessions still in the database are considered removed + if chrono::Utc::now() - session.last_seen + > chrono::TimeDelta::new(MAX_SESSION_AGE, 0).unwrap() + { + Err(AuthErr::UnknownSession) + } else { + Ok(session) + } + } + + pub fn validate_credentials( + &self, + username: &str, + password: &str, + ) -> Result { + let user = self.store.get_user(username)?.ok_or(AuthErr::UnknownUser)?; + + let password_hash = PasswordHash::new(&user.password_hash).unwrap(); + + if Argon2::default() + .verify_password(password.as_bytes(), &password_hash) + .is_ok() + { + Ok(user) + } else { + Err(AuthErr::InvalidPassword) + } + } + + pub fn create_session(&self, user: &models::User) -> Result { + let session = models::Session { + id: rand::thread_rng().gen(), + last_seen: chrono::Utc::now(), + user: user.clone(), + }; + + self.store.insert_session(&session)?; + + Ok(session) + } + + pub fn remove_session(&self, session_id: i64) -> Result<(), AuthErr> { + self.store.remove_session(session_id) + } +} diff --git a/src/server/gpodder/advanced/auth.rs b/src/server/gpodder/advanced/auth.rs index abdb0d6..ac5cbe6 100644 --- a/src/server/gpodder/advanced/auth.rs +++ b/src/server/gpodder/advanced/auth.rs @@ -12,13 +12,10 @@ use axum_extra::{ TypedHeader, }; -use crate::{ - gpodder::AuthRepository, - server::{ - error::{AppError, AppResult}, - gpodder::SESSION_ID_COOKIE, - Context, - }, +use crate::server::{ + error::{AppError, AppResult}, + gpodder::SESSION_ID_COOKIE, + Context, }; pub fn router() -> Router { @@ -38,14 +35,17 @@ async fn post_login( return Err(AppError::BadRequest); } - let (session_id, _) = tokio::task::spawn_blocking(move || { - ctx.repo.create_session(auth.username(), auth.password()) + let session = tokio::task::spawn_blocking(move || { + let user = ctx + .store + .validate_credentials(auth.username(), auth.password())?; + ctx.store.create_session(&user) }) .await .unwrap()?; Ok(jar.add( - Cookie::build((SESSION_ID_COOKIE, session_id.to_string())).expires(Expiration::Session), + Cookie::build((SESSION_ID_COOKIE, session.id.to_string())).expires(Expiration::Session), )) } @@ -60,7 +60,8 @@ async fn post_logout( .parse() .map_err(|_| AppError::BadRequest)?; - tokio::task::spawn_blocking(move || ctx.repo.remove_session(&username, session_id)) + // TODO reintroduce username check + tokio::task::spawn_blocking(move || ctx.store.remove_session(session_id)) .await .unwrap()?; diff --git a/src/server/gpodder/mod.rs b/src/server/gpodder/mod.rs index 5483935..776100e 100644 --- a/src/server/gpodder/mod.rs +++ b/src/server/gpodder/mod.rs @@ -17,10 +17,7 @@ use axum_extra::{ }; use tower_http::set_header::SetResponseHeaderLayer; -use crate::{ - gpodder::{self, AuthRepository}, - server::error::AppError, -}; +use crate::{gpodder, server::error::AppError}; use super::Context; @@ -51,12 +48,12 @@ pub async fn auth_middleware(State(ctx): State, mut req: Request, next: .and_then(|c| c.value().parse::().ok()) { let ctx_clone = ctx.clone(); - match tokio::task::spawn_blocking(move || ctx_clone.repo.validate_session(session_id)) + match tokio::task::spawn_blocking(move || ctx_clone.store.validate_session(session_id)) .await .unwrap() { - Ok(user) => { - auth_user = Some(user); + Ok(session) => { + auth_user = Some(session.user); } Err(gpodder::AuthErr::UnknownSession) => { jar = jar.add( @@ -77,7 +74,7 @@ pub async fn auth_middleware(State(ctx): State, mut req: Request, next: .await { match tokio::task::spawn_blocking(move || { - ctx.repo + ctx.store .validate_credentials(auth.username(), auth.password()) }) .await diff --git a/src/server/mod.rs b/src/server/mod.rs index 3e7a0e1..0a88fd6 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,6 +7,7 @@ use tower_http::trace::TraceLayer; #[derive(Clone)] pub struct Context { pub repo: crate::db::SqliteRepository, + pub store: crate::gpodder::GpodderRepository, } pub fn app(ctx: Context) -> Router { From 3a5a6759acac45f8532cdd1ed9b709bc83316f97 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 19:01:38 +0100 Subject: [PATCH 03/14] refactor: migrate devices to store --- src/cli/serve.rs | 2 +- src/gpodder/mod.rs | 4 ++++ src/gpodder/repository.rs | 23 +++++++++++++++++------ src/server/gpodder/advanced/devices.rs | 6 +++--- 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/src/cli/serve.rs b/src/cli/serve.rs index 1183814..e1d0fa3 100644 --- a/src/cli/serve.rs +++ b/src/cli/serve.rs @@ -10,7 +10,7 @@ pub fn serve(config: &crate::config::Config) -> u8 { let ctx = server::Context { repo: repo.clone(), - store: crate::gpodder::GpodderRepository::new(Box::new(repo)), + store: crate::gpodder::GpodderRepository::new(repo), }; let app = server::app(ctx); diff --git a/src/gpodder/mod.rs b/src/gpodder/mod.rs index 567badf..eb6daa2 100644 --- a/src/gpodder/mod.rs +++ b/src/gpodder/mod.rs @@ -11,6 +11,10 @@ pub enum AuthErr { Other(Box), } +pub trait Store: AuthStore + DeviceRepository {} + +impl Store for T where T: AuthStore + DeviceRepository {} + pub trait AuthRepository { /// Validate the given session ID and return its user. fn validate_session(&self, session_id: i64) -> Result; diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs index a6fb0b1..9314369 100644 --- a/src/gpodder/repository.rs +++ b/src/gpodder/repository.rs @@ -3,21 +3,19 @@ use std::sync::Arc; use argon2::{Argon2, PasswordHash, PasswordVerifier}; use rand::Rng; -use super::{models, AuthErr, AuthStore}; +use super::{models, AuthErr, Store}; const MAX_SESSION_AGE: i64 = 60 * 60 * 24 * 7; -type Store = dyn AuthStore + Send + Sync; - #[derive(Clone)] pub struct GpodderRepository { - store: Arc, + store: Arc, } impl GpodderRepository { - pub fn new(store: Box) -> Self { + pub fn new(store: impl Store + Send + Sync + 'static) -> Self { Self { - store: Arc::from(store), + store: Arc::new(store), } } @@ -71,4 +69,17 @@ impl GpodderRepository { pub fn remove_session(&self, session_id: i64) -> Result<(), AuthErr> { self.store.remove_session(session_id) } + + pub fn devices_for_user(&self, user: &models::User) -> Result, AuthErr> { + self.store.devices_for_user(user) + } + + pub fn update_device_info( + &self, + user: &models::User, + device_id: &str, + patch: models::DevicePatch, + ) -> Result<(), AuthErr> { + self.store.update_device_info(user, device_id, patch) + } } diff --git a/src/server/gpodder/advanced/devices.rs b/src/server/gpodder/advanced/devices.rs index bb90dc2..397ddb3 100644 --- a/src/server/gpodder/advanced/devices.rs +++ b/src/server/gpodder/advanced/devices.rs @@ -6,7 +6,7 @@ use axum::{ }; use crate::{ - gpodder::{self, DeviceRepository}, + gpodder, server::{ error::{AppError, AppResult}, gpodder::{ @@ -38,7 +38,7 @@ async fn get_devices( } Ok( - tokio::task::spawn_blocking(move || ctx.repo.devices_for_user(&user)) + tokio::task::spawn_blocking(move || ctx.store.devices_for_user(&user)) .await .unwrap() .map(Json)?, @@ -55,7 +55,7 @@ async fn post_device( return Err(AppError::NotFound); } - tokio::task::spawn_blocking(move || ctx.repo.update_device_info(&user, &id, patch)) + tokio::task::spawn_blocking(move || ctx.store.update_device_info(&user, &id, patch)) .await .unwrap()?; From da7befc5c4799109fc45ff478ec8caa58ba3a054 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 19:19:18 +0100 Subject: [PATCH 04/14] refactor: migrate subscriptions API to store --- src/db/repository/subscription.rs | 28 ++++++---- src/gpodder/mod.rs | 14 +++-- src/gpodder/repository.rs | 58 ++++++++++++++++++++ src/server/gpodder/advanced/subscriptions.rs | 17 +++--- src/server/gpodder/simple/subscriptions.rs | 20 +++---- 5 files changed, 101 insertions(+), 36 deletions(-) diff --git a/src/db/repository/subscription.rs b/src/db/repository/subscription.rs index febe9dc..6ab8fe5 100644 --- a/src/db/repository/subscription.rs +++ b/src/db/repository/subscription.rs @@ -42,9 +42,10 @@ impl gpodder::SubscriptionRepository for SqliteRepository { user: &gpodder::User, device_id: &str, urls: Vec, - ) -> Result { + time_changed: chrono::DateTime, + ) -> Result<(), gpodder::AuthErr> { // TODO use a better timestamp - let timestamp = chrono::Utc::now().timestamp(); + let timestamp = time_changed.timestamp(); self.pool.get()?.transaction(|conn| { let device = devices::table @@ -126,7 +127,7 @@ impl gpodder::SubscriptionRepository for SqliteRepository { Ok::<_, diesel::result::Error>(()) })?; - Ok(timestamp + 1) + Ok(()) } fn update_subscriptions_for_device( @@ -135,9 +136,10 @@ impl gpodder::SubscriptionRepository for SqliteRepository { device_id: &str, add: Vec, remove: Vec, - ) -> Result { + time_changed: chrono::DateTime, + ) -> Result<(), gpodder::AuthErr> { // TODO use a better timestamp - let timestamp = chrono::Utc::now().timestamp_millis(); + let timestamp = time_changed.timestamp(); // TODO URLs that are in both the added and removed lists will currently get "re-added", // meaning their change timestamp will be updated even though they haven't really changed. @@ -220,16 +222,18 @@ impl gpodder::SubscriptionRepository for SqliteRepository { Ok::<_, diesel::result::Error>(()) })?; - Ok(timestamp + 1) + Ok(()) } fn subscription_updates_for_device( &self, user: &gpodder::User, device_id: &str, - since: i64, - ) -> Result<(i64, Vec, Vec), gpodder::AuthErr> { - let (mut timestamp, mut added, mut removed) = (0, Vec::new(), Vec::new()); + since: chrono::DateTime, + ) -> Result<(chrono::DateTime, Vec, Vec), gpodder::AuthErr> { + let since = since.timestamp(); + + let (mut added, mut removed) = (Vec::new(), Vec::new()); let query = device_subscriptions::table .inner_join(devices::table) @@ -241,6 +245,8 @@ impl gpodder::SubscriptionRepository for SqliteRepository { ) .select(db::DeviceSubscription::as_select()); + let mut max_time: chrono::DateTime = chrono::DateTime::::MIN_UTC; + for sub in query.load_iter(&mut self.pool.get()?)? { let sub = sub?; @@ -250,9 +256,9 @@ impl gpodder::SubscriptionRepository for SqliteRepository { added.push(sub.podcast_url); } - timestamp = timestamp.max(sub.time_changed); + max_time = max_time.max(chrono::DateTime::from_timestamp(sub.time_changed, 0).unwrap()); } - Ok((timestamp + 1, added, removed)) + Ok((max_time, added, removed)) } } diff --git a/src/gpodder/mod.rs b/src/gpodder/mod.rs index eb6daa2..18f23b2 100644 --- a/src/gpodder/mod.rs +++ b/src/gpodder/mod.rs @@ -11,9 +11,9 @@ pub enum AuthErr { Other(Box), } -pub trait Store: AuthStore + DeviceRepository {} +pub trait Store: AuthStore + DeviceRepository + SubscriptionRepository {} -impl Store for T where T: AuthStore + DeviceRepository {} +impl Store for T where T: AuthStore + DeviceRepository + SubscriptionRepository {} pub trait AuthRepository { /// Validate the given session ID and return its user. @@ -78,7 +78,8 @@ pub trait SubscriptionRepository { user: &User, device_id: &str, urls: Vec, - ) -> Result; + time_changed: chrono::DateTime, + ) -> Result<(), AuthErr>; /// Update the list of subscriptions for a device by adding and removing the given URLs fn update_subscriptions_for_device( @@ -87,15 +88,16 @@ pub trait SubscriptionRepository { device_id: &str, add: Vec, remove: Vec, - ) -> Result; + time_changed: chrono::DateTime, + ) -> Result<(), AuthErr>; /// Returns the changes in subscriptions since the given timestamp. fn subscription_updates_for_device( &self, user: &User, device_id: &str, - since: i64, - ) -> Result<(i64, Vec, Vec), AuthErr>; + since: chrono::DateTime, + ) -> Result<(chrono::DateTime, Vec, Vec), AuthErr>; } pub trait EpisodeActionRepository { diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs index 9314369..e0d9d12 100644 --- a/src/gpodder/repository.rs +++ b/src/gpodder/repository.rs @@ -82,4 +82,62 @@ impl GpodderRepository { ) -> Result<(), AuthErr> { self.store.update_device_info(user, device_id, patch) } + + pub fn subscriptions_for_device( + &self, + user: &models::User, + device_id: &str, + ) -> Result, AuthErr> { + self.store.subscriptions_for_device(user, device_id) + } + + pub fn subscriptions_for_user(&self, user: &models::User) -> Result, AuthErr> { + self.store.subscriptions_for_user(user) + } + + pub fn set_subscriptions_for_device( + &self, + user: &models::User, + device_id: &str, + urls: Vec, + ) -> Result, AuthErr> { + let time_changed = chrono::Utc::now(); + + self.store + .set_subscriptions_for_device(user, device_id, urls, time_changed)?; + + Ok(time_changed + chrono::TimeDelta::seconds(1)) + } + + pub fn update_subscriptions_for_device( + &self, + user: &models::User, + device_id: &str, + add: Vec, + remove: Vec, + ) -> Result, AuthErr> { + let time_changed = chrono::Utc::now(); + + self.store + .update_subscriptions_for_device(user, device_id, add, remove, time_changed)?; + + Ok(time_changed + chrono::TimeDelta::seconds(1)) + } + + pub fn subscription_updates_for_device( + &self, + user: &models::User, + device_id: &str, + since: chrono::DateTime, + ) -> Result<(chrono::DateTime, Vec, Vec), AuthErr> { + let (max_time_changed, added, removed) = self + .store + .subscription_updates_for_device(user, device_id, since)?; + + Ok(( + max_time_changed + chrono::TimeDelta::seconds(1), + added, + removed, + )) + } } diff --git a/src/server/gpodder/advanced/subscriptions.rs b/src/server/gpodder/advanced/subscriptions.rs index 98d464a..dbb8d28 100644 --- a/src/server/gpodder/advanced/subscriptions.rs +++ b/src/server/gpodder/advanced/subscriptions.rs @@ -7,7 +7,7 @@ use axum::{ use serde::Deserialize; use crate::{ - gpodder::{self, SubscriptionRepository}, + gpodder, server::{ error::{AppError, AppResult}, gpodder::{ @@ -43,14 +43,14 @@ pub async fn post_subscription_changes( } Ok(tokio::task::spawn_blocking(move || { - ctx.repo + ctx.store .update_subscriptions_for_device(&user, &id, delta.add, delta.remove) }) .await .unwrap() - .map(|timestamp| { + .map(|time_changed| { Json(UpdatedUrlsResponse { - timestamp, + timestamp: time_changed.timestamp(), update_urls: Vec::new(), }) })?) @@ -76,17 +76,18 @@ pub async fn get_subscription_changes( return Err(AppError::BadRequest); } + let since = chrono::DateTime::from_timestamp(query.since, 0).ok_or(AppError::BadRequest)?; + Ok(tokio::task::spawn_blocking(move || { - ctx.repo - .subscription_updates_for_device(&user, &id, query.since) + ctx.store.subscription_updates_for_device(&user, &id, since) }) .await .unwrap() - .map(|(timestamp, add, remove)| { + .map(|(next_time_changed, add, remove)| { Json(SubscriptionDeltaResponse { add, remove, - timestamp, + timestamp: next_time_changed.timestamp(), }) })?) } diff --git a/src/server/gpodder/simple/subscriptions.rs b/src/server/gpodder/simple/subscriptions.rs index 4e6f7c6..4f6266f 100644 --- a/src/server/gpodder/simple/subscriptions.rs +++ b/src/server/gpodder/simple/subscriptions.rs @@ -6,7 +6,7 @@ use axum::{ }; use crate::{ - gpodder::{self, SubscriptionRepository}, + gpodder, server::{ error::{AppError, AppResult}, gpodder::{auth_middleware, format::StringWithFormat}, @@ -34,7 +34,7 @@ pub async fn get_device_subscriptions( } Ok( - tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_device(&user, &id)) + tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_device(&user, &id)) .await .unwrap() .map(Json)?, @@ -51,7 +51,7 @@ pub async fn get_user_subscriptions( } Ok( - tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_user(&user)) + tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_user(&user)) .await .unwrap() .map(Json)?, @@ -68,12 +68,10 @@ pub async fn put_device_subscriptions( return Err(AppError::BadRequest); } - Ok( - tokio::task::spawn_blocking(move || { - ctx.repo.set_subscriptions_for_device(&user, &id, urls) - }) - .await - .unwrap() - .map(|_| ())?, - ) + Ok(tokio::task::spawn_blocking(move || { + ctx.store.set_subscriptions_for_device(&user, &id, urls) + }) + .await + .unwrap() + .map(|_| ())?) } From 8a5e625e6f8ed47d3ffb3971ee5461e89866cf2e Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 19:34:27 +0100 Subject: [PATCH 05/14] refactor: migrated episode actions API to store --- src/cli/serve.rs | 1 - src/db/repository/episode_action.rs | 28 ++++++------ src/gpodder/mod.rs | 31 ++++++++----- src/gpodder/models.rs | 5 ++- src/gpodder/repository.rs | 59 ++++++++++++++++++------- src/server/gpodder/advanced/episodes.rs | 25 ++++++++--- src/server/mod.rs | 1 - 7 files changed, 97 insertions(+), 53 deletions(-) diff --git a/src/cli/serve.rs b/src/cli/serve.rs index e1d0fa3..664a1e0 100644 --- a/src/cli/serve.rs +++ b/src/cli/serve.rs @@ -9,7 +9,6 @@ pub fn serve(config: &crate::config::Config) -> u8 { let repo = db::SqliteRepository::from(pool); let ctx = server::Context { - repo: repo.clone(), store: crate::gpodder::GpodderRepository::new(repo), }; let app = server::app(ctx); diff --git a/src/db/repository/episode_action.rs b/src/db/repository/episode_action.rs index 3f66bc9..5a45cd2 100644 --- a/src/db/repository/episode_action.rs +++ b/src/db/repository/episode_action.rs @@ -1,4 +1,4 @@ -use chrono::DateTime; +use chrono::{DateTime, Utc}; use diesel::prelude::*; use super::SqliteRepository; @@ -26,7 +26,7 @@ impl From for db::NewEpisodeAction { podcast_url: value.podcast, episode_url: value.episode, time_changed: 0, - timestamp: value.timestamp.map(|t| t.and_utc().timestamp()), + timestamp: value.timestamp.map(|t| t.timestamp()), action, started, position, @@ -58,7 +58,8 @@ impl From<(Option, db::EpisodeAction)> for gpodder::EpisodeAction { // SAFETY the input to the from_timestamp function is always the result of a // previous timestamp() function call, which is guaranteed to be each other's // reverse - .map(|ts| DateTime::from_timestamp(ts, 0).unwrap().naive_utc()), + .map(|ts| DateTime::from_timestamp(ts, 0).unwrap()), + time_changed: DateTime::from_timestamp(db_action.time_changed, 0).unwrap(), device: device_id, action, } @@ -70,8 +71,9 @@ impl gpodder::EpisodeActionRepository for SqliteRepository { &self, user: &gpodder::User, actions: Vec, - ) -> Result { - let time_changed = chrono::Utc::now().timestamp(); + time_changed: DateTime, + ) -> Result<(), gpodder::AuthErr> { + let time_changed = time_changed.timestamp(); // TODO optimize this query // 1. The lookup for a device could be replaced with a subquery, although Diesel seems to @@ -99,17 +101,18 @@ impl gpodder::EpisodeActionRepository for SqliteRepository { Ok::<_, diesel::result::Error>(()) })?; - Ok(time_changed + 1) + Ok(()) } fn episode_actions_for_user( &self, user: &gpodder::User, - since: Option, + since: Option>, podcast: Option, device: Option, aggregated: bool, - ) -> Result<(i64, Vec), gpodder::AuthErr> { + ) -> Result, gpodder::AuthErr> { + let since = since.map(|ts| ts.timestamp()).unwrap_or(0); let conn = &mut self.pool.get()?; let mut query = episode_actions::table @@ -117,7 +120,7 @@ impl gpodder::EpisodeActionRepository for SqliteRepository { .filter( episode_actions::user_id .eq(user.id) - .and(episode_actions::time_changed.ge(since.unwrap_or(0))), + .and(episode_actions::time_changed.ge(since)), ) .select(( devices::device_id.nullable(), @@ -157,16 +160,11 @@ impl gpodder::EpisodeActionRepository for SqliteRepository { query.get_results(conn)? }; - let max_timestamp = db_actions - .iter() - .map(|(_, a)| a.time_changed) - .max() - .unwrap_or(0); let actions = db_actions .into_iter() .map(gpodder::EpisodeAction::from) .collect(); - Ok((max_timestamp + 1, actions)) + Ok(actions) } } diff --git a/src/gpodder/mod.rs b/src/gpodder/mod.rs index 18f23b2..0e89cb9 100644 --- a/src/gpodder/mod.rs +++ b/src/gpodder/mod.rs @@ -1,6 +1,7 @@ pub mod models; mod repository; +use chrono::{DateTime, Utc}; pub use models::*; pub use repository::GpodderRepository; @@ -11,9 +12,15 @@ pub enum AuthErr { Other(Box), } -pub trait Store: AuthStore + DeviceRepository + SubscriptionRepository {} +pub trait Store: + AuthStore + DeviceRepository + SubscriptionRepository + EpisodeActionRepository +{ +} -impl Store for T where T: AuthStore + DeviceRepository + SubscriptionRepository {} +impl Store for T where + T: AuthStore + DeviceRepository + SubscriptionRepository + EpisodeActionRepository +{ +} pub trait AuthRepository { /// Validate the given session ID and return its user. @@ -78,7 +85,7 @@ pub trait SubscriptionRepository { user: &User, device_id: &str, urls: Vec, - time_changed: chrono::DateTime, + time_changed: DateTime, ) -> Result<(), AuthErr>; /// Update the list of subscriptions for a device by adding and removing the given URLs @@ -88,7 +95,7 @@ pub trait SubscriptionRepository { device_id: &str, add: Vec, remove: Vec, - time_changed: chrono::DateTime, + time_changed: DateTime, ) -> Result<(), AuthErr>; /// Returns the changes in subscriptions since the given timestamp. @@ -96,22 +103,26 @@ pub trait SubscriptionRepository { &self, user: &User, device_id: &str, - since: chrono::DateTime, - ) -> Result<(chrono::DateTime, Vec, Vec), AuthErr>; + since: DateTime, + ) -> Result<(DateTime, Vec, Vec), AuthErr>; } pub trait EpisodeActionRepository { /// Insert the given episode actions into the datastore. - fn add_episode_actions(&self, user: &User, actions: Vec) - -> Result; + fn add_episode_actions( + &self, + user: &User, + actions: Vec, + time_changed: DateTime, + ) -> Result<(), AuthErr>; /// Retrieve the list of episode actions for the given user. fn episode_actions_for_user( &self, user: &User, - since: Option, + since: Option>, podcast: Option, device: Option, aggregated: bool, - ) -> Result<(i64, Vec), AuthErr>; + ) -> Result, AuthErr>; } diff --git a/src/gpodder/models.rs b/src/gpodder/models.rs index 2ecb004..1c9e550 100644 --- a/src/gpodder/models.rs +++ b/src/gpodder/models.rs @@ -1,4 +1,4 @@ -use chrono::{DateTime, NaiveDateTime, Utc}; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; #[derive(Clone)] @@ -52,7 +52,8 @@ pub enum EpisodeActionType { pub struct EpisodeAction { pub podcast: String, pub episode: String, - pub timestamp: Option, + pub timestamp: Option>, + pub time_changed: DateTime, #[serde(default)] pub device: Option, #[serde(flatten)] diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs index e0d9d12..9913c9d 100644 --- a/src/gpodder/repository.rs +++ b/src/gpodder/repository.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use argon2::{Argon2, PasswordHash, PasswordVerifier}; +use chrono::{DateTime, TimeDelta, Utc}; use rand::Rng; use super::{models, AuthErr, Store}; @@ -26,9 +27,7 @@ impl GpodderRepository { .ok_or(AuthErr::UnknownSession)?; // Expired sessions still in the database are considered removed - if chrono::Utc::now() - session.last_seen - > chrono::TimeDelta::new(MAX_SESSION_AGE, 0).unwrap() - { + if Utc::now() - session.last_seen > TimeDelta::new(MAX_SESSION_AGE, 0).unwrap() { Err(AuthErr::UnknownSession) } else { Ok(session) @@ -57,7 +56,7 @@ impl GpodderRepository { pub fn create_session(&self, user: &models::User) -> Result { let session = models::Session { id: rand::thread_rng().gen(), - last_seen: chrono::Utc::now(), + last_seen: Utc::now(), user: user.clone(), }; @@ -100,13 +99,13 @@ impl GpodderRepository { user: &models::User, device_id: &str, urls: Vec, - ) -> Result, AuthErr> { - let time_changed = chrono::Utc::now(); + ) -> Result, AuthErr> { + let time_changed = Utc::now(); self.store .set_subscriptions_for_device(user, device_id, urls, time_changed)?; - Ok(time_changed + chrono::TimeDelta::seconds(1)) + Ok(time_changed + TimeDelta::seconds(1)) } pub fn update_subscriptions_for_device( @@ -115,29 +114,55 @@ impl GpodderRepository { device_id: &str, add: Vec, remove: Vec, - ) -> Result, AuthErr> { - let time_changed = chrono::Utc::now(); + ) -> Result, AuthErr> { + let time_changed = Utc::now(); self.store .update_subscriptions_for_device(user, device_id, add, remove, time_changed)?; - Ok(time_changed + chrono::TimeDelta::seconds(1)) + Ok(time_changed + TimeDelta::seconds(1)) } pub fn subscription_updates_for_device( &self, user: &models::User, device_id: &str, - since: chrono::DateTime, - ) -> Result<(chrono::DateTime, Vec, Vec), AuthErr> { + since: DateTime, + ) -> Result<(DateTime, Vec, Vec), AuthErr> { let (max_time_changed, added, removed) = self .store .subscription_updates_for_device(user, device_id, since)?; - Ok(( - max_time_changed + chrono::TimeDelta::seconds(1), - added, - removed, - )) + Ok((max_time_changed + TimeDelta::seconds(1), added, removed)) + } + + pub fn add_episode_actions( + &self, + user: &models::User, + actions: Vec, + ) -> Result, AuthErr> { + let time_changed = Utc::now(); + + self.store + .add_episode_actions(user, actions, time_changed)?; + + Ok(time_changed + TimeDelta::seconds(1)) + } + + pub fn episode_actions_for_user( + &self, + user: &models::User, + since: Option>, + podcast: Option, + device: Option, + aggregated: bool, + ) -> Result<(DateTime, Vec), AuthErr> { + let now = chrono::Utc::now(); + let actions = self + .store + .episode_actions_for_user(user, since, podcast, device, aggregated)?; + let max_time_changed = actions.iter().map(|a| a.time_changed).max().unwrap_or(now); + + Ok((max_time_changed + TimeDelta::seconds(1), actions)) } } diff --git a/src/server/gpodder/advanced/episodes.rs b/src/server/gpodder/advanced/episodes.rs index b862b9c..22839c0 100644 --- a/src/server/gpodder/advanced/episodes.rs +++ b/src/server/gpodder/advanced/episodes.rs @@ -4,10 +4,11 @@ use axum::{ routing::post, Extension, Json, Router, }; +use chrono::DateTime; use serde::{Deserialize, Serialize}; use crate::{ - gpodder::{self, EpisodeActionRepository}, + gpodder, server::{ error::{AppError, AppResult}, gpodder::{ @@ -43,12 +44,12 @@ async fn post_episode_actions( } Ok( - tokio::task::spawn_blocking(move || ctx.repo.add_episode_actions(&user, actions)) + tokio::task::spawn_blocking(move || ctx.store.add_episode_actions(&user, actions)) .await .unwrap() - .map(|timestamp| { + .map(|time_changed| { Json(UpdatedUrlsResponse { - timestamp, + timestamp: time_changed.timestamp(), update_urls: Vec::new(), }) })?, @@ -84,10 +85,15 @@ async fn get_episode_actions( return Err(AppError::BadRequest); } + let since = filter + .since + .map(|ts| DateTime::from_timestamp(ts, 0)) + .flatten(); + Ok(tokio::task::spawn_blocking(move || { - ctx.repo.episode_actions_for_user( + ctx.store.episode_actions_for_user( &user, - filter.since, + since, filter.podcast, filter.device, filter.aggregated, @@ -95,5 +101,10 @@ async fn get_episode_actions( }) .await .unwrap() - .map(|(timestamp, actions)| Json(EpisodeActionsResponse { timestamp, actions }))?) + .map(|(ts, actions)| { + Json(EpisodeActionsResponse { + timestamp: ts.timestamp(), + actions, + }) + })?) } diff --git a/src/server/mod.rs b/src/server/mod.rs index 0a88fd6..93ac8d3 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -6,7 +6,6 @@ use tower_http::trace::TraceLayer; #[derive(Clone)] pub struct Context { - pub repo: crate::db::SqliteRepository, pub store: crate::gpodder::GpodderRepository, } From bd51c1c768b2c3f1b2c52b8325e7f2e7a98cda1c Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 20:06:58 +0100 Subject: [PATCH 06/14] refactor: decoupled gpodder and server models --- src/gpodder/models.rs | 13 -- src/server/gpodder/advanced/devices.rs | 9 +- src/server/gpodder/advanced/episodes.rs | 30 +++-- src/server/gpodder/models.rs | 164 ++++++++++++++++++++++++ 4 files changed, 185 insertions(+), 31 deletions(-) diff --git a/src/gpodder/models.rs b/src/gpodder/models.rs index 1c9e550..3bbf229 100644 --- a/src/gpodder/models.rs +++ b/src/gpodder/models.rs @@ -1,5 +1,4 @@ use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; #[derive(Clone)] pub struct User { @@ -8,8 +7,6 @@ pub struct User { pub password_hash: String, } -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] pub enum DeviceType { Desktop, Laptop, @@ -18,7 +15,6 @@ pub enum DeviceType { Other, } -#[derive(Serialize)] pub struct Device { pub id: String, pub caption: String, @@ -26,37 +22,28 @@ pub struct Device { pub subscriptions: i64, } -#[derive(Deserialize)] pub struct DevicePatch { pub caption: Option, pub r#type: Option, } -#[derive(Serialize, Deserialize, Debug)] -#[serde(rename_all = "lowercase")] -#[serde(tag = "action")] pub enum EpisodeActionType { Download, Play { - #[serde(default)] started: Option, position: i32, - #[serde(default)] total: Option, }, Delete, New, } -#[derive(Serialize, Deserialize, Debug)] pub struct EpisodeAction { pub podcast: String, pub episode: String, pub timestamp: Option>, pub time_changed: DateTime, - #[serde(default)] pub device: Option, - #[serde(flatten)] pub action: EpisodeActionType, } diff --git a/src/server/gpodder/advanced/devices.rs b/src/server/gpodder/advanced/devices.rs index 397ddb3..859e1c1 100644 --- a/src/server/gpodder/advanced/devices.rs +++ b/src/server/gpodder/advanced/devices.rs @@ -12,6 +12,7 @@ use crate::{ gpodder::{ auth_middleware, format::{Format, StringWithFormat}, + models, }, Context, }, @@ -28,7 +29,7 @@ async fn get_devices( State(ctx): State, Path(username): Path, Extension(user): Extension, -) -> AppResult>> { +) -> AppResult>> { if username.format != Format::Json { return Err(AppError::NotFound); } @@ -41,7 +42,7 @@ async fn get_devices( tokio::task::spawn_blocking(move || ctx.store.devices_for_user(&user)) .await .unwrap() - .map(Json)?, + .map(|devices| Json(devices.into_iter().map(models::Device::from).collect()))?, ) } @@ -49,13 +50,13 @@ async fn post_device( State(ctx): State, Path((_username, id)): Path<(String, StringWithFormat)>, Extension(user): Extension, - Json(patch): Json, + Json(patch): Json, ) -> AppResult<()> { if id.format != Format::Json { return Err(AppError::NotFound); } - tokio::task::spawn_blocking(move || ctx.store.update_device_info(&user, &id, patch)) + tokio::task::spawn_blocking(move || ctx.store.update_device_info(&user, &id, patch.into())) .await .unwrap()?; diff --git a/src/server/gpodder/advanced/episodes.rs b/src/server/gpodder/advanced/episodes.rs index 22839c0..6c7ff32 100644 --- a/src/server/gpodder/advanced/episodes.rs +++ b/src/server/gpodder/advanced/episodes.rs @@ -14,6 +14,7 @@ use crate::{ gpodder::{ auth_middleware, format::{Format, StringWithFormat}, + models, models::UpdatedUrlsResponse, }, Context, @@ -33,7 +34,7 @@ async fn post_episode_actions( State(ctx): State, Path(username): Path, Extension(user): Extension, - Json(actions): Json>, + Json(actions): Json>, ) -> AppResult> { if username.format != Format::Json { return Err(AppError::NotFound); @@ -43,17 +44,18 @@ async fn post_episode_actions( return Err(AppError::BadRequest); } - Ok( - tokio::task::spawn_blocking(move || ctx.store.add_episode_actions(&user, actions)) - .await - .unwrap() - .map(|time_changed| { - Json(UpdatedUrlsResponse { - timestamp: time_changed.timestamp(), - update_urls: Vec::new(), - }) - })?, - ) + Ok(tokio::task::spawn_blocking(move || { + ctx.store + .add_episode_actions(&user, actions.into_iter().map(Into::into).collect()) + }) + .await + .unwrap() + .map(|time_changed| { + Json(UpdatedUrlsResponse { + timestamp: time_changed.timestamp(), + update_urls: Vec::new(), + }) + })?) } #[derive(Deserialize, Default)] @@ -68,7 +70,7 @@ struct FilterQuery { #[derive(Serialize)] struct EpisodeActionsResponse { timestamp: i64, - actions: Vec, + actions: Vec, } async fn get_episode_actions( @@ -104,7 +106,7 @@ async fn get_episode_actions( .map(|(ts, actions)| { Json(EpisodeActionsResponse { timestamp: ts.timestamp(), - actions, + actions: actions.into_iter().map(Into::into).collect(), }) })?) } diff --git a/src/server/gpodder/models.rs b/src/server/gpodder/models.rs index 2be8f87..b2268af 100644 --- a/src/server/gpodder/models.rs +++ b/src/server/gpodder/models.rs @@ -1,5 +1,8 @@ +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use crate::gpodder; + #[derive(Deserialize, Debug)] pub struct SubscriptionDelta { pub add: Vec, @@ -18,3 +21,164 @@ pub struct UpdatedUrlsResponse { pub timestamp: i64, pub update_urls: Vec<(String, String)>, } + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum DeviceType { + Desktop, + Laptop, + Mobile, + Server, + Other, +} + +#[derive(Serialize)] +pub struct Device { + pub id: String, + pub caption: String, + pub r#type: DeviceType, + pub subscriptions: i64, +} + +#[derive(Deserialize)] +pub struct DevicePatch { + pub caption: Option, + pub r#type: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "lowercase")] +#[serde(tag = "action")] +pub enum EpisodeActionType { + Download, + Play { + #[serde(default)] + started: Option, + position: i32, + #[serde(default)] + total: Option, + }, + Delete, + New, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct EpisodeAction { + pub podcast: String, + pub episode: String, + pub timestamp: Option, + #[serde(default)] + pub device: Option, + #[serde(flatten)] + pub action: EpisodeActionType, +} + +impl From for DeviceType { + fn from(value: gpodder::DeviceType) -> Self { + match value { + gpodder::DeviceType::Other => Self::Other, + gpodder::DeviceType::Laptop => Self::Laptop, + gpodder::DeviceType::Mobile => Self::Mobile, + gpodder::DeviceType::Server => Self::Server, + gpodder::DeviceType::Desktop => Self::Desktop, + } + } +} + +impl From for gpodder::DeviceType { + fn from(value: DeviceType) -> Self { + match value { + DeviceType::Other => gpodder::DeviceType::Other, + DeviceType::Laptop => gpodder::DeviceType::Laptop, + DeviceType::Mobile => gpodder::DeviceType::Mobile, + DeviceType::Server => gpodder::DeviceType::Server, + DeviceType::Desktop => gpodder::DeviceType::Desktop, + } + } +} + +impl From for Device { + fn from(value: gpodder::Device) -> Self { + Self { + id: value.id, + caption: value.caption, + r#type: value.r#type.into(), + subscriptions: value.subscriptions, + } + } +} + +impl From for gpodder::DevicePatch { + fn from(value: DevicePatch) -> Self { + Self { + caption: value.caption, + r#type: value.r#type.map(Into::into), + } + } +} + +impl From for EpisodeActionType { + fn from(value: gpodder::EpisodeActionType) -> Self { + match value { + gpodder::EpisodeActionType::New => Self::New, + gpodder::EpisodeActionType::Delete => Self::Delete, + gpodder::EpisodeActionType::Download => Self::Download, + gpodder::EpisodeActionType::Play { + started, + position, + total, + } => Self::Play { + started, + position, + total, + }, + } + } +} + +impl From for gpodder::EpisodeActionType { + fn from(value: EpisodeActionType) -> Self { + match value { + EpisodeActionType::New => gpodder::EpisodeActionType::New, + EpisodeActionType::Delete => gpodder::EpisodeActionType::Delete, + EpisodeActionType::Download => gpodder::EpisodeActionType::Download, + EpisodeActionType::Play { + started, + position, + total, + } => gpodder::EpisodeActionType::Play { + started, + position, + total, + }, + } + } +} + +impl From for EpisodeAction { + fn from(value: gpodder::EpisodeAction) -> Self { + Self { + podcast: value.podcast, + episode: value.episode, + timestamp: value.timestamp.map(|ts| ts.timestamp()), + device: value.device, + action: value.action.into(), + } + } +} + +impl From for gpodder::EpisodeAction { + fn from(value: EpisodeAction) -> Self { + Self { + podcast: value.podcast, + episode: value.episode, + // TODO remove this unwrap + timestamp: value + .timestamp + .map(|ts| DateTime::from_timestamp(ts, 0).unwrap()), + device: value.device, + action: value.action.into(), + time_changed: DateTime::::MIN_UTC, + } + } +} From 78420eed68ee4fec64e58d7310c6817a47a5a107 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 20:20:38 +0100 Subject: [PATCH 07/14] refactor: moved knowledge of subscription change time to store --- src/db/repository/subscription.rs | 52 ++++++++++++++------ src/gpodder/mod.rs | 6 +-- src/gpodder/models.rs | 5 ++ src/gpodder/repository.rs | 26 ++++++++-- src/server/gpodder/advanced/subscriptions.rs | 4 +- src/server/gpodder/simple/subscriptions.rs | 4 +- 6 files changed, 72 insertions(+), 25 deletions(-) diff --git a/src/db/repository/subscription.rs b/src/db/repository/subscription.rs index 6ab8fe5..5448380 100644 --- a/src/db/repository/subscription.rs +++ b/src/db/repository/subscription.rs @@ -1,5 +1,6 @@ use std::collections::HashSet; +use chrono::DateTime; use diesel::prelude::*; use super::SqliteRepository; @@ -8,24 +9,39 @@ use crate::{ gpodder, }; +impl From<(String, i64)> for gpodder::Subscription { + fn from((url, ts): (String, i64)) -> Self { + Self { + url, + time_changed: DateTime::from_timestamp(ts, 0).unwrap(), + } + } +} + impl gpodder::SubscriptionRepository for SqliteRepository { fn subscriptions_for_user( &self, user: &gpodder::User, - ) -> Result, gpodder::AuthErr> { + ) -> Result, gpodder::AuthErr> { Ok(device_subscriptions::table .inner_join(devices::table) .filter(devices::user_id.eq(user.id)) - .select(device_subscriptions::podcast_url) + .select(( + device_subscriptions::podcast_url, + device_subscriptions::time_changed, + )) .distinct() - .get_results(&mut self.pool.get()?)?) + .get_results::<(String, i64)>(&mut self.pool.get()?)? + .into_iter() + .map(Into::into) + .collect()) } fn subscriptions_for_device( &self, user: &gpodder::User, device_id: &str, - ) -> Result, gpodder::AuthErr> { + ) -> Result, gpodder::AuthErr> { Ok(device_subscriptions::table .inner_join(devices::table) .filter( @@ -33,8 +49,14 @@ impl gpodder::SubscriptionRepository for SqliteRepository { .eq(user.id) .and(devices::device_id.eq(device_id)), ) - .select(device_subscriptions::podcast_url) - .get_results(&mut self.pool.get()?)?) + .select(( + device_subscriptions::podcast_url, + device_subscriptions::time_changed, + )) + .get_results::<(String, i64)>(&mut self.pool.get()?)? + .into_iter() + .map(Into::into) + .collect()) } fn set_subscriptions_for_device( @@ -230,7 +252,7 @@ impl gpodder::SubscriptionRepository for SqliteRepository { user: &gpodder::User, device_id: &str, since: chrono::DateTime, - ) -> Result<(chrono::DateTime, Vec, Vec), gpodder::AuthErr> { + ) -> Result<(Vec, Vec), gpodder::AuthErr> { let since = since.timestamp(); let (mut added, mut removed) = (Vec::new(), Vec::new()); @@ -245,20 +267,22 @@ impl gpodder::SubscriptionRepository for SqliteRepository { ) .select(db::DeviceSubscription::as_select()); - let mut max_time: chrono::DateTime = chrono::DateTime::::MIN_UTC; - for sub in query.load_iter(&mut self.pool.get()?)? { let sub = sub?; if sub.deleted { - removed.push(sub.podcast_url); + removed.push(gpodder::Subscription { + url: sub.podcast_url, + time_changed: DateTime::from_timestamp(sub.time_changed, 0).unwrap(), + }); } else { - added.push(sub.podcast_url); + added.push(gpodder::Subscription { + url: sub.podcast_url, + time_changed: DateTime::from_timestamp(sub.time_changed, 0).unwrap(), + }); } - - max_time = max_time.max(chrono::DateTime::from_timestamp(sub.time_changed, 0).unwrap()); } - Ok((max_time, added, removed)) + Ok((added, removed)) } } diff --git a/src/gpodder/mod.rs b/src/gpodder/mod.rs index 0e89cb9..8336de1 100644 --- a/src/gpodder/mod.rs +++ b/src/gpodder/mod.rs @@ -74,10 +74,10 @@ pub trait SubscriptionRepository { &self, user: &User, device_id: &str, - ) -> Result, AuthErr>; + ) -> Result, AuthErr>; /// Return all subscriptions for a given user - fn subscriptions_for_user(&self, user: &User) -> Result, AuthErr>; + fn subscriptions_for_user(&self, user: &User) -> Result, AuthErr>; /// Replace the list of subscriptions for a device with the given list fn set_subscriptions_for_device( @@ -104,7 +104,7 @@ pub trait SubscriptionRepository { user: &User, device_id: &str, since: DateTime, - ) -> Result<(DateTime, Vec, Vec), AuthErr>; + ) -> Result<(Vec, Vec), AuthErr>; } pub trait EpisodeActionRepository { diff --git a/src/gpodder/models.rs b/src/gpodder/models.rs index 3bbf229..b68e73b 100644 --- a/src/gpodder/models.rs +++ b/src/gpodder/models.rs @@ -52,3 +52,8 @@ pub struct Session { pub last_seen: DateTime, pub user: User, } + +pub struct Subscription { + pub url: String, + pub time_changed: DateTime, +} diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs index 9913c9d..411f9cf 100644 --- a/src/gpodder/repository.rs +++ b/src/gpodder/repository.rs @@ -86,11 +86,14 @@ impl GpodderRepository { &self, user: &models::User, device_id: &str, - ) -> Result, AuthErr> { + ) -> Result, AuthErr> { self.store.subscriptions_for_device(user, device_id) } - pub fn subscriptions_for_user(&self, user: &models::User) -> Result, AuthErr> { + pub fn subscriptions_for_user( + &self, + user: &models::User, + ) -> Result, AuthErr> { self.store.subscriptions_for_user(user) } @@ -128,11 +131,26 @@ impl GpodderRepository { user: &models::User, device_id: &str, since: DateTime, - ) -> Result<(DateTime, Vec, Vec), AuthErr> { - let (max_time_changed, added, removed) = self + ) -> Result< + ( + DateTime, + Vec, + Vec, + ), + AuthErr, + > { + let now = chrono::Utc::now(); + + let (added, removed) = self .store .subscription_updates_for_device(user, device_id, since)?; + let max_time_changed = added + .iter() + .chain(removed.iter()) + .map(|s| s.time_changed) + .max() + .unwrap_or(now); Ok((max_time_changed + TimeDelta::seconds(1), added, removed)) } diff --git a/src/server/gpodder/advanced/subscriptions.rs b/src/server/gpodder/advanced/subscriptions.rs index dbb8d28..e5691e1 100644 --- a/src/server/gpodder/advanced/subscriptions.rs +++ b/src/server/gpodder/advanced/subscriptions.rs @@ -85,8 +85,8 @@ pub async fn get_subscription_changes( .unwrap() .map(|(next_time_changed, add, remove)| { Json(SubscriptionDeltaResponse { - add, - remove, + add: add.into_iter().map(|s| s.url).collect(), + remove: remove.into_iter().map(|s| s.url).collect(), timestamp: next_time_changed.timestamp(), }) })?) diff --git a/src/server/gpodder/simple/subscriptions.rs b/src/server/gpodder/simple/subscriptions.rs index 4f6266f..6e9227c 100644 --- a/src/server/gpodder/simple/subscriptions.rs +++ b/src/server/gpodder/simple/subscriptions.rs @@ -37,7 +37,7 @@ pub async fn get_device_subscriptions( tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_device(&user, &id)) .await .unwrap() - .map(Json)?, + .map(|subs| Json(subs.into_iter().map(|s| s.url).collect()))?, ) } @@ -54,7 +54,7 @@ pub async fn get_user_subscriptions( tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_user(&user)) .await .unwrap() - .map(Json)?, + .map(|subs| Json(subs.into_iter().map(|s| s.url).collect()))?, ) } From 3f0e01aaf6c7600f4a65fcc04d8c524c96080897 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 16:12:46 +0100 Subject: [PATCH 08/14] feat: add last_seen field to sessions --- .../2025-03-15-145721_session_last_seen/down.sql | 3 +++ .../2025-03-15-145721_session_last_seen/up.sql | 3 +++ src/db/models/session.rs | 15 ++++++++++----- src/db/repository/auth.rs | 1 + src/db/schema.rs | 1 + 5 files changed, 18 insertions(+), 5 deletions(-) create mode 100644 migrations/2025-03-15-145721_session_last_seen/down.sql create mode 100644 migrations/2025-03-15-145721_session_last_seen/up.sql diff --git a/migrations/2025-03-15-145721_session_last_seen/down.sql b/migrations/2025-03-15-145721_session_last_seen/down.sql new file mode 100644 index 0000000..e794cf7 --- /dev/null +++ b/migrations/2025-03-15-145721_session_last_seen/down.sql @@ -0,0 +1,3 @@ +-- This file should undo anything in `up.sql` +alter table sessions + drop column last_seen; diff --git a/migrations/2025-03-15-145721_session_last_seen/up.sql b/migrations/2025-03-15-145721_session_last_seen/up.sql new file mode 100644 index 0000000..776d287 --- /dev/null +++ b/migrations/2025-03-15-145721_session_last_seen/up.sql @@ -0,0 +1,3 @@ +-- Your SQL goes here +alter table sessions + add column last_seen bigint not null default 0; diff --git a/src/db/models/session.rs b/src/db/models/session.rs index 10ee4a1..273550a 100644 --- a/src/db/models/session.rs +++ b/src/db/models/session.rs @@ -11,16 +11,21 @@ use crate::db::{schema::*, DbPool, DbResult}; pub struct Session { pub id: i64, pub user_id: i64, + pub last_seen: i64, } impl Session { - pub fn new_for_user(pool: &DbPool, user_id: i64) -> DbResult { + pub fn new_for_user(pool: &DbPool, user_id: i64, last_seen: i64) -> DbResult { let id: i64 = rand::thread_rng().gen(); - Ok(Self { id, user_id } - .insert_into(sessions::table) - .returning(Self::as_returning()) - .get_result(&mut pool.get()?)?) + Ok(Self { + id, + user_id, + last_seen, + } + .insert_into(sessions::table) + .returning(Self::as_returning()) + .get_result(&mut pool.get()?)?) } pub fn user_from_id(pool: &DbPool, id: i64) -> DbResult> { diff --git a/src/db/repository/auth.rs b/src/db/repository/auth.rs index ff30f09..453c1f5 100644 --- a/src/db/repository/auth.rs +++ b/src/db/repository/auth.rs @@ -77,6 +77,7 @@ impl gpodder::AuthRepository for SqliteRepository { let session_id = db::Session { id, user_id: user.id, + last_seen: chrono::Utc::now().timestamp(), } .insert_into(sessions::table) .returning(sessions::id) diff --git a/src/db/schema.rs b/src/db/schema.rs index fe21dfe..2f597f1 100644 --- a/src/db/schema.rs +++ b/src/db/schema.rs @@ -41,6 +41,7 @@ diesel::table! { sessions (id) { id -> BigInt, user_id -> BigInt, + last_seen -> BigInt, } } From b1fa048081d588a9fcbcd83072dfd84eaf54c2cc Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 18:28:40 +0100 Subject: [PATCH 09/14] refactor: moved auth business logic outside of db using store abstraction --- src/cli/serve.rs | 4 +- src/db/repository/auth.rs | 60 +++++++++++++++++++++++ src/gpodder/mod.rs | 16 +++++++ src/gpodder/models.rs | 9 +++- src/gpodder/repository.rs | 74 +++++++++++++++++++++++++++++ src/server/gpodder/advanced/auth.rs | 23 ++++----- src/server/gpodder/mod.rs | 13 ++--- src/server/mod.rs | 1 + 8 files changed, 179 insertions(+), 21 deletions(-) create mode 100644 src/gpodder/repository.rs diff --git a/src/cli/serve.rs b/src/cli/serve.rs index a0ff0fe..1183814 100644 --- a/src/cli/serve.rs +++ b/src/cli/serve.rs @@ -6,9 +6,11 @@ pub fn serve(config: &crate::config::Config) -> u8 { tracing::info!("Initializing database and running migrations"); let pool = db::initialize_db(config.data_dir.join(crate::DB_FILENAME), true).unwrap(); + let repo = db::SqliteRepository::from(pool); let ctx = server::Context { - repo: db::SqliteRepository::from(pool), + repo: repo.clone(), + store: crate::gpodder::GpodderRepository::new(Box::new(repo)), }; let app = server::app(ctx); diff --git a/src/db/repository/auth.rs b/src/db/repository/auth.rs index 453c1f5..e65f046 100644 --- a/src/db/repository/auth.rs +++ b/src/db/repository/auth.rs @@ -1,3 +1,4 @@ +use chrono::DateTime; use diesel::prelude::*; use rand::Rng; @@ -19,6 +20,16 @@ impl From for gpodder::AuthErr { } } +impl From for gpodder::User { + fn from(value: db::User) -> Self { + Self { + id: value.id, + username: value.username, + password_hash: value.password_hash, + } + } +} + impl gpodder::AuthRepository for SqliteRepository { fn validate_credentials( &self, @@ -35,6 +46,7 @@ impl gpodder::AuthRepository for SqliteRepository { Ok(gpodder::User { id: user.id, username: user.username, + password_hash: user.password_hash, }) } else { Err(gpodder::AuthErr::InvalidPassword) @@ -54,6 +66,7 @@ impl gpodder::AuthRepository for SqliteRepository { Ok(user) => Ok(gpodder::User { id: user.id, username: user.username, + password_hash: user.password_hash, }), Err(diesel::result::Error::NotFound) => Err(gpodder::AuthErr::UnknownSession), Err(err) => Err(gpodder::AuthErr::Other(Box::new(err))), @@ -88,6 +101,7 @@ impl gpodder::AuthRepository for SqliteRepository { gpodder::User { id: user.id, username: user.username, + password_hash: user.password_hash, }, )) } else { @@ -122,3 +136,49 @@ impl gpodder::AuthRepository for SqliteRepository { } } } + +impl gpodder::AuthStore for SqliteRepository { + fn get_user(&self, username: &str) -> Result, AuthErr> { + Ok(users::table + .select(db::User::as_select()) + .filter(users::username.eq(username)) + .first(&mut self.pool.get()?) + .optional()? + .map(gpodder::User::from)) + } + + fn get_session(&self, session_id: i64) -> Result, AuthErr> { + match sessions::table + .inner_join(users::table) + .filter(sessions::id.eq(session_id)) + .select((db::Session::as_select(), db::User::as_select())) + .get_result(&mut self.pool.get()?) + { + Ok((session, user)) => Ok(Some(gpodder::Session { + id: session.id, + last_seen: DateTime::from_timestamp(session.last_seen, 0).unwrap(), + user: user.into(), + })), + Err(err) => Err(AuthErr::from(err)), + } + } + + fn remove_session(&self, session_id: i64) -> Result<(), AuthErr> { + Ok( + diesel::delete(sessions::table.filter(sessions::id.eq(session_id))) + .execute(&mut self.pool.get()?) + .map(|_| ())?, + ) + } + + fn insert_session(&self, session: &gpodder::Session) -> Result<(), AuthErr> { + Ok(db::Session { + id: session.id, + user_id: session.user.id, + last_seen: session.last_seen.timestamp(), + } + .insert_into(sessions::table) + .execute(&mut self.pool.get()?) + .map(|_| ())?) + } +} diff --git a/src/gpodder/mod.rs b/src/gpodder/mod.rs index 4ae79d3..567badf 100644 --- a/src/gpodder/mod.rs +++ b/src/gpodder/mod.rs @@ -1,6 +1,8 @@ pub mod models; +mod repository; pub use models::*; +pub use repository::GpodderRepository; pub enum AuthErr { UnknownSession, @@ -27,6 +29,20 @@ pub trait AuthRepository { fn remove_session(&self, username: &str, session_id: i64) -> Result<(), AuthErr>; } +pub trait AuthStore { + /// Retrieve the session with the given session ID + fn get_session(&self, session_id: i64) -> Result, AuthErr>; + + /// Retrieve the user with the given username + fn get_user(&self, username: &str) -> Result, AuthErr>; + + /// Create a new session for a user with the given session ID + fn insert_session(&self, session: &Session) -> Result<(), AuthErr>; + + /// Remove the session with the given session ID + fn remove_session(&self, session_id: i64) -> Result<(), AuthErr>; +} + pub trait DeviceRepository { /// Return all devices associated with the user fn devices_for_user(&self, user: &User) -> Result, AuthErr>; diff --git a/src/gpodder/models.rs b/src/gpodder/models.rs index 39a615c..2ecb004 100644 --- a/src/gpodder/models.rs +++ b/src/gpodder/models.rs @@ -1,10 +1,11 @@ -use chrono::NaiveDateTime; +use chrono::{DateTime, NaiveDateTime, Utc}; use serde::{Deserialize, Serialize}; #[derive(Clone)] pub struct User { pub id: i64, pub username: String, + pub password_hash: String, } #[derive(Serialize, Deserialize)] @@ -57,3 +58,9 @@ pub struct EpisodeAction { #[serde(flatten)] pub action: EpisodeActionType, } + +pub struct Session { + pub id: i64, + pub last_seen: DateTime, + pub user: User, +} diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs new file mode 100644 index 0000000..a6fb0b1 --- /dev/null +++ b/src/gpodder/repository.rs @@ -0,0 +1,74 @@ +use std::sync::Arc; + +use argon2::{Argon2, PasswordHash, PasswordVerifier}; +use rand::Rng; + +use super::{models, AuthErr, AuthStore}; + +const MAX_SESSION_AGE: i64 = 60 * 60 * 24 * 7; + +type Store = dyn AuthStore + Send + Sync; + +#[derive(Clone)] +pub struct GpodderRepository { + store: Arc, +} + +impl GpodderRepository { + pub fn new(store: Box) -> Self { + Self { + store: Arc::from(store), + } + } + + pub fn validate_session(&self, session_id: i64) -> Result { + let session = self + .store + .get_session(session_id)? + .ok_or(AuthErr::UnknownSession)?; + + // Expired sessions still in the database are considered removed + if chrono::Utc::now() - session.last_seen + > chrono::TimeDelta::new(MAX_SESSION_AGE, 0).unwrap() + { + Err(AuthErr::UnknownSession) + } else { + Ok(session) + } + } + + pub fn validate_credentials( + &self, + username: &str, + password: &str, + ) -> Result { + let user = self.store.get_user(username)?.ok_or(AuthErr::UnknownUser)?; + + let password_hash = PasswordHash::new(&user.password_hash).unwrap(); + + if Argon2::default() + .verify_password(password.as_bytes(), &password_hash) + .is_ok() + { + Ok(user) + } else { + Err(AuthErr::InvalidPassword) + } + } + + pub fn create_session(&self, user: &models::User) -> Result { + let session = models::Session { + id: rand::thread_rng().gen(), + last_seen: chrono::Utc::now(), + user: user.clone(), + }; + + self.store.insert_session(&session)?; + + Ok(session) + } + + pub fn remove_session(&self, session_id: i64) -> Result<(), AuthErr> { + self.store.remove_session(session_id) + } +} diff --git a/src/server/gpodder/advanced/auth.rs b/src/server/gpodder/advanced/auth.rs index abdb0d6..ac5cbe6 100644 --- a/src/server/gpodder/advanced/auth.rs +++ b/src/server/gpodder/advanced/auth.rs @@ -12,13 +12,10 @@ use axum_extra::{ TypedHeader, }; -use crate::{ - gpodder::AuthRepository, - server::{ - error::{AppError, AppResult}, - gpodder::SESSION_ID_COOKIE, - Context, - }, +use crate::server::{ + error::{AppError, AppResult}, + gpodder::SESSION_ID_COOKIE, + Context, }; pub fn router() -> Router { @@ -38,14 +35,17 @@ async fn post_login( return Err(AppError::BadRequest); } - let (session_id, _) = tokio::task::spawn_blocking(move || { - ctx.repo.create_session(auth.username(), auth.password()) + let session = tokio::task::spawn_blocking(move || { + let user = ctx + .store + .validate_credentials(auth.username(), auth.password())?; + ctx.store.create_session(&user) }) .await .unwrap()?; Ok(jar.add( - Cookie::build((SESSION_ID_COOKIE, session_id.to_string())).expires(Expiration::Session), + Cookie::build((SESSION_ID_COOKIE, session.id.to_string())).expires(Expiration::Session), )) } @@ -60,7 +60,8 @@ async fn post_logout( .parse() .map_err(|_| AppError::BadRequest)?; - tokio::task::spawn_blocking(move || ctx.repo.remove_session(&username, session_id)) + // TODO reintroduce username check + tokio::task::spawn_blocking(move || ctx.store.remove_session(session_id)) .await .unwrap()?; diff --git a/src/server/gpodder/mod.rs b/src/server/gpodder/mod.rs index 5483935..776100e 100644 --- a/src/server/gpodder/mod.rs +++ b/src/server/gpodder/mod.rs @@ -17,10 +17,7 @@ use axum_extra::{ }; use tower_http::set_header::SetResponseHeaderLayer; -use crate::{ - gpodder::{self, AuthRepository}, - server::error::AppError, -}; +use crate::{gpodder, server::error::AppError}; use super::Context; @@ -51,12 +48,12 @@ pub async fn auth_middleware(State(ctx): State, mut req: Request, next: .and_then(|c| c.value().parse::().ok()) { let ctx_clone = ctx.clone(); - match tokio::task::spawn_blocking(move || ctx_clone.repo.validate_session(session_id)) + match tokio::task::spawn_blocking(move || ctx_clone.store.validate_session(session_id)) .await .unwrap() { - Ok(user) => { - auth_user = Some(user); + Ok(session) => { + auth_user = Some(session.user); } Err(gpodder::AuthErr::UnknownSession) => { jar = jar.add( @@ -77,7 +74,7 @@ pub async fn auth_middleware(State(ctx): State, mut req: Request, next: .await { match tokio::task::spawn_blocking(move || { - ctx.repo + ctx.store .validate_credentials(auth.username(), auth.password()) }) .await diff --git a/src/server/mod.rs b/src/server/mod.rs index 3e7a0e1..0a88fd6 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,6 +7,7 @@ use tower_http::trace::TraceLayer; #[derive(Clone)] pub struct Context { pub repo: crate::db::SqliteRepository, + pub store: crate::gpodder::GpodderRepository, } pub fn app(ctx: Context) -> Router { From 6bb3e8a27fc6d336284350d91ced2e2ad68dce08 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 19:01:38 +0100 Subject: [PATCH 10/14] refactor: migrate devices to store --- src/cli/serve.rs | 2 +- src/gpodder/mod.rs | 4 ++++ src/gpodder/repository.rs | 23 +++++++++++++++++------ src/server/gpodder/advanced/devices.rs | 6 +++--- 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/src/cli/serve.rs b/src/cli/serve.rs index 1183814..e1d0fa3 100644 --- a/src/cli/serve.rs +++ b/src/cli/serve.rs @@ -10,7 +10,7 @@ pub fn serve(config: &crate::config::Config) -> u8 { let ctx = server::Context { repo: repo.clone(), - store: crate::gpodder::GpodderRepository::new(Box::new(repo)), + store: crate::gpodder::GpodderRepository::new(repo), }; let app = server::app(ctx); diff --git a/src/gpodder/mod.rs b/src/gpodder/mod.rs index 567badf..eb6daa2 100644 --- a/src/gpodder/mod.rs +++ b/src/gpodder/mod.rs @@ -11,6 +11,10 @@ pub enum AuthErr { Other(Box), } +pub trait Store: AuthStore + DeviceRepository {} + +impl Store for T where T: AuthStore + DeviceRepository {} + pub trait AuthRepository { /// Validate the given session ID and return its user. fn validate_session(&self, session_id: i64) -> Result; diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs index a6fb0b1..9314369 100644 --- a/src/gpodder/repository.rs +++ b/src/gpodder/repository.rs @@ -3,21 +3,19 @@ use std::sync::Arc; use argon2::{Argon2, PasswordHash, PasswordVerifier}; use rand::Rng; -use super::{models, AuthErr, AuthStore}; +use super::{models, AuthErr, Store}; const MAX_SESSION_AGE: i64 = 60 * 60 * 24 * 7; -type Store = dyn AuthStore + Send + Sync; - #[derive(Clone)] pub struct GpodderRepository { - store: Arc, + store: Arc, } impl GpodderRepository { - pub fn new(store: Box) -> Self { + pub fn new(store: impl Store + Send + Sync + 'static) -> Self { Self { - store: Arc::from(store), + store: Arc::new(store), } } @@ -71,4 +69,17 @@ impl GpodderRepository { pub fn remove_session(&self, session_id: i64) -> Result<(), AuthErr> { self.store.remove_session(session_id) } + + pub fn devices_for_user(&self, user: &models::User) -> Result, AuthErr> { + self.store.devices_for_user(user) + } + + pub fn update_device_info( + &self, + user: &models::User, + device_id: &str, + patch: models::DevicePatch, + ) -> Result<(), AuthErr> { + self.store.update_device_info(user, device_id, patch) + } } diff --git a/src/server/gpodder/advanced/devices.rs b/src/server/gpodder/advanced/devices.rs index bb90dc2..397ddb3 100644 --- a/src/server/gpodder/advanced/devices.rs +++ b/src/server/gpodder/advanced/devices.rs @@ -6,7 +6,7 @@ use axum::{ }; use crate::{ - gpodder::{self, DeviceRepository}, + gpodder, server::{ error::{AppError, AppResult}, gpodder::{ @@ -38,7 +38,7 @@ async fn get_devices( } Ok( - tokio::task::spawn_blocking(move || ctx.repo.devices_for_user(&user)) + tokio::task::spawn_blocking(move || ctx.store.devices_for_user(&user)) .await .unwrap() .map(Json)?, @@ -55,7 +55,7 @@ async fn post_device( return Err(AppError::NotFound); } - tokio::task::spawn_blocking(move || ctx.repo.update_device_info(&user, &id, patch)) + tokio::task::spawn_blocking(move || ctx.store.update_device_info(&user, &id, patch)) .await .unwrap()?; From dd14a2152ffcbb7ad5864e349180532e7102e9a0 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 19:19:18 +0100 Subject: [PATCH 11/14] refactor: migrate subscriptions API to store --- src/db/repository/subscription.rs | 28 ++++++---- src/gpodder/mod.rs | 14 +++-- src/gpodder/repository.rs | 58 ++++++++++++++++++++ src/server/gpodder/advanced/subscriptions.rs | 17 +++--- src/server/gpodder/simple/subscriptions.rs | 20 +++---- 5 files changed, 101 insertions(+), 36 deletions(-) diff --git a/src/db/repository/subscription.rs b/src/db/repository/subscription.rs index febe9dc..6ab8fe5 100644 --- a/src/db/repository/subscription.rs +++ b/src/db/repository/subscription.rs @@ -42,9 +42,10 @@ impl gpodder::SubscriptionRepository for SqliteRepository { user: &gpodder::User, device_id: &str, urls: Vec, - ) -> Result { + time_changed: chrono::DateTime, + ) -> Result<(), gpodder::AuthErr> { // TODO use a better timestamp - let timestamp = chrono::Utc::now().timestamp(); + let timestamp = time_changed.timestamp(); self.pool.get()?.transaction(|conn| { let device = devices::table @@ -126,7 +127,7 @@ impl gpodder::SubscriptionRepository for SqliteRepository { Ok::<_, diesel::result::Error>(()) })?; - Ok(timestamp + 1) + Ok(()) } fn update_subscriptions_for_device( @@ -135,9 +136,10 @@ impl gpodder::SubscriptionRepository for SqliteRepository { device_id: &str, add: Vec, remove: Vec, - ) -> Result { + time_changed: chrono::DateTime, + ) -> Result<(), gpodder::AuthErr> { // TODO use a better timestamp - let timestamp = chrono::Utc::now().timestamp_millis(); + let timestamp = time_changed.timestamp(); // TODO URLs that are in both the added and removed lists will currently get "re-added", // meaning their change timestamp will be updated even though they haven't really changed. @@ -220,16 +222,18 @@ impl gpodder::SubscriptionRepository for SqliteRepository { Ok::<_, diesel::result::Error>(()) })?; - Ok(timestamp + 1) + Ok(()) } fn subscription_updates_for_device( &self, user: &gpodder::User, device_id: &str, - since: i64, - ) -> Result<(i64, Vec, Vec), gpodder::AuthErr> { - let (mut timestamp, mut added, mut removed) = (0, Vec::new(), Vec::new()); + since: chrono::DateTime, + ) -> Result<(chrono::DateTime, Vec, Vec), gpodder::AuthErr> { + let since = since.timestamp(); + + let (mut added, mut removed) = (Vec::new(), Vec::new()); let query = device_subscriptions::table .inner_join(devices::table) @@ -241,6 +245,8 @@ impl gpodder::SubscriptionRepository for SqliteRepository { ) .select(db::DeviceSubscription::as_select()); + let mut max_time: chrono::DateTime = chrono::DateTime::::MIN_UTC; + for sub in query.load_iter(&mut self.pool.get()?)? { let sub = sub?; @@ -250,9 +256,9 @@ impl gpodder::SubscriptionRepository for SqliteRepository { added.push(sub.podcast_url); } - timestamp = timestamp.max(sub.time_changed); + max_time = max_time.max(chrono::DateTime::from_timestamp(sub.time_changed, 0).unwrap()); } - Ok((timestamp + 1, added, removed)) + Ok((max_time, added, removed)) } } diff --git a/src/gpodder/mod.rs b/src/gpodder/mod.rs index eb6daa2..18f23b2 100644 --- a/src/gpodder/mod.rs +++ b/src/gpodder/mod.rs @@ -11,9 +11,9 @@ pub enum AuthErr { Other(Box), } -pub trait Store: AuthStore + DeviceRepository {} +pub trait Store: AuthStore + DeviceRepository + SubscriptionRepository {} -impl Store for T where T: AuthStore + DeviceRepository {} +impl Store for T where T: AuthStore + DeviceRepository + SubscriptionRepository {} pub trait AuthRepository { /// Validate the given session ID and return its user. @@ -78,7 +78,8 @@ pub trait SubscriptionRepository { user: &User, device_id: &str, urls: Vec, - ) -> Result; + time_changed: chrono::DateTime, + ) -> Result<(), AuthErr>; /// Update the list of subscriptions for a device by adding and removing the given URLs fn update_subscriptions_for_device( @@ -87,15 +88,16 @@ pub trait SubscriptionRepository { device_id: &str, add: Vec, remove: Vec, - ) -> Result; + time_changed: chrono::DateTime, + ) -> Result<(), AuthErr>; /// Returns the changes in subscriptions since the given timestamp. fn subscription_updates_for_device( &self, user: &User, device_id: &str, - since: i64, - ) -> Result<(i64, Vec, Vec), AuthErr>; + since: chrono::DateTime, + ) -> Result<(chrono::DateTime, Vec, Vec), AuthErr>; } pub trait EpisodeActionRepository { diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs index 9314369..e0d9d12 100644 --- a/src/gpodder/repository.rs +++ b/src/gpodder/repository.rs @@ -82,4 +82,62 @@ impl GpodderRepository { ) -> Result<(), AuthErr> { self.store.update_device_info(user, device_id, patch) } + + pub fn subscriptions_for_device( + &self, + user: &models::User, + device_id: &str, + ) -> Result, AuthErr> { + self.store.subscriptions_for_device(user, device_id) + } + + pub fn subscriptions_for_user(&self, user: &models::User) -> Result, AuthErr> { + self.store.subscriptions_for_user(user) + } + + pub fn set_subscriptions_for_device( + &self, + user: &models::User, + device_id: &str, + urls: Vec, + ) -> Result, AuthErr> { + let time_changed = chrono::Utc::now(); + + self.store + .set_subscriptions_for_device(user, device_id, urls, time_changed)?; + + Ok(time_changed + chrono::TimeDelta::seconds(1)) + } + + pub fn update_subscriptions_for_device( + &self, + user: &models::User, + device_id: &str, + add: Vec, + remove: Vec, + ) -> Result, AuthErr> { + let time_changed = chrono::Utc::now(); + + self.store + .update_subscriptions_for_device(user, device_id, add, remove, time_changed)?; + + Ok(time_changed + chrono::TimeDelta::seconds(1)) + } + + pub fn subscription_updates_for_device( + &self, + user: &models::User, + device_id: &str, + since: chrono::DateTime, + ) -> Result<(chrono::DateTime, Vec, Vec), AuthErr> { + let (max_time_changed, added, removed) = self + .store + .subscription_updates_for_device(user, device_id, since)?; + + Ok(( + max_time_changed + chrono::TimeDelta::seconds(1), + added, + removed, + )) + } } diff --git a/src/server/gpodder/advanced/subscriptions.rs b/src/server/gpodder/advanced/subscriptions.rs index 98d464a..dbb8d28 100644 --- a/src/server/gpodder/advanced/subscriptions.rs +++ b/src/server/gpodder/advanced/subscriptions.rs @@ -7,7 +7,7 @@ use axum::{ use serde::Deserialize; use crate::{ - gpodder::{self, SubscriptionRepository}, + gpodder, server::{ error::{AppError, AppResult}, gpodder::{ @@ -43,14 +43,14 @@ pub async fn post_subscription_changes( } Ok(tokio::task::spawn_blocking(move || { - ctx.repo + ctx.store .update_subscriptions_for_device(&user, &id, delta.add, delta.remove) }) .await .unwrap() - .map(|timestamp| { + .map(|time_changed| { Json(UpdatedUrlsResponse { - timestamp, + timestamp: time_changed.timestamp(), update_urls: Vec::new(), }) })?) @@ -76,17 +76,18 @@ pub async fn get_subscription_changes( return Err(AppError::BadRequest); } + let since = chrono::DateTime::from_timestamp(query.since, 0).ok_or(AppError::BadRequest)?; + Ok(tokio::task::spawn_blocking(move || { - ctx.repo - .subscription_updates_for_device(&user, &id, query.since) + ctx.store.subscription_updates_for_device(&user, &id, since) }) .await .unwrap() - .map(|(timestamp, add, remove)| { + .map(|(next_time_changed, add, remove)| { Json(SubscriptionDeltaResponse { add, remove, - timestamp, + timestamp: next_time_changed.timestamp(), }) })?) } diff --git a/src/server/gpodder/simple/subscriptions.rs b/src/server/gpodder/simple/subscriptions.rs index 4e6f7c6..4f6266f 100644 --- a/src/server/gpodder/simple/subscriptions.rs +++ b/src/server/gpodder/simple/subscriptions.rs @@ -6,7 +6,7 @@ use axum::{ }; use crate::{ - gpodder::{self, SubscriptionRepository}, + gpodder, server::{ error::{AppError, AppResult}, gpodder::{auth_middleware, format::StringWithFormat}, @@ -34,7 +34,7 @@ pub async fn get_device_subscriptions( } Ok( - tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_device(&user, &id)) + tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_device(&user, &id)) .await .unwrap() .map(Json)?, @@ -51,7 +51,7 @@ pub async fn get_user_subscriptions( } Ok( - tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_user(&user)) + tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_user(&user)) .await .unwrap() .map(Json)?, @@ -68,12 +68,10 @@ pub async fn put_device_subscriptions( return Err(AppError::BadRequest); } - Ok( - tokio::task::spawn_blocking(move || { - ctx.repo.set_subscriptions_for_device(&user, &id, urls) - }) - .await - .unwrap() - .map(|_| ())?, - ) + Ok(tokio::task::spawn_blocking(move || { + ctx.store.set_subscriptions_for_device(&user, &id, urls) + }) + .await + .unwrap() + .map(|_| ())?) } From 465612eec78819cfe8eb81dcdc7368a9e8061681 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 19:34:27 +0100 Subject: [PATCH 12/14] refactor: migrated episode actions API to store --- src/cli/serve.rs | 1 - src/db/repository/episode_action.rs | 28 ++++++------ src/gpodder/mod.rs | 31 ++++++++----- src/gpodder/models.rs | 5 ++- src/gpodder/repository.rs | 59 ++++++++++++++++++------- src/server/gpodder/advanced/episodes.rs | 25 ++++++++--- src/server/mod.rs | 1 - 7 files changed, 97 insertions(+), 53 deletions(-) diff --git a/src/cli/serve.rs b/src/cli/serve.rs index e1d0fa3..664a1e0 100644 --- a/src/cli/serve.rs +++ b/src/cli/serve.rs @@ -9,7 +9,6 @@ pub fn serve(config: &crate::config::Config) -> u8 { let repo = db::SqliteRepository::from(pool); let ctx = server::Context { - repo: repo.clone(), store: crate::gpodder::GpodderRepository::new(repo), }; let app = server::app(ctx); diff --git a/src/db/repository/episode_action.rs b/src/db/repository/episode_action.rs index 3f66bc9..5a45cd2 100644 --- a/src/db/repository/episode_action.rs +++ b/src/db/repository/episode_action.rs @@ -1,4 +1,4 @@ -use chrono::DateTime; +use chrono::{DateTime, Utc}; use diesel::prelude::*; use super::SqliteRepository; @@ -26,7 +26,7 @@ impl From for db::NewEpisodeAction { podcast_url: value.podcast, episode_url: value.episode, time_changed: 0, - timestamp: value.timestamp.map(|t| t.and_utc().timestamp()), + timestamp: value.timestamp.map(|t| t.timestamp()), action, started, position, @@ -58,7 +58,8 @@ impl From<(Option, db::EpisodeAction)> for gpodder::EpisodeAction { // SAFETY the input to the from_timestamp function is always the result of a // previous timestamp() function call, which is guaranteed to be each other's // reverse - .map(|ts| DateTime::from_timestamp(ts, 0).unwrap().naive_utc()), + .map(|ts| DateTime::from_timestamp(ts, 0).unwrap()), + time_changed: DateTime::from_timestamp(db_action.time_changed, 0).unwrap(), device: device_id, action, } @@ -70,8 +71,9 @@ impl gpodder::EpisodeActionRepository for SqliteRepository { &self, user: &gpodder::User, actions: Vec, - ) -> Result { - let time_changed = chrono::Utc::now().timestamp(); + time_changed: DateTime, + ) -> Result<(), gpodder::AuthErr> { + let time_changed = time_changed.timestamp(); // TODO optimize this query // 1. The lookup for a device could be replaced with a subquery, although Diesel seems to @@ -99,17 +101,18 @@ impl gpodder::EpisodeActionRepository for SqliteRepository { Ok::<_, diesel::result::Error>(()) })?; - Ok(time_changed + 1) + Ok(()) } fn episode_actions_for_user( &self, user: &gpodder::User, - since: Option, + since: Option>, podcast: Option, device: Option, aggregated: bool, - ) -> Result<(i64, Vec), gpodder::AuthErr> { + ) -> Result, gpodder::AuthErr> { + let since = since.map(|ts| ts.timestamp()).unwrap_or(0); let conn = &mut self.pool.get()?; let mut query = episode_actions::table @@ -117,7 +120,7 @@ impl gpodder::EpisodeActionRepository for SqliteRepository { .filter( episode_actions::user_id .eq(user.id) - .and(episode_actions::time_changed.ge(since.unwrap_or(0))), + .and(episode_actions::time_changed.ge(since)), ) .select(( devices::device_id.nullable(), @@ -157,16 +160,11 @@ impl gpodder::EpisodeActionRepository for SqliteRepository { query.get_results(conn)? }; - let max_timestamp = db_actions - .iter() - .map(|(_, a)| a.time_changed) - .max() - .unwrap_or(0); let actions = db_actions .into_iter() .map(gpodder::EpisodeAction::from) .collect(); - Ok((max_timestamp + 1, actions)) + Ok(actions) } } diff --git a/src/gpodder/mod.rs b/src/gpodder/mod.rs index 18f23b2..0e89cb9 100644 --- a/src/gpodder/mod.rs +++ b/src/gpodder/mod.rs @@ -1,6 +1,7 @@ pub mod models; mod repository; +use chrono::{DateTime, Utc}; pub use models::*; pub use repository::GpodderRepository; @@ -11,9 +12,15 @@ pub enum AuthErr { Other(Box), } -pub trait Store: AuthStore + DeviceRepository + SubscriptionRepository {} +pub trait Store: + AuthStore + DeviceRepository + SubscriptionRepository + EpisodeActionRepository +{ +} -impl Store for T where T: AuthStore + DeviceRepository + SubscriptionRepository {} +impl Store for T where + T: AuthStore + DeviceRepository + SubscriptionRepository + EpisodeActionRepository +{ +} pub trait AuthRepository { /// Validate the given session ID and return its user. @@ -78,7 +85,7 @@ pub trait SubscriptionRepository { user: &User, device_id: &str, urls: Vec, - time_changed: chrono::DateTime, + time_changed: DateTime, ) -> Result<(), AuthErr>; /// Update the list of subscriptions for a device by adding and removing the given URLs @@ -88,7 +95,7 @@ pub trait SubscriptionRepository { device_id: &str, add: Vec, remove: Vec, - time_changed: chrono::DateTime, + time_changed: DateTime, ) -> Result<(), AuthErr>; /// Returns the changes in subscriptions since the given timestamp. @@ -96,22 +103,26 @@ pub trait SubscriptionRepository { &self, user: &User, device_id: &str, - since: chrono::DateTime, - ) -> Result<(chrono::DateTime, Vec, Vec), AuthErr>; + since: DateTime, + ) -> Result<(DateTime, Vec, Vec), AuthErr>; } pub trait EpisodeActionRepository { /// Insert the given episode actions into the datastore. - fn add_episode_actions(&self, user: &User, actions: Vec) - -> Result; + fn add_episode_actions( + &self, + user: &User, + actions: Vec, + time_changed: DateTime, + ) -> Result<(), AuthErr>; /// Retrieve the list of episode actions for the given user. fn episode_actions_for_user( &self, user: &User, - since: Option, + since: Option>, podcast: Option, device: Option, aggregated: bool, - ) -> Result<(i64, Vec), AuthErr>; + ) -> Result, AuthErr>; } diff --git a/src/gpodder/models.rs b/src/gpodder/models.rs index 2ecb004..1c9e550 100644 --- a/src/gpodder/models.rs +++ b/src/gpodder/models.rs @@ -1,4 +1,4 @@ -use chrono::{DateTime, NaiveDateTime, Utc}; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; #[derive(Clone)] @@ -52,7 +52,8 @@ pub enum EpisodeActionType { pub struct EpisodeAction { pub podcast: String, pub episode: String, - pub timestamp: Option, + pub timestamp: Option>, + pub time_changed: DateTime, #[serde(default)] pub device: Option, #[serde(flatten)] diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs index e0d9d12..9913c9d 100644 --- a/src/gpodder/repository.rs +++ b/src/gpodder/repository.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use argon2::{Argon2, PasswordHash, PasswordVerifier}; +use chrono::{DateTime, TimeDelta, Utc}; use rand::Rng; use super::{models, AuthErr, Store}; @@ -26,9 +27,7 @@ impl GpodderRepository { .ok_or(AuthErr::UnknownSession)?; // Expired sessions still in the database are considered removed - if chrono::Utc::now() - session.last_seen - > chrono::TimeDelta::new(MAX_SESSION_AGE, 0).unwrap() - { + if Utc::now() - session.last_seen > TimeDelta::new(MAX_SESSION_AGE, 0).unwrap() { Err(AuthErr::UnknownSession) } else { Ok(session) @@ -57,7 +56,7 @@ impl GpodderRepository { pub fn create_session(&self, user: &models::User) -> Result { let session = models::Session { id: rand::thread_rng().gen(), - last_seen: chrono::Utc::now(), + last_seen: Utc::now(), user: user.clone(), }; @@ -100,13 +99,13 @@ impl GpodderRepository { user: &models::User, device_id: &str, urls: Vec, - ) -> Result, AuthErr> { - let time_changed = chrono::Utc::now(); + ) -> Result, AuthErr> { + let time_changed = Utc::now(); self.store .set_subscriptions_for_device(user, device_id, urls, time_changed)?; - Ok(time_changed + chrono::TimeDelta::seconds(1)) + Ok(time_changed + TimeDelta::seconds(1)) } pub fn update_subscriptions_for_device( @@ -115,29 +114,55 @@ impl GpodderRepository { device_id: &str, add: Vec, remove: Vec, - ) -> Result, AuthErr> { - let time_changed = chrono::Utc::now(); + ) -> Result, AuthErr> { + let time_changed = Utc::now(); self.store .update_subscriptions_for_device(user, device_id, add, remove, time_changed)?; - Ok(time_changed + chrono::TimeDelta::seconds(1)) + Ok(time_changed + TimeDelta::seconds(1)) } pub fn subscription_updates_for_device( &self, user: &models::User, device_id: &str, - since: chrono::DateTime, - ) -> Result<(chrono::DateTime, Vec, Vec), AuthErr> { + since: DateTime, + ) -> Result<(DateTime, Vec, Vec), AuthErr> { let (max_time_changed, added, removed) = self .store .subscription_updates_for_device(user, device_id, since)?; - Ok(( - max_time_changed + chrono::TimeDelta::seconds(1), - added, - removed, - )) + Ok((max_time_changed + TimeDelta::seconds(1), added, removed)) + } + + pub fn add_episode_actions( + &self, + user: &models::User, + actions: Vec, + ) -> Result, AuthErr> { + let time_changed = Utc::now(); + + self.store + .add_episode_actions(user, actions, time_changed)?; + + Ok(time_changed + TimeDelta::seconds(1)) + } + + pub fn episode_actions_for_user( + &self, + user: &models::User, + since: Option>, + podcast: Option, + device: Option, + aggregated: bool, + ) -> Result<(DateTime, Vec), AuthErr> { + let now = chrono::Utc::now(); + let actions = self + .store + .episode_actions_for_user(user, since, podcast, device, aggregated)?; + let max_time_changed = actions.iter().map(|a| a.time_changed).max().unwrap_or(now); + + Ok((max_time_changed + TimeDelta::seconds(1), actions)) } } diff --git a/src/server/gpodder/advanced/episodes.rs b/src/server/gpodder/advanced/episodes.rs index b862b9c..22839c0 100644 --- a/src/server/gpodder/advanced/episodes.rs +++ b/src/server/gpodder/advanced/episodes.rs @@ -4,10 +4,11 @@ use axum::{ routing::post, Extension, Json, Router, }; +use chrono::DateTime; use serde::{Deserialize, Serialize}; use crate::{ - gpodder::{self, EpisodeActionRepository}, + gpodder, server::{ error::{AppError, AppResult}, gpodder::{ @@ -43,12 +44,12 @@ async fn post_episode_actions( } Ok( - tokio::task::spawn_blocking(move || ctx.repo.add_episode_actions(&user, actions)) + tokio::task::spawn_blocking(move || ctx.store.add_episode_actions(&user, actions)) .await .unwrap() - .map(|timestamp| { + .map(|time_changed| { Json(UpdatedUrlsResponse { - timestamp, + timestamp: time_changed.timestamp(), update_urls: Vec::new(), }) })?, @@ -84,10 +85,15 @@ async fn get_episode_actions( return Err(AppError::BadRequest); } + let since = filter + .since + .map(|ts| DateTime::from_timestamp(ts, 0)) + .flatten(); + Ok(tokio::task::spawn_blocking(move || { - ctx.repo.episode_actions_for_user( + ctx.store.episode_actions_for_user( &user, - filter.since, + since, filter.podcast, filter.device, filter.aggregated, @@ -95,5 +101,10 @@ async fn get_episode_actions( }) .await .unwrap() - .map(|(timestamp, actions)| Json(EpisodeActionsResponse { timestamp, actions }))?) + .map(|(ts, actions)| { + Json(EpisodeActionsResponse { + timestamp: ts.timestamp(), + actions, + }) + })?) } diff --git a/src/server/mod.rs b/src/server/mod.rs index 0a88fd6..93ac8d3 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -6,7 +6,6 @@ use tower_http::trace::TraceLayer; #[derive(Clone)] pub struct Context { - pub repo: crate::db::SqliteRepository, pub store: crate::gpodder::GpodderRepository, } From 8a9744c4a9e5d238684b018ae7813f8a4b961ca8 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 20:06:58 +0100 Subject: [PATCH 13/14] refactor: decoupled gpodder and server models --- src/gpodder/models.rs | 13 -- src/server/gpodder/advanced/devices.rs | 9 +- src/server/gpodder/advanced/episodes.rs | 30 +++-- src/server/gpodder/models.rs | 164 ++++++++++++++++++++++++ 4 files changed, 185 insertions(+), 31 deletions(-) diff --git a/src/gpodder/models.rs b/src/gpodder/models.rs index 1c9e550..3bbf229 100644 --- a/src/gpodder/models.rs +++ b/src/gpodder/models.rs @@ -1,5 +1,4 @@ use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; #[derive(Clone)] pub struct User { @@ -8,8 +7,6 @@ pub struct User { pub password_hash: String, } -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] pub enum DeviceType { Desktop, Laptop, @@ -18,7 +15,6 @@ pub enum DeviceType { Other, } -#[derive(Serialize)] pub struct Device { pub id: String, pub caption: String, @@ -26,37 +22,28 @@ pub struct Device { pub subscriptions: i64, } -#[derive(Deserialize)] pub struct DevicePatch { pub caption: Option, pub r#type: Option, } -#[derive(Serialize, Deserialize, Debug)] -#[serde(rename_all = "lowercase")] -#[serde(tag = "action")] pub enum EpisodeActionType { Download, Play { - #[serde(default)] started: Option, position: i32, - #[serde(default)] total: Option, }, Delete, New, } -#[derive(Serialize, Deserialize, Debug)] pub struct EpisodeAction { pub podcast: String, pub episode: String, pub timestamp: Option>, pub time_changed: DateTime, - #[serde(default)] pub device: Option, - #[serde(flatten)] pub action: EpisodeActionType, } diff --git a/src/server/gpodder/advanced/devices.rs b/src/server/gpodder/advanced/devices.rs index 397ddb3..859e1c1 100644 --- a/src/server/gpodder/advanced/devices.rs +++ b/src/server/gpodder/advanced/devices.rs @@ -12,6 +12,7 @@ use crate::{ gpodder::{ auth_middleware, format::{Format, StringWithFormat}, + models, }, Context, }, @@ -28,7 +29,7 @@ async fn get_devices( State(ctx): State, Path(username): Path, Extension(user): Extension, -) -> AppResult>> { +) -> AppResult>> { if username.format != Format::Json { return Err(AppError::NotFound); } @@ -41,7 +42,7 @@ async fn get_devices( tokio::task::spawn_blocking(move || ctx.store.devices_for_user(&user)) .await .unwrap() - .map(Json)?, + .map(|devices| Json(devices.into_iter().map(models::Device::from).collect()))?, ) } @@ -49,13 +50,13 @@ async fn post_device( State(ctx): State, Path((_username, id)): Path<(String, StringWithFormat)>, Extension(user): Extension, - Json(patch): Json, + Json(patch): Json, ) -> AppResult<()> { if id.format != Format::Json { return Err(AppError::NotFound); } - tokio::task::spawn_blocking(move || ctx.store.update_device_info(&user, &id, patch)) + tokio::task::spawn_blocking(move || ctx.store.update_device_info(&user, &id, patch.into())) .await .unwrap()?; diff --git a/src/server/gpodder/advanced/episodes.rs b/src/server/gpodder/advanced/episodes.rs index 22839c0..6c7ff32 100644 --- a/src/server/gpodder/advanced/episodes.rs +++ b/src/server/gpodder/advanced/episodes.rs @@ -14,6 +14,7 @@ use crate::{ gpodder::{ auth_middleware, format::{Format, StringWithFormat}, + models, models::UpdatedUrlsResponse, }, Context, @@ -33,7 +34,7 @@ async fn post_episode_actions( State(ctx): State, Path(username): Path, Extension(user): Extension, - Json(actions): Json>, + Json(actions): Json>, ) -> AppResult> { if username.format != Format::Json { return Err(AppError::NotFound); @@ -43,17 +44,18 @@ async fn post_episode_actions( return Err(AppError::BadRequest); } - Ok( - tokio::task::spawn_blocking(move || ctx.store.add_episode_actions(&user, actions)) - .await - .unwrap() - .map(|time_changed| { - Json(UpdatedUrlsResponse { - timestamp: time_changed.timestamp(), - update_urls: Vec::new(), - }) - })?, - ) + Ok(tokio::task::spawn_blocking(move || { + ctx.store + .add_episode_actions(&user, actions.into_iter().map(Into::into).collect()) + }) + .await + .unwrap() + .map(|time_changed| { + Json(UpdatedUrlsResponse { + timestamp: time_changed.timestamp(), + update_urls: Vec::new(), + }) + })?) } #[derive(Deserialize, Default)] @@ -68,7 +70,7 @@ struct FilterQuery { #[derive(Serialize)] struct EpisodeActionsResponse { timestamp: i64, - actions: Vec, + actions: Vec, } async fn get_episode_actions( @@ -104,7 +106,7 @@ async fn get_episode_actions( .map(|(ts, actions)| { Json(EpisodeActionsResponse { timestamp: ts.timestamp(), - actions, + actions: actions.into_iter().map(Into::into).collect(), }) })?) } diff --git a/src/server/gpodder/models.rs b/src/server/gpodder/models.rs index 2be8f87..b2268af 100644 --- a/src/server/gpodder/models.rs +++ b/src/server/gpodder/models.rs @@ -1,5 +1,8 @@ +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use crate::gpodder; + #[derive(Deserialize, Debug)] pub struct SubscriptionDelta { pub add: Vec, @@ -18,3 +21,164 @@ pub struct UpdatedUrlsResponse { pub timestamp: i64, pub update_urls: Vec<(String, String)>, } + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum DeviceType { + Desktop, + Laptop, + Mobile, + Server, + Other, +} + +#[derive(Serialize)] +pub struct Device { + pub id: String, + pub caption: String, + pub r#type: DeviceType, + pub subscriptions: i64, +} + +#[derive(Deserialize)] +pub struct DevicePatch { + pub caption: Option, + pub r#type: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "lowercase")] +#[serde(tag = "action")] +pub enum EpisodeActionType { + Download, + Play { + #[serde(default)] + started: Option, + position: i32, + #[serde(default)] + total: Option, + }, + Delete, + New, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct EpisodeAction { + pub podcast: String, + pub episode: String, + pub timestamp: Option, + #[serde(default)] + pub device: Option, + #[serde(flatten)] + pub action: EpisodeActionType, +} + +impl From for DeviceType { + fn from(value: gpodder::DeviceType) -> Self { + match value { + gpodder::DeviceType::Other => Self::Other, + gpodder::DeviceType::Laptop => Self::Laptop, + gpodder::DeviceType::Mobile => Self::Mobile, + gpodder::DeviceType::Server => Self::Server, + gpodder::DeviceType::Desktop => Self::Desktop, + } + } +} + +impl From for gpodder::DeviceType { + fn from(value: DeviceType) -> Self { + match value { + DeviceType::Other => gpodder::DeviceType::Other, + DeviceType::Laptop => gpodder::DeviceType::Laptop, + DeviceType::Mobile => gpodder::DeviceType::Mobile, + DeviceType::Server => gpodder::DeviceType::Server, + DeviceType::Desktop => gpodder::DeviceType::Desktop, + } + } +} + +impl From for Device { + fn from(value: gpodder::Device) -> Self { + Self { + id: value.id, + caption: value.caption, + r#type: value.r#type.into(), + subscriptions: value.subscriptions, + } + } +} + +impl From for gpodder::DevicePatch { + fn from(value: DevicePatch) -> Self { + Self { + caption: value.caption, + r#type: value.r#type.map(Into::into), + } + } +} + +impl From for EpisodeActionType { + fn from(value: gpodder::EpisodeActionType) -> Self { + match value { + gpodder::EpisodeActionType::New => Self::New, + gpodder::EpisodeActionType::Delete => Self::Delete, + gpodder::EpisodeActionType::Download => Self::Download, + gpodder::EpisodeActionType::Play { + started, + position, + total, + } => Self::Play { + started, + position, + total, + }, + } + } +} + +impl From for gpodder::EpisodeActionType { + fn from(value: EpisodeActionType) -> Self { + match value { + EpisodeActionType::New => gpodder::EpisodeActionType::New, + EpisodeActionType::Delete => gpodder::EpisodeActionType::Delete, + EpisodeActionType::Download => gpodder::EpisodeActionType::Download, + EpisodeActionType::Play { + started, + position, + total, + } => gpodder::EpisodeActionType::Play { + started, + position, + total, + }, + } + } +} + +impl From for EpisodeAction { + fn from(value: gpodder::EpisodeAction) -> Self { + Self { + podcast: value.podcast, + episode: value.episode, + timestamp: value.timestamp.map(|ts| ts.timestamp()), + device: value.device, + action: value.action.into(), + } + } +} + +impl From for gpodder::EpisodeAction { + fn from(value: EpisodeAction) -> Self { + Self { + podcast: value.podcast, + episode: value.episode, + // TODO remove this unwrap + timestamp: value + .timestamp + .map(|ts| DateTime::from_timestamp(ts, 0).unwrap()), + device: value.device, + action: value.action.into(), + time_changed: DateTime::::MIN_UTC, + } + } +} From 330877c8c587b2790aec9c0e5d0115899f155a6b Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 15 Mar 2025 20:20:38 +0100 Subject: [PATCH 14/14] refactor: moved knowledge of subscription change time to store --- src/db/repository/subscription.rs | 52 ++++++++++++++------ src/gpodder/mod.rs | 6 +-- src/gpodder/models.rs | 5 ++ src/gpodder/repository.rs | 26 ++++++++-- src/server/gpodder/advanced/subscriptions.rs | 4 +- src/server/gpodder/simple/subscriptions.rs | 4 +- 6 files changed, 72 insertions(+), 25 deletions(-) diff --git a/src/db/repository/subscription.rs b/src/db/repository/subscription.rs index 6ab8fe5..5448380 100644 --- a/src/db/repository/subscription.rs +++ b/src/db/repository/subscription.rs @@ -1,5 +1,6 @@ use std::collections::HashSet; +use chrono::DateTime; use diesel::prelude::*; use super::SqliteRepository; @@ -8,24 +9,39 @@ use crate::{ gpodder, }; +impl From<(String, i64)> for gpodder::Subscription { + fn from((url, ts): (String, i64)) -> Self { + Self { + url, + time_changed: DateTime::from_timestamp(ts, 0).unwrap(), + } + } +} + impl gpodder::SubscriptionRepository for SqliteRepository { fn subscriptions_for_user( &self, user: &gpodder::User, - ) -> Result, gpodder::AuthErr> { + ) -> Result, gpodder::AuthErr> { Ok(device_subscriptions::table .inner_join(devices::table) .filter(devices::user_id.eq(user.id)) - .select(device_subscriptions::podcast_url) + .select(( + device_subscriptions::podcast_url, + device_subscriptions::time_changed, + )) .distinct() - .get_results(&mut self.pool.get()?)?) + .get_results::<(String, i64)>(&mut self.pool.get()?)? + .into_iter() + .map(Into::into) + .collect()) } fn subscriptions_for_device( &self, user: &gpodder::User, device_id: &str, - ) -> Result, gpodder::AuthErr> { + ) -> Result, gpodder::AuthErr> { Ok(device_subscriptions::table .inner_join(devices::table) .filter( @@ -33,8 +49,14 @@ impl gpodder::SubscriptionRepository for SqliteRepository { .eq(user.id) .and(devices::device_id.eq(device_id)), ) - .select(device_subscriptions::podcast_url) - .get_results(&mut self.pool.get()?)?) + .select(( + device_subscriptions::podcast_url, + device_subscriptions::time_changed, + )) + .get_results::<(String, i64)>(&mut self.pool.get()?)? + .into_iter() + .map(Into::into) + .collect()) } fn set_subscriptions_for_device( @@ -230,7 +252,7 @@ impl gpodder::SubscriptionRepository for SqliteRepository { user: &gpodder::User, device_id: &str, since: chrono::DateTime, - ) -> Result<(chrono::DateTime, Vec, Vec), gpodder::AuthErr> { + ) -> Result<(Vec, Vec), gpodder::AuthErr> { let since = since.timestamp(); let (mut added, mut removed) = (Vec::new(), Vec::new()); @@ -245,20 +267,22 @@ impl gpodder::SubscriptionRepository for SqliteRepository { ) .select(db::DeviceSubscription::as_select()); - let mut max_time: chrono::DateTime = chrono::DateTime::::MIN_UTC; - for sub in query.load_iter(&mut self.pool.get()?)? { let sub = sub?; if sub.deleted { - removed.push(sub.podcast_url); + removed.push(gpodder::Subscription { + url: sub.podcast_url, + time_changed: DateTime::from_timestamp(sub.time_changed, 0).unwrap(), + }); } else { - added.push(sub.podcast_url); + added.push(gpodder::Subscription { + url: sub.podcast_url, + time_changed: DateTime::from_timestamp(sub.time_changed, 0).unwrap(), + }); } - - max_time = max_time.max(chrono::DateTime::from_timestamp(sub.time_changed, 0).unwrap()); } - Ok((max_time, added, removed)) + Ok((added, removed)) } } diff --git a/src/gpodder/mod.rs b/src/gpodder/mod.rs index 0e89cb9..8336de1 100644 --- a/src/gpodder/mod.rs +++ b/src/gpodder/mod.rs @@ -74,10 +74,10 @@ pub trait SubscriptionRepository { &self, user: &User, device_id: &str, - ) -> Result, AuthErr>; + ) -> Result, AuthErr>; /// Return all subscriptions for a given user - fn subscriptions_for_user(&self, user: &User) -> Result, AuthErr>; + fn subscriptions_for_user(&self, user: &User) -> Result, AuthErr>; /// Replace the list of subscriptions for a device with the given list fn set_subscriptions_for_device( @@ -104,7 +104,7 @@ pub trait SubscriptionRepository { user: &User, device_id: &str, since: DateTime, - ) -> Result<(DateTime, Vec, Vec), AuthErr>; + ) -> Result<(Vec, Vec), AuthErr>; } pub trait EpisodeActionRepository { diff --git a/src/gpodder/models.rs b/src/gpodder/models.rs index 3bbf229..b68e73b 100644 --- a/src/gpodder/models.rs +++ b/src/gpodder/models.rs @@ -52,3 +52,8 @@ pub struct Session { pub last_seen: DateTime, pub user: User, } + +pub struct Subscription { + pub url: String, + pub time_changed: DateTime, +} diff --git a/src/gpodder/repository.rs b/src/gpodder/repository.rs index 9913c9d..411f9cf 100644 --- a/src/gpodder/repository.rs +++ b/src/gpodder/repository.rs @@ -86,11 +86,14 @@ impl GpodderRepository { &self, user: &models::User, device_id: &str, - ) -> Result, AuthErr> { + ) -> Result, AuthErr> { self.store.subscriptions_for_device(user, device_id) } - pub fn subscriptions_for_user(&self, user: &models::User) -> Result, AuthErr> { + pub fn subscriptions_for_user( + &self, + user: &models::User, + ) -> Result, AuthErr> { self.store.subscriptions_for_user(user) } @@ -128,11 +131,26 @@ impl GpodderRepository { user: &models::User, device_id: &str, since: DateTime, - ) -> Result<(DateTime, Vec, Vec), AuthErr> { - let (max_time_changed, added, removed) = self + ) -> Result< + ( + DateTime, + Vec, + Vec, + ), + AuthErr, + > { + let now = chrono::Utc::now(); + + let (added, removed) = self .store .subscription_updates_for_device(user, device_id, since)?; + let max_time_changed = added + .iter() + .chain(removed.iter()) + .map(|s| s.time_changed) + .max() + .unwrap_or(now); Ok((max_time_changed + TimeDelta::seconds(1), added, removed)) } diff --git a/src/server/gpodder/advanced/subscriptions.rs b/src/server/gpodder/advanced/subscriptions.rs index dbb8d28..e5691e1 100644 --- a/src/server/gpodder/advanced/subscriptions.rs +++ b/src/server/gpodder/advanced/subscriptions.rs @@ -85,8 +85,8 @@ pub async fn get_subscription_changes( .unwrap() .map(|(next_time_changed, add, remove)| { Json(SubscriptionDeltaResponse { - add, - remove, + add: add.into_iter().map(|s| s.url).collect(), + remove: remove.into_iter().map(|s| s.url).collect(), timestamp: next_time_changed.timestamp(), }) })?) diff --git a/src/server/gpodder/simple/subscriptions.rs b/src/server/gpodder/simple/subscriptions.rs index 4f6266f..6e9227c 100644 --- a/src/server/gpodder/simple/subscriptions.rs +++ b/src/server/gpodder/simple/subscriptions.rs @@ -37,7 +37,7 @@ pub async fn get_device_subscriptions( tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_device(&user, &id)) .await .unwrap() - .map(Json)?, + .map(|subs| Json(subs.into_iter().map(|s| s.url).collect()))?, ) } @@ -54,7 +54,7 @@ pub async fn get_user_subscriptions( tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_user(&user)) .await .unwrap() - .map(Json)?, + .map(|subs| Json(subs.into_iter().map(|s| s.url).collect()))?, ) }