package ingest_test
import (
"context"
"fmt"
"strings"
"testing"
"sourcecraft.dev/bigbes/lethe/internal/config"
"sourcecraft.dev/bigbes/lethe/internal/domain/ingest"
"sourcecraft.dev/bigbes/lethe/internal/platform/database"
"sourcecraft.dev/bigbes/lethe/internal/platform/observability"
)
// newService assembles an ingest.Service for a test. It obtains a database from
// newTestDatabase, then initialises, in order, the repository, a JSON/info-level
// logger, the metrics component and finally the service itself. Any Init
// failure aborts the calling test through t.Fatalf.
//
// It returns the initialised service together with the *database.Database it is
// backed by, so tests can issue their own queries against the stored rows.
func newService(t *testing.T, cfg config.IngestConfig) (*ingest.Service, *database.Database) {
	t.Helper()

	ctx := context.Background()
	db := newTestDatabase(t)

	repository := &ingest.Repository{Database: db}
	if err := repository.Init(ctx); err != nil {
		t.Fatalf("repo.Init: %v", err)
	}

	logCfg := config.LoggingConfig{Level: "info", Format: "json"}
	log := &observability.Logger{Cfg: logCfg}
	if err := log.Init(ctx); err != nil {
		t.Fatalf("logger.Init: %v", err)
	}

	meter := &observability.Metrics{}
	if err := meter.Init(ctx); err != nil {
		t.Fatalf("metrics.Init: %v", err)
	}

	service := &ingest.Service{
		Cfg:     cfg,
		Repo:    repository,
		Log:     log,
		Metrics: meter,
	}
	if err := service.Init(ctx); err != nil {
		t.Fatalf("svc.Init: %v", err)
	}

	return service, db
}
// defaultCfg returns the baseline IngestConfig shared by the tests in this
// file: a 1 MiB body cap, a 4096-byte per-turn content cap and a chunk size of
// 10. Tests that need different limits copy the result and override fields.
func defaultCfg() config.IngestConfig {
	var cfg config.IngestConfig
	cfg.MaxBodyBytes = 1 << 20 // 1 MiB
	cfg.MaxTurnContentBytes = 4096
	cfg.ChunkSize = 10
	return cfg
}
// validLine renders one NDJSON record (without a trailing newline) using the
// given turn_id, timestamp and content. Every other field is fixed: tool "cc",
// host "phoebe", session "s1", seq 1, role "user" and a source_file of
// "/tmp/x.jsonl".
//
// turnID and content are quoted with fmt's %q verb, which is Go string
// quoting rather than JSON escaping; the two agree for the plain ASCII values
// used by the tests in this file.
func validLine(turnID string, ts int64, content string) string {
	const lineTemplate = `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":%q,"seq":1,"role":"user","timestamp":%d,"content":%q,"session_meta":{"source_file":"/tmp/x.jsonl"}}`
	return fmt.Sprintf(lineTemplate, turnID, ts, content)
}
// body joins the given lines with "\n", appends a final "\n", and wraps the
// result in a *strings.Reader suitable for passing as a request body. With no
// lines the reader yields a single newline.
func body(lines ...string) *strings.Reader {
	var sb strings.Builder
	for i, ln := range lines {
		if i > 0 {
			sb.WriteByte('\n')
		}
		sb.WriteString(ln)
	}
	sb.WriteByte('\n')
	return strings.NewReader(sb.String())
}
// --- validation matrix ----------------------------------------------------
// TestService_Ingest_RequiredFieldsRejected feeds Ingest a single record that
// either omits one field (tool, host, session_id, turn_id, role, timestamp,
// content, session_meta.source_file) or carries an unknown role. For each
// record it expects a nil error, zero accepted turns, and exactly one
// per-line error pointing at line 1. Every subtest gets its own service.
func TestService_Ingest_RequiredFieldsRejected(t *testing.T) {
	type rejectCase struct {
		label   string
		payload string
	}
	table := []rejectCase{
		{label: "missing tool", payload: `{"host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{label: "missing host", payload: `{"tool":"cc","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{label: "missing session_id", payload: `{"tool":"cc","host":"phoebe","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{label: "missing turn_id", payload: `{"tool":"cc","host":"phoebe","session_id":"s1","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{label: "missing role", payload: `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{label: "missing timestamp", payload: `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","content":"x","session_meta":{"source_file":"/x"}}`},
		{label: "missing content", payload: `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"session_meta":{"source_file":"/x"}}`},
		{label: "missing source_file", payload: `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{}}`},
		{label: "bad role", payload: `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"hacker","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
	}

	for i := range table {
		entry := table[i]
		t.Run(entry.label, func(t *testing.T) {
			svc, _ := newService(t, defaultCfg())
			input := strings.NewReader(entry.payload + "\n")

			res, err := svc.Ingest(context.Background(), "alice", input, 1<<20)
			if err != nil {
				t.Fatalf("Ingest err: %v", err)
			}
			if res.Accepted != 0 {
				t.Errorf("Accepted=%d; want 0", res.Accepted)
			}
			// Exactly one line-level error, and it must reference line 1.
			if !(len(res.Errors) == 1 && res.Errors[0].Line == 1) {
				t.Fatalf("expected one LineError on line 1, got %+v", res.Errors)
			}
		})
	}
}
// TestService_Ingest_ContentOverCapRejected lowers MaxTurnContentBytes to 16
// and submits a turn whose content is 17 bytes long. It expects Ingest to
// return no error while reporting zero accepted turns and one line error.
func TestService_Ingest_ContentOverCapRejected(t *testing.T) {
	limits := defaultCfg()
	limits.MaxTurnContentBytes = 16
	svc, _ := newService(t, limits)

	// One byte over the configured content cap.
	oversized := strings.Repeat("a", 17)
	input := strings.NewReader(validLine("tA", 1700000000, oversized) + "\n")

	res, err := svc.Ingest(context.Background(), "alice", input, 1<<20)
	if err != nil {
		t.Fatalf("Ingest: %v", err)
	}
	if !(res.Accepted == 0 && len(res.Errors) == 1) {
		t.Fatalf("expected 0 accepted + 1 error, got %+v", res)
	}
}
// TestService_Ingest_SourceFileOverCapRejected submits a record whose
// session_meta.source_file is 1025 bytes long and expects it to be rejected:
// nil error from Ingest, zero accepted turns, and exactly one line error.
func TestService_Ingest_SourceFileOverCapRejected(t *testing.T) {
	svc, _ := newService(t, defaultCfg())

	const recordTemplate = `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":%q}}`
	sourceFile := strings.Repeat("p", 1025)
	record := fmt.Sprintf(recordTemplate, sourceFile)

	res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(record+"\n"), 1<<20)
	if err != nil {
		t.Fatalf("Ingest: %v", err)
	}
	if !(res.Accepted == 0 && len(res.Errors) == 1) {
		t.Fatalf("expected 0 accepted + 1 error, got %+v", res)
	}
}
// TestService_Ingest_ValidationFailureCommitsPriorChunks runs with ChunkSize=2
// and sends two valid records followed by one with an invalid role. It expects
// the first two to be reported as accepted and present in the turns table for
// owner "alice", with a single line error pointing at line 3.
func TestService_Ingest_ValidationFailureCommitsPriorChunks(t *testing.T) {
	limits := defaultCfg()
	limits.ChunkSize = 2
	svc, d := newService(t, limits)

	// The first two records fill one chunk; the third carries a bad role.
	const badRole = `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t3","seq":1,"role":"hacker","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`
	payload := body(
		validLine("t1", 1700000000, "one"),
		validLine("t2", 1700000001, "two"),
		badRole,
	)

	res, err := svc.Ingest(context.Background(), "alice", payload, 1<<20)
	if err != nil {
		t.Fatalf("Ingest: %v", err)
	}
	if res.Accepted != 2 {
		t.Errorf("Accepted=%d; want 2", res.Accepted)
	}
	if !(len(res.Errors) == 1 && res.Errors[0].Line == 3) {
		t.Fatalf("expected one error on line 3, got %+v", res.Errors)
	}

	var stored int
	if err := d.DB.Get(&stored, `SELECT COUNT(*) FROM turns WHERE owner='alice'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if stored != 2 {
		t.Errorf("expected 2 committed turns, got %d", stored)
	}
}
// --- semantics ------------------------------------------------------------
// TestService_Ingest_Idempotent ingests the same two-record payload twice for
// the same owner. Both calls are expected to report two accepted turns, and
// afterwards the database must hold exactly two turns and one session.
func TestService_Ingest_Idempotent(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	ctx := context.Background()

	records := []string{
		validLine("t1", 1700000000, "alpha"),
		validLine("t2", 1700000001, "beta"),
	}

	for attempt := 0; attempt < 2; attempt++ {
		// A fresh reader is built for every attempt.
		res, err := svc.Ingest(ctx, "alice", body(records...), 1<<20)
		if err != nil {
			t.Fatalf("ingest %d: %v", attempt, err)
		}
		if res.Accepted != 2 {
			t.Errorf("Accepted=%d; want 2 on iteration %d", res.Accepted, attempt)
		}
	}

	var turnCount int
	if err := d.DB.Get(&turnCount, `SELECT COUNT(*) FROM turns`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if turnCount != 2 {
		t.Errorf("expected 2 turns after re-ingest, got %d", turnCount)
	}

	var sessionCount int
	if err := d.DB.Get(&sessionCount, `SELECT COUNT(*) FROM sessions`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if sessionCount != 1 {
		t.Errorf("expected 1 session, got %d", sessionCount)
	}
}
// TestService_Ingest_FirstWriteWinsSession ingests two turns of session "s1"
// in separate calls, the first with source_file/working_dir "/A" and the
// second with "/B". It expects the stored session row to keep the "/A" values
// from the first ingest.
func TestService_Ingest_FirstWriteWinsSession(t *testing.T) {
	const (
		first  = `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t1","seq":1,"role":"user","timestamp":1700000000,"content":"x","session_meta":{"source_file":"/A","working_dir":"/A"}}`
		second = `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t2","seq":2,"role":"user","timestamp":1700000100,"content":"y","session_meta":{"source_file":"/B","working_dir":"/B"}}`
	)

	svc, d := newService(t, defaultCfg())
	ctx := context.Background()

	_, err := svc.Ingest(ctx, "alice", strings.NewReader(first+"\n"), 1<<20)
	if err != nil {
		t.Fatalf("ingest 1: %v", err)
	}
	_, err = svc.Ingest(ctx, "alice", strings.NewReader(second+"\n"), 1<<20)
	if err != nil {
		t.Fatalf("ingest 2: %v", err)
	}

	var workingDir, sourceFile string
	row := d.DB.QueryRow(`SELECT working_dir, source_file FROM sessions WHERE owner=? AND session_id=?`,
		"alice", "s1")
	if err := row.Scan(&workingDir, &sourceFile); err != nil {
		t.Fatalf("q: %v", err)
	}
	if !(workingDir == "/A" && sourceFile == "/A") {
		t.Errorf("first-write-wins broken; working_dir=%q source_file=%q", workingDir, sourceFile)
	}
}
// TestService_Ingest_LastWriteWinsTurn ingests turn "t1" twice with different
// content ("first", then "second") and expects the stored row to carry the
// content of the later ingest.
func TestService_Ingest_LastWriteWinsTurn(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	ctx := context.Background()

	original := strings.NewReader(validLine("t1", 1700000000, "first") + "\n")
	if _, err := svc.Ingest(ctx, "alice", original, 1<<20); err != nil {
		t.Fatalf("ingest 1: %v", err)
	}

	replacement := strings.NewReader(validLine("t1", 1700000100, "second") + "\n")
	if _, err := svc.Ingest(ctx, "alice", replacement, 1<<20); err != nil {
		t.Fatalf("ingest 2: %v", err)
	}

	var stored string
	if err := d.DB.Get(&stored, `SELECT content FROM turns WHERE turn_id='t1'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if stored != "second" {
		t.Errorf("last-write-wins broken; content=%q", stored)
	}
}
// TestService_Ingest_EndedAtExtends ingests two turns of the same session in
// separate calls, the second with a later timestamp, and expects the
// session's ended_at column to equal that later timestamp (1700000500).
func TestService_Ingest_EndedAtExtends(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	ctx := context.Background()

	earlier := strings.NewReader(validLine("t1", 1700000000, "x") + "\n")
	if _, err := svc.Ingest(ctx, "alice", earlier, 1<<20); err != nil {
		t.Fatalf("ingest 1: %v", err)
	}

	later := strings.NewReader(validLine("t2", 1700000500, "y") + "\n")
	if _, err := svc.Ingest(ctx, "alice", later, 1<<20); err != nil {
		t.Fatalf("ingest 2: %v", err)
	}

	var endedAt int64
	err := d.DB.Get(&endedAt, `SELECT ended_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1")
	if err != nil {
		t.Fatalf("q: %v", err)
	}
	if endedAt != 1700000500 {
		t.Errorf("expected ended_at=1700000500, got %d", endedAt)
	}
}
// TestService_Ingest_StartedAtFallback ingests a single turn whose
// session_meta carries no started_at and expects the session's started_at
// column to equal the turn's own timestamp (1700000123).
func TestService_Ingest_StartedAtFallback(t *testing.T) {
	svc, d := newService(t, defaultCfg())

	// The record produced by validLine has no session_meta.started_at.
	input := strings.NewReader(validLine("t1", 1700000123, "x") + "\n")
	if _, err := svc.Ingest(context.Background(), "alice", input, 1<<20); err != nil {
		t.Fatalf("ingest: %v", err)
	}

	var startedAt int64
	err := d.DB.Get(&startedAt, `SELECT started_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1")
	if err != nil {
		t.Fatalf("q: %v", err)
	}
	if startedAt != 1700000123 {
		t.Errorf("expected started_at=1700000123, got %d", startedAt)
	}
}
// TestService_Ingest_ChunkedPartialAcceptBadLineInChunk3 runs with ChunkSize=2
// and sends six records where the fifth has an invalid role. It expects four
// accepted turns, one line error referencing line 5, no row for the sixth
// record ("t6"), and four rows in the turns table overall.
func TestService_Ingest_ChunkedPartialAcceptBadLineInChunk3(t *testing.T) {
	limits := defaultCfg()
	limits.ChunkSize = 2
	svc, d := newService(t, limits)

	// Line 5 is the first record of the third chunk and is invalid.
	const badRole = `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t5","seq":1,"role":"hacker","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`
	payload := body(
		validLine("t1", 1700000000, "a"),
		validLine("t2", 1700000001, "b"),
		validLine("t3", 1700000002, "c"),
		validLine("t4", 1700000003, "d"),
		badRole,
		validLine("t6", 1700000005, "f"),
	)

	res, err := svc.Ingest(context.Background(), "alice", payload, 1<<20)
	if err != nil {
		t.Fatalf("ingest: %v", err)
	}
	if res.Accepted != 4 {
		t.Errorf("Accepted=%d; want 4 (2 * chunkSize)", res.Accepted)
	}
	if !(len(res.Errors) == 1 && res.Errors[0].Line == 5) {
		t.Fatalf("expected one error referencing line 5, got %+v", res.Errors)
	}

	// The record after the bad line must not have been stored.
	var trailing int
	if err := d.DB.Get(&trailing, `SELECT COUNT(*) FROM turns WHERE turn_id='t6'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if trailing != 0 {
		t.Errorf("expected line 6 not processed, found %d rows", trailing)
	}

	var total int
	if err := d.DB.Get(&total, `SELECT COUNT(*) FROM turns`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if total != 4 {
		t.Errorf("expected 4 committed turns, got %d", total)
	}
}
// TestService_Ingest_PerUserIsolation ingests the same two records once as
// "alice" and once as "bob". It expects two session rows and four turn rows in
// total, then overwrites ended_at on bob's session directly in the database
// and checks that alice's ended_at did not take the same value.
func TestService_Ingest_PerUserIsolation(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	ctx := context.Background()

	records := []string{
		validLine("t1", 1700000000, "x"),
		validLine("t2", 1700000001, "y"),
	}

	if _, err := svc.Ingest(ctx, "alice", body(records...), 1<<20); err != nil {
		t.Fatalf("alice ingest: %v", err)
	}
	if _, err := svc.Ingest(ctx, "bob", body(records...), 1<<20); err != nil {
		t.Fatalf("bob ingest: %v", err)
	}

	var sessionCount int
	if err := d.DB.Get(&sessionCount, `SELECT COUNT(*) FROM sessions`); err != nil {
		t.Fatalf("q sessions: %v", err)
	}
	if sessionCount != 2 {
		t.Errorf("expected 2 sessions across owners, got %d", sessionCount)
	}

	var turnCount int
	if err := d.DB.Get(&turnCount, `SELECT COUNT(*) FROM turns`); err != nil {
		t.Fatalf("q turns: %v", err)
	}
	if turnCount != 4 {
		t.Errorf("expected 4 turns across owners, got %d", turnCount)
	}

	// Changing bob's session row must leave alice's row untouched.
	_, err := d.DB.Exec(`UPDATE sessions SET ended_at = 1 WHERE owner='bob'`)
	if err != nil {
		t.Fatalf("bob mutate: %v", err)
	}

	var aliceEndedAt int64
	if err := d.DB.Get(&aliceEndedAt, `SELECT ended_at FROM sessions WHERE owner='alice'`); err != nil {
		t.Fatalf("q alice: %v", err)
	}
	if aliceEndedAt == 1 {
		t.Errorf("bob's update bled into alice's row")
	}
}
// TestService_Ingest_WireOwnerIgnored sends a record that carries an
// "owner":"evil" field while ingesting as "alice". It expects the stored turn
// to be owned by "alice" and no turn rows to exist for owner "evil".
func TestService_Ingest_WireOwnerIgnored(t *testing.T) {
	// The payload smuggles its own "owner"; only the caller-supplied owner
	// should end up in the database.
	const spoofed = `{"owner":"evil","tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t1","seq":1,"role":"user","timestamp":1700000000,"content":"x","session_meta":{"source_file":"/x"}}`

	svc, d := newService(t, defaultCfg())

	input := strings.NewReader(spoofed + "\n")
	if _, err := svc.Ingest(context.Background(), "alice", input, 1<<20); err != nil {
		t.Fatalf("ingest: %v", err)
	}

	var storedOwner string
	if err := d.DB.Get(&storedOwner, `SELECT owner FROM turns WHERE turn_id='t1'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if storedOwner != "alice" {
		t.Fatalf("expected owner=alice, got %q", storedOwner)
	}

	var evilRows int
	if err := d.DB.Get(&evilRows, `SELECT COUNT(*) FROM turns WHERE owner='evil'`); err != nil {
		t.Fatalf("q evil: %v", err)
	}
	if evilRows != 0 {
		t.Fatalf("expected zero rows for owner=evil, got %d", evilRows)
	}
}
// TestService_Ingest_FTSAfterIngest ingests one turn that has both content and
// a tool_calls object, then runs MATCH queries scoped to owner "alice" against
// turns_fts (for a word from the content) and tool_outputs_fts (for the tool
// name). Each query is expected to return exactly one hit.
func TestService_Ingest_FTSAfterIngest(t *testing.T) {
	const record = `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"tA","seq":1,"role":"user","timestamp":1700000000,"content":"unmistakable haystack","tool_calls":{"name":"shellexec"},"session_meta":{"source_file":"/x"}}`

	svc, d := newService(t, defaultCfg())

	input := strings.NewReader(record + "\n")
	if _, err := svc.Ingest(context.Background(), "alice", input, 1<<20); err != nil {
		t.Fatalf("ingest: %v", err)
	}

	var contentHits int
	if err := d.DB.Get(&contentHits, `SELECT COUNT(*) FROM turns_fts WHERE owner='alice' AND turns_fts MATCH 'unmistakable'`); err != nil {
		t.Fatalf("q turns_fts: %v", err)
	}
	if contentHits != 1 {
		t.Fatalf("expected 1 turns_fts hit, got %d", contentHits)
	}

	var toolHits int
	if err := d.DB.Get(&toolHits, `SELECT COUNT(*) FROM tool_outputs_fts WHERE owner='alice' AND tool_outputs_fts MATCH 'shellexec'`); err != nil {
		t.Fatalf("q tool_outputs_fts: %v", err)
	}
	if toolHits != 1 {
		t.Fatalf("expected 1 tool_outputs_fts hit, got %d", toolHits)
	}
}
// TestService_Ingest_DBDownReturnsError closes the database handle before
// calling Ingest with one valid record. It expects Ingest to return a non-nil
// error and to report zero accepted turns.
func TestService_Ingest_DBDownReturnsError(t *testing.T) {
	svc, d := newService(t, defaultCfg())

	// Close the handle but leave the field populated, so the service hits a
	// closed database at runtime rather than a nil pointer.
	closeErr := d.DB.Close()
	if closeErr != nil {
		t.Fatalf("close: %v", closeErr)
	}

	input := strings.NewReader(validLine("t1", 1700000000, "x") + "\n")
	res, err := svc.Ingest(context.Background(), "alice", input, 1<<20)
	if err == nil {
		t.Fatalf("expected error, got nil; result=%+v", res)
	}
	if res.Accepted != 0 {
		t.Errorf("expected Accepted=0 on DB-down, got %d", res.Accepted)
	}
}
// TestService_Ingest_BodyOverCapReturnsTooLarge sets MaxBodyBytes to 100 and
// passes that same value as the limit argument while sending a record with
// 1024 bytes of content. It expects a non-nil error whose message mentions
// "limit" or "long", and zero accepted turns.
func TestService_Ingest_BodyOverCapReturnsTooLarge(t *testing.T) {
	limits := defaultCfg()
	limits.MaxBodyBytes = 100
	svc, _ := newService(t, limits)

	// The content alone is 1 KiB, well beyond the 100-byte cap.
	record := validLine("t1", 1700000000, strings.Repeat("a", 1024))
	input := strings.NewReader(record + "\n")

	res, err := svc.Ingest(context.Background(), "alice", input, limits.MaxBodyBytes)
	if err == nil {
		t.Fatalf("expected TOO_LARGE error, got nil; result=%+v", res)
	}

	// The apierror package is not imported here, so the error is matched
	// by message text only.
	msg := err.Error()
	if !(strings.Contains(msg, "limit") || strings.Contains(msg, "long")) {
		t.Errorf("expected too-large/limit error, got %v", err)
	}
	if res.Accepted != 0 {
		t.Errorf("Accepted=%d; want 0 on body cap", res.Accepted)
	}
}