~bigbes/lethe

f3118d95bdf88c114346b0b2b43fad44e564a484 — Eugene Blikh 24 days ago 538e632
collector: preserve valid rows around ingest errors
M cmd/lethe-collector/main.go => cmd/lethe-collector/main.go +5 -1
@@ 202,7 202,11 @@ func newStatusCmd(configPath *string) *cobra.Command {
						fmt.Printf("  %s: error=%v\n", f.Path, err)
						continue
					}
					fmt.Printf("  %s: offset=%d\n", f.Path, off)
					lagBytes := f.Size - off
					if lagBytes < 0 {
						lagBytes = 0
					}
					fmt.Printf("  %s: offset=%d lag_bytes=%d\n", f.Path, off, lagBytes)
				}
			}


M docs/tasks/lethe-collector-claude-code.md => docs/tasks/lethe-collector-claude-code.md +7 -1
@@ 315,9 315,15 @@ Smoke: `go run ./cmd/lethe-collector --config ./tmp/collector-smoke.yaml status`
- worktree: `task/lethe-collector-claude-code` at `/Users/blikh/data/home/lethe/.worktrees/lethe-collector-claude-code` — hands-off requires isolated reversible edits.
- worktree setup: added `.worktrees/` to `.gitignore` on `master` before creating the task worktree — `git-worktrees` requires project-local worktree directories to be ignored.
- uplan: plan auto-approved (hands-off).
- ureview (re-review): fixed `persistAcceptedOffset` to use the first error's `Line` (1-based within the request body) to identify the failed row, rather than assuming `result.Accepted` points to it. Valid but uncommitted rows before the failed line are re-posted as a smaller prefix before the failed row is skipped, preventing data loss when `accepted=0, errors line=2`.
- ureview (re-review): added WARN log with dropped row count and bytes to `EnforceOutboxLimit`.
- ureview (re-review): added `lag_bytes` per file to `status` output using `parser.SourceFile.Size` from discovery.

### Deferred / known limitations

- `status` does not display per-source `last_error` or ingestion `lag` because the PH1 state schema (`ingestion_state` table) stores only `last_offset`, not error history or timestamps. Adding these requires a schema extension outside the current plan scope.
- `status` does not display per-source `last_error` because the PH1 state schema (`ingestion_state` table) stores only `last_offset`, not error history or timestamps. Adding these requires a schema extension outside the current plan scope.

### Deferred (needs user input)

- retry/backoff: `http.retry_max` is loaded from config but exponential backoff needs a configured base/max delay; no conservative default was specified.
- status last_error: requires extending `ingestion_state` schema and deciding retention/update semantics.

M internal/collector/ingest/outbox.go => internal/collector/ingest/outbox.go +2 -0
@@ 4,6 4,7 @@ import (
	"bytes"
	"context"
	"fmt"
	"log/slog"

	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/collector/state"


@@ 71,6 72,7 @@ func EnforceOutboxLimit(ctx context.Context, store *state.Store, maxBytes int64)
		}

		if len(deleteIDs) > 0 {
			slog.Warn("outbox overflow: dropped oldest rows", "dropped_rows", len(deleteIDs), "dropped_bytes", dropped)
			if err := store.Delete(ctx, deleteIDs); err != nil {
				return culpa.Wrap(err, "trim outbox")
			}

M internal/collector/ingest/outbox_test.go => internal/collector/ingest/outbox_test.go +44 -0
@@ 1,9 1,12 @@
package ingest

import (
	"bytes"
	"context"
	"log/slog"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"

	"sourcecraft.dev/bigbes/lethe/internal/collector/state"


@@ 209,6 212,47 @@ func TestEnforceOutboxLimit_NoOpWhenUnderLimit(t *testing.T) {
	}
}

func TestEnforceOutboxLimit_LogsWarningWhenRowsDropped(t *testing.T) {
	ctx := context.Background()
	dir := t.TempDir()
	store, err := state.Open(ctx, dir+"/test.db")
	if err != nil {
		t.Fatalf("Open: %v", err)
	}
	defer func() { _ = store.Close() }()

	items := []state.OutboxItem{
		{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}`)},   // 9 bytes
		{Tool: "cc", Host: "h", SourceFile: "/b.jsonl", Payload: []byte(`{"seq":2}`)},   // 9 bytes
		{Tool: "cc", Host: "h", SourceFile: "/c.jsonl", Payload: []byte(`{"seq":333}`)}, // 11 bytes
	}
	for _, it := range items {
		if err := store.Enqueue(ctx, it); err != nil {
			t.Fatalf("Enqueue: %v", err)
		}
	}

	oldLogger := slog.Default()
	defer slog.SetDefault(oldLogger)
	var buf bytes.Buffer
	slog.SetDefault(slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelWarn})))

	if err := EnforceOutboxLimit(ctx, store, 20); err != nil {
		t.Fatalf("EnforceOutboxLimit: %v", err)
	}

	logOutput := buf.String()
	if !strings.Contains(logOutput, "outbox overflow") {
		t.Errorf("expected WARN log containing 'outbox overflow', got %q", logOutput)
	}
	if !strings.Contains(logOutput, "dropped_rows") {
		t.Errorf("expected WARN log containing 'dropped_rows', got %q", logOutput)
	}
	if !strings.Contains(logOutput, "dropped_bytes") {
		t.Errorf("expected WARN log containing 'dropped_bytes', got %q", logOutput)
	}
}

func TestEnforceOutboxLimit_EmptyOutbox(t *testing.T) {
	ctx := context.Background()
	dir := t.TempDir()

M internal/collector/ingest/runner.go => internal/collector/ingest/runner.go +73 -16
@@ 143,7 143,7 @@ func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p 
			return culpa.Wrap(err, "post batch")
		}

		advanced, err := persistAcceptedOffset(ctx, store, src.Tool, path, events, newOffset, batch, result)
		advanced, err := persistAcceptedOffset(ctx, store, sender, cfg, src, path, events, newOffset, batch, result)
		if err != nil {
			return err
		}


@@ 175,7 175,7 @@ func enqueueBatch(ctx context.Context, store *state.Store, src config.SourceConf
	return nil
}

func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, result Result) (int, error) {
func persistAcceptedOffset(ctx context.Context, store *state.Store, sender *Sender, cfg config.Config, src config.SourceConfig, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, result Result) (int, error) {
	if result.Accepted < 0 || result.Accepted > len(batch.Events) {
		return 0, culpa.WithCode(
			fmt.Errorf("server accepted %d events for batch length %d", result.Accepted, len(batch.Events)),


@@ 188,25 188,82 @@ func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string,
			"RUNNER_ACCEPTED_MISMATCH",
		)
	}
	if result.Accepted == 0 && len(result.Errors) == 0 {
		return 0, nil

	if len(result.Errors) == 0 {
		if result.Accepted == 0 {
			return 0, nil
		}
		nextGlobalIndex := batch.EventIndexes[0] + result.Accepted
		offset := eventOffset(allEvents, newOffset, nextGlobalIndex)
		if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil {
			return 0, culpa.Wrap(err, "save accepted offset")
		}
		return result.Accepted, nil
	}

	nextGlobalIndex := batch.EventIndexes[0] + result.Accepted
	if len(result.Errors) > 0 {
		// Skip the failed event so it does not loop forever.
		nextGlobalIndex++
	// There are errors — use the first error's line to identify the failed row.
	failedLine := result.Errors[0].Line
	if failedLine < 1 || failedLine > len(batch.Events) {
		return 0, culpa.WithCode(
			fmt.Errorf("server error line %d out of range [1,%d]", failedLine, len(batch.Events)),
			"RUNNER_ERROR_LINE_INVALID",
		)
	}
	failedLocalIdx := failedLine - 1

	var offset int64
	if nextGlobalIndex >= len(allEvents) {
		offset = newOffset
	} else {
		offset = allEvents[nextGlobalIndex].Seq
	if result.Accepted > failedLocalIdx {
		return 0, culpa.WithCode(
			fmt.Errorf("server accepted %d events but first error is at line %d", result.Accepted, failedLine),
			"RUNNER_ACCEPTED_MISMATCH",
		)
	}

	// Persist any already-accepted rows.
	if result.Accepted > 0 {
		nextGlobalIndex := batch.EventIndexes[0] + result.Accepted
		offset := eventOffset(allEvents, newOffset, nextGlobalIndex)
		if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil {
			return 0, culpa.Wrap(err, "save accepted offset")
		}
	}

	if err := store.SaveOffset(ctx, tool, path, offset); err != nil {
		return 0, culpa.Wrap(err, "save accepted offset")
	// If there are uncommitted valid rows between accepted and the failed row,
	// post that prefix safely with the same rules.
	if result.Accepted < failedLocalIdx {
		prefix := Batch{
			Events:       batch.Events[result.Accepted:failedLocalIdx],
			EventIndexes: batch.EventIndexes[result.Accepted:failedLocalIdx],
		}
		prefixResult, err := sender.PostBatch(ctx, prefix.Events)
		if err != nil {
			if enqueueErr := enqueueBatch(ctx, store, src, cfg.Host, path, prefix.Events, cfg.Outbox.MaxBytes); enqueueErr != nil {
				return 0, errors.Join(culpa.Wrap(err, "post prefix batch"), enqueueErr)
			}
			return 0, culpa.Wrap(err, "post prefix batch")
		}
		prefixAdvanced, err := persistAcceptedOffset(ctx, store, sender, cfg, src, path, allEvents, newOffset, prefix, prefixResult)
		if err != nil {
			return 0, err
		}
		if prefixAdvanced < len(prefix.Events) {
			// The prefix itself hit an error or could not advance fully.
			// Return total advanced so far (accepted rows + prefix advancement).
			return result.Accepted + prefixAdvanced, nil
		}
	}

	// Skip the failed row so it does not loop forever.
	nextGlobalIndex := batch.EventIndexes[0] + failedLocalIdx + 1
	offset := eventOffset(allEvents, newOffset, nextGlobalIndex)
	if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil {
		return 0, culpa.Wrap(err, "save offset past failed row")
	}
	return failedLocalIdx + 1, nil
}

func eventOffset(allEvents []wire.TurnEvent, newOffset int64, nextGlobalIndex int) int64 {
	if nextGlobalIndex >= len(allEvents) {
		return newOffset
	}
	return result.Accepted, nil
	return allEvents[nextGlobalIndex].Seq
}

M internal/collector/ingest/runner_test.go => internal/collector/ingest/runner_test.go +89 -0
@@ 55,6 55,95 @@ func TestRunOnce_PartialAcceptWithErrorSkipsFailedEvent(t *testing.T) {
	assertOutboxCount(t, ctx, store, 0)
}

func TestRunOnce_ZeroAcceptedWithErrorLineTwoPostsPrefixAndSkipsFailedEvent(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")}
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: events, newOffset: 400},
	})

	var posted [][]wire.TurnEvent
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, _ := io.ReadAll(r.Body)
		var batch []wire.TurnEvent
		for _, line := range strings.Split(string(body), "\n") {
			if line == "" {
				continue
			}
			var ev wire.TurnEvent
			if err := json.Unmarshal([]byte(line), &ev); err != nil {
				t.Fatalf("unmarshal event: %v", err)
			}
			batch = append(batch, ev)
		}
		posted = append(posted, batch)
		w.Header().Set("Content-Type", "application/json")
		if len(posted) == 1 {
			_, _ = w.Write([]byte(resultJSON(Result{Accepted: 0, Errors: []LineError{{Line: 2, Err: "bad"}}})))
		} else {
			_, _ = w.Write([]byte(resultJSON(Result{Accepted: len(batch)})))
		}
	}))
	defer ts.Close()
	sender := NewSender(ts.URL, ts.Client())

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunOnce first run: %v", err)
	}
	assertOffset(t, ctx, store, "claude-code", file, 300)

	if len(posted) != 2 {
		t.Fatalf("first run: expected 2 POSTs, got %d", len(posted))
	}
	if len(posted[0]) != 3 {
		t.Errorf("first run batch: expected 3 events, got %d", len(posted[0]))
	}
	if len(posted[1]) != 1 {
		t.Errorf("prefix batch: expected 1 event, got %d", len(posted[1]))
	}
	if posted[1][0].Content != "a" {
		t.Errorf("prefix batch: expected event 'a', got %q", posted[1][0].Content)
	}

	err = RunOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunOnce second run: %v", err)
	}
	assertOffset(t, ctx, store, "claude-code", file, 400)

	if len(posted) != 3 {
		t.Fatalf("second run: expected 3 POSTs total, got %d", len(posted))
	}
	if len(posted[2]) != 1 {
		t.Errorf("second run batch: expected 1 event, got %d", len(posted[2]))
	}
	if posted[2][0].Content != "c" {
		t.Errorf("second run batch: expected event 'c', got %q", posted[2][0].Content)
	}
}

func TestRunOnce_MalformedErrorLineOutOfRangeReturnsError(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
	})
	sender := acceptingSender(t, []Result{{Accepted: 0, Errors: []LineError{{Line: 5, Err: "bad"}}}})

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err == nil {
		t.Fatal("expected RunOnce error for malformed error line")
	}

	assertOffset(t, ctx, store, "claude-code", file, 0)
}

func TestRunOnce_ZeroAcceptedWithErrorSkipsFirstEvent(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)