package ingest
import (
"bytes"
"context"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
"testing"
"sourcecraft.dev/bigbes/lethe/internal/collector/state"
)
func TestReplayOutbox_DeletesOnlyFullyAcceptedRows(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
store, err := state.Open(ctx, dir+"/test.db")
if err != nil {
t.Fatalf("Open: %v", err)
}
defer func() { _ = store.Close() }()
// Enqueue two items.
item1 := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}
`)}
item2 := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/b.jsonl", Payload: []byte(`{"seq":2}
`)}
if err := store.Enqueue(ctx, item1); err != nil {
t.Fatalf("Enqueue 1: %v", err)
}
if err := store.Enqueue(ctx, item2); err != nil {
t.Fatalf("Enqueue 2: %v", err)
}
// Server accepts all rows.
callCount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
callCount++
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"accepted":1,"errors":[]}`))
}))
defer ts.Close()
sender := NewSender(ts.URL, http.DefaultClient)
if err := ReplayOutbox(ctx, store, sender, 10); err != nil {
t.Fatalf("ReplayOutbox: %v", err)
}
if callCount != 2 {
t.Errorf("expected 2 server calls, got %d", callCount)
}
rows, err := store.Oldest(ctx, 10)
if err != nil {
t.Fatalf("Oldest: %v", err)
}
if len(rows) != 0 {
t.Errorf("expected 0 rows after full replay, got %d", len(rows))
}
}
func TestReplayOutbox_LeavesRowOnPartialAccept(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
store, err := state.Open(ctx, dir+"/test.db")
if err != nil {
t.Fatalf("Open: %v", err)
}
defer func() { _ = store.Close() }()
item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}
{"seq":2}
`)}
if err := store.Enqueue(ctx, item); err != nil {
t.Fatalf("Enqueue: %v", err)
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"accepted":1,"errors":[{"line":2,"error":"bad row"}]}`))
}))
defer ts.Close()
sender := NewSender(ts.URL, http.DefaultClient)
if err := ReplayOutbox(ctx, store, sender, 10); err != nil {
t.Fatalf("ReplayOutbox: %v", err)
}
rows, err := store.Oldest(ctx, 10)
if err != nil {
t.Fatalf("Oldest: %v", err)
}
if len(rows) != 1 {
t.Errorf("expected 1 row after partial accept, got %d", len(rows))
}
}
func TestReplayOutbox_LeavesRowOnError(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
store, err := state.Open(ctx, dir+"/test.db")
if err != nil {
t.Fatalf("Open: %v", err)
}
defer func() { _ = store.Close() }()
item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}
`)}
if err := store.Enqueue(ctx, item); err != nil {
t.Fatalf("Enqueue: %v", err)
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`boom`))
}))
defer ts.Close()
sender := NewSender(ts.URL, http.DefaultClient)
// Expect error to propagate.
if err := ReplayOutbox(ctx, store, sender, 10); err == nil {
t.Fatal("expected error on 500, got nil")
}
rows, err := store.Oldest(ctx, 10)
if err != nil {
t.Fatalf("Oldest: %v", err)
}
if len(rows) != 1 {
t.Errorf("expected 1 row after error, got %d", len(rows))
}
}
func TestEnforceOutboxLimit_DropsOldestRows(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
store, err := state.Open(ctx, dir+"/test.db")
if err != nil {
t.Fatalf("Open: %v", err)
}
defer func() { _ = store.Close() }()
// Enqueue three items with known sizes.
items := []state.OutboxItem{
{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}`)}, // 9 bytes
{Tool: "cc", Host: "h", SourceFile: "/b.jsonl", Payload: []byte(`{"seq":2}`)}, // 9 bytes
{Tool: "cc", Host: "h", SourceFile: "/c.jsonl", Payload: []byte(`{"seq":333}`)}, // 11 bytes
}
for _, it := range items {
if err := store.Enqueue(ctx, it); err != nil {
t.Fatalf("Enqueue: %v", err)
}
}
// Total = 29 bytes. Cap at 20 => drop oldest until under 20.
if err := EnforceOutboxLimit(ctx, store, 20); err != nil {
t.Fatalf("EnforceOutboxLimit: %v", err)
}
rows, err := store.Oldest(ctx, 10)
if err != nil {
t.Fatalf("Oldest: %v", err)
}
// Should have dropped the first row (9 bytes), leaving 20 bytes exactly.
if len(rows) != 2 {
t.Errorf("expected 2 rows, got %d", len(rows))
}
// Verify the oldest remaining row is the second item.
if len(rows) > 0 && string(rows[0].Payload) != `{"seq":2}` {
t.Errorf("oldest remaining payload = %q, want {\"seq\":2}", string(rows[0].Payload))
}
// Verify stats are under limit.
st, err := store.Stats(ctx)
if err != nil {
t.Fatalf("Stats: %v", err)
}
if st.OutboxBytes > 20 {
t.Errorf("OutboxBytes = %d, want <= 20", st.OutboxBytes)
}
}
func TestEnforceOutboxLimit_NoOpWhenUnderLimit(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
store, err := state.Open(ctx, dir+"/test.db")
if err != nil {
t.Fatalf("Open: %v", err)
}
defer func() { _ = store.Close() }()
item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`x`)}
if err := store.Enqueue(ctx, item); err != nil {
t.Fatalf("Enqueue: %v", err)
}
if err := EnforceOutboxLimit(ctx, store, 1000); err != nil {
t.Fatalf("EnforceOutboxLimit: %v", err)
}
rows, err := store.Oldest(ctx, 10)
if err != nil {
t.Fatalf("Oldest: %v", err)
}
if len(rows) != 1 {
t.Errorf("expected 1 row, got %d", len(rows))
}
}
func TestEnforceOutboxLimit_LogsWarningWhenRowsDropped(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
store, err := state.Open(ctx, dir+"/test.db")
if err != nil {
t.Fatalf("Open: %v", err)
}
defer func() { _ = store.Close() }()
items := []state.OutboxItem{
{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte(`{"seq":1}`)}, // 9 bytes
{Tool: "cc", Host: "h", SourceFile: "/b.jsonl", Payload: []byte(`{"seq":2}`)}, // 9 bytes
{Tool: "cc", Host: "h", SourceFile: "/c.jsonl", Payload: []byte(`{"seq":333}`)}, // 11 bytes
}
for _, it := range items {
if err := store.Enqueue(ctx, it); err != nil {
t.Fatalf("Enqueue: %v", err)
}
}
oldLogger := slog.Default()
defer slog.SetDefault(oldLogger)
var buf bytes.Buffer
slog.SetDefault(slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelWarn})))
if err := EnforceOutboxLimit(ctx, store, 20); err != nil {
t.Fatalf("EnforceOutboxLimit: %v", err)
}
logOutput := buf.String()
if !strings.Contains(logOutput, "outbox overflow") {
t.Errorf("expected WARN log containing 'outbox overflow', got %q", logOutput)
}
if !strings.Contains(logOutput, "dropped_rows") {
t.Errorf("expected WARN log containing 'dropped_rows', got %q", logOutput)
}
if !strings.Contains(logOutput, "dropped_bytes") {
t.Errorf("expected WARN log containing 'dropped_bytes', got %q", logOutput)
}
}
func TestEnforceOutboxLimit_EmptyOutbox(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
store, err := state.Open(ctx, dir+"/test.db")
if err != nil {
t.Fatalf("Open: %v", err)
}
defer func() { _ = store.Close() }()
if err := EnforceOutboxLimit(ctx, store, 100); err != nil {
t.Fatalf("EnforceOutboxLimit: %v", err)
}
}
func TestEnforceOutboxLimit_RepeatedTrimming(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
store, err := state.Open(ctx, dir+"/test.db")
if err != nil {
t.Fatalf("Open: %v", err)
}
defer func() { _ = store.Close() }()
// Enqueue more than 1000 items so a single batch is insufficient.
for i := 0; i < 1002; i++ {
item := state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte("x")}
if err := store.Enqueue(ctx, item); err != nil {
t.Fatalf("Enqueue %d: %v", i, err)
}
}
if err := EnforceOutboxLimit(ctx, store, 0); err != nil {
t.Fatalf("EnforceOutboxLimit: %v", err)
}
st, err := store.Stats(ctx)
if err != nil {
t.Fatalf("Stats: %v", err)
}
if st.OutboxBytes > 0 {
t.Errorf("OutboxBytes = %d, want 0", st.OutboxBytes)
}
if st.OutboxCount != 0 {
t.Errorf("OutboxCount = %d, want 0", st.OutboxCount)
}
}