From 27fb0e8d55e0e34822825b45cd4cdcc451960a60 Mon Sep 17 00:00:00 2001 From: vvarlet Date: Fri, 19 Jun 2026 16:23:09 +0200 Subject: [PATCH 1/5] feat(slack): sync Saved for Later items via `void slack saved` Add background sync of Slack's "Later" view using `search.messages` with `is:saved`. Items are stored with an `is_saved` flag (DB migration v13) and reconciled on each sync cycle. Missing messages are fetched on demand. The command lives under `void slack saved` (Slack-specific). Includes: API client, sync logic with fetch-on-miss, CLI subcommand, DB reconcile/list/count helpers, setup wizard scope docs, and tests. Co-authored-by: Cursor --- CHANGELOG.md | 2 + crates/void-cli/src/cli.rs | 49 +++++++ crates/void-cli/src/commands/resolve.rs | 1 + crates/void-cli/src/commands/setup/slack.rs | 4 +- crates/void-cli/src/commands/slack/args.rs | 2 + crates/void-cli/src/commands/slack/mod.rs | 2 + crates/void-cli/src/commands/slack/saved.rs | 41 ++++++ crates/void-cli/src/context.rs | 7 + .../read_paths_snapshots__inbox.snap | 2 + crates/void-core/src/db/database_access.rs | 30 ++++ crates/void-core/src/db/messages/archive.rs | 2 +- crates/void-core/src/db/messages/inbox.rs | 4 +- crates/void-core/src/db/messages/lookup.rs | 6 +- crates/void-core/src/db/messages/mod.rs | 2 + crates/void-core/src/db/messages/read.rs | 6 +- crates/void-core/src/db/messages/saved.rs | 114 +++++++++++++++ crates/void-core/src/db/messages/upsert.rs | 5 +- crates/void-core/src/db/row.rs | 1 + crates/void-core/src/db/schema.rs | 18 ++- crates/void-core/src/db/search.rs | 4 +- crates/void-core/src/db/tests/crud.rs | 23 +++- crates/void-core/src/db/tests/mod.rs | 1 + crates/void-core/src/db/tests/saved.rs | 130 ++++++++++++++++++ crates/void-core/src/hooks/tests.rs | 1 + crates/void-core/src/models/message.rs | 2 + crates/void-core/src/models/tests.rs | 2 + crates/void-core/src/test_fixtures.rs | 2 + crates/void-gmail/src/connector/sync.rs | 1 + crates/void-gmail/src/connector/tests.rs | 2 + crates/void-googlenews/src/connector/sync.rs | 1 + crates/void-hackernews/src/connector/sync.rs | 1 + .../void-linkedin/src/connector/posts_sync.rs | 1 + crates/void-linkedin/src/connector/sync.rs | 1 + crates/void-slack/src/api/conversations.rs | 23 ++++ crates/void-slack/src/api/tests.rs | 32 +++++ crates/void-slack/src/api/types.rs | 31 +++++ .../src/connector/connector_trait.rs | 7 + crates/void-slack/src/connector/mapping.rs | 1 + .../void-slack/src/connector/socket_mode.rs | 12 +- crates/void-slack/src/connector/sync.rs | 130 ++++++++++++++++++ crates/void-slack/src/connector/tests.rs | 107 ++++++++++++++ crates/void-telegram/src/connector/sync.rs | 1 + crates/void-whatsapp/src/connector/sync.rs | 2 + docs/commands.md | 1 + 44 files changed, 799 insertions(+), 18 deletions(-) create mode 100644 crates/void-cli/src/commands/slack/saved.rs create mode 100644 crates/void-core/src/db/messages/saved.rs create mode 100644 crates/void-core/src/db/tests/saved.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 17f8625..ab6125b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Slack** — Sync "Saved for Later" items from the Later view during background sync; view with `void slack saved`. Setup wizard documents the required `search:read` scope. - **Google News** — New read-only connector that watches the public Google News RSS feed. Each configured keyword triggers its own search; matching articles land in your inbox, filtered by a recency window. Configure with `void gn keywords`, `void gn when`, `void gn language`, and `void gn country` (or interactively via `void setup`). Language/country default to `fr`/`FR`; add one connection per edition to follow several. ## [0.10.3] - 2026-06-17 @@ -388,3 +389,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **CLI commands** — `inbox`, `conversations`, `messages`, `search`, `contacts`, `channels`, `calendar`, `send`, `reply`, `archive`, `doctor`, `status` - **Output formatting** — JSON mode and human-readable tables - **Skills** — daily routine, calendar, Gmail, Slack, WhatsApp skill files + diff --git a/crates/void-cli/src/cli.rs b/crates/void-cli/src/cli.rs index 11d76e4..0ef99c0 100644 --- a/crates/void-cli/src/cli.rs +++ b/crates/void-cli/src/cli.rs @@ -301,6 +301,55 @@ mod tests { assert!(context::runs_with_local_cache(cmd)); } + #[test] + fn slack_saved_uses_local_cache() { + let cli = parse(&["void", "slack", "saved"]); + let cmd = cli.command.as_ref().expect("command"); + assert!(context::runs_with_local_cache(cmd)); + } + + #[test] + fn parse_slack_saved_minimal() { + let cli = parse(&["void", "slack", "saved"]); + match cli.command { + Some(Command::Slack(ref args)) => match &args.command { + commands::slack::SlackCommand::Saved(saved) => { + assert!(saved.connection.is_none()); + assert_eq!(saved.size, 50); + assert_eq!(saved.page, 1); + } + other => panic!("expected Saved, got {other:?}"), + }, + other => panic!("expected Slack, got {other:?}"), + } + } + + #[test] + fn parse_slack_saved_with_filters() { + let cli = parse(&[ + "void", + "slack", + "saved", + "--connection", + "slack-gladia", + "-n", + "10", + "--page", + "2", + ]); + match cli.command { + Some(Command::Slack(ref args)) => match &args.command { + commands::slack::SlackCommand::Saved(saved) => { + assert_eq!(saved.connection.as_deref(), Some("slack-gladia")); + assert_eq!(saved.size, 10); + assert_eq!(saved.page, 2); + } + other => panic!("expected Saved, got {other:?}"), + }, + other => panic!("expected Slack, got {other:?}"), + } + } + // --- Gmail forward parsing --- #[test] diff --git a/crates/void-cli/src/commands/resolve.rs b/crates/void-cli/src/commands/resolve.rs index ac23d68..0eadfea 100644 --- a/crates/void-cli/src/commands/resolve.rs +++ b/crates/void-cli/src/commands/resolve.rs @@ -161,6 +161,7 @@ mod tests { timestamp: 1, synced_at: None, is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: None, diff --git a/crates/void-cli/src/commands/setup/slack.rs b/crates/void-cli/src/commands/setup/slack.rs index 0bf4707..3d9abb4 100644 --- a/crates/void-cli/src/commands/setup/slack.rs +++ b/crates/void-cli/src/commands/setup/slack.rs @@ -94,6 +94,7 @@ pub(crate) async fn setup_slack( eprintln!(" mpim:read — View basic info about group DMs"); eprintln!(" reactions:read — View emoji reactions"); eprintln!(" reactions:write — Add emoji reactions"); + eprintln!(" search:read — Search workspace content (Saved for Later)"); eprintln!(" users:read — View people in the workspace"); eprintln!(); if !confirm_default_yes("Done? Continue to next step") { @@ -145,7 +146,8 @@ pub(crate) async fn setup_slack( eprintln!("STEP 5 — Install the App & Collect Tokens"); eprintln!(); eprintln!(" Go to \"Install App\" in the left sidebar and install to your workspace."); - eprintln!(" (If already installed, click \"Reinstall to Workspace\" to apply scope changes.)"); + eprintln!(" (If already installed, click \"Reinstall to Workspace\" to apply scope changes,"); + eprintln!(" including search:read for Saved for Later sync.)"); eprintln!(); eprintln!(" You need two tokens:"); eprintln!(" • User OAuth Token (xoxp-...) → found under \"OAuth & Permissions\""); diff --git a/crates/void-cli/src/commands/slack/args.rs b/crates/void-cli/src/commands/slack/args.rs index 95255fb..5915083 100644 --- a/crates/void-cli/src/commands/slack/args.rs +++ b/crates/void-cli/src/commands/slack/args.rs @@ -18,6 +18,8 @@ pub enum SlackCommand { Open(OpenArgs), /// Forward a message to another channel or user Forward(ForwardArgs), + /// Show messages saved for later (Slack Later view) + Saved(super::saved::SavedArgs), } #[derive(Debug, Args)] diff --git a/crates/void-cli/src/commands/slack/mod.rs b/crates/void-cli/src/commands/slack/mod.rs index 208c7fe..8491c8b 100644 --- a/crates/void-cli/src/commands/slack/mod.rs +++ b/crates/void-cli/src/commands/slack/mod.rs @@ -1,6 +1,7 @@ //! Slack CLI helpers (react, edit, schedule, open DM/group). mod args; +mod saved; pub use args::*; @@ -17,6 +18,7 @@ pub async fn run(args: &SlackArgs) -> anyhow::Result<()> { SlackCommand::Schedule(a) => run_schedule(a).await, SlackCommand::Open(a) => run_open(a).await, SlackCommand::Forward(a) => run_forward(a).await, + SlackCommand::Saved(a) => saved::run(a), } } diff --git a/crates/void-cli/src/commands/slack/saved.rs b/crates/void-cli/src/commands/slack/saved.rs new file mode 100644 index 0000000..6f2abba --- /dev/null +++ b/crates/void-cli/src/commands/slack/saved.rs @@ -0,0 +1,41 @@ +use clap::Args; +use tracing::debug; + +use super::super::pagination::{build_meta, parse_page}; +use crate::output::OutputFormatter; + +#[derive(Debug, Args)] +pub struct SavedArgs { + /// Filter by connection (partial match on connection_id) + #[arg(long)] + pub connection: Option, + /// Maximum number of results to return + #[arg(short = 'n', long, default_value = "50")] + pub size: i64, + /// Page number (1-based) + #[arg(long, default_value = "1")] + pub page: i64, +} + +pub fn run(args: &SavedArgs) -> anyhow::Result<()> { + debug!( + connection = ?args.connection, + size = args.size, + page = args.page, + "slack saved" + ); + let _cfg = crate::context::config(); + let db = crate::context::open_db()?; + let formatter = OutputFormatter::new(); + let offset = parse_page(args.size, args.page)?; + + let (mut messages, total_elements) = db.list_saved_messages( + args.connection.as_deref(), + Some("slack"), + args.size, + offset, + )?; + messages.reverse(); + let meta = build_meta(args.page, args.size, total_elements); + formatter.print_paginated(&messages, meta) +} diff --git a/crates/void-cli/src/context.rs b/crates/void-cli/src/context.rs index d8487dc..cf76e8e 100644 --- a/crates/void-cli/src/context.rs +++ b/crates/void-cli/src/context.rs @@ -136,6 +136,7 @@ pub(crate) fn runs_with_local_cache(command: &crate::Command) -> bool { | Command::Remote(_) => true, Command::Calendar(args) => calendar_reads_local_cache(args), Command::Hn(args) => hackernews_reads_local_cache(args), + Command::Slack(args) => slack_reads_local_cache(args), Command::Sync(args) => args.status, Command::Setup => false, _ => false, @@ -149,6 +150,12 @@ fn calendar_reads_local_cache(args: &crate::commands::calendar::CalendarArgs) -> matches!(args.command, None | Some(CalendarCommand::Week)) } +fn slack_reads_local_cache(args: &crate::commands::slack::SlackArgs) -> bool { + use crate::commands::slack::SlackCommand; + + matches!(args.command, SlackCommand::Saved(_)) +} + fn hackernews_reads_local_cache(args: &crate::commands::hackernews::HackerNewsArgs) -> bool { use crate::commands::hackernews::{HnCommand, KeywordsAction}; diff --git a/crates/void-cli/tests/snapshots/read_paths_snapshots__inbox.snap b/crates/void-cli/tests/snapshots/read_paths_snapshots__inbox.snap index f4c0ec8..fd187ac 100644 --- a/crates/void-cli/tests/snapshots/read_paths_snapshots__inbox.snap +++ b/crates/void-cli/tests/snapshots/read_paths_snapshots__inbox.snap @@ -13,6 +13,7 @@ expression: json "external_id": "ext-m1", "id": "m1", "is_archived": false, + "is_saved": false, "media_type": null, "metadata": null, "reply_to_id": null, @@ -31,6 +32,7 @@ expression: json "external_id": "ext-m2", "id": "m2", "is_archived": false, + "is_saved": false, "media_type": null, "metadata": null, "reply_to_id": null, diff --git a/crates/void-core/src/db/database_access.rs b/crates/void-core/src/db/database_access.rs index a76371c..2352679 100644 --- a/crates/void-core/src/db/database_access.rs +++ b/crates/void-core/src/db/database_access.rs @@ -243,6 +243,36 @@ impl Database { messages::reconcile_inbox(&*self.conn()?, connection_id, connector, inbox_external_ids) } + /// Reconcile `is_saved` for all messages of a connection to match the given saved set. + /// Returns (newly_saved_count, newly_unsaved_count). + pub fn reconcile_saved( + &self, + connection_id: &str, + connector: &str, + saved_external_ids: &std::collections::HashSet, + ) -> Result<(usize, usize), DbError> { + messages::reconcile_saved(&*self.conn()?, connection_id, connector, saved_external_ids) + } + + pub fn list_saved_messages( + &self, + connection_filter: Option<&str>, + connector_filter: Option<&str>, + limit: i64, + offset: i64, + ) -> Result<(Vec, i64), DbError> { + let conn = self.conn()?; + let rows = messages::list_saved( + &conn, + connection_filter, + connector_filter, + limit, + offset, + )?; + let total = messages::count_saved(&conn, connection_filter, connector_filter)?; + Ok((rows, total)) + } + pub fn find_message_by_external_id( &self, connection_id: &str, diff --git a/crates/void-core/src/db/messages/archive.rs b/crates/void-core/src/db/messages/archive.rs index 06ca0d7..45c3f14 100644 --- a/crates/void-core/src/db/messages/archive.rs +++ b/crates/void-core/src/db/messages/archive.rs @@ -14,7 +14,7 @@ pub fn bulk_archive_before( connector_filter: Option<&str>, ) -> Result, DbError> { let mut sql = String::from( - "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id + "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id, is_saved FROM messages WHERE is_archived = 0 AND timestamp < ?1", ); let mut param_values: Vec> = vec![Box::new(before_ts)]; diff --git a/crates/void-core/src/db/messages/inbox.rs b/crates/void-core/src/db/messages/inbox.rs index f4aa2e7..a03cca5 100644 --- a/crates/void-core/src/db/messages/inbox.rs +++ b/crates/void-core/src/db/messages/inbox.rs @@ -20,7 +20,7 @@ pub fn enrich_with_context(conn: &Connection, messages: &mut [Message]) -> Resul let mut context_map: HashMap> = HashMap::new(); let mut stmt = conn.prepare( - "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id + "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id, is_saved FROM messages WHERE context_id = ?1 ORDER BY timestamp ASC LIMIT 50", )?; @@ -90,7 +90,7 @@ pub fn messages_pending_file_download( limit: i64, ) -> Result, DbError> { let mut stmt = conn.prepare( - "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id + "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id, is_saved FROM messages WHERE connection_id = ?1 AND connector = ?2 AND metadata LIKE '%url_private%' diff --git a/crates/void-core/src/db/messages/lookup.rs b/crates/void-core/src/db/messages/lookup.rs index 29f7f33..dabf559 100644 --- a/crates/void-core/src/db/messages/lookup.rs +++ b/crates/void-core/src/db/messages/lookup.rs @@ -10,7 +10,7 @@ pub fn find_by_external_id( external_id: &str, ) -> Result, DbError> { conn.query_row( - "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id + "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id, is_saved FROM messages WHERE connection_id = ?1 AND external_id = ?2", params![connection_id, external_id], row::row_to_message, @@ -32,7 +32,7 @@ pub fn find_by_slack_link( message_ts: &str, ) -> Result, DbError> { conn.query_row( - "SELECT m.id, m.conversation_id, m.connection_id, m.connector, m.external_id, m.sender, m.sender_name, m.sender_avatar_url, m.body, m.timestamp, m.synced_at, m.is_archived, m.reply_to_id, m.media_type, m.metadata, m.context_id + "SELECT m.id, m.conversation_id, m.connection_id, m.connector, m.external_id, m.sender, m.sender_name, m.sender_avatar_url, m.body, m.timestamp, m.synced_at, m.is_archived, m.reply_to_id, m.media_type, m.metadata, m.context_id, m.is_saved FROM messages m JOIN conversations c ON m.conversation_id = c.id WHERE m.connector = 'slack' @@ -69,7 +69,7 @@ pub fn last_in_conversation( conversation_id: &str, ) -> Result, DbError> { conn.query_row( - "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id + "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id, is_saved FROM messages WHERE conversation_id = ?1 ORDER BY timestamp DESC LIMIT 1", params![conversation_id], row::row_to_message, diff --git a/crates/void-core/src/db/messages/mod.rs b/crates/void-core/src/db/messages/mod.rs index 42e2a99..7606c87 100644 --- a/crates/void-core/src/db/messages/mod.rs +++ b/crates/void-core/src/db/messages/mod.rs @@ -4,6 +4,7 @@ mod archive; mod inbox; mod lookup; mod read; +mod saved; mod upsert; /// SQL clause that keeps only the most recent message per `context_id`, @@ -27,6 +28,7 @@ pub use inbox::{ backfill_avatar_urls, enrich_with_context, messages_pending_file_download, reconcile_inbox, senders_missing_avatar, }; +pub use saved::{count_saved, list_saved, reconcile_saved}; pub use lookup::{ find_by_external_id, find_by_slack_link, find_slack_conversation_by_external_id, last_in_conversation, diff --git a/crates/void-core/src/db/messages/read.rs b/crates/void-core/src/db/messages/read.rs index f08ff81..32497bb 100644 --- a/crates/void-core/src/db/messages/read.rs +++ b/crates/void-core/src/db/messages/read.rs @@ -16,7 +16,7 @@ pub fn list_for_conversation( ) -> Result, DbError> { let suffix_pattern = format!("%-{conversation_id}"); let mut sql = String::from( - "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id + "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id, is_saved FROM messages WHERE (conversation_id = ?1 OR conversation_id LIKE ?2)", ); let mut param_values: Vec> = vec![ @@ -89,7 +89,7 @@ pub fn count_for_conversation( pub fn get(conn: &Connection, id: &str) -> Result, DbError> { let suffix_pattern = format!("%-{id}"); conn.query_row( - "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id + "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id, is_saved FROM messages WHERE id = ?1 OR id LIKE ?2", params![id, suffix_pattern], row::row_to_message, @@ -123,7 +123,7 @@ pub fn list_recent( dedup_context: bool, ) -> Result, DbError> { let mut sql = String::from( - "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id + "SELECT id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id, is_saved FROM messages WHERE 1=1", ); let mut param_values: Vec> = Vec::new(); diff --git a/crates/void-core/src/db/messages/saved.rs b/crates/void-core/src/db/messages/saved.rs new file mode 100644 index 0000000..ebd9d4b --- /dev/null +++ b/crates/void-core/src/db/messages/saved.rs @@ -0,0 +1,114 @@ +use std::collections::HashSet; + +use rusqlite::{params, Connection}; + +use super::super::row; +use crate::error::DbError; +use crate::models::Message; + +const MESSAGE_COLUMNS: &str = "id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id, is_saved"; + +/// Reconcile `is_saved` for a connection: messages whose external_id is in +/// `saved_external_ids` get `is_saved = 1`, all others get `is_saved = 0`. +pub fn reconcile_saved( + conn: &Connection, + connection_id: &str, + connector: &str, + saved_external_ids: &HashSet, +) -> Result<(usize, usize), DbError> { + let mut stmt = conn.prepare( + "SELECT external_id, is_saved FROM messages WHERE connection_id = ?1 AND connector = ?2", + )?; + let rows: Vec<(String, bool)> = stmt + .query_map(params![connection_id, connector], |row| { + Ok((row.get::<_, String>(0)?, row.get::<_, i32>(1)? != 0)) + })? + .collect::>()?; + + let mut newly_saved = 0usize; + let mut newly_unsaved = 0usize; + + let mut mark_stmt = conn.prepare( + "UPDATE messages SET is_saved = ?3 WHERE connection_id = ?1 AND external_id = ?2", + )?; + + for (ext_id, was_saved) in &rows { + let should_save = saved_external_ids.contains(ext_id); + if should_save != *was_saved { + mark_stmt.execute(params![connection_id, ext_id, should_save as i32])?; + if should_save { + newly_saved += 1; + } else { + newly_unsaved += 1; + } + } + } + + Ok((newly_saved, newly_unsaved)) +} + +pub fn list_saved( + conn: &Connection, + connection_filter: Option<&str>, + connector_filter: Option<&str>, + limit: i64, + offset: i64, +) -> Result, DbError> { + let mut sql = format!("SELECT {MESSAGE_COLUMNS} FROM messages WHERE is_saved = 1"); + let mut param_values: Vec> = Vec::new(); + + if let Some(acct) = connection_filter { + let pattern = format!("%{acct}%"); + sql.push_str(&format!( + " AND connection_id LIKE ?{}", + param_values.len() + 1 + )); + param_values.push(Box::new(pattern)); + } + if let Some(conn_type) = connector_filter { + sql.push_str(&format!(" AND connector = ?{}", param_values.len() + 1)); + param_values.push(Box::new(conn_type.to_string())); + } + + sql.push_str(&format!( + " ORDER BY timestamp DESC LIMIT ?{} OFFSET ?{}", + param_values.len() + 1, + param_values.len() + 2 + )); + param_values.push(Box::new(limit)); + param_values.push(Box::new(offset)); + + let mut stmt = conn.prepare(&sql)?; + let params_ref: Vec<&dyn rusqlite::types::ToSql> = + param_values.iter().map(|p| p.as_ref()).collect(); + let rows = stmt.query_map(params_ref.as_slice(), row::row_to_message)?; + rows.collect::, _>>().map_err(Into::into) +} + +pub fn count_saved( + conn: &Connection, + connection_filter: Option<&str>, + connector_filter: Option<&str>, +) -> Result { + let mut sql = String::from("SELECT COUNT(*) FROM messages WHERE is_saved = 1"); + let mut param_values: Vec> = Vec::new(); + + if let Some(acct) = connection_filter { + let pattern = format!("%{acct}%"); + sql.push_str(&format!( + " AND connection_id LIKE ?{}", + param_values.len() + 1 + )); + param_values.push(Box::new(pattern)); + } + if let Some(conn_type) = connector_filter { + sql.push_str(&format!(" AND connector = ?{}", param_values.len() + 1)); + param_values.push(Box::new(conn_type.to_string())); + } + + let mut stmt = conn.prepare(&sql)?; + let params_ref: Vec<&dyn rusqlite::types::ToSql> = + param_values.iter().map(|p| p.as_ref()).collect(); + let count = stmt.query_row(params_ref.as_slice(), |row| row.get(0))?; + Ok(count) +} diff --git a/crates/void-core/src/db/messages/upsert.rs b/crates/void-core/src/db/messages/upsert.rs index 8ce72c3..2491197 100644 --- a/crates/void-core/src/db/messages/upsert.rs +++ b/crates/void-core/src/db/messages/upsert.rs @@ -24,8 +24,8 @@ pub fn upsert_row(conn: &Connection, msg: &Message) -> Result { let is_new = !message_exists(conn, &msg.connection_id, &msg.external_id)?; let now = chrono::Utc::now().timestamp(); conn.execute( - "INSERT INTO messages (id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, reply_to_id, media_type, metadata, context_id) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16) + "INSERT INTO messages (id, conversation_id, connection_id, connector, external_id, sender, sender_name, sender_avatar_url, body, timestamp, synced_at, is_archived, is_saved, reply_to_id, media_type, metadata, context_id) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17) ON CONFLICT(connection_id, external_id) DO UPDATE SET body = excluded.body, connector = excluded.connector, @@ -48,6 +48,7 @@ pub fn upsert_row(conn: &Connection, msg: &Message) -> Result { msg.timestamp, msg.synced_at.unwrap_or(now), msg.is_archived as i32, + msg.is_saved as i32, msg.reply_to_id, msg.media_type, msg.metadata.as_ref().map(|v| v.to_string()), diff --git a/crates/void-core/src/db/row.rs b/crates/void-core/src/db/row.rs index 2d5dc44..076fe25 100644 --- a/crates/void-core/src/db/row.rs +++ b/crates/void-core/src/db/row.rs @@ -53,6 +53,7 @@ pub(crate) fn row_to_message(row: &Row) -> rusqlite::Result { metadata: parse_json_opt(row.get(14)?), context_id: row.get(15)?, context: None, + is_saved: row.get::<_, i32>(16)? != 0, }) } diff --git a/crates/void-core/src/db/schema.rs b/crates/void-core/src/db/schema.rs index 332bb4d..62ec67e 100644 --- a/crates/void-core/src/db/schema.rs +++ b/crates/void-core/src/db/schema.rs @@ -5,7 +5,7 @@ use tracing::debug; use crate::error::DbError; -pub const SCHEMA_VERSION: i32 = 12; +pub const SCHEMA_VERSION: i32 = 13; /// Run all pending migrations on the database connection. pub fn run_migrations(conn: &Connection) -> Result<(), DbError> { @@ -61,6 +61,9 @@ pub fn run_migrations(conn: &Connection) -> Result<(), DbError> { if version < 12 { migrate_v12(conn)?; } + if version < 13 { + migrate_v13(conn)?; + } Ok(()) } @@ -302,3 +305,16 @@ fn migrate_v12(conn: &Connection) -> Result<(), DbError> { )?; Ok(()) } + +fn migrate_v13(conn: &Connection) -> Result<(), DbError> { + debug!("running migration v13: add is_saved to messages"); + conn.execute_batch( + " + ALTER TABLE messages ADD COLUMN is_saved INTEGER NOT NULL DEFAULT 0; + CREATE INDEX IF NOT EXISTS idx_messages_is_saved ON messages(is_saved, timestamp DESC); + + INSERT OR REPLACE INTO schema_version (version) VALUES (13); + ", + )?; + Ok(()) +} diff --git a/crates/void-core/src/db/search.rs b/crates/void-core/src/db/search.rs index f3eceee..64d10c8 100644 --- a/crates/void-core/src/db/search.rs +++ b/crates/void-core/src/db/search.rs @@ -91,7 +91,7 @@ impl Database { let terms: Vec<&str> = query.split_whitespace().collect(); let escaped = fts5_escape(query); let mut sql = String::from( - "SELECT m.id, m.conversation_id, m.connection_id, m.connector, m.external_id, m.sender, m.sender_name, m.sender_avatar_url, m.body, m.timestamp, m.synced_at, m.is_archived, m.reply_to_id, m.media_type, m.metadata, m.context_id + "SELECT m.id, m.conversation_id, m.connection_id, m.connector, m.external_id, m.sender, m.sender_name, m.sender_avatar_url, m.body, m.timestamp, m.synced_at, m.is_archived, m.reply_to_id, m.media_type, m.metadata, m.context_id, m.is_saved FROM messages_fts fts JOIN messages m ON m.rowid = fts.rowid WHERE messages_fts MATCH ?1", @@ -194,7 +194,7 @@ impl Database { } let mut sql = format!( - "SELECT m.id, m.conversation_id, m.connection_id, m.connector, m.external_id, m.sender, m.sender_name, m.sender_avatar_url, m.body, m.timestamp, m.synced_at, m.is_archived, m.reply_to_id, m.media_type, m.metadata, m.context_id + "SELECT m.id, m.conversation_id, m.connection_id, m.connector, m.external_id, m.sender, m.sender_name, m.sender_avatar_url, m.body, m.timestamp, m.synced_at, m.is_archived, m.reply_to_id, m.media_type, m.metadata, m.context_id, m.is_saved FROM messages m WHERE 1=1{name_clause}", ); diff --git a/crates/void-core/src/db/tests/crud.rs b/crates/void-core/src/db/tests/crud.rs index a37de38..dab052b 100644 --- a/crates/void-core/src/db/tests/crud.rs +++ b/crates/void-core/src/db/tests/crud.rs @@ -232,6 +232,7 @@ fn find_slack_message_by_link_ignores_connection_naming() { timestamp: 1_776_936_528, synced_at: None, is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: None, @@ -391,6 +392,15 @@ fn migrate_whatsapp_jid_connections_merges_jid_rows_into_config_name() { .unwrap() .execute("DELETE FROM schema_version WHERE version >= 12", []) .unwrap(); + db.conn() + .unwrap() + .execute_batch( + " + DROP INDEX IF EXISTS idx_messages_is_saved; + ALTER TABLE messages DROP COLUMN is_saved; + ", + ) + .unwrap(); db.conn() .unwrap() .execute( @@ -471,7 +481,7 @@ fn schema_snapshot_matches_expected() { let names: Vec<&str> = rows.iter().map(|(n, _)| n.as_str()).collect(); - // Expected object names at SCHEMA_VERSION = 11. Includes FTS5 shadow tables + // Expected object names at SCHEMA_VERSION = 13. Includes FTS5 shadow tables // (messages_fts_*) created automatically by the virtual table. let expected = [ "conversations", @@ -479,6 +489,7 @@ fn schema_snapshot_matches_expected() { "hook_logs", "idx_hook_logs_started", "idx_messages_context_id", + "idx_messages_is_saved", "messages", "messages_ad", "messages_ai", @@ -517,6 +528,7 @@ fn schema_snapshot_core_table_columns_present() { "connector", "synced_at", "is_archived", + "is_saved", "context_id", "sender_avatar_url", ] { @@ -642,6 +654,15 @@ fn migrations_preserve_existing_data() { assert_eq!(conn_id, "legacy-acct"); assert_eq!(body, "preserved body"); + let is_saved: i32 = conn + .query_row( + "SELECT is_saved FROM messages WHERE id = 'mg1'", + [], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(is_saved, 0, "is_saved should default to 0 after migration"); + let conv_conn: String = conn .query_row( "SELECT connection_id FROM conversations WHERE id = 'cv1'", diff --git a/crates/void-core/src/db/tests/mod.rs b/crates/void-core/src/db/tests/mod.rs index 65cc703..7cd3f18 100644 --- a/crates/void-core/src/db/tests/mod.rs +++ b/crates/void-core/src/db/tests/mod.rs @@ -6,4 +6,5 @@ mod crud; mod dedup; mod fixtures; mod mute; +mod saved; mod search; diff --git a/crates/void-core/src/db/tests/saved.rs b/crates/void-core/src/db/tests/saved.rs new file mode 100644 index 0000000..4d8bc57 --- /dev/null +++ b/crates/void-core/src/db/tests/saved.rs @@ -0,0 +1,130 @@ +use std::collections::HashSet; + +use super::fixtures::*; + +#[test] +fn reconcile_saved_marks_correct_messages() { + let db = test_db(); + let conv = make_conversation("c1", "test-slack", "C123"); + db.upsert_conversation(&conv).unwrap(); + + let mut m1 = make_message("m1", "c1", "test-slack", "saved one", 1_000); + m1.external_id = "ts-1".into(); + db.upsert_message(&m1).unwrap(); + + let mut m2 = make_message("m2", "c1", "test-slack", "not saved", 2_000); + m2.external_id = "ts-2".into(); + db.upsert_message(&m2).unwrap(); + + let saved = HashSet::from(["ts-1".to_string()]); + let (newly_saved, newly_unsaved) = db + .reconcile_saved("test-slack", "slack", &saved) + .unwrap(); + assert_eq!(newly_saved, 1); + assert_eq!(newly_unsaved, 0); + + let loaded = db.get_message("m1").unwrap().unwrap(); + assert!(loaded.is_saved); + let other = db.get_message("m2").unwrap().unwrap(); + assert!(!other.is_saved); +} + +#[test] +fn reconcile_saved_clears_previously_saved() { + let db = test_db(); + let conv = make_conversation("c1", "test-slack", "C123"); + db.upsert_conversation(&conv).unwrap(); + + let mut m1 = make_message("m1", "c1", "test-slack", "was saved", 1_000); + m1.external_id = "ts-1".into(); + db.upsert_message(&m1).unwrap(); + + let saved = HashSet::from(["ts-1".to_string()]); + db.reconcile_saved("test-slack", "slack", &saved) + .unwrap(); + + let empty = HashSet::new(); + let (newly_saved, newly_unsaved) = db + .reconcile_saved("test-slack", "slack", &empty) + .unwrap(); + assert_eq!(newly_saved, 0); + assert_eq!(newly_unsaved, 1); + + let loaded = db.get_message("m1").unwrap().unwrap(); + assert!(!loaded.is_saved); +} + +#[test] +fn list_saved_returns_only_saved() { + let db = test_db(); + let conv = make_conversation("c1", "test-slack", "C123"); + db.upsert_conversation(&conv).unwrap(); + + let mut m1 = make_message("m1", "c1", "test-slack", "saved", 1_000); + m1.external_id = "ts-1".into(); + db.upsert_message(&m1).unwrap(); + + let mut m2 = make_message("m2", "c1", "test-slack", "not saved", 2_000); + m2.external_id = "ts-2".into(); + db.upsert_message(&m2).unwrap(); + + db.reconcile_saved("test-slack", "slack", &HashSet::from(["ts-1".to_string()])) + .unwrap(); + + let (rows, total) = db.list_saved_messages(None, None, 50, 0).unwrap(); + assert_eq!(total, 1); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].id, "m1"); +} + +#[test] +fn list_saved_respects_connection_filter() { + let db = test_db(); + let conv1 = make_conversation("c1", "work-slack", "C1"); + let conv2 = make_conversation("c2", "home-slack", "C2"); + db.upsert_conversation(&conv1).unwrap(); + db.upsert_conversation(&conv2).unwrap(); + + let mut m1 = make_message("m1", "c1", "work-slack", "work saved", 1_000); + m1.external_id = "ts-1".into(); + db.upsert_message(&m1).unwrap(); + + let mut m2 = make_message("m2", "c2", "home-slack", "home saved", 2_000); + m2.external_id = "ts-2".into(); + db.upsert_message(&m2).unwrap(); + + db.reconcile_saved("work-slack", "slack", &HashSet::from(["ts-1".to_string()])) + .unwrap(); + db.reconcile_saved("home-slack", "slack", &HashSet::from(["ts-2".to_string()])) + .unwrap(); + + let (rows, total) = db + .list_saved_messages(Some("work"), None, 50, 0) + .unwrap(); + assert_eq!(total, 1); + assert_eq!(rows[0].connection_id, "work-slack"); +} + +#[test] +fn count_saved_matches_list() { + let db = test_db(); + let conv = make_conversation("c1", "test-slack", "C123"); + db.upsert_conversation(&conv).unwrap(); + + for (id, ext, ts) in [("m1", "ts-1", 1_000), ("m2", "ts-2", 2_000)] { + let mut msg = make_message(id, "c1", "test-slack", "saved", ts); + msg.external_id = ext.into(); + db.upsert_message(&msg).unwrap(); + } + + db.reconcile_saved( + "test-slack", + "slack", + &HashSet::from(["ts-1".to_string(), "ts-2".to_string()]), + ) + .unwrap(); + + let (rows, total) = db.list_saved_messages(None, None, 50, 0).unwrap(); + assert_eq!(total, 2); + assert_eq!(rows.len(), 2); +} diff --git a/crates/void-core/src/hooks/tests.rs b/crates/void-core/src/hooks/tests.rs index 3205d17..6294d10 100644 --- a/crates/void-core/src/hooks/tests.rs +++ b/crates/void-core/src/hooks/tests.rs @@ -193,6 +193,7 @@ fn expand_placeholders_with_message() { timestamp: 1_700_000_000, synced_at: None, is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: None, diff --git a/crates/void-core/src/models/message.rs b/crates/void-core/src/models/message.rs index 3dab802..83fa691 100644 --- a/crates/void-core/src/models/message.rs +++ b/crates/void-core/src/models/message.rs @@ -35,6 +35,8 @@ pub struct Message { #[serde(with = "epoch_iso8601_opt")] pub synced_at: Option, pub is_archived: bool, + #[serde(default)] + pub is_saved: bool, pub reply_to_id: Option, pub media_type: Option, pub metadata: Option, diff --git a/crates/void-core/src/models/tests.rs b/crates/void-core/src/models/tests.rs index 1f0dd5f..76e6b79 100644 --- a/crates/void-core/src/models/tests.rs +++ b/crates/void-core/src/models/tests.rs @@ -46,6 +46,7 @@ fn message_serialization_roundtrip() { timestamp: 1_700_000_000, synced_at: Some(1_700_000_010), is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: None, @@ -256,6 +257,7 @@ fn make_msg_ts(id: &str, ts: i64, ctx_id: Option<&str>) -> Message { timestamp: ts, synced_at: None, is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: None, diff --git a/crates/void-core/src/test_fixtures.rs b/crates/void-core/src/test_fixtures.rs index 8c7f0e9..870a1b9 100644 --- a/crates/void-core/src/test_fixtures.rs +++ b/crates/void-core/src/test_fixtures.rs @@ -49,6 +49,7 @@ pub fn make_message(id: &str, conv_id: &str, connection_id: &str, body: &str, ts timestamp: ts, synced_at: None, is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: None, @@ -77,6 +78,7 @@ pub fn make_message_with_sender( timestamp: ts, synced_at: None, is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: None, diff --git a/crates/void-gmail/src/connector/sync.rs b/crates/void-gmail/src/connector/sync.rs index 0aeb77c..806c517 100644 --- a/crates/void-gmail/src/connector/sync.rs +++ b/crates/void-gmail/src/connector/sync.rs @@ -316,6 +316,7 @@ impl GmailConnector { .label_ids .as_ref() .is_some_and(|labels| labels.iter().any(|l| l == "INBOX")), + is_saved: false, reply_to_id: msg .get_header("In-Reply-To") .map(|v| format!("{}-{v}", connection_id)), diff --git a/crates/void-gmail/src/connector/tests.rs b/crates/void-gmail/src/connector/tests.rs index 5d7874a..a649aee 100644 --- a/crates/void-gmail/src/connector/tests.rs +++ b/crates/void-gmail/src/connector/tests.rs @@ -196,6 +196,7 @@ async fn initial_sync_saves_history_id() { .unwrap_or(0), synced_at: None, is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: None, @@ -328,6 +329,7 @@ async fn incremental_sync_uses_history_id() { .unwrap_or(0), synced_at: None, is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: None, diff --git a/crates/void-googlenews/src/connector/sync.rs b/crates/void-googlenews/src/connector/sync.rs index 5a1f893..e095008 100644 --- a/crates/void-googlenews/src/connector/sync.rs +++ b/crates/void-googlenews/src/connector/sync.rs @@ -251,6 +251,7 @@ fn build_message( timestamp, synced_at: Some(chrono::Utc::now().timestamp()), is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: Some(metadata), diff --git a/crates/void-hackernews/src/connector/sync.rs b/crates/void-hackernews/src/connector/sync.rs index 50c5e46..7da1371 100644 --- a/crates/void-hackernews/src/connector/sync.rs +++ b/crates/void-hackernews/src/connector/sync.rs @@ -240,6 +240,7 @@ fn build_message(item: &HnItem, connection_id: &str, conv_id: &str) -> Message { timestamp: item.time.unwrap_or_else(|| chrono::Utc::now().timestamp()), synced_at: Some(chrono::Utc::now().timestamp()), is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: Some(metadata), diff --git a/crates/void-linkedin/src/connector/posts_sync.rs b/crates/void-linkedin/src/connector/posts_sync.rs index bb0e1fd..fc48168 100644 --- a/crates/void-linkedin/src/connector/posts_sync.rs +++ b/crates/void-linkedin/src/connector/posts_sync.rs @@ -408,6 +408,7 @@ fn comment_to_void(input: CommentVoidInput<'_>) -> Message { timestamp: comment_timestamp(comment), synced_at: Some(chrono::Utc::now().timestamp()), is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: Some(serde_json::json!({ diff --git a/crates/void-linkedin/src/connector/sync.rs b/crates/void-linkedin/src/connector/sync.rs index f8d0628..70185e7 100644 --- a/crates/void-linkedin/src/connector/sync.rs +++ b/crates/void-linkedin/src/connector/sync.rs @@ -571,6 +571,7 @@ pub(super) fn message_to_void( timestamp: extract::parse_timestamp(msg.timestamp.as_deref()), synced_at: Some(chrono::Utc::now().timestamp()), is_archived: false, + is_saved: false, reply_to_id: None, media_type: extract::extract_media_type(msg), metadata: build_message_metadata(msg, profile), diff --git a/crates/void-slack/src/api/conversations.rs b/crates/void-slack/src/api/conversations.rs index 8726daf..193cf0a 100644 --- a/crates/void-slack/src/api/conversations.rs +++ b/crates/void-slack/src/api/conversations.rs @@ -214,4 +214,27 @@ impl SlackApiClient { debug!(channel_id = ?result.channel.id, "slack: conversations.open success"); Ok(result) } + + /// Search for messages saved for later (`is:saved` modifier). + pub async fn search_messages_saved( + &self, + cursor: Option<&str>, + count: u32, + ) -> Result { + let mut params: Vec<(&str, String)> = vec![ + ("query", "is:saved".into()), + ("sort", "timestamp".into()), + ("sort_dir", "desc".into()), + ("count", count.to_string()), + ]; + if let Some(c) = cursor { + params.push(("cursor", c.to_string())); + } + self.get_with_retry( + &format!("{}/search.messages", self.base_url), + ¶ms, + "search.messages (saved)", + ) + .await + } } diff --git a/crates/void-slack/src/api/tests.rs b/crates/void-slack/src/api/tests.rs index 590a709..dc2b3fe 100644 --- a/crates/void-slack/src/api/tests.rs +++ b/crates/void-slack/src/api/tests.rs @@ -199,3 +199,35 @@ async fn malformed_history_missing_messages_is_clean_err() { "got {err:?}" ); } + +#[tokio::test] +async fn search_messages_saved_parses_response() { + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/search.messages")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "ok": true, + "messages": { + "matches": [ + { + "ts": "1700000000.000100", + "channel": {"id": "C1"}, + "user": "U1", + "text": "saved item", + "permalink": "https://example.slack.com/archives/C1/p1700000000000100" + } + ], + "pagination": {"next_cursor": "cursor-2"} + }, + "response_metadata": {"next_cursor": "cursor-2"} + }))) + .mount(&server) + .await; + + let api = SlackApiClient::with_base_url("xoxp-test", &server.uri()).unwrap(); + let resp = api.search_messages_saved(None, 20).await.unwrap(); + assert_eq!(resp.messages.matches.len(), 1); + assert_eq!(resp.messages.matches[0].ts, "1700000000.000100"); + assert_eq!(resp.messages.matches[0].channel.id, "C1"); + assert_eq!(resp.messages.matches[0].text.as_deref(), Some("saved item")); +} diff --git a/crates/void-slack/src/api/types.rs b/crates/void-slack/src/api/types.rs index 4b48bd2..9735c5e 100644 --- a/crates/void-slack/src/api/types.rs +++ b/crates/void-slack/src/api/types.rs @@ -202,3 +202,34 @@ pub struct FilesUploadUrlResponse { pub upload_url: String, pub file_id: String, } + +#[derive(Debug, Deserialize)] +pub struct SearchMessagesResponse { + pub messages: SearchMessagesMatches, + pub response_metadata: Option, +} + +#[derive(Debug, Deserialize)] +pub struct SearchMessagesMatches { + pub matches: Vec, + pub pagination: Option, +} + +#[derive(Debug, Deserialize)] +pub struct SearchMatch { + pub channel: SearchMatchChannel, + pub ts: String, + pub text: Option, + pub user: Option, + pub permalink: Option, +} + +#[derive(Debug, Deserialize)] +pub struct SearchMatchChannel { + pub id: String, +} + +#[derive(Debug, Deserialize)] +pub struct SearchPagination { + pub next_cursor: Option, +} diff --git a/crates/void-slack/src/connector/connector_trait.rs b/crates/void-slack/src/connector/connector_trait.rs index 99f8706..84e3d8c 100644 --- a/crates/void-slack/src/connector/connector_trait.rs +++ b/crates/void-slack/src/connector/connector_trait.rs @@ -75,6 +75,13 @@ impl Connector for SlackConnector { warn!(connection_id = %self.connection_id, error = %e, "Slack catch-up failed"); } } + if let Err(e) = self.sync_saved(&db).await { + warn!( + connection_id = %self.connection_id, + error = %e, + "saved sync failed (non-fatal)" + ); + } }; let (_, socket_result) = tokio::join!(backfill_task, self.run_socket_mode(&db, &cancel)); diff --git a/crates/void-slack/src/connector/mapping.rs b/crates/void-slack/src/connector/mapping.rs index 3ee8508..8b5b658 100644 --- a/crates/void-slack/src/connector/mapping.rs +++ b/crates/void-slack/src/connector/mapping.rs @@ -152,6 +152,7 @@ pub(crate) fn map_message_cached( timestamp: parse_ts(&msg.ts).unwrap_or(0), synced_at: None, is_archived: false, + is_saved: false, reply_to_id: msg .thread_ts .as_ref() diff --git a/crates/void-slack/src/connector/socket_mode.rs b/crates/void-slack/src/connector/socket_mode.rs index 8934bd9..c71fc13 100644 --- a/crates/void-slack/src/connector/socket_mode.rs +++ b/crates/void-slack/src/connector/socket_mode.rs @@ -172,6 +172,13 @@ impl SlackConnector { "catch-up after reconnect failed" ); } + if let Err(e) = self.sync_saved(db).await { + warn!( + connection_id = %self.connection_id, + error = %e, + "saved sync after reconnect failed" + ); + } void_core::status!( "[slack:{}] reconnecting Socket Mode in 2s...", @@ -361,8 +368,9 @@ impl SlackConnector { body: Some(body), timestamp, synced_at: None, - is_archived: false, - reply_to_id: thread_ts.map(|tts| format!("{}-{tts}", self.connection_id)), + is_archived: false, + is_saved: false, + reply_to_id: thread_ts.map(|tts| format!("{}-{tts}", self.connection_id)), media_type, metadata, context_id, diff --git a/crates/void-slack/src/connector/sync.rs b/crates/void-slack/src/connector/sync.rs index 281882d..4f6a136 100644 --- a/crates/void-slack/src/connector/sync.rs +++ b/crates/void-slack/src/connector/sync.rs @@ -254,6 +254,136 @@ impl SlackConnector { Ok(()) } + pub(crate) async fn sync_saved(&self, db: &Database) -> anyhow::Result<()> { + use std::collections::HashSet; + + use tracing::warn; + + info!(connection_id = %self.connection_id, "syncing Slack saved-for-later items"); + + let user_cache = self.prefetch_users().await?; + + let mut saved_external_ids = HashSet::new(); + let mut cursor: Option = None; + let page_size = 100u32; + let mut slack_matches = 0usize; + let mut ingested = 0usize; + + loop { + let resp = self + .api + .search_messages_saved(cursor.as_deref(), page_size) + .await?; + + for m in &resp.messages.matches { + slack_matches += 1; + if db + .find_message_by_external_id(&self.connection_id, &m.ts)? + .is_some() + { + saved_external_ids.insert(m.ts.clone()); + } else if self + .ingest_saved_match(db, &m.channel.id, &m.ts, &user_cache) + .await? + { + ingested += 1; + saved_external_ids.insert(m.ts.clone()); + } else { + warn!( + connection_id = %self.connection_id, + channel_id = %m.channel.id, + ts = %m.ts, + "saved message could not be ingested" + ); + } + } + + let next = resp + .response_metadata + .as_ref() + .and_then(|m| m.next_cursor.as_ref()) + .or_else(|| { + resp.messages + .pagination + .as_ref() + .and_then(|p| p.next_cursor.as_ref()) + }) + .filter(|c| !c.is_empty()) + .cloned(); + + if next.is_none() { + break; + } + cursor = next; + } + + let (newly_saved, newly_unsaved) = + db.reconcile_saved(&self.connection_id, "slack", &saved_external_ids)?; + + void_core::status!( + "[slack:{}] saved sync — {} matched ({} fetched), {} newly saved, {} unsaved", + self.connection_id, + saved_external_ids.len(), + ingested, + newly_saved, + newly_unsaved + ); + + info!( + connection_id = %self.connection_id, + slack_matches, + matched = saved_external_ids.len(), + ingested, + newly_saved, + newly_unsaved, + "saved-for-later sync complete" + ); + + Ok(()) + } + + /// Fetch a saved-for-later message from Slack and store it locally. + async fn ingest_saved_match( + &self, + db: &Database, + channel_id: &str, + ts: &str, + user_cache: &HashMap, + ) -> anyhow::Result { + use tracing::debug; + + debug!( + connection_id = %self.connection_id, + channel_id, + ts, + "fetching saved message not in local DB" + ); + + let slack_conv = self.api.conversations_info(channel_id).await?; + let conversation = map_conversation(&slack_conv, &self.connection_id, user_cache); + db.upsert_conversation(&conversation)?; + + let conv_id = conversation.id.clone(); + let Some(slack_msg) = self.api.get_single_message(channel_id, ts).await? else { + return Ok(false); + }; + + let Some(message) = map_message_cached( + &slack_msg, + &slack_conv, + &conv_id, + &self.connection_id, + user_cache, + ) else { + return Ok(false); + }; + + let mut batch = [message]; + self.download_message_files(&mut batch).await; + db.upsert_message(&batch[0])?; + Ok(true) + } + /// Backfill avatar URLs: first from the prefetched user cache, then resolve /// remaining unknown senders individually via `users.info`. async fn backfill_avatars(&self, db: &Database, user_cache: &HashMap) { diff --git a/crates/void-slack/src/connector/tests.rs b/crates/void-slack/src/connector/tests.rs index 70f50dc..cf71002 100644 --- a/crates/void-slack/src/connector/tests.rs +++ b/crates/void-slack/src/connector/tests.rs @@ -881,6 +881,7 @@ async fn catch_up_fetches_messages_since_latest() { timestamp: 1_741_700_000, synced_at: None, is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: None, @@ -909,6 +910,111 @@ async fn catch_up_fetches_messages_since_latest() { assert_eq!(new_msg.sender_name.as_deref(), Some("Alice")); } +#[tokio::test] +async fn sync_saved_fetches_missing_message_and_marks_saved() { + let server = wiremock::MockServer::start().await; + + let users = serde_json::json!({ + "ok": true, + "members": [ + { + "id": "U1", + "name": "alice", + "real_name": "Alice", + "profile": {"display_name": "Alice", "real_name": "Alice"} + } + ] + }); + wiremock::Mock::given(wiremock::matchers::method("GET")) + .and(wiremock::matchers::path("/users.list")) + .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(users)) + .mount(&server) + .await; + + let search = serde_json::json!({ + "ok": true, + "messages": { + "matches": [ + { + "ts": "1741700000.000100", + "channel": {"id": "C1"}, + "user": "U1", + "text": "Saved item" + } + ] + } + }); + wiremock::Mock::given(wiremock::matchers::method("GET")) + .and(wiremock::matchers::path("/search.messages")) + .and(wiremock::matchers::query_param("query", "is:saved")) + .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(search)) + .mount(&server) + .await; + + let channel_info = serde_json::json!({ + "ok": true, + "channel": { + "id": "C1", + "name": "general", + "is_channel": true, + "is_group": false, + "is_im": false, + "is_mpim": false, + "is_private": false + } + }); + wiremock::Mock::given(wiremock::matchers::method("GET")) + .and(wiremock::matchers::path("/conversations.info")) + .and(wiremock::matchers::query_param("channel", "C1")) + .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(channel_info)) + .mount(&server) + .await; + + let history = serde_json::json!({ + "ok": true, + "messages": [ + { + "ts": "1741700000.000100", + "user": "U1", + "text": "Saved item" + } + ] + }); + wiremock::Mock::given(wiremock::matchers::method("GET")) + .and(wiremock::matchers::path("/conversations.history")) + .and(wiremock::matchers::query_param("channel", "C1")) + .and(wiremock::matchers::query_param("latest", "1741700000.000100")) + .and(wiremock::matchers::query_param("oldest", "1741700000.000100")) + .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(history)) + .mount(&server) + .await; + + let connector = SlackConnector { + connection_id: "test-slack".to_string(), + api: crate::api::SlackApiClient::with_base_url("test-token", &server.uri()).unwrap(), + app_token: "xapp-test".to_string(), + app_id: None, + config_refresh_token: std::sync::Mutex::new(None), + config_path: None, + store_path: std::env::temp_dir(), + }; + + let db = void_core::db::Database::open_in_memory().unwrap(); + connector.sync_saved(&db).await.unwrap(); + + let msg = db + .get_message("test-slack-1741700000.000100") + .unwrap() + .expect("message should be ingested"); + assert_eq!(msg.body.as_deref(), Some("Saved item")); + assert!(msg.is_saved); + + let (rows, total) = db.list_saved_messages(None, None, 50, 0).unwrap(); + assert_eq!(total, 1); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].id, "test-slack-1741700000.000100"); +} + #[tokio::test] async fn start_sync_runs_catch_up_when_backfill_done() { let server = wiremock::MockServer::start().await; @@ -987,6 +1093,7 @@ async fn start_sync_runs_catch_up_when_backfill_done() { timestamp: 1_741_700_000, synced_at: None, is_archived: false, + is_saved: false, reply_to_id: None, media_type: None, metadata: None, diff --git a/crates/void-telegram/src/connector/sync.rs b/crates/void-telegram/src/connector/sync.rs index 07f5e91..5be96c6 100644 --- a/crates/void-telegram/src/connector/sync.rs +++ b/crates/void-telegram/src/connector/sync.rs @@ -267,6 +267,7 @@ fn tg_message_to_void( timestamp: msg.date().timestamp(), synced_at: Some(chrono::Utc::now().timestamp()), is_archived: false, + is_saved: false, reply_to_id, media_type: extract::extract_media_type(msg), metadata: extract::extract_media_metadata(msg), diff --git a/crates/void-whatsapp/src/connector/sync.rs b/crates/void-whatsapp/src/connector/sync.rs index 5268a13..a87caa1 100644 --- a/crates/void-whatsapp/src/connector/sync.rs +++ b/crates/void-whatsapp/src/connector/sync.rs @@ -160,6 +160,7 @@ pub(super) fn handle_history_sync( timestamp: *msg_ts, synced_at: None, is_archived: false, + is_saved: false, reply_to_id, media_type, metadata: media_metadata, @@ -283,6 +284,7 @@ pub(super) fn handle_message( timestamp: msg_ts, synced_at: None, is_archived: false, + is_saved: false, reply_to_id: extract_quoted_id(msg), media_type, metadata: media_metadata, diff --git a/docs/commands.md b/docs/commands.md index f843d8f..e9deec6 100644 --- a/docs/commands.md +++ b/docs/commands.md @@ -110,6 +110,7 @@ All Slack subcommands accept `--connection `. | `void slack schedule --channel --message --at