~bigbes/lethe

27c47142dda459f4dd552a8ceba4934eca81ff6b — Eugene Blikh 24 days ago ef738a2
collector: add polling source runner
A internal/collector/ingest/batch.go => internal/collector/ingest/batch.go +75 -0
@@ 0,0 1,75 @@
package ingest

import (
	"fmt"

	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// Batch is a POST-sized slice of events plus their indexes in the parsed file
// result. EventIndexes lets accepted counts map back to source offsets.
type Batch struct {
	Events       []wire.TurnEvent
	EventIndexes []int
}

// BuildBatches splits events by both line and encoded byte caps.
func BuildBatches(events []wire.TurnEvent, maxLines int, maxBytes int) ([]Batch, error) {
	if maxLines <= 0 {
		return nil, culpa.WithCode(fmt.Errorf("max lines must be positive: %d", maxLines), "BATCH_INVALID_CAP")
	}
	if maxBytes <= 0 {
		return nil, culpa.WithCode(fmt.Errorf("max bytes must be positive: %d", maxBytes), "BATCH_INVALID_CAP")
	}
	if len(events) == 0 {
		return nil, nil
	}

	batches := make([]Batch, 0, (len(events)+maxLines-1)/maxLines)
	var current Batch
	var currentBytes int

	flush := func() {
		if len(current.Events) == 0 {
			return
		}
		batches = append(batches, current)
		current = Batch{}
		currentBytes = 0
	}

	for i, event := range events {
		eventBytes, err := encodedEventLen(event)
		if err != nil {
			return nil, err
		}
		if eventBytes > maxBytes {
			return nil, culpa.WithCode(
				fmt.Errorf("event %d encodes to %d bytes, above max %d", i, eventBytes, maxBytes),
				"BATCH_EVENT_TOO_LARGE",
			)
		}

		lineCapReached := len(current.Events) >= maxLines
		byteCapReached := currentBytes+eventBytes > maxBytes
		if lineCapReached || byteCapReached {
			flush()
		}

		current.Events = append(current.Events, event)
		current.EventIndexes = append(current.EventIndexes, i)
		currentBytes += eventBytes
	}
	flush()

	return batches, nil
}

func encodedEventLen(event wire.TurnEvent) (int, error) {
	data, err := EncodeNDJSON([]wire.TurnEvent{event})
	if err != nil {
		return 0, culpa.Wrap(err, "encode event for batch sizing")
	}
	return len(data), nil
}

A internal/collector/ingest/batch_test.go => internal/collector/ingest/batch_test.go +82 -0
@@ 0,0 1,82 @@
package ingest

import (
	"testing"

	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

func TestBuildBatches_SplitsByMaxLines(t *testing.T) {
	events := []wire.TurnEvent{
		turnEvent(0, "a"),
		turnEvent(10, "b"),
		turnEvent(20, "c"),
	}

	batches, err := BuildBatches(events, 2, 1024)
	if err != nil {
		t.Fatalf("BuildBatches: %v", err)
	}

	if len(batches) != 2 {
		t.Fatalf("len(batches) = %d, want 2", len(batches))
	}
	assertBatch(t, batches[0], []int{0, 1})
	assertBatch(t, batches[1], []int{2})
}

func TestBuildBatches_SplitsByMaxBytes(t *testing.T) {
	events := []wire.TurnEvent{
		turnEvent(0, "a"),
		turnEvent(10, "b"),
		turnEvent(20, "c"),
	}
	firstSize := mustEncodedLen(t, events[:1])

	batches, err := BuildBatches(events, 100, firstSize+1)
	if err != nil {
		t.Fatalf("BuildBatches: %v", err)
	}

	if len(batches) != 3 {
		t.Fatalf("len(batches) = %d, want 3", len(batches))
	}
	assertBatch(t, batches[0], []int{0})
	assertBatch(t, batches[1], []int{1})
	assertBatch(t, batches[2], []int{2})
}

func TestBuildBatches_RejectsNonPositiveCaps(t *testing.T) {
	events := []wire.TurnEvent{turnEvent(0, "a")}

	if _, err := BuildBatches(events, 0, 1024); err == nil {
		t.Fatal("expected error for zero maxLines")
	}
	if _, err := BuildBatches(events, 1, 0); err == nil {
		t.Fatal("expected error for zero maxBytes")
	}
}

func assertBatch(t *testing.T, batch Batch, wantIndexes []int) {
	t.Helper()
	if len(batch.Events) != len(wantIndexes) {
		t.Fatalf("len(batch.Events) = %d, want %d", len(batch.Events), len(wantIndexes))
	}
	if len(batch.EventIndexes) != len(wantIndexes) {
		t.Fatalf("len(batch.EventIndexes) = %d, want %d", len(batch.EventIndexes), len(wantIndexes))
	}
	for i, want := range wantIndexes {
		if batch.EventIndexes[i] != want {
			t.Errorf("EventIndexes[%d] = %d, want %d", i, batch.EventIndexes[i], want)
		}
	}
}

func mustEncodedLen(t *testing.T, events []wire.TurnEvent) int {
	t.Helper()
	data, err := EncodeNDJSON(events)
	if err != nil {
		t.Fatalf("EncodeNDJSON: %v", err)
	}
	return len(data)
}

A internal/collector/ingest/runner.go => internal/collector/ingest/runner.go +201 -0
@@ 0,0 1,201 @@
package ingest

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"math"
	"time"

	"go.bigb.es/auxilia/async"
	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/collector/config"
	"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

const replayOutboxLimit = 100

// RunOnce replays the safety-net outbox, scans one configured source, parses
// new complete records from persisted offsets, posts batches, and advances
// offsets only after server acceptance confirms the corresponding lines.
func RunOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) error {
	var errs []error

	if p.Tool() != src.Tool {
		return culpa.WithCode(
			fmt.Errorf("parser tool %q does not match source tool %q", p.Tool(), src.Tool),
			"RUNNER_TOOL_MISMATCH",
		)
	}

	if err := ReplayOutbox(ctx, store, sender, replayOutboxLimit); err != nil {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return errors.Join(ctxErr, culpa.Wrap(err, "replay outbox"))
		}
		slog.Warn("outbox replay failed", "tool", src.Tool, "error", err)
		errs = append(errs, culpa.Wrap(err, "replay outbox"))
	}

	files, err := p.Discover(src.Path)
	if err != nil {
		return errors.Join(append(errs, culpa.Wrap(err, "discover source files"))...)
	}

	for _, file := range files {
		if err := ctx.Err(); err != nil {
			return errors.Join(append(errs, err)...)
		}
		if err := runFile(ctx, cfg, src, p, store, sender, file.Path); err != nil {
			slog.Warn("source file ingest failed", "tool", src.Tool, "source_file", file.Path, "error", err)
			errs = append(errs, err)
		}
	}

	return errors.Join(errs...)
}

// RunDaemon starts one independent polling loop per configured source and
// exits after the supplied context is canceled and all loops have stopped.
func RunDaemon(ctx context.Context, cfg config.Config, parsers map[string]parser.Parser, store *state.Store, sender *Sender) error {
	if len(cfg.Sources) == 0 {
		<-ctx.Done()
		return nil
	}

	group := async.NewGroup[struct{}]()
	for _, source := range cfg.Sources {
		src := source
		p, ok := parsers[src.Tool]
		if !ok {
			return culpa.WithCode(fmt.Errorf("missing parser for source tool %q", src.Tool), "RUNNER_MISSING_PARSER")
		}
		group.AddExecutor(func(loopCtx context.Context) (struct{}, error) {
			pollSource(loopCtx, cfg, src, p, store, sender)
			return struct{}{}, nil
		})
	}

	if err := group.Start(ctx); err != nil {
		return culpa.Wrap(err, "start source polling loops")
	}
	if err := group.All(context.Background()); err != nil {
		return culpa.Wrap(err, "wait for source polling loops")
	}
	return nil
}

func pollSource(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) {
	runAndLog := func() {
		if ctx.Err() != nil {
			return
		}
		if err := RunOnce(ctx, cfg, src, p, store, sender); err != nil && ctx.Err() == nil {
			slog.Warn("source poll failed", "tool", src.Tool, "path", src.Path, "error", err)
		}
	}

	runAndLog()
	ticker := time.NewTicker(src.PollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			runAndLog()
		}
	}
}

func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, path string) error {
	offset, err := store.GetOffset(ctx, src.Tool, path)
	if err != nil {
		return culpa.Wrap(err, "get source offset")
	}

	events, newOffset, err := p.Parse(path, offset)
	if err != nil {
		return culpa.Wrap(err, "parse source file")
	}
	if len(events) == 0 {
		return nil
	}
	stampHost(events, cfg.Host)

	if src.BatchMaxBytes > math.MaxInt {
		return culpa.WithCode(fmt.Errorf("batch max bytes %d exceeds int max", src.BatchMaxBytes), "RUNNER_BATCH_CAP")
	}
	batches, err := BuildBatches(events, src.BatchMaxLines, int(src.BatchMaxBytes))
	if err != nil {
		return culpa.Wrap(err, "build event batches")
	}

	for _, batch := range batches {
		result, err := sender.PostBatch(ctx, batch.Events)
		if err != nil {
			if enqueueErr := enqueueBatch(ctx, store, src, cfg.Host, path, batch.Events, cfg.Outbox.MaxBytes); enqueueErr != nil {
				return errors.Join(culpa.Wrap(err, "post batch"), enqueueErr)
			}
			return culpa.Wrap(err, "post batch")
		}

		advanced, err := persistAcceptedOffset(ctx, store, src.Tool, path, events, newOffset, batch, result.Accepted)
		if err != nil {
			return err
		}
		if advanced < len(batch.Events) {
			return nil
		}
	}

	return nil
}

func stampHost(events []wire.TurnEvent, host string) {
	for i := range events {
		events[i].Host = host
	}
}

func enqueueBatch(ctx context.Context, store *state.Store, src config.SourceConfig, host string, path string, events []wire.TurnEvent, maxOutboxBytes int64) error {
	payload, err := EncodeNDJSON(events)
	if err != nil {
		return culpa.Wrap(err, "encode failed batch")
	}
	if err := store.Enqueue(ctx, state.OutboxItem{Tool: src.Tool, Host: host, SourceFile: path, Payload: payload}); err != nil {
		return culpa.Wrap(err, "enqueue failed batch")
	}
	if err := EnforceOutboxLimit(ctx, store, maxOutboxBytes); err != nil {
		return culpa.Wrap(err, "enforce outbox limit")
	}
	return nil
}

func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, accepted int) (int, error) {
	if accepted < 0 || accepted > len(batch.Events) {
		return 0, culpa.WithCode(
			fmt.Errorf("server accepted %d events for batch length %d", accepted, len(batch.Events)),
			"RUNNER_ACCEPTED_INVALID",
		)
	}
	if accepted == 0 {
		return 0, nil
	}

	nextGlobalIndex := batch.EventIndexes[0] + accepted
	var offset int64
	if nextGlobalIndex >= len(allEvents) {
		offset = newOffset
	} else {
		offset = allEvents[nextGlobalIndex].Seq
	}

	if err := store.SaveOffset(ctx, tool, path, offset); err != nil {
		return 0, culpa.Wrap(err, "save accepted offset")
	}
	return accepted, nil
}

A internal/collector/ingest/runner_helpers_test.go => internal/collector/ingest/runner_helpers_test.go +94 -0
@@ 0,0 1,94 @@
package ingest

import (
	"context"
	"encoding/json"
	"errors"
	"path/filepath"
	"testing"
	"time"

	"sourcecraft.dev/bigbes/lethe/internal/collector/config"
	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

var (
	errParseBoom = errors.New("parse boom")
	errPostBoom  = errors.New("post boom")
)

func turnEvent(seq int64, content string) wire.TurnEvent {
	return wire.TurnEvent{
		Tool:      "claude-code",
		Host:      "test-host",
		SessionID: "session-1",
		TurnID:    content,
		Seq:       seq,
		Role:      "user",
		Timestamp: 123,
		Content:   content,
		SessionMeta: wire.SessionMeta{
			SourceFile: "source.jsonl",
		},
	}
}

func resultJSON(result Result) string {
	data, err := json.Marshal(result)
	if err != nil {
		panic(err)
	}
	return string(data)
}

func testConfig() config.Config {
	return config.Config{
		Host:      "test-host",
		ServerURL: "http://127.0.0.1",
		Outbox:    config.OutboxConfig{MaxBytes: 1024 * 1024},
	}
}

func testSource(t *testing.T, tool string, maxLines int, maxBytes int64) config.SourceConfig {
	t.Helper()
	return config.SourceConfig{
		Tool:          tool,
		Path:          t.TempDir(),
		PollInterval:  time.Millisecond,
		BatchMaxLines: maxLines,
		BatchMaxBytes: maxBytes,
	}
}

func openTestStore(t *testing.T, ctx context.Context) *state.Store {
	t.Helper()
	store, err := state.Open(ctx, filepath.Join(t.TempDir(), "state.db"))
	if err != nil {
		t.Fatalf("Open: %v", err)
	}
	t.Cleanup(func() { _ = store.Close() })
	return store
}

func assertOffset(t *testing.T, ctx context.Context, store *state.Store, tool, file string, want int64) {
	t.Helper()
	got, err := store.GetOffset(ctx, tool, file)
	if err != nil {
		t.Fatalf("GetOffset: %v", err)
	}
	if got != want {
		t.Errorf("offset = %d, want %d", got, want)
	}
}

func assertOutboxCount(t *testing.T, ctx context.Context, store *state.Store, want int64) {
	t.Helper()
	st, err := store.Stats(ctx)
	if err != nil {
		t.Fatalf("Stats: %v", err)
	}
	if st.OutboxCount != want {
		t.Errorf("OutboxCount = %d, want %d", st.OutboxCount, want)
	}
}

A internal/collector/ingest/runner_test.go => internal/collector/ingest/runner_test.go +213 -0
@@ 0,0 1,213 @@
package ingest

import (
	"context"
	"io"
	"net/http"
	"net/http/httptest"
	"path/filepath"
	"sync"
	"testing"
	"time"

	"sourcecraft.dev/bigbes/lethe/internal/collector/config"
	"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

func TestRunOnce_AllAcceptedPersistsNewOffsetAndDoesNotEnqueueOutbox(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
	})
	sender := acceptingSender(t, []Result{{Accepted: 2}})

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunOnce: %v", err)
	}

	assertOffset(t, ctx, store, "claude-code", file, 300)
	assertOutboxCount(t, ctx, store, 0)
}

func TestRunOnce_PartialAcceptedPersistsFirstUnacceptedEventOffset(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")}, newOffset: 400},
	})
	sender := acceptingSender(t, []Result{{Accepted: 2, Errors: []LineError{{Line: 3, Err: "bad"}}}})

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunOnce: %v", err)
	}

	assertOffset(t, ctx, store, "claude-code", file, 300)
	assertOutboxCount(t, ctx, store, 0)
}

func TestRunOnce_ZeroAcceptedDoesNotPersistOffset(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
	})
	sender := acceptingSender(t, []Result{{Accepted: 0, Errors: []LineError{{Line: 1, Err: "bad"}}}})

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err != nil {
		t.Fatalf("RunOnce: %v", err)
	}

	assertOffset(t, ctx, store, "claude-code", file, 0)
	assertOutboxCount(t, ctx, store, 0)
}

func TestRunOnce_HardErrorEnqueuesUnsentBatchAndDoesNotAdvanceOffset(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: events, newOffset: 300},
	})
	sender := failingSender(t)

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err == nil {
		t.Fatal("expected RunOnce error on hard POST failure")
	}

	assertOffset(t, ctx, store, "claude-code", file, 0)
	rows, err := store.Oldest(ctx, 10)
	if err != nil {
		t.Fatalf("Oldest: %v", err)
	}
	if len(rows) != 1 {
		t.Fatalf("len(rows) = %d, want 1", len(rows))
	}
	wantPayload, err := EncodeNDJSON(events)
	if err != nil {
		t.Fatalf("EncodeNDJSON: %v", err)
	}
	if string(rows[0].Payload) != string(wantPayload) {
		t.Errorf("payload = %q, want %q", string(rows[0].Payload), string(wantPayload))
	}
}

func TestRunOnce_ContinuesToOtherFilesWhenOneFileFails(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	badFile := filepath.Join(source.Path, "bad.jsonl")
	goodFile := filepath.Join(source.Path, "good.jsonl")
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: badFile}, {Path: goodFile}}, map[string]parseResult{
		badFile:  {err: errParseBoom},
		goodFile: {events: []wire.TurnEvent{turnEvent(100, "ok")}, newOffset: 200},
	})
	sender := acceptingSender(t, []Result{{Accepted: 1}})

	err := RunOnce(ctx, testConfig(), source, p, store, sender)
	if err == nil {
		t.Fatal("expected RunOnce to return collected parse error")
	}

	assertOffset(t, ctx, store, "claude-code", badFile, 0)
	assertOffset(t, ctx, store, "claude-code", goodFile, 200)
}

func TestRunDaemon_ExitsWhenContextCanceled(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	store := openTestStore(t, context.Background())
	source := testSource(t, "claude-code", 10, 4096)
	source.PollInterval = time.Hour
	p := newFakeParser("claude-code", nil, nil)
	sender := acceptingSender(t, nil)
	cfg := testConfig()
	cfg.Sources = []config.SourceConfig{source}

	done := make(chan error, 1)
	go func() {
		done <- RunDaemon(ctx, cfg, map[string]parser.Parser{"claude-code": p}, store, sender)
	}()

	cancel()
	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("RunDaemon: %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("RunDaemon did not exit after context cancellation")
	}
}

type parseResult struct {
	events    []wire.TurnEvent
	newOffset int64
	err       error
}

type fakeParser struct {
	tool    string
	files   []parser.SourceFile
	results map[string]parseResult
}

func newFakeParser(tool string, files []parser.SourceFile, results map[string]parseResult) *fakeParser {
	return &fakeParser{tool: tool, files: files, results: results}
}

func (p *fakeParser) Tool() string { return p.tool }

func (p *fakeParser) Discover(root string) ([]parser.SourceFile, error) { return p.files, nil }

func (p *fakeParser) Parse(path string, since int64) ([]wire.TurnEvent, int64, error) {
	res := p.results[path]
	if res.err != nil {
		return nil, since, res.err
	}
	return res.events, res.newOffset, nil
}

type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

func acceptingSender(t *testing.T, results []Result) *Sender {
	t.Helper()
	var mu sync.Mutex
	idx := 0
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, _ = io.ReadAll(r.Body)
		mu.Lock()
		defer mu.Unlock()
		result := Result{Accepted: 0}
		if idx < len(results) {
			result = results[idx]
			idx++
		}
		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(resultJSON(result)))
	}))
	t.Cleanup(ts.Close)
	return NewSender(ts.URL, ts.Client())
}

func failingSender(t *testing.T) *Sender {
	t.Helper()
	client := &http.Client{Transport: roundTripFunc(func(*http.Request) (*http.Response, error) {
		return nil, errPostBoom
	})}
	return NewSender("http://127.0.0.1", client)
}