From 2d9d2b8ec08ee09cc64c5d925ab85716b1d7d1fb Mon Sep 17 00:00:00 2001 From: Eugene Blikh Date: Mon, 4 May 2026 09:19:16 +0300 Subject: [PATCH] search: add /api/v1/search API and opencode collector parser MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add FTS5 search repository with BM25 ranking, cursor pagination, and owner scoping - Expose GET /api/v1/search with query, tool, host, since/until filters - Add opencode collector parser reading from SQLite opencode.db - Spike document recording canonical opencode storage format Invariants: no schema migrations, no wire type changes, read-only search, marker snippets (not HTML), invalid queries → 400 INVALID, parser interface unchanged, collector state schema preserved, stats/react stubs untouched. Reviewed: offset fix (message.rowid instead of time_created for progress marker), all 22 test packages pass. --- cmd/lethe-collector/main.go | 2 + cmd/lethe-collector/main_test.go | 13 + cmd/lethe/main.go | 7 +- cmd/lethe/main_e2e_test.go | 3 + docs/spikes/opencode-format.md | 196 +++++++++ docs/tasks/lethe-search-and-opencode.md | 71 ++- internal/collector/parser/opencode/parser.go | 409 ++++++++++++++++++ .../collector/parser/opencode/parser_test.go | 334 ++++++++++++++ .../parser/opencode/testdata/schema.sql | 31 ++ internal/domain/search/handler.go | 154 +++++++ internal/domain/search/handler_test.go | 283 ++++++++++++ internal/domain/search/repository.go | 274 ++++++++++++ internal/domain/search/repository_test.go | 262 +++++++++++ internal/server/server.go | 13 +- 14 files changed, 2043 insertions(+), 9 deletions(-) create mode 100644 docs/spikes/opencode-format.md create mode 100644 internal/collector/parser/opencode/parser.go create mode 100644 internal/collector/parser/opencode/parser_test.go create mode 100644 internal/collector/parser/opencode/testdata/schema.sql create mode 100644 internal/domain/search/handler.go create mode 100644 
internal/domain/search/handler_test.go create mode 100644 internal/domain/search/repository.go create mode 100644 internal/domain/search/repository_test.go diff --git a/cmd/lethe-collector/main.go b/cmd/lethe-collector/main.go index a814601a2c3cbb5dd3494563a7117a75b56cd371..9b5c9ff72eeaf451b42f961cc4ee95ea4b86b0e1 100644 --- a/cmd/lethe-collector/main.go +++ b/cmd/lethe-collector/main.go @@ -18,6 +18,7 @@ import ( "sourcecraft.dev/bigbes/lethe/internal/collector/ingest" "sourcecraft.dev/bigbes/lethe/internal/collector/parser" "sourcecraft.dev/bigbes/lethe/internal/collector/parser/claudecode" + "sourcecraft.dev/bigbes/lethe/internal/collector/parser/opencode" "sourcecraft.dev/bigbes/lethe/internal/collector/state" ) @@ -218,6 +219,7 @@ func newStatusCmd(configPath *string) *cobra.Command { func buildParsers(host string) map[string]parser.Parser { return map[string]parser.Parser{ "claude-code": claudecode.New(host), + "opencode": opencode.New(host), } } diff --git a/cmd/lethe-collector/main_test.go b/cmd/lethe-collector/main_test.go index f09094c81fbe924f8ef67673ce4686f7db251367..338aeb091bb991316da1985efe163b9be194f628 100644 --- a/cmd/lethe-collector/main_test.go +++ b/cmd/lethe-collector/main_test.go @@ -45,3 +45,16 @@ func TestBuildLogger_LevelEnforced(t *testing.T) { t.Fatal("error-level logger should enable error") } } + +func TestBuildParsers_RegistersSupportedTools(t *testing.T) { + parsers := buildParsers("laptop") + for _, tool := range []string{"claude-code", "opencode"} { + p, ok := parsers[tool] + if !ok { + t.Fatalf("parser %q is not registered", tool) + } + if got := p.Tool(); got != tool { + t.Fatalf("parser %q Tool() = %q", tool, got) + } + } +} diff --git a/cmd/lethe/main.go b/cmd/lethe/main.go index 424d041b55f357fdba29131b5b768a9ce678ac7d..07b4b7ba4a80756ed057c9d72998002bfa51df04 100644 --- a/cmd/lethe/main.go +++ b/cmd/lethe/main.go @@ -27,6 +27,7 @@ import ( "sourcecraft.dev/bigbes/lethe/internal/domain/ingest" 
"sourcecraft.dev/bigbes/lethe/internal/domain/project" "sourcecraft.dev/bigbes/lethe/internal/domain/savedsearch" + "sourcecraft.dev/bigbes/lethe/internal/domain/search" "sourcecraft.dev/bigbes/lethe/internal/domain/session" "sourcecraft.dev/bigbes/lethe/internal/domain/stats" "sourcecraft.dev/bigbes/lethe/internal/platform/database" @@ -105,6 +106,8 @@ func run() int { statsHnd = &stats.Handler{} savedSearchRepo = &savedsearch.Repository{} savedSearchHnd = &savedsearch.Handler{} + searchRepo = &search.Repository{} + searchHnd = &search.Handler{} serverSvc = &server.Server{} ) @@ -112,7 +115,7 @@ func run() int { loggerSvc, metricsSvc, dbSvc, dbCheckSvc, healthSetSvc, authSvc, ingestRepo, ingestSvc, ingestHnd, sessionRepo, sessionHnd, projectRepo, projectHnd, statsRepo, statsHnd, - savedSearchRepo, savedSearchHnd, serverSvc, + savedSearchRepo, savedSearchHnd, searchRepo, searchHnd, serverSvc, } mgr.AddComponent(ctx, @@ -134,6 +137,8 @@ func run() int { steward.MustServiceAsset(statsHnd), steward.MustServiceAsset(savedSearchRepo), steward.MustServiceAsset(savedSearchHnd), + steward.MustServiceAsset(searchRepo), + steward.MustServiceAsset(searchHnd), steward.MustServiceAsset(serverSvc, steward.Root()), ) diff --git a/cmd/lethe/main_e2e_test.go b/cmd/lethe/main_e2e_test.go index c207decb5aa0e66bda6329d465f4a5fbb0ba4f58..3d03b8d3f277596c81700cc34d7c01a264dd0d83 100644 --- a/cmd/lethe/main_e2e_test.go +++ b/cmd/lethe/main_e2e_test.go @@ -23,6 +23,7 @@ import ( "sourcecraft.dev/bigbes/lethe/internal/domain/ingest" "sourcecraft.dev/bigbes/lethe/internal/domain/project" "sourcecraft.dev/bigbes/lethe/internal/domain/savedsearch" + "sourcecraft.dev/bigbes/lethe/internal/domain/search" "sourcecraft.dev/bigbes/lethe/internal/domain/session" "sourcecraft.dev/bigbes/lethe/internal/domain/stats" "sourcecraft.dev/bigbes/lethe/internal/platform/database" @@ -89,6 +90,8 @@ func TestEndToEnd_MultiUserIsolation(t *testing.T) { steward.MustServiceAsset(&stats.Handler{}), 
steward.MustServiceAsset(&savedsearch.Repository{}), steward.MustServiceAsset(&savedsearch.Handler{}), + steward.MustServiceAsset(&search.Repository{}), + steward.MustServiceAsset(&search.Handler{}), steward.MustServiceAsset(srv, steward.Root()), ) diff --git a/docs/spikes/opencode-format.md b/docs/spikes/opencode-format.md new file mode 100644 index 0000000000000000000000000000000000000000..17b4497c77a3956f5cc38e466c82268a84072c8b --- /dev/null +++ b/docs/spikes/opencode-format.md @@ -0,0 +1,196 @@ +# opencode Storage Format Spike + +**Date:** 2026-05-04 +**Scope:** Determine canonical source, schema, progress marker, and parser risks for lethe collector. +**Respects:** IV10, PC3, AS3, AS4, UK1. + +--- + +## 1. Canonical Source Choice + +**Recommendation: SQLite database (`~/.local/share/opencode/opencode.db`)** + +Rationale: +- The DB is the source of truth. JSON files in `storage/` are hydrated projections of DB rows with extra derived fields (e.g. `messageID` in part JSON). +- DB counts: **725 sessions**, **26,611 messages**, **107,889 parts**. +- JSON files are fewer: 16,926 message JSONs vs 26,611 DB rows, indicating some messages exist only in the DB. +- The DB is actively WAL-journaled (`opencode.db-wal` ~2MB, `-shm` 32KB), so reads must use the live DB path, not a copy. + +Alternative rejected: +- `storage/{message,part,session,project}/` JSON tree — incomplete, redundant, and 2GB total vs 1.3GB DB. +- `event` table — append-only event log with `event_sequence` aggregate IDs. Useful for replay but overkill for collector polling. +- `~/.cache/opencode/` — models.json, node_modules; no transcript data. +- `~/.config/opencode/` — auth, plugins, skills; no transcript data. +- `~/.local/share/opencode/tool-output/` — binary blobs referenced by `callID`, not structured text. + +--- + +## 2. 
Schema Overview + +### 2.1 Tables (relevant to collector) + +```sql +CREATE TABLE session ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL, + parent_id TEXT, + slug TEXT NOT NULL, + directory TEXT NOT NULL, + title TEXT NOT NULL, + version TEXT NOT NULL, + time_created INTEGER NOT NULL, + time_updated INTEGER NOT NULL +); + +CREATE TABLE message ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + time_created INTEGER NOT NULL, + time_updated INTEGER NOT NULL, + data TEXT NOT NULL -- JSON +); + +CREATE TABLE part ( + id TEXT PRIMARY KEY, + message_id TEXT NOT NULL, + session_id TEXT NOT NULL, + time_created INTEGER NOT NULL, + time_updated INTEGER NOT NULL, + data TEXT NOT NULL -- JSON +); +``` + +Indexes: +- `part_session_idx` ON `part` (`session_id`) +- `session_project_idx` ON `session` (`project_id`) + +### 2.2 Message `data` JSON shape + +Top-level keys (observed): +- `role`: `"user" | "assistant"` +- `agent`: `"build" | "implementer-deep" | "implementer-smart" | "oracle" | ...` +- `mode`: same as agent, the execution mode +- `parentID`: previous message ID (for threading) +- `path`: `{ cwd, root }` — absolute filesystem paths (PII risk) +- `modelID`, `providerID`: model metadata +- `tokens`: `{ input, output, reasoning, cache: { read, write } }` +- `cost`: float +- `time`: `{ created, completed? }` — epoch millis +- `finish`: `"stop" | "tool-calls" | ...` + +### 2.3 Part `data` JSON shape (by type) + +| Type | Keys | Content | +|------|------|---------| +| `text` | `type`, `text`, `time`, `metadata?` | Raw prose | +| `reasoning` | `type`, `text`, `time` | Model reasoning block | +| `tool` | `type`, `tool`, `callID`, `state` | Tool invocation + output | +| `step-start` | `type` | Turn boundary marker | +| `step-finish` | `type`, `reason`, `tokens`, `cost`, `snapshot` | Turn summary | +| `patch` | `type`, `patch`, `path` | Code diff | +| `file` | `type`, `path`, `content?` | File reference | +| `compaction` | ... 
| Summarized history | + +### 2.4 Tool output storage + +Tool outputs are **not** stored in `part.data` JSON. Instead: +- `part.data.state.output` contains a short summary/truncation marker for some tools. +- Large tool outputs are written to `~/.local/share/opencode/tool-output/tool_<callID>` as opaque files. +- 20 files observed, ranging from 0B to 6MB. +- The `callID` in the part JSON links to the filename. + +--- + +## 3. Session / Message / Part Flow + +A single user → assistant turn is not one row. It is a multi-part sequence: + +``` +message (user, role=user) + └── part (type=text) — user prompt + +message (assistant, role=assistant) + └── part (type=step-start) — turn begins + └── part (type=reasoning) — model thinking + └── part (type=tool) — tool call(s) + └── part (type=step-finish) — turn ends +``` + +For search indexing, a "turn" is best defined as: +- **User turn**: one `message` with `role=user` + its `text` part(s). +- **Assistant turn**: one `message` with `role=assistant` + all associated `text`, `reasoning`, and `tool` parts between `step-start` and `step-finish`. + +--- + +## 4. Progress Marker Choice + +**Recommendation: `message.rowid` as the progress offset.** + +Rationale: +- AS4 assumes an integer offset. SQLite `rowid` is an integer marker and fits the existing collector state schema without a new format. +- The collector persists progress from accepted event `Seq` values. `TurnEvent.Seq` stores the current `message.rowid`, while `ingestion_state.last_offset` stores the next rowid to scan. This keeps partial acceptance resumable: a saved next event rowid is included on the next poll, and a fully accepted scan saves `max(rowid)+1`. +- `message.rowid` is unique and monotonic for rows in the source DB, so equal-millisecond messages do not skip unaccepted rows. +- `message.time_created` is still the event timestamp, but it is not a safe offset: multiple messages can share the same millisecond. 
+- Messages and parts both have `time_created`, but messages are the turn boundaries; parts inherit the message's session and can be queried via `message_id`. +- Alternative: `message.id` lexicographic — IDs are sortable ULIDs (e.g. `msg_df14a9a20001MitrpJBdwZHMN4`) but integer offset is simpler for the existing state schema. +- Alternative: `event.seq` — requires joining through `event_sequence`, adds complexity. + +Query pattern for incremental collection: +```sql +SELECT * FROM message +WHERE rowid >= ? +ORDER BY rowid ASC; +``` + +--- + +## 5. Fixture Anonymization Notes + +Fields that must be redacted in test fixtures and logs: +- `path.cwd`, `path.root` — absolute local paths. +- `modelID` / `providerID` — may contain API key fragments or account info. +- Tool output file contents (`tool-output/tool_<callID>`) — may contain secrets, env vars, tokens. +- `auth.json` — contains access tokens, refresh tokens; never read in parser. +- Session `title` — may contain internal project names or ticket IDs. + +Safe to retain: +- ID prefixes and structure (`ses_`, `msg_`, `prt_`, `tool_`). +- `role`, `agent`, `mode`, `type` enums. +- `time_created` relative offsets. +- `slug` — random adjective-noun pairs, no PII. +- Token counts and costs (aggregated, not content). + +--- + +## 6. 
Parser Risks + +| Risk | Impact | Mitigation | +|------|--------|------------| +| **Schema drift** | opencode may add new `part` types or message keys | Defensive JSON unmarshaling; ignore unknown keys | +| **WAL lock contention** | Collector reads while opencode writes | Use `mode=ro` URI or copy with `sqlite3` CLI if needed | +| **Large tool outputs** | 6MB+ files could bloat index | Only index tool output metadata/summary; skip `tool-output/` files unless `include_tool_outputs=1` | +| **Part ordering ambiguity** | `time_created` may collide for parts in same message | Use `id` lexicographic as secondary sort | +| **Eventual consistency** | Event sourcing means DB state may lag events | Poll `message` table directly; events are for replay only | +| **Nested JSON depth** | `state.output` in tool parts may be stringified JSON | Treat as opaque text or attempt one-level unmarshal | +| **Session archiving** | `time_archived` column exists; archived sessions may be compacted | Respect `time_archived IS NULL` or include archived based on config | + +--- + +## 7. Open Questions for Parser Implementation + +1. Should the collector index **every** session or only those matching a project filter? +2. Should `reasoning` parts be indexed as prose or excluded by default? +3. Should `patch` parts be indexed as code/text or handled separately? +4. How to handle parent-child message threading (`parentID`) in search results? +5. What is the compaction format in `compaction` parts? (Only 60 observed, low priority.) + +--- + +## 8. Summary for Next Phase + +- **Source:** SQLite `~/.local/share/opencode/opencode.db`, read-only. +- **Tables:** `session`, `message`, `part`. +- **Progress:** `message.rowid` (INTEGER SQLite row marker). +- **Turn definition:** `message` row + linked `part` rows by `message_id`. +- **Tool outputs:** External files in `tool-output/`, referenced by `callID`. +- **Registration:** One parser package implementing `parser.Parser` + registration in collector. 
diff --git a/docs/tasks/lethe-search-and-opencode.md b/docs/tasks/lethe-search-and-opencode.md index 443c0a060cd992d776f5fecb6c7d86b28a69e0ca..a38226ae259213e21824ae7e81ba9a9108c8f7b6 100644 --- a/docs/tasks/lethe-search-and-opencode.md +++ b/docs/tasks/lethe-search-and-opencode.md @@ -1,6 +1,9 @@ # lethe-search-and-opencode -**Status:** Design (hands-off) +**Status:** done +**Branch:** `task/lethe-search-and-opencode` +**Worktree:** `/Users/blikh/data/home/lethe/.worktrees/lethe-search-and-opencode` +**Mode:** hands-off **Module:** `sourcecraft.dev/bigbes/lethe` **Depends on:** `lethe-server.md` (#1) — FTS5 tables and triggers were created in #1; this task only adds query code. `lethe-collector-claude-code.md` (#2) — the collector framework and Parser interface this task extends. **Sibling tasks (deferred):** per-tool parsers (`lethe-collector-crush.md`, `lethe-collector-pi.md`, `lethe-collector-kimi.md`); RFC backlog items (cost rollups for tools that report it, tagging, JSON/Markdown export). @@ -243,14 +246,76 @@ Approach: ship `/api/v1/search` as an additive read domain first, then run the o - IF2 — `func (h *Handler) Mount(r chi.Router)` — server mount contract matching other domain packages. - IF3 — `func New(host string) *Parser` — opencode parser constructor registered by the collector CLI. - IF4 — `func buildParsers(host string) map[string]parser.Parser` — collector parser registry remains the only dispatch point. +- IF5 — `docs/spikes/opencode-format.md` — canonical opencode source choice consumed by the parser phase. 
### Interface Graph - PH1 -> IF1 @ `internal/domain/search/` - PH2 IF1 -> IF2 @ `internal/domain/search/`, `internal/server/`, `cmd/lethe/` -- PH3 -> @ `docs/spikes/opencode-format.md` -- PH4 -> IF3, IF4 @ `internal/collector/parser/opencode/`, `cmd/lethe-collector/` +- PH3 -> IF5 @ `docs/spikes/opencode-format.md` +- PH4 IF5 -> IF3, IF4 @ `internal/collector/parser/opencode/`, `cmd/lethe-collector/` Backwards-compat: additive route and parser registration only; PH1/PH2 do not alter existing routes or schema, and PH4 does not change the parser interface, runner, or collector state schema. Scope check: no stats work, no React search UI, no schema migration, no saved-search changes, and no parser abstraction beyond `buildParsers`. + +## Verify + +**Result:** passed + +Positive: +- CK1 — `/api/v1/search` repository and handler tests cover ranked prose search, tool-output opt-in, filters, cursors, and response envelope. +- CK2 — opencode parser tests cover SQLite discovery, turn mapping, tool summaries, resume marker, malformed skips, and collector registration. +- CK3 — `go build ./cmd/lethe ./cmd/lethe-collector` succeeds. +- CK4 — `go test ./... -count=1` passes. + +Negative: +- CK5 — empty/invalid search query and bad cursor return `INVALID`. +- CK6 — non-admin `?owner=` on search returns `FORBIDDEN`. +- CK7 — opencode parser does not ingest external `tool-output/` blob contents. + +Invariants / assumptions: +- CK8 (IV1, IV2) — no search package references schema DDL or `internal/shared/wire`. +- CK9 (IV3-IV7) — search tests verify read-path behavior, owner scoping, prose default, marker snippets, and invalid-query handling. +- CK10 (IV8-IV10, AS3, AS4) — opencode parser implements `parser.Parser`, keeps collector state schema unchanged, and consumes the committed storage spike. +- CK11 (IV11, IV12) — stats packages and React `/search` route were not changed. + +Interfaces: +- CK12 (IF1) — `Repository.Search(ctx, Filter)` is called by handler and repository tests. 
+- CK13 (IF2) — `Handler.Mount(r chi.Router)` registers `/api/v1/search`. +- CK14 (IF3, IF4) — `opencode.New(host)` is registered through `buildParsers` and tested by `cmd/lethe-collector`. +- CK15 (IF5) — `docs/spikes/opencode-format.md` records the SQLite source and `message.rowid` marker used by PH4. + +Smoke: `go test ./internal/domain/search -run TestHandler_SuccessfulResponseEnvelope -v` and `go test ./internal/collector/parser/opencode -run TestParse_MapsTurnsAndIdentity -v` both pass. + +## Conclusion + +Outcome: `/api/v1/search` and the opencode collector parser shipped on `task/lethe-search-and-opencode` through `5cc599d`. + +Invariants: +- IV1 — no migration files were added. +- IV2 — `internal/shared/wire/` was not modified. +- IV3 — search implementation is repository/handler read-path code only. +- IV4 — search handler uses the existing authenticated owner-scope rules. +- IV5 — repository tests cover prose-only default and tool-output opt-in. +- IV6 — snippets use marker bytes, not HTML. +- IV7 — empty, malformed, and bad-cursor search inputs return `INVALID`. +- IV8 — opencode implements `parser.Parser` unchanged. +- IV9 — collector runner and state schema were unchanged. +- IV10 — `docs/spikes/opencode-format.md` landed before parser implementation. +- IV11 — stats API/page code was not changed. +- IV12 — React `/search` route was not changed. + +### Assumptions check +- AS1 — held — search tests exercise FTS rows populated by existing triggers. +- AS2 — held — search joins FTS rowid back to `turns.rowid` in tests and implementation. +- AS3 — held — spike confirmed readable opencode SQLite storage under `~/.local/share/opencode/`. +- AS4 — held after review fix — collector `last_offset` stores next opencode `message.rowid`, and `TurnEvent.Seq` stores current rowid. + +### Unknowns outcome +- UK1 — resolved — SQLite `opencode.db` is canonical for v1. +- UK2 — resolved for v1 — invalid FTS syntax maps to `INVALID`; no stricter normalizer was needed. 
+- UK3 — still-open — BM25 quality needs real archive usage after ingest. + +### Review findings +- Critical: opencode offset marker changed from `message.time_created` to inclusive next-`message.rowid` after reviewer found skipped-row risk in partial-accept paths. diff --git a/internal/collector/parser/opencode/parser.go b/internal/collector/parser/opencode/parser.go new file mode 100644 index 0000000000000000000000000000000000000000..a523d307d787ef0d2f93464ce4cebbdd14cd55da --- /dev/null +++ b/internal/collector/parser/opencode/parser.go @@ -0,0 +1,409 @@ +package opencode + +import ( + "context" + "crypto/sha256" + "database/sql" + "encoding/hex" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "sourcecraft.dev/bigbes/lethe/internal/collector/parser" + "sourcecraft.dev/bigbes/lethe/internal/shared/wire" + + _ "modernc.org/sqlite" +) + +const toolName = "opencode" + +// Parser maps opencode SQLite transcript records into lethe wire events. +type Parser struct { + host string +} + +// New builds a parser that stamps every emitted event with host. +func New(host string) *Parser { + return &Parser{host: host} +} + +// Tool returns the collector-facing tool name. +func (p *Parser) Tool() string { + return toolName +} + +// Discover returns the canonical opencode SQLite database below root. The root +// may be ~/.local/share/opencode or the database path itself. 
+func (p *Parser) Discover(root string) ([]parser.SourceFile, error) { + info, err := os.Stat(root) + if err != nil { + return nil, err + } + if !info.IsDir() { + if filepath.Base(root) != "opencode.db" { + return nil, nil + } + return []parser.SourceFile{{Path: root, Size: info.Size()}}, nil + } + + dbPath := filepath.Join(root, "opencode.db") + info, err = os.Stat(dbPath) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + if info.IsDir() { + return nil, nil + } + return []parser.SourceFile{{Path: dbPath, Size: info.Size()}}, nil +} + +// Parse reads opencode messages starting at since. since and the returned marker +// are message.rowid markers: since is the next rowid to scan, returned offset is +// one past the last scanned row, and message.time_created remains the timestamp. +func (p *Parser) Parse(path string, since int64) ([]wire.TurnEvent, int64, error) { + db, err := sql.Open("sqlite", readOnlyDSN(path)) + if err != nil { + return nil, since, err + } + defer func() { _ = db.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + rows, err := db.QueryContext(ctx, ` +SELECT + m.rowid, + m.id, + m.session_id, + m.time_created, + m.data, + s.directory, + s.time_created, + s.slug, + s.version +FROM message m +JOIN session s ON s.id = m.session_id +WHERE m.rowid >= ? 
+ORDER BY m.rowid ASC`, since) + if err != nil { + return nil, since, err + } + defer func() { _ = rows.Close() }() + + events := make([]wire.TurnEvent, 0) + next := since + for rows.Next() { + var record messageRow + if err := rows.Scan(&record.RowID, &record.ID, &record.SessionID, &record.TimeCreated, &record.Data, &record.Directory, &record.SessionCreated, &record.Slug, &record.Version); err != nil { + return events, next, err + } + if candidate := record.RowID + 1; candidate > next { + next = candidate + } + parts, err := loadParts(ctx, db, record.ID) + if err != nil { + return events, next, err + } + event, ok := p.mapRecord(path, record, parts) + if ok { + events = append(events, event) + } + } + if err := rows.Err(); err != nil { + return events, next, err + } + return events, next, nil +} + +type messageRow struct { + RowID int64 + ID string + SessionID string + TimeCreated int64 + Data string + Directory string + SessionCreated int64 + Slug string + Version string +} + +type partRow struct { + ID string + TimeCreated int64 + Data string +} + +type messageData struct { + Role string `json:"role"` + Agent string `json:"agent"` + Mode string `json:"mode"` + ModelID string `json:"modelID"` + ProviderID string `json:"providerID"` + Tokens tokenData `json:"tokens"` + Cost float64 `json:"cost"` + Path pathData `json:"path"` + Time timeData `json:"time"` + Raw json.RawMessage +} + +type tokenData struct { + Input int64 `json:"input"` + Output int64 `json:"output"` +} + +type pathData struct { + CWD string `json:"cwd"` + Root string `json:"root"` +} + +type timeData struct { + Created int64 `json:"created"` + Completed int64 `json:"completed"` +} + +type partData struct { + Type string `json:"type"` + Text string `json:"text"` + Tool string `json:"tool"` + CallID string `json:"callID"` + State json.RawMessage `json:"state"` + Raw json.RawMessage +} + +type toolState struct { + Status string `json:"status"` + Output json.RawMessage `json:"output"` +} + +type 
toolCallSummary struct { + Tool string `json:"tool,omitempty"` + CallID string `json:"call_id,omitempty"` + Status string `json:"status,omitempty"` + Output string `json:"output,omitempty"` +} + +func loadParts(ctx context.Context, db *sql.DB, messageID string) ([]partRow, error) { + rows, err := db.QueryContext(ctx, ` +SELECT id, time_created, data +FROM part +WHERE message_id = ? +ORDER BY time_created ASC, id ASC`, messageID) + if err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() + + parts := make([]partRow, 0) + for rows.Next() { + var part partRow + if err := rows.Scan(&part.ID, &part.TimeCreated, &part.Data); err != nil { + return nil, err + } + parts = append(parts, part) + } + if err := rows.Err(); err != nil { + return nil, err + } + return parts, nil +} + +func (p *Parser) mapRecord(path string, record messageRow, partRows []partRow) (wire.TurnEvent, bool) { + msg, ok := parseMessageData(record.Data) + if !ok { + return wire.TurnEvent{}, false + } + role := strings.TrimSpace(msg.Role) + if role != "user" && role != "assistant" && role != "system" { + return wire.TurnEvent{}, false + } + + texts := make([]string, 0) + toolCalls := make([]toolCallSummary, 0) + for _, row := range partRows { + part, ok := parsePartData(row.Data) + if !ok { + continue + } + switch part.Type { + case "text", "reasoning": + if text := strings.TrimSpace(part.Text); text != "" { + texts = append(texts, text) + } + case "tool": + summary := summarizeToolPart(part) + toolCalls = append(toolCalls, summary) + if rendered := renderToolSummary(summary); rendered != "" { + texts = append(texts, rendered) + } + } + } + content := strings.Join(texts, "\n\n") + if strings.TrimSpace(content) == "" && len(toolCalls) == 0 { + return wire.TurnEvent{}, false + } + + event := wire.TurnEvent{ + Tool: toolName, + Host: p.host, + SessionID: record.SessionID, + TurnID: turnIDFor(record), + Seq: record.RowID, + Role: role, + Timestamp: record.TimeCreated / 1000, + Content: 
content, + SessionMeta: wire.SessionMeta{ + WorkingDir: workingDirFor(record, msg), + SourceFile: path, + StartedAt: int64PtrOrNil(record.SessionCreated / 1000), + Metadata: sessionMetadata(record), + }, + Metadata: json.RawMessage(record.Data), + } + if model := strings.TrimSpace(msg.ModelID); model != "" { + event.Model = &model + } + if msg.Tokens.Input != 0 { + event.TokensIn = int64PtrOrNil(msg.Tokens.Input) + } + if msg.Tokens.Output != 0 { + event.TokensOut = int64PtrOrNil(msg.Tokens.Output) + } + if msg.Cost != 0 { + cost := msg.Cost + event.CostUSD = &cost + } + if len(toolCalls) > 0 { + payload, err := json.Marshal(toolCalls) + if err == nil { + event.ToolCalls = payload + } + } + return event, true +} + +func parseMessageData(raw string) (messageData, bool) { + var msg messageData + if err := json.Unmarshal([]byte(raw), &msg); err != nil { + return messageData{}, false + } + msg.Raw = cloneRaw([]byte(raw)) + return msg, true +} + +func parsePartData(raw string) (partData, bool) { + var part partData + if err := json.Unmarshal([]byte(raw), &part); err != nil { + return partData{}, false + } + part.Raw = cloneRaw([]byte(raw)) + return part, true +} + +func summarizeToolPart(part partData) toolCallSummary { + summary := toolCallSummary{Tool: strings.TrimSpace(part.Tool), CallID: strings.TrimSpace(part.CallID)} + var state toolState + if len(part.State) > 0 && json.Unmarshal(part.State, &state) == nil { + summary.Status = strings.TrimSpace(state.Status) + summary.Output = safeOutputSummary(state.Output) + } + return summary +} + +func safeOutputSummary(raw json.RawMessage) string { + trimmed := strings.TrimSpace(string(raw)) + if trimmed == "" || trimmed == "null" { + return "" + } + var text string + if err := json.Unmarshal(raw, &text); err == nil { + return summarizeText(text) + } + return summarizeText(trimmed) +} + +func renderToolSummary(summary toolCallSummary) string { + label := summary.Tool + if label == "" { + label = "tool" + } + parts := 
[]string{fmt.Sprintf("" +} + +func sessionMetadata(record messageRow) json.RawMessage { + payload := struct { + Slug string `json:"slug,omitempty"` + Version string `json:"version,omitempty"` + }{Slug: record.Slug, Version: record.Version} + b, err := json.Marshal(payload) + if err != nil || string(b) == "{}" { + return nil + } + return b +} + +func workingDirFor(record messageRow, msg messageData) *string { + for _, value := range []string{record.Directory, msg.Path.CWD, msg.Path.Root} { + if value = strings.TrimSpace(value); value != "" { + return &value + } + } + return nil +} + +func turnIDFor(record messageRow) string { + if strings.TrimSpace(record.ID) != "" { + return record.ID + } + sum := sha256.Sum256([]byte(fmt.Sprintf("%s|%d", record.SessionID, record.TimeCreated))) + return hex.EncodeToString(sum[:8]) +} + +func summarizeText(text string) string { + text = strings.TrimSpace(text) + if text == "" { + return "" + } + line := text + if idx := strings.IndexByte(line, '\n'); idx >= 0 { + line = line[:idx] + } + line = strings.TrimSpace(line) + if len(line) > 120 { + line = line[:117] + "..." 
+ } + return line +} + +func int64PtrOrNil(v int64) *int64 { + if v == 0 { + return nil + } + return &v +} + +func cloneRaw(raw json.RawMessage) json.RawMessage { + if len(raw) == 0 { + return nil + } + out := make([]byte, len(raw)) + copy(out, raw) + return out +} + +func readOnlyDSN(path string) string { + return "file:" + path + "?mode=ro&_pragma=busy_timeout(5000)" +} + +var _ parser.Parser = (*Parser)(nil) diff --git a/internal/collector/parser/opencode/parser_test.go b/internal/collector/parser/opencode/parser_test.go new file mode 100644 index 0000000000000000000000000000000000000000..5d4dd4bac8600e93594c46ad3d241f9387d5a9f4 --- /dev/null +++ b/internal/collector/parser/opencode/parser_test.go @@ -0,0 +1,334 @@ +package opencode + +import ( + "database/sql" + "encoding/json" + "os" + "path/filepath" + "strings" + "testing" + + _ "modernc.org/sqlite" +) + +func TestDiscover_FindsCanonicalDatabase(t *testing.T) { + root := t.TempDir() + dbPath := filepath.Join(root, "opencode.db") + writeFile(t, dbPath, "sqlite fixture") + writeFile(t, filepath.Join(root, "opencode.db-wal"), "wal") + writeFile(t, filepath.Join(root, "storage", "message", "ignored.json"), "{}") + + p := New("laptop") + files, err := p.Discover(root) + if err != nil { + t.Fatalf("Discover(root): %v", err) + } + if len(files) != 1 { + t.Fatalf("len(files) = %d, want 1", len(files)) + } + if got := files[0].Path; got != dbPath { + t.Fatalf("files[0].Path = %q, want %q", got, dbPath) + } + if files[0].Size == 0 { + t.Fatal("files[0].Size = 0, want database size") + } + + files, err = p.Discover(dbPath) + if err != nil { + t.Fatalf("Discover(dbPath): %v", err) + } + if len(files) != 1 || files[0].Path != dbPath { + t.Fatalf("Discover(dbPath) = %#v, want canonical db only", files) + } +} + +func TestParse_MapsTurnsAndIdentity(t *testing.T) { + dbPath := createOpenCodeDB(t) + insertSession(t, dbPath, "ses_demo", "/workspace/demo", 1700000000000) + insertMessage(t, dbPath, "msg_user", "ses_demo", 
1700000001000, `{"role":"user","path":{"cwd":"/workspace/demo"},"time":{"created":1700000001000}}`) + insertPart(t, dbPath, "prt_user_text", "msg_user", "ses_demo", 1700000001001, `{"type":"text","text":"Please inspect the failing test."}`) + insertMessage(t, dbPath, "msg_assistant", "ses_demo", 1700000002000, `{"role":"assistant","modelID":"model-redacted","tokens":{"input":7,"output":11},"cost":0.00042}`) + insertPart(t, dbPath, "prt_step", "msg_assistant", "ses_demo", 1700000002001, `{"type":"step-start"}`) + insertPart(t, dbPath, "prt_reason", "msg_assistant", "ses_demo", 1700000002002, `{"type":"reasoning","text":"Short safe reasoning summary."}`) + insertPart(t, dbPath, "prt_text", "msg_assistant", "ses_demo", 1700000002003, `{"type":"text","text":"The test fails because the parser is missing."}`) + + p := New("laptop") + events, next, err := p.Parse(dbPath, 0) + if err != nil { + t.Fatalf("Parse: %v", err) + } + if len(events) != 2 { + t.Fatalf("len(events) = %d, want 2", len(events)) + } + if next != 3 { + t.Fatalf("next = %d, want marker after assistant message", next) + } + + user := events[0] + if user.Tool != "opencode" || user.Host != "laptop" || user.SessionID != "ses_demo" || user.TurnID != "msg_user" { + t.Fatalf("user identity = tool=%q host=%q session=%q turn=%q", user.Tool, user.Host, user.SessionID, user.TurnID) + } + if user.Seq != 1 || user.Timestamp != 1700000001 || user.Role != "user" { + t.Fatalf("user seq/timestamp/role = %d/%d/%q", user.Seq, user.Timestamp, user.Role) + } + if user.Content != "Please inspect the failing test." 
{ + t.Fatalf("user.Content = %q", user.Content) + } + if user.SessionMeta.WorkingDir == nil || *user.SessionMeta.WorkingDir != "/workspace/demo" { + t.Fatalf("user working dir = %v, want session directory", user.SessionMeta.WorkingDir) + } + if user.SessionMeta.SourceFile != dbPath { + t.Fatalf("user source_file = %q, want %q", user.SessionMeta.SourceFile, dbPath) + } + + assistant := events[1] + if assistant.Role != "assistant" { + t.Fatalf("assistant.Role = %q", assistant.Role) + } + if !strings.Contains(assistant.Content, "Short safe reasoning summary.") || !strings.Contains(assistant.Content, "The test fails because the parser is missing.") { + t.Fatalf("assistant.Content = %q", assistant.Content) + } + if assistant.Model == nil || *assistant.Model != "model-redacted" { + t.Fatalf("assistant.Model = %v", assistant.Model) + } + if assistant.TokensIn == nil || *assistant.TokensIn != 7 || assistant.TokensOut == nil || *assistant.TokensOut != 11 { + t.Fatalf("assistant tokens = %v/%v", assistant.TokensIn, assistant.TokensOut) + } + if assistant.CostUSD == nil || *assistant.CostUSD != 0.00042 { + t.Fatalf("assistant.CostUSD = %v", assistant.CostUSD) + } +} + +func TestParse_MapsToolPartSummaryWithoutExternalBlob(t *testing.T) { + dbPath := createOpenCodeDB(t) + insertSession(t, dbPath, "ses_tools", "/workspace/tools", 1700000100000) + insertMessage(t, dbPath, "msg_tool", "ses_tools", 1700000101000, `{"role":"assistant"}`) + insertPart(t, dbPath, "prt_tool", "msg_tool", "ses_tools", 1700000101001, `{"type":"tool","tool":"bash","callID":"call_safe","state":{"status":"completed","output":"go test ./internal/collector/parser/opencode ok"}}`) + toolOutputPath := filepath.Join(filepath.Dir(dbPath), "tool-output", "tool_call_safe") + writeFile(t, toolOutputPath, "SECRET=do-not-ingest") + + events, _, err := New("laptop").Parse(dbPath, 0) + if err != nil { + t.Fatalf("Parse: %v", err) + } + if len(events) != 1 { + t.Fatalf("len(events) = %d, want 1", len(events)) + } + if 
events[0].Role != "assistant" { + t.Fatalf("Role = %q, want assistant", events[0].Role) + } + if !strings.Contains(events[0].Content, "") { + t.Fatalf("Content = %q", events[0].Content) + } + if strings.Contains(events[0].Content, "SECRET") || strings.Contains(string(events[0].ToolCalls), "SECRET") { + t.Fatalf("external tool-output blob was ingested: content=%q tool_calls=%s", events[0].Content, string(events[0].ToolCalls)) + } + if !strings.Contains(string(events[0].ToolCalls), `"call_id":"call_safe"`) || !strings.Contains(string(events[0].ToolCalls), `"output":"go test ./internal/collector/parser/opencode ok"`) { + t.Fatalf("ToolCalls = %s, want safe part summary", string(events[0].ToolCalls)) + } +} + +func TestParse_ResumesFromMessageRowIDMarker(t *testing.T) { + dbPath := createOpenCodeDB(t) + insertSession(t, dbPath, "ses_resume", "/workspace/resume", 1700000200000) + insertMessage(t, dbPath, "msg_old", "ses_resume", 1700000201000, `{"role":"user"}`) + insertPart(t, dbPath, "prt_old", "msg_old", "ses_resume", 1700000201001, `{"type":"text","text":"old"}`) + insertMessage(t, dbPath, "msg_new", "ses_resume", 1700000202000, `{"role":"assistant"}`) + insertPart(t, dbPath, "prt_new", "msg_new", "ses_resume", 1700000202001, `{"type":"text","text":"new"}`) + + events, next, err := New("laptop").Parse(dbPath, 2) + if err != nil { + t.Fatalf("Parse: %v", err) + } + if len(events) != 1 || events[0].TurnID != "msg_new" || events[0].Content != "new" { + t.Fatalf("events = %#v, want only new message", events) + } + if next != 3 { + t.Fatalf("next = %d, want newest marker", next) + } + + events, next, err = New("laptop").Parse(dbPath, next) + if err != nil { + t.Fatalf("Parse second: %v", err) + } + if len(events) != 0 || next != 3 { + t.Fatalf("second parse = len %d next %d, want no events and stable marker", len(events), next) + } +} + +func TestParse_IdenticalMessageTimesResumeByRowID(t *testing.T) { + dbPath := createOpenCodeDB(t) + insertSession(t, dbPath, 
"ses_same_time", "/workspace/same-time", 1700000250000) + insertMessage(t, dbPath, "msg_first", "ses_same_time", 1700000251000, `{"role":"user"}`) + insertPart(t, dbPath, "prt_first", "msg_first", "ses_same_time", 1700000251001, `{"type":"text","text":"first"}`) + insertMessage(t, dbPath, "msg_second", "ses_same_time", 1700000251000, `{"role":"assistant"}`) + insertPart(t, dbPath, "prt_second", "msg_second", "ses_same_time", 1700000251002, `{"type":"text","text":"second"}`) + + events, next, err := New("laptop").Parse(dbPath, 0) + if err != nil { + t.Fatalf("Parse: %v", err) + } + if len(events) != 2 || events[0].TurnID != "msg_first" || events[1].TurnID != "msg_second" { + t.Fatalf("events = %#v, want both equal-time messages in rowid order", events) + } + if events[0].Seq != 1 || events[1].Seq != 2 || next != 3 { + t.Fatalf("seqs/next = %d/%d/%d, want rowid markers 1/2/3", events[0].Seq, events[1].Seq, next) + } + + events, next, err = New("laptop").Parse(dbPath, events[1].Seq) + if err != nil { + t.Fatalf("Parse resume: %v", err) + } + if len(events) != 1 || events[0].TurnID != "msg_second" || events[0].Content != "second" { + t.Fatalf("resume events = %#v, want only second equal-time message", events) + } + if next != 3 { + t.Fatalf("resume next = %d, want marker after second rowid", next) + } + + events, next, err = New("laptop").Parse(dbPath, next) + if err != nil { + t.Fatalf("Parse after full resume: %v", err) + } + if len(events) != 0 || next != 3 { + t.Fatalf("after full parse = len %d next %d, want no rows and unchanged marker 3", len(events), next) + } +} + +func TestParse_EmptyParseReturnsInputMarker(t *testing.T) { + dbPath := createOpenCodeDB(t) + insertSession(t, dbPath, "ses_empty", "/workspace/empty", 1700000260000) + + events, next, err := New("laptop").Parse(dbPath, 7) + if err != nil { + t.Fatalf("Parse: %v", err) + } + if len(events) != 0 || next != 7 { + t.Fatalf("empty parse = len %d next %d, want no events and unchanged marker 7", 
len(events), next) + } +} + +func TestParse_ReturnsMarkerAfterHighestScannedRowIDWithGaps(t *testing.T) { + dbPath := createOpenCodeDB(t) + insertSession(t, dbPath, "ses_gap", "/workspace/gap", 1700000270000) + insertMessageWithRowID(t, dbPath, 5, "msg_gap", "ses_gap", 1700000271000, `{"role":"user"}`) + insertPart(t, dbPath, "prt_gap", "msg_gap", "ses_gap", 1700000271001, `{"type":"text","text":"gap"}`) + + events, next, err := New("laptop").Parse(dbPath, 1) + if err != nil { + t.Fatalf("Parse: %v", err) + } + if len(events) != 1 || events[0].Seq != 5 || events[0].TurnID != "msg_gap" { + t.Fatalf("events = %#v, want rowid 5 event", events) + } + if next != 6 { + t.Fatalf("next = %d, want marker after highest scanned rowid", next) + } +} + +func TestParse_SkipsMalformedAndUnknownRecordsButAdvances(t *testing.T) { + dbPath := createOpenCodeDB(t) + insertSession(t, dbPath, "ses_bad", "/workspace/bad", 1700000300000) + insertMessage(t, dbPath, "msg_bad_json", "ses_bad", 1700000301000, `{not-json}`) + insertPart(t, dbPath, "prt_bad", "msg_bad_json", "ses_bad", 1700000301001, `{"type":"text","text":"bad message should not leak"}`) + insertMessage(t, dbPath, "msg_unknown_role", "ses_bad", 1700000302000, `{"role":"plugin"}`) + insertPart(t, dbPath, "prt_unknown", "msg_unknown_role", "ses_bad", 1700000302001, `{"type":"text","text":"unknown role"}`) + insertMessage(t, dbPath, "msg_good", "ses_bad", 1700000303000, `{"role":"user"}`) + insertPart(t, dbPath, "prt_good", "msg_good", "ses_bad", 1700000303001, `{"type":"text","text":"good"}`) + + events, next, err := New("laptop").Parse(dbPath, 0) + if err != nil { + t.Fatalf("Parse: %v", err) + } + if len(events) != 1 || events[0].TurnID != "msg_good" || events[0].Content != "good" { + t.Fatalf("events = %#v, want only good event", events) + } + if next != 4 { + t.Fatalf("next = %d, want all records consumed", next) + } +} + +func createOpenCodeDB(t *testing.T) string { + t.Helper() + dir := t.TempDir() + dbPath := 
filepath.Join(dir, "opencode.db") + db, err := sql.Open("sqlite", dbPath) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + defer func() { _ = db.Close() }() + ddl, err := os.ReadFile(filepath.Join("testdata", "schema.sql")) + if err != nil { + t.Fatalf("read schema: %v", err) + } + if _, err := db.Exec(string(ddl)); err != nil { + t.Fatalf("create schema: %v", err) + } + return dbPath +} + +func insertSession(t *testing.T, dbPath, id, directory string, created int64) { + t.Helper() + withDB(t, dbPath, func(db *sql.DB) { + _, err := db.Exec(`INSERT INTO session (id, project_id, slug, directory, title, version, time_created, time_updated) VALUES (?, 'proj_redacted', 'quiet-river', ?, 'redacted', '0.0.0', ?, ?)`, id, directory, created, created) + if err != nil { + t.Fatalf("insert session: %v", err) + } + }) +} + +func insertMessage(t *testing.T, dbPath, id, sessionID string, created int64, data string) { + t.Helper() + if !json.Valid([]byte(data)) && !strings.HasPrefix(data, "{not-json}") { + t.Fatalf("test fixture message data is invalid unexpectedly: %s", data) + } + withDB(t, dbPath, func(db *sql.DB) { + _, err := db.Exec(`INSERT INTO message (id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?)`, id, sessionID, created, created, data) + if err != nil { + t.Fatalf("insert message: %v", err) + } + }) +} + +func insertMessageWithRowID(t *testing.T, dbPath string, rowID int64, id, sessionID string, created int64, data string) { + t.Helper() + if !json.Valid([]byte(data)) && !strings.HasPrefix(data, "{not-json}") { + t.Fatalf("test fixture message data is invalid unexpectedly: %s", data) + } + withDB(t, dbPath, func(db *sql.DB) { + _, err := db.Exec(`INSERT INTO message (rowid, id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?, ?)`, rowID, id, sessionID, created, created, data) + if err != nil { + t.Fatalf("insert message with rowid: %v", err) + } + }) +} + +func insertPart(t *testing.T, dbPath, id, messageID, 
sessionID string, created int64, data string) { + t.Helper() + withDB(t, dbPath, func(db *sql.DB) { + _, err := db.Exec(`INSERT INTO part (id, message_id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?, ?)`, id, messageID, sessionID, created, created, data) + if err != nil { + t.Fatalf("insert part: %v", err) + } + }) +} + +func withDB(t *testing.T, dbPath string, fn func(*sql.DB)) { + t.Helper() + db, err := sql.Open("sqlite", dbPath) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + defer func() { _ = db.Close() }() + fn(db) +} + +func writeFile(t *testing.T, path, body string) { + t.Helper() + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + t.Fatalf("MkdirAll(%q): %v", path, err) + } + if err := os.WriteFile(path, []byte(body), 0o600); err != nil { + t.Fatalf("WriteFile(%q): %v", path, err) + } +} diff --git a/internal/collector/parser/opencode/testdata/schema.sql b/internal/collector/parser/opencode/testdata/schema.sql new file mode 100644 index 0000000000000000000000000000000000000000..b366698900ee377717b70e4034c71ca3a49c8a00 --- /dev/null +++ b/internal/collector/parser/opencode/testdata/schema.sql @@ -0,0 +1,31 @@ +CREATE TABLE session ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL, + parent_id TEXT, + slug TEXT NOT NULL, + directory TEXT NOT NULL, + title TEXT NOT NULL, + version TEXT NOT NULL, + time_created INTEGER NOT NULL, + time_updated INTEGER NOT NULL +); + +CREATE TABLE message ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + time_created INTEGER NOT NULL, + time_updated INTEGER NOT NULL, + data TEXT NOT NULL +); + +CREATE TABLE part ( + id TEXT PRIMARY KEY, + message_id TEXT NOT NULL, + session_id TEXT NOT NULL, + time_created INTEGER NOT NULL, + time_updated INTEGER NOT NULL, + data TEXT NOT NULL +); + +CREATE INDEX part_session_idx ON part (session_id); +CREATE INDEX session_project_idx ON session (project_id); diff --git a/internal/domain/search/handler.go 
b/internal/domain/search/handler.go new file mode 100644 index 0000000000000000000000000000000000000000..da851636c552a96ab5102b055b65663d6ea36196 --- /dev/null +++ b/internal/domain/search/handler.go @@ -0,0 +1,154 @@ +package search + +import ( + "context" + "log/slog" + "net/http" + "strconv" + "strings" + + "github.com/go-chi/chi/v5" + "go.bigb.es/auxilia/culpa" + "go.bigb.es/auxilia/scribe" + + "sourcecraft.dev/bigbes/lethe/internal/domain/session" + "sourcecraft.dev/bigbes/lethe/internal/pkg/apierror" + "sourcecraft.dev/bigbes/lethe/internal/pkg/httputil" + "sourcecraft.dev/bigbes/lethe/internal/server/auth" +) + +const ( + defaultLimit = 50 + maxLimit = 200 +) + +const allOwnersSentinel = "*" + +// Handler is the steward-managed HTTP boundary for the search read API. +// Repo is the injected SQL steward; the Handler holds no other state. +type Handler struct { + Repo *Repository `inject:""` +} + +// Init satisfies the steward Initer contract. +func (h *Handler) Init(_ context.Context) error { return nil } + +// Mount registers the read route under r. Server.Init mounts this inside +// the /api/v1 group, so the effective path is /api/v1/search. +func (h *Handler) Mount(r chi.Router) { + r.Get("/search", h.List) +} + +// List handles GET /search. It resolves the owner scope, parses query +// parameters, clamps pagination, and writes a search.Result. Errors surface +// through apierror.Render as RFC 7807. 
+func (h *Handler) List(w http.ResponseWriter, r *http.Request) { + scope, err := h.resolveScope(r) + if err != nil { + apierror.Render(w, r, err) + return + } + + q := r.URL.Query() + filter := Filter{Owner: scope} + + filter.Query = strings.TrimSpace(q.Get("q")) + if filter.Query == "" { + apierror.Render(w, r, culpa.WithCode( + culpa.WithPublic(culpa.New("search query is empty"), "q must not be empty"), + "INVALID", + )) + return + } + + if v := q.Get("include_tool_outputs"); v == "1" { + filter.IncludeToolOutputs = true + } + if v := q.Get("tool"); v != "" { + filter.Tool = &v + } + if v := q.Get("host"); v != "" { + filter.Host = &v + } + if v := q.Get("since"); v != "" { + n, perr := strconv.ParseInt(v, 10, 64) + if perr != nil { + apierror.Render(w, r, culpa.WithCode( + culpa.WithPublic(culpa.Wrap(perr, "parse since"), "since must be an integer (unix epoch seconds)"), + "INVALID", + )) + return + } + filter.Since = &n + } + if v := q.Get("until"); v != "" { + n, perr := strconv.ParseInt(v, 10, 64) + if perr != nil { + apierror.Render(w, r, culpa.WithCode( + culpa.WithPublic(culpa.Wrap(perr, "parse until"), "until must be an integer (unix epoch seconds)"), + "INVALID", + )) + return + } + filter.Until = &n + } + if filter.Since != nil && filter.Until != nil && *filter.Since > *filter.Until { + apierror.Render(w, r, culpa.WithCode( + culpa.WithPublic(culpa.New("since > until"), "since must be <= until"), + "INVALID", + )) + return + } + + filter.Limit = clampLimit(q.Get("limit")) + filter.Cursor = q.Get("cursor") + + result, err := h.Repo.Search(r.Context(), filter) + if err != nil { + apierror.Render(w, r, err) + return + } + + if writeErr := httputil.WriteJSON(w, http.StatusOK, result); writeErr != nil { + slog.Default().ErrorContext(r.Context(), "write search response", scribe.Err(writeErr)) + } +} + +// resolveScope reads the authenticated identity off the context and the +// optional `?owner=` query parameter, then returns the appropriate +// 
session.OwnerScope. Non-admin requests with `?owner=` set are 403. +func (h *Handler) resolveScope(r *http.Request) (session.OwnerScope, error) { + id := auth.MustIdentity(r.Context()) + param := r.URL.Query().Get("owner") + if param == "" { + return session.OwnerScope{User: id.User}, nil + } + if !id.IsAdmin { + return session.OwnerScope{}, culpa.WithCode( + culpa.WithPublic(culpa.New("?owner= is admin-only"), "?owner= is admin-only"), + "FORBIDDEN", + ) + } + if param == allOwnersSentinel { + return session.OwnerScope{User: id.User, AllOwners: true}, nil + } + owner := strings.ToLower(param) + return session.OwnerScope{User: id.User, SpecificOwner: &owner}, nil +} + +// clampLimit returns the effective limit: defaultLimit when missing, +// non-numeric, or negative; capped at maxLimit when the parsed value +// exceeds it. +func clampLimit(raw string) int { + if raw == "" { + return defaultLimit + } + n, err := strconv.Atoi(raw) + if err != nil || n < 0 { + return defaultLimit + } + if n > maxLimit { + return maxLimit + } + return n +} diff --git a/internal/domain/search/handler_test.go b/internal/domain/search/handler_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c5075bdf92637bf5426371016d2468e60ee4832e --- /dev/null +++ b/internal/domain/search/handler_test.go @@ -0,0 +1,283 @@ +package search_test + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/go-chi/chi/v5" + + "sourcecraft.dev/bigbes/lethe/internal/domain/search" + "sourcecraft.dev/bigbes/lethe/internal/server/auth" +) + +// fakeAuthMiddleware injects a fixed Identity onto the request context. 
+func fakeAuthMiddleware(id auth.Identity) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := auth.WithIdentity(r.Context(), id) + next.ServeHTTP(w, r.WithContext(ctx)) + }) + } +} + +// newHandler wires a Repository against a fresh in-memory database and +// returns the Handler. +func newHandler(t *testing.T) (*search.Handler, *search.Repository) { + t.Helper() + repo, _ := newRepo(t) + h := &search.Handler{Repo: repo} + if err := h.Init(context.Background()); err != nil { + t.Fatalf("handler.Init: %v", err) + } + return h, repo +} + +// mountWithIdentity builds a chi router with fake auth and the search handler. +func mountWithIdentity(h *search.Handler, id auth.Identity) http.Handler { + r := chi.NewRouter() + r.Route("/api/v1", func(r chi.Router) { + r.Use(fakeAuthMiddleware(id)) + h.Mount(r) + }) + return r +} + +// problemBody captures RFC 7807 fields tests assert on. +type problemBody struct { + Status int `json:"status"` + Code string `json:"code"` +} + +// doSearch performs GET /api/v1/search with the given query string. +func doSearch(t *testing.T, router http.Handler, query string) (*httptest.ResponseRecorder, search.Result) { + t.Helper() + req := httptest.NewRequest(http.MethodGet, "/api/v1/search", nil) + if query != "" { + req.URL.RawQuery = query[1:] // strip leading '?' 
+ } + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + var body search.Result + if rec.Code == http.StatusOK { + if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { + t.Fatalf("unmarshal search body: %v (body=%s)", err, rec.Body.String()) + } + } + return rec, body +} + +func TestHandler_Mount_RegistersSearchRoute(t *testing.T) { + h, _ := newHandler(t) + router := chi.NewRouter() + router.Route("/api/v1", func(r chi.Router) { + r.Use(fakeAuthMiddleware(auth.Identity{User: "alice"})) + h.Mount(r) + }) + + found := false + _ = chi.Walk(router, func(method, route string, _ http.Handler, _ ...func(http.Handler) http.Handler) error { + if method == http.MethodGet && route == "/api/v1/search" { + found = true + } + return nil + }) + if !found { + t.Fatal("expected GET /api/v1/search registered") + } +} + +func TestHandler_MissingQueryReturns400(t *testing.T) { + h, _ := newHandler(t) + router := mountWithIdentity(h, auth.Identity{User: "alice"}) + + rec, _ := doSearch(t, router, "") + if rec.Code != http.StatusBadRequest { + t.Fatalf("status=%d; want 400; body=%s", rec.Code, rec.Body.String()) + } + var p problemBody + _ = json.Unmarshal(rec.Body.Bytes(), &p) + if p.Code != "INVALID" { + t.Fatalf("expected INVALID; got %q", p.Code) + } +} + +func TestHandler_EmptyQueryReturns400(t *testing.T) { + h, _ := newHandler(t) + router := mountWithIdentity(h, auth.Identity{User: "alice"}) + + rec, _ := doSearch(t, router, "?q= ") + if rec.Code != http.StatusBadRequest { + t.Fatalf("status=%d; want 400; body=%s", rec.Code, rec.Body.String()) + } + var p problemBody + _ = json.Unmarshal(rec.Body.Bytes(), &p) + if p.Code != "INVALID" { + t.Fatalf("expected INVALID; got %q", p.Code) + } +} + +func TestHandler_BadSinceReturns400(t *testing.T) { + h, _ := newHandler(t) + router := mountWithIdentity(h, auth.Identity{User: "alice"}) + + rec, _ := doSearch(t, router, "?q=hello&since=not-a-number") + if rec.Code != http.StatusBadRequest { + t.Fatalf("status=%d; 
want 400; body=%s", rec.Code, rec.Body.String()) + } + var p problemBody + _ = json.Unmarshal(rec.Body.Bytes(), &p) + if p.Code != "INVALID" { + t.Fatalf("expected INVALID; got %q", p.Code) + } +} + +func TestHandler_NonAdminOwnerParamReturns403(t *testing.T) { + h, _ := newHandler(t) + router := mountWithIdentity(h, auth.Identity{User: "alice", IsAdmin: false}) + + for _, q := range []string{"?q=hello&owner=alice", "?q=hello&owner=bob", "?q=hello&owner=*"} { + rec, _ := doSearch(t, router, q) + if rec.Code != http.StatusForbidden { + t.Fatalf("query %q: status=%d; want 403; body=%s", q, rec.Code, rec.Body.String()) + } + var p problemBody + _ = json.Unmarshal(rec.Body.Bytes(), &p) + if p.Code != "FORBIDDEN" { + t.Fatalf("query %q: code=%q; want FORBIDDEN", q, p.Code) + } + } +} + +func TestHandler_AdminOwnerStarReturnsAllOwners(t *testing.T) { + h, repo := newHandler(t) + db := repo.Database.DB + seedSession(t, db, "alice", "cc", "phoebe", "sa", 100, 110) + seedTurn(t, db, "alice", "cc", "phoebe", "sa", "t1", 1, 101, "user", "needle alpha", nil) + seedSession(t, db, "bob", "cc", "phoebe", "sb", 100, 110) + seedTurn(t, db, "bob", "cc", "phoebe", "sb", "t1", 1, 101, "user", "needle beta", nil) + + router := mountWithIdentity(h, auth.Identity{User: "admin", IsAdmin: true}) + + rec, body := doSearch(t, router, "?q=needle&owner=*") + if rec.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String()) + } + if len(body.Results) != 2 { + t.Fatalf("expected 2 rows; got %d (%#v)", len(body.Results), body.Results) + } +} + +func TestHandler_BadCursorReturns400(t *testing.T) { + h, _ := newHandler(t) + router := mountWithIdentity(h, auth.Identity{User: "alice"}) + + rec, _ := doSearch(t, router, "?q=hello&cursor=not-valid") + if rec.Code != http.StatusBadRequest { + t.Fatalf("status=%d; want 400; body=%s", rec.Code, rec.Body.String()) + } + var p problemBody + _ = json.Unmarshal(rec.Body.Bytes(), &p) + if p.Code != "INVALID" { + t.Fatalf("expected 
INVALID; got %q", p.Code) + } +} + +func TestHandler_SuccessfulResponseEnvelope(t *testing.T) { + h, repo := newHandler(t) + db := repo.Database.DB + seedSession(t, db, "alice", "cc", "phoebe", "s1", 100, 110) + seedTurn(t, db, "alice", "cc", "phoebe", "s1", "t1", 1, 101, "user", "needle in haystack", nil) + + router := mountWithIdentity(h, auth.Identity{User: "alice"}) + + rec, body := doSearch(t, router, "?q=needle&limit=10") + if rec.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String()) + } + if body.Limit != 10 { + t.Fatalf("limit=%d; want 10", body.Limit) + } + if len(body.Results) != 1 { + t.Fatalf("expected 1 result; got %d (%#v)", len(body.Results), body.Results) + } + if body.Results[0].SessionID != "s1" { + t.Fatalf("expected session s1; got %#v", body.Results[0]) + } + // NextCursor should be empty when results fit in one page. + if body.NextCursor != "" { + t.Fatalf("expected empty next_cursor for single page; got %q", body.NextCursor) + } +} + +func TestHandler_LimitClamping(t *testing.T) { + h, _ := newHandler(t) + router := mountWithIdentity(h, auth.Identity{User: "alice"}) + + // Missing limit → default 50 + rec, body := doSearch(t, router, "?q=hello") + if rec.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String()) + } + if body.Limit != 50 { + t.Fatalf("default limit=%d; want 50", body.Limit) + } + + // Over max → capped at 200 + rec, body = doSearch(t, router, "?q=hello&limit=999") + if rec.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String()) + } + if body.Limit != 200 { + t.Fatalf("capped limit=%d; want 200", body.Limit) + } +} + +func TestHandler_IncludeToolOutputsParam(t *testing.T) { + h, repo := newHandler(t) + db := repo.Database.DB + seedSession(t, db, "alice", "cc", "phoebe", "s1", 100, 110) + seedTurn(t, db, "alice", "cc", "phoebe", "s1", "prose", 1, 101, "user", "needle in prose", nil) + seedTurn(t, db, "alice", "cc", "phoebe", "s1", "tool", 2, 
102, "assistant", "plain text", strptr(`{"output":"needle in shell"}`)) + + router := mountWithIdentity(h, auth.Identity{User: "alice"}) + + // Default: prose only + rec, body := doSearch(t, router, "?q=needle") + if rec.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String()) + } + if len(body.Results) != 1 || body.Results[0].MatchSource != search.SourceTurn { + t.Fatalf("expected 1 prose result; got %#v", body.Results) + } + + // Explicit include + rec, body = doSearch(t, router, "?q=needle&include_tool_outputs=1") + if rec.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String()) + } + if len(body.Results) != 2 { + t.Fatalf("expected 2 results; got %#v", body.Results) + } +} + +func TestHandler_PerUserIsolation(t *testing.T) { + h, repo := newHandler(t) + db := repo.Database.DB + seedSession(t, db, "alice", "cc", "phoebe", "sa", 100, 110) + seedTurn(t, db, "alice", "cc", "phoebe", "sa", "t1", 1, 101, "user", "alice needle", nil) + seedSession(t, db, "bob", "cc", "phoebe", "sb", 100, 110) + seedTurn(t, db, "bob", "cc", "phoebe", "sb", "t1", 1, 101, "user", "bob needle", nil) + + router := mountWithIdentity(h, auth.Identity{User: "alice"}) + + rec, body := doSearch(t, router, "?q=needle") + if rec.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String()) + } + if len(body.Results) != 1 || body.Results[0].Owner != "alice" { + t.Fatalf("alice should see only her row; got %#v", body.Results) + } +} diff --git a/internal/domain/search/repository.go b/internal/domain/search/repository.go new file mode 100644 index 0000000000000000000000000000000000000000..dc972082cf346e9148e1f5f817fe57fb94e17727 --- /dev/null +++ b/internal/domain/search/repository.go @@ -0,0 +1,274 @@ +package search + +import ( + "context" + "crypto/sha256" + "database/sql" + "encoding/base64" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "strings" + + "go.bigb.es/auxilia/culpa" + + 
const (
	// SourceTurn labels a hit that came from the turns_fts index.
	SourceTurn = "turn"
	// SourceToolOutput labels a hit that came from the tool_outputs_fts index.
	SourceToolOutput = "tool_output"

	// cursorVersion is stamped into every encoded cursor; DecodeCursor rejects
	// any other version.
	cursorVersion = 1
)

// Result is the repository response shape for paginated search output.
// Results is the serialized page, Limit echoes the requested page size and
// NextCursor is set only when Search fetched more rows than fit in the page.
type Result struct {
	Results    []Row  `json:"results"`
	Limit      int    `json:"limit"`
	NextCursor string `json:"next_cursor,omitempty"`

	// Rows is never serialized; Search assigns it the same slice as Results.
	Rows []Row `json:"-"`
}

// Row is one matched turn. Snippet is generated by SQLite FTS5 and uses marker
// runes around hits; it never contains HTML markup added by the repository.
// The db tags name the columns selected by the search query.
type Row struct {
	Owner      string  `db:"owner" json:"owner"`
	Tool       string  `db:"tool" json:"tool"`
	Host       string  `db:"host" json:"host"`
	SessionID  string  `db:"session_id" json:"session_id"`
	WorkingDir *string `db:"working_dir" json:"working_dir,omitempty"`
	TurnID     string  `db:"turn_id" json:"turn_id"`
	Seq        int64   `db:"seq" json:"seq"`
	Role       string  `db:"role" json:"role"`
	Timestamp  int64   `db:"timestamp" json:"timestamp"`
	// Rank is the bm25() score; the query orders it ascending.
	Rank float64 `db:"rank" json:"rank"`
	// MatchSource is SourceTurn or SourceToolOutput.
	MatchSource string `db:"match_source" json:"match_source"`
	Snippet     string `db:"snippet" json:"snippet"`
	// RowID is kept out of the JSON response; it is the final tie-breaker
	// carried in the pagination cursor.
	RowID int64 `db:"rowid" json:"-"`
}

// Filter controls query text, owner scoping, optional filters, and pagination.
// Nil pointer fields mean "no constraint". Search returns an empty page when
// Limit is zero or negative. Cursor is the opaque string from a previous
// Result.NextCursor, or empty for the first page.
type Filter struct {
	Owner              session.OwnerScope
	Query              string
	IncludeToolOutputs bool
	Tool               *string
	Host               *string
	Since              *int64
	Until              *int64
	Limit              int
	Cursor             string
}

// Cursor is the stable keyset position for the last row returned by a page.
// Its fields are the columns the search query orders by, in that order.
type Cursor struct {
	Rank        float64 `json:"rank"`
	Timestamp   int64   `json:"timestamp"`
	TurnID      string  `json:"turn_id"`
	MatchSource string  `json:"match_source"`
	RowID       int64   `json:"rowid,omitempty"`
}
// Repository is the SQL steward for FTS search. It is read-only: Search only
// builds SELECT statements over the existing FTS and turns tables.
type Repository struct {
	Database *database.Database `inject:""`
}

// Init performs no work and always returns nil.
func (r *Repository) Init(_ context.Context) error { return nil }

// Search executes an FTS5 query against prose turns by default and unions tool
// output matches only when requested. Duplicate hits for the same turn are
// collapsed by best rank, with deterministic tie-breakers.
//
// A blank query or an undecodable cursor yields an INVALID error. A limit of
// zero or less returns an empty page without touching the database. Database
// errors recognised by isFTSError are reported as INVALID ("invalid search
// query"); every other database error is wrapped with code DB_QUERY.
func (r *Repository) Search(ctx context.Context, f Filter) (*Result, error) {
	if strings.TrimSpace(f.Query) == "" {
		return nil, invalid("search query is empty")
	}
	if f.Limit <= 0 {
		return &Result{Results: []Row{}, Limit: f.Limit, Rows: []Row{}}, nil
	}

	var after *Cursor
	if f.Cursor != "" {
		c, err := DecodeCursor(f.Cursor, f)
		if err != nil {
			return nil, err
		}
		after = &c
	}

	query, args := buildSearchSQL(f, after)
	rows := make([]Row, 0)
	if err := r.Database.DB.SelectContext(ctx, &rows, query, args...); err != nil {
		if isFTSError(err) {
			return nil, invalid("invalid search query")
		}
		return nil, culpa.WithCode(culpa.Wrap(err, "search"), "DB_QUERY")
	}

	res := &Result{Results: rows, Limit: f.Limit, Rows: rows}
	// buildSearchSQL asks for Limit+1 rows; receiving the extra row means
	// another page exists, so trim it and point the cursor at the last row kept.
	if len(rows) > f.Limit {
		last := rows[f.Limit-1]
		res.Results = rows[:f.Limit]
		res.Rows = res.Results
		next, err := EncodeCursor(Cursor{Rank: last.Rank, Timestamp: last.Timestamp, TurnID: last.TurnID, MatchSource: last.MatchSource, RowID: last.RowID}, f)
		if err != nil {
			return nil, err
		}
		res.NextCursor = next
	}
	return res, nil
}

// buildSearchSQL assembles the search statement and its positional arguments.
//
// The statement has three stages: "candidates" (FTS matches from turns_fts,
// plus tool_outputs_fts when f.IncludeToolOutputs is set), "ranked" (a
// ROW_NUMBER per rowid so only the best candidate per rowid survives), and
// the final SELECT that joins turns and sessions, applies the time window and
// the keyset predicate, and orders the page. Arguments are appended in
// exactly the order their placeholders are written, so the two must be kept
// in step when editing.
func buildSearchSQL(f Filter, after *Cursor) (string, []any) {
	// One row beyond the page size lets Search detect whether a next page exists.
	limit := f.Limit + 1
	var sb strings.Builder
	args := make([]any, 0, 12)

	sb.WriteString(`WITH candidates AS (`)
	appendFTSSelect(&sb, &args, "turns_fts", SourceTurn, 0, f)
	if f.IncludeToolOutputs {
		sb.WriteString(` UNION ALL `)
		appendFTSSelect(&sb, &args, "tool_outputs_fts", SourceToolOutput, 1, f)
	}
	sb.WriteString(`), ranked AS (`)
	// NOTE(review): candidates carry a source_priority column, but the
	// per-rowid tie-break below orders by match_source instead — confirm
	// which of the two is intended.
	sb.WriteString(`SELECT *, ROW_NUMBER() OVER (PARTITION BY rowid ORDER BY rank ASC, match_source ASC, rowid ASC) AS rn FROM candidates`)
	sb.WriteString(`) SELECT t.owner, t.tool, t.host, t.session_id, s.working_dir, t.turn_id, t.seq, t.role, t.timestamp, ranked.rank, ranked.match_source, ranked.snippet, ranked.rowid`)
	sb.WriteString(` FROM ranked JOIN turns t ON t.rowid = ranked.rowid`)
	sb.WriteString(` JOIN sessions s ON s.owner = t.owner AND s.tool = t.tool AND s.host = t.host AND s.session_id = t.session_id`)
	sb.WriteString(` WHERE ranked.rn = 1`)
	if f.Since != nil {
		sb.WriteString(` AND t.timestamp >= ?`)
		args = append(args, *f.Since)
	}
	if f.Until != nil {
		sb.WriteString(` AND t.timestamp <= ?`)
		args = append(args, *f.Until)
	}
	if after != nil {
		// Keyset predicate: selects rows strictly after the cursor under the
		// ORDER BY below (rank ASC, timestamp DESC, turn_id ASC,
		// match_source ASC, rowid ASC). Each OR branch fixes one more
		// leading column to equality.
		sb.WriteString(` AND (`)
		sb.WriteString(`ranked.rank > ?`)
		sb.WriteString(` OR (ranked.rank = ? AND t.timestamp < ?)`)
		sb.WriteString(` OR (ranked.rank = ? AND t.timestamp = ? AND t.turn_id > ?)`)
		sb.WriteString(` OR (ranked.rank = ? AND t.timestamp = ? AND t.turn_id = ? AND ranked.match_source > ?)`)
		sb.WriteString(` OR (ranked.rank = ? AND t.timestamp = ? AND t.turn_id = ? AND ranked.match_source = ? AND ranked.rowid > ?)`)
		sb.WriteString(`)`)
		// One argument row per OR branch above.
		args = append(args,
			after.Rank,
			after.Rank, after.Timestamp,
			after.Rank, after.Timestamp, after.TurnID,
			after.Rank, after.Timestamp, after.TurnID, after.MatchSource,
			after.Rank, after.Timestamp, after.TurnID, after.MatchSource, after.RowID,
		)
	}
	sb.WriteString(` ORDER BY ranked.rank ASC, t.timestamp DESC, t.turn_id ASC, ranked.match_source ASC, ranked.rowid ASC LIMIT ?`)
	args = append(args, limit)
	return sb.String(), args
}

// appendFTSSelect writes one candidate SELECT against the FTS table named by
// table and appends the MATCH argument followed by any filter arguments.
// table, source and priority are formatted directly into the SQL text; the
// callers in this file pass only fixed literals for them. The snippet wraps
// hits in char(2)/char(3) and uses '…' as the elision marker.
func appendFTSSelect(sb *strings.Builder, args *[]any, table, source string, priority int, f Filter) {
	fmt.Fprintf(sb, `SELECT rowid, %d AS source_priority, '%s' AS match_source, bm25(%s) AS rank, snippet(%s, 0, char(2), char(3), '…', 12) AS snippet FROM %s WHERE %s MATCH ?`, priority, source, table, table, table, table)
	*args = append(*args, f.Query)
	appendFTSFilters(sb, args, table, f)
}

// appendFTSFilters appends the owner, tool and host constraints for table and
// their arguments. Owner scoping: AllOwners adds no constraint, SpecificOwner
// restricts to that owner, and otherwise rows are restricted to f.Owner.User.
func appendFTSFilters(sb *strings.Builder, args *[]any, table string, f Filter) {
	switch {
	case f.Owner.AllOwners:
		// No owner constraint.
	case f.Owner.SpecificOwner != nil:
		fmt.Fprintf(sb, ` AND %s.owner = ?`, table)
		*args = append(*args, *f.Owner.SpecificOwner)
	default:
		fmt.Fprintf(sb, ` AND %s.owner = ?`, table)
		*args = append(*args, f.Owner.User)
	}
	if f.Tool != nil {
		fmt.Fprintf(sb, ` AND %s.tool = ?`, table)
		*args = append(*args, *f.Tool)
	}
	if f.Host != nil {
		fmt.Fprintf(sb, ` AND %s.host = ?`, table)
		*args = append(*args, *f.Host)
	}
}

// EncodeCursor returns an opaque cursor bound to the normalized search tuple.
// The payload (version, filter hash, position) is JSON-encoded and then
// base64url-encoded without padding. A marshal failure is reported with code
// INTERNAL.
func EncodeCursor(c Cursor, f Filter) (string, error) {
	p := cursorPayload{Version: cursorVersion, Cursor: c, FilterHash: filterHash(f)}
	b, err := json.Marshal(p)
	if err != nil {
		return "", culpa.WithCode(culpa.Wrap(err, "encode search cursor"), "INTERNAL")
	}
	return base64.RawURLEncoding.EncodeToString(b), nil
}
+func DecodeCursor(raw string, f Filter) (Cursor, error) { + b, err := base64.RawURLEncoding.DecodeString(raw) + if err != nil { + return Cursor{}, invalid("invalid search cursor") + } + var p cursorPayload + if err := json.Unmarshal(b, &p); err != nil { + return Cursor{}, invalid("invalid search cursor") + } + if p.Version != cursorVersion || p.FilterHash != filterHash(f) || p.Cursor.TurnID == "" || p.Cursor.MatchSource == "" || p.Cursor.RowID <= 0 { + return Cursor{}, invalid("invalid search cursor") + } + return p.Cursor, nil +} + +type cursorPayload struct { + Version int `json:"v"` + FilterHash string `json:"h"` + Cursor Cursor `json:"c"` +} + +type normalizedFilter struct { + OwnerUser string `json:"owner_user"` + OwnerAllOwners bool `json:"owner_all_owners"` + OwnerSpecificOwner *string `json:"owner_specific_owner"` + Query string `json:"query"` + IncludeToolOutputs bool `json:"include_tool_outputs"` + Tool *string `json:"tool"` + Host *string `json:"host"` + Since *int64 `json:"since"` + Until *int64 `json:"until"` +} + +func filterHash(f Filter) string { + n := normalizedFilter{ + OwnerUser: f.Owner.User, + OwnerAllOwners: f.Owner.AllOwners, + OwnerSpecificOwner: f.Owner.SpecificOwner, + Query: strings.TrimSpace(f.Query), + IncludeToolOutputs: f.IncludeToolOutputs, + Tool: f.Tool, + Host: f.Host, + Since: f.Since, + Until: f.Until, + } + b, err := json.Marshal(n) + if err != nil { + panic(err) + } + sum := sha256.Sum256(b) + return hex.EncodeToString(sum[:]) +} + +func invalid(msg string) error { + return culpa.WithCode(culpa.New(msg), "INVALID") +} + +func isFTSError(err error) bool { + if errors.Is(err, sql.ErrNoRows) { + return false + } + s := strings.ToLower(err.Error()) + return strings.Contains(s, "fts5") || strings.Contains(s, "fts syntax") || strings.Contains(s, "malformed match") || strings.Contains(s, "unterminated string") +} diff --git a/internal/domain/search/repository_test.go b/internal/domain/search/repository_test.go new file mode 100644 
index 0000000000000000000000000000000000000000..2c98115b33e662a77ff3cfd1a741a4da3fc53833 --- /dev/null +++ b/internal/domain/search/repository_test.go @@ -0,0 +1,262 @@ +package search_test + +import ( + "context" + "encoding/json" + "strings" + "testing" + "time" + + "github.com/jmoiron/sqlx" + "go.bigb.es/auxilia/culpa" + + "sourcecraft.dev/bigbes/lethe/internal/config" + "sourcecraft.dev/bigbes/lethe/internal/domain/search" + "sourcecraft.dev/bigbes/lethe/internal/domain/session" + "sourcecraft.dev/bigbes/lethe/internal/platform/database" +) + +func newRepo(t *testing.T) (*search.Repository, *sqlx.DB) { + t.Helper() + d := &database.Database{Cfg: config.DatabaseConfig{Path: ":memory:", BusyTimeout: 5 * time.Second}} + if err := d.Init(context.Background()); err != nil { + t.Fatalf("database.Init: %v", err) + } + t.Cleanup(func() { _ = d.Destroy(context.Background()) }) + repo := &search.Repository{Database: d} + if err := repo.Init(context.Background()); err != nil { + t.Fatalf("repo.Init: %v", err) + } + return repo, d.DB +} + +func seedSession(t *testing.T, db *sqlx.DB, owner, tool, host, sid string, startedAt, endedAt int64) { + t.Helper() + seedSessionWithCwd(t, db, owner, tool, host, sid, startedAt, endedAt, nil) +} + +func seedSessionWithCwd(t *testing.T, db *sqlx.DB, owner, tool, host, sid string, startedAt, endedAt int64, cwd *string) { + t.Helper() + _, err := db.Exec(`INSERT INTO sessions (owner, tool, host, session_id, started_at, ended_at, working_dir, source_file, metadata) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL)`, owner, tool, host, sid, startedAt, endedAt, cwd, "/tmp/search.jsonl") + if err != nil { + t.Fatalf("seed session %s/%s/%s/%s: %v", owner, tool, host, sid, err) + } +} + +func seedTurn(t *testing.T, db *sqlx.DB, owner, tool, host, sid, tid string, seq, ts int64, role, content string, toolCalls *string) { + t.Helper() + _, err := db.Exec(`INSERT INTO turns (owner, tool, host, session_id, turn_id, seq, role, timestamp, content, + model, 
tokens_in, tokens_out, cost_usd, tool_calls, metadata) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL, NULL, ?, NULL)`, + owner, tool, host, sid, tid, seq, role, ts, content, toolCalls) + if err != nil { + t.Fatalf("seed turn %s/%s: %v", sid, tid, err) + } +} + +func strptr(v string) *string { return &v } + +func TestSearchScopesToOwner(t *testing.T) { + repo, db := newRepo(t) + seedSession(t, db, "alice", "cc", "phoebe", "a", 100, 110) + seedTurn(t, db, "alice", "cc", "phoebe", "a", "t1", 1, 101, "user", "needle belongs to alice", nil) + seedSession(t, db, "bob", "cc", "phoebe", "b", 100, 110) + seedTurn(t, db, "bob", "cc", "phoebe", "b", "t1", 1, 101, "user", "needle belongs to bob", nil) + + got, err := repo.Search(context.Background(), search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", Limit: 10}) + if err != nil { + t.Fatalf("Search: %v", err) + } + if len(got.Results) != 1 || got.Results[0].Owner != "alice" || got.Results[0].SessionID != "a" { + t.Fatalf("expected alice/a only; got %#v", got.Results) + } +} + +func TestSearchAppliesToolHostAndTimeFilters(t *testing.T) { + repo, db := newRepo(t) + for _, row := range []struct { + tool, host, sid string + ts int64 + }{{"cc", "phoebe", "keep", 200}, {"gemini", "phoebe", "badtool", 200}, {"cc", "rhea", "badhost", 200}, {"cc", "phoebe", "early", 100}, {"cc", "phoebe", "late", 300}} { + seedSession(t, db, "alice", row.tool, row.host, row.sid, row.ts-1, row.ts+1) + seedTurn(t, db, "alice", row.tool, row.host, row.sid, "t1", 1, row.ts, "user", "needle", nil) + } + tool, host := "cc", "phoebe" + since, until := int64(150), int64(250) + got, err := repo.Search(context.Background(), search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", Tool: &tool, Host: &host, Since: &since, Until: &until, Limit: 10}) + if err != nil { + t.Fatalf("Search: %v", err) + } + if len(got.Results) != 1 || got.Results[0].SessionID != "keep" { + t.Fatalf("expected keep only; got %#v", got.Results) 
+ } +} + +func TestSearchResultShapeIncludesWorkingDirRankMatchSourceAndEnvelope(t *testing.T) { + repo, db := newRepo(t) + cwd := "/code/project" + seedSessionWithCwd(t, db, "alice", "cc", "phoebe", "s", 100, 110, &cwd) + seedTurn(t, db, "alice", "cc", "phoebe", "s", "t1", 1, 101, "user", "needle", nil) + + got, err := repo.Search(context.Background(), search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", Limit: 10}) + if err != nil { + t.Fatalf("Search: %v", err) + } + if got.Limit != 10 || len(got.Results) != 1 { + t.Fatalf("bad envelope: %#v", got) + } + row := got.Results[0] + if row.WorkingDir == nil || *row.WorkingDir != cwd { + t.Fatalf("WorkingDir = %#v; want %q", row.WorkingDir, cwd) + } + if row.Rank == 0 { + t.Fatalf("Rank was not populated: %#v", row) + } + if row.MatchSource != search.SourceTurn { + t.Fatalf("MatchSource = %q; want %q", row.MatchSource, search.SourceTurn) + } + b, err := json.Marshal(got) + if err != nil { + t.Fatalf("Marshal: %v", err) + } + js := string(b) + if !strings.Contains(js, `"results"`) || !strings.Contains(js, `"limit":10`) || strings.Contains(js, `"rows"`) || strings.Contains(js, `"source"`) { + t.Fatalf("JSON envelope incompatible: %s", js) + } +} + +func TestSearchOrdersByRankBeforeTimestamp(t *testing.T) { + repo, db := newRepo(t) + seedSession(t, db, "alice", "cc", "phoebe", "s", 100, 500) + seedTurn(t, db, "alice", "cc", "phoebe", "s", "newer-weaker", 1, 400, "user", "needle filler filler filler filler filler filler filler", nil) + seedTurn(t, db, "alice", "cc", "phoebe", "s", "older-stronger", 2, 200, "user", "needle needle needle", nil) + + got, err := repo.Search(context.Background(), search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", Limit: 10}) + if err != nil { + t.Fatalf("Search: %v", err) + } + if len(got.Results) < 2 { + t.Fatalf("need two results; got %#v", got.Results) + } + if got.Results[0].TurnID != "older-stronger" { + t.Fatalf("rank ordering ignored: got %#v", 
got.Results) + } + if got.Results[0].Rank > got.Results[1].Rank { + t.Fatalf("rank should be ascending (lower better): %#v", got.Results) + } +} + +func TestSearchDefaultsToProseAndToolOutputIsOptIn(t *testing.T) { + repo, db := newRepo(t) + seedSession(t, db, "alice", "cc", "phoebe", "s", 100, 110) + seedTurn(t, db, "alice", "cc", "phoebe", "s", "prose", 1, 101, "user", "needle in prose", nil) + seedTurn(t, db, "alice", "cc", "phoebe", "s", "tool", 2, 102, "assistant", "plain text", strptr(`{"output":"needle in shell"}`)) + + proseOnly, err := repo.Search(context.Background(), search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", Limit: 10}) + if err != nil { + t.Fatalf("Search proseOnly: %v", err) + } + if len(proseOnly.Results) != 1 || proseOnly.Results[0].TurnID != "prose" || proseOnly.Results[0].MatchSource != search.SourceTurn { + t.Fatalf("expected only prose row; got %#v", proseOnly.Results) + } + + withTools, err := repo.Search(context.Background(), search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", IncludeToolOutputs: true, Limit: 10}) + if err != nil { + t.Fatalf("Search withTools: %v", err) + } + if len(withTools.Results) != 2 { + t.Fatalf("expected prose + tool rows; got %#v", withTools.Results) + } +} + +func TestSearchDedupesTurnWithBetterRankedMatch(t *testing.T) { + repo, db := newRepo(t) + seedSession(t, db, "alice", "cc", "phoebe", "s", 100, 110) + seedTurn(t, db, "alice", "cc", "phoebe", "s", "t1", 1, 101, "assistant", "needle filler filler filler filler filler filler filler", strptr(`{"output":"needle needle needle"}`)) + + got, err := repo.Search(context.Background(), search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", IncludeToolOutputs: true, Limit: 10}) + if err != nil { + t.Fatalf("Search: %v", err) + } + if len(got.Results) != 1 || got.Results[0].TurnID != "t1" || got.Results[0].MatchSource != search.SourceToolOutput { + t.Fatalf("expected one better-ranked tool row; got %#v", 
got.Results) + } +} + +func TestSearchCursorUsesRankTimestampTurnIDAndMatchSourceTieBreakers(t *testing.T) { + repo, db := newRepo(t) + seedSession(t, db, "alice", "cc", "phoebe", "s", 100, 110) + seedTurn(t, db, "alice", "cc", "phoebe", "s", "a", 1, 101, "user", "needle", nil) + seedTurn(t, db, "alice", "cc", "phoebe", "s", "b", 2, 101, "user", "needle", nil) + seedTurn(t, db, "alice", "cc", "phoebe", "s", "c", 3, 100, "assistant", "plain", strptr(`{"output":"needle"}`)) + + page1, err := repo.Search(context.Background(), search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", IncludeToolOutputs: true, Limit: 2}) + if err != nil { + t.Fatalf("Search page1: %v", err) + } + if len(page1.Results) != 2 || page1.Results[0].TurnID != "a" || page1.Results[1].TurnID != "b" || page1.NextCursor == "" { + t.Fatalf("bad page1: %#v", page1) + } + cur, err := search.DecodeCursor(page1.NextCursor, search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", IncludeToolOutputs: true, Limit: 99}) + if err != nil { + t.Fatalf("DecodeCursor: %v", err) + } + if cur.Rank != page1.Results[1].Rank || cur.Timestamp != page1.Results[1].Timestamp || cur.TurnID != "b" || cur.MatchSource != search.SourceTurn { + t.Fatalf("cursor did not preserve planned tie-breakers: %#v vs row %#v", cur, page1.Results[1]) + } + page2, err := repo.Search(context.Background(), search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", IncludeToolOutputs: true, Limit: 2, Cursor: page1.NextCursor}) + if err != nil { + t.Fatalf("Search page2: %v", err) + } + if len(page2.Results) != 1 || page2.Results[0].TurnID != "c" || page2.Results[0].MatchSource != search.SourceToolOutput || page2.NextCursor != "" { + t.Fatalf("bad page2: %#v", page2) + } + + _, err = repo.Search(context.Background(), search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "other", IncludeToolOutputs: true, Limit: 1, Cursor: page1.NextCursor}) + if codeOf(err) != "INVALID" { + 
t.Fatalf("expected INVALID for filter-bound cursor, got %q (%v)", codeOf(err), err) + } +} + +func TestDecodeCursorRejectsMalformedInput(t *testing.T) { + _, err := search.DecodeCursor("not-base64", search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle"}) + if codeOf(err) != "INVALID" { + t.Fatalf("expected INVALID, got %q (%v)", codeOf(err), err) + } +} + +func TestSearchSnippetUsesMarkerRunes(t *testing.T) { + repo, db := newRepo(t) + seedSession(t, db, "alice", "cc", "phoebe", "s", 100, 110) + seedTurn(t, db, "alice", "cc", "phoebe", "s", "t1", 1, 101, "user", "alpha needle omega", nil) + + got, err := repo.Search(context.Background(), search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", Limit: 10}) + if err != nil { + t.Fatalf("Search: %v", err) + } + if len(got.Results) != 1 || !strings.Contains(got.Results[0].Snippet, "\x02needle\x03") || strings.Contains(got.Results[0].Snippet, "") { + t.Fatalf("snippet missing marker bytes or contains HTML: %#v", got.Results) + } +} + +func TestSearchInvalidAndEmptyQueriesMapToInvalid(t *testing.T) { + repo, db := newRepo(t) + seedSession(t, db, "alice", "cc", "phoebe", "s", 100, 110) + seedTurn(t, db, "alice", "cc", "phoebe", "s", "t1", 1, 101, "user", "alpha", nil) + for _, q := range []string{"", " ", `"unterminated`} { + _, err := repo.Search(context.Background(), search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: q, Limit: 10}) + if codeOf(err) != "INVALID" { + t.Fatalf("query %q: expected INVALID, got %q (%v)", q, codeOf(err), err) + } + } +} + +func codeOf(err error) string { + var cd culpa.CodeDetail + if !culpa.FindDetail(err, &cd) { + return "" + } + s, _ := cd.Code.(string) + return s +} diff --git a/internal/server/server.go b/internal/server/server.go index 7a28090c2199b3a9f659e067a835be43dd3de5d0..5c8ec0a4b8a11032ec16d17682898f726ef66ce2 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -32,6 +32,7 @@ import ( 
"sourcecraft.dev/bigbes/lethe/internal/domain/ingest" "sourcecraft.dev/bigbes/lethe/internal/domain/project" "sourcecraft.dev/bigbes/lethe/internal/domain/savedsearch" + "sourcecraft.dev/bigbes/lethe/internal/domain/search" "sourcecraft.dev/bigbes/lethe/internal/domain/session" "sourcecraft.dev/bigbes/lethe/internal/domain/stats" "sourcecraft.dev/bigbes/lethe/internal/pkg/apierror" @@ -57,12 +58,13 @@ type Server struct { Metrics *observability.Metrics `inject:""` Health *health.Set `inject:""` - Auth *authpkg.Authenticator `inject:""` - Ingest *ingest.Handler `inject:""` - Sessions *session.Handler `inject:""` - Projects *project.Handler `inject:""` - Stats *stats.Handler `inject:""` + Auth *authpkg.Authenticator `inject:""` + Ingest *ingest.Handler `inject:""` + Sessions *session.Handler `inject:""` + Projects *project.Handler `inject:""` + Stats *stats.Handler `inject:""` SavedSearches *savedsearch.Handler `inject:""` + Search *search.Handler `inject:""` router *chi.Mux httpSrv *http.Server @@ -107,6 +109,7 @@ func (s *Server) Init(_ context.Context) error { s.Projects.Mount(r) s.Stats.Mount(r) s.SavedSearches.Mount(r) + s.Search.Mount(r) }) // SPA catch-all: serves the embedded React app for all non-API GET paths.