M cmd/lethe-collector/main.go => cmd/lethe-collector/main.go +1 -1
@@ 125,7 125,7 @@ func newBackfillCmd(configPath *string) *cobra.Command {
continue
}
matched++
- if err := ingest.RunOnce(ctx, *cfg, src, p, store, sender); err != nil {
+ if err := ingest.RunBackfillOnce(ctx, *cfg, src, p, store, sender); err != nil {
errs = append(errs, fmt.Errorf("backfill %s (%s): %w", src.Tool, src.Path, err))
}
}
M docs/tasks/lethe-collector-claude-code.md => docs/tasks/lethe-collector-claude-code.md +2 -0
@@ 160,6 160,8 @@ Greenfield collector. The only interface contract this task can break is the wir
- udesign: `parentUuid` chaining of resumed Claude Code sessions deferred — every `.jsonl` is one session in this task. Surfacing chains is a UI concern for later.
- udesign: synthesized `turn_id` uses `sha256(session_id || seq || timestamp || content[:64])[:16]` — `content[:64]` is enough to disambiguate within a single timestamp; full-content hash would balloon for large turns.
- ureview: fixed partial-accept offset handling — skipped server-rejected rows so one bad turn cannot stall a source file.
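+- ureview: daemon shutdown drain is bounded by `http.timeout` — each poll runs under its own `http.timeout` deadline, detached from daemon cancellation, because the collector has no separate `shutdown_grace` config.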
+- ureview: backfill offset-0 semantics are implemented as a dedicated `RunBackfillOnce` function rather than a mode flag on `RunOnce` — explicit call sites are safer than a boolean parameter that could be misused in daemon loops.
TDD: yes (reason: parser behavior on golden fixture `.jsonl` files, offset persistence/resume semantics, outbox replay, and idempotent re-POST behavior are exactly the deterministic regression-prone surfaces TDD is good for. CLI scaffolding and systemd unit are exempt.)
M internal/collector/ingest/runner.go => internal/collector/ingest/runner.go +48 -2
@@ 57,6 57,45 @@ func RunOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p
return errors.Join(errs...)
}
+// RunBackfillOnce performs one ingest pass over src in which every file the
+// parser discovers is parsed from offset 0 rather than from the offset held
+// in the store. Progress accepted during the pass is still persisted, so a
+// backfill that stops early can be continued by a later RunOnce.
+//
+// The outbox is replayed before discovery. A replay failure or a per-file
+// failure is logged and collected instead of aborting the pass; everything
+// collected is returned as one joined error. A parser/source tool mismatch,
+// a discovery failure, or a canceled context ends the pass immediately.
+func RunBackfillOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) error {
+	// Refuse to run a parser against a source declared for another tool.
+	if p.Tool() != src.Tool {
+		mismatch := fmt.Errorf("parser tool %q does not match source tool %q", p.Tool(), src.Tool)
+		return culpa.WithCode(mismatch, "RUNNER_TOOL_MISMATCH")
+	}
+
+	var failures []error
+
+	replayErr := ReplayOutbox(ctx, store, sender, replayOutboxLimit)
+	if replayErr != nil {
+		// If the context is already done there is no point continuing;
+		// report the cancellation alongside the replay failure.
+		if cancelErr := ctx.Err(); cancelErr != nil {
+			return errors.Join(cancelErr, culpa.Wrap(replayErr, "replay outbox"))
+		}
+		slog.Warn("outbox replay failed", "tool", src.Tool, "error", replayErr)
+		failures = append(failures, culpa.Wrap(replayErr, "replay outbox"))
+	}
+
+	discovered, discoverErr := p.Discover(src.Path)
+	if discoverErr != nil {
+		failures = append(failures, culpa.Wrap(discoverErr, "discover source files"))
+		return errors.Join(failures...)
+	}
+
+	for i := range discovered {
+		// Stop before starting another file once the context is done.
+		if cancelErr := ctx.Err(); cancelErr != nil {
+			failures = append(failures, cancelErr)
+			return errors.Join(failures...)
+		}
+
+		sourcePath := discovered[i].Path
+		// Offset 0 is what distinguishes a backfill from a regular pass.
+		ingestErr := runFileFromOffset(ctx, cfg, src, p, store, sender, sourcePath, 0)
+		if ingestErr == nil {
+			continue
+		}
+		slog.Warn("source file ingest failed", "tool", src.Tool, "source_file", sourcePath, "error", ingestErr)
+		failures = append(failures, ingestErr)
+	}
+
+	return errors.Join(failures...)
+}
+
// RunDaemon starts one independent polling loop per configured source and
// exits after the supplied context is canceled and all loops have stopped.
func RunDaemon(ctx context.Context, cfg config.Config, parsers map[string]parser.Parser, store *state.Store, sender *Sender) error {
@@ 92,7 131,11 @@ func pollSource(ctx context.Context, cfg config.Config, src config.SourceConfig,
if ctx.Err() != nil {
return
}
- if err := RunOnce(ctx, cfg, src, p, store, sender); err != nil && ctx.Err() == nil {
+ // Use a bounded context independent of daemon cancellation so an
+ // in-flight RunOnce is not aborted by SIGTERM.
+ runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), cfg.HTTP.Timeout)
+ defer cancel()
+ if err := RunOnce(runCtx, cfg, src, p, store, sender); err != nil && runCtx.Err() == nil {
slog.Warn("source poll failed", "tool", src.Tool, "path", src.Path, "error", err)
}
}
@@ 116,8 159,11 @@ func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p
if err != nil {
return culpa.Wrap(err, "get source offset")
}
+ return runFileFromOffset(ctx, cfg, src, p, store, sender, path, offset)
+}
- events, newOffset, err := p.Parse(path, offset)
+func runFileFromOffset(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, path string, startOffset int64) error {
+ events, newOffset, err := p.Parse(path, startOffset)
if err != nil {
return culpa.Wrap(err, "parse source file")
}
M internal/collector/ingest/runner_helpers_test.go => internal/collector/ingest/runner_helpers_test.go +1 -0
@@ 47,6 47,7 @@ func testConfig() config.Config {
Host: "test-host",
ServerURL: "http://127.0.0.1",
Outbox: config.OutboxConfig{MaxBytes: 1024 * 1024},
+ HTTP: config.HTTPConfig{Timeout: 30 * time.Second},
}
}
M internal/collector/ingest/runner_test.go => internal/collector/ingest/runner_test.go +116 -0
@@ 326,6 326,122 @@ func TestRunDaemon_ExitsWhenContextCanceled(t *testing.T) {
}
}
+// TestRunDaemon_CancelWaitsForActiveRunOnce verifies that canceling the daemon
+// context while a POST is in flight does not make RunDaemon return until that
+// POST has completed, and that the file's offset is stored afterwards.
+func TestRunDaemon_CancelWaitsForActiveRunOnce(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	// Covers early t.Fatal exits so the daemon goroutine is always told to
+	// stop; calling cancel a second time is harmless.
+	defer cancel()
+
+	store := openTestStore(t, context.Background())
+	source := testSource(t, "claude-code", 10, 4096)
+	// A long interval keeps the daemon from starting a second poll.
+	source.PollInterval = time.Hour
+	file := filepath.Join(source.Path, "one.jsonl")
+
+	postStarted := make(chan struct{})
+	postContinue := make(chan struct{})
+	var postOnce, releaseOnce sync.Once
+	// release unblocks the handler; it is safe to call more than once.
+	release := func() { releaseOnce.Do(func() { close(postContinue) }) }
+
+	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
+		file: {events: []wire.TurnEvent{turnEvent(100, "a")}, newOffset: 200},
+	})
+
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// Signal the first request, then hold it open until released.
+		postOnce.Do(func() { close(postStarted) })
+		<-postContinue
+		w.Header().Set("Content-Type", "application/json")
+		_, _ = w.Write([]byte(resultJSON(Result{Accepted: 1})))
+	}))
+	defer ts.Close()
+	// Deferred after ts.Close so it runs first: Server.Close blocks until
+	// outstanding requests finish, so a still-blocked handler would turn a
+	// test failure into a hang.
+	defer release()
+	sender := NewSender(ts.URL, ts.Client())
+
+	cfg := testConfig()
+	cfg.Sources = []config.SourceConfig{source}
+	cfg.HTTP.Timeout = 5 * time.Second
+
+	done := make(chan error, 1)
+	go func() {
+		done <- RunDaemon(ctx, cfg, map[string]parser.Parser{"claude-code": p}, store, sender)
+	}()
+
+	select {
+	case <-postStarted:
+	case <-time.After(2 * time.Second):
+		t.Fatal("POST did not start")
+	}
+	cancel()
+
+	// While the handler is still blocked, RunDaemon must not have returned.
+	select {
+	case <-done:
+		t.Fatal("RunDaemon should wait for active RunOnce, not exit immediately")
+	case <-time.After(100 * time.Millisecond):
+		// expected
+	}
+
+	release()
+
+	select {
+	case err := <-done:
+		if err != nil {
+			t.Fatalf("RunDaemon: %v", err)
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatal("RunDaemon did not exit after active run completed")
+	}
+
+	assertOffset(t, context.Background(), store, "claude-code", file, 200)
+}
+
+// TestRunBackfillOnce_IgnoresSavedOffsetAndPersistsProgress seeds a stored
+// offset of 150, runs a backfill in which both parsed events are accepted, and
+// expects the stored offset to end at the parser's newOffset (300) with
+// nothing left in the outbox.
+func TestRunBackfillOnce_IgnoresSavedOffsetAndPersistsProgress(t *testing.T) {
+	const tool = "claude-code"
+
+	ctx := context.Background()
+	store := openTestStore(t, ctx)
+	source := testSource(t, tool, 10, 4096)
+	file := filepath.Join(source.Path, "one.jsonl")
+
+	// Pre-existing progress that the backfill is expected to disregard.
+	if saveErr := store.SaveOffset(ctx, tool, file, 150); saveErr != nil {
+		t.Fatalf("SaveOffset: %v", saveErr)
+	}
+
+	parsed := parseResult{
+		events:    []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")},
+		newOffset: 300,
+	}
+	p := newFakeParser(tool, []parser.SourceFile{{Path: file}}, map[string]parseResult{file: parsed})
+	sender := acceptingSender(t, []Result{{Accepted: 2}})
+
+	if err := RunBackfillOnce(ctx, testConfig(), source, p, store, sender); err != nil {
+		t.Fatalf("RunBackfillOnce: %v", err)
+	}
+
+	assertOffset(t, ctx, store, tool, file, 300)
+	assertOutboxCount(t, ctx, store, 0)
+}
+
+// TestRunBackfillOnce_InterruptedProgressIsResumable runs a backfill whose
+// sender accepts only one of two events and expects the stored offset to be
+// 200, then runs a regular RunOnce with a fresh sender and expects the offset
+// to advance to 300.
+func TestRunBackfillOnce_InterruptedProgressIsResumable(t *testing.T) {
+	const tool = "claude-code"
+
+	ctx := context.Background()
+	store := openTestStore(t, ctx)
+	source := testSource(t, tool, 10, 4096)
+	file := filepath.Join(source.Path, "one.jsonl")
+
+	// Stored progress from before the backfill starts.
+	if saveErr := store.SaveOffset(ctx, tool, file, 150); saveErr != nil {
+		t.Fatalf("SaveOffset: %v", saveErr)
+	}
+
+	parsed := parseResult{
+		events:    []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")},
+		newOffset: 300,
+	}
+	p := newFakeParser(tool, []parser.SourceFile{{Path: file}}, map[string]parseResult{file: parsed})
+
+	// First pass: the backfill, with only one event accepted.
+	backfillSender := acceptingSender(t, []Result{{Accepted: 1}})
+	if err := RunBackfillOnce(ctx, testConfig(), source, p, store, backfillSender); err != nil {
+		t.Fatalf("RunBackfillOnce: %v", err)
+	}
+	assertOffset(t, ctx, store, tool, file, 200)
+
+	// Second pass: a regular run picks up from the persisted offset.
+	resumeSender := acceptingSender(t, []Result{{Accepted: 1}})
+	if err := RunOnce(ctx, testConfig(), source, p, store, resumeSender); err != nil {
+		t.Fatalf("RunOnce resume: %v", err)
+	}
+	assertOffset(t, ctx, store, tool, file, 300)
+}
+
type parseResult struct {
events []wire.TurnEvent
newOffset int64