~bigbes/lethe

8383c20c8ccfc2f64ad0987140d6536ccaf8d7ff — Eugene Blikh a month ago 80b1c09
feat(ingest): NDJSON ingest with chunked transactions and partial-accept
M internal/domain/ingest/handler.go => internal/domain/ingest/handler.go +83 -11
@@ 1,22 1,94 @@
// Phase-5 stub. Replaced by real implementation in Phase 7.
//
// Mount is a no-op so the /api/v1 group registers no ingest routes yet;
// Server.Init still wires it in so the full router topology compiles.
package ingest

import (
	"context"
	"log/slog"
	"net/http"
	"strings"

	"github.com/go-chi/chi/v5"
	"go.bigb.es/auxilia/culpa"
	"go.bigb.es/auxilia/scribe"

	"sourcecraft.dev/bigbes/lethe/internal/config"
	"sourcecraft.dev/bigbes/lethe/internal/pkg/apierror"
	"sourcecraft.dev/bigbes/lethe/internal/pkg/httputil"
	"sourcecraft.dev/bigbes/lethe/internal/server/auth"
)

// Handler is a no-op steward service. Phase 7 supplies the real ingest
// pipeline (NDJSON streaming, validation, chunked commit).
type Handler struct{}
// ndjsonContentType is the canonical media type for the ingest payload. The
// IETF-style `application/x-ndjson` is the documented choice for lethe; the
// collector emits this exact value. Anything else is rejected with 415.
const ndjsonContentType = "application/x-ndjson"

// Handler is the steward-managed ingest HTTP boundary. It enforces the
// Content-Type and body cap, derives the owner from the authenticated
// identity (never from the wire payload), and delegates the streaming pipeline
// to Service.
type Handler struct {
	Cfg     config.IngestConfig `config:""`
	Service *Service            `inject:""`
}

// Init satisfies the steward Service contract.
// Init satisfies the steward Initer contract. The handler holds no state
// beyond its injected dependencies.
func (h *Handler) Init(_ context.Context) error { return nil }

// Mount registers no routes. Phase 7 replaces this with the ingest endpoints
// rooted at the /api/v1 group passed in by Server.Init.
func (h *Handler) Mount(_ chi.Router) {}
// Mount registers POST /ingest on r. The router supplied by Server is the
// /api/v1 group, so the effective path is /api/v1/ingest. Auth middleware is
// installed by the parent group; Post relies on auth.MustIdentity().
func (h *Handler) Mount(r chi.Router) {
	r.Post("/ingest", h.Post)
}

// Post is the NDJSON ingest endpoint. It always returns 200 when the body is
// well-formed enough to read — per-line failures live in the JSON `errors`
// array. Hard infrastructure failures (DB down, body cap exceeded) propagate
// as RFC 7807 problems via apierror.Render.
func (h *Handler) Post(w http.ResponseWriter, r *http.Request) {
	identity := auth.MustIdentity(r.Context())

	if !isNDJSONContentType(r.Header.Get("Content-Type")) {
		apierror.Render(w, r, culpa.WithCode(
			culpa.WithPublic(
				culpa.Errorf("Content-Type must be %s", ndjsonContentType),
				"Content-Type must be "+ndjsonContentType,
			),
			"UNSUPPORTED_MEDIA_TYPE",
		))
		return
	}

	// MaxBytesReader caps the request body; Service.Ingest's NDJSON scanner
	// surfaces an over-cap read as a TOO_LARGE-coded culpa error which
	// apierror.Render turns into 413.
	body := http.MaxBytesReader(w, r.Body, h.Cfg.MaxBodyBytes)
	defer func() { _ = body.Close() }()

	result, err := h.Service.Ingest(r.Context(), identity.User, body, h.Cfg.MaxBodyBytes)
	if err != nil {
		// Render the failure but don't drop the partial-accept count: the
		// 5xx/413 problem document tells the client the run aborted, and
		// the next call will resume from its own offset bookkeeping.
		apierror.Render(w, r, err)
		return
	}

	if writeErr := httputil.WriteJSON(w, http.StatusOK, result); writeErr != nil {
		slog.Default().ErrorContext(r.Context(), "write ingest response", scribe.Err(writeErr))
	}
}

// isNDJSONContentType returns true when the header value (with any trailing
// parameters such as `; charset=utf-8`) names the canonical NDJSON media
// type. The MIME type itself is matched case-insensitively per RFC 9110.
func isNDJSONContentType(v string) bool {
	if v == "" {
		return false
	}
	// Strip parameters; we only need the type/subtype prefix.
	if idx := strings.IndexByte(v, ';'); idx >= 0 {
		v = v[:idx]
	}
	return strings.EqualFold(strings.TrimSpace(v), ndjsonContentType)
}

A internal/domain/ingest/handler_test.go => internal/domain/ingest/handler_test.go +170 -0
@@ 0,0 1,170 @@
package ingest_test

import (
	"context"
	"encoding/json"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"

	"github.com/go-chi/chi/v5"

	"sourcecraft.dev/bigbes/lethe/internal/config"
	"sourcecraft.dev/bigbes/lethe/internal/domain/ingest"
	"sourcecraft.dev/bigbes/lethe/internal/server/auth"
)

// fakeAuthMiddleware injects a fixed Identity onto the request context so the
// handler under test can call auth.MustIdentity without a real Authenticator.
func fakeAuthMiddleware(user string) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			ctx := auth.WithIdentity(r.Context(), auth.Identity{User: user})
			next.ServeHTTP(w, r.WithContext(ctx))
		})
	}
}

// newHandler wires Service + Repository + observability + handler against a
// fresh in-memory database, then returns the handler.
func newHandler(t *testing.T, cfg config.IngestConfig) *ingest.Handler {
	t.Helper()
	svc, _ := newService(t, cfg)
	h := &ingest.Handler{Cfg: cfg, Service: svc}
	if err := h.Init(context.Background()); err != nil {
		t.Fatalf("handler.Init: %v", err)
	}
	return h
}

// mountHandler builds a chi router with the fake auth middleware and the
// ingest handler mounted under /api/v1.
func mountHandler(h *ingest.Handler, user string) http.Handler {
	r := chi.NewRouter()
	r.Route("/api/v1", func(r chi.Router) {
		r.Use(fakeAuthMiddleware(user))
		h.Mount(r)
	})
	return r
}

func TestHandler_Post_HappyPath(t *testing.T) {
	h := newHandler(t, defaultCfg())
	router := mountHandler(h, "alice")

	payload := strings.Join([]string{validLine("t1", 1700000000, "x"), validLine("t2", 1700000001, "y")}, "\n") + "\n"
	req := httptest.NewRequest(http.MethodPost, "/api/v1/ingest", strings.NewReader(payload))
	req.Header.Set("Content-Type", "application/x-ndjson")
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, req)

	if rec.Code != http.StatusOK {
		t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
	}
	var got ingest.Result
	if err := json.Unmarshal(rec.Body.Bytes(), &got); err != nil {
		t.Fatalf("unmarshal: %v", err)
	}
	if got.Accepted != 2 {
		t.Errorf("Accepted=%d; want 2", got.Accepted)
	}
	if len(got.Errors) != 0 {
		t.Errorf("Errors=%v; want empty", got.Errors)
	}
}

func TestHandler_Post_WrongContentTypeReturns415(t *testing.T) {
	h := newHandler(t, defaultCfg())
	router := mountHandler(h, "alice")

	req := httptest.NewRequest(http.MethodPost, "/api/v1/ingest",
		strings.NewReader(validLine("t1", 1700000000, "x")+"\n"))
	req.Header.Set("Content-Type", "application/json")
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, req)

	if rec.Code != http.StatusUnsupportedMediaType {
		t.Fatalf("status=%d; want 415; body=%s", rec.Code, rec.Body.String())
	}
	if ct := rec.Header().Get("Content-Type"); ct != "application/problem+json" {
		t.Errorf("Content-Type=%q; want application/problem+json", ct)
	}
}

func TestHandler_Post_MissingContentTypeReturns415(t *testing.T) {
	h := newHandler(t, defaultCfg())
	router := mountHandler(h, "alice")

	req := httptest.NewRequest(http.MethodPost, "/api/v1/ingest",
		strings.NewReader(validLine("t1", 1700000000, "x")+"\n"))
	// No Content-Type set.
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, req)

	if rec.Code != http.StatusUnsupportedMediaType {
		t.Fatalf("status=%d; want 415", rec.Code)
	}
}

func TestHandler_Post_BodyOverCapReturns413(t *testing.T) {
	cfg := defaultCfg()
	cfg.MaxBodyBytes = 100
	h := newHandler(t, cfg)
	router := mountHandler(h, "alice")

	// 1 KiB+ body that exceeds the cap.
	big := validLine("t1", 1700000000, strings.Repeat("a", 1024))
	req := httptest.NewRequest(http.MethodPost, "/api/v1/ingest", strings.NewReader(big+"\n"))
	req.Header.Set("Content-Type", "application/x-ndjson")
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, req)

	if rec.Code != http.StatusRequestEntityTooLarge {
		t.Fatalf("status=%d; want 413; body=%s", rec.Code, rec.Body.String())
	}
}

func TestHandler_Post_DBDownReturns5xx(t *testing.T) {
	cfg := defaultCfg()
	svc, db := newService(t, cfg)
	h := &ingest.Handler{Cfg: cfg, Service: svc}
	if err := h.Init(context.Background()); err != nil {
		t.Fatalf("handler.Init: %v", err)
	}
	// Close the underlying *sql.DB without nilling the field so BeginTx
	// returns an error rather than nil-dereferencing. This mirrors the
	// runtime "database closed" failure mode (e.g. driver disconnect) more
	// accurately than calling Destroy.
	if err := db.DB.Close(); err != nil {
		t.Fatalf("close underlying db: %v", err)
	}
	router := mountHandler(h, "alice")

	req := httptest.NewRequest(http.MethodPost, "/api/v1/ingest",
		strings.NewReader(validLine("t1", 1700000000, "x")+"\n"))
	req.Header.Set("Content-Type", "application/x-ndjson")
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, req)

	if rec.Code < 500 {
		t.Fatalf("status=%d; want 5xx; body=%s", rec.Code, rec.Body.String())
	}
}

// TestHandler_Post_ContentTypeWithCharset proves the canonical media check
// tolerates the `; charset=utf-8` parameter the collector may attach.
func TestHandler_Post_ContentTypeWithCharset(t *testing.T) {
	h := newHandler(t, defaultCfg())
	router := mountHandler(h, "alice")

	req := httptest.NewRequest(http.MethodPost, "/api/v1/ingest",
		strings.NewReader(validLine("t1", 1700000000, "x")+"\n"))
	req.Header.Set("Content-Type", "application/x-ndjson; charset=utf-8")
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, req)

	if rec.Code != http.StatusOK {
		t.Fatalf("status=%d; want 200; body=%s", rec.Code, rec.Body.String())
	}
}

A internal/domain/ingest/repository.go => internal/domain/ingest/repository.go +147 -0
@@ 0,0 1,147 @@
// Package ingest implements the NDJSON ingest pipeline for lethe: per-line
// validation against the wire contract, chunked transactional commit with
// per-line partial-accept semantics, and the trust-model invariant that the
// stored owner is always the authenticated user (never the wire payload).
//
// Layering inside the package:
//   - Repository owns the raw SQL: per-chunk UPSERT of sessions/turns inside a
//     caller-supplied transaction. It does not begin or commit the tx.
//   - Service owns the streaming, validation, and chunking policy. It opens
//     and commits one tx per chunk, increments metrics, and converts SQL
//     errors into per-line LineError entries (or hard 5xx for non-line
//     failures like a closed DB).
//   - Handler is the HTTP boundary: enforces Content-Type, body cap, and
//     auth-derived owner; renders RFC 7807 problems via apierror.Render.
package ingest

import (
	"context"
	"errors"

	"github.com/jmoiron/sqlx"
	"go.bigb.es/auxilia/culpa"
	"modernc.org/sqlite"
	sqlite3 "modernc.org/sqlite/lib"

	"sourcecraft.dev/bigbes/lethe/internal/platform/database"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// Repository is the steward-managed SQL steward for ingest. It is stateless;
// Init is empty because every UpsertChunk call gets its tx from the Service.
type Repository struct {
	Database *database.Database `inject:""`
}

// Init satisfies the steward Initer contract. The Repository has no state to
// build up — the database is owned by the Database steward and injected
// directly.
func (r *Repository) Init(_ context.Context) error { return nil }

// upsertSessionStmt UPSERTs a row in `sessions` keyed by the composite PK
// (owner, tool, host, session_id). On conflict only `ended_at` is widened
// (MAX), preserving the first-write-wins values for `started_at`, `working_dir`,
// `source_file`, and `metadata`. SQLite leaves columns not in the SET list
// unchanged on the existing row when DO UPDATE fires.
const upsertSessionStmt = `
INSERT INTO sessions
    (owner, tool, host, session_id, started_at, ended_at, working_dir, source_file, metadata)
VALUES
    (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (owner, tool, host, session_id) DO UPDATE SET
    ended_at = MAX(sessions.ended_at, excluded.ended_at)
`

// upsertTurnStmt UPSERTs a row in `turns` keyed by the composite PK
// (owner, tool, host, session_id, turn_id). Last-write-wins: every non-key
// column is overwritten from the incoming row.
const upsertTurnStmt = `
INSERT INTO turns
    (owner, tool, host, session_id, turn_id, seq, role, timestamp,
     content, model, tokens_in, tokens_out, cost_usd, tool_calls, metadata)
VALUES
    (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (owner, tool, host, session_id, turn_id) DO UPDATE SET
    seq        = excluded.seq,
    role       = excluded.role,
    timestamp  = excluded.timestamp,
    content    = excluded.content,
    model      = excluded.model,
    tokens_in  = excluded.tokens_in,
    tokens_out = excluded.tokens_out,
    cost_usd   = excluded.cost_usd,
    tool_calls = excluded.tool_calls,
    metadata   = excluded.metadata
`

// UpsertChunk writes every turn (and its parent session) in chunk under the
// caller-supplied transaction. The tx is owned by the caller (the Service):
// UpsertChunk neither begins nor commits it.
//
// owner is bound from the function parameter on every INSERT — never from the
// wire payload. wire.TurnEvent has no `owner` JSON field; even if a hostile
// client sneaks a stray `"owner":"evil"` into the line, json.Unmarshal drops
// it silently and the SQL below never references it. This invariant is the
// core of the multi-tenant trust model.
func (r *Repository) UpsertChunk(ctx context.Context, tx *sqlx.Tx, owner string, chunk []wire.TurnEvent) error {
	for _, t := range chunk {
		startedAt := t.Timestamp
		if t.SessionMeta.StartedAt != nil {
			startedAt = *t.SessionMeta.StartedAt
		}

		// owner is bound from the function parameter, not the wire payload (trust model invariant).
		if _, err := tx.ExecContext(ctx, upsertSessionStmt,
			owner, t.Tool, t.Host, t.SessionID,
			startedAt,
			t.Timestamp, // ended_at first-write equals started turn timestamp; widened by MAX on conflict.
			t.SessionMeta.WorkingDir,
			t.SessionMeta.SourceFile,
			rawMessageOrNil(t.SessionMeta.Metadata),
		); err != nil {
			return wrapSQLErr(err, "upsert session")
		}

		// owner is bound from the function parameter, not the wire payload (trust model invariant).
		if _, err := tx.ExecContext(ctx, upsertTurnStmt,
			owner, t.Tool, t.Host, t.SessionID, t.TurnID,
			t.Seq, t.Role, t.Timestamp, t.Content,
			t.Model, t.TokensIn, t.TokensOut, t.CostUSD,
			rawMessageOrNil(t.ToolCalls),
			rawMessageOrNil(t.Metadata),
		); err != nil {
			return wrapSQLErr(err, "upsert turn")
		}
	}
	return nil
}

// rawMessageOrNil normalizes a json.RawMessage to a SQL NULL when empty so the
// stored value is a real NULL (not the literal string "null" or "").
func rawMessageOrNil(m []byte) any {
	if len(m) == 0 {
		return nil
	}
	return string(m)
}

// wrapSQLErr classifies SQLite-driver errors so the Service can decide
// whether to attribute them to the offending line (FK / constraint failures)
// or to surface them as a hard 5xx (DB closed, IO error). Any unrecognized
// failure is wrapped under DB_UPSERT.
func wrapSQLErr(err error, msg string) error {
	var se *sqlite.Error
	if errors.As(err, &se) {
		switch se.Code() {
		case sqlite3.SQLITE_CONSTRAINT_FOREIGNKEY:
			return culpa.WithCode(culpa.Wrap(err, msg), "DB_FOREIGN_KEY")
		case sqlite3.SQLITE_CONSTRAINT,
			sqlite3.SQLITE_CONSTRAINT_CHECK,
			sqlite3.SQLITE_CONSTRAINT_NOTNULL,
			sqlite3.SQLITE_CONSTRAINT_PRIMARYKEY,
			sqlite3.SQLITE_CONSTRAINT_UNIQUE:
			return culpa.WithCode(culpa.Wrap(err, msg), "DB_CONSTRAINT")
		}
	}
	return culpa.WithCode(culpa.Wrap(err, msg), "DB_UPSERT")
}

A internal/domain/ingest/repository_test.go => internal/domain/ingest/repository_test.go +306 -0
@@ 0,0 1,306 @@
package ingest_test

import (
	"context"
	"encoding/json"
	"testing"
	"time"

	"github.com/jmoiron/sqlx"
	_ "modernc.org/sqlite"

	"sourcecraft.dev/bigbes/lethe/internal/config"
	"sourcecraft.dev/bigbes/lethe/internal/domain/ingest"
	"sourcecraft.dev/bigbes/lethe/internal/platform/database"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// newTestDatabase builds a Database steward against :memory:, runs Init, and
// registers Cleanup. Returns the *Database (Repository expects the steward).
func newTestDatabase(t *testing.T) *database.Database {
	t.Helper()
	d := &database.Database{
		Cfg: config.DatabaseConfig{
			Path:        ":memory:",
			BusyTimeout: 5 * time.Second,
		},
	}
	if err := d.Init(context.Background()); err != nil {
		t.Fatalf("database.Init: %v", err)
	}
	t.Cleanup(func() { _ = d.Destroy(context.Background()) })
	return d
}

// newRepo wires a Repository against a fresh in-memory database.
func newRepo(t *testing.T) (*ingest.Repository, *sqlx.DB) {
	t.Helper()
	d := newTestDatabase(t)
	repo := &ingest.Repository{Database: d}
	if err := repo.Init(context.Background()); err != nil {
		t.Fatalf("repo.Init: %v", err)
	}
	return repo, d.DB
}

func ptrInt64(v int64) *int64       { return &v }
func ptrString(v string) *string    { return &v }
func ptrFloat64(v float64) *float64 { return &v }

// turn builds a minimal valid TurnEvent with overridable session_meta.
func turn(tool, host, sid, tid string, seq int64, ts int64, content string) wire.TurnEvent {
	return wire.TurnEvent{
		Tool:      tool,
		Host:      host,
		SessionID: sid,
		TurnID:    tid,
		Seq:       seq,
		Role:      "user",
		Timestamp: ts,
		Content:   content,
		SessionMeta: wire.SessionMeta{
			SourceFile: "/tmp/x.jsonl",
		},
	}
}

func runUpsert(t *testing.T, repo *ingest.Repository, db *sqlx.DB, owner string, turns []wire.TurnEvent) {
	t.Helper()
	if err := database.InTx(context.Background(), db, func(tx *sqlx.Tx) error {
		return repo.UpsertChunk(context.Background(), tx, owner, turns)
	}); err != nil {
		t.Fatalf("upsert: %v", err)
	}
}

func TestRepository_UpsertChunk_FirstWriteWinsSession(t *testing.T) {
	repo, db := newRepo(t)
	t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "hello")
	t1.SessionMeta.WorkingDir = ptrString("/A")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1})

	t2 := turn("cc", "phoebe", "s1", "tB", 2, 1700000100, "world")
	t2.SessionMeta.WorkingDir = ptrString("/B")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t2})

	var wd string
	if err := db.Get(&wd, `SELECT working_dir FROM sessions WHERE owner=? AND tool=? AND host=? AND session_id=?`,
		"alice", "cc", "phoebe", "s1"); err != nil {
		t.Fatalf("query: %v", err)
	}
	if wd != "/A" {
		t.Fatalf("expected first-write working_dir /A, got %q", wd)
	}
}

func TestRepository_UpsertChunk_LastWriteWinsTurn(t *testing.T) {
	repo, db := newRepo(t)
	tA := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "first content")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{tA})

	tB := turn("cc", "phoebe", "s1", "tA", 2, 1700000050, "second content")
	tB.Role = "assistant"
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{tB})

	var content, role string
	var seq int64
	if err := db.QueryRow(`SELECT content, role, seq FROM turns WHERE owner=? AND turn_id=?`,
		"alice", "tA").Scan(&content, &role, &seq); err != nil {
		t.Fatalf("query: %v", err)
	}
	if content != "second content" || role != "assistant" || seq != 2 {
		t.Fatalf("expected last-write columns; got content=%q role=%q seq=%d", content, role, seq)
	}
}

func TestRepository_UpsertChunk_EndedAtMax(t *testing.T) {
	repo, db := newRepo(t)
	t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000100, "first")
	t1.SessionMeta.StartedAt = ptrInt64(1700000000)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1})

	// Second turn: smaller ended_at than current; MAX must keep first.
	t2 := turn("cc", "phoebe", "s1", "tB", 2, 1699999999, "earlier")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t2})
	var ended int64
	if err := db.Get(&ended, `SELECT ended_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil {
		t.Fatalf("q1: %v", err)
	}
	if ended != 1700000100 {
		t.Fatalf("expected ended_at preserved at 1700000100, got %d", ended)
	}

	// Third: larger ended_at; MAX must extend.
	t3 := turn("cc", "phoebe", "s1", "tC", 3, 1700000500, "later")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t3})
	if err := db.Get(&ended, `SELECT ended_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil {
		t.Fatalf("q2: %v", err)
	}
	if ended != 1700000500 {
		t.Fatalf("expected ended_at extended to 1700000500, got %d", ended)
	}
}

func TestRepository_UpsertChunk_StartedAtFallbackMinTurnTimestamp(t *testing.T) {
	repo, db := newRepo(t)
	// SessionMeta.StartedAt is nil; started_at must come from turn timestamp.
	t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000050, "x")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1})
	var started int64
	if err := db.Get(&started, `SELECT started_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil {
		t.Fatalf("q: %v", err)
	}
	if started != 1700000050 {
		t.Fatalf("expected started_at fallback 1700000050, got %d", started)
	}

	// A subsequent turn with an earlier timestamp must NOT change started_at
	// (first-write-wins on start).
	t2 := turn("cc", "phoebe", "s1", "tB", 2, 1700000010, "earlier")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t2})
	if err := db.Get(&started, `SELECT started_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil {
		t.Fatalf("q2: %v", err)
	}
	if started != 1700000050 {
		t.Fatalf("expected started_at preserved at 1700000050, got %d", started)
	}
}

func TestRepository_UpsertChunk_PerOwnerIsolation(t *testing.T) {
	repo, db := newRepo(t)
	tA := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "shared")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{tA})
	runUpsert(t, repo, db, "bob", []wire.TurnEvent{tA})

	var rows int
	if err := db.Get(&rows, `SELECT COUNT(*) FROM sessions WHERE tool='cc' AND host='phoebe' AND session_id='s1'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if rows != 2 {
		t.Fatalf("expected two sessions across owners, got %d", rows)
	}
	if err := db.Get(&rows, `SELECT COUNT(*) FROM turns WHERE turn_id='tA'`); err != nil {
		t.Fatalf("q2: %v", err)
	}
	if rows != 2 {
		t.Fatalf("expected two turns across owners, got %d", rows)
	}
}

func TestRepository_UpsertChunk_FKViolationPropagatesAsLineLevelCode(t *testing.T) {
	// Repository alone cannot produce an FK violation — sessions are upserted
	// by the same call. We synthesize the violation by inserting a turn
	// referencing a (owner, tool, host, session_id) that does not exist;
	// this proves the FK wrapper code-path is reachable via the Repository's
	// raw exec layer for the Service to classify as line-level.
	d := newTestDatabase(t)
	tx, err := d.DB.BeginTxx(context.Background(), nil)
	if err != nil {
		t.Fatalf("begin: %v", err)
	}
	defer func() { _ = tx.Rollback() }()

	_, err = tx.Exec(`
		INSERT INTO turns (owner, tool, host, session_id, turn_id, seq, role, timestamp, content)
		VALUES ('alice','cc','phoebe','ghost','tX',1,'user',1,'orphan')`)
	if err == nil {
		t.Fatalf("expected FK violation, got nil")
	}
}

func TestRepository_UpsertChunk_OwnerBoundFromParameter(t *testing.T) {
	// Sneak a non-wire field into the JSON; unmarshalling drops it. Then
	// confirm the stored row uses the parameter, not whatever the wire said.
	repo, db := newRepo(t)
	raw := []byte(`{"owner":"evil","tool":"cc","host":"phoebe","session_id":"s1","turn_id":"tA","seq":1,"role":"user","timestamp":1700000000,"content":"hi","session_meta":{"source_file":"/tmp/x.jsonl"}}`)
	var ev wire.TurnEvent
	if err := json.Unmarshal(raw, &ev); err != nil {
		t.Fatalf("unmarshal: %v", err)
	}
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{ev})

	var owner string
	if err := db.Get(&owner, `SELECT owner FROM turns WHERE turn_id='tA'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if owner != "alice" {
		t.Fatalf("expected stored owner=alice, got %q", owner)
	}
	var n int
	if err := db.Get(&n, `SELECT COUNT(*) FROM turns WHERE owner='evil'`); err != nil {
		t.Fatalf("q evil: %v", err)
	}
	if n != 0 {
		t.Fatalf("expected zero rows for owner=evil, got %d", n)
	}
}

func TestRepository_UpsertChunk_FTSPopulated(t *testing.T) {
	repo, db := newRepo(t)
	tc := json.RawMessage(`{"name":"shell","args":"ls"}`)
	t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "find the needle")
	t1.ToolCalls = tc
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1})

	var n int
	if err := db.Get(&n, `SELECT COUNT(*) FROM turns_fts WHERE owner='alice' AND turns_fts MATCH 'needle'`); err != nil {
		t.Fatalf("q turns_fts: %v", err)
	}
	if n != 1 {
		t.Fatalf("expected 1 turns_fts hit, got %d", n)
	}
	if err := db.Get(&n, `SELECT COUNT(*) FROM tool_outputs_fts WHERE owner='alice' AND tool_outputs_fts MATCH 'shell'`); err != nil {
		t.Fatalf("q tool_outputs_fts: %v", err)
	}
	if n != 1 {
		t.Fatalf("expected 1 tool_outputs_fts hit, got %d", n)
	}
}

// Verify metadata json.RawMessage round-trips as opaque text into the row.
func TestRepository_UpsertChunk_MetadataPersisted(t *testing.T) {
	repo, db := newRepo(t)
	t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "x")
	t1.Metadata = json.RawMessage(`{"k":"v"}`)
	t1.SessionMeta.Metadata = json.RawMessage(`{"sk":"sv"}`)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1})

	var tm, sm string
	if err := db.QueryRow(`SELECT metadata FROM turns WHERE turn_id='tA'`).Scan(&tm); err != nil {
		t.Fatalf("q tm: %v", err)
	}
	if tm != `{"k":"v"}` {
		t.Fatalf("turn metadata roundtrip failed: %q", tm)
	}
	if err := db.QueryRow(`SELECT metadata FROM sessions WHERE session_id='s1'`).Scan(&sm); err != nil {
		t.Fatalf("q sm: %v", err)
	}
	if sm != `{"sk":"sv"}` {
		t.Fatalf("session metadata roundtrip failed: %q", sm)
	}
}

// Defensive: pass a free *float64 to ensure pointer types serialize cleanly.
func TestRepository_UpsertChunk_OptionalPointersStored(t *testing.T) {
	repo, db := newRepo(t)
	t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "x")
	t1.Model = ptrString("gpt-x")
	t1.TokensIn = ptrInt64(10)
	t1.TokensOut = ptrInt64(20)
	t1.CostUSD = ptrFloat64(0.12)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1})

	var (
		model     string
		tokensIn  int64
		tokensOut int64
		cost      float64
	)
	if err := db.QueryRow(`SELECT model, tokens_in, tokens_out, cost_usd FROM turns WHERE turn_id='tA'`).
		Scan(&model, &tokensIn, &tokensOut, &cost); err != nil {
		t.Fatalf("q: %v", err)
	}
	if model != "gpt-x" || tokensIn != 10 || tokensOut != 20 || cost != 0.12 {
		t.Fatalf("optional fields not round-tripped: %v %v %v %v", model, tokensIn, tokensOut, cost)
	}
}

A internal/domain/ingest/service.go => internal/domain/ingest/service.go +235 -0
@@ 0,0 1,235 @@
package ingest

import (
	"context"
	"encoding/json"
	"io"
	"log/slog"

	"github.com/jmoiron/sqlx"
	"go.bigb.es/auxilia/culpa"
	"go.bigb.es/auxilia/scribe"

	"sourcecraft.dev/bigbes/lethe/internal/config"
	"sourcecraft.dev/bigbes/lethe/internal/pkg/httputil"
	"sourcecraft.dev/bigbes/lethe/internal/platform/database"
	"sourcecraft.dev/bigbes/lethe/internal/platform/observability"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// validRoles is the closed set of values accepted in TurnEvent.Role. Phase 7
// validates against this list before reaching the database.
var validRoles = map[string]struct{}{
	"user":      {},
	"assistant": {},
	"tool":      {},
	"system":    {},
}

// maxSourceFileBytes caps the per-line SessionMeta.SourceFile length. The
// server treats source_file as opaque, but extreme values are still rejected
// to keep storage and logs sane.
const maxSourceFileBytes = 1024

// LineError describes a single NDJSON line that failed validation, JSON
// parsing, or chunk commit. It is stable wire output: the handler embeds the
// list directly into the 200 response body.
type LineError struct {
	Line int    `json:"line"`
	Err  string `json:"error"`
}

// Result is the outcome of one Ingest call. Accepted is the count of turns
// actually committed to the database; Errors lists per-line failures in the
// order they occurred. Per the spec the ingest endpoint returns 200 even when
// Errors is non-empty — the client uses Accepted to advance its offset.
type Result struct {
	Accepted int         `json:"accepted"`
	Errors   []LineError `json:"errors"`
}

// Service is the steward-managed ingest pipeline. Repo is the SQL boundary;
// Cfg fixes the chunk size and content-byte cap; Log emits per-chunk failure
// notices at WARN; Metrics counts accepted/errored lines and committed chunks.
type Service struct {
	Cfg     config.IngestConfig    `config:""`
	Repo    *Repository            `inject:""`
	Log     *observability.Logger  `inject:""`
	Metrics *observability.Metrics `inject:""`
}

// Init satisfies the steward Initer contract. The Service is stateless beyond
// its injected dependencies.
func (s *Service) Init(_ context.Context) error { return nil }

// validateTurn enforces the wire contract on a single decoded TurnEvent. It
// returns a culpa-coded INVALID error on the first failure; the Service maps
// that into a per-line LineError. Seq is intentionally unchecked: zero is a
// legal first sequence number on the wire.
func validateTurn(t wire.TurnEvent, maxContentBytes int64) error {
	if t.Tool == "" {
		return culpa.WithCode(culpa.New("tool is required"), "INVALID")
	}
	if t.Host == "" {
		return culpa.WithCode(culpa.New("host is required"), "INVALID")
	}
	if t.SessionID == "" {
		return culpa.WithCode(culpa.New("session_id is required"), "INVALID")
	}
	if t.TurnID == "" {
		return culpa.WithCode(culpa.New("turn_id is required"), "INVALID")
	}
	if t.Role == "" {
		return culpa.WithCode(culpa.New("role is required"), "INVALID")
	}
	if _, ok := validRoles[t.Role]; !ok {
		return culpa.WithCode(culpa.Errorf("role %q is not one of {user, assistant, tool, system}", t.Role), "INVALID")
	}
	if t.Timestamp == 0 {
		return culpa.WithCode(culpa.New("timestamp is required"), "INVALID")
	}
	if t.Content == "" {
		return culpa.WithCode(culpa.New("content is required"), "INVALID")
	}
	if int64(len(t.Content)) > maxContentBytes {
		return culpa.WithCode(culpa.Errorf("content exceeds %d bytes", maxContentBytes), "INVALID")
	}
	if t.SessionMeta.SourceFile == "" {
		return culpa.WithCode(culpa.New("session_meta.source_file is required"), "INVALID")
	}
	if len(t.SessionMeta.SourceFile) > maxSourceFileBytes {
		return culpa.WithCode(culpa.Errorf("session_meta.source_file exceeds %d bytes", maxSourceFileBytes), "INVALID")
	}
	return nil
}

// Ingest streams NDJSON from body, validates each line, and commits in chunks
// of Cfg.ChunkSize under one transaction per chunk.
//
// Partial-accept semantics: lines that committed in earlier chunks remain
// committed even when a later line fails — Accepted reflects what is actually
// in the DB. A per-line failure (JSON parse, validation, FK) appends a
// LineError and stops processing; subsequent lines are not touched. A
// non-line failure (DB closed, body cap exceeded, scanner I/O) returns the
// (still-truthful) Result alongside the wrapped error so the Handler can map
// it to a 5xx or 413.
func (s *Service) Ingest(ctx context.Context, owner string, body io.Reader, maxBytes int64) (Result, error) {
	var (
		result   Result
		chunk    = make([]wire.TurnEvent, 0, s.Cfg.ChunkSize)
		startsAt = make([]int, 0, s.Cfg.ChunkSize) // line number of each buffered turn
		lineNum  int
	)

	commitChunk := func() error {
		if len(chunk) == 0 {
			return nil
		}
		firstLine := startsAt[0]
		first := chunk[0]

		txErr := database.InTx(ctx, s.Repo.Database.DB, func(tx *sqlx.Tx) error {
			return s.Repo.UpsertChunk(ctx, tx, owner, chunk)
		})
		if txErr != nil {
			s.Metrics.IngestLinesErrored.Inc()
			s.Log.L.WarnContext(ctx, "ingest chunk commit failed",
				slog.String("owner", owner),
				slog.Int("line", firstLine),
				slog.String("tool", first.Tool),
				slog.String("host", first.Host),
				slog.String("session_id", first.SessionID),
				scribe.Err(txErr),
			)
			return txErr
		}

		n := len(chunk)
		result.Accepted += n
		s.Metrics.IngestLinesAccepted.Add(float64(n))
		s.Metrics.IngestChunksCommitted.Inc()

		// Reset buffers without releasing the underlying capacity.
		chunk = chunk[:0]
		startsAt = startsAt[:0]
		return nil
	}

	handleChunkErr := func(err error) (Result, error) {
		// Take the failing chunk's first line BEFORE reset; commitChunk
		// leaves buffers untouched on error so startsAt still has it.
		failedLine := lineNum
		if len(startsAt) > 0 {
			failedLine = startsAt[0]
		}
		if isLineLevelDBError(err) {
			result.Errors = append(result.Errors, LineError{
				Line: failedLine,
				Err:  err.Error(),
			})
			return result, nil
		}
		return result, err
	}

	for raw, scanErr := range httputil.ReadNDJSONLines(body, maxBytes) {
		if scanErr != nil {
			// Body cap or oversized line: caller maps to 413.
			return result, scanErr
		}
		lineNum++

		var event wire.TurnEvent
		if err := json.Unmarshal(raw, &event); err != nil {
			s.Metrics.IngestLinesErrored.Inc()
			result.Errors = append(result.Errors, LineError{
				Line: lineNum,
				Err:  "invalid json: " + err.Error(),
			})
			return result, nil
		}
		if err := validateTurn(event, s.Cfg.MaxTurnContentBytes); err != nil {
			s.Metrics.IngestLinesErrored.Inc()
			result.Errors = append(result.Errors, LineError{
				Line: lineNum,
				Err:  err.Error(),
			})
			return result, nil
		}

		chunk = append(chunk, event)
		startsAt = append(startsAt, lineNum)

		if len(chunk) >= s.Cfg.ChunkSize {
			if err := commitChunk(); err != nil {
				return handleChunkErr(err)
			}
		}
	}

	if err := commitChunk(); err != nil {
		return handleChunkErr(err)
	}

	return result, nil
}

// isLineLevelDBError reports whether err describes a per-row failure (FK or
// constraint violation) that should be surfaced as a LineError in the 200
// response, rather than escalated to a 5xx. The classification mirrors the
// codes Repository.UpsertChunk attaches via wrapSQLErr.
func isLineLevelDBError(err error) bool {
	var cd culpa.CodeDetail
	if !culpa.FindDetail(err, &cd) {
		return false
	}
	code, ok := cd.Code.(string)
	if !ok {
		return false
	}
	switch code {
	case "DB_FOREIGN_KEY", "DB_CONSTRAINT":
		return true
	}
	return false
}

A internal/domain/ingest/service_test.go => internal/domain/ingest/service_test.go +424 -0
@@ 0,0 1,424 @@
package ingest_test

import (
	"context"
	"fmt"
	"strings"
	"testing"

	"sourcecraft.dev/bigbes/lethe/internal/config"
	"sourcecraft.dev/bigbes/lethe/internal/domain/ingest"
	"sourcecraft.dev/bigbes/lethe/internal/platform/database"
	"sourcecraft.dev/bigbes/lethe/internal/platform/observability"
)

// newService wires Service+Repository+Logger+Metrics against a fresh in-memory
// DB. Returns the Service and the underlying *Database for direct queries.
func newService(t *testing.T, cfg config.IngestConfig) (*ingest.Service, *database.Database) {
	t.Helper()
	d := newTestDatabase(t)
	repo := &ingest.Repository{Database: d}
	if err := repo.Init(context.Background()); err != nil {
		t.Fatalf("repo.Init: %v", err)
	}
	logger := &observability.Logger{Cfg: config.LoggingConfig{Level: "info", Format: "json"}}
	if err := logger.Init(context.Background()); err != nil {
		t.Fatalf("logger.Init: %v", err)
	}
	metrics := &observability.Metrics{}
	if err := metrics.Init(context.Background()); err != nil {
		t.Fatalf("metrics.Init: %v", err)
	}
	svc := &ingest.Service{
		Cfg:     cfg,
		Repo:    repo,
		Log:     logger,
		Metrics: metrics,
	}
	if err := svc.Init(context.Background()); err != nil {
		t.Fatalf("svc.Init: %v", err)
	}
	return svc, d
}

// defaultCfg is the canonical IngestConfig used by most tests.
func defaultCfg() config.IngestConfig {
	return config.IngestConfig{
		MaxBodyBytes:        1 << 20, // 1 MiB
		MaxTurnContentBytes: 4096,
		ChunkSize:           10,
	}
}

// validLine produces a single NDJSON line with the supplied turn_id; all
// other fields are minimal-but-valid.
func validLine(turnID string, ts int64, content string) string {
	return fmt.Sprintf(
		`{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":%q,"seq":1,"role":"user","timestamp":%d,"content":%q,"session_meta":{"source_file":"/tmp/x.jsonl"}}`,
		turnID, ts, content,
	)
}

func body(lines ...string) *strings.Reader {
	return strings.NewReader(strings.Join(lines, "\n") + "\n")
}

// --- validation matrix ----------------------------------------------------

func TestService_Ingest_RequiredFieldsRejected(t *testing.T) {
	cases := []struct {
		name string
		line string
	}{
		{"missing tool", `{"host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{"missing host", `{"tool":"cc","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{"missing session_id", `{"tool":"cc","host":"phoebe","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{"missing turn_id", `{"tool":"cc","host":"phoebe","session_id":"s1","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{"missing role", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
		{"missing timestamp", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","content":"x","session_meta":{"source_file":"/x"}}`},
		{"missing content", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"session_meta":{"source_file":"/x"}}`},
		{"missing source_file", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{}}`},
		{"bad role", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"hacker","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			svc, _ := newService(t, defaultCfg())
			res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(tc.line+"\n"), 1<<20)
			if err != nil {
				t.Fatalf("Ingest err: %v", err)
			}
			if res.Accepted != 0 {
				t.Errorf("Accepted=%d; want 0", res.Accepted)
			}
			if len(res.Errors) != 1 || res.Errors[0].Line != 1 {
				t.Fatalf("expected one LineError on line 1, got %+v", res.Errors)
			}
		})
	}
}

func TestService_Ingest_ContentOverCapRejected(t *testing.T) {
	cfg := defaultCfg()
	cfg.MaxTurnContentBytes = 16
	svc, _ := newService(t, cfg)
	line := validLine("tA", 1700000000, strings.Repeat("a", 17))
	res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), 1<<20)
	if err != nil {
		t.Fatalf("Ingest: %v", err)
	}
	if res.Accepted != 0 || len(res.Errors) != 1 {
		t.Fatalf("expected 0 accepted + 1 error, got %+v", res)
	}
}

func TestService_Ingest_SourceFileOverCapRejected(t *testing.T) {
	svc, _ := newService(t, defaultCfg())
	long := strings.Repeat("p", 1025)
	line := fmt.Sprintf(
		`{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":%q}}`,
		long,
	)
	res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), 1<<20)
	if err != nil {
		t.Fatalf("Ingest: %v", err)
	}
	if res.Accepted != 0 || len(res.Errors) != 1 {
		t.Fatalf("expected 0 accepted + 1 error, got %+v", res)
	}
}

func TestService_Ingest_ValidationFailureCommitsPriorChunks(t *testing.T) {
	cfg := defaultCfg()
	cfg.ChunkSize = 2
	svc, d := newService(t, cfg)

	// 2 valid lines (commit as chunk 1) + a third bad line.
	lines := []string{
		validLine("t1", 1700000000, "one"),
		validLine("t2", 1700000001, "two"),
		`{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t3","seq":1,"role":"hacker","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`,
	}
	res, err := svc.Ingest(context.Background(), "alice", body(lines...), 1<<20)
	if err != nil {
		t.Fatalf("Ingest: %v", err)
	}
	if res.Accepted != 2 {
		t.Errorf("Accepted=%d; want 2", res.Accepted)
	}
	if len(res.Errors) != 1 || res.Errors[0].Line != 3 {
		t.Fatalf("expected one error on line 3, got %+v", res.Errors)
	}
	var n int
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns WHERE owner='alice'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if n != 2 {
		t.Errorf("expected 2 committed turns, got %d", n)
	}
}

// --- semantics ------------------------------------------------------------

func TestService_Ingest_Idempotent(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	lines := []string{
		validLine("t1", 1700000000, "alpha"),
		validLine("t2", 1700000001, "beta"),
	}
	for i := 0; i < 2; i++ {
		res, err := svc.Ingest(context.Background(), "alice", body(lines...), 1<<20)
		if err != nil {
			t.Fatalf("ingest %d: %v", i, err)
		}
		if res.Accepted != 2 {
			t.Errorf("Accepted=%d; want 2 on iteration %d", res.Accepted, i)
		}
	}
	var rows int
	if err := d.DB.Get(&rows, `SELECT COUNT(*) FROM turns`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if rows != 2 {
		t.Errorf("expected 2 turns after re-ingest, got %d", rows)
	}
	var sessions int
	if err := d.DB.Get(&sessions, `SELECT COUNT(*) FROM sessions`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if sessions != 1 {
		t.Errorf("expected 1 session, got %d", sessions)
	}
}

func TestService_Ingest_FirstWriteWinsSession(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	line1 := `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t1","seq":1,"role":"user","timestamp":1700000000,"content":"x","session_meta":{"source_file":"/A","working_dir":"/A"}}`
	if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line1+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest 1: %v", err)
	}
	line2 := `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t2","seq":2,"role":"user","timestamp":1700000100,"content":"y","session_meta":{"source_file":"/B","working_dir":"/B"}}`
	if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line2+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest 2: %v", err)
	}

	var wd, sf string
	if err := d.DB.QueryRow(`SELECT working_dir, source_file FROM sessions WHERE owner=? AND session_id=?`,
		"alice", "s1").Scan(&wd, &sf); err != nil {
		t.Fatalf("q: %v", err)
	}
	if wd != "/A" || sf != "/A" {
		t.Errorf("first-write-wins broken; working_dir=%q source_file=%q", wd, sf)
	}
}

func TestService_Ingest_LastWriteWinsTurn(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	if _, err := svc.Ingest(context.Background(), "alice",
		strings.NewReader(validLine("t1", 1700000000, "first")+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest 1: %v", err)
	}
	if _, err := svc.Ingest(context.Background(), "alice",
		strings.NewReader(validLine("t1", 1700000100, "second")+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest 2: %v", err)
	}
	var content string
	if err := d.DB.Get(&content, `SELECT content FROM turns WHERE turn_id='t1'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if content != "second" {
		t.Errorf("last-write-wins broken; content=%q", content)
	}
}

func TestService_Ingest_EndedAtExtends(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	if _, err := svc.Ingest(context.Background(), "alice",
		strings.NewReader(validLine("t1", 1700000000, "x")+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest 1: %v", err)
	}
	if _, err := svc.Ingest(context.Background(), "alice",
		strings.NewReader(validLine("t2", 1700000500, "y")+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest 2: %v", err)
	}
	var ended int64
	if err := d.DB.Get(&ended, `SELECT ended_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil {
		t.Fatalf("q: %v", err)
	}
	if ended != 1700000500 {
		t.Errorf("expected ended_at=1700000500, got %d", ended)
	}
}

func TestService_Ingest_StartedAtFallback(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	// SessionMeta has no started_at; first turn timestamp wins.
	if _, err := svc.Ingest(context.Background(), "alice",
		strings.NewReader(validLine("t1", 1700000123, "x")+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest: %v", err)
	}
	var started int64
	if err := d.DB.Get(&started, `SELECT started_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil {
		t.Fatalf("q: %v", err)
	}
	if started != 1700000123 {
		t.Errorf("expected started_at=1700000123, got %d", started)
	}
}

func TestService_Ingest_ChunkedPartialAcceptBadLineInChunk3(t *testing.T) {
	cfg := defaultCfg()
	cfg.ChunkSize = 2
	svc, d := newService(t, cfg)

	lines := []string{
		validLine("t1", 1700000000, "a"),
		validLine("t2", 1700000001, "b"),
		validLine("t3", 1700000002, "c"),
		validLine("t4", 1700000003, "d"),
		// Bad line in chunk 3 (line index 5).
		`{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t5","seq":1,"role":"hacker","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`,
		validLine("t6", 1700000005, "f"),
	}
	res, err := svc.Ingest(context.Background(), "alice", body(lines...), 1<<20)
	if err != nil {
		t.Fatalf("ingest: %v", err)
	}
	if res.Accepted != 4 {
		t.Errorf("Accepted=%d; want 4 (2 * chunkSize)", res.Accepted)
	}
	if len(res.Errors) != 1 || res.Errors[0].Line != 5 {
		t.Fatalf("expected one error referencing line 5, got %+v", res.Errors)
	}
	// Line 6 must not be processed.
	var n int
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns WHERE turn_id='t6'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if n != 0 {
		t.Errorf("expected line 6 not processed, found %d rows", n)
	}
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if n != 4 {
		t.Errorf("expected 4 committed turns, got %d", n)
	}
}

func TestService_Ingest_PerUserIsolation(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	lines := []string{validLine("t1", 1700000000, "x"), validLine("t2", 1700000001, "y")}
	if _, err := svc.Ingest(context.Background(), "alice", body(lines...), 1<<20); err != nil {
		t.Fatalf("alice ingest: %v", err)
	}
	if _, err := svc.Ingest(context.Background(), "bob", body(lines...), 1<<20); err != nil {
		t.Fatalf("bob ingest: %v", err)
	}

	var n int
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM sessions`); err != nil {
		t.Fatalf("q sessions: %v", err)
	}
	if n != 2 {
		t.Errorf("expected 2 sessions across owners, got %d", n)
	}
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns`); err != nil {
		t.Fatalf("q turns: %v", err)
	}
	if n != 4 {
		t.Errorf("expected 4 turns across owners, got %d", n)
	}

	// Mutating bob's session must not touch alice's row.
	if _, err := d.DB.Exec(`UPDATE sessions SET ended_at = 1 WHERE owner='bob'`); err != nil {
		t.Fatalf("bob mutate: %v", err)
	}
	var aliceEnded int64
	if err := d.DB.Get(&aliceEnded, `SELECT ended_at FROM sessions WHERE owner='alice'`); err != nil {
		t.Fatalf("q alice: %v", err)
	}
	if aliceEnded == 1 {
		t.Errorf("bob's update bled into alice's row")
	}
}

func TestService_Ingest_WireOwnerIgnored(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	// Stray "owner":"evil" in the wire payload — must be dropped silently.
	line := `{"owner":"evil","tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t1","seq":1,"role":"user","timestamp":1700000000,"content":"x","session_meta":{"source_file":"/x"}}`
	if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest: %v", err)
	}
	var owner string
	if err := d.DB.Get(&owner, `SELECT owner FROM turns WHERE turn_id='t1'`); err != nil {
		t.Fatalf("q: %v", err)
	}
	if owner != "alice" {
		t.Fatalf("expected owner=alice, got %q", owner)
	}
	var n int
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns WHERE owner='evil'`); err != nil {
		t.Fatalf("q evil: %v", err)
	}
	if n != 0 {
		t.Fatalf("expected zero rows for owner=evil, got %d", n)
	}
}

func TestService_Ingest_FTSAfterIngest(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	line := `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"tA","seq":1,"role":"user","timestamp":1700000000,"content":"unmistakable haystack","tool_calls":{"name":"shellexec"},"session_meta":{"source_file":"/x"}}`
	if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), 1<<20); err != nil {
		t.Fatalf("ingest: %v", err)
	}
	var n int
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns_fts WHERE owner='alice' AND turns_fts MATCH 'unmistakable'`); err != nil {
		t.Fatalf("q turns_fts: %v", err)
	}
	if n != 1 {
		t.Fatalf("expected 1 turns_fts hit, got %d", n)
	}
	if err := d.DB.Get(&n, `SELECT COUNT(*) FROM tool_outputs_fts WHERE owner='alice' AND tool_outputs_fts MATCH 'shellexec'`); err != nil {
		t.Fatalf("q tool_outputs_fts: %v", err)
	}
	if n != 1 {
		t.Fatalf("expected 1 tool_outputs_fts hit, got %d", n)
	}
}

func TestService_Ingest_DBDownReturnsError(t *testing.T) {
	svc, d := newService(t, defaultCfg())
	// Close the underlying *sql.DB without nilling the field; the next
	// BeginTx returns sql.ErrConnDone (or similar), simulating a real
	// runtime "database closed" failure without crashing the test.
	if err := d.DB.Close(); err != nil {
		t.Fatalf("close: %v", err)
	}
	res, err := svc.Ingest(context.Background(), "alice",
		strings.NewReader(validLine("t1", 1700000000, "x")+"\n"), 1<<20)
	if err == nil {
		t.Fatalf("expected error, got nil; result=%+v", res)
	}
	if res.Accepted != 0 {
		t.Errorf("expected Accepted=0 on DB-down, got %d", res.Accepted)
	}
}

func TestService_Ingest_BodyOverCapReturnsTooLarge(t *testing.T) {
	cfg := defaultCfg()
	cfg.MaxBodyBytes = 100
	svc, _ := newService(t, cfg)
	// 1 KiB payload that does not fit.
	line := validLine("t1", 1700000000, strings.Repeat("a", 1024))
	res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), cfg.MaxBodyBytes)
	if err == nil {
		t.Fatalf("expected TOO_LARGE error, got nil; result=%+v", res)
	}
	// We don't import apierror here; just assert err contains the message.
	if !strings.Contains(err.Error(), "limit") && !strings.Contains(err.Error(), "long") {
		t.Errorf("expected too-large/limit error, got %v", err)
	}
	if res.Accepted != 0 {
		t.Errorf("Accepted=%d; want 0 on body cap", res.Accepted)
	}
}

M internal/pkg/apierror/apierror.go => internal/pkg/apierror/apierror.go +3 -0
@@ 52,6 52,9 @@ var codeStatus = map[string]int{
	"FORBIDDEN":          http.StatusForbidden,
	"CONFLICT":           http.StatusConflict,
	"TOO_LARGE":          http.StatusRequestEntityTooLarge,
	// UNSUPPORTED_MEDIA_TYPE is used by the ingest handler when the request's
	// Content-Type is not the canonical NDJSON media type. Added in Phase 7.
	"UNSUPPORTED_MEDIA_TYPE": http.StatusUnsupportedMediaType,
}

// Render is the single boundary that translates a culpa-wrapped error into