package ingest
import (
"context"
"io"
"net/http"
"net/http/httptest"
"path/filepath"
"sync"
"testing"
"time"
"sourcecraft.dev/bigbes/lethe/internal/collector/config"
"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)
// TestRunOnce_AllAcceptedPersistsNewOffsetAndDoesNotEnqueueOutbox runs RunOnce
// over a single file whose fake parse yields two events and a new offset of
// 300, against a server that replies with Accepted: 2. It then checks, via
// assertOffset and assertOutboxCount, that offset 300 is recorded for the file
// and that the outbox count is 0.
func TestRunOnce_AllAcceptedPersistsNewOffsetAndDoesNotEnqueueOutbox(t *testing.T) {
	ctx := context.Background()
	st := openTestStore(t, ctx)
	src := testSource(t, "claude-code", 10, 4096)
	jsonl := filepath.Join(src.Path, "one.jsonl")

	// Canned parser output for the only discovered file.
	parsed := parseResult{
		events:    []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")},
		newOffset: 300,
	}
	fake := newFakeParser(
		"claude-code",
		[]parser.SourceFile{{Path: jsonl}},
		map[string]parseResult{jsonl: parsed},
	)
	snd := acceptingSender(t, []Result{{Accepted: 2}})

	if err := RunOnce(ctx, testConfig(), src, fake, st, snd); err != nil {
		t.Fatalf("RunOnce: %v", err)
	}

	assertOffset(t, ctx, st, "claude-code", jsonl, 300)
	assertOutboxCount(t, ctx, st, 0)
}
// TestRunOnce_PartialAcceptedPersistsFirstUnacceptedEventOffset runs RunOnce
// over a single file whose fake parse yields three events (built with
// turnEvent(100, ...), turnEvent(200, ...), turnEvent(300, ...)) and a new
// offset of 400. The server replies with Accepted: 2 plus one line error for
// line 3. The test expects offset 300 to be recorded for the file and the
// outbox count to be 0.
func TestRunOnce_PartialAcceptedPersistsFirstUnacceptedEventOffset(t *testing.T) {
	ctx := context.Background()
	st := openTestStore(t, ctx)
	src := testSource(t, "claude-code", 10, 4096)
	jsonl := filepath.Join(src.Path, "one.jsonl")

	// Canned parser output: three events, of which the server accepts two.
	parsed := parseResult{
		events: []wire.TurnEvent{
			turnEvent(100, "a"),
			turnEvent(200, "b"),
			turnEvent(300, "c"),
		},
		newOffset: 400,
	}
	fake := newFakeParser(
		"claude-code",
		[]parser.SourceFile{{Path: jsonl}},
		map[string]parseResult{jsonl: parsed},
	)
	reply := Result{Accepted: 2, Errors: []LineError{{Line: 3, Err: "bad"}}}
	snd := acceptingSender(t, []Result{reply})

	if err := RunOnce(ctx, testConfig(), src, fake, st, snd); err != nil {
		t.Fatalf("RunOnce: %v", err)
	}

	assertOffset(t, ctx, st, "claude-code", jsonl, 300)
	assertOutboxCount(t, ctx, st, 0)
}
// TestRunOnce_ZeroAcceptedDoesNotPersistOffset runs RunOnce over a single file
// whose fake parse yields two events and a new offset of 300, against a server
// that replies with Accepted: 0 and one line error for line 1. RunOnce is
// expected to return no error, and the test asserts offset 0 for the file and
// an outbox count of 0.
func TestRunOnce_ZeroAcceptedDoesNotPersistOffset(t *testing.T) {
	ctx := context.Background()
	st := openTestStore(t, ctx)
	src := testSource(t, "claude-code", 10, 4096)
	jsonl := filepath.Join(src.Path, "one.jsonl")

	// Canned parser output for the only discovered file.
	parsed := parseResult{
		events:    []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")},
		newOffset: 300,
	}
	fake := newFakeParser(
		"claude-code",
		[]parser.SourceFile{{Path: jsonl}},
		map[string]parseResult{jsonl: parsed},
	)
	reply := Result{Accepted: 0, Errors: []LineError{{Line: 1, Err: "bad"}}}
	snd := acceptingSender(t, []Result{reply})

	if err := RunOnce(ctx, testConfig(), src, fake, st, snd); err != nil {
		t.Fatalf("RunOnce: %v", err)
	}

	assertOffset(t, ctx, st, "claude-code", jsonl, 0)
	assertOutboxCount(t, ctx, st, 0)
}
// TestRunOnce_HardErrorEnqueuesUnsentBatchAndDoesNotAdvanceOffset runs RunOnce
// with a sender whose HTTP transport always fails. It expects RunOnce to return
// an error, the offset asserted for the file to be 0, and store.Oldest to
// return exactly one row whose payload equals EncodeNDJSON of the parsed
// events.
func TestRunOnce_HardErrorEnqueuesUnsentBatchAndDoesNotAdvanceOffset(t *testing.T) {
	ctx := context.Background()
	st := openTestStore(t, ctx)
	src := testSource(t, "claude-code", 10, 4096)
	jsonl := filepath.Join(src.Path, "one.jsonl")

	batch := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}
	fake := newFakeParser(
		"claude-code",
		[]parser.SourceFile{{Path: jsonl}},
		map[string]parseResult{jsonl: {events: batch, newOffset: 300}},
	)
	snd := failingSender(t)

	if err := RunOnce(ctx, testConfig(), src, fake, st, snd); err == nil {
		t.Fatal("expected RunOnce error on hard POST failure")
	}
	assertOffset(t, ctx, st, "claude-code", jsonl, 0)

	// Exactly one row is expected from the store after the failed send.
	queued, err := st.Oldest(ctx, 10)
	if err != nil {
		t.Fatalf("Oldest: %v", err)
	}
	if n := len(queued); n != 1 {
		t.Fatalf("len(rows) = %d, want 1", n)
	}

	encoded, err := EncodeNDJSON(batch)
	if err != nil {
		t.Fatalf("EncodeNDJSON: %v", err)
	}
	got, want := string(queued[0].Payload), string(encoded)
	if got != want {
		t.Errorf("payload = %q, want %q", got, want)
	}
}
// TestRunOnce_ContinuesToOtherFilesWhenOneFileFails gives RunOnce two files:
// "bad.jsonl", whose fake parse returns errParseBoom, and "good.jsonl", whose
// fake parse yields one event and a new offset of 200. The server replies with
// Accepted: 1. The test expects RunOnce to return a non-nil error while offset
// 0 is asserted for the bad file and offset 200 for the good file.
func TestRunOnce_ContinuesToOtherFilesWhenOneFileFails(t *testing.T) {
	ctx := context.Background()
	st := openTestStore(t, ctx)
	src := testSource(t, "claude-code", 10, 4096)

	broken := filepath.Join(src.Path, "bad.jsonl")
	healthy := filepath.Join(src.Path, "good.jsonl")

	discovered := []parser.SourceFile{{Path: broken}, {Path: healthy}}
	scripted := map[string]parseResult{
		broken:  {err: errParseBoom},
		healthy: {events: []wire.TurnEvent{turnEvent(100, "ok")}, newOffset: 200},
	}
	fake := newFakeParser("claude-code", discovered, scripted)
	snd := acceptingSender(t, []Result{{Accepted: 1}})

	if err := RunOnce(ctx, testConfig(), src, fake, st, snd); err == nil {
		t.Fatal("expected RunOnce to return collected parse error")
	}

	assertOffset(t, ctx, st, "claude-code", broken, 0)
	assertOffset(t, ctx, st, "claude-code", healthy, 200)
}
// TestRunDaemon_ExitsWhenContextCanceled starts RunDaemon in a goroutine with a
// single source whose PollInterval is one hour, cancels the context right
// away, and requires RunDaemon to return a nil error within two seconds.
func TestRunDaemon_ExitsWhenContextCanceled(t *testing.T) {
	const exitDeadline = 2 * time.Second

	ctx, cancel := context.WithCancel(context.Background())
	// The store is opened with its own background context, not the one that is
	// about to be canceled.
	st := openTestStore(t, context.Background())

	src := testSource(t, "claude-code", 10, 4096)
	src.PollInterval = time.Hour

	fake := newFakeParser("claude-code", nil, nil)
	snd := acceptingSender(t, nil)

	daemonCfg := testConfig()
	daemonCfg.Sources = []config.SourceConfig{src}
	parsers := map[string]parser.Parser{"claude-code": fake}

	// Buffered so the goroutine can deliver its result even if the test has
	// already given up waiting.
	exited := make(chan error, 1)
	go func() {
		exited <- RunDaemon(ctx, daemonCfg, parsers, st, snd)
	}()

	cancel()

	select {
	case <-time.After(exitDeadline):
		t.Fatal("RunDaemon did not exit after context cancellation")
	case err := <-exited:
		if err != nil {
			t.Fatalf("RunDaemon: %v", err)
		}
	}
}
// parseResult is the canned outcome that fakeParser.Parse returns for one path.
type parseResult struct {
// events is returned as the parsed events when err is nil.
events []wire.TurnEvent
// newOffset is returned as the second Parse result when err is nil.
newOffset int64
// err, when non-nil, makes Parse return it together with nil events and the
// caller's own since value instead of newOffset.
err error
}
// fakeParser is a test double whose Tool, Discover and Parse methods answer
// from the fixed data stored in its fields.
type fakeParser struct {
// tool is the value reported by Tool.
tool string
// files is returned by Discover regardless of the root passed in.
files []parser.SourceFile
// results maps a file path to the canned result that Parse returns for it.
results map[string]parseResult
}
// newFakeParser builds a fakeParser that reports the given tool name, returns
// files from Discover, and answers Parse from results keyed by file path.
func newFakeParser(tool string, files []parser.SourceFile, results map[string]parseResult) *fakeParser {
	fp := new(fakeParser)
	fp.tool = tool
	fp.files = files
	fp.results = results
	return fp
}
// Tool returns the tool name this fake was constructed with.
func (p *fakeParser) Tool() string {
	name := p.tool
	return name
}
// Discover ignores root and always returns the preconfigured file list with a
// nil error.
func (p *fakeParser) Discover(root string) ([]parser.SourceFile, error) {
	found := p.files
	return found, nil
}
// Parse looks up the canned result for path. If that result carries an error,
// Parse returns nil events, the caller's since offset unchanged, and the error.
// Otherwise it returns the canned events and new offset. A path with no entry
// yields the zero parseResult: nil events, offset 0 and a nil error.
func (p *fakeParser) Parse(path string, since int64) ([]wire.TurnEvent, int64, error) {
	scripted := p.results[path]
	events, next, err := scripted.events, scripted.newOffset, scripted.err
	if err != nil {
		return nil, since, err
	}
	return events, next, nil
}
// roundTripFunc lets an ordinary function be used wherever an
// http.RoundTripper is required, such as http.Client.Transport.
type roundTripFunc func(*http.Request) (*http.Response, error)

// RoundTrip implements http.RoundTripper by calling the function itself with
// req and returning its results unchanged.
func (fn roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	return fn(req)
}
// acceptingSender returns a Sender pointed at an httptest server that answers
// each request with the next entry of results, encoded with resultJSON. Once
// results is exhausted (or when it is nil), every further request gets a zero
// Result. The server is closed through t.Cleanup.
func acceptingSender(t *testing.T, results []Result) *Sender {
	t.Helper()

	var (
		mu   sync.Mutex
		next int // index of the next scripted result to serve
	)

	handler := func(w http.ResponseWriter, r *http.Request) {
		// Drain the request body; its content is not inspected and read
		// errors are deliberately ignored in this fake.
		_, _ = io.Copy(io.Discard, r.Body)

		mu.Lock()
		defer mu.Unlock()

		var reply Result
		if next < len(results) {
			reply = results[next]
			next++
		}

		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(resultJSON(reply)))
	}

	srv := httptest.NewServer(http.HandlerFunc(handler))
	t.Cleanup(srv.Close)

	return NewSender(srv.URL, srv.Client())
}
// failingSender returns a Sender whose HTTP client transport fails every
// request with errPostBoom, so no network traffic is attempted.
func failingSender(t *testing.T) *Sender {
	t.Helper()

	var alwaysFail roundTripFunc = func(*http.Request) (*http.Response, error) {
		return nil, errPostBoom
	}
	return NewSender("http://127.0.0.1", &http.Client{Transport: alwaysFail})
}