M internal/collector/ingest/outbox.go => internal/collector/ingest/outbox.go +32 -24
@@ 3,6 3,7 @@ package ingest
import (
"bytes"
"context"
+ "fmt"
"go.bigb.es/auxilia/culpa"
"sourcecraft.dev/bigbes/lethe/internal/collector/state"
@@ 38,34 39,41 @@ func ReplayOutbox(ctx context.Context, store *state.Store, sender *Sender, limit
// EnforceOutboxLimit drops the oldest rows until total payload bytes are
// within maxBytes. It never blocks the ingestion loop.
func EnforceOutboxLimit(ctx context.Context, store *state.Store, maxBytes int64) error {
- st, err := store.Stats(ctx)
- if err != nil {
- return culpa.Wrap(err, "get outbox stats")
- }
- if st.OutboxBytes <= maxBytes {
- return nil
- }
+ for {
+ st, err := store.Stats(ctx)
+ if err != nil {
+ return culpa.Wrap(err, "get outbox stats")
+ }
+ if st.OutboxBytes <= maxBytes {
+ return nil
+ }
- excess := st.OutboxBytes - maxBytes
- rows, err := store.Oldest(ctx, 1000) // reasonable batch size
- if err != nil {
- return culpa.Wrap(err, "fetch oldest for trim")
- }
+ rows, err := store.Oldest(ctx, 1000) // reasonable batch size
+ if err != nil {
+ return culpa.Wrap(err, "fetch oldest for trim")
+ }
+ if len(rows) == 0 {
+ return culpa.WithCode(
+ fmt.Errorf("outbox still exceeds limit (%d bytes > %d) with no rows remaining", st.OutboxBytes, maxBytes),
+ "OUTBOX_TRIM_INCONSISTENT",
+ )
+ }
- var deleteIDs []int64
- var dropped int64
- for _, row := range rows {
- deleteIDs = append(deleteIDs, row.ID)
- dropped += int64(len(row.Payload))
- if dropped >= excess {
- break
+ excess := st.OutboxBytes - maxBytes
+ var deleteIDs []int64
+ var dropped int64
+ for _, row := range rows {
+ deleteIDs = append(deleteIDs, row.ID)
+ dropped += int64(len(row.Payload))
+ if dropped >= excess {
+ break
+ }
}
- }
- if len(deleteIDs) > 0 {
- if err := store.Delete(ctx, deleteIDs); err != nil {
- return culpa.Wrap(err, "trim outbox")
+ if len(deleteIDs) > 0 {
+ if err := store.Delete(ctx, deleteIDs); err != nil {
+ return culpa.Wrap(err, "trim outbox")
+ }
}
}
- return nil
}
M internal/collector/ingest/outbox_test.go => internal/collector/ingest/outbox_test.go +34 -1
@@ 77,7 77,7 @@ func TestReplayOutbox_LeavesRowOnPartialAccept(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
- _, _ = w.Write([]byte(`{"accepted":1,"errors":["bad row"]}`))
+ _, _ = w.Write([]byte(`{"accepted":1,"errors":[{"line":2,"error":"bad row"}]}`))
}))
defer ts.Close()
@@ 222,3 222,36 @@ func TestEnforceOutboxLimit_EmptyOutbox(t *testing.T) {
t.Fatalf("EnforceOutboxLimit: %v", err)
}
}
+
+func TestEnforceOutboxLimit_RepeatedTrimming(t *testing.T) {
+ ctx := context.Background()
+ dir := t.TempDir()
+ store, err := state.Open(ctx, dir+"/test.db")
+ if err != nil {
+ t.Fatalf("Open: %v", err)
+ }
+ defer func() { _ = store.Close() }()
+
+ // Enqueue more than 1000 items so a single batch is insufficient.
+ for i := 0; i < 1002; i++ {
+ item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte("x")}
+ if err := store.Enqueue(ctx, item); err != nil {
+ t.Fatalf("Enqueue %d: %v", i, err)
+ }
+ }
+
+ if err := EnforceOutboxLimit(ctx, store, 0); err != nil {
+ t.Fatalf("EnforceOutboxLimit: %v", err)
+ }
+
+ st, err := store.Stats(ctx)
+ if err != nil {
+ t.Fatalf("Stats: %v", err)
+ }
+ if st.OutboxBytes > 0 {
+ t.Errorf("OutboxBytes = %d, want 0", st.OutboxBytes)
+ }
+ if st.OutboxCount != 0 {
+ t.Errorf("OutboxCount = %d, want 0", st.OutboxCount)
+ }
+}
M internal/collector/ingest/sender.go => internal/collector/ingest/sender.go +9 -3
@@ 14,10 14,16 @@ import (
"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)
+// LineError is the server's per-line failure shape.
+type LineError struct {
+ Line int `json:"line"`
+ Err string `json:"error"`
+}
+
// Result is the server's response to a batch ingest POST.
type Result struct {
- Accepted int `json:"accepted"`
- Errors []string `json:"errors"`
+ Accepted int `json:"accepted"`
+ Errors []LineError `json:"errors"`
}
// Sender POSTs NDJSON batches to the lethe ingest endpoint.
@@ 57,7 63,7 @@ func (s *Sender) postRaw(ctx context.Context, body []byte) (Result, error) {
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
return Result{}, culpa.WithCode(
- fmt.Errorf("ingest %s: %s (body: %s)", resp.Status, string(body), resp.Status),
+ fmt.Errorf("ingest %s (body: %s)", resp.Status, string(body)),
"INGEST_HTTP",
)
}
M internal/collector/ingest/sender_test.go => internal/collector/ingest/sender_test.go +29 -3
@@ 139,7 139,7 @@ func TestSender_PostBatch_PartialAccept(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
- _, _ = w.Write([]byte(`{"accepted":1,"errors":["bad row"]}`))
+ _, _ = w.Write([]byte(`{"accepted":1,"errors":[{"line":2,"error":"bad row"}]}`))
}))
defer ts.Close()
@@ 154,7 154,33 @@ func TestSender_PostBatch_PartialAccept(t *testing.T) {
if result.Accepted != 1 {
t.Errorf("Accepted = %d, want 1", result.Accepted)
}
- if len(result.Errors) != 1 || result.Errors[0] != "bad row" {
- t.Errorf("Errors = %v, want [bad row]", result.Errors)
+ if len(result.Errors) != 1 || result.Errors[0].Line != 2 || result.Errors[0].Err != "bad row" {
+ t.Errorf("Errors = %+v, want one LineError{Line:2, Err:\"bad row\"}", result.Errors)
+ }
+}
+
+func TestSender_PostBatch_StructuredErrors(t *testing.T) {
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte(`{"accepted":0,"errors":[{"line":1,"error":"bad"}]}`))
+ }))
+ defer ts.Close()
+
+ sender := NewSender(ts.URL, http.DefaultClient)
+ result, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
+ {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
+ })
+ if err != nil {
+ t.Fatalf("PostBatch: %v", err)
+ }
+ if result.Accepted != 0 {
+ t.Errorf("Accepted = %d, want 0", result.Accepted)
+ }
+ if len(result.Errors) != 1 {
+ t.Fatalf("Errors = %+v, want 1 error", result.Errors)
+ }
+ if result.Errors[0].Line != 1 || result.Errors[0].Err != "bad" {
+ t.Errorf("Errors[0] = %+v, want Line=1 Err=bad", result.Errors[0])
}
}