From f1673181bc8cd0298403408f20740423a43b84a0 Mon Sep 17 00:00:00 2001 From: Eugene Blikh Date: Sun, 3 May 2026 20:20:25 +0300 Subject: [PATCH] collector: fix daemon drain and backfill start --- cmd/lethe-collector/main.go | 2 +- docs/tasks/lethe-collector-claude-code.md | 2 + internal/collector/ingest/runner.go | 50 +++++++- .../collector/ingest/runner_helpers_test.go | 1 + internal/collector/ingest/runner_test.go | 116 ++++++++++++++++++ 5 files changed, 168 insertions(+), 3 deletions(-) diff --git a/cmd/lethe-collector/main.go b/cmd/lethe-collector/main.go index c52154032eabfc90332b2987ef8ec5d31383d0de..a814601a2c3cbb5dd3494563a7117a75b56cd371 100644 --- a/cmd/lethe-collector/main.go +++ b/cmd/lethe-collector/main.go @@ -125,7 +125,7 @@ func newBackfillCmd(configPath *string) *cobra.Command { continue } matched++ - if err := ingest.RunOnce(ctx, *cfg, src, p, store, sender); err != nil { + if err := ingest.RunBackfillOnce(ctx, *cfg, src, p, store, sender); err != nil { errs = append(errs, fmt.Errorf("backfill %s (%s): %w", src.Tool, src.Path, err)) } } diff --git a/docs/tasks/lethe-collector-claude-code.md b/docs/tasks/lethe-collector-claude-code.md index 757691c4206d864e3212b137ad72b2acfa53c468..92d28e2c8c27ea89bce4020b05355515bf89b8bc 100644 --- a/docs/tasks/lethe-collector-claude-code.md +++ b/docs/tasks/lethe-collector-claude-code.md @@ -160,6 +160,8 @@ Greenfield collector. The only interface contract this task can break is the wir - udesign: `parentUuid` chaining of resumed Claude Code sessions deferred — every `.jsonl` is one session in this task. Surfacing chains is a UI concern for later. - udesign: synthesized `turn_id` uses `sha256(session_id || seq || timestamp || content[:64])[:16]` — `content[:64]` is enough to disambiguate within a single timestamp; full-content hash would balloon for large turns. - ureview: fixed partial-accept offset handling — skipped server-rejected rows so one bad turn cannot stall a source file. +- ureview: bounded daemon drain uses `http.timeout` — no separate `shutdown_grace` config exists for the collector. +- ureview: backfill offset-0 semantics are implemented as a dedicated `RunBackfillOnce` function rather than a mode flag on `RunOnce` — explicit call sites are safer than a boolean parameter that could be misused in daemon loops. TDD: yes (reason: parser behavior on golden fixture `.jsonl` files, offset persistence/resume semantics, outbox replay, and idempotent re-POST behavior are exactly the deterministic regression-prone surfaces TDD is good for. CLI scaffolding and systemd unit are exempt.) diff --git a/internal/collector/ingest/runner.go b/internal/collector/ingest/runner.go index 65577cbde167e39ee762106c0468896dc5afe547..d8c363dab167430c512f3047375bdd2bf2e1d930 100644 --- a/internal/collector/ingest/runner.go +++ b/internal/collector/ingest/runner.go @@ -57,6 +57,45 @@ func RunOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p return errors.Join(errs...) } +// RunBackfillOnce is like RunOnce but starts every matched file at offset 0 +// instead of the persisted offset. Accepted progress is still saved, so an +// interrupted backfill can be resumed via RunOnce. +func RunBackfillOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) error { + var errs []error + + if p.Tool() != src.Tool { + return culpa.WithCode( + fmt.Errorf("parser tool %q does not match source tool %q", p.Tool(), src.Tool), + "RUNNER_TOOL_MISMATCH", + ) + } + + if err := ReplayOutbox(ctx, store, sender, replayOutboxLimit); err != nil { + if ctxErr := ctx.Err(); ctxErr != nil { + return errors.Join(ctxErr, culpa.Wrap(err, "replay outbox")) + } + slog.Warn("outbox replay failed", "tool", src.Tool, "error", err) + errs = append(errs, culpa.Wrap(err, "replay outbox")) + } + + files, err := p.Discover(src.Path) + if err != nil { + return errors.Join(append(errs, culpa.Wrap(err, "discover source files"))...) + } + + for _, file := range files { + if err := ctx.Err(); err != nil { + return errors.Join(append(errs, err)...) + } + if err := runFileFromOffset(ctx, cfg, src, p, store, sender, file.Path, 0); err != nil { + slog.Warn("source file ingest failed", "tool", src.Tool, "source_file", file.Path, "error", err) + errs = append(errs, err) + } + } + + return errors.Join(errs...) +} + // RunDaemon starts one independent polling loop per configured source and // exits after the supplied context is canceled and all loops have stopped. func RunDaemon(ctx context.Context, cfg config.Config, parsers map[string]parser.Parser, store *state.Store, sender *Sender) error { @@ -92,7 +131,11 @@ func pollSource(ctx context.Context, cfg config.Config, src config.SourceConfig, if ctx.Err() != nil { return } - if err := RunOnce(ctx, cfg, src, p, store, sender); err != nil && ctx.Err() == nil { + // Use a bounded context independent of daemon cancellation so an + // in-flight RunOnce is not aborted by SIGTERM. + runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), cfg.HTTP.Timeout) + defer cancel() + if err := RunOnce(runCtx, cfg, src, p, store, sender); err != nil && runCtx.Err() == nil { slog.Warn("source poll failed", "tool", src.Tool, "path", src.Path, "error", err) } } @@ -116,8 +159,11 @@ func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p if err != nil { return culpa.Wrap(err, "get source offset") } + return runFileFromOffset(ctx, cfg, src, p, store, sender, path, offset) +} - events, newOffset, err := p.Parse(path, offset) +func runFileFromOffset(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, path string, startOffset int64) error { + events, newOffset, err := p.Parse(path, startOffset) if err != nil { return culpa.Wrap(err, "parse source file") } diff --git a/internal/collector/ingest/runner_helpers_test.go b/internal/collector/ingest/runner_helpers_test.go index d6c39742dcef4c7414046c2613ac822a464356b2..8815186032f25d17a2181e25bd73fbfb21c27bb5 100644 --- a/internal/collector/ingest/runner_helpers_test.go +++ b/internal/collector/ingest/runner_helpers_test.go @@ -47,6 +47,7 @@ func testConfig() config.Config { Host: "test-host", ServerURL: "http://127.0.0.1", Outbox: config.OutboxConfig{MaxBytes: 1024 * 1024}, + HTTP: config.HTTPConfig{Timeout: 30 * time.Second}, } } diff --git a/internal/collector/ingest/runner_test.go b/internal/collector/ingest/runner_test.go index 2952d57aa05d3af8ad05d2290030f933ef8110ea..1f4cc8204471dccd68c9ac9bfe33c68c4499ccb2 100644 --- a/internal/collector/ingest/runner_test.go +++ b/internal/collector/ingest/runner_test.go @@ -326,6 +326,122 @@ func TestRunDaemon_ExitsWhenContextCanceled(t *testing.T) { } } +func TestRunDaemon_CancelWaitsForActiveRunOnce(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + store := openTestStore(t, context.Background()) + source := testSource(t, "claude-code", 10, 4096) + source.PollInterval = time.Hour + file := filepath.Join(source.Path, "one.jsonl") + + postStarted := make(chan struct{}) + postContinue := make(chan struct{}) + var postOnce sync.Once + + p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{ + file: {events: []wire.TurnEvent{turnEvent(100, "a")}, newOffset: 200}, + }) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + postOnce.Do(func() { close(postStarted) }) + <-postContinue + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(resultJSON(Result{Accepted: 1}))) + })) + defer ts.Close() + sender := NewSender(ts.URL, ts.Client()) + + cfg := testConfig() + cfg.Sources = []config.SourceConfig{source} + cfg.HTTP.Timeout = 5 * time.Second + + done := make(chan error, 1) + go func() { + done <- RunDaemon(ctx, cfg, map[string]parser.Parser{"claude-code": p}, store, sender) + }() + + select { + case <-postStarted: + case <-time.After(2 * time.Second): + t.Fatal("POST did not start") + } + cancel() + + select { + case <-done: + t.Fatal("RunDaemon should wait for active RunOnce, not exit immediately") + case <-time.After(100 * time.Millisecond): + // expected + } + + close(postContinue) + + select { + case err := <-done: + if err != nil { + t.Fatalf("RunDaemon: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("RunDaemon did not exit after active run completed") + } + + assertOffset(t, context.Background(), store, "claude-code", file, 200) +} + +func TestRunBackfillOnce_IgnoresSavedOffsetAndPersistsProgress(t *testing.T) { + ctx := context.Background() + store := openTestStore(t, ctx) + source := testSource(t, "claude-code", 10, 4096) + file := filepath.Join(source.Path, "one.jsonl") + + if err := store.SaveOffset(ctx, "claude-code", file, 150); err != nil { + t.Fatalf("SaveOffset: %v", err) + } + + p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{ + file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300}, + }) + sender := acceptingSender(t, []Result{{Accepted: 2}}) + + err := RunBackfillOnce(ctx, testConfig(), source, p, store, sender) + if err != nil { + t.Fatalf("RunBackfillOnce: %v", err) + } + + assertOffset(t, ctx, store, "claude-code", file, 300) + assertOutboxCount(t, ctx, store, 0) +} + +func TestRunBackfillOnce_InterruptedProgressIsResumable(t *testing.T) { + ctx := context.Background() + store := openTestStore(t, ctx) + source := testSource(t, "claude-code", 10, 4096) + file := filepath.Join(source.Path, "one.jsonl") + + if err := store.SaveOffset(ctx, "claude-code", file, 150); err != nil { + t.Fatalf("SaveOffset: %v", err) + } + + events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")} + p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{ + file: {events: events, newOffset: 300}, + }) + sender := acceptingSender(t, []Result{{Accepted: 1}}) + + err := RunBackfillOnce(ctx, testConfig(), source, p, store, sender) + if err != nil { + t.Fatalf("RunBackfillOnce: %v", err) + } + + assertOffset(t, ctx, store, "claude-code", file, 200) + + sender2 := acceptingSender(t, []Result{{Accepted: 1}}) + err = RunOnce(ctx, testConfig(), source, p, store, sender2) + if err != nil { + t.Fatalf("RunOnce resume: %v", err) + } + assertOffset(t, ctx, store, "claude-code", file, 300) +} + type parseResult struct { events []wire.TurnEvent newOffset int64