Compare commits

...

7 Commits

19 changed files with 628 additions and 132 deletions

View File

@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
alter table sessions
drop column last_seen;

View File

@ -0,0 +1,3 @@
-- Your SQL goes here
alter table sessions
add column last_seen bigint not null default 0;

View File

@ -6,9 +6,10 @@ pub fn serve(config: &crate::config::Config) -> u8 {
tracing::info!("Initializing database and running migrations");
let pool = db::initialize_db(config.data_dir.join(crate::DB_FILENAME), true).unwrap();
let repo = db::SqliteRepository::from(pool);
let ctx = server::Context {
repo: db::SqliteRepository::from(pool),
store: crate::gpodder::GpodderRepository::new(repo),
};
let app = server::app(ctx);

View File

@ -11,16 +11,21 @@ use crate::db::{schema::*, DbPool, DbResult};
pub struct Session {
pub id: i64,
pub user_id: i64,
pub last_seen: i64,
}
impl Session {
pub fn new_for_user(pool: &DbPool, user_id: i64) -> DbResult<Self> {
pub fn new_for_user(pool: &DbPool, user_id: i64, last_seen: i64) -> DbResult<Self> {
let id: i64 = rand::thread_rng().gen();
Ok(Self { id, user_id }
.insert_into(sessions::table)
.returning(Self::as_returning())
.get_result(&mut pool.get()?)?)
Ok(Self {
id,
user_id,
last_seen,
}
.insert_into(sessions::table)
.returning(Self::as_returning())
.get_result(&mut pool.get()?)?)
}
pub fn user_from_id(pool: &DbPool, id: i64) -> DbResult<Option<super::user::User>> {

View File

@ -1,3 +1,4 @@
use chrono::DateTime;
use diesel::prelude::*;
use rand::Rng;
@ -19,6 +20,16 @@ impl From<diesel::result::Error> for gpodder::AuthErr {
}
}
impl From<db::User> for gpodder::User {
fn from(value: db::User) -> Self {
Self {
id: value.id,
username: value.username,
password_hash: value.password_hash,
}
}
}
impl gpodder::AuthRepository for SqliteRepository {
fn validate_credentials(
&self,
@ -35,6 +46,7 @@ impl gpodder::AuthRepository for SqliteRepository {
Ok(gpodder::User {
id: user.id,
username: user.username,
password_hash: user.password_hash,
})
} else {
Err(gpodder::AuthErr::InvalidPassword)
@ -54,6 +66,7 @@ impl gpodder::AuthRepository for SqliteRepository {
Ok(user) => Ok(gpodder::User {
id: user.id,
username: user.username,
password_hash: user.password_hash,
}),
Err(diesel::result::Error::NotFound) => Err(gpodder::AuthErr::UnknownSession),
Err(err) => Err(gpodder::AuthErr::Other(Box::new(err))),
@ -77,6 +90,7 @@ impl gpodder::AuthRepository for SqliteRepository {
let session_id = db::Session {
id,
user_id: user.id,
last_seen: chrono::Utc::now().timestamp(),
}
.insert_into(sessions::table)
.returning(sessions::id)
@ -87,6 +101,7 @@ impl gpodder::AuthRepository for SqliteRepository {
gpodder::User {
id: user.id,
username: user.username,
password_hash: user.password_hash,
},
))
} else {
@ -121,3 +136,49 @@ impl gpodder::AuthRepository for SqliteRepository {
}
}
}
impl gpodder::AuthStore for SqliteRepository {
fn get_user(&self, username: &str) -> Result<Option<gpodder::models::User>, AuthErr> {
Ok(users::table
.select(db::User::as_select())
.filter(users::username.eq(username))
.first(&mut self.pool.get()?)
.optional()?
.map(gpodder::User::from))
}
fn get_session(&self, session_id: i64) -> Result<Option<gpodder::models::Session>, AuthErr> {
match sessions::table
.inner_join(users::table)
.filter(sessions::id.eq(session_id))
.select((db::Session::as_select(), db::User::as_select()))
.get_result(&mut self.pool.get()?)
{
Ok((session, user)) => Ok(Some(gpodder::Session {
id: session.id,
last_seen: DateTime::from_timestamp(session.last_seen, 0).unwrap(),
user: user.into(),
})),
Err(err) => Err(AuthErr::from(err)),
}
}
fn remove_session(&self, session_id: i64) -> Result<(), AuthErr> {
Ok(
diesel::delete(sessions::table.filter(sessions::id.eq(session_id)))
.execute(&mut self.pool.get()?)
.map(|_| ())?,
)
}
fn insert_session(&self, session: &gpodder::Session) -> Result<(), AuthErr> {
Ok(db::Session {
id: session.id,
user_id: session.user.id,
last_seen: session.last_seen.timestamp(),
}
.insert_into(sessions::table)
.execute(&mut self.pool.get()?)
.map(|_| ())?)
}
}

View File

@ -1,4 +1,4 @@
use chrono::DateTime;
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use super::SqliteRepository;
@ -26,7 +26,7 @@ impl From<gpodder::EpisodeAction> for db::NewEpisodeAction {
podcast_url: value.podcast,
episode_url: value.episode,
time_changed: 0,
timestamp: value.timestamp.map(|t| t.and_utc().timestamp()),
timestamp: value.timestamp.map(|t| t.timestamp()),
action,
started,
position,
@ -58,7 +58,8 @@ impl From<(Option<String>, db::EpisodeAction)> for gpodder::EpisodeAction {
// SAFETY the input to the from_timestamp function is always the result of a
// previous timestamp() function call, which is guaranteed to be each other's
// reverse
.map(|ts| DateTime::from_timestamp(ts, 0).unwrap().naive_utc()),
.map(|ts| DateTime::from_timestamp(ts, 0).unwrap()),
time_changed: DateTime::from_timestamp(db_action.time_changed, 0).unwrap(),
device: device_id,
action,
}
@ -70,8 +71,9 @@ impl gpodder::EpisodeActionRepository for SqliteRepository {
&self,
user: &gpodder::User,
actions: Vec<gpodder::EpisodeAction>,
) -> Result<i64, gpodder::AuthErr> {
let time_changed = chrono::Utc::now().timestamp();
time_changed: DateTime<Utc>,
) -> Result<(), gpodder::AuthErr> {
let time_changed = time_changed.timestamp();
// TODO optimize this query
// 1. The lookup for a device could be replaced with a subquery, although Diesel seems to
@ -99,17 +101,18 @@ impl gpodder::EpisodeActionRepository for SqliteRepository {
Ok::<_, diesel::result::Error>(())
})?;
Ok(time_changed + 1)
Ok(())
}
fn episode_actions_for_user(
&self,
user: &gpodder::User,
since: Option<i64>,
since: Option<DateTime<Utc>>,
podcast: Option<String>,
device: Option<String>,
aggregated: bool,
) -> Result<(i64, Vec<gpodder::EpisodeAction>), gpodder::AuthErr> {
) -> Result<Vec<gpodder::EpisodeAction>, gpodder::AuthErr> {
let since = since.map(|ts| ts.timestamp()).unwrap_or(0);
let conn = &mut self.pool.get()?;
let mut query = episode_actions::table
@ -117,7 +120,7 @@ impl gpodder::EpisodeActionRepository for SqliteRepository {
.filter(
episode_actions::user_id
.eq(user.id)
.and(episode_actions::time_changed.ge(since.unwrap_or(0))),
.and(episode_actions::time_changed.ge(since)),
)
.select((
devices::device_id.nullable(),
@ -157,16 +160,11 @@ impl gpodder::EpisodeActionRepository for SqliteRepository {
query.get_results(conn)?
};
let max_timestamp = db_actions
.iter()
.map(|(_, a)| a.time_changed)
.max()
.unwrap_or(0);
let actions = db_actions
.into_iter()
.map(gpodder::EpisodeAction::from)
.collect();
Ok((max_timestamp + 1, actions))
Ok(actions)
}
}

View File

@ -1,5 +1,6 @@
use std::collections::HashSet;
use chrono::DateTime;
use diesel::prelude::*;
use super::SqliteRepository;
@ -8,24 +9,39 @@ use crate::{
gpodder,
};
impl From<(String, i64)> for gpodder::Subscription {
fn from((url, ts): (String, i64)) -> Self {
Self {
url,
time_changed: DateTime::from_timestamp(ts, 0).unwrap(),
}
}
}
impl gpodder::SubscriptionRepository for SqliteRepository {
fn subscriptions_for_user(
&self,
user: &gpodder::User,
) -> Result<Vec<String>, gpodder::AuthErr> {
) -> Result<Vec<gpodder::Subscription>, gpodder::AuthErr> {
Ok(device_subscriptions::table
.inner_join(devices::table)
.filter(devices::user_id.eq(user.id))
.select(device_subscriptions::podcast_url)
.select((
device_subscriptions::podcast_url,
device_subscriptions::time_changed,
))
.distinct()
.get_results(&mut self.pool.get()?)?)
.get_results::<(String, i64)>(&mut self.pool.get()?)?
.into_iter()
.map(Into::into)
.collect())
}
fn subscriptions_for_device(
&self,
user: &gpodder::User,
device_id: &str,
) -> Result<Vec<String>, gpodder::AuthErr> {
) -> Result<Vec<gpodder::Subscription>, gpodder::AuthErr> {
Ok(device_subscriptions::table
.inner_join(devices::table)
.filter(
@ -33,8 +49,14 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
.eq(user.id)
.and(devices::device_id.eq(device_id)),
)
.select(device_subscriptions::podcast_url)
.get_results(&mut self.pool.get()?)?)
.select((
device_subscriptions::podcast_url,
device_subscriptions::time_changed,
))
.get_results::<(String, i64)>(&mut self.pool.get()?)?
.into_iter()
.map(Into::into)
.collect())
}
fn set_subscriptions_for_device(
@ -42,9 +64,10 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
user: &gpodder::User,
device_id: &str,
urls: Vec<String>,
) -> Result<i64, gpodder::AuthErr> {
time_changed: chrono::DateTime<chrono::Utc>,
) -> Result<(), gpodder::AuthErr> {
// TODO use a better timestamp
let timestamp = chrono::Utc::now().timestamp();
let timestamp = time_changed.timestamp();
self.pool.get()?.transaction(|conn| {
let device = devices::table
@ -126,7 +149,7 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
Ok::<_, diesel::result::Error>(())
})?;
Ok(timestamp + 1)
Ok(())
}
fn update_subscriptions_for_device(
@ -135,9 +158,10 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
device_id: &str,
add: Vec<String>,
remove: Vec<String>,
) -> Result<i64, gpodder::AuthErr> {
time_changed: chrono::DateTime<chrono::Utc>,
) -> Result<(), gpodder::AuthErr> {
// TODO use a better timestamp
let timestamp = chrono::Utc::now().timestamp_millis();
let timestamp = time_changed.timestamp();
// TODO URLs that are in both the added and removed lists will currently get "re-added",
// meaning their change timestamp will be updated even though they haven't really changed.
@ -220,16 +244,18 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
Ok::<_, diesel::result::Error>(())
})?;
Ok(timestamp + 1)
Ok(())
}
fn subscription_updates_for_device(
&self,
user: &gpodder::User,
device_id: &str,
since: i64,
) -> Result<(i64, Vec<String>, Vec<String>), gpodder::AuthErr> {
let (mut timestamp, mut added, mut removed) = (0, Vec::new(), Vec::new());
since: chrono::DateTime<chrono::Utc>,
) -> Result<(Vec<gpodder::Subscription>, Vec<gpodder::Subscription>), gpodder::AuthErr> {
let since = since.timestamp();
let (mut added, mut removed) = (Vec::new(), Vec::new());
let query = device_subscriptions::table
.inner_join(devices::table)
@ -245,14 +271,18 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
let sub = sub?;
if sub.deleted {
removed.push(sub.podcast_url);
removed.push(gpodder::Subscription {
url: sub.podcast_url,
time_changed: DateTime::from_timestamp(sub.time_changed, 0).unwrap(),
});
} else {
added.push(sub.podcast_url);
added.push(gpodder::Subscription {
url: sub.podcast_url,
time_changed: DateTime::from_timestamp(sub.time_changed, 0).unwrap(),
});
}
timestamp = timestamp.max(sub.time_changed);
}
Ok((timestamp + 1, added, removed))
Ok((added, removed))
}
}

View File

@ -41,6 +41,7 @@ diesel::table! {
sessions (id) {
id -> BigInt,
user_id -> BigInt,
last_seen -> BigInt,
}
}

View File

@ -1,6 +1,9 @@
pub mod models;
mod repository;
use chrono::{DateTime, Utc};
pub use models::*;
pub use repository::GpodderRepository;
pub enum AuthErr {
UnknownSession,
@ -9,6 +12,16 @@ pub enum AuthErr {
Other(Box<dyn std::error::Error + Sync + Send>),
}
pub trait Store:
AuthStore + DeviceRepository + SubscriptionRepository + EpisodeActionRepository
{
}
impl<T> Store for T where
T: AuthStore + DeviceRepository + SubscriptionRepository + EpisodeActionRepository
{
}
pub trait AuthRepository {
/// Validate the given session ID and return its user.
fn validate_session(&self, session_id: i64) -> Result<models::User, AuthErr>;
@ -27,6 +40,20 @@ pub trait AuthRepository {
fn remove_session(&self, username: &str, session_id: i64) -> Result<(), AuthErr>;
}
pub trait AuthStore {
/// Retrieve the session with the given session ID
fn get_session(&self, session_id: i64) -> Result<Option<models::Session>, AuthErr>;
/// Retrieve the user with the given username
fn get_user(&self, username: &str) -> Result<Option<models::User>, AuthErr>;
/// Create a new session for a user with the given session ID
fn insert_session(&self, session: &Session) -> Result<(), AuthErr>;
/// Remove the session with the given session ID
fn remove_session(&self, session_id: i64) -> Result<(), AuthErr>;
}
pub trait DeviceRepository {
/// Return all devices associated with the user
fn devices_for_user(&self, user: &User) -> Result<Vec<Device>, AuthErr>;
@ -47,10 +74,10 @@ pub trait SubscriptionRepository {
&self,
user: &User,
device_id: &str,
) -> Result<Vec<String>, AuthErr>;
) -> Result<Vec<models::Subscription>, AuthErr>;
/// Return all subscriptions for a given user
fn subscriptions_for_user(&self, user: &User) -> Result<Vec<String>, AuthErr>;
fn subscriptions_for_user(&self, user: &User) -> Result<Vec<models::Subscription>, AuthErr>;
/// Replace the list of subscriptions for a device with the given list
fn set_subscriptions_for_device(
@ -58,7 +85,8 @@ pub trait SubscriptionRepository {
user: &User,
device_id: &str,
urls: Vec<String>,
) -> Result<i64, AuthErr>;
time_changed: DateTime<Utc>,
) -> Result<(), AuthErr>;
/// Update the list of subscriptions for a device by adding and removing the given URLs
fn update_subscriptions_for_device(
@ -67,29 +95,34 @@ pub trait SubscriptionRepository {
device_id: &str,
add: Vec<String>,
remove: Vec<String>,
) -> Result<i64, AuthErr>;
time_changed: DateTime<Utc>,
) -> Result<(), AuthErr>;
/// Returns the changes in subscriptions since the given timestamp.
fn subscription_updates_for_device(
&self,
user: &User,
device_id: &str,
since: i64,
) -> Result<(i64, Vec<String>, Vec<String>), AuthErr>;
since: DateTime<Utc>,
) -> Result<(Vec<Subscription>, Vec<Subscription>), AuthErr>;
}
pub trait EpisodeActionRepository {
/// Insert the given episode actions into the datastore.
fn add_episode_actions(&self, user: &User, actions: Vec<EpisodeAction>)
-> Result<i64, AuthErr>;
fn add_episode_actions(
&self,
user: &User,
actions: Vec<EpisodeAction>,
time_changed: DateTime<Utc>,
) -> Result<(), AuthErr>;
/// Retrieve the list of episode actions for the given user.
fn episode_actions_for_user(
&self,
user: &User,
since: Option<i64>,
since: Option<DateTime<Utc>>,
podcast: Option<String>,
device: Option<String>,
aggregated: bool,
) -> Result<(i64, Vec<EpisodeAction>), AuthErr>;
) -> Result<Vec<EpisodeAction>, AuthErr>;
}

View File

@ -1,14 +1,12 @@
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};
#[derive(Clone)]
pub struct User {
pub id: i64,
pub username: String,
pub password_hash: String,
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DeviceType {
Desktop,
Laptop,
@ -17,7 +15,6 @@ pub enum DeviceType {
Other,
}
#[derive(Serialize)]
pub struct Device {
pub id: String,
pub caption: String,
@ -25,35 +22,38 @@ pub struct Device {
pub subscriptions: i64,
}
#[derive(Deserialize)]
pub struct DevicePatch {
pub caption: Option<String>,
pub r#type: Option<DeviceType>,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "lowercase")]
#[serde(tag = "action")]
pub enum EpisodeActionType {
Download,
Play {
#[serde(default)]
started: Option<i32>,
position: i32,
#[serde(default)]
total: Option<i32>,
},
Delete,
New,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct EpisodeAction {
pub podcast: String,
pub episode: String,
pub timestamp: Option<NaiveDateTime>,
#[serde(default)]
pub timestamp: Option<DateTime<Utc>>,
pub time_changed: DateTime<Utc>,
pub device: Option<String>,
#[serde(flatten)]
pub action: EpisodeActionType,
}
pub struct Session {
pub id: i64,
pub last_seen: DateTime<Utc>,
pub user: User,
}
pub struct Subscription {
pub url: String,
pub time_changed: DateTime<Utc>,
}

View File

@ -0,0 +1,186 @@
use std::sync::Arc;
use argon2::{Argon2, PasswordHash, PasswordVerifier};
use chrono::{DateTime, TimeDelta, Utc};
use rand::Rng;
use super::{models, AuthErr, Store};
const MAX_SESSION_AGE: i64 = 60 * 60 * 24 * 7;
#[derive(Clone)]
pub struct GpodderRepository {
store: Arc<dyn Store + Send + Sync>,
}
impl GpodderRepository {
pub fn new(store: impl Store + Send + Sync + 'static) -> Self {
Self {
store: Arc::new(store),
}
}
pub fn validate_session(&self, session_id: i64) -> Result<models::Session, AuthErr> {
let session = self
.store
.get_session(session_id)?
.ok_or(AuthErr::UnknownSession)?;
// Expired sessions still in the database are considered removed
if Utc::now() - session.last_seen > TimeDelta::new(MAX_SESSION_AGE, 0).unwrap() {
Err(AuthErr::UnknownSession)
} else {
Ok(session)
}
}
pub fn validate_credentials(
&self,
username: &str,
password: &str,
) -> Result<models::User, AuthErr> {
let user = self.store.get_user(username)?.ok_or(AuthErr::UnknownUser)?;
let password_hash = PasswordHash::new(&user.password_hash).unwrap();
if Argon2::default()
.verify_password(password.as_bytes(), &password_hash)
.is_ok()
{
Ok(user)
} else {
Err(AuthErr::InvalidPassword)
}
}
pub fn create_session(&self, user: &models::User) -> Result<models::Session, AuthErr> {
let session = models::Session {
id: rand::thread_rng().gen(),
last_seen: Utc::now(),
user: user.clone(),
};
self.store.insert_session(&session)?;
Ok(session)
}
pub fn remove_session(&self, session_id: i64) -> Result<(), AuthErr> {
self.store.remove_session(session_id)
}
pub fn devices_for_user(&self, user: &models::User) -> Result<Vec<models::Device>, AuthErr> {
self.store.devices_for_user(user)
}
pub fn update_device_info(
&self,
user: &models::User,
device_id: &str,
patch: models::DevicePatch,
) -> Result<(), AuthErr> {
self.store.update_device_info(user, device_id, patch)
}
pub fn subscriptions_for_device(
&self,
user: &models::User,
device_id: &str,
) -> Result<Vec<models::Subscription>, AuthErr> {
self.store.subscriptions_for_device(user, device_id)
}
pub fn subscriptions_for_user(
&self,
user: &models::User,
) -> Result<Vec<models::Subscription>, AuthErr> {
self.store.subscriptions_for_user(user)
}
pub fn set_subscriptions_for_device(
&self,
user: &models::User,
device_id: &str,
urls: Vec<String>,
) -> Result<DateTime<Utc>, AuthErr> {
let time_changed = Utc::now();
self.store
.set_subscriptions_for_device(user, device_id, urls, time_changed)?;
Ok(time_changed + TimeDelta::seconds(1))
}
pub fn update_subscriptions_for_device(
&self,
user: &models::User,
device_id: &str,
add: Vec<String>,
remove: Vec<String>,
) -> Result<DateTime<Utc>, AuthErr> {
let time_changed = Utc::now();
self.store
.update_subscriptions_for_device(user, device_id, add, remove, time_changed)?;
Ok(time_changed + TimeDelta::seconds(1))
}
pub fn subscription_updates_for_device(
&self,
user: &models::User,
device_id: &str,
since: DateTime<Utc>,
) -> Result<
(
DateTime<Utc>,
Vec<models::Subscription>,
Vec<models::Subscription>,
),
AuthErr,
> {
let now = chrono::Utc::now();
let (added, removed) = self
.store
.subscription_updates_for_device(user, device_id, since)?;
let max_time_changed = added
.iter()
.chain(removed.iter())
.map(|s| s.time_changed)
.max()
.unwrap_or(now);
Ok((max_time_changed + TimeDelta::seconds(1), added, removed))
}
pub fn add_episode_actions(
&self,
user: &models::User,
actions: Vec<models::EpisodeAction>,
) -> Result<DateTime<Utc>, AuthErr> {
let time_changed = Utc::now();
self.store
.add_episode_actions(user, actions, time_changed)?;
Ok(time_changed + TimeDelta::seconds(1))
}
pub fn episode_actions_for_user(
&self,
user: &models::User,
since: Option<DateTime<Utc>>,
podcast: Option<String>,
device: Option<String>,
aggregated: bool,
) -> Result<(DateTime<Utc>, Vec<models::EpisodeAction>), AuthErr> {
let now = chrono::Utc::now();
let actions = self
.store
.episode_actions_for_user(user, since, podcast, device, aggregated)?;
let max_time_changed = actions.iter().map(|a| a.time_changed).max().unwrap_or(now);
Ok((max_time_changed + TimeDelta::seconds(1), actions))
}
}

View File

@ -12,13 +12,10 @@ use axum_extra::{
TypedHeader,
};
use crate::{
gpodder::AuthRepository,
server::{
error::{AppError, AppResult},
gpodder::SESSION_ID_COOKIE,
Context,
},
use crate::server::{
error::{AppError, AppResult},
gpodder::SESSION_ID_COOKIE,
Context,
};
pub fn router() -> Router<Context> {
@ -38,14 +35,17 @@ async fn post_login(
return Err(AppError::BadRequest);
}
let (session_id, _) = tokio::task::spawn_blocking(move || {
ctx.repo.create_session(auth.username(), auth.password())
let session = tokio::task::spawn_blocking(move || {
let user = ctx
.store
.validate_credentials(auth.username(), auth.password())?;
ctx.store.create_session(&user)
})
.await
.unwrap()?;
Ok(jar.add(
Cookie::build((SESSION_ID_COOKIE, session_id.to_string())).expires(Expiration::Session),
Cookie::build((SESSION_ID_COOKIE, session.id.to_string())).expires(Expiration::Session),
))
}
@ -60,7 +60,8 @@ async fn post_logout(
.parse()
.map_err(|_| AppError::BadRequest)?;
tokio::task::spawn_blocking(move || ctx.repo.remove_session(&username, session_id))
// TODO reintroduce username check
tokio::task::spawn_blocking(move || ctx.store.remove_session(session_id))
.await
.unwrap()?;

View File

@ -6,12 +6,13 @@ use axum::{
};
use crate::{
gpodder::{self, DeviceRepository},
gpodder,
server::{
error::{AppError, AppResult},
gpodder::{
auth_middleware,
format::{Format, StringWithFormat},
models,
},
Context,
},
@ -28,7 +29,7 @@ async fn get_devices(
State(ctx): State<Context>,
Path(username): Path<StringWithFormat>,
Extension(user): Extension<gpodder::User>,
) -> AppResult<Json<Vec<gpodder::Device>>> {
) -> AppResult<Json<Vec<models::Device>>> {
if username.format != Format::Json {
return Err(AppError::NotFound);
}
@ -38,10 +39,10 @@ async fn get_devices(
}
Ok(
tokio::task::spawn_blocking(move || ctx.repo.devices_for_user(&user))
tokio::task::spawn_blocking(move || ctx.store.devices_for_user(&user))
.await
.unwrap()
.map(Json)?,
.map(|devices| Json(devices.into_iter().map(models::Device::from).collect()))?,
)
}
@ -49,13 +50,13 @@ async fn post_device(
State(ctx): State<Context>,
Path((_username, id)): Path<(String, StringWithFormat)>,
Extension(user): Extension<gpodder::User>,
Json(patch): Json<gpodder::DevicePatch>,
Json(patch): Json<models::DevicePatch>,
) -> AppResult<()> {
if id.format != Format::Json {
return Err(AppError::NotFound);
}
tokio::task::spawn_blocking(move || ctx.repo.update_device_info(&user, &id, patch))
tokio::task::spawn_blocking(move || ctx.store.update_device_info(&user, &id, patch.into()))
.await
.unwrap()?;

View File

@ -4,15 +4,17 @@ use axum::{
routing::post,
Extension, Json, Router,
};
use chrono::DateTime;
use serde::{Deserialize, Serialize};
use crate::{
gpodder::{self, EpisodeActionRepository},
gpodder,
server::{
error::{AppError, AppResult},
gpodder::{
auth_middleware,
format::{Format, StringWithFormat},
models,
models::UpdatedUrlsResponse,
},
Context,
@ -32,7 +34,7 @@ async fn post_episode_actions(
State(ctx): State<Context>,
Path(username): Path<StringWithFormat>,
Extension(user): Extension<gpodder::User>,
Json(actions): Json<Vec<gpodder::EpisodeAction>>,
Json(actions): Json<Vec<models::EpisodeAction>>,
) -> AppResult<Json<UpdatedUrlsResponse>> {
if username.format != Format::Json {
return Err(AppError::NotFound);
@ -42,17 +44,18 @@ async fn post_episode_actions(
return Err(AppError::BadRequest);
}
Ok(
tokio::task::spawn_blocking(move || ctx.repo.add_episode_actions(&user, actions))
.await
.unwrap()
.map(|timestamp| {
Json(UpdatedUrlsResponse {
timestamp,
update_urls: Vec::new(),
})
})?,
)
Ok(tokio::task::spawn_blocking(move || {
ctx.store
.add_episode_actions(&user, actions.into_iter().map(Into::into).collect())
})
.await
.unwrap()
.map(|time_changed| {
Json(UpdatedUrlsResponse {
timestamp: time_changed.timestamp(),
update_urls: Vec::new(),
})
})?)
}
#[derive(Deserialize, Default)]
@ -67,7 +70,7 @@ struct FilterQuery {
#[derive(Serialize)]
struct EpisodeActionsResponse {
timestamp: i64,
actions: Vec<gpodder::EpisodeAction>,
actions: Vec<models::EpisodeAction>,
}
async fn get_episode_actions(
@ -84,10 +87,15 @@ async fn get_episode_actions(
return Err(AppError::BadRequest);
}
let since = filter
.since
.map(|ts| DateTime::from_timestamp(ts, 0))
.flatten();
Ok(tokio::task::spawn_blocking(move || {
ctx.repo.episode_actions_for_user(
ctx.store.episode_actions_for_user(
&user,
filter.since,
since,
filter.podcast,
filter.device,
filter.aggregated,
@ -95,5 +103,10 @@ async fn get_episode_actions(
})
.await
.unwrap()
.map(|(timestamp, actions)| Json(EpisodeActionsResponse { timestamp, actions }))?)
.map(|(ts, actions)| {
Json(EpisodeActionsResponse {
timestamp: ts.timestamp(),
actions: actions.into_iter().map(Into::into).collect(),
})
})?)
}

View File

@ -7,7 +7,7 @@ use axum::{
use serde::Deserialize;
use crate::{
gpodder::{self, SubscriptionRepository},
gpodder,
server::{
error::{AppError, AppResult},
gpodder::{
@ -43,14 +43,14 @@ pub async fn post_subscription_changes(
}
Ok(tokio::task::spawn_blocking(move || {
ctx.repo
ctx.store
.update_subscriptions_for_device(&user, &id, delta.add, delta.remove)
})
.await
.unwrap()
.map(|timestamp| {
.map(|time_changed| {
Json(UpdatedUrlsResponse {
timestamp,
timestamp: time_changed.timestamp(),
update_urls: Vec::new(),
})
})?)
@ -76,17 +76,18 @@ pub async fn get_subscription_changes(
return Err(AppError::BadRequest);
}
let since = chrono::DateTime::from_timestamp(query.since, 0).ok_or(AppError::BadRequest)?;
Ok(tokio::task::spawn_blocking(move || {
ctx.repo
.subscription_updates_for_device(&user, &id, query.since)
ctx.store.subscription_updates_for_device(&user, &id, since)
})
.await
.unwrap()
.map(|(timestamp, add, remove)| {
.map(|(next_time_changed, add, remove)| {
Json(SubscriptionDeltaResponse {
add,
remove,
timestamp,
add: add.into_iter().map(|s| s.url).collect(),
remove: remove.into_iter().map(|s| s.url).collect(),
timestamp: next_time_changed.timestamp(),
})
})?)
}

View File

@ -17,10 +17,7 @@ use axum_extra::{
};
use tower_http::set_header::SetResponseHeaderLayer;
use crate::{
gpodder::{self, AuthRepository},
server::error::AppError,
};
use crate::{gpodder, server::error::AppError};
use super::Context;
@ -51,12 +48,12 @@ pub async fn auth_middleware(State(ctx): State<Context>, mut req: Request, next:
.and_then(|c| c.value().parse::<i64>().ok())
{
let ctx_clone = ctx.clone();
match tokio::task::spawn_blocking(move || ctx_clone.repo.validate_session(session_id))
match tokio::task::spawn_blocking(move || ctx_clone.store.validate_session(session_id))
.await
.unwrap()
{
Ok(user) => {
auth_user = Some(user);
Ok(session) => {
auth_user = Some(session.user);
}
Err(gpodder::AuthErr::UnknownSession) => {
jar = jar.add(
@ -77,7 +74,7 @@ pub async fn auth_middleware(State(ctx): State<Context>, mut req: Request, next:
.await
{
match tokio::task::spawn_blocking(move || {
ctx.repo
ctx.store
.validate_credentials(auth.username(), auth.password())
})
.await

View File

@ -1,5 +1,8 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::gpodder;
#[derive(Deserialize, Debug)]
pub struct SubscriptionDelta {
pub add: Vec<String>,
@ -18,3 +21,164 @@ pub struct UpdatedUrlsResponse {
pub timestamp: i64,
pub update_urls: Vec<(String, String)>,
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DeviceType {
Desktop,
Laptop,
Mobile,
Server,
Other,
}
#[derive(Serialize)]
pub struct Device {
pub id: String,
pub caption: String,
pub r#type: DeviceType,
pub subscriptions: i64,
}
#[derive(Deserialize)]
pub struct DevicePatch {
pub caption: Option<String>,
pub r#type: Option<DeviceType>,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "lowercase")]
#[serde(tag = "action")]
pub enum EpisodeActionType {
Download,
Play {
#[serde(default)]
started: Option<i32>,
position: i32,
#[serde(default)]
total: Option<i32>,
},
Delete,
New,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct EpisodeAction {
pub podcast: String,
pub episode: String,
pub timestamp: Option<i64>,
#[serde(default)]
pub device: Option<String>,
#[serde(flatten)]
pub action: EpisodeActionType,
}
impl From<gpodder::DeviceType> for DeviceType {
fn from(value: gpodder::DeviceType) -> Self {
match value {
gpodder::DeviceType::Other => Self::Other,
gpodder::DeviceType::Laptop => Self::Laptop,
gpodder::DeviceType::Mobile => Self::Mobile,
gpodder::DeviceType::Server => Self::Server,
gpodder::DeviceType::Desktop => Self::Desktop,
}
}
}
impl From<DeviceType> for gpodder::DeviceType {
fn from(value: DeviceType) -> Self {
match value {
DeviceType::Other => gpodder::DeviceType::Other,
DeviceType::Laptop => gpodder::DeviceType::Laptop,
DeviceType::Mobile => gpodder::DeviceType::Mobile,
DeviceType::Server => gpodder::DeviceType::Server,
DeviceType::Desktop => gpodder::DeviceType::Desktop,
}
}
}
impl From<gpodder::Device> for Device {
fn from(value: gpodder::Device) -> Self {
Self {
id: value.id,
caption: value.caption,
r#type: value.r#type.into(),
subscriptions: value.subscriptions,
}
}
}
impl From<DevicePatch> for gpodder::DevicePatch {
fn from(value: DevicePatch) -> Self {
Self {
caption: value.caption,
r#type: value.r#type.map(Into::into),
}
}
}
impl From<gpodder::EpisodeActionType> for EpisodeActionType {
fn from(value: gpodder::EpisodeActionType) -> Self {
match value {
gpodder::EpisodeActionType::New => Self::New,
gpodder::EpisodeActionType::Delete => Self::Delete,
gpodder::EpisodeActionType::Download => Self::Download,
gpodder::EpisodeActionType::Play {
started,
position,
total,
} => Self::Play {
started,
position,
total,
},
}
}
}
impl From<EpisodeActionType> for gpodder::EpisodeActionType {
fn from(value: EpisodeActionType) -> Self {
match value {
EpisodeActionType::New => gpodder::EpisodeActionType::New,
EpisodeActionType::Delete => gpodder::EpisodeActionType::Delete,
EpisodeActionType::Download => gpodder::EpisodeActionType::Download,
EpisodeActionType::Play {
started,
position,
total,
} => gpodder::EpisodeActionType::Play {
started,
position,
total,
},
}
}
}
impl From<gpodder::EpisodeAction> for EpisodeAction {
fn from(value: gpodder::EpisodeAction) -> Self {
Self {
podcast: value.podcast,
episode: value.episode,
timestamp: value.timestamp.map(|ts| ts.timestamp()),
device: value.device,
action: value.action.into(),
}
}
}
impl From<EpisodeAction> for gpodder::EpisodeAction {
fn from(value: EpisodeAction) -> Self {
Self {
podcast: value.podcast,
episode: value.episode,
// TODO remove this unwrap
timestamp: value
.timestamp
.map(|ts| DateTime::from_timestamp(ts, 0).unwrap()),
device: value.device,
action: value.action.into(),
time_changed: DateTime::<Utc>::MIN_UTC,
}
}
}

View File

@ -6,7 +6,7 @@ use axum::{
};
use crate::{
gpodder::{self, SubscriptionRepository},
gpodder,
server::{
error::{AppError, AppResult},
gpodder::{auth_middleware, format::StringWithFormat},
@ -34,10 +34,10 @@ pub async fn get_device_subscriptions(
}
Ok(
tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_device(&user, &id))
tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_device(&user, &id))
.await
.unwrap()
.map(Json)?,
.map(|subs| Json(subs.into_iter().map(|s| s.url).collect()))?,
)
}
@ -51,10 +51,10 @@ pub async fn get_user_subscriptions(
}
Ok(
tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_user(&user))
tokio::task::spawn_blocking(move || ctx.store.subscriptions_for_user(&user))
.await
.unwrap()
.map(Json)?,
.map(|subs| Json(subs.into_iter().map(|s| s.url).collect()))?,
)
}
@ -68,12 +68,10 @@ pub async fn put_device_subscriptions(
return Err(AppError::BadRequest);
}
Ok(
tokio::task::spawn_blocking(move || {
ctx.repo.set_subscriptions_for_device(&user, &id, urls)
})
.await
.unwrap()
.map(|_| ())?,
)
Ok(tokio::task::spawn_blocking(move || {
ctx.store.set_subscriptions_for_device(&user, &id, urls)
})
.await
.unwrap()
.map(|_| ())?)
}

View File

@ -6,7 +6,7 @@ use tower_http::trace::TraceLayer;
#[derive(Clone)]
pub struct Context {
pub repo: crate::db::SqliteRepository,
pub store: crate::gpodder::GpodderRepository,
}
pub fn app(ctx: Context) -> Router {