diff --git a/.gitignore b/.gitignore index 2e993bb..1a9dbfd 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ coverage/ __pycache__/ opencode-plugin-template/ opencode-docs-*/ +package-lock.json diff --git a/.mise/tasks/build b/.mise/tasks/build deleted file mode 100755 index 470ce30..0000000 --- a/.mise/tasks/build +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env bash -#MISE description="Build the project" -set -e -rm -rf dist -tsc -p tsconfig.build.json -cp -r src/command dist/command \ No newline at end of file diff --git a/.mise/tasks/format b/.mise/tasks/format deleted file mode 100755 index 4b63ace..0000000 --- a/.mise/tasks/format +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env bash -#MISE description="Format code with Biome" -biome format --write . \ No newline at end of file diff --git a/.mise/tasks/lint b/.mise/tasks/lint deleted file mode 100755 index 6392e14..0000000 --- a/.mise/tasks/lint +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env bash -#MISE description="Run Biome lint" -biome lint . \ No newline at end of file diff --git a/.mise/tasks/lint:fix b/.mise/tasks/lint:fix deleted file mode 100755 index bcdc53b..0000000 --- a/.mise/tasks/lint:fix +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env bash -#MISE description="Run Biome lint with auto-fix" -biome lint --write . \ No newline at end of file diff --git a/.mise/tasks/local-pack-test b/.mise/tasks/local-pack-test deleted file mode 100755 index 5c34080..0000000 --- a/.mise/tasks/local-pack-test +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/env bash -#MISE description="Install packed plugin into opencode cache (production-like test)" -set -euo pipefail - -PLUGIN_LOCAL_FILE="${HOME}/.config/opencode/plugin/opencode-synced-local.ts" -CACHE_DIR="${HOME}/.cache/opencode" -PACKAGE_DIR="$(pwd)" -VERSION="$(node -p "require('./package.json').version")" - -if [ -f "${PLUGIN_LOCAL_FILE}" ]; then - rm -f "${PLUGIN_LOCAL_FILE}" -fi - -TARBALL="$(npm pack | tail -n 1)" - -rm -rf "${CACHE_DIR}/node_modules/opencode-synced" - -if [ -f "${CACHE_DIR}/package.json" ]; then - python - < /dev/null 2>&1; then - echo "❌ Error: 'scripts' property found in package.json" - echo "Scripts are not allowed. Use .mise/tasks instead." - exit 1 -else - echo "✓ package.json validation passed" -fi diff --git a/.mise/tasks/prepare b/.mise/tasks/prepare deleted file mode 100755 index 9de48a1..0000000 --- a/.mise/tasks/prepare +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env bash -#MISE description="Prepare for publishing" -bun run build \ No newline at end of file diff --git a/.mise/tasks/publish b/.mise/tasks/publish deleted file mode 100755 index 5bb1d18..0000000 --- a/.mise/tasks/publish +++ /dev/null @@ -1,30 +0,0 @@ -#!/usr/bin/env bash -#MISE description="Publish to npm registry" -#MISE depends=["setup", "build"] -#USAGE flag "-t --tag " "Tag to publish with" default="latest" - -set -e - -echo "Publishing to npm" -echo " > with tag: ${usage_tag}..." -echo " > npm version: $(npm --version)" - -# Determine if we should use provenance -# Only enable in CI environments with OIDC support -PROVENANCE_FLAG="" -if [ -z "$GITHUB_ACTIONS" ]; then - # Local development: disable provenance (fails without CI provider context) - PROVENANCE_FLAG="--no-provenance" - echo " > provenance: disabled (local environment)" -else - # GitHub Actions: enable provenance (OIDC will handle it automatically) - echo " > provenance: enabled (GitHub Actions with OIDC)" -fi - -# Publish directly from source (allows provenance generation in CI) -npm publish \ - --access public \ - --tag "${usage_tag}" \ - $PROVENANCE_FLAG - -echo "Published successfully!" diff --git a/.mise/tasks/setup b/.mise/tasks/setup deleted file mode 100755 index fbb6dc9..0000000 --- a/.mise/tasks/setup +++ /dev/null @@ -1,12 +0,0 @@ -#!/usr/bin/env bash -#MISE description="Setting up project" - -echo "" -echo "🍜 Setting up project" -echo "" - -bun install - -echo "" -echo "👍 Done" -echo "" diff --git a/.mise/tasks/test b/.mise/tasks/test deleted file mode 100755 index 78fb3ad..0000000 --- a/.mise/tasks/test +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env bash -#MISE description="Run tests" -bun test \ No newline at end of file diff --git a/.mise/tasks/version b/.mise/tasks/version deleted file mode 100755 index 4ebaea6..0000000 --- a/.mise/tasks/version +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/env bash -#MISE description="Bump version - handles 'next' prerelease or syncs after 'latest'" -#USAGE flag "-t --tag " "Tag for versioning strategy (next|latest)" default="next" - -set -e - -echo "Bumping version" -echo " > with tag: ${usage_tag}..." - -# When bumping "next": prerelease bump only -# When "latest": release-please handles it, but we sync "next" to be ahead -if [ "${usage_tag}" = "next" ]; then - echo "Bumping prerelease version for 'next' tag..." - bun pm version prerelease - echo " > Next version: $(jq -r '.version' package.json)" - -elif [ "${usage_tag}" = "latest" ]; then - echo "Syncing 'next' tag to be ahead of 'latest'..." - echo "(Note: 'latest' version should already be bumped by release-please)" - - # Get current version (should be latest after release-please) - LATEST_VERSION=$(jq -r '.version' package.json) - echo " > Latest version: $LATEST_VERSION" - - # Bump to next prerelease (X.Y.Z -> X.Y.(Z+1)-prerelease.0) - echo " > Bumping 'next' to be ahead of latest..." - bun pm version prerelease - - NEXT_VERSION=$(jq -r '.version' package.json) - echo " > Next version: $NEXT_VERSION" - -else - echo "❌ Unknown tag: ${usage_tag}" - echo " Use: --tag next (default) or --tag latest" - exit 1 -fi - -echo "Version bump completed!" diff --git a/src/sync/service.ts b/src/sync/service.ts index 3604254..156b54b 100644 --- a/src/sync/service.ts +++ b/src/sync/service.ts @@ -3,7 +3,7 @@ import path from 'node:path'; import type { PluginInput } from '@opencode-ai/plugin'; import { syncLocalToRepo, syncRepoToLocal } from './apply.js'; -import { generateCommitMessage } from './commit.js'; + import type { NormalizedSyncConfig } from './config.js'; import { canCommitMcpSecrets, @@ -19,7 +19,13 @@ import { import { SyncCommandError, SyncConfigMissingError } from './errors.js'; import type { SyncLockInfo } from './lock.js'; import { withSyncLock } from './lock.js'; -import { buildSyncPlan, resolveRepoRoot, resolveSyncLocations } from './paths.js'; +import { + buildSyncPlan, + resolveProjectsFilePath, + resolveRepoRoot, + resolveSyncLocations, +} from './paths.js'; +import { syncGlobalData } from './projects-merge.js'; import { commitAll, ensureRepoCloned, @@ -27,6 +33,7 @@ import { fetchAndFastForward, findSyncRepo, getAuthenticatedUser, + getHeadHash, getRepoStatus, hasLocalChanges, isRepoCloned, @@ -43,6 +50,8 @@ import { resolveSecretsBackendConfig, type SecretsBackend, } from './secrets-backend.js'; +import { syncSessions } from './session-merge.js'; +import { createNodeShell } from './shell.js'; import { createTursoSessionBackend, isRetryableTursoError, @@ -51,6 +60,7 @@ import { import { createLogger, extractTextFromResponse, + notify, resolveSmallModel, showToast, unwrapData, @@ -114,6 +124,10 @@ export interface SyncService { } export function createSyncService(ctx: SyncServiceContext): SyncService { + if (!ctx.$) { + ctx.$ = createNodeShell() as unknown as SyncServiceContext['$']; + } + const locations = resolveSyncLocations(); const log = createLogger(ctx.client); const lockPath = path.join(path.dirname(locations.statePath), 'sync.lock'); @@ -475,8 +489,10 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { } tursoIdleFlushTimer = setTimeout(() => { tursoIdleFlushTimer = null; - void skipIfBusy(async () => { + skipIfBusy(async () => { await flushQueuedTursoSync('idle-event'); + }).catch(() => { + // Errors are already logged internally by skipIfBusy }); }, 250); }; @@ -527,7 +543,7 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { stopTursoSyncLoop(); tursoSyncIntervalSec = nextInterval; tursoSyncTimer = setInterval(() => { - void skipIfBusy(async () => { + skipIfBusy(async () => { const latest = await loadSyncConfig(locations); if (!latest || !isTursoSessionBackend(latest)) { stopTursoSyncLoop(); @@ -542,6 +558,8 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { if (result.warning) { log.warn(result.warning, { reason: 'background' }); } + }).catch(() => { + // Errors are already logged internally by skipIfBusy }); }, nextInterval * 1000); }; @@ -600,17 +618,19 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { } catch (error) { const message = `Failed to load opencode-synced config: ${formatError(error)}`; log.error(message, { path: locations.syncConfigPath }); - await showToast( + await notify( ctx.client, - `Failed to load opencode-synced config. Check ${locations.syncConfigPath} for JSON errors.`, + '❌', + `Failed to load config. Check ${locations.syncConfigPath} for JSON errors.`, 'error' ); return; } if (!config) { stopTursoSyncLoop(); - await showToast( + await notify( ctx.client, + 'ℹ️', 'Configure opencode-synced with /sync-init or link to an existing repo with /sync-link', 'info' ); @@ -618,6 +638,7 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { } try { assertValidSecretsBackend(config); + await notify(ctx.client, '🔄', 'Syncing config...', 'info'); let tursoWarning: string | null = null; if (isTursoSessionBackend(config)) { try { @@ -632,13 +653,14 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { runSecretsPullIfConfigured, }); ensureTursoSyncLoop(config); + await notify(ctx.client, '✅', 'opencode-synced ready', 'success'); if (tursoWarning) { log.warn(tursoWarning); await showToast(ctx.client, tursoWarning, 'warning'); } } catch (error) { log.error('Startup sync failed', { error: formatError(error) }); - await showToast(ctx.client, formatError(error), 'error'); + await notify(ctx.client, '❌', formatError(error), 'error'); } }), status: async () => { @@ -796,6 +818,12 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { } ensureTursoSyncLoop(config); + await notify( + ctx.client, + '🚀', + `Sync configured — ${repoIdentifier} (${resolveRepoBranch(config)})`, + 'success' + ); return lines.join('\n'); }), link: (options: LinkOptions) => @@ -811,6 +839,8 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { ); } + await notify(ctx.client, '🔗', 'Linking to sync repo...', 'info'); + const found = await findSyncRepo(ctx.$, options.repo, { disableAutoDiscovery: disableAutoRepoDiscovery, }); @@ -904,7 +934,12 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { lines.push('', ...linkNotes); } - await showToast(ctx.client, 'Config synced. Restart opencode to apply.', 'info'); + await notify( + ctx.client, + '🔗', + `Linked to ${found.owner}/${found.name}. Restart opencode to apply.`, + 'success' + ); return lines.join('\n'); }), pull: () => @@ -924,10 +959,13 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { ); } + await notify(ctx.client, '📥', 'Pulling config from GitHub...', 'info'); + const update = await fetchAndFastForward(ctx.$, repoRoot, branch); if (!update.updated) { const tursoSummary = await runForegroundTursoCycle(config, 'pull-up-to-date'); ensureTursoSyncLoop(config); + await notify(ctx.client, '📥', 'Already up to date', 'success'); if (tursoSummary) { return ['Already up to date.', tursoSummary].join('\n'); } @@ -939,6 +977,20 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { await syncRepoToLocal(plan, overrides); await runSecretsPullIfConfigured(config); + if (config.includeSessions && !isTursoSessionBackend(config)) { + const sessionDbPath = path.join(locations.xdg.dataDir, 'opencode', 'opencode.db'); + const sessionsDir = path.join(repoRoot, 'data', 'sessions'); + const result = await syncSessions(sessionDbPath, sessionsDir); + log.info(`Session sync result: ${result.total} total, ${result.merged} merged`); + } + + if (config.includeProjects) { + syncGlobalData( + resolveProjectsFilePath(), + path.join(repoRoot, 'data', 'opencode.global.dat') + ); + } + await updateState(locations, { lastPull: new Date().toISOString(), lastRemoteUpdate: new Date().toISOString(), @@ -947,7 +999,7 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { const tursoSummary = await runForegroundTursoCycle(config, 'pull-updated'); ensureTursoSyncLoop(config); - await showToast(ctx.client, 'Config updated. Restart opencode to apply.', 'info'); + await notify(ctx.client, '📥', 'Config updated. Restart opencode to apply.', 'success'); if (tursoSummary) { return [ 'Remote config applied. Restart opencode to use new settings.', @@ -972,8 +1024,25 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { ); } + await notify(ctx.client, '📤', 'Pushing config to GitHub...', 'info'); + const overrides = await loadOverrides(locations); const plan = buildSyncPlan(config, locations, repoRoot); + + if (config.includeSessions && !isTursoSessionBackend(config)) { + const sessionDbPath = path.join(locations.xdg.dataDir, 'opencode', 'opencode.db'); + const sessionsDir = path.join(repoRoot, 'data', 'sessions'); + const result = await syncSessions(sessionDbPath, sessionsDir); + log.info(`Session sync result: ${result.total} total, ${result.merged} merged`); + } + + if (config.includeProjects) { + syncGlobalData( + resolveProjectsFilePath(), + path.join(repoRoot, 'data', 'opencode.global.dat') + ); + } + await syncLocalToRepo(plan, overrides, { overridesPath: locations.overridesPath, allowMcpSecrets: canCommitMcpSecrets(config), @@ -985,6 +1054,7 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { ensureTursoSyncLoop(config); try { const secretsResult = await runSecretsPushIfConfigured(config); + await notify(ctx.client, '📤', 'No local changes to push', 'info'); const lines: string[] = []; if (secretsResult === 'pushed') { lines.push('No local changes to push. Secrets updated.'); @@ -1007,7 +1077,8 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { } } - const message = await generateCommitMessage({ client: ctx.client, $: ctx.$ }, repoRoot); + const now = new Date(); + const message = `Sync opencode config (${now.toISOString().slice(0, 10)})`; await commitAll(ctx.$, repoRoot, message); await pushBranch(ctx.$, repoRoot, branch); @@ -1026,6 +1097,8 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { const tursoSummary = await runForegroundTursoCycle(config, 'push-updated'); ensureTursoSyncLoop(config); + await notify(ctx.client, '📤', `Pushed: ${message}`, 'success'); + if (secretsFailure) { const lines = [`Pushed changes: ${message}. Secrets push failed: ${secretsFailure}`]; if (tursoSummary) { @@ -1336,6 +1409,10 @@ function toRepoRelativePath(repoRoot: string, absolutePath: string): string { return path.relative(repoRoot, absolutePath).split(path.sep).join('/'); } +// EN: Startup sync — auto-commit dirty, HEAD-cache fetch skip, pull + session/project merge +// EN: github push is fire-and-forget (.catch) — doesn't block startup +// RU: Стартовая синхронизация — auto-commit при dirty, пропуск fetch по HEAD-кэшу, pull + merge сессий/проектов +// RU: push в GitHub — fire-and-forget (.catch) — не блокирует запуск async function runStartup( ctx: SyncServiceContext, locations: ReturnType, @@ -1355,34 +1432,86 @@ async function runStartup( const branch = await resolveBranch(ctx, config, repoRoot); log.debug('Resolved branch', { branch }); + const head = await getHeadHash(ctx.$, repoRoot); + const state = await loadState(locations); const dirty = await hasLocalChanges(ctx.$, repoRoot); if (dirty) { - log.warn('Uncommitted changes detected', { repoRoot }); - await showToast( - ctx.client, - `Uncommitted changes detected. Run /sync-resolve to auto-fix, or manually resolve in: ${repoRoot}`, - 'warning' - ); - return; + log.warn('Uncommitted changes detected, attempting auto-commit', { repoRoot }); + try { + const date = new Date().toISOString().slice(0, 10).replace(/-/g, '.'); + await commitAll(ctx.$, repoRoot, `Sync opencode config (${date})`); + const branch = await resolveBranch(ctx, config, repoRoot); + const commitHead = await getHeadHash(ctx.$, repoRoot); + // EN: Background push — don't block startup waiting for GitHub + // RU: Фоновый push — не ждём GitHub, не блокируем стартап + pushBranch(ctx.$, repoRoot, branch).catch((err) => + log.warn('Background push failed', { error: formatError(err) }) + ); + await updateState(locations, { + lastPush: new Date().toISOString(), + lastHead: commitHead ?? undefined, + }); + log.info('Auto-committed and pushed pending changes'); + await showToast(ctx.client, 'Pending changes committed and pushed', 'info'); + } catch (error) { + log.warn('Could not auto-commit pending changes', { error: formatError(error) }); + await showToast( + ctx.client, + `Uncommitted changes detected. Run /sync-resolve to auto-fix, or manually resolve in: ${repoRoot}`, + 'warning' + ); + return; + } } - const update = await fetchAndFastForward(ctx.$, repoRoot, branch); - if (update.updated) { - log.info('Pulled remote changes', { branch }); - const overrides = await loadOverrides(locations); - const plan = buildSyncPlan(config, locations, repoRoot); - await syncRepoToLocal(plan, overrides); - await options.runSecretsPullIfConfigured(config); - await updateState(locations, { - lastPull: new Date().toISOString(), - lastRemoteUpdate: new Date().toISOString(), - }); - await showToast(ctx.client, 'Config updated. Restart opencode to apply.', 'info'); - return; + const shouldFetch = !head || head !== state.lastHead || dirty; + if (shouldFetch) { + const update = await fetchAndFastForward(ctx.$, repoRoot, branch); + if (update.updated) { + log.info('Pulled remote changes', { branch }); + const overrides = await loadOverrides(locations); + const plan = buildSyncPlan(config, locations, repoRoot); + await syncRepoToLocal(plan, overrides); + await options.runSecretsPullIfConfigured(config); + + if (config.includeSessions && !isTursoSessionBackend(config)) { + const sessionDbPath = path.join(locations.xdg.dataDir, 'opencode', 'opencode.db'); + const sessionsDir = path.join(repoRoot, 'data', 'sessions'); + const result = await syncSessions(sessionDbPath, sessionsDir); + log.info(`Session sync result: ${result.total} total, ${result.merged} merged`); + } + + if (config.includeProjects) { + syncGlobalData( + resolveProjectsFilePath(), + path.join(repoRoot, 'data', 'opencode.global.dat') + ); + } + + await updateState(locations, { + lastPull: new Date().toISOString(), + lastRemoteUpdate: new Date().toISOString(), + lastHead: head ?? undefined, + }); + await showToast(ctx.client, 'Config updated. Restart opencode to apply.', 'info'); + return; + } } const overrides = await loadOverrides(locations); const plan = buildSyncPlan(config, locations, repoRoot); + + if (config.includeSessions && !isTursoSessionBackend(config)) { + const sessionDbPath = path.join(locations.xdg.dataDir, 'opencode', 'opencode.db'); + const sessionsDir = path.join(repoRoot, 'data', 'sessions'); + const result = await syncSessions(sessionDbPath, sessionsDir); + log.info(`Session sync result: ${result.total} total, ${result.merged} merged`); + } + + if (config.includeProjects) { + syncGlobalData(resolveProjectsFilePath(), path.join(repoRoot, 'data', 'opencode.global.dat')); + } + await syncLocalToRepo(plan, overrides, { overridesPath: locations.overridesPath, allowMcpSecrets: canCommitMcpSecrets(config), @@ -1393,12 +1522,15 @@ async function runStartup( return; } - const message = await generateCommitMessage({ client: ctx.client, $: ctx.$ }, repoRoot); + const now = new Date(); + const message = `Sync opencode config (${now.toISOString().slice(0, 10)})`; log.info('Pushing local changes', { message }); await commitAll(ctx.$, repoRoot, message); await pushBranch(ctx.$, repoRoot, branch); + const newHead = await getHeadHash(ctx.$, repoRoot); await updateState(locations, { lastPush: new Date().toISOString(), + lastHead: newHead ?? undefined, }); } @@ -1599,11 +1731,21 @@ async function analyzeAndDecideResolution( if (sessionId) { try { await ctx.client.session.delete({ path: { id: sessionId } }); - } catch {} + } catch { + // Session deletion is best-effort cleanup + } } } } catch (error) { - console.error('[ERROR] AI resolution analysis failed:', error); + ctx.client.app + .log({ + body: { + service: 'opencode-synced', + level: 'error', + message: `AI resolution analysis failed: ${formatError(error)}`, + }, + }) + .catch(() => {}); return { action: 'manual', reason: `Error analyzing changes: ${formatError(error)}` }; } } diff --git a/src/sync/session-db.ts b/src/sync/session-db.ts new file mode 100644 index 0000000..99c3d84 --- /dev/null +++ b/src/sync/session-db.ts @@ -0,0 +1,523 @@ +import fs from 'node:fs'; +import path from 'node:path'; +import { DatabaseSync } from 'node:sqlite'; + +export interface SessionMeta { + id: string; + project_id: string | null; + parent_id: string | null; + slug: string | null; + directory: string | null; + title: string | null; + version: number | null; + share_url: string | null; + summary_additions: number | null; + summary_deletions: number | null; + summary_files: number | null; + summary_diffs: string | null; + revert: string | null; + permission: string | null; + time_created: string | null; + time_updated: string | null; + time_compacting: string | null; + time_archived: string | null; + workspace_id: string | null; + path: string | null; + agent: string | null; + model: string | null; + cost: number | null; + tokens_input: number | null; + tokens_output: number | null; + tokens_reasoning: number | null; + tokens_cache_read: number | null; + tokens_cache_write: number | null; +} + +export interface Message { + id: string; + session_id: string; + time_created: string | null; + time_updated: string | null; + data: string | null; + parts: Part[]; +} + +export interface Part { + id: string; + message_id: string; + session_id: string; + time_created: string | null; + time_updated: string | null; + data: string | null; +} + +export interface SessionMessage { + id: string; + session_id: string; + type: string | null; + time_created: string | null; + time_updated: string | null; + data: string | null; +} + +export interface Todo { + session_id: string; + content: string | null; + status: string | null; + priority: number | null; + position: number | null; + time_created: string | null; + time_updated: string | null; +} + +export interface SessionShare { + session_id: string; + id: string; + secret: string | null; + url: string | null; + time_created: string | null; + time_updated: string | null; +} + +export interface Session { + session: SessionMeta; + messages: Message[]; + session_messages: SessionMessage[]; + todos: Todo[]; + session_shares: SessionShare[]; +} + +const SESSION_COLUMNS = [ + 'id', + 'project_id', + 'parent_id', + 'slug', + 'directory', + 'title', + 'version', + 'share_url', + 'summary_additions', + 'summary_deletions', + 'summary_files', + 'summary_diffs', + 'revert', + 'permission', + 'time_created', + 'time_updated', + 'time_compacting', + 'time_archived', + 'workspace_id', + 'path', + 'agent', + 'model', + 'cost', + 'tokens_input', + 'tokens_output', + 'tokens_reasoning', + 'tokens_cache_read', + 'tokens_cache_write', +]; + +const MESSAGE_COLUMNS = ['id', 'session_id', 'time_created', 'time_updated', 'data']; +const PART_COLUMNS = ['id', 'message_id', 'session_id', 'time_created', 'time_updated', 'data']; +const SESSION_MESSAGE_COLUMNS = [ + 'id', + 'session_id', + 'type', + 'time_created', + 'time_updated', + 'data', +]; +const TODO_COLUMNS = [ + 'session_id', + 'content', + 'status', + 'priority', + 'position', + 'time_created', + 'time_updated', +]; +const SHARE_COLUMNS = ['session_id', 'id', 'secret', 'url', 'time_created', 'time_updated']; + +function openDB(dbPath: string): DatabaseSync { + return new DatabaseSync(dbPath); +} + +// EN: Database handle variants — used by syncSessions to open DB once +// RU: Варианты с проброшенным handle — syncSessions открывает БД один раз +// EN: Force WAL checkpoint so subsequent reads see all committed writes +// RU: Принудительный WAL checkpoint, чтобы последующие чтения видели все записанные данные +export function checkpointDB(db: DatabaseSync): void { + try { + db.exec('PRAGMA wal_checkpoint'); + } catch { + // checkpoint may fail if another connection has a write lock — ignore + } +} + +function readSessionMeta(db: DatabaseSync, id: string): SessionMeta | null { + const row = db + .prepare(`SELECT ${SESSION_COLUMNS.join(', ')} FROM session WHERE id = ?`) + .get(id) as Record | undefined; + if (!row) return null; + return row as unknown as SessionMeta; +} + +function readMessages(db: DatabaseSync, sessionId: string): Message[] { + const rows = db + .prepare(`SELECT ${MESSAGE_COLUMNS.join(', ')} FROM message WHERE session_id = ?`) + .all(sessionId) as Record[]; + return rows.map((row) => { + const msg = row as unknown as Message; + const partRows = db + .prepare(`SELECT ${PART_COLUMNS.join(', ')} FROM part WHERE message_id = ?`) + .all(msg.id) as Record[]; + msg.parts = partRows.map((pr) => pr as unknown as Part); + return msg; + }); +} + +function readSessionMessages(db: DatabaseSync, sessionId: string): SessionMessage[] { + const rows = db + .prepare( + `SELECT ${SESSION_MESSAGE_COLUMNS.join(', ')} FROM session_message WHERE session_id = ?` + ) + .all(sessionId) as Record[]; + return rows.map((r) => r as unknown as SessionMessage); +} + +function readTodos(db: DatabaseSync, sessionId: string): Todo[] { + const rows = db + .prepare(`SELECT ${TODO_COLUMNS.join(', ')} FROM todo WHERE session_id = ?`) + .all(sessionId) as Record[]; + return rows.map((r) => r as unknown as Todo); +} + +function readShares(db: DatabaseSync, sessionId: string): SessionShare[] { + const rows = db + .prepare(`SELECT ${SHARE_COLUMNS.join(', ')} FROM session_share WHERE session_id = ?`) + .all(sessionId) as Record[]; + return rows.map((r) => r as unknown as SessionShare); +} + +// EN: List session IDs using an already-open DB handle (avoids open/close overhead) +// RU: Список ID сессий через уже открытый DB handle (без лишних open/close) +export function listSessionIdsFromHandle(db: DatabaseSync): string[] { + const rows = db.prepare('SELECT id FROM session').all() as { id: string }[]; + return rows.map((r) => r.id); +} + +export function listSessionIdsFromDB(dbPath: string): string[] { + if (!fs.existsSync(dbPath)) return []; + const db = openDB(dbPath); + try { + return listSessionIdsFromHandle(db); + } finally { + db.close(); + } +} + +export function listSessionIdsFromDir(dir: string): string[] { + if (!fs.existsSync(dir)) return []; + return fs + .readdirSync(dir) + .filter((f) => f.endsWith('.json')) + .map((f) => f.replace(/\.json$/, '')); +} + +// EN: Read session + all relations (messages, parts, todos, shares) via open handle +// RU: Чтение сессии + всех связей (сообщения, части, todo, шары) через открытый handle +export function readSessionFromHandle(db: DatabaseSync, id: string): Session | null { + const meta = readSessionMeta(db, id); + if (!meta) return null; + return { + session: meta, + messages: readMessages(db, id), + session_messages: readSessionMessages(db, id), + todos: readTodos(db, id), + session_shares: readShares(db, id), + }; +} + +export function readSessionFromDB(dbPath: string, id: string): Session | null { + if (!fs.existsSync(dbPath)) return null; + const db = openDB(dbPath); + try { + return readSessionFromHandle(db, id); + } finally { + db.close(); + } +} + +export function readSessionFromFile(dir: string, id: string): Session | null { + const filePath = path.join(dir, `${id}.json`); + if (!fs.existsSync(filePath)) return null; + try { + const content = fs.readFileSync(filePath, 'utf-8'); + const parsed = JSON.parse(content); + + if (Array.isArray(parsed.session) && parsed.columns?.session) { + const session = arrayToObject(parsed.session, parsed.columns.session); + const messages = (parsed.message ?? []).map((row: unknown[]) => { + const msg = arrayToObject(row, parsed.columns.message); + (msg as Record).parts = (parsed.parts ?? []) + .filter((p: unknown[]) => p[1] === msg.id) + .map((p: unknown[]) => arrayToObject(p, parsed.columns.parts)); + return msg; + }); + const session_messages = (parsed.session_messages ?? []).map((row: unknown[]) => + arrayToObject(row, parsed.columns.session_message) + ); + return { + session: session as unknown as SessionMeta, + messages, + session_messages, + todos: [], + session_shares: [], + } as Session; + } + + return { + session: parsed.session ?? parsed, + messages: parsed.messages ?? [], + session_messages: parsed.session_messages ?? [], + todos: parsed.todos ?? [], + session_shares: parsed.session_shares ?? [], + } as Session; + } catch { + // EN: Backup corrupted file before returning null (data recovery safety net) + // RU: Бэкап повреждённого файла перед возвратом null (страховка от потери данных) + try { + const brokenPath = `${filePath}.broken`; + if (!fs.existsSync(brokenPath)) { + fs.copyFileSync(filePath, brokenPath); + } + } catch { + // can't backup either — ignore + } + return null; + } +} + +export function readSessionsFromDB(dbPath: string): Session[] { + if (!fs.existsSync(dbPath)) return []; + const db = openDB(dbPath); + try { + const ids = db.prepare('SELECT id FROM session').all() as { id: string }[]; + return ids.map(({ id }) => ({ + session: readSessionMeta(db, id) as SessionMeta, + messages: readMessages(db, id), + session_messages: readSessionMessages(db, id), + todos: readTodos(db, id), + session_shares: readShares(db, id), + })); + } finally { + db.close(); + } +} + +function arrayToObject(arr: unknown[], colNames: string[]): Record { + const obj: Record = {}; + for (let i = 0; i < colNames.length; i++) { + obj[colNames[i]] = arr[i] ?? null; + } + return obj; +} + +export function readSessionsFromDir(dir: string): Session[] { + if (!fs.existsSync(dir)) return []; + const files = fs.readdirSync(dir).filter((f) => f.endsWith('.json')); + return files.map((file) => { + const content = fs.readFileSync(path.join(dir, file), 'utf-8'); + const parsed = JSON.parse(content); + + if (Array.isArray(parsed.session) && parsed.columns?.session) { + const session = arrayToObject(parsed.session, parsed.columns.session); + const messages = (parsed.message ?? []).map((row: unknown[]) => { + const msg = arrayToObject(row, parsed.columns.message); + (msg as Record).parts = (parsed.parts ?? []) + .filter((p: unknown[]) => p[1] === msg.id) + .map((p: unknown[]) => arrayToObject(p, parsed.columns.parts)); + return msg; + }); + const session_messages = (parsed.session_messages ?? []).map((row: unknown[]) => + arrayToObject(row, parsed.columns.session_message) + ); + return { + session: session as unknown as SessionMeta, + messages, + session_messages, + todos: [], + session_shares: [], + } as Session; + } + + return { + session: parsed.session ?? parsed, + messages: parsed.messages ?? [], + session_messages: parsed.session_messages ?? [], + todos: parsed.todos ?? [], + session_shares: parsed.session_shares ?? [], + } as Session; + }); +} + +function asSQLValue(val: unknown): string | number | null { + if (val === null || val === undefined) return null; + if (typeof val === 'string') return val; + if (typeof val === 'number') return val; + if (typeof val === 'boolean') return val ? 1 : 0; + return JSON.stringify(val); +} + +function upsertSession(db: DatabaseSync, s: SessionMeta): void { + const values = SESSION_COLUMNS.map((col) => + asSQLValue((s as unknown as Record)[col]) + ); + try { + db.prepare( + `INSERT OR REPLACE INTO session (${SESSION_COLUMNS.join(', ')}) VALUES (${SESSION_COLUMNS.map(() => '?').join(',')})` + ).run(...values); + } catch (e) { + throw new Error(`upsertSession failed for session ${s.id}: ${e}`); + } +} + +function upsertMessages(db: DatabaseSync, messages: Message[]): void { + const placeholders = MESSAGE_COLUMNS.map(() => '?').join(', '); + const stmt = db.prepare( + `INSERT OR REPLACE INTO message (${MESSAGE_COLUMNS.join(', ')}) VALUES (${placeholders})` + ); + for (const msg of messages) { + try { + stmt.run(v(msg.id), v(msg.session_id), v(msg.time_created), v(msg.time_updated), v(msg.data)); + } catch (e) { + throw new Error(`upsertMessages failed for msg ${msg.id}: ${e}`); + } + } +} + +function upsertParts(db: DatabaseSync, parts: Part[]): void { + const placeholders = PART_COLUMNS.map(() => '?').join(', '); + const stmt = db.prepare( + `INSERT OR REPLACE INTO part (${PART_COLUMNS.join(', ')}) VALUES (${placeholders})` + ); + for (const part of parts) { + try { + stmt.run( + v(part.id), + v(part.message_id), + v(part.session_id), + v(part.time_created), + v(part.time_updated), + v(part.data) + ); + } catch (e) { + throw new Error(`upsertParts failed for part ${part.id}: ${e}`); + } + } +} + +function v(val: unknown): string | number | null { + return asSQLValue(val); +} + +function upsertSessionMessages(db: DatabaseSync, items: SessionMessage[]): void { + const placeholders = SESSION_MESSAGE_COLUMNS.map(() => '?').join(', '); + const stmt = db.prepare( + `INSERT OR REPLACE INTO session_message (${SESSION_MESSAGE_COLUMNS.join(', ')}) VALUES (${placeholders})` + ); + for (const sm of items) { + stmt.run( + v(sm.id), + v(sm.session_id), + v(sm.type), + v(sm.time_created), + v(sm.time_updated), + v(sm.data) + ); + } +} + +function upsertTodos(db: DatabaseSync, items: Todo[]): void { + const placeholders = TODO_COLUMNS.map(() => '?').join(', '); + const stmt = db.prepare( + `INSERT OR REPLACE INTO todo (${TODO_COLUMNS.join(', ')}) VALUES (${placeholders})` + ); + for (const todo of items) { + stmt.run( + v(todo.session_id), + v(todo.content), + v(todo.status), + v(todo.priority), + v(todo.position), + v(todo.time_created), + v(todo.time_updated) + ); + } +} + +function upsertShares(db: DatabaseSync, items: SessionShare[]): void { + const placeholders = SHARE_COLUMNS.map(() => '?').join(', '); + const stmt = db.prepare( + `INSERT OR REPLACE INTO session_share (${SHARE_COLUMNS.join(', ')}) VALUES (${placeholders})` + ); + for (const share of items) { + stmt.run( + v(share.session_id), + v(share.id), + v(share.secret), + v(share.url), + v(share.time_created), + v(share.time_updated) + ); + } +} + +// EN: Batch-write multiple sessions in one transaction (avoids per-session DB open/close) +// RU: Пакетная запись нескольких сессий одной транзакцией (без per-session open/close БД) +export function writeSessionsToHandle(db: DatabaseSync, sessions: Session[]): void { + if (sessions.length === 0) return; + db.exec('BEGIN TRANSACTION'); + try { + for (const s of sessions) { + upsertSession(db, s.session); + upsertMessages(db, s.messages); + for (const msg of s.messages) { + upsertParts(db, msg.parts); + } + upsertSessionMessages(db, s.session_messages); + upsertTodos(db, s.todos); + upsertShares(db, s.session_shares); + } + db.exec('COMMIT'); + } catch (error) { + db.exec('ROLLBACK'); + throw error; + } +} + +export function writeSessionsToDB(dbPath: string, sessions: Session[]): void { + if (sessions.length === 0) return; + const db = openDB(dbPath); + try { + writeSessionsToHandle(db, sessions); + } finally { + db.close(); + } +} + +// EN: Write single session to JSON file (compact format, no pretty-print — saves ~30% disk) +// RU: Запись одной сессии в JSON-файл (compact, без pretty-print — экономия ~30% места) +export function writeSessionToFile(dir: string, session: Session): void { + fs.mkdirSync(dir, { recursive: true }); + const filePath = path.join(dir, `${session.session.id}.json`); + fs.writeFileSync(filePath, JSON.stringify(session), 'utf-8'); +} + +export function writeSessionsToDir(dir: string, sessions: Session[]): void { + fs.mkdirSync(dir, { recursive: true }); + for (const s of sessions) { + writeSessionToFile(dir, s); + } +} diff --git a/src/sync/session-merge.ts b/src/sync/session-merge.ts new file mode 100644 index 0000000..1f39375 --- /dev/null +++ b/src/sync/session-merge.ts @@ -0,0 +1,153 @@ +import fs from 'node:fs'; +import { DatabaseSync } from 'node:sqlite'; +import type { Message, Part, Session, Todo } from './session-db.js'; +import { + checkpointDB, + listSessionIdsFromDir, + listSessionIdsFromHandle, + readSessionFromFile, + readSessionFromHandle, + writeSessionsToHandle, + writeSessionToFile, +} from './session-db.js'; + +export interface SyncSessionResult { + total: number; + merged: number; + conflicts: number; +} + +// EN: Pick the newer record by time_updated (null treated as oldest) +// RU: Выбор более новой записи по time_updated (null считается старым) +function pickNewer(a: T, b: T): T { + if (!a.time_updated) return b; + if (!b.time_updated) return a; + return a.time_updated >= b.time_updated ? a : b; +} + +function unionMessages(local: Message[], remote: Message[]): Message[] { + const map = new Map(); + for (const msg of local) map.set(msg.id, msg); + for (const msg of remote) { + const existing = map.get(msg.id); + if (!existing) { + map.set(msg.id, msg); + } else { + const winner = pickNewer(existing, msg); + winner.parts = unionParts(existing.parts, msg.parts); + map.set(msg.id, winner); + } + } + return [...map.values()]; +} + +function unionParts(local: Part[], remote: Part[]): Part[] { + const map = new Map(); + for (const part of local) map.set(part.id, part); + for (const part of remote) { + const existing = map.get(part.id); + map.set(part.id, existing ? pickNewer(existing, part) : part); + } + return [...map.values()]; +} + +function unionById( + local: T[], + remote: T[] +): T[] { + const map = new Map(); + for (const item of local) map.set(item.id, item); + for (const item of remote) { + const existing = map.get(item.id); + map.set(item.id, existing ? pickNewer(existing, item) : item); + } + return [...map.values()]; +} + +function unionTodos(local: Todo[], remote: Todo[]): Todo[] { + const map = new Map(); + for (const item of local) { + const key = `${item.session_id}:${item.content ?? ''}`; + map.set(key, item); + } + for (const item of remote) { + const key = `${item.session_id}:${item.content ?? ''}`; + const existing = map.get(key); + map.set(key, existing ? pickNewer(existing, item) : item); + } + return [...map.values()]; +} + +// EN: Union-merge two session versions — picks newer session metadata + unions all relations by id +// RU: Union-merge двух версий сессии — берёт новую мету, объединяет все связи по id +function merge(local: Session, remote: Session): Session { + const localT = local.session.time_updated ?? ''; + const remoteT = remote.session.time_updated ?? ''; + const mergedSession = localT >= remoteT ? local.session : remote.session; + + return { + session: mergedSession, + messages: unionMessages(local.messages, remote.messages), + session_messages: unionById(local.session_messages, remote.session_messages), + todos: unionTodos(local.todos, remote.todos), + session_shares: unionById(local.session_shares, remote.session_shares), + }; +} + +// EN: Stream-based session sync — opens DB once, processes sessions one-by-one, batch-writes at end +// EN: Single DB connection eliminates per-session open/close overhead (was N+2 open/close per cycle) +// RU: Потоковая синхронизация — одно открытие БД, обработка сессий по одной, batch-запись в конце +// RU: Одно соединение с БД устраняет per-session накладные расходы (было N+2 open/close за цикл) +export async function syncSessions( + dbPath: string, + sessionsDir: string +): Promise { + const remoteIds = listSessionIdsFromDir(sessionsDir); + + const dbExists = fs.existsSync(dbPath); + const db = dbExists ? new DatabaseSync(dbPath) : null; + if (db) { + checkpointDB(db); + } + + try { + const localIds = db ? listSessionIdsFromHandle(db) : []; + const allIds = new Set([...localIds, ...remoteIds]); + let totalMerged = 0; + const hasRemote = remoteIds.length > 0; + const toWrite: Session[] = []; + + for (const id of allIds) { + const local = db ? readSessionFromHandle(db, id) : null; + const remote = readSessionFromFile(sessionsDir, id); + + let merged: Session; + + if (!local) { + merged = structuredClone(remote as Session); + } else if (!remote) { + merged = structuredClone(local); + } else { + totalMerged++; + merged = merge(local, remote); + } + + if (hasRemote && local) { + toWrite.push(merged); + } + writeSessionToFile(sessionsDir, merged); + } + + if (db && toWrite.length > 0) { + writeSessionsToHandle(db, toWrite); + } + + return { + total: allIds.size, + merged: totalMerged, + conflicts: 0, + }; + } finally { + db?.close(); + } +} diff --git a/src/sync/shell.ts b/src/sync/shell.ts new file mode 100644 index 0000000..cbfff22 --- /dev/null +++ b/src/sync/shell.ts @@ -0,0 +1,103 @@ +import { exec } from 'node:child_process'; + +export interface ShellProcessPromise + extends Promise<{ exitCode: number; stdout: string; stderr: string }> { + quiet(): this; + text(): Promise; +} + +export type ShellFn = (strings: TemplateStringsArray, ...values: unknown[]) => ShellProcessPromise; + +// EN: Build shell command string — wraps all interpolated values in double quotes +// EN: Prevents path/commit-message breakage on Windows (spaces in paths, etc.) +// RU: Сборка команды — все интерполированные значения оборачиваются в двойные кавычки +// RU: Предотвращает поломку путей/commit message на Windows (пробелы в путях, и т.д.) +function buildCommand(strings: TemplateStringsArray, ...values: unknown[]): string { + let cmd = ''; + for (let i = 0; i < strings.length; i++) { + cmd += strings[i]; + if (i < values.length) { + const v = values[i]; + const str = typeof v === 'string' ? v : String(v); + cmd += `"${str.replace(/"/g, '\\"')}"`; + } + } + return cmd.trim(); +} + +export function createNodeShell(): ShellFn { + return (strings: TemplateStringsArray, ...values: unknown[]): ShellProcessPromise => { + const command = buildCommand(strings, ...values); + let isQuiet = false; + + const run = (): Promise<{ exitCode: number; stdout: string; stderr: string }> => { + return new Promise((resolve) => { + const child = exec( + command, + { encoding: 'utf-8', maxBuffer: 10 * 1024 * 1024 }, + (error, stdout, stderr) => { + if (error) { + resolve({ + exitCode: error.code ?? 1, + stdout: stdout ?? '', + stderr: stderr ?? '', + }); + } else { + resolve({ exitCode: 0, stdout: stdout ?? '', stderr: '' }); + } + } + ); + if (!isQuiet) { + child.stdout?.pipe(process.stdout); + child.stderr?.pipe(process.stderr); + } + }); + }; + + let promise: Promise<{ exitCode: number; stdout: string; stderr: string }> | null = null; + const getPromise = (): Promise<{ exitCode: number; stdout: string; stderr: string }> => { + if (!promise) { + promise = run(); + } + return promise; + }; + + const shellPromise = { + // biome-ignore lint/suspicious/noThenProperty: thenable pattern for async shell execution + then( + onfulfilled?: + | ((value: { + exitCode: number; + stdout: string; + stderr: string; + }) => TResult1 | PromiseLike) + | null, + onrejected?: ((reason: unknown) => TResult2 | PromiseLike) | null + ): Promise { + return getPromise().then(onfulfilled, onrejected); + }, + catch( + onrejected?: ((reason: unknown) => TResult | PromiseLike) | null + ): Promise<{ exitCode: number; stdout: string; stderr: string } | TResult> { + return getPromise().catch(onrejected); + }, + finally( + onfinally?: (() => void) | null + ): Promise<{ exitCode: number; stdout: string; stderr: string }> { + return getPromise().finally(onfinally); + }, + get [Symbol.toStringTag]() { + return 'ShellProcessPromise'; + }, + quiet(): ShellProcessPromise { + isQuiet = true; + return this as unknown as ShellProcessPromise; + }, + text(): Promise { + return getPromise().then((r) => r.stdout); + }, + } as ShellProcessPromise; + + return shellPromise; + }; +}