refactor: moved knowledge of subscription change time to store
parent
8a9744c4a9
commit
330877c8c5
|
@ -1,5 +1,6 @@
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
|
use chrono::DateTime;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
|
|
||||||
use super::SqliteRepository;
|
use super::SqliteRepository;
|
||||||
|
@ -8,24 +9,39 @@ use crate::{
|
||||||
gpodder,
|
gpodder,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
impl From<(String, i64)> for gpodder::Subscription {
|
||||||
|
fn from((url, ts): (String, i64)) -> Self {
|
||||||
|
Self {
|
||||||
|
url,
|
||||||
|
time_changed: DateTime::from_timestamp(ts, 0).unwrap(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl gpodder::SubscriptionRepository for SqliteRepository {
|
impl gpodder::SubscriptionRepository for SqliteRepository {
|
||||||
fn subscriptions_for_user(
|
fn subscriptions_for_user(
|
||||||
&self,
|
&self,
|
||||||
user: &gpodder::User,
|
user: &gpodder::User,
|
||||||
) -> Result<Vec<String>, gpodder::AuthErr> {
|
) -> Result<Vec<gpodder::Subscription>, gpodder::AuthErr> {
|
||||||
Ok(device_subscriptions::table
|
Ok(device_subscriptions::table
|
||||||
.inner_join(devices::table)
|
.inner_join(devices::table)
|
||||||
.filter(devices::user_id.eq(user.id))
|
.filter(devices::user_id.eq(user.id))
|
||||||
.select(device_subscriptions::podcast_url)
|
.select((
|
||||||
|
device_subscriptions::podcast_url,
|
||||||
|
device_subscriptions::time_changed,
|
||||||
|
))
|
||||||
.distinct()
|
.distinct()
|
||||||
.get_results(&mut self.pool.get()?)?)
|
.get_results::<(String, i64)>(&mut self.pool.get()?)?
|
||||||
|
.into_iter()
|
||||||
|
.map(Into::into)
|
||||||
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn subscriptions_for_device(
|
fn subscriptions_for_device(
|
||||||
&self,
|
&self,
|
||||||
user: &gpodder::User,
|
user: &gpodder::User,
|
||||||
device_id: &str,
|
device_id: &str,
|
||||||
) -> Result<Vec<String>, gpodder::AuthErr> {
|
) -> Result<Vec<gpodder::Subscription>, gpodder::AuthErr> {
|
||||||
Ok(device_subscriptions::table
|
Ok(device_subscriptions::table
|
||||||
.inner_join(devices::table)
|
.inner_join(devices::table)
|
||||||
.filter(
|
.filter(
|
||||||
|
@ -33,8 +49,14 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
|
||||||
.eq(user.id)
|
.eq(user.id)
|
||||||
.and(devices::device_id.eq(device_id)),
|
.and(devices::device_id.eq(device_id)),
|
||||||
)
|
)
|
||||||
.select(device_subscriptions::podcast_url)
|
.select((
|
||||||
.get_results(&mut self.pool.get()?)?)
|
device_subscriptions::podcast_url,
|
||||||
|
device_subscriptions::time_changed,
|
||||||
|
))
|
||||||
|
.get_results::<(String, i64)>(&mut self.pool.get()?)?
|
||||||
|
.into_iter()
|
||||||
|
.map(Into::into)
|
||||||
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_subscriptions_for_device(
|
fn set_subscriptions_for_device(
|
||||||
|
@ -230,7 +252,7 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
|
||||||
user: &gpodder::User,
|
user: &gpodder::User,
|
||||||
device_id: &str,
|
device_id: &str,
|
||||||
since: chrono::DateTime<chrono::Utc>,
|
since: chrono::DateTime<chrono::Utc>,
|
||||||
) -> Result<(chrono::DateTime<chrono::Utc>, Vec<String>, Vec<String>), gpodder::AuthErr> {
|
) -> Result<(Vec<gpodder::Subscription>, Vec<gpodder::Subscription>), gpodder::AuthErr> {
|
||||||
let since = since.timestamp();
|
let since = since.timestamp();
|
||||||
|
|
||||||
let (mut added, mut removed) = (Vec::new(), Vec::new());
|
let (mut added, mut removed) = (Vec::new(), Vec::new());
|
||||||
|
@ -245,20 +267,22 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
|
||||||
)
|
)
|
||||||
.select(db::DeviceSubscription::as_select());
|
.select(db::DeviceSubscription::as_select());
|
||||||
|
|
||||||
let mut max_time: chrono::DateTime<chrono::Utc> = chrono::DateTime::<chrono::Utc>::MIN_UTC;
|
|
||||||
|
|
||||||
for sub in query.load_iter(&mut self.pool.get()?)? {
|
for sub in query.load_iter(&mut self.pool.get()?)? {
|
||||||
let sub = sub?;
|
let sub = sub?;
|
||||||
|
|
||||||
if sub.deleted {
|
if sub.deleted {
|
||||||
removed.push(sub.podcast_url);
|
removed.push(gpodder::Subscription {
|
||||||
|
url: sub.podcast_url,
|
||||||
|
time_changed: DateTime::from_timestamp(sub.time_changed, 0).unwrap(),
|
||||||
|
});
|
||||||
} else {
|
} else {
|
||||||
added.push(sub.podcast_url);
|
added.push(gpodder::Subscription {
|
||||||
|
url: sub.podcast_url,
|
||||||
|
time_changed: DateTime::from_timestamp(sub.time_changed, 0).unwrap(),
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
max_time = max_time.max(chrono::DateTime::from_timestamp(sub.time_changed, 0).unwrap());
|
Ok((added, removed))
|
||||||
}
|
|
||||||
|
|
||||||
Ok((max_time, added, removed))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,10 +74,10 @@ pub trait SubscriptionRepository {
|
||||||
&self,
|
&self,
|
||||||
user: &User,
|
user: &User,
|
||||||
device_id: &str,
|
device_id: &str,
|
||||||
) -> Result<Vec<String>, AuthErr>;
|
) -> Result<Vec<models::Subscription>, AuthErr>;
|
||||||
|
|
||||||
/// Return all subscriptions for a given user
|
/// Return all subscriptions for a given user
|
||||||
fn subscriptions_for_user(&self, user: &User) -> Result<Vec<String>, AuthErr>;
|
fn subscriptions_for_user(&self, user: &User) -> Result<Vec<models::Subscription>, AuthErr>;
|
||||||
|
|
||||||
/// Replace the list of subscriptions for a device with the given list
|
/// Replace the list of subscriptions for a device with the given list
|
||||||
fn set_subscriptions_for_device(
|
fn set_subscriptions_for_device(
|
||||||
|
@ -104,7 +104,7 @@ pub trait SubscriptionRepository {
|
||||||
user: &User,
|
user: &User,
|
||||||
device_id: &str,
|
device_id: &str,
|
||||||
since: DateTime<Utc>,
|
since: DateTime<Utc>,
|
||||||
) -> Result<(DateTime<Utc>, Vec<String>, Vec<String>), AuthErr>;
|
) -> Result<(Vec<Subscription>, Vec<Subscription>), AuthErr>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait EpisodeActionRepository {
|
pub trait EpisodeActionRepository {
|
||||||
|
|
|
@ -52,3 +52,8 @@ pub struct Session {
|
||||||
pub last_seen: DateTime<Utc>,
|
pub last_seen: DateTime<Utc>,
|
||||||
pub user: User,
|
pub user: User,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct Subscription {
|
||||||
|
pub url: String,
|
||||||
|
pub time_changed: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
|
@ -86,11 +86,14 @@ impl GpodderRepository {
|
||||||
&self,
|
&self,
|
||||||
user: &models::User,
|
user: &models::User,
|
||||||
device_id: &str,
|
device_id: &str,
|
||||||
) -> Result<Vec<String>, AuthErr> {
|
) -> Result<Vec<models::Subscription>, AuthErr> {
|
||||||
self.store.subscriptions_for_device(user, device_id)
|
self.store.subscriptions_for_device(user, device_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn subscriptions_for_user(&self, user: &models::User) -> Result<Vec<String>, AuthErr> {
|
pub fn subscriptions_for_user(
|
||||||
|
&self,
|
||||||
|
user: &models::User,
|
||||||
|
) -> Result<Vec<models::Subscription>, AuthErr> {
|
||||||
self.store.subscriptions_for_user(user)
|
self.store.subscriptions_for_user(user)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,11 +131,26 @@ impl GpodderRepository {
|
||||||
user: &models::User,
|
user: &models::User,
|
||||||
device_id: &str,
|
device_id: &str,
|
||||||
since: DateTime<Utc>,
|
since: DateTime<Utc>,
|
||||||
) -> Result<(DateTime<Utc>, Vec<String>, Vec<String>), AuthErr> {
|
) -> Result<
|
||||||
let (max_time_changed, added, removed) = self
|
(
|
||||||
|
DateTime<Utc>,
|
||||||
|
Vec<models::Subscription>,
|
||||||
|
Vec<models::Subscription>,
|
||||||
|
),
|
||||||
|
AuthErr,
|
||||||
|
> {
|
||||||
|
let now = chrono::Utc::now();
|
||||||
|
|
||||||
|
let (added, removed) = self
|
||||||
.store
|
.store
|
||||||
.subscription_updates_for_device(user, device_id, since)?;
|
.subscription_updates_for_device(user, device_id, since)?;
|
||||||
|
|
||||||
|
let max_time_changed = added
|
||||||
|
.iter()
|
||||||
|
.chain(removed.iter())
|
||||||
|
.map(|s| s.time_changed)
|
||||||
|
.max()
|
||||||
|
.unwrap_or(now);
|
||||||
Ok((max_time_changed + TimeDelta::seconds(1), added, removed))
|
Ok((max_time_changed + TimeDelta::seconds(1), added, removed))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -85,8 +85,8 @@ pub async fn get_subscription_changes(
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.map(|(next_time_changed, add, remove)| {
|
.map(|(next_time_changed, add, remove)| {
|
||||||
Json(SubscriptionDeltaResponse {
|
Json(SubscriptionDeltaResponse {
|
||||||
add,
|
add: add.into_iter().map(|s| s.url).collect(),
|
||||||
remove,
|
remove: remove.into_iter().map(|s| s.url).collect(),
|
||||||
timestamp: next_time_changed.timestamp(),
|
timestamp: next_time_changed.timestamp(),
|
||||||
})
|
})
|
||||||
})?)
|
})?)
|
||||||
|
|
|
@ -37,7 +37,7 @@ pub async fn get_device_subscriptions(
|
||||||
tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_device(&user, &id))
|
tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_device(&user, &id))
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.map(Json)?,
|
.map(|subs| Json(subs.into_iter().map(|s| s.url).collect()))?,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,7 +54,7 @@ pub async fn get_user_subscriptions(
|
||||||
tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_user(&user))
|
tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_user(&user))
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.map(Json)?,
|
.map(|subs| Json(subs.into_iter().map(|s| s.url).collect()))?,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue