From ad5ee6528aae4f5d70cee77f87bd9b275eb21928 Mon Sep 17 00:00:00 2001
From: Eugene Blikh
Date: Sun, 3 May 2026 18:17:02 +0300
Subject: [PATCH] collector: add ingest sender and outbox replay

---
 internal/collector/ingest/outbox.go      |  94 ++++++++
 internal/collector/ingest/outbox_test.go | 260 +++++++++++++++++++++++
 internal/collector/ingest/sender.go      |  86 ++++++++
 internal/collector/ingest/sender_test.go | 160 ++++++++++++++
 4 files changed, 600 insertions(+)
 create mode 100644 internal/collector/ingest/outbox.go
 create mode 100644 internal/collector/ingest/outbox_test.go
 create mode 100644 internal/collector/ingest/sender.go
 create mode 100644 internal/collector/ingest/sender_test.go

diff --git a/internal/collector/ingest/outbox.go b/internal/collector/ingest/outbox.go
new file mode 100644
index 0000000000000000000000000000000000000000..bd22b958d22e37bc841606c49a45dcd73cb98d2d
--- /dev/null
+++ b/internal/collector/ingest/outbox.go
@@ -0,0 +1,94 @@
+package ingest
+
+import (
+	"bytes"
+	"context"
+
+	"go.bigb.es/auxilia/culpa"
+	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
+)
+
+// trimBatchSize is how many of the oldest rows EnforceOutboxLimit loads per
+// trimming pass.
+const trimBatchSize = 1000
+
+// ReplayOutbox sends the oldest outbox rows to the server, deleting each row
+// only when the server reports full acceptance (no errors and accepted count
+// equals the number of events in that row's payload).
+//
+// limit is forwarded to store.Oldest. The first fetch, send, or delete
+// error aborts the replay and is returned wrapped; rows that were not fully
+// accepted stay in the outbox for a later call.
+func ReplayOutbox(ctx context.Context, store *state.Store, sender *Sender, limit int) error {
+	rows, err := store.Oldest(ctx, limit)
+	if err != nil {
+		return culpa.Wrap(err, "fetch outbox rows")
+	}
+
+	for _, row := range rows {
+		result, err := sender.postRaw(ctx, row.Payload)
+		if err != nil {
+			return culpa.Wrap(err, "replay outbox row")
+		}
+
+		if len(result.Errors) != 0 || result.Accepted != countEvents(row.Payload) {
+			// Partial accept or server-side errors: leave row for next retry.
+			continue
+		}
+		if err := store.Delete(ctx, []int64{row.ID}); err != nil {
+			return culpa.Wrap(err, "delete replayed outbox row")
+		}
+	}
+
+	return nil
+}
+
+// countEvents returns the number of NDJSON records in payload. Every
+// non-blank line counts as one event, so a payload whose last record lacks
+// the trailing newline is still counted in full.
+func countEvents(payload []byte) int {
+	n := 0
+	for _, line := range bytes.Split(payload, []byte("\n")) {
+		if len(bytes.TrimSpace(line)) > 0 {
+			n++
+		}
+	}
+	return n
+}
+
+// EnforceOutboxLimit drops the oldest rows until total payload bytes are
+// within maxBytes. Rows are trimmed in passes of trimBatchSize, so a backlog
+// larger than one pass is still trimmed within a single call. OutboxBytes is
+// treated as the sum of row payload lengths. Only the local store is touched.
+func EnforceOutboxLimit(ctx context.Context, store *state.Store, maxBytes int64) error {
+	st, err := store.Stats(ctx)
+	if err != nil {
+		return culpa.Wrap(err, "get outbox stats")
+	}
+
+	excess := st.OutboxBytes - maxBytes
+	for excess > 0 {
+		rows, err := store.Oldest(ctx, trimBatchSize)
+		if err != nil {
+			return culpa.Wrap(err, "fetch oldest for trim")
+		}
+		if len(rows) == 0 {
+			// Nothing left to drop even though stats reported an excess.
+			return nil
+		}
+
+		deleteIDs := make([]int64, 0, len(rows))
+		for _, row := range rows {
+			deleteIDs = append(deleteIDs, row.ID)
+			excess -= int64(len(row.Payload))
+			if excess <= 0 {
+				break
+			}
+		}
+
+		if err := store.Delete(ctx, deleteIDs); err != nil {
+			return culpa.Wrap(err, "trim outbox")
+		}
+	}
+	return nil
+}
diff --git a/internal/collector/ingest/outbox_test.go b/internal/collector/ingest/outbox_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..447edaecc04cbcea0f8fd459809bb0a7b42efba7
--- /dev/null
+++ b/internal/collector/ingest/outbox_test.go
@@ -0,0 +1,260 @@
+package ingest
+
+import (
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
+)
+
+func TestReplayOutbox_DeletesOnlyFullyAcceptedRows(t *testing.T) {
+	ctx := context.Background()
+	dir := t.TempDir()
+	store, err := state.Open(ctx, dir+"/test.db")
+	if err != nil {
+		t.Fatalf("Open: %v", err)
+	}
+	defer func() { _ = store.Close() }()
+
+	// Enqueue two items.
+	item1 := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}
+`)}
+	item2 := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/b.jsonl", Payload: []byte(`{"seq":2}
+`)}
+	if err := store.Enqueue(ctx, item1); err != nil {
+		t.Fatalf("Enqueue 1: %v", err)
+	}
+	if err := store.Enqueue(ctx, item2); err != nil {
+		t.Fatalf("Enqueue 2: %v", err)
+	}
+
+	// Server accepts all rows.
+	callCount := 0
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		callCount++
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write([]byte(`{"accepted":1,"errors":[]}`))
+	}))
+	defer ts.Close()
+
+	sender := NewSender(ts.URL, http.DefaultClient)
+	if err := ReplayOutbox(ctx, store, sender, 10); err != nil {
+		t.Fatalf("ReplayOutbox: %v", err)
+	}
+
+	if callCount != 2 {
+		t.Errorf("expected 2 server calls, got %d", callCount)
+	}
+
+	rows, err := store.Oldest(ctx, 10)
+	if err != nil {
+		t.Fatalf("Oldest: %v", err)
+	}
+	if len(rows) != 0 {
+		t.Errorf("expected 0 rows after full replay, got %d", len(rows))
+	}
+}
+
+func TestReplayOutbox_LeavesRowOnPartialAccept(t *testing.T) {
+	ctx := context.Background()
+	dir := t.TempDir()
+	store, err := state.Open(ctx, dir+"/test.db")
+	if err != nil {
+		t.Fatalf("Open: %v", err)
+	}
+	defer func() { _ = store.Close() }()
+
+	item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}
+{"seq":2}
+`)}
+	if err := store.Enqueue(ctx, item); err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write([]byte(`{"accepted":1,"errors":["bad row"]}`))
+	}))
+	defer ts.Close()
+
+	sender := NewSender(ts.URL, http.DefaultClient)
+	if err := ReplayOutbox(ctx, store, sender, 10); err != nil {
+		t.Fatalf("ReplayOutbox: %v", err)
+	}
+
+	rows, err := store.Oldest(ctx, 10)
+	if err != nil {
+		t.Fatalf("Oldest: %v", err)
+	}
+	if len(rows) != 1 {
+		t.Errorf("expected 1 row after partial accept, got %d", len(rows))
+	}
+}
+
+func TestReplayOutbox_LeavesRowOnError(t *testing.T) {
+	ctx := context.Background()
+	dir := t.TempDir()
+	store, err := state.Open(ctx, dir+"/test.db")
+	if err != nil {
+		t.Fatalf("Open: %v", err)
+	}
+	defer func() { _ = store.Close() }()
+
+	item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}
+`)}
+	if err := store.Enqueue(ctx, item); err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusInternalServerError)
+		_, _ = w.Write([]byte(`boom`))
+	}))
+	defer ts.Close()
+
+	sender := NewSender(ts.URL, http.DefaultClient)
+	// Expect error to propagate.
+	if err := ReplayOutbox(ctx, store, sender, 10); err == nil {
+		t.Fatal("expected error on 500, got nil")
+	}
+
+	rows, err := store.Oldest(ctx, 10)
+	if err != nil {
+		t.Fatalf("Oldest: %v", err)
+	}
+	if len(rows) != 1 {
+		t.Errorf("expected 1 row after error, got %d", len(rows))
+	}
+}
+
+func TestReplayOutbox_DeletesRowWithoutTrailingNewline(t *testing.T) {
+	ctx := context.Background()
+	dir := t.TempDir()
+	store, err := state.Open(ctx, dir+"/test.db")
+	if err != nil {
+		t.Fatalf("Open: %v", err)
+	}
+	defer func() { _ = store.Close() }()
+
+	// Two events; the final one is not newline-terminated.
+	item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte("{\"seq\":1}\n{\"seq\":2}")}
+	if err := store.Enqueue(ctx, item); err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write([]byte(`{"accepted":2,"errors":[]}`))
+	}))
+	defer ts.Close()
+
+	sender := NewSender(ts.URL, http.DefaultClient)
+	if err := ReplayOutbox(ctx, store, sender, 10); err != nil {
+		t.Fatalf("ReplayOutbox: %v", err)
+	}
+
+	rows, err := store.Oldest(ctx, 10)
+	if err != nil {
+		t.Fatalf("Oldest: %v", err)
+	}
+	if len(rows) != 0 {
+		t.Errorf("expected 0 rows after full replay, got %d", len(rows))
+	}
+}
+
+func TestEnforceOutboxLimit_DropsOldestRows(t *testing.T) {
+	ctx := context.Background()
+	dir := t.TempDir()
+	store, err := state.Open(ctx, dir+"/test.db")
+	if err != nil {
+		t.Fatalf("Open: %v", err)
+	}
+	defer func() { _ = store.Close() }()
+
+	// Enqueue three items with known sizes.
+	items := []state.OutboxItem{
+		{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}`)},   // 9 bytes
+		{Tool: "cc", Host: "h", SourceFile: "/b.jsonl", Payload: []byte(`{"seq":2}`)},   // 9 bytes
+		{Tool: "cc", Host: "h", SourceFile: "/c.jsonl", Payload: []byte(`{"seq":333}`)}, // 11 bytes
+	}
+	for _, it := range items {
+		if err := store.Enqueue(ctx, it); err != nil {
+			t.Fatalf("Enqueue: %v", err)
+		}
+	}
+
+	// Total = 29 bytes. Cap at 20 => drop oldest until under 20.
+	if err := EnforceOutboxLimit(ctx, store, 20); err != nil {
+		t.Fatalf("EnforceOutboxLimit: %v", err)
+	}
+
+	rows, err := store.Oldest(ctx, 10)
+	if err != nil {
+		t.Fatalf("Oldest: %v", err)
+	}
+
+	// Should have dropped the first row (9 bytes), leaving 20 bytes exactly.
+	if len(rows) != 2 {
+		t.Errorf("expected 2 rows, got %d", len(rows))
+	}
+
+	// Verify the oldest remaining row is the second item.
+	if len(rows) > 0 && string(rows[0].Payload) != `{"seq":2}` {
+		t.Errorf("oldest remaining payload = %q, want {\"seq\":2}", string(rows[0].Payload))
+	}
+
+	// Verify stats are under limit.
+	st, err := store.Stats(ctx)
+	if err != nil {
+		t.Fatalf("Stats: %v", err)
+	}
+	if st.OutboxBytes > 20 {
+		t.Errorf("OutboxBytes = %d, want <= 20", st.OutboxBytes)
+	}
+}
+
+func TestEnforceOutboxLimit_NoOpWhenUnderLimit(t *testing.T) {
+	ctx := context.Background()
+	dir := t.TempDir()
+	store, err := state.Open(ctx, dir+"/test.db")
+	if err != nil {
+		t.Fatalf("Open: %v", err)
+	}
+	defer func() { _ = store.Close() }()
+
+	item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`x`)}
+	if err := store.Enqueue(ctx, item); err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+
+	if err := EnforceOutboxLimit(ctx, store, 1000); err != nil {
+		t.Fatalf("EnforceOutboxLimit: %v", err)
+	}
+
+	rows, err := store.Oldest(ctx, 10)
+	if err != nil {
+		t.Fatalf("Oldest: %v", err)
+	}
+	if len(rows) != 1 {
+		t.Errorf("expected 1 row, got %d", len(rows))
+	}
+}
+
+func TestEnforceOutboxLimit_EmptyOutbox(t *testing.T) {
+	ctx := context.Background()
+	dir := t.TempDir()
+	store, err := state.Open(ctx, dir+"/test.db")
+	if err != nil {
+		t.Fatalf("Open: %v", err)
+	}
+	defer func() { _ = store.Close() }()
+
+	if err := EnforceOutboxLimit(ctx, store, 100); err != nil {
+		t.Fatalf("EnforceOutboxLimit: %v", err)
+	}
+}
diff --git a/internal/collector/ingest/sender.go b/internal/collector/ingest/sender.go
new file mode 100644
index 0000000000000000000000000000000000000000..19656bffdc81a2823a0f053dfb186df83dfaf6f4
--- /dev/null
+++ b/internal/collector/ingest/sender.go
@@ -0,0 +1,86 @@
+// Package ingest handles HTTP posting of event batches to the lethe server
+// and outbox replay for durability.
+package ingest
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+
+	"go.bigb.es/auxilia/culpa"
+	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
+)
+
+// Result is the server's response to a batch ingest POST.
+type Result struct {
+	Accepted int      `json:"accepted"`
+	Errors   []string `json:"errors"`
+}
+
+// Sender POSTs NDJSON batches to the lethe ingest endpoint.
+type Sender struct {
+	serverURL string
+	client    *http.Client
+}
+
+// NewSender builds a Sender that posts to serverURL + /api/v1/ingest.
+func NewSender(serverURL string, client *http.Client) *Sender {
+	return &Sender{serverURL: serverURL, client: client}
+}
+
+// PostBatch encodes events as NDJSON and POSTs them to the ingest endpoint.
+func (s *Sender) PostBatch(ctx context.Context, events []wire.TurnEvent) (Result, error) {
+	body, err := EncodeNDJSON(events)
+	if err != nil {
+		return Result{}, culpa.Wrap(err, "encode ndjson")
+	}
+	return s.postRaw(ctx, body)
+}
+
+// postRaw sends raw NDJSON bytes and decodes the server's response.
+func (s *Sender) postRaw(ctx context.Context, body []byte) (Result, error) {
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.serverURL+"/api/v1/ingest", bytes.NewReader(body))
+	if err != nil {
+		return Result{}, culpa.Wrap(err, "build request")
+	}
+	req.Header.Set("Content-Type", "application/x-ndjson")
+
+	resp, err := s.client.Do(req)
+	if err != nil {
+		return Result{}, culpa.Wrap(err, "post batch")
+	}
+	defer func() { _ = resp.Body.Close() }()
+
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 { // any non-2xx status is a failed ingest
+		respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4<<10)) // best effort, capped: only enriches the error text
+		return Result{}, culpa.WithCode(
+			fmt.Errorf("ingest %s (body: %s)", resp.Status, respBody),
+			"INGEST_HTTP",
+		)
+	}
+
+	var result Result
+	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
+		return Result{}, culpa.Wrap(err, "decode ingest response")
+	}
+	return result, nil
+}
+
+// EncodeNDJSON serialises a slice of TurnEvent into newline-delimited JSON.
+// Each event is one JSON object per line; the final line ends with a newline.
+func EncodeNDJSON(events []wire.TurnEvent) ([]byte, error) {
+	if len(events) == 0 {
+		return nil, nil // empty batch: no bytes and no error
+	}
+	var buf bytes.Buffer
+	enc := json.NewEncoder(&buf)
+	for _, ev := range events {
+		if err := enc.Encode(ev); err != nil { // Encode appends the newline after each object
+			return nil, culpa.Wrap(err, "encode event")
+		}
+	}
+	return buf.Bytes(), nil
+}
diff --git a/internal/collector/ingest/sender_test.go b/internal/collector/ingest/sender_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..6a64fa8d4516ea1ce47879efcfe346a7938632ec
--- /dev/null
+++ b/internal/collector/ingest/sender_test.go
@@ -0,0 +1,160 @@
+package ingest
+
+import (
+	"context"
+	"encoding/json"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+
+	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
+)
+
+func TestEncodeNDJSON_EmitsOneObjectPerLineWithTrailingNewline(t *testing.T) {
+	events := []wire.TurnEvent{
+		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
+		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"},
+	}
+
+	data, err := EncodeNDJSON(events)
+	if err != nil {
+		t.Fatalf("EncodeNDJSON: %v", err)
+	}
+
+	lines := strings.Split(string(data), "\n")
+	// Trailing newline means last element is empty.
+	if len(lines) != 3 {
+		t.Fatalf("expected 3 lines (2 events + trailing empty), got %d", len(lines))
+	}
+	if lines[2] != "" {
+		t.Errorf("expected trailing empty line, got %q", lines[2])
+	}
+
+	for i, line := range lines[:2] {
+		if line == "" {
+			t.Fatalf("line %d is empty", i)
+		}
+		var ev wire.TurnEvent
+		if err := json.Unmarshal([]byte(line), &ev); err != nil {
+			t.Fatalf("line %d invalid JSON: %v", i, err)
+		}
+	}
+}
+
+func TestEncodeNDJSON_EmptySlice(t *testing.T) {
+	data, err := EncodeNDJSON([]wire.TurnEvent{})
+	if err != nil {
+		t.Fatalf("EncodeNDJSON: %v", err)
+	}
+	if len(data) != 0 {
+		t.Errorf("expected empty bytes for empty slice, got %q", string(data))
+	}
+}
+
+func TestSender_PostBatch_Success(t *testing.T) {
+	var gotBody []byte
+	var gotContentType string
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodPost {
+			t.Errorf("expected POST, got %s", r.Method)
+		}
+		if r.URL.Path != "/api/v1/ingest" {
+			t.Errorf("expected path /api/v1/ingest, got %s", r.URL.Path)
+		}
+		gotContentType = r.Header.Get("Content-Type")
+		var err error
+		gotBody, err = io.ReadAll(r.Body)
+		if err != nil {
+			t.Errorf("read body: %v", err) // Errorf, not Fatalf: the handler runs off the test goroutine
+		}
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write([]byte(`{"accepted":2,"errors":[]}`))
+	}))
+	defer ts.Close()
+
+	sender := NewSender(ts.URL, http.DefaultClient)
+	events := []wire.TurnEvent{
+		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
+		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"},
+	}
+
+	result, err := sender.PostBatch(context.Background(), events)
+	if err != nil {
+		t.Fatalf("PostBatch: %v", err)
+	}
+	if result.Accepted != 2 {
+		t.Errorf("Accepted = %d, want 2", result.Accepted)
+	}
+	if len(result.Errors) != 0 {
+		t.Errorf("Errors = %v, want empty", result.Errors)
+	}
+	if gotContentType != "application/x-ndjson" {
+		t.Errorf("Content-Type = %q, want application/x-ndjson", gotContentType)
+	}
+
+	// Verify body is valid NDJSON.
+	lines := strings.Split(string(gotBody), "\n")
+	if len(lines) != 3 || lines[2] != "" {
+		t.Errorf("body does not look like NDJSON with trailing newline: %q", string(gotBody))
+	}
+}
+
+func TestSender_PostBatch_Non2xx(t *testing.T) {
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusServiceUnavailable)
+		_, _ = w.Write([]byte(`busy`))
+	}))
+	defer ts.Close()
+
+	sender := NewSender(ts.URL, http.DefaultClient)
+	_, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
+		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
+	})
+	if err == nil {
+		t.Fatal("expected error on 503, got nil")
+	}
+}
+
+func TestSender_PostBatch_MalformedResponse(t *testing.T) {
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write([]byte(`not json`))
+	}))
+	defer ts.Close()
+
+	sender := NewSender(ts.URL, http.DefaultClient)
+	_, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
+		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
+	})
+	if err == nil {
+		t.Fatal("expected error on malformed response, got nil")
+	}
+}
+
+func TestSender_PostBatch_PartialAccept(t *testing.T) {
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write([]byte(`{"accepted":1,"errors":["bad row"]}`))
+	}))
+	defer ts.Close()
+
+	sender := NewSender(ts.URL, http.DefaultClient)
+	result, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
+		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
+		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"},
+	})
+	if err != nil {
+		t.Fatalf("PostBatch: %v", err)
+	}
+	if result.Accepted != 1 {
+		t.Errorf("Accepted = %d, want 1", result.Accepted)
+	}
+	if len(result.Errors) != 1 || result.Errors[0] != "bad row" {
+		t.Errorf("Errors = %v, want [bad row]", result.Errors)
+	}
+}