refactor: migrated episode actions API to store

Jef Roosens 2025-03-15 19:34:27 +01:00
parent da7befc5c4
commit 8a5e625e6f
No known key found for this signature in database
GPG Key ID: 21FD3D77D56BAF49
7 changed files with 97 additions and 53 deletions

View File

@ -9,7 +9,6 @@ pub fn serve(config: &crate::config::Config) -> u8 {
let repo = db::SqliteRepository::from(pool); let repo = db::SqliteRepository::from(pool);
let ctx = server::Context { let ctx = server::Context {
repo: repo.clone(),
store: crate::gpodder::GpodderRepository::new(repo), store: crate::gpodder::GpodderRepository::new(repo),
}; };
let app = server::app(ctx); let app = server::app(ctx);

View File

@ -1,4 +1,4 @@
use chrono::DateTime; use chrono::{DateTime, Utc};
use diesel::prelude::*; use diesel::prelude::*;
use super::SqliteRepository; use super::SqliteRepository;
@ -26,7 +26,7 @@ impl From<gpodder::EpisodeAction> for db::NewEpisodeAction {
podcast_url: value.podcast, podcast_url: value.podcast,
episode_url: value.episode, episode_url: value.episode,
time_changed: 0, time_changed: 0,
timestamp: value.timestamp.map(|t| t.and_utc().timestamp()), timestamp: value.timestamp.map(|t| t.timestamp()),
action, action,
started, started,
position, position,
@ -58,7 +58,8 @@ impl From<(Option<String>, db::EpisodeAction)> for gpodder::EpisodeAction {
// SAFETY the input to the from_timestamp function is always the result of a // SAFETY the input to the from_timestamp function is always the result of a
// previous timestamp() function call, which is guaranteed to be each other's // previous timestamp() function call, which is guaranteed to be each other's
// reverse // reverse
.map(|ts| DateTime::from_timestamp(ts, 0).unwrap().naive_utc()), .map(|ts| DateTime::from_timestamp(ts, 0).unwrap()),
time_changed: DateTime::from_timestamp(db_action.time_changed, 0).unwrap(),
device: device_id, device: device_id,
action, action,
} }
@ -70,8 +71,9 @@ impl gpodder::EpisodeActionRepository for SqliteRepository {
&self, &self,
user: &gpodder::User, user: &gpodder::User,
actions: Vec<gpodder::EpisodeAction>, actions: Vec<gpodder::EpisodeAction>,
) -> Result<i64, gpodder::AuthErr> { time_changed: DateTime<Utc>,
let time_changed = chrono::Utc::now().timestamp(); ) -> Result<(), gpodder::AuthErr> {
let time_changed = time_changed.timestamp();
// TODO optimize this query // TODO optimize this query
// 1. The lookup for a device could be replaced with a subquery, although Diesel seems to // 1. The lookup for a device could be replaced with a subquery, although Diesel seems to
@ -99,17 +101,18 @@ impl gpodder::EpisodeActionRepository for SqliteRepository {
Ok::<_, diesel::result::Error>(()) Ok::<_, diesel::result::Error>(())
})?; })?;
Ok(time_changed + 1) Ok(())
} }
fn episode_actions_for_user( fn episode_actions_for_user(
&self, &self,
user: &gpodder::User, user: &gpodder::User,
since: Option<i64>, since: Option<DateTime<Utc>>,
podcast: Option<String>, podcast: Option<String>,
device: Option<String>, device: Option<String>,
aggregated: bool, aggregated: bool,
) -> Result<(i64, Vec<gpodder::EpisodeAction>), gpodder::AuthErr> { ) -> Result<Vec<gpodder::EpisodeAction>, gpodder::AuthErr> {
let since = since.map(|ts| ts.timestamp()).unwrap_or(0);
let conn = &mut self.pool.get()?; let conn = &mut self.pool.get()?;
let mut query = episode_actions::table let mut query = episode_actions::table
@ -117,7 +120,7 @@ impl gpodder::EpisodeActionRepository for SqliteRepository {
.filter( .filter(
episode_actions::user_id episode_actions::user_id
.eq(user.id) .eq(user.id)
.and(episode_actions::time_changed.ge(since.unwrap_or(0))), .and(episode_actions::time_changed.ge(since)),
) )
.select(( .select((
devices::device_id.nullable(), devices::device_id.nullable(),
@ -157,16 +160,11 @@ impl gpodder::EpisodeActionRepository for SqliteRepository {
query.get_results(conn)? query.get_results(conn)?
}; };
let max_timestamp = db_actions
.iter()
.map(|(_, a)| a.time_changed)
.max()
.unwrap_or(0);
let actions = db_actions let actions = db_actions
.into_iter() .into_iter()
.map(gpodder::EpisodeAction::from) .map(gpodder::EpisodeAction::from)
.collect(); .collect();
Ok((max_timestamp + 1, actions)) Ok(actions)
} }
} }

View File

@ -1,6 +1,7 @@
pub mod models; pub mod models;
mod repository; mod repository;
use chrono::{DateTime, Utc};
pub use models::*; pub use models::*;
pub use repository::GpodderRepository; pub use repository::GpodderRepository;
@ -11,9 +12,15 @@ pub enum AuthErr {
Other(Box<dyn std::error::Error + Sync + Send>), Other(Box<dyn std::error::Error + Sync + Send>),
} }
pub trait Store: AuthStore + DeviceRepository + SubscriptionRepository {} pub trait Store:
AuthStore + DeviceRepository + SubscriptionRepository + EpisodeActionRepository
{
}
impl<T> Store for T where T: AuthStore + DeviceRepository + SubscriptionRepository {} impl<T> Store for T where
T: AuthStore + DeviceRepository + SubscriptionRepository + EpisodeActionRepository
{
}
pub trait AuthRepository { pub trait AuthRepository {
/// Validate the given session ID and return its user. /// Validate the given session ID and return its user.
@ -78,7 +85,7 @@ pub trait SubscriptionRepository {
user: &User, user: &User,
device_id: &str, device_id: &str,
urls: Vec<String>, urls: Vec<String>,
time_changed: chrono::DateTime<chrono::Utc>, time_changed: DateTime<Utc>,
) -> Result<(), AuthErr>; ) -> Result<(), AuthErr>;
/// Update the list of subscriptions for a device by adding and removing the given URLs /// Update the list of subscriptions for a device by adding and removing the given URLs
@ -88,7 +95,7 @@ pub trait SubscriptionRepository {
device_id: &str, device_id: &str,
add: Vec<String>, add: Vec<String>,
remove: Vec<String>, remove: Vec<String>,
time_changed: chrono::DateTime<chrono::Utc>, time_changed: DateTime<Utc>,
) -> Result<(), AuthErr>; ) -> Result<(), AuthErr>;
/// Returns the changes in subscriptions since the given timestamp. /// Returns the changes in subscriptions since the given timestamp.
@ -96,22 +103,26 @@ pub trait SubscriptionRepository {
&self, &self,
user: &User, user: &User,
device_id: &str, device_id: &str,
since: chrono::DateTime<chrono::Utc>, since: DateTime<Utc>,
) -> Result<(chrono::DateTime<chrono::Utc>, Vec<String>, Vec<String>), AuthErr>; ) -> Result<(DateTime<Utc>, Vec<String>, Vec<String>), AuthErr>;
} }
pub trait EpisodeActionRepository { pub trait EpisodeActionRepository {
/// Insert the given episode actions into the datastore. /// Insert the given episode actions into the datastore.
fn add_episode_actions(&self, user: &User, actions: Vec<EpisodeAction>) fn add_episode_actions(
-> Result<i64, AuthErr>; &self,
user: &User,
actions: Vec<EpisodeAction>,
time_changed: DateTime<Utc>,
) -> Result<(), AuthErr>;
/// Retrieve the list of episode actions for the given user. /// Retrieve the list of episode actions for the given user.
fn episode_actions_for_user( fn episode_actions_for_user(
&self, &self,
user: &User, user: &User,
since: Option<i64>, since: Option<DateTime<Utc>>,
podcast: Option<String>, podcast: Option<String>,
device: Option<String>, device: Option<String>,
aggregated: bool, aggregated: bool,
) -> Result<(i64, Vec<EpisodeAction>), AuthErr>; ) -> Result<Vec<EpisodeAction>, AuthErr>;
} }

View File

@ -1,4 +1,4 @@
use chrono::{DateTime, NaiveDateTime, Utc}; use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Clone)] #[derive(Clone)]
@ -52,7 +52,8 @@ pub enum EpisodeActionType {
pub struct EpisodeAction { pub struct EpisodeAction {
pub podcast: String, pub podcast: String,
pub episode: String, pub episode: String,
pub timestamp: Option<NaiveDateTime>, pub timestamp: Option<DateTime<Utc>>,
pub time_changed: DateTime<Utc>,
#[serde(default)] #[serde(default)]
pub device: Option<String>, pub device: Option<String>,
#[serde(flatten)] #[serde(flatten)]

View File

@ -1,6 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use argon2::{Argon2, PasswordHash, PasswordVerifier}; use argon2::{Argon2, PasswordHash, PasswordVerifier};
use chrono::{DateTime, TimeDelta, Utc};
use rand::Rng; use rand::Rng;
use super::{models, AuthErr, Store}; use super::{models, AuthErr, Store};
@ -26,9 +27,7 @@ impl GpodderRepository {
.ok_or(AuthErr::UnknownSession)?; .ok_or(AuthErr::UnknownSession)?;
// Expired sessions still in the database are considered removed // Expired sessions still in the database are considered removed
if chrono::Utc::now() - session.last_seen if Utc::now() - session.last_seen > TimeDelta::new(MAX_SESSION_AGE, 0).unwrap() {
> chrono::TimeDelta::new(MAX_SESSION_AGE, 0).unwrap()
{
Err(AuthErr::UnknownSession) Err(AuthErr::UnknownSession)
} else { } else {
Ok(session) Ok(session)
@ -57,7 +56,7 @@ impl GpodderRepository {
pub fn create_session(&self, user: &models::User) -> Result<models::Session, AuthErr> { pub fn create_session(&self, user: &models::User) -> Result<models::Session, AuthErr> {
let session = models::Session { let session = models::Session {
id: rand::thread_rng().gen(), id: rand::thread_rng().gen(),
last_seen: chrono::Utc::now(), last_seen: Utc::now(),
user: user.clone(), user: user.clone(),
}; };
@ -100,13 +99,13 @@ impl GpodderRepository {
user: &models::User, user: &models::User,
device_id: &str, device_id: &str,
urls: Vec<String>, urls: Vec<String>,
) -> Result<chrono::DateTime<chrono::Utc>, AuthErr> { ) -> Result<DateTime<Utc>, AuthErr> {
let time_changed = chrono::Utc::now(); let time_changed = Utc::now();
self.store self.store
.set_subscriptions_for_device(user, device_id, urls, time_changed)?; .set_subscriptions_for_device(user, device_id, urls, time_changed)?;
Ok(time_changed + chrono::TimeDelta::seconds(1)) Ok(time_changed + TimeDelta::seconds(1))
} }
pub fn update_subscriptions_for_device( pub fn update_subscriptions_for_device(
@ -115,29 +114,55 @@ impl GpodderRepository {
device_id: &str, device_id: &str,
add: Vec<String>, add: Vec<String>,
remove: Vec<String>, remove: Vec<String>,
) -> Result<chrono::DateTime<chrono::Utc>, AuthErr> { ) -> Result<DateTime<Utc>, AuthErr> {
let time_changed = chrono::Utc::now(); let time_changed = Utc::now();
self.store self.store
.update_subscriptions_for_device(user, device_id, add, remove, time_changed)?; .update_subscriptions_for_device(user, device_id, add, remove, time_changed)?;
Ok(time_changed + chrono::TimeDelta::seconds(1)) Ok(time_changed + TimeDelta::seconds(1))
} }
pub fn subscription_updates_for_device( pub fn subscription_updates_for_device(
&self, &self,
user: &models::User, user: &models::User,
device_id: &str, device_id: &str,
since: chrono::DateTime<chrono::Utc>, since: DateTime<Utc>,
) -> Result<(chrono::DateTime<chrono::Utc>, Vec<String>, Vec<String>), AuthErr> { ) -> Result<(DateTime<Utc>, Vec<String>, Vec<String>), AuthErr> {
let (max_time_changed, added, removed) = self let (max_time_changed, added, removed) = self
.store .store
.subscription_updates_for_device(user, device_id, since)?; .subscription_updates_for_device(user, device_id, since)?;
Ok(( Ok((max_time_changed + TimeDelta::seconds(1), added, removed))
max_time_changed + chrono::TimeDelta::seconds(1), }
added,
removed, pub fn add_episode_actions(
)) &self,
user: &models::User,
actions: Vec<models::EpisodeAction>,
) -> Result<DateTime<Utc>, AuthErr> {
let time_changed = Utc::now();
self.store
.add_episode_actions(user, actions, time_changed)?;
Ok(time_changed + TimeDelta::seconds(1))
}
pub fn episode_actions_for_user(
&self,
user: &models::User,
since: Option<DateTime<Utc>>,
podcast: Option<String>,
device: Option<String>,
aggregated: bool,
) -> Result<(DateTime<Utc>, Vec<models::EpisodeAction>), AuthErr> {
let now = chrono::Utc::now();
let actions = self
.store
.episode_actions_for_user(user, since, podcast, device, aggregated)?;
let max_time_changed = actions.iter().map(|a| a.time_changed).max().unwrap_or(now);
Ok((max_time_changed + TimeDelta::seconds(1), actions))
} }
} }

View File

@ -4,10 +4,11 @@ use axum::{
routing::post, routing::post,
Extension, Json, Router, Extension, Json, Router,
}; };
use chrono::DateTime;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::{ use crate::{
gpodder::{self, EpisodeActionRepository}, gpodder,
server::{ server::{
error::{AppError, AppResult}, error::{AppError, AppResult},
gpodder::{ gpodder::{
@ -43,12 +44,12 @@ async fn post_episode_actions(
} }
Ok( Ok(
tokio::task::spawn_blocking(move || ctx.repo.add_episode_actions(&user, actions)) tokio::task::spawn_blocking(move || ctx.store.add_episode_actions(&user, actions))
.await .await
.unwrap() .unwrap()
.map(|timestamp| { .map(|time_changed| {
Json(UpdatedUrlsResponse { Json(UpdatedUrlsResponse {
timestamp, timestamp: time_changed.timestamp(),
update_urls: Vec::new(), update_urls: Vec::new(),
}) })
})?, })?,
@ -84,10 +85,15 @@ async fn get_episode_actions(
return Err(AppError::BadRequest); return Err(AppError::BadRequest);
} }
let since = filter
.since
.map(|ts| DateTime::from_timestamp(ts, 0))
.flatten();
Ok(tokio::task::spawn_blocking(move || { Ok(tokio::task::spawn_blocking(move || {
ctx.repo.episode_actions_for_user( ctx.store.episode_actions_for_user(
&user, &user,
filter.since, since,
filter.podcast, filter.podcast,
filter.device, filter.device,
filter.aggregated, filter.aggregated,
@ -95,5 +101,10 @@ async fn get_episode_actions(
}) })
.await .await
.unwrap() .unwrap()
.map(|(timestamp, actions)| Json(EpisodeActionsResponse { timestamp, actions }))?) .map(|(ts, actions)| {
Json(EpisodeActionsResponse {
timestamp: ts.timestamp(),
actions,
})
})?)
} }

View File

@ -6,7 +6,6 @@ use tower_http::trace::TraceLayer;
#[derive(Clone)] #[derive(Clone)]
pub struct Context { pub struct Context {
pub repo: crate::db::SqliteRepository,
pub store: crate::gpodder::GpodderRepository, pub store: crate::gpodder::GpodderRepository,
} }