refactor(gpodder): add authenticated view of repository
parent
0e91eef0e8
commit
346c27fc3f
|
@ -9,13 +9,22 @@ use crate::{
|
|||
store::{AuthErr, GpodderStore},
|
||||
};
|
||||
|
||||
const MAX_SESSION_AGE: i64 = 60 * 60 * 24 * 7;
|
||||
const MAX_SESSION_AGE: TimeDelta = TimeDelta::seconds(60 * 60 * 24 * 7);
|
||||
|
||||
/// Main abstraction over the database that provides API-compatible methods for querying and
|
||||
/// modifying the underlying database
|
||||
#[derive(Clone)]
|
||||
pub struct GpodderRepository {
|
||||
store: Arc<dyn GpodderStore + Send + Sync>,
|
||||
}
|
||||
|
||||
/// Authenticated view of the repository, providing methods that take the authenticated user
|
||||
/// explicitely into account
|
||||
pub struct AuthenticatedRepository<'a> {
|
||||
store: &'a (dyn GpodderStore + Send + Sync),
|
||||
user: &'a models::User,
|
||||
}
|
||||
|
||||
impl GpodderRepository {
|
||||
pub fn new(store: impl GpodderStore + Send + Sync + 'static) -> Self {
|
||||
Self {
|
||||
|
@ -23,6 +32,14 @@ impl GpodderRepository {
|
|||
}
|
||||
}
|
||||
|
||||
/// Return an authenticated view of the repository for the given user
|
||||
pub fn user<'a>(&'a self, user: &'a models::User) -> AuthenticatedRepository<'a> {
|
||||
AuthenticatedRepository {
|
||||
store: self.store.as_ref(),
|
||||
user: user,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_session(&self, session_id: i64) -> Result<models::Session, AuthErr> {
|
||||
let session = self
|
||||
.store
|
||||
|
@ -30,21 +47,13 @@ impl GpodderRepository {
|
|||
.ok_or(AuthErr::UnknownSession)?;
|
||||
|
||||
// Expired sessions still in the database are considered removed
|
||||
if Utc::now() - session.last_seen > TimeDelta::new(MAX_SESSION_AGE, 0).unwrap() {
|
||||
if Utc::now() - session.last_seen > MAX_SESSION_AGE {
|
||||
Err(AuthErr::UnknownSession)
|
||||
} else {
|
||||
Ok(session)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn paginated_sessions(
|
||||
&self,
|
||||
user: &models::User,
|
||||
page: models::Page,
|
||||
) -> Result<Vec<models::Session>, AuthErr> {
|
||||
self.store.paginated_sessions(user, page)
|
||||
}
|
||||
|
||||
pub fn get_user(&self, username: &str) -> Result<models::User, AuthErr> {
|
||||
self.store.get_user(username)?.ok_or(AuthErr::UnknownUser)
|
||||
}
|
||||
|
@ -79,23 +88,6 @@ impl GpodderRepository {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn create_session(
|
||||
&self,
|
||||
user: &models::User,
|
||||
user_agent: Option<String>,
|
||||
) -> Result<models::Session, AuthErr> {
|
||||
let session = models::Session {
|
||||
id: rand::thread_rng().r#gen(),
|
||||
last_seen: Utc::now(),
|
||||
user: user.clone(),
|
||||
user_agent,
|
||||
};
|
||||
|
||||
self.store.insert_session(&session)?;
|
||||
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
pub fn refresh_session(&self, session: &models::Session) -> Result<(), AuthErr> {
|
||||
let now = Utc::now();
|
||||
|
||||
|
@ -107,27 +99,80 @@ impl GpodderRepository {
|
|||
}
|
||||
|
||||
pub fn remove_old_sessions(&self) -> Result<usize, AuthErr> {
|
||||
let min_last_seen = Utc::now() - TimeDelta::seconds(MAX_SESSION_AGE);
|
||||
let min_last_seen = Utc::now() - MAX_SESSION_AGE;
|
||||
|
||||
self.store.remove_old_sessions(min_last_seen)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn devices_for_user(&self, user: &models::User) -> Result<Vec<models::Device>, AuthErr> {
|
||||
self.store.devices_for_user(user)
|
||||
impl<'a> AuthenticatedRepository<'a> {
|
||||
/// Retrieve the given session from the database, if it exists and is visible to the user
|
||||
pub fn get_session(&self, session_id: i64) -> Result<models::Session, AuthErr> {
|
||||
let session = self
|
||||
.store
|
||||
.get_session(session_id)?
|
||||
.ok_or(AuthErr::UnknownSession)?;
|
||||
|
||||
// Users can't see sessions from other users, and expired sessions still in the database
|
||||
// are considered removed
|
||||
if session.user.id != self.user.id || Utc::now() - session.last_seen > MAX_SESSION_AGE {
|
||||
Err(AuthErr::UnknownSession)
|
||||
} else {
|
||||
Ok(session)
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve a paginated list of the user's sessions
|
||||
pub fn paginated_sessions(&self, page: models::Page) -> Result<Vec<models::Session>, AuthErr> {
|
||||
self.store.paginated_sessions(self.user, page)
|
||||
}
|
||||
|
||||
/// Create a new session for the authenticated user
|
||||
pub fn create_session(&self, user_agent: Option<String>) -> Result<models::Session, AuthErr> {
|
||||
let session = models::Session {
|
||||
id: rand::thread_rng().r#gen(),
|
||||
last_seen: Utc::now(),
|
||||
user: self.user.clone(),
|
||||
user_agent,
|
||||
};
|
||||
|
||||
self.store.insert_session(&session)?;
|
||||
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
/// Set the session's last seen value to the current time
|
||||
pub fn refresh_session(&self, session: &models::Session) -> Result<(), AuthErr> {
|
||||
let now = Utc::now();
|
||||
|
||||
self.store.refresh_session(session, now)
|
||||
}
|
||||
|
||||
/// Remove the given session, if it belongs to the authenticated user
|
||||
pub fn remove_session(&self, session_id: i64) -> Result<(), AuthErr> {
|
||||
// This fails if the session doesn't exist for the user, so it's basically a "exists" check
|
||||
let session = self.get_session(session_id)?;
|
||||
|
||||
self.store.remove_session(session.id)
|
||||
}
|
||||
|
||||
/// Return the devices for the authenticated user
|
||||
pub fn devices(&self) -> Result<Vec<models::Device>, AuthErr> {
|
||||
self.store.devices_for_user(self.user)
|
||||
}
|
||||
|
||||
/// Update the metadata of a device
|
||||
pub fn update_device_info(
|
||||
&self,
|
||||
user: &models::User,
|
||||
device_id: &str,
|
||||
patch: models::DevicePatch,
|
||||
) -> Result<(), AuthErr> {
|
||||
self.store.update_device_info(user, device_id, patch)
|
||||
self.store.update_device_info(self.user, device_id, patch)
|
||||
}
|
||||
|
||||
/// Update the sync status for some of the user's devices
|
||||
pub fn update_device_sync_status(
|
||||
&self,
|
||||
user: &models::User,
|
||||
sync: Vec<Vec<&str>>,
|
||||
unsync: Vec<&str>,
|
||||
) -> Result<(), AuthErr> {
|
||||
|
@ -146,71 +191,72 @@ impl GpodderRepository {
|
|||
unsync.remove(device_id);
|
||||
}
|
||||
|
||||
let group_id = self.store.merge_sync_groups(user, remaining)?;
|
||||
let group_id = self.store.merge_sync_groups(self.user, remaining)?;
|
||||
self.store.synchronize_sync_group(group_id, now)?;
|
||||
}
|
||||
|
||||
// Finally we unsync the remaining devices
|
||||
self.store
|
||||
.remove_from_sync_group(user, unsync.into_iter().collect())?;
|
||||
.remove_from_sync_group(self.user, unsync.into_iter().collect())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn devices_by_sync_group(
|
||||
&self,
|
||||
user: &models::User,
|
||||
) -> Result<(Vec<String>, Vec<Vec<String>>), AuthErr> {
|
||||
self.store.devices_by_sync_group(user)
|
||||
/// Return the user's devices, grouped per sync group
|
||||
pub fn devices_by_sync_group(&self) -> Result<(Vec<String>, Vec<Vec<String>>), AuthErr> {
|
||||
self.store.devices_by_sync_group(self.user)
|
||||
}
|
||||
|
||||
/// Retrieve the user's subscriptions for a device
|
||||
pub fn subscriptions_for_device(
|
||||
&self,
|
||||
user: &models::User,
|
||||
device_id: &str,
|
||||
) -> Result<Vec<models::Subscription>, AuthErr> {
|
||||
self.store.subscriptions_for_device(user, device_id)
|
||||
self.store.subscriptions_for_device(self.user, device_id)
|
||||
}
|
||||
|
||||
pub fn subscriptions_for_user(
|
||||
&self,
|
||||
user: &models::User,
|
||||
) -> Result<Vec<models::Subscription>, AuthErr> {
|
||||
self.store.subscriptions_for_user(user)
|
||||
/// Retrieve the user's subscriptions
|
||||
pub fn subscriptions(&self) -> Result<Vec<models::Subscription>, AuthErr> {
|
||||
self.store.subscriptions_for_user(self.user)
|
||||
}
|
||||
|
||||
/// Set the subscriptions for a given device
|
||||
pub fn set_subscriptions_for_device(
|
||||
&self,
|
||||
user: &models::User,
|
||||
device_id: &str,
|
||||
urls: Vec<String>,
|
||||
) -> Result<DateTime<Utc>, AuthErr> {
|
||||
let time_changed = Utc::now();
|
||||
|
||||
self.store
|
||||
.set_subscriptions_for_device(user, device_id, urls, time_changed)?;
|
||||
.set_subscriptions_for_device(self.user, device_id, urls, time_changed)?;
|
||||
|
||||
Ok(time_changed + TimeDelta::seconds(1))
|
||||
}
|
||||
|
||||
/// Add and remove subscriptions to and from a given device.
|
||||
pub fn update_subscriptions_for_device(
|
||||
&self,
|
||||
user: &models::User,
|
||||
device_id: &str,
|
||||
add: Vec<String>,
|
||||
remove: Vec<String>,
|
||||
) -> Result<DateTime<Utc>, AuthErr> {
|
||||
let time_changed = Utc::now();
|
||||
|
||||
self.store
|
||||
.update_subscriptions_for_device(user, device_id, add, remove, time_changed)?;
|
||||
self.store.update_subscriptions_for_device(
|
||||
self.user,
|
||||
device_id,
|
||||
add,
|
||||
remove,
|
||||
time_changed,
|
||||
)?;
|
||||
|
||||
Ok(time_changed + TimeDelta::seconds(1))
|
||||
}
|
||||
|
||||
/// Get the changes in subscriptions for a given device after a given timestamp.
|
||||
pub fn subscription_updates_for_device(
|
||||
&self,
|
||||
user: &models::User,
|
||||
device_id: &str,
|
||||
since: DateTime<Utc>,
|
||||
) -> Result<
|
||||
|
@ -225,7 +271,7 @@ impl GpodderRepository {
|
|||
|
||||
let (added, removed) = self
|
||||
.store
|
||||
.subscription_updates_for_device(user, device_id, since)?;
|
||||
.subscription_updates_for_device(self.user, device_id, since)?;
|
||||
|
||||
let max_time_changed = added
|
||||
.iter()
|
||||
|
@ -236,22 +282,22 @@ impl GpodderRepository {
|
|||
Ok((max_time_changed + TimeDelta::seconds(1), added, removed))
|
||||
}
|
||||
|
||||
/// Add episode actions to the database
|
||||
pub fn add_episode_actions(
|
||||
&self,
|
||||
user: &models::User,
|
||||
actions: Vec<models::EpisodeAction>,
|
||||
) -> Result<DateTime<Utc>, AuthErr> {
|
||||
let time_changed = Utc::now();
|
||||
|
||||
self.store
|
||||
.add_episode_actions(user, actions, time_changed)?;
|
||||
.add_episode_actions(self.user, actions, time_changed)?;
|
||||
|
||||
Ok(time_changed + TimeDelta::seconds(1))
|
||||
}
|
||||
|
||||
/// Get episode actions for the currently authenticated user
|
||||
pub fn episode_actions_for_user(
|
||||
&self,
|
||||
user: &models::User,
|
||||
since: Option<DateTime<Utc>>,
|
||||
podcast: Option<String>,
|
||||
device: Option<String>,
|
||||
|
@ -260,7 +306,7 @@ impl GpodderRepository {
|
|||
let now = chrono::Utc::now();
|
||||
let actions = self
|
||||
.store
|
||||
.episode_actions_for_user(user, since, podcast, device, aggregated)?;
|
||||
.episode_actions_for_user(self.user, since, podcast, device, aggregated)?;
|
||||
let max_time_changed = actions.iter().map(|a| a.time_changed).max().unwrap_or(now);
|
||||
|
||||
Ok((max_time_changed + TimeDelta::seconds(1), actions))
|
||||
|
|
|
@ -23,15 +23,14 @@ impl Command {
|
|||
match self {
|
||||
Self::Sync { username, devices } => {
|
||||
let user = store.get_user(username)?;
|
||||
store.update_device_sync_status(
|
||||
&user,
|
||||
store.user(&user).update_device_sync_status(
|
||||
vec![devices.iter().map(|s| s.as_ref()).collect()],
|
||||
Vec::new(),
|
||||
)?;
|
||||
}
|
||||
Self::Devices { username } => {
|
||||
let user = store.get_user(username)?;
|
||||
let devices = store.devices_for_user(&user)?;
|
||||
let devices = store.user(&user).devices()?;
|
||||
|
||||
for device in devices {
|
||||
println!("{} ({} subscriptions)", device.id, device.subscriptions);
|
||||
|
|
|
@ -66,7 +66,7 @@ async fn post_login(
|
|||
.validate_credentials(auth.username(), auth.password())?;
|
||||
|
||||
let user_agent = user_agent.map(|header| header.to_string());
|
||||
let session = ctx.store.create_session(&user, user_agent)?;
|
||||
let session = ctx.store.user(&user).create_session(user_agent)?;
|
||||
|
||||
Ok::<_, AuthErr>(session)
|
||||
})
|
||||
|
|
|
@ -39,7 +39,7 @@ async fn get_devices(
|
|||
}
|
||||
|
||||
Ok(
|
||||
tokio::task::spawn_blocking(move || ctx.store.devices_for_user(&user))
|
||||
tokio::task::spawn_blocking(move || ctx.store.user(&user).devices())
|
||||
.await
|
||||
.unwrap()
|
||||
.map(|devices| Json(devices.into_iter().map(models::Device::from).collect()))?,
|
||||
|
@ -56,9 +56,11 @@ async fn post_device(
|
|||
return Err(AppError::NotFound);
|
||||
}
|
||||
|
||||
tokio::task::spawn_blocking(move || ctx.store.update_device_info(&user, &id, patch.into()))
|
||||
.await
|
||||
.unwrap()?;
|
||||
tokio::task::spawn_blocking(move || {
|
||||
ctx.store.user(&user).update_device_info(&id, patch.into())
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -46,7 +46,8 @@ async fn post_episode_actions(
|
|||
|
||||
Ok(tokio::task::spawn_blocking(move || {
|
||||
ctx.store
|
||||
.add_episode_actions(&user, actions.into_iter().map(Into::into).collect())
|
||||
.user(&user)
|
||||
.add_episode_actions(actions.into_iter().map(Into::into).collect())
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
|
@ -90,8 +91,7 @@ async fn get_episode_actions(
|
|||
let since = filter.since.and_then(|ts| DateTime::from_timestamp(ts, 0));
|
||||
|
||||
Ok(tokio::task::spawn_blocking(move || {
|
||||
ctx.store.episode_actions_for_user(
|
||||
&user,
|
||||
ctx.store.user(&user).episode_actions_for_user(
|
||||
since,
|
||||
filter.podcast,
|
||||
filter.device,
|
||||
|
|
|
@ -44,7 +44,8 @@ pub async fn post_subscription_changes(
|
|||
|
||||
Ok(tokio::task::spawn_blocking(move || {
|
||||
ctx.store
|
||||
.update_subscriptions_for_device(&user, &id, delta.add, delta.remove)
|
||||
.user(&user)
|
||||
.update_subscriptions_for_device(&id, delta.add, delta.remove)
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
|
@ -79,7 +80,9 @@ pub async fn get_subscription_changes(
|
|||
let since = chrono::DateTime::from_timestamp(query.since, 0).ok_or(AppError::BadRequest)?;
|
||||
|
||||
Ok(tokio::task::spawn_blocking(move || {
|
||||
ctx.store.subscription_updates_for_device(&user, &id, since)
|
||||
ctx.store
|
||||
.user(&user)
|
||||
.subscription_updates_for_device(&id, since)
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
|
|
|
@ -41,7 +41,7 @@ pub async fn get_sync_status(
|
|||
}
|
||||
|
||||
Ok(
|
||||
tokio::task::spawn_blocking(move || ctx.store.devices_by_sync_group(&user))
|
||||
tokio::task::spawn_blocking(move || ctx.store.user(&user).devices_by_sync_group())
|
||||
.await
|
||||
.unwrap()
|
||||
.map(|(not_synchronized, synchronized)| {
|
||||
|
@ -68,8 +68,7 @@ pub async fn post_sync_status_changes(
|
|||
}
|
||||
|
||||
Ok(tokio::task::spawn_blocking(move || {
|
||||
ctx.store.update_device_sync_status(
|
||||
&user,
|
||||
ctx.store.user(&user).update_device_sync_status(
|
||||
delta
|
||||
.synchronize
|
||||
.iter()
|
||||
|
@ -78,7 +77,7 @@ pub async fn post_sync_status_changes(
|
|||
delta.stop_synchronize.iter().map(|s| s.as_ref()).collect(),
|
||||
)?;
|
||||
|
||||
ctx.store.devices_by_sync_group(&user)
|
||||
ctx.store.user(&user).devices_by_sync_group()
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
|
|
|
@ -34,7 +34,7 @@ pub async fn get_device_subscriptions(
|
|||
}
|
||||
|
||||
Ok(
|
||||
tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_device(&user, &id))
|
||||
tokio::task::spawn_blocking(move || ctx.store.user(&user).subscriptions_for_device(&id))
|
||||
.await
|
||||
.unwrap()
|
||||
.map(|subs| Json(subs.into_iter().map(|s| s.url).collect()))?,
|
||||
|
@ -51,7 +51,7 @@ pub async fn get_user_subscriptions(
|
|||
}
|
||||
|
||||
Ok(
|
||||
tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_user(&user))
|
||||
tokio::task::spawn_blocking(move || ctx.store.user(&user).subscriptions())
|
||||
.await
|
||||
.unwrap()
|
||||
.map(|subs| Json(subs.into_iter().map(|s| s.url).collect()))?,
|
||||
|
@ -69,7 +69,9 @@ pub async fn put_device_subscriptions(
|
|||
}
|
||||
|
||||
Ok(tokio::task::spawn_blocking(move || {
|
||||
ctx.store.set_subscriptions_for_device(&user, &id, urls)
|
||||
ctx.store
|
||||
.user(&user)
|
||||
.set_subscriptions_for_device(&id, urls)
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
|
|
|
@ -125,7 +125,7 @@ async fn post_login(
|
|||
.validate_credentials(&login.username, &login.password)?;
|
||||
|
||||
let user_agent = user_agent.map(|header| header.to_string());
|
||||
let session = ctx.store.create_session(&user, user_agent)?;
|
||||
let session = ctx.store.user(&user).create_session(user_agent)?;
|
||||
|
||||
Ok::<_, AuthErr>(session)
|
||||
})
|
||||
|
|
|
@ -31,7 +31,9 @@ pub async fn get_sessions(
|
|||
) -> AppResult<TemplateResponse<Page<View>>> {
|
||||
let next_page = page.next_page();
|
||||
let sessions = tokio::task::spawn_blocking(move || {
|
||||
ctx.store.paginated_sessions(&session.user, page.into())
|
||||
ctx.store
|
||||
.user(&session.user)
|
||||
.paginated_sessions(page.into())
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
|
|
Loading…
Reference in New Issue