From 8383c20c8ccfc2f64ad0987140d6536ccaf8d7ff Mon Sep 17 00:00:00 2001 From: Eugene Blikh Date: Sat, 25 Apr 2026 23:21:31 +0300 Subject: [PATCH] feat(ingest): NDJSON ingest with chunked transactions and partial-accept --- internal/domain/ingest/handler.go | 94 ++++- internal/domain/ingest/handler_test.go | 170 +++++++++ internal/domain/ingest/repository.go | 147 ++++++++ internal/domain/ingest/repository_test.go | 306 ++++++++++++++++ internal/domain/ingest/service.go | 235 ++++++++++++ internal/domain/ingest/service_test.go | 424 ++++++++++++++++++++++ internal/pkg/apierror/apierror.go | 3 + 7 files changed, 1368 insertions(+), 11 deletions(-) create mode 100644 internal/domain/ingest/handler_test.go create mode 100644 internal/domain/ingest/repository.go create mode 100644 internal/domain/ingest/repository_test.go create mode 100644 internal/domain/ingest/service.go create mode 100644 internal/domain/ingest/service_test.go diff --git a/internal/domain/ingest/handler.go b/internal/domain/ingest/handler.go index 381bcba1c4f579847a5dd72ba31da6bb82b7155b..560ffc10eacad3e06b7dc265a295e09f5c5a2049 100644 --- a/internal/domain/ingest/handler.go +++ b/internal/domain/ingest/handler.go @@ -1,22 +1,94 @@ -// Phase-5 stub. Replaced by real implementation in Phase 7. -// -// Mount is a no-op so the /api/v1 group registers no ingest routes yet; -// Server.Init still wires it in so the full router topology compiles. package ingest import ( "context" + "log/slog" + "net/http" + "strings" "github.com/go-chi/chi/v5" + "go.bigb.es/auxilia/culpa" + "go.bigb.es/auxilia/scribe" + + "sourcecraft.dev/bigbes/lethe/internal/config" + "sourcecraft.dev/bigbes/lethe/internal/pkg/apierror" + "sourcecraft.dev/bigbes/lethe/internal/pkg/httputil" + "sourcecraft.dev/bigbes/lethe/internal/server/auth" ) -// Handler is a no-op steward service. Phase 7 supplies the real ingest -// pipeline (NDJSON streaming, validation, chunked commit). -type Handler struct{} +// ndjsonContentType is the canonical media type for the ingest payload. The +// IETF-style `application/x-ndjson` is the documented choice for lethe; the +// collector emits this exact value. Anything else is rejected with 415. +const ndjsonContentType = "application/x-ndjson" + +// Handler is the steward-managed ingest HTTP boundary. It enforces the +// Content-Type and body cap, derives the owner from the authenticated +// identity (never from the wire payload), and delegates the streaming pipeline +// to Service. +type Handler struct { + Cfg config.IngestConfig `config:""` + Service *Service `inject:""` +} -// Init satisfies the steward Service contract. +// Init satisfies the steward Initer contract. The handler holds no state +// beyond its injected dependencies. func (h *Handler) Init(_ context.Context) error { return nil } -// Mount registers no routes. Phase 7 replaces this with the ingest endpoints -// rooted at the /api/v1 group passed in by Server.Init. -func (h *Handler) Mount(_ chi.Router) {} +// Mount registers POST /ingest on r. The router supplied by Server is the +// /api/v1 group, so the effective path is /api/v1/ingest. Auth middleware is +// installed by the parent group; Post relies on auth.MustIdentity(). +func (h *Handler) Mount(r chi.Router) { + r.Post("/ingest", h.Post) +} + +// Post is the NDJSON ingest endpoint. It always returns 200 when the body is +// well-formed enough to read — per-line failures live in the JSON `errors` +// array. Hard infrastructure failures (DB down, body cap exceeded) propagate +// as RFC 7807 problems via apierror.Render. +func (h *Handler) Post(w http.ResponseWriter, r *http.Request) { + identity := auth.MustIdentity(r.Context()) + + if !isNDJSONContentType(r.Header.Get("Content-Type")) { + apierror.Render(w, r, culpa.WithCode( + culpa.WithPublic( + culpa.Errorf("Content-Type must be %s", ndjsonContentType), + "Content-Type must be "+ndjsonContentType, + ), + "UNSUPPORTED_MEDIA_TYPE", + )) + return + } + + // MaxBytesReader caps the request body; Service.Ingest's NDJSON scanner + // surfaces an over-cap read as a TOO_LARGE-coded culpa error which + // apierror.Render turns into 413. + body := http.MaxBytesReader(w, r.Body, h.Cfg.MaxBodyBytes) + defer func() { _ = body.Close() }() + + result, err := h.Service.Ingest(r.Context(), identity.User, body, h.Cfg.MaxBodyBytes) + if err != nil { + // Render the failure but don't drop the partial-accept count: the + // 5xx/413 problem document tells the client the run aborted, and + // the next call will resume from its own offset bookkeeping. + apierror.Render(w, r, err) + return + } + + if writeErr := httputil.WriteJSON(w, http.StatusOK, result); writeErr != nil { + slog.Default().ErrorContext(r.Context(), "write ingest response", scribe.Err(writeErr)) + } +} + +// isNDJSONContentType returns true when the header value (with any trailing +// parameters such as `; charset=utf-8`) names the canonical NDJSON media +// type. The MIME type itself is matched case-insensitively per RFC 9110. +func isNDJSONContentType(v string) bool { + if v == "" { + return false + } + // Strip parameters; we only need the type/subtype prefix. + if idx := strings.IndexByte(v, ';'); idx >= 0 { + v = v[:idx] + } + return strings.EqualFold(strings.TrimSpace(v), ndjsonContentType) +} diff --git a/internal/domain/ingest/handler_test.go b/internal/domain/ingest/handler_test.go new file mode 100644 index 0000000000000000000000000000000000000000..23ff2f1de6543392261ded4ad655171447adfe94 --- /dev/null +++ b/internal/domain/ingest/handler_test.go @@ -0,0 +1,170 @@ +package ingest_test + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/go-chi/chi/v5" + + "sourcecraft.dev/bigbes/lethe/internal/config" + "sourcecraft.dev/bigbes/lethe/internal/domain/ingest" + "sourcecraft.dev/bigbes/lethe/internal/server/auth" +) + +// fakeAuthMiddleware injects a fixed Identity onto the request context so the +// handler under test can call auth.MustIdentity without a real Authenticator. +func fakeAuthMiddleware(user string) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := auth.WithIdentity(r.Context(), auth.Identity{User: user}) + next.ServeHTTP(w, r.WithContext(ctx)) + }) + } +} + +// newHandler wires Service + Repository + observability + handler against a +// fresh in-memory database, then returns the handler. +func newHandler(t *testing.T, cfg config.IngestConfig) *ingest.Handler { + t.Helper() + svc, _ := newService(t, cfg) + h := &ingest.Handler{Cfg: cfg, Service: svc} + if err := h.Init(context.Background()); err != nil { + t.Fatalf("handler.Init: %v", err) + } + return h +} + +// mountHandler builds a chi router with the fake auth middleware and the +// ingest handler mounted under /api/v1. +func mountHandler(h *ingest.Handler, user string) http.Handler { + r := chi.NewRouter() + r.Route("/api/v1", func(r chi.Router) { + r.Use(fakeAuthMiddleware(user)) + h.Mount(r) + }) + return r +} + +func TestHandler_Post_HappyPath(t *testing.T) { + h := newHandler(t, defaultCfg()) + router := mountHandler(h, "alice") + + payload := strings.Join([]string{validLine("t1", 1700000000, "x"), validLine("t2", 1700000001, "y")}, "\n") + "\n" + req := httptest.NewRequest(http.MethodPost, "/api/v1/ingest", strings.NewReader(payload)) + req.Header.Set("Content-Type", "application/x-ndjson") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String()) + } + var got ingest.Result + if err := json.Unmarshal(rec.Body.Bytes(), &got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if got.Accepted != 2 { + t.Errorf("Accepted=%d; want 2", got.Accepted) + } + if len(got.Errors) != 0 { + t.Errorf("Errors=%v; want empty", got.Errors) + } +} + +func TestHandler_Post_WrongContentTypeReturns415(t *testing.T) { + h := newHandler(t, defaultCfg()) + router := mountHandler(h, "alice") + + req := httptest.NewRequest(http.MethodPost, "/api/v1/ingest", + strings.NewReader(validLine("t1", 1700000000, "x")+"\n")) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusUnsupportedMediaType { + t.Fatalf("status=%d; want 415; body=%s", rec.Code, rec.Body.String()) + } + if ct := rec.Header().Get("Content-Type"); ct != "application/problem+json" { + t.Errorf("Content-Type=%q; want application/problem+json", ct) + } +} + +func TestHandler_Post_MissingContentTypeReturns415(t *testing.T) { + h := newHandler(t, defaultCfg()) + router := mountHandler(h, "alice") + + req := httptest.NewRequest(http.MethodPost, "/api/v1/ingest", + strings.NewReader(validLine("t1", 1700000000, "x")+"\n")) + // No Content-Type set. + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusUnsupportedMediaType { + t.Fatalf("status=%d; want 415", rec.Code) + } +} + +func TestHandler_Post_BodyOverCapReturns413(t *testing.T) { + cfg := defaultCfg() + cfg.MaxBodyBytes = 100 + h := newHandler(t, cfg) + router := mountHandler(h, "alice") + + // 1 KiB+ body that exceeds the cap. + big := validLine("t1", 1700000000, strings.Repeat("a", 1024)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/ingest", strings.NewReader(big+"\n")) + req.Header.Set("Content-Type", "application/x-ndjson") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusRequestEntityTooLarge { + t.Fatalf("status=%d; want 413; body=%s", rec.Code, rec.Body.String()) + } +} + +func TestHandler_Post_DBDownReturns5xx(t *testing.T) { + cfg := defaultCfg() + svc, db := newService(t, cfg) + h := &ingest.Handler{Cfg: cfg, Service: svc} + if err := h.Init(context.Background()); err != nil { + t.Fatalf("handler.Init: %v", err) + } + // Close the underlying *sql.DB without nilling the field so BeginTx + // returns an error rather than nil-dereferencing. This mirrors the + // runtime "database closed" failure mode (e.g. driver disconnect) more + // accurately than calling Destroy. + if err := db.DB.Close(); err != nil { + t.Fatalf("close underlying db: %v", err) + } + router := mountHandler(h, "alice") + + req := httptest.NewRequest(http.MethodPost, "/api/v1/ingest", + strings.NewReader(validLine("t1", 1700000000, "x")+"\n")) + req.Header.Set("Content-Type", "application/x-ndjson") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code < 500 { + t.Fatalf("status=%d; want 5xx; body=%s", rec.Code, rec.Body.String()) + } +} + +// TestHandler_Post_ContentTypeWithCharset proves the canonical media check +// tolerates the `; charset=utf-8` parameter the collector may attach. +func TestHandler_Post_ContentTypeWithCharset(t *testing.T) { + h := newHandler(t, defaultCfg()) + router := mountHandler(h, "alice") + + req := httptest.NewRequest(http.MethodPost, "/api/v1/ingest", + strings.NewReader(validLine("t1", 1700000000, "x")+"\n")) + req.Header.Set("Content-Type", "application/x-ndjson; charset=utf-8") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status=%d; want 200; body=%s", rec.Code, rec.Body.String()) + } +} diff --git a/internal/domain/ingest/repository.go b/internal/domain/ingest/repository.go new file mode 100644 index 0000000000000000000000000000000000000000..db082808f45ae0a884f6f7be95277214d4185eec --- /dev/null +++ b/internal/domain/ingest/repository.go @@ -0,0 +1,147 @@ +// Package ingest implements the NDJSON ingest pipeline for lethe: per-line +// validation against the wire contract, chunked transactional commit with +// per-line partial-accept semantics, and the trust-model invariant that the +// stored owner is always the authenticated user (never the wire payload). +// +// Layering inside the package: +// - Repository owns the raw SQL: per-chunk UPSERT of sessions/turns inside a +// caller-supplied transaction. It does not begin or commit the tx. +// - Service owns the streaming, validation, and chunking policy. It opens +// and commits one tx per chunk, increments metrics, and converts SQL +// errors into per-line LineError entries (or hard 5xx for non-line +// failures like a closed DB). +// - Handler is the HTTP boundary: enforces Content-Type, body cap, and +// auth-derived owner; renders RFC 7807 problems via apierror.Render. +package ingest + +import ( + "context" + "errors" + + "github.com/jmoiron/sqlx" + "go.bigb.es/auxilia/culpa" + "modernc.org/sqlite" + sqlite3 "modernc.org/sqlite/lib" + + "sourcecraft.dev/bigbes/lethe/internal/platform/database" + "sourcecraft.dev/bigbes/lethe/internal/shared/wire" +) + +// Repository is the steward-managed SQL steward for ingest. It is stateless; +// Init is empty because every UpsertChunk call gets its tx from the Service. +type Repository struct { + Database *database.Database `inject:""` +} + +// Init satisfies the steward Initer contract. The Repository has no state to +// build up — the database is owned by the Database steward and injected +// directly. +func (r *Repository) Init(_ context.Context) error { return nil } + +// upsertSessionStmt UPSERTs a row in `sessions` keyed by the composite PK +// (owner, tool, host, session_id). On conflict only `ended_at` is widened +// (MAX), preserving the first-write-wins values for `started_at`, `working_dir`, +// `source_file`, and `metadata`. SQLite leaves columns not in the SET list +// unchanged on the existing row when DO UPDATE fires. +const upsertSessionStmt = ` +INSERT INTO sessions + (owner, tool, host, session_id, started_at, ended_at, working_dir, source_file, metadata) +VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (owner, tool, host, session_id) DO UPDATE SET + ended_at = MAX(sessions.ended_at, excluded.ended_at) +` + +// upsertTurnStmt UPSERTs a row in `turns` keyed by the composite PK +// (owner, tool, host, session_id, turn_id). Last-write-wins: every non-key +// column is overwritten from the incoming row. +const upsertTurnStmt = ` +INSERT INTO turns + (owner, tool, host, session_id, turn_id, seq, role, timestamp, + content, model, tokens_in, tokens_out, cost_usd, tool_calls, metadata) +VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (owner, tool, host, session_id, turn_id) DO UPDATE SET + seq = excluded.seq, + role = excluded.role, + timestamp = excluded.timestamp, + content = excluded.content, + model = excluded.model, + tokens_in = excluded.tokens_in, + tokens_out = excluded.tokens_out, + cost_usd = excluded.cost_usd, + tool_calls = excluded.tool_calls, + metadata = excluded.metadata +` + +// UpsertChunk writes every turn (and its parent session) in chunk under the +// caller-supplied transaction. The tx is owned by the caller (the Service): +// UpsertChunk neither begins nor commits it. +// +// owner is bound from the function parameter on every INSERT — never from the +// wire payload. wire.TurnEvent has no `owner` JSON field; even if a hostile +// client sneaks a stray `"owner":"evil"` into the line, json.Unmarshal drops +// it silently and the SQL below never references it. This invariant is the +// core of the multi-tenant trust model. +func (r *Repository) UpsertChunk(ctx context.Context, tx *sqlx.Tx, owner string, chunk []wire.TurnEvent) error { + for _, t := range chunk { + startedAt := t.Timestamp + if t.SessionMeta.StartedAt != nil { + startedAt = *t.SessionMeta.StartedAt + } + + // owner is bound from the function parameter, not the wire payload (trust model invariant). + if _, err := tx.ExecContext(ctx, upsertSessionStmt, + owner, t.Tool, t.Host, t.SessionID, + startedAt, + t.Timestamp, // ended_at first-write equals started turn timestamp; widened by MAX on conflict. + t.SessionMeta.WorkingDir, + t.SessionMeta.SourceFile, + rawMessageOrNil(t.SessionMeta.Metadata), + ); err != nil { + return wrapSQLErr(err, "upsert session") + } + + // owner is bound from the function parameter, not the wire payload (trust model invariant). + if _, err := tx.ExecContext(ctx, upsertTurnStmt, + owner, t.Tool, t.Host, t.SessionID, t.TurnID, + t.Seq, t.Role, t.Timestamp, t.Content, + t.Model, t.TokensIn, t.TokensOut, t.CostUSD, + rawMessageOrNil(t.ToolCalls), + rawMessageOrNil(t.Metadata), + ); err != nil { + return wrapSQLErr(err, "upsert turn") + } + } + return nil +} + +// rawMessageOrNil normalizes a json.RawMessage to a SQL NULL when empty so the +// stored value is a real NULL (not the literal string "null" or ""). +func rawMessageOrNil(m []byte) any { + if len(m) == 0 { + return nil + } + return string(m) +} + +// wrapSQLErr classifies SQLite-driver errors so the Service can decide +// whether to attribute them to the offending line (FK / constraint failures) +// or to surface them as a hard 5xx (DB closed, IO error). Any unrecognized +// failure is wrapped under DB_UPSERT. +func wrapSQLErr(err error, msg string) error { + var se *sqlite.Error + if errors.As(err, &se) { + switch se.Code() { + case sqlite3.SQLITE_CONSTRAINT_FOREIGNKEY: + return culpa.WithCode(culpa.Wrap(err, msg), "DB_FOREIGN_KEY") + case sqlite3.SQLITE_CONSTRAINT, + sqlite3.SQLITE_CONSTRAINT_CHECK, + sqlite3.SQLITE_CONSTRAINT_NOTNULL, + sqlite3.SQLITE_CONSTRAINT_PRIMARYKEY, + sqlite3.SQLITE_CONSTRAINT_UNIQUE: + return culpa.WithCode(culpa.Wrap(err, msg), "DB_CONSTRAINT") + } + } + return culpa.WithCode(culpa.Wrap(err, msg), "DB_UPSERT") +} diff --git a/internal/domain/ingest/repository_test.go b/internal/domain/ingest/repository_test.go new file mode 100644 index 0000000000000000000000000000000000000000..81668636d3803d73d060443c29b3633f37700e57 --- /dev/null +++ b/internal/domain/ingest/repository_test.go @@ -0,0 +1,306 @@ +package ingest_test + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/jmoiron/sqlx" + _ "modernc.org/sqlite" + + "sourcecraft.dev/bigbes/lethe/internal/config" + "sourcecraft.dev/bigbes/lethe/internal/domain/ingest" + "sourcecraft.dev/bigbes/lethe/internal/platform/database" + "sourcecraft.dev/bigbes/lethe/internal/shared/wire" +) + +// newTestDatabase builds a Database steward against :memory:, runs Init, and +// registers Cleanup. Returns the *Database (Repository expects the steward). +func newTestDatabase(t *testing.T) *database.Database { + t.Helper() + d := &database.Database{ + Cfg: config.DatabaseConfig{ + Path: ":memory:", + BusyTimeout: 5 * time.Second, + }, + } + if err := d.Init(context.Background()); err != nil { + t.Fatalf("database.Init: %v", err) + } + t.Cleanup(func() { _ = d.Destroy(context.Background()) }) + return d +} + +// newRepo wires a Repository against a fresh in-memory database. +func newRepo(t *testing.T) (*ingest.Repository, *sqlx.DB) { + t.Helper() + d := newTestDatabase(t) + repo := &ingest.Repository{Database: d} + if err := repo.Init(context.Background()); err != nil { + t.Fatalf("repo.Init: %v", err) + } + return repo, d.DB +} + +func ptrInt64(v int64) *int64 { return &v } +func ptrString(v string) *string { return &v } +func ptrFloat64(v float64) *float64 { return &v } + +// turn builds a minimal valid TurnEvent with overridable session_meta. +func turn(tool, host, sid, tid string, seq int64, ts int64, content string) wire.TurnEvent { + return wire.TurnEvent{ + Tool: tool, + Host: host, + SessionID: sid, + TurnID: tid, + Seq: seq, + Role: "user", + Timestamp: ts, + Content: content, + SessionMeta: wire.SessionMeta{ + SourceFile: "/tmp/x.jsonl", + }, + } +} + +func runUpsert(t *testing.T, repo *ingest.Repository, db *sqlx.DB, owner string, turns []wire.TurnEvent) { + t.Helper() + if err := database.InTx(context.Background(), db, func(tx *sqlx.Tx) error { + return repo.UpsertChunk(context.Background(), tx, owner, turns) + }); err != nil { + t.Fatalf("upsert: %v", err) + } +} + +func TestRepository_UpsertChunk_FirstWriteWinsSession(t *testing.T) { + repo, db := newRepo(t) + t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "hello") + t1.SessionMeta.WorkingDir = ptrString("/A") + runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1}) + + t2 := turn("cc", "phoebe", "s1", "tB", 2, 1700000100, "world") + t2.SessionMeta.WorkingDir = ptrString("/B") + runUpsert(t, repo, db, "alice", []wire.TurnEvent{t2}) + + var wd string + if err := db.Get(&wd, `SELECT working_dir FROM sessions WHERE owner=? AND tool=? AND host=? AND session_id=?`, + "alice", "cc", "phoebe", "s1"); err != nil { + t.Fatalf("query: %v", err) + } + if wd != "/A" { + t.Fatalf("expected first-write working_dir /A, got %q", wd) + } +} + +func TestRepository_UpsertChunk_LastWriteWinsTurn(t *testing.T) { + repo, db := newRepo(t) + tA := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "first content") + runUpsert(t, repo, db, "alice", []wire.TurnEvent{tA}) + + tB := turn("cc", "phoebe", "s1", "tA", 2, 1700000050, "second content") + tB.Role = "assistant" + runUpsert(t, repo, db, "alice", []wire.TurnEvent{tB}) + + var content, role string + var seq int64 + if err := db.QueryRow(`SELECT content, role, seq FROM turns WHERE owner=? AND turn_id=?`, + "alice", "tA").Scan(&content, &role, &seq); err != nil { + t.Fatalf("query: %v", err) + } + if content != "second content" || role != "assistant" || seq != 2 { + t.Fatalf("expected last-write columns; got content=%q role=%q seq=%d", content, role, seq) + } +} + +func TestRepository_UpsertChunk_EndedAtMax(t *testing.T) { + repo, db := newRepo(t) + t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000100, "first") + t1.SessionMeta.StartedAt = ptrInt64(1700000000) + runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1}) + + // Second turn: smaller ended_at than current; MAX must keep first. + t2 := turn("cc", "phoebe", "s1", "tB", 2, 1699999999, "earlier") + runUpsert(t, repo, db, "alice", []wire.TurnEvent{t2}) + var ended int64 + if err := db.Get(&ended, `SELECT ended_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil { + t.Fatalf("q1: %v", err) + } + if ended != 1700000100 { + t.Fatalf("expected ended_at preserved at 1700000100, got %d", ended) + } + + // Third: larger ended_at; MAX must extend. + t3 := turn("cc", "phoebe", "s1", "tC", 3, 1700000500, "later") + runUpsert(t, repo, db, "alice", []wire.TurnEvent{t3}) + if err := db.Get(&ended, `SELECT ended_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil { + t.Fatalf("q2: %v", err) + } + if ended != 1700000500 { + t.Fatalf("expected ended_at extended to 1700000500, got %d", ended) + } +} + +func TestRepository_UpsertChunk_StartedAtFallbackMinTurnTimestamp(t *testing.T) { + repo, db := newRepo(t) + // SessionMeta.StartedAt is nil; started_at must come from turn timestamp. + t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000050, "x") + runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1}) + var started int64 + if err := db.Get(&started, `SELECT started_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil { + t.Fatalf("q: %v", err) + } + if started != 1700000050 { + t.Fatalf("expected started_at fallback 1700000050, got %d", started) + } + + // A subsequent turn with an earlier timestamp must NOT change started_at + // (first-write-wins on start). + t2 := turn("cc", "phoebe", "s1", "tB", 2, 1700000010, "earlier") + runUpsert(t, repo, db, "alice", []wire.TurnEvent{t2}) + if err := db.Get(&started, `SELECT started_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil { + t.Fatalf("q2: %v", err) + } + if started != 1700000050 { + t.Fatalf("expected started_at preserved at 1700000050, got %d", started) + } +} + +func TestRepository_UpsertChunk_PerOwnerIsolation(t *testing.T) { + repo, db := newRepo(t) + tA := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "shared") + runUpsert(t, repo, db, "alice", []wire.TurnEvent{tA}) + runUpsert(t, repo, db, "bob", []wire.TurnEvent{tA}) + + var rows int + if err := db.Get(&rows, `SELECT COUNT(*) FROM sessions WHERE tool='cc' AND host='phoebe' AND session_id='s1'`); err != nil { + t.Fatalf("q: %v", err) + } + if rows != 2 { + t.Fatalf("expected two sessions across owners, got %d", rows) + } + if err := db.Get(&rows, `SELECT COUNT(*) FROM turns WHERE turn_id='tA'`); err != nil { + t.Fatalf("q2: %v", err) + } + if rows != 2 { + t.Fatalf("expected two turns across owners, got %d", rows) + } +} + +func TestRepository_UpsertChunk_FKViolationPropagatesAsLineLevelCode(t *testing.T) { + // Repository alone cannot produce an FK violation — sessions are upserted + // by the same call. We synthesize the violation by inserting a turn + // referencing a (owner, tool, host, session_id) that does not exist; + // this proves the FK wrapper code-path is reachable via the Repository's + // raw exec layer for the Service to classify as line-level. + d := newTestDatabase(t) + tx, err := d.DB.BeginTxx(context.Background(), nil) + if err != nil { + t.Fatalf("begin: %v", err) + } + defer func() { _ = tx.Rollback() }() + + _, err = tx.Exec(` + INSERT INTO turns (owner, tool, host, session_id, turn_id, seq, role, timestamp, content) + VALUES ('alice','cc','phoebe','ghost','tX',1,'user',1,'orphan')`) + if err == nil { + t.Fatalf("expected FK violation, got nil") + } +} + +func TestRepository_UpsertChunk_OwnerBoundFromParameter(t *testing.T) { + // Sneak a non-wire field into the JSON; unmarshalling drops it. Then + // confirm the stored row uses the parameter, not whatever the wire said. + repo, db := newRepo(t) + raw := []byte(`{"owner":"evil","tool":"cc","host":"phoebe","session_id":"s1","turn_id":"tA","seq":1,"role":"user","timestamp":1700000000,"content":"hi","session_meta":{"source_file":"/tmp/x.jsonl"}}`) + var ev wire.TurnEvent + if err := json.Unmarshal(raw, &ev); err != nil { + t.Fatalf("unmarshal: %v", err) + } + runUpsert(t, repo, db, "alice", []wire.TurnEvent{ev}) + + var owner string + if err := db.Get(&owner, `SELECT owner FROM turns WHERE turn_id='tA'`); err != nil { + t.Fatalf("q: %v", err) + } + if owner != "alice" { + t.Fatalf("expected stored owner=alice, got %q", owner) + } + var n int + if err := db.Get(&n, `SELECT COUNT(*) FROM turns WHERE owner='evil'`); err != nil { + t.Fatalf("q evil: %v", err) + } + if n != 0 { + t.Fatalf("expected zero rows for owner=evil, got %d", n) + } +} + +func TestRepository_UpsertChunk_FTSPopulated(t *testing.T) { + repo, db := newRepo(t) + tc := json.RawMessage(`{"name":"shell","args":"ls"}`) + t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "find the needle") + t1.ToolCalls = tc + runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1}) + + var n int + if err := db.Get(&n, `SELECT COUNT(*) FROM turns_fts WHERE owner='alice' AND turns_fts MATCH 'needle'`); err != nil { + t.Fatalf("q turns_fts: %v", err) + } + if n != 1 { + t.Fatalf("expected 1 turns_fts hit, got %d", n) + } + if err := db.Get(&n, `SELECT COUNT(*) FROM tool_outputs_fts WHERE owner='alice' AND tool_outputs_fts MATCH 'shell'`); err != nil { + t.Fatalf("q tool_outputs_fts: %v", err) + } + if n != 1 { + t.Fatalf("expected 1 tool_outputs_fts hit, got %d", n) + } +} + +// Verify metadata json.RawMessage round-trips as opaque text into the row. +func TestRepository_UpsertChunk_MetadataPersisted(t *testing.T) { + repo, db := newRepo(t) + t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "x") + t1.Metadata = json.RawMessage(`{"k":"v"}`) + t1.SessionMeta.Metadata = json.RawMessage(`{"sk":"sv"}`) + runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1}) + + var tm, sm string + if err := db.QueryRow(`SELECT metadata FROM turns WHERE turn_id='tA'`).Scan(&tm); err != nil { + t.Fatalf("q tm: %v", err) + } + if tm != `{"k":"v"}` { + t.Fatalf("turn metadata roundtrip failed: %q", tm) + } + if err := db.QueryRow(`SELECT metadata FROM sessions WHERE session_id='s1'`).Scan(&sm); err != nil { + t.Fatalf("q sm: %v", err) + } + if sm != `{"sk":"sv"}` { + t.Fatalf("session metadata roundtrip failed: %q", sm) + } +} + +// Defensive: pass a free *float64 to ensure pointer types serialize cleanly. +func TestRepository_UpsertChunk_OptionalPointersStored(t *testing.T) { + repo, db := newRepo(t) + t1 := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "x") + t1.Model = ptrString("gpt-x") + t1.TokensIn = ptrInt64(10) + t1.TokensOut = ptrInt64(20) + t1.CostUSD = ptrFloat64(0.12) + runUpsert(t, repo, db, "alice", []wire.TurnEvent{t1}) + + var ( + model string + tokensIn int64 + tokensOut int64 + cost float64 + ) + if err := db.QueryRow(`SELECT model, tokens_in, tokens_out, cost_usd FROM turns WHERE turn_id='tA'`). + Scan(&model, &tokensIn, &tokensOut, &cost); err != nil { + t.Fatalf("q: %v", err) + } + if model != "gpt-x" || tokensIn != 10 || tokensOut != 20 || cost != 0.12 { + t.Fatalf("optional fields not round-tripped: %v %v %v %v", model, tokensIn, tokensOut, cost) + } +} diff --git a/internal/domain/ingest/service.go b/internal/domain/ingest/service.go new file mode 100644 index 0000000000000000000000000000000000000000..9de606536142cf5099a73a503b15ba838014c24e --- /dev/null +++ b/internal/domain/ingest/service.go @@ -0,0 +1,235 @@ +package ingest + +import ( + "context" + "encoding/json" + "io" + "log/slog" + + "github.com/jmoiron/sqlx" + "go.bigb.es/auxilia/culpa" + "go.bigb.es/auxilia/scribe" + + "sourcecraft.dev/bigbes/lethe/internal/config" + "sourcecraft.dev/bigbes/lethe/internal/pkg/httputil" + "sourcecraft.dev/bigbes/lethe/internal/platform/database" + "sourcecraft.dev/bigbes/lethe/internal/platform/observability" + "sourcecraft.dev/bigbes/lethe/internal/shared/wire" +) + +// validRoles is the closed set of values accepted in TurnEvent.Role. Phase 7 +// validates against this list before reaching the database. +var validRoles = map[string]struct{}{ + "user": {}, + "assistant": {}, + "tool": {}, + "system": {}, +} + +// maxSourceFileBytes caps the per-line SessionMeta.SourceFile length. The +// server treats source_file as opaque, but extreme values are still rejected +// to keep storage and logs sane. +const maxSourceFileBytes = 1024 + +// LineError describes a single NDJSON line that failed validation, JSON +// parsing, or chunk commit. It is stable wire output: the handler embeds the +// list directly into the 200 response body. +type LineError struct { + Line int `json:"line"` + Err string `json:"error"` +} + +// Result is the outcome of one Ingest call. Accepted is the count of turns +// actually committed to the database; Errors lists per-line failures in the +// order they occurred. Per the spec the ingest endpoint returns 200 even when +// Errors is non-empty — the client uses Accepted to advance its offset. +type Result struct { + Accepted int `json:"accepted"` + Errors []LineError `json:"errors"` +} + +// Service is the steward-managed ingest pipeline. Repo is the SQL boundary; +// Cfg fixes the chunk size and content-byte cap; Log emits per-chunk failure +// notices at WARN; Metrics counts accepted/errored lines and committed chunks. +type Service struct { + Cfg config.IngestConfig `config:""` + Repo *Repository `inject:""` + Log *observability.Logger `inject:""` + Metrics *observability.Metrics `inject:""` +} + +// Init satisfies the steward Initer contract. The Service is stateless beyond +// its injected dependencies. +func (s *Service) Init(_ context.Context) error { return nil } + +// validateTurn enforces the wire contract on a single decoded TurnEvent. It +// returns a culpa-coded INVALID error on the first failure; the Service maps +// that into a per-line LineError. Seq is intentionally unchecked: zero is a +// legal first sequence number on the wire. +func validateTurn(t wire.TurnEvent, maxContentBytes int64) error { + if t.Tool == "" { + return culpa.WithCode(culpa.New("tool is required"), "INVALID") + } + if t.Host == "" { + return culpa.WithCode(culpa.New("host is required"), "INVALID") + } + if t.SessionID == "" { + return culpa.WithCode(culpa.New("session_id is required"), "INVALID") + } + if t.TurnID == "" { + return culpa.WithCode(culpa.New("turn_id is required"), "INVALID") + } + if t.Role == "" { + return culpa.WithCode(culpa.New("role is required"), "INVALID") + } + if _, ok := validRoles[t.Role]; !ok { + return culpa.WithCode(culpa.Errorf("role %q is not one of {user, assistant, tool, system}", t.Role), "INVALID") + } + if t.Timestamp == 0 { + return culpa.WithCode(culpa.New("timestamp is required"), "INVALID") + } + if t.Content == "" { + return culpa.WithCode(culpa.New("content is required"), "INVALID") + } + if int64(len(t.Content)) > maxContentBytes { + return culpa.WithCode(culpa.Errorf("content exceeds %d bytes", maxContentBytes), "INVALID") + } + if t.SessionMeta.SourceFile == "" { + return culpa.WithCode(culpa.New("session_meta.source_file is required"), "INVALID") + } + if len(t.SessionMeta.SourceFile) > maxSourceFileBytes { + return culpa.WithCode(culpa.Errorf("session_meta.source_file exceeds %d bytes", maxSourceFileBytes), "INVALID") + } + return nil +} + +// Ingest streams NDJSON from body, validates each line, and commits in chunks +// of Cfg.ChunkSize under one transaction per chunk. +// +// Partial-accept semantics: lines that committed in earlier chunks remain +// committed even when a later line fails — Accepted reflects what is actually +// in the DB. A per-line failure (JSON parse, validation, FK) appends a +// LineError and stops processing; subsequent lines are not touched. A +// non-line failure (DB closed, body cap exceeded, scanner I/O) returns the +// (still-truthful) Result alongside the wrapped error so the Handler can map +// it to a 5xx or 413. +func (s *Service) Ingest(ctx context.Context, owner string, body io.Reader, maxBytes int64) (Result, error) { + var ( + result Result + chunk = make([]wire.TurnEvent, 0, s.Cfg.ChunkSize) + startsAt = make([]int, 0, s.Cfg.ChunkSize) // line number of each buffered turn + lineNum int + ) + + commitChunk := func() error { + if len(chunk) == 0 { + return nil + } + firstLine := startsAt[0] + first := chunk[0] + + txErr := database.InTx(ctx, s.Repo.Database.DB, func(tx *sqlx.Tx) error { + return s.Repo.UpsertChunk(ctx, tx, owner, chunk) + }) + if txErr != nil { + s.Metrics.IngestLinesErrored.Inc() + s.Log.L.WarnContext(ctx, "ingest chunk commit failed", + slog.String("owner", owner), + slog.Int("line", firstLine), + slog.String("tool", first.Tool), + slog.String("host", first.Host), + slog.String("session_id", first.SessionID), + scribe.Err(txErr), + ) + return txErr + } + + n := len(chunk) + result.Accepted += n + s.Metrics.IngestLinesAccepted.Add(float64(n)) + s.Metrics.IngestChunksCommitted.Inc() + + // Reset buffers without releasing the underlying capacity. + chunk = chunk[:0] + startsAt = startsAt[:0] + return nil + } + + handleChunkErr := func(err error) (Result, error) { + // Take the failing chunk's first line BEFORE reset; commitChunk + // leaves buffers untouched on error so startsAt still has it. + failedLine := lineNum + if len(startsAt) > 0 { + failedLine = startsAt[0] + } + if isLineLevelDBError(err) { + result.Errors = append(result.Errors, LineError{ + Line: failedLine, + Err: err.Error(), + }) + return result, nil + } + return result, err + } + + for raw, scanErr := range httputil.ReadNDJSONLines(body, maxBytes) { + if scanErr != nil { + // Body cap or oversized line: caller maps to 413. + return result, scanErr + } + lineNum++ + + var event wire.TurnEvent + if err := json.Unmarshal(raw, &event); err != nil { + s.Metrics.IngestLinesErrored.Inc() + result.Errors = append(result.Errors, LineError{ + Line: lineNum, + Err: "invalid json: " + err.Error(), + }) + return result, nil + } + if err := validateTurn(event, s.Cfg.MaxTurnContentBytes); err != nil { + s.Metrics.IngestLinesErrored.Inc() + result.Errors = append(result.Errors, LineError{ + Line: lineNum, + Err: err.Error(), + }) + return result, nil + } + + chunk = append(chunk, event) + startsAt = append(startsAt, lineNum) + + if len(chunk) >= s.Cfg.ChunkSize { + if err := commitChunk(); err != nil { + return handleChunkErr(err) + } + } + } + + if err := commitChunk(); err != nil { + return handleChunkErr(err) + } + + return result, nil +} + +// isLineLevelDBError reports whether err describes a per-row failure (FK or +// constraint violation) that should be surfaced as a LineError in the 200 +// response, rather than escalated to a 5xx. The classification mirrors the +// codes Repository.UpsertChunk attaches via wrapSQLErr. +func isLineLevelDBError(err error) bool { + var cd culpa.CodeDetail + if !culpa.FindDetail(err, &cd) { + return false + } + code, ok := cd.Code.(string) + if !ok { + return false + } + switch code { + case "DB_FOREIGN_KEY", "DB_CONSTRAINT": + return true + } + return false +} diff --git a/internal/domain/ingest/service_test.go b/internal/domain/ingest/service_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c366e6fa46ef89922d93f84edfbc09bb14dcf8ac --- /dev/null +++ b/internal/domain/ingest/service_test.go @@ -0,0 +1,424 @@ +package ingest_test + +import ( + "context" + "fmt" + "strings" + "testing" + + "sourcecraft.dev/bigbes/lethe/internal/config" + "sourcecraft.dev/bigbes/lethe/internal/domain/ingest" + "sourcecraft.dev/bigbes/lethe/internal/platform/database" + "sourcecraft.dev/bigbes/lethe/internal/platform/observability" +) + +// newService wires Service+Repository+Logger+Metrics against a fresh in-memory +// DB. Returns the Service and the underlying *Database for direct queries. +func newService(t *testing.T, cfg config.IngestConfig) (*ingest.Service, *database.Database) { + t.Helper() + d := newTestDatabase(t) + repo := &ingest.Repository{Database: d} + if err := repo.Init(context.Background()); err != nil { + t.Fatalf("repo.Init: %v", err) + } + logger := &observability.Logger{Cfg: config.LoggingConfig{Level: "info", Format: "json"}} + if err := logger.Init(context.Background()); err != nil { + t.Fatalf("logger.Init: %v", err) + } + metrics := &observability.Metrics{} + if err := metrics.Init(context.Background()); err != nil { + t.Fatalf("metrics.Init: %v", err) + } + svc := &ingest.Service{ + Cfg: cfg, + Repo: repo, + Log: logger, + Metrics: metrics, + } + if err := svc.Init(context.Background()); err != nil { + t.Fatalf("svc.Init: %v", err) + } + return svc, d +} + +// defaultCfg is the canonical IngestConfig used by most tests. +func defaultCfg() config.IngestConfig { + return config.IngestConfig{ + MaxBodyBytes: 1 << 20, // 1 MiB + MaxTurnContentBytes: 4096, + ChunkSize: 10, + } +} + +// validLine produces a single NDJSON line with the supplied turn_id; all +// other fields are minimal-but-valid. +func validLine(turnID string, ts int64, content string) string { + return fmt.Sprintf( + `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":%q,"seq":1,"role":"user","timestamp":%d,"content":%q,"session_meta":{"source_file":"/tmp/x.jsonl"}}`, + turnID, ts, content, + ) +} + +func body(lines ...string) *strings.Reader { + return strings.NewReader(strings.Join(lines, "\n") + "\n") +} + +// --- validation matrix ---------------------------------------------------- + +func TestService_Ingest_RequiredFieldsRejected(t *testing.T) { + cases := []struct { + name string + line string + }{ + {"missing tool", `{"host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`}, + {"missing host", `{"tool":"cc","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`}, + {"missing session_id", `{"tool":"cc","host":"phoebe","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`}, + {"missing turn_id", `{"tool":"cc","host":"phoebe","session_id":"s1","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`}, + {"missing role", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`}, + {"missing timestamp", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","content":"x","session_meta":{"source_file":"/x"}}`}, + {"missing content", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"session_meta":{"source_file":"/x"}}`}, + {"missing source_file", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{}}`}, + {"bad role", `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"hacker","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + svc, _ := newService(t, defaultCfg()) + res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(tc.line+"\n"), 1<<20) + if err != nil { + t.Fatalf("Ingest err: %v", err) + } + if res.Accepted != 0 { + t.Errorf("Accepted=%d; want 0", res.Accepted) + } + if len(res.Errors) != 1 || res.Errors[0].Line != 1 { + t.Fatalf("expected one LineError on line 1, got %+v", res.Errors) + } + }) + } +} + +func TestService_Ingest_ContentOverCapRejected(t *testing.T) { + cfg := defaultCfg() + cfg.MaxTurnContentBytes = 16 + svc, _ := newService(t, cfg) + line := validLine("tA", 1700000000, strings.Repeat("a", 17)) + res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), 1<<20) + if err != nil { + t.Fatalf("Ingest: %v", err) + } + if res.Accepted != 0 || len(res.Errors) != 1 { + t.Fatalf("expected 0 accepted + 1 error, got %+v", res) + } +} + +func TestService_Ingest_SourceFileOverCapRejected(t *testing.T) { + svc, _ := newService(t, defaultCfg()) + long := strings.Repeat("p", 1025) + line := fmt.Sprintf( + `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t","seq":1,"role":"user","timestamp":1,"content":"x","session_meta":{"source_file":%q}}`, + long, + ) + res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), 1<<20) + if err != nil { + t.Fatalf("Ingest: %v", err) + } + if res.Accepted != 0 || len(res.Errors) != 1 { + t.Fatalf("expected 0 accepted + 1 error, got %+v", res) + } +} + +func TestService_Ingest_ValidationFailureCommitsPriorChunks(t *testing.T) { + cfg := defaultCfg() + cfg.ChunkSize = 2 + svc, d := newService(t, cfg) + + // 2 valid lines (commit as chunk 1) + a third bad line. + lines := []string{ + validLine("t1", 1700000000, "one"), + validLine("t2", 1700000001, "two"), + `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t3","seq":1,"role":"hacker","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`, + } + res, err := svc.Ingest(context.Background(), "alice", body(lines...), 1<<20) + if err != nil { + t.Fatalf("Ingest: %v", err) + } + if res.Accepted != 2 { + t.Errorf("Accepted=%d; want 2", res.Accepted) + } + if len(res.Errors) != 1 || res.Errors[0].Line != 3 { + t.Fatalf("expected one error on line 3, got %+v", res.Errors) + } + var n int + if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns WHERE owner='alice'`); err != nil { + t.Fatalf("q: %v", err) + } + if n != 2 { + t.Errorf("expected 2 committed turns, got %d", n) + } +} + +// --- semantics ------------------------------------------------------------ + +func TestService_Ingest_Idempotent(t *testing.T) { + svc, d := newService(t, defaultCfg()) + lines := []string{ + validLine("t1", 1700000000, "alpha"), + validLine("t2", 1700000001, "beta"), + } + for i := 0; i < 2; i++ { + res, err := svc.Ingest(context.Background(), "alice", body(lines...), 1<<20) + if err != nil { + t.Fatalf("ingest %d: %v", i, err) + } + if res.Accepted != 2 { + t.Errorf("Accepted=%d; want 2 on iteration %d", res.Accepted, i) + } + } + var rows int + if err := d.DB.Get(&rows, `SELECT COUNT(*) FROM turns`); err != nil { + t.Fatalf("q: %v", err) + } + if rows != 2 { + t.Errorf("expected 2 turns after re-ingest, got %d", rows) + } + var sessions int + if err := d.DB.Get(&sessions, `SELECT COUNT(*) FROM sessions`); err != nil { + t.Fatalf("q: %v", err) + } + if sessions != 1 { + t.Errorf("expected 1 session, got %d", sessions) + } +} + +func TestService_Ingest_FirstWriteWinsSession(t *testing.T) { + svc, d := newService(t, defaultCfg()) + line1 := `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t1","seq":1,"role":"user","timestamp":1700000000,"content":"x","session_meta":{"source_file":"/A","working_dir":"/A"}}` + if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line1+"\n"), 1<<20); err != nil { + t.Fatalf("ingest 1: %v", err) + } + line2 := `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t2","seq":2,"role":"user","timestamp":1700000100,"content":"y","session_meta":{"source_file":"/B","working_dir":"/B"}}` + if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line2+"\n"), 1<<20); err != nil { + t.Fatalf("ingest 2: %v", err) + } + + var wd, sf string + if err := d.DB.QueryRow(`SELECT working_dir, source_file FROM sessions WHERE owner=? AND session_id=?`, + "alice", "s1").Scan(&wd, &sf); err != nil { + t.Fatalf("q: %v", err) + } + if wd != "/A" || sf != "/A" { + t.Errorf("first-write-wins broken; working_dir=%q source_file=%q", wd, sf) + } +} + +func TestService_Ingest_LastWriteWinsTurn(t *testing.T) { + svc, d := newService(t, defaultCfg()) + if _, err := svc.Ingest(context.Background(), "alice", + strings.NewReader(validLine("t1", 1700000000, "first")+"\n"), 1<<20); err != nil { + t.Fatalf("ingest 1: %v", err) + } + if _, err := svc.Ingest(context.Background(), "alice", + strings.NewReader(validLine("t1", 1700000100, "second")+"\n"), 1<<20); err != nil { + t.Fatalf("ingest 2: %v", err) + } + var content string + if err := d.DB.Get(&content, `SELECT content FROM turns WHERE turn_id='t1'`); err != nil { + t.Fatalf("q: %v", err) + } + if content != "second" { + t.Errorf("last-write-wins broken; content=%q", content) + } +} + +func TestService_Ingest_EndedAtExtends(t *testing.T) { + svc, d := newService(t, defaultCfg()) + if _, err := svc.Ingest(context.Background(), "alice", + strings.NewReader(validLine("t1", 1700000000, "x")+"\n"), 1<<20); err != nil { + t.Fatalf("ingest 1: %v", err) + } + if _, err := svc.Ingest(context.Background(), "alice", + strings.NewReader(validLine("t2", 1700000500, "y")+"\n"), 1<<20); err != nil { + t.Fatalf("ingest 2: %v", err) + } + var ended int64 + if err := d.DB.Get(&ended, `SELECT ended_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil { + t.Fatalf("q: %v", err) + } + if ended != 1700000500 { + t.Errorf("expected ended_at=1700000500, got %d", ended) + } +} + +func TestService_Ingest_StartedAtFallback(t *testing.T) { + svc, d := newService(t, defaultCfg()) + // SessionMeta has no started_at; first turn timestamp wins. + if _, err := svc.Ingest(context.Background(), "alice", + strings.NewReader(validLine("t1", 1700000123, "x")+"\n"), 1<<20); err != nil { + t.Fatalf("ingest: %v", err) + } + var started int64 + if err := d.DB.Get(&started, `SELECT started_at FROM sessions WHERE owner=? AND session_id=?`, "alice", "s1"); err != nil { + t.Fatalf("q: %v", err) + } + if started != 1700000123 { + t.Errorf("expected started_at=1700000123, got %d", started) + } +} + +func TestService_Ingest_ChunkedPartialAcceptBadLineInChunk3(t *testing.T) { + cfg := defaultCfg() + cfg.ChunkSize = 2 + svc, d := newService(t, cfg) + + lines := []string{ + validLine("t1", 1700000000, "a"), + validLine("t2", 1700000001, "b"), + validLine("t3", 1700000002, "c"), + validLine("t4", 1700000003, "d"), + // Bad line in chunk 3 (line index 5). + `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t5","seq":1,"role":"hacker","timestamp":1,"content":"x","session_meta":{"source_file":"/x"}}`, + validLine("t6", 1700000005, "f"), + } + res, err := svc.Ingest(context.Background(), "alice", body(lines...), 1<<20) + if err != nil { + t.Fatalf("ingest: %v", err) + } + if res.Accepted != 4 { + t.Errorf("Accepted=%d; want 4 (2 * chunkSize)", res.Accepted) + } + if len(res.Errors) != 1 || res.Errors[0].Line != 5 { + t.Fatalf("expected one error referencing line 5, got %+v", res.Errors) + } + // Line 6 must not be processed. + var n int + if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns WHERE turn_id='t6'`); err != nil { + t.Fatalf("q: %v", err) + } + if n != 0 { + t.Errorf("expected line 6 not processed, found %d rows", n) + } + if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns`); err != nil { + t.Fatalf("q: %v", err) + } + if n != 4 { + t.Errorf("expected 4 committed turns, got %d", n) + } +} + +func TestService_Ingest_PerUserIsolation(t *testing.T) { + svc, d := newService(t, defaultCfg()) + lines := []string{validLine("t1", 1700000000, "x"), validLine("t2", 1700000001, "y")} + if _, err := svc.Ingest(context.Background(), "alice", body(lines...), 1<<20); err != nil { + t.Fatalf("alice ingest: %v", err) + } + if _, err := svc.Ingest(context.Background(), "bob", body(lines...), 1<<20); err != nil { + t.Fatalf("bob ingest: %v", err) + } + + var n int + if err := d.DB.Get(&n, `SELECT COUNT(*) FROM sessions`); err != nil { + t.Fatalf("q sessions: %v", err) + } + if n != 2 { + t.Errorf("expected 2 sessions across owners, got %d", n) + } + if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns`); err != nil { + t.Fatalf("q turns: %v", err) + } + if n != 4 { + t.Errorf("expected 4 turns across owners, got %d", n) + } + + // Mutating bob's session must not touch alice's row. + if _, err := d.DB.Exec(`UPDATE sessions SET ended_at = 1 WHERE owner='bob'`); err != nil { + t.Fatalf("bob mutate: %v", err) + } + var aliceEnded int64 + if err := d.DB.Get(&aliceEnded, `SELECT ended_at FROM sessions WHERE owner='alice'`); err != nil { + t.Fatalf("q alice: %v", err) + } + if aliceEnded == 1 { + t.Errorf("bob's update bled into alice's row") + } +} + +func TestService_Ingest_WireOwnerIgnored(t *testing.T) { + svc, d := newService(t, defaultCfg()) + // Stray "owner":"evil" in the wire payload — must be dropped silently. + line := `{"owner":"evil","tool":"cc","host":"phoebe","session_id":"s1","turn_id":"t1","seq":1,"role":"user","timestamp":1700000000,"content":"x","session_meta":{"source_file":"/x"}}` + if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), 1<<20); err != nil { + t.Fatalf("ingest: %v", err) + } + var owner string + if err := d.DB.Get(&owner, `SELECT owner FROM turns WHERE turn_id='t1'`); err != nil { + t.Fatalf("q: %v", err) + } + if owner != "alice" { + t.Fatalf("expected owner=alice, got %q", owner) + } + var n int + if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns WHERE owner='evil'`); err != nil { + t.Fatalf("q evil: %v", err) + } + if n != 0 { + t.Fatalf("expected zero rows for owner=evil, got %d", n) + } +} + +func TestService_Ingest_FTSAfterIngest(t *testing.T) { + svc, d := newService(t, defaultCfg()) + line := `{"tool":"cc","host":"phoebe","session_id":"s1","turn_id":"tA","seq":1,"role":"user","timestamp":1700000000,"content":"unmistakable haystack","tool_calls":{"name":"shellexec"},"session_meta":{"source_file":"/x"}}` + if _, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), 1<<20); err != nil { + t.Fatalf("ingest: %v", err) + } + var n int + if err := d.DB.Get(&n, `SELECT COUNT(*) FROM turns_fts WHERE owner='alice' AND turns_fts MATCH 'unmistakable'`); err != nil { + t.Fatalf("q turns_fts: %v", err) + } + if n != 1 { + t.Fatalf("expected 1 turns_fts hit, got %d", n) + } + if err := d.DB.Get(&n, `SELECT COUNT(*) FROM tool_outputs_fts WHERE owner='alice' AND tool_outputs_fts MATCH 'shellexec'`); err != nil { + t.Fatalf("q tool_outputs_fts: %v", err) + } + if n != 1 { + t.Fatalf("expected 1 tool_outputs_fts hit, got %d", n) + } +} + +func TestService_Ingest_DBDownReturnsError(t *testing.T) { + svc, d := newService(t, defaultCfg()) + // Close the underlying *sql.DB without nilling the field; the next + // BeginTx returns sql.ErrConnDone (or similar), simulating a real + // runtime "database closed" failure without crashing the test. + if err := d.DB.Close(); err != nil { + t.Fatalf("close: %v", err) + } + res, err := svc.Ingest(context.Background(), "alice", + strings.NewReader(validLine("t1", 1700000000, "x")+"\n"), 1<<20) + if err == nil { + t.Fatalf("expected error, got nil; result=%+v", res) + } + if res.Accepted != 0 { + t.Errorf("expected Accepted=0 on DB-down, got %d", res.Accepted) + } +} + +func TestService_Ingest_BodyOverCapReturnsTooLarge(t *testing.T) { + cfg := defaultCfg() + cfg.MaxBodyBytes = 100 + svc, _ := newService(t, cfg) + // 1 KiB payload that does not fit. + line := validLine("t1", 1700000000, strings.Repeat("a", 1024)) + res, err := svc.Ingest(context.Background(), "alice", strings.NewReader(line+"\n"), cfg.MaxBodyBytes) + if err == nil { + t.Fatalf("expected TOO_LARGE error, got nil; result=%+v", res) + } + // We don't import apierror here; just assert err contains the message. + if !strings.Contains(err.Error(), "limit") && !strings.Contains(err.Error(), "long") { + t.Errorf("expected too-large/limit error, got %v", err) + } + if res.Accepted != 0 { + t.Errorf("Accepted=%d; want 0 on body cap", res.Accepted) + } +} diff --git a/internal/pkg/apierror/apierror.go b/internal/pkg/apierror/apierror.go index 357cb539c9402c15e82163ecb95d09947364867b..d8520a4bad69027f2196baf3040ef9aae7206a65 100644 --- a/internal/pkg/apierror/apierror.go +++ b/internal/pkg/apierror/apierror.go @@ -52,6 +52,9 @@ var codeStatus = map[string]int{ "FORBIDDEN": http.StatusForbidden, "CONFLICT": http.StatusConflict, "TOO_LARGE": http.StatusRequestEntityTooLarge, + // UNSUPPORTED_MEDIA_TYPE is used by the ingest handler when the request's + // Content-Type is not the canonical NDJSON media type. Added in Phase 7. + "UNSUPPORTED_MEDIA_TYPE": http.StatusUnsupportedMediaType, } // Render is the single boundary that translates a culpa-wrapped error into