diff --git a/gpodder/src/repository.rs b/gpodder/src/repository.rs index 5c086e6..173785d 100644 --- a/gpodder/src/repository.rs +++ b/gpodder/src/repository.rs @@ -9,13 +9,22 @@ use crate::{ store::{AuthErr, GpodderStore}, }; -const MAX_SESSION_AGE: i64 = 60 * 60 * 24 * 7; +const MAX_SESSION_AGE: TimeDelta = TimeDelta::seconds(60 * 60 * 24 * 7); +/// Main abstraction over the database that provides API-compatible methods for querying and +/// modifying the underlying database #[derive(Clone)] pub struct GpodderRepository { store: Arc, } +/// Authenticated view of the repository, providing methods that take the authenticated user +/// explicitely into account +pub struct AuthenticatedRepository<'a> { + store: &'a (dyn GpodderStore + Send + Sync), + user: &'a models::User, +} + impl GpodderRepository { pub fn new(store: impl GpodderStore + Send + Sync + 'static) -> Self { Self { @@ -23,6 +32,14 @@ impl GpodderRepository { } } + /// Return an authenticated view of the repository for the given user + pub fn user<'a>(&'a self, user: &'a models::User) -> AuthenticatedRepository<'a> { + AuthenticatedRepository { + store: self.store.as_ref(), + user: user, + } + } + pub fn get_session(&self, session_id: i64) -> Result { let session = self .store @@ -30,21 +47,13 @@ impl GpodderRepository { .ok_or(AuthErr::UnknownSession)?; // Expired sessions still in the database are considered removed - if Utc::now() - session.last_seen > TimeDelta::new(MAX_SESSION_AGE, 0).unwrap() { + if Utc::now() - session.last_seen > MAX_SESSION_AGE { Err(AuthErr::UnknownSession) } else { Ok(session) } } - pub fn paginated_sessions( - &self, - user: &models::User, - page: models::Page, - ) -> Result, AuthErr> { - self.store.paginated_sessions(user, page) - } - pub fn get_user(&self, username: &str) -> Result { self.store.get_user(username)?.ok_or(AuthErr::UnknownUser) } @@ -79,23 +88,6 @@ impl GpodderRepository { } } - pub fn create_session( - &self, - user: &models::User, - user_agent: Option, - ) -> Result { - let session = models::Session { - id: rand::thread_rng().r#gen(), - last_seen: Utc::now(), - user: user.clone(), - user_agent, - }; - - self.store.insert_session(&session)?; - - Ok(session) - } - pub fn refresh_session(&self, session: &models::Session) -> Result<(), AuthErr> { let now = Utc::now(); @@ -107,27 +99,80 @@ impl GpodderRepository { } pub fn remove_old_sessions(&self) -> Result { - let min_last_seen = Utc::now() - TimeDelta::seconds(MAX_SESSION_AGE); + let min_last_seen = Utc::now() - MAX_SESSION_AGE; self.store.remove_old_sessions(min_last_seen) } +} - pub fn devices_for_user(&self, user: &models::User) -> Result, AuthErr> { - self.store.devices_for_user(user) +impl<'a> AuthenticatedRepository<'a> { + /// Retrieve the given session from the database, if it exists and is visible to the user + pub fn get_session(&self, session_id: i64) -> Result { + let session = self + .store + .get_session(session_id)? + .ok_or(AuthErr::UnknownSession)?; + + // Users can't see sessions from other users, and expired sessions still in the database + // are considered removed + if session.user.id != self.user.id || Utc::now() - session.last_seen > MAX_SESSION_AGE { + Err(AuthErr::UnknownSession) + } else { + Ok(session) + } } + /// Retrieve a paginated list of the user's sessions + pub fn paginated_sessions(&self, page: models::Page) -> Result, AuthErr> { + self.store.paginated_sessions(self.user, page) + } + + /// Create a new session for the authenticated user + pub fn create_session(&self, user_agent: Option) -> Result { + let session = models::Session { + id: rand::thread_rng().r#gen(), + last_seen: Utc::now(), + user: self.user.clone(), + user_agent, + }; + + self.store.insert_session(&session)?; + + Ok(session) + } + + /// Set the session's last seen value to the current time + pub fn refresh_session(&self, session: &models::Session) -> Result<(), AuthErr> { + let now = Utc::now(); + + self.store.refresh_session(session, now) + } + + /// Remove the given session, if it belongs to the authenticated user + pub fn remove_session(&self, session_id: i64) -> Result<(), AuthErr> { + // This fails if the session doesn't exist for the user, so it's basically a "exists" check + let session = self.get_session(session_id)?; + + self.store.remove_session(session.id) + } + + /// Return the devices for the authenticated user + pub fn devices(&self) -> Result, AuthErr> { + self.store.devices_for_user(self.user) + } + + /// Update the metadata of a device pub fn update_device_info( &self, - user: &models::User, device_id: &str, patch: models::DevicePatch, ) -> Result<(), AuthErr> { - self.store.update_device_info(user, device_id, patch) + self.store.update_device_info(self.user, device_id, patch) } + /// Update the sync status for some of the user's devices pub fn update_device_sync_status( &self, - user: &models::User, sync: Vec>, unsync: Vec<&str>, ) -> Result<(), AuthErr> { @@ -146,71 +191,72 @@ impl GpodderRepository { unsync.remove(device_id); } - let group_id = self.store.merge_sync_groups(user, remaining)?; + let group_id = self.store.merge_sync_groups(self.user, remaining)?; self.store.synchronize_sync_group(group_id, now)?; } // Finally we unsync the remaining devices self.store - .remove_from_sync_group(user, unsync.into_iter().collect())?; + .remove_from_sync_group(self.user, unsync.into_iter().collect())?; Ok(()) } - pub fn devices_by_sync_group( - &self, - user: &models::User, - ) -> Result<(Vec, Vec>), AuthErr> { - self.store.devices_by_sync_group(user) + /// Return the user's devices, grouped per sync group + pub fn devices_by_sync_group(&self) -> Result<(Vec, Vec>), AuthErr> { + self.store.devices_by_sync_group(self.user) } + /// Retrieve the user's subscriptions for a device pub fn subscriptions_for_device( &self, - user: &models::User, device_id: &str, ) -> Result, AuthErr> { - self.store.subscriptions_for_device(user, device_id) + self.store.subscriptions_for_device(self.user, device_id) } - pub fn subscriptions_for_user( - &self, - user: &models::User, - ) -> Result, AuthErr> { - self.store.subscriptions_for_user(user) + /// Retrieve the user's subscriptions + pub fn subscriptions(&self) -> Result, AuthErr> { + self.store.subscriptions_for_user(self.user) } + /// Set the subscriptions for a given device pub fn set_subscriptions_for_device( &self, - user: &models::User, device_id: &str, urls: Vec, ) -> Result, AuthErr> { let time_changed = Utc::now(); self.store - .set_subscriptions_for_device(user, device_id, urls, time_changed)?; + .set_subscriptions_for_device(self.user, device_id, urls, time_changed)?; Ok(time_changed + TimeDelta::seconds(1)) } + /// Add and remove subscriptions to and from a given device. pub fn update_subscriptions_for_device( &self, - user: &models::User, device_id: &str, add: Vec, remove: Vec, ) -> Result, AuthErr> { let time_changed = Utc::now(); - self.store - .update_subscriptions_for_device(user, device_id, add, remove, time_changed)?; + self.store.update_subscriptions_for_device( + self.user, + device_id, + add, + remove, + time_changed, + )?; Ok(time_changed + TimeDelta::seconds(1)) } + /// Get the changes in subscriptions for a given device after a given timestamp. pub fn subscription_updates_for_device( &self, - user: &models::User, device_id: &str, since: DateTime, ) -> Result< @@ -225,7 +271,7 @@ impl GpodderRepository { let (added, removed) = self .store - .subscription_updates_for_device(user, device_id, since)?; + .subscription_updates_for_device(self.user, device_id, since)?; let max_time_changed = added .iter() @@ -236,22 +282,22 @@ impl GpodderRepository { Ok((max_time_changed + TimeDelta::seconds(1), added, removed)) } + /// Add episode actions to the database pub fn add_episode_actions( &self, - user: &models::User, actions: Vec, ) -> Result, AuthErr> { let time_changed = Utc::now(); self.store - .add_episode_actions(user, actions, time_changed)?; + .add_episode_actions(self.user, actions, time_changed)?; Ok(time_changed + TimeDelta::seconds(1)) } + /// Get episode actions for the currently authenticated user pub fn episode_actions_for_user( &self, - user: &models::User, since: Option>, podcast: Option, device: Option, @@ -260,7 +306,7 @@ impl GpodderRepository { let now = chrono::Utc::now(); let actions = self .store - .episode_actions_for_user(user, since, podcast, device, aggregated)?; + .episode_actions_for_user(self.user, since, podcast, device, aggregated)?; let max_time_changed = actions.iter().map(|a| a.time_changed).max().unwrap_or(now); Ok((max_time_changed + TimeDelta::seconds(1), actions)) diff --git a/otter/src/cli/gpo.rs b/otter/src/cli/gpo.rs index 1cc2f65..f2d0cf0 100644 --- a/otter/src/cli/gpo.rs +++ b/otter/src/cli/gpo.rs @@ -23,15 +23,14 @@ impl Command { match self { Self::Sync { username, devices } => { let user = store.get_user(username)?; - store.update_device_sync_status( - &user, + store.user(&user).update_device_sync_status( vec![devices.iter().map(|s| s.as_ref()).collect()], Vec::new(), )?; } Self::Devices { username } => { let user = store.get_user(username)?; - let devices = store.devices_for_user(&user)?; + let devices = store.user(&user).devices()?; for device in devices { println!("{} ({} subscriptions)", device.id, device.subscriptions); diff --git a/otter/src/server/gpodder/advanced/auth.rs b/otter/src/server/gpodder/advanced/auth.rs index 9c920dc..7e1c8fc 100644 --- a/otter/src/server/gpodder/advanced/auth.rs +++ b/otter/src/server/gpodder/advanced/auth.rs @@ -66,7 +66,7 @@ async fn post_login( .validate_credentials(auth.username(), auth.password())?; let user_agent = user_agent.map(|header| header.to_string()); - let session = ctx.store.create_session(&user, user_agent)?; + let session = ctx.store.user(&user).create_session(user_agent)?; Ok::<_, AuthErr>(session) }) diff --git a/otter/src/server/gpodder/advanced/devices.rs b/otter/src/server/gpodder/advanced/devices.rs index 30bec37..e1bbc12 100644 --- a/otter/src/server/gpodder/advanced/devices.rs +++ b/otter/src/server/gpodder/advanced/devices.rs @@ -39,7 +39,7 @@ async fn get_devices( } Ok( - tokio::task::spawn_blocking(move || ctx.store.devices_for_user(&user)) + tokio::task::spawn_blocking(move || ctx.store.user(&user).devices()) .await .unwrap() .map(|devices| Json(devices.into_iter().map(models::Device::from).collect()))?, @@ -56,9 +56,11 @@ async fn post_device( return Err(AppError::NotFound); } - tokio::task::spawn_blocking(move || ctx.store.update_device_info(&user, &id, patch.into())) - .await - .unwrap()?; + tokio::task::spawn_blocking(move || { + ctx.store.user(&user).update_device_info(&id, patch.into()) + }) + .await + .unwrap()?; Ok(()) } diff --git a/otter/src/server/gpodder/advanced/episodes.rs b/otter/src/server/gpodder/advanced/episodes.rs index 2d62333..19e4e1d 100644 --- a/otter/src/server/gpodder/advanced/episodes.rs +++ b/otter/src/server/gpodder/advanced/episodes.rs @@ -46,7 +46,8 @@ async fn post_episode_actions( Ok(tokio::task::spawn_blocking(move || { ctx.store - .add_episode_actions(&user, actions.into_iter().map(Into::into).collect()) + .user(&user) + .add_episode_actions(actions.into_iter().map(Into::into).collect()) }) .await .unwrap() @@ -90,8 +91,7 @@ async fn get_episode_actions( let since = filter.since.and_then(|ts| DateTime::from_timestamp(ts, 0)); Ok(tokio::task::spawn_blocking(move || { - ctx.store.episode_actions_for_user( - &user, + ctx.store.user(&user).episode_actions_for_user( since, filter.podcast, filter.device, diff --git a/otter/src/server/gpodder/advanced/subscriptions.rs b/otter/src/server/gpodder/advanced/subscriptions.rs index b430b3e..6687d32 100644 --- a/otter/src/server/gpodder/advanced/subscriptions.rs +++ b/otter/src/server/gpodder/advanced/subscriptions.rs @@ -44,7 +44,8 @@ pub async fn post_subscription_changes( Ok(tokio::task::spawn_blocking(move || { ctx.store - .update_subscriptions_for_device(&user, &id, delta.add, delta.remove) + .user(&user) + .update_subscriptions_for_device(&id, delta.add, delta.remove) }) .await .unwrap() @@ -79,7 +80,9 @@ pub async fn get_subscription_changes( let since = chrono::DateTime::from_timestamp(query.since, 0).ok_or(AppError::BadRequest)?; Ok(tokio::task::spawn_blocking(move || { - ctx.store.subscription_updates_for_device(&user, &id, since) + ctx.store + .user(&user) + .subscription_updates_for_device(&id, since) }) .await .unwrap() diff --git a/otter/src/server/gpodder/advanced/sync.rs b/otter/src/server/gpodder/advanced/sync.rs index 1ac2c33..6ba3ea2 100644 --- a/otter/src/server/gpodder/advanced/sync.rs +++ b/otter/src/server/gpodder/advanced/sync.rs @@ -41,7 +41,7 @@ pub async fn get_sync_status( } Ok( - tokio::task::spawn_blocking(move || ctx.store.devices_by_sync_group(&user)) + tokio::task::spawn_blocking(move || ctx.store.user(&user).devices_by_sync_group()) .await .unwrap() .map(|(not_synchronized, synchronized)| { @@ -68,8 +68,7 @@ pub async fn post_sync_status_changes( } Ok(tokio::task::spawn_blocking(move || { - ctx.store.update_device_sync_status( - &user, + ctx.store.user(&user).update_device_sync_status( delta .synchronize .iter() @@ -78,7 +77,7 @@ pub async fn post_sync_status_changes( delta.stop_synchronize.iter().map(|s| s.as_ref()).collect(), )?; - ctx.store.devices_by_sync_group(&user) + ctx.store.user(&user).devices_by_sync_group() }) .await .unwrap() diff --git a/otter/src/server/gpodder/simple/subscriptions.rs b/otter/src/server/gpodder/simple/subscriptions.rs index cd960f1..404e242 100644 --- a/otter/src/server/gpodder/simple/subscriptions.rs +++ b/otter/src/server/gpodder/simple/subscriptions.rs @@ -34,7 +34,7 @@ pub async fn get_device_subscriptions( } Ok( - tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_device(&user, &id)) + tokio::task::spawn_blocking(move || ctx.store.user(&user).subscriptions_for_device(&id)) .await .unwrap() .map(|subs| Json(subs.into_iter().map(|s| s.url).collect()))?, @@ -51,7 +51,7 @@ pub async fn get_user_subscriptions( } Ok( - tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_user(&user)) + tokio::task::spawn_blocking(move || ctx.store.user(&user).subscriptions()) .await .unwrap() .map(|subs| Json(subs.into_iter().map(|s| s.url).collect()))?, @@ -69,7 +69,9 @@ pub async fn put_device_subscriptions( } Ok(tokio::task::spawn_blocking(move || { - ctx.store.set_subscriptions_for_device(&user, &id, urls) + ctx.store + .user(&user) + .set_subscriptions_for_device(&id, urls) }) .await .unwrap() diff --git a/otter/src/server/web/mod.rs b/otter/src/server/web/mod.rs index 24da8ca..9bbd11f 100644 --- a/otter/src/server/web/mod.rs +++ b/otter/src/server/web/mod.rs @@ -125,7 +125,7 @@ async fn post_login( .validate_credentials(&login.username, &login.password)?; let user_agent = user_agent.map(|header| header.to_string()); - let session = ctx.store.create_session(&user, user_agent)?; + let session = ctx.store.user(&user).create_session(user_agent)?; Ok::<_, AuthErr>(session) }) diff --git a/otter/src/server/web/sessions.rs b/otter/src/server/web/sessions.rs index d05e3a7..8c25f30 100644 --- a/otter/src/server/web/sessions.rs +++ b/otter/src/server/web/sessions.rs @@ -31,7 +31,9 @@ pub async fn get_sessions( ) -> AppResult>> { let next_page = page.next_page(); let sessions = tokio::task::spawn_blocking(move || { - ctx.store.paginated_sessions(&session.user, page.into()) + ctx.store + .user(&session.user) + .paginated_sessions(page.into()) }) .await .unwrap()?;