package ingest_test

import (
	"context"
	"encoding/json"
	"strings"
	"testing"
	"time"

	"github.com/jmoiron/sqlx"
	_ "modernc.org/sqlite"

	"sourcecraft.dev/bigbes/lethe/internal/config"
	"sourcecraft.dev/bigbes/lethe/internal/domain/ingest"
	"sourcecraft.dev/bigbes/lethe/internal/platform/database"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// newTestDatabase builds a Database steward against :memory:, runs Init, and
// registers a Cleanup that calls Destroy. It returns the *Database because
// Repository expects the steward rather than the raw handle.
func newTestDatabase(t *testing.T) *database.Database {
	t.Helper()
	d := &database.Database{
		Cfg: config.DatabaseConfig{
			Path:        ":memory:",
			BusyTimeout: 5 * time.Second,
		},
	}
	if err := d.Init(context.Background()); err != nil {
		t.Fatalf("database.Init: %v", err)
	}
	t.Cleanup(func() {
		// Best-effort teardown: the database is in-memory, so a failed
		// Destroy leaves nothing behind that a later test could observe.
		_ = d.Destroy(context.Background())
	})
	return d
}

// newRepo wires a Repository against a fresh in-memory database and runs its
// Init. It returns the repository together with the underlying *sqlx.DB so
// tests can inspect the stored rows directly.
func newRepo(t *testing.T) (*ingest.Repository, *sqlx.DB) {
	t.Helper()
	d := newTestDatabase(t)
	repo := &ingest.Repository{Database: d}
	if err := repo.Init(context.Background()); err != nil {
		t.Fatalf("repo.Init: %v", err)
	}
	return repo, d.DB
}

// ptrInt64 returns a pointer to a copy of v, for optional int64 wire fields.
func ptrInt64(v int64) *int64 { return &v }

// ptrString returns a pointer to a copy of v, for optional string wire fields.
func ptrString(v string) *string { return &v }

// ptrFloat64 returns a pointer to a copy of v, for optional float64 wire fields.
func ptrFloat64(v float64) *float64 { return &v }

// turn builds a TurnEvent from the given identifiers with Role "user" and
// SessionMeta.SourceFile "/tmp/x.jsonl". Tests override any other field
// (including the rest of SessionMeta) on the returned value.
func turn(tool, host, sid, tid string, seq int64, ts int64, content string) wire.TurnEvent {
	return wire.TurnEvent{
		Tool:      tool,
		Host:      host,
		SessionID: sid,
		TurnID:    tid,
		Seq:       seq,
		Role:      "user",
		Timestamp: ts,
		Content:   content,
		SessionMeta: wire.SessionMeta{
			SourceFile: "/tmp/x.jsonl",
		},
	}
}

// runUpsert calls repo.UpsertChunk for owner inside a database.InTx
// transaction on db and fails the test immediately if that returns an error.
func runUpsert(t *testing.T, repo *ingest.Repository, db *sqlx.DB, owner string, turns []wire.TurnEvent) {
	t.Helper()
	if err := database.InTx(context.Background(), db, func(tx *sqlx.Tx) error {
		return repo.UpsertChunk(context.Background(), tx, owner, turns)
	}); err != nil {
		t.Fatalf("upsert: %v", err)
	}
}

// TestRepository_UpsertChunk_FirstWriteWinsSession checks that a session's
// working_dir keeps the value from the first upsert ("/A") when a later chunk
// for the same session carries a different one ("/B").
func TestRepository_UpsertChunk_FirstWriteWinsSession(t *testing.T) {
	repo, db := newRepo(t)

	t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "hello")
	t1.SessionMeta.WorkingDir = ptrString("/A")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1})

	t2 := turn("cc", "phoebe", "s1", "tB", 2, 1700000100, "world")
	t2.SessionMeta.WorkingDir = ptrString("/B")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t2})

	var wd string
	if err := db.Get(&wd, `SELECT working_dir FROM sessions WHERE owner=? AND tool=? AND host=? AND session_id=?`, "alice", "cc", "phoebe", "s1"); err != nil {
		t.Fatalf("query: %v", err)
	}
	if wd != "/A" {
		t.Fatalf("expected first-write working_dir /A, got %q", wd)
	}
}

// TestRepository_UpsertChunk_LastWriteWinsTurn checks that re-upserting the
// same turn_id replaces content, role and seq with the values from the later
// write.
func TestRepository_UpsertChunk_LastWriteWinsTurn(t *testing.T) {
	repo, db := newRepo(t)

	tA := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "first content")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{tA})

	// Same turn_id "tA", different payload.
	tB := turn("cc", "phoebe", "s1", "tA", 2, 1700000050, "second content")
	tB.Role = "assistant"
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{tB})

	var content, role string
	var seq int64
	if err := db.QueryRow(`SELECT content, role, seq FROM turns WHERE owner=? AND turn_id=?`, "alice", "tA").Scan(&content, &role, &seq); err != nil {
		t.Fatalf("query: %v", err)
	}
	if content != "second content" || role != "assistant" || seq != 2 {
		t.Fatalf("expected last-write columns; got content=%q role=%q seq=%d", content, role, seq)
	}
}

// TestRepository_UpsertChunk_EndedAtMax checks that sessions.ended_at is not
// lowered by a turn with an earlier timestamp and is raised by a turn with a
// later one.
func TestRepository_UpsertChunk_EndedAtMax(t *testing.T) {
	repo, db := newRepo(t)

	t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000100, "first")
	t1.SessionMeta.StartedAt = ptrInt64(1700000000)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1})

	// Second turn: smaller ended_at than current; MAX must keep first.
	t2 := turn("cc", "phoebe", "s1", "tB", 2, 1699999999, "earlier")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t2})

	var ended int64
	if err := db.Get(&ended, `SELECT ended_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil {
		t.Fatalf("q1: %v", err)
	}
	if ended != 1700000100 {
		t.Fatalf("expected ended_at preserved at 1700000100, got %d", ended)
	}

	// Third: larger ended_at; MAX must extend.
	t3 := turn("cc", "phoebe", "s1", "tC", 3, 1700000500, "later")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t3})

	if err := db.Get(&ended, `SELECT ended_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil {
		t.Fatalf("q2: %v", err)
	}
	if ended != 1700000500 {
		t.Fatalf("expected ended_at extended to 1700000500, got %d", ended)
	}
}

// TestRepository_UpsertChunk_StartedAtFallbackMinTurnTimestamp checks that,
// with SessionMeta.StartedAt unset, sessions.started_at equals the first
// turn's timestamp and is not changed by a later upsert carrying an earlier
// timestamp.
func TestRepository_UpsertChunk_StartedAtFallbackMinTurnTimestamp(t *testing.T) {
	repo, db := newRepo(t)

	// SessionMeta.StartedAt is nil; started_at must come from turn timestamp.
	t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000050, "x")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1})

	var started int64
	if err := db.Get(&started, `SELECT started_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil {
		t.Fatalf("q: %v", err)
	}
	if started != 1700000050 {
		t.Fatalf("expected started_at fallback 1700000050, got %d", started)
	}

	// A subsequent turn with an earlier timestamp must NOT change started_at
	// (first-write-wins on start).
	t2 := turn("cc", "phoebe", "s1", "tB", 2, 1700000010, "earlier")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t2})

	if err := db.Get(&started, `SELECT started_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil {
		t.Fatalf("q2: %v", err)
	}
	if started != 1700000050 {
		t.Fatalf("expected started_at preserved at 1700000050, got %d", started)
	}
}

// TestRepository_UpsertChunk_PerOwnerIsolation checks that upserting the same
// turn under two different owners yields two session rows and two turn rows.
func TestRepository_UpsertChunk_PerOwnerIsolation(t *testing.T) {
	repo, db := newRepo(t)

	tA := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "shared")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{tA})
	runUpsert(t, repo, db, "bob", []wire.TurnEvent{tA})

	var rows int
	if err := db.Get(&rows, `SELECT COUNT(*) FROM sessions WHERE tool='cc' AND host='phoebe' AND session_id='s1'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if rows != 2 {
		t.Fatalf("expected two sessions across owners, got %d", rows)
	}

	if err := db.Get(&rows, `SELECT COUNT(*) FROM turns WHERE turn_id='tA'`); err != nil {
		t.Fatalf("q2: %v", err)
	}
	if rows != 2 {
		t.Fatalf("expected two turns across owners, got %d", rows)
	}
}

// TestRepository_UpsertChunk_FKViolationPropagatesAsLineLevelCode checks that
// inserting a turn whose parent session does not exist is rejected with a
// foreign-key error.
func TestRepository_UpsertChunk_FKViolationPropagatesAsLineLevelCode(t *testing.T) {
	// Repository alone cannot produce an FK violation — sessions are upserted
	// by the same call. We synthesize the violation by inserting a turn
	// referencing a (owner, tool, host, session_id) that does not exist;
	// this proves the FK wrapper code-path is reachable via the Repository's
	// raw exec layer for the Service to classify as line-level.
	d := newTestDatabase(t)

	tx, err := d.DB.BeginTxx(context.Background(), nil)
	if err != nil {
		t.Fatalf("begin: %v", err)
	}
	defer func() { _ = tx.Rollback() }()

	_, err = tx.Exec(`
		INSERT INTO turns (owner, tool, host, session_id, turn_id, seq, role, timestamp, content)
		VALUES ('alice','cc','phoebe','ghost','tX',1,'user',1,'orphan')`)
	if err == nil {
		t.Fatalf("expected FK violation, got nil")
	}
	// Any failure (missing table, NOT NULL, bad column name) would satisfy the
	// nil check above, so also require that the error is the foreign-key one.
	// SQLite reports it as "FOREIGN KEY constraint failed"; the comparison is
	// case-insensitive to tolerate driver-specific prefixes.
	if !strings.Contains(strings.ToUpper(err.Error()), "FOREIGN KEY") {
		t.Fatalf("expected FK violation, got unrelated error: %v", err)
	}
}

// TestRepository_UpsertChunk_OwnerBoundFromParameter checks that the stored
// owner comes from the UpsertChunk owner argument and not from an "owner" key
// present in the decoded JSON payload.
func TestRepository_UpsertChunk_OwnerBoundFromParameter(t *testing.T) {
	// Sneak a non-wire field into the JSON; unmarshalling drops it. Then
	// confirm the stored row uses the parameter, not whatever the wire said.
	repo, db := newRepo(t)

	raw := []byte(`{"owner":"evil","tool":"cc","host":"phoebe","session_id":"s1","turn_id":"tA","seq":1,"role":"user","timestamp":1700000000,"content":"hi","session_meta":{"source_file":"/tmp/x.jsonl"}}`)
	var ev wire.TurnEvent
	if err := json.Unmarshal(raw, &ev); err != nil {
		t.Fatalf("unmarshal: %v", err)
	}
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{ev})

	var owner string
	if err := db.Get(&owner, `SELECT owner FROM turns WHERE turn_id='tA'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if owner != "alice" {
		t.Fatalf("expected stored owner=alice, got %q", owner)
	}

	var n int
	if err := db.Get(&n, `SELECT COUNT(*) FROM turns WHERE owner='evil'`); err != nil {
		t.Fatalf("q evil: %v", err)
	}
	if n != 0 {
		t.Fatalf("expected zero rows for owner=evil, got %d", n)
	}
}

// TestRepository_UpsertChunk_FTSPopulated checks that after an upsert the
// turn's content is matchable in turns_fts and its tool_calls payload is
// matchable in tool_outputs_fts.
func TestRepository_UpsertChunk_FTSPopulated(t *testing.T) {
	repo, db := newRepo(t)

	tc := json.RawMessage(`{"name":"shell","args":"ls"}`)
	t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "find the needle")
	t1.ToolCalls = tc
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1})

	var n int
	if err := db.Get(&n, `SELECT COUNT(*) FROM turns_fts WHERE owner='alice' AND turns_fts MATCH 'needle'`); err != nil {
		t.Fatalf("q turns_fts: %v", err)
	}
	if n != 1 {
		t.Fatalf("expected 1 turns_fts hit, got %d", n)
	}

	if err := db.Get(&n, `SELECT COUNT(*) FROM tool_outputs_fts WHERE owner='alice' AND tool_outputs_fts MATCH 'shell'`); err != nil {
		t.Fatalf("q tool_outputs_fts: %v", err)
	}
	if n != 1 {
		t.Fatalf("expected 1 tool_outputs_fts hit, got %d", n)
	}
}

// TestRepository_UpsertChunk_MetadataPersisted verifies that the turn-level
// and session-level metadata json.RawMessage values round-trip as opaque text
// into their respective rows.
func TestRepository_UpsertChunk_MetadataPersisted(t *testing.T) {
	repo, db := newRepo(t)

	t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "x")
	t1.Metadata = json.RawMessage(`{"k":"v"}`)
	t1.SessionMeta.Metadata = json.RawMessage(`{"sk":"sv"}`)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1})

	var tm, sm string
	if err := db.QueryRow(`SELECT metadata FROM turns WHERE turn_id='tA'`).Scan(&tm); err != nil {
		t.Fatalf("q tm: %v", err)
	}
	if tm != `{"k":"v"}` {
		t.Fatalf("turn metadata roundtrip failed: %q", tm)
	}

	if err := db.QueryRow(`SELECT metadata FROM sessions WHERE session_id='s1'`).Scan(&sm); err != nil {
		t.Fatalf("q sm: %v", err)
	}
	if sm != `{"sk":"sv"}` {
		t.Fatalf("session metadata roundtrip failed: %q", sm)
	}
}

// TestRepository_UpsertChunk_OptionalPointersStored is a defensive check that
// the optional pointer-typed fields (Model, TokensIn, TokensOut, CostUSD) are
// stored and read back with the values they pointed to.
func TestRepository_UpsertChunk_OptionalPointersStored(t *testing.T) {
	repo, db := newRepo(t)

	t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "x")
	t1.Model = ptrString("gpt-x")
	t1.TokensIn = ptrInt64(10)
	t1.TokensOut = ptrInt64(20)
	t1.CostUSD = ptrFloat64(0.12)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1})

	var (
		model     string
		tokensIn  int64
		tokensOut int64
		cost      float64
	)
	if err := db.QueryRow(`SELECT model, tokens_in, tokens_out, cost_usd FROM turns WHERE turn_id='tA'`).
		Scan(&model, &tokensIn, &tokensOut, &cost); err != nil {
		t.Fatalf("q: %v", err)
	}
	if model != "gpt-x" || tokensIn != 10 || tokensOut != 20 || cost != 0.12 {
		t.Fatalf("optional fields not round-tripped: %v %v %v %v", model, tokensIn, tokensOut, cost)
	}
}