From 538e632d05ea9cd425e52f87d4877822bd721a75 Mon Sep 17 00:00:00 2001 From: Eugene Blikh Date: Sun, 3 May 2026 19:14:18 +0300 Subject: [PATCH] collector: skip rejected rows after partial ingest --- docs/tasks/lethe-collector-claude-code.md | 26 +++++- internal/collector/ingest/runner.go | 25 ++++-- internal/collector/ingest/runner_test.go | 101 ++++++++++++++++++++-- 3 files changed, 137 insertions(+), 15 deletions(-) diff --git a/docs/tasks/lethe-collector-claude-code.md b/docs/tasks/lethe-collector-claude-code.md index f56566c172898c86fabb8958442c70a2bf02274f..6b427a4dbd12337f94e55685106b21c52828ee8e 100644 --- a/docs/tasks/lethe-collector-claude-code.md +++ b/docs/tasks/lethe-collector-claude-code.md @@ -159,6 +159,7 @@ Greenfield collector. The only interface contract this task can break is the wir - udesign: TOML → YAML for collector config — consistent with the server's config format from #1; one parser, one mental model. - udesign: `parentUuid` chaining of resumed Claude Code sessions deferred — every `.jsonl` is one session in this task. Surfacing chains is a UI concern for later. - udesign: synthesized `turn_id` uses `sha256(session_id || seq || timestamp || content[:64])[:16]` — `content[:64]` is enough to disambiguate within a single timestamp; full-content hash would balloon for large turns. +- ureview: fixed partial-accept offset handling — skipped server-rejected rows so one bad turn cannot stall a source file. TDD: yes (reason: parser behavior on golden fixture `.jsonl` files, offset persistence/resume semantics, outbox replay, and idempotent re-POST behavior are exactly the deterministic regression-prone surfaces TDD is good for. CLI scaffolding and systemd unit are exempt.) @@ -283,9 +284,28 @@ Scope check: no server changes, no extra parser registry abstraction, and no tok ## Verify -- `go test ./... -count=1` passes (all packages). -- `go build ./cmd/lethe-collector` succeeds. -- CLI commands (`daemon`, `backfill`, `status`) wired to existing collector packages. +**Result:** passed + +Positive: +- CK1 — `go test ./... -count=1` passes. +- CK2 — `go build ./cmd/lethe-collector` succeeds. +- CK3 — `lethe-collector status` with a minimal config opens the state DB and reports the configured source. + +Negative: +- CK4 — `lethe-collector status --config ./tmp/missing.yaml` exits non-zero with `CONFIG_NOT_FOUND` surfaced. + +Invariants / assumptions: +- CK5 (IV7) — `internal/collector` has no `os.Hostname` call; host flows from collector config. +- CK6 (IV9, AS1) — sender posts only `TurnEvent` NDJSON to `/api/v1/ingest`. +- CK7 (UK1) — Tailscale header injection remains unverifiable without the deployed Tailscale path. + +Interfaces: +- CK8 (IF1) — `config.Load(path string) (*Config, error)` is exercised by CLI and config tests. +- CK9 (IF2) — `state.Store` offset/outbox methods are exercised by runner and state tests. +- CK10 (IF3) — `Sender.PostBatch(ctx, events)` is exercised by sender, outbox, and runner tests. +- CK11 (IF4) — `RunOnce` / `RunDaemon` are exercised by CLI wiring and runner tests. + +Smoke: `go run ./cmd/lethe-collector --config ./tmp/collector-smoke.yaml status` → prints host, state DB, outbox stats, and source list. ## Conclusion diff --git a/internal/collector/ingest/runner.go b/internal/collector/ingest/runner.go index eb3f22135cadd039bc345e7799db334b306e42c7..1a9c945837c2c36fd8063d6c83a7fecb2ae7ad84 100644 --- a/internal/collector/ingest/runner.go +++ b/internal/collector/ingest/runner.go @@ -143,7 +143,7 @@ func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p return culpa.Wrap(err, "post batch") } - advanced, err := persistAcceptedOffset(ctx, store, src.Tool, path, events, newOffset, batch, result.Accepted) + advanced, err := persistAcceptedOffset(ctx, store, src.Tool, path, events, newOffset, batch, result) if err != nil { return err } @@ -175,18 +175,29 @@ func enqueueBatch(ctx context.Context, store *state.Store, src config.SourceConf return nil } -func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, accepted int) (int, error) { - if accepted < 0 || accepted > len(batch.Events) { +func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, result Result) (int, error) { + if result.Accepted < 0 || result.Accepted > len(batch.Events) { return 0, culpa.WithCode( - fmt.Errorf("server accepted %d events for batch length %d", accepted, len(batch.Events)), + fmt.Errorf("server accepted %d events for batch length %d", result.Accepted, len(batch.Events)), "RUNNER_ACCEPTED_INVALID", ) } - if accepted == 0 { + if result.Accepted == len(batch.Events) && len(result.Errors) > 0 { + return 0, culpa.WithCode( + fmt.Errorf("server accepted all %d events but reported %d errors", len(batch.Events), len(result.Errors)), + "RUNNER_ACCEPTED_MISMATCH", + ) + } + if result.Accepted == 0 && len(result.Errors) == 0 { return 0, nil } - nextGlobalIndex := batch.EventIndexes[0] + accepted + nextGlobalIndex := batch.EventIndexes[0] + result.Accepted + if len(result.Errors) > 0 { + // Skip the failed event so it does not loop forever. + nextGlobalIndex++ + } + var offset int64 if nextGlobalIndex >= len(allEvents) { offset = newOffset @@ -197,5 +208,5 @@ func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string, if err := store.SaveOffset(ctx, tool, path, offset); err != nil { return 0, culpa.Wrap(err, "save accepted offset") } - return accepted, nil + return result.Accepted, nil } diff --git a/internal/collector/ingest/runner_test.go b/internal/collector/ingest/runner_test.go index 6c610b9b8a5201aa8f4f0a94b53210ba11343da2..3809e0f3e90dd2dad8d71176ff96939d4f285d84 100644 --- a/internal/collector/ingest/runner_test.go +++ b/internal/collector/ingest/runner_test.go @@ -2,10 +2,12 @@ package ingest import ( "context" + "encoding/json" "io" "net/http" "net/http/httptest" "path/filepath" + "strings" "sync" "testing" "time" @@ -34,7 +36,7 @@ func TestRunOnce_AllAcceptedPersistsNewOffsetAndDoesNotEnqueueOutbox(t *testing. assertOutboxCount(t, ctx, store, 0) } -func TestRunOnce_PartialAcceptedPersistsFirstUnacceptedEventOffset(t *testing.T) { +func TestRunOnce_PartialAcceptWithErrorSkipsFailedEvent(t *testing.T) { ctx := context.Background() store := openTestStore(t, ctx) source := testSource(t, "claude-code", 10, 4096) @@ -42,7 +44,7 @@ func TestRunOnce_PartialAcceptedPersistsFirstUnacceptedEventOffset(t *testing.T) p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{ file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")}, newOffset: 400}, }) - sender := acceptingSender(t, []Result{{Accepted: 2, Errors: []LineError{{Line: 3, Err: "bad"}}}}) + sender := acceptingSender(t, []Result{{Accepted: 1, Errors: []LineError{{Line: 2, Err: "bad"}}}}) err := RunOnce(ctx, testConfig(), source, p, store, sender) if err != nil { @@ -53,7 +55,7 @@ func TestRunOnce_PartialAcceptedPersistsFirstUnacceptedEventOffset(t *testing.T) assertOutboxCount(t, ctx, store, 0) } -func TestRunOnce_ZeroAcceptedDoesNotPersistOffset(t *testing.T) { +func TestRunOnce_ZeroAcceptedWithErrorSkipsFirstEvent(t *testing.T) { ctx := context.Background() store := openTestStore(t, ctx) source := testSource(t, "claude-code", 10, 4096) @@ -68,10 +70,93 @@ func TestRunOnce_ZeroAcceptedDoesNotPersistOffset(t *testing.T) { t.Fatalf("RunOnce: %v", err) } - assertOffset(t, ctx, store, "claude-code", file, 0) + assertOffset(t, ctx, store, "claude-code", file, 200) assertOutboxCount(t, ctx, store, 0) } +func TestRunOnce_PartialAcceptSkipsBadRowAndRerunDoesNotRepostIt(t *testing.T) { + ctx := context.Background() + store := openTestStore(t, ctx) + source := testSource(t, "claude-code", 10, 4096) + file := filepath.Join(source.Path, "one.jsonl") + events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")} + p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{ + file: {events: events, newOffset: 400}, + }) + + var posted [][]wire.TurnEvent + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + var batch []wire.TurnEvent + for _, line := range strings.Split(string(body), "\n") { + if line == "" { + continue + } + var ev wire.TurnEvent + if err := json.Unmarshal([]byte(line), &ev); err != nil { + t.Fatalf("unmarshal event: %v", err) + } + batch = append(batch, ev) + } + posted = append(posted, batch) + w.Header().Set("Content-Type", "application/json") + if len(posted) == 1 { + _, _ = w.Write([]byte(resultJSON(Result{Accepted: 1, Errors: []LineError{{Line: 2, Err: "bad"}}}))) + } else { + _, _ = w.Write([]byte(resultJSON(Result{Accepted: 1}))) + } + })) + defer ts.Close() + sender := NewSender(ts.URL, ts.Client()) + + err := RunOnce(ctx, testConfig(), source, p, store, sender) + if err != nil { + t.Fatalf("RunOnce first run: %v", err) + } + assertOffset(t, ctx, store, "claude-code", file, 300) + + if len(posted) != 1 { + t.Fatalf("first run: expected 1 POST, got %d", len(posted)) + } + if len(posted[0]) != 3 { + t.Errorf("first run: expected 3 events in batch, got %d", len(posted[0])) + } + + err = RunOnce(ctx, testConfig(), source, p, store, sender) + if err != nil { + t.Fatalf("RunOnce second run: %v", err) + } + assertOffset(t, ctx, store, "claude-code", file, 400) + + if len(posted) != 2 { + t.Fatalf("second run: expected 2 POSTs total, got %d", len(posted)) + } + if len(posted[1]) != 1 { + t.Errorf("second run: expected 1 event in batch, got %d", len(posted[1])) + } + if posted[1][0].Content != "c" { + t.Errorf("second run: expected event 'c', got %q", posted[1][0].Content) + } +} + +func TestRunOnce_FullAcceptWithErrorsReturnsError(t *testing.T) { + ctx := context.Background() + store := openTestStore(t, ctx) + source := testSource(t, "claude-code", 10, 4096) + file := filepath.Join(source.Path, "one.jsonl") + p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{ + file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300}, + }) + sender := acceptingSender(t, []Result{{Accepted: 2, Errors: []LineError{{Line: 2, Err: "bad"}}}}) + + err := RunOnce(ctx, testConfig(), source, p, store, sender) + if err == nil { + t.Fatal("expected RunOnce error when server accepts all but reports errors") + } + + assertOffset(t, ctx, store, "claude-code", file, 0) +} + func TestRunOnce_HardErrorEnqueuesUnsentBatchAndDoesNotAdvanceOffset(t *testing.T) { ctx := context.Background() store := openTestStore(t, ctx) @@ -177,7 +262,13 @@ func (p *fakeParser) Parse(path string, since int64) ([]wire.TurnEvent, int64, e if res.err != nil { return nil, since, res.err } - return res.events, res.newOffset, nil + var filtered []wire.TurnEvent + for _, ev := range res.events { + if ev.Seq >= since { + filtered = append(filtered, ev) + } + } + return filtered, res.newOffset, nil } type roundTripFunc func(*http.Request) (*http.Response, error)