package ingest_test

import (
	"context"
	"fmt"
	"strings"
	"testing"

	"sourcecraft.dev/bigbes/lethe/internal/config"
	"sourcecraft.dev/bigbes/lethe/internal/domain/ingest"
	"sourcecraft.dev/bigbes/lethe/internal/platform/database"
	"sourcecraft.dev/bigbes/lethe/internal/platform/observability"
)

// newService assembles an ingest.Service on top of a real Repository, Logger
// and Metrics, all initialised against a fresh database obtained from
// newTestDatabase. It returns the Service together with the underlying
// *database.Database so tests can run direct SQL queries. Any Init failure
// aborts the calling test via t.Fatalf.
func newService(t *testing.T, cfg config.IngestConfig) (*ingest.Service, *database.Database) {
	t.Helper()

	ctx := context.Background()
	testDB := newTestDatabase(t)

	repository := &ingest.Repository{Database: testDB}
	if err := repository.Init(ctx); err != nil {
		t.Fatalf("repo.Init: %v", err)
	}

	lg := &observability.Logger{
		Cfg: config.LoggingConfig{Level: "info", Format: "json"},
	}
	if err := lg.Init(ctx); err != nil {
		t.Fatalf("logger.Init: %v", err)
	}

	mtr := &observability.Metrics{}
	if err := mtr.Init(ctx); err != nil {
		t.Fatalf("metrics.Init: %v", err)
	}

	svc := &ingest.Service{Cfg: cfg, Repo: repository, Log: lg, Metrics: mtr}
	if err := svc.Init(ctx); err != nil {
		t.Fatalf("svc.Init: %v", err)
	}
	return svc, testDB
}

// defaultCfg returns the baseline IngestConfig shared by most tests: a 1 MiB
// body cap, a 4096-byte per-turn content cap and a ChunkSize of 10. All other
// fields keep their zero values.
func defaultCfg() config.IngestConfig {
	var cfg config.IngestConfig
	cfg.ChunkSize = 10
	cfg.MaxTurnContentBytes = 4096
	cfg.MaxBodyBytes = 1 << 20 // 1 MiB
	return cfg
}

// validLine renders one NDJSON turn record with the given turn_id, timestamp
// and content; every other field is fixed to a minimal-but-valid value.
func validLine(turnID string, ts int64, content string) string {
	return fmt.Sprintf(
		`{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":%q,"seq":1,"role":"user","timestamp":%d,"content":%q,"session_meta":{"source_file":"/tmp/x.jsonl"}}`,
		turnID, ts, content,
	)
}

// body joins lines into an NDJSON request body, terminating every line
// (including the last) with "\n".
func body(lines ...string) *strings.Reader {
	return strings.NewReader(strings.Join(lines, "\n") + "\n")
}

// --- validation matrix ----------------------------------------------------

// TestService_Ingest_RequiredFieldsRejected feeds one malformed line per case
// and expects no accepted turns plus exactly one LineError pointing at line 1.
func TestService_Ingest_RequiredFieldsRejected(t *testing.T) {
	cases := []struct {
		name string
		line string
	}{
		{"missing tool", `{"host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{"missing host", `{"tool":"cc","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{"missing session_id", `{"tool":"cc","host":"phoebe","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{"missing turn_id", `{"tool":"cc","host":"phoebe","session_id":"s1","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{"missing role", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{"missing timestamp", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","content":"x","session_meta":{"source_file":"/x"}}`},
		{"missing content", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"session_meta":{"source_file":"/x"}}`},
		{"missing source_file", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{}}`},
		{"bad role", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"hacker","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			svc, _ := newService(t, defaultCfg())
			res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(tc.line+"\n"), 1<<20)
			if err != nil {
				t.Fatalf("Ingest err: %v", err)
			}
			if res.Accepted != 0 {
				t.Errorf("Accepted=%d; want 0", res.Accepted)
			}
			if len(res.Errors) != 1 || res.Errors[0].Line != 1 {
				t.Fatalf("expected one LineError on line 1, got %+v", res.Errors)
			}
		})
	}
}

// TestService_Ingest_ContentOverCapRejected sends content one byte over
// MaxTurnContentBytes and expects the line to be rejected.
func TestService_Ingest_ContentOverCapRejected(t *testing.T) {
	cfg := defaultCfg()
	cfg.MaxTurnContentBytes = 16
	svc, _ := newService(t, cfg)

	line := validLine("tA", 1700000000, strings.Repeat("a", 17))
	res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), 1<<20)
	if err != nil {
		t.Fatalf("Ingest: %v", err)
	}
	if res.Accepted != 0 || len(res.Errors) != 1 {
		t.Fatalf("expected 0 accepted + 1 error, got %+v", res)
	}
}

// TestService_Ingest_SourceFileOverCapRejected sends a 1025-byte source_file
// and expects rejection. NOTE(review): assumes the service caps source_file at
// 1024 bytes; that limit is not visible in this file.
func TestService_Ingest_SourceFileOverCapRejected(t *testing.T) {
	svc, _ := newService(t, defaultCfg())

	long := strings.Repeat("p", 1025)
	line := fmt.Sprintf(
		`{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":%q}}`,
		long,
	)
	res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), 1<<20)
	if err != nil {
		t.Fatalf("Ingest: %v", err)
	}
	if res.Accepted != 0 || len(res.Errors) != 1 {
		t.Fatalf("expected 0 accepted + 1 error, got %+v", res)
	}
}

// TestService_Ingest_ValidationFailureCommitsPriorChunks checks that a bad
// line stops ingestion but leaves earlier, already-committed chunks in place.
func TestService_Ingest_ValidationFailureCommitsPriorChunks(t *testing.T) {
	cfg := defaultCfg()
	cfg.ChunkSize = 2
	svc, d := newService(t, cfg)

	// 2 valid lines (commit as chunk 1) + a third bad line.
	lines := []string{
		validLine("t1", 1700000000, "one"),
		validLine("t2", 1700000001, "two"),
		`{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t3","seq":1,"role":"hacker","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`,
	}
	res, err := svc.Ingest(context.Background(), "alice", body(lines...), 1<<20)
	if err != nil {
		t.Fatalf("Ingest: %v", err)
	}
	if res.Accepted != 2 {
		t.Errorf("Accepted=%d; want 2", res.Accepted)
	}
	if len(res.Errors) != 1 || res.Errors[0].Line != 3 {
		t.Fatalf("expected one error on line 3, got %+v", res.Errors)
	}

	var n int
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns WHERE owner='alice'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if n != 2 {
		t.Errorf("expected 2 committed turns, got %d", n)
	}
}

// --- semantics ------------------------------------------------------------

// TestService_Ingest_Idempotent ingests the same body twice and expects no
// duplicate turns or sessions.
func TestService_Ingest_Idempotent(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	lines := []string{
		validLine("t1", 1700000000, "alpha"),
		validLine("t2", 1700000001, "beta"),
	}
	for i := 0; i < 2; i++ {
		// body() builds a fresh reader on every iteration.
		res, err := svc.Ingest(context.Background(), "alice", body(lines...), 1<<20)
		if err != nil {
			t.Fatalf("ingest %d: %v", i, err)
		}
		if res.Accepted != 2 {
			t.Errorf("Accepted=%d; want 2 on iteration %d", res.Accepted, i)
		}
	}

	var rows int
	if err := d.DB.Get(&rows, `SELECT COUNT(*) FROM turns`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if rows != 2 {
		t.Errorf("expected 2 turns after re-ingest, got %d", rows)
	}

	var sessions int
	if err := d.DB.Get(&sessions, `SELECT COUNT(*) FROM sessions`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if sessions != 1 {
		t.Errorf("expected 1 session, got %d", sessions)
	}
}

// TestService_Ingest_FirstWriteWinsSession ingests two turns of the same
// session with different session_meta and expects the session row to keep
// the values from the first ingest.
func TestService_Ingest_FirstWriteWinsSession(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	ctx := context.Background()

	line1 := `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t1","seq":1,"role":"user","timestamp":1700000000,"content":"x","session_meta":{"source_file":"/A","working_dir":"/A"}}`
	res1, err := svc.Ingest(ctx, "alice", strings.NewReader(line1+"\n"), 1<<20)
	if err != nil {
		t.Fatalf("ingest 1: %v", err)
	}
	if res1.Accepted != 1 || len(res1.Errors) != 0 {
		t.Fatalf("ingest 1: expected 1 accepted and no errors, got %+v", res1)
	}

	line2 := `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t2","seq":2,"role":"user","timestamp":1700000100,"content":"y","session_meta":{"source_file":"/B","working_dir":"/B"}}`
	res2, err := svc.Ingest(ctx, "alice", strings.NewReader(line2+"\n"), 1<<20)
	if err != nil {
		t.Fatalf("ingest 2: %v", err)
	}
	// Without this check a rejected line2 would leave "/A" in place and the
	// first-write-wins assertion below would pass vacuously.
	if res2.Accepted != 1 || len(res2.Errors) != 0 {
		t.Fatalf("ingest 2: expected 1 accepted and no errors, got %+v", res2)
	}

	var wd, sf string
	if err := d.DB.QueryRow(`SELECT working_dir, source_file FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1").Scan(&wd, &sf); err != nil {
		t.Fatalf("q: %v", err)
	}
	if wd != "/A" || sf != "/A" {
		t.Errorf("first-write-wins broken; working_dir=%q source_file=%q", wd, sf)
	}
}

// TestService_Ingest_LastWriteWinsTurn re-ingests the same turn_id with new
// content and expects the stored content to be replaced.
func TestService_Ingest_LastWriteWinsTurn(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(validLine("t1", 1700000000, "first")+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest 1: %v", err)
	}
	if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(validLine("t1", 1700000100, "second")+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest 2: %v", err)
	}

	var content string
	if err := d.DB.Get(&content, `SELECT content FROM turns WHERE turn_id='t1'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if content != "second" {
		t.Errorf("last-write-wins broken; content=%q", content)
	}
}

// TestService_Ingest_EndedAtExtends expects sessions.ended_at to advance to
// the timestamp of a later turn in the same session.
func TestService_Ingest_EndedAtExtends(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(validLine("t1", 1700000000, "x")+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest 1: %v", err)
	}
	if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(validLine("t2", 1700000500, "y")+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest 2: %v", err)
	}

	var ended int64
	if err := d.DB.Get(&ended, `SELECT ended_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil {
		t.Fatalf("q: %v", err)
	}
	if ended != 1700000500 {
		t.Errorf("expected ended_at=1700000500, got %d", ended)
	}
}

// TestService_Ingest_StartedAtFallback expects started_at to fall back to the
// first turn's timestamp when session_meta carries no started_at.
func TestService_Ingest_StartedAtFallback(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	// SessionMeta has no started_at; first turn timestamp wins.
	if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(validLine("t1", 1700000123, "x")+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest: %v", err)
	}

	var started int64
	if err := d.DB.Get(&started, `SELECT started_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil {
		t.Fatalf("q: %v", err)
	}
	if started != 1700000123 {
		t.Errorf("expected started_at=1700000123, got %d", started)
	}
}

// TestService_Ingest_ChunkedPartialAcceptBadLineInChunk3 sends six lines
// (ChunkSize=2) with a bad fifth line. It expects chunks 1 and 2 to commit and
// nothing from chunk 3 (including the valid line 6) to be stored.
func TestService_Ingest_ChunkedPartialAcceptBadLineInChunk3(t *testing.T) {
	cfg := defaultCfg()
	cfg.ChunkSize = 2
	svc, d := newService(t, cfg)

	lines := []string{
		validLine("t1", 1700000000, "a"),
		validLine("t2", 1700000001, "b"),
		validLine("t3", 1700000002, "c"),
		validLine("t4", 1700000003, "d"),
		// Bad line in chunk 3 (line index 5).
		`{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t5","seq":1,"role":"hacker","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`,
		validLine("t6", 1700000005, "f"),
	}
	res, err := svc.Ingest(context.Background(), "alice", body(lines...), 1<<20)
	if err != nil {
		t.Fatalf("ingest: %v", err)
	}
	if res.Accepted != 4 {
		t.Errorf("Accepted=%d; want 4 (2 * chunkSize)", res.Accepted)
	}
	if len(res.Errors) != 1 || res.Errors[0].Line != 5 {
		t.Fatalf("expected one error referencing line 5, got %+v", res.Errors)
	}

	// Line 6 must not be processed.
	var n int
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns WHERE turn_id='t6'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if n != 0 {
		t.Errorf("expected line 6 not processed, found %d rows", n)
	}
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if n != 4 {
		t.Errorf("expected 4 committed turns, got %d", n)
	}
}

// TestService_Ingest_PerUserIsolation ingests identical payloads for two
// owners. It expects separate session and turn rows per owner, with updates
// to one owner's session not affecting the other's.
func TestService_Ingest_PerUserIsolation(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	lines := []string{validLine("t1", 1700000000, "x"), validLine("t2", 1700000001, "y")}
	if _, err := svc.Ingest(context.Background(), "alice", body(lines...), 1<<20); err != nil {
		t.Fatalf("alice ingest: %v", err)
	}
	if _, err := svc.Ingest(context.Background(), "bob", body(lines...), 1<<20); err != nil {
		t.Fatalf("bob ingest: %v", err)
	}

	var n int
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM sessions`); err != nil {
		t.Fatalf("q sessions: %v", err)
	}
	if n != 2 {
		t.Errorf("expected 2 sessions across owners, got %d", n)
	}
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns`); err != nil {
		t.Fatalf("q turns: %v", err)
	}
	if n != 4 {
		t.Errorf("expected 4 turns across owners, got %d", n)
	}

	// Mutating bob's session must not touch alice's row.
	if _, err := d.DB.Exec(`UPDATE sessions SET ended_at = 1 WHERE owner='bob'`); err != nil {
		t.Fatalf("bob mutate: %v", err)
	}
	var aliceEnded int64
	if err := d.DB.Get(&aliceEnded, `SELECT ended_at FROM sessions WHERE owner='alice'`); err != nil {
		t.Fatalf("q alice: %v", err)
	}
	if aliceEnded == 1 {
		t.Errorf("bob's update bled into alice's row")
	}
}

// TestService_Ingest_WireOwnerIgnored checks that an "owner" field in the
// payload is ignored: the stored owner is the authenticated caller.
func TestService_Ingest_WireOwnerIgnored(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	// Stray "owner":"evil" in the wire payload — must be dropped silently.
	line := `{"owner":"evil","tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t1","seq":1,"role":"user","timestamp":1700000000,"content":"x","session_meta":{"source_file":"/x"}}`
	if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest: %v", err)
	}

	var owner string
	if err := d.DB.Get(&owner, `SELECT owner FROM turns WHERE turn_id='t1'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if owner != "alice" {
		t.Fatalf("expected owner=alice, got %q", owner)
	}

	var n int
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns WHERE owner='evil'`); err != nil {
		t.Fatalf("q evil: %v", err)
	}
	if n != 0 {
		t.Fatalf("expected zero rows for owner=evil, got %d", n)
	}
}

// TestService_Ingest_FTSAfterIngest expects the ingested content to be
// searchable through turns_fts and the tool call through tool_outputs_fts.
func TestService_Ingest_FTSAfterIngest(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	line := `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"tA","seq":1,"role":"user","timestamp":1700000000,"content":"unmistakable haystack","tool_calls":{"name":"shellexec"},"session_meta":{"source_file":"/x"}}`
	if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest: %v", err)
	}

	var n int
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns_fts WHERE owner='alice' AND turns_fts MATCH 'unmistakable'`); err != nil {
		t.Fatalf("q turns_fts: %v", err)
	}
	if n != 1 {
		t.Fatalf("expected 1 turns_fts hit, got %d", n)
	}
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM tool_outputs_fts WHERE owner='alice' AND tool_outputs_fts MATCH 'shellexec'`); err != nil {
		t.Fatalf("q tool_outputs_fts: %v", err)
	}
	if n != 1 {
		t.Fatalf("expected 1 tool_outputs_fts hit, got %d", n)
	}
}

// TestService_Ingest_DBDownReturnsError closes the database before ingesting
// and expects Ingest to return an error with nothing accepted.
func TestService_Ingest_DBDownReturnsError(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	// Close the underlying *sql.DB without nilling the field. Any later use of
	// the pool (e.g. BeginTx) then fails with database/sql's "sql: database is
	// closed" error, simulating a runtime database failure without crashing
	// the test.
	if err := d.DB.Close(); err != nil {
		t.Fatalf("close: %v", err)
	}

	res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(validLine("t1", 1700000000, "x")+"\n"), 1<<20)
	if err == nil {
		t.Fatalf("expected error, got nil; result=%+v", res)
	}
	if res.Accepted != 0 {
		t.Errorf("expected Accepted=0 on DB-down, got %d", res.Accepted)
	}
}

// TestService_Ingest_BodyOverCapReturnsTooLarge sends a line larger than
// MaxBodyBytes and expects a request-level error rather than a per-line one.
func TestService_Ingest_BodyOverCapReturnsTooLarge(t *testing.T) {
	cfg := defaultCfg()
	cfg.MaxBodyBytes = 100
	svc, _ := newService(t, cfg)

	// A ~1 KiB line that cannot fit within the 100-byte body cap.
	line := validLine("t1", 1700000000, strings.Repeat("a", 1024))
	res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), cfg.MaxBodyBytes)
	if err == nil {
		t.Fatalf("expected TOO_LARGE error, got nil; result=%+v", res)
	}
	// We don't import apierror here; just assert err contains the message.
	if !strings.Contains(err.Error(), "limit") && !strings.Contains(err.Error(), "long") {
		t.Errorf("expected too-large/limit error, got %v", err)
	}
	if res.Accepted != 0 {
		t.Errorf("Accepted=%d; want 0 on body cap", res.Accepted)
	}
}