~bigbes/lethe

ref: 964d8022994f1c6260df9565e1882ebaaf15badc lethe/internal/domain/ingest/handler.go -rw-r--r-- 3.3 KiB
964d8022 — Eugene Blikh test(lethe): register savedsearch repo+handler in e2e steward graph a month ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package ingest

import (
	"context"
	"log/slog"
	"net/http"
	"strings"

	"github.com/go-chi/chi/v5"
	"go.bigb.es/auxilia/culpa"
	"go.bigb.es/auxilia/scribe"

	"sourcecraft.dev/bigbes/lethe/internal/config"
	"sourcecraft.dev/bigbes/lethe/internal/pkg/apierror"
	"sourcecraft.dev/bigbes/lethe/internal/pkg/httputil"
	"sourcecraft.dev/bigbes/lethe/internal/server/auth"
)

// ndjsonContentType is the canonical media type for the ingest payload.
// `application/x-ndjson` is the unregistered ("x-"-prefixed) type used by the
// NDJSON specification — it is not an IETF-registered media type — and it is
// the documented choice for lethe; the collector emits this exact value.
// Requests whose Content-Type does not name this type (compared
// case-insensitively, with parameters such as `; charset=utf-8` ignored) are
// rejected with 415.
const ndjsonContentType = "application/x-ndjson"

// Handler is the steward-managed ingest HTTP boundary. It enforces the
// Content-Type and body cap, derives the owner from the authenticated
// identity (never from the wire payload), and delegates the streaming pipeline
// to Service.
//
// Fields:
//   - Cfg holds the ingest settings; Post reads Cfg.MaxBodyBytes as the
//     request-body cap. Presumably populated by steward from the `config`
//     tag — confirm against the steward documentation.
//   - Service runs the NDJSON pipeline (Post calls Service.Ingest).
//     Presumably injected by steward via the `inject` tag.
//
// The struct tags are only meaningful to reflection-based wiring, so keep
// them intact when editing this type.
type Handler struct {
	Cfg     config.IngestConfig `config:""`
	Service *Service            `inject:""`
}

// Init implements the steward Initer contract. Handler keeps no state of its
// own beyond its Cfg and Service dependencies, so there is nothing to set up
// and Init always succeeds.
func (h *Handler) Init(context.Context) error {
	return nil
}

// Mount attaches the ingest route to r. Server passes in its /api/v1 group,
// so the route is served at POST /api/v1/ingest. Authentication middleware
// lives on that parent group; Post depends on it through auth.MustIdentity().
func (h *Handler) Mount(r chi.Router) {
	const ingestRoute = "/ingest"
	r.Post(ingestRoute, h.Post)
}

// Post is the NDJSON ingest endpoint. It always returns 200 when the body is
// well-formed enough to read — per-line failures are reported in the JSON
// `errors` array of the response rather than as an HTTP error status.
//
// Processing order:
//   - The caller's identity comes from the request context via
//     auth.MustIdentity (the parent group's auth middleware must have run);
//     identity.User is the owner handed to Service.Ingest, so the owner is
//     never taken from the payload.
//   - A request whose Content-Type does not name application/x-ndjson is
//     answered with an UNSUPPORTED_MEDIA_TYPE-coded problem without reading
//     the body.
//   - The body is capped at Cfg.MaxBodyBytes with http.MaxBytesReader and
//     streamed into Service.Ingest.
//
// Hard infrastructure failures returned by Service.Ingest (DB down, body cap
// exceeded) are rendered as RFC 7807 problems via apierror.Render. A failure
// to write the success response is only logged, because response headers
// have presumably already been sent by then.
func (h *Handler) Post(w http.ResponseWriter, r *http.Request) {
	identity := auth.MustIdentity(r.Context())

	if !isNDJSONContentType(r.Header.Get("Content-Type")) {
		apierror.Render(w, r, culpa.WithCode(
			culpa.WithPublic(
				culpa.Errorf("Content-Type must be %s", ndjsonContentType),
				"Content-Type must be "+ndjsonContentType,
			),
			"UNSUPPORTED_MEDIA_TYPE",
		))
		return
	}

	// MaxBytesReader caps the request body (and, where possible, asks the
	// server to close the connection once the cap is hit); Service.Ingest's
	// NDJSON scanner surfaces an over-cap read as a TOO_LARGE-coded culpa
	// error which apierror.Render turns into 413.
	body := http.MaxBytesReader(w, r.Body, h.Cfg.MaxBodyBytes)
	// Best-effort close: net/http closes the request body itself after the
	// handler returns, so a Close error here carries nothing actionable.
	defer func() { _ = body.Close() }()

	result, err := h.Service.Ingest(r.Context(), identity.User, body, h.Cfg.MaxBodyBytes)
	if err != nil {
		// Only the problem document (5xx/413) is rendered and result is
		// discarded, following the Go convention that a value returned
		// alongside a non-nil error is not meaningful. The client learns the
		// run aborted and resumes from its own offset bookkeeping on the
		// next call.
		// NOTE(review): if Service.Ingest does return a meaningful
		// partial-accept count together with err, it is not surfaced to the
		// client here — confirm that is intended.
		apierror.Render(w, r, err)
		return
	}

	if writeErr := httputil.WriteJSON(w, http.StatusOK, result); writeErr != nil {
		slog.Default().ErrorContext(r.Context(), "write ingest response", scribe.Err(writeErr))
	}
}

// isNDJSONContentType reports whether the Content-Type header value v names
// the canonical NDJSON media type. Everything from the first ';' onward (for
// example `; charset=utf-8`) is ignored, surrounding whitespace is trimmed,
// and the type/subtype comparison is case-insensitive as RFC 9110 requires
// for media types. An empty or parameter-only header never matches.
func isNDJSONContentType(v string) bool {
	mediaType, _, _ := strings.Cut(v, ";")
	mediaType = strings.TrimSpace(mediaType)
	if mediaType == "" {
		return false
	}
	return strings.EqualFold(mediaType, ndjsonContentType)
}