package ingest_test
import (
	"context"
	"encoding/json"
	"strings"
	"testing"
	"time"

	"github.com/jmoiron/sqlx"
	_ "modernc.org/sqlite"

	"sourcecraft.dev/bigbes/lethe/internal/config"
	"sourcecraft.dev/bigbes/lethe/internal/domain/ingest"
	"sourcecraft.dev/bigbes/lethe/internal/platform/database"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)
// newTestDatabase constructs a database.Database pointed at ":memory:" with a
// five-second busy timeout, calls Init on it, and schedules Destroy through
// t.Cleanup. The *Database itself is returned (not the raw handle) because
// Repository is wired with it.
func newTestDatabase(t *testing.T) *database.Database {
	t.Helper()

	cfg := config.DatabaseConfig{
		Path:        ":memory:",
		BusyTimeout: 5 * time.Second,
	}
	steward := &database.Database{Cfg: cfg}

	err := steward.Init(context.Background())
	if err != nil {
		t.Fatalf("database.Init: %v", err)
	}

	t.Cleanup(func() {
		// Teardown error is deliberately ignored.
		_ = steward.Destroy(context.Background())
	})
	return steward
}
// newRepo returns an initialised Repository backed by a fresh in-memory
// database, together with that database's *sqlx.DB so tests can query rows
// directly.
func newRepo(t *testing.T) (*ingest.Repository, *sqlx.DB) {
	t.Helper()

	steward := newTestDatabase(t)
	r := &ingest.Repository{Database: steward}

	err := r.Init(context.Background())
	if err != nil {
		t.Fatalf("repo.Init: %v", err)
	}
	return r, steward.DB
}
// ptrInt64 returns a pointer to a fresh copy of v.
func ptrInt64(v int64) *int64 {
	p := new(int64)
	*p = v
	return p
}
// ptrString returns a pointer to a fresh copy of v.
func ptrString(v string) *string {
	p := new(string)
	*p = v
	return p
}
// ptrFloat64 returns a pointer to a fresh copy of v.
func ptrFloat64(v float64) *float64 {
	p := new(float64)
	*p = v
	return p
}
// turn assembles a wire.TurnEvent from the given identity, ordering, and
// content values. Role is fixed to "user" and SessionMeta.SourceFile to
// "/tmp/x.jsonl"; every other field is left at its zero value so callers can
// override what they need on the returned copy.
func turn(tool, host, sid, tid string, seq int64, ts int64, content string) wire.TurnEvent {
	var ev wire.TurnEvent

	ev.Tool = tool
	ev.Host = host
	ev.SessionID = sid
	ev.TurnID = tid
	ev.Seq = seq
	ev.Role = "user"
	ev.Timestamp = ts
	ev.Content = content
	ev.SessionMeta.SourceFile = "/tmp/x.jsonl"

	return ev
}
// runUpsert calls repo.UpsertChunk for owner and turns inside a
// database.InTx transaction on db, failing the test immediately if the
// transaction returns an error.
func runUpsert(t *testing.T, repo *ingest.Repository, db *sqlx.DB, owner string, turns []wire.TurnEvent) {
	t.Helper()

	ctx := context.Background()
	write := func(tx *sqlx.Tx) error {
		return repo.UpsertChunk(ctx, tx, owner, turns)
	}

	err := database.InTx(ctx, db, write)
	if err != nil {
		t.Fatalf("upsert: %v", err)
	}
}
// TestRepository_UpsertChunk_FirstWriteWinsSession upserts two turns of the
// same session that carry different working directories ("/A", then "/B") and
// asserts that sessions.working_dir still holds the value from the first
// upsert.
func TestRepository_UpsertChunk_FirstWriteWinsSession(t *testing.T) {
	repo, db := newRepo(t)

	first := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "hello")
	first.SessionMeta.WorkingDir = ptrString("/A")

	second := turn("cc", "phoebe", "s1", "tB", 2, 1700000100, "world")
	second.SessionMeta.WorkingDir = ptrString("/B")

	// Each event goes in through its own transaction, in order.
	for _, ev := range []wire.TurnEvent{first, second} {
		runUpsert(t, repo, db, "alice", []wire.TurnEvent{ev})
	}

	const query = `SELECT working_dir FROM sessions WHERE owner=? AND tool=? AND host=? AND session_id=?`

	var got string
	err := db.Get(&got, query, "alice", "cc", "phoebe", "s1")
	if err != nil {
		t.Fatalf("query: %v", err)
	}
	if got != "/A" {
		t.Fatalf("expected first-write working_dir /A, got %q", got)
	}
}
// TestRepository_UpsertChunk_LastWriteWinsTurn upserts turn_id "tA" twice with
// different content, role, and seq, and asserts that the turns row carries the
// values from the second upsert.
func TestRepository_UpsertChunk_LastWriteWinsTurn(t *testing.T) {
	repo, db := newRepo(t)

	original := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "first content")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{original})

	// Same turn_id, different payload.
	rewrite := turn("cc", "phoebe", "s1", "tA", 2, 1700000050, "second content")
	rewrite.Role = "assistant"
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{rewrite})

	var (
		gotContent string
		gotRole    string
		gotSeq     int64
	)
	row := db.QueryRow(`SELECT content, role, seq FROM turns WHERE owner=? AND turn_id=?`,
		"alice", "tA")
	if err := row.Scan(&gotContent, &gotRole, &gotSeq); err != nil {
		t.Fatalf("query: %v", err)
	}

	matches := gotContent == "second content" && gotRole == "assistant" && gotSeq == 2
	if !matches {
		t.Fatalf("expected last-write columns; got content=%q role=%q seq=%d", gotContent, gotRole, gotSeq)
	}
}
// TestRepository_UpsertChunk_EndedAtMax checks sessions.ended_at across three
// upserts of the same session: a turn with an older timestamp must leave it at
// 1700000100, and a turn with a newer timestamp must move it to 1700000500.
func TestRepository_UpsertChunk_EndedAtMax(t *testing.T) {
	repo, db := newRepo(t)

	// endedAt reads the current ended_at for alice's session s1.
	endedAt := func() (int64, error) {
		var v int64
		err := db.Get(&v, `SELECT ended_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1")
		return v, err
	}

	opening := turn("cc", "phoebe", "s1", "tA", 1, 1700000100, "first")
	opening.SessionMeta.StartedAt = ptrInt64(1700000000)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{opening})

	// A turn stamped before the stored ended_at must not pull it backwards.
	older := turn("cc", "phoebe", "s1", "tB", 2, 1699999999, "earlier")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{older})

	got, err := endedAt()
	if err != nil {
		t.Fatalf("q1: %v", err)
	}
	if got != 1700000100 {
		t.Fatalf("expected ended_at preserved at 1700000100, got %d", got)
	}

	// A turn stamped after the stored ended_at must push it forwards.
	newer := turn("cc", "phoebe", "s1", "tC", 3, 1700000500, "later")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{newer})

	got, err = endedAt()
	if err != nil {
		t.Fatalf("q2: %v", err)
	}
	if got != 1700000500 {
		t.Fatalf("expected ended_at extended to 1700000500, got %d", got)
	}
}
// TestRepository_UpsertChunk_StartedAtFallbackMinTurnTimestamp upserts a turn
// whose SessionMeta.StartedAt is nil and asserts that sessions.started_at
// equals the turn's timestamp. It then upserts a turn with an earlier
// timestamp and asserts that started_at is unchanged.
func TestRepository_UpsertChunk_StartedAtFallbackMinTurnTimestamp(t *testing.T) {
	repo, db := newRepo(t)

	const query = `SELECT started_at FROM sessions WHERE owner=? AND session_id=?`
	var got int64

	// No explicit StartedAt: the turn timestamp is the only candidate.
	initial := turn("cc", "phoebe", "s1", "tA", 1, 1700000050, "x")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{initial})

	err := db.Get(&got, query, "alice", "s1")
	if err != nil {
		t.Fatalf("q: %v", err)
	}
	if got != 1700000050 {
		t.Fatalf("expected started_at fallback 1700000050, got %d", got)
	}

	// An earlier-stamped follow-up must leave started_at alone: the value
	// recorded by the first write is the one that is kept.
	backdated := turn("cc", "phoebe", "s1", "tB", 2, 1700000010, "earlier")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{backdated})

	err = db.Get(&got, query, "alice", "s1")
	if err != nil {
		t.Fatalf("q2: %v", err)
	}
	if got != 1700000050 {
		t.Fatalf("expected started_at preserved at 1700000050, got %d", got)
	}
}
// TestRepository_UpsertChunk_PerOwnerIsolation upserts the identical turn for
// owners "alice" and "bob" and asserts that two session rows and two turn rows
// exist afterwards, i.e. the second owner's write did not collapse into the
// first owner's rows.
func TestRepository_UpsertChunk_PerOwnerIsolation(t *testing.T) {
	repo, db := newRepo(t)

	shared := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "shared")
	for _, owner := range []string{"alice", "bob"} {
		runUpsert(t, repo, db, owner, []wire.TurnEvent{shared})
	}

	var sessionCount int
	err := db.Get(&sessionCount, `SELECT COUNT(*) FROM sessions WHERE tool='cc' AND host='phoebe' AND session_id='s1'`)
	if err != nil {
		t.Fatalf("q: %v", err)
	}
	if sessionCount != 2 {
		t.Fatalf("expected two sessions across owners, got %d", sessionCount)
	}

	var turnCount int
	err = db.Get(&turnCount, `SELECT COUNT(*) FROM turns WHERE turn_id='tA'`)
	if err != nil {
		t.Fatalf("q2: %v", err)
	}
	if turnCount != 2 {
		t.Fatalf("expected two turns across owners, got %d", turnCount)
	}
}
// TestRepository_UpsertChunk_FKViolationPropagatesAsLineLevelCode inserts a
// turns row whose (owner, tool, host, session_id) has no matching session and
// asserts that the statement is rejected with a foreign-key error.
func TestRepository_UpsertChunk_FKViolationPropagatesAsLineLevelCode(t *testing.T) {
	// Repository alone cannot produce an FK violation — sessions are upserted
	// by the same call. We synthesize the violation by inserting a turn
	// referencing a (owner, tool, host, session_id) that does not exist;
	// this proves the FK wrapper code-path is reachable via the Repository's
	// raw exec layer for the Service to classify as line-level.
	d := newTestDatabase(t)

	tx, err := d.DB.BeginTxx(context.Background(), nil)
	if err != nil {
		t.Fatalf("begin: %v", err)
	}
	// Nothing here is meant to persist; the rollback error is irrelevant.
	defer func() { _ = tx.Rollback() }()

	_, err = tx.Exec(`
INSERT INTO turns (owner, tool, host, session_id, turn_id, seq, role, timestamp, content)
VALUES ('alice','cc','phoebe','ghost','tX',1,'user',1,'orphan')`)
	if err == nil {
		t.Fatalf("expected FK violation, got nil")
	}
	// Any other failure (a missing table, a column typo) would also be
	// non-nil, so a bare nil check lets the test pass without exercising the
	// foreign key at all. Require the error text to name the constraint.
	if !strings.Contains(err.Error(), "FOREIGN KEY") {
		t.Fatalf("expected FK violation, got unrelated error: %v", err)
	}
}
// TestRepository_UpsertChunk_OwnerBoundFromParameter decodes a JSON payload
// that carries an extra "owner":"evil" key into a wire.TurnEvent, upserts it
// with owner "alice", and asserts that the stored turn belongs to alice and
// that no turn exists for owner "evil".
func TestRepository_UpsertChunk_OwnerBoundFromParameter(t *testing.T) {
	repo, db := newRepo(t)

	// The payload tries to smuggle in an owner; only the owner argument
	// passed to the upsert is expected to end up in the stored row.
	const payload = `{"owner":"evil","tool":"cc","host":"phoebe","session_id":"s1","turn_id":"tA","seq":1,"role":"user","timestamp":1700000000,"content":"hi","session_meta":{"source_file":"/tmp/x.jsonl"}}`

	var decoded wire.TurnEvent
	err := json.Unmarshal([]byte(payload), &decoded)
	if err != nil {
		t.Fatalf("unmarshal: %v", err)
	}
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{decoded})

	var storedOwner string
	err = db.Get(&storedOwner, `SELECT owner FROM turns WHERE turn_id='tA'`)
	if err != nil {
		t.Fatalf("q: %v", err)
	}
	if storedOwner != "alice" {
		t.Fatalf("expected stored owner=alice, got %q", storedOwner)
	}

	var evilRows int
	err = db.Get(&evilRows, `SELECT COUNT(*) FROM turns WHERE owner='evil'`)
	if err != nil {
		t.Fatalf("q evil: %v", err)
	}
	if evilRows != 0 {
		t.Fatalf("expected zero rows for owner=evil, got %d", evilRows)
	}
}
// TestRepository_UpsertChunk_FTSPopulated upserts one turn with content
// "find the needle" and a tool_calls payload mentioning "shell", then asserts
// that turns_fts matches 'needle' once and tool_outputs_fts matches 'shell'
// once for owner alice.
func TestRepository_UpsertChunk_FTSPopulated(t *testing.T) {
	repo, db := newRepo(t)

	ev := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "find the needle")
	ev.ToolCalls = json.RawMessage(`{"name":"shell","args":"ls"}`)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{ev})

	var contentHits int
	err := db.Get(&contentHits, `SELECT COUNT(*) FROM turns_fts WHERE owner='alice' AND turns_fts MATCH 'needle'`)
	if err != nil {
		t.Fatalf("q turns_fts: %v", err)
	}
	if contentHits != 1 {
		t.Fatalf("expected 1 turns_fts hit, got %d", contentHits)
	}

	var toolHits int
	err = db.Get(&toolHits, `SELECT COUNT(*) FROM tool_outputs_fts WHERE owner='alice' AND tool_outputs_fts MATCH 'shell'`)
	if err != nil {
		t.Fatalf("q tool_outputs_fts: %v", err)
	}
	if toolHits != 1 {
		t.Fatalf("expected 1 tool_outputs_fts hit, got %d", toolHits)
	}
}
// TestRepository_UpsertChunk_MetadataPersisted sets json.RawMessage metadata on
// both the turn and its SessionMeta, upserts, and asserts that the metadata
// columns of the turns and sessions rows read back as the exact same text.
func TestRepository_UpsertChunk_MetadataPersisted(t *testing.T) {
	repo, db := newRepo(t)

	ev := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "x")
	ev.Metadata = json.RawMessage(`{"k":"v"}`)
	ev.SessionMeta.Metadata = json.RawMessage(`{"sk":"sv"}`)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{ev})

	var turnMeta string
	turnRow := db.QueryRow(`SELECT metadata FROM turns WHERE turn_id='tA'`)
	if err := turnRow.Scan(&turnMeta); err != nil {
		t.Fatalf("q tm: %v", err)
	}
	if turnMeta != `{"k":"v"}` {
		t.Fatalf("turn metadata roundtrip failed: %q", turnMeta)
	}

	var sessionMeta string
	sessionRow := db.QueryRow(`SELECT metadata FROM sessions WHERE session_id='s1'`)
	if err := sessionRow.Scan(&sessionMeta); err != nil {
		t.Fatalf("q sm: %v", err)
	}
	if sessionMeta != `{"sk":"sv"}` {
		t.Fatalf("session metadata roundtrip failed: %q", sessionMeta)
	}
}
// TestRepository_UpsertChunk_OptionalPointersStored populates the optional
// pointer fields of a turn (Model, TokensIn, TokensOut, CostUSD), upserts it,
// and asserts that the model, tokens_in, tokens_out, and cost_usd columns read
// back with the same values.
func TestRepository_UpsertChunk_OptionalPointersStored(t *testing.T) {
	repo, db := newRepo(t)

	ev := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "x")
	ev.Model = ptrString("gpt-x")
	ev.TokensIn = ptrInt64(10)
	ev.TokensOut = ptrInt64(20)
	ev.CostUSD = ptrFloat64(0.12)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{ev})

	var gotModel string
	var gotIn, gotOut int64
	var gotCost float64

	row := db.QueryRow(`SELECT model, tokens_in, tokens_out, cost_usd FROM turns WHERE turn_id='tA'`)
	if err := row.Scan(&gotModel, &gotIn, &gotOut, &gotCost); err != nil {
		t.Fatalf("q: %v", err)
	}

	roundTripped := gotModel == "gpt-x" && gotIn == 10 && gotOut == 20 && gotCost == 0.12
	if !roundTripped {
		t.Fatalf("optional fields not round-tripped: %v %v %v %v", gotModel, gotIn, gotOut, gotCost)
	}
}