package ingest
import (
"context"
"log/slog"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
"go.bigb.es/auxilia/culpa"
"go.bigb.es/auxilia/scribe"
"sourcecraft.dev/bigbes/lethe/internal/config"
"sourcecraft.dev/bigbes/lethe/internal/pkg/apierror"
"sourcecraft.dev/bigbes/lethe/internal/pkg/httputil"
"sourcecraft.dev/bigbes/lethe/internal/server/auth"
)
// ndjsonContentType is the only media type the ingest endpoint accepts for its
// newline-delimited JSON payload. `application/x-ndjson` is not an
// IANA-registered type (the `x-` prefix is the historical marker for an
// unregistered subtype), but it is the documented choice for lethe and the
// collector emits this exact value. Matching is case-insensitive and ignores
// parameters such as `; charset=utf-8` (see isNDJSONContentType); any other
// media type is rejected by Handler.Post with an UNSUPPORTED_MEDIA_TYPE (415)
// problem.
const ndjsonContentType = "application/x-ndjson"
// Handler is the steward-managed ingest HTTP boundary. It enforces the
// Content-Type and body cap, derives the owner from the authenticated
// identity (never from the wire payload), and delegates the streaming pipeline
// to Service.
//
// Post is the only route it serves (see Mount). The owner handed to
// Service.Ingest is identity.User from auth.MustIdentity, so the request body
// has no say in which user it is ingested for.
type Handler struct {
// Cfg holds the ingest limits; Post uses Cfg.MaxBodyBytes both as the
// http.MaxBytesReader cap and as the limit passed to Service.Ingest.
// The `config:""` tag presumably tells steward to fill it from configuration
// — confirm against steward's docs.
Cfg config.IngestConfig `config:""`
// Service runs the NDJSON pipeline for each request.
// The `inject:""` tag presumably marks it for steward dependency injection
// — confirm.
Service *Service `inject:""`
}
// Init implements the steward Initer contract. Handler keeps nothing of its
// own beyond the injected Cfg and Service, so there is nothing to prepare and
// Init always reports success.
func (h *Handler) Init(_ context.Context) error {
	return nil
}
// Mount attaches the ingest route to r as a POST handler served by h.Post.
// Server hands in its /api/v1 sub-router, so clients reach it at
// /api/v1/ingest. Authentication is not installed here: the parent group's
// middleware must run first, because Post calls auth.MustIdentity.
func (h *Handler) Mount(r chi.Router) {
	r.MethodFunc(http.MethodPost, "/ingest", h.Post)
}
// Post is the NDJSON ingest endpoint (POST /ingest, registered by Mount).
//
// Steps, in order:
//   - the caller's identity is taken from the request context with
//     auth.MustIdentity; the owner passed to Service.Ingest is identity.User,
//     never a value read from the payload;
//   - a Content-Type that is not ndjsonContentType (compared by
//     isNDJSONContentType) is rejected with an UNSUPPORTED_MEDIA_TYPE problem;
//   - the body is wrapped in http.MaxBytesReader capped at Cfg.MaxBodyBytes
//     and streamed into Service.Ingest.
//
// When Service.Ingest succeeds, its result is written as JSON with status 200.
// Per-line failures are meant to travel inside that result (its `errors`
// array), not as an HTTP error. When Service.Ingest returns an error (e.g. DB
// down, body cap exceeded), the error is rendered as an RFC 7807 problem via
// apierror.Render and any partial result is discarded.
func (h *Handler) Post(w http.ResponseWriter, r *http.Request) {
	identity := auth.MustIdentity(r.Context())
	if !isNDJSONContentType(r.Header.Get("Content-Type")) {
		apierror.Render(w, r, culpa.WithCode(
			culpa.WithPublic(
				culpa.Errorf("Content-Type must be %s", ndjsonContentType),
				"Content-Type must be "+ndjsonContentType,
			),
			"UNSUPPORTED_MEDIA_TYPE",
		))
		return
	}
	// MaxBytesReader makes any read past Cfg.MaxBodyBytes fail with a
	// *http.MaxBytesError. Service.Ingest is expected to surface that as a
	// TOO_LARGE-coded culpa error, which apierror.Render presumably maps to
	// 413. NOTE(review): confirm against Service.Ingest and apierror.
	body := http.MaxBytesReader(w, r.Body, h.Cfg.MaxBodyBytes)
	// The server closes request bodies itself. This Close is defensive only,
	// and its error carries nothing actionable.
	defer func() { _ = body.Close() }()
	result, err := h.Service.Ingest(r.Context(), identity.User, body, h.Cfg.MaxBodyBytes)
	if err != nil {
		// Only the problem document is sent. result is discarded here, so the
		// client learns that the run aborted but not how many lines were
		// accepted before the failure.
		// NOTE(review): if Service.Ingest returns a meaningful partial result
		// alongside err, consider exposing its count (e.g. as an RFC 7807
		// extension member).
		apierror.Render(w, r, err)
		return
	}
	if writeErr := httputil.WriteJSON(w, http.StatusOK, result); writeErr != nil {
		// The 200 status has most likely already been committed by the time
		// the body write fails, so the failure can only be logged.
		slog.Default().ErrorContext(r.Context(), "write ingest response", scribe.Err(writeErr))
	}
}
// isNDJSONContentType reports whether a Content-Type header value names the
// NDJSON media type accepted by the ingest endpoint. Anything after the first
// ';' (parameters such as `charset=utf-8`) is ignored, and surrounding
// whitespace is trimmed. The type/subtype itself is compared case-insensitively,
// as RFC 9110 requires. An empty or parameter-only value is never a match.
func isNDJSONContentType(v string) bool {
	mediaType, _, _ := strings.Cut(v, ";")
	mediaType = strings.TrimSpace(mediaType)
	if mediaType == "" {
		return false
	}
	return strings.EqualFold(mediaType, ndjsonContentType)
}