~bigbes/lethe

9094d79e008f69af42107022da2bfcf6f8d7aea8 — Eugene Blikh 24 days ago dcc2805
collector: enforce outbox cap before replay
M docs/tasks/lethe-collector-claude-code.md => docs/tasks/lethe-collector-claude-code.md +1 -0
@@ 319,6 319,7 @@ Smoke: `go run ./cmd/lethe-collector --config ./tmp/collector-smoke.yaml status`
- ureview (re-review): added `lag_bytes` per file to `status` output using `parser.SourceFile.Size` from discovery.
- ureview (final): bounded daemon drain uses `http.timeout` — no separate `shutdown_grace` config exists for the collector.
- ureview (final): backfill offset-0 semantics are implemented as `RunBackfillOnce` instead of a mode flag on `RunOnce` — explicit call sites are safer than a boolean parameter that could be misused in daemon loops.
- ureview (final): normalized sender `serverURL` and enforced outbox cap before every replay to fix IV5/IV9 violations found in review.

### Deferred (needs user input)


M internal/collector/ingest/runner.go => internal/collector/ingest/runner.go +26 -8
@@ 18,6 18,26 @@ import (

const replayOutboxLimit = 100

// enforceAndReplay calls EnforceOutboxLimit with maxBytes and then
// ReplayOutbox with replayOutboxLimit, in that order.
//
// A failure in either step is logged as a warning tagged with src.Tool and
// collected instead of aborting, so replay is still attempted after a failed
// enforce. If ctx already reports an error when a step fails, the function
// returns at once with the context error joined to the wrapped step error
// (and, for replay, to anything collected earlier). The result is nil when
// both steps succeed.
func enforceAndReplay(ctx context.Context, store *state.Store, sender *Sender, maxBytes int64, src config.SourceConfig) error {
	var failures []error

	enforceErr := EnforceOutboxLimit(ctx, store, maxBytes)
	if enforceErr != nil {
		if cancelled := ctx.Err(); cancelled != nil {
			return errors.Join(cancelled, culpa.Wrap(enforceErr, "enforce outbox limit"))
		}
		slog.Warn("enforce outbox limit failed", "tool", src.Tool, "error", enforceErr)
		failures = append(failures, culpa.Wrap(enforceErr, "enforce outbox limit"))
	}

	replayErr := ReplayOutbox(ctx, store, sender, replayOutboxLimit)
	if replayErr == nil {
		// errors.Join yields nil when nothing was collected.
		return errors.Join(failures...)
	}

	if cancelled := ctx.Err(); cancelled != nil {
		failures = append(failures, cancelled, culpa.Wrap(replayErr, "replay outbox"))
		return errors.Join(failures...)
	}

	slog.Warn("outbox replay failed", "tool", src.Tool, "error", replayErr)
	failures = append(failures, culpa.Wrap(replayErr, "replay outbox"))
	return errors.Join(failures...)
}

// RunOnce enforces the outbox size cap, replays the safety-net outbox, scans
// one configured source, parses new complete records from persisted offsets,
// posts batches, and advances offsets only after server acceptance confirms
// the corresponding lines.


@@ 31,12 51,11 @@ func RunOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p 
		)
	}

	if err := ReplayOutbox(ctx, store, sender, replayOutboxLimit); err != nil {
	if err := enforceAndReplay(ctx, store, sender, cfg.Outbox.MaxBytes, src); err != nil {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return errors.Join(ctxErr, culpa.Wrap(err, "replay outbox"))
			return errors.Join(append(errs, err)...)
		}
		slog.Warn("outbox replay failed", "tool", src.Tool, "error", err)
		errs = append(errs, culpa.Wrap(err, "replay outbox"))
		errs = append(errs, err)
	}

	files, err := p.Discover(src.Path)


@@ 70,12 89,11 @@ func RunBackfillOnce(ctx context.Context, cfg config.Config, src config.SourceCo
		)
	}

	if err := ReplayOutbox(ctx, store, sender, replayOutboxLimit); err != nil {
	if err := enforceAndReplay(ctx, store, sender, cfg.Outbox.MaxBytes, src); err != nil {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return errors.Join(ctxErr, culpa.Wrap(err, "replay outbox"))
			return errors.Join(append(errs, err)...)
		}
		slog.Warn("outbox replay failed", "tool", src.Tool, "error", err)
		errs = append(errs, culpa.Wrap(err, "replay outbox"))
		errs = append(errs, err)
	}

	files, err := p.Discover(src.Path)

M internal/collector/ingest/runner_test.go => internal/collector/ingest/runner_test.go +45 -0
@@ 14,6 14,7 @@ import (

	"sourcecraft.dev/bigbes/lethe/internal/collector/config"
	"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)



@@ 36,6 37,50 @@ func TestRunOnce_AllAcceptedPersistsNewOffsetAndDoesNotEnqueueOutbox(t *testing.
	assertOutboxCount(t, ctx, store, 0)
}

// TestRunOnce_TrimsPreexistingOutboxOverLimitEvenWithNoEnqueue seeds the
// outbox with two 10-byte rows, caps the outbox at 10 bytes, runs RunOnce with
// a parser that yields no events, and expects only the second seeded row to
// remain afterwards.
func TestRunOnce_TrimsPreexistingOutboxOverLimitEvenWithNoEnqueue(t *testing.T) {
	const (
		firstPayload  = "{\"seq\":1}\n"
		secondPayload = "{\"seq\":2}\n"
	)

	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")

	// Seed the outbox before the run; each payload is exactly 10 bytes.
	for _, payload := range []string{firstPayload, secondPayload} {
		seeded := state.OutboxItem{
			Tool:       "claude-code",
			Host:       "h",
			SourceFile: file,
			Payload:    []byte(payload),
		}
		if err := store.Enqueue(ctx, seeded); err != nil {
			t.Fatalf("Enqueue: %v", err)
		}
	}

	cfg := testConfig()
	cfg.Outbox.MaxBytes = 10

	// The parser reports the file but no events, so the run enqueues nothing new.
	discovered := []parser.SourceFile{{Path: file}}
	parsed := map[string]parseResult{
		file: {events: []wire.TurnEvent{}, newOffset: 0},
	}
	p := newFakeParser("claude-code", discovered, parsed)

	// Accepted:0 so replay does not delete rows; enforce must trim.
	sender := acceptingSender(t, nil)

	if err := RunOnce(ctx, cfg, source, p, store, sender); err != nil {
		t.Fatalf("RunOnce: %v", err)
	}

	rows, err := store.Oldest(ctx, 10)
	if err != nil {
		t.Fatalf("Oldest: %v", err)
	}
	if got := len(rows); got != 1 {
		t.Errorf("expected 1 row after trim, got %d", got)
	}
	if len(rows) == 0 {
		return
	}
	if got := string(rows[0].Payload); got != secondPayload {
		t.Errorf("remaining payload = %q, want %q", got, secondPayload)
	}
}

func TestRunOnce_PartialAcceptWithErrorSkipsFailedEvent(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)

M internal/collector/ingest/sender.go => internal/collector/ingest/sender.go +2 -1
@@ 9,6 9,7 @@ import (
	"fmt"
	"io"
	"net/http"
	"strings"

	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"


@@ 34,7 35,7 @@ type Sender struct {

// NewSender builds a Sender that posts to serverURL + /api/v1/ingest.
func NewSender(serverURL string, client *http.Client) *Sender {
	return &Sender{serverURL: serverURL, client: client}
	return &Sender{serverURL: strings.TrimRight(serverURL, "/"), client: client}
}

// PostBatch encodes events as NDJSON and POSTs them to the ingest endpoint.

M internal/collector/ingest/sender_test.go => internal/collector/ingest/sender_test.go +22 -0
@@ 102,6 102,28 @@ func TestSender_PostBatch_Success(t *testing.T) {
	}
}

// TestSender_PostBatch_TrailingSlashURL builds a Sender from a server URL that
// ends in "/" and checks that the request still arrives at /api/v1/ingest.
func TestSender_PostBatch_TrailingSlashURL(t *testing.T) {
	var requestedPath string

	// Record the path the sender hit and answer with a one-event success body.
	handler := func(w http.ResponseWriter, r *http.Request) {
		requestedPath = r.URL.Path
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"accepted":1,"errors":[]}`))
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	sender := NewSender(srv.URL+"/", http.DefaultClient)
	batch := []wire.TurnEvent{
		{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
	}
	if _, err := sender.PostBatch(context.Background(), batch); err != nil {
		t.Fatalf("PostBatch: %v", err)
	}

	const wantPath = "/api/v1/ingest"
	if requestedPath != wantPath {
		t.Errorf("expected path /api/v1/ingest, got %s", requestedPath)
	}
}

func TestSender_PostBatch_Non2xx(t *testing.T) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusServiceUnavailable)