refactor: split gpodder repository and the sqlite data store implementation into separate crates

The complete separation of concerns via the gpodder repository allows us
to cleanly separate the server from the gpodder specification. This
paves the way for a later Postgres implementation of the data store.
This commit is contained in:
Jef Roosens 2025-03-19 08:54:49 +01:00
parent 86687a7b96
commit 0cfcd90eba
No known key found for this signature in database
GPG key ID: 21FD3D77D56BAF49
45 changed files with 2416 additions and 882 deletions

96
gpodder_sqlite/src/lib.rs Normal file
View file

@ -0,0 +1,96 @@
mod models;
mod repository;
mod schema;
use diesel::connection::InstrumentationEvent;
use diesel::r2d2::CustomizeConnection;
use diesel::Connection;
pub use repository::SqliteRepository;
use diesel::{
r2d2::{ConnectionManager, Pool},
SqliteConnection,
};
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use std::{error::Error, fmt, path::Path};
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
pub type DbPool = Pool<ConnectionManager<SqliteConnection>>;
pub type DbResult<T> = Result<T, DbError>;
/// Error type for all failures originating in the SQLite data store.
#[derive(Debug)]
pub enum DbError {
    /// Failed to check a connection out of the r2d2 connection pool.
    Pool(diesel::r2d2::PoolError),
    /// A Diesel query failed while executing.
    Db(diesel::result::Error),
}
impl fmt::Display for DbError {
    /// Short human-readable description of the error category; the
    /// underlying cause is available through `Error::source`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let msg = match self {
            Self::Pool(_) => "failed to acquire connection from pool",
            Self::Db(_) => "error while executing query",
        };

        f.write_str(msg)
    }
}
impl Error for DbError {
    /// Expose the wrapped pool or query error as this error's source.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        let inner: &(dyn Error + 'static) = match self {
            Self::Pool(inner) => inner,
            Self::Db(inner) => inner,
        };

        Some(inner)
    }
}
impl From<diesel::r2d2::PoolError> for DbError {
fn from(value: diesel::r2d2::PoolError) -> Self {
Self::Pool(value)
}
}
impl From<diesel::result::Error> for DbError {
fn from(value: diesel::result::Error) -> Self {
Self::Db(value)
}
}
impl From<DbError> for gpodder::AuthErr {
    /// Convert a data-store error into the gpodder-level error type by
    /// boxing the concrete cause as an opaque `Other` error.
    fn from(err: DbError) -> Self {
        match err {
            DbError::Pool(inner) => gpodder::AuthErr::Other(Box::new(inner)),
            DbError::Db(inner) => gpodder::AuthErr::Other(Box::new(inner)),
        }
    }
}
/// r2d2 connection customizer that installs Diesel instrumentation logging
/// every executed SQL query at debug level.
#[derive(Debug)]
pub struct AddQueryDebugLogs;

impl CustomizeConnection<SqliteConnection, diesel::r2d2::Error> for AddQueryDebugLogs {
    fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
        conn.set_instrumentation(|event: InstrumentationEvent<'_>| {
            // Only query starts are of interest; all other events are ignored
            if let InstrumentationEvent::StartQuery { query, .. } = event {
                tracing::debug!("{}", query);
            }
        });

        Ok(())
    }
}
/// Initialize the SQLite connection pool for the database at `path`,
/// optionally running any pending embedded migrations.
///
/// # Errors
///
/// Returns a [`DbError`] if the pool cannot be built, or if a connection
/// cannot be acquired to run migrations.
///
/// # Panics
///
/// Panics (with a descriptive message) if running the embedded migrations
/// fails, as the schema would otherwise be left in an undefined state.
pub fn initialize_db(path: impl AsRef<Path>, run_migrations: bool) -> Result<DbPool, DbError> {
    let manager = ConnectionManager::<SqliteConnection>::new(path.as_ref().to_string_lossy());
    let pool = Pool::builder()
        // Log every query at debug level to ease troubleshooting
        .connection_customizer(Box::new(AddQueryDebugLogs))
        .build(manager)?;

    if run_migrations {
        // A bare `unwrap` here produced an uninformative panic; `expect`
        // at least names the failing step.
        pool.get()?
            .run_pending_migrations(MIGRATIONS)
            .expect("failed to run pending database migrations");
    }

    Ok(pool)
}

View file

@ -0,0 +1,148 @@
use std::{fmt, str::FromStr};
use diesel::{
deserialize::{FromSql, FromSqlRow},
expression::AsExpression,
prelude::*,
serialize::ToSql,
sql_types::Text,
sqlite::{Sqlite, SqliteValue},
};
use crate::schema::*;
/// A device registered by a user; one row of the `devices` table.
#[derive(Clone, Queryable, Selectable)]
#[diesel(table_name = devices)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct Device {
    /// Surrogate primary key.
    pub id: i64,
    /// External device identifier; used together with `user_id` to look up
    /// a device (see [`Device::by_device_id`]).
    pub device_id: String,
    /// Owning user's primary key.
    pub user_id: i64,
    /// Human-readable device name.
    pub caption: String,
    /// Device category, stored as text.
    pub type_: DeviceType,
    /// Sync group this device belongs to, if it is synchronized.
    pub sync_group_id: Option<i64>,
}

/// Insertable form of [`Device`]; `id` and `sync_group_id` are left to
/// their database defaults.
#[derive(Insertable)]
#[diesel(table_name = devices)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct NewDevice {
    pub device_id: String,
    pub user_id: i64,
    pub caption: String,
    pub type_: DeviceType,
}
/// Category of a registered device; serialized to and from lowercase text
/// for storage in a `Text` column (see the `Display`/`FromStr` impls).
#[derive(FromSqlRow, Debug, AsExpression, Clone)]
#[diesel(sql_type = Text)]
pub enum DeviceType {
    Desktop,
    Laptop,
    Mobile,
    Server,
    Other,
}
impl Device {
    /// Resolve a user's external device ID to the internal database ID.
    pub fn device_id_to_id(
        conn: &mut SqliteConnection,
        user_id: i64,
        device_id: &str,
    ) -> diesel::QueryResult<i64> {
        let matches_device = devices::user_id
            .eq(user_id)
            .and(devices::device_id.eq(device_id));

        devices::table
            .select(devices::id)
            .filter(matches_device)
            .get_result(conn)
    }

    /// Look up a user's full device row by its external device ID.
    pub fn by_device_id(
        conn: &mut SqliteConnection,
        user_id: i64,
        device_id: &str,
    ) -> diesel::QueryResult<Self> {
        let matches_device = devices::user_id
            .eq(user_id)
            .and(devices::device_id.eq(device_id));

        devices::table
            .select(Self::as_select())
            .filter(matches_device)
            .get_result(conn)
    }
}
impl NewDevice {
pub fn new(user_id: i64, device_id: String, caption: String, type_: DeviceType) -> Self {
Self {
device_id,
user_id,
caption,
type_,
}
}
}
impl fmt::Display for DeviceType {
    /// Render the device type as its lowercase wire representation, the
    /// inverse of the `FromStr` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Desktop => "desktop",
            Self::Laptop => "laptop",
            Self::Mobile => "mobile",
            Self::Server => "server",
            Self::Other => "other",
        };

        f.write_str(name)
    }
}
/// Error returned when parsing an unrecognized device type string; carries
/// the offending input.
#[derive(Debug)]
pub struct DeviceTypeParseErr(String);

impl fmt::Display for DeviceTypeParseErr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(raw) = self;

        write!(f, "invalid device type '{}'", raw)
    }
}

impl std::error::Error for DeviceTypeParseErr {}
impl FromStr for DeviceType {
    type Err = DeviceTypeParseErr;

    /// Parse the lowercase wire representation produced by `Display`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parsed = match s {
            "desktop" => Self::Desktop,
            "laptop" => Self::Laptop,
            "mobile" => Self::Mobile,
            "server" => Self::Server,
            "other" => Self::Other,
            unknown => return Err(DeviceTypeParseErr(unknown.to_string())),
        };

        Ok(parsed)
    }
}
impl FromSql<Text, Sqlite> for DeviceType {
    /// Deserialize from a `TEXT` column by parsing the lowercase name;
    /// unknown values surface as a deserialization error.
    fn from_sql(bytes: SqliteValue) -> diesel::deserialize::Result<Self> {
        let s = <String as FromSql<Text, Sqlite>>::from_sql(bytes)?;

        Ok(s.as_str().parse()?)
    }
}

impl ToSql<Text, Sqlite> for DeviceType {
    /// Serialize as the lowercase name produced by the `Display` impl.
    fn to_sql<'b>(
        &'b self,
        out: &mut diesel::serialize::Output<'b, '_, Sqlite>,
    ) -> diesel::serialize::Result {
        out.set_value(self.to_string());

        Ok(diesel::serialize::IsNull::No)
    }
}

View file

@ -0,0 +1,24 @@
use diesel::prelude::*;
use crate::schema::*;
/// One row of the `device_subscriptions` table: a podcast URL a device is
/// (or was) subscribed to.
#[derive(Clone, Queryable, Selectable)]
#[diesel(table_name = device_subscriptions)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct DeviceSubscription {
    /// Surrogate primary key.
    pub id: i64,
    /// Internal ID of the owning device.
    pub device_id: i64,
    pub podcast_url: String,
    /// Unix timestamp (seconds) of the last change to this row.
    pub time_changed: i64,
    /// Soft-delete flag: unsubscribed rows are marked deleted rather than
    /// removed — presumably so deletions can still be reported to syncing
    /// clients; confirm against the API layer.
    pub deleted: bool,
}

/// Insertable form of [`DeviceSubscription`]; `id` is assigned by the
/// database.
#[derive(Insertable)]
#[diesel(table_name = device_subscriptions)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct NewDeviceSubscription {
    pub device_id: i64,
    pub podcast_url: String,
    pub time_changed: i64,
    pub deleted: bool,
}

View file

@ -0,0 +1,114 @@
use std::{fmt, str::FromStr};
use diesel::{
deserialize::{FromSql, FromSqlRow},
expression::AsExpression,
prelude::{Insertable, Queryable},
serialize::ToSql,
sql_types::Text,
sqlite::{Sqlite, SqliteValue},
Selectable,
};
use crate::schema::*;
/// One row of the `episode_actions` table: an episode action reported by a
/// user, optionally tied to the reporting device.
#[derive(Clone, Queryable, Selectable)]
#[diesel(table_name = episode_actions)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct EpisodeAction {
    /// Surrogate primary key.
    pub id: i64,
    pub user_id: i64,
    /// Internal ID of the device that reported the action, if any.
    pub device_id: Option<i64>,
    pub podcast_url: String,
    pub episode_url: String,
    /// Unix timestamp (seconds) of when the row was last changed server-side.
    pub time_changed: i64,
    /// Client-supplied Unix timestamp of the action, if provided.
    pub timestamp: Option<i64>,
    pub action: ActionType,
    // started/position/total only carry data for "play" actions; the
    // repository conversion code relies on a database CHECK constraint
    // making `position` non-null in that case.
    pub started: Option<i32>,
    pub position: Option<i32>,
    pub total: Option<i32>,
}

/// Insertable form of [`EpisodeAction`]; `id` is assigned by the database.
#[derive(Insertable)]
#[diesel(table_name = episode_actions)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct NewEpisodeAction {
    pub user_id: i64,
    pub device_id: Option<i64>,
    pub podcast_url: String,
    pub episode_url: String,
    pub time_changed: i64,
    pub timestamp: Option<i64>,
    pub action: ActionType,
    pub started: Option<i32>,
    pub position: Option<i32>,
    pub total: Option<i32>,
}
/// Kind of episode action, serialized to and from lowercase text for
/// storage in a `Text` column (see the `Display`/`FromStr` impls).
#[derive(FromSqlRow, Debug, AsExpression, Clone)]
#[diesel(sql_type = Text)]
pub enum ActionType {
    New,
    Download,
    Play,
    Delete,
}
impl fmt::Display for ActionType {
    /// Render the action type as its lowercase wire representation, the
    /// inverse of the `FromStr` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::New => "new",
            Self::Download => "download",
            Self::Play => "play",
            Self::Delete => "delete",
        };

        f.write_str(name)
    }
}
/// Error returned when parsing an unrecognized action type string; carries
/// the offending input.
#[derive(Debug)]
pub struct ActionTypeParseErr(String);

impl fmt::Display for ActionTypeParseErr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(raw) = self;

        write!(f, "invalid action type '{}'", raw)
    }
}

impl std::error::Error for ActionTypeParseErr {}
impl FromStr for ActionType {
    type Err = ActionTypeParseErr;

    /// Parse the lowercase wire representation produced by `Display`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parsed = match s {
            "new" => Self::New,
            "download" => Self::Download,
            "delete" => Self::Delete,
            "play" => Self::Play,
            unknown => return Err(ActionTypeParseErr(unknown.to_string())),
        };

        Ok(parsed)
    }
}
impl FromSql<Text, Sqlite> for ActionType {
    /// Deserialize from a `TEXT` column by parsing the lowercase name;
    /// unknown values surface as a deserialization error.
    fn from_sql(bytes: SqliteValue) -> diesel::deserialize::Result<Self> {
        let s = <String as FromSql<Text, Sqlite>>::from_sql(bytes)?;

        Ok(s.as_str().parse()?)
    }
}

impl ToSql<Text, Sqlite> for ActionType {
    /// Serialize as the lowercase name produced by the `Display` impl.
    fn to_sql<'b>(
        &'b self,
        out: &mut diesel::serialize::Output<'b, '_, Sqlite>,
    ) -> diesel::serialize::Result {
        out.set_value(self.to_string());

        Ok(diesel::serialize::IsNull::No)
    }
}

View file

@ -0,0 +1,6 @@
pub mod device;
pub mod device_subscription;
pub mod episode_action;
pub mod session;
pub mod sync_group;
pub mod user;

View file

@ -0,0 +1,60 @@
use diesel::prelude::*;
use crate::schema::*;
/// A login session row from the `sessions` table, tied to a user.
///
/// Also derives `Insertable` because the session `id` is generated by the
/// caller (see the `AuthStore::insert_session` implementation) rather than
/// by the database.
#[derive(Clone, Queryable, Selectable, Insertable, Associations)]
#[diesel(belongs_to(super::user::User))]
#[diesel(table_name = sessions)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct Session {
    /// Session identifier, supplied on insert.
    pub id: i64,
    /// Owning user's primary key.
    pub user_id: i64,
    /// Unix timestamp (seconds) of the session's last activity.
    pub last_seen: i64,
}
// NOTE(review): `Session` previously carried pool-based helper methods
// (session creation, user lookup and removal). They were superseded by the
// `gpodder::AuthStore` implementation on `SqliteRepository`, so the empty
// impl block holding only commented-out remnants was removed as dead code.

View file

@ -0,0 +1,33 @@
use diesel::{
dsl::{exists, not},
prelude::*,
};
use crate::schema::*;
/// A group of devices whose subscriptions are kept in sync; the table
/// stores nothing but the group's ID.
#[derive(Queryable, Selectable)]
#[diesel(table_name = sync_groups)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct SyncGroup {
    pub id: i64,
}
impl SyncGroup {
    /// Insert a new, empty sync group and return it with its
    /// database-assigned ID.
    pub fn new(conn: &mut SqliteConnection) -> QueryResult<Self> {
        diesel::insert_into(sync_groups::table)
            .default_values()
            .returning(SyncGroup::as_returning())
            .get_result(conn)
    }

    /// Delete every sync group no longer referenced by any device,
    /// returning the number of removed rows.
    pub fn remove_unused(conn: &mut SqliteConnection) -> QueryResult<usize> {
        // A group is unused when no `devices` row points at it through
        // `sync_group_id`; the `SELECT 1` exists purely for the EXISTS
        // subquery.
        diesel::delete(
            sync_groups::table.filter(not(exists(
                devices::table
                    .select(1.into_sql::<diesel::sql_types::Integer>())
                    .filter(devices::sync_group_id.eq(sync_groups::id.nullable())),
            ))),
        )
        .execute(conn)
    }
}

View file

@ -0,0 +1,47 @@
use diesel::prelude::*;
use crate::schema::*;
/// A user account row from the `users` table.
#[derive(Clone, Queryable, Selectable)]
#[diesel(table_name = users)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct User {
    /// Surrogate primary key.
    pub id: i64,
    pub username: String,
    /// Hashed password — presumably an Argon2 PHC string; TODO confirm
    /// against the code performing verification.
    pub password_hash: String,
}

/// Insertable form of [`User`]; `id` is assigned by the database.
#[derive(Insertable)]
#[diesel(table_name = users)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct NewUser {
    pub username: String,
    pub password_hash: String,
}
// NOTE(review): the commented-out legacy helpers (`NewUser::new`,
// `User::by_username`, `User::verify_password`) were removed as dead code;
// they referenced pool-based access that the repository layer replaces.

View file

@ -0,0 +1,96 @@
use chrono::DateTime;
use diesel::prelude::*;
use gpodder::AuthErr;
use super::SqliteRepository;
use crate::{
models::{session::Session, user::User},
schema::*,
DbError,
};
impl From<User> for gpodder::User {
    /// Map the database row onto the domain user model field-for-field.
    fn from(user: User) -> Self {
        gpodder::User {
            id: user.id,
            username: user.username,
            password_hash: user.password_hash,
        }
    }
}
impl gpodder::AuthStore for SqliteRepository {
fn get_user(&self, username: &str) -> Result<Option<gpodder::models::User>, AuthErr> {
Ok(users::table
.select(User::as_select())
.filter(users::username.eq(username))
.first(&mut self.pool.get().map_err(DbError::from)?)
.optional()
.map_err(DbError::from)?
.map(gpodder::User::from))
}
fn get_session(&self, session_id: i64) -> Result<Option<gpodder::models::Session>, AuthErr> {
match sessions::table
.inner_join(users::table)
.filter(sessions::id.eq(session_id))
.select((Session::as_select(), User::as_select()))
.get_result(&mut self.pool.get().map_err(DbError::from)?)
{
Ok((session, user)) => Ok(Some(gpodder::Session {
id: session.id,
last_seen: DateTime::from_timestamp(session.last_seen, 0).unwrap(),
user: user.into(),
})),
Err(err) => Err(DbError::from(err).into()),
}
}
fn remove_session(&self, session_id: i64) -> Result<(), AuthErr> {
Ok(
diesel::delete(sessions::table.filter(sessions::id.eq(session_id)))
.execute(&mut self.pool.get().map_err(DbError::from)?)
.map(|_| ())
.map_err(DbError::from)?,
)
}
fn insert_session(&self, session: &gpodder::Session) -> Result<(), AuthErr> {
Ok(Session {
id: session.id,
user_id: session.user.id,
last_seen: session.last_seen.timestamp(),
}
.insert_into(sessions::table)
.execute(&mut self.pool.get().map_err(DbError::from)?)
.map(|_| ())
.map_err(DbError::from)?)
}
fn refresh_session(
&self,
session: &gpodder::Session,
timestamp: DateTime<chrono::Utc>,
) -> Result<(), AuthErr> {
if diesel::update(sessions::table.filter(sessions::id.eq(session.id)))
.set(sessions::last_seen.eq(timestamp.timestamp()))
.execute(&mut self.pool.get().map_err(DbError::from)?)
.map_err(DbError::from)?
== 0
{
Err(AuthErr::UnknownSession)
} else {
Ok(())
}
}
fn remove_old_sessions(&self, min_last_seen: DateTime<chrono::Utc>) -> Result<usize, AuthErr> {
let min_last_seen = min_last_seen.timestamp();
Ok(
diesel::delete(sessions::table.filter(sessions::last_seen.lt(min_last_seen)))
.execute(&mut self.pool.get().map_err(DbError::from)?)
.map_err(DbError::from)?,
)
}
}

View file

@ -0,0 +1,298 @@
use std::collections::HashSet;
use chrono::{DateTime, Utc};
use diesel::{alias, dsl::not, prelude::*};
use gpodder::AuthErr;
use super::SqliteRepository;
use crate::{
models::{
device::{Device, DeviceType, NewDevice},
sync_group::SyncGroup,
},
schema::*,
DbError,
};
impl From<DeviceType> for gpodder::DeviceType {
    /// Map the database enum onto the equivalent domain variant.
    fn from(ty: DeviceType) -> Self {
        match ty {
            DeviceType::Desktop => gpodder::DeviceType::Desktop,
            DeviceType::Laptop => gpodder::DeviceType::Laptop,
            DeviceType::Mobile => gpodder::DeviceType::Mobile,
            DeviceType::Server => gpodder::DeviceType::Server,
            DeviceType::Other => gpodder::DeviceType::Other,
        }
    }
}

impl From<gpodder::DeviceType> for DeviceType {
    /// Map the domain enum onto the equivalent database variant.
    fn from(ty: gpodder::DeviceType) -> Self {
        match ty {
            gpodder::DeviceType::Desktop => DeviceType::Desktop,
            gpodder::DeviceType::Laptop => DeviceType::Laptop,
            gpodder::DeviceType::Mobile => DeviceType::Mobile,
            gpodder::DeviceType::Server => DeviceType::Server,
            gpodder::DeviceType::Other => DeviceType::Other,
        }
    }
}
impl gpodder::DeviceRepository for SqliteRepository {
    /// Return all devices registered by the given user.
    ///
    /// The immediately-invoked closure pattern used throughout this impl
    /// lets `?` collect failures into a `DbError` before converting to the
    /// domain error type once at the end.
    fn devices_for_user(
        &self,
        user: &gpodder::User,
    ) -> Result<Vec<gpodder::Device>, gpodder::AuthErr> {
        (|| {
            Ok::<_, DbError>(
                devices::table
                    .select(Device::as_select())
                    .filter(devices::user_id.eq(user.id))
                    .get_results(&mut self.pool.get()?)?
                    .into_iter()
                    .map(|d| gpodder::Device {
                        id: d.device_id,
                        caption: d.caption,
                        r#type: d.type_.into(),
                        // TODO implement subscription count
                        subscriptions: 0,
                    })
                    .collect(),
            )
        })()
        .map_err(AuthErr::from)
    }

    /// Apply a partial update to a user's device, creating the device row
    /// if it does not exist yet (upsert semantics).
    fn update_device_info(
        &self,
        user: &gpodder::User,
        device_id: &str,
        patch: gpodder::DevicePatch,
    ) -> Result<(), gpodder::AuthErr> {
        (|| {
            if let Some(mut device) = devices::table
                .select(Device::as_select())
                .filter(
                    devices::user_id
                        .eq(user.id)
                        .and(devices::device_id.eq(device_id)),
                )
                .get_result(&mut self.pool.get()?)
                .optional()?
            {
                // Existing device: only the fields present in the patch
                // are overwritten
                if let Some(caption) = patch.caption {
                    device.caption = caption;
                }

                if let Some(type_) = patch.r#type {
                    device.type_ = type_.into();
                }

                diesel::update(devices::table.filter(devices::id.eq(device.id)))
                    .set((
                        devices::caption.eq(&device.caption),
                        devices::type_.eq(&device.type_),
                    ))
                    .execute(&mut self.pool.get()?)?;
            } else {
                // Unknown device: insert a fresh row, falling back to an
                // empty caption and the "other" type for absent fields
                let device = NewDevice {
                    device_id: device_id.to_string(),
                    user_id: user.id,
                    caption: patch.caption.unwrap_or(String::new()),
                    type_: patch.r#type.unwrap_or(gpodder::DeviceType::Other).into(),
                };

                diesel::insert_into(devices::table)
                    .values(device)
                    .execute(&mut self.pool.get()?)?;
            }

            Ok::<_, DbError>(())
        })()
        .map_err(AuthErr::from)
    }

    /// Merge the sync groups of the given devices into a single group,
    /// returning the surviving group's ID. Runs in one transaction.
    fn merge_sync_groups(
        &self,
        user: &gpodder::User,
        device_ids: Vec<&str>,
    ) -> Result<i64, gpodder::AuthErr> {
        (|| {
            let conn = &mut self.pool.get()?;

            conn.transaction(|conn| {
                let devices: Vec<(i64, Option<i64>)> = devices::table
                    .select((devices::id, devices::sync_group_id))
                    .filter(
                        devices::user_id
                            .eq(user.id)
                            .and(devices::device_id.eq_any(device_ids)),
                    )
                    .get_results(conn)?;

                let mut sync_group_ids: Vec<i64> = devices
                    .iter()
                    .filter_map(|(_, group_id)| *group_id)
                    .collect();

                // Remove any duplicates, giving us each sync group ID once
                sync_group_ids.sort();
                sync_group_ids.dedup();

                // If any of the devices are already in a sync group, we reuse the first one we find.
                // Otherwise, we generate a new one.
                // NOTE(review): `pop` reuses the *largest* ID after sorting,
                // not literally the first one encountered — confirm intent.
                let sync_group_id = if let Some(id) = sync_group_ids.pop() {
                    id
                } else {
                    SyncGroup::new(conn)?.id
                };

                // Move all devices in the other sync groups into the new sync group
                diesel::update(
                    devices::table.filter(devices::sync_group_id.eq_any(sync_group_ids.iter())),
                )
                .set(devices::sync_group_id.eq(sync_group_id))
                .execute(conn)?;

                // Add the non-synchronized devices into the new sync group
                let unsynced_device_ids =
                    devices.iter().filter_map(
                        |(id, group_id)| if group_id.is_none() { Some(id) } else { None },
                    );

                diesel::update(devices::table.filter(devices::id.eq_any(unsynced_device_ids)))
                    .set(devices::sync_group_id.eq(sync_group_id))
                    .execute(conn)?;

                // Remove the other now unused sync groups
                diesel::delete(sync_groups::table.filter(sync_groups::id.eq_any(sync_group_ids)))
                    .execute(conn)?;

                Ok::<_, DbError>(sync_group_id)
            })
        })()
        .map_err(AuthErr::from)
    }

    /// Detach the given devices from whatever sync group they are in, then
    /// garbage-collect sync groups that became empty.
    fn remove_from_sync_group(
        &self,
        user: &gpodder::User,
        device_ids: Vec<&str>,
    ) -> Result<(), gpodder::AuthErr> {
        (|| {
            let conn = &mut self.pool.get()?;

            diesel::update(
                devices::table.filter(
                    devices::user_id
                        .eq(user.id)
                        .and(devices::device_id.eq_any(device_ids)),
                ),
            )
            .set(devices::sync_group_id.eq(None::<i64>))
            .execute(conn)?;

            // This is in a different transaction on purpose, as the success of this removal shouldn't
            // fail the entire query
            SyncGroup::remove_unused(conn)?;

            Ok::<_, DbError>(())
        })()
        .map_err(AuthErr::from)
    }

    /// Bring every device in the sync group up to date by inserting, for
    /// each device, the non-deleted subscriptions present on other devices
    /// in the group but missing from its own list. Runs in one transaction.
    fn synchronize_sync_group(
        &self,
        group_id: i64,
        time_changed: DateTime<Utc>,
    ) -> Result<(), gpodder::AuthErr> {
        (|| {
            let time_changed = time_changed.timestamp();
            let conn = &mut self.pool.get()?;

            conn.transaction(|conn| {
                let device_ids: Vec<i64> = devices::table
                    .filter(devices::sync_group_id.eq(group_id))
                    .select(devices::id)
                    .get_results(conn)?;

                // For each device in the group, we get the list of subscriptions not yet in its own
                // non-deleted list, and add it to the database
                for device_id in device_ids.iter().copied() {
                    // Self-alias so the device's own subscriptions can be
                    // excluded from the group-wide URL set
                    let d1 = alias!(device_subscriptions as d1);

                    let own_subscriptions = d1
                        .filter(
                            d1.field(device_subscriptions::device_id)
                                .eq(device_id)
                                .and(d1.field(device_subscriptions::deleted).eq(false)),
                        )
                        .select(d1.field(device_subscriptions::podcast_url));

                    let urls_to_add = device_subscriptions::table
                        .select(device_subscriptions::podcast_url)
                        .filter(
                            device_subscriptions::device_id
                                .eq_any(device_ids.iter())
                                .and(device_subscriptions::deleted.eq(false))
                                .and(not(
                                    device_subscriptions::podcast_url.eq_any(own_subscriptions)
                                )),
                        )
                        .distinct()
                        .load_iter(conn)?
                        .collect::<Result<HashSet<String>, _>>()?;

                    super::subscription::insert_subscriptions_for_single_device(
                        conn,
                        device_id,
                        urls_to_add.iter(),
                        time_changed,
                    )?;
                }

                Ok::<_, DbError>(())
            })
        })()
        .map_err(AuthErr::from)
    }

    /// Partition a user's device IDs into (unsynchronized, groups of
    /// synchronized devices).
    ///
    /// Relies on rows being ordered by `sync_group_id` so each group forms
    /// a contiguous run; assumes NULL group IDs sort first (SQLite default
    /// ASC ordering) — TODO confirm.
    fn devices_by_sync_group(
        &self,
        user: &gpodder::User,
    ) -> Result<(Vec<String>, Vec<Vec<String>>), gpodder::AuthErr> {
        (|| {
            let mut not_synchronized = Vec::new();
            let mut synchronized = Vec::new();

            let conn = &mut self.pool.get()?;

            let mut devices = devices::table
                .select((devices::device_id, devices::sync_group_id))
                .filter(devices::user_id.eq(user.id))
                .order(devices::sync_group_id)
                .load_iter::<(String, Option<i64>), _>(conn)?;

            // Track the group the current run of rows belongs to; a change
            // in group ID starts a new output bucket
            let mut cur_group = &mut not_synchronized;
            let mut cur_group_id: Option<i64> = None;

            while let Some((device_id, group_id)) = devices.next().transpose()? {
                if group_id != cur_group_id {
                    if group_id.is_none() {
                        cur_group = &mut not_synchronized;
                    } else {
                        synchronized.push(Vec::new());

                        let index = synchronized.len() - 1;
                        cur_group = &mut synchronized[index];
                    }

                    cur_group_id = group_id;
                }

                cur_group.push(device_id);
            }

            Ok::<_, DbError>((not_synchronized, synchronized))
        })()
        .map_err(AuthErr::from)
    }
}

View file

@ -0,0 +1,176 @@
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use gpodder::AuthErr;
use super::SqliteRepository;
use crate::{
models::{
device::Device,
episode_action::{ActionType, EpisodeAction, NewEpisodeAction},
},
schema::*,
DbError,
};
impl From<gpodder::EpisodeAction> for NewEpisodeAction {
    /// Convert a domain episode action into an insertable row.
    ///
    /// `user_id` and `time_changed` are filled with placeholder zeros and
    /// `device_id` with `None`; the caller is expected to overwrite them
    /// before inserting (see `add_episode_actions`).
    fn from(value: gpodder::EpisodeAction) -> Self {
        // "play" is the only action variant carrying progress information
        let (action, started, position, total) = match value.action {
            gpodder::EpisodeActionType::New => (ActionType::New, None, None, None),
            gpodder::EpisodeActionType::Delete => (ActionType::Delete, None, None, None),
            gpodder::EpisodeActionType::Download => (ActionType::Download, None, None, None),
            gpodder::EpisodeActionType::Play {
                started,
                position,
                total,
            } => (ActionType::Play, started, Some(position), total),
        };

        NewEpisodeAction {
            user_id: 0,
            device_id: None,
            podcast_url: value.podcast,
            episode_url: value.episode,
            time_changed: 0,
            timestamp: value.timestamp.map(|t| t.timestamp()),
            action,
            started,
            position,
            total,
        }
    }
}
/// Convert a database row, paired with the optional external device ID it
/// was joined with, back into the domain episode action type.
fn to_gpodder_action(
    (device_id, db_action): (Option<String>, EpisodeAction),
) -> gpodder::EpisodeAction {
    let action = match db_action.action {
        ActionType::Play => gpodder::EpisodeActionType::Play {
            started: db_action.started,
            // SAFETY: the condition that this isn't null if the action type is "play" is
            // explicitely enforced by the database using a CHECK constraint.
            position: db_action.position.unwrap(),
            total: db_action.total,
        },
        ActionType::New => gpodder::EpisodeActionType::New,
        ActionType::Delete => gpodder::EpisodeActionType::Delete,
        ActionType::Download => gpodder::EpisodeActionType::Download,
    };

    gpodder::EpisodeAction {
        podcast: db_action.podcast_url,
        episode: db_action.episode_url,
        timestamp: db_action
            .timestamp
            // SAFETY the input to the from_timestamp function is always the result of a
            // previous timestamp() function call, which is guaranteed to be each other's
            // reverse
            .map(|ts| DateTime::from_timestamp(ts, 0).unwrap()),
        time_changed: DateTime::from_timestamp(db_action.time_changed, 0).unwrap(),
        device: device_id,
        action,
    }
}
impl gpodder::EpisodeActionRepository for SqliteRepository {
    /// Persist a batch of episode actions for `user`, stamping every row
    /// with `time_changed` and resolving each action's external device ID
    /// to the internal one. Runs in a single transaction.
    fn add_episode_actions(
        &self,
        user: &gpodder::User,
        actions: Vec<gpodder::EpisodeAction>,
        time_changed: DateTime<Utc>,
    ) -> Result<(), gpodder::AuthErr> {
        (|| {
            let time_changed = time_changed.timestamp();

            // TODO optimize this query
            // 1. The lookup for a device could be replaced with a subquery, although Diesel seems to
            //    have a problem using an Option<String> to match equality with a String
            // 2. Ideally the for loop would be replaced with a single query inserting multiple values,
            //    although each value would need its own subquery
            //
            // NOTE this function usually gets called from the same device, so optimizing the
            // amount of device lookups required would be useful.
            self.pool.get()?.transaction(|conn| {
                for action in actions {
                    let device_id = if let Some(device) = &action.device {
                        Some(Device::device_id_to_id(conn, user.id, device)?)
                    } else {
                        None
                    };

                    // The From impl leaves user_id/device_id/time_changed
                    // as placeholders; fill them in here
                    let mut new_action: NewEpisodeAction = action.into();
                    new_action.user_id = user.id;
                    new_action.device_id = device_id;
                    new_action.time_changed = time_changed;

                    diesel::insert_into(episode_actions::table)
                        .values(&new_action)
                        .execute(conn)?;
                }

                Ok::<_, DbError>(())
            })
        })()
        .map_err(AuthErr::from)
    }

    /// Return a user's episode actions changed at or after `since`,
    /// optionally filtered by podcast URL and/or device. With `aggregated`
    /// set, only the most recently changed action per episode URL is kept.
    fn episode_actions_for_user(
        &self,
        user: &gpodder::User,
        since: Option<DateTime<Utc>>,
        podcast: Option<String>,
        device: Option<String>,
        aggregated: bool,
    ) -> Result<Vec<gpodder::EpisodeAction>, gpodder::AuthErr> {
        (|| {
            // No lower bound means "everything since the epoch"
            let since = since.map(|ts| ts.timestamp()).unwrap_or(0);
            let conn = &mut self.pool.get()?;

            // Boxed query so the optional filters below can be added
            // conditionally
            let mut query = episode_actions::table
                .left_join(devices::table)
                .filter(
                    episode_actions::user_id
                        .eq(user.id)
                        .and(episode_actions::time_changed.ge(since)),
                )
                .select((devices::device_id.nullable(), EpisodeAction::as_select()))
                .into_boxed();

            if let Some(device_id) = device {
                query = query.filter(devices::device_id.eq(device_id));
            }

            if let Some(podcast_url) = podcast {
                query = query.filter(episode_actions::podcast_url.eq(podcast_url));
            }

            let db_actions: Vec<(Option<String>, EpisodeAction)> = if aggregated {
                // https://stackoverflow.com/a/7745635
                // For each episode URL, we want to return the row with the highest `time_changed`
                // value. We achieve this be left joining with self on the URL, as well as whether the
                // left row's time_changed value is less than the right one. Rows with the largest
                // time_changed value for a given URL will join with a NULL value (because of the left
                // join), so we filter those out to retrieve the correct rows.
                let a2 = diesel::alias!(episode_actions as a2);

                query
                    .left_join(
                        a2.on(episode_actions::episode_url
                            .eq(a2.field(episode_actions::episode_url))
                            .and(
                                episode_actions::time_changed
                                    .lt(a2.field(episode_actions::time_changed)),
                            )),
                    )
                    .filter(a2.field(episode_actions::episode_url).is_null())
                    .get_results(conn)?
            } else {
                query.get_results(conn)?
            };

            let actions = db_actions.into_iter().map(to_gpodder_action).collect();

            Ok::<_, DbError>(actions)
        })()
        .map_err(AuthErr::from)
    }
}

View file

@ -0,0 +1,27 @@
mod auth;
mod device;
mod episode_action;
mod subscription;
use std::path::Path;
use super::DbPool;
/// SQLite-backed implementation of the gpodder repository traits, wrapping
/// an r2d2 connection pool.
#[derive(Clone)]
pub struct SqliteRepository {
    // Shared connection pool; cloning the repository clones the pool handle
    pool: DbPool,
}

impl From<DbPool> for SqliteRepository {
    /// Wrap an already-initialized connection pool.
    fn from(value: DbPool) -> Self {
        Self { pool: value }
    }
}

impl SqliteRepository {
    /// Open the SQLite database at `path`, running any pending migrations,
    /// and wrap it in a repository.
    pub fn from_path(path: impl AsRef<Path>) -> Result<Self, gpodder::AuthErr> {
        let pool = super::initialize_db(path, true)?;

        Ok(Self { pool })
    }
}

View file

@ -0,0 +1,374 @@
use std::collections::HashSet;
use chrono::DateTime;
use diesel::prelude::*;
use gpodder::AuthErr;
use super::SqliteRepository;
use crate::{
models::device_subscription::{DeviceSubscription, NewDeviceSubscription},
schema::*,
DbError,
};
/// Replace the full subscription list of a single device with `urls`,
/// soft-deleting URLs that are no longer present, un-deleting re-added
/// ones, and inserting new ones. All touched rows get `time_changed`
/// (unix seconds) as their change timestamp.
fn set_subscriptions_for_single_device(
    conn: &mut SqliteConnection,
    device_id: i64,
    urls: &HashSet<String>,
    time_changed: i64,
) -> QueryResult<()> {
    // https://github.com/diesel-rs/diesel/discussions/2826
    // SQLite doesn't support default on conflict set values, so we can't handle this using
    // on conflict. Therefore, we instead calculate which URLs should be inserted and which
    // updated, so we avoid conflicts.
    let urls_in_db: HashSet<String> = device_subscriptions::table
        .select(device_subscriptions::podcast_url)
        .filter(device_subscriptions::device_id.eq(device_id))
        .get_results(conn)?
        .into_iter()
        .collect();

    // URLs originally in the database that are no longer in the list
    let urls_to_delete = urls_in_db.difference(&urls);

    // URLs not in the database that are in the new list
    let urls_to_insert = urls.difference(&urls_in_db);

    // URLs that are in both the database and the new list. For these, those marked as
    // "deleted" in the database are updated so they're no longer deleted, with their
    // timestamp updated.
    let urls_to_update = urls.intersection(&urls_in_db);

    // Mark the URLs to delete as properly deleted
    diesel::update(
        device_subscriptions::table.filter(
            device_subscriptions::device_id
                .eq(device_id)
                .and(device_subscriptions::podcast_url.eq_any(urls_to_delete)),
        ),
    )
    .set((
        device_subscriptions::deleted.eq(true),
        device_subscriptions::time_changed.eq(time_changed),
    ))
    .execute(conn)?;

    // Update the existing deleted URLs that are reinserted as no longer deleted
    diesel::update(
        device_subscriptions::table.filter(
            device_subscriptions::device_id
                .eq(device_id)
                .and(device_subscriptions::podcast_url.eq_any(urls_to_update))
                .and(device_subscriptions::deleted.eq(true)),
        ),
    )
    .set((
        device_subscriptions::deleted.eq(false),
        device_subscriptions::time_changed.eq(time_changed),
    ))
    .execute(conn)?;

    // Insert the new values into the database
    diesel::insert_into(device_subscriptions::table)
        .values(
            urls_to_insert
                .into_iter()
                .map(|url| NewDeviceSubscription {
                    device_id,
                    podcast_url: url.to_string(),
                    deleted: false,
                    time_changed,
                })
                .collect::<Vec<_>>(),
        )
        .execute(conn)?;

    Ok(())
}
/// Add the given URLs to the device's list of subscriptions, meaning the URLs are truly inserted
/// into the database. This function assumes the list of URLs is already free of URLs that already
/// have a corresponding row in the database, so no conflict checks are performed.
///
/// All inserted rows are stamped with `time_changed` (unix seconds) and
/// start out not deleted.
pub fn insert_subscriptions_for_single_device<'a>(
    conn: &mut SqliteConnection,
    device_id: i64,
    urls: impl Iterator<Item = &'a String>,
    time_changed: i64,
) -> QueryResult<()> {
    // `urls` already is an iterator, so the previous redundant
    // `.into_iter()` call was dropped
    let new_rows = urls
        .map(|url| NewDeviceSubscription {
            device_id,
            podcast_url: url.to_string(),
            deleted: false,
            time_changed,
        })
        .collect::<Vec<_>>();

    diesel::insert_into(device_subscriptions::table)
        .values(new_rows)
        .execute(conn)?;

    Ok(())
}
/// Apply an incremental subscription change to a single device: URLs in
/// `remove` are soft-deleted, URLs in `add` are inserted or un-deleted.
/// All touched rows get `time_changed` (unix seconds) as their change
/// timestamp.
pub fn update_subscriptions_for_single_device(
    conn: &mut SqliteConnection,
    device_id: i64,
    add: &HashSet<String>,
    remove: &HashSet<String>,
    time_changed: i64,
) -> QueryResult<()> {
    let urls_in_db: HashSet<String> = device_subscriptions::table
        .select(device_subscriptions::podcast_url)
        .filter(device_subscriptions::device_id.eq(device_id))
        .get_results(conn)?
        .into_iter()
        .collect();

    // Subscriptions to remove are those that were already in the database and are now part
    // of the removed list. Subscriptions that were never added in the first place don't
    // need to be marked as deleted. We also only update those that aren't already marked
    // as deleted.
    let urls_to_delete = remove.intersection(&urls_in_db);

    diesel::update(
        device_subscriptions::table.filter(
            device_subscriptions::device_id
                .eq(device_id)
                .and(device_subscriptions::podcast_url.eq_any(urls_to_delete))
                .and(device_subscriptions::deleted.eq(false)),
        ),
    )
    .set((
        device_subscriptions::deleted.eq(true),
        device_subscriptions::time_changed.eq(time_changed),
    ))
    .execute(conn)?;

    // Subscriptions to update are those that are already in the database, but are also in
    // the added list. Only those who were originally marked as deleted get updated.
    let urls_to_update = add.intersection(&urls_in_db);

    diesel::update(
        device_subscriptions::table.filter(
            device_subscriptions::device_id
                .eq(device_id)
                .and(device_subscriptions::podcast_url.eq_any(urls_to_update))
                .and(device_subscriptions::deleted.eq(true)),
        ),
    )
    .set((
        device_subscriptions::deleted.eq(false),
        device_subscriptions::time_changed.eq(time_changed),
    ))
    .execute(conn)?;

    // Subscriptions to insert are those that aren't in the database and are part of the
    // added list
    let urls_to_insert = add.difference(&urls_in_db);

    insert_subscriptions_for_single_device(conn, device_id, urls_to_insert, time_changed)?;

    Ok(())
}
impl gpodder::SubscriptionRepository for SqliteRepository {
    /// List the podcasts the user is currently subscribed to on at least one
    /// of their devices.
    fn subscriptions_for_user(
        &self,
        user: &gpodder::User,
    ) -> Result<Vec<gpodder::Subscription>, gpodder::AuthErr> {
        (|| {
            Ok::<_, DbError>(
                device_subscriptions::table
                    .inner_join(devices::table)
                    // Soft-deleted rows are tombstones for past unsubscribes;
                    // they are only relevant for the delta endpoint, so they
                    // must not show up in the current subscription list.
                    .filter(
                        devices::user_id
                            .eq(user.id)
                            .and(device_subscriptions::deleted.eq(false)),
                    )
                    .select((
                        device_subscriptions::podcast_url,
                        device_subscriptions::time_changed,
                    ))
                    // NOTE(review): distinct is over (url, time_changed), so a
                    // URL subscribed on several devices at different times can
                    // still appear more than once — confirm whether callers
                    // expect deduplication by URL alone.
                    .distinct()
                    .get_results::<(String, i64)>(&mut self.pool.get()?)?
                    .into_iter()
                    .map(|(url, ts)| gpodder::Subscription {
                        url,
                        // time_changed is stored as unix seconds
                        time_changed: DateTime::from_timestamp(ts, 0).unwrap(),
                    })
                    .collect(),
            )
        })()
        .map_err(AuthErr::from)
    }

    /// List the podcasts a specific device of the user is currently
    /// subscribed to.
    fn subscriptions_for_device(
        &self,
        user: &gpodder::User,
        device_id: &str,
    ) -> Result<Vec<gpodder::Subscription>, gpodder::AuthErr> {
        (|| {
            Ok::<_, DbError>(
                device_subscriptions::table
                    .inner_join(devices::table)
                    .filter(
                        devices::user_id
                            .eq(user.id)
                            .and(devices::device_id.eq(device_id))
                            // A previously removed (soft-deleted) subscription
                            // is not a current subscription.
                            .and(device_subscriptions::deleted.eq(false)),
                    )
                    .select((
                        device_subscriptions::podcast_url,
                        device_subscriptions::time_changed,
                    ))
                    .get_results::<(String, i64)>(&mut self.pool.get()?)?
                    .into_iter()
                    .map(|(url, ts)| gpodder::Subscription {
                        url,
                        time_changed: DateTime::from_timestamp(ts, 0).unwrap(),
                    })
                    .collect(),
            )
        })()
        .map_err(AuthErr::from)
    }

    /// Replace the full subscription list of a device with `urls`.
    ///
    /// If the device is part of a sync group, the new list is applied to
    /// every device in that group. The whole operation runs in a single
    /// transaction so a partial update can never be observed.
    fn set_subscriptions_for_device(
        &self,
        user: &gpodder::User,
        device_id: &str,
        urls: Vec<String>,
        time_changed: chrono::DateTime<chrono::Utc>,
    ) -> Result<(), gpodder::AuthErr> {
        (|| {
            let time_changed = time_changed.timestamp();
            let urls: HashSet<String> = urls.into_iter().collect();

            self.pool.get()?.transaction(|conn| {
                // Resolve the user-facing device ID to the internal key and
                // the device's optional sync group.
                let (device_id, group_id) = devices::table
                    .select((devices::id, devices::sync_group_id))
                    .filter(
                        devices::user_id
                            .eq(user.id)
                            .and(devices::device_id.eq(device_id)),
                    )
                    .get_result::<(i64, Option<i64>)>(conn)?;

                // If the device is part of a sync group, we need to perform
                // the update on every device in the group
                if let Some(group_id) = group_id {
                    let device_ids: Vec<i64> = devices::table
                        .filter(devices::sync_group_id.eq(group_id))
                        .select(devices::id)
                        .get_results(conn)?;

                    for device_id in device_ids {
                        set_subscriptions_for_single_device(conn, device_id, &urls, time_changed)?;
                    }
                } else {
                    set_subscriptions_for_single_device(conn, device_id, &urls, time_changed)?;
                }

                Ok::<_, DbError>(())
            })
        })()
        .map_err(AuthErr::from)
    }

    /// Apply a subscription delta (`add` / `remove` URL lists) to a device,
    /// or to every device in its sync group if it belongs to one. Runs in a
    /// single transaction.
    fn update_subscriptions_for_device(
        &self,
        user: &gpodder::User,
        device_id: &str,
        add: Vec<String>,
        remove: Vec<String>,
        time_changed: chrono::DateTime<chrono::Utc>,
    ) -> Result<(), gpodder::AuthErr> {
        (|| {
            let time_changed = time_changed.timestamp();

            // TODO URLs that are in both the added and removed lists will currently get "re-added",
            // meaning their change timestamp will be updated even though they haven't really changed.
            let add: HashSet<_> = add.into_iter().collect();
            let remove: HashSet<_> = remove.into_iter().collect();

            self.pool.get()?.transaction(|conn| {
                // Resolve the user-facing device ID to the internal key and
                // the device's optional sync group.
                let (device_id, group_id) = devices::table
                    .select((devices::id, devices::sync_group_id))
                    .filter(
                        devices::user_id
                            .eq(user.id)
                            .and(devices::device_id.eq(device_id)),
                    )
                    .get_result::<(i64, Option<i64>)>(conn)?;

                // If the device is part of a sync group, we need to perform
                // the update on every device in the group
                if let Some(group_id) = group_id {
                    let device_ids: Vec<i64> = devices::table
                        .filter(devices::sync_group_id.eq(group_id))
                        .select(devices::id)
                        .get_results(conn)?;

                    for device_id in device_ids {
                        update_subscriptions_for_single_device(
                            conn,
                            device_id,
                            &add,
                            &remove,
                            time_changed,
                        )?;
                    }
                } else {
                    update_subscriptions_for_single_device(
                        conn,
                        device_id,
                        &add,
                        &remove,
                        time_changed,
                    )?;
                }

                Ok::<_, DbError>(())
            })
        })()
        .map_err(AuthErr::from)
    }

    /// Return the `(added, removed)` subscription changes for a device since
    /// the given timestamp. Soft-deleted rows are reported as removals, live
    /// rows as additions.
    fn subscription_updates_for_device(
        &self,
        user: &gpodder::User,
        device_id: &str,
        since: chrono::DateTime<chrono::Utc>,
    ) -> Result<(Vec<gpodder::Subscription>, Vec<gpodder::Subscription>), gpodder::AuthErr> {
        (|| {
            let since = since.timestamp();
            let (mut added, mut removed) = (Vec::new(), Vec::new());

            // NOTE(review): `ge` also returns changes that happened exactly
            // at `since`, so a client replaying its last timestamp may see
            // those changes again — confirm this matches the intended
            // semantics of the delta endpoint.
            let query = device_subscriptions::table
                .inner_join(devices::table)
                .filter(
                    devices::user_id
                        .eq(user.id)
                        .and(devices::device_id.eq(device_id))
                        .and(device_subscriptions::time_changed.ge(since)),
                )
                .select(DeviceSubscription::as_select());

            // Stream the rows instead of collecting them all up front.
            for sub in query.load_iter(&mut self.pool.get()?)? {
                let sub = sub?;
                if sub.deleted {
                    removed.push(gpodder::Subscription {
                        url: sub.podcast_url,
                        time_changed: DateTime::from_timestamp(sub.time_changed, 0).unwrap(),
                    });
                } else {
                    added.push(gpodder::Subscription {
                        url: sub.podcast_url,
                        time_changed: DateTime::from_timestamp(sub.time_changed, 0).unwrap(),
                    });
                }
            }

            Ok::<_, DbError>((added, removed))
        })()
        .map_err(AuthErr::from)
    }
}

View file

@ -0,0 +1,77 @@
// @generated automatically by Diesel CLI.
// A single podcast subscription of a single device. Rows are soft-deleted
// (`deleted` set to true) instead of removed so subscription deltas can be
// reported later; `time_changed` is a unix timestamp in seconds.
diesel::table! {
    device_subscriptions (id) {
        id -> BigInt,
        device_id -> BigInt,
        podcast_url -> Text,
        time_changed -> BigInt,
        deleted -> Bool,
    }
}
// A client device registered by a user. `device_id` is the user-chosen
// gpodder device identifier, `id` the internal key; a device may optionally
// belong to a sync group.
diesel::table! {
    devices (id) {
        id -> BigInt,
        device_id -> Text,
        user_id -> BigInt,
        sync_group_id -> Nullable<BigInt>,
        caption -> Text,
        // The SQL column is named "type", which is a reserved keyword in Rust
        #[sql_name = "type"]
        type_ -> Text,
    }
}
// An episode action reported by a client, optionally tied to a device.
// NOTE(review): `started`, `position` and `total` look like playback
// positions (for "play" actions) — confirm against the gpodder episode
// action specification.
diesel::table! {
    episode_actions (id) {
        id -> BigInt,
        user_id -> BigInt,
        device_id -> Nullable<BigInt>,
        podcast_url -> Text,
        episode_url -> Text,
        time_changed -> BigInt,
        timestamp -> Nullable<BigInt>,
        action -> Text,
        started -> Nullable<Integer>,
        position -> Nullable<Integer>,
        total -> Nullable<Integer>,
    }
}
// An authenticated session for a user.
// NOTE(review): `last_seen` is presumably a unix timestamp — verify.
diesel::table! {
    sessions (id) {
        id -> BigInt,
        user_id -> BigInt,
        last_seen -> BigInt,
    }
}
// A group of devices whose subscription lists are kept in sync.
diesel::table! {
    sync_groups (id) {
        id -> BigInt,
    }
}
// A registered user with a hashed password.
diesel::table! {
    users (id) {
        id -> BigInt,
        username -> Text,
        password_hash -> Text,
    }
}
// Foreign-key relationships; these enable `.inner_join()` between the tables.
diesel::joinable!(device_subscriptions -> devices (device_id));
diesel::joinable!(devices -> sync_groups (sync_group_id));
diesel::joinable!(devices -> users (user_id));
diesel::joinable!(episode_actions -> devices (device_id));
diesel::joinable!(episode_actions -> users (user_id));
diesel::joinable!(sessions -> users (user_id));
diesel::allow_tables_to_appear_in_same_query!(
    device_subscriptions,
    devices,
    episode_actions,
    sessions,
    sync_groups,
    users,
);