package ingest
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)
func TestEncodeNDJSON_EmitsOneObjectPerLineWithTrailingNewline(t *testing.T) {
events := []wire.TurnEvent{
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"},
}
data, err := EncodeNDJSON(events)
if err != nil {
t.Fatalf("EncodeNDJSON: %v", err)
}
lines := strings.Split(string(data), "\n")
// Trailing newline means last element is empty.
if len(lines) != 3 {
t.Fatalf("expected 3 lines (2 events + trailing empty), got %d", len(lines))
}
if lines[2] != "" {
t.Errorf("expected trailing empty line, got %q", lines[2])
}
for i, line := range lines[:2] {
if line == "" {
t.Fatalf("line %d is empty", i)
}
var ev wire.TurnEvent
if err := json.Unmarshal([]byte(line), &ev); err != nil {
t.Fatalf("line %d invalid JSON: %v", i, err)
}
}
}
func TestEncodeNDJSON_EmptySlice(t *testing.T) {
data, err := EncodeNDJSON([]wire.TurnEvent{})
if err != nil {
t.Fatalf("EncodeNDJSON: %v", err)
}
if len(data) != 0 {
t.Errorf("expected empty bytes for empty slice, got %q", string(data))
}
}
func TestSender_PostBatch_Success(t *testing.T) {
var gotBody []byte
var gotContentType string
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/api/v1/ingest" {
t.Errorf("expected path /api/v1/ingest, got %s", r.URL.Path)
}
gotContentType = r.Header.Get("Content-Type")
var err error
gotBody, err = io.ReadAll(r.Body)
if err != nil {
t.Fatalf("read body: %v", err)
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"accepted":2,"errors":[]}`))
}))
defer ts.Close()
sender := NewSender(ts.URL, http.DefaultClient)
events := []wire.TurnEvent{
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"},
}
result, err := sender.PostBatch(context.Background(), events)
if err != nil {
t.Fatalf("PostBatch: %v", err)
}
if result.Accepted != 2 {
t.Errorf("Accepted = %d, want 2", result.Accepted)
}
if len(result.Errors) != 0 {
t.Errorf("Errors = %v, want empty", result.Errors)
}
if gotContentType != "application/x-ndjson" {
t.Errorf("Content-Type = %q, want application/x-ndjson", gotContentType)
}
// Verify body is valid NDJSON.
lines := strings.Split(string(gotBody), "\n")
if len(lines) != 3 || lines[2] != "" {
t.Errorf("body does not look like NDJSON with trailing newline: %q", string(gotBody))
}
}
func TestSender_PostBatch_TrailingSlashURL(t *testing.T) {
var gotPath string
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotPath = r.URL.Path
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"accepted":1,"errors":[]}`))
}))
defer ts.Close()
sender := NewSender(ts.URL+"/", http.DefaultClient)
_, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
})
if err != nil {
t.Fatalf("PostBatch: %v", err)
}
if gotPath != "/api/v1/ingest" {
t.Errorf("expected path /api/v1/ingest, got %s", gotPath)
}
}
func TestSender_PostBatch_Non2xx(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(`busy`))
}))
defer ts.Close()
sender := NewSender(ts.URL, http.DefaultClient)
_, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
})
if err == nil {
t.Fatal("expected error on 503, got nil")
}
}
func TestSender_PostBatch_MalformedResponse(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`not json`))
}))
defer ts.Close()
sender := NewSender(ts.URL, http.DefaultClient)
_, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
})
if err == nil {
t.Fatal("expected error on malformed response, got nil")
}
}
func TestSender_PostBatch_PartialAccept(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"accepted":1,"errors":[{"line":2,"error":"bad row"}]}`))
}))
defer ts.Close()
sender := NewSender(ts.URL, http.DefaultClient)
result, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"},
})
if err != nil {
t.Fatalf("PostBatch: %v", err)
}
if result.Accepted != 1 {
t.Errorf("Accepted = %d, want 1", result.Accepted)
}
if len(result.Errors) != 1 || result.Errors[0].Line != 2 || result.Errors[0].Err != "bad row" {
t.Errorf("Errors = %+v, want one LineError{Line:2, Err:\"bad row\"}", result.Errors)
}
}
func TestSender_PostBatch_StructuredErrors(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"accepted":0,"errors":[{"line":1,"error":"bad"}]}`))
}))
defer ts.Close()
sender := NewSender(ts.URL, http.DefaultClient)
result, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
})
if err != nil {
t.Fatalf("PostBatch: %v", err)
}
if result.Accepted != 0 {
t.Errorf("Accepted = %d, want 0", result.Accepted)
}
if len(result.Errors) != 1 {
t.Fatalf("Errors = %+v, want 1 error", result.Errors)
}
if result.Errors[0].Line != 1 || result.Errors[0].Err != "bad" {
t.Errorf("Errors[0] = %+v, want Line=1 Err=bad", result.Errors[0])
}
}