feat: migrate subscriptions api to repository

This commit is contained in:
Jef Roosens 2025-02-27 22:48:34 +01:00
parent a2233d9da8
commit adda030c3b
No known key found for this signature in database
GPG key ID: 21FD3D77D56BAF49
5 changed files with 347 additions and 110 deletions

View file

@ -7,16 +7,13 @@ use axum::{
use serde::Deserialize;
use crate::{
db,
gpodder::{self, SubscriptionRepository},
server::{
error::{AppError, AppResult},
gpodder::{
auth_middleware,
format::{Format, StringWithFormat},
models::{
DeviceType, SubscriptionChangeResponse, SubscriptionDelta,
SubscriptionDeltaResponse,
},
models::{SubscriptionChangeResponse, SubscriptionDelta, SubscriptionDeltaResponse},
},
Context,
},
@ -34,7 +31,7 @@ pub fn router(ctx: Context) -> Router<Context> {
pub async fn post_subscription_changes(
State(ctx): State<Context>,
Path((username, id)): Path<(String, StringWithFormat)>,
Extension(user): Extension<db::User>,
Extension(user): Extension<gpodder::User>,
Json(delta): Json<SubscriptionDelta>,
) -> AppResult<Json<SubscriptionChangeResponse>> {
if id.format != Format::Json {
@ -45,37 +42,18 @@ pub async fn post_subscription_changes(
return Err(AppError::BadRequest);
}
let timestamp = chrono::Utc::now().timestamp_millis();
tokio::task::spawn_blocking(move || {
let device = if let Some(device) = db::Device::by_device_id(&ctx.pool, user.id, &id)? {
device
} else {
db::NewDevice::new(
user.id,
id.to_string(),
String::new(),
DeviceType::Other.into(),
)
.insert(&ctx.pool)?
};
db::Subscription::update_for_device(
&ctx.pool,
device.id,
delta.add,
delta.remove,
timestamp,
)
Ok(tokio::task::spawn_blocking(move || {
ctx.repo
.update_subscriptions_for_device(&user, &id, delta.add, delta.remove)
})
.await
.unwrap()?;
Ok(Json(SubscriptionChangeResponse {
timestamp: timestamp + 1,
// TODO implement URL sanitization
update_urls: vec![],
}))
.unwrap()
.map(|timestamp| {
Json(SubscriptionChangeResponse {
timestamp,
update_urls: Vec::new(),
})
})?)
}
#[derive(Deserialize)]
@ -87,7 +65,7 @@ pub struct SinceQuery {
pub async fn get_subscription_changes(
State(ctx): State<Context>,
Path((username, id)): Path<(String, StringWithFormat)>,
Extension(user): Extension<db::User>,
Extension(user): Extension<gpodder::User>,
Query(query): Query<SinceQuery>,
) -> AppResult<Json<SubscriptionDeltaResponse>> {
if id.format != Format::Json {
@ -98,34 +76,17 @@ pub async fn get_subscription_changes(
return Err(AppError::BadRequest);
}
let subscriptions = tokio::task::spawn_blocking(move || {
let device =
db::Device::by_device_id(&ctx.pool, user.id, &id)?.ok_or(AppError::NotFound)?;
Ok::<_, AppError>(db::Subscription::updated_since_for_device(
&ctx.pool,
device.id,
query.since,
)?)
Ok(tokio::task::spawn_blocking(move || {
ctx.repo
.subscription_updates_for_device(&user, &id, query.since)
})
.await
.unwrap()?;
let mut delta = SubscriptionDeltaResponse::default();
delta.timestamp = query.since;
for sub in subscriptions.into_iter() {
if sub.deleted {
delta.remove.push(sub.url);
} else {
delta.add.push(sub.url);
}
delta.timestamp = delta.timestamp.max(sub.time_changed);
}
// Timestamp should reflect the events *after* the last seen change
delta.timestamp += 1;
Ok(Json(delta))
.unwrap()
.map(|(timestamp, add, remove)| {
Json(SubscriptionDeltaResponse {
add,
remove,
timestamp,
})
})?)
}

View file

@ -2,14 +2,14 @@ use axum::{
extract::{Path, State},
middleware,
routing::get,
Extension, Form, Json, Router,
Extension, Json, Router,
};
use crate::{
db,
gpodder::{self, SubscriptionRepository},
server::{
error::{AppError, AppResult},
gpodder::{auth_middleware, format::StringWithFormat, models::DeviceType},
gpodder::{auth_middleware, format::StringWithFormat},
Context,
},
};
@ -27,73 +27,53 @@ pub fn router(ctx: Context) -> Router<Context> {
pub async fn get_device_subscriptions(
State(ctx): State<Context>,
Path((username, id)): Path<(String, StringWithFormat)>,
Extension(user): Extension<db::User>,
Extension(user): Extension<gpodder::User>,
) -> AppResult<Json<Vec<String>>> {
if username != user.username {
return Err(AppError::BadRequest);
}
let subscriptions = tokio::task::spawn_blocking(move || {
let device =
db::Device::by_device_id(&ctx.pool, user.id, &id)?.ok_or(AppError::NotFound)?;
Ok::<_, AppError>(db::Subscription::for_device(&ctx.pool, device.id)?)
})
.await
.unwrap()?;
Ok(Json(subscriptions))
Ok(
tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_device(&user, &id))
.await
.unwrap()
.map(Json)?,
)
}
pub async fn get_user_subscriptions(
State(ctx): State<Context>,
Path(username): Path<StringWithFormat>,
Extension(user): Extension<db::User>,
Extension(user): Extension<gpodder::User>,
) -> AppResult<Json<Vec<String>>> {
if *username != user.username {
return Err(AppError::BadRequest);
}
let subscriptions =
tokio::task::spawn_blocking(move || db::Subscription::for_user(&ctx.pool, user.id))
Ok(
tokio::task::spawn_blocking(move || ctx.repo.subscriptions_for_user(&user))
.await
.unwrap()?;
Ok(Json(subscriptions))
.unwrap()
.map(Json)?,
)
}
pub async fn put_device_subscriptions(
State(ctx): State<Context>,
Path((username, id)): Path<(String, StringWithFormat)>,
Extension(user): Extension<db::User>,
Extension(user): Extension<gpodder::User>,
Json(urls): Json<Vec<String>>,
) -> AppResult<()> {
if *username != user.username {
return Err(AppError::BadRequest);
}
tokio::task::spawn_blocking(move || {
let device = if let Some(device) = db::Device::by_device_id(&ctx.pool, user.id, &id)? {
device
} else {
db::NewDevice::new(
user.id,
id.to_string(),
String::new(),
DeviceType::Other.into(),
)
.insert(&ctx.pool)?
};
Ok::<_, AppError>(db::Subscription::set_for_device(
&ctx.pool,
device.id,
urls,
chrono::Utc::now().timestamp(),
)?)
})
.await
.unwrap()?;
Ok(())
Ok(
tokio::task::spawn_blocking(move || {
ctx.repo.set_subscriptions_for_device(&user, &id, urls)
})
.await
.unwrap()
.map(|_| ())?,
)
}