package ingest
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"sourcecraft.dev/bigbes/lethe/internal/collector/config"
"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
"sourcecraft.dev/bigbes/lethe/internal/collector/state"
"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)
func TestRunOnce_AllAcceptedPersistsNewOffsetAndDoesNotEnqueueOutbox(t *testing.T) {
ctx := context.Background()
store := openTestStore(t, ctx)
source := testSource(t, "claude-code", 10, 4096)
file := filepath.Join(source.Path, "one.jsonl")
p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
})
sender := acceptingSender(t, []Result{{Accepted: 2}})
err := RunOnce(ctx, testConfig(), source, p, store, sender)
if err != nil {
t.Fatalf("RunOnce: %v", err)
}
assertOffset(t, ctx, store, "claude-code", file, 300)
assertOutboxCount(t, ctx, store, 0)
}
func TestRunOnce_TrimsPreexistingOutboxOverLimitEvenWithNoEnqueue(t *testing.T) {
ctx := context.Background()
store := openTestStore(t, ctx)
source := testSource(t, "claude-code", 10, 4096)
file := filepath.Join(source.Path, "one.jsonl")
// Pre-populate outbox with two 10-byte items.
items := []state.OutboxItem{
{Tool: "claude-code", Host: "h", SourceFile: file, Payload: []byte("{\"seq\":1}\n")},
{Tool: "claude-code", Host: "h", SourceFile: file, Payload: []byte("{\"seq\":2}\n")},
}
for _, it := range items {
if err := store.Enqueue(ctx, it); err != nil {
t.Fatalf("Enqueue: %v", err)
}
}
cfg := testConfig()
cfg.Outbox.MaxBytes = 10
// No new events => no new enqueue.
p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
file: {events: []wire.TurnEvent{}, newOffset: 0},
})
// Accepted:0 so replay does not delete rows; enforce must trim.
sender := acceptingSender(t, nil)
err := RunOnce(ctx, cfg, source, p, store, sender)
if err != nil {
t.Fatalf("RunOnce: %v", err)
}
rows, err := store.Oldest(ctx, 10)
if err != nil {
t.Fatalf("Oldest: %v", err)
}
if len(rows) != 1 {
t.Errorf("expected 1 row after trim, got %d", len(rows))
}
if len(rows) > 0 && string(rows[0].Payload) != "{\"seq\":2}\n" {
t.Errorf("remaining payload = %q, want %q", string(rows[0].Payload), "{\"seq\":2}\n")
}
}
func TestRunOnce_PartialAcceptWithErrorSkipsFailedEvent(t *testing.T) {
ctx := context.Background()
store := openTestStore(t, ctx)
source := testSource(t, "claude-code", 10, 4096)
file := filepath.Join(source.Path, "one.jsonl")
p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")}, newOffset: 400},
})
sender := acceptingSender(t, []Result{{Accepted: 1, Errors: []LineError{{Line: 2, Err: "bad"}}}})
err := RunOnce(ctx, testConfig(), source, p, store, sender)
if err != nil {
t.Fatalf("RunOnce: %v", err)
}
assertOffset(t, ctx, store, "claude-code", file, 300)
assertOutboxCount(t, ctx, store, 0)
}
// TestRunOnce_ZeroAcceptedWithErrorLineTwoPostsPrefixAndSkipsFailedEvent
// covers a response that accepts nothing and reports line 2 as bad.
//
// First run: RunOnce is expected to re-POST only the prefix before the bad
// line (event "a") and then save offset 300.
// Second run: it should POST only event "c" and save offset 400.
func TestRunOnce_ZeroAcceptedWithErrorLineTwoPostsPrefixAndSkipsFailedEvent(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")}
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: events, newOffset: 400},
	})

	// posted records every decoded POST body in arrival order. The handler
	// runs on the server's goroutine, so all access goes through mu.
	var (
		mu     sync.Mutex
		posted [][]wire.TurnEvent
	)
	snapshot := func() [][]wire.TurnEvent {
		mu.Lock()
		defer mu.Unlock()
		return append([][]wire.TurnEvent(nil), posted...)
	}

	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// t.Fatalf must only be called from the test goroutine; inside the
		// handler, report with t.Errorf and fail the request instead.
		body, err := io.ReadAll(r.Body)
		if err != nil {
			t.Errorf("read body: %v", err)
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		var batch []wire.TurnEvent
		for _, line := range strings.Split(string(body), "\n") {
			if line == "" {
				continue
			}
			var ev wire.TurnEvent
			if err := json.Unmarshal([]byte(line), &ev); err != nil {
				t.Errorf("unmarshal event: %v", err)
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			batch = append(batch, ev)
		}

		mu.Lock()
		posted = append(posted, batch)
		call := len(posted)
		mu.Unlock()

		w.Header().Set("Content-Type", "application/json")
		// The first POST accepts nothing and flags line 2; later POSTs accept all.
		reply := Result{Accepted: len(batch)}
		if call == 1 {
			reply = Result{Accepted: 0, Errors: []LineError{{Line: 2, Err: "bad"}}}
		}
		_, _ = w.Write([]byte(resultJSON(reply)))
	}))
	defer ts.Close()
	sender := NewSender(ts.URL, ts.Client())

	if err := RunOnce(ctx, testConfig(), source, p, store, sender); err != nil {
		t.Fatalf("RunOnce first run: %v", err)
	}
	assertOffset(t, ctx, store, "claude-code", file, 300)
	got := snapshot()
	if len(got) != 2 {
		t.Fatalf("first run: expected 2 POSTs, got %d", len(got))
	}
	if len(got[0]) != 3 {
		t.Errorf("first run batch: expected 3 events, got %d", len(got[0]))
	}
	// Fatal rather than Errorf: the next check indexes got[1][0].
	if len(got[1]) != 1 {
		t.Fatalf("prefix batch: expected 1 event, got %d", len(got[1]))
	}
	if got[1][0].Content != "a" {
		t.Errorf("prefix batch: expected event 'a', got %q", got[1][0].Content)
	}

	if err := RunOnce(ctx, testConfig(), source, p, store, sender); err != nil {
		t.Fatalf("RunOnce second run: %v", err)
	}
	assertOffset(t, ctx, store, "claude-code", file, 400)
	got = snapshot()
	if len(got) != 3 {
		t.Fatalf("second run: expected 3 POSTs total, got %d", len(got))
	}
	// Fatal rather than Errorf: the next check indexes got[2][0].
	if len(got[2]) != 1 {
		t.Fatalf("second run batch: expected 1 event, got %d", len(got[2]))
	}
	if got[2][0].Content != "c" {
		t.Errorf("second run batch: expected event 'c', got %q", got[2][0].Content)
	}
}
func TestRunOnce_MalformedErrorLineOutOfRangeReturnsError(t *testing.T) {
ctx := context.Background()
store := openTestStore(t, ctx)
source := testSource(t, "claude-code", 10, 4096)
file := filepath.Join(source.Path, "one.jsonl")
p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
})
sender := acceptingSender(t, []Result{{Accepted: 0, Errors: []LineError{{Line: 5, Err: "bad"}}}})
err := RunOnce(ctx, testConfig(), source, p, store, sender)
if err == nil {
t.Fatal("expected RunOnce error for malformed error line")
}
assertOffset(t, ctx, store, "claude-code", file, 0)
}
func TestRunOnce_ZeroAcceptedWithErrorSkipsFirstEvent(t *testing.T) {
ctx := context.Background()
store := openTestStore(t, ctx)
source := testSource(t, "claude-code", 10, 4096)
file := filepath.Join(source.Path, "one.jsonl")
p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
})
sender := acceptingSender(t, []Result{{Accepted: 0, Errors: []LineError{{Line: 1, Err: "bad"}}}})
err := RunOnce(ctx, testConfig(), source, p, store, sender)
if err != nil {
t.Fatalf("RunOnce: %v", err)
}
assertOffset(t, ctx, store, "claude-code", file, 200)
assertOutboxCount(t, ctx, store, 0)
}
// TestRunOnce_PartialAcceptSkipsBadRowAndRerunDoesNotRepostIt covers a
// response that accepts one event and reports line 2 as bad.
//
// First run: exactly one POST (three events) is expected, and the offset is
// saved at 300.
// Second run: only event "c" is POSTed, not the rejected "b", and the offset
// advances to 400.
func TestRunOnce_PartialAcceptSkipsBadRowAndRerunDoesNotRepostIt(t *testing.T) {
	ctx := context.Background()
	store := openTestStore(t, ctx)
	source := testSource(t, "claude-code", 10, 4096)
	file := filepath.Join(source.Path, "one.jsonl")
	events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")}
	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: events, newOffset: 400},
	})

	// posted records every decoded POST body in arrival order. The handler
	// runs on the server's goroutine, so all access goes through mu.
	var (
		mu     sync.Mutex
		posted [][]wire.TurnEvent
	)
	snapshot := func() [][]wire.TurnEvent {
		mu.Lock()
		defer mu.Unlock()
		return append([][]wire.TurnEvent(nil), posted...)
	}

	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// t.Fatalf must only be called from the test goroutine; inside the
		// handler, report with t.Errorf and fail the request instead.
		body, err := io.ReadAll(r.Body)
		if err != nil {
			t.Errorf("read body: %v", err)
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		var batch []wire.TurnEvent
		for _, line := range strings.Split(string(body), "\n") {
			if line == "" {
				continue
			}
			var ev wire.TurnEvent
			if err := json.Unmarshal([]byte(line), &ev); err != nil {
				t.Errorf("unmarshal event: %v", err)
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			batch = append(batch, ev)
		}

		mu.Lock()
		posted = append(posted, batch)
		call := len(posted)
		mu.Unlock()

		w.Header().Set("Content-Type", "application/json")
		// The first POST accepts one event and flags line 2; later POSTs accept one.
		reply := Result{Accepted: 1}
		if call == 1 {
			reply = Result{Accepted: 1, Errors: []LineError{{Line: 2, Err: "bad"}}}
		}
		_, _ = w.Write([]byte(resultJSON(reply)))
	}))
	defer ts.Close()
	sender := NewSender(ts.URL, ts.Client())

	if err := RunOnce(ctx, testConfig(), source, p, store, sender); err != nil {
		t.Fatalf("RunOnce first run: %v", err)
	}
	assertOffset(t, ctx, store, "claude-code", file, 300)
	got := snapshot()
	if len(got) != 1 {
		t.Fatalf("first run: expected 1 POST, got %d", len(got))
	}
	if len(got[0]) != 3 {
		t.Errorf("first run: expected 3 events in batch, got %d", len(got[0]))
	}

	if err := RunOnce(ctx, testConfig(), source, p, store, sender); err != nil {
		t.Fatalf("RunOnce second run: %v", err)
	}
	assertOffset(t, ctx, store, "claude-code", file, 400)
	got = snapshot()
	if len(got) != 2 {
		t.Fatalf("second run: expected 2 POSTs total, got %d", len(got))
	}
	// Fatal rather than Errorf: the next check indexes got[1][0].
	if len(got[1]) != 1 {
		t.Fatalf("second run: expected 1 event in batch, got %d", len(got[1]))
	}
	if got[1][0].Content != "c" {
		t.Errorf("second run: expected event 'c', got %q", got[1][0].Content)
	}
}
func TestRunOnce_FullAcceptWithErrorsReturnsError(t *testing.T) {
ctx := context.Background()
store := openTestStore(t, ctx)
source := testSource(t, "claude-code", 10, 4096)
file := filepath.Join(source.Path, "one.jsonl")
p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
})
sender := acceptingSender(t, []Result{{Accepted: 2, Errors: []LineError{{Line: 2, Err: "bad"}}}})
err := RunOnce(ctx, testConfig(), source, p, store, sender)
if err == nil {
t.Fatal("expected RunOnce error when server accepts all but reports errors")
}
assertOffset(t, ctx, store, "claude-code", file, 0)
}
func TestRunOnce_HardErrorEnqueuesUnsentBatchAndDoesNotAdvanceOffset(t *testing.T) {
ctx := context.Background()
store := openTestStore(t, ctx)
source := testSource(t, "claude-code", 10, 4096)
file := filepath.Join(source.Path, "one.jsonl")
events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}
p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
file: {events: events, newOffset: 300},
})
sender := failingSender(t)
err := RunOnce(ctx, testConfig(), source, p, store, sender)
if err == nil {
t.Fatal("expected RunOnce error on hard POST failure")
}
assertOffset(t, ctx, store, "claude-code", file, 0)
rows, err := store.Oldest(ctx, 10)
if err != nil {
t.Fatalf("Oldest: %v", err)
}
if len(rows) != 1 {
t.Fatalf("len(rows) = %d, want 1", len(rows))
}
wantPayload, err := EncodeNDJSON(events)
if err != nil {
t.Fatalf("EncodeNDJSON: %v", err)
}
if string(rows[0].Payload) != string(wantPayload) {
t.Errorf("payload = %q, want %q", string(rows[0].Payload), string(wantPayload))
}
}
func TestRunOnce_ContinuesToOtherFilesWhenOneFileFails(t *testing.T) {
ctx := context.Background()
store := openTestStore(t, ctx)
source := testSource(t, "claude-code", 10, 4096)
badFile := filepath.Join(source.Path, "bad.jsonl")
goodFile := filepath.Join(source.Path, "good.jsonl")
p := newFakeParser("claude-code", []parser.SourceFile{{Path: badFile}, {Path: goodFile}}, map[string]parseResult{
badFile: {err: errParseBoom},
goodFile: {events: []wire.TurnEvent{turnEvent(100, "ok")}, newOffset: 200},
})
sender := acceptingSender(t, []Result{{Accepted: 1}})
err := RunOnce(ctx, testConfig(), source, p, store, sender)
if err == nil {
t.Fatal("expected RunOnce to return collected parse error")
}
assertOffset(t, ctx, store, "claude-code", badFile, 0)
assertOffset(t, ctx, store, "claude-code", goodFile, 200)
}
// TestRunDaemon_ExitsWhenContextCanceled starts RunDaemon with a parser that
// discovers no files and cancels its context right away. The daemon must
// return a nil error within two seconds.
func TestRunDaemon_ExitsWhenContextCanceled(t *testing.T) {
	runCtx, stop := context.WithCancel(context.Background())
	st := openTestStore(t, context.Background())
	src := testSource(t, "claude-code", 10, 4096)
	// A long interval keeps any timed poll from firing during the test.
	src.PollInterval = time.Hour

	fp := newFakeParser("claude-code", nil, nil)
	snd := acceptingSender(t, nil)
	cfg := testConfig()
	cfg.Sources = []config.SourceConfig{src}

	errCh := make(chan error, 1)
	go func() {
		parsers := map[string]parser.Parser{"claude-code": fp}
		errCh <- RunDaemon(runCtx, cfg, parsers, st, snd)
	}()
	stop()

	select {
	case <-time.After(2 * time.Second):
		t.Fatal("RunDaemon did not exit after context cancellation")
	case err := <-errCh:
		if err != nil {
			t.Fatalf("RunDaemon: %v", err)
		}
	}
}
// TestRunDaemon_CancelWaitsForActiveRunOnce cancels the daemon's context while
// a POST is blocked in the server handler. RunDaemon must not return during
// that time. Once the handler is released, RunDaemon must exit cleanly and the
// in-flight run's offset (200) must be persisted.
func TestRunDaemon_CancelWaitsForActiveRunOnce(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// Safety net so the daemon goroutine stops on early t.Fatal paths;
	// calling cancel more than once is harmless.
	defer cancel()
	store := openTestStore(t, context.Background())
	source := testSource(t, "claude-code", 10, 4096)
	source.PollInterval = time.Hour
	file := filepath.Join(source.Path, "one.jsonl")

	postStarted := make(chan struct{})
	postContinue := make(chan struct{})
	var postOnce, releaseOnce sync.Once
	// release unblocks the handler; safe to call more than once.
	release := func() { releaseOnce.Do(func() { close(postContinue) }) }

	p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
		file: {events: []wire.TurnEvent{turnEvent(100, "a")}, newOffset: 200},
	})
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		postOnce.Do(func() { close(postStarted) })
		<-postContinue
		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(resultJSON(Result{Accepted: 1})))
	}))
	defer ts.Close()
	// Deferred after ts.Close, so it runs first. httptest.Server.Close blocks
	// until outstanding requests finish. Without this, any t.Fatal below would
	// leave the handler stuck on postContinue, and the test would hang instead
	// of failing.
	defer release()

	sender := NewSender(ts.URL, ts.Client())
	cfg := testConfig()
	cfg.Sources = []config.SourceConfig{source}
	cfg.HTTP.Timeout = 5 * time.Second

	done := make(chan error, 1)
	go func() {
		done <- RunDaemon(ctx, cfg, map[string]parser.Parser{"claude-code": p}, store, sender)
	}()

	select {
	case <-postStarted:
	case <-time.After(2 * time.Second):
		t.Fatal("POST did not start")
	}

	cancel()
	select {
	case <-done:
		t.Fatal("RunDaemon should wait for active RunOnce, not exit immediately")
	case <-time.After(100 * time.Millisecond):
		// expected: the daemon is still waiting on the in-flight POST
	}

	release()
	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("RunDaemon: %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("RunDaemon did not exit after active run completed")
	}

	assertOffset(t, context.Background(), store, "claude-code", file, 200)
}
func TestRunBackfillOnce_IgnoresSavedOffsetAndPersistsProgress(t *testing.T) {
ctx := context.Background()
store := openTestStore(t, ctx)
source := testSource(t, "claude-code", 10, 4096)
file := filepath.Join(source.Path, "one.jsonl")
if err := store.SaveOffset(ctx, "claude-code", file, 150); err != nil {
t.Fatalf("SaveOffset: %v", err)
}
p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
})
sender := acceptingSender(t, []Result{{Accepted: 2}})
err := RunBackfillOnce(ctx, testConfig(), source, p, store, sender)
if err != nil {
t.Fatalf("RunBackfillOnce: %v", err)
}
assertOffset(t, ctx, store, "claude-code", file, 300)
assertOutboxCount(t, ctx, store, 0)
}
func TestRunBackfillOnce_InterruptedProgressIsResumable(t *testing.T) {
ctx := context.Background()
store := openTestStore(t, ctx)
source := testSource(t, "claude-code", 10, 4096)
file := filepath.Join(source.Path, "one.jsonl")
if err := store.SaveOffset(ctx, "claude-code", file, 150); err != nil {
t.Fatalf("SaveOffset: %v", err)
}
events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}
p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
file: {events: events, newOffset: 300},
})
sender := acceptingSender(t, []Result{{Accepted: 1}})
err := RunBackfillOnce(ctx, testConfig(), source, p, store, sender)
if err != nil {
t.Fatalf("RunBackfillOnce: %v", err)
}
assertOffset(t, ctx, store, "claude-code", file, 200)
sender2 := acceptingSender(t, []Result{{Accepted: 1}})
err = RunOnce(ctx, testConfig(), source, p, store, sender2)
if err != nil {
t.Fatalf("RunOnce resume: %v", err)
}
assertOffset(t, ctx, store, "claude-code", file, 300)
}
// parseResult is the canned outcome fakeParser.Parse returns for one path.
type parseResult struct {
	// events are returned by Parse after dropping those with Seq < since.
	events []wire.TurnEvent
	// newOffset is returned as Parse's offset when err is nil.
	newOffset int64
	// err, when non-nil, makes Parse return (nil, since, err) and ignore the
	// other fields.
	err error
}
// fakeParser is a scripted parser used by these tests. It reports a fixed tool
// name, returns a fixed file list from Discover, and serves Parse from a map
// of canned results keyed by file path.
type fakeParser struct {
	tool    string                 // returned by Tool
	files   []parser.SourceFile    // returned by Discover regardless of root
	results map[string]parseResult // looked up by path in Parse; a missing key yields the zero parseResult
}
// newFakeParser returns a fakeParser for tool. Discover will report files, and
// Parse will answer from results (keyed by path). The slice and map are stored
// as given, not copied.
func newFakeParser(tool string, files []parser.SourceFile, results map[string]parseResult) *fakeParser {
	fp := new(fakeParser)
	fp.tool = tool
	fp.files = files
	fp.results = results
	return fp
}
// Tool reports the tool name the fake was constructed with.
func (p *fakeParser) Tool() string {
	return p.tool
}
// Discover ignores root and always returns the preconfigured file list with a
// nil error.
func (p *fakeParser) Discover(root string) ([]parser.SourceFile, error) {
	found := p.files
	return found, nil
}
// Parse serves the canned result for path.
//
// If that result carries an error, Parse returns (nil, since, err). Otherwise
// it returns the canned events whose Seq is at least since, in their original
// order (nil if none qualify), together with the canned newOffset.
func (p *fakeParser) Parse(path string, since int64) ([]wire.TurnEvent, int64, error) {
	canned := p.results[path]
	if canned.err != nil {
		return nil, since, canned.err
	}

	var kept []wire.TurnEvent
	for i := range canned.events {
		if canned.events[i].Seq < since {
			continue // already consumed before since
		}
		kept = append(kept, canned.events[i])
	}
	return kept, canned.newOffset, nil
}
// roundTripFunc adapts an ordinary function to the http.RoundTripper
// interface, so tests can plug a stub transport into an http.Client.
type roundTripFunc func(*http.Request) (*http.Response, error)

// Compile-time check that roundTripFunc satisfies http.RoundTripper.
var _ http.RoundTripper = roundTripFunc(nil)

// RoundTrip passes req straight to the wrapped function and returns its
// results unchanged.
func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	return f(req)
}
// acceptingSender returns a Sender backed by an httptest server that answers
// the i-th POST with results[i]. Once the script is exhausted, it answers with
// an empty Result. Request bodies are read and discarded. The server is shut
// down through t.Cleanup.
func acceptingSender(t *testing.T, results []Result) *Sender {
	t.Helper()

	var (
		mu   sync.Mutex // guards next; the handler may run on several goroutines
		next int
	)
	handler := func(w http.ResponseWriter, r *http.Request) {
		_, _ = io.ReadAll(r.Body) // drain only; content is not inspected

		mu.Lock()
		defer mu.Unlock()

		var reply Result
		if next < len(results) {
			reply = results[next]
			next++
		}
		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(resultJSON(reply)))
	}

	srv := httptest.NewServer(http.HandlerFunc(handler))
	t.Cleanup(srv.Close)
	return NewSender(srv.URL, srv.Client())
}
// failingSender returns a Sender whose HTTP transport fails every request
// with errPostBoom before anything reaches the network.
func failingSender(t *testing.T) *Sender {
	t.Helper()
	alwaysFail := roundTripFunc(func(*http.Request) (*http.Response, error) {
		return nil, errPostBoom
	})
	return NewSender("http://127.0.0.1", &http.Client{Transport: alwaysFail})
}