package ingest

import (
	"context"
	"encoding/json"
	"io"
	"net/http"
	"net/http/httptest"
	"path/filepath"
	"strings"
	"sync"
	"testing"
	"time"

	"sourcecraft.dev/bigbes/lethe/internal/collector/config"
	"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

func TestRunOnce_AllAcceptedPersistsNewOffsetAndDoesNotEnqueueOutbox(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
	})
	sender := acceptingSender(t, []Result{{Accepted: 2}})

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunOnce: %v", err)
	}

	assertOffset(t, ctx, store, "claude-code", file, 300)
	assertOutboxCount(t, ctx, store, 0)
}

func TestRunOnce_TrimsPreexistingOutboxOverLimitEvenWithNoEnqueue(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")

	// Pre-populate outbox with two 10-byte items.
	items := []state.OutboxItem{
		{Tool: "claude-code", Host: "h", SourceFile: file, Payload: []byte("{\"seq\":1}\n")},
		{Tool: "claude-code", Host: "h", SourceFile: file, Payload: []byte("{\"seq\":2}\n")},
	}
	for _, it := range items {
		if err := store.Enqueue(ctx, it); err != nil {
			t.Fatalf("Enqueue: %v", err)
		}
	}

	cfg := testConfig()
	cfg.Outbox.MaxBytes = 10

	// No new events => no new enqueue.
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: []wire.TurnEvent{}, newOffset: 0},
	})
	// Accepted:0 so replay does not delete rows; enforce must trim.
	sender := acceptingSender(t, nil)

	err := RunOnce(ctx, cfg, source, p, store, sender)
	if err != nil {
		t.Fatalf("RunOnce: %v", err)
	}

	rows, err := store.Oldest(ctx, 10)
	if err != nil {
		t.Fatalf("Oldest: %v", err)
	}
	if len(rows) != 1 {
		t.Errorf("expected 1 row after trim, got %d", len(rows))
	}
	if len(rows) > 0 && string(rows[0].Payload) != "{\"seq\":2}\n" {
		t.Errorf("remaining payload = %q, want %q", string(rows[0].Payload), "{\"seq\":2}\n")
	}
}

func TestRunOnce_PartialAcceptWithErrorSkipsFailedEvent(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")}, newOffset: 400},
	})
	sender := acceptingSender(t, []Result{{Accepted: 1, Errors: []LineError{{Line: 2, Err: "bad"}}}})

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunOnce: %v", err)
	}

	assertOffset(t, ctx, store, "claude-code", file, 300)
	assertOutboxCount(t, ctx, store, 0)
}

func TestRunOnce_ZeroAcceptedWithErrorLineTwoPostsPrefixAndSkipsFailedEvent(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")}
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: events, newOffset: 400},
	})

	// First POST rejects line 2 without accepting anything; every later POST
	// accepts its whole batch.
	sender, posts := recordingSender(t, func(call int, batch []wire.TurnEvent) Result {
		if call == 1 {
			return Result{Accepted: 0, Errors: []LineError{{Line: 2, Err: "bad"}}}
		}
		return Result{Accepted: len(batch)}
	})

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunOnce first run: %v", err)
	}
	assertOffset(t, ctx, store, "claude-code", file, 300)

	posted := posts()
	if len(posted) != 2 {
		t.Fatalf("first run: expected 2 POSTs, got %d", len(posted))
	}
	if len(posted[0]) != 3 {
		t.Errorf("first run batch: expected 3 events, got %d", len(posted[0]))
	}
	// Fatal rather than Errorf: the next check indexes posted[1][0].
	if len(posted[1]) != 1 {
		t.Fatalf("prefix batch: expected 1 event, got %d", len(posted[1]))
	}
	if posted[1][0].Content != "a" {
		t.Errorf("prefix batch: expected event 'a', got %q", posted[1][0].Content)
	}

	err = RunOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunOnce second run: %v", err)
	}
	assertOffset(t, ctx, store, "claude-code", file, 400)

	posted = posts()
	if len(posted) != 3 {
		t.Fatalf("second run: expected 3 POSTs total, got %d", len(posted))
	}
	if len(posted[2]) != 1 {
		t.Fatalf("second run batch: expected 1 event, got %d", len(posted[2]))
	}
	if posted[2][0].Content != "c" {
		t.Errorf("second run batch: expected event 'c', got %q", posted[2][0].Content)
	}
}

func TestRunOnce_MalformedErrorLineOutOfRangeReturnsError(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
	})
	sender := acceptingSender(t, []Result{{Accepted: 0, Errors: []LineError{{Line: 5, Err: "bad"}}}})

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err == nil {
		t.Fatal("expected RunOnce error for malformed error line")
	}

	assertOffset(t, ctx, store, "claude-code", file, 0)
}

func TestRunOnce_ZeroAcceptedWithErrorSkipsFirstEvent(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
	})
	sender := acceptingSender(t, []Result{{Accepted: 0, Errors: []LineError{{Line: 1, Err: "bad"}}}})

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunOnce: %v", err)
	}

	assertOffset(t, ctx, store, "claude-code", file, 200)
	assertOutboxCount(t, ctx, store, 0)
}

func TestRunOnce_PartialAcceptSkipsBadRowAndRerunDoesNotRepostIt(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")}
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: events, newOffset: 400},
	})

	// First POST accepts one event and rejects line 2; later POSTs accept one.
	sender, posts := recordingSender(t, func(call int, _ []wire.TurnEvent) Result {
		if call == 1 {
			return Result{Accepted: 1, Errors: []LineError{{Line: 2, Err: "bad"}}}
		}
		return Result{Accepted: 1}
	})

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunOnce first run: %v", err)
	}
	assertOffset(t, ctx, store, "claude-code", file, 300)

	posted := posts()
	if len(posted) != 1 {
		t.Fatalf("first run: expected 1 POST, got %d", len(posted))
	}
	if len(posted[0]) != 3 {
		t.Errorf("first run: expected 3 events in batch, got %d", len(posted[0]))
	}

	err = RunOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunOnce second run: %v", err)
	}
	assertOffset(t, ctx, store, "claude-code", file, 400)

	posted = posts()
	if len(posted) != 2 {
		t.Fatalf("second run: expected 2 POSTs total, got %d", len(posted))
	}
	// Fatal rather than Errorf: the next check indexes posted[1][0].
	if len(posted[1]) != 1 {
		t.Fatalf("second run: expected 1 event in batch, got %d", len(posted[1]))
	}
	if posted[1][0].Content != "c" {
		t.Errorf("second run: expected event 'c', got %q", posted[1][0].Content)
	}
}

func TestRunOnce_FullAcceptWithErrorsReturnsError(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
	})
	sender := acceptingSender(t, []Result{{Accepted: 2, Errors: []LineError{{Line: 2, Err: "bad"}}}})

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err == nil {
		t.Fatal("expected RunOnce error when server accepts all but reports errors")
	}

	assertOffset(t, ctx, store, "claude-code", file, 0)
}

func TestRunOnce_HardErrorEnqueuesUnsentBatchAndDoesNotAdvanceOffset(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: events, newOffset: 300},
	})
	sender := failingSender(t)

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err == nil {
		t.Fatal("expected RunOnce error on hard POST failure")
	}

	assertOffset(t, ctx, store, "claude-code", file, 0)

	rows, err := store.Oldest(ctx, 10)
	if err != nil {
		t.Fatalf("Oldest: %v", err)
	}
	if len(rows) != 1 {
		t.Fatalf("len(rows) = %d, want 1", len(rows))
	}
	wantPayload, err := EncodeNDJSON(events)
	if err != nil {
		t.Fatalf("EncodeNDJSON: %v", err)
	}
	if string(rows[0].Payload) != string(wantPayload) {
		t.Errorf("payload = %q, want %q", string(rows[0].Payload), string(wantPayload))
	}
}

func TestRunOnce_ContinuesToOtherFilesWhenOneFileFails(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	badFile := filepath.Join(source.Path, "bad.jsonl")
	goodFile := filepath.Join(source.Path, "good.jsonl")
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: badFile}, {Path: goodFile}}, map[string]parseResult{
		badFile:  {err: errParseBoom},
		goodFile: {events: []wire.TurnEvent{turnEvent(100, "ok")}, newOffset: 200},
	})
	sender := acceptingSender(t, []Result{{Accepted: 1}})

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err == nil {
		t.Fatal("expected RunOnce to return collected parse error")
	}

	assertOffset(t, ctx, store, "claude-code", badFile, 0)
	assertOffset(t, ctx, store, "claude-code", goodFile, 200)
}

func TestRunDaemon_ExitsWhenContextCanceled(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// Calling cancel twice is harmless; the defer covers early-failure paths.
	defer cancel()
	store := openTestStore(t, context.Background())
	source := testSource(t, "claude-code", 10, 4096)
	source.PollInterval = time.Hour
	p := newFakeParser("claude-code", nil, nil)
	sender := acceptingSender(t, nil)
	cfg := testConfig()
	cfg.Sources = []config.SourceConfig{source}

	done := make(chan error, 1)
	go func() {
		done <- RunDaemon(ctx, cfg, map[string]parser.Parser{"claude-code": p}, store, sender)
	}()

	cancel()

	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("RunDaemon: %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("RunDaemon did not exit after context cancellation")
	}
}

func TestRunDaemon_CancelWaitsForActiveRunOnce(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// Stops the daemon goroutine if the test fails before the explicit cancel.
	defer cancel()
	store := openTestStore(t, context.Background())
	source := testSource(t, "claude-code", 10, 4096)
	source.PollInterval = time.Hour
	file := filepath.Join(source.Path, "one.jsonl")

	postStarted := make(chan struct{})
	postContinue := make(chan struct{})
	var postOnce, releaseOnce sync.Once
	// release unblocks the parked handler exactly once. It is also deferred
	// below: httptest.Server.Close waits for in-flight requests, so an early
	// t.Fatal with the handler still parked would otherwise hang the test.
	release := func() { releaseOnce.Do(func() { close(postContinue) }) }

	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: []wire.TurnEvent{turnEvent(100, "a")}, newOffset: 200},
	})
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		postOnce.Do(func() { close(postStarted) })
		<-postContinue
		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(resultJSON(Result{Accepted: 1})))
	}))
	defer ts.Close()
	defer release() // deferred after ts.Close, so it runs first (LIFO)

	sender := NewSender(ts.URL, ts.Client())
	cfg := testConfig()
	cfg.Sources = []config.SourceConfig{source}
	cfg.HTTP.Timeout = 5 * time.Second

	done := make(chan error, 1)
	go func() {
		done <- RunDaemon(ctx, cfg, map[string]parser.Parser{"claude-code": p}, store, sender)
	}()

	select {
	case <-postStarted:
	case <-time.After(2 * time.Second):
		t.Fatal("POST did not start")
	}

	cancel()

	select {
	case <-done:
		t.Fatal("RunDaemon should wait for active RunOnce, not exit immediately")
	case <-time.After(100 * time.Millisecond):
		// expected
	}

	release()

	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("RunDaemon: %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("RunDaemon did not exit after active run completed")
	}

	assertOffset(t, context.Background(), store, "claude-code", file, 200)
}

func TestRunBackfillOnce_IgnoresSavedOffsetAndPersistsProgress(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	if err := store.SaveOffset(ctx, "claude-code", file, 150); err != nil {
		t.Fatalf("SaveOffset: %v", err)
	}
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
	})
	sender := acceptingSender(t, []Result{{Accepted: 2}})

	err := RunBackfillOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunBackfillOnce: %v", err)
	}

	assertOffset(t, ctx, store, "claude-code", file, 300)
	assertOutboxCount(t, ctx, store, 0)
}

func TestRunBackfillOnce_InterruptedProgressIsResumable(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	if err := store.SaveOffset(ctx, "claude-code", file, 150); err != nil {
		t.Fatalf("SaveOffset: %v", err)
	}
	events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: events, newOffset: 300},
	})
	sender := acceptingSender(t, []Result{{Accepted: 1}})

	err := RunBackfillOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunBackfillOnce: %v", err)
	}
	assertOffset(t, ctx, store, "claude-code", file, 200)

	sender2 := acceptingSender(t, []Result{{Accepted: 1}})
	err = RunOnce(ctx, testConfig(), source, p, store, sender2)
	if err != nil {
		t.Fatalf("RunOnce resume: %v", err)
	}
	assertOffset(t, ctx, store, "claude-code", file, 300)
}

// parseResult is the canned outcome fakeParser returns for one path.
type parseResult struct {
	events    []wire.TurnEvent
	newOffset int64
	err       error
}

// fakeParser is an in-memory parser stub: Discover returns a fixed file list
// and Parse returns canned results keyed by path.
type fakeParser struct {
	tool    string
	files   []parser.SourceFile
	results map[string]parseResult
}

// newFakeParser builds a fakeParser for tool that reports files from Discover
// and answers Parse from results.
func newFakeParser(tool string, files []parser.SourceFile, results map[string]parseResult) *fakeParser {
	return &fakeParser{tool: tool, files: files, results: results}
}

// Tool returns the tool name the parser was built with.
func (p *fakeParser) Tool() string { return p.tool }

// Discover ignores root and returns the configured file list.
func (p *fakeParser) Discover(root string) ([]parser.SourceFile, error) {
	return p.files, nil
}

// Parse returns the canned events for path whose Seq is >= since, together
// with the canned new offset. If the canned result carries an error, it
// returns (nil, since, err) instead. Paths with no canned result yield no
// events and offset 0.
func (p *fakeParser) Parse(path string, since int64) ([]wire.TurnEvent, int64, error) {
	res := p.results[path]
	if res.err != nil {
		return nil, since, res.err
	}
	var filtered []wire.TurnEvent
	for _, ev := range res.events {
		if ev.Seq >= since {
			filtered = append(filtered, ev)
		}
	}
	return filtered, res.newOffset, nil
}

// roundTripFunc adapts a function to http.RoundTripper.
type roundTripFunc func(*http.Request) (*http.Response, error)

// RoundTrip calls f(r).
func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// acceptingSender returns a Sender backed by a test server that answers the
// i-th request with results[i], and with Result{Accepted: 0} once results is
// exhausted. The server is closed via t.Cleanup.
func acceptingSender(t *testing.T, results []Result) *Sender {
	t.Helper()
	var mu sync.Mutex
	idx := 0
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, _ = io.ReadAll(r.Body)
		mu.Lock()
		defer mu.Unlock()
		result := Result{Accepted: 0}
		if idx < len(results) {
			result = results[idx]
			idx++
		}
		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(resultJSON(result)))
	}))
	t.Cleanup(ts.Close)
	return NewSender(ts.URL, ts.Client())
}

// recordingSender returns a Sender backed by a test server that decodes each
// POSTed NDJSON body into a batch of events, records it, and replies with
// respond(call, batch). Here call is the 1-based index of the request.
//
// It also returns a function that snapshots the batches recorded so far. The
// server is closed via t.Cleanup. Decode failures are reported with t.Errorf
// and answered with 400, because t.Fatal must not be called from the handler
// goroutine.
func recordingSender(t *testing.T, respond func(call int, batch []wire.TurnEvent) Result) (*Sender, func() [][]wire.TurnEvent) {
	t.Helper()
	var (
		mu     sync.Mutex
		posted [][]wire.TurnEvent
	)
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			t.Errorf("read request body: %v", err)
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		var batch []wire.TurnEvent
		for _, line := range strings.Split(string(body), "\n") {
			if line == "" {
				continue
			}
			var ev wire.TurnEvent
			if err := json.Unmarshal([]byte(line), &ev); err != nil {
				t.Errorf("unmarshal event: %v", err)
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			batch = append(batch, ev)
		}

		mu.Lock()
		posted = append(posted, batch)
		call := len(posted)
		mu.Unlock()

		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(resultJSON(respond(call, batch))))
	}))
	t.Cleanup(ts.Close)

	snapshot := func() [][]wire.TurnEvent {
		mu.Lock()
		defer mu.Unlock()
		return append([][]wire.TurnEvent(nil), posted...)
	}
	return NewSender(ts.URL, ts.Client()), snapshot
}

// failingSender returns a Sender whose transport fails every request with
// errPostBoom without touching the network.
func failingSender(t *testing.T) *Sender {
	t.Helper()
	client := &http.Client{Transport: roundTripFunc(func(*http.Request) (*http.Response, error) {
		return nil, errPostBoom
	})}
	return NewSender("http://127.0.0.1", client)
}