refactor: migrate subscriptions API to store
parent
3a5a6759ac
commit
da7befc5c4
|
@ -42,9 +42,10 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
|
|||
user: &gpodder::User,
|
||||
device_id: &str,
|
||||
urls: Vec<String>,
|
||||
) -> Result<i64, gpodder::AuthErr> {
|
||||
time_changed: chrono::DateTime<chrono::Utc>,
|
||||
) -> Result<(), gpodder::AuthErr> {
|
||||
// TODO use a better timestamp
|
||||
let timestamp = chrono::Utc::now().timestamp();
|
||||
let timestamp = time_changed.timestamp();
|
||||
|
||||
self.pool.get()?.transaction(|conn| {
|
||||
let device = devices::table
|
||||
|
@ -126,7 +127,7 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
|
|||
Ok::<_, diesel::result::Error>(())
|
||||
})?;
|
||||
|
||||
Ok(timestamp + 1)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_subscriptions_for_device(
|
||||
|
@ -135,9 +136,10 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
|
|||
device_id: &str,
|
||||
add: Vec<String>,
|
||||
remove: Vec<String>,
|
||||
) -> Result<i64, gpodder::AuthErr> {
|
||||
time_changed: chrono::DateTime<chrono::Utc>,
|
||||
) -> Result<(), gpodder::AuthErr> {
|
||||
// TODO use a better timestamp
|
||||
let timestamp = chrono::Utc::now().timestamp_millis();
|
||||
let timestamp = time_changed.timestamp();
|
||||
|
||||
// TODO URLs that are in both the added and removed lists will currently get "re-added",
|
||||
// meaning their change timestamp will be updated even though they haven't really changed.
|
||||
|
@ -220,16 +222,18 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
|
|||
Ok::<_, diesel::result::Error>(())
|
||||
})?;
|
||||
|
||||
Ok(timestamp + 1)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn subscription_updates_for_device(
|
||||
&self,
|
||||
user: &gpodder::User,
|
||||
device_id: &str,
|
||||
since: i64,
|
||||
) -> Result<(i64, Vec<String>, Vec<String>), gpodder::AuthErr> {
|
||||
let (mut timestamp, mut added, mut removed) = (0, Vec::new(), Vec::new());
|
||||
since: chrono::DateTime<chrono::Utc>,
|
||||
) -> Result<(chrono::DateTime<chrono::Utc>, Vec<String>, Vec<String>), gpodder::AuthErr> {
|
||||
let since = since.timestamp();
|
||||
|
||||
let (mut added, mut removed) = (Vec::new(), Vec::new());
|
||||
|
||||
let query = device_subscriptions::table
|
||||
.inner_join(devices::table)
|
||||
|
@ -241,6 +245,8 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
|
|||
)
|
||||
.select(db::DeviceSubscription::as_select());
|
||||
|
||||
let mut max_time: chrono::DateTime<chrono::Utc> = chrono::DateTime::<chrono::Utc>::MIN_UTC;
|
||||
|
||||
for sub in query.load_iter(&mut self.pool.get()?)? {
|
||||
let sub = sub?;
|
||||
|
||||
|
@ -250,9 +256,9 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
|
|||
added.push(sub.podcast_url);
|
||||
}
|
||||
|
||||
timestamp = timestamp.max(sub.time_changed);
|
||||
max_time = max_time.max(chrono::DateTime::from_timestamp(sub.time_changed, 0).unwrap());
|
||||
}
|
||||
|
||||
Ok((timestamp + 1, added, removed))
|
||||
Ok((max_time, added, removed))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,9 +11,9 @@ pub enum AuthErr {
|
|||
Other(Box<dyn std::error::Error + Sync + Send>),
|
||||
}
|
||||
|
||||
pub trait Store: AuthStore + DeviceRepository {}
|
||||
pub trait Store: AuthStore + DeviceRepository + SubscriptionRepository {}
|
||||
|
||||
impl<T> Store for T where T: AuthStore + DeviceRepository {}
|
||||
impl<T> Store for T where T: AuthStore + DeviceRepository + SubscriptionRepository {}
|
||||
|
||||
pub trait AuthRepository {
|
||||
/// Validate the given session ID and return its user.
|
||||
|
@ -78,7 +78,8 @@ pub trait SubscriptionRepository {
|
|||
user: &User,
|
||||
device_id: &str,
|
||||
urls: Vec<String>,
|
||||
) -> Result<i64, AuthErr>;
|
||||
time_changed: chrono::DateTime<chrono::Utc>,
|
||||
) -> Result<(), AuthErr>;
|
||||
|
||||
/// Update the list of subscriptions for a device by adding and removing the given URLs
|
||||
fn update_subscriptions_for_device(
|
||||
|
@ -87,15 +88,16 @@ pub trait SubscriptionRepository {
|
|||
device_id: &str,
|
||||
add: Vec<String>,
|
||||
remove: Vec<String>,
|
||||
) -> Result<i64, AuthErr>;
|
||||
time_changed: chrono::DateTime<chrono::Utc>,
|
||||
) -> Result<(), AuthErr>;
|
||||
|
||||
/// Returns the changes in subscriptions since the given timestamp.
|
||||
fn subscription_updates_for_device(
|
||||
&self,
|
||||
user: &User,
|
||||
device_id: &str,
|
||||
since: i64,
|
||||
) -> Result<(i64, Vec<String>, Vec<String>), AuthErr>;
|
||||
since: chrono::DateTime<chrono::Utc>,
|
||||
) -> Result<(chrono::DateTime<chrono::Utc>, Vec<String>, Vec<String>), AuthErr>;
|
||||
}
|
||||
|
||||
pub trait EpisodeActionRepository {
|
||||
|
|
|
@ -82,4 +82,62 @@ impl GpodderRepository {
|
|||
) -> Result<(), AuthErr> {
|
||||
self.store.update_device_info(user, device_id, patch)
|
||||
}
|
||||
|
||||
pub fn subscriptions_for_device(
|
||||
&self,
|
||||
user: &models::User,
|
||||
device_id: &str,
|
||||
) -> Result<Vec<String>, AuthErr> {
|
||||
self.store.subscriptions_for_device(user, device_id)
|
||||
}
|
||||
|
||||
pub fn subscriptions_for_user(&self, user: &models::User) -> Result<Vec<String>, AuthErr> {
|
||||
self.store.subscriptions_for_user(user)
|
||||
}
|
||||
|
||||
pub fn set_subscriptions_for_device(
|
||||
&self,
|
||||
user: &models::User,
|
||||
device_id: &str,
|
||||
urls: Vec<String>,
|
||||
) -> Result<chrono::DateTime<chrono::Utc>, AuthErr> {
|
||||
let time_changed = chrono::Utc::now();
|
||||
|
||||
self.store
|
||||
.set_subscriptions_for_device(user, device_id, urls, time_changed)?;
|
||||
|
||||
Ok(time_changed + chrono::TimeDelta::seconds(1))
|
||||
}
|
||||
|
||||
pub fn update_subscriptions_for_device(
|
||||
&self,
|
||||
user: &models::User,
|
||||
device_id: &str,
|
||||
add: Vec<String>,
|
||||
remove: Vec<String>,
|
||||
) -> Result<chrono::DateTime<chrono::Utc>, AuthErr> {
|
||||
let time_changed = chrono::Utc::now();
|
||||
|
||||
self.store
|
||||
.update_subscriptions_for_device(user, device_id, add, remove, time_changed)?;
|
||||
|
||||
Ok(time_changed + chrono::TimeDelta::seconds(1))
|
||||
}
|
||||
|
||||
pub fn subscription_updates_for_device(
|
||||
&self,
|
||||
user: &models::User,
|
||||
device_id: &str,
|
||||
since: chrono::DateTime<chrono::Utc>,
|
||||
) -> Result<(chrono::DateTime<chrono::Utc>, Vec<String>, Vec<String>), AuthErr> {
|
||||
let (max_time_changed, added, removed) = self
|
||||
.store
|
||||
.subscription_updates_for_device(user, device_id, since)?;
|
||||
|
||||
Ok((
|
||||
max_time_changed + chrono::TimeDelta::seconds(1),
|
||||
added,
|
||||
removed,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ use axum::{
|
|||
use serde::Deserialize;
|
||||
|
||||
use crate::{
|
||||
gpodder::{self, SubscriptionRepository},
|
||||
gpodder,
|
||||
server::{
|
||||
error::{AppError, AppResult},
|
||||
gpodder::{
|
||||
|
@ -43,14 +43,14 @@ pub async fn post_subscription_changes(
|
|||
}
|
||||
|
||||
Ok(tokio::task::spawn_blocking(move || {
|
||||
ctx.repo
|
||||
ctx.store
|
||||
.update_subscriptions_for_device(&user, &id, delta.add, delta.remove)
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
.map(|timestamp| {
|
||||
.map(|time_changed| {
|
||||
Json(UpdatedUrlsResponse {
|
||||
timestamp,
|
||||
timestamp: time_changed.timestamp(),
|
||||
update_urls: Vec::new(),
|
||||
})
|
||||
})?)
|
||||
|
@ -76,17 +76,18 @@ pub async fn get_subscription_changes(
|
|||
return Err(AppError::BadRequest);
|
||||
}
|
||||
|
||||
let since = chrono::DateTime::from_timestamp(query.since, 0).ok_or(AppError::BadRequest)?;
|
||||
|
||||
Ok(tokio::task::spawn_blocking(move || {
|
||||
ctx.repo
|
||||
.subscription_updates_for_device(&user, &id, query.since)
|
||||
ctx.store.subscription_updates_for_device(&user, &id, since)
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
.map(|(timestamp, add, remove)| {
|
||||
.map(|(next_time_changed, add, remove)| {
|
||||
Json(SubscriptionDeltaResponse {
|
||||
add,
|
||||
remove,
|
||||
timestamp,
|
||||
timestamp: next_time_changed.timestamp(),
|
||||
})
|
||||
})?)
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ use axum::{
|
|||
};
|
||||
|
||||
use crate::{
|
||||
gpodder::{self, SubscriptionRepository},
|
||||
gpodder,
|
||||
server::{
|
||||
error::{AppError, AppResult},
|
||||
gpodder::{auth_middleware, format::StringWithFormat},
|
||||
|
@ -34,7 +34,7 @@ pub async fn get_device_subscriptions(
|
|||
}
|
||||
|
||||
Ok(
|
||||
tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_device(&user, &id))
|
||||
tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_device(&user, &id))
|
||||
.await
|
||||
.unwrap()
|
||||
.map(Json)?,
|
||||
|
@ -51,7 +51,7 @@ pub async fn get_user_subscriptions(
|
|||
}
|
||||
|
||||
Ok(
|
||||
tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_user(&user))
|
||||
tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_user(&user))
|
||||
.await
|
||||
.unwrap()
|
||||
.map(Json)?,
|
||||
|
@ -68,12 +68,10 @@ pub async fn put_device_subscriptions(
|
|||
return Err(AppError::BadRequest);
|
||||
}
|
||||
|
||||
Ok(
|
||||
tokio::task::spawn_blocking(move || {
|
||||
ctx.repo.set_subscriptions_for_device(&user, &id, urls)
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
.map(|_| ())?,
|
||||
)
|
||||
Ok(tokio::task::spawn_blocking(move || {
|
||||
ctx.store.set_subscriptions_for_device(&user, &id, urls)
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
.map(|_| ())?)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue