// Package ingest implements the NDJSON ingest pipeline for lethe: per-line // validation against the wire contract, chunked transactional commit with // per-line partial-accept semantics, and the trust-model invariant that the // stored owner is always the authenticated user (never the wire payload). // // Layering inside the package: // - Repository owns the raw SQL: per-chunk UPSERT of sessions/turns inside a // caller-supplied transaction. It does not begin or commit the tx. // - Service owns the streaming, validation, and chunking policy. It opens // and commits one tx per chunk, increments metrics, and converts SQL // errors into per-line LineError entries (or hard 5xx for non-line // failures like a closed DB). // - Handler is the HTTP boundary: enforces Content-Type, body cap, and // auth-derived owner; renders RFC 7807 problems via apierror.Render. package ingest import ( "context" "errors" "github.com/jmoiron/sqlx" "go.bigb.es/auxilia/culpa" "modernc.org/sqlite" sqlite3 "modernc.org/sqlite/lib" "sourcecraft.dev/bigbes/lethe/internal/platform/database" "sourcecraft.dev/bigbes/lethe/internal/shared/wire" ) // Repository is the steward-managed SQL steward for ingest. It is stateless; // Init is empty because every UpsertChunk call gets its tx from the Service. type Repository struct { Database *database.Database `inject:""` } // Init satisfies the steward Initer contract. The Repository has no state to // build up — the database is owned by the Database steward and injected // directly. func (r *Repository) Init(_ context.Context) error { return nil } // upsertSessionStmt UPSERTs a row in `sessions` keyed by the composite PK // (owner, tool, host, session_id). On conflict only `ended_at` is widened // (MAX), preserving the first-write-wins values for `started_at`, `working_dir`, // `source_file`, and `metadata`. SQLite leaves columns not in the SET list // unchanged on the existing row when DO UPDATE fires. const upsertSessionStmt = ` INSERT INTO sessions (owner, tool, host, session_id, started_at, ended_at, working_dir, source_file, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (owner, tool, host, session_id) DO UPDATE SET ended_at = MAX(sessions.ended_at, excluded.ended_at) ` // upsertTurnStmt UPSERTs a row in `turns` keyed by the composite PK // (owner, tool, host, session_id, turn_id). Last-write-wins: every non-key // column is overwritten from the incoming row. const upsertTurnStmt = ` INSERT INTO turns (owner, tool, host, session_id, turn_id, seq, role, timestamp, content, model, tokens_in, tokens_out, cost_usd, tool_calls, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (owner, tool, host, session_id, turn_id) DO UPDATE SET seq = excluded.seq, role = excluded.role, timestamp = excluded.timestamp, content = excluded.content, model = excluded.model, tokens_in = excluded.tokens_in, tokens_out = excluded.tokens_out, cost_usd = excluded.cost_usd, tool_calls = excluded.tool_calls, metadata = excluded.metadata ` // UpsertChunk writes every turn (and its parent session) in chunk under the // caller-supplied transaction. The tx is owned by the caller (the Service): // UpsertChunk neither begins nor commits it. // // owner is bound from the function parameter on every INSERT — never from the // wire payload. wire.TurnEvent has no `owner` JSON field; even if a hostile // client sneaks a stray `"owner":"evil"` into the line, json.Unmarshal drops // it silently and the SQL below never references it. This invariant is the // core of the multi-tenant trust model. func (r *Repository) UpsertChunk(ctx context.Context, tx *sqlx.Tx, owner string, chunk []wire.TurnEvent) error { for _, t := range chunk { startedAt := t.Timestamp if t.SessionMeta.StartedAt != nil { startedAt = *t.SessionMeta.StartedAt } // owner is bound from the function parameter, not the wire payload (trust model invariant). if _, err := tx.ExecContext(ctx, upsertSessionStmt, owner, t.Tool, t.Host, t.SessionID, startedAt, t.Timestamp, // ended_at first-write equals started turn timestamp; widened by MAX on conflict. t.SessionMeta.WorkingDir, t.SessionMeta.SourceFile, rawMessageOrNil(t.SessionMeta.Metadata), ); err != nil { return wrapSQLErr(err, "upsert session") } // owner is bound from the function parameter, not the wire payload (trust model invariant). if _, err := tx.ExecContext(ctx, upsertTurnStmt, owner, t.Tool, t.Host, t.SessionID, t.TurnID, t.Seq, t.Role, t.Timestamp, t.Content, t.Model, t.TokensIn, t.TokensOut, t.CostUSD, rawMessageOrNil(t.ToolCalls), rawMessageOrNil(t.Metadata), ); err != nil { return wrapSQLErr(err, "upsert turn") } } return nil } // rawMessageOrNil normalizes a json.RawMessage to a SQL NULL when empty so the // stored value is a real NULL (not the literal string "null" or ""). func rawMessageOrNil(m []byte) any { if len(m) == 0 { return nil } return string(m) } // wrapSQLErr classifies SQLite-driver errors so the Service can decide // whether to attribute them to the offending line (FK / constraint failures) // or to surface them as a hard 5xx (DB closed, IO error). Any unrecognized // failure is wrapped under DB_UPSERT. func wrapSQLErr(err error, msg string) error { var se *sqlite.Error if errors.As(err, &se) { switch se.Code() { case sqlite3.SQLITE_CONSTRAINT_FOREIGNKEY: return culpa.WithCode(culpa.Wrap(err, msg), "DB_FOREIGN_KEY") case sqlite3.SQLITE_CONSTRAINT, sqlite3.SQLITE_CONSTRAINT_CHECK, sqlite3.SQLITE_CONSTRAINT_NOTNULL, sqlite3.SQLITE_CONSTRAINT_PRIMARYKEY, sqlite3.SQLITE_CONSTRAINT_UNIQUE: return culpa.WithCode(culpa.Wrap(err, msg), "DB_CONSTRAINT") } } return culpa.WithCode(culpa.Wrap(err, msg), "DB_UPSERT") }