feat: modify entire sync group when updating subscriptions
parent
025a69ea71
commit
cac80ca3e4
|
@ -18,6 +18,154 @@ impl From<(String, i64)> for gpodder::Subscription {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn set_subscriptions_for_single_device(
|
||||||
|
conn: &mut SqliteConnection,
|
||||||
|
device_id: i64,
|
||||||
|
urls: &HashSet<String>,
|
||||||
|
time_changed: i64,
|
||||||
|
) -> QueryResult<()> {
|
||||||
|
// https://github.com/diesel-rs/diesel/discussions/2826
|
||||||
|
// SQLite doesn't support default on conflict set values, so we can't handle this using
|
||||||
|
// on conflict. Therefore, we instead calculate which URLs should be inserted and which
|
||||||
|
// updated, so we avoid conflicts.
|
||||||
|
let urls_in_db: HashSet<String> = device_subscriptions::table
|
||||||
|
.select(device_subscriptions::podcast_url)
|
||||||
|
.filter(device_subscriptions::device_id.eq(device_id))
|
||||||
|
.get_results(conn)?
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// URLs originally in the database that are no longer in the list
|
||||||
|
let urls_to_delete = urls_in_db.difference(&urls);
|
||||||
|
|
||||||
|
// URLs not in the database that are in the new list
|
||||||
|
let urls_to_insert = urls.difference(&urls_in_db);
|
||||||
|
|
||||||
|
// URLs that are in both the database and the new list. For these, those marked as
|
||||||
|
// "deleted" in the database are updated so they're no longer deleted, with their
|
||||||
|
// timestamp updated.
|
||||||
|
let urls_to_update = urls.intersection(&urls_in_db);
|
||||||
|
|
||||||
|
// Mark the URLs to delete as properly deleted
|
||||||
|
diesel::update(
|
||||||
|
device_subscriptions::table.filter(
|
||||||
|
device_subscriptions::device_id
|
||||||
|
.eq(device_id)
|
||||||
|
.and(device_subscriptions::podcast_url.eq_any(urls_to_delete)),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.set((
|
||||||
|
device_subscriptions::deleted.eq(true),
|
||||||
|
device_subscriptions::time_changed.eq(time_changed),
|
||||||
|
))
|
||||||
|
.execute(conn)?;
|
||||||
|
|
||||||
|
// Update the existing deleted URLs that are reinserted as no longer deleted
|
||||||
|
diesel::update(
|
||||||
|
device_subscriptions::table.filter(
|
||||||
|
device_subscriptions::device_id
|
||||||
|
.eq(device_id)
|
||||||
|
.and(device_subscriptions::podcast_url.eq_any(urls_to_update))
|
||||||
|
.and(device_subscriptions::deleted.eq(true)),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.set((
|
||||||
|
device_subscriptions::deleted.eq(false),
|
||||||
|
device_subscriptions::time_changed.eq(time_changed),
|
||||||
|
))
|
||||||
|
.execute(conn)?;
|
||||||
|
|
||||||
|
// Insert the new values into the database
|
||||||
|
diesel::insert_into(device_subscriptions::table)
|
||||||
|
.values(
|
||||||
|
urls_to_insert
|
||||||
|
.into_iter()
|
||||||
|
.map(|url| db::NewDeviceSubscription {
|
||||||
|
device_id,
|
||||||
|
podcast_url: url.to_string(),
|
||||||
|
deleted: false,
|
||||||
|
time_changed,
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
)
|
||||||
|
.execute(conn)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_subscriptions_for_single_device(
|
||||||
|
conn: &mut SqliteConnection,
|
||||||
|
device_id: i64,
|
||||||
|
add: &HashSet<String>,
|
||||||
|
remove: &HashSet<String>,
|
||||||
|
time_changed: i64,
|
||||||
|
) -> QueryResult<()> {
|
||||||
|
let urls_in_db: HashSet<String> = device_subscriptions::table
|
||||||
|
.select(device_subscriptions::podcast_url)
|
||||||
|
.filter(device_subscriptions::device_id.eq(device_id))
|
||||||
|
.get_results(conn)?
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Subscriptions to remove are those that were already in the database and are now part
|
||||||
|
// of the removed list. Subscriptions that were never added in the first place don't
|
||||||
|
// need to be marked as deleted. We also only update those that aren't already marked
|
||||||
|
// as deleted.
|
||||||
|
let urls_to_delete = remove.intersection(&urls_in_db);
|
||||||
|
|
||||||
|
diesel::update(
|
||||||
|
device_subscriptions::table.filter(
|
||||||
|
device_subscriptions::device_id
|
||||||
|
.eq(device_id)
|
||||||
|
.and(device_subscriptions::podcast_url.eq_any(urls_to_delete))
|
||||||
|
.and(device_subscriptions::deleted.eq(false)),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.set((
|
||||||
|
device_subscriptions::deleted.eq(true),
|
||||||
|
device_subscriptions::time_changed.eq(time_changed),
|
||||||
|
))
|
||||||
|
.execute(conn)?;
|
||||||
|
|
||||||
|
// Subscriptions to update are those that are already in the database, but are also in
|
||||||
|
// the added list. Only those who were originally marked as deleted get updated.
|
||||||
|
let urls_to_update = add.intersection(&urls_in_db);
|
||||||
|
|
||||||
|
diesel::update(
|
||||||
|
device_subscriptions::table.filter(
|
||||||
|
device_subscriptions::device_id
|
||||||
|
.eq(device_id)
|
||||||
|
.and(device_subscriptions::podcast_url.eq_any(urls_to_update))
|
||||||
|
.and(device_subscriptions::deleted.eq(true)),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.set((
|
||||||
|
device_subscriptions::deleted.eq(false),
|
||||||
|
device_subscriptions::time_changed.eq(time_changed),
|
||||||
|
))
|
||||||
|
.execute(conn)?;
|
||||||
|
|
||||||
|
// Subscriptions to insert are those that aren't in the database and are part of the
|
||||||
|
// added list
|
||||||
|
let urls_to_insert = add.difference(&urls_in_db);
|
||||||
|
|
||||||
|
diesel::insert_into(device_subscriptions::table)
|
||||||
|
.values(
|
||||||
|
urls_to_insert
|
||||||
|
.into_iter()
|
||||||
|
.map(|url| db::NewDeviceSubscription {
|
||||||
|
device_id,
|
||||||
|
podcast_url: url.to_string(),
|
||||||
|
deleted: false,
|
||||||
|
time_changed,
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
)
|
||||||
|
.execute(conn)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
impl gpodder::SubscriptionRepository for SqliteRepository {
|
impl gpodder::SubscriptionRepository for SqliteRepository {
|
||||||
fn subscriptions_for_user(
|
fn subscriptions_for_user(
|
||||||
&self,
|
&self,
|
||||||
|
@ -67,84 +215,33 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
|
||||||
time_changed: chrono::DateTime<chrono::Utc>,
|
time_changed: chrono::DateTime<chrono::Utc>,
|
||||||
) -> Result<(), gpodder::AuthErr> {
|
) -> Result<(), gpodder::AuthErr> {
|
||||||
// TODO use a better timestamp
|
// TODO use a better timestamp
|
||||||
let timestamp = time_changed.timestamp();
|
let time_changed = time_changed.timestamp();
|
||||||
|
let urls: HashSet<String> = urls.into_iter().collect();
|
||||||
|
|
||||||
self.pool.get()?.transaction(|conn| {
|
self.pool.get()?.transaction(|conn| {
|
||||||
let device = devices::table
|
let (device_id, group_id) = devices::table
|
||||||
.select(db::Device::as_select())
|
.select((devices::id, devices::sync_group_id))
|
||||||
.filter(
|
.filter(
|
||||||
devices::user_id
|
devices::user_id
|
||||||
.eq(user.id)
|
.eq(user.id)
|
||||||
.and(devices::device_id.eq(device_id)),
|
.and(devices::device_id.eq(device_id)),
|
||||||
)
|
)
|
||||||
.get_result(conn)?;
|
.get_result::<(i64, Option<i64>)>(conn)?;
|
||||||
|
|
||||||
// https://github.com/diesel-rs/diesel/discussions/2826
|
// If the device is part of a sync group, we need to perform the update on every device
|
||||||
// SQLite doesn't support default on conflict set values, so we can't handle this using
|
// in the group
|
||||||
// on conflict. Therefore, we instead calculate which URLs should be inserted and which
|
if let Some(group_id) = group_id {
|
||||||
// updated, so we avoid conflicts.
|
let device_ids: Vec<i64> = devices::table
|
||||||
let urls: HashSet<String> = urls.into_iter().collect();
|
.filter(devices::sync_group_id.eq(group_id))
|
||||||
let urls_in_db: HashSet<String> = device_subscriptions::table
|
.select(devices::id)
|
||||||
.select(device_subscriptions::podcast_url)
|
.get_results(conn)?;
|
||||||
.filter(device_subscriptions::device_id.eq(device.id))
|
|
||||||
.get_results(conn)?
|
|
||||||
.into_iter()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// URLs originally in the database that are no longer in the list
|
for device_id in device_ids {
|
||||||
let urls_to_delete = urls_in_db.difference(&urls);
|
set_subscriptions_for_single_device(conn, device_id, &urls, time_changed)?;
|
||||||
|
}
|
||||||
// URLs not in the database that are in the new list
|
} else {
|
||||||
let urls_to_insert = urls.difference(&urls_in_db);
|
set_subscriptions_for_single_device(conn, device_id, &urls, time_changed)?;
|
||||||
|
}
|
||||||
// URLs that are in both the database and the new list. For these, those marked as
|
|
||||||
// "deleted" in the database are updated so they're no longer deleted, with their
|
|
||||||
// timestamp updated.
|
|
||||||
let urls_to_update = urls.intersection(&urls_in_db);
|
|
||||||
|
|
||||||
// Mark the URLs to delete as properly deleted
|
|
||||||
diesel::update(
|
|
||||||
device_subscriptions::table.filter(
|
|
||||||
device_subscriptions::device_id
|
|
||||||
.eq(device.id)
|
|
||||||
.and(device_subscriptions::podcast_url.eq_any(urls_to_delete)),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.set((
|
|
||||||
device_subscriptions::deleted.eq(true),
|
|
||||||
device_subscriptions::time_changed.eq(timestamp),
|
|
||||||
))
|
|
||||||
.execute(conn)?;
|
|
||||||
|
|
||||||
// Update the existing deleted URLs that are reinserted as no longer deleted
|
|
||||||
diesel::update(
|
|
||||||
device_subscriptions::table.filter(
|
|
||||||
device_subscriptions::device_id
|
|
||||||
.eq(device.id)
|
|
||||||
.and(device_subscriptions::podcast_url.eq_any(urls_to_update))
|
|
||||||
.and(device_subscriptions::deleted.eq(true)),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.set((
|
|
||||||
device_subscriptions::deleted.eq(false),
|
|
||||||
device_subscriptions::time_changed.eq(timestamp),
|
|
||||||
))
|
|
||||||
.execute(conn)?;
|
|
||||||
|
|
||||||
// Insert the new values into the database
|
|
||||||
diesel::insert_into(device_subscriptions::table)
|
|
||||||
.values(
|
|
||||||
urls_to_insert
|
|
||||||
.into_iter()
|
|
||||||
.map(|url| db::NewDeviceSubscription {
|
|
||||||
device_id: device.id,
|
|
||||||
podcast_url: url.to_string(),
|
|
||||||
deleted: false,
|
|
||||||
time_changed: timestamp,
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
)
|
|
||||||
.execute(conn)?;
|
|
||||||
|
|
||||||
Ok::<_, diesel::result::Error>(())
|
Ok::<_, diesel::result::Error>(())
|
||||||
})?;
|
})?;
|
||||||
|
@ -161,7 +258,7 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
|
||||||
time_changed: chrono::DateTime<chrono::Utc>,
|
time_changed: chrono::DateTime<chrono::Utc>,
|
||||||
) -> Result<(), gpodder::AuthErr> {
|
) -> Result<(), gpodder::AuthErr> {
|
||||||
// TODO use a better timestamp
|
// TODO use a better timestamp
|
||||||
let timestamp = time_changed.timestamp();
|
let time_changed = time_changed.timestamp();
|
||||||
|
|
||||||
// TODO URLs that are in both the added and removed lists will currently get "re-added",
|
// TODO URLs that are in both the added and removed lists will currently get "re-added",
|
||||||
// meaning their change timestamp will be updated even though they haven't really changed.
|
// meaning their change timestamp will be updated even though they haven't really changed.
|
||||||
|
@ -169,77 +266,41 @@ impl gpodder::SubscriptionRepository for SqliteRepository {
|
||||||
let remove: HashSet<_> = remove.into_iter().collect();
|
let remove: HashSet<_> = remove.into_iter().collect();
|
||||||
|
|
||||||
self.pool.get()?.transaction(|conn| {
|
self.pool.get()?.transaction(|conn| {
|
||||||
let device = devices::table
|
let (device_id, group_id) = devices::table
|
||||||
.select(db::Device::as_select())
|
.select((devices::id, devices::sync_group_id))
|
||||||
.filter(
|
.filter(
|
||||||
devices::user_id
|
devices::user_id
|
||||||
.eq(user.id)
|
.eq(user.id)
|
||||||
.and(devices::device_id.eq(device_id)),
|
.and(devices::device_id.eq(device_id)),
|
||||||
)
|
)
|
||||||
.get_result(conn)?;
|
.get_result::<(i64, Option<i64>)>(conn)?;
|
||||||
|
|
||||||
let urls_in_db: HashSet<String> = device_subscriptions::table
|
// If the device is part of a sync group, we need to perform the update on every device
|
||||||
.select(device_subscriptions::podcast_url)
|
// in the group
|
||||||
.filter(device_subscriptions::device_id.eq(device.id))
|
if let Some(group_id) = group_id {
|
||||||
.get_results(conn)?
|
let device_ids: Vec<i64> = devices::table
|
||||||
.into_iter()
|
.filter(devices::sync_group_id.eq(group_id))
|
||||||
.collect();
|
.select(devices::id)
|
||||||
|
.get_results(conn)?;
|
||||||
|
|
||||||
// Subscriptions to remove are those that were already in the database and are now part
|
for device_id in device_ids {
|
||||||
// of the removed list. Subscriptions that were never added in the first place don't
|
update_subscriptions_for_single_device(
|
||||||
// need to be marked as deleted. We also only update those that aren't already marked
|
conn,
|
||||||
// as deleted.
|
device_id,
|
||||||
let urls_to_delete = remove.intersection(&urls_in_db);
|
&add,
|
||||||
|
&remove,
|
||||||
diesel::update(
|
time_changed,
|
||||||
device_subscriptions::table.filter(
|
)?;
|
||||||
device_subscriptions::device_id
|
}
|
||||||
.eq(device.id)
|
} else {
|
||||||
.and(device_subscriptions::podcast_url.eq_any(urls_to_delete))
|
update_subscriptions_for_single_device(
|
||||||
.and(device_subscriptions::deleted.eq(false)),
|
conn,
|
||||||
),
|
device_id,
|
||||||
)
|
&add,
|
||||||
.set((
|
&remove,
|
||||||
device_subscriptions::deleted.eq(true),
|
time_changed,
|
||||||
device_subscriptions::time_changed.eq(timestamp),
|
)?;
|
||||||
))
|
}
|
||||||
.execute(conn)?;
|
|
||||||
|
|
||||||
// Subscriptions to update are those that are already in the database, but are also in
|
|
||||||
// the added list. Only those who were originally marked as deleted get updated.
|
|
||||||
let urls_to_update = add.intersection(&urls_in_db);
|
|
||||||
|
|
||||||
diesel::update(
|
|
||||||
device_subscriptions::table.filter(
|
|
||||||
device_subscriptions::device_id
|
|
||||||
.eq(device.id)
|
|
||||||
.and(device_subscriptions::podcast_url.eq_any(urls_to_update))
|
|
||||||
.and(device_subscriptions::deleted.eq(true)),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.set((
|
|
||||||
device_subscriptions::deleted.eq(false),
|
|
||||||
device_subscriptions::time_changed.eq(timestamp),
|
|
||||||
))
|
|
||||||
.execute(conn)?;
|
|
||||||
|
|
||||||
// Subscriptions to insert are those that aren't in the database and are part of the
|
|
||||||
// added list
|
|
||||||
let urls_to_insert = add.difference(&urls_in_db);
|
|
||||||
|
|
||||||
diesel::insert_into(device_subscriptions::table)
|
|
||||||
.values(
|
|
||||||
urls_to_insert
|
|
||||||
.into_iter()
|
|
||||||
.map(|url| db::NewDeviceSubscription {
|
|
||||||
device_id: device.id,
|
|
||||||
podcast_url: url.to_string(),
|
|
||||||
deleted: false,
|
|
||||||
time_changed: timestamp,
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
)
|
|
||||||
.execute(conn)?;
|
|
||||||
|
|
||||||
Ok::<_, diesel::result::Error>(())
|
Ok::<_, diesel::result::Error>(())
|
||||||
})?;
|
})?;
|
||||||
|
|
Loading…
Reference in New Issue