package ingest import ( "context" "encoding/json" "errors" "path/filepath" "testing" "time" "sourcecraft.dev/bigbes/lethe/internal/collector/config" "sourcecraft.dev/bigbes/lethe/internal/collector/state" "sourcecraft.dev/bigbes/lethe/internal/shared/wire" ) var ( errParseBoom = errors.New("parse boom") errPostBoom = errors.New("post boom") ) func turnEvent(seq int64, content string) wire.TurnEvent { return wire.TurnEvent{ Tool: "claude-code", Host: "test-host", SessionID: "session-1", TurnID: content, Seq: seq, Role: "user", Timestamp: 123, Content: content, SessionMeta: wire.SessionMeta{ SourceFile: "source.jsonl", }, } } func resultJSON(result Result) string { data, err := json.Marshal(result) if err != nil { panic(err) } return string(data) } func testConfig() config.Config { return config.Config{ Host: "test-host", ServerURL: "http://127.0.0.1", Outbox: config.OutboxConfig{MaxBytes: 1024 * 1024}, HTTP: config.HTTPConfig{Timeout: 30 * time.Second}, } } func testSource(t *testing.T, tool string, maxLines int, maxBytes int64) config.SourceConfig { t.Helper() return config.SourceConfig{ Tool: tool, Path: t.TempDir(), PollInterval: time.Millisecond, BatchMaxLines: maxLines, BatchMaxBytes: maxBytes, } } func openTestStore(t *testing.T, ctx context.Context) *state.Store { t.Helper() store, err := state.Open(ctx, filepath.Join(t.TempDir(), "state.db")) if err != nil { t.Fatalf("Open: %v", err) } t.Cleanup(func() { _ = store.Close() }) return store } func assertOffset(t *testing.T, ctx context.Context, store *state.Store, tool, file string, want int64) { t.Helper() got, err := store.GetOffset(ctx, tool, file) if err != nil { t.Fatalf("GetOffset: %v", err) } if got != want { t.Errorf("offset = %d, want %d", got, want) } } func assertOutboxCount(t *testing.T, ctx context.Context, store *state.Store, want int64) { t.Helper() st, err := store.Stats(ctx) if err != nil { t.Fatalf("Stats: %v", err) } if st.OutboxCount != want { t.Errorf("OutboxCount = %d, want %d", st.OutboxCount, want) } }