package ingest import ( "context" "encoding/json" "io" "net/http" "net/http/httptest" "strings" "testing" "sourcecraft.dev/bigbes/lethe/internal/shared/wire" ) func TestEncodeNDJSON_EmitsOneObjectPerLineWithTrailingNewline(t *testing.T) { events := []wire.TurnEvent{ {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"}, } data, err := EncodeNDJSON(events) if err != nil { t.Fatalf("EncodeNDJSON: %v", err) } lines := strings.Split(string(data), "\n") // Trailing newline means last element is empty. if len(lines) != 3 { t.Fatalf("expected 3 lines (2 events + trailing empty), got %d", len(lines)) } if lines[2] != "" { t.Errorf("expected trailing empty line, got %q", lines[2]) } for i, line := range lines[:2] { if line == "" { t.Fatalf("line %d is empty", i) } var ev wire.TurnEvent if err := json.Unmarshal([]byte(line), &ev); err != nil { t.Fatalf("line %d invalid JSON: %v", i, err) } } } func TestEncodeNDJSON_EmptySlice(t *testing.T) { data, err := EncodeNDJSON([]wire.TurnEvent{}) if err != nil { t.Fatalf("EncodeNDJSON: %v", err) } if len(data) != 0 { t.Errorf("expected empty bytes for empty slice, got %q", string(data)) } } func TestSender_PostBatch_Success(t *testing.T) { var gotBody []byte var gotContentType string ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { t.Errorf("expected POST, got %s", r.Method) } if r.URL.Path != "/api/v1/ingest" { t.Errorf("expected path /api/v1/ingest, got %s", r.URL.Path) } gotContentType = r.Header.Get("Content-Type") var err error gotBody, err = io.ReadAll(r.Body) if err != nil { t.Fatalf("read body: %v", err) } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"accepted":2,"errors":[]}`)) })) defer ts.Close() sender := NewSender(ts.URL, http.DefaultClient) events := []wire.TurnEvent{ {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"}, } result, err := sender.PostBatch(context.Background(), events) if err != nil { t.Fatalf("PostBatch: %v", err) } if result.Accepted != 2 { t.Errorf("Accepted = %d, want 2", result.Accepted) } if len(result.Errors) != 0 { t.Errorf("Errors = %v, want empty", result.Errors) } if gotContentType != "application/x-ndjson" { t.Errorf("Content-Type = %q, want application/x-ndjson", gotContentType) } // Verify body is valid NDJSON. lines := strings.Split(string(gotBody), "\n") if len(lines) != 3 || lines[2] != "" { t.Errorf("body does not look like NDJSON with trailing newline: %q", string(gotBody)) } } func TestSender_PostBatch_Non2xx(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) _, _ = w.Write([]byte(`busy`)) })) defer ts.Close() sender := NewSender(ts.URL, http.DefaultClient) _, err := sender.PostBatch(context.Background(), []wire.TurnEvent{ {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, }) if err == nil { t.Fatal("expected error on 503, got nil") } } func TestSender_PostBatch_MalformedResponse(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`not json`)) })) defer ts.Close() sender := NewSender(ts.URL, http.DefaultClient) _, err := sender.PostBatch(context.Background(), []wire.TurnEvent{ {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, }) if err == nil { t.Fatal("expected error on malformed response, got nil") } } func TestSender_PostBatch_PartialAccept(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"accepted":1,"errors":[{"line":2,"error":"bad row"}]}`)) })) defer ts.Close() sender := NewSender(ts.URL, http.DefaultClient) result, err := sender.PostBatch(context.Background(), []wire.TurnEvent{ {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"}, }) if err != nil { t.Fatalf("PostBatch: %v", err) } if result.Accepted != 1 { t.Errorf("Accepted = %d, want 1", result.Accepted) } if len(result.Errors) != 1 || result.Errors[0].Line != 2 || result.Errors[0].Err != "bad row" { t.Errorf("Errors = %+v, want one LineError{Line:2, Err:\"bad row\"}", result.Errors) } } func TestSender_PostBatch_StructuredErrors(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"accepted":0,"errors":[{"line":1,"error":"bad"}]}`)) })) defer ts.Close() sender := NewSender(ts.URL, http.DefaultClient) result, err := sender.PostBatch(context.Background(), []wire.TurnEvent{ {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, }) if err != nil { t.Fatalf("PostBatch: %v", err) } if result.Accepted != 0 { t.Errorf("Accepted = %d, want 0", result.Accepted) } if len(result.Errors) != 1 { t.Fatalf("Errors = %+v, want 1 error", result.Errors) } if result.Errors[0].Line != 1 || result.Errors[0].Err != "bad" { t.Errorf("Errors[0] = %+v, want Line=1 Err=bad", result.Errors[0]) } }