M cmd/lethe-collector/main.go => cmd/lethe-collector/main.go +2 -0
@@ 18,6 18,7 @@ import (
"sourcecraft.dev/bigbes/lethe/internal/collector/ingest"
"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
"sourcecraft.dev/bigbes/lethe/internal/collector/parser/claudecode"
+ "sourcecraft.dev/bigbes/lethe/internal/collector/parser/opencode"
"sourcecraft.dev/bigbes/lethe/internal/collector/state"
)
@@ 218,6 219,7 @@ func newStatusCmd(configPath *string) *cobra.Command {
func buildParsers(host string) map[string]parser.Parser {
return map[string]parser.Parser{
"claude-code": claudecode.New(host),
+ "opencode": opencode.New(host),
}
}
M cmd/lethe-collector/main_test.go => cmd/lethe-collector/main_test.go +13 -0
@@ 45,3 45,16 @@ func TestBuildLogger_LevelEnforced(t *testing.T) {
t.Fatal("error-level logger should enable error")
}
}
+
+func TestBuildParsers_RegistersSupportedTools(t *testing.T) {
+ parsers := buildParsers("laptop")
+ for _, tool := range []string{"claude-code", "opencode"} {
+ p, ok := parsers[tool]
+ if !ok {
+ t.Fatalf("parser %q is not registered", tool)
+ }
+ if got := p.Tool(); got != tool {
+ t.Fatalf("parser %q Tool() = %q", tool, got)
+ }
+ }
+}
M cmd/lethe/main.go => cmd/lethe/main.go +6 -1
@@ 27,6 27,7 @@ import (
"sourcecraft.dev/bigbes/lethe/internal/domain/ingest"
"sourcecraft.dev/bigbes/lethe/internal/domain/project"
"sourcecraft.dev/bigbes/lethe/internal/domain/savedsearch"
+ "sourcecraft.dev/bigbes/lethe/internal/domain/search"
"sourcecraft.dev/bigbes/lethe/internal/domain/session"
"sourcecraft.dev/bigbes/lethe/internal/domain/stats"
"sourcecraft.dev/bigbes/lethe/internal/platform/database"
@@ 105,6 106,8 @@ func run() int {
statsHnd = &stats.Handler{}
savedSearchRepo = &savedsearch.Repository{}
savedSearchHnd = &savedsearch.Handler{}
+ searchRepo = &search.Repository{}
+ searchHnd = &search.Handler{}
serverSvc = &server.Server{}
)
@@ 112,7 115,7 @@ func run() int {
loggerSvc, metricsSvc, dbSvc, dbCheckSvc, healthSetSvc,
authSvc, ingestRepo, ingestSvc, ingestHnd,
sessionRepo, sessionHnd, projectRepo, projectHnd, statsRepo, statsHnd,
- savedSearchRepo, savedSearchHnd, serverSvc,
+ savedSearchRepo, savedSearchHnd, searchRepo, searchHnd, serverSvc,
}
mgr.AddComponent(ctx,
@@ 134,6 137,8 @@ func run() int {
steward.MustServiceAsset(statsHnd),
steward.MustServiceAsset(savedSearchRepo),
steward.MustServiceAsset(savedSearchHnd),
+ steward.MustServiceAsset(searchRepo),
+ steward.MustServiceAsset(searchHnd),
steward.MustServiceAsset(serverSvc, steward.Root()),
)
M cmd/lethe/main_e2e_test.go => cmd/lethe/main_e2e_test.go +3 -0
@@ 23,6 23,7 @@ import (
"sourcecraft.dev/bigbes/lethe/internal/domain/ingest"
"sourcecraft.dev/bigbes/lethe/internal/domain/project"
"sourcecraft.dev/bigbes/lethe/internal/domain/savedsearch"
+ "sourcecraft.dev/bigbes/lethe/internal/domain/search"
"sourcecraft.dev/bigbes/lethe/internal/domain/session"
"sourcecraft.dev/bigbes/lethe/internal/domain/stats"
"sourcecraft.dev/bigbes/lethe/internal/platform/database"
@@ 89,6 90,8 @@ func TestEndToEnd_MultiUserIsolation(t *testing.T) {
steward.MustServiceAsset(&stats.Handler{}),
steward.MustServiceAsset(&savedsearch.Repository{}),
steward.MustServiceAsset(&savedsearch.Handler{}),
+ steward.MustServiceAsset(&search.Repository{}),
+ steward.MustServiceAsset(&search.Handler{}),
steward.MustServiceAsset(srv, steward.Root()),
)
A docs/spikes/opencode-format.md => docs/spikes/opencode-format.md +196 -0
@@ 0,0 1,196 @@
+# opencode Storage Format Spike
+
+**Date:** 2026-05-04
+**Scope:** Determine canonical source, schema, progress marker, and parser risks for lethe collector.
+**Respects:** IV10, PC3, AS3, AS4, UK1.
+
+---
+
+## 1. Canonical Source Choice
+
+**Recommendation: SQLite database (`~/.local/share/opencode/opencode.db`)**
+
+Rationale:
+- The DB is the source of truth. JSON files in `storage/` are hydrated projections of DB rows with extra derived fields (e.g. `messageID` in part JSON).
+- DB counts: **725 sessions**, **26,611 messages**, **107,889 parts**.
+- JSON files are fewer: 16,926 message JSONs vs 26,611 DB rows, indicating some messages exist only in the DB.
+- The DB is actively WAL-journaled (`opencode.db-wal` ~2MB, `-shm` 32KB), so reads must open the live DB path; a plain file copy of `opencode.db` alone would miss rows still held in the WAL.
+
+Alternative rejected:
+- `storage/{message,part,session,project}/` JSON tree — incomplete, redundant, and 2GB total vs 1.3GB DB.
+- `event` table — append-only event log with `event_sequence` aggregate IDs. Useful for replay but overkill for collector polling.
+- `~/.cache/opencode/` — models.json, node_modules; no transcript data.
+- `~/.config/opencode/` — auth, plugins, skills; no transcript data.
+- `~/.local/share/opencode/tool-output/` — binary blobs referenced by `callID`, not structured text.
+
+---
+
+## 2. Schema Overview
+
+### 2.1 Tables (relevant to collector)
+
+```sql
+CREATE TABLE session (
+ id TEXT PRIMARY KEY,
+ project_id TEXT NOT NULL,
+ parent_id TEXT,
+ slug TEXT NOT NULL,
+ directory TEXT NOT NULL,
+ title TEXT NOT NULL,
+ version TEXT NOT NULL,
+ time_created INTEGER NOT NULL,
+ time_updated INTEGER NOT NULL
+);
+
+CREATE TABLE message (
+ id TEXT PRIMARY KEY,
+ session_id TEXT NOT NULL,
+ time_created INTEGER NOT NULL,
+ time_updated INTEGER NOT NULL,
+ data TEXT NOT NULL -- JSON
+);
+
+CREATE TABLE part (
+ id TEXT PRIMARY KEY,
+ message_id TEXT NOT NULL,
+ session_id TEXT NOT NULL,
+ time_created INTEGER NOT NULL,
+ time_updated INTEGER NOT NULL,
+ data TEXT NOT NULL -- JSON
+);
+```
+
+Indexes:
+- `part_session_idx` ON `part` (`session_id`)
+- `session_project_idx` ON `session` (`project_id`)
+
+### 2.2 Message `data` JSON shape
+
+Top-level keys (observed):
+- `role`: `"user" | "assistant"`
+- `agent`: `"build" | "implementer-deep" | "implementer-smart" | "oracle" | ...`
+- `mode`: the execution mode; carries the same value as `agent`
+- `parentID`: previous message ID (for threading)
+- `path`: `{ cwd, root }` — absolute filesystem paths (PII risk)
+- `modelID`, `providerID`: model metadata
+- `tokens`: `{ input, output, reasoning, cache: { read, write } }`
+- `cost`: float
+- `time`: `{ created, completed? }` — epoch millis
+- `finish`: `"stop" | "tool-calls" | ...`
+
+### 2.3 Part `data` JSON shape (by type)
+
+| Type | Keys | Content |
+|------|------|---------|
+| `text` | `type`, `text`, `time`, `metadata?` | Raw prose |
+| `reasoning` | `type`, `text`, `time` | Model reasoning block |
+| `tool` | `type`, `tool`, `callID`, `state` | Tool invocation + output |
+| `step-start` | `type` | Turn boundary marker |
+| `step-finish` | `type`, `reason`, `tokens`, `cost`, `snapshot` | Turn summary |
+| `patch` | `type`, `patch`, `path` | Code diff |
+| `file` | `type`, `path`, `content?` | File reference |
+| `compaction` | ... | Summarized history |
+
+### 2.4 Tool output storage
+
+Large tool outputs are **not** stored in full in `part.data` JSON. Instead:
+- `part.data.state.output` contains a short summary/truncation marker for some tools.
+- Large tool outputs are written to `~/.local/share/opencode/tool-output/tool_<callID>` as opaque files.
+- 20 files observed, ranging from 0B to 6MB.
+- The `callID` in the part JSON links to the filename.
+
+---
+
+## 3. Session / Message / Part Flow
+
+A single user → assistant turn is not one row. It is a multi-part sequence:
+
+```
+message (user, role=user)
+ └── part (type=text) — user prompt
+
+message (assistant, role=assistant)
+ └── part (type=step-start) — turn begins
+ └── part (type=reasoning) — model thinking
+ └── part (type=tool) — tool call(s)
+ └── part (type=step-finish) — turn ends
+```
+
+For search indexing, a "turn" is best defined as:
+- **User turn**: one `message` with `role=user` + its `text` part(s).
+- **Assistant turn**: one `message` with `role=assistant` + all associated `text`, `reasoning`, and `tool` parts between `step-start` and `step-finish`.
+
+---
+
+## 4. Progress Marker Choice
+
+**Recommendation: `message.rowid` as the progress offset.**
+
+Rationale:
+- AS4 assumes an integer offset. SQLite `rowid` is an integer marker and fits the existing collector state schema without a new format.
+- The collector persists progress from accepted event `Seq` values. `TurnEvent.Seq` stores the current `message.rowid`, while `ingestion_state.last_offset` stores the next rowid to scan. This keeps partial acceptance resumable: a saved next event rowid is included on the next poll, and a fully accepted scan saves `max(rowid)+1`.
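+- `message.rowid` is unique per row and increases with insertion order as long as rows are only appended, so equal-millisecond messages do not skip unaccepted rows. Caveat: `message` uses a TEXT primary key, so `rowid` is implicit; SQLite may reuse the highest rowid after that row is deleted, and `VACUUM` may renumber implicit rowids, in which case a stored offset could skip or repeat messages.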
+- `message.time_created` is still the event timestamp, but it is not a safe offset: multiple messages can share the same millisecond.
+- Messages and parts both have `time_created`, but messages are the turn boundaries; parts inherit the message's session and can be queried via `message_id`.
+- Alternative: `message.id` lexicographic — IDs are sortable ULIDs (e.g. `msg_df14a9a20001MitrpJBdwZHMN4`) but integer offset is simpler for the existing state schema.
+- Alternative: `event.seq` — requires joining through `event_sequence`, adds complexity.
+
+Query pattern for incremental collection:
+```sql
+SELECT * FROM message
+WHERE rowid >= ?
+ORDER BY rowid ASC;
+```
+
+---
+
+## 5. Fixture Anonymization Notes
+
+Fields that must be redacted in test fixtures and logs:
+- `path.cwd`, `path.root` — absolute local paths.
+- `modelID` / `providerID` — may contain API key fragments or account info.
+- Tool output file contents (`tool-output/tool_<callID>`) — may contain secrets, env vars, tokens.
+- `auth.json` — contains access tokens, refresh tokens; never read in parser.
+- Session `title` — may contain internal project names or ticket IDs.
+
+Safe to retain:
+- ID prefixes and structure (`ses_`, `msg_`, `prt_`, `tool_`).
+- `role`, `agent`, `mode`, `type` enums.
+- `time_created` relative offsets.
+- `slug` — random adjective-noun pairs, no PII.
+- Token counts and costs (aggregated, not content).
+
+---
+
+## 6. Parser Risks
+
+| Risk | Impact | Mitigation |
+|------|--------|------------|
+| **Schema drift** | opencode may add new `part` types or message keys | Defensive JSON unmarshaling; ignore unknown keys |
+| **WAL lock contention** | Collector reads while opencode writes | Use `mode=ro` URI or copy with `sqlite3` CLI if needed |
+| **Large tool outputs** | 6MB+ files could bloat index | Only index tool output metadata/summary; skip `tool-output/` files unless `include_tool_outputs=1` |
+| **Part ordering ambiguity** | `time_created` may collide for parts in same message | Use `id` lexicographic as secondary sort |
+| **Eventual consistency** | Event sourcing means DB state may lag events | Poll `message` table directly; events are for replay only |
+| **Nested JSON depth** | `state.output` in tool parts may be stringified JSON | Treat as opaque text or attempt one-level unmarshal |
+| **Session archiving** | `time_archived` column exists; archived sessions may be compacted | Respect `time_archived IS NULL` or include archived based on config |
+
+---
+
+## 7. Open Questions for Parser Implementation
+
+1. Should the collector index **every** session or only those matching a project filter?
+2. Should `reasoning` parts be indexed as prose or excluded by default?
+3. Should `patch` parts be indexed as code/text or handled separately?
+4. How to handle parent-child message threading (`parentID`) in search results?
+5. What is the compaction format in `compaction` parts? (Only 60 observed, low priority.)
+
+---
+
+## 8. Summary for Next Phase
+
+- **Source:** SQLite `~/.local/share/opencode/opencode.db`, read-only.
+- **Tables:** `session`, `message`, `part`.
+- **Progress:** `message.rowid` (INTEGER SQLite row marker).
+- **Turn definition:** `message` row + linked `part` rows by `message_id`.
+- **Tool outputs:** External files in `tool-output/`, referenced by `callID`.
+- **Registration:** One parser package implementing `parser.Parser` + registration in collector.
M docs/tasks/lethe-search-and-opencode.md => docs/tasks/lethe-search-and-opencode.md +68 -3
@@ 1,6 1,9 @@
# lethe-search-and-opencode
-**Status:** Design (hands-off)
+**Status:** done
+**Branch:** `task/lethe-search-and-opencode`
+**Worktree:** `/Users/blikh/data/home/lethe/.worktrees/lethe-search-and-opencode`
+**Mode:** hands-off
**Module:** `sourcecraft.dev/bigbes/lethe`
**Depends on:** `lethe-server.md` (#1) — FTS5 tables and triggers were created in #1; this task only adds query code. `lethe-collector-claude-code.md` (#2) — the collector framework and Parser interface this task extends.
**Sibling tasks (deferred):** per-tool parsers (`lethe-collector-crush.md`, `lethe-collector-pi.md`, `lethe-collector-kimi.md`); RFC backlog items (cost rollups for tools that report it, tagging, JSON/Markdown export).
@@ 243,14 246,76 @@ Approach: ship `/api/v1/search` as an additive read domain first, then run the o
- IF2 — `func (h *Handler) Mount(r chi.Router)` — server mount contract matching other domain packages.
- IF3 — `func New(host string) *Parser` — opencode parser constructor registered by the collector CLI.
- IF4 — `func buildParsers(host string) map[string]parser.Parser` — collector parser registry remains the only dispatch point.
+- IF5 — `docs/spikes/opencode-format.md` — canonical opencode source choice consumed by the parser phase.
### Interface Graph
- PH1 -> IF1 @ `internal/domain/search/`
- PH2 IF1 -> IF2 @ `internal/domain/search/`, `internal/server/`, `cmd/lethe/`
-- PH3 -> @ `docs/spikes/opencode-format.md`
-- PH4 -> IF3, IF4 @ `internal/collector/parser/opencode/`, `cmd/lethe-collector/`
+- PH3 -> IF5 @ `docs/spikes/opencode-format.md`
+- PH4 IF5 -> IF3, IF4 @ `internal/collector/parser/opencode/`, `cmd/lethe-collector/`
Backwards-compat: additive route and parser registration only; PH1/PH2 do not alter existing routes or schema, and PH4 does not change the parser interface, runner, or collector state schema.
Scope check: no stats work, no React search UI, no schema migration, no saved-search changes, and no parser abstraction beyond `buildParsers`.
+
+## Verify
+
+**Result:** passed
+
+Positive:
+- CK1 — `/api/v1/search` repository and handler tests cover ranked prose search, tool-output opt-in, filters, cursors, and response envelope.
+- CK2 — opencode parser tests cover SQLite discovery, turn mapping, tool summaries, resume marker, malformed skips, and collector registration.
+- CK3 — `go build ./cmd/lethe ./cmd/lethe-collector` succeeds.
+- CK4 — `go test ./... -count=1` passes.
+
+Negative:
+- CK5 — empty/invalid search query and bad cursor return `INVALID`.
+- CK6 — non-admin `?owner=` on search returns `FORBIDDEN`.
+- CK7 — opencode parser does not ingest external `tool-output/` blob contents.
+
+Invariants / assumptions:
+- CK8 (IV1, IV2) — no search package references schema DDL or `internal/shared/wire`.
+- CK9 (IV3-IV7) — search tests verify read-path behavior, owner scoping, prose default, marker snippets, and invalid-query handling.
+- CK10 (IV8-IV10, AS3, AS4) — opencode parser implements `parser.Parser`, keeps collector state schema unchanged, and consumes the committed storage spike.
+- CK11 (IV11, IV12) — stats packages and React `/search` route were not changed.
+
+Interfaces:
+- CK12 (IF1) — `Repository.Search(ctx, Filter)` is called by handler and repository tests.
+- CK13 (IF2) — `Handler.Mount(r chi.Router)` registers `/api/v1/search`.
+- CK14 (IF3, IF4) — `opencode.New(host)` is registered through `buildParsers` and tested by `cmd/lethe-collector`.
+- CK15 (IF5) — `docs/spikes/opencode-format.md` records the SQLite source and `message.rowid` marker used by PH4.
+
+Smoke: `go test ./internal/domain/search -run TestHandler_SuccessfulResponseEnvelope -v` and `go test ./internal/collector/parser/opencode -run TestParse_MapsTurnsAndIdentity -v` both pass.
+
+## Conclusion
+
+Outcome: `/api/v1/search` and the opencode collector parser shipped on `task/lethe-search-and-opencode` through `5cc599d`.
+
+Invariants:
+- IV1 — no migration files were added.
+- IV2 — `internal/shared/wire/` was not modified.
+- IV3 — search implementation is repository/handler read-path code only.
+- IV4 — search handler uses the existing authenticated owner-scope rules.
+- IV5 — repository tests cover prose-only default and tool-output opt-in.
+- IV6 — snippets use marker bytes, not HTML.
+- IV7 — empty, malformed, and bad-cursor search inputs return `INVALID`.
+- IV8 — opencode implements `parser.Parser` unchanged.
+- IV9 — collector runner and state schema were unchanged.
+- IV10 — `docs/spikes/opencode-format.md` landed before parser implementation.
+- IV11 — stats API/page code was not changed.
+- IV12 — React `/search` route was not changed.
+
+### Assumptions check
+- AS1 — held — search tests exercise FTS rows populated by existing triggers.
+- AS2 — held — search joins FTS rowid back to `turns.rowid` in tests and implementation.
+- AS3 — held — spike confirmed readable opencode SQLite storage under `~/.local/share/opencode/`.
+- AS4 — held after review fix — collector `last_offset` stores next opencode `message.rowid`, and `TurnEvent.Seq` stores current rowid.
+
+### Unknowns outcome
+- UK1 — resolved — SQLite `opencode.db` is canonical for v1.
+- UK2 — resolved for v1 — invalid FTS syntax maps to `INVALID`; no stricter normalizer was needed.
+- UK3 — still-open — BM25 quality needs real archive usage after ingest.
+
+### Review findings
+- Critical: the opencode offset marker changed from `message.time_created` to an inclusive next-row marker (the `message.rowid` of the next row to scan) after the reviewer found a skipped-row risk in partial-accept paths.
A internal/collector/parser/opencode/parser.go => internal/collector/parser/opencode/parser.go +409 -0
@@ 0,0 1,409 @@
+package opencode
+
+import (
+ "context"
+ "crypto/sha256"
+ "database/sql"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "sourcecraft.dev/bigbes/lethe/internal/collector/parser"
+ "sourcecraft.dev/bigbes/lethe/internal/shared/wire"
+
+ _ "modernc.org/sqlite"
+)
+
+const toolName = "opencode"
+
+// Parser maps opencode SQLite transcript records into lethe wire events.
+type Parser struct {
+ host string
+}
+
+// New builds a parser that stamps every emitted event with host.
+func New(host string) *Parser {
+ return &Parser{host: host}
+}
+
+// Tool returns the collector-facing tool name.
+func (p *Parser) Tool() string {
+ return toolName
+}
+
+// Discover returns the canonical opencode SQLite database below root. The root
+// may be ~/.local/share/opencode or the database path itself.
+func (p *Parser) Discover(root string) ([]parser.SourceFile, error) {
+ info, err := os.Stat(root)
+ if err != nil {
+ return nil, err
+ }
+ if !info.IsDir() {
+ if filepath.Base(root) != "opencode.db" {
+ return nil, nil
+ }
+ return []parser.SourceFile{{Path: root, Size: info.Size()}}, nil
+ }
+
+ dbPath := filepath.Join(root, "opencode.db")
+ info, err = os.Stat(dbPath)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return nil, nil
+ }
+ return nil, err
+ }
+ if info.IsDir() {
+ return nil, nil
+ }
+ return []parser.SourceFile{{Path: dbPath, Size: info.Size()}}, nil
+}
+
+// Parse reads opencode messages starting at since. since and the returned marker
+// are message.rowid markers: since is the next rowid to scan, returned offset is
+// one past the last scanned row, and message.time_created remains the timestamp.
+func (p *Parser) Parse(path string, since int64) ([]wire.TurnEvent, int64, error) {
+ db, err := sql.Open("sqlite", readOnlyDSN(path))
+ if err != nil {
+ return nil, since, err
+ }
+ defer func() { _ = db.Close() }()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+ rows, err := db.QueryContext(ctx, `
+SELECT
+ m.rowid,
+ m.id,
+ m.session_id,
+ m.time_created,
+ m.data,
+ s.directory,
+ s.time_created,
+ s.slug,
+ s.version
+FROM message m
+JOIN session s ON s.id = m.session_id
+WHERE m.rowid >= ?
+ORDER BY m.rowid ASC`, since)
+ if err != nil {
+ return nil, since, err
+ }
+ defer func() { _ = rows.Close() }()
+
+ events := make([]wire.TurnEvent, 0)
+ next := since
+ for rows.Next() {
+ var record messageRow
+ if err := rows.Scan(&record.RowID, &record.ID, &record.SessionID, &record.TimeCreated, &record.Data, &record.Directory, &record.SessionCreated, &record.Slug, &record.Version); err != nil {
+ return events, next, err
+ }
+ if candidate := record.RowID + 1; candidate > next {
+ next = candidate
+ }
+ parts, err := loadParts(ctx, db, record.ID)
+ if err != nil {
+ return events, next, err
+ }
+ event, ok := p.mapRecord(path, record, parts)
+ if ok {
+ events = append(events, event)
+ }
+ }
+ if err := rows.Err(); err != nil {
+ return events, next, err
+ }
+ return events, next, nil
+}
+
+type messageRow struct {
+ RowID int64
+ ID string
+ SessionID string
+ TimeCreated int64
+ Data string
+ Directory string
+ SessionCreated int64
+ Slug string
+ Version string
+}
+
+type partRow struct {
+ ID string
+ TimeCreated int64
+ Data string
+}
+
+type messageData struct {
+ Role string `json:"role"`
+ Agent string `json:"agent"`
+ Mode string `json:"mode"`
+ ModelID string `json:"modelID"`
+ ProviderID string `json:"providerID"`
+ Tokens tokenData `json:"tokens"`
+ Cost float64 `json:"cost"`
+ Path pathData `json:"path"`
+ Time timeData `json:"time"`
+ Raw json.RawMessage
+}
+
+type tokenData struct {
+ Input int64 `json:"input"`
+ Output int64 `json:"output"`
+}
+
+type pathData struct {
+ CWD string `json:"cwd"`
+ Root string `json:"root"`
+}
+
+type timeData struct {
+ Created int64 `json:"created"`
+ Completed int64 `json:"completed"`
+}
+
+type partData struct {
+ Type string `json:"type"`
+ Text string `json:"text"`
+ Tool string `json:"tool"`
+ CallID string `json:"callID"`
+ State json.RawMessage `json:"state"`
+ Raw json.RawMessage
+}
+
+type toolState struct {
+ Status string `json:"status"`
+ Output json.RawMessage `json:"output"`
+}
+
+type toolCallSummary struct {
+ Tool string `json:"tool,omitempty"`
+ CallID string `json:"call_id,omitempty"`
+ Status string `json:"status,omitempty"`
+ Output string `json:"output,omitempty"`
+}
+
+func loadParts(ctx context.Context, db *sql.DB, messageID string) ([]partRow, error) {
+ rows, err := db.QueryContext(ctx, `
+SELECT id, time_created, data
+FROM part
+WHERE message_id = ?
+ORDER BY time_created ASC, id ASC`, messageID)
+ if err != nil {
+ return nil, err
+ }
+ defer func() { _ = rows.Close() }()
+
+ parts := make([]partRow, 0)
+ for rows.Next() {
+ var part partRow
+ if err := rows.Scan(&part.ID, &part.TimeCreated, &part.Data); err != nil {
+ return nil, err
+ }
+ parts = append(parts, part)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, err
+ }
+ return parts, nil
+}
+
+func (p *Parser) mapRecord(path string, record messageRow, partRows []partRow) (wire.TurnEvent, bool) {
+ msg, ok := parseMessageData(record.Data)
+ if !ok {
+ return wire.TurnEvent{}, false
+ }
+ role := strings.TrimSpace(msg.Role)
+ if role != "user" && role != "assistant" && role != "system" {
+ return wire.TurnEvent{}, false
+ }
+
+ texts := make([]string, 0)
+ toolCalls := make([]toolCallSummary, 0)
+ for _, row := range partRows {
+ part, ok := parsePartData(row.Data)
+ if !ok {
+ continue
+ }
+ switch part.Type {
+ case "text", "reasoning":
+ if text := strings.TrimSpace(part.Text); text != "" {
+ texts = append(texts, text)
+ }
+ case "tool":
+ summary := summarizeToolPart(part)
+ toolCalls = append(toolCalls, summary)
+ if rendered := renderToolSummary(summary); rendered != "" {
+ texts = append(texts, rendered)
+ }
+ }
+ }
+ content := strings.Join(texts, "\n\n")
+ if strings.TrimSpace(content) == "" && len(toolCalls) == 0 {
+ return wire.TurnEvent{}, false
+ }
+
+ event := wire.TurnEvent{
+ Tool: toolName,
+ Host: p.host,
+ SessionID: record.SessionID,
+ TurnID: turnIDFor(record),
+ Seq: record.RowID,
+ Role: role,
+ Timestamp: record.TimeCreated / 1000,
+ Content: content,
+ SessionMeta: wire.SessionMeta{
+ WorkingDir: workingDirFor(record, msg),
+ SourceFile: path,
+ StartedAt: int64PtrOrNil(record.SessionCreated / 1000),
+ Metadata: sessionMetadata(record),
+ },
+ Metadata: json.RawMessage(record.Data),
+ }
+ if model := strings.TrimSpace(msg.ModelID); model != "" {
+ event.Model = &model
+ }
+ if msg.Tokens.Input != 0 {
+ event.TokensIn = int64PtrOrNil(msg.Tokens.Input)
+ }
+ if msg.Tokens.Output != 0 {
+ event.TokensOut = int64PtrOrNil(msg.Tokens.Output)
+ }
+ if msg.Cost != 0 {
+ cost := msg.Cost
+ event.CostUSD = &cost
+ }
+ if len(toolCalls) > 0 {
+ payload, err := json.Marshal(toolCalls)
+ if err == nil {
+ event.ToolCalls = payload
+ }
+ }
+ return event, true
+}
+
+func parseMessageData(raw string) (messageData, bool) {
+ var msg messageData
+ if err := json.Unmarshal([]byte(raw), &msg); err != nil {
+ return messageData{}, false
+ }
+ msg.Raw = cloneRaw([]byte(raw))
+ return msg, true
+}
+
+func parsePartData(raw string) (partData, bool) {
+ var part partData
+ if err := json.Unmarshal([]byte(raw), &part); err != nil {
+ return partData{}, false
+ }
+ part.Raw = cloneRaw([]byte(raw))
+ return part, true
+}
+
+func summarizeToolPart(part partData) toolCallSummary {
+ summary := toolCallSummary{Tool: strings.TrimSpace(part.Tool), CallID: strings.TrimSpace(part.CallID)}
+ var state toolState
+ if len(part.State) > 0 && json.Unmarshal(part.State, &state) == nil {
+ summary.Status = strings.TrimSpace(state.Status)
+ summary.Output = safeOutputSummary(state.Output)
+ }
+ return summary
+}
+
+func safeOutputSummary(raw json.RawMessage) string {
+ trimmed := strings.TrimSpace(string(raw))
+ if trimmed == "" || trimmed == "null" {
+ return ""
+ }
+ var text string
+ if err := json.Unmarshal(raw, &text); err == nil {
+ return summarizeText(text)
+ }
+ return summarizeText(trimmed)
+}
+
+func renderToolSummary(summary toolCallSummary) string {
+ label := summary.Tool
+ if label == "" {
+ label = "tool"
+ }
+ parts := []string{fmt.Sprintf("<tool: %s", label)}
+ if summary.Status != "" {
+ parts = append(parts, " "+summary.Status)
+ }
+ if summary.Output != "" {
+ parts = append(parts, " - "+summary.Output)
+ }
+ return strings.Join(parts, "") + ">"
+}
+
+func sessionMetadata(record messageRow) json.RawMessage {
+ payload := struct {
+ Slug string `json:"slug,omitempty"`
+ Version string `json:"version,omitempty"`
+ }{Slug: record.Slug, Version: record.Version}
+ b, err := json.Marshal(payload)
+ if err != nil || string(b) == "{}" {
+ return nil
+ }
+ return b
+}
+
+func workingDirFor(record messageRow, msg messageData) *string {
+ for _, value := range []string{record.Directory, msg.Path.CWD, msg.Path.Root} {
+ if value = strings.TrimSpace(value); value != "" {
+ return &value
+ }
+ }
+ return nil
+}
+
+func turnIDFor(record messageRow) string {
+ if strings.TrimSpace(record.ID) != "" {
+ return record.ID
+ }
+ sum := sha256.Sum256([]byte(fmt.Sprintf("%s|%d", record.SessionID, record.TimeCreated)))
+ return hex.EncodeToString(sum[:8])
+}
+
+func summarizeText(text string) string {
+ text = strings.TrimSpace(text)
+ if text == "" {
+ return ""
+ }
+ line := text
+ if idx := strings.IndexByte(line, '\n'); idx >= 0 {
+ line = line[:idx]
+ }
+ line = strings.TrimSpace(line)
+ if len(line) > 120 {
+ line = line[:117] + "..."
+ }
+ return line
+}
+
+func int64PtrOrNil(v int64) *int64 {
+ if v == 0 {
+ return nil
+ }
+ return &v
+}
+
+func cloneRaw(raw json.RawMessage) json.RawMessage {
+ if len(raw) == 0 {
+ return nil
+ }
+ out := make([]byte, len(raw))
+ copy(out, raw)
+ return out
+}
+
+// readOnlyDSN builds the SQLite URI used to open the database at path
+// read-only with a 5000 ms busy timeout.
+//
+// In a "file:" URI, '?' starts the query string, '#' starts a fragment and
+// "%HH" is an escape sequence, so those three characters are percent-encoded
+// in path to keep a directory or file name containing them from being
+// misparsed. Paths without them produce the same DSN as plain concatenation.
+func readOnlyDSN(path string) string {
+	// A single-pass replacer avoids re-escaping the '%' it just emitted.
+	escaped := strings.NewReplacer("%", "%25", "?", "%3f", "#", "%23").Replace(path)
+	return "file:" + escaped + "?mode=ro&_pragma=busy_timeout(5000)"
+}
+
+var _ parser.Parser = (*Parser)(nil)
A internal/collector/parser/opencode/parser_test.go => internal/collector/parser/opencode/parser_test.go +334 -0
@@ 0,0 1,334 @@
+package opencode
+
+import (
+ "database/sql"
+ "encoding/json"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+
+ _ "modernc.org/sqlite"
+)
+
+// TestDiscover_FindsCanonicalDatabase checks that Discover reports exactly
+// the opencode.db file — not the -wal sidecar or the JSON file under
+// storage/message — both when given the containing directory and when given
+// the database path itself.
+func TestDiscover_FindsCanonicalDatabase(t *testing.T) {
+	root := t.TempDir()
+	dbPath := filepath.Join(root, "opencode.db")
+
+	fixtures := []struct{ path, body string }{
+		{dbPath, "sqlite fixture"},
+		{filepath.Join(root, "opencode.db-wal"), "wal"},
+		{filepath.Join(root, "storage", "message", "ignored.json"), "{}"},
+	}
+	for _, f := range fixtures {
+		writeFile(t, f.path, f.body)
+	}
+
+	p := New("laptop")
+
+	// Discovery from the directory.
+	found, err := p.Discover(root)
+	if err != nil {
+		t.Fatalf("Discover(root): %v", err)
+	}
+	if n := len(found); n != 1 {
+		t.Fatalf("len(files) = %d, want 1", n)
+	}
+	if got := found[0].Path; got != dbPath {
+		t.Fatalf("files[0].Path = %q, want %q", got, dbPath)
+	}
+	if found[0].Size == 0 {
+		t.Fatal("files[0].Size = 0, want database size")
+	}
+
+	// Discovery from the database path itself.
+	found, err = p.Discover(dbPath)
+	if err != nil {
+		t.Fatalf("Discover(dbPath): %v", err)
+	}
+	if !(len(found) == 1 && found[0].Path == dbPath) {
+		t.Fatalf("Discover(dbPath) = %#v, want canonical db only", found)
+	}
+}
+
+// TestParse_MapsTurnsAndIdentity parses one user and one assistant message
+// and checks the identity, ordering, content, working directory, source
+// file, model, token and cost fields of the two resulting events.
+func TestParse_MapsTurnsAndIdentity(t *testing.T) {
+	dbPath := createOpenCodeDB(t)
+	insertSession(t, dbPath, "ses_demo", "/workspace/demo", 1700000000000)
+
+	// User turn: a single text part.
+	insertMessage(t, dbPath, "msg_user", "ses_demo", 1700000001000, `{"role":"user","path":{"cwd":"/workspace/demo"},"time":{"created":1700000001000}}`)
+	insertPart(t, dbPath, "prt_user_text", "msg_user", "ses_demo", 1700000001001, `{"type":"text","text":"Please inspect the failing test."}`)
+
+	// Assistant turn: step marker, reasoning and text parts.
+	insertMessage(t, dbPath, "msg_assistant", "ses_demo", 1700000002000, `{"role":"assistant","modelID":"model-redacted","tokens":{"input":7,"output":11},"cost":0.00042}`)
+	insertPart(t, dbPath, "prt_step", "msg_assistant", "ses_demo", 1700000002001, `{"type":"step-start"}`)
+	insertPart(t, dbPath, "prt_reason", "msg_assistant", "ses_demo", 1700000002002, `{"type":"reasoning","text":"Short safe reasoning summary."}`)
+	insertPart(t, dbPath, "prt_text", "msg_assistant", "ses_demo", 1700000002003, `{"type":"text","text":"The test fails because the parser is missing."}`)
+
+	events, next, err := New("laptop").Parse(dbPath, 0)
+	if err != nil {
+		t.Fatalf("Parse: %v", err)
+	}
+	if n := len(events); n != 2 {
+		t.Fatalf("len(events) = %d, want 2", n)
+	}
+	if next != 3 {
+		t.Fatalf("next = %d, want marker after assistant message", next)
+	}
+	user, assistant := events[0], events[1]
+
+	identityOK := user.Tool == "opencode" && user.Host == "laptop" &&
+		user.SessionID == "ses_demo" && user.TurnID == "msg_user"
+	if !identityOK {
+		t.Fatalf("user identity = tool=%q host=%q session=%q turn=%q", user.Tool, user.Host, user.SessionID, user.TurnID)
+	}
+	orderingOK := user.Seq == 1 && user.Timestamp == 1700000001 && user.Role == "user"
+	if !orderingOK {
+		t.Fatalf("user seq/timestamp/role = %d/%d/%q", user.Seq, user.Timestamp, user.Role)
+	}
+	if user.Content != "Please inspect the failing test." {
+		t.Fatalf("user.Content = %q", user.Content)
+	}
+	if wd := user.SessionMeta.WorkingDir; wd == nil || *wd != "/workspace/demo" {
+		t.Fatalf("user working dir = %v, want session directory", wd)
+	}
+	if src := user.SessionMeta.SourceFile; src != dbPath {
+		t.Fatalf("user source_file = %q, want %q", src, dbPath)
+	}
+
+	if assistant.Role != "assistant" {
+		t.Fatalf("assistant.Role = %q", assistant.Role)
+	}
+	hasReasoning := strings.Contains(assistant.Content, "Short safe reasoning summary.")
+	if !hasReasoning || !strings.Contains(assistant.Content, "The test fails because the parser is missing.") {
+		t.Fatalf("assistant.Content = %q", assistant.Content)
+	}
+	if model := assistant.Model; model == nil || *model != "model-redacted" {
+		t.Fatalf("assistant.Model = %v", model)
+	}
+	in, out := assistant.TokensIn, assistant.TokensOut
+	if in == nil || *in != 7 || out == nil || *out != 11 {
+		t.Fatalf("assistant tokens = %v/%v", in, out)
+	}
+	if cost := assistant.CostUSD; cost == nil || *cost != 0.00042 {
+		t.Fatalf("assistant.CostUSD = %v", cost)
+	}
+}
+
+// TestParse_MapsToolPartSummaryWithoutExternalBlob checks that a tool part
+// is summarized from the data stored in the part row, and that a file
+// placed under tool-output/ next to the database is never read into the
+// event content or tool calls.
+func TestParse_MapsToolPartSummaryWithoutExternalBlob(t *testing.T) {
+	dbPath := createOpenCodeDB(t)
+	insertSession(t, dbPath, "ses_tools", "/workspace/tools", 1700000100000)
+	insertMessage(t, dbPath, "msg_tool", "ses_tools", 1700000101000, `{"role":"assistant"}`)
+	insertPart(t, dbPath, "prt_tool", "msg_tool", "ses_tools", 1700000101001, `{"type":"tool","tool":"bash","callID":"call_safe","state":{"status":"completed","output":"go test ./internal/collector/parser/opencode ok"}}`)
+
+	// Decoy blob on disk that must stay out of the parsed events.
+	blobPath := filepath.Join(filepath.Dir(dbPath), "tool-output", "tool_call_safe")
+	writeFile(t, blobPath, "SECRET=do-not-ingest")
+
+	events, _, err := New("laptop").Parse(dbPath, 0)
+	if err != nil {
+		t.Fatalf("Parse: %v", err)
+	}
+	if n := len(events); n != 1 {
+		t.Fatalf("len(events) = %d, want 1", n)
+	}
+	turn := events[0]
+	content, toolCalls := turn.Content, string(turn.ToolCalls)
+
+	if turn.Role != "assistant" {
+		t.Fatalf("Role = %q, want assistant", turn.Role)
+	}
+	if !strings.Contains(content, "<tool: bash completed - go test ./internal/collector/parser/opencode ok>") {
+		t.Fatalf("Content = %q", content)
+	}
+	if strings.Contains(content, "SECRET") || strings.Contains(toolCalls, "SECRET") {
+		t.Fatalf("external tool-output blob was ingested: content=%q tool_calls=%s", content, toolCalls)
+	}
+	hasCallID := strings.Contains(toolCalls, `"call_id":"call_safe"`)
+	if !hasCallID || !strings.Contains(toolCalls, `"output":"go test ./internal/collector/parser/opencode ok"`) {
+		t.Fatalf("ToolCalls = %s, want safe part summary", toolCalls)
+	}
+}
+
+// TestParse_ResumesFromMessageRowIDMarker checks that parsing from marker 2
+// yields only the second message, and that parsing again from the returned
+// marker yields nothing and leaves the marker unchanged.
+func TestParse_ResumesFromMessageRowIDMarker(t *testing.T) {
+	dbPath := createOpenCodeDB(t)
+	insertSession(t, dbPath, "ses_resume", "/workspace/resume", 1700000200000)
+	insertMessage(t, dbPath, "msg_old", "ses_resume", 1700000201000, `{"role":"user"}`)
+	insertPart(t, dbPath, "prt_old", "msg_old", "ses_resume", 1700000201001, `{"type":"text","text":"old"}`)
+	insertMessage(t, dbPath, "msg_new", "ses_resume", 1700000202000, `{"role":"assistant"}`)
+	insertPart(t, dbPath, "prt_new", "msg_new", "ses_resume", 1700000202001, `{"type":"text","text":"new"}`)
+
+	turns, marker, err := New("laptop").Parse(dbPath, 2)
+	if err != nil {
+		t.Fatalf("Parse: %v", err)
+	}
+	onlyNew := len(turns) == 1 && turns[0].TurnID == "msg_new" && turns[0].Content == "new"
+	if !onlyNew {
+		t.Fatalf("events = %#v, want only new message", turns)
+	}
+	if marker != 3 {
+		t.Fatalf("next = %d, want newest marker", marker)
+	}
+
+	// Resuming from the returned marker must be a no-op.
+	turns, marker, err = New("laptop").Parse(dbPath, marker)
+	if err != nil {
+		t.Fatalf("Parse second: %v", err)
+	}
+	if len(turns) > 0 || marker != 3 {
+		t.Fatalf("second parse = len %d next %d, want no events and stable marker", len(turns), marker)
+	}
+}
+
+// TestParse_IdenticalMessageTimesResumeByRowID inserts two messages with
+// the same time_created and checks that they come back in insertion order
+// with Seq 1 and 2, that resuming from marker 2 returns only the second,
+// and that resuming from the final marker returns nothing.
+func TestParse_IdenticalMessageTimesResumeByRowID(t *testing.T) {
+	dbPath := createOpenCodeDB(t)
+	insertSession(t, dbPath, "ses_same_time", "/workspace/same-time", 1700000250000)
+	insertMessage(t, dbPath, "msg_first", "ses_same_time", 1700000251000, `{"role":"user"}`)
+	insertPart(t, dbPath, "prt_first", "msg_first", "ses_same_time", 1700000251001, `{"type":"text","text":"first"}`)
+	insertMessage(t, dbPath, "msg_second", "ses_same_time", 1700000251000, `{"role":"assistant"}`)
+	insertPart(t, dbPath, "prt_second", "msg_second", "ses_same_time", 1700000251002, `{"type":"text","text":"second"}`)
+
+	// Full parse from the start.
+	turns, marker, err := New("laptop").Parse(dbPath, 0)
+	if err != nil {
+		t.Fatalf("Parse: %v", err)
+	}
+	inOrder := len(turns) == 2 && turns[0].TurnID == "msg_first" && turns[1].TurnID == "msg_second"
+	if !inOrder {
+		t.Fatalf("events = %#v, want both equal-time messages in rowid order", turns)
+	}
+	if !(turns[0].Seq == 1 && turns[1].Seq == 2 && marker == 3) {
+		t.Fatalf("seqs/next = %d/%d/%d, want rowid markers 1/2/3", turns[0].Seq, turns[1].Seq, marker)
+	}
+
+	// Resume using the second event's Seq as the marker.
+	turns, marker, err = New("laptop").Parse(dbPath, turns[1].Seq)
+	if err != nil {
+		t.Fatalf("Parse resume: %v", err)
+	}
+	onlySecond := len(turns) == 1 && turns[0].TurnID == "msg_second" && turns[0].Content == "second"
+	if !onlySecond {
+		t.Fatalf("resume events = %#v, want only second equal-time message", turns)
+	}
+	if marker != 3 {
+		t.Fatalf("resume next = %d, want marker after second rowid", marker)
+	}
+
+	// Resume from the final marker.
+	turns, marker, err = New("laptop").Parse(dbPath, marker)
+	if err != nil {
+		t.Fatalf("Parse after full resume: %v", err)
+	}
+	if len(turns) > 0 || marker != 3 {
+		t.Fatalf("after full parse = len %d next %d, want no rows and unchanged marker 3", len(turns), marker)
+	}
+}
+
+// TestParse_EmptyParseReturnsInputMarker checks that parsing a database
+// with no messages returns no events and hands back the marker it was
+// given.
+func TestParse_EmptyParseReturnsInputMarker(t *testing.T) {
+	dbPath := createOpenCodeDB(t)
+	insertSession(t, dbPath, "ses_empty", "/workspace/empty", 1700000260000)
+
+	turns, marker, err := New("laptop").Parse(dbPath, 7)
+	if err != nil {
+		t.Fatalf("Parse: %v", err)
+	}
+	if !(len(turns) == 0 && marker == 7) {
+		t.Fatalf("empty parse = len %d next %d, want no events and unchanged marker 7", len(turns), marker)
+	}
+}
+
+// TestParse_ReturnsMarkerAfterHighestScannedRowIDWithGaps inserts a single
+// message at rowid 5 and checks that the event carries Seq 5 and that the
+// returned marker is 6, i.e. one past the highest rowid seen.
+func TestParse_ReturnsMarkerAfterHighestScannedRowIDWithGaps(t *testing.T) {
+	dbPath := createOpenCodeDB(t)
+	insertSession(t, dbPath, "ses_gap", "/workspace/gap", 1700000270000)
+	insertMessageWithRowID(t, dbPath, 5, "msg_gap", "ses_gap", 1700000271000, `{"role":"user"}`)
+	insertPart(t, dbPath, "prt_gap", "msg_gap", "ses_gap", 1700000271001, `{"type":"text","text":"gap"}`)
+
+	turns, marker, err := New("laptop").Parse(dbPath, 1)
+	if err != nil {
+		t.Fatalf("Parse: %v", err)
+	}
+	gotGapRow := len(turns) == 1 && turns[0].Seq == 5 && turns[0].TurnID == "msg_gap"
+	if !gotGapRow {
+		t.Fatalf("events = %#v, want rowid 5 event", turns)
+	}
+	if marker != 6 {
+		t.Fatalf("next = %d, want marker after highest scanned rowid", marker)
+	}
+}
+
+// TestParse_SkipsMalformedAndUnknownRecordsButAdvances checks that a
+// message with invalid JSON and a message with an unrecognized role produce
+// no events, while the marker still moves past all three rows.
+func TestParse_SkipsMalformedAndUnknownRecordsButAdvances(t *testing.T) {
+	dbPath := createOpenCodeDB(t)
+	insertSession(t, dbPath, "ses_bad", "/workspace/bad", 1700000300000)
+
+	// Row 1: message data is not valid JSON.
+	insertMessage(t, dbPath, "msg_bad_json", "ses_bad", 1700000301000, `{not-json}`)
+	insertPart(t, dbPath, "prt_bad", "msg_bad_json", "ses_bad", 1700000301001, `{"type":"text","text":"bad message should not leak"}`)
+	// Row 2: role the parser does not know.
+	insertMessage(t, dbPath, "msg_unknown_role", "ses_bad", 1700000302000, `{"role":"plugin"}`)
+	insertPart(t, dbPath, "prt_unknown", "msg_unknown_role", "ses_bad", 1700000302001, `{"type":"text","text":"unknown role"}`)
+	// Row 3: well-formed user message.
+	insertMessage(t, dbPath, "msg_good", "ses_bad", 1700000303000, `{"role":"user"}`)
+	insertPart(t, dbPath, "prt_good", "msg_good", "ses_bad", 1700000303001, `{"type":"text","text":"good"}`)
+
+	turns, marker, err := New("laptop").Parse(dbPath, 0)
+	if err != nil {
+		t.Fatalf("Parse: %v", err)
+	}
+	onlyGood := len(turns) == 1 && turns[0].TurnID == "msg_good" && turns[0].Content == "good"
+	if !onlyGood {
+		t.Fatalf("events = %#v, want only good event", turns)
+	}
+	if marker != 4 {
+		t.Fatalf("next = %d, want all records consumed", marker)
+	}
+}
+
+// createOpenCodeDB creates opencode.db in a fresh temporary directory,
+// applies the DDL from testdata/schema.sql, and returns the database path.
+// Any failure aborts the test.
+func createOpenCodeDB(t *testing.T) string {
+	t.Helper()
+
+	dbPath := filepath.Join(t.TempDir(), "opencode.db")
+	conn, err := sql.Open("sqlite", dbPath)
+	if err != nil {
+		t.Fatalf("open sqlite: %v", err)
+	}
+	defer func() { _ = conn.Close() }()
+
+	schema, err := os.ReadFile(filepath.Join("testdata", "schema.sql"))
+	if err != nil {
+		t.Fatalf("read schema: %v", err)
+	}
+	if _, execErr := conn.Exec(string(schema)); execErr != nil {
+		t.Fatalf("create schema: %v", execErr)
+	}
+	return dbPath
+}
+
+// insertSession adds a session row with the given id and directory. The
+// project id, slug, title and version are fixed placeholder values, and
+// created is used for both time_created and time_updated.
+func insertSession(t *testing.T, dbPath, id, directory string, created int64) {
+	t.Helper()
+	const stmt = `INSERT INTO session (id, project_id, slug, directory, title, version, time_created, time_updated) VALUES (?, 'proj_redacted', 'quiet-river', ?, 'redacted', '0.0.0', ?, ?)`
+	withDB(t, dbPath, func(conn *sql.DB) {
+		if _, err := conn.Exec(stmt, id, directory, created, created); err != nil {
+			t.Fatalf("insert session: %v", err)
+		}
+	})
+}
+
+// insertMessage adds a message row whose data column holds the given JSON
+// text; created is used for both timestamps. As a guard against fixture
+// typos, data must be valid JSON unless it starts with the deliberate
+// "{not-json}" marker.
+func insertMessage(t *testing.T, dbPath, id, sessionID string, created int64, data string) {
+	t.Helper()
+	if !(json.Valid([]byte(data)) || strings.HasPrefix(data, "{not-json}")) {
+		t.Fatalf("test fixture message data is invalid unexpectedly: %s", data)
+	}
+	const stmt = `INSERT INTO message (id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?)`
+	withDB(t, dbPath, func(conn *sql.DB) {
+		if _, err := conn.Exec(stmt, id, sessionID, created, created, data); err != nil {
+			t.Fatalf("insert message: %v", err)
+		}
+	})
+}
+
+// insertMessageWithRowID adds a message row at an explicit SQLite rowid so
+// tests can leave gaps in the rowid sequence. The data check matches
+// insertMessage: valid JSON, or the deliberate "{not-json}" marker.
+func insertMessageWithRowID(t *testing.T, dbPath string, rowID int64, id, sessionID string, created int64, data string) {
+	t.Helper()
+	if !(json.Valid([]byte(data)) || strings.HasPrefix(data, "{not-json}")) {
+		t.Fatalf("test fixture message data is invalid unexpectedly: %s", data)
+	}
+	const stmt = `INSERT INTO message (rowid, id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?, ?)`
+	withDB(t, dbPath, func(conn *sql.DB) {
+		if _, err := conn.Exec(stmt, rowID, id, sessionID, created, created, data); err != nil {
+			t.Fatalf("insert message with rowid: %v", err)
+		}
+	})
+}
+
+// insertPart adds a part row attached to messageID and sessionID; created
+// is used for both timestamps and data is stored as given, without
+// validation.
+func insertPart(t *testing.T, dbPath, id, messageID, sessionID string, created int64, data string) {
+	t.Helper()
+	const stmt = `INSERT INTO part (id, message_id, session_id, time_created, time_updated, data) VALUES (?, ?, ?, ?, ?, ?)`
+	withDB(t, dbPath, func(conn *sql.DB) {
+		if _, err := conn.Exec(stmt, id, messageID, sessionID, created, created, data); err != nil {
+			t.Fatalf("insert part: %v", err)
+		}
+	})
+}
+
+// withDB opens the SQLite database at dbPath, runs fn with the handle, and
+// closes it afterwards. The close error is deliberately ignored: this is a
+// test-only helper and fn reports its own failures.
+func withDB(t *testing.T, dbPath string, fn func(*sql.DB)) {
+	t.Helper()
+	conn, openErr := sql.Open("sqlite", dbPath)
+	if openErr != nil {
+		t.Fatalf("open sqlite: %v", openErr)
+	}
+	defer func() { _ = conn.Close() }()
+	fn(conn)
+}
+
+// writeFile writes body to path with mode 0600, creating any missing parent
+// directories (mode 0755) first. Any failure aborts the test.
+func writeFile(t *testing.T, path, body string) {
+	t.Helper()
+	mkErr := os.MkdirAll(filepath.Dir(path), 0o755)
+	if mkErr != nil {
+		t.Fatalf("MkdirAll(%q): %v", path, mkErr)
+	}
+	writeErr := os.WriteFile(path, []byte(body), 0o600)
+	if writeErr != nil {
+		t.Fatalf("WriteFile(%q): %v", path, writeErr)
+	}
+}
A internal/collector/parser/opencode/testdata/schema.sql => internal/collector/parser/opencode/testdata/schema.sql +31 -0
@@ 0,0 1,31 @@
+-- Test fixture: session table used by the opencode parser tests.
+-- The tests insert id, project_id, slug, directory, title, version and both
+-- timestamps; parent_id is left NULL.
+-- NOTE(review): presumably mirrors the real OpenCode schema — confirm when
+-- the upstream schema changes.
+CREATE TABLE session (
+ id TEXT PRIMARY KEY,
+ project_id TEXT NOT NULL,
+ parent_id TEXT,
+ slug TEXT NOT NULL,
+ directory TEXT NOT NULL,
+ title TEXT NOT NULL,
+ version TEXT NOT NULL,
+ time_created INTEGER NOT NULL,
+ time_updated INTEGER NOT NULL
+);
+
+-- Test fixture: one row per message. data holds the message as JSON text
+-- (the tests also store one deliberately malformed value).
+-- No INTEGER PRIMARY KEY is declared, so rows keep SQLite's implicit rowid;
+-- the parser tests insert explicit rowid values and assert resume markers
+-- against them. The tests store time_created as epoch milliseconds.
+CREATE TABLE message (
+ id TEXT PRIMARY KEY,
+ session_id TEXT NOT NULL,
+ time_created INTEGER NOT NULL,
+ time_updated INTEGER NOT NULL,
+ data TEXT NOT NULL
+);
+
+-- Test fixture: parts belonging to a message (message_id) within a session
+-- (session_id). data holds the part as JSON text; the tests use part types
+-- such as text, reasoning, tool and step-start.
+CREATE TABLE part (
+ id TEXT PRIMARY KEY,
+ message_id TEXT NOT NULL,
+ session_id TEXT NOT NULL,
+ time_created INTEGER NOT NULL,
+ time_updated INTEGER NOT NULL,
+ data TEXT NOT NULL
+);
+
+-- Secondary indexes: parts by session, sessions by project.
+CREATE INDEX part_session_idx ON part (session_id);
+CREATE INDEX session_project_idx ON session (project_id);
A internal/domain/search/handler.go => internal/domain/search/handler.go +154 -0
@@ 0,0 1,154 @@
+package search
+
+import (
+ "context"
+ "log/slog"
+ "net/http"
+ "strconv"
+ "strings"
+
+ "github.com/go-chi/chi/v5"
+ "go.bigb.es/auxilia/culpa"
+ "go.bigb.es/auxilia/scribe"
+
+ "sourcecraft.dev/bigbes/lethe/internal/domain/session"
+ "sourcecraft.dev/bigbes/lethe/internal/pkg/apierror"
+ "sourcecraft.dev/bigbes/lethe/internal/pkg/httputil"
+ "sourcecraft.dev/bigbes/lethe/internal/server/auth"
+)
+
+// Page-size bounds used by clampLimit: defaultLimit applies when the limit
+// parameter is missing, non-numeric or negative; maxLimit caps larger
+// values.
+const (
+ defaultLimit = 50
+ maxLimit = 200
+)
+
+// allOwnersSentinel is the ?owner= value with which an admin requests
+// results across every owner instead of a single one.
+const allOwnersSentinel = "*"
+
+// Handler is the steward-managed HTTP boundary for the search read API.
+// Repo is the injected SQL steward; the Handler holds no other state.
+type Handler struct {
+ // Repo runs the search queries. NOTE(review): presumably populated by the
+ // steward container through the inject tag; tests set it directly.
+ Repo *Repository `inject:""`
+}
+
+// Init implements the steward Initer contract. The handler has nothing to
+// prepare, so it always returns nil.
+func (h *Handler) Init(_ context.Context) error {
+	return nil
+}
+
+// Mount attaches the search endpoint to r as GET /search. Server.Init calls
+// it inside the /api/v1 group, which makes the full path /api/v1/search.
+func (h *Handler) Mount(r chi.Router) {
+	r.MethodFunc(http.MethodGet, "/search", h.List)
+}
+
+// List serves GET /search. It resolves the owner scope, validates the query
+// parameters, clamps the page size, runs the search and writes the
+// search.Result as JSON.
+//
+// Parameters read from the query string:
+//   - q: required; must be non-blank after trimming.
+//   - include_tool_outputs: enabled only by the exact value "1".
+//   - tool, host: optional filters, applied when non-empty.
+//   - since, until: optional base-10 integers (unix epoch seconds); since
+//     must not be greater than until.
+//   - limit: see clampLimit.
+//   - cursor: passed to the repository unchanged.
+//
+// Validation failures are rendered through apierror.Render with code
+// INVALID; errors from scope resolution and from the repository are
+// rendered as returned.
+func (h *Handler) List(w http.ResponseWriter, r *http.Request) {
+	// invalid renders err as an INVALID problem response.
+	invalid := func(err error) {
+		apierror.Render(w, r, culpa.WithCode(err, "INVALID"))
+	}
+
+	scope, err := h.resolveScope(r)
+	if err != nil {
+		apierror.Render(w, r, err)
+		return
+	}
+
+	params := r.URL.Query()
+	filter := Filter{
+		Owner: scope,
+		Query: strings.TrimSpace(params.Get("q")),
+	}
+	if filter.Query == "" {
+		invalid(culpa.WithPublic(culpa.New("search query is empty"), "q must not be empty"))
+		return
+	}
+
+	filter.IncludeToolOutputs = params.Get("include_tool_outputs") == "1"
+	if tool := params.Get("tool"); tool != "" {
+		filter.Tool = &tool
+	}
+	if host := params.Get("host"); host != "" {
+		filter.Host = &host
+	}
+
+	if raw := params.Get("since"); raw != "" {
+		since, perr := strconv.ParseInt(raw, 10, 64)
+		if perr != nil {
+			invalid(culpa.WithPublic(culpa.Wrap(perr, "parse since"), "since must be an integer (unix epoch seconds)"))
+			return
+		}
+		filter.Since = &since
+	}
+	if raw := params.Get("until"); raw != "" {
+		until, perr := strconv.ParseInt(raw, 10, 64)
+		if perr != nil {
+			invalid(culpa.WithPublic(culpa.Wrap(perr, "parse until"), "until must be an integer (unix epoch seconds)"))
+			return
+		}
+		filter.Until = &until
+	}
+	if lo, hi := filter.Since, filter.Until; lo != nil && hi != nil && *lo > *hi {
+		invalid(culpa.WithPublic(culpa.New("since > until"), "since must be <= until"))
+		return
+	}
+
+	filter.Limit = clampLimit(params.Get("limit"))
+	filter.Cursor = params.Get("cursor")
+
+	result, err := h.Repo.Search(r.Context(), filter)
+	if err != nil {
+		apierror.Render(w, r, err)
+		return
+	}
+
+	// The status line may already be on the wire, so a write failure can
+	// only be logged.
+	if werr := httputil.WriteJSON(w, http.StatusOK, result); werr != nil {
+		slog.Default().ErrorContext(r.Context(), "write search response", scribe.Err(werr))
+	}
+}
+
+// resolveScope builds the session.OwnerScope for a request from the
+// authenticated identity on the context and the optional ?owner= parameter:
+//
+//   - no ?owner=: scope limited to the caller;
+//   - ?owner= from a non-admin: FORBIDDEN error;
+//   - ?owner=* from an admin: all owners;
+//   - any other value from an admin: that owner, lower-cased.
+func (h *Handler) resolveScope(r *http.Request) (session.OwnerScope, error) {
+	identity := auth.MustIdentity(r.Context())
+	requested := r.URL.Query().Get("owner")
+
+	switch {
+	case requested == "":
+		return session.OwnerScope{User: identity.User}, nil
+	case !identity.IsAdmin:
+		return session.OwnerScope{}, culpa.WithCode(
+			culpa.WithPublic(culpa.New("?owner= is admin-only"), "?owner= is admin-only"),
+			"FORBIDDEN",
+		)
+	case requested == allOwnersSentinel:
+		return session.OwnerScope{User: identity.User, AllOwners: true}, nil
+	default:
+		owner := strings.ToLower(requested)
+		return session.OwnerScope{User: identity.User, SpecificOwner: &owner}, nil
+	}
+}
+
+// clampLimit turns the raw limit parameter into the page size to use.
+// Missing, non-numeric and negative values fall back to defaultLimit;
+// values above maxLimit are reduced to maxLimit; anything else (including
+// zero) is returned unchanged.
+func clampLimit(raw string) int {
+	if raw == "" {
+		return defaultLimit
+	}
+	n, err := strconv.Atoi(raw)
+	switch {
+	case err != nil, n < 0:
+		return defaultLimit
+	case n > maxLimit:
+		return maxLimit
+	default:
+		return n
+	}
+}
A internal/domain/search/handler_test.go => internal/domain/search/handler_test.go +283 -0
@@ 0,0 1,283 @@
+package search_test
+
+import (
+ "context"
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+
+ "github.com/go-chi/chi/v5"
+
+ "sourcecraft.dev/bigbes/lethe/internal/domain/search"
+ "sourcecraft.dev/bigbes/lethe/internal/server/auth"
+)
+
+// fakeAuthMiddleware returns middleware that attaches the given Identity to
+// every request's context before calling the next handler, standing in for
+// real authentication in tests.
+func fakeAuthMiddleware(id auth.Identity) func(http.Handler) http.Handler {
+	return func(next http.Handler) http.Handler {
+		serve := func(w http.ResponseWriter, r *http.Request) {
+			next.ServeHTTP(w, r.WithContext(auth.WithIdentity(r.Context(), id)))
+		}
+		return http.HandlerFunc(serve)
+	}
+}
+
+// newHandler builds a Handler on top of the Repository returned by newRepo,
+// runs Init, and returns both so tests can seed data through the
+// repository.
+func newHandler(t *testing.T) (*search.Handler, *search.Repository) {
+	t.Helper()
+	repo, _ := newRepo(t)
+	handler := &search.Handler{Repo: repo}
+	if initErr := handler.Init(context.Background()); initErr != nil {
+		t.Fatalf("handler.Init: %v", initErr)
+	}
+	return handler, repo
+}
+
+// mountWithIdentity returns a chi router that serves the search handler
+// under /api/v1, with every request authenticated as id.
+func mountWithIdentity(h *search.Handler, id auth.Identity) http.Handler {
+	router := chi.NewRouter()
+	router.Route("/api/v1", func(api chi.Router) {
+		api.Use(fakeAuthMiddleware(id))
+		h.Mount(api)
+	})
+	return router
+}
+
+// problemBody captures RFC 7807 fields tests assert on.
+type problemBody struct {
+ // Status is decoded from the "status" member of the problem document.
+ Status int `json:"status"`
+ // Code is the application error code, e.g. "INVALID" or "FORBIDDEN".
+ Code string `json:"code"`
+}
+
+// doSearch performs GET /api/v1/search against router and returns the
+// recorder together with the decoded body. query is the raw query string
+// and may be given with or without its leading '?'; previously a query
+// without the '?' silently lost its first character. The body is decoded
+// only for 200 responses; for any other status the zero search.Result is
+// returned and the caller inspects the recorder.
+func doSearch(t *testing.T, router http.Handler, query string) (*httptest.ResponseRecorder, search.Result) {
+	t.Helper()
+	req := httptest.NewRequest(http.MethodGet, "/api/v1/search", nil)
+	if query != "" {
+		if query[0] == '?' {
+			query = query[1:]
+		}
+		req.URL.RawQuery = query
+	}
+
+	rec := httptest.NewRecorder()
+	router.ServeHTTP(rec, req)
+
+	var body search.Result
+	if rec.Code == http.StatusOK {
+		if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil {
+			t.Fatalf("unmarshal search body: %v (body=%s)", err, rec.Body.String())
+		}
+	}
+	return rec, body
+}
+
+// TestHandler_Mount_RegistersSearchRoute walks the router and checks that
+// Mount registered GET /api/v1/search. A failure of the walk itself is
+// reported instead of being discarded.
+func TestHandler_Mount_RegistersSearchRoute(t *testing.T) {
+	h, _ := newHandler(t)
+	router := chi.NewRouter()
+	router.Route("/api/v1", func(r chi.Router) {
+		r.Use(fakeAuthMiddleware(auth.Identity{User: "alice"}))
+		h.Mount(r)
+	})
+
+	found := false
+	walkErr := chi.Walk(router, func(method, route string, _ http.Handler, _ ...func(http.Handler) http.Handler) error {
+		if method == http.MethodGet && route == "/api/v1/search" {
+			found = true
+		}
+		return nil
+	})
+	if walkErr != nil {
+		t.Fatalf("chi.Walk: %v", walkErr)
+	}
+	if !found {
+		t.Fatal("expected GET /api/v1/search registered")
+	}
+}
+
+// TestHandler_MissingQueryReturns400 checks that a request without any
+// query string is rejected with 400 and code INVALID. A response body that
+// is not a JSON problem document fails the test with the decode error
+// rather than with an empty code.
+func TestHandler_MissingQueryReturns400(t *testing.T) {
+	h, _ := newHandler(t)
+	router := mountWithIdentity(h, auth.Identity{User: "alice"})
+
+	rec, _ := doSearch(t, router, "")
+	if rec.Code != http.StatusBadRequest {
+		t.Fatalf("status=%d; want 400; body=%s", rec.Code, rec.Body.String())
+	}
+	var p problemBody
+	if err := json.Unmarshal(rec.Body.Bytes(), &p); err != nil {
+		t.Fatalf("decode problem body: %v (body=%s)", err, rec.Body.String())
+	}
+	if p.Code != "INVALID" {
+		t.Fatalf("expected INVALID; got %q", p.Code)
+	}
+}
+
+// TestHandler_EmptyQueryReturns400 checks that a whitespace-only q is
+// rejected with 400 and code INVALID. A response body that is not a JSON
+// problem document fails the test with the decode error rather than with
+// an empty code.
+func TestHandler_EmptyQueryReturns400(t *testing.T) {
+	h, _ := newHandler(t)
+	router := mountWithIdentity(h, auth.Identity{User: "alice"})
+
+	rec, _ := doSearch(t, router, "?q= ")
+	if rec.Code != http.StatusBadRequest {
+		t.Fatalf("status=%d; want 400; body=%s", rec.Code, rec.Body.String())
+	}
+	var p problemBody
+	if err := json.Unmarshal(rec.Body.Bytes(), &p); err != nil {
+		t.Fatalf("decode problem body: %v (body=%s)", err, rec.Body.String())
+	}
+	if p.Code != "INVALID" {
+		t.Fatalf("expected INVALID; got %q", p.Code)
+	}
+}
+
+// TestHandler_BadSinceReturns400 checks that a non-numeric since parameter
+// is rejected with 400 and code INVALID. A response body that is not a JSON
+// problem document fails the test with the decode error rather than with
+// an empty code.
+func TestHandler_BadSinceReturns400(t *testing.T) {
+	h, _ := newHandler(t)
+	router := mountWithIdentity(h, auth.Identity{User: "alice"})
+
+	rec, _ := doSearch(t, router, "?q=hello&since=not-a-number")
+	if rec.Code != http.StatusBadRequest {
+		t.Fatalf("status=%d; want 400; body=%s", rec.Code, rec.Body.String())
+	}
+	var p problemBody
+	if err := json.Unmarshal(rec.Body.Bytes(), &p); err != nil {
+		t.Fatalf("decode problem body: %v (body=%s)", err, rec.Body.String())
+	}
+	if p.Code != "INVALID" {
+		t.Fatalf("expected INVALID; got %q", p.Code)
+	}
+}
+
+// TestHandler_NonAdminOwnerParamReturns403 checks that a non-admin caller
+// gets 403 with code FORBIDDEN for any ?owner= value: their own name,
+// someone else's, or the all-owners sentinel. A response body that is not a
+// JSON problem document fails the test with the decode error rather than
+// with an empty code.
+func TestHandler_NonAdminOwnerParamReturns403(t *testing.T) {
+	h, _ := newHandler(t)
+	router := mountWithIdentity(h, auth.Identity{User: "alice", IsAdmin: false})
+
+	for _, q := range []string{"?q=hello&owner=alice", "?q=hello&owner=bob", "?q=hello&owner=*"} {
+		rec, _ := doSearch(t, router, q)
+		if rec.Code != http.StatusForbidden {
+			t.Fatalf("query %q: status=%d; want 403; body=%s", q, rec.Code, rec.Body.String())
+		}
+		var p problemBody
+		if err := json.Unmarshal(rec.Body.Bytes(), &p); err != nil {
+			t.Fatalf("query %q: decode problem body: %v (body=%s)", q, err, rec.Body.String())
+		}
+		if p.Code != "FORBIDDEN" {
+			t.Fatalf("query %q: code=%q; want FORBIDDEN", q, p.Code)
+		}
+	}
+}
+
+// TestHandler_AdminOwnerStarReturnsAllOwners seeds one matching turn each
+// for alice and bob and checks that an admin using ?owner=* gets both.
+func TestHandler_AdminOwnerStarReturnsAllOwners(t *testing.T) {
+	h, repo := newHandler(t)
+	db := repo.Database.DB
+
+	seedSession(t, db, "alice", "cc", "phoebe", "sa", 100, 110)
+	seedTurn(t, db, "alice", "cc", "phoebe", "sa", "t1", 1, 101, "user", "needle alpha", nil)
+
+	seedSession(t, db, "bob", "cc", "phoebe", "sb", 100, 110)
+	seedTurn(t, db, "bob", "cc", "phoebe", "sb", "t1", 1, 101, "user", "needle beta", nil)
+
+	admin := auth.Identity{User: "admin", IsAdmin: true}
+	rec, body := doSearch(t, mountWithIdentity(h, admin), "?q=needle&owner=*")
+	if rec.Code != http.StatusOK {
+		t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
+	}
+	if n := len(body.Results); n != 2 {
+		t.Fatalf("expected 2 rows; got %d (%#v)", n, body.Results)
+	}
+}
+
+// TestHandler_BadCursorReturns400 checks that an undecodable cursor is
+// rejected with 400 and code INVALID. A response body that is not a JSON
+// problem document fails the test with the decode error rather than with
+// an empty code.
+func TestHandler_BadCursorReturns400(t *testing.T) {
+	h, _ := newHandler(t)
+	router := mountWithIdentity(h, auth.Identity{User: "alice"})
+
+	rec, _ := doSearch(t, router, "?q=hello&cursor=not-valid")
+	if rec.Code != http.StatusBadRequest {
+		t.Fatalf("status=%d; want 400; body=%s", rec.Code, rec.Body.String())
+	}
+	var p problemBody
+	if err := json.Unmarshal(rec.Body.Bytes(), &p); err != nil {
+		t.Fatalf("decode problem body: %v (body=%s)", err, rec.Body.String())
+	}
+	if p.Code != "INVALID" {
+		t.Fatalf("expected INVALID; got %q", p.Code)
+	}
+}
+
+// TestHandler_SuccessfulResponseEnvelope checks the shape of a successful
+// response: the requested limit is echoed, the single seeded match is
+// returned, and no next_cursor is set when everything fits on one page.
+func TestHandler_SuccessfulResponseEnvelope(t *testing.T) {
+	h, repo := newHandler(t)
+	db := repo.Database.DB
+	seedSession(t, db, "alice", "cc", "phoebe", "s1", 100, 110)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s1", "t1", 1, 101, "user", "needle in haystack", nil)
+
+	rec, body := doSearch(t, mountWithIdentity(h, auth.Identity{User: "alice"}), "?q=needle&limit=10")
+	if rec.Code != http.StatusOK {
+		t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
+	}
+	if body.Limit != 10 {
+		t.Fatalf("limit=%d; want 10", body.Limit)
+	}
+	if n := len(body.Results); n != 1 {
+		t.Fatalf("expected 1 result; got %d (%#v)", n, body.Results)
+	}
+	if hit := body.Results[0]; hit.SessionID != "s1" {
+		t.Fatalf("expected session s1; got %#v", hit)
+	}
+	// A single page of results must not advertise a continuation.
+	if cursor := body.NextCursor; cursor != "" {
+		t.Fatalf("expected empty next_cursor for single page; got %q", cursor)
+	}
+}
+
+// TestHandler_LimitClamping checks the limit echoed in the response: 50
+// when the parameter is missing and 200 when the requested value is larger
+// than the maximum.
+func TestHandler_LimitClamping(t *testing.T) {
+	h, _ := newHandler(t)
+	router := mountWithIdentity(h, auth.Identity{User: "alice"})
+
+	// No limit parameter: the default applies.
+	rec, body := doSearch(t, router, "?q=hello")
+	if rec.Code != http.StatusOK {
+		t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
+	}
+	if got := body.Limit; got != 50 {
+		t.Fatalf("default limit=%d; want 50", got)
+	}
+
+	// Limit above the maximum: capped.
+	rec, body = doSearch(t, router, "?q=hello&limit=999")
+	if rec.Code != http.StatusOK {
+		t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
+	}
+	if got := body.Limit; got != 200 {
+		t.Fatalf("capped limit=%d; want 200", got)
+	}
+}
+
+// TestHandler_IncludeToolOutputsParam seeds one turn that matches in its
+// prose and one that matches only in its tool-call payload, then checks
+// that the default search returns the prose hit alone and that
+// include_tool_outputs=1 returns both.
+func TestHandler_IncludeToolOutputsParam(t *testing.T) {
+	h, repo := newHandler(t)
+	db := repo.Database.DB
+	seedSession(t, db, "alice", "cc", "phoebe", "s1", 100, 110)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s1", "prose", 1, 101, "user", "needle in prose", nil)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s1", "tool", 2, 102, "assistant", "plain text", strptr(`{"output":"needle in shell"}`))
+
+	router := mountWithIdentity(h, auth.Identity{User: "alice"})
+
+	// Without the flag only turn text is searched.
+	rec, body := doSearch(t, router, "?q=needle")
+	if rec.Code != http.StatusOK {
+		t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
+	}
+	proseOnly := len(body.Results) == 1 && body.Results[0].MatchSource == search.SourceTurn
+	if !proseOnly {
+		t.Fatalf("expected 1 prose result; got %#v", body.Results)
+	}
+
+	// With the flag tool outputs are searched too.
+	rec, body = doSearch(t, router, "?q=needle&include_tool_outputs=1")
+	if rec.Code != http.StatusOK {
+		t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
+	}
+	if n := len(body.Results); n != 2 {
+		t.Fatalf("expected 2 results; got %#v", body.Results)
+	}
+}
+
+// TestHandler_PerUserIsolation seeds matching turns for alice and bob and
+// checks that alice, searching without ?owner=, sees only her own row.
+func TestHandler_PerUserIsolation(t *testing.T) {
+	h, repo := newHandler(t)
+	db := repo.Database.DB
+
+	seedSession(t, db, "alice", "cc", "phoebe", "sa", 100, 110)
+	seedTurn(t, db, "alice", "cc", "phoebe", "sa", "t1", 1, 101, "user", "alice needle", nil)
+
+	seedSession(t, db, "bob", "cc", "phoebe", "sb", 100, 110)
+	seedTurn(t, db, "bob", "cc", "phoebe", "sb", "t1", 1, 101, "user", "bob needle", nil)
+
+	rec, body := doSearch(t, mountWithIdentity(h, auth.Identity{User: "alice"}), "?q=needle")
+	if rec.Code != http.StatusOK {
+		t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
+	}
+	onlyAlice := len(body.Results) == 1 && body.Results[0].Owner == "alice"
+	if !onlyAlice {
+		t.Fatalf("alice should see only her row; got %#v", body.Results)
+	}
+}
A internal/domain/search/repository.go => internal/domain/search/repository.go +274 -0
@@ 0,0 1,274 @@
+package search
+
+import (
+ "context"
+ "crypto/sha256"
+ "database/sql"
+ "encoding/base64"
+ "encoding/hex"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "go.bigb.es/auxilia/culpa"
+
+ "sourcecraft.dev/bigbes/lethe/internal/domain/session"
+ "sourcecraft.dev/bigbes/lethe/internal/platform/database"
+)
+
+const (
+ // SourceTurn is the match_source value for hits found in turns_fts.
+ SourceTurn = "turn"
+ // SourceToolOutput is the match_source value for hits found in
+ // tool_outputs_fts.
+ SourceToolOutput = "tool_output"
+
+ // cursorVersion is stamped into every encoded cursor; DecodeCursor rejects
+ // cursors carrying any other version.
+ cursorVersion = 1
+)
+
+// Result is the repository response shape for paginated search output.
+// Only Results, Limit and NextCursor are serialized to JSON.
+type Result struct {
+ // Results is the page of matched rows, holding at most Limit entries.
+ Results []Row `json:"results"`
+ // Limit echoes the Filter.Limit the page was built with.
+ Limit int `json:"limit"`
+ // NextCursor is set only when Search fetched more rows than Limit; it is
+ // omitted from JSON when empty.
+ NextCursor string `json:"next_cursor,omitempty"`
+
+ // Rows mirrors Results and is excluded from JSON output.
+ Rows []Row `json:"-"`
+}
+
+// Row is one matched turn. Snippet is generated by SQLite FTS5 and uses marker
+// runes around hits; it never contains HTML markup added by the repository.
+type Row struct {
+ // Owner, Tool, Host and SessionID are read from the matched turns row.
+ Owner string `db:"owner" json:"owner"`
+ Tool string `db:"tool" json:"tool"`
+ Host string `db:"host" json:"host"`
+ SessionID string `db:"session_id" json:"session_id"`
+ // WorkingDir comes from the joined sessions row and may be NULL.
+ WorkingDir *string `db:"working_dir" json:"working_dir,omitempty"`
+ TurnID string `db:"turn_id" json:"turn_id"`
+ Seq int64 `db:"seq" json:"seq"`
+ Role string `db:"role" json:"role"`
+ Timestamp int64 `db:"timestamp" json:"timestamp"`
+ // Rank is the bm25() score of the winning match; results are ordered by
+ // ascending rank.
+ Rank float64 `db:"rank" json:"rank"`
+ // MatchSource is SourceTurn or SourceToolOutput, naming the FTS table the
+ // winning match came from.
+ MatchSource string `db:"match_source" json:"match_source"`
+ // Snippet wraps each hit in char(2) ... char(3) marker runes.
+ Snippet string `db:"snippet" json:"snippet"`
+ // RowID is the turns rowid; it feeds the pagination cursor and is not
+ // serialized.
+ RowID int64 `db:"rowid" json:"-"`
+}
+
+// Filter controls query text, owner scoping, optional filters, and pagination.
+type Filter struct {
+ // Owner selects whose rows are searched: every owner when AllOwners is
+ // set, otherwise SpecificOwner when non-nil, otherwise User.
+ Owner session.OwnerScope
+ // Query is passed to FTS5 MATCH; a blank query is rejected as INVALID.
+ Query string
+ // IncludeToolOutputs additionally searches tool_outputs_fts.
+ IncludeToolOutputs bool
+ // Tool and Host, when non-nil, restrict matches by equality.
+ Tool *string
+ Host *string
+ // Since and Until, when non-nil, are inclusive bounds on the turn
+ // timestamp.
+ Since *int64
+ Until *int64
+ // Limit is the page size; Search returns an empty result when it is <= 0.
+ Limit int
+ // Cursor is an opaque value previously returned as Result.NextCursor, or
+ // empty for the first page.
+ Cursor string
+}
+
+// Cursor is the stable keyset position for the last row returned by a page.
+// Its fields match the columns Search orders by, in the same order.
+type Cursor struct {
+ Rank float64 `json:"rank"`
+ Timestamp int64 `json:"timestamp"`
+ TurnID string `json:"turn_id"`
+ MatchSource string `json:"match_source"`
+ // RowID is the final tie-breaker; DecodeCursor rejects values <= 0.
+ RowID int64 `json:"rowid,omitempty"`
+}
+
+// Repository is the SQL steward for FTS search. It is read-only: Search only
+// builds SELECT statements over the existing FTS and turns tables.
+type Repository struct {
+ // Database provides the DB handle that Search runs its query on; the field
+ // carries an `inject` struct tag.
+ Database *database.Database `inject:""`
+}
+
+// Init performs no work and always returns nil.
+func (r *Repository) Init(_ context.Context) error {
+	return nil
+}
+
+// Search runs the FTS5 query described by f. Prose turns are always searched;
+// tool outputs are unioned in only when f.IncludeToolOutputs is set. A turn
+// that matches in both places is reported once, using its best-ranked hit.
+//
+// A blank query, an undecodable or mismatched cursor, and an FTS syntax error
+// all yield an error coded "INVALID"; any other database failure is coded
+// "DB_QUERY". A non-positive f.Limit returns an empty page without querying.
+func (r *Repository) Search(ctx context.Context, f Filter) (*Result, error) {
+	if strings.TrimSpace(f.Query) == "" {
+		return nil, invalid("search query is empty")
+	}
+	if f.Limit <= 0 {
+		return &Result{Results: []Row{}, Limit: f.Limit, Rows: []Row{}}, nil
+	}
+
+	var after *Cursor
+	if f.Cursor != "" {
+		decoded, err := DecodeCursor(f.Cursor, f)
+		if err != nil {
+			return nil, err
+		}
+		after = &decoded
+	}
+
+	stmt, params := buildSearchSQL(f, after)
+	hits := []Row{}
+	err := r.Database.DB.SelectContext(ctx, &hits, stmt, params...)
+	switch {
+	case err == nil:
+		// fall through to paging below
+	case isFTSError(err):
+		return nil, invalid("invalid search query")
+	default:
+		return nil, culpa.WithCode(culpa.Wrap(err, "search"), "DB_QUERY")
+	}
+
+	if len(hits) <= f.Limit {
+		return &Result{Results: hits, Limit: f.Limit, Rows: hits}, nil
+	}
+
+	// The statement asks for one row beyond the limit; its presence means a
+	// further page exists, so trim it and hand back a cursor for the last
+	// row that is actually returned.
+	page := hits[:f.Limit]
+	tail := page[len(page)-1]
+	token, err := EncodeCursor(Cursor{
+		Rank:        tail.Rank,
+		Timestamp:   tail.Timestamp,
+		TurnID:      tail.TurnID,
+		MatchSource: tail.MatchSource,
+		RowID:       tail.RowID,
+	}, f)
+	if err != nil {
+		return nil, err
+	}
+	return &Result{Results: page, Limit: f.Limit, NextCursor: token, Rows: page}, nil
+}
+
+// buildSearchSQL assembles the SELECT statement and its positional arguments
+// for one page of search results.
+//
+// The statement has three stages:
+//   - candidates: FTS hits from turns_fts, plus tool_outputs_fts when
+//     f.IncludeToolOutputs is set;
+//   - ranked: one winner per rowid, chosen by lowest bm25 rank and, on a rank
+//     tie, by source_priority so a prose hit wins over a tool-output hit;
+//   - the outer query: joins turns and sessions, applies the Since/Until
+//     bounds and the keyset predicate derived from after, and orders the page.
+//
+// The statement requests f.Limit+1 rows so the caller can detect whether a
+// further page exists. Arguments are appended in placeholder order.
+func buildSearchSQL(f Filter, after *Cursor) (string, []any) {
+	limit := f.Limit + 1
+	var sb strings.Builder
+	// Worst case: 2 FTS subqueries x (query, owner, tool, host) + since +
+	// until + 15 cursor arguments + limit.
+	args := make([]any, 0, 26)
+
+	sb.WriteString(`WITH candidates AS (`)
+	appendFTSSelect(&sb, &args, "turns_fts", SourceTurn, 0, f)
+	if f.IncludeToolOutputs {
+		sb.WriteString(` UNION ALL `)
+		appendFTSSelect(&sb, &args, "tool_outputs_fts", SourceToolOutput, 1, f)
+	}
+	sb.WriteString(`), ranked AS (`)
+	// source_priority (0 = turn, 1 = tool output) settles equal ranks; sorting
+	// on the match_source string would prefer 'tool_output' over 'turn'.
+	sb.WriteString(`SELECT *, ROW_NUMBER() OVER (PARTITION BY rowid ORDER BY rank ASC, source_priority ASC, rowid ASC) AS rn FROM candidates`)
+	sb.WriteString(`) SELECT t.owner, t.tool, t.host, t.session_id, s.working_dir, t.turn_id, t.seq, t.role, t.timestamp, ranked.rank, ranked.match_source, ranked.snippet, ranked.rowid`)
+	sb.WriteString(` FROM ranked JOIN turns t ON t.rowid = ranked.rowid`)
+	sb.WriteString(` JOIN sessions s ON s.owner = t.owner AND s.tool = t.tool AND s.host = t.host AND s.session_id = t.session_id`)
+	sb.WriteString(` WHERE ranked.rn = 1`)
+	if f.Since != nil {
+		sb.WriteString(` AND t.timestamp >= ?`)
+		args = append(args, *f.Since)
+	}
+	if f.Until != nil {
+		sb.WriteString(` AND t.timestamp <= ?`)
+		args = append(args, *f.Until)
+	}
+	if after != nil {
+		// Keyset predicate mirroring the ORDER BY below: each alternative pins
+		// the leading sort keys to the cursor and advances the next one in its
+		// sort direction (timestamp is DESC, hence "<").
+		sb.WriteString(` AND (`)
+		sb.WriteString(`ranked.rank > ?`)
+		sb.WriteString(` OR (ranked.rank = ? AND t.timestamp < ?)`)
+		sb.WriteString(` OR (ranked.rank = ? AND t.timestamp = ? AND t.turn_id > ?)`)
+		sb.WriteString(` OR (ranked.rank = ? AND t.timestamp = ? AND t.turn_id = ? AND ranked.match_source > ?)`)
+		sb.WriteString(` OR (ranked.rank = ? AND t.timestamp = ? AND t.turn_id = ? AND ranked.match_source = ? AND ranked.rowid > ?)`)
+		sb.WriteString(`)`)
+		args = append(args,
+			after.Rank,
+			after.Rank, after.Timestamp,
+			after.Rank, after.Timestamp, after.TurnID,
+			after.Rank, after.Timestamp, after.TurnID, after.MatchSource,
+			after.Rank, after.Timestamp, after.TurnID, after.MatchSource, after.RowID,
+		)
+	}
+	sb.WriteString(` ORDER BY ranked.rank ASC, t.timestamp DESC, t.turn_id ASC, ranked.match_source ASC, ranked.rowid ASC LIMIT ?`)
+	args = append(args, limit)
+	return sb.String(), args
+}
+
+func appendFTSSelect(sb *strings.Builder, args *[]any, table, source string, priority int, f Filter) {
+ fmt.Fprintf(sb, `SELECT rowid, %d AS source_priority, '%s' AS match_source, bm25(%s) AS rank, snippet(%s, 0, char(2), char(3), '…', 12) AS snippet FROM %s WHERE %s MATCH ?`, priority, source, table, table, table, table)
+ *args = append(*args, f.Query)
+ appendFTSFilters(sb, args, table, f)
+}
+
+func appendFTSFilters(sb *strings.Builder, args *[]any, table string, f Filter) {
+ switch {
+ case f.Owner.AllOwners:
+ case f.Owner.SpecificOwner != nil:
+ fmt.Fprintf(sb, ` AND %s.owner = ?`, table)
+ *args = append(*args, *f.Owner.SpecificOwner)
+ default:
+ fmt.Fprintf(sb, ` AND %s.owner = ?`, table)
+ *args = append(*args, f.Owner.User)
+ }
+ if f.Tool != nil {
+ fmt.Fprintf(sb, ` AND %s.tool = ?`, table)
+ *args = append(*args, *f.Tool)
+ }
+ if f.Host != nil {
+ fmt.Fprintf(sb, ` AND %s.host = ?`, table)
+ *args = append(*args, *f.Host)
+ }
+}
+
+// EncodeCursor serializes c together with the cursor version and a hash of
+// f's normalized search tuple, returning the JSON as unpadded URL-safe base64.
+// A marshalling failure is reported with code "INTERNAL".
+func EncodeCursor(c Cursor, f Filter) (string, error) {
+	raw, err := json.Marshal(cursorPayload{
+		Version:    cursorVersion,
+		FilterHash: filterHash(f),
+		Cursor:     c,
+	})
+	if err != nil {
+		return "", culpa.WithCode(culpa.Wrap(err, "encode search cursor"), "INTERNAL")
+	}
+	return base64.RawURLEncoding.EncodeToString(raw), nil
+}
+
+// DecodeCursor reverses EncodeCursor. Every failure is reported as the same
+// "INVALID" error: undecodable base64, unparsable JSON, a different cursor
+// version, a filter hash that does not match f, or a position missing its
+// turn id, match source or positive rowid.
+func DecodeCursor(raw string, f Filter) (Cursor, error) {
+	reject := func() (Cursor, error) {
+		return Cursor{}, invalid("invalid search cursor")
+	}
+
+	decoded, err := base64.RawURLEncoding.DecodeString(raw)
+	if err != nil {
+		return reject()
+	}
+	var payload cursorPayload
+	if json.Unmarshal(decoded, &payload) != nil {
+		return reject()
+	}
+
+	switch {
+	case payload.Version != cursorVersion,
+		payload.FilterHash != filterHash(f),
+		payload.Cursor.TurnID == "",
+		payload.Cursor.MatchSource == "",
+		payload.Cursor.RowID <= 0:
+		return reject()
+	}
+	return payload.Cursor, nil
+}
+
+// cursorPayload is the JSON document carried inside an encoded cursor.
+type cursorPayload struct {
+ // Version must equal cursorVersion for the cursor to be accepted.
+ Version int `json:"v"`
+ // FilterHash is the filterHash of the Filter the cursor was issued for.
+ FilterHash string `json:"h"`
+ // Cursor is the keyset position itself.
+ Cursor Cursor `json:"c"`
+}
+
+// normalizedFilter is the subset of Filter that filterHash serializes and
+// hashes. Limit and Cursor are not part of it, so a cursor stays valid when
+// only the page size changes.
+type normalizedFilter struct {
+ OwnerUser string `json:"owner_user"`
+ OwnerAllOwners bool `json:"owner_all_owners"`
+ OwnerSpecificOwner *string `json:"owner_specific_owner"`
+ // Query holds the whitespace-trimmed query text.
+ Query string `json:"query"`
+ IncludeToolOutputs bool `json:"include_tool_outputs"`
+ Tool *string `json:"tool"`
+ Host *string `json:"host"`
+ Since *int64 `json:"since"`
+ Until *int64 `json:"until"`
+}
+
+// filterHash returns the hex-encoded SHA-256 of the JSON form of f's
+// normalized search tuple (owner scope, trimmed query, tool-output flag and
+// the optional tool/host/since/until filters). It panics if that JSON cannot
+// be produced.
+func filterHash(f Filter) string {
+	encoded, err := json.Marshal(normalizedFilter{
+		OwnerUser:          f.Owner.User,
+		OwnerAllOwners:     f.Owner.AllOwners,
+		OwnerSpecificOwner: f.Owner.SpecificOwner,
+		Query:              strings.TrimSpace(f.Query),
+		IncludeToolOutputs: f.IncludeToolOutputs,
+		Tool:               f.Tool,
+		Host:               f.Host,
+		Since:              f.Since,
+		Until:              f.Until,
+	})
+	if err != nil {
+		panic(err)
+	}
+	digest := sha256.Sum256(encoded)
+	return hex.EncodeToString(digest[:])
+}
+
+// invalid builds a new error from msg and tags it with the "INVALID" code.
+func invalid(msg string) error {
+	base := culpa.New(msg)
+	return culpa.WithCode(base, "INVALID")
+}
+
+// isFTSError reports whether err looks like an FTS5 query-syntax failure. It
+// classifies by case-insensitive substring match on the error text, so a nil
+// error and sql.ErrNoRows are never treated as FTS errors.
+func isFTSError(err error) bool {
+	// Guard nil first: calling Error() on a nil error would panic.
+	if err == nil || errors.Is(err, sql.ErrNoRows) {
+		return false
+	}
+	msg := strings.ToLower(err.Error())
+	for _, marker := range []string{"fts5", "fts syntax", "malformed match", "unterminated string"} {
+		if strings.Contains(msg, marker) {
+			return true
+		}
+	}
+	return false
+}
A internal/domain/search/repository_test.go => internal/domain/search/repository_test.go +262 -0
@@ 0,0 1,262 @@
+package search_test
+
+import (
+ "context"
+ "encoding/json"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/jmoiron/sqlx"
+ "go.bigb.es/auxilia/culpa"
+
+ "sourcecraft.dev/bigbes/lethe/internal/config"
+ "sourcecraft.dev/bigbes/lethe/internal/domain/search"
+ "sourcecraft.dev/bigbes/lethe/internal/domain/session"
+ "sourcecraft.dev/bigbes/lethe/internal/platform/database"
+)
+
+// newRepo opens an in-memory database, registers its teardown with t.Cleanup,
+// and returns an initialised search repository together with the raw DB
+// handle used for seeding.
+func newRepo(t *testing.T) (*search.Repository, *sqlx.DB) {
+	t.Helper()
+	ctx := context.Background()
+
+	cfg := config.DatabaseConfig{Path: ":memory:", BusyTimeout: 5 * time.Second}
+	store := &database.Database{Cfg: cfg}
+	if err := store.Init(ctx); err != nil {
+		t.Fatalf("database.Init: %v", err)
+	}
+	t.Cleanup(func() {
+		_ = store.Destroy(context.Background())
+	})
+
+	repo := &search.Repository{Database: store}
+	if err := repo.Init(ctx); err != nil {
+		t.Fatalf("repo.Init: %v", err)
+	}
+	return repo, store.DB
+}
+
+func seedSession(t *testing.T, db *sqlx.DB, owner, tool, host, sid string, startedAt, endedAt int64) {
+ t.Helper()
+ seedSessionWithCwd(t, db, owner, tool, host, sid, startedAt, endedAt, nil)
+}
+
+func seedSessionWithCwd(t *testing.T, db *sqlx.DB, owner, tool, host, sid string, startedAt, endedAt int64, cwd *string) {
+ t.Helper()
+ _, err := db.Exec(`INSERT INTO sessions (owner, tool, host, session_id, started_at, ended_at, working_dir, source_file, metadata)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL)`, owner, tool, host, sid, startedAt, endedAt, cwd, "/tmp/search.jsonl")
+ if err != nil {
+ t.Fatalf("seed session %s/%s/%s/%s: %v", owner, tool, host, sid, err)
+ }
+}
+
+func seedTurn(t *testing.T, db *sqlx.DB, owner, tool, host, sid, tid string, seq, ts int64, role, content string, toolCalls *string) {
+ t.Helper()
+ _, err := db.Exec(`INSERT INTO turns (owner, tool, host, session_id, turn_id, seq, role, timestamp, content,
+ model, tokens_in, tokens_out, cost_usd, tool_calls, metadata)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL, NULL, ?, NULL)`,
+ owner, tool, host, sid, tid, seq, role, ts, content, toolCalls)
+ if err != nil {
+ t.Fatalf("seed turn %s/%s: %v", sid, tid, err)
+ }
+}
+
+// strptr returns a pointer to a fresh copy of v.
+func strptr(v string) *string {
+	p := new(string)
+	*p = v
+	return p
+}
+
+// TestSearchScopesToOwner seeds the same search term for two owners and checks
+// that a search scoped to alice returns only her session.
+func TestSearchScopesToOwner(t *testing.T) {
+	repo, db := newRepo(t)
+	for _, o := range []struct{ name, sid string }{{"alice", "a"}, {"bob", "b"}} {
+		seedSession(t, db, o.name, "cc", "phoebe", o.sid, 100, 110)
+		seedTurn(t, db, o.name, "cc", "phoebe", o.sid, "t1", 1, 101, "user", "needle belongs to "+o.name, nil)
+	}
+
+	filter := search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", Limit: 10}
+	res, err := repo.Search(context.Background(), filter)
+	if err != nil {
+		t.Fatalf("Search: %v", err)
+	}
+	hits := res.Results
+	if len(hits) != 1 || hits[0].Owner != "alice" || hits[0].SessionID != "a" {
+		t.Fatalf("expected alice/a only; got %#v", hits)
+	}
+}
+
+func TestSearchAppliesToolHostAndTimeFilters(t *testing.T) {
+ repo, db := newRepo(t)
+ for _, row := range []struct {
+ tool, host, sid string
+ ts int64
+ }{{"cc", "phoebe", "keep", 200}, {"gemini", "phoebe", "badtool", 200}, {"cc", "rhea", "badhost", 200}, {"cc", "phoebe", "early", 100}, {"cc", "phoebe", "late", 300}} {
+ seedSession(t, db, "alice", row.tool, row.host, row.sid, row.ts-1, row.ts+1)
+ seedTurn(t, db, "alice", row.tool, row.host, row.sid, "t1", 1, row.ts, "user", "needle", nil)
+ }
+ tool, host := "cc", "phoebe"
+ since, until := int64(150), int64(250)
+ got, err := repo.Search(context.Background(), search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", Tool: &tool, Host: &host, Since: &since, Until: &until, Limit: 10})
+ if err != nil {
+ t.Fatalf("Search: %v", err)
+ }
+ if len(got.Results) != 1 || got.Results[0].SessionID != "keep" {
+ t.Fatalf("expected keep only; got %#v", got.Results)
+ }
+}
+
+// TestSearchResultShapeIncludesWorkingDirRankMatchSourceAndEnvelope checks the
+// populated fields of a single hit and the JSON keys of the result envelope.
+func TestSearchResultShapeIncludesWorkingDirRankMatchSourceAndEnvelope(t *testing.T) {
+	repo, db := newRepo(t)
+	cwd := "/code/project"
+	seedSessionWithCwd(t, db, "alice", "cc", "phoebe", "s", 100, 110, &cwd)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s", "t1", 1, 101, "user", "needle", nil)
+
+	filter := search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", Limit: 10}
+	got, err := repo.Search(context.Background(), filter)
+	if err != nil {
+		t.Fatalf("Search: %v", err)
+	}
+	if got.Limit != 10 || len(got.Results) != 1 {
+		t.Fatalf("bad envelope: %#v", got)
+	}
+
+	hit := got.Results[0]
+	if hit.WorkingDir == nil || *hit.WorkingDir != cwd {
+		t.Fatalf("WorkingDir = %#v; want %q", hit.WorkingDir, cwd)
+	}
+	if hit.Rank == 0 {
+		t.Fatalf("Rank was not populated: %#v", hit)
+	}
+	if hit.MatchSource != search.SourceTurn {
+		t.Fatalf("MatchSource = %q; want %q", hit.MatchSource, search.SourceTurn)
+	}
+
+	encoded, err := json.Marshal(got)
+	if err != nil {
+		t.Fatalf("Marshal: %v", err)
+	}
+	js := string(encoded)
+	// Keys that must appear in the envelope.
+	for _, required := range []string{`"results"`, `"limit":10`} {
+		if !strings.Contains(js, required) {
+			t.Fatalf("JSON envelope incompatible: %s", js)
+		}
+	}
+	// Keys that must not leak into the envelope.
+	for _, banned := range []string{`"rows"`, `"source"`} {
+		if strings.Contains(js, banned) {
+			t.Fatalf("JSON envelope incompatible: %s", js)
+		}
+	}
+}
+
+// TestSearchOrdersByRankBeforeTimestamp seeds a newer turn with a sparse match
+// and an older turn with a dense match, and expects the older, better-ranked
+// turn to be listed first.
+func TestSearchOrdersByRankBeforeTimestamp(t *testing.T) {
+	repo, db := newRepo(t)
+	seedSession(t, db, "alice", "cc", "phoebe", "s", 100, 500)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s", "newer-weaker", 1, 400, "user", "needle filler filler filler filler filler filler filler", nil)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s", "older-stronger", 2, 200, "user", "needle needle needle", nil)
+
+	filter := search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", Limit: 10}
+	res, err := repo.Search(context.Background(), filter)
+	if err != nil {
+		t.Fatalf("Search: %v", err)
+	}
+	hits := res.Results
+	if len(hits) < 2 {
+		t.Fatalf("need two results; got %#v", hits)
+	}
+	first, second := hits[0], hits[1]
+	if first.TurnID != "older-stronger" {
+		t.Fatalf("rank ordering ignored: got %#v", hits)
+	}
+	if first.Rank > second.Rank {
+		t.Fatalf("rank should be ascending (lower better): %#v", hits)
+	}
+}
+
+// TestSearchDefaultsToProseAndToolOutputIsOptIn checks that a tool-output-only
+// match is hidden by default and returned once IncludeToolOutputs is set.
+func TestSearchDefaultsToProseAndToolOutputIsOptIn(t *testing.T) {
+	repo, db := newRepo(t)
+	seedSession(t, db, "alice", "cc", "phoebe", "s", 100, 110)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s", "prose", 1, 101, "user", "needle in prose", nil)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s", "tool", 2, 102, "assistant", "plain text", strptr(`{"output":"needle in shell"}`))
+
+	ctx := context.Background()
+	filter := search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", Limit: 10}
+
+	proseOnly, err := repo.Search(ctx, filter)
+	if err != nil {
+		t.Fatalf("Search proseOnly: %v", err)
+	}
+	hits := proseOnly.Results
+	if len(hits) != 1 || hits[0].TurnID != "prose" || hits[0].MatchSource != search.SourceTurn {
+		t.Fatalf("expected only prose row; got %#v", hits)
+	}
+
+	filter.IncludeToolOutputs = true
+	withTools, err := repo.Search(ctx, filter)
+	if err != nil {
+		t.Fatalf("Search withTools: %v", err)
+	}
+	if n := len(withTools.Results); n != 2 {
+		t.Fatalf("expected prose + tool rows; got %#v", withTools.Results)
+	}
+}
+
+// TestSearchDedupesTurnWithBetterRankedMatch seeds one turn that matches in
+// both its prose and its tool output, and expects a single row attributed to
+// the tool output.
+func TestSearchDedupesTurnWithBetterRankedMatch(t *testing.T) {
+	repo, db := newRepo(t)
+	seedSession(t, db, "alice", "cc", "phoebe", "s", 100, 110)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s", "t1", 1, 101, "assistant", "needle filler filler filler filler filler filler filler", strptr(`{"output":"needle needle needle"}`))
+
+	filter := search.Filter{
+		Owner:              session.OwnerScope{User: "alice"},
+		Query:              "needle",
+		IncludeToolOutputs: true,
+		Limit:              10,
+	}
+	res, err := repo.Search(context.Background(), filter)
+	if err != nil {
+		t.Fatalf("Search: %v", err)
+	}
+	hits := res.Results
+	if len(hits) != 1 || hits[0].TurnID != "t1" || hits[0].MatchSource != search.SourceToolOutput {
+		t.Fatalf("expected one better-ranked tool row; got %#v", hits)
+	}
+}
+
+// TestSearchCursorUsesRankTimestampTurnIDAndMatchSourceTieBreakers pages
+// through three matches two at a time. It checks the order of both pages, that
+// the cursor records the last row of page one, and that the cursor is rejected
+// when replayed against a different query.
+func TestSearchCursorUsesRankTimestampTurnIDAndMatchSourceTieBreakers(t *testing.T) {
+	repo, db := newRepo(t)
+	seedSession(t, db, "alice", "cc", "phoebe", "s", 100, 110)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s", "a", 1, 101, "user", "needle", nil)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s", "b", 2, 101, "user", "needle", nil)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s", "c", 3, 100, "assistant", "plain", strptr(`{"output":"needle"}`))
+
+	ctx := context.Background()
+	base := search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", IncludeToolOutputs: true}
+
+	firstFilter := base
+	firstFilter.Limit = 2
+	page1, err := repo.Search(ctx, firstFilter)
+	if err != nil {
+		t.Fatalf("Search page1: %v", err)
+	}
+	if len(page1.Results) != 2 || page1.Results[0].TurnID != "a" || page1.Results[1].TurnID != "b" || page1.NextCursor == "" {
+		t.Fatalf("bad page1: %#v", page1)
+	}
+
+	// The page size is not part of the cursor binding, so a different Limit
+	// must still decode.
+	decodeFilter := base
+	decodeFilter.Limit = 99
+	cur, err := search.DecodeCursor(page1.NextCursor, decodeFilter)
+	if err != nil {
+		t.Fatalf("DecodeCursor: %v", err)
+	}
+	lastRow := page1.Results[1]
+	if cur.Rank != lastRow.Rank || cur.Timestamp != lastRow.Timestamp || cur.TurnID != "b" || cur.MatchSource != search.SourceTurn {
+		t.Fatalf("cursor did not preserve planned tie-breakers: %#v vs row %#v", cur, lastRow)
+	}
+
+	secondFilter := firstFilter
+	secondFilter.Cursor = page1.NextCursor
+	page2, err := repo.Search(ctx, secondFilter)
+	if err != nil {
+		t.Fatalf("Search page2: %v", err)
+	}
+	if len(page2.Results) != 1 || page2.Results[0].TurnID != "c" || page2.Results[0].MatchSource != search.SourceToolOutput || page2.NextCursor != "" {
+		t.Fatalf("bad page2: %#v", page2)
+	}
+
+	foreign := base
+	foreign.Query = "other"
+	foreign.Limit = 1
+	foreign.Cursor = page1.NextCursor
+	_, err = repo.Search(ctx, foreign)
+	if code := codeOf(err); code != "INVALID" {
+		t.Fatalf("expected INVALID for filter-bound cursor, got %q (%v)", code, err)
+	}
+}
+
+// TestDecodeCursorRejectsMalformedInput feeds DecodeCursor one input per
+// rejection stage and expects the INVALID code for each.
+//
+// "not-base64" consists solely of URL-safe base64 characters and has a valid
+// unpadded length, so it only reaches the JSON stage; a separate input with
+// characters outside the alphabet covers the base64 stage.
+func TestDecodeCursorRejectsMalformedInput(t *testing.T) {
+	filter := search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle"}
+	cases := []struct {
+		name string
+		raw  string
+	}{
+		{name: "outside base64 alphabet", raw: "%%% not base64 %%%"},
+		{name: "base64 but not JSON", raw: "not-base64"},
+		// "e30" is the unpadded base64 of "{}": valid JSON, wrong version.
+		{name: "JSON without required fields", raw: "e30"},
+	}
+	for _, tc := range cases {
+		_, err := search.DecodeCursor(tc.raw, filter)
+		if codeOf(err) != "INVALID" {
+			t.Fatalf("%s: expected INVALID, got %q (%v)", tc.name, codeOf(err), err)
+		}
+	}
+}
+
+// TestSearchSnippetUsesMarkerRunes checks that the hit inside the snippet is
+// wrapped in \x02 ... \x03 and that no <mark> tag is present.
+func TestSearchSnippetUsesMarkerRunes(t *testing.T) {
+	repo, db := newRepo(t)
+	seedSession(t, db, "alice", "cc", "phoebe", "s", 100, 110)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s", "t1", 1, 101, "user", "alpha needle omega", nil)
+
+	filter := search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: "needle", Limit: 10}
+	res, err := repo.Search(context.Background(), filter)
+	if err != nil {
+		t.Fatalf("Search: %v", err)
+	}
+	hits := res.Results
+	ok := len(hits) == 1 &&
+		strings.Contains(hits[0].Snippet, "\x02needle\x03") &&
+		!strings.Contains(hits[0].Snippet, "<mark>")
+	if !ok {
+		t.Fatalf("snippet missing marker bytes or contains HTML: %#v", hits)
+	}
+}
+
+// TestSearchInvalidAndEmptyQueriesMapToInvalid expects the INVALID code for an
+// empty query, a whitespace-only query, and an unterminated quoted phrase.
+func TestSearchInvalidAndEmptyQueriesMapToInvalid(t *testing.T) {
+	repo, db := newRepo(t)
+	seedSession(t, db, "alice", "cc", "phoebe", "s", 100, 110)
+	seedTurn(t, db, "alice", "cc", "phoebe", "s", "t1", 1, 101, "user", "alpha", nil)
+
+	ctx := context.Background()
+	badQueries := []string{"", " ", `"unterminated`}
+	for _, q := range badQueries {
+		filter := search.Filter{Owner: session.OwnerScope{User: "alice"}, Query: q, Limit: 10}
+		_, err := repo.Search(ctx, filter)
+		if code := codeOf(err); code != "INVALID" {
+			t.Fatalf("query %q: expected INVALID, got %q (%v)", q, code, err)
+		}
+	}
+}
+
+// codeOf returns the string code found on err via culpa.FindDetail, or ""
+// when no code detail is found or the code is not a string.
+func codeOf(err error) string {
+	var detail culpa.CodeDetail
+	if found := culpa.FindDetail(err, &detail); !found {
+		return ""
+	}
+	code, isString := detail.Code.(string)
+	if !isString {
+		return ""
+	}
+	return code
+}
M internal/server/server.go => internal/server/server.go +8 -5
@@ 32,6 32,7 @@ import (
"sourcecraft.dev/bigbes/lethe/internal/domain/ingest"
"sourcecraft.dev/bigbes/lethe/internal/domain/project"
"sourcecraft.dev/bigbes/lethe/internal/domain/savedsearch"
+ "sourcecraft.dev/bigbes/lethe/internal/domain/search"
"sourcecraft.dev/bigbes/lethe/internal/domain/session"
"sourcecraft.dev/bigbes/lethe/internal/domain/stats"
"sourcecraft.dev/bigbes/lethe/internal/pkg/apierror"
@@ 57,12 58,13 @@ type Server struct {
Metrics *observability.Metrics `inject:""`
Health *health.Set `inject:""`
- Auth *authpkg.Authenticator `inject:""`
- Ingest *ingest.Handler `inject:""`
- Sessions *session.Handler `inject:""`
- Projects *project.Handler `inject:""`
- Stats *stats.Handler `inject:""`
+ Auth *authpkg.Authenticator `inject:""`
+ Ingest *ingest.Handler `inject:""`
+ Sessions *session.Handler `inject:""`
+ Projects *project.Handler `inject:""`
+ Stats *stats.Handler `inject:""`
SavedSearches *savedsearch.Handler `inject:""`
+ Search *search.Handler `inject:""`
router *chi.Mux
httpSrv *http.Server
@@ 107,6 109,7 @@ func (s *Server) Init(_ context.Context) error {
s.Projects.Mount(r)
s.Stats.Mount(r)
s.SavedSearches.Mount(r)
+ s.Search.Mount(r)
})
// SPA catch-all: serves the embedded React app for all non-API GET paths.