This refactor allows new implementations of the store contract to reuse the same tests, ensuring all implementations support the same behavior.
310 lines
10 KiB
Rust
310 lines
10 KiB
Rust
use std::collections::HashSet;
|
|
|
|
use chrono::{DateTime, Utc};
|
|
use diesel::{alias, dsl::not, prelude::*};
|
|
use gpodder::AuthErr;
|
|
|
|
use super::SqliteRepository;
|
|
use crate::{
|
|
DbError,
|
|
models::{
|
|
device::{Device, DeviceType, NewDevice},
|
|
sync_group::SyncGroup,
|
|
},
|
|
schema::*,
|
|
};
|
|
|
|
impl From<DeviceType> for gpodder::DeviceType {
|
|
fn from(value: DeviceType) -> Self {
|
|
match value {
|
|
DeviceType::Desktop => Self::Desktop,
|
|
DeviceType::Laptop => Self::Laptop,
|
|
DeviceType::Mobile => Self::Mobile,
|
|
DeviceType::Server => Self::Server,
|
|
DeviceType::Other => Self::Other,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<gpodder::DeviceType> for DeviceType {
|
|
fn from(value: gpodder::DeviceType) -> Self {
|
|
match value {
|
|
gpodder::DeviceType::Desktop => Self::Desktop,
|
|
gpodder::DeviceType::Laptop => Self::Laptop,
|
|
gpodder::DeviceType::Mobile => Self::Mobile,
|
|
gpodder::DeviceType::Server => Self::Server,
|
|
gpodder::DeviceType::Other => Self::Other,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl gpodder::GpodderDeviceStore for SqliteRepository {
|
|
fn devices_for_user(
|
|
&self,
|
|
user: &gpodder::User,
|
|
) -> Result<Vec<gpodder::Device>, gpodder::AuthErr> {
|
|
(|| {
|
|
Ok::<_, DbError>(
|
|
devices::table
|
|
.select(Device::as_select())
|
|
.filter(devices::user_id.eq(user.id))
|
|
.get_results(&mut self.pool.get()?)?
|
|
.into_iter()
|
|
.map(|d| gpodder::Device {
|
|
id: d.device_id,
|
|
caption: d.caption,
|
|
r#type: d.type_.into(),
|
|
// TODO implement subscription count
|
|
subscriptions: 0,
|
|
})
|
|
.collect(),
|
|
)
|
|
})()
|
|
.map_err(AuthErr::from)
|
|
}
|
|
|
|
fn update_device_info(
|
|
&self,
|
|
user: &gpodder::User,
|
|
device_id: &str,
|
|
patch: gpodder::DevicePatch,
|
|
) -> Result<(), gpodder::AuthErr> {
|
|
(|| {
|
|
let conn = &mut self.pool.get()?;
|
|
|
|
if let Some(mut device) = devices::table
|
|
.select(Device::as_select())
|
|
.filter(
|
|
devices::user_id
|
|
.eq(user.id)
|
|
.and(devices::device_id.eq(device_id)),
|
|
)
|
|
.get_result(conn)
|
|
.optional()?
|
|
{
|
|
if let Some(caption) = patch.caption {
|
|
device.caption = caption;
|
|
}
|
|
|
|
if let Some(type_) = patch.r#type {
|
|
device.type_ = type_.into();
|
|
}
|
|
|
|
diesel::update(devices::table.filter(devices::id.eq(device.id)))
|
|
.set((
|
|
devices::caption.eq(&device.caption),
|
|
devices::type_.eq(&device.type_),
|
|
))
|
|
.execute(conn)?;
|
|
} else {
|
|
let device = NewDevice {
|
|
device_id: device_id.to_string(),
|
|
user_id: user.id,
|
|
caption: patch.caption.unwrap_or_default(),
|
|
type_: patch.r#type.unwrap_or(gpodder::DeviceType::Other).into(),
|
|
};
|
|
|
|
diesel::insert_into(devices::table)
|
|
.values(device)
|
|
.execute(conn)?;
|
|
}
|
|
|
|
Ok::<_, DbError>(())
|
|
})()
|
|
.map_err(AuthErr::from)
|
|
}
|
|
|
|
fn merge_sync_groups(
|
|
&self,
|
|
user: &gpodder::User,
|
|
device_ids: Vec<&str>,
|
|
) -> Result<i64, gpodder::AuthErr> {
|
|
(|| {
|
|
let conn = &mut self.pool.get()?;
|
|
|
|
conn.transaction(|conn| {
|
|
let devices: Vec<(i64, Option<i64>)> = devices::table
|
|
.select((devices::id, devices::sync_group_id))
|
|
.filter(
|
|
devices::user_id
|
|
.eq(user.id)
|
|
.and(devices::device_id.eq_any(device_ids)),
|
|
)
|
|
.get_results(conn)?;
|
|
|
|
let mut sync_group_ids: Vec<i64> = devices
|
|
.iter()
|
|
.filter_map(|(_, group_id)| *group_id)
|
|
.collect();
|
|
|
|
// Remove any duplicates, giving us each sync group ID once
|
|
sync_group_ids.sort();
|
|
sync_group_ids.dedup();
|
|
|
|
// If any of the devices are already in a sync group, we reuse the first one we find.
|
|
// Otherwise, we generate a new one.
|
|
let sync_group_id = if let Some(id) = sync_group_ids.pop() {
|
|
id
|
|
} else {
|
|
SyncGroup::new(conn)?.id
|
|
};
|
|
|
|
// Move all devices in the other sync groups into the new sync group
|
|
diesel::update(
|
|
devices::table.filter(devices::sync_group_id.eq_any(sync_group_ids.iter())),
|
|
)
|
|
.set(devices::sync_group_id.eq(sync_group_id))
|
|
.execute(conn)?;
|
|
|
|
// Add the non-synchronized devices into the new sync group
|
|
let unsynced_device_ids = devices
|
|
.iter()
|
|
.filter_map(|(id, group_id)| if group_id.is_none() { Some(id) } else { None });
|
|
|
|
diesel::update(devices::table.filter(devices::id.eq_any(unsynced_device_ids)))
|
|
.set(devices::sync_group_id.eq(sync_group_id))
|
|
.execute(conn)?;
|
|
|
|
// Remove the other now unused sync groups
|
|
diesel::delete(sync_groups::table.filter(sync_groups::id.eq_any(sync_group_ids)))
|
|
.execute(conn)?;
|
|
|
|
Ok::<_, DbError>(sync_group_id)
|
|
})
|
|
})()
|
|
.map_err(AuthErr::from)
|
|
}
|
|
|
|
fn remove_from_sync_group(
|
|
&self,
|
|
user: &gpodder::User,
|
|
device_ids: Vec<&str>,
|
|
) -> Result<(), gpodder::AuthErr> {
|
|
(|| {
|
|
let conn = &mut self.pool.get()?;
|
|
|
|
diesel::update(
|
|
devices::table.filter(
|
|
devices::user_id
|
|
.eq(user.id)
|
|
.and(devices::device_id.eq_any(device_ids)),
|
|
),
|
|
)
|
|
.set(devices::sync_group_id.eq(None::<i64>))
|
|
.execute(conn)?;
|
|
|
|
// This is in a different transaction on purpose, as the success of this removal shouldn't
|
|
// fail the entire query
|
|
SyncGroup::remove_unused(conn)?;
|
|
|
|
Ok::<_, DbError>(())
|
|
})()
|
|
.map_err(AuthErr::from)
|
|
}
|
|
|
|
fn synchronize_sync_group(
|
|
&self,
|
|
group_id: i64,
|
|
time_changed: DateTime<Utc>,
|
|
) -> Result<(), gpodder::AuthErr> {
|
|
(|| {
|
|
let time_changed = time_changed.timestamp();
|
|
let conn = &mut self.pool.get()?;
|
|
|
|
conn.transaction(|conn| {
|
|
let device_ids: Vec<i64> = devices::table
|
|
.filter(devices::sync_group_id.eq(group_id))
|
|
.select(devices::id)
|
|
.get_results(conn)?;
|
|
|
|
// For each device in the group, we get the list of subscriptions not yet in its own
|
|
// non-deleted list, and add it to the database
|
|
for device_id in device_ids.iter().copied() {
|
|
let d1 = alias!(device_subscriptions as d1);
|
|
|
|
let own_subscriptions = d1
|
|
.filter(
|
|
d1.field(device_subscriptions::device_id)
|
|
.eq(device_id)
|
|
.and(d1.field(device_subscriptions::deleted).eq(false)),
|
|
)
|
|
.select(d1.field(device_subscriptions::podcast_url));
|
|
|
|
let urls_to_add = device_subscriptions::table
|
|
.select(device_subscriptions::podcast_url)
|
|
.filter(
|
|
device_subscriptions::device_id
|
|
.eq_any(device_ids.iter())
|
|
.and(device_subscriptions::deleted.eq(false))
|
|
.and(not(
|
|
device_subscriptions::podcast_url.eq_any(own_subscriptions)
|
|
)),
|
|
)
|
|
.distinct()
|
|
.load_iter(conn)?
|
|
.collect::<Result<HashSet<String>, _>>()?;
|
|
|
|
super::subscription::insert_subscriptions_for_single_device(
|
|
conn,
|
|
device_id,
|
|
urls_to_add.iter(),
|
|
time_changed,
|
|
)?;
|
|
}
|
|
|
|
Ok::<_, DbError>(())
|
|
})
|
|
})()
|
|
.map_err(AuthErr::from)
|
|
}
|
|
|
|
fn devices_by_sync_group(
|
|
&self,
|
|
user: &gpodder::User,
|
|
) -> Result<(Vec<String>, Vec<Vec<String>>), gpodder::AuthErr> {
|
|
(|| {
|
|
let mut not_synchronized = Vec::new();
|
|
let mut synchronized = Vec::new();
|
|
|
|
let conn = &mut self.pool.get()?;
|
|
let mut devices = devices::table
|
|
.select((devices::device_id, devices::sync_group_id))
|
|
.filter(devices::user_id.eq(user.id))
|
|
.order(devices::sync_group_id)
|
|
.load_iter::<(String, Option<i64>), _>(conn)?;
|
|
|
|
let mut cur_group = &mut not_synchronized;
|
|
let mut cur_group_id: Option<i64> = None;
|
|
|
|
while let Some((device_id, group_id)) = devices.next().transpose()? {
|
|
if group_id != cur_group_id {
|
|
if group_id.is_none() {
|
|
cur_group = &mut not_synchronized;
|
|
} else {
|
|
synchronized.push(Vec::new());
|
|
let index = synchronized.len() - 1;
|
|
cur_group = &mut synchronized[index];
|
|
}
|
|
|
|
cur_group_id = group_id;
|
|
}
|
|
|
|
cur_group.push(device_id);
|
|
}
|
|
|
|
Ok::<_, DbError>((not_synchronized, synchronized))
|
|
})()
|
|
.map_err(AuthErr::from)
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use crate::SqliteRepository;

    /// Runs the shared `gpodder_test` device insertion test against an in-memory repository.
    #[test]
    fn test_insert_devices() {
        gpodder_test::device::test_insert_devices(SqliteRepository::in_memory().unwrap());
    }
}
|