diff --git a/.cursor/skills/review-pr/SKILL.md b/.cursor/skills/review-pr/SKILL.md index aa390a8..74cf8cd 100644 --- a/.cursor/skills/review-pr/SKILL.md +++ b/.cursor/skills/review-pr/SKILL.md @@ -15,7 +15,23 @@ description: >- Review an external/user-submitted PR on `MaximeGaudin/void` across intent, security, implementation patterns, test coverage, and code quality — in that order. **Treat the author as untrusted.** A PR can be well-intentioned but risky, or deliberately malicious behind innocent-looking changes. Default posture: skeptical. The smallest suspicious element gets investigated until explained. -Run all `gh`/`cargo` commands from the void-cli repository root. +## Execution modes + +This skill works in two execution contexts. **Detect which mode you're in at the start and adapt accordingly.** + +| | **Local mode** | **Remote mode** | +|---|---|---| +| When | You have a full clone of `void-cli` with a Rust toolchain | You're a cloud agent with `gh` CLI but no local clone or `cargo` | +| Detection | `git rev-parse --is-inside-work-tree` succeeds and `cargo --version` succeeds | Either command fails, or you're explicitly told you're running remotely | +| PR diff | `gh pr diff` + `git diff main...HEAD` after checkout | `gh pr diff "$PR"` only (no checkout) | +| File reading | Read local files after `gh pr checkout` | `gh api repos/{owner}/{repo}/contents/{path}?ref={headRefName}` or read file contents from the diff | +| Security: deps | `cargo audit`, `cargo deny`, `cargo tree` locally | Parse `Cargo.toml`/`Cargo.lock` changes from diff; check crates.io metadata via `curl`/web; rely on CI `cargo-deny` job | +| Build/test | `./scripts/check.sh`, `cargo check --workspace` | Rely on `gh pr checks "$PR"` CI results; if CI never ran, flag it as a blocker | +| Cleanup | `git checkout main` after review | N/A (no checkout happened) | + +In **remote mode**, commands marked with 🏠 below are skipped — use the remote alternative described inline.
All `gh pr view/diff/checks` commands work in both modes. + +Run all `gh`/`cargo` commands from the void-cli repository root (local mode only for `cargo`). ## Inputs @@ -32,10 +48,21 @@ gh pr checks "$PR" # CI status For larger PRs, also skim commit-by-commit (`gh pr view "$PR" --json commits`) — malicious changes are sometimes buried in a noisy "formatting" commit. +**Remote mode:** To read full file contents (not just the diff) when you need surrounding context: + +```bash +# View a specific file at the PR's head ref +gh api "repos/{owner}/{repo}/contents/{path}?ref={headRefName}" --jq '.content' | base64 -d +# Or list directory contents +gh api "repos/{owner}/{repo}/contents/{directory}?ref={headRefName}" --jq '.[].path' +``` + #### Determine the *effective* delta (not just the raw diff) A PR's diff is only meaningful relative to a current `main`. Branches are often cut from an old `main`, which inflates and confuses the diff. Before reviewing, establish what is genuinely new: +**🏠 Local mode:** + ```bash git fetch origin main "$headRefName" gh pr view "$PR" --json mergeable,mergeStateStatus,commits @@ -46,6 +73,17 @@ gh pr view "$PR" --json commits --jq '.commits[].messageHeadline' \ gh pr diff "$PR" --name-only | while read -r f; do echo "== $f =="; git log origin/main --oneline -1 -- "$f"; done ``` +**Remote mode alternative:** Use `gh` API to compare commits without a local clone: + +```bash +gh pr view "$PR" --json mergeable,mergeStateStatus,commits +# Check for already-merged commits by comparing commit messages against main's log +gh api "repos/{owner}/{repo}/commits?sha=main&per_page=100" --jq '.[].commit.message' > /tmp/main_commits.txt +gh pr view "$PR" --json commits --jq '.commits[].messageHeadline' | while read -r h; do + grep -qF "$h" /tmp/main_commits.txt && echo "ALREADY ON MAIN: $h" +done +``` + - **`mergeable: CONFLICTING`** almost always means a stale base.
Find out *why* it conflicts — frequently because some of the PR's commits already landed on `main` separately, leaving only one genuinely-new commit buried in noise. - Review the **effective delta** (the novel commit[s]), and call out the redundant/already-merged commits as a focus problem (Step 1) rather than reviewing them as if new. @@ -53,13 +91,13 @@ gh pr diff "$PR" --name-only | while read -r f; do echo "== $f =="; git log orig ``` Review Progress: -- [ ] Step 0: Fetch PR metadata, diff, CI status +- [ ] Step 0: Detect execution mode (local vs remote) + Fetch PR metadata, diff, CI status - [ ] Step 1: Qualify intent & fit (CONTRIBUTING.md + philosophy) - [ ] Step 2: Security audit (paranoid, exhaustive) - [ ] Step 3: Code review (correctness, quality, conventions) - [ ] Step 3a: Implementation pattern conformance - [ ] Step 3b: Test coverage assessment -- [ ] Step 4: Verdict & report +- [ ] Step 4: Verdict & report (include mode-specific caveats) - [ ] Step 5: Capture lessons learned into this skill (always) ``` @@ -97,6 +135,8 @@ A new dependency is the single easiest way to slip attacker-controlled code into Inspect every change to `Cargo.toml`, `Cargo.lock`, and `deny.toml`, and enumerate exactly what entered the tree (direct **and** transitive): +**🏠 Local mode:** + ```bash gh pr diff "$PR" -- '**/Cargo.toml' 'Cargo.lock' 'deny.toml' # Every crate newly added to the lockfile (catches transitive deps the PR didn't name): @@ -108,6 +148,23 @@ cargo tree -i # who pulls it in, and why?
git checkout "$baseRefName" 2>/dev/null || git checkout main ``` +**Remote mode alternative:** + +```bash +# Extract dep changes from the diff (no checkout needed) +gh pr diff "$PR" -- '**/Cargo.toml' 'Cargo.lock' 'deny.toml' +# Parse newly added crates from the lockfile diff +gh pr diff "$PR" -- 'Cargo.lock' | rg '^\+name = ' | sort -u +# For each new crate, check crates.io metadata (publisher, downloads, repo link) +curl -s "https://crates.io/api/v1/crates/" | jq '{name: .crate.name, downloads: .crate.downloads, repository: .crate.repository, newest_version: .crate.newest_version, updated_at: .crate.updated_at}' +# Check for RUSTSEC advisories via the public API +curl -s "https://rustsec.org/advisories/" | rg '' +# Rely on CI cargo-deny job results (ripgrep alternation is a bare `|`; `\|` would match a literal pipe) +gh pr checks "$PR" | rg -i 'deny|audit|security' +``` + +> **Remote mode caveat:** Without `cargo tree`, you cannot locally verify transitive deps. If the CI pipeline includes `cargo deny` and it passes, note that in your report. If CI didn't run or lacks a deny step, flag it as a security gap that must be resolved before merge. + **For each newly added crate (direct or transitive), ALL of the following must hold. If any fails, it is a Blocker — push back before merge:** 1. **Necessary.** The PR cannot reasonably reach its goal with `std`, an already-vendored crate, or a few lines of local code. Reject deps added for trivial convenience (left-pad–class), or a heavyweight crate pulled in for one helper function. Ask explicitly: "what does *removing* this dependency cost?" @@ -134,10 +191,18 @@ Record a one-line justification per new crate in the report: `name · why necess Grep the diff (and changed files) for high-risk constructs, then read each hit in context: +**🏠 Local mode:** + ```bash gh pr checkout "$PR" ``` +**Remote mode alternative:** Work directly from the diff output — `gh pr diff "$PR"` contains the full patch.
For surrounding context on suspicious hunks, fetch the full file: + +```bash +gh api "repos/{owner}/{repo}/contents/{suspicious_file}?ref={headRefName}" --jq '.content' | base64 -d +``` + Search for, and justify every occurrence of: - **Exfiltration:** network calls in unexpected places (`reqwest`, `ureq`, `TcpStream`, raw sockets, DNS lookups) — especially near token/credential/message handling. Any URL or IP literal that isn't a known service API endpoint. @@ -152,16 +217,20 @@ Search for, and justify every occurrence of: - **Hooks:** changes to hook execution / `extra_args` handling that could broaden what the agent CLI is allowed to do. ```bash -# Example sweeps — read each hit, do not trust counts alone +# 🏠 Local mode: Example sweeps — read each hit, do not trust counts alone git diff "main...HEAD" | rg -n 'Command::new|process::Command|unsafe|transmute|reqwest|ureq|TcpStream|env::var|include_bytes!|build\.rs|base64|from_str_radix' git diff "main...HEAD" -- '.github/' + +# Remote mode: Same sweeps on the PR diff +gh pr diff "$PR" | rg -n 'Command::new|process::Command|unsafe|transmute|reqwest|ureq|TcpStream|env::var|include_bytes!|build\.rs|base64|from_str_radix' +gh pr diff "$PR" -- '.github/' ``` #### 2c. Data-handling review For connector code, confirm: credentials still written with restrictive perms, tokens never logged, message content not sent anywhere except the intended service API, no new persistence outside the store dir. -Return to the base branch when done: `git checkout main`. +🏠 Return to the base branch when done (local mode only): `git checkout main`. Output: **security verdict** — `clean` / `concerns` / `do-not-merge`, with each finding, its location (`file:line`), why it's risky, and what would resolve it. If anything is unexplained, the verdict cannot be `clean`. @@ -179,6 +248,8 @@ Only meaningful after Steps 1–2.
Review the diff for: For **removal / refactor PRs**, verify the change is *complete against current `main`*, not just internally consistent with its own diff: +**🏠 Local mode:** + ```bash # Leftover references anywhere in the tree (symbols, modules, commands, docs, config tables, issue templates) rg -n '' --glob '!CHANGELOG.md' @@ -187,6 +258,17 @@ rg -n '' --glob '!CHANGELOG.md' git ls-files crates// ``` +**Remote mode alternative:** + +```bash +# Search for leftover references in the repo at the PR's base ref +gh api "repos/{owner}/{repo}/git/trees/{baseRefName}?recursive=1" --jq '.tree[].path' | rg '' +# Check if a supposedly-deleted path still exists on main +gh api "repos/{owner}/{repo}/contents/crates/?ref=main" --jq '.[].path' 2>/dev/null +# Use GitHub code search for broader reference hunting +gh search code ' repo:{owner}/{repo}' --limit 20 +``` + - Removals routinely miss: `docs/configuration.md` tables, `docs/commands.md`, `.github/ISSUE_TEMPLATE/*`, the README feature line, and the CHANGELOG `Removed` entry. Grep docs for the feature name. - Exclude unrelated false-positives (e.g. a Slack `gdrive` filetype string is unrelated to a `void-gdrive` crate) — read each hit, don't trust the match. @@ -200,7 +282,9 @@ Start by identifying the closest analogue(s): ```bash gh pr diff "$PR" --name-only -# Then read 1–2 existing modules in the same crate (or an analogous connector) that solve the same problem +# 🏠 Local mode: Then read 1–2 existing modules in the same crate (or an analogous connector) that solve the same problem +# Remote mode: Fetch reference files via gh api +gh api "repos/{owner}/{repo}/contents/{path_to_analogous_module}?ref=main" --jq '.content' | base64 -d ``` | Area | Expected pattern in this repo | Reject / push back if… | @@ -256,6 +340,8 @@ Treat **insufficient for merge** as a **Blocker**.
Treat **needs more** as **Blo Verify CI rather than trusting it blindly: +**🏠 Local mode:** + ```bash gh pr checks "$PR" # If unsure, reproduce locally on the PR branch: @@ -265,6 +351,17 @@ cargo +1.95.0 check --workspace --locked # MSRV gate (match rust-version in C git checkout main ``` +**Remote mode alternative:** + +```bash +gh pr checks "$PR" +# If CI didn't run, that itself is a finding — you cannot verify locally in remote mode +# Check for specific check names that indicate full coverage +gh pr checks "$PR" --json name,state,conclusion --jq '.[] | "\(.name): \(.conclusion // .state)"' +``` + +> **Remote mode:** If CI never ran ("no checks reported") and you cannot reproduce locally, flag this as a **Blocker** in your report — the PR cannot be verified without either triggering CI (push/re-push) or a local checkout. + Reading CI state: - **"no checks reported"** = CI never ran for this branch tip (often a stale branch that was never re-pushed). Don't read it as "passing" — reproduce locally or push to trigger. @@ -288,6 +385,7 @@ Produce a single report. Lead with the recommendation. **Recommendation:** Merge / Request changes / Do not merge / Needs discussion **Author:** · +/- across files · CI: +**Review mode:** Local / Remote ## 1. Intent & fit — <1–3 sentences of reasoning> @@ -313,6 +411,9 @@ Produce a single report. Lead with the recommendation. ### Nits - — `path:line` +## 4. Verification gaps (remote mode only — omit if local) +- — — + ## Summary <2–4 sentences: overall quality, what must change before merge, any follow-ups> ``` @@ -333,23 +434,34 @@ Produce a single report. Lead with the recommendation. ## Guardrails - **Read-only by default.** Reviewing means inspecting, not editing the PR — except for Step 5, which always updates *this skill file*. Do not push to the author's branch or merge unless the user explicitly asks.
-- **Never run untrusted PR code outside vetting commands.** `cargo build`/`test` on a PR branch executes that branch's `build.rs`, proc-macros, and test code on this machine. Complete Step 2a (inspect `build.rs`, deps) *before* building. If anything looks malicious, do **not** build/test locally — report instead. -- Always return to `main` after `gh pr checkout`. +- **Never run untrusted PR code outside vetting commands.** `cargo build`/`test` on a PR branch executes that branch's `build.rs`, proc-macros, and test code on this machine. Complete Step 2a (inspect `build.rs`, deps) *before* building. If anything looks malicious, do **not** build/test locally — report instead. (In remote mode, this risk doesn't apply since you never checkout/build.) +- 🏠 Always return to `main` after `gh pr checkout` (local mode only). - A `clean` security verdict requires that **every** anomaly was explained. Unexplained ≠ clean. - **Every new dependency is guilty until proven necessary _and_ reputable** (Step 2a). Vet direct and transitive crates; an unjustified or obscure dep blocks the merge regardless of how clean the code looks. - Don't let a polished diff lower your guard — clean code is the easiest place to hide a malicious line. - Be specific: every finding cites `file:line` and explains impact, not just "looks off". +- **Remote mode limitations are findings, not excuses.** If you cannot verify something due to running remotely (e.g., no `cargo tree`, no local test run, CI didn't execute), explicitly state it in the report as a gap. Never mark something `clean` just because you couldn't check it.
## Quick reference +### Both modes + ```bash PR= gh pr view "$PR" --json title,body,author,additions,deletions,changedFiles,url gh pr diff "$PR" gh pr checks "$PR" -# Security sweeps (inspect deps BEFORE checkout/build) +# Security sweeps (diff-based, works everywhere) gh pr diff "$PR" -- '**/Cargo.toml' 'Cargo.lock' 'deny.toml' '.github/' +gh pr diff "$PR" -- 'Cargo.lock' | rg '^\+name = ' | sort -u # new crates from diff +gh pr diff "$PR" | rg -n 'Command::new|unsafe|transmute|reqwest|ureq|env::var|build\.rs|base64' +``` + +### 🏠 Local mode only + +```bash +# After security diff review clears, checkout and run local tools git diff "main...HEAD" -- Cargo.lock | rg '^\+name = ' | sort -u # every new (incl. transitive) crate gh pr checkout "$PR" cargo audit && cargo deny check advisories licenses bans sources @@ -362,6 +474,20 @@ cargo +"$(rg '^rust-version' Cargo.toml | grep -oE '[0-9]+\.[0-9]+(\.[0-9]+)?')" git checkout main ``` +### Remote mode only + +```bash +# Dep vetting without cargo +gh pr diff "$PR" -- 'Cargo.lock' | rg '^\+name = ' | sed 's/+name = "//;s/"//' | while read -r crate; do + echo "== $crate ==" + curl -s "https://crates.io/api/v1/crates/$crate" | jq '{downloads: .crate.downloads, repo: .crate.repository, updated: .crate.updated_at}' +done +# Read specific files at the PR's head +gh api "repos/{owner}/{repo}/contents/{path}?ref={headRefName}" --jq '.content' | base64 -d +# Check CI thoroughly since you can't run locally +gh pr checks "$PR" --json name,state,conclusion --jq '.[] | "\(.name): \(.conclusion // .state)"' +``` + ## Lessons learned Append new, durable review insights here (newest first), per Step 5. 
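The detection rule in the skill's "Execution modes" table (both `git rev-parse --is-inside-work-tree` and `cargo --version` must succeed for local mode) could be sketched as a small shell function. This is a minimal sketch and not part of the PR: the function name `detect_review_mode` is hypothetical, and it does not cover the case where the agent is explicitly told it is running remotely.

```bash
# Hypothetical helper (not in the PR): applies the detection rule from the
# "Execution modes" table. Prints "local" only when both probes succeed;
# any failure falls back to "remote".
detect_review_mode() {
  if git rev-parse --is-inside-work-tree >/dev/null 2>&1 \
    && cargo --version >/dev/null 2>&1; then
    echo "local"
  else
    echo "remote"
  fi
}

mode="$(detect_review_mode)"
echo "Review mode: $mode"
```

Falling back to `remote` on any failure matches the table's "either command fails" wording, so a missing toolchain skips the local-only steps instead of letting them error out mid-review.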
diff --git a/CHANGELOG.md b/CHANGELOG.md index 09f44a2..0220b8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Reddit** — New connector that polls watched subreddits and surfaces posts matching your keywords and minimum score (one channel conversation per subreddit). Read-only mode uses application-only OAuth (`client_id` + `client_secret`); enabling commenting during `void setup` runs a browser OAuth flow, stores a `refresh_token`, syncs matching posts as comment threads, and lets you reply via `void reply` / `void send --via reddit`. Tune filters at runtime with `void reddit subreddits|keywords|min-score|config`. - **Slack** — Sync "Saved for Later" items from the Later view during background sync; view with `void slack saved`. Setup wizard documents the required `search:read` scope. - **Google News** — New read-only connector that watches the public Google News RSS feed. Each configured keyword triggers its own search; matching articles land in your inbox, filtered by a recency window. Configure with `void gn keywords`, `void gn when`, `void gn language`, and `void gn country` (or interactively via `void setup`). Language/country default to `fr`/`FR`; add one connection per edition to follow several.
diff --git a/Cargo.lock b/Cargo.lock index 44d68e0..1daee20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4430,6 +4430,7 @@ dependencies = [ "insta", "inventory", "libc", + "open", "predicates", "serde", "serde_json", @@ -4440,6 +4441,7 @@ dependencies = [ "toml 1.1.2+spec-1.1.0", "tracing", "tracing-subscriber", + "urlencoding", "uuid", "void-calendar", "void-core", @@ -4448,6 +4450,7 @@ dependencies = [ "void-googlenews", "void-hackernews", "void-linkedin", + "void-reddit", "void-slack", "void-telegram", "void-whatsapp", @@ -4579,6 +4582,27 @@ dependencies = [ "wiremock", ] +[[package]] +name = "void-reddit" +version = "0.10.3" +dependencies = [ + "anyhow", + "async-trait", + "base64", + "chrono", + "reqwest", + "serde", + "serde_json", + "tempfile", + "thiserror 2.0.18", + "tokio", + "tokio-util", + "tracing", + "urlencoding", + "void-core", + "wiremock", +] + [[package]] name = "void-slack" version = "0.10.3" diff --git a/Cargo.toml b/Cargo.toml index 3378f87..79f829f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "crates/void-hackernews", "crates/void-googlenews", "crates/void-linkedin", + "crates/void-reddit", "crates/void-github", ] @@ -85,4 +86,5 @@ void-telegram = { path = "crates/void-telegram" } void-hackernews = { path = "crates/void-hackernews" } void-googlenews = { path = "crates/void-googlenews" } void-linkedin = { path = "crates/void-linkedin" } +void-reddit = { path = "crates/void-reddit" } void-github = { path = "crates/void-github" } diff --git a/README.md b/README.md index 008a2b0..a91b093 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ [![License: AGPL-3.0](https://img.shields.io/badge/license-AGPL--3.0-blue.svg)](LICENSE) [![Rust](https://img.shields.io/badge/rust-1.95%2B-orange.svg)](Cargo.toml) -**One inbox for everything.** `void` unifies WhatsApp, Telegram, Slack, Gmail, Google Calendar, LinkedIn, GitHub, Hacker News, and Google News into a single local-first command-line tool — one inbox, one search index,
one set of commands. +**One inbox for everything.** `void` unifies WhatsApp, Telegram, Slack, Gmail, Google Calendar, LinkedIn, GitHub, Hacker News, Google News, and Reddit into a single local-first command-line tool — one inbox, one search index, one set of commands. It is built for terminals, shell scripts, and AI agents: @@ -116,6 +116,18 @@ void hn keywords add "rust,local-first" void hn min-score 100 ``` +### Reddit + +Keyword- and score-filtered posts from watched subreddits land in your inbox like any other message. Enable commenting during `void setup` to sync thread comments and reply from the CLI (browser OAuth, stores `refresh_token`). + +```bash +void reddit subreddits add "rust,programming" +void reddit keywords add "ai,llm" +void reddit min-score 50 +void reddit config +void reply --message "Thanks!" +``` + ### Google News Keyword-watched articles from the public Google News RSS feed land in your inbox — one search per keyword, filtered by recency: @@ -150,14 +162,14 @@ A background daemon keeps a local SQLite database in sync with every connected s void ◄──────────► SQLite (FTS5) ◄────── sync daemon ──────► services │ WhatsApp │ Telegram │ Slack ──── push (WebSocket / MTProto) - Gmail │ Calendar │ LinkedIn │ GitHub │ HN │ Google News ──── polling + Gmail │ Calendar │ LinkedIn │ GitHub │ HN │ Google News │ Reddit ──── polling ``` | Crate | Role | |-------|------| | `void-core` | Config, database, models, hooks, `Connector` trait, sync engine | | `void-cli` | The `void` binary: clap commands, output formatting | -| `void-slack`, `void-gmail`, `void-calendar`, `void-whatsapp`, `void-telegram`, `void-hackernews`, `void-googlenews`, `void-linkedin`, `void-github` | One crate per connector | +| `void-slack`, `void-gmail`, `void-calendar`, `void-whatsapp`, `void-telegram`, `void-hackernews`, `void-googlenews`, `void-linkedin`, `void-github`, `void-reddit` | One crate per connector | All data stays on your machine in
`~/.local/share/void` — no external database, no Docker, no cloud. Layout details: [Configuration](docs/configuration.md#data-storage-layout). diff --git a/crates/void-cli/Cargo.toml b/crates/void-cli/Cargo.toml index 45a38f2..2d7b567 100644 --- a/crates/void-cli/Cargo.toml +++ b/crates/void-cli/Cargo.toml @@ -20,6 +20,7 @@ void-telegram = { workspace = true } void-hackernews = { workspace = true } void-googlenews = { workspace = true } void-linkedin = { workspace = true } +void-reddit = { workspace = true } void-github = { workspace = true } clap = { workspace = true } tokio = { workspace = true } @@ -34,6 +35,8 @@ libc = { workspace = true } croner = { workspace = true } sysinfo = { workspace = true } uuid = { workspace = true } +open = { workspace = true } +urlencoding = { workspace = true } inventory = { workspace = true } toml = { workspace = true } diff --git a/crates/void-cli/src/cli.rs b/crates/void-cli/src/cli.rs index ad83bc7..ad6058b 100644 --- a/crates/void-cli/src/cli.rs +++ b/crates/void-cli/src/cli.rs @@ -67,6 +67,9 @@ pub(crate) enum Command { Hn(commands::hackernews::HackerNewsArgs), /// Google News configuration (keywords, recency, language, country) Gn(commands::googlenews::GoogleNewsArgs), + /// Reddit configuration (subreddits, keywords, min-score) + #[command(name = "reddit", alias = "rd")] + Reddit(commands::reddit::RedditArgs), /// Slack-specific operations (react, edit, schedule, open, forward) Slack(commands::slack::SlackArgs), /// WhatsApp-specific operations (media download) @@ -186,6 +189,7 @@ async fn async_main(cli: Cli) -> anyhow::Result<()> { Some(Command::Gmail(args)) => commands::gmail::run(args).await, Some(Command::Hn(args)) => Ok(commands::hackernews::run(args)?), Some(Command::Gn(args)) => Ok(commands::googlenews::run(args)?), + Some(Command::Reddit(args)) => Ok(commands::reddit::run(args)?), Some(Command::Slack(args)) => commands::slack::run(args).await, Some(Command::Whatsapp(args)) => commands::whatsapp::run(args).await,
Some(Command::Telegram(args)) => commands::telegram::run(args).await, @@ -287,6 +291,13 @@ mod tests { assert!(context::runs_with_local_cache(cmd)); } + #[test] + fn reddit_keywords_list_uses_local_cache() { + let cli = parse(&["void", "reddit", "keywords", "list"]); + let cmd = cli.command.as_ref().expect("command"); + assert!(context::runs_with_local_cache(cmd)); + } + #[test] fn slack_saved_uses_local_cache() { let cli = parse(&["void", "slack", "saved"]); @@ -294,6 +305,13 @@ mod tests { assert!(context::runs_with_local_cache(cmd)); } + #[test] + fn reddit_min_score_uses_remote_proxy_in_remote_mode() { + let cli = parse(&["void", "reddit", "min-score", "100"]); + let cmd = cli.command.as_ref().expect("command"); + assert!(!context::runs_with_local_cache(cmd)); + } + #[test] fn parse_slack_saved_minimal() { let cli = parse(&["void", "slack", "saved"]); diff --git a/crates/void-cli/src/commands/mod.rs b/crates/void-cli/src/commands/mod.rs index 189a53f..0d6efe2 100644 --- a/crates/void-cli/src/commands/mod.rs +++ b/crates/void-cli/src/commands/mod.rs @@ -14,6 +14,7 @@ pub mod linkedin; pub mod messages; pub mod mute; pub mod pagination; +pub mod reddit; pub mod remote; pub mod reply; pub mod resolve; diff --git a/crates/void-cli/src/commands/reddit.rs b/crates/void-cli/src/commands/reddit.rs new file mode 100644 index 0000000..d608d29 --- /dev/null +++ b/crates/void-cli/src/commands/reddit.rs @@ -0,0 +1,312 @@ +use clap::{Args, Subcommand}; + +use void_core::config::{ + redact_token, settings_set_string_list, settings_set_u32, settings_string, + settings_string_list, settings_u32, ConnectionConfig, VoidConfig, +}; +use void_core::models::ConnectorType; +use void_reddit::api::sanitize_subreddit; + +#[derive(Debug, Args)] +pub struct RedditArgs { + #[command(subcommand)] + pub command: RedditCommand, +} + +#[derive(Debug, Subcommand)] +pub enum RedditCommand { + /// Show current Reddit configuration + Config, + /// Manage watched subreddits + 
Subreddits(SubredditsArgs), + /// Manage watched keywords + Keywords(KeywordsArgs), + /// Set the minimum score threshold for posts + MinScore(MinScoreArgs), +} + +#[derive(Debug, Args)] +pub struct SubredditsArgs { + #[command(subcommand)] + pub action: SubredditsAction, +} + +#[derive(Debug, Subcommand)] +pub enum SubredditsAction { + /// List watched subreddits + List, + /// Add one or more subreddits (comma-separated) + Add(SubredditValue), + /// Remove one or more subreddits (comma-separated) + Remove(SubredditValue), + /// Replace all subreddits (comma-separated) + Set(SubredditValue), +} + +#[derive(Debug, Args)] +pub struct SubredditValue { + /// Subreddits (comma-separated, r/ prefix optional) + #[arg(default_value = "")] + pub value: String, +} + +#[derive(Debug, Args)] +pub struct KeywordsArgs { + #[command(subcommand)] + pub action: KeywordsAction, +} + +#[derive(Debug, Subcommand)] +pub enum KeywordsAction { + /// List current keywords + List, + /// Add one or more keywords (comma-separated) + Add(KeywordValue), + /// Remove one or more keywords (comma-separated) + Remove(KeywordValue), + /// Replace all keywords (comma-separated, or empty to clear) + Set(KeywordValue), +} + +#[derive(Debug, Args)] +pub struct KeywordValue { + /// Keywords (comma-separated) + #[arg(default_value = "")] + pub value: String, +} + +#[derive(Debug, Args)] +pub struct MinScoreArgs { + /// New minimum score + pub score: u32, +} + +struct RedditSettings { + client_id: String, + client_secret: String, + subreddits: Vec<String>, + keywords: Vec<String>, + min_score: u32, +} + +pub fn run(args: &RedditArgs) -> anyhow::Result<()> { + match &args.command { + RedditCommand::Config => run_config(), + RedditCommand::Subreddits(sr) => run_subreddits(sr), + RedditCommand::Keywords(kw) => run_keywords(kw), + RedditCommand::MinScore(s) => run_min_score(s), + } +} + +fn run_config() -> anyhow::Result<()> { + let cfg = crate::context::void_config(); + let s = get_reddit_settings(cfg)?; + + let out =
serde_json::json!({ + "data": { + "client_id": redact_token(&s.client_id), + "client_secret": redact_token(&s.client_secret), + "subreddits": s.subreddits, + "keywords": s.keywords, + "min_score": s.min_score, + }, + "error": null, + }); + println!("{}", serde_json::to_string_pretty(&out)?); + Ok(()) +} + +fn run_subreddits(args: &SubredditsArgs) -> anyhow::Result<()> { + if matches!(args.action, SubredditsAction::List) { + let cfg = crate::context::void_config(); + let s = get_reddit_settings(cfg)?; + let out = serde_json::json!({ "data": s.subreddits, "error": null }); + println!("{}", serde_json::to_string_pretty(&out)?); + return Ok(()); + } + + let config_path = crate::context::client_config_path(); + let mut cfg = VoidConfig::load(&config_path) + .map_err(|e| anyhow::anyhow!("Cannot load config: {e}\nRun `void setup` first."))?; + + let mut subreddits = get_reddit_settings(&cfg)?.subreddits; + + match &args.action { + SubredditsAction::List => return Ok(()), + SubredditsAction::Add(v) => { + for sub in parse_subreddits(&v.value) { + if !subreddits.contains(&sub) { + subreddits.push(sub); + } + } + } + SubredditsAction::Remove(v) => { + let remove = parse_subreddits(&v.value); + subreddits.retain(|s| !remove.contains(s)); + } + SubredditsAction::Set(v) => { + subreddits = parse_subreddits(&v.value); + } + } + + if subreddits.is_empty() { + anyhow::bail!("At least one subreddit is required"); + } + + set_reddit_subreddits(&mut cfg, subreddits.clone())?; + cfg.save(&config_path)?; + + let out = serde_json::json!({ "data": subreddits, "error": null }); + println!("{}", serde_json::to_string_pretty(&out)?); + eprintln!("Restart `void sync` for changes to take effect."); + Ok(()) +} + +fn run_keywords(args: &KeywordsArgs) -> anyhow::Result<()> { + if matches!(args.action, KeywordsAction::List) { + let cfg = crate::context::void_config(); + let s = get_reddit_settings(cfg)?; + let out = serde_json::json!({ "data": s.keywords, "error": null }); + println!("{}", 
serde_json::to_string_pretty(&out)?); + return Ok(()); + } + + let config_path = crate::context::client_config_path(); + let mut cfg = VoidConfig::load(&config_path) + .map_err(|e| anyhow::anyhow!("Cannot load config: {e}\nRun `void setup` first."))?; + + let mut keywords = get_reddit_settings(&cfg)?.keywords; + + match &args.action { + KeywordsAction::List => return Ok(()), + KeywordsAction::Add(v) => { + for kw in parse_keywords(&v.value) { + if !keywords.contains(&kw) { + keywords.push(kw); + } + } + } + KeywordsAction::Remove(v) => { + let remove = parse_keywords(&v.value); + keywords.retain(|k| !remove.contains(k)); + } + KeywordsAction::Set(v) => { + keywords = parse_keywords(&v.value); + } + } + + set_reddit_keywords(&mut cfg, keywords.clone())?; + cfg.save(&config_path)?; + + let out = serde_json::json!({ "data": keywords, "error": null }); + println!("{}", serde_json::to_string_pretty(&out)?); + eprintln!("Restart `void sync` for changes to take effect."); + Ok(()) +} + +fn run_min_score(args: &MinScoreArgs) -> anyhow::Result<()> { + let config_path = crate::context::client_config_path(); + let mut cfg = VoidConfig::load(&config_path) + .map_err(|e| anyhow::anyhow!("Cannot load config: {e}\nRun `void setup` first."))?; + + set_reddit_min_score(&mut cfg, args.score)?; + cfg.save(&config_path)?; + + let out = serde_json::json!({ "data": { "min_score": args.score }, "error": null }); + println!("{}", serde_json::to_string_pretty(&out)?); + eprintln!("Restart `void sync` for changes to take effect."); + Ok(()) +} + +fn reddit_connection_not_found() -> anyhow::Error { + anyhow::anyhow!("No Reddit connection found in config. 
Run `void setup` to add one.") +} + +fn find_reddit_connection(cfg: &VoidConfig) -> anyhow::Result<&ConnectionConfig> { + cfg.connections + .iter() + .find(|c| c.connector_type == ConnectorType::from_static(void_reddit::CONNECTOR_ID)) + .ok_or_else(reddit_connection_not_found) +} + +fn find_reddit_connection_mut(cfg: &mut VoidConfig) -> anyhow::Result<&mut ConnectionConfig> { + cfg.connections + .iter_mut() + .find(|c| c.connector_type == ConnectorType::from_static(void_reddit::CONNECTOR_ID)) + .ok_or_else(reddit_connection_not_found) +} + +fn get_reddit_settings(cfg: &VoidConfig) -> anyhow::Result<RedditSettings> { + let conn = find_reddit_connection(cfg)?; + Ok(RedditSettings { + client_id: settings_string(&conn.settings, "client_id") + .ok_or_else(|| anyhow::anyhow!("missing client_id"))?, + client_secret: settings_string(&conn.settings, "client_secret") + .ok_or_else(|| anyhow::anyhow!("missing client_secret"))?, + subreddits: settings_string_list(&conn.settings, "subreddits"), + keywords: settings_string_list(&conn.settings, "keywords"), + min_score: settings_u32(&conn.settings, "min_score").unwrap_or(0), + }) +} + +fn set_reddit_subreddits(cfg: &mut VoidConfig, subreddits: Vec<String>) -> anyhow::Result<()> { + let conn = find_reddit_connection_mut(cfg)?; + settings_set_string_list(&mut conn.settings, "subreddits", &subreddits); + Ok(()) +} + +fn set_reddit_keywords(cfg: &mut VoidConfig, keywords: Vec<String>) -> anyhow::Result<()> { + let conn = find_reddit_connection_mut(cfg)?; + settings_set_string_list(&mut conn.settings, "keywords", &keywords); + Ok(()) +} + +fn set_reddit_min_score(cfg: &mut VoidConfig, score: u32) -> anyhow::Result<()> { + let conn = find_reddit_connection_mut(cfg)?; + settings_set_u32(&mut conn.settings, "min_score", score); + Ok(()) +} + +fn parse_keywords(s: &str) -> Vec<String> { + s.split(',') + .map(|s| s.trim().to_lowercase()) + .filter(|s| !s.is_empty()) + .collect() +} + +fn parse_subreddits(s: &str) -> Vec<String> { + s.split(',') + .map(|s| sanitize_subreddit(s.trim())) +
.filter(|s| !s.is_empty()) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_keywords_empty_yields_empty() { + assert!(parse_keywords("").is_empty()); + } + + #[test] + fn parse_keywords_splits_trims_lowercases() { + assert_eq!( + parse_keywords(" Rust , , AI "), + vec!["rust".to_string(), "ai".to_string()] + ); + } + + #[test] + fn parse_subreddits_strips_prefix_and_sanitizes() { + assert_eq!( + parse_subreddits("r/Rust, programming, start-ups!"), + vec![ + "rust".to_string(), + "programming".to_string(), + "startups".to_string() + ] + ); + } +} diff --git a/crates/void-cli/src/commands/reply.rs b/crates/void-cli/src/commands/reply.rs index 9216047..6fd44c7 100644 --- a/crates/void-cli/src/commands/reply.rs +++ b/crates/void-cli/src/commands/reply.rs @@ -227,4 +227,14 @@ mod tests { ); assert_eq!(id, "github_github_notification_1"); } + + #[test] + fn build_reply_id_reddit_joins_conv_and_message_external_ids() { + let id = connectors::build_reply_id( + ConnectorType::from_static("reddit"), + "reddit_reddit_post_abc123", + "reddit_reddit_comment_c1", + ); + assert_eq!(id, "reddit_reddit_post_abc123:reddit_reddit_comment_c1"); + } } diff --git a/crates/void-cli/src/commands/setup/mod.rs b/crates/void-cli/src/commands/setup/mod.rs index 6c8ea69..478e861 100644 --- a/crates/void-cli/src/commands/setup/mod.rs +++ b/crates/void-cli/src/commands/setup/mod.rs @@ -10,6 +10,7 @@ pub(crate) mod googlenews; pub(crate) mod hackernews; pub(crate) mod linkedin; pub(crate) mod prompt; +pub(crate) mod reddit; pub(crate) mod slack; pub(crate) mod telegram; pub(crate) mod whatsapp; diff --git a/crates/void-cli/src/commands/setup/reddit/mod.rs b/crates/void-cli/src/commands/setup/reddit/mod.rs new file mode 100644 index 0000000..2dd95fd --- /dev/null +++ b/crates/void-cli/src/commands/setup/reddit/mod.rs @@ -0,0 +1,125 @@ +mod oauth; + +use void_core::config::{ + empty_settings, settings_set_opt_string, settings_set_string, settings_set_string_list, 
+ settings_set_u32, ConnectionConfig, VoidConfig, +}; +use void_core::models::ConnectorType; + +use super::auth::{pick_connector_action, ConnectorAction}; +use super::prompt::{confirm_default_yes, prompt, prompt_default}; + +pub(crate) async fn setup_reddit(cfg: &mut VoidConfig, add_only: bool) -> anyhow::Result<()> { + eprintln!("🔶 REDDIT"); + eprintln!(); + eprintln!("Monitors Reddit subreddits for posts matching your keywords."); + eprintln!("Matching posts appear in your inbox. Enable commenting to sync"); + eprintln!("thread comments and reply from the CLI."); + eprintln!(); + eprintln!("Create a Reddit \"web\" app at https://www.reddit.com/prefs/apps"); + eprintln!( + "with redirect URI {oauth_uri}.", + oauth_uri = oauth::REDIRECT_URI + ); + + let reddit_type = ConnectorType::from_static(void_reddit::CONNECTOR_ID); + if !add_only { + let existing: Vec<usize> = cfg + .connections + .iter() + .enumerate() + .filter(|(_, a)| a.connector_type == reddit_type) + .map(|(i, _)| i) + .collect(); + + let action = pick_connector_action("Reddit", &existing, cfg); + match action { + ConnectorAction::Skip => return Ok(()), + ConnectorAction::Keep => return Ok(()), + ConnectorAction::Replace(idx) => { + cfg.connections.remove(idx); + } + ConnectorAction::Add => {} + } + } + + eprintln!(); + let client_id = prompt("Reddit client ID: "); + if client_id.trim().is_empty() { + anyhow::bail!("Reddit client ID is required"); + } + + eprintln!(); + let client_secret = prompt("Reddit client secret: "); + if client_secret.trim().is_empty() { + anyhow::bail!("Reddit client secret is required"); + } + + eprintln!(); + eprintln!("Enter subreddits to watch (comma-separated, without r/ prefix)."); + eprintln!("Example: rust, programming, startups"); + let sub_input = prompt("Subreddits: "); + let subreddits: Vec<String> = sub_input + .split(',') + .map(|s| s.trim().trim_start_matches("r/").to_lowercase()) + .filter(|s| !s.is_empty()) + .collect(); + if subreddits.is_empty() { + anyhow::bail!("At
least one subreddit is required"); + } + + eprintln!(); + eprintln!("Enter keywords to watch (comma-separated)."); + eprintln!("Posts whose title contains any of these keywords will land in your inbox."); + eprintln!("Leave empty to get all posts above the minimum score."); + let kw_input = prompt("Keywords: "); + let keywords: Vec<String> = kw_input + .split(',') + .map(|s| s.trim().to_lowercase()) + .filter(|s| !s.is_empty()) + .collect(); + + eprintln!(); + eprintln!("Minimum score (upvotes) for a post to appear in your inbox."); + let min_score_input = prompt_default("Minimum score", "50"); + let min_score: u32 = min_score_input.parse().unwrap_or(50); + + let refresh_token = if confirm_default_yes( + "Enable commenting? (opens browser for Reddit authorization; stores refresh token)", + ) { + match oauth::obtain_refresh_token(&client_id, &client_secret).await { + Ok(token) => { + eprintln!(" ✓ Reddit commenting authorized."); + Some(token) + } + Err(e) => { + eprintln!(" ✗ Reddit authorization failed: {e}"); + eprintln!(" Continuing in read-only mode."); + None + } + } + } else { + None + }; + + let connection_id = prompt_default("\nAccount name", "reddit"); + + let mut settings = empty_settings(); + settings_set_string(&mut settings, "client_id", client_id.trim()); + settings_set_string(&mut settings, "client_secret", client_secret.trim()); + settings_set_opt_string(&mut settings, "refresh_token", refresh_token); + settings_set_string_list(&mut settings, "subreddits", &subreddits); + settings_set_string_list(&mut settings, "keywords", &keywords); + settings_set_u32(&mut settings, "min_score", min_score); + + let connection = ConnectionConfig { + id: connection_id, + connector_type: reddit_type, + ignore_conversations: vec![], + settings, + }; + + cfg.connections.push(connection); + eprintln!(" ✓ Reddit configured."); + Ok(()) +} diff --git a/crates/void-cli/src/commands/setup/reddit/oauth.rs b/crates/void-cli/src/commands/setup/reddit/oauth.rs new file mode
100644 index 0000000..4e693de --- /dev/null +++ b/crates/void-cli/src/commands/setup/reddit/oauth.rs @@ -0,0 +1,180 @@ +use std::time::Duration; + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; +use void_reddit::api::{RedditClient, OAUTH_REDIRECT_URI}; + +pub(crate) const REDIRECT_URI: &str = OAUTH_REDIRECT_URI; +const OAUTH_PORT: u16 = 8765; +const CALLBACK_TIMEOUT: Duration = Duration::from_secs(120); + +const SUCCESS_HTML: &str = "

Reddit authorization complete

You can close this window and return to the terminal.

"; +const ERROR_HTML: &str = "

Reddit authorization failed

Return to the terminal for next steps.

"; + +pub(crate) async fn obtain_refresh_token( + client_id: &str, + client_secret: &str, +) -> anyhow::Result { + let client = RedditClient::new(client_id, client_secret); + let state = uuid::Uuid::new_v4().to_string(); + let auth_url = client.authorize_url(&state, REDIRECT_URI); + + let code = match try_loopback_callback(&auth_url, &state).await { + Ok(code) => code, + Err(loopback_err) => { + eprintln!("Could not complete browser callback: {loopback_err}"); + eprintln!(); + eprintln!("Open this URL in your browser and approve access:"); + eprintln!("{auth_url}"); + eprintln!(); + let pasted = crate::commands::setup::prompt::prompt( + "Paste the authorization code or full redirect URL here: ", + ); + extract_code_from_input(&pasted, &state)? + } + }; + + let tokens = client + .exchange_authorization_code(&code, REDIRECT_URI) + .await?; + tokens + .refresh_token + .ok_or_else(|| anyhow::anyhow!("Reddit did not return a refresh token")) +} + +async fn try_loopback_callback(auth_url: &str, state: &str) -> anyhow::Result { + let listener = match TcpListener::bind(format!("127.0.0.1:{OAUTH_PORT}")).await { + Ok(listener) => listener, + Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => { + anyhow::bail!("port {OAUTH_PORT} is already in use"); + } + Err(e) => return Err(e.into()), + }; + + eprintln!("Opening browser for Reddit authorization..."); + if let Err(e) = open::that(auth_url) { + eprintln!("Could not open browser automatically: {e}"); + eprintln!("Open this URL manually: {auth_url}"); + } + + let result = tokio::time::timeout(CALLBACK_TIMEOUT, accept_oauth_callback(listener, state)) + .await + .map_err(|_| anyhow::anyhow!("timed out waiting for Reddit authorization callback"))??; + + Ok(result) +} + +async fn accept_oauth_callback( + listener: TcpListener, + expected_state: &str, +) -> anyhow::Result { + let (mut stream, _) = listener.accept().await?; + let mut buffer = vec![0_u8; 8192]; + let n = stream.read(&mut buffer).await?; + let request = 
String::from_utf8_lossy(&buffer[..n]); + let request_line = request + .lines() + .next() + .ok_or_else(|| anyhow::anyhow!("invalid OAuth callback request"))?; + + let path = request_line + .split_whitespace() + .nth(1) + .ok_or_else(|| anyhow::anyhow!("invalid OAuth callback request line"))?; + + match parse_callback_path(path, expected_state) { + Ok(code) => { + stream + .write_all(format!("HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{SUCCESS_HTML}", SUCCESS_HTML.len()).as_bytes()) + .await?; + Ok(code) + } + Err(err) => { + stream + .write_all(format!("HTTP/1.1 400 Bad Request\r\nContent-Type: text/html\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{ERROR_HTML}", ERROR_HTML.len()).as_bytes()) + .await?; + Err(err) + } + } +} + +pub(crate) fn parse_callback_path(path: &str, expected_state: &str) -> anyhow::Result { + let query = path.split_once('?').map(|(_, q)| q).unwrap_or(path); + + let params = parse_query(query); + if let Some(error) = params.get("error") { + anyhow::bail!("Reddit authorization error: {error}"); + } + + let state = params + .get("state") + .ok_or_else(|| anyhow::anyhow!("OAuth callback missing state"))?; + if state != expected_state { + anyhow::bail!("OAuth state mismatch"); + } + + params + .get("code") + .cloned() + .ok_or_else(|| anyhow::anyhow!("OAuth callback missing code")) +} + +pub(crate) fn extract_code_from_input(input: &str, expected_state: &str) -> anyhow::Result { + let trimmed = input.trim(); + if trimmed.is_empty() { + anyhow::bail!("authorization code is empty"); + } + + if trimmed.contains("code=") { + let path = trimmed.split_once('?').map(|(_, q)| q).unwrap_or(trimmed); + return parse_callback_path(&format!("/?{path}"), expected_state); + } + + Ok(trimmed.to_string()) +} + +fn parse_query(query: &str) -> std::collections::HashMap { + query + .split('&') + .filter_map(|pair| { + let (key, value) = pair.split_once('=')?; + Some(( + key.to_string(), + 
urlencoding::decode(value).unwrap_or_default().into_owned(), + )) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_callback_path_extracts_code_and_validates_state() { + let code = parse_callback_path("/?code=abc123&state=xyz", "xyz").unwrap(); + assert_eq!(code, "abc123"); + } + + #[test] + fn parse_callback_path_rejects_state_mismatch() { + let err = parse_callback_path("/?code=abc123&state=bad", "expected") + .unwrap_err() + .to_string(); + assert!(err.contains("state mismatch")); + } + + #[test] + fn extract_code_from_input_accepts_full_redirect_url() { + let code = + extract_code_from_input("http://localhost:8765/?code=manual-code&state=xyz", "xyz") + .unwrap(); + assert_eq!(code, "manual-code"); + } + + #[test] + fn extract_code_from_input_accepts_raw_code() { + let code = extract_code_from_input("manual-code", "ignored").unwrap(); + assert_eq!(code, "manual-code"); + } +} diff --git a/crates/void-cli/src/commands/sync/args.rs b/crates/void-cli/src/commands/sync/args.rs index 5f086f0..0eb9f94 100644 --- a/crates/void-cli/src/commands/sync/args.rs +++ b/crates/void-cli/src/commands/sync/args.rs @@ -2,7 +2,7 @@ use clap::Args; #[derive(Clone, Debug, Args)] pub struct SyncArgs { - /// Sync only specific connectors (comma-separated: whatsapp,telegram,slack,gmail,calendar,hackernews,googlenews) + /// Sync only specific connectors (comma-separated: whatsapp,telegram,slack,gmail,calendar,hackernews,googlenews,reddit) #[arg(long)] pub connectors: Option, /// Detach and run as a background daemon @@ -14,7 +14,7 @@ pub struct SyncArgs { /// Clear the database before syncing (fresh start) #[arg(long)] pub clear: bool, - /// Clear data for a specific connector before syncing (e.g. whatsapp, telegram, slack, gmail, calendar, hackernews, googlenews) + /// Clear data for a specific connector before syncing (e.g. 
whatsapp, telegram, slack, gmail, calendar, hackernews, googlenews, reddit) #[arg(long)] pub clear_connector: Option, /// Stop the running sync daemon diff --git a/crates/void-cli/src/connectors/mod.rs b/crates/void-cli/src/connectors/mod.rs index 0cd18df..7e2f769 100644 --- a/crates/void-cli/src/connectors/mod.rs +++ b/crates/void-cli/src/connectors/mod.rs @@ -6,6 +6,7 @@ mod gmail; mod googlenews; mod hackernews; mod linkedin; +mod reddit; mod slack; mod telegram; mod whatsapp; @@ -121,7 +122,7 @@ mod tests { #[test] fn every_plugin_has_unique_id() { let plugins = all(); - assert!(plugins.len() >= 9); + assert!(plugins.len() >= 10); let mut ids = std::collections::HashSet::new(); for p in &plugins { assert!(ids.insert(p.id), "duplicate connector id: {}", p.id); @@ -264,6 +265,45 @@ mod tests { assert!(!debug.contains("xoxp-super-secret-user-token")); } + #[test] + fn validate_all_connections_reddit_missing_client_secret_fails() { + let mut settings = toml::Table::new(); + settings.insert("client_id".into(), toml::Value::String("id".into())); + let conn = ConnectionConfig { + id: "reddit".into(), + connector_type: ConnectorType::from_static("reddit"), + ignore_conversations: vec![], + settings, + }; + let mut cfg = VoidConfig::default(); + cfg.connections.push(conn); + let err = validate_all_connections(&cfg).unwrap_err(); + assert!(err.to_string().contains("missing client_secret")); + } + + #[test] + fn build_reddit_connector_via_registry() { + let mut settings = toml::Table::new(); + settings.insert("client_id".into(), toml::Value::String("id".into())); + settings.insert("client_secret".into(), toml::Value::String("secret".into())); + settings.insert( + "subreddits".into(), + toml::Value::Array(vec![toml::Value::String("rust".into())]), + ); + let conn = ConnectionConfig { + id: "test-reddit".into(), + connector_type: ConnectorType::from_static("reddit"), + ignore_conversations: vec![], + settings, + }; + let sync = SyncConfig::default(); + let plugin = 
by_id("reddit").unwrap(); + let store = tempfile::tempdir().unwrap(); + let connector = (plugin.build)(&conn, store.path(), &sync).unwrap(); + assert_eq!(connector.connector_type().as_str(), "reddit"); + assert_eq!(connector.connection_id(), "test-reddit"); + } + #[test] fn sync_default_poll_intervals_match_plugin_defaults() { let sync = SyncConfig::default(); @@ -303,6 +343,7 @@ user_token = "xoxp" ("googlenews", "GN"), ("linkedin", "LI"), ("github", "GH"), + ("reddit", "RD"), ]; for (id, badge) in expected { assert_eq!( diff --git a/crates/void-cli/src/connectors/reddit.rs b/crates/void-cli/src/connectors/reddit.rs new file mode 100644 index 0000000..2c49900 --- /dev/null +++ b/crates/void-cli/src/connectors/reddit.rs @@ -0,0 +1,118 @@ +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::sync::Arc; + +use void_core::config::{ + redact_token, settings_str, settings_string, settings_string_list, settings_u32, + ConnectionConfig, SyncConfig, +}; +use void_core::connector::Connector; + +use super::{ConnectorPlugin, ReplyIdStyle, SetupCtx}; + +const DEFAULT_POLL_INTERVAL_SECS: u64 = 3600; + +inventory::submit! 
{ + ConnectorPlugin { + id: void_reddit::CONNECTOR_ID, + aliases: &["reddit", "rd"], + menu_label: "Reddit", + badge: "RD", + default_poll_interval_secs: Some(DEFAULT_POLL_INTERVAL_SECS), + reply_id_style: ReplyIdStyle::ConvMsg, + supports_scheduling: false, + uses_daemon_rpc: false, + prompt_token_reauth: false, + session_files, + build, + setup, + parse_settings, + show_config, + } +} + +fn session_files(_store: &Path, _connection_id: &str) -> Vec { + vec![] +} + +fn build( + connection: &ConnectionConfig, + _store_path: &Path, + sync: &SyncConfig, +) -> anyhow::Result> { + let client_id = settings_string(&connection.settings, "client_id").ok_or_else(|| { + anyhow::anyhow!( + "missing client_id for Reddit connection '{}'", + connection.id + ) + })?; + let client_secret = + settings_string(&connection.settings, "client_secret").ok_or_else(|| { + anyhow::anyhow!( + "missing client_secret for Reddit connection '{}'", + connection.id + ) + })?; + let refresh_token = settings_string(&connection.settings, "refresh_token"); + let subreddits = settings_string_list(&connection.settings, "subreddits"); + let keywords = settings_string_list(&connection.settings, "keywords"); + let min_score = settings_u32(&connection.settings, "min_score").unwrap_or(0); + let poll_secs = sync.poll_interval_secs(void_reddit::CONNECTOR_ID, DEFAULT_POLL_INTERVAL_SECS); + + Ok(Arc::new(void_reddit::connector::RedditConnector::new( + &connection.id, + client_id, + client_secret, + refresh_token, + subreddits, + keywords, + min_score, + poll_secs, + ))) +} + +fn setup(ctx: SetupCtx<'_>) -> Pin> + '_>> { + Box::pin(crate::commands::setup::reddit::setup_reddit( + ctx.cfg, + ctx.add_only, + )) +} + +fn parse_settings(table: &toml::Table) -> anyhow::Result<()> { + if settings_str(table, "client_id").is_none() { + anyhow::bail!("missing client_id"); + } + if settings_str(table, "client_secret").is_none() { + anyhow::bail!("missing client_secret"); + } + Ok(()) +} + +fn show_config(table: &toml::Table, 
out: &mut dyn std::fmt::Write) -> std::fmt::Result { + if let Some(client_id) = settings_str(table, "client_id") { + writeln!(out, " client_id: {}", redact_token(client_id))?; + } + if let Some(client_secret) = settings_str(table, "client_secret") { + writeln!(out, " client_secret: {}", redact_token(client_secret))?; + } + if settings_str(table, "refresh_token").is_some() { + writeln!(out, " refresh_token: (set — commenting enabled)")?; + } else { + writeln!(out, " refresh_token: (not set — read-only)")?; + } + let subreddits = settings_string_list(table, "subreddits"); + if subreddits.is_empty() { + writeln!(out, " subreddits: (none)")?; + } else { + writeln!(out, " subreddits: {}", subreddits.join(", "))?; + } + let keywords = settings_string_list(table, "keywords"); + if keywords.is_empty() { + writeln!(out, " keywords: (none — all posts)")?; + } else { + writeln!(out, " keywords: {}", keywords.join(", "))?; + } + let min_score = settings_u32(table, "min_score").unwrap_or(0); + writeln!(out, " min_score: {min_score}")?; + Ok(()) +} diff --git a/crates/void-cli/src/context.rs b/crates/void-cli/src/context.rs index cf76e8e..3bb7d32 100644 --- a/crates/void-cli/src/context.rs +++ b/crates/void-cli/src/context.rs @@ -136,6 +136,7 @@ pub(crate) fn runs_with_local_cache(command: &crate::Command) -> bool { | Command::Remote(_) => true, Command::Calendar(args) => calendar_reads_local_cache(args), Command::Hn(args) => hackernews_reads_local_cache(args), + Command::Reddit(args) => reddit_reads_local_cache(args), Command::Slack(args) => slack_reads_local_cache(args), Command::Sync(args) => args.status, Command::Setup => false, @@ -166,6 +167,17 @@ fn hackernews_reads_local_cache(args: &crate::commands::hackernews::HackerNewsAr } } +fn reddit_reads_local_cache(args: &crate::commands::reddit::RedditArgs) -> bool { + use crate::commands::reddit::{KeywordsAction, RedditCommand, SubredditsAction}; + + match &args.command { + RedditCommand::Config => true, +
RedditCommand::Keywords(kw) => matches!(kw.action, KeywordsAction::List), + RedditCommand::Subreddits(sr) => matches!(sr.action, SubredditsAction::List), + RedditCommand::MinScore(_) => false, + } +} + pub fn mode_label() -> &'static str { match get().mode() { StoreMode::Local => "local", diff --git a/crates/void-core/src/config/paths.rs b/crates/void-core/src/config/paths.rs index 6760a8a..c2c8da9 100644 --- a/crates/void-core/src/config/paths.rs +++ b/crates/void-core/src/config/paths.rs @@ -79,6 +79,7 @@ gmail_poll_interval_secs = 30 calendar_poll_interval_secs = 60 hackernews_poll_interval_secs = 3600 googlenews_poll_interval_secs = 3600 +reddit_poll_interval_secs = 3600 linkedin_poll_interval_secs = 1800 linkedin_backfill_days = 15 github_poll_interval_secs = 120 @@ -122,6 +123,15 @@ github_poll_interval_secs = 120 # min_score = 100 # # [[connections]] +# id = "reddit" +# type = "reddit" +# client_id = "your-reddit-app-client-id" +# client_secret = "your-reddit-app-client-secret" +# subreddits = ["rust", "programming", "startups"] +# keywords = ["ai", "llm"] +# min_score = 50 +# +# [[connections]] # id = "googlenews" # type = "googlenews" # keywords = ["intelligence artificielle", "startup"] diff --git a/crates/void-core/src/config/tests.rs b/crates/void-core/src/config/tests.rs index 695a8de..e09dea3 100644 --- a/crates/void-core/src/config/tests.rs +++ b/crates/void-core/src/config/tests.rs @@ -424,6 +424,107 @@ type = "hackernews" assert_eq!(settings_u32(settings, "min_score"), None); } +#[test] +fn sync_config_reddit_default() { + let sync = SyncConfig::default(); + assert_eq!(sync.reddit_poll_interval_secs(), 3600); +} + +#[test] +fn parse_reddit_config() { + let toml = r#" +[[connections]] +id = "reddit" +type = "reddit" +client_id = "my-client-id" +client_secret = "my-client-secret" +refresh_token = "refresh-token" +subreddits = ["rust", "programming"] +keywords = ["ai", "llm"] +min_score = 50 +"#; + let config: VoidConfig = 
toml::from_str(toml).unwrap(); + assert_eq!( + config.connections[0].connector_type, + ConnectorType::from_static("reddit") + ); + let settings = &config.connections[0].settings; + assert_eq!(settings_str(settings, "client_id"), Some("my-client-id")); + assert_eq!( + settings_str(settings, "client_secret"), + Some("my-client-secret") + ); + assert_eq!( + settings_str(settings, "refresh_token"), + Some("refresh-token") + ); + assert_eq!( + settings_string_list(settings, "subreddits"), + vec!["rust".to_string(), "programming".to_string()] + ); + assert_eq!( + settings_string_list(settings, "keywords"), + vec!["ai".to_string(), "llm".to_string()] + ); + assert_eq!(settings_u32(settings, "min_score"), Some(50)); +} + +#[test] +fn parse_reddit_without_optional_fields() { + let toml = r#" +[[connections]] +id = "reddit" +type = "reddit" +client_id = "id" +client_secret = "secret" +"#; + let config: VoidConfig = toml::from_str(toml).unwrap(); + let settings = &config.connections[0].settings; + assert!(settings_string_list(settings, "subreddits").is_empty()); + assert!(settings_string_list(settings, "keywords").is_empty()); + assert_eq!(settings_u32(settings, "min_score"), None); + assert!(settings_str(settings, "refresh_token").is_none()); +} + +#[test] +fn reddit_settings_debug_redacts_secrets() { + let mut settings = empty_settings(); + settings_set_string(&mut settings, "client_id", "super-secret-client-id"); + settings_set_string(&mut settings, "client_secret", "super-secret-client-secret"); + settings_set_opt_string( + &mut settings, + "refresh_token", + Some("super-secret-refresh-token".to_string()), + ); + let config = ConnectionConfig { + id: "reddit".into(), + connector_type: ConnectorType::from_static("reddit"), + ignore_conversations: vec![], + settings, + }; + let debug = format!("{config:?}"); + assert!(!debug.contains("super-secret-client-secret")); + assert!(!debug.contains("super-secret-refresh-token")); + assert!(debug.contains("super-se...")); +} + 
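Review note: the `reddit_settings_debug_redacts_secrets` test above expects the debug output to contain `super-se...`, which suggests that secrets are shortened to a short prefix followed by an ellipsis. The real `redact_token` and the `Debug` implementation of `ConnectionConfig` are not part of this hunk, so the following is only a minimal sketch of that behaviour. The name `redact_prefix`, its `visible` parameter, and the choice to hide short secrets entirely are assumptions made for illustration.

```rust
/// Hypothetical helper (not the project's `redact_token`): keep the first
/// `visible` characters of a secret and replace the rest with "...".
/// Secrets that are not longer than `visible` are hidden completely, so a
/// short value is never printed in full.
fn redact_prefix(secret: &str, visible: usize) -> String {
    if secret.chars().count() <= visible {
        return "...".to_string();
    }
    // Collect by `char` rather than slicing bytes, so a multi-byte
    // character at the cut point cannot cause a panic.
    let prefix: String = secret.chars().take(visible).collect();
    format!("{prefix}...")
}
```

With `visible = 8`, `super-secret-client-id` becomes `super-se...`, which is the shape the assertion in the test looks for.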
+#[test] +fn parse_reddit_config_with_refresh_token() { + let toml = r#" +[[connections]] +id = "reddit" +type = "reddit" +client_id = "id" +client_secret = "secret" +refresh_token = "rt-123" +"#; + let config: VoidConfig = toml::from_str(toml).unwrap(); + assert_eq!( + settings_str(&config.connections[0].settings, "refresh_token"), + Some("rt-123") + ); +} + #[test] fn resolve_config_path_expands_tilde() { let path = super::resolve_config_path(Some(std::path::Path::new("~/.config/void/config.toml"))); diff --git a/crates/void-core/src/config/void_config.rs b/crates/void-core/src/config/void_config.rs index 44b30ad..46dd2f3 100644 --- a/crates/void-core/src/config/void_config.rs +++ b/crates/void-core/src/config/void_config.rs @@ -174,6 +174,10 @@ impl Default for SyncConfig { "github_poll_interval_secs".into(), toml::Value::Integer(default_github_poll() as i64), ); + values.insert( + "reddit_poll_interval_secs".into(), + toml::Value::Integer(default_reddit_poll() as i64), + ); Self { values } } } @@ -220,6 +224,10 @@ impl SyncConfig { self.poll_interval_secs("github", default_github_poll()) } + pub fn reddit_poll_interval_secs(&self) -> u64 { + self.poll_interval_secs("reddit", default_reddit_poll()) + } + pub fn iter_values(&self) -> impl Iterator { self.values.iter() } @@ -253,6 +261,10 @@ fn default_github_poll() -> u64 { 120 } +fn default_reddit_poll() -> u64 { + 3600 +} + impl VoidConfig { /// Parse config from a string without writing migrations or sidecar changes. 
pub fn parse(content: &str) -> Result { diff --git a/crates/void-reddit/Cargo.toml b/crates/void-reddit/Cargo.toml new file mode 100644 index 0000000..aaca4a2 --- /dev/null +++ b/crates/void-reddit/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "void-reddit" +version.workspace = true +edition.workspace = true +license.workspace = true +rust-version.workspace = true +description = "Reddit adapter for Void CLI" + +[dependencies] +void-core = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tracing = { workspace = true } +anyhow = { workspace = true } +thiserror = { workspace = true } +async-trait = { workspace = true } +tokio-util = { workspace = true } +reqwest = { workspace = true } +chrono = { workspace = true } +base64 = { workspace = true } +urlencoding = { workspace = true } + +[dev-dependencies] +wiremock = { workspace = true } +tokio = { workspace = true } +tempfile = { workspace = true } diff --git a/crates/void-reddit/src/api.rs b/crates/void-reddit/src/api.rs new file mode 100644 index 0000000..586669c --- /dev/null +++ b/crates/void-reddit/src/api.rs @@ -0,0 +1,928 @@ +use std::sync::Mutex; +use std::time::{Duration, Instant}; + +use base64::Engine; +use reqwest::Client; +use serde::Deserialize; + +pub const OAUTH_REDIRECT_URI: &str = "http://localhost:8765"; +pub const OAUTH_SCOPES: &str = "read submit identity"; + +const DEFAULT_OAUTH_BASE: &str = "https://www.reddit.com"; +const DEFAULT_API_BASE: &str = "https://oauth.reddit.com"; +const USER_AGENT: &str = "void-cli/1.0 (by /u/void-cli)"; +const TOKEN_REFRESH_MARGIN: Duration = Duration::from_secs(60); + +#[derive(Debug, Clone, Deserialize)] +pub struct RedditListing { + pub data: RedditListingData, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct RedditListingData { + #[serde(default)] + pub(crate) children: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(tag = "kind", content = "data")] +pub enum RedditListingItem { + 
#[serde(rename = "t3")] + Post(RedditPost), + #[serde(rename = "t1")] + Comment(RedditComment), + #[serde(rename = "more")] + More(MoreStub), +} + +#[derive(Debug, Clone, Deserialize)] +pub struct MoreStub { + #[serde(default)] + #[allow(dead_code)] + count: u32, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct RedditPost { + pub id: String, + #[serde(default)] + pub title: Option, + #[serde(default)] + pub author: Option, + #[serde(default)] + pub score: Option, + #[serde(default)] + pub url: Option, + #[serde(default)] + pub permalink: Option, + #[serde(default)] + pub num_comments: Option, + #[serde(default)] + pub upvote_ratio: Option, + #[serde(default)] + pub created_utc: Option, + #[serde(default)] + pub subreddit: Option, + #[serde(default)] + pub selftext: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct RedditComment { + pub id: String, + #[serde(default)] + pub author: Option, + #[serde(default)] + pub body: Option, + #[serde(default)] + pub score: Option, + #[serde(default)] + pub parent_id: Option, + #[serde(default)] + pub link_id: Option, + #[serde(default)] + pub created_utc: Option, + #[serde(default)] + pub depth: Option, + #[serde(default, deserialize_with = "deserialize_replies")] + pub(crate) replies: RedditReplies, +} + +#[derive(Debug, Clone, Default)] +pub(crate) enum RedditReplies { + #[default] + Empty, + Listing(RedditListing), +} + +fn deserialize_replies<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + let value = serde_json::Value::deserialize(deserializer)?; + if value.as_str().is_some_and(str::is_empty) { + return Ok(RedditReplies::Empty); + } + if let Ok(listing) = serde_json::from_value::(value) { + return Ok(RedditReplies::Listing(listing)); + } + Ok(RedditReplies::Empty) +} + +#[derive(Debug, Deserialize)] +struct TokenResponse { + access_token: String, + expires_in: u64, + #[serde(default)] + refresh_token: Option, +} + +#[derive(Debug, Clone)] +pub struct OAuthTokens { + pub 
access_token: String, + pub refresh_token: Option, + pub expires_in: u64, +} + +struct CachedToken { + access_token: String, + expires_at: Instant, +} + +pub struct RedditClient { + http: Client, + client_id: String, + client_secret: String, + refresh_token: Option, + oauth_base: String, + api_base: String, + token: Mutex>, +} + +impl RedditClient { + pub fn new(client_id: &str, client_secret: &str) -> Self { + Self::with_refresh_token(client_id, client_secret, None) + } + + pub fn with_refresh_token( + client_id: &str, + client_secret: &str, + refresh_token: Option, + ) -> Self { + Self::with_bases( + client_id, + client_secret, + refresh_token, + DEFAULT_OAUTH_BASE, + DEFAULT_API_BASE, + ) + } + + fn with_bases( + client_id: &str, + client_secret: &str, + refresh_token: Option, + oauth_base: &str, + api_base: &str, + ) -> Self { + Self { + http: Client::new(), + client_id: client_id.to_string(), + client_secret: client_secret.to_string(), + refresh_token, + oauth_base: oauth_base.trim_end_matches('/').to_string(), + api_base: api_base.trim_end_matches('/').to_string(), + token: Mutex::new(None), + } + } + + pub fn has_user_token(&self) -> bool { + self.refresh_token.is_some() + } + + pub fn user_agent(&self) -> &'static str { + USER_AGENT + } + + pub fn authorize_url(&self, state: &str, redirect_uri: &str) -> String { + format!( + "{}/api/v1/authorize?client_id={}&response_type=code&state={}&redirect_uri={}&duration=permanent&scope={}", + self.oauth_base, + urlencoding::encode(&self.client_id), + urlencoding::encode(state), + urlencoding::encode(redirect_uri), + urlencoding::encode(OAUTH_SCOPES), + ) + } + + pub async fn exchange_authorization_code( + &self, + code: &str, + redirect_uri: &str, + ) -> anyhow::Result { + let body = format!( + "grant_type=authorization_code&code={}&redirect_uri={}", + urlencoding::encode(code), + urlencoding::encode(redirect_uri), + ); + self.request_tokens(&body).await + } + + pub async fn subreddit_hot( + &self, + subreddit: &str, 
+ limit: u32, + ) -> anyhow::Result> { + let token = self.access_token().await?; + let url = format!("{}/r/{}/hot", self.api_base, sanitize_subreddit(subreddit)); + + let response = self + .http + .get(&url) + .query(&[("limit", limit.to_string())]) + .header("Authorization", format!("Bearer {token}")) + .header("User-Agent", USER_AGENT) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + anyhow::bail!("Reddit API error {status}: {body}"); + } + + let listing: RedditListing = response.json().await?; + Ok(listing + .data + .children + .into_iter() + .filter_map(|item| match item { + RedditListingItem::Post(post) => Some(post), + _ => None, + }) + .collect()) + } + + pub async fn get_post_comments( + &self, + post_id: &str, + sort: &str, + limit: u32, + depth: u32, + ) -> anyhow::Result<(RedditPost, Vec)> { + let token = self.access_token().await?; + let url = format!("{}/comments/{}", self.api_base, post_id); + + let response = self + .http + .get(&url) + .query(&[ + ("sort", sort.to_string()), + ("limit", limit.to_string()), + ("depth", depth.to_string()), + ]) + .header("Authorization", format!("Bearer {token}")) + .header("User-Agent", USER_AGENT) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + anyhow::bail!("Reddit API error {status}: {body}"); + } + + let listings: Vec = response.json().await?; + let post = listings + .first() + .and_then(|listing| { + listing.data.children.first().and_then(|item| match item { + RedditListingItem::Post(post) => Some(post.clone()), + _ => None, + }) + }) + .ok_or_else(|| anyhow::anyhow!("Reddit comments response missing post listing"))?; + + let mut comments = Vec::new(); + if let Some(comment_listing) = listings.get(1) { + flatten_comments(&comment_listing.data.children, &mut comments); + } + + Ok((post, comments)) + } + + 
pub async fn post_comment(&self, thing_id: &str, text: &str) -> anyhow::Result<String> { + if !self.has_user_token() { + anyhow::bail!( + "Reddit commenting requires OAuth authorization. Run `void setup` and enable commenting." + ); + } + + let token = self.access_token().await?; + let url = format!("{}/api/comment", self.api_base); + let body = format!( + "thing_id={}&text={}&api_type=json", + urlencoding::encode(thing_id), + urlencoding::encode(text), + ); + + let response = self + .http + .post(&url) + .header("Authorization", format!("Bearer {token}")) + .header("User-Agent", USER_AGENT) + .header("Content-Type", "application/x-www-form-urlencoded") + .body(body) + .send() + .await?; + + let status = response.status(); + let raw = response.text().await.unwrap_or_default(); + if !status.is_success() { + anyhow::bail!("Reddit API error {status}: {raw}"); + } + + let parsed: CommentResponse = serde_json::from_str(&raw) + .map_err(|e| anyhow::anyhow!("Failed to parse Reddit comment response: {e}: {raw}"))?; + + if let Some(errors) = parsed.json.errors { + if !errors.is_empty() { + anyhow::bail!("Reddit comment error: {errors:?}"); + } + } + + let comment_id = parsed + .json + .data + .and_then(|data| data.things.into_iter().next()) + .and_then(|thing| thing.data.id) + .ok_or_else(|| anyhow::anyhow!("Reddit comment response missing comment id"))?; + + Ok(format!("t1_{comment_id}")) + } + + pub(crate) async fn access_token(&self) -> anyhow::Result<String> { + if let Some(token) = self.cached_token()? 
{ + return Ok(token); + } + self.fetch_token().await + } + + fn cached_token(&self) -> anyhow::Result<Option<String>> { + let guard = self + .token + .lock() + .map_err(|_| anyhow::anyhow!("token lock poisoned"))?; + Ok(guard.as_ref().and_then(|cached| { + if cached.expires_at > Instant::now() { + Some(cached.access_token.clone()) + } else { + None + } + })) + } + + async fn fetch_token(&self) -> anyhow::Result<String> { + let body = if let Some(ref refresh_token) = self.refresh_token { + format!( + "grant_type=refresh_token&refresh_token={}", + urlencoding::encode(refresh_token) + ) + } else { + "grant_type=client_credentials".to_string() + }; + + let tokens = self.request_tokens(&body).await?; + self.store_access_token(&tokens.access_token, tokens.expires_in); + Ok(tokens.access_token) + } + + async fn request_tokens(&self, body: &str) -> anyhow::Result<OAuthTokens> { + let url = format!("{}/api/v1/access_token", self.oauth_base); + let auth = base64::engine::general_purpose::STANDARD + .encode(format!("{}:{}", self.client_id, self.client_secret)); + + let response = self + .http + .post(&url) + .header("Authorization", format!("Basic {auth}")) + .header("User-Agent", USER_AGENT) + .header("Content-Type", "application/x-www-form-urlencoded") + .body(body.to_string()) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + anyhow::bail!("Reddit OAuth error {status}: {body}"); + } + + let token_response: TokenResponse = response.json().await?; + Ok(OAuthTokens { + access_token: token_response.access_token, + refresh_token: token_response.refresh_token, + expires_in: token_response.expires_in, + }) + } + + fn store_access_token(&self, access_token: &str, expires_in: u64) { + let expires_at = Instant::now() + Duration::from_secs(expires_in) - TOKEN_REFRESH_MARGIN; + if let Ok(mut guard) = self.token.lock() { + *guard = Some(CachedToken { + access_token: access_token.to_string(), + expires_at, + }); + } + } 
+ + #[cfg(test)] + pub(crate) fn expired_token_for_test(&self, token: &str) { + let mut guard = self.token.lock().unwrap(); + *guard = Some(CachedToken { + access_token: token.to_string(), + expires_at: Instant::now() - Duration::from_secs(1), + }); + } + + #[cfg(test)] + fn with_bases_for_test( + client_id: &str, + client_secret: &str, + refresh_token: Option, + oauth_base: &str, + api_base: &str, + ) -> Self { + Self::with_bases( + client_id, + client_secret, + refresh_token, + oauth_base, + api_base, + ) + } +} + +fn flatten_comments(children: &[RedditListingItem], out: &mut Vec) { + for item in children { + match item { + RedditListingItem::Comment(comment) => { + if let RedditReplies::Listing(listing) = &comment.replies { + flatten_comments(&listing.data.children, out); + } + out.push(RedditComment { + replies: RedditReplies::Empty, + ..comment.clone() + }); + } + RedditListingItem::More(_) => {} + RedditListingItem::Post(_) => {} + } + } +} + +#[derive(Debug, Deserialize)] +struct CommentResponse { + json: CommentResponseJson, +} + +#[derive(Debug, Deserialize)] +struct CommentResponseJson { + #[serde(default)] + errors: Option>, + data: Option, +} + +#[derive(Debug, Deserialize)] +struct CommentResponseData { + things: Vec, +} + +#[derive(Debug, Deserialize)] +struct CommentThing { + data: CommentThingData, +} + +#[derive(Debug, Deserialize)] +struct CommentThingData { + id: Option, +} + +/// Normalize subreddit names for API paths and stable IDs. +pub fn sanitize_subreddit(name: &str) -> String { + name.trim() + .trim_start_matches("r/") + .trim_start_matches('/') + .to_lowercase() + .chars() + .filter(|c| c.is_ascii_alphanumeric() || *c == '_') + .collect() +} + +/// Extract a Reddit post ID from an external ID, URL, or raw ID. 
+pub fn extract_post_id(input: &str) -> anyhow::Result<String> { + let trimmed = input.trim(); + if trimmed.is_empty() { + anyhow::bail!("post id is empty"); + } + + if let Some(id) = trimmed.strip_prefix("t3_") { + return Ok(id.to_string()); + } + + if trimmed.contains("_post_") { + if let Some(rest) = trimmed.rsplit("_post_").next() { + return Ok(rest.to_string()); + } + } + + if trimmed.starts_with("reddit_") { + if let Some(rest) = trimmed.rsplit('_').next() { + return Ok(rest.to_string()); + } + } + + if trimmed.contains("/comments/") { + let parts: Vec<&str> = trimmed.split('/').collect(); + if let Some(idx) = parts.iter().position(|p| *p == "comments") { + if let Some(id) = parts.get(idx + 1) { + return Ok(id.to_string()); + } + } + } + + if trimmed.chars().all(|c| c.is_ascii_alphanumeric()) { + return Ok(trimmed.to_string()); + } + + anyhow::bail!("unable to parse Reddit post id from '{trimmed}'") +} + +/// Extract a Reddit comment ID from a void message external ID. +pub fn extract_comment_id_from_external( + msg_external_id: &str, + connection_id: &str, +) -> anyhow::Result<String> { + let prefix = format!("reddit_{connection_id}_comment_"); + msg_external_id + .strip_prefix(&prefix) + .map(str::to_string) + .ok_or_else(|| { + anyhow::anyhow!("unable to parse Reddit comment id from '{msg_external_id}'") + }) +} + +/// Extract a Reddit post ID from a void post-body message external ID. 
+pub fn extract_post_id_from_postbody_external( + msg_external_id: &str, + connection_id: &str, +) -> anyhow::Result<String> { + let prefix = format!("reddit_{connection_id}_postbody_"); + msg_external_id + .strip_prefix(&prefix) + .map(str::to_string) + .ok_or_else(|| anyhow::anyhow!("unable to parse Reddit post id from '{msg_external_id}'")) +} + +#[cfg(test)] +mod tests { + use wiremock::matchers::{body_string, header, method, path, query_param}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + use super::*; + + #[test] + fn sanitize_subreddit_strips_prefix_and_invalid_chars() { + assert_eq!(sanitize_subreddit("r/Rust"), "rust"); + assert_eq!(sanitize_subreddit("/programming"), "programming"); + assert_eq!(sanitize_subreddit("start-ups!"), "startups"); + } + + #[test] + fn extract_post_id_from_various_inputs() { + assert_eq!(extract_post_id("abc123").unwrap(), "abc123"); + assert_eq!(extract_post_id("t3_abc123").unwrap(), "abc123"); + assert_eq!( + extract_post_id("reddit_reddit_post_abc123").unwrap(), + "abc123" + ); + assert_eq!( + extract_post_id("https://www.reddit.com/r/rust/comments/abc123/title/").unwrap(), + "abc123" + ); + } + + #[test] + fn extract_comment_id_from_external_id() { + assert_eq!( + super::extract_comment_id_from_external("reddit_reddit_comment_xyz", "reddit").unwrap(), + "xyz" + ); + } + + #[test] + fn deserialize_listing_response() { + let json = r#"{ + "kind": "Listing", + "data": { + "children": [{ + "kind": "t3", + "data": { + "id": "abc123", + "title": "Hello Rust", + "author": "dev", + "score": 150, + "url": "https://example.com", + "permalink": "/r/rust/comments/abc123/hello/", + "num_comments": 42, + "upvote_ratio": 0.91, + "created_utc": 1700000000.0, + "subreddit": "rust", + "selftext": "body" + } + }] + } + }"#; + let listing: RedditListing = serde_json::from_str(json).unwrap(); + assert_eq!(listing.data.children.len(), 1); + let post = match &listing.data.children[0] { + RedditListingItem::Post(post) => post, + _ => 
panic!("expected post"), + }; + assert_eq!(post.id, "abc123"); + assert_eq!(post.title.as_deref(), Some("Hello Rust")); + assert_eq!(post.score, Some(150)); + } + + #[test] + fn deserialize_missing_optional_fields() { + let json = r#"{ + "data": { + "children": [{ + "kind": "t3", + "data": { + "id": "x1", + "title": null, + "author": null, + "score": null, + "url": null, + "permalink": null, + "num_comments": null, + "upvote_ratio": null, + "created_utc": null, + "subreddit": null, + "selftext": null + } + }] + } + }"#; + let listing: RedditListing = serde_json::from_str(json).unwrap(); + let post = match &listing.data.children[0] { + RedditListingItem::Post(post) => post, + _ => panic!("expected post"), + }; + assert_eq!(post.id, "x1"); + assert!(post.title.is_none()); + assert!(post.score.is_none()); + } + + #[tokio::test] + async fn token_request_uses_client_credentials_and_user_agent() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/api/v1/access_token")) + .and(header("User-Agent", USER_AGENT)) + .and(header("Authorization", "Basic Y2xpZW50OmFwcC1zZWNyZXQ=")) + .and(body_string("grant_type=client_credentials")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "access_token": "tok123", + "token_type": "bearer", + "expires_in": 3600 + }))) + .mount(&server) + .await; + + let client = RedditClient::with_bases_for_test( + "client", + "app-secret", + None, + &server.uri(), + &server.uri(), + ); + let token = client.access_token().await.unwrap(); + assert_eq!(token, "tok123"); + } + + #[tokio::test] + async fn token_request_uses_refresh_token_grant() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/api/v1/access_token")) + .and(body_string( + "grant_type=refresh_token&refresh_token=refresh-abc", + )) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "access_token": "user-tok", + "token_type": "bearer", + "expires_in": 3600 + }))) 
+ .mount(&server) + .await; + + let client = RedditClient::with_bases_for_test( + "client", + "secret", + Some("refresh-abc".into()), + &server.uri(), + &server.uri(), + ); + let token = client.access_token().await.unwrap(); + assert_eq!(token, "user-tok"); + } + + #[tokio::test] + async fn subreddit_request_uses_bearer_and_limit() { + let server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path("/api/v1/access_token")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "access_token": "tok123", + "expires_in": 3600 + }))) + .mount(&server) + .await; + + Mock::given(method("GET")) + .and(path("/r/rust/hot")) + .and(query_param("limit", "100")) + .and(header("Authorization", "Bearer tok123")) + .and(header("User-Agent", USER_AGENT)) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "data": { "children": [] } + }))) + .mount(&server) + .await; + + let client = RedditClient::with_bases_for_test( + "client", + "secret", + None, + &server.uri(), + &server.uri(), + ); + let posts = client.subreddit_hot("rust", 100).await.unwrap(); + assert!(posts.is_empty()); + } + + #[tokio::test] + async fn refreshes_expired_token() { + let server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path("/api/v1/access_token")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "access_token": "fresh-token", + "expires_in": 3600 + }))) + .expect(1) + .mount(&server) + .await; + + Mock::given(method("GET")) + .and(path("/r/rust/hot")) + .and(header("Authorization", "Bearer fresh-token")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "data": { "children": [] } + }))) + .mount(&server) + .await; + + let client = RedditClient::with_bases_for_test( + "client", + "secret", + None, + &server.uri(), + &server.uri(), + ); + client.expired_token_for_test("stale-token"); + let posts = client.subreddit_hot("rust", 100).await.unwrap(); + 
assert!(posts.is_empty()); + } + + #[tokio::test] + async fn get_post_comments_parses_nested_tree_and_skips_more() { + let server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path("/api/v1/access_token")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "access_token": "tok123", + "expires_in": 3600 + }))) + .mount(&server) + .await; + + Mock::given(method("GET")) + .and(path("/comments/abc123")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([ + { + "data": { + "children": [{ + "kind": "t3", + "data": { + "id": "abc123", + "title": "Post title", + "author": "author", + "selftext": "body" + } + }] + } + }, + { + "data": { + "children": [ + { + "kind": "t1", + "data": { + "id": "c1", + "author": "u1", + "body": "top", + "parent_id": "t3_abc123", + "link_id": "t3_abc123", + "created_utc": 1700000000.0, + "depth": 0, + "replies": { + "data": { + "children": [{ + "kind": "t1", + "data": { + "id": "c2", + "author": "u2", + "body": "reply", + "parent_id": "t1_c1", + "link_id": "t3_abc123", + "created_utc": 1700000001.0, + "depth": 1, + "replies": "" + } + }] + } + } + } + }, + { + "kind": "more", + "data": { "count": 5 } + } + ] + } + } + ]))) + .mount(&server) + .await; + + let client = RedditClient::with_bases_for_test( + "client", + "secret", + Some("refresh".into()), + &server.uri(), + &server.uri(), + ); + let (post, comments) = client + .get_post_comments("abc123", "new", 200, 3) + .await + .unwrap(); + assert_eq!(post.id, "abc123"); + assert_eq!(comments.len(), 2); + assert_eq!(comments[0].id, "c2"); + assert_eq!(comments[1].id, "c1"); + } + + #[tokio::test] + async fn post_comment_sends_form_body_and_parses_response() { + let server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path("/api/v1/access_token")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "access_token": "tok123", + "expires_in": 3600 + }))) + .mount(&server) + 
.await; + + Mock::given(method("POST")) + .and(path("/api/comment")) + .and(body_string("thing_id=t1_parent&text=hello&api_type=json")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "json": { + "errors": [], + "data": { + "things": [{ + "data": { "id": "newcomment" } + }] + } + } + }))) + .mount(&server) + .await; + + let client = RedditClient::with_bases_for_test( + "client", + "secret", + Some("refresh".into()), + &server.uri(), + &server.uri(), + ); + let id = client.post_comment("t1_parent", "hello").await.unwrap(); + assert_eq!(id, "t1_newcomment"); + } + + #[tokio::test] + async fn post_comment_without_user_token_fails() { + let client = RedditClient::new("client", "secret"); + let err = client.post_comment("t3_abc", "hello").await.unwrap_err(); + assert!(err.to_string().contains("OAuth authorization")); + } +} diff --git a/crates/void-reddit/src/connector/mod.rs b/crates/void-reddit/src/connector/mod.rs new file mode 100644 index 0000000..1e6d048 --- /dev/null +++ b/crates/void-reddit/src/connector/mod.rs @@ -0,0 +1,150 @@ +mod sync; + +use std::sync::Arc; + +use async_trait::async_trait; +use tokio_util::sync::CancellationToken; +use void_core::connector::Connector; +use void_core::db::Database; +use void_core::models::{parse_reply_id, ConnectorType, HealthStatus, MessageContent}; + +pub struct RedditConnector { + config_id: String, + client_id: String, + client_secret: String, + refresh_token: Option<String>, + subreddits: Vec<String>, + keywords: Vec<String>, + min_score: u32, + poll_interval_secs: u64, +} + +impl RedditConnector { + #[allow(clippy::too_many_arguments)] + pub fn new( + connection_id: &str, + client_id: String, + client_secret: String, + refresh_token: Option<String>, + subreddits: Vec<String>, + keywords: Vec<String>, + min_score: u32, + poll_interval_secs: u64, + ) -> Self { + Self { + config_id: connection_id.to_string(), + client_id, + client_secret, + refresh_token, + subreddits: subreddits + .iter() + .map(|s| crate::api::sanitize_subreddit(s)) + 
.filter(|s| !s.is_empty()) + .collect(), + keywords: keywords.iter().map(|k| k.to_lowercase()).collect(), + min_score, + poll_interval_secs, + } + } + + fn client(&self) -> crate::api::RedditClient { + crate::api::RedditClient::with_refresh_token( + &self.client_id, + &self.client_secret, + self.refresh_token.clone(), + ) + } +} + +#[async_trait] +impl Connector for RedditConnector { + fn connector_type(&self) -> ConnectorType { + ConnectorType::from_static(crate::CONNECTOR_ID) + } + + fn connection_id(&self) -> &str { + &self.config_id + } + + async fn authenticate(&mut self) -> anyhow::Result<()> { + let client = self.client(); + let _ = client.subreddit_hot("all", 1).await?; + Ok(()) + } + + async fn start_sync(&self, db: Arc<Database>, cancel: CancellationToken) -> anyhow::Result<()> { + sync::run_sync( + &db, + &self.config_id, + &self.client_id, + &self.client_secret, + self.refresh_token.as_deref(), + &self.subreddits, + &self.keywords, + self.min_score, + self.poll_interval_secs, + cancel, + ) + .await + } + + async fn health_check(&self) -> anyhow::Result<HealthStatus> { + let client = self.client(); + let ok = client.subreddit_hot("all", 1).await.is_ok(); + Ok(HealthStatus { + connection_id: self.config_id.clone(), + connector_type: ConnectorType::from_static(crate::CONNECTOR_ID), + ok, + message: if ok { + if self.refresh_token.is_some() { + "Reddit OAuth credentials valid (commenting enabled)".to_string() + } else { + "Reddit OAuth credentials valid (read-only)".to_string() + } + } else { + "Reddit OAuth check failed".to_string() + }, + last_sync: None, + message_count: None, + }) + } + + async fn send_message(&self, to: &str, content: MessageContent) -> anyhow::Result<String> { + let text = content.text(); + let post_id = crate::api::extract_post_id(to)?; + let client = self.client(); + client.post_comment(&format!("t3_{post_id}"), text).await + } + + async fn reply( + &self, + message_id: &str, + content: MessageContent, + _in_thread: bool, + ) -> anyhow::Result<String> { + let 
(conv_external_id, msg_external_id) = parse_reply_id(message_id)?; + let text = content.text(); + let client = self.client(); + + let thing_id = if msg_external_id.contains("_postbody_") { + let post_id = crate::api::extract_post_id_from_postbody_external( + &msg_external_id, + &self.config_id, + )?; + format!("t3_{post_id}") + } else if msg_external_id.contains("_comment_") { + let comment_id = + crate::api::extract_comment_id_from_external(&msg_external_id, &self.config_id)?; + format!("t1_{comment_id}") + } else if conv_external_id.contains("_post_") { + let post_id = crate::api::extract_post_id(&conv_external_id)?; + format!("t3_{post_id}") + } else { + anyhow::bail!( + "Reddit reply target must be a post thread comment or post body (got {message_id})" + ); + }; + + client.post_comment(&thing_id, text).await + } +} diff --git a/crates/void-reddit/src/connector/sync.rs b/crates/void-reddit/src/connector/sync.rs new file mode 100644 index 0000000..460a7b3 --- /dev/null +++ b/crates/void-reddit/src/connector/sync.rs @@ -0,0 +1,589 @@ +use std::sync::Arc; +use std::time::{Duration, SystemTime}; + +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; +use void_core::db::Database; +use void_core::models::{Conversation, ConversationKind, Message}; +use void_core::progress::BackfillProgress; + +use crate::api::{sanitize_subreddit, RedditClient, RedditComment, RedditPost}; + +const REDDIT_BASE: &str = "https://www.reddit.com"; +const POSTS_PER_SUBREDDIT: u32 = 100; +const COMMENT_FETCH_DELAY: Duration = Duration::from_millis(1100); + +/// Wall-clock threshold to detect hibernation gaps (same rationale as Gmail/Slack/HN). 
+const IDLE_THRESHOLD: Duration = Duration::from_secs(3 * 60); + +#[allow(clippy::too_many_arguments)] +pub(super) async fn run_sync( + db: &Arc<Database>, + connection_id: &str, + client_id: &str, + client_secret: &str, + refresh_token: Option<&str>, + subreddits: &[String], + keywords: &[String], + min_score: u32, + poll_interval_secs: u64, + cancel: CancellationToken, +) -> anyhow::Result<()> { + let client = RedditClient::with_refresh_token( + client_id, + client_secret, + refresh_token.map(str::to_string), + ); + let comment_sync_enabled = refresh_token.is_some(); + + for subreddit in subreddits { + ensure_subreddit_conversation(db, connection_id, subreddit)?; + } + + info!(connection_id, "running initial Reddit sync"); + if let Err(e) = poll_subreddits( + &client, + db, + connection_id, + subreddits, + keywords, + min_score, + comment_sync_enabled, + &cancel, + true, + ) + .await + { + error!(connection_id, error = %e, "initial Reddit sync failed"); + } + + let mut interval = tokio::time::interval(Duration::from_secs(poll_interval_secs)); + interval.tick().await; + let mut last_poll = SystemTime::now(); + + loop { + tokio::select! 
{ + _ = cancel.cancelled() => { + info!(connection_id, "Reddit sync cancelled"); + break; + } + _ = interval.tick() => { + let elapsed = last_poll.elapsed().unwrap_or_default(); + if elapsed > IDLE_THRESHOLD { + warn!( + connection_id, + idle_secs = elapsed.as_secs(), + "Reddit sync was idle, catching up" + ); + void_core::status!( + "[reddit:{connection_id}] sync idle for {}s, catching up", + elapsed.as_secs(), + ); + } else { + info!(connection_id, "polling Reddit"); + } + if let Err(e) = poll_subreddits( + &client, + db, + connection_id, + subreddits, + keywords, + min_score, + comment_sync_enabled, + &cancel, + elapsed > IDLE_THRESHOLD, + ) + .await + { + error!(connection_id, error = %e, "Reddit poll error"); + } + last_poll = SystemTime::now(); + } + } + } + Ok(()) +} + +fn ensure_subreddit_conversation( + db: &Arc<Database>, + connection_id: &str, + subreddit: &str, +) -> anyhow::Result<()> { + let sub = sanitize_subreddit(subreddit); + let conv_external_id = format!("reddit_{connection_id}_{sub}"); + let conv = Conversation { + id: format!("{connection_id}-{sub}"), + connection_id: connection_id.to_string(), + connector: "reddit".to_string(), + external_id: conv_external_id, + name: Some(format!("r/{sub}")), + kind: ConversationKind::Channel, + last_message_at: None, + unread_count: 0, + is_muted: false, + metadata: None, + }; + db.upsert_conversation(&conv)?; + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +async fn poll_subreddits( + client: &RedditClient, + db: &Arc<Database>, + connection_id: &str, + subreddits: &[String], + keywords: &[String], + min_score: u32, + comment_sync_enabled: bool, + cancel: &CancellationToken, + show_progress: bool, +) -> anyhow::Result<()> { + if subreddits.is_empty() { + warn!(connection_id, "no subreddits configured, skipping poll"); + return Ok(()); + } + + let mut progress = show_progress.then(|| { + BackfillProgress::new(&format!("reddit:{connection_id}"), "posts") + .with_secondary("ingested") + }); + + for subreddit in subreddits { 
+ if cancel.is_cancelled() { + break; + } + + let sub = sanitize_subreddit(subreddit); + let conv_id = format!("{connection_id}-{sub}"); + + let posts = match client.subreddit_hot(&sub, POSTS_PER_SUBREDDIT).await { + Ok(posts) => posts, + Err(e) => { + warn!(subreddit = %sub, error = %e, "failed to fetch subreddit posts"); + continue; + } + }; + + for post in posts { + if cancel.is_cancelled() { + break; + } + + if let Some(ref mut p) = progress { + p.inc(1); + } + + let external_id = format!("reddit_{connection_id}_{}", post.id); + let already_exists = db.message_exists(connection_id, &external_id)?; + + if !already_exists { + if !matches_filters(&post, keywords, min_score) { + continue; + } + + let msg = build_message(&post, connection_id, &conv_id, &sub); + db.upsert_message(&msg)?; + + let when_str = chrono::DateTime::from_timestamp(msg.timestamp, 0) + .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string()) + .unwrap_or_default(); + let title = post.title.as_deref().unwrap_or("(untitled)"); + let author = post.author.as_deref().unwrap_or("unknown"); + eprintln!("[reddit:{connection_id}] {when_str} (new) r/{sub} — {author}: {title}"); + + if let Some(ref mut p) = progress { + p.inc_secondary(1); + } + } + + if comment_sync_enabled + && (already_exists || matches_filters(&post, keywords, min_score)) + { + if let Err(e) = sync_post_comments(client, db, connection_id, &post, &sub).await { + warn!(post_id = %post.id, error = %e, "failed to sync Reddit post comments"); + } + tokio::time::sleep(COMMENT_FETCH_DELAY).await; + } + } + } + + if let Some(p) = progress { + p.finish(); + } + + if !cancel.is_cancelled() { + db.set_sync_state( + connection_id, + "reddit_last_sync", + &chrono::Utc::now().timestamp().to_string(), + )?; + } + + Ok(()) +} + +async fn sync_post_comments( + client: &RedditClient, + db: &Arc<Database>, + connection_id: &str, + post: &RedditPost, + subreddit: &str, +) -> anyhow::Result<()> { + let (thread_conv, post_body_msg) = + 
build_post_thread_conversation(post, connection_id, subreddit); + db.upsert_conversation(&thread_conv)?; + db.upsert_message(&post_body_msg)?; + + let (_, comments) = client.get_post_comments(&post.id, "new", 200, 3).await?; + + for comment in comments { + let external_id = format!("reddit_{connection_id}_comment_{}", comment.id); + if db.message_exists(connection_id, &external_id)? { + continue; + } + let msg = build_comment_message( + &comment, + connection_id, + &thread_conv.id, + subreddit, + &post.id, + ); + db.upsert_message(&msg)?; + } + + Ok(()) +} + +pub(crate) fn build_post_thread_conversation( + post: &RedditPost, + connection_id: &str, + subreddit: &str, +) -> (Conversation, Message) { + let post_id = &post.id; + let conv_id = format!("{connection_id}-post-{post_id}"); + let conv_external_id = format!("reddit_{connection_id}_post_{post_id}"); + let title = post.title.as_deref().unwrap_or("(untitled)"); + let permalink = post.permalink.as_deref().unwrap_or(""); + let reddit_url = if permalink.starts_with("http") { + permalink.to_string() + } else { + format!("{REDDIT_BASE}{permalink}") + }; + + let conv = Conversation { + id: conv_id.clone(), + connection_id: connection_id.to_string(), + connector: "reddit".to_string(), + external_id: conv_external_id.clone(), + name: Some(title.to_string()), + kind: ConversationKind::Thread, + last_message_at: post.created_utc.map(|ts| ts as i64), + unread_count: 0, + is_muted: false, + metadata: Some(serde_json::json!({ + "reddit_id": post_id, + "subreddit": subreddit, + "permalink": reddit_url, + })), + }; + + let post_body_msg = build_post_body_message(post, connection_id, &conv_id, subreddit); + (conv, post_body_msg) +} + +pub(crate) fn build_post_body_message( + post: &RedditPost, + connection_id: &str, + conv_id: &str, + subreddit: &str, +) -> Message { + let post_id = &post.id; + let title = post.title.as_deref().unwrap_or("(untitled)"); + let author = post.author.as_deref().unwrap_or("[deleted]"); + let selftext 
= post.selftext.as_deref().unwrap_or("").trim(); + let permalink = post.permalink.as_deref().unwrap_or(""); + let reddit_url = if permalink.starts_with("http") { + permalink.to_string() + } else { + format!("{REDDIT_BASE}{permalink}") + }; + + let body = if selftext.is_empty() { + format!("{title}\n{reddit_url}") + } else { + format!("{title}\n\n{selftext}\n\n{reddit_url}") + }; + + let timestamp = post + .created_utc + .map(|ts| ts as i64) + .unwrap_or_else(|| chrono::Utc::now().timestamp()); + + Message { + id: conv_id.to_string(), + conversation_id: conv_id.to_string(), + connection_id: connection_id.to_string(), + connector: "reddit".to_string(), + external_id: format!("reddit_{connection_id}_postbody_{post_id}"), + sender: author.to_string(), + sender_name: Some(author.to_string()), + sender_avatar_url: None, + body: Some(body), + timestamp, + synced_at: Some(chrono::Utc::now().timestamp()), + is_archived: false, + is_saved: false, + reply_to_id: None, + media_type: None, + metadata: Some(serde_json::json!({ + "reddit_id": post_id, + "subreddit": subreddit, + "permalink": reddit_url, + "source": "reddit_post_body", + })), + context_id: None, + context: None, + } +} + +pub(crate) fn build_comment_message( + comment: &RedditComment, + connection_id: &str, + conv_id: &str, + subreddit: &str, + post_id: &str, +) -> Message { + let author = comment.author.as_deref().unwrap_or("[deleted]"); + let body = comment.body.as_deref().unwrap_or("[removed]").to_string(); + let score = comment.score.unwrap_or(0).max(0) as u32; + let timestamp = comment + .created_utc + .map(|ts| ts as i64) + .unwrap_or_else(|| chrono::Utc::now().timestamp()); + + Message { + id: format!("{connection_id}-comment-{}", comment.id), + conversation_id: conv_id.to_string(), + connection_id: connection_id.to_string(), + connector: "reddit".to_string(), + external_id: format!("reddit_{connection_id}_comment_{}", comment.id), + sender: author.to_string(), + sender_name: Some(author.to_string()), + 
sender_avatar_url: None, + body: Some(body), + timestamp, + synced_at: Some(chrono::Utc::now().timestamp()), + is_archived: false, + is_saved: false, + reply_to_id: None, + media_type: None, + metadata: Some(serde_json::json!({ + "reddit_id": comment.id, + "post_id": post_id, + "subreddit": subreddit, + "parent_id": comment.parent_id, + "score": score, + "depth": comment.depth, + "source": "reddit_comment", + })), + context_id: None, + context: None, + } +} + +pub(crate) fn matches_filters(post: &RedditPost, keywords: &[String], min_score: u32) -> bool { + let score = post.score.unwrap_or(0).max(0) as u32; + if score < min_score { + return false; + } + + if keywords.is_empty() { + return true; + } + + let title = post.title.as_deref().unwrap_or("").to_lowercase(); + keywords.iter().any(|kw| title.contains(kw.as_str())) +} + +pub(crate) fn build_message( + post: &RedditPost, + connection_id: &str, + conv_id: &str, + subreddit: &str, +) -> Message { + let post_id = &post.id; + let title = post.title.as_deref().unwrap_or("(untitled)"); + let author = post.author.as_deref().unwrap_or("[deleted]"); + let score = post.score.unwrap_or(0).max(0) as u32; + let url = post.url.as_deref().unwrap_or("").to_string(); + let permalink = post.permalink.as_deref().unwrap_or(""); + let reddit_url = if permalink.starts_with("http") { + permalink.to_string() + } else { + format!("{REDDIT_BASE}{permalink}") + }; + let comments = post.num_comments.unwrap_or(0); + let upvote_ratio = post.upvote_ratio.unwrap_or(0.0); + + let body = if url.is_empty() || url == reddit_url { + format!("{title}\n{reddit_url}\n{score} upvotes | {comments} comments") + } else { + format!("{title}\n{url}\n{reddit_url}\n{score} upvotes | {comments} comments") + }; + + let metadata = serde_json::json!({ + "reddit_id": post_id, + "subreddit": subreddit, + "score": score, + "url": url, + "reddit_url": reddit_url, + "num_comments": comments, + "upvote_ratio": upvote_ratio, + }); + + let timestamp = post + .created_utc 
+ .map(|ts| ts as i64) + .unwrap_or_else(|| chrono::Utc::now().timestamp()); + + Message { + id: format!("{connection_id}-{post_id}"), + conversation_id: conv_id.to_string(), + connection_id: connection_id.to_string(), + connector: "reddit".to_string(), + external_id: format!("reddit_{connection_id}_{post_id}"), + sender: author.to_string(), + sender_name: Some(author.to_string()), + sender_avatar_url: None, + body: Some(body), + timestamp, + synced_at: Some(chrono::Utc::now().timestamp()), + is_archived: false, + is_saved: false, + reply_to_id: None, + media_type: None, + metadata: Some(metadata), + context_id: None, + context: None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_post(id: &str, title: &str, score: i32) -> RedditPost { + RedditPost { + id: id.to_string(), + title: Some(title.to_string()), + author: Some("author".to_string()), + score: Some(score), + url: Some("https://example.com".to_string()), + permalink: Some("/r/rust/comments/abc/hello/".to_string()), + num_comments: Some(10), + upvote_ratio: Some(0.95), + created_utc: Some(1_700_000_000.0), + subreddit: Some("rust".to_string()), + selftext: Some("post body".to_string()), + } + } + + #[test] + fn matches_keyword_case_insensitive() { + let post = make_post("1", "Rust is Amazing", 200); + let keywords = vec!["rust".to_string()]; + assert!(matches_filters(&post, &keywords, 0)); + } + + #[test] + fn rejects_below_min_score() { + let post = make_post("1", "Rust is Amazing", 50); + let keywords = vec!["rust".to_string()]; + assert!(!matches_filters(&post, &keywords, 100)); + } + + #[test] + fn rejects_non_matching_keyword() { + let post = make_post("1", "Python is Great", 200); + let keywords = vec!["rust".to_string()]; + assert!(!matches_filters(&post, &keywords, 0)); + } + + #[test] + fn empty_keywords_matches_all_posts_above_threshold() { + let post = make_post("1", "Anything Goes", 200); + assert!(matches_filters(&post, &[], 0)); + } + + #[test] + fn 
missing_score_defaults_to_zero() {
+        let mut post = make_post("1", "No score", 0);
+        post.score = None;
+        assert!(matches_filters(&post, &[], 0));
+        assert!(!matches_filters(&post, &[], 1));
+    }
+
+    #[test]
+    fn build_message_includes_all_fields() {
+        let post = make_post("abc123", "Cool Rust Tool", 350);
+        let msg = build_message(&post, "reddit", "reddit-rust", "rust");
+        assert_eq!(msg.id, "reddit-abc123");
+        assert_eq!(msg.conversation_id, "reddit-rust");
+        assert_eq!(msg.external_id, "reddit_reddit_abc123");
+        assert_eq!(msg.sender, "author");
+        assert!(msg.body.as_ref().unwrap().contains("Cool Rust Tool"));
+        assert!(msg.body.as_ref().unwrap().contains("350 upvotes"));
+        assert!(msg.body.as_ref().unwrap().contains("https://example.com"));
+        let meta = msg.metadata.unwrap();
+        assert_eq!(meta["reddit_id"], "abc123");
+        assert_eq!(meta["subreddit"], "rust");
+        assert_eq!(meta["score"], 350);
+        assert_eq!(meta["num_comments"], 10);
+    }
+
+    #[test]
+    fn build_post_thread_conversation_creates_thread_with_metadata() {
+        let post = make_post("abc123", "Thread title", 100);
+        let (conv, msg) = build_post_thread_conversation(&post, "reddit", "rust");
+        assert_eq!(conv.id, "reddit-post-abc123");
+        assert_eq!(conv.external_id, "reddit_reddit_post_abc123");
+        assert_eq!(conv.kind, ConversationKind::Thread);
+        assert_eq!(conv.metadata.as_ref().unwrap()["subreddit"], "rust");
+        assert!(conv.metadata.as_ref().unwrap()["permalink"]
+            .as_str()
+            .unwrap()
+            .contains("reddit.com"));
+        assert_eq!(msg.external_id, "reddit_reddit_postbody_abc123");
+        assert!(msg.body.as_ref().unwrap().contains("Thread title"));
+        assert!(msg.body.as_ref().unwrap().contains("post body"));
+    }
+
+    #[test]
+    fn build_comment_message_sets_parent_metadata() {
+        let comment = RedditComment {
+            id: "c1".to_string(),
+            author: Some("user1".to_string()),
+            body: Some("nice".to_string()),
+            score: Some(5),
+            parent_id: Some("t3_abc123".to_string()),
+            link_id: Some("t3_abc123".to_string()),
+            created_utc:
Some(1_700_000_001.0),
+            depth: Some(0),
+            replies: crate::api::RedditReplies::Empty,
+        };
+        let msg = build_comment_message(&comment, "reddit", "reddit-post-abc123", "rust", "abc123");
+        assert_eq!(msg.external_id, "reddit_reddit_comment_c1");
+        assert_eq!(msg.sender, "user1");
+        assert_eq!(msg.body.as_deref(), Some("nice"));
+        let meta = msg.metadata.unwrap();
+        assert_eq!(meta["parent_id"], "t3_abc123");
+        assert_eq!(meta["post_id"], "abc123");
+    }
+
+    #[test]
+    fn sanitize_subreddit_for_conversation_ids() {
+        assert_eq!(sanitize_subreddit("r/Rust"), "rust");
+        assert_eq!(sanitize_subreddit("start-ups!"), "startups");
+    }
+}
diff --git a/crates/void-reddit/src/lib.rs b/crates/void-reddit/src/lib.rs
new file mode 100644
index 0000000..0d9fc9e
--- /dev/null
+++ b/crates/void-reddit/src/lib.rs
@@ -0,0 +1,4 @@
+pub const CONNECTOR_ID: &str = "reddit";
+
+pub mod api;
+pub mod connector;
diff --git a/docs/commands.md b/docs/commands.md
index 1a07fd7..0d2a5e5 100644
--- a/docs/commands.md
+++ b/docs/commands.md
@@ -34,7 +34,7 @@ Most read commands accept:
 
 | Flag | Description |
 |------|-------------|
-| `--connector ` | Filter by connector: `slack`, `gmail`, `whatsapp`, `telegram`, `calendar`, `linkedin` (alias: `li`), `hackernews` (alias: `hn`) |
+| `--connector ` | Filter by connector: `slack`, `gmail`, `whatsapp`, `telegram`, `calendar`, `linkedin` (alias: `li`), `hackernews` (alias: `hn`), `googlenews` (alias: `gn`), `reddit` (alias: `rd`) |
 | `--connection ` | Filter by connection ID (when you have several accounts of one type) |
 | `-n`, `--size ` | Limit number of results (default: 50) |
 | `--page ` | Page through results |
@@ -143,6 +143,32 @@ Tune the watched-keywords feed without editing `config.toml`. See [Connector set
 | `void hn keywords set ` | Replace all keywords (empty to clear) |
 | `void hn min-score ` | Set the minimum score threshold for stories |
 
+## Reddit
+
+Tune watched subreddits, keywords, and score threshold without editing `config.toml`.
See [Connector setup](connectors.md#reddit).
+
+| Command | Description |
+|---------|-------------|
+| `void reddit config` | Show current subreddits, keywords, and minimum score (credentials redacted) |
+| `void reddit subreddits list` | List watched subreddits |
+| `void reddit subreddits add ` | Add one or more subreddits (comma-separated) |
+| `void reddit subreddits remove ` | Remove one or more subreddits (comma-separated) |
+| `void reddit subreddits set ` | Replace all subreddits |
+| `void reddit keywords list` | List watched keywords |
+| `void reddit keywords add ` | Add one or more keywords (comma-separated) |
+| `void reddit keywords remove ` | Remove one or more keywords (comma-separated) |
+| `void reddit keywords set ` | Replace all keywords (empty to clear) |
+| `void reddit min-score ` | Set the minimum score threshold for posts |
+
+Reply to synced Reddit comments (requires OAuth commenting enabled during setup):
+
+| Command | Description |
+|---------|-------------|
+| `void reply --message "..."` | Reply to a post or comment in a Reddit thread |
+| `void send --via reddit --to --message "..."` | Post a top-level comment on a Reddit post |
+
+Alias: `void rd …`
+
 ## Hooks
 
 LLM automations triggered by new messages or cron schedules. See the full [Hooks guide](hooks.md).
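A note on how the Reddit `keywords` and `min-score` settings documented in this hunk take effect: the connector's `matches_filters` lowercases the post title but compares it against each keyword exactly as stored, so a keyword saved with uppercase letters would only match if something upstream normalizes it (this diff does not show whether the CLI does). The following is a minimal sketch of a variant that lowercases both sides. `Post` is a hypothetical stand-in for `RedditPost`, and the keyword normalization is an assumption, not the PR's behavior.

```rust
/// Hypothetical stand-in for the connector's `RedditPost`; only the two
/// fields the filter reads are modeled here.
struct Post {
    title: Option<String>,
    score: Option<i32>,
}

/// Returns true when the post meets the score threshold and, if any keywords
/// are configured, its title contains at least one of them.
fn matches_filters(post: &Post, keywords: &[String], min_score: u32) -> bool {
    // Missing or negative scores are clamped to zero, as in the diff.
    let score = post.score.unwrap_or(0).max(0) as u32;
    if score < min_score {
        return false;
    }
    // An empty keyword list means "accept everything above the threshold".
    if keywords.is_empty() {
        return true;
    }
    let title = post.title.as_deref().unwrap_or("").to_lowercase();
    // Assumption: keywords may arrive with mixed case, so lowercase them too.
    keywords.iter().any(|kw| title.contains(&kw.to_lowercase()))
}
```

With this variant, a post titled "Rust is Amazing" with score 200 passes for the keyword `RUST` at a threshold of 100, while the version in the diff would reject it unless the keyword had already been lowercased.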
diff --git a/docs/configuration.md b/docs/configuration.md
index d24391e..c107b5b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -30,6 +30,7 @@ path = "~/.local/share/void"
 gmail_poll_interval_secs = 30
 calendar_poll_interval_secs = 60
 hackernews_poll_interval_secs = 3600
+reddit_poll_interval_secs = 3600
 linkedin_poll_interval_secs = 1800
 linkedin_backfill_days = 15
 github_poll_interval_secs = 120
@@ -67,6 +68,16 @@ type = "hackernews"
 keywords = ["rust", "ai", "startup"]
 min_score = 100
 
+[[connections]]
+id = "reddit"
+type = "reddit"
+client_id = "your-reddit-app-client-id"
+client_secret = "your-reddit-app-client-secret"
+refresh_token = "stored-by-setup-when-commenting-enabled"
+subreddits = ["rust", "programming"]
+keywords = ["ai", "startup"]
+min_score = 50
+
 [[connections]]
 id = "linkedin"
 type = "linkedin"
@@ -98,6 +109,7 @@ Polling intervals for connectors that poll (push-based connectors — WhatsApp,
 | `gmail_poll_interval_secs` | 30 |
 | `calendar_poll_interval_secs` | 60 |
 | `hackernews_poll_interval_secs` | 3600 |
+| `reddit_poll_interval_secs` | 3600 |
 | `linkedin_poll_interval_secs` | 1800 |
 | `linkedin_backfill_days` | 15 |
 | `github_poll_interval_secs` | 120 |
@@ -109,7 +121,7 @@ Each connection is one account on one service.
Every connection has:
 | Field | Required | Description |
 |-------|----------|-------------|
 | `id` | yes | Unique name you choose — used by `--connection ` |
-| `type` | yes | One of `whatsapp`, `telegram`, `slack`, `gmail`, `calendar`, `hackernews`, `googlenews`, `linkedin`, `github` |
+| `type` | yes | One of `whatsapp`, `telegram`, `slack`, `gmail`, `calendar`, `hackernews`, `googlenews`, `linkedin`, `reddit`, `github` |
 | `ignore_conversations` | no | List of conversations to auto-mute (see below) |
 
 Per-type fields:
@@ -122,6 +134,8 @@ Per-type fields:
 | `gmail` | — | `credentials_file` (custom Google OAuth client) |
 | `calendar` | — | `credentials_file`, `calendar_ids` (default: primary) |
 | `hackernews` | — | `keywords` (default: `[]`), `min_score` (default: 0) |
+| `googlenews` | — | `keywords`, `when`, `language`, `country` |
+| `reddit` | `client_id`, `client_secret` | `refresh_token` (optional, enables commenting), `subreddits` (default: `[]`), `keywords` (default: `[]`), `min_score` (default: 0) |
 | `linkedin` | `api_key`, `dsn`, `account_id` (Unipile) | — |
 | `github` | `token`, `username` | — |
 
diff --git a/docs/connectors.md b/docs/connectors.md
index d07459b..6b5edef 100644
--- a/docs/connectors.md
+++ b/docs/connectors.md
@@ -12,6 +12,7 @@ Every connector is added through the same flow: run `void setup`, pick the servi
 | [LinkedIn](#linkedin-unipile) | Unipile API key | Unipile API polling |
 | [Hacker News](#hacker-news) | None — public API | HN API polling |
 | [Google News](#google-news) | None — public RSS | Google News RSS polling |
+| [Reddit](#reddit) | Reddit app OAuth | Reddit API polling |
 | [GitHub](#github) | Personal Access Token | GitHub REST API polling |
 
 ## WhatsApp
@@ -120,6 +121,49 @@ void gn config
 
 To follow several editions (e.g. French and US news), add one connection per edition — each is targetable with `--connection `.
 
+## Reddit
+
+Reddit requires a **web** app registered at [reddit.com/prefs/apps](https://www.reddit.com/prefs/apps) with redirect URI `http://localhost:8765`.
+
+**Read-only mode** uses application-only OAuth (`client_credentials`) with just `client_id` and `client_secret`. Posts from watched subreddits appear in your inbox (one channel conversation per subreddit).
+
+**Commenting mode** (optional during `void setup`) runs a browser OAuth flow and stores a `refresh_token` in config. When enabled, matching posts also sync as thread conversations with comments, and you can reply from the CLI.
+
+OAuth setup tries a local callback on `localhost:8765` first. If the port is busy, the browser cannot open, or you are on a remote machine, setup falls back to printing the authorize URL and asking you to paste the returned code.
+
+Run `void setup`, select Reddit, and enter your client ID, client secret, subreddits, keywords, and minimum score. Optionally enable commenting for the OAuth flow.
+
+```toml
+[[connections]]
+id = "reddit"
+type = "reddit"
+client_id = "your-reddit-app-client-id"
+client_secret = "your-reddit-app-client-secret"
+refresh_token = "stored-by-setup-when-commenting-enabled" # optional
+subreddits = ["rust", "programming", "startups"]
+keywords = ["ai", "llm"]
+min_score = 50
+```
+
+Tune filters later without editing the config:
+
+```bash
+void reddit subreddits add "rust,local-first"
+void reddit subreddits remove "startups"
+void reddit keywords add "ai,llm"
+void reddit min-score 100
+void reddit config
+```
+
+Reply to a synced comment or post (requires `refresh_token`):
+
+```bash
+void reply --message "Thanks for sharing!"
+void send --via reddit --to reddit_reddit_post_abc123 --message "Great post!"
+```
+
+`void reply` targets a post-body or comment message inside a synced thread (the conversations created when commenting is enabled). To comment on a post that only appears in the subreddit feed, use `void send --via reddit --to ` instead.
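The `--to reddit_reddit_post_abc123` value in the `void send` example has the same shape as the external IDs asserted in the connector tests earlier in this diff. Below is a sketch of those patterns, assuming they generalize beyond the tested values. Only the feed-post format string appears verbatim in the diff; the other three are inferred from test assertions, and `RedditItem` and `external_id` are hypothetical names used for illustration.

```rust
/// Kinds of synced Reddit items whose external IDs are asserted in the tests.
/// Each variant carries the bare Reddit ID (for example "abc123" or "c1").
enum RedditItem<'a> {
    /// A post in a subreddit feed conversation.
    FeedPost(&'a str),
    /// The thread conversation created for a post when commenting is enabled.
    PostThread(&'a str),
    /// The post-body message inside such a thread.
    PostBody(&'a str),
    /// A comment message inside such a thread.
    Comment(&'a str),
}

/// Builds the external ID for an item under the given connection ID.
/// Assumption: the patterns below hold for every ID, not just the tested ones.
fn external_id(connection_id: &str, item: &RedditItem<'_>) -> String {
    match item {
        RedditItem::FeedPost(id) => format!("reddit_{connection_id}_{id}"),
        RedditItem::PostThread(id) => format!("reddit_{connection_id}_post_{id}"),
        RedditItem::PostBody(id) => format!("reddit_{connection_id}_postbody_{id}"),
        RedditItem::Comment(id) => format!("reddit_{connection_id}_comment_{id}"),
    }
}
```

With the connection ID `reddit`, `RedditItem::PostThread("abc123")` yields `reddit_reddit_post_abc123`, the target used in the `void send` example.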
+
 ## GitHub
 
 GitHub syncs actionable activity into your inbox (read-only):
diff --git a/scripts/server-config.toml.example b/scripts/server-config.toml.example
index 7f59432..7cf19c4 100644
--- a/scripts/server-config.toml.example
+++ b/scripts/server-config.toml.example
@@ -9,6 +9,7 @@ mode = "local"
 gmail_poll_interval_secs = 30
 calendar_poll_interval_secs = 60
 hackernews_poll_interval_secs = 3600
+reddit_poll_interval_secs = 3600
 linkedin_poll_interval_secs = 1800
 linkedin_backfill_days = 15