300 lines
10 KiB
Rust
300 lines
10 KiB
Rust
use std::collections::HashSet;
|
|
|
|
use chrono::{DateTime, Utc};
|
|
use diesel::{alias, dsl::not, prelude::*};
|
|
use gpodder::AuthErr;
|
|
|
|
use super::SqliteRepository;
|
|
use crate::{
|
|
DbError,
|
|
models::{
|
|
device::{Device, DeviceType, NewDevice},
|
|
sync_group::SyncGroup,
|
|
},
|
|
schema::*,
|
|
};
|
|
|
|
impl From<DeviceType> for gpodder::DeviceType {
|
|
fn from(value: DeviceType) -> Self {
|
|
match value {
|
|
DeviceType::Desktop => Self::Desktop,
|
|
DeviceType::Laptop => Self::Laptop,
|
|
DeviceType::Mobile => Self::Mobile,
|
|
DeviceType::Server => Self::Server,
|
|
DeviceType::Other => Self::Other,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<gpodder::DeviceType> for DeviceType {
|
|
fn from(value: gpodder::DeviceType) -> Self {
|
|
match value {
|
|
gpodder::DeviceType::Desktop => Self::Desktop,
|
|
gpodder::DeviceType::Laptop => Self::Laptop,
|
|
gpodder::DeviceType::Mobile => Self::Mobile,
|
|
gpodder::DeviceType::Server => Self::Server,
|
|
gpodder::DeviceType::Other => Self::Other,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl gpodder::GpodderDeviceStore for SqliteRepository {
|
|
fn devices_for_user(
|
|
&self,
|
|
user: &gpodder::User,
|
|
) -> Result<Vec<gpodder::Device>, gpodder::AuthErr> {
|
|
(|| {
|
|
Ok::<_, DbError>(
|
|
devices::table
|
|
.select(Device::as_select())
|
|
.filter(devices::user_id.eq(user.id))
|
|
.get_results(&mut self.pool.get()?)?
|
|
.into_iter()
|
|
.map(|d| gpodder::Device {
|
|
id: d.device_id,
|
|
caption: d.caption,
|
|
r#type: d.type_.into(),
|
|
// TODO implement subscription count
|
|
subscriptions: 0,
|
|
})
|
|
.collect(),
|
|
)
|
|
})()
|
|
.map_err(AuthErr::from)
|
|
}
|
|
|
|
fn update_device_info(
|
|
&self,
|
|
user: &gpodder::User,
|
|
device_id: &str,
|
|
patch: gpodder::DevicePatch,
|
|
) -> Result<(), gpodder::AuthErr> {
|
|
(|| {
|
|
let conn = &mut self.pool.get()?;
|
|
|
|
if let Some(mut device) = devices::table
|
|
.select(Device::as_select())
|
|
.filter(
|
|
devices::user_id
|
|
.eq(user.id)
|
|
.and(devices::device_id.eq(device_id)),
|
|
)
|
|
.get_result(conn)
|
|
.optional()?
|
|
{
|
|
if let Some(caption) = patch.caption {
|
|
device.caption = caption;
|
|
}
|
|
|
|
if let Some(type_) = patch.r#type {
|
|
device.type_ = type_.into();
|
|
}
|
|
|
|
diesel::update(devices::table.filter(devices::id.eq(device.id)))
|
|
.set((
|
|
devices::caption.eq(&device.caption),
|
|
devices::type_.eq(&device.type_),
|
|
))
|
|
.execute(conn)?;
|
|
} else {
|
|
let device = NewDevice {
|
|
device_id: device_id.to_string(),
|
|
user_id: user.id,
|
|
caption: patch.caption.unwrap_or_default(),
|
|
type_: patch.r#type.unwrap_or(gpodder::DeviceType::Other).into(),
|
|
};
|
|
|
|
diesel::insert_into(devices::table)
|
|
.values(device)
|
|
.execute(conn)?;
|
|
}
|
|
|
|
Ok::<_, DbError>(())
|
|
})()
|
|
.map_err(AuthErr::from)
|
|
}
|
|
|
|
fn merge_sync_groups(
|
|
&self,
|
|
user: &gpodder::User,
|
|
device_ids: Vec<&str>,
|
|
) -> Result<i64, gpodder::AuthErr> {
|
|
(|| {
|
|
let conn = &mut self.pool.get()?;
|
|
|
|
conn.transaction(|conn| {
|
|
let devices: Vec<(i64, Option<i64>)> = devices::table
|
|
.select((devices::id, devices::sync_group_id))
|
|
.filter(
|
|
devices::user_id
|
|
.eq(user.id)
|
|
.and(devices::device_id.eq_any(device_ids)),
|
|
)
|
|
.get_results(conn)?;
|
|
|
|
let mut sync_group_ids: Vec<i64> = devices
|
|
.iter()
|
|
.filter_map(|(_, group_id)| *group_id)
|
|
.collect();
|
|
|
|
// Remove any duplicates, giving us each sync group ID once
|
|
sync_group_ids.sort();
|
|
sync_group_ids.dedup();
|
|
|
|
// If any of the devices are already in a sync group, we reuse the first one we find.
|
|
// Otherwise, we generate a new one.
|
|
let sync_group_id = if let Some(id) = sync_group_ids.pop() {
|
|
id
|
|
} else {
|
|
SyncGroup::new(conn)?.id
|
|
};
|
|
|
|
// Move all devices in the other sync groups into the new sync group
|
|
diesel::update(
|
|
devices::table.filter(devices::sync_group_id.eq_any(sync_group_ids.iter())),
|
|
)
|
|
.set(devices::sync_group_id.eq(sync_group_id))
|
|
.execute(conn)?;
|
|
|
|
// Add the non-synchronized devices into the new sync group
|
|
let unsynced_device_ids = devices
|
|
.iter()
|
|
.filter_map(|(id, group_id)| if group_id.is_none() { Some(id) } else { None });
|
|
|
|
diesel::update(devices::table.filter(devices::id.eq_any(unsynced_device_ids)))
|
|
.set(devices::sync_group_id.eq(sync_group_id))
|
|
.execute(conn)?;
|
|
|
|
// Remove the other now unused sync groups
|
|
diesel::delete(sync_groups::table.filter(sync_groups::id.eq_any(sync_group_ids)))
|
|
.execute(conn)?;
|
|
|
|
Ok::<_, DbError>(sync_group_id)
|
|
})
|
|
})()
|
|
.map_err(AuthErr::from)
|
|
}
|
|
|
|
fn remove_from_sync_group(
|
|
&self,
|
|
user: &gpodder::User,
|
|
device_ids: Vec<&str>,
|
|
) -> Result<(), gpodder::AuthErr> {
|
|
(|| {
|
|
let conn = &mut self.pool.get()?;
|
|
|
|
diesel::update(
|
|
devices::table.filter(
|
|
devices::user_id
|
|
.eq(user.id)
|
|
.and(devices::device_id.eq_any(device_ids)),
|
|
),
|
|
)
|
|
.set(devices::sync_group_id.eq(None::<i64>))
|
|
.execute(conn)?;
|
|
|
|
// This is in a different transaction on purpose, as the success of this removal shouldn't
|
|
// fail the entire query
|
|
SyncGroup::remove_unused(conn)?;
|
|
|
|
Ok::<_, DbError>(())
|
|
})()
|
|
.map_err(AuthErr::from)
|
|
}
|
|
|
|
fn synchronize_sync_group(
|
|
&self,
|
|
group_id: i64,
|
|
time_changed: DateTime<Utc>,
|
|
) -> Result<(), gpodder::AuthErr> {
|
|
(|| {
|
|
let time_changed = time_changed.timestamp();
|
|
let conn = &mut self.pool.get()?;
|
|
|
|
conn.transaction(|conn| {
|
|
let device_ids: Vec<i64> = devices::table
|
|
.filter(devices::sync_group_id.eq(group_id))
|
|
.select(devices::id)
|
|
.get_results(conn)?;
|
|
|
|
// For each device in the group, we get the list of subscriptions not yet in its own
|
|
// non-deleted list, and add it to the database
|
|
for device_id in device_ids.iter().copied() {
|
|
let d1 = alias!(device_subscriptions as d1);
|
|
|
|
let own_subscriptions = d1
|
|
.filter(
|
|
d1.field(device_subscriptions::device_id)
|
|
.eq(device_id)
|
|
.and(d1.field(device_subscriptions::deleted).eq(false)),
|
|
)
|
|
.select(d1.field(device_subscriptions::podcast_url));
|
|
|
|
let urls_to_add = device_subscriptions::table
|
|
.select(device_subscriptions::podcast_url)
|
|
.filter(
|
|
device_subscriptions::device_id
|
|
.eq_any(device_ids.iter())
|
|
.and(device_subscriptions::deleted.eq(false))
|
|
.and(not(
|
|
device_subscriptions::podcast_url.eq_any(own_subscriptions)
|
|
)),
|
|
)
|
|
.distinct()
|
|
.load_iter(conn)?
|
|
.collect::<Result<HashSet<String>, _>>()?;
|
|
|
|
super::subscription::insert_subscriptions_for_single_device(
|
|
conn,
|
|
device_id,
|
|
urls_to_add.iter(),
|
|
time_changed,
|
|
)?;
|
|
}
|
|
|
|
Ok::<_, DbError>(())
|
|
})
|
|
})()
|
|
.map_err(AuthErr::from)
|
|
}
|
|
|
|
fn devices_by_sync_group(
|
|
&self,
|
|
user: &gpodder::User,
|
|
) -> Result<(Vec<String>, Vec<Vec<String>>), gpodder::AuthErr> {
|
|
(|| {
|
|
let mut not_synchronized = Vec::new();
|
|
let mut synchronized = Vec::new();
|
|
|
|
let conn = &mut self.pool.get()?;
|
|
let mut devices = devices::table
|
|
.select((devices::device_id, devices::sync_group_id))
|
|
.filter(devices::user_id.eq(user.id))
|
|
.order(devices::sync_group_id)
|
|
.load_iter::<(String, Option<i64>), _>(conn)?;
|
|
|
|
let mut cur_group = &mut not_synchronized;
|
|
let mut cur_group_id: Option<i64> = None;
|
|
|
|
while let Some((device_id, group_id)) = devices.next().transpose()? {
|
|
if group_id != cur_group_id {
|
|
if group_id.is_none() {
|
|
cur_group = &mut not_synchronized;
|
|
} else {
|
|
synchronized.push(Vec::new());
|
|
let index = synchronized.len() - 1;
|
|
cur_group = &mut synchronized[index];
|
|
}
|
|
|
|
cur_group_id = group_id;
|
|
}
|
|
|
|
cur_group.push(device_id);
|
|
}
|
|
|
|
Ok::<_, DbError>((not_synchronized, synchronized))
|
|
})()
|
|
.map_err(AuthErr::from)
|
|
}
|
|
}
|