// Package ingest implements the NDJSON ingest pipeline for lethe: per-line
// validation against the wire contract, chunked transactional commit with
// per-line partial-accept semantics, and the trust-model invariant that the
// stored owner is always the authenticated user (never the wire payload).
//
// Layering inside the package:
// - Repository owns the raw SQL: per-chunk UPSERT of sessions/turns inside a
// caller-supplied transaction. It does not begin or commit the tx.
// - Service owns the streaming, validation, and chunking policy. It opens
// and commits one tx per chunk, increments metrics, and converts SQL
// errors into per-line LineError entries (or hard 5xx for non-line
// failures like a closed DB).
// - Handler is the HTTP boundary: enforces Content-Type, body cap, and
// auth-derived owner; renders RFC 7807 problems via apierror.Render.
package ingest
import (
"context"
"errors"
"github.com/jmoiron/sqlx"
"go.bigb.es/auxilia/culpa"
"modernc.org/sqlite"
sqlite3 "modernc.org/sqlite/lib"
"sourcecraft.dev/bigbes/lethe/internal/platform/database"
"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)
// Repository is the steward-managed SQL layer of the ingest pipeline. It keeps
// no state of its own beyond the injected Database handle: Init is a no-op
// because every UpsertChunk call receives its transaction from the Service.
type Repository struct {
// Database is filled in through the `inject` struct tag; this package never assigns it.
Database *database.Database `inject:""`
}
// Init exists only to satisfy the steward Initer contract. There is nothing to
// set up here: the Database handle is owned by its own steward and arrives via
// injection, so Init ignores its context and always reports success.
func (r *Repository) Init(_ context.Context) error {
	return nil
}
// upsertSessionStmt inserts a row into `sessions`, keyed by the composite
// primary key (owner, tool, host, session_id). When the row already exists,
// only `ended_at` is updated — to the later of the stored and the incoming
// value. `started_at`, `working_dir`, `source_file`, and `metadata` keep
// whatever the first write stored, because SQLite leaves columns that are
// absent from the DO UPDATE SET list unchanged on the existing row.
//
// NOTE(review): SQLite's multi-argument MAX() returns NULL if either argument
// is NULL — confirm that `ended_at` can never be NULL on either side.
const upsertSessionStmt = `
INSERT INTO sessions
(owner, tool, host, session_id, started_at, ended_at, working_dir, source_file, metadata)
VALUES
(?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (owner, tool, host, session_id) DO UPDATE SET
ended_at = MAX(sessions.ended_at, excluded.ended_at)
`
// upsertTurnStmt inserts a row into `turns`, keyed by the composite primary
// key (owner, tool, host, session_id, turn_id). On conflict the write is
// last-write-wins: each of the ten non-key columns named in the SET list is
// replaced with the value from the incoming row.
const upsertTurnStmt = `
INSERT INTO turns
(owner, tool, host, session_id, turn_id, seq, role, timestamp,
content, model, tokens_in, tokens_out, cost_usd, tool_calls, metadata)
VALUES
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (owner, tool, host, session_id, turn_id) DO UPDATE SET
seq = excluded.seq,
role = excluded.role,
timestamp = excluded.timestamp,
content = excluded.content,
model = excluded.model,
tokens_in = excluded.tokens_in,
tokens_out = excluded.tokens_out,
cost_usd = excluded.cost_usd,
tool_calls = excluded.tool_calls,
metadata = excluded.metadata
`
// UpsertChunk persists every event in chunk, in order: first the parent
// session row, then the turn row. All statements run on the transaction handed
// in by the caller (the Service), which owns its lifecycle — UpsertChunk never
// begins or commits it.
//
// The `owner` column is always bound from the owner argument, never from the
// wire payload. wire.TurnEvent carries no `owner` JSON field, so a stray
// `"owner":"evil"` smuggled into a line is dropped by json.Unmarshal and is
// never referenced by the SQL here. This invariant is the core of the
// multi-tenant trust model.
//
// Processing stops at the first statement that fails; the error is returned
// after classification by wrapSQLErr.
func (r *Repository) UpsertChunk(ctx context.Context, tx *sqlx.Tx, owner string, chunk []wire.TurnEvent) error {
	for i := range chunk {
		ev := &chunk[i]

		// Use the session's declared start when present; otherwise fall back
		// to this turn's own timestamp.
		sessionStart := ev.Timestamp
		if declared := ev.SessionMeta.StartedAt; declared != nil {
			sessionStart = *declared
		}

		// Trust-model invariant: owner comes from the function parameter.
		sessionArgs := []any{
			owner, ev.Tool, ev.Host, ev.SessionID,
			sessionStart,
			ev.Timestamp, // ended_at: seeded with this turn's timestamp, widened by MAX on conflict.
			ev.SessionMeta.WorkingDir,
			ev.SessionMeta.SourceFile,
			rawMessageOrNil(ev.SessionMeta.Metadata),
		}
		_, err := tx.ExecContext(ctx, upsertSessionStmt, sessionArgs...)
		if err != nil {
			return wrapSQLErr(err, "upsert session")
		}

		// Trust-model invariant: owner comes from the function parameter.
		turnArgs := []any{
			owner, ev.Tool, ev.Host, ev.SessionID, ev.TurnID,
			ev.Seq, ev.Role, ev.Timestamp, ev.Content,
			ev.Model, ev.TokensIn, ev.TokensOut, ev.CostUSD,
			rawMessageOrNil(ev.ToolCalls),
			rawMessageOrNil(ev.Metadata),
		}
		_, err = tx.ExecContext(ctx, upsertTurnStmt, turnArgs...)
		if err != nil {
			return wrapSQLErr(err, "upsert turn")
		}
	}
	return nil
}
// rawMessageOrNil converts a raw JSON payload into a SQL bind value.
//
// It returns nil — bound as SQL NULL — when m is nil, empty, or exactly the
// JSON literal `null`, so the column holds a real NULL rather than the text
// "null" or "". Any other payload is returned unchanged as a string; in
// particular the JSON string `"null"` (with quotes) is preserved.
func rawMessageOrNil(m []byte) any {
	// encoding/json stores the literal bytes `null` in a json.RawMessage field
	// when the wire value is JSON null, so a length check alone misses it.
	if len(m) == 0 || string(m) == "null" {
		return nil
	}
	return string(m)
}
// wrapSQLErr wraps err with msg and tags it with a code that lets the Service
// choose between attributing the failure to the offending line and surfacing
// it as a hard 5xx:
//
//   - DB_FOREIGN_KEY: the driver reported SQLITE_CONSTRAINT_FOREIGNKEY.
//   - DB_CONSTRAINT: any other result whose primary code is SQLITE_CONSTRAINT
//     (CHECK, NOT NULL, PRIMARY KEY, UNIQUE, and the remaining extended
//     constraint codes such as TRIGGER, ROWID, or DATATYPE).
//   - DB_UPSERT: everything else, including errors that are not a
//     *sqlite.Error at all (DB closed, IO error, ...).
//
// err is expected to be non-nil; callers only invoke this on a failed Exec.
func wrapSQLErr(err error, msg string) error {
	var se *sqlite.Error
	if errors.As(err, &se) {
		rc := se.Code()
		switch {
		case rc == sqlite3.SQLITE_CONSTRAINT_FOREIGNKEY:
			return culpa.WithCode(culpa.Wrap(err, msg), "DB_FOREIGN_KEY")
		// The primary result code is the low 8 bits of an extended code, so
		// this matches every SQLITE_CONSTRAINT_* variant, not just a fixed list.
		case rc&0xff == sqlite3.SQLITE_CONSTRAINT:
			return culpa.WithCode(culpa.Wrap(err, msg), "DB_CONSTRAINT")
		}
	}
	return culpa.WithCode(culpa.Wrap(err, msg), "DB_UPSERT")
}