refactor: migrate subscriptions API to store

main
Jef Roosens 2025-03-15 19:19:18 +01:00
parent 6bb3e8a27f
commit dd14a2152f
No known key found for this signature in database
GPG Key ID: 21FD3D77D56BAF49
5 changed files with 101 additions and 36 deletions

View File

@ -42,9 +42,10 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
user: &gpodder::User, user: &gpodder::User,
device_id: &str, device_id: &str,
urls: Vec<String>, urls: Vec<String>,
) -> Result<i64, gpodder::AuthErr> { time_changed: chrono::DateTime<chrono::Utc>,
) -> Result<(), gpodder::AuthErr> {
// TODO use a better timestamp // TODO use a better timestamp
let timestamp = chrono::Utc::now().timestamp(); let timestamp = time_changed.timestamp();
self.pool.get()?.transaction(|conn| { self.pool.get()?.transaction(|conn| {
let device = devices::table let device = devices::table
@ -126,7 +127,7 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
Ok::<_, diesel::result::Error>(()) Ok::<_, diesel::result::Error>(())
})?; })?;
Ok(timestamp + 1) Ok(())
} }
fn update_subscriptions_for_device( fn update_subscriptions_for_device(
@ -135,9 +136,10 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
device_id: &str, device_id: &str,
add: Vec<String>, add: Vec<String>,
remove: Vec<String>, remove: Vec<String>,
) -> Result<i64, gpodder::AuthErr> { time_changed: chrono::DateTime<chrono::Utc>,
) -> Result<(), gpodder::AuthErr> {
// TODO use a better timestamp // TODO use a better timestamp
let timestamp = chrono::Utc::now().timestamp_millis(); let timestamp = time_changed.timestamp();
// TODO URLs that are in both the added and removed lists will currently get "re-added", // TODO URLs that are in both the added and removed lists will currently get "re-added",
// meaning their change timestamp will be updated even though they haven't really changed. // meaning their change timestamp will be updated even though they haven't really changed.
@ -220,16 +222,18 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
Ok::<_, diesel::result::Error>(()) Ok::<_, diesel::result::Error>(())
})?; })?;
Ok(timestamp + 1) Ok(())
} }
fn subscription_updates_for_device( fn subscription_updates_for_device(
&self, &self,
user: &gpodder::User, user: &gpodder::User,
device_id: &str, device_id: &str,
since: i64, since: chrono::DateTime<chrono::Utc>,
) -> Result<(i64, Vec<String>, Vec<String>), gpodder::AuthErr> { ) -> Result<(chrono::DateTime<chrono::Utc>, Vec<String>, Vec<String>), gpodder::AuthErr> {
let (mut timestamp, mut added, mut removed) = (0, Vec::new(), Vec::new()); let since = since.timestamp();
let (mut added, mut removed) = (Vec::new(), Vec::new());
let query = device_subscriptions::table let query = device_subscriptions::table
.inner_join(devices::table) .inner_join(devices::table)
@ -241,6 +245,8 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
) )
.select(db::DeviceSubscription::as_select()); .select(db::DeviceSubscription::as_select());
let mut max_time: chrono::DateTime<chrono::Utc> = chrono::DateTime::<chrono::Utc>::MIN_UTC;
for sub in query.load_iter(&mut self.pool.get()?)? { for sub in query.load_iter(&mut self.pool.get()?)? {
let sub = sub?; let sub = sub?;
@ -250,9 +256,9 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
added.push(sub.podcast_url); added.push(sub.podcast_url);
} }
timestamp = timestamp.max(sub.time_changed); max_time = max_time.max(chrono::DateTime::from_timestamp(sub.time_changed, 0).unwrap());
} }
Ok((timestamp + 1, added, removed)) Ok((max_time, added, removed))
} }
} }

View File

@ -11,9 +11,9 @@ pub enum AuthErr {
Other(Box<dyn std::error::Error + Sync + Send>), Other(Box<dyn std::error::Error + Sync + Send>),
} }
pub trait Store: AuthStore + DeviceRepository {} pub trait Store: AuthStore + DeviceRepository + SubscriptionRepository {}
impl<T> Store for T where T: AuthStore + DeviceRepository {} impl<T> Store for T where T: AuthStore + DeviceRepository + SubscriptionRepository {}
pub trait AuthRepository { pub trait AuthRepository {
/// Validate the given session ID and return its user. /// Validate the given session ID and return its user.
@ -78,7 +78,8 @@ pub trait SubscriptionRepository {
user: &User, user: &User,
device_id: &str, device_id: &str,
urls: Vec<String>, urls: Vec<String>,
) -> Result<i64, AuthErr>; time_changed: chrono::DateTime<chrono::Utc>,
) -> Result<(), AuthErr>;
/// Update the list of subscriptions for a device by adding and removing the given URLs /// Update the list of subscriptions for a device by adding and removing the given URLs
fn update_subscriptions_for_device( fn update_subscriptions_for_device(
@ -87,15 +88,16 @@ pub trait SubscriptionRepository {
device_id: &str, device_id: &str,
add: Vec<String>, add: Vec<String>,
remove: Vec<String>, remove: Vec<String>,
) -> Result<i64, AuthErr>; time_changed: chrono::DateTime<chrono::Utc>,
) -> Result<(), AuthErr>;
/// Returns the changes in subscriptions since the given timestamp. /// Returns the changes in subscriptions since the given timestamp.
fn subscription_updates_for_device( fn subscription_updates_for_device(
&self, &self,
user: &User, user: &User,
device_id: &str, device_id: &str,
since: i64, since: chrono::DateTime<chrono::Utc>,
) -> Result<(i64, Vec<String>, Vec<String>), AuthErr>; ) -> Result<(chrono::DateTime<chrono::Utc>, Vec<String>, Vec<String>), AuthErr>;
} }
pub trait EpisodeActionRepository { pub trait EpisodeActionRepository {

View File

@ -82,4 +82,62 @@ impl GpodderRepository {
) -> Result<(), AuthErr> { ) -> Result<(), AuthErr> {
self.store.update_device_info(user, device_id, patch) self.store.update_device_info(user, device_id, patch)
} }
pub fn subscriptions_for_device(
&self,
user: &models::User,
device_id: &str,
) -> Result<Vec<String>, AuthErr> {
self.store.subscriptions_for_device(user, device_id)
}
pub fn subscriptions_for_user(&self, user: &models::User) -> Result<Vec<String>, AuthErr> {
self.store.subscriptions_for_user(user)
}
pub fn set_subscriptions_for_device(
&self,
user: &models::User,
device_id: &str,
urls: Vec<String>,
) -> Result<chrono::DateTime<chrono::Utc>, AuthErr> {
let time_changed = chrono::Utc::now();
self.store
.set_subscriptions_for_device(user, device_id, urls, time_changed)?;
Ok(time_changed + chrono::TimeDelta::seconds(1))
}
pub fn update_subscriptions_for_device(
&self,
user: &models::User,
device_id: &str,
add: Vec<String>,
remove: Vec<String>,
) -> Result<chrono::DateTime<chrono::Utc>, AuthErr> {
let time_changed = chrono::Utc::now();
self.store
.update_subscriptions_for_device(user, device_id, add, remove, time_changed)?;
Ok(time_changed + chrono::TimeDelta::seconds(1))
}
pub fn subscription_updates_for_device(
&self,
user: &models::User,
device_id: &str,
since: chrono::DateTime<chrono::Utc>,
) -> Result<(chrono::DateTime<chrono::Utc>, Vec<String>, Vec<String>), AuthErr> {
let (max_time_changed, added, removed) = self
.store
.subscription_updates_for_device(user, device_id, since)?;
Ok((
max_time_changed + chrono::TimeDelta::seconds(1),
added,
removed,
))
}
} }

View File

@ -7,7 +7,7 @@ use axum::{
use serde::Deserialize; use serde::Deserialize;
use crate::{ use crate::{
gpodder::{self, SubscriptionRepository}, gpodder,
server::{ server::{
error::{AppError, AppResult}, error::{AppError, AppResult},
gpodder::{ gpodder::{
@ -43,14 +43,14 @@ pub async fn post_subscription_changes(
} }
Ok(tokio::task::spawn_blocking(move || { Ok(tokio::task::spawn_blocking(move || {
ctx.repo ctx.store
.update_subscriptions_for_device(&user, &id, delta.add, delta.remove) .update_subscriptions_for_device(&user, &id, delta.add, delta.remove)
}) })
.await .await
.unwrap() .unwrap()
.map(|timestamp| { .map(|time_changed| {
Json(UpdatedUrlsResponse { Json(UpdatedUrlsResponse {
timestamp, timestamp: time_changed.timestamp(),
update_urls: Vec::new(), update_urls: Vec::new(),
}) })
})?) })?)
@ -76,17 +76,18 @@ pub async fn get_subscription_changes(
return Err(AppError::BadRequest); return Err(AppError::BadRequest);
} }
let since = chrono::DateTime::from_timestamp(query.since, 0).ok_or(AppError::BadRequest)?;
Ok(tokio::task::spawn_blocking(move || { Ok(tokio::task::spawn_blocking(move || {
ctx.repo ctx.store.subscription_updates_for_device(&user, &id, since)
.subscription_updates_for_device(&user, &id, query.since)
}) })
.await .await
.unwrap() .unwrap()
.map(|(timestamp, add, remove)| { .map(|(next_time_changed, add, remove)| {
Json(SubscriptionDeltaResponse { Json(SubscriptionDeltaResponse {
add, add,
remove, remove,
timestamp, timestamp: next_time_changed.timestamp(),
}) })
})?) })?)
} }

View File

@ -6,7 +6,7 @@ use axum::{
}; };
use crate::{ use crate::{
gpodder::{self, SubscriptionRepository}, gpodder,
server::{ server::{
error::{AppError, AppResult}, error::{AppError, AppResult},
gpodder::{auth_middleware, format::StringWithFormat}, gpodder::{auth_middleware, format::StringWithFormat},
@ -34,7 +34,7 @@ pub async fn get_device_subscriptions(
} }
Ok( Ok(
tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_device(&user, &id)) tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_device(&user, &id))
.await .await
.unwrap() .unwrap()
.map(Json)?, .map(Json)?,
@ -51,7 +51,7 @@ pub async fn get_user_subscriptions(
} }
Ok( Ok(
tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_user(&user)) tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_user(&user))
.await .await
.unwrap() .unwrap()
.map(Json)?, .map(Json)?,
@ -68,12 +68,10 @@ pub async fn put_device_subscriptions(
return Err(AppError::BadRequest); return Err(AppError::BadRequest);
} }
Ok( Ok(tokio::task::spawn_blocking(move || {
tokio::task::spawn_blocking(move || { ctx.store.set_subscriptions_for_device(&user, &id, urls)
ctx.repo.set_subscriptions_for_device(&user, &id, urls) })
}) .await
.await .unwrap()
.unwrap() .map(|_| ())?)
.map(|_| ())?,
)
} }