~bigbes/lethe

ad5ee6528aae4f5d70cee77f87bd9b275eb21928 — Eugene Blikh 24 days ago d7eb706
collector: add ingest sender and outbox replay
A internal/collector/ingest/outbox.go => internal/collector/ingest/outbox.go +71 -0
@@ 0,0 1,71 @@
package ingest

import (
	"bytes"
	"context"

	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
)

// ReplayOutbox fetches up to limit rows via store.Oldest and sends them to the
// server, one POST per row, in the order they were returned. A row is deleted
// only when the server reports full acceptance: no errors and an accepted
// count equal to the number of events in that row's payload. Rows that are
// partially accepted or rejected are left in place for the next retry.
//
// The first fetch, send, or delete failure aborts the replay and is returned
// wrapped; rows after the failing one are not attempted.
func ReplayOutbox(ctx context.Context, store *state.Store, sender *Sender, limit int) error {
	rows, err := store.Oldest(ctx, limit)
	if err != nil {
		return culpa.Wrap(err, "fetch outbox rows")
	}

	for _, row := range rows {
		result, err := sender.postRaw(ctx, row.Payload)
		if err != nil {
			return culpa.Wrap(err, "replay outbox row")
		}

		if len(result.Errors) != 0 || result.Accepted != countPayloadEvents(row.Payload) {
			// Partial accept or server-side errors: leave row for next retry.
			continue
		}
		if err := store.Delete(ctx, []int64{row.ID}); err != nil {
			return culpa.Wrap(err, "delete replayed outbox row")
		}
	}

	return nil
}

// countPayloadEvents returns the number of events in an NDJSON payload by
// counting its non-blank lines. Counting lines rather than newline bytes keeps
// the result correct when the final event has no trailing newline and when the
// payload contains blank lines.
func countPayloadEvents(payload []byte) int {
	count := 0
	for _, line := range bytes.Split(payload, []byte("\n")) {
		if len(bytes.TrimSpace(line)) > 0 {
			count++
		}
	}
	return count
}

// EnforceOutboxLimit drops the oldest rows until the total payload bytes
// reported by store.Stats are within maxBytes. It is a no-op when the outbox
// is already within the limit.
//
// Rows are fetched and deleted in batches, and batches repeat until enough
// bytes have been dropped or the outbox is empty, so a backlog larger than one
// batch is still brought under the limit in a single call. Any stats, fetch,
// or delete failure is returned wrapped; batches already deleted stay deleted.
func EnforceOutboxLimit(ctx context.Context, store *state.Store, maxBytes int64) error {
	// trimBatchSize bounds how many rows are loaded into memory per pass.
	const trimBatchSize = 1000

	st, err := store.Stats(ctx)
	if err != nil {
		return culpa.Wrap(err, "get outbox stats")
	}

	excess := st.OutboxBytes - maxBytes
	for excess > 0 {
		rows, err := store.Oldest(ctx, trimBatchSize)
		if err != nil {
			return culpa.Wrap(err, "fetch oldest for trim")
		}
		if len(rows) == 0 {
			// Nothing left to drop.
			return nil
		}

		deleteIDs := make([]int64, 0, len(rows))
		var dropped int64
		for _, row := range rows {
			deleteIDs = append(deleteIDs, row.ID)
			dropped += int64(len(row.Payload))
			if dropped >= excess {
				break
			}
		}

		if err := store.Delete(ctx, deleteIDs); err != nil {
			return culpa.Wrap(err, "trim outbox")
		}
		// Every pass deletes at least one row, so the loop terminates even
		// when the dropped rows carry empty payloads.
		excess -= dropped
	}
	return nil
}

A internal/collector/ingest/outbox_test.go => internal/collector/ingest/outbox_test.go +224 -0
@@ 0,0 1,224 @@
package ingest

import (
	"context"
	"net/http"
	"net/http/httptest"
	"testing"

	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
)

// TestReplayOutbox_DeletesOnlyFullyAcceptedRows checks that every row the
// server fully accepts is removed from the outbox by a replay, and that each
// row is sent in its own request.
func TestReplayOutbox_DeletesOnlyFullyAcceptedRows(t *testing.T) {
	ctx := context.Background()
	dbPath := t.TempDir() + "/test.db"
	outbox, err := state.Open(ctx, dbPath)
	if err != nil {
		t.Fatalf("Open: %v", err)
	}
	defer func() { _ = outbox.Close() }()

	// Two single-event rows, each from a different source file.
	first := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}
`)}
	second := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/b.jsonl", Payload: []byte(`{"seq":2}
`)}
	if err := outbox.Enqueue(ctx, first); err != nil {
		t.Fatalf("Enqueue 1: %v", err)
	}
	if err := outbox.Enqueue(ctx, second); err != nil {
		t.Fatalf("Enqueue 2: %v", err)
	}

	// The stub server reports one accepted event and no errors for every POST.
	posts := 0
	acceptAll := func(w http.ResponseWriter, _ *http.Request) {
		posts++
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"accepted":1,"errors":[]}`))
	}
	srv := httptest.NewServer(http.HandlerFunc(acceptAll))
	defer srv.Close()

	replayErr := ReplayOutbox(ctx, outbox, NewSender(srv.URL, http.DefaultClient), 10)
	if replayErr != nil {
		t.Fatalf("ReplayOutbox: %v", replayErr)
	}

	if posts != 2 {
		t.Errorf("expected 2 server calls, got %d", posts)
	}

	remaining, err := outbox.Oldest(ctx, 10)
	if err != nil {
		t.Fatalf("Oldest: %v", err)
	}
	if n := len(remaining); n != 0 {
		t.Errorf("expected 0 rows after full replay, got %d", n)
	}
}

// TestReplayOutbox_LeavesRowOnPartialAccept checks that a row stays queued when
// the server accepts only part of its payload and reports an error.
func TestReplayOutbox_LeavesRowOnPartialAccept(t *testing.T) {
	ctx := context.Background()
	dbPath := t.TempDir() + "/test.db"
	outbox, err := state.Open(ctx, dbPath)
	if err != nil {
		t.Fatalf("Open: %v", err)
	}
	defer func() { _ = outbox.Close() }()

	// A single row carrying two events.
	twoEvents := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}
{"seq":2}
`)}
	if err := outbox.Enqueue(ctx, twoEvents); err != nil {
		t.Fatalf("Enqueue: %v", err)
	}

	// The stub server accepts one event and reports an error for the other.
	partial := func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"accepted":1,"errors":["bad row"]}`))
	}
	srv := httptest.NewServer(http.HandlerFunc(partial))
	defer srv.Close()

	replayErr := ReplayOutbox(ctx, outbox, NewSender(srv.URL, http.DefaultClient), 10)
	if replayErr != nil {
		t.Fatalf("ReplayOutbox: %v", replayErr)
	}

	remaining, err := outbox.Oldest(ctx, 10)
	if err != nil {
		t.Fatalf("Oldest: %v", err)
	}
	if n := len(remaining); n != 1 {
		t.Errorf("expected 1 row after partial accept, got %d", n)
	}
}

// TestReplayOutbox_LeavesRowOnError checks that a non-2xx server response makes
// ReplayOutbox return an error and leaves the row in the outbox.
func TestReplayOutbox_LeavesRowOnError(t *testing.T) {
	ctx := context.Background()
	dbPath := t.TempDir() + "/test.db"
	outbox, err := state.Open(ctx, dbPath)
	if err != nil {
		t.Fatalf("Open: %v", err)
	}
	defer func() { _ = outbox.Close() }()

	single := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}
`)}
	if err := outbox.Enqueue(ctx, single); err != nil {
		t.Fatalf("Enqueue: %v", err)
	}

	// The stub server fails every request with a 500.
	failing := func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte(`boom`))
	}
	srv := httptest.NewServer(http.HandlerFunc(failing))
	defer srv.Close()

	// The HTTP failure must surface to the caller.
	replayErr := ReplayOutbox(ctx, outbox, NewSender(srv.URL, http.DefaultClient), 10)
	if replayErr == nil {
		t.Fatal("expected error on 500, got nil")
	}

	remaining, err := outbox.Oldest(ctx, 10)
	if err != nil {
		t.Fatalf("Oldest: %v", err)
	}
	if n := len(remaining); n != 1 {
		t.Errorf("expected 1 row after error, got %d", n)
	}
}

// TestEnforceOutboxLimit_DropsOldestRows checks that trimming removes rows from
// the oldest end and stops as soon as the outbox fits within the byte cap.
func TestEnforceOutboxLimit_DropsOldestRows(t *testing.T) {
	ctx := context.Background()
	dbPath := t.TempDir() + "/test.db"
	outbox, err := state.Open(ctx, dbPath)
	if err != nil {
		t.Fatalf("Open: %v", err)
	}
	defer func() { _ = outbox.Close() }()

	// Three rows with known payload sizes, enqueued oldest first.
	for _, in := range []struct{ file, payload string }{
		{"/a.jsonl", `{"seq":1}`},   // 9 bytes
		{"/b.jsonl", `{"seq":2}`},   // 9 bytes
		{"/c.jsonl", `{"seq":333}`}, // 11 bytes
	} {
		item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: in.file, Payload: []byte(in.payload)}
		if err := outbox.Enqueue(ctx, item); err != nil {
			t.Fatalf("Enqueue: %v", err)
		}
	}

	// 29 bytes are queued; a 20-byte cap forces the oldest 9-byte row out.
	if err := EnforceOutboxLimit(ctx, outbox, 20); err != nil {
		t.Fatalf("EnforceOutboxLimit: %v", err)
	}

	remaining, err := outbox.Oldest(ctx, 10)
	if err != nil {
		t.Fatalf("Oldest: %v", err)
	}

	// Only the first row should be gone, leaving exactly 20 bytes.
	if n := len(remaining); n != 2 {
		t.Errorf("expected 2 rows, got %d", n)
	}

	// The second item must now be the oldest row.
	if len(remaining) > 0 {
		if got := string(remaining[0].Payload); got != `{"seq":2}` {
			t.Errorf("oldest remaining payload = %q, want {\"seq\":2}", got)
		}
	}

	// The reported byte total must respect the cap.
	stats, err := outbox.Stats(ctx)
	if err != nil {
		t.Fatalf("Stats: %v", err)
	}
	if stats.OutboxBytes > 20 {
		t.Errorf("OutboxBytes = %d, want <= 20", stats.OutboxBytes)
	}
}

// TestEnforceOutboxLimit_NoOpWhenUnderLimit checks that no row is dropped when
// the outbox is already smaller than the byte cap.
func TestEnforceOutboxLimit_NoOpWhenUnderLimit(t *testing.T) {
	ctx := context.Background()
	dbPath := t.TempDir() + "/test.db"
	outbox, err := state.Open(ctx, dbPath)
	if err != nil {
		t.Fatalf("Open: %v", err)
	}
	defer func() { _ = outbox.Close() }()

	// A one-byte row, far below the 1000-byte cap used below.
	tiny := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`x`)}
	if err := outbox.Enqueue(ctx, tiny); err != nil {
		t.Fatalf("Enqueue: %v", err)
	}

	if err := EnforceOutboxLimit(ctx, outbox, 1000); err != nil {
		t.Fatalf("EnforceOutboxLimit: %v", err)
	}

	remaining, err := outbox.Oldest(ctx, 10)
	if err != nil {
		t.Fatalf("Oldest: %v", err)
	}
	if n := len(remaining); n != 1 {
		t.Errorf("expected 1 row, got %d", n)
	}
}

// TestEnforceOutboxLimit_EmptyOutbox checks that enforcing a limit on an outbox
// with no rows succeeds without error.
func TestEnforceOutboxLimit_EmptyOutbox(t *testing.T) {
	ctx := context.Background()
	dbPath := t.TempDir() + "/test.db"
	outbox, err := state.Open(ctx, dbPath)
	if err != nil {
		t.Fatalf("Open: %v", err)
	}
	defer func() { _ = outbox.Close() }()

	limitErr := EnforceOutboxLimit(ctx, outbox, 100)
	if limitErr != nil {
		t.Fatalf("EnforceOutboxLimit: %v", limitErr)
	}
}

A internal/collector/ingest/sender.go => internal/collector/ingest/sender.go +86 -0
@@ 0,0 1,86 @@
// Package ingest handles HTTP posting of event batches to the lethe server
// and outbox replay for durability.
package ingest

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"

	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// Result is the server's response to a batch ingest POST, decoded from the
// JSON response body. Accepted is read from the "accepted" key and Errors from
// the "errors" key. ReplayOutbox treats a row as fully accepted only when
// Errors is empty and Accepted equals the number of events it sent.
type Result struct {
	Accepted int      `json:"accepted"`
	Errors   []string `json:"errors"`
}

// Sender POSTs NDJSON batches to the lethe ingest endpoint. serverURL is the
// base URL to which the "/api/v1/ingest" path is appended, and client is the
// HTTP client used to perform each request.
type Sender struct {
	serverURL string
	client    *http.Client
}

// NewSender builds a Sender that posts to serverURL + "/api/v1/ingest" using
// client. A nil client falls back to http.DefaultClient, so the returned
// Sender never panics on a nil client when it sends.
func NewSender(serverURL string, client *http.Client) *Sender {
	if client == nil {
		client = http.DefaultClient
	}
	return &Sender{serverURL: serverURL, client: client}
}

// PostBatch serialises events to NDJSON and sends the result to the ingest
// endpoint, returning the server's decoded Result. An encoding failure is
// returned wrapped and nothing is sent.
func (s *Sender) PostBatch(ctx context.Context, events []wire.TurnEvent) (Result, error) {
	payload, encErr := EncodeNDJSON(events)
	if encErr == nil {
		return s.postRaw(ctx, payload)
	}
	return Result{}, culpa.Wrap(encErr, "encode ndjson")
}

// postRaw sends raw NDJSON bytes to serverURL + "/api/v1/ingest" and decodes
// the server's JSON response into a Result.
//
// It returns a wrapped error when the request cannot be built or sent, or when
// a 2xx response body is not valid JSON. A non-2xx status yields an error
// carrying the "INGEST_HTTP" code, the status line, and a bounded prefix of
// the response body.
func (s *Sender) postRaw(ctx context.Context, body []byte) (Result, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.serverURL+"/api/v1/ingest", bytes.NewReader(body))
	if err != nil {
		return Result{}, culpa.Wrap(err, "build request")
	}
	req.Header.Set("Content-Type", "application/x-ndjson")

	resp, err := s.client.Do(req)
	if err != nil {
		return Result{}, culpa.Wrap(err, "post batch")
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// Cap the error body so a misbehaving server cannot make the error
		// message (and the memory used to build it) arbitrarily large.
		const maxErrBody = 4096
		// The read error is ignored on purpose: the body is diagnostic only,
		// and the status alone is enough to report the failure.
		errBody, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrBody))
		return Result{}, culpa.WithCode(
			fmt.Errorf("ingest %s (body: %s)", resp.Status, errBody),
			"INGEST_HTTP",
		)
	}

	var result Result
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return Result{}, culpa.Wrap(err, "decode ingest response")
	}
	return result, nil
}

// EncodeNDJSON renders events as newline-delimited JSON: one JSON object per
// line, with every line (including the last) terminated by a newline. An empty
// input yields a nil slice and no error. The first event that fails to encode
// aborts the call and its error is returned wrapped.
func EncodeNDJSON(events []wire.TurnEvent) ([]byte, error) {
	if len(events) == 0 {
		return nil, nil
	}

	out := new(bytes.Buffer)
	lineEncoder := json.NewEncoder(out)
	for i := range events {
		encErr := lineEncoder.Encode(events[i])
		if encErr != nil {
			return nil, culpa.Wrap(encErr, "encode event")
		}
	}
	return out.Bytes(), nil
}

A internal/collector/ingest/sender_test.go => internal/collector/ingest/sender_test.go +160 -0
@@ 0,0 1,160 @@
package ingest

import (
	"context"
	"encoding/json"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"

	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// TestEncodeNDJSON_EmitsOneObjectPerLineWithTrailingNewline checks that two
// events encode to two non-empty JSON lines followed by a trailing newline.
func TestEncodeNDJSON_EmitsOneObjectPerLineWithTrailingNewline(t *testing.T) {
	input := []wire.TurnEvent{
		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"},
	}

	encoded, err := EncodeNDJSON(input)
	if err != nil {
		t.Fatalf("EncodeNDJSON: %v", err)
	}

	// Splitting on "\n" leaves an empty final element when the output ends
	// with a newline.
	parts := strings.Split(string(encoded), "\n")
	if len(parts) != 3 {
		t.Fatalf("expected 3 lines (2 events + trailing empty), got %d", len(parts))
	}
	if tail := parts[2]; tail != "" {
		t.Errorf("expected trailing empty line, got %q", tail)
	}

	for idx := 0; idx < 2; idx++ {
		raw := parts[idx]
		if raw == "" {
			t.Fatalf("line %d is empty", idx)
		}
		var decoded wire.TurnEvent
		if err := json.Unmarshal([]byte(raw), &decoded); err != nil {
			t.Fatalf("line %d invalid JSON: %v", idx, err)
		}
	}
}

// TestEncodeNDJSON_EmptySlice checks that encoding zero events succeeds and
// produces no output bytes.
func TestEncodeNDJSON_EmptySlice(t *testing.T) {
	encoded, encErr := EncodeNDJSON([]wire.TurnEvent{})
	if encErr != nil {
		t.Fatalf("EncodeNDJSON: %v", encErr)
	}
	if len(encoded) > 0 {
		t.Errorf("expected empty bytes for empty slice, got %q", string(encoded))
	}
}

// TestSender_PostBatch_Success checks that PostBatch issues a POST to
// /api/v1/ingest with an NDJSON body and content type, and decodes a fully
// successful server response.
func TestSender_PostBatch_Success(t *testing.T) {
	var gotBody []byte
	var gotContentType string
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			t.Errorf("expected POST, got %s", r.Method)
		}
		if r.URL.Path != "/api/v1/ingest" {
			t.Errorf("expected path /api/v1/ingest, got %s", r.URL.Path)
		}
		gotContentType = r.Header.Get("Content-Type")
		payload, err := io.ReadAll(r.Body)
		if err != nil {
			// The handler runs on a server goroutine, where t.Fatalf (FailNow)
			// must not be called. Record the failure and answer 500 so the
			// client call fails on the test goroutine instead.
			t.Errorf("read body: %v", err)
			http.Error(w, "read body failed", http.StatusInternalServerError)
			return
		}
		gotBody = payload
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"accepted":2,"errors":[]}`))
	}))
	defer ts.Close()

	sender := NewSender(ts.URL, http.DefaultClient)
	events := []wire.TurnEvent{
		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"},
	}

	result, err := sender.PostBatch(context.Background(), events)
	if err != nil {
		t.Fatalf("PostBatch: %v", err)
	}
	if result.Accepted != 2 {
		t.Errorf("Accepted = %d, want 2", result.Accepted)
	}
	if len(result.Errors) != 0 {
		t.Errorf("Errors = %v, want empty", result.Errors)
	}
	if gotContentType != "application/x-ndjson" {
		t.Errorf("Content-Type = %q, want application/x-ndjson", gotContentType)
	}

	// Two events must arrive as two lines followed by a trailing newline.
	lines := strings.Split(string(gotBody), "\n")
	if len(lines) != 3 || lines[2] != "" {
		t.Errorf("body does not look like NDJSON with trailing newline: %q", string(gotBody))
	}
}

func TestSender_PostBatch_Non2xx(t *testing.T) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusServiceUnavailable)
		_, _ = w.Write([]byte(`busy`))
	}))
	defer ts.Close()

	sender := NewSender(ts.URL, http.DefaultClient)
	_, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
	})
	if err == nil {
		t.Fatal("expected error on 503, got nil")
	}
}

func TestSender_PostBatch_MalformedResponse(t *testing.T) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`not json`))
	}))
	defer ts.Close()

	sender := NewSender(ts.URL, http.DefaultClient)
	_, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
	})
	if err == nil {
		t.Fatal("expected error on malformed response, got nil")
	}
}

func TestSender_PostBatch_PartialAccept(t *testing.T) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"accepted":1,"errors":["bad row"]}`))
	}))
	defer ts.Close()

	sender := NewSender(ts.URL, http.DefaultClient)
	result, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"},
	})
	if err != nil {
		t.Fatalf("PostBatch: %v", err)
	}
	if result.Accepted != 1 {
		t.Errorf("Accepted = %d, want 1", result.Accepted)
	}
	if len(result.Errors) != 1 || result.Errors[0] != "bad row" {
		t.Errorf("Errors = %v, want [bad row]", result.Errors)
	}
}