package ingest import ( "context" "net/http" "net/http/httptest" "testing" "sourcecraft.dev/bigbes/lethe/internal/collector/state" ) func TestReplayOutbox_DeletesOnlyFullyAcceptedRows(t *testing.T) { ctx := context.Background() dir := t.TempDir() store, err := state.Open(ctx, dir+"/test.db") if err != nil { t.Fatalf("Open: %v", err) } defer func() { _ = store.Close() }() // Enqueue two items. item1 := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1} `)} item2 := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/b.jsonl", Payload: []byte(`{"seq":2} `)} if err := store.Enqueue(ctx, item1); err != nil { t.Fatalf("Enqueue 1: %v", err) } if err := store.Enqueue(ctx, item2); err != nil { t.Fatalf("Enqueue 2: %v", err) } // Server accepts all rows. callCount := 0 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { callCount++ w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"accepted":1,"errors":[]}`)) })) defer ts.Close() sender := NewSender(ts.URL, http.DefaultClient) if err := ReplayOutbox(ctx, store, sender, 10); err != nil { t.Fatalf("ReplayOutbox: %v", err) } if callCount != 2 { t.Errorf("expected 2 server calls, got %d", callCount) } rows, err := store.Oldest(ctx, 10) if err != nil { t.Fatalf("Oldest: %v", err) } if len(rows) != 0 { t.Errorf("expected 0 rows after full replay, got %d", len(rows)) } } func TestReplayOutbox_LeavesRowOnPartialAccept(t *testing.T) { ctx := context.Background() dir := t.TempDir() store, err := state.Open(ctx, dir+"/test.db") if err != nil { t.Fatalf("Open: %v", err) } defer func() { _ = store.Close() }() item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1} {"seq":2} `)} if err := store.Enqueue(ctx, item); err != nil { t.Fatalf("Enqueue: %v", err) } ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"accepted":1,"errors":["bad row"]}`)) })) defer ts.Close() sender := NewSender(ts.URL, http.DefaultClient) if err := ReplayOutbox(ctx, store, sender, 10); err != nil { t.Fatalf("ReplayOutbox: %v", err) } rows, err := store.Oldest(ctx, 10) if err != nil { t.Fatalf("Oldest: %v", err) } if len(rows) != 1 { t.Errorf("expected 1 row after partial accept, got %d", len(rows)) } } func TestReplayOutbox_LeavesRowOnError(t *testing.T) { ctx := context.Background() dir := t.TempDir() store, err := state.Open(ctx, dir+"/test.db") if err != nil { t.Fatalf("Open: %v", err) } defer func() { _ = store.Close() }() item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1} `)} if err := store.Enqueue(ctx, item); err != nil { t.Fatalf("Enqueue: %v", err) } ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) _, _ = w.Write([]byte(`boom`)) })) defer ts.Close() sender := NewSender(ts.URL, http.DefaultClient) // Expect error to propagate. if err := ReplayOutbox(ctx, store, sender, 10); err == nil { t.Fatal("expected error on 500, got nil") } rows, err := store.Oldest(ctx, 10) if err != nil { t.Fatalf("Oldest: %v", err) } if len(rows) != 1 { t.Errorf("expected 1 row after error, got %d", len(rows)) } } func TestEnforceOutboxLimit_DropsOldestRows(t *testing.T) { ctx := context.Background() dir := t.TempDir() store, err := state.Open(ctx, dir+"/test.db") if err != nil { t.Fatalf("Open: %v", err) } defer func() { _ = store.Close() }() // Enqueue three items with known sizes. items := []state.OutboxItem{ {Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}`)}, // 9 bytes {Tool: "cc", Host: "h", SourceFile: "/b.jsonl", Payload: []byte(`{"seq":2}`)}, // 9 bytes {Tool: "cc", Host: "h", SourceFile: "/c.jsonl", Payload: []byte(`{"seq":333}`)}, // 11 bytes } for _, it := range items { if err := store.Enqueue(ctx, it); err != nil { t.Fatalf("Enqueue: %v", err) } } // Total = 29 bytes. Cap at 20 => drop oldest until under 20. if err := EnforceOutboxLimit(ctx, store, 20); err != nil { t.Fatalf("EnforceOutboxLimit: %v", err) } rows, err := store.Oldest(ctx, 10) if err != nil { t.Fatalf("Oldest: %v", err) } // Should have dropped the first row (9 bytes), leaving 20 bytes exactly. if len(rows) != 2 { t.Errorf("expected 2 rows, got %d", len(rows)) } // Verify the oldest remaining row is the second item. if len(rows) > 0 && string(rows[0].Payload) != `{"seq":2}` { t.Errorf("oldest remaining payload = %q, want {\"seq\":2}", string(rows[0].Payload)) } // Verify stats are under limit. st, err := store.Stats(ctx) if err != nil { t.Fatalf("Stats: %v", err) } if st.OutboxBytes > 20 { t.Errorf("OutboxBytes = %d, want <= 20", st.OutboxBytes) } } func TestEnforceOutboxLimit_NoOpWhenUnderLimit(t *testing.T) { ctx := context.Background() dir := t.TempDir() store, err := state.Open(ctx, dir+"/test.db") if err != nil { t.Fatalf("Open: %v", err) } defer func() { _ = store.Close() }() item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`x`)} if err := store.Enqueue(ctx, item); err != nil { t.Fatalf("Enqueue: %v", err) } if err := EnforceOutboxLimit(ctx, store, 1000); err != nil { t.Fatalf("EnforceOutboxLimit: %v", err) } rows, err := store.Oldest(ctx, 10) if err != nil { t.Fatalf("Oldest: %v", err) } if len(rows) != 1 { t.Errorf("expected 1 row, got %d", len(rows)) } } func TestEnforceOutboxLimit_EmptyOutbox(t *testing.T) { ctx := context.Background() dir := t.TempDir() store, err := state.Open(ctx, dir+"/test.db") if err != nil { t.Fatalf("Open: %v", err) } defer func() { _ = store.Close() }() if err := EnforceOutboxLimit(ctx, store, 100); err != nil { t.Fatalf("EnforceOutboxLimit: %v", err) } }