package ingest import ( "context" "encoding/json" "io" "log/slog" "github.com/jmoiron/sqlx" "go.bigb.es/auxilia/culpa" "go.bigb.es/auxilia/scribe" "sourcecraft.dev/bigbes/lethe/internal/config" "sourcecraft.dev/bigbes/lethe/internal/pkg/httputil" "sourcecraft.dev/bigbes/lethe/internal/platform/database" "sourcecraft.dev/bigbes/lethe/internal/platform/observability" "sourcecraft.dev/bigbes/lethe/internal/shared/wire" ) // validRoles is the closed set of values accepted in TurnEvent.Role. Phase 7 // validates against this list before reaching the database. var validRoles = map[string]struct{}{ "user": {}, "assistant": {}, "tool": {}, "system": {}, } // maxSourceFileBytes caps the per-line SessionMeta.SourceFile length. The // server treats source_file as opaque, but extreme values are still rejected // to keep storage and logs sane. const maxSourceFileBytes = 1024 // LineError describes a single NDJSON line that failed validation, JSON // parsing, or chunk commit. It is stable wire output: the handler embeds the // list directly into the 200 response body. type LineError struct { Line int `json:"line"` Err string `json:"error"` } // Result is the outcome of one Ingest call. Accepted is the count of turns // actually committed to the database; Errors lists per-line failures in the // order they occurred. Per the spec the ingest endpoint returns 200 even when // Errors is non-empty — the client uses Accepted to advance its offset. type Result struct { Accepted int `json:"accepted"` Errors []LineError `json:"errors"` } // Service is the steward-managed ingest pipeline. Repo is the SQL boundary; // Cfg fixes the chunk size and content-byte cap; Log emits per-chunk failure // notices at WARN; Metrics counts accepted/errored lines and committed chunks. type Service struct { Cfg config.IngestConfig `config:""` Repo *Repository `inject:""` Log *observability.Logger `inject:""` Metrics *observability.Metrics `inject:""` } // Init satisfies the steward Initer contract. The Service is stateless beyond // its injected dependencies. func (s *Service) Init(_ context.Context) error { return nil } // validateTurn enforces the wire contract on a single decoded TurnEvent. It // returns a culpa-coded INVALID error on the first failure; the Service maps // that into a per-line LineError. Seq is intentionally unchecked: zero is a // legal first sequence number on the wire. func validateTurn(t wire.TurnEvent, maxContentBytes int64) error { if t.Tool == "" { return culpa.WithCode(culpa.New("tool is required"), "INVALID") } if t.Host == "" { return culpa.WithCode(culpa.New("host is required"), "INVALID") } if t.SessionID == "" { return culpa.WithCode(culpa.New("session_id is required"), "INVALID") } if t.TurnID == "" { return culpa.WithCode(culpa.New("turn_id is required"), "INVALID") } if t.Role == "" { return culpa.WithCode(culpa.New("role is required"), "INVALID") } if _, ok := validRoles[t.Role]; !ok { return culpa.WithCode(culpa.Errorf("role %q is not one of {user, assistant, tool, system}", t.Role), "INVALID") } if t.Timestamp == 0 { return culpa.WithCode(culpa.New("timestamp is required"), "INVALID") } if t.Content == "" { return culpa.WithCode(culpa.New("content is required"), "INVALID") } if int64(len(t.Content)) > maxContentBytes { return culpa.WithCode(culpa.Errorf("content exceeds %d bytes", maxContentBytes), "INVALID") } if t.SessionMeta.SourceFile == "" { return culpa.WithCode(culpa.New("session_meta.source_file is required"), "INVALID") } if len(t.SessionMeta.SourceFile) > maxSourceFileBytes { return culpa.WithCode(culpa.Errorf("session_meta.source_file exceeds %d bytes", maxSourceFileBytes), "INVALID") } return nil } // Ingest streams NDJSON from body, validates each line, and commits in chunks // of Cfg.ChunkSize under one transaction per chunk. // // Partial-accept semantics: lines that committed in earlier chunks remain // committed even when a later line fails — Accepted reflects what is actually // in the DB. A per-line failure (JSON parse, validation, FK) appends a // LineError and stops processing; subsequent lines are not touched. A // non-line failure (DB closed, body cap exceeded, scanner I/O) returns the // (still-truthful) Result alongside the wrapped error so the Handler can map // it to a 5xx or 413. func (s *Service) Ingest(ctx context.Context, owner string, body io.Reader, maxBytes int64) (Result, error) { var ( result Result chunk = make([]wire.TurnEvent, 0, s.Cfg.ChunkSize) startsAt = make([]int, 0, s.Cfg.ChunkSize) // line number of each buffered turn lineNum int ) commitChunk := func() error { if len(chunk) == 0 { return nil } firstLine := startsAt[0] first := chunk[0] txErr := database.InTx(ctx, s.Repo.Database.DB, func(tx *sqlx.Tx) error { return s.Repo.UpsertChunk(ctx, tx, owner, chunk) }) if txErr != nil { s.Metrics.IngestLinesErrored.Inc() s.Log.L.WarnContext(ctx, "ingest chunk commit failed", slog.String("owner", owner), slog.Int("line", firstLine), slog.String("tool", first.Tool), slog.String("host", first.Host), slog.String("session_id", first.SessionID), scribe.Err(txErr), ) return txErr } n := len(chunk) result.Accepted += n s.Metrics.IngestLinesAccepted.Add(float64(n)) s.Metrics.IngestChunksCommitted.Inc() // Reset buffers without releasing the underlying capacity. chunk = chunk[:0] startsAt = startsAt[:0] return nil } handleChunkErr := func(err error) (Result, error) { // Take the failing chunk's first line BEFORE reset; commitChunk // leaves buffers untouched on error so startsAt still has it. failedLine := lineNum if len(startsAt) > 0 { failedLine = startsAt[0] } if isLineLevelDBError(err) { result.Errors = append(result.Errors, LineError{ Line: failedLine, Err: err.Error(), }) return result, nil } return result, err } for raw, scanErr := range httputil.ReadNDJSONLines(body, maxBytes) { if scanErr != nil { // Body cap or oversized line: caller maps to 413. return result, scanErr } lineNum++ var event wire.TurnEvent if err := json.Unmarshal(raw, &event); err != nil { s.Metrics.IngestLinesErrored.Inc() result.Errors = append(result.Errors, LineError{ Line: lineNum, Err: "invalid json: " + err.Error(), }) return result, nil } if err := validateTurn(event, s.Cfg.MaxTurnContentBytes); err != nil { s.Metrics.IngestLinesErrored.Inc() result.Errors = append(result.Errors, LineError{ Line: lineNum, Err: err.Error(), }) return result, nil } chunk = append(chunk, event) startsAt = append(startsAt, lineNum) if len(chunk) >= s.Cfg.ChunkSize { if err := commitChunk(); err != nil { return handleChunkErr(err) } } } if err := commitChunk(); err != nil { return handleChunkErr(err) } return result, nil } // isLineLevelDBError reports whether err describes a per-row failure (FK or // constraint violation) that should be surfaced as a LineError in the 200 // response, rather than escalated to a 5xx. The classification mirrors the // codes Repository.UpsertChunk attaches via wrapSQLErr. func isLineLevelDBError(err error) bool { var cd culpa.CodeDetail if !culpa.FindDetail(err, &cd) { return false } code, ok := cd.Code.(string) if !ok { return false } switch code { case "DB_FOREIGN_KEY", "DB_CONSTRAINT": return true } return false }