From ef738a28b65454441af0c98c23a389cfa24fb22c Mon Sep 17 00:00:00 2001 From: Eugene Blikh Date: Sun, 3 May 2026 18:31:57 +0300 Subject: [PATCH] collector: align ingest sender with server response --- internal/collector/ingest/outbox.go | 56 ++++++++++++++---------- internal/collector/ingest/outbox_test.go | 35 ++++++++++++++- internal/collector/ingest/sender.go | 12 +++-- internal/collector/ingest/sender_test.go | 32 ++++++++++++-- 4 files changed, 104 insertions(+), 31 deletions(-) diff --git a/internal/collector/ingest/outbox.go b/internal/collector/ingest/outbox.go index bd22b958d22e37bc841606c49a45dcd73cb98d2d..a1381ef3a5e91020a662caf9444682c288cabc5f 100644 --- a/internal/collector/ingest/outbox.go +++ b/internal/collector/ingest/outbox.go @@ -3,6 +3,7 @@ package ingest import ( "bytes" "context" + "fmt" "go.bigb.es/auxilia/culpa" "sourcecraft.dev/bigbes/lethe/internal/collector/state" @@ -38,34 +39,41 @@ func ReplayOutbox(ctx context.Context, store *state.Store, sender *Sender, limit // EnforceOutboxLimit drops the oldest rows until total payload bytes are // within maxBytes. It never blocks the ingestion loop. func EnforceOutboxLimit(ctx context.Context, store *state.Store, maxBytes int64) error { - st, err := store.Stats(ctx) - if err != nil { - return culpa.Wrap(err, "get outbox stats") - } - if st.OutboxBytes <= maxBytes { - return nil - } + for { + st, err := store.Stats(ctx) + if err != nil { + return culpa.Wrap(err, "get outbox stats") + } + if st.OutboxBytes <= maxBytes { + return nil + } - excess := st.OutboxBytes - maxBytes - rows, err := store.Oldest(ctx, 1000) // reasonable batch size - if err != nil { - return culpa.Wrap(err, "fetch oldest for trim") - } + rows, err := store.Oldest(ctx, 1000) // reasonable batch size + if err != nil { + return culpa.Wrap(err, "fetch oldest for trim") + } + if len(rows) == 0 { + return culpa.WithCode( + fmt.Errorf("outbox still exceeds limit (%d bytes > %d) with no rows remaining", st.OutboxBytes, maxBytes), + "OUTBOX_TRIM_INCONSISTENT", + ) + } - var deleteIDs []int64 - var dropped int64 - for _, row := range rows { - deleteIDs = append(deleteIDs, row.ID) - dropped += int64(len(row.Payload)) - if dropped >= excess { - break + excess := st.OutboxBytes - maxBytes + var deleteIDs []int64 + var dropped int64 + for _, row := range rows { + deleteIDs = append(deleteIDs, row.ID) + dropped += int64(len(row.Payload)) + if dropped >= excess { + break + } } - } - if len(deleteIDs) > 0 { - if err := store.Delete(ctx, deleteIDs); err != nil { - return culpa.Wrap(err, "trim outbox") + if len(deleteIDs) > 0 { + if err := store.Delete(ctx, deleteIDs); err != nil { + return culpa.Wrap(err, "trim outbox") + } } } - return nil } diff --git a/internal/collector/ingest/outbox_test.go b/internal/collector/ingest/outbox_test.go index 447edaecc04cbcea0f8fd459809bb0a7b42efba7..6888fb1597029111713f0e665a9222652c800389 100644 --- a/internal/collector/ingest/outbox_test.go +++ b/internal/collector/ingest/outbox_test.go @@ -77,7 +77,7 @@ func TestReplayOutbox_LeavesRowOnPartialAccept(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte(`{"accepted":1,"errors":["bad row"]}`)) + _, _ = w.Write([]byte(`{"accepted":1,"errors":[{"line":2,"error":"bad row"}]}`)) })) defer ts.Close() @@ -222,3 +222,36 @@ func TestEnforceOutboxLimit_EmptyOutbox(t *testing.T) { t.Fatalf("EnforceOutboxLimit: %v", err) } } + +func TestEnforceOutboxLimit_RepeatedTrimming(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + store, err := state.Open(ctx, dir+"/test.db") + if err != nil { + t.Fatalf("Open: %v", err) + } + defer func() { _ = store.Close() }() + + // Enqueue more than 1000 items so a single batch is insufficient. + for i := 0; i < 1002; i++ { + item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte("x")} + if err := store.Enqueue(ctx, item); err != nil { + t.Fatalf("Enqueue %d: %v", i, err) + } + } + + if err := EnforceOutboxLimit(ctx, store, 0); err != nil { + t.Fatalf("EnforceOutboxLimit: %v", err) + } + + st, err := store.Stats(ctx) + if err != nil { + t.Fatalf("Stats: %v", err) + } + if st.OutboxBytes > 0 { + t.Errorf("OutboxBytes = %d, want 0", st.OutboxBytes) + } + if st.OutboxCount != 0 { + t.Errorf("OutboxCount = %d, want 0", st.OutboxCount) + } +} diff --git a/internal/collector/ingest/sender.go b/internal/collector/ingest/sender.go index 19656bffdc81a2823a0f053dfb186df83dfaf6f4..f5720ef864d2d44e57c08d7669a6d60df511221a 100644 --- a/internal/collector/ingest/sender.go +++ b/internal/collector/ingest/sender.go @@ -14,10 +14,16 @@ import ( "sourcecraft.dev/bigbes/lethe/internal/shared/wire" ) +// LineError is the server's per-line failure shape. +type LineError struct { + Line int `json:"line"` + Err string `json:"error"` +} + // Result is the server's response to a batch ingest POST. type Result struct { - Accepted int `json:"accepted"` - Errors []string `json:"errors"` + Accepted int `json:"accepted"` + Errors []LineError `json:"errors"` } // Sender POSTs NDJSON batches to the lethe ingest endpoint. @@ -57,7 +63,7 @@ func (s *Sender) postRaw(ctx context.Context, body []byte) (Result, error) { if resp.StatusCode < 200 || resp.StatusCode >= 300 { body, _ := io.ReadAll(resp.Body) return Result{}, culpa.WithCode( - fmt.Errorf("ingest %s: %s (body: %s)", resp.Status, string(body), resp.Status), + fmt.Errorf("ingest %s (body: %s)", resp.Status, string(body)), "INGEST_HTTP", ) } diff --git a/internal/collector/ingest/sender_test.go b/internal/collector/ingest/sender_test.go index 6a64fa8d4516ea1ce47879efcfe346a7938632ec..b8343674972bc18bb930bb076808f5e7d25014ad 100644 --- a/internal/collector/ingest/sender_test.go +++ b/internal/collector/ingest/sender_test.go @@ -139,7 +139,7 @@ func TestSender_PostBatch_PartialAccept(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte(`{"accepted":1,"errors":["bad row"]}`)) + _, _ = w.Write([]byte(`{"accepted":1,"errors":[{"line":2,"error":"bad row"}]}`)) })) defer ts.Close() @@ -154,7 +154,33 @@ func TestSender_PostBatch_PartialAccept(t *testing.T) { if result.Accepted != 1 { t.Errorf("Accepted = %d, want 1", result.Accepted) } - if len(result.Errors) != 1 || result.Errors[0] != "bad row" { - t.Errorf("Errors = %v, want [bad row]", result.Errors) + if len(result.Errors) != 1 || result.Errors[0].Line != 2 || result.Errors[0].Err != "bad row" { + t.Errorf("Errors = %+v, want one LineError{Line:2, Err:\"bad row\"}", result.Errors) + } +} + +func TestSender_PostBatch_StructuredErrors(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"accepted":0,"errors":[{"line":1,"error":"bad"}]}`)) + })) + defer ts.Close() + + sender := NewSender(ts.URL, http.DefaultClient) + result, err := sender.PostBatch(context.Background(), []wire.TurnEvent{ + {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, + }) + if err != nil { + t.Fatalf("PostBatch: %v", err) + } + if result.Accepted != 0 { + t.Errorf("Accepted = %d, want 0", result.Accepted) + } + if len(result.Errors) != 1 { + t.Fatalf("Errors = %+v, want 1 error", result.Errors) + } + if result.Errors[0].Line != 1 || result.Errors[0].Err != "bad" { + t.Errorf("Errors[0] = %+v, want Line=1 Err=bad", result.Errors[0]) } }