From f3118d95bdf88c114346b0b2b43fad44e564a484 Mon Sep 17 00:00:00 2001 From: Eugene Blikh Date: Sun, 3 May 2026 19:42:15 +0300 Subject: [PATCH] collector: preserve valid rows around ingest errors --- cmd/lethe-collector/main.go | 6 +- docs/tasks/lethe-collector-claude-code.md | 8 +- internal/collector/ingest/outbox.go | 2 + internal/collector/ingest/outbox_test.go | 44 +++++++++++ internal/collector/ingest/runner.go | 89 +++++++++++++++++++---- internal/collector/ingest/runner_test.go | 89 +++++++++++++++++++++++ 6 files changed, 220 insertions(+), 18 deletions(-) diff --git a/cmd/lethe-collector/main.go b/cmd/lethe-collector/main.go index 3ce72d1955fa0af90905baf315fd6f0188a32f3e..c52154032eabfc90332b2987ef8ec5d31383d0de 100644 --- a/cmd/lethe-collector/main.go +++ b/cmd/lethe-collector/main.go @@ -202,7 +202,11 @@ func newStatusCmd(configPath *string) *cobra.Command { fmt.Printf(" %s: error=%v\n", f.Path, err) continue } - fmt.Printf(" %s: offset=%d\n", f.Path, off) + lagBytes := f.Size - off + if lagBytes < 0 { + lagBytes = 0 + } + fmt.Printf(" %s: offset=%d lag_bytes=%d\n", f.Path, off, lagBytes) } } diff --git a/docs/tasks/lethe-collector-claude-code.md b/docs/tasks/lethe-collector-claude-code.md index 6b427a4dbd12337f94e55685106b21c52828ee8e..f65da53a0e78c8b7f3f258bf81da02ae43f4ac16 100644 --- a/docs/tasks/lethe-collector-claude-code.md +++ b/docs/tasks/lethe-collector-claude-code.md @@ -315,9 +315,15 @@ Smoke: `go run ./cmd/lethe-collector --config ./tmp/collector-smoke.yaml status` - worktree: `task/lethe-collector-claude-code` at `/Users/blikh/data/home/lethe/.worktrees/lethe-collector-claude-code` — hands-off requires isolated reversible edits. - worktree setup: added `.worktrees/` to `.gitignore` on `master` before creating the task worktree — `git-worktrees` requires project-local worktree directories to be ignored. - uplan: plan auto-approved (hands-off). 
+- ureview (re-review): fixed `persistAcceptedOffset` to use the first error's `Line` (1-based within the request body) to identify the failed row, rather than assuming `result.Accepted` points to it. Valid but uncommitted rows before the failed line are re-posted as a smaller prefix before the failed row is skipped, preventing data loss when the server responds with `accepted=0` and an error at line 2. +- ureview (re-review): added WARN log with dropped row count and bytes to `EnforceOutboxLimit`. +- ureview (re-review): added `lag_bytes` per file to `status` output using `parser.SourceFile.Size` from discovery. ### Deferred / known limitations -- `status` does not display per-source `last_error` or ingestion `lag` because the PH1 state schema (`ingestion_state` table) stores only `last_offset`, not error history or timestamps. Adding these requires a schema extension outside the current plan scope. +- `status` does not display per-source `last_error` because the PH1 state schema (`ingestion_state` table) stores only `last_offset`, not error history or timestamps. Adding it requires a schema extension outside the current plan scope. ### Deferred (needs user input) + +- retry/backoff: `http.retry_max` is loaded from config, but exponential backoff needs a configured base/max delay; no conservative default was specified. +- `status` `last_error`: requires extending the `ingestion_state` schema and deciding retention/update semantics.
diff --git a/internal/collector/ingest/outbox.go b/internal/collector/ingest/outbox.go index a1381ef3a5e91020a662caf9444682c288cabc5f..8d3424fc553a13afd7bde396e0154a7605d12a27 100644 --- a/internal/collector/ingest/outbox.go +++ b/internal/collector/ingest/outbox.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "log/slog" "go.bigb.es/auxilia/culpa" "sourcecraft.dev/bigbes/lethe/internal/collector/state" @@ -71,6 +72,7 @@ func EnforceOutboxLimit(ctx context.Context, store *state.Store, maxBytes int64) } if len(deleteIDs) > 0 { + slog.Warn("outbox overflow: dropped oldest rows", "dropped_rows", len(deleteIDs), "dropped_bytes", dropped) if err := store.Delete(ctx, deleteIDs); err != nil { return culpa.Wrap(err, "trim outbox") } diff --git a/internal/collector/ingest/outbox_test.go b/internal/collector/ingest/outbox_test.go index 6888fb1597029111713f0e665a9222652c800389..471ea84c83c5e491ddccf037e1488f770b38fde2 100644 --- a/internal/collector/ingest/outbox_test.go +++ b/internal/collector/ingest/outbox_test.go @@ -1,9 +1,12 @@ package ingest import ( + "bytes" "context" + "log/slog" "net/http" "net/http/httptest" + "strings" "testing" "sourcecraft.dev/bigbes/lethe/internal/collector/state" @@ -209,6 +212,47 @@ func TestEnforceOutboxLimit_NoOpWhenUnderLimit(t *testing.T) { } } +func TestEnforceOutboxLimit_LogsWarningWhenRowsDropped(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + store, err := state.Open(ctx, dir+"/test.db") + if err != nil { + t.Fatalf("Open: %v", err) + } + defer func() { _ = store.Close() }() + + items := []state.OutboxItem{ + {Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}`)}, // 9 bytes + {Tool: "cc", Host: "h", SourceFile: "/b.jsonl", Payload: []byte(`{"seq":2}`)}, // 9 bytes + {Tool: "cc", Host: "h", SourceFile: "/c.jsonl", Payload: []byte(`{"seq":333}`)}, // 11 bytes + } + for _, it := range items { + if err := store.Enqueue(ctx, it); err != nil { + t.Fatalf("Enqueue: %v", err) + } + } + + 
oldLogger := slog.Default() + defer slog.SetDefault(oldLogger) + var buf bytes.Buffer + slog.SetDefault(slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelWarn}))) + + if err := EnforceOutboxLimit(ctx, store, 20); err != nil { + t.Fatalf("EnforceOutboxLimit: %v", err) + } + + logOutput := buf.String() + if !strings.Contains(logOutput, "outbox overflow") { + t.Errorf("expected WARN log containing 'outbox overflow', got %q", logOutput) + } + if !strings.Contains(logOutput, "dropped_rows") { + t.Errorf("expected WARN log containing 'dropped_rows', got %q", logOutput) + } + if !strings.Contains(logOutput, "dropped_bytes") { + t.Errorf("expected WARN log containing 'dropped_bytes', got %q", logOutput) + } +} + func TestEnforceOutboxLimit_EmptyOutbox(t *testing.T) { ctx := context.Background() dir := t.TempDir() diff --git a/internal/collector/ingest/runner.go b/internal/collector/ingest/runner.go index 1a9c945837c2c36fd8063d6c83a7fecb2ae7ad84..65577cbde167e39ee762106c0468896dc5afe547 100644 --- a/internal/collector/ingest/runner.go +++ b/internal/collector/ingest/runner.go @@ -143,7 +143,7 @@ func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p return culpa.Wrap(err, "post batch") } - advanced, err := persistAcceptedOffset(ctx, store, src.Tool, path, events, newOffset, batch, result) + advanced, err := persistAcceptedOffset(ctx, store, sender, cfg, src, path, events, newOffset, batch, result) if err != nil { return err } @@ -175,7 +175,7 @@ func enqueueBatch(ctx context.Context, store *state.Store, src config.SourceConf return nil } -func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, result Result) (int, error) { +func persistAcceptedOffset(ctx context.Context, store *state.Store, sender *Sender, cfg config.Config, src config.SourceConfig, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, result 
Result) (int, error) { if result.Accepted < 0 || result.Accepted > len(batch.Events) { return 0, culpa.WithCode( fmt.Errorf("server accepted %d events for batch length %d", result.Accepted, len(batch.Events)), @@ -188,25 +188,82 @@ func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string, "RUNNER_ACCEPTED_MISMATCH", ) } - if result.Accepted == 0 && len(result.Errors) == 0 { - return 0, nil + + if len(result.Errors) == 0 { + if result.Accepted == 0 { + return 0, nil + } + nextGlobalIndex := batch.EventIndexes[0] + result.Accepted + offset := eventOffset(allEvents, newOffset, nextGlobalIndex) + if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil { + return 0, culpa.Wrap(err, "save accepted offset") + } + return result.Accepted, nil } - nextGlobalIndex := batch.EventIndexes[0] + result.Accepted - if len(result.Errors) > 0 { - // Skip the failed event so it does not loop forever. - nextGlobalIndex++ + // There are errors — use the first error's line to identify the failed row. + failedLine := result.Errors[0].Line + if failedLine < 1 || failedLine > len(batch.Events) { + return 0, culpa.WithCode( + fmt.Errorf("server error line %d out of range [1,%d]", failedLine, len(batch.Events)), + "RUNNER_ERROR_LINE_INVALID", + ) } + failedLocalIdx := failedLine - 1 - var offset int64 - if nextGlobalIndex >= len(allEvents) { - offset = newOffset - } else { - offset = allEvents[nextGlobalIndex].Seq + if result.Accepted > failedLocalIdx { + return 0, culpa.WithCode( + fmt.Errorf("server accepted %d events but first error is at line %d", result.Accepted, failedLine), + "RUNNER_ACCEPTED_MISMATCH", + ) + } + + // Persist any already-accepted rows. 
+ if result.Accepted > 0 { + nextGlobalIndex := batch.EventIndexes[0] + result.Accepted + offset := eventOffset(allEvents, newOffset, nextGlobalIndex) + if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil { + return 0, culpa.Wrap(err, "save accepted offset") + } } - if err := store.SaveOffset(ctx, tool, path, offset); err != nil { - return 0, culpa.Wrap(err, "save accepted offset") + // If there are uncommitted valid rows between accepted and the failed row, + // post that prefix safely with the same rules. + if result.Accepted < failedLocalIdx { + prefix := Batch{ + Events: batch.Events[result.Accepted:failedLocalIdx], + EventIndexes: batch.EventIndexes[result.Accepted:failedLocalIdx], + } + prefixResult, err := sender.PostBatch(ctx, prefix.Events) + if err != nil { + if enqueueErr := enqueueBatch(ctx, store, src, cfg.Host, path, prefix.Events, cfg.Outbox.MaxBytes); enqueueErr != nil { + return 0, errors.Join(culpa.Wrap(err, "post prefix batch"), enqueueErr) + } + return 0, culpa.Wrap(err, "post prefix batch") + } + prefixAdvanced, err := persistAcceptedOffset(ctx, store, sender, cfg, src, path, allEvents, newOffset, prefix, prefixResult) + if err != nil { + return 0, err + } + if prefixAdvanced < len(prefix.Events) { + // The prefix itself hit an error or could not advance fully. + // Return total advanced so far (accepted rows + prefix advancement). + return result.Accepted + prefixAdvanced, nil + } + } + + // Skip the failed row so it does not loop forever. 
+ nextGlobalIndex := batch.EventIndexes[0] + failedLocalIdx + 1 + offset := eventOffset(allEvents, newOffset, nextGlobalIndex) + if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil { + return 0, culpa.Wrap(err, "save offset past failed row") + } + return failedLocalIdx + 1, nil +} + +func eventOffset(allEvents []wire.TurnEvent, newOffset int64, nextGlobalIndex int) int64 { + if nextGlobalIndex >= len(allEvents) { + return newOffset } - return result.Accepted, nil + return allEvents[nextGlobalIndex].Seq } diff --git a/internal/collector/ingest/runner_test.go b/internal/collector/ingest/runner_test.go index 3809e0f3e90dd2dad8d71176ff96939d4f285d84..2952d57aa05d3af8ad05d2290030f933ef8110ea 100644 --- a/internal/collector/ingest/runner_test.go +++ b/internal/collector/ingest/runner_test.go @@ -55,6 +55,95 @@ func TestRunOnce_PartialAcceptWithErrorSkipsFailedEvent(t *testing.T) { assertOutboxCount(t, ctx, store, 0) } +func TestRunOnce_ZeroAcceptedWithErrorLineTwoPostsPrefixAndSkipsFailedEvent(t *testing.T) { + ctx := context.Background() + store := openTestStore(t, ctx) + source := testSource(t, "claude-code", 10, 4096) + file := filepath.Join(source.Path, "one.jsonl") + events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")} + p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{ + file: {events: events, newOffset: 400}, + }) + + var posted [][]wire.TurnEvent + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + var batch []wire.TurnEvent + for _, line := range strings.Split(string(body), "\n") { + if line == "" { + continue + } + var ev wire.TurnEvent + if err := json.Unmarshal([]byte(line), &ev); err != nil { + t.Fatalf("unmarshal event: %v", err) + } + batch = append(batch, ev) + } + posted = append(posted, batch) + w.Header().Set("Content-Type", "application/json") + if len(posted) == 1 { + _, _ = 
w.Write([]byte(resultJSON(Result{Accepted: 0, Errors: []LineError{{Line: 2, Err: "bad"}}}))) + } else { + _, _ = w.Write([]byte(resultJSON(Result{Accepted: len(batch)}))) + } + })) + defer ts.Close() + sender := NewSender(ts.URL, ts.Client()) + + err := RunOnce(ctx, testConfig(), source, p, store, sender) + if err != nil { + t.Fatalf("RunOnce first run: %v", err) + } + assertOffset(t, ctx, store, "claude-code", file, 300) + + if len(posted) != 2 { + t.Fatalf("first run: expected 2 POSTs, got %d", len(posted)) + } + if len(posted[0]) != 3 { + t.Errorf("first run batch: expected 3 events, got %d", len(posted[0])) + } + if len(posted[1]) != 1 { + t.Errorf("prefix batch: expected 1 event, got %d", len(posted[1])) + } + if posted[1][0].Content != "a" { + t.Errorf("prefix batch: expected event 'a', got %q", posted[1][0].Content) + } + + err = RunOnce(ctx, testConfig(), source, p, store, sender) + if err != nil { + t.Fatalf("RunOnce second run: %v", err) + } + assertOffset(t, ctx, store, "claude-code", file, 400) + + if len(posted) != 3 { + t.Fatalf("second run: expected 3 POSTs total, got %d", len(posted)) + } + if len(posted[2]) != 1 { + t.Errorf("second run batch: expected 1 event, got %d", len(posted[2])) + } + if posted[2][0].Content != "c" { + t.Errorf("second run batch: expected event 'c', got %q", posted[2][0].Content) + } +} + +func TestRunOnce_MalformedErrorLineOutOfRangeReturnsError(t *testing.T) { + ctx := context.Background() + store := openTestStore(t, ctx) + source := testSource(t, "claude-code", 10, 4096) + file := filepath.Join(source.Path, "one.jsonl") + p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{ + file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300}, + }) + sender := acceptingSender(t, []Result{{Accepted: 0, Errors: []LineError{{Line: 5, Err: "bad"}}}}) + + err := RunOnce(ctx, testConfig(), source, p, store, sender) + if err == nil { + t.Fatal("expected RunOnce error 
for malformed error line") + } + + assertOffset(t, ctx, store, "claude-code", file, 0) +} + func TestRunOnce_ZeroAcceptedWithErrorSkipsFirstEvent(t *testing.T) { ctx := context.Background() store := openTestStore(t, ctx)