package ingest
import (
"context"
"encoding/json"
"io"
"log/slog"
"github.com/jmoiron/sqlx"
"go.bigb.es/auxilia/culpa"
"go.bigb.es/auxilia/scribe"
"sourcecraft.dev/bigbes/lethe/internal/config"
"sourcecraft.dev/bigbes/lethe/internal/pkg/httputil"
"sourcecraft.dev/bigbes/lethe/internal/platform/database"
"sourcecraft.dev/bigbes/lethe/internal/platform/observability"
"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)
// validRoles is the closed set of values accepted in TurnEvent.Role.
// validateTurn rejects any other role, so unknown roles are reported as a
// per-line error before any turn reaches the database. Lookups are exact and
// case-sensitive ("User" is not accepted).
var validRoles = map[string]struct{}{
	"user":      {},
	"assistant": {},
	"tool":      {},
	"system":    {},
}
// maxSourceFileBytes is the largest SessionMeta.SourceFile, in bytes, that
// validateTurn accepts on a single line. The server never interprets
// source_file, but pathological lengths are still refused so that storage
// and log output stay bounded.
const maxSourceFileBytes = 1 << 10 // 1 KiB
// LineError reports one NDJSON line that was rejected, whether by JSON
// decoding, by validateTurn, or by a line-level failure while committing
// its chunk. The handler embeds these verbatim in the 200 response body, so
// the JSON field names are part of the stable wire format.
//
// Line is 1-based. It counts the lines yielded by the NDJSON reader, so
// whether blank lines are counted depends on that reader (confirm).
// For a chunk-commit failure, Line is the first line of the failed chunk.
// Err is the human-readable reason.
type LineError struct {
	Line int    `json:"line"`
	Err  string `json:"error"`
}
// Result summarizes a single Ingest call.
//
// Accepted counts only turns whose chunk actually committed, and the client
// advances its offset by it. Errors holds per-line failures in the order
// they were hit. The endpoint answers 200 even when Errors is non-empty.
//
// When nothing failed, Errors is left nil and therefore marshals as JSON
// null rather than [].
type Result struct {
	Accepted int         `json:"accepted"`
	Errors   []LineError `json:"errors"`
}
// Service is the steward-managed ingest pipeline. Apart from its injected
// dependencies it carries no state. The struct tags mark how each field
// is supplied:
//   - Cfg (config): chunk size and the per-turn content-byte cap.
//   - Repo (inject): the SQL boundary that upserts each chunk.
//   - Log (inject): emits a WARN record whenever a chunk fails to commit.
//   - Metrics (inject): counts accepted and errored lines and committed chunks.
type Service struct {
	Cfg     config.IngestConfig    `config:""`
	Repo    *Repository            `inject:""`
	Log     *observability.Logger  `inject:""`
	Metrics *observability.Metrics `inject:""`
}
// Init implements steward's Initer contract. Service keeps no state of its
// own beyond the injected dependencies, so there is nothing to prepare and
// Init always succeeds.
func (s *Service) Init(context.Context) error {
	return nil
}
// validateTurn checks one decoded TurnEvent against the wire contract. It
// returns the first violation as a culpa error carrying code "INVALID", and
// Ingest converts that error into a per-line LineError.
//
// Checks run in a fixed order, so the message always names the earliest
// failing field:
//   - identity fields
//   - role
//   - timestamp
//   - content
//   - session_meta.source_file
//
// maxContentBytes bounds len(t.Content), which is a byte count, not a rune
// count. Seq is deliberately left unchecked because zero is a legal first
// sequence number on the wire.
func validateTurn(t wire.TurnEvent, maxContentBytes int64) error {
	// Map lookup has no side effects, so it is safe to resolve up front.
	_, roleKnown := validRoles[t.Role]

	switch {
	case t.Tool == "":
		return culpa.WithCode(culpa.New("tool is required"), "INVALID")
	case t.Host == "":
		return culpa.WithCode(culpa.New("host is required"), "INVALID")
	case t.SessionID == "":
		return culpa.WithCode(culpa.New("session_id is required"), "INVALID")
	case t.TurnID == "":
		return culpa.WithCode(culpa.New("turn_id is required"), "INVALID")
	case t.Role == "":
		return culpa.WithCode(culpa.New("role is required"), "INVALID")
	case !roleKnown:
		return culpa.WithCode(culpa.Errorf("role %q is not one of {user, assistant, tool, system}", t.Role), "INVALID")
	case t.Timestamp == 0:
		return culpa.WithCode(culpa.New("timestamp is required"), "INVALID")
	case t.Content == "":
		return culpa.WithCode(culpa.New("content is required"), "INVALID")
	case int64(len(t.Content)) > maxContentBytes:
		return culpa.WithCode(culpa.Errorf("content exceeds %d bytes", maxContentBytes), "INVALID")
	case t.SessionMeta.SourceFile == "":
		return culpa.WithCode(culpa.New("session_meta.source_file is required"), "INVALID")
	case len(t.SessionMeta.SourceFile) > maxSourceFileBytes:
		return culpa.WithCode(culpa.Errorf("session_meta.source_file exceeds %d bytes", maxSourceFileBytes), "INVALID")
	}
	return nil
}
// Ingest streams NDJSON from body, validates each line, and commits valid
// turns in chunks of Cfg.ChunkSize, one transaction per chunk. A
// non-positive ChunkSize is treated as 1, which commits every turn on its
// own, instead of panicking on a negative slice capacity.
//
// Partial-accept semantics:
//   - Lines committed in earlier chunks stay committed even when a later
//     line fails, so Accepted reflects what is actually in the DB.
//   - A per-line failure (JSON parse, validation, or a DB_FOREIGN_KEY /
//     DB_CONSTRAINT commit error) appends a LineError and stops processing.
//     Later lines are not processed.
//   - Valid lines still buffered in the current, not-yet-committed chunk
//     are discarded and do not count toward Accepted.
//   - A non-line failure (DB closed, body cap exceeded, scanner I/O)
//     returns the still-truthful Result together with the error, so the
//     Handler can map it to a 5xx or 413.
//
// A chunk is written in a single transaction. When its commit fails with a
// line-level error, the LineError therefore points at the chunk's first
// line rather than at the specific offending row.
func (s *Service) Ingest(ctx context.Context, owner string, body io.Reader, maxBytes int64) (Result, error) {
	// make() panics on a negative capacity. Clamp so that a misconfigured
	// ChunkSize degrades to per-turn commits instead of crashing the
	// request. ChunkSize == 0 already behaved this way.
	chunkSize := max(s.Cfg.ChunkSize, 1)

	var (
		result   Result
		chunk    = make([]wire.TurnEvent, 0, chunkSize)
		startsAt = make([]int, 0, chunkSize) // line number of each buffered turn
		lineNum  int
	)

	// commitChunk writes the buffered turns through InTx. On success it
	// credits Accepted and resets the buffers. On failure it leaves the
	// buffers intact so handleChunkErr can attribute the error to a line.
	commitChunk := func() error {
		if len(chunk) == 0 {
			return nil
		}
		firstLine := startsAt[0]
		first := chunk[0]
		txErr := database.InTx(ctx, s.Repo.Database.DB, func(tx *sqlx.Tx) error {
			return s.Repo.UpsertChunk(ctx, tx, owner, chunk)
		})
		if txErr != nil {
			s.Metrics.IngestLinesErrored.Inc()
			s.Log.L.WarnContext(ctx, "ingest chunk commit failed",
				slog.String("owner", owner),
				slog.Int("line", firstLine),
				slog.String("tool", first.Tool),
				slog.String("host", first.Host),
				slog.String("session_id", first.SessionID),
				scribe.Err(txErr),
			)
			return txErr
		}
		n := len(chunk)
		result.Accepted += n
		s.Metrics.IngestLinesAccepted.Add(float64(n))
		s.Metrics.IngestChunksCommitted.Inc()
		// Reset buffers without releasing the underlying capacity.
		chunk = chunk[:0]
		startsAt = startsAt[:0]
		return nil
	}

	// handleChunkErr converts a commit failure into Ingest's return value.
	// Line-level DB errors become a LineError in a 200 response; anything
	// else is propagated to the caller.
	handleChunkErr := func(err error) (Result, error) {
		// commitChunk leaves the buffers untouched on error, so startsAt
		// still holds the failing chunk's first line.
		failedLine := lineNum
		if len(startsAt) > 0 {
			failedLine = startsAt[0]
		}
		if isLineLevelDBError(err) {
			result.Errors = append(result.Errors, LineError{
				Line: failedLine,
				Err:  err.Error(),
			})
			return result, nil
		}
		return result, err
	}

	// rejectLine records a parse or validation failure for the current
	// line and ends the request. Buffered-but-uncommitted turns are
	// dropped, and lines after this one are never processed.
	rejectLine := func(msg string) (Result, error) {
		s.Metrics.IngestLinesErrored.Inc()
		result.Errors = append(result.Errors, LineError{
			Line: lineNum,
			Err:  msg,
		})
		return result, nil
	}

	for raw, scanErr := range httputil.ReadNDJSONLines(body, maxBytes) {
		if scanErr != nil {
			// Body cap or oversized line: caller maps to 413.
			return result, scanErr
		}
		lineNum++

		var event wire.TurnEvent
		if err := json.Unmarshal(raw, &event); err != nil {
			return rejectLine("invalid json: " + err.Error())
		}
		if err := validateTurn(event, s.Cfg.MaxTurnContentBytes); err != nil {
			return rejectLine(err.Error())
		}

		chunk = append(chunk, event)
		startsAt = append(startsAt, lineNum)
		if len(chunk) >= chunkSize {
			if err := commitChunk(); err != nil {
				return handleChunkErr(err)
			}
		}
	}

	// Flush the trailing partial chunk.
	if err := commitChunk(); err != nil {
		return handleChunkErr(err)
	}
	return result, nil
}
// isLineLevelDBError decides whether a chunk-commit error describes a
// per-row problem: a foreign-key or constraint violation, coded
// "DB_FOREIGN_KEY" or "DB_CONSTRAINT". Such errors are reported as a
// LineError inside the 200 response instead of being escalated to a 5xx.
//
// The codes mirror those Repository.UpsertChunk attaches via wrapSQLErr.
// An error with no culpa CodeDetail, or whose code is not a plain string,
// is treated as not line-level.
func isLineLevelDBError(err error) bool {
	var detail culpa.CodeDetail
	if !culpa.FindDetail(err, &detail) {
		return false
	}
	// A failed assertion leaves code == "", which matches neither case.
	code, _ := detail.Code.(string)
	return code == "DB_FOREIGN_KEY" || code == "DB_CONSTRAINT"
}