package ingest

import (
	"context"
	"log/slog"
	"net/http"
	"strings"

	"github.com/go-chi/chi/v5"
	"go.bigb.es/auxilia/culpa"
	"go.bigb.es/auxilia/scribe"
	"sourcecraft.dev/bigbes/lethe/internal/config"
	"sourcecraft.dev/bigbes/lethe/internal/pkg/apierror"
	"sourcecraft.dev/bigbes/lethe/internal/pkg/httputil"
	"sourcecraft.dev/bigbes/lethe/internal/server/auth"
)

// ndjsonContentType is the canonical media type for the ingest payload.
// `application/x-ndjson` is the documented choice for lethe, and the collector
// emits this exact value. Any other media type is rejected by Post with an
// UNSUPPORTED_MEDIA_TYPE-coded error (rendered as 415 — NOTE(review): the
// code-to-status mapping lives in apierror; confirm there).
const ndjsonContentType = "application/x-ndjson"

// Handler is the steward-managed ingest HTTP boundary. It enforces the
// Content-Type and body cap, derives the owner from the authenticated
// identity (never from the wire payload), and delegates the streaming pipeline
// to Service.
type Handler struct {
	// Cfg is the ingest configuration; Post reads Cfg.MaxBodyBytes as the
	// request body cap. Presumably populated by the framework via the
	// `config` tag — confirm against the steward wiring.
	Cfg config.IngestConfig `config:""`
	// Service runs the ingest pipeline that Post delegates to. Presumably
	// injected by the framework via the `inject` tag — confirm against the
	// steward wiring.
	Service *Service `inject:""`
}

// Init satisfies the steward Initer contract. The handler holds no state
// beyond its injected dependencies, so there is nothing to set up and Init
// always returns nil.
func (h *Handler) Init(_ context.Context) error { return nil }

// Mount registers POST /ingest on r. The router supplied by Server is the
// /api/v1 group, so the effective path is /api/v1/ingest. Auth middleware is
// installed by the parent group; Post relies on auth.MustIdentity().
func (h *Handler) Mount(r chi.Router) { r.Post("/ingest", h.Post) }

// Post is the NDJSON ingest endpoint. It always returns 200 when the body is
// well-formed enough to read — per-line failures live in the JSON `errors`
// array. Hard infrastructure failures (DB down, body cap exceeded) propagate
// as RFC 7807 problems via apierror.Render.
func (h *Handler) Post(w http.ResponseWriter, r *http.Request) { identity := auth.MustIdentity(r.Context()) if !isNDJSONContentType(r.Header.Get("Content-Type")) { apierror.Render(w, r, culpa.WithCode( culpa.WithPublic( culpa.Errorf("Content-Type must be %s", ndjsonContentType), "Content-Type must be "+ndjsonContentType, ), "UNSUPPORTED_MEDIA_TYPE", )) return } // MaxBytesReader caps the request body; Service.Ingest's NDJSON scanner // surfaces an over-cap read as a TOO_LARGE-coded culpa error which // apierror.Render turns into 413. body := http.MaxBytesReader(w, r.Body, h.Cfg.MaxBodyBytes) defer func() { _ = body.Close() }() result, err := h.Service.Ingest(r.Context(), identity.User, body, h.Cfg.MaxBodyBytes) if err != nil { // Render the failure but don't drop the partial-accept count: the // 5xx/413 problem document tells the client the run aborted, and // the next call will resume from its own offset bookkeeping. apierror.Render(w, r, err) return } if writeErr := httputil.WriteJSON(w, http.StatusOK, result); writeErr != nil { slog.Default().ErrorContext(r.Context(), "write ingest response", scribe.Err(writeErr)) } } // isNDJSONContentType returns true when the header value (with any trailing // parameters such as `; charset=utf-8`) names the canonical NDJSON media // type. The MIME type itself is matched case-insensitively per RFC 9110. func isNDJSONContentType(v string) bool { if v == "" { return false } // Strip parameters; we only need the type/subtype prefix. if idx := strings.IndexByte(v, ';'); idx >= 0 { v = v[:idx] } return strings.EqualFold(strings.TrimSpace(v), ndjsonContentType) }