f3118d95 — Eugene Blikh collector: preserve valid rows around ingest errors 24 days ago

#lethe-collector-claude-code

  • Status: executing
  • Branch: task/lethe-collector-claude-code
  • Worktree: /Users/blikh/data/home/lethe/.worktrees/lethe-collector-claude-code
  • Mode: hands-off
  • Module: sourcecraft.dev/bigbes/lethe
  • Depends on: lethe-server.md (#1), which locks the wire format and ingest semantics this task targets.
  • Sibling tasks (deferred): lethe-search-and-opencode.md (#3) and, when the time comes, per-tool follow-ups (lethe-collector-crush.md, etc.).

#Design

#Purpose

Stand up the lethe-collector binary and the first parser (Claude Code). End state: a systemd user service on the laptop watches ~/.claude/projects/, parses new turns, ships them to the running lethe server over Tailscale, and survives offline periods via a local outbox. Re-runs are safe and resumable.

A successful end state for this task: the collector has been running on the laptop for an hour against real Claude Code activity, and the server's HTML timeline shows my actual recent sessions, with turns matching what's in the .jsonl files.

#Scope

In:

  • Single Go binary lethe-collector (cmd/lethe-collector/main.go), cobra-based:
    • lethe-collector daemon — long-running, watches all configured sources.
    • lethe-collector backfill <tool> — one-shot, walks all source files from offset 0, ships everything; resumable via the same offset state.
    • lethe-collector status — prints per-source ingestion lag, outbox depth, last error.
  • Parser interface in internal/collector/parser/ (the Parser type from RFC §6.2, populated against the locked internal/shared/wire/ types).
  • Claude Code parser in internal/collector/parser/claudecode/ with golden-file fixtures.
  • Local state DB in ~/.local/state/lethe/state.db (SQLite, one file): tables ingestion_state (per source file offset) and outbox (buffered events when server is unreachable).
  • Polling-based discovery and ingestion loop (no fsnotify); per-source goroutines orchestrated via auxilia/async.
  • HTTPS POST to the server's /api/v1/ingest over Tailscale; relies on tailscale serve injecting Tailscale-User-Login for the authenticated daemon.
  • Outbox replay with exponential backoff, bounded size (default 100 MiB), oldest-drop on overflow with WARN.
  • systemd user unit shipped at deploy/lethe-collector.service with Restart=always, WantedBy=default.target, journald logging.
  • Configuration via YAML at ~/.config/lethe/collector.yaml, loaded with the same Viper strict-mode pattern as the server.
  • Logging via scribe, errors via culpa.

Out:

  • Other parsers (opencode → #3; crush, pi, kimi → their own task files later).
  • Any server-side change. Server is locked from #1; if a wire-format gap is discovered, it gets a separate amendment task.
  • macOS launchd unit (Linux only for v1; trivially added later — same binary, different unit file).
  • TUI / curses status. status prints plain text.
  • File-watcher backend (fsnotify or inotify directly).
  • HTTP/2 push, gRPC, or any non-NDJSON-over-HTTPS transport.
  • Multi-user / multi-account configurations (still single-tenant).

#Chosen approach

CLI: cobra. Three subcommands. daemon is the default deployed mode; backfill is the bootstrap and disaster-recovery tool; status is the operator's quick-look. Cobra is overkill for one command but right for three and pulls its weight from this task onward.

Discovery: polling. Every source has a configurable poll_interval (default 30s). On each tick, the source walks its root, lists candidate files (e.g. **/*.jsonl under ~/.claude/projects/), and processes each one independently. Polling beats fsnotify here because:

  • Source tools may write via rename(tmp, final). fsnotify then reports events for a temporary path that no longer exists by the time the handler opens it.
  • Long-running sessions append continuously; a single file gets touched many times — polling coalesces naturally.
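  • Cross-machine, cross-FS, cross-tool: polling behaves the same everywhere. fsnotify has platform- and filesystem-specific edge cases; for example, network mounts generally don't deliver change events.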

Per-source ingestion loop.

  1. Walk the root, list source files.
  2. For each file: load last_offset from ingestion_state keyed by (tool, source_file).
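  3. Open the file read-only, seek to last_offset, and scan to EOF using bufio.Scanner. Set a sufficiently large buffer via Scanner.Buffer: Claude Code lines can be hundreds of KB, well past the 64 KiB default token limit. The default ScanLines split function also returns a final line with no trailing \n, so the loop must check for the newline itself before treating a line as complete.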
  4. For each complete line, hand to the parser, accumulate wire.TurnEvents.
  5. Batch up to N events (default 500) or M bytes (default 8 MiB), whichever comes first; serialize to NDJSON; POST to /api/v1/ingest.
  6. On 200 {accepted: K, errors: [...]}: persist last_offset as the byte offset immediately after the K-th line of the batch, and continue from line K+1. If K < N, log the errors at WARN and skip the bad lines. The persisted offset also moves past them, so they don't loop forever.
  7. On a 5xx or network error: serialize the unsent events into the outbox table and move on to the next file. Replay is attempted on the next tick.
  8. Sleep poll_interval, loop.

Outbox. An outbox table in the state DB: (id INTEGER PRIMARY KEY AUTOINCREMENT, tool TEXT, host TEXT, source_file TEXT, payload BLOB, created_at INTEGER). On every tick, before processing fresh files, the loop tries to replay outbox rows oldest-first in chunks. Each successful POST deletes the rows it committed. The table is bounded by the outbox.max_bytes config (default 100 MiB). When the bound is exceeded, the oldest rows are dropped and a WARN is logged. The happy path (server reachable) never writes to the outbox at all; it is a strict overflow buffer.
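The oldest-drop policy reduces to picking the lowest ids until the remaining bytes fit. This sketch assumes rows are identified by the AUTOINCREMENT id (lower means older) and sized by their payload length. outboxRow and idsToDrop are hypothetical names. The actual DELETE and the WARN log would happen in the caller.

```go
package main

import "sort"

// outboxRow is a hypothetical in-memory view of one outbox row.
type outboxRow struct {
	ID    int64 // AUTOINCREMENT id; lower means older
	Bytes int64 // length of the payload BLOB
}

// idsToDrop returns the ids of the oldest rows to delete so that the remaining
// payload bytes fit within maxBytes, plus the number of bytes dropped.
// A newer row is never dropped while an older one survives.
func idsToDrop(rows []outboxRow, maxBytes int64) ([]int64, int64) {
	sorted := append([]outboxRow(nil), rows...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].ID < sorted[j].ID })
	var total int64
	for _, r := range sorted {
		total += r.Bytes
	}
	var ids []int64
	var dropped int64
	for _, r := range sorted {
		if total <= maxBytes {
			break
		}
		ids = append(ids, r.ID)
		dropped += r.Bytes
		total -= r.Bytes
	}
	return ids, dropped
}
```

Returning the dropped byte count as well as the ids gives the caller what it needs for a WARN line that reports both.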

Parser interface.

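```go
package parser

import "sourcecraft.dev/bigbes/lethe/internal/shared/wire"

// Parser is the only tool-specific component of the collector.
type Parser interface {
	// Tool returns the tool name, e.g. "claude-code".
	Tool() string
	// Discover lists candidate source files under root.
	Discover(root string) ([]SourceFile, error)
	// Parse reads path starting at byte offset since and returns events in
	// source order plus the offset just past the last complete line.
	Parse(path string, since int64) (events []wire.TurnEvent, newOffset int64, err error)
}

// SourceFile is one discovered file; Size feeds per-file lag in status.
type SourceFile struct {
	Path string
	Size int64
}
```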

Parse returns events in source order with monotonically increasing seq. If a line is malformed, the parser returns it as a system-role turn with the raw line in metadata and continues. The line then shows up in the archive but doesn't poison search. newOffset is the byte position immediately after the last fully parsed line. It never points mid-line, so a partial trailing write is left for the next poll.

Claude Code parser specifics.

  • Source root: ~/.claude/projects/. Real corpus includes both */<session-uuid>.jsonl and nested */<session-uuid>/subagents/*.jsonl; ingest every .jsonl file as its own session.
  • session_id: the UUID from the filename. The directory name (<project-hash>) goes into session_meta.metadata for project attribution.
  • One .jsonl line = one event, parsed into a permissive struct that uses json.RawMessage for any ambiguous field.
  • Event-type mapping keys off message.role plus nested message.content[].type, not just the top-level record type: in current Claude logs, tool use lives inside assistant records and tool results live inside user records.
    • message.role: "user" with string content → role: "user", content = the user message text.
    • message.role: "assistant" with text parts → role: "assistant", content = joined assistant text parts; model from the event; tokens_in/out from usage.input_tokens/output_tokens when present.
    • message.content[].type: "tool_use" and message.content[].type: "tool_result" → role: "tool", content = a short rendered summary (e.g. "<tool_use: Read file=...>"), full payload into tool_calls JSON.
    • Non-turn records (permission-mode, attachment, ai-title, last-prompt, etc.) are skipped unless they fail to parse, in which case they degrade to a system turn with the raw line in metadata.
  • cwd field → session_meta.working_dir. The path of the file → session_meta.source_file.
  • cost_usd left null (Max-billed sessions don't reliably report cost).
  • turn_id: prefer the event's uuid field. When missing, synthesize sha256(session_id || seq || timestamp || content[:64]) truncated to 16 bytes hex.
  • parentUuid (resume chaining): stored in turn metadata for now. Chaining sessions across files is a #3-or-later UI concern — every .jsonl file is a session in this task.
  • Slash commands and sub-agent invocations: their event subtypes go into metadata opaquely. The UI in #1 already renders metadata as JSON-collapsed; surfacing them properly is a later refinement.
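Here is a sketch of the record mapping above. It uses only the fields named in this section (message.role, message.content, message.content[].type) plus a "text" key on text parts, which is an assumption about the log format. Joining text parts with a newline is also an assumption. record, part, turn, and mapRecord are hypothetical. The real parser also fills model, usage, turn_id, and metadata, and it renders richer tool summaries.

```go
package main

import (
	"encoding/json"
	"strings"
)

// record is a deliberately permissive view of one .jsonl line: only
// message.role is typed, and message.content stays raw because it may be a
// string or an array of typed parts. Unknown fields are ignored.
type record struct {
	Message *struct {
		Role    string          `json:"role"`
		Content json.RawMessage `json:"content"`
	} `json:"message"`
}

// part is one element of message.content[]; the "text" key is assumed.
type part struct {
	Type string `json:"type"`
	Text string `json:"text"`
}

// turn is a hypothetical stand-in for the role/content fields of wire.TurnEvent.
type turn struct {
	Role    string
	Content string
	Raw     string // raw line for unparseable input (the design keeps it in metadata)
}

// mapRecord turns one line into zero or more turns.
func mapRecord(line []byte) []turn {
	var rec record
	if err := json.Unmarshal(line, &rec); err != nil {
		return []turn{{Role: "system", Raw: string(line)}}
	}
	if rec.Message == nil {
		return nil // non-turn record (permission-mode, ai-title, ...): skip
	}
	var s string
	if err := json.Unmarshal(rec.Message.Content, &s); err == nil {
		return []turn{{Role: rec.Message.Role, Content: s}} // plain string content
	}
	var parts []part
	if err := json.Unmarshal(rec.Message.Content, &parts); err != nil {
		return []turn{{Role: "system", Raw: string(line)}}
	}
	var texts []string
	var tools []turn
	for _, p := range parts {
		switch p.Type {
		case "text":
			texts = append(texts, p.Text)
		case "tool_use", "tool_result":
			// Tool use lives in assistant records and tool results in user
			// records; both become role "tool".
			tools = append(tools, turn{Role: "tool", Content: "<" + p.Type + ">"})
		}
	}
	var out []turn
	if len(texts) > 0 {
		out = append(out, turn{Role: rec.Message.Role, Content: strings.Join(texts, "\n")})
	}
	return append(out, tools...)
}
```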

Auth. The collector POSTs to https://phoebe.<tailnet>.ts.net/api/v1/ingest. tailscale serve on phoebe terminates HTTPS and injects Tailscale-User-Login from the connecting node's owner. The server validates that header against its allowlist. If tailscale serve doesn't inject the header for non-browser clients (the open question from #1), the deploy step fixes it; the collector code itself is unchanged.

Configuration. YAML at ~/.config/lethe/collector.yaml:

```yaml
server_url: "https://phoebe.<tailnet>.ts.net"
host: "laptop"                              # required; identifies this machine in the archive
state_dir: "~/.local/state/lethe"

http:
  timeout: "30s"
  retry_max: 5

outbox:
  max_bytes: 104857600                      # 100 MiB

sources:
  - tool: "claude-code"
    path: "~/.claude/projects"
    poll_interval: "30s"
    batch_max_lines: 500
    batch_max_bytes: 8388608                # 8 MiB

log:
  level: info
  format: human
```

host is required and has no default. The host string is the user's choice; the server stores it verbatim.

Tradeoffs that settled it.

  • Polling vs fsnotify: polling is correct in every case; fsnotify isn't. The wasted CPU of one directory walk per poll_interval (default 30s) is irrelevant.
  • Outbox in SQLite vs flat-file queue: one file (state.db) for both offsets and outbox, atomic transactions, no separate format to debug. The only cost is the SQLite dependency, which the offset table already requires.
  • Parse-then-batch vs streaming POST: batching keeps the wire protocol simple (NDJSON body, one HTTP call) and lets the server commit chunks atomically. Streaming would force the server to handle interrupted bodies — the RFC's chunked-commit response shape works because the body is bounded.
  • Synthesize missing turn_ids vs require source IDs: Claude Code always provides UUIDs in current versions, but the parser can't assume that holds for older fixture files or future regressions. Synthesis preserves idempotency; the rare case of a content[:64] collision within one session at one timestamp is acceptable.
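The turn_id fallback from the Claude Code section can be sketched as below. Two points are interpretations rather than spec. First, fields are joined with a 0x00 separator so that different field splits cannot concatenate to the same bytes. Second, "truncated to 16 bytes hex" is read as the first 16 bytes of the digest, hex-encoded to 32 characters. synthTurnID and its string-typed timestamp are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"strconv"
)

// synthTurnID builds a deterministic fallback id for events without a uuid:
// sha256(session_id || seq || timestamp || content[:64]), first 16 bytes, hex.
func synthTurnID(sessionID string, seq int64, timestamp, content string) string {
	if len(content) > 64 {
		content = content[:64] // first 64 bytes, as in content[:64]
	}
	h := sha256.New()
	for _, f := range []string{sessionID, strconv.FormatInt(seq, 10), timestamp, content} {
		h.Write([]byte(f))
		h.Write([]byte{0}) // field separator (assumption, see lead-in)
	}
	sum := h.Sum(nil)
	return hex.EncodeToString(sum[:16]) // 16 bytes -> 32 hex characters
}
```

Because seq is part of the input, two turns only collide if they share the session, seq, timestamp, and first 64 content bytes.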

#Assumptions

  • AS1 — The server-side ingest contract remains the locked internal/shared/wire.TurnEvent over POST /api/v1/ingest.
  • AS2 — Claude Code transcript files are append-only for the byte ranges the collector has already read.
  • AS3 — This task has one host identity and one configured Claude Code source root per collector process.

#Unknowns

  • UK1 — Whether tailscale serve injects Tailscale-User-Login for daemon HTTP clients.
  • UK2 — True line-size distribution of Claude Code .jsonl events.
  • UK3 — Whether the laptop's ~/.claude/projects/ ever contains files written concurrently by multiple Claude Code processes.

#Backwards-compatibility check

Greenfield collector. The only interface contract this task can break is the wire format with the server, which is locked into internal/shared/wire/ and cannot drift unilaterally.

#Hands-off decisions

  • udesign: parser interface placed at internal/collector/parser/ rather than internal/parsers/ (RFC §6.2 used internal/parsers/<tool>/) — keeps "collector-internal" code in one subtree, mirrors internal/server/ from #1.
  • udesign: outbox lives in the same state DB as offsets — single SQLite file is simpler than two stores; transaction guarantees are useful when an offset bump and an outbox dequeue happen together.
  • udesign: bad-line handling skips with WARN rather than halting the file — one corrupt line in .jsonl shouldn't pause ingestion of the rest. Risk: silent data loss if many lines are wrong; mitigated by counting WARNs in status.
  • udesign: host is required config with no default — auto-detecting via os.Hostname() produces noise on machines whose hostname is myname-mbp.local. Forcing the user to choose laptop / workpc keeps the archive's host column meaningful.
  • udesign: TOML → YAML for collector config — consistent with the server's config format from #1; one parser, one mental model.
  • udesign: parentUuid chaining of resumed Claude Code sessions deferred — every .jsonl is one session in this task. Surfacing chains is a UI concern for later.
  • udesign: synthesized turn_id uses sha256(session_id || seq || timestamp || content[:64])[:16]. content[:64] is enough to disambiguate within a single timestamp, and a full-content hash would balloon for large turns.
  • ureview: fixed partial-accept offset handling — skipped server-rejected rows so one bad turn cannot stall a source file.

TDD: yes (reason: parser behavior on golden fixture .jsonl files, offset persistence/resume semantics, outbox replay, and idempotent re-POST behavior are exactly the deterministic regression-prone surfaces TDD is good for. CLI scaffolding and systemd unit are exempt.)

#Invariants

  • The collector opens source files read-only. No code path writes, renames, or deletes anything under any source root.
  • ingestion_state.last_offset is persisted only after the server returns accepted: N and the offset has been advanced past line N.
  • Offsets are byte positions immediately after a fully-parsed \n. Never mid-line; partial trailing lines are left for the next poll.
  • A re-run after a crash, kill -9, power loss, or container restart resumes from the last persisted offset. Server-side idempotency absorbs any duplicates from re-sending lines that the server accepted but whose offset had not yet been persisted.
  • The outbox is bounded by outbox.max_bytes. Overflow drops oldest entries with a WARN; never blocks the loop.
  • One source file's failure (parse error, permission error, deleted file mid-loop) does not stop ingestion of any other source file.
  • wire.TurnEvent.host on every emitted event equals the host from config. The collector does not infer host from os.Hostname() or any environment.
  • The Parser interface is the only point of tool-specific knowledge in the collector. The ingestion loop knows nothing about Claude Code's JSON shape.
  • In daemon mode, every HTTP request goes to the configured server_url at path /api/v1/ingest, and no other endpoint is called. Only status may call read endpoints.
  • daemon mode handles SIGTERM by stopping new polls, draining in-flight POSTs (bounded), persisting any in-memory offset advances, and exiting with code 0.

#Principles

  • Polling beats watching. The cost is bounded; the correctness is total.
  • Each source's loop is independent. Nothing in the architecture forces cross-source coordination.
  • The parser is the only file that knows a tool's format. Adding a tool is one new directory under internal/collector/parser/, plus a register.go import.
  • The outbox is a safety net, not the primary path. The happy path skips it entirely. If a bug forces all traffic through the outbox, that's a regression worth alerting on.
  • Permissive parsing: unknown fields → metadata, malformed lines → system-role turn with raw payload. Never panic, never stall.
  • No background goroutines without a context.Context tied to shutdown.
  • Test against real fixture files (anonymized snippets from ~/.claude/projects/ checked into testdata/), not hand-crafted minimal JSON.

#Plan

Approach: keep the parser as the only Claude-specific layer, then add small collector packages for config, SQLite state/outbox, HTTP sending, and orchestration; the CLI is a thin cobra shell over those packages.

#PH1 — Config And State

  • Tier: smart — config/state define the contracts every later phase consumes.
  • 1.1 internal/collector/config/config.go:1-220 (create)
    • Load(path string) (*Config, error) — strict Viper YAML loader with ~ expansion, defaults except required host, and validation.
    • Respects: IV7, PC2, GPC1.
  • 1.2 internal/collector/state/store.go:1-260 (create)
    • Open(ctx context.Context, path string) (*Store, error) — opens SQLite, creates parent dir, applies embedded migrations.
    • GetOffset(ctx context.Context, tool, sourceFile string) (int64, error) / SaveOffset(ctx context.Context, tool, sourceFile string, offset int64) error.
    • Enqueue(ctx context.Context, item OutboxItem) error, Oldest(ctx context.Context, limit int) ([]OutboxRow, error), Delete(ctx context.Context, ids []int64) error, Stats(ctx context.Context) (Stats, error).
    • Respects: IV2, IV4, IV5, GPC5.
  • 1.3 internal/collector/state/migrations.go:1-80 (create)
    • applyMigrations(ctx context.Context, db *sqlx.DB) error — idempotent DDL for ingestion_state and outbox.
    • Respects: IV4, IV5.
  • Commit: collector: add config and state store
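For illustration, here is the DDL that applyMigrations might run, matching the ingestion_state and outbox columns listed under Outbox. The column constraints, the upsert, and the two queries are assumptions, not the committed schema. ON CONFLICT ... DO UPDATE requires SQLite 3.24 or newer, and length() of a BLOB returns its size in bytes.

```go
package main

// migrations holds hypothetical DDL for the two state tables. IF NOT EXISTS
// makes re-applying them a no-op, so applyMigrations can run on every Open.
var migrations = []string{
	`CREATE TABLE IF NOT EXISTS ingestion_state (
	tool        TEXT    NOT NULL,
	source_file TEXT    NOT NULL,
	last_offset INTEGER NOT NULL DEFAULT 0,
	PRIMARY KEY (tool, source_file)
)`,
	`CREATE TABLE IF NOT EXISTS outbox (
	id          INTEGER PRIMARY KEY AUTOINCREMENT,
	tool        TEXT    NOT NULL,
	host        TEXT    NOT NULL,
	source_file TEXT    NOT NULL,
	payload     BLOB    NOT NULL,
	created_at  INTEGER NOT NULL
)`,
}

// saveOffsetSQL upserts one offset keyed by (tool, source_file).
const saveOffsetSQL = `INSERT INTO ingestion_state (tool, source_file, last_offset)
VALUES (?, ?, ?)
ON CONFLICT (tool, source_file) DO UPDATE SET last_offset = excluded.last_offset`

// outboxStatsSQL reports outbox depth and total payload bytes.
const outboxStatsSQL = `SELECT COUNT(*), COALESCE(SUM(length(payload)), 0) FROM outbox`

// oldestSQL fetches replay candidates oldest-first.
const oldestSQL = `SELECT id, tool, host, source_file, payload FROM outbox ORDER BY id LIMIT ?`
```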

#PH2 — HTTP Send And Outbox Replay

  • Tier: smart — partial-accept offset semantics and outbox deletion must match the server contract exactly.
  • 2.1 internal/collector/ingest/sender.go:1-240 (create)
    • PostBatch(ctx context.Context, events []wire.TurnEvent) (Result, error) — serializes NDJSON, POSTs server_url + /api/v1/ingest, decodes {accepted,errors}.
    • EncodeNDJSON(events []wire.TurnEvent) ([]byte, error) — shared by sender and outbox tests.
    • Respects: IV2, IV8, GPC4.
  • 2.2 internal/collector/ingest/outbox.go:1-220 (create)
    • ReplayOutbox(ctx context.Context, store *state.Store, sender *Sender, limit int) error — oldest-first replay, delete only fully accepted rows.
    • EnforceOutboxLimit(ctx context.Context, store *state.Store, maxBytes int64) error — oldest-drop overflow.
    • Respects: IV5, PC3, GPC5.
  • Commit: collector: add ingest sender and outbox replay

#PH3 — Source Runner

  • Tier: deep — this phase owns resumability, shutdown, and per-source isolation.
  • 3.1 internal/collector/ingest/runner.go:1-320 (create)
    • RunOnce(ctx context.Context, cfg config.Config, src config.Source, p parser.Parser, store *state.Store, sender *Sender) error — replay outbox, discover files, parse from persisted offset, send batches, persist accepted offsets.
    • RunDaemon(ctx context.Context, cfg config.Config, parsers map[string]parser.Parser, store *state.Store, sender *Sender) error — per-source polling loops via auxilia/async and context-bound shutdown.
    • Respects: IV1-IV8, PC1-PC6, AS1-AS3.
  • 3.2 internal/collector/ingest/batch.go:1-160 (create)
    • BuildBatches(events []wire.TurnEvent, maxLines int, maxBytes int) ([]Batch, error) — records event indexes so accepted counts map back to offsets.
    • Respects: IV2, IV3.
  • Commit: collector: add polling source runner
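Item 3.2 says batches record event indexes so that accepted counts map back to offsets. The sketch below shows that bookkeeping, assuming each encoded event carries the offset just past its source line. Event, Batch, BuildBatches, and acceptedOffset are illustrative and need not match the planned signatures. Placing an event larger than maxBytes in a batch of its own, rather than rejecting it, is also an assumption.

```go
package main

import "errors"

// Event pairs an encoded NDJSON line with the source-file offset just past
// the line that produced it (hypothetical shape).
type Event struct {
	Line      []byte // encoded event, including its trailing '\n'
	EndOffset int64  // byte offset immediately after the source line
}

// Batch covers events[Start:End] and records its encoded size.
type Batch struct {
	Start, End int
	Bytes      int
}

// BuildBatches groups events into batches of at most maxLines events and
// maxBytes encoded bytes, whichever limit is hit first.
func BuildBatches(events []Event, maxLines, maxBytes int) ([]Batch, error) {
	if maxLines <= 0 || maxBytes <= 0 {
		return nil, errors.New("batch limits must be positive")
	}
	var out []Batch
	var cur Batch
	for i, ev := range events {
		n := len(ev.Line)
		full := cur.End-cur.Start == maxLines || cur.Bytes+n > maxBytes
		if cur.End > cur.Start && full {
			out = append(out, cur)
			cur = Batch{Start: i, End: i}
		}
		cur.End = i + 1
		cur.Bytes += n
	}
	if cur.End > cur.Start {
		out = append(out, cur)
	}
	return out, nil
}

// acceptedOffset maps "the server accepted the first k events of b" back to
// the source offset to persist; ok is false when nothing was accepted.
func acceptedOffset(events []Event, b Batch, k int) (int64, bool) {
	if size := b.End - b.Start; k > size {
		k = size
	}
	if k <= 0 {
		return 0, false
	}
	return events[b.Start+k-1].EndOffset, true
}
```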

#PH4 — CLI And Deploy

  • Tier: smart — command behavior is user-facing but mostly glue.
  • 4.1 cmd/lethe-collector/main.go:1-260 (create)
    • newRootCmd() *cobra.Command, newDaemonCmd() *cobra.Command, newBackfillCmd() *cobra.Command, newStatusCmd() *cobra.Command.
    • Default config path is ~/.config/lethe/collector.yaml; host still has no default inside config.
    • Respects: IV6, IV7, IV9, GPC6.
  • 4.2 deploy/lethe-collector.service:1-40 (create)
    • systemd user unit running lethe-collector daemon with journald logging and restart policy.
    • Respects: IV9.
  • 4.3 docs/tasks/lethe-collector-claude-code.md (modify)
    • Record implementation decisions, deferred items, and verify results.
    • Respects: GPC7.
  • Commit: collector: add lethe-collector cli

#Test strategy

  • RED first: internal/collector/config tests for strict unknown-key rejection, required host, YAML defaults, and ~ expansion.
  • RED first: internal/collector/state tests for migration idempotency, offset upsert, outbox FIFO replay rows, byte accounting, and oldest-drop limit.
  • RED first: internal/collector/ingest tests for NDJSON encoding, partial accepted-count offset persistence, network-failure outbox enqueue, replay deletion, and batch byte/line caps.
  • Existing parser tests remain the regression gate for Claude Code format handling.

#Order & dependencies

  • PH1 blocks PH2-PH4.
  • PH2 blocks PH3.
  • PH3 blocks PH4 daemon/backfill behavior; status can be implemented after PH1.

#Risks / rollback

  • RK1 — The server returns accepted counts but not source offsets, so PH3 must retain per-event source offsets in-memory and enqueue whole batches on hard failures.
  • RK2 — tailscale serve header behavior remains empirical; verify records the result and defers token fallback if needed rather than changing the locked server in this task.

#Interfaces

  • IF1 — config.Load(path string) (*Config, error) — all CLI commands load the same strict collector YAML.
  • IF2 — state.Store offset/outbox methods — runner and status share one SQLite boundary.
  • IF3 — ingest.Sender.PostBatch(ctx, events) — runner and outbox replay share one HTTP boundary.
  • IF4 — ingest.RunOnce / ingest.RunDaemon — CLI commands do not know parser, offset, or batching internals.

#Interface graph

  • PH1 -> IF1, IF2 @ internal/collector/config/, internal/collector/state/
  • PH2 IF2 -> IF3 @ internal/collector/ingest/sender.go, internal/collector/ingest/outbox.go
  • PH3 IF1, IF2, IF3 -> IF4 @ internal/collector/ingest/runner.go, internal/collector/ingest/batch.go
  • PH4 IF1, IF2, IF4 -> @ cmd/lethe-collector/, deploy/

Backwards-compat: greenfield collector; PH2 must not mutate internal/shared/wire, and all server interaction stays inside the existing POST /api/v1/ingest contract.

Scope check: no server changes, no extra parser registry abstraction, and no token-auth fallback unless verify proves Tailscale forwarding cannot work.

#Verify

Result: passed

Positive:

  • CK1 — go test ./... -count=1 passes.
  • CK2 — go build ./cmd/lethe-collector succeeds.
  • CK3 — lethe-collector status with a minimal config opens the state DB and reports the configured source.

Negative:

  • CK4 — lethe-collector status --config ./tmp/missing.yaml exits non-zero with CONFIG_NOT_FOUND surfaced.

Invariants / assumptions:

  • CK5 (IV7) — internal/collector has no os.Hostname call; host flows from collector config.
  • CK6 (IV9, AS1) — sender posts only TurnEvent NDJSON to /api/v1/ingest.
  • CK7 (UK1) — Tailscale header injection remains unverifiable without the deployed Tailscale path.

Interfaces:

  • CK8 (IF1) — config.Load(path string) (*Config, error) is exercised by CLI and config tests.
  • CK9 (IF2) — state.Store offset/outbox methods are exercised by runner and state tests.
  • CK10 (IF3) — Sender.PostBatch(ctx, events) is exercised by sender, outbox, and runner tests.
  • CK11 (IF4) — RunOnce / RunDaemon are exercised by CLI wiring and runner tests.

Smoke: go run ./cmd/lethe-collector --config ./tmp/collector-smoke.yaml status → prints host, state DB, outbox stats, and source list.

#Conclusion

#Hands-off decisions

  • size: Medium — the design is complete and remaining work spans CLI, config, state, HTTP, daemon, deploy, and tests.
  • worktree: task/lethe-collector-claude-code at /Users/blikh/data/home/lethe/.worktrees/lethe-collector-claude-code — hands-off requires isolated reversible edits.
  • worktree setup: added .worktrees/ to .gitignore on master before creating the task worktree — git-worktrees requires project-local worktree directories to be ignored.
  • uplan: plan auto-approved (hands-off).
  • ureview (re-review): fixed persistAcceptedOffset to use the first error's Line (1-based within the request body) to identify the failed row, rather than assuming result.Accepted points to it. Valid but uncommitted rows before the failed line are re-posted as a smaller prefix, and only then is the failed row skipped. This prevents data loss when, for example, the server returns accepted=0 with the first error on line 2.
  • ureview (re-review): added WARN log with dropped row count and bytes to EnforceOutboxLimit.
  • ureview (re-review): added lag_bytes per file to status output using parser.SourceFile.Size from discovery.
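The persistAcceptedOffset re-review rule above can be expressed as a small planning function. This is a hypothetical sketch, not the committed persistAcceptedOffset. It assumes accepted counts a committed prefix of the batch and that each error carries a 1-based line. Plan, planPartialAccept, and minInt are illustrative names.

```go
package main

// Plan describes how to settle one posted batch (0-based rows in the batch).
type Plan struct {
	RepostFrom, RepostTo int // re-post rows [RepostFrom, RepostTo) first; empty if equal
	SkipRow              int // server-rejected row to skip, or -1
	NextStart            int // rows before this index are settled once the re-post succeeds
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// planPartialAccept uses the first error's line, not the accepted count, to
// find the failed row, then re-posts any uncommitted valid rows before it.
func planPartialAccept(batchLen, accepted int, errLines []int) Plan {
	committed := minInt(accepted, batchLen)
	if len(errLines) == 0 || errLines[0] < 1 || errLines[0] > batchLen {
		// No usable error line: settle only the committed prefix.
		return Plan{RepostFrom: committed, RepostTo: committed, SkipRow: -1, NextStart: committed}
	}
	failed := errLines[0] - 1 // 1-based line -> 0-based row
	committed = minInt(committed, failed)
	return Plan{RepostFrom: committed, RepostTo: failed, SkipRow: failed, NextStart: failed + 1}
}
```

For the accepted=0, first error on line 2 case, the plan re-posts row 0, skips row 1, and resumes at row 2.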

#Deferred / known limitations

  • status does not display per-source last_error because the PH1 state schema (ingestion_state table) stores only last_offset, not error history or timestamps. Adding these requires a schema extension outside the current plan scope.

#Deferred (needs user input)

  • retry/backoff: http.retry_max is loaded from config but exponential backoff needs a configured base/max delay; no conservative default was specified.
  • status last_error: requires extending ingestion_state schema and deciding retention/update semantics.