From 27c47142dda459f4dd552a8ceba4934eca81ff6b Mon Sep 17 00:00:00 2001
From: Eugene Blikh
Date: Sun, 3 May 2026 18:40:41 +0300
Subject: [PATCH] collector: add polling source runner

---
 internal/collector/ingest/batch.go            |  90 +++++++
 internal/collector/ingest/batch_test.go       |  98 +++++++
 internal/collector/ingest/runner.go           | 240 ++++++++++++++++++
 .../collector/ingest/runner_helpers_test.go   |  94 +++++++
 internal/collector/ingest/runner_test.go      | 237 +++++++++++++++++
 5 files changed, 759 insertions(+)
 create mode 100644 internal/collector/ingest/batch.go
 create mode 100644 internal/collector/ingest/batch_test.go
 create mode 100644 internal/collector/ingest/runner.go
 create mode 100644 internal/collector/ingest/runner_helpers_test.go
 create mode 100644 internal/collector/ingest/runner_test.go

diff --git a/internal/collector/ingest/batch.go b/internal/collector/ingest/batch.go
new file mode 100644
index 0000000000000000000000000000000000000000..d1dc4b98da7cd4c5040a9d5177e7e5c77ea3ff1d
--- /dev/null
+++ b/internal/collector/ingest/batch.go
@@ -0,0 +1,90 @@
+package ingest
+
+import (
+	"fmt"
+
+	"go.bigb.es/auxilia/culpa"
+	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
+)
+
+// Batch is a POST-sized slice of events plus their indexes in the parsed file
+// result. EventIndexes lets accepted counts map back to source offsets.
+type Batch struct {
+	Events       []wire.TurnEvent
+	EventIndexes []int
+}
+
+// BuildBatches splits events by both line and encoded byte caps.
+//
+// Events keep their input order. A batch is closed as soon as the next event
+// would push it past maxLines events or maxBytes encoded bytes. Both caps must
+// be positive (BATCH_INVALID_CAP), and an event that alone encodes to more
+// than maxBytes is rejected (BATCH_EVENT_TOO_LARGE). Empty input returns nil
+// batches and a nil error.
+func BuildBatches(events []wire.TurnEvent, maxLines int, maxBytes int) ([]Batch, error) {
+	if maxLines <= 0 {
+		return nil, culpa.WithCode(fmt.Errorf("max lines must be positive: %d", maxLines), "BATCH_INVALID_CAP")
+	}
+	if maxBytes <= 0 {
+		return nil, culpa.WithCode(fmt.Errorf("max bytes must be positive: %d", maxBytes), "BATCH_INVALID_CAP")
+	}
+	if len(events) == 0 {
+		return nil, nil
+	}
+
+	// Ceiling division without computing len(events)+maxLines: that sum wraps
+	// negative for a maxLines near the int maximum and make would panic on the
+	// resulting capacity.
+	capHint := len(events) / maxLines
+	if len(events)%maxLines != 0 {
+		capHint++
+	}
+	batches := make([]Batch, 0, capHint)
+	var current Batch
+	var currentBytes int
+
+	flush := func() {
+		if len(current.Events) == 0 {
+			return
+		}
+		batches = append(batches, current)
+		current = Batch{}
+		currentBytes = 0
+	}
+
+	for i, event := range events {
+		eventBytes, err := encodedEventLen(event)
+		if err != nil {
+			return nil, err
+		}
+		if eventBytes > maxBytes {
+			return nil, culpa.WithCode(
+				fmt.Errorf("event %d encodes to %d bytes, above max %d", i, eventBytes, maxBytes),
+				"BATCH_EVENT_TOO_LARGE",
+			)
+		}
+
+		lineCapReached := len(current.Events) >= maxLines
+		byteCapReached := currentBytes+eventBytes > maxBytes
+		if lineCapReached || byteCapReached {
+			flush()
+		}
+
+		current.Events = append(current.Events, event)
+		current.EventIndexes = append(current.EventIndexes, i)
+		currentBytes += eventBytes
+	}
+	flush()
+
+	return batches, nil
+}
+
+// encodedEventLen returns the byte length of event encoded on its own with
+// EncodeNDJSON.
+func encodedEventLen(event wire.TurnEvent) (int, error) {
+	data, err := EncodeNDJSON([]wire.TurnEvent{event})
+	if err != nil {
+		return 0, culpa.Wrap(err, "encode event for batch sizing")
+	}
+	return len(data), nil
+}
diff --git a/internal/collector/ingest/batch_test.go b/internal/collector/ingest/batch_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..4cf5712fe9bbbdafb588bb8ba9f53c36d9ef93b7
--- /dev/null
+++ b/internal/collector/ingest/batch_test.go
@@ -0,0 +1,98 @@
+package ingest
+
+import (
+	"math"
+	"testing"
+
+	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
+)
+
+func TestBuildBatches_SplitsByMaxLines(t *testing.T) {
+	events := []wire.TurnEvent{
+		turnEvent(0, "a"),
+		turnEvent(10, "b"),
+		turnEvent(20, "c"),
+	}
+
+	batches, err := BuildBatches(events, 2, 1024)
+	if err != nil {
+		t.Fatalf("BuildBatches: %v", err)
+	}
+
+	if len(batches) != 2 {
+		t.Fatalf("len(batches) = %d, want 2", len(batches))
+	}
+	assertBatch(t, batches[0], []int{0, 1})
+	assertBatch(t, batches[1], []int{2})
+}
+
+func TestBuildBatches_SplitsByMaxBytes(t *testing.T) {
+	events := []wire.TurnEvent{
+		turnEvent(0, "a"),
+		turnEvent(10, "b"),
+		turnEvent(20, "c"),
+	}
+	firstSize := mustEncodedLen(t, events[:1])
+
+	batches, err := BuildBatches(events, 100, firstSize+1)
+	if err != nil {
+		t.Fatalf("BuildBatches: %v", err)
+	}
+
+	if len(batches) != 3 {
+		t.Fatalf("len(batches) = %d, want 3", len(batches))
+	}
+	assertBatch(t, batches[0], []int{0})
+	assertBatch(t, batches[1], []int{1})
+	assertBatch(t, batches[2], []int{2})
+}
+
+func TestBuildBatches_RejectsNonPositiveCaps(t *testing.T) {
+	events := []wire.TurnEvent{turnEvent(0, "a")}
+
+	if _, err := BuildBatches(events, 0, 1024); err == nil {
+		t.Fatal("expected error for zero maxLines")
+	}
+	if _, err := BuildBatches(events, 1, 0); err == nil {
+		t.Fatal("expected error for zero maxBytes")
+	}
+}
+
+// A maxLines near the int maximum must not overflow the capacity hint.
+func TestBuildBatches_HugeMaxLinesDoesNotPanic(t *testing.T) {
+	events := []wire.TurnEvent{turnEvent(0, "a"), turnEvent(10, "b")}
+
+	batches, err := BuildBatches(events, math.MaxInt, 1024)
+	if err != nil {
+		t.Fatalf("BuildBatches: %v", err)
+	}
+
+	if len(batches) != 1 {
+		t.Fatalf("len(batches) = %d, want 1", len(batches))
+	}
+	assertBatch(t, batches[0], []int{0, 1})
+}
+
+func assertBatch(t *testing.T, batch Batch, wantIndexes []int) {
+	t.Helper()
+	if len(batch.Events) != len(wantIndexes) {
+		t.Fatalf("len(batch.Events) = %d, want %d", len(batch.Events), len(wantIndexes))
+	}
+	if len(batch.EventIndexes) != len(wantIndexes) {
+		t.Fatalf("len(batch.EventIndexes) = %d, want %d", len(batch.EventIndexes), len(wantIndexes))
+	}
+	for i, want := range wantIndexes {
+		if batch.EventIndexes[i] != want {
+			t.Errorf("EventIndexes[%d] = %d, want %d", i, batch.EventIndexes[i], want)
+		}
+	}
+}
+
+func mustEncodedLen(t *testing.T, events []wire.TurnEvent) int {
+	t.Helper()
+	data, err := EncodeNDJSON(events)
+	if err != nil {
+		t.Fatalf("EncodeNDJSON: %v", err)
+	}
+	return len(data)
+}
diff --git a/internal/collector/ingest/runner.go b/internal/collector/ingest/runner.go
new file mode 100644
index 0000000000000000000000000000000000000000..eb3f22135cadd039bc345e7799db334b306e42c7
--- /dev/null
+++ b/internal/collector/ingest/runner.go
@@ -0,0 +1,240 @@
+package ingest
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+	"math"
+	"time"
+
+	"go.bigb.es/auxilia/async"
+	"go.bigb.es/auxilia/culpa"
+	"sourcecraft.dev/bigbes/lethe/internal/collector/config"
+	"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
+	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
+	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
+)
+
+// replayOutboxLimit is the limit RunOnce passes to ReplayOutbox on each pass.
+const replayOutboxLimit = 100
+
+// RunOnce replays the safety-net outbox, scans one configured source, parses
+// new complete records from persisted offsets, posts batches, and advances
+// offsets only after server acceptance confirms the corresponding lines.
+//
+// A parser whose tool differs from src.Tool is rejected up front with
+// RUNNER_TOOL_MISMATCH. A failed outbox replay or a failed file is logged and
+// collected instead of aborting the pass, so one bad file does not block the
+// others; the collected errors are returned joined. A discovery failure or a
+// canceled context ends the pass immediately.
+func RunOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) error {
+	var errs []error
+
+	if p.Tool() != src.Tool {
+		return culpa.WithCode(
+			fmt.Errorf("parser tool %q does not match source tool %q", p.Tool(), src.Tool),
+			"RUNNER_TOOL_MISMATCH",
+		)
+	}
+
+	if err := ReplayOutbox(ctx, store, sender, replayOutboxLimit); err != nil {
+		if ctxErr := ctx.Err(); ctxErr != nil {
+			return errors.Join(ctxErr, culpa.Wrap(err, "replay outbox"))
+		}
+		slog.Warn("outbox replay failed", "tool", src.Tool, "error", err)
+		errs = append(errs, culpa.Wrap(err, "replay outbox"))
+	}
+
+	files, err := p.Discover(src.Path)
+	if err != nil {
+		return errors.Join(append(errs, culpa.Wrap(err, "discover source files"))...)
+	}
+
+	for _, file := range files {
+		if err := ctx.Err(); err != nil {
+			return errors.Join(append(errs, err)...)
+		}
+		if err := runFile(ctx, cfg, src, p, store, sender, file.Path); err != nil {
+			slog.Warn("source file ingest failed", "tool", src.Tool, "source_file", file.Path, "error", err)
+			errs = append(errs, err)
+		}
+	}
+
+	return errors.Join(errs...)
+}
+
+// RunDaemon starts one independent polling loop per configured source and
+// exits after the supplied context is canceled and all loops have stopped.
+//
+// With no configured sources it only blocks until ctx is canceled. A source
+// whose tool has no entry in parsers fails the call with RUNNER_MISSING_PARSER
+// before any loop is started.
+func RunDaemon(ctx context.Context, cfg config.Config, parsers map[string]parser.Parser, store *state.Store, sender *Sender) error {
+	if len(cfg.Sources) == 0 {
+		<-ctx.Done()
+		return nil
+	}
+
+	group := async.NewGroup[struct{}]()
+	for _, source := range cfg.Sources {
+		src := source
+		p, ok := parsers[src.Tool]
+		if !ok {
+			return culpa.WithCode(fmt.Errorf("missing parser for source tool %q", src.Tool), "RUNNER_MISSING_PARSER")
+		}
+		group.AddExecutor(func(loopCtx context.Context) (struct{}, error) {
+			pollSource(loopCtx, cfg, src, p, store, sender)
+			return struct{}{}, nil
+		})
+	}
+
+	if err := group.Start(ctx); err != nil {
+		return culpa.Wrap(err, "start source polling loops")
+	}
+	if err := group.All(context.Background()); err != nil {
+		return culpa.Wrap(err, "wait for source polling loops")
+	}
+	return nil
+}
+
+// pollSource runs RunOnce for src right away and then once per
+// src.PollInterval until ctx is canceled. Poll errors are logged, never
+// returned, so a failing pass does not end the loop.
+func pollSource(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) {
+	runAndLog := func() {
+		if ctx.Err() != nil {
+			return
+		}
+		if err := RunOnce(ctx, cfg, src, p, store, sender); err != nil && ctx.Err() == nil {
+			slog.Warn("source poll failed", "tool", src.Tool, "path", src.Path, "error", err)
+		}
+	}
+
+	runAndLog()
+	if src.PollInterval <= 0 {
+		// time.NewTicker panics on a non-positive interval, and a panic in this
+		// goroutine would take the whole daemon down. Keep the first pass and
+		// park until shutdown instead.
+		slog.Warn("source poll interval is not positive, polling disabled", "tool", src.Tool, "path", src.Path, "poll_interval", src.PollInterval)
+		<-ctx.Done()
+		return
+	}
+	ticker := time.NewTicker(src.PollInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			runAndLog()
+		}
+	}
+}
+
+// runFile ingests one source file: it parses from the persisted offset, stamps
+// the configured host on every event, splits the events into batches and posts
+// them in order. A failed POST queues that batch in the outbox and stops the
+// file without advancing the offset. A batch the server accepts only in part
+// also stops the file, because sending later batches could move the offset
+// past events that were not accepted.
+func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, path string) error {
+	offset, err := store.GetOffset(ctx, src.Tool, path)
+	if err != nil {
+		return culpa.Wrap(err, "get source offset")
+	}
+
+	events, newOffset, err := p.Parse(path, offset)
+	if err != nil {
+		return culpa.Wrap(err, "parse source file")
+	}
+	if len(events) == 0 {
+		return nil
+	}
+	stampHost(events, cfg.Host)
+
+	if src.BatchMaxBytes > math.MaxInt {
+		return culpa.WithCode(fmt.Errorf("batch max bytes %d exceeds int max", src.BatchMaxBytes), "RUNNER_BATCH_CAP")
+	}
+	batches, err := BuildBatches(events, src.BatchMaxLines, int(src.BatchMaxBytes))
+	if err != nil {
+		return culpa.Wrap(err, "build event batches")
+	}
+
+	for _, batch := range batches {
+		result, err := sender.PostBatch(ctx, batch.Events)
+		if err != nil {
+			if enqueueErr := enqueueBatch(ctx, store, src, cfg.Host, path, batch.Events, cfg.Outbox.MaxBytes); enqueueErr != nil {
+				return errors.Join(culpa.Wrap(err, "post batch"), enqueueErr)
+			}
+			return culpa.Wrap(err, "post batch")
+		}
+
+		advanced, err := persistAcceptedOffset(ctx, store, src.Tool, path, events, newOffset, batch, result.Accepted)
+		if err != nil {
+			return err
+		}
+		if advanced < len(batch.Events) {
+			// Not an error for the caller, but without a log line a file the
+			// server keeps rejecting would stall with no trace.
+			slog.Warn("server accepted fewer events than sent", "tool", src.Tool, "source_file", path, "accepted", advanced, "sent", len(batch.Events))
+			return nil
+		}
+	}
+
+	return nil
+}
+
+// stampHost overwrites Host on every event in place.
+func stampHost(events []wire.TurnEvent, host string) {
+	for i := range events {
+		events[i].Host = host
+	}
+}
+
+// enqueueBatch encodes events as NDJSON, stores them as one outbox item and
+// then enforces the outbox size limit.
+func enqueueBatch(ctx context.Context, store *state.Store, src config.SourceConfig, host string, path string, events []wire.TurnEvent, maxOutboxBytes int64) error {
+	payload, err := EncodeNDJSON(events)
+	if err != nil {
+		return culpa.Wrap(err, "encode failed batch")
+	}
+	if err := store.Enqueue(ctx, state.OutboxItem{Tool: src.Tool, Host: host, SourceFile: path, Payload: payload}); err != nil {
+		return culpa.Wrap(err, "enqueue failed batch")
+	}
+	if err := EnforceOutboxLimit(ctx, store, maxOutboxBytes); err != nil {
+		return culpa.Wrap(err, "enforce outbox limit")
+	}
+	return nil
+}
+
+// persistAcceptedOffset saves the offset that follows the accepted prefix of
+// batch and returns how many events it advanced past. The saved offset is the
+// Seq of the first event that was not accepted, or newOffset when the accepted
+// events reach the end of allEvents. Nothing is saved when accepted is zero,
+// and a count outside [0, len(batch.Events)] fails with
+// RUNNER_ACCEPTED_INVALID.
+func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, accepted int) (int, error) {
+	if accepted < 0 || accepted > len(batch.Events) {
+		return 0, culpa.WithCode(
+			fmt.Errorf("server accepted %d events for batch length %d", accepted, len(batch.Events)),
+			"RUNNER_ACCEPTED_INVALID",
+		)
+	}
+	if accepted == 0 {
+		return 0, nil
+	}
+
+	// Default to the end of the parsed range; fall back to the Seq of the first
+	// unaccepted event when the server stopped short of it.
+	offset := newOffset
+	if next := batch.EventIndexes[0] + accepted; next < len(allEvents) {
+		offset = allEvents[next].Seq
+	}
+
+	if err := store.SaveOffset(ctx, tool, path, offset); err != nil {
+		return 0, culpa.Wrap(err, "save accepted offset")
+	}
+	return accepted, nil
+}
diff --git a/internal/collector/ingest/runner_helpers_test.go b/internal/collector/ingest/runner_helpers_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..d6c39742dcef4c7414046c2613ac822a464356b2
--- /dev/null
+++ b/internal/collector/ingest/runner_helpers_test.go
@@ -0,0 +1,94 @@
+package ingest
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"sourcecraft.dev/bigbes/lethe/internal/collector/config"
+	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
+	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
+)
+
+var (
+	errParseBoom = errors.New("parse boom")
+	errPostBoom  = errors.New("post boom")
+)
+
+func turnEvent(seq int64, content string) wire.TurnEvent {
+	return wire.TurnEvent{
+		Tool:      "claude-code",
+		Host:      "test-host",
+		SessionID: "session-1",
+		TurnID:    content,
+		Seq:       seq,
+		Role:      "user",
+		Timestamp: 123,
+		Content:   content,
+		SessionMeta: wire.SessionMeta{
+			SourceFile: "source.jsonl",
+		},
+	}
+}
+
+func resultJSON(result Result) string {
+	data, err := json.Marshal(result)
+	if err != nil {
+		panic(err)
+	}
+	return string(data)
+}
+
+func testConfig() config.Config {
+	return config.Config{
+		Host:      "test-host",
+		ServerURL: "http://127.0.0.1",
+		Outbox:    config.OutboxConfig{MaxBytes: 1024 * 1024},
+	}
+}
+
+func testSource(t *testing.T, tool string, maxLines int, maxBytes int64) config.SourceConfig {
+	t.Helper()
+	return config.SourceConfig{
+		Tool:          tool,
+		Path:          t.TempDir(),
+		PollInterval:  time.Millisecond,
+		BatchMaxLines: maxLines,
+		BatchMaxBytes: maxBytes,
+	}
+}
+
+func openTestStore(t *testing.T, ctx context.Context) *state.Store {
+	t.Helper()
+	store, err := state.Open(ctx, filepath.Join(t.TempDir(), "state.db"))
+	if err != nil {
+		t.Fatalf("Open: %v", err)
+	}
+	t.Cleanup(func() { _ = store.Close() })
+	return store
+}
+
+func assertOffset(t *testing.T, ctx context.Context, store *state.Store, tool, file string, want int64) {
+	t.Helper()
+	got, err := store.GetOffset(ctx, tool, file)
+	if err != nil {
+		t.Fatalf("GetOffset: %v", err)
+	}
+	if got != want {
+		t.Errorf("offset = %d, want %d", got, want)
+	}
+}
+
+func assertOutboxCount(t *testing.T, ctx context.Context, store *state.Store, want int64) {
+	t.Helper()
+	st, err := store.Stats(ctx)
+	if err != nil {
+		t.Fatalf("Stats: %v", err)
+	}
+	if st.OutboxCount != want {
+		t.Errorf("OutboxCount = %d, want %d", st.OutboxCount, want)
+	}
+}
diff --git a/internal/collector/ingest/runner_test.go b/internal/collector/ingest/runner_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..6c610b9b8a5201aa8f4f0a94b53210ba11343da2
--- /dev/null
+++ b/internal/collector/ingest/runner_test.go
@@ -0,0 +1,237 @@
+package ingest
+
+import (
+	"context"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"path/filepath"
+	"sync"
+	"testing"
+	"time"
+
+	"sourcecraft.dev/bigbes/lethe/internal/collector/config"
+	"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
+	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
+)
+
+func TestRunOnce_AllAcceptedPersistsNewOffsetAndDoesNotEnqueueOutbox(t *testing.T) {
+	ctx := context.Background()
+	store := openTestStore(t, ctx)
+	source := testSource(t, "claude-code", 10, 4096)
+	file := filepath.Join(source.Path, "one.jsonl")
+	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
+		file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
+	})
+	sender := acceptingSender(t, []Result{{Accepted: 2}})
+
+	err := RunOnce(ctx, testConfig(), source, p, store, sender)
+	if err != nil {
+		t.Fatalf("RunOnce: %v", err)
+	}
+
+	assertOffset(t, ctx, store, "claude-code", file, 300)
+	assertOutboxCount(t, ctx, store, 0)
+}
+
+func TestRunOnce_PartialAcceptedPersistsFirstUnacceptedEventOffset(t *testing.T) {
+	ctx := context.Background()
+	store := openTestStore(t, ctx)
+	source := testSource(t, "claude-code", 10, 4096)
+	file := filepath.Join(source.Path, "one.jsonl")
+	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
+		file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")}, newOffset: 400},
+	})
+	sender := acceptingSender(t, []Result{{Accepted: 2, Errors: []LineError{{Line: 3, Err: "bad"}}}})
+
+	err := RunOnce(ctx, testConfig(), source, p, store, sender)
+	if err != nil {
+		t.Fatalf("RunOnce: %v", err)
+	}
+
+	assertOffset(t, ctx, store, "claude-code", file, 300)
+	assertOutboxCount(t, ctx, store, 0)
+}
+
+func TestRunOnce_ZeroAcceptedDoesNotPersistOffset(t *testing.T) {
+	ctx := context.Background()
+	store := openTestStore(t, ctx)
+	source := testSource(t, "claude-code", 10, 4096)
+	file := filepath.Join(source.Path, "one.jsonl")
+	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
+		file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
+	})
+	sender := acceptingSender(t, []Result{{Accepted: 0, Errors: []LineError{{Line: 1, Err: "bad"}}}})
+
+	err := RunOnce(ctx, testConfig(), source, p, store, sender)
+	if err != nil {
+		t.Fatalf("RunOnce: %v", err)
+	}
+
+	assertOffset(t, ctx, store, "claude-code", file, 0)
+	assertOutboxCount(t, ctx, store, 0)
+}
+
+func TestRunOnce_HardErrorEnqueuesUnsentBatchAndDoesNotAdvanceOffset(t *testing.T) {
+	ctx := context.Background()
+	store := openTestStore(t, ctx)
+	source := testSource(t, "claude-code", 10, 4096)
+	file := filepath.Join(source.Path, "one.jsonl")
+	events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}
+	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
+		file: {events: events, newOffset: 300},
+	})
+	sender := failingSender(t)
+
+	err := RunOnce(ctx, testConfig(), source, p, store, sender)
+	if err == nil {
+		t.Fatal("expected RunOnce error on hard POST failure")
+	}
+
+	assertOffset(t, ctx, store, "claude-code", file, 0)
+	rows, err := store.Oldest(ctx, 10)
+	if err != nil {
+		t.Fatalf("Oldest: %v", err)
+	}
+	if len(rows) != 1 {
+		t.Fatalf("len(rows) = %d, want 1", len(rows))
+	}
+	wantPayload, err := EncodeNDJSON(events)
+	if err != nil {
+		t.Fatalf("EncodeNDJSON: %v", err)
+	}
+	if string(rows[0].Payload) != string(wantPayload) {
+		t.Errorf("payload = %q, want %q", string(rows[0].Payload), string(wantPayload))
+	}
+}
+
+func TestRunOnce_ContinuesToOtherFilesWhenOneFileFails(t *testing.T) {
+	ctx := context.Background()
+	store := openTestStore(t, ctx)
+	source := testSource(t, "claude-code", 10, 4096)
+	badFile := filepath.Join(source.Path, "bad.jsonl")
+	goodFile := filepath.Join(source.Path, "good.jsonl")
+	p := newFakeParser("claude-code", []parser.SourceFile{{Path: badFile}, {Path: goodFile}}, map[string]parseResult{
+		badFile:  {err: errParseBoom},
+		goodFile: {events: []wire.TurnEvent{turnEvent(100, "ok")}, newOffset: 200},
+	})
+	sender := acceptingSender(t, []Result{{Accepted: 1}})
+
+	err := RunOnce(ctx, testConfig(), source, p, store, sender)
+	if err == nil {
+		t.Fatal("expected RunOnce to return collected parse error")
+	}
+
+	assertOffset(t, ctx, store, "claude-code", badFile, 0)
+	assertOffset(t, ctx, store, "claude-code", goodFile, 200)
+}
+
+func TestRunDaemon_ExitsWhenContextCanceled(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	store := openTestStore(t, context.Background())
+	source := testSource(t, "claude-code", 10, 4096)
+	source.PollInterval = time.Hour
+	p := newFakeParser("claude-code", nil, nil)
+	sender := acceptingSender(t, nil)
+	cfg := testConfig()
+	cfg.Sources = []config.SourceConfig{source}
+
+	done := make(chan error, 1)
+	go func() {
+		done <- RunDaemon(ctx, cfg, map[string]parser.Parser{"claude-code": p}, store, sender)
+	}()
+
+	cancel()
+	select {
+	case err := <-done:
+		if err != nil {
+			t.Fatalf("RunDaemon: %v", err)
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatal("RunDaemon did not exit after context cancellation")
+	}
+}
+
+// time.NewTicker panics on a non-positive interval, so pollSource must not
+// reach it with one.
+func TestPollSource_NonPositiveIntervalWaitsForCancel(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	store := openTestStore(t, context.Background())
+	source := testSource(t, "claude-code", 10, 4096)
+	source.PollInterval = 0
+	p := newFakeParser("claude-code", nil, nil)
+	sender := acceptingSender(t, nil)
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		pollSource(ctx, testConfig(), source, p, store, sender)
+	}()
+
+	cancel()
+	select {
+	case <-done:
+	case <-time.After(2 * time.Second):
+		t.Fatal("pollSource did not exit after context cancellation")
+	}
+}
+
+type parseResult struct {
+	events    []wire.TurnEvent
+	newOffset int64
+	err       error
+}
+
+type fakeParser struct {
+	tool    string
+	files   []parser.SourceFile
+	results map[string]parseResult
+}
+
+func newFakeParser(tool string, files []parser.SourceFile, results map[string]parseResult) *fakeParser {
+	return &fakeParser{tool: tool, files: files, results: results}
+}
+
+func (p *fakeParser) Tool() string { return p.tool }
+
+func (p *fakeParser) Discover(root string) ([]parser.SourceFile, error) { return p.files, nil }
+
+func (p *fakeParser) Parse(path string, since int64) ([]wire.TurnEvent, int64, error) {
+	res := p.results[path]
+	if res.err != nil {
+		return nil, since, res.err
+	}
+	return res.events, res.newOffset, nil
+}
+
+type roundTripFunc func(*http.Request) (*http.Response, error)
+
+func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }
+
+func acceptingSender(t *testing.T, results []Result) *Sender {
+	t.Helper()
+	var mu sync.Mutex
+	idx := 0
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		_, _ = io.ReadAll(r.Body)
+		mu.Lock()
+		defer mu.Unlock()
+		result := Result{Accepted: 0}
+		if idx < len(results) {
+			result = results[idx]
+			idx++
+		}
+		w.Header().Set("Content-Type", "application/json")
+		_, _ = w.Write([]byte(resultJSON(result)))
+	}))
+	t.Cleanup(ts.Close)
+	return NewSender(ts.URL, ts.Client())
+}
+
+func failingSender(t *testing.T) *Sender {
+	t.Helper()
+	client := &http.Client{Transport: roundTripFunc(func(*http.Request) (*http.Response, error) {
+		return nil, errPostBoom
+	})}
+	return NewSender("http://127.0.0.1", client)
+}