From 9094d79e008f69af42107022da2bfcf6f8d7aea8 Mon Sep 17 00:00:00 2001 From: Eugene Blikh Date: Sun, 3 May 2026 20:35:15 +0300 Subject: [PATCH] collector: enforce outbox cap before replay --- docs/tasks/lethe-collector-claude-code.md | 1 + internal/collector/ingest/runner.go | 34 +++++++++++++---- internal/collector/ingest/runner_test.go | 45 +++++++++++++++++++++++ internal/collector/ingest/sender.go | 3 +- internal/collector/ingest/sender_test.go | 22 +++++++++++ 5 files changed, 96 insertions(+), 9 deletions(-) diff --git a/docs/tasks/lethe-collector-claude-code.md b/docs/tasks/lethe-collector-claude-code.md index f7d1eb8ee9d459d130bdea3eee180a137f1e225f..c8e4c47084776defa6b09f2822cb2e0c68d3ed78 100644 --- a/docs/tasks/lethe-collector-claude-code.md +++ b/docs/tasks/lethe-collector-claude-code.md @@ -319,6 +319,7 @@ Smoke: `go run ./cmd/lethe-collector --config ./tmp/collector-smoke.yaml status` - ureview (re-review): added `lag_bytes` per file to `status` output using `parser.SourceFile.Size` from discovery. - ureview (final): bounded daemon drain uses `http.timeout` — no separate `shutdown_grace` config exists for the collector. - ureview (final): backfill offset-0 semantics are implemented as `RunBackfillOnce` instead of a mode flag on `RunOnce` — explicit call sites are safer than a boolean parameter that could be misused in daemon loops. +- ureview (final): normalized sender `serverURL` and enforced outbox cap before every replay to fix IV5/IV9 violations found in review. ### Deferred (needs user input) diff --git a/internal/collector/ingest/runner.go b/internal/collector/ingest/runner.go index d8c363dab167430c512f3047375bdd2bf2e1d930..3e3e17ea8345b8d50fdd02dda47298000be4e3a1 100644 --- a/internal/collector/ingest/runner.go +++ b/internal/collector/ingest/runner.go @@ -18,6 +18,26 @@ import ( const replayOutboxLimit = 100 +// enforceAndReplay enforces the outbox size limit then replays queued rows. +func enforceAndReplay(ctx context.Context, store *state.Store, sender *Sender, maxBytes int64, src config.SourceConfig) error { + var errs []error + if err := EnforceOutboxLimit(ctx, store, maxBytes); err != nil { + if ctxErr := ctx.Err(); ctxErr != nil { + return errors.Join(ctxErr, culpa.Wrap(err, "enforce outbox limit")) + } + slog.Warn("enforce outbox limit failed", "tool", src.Tool, "error", err) + errs = append(errs, culpa.Wrap(err, "enforce outbox limit")) + } + if err := ReplayOutbox(ctx, store, sender, replayOutboxLimit); err != nil { + if ctxErr := ctx.Err(); ctxErr != nil { + return errors.Join(append(errs, ctxErr, culpa.Wrap(err, "replay outbox"))...) + } + slog.Warn("outbox replay failed", "tool", src.Tool, "error", err) + errs = append(errs, culpa.Wrap(err, "replay outbox")) + } + return errors.Join(errs...) +} + // RunOnce replays the safety-net outbox, scans one configured source, parses // new complete records from persisted offsets, posts batches, and advances // offsets only after server acceptance confirms the corresponding lines. @@ -31,12 +51,11 @@ func RunOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p ) } - if err := ReplayOutbox(ctx, store, sender, replayOutboxLimit); err != nil { + if err := enforceAndReplay(ctx, store, sender, cfg.Outbox.MaxBytes, src); err != nil { if ctxErr := ctx.Err(); ctxErr != nil { - return errors.Join(ctxErr, culpa.Wrap(err, "replay outbox")) + return errors.Join(append(errs, err)...) } - slog.Warn("outbox replay failed", "tool", src.Tool, "error", err) - errs = append(errs, culpa.Wrap(err, "replay outbox")) + errs = append(errs, err) } files, err := p.Discover(src.Path) @@ -70,12 +89,11 @@ func RunBackfillOnce(ctx context.Context, cfg config.Config, src config.SourceCo ) } - if err := ReplayOutbox(ctx, store, sender, replayOutboxLimit); err != nil { + if err := enforceAndReplay(ctx, store, sender, cfg.Outbox.MaxBytes, src); err != nil { if ctxErr := ctx.Err(); ctxErr != nil { - return errors.Join(ctxErr, culpa.Wrap(err, "replay outbox")) + return errors.Join(append(errs, err)...) } - slog.Warn("outbox replay failed", "tool", src.Tool, "error", err) - errs = append(errs, culpa.Wrap(err, "replay outbox")) + errs = append(errs, err) } files, err := p.Discover(src.Path) diff --git a/internal/collector/ingest/runner_test.go b/internal/collector/ingest/runner_test.go index 1f4cc8204471dccd68c9ac9bfe33c68c4499ccb2..d04fddcb3f21a812d15395ba886d746a31cc4eea 100644 --- a/internal/collector/ingest/runner_test.go +++ b/internal/collector/ingest/runner_test.go @@ -14,6 +14,7 @@ import ( "sourcecraft.dev/bigbes/lethe/internal/collector/config" "sourcecraft.dev/bigbes/lethe/internal/collector/parser" + "sourcecraft.dev/bigbes/lethe/internal/collector/state" "sourcecraft.dev/bigbes/lethe/internal/shared/wire" ) @@ -36,6 +37,50 @@ func TestRunOnce_AllAcceptedPersistsNewOffsetAndDoesNotEnqueueOutbox(t *testing. assertOutboxCount(t, ctx, store, 0) } +func TestRunOnce_TrimsPreexistingOutboxOverLimitEvenWithNoEnqueue(t *testing.T) { + ctx := context.Background() + store := openTestStore(t, ctx) + source := testSource(t, "claude-code", 10, 4096) + file := filepath.Join(source.Path, "one.jsonl") + + // Pre-populate outbox with two 10-byte items. + items := []state.OutboxItem{ + {Tool: "claude-code", Host: "h", SourceFile: file, Payload: []byte("{\"seq\":1}\n")}, + {Tool: "claude-code", Host: "h", SourceFile: file, Payload: []byte("{\"seq\":2}\n")}, + } + for _, it := range items { + if err := store.Enqueue(ctx, it); err != nil { + t.Fatalf("Enqueue: %v", err) + } + } + + cfg := testConfig() + cfg.Outbox.MaxBytes = 10 + + // No new events => no new enqueue. + p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{ + file: {events: []wire.TurnEvent{}, newOffset: 0}, + }) + // Accepted:0 so replay does not delete rows; enforce must trim. + sender := acceptingSender(t, nil) + + err := RunOnce(ctx, cfg, source, p, store, sender) + if err != nil { + t.Fatalf("RunOnce: %v", err) + } + + rows, err := store.Oldest(ctx, 10) + if err != nil { + t.Fatalf("Oldest: %v", err) + } + if len(rows) != 1 { + t.Errorf("expected 1 row after trim, got %d", len(rows)) + } + if len(rows) > 0 && string(rows[0].Payload) != "{\"seq\":2}\n" { + t.Errorf("remaining payload = %q, want %q", string(rows[0].Payload), "{\"seq\":2}\n") + } +} + func TestRunOnce_PartialAcceptWithErrorSkipsFailedEvent(t *testing.T) { ctx := context.Background() store := openTestStore(t, ctx) diff --git a/internal/collector/ingest/sender.go b/internal/collector/ingest/sender.go index f5720ef864d2d44e57c08d7669a6d60df511221a..88868652d6f558c7707744b87e4290bae4a10515 100644 --- a/internal/collector/ingest/sender.go +++ b/internal/collector/ingest/sender.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "net/http" + "strings" "go.bigb.es/auxilia/culpa" "sourcecraft.dev/bigbes/lethe/internal/shared/wire" @@ -34,7 +35,7 @@ type Sender struct { // NewSender builds a Sender that posts to serverURL + /api/v1/ingest. func NewSender(serverURL string, client *http.Client) *Sender { - return &Sender{serverURL: serverURL, client: client} + return &Sender{serverURL: strings.TrimRight(serverURL, "/"), client: client} } // PostBatch encodes events as NDJSON and POSTs them to the ingest endpoint. diff --git a/internal/collector/ingest/sender_test.go b/internal/collector/ingest/sender_test.go index b8343674972bc18bb930bb076808f5e7d25014ad..f12105f0fad6154322915b67e470cadcb0868a20 100644 --- a/internal/collector/ingest/sender_test.go +++ b/internal/collector/ingest/sender_test.go @@ -102,6 +102,28 @@ func TestSender_PostBatch_Success(t *testing.T) { } } +func TestSender_PostBatch_TrailingSlashURL(t *testing.T) { + var gotPath string + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotPath = r.URL.Path + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"accepted":1,"errors":[]}`)) + })) + defer ts.Close() + + sender := NewSender(ts.URL+"/", http.DefaultClient) + _, err := sender.PostBatch(context.Background(), []wire.TurnEvent{ + {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, + }) + if err != nil { + t.Fatalf("PostBatch: %v", err) + } + if gotPath != "/api/v1/ingest" { + t.Errorf("expected path /api/v1/ingest, got %s", gotPath) + } +} + func TestSender_PostBatch_Non2xx(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusServiceUnavailable)