~bigbes/lethe

b91a08d0987559eafe4335e91012e0bb02f98984 — Eugene Blikh a month ago 91adc52
feat(platform): scribe logger, prometheus registry, health checker set
M internal/deps/deps.go => internal/deps/deps.go +7 -8
@@ 1,20 1,19 @@
// Package deps records the locked set of direct dependencies for the lethe
// server during early scaffolding. Real packages adopt these as they come
// online (server — chi/prometheus; auth — go-oidc; observability —
// auxilia/scribe; lifecycle — auxilia/steward).
// online (server — chi; auth — go-oidc).
//
// Phase 2 promoted viper, validator/v10, and culpa to real imports under
// internal/config. Phase 3 promoted sqlx, modernc.org/sqlite, and
// golang-migrate/v4 (plus its sqlite driver and iofs source) to real imports
// under internal/platform/database, and culpa is now used there too. Once
// every dep below has at least one real importer, this file is expected to
// disappear in the same commit that completes the migration.
// under internal/platform/database, and culpa is now used there too. Phase 4
// promoted prometheus/client_golang and auxilia/scribe (under
// internal/platform/observability) and auxilia/steward (canary test under
// internal/platform/health) to real imports. Once every dep below has at
// least one real importer, this file is expected to disappear in the same
// commit that completes the migration.
package deps

import (
	_ "github.com/coreos/go-oidc/v3/oidc"
	_ "github.com/go-chi/chi/v5"
	_ "github.com/prometheus/client_golang/prometheus"
	_ "go.bigb.es/auxilia/scribe"
	_ "go.bigb.es/auxilia/steward"
)

A internal/platform/health/health.go => internal/platform/health/health.go +77 -0
@@ 0,0 1,77 @@
// Package health defines a small Checker abstraction and a steward-managed
// Set that aggregates every registered Checker into a single readiness probe.
//
// A Checker is anything with a name and a Check(ctx) method. Concrete checks
// live next to the subsystem they probe (DBCheck here, future probes such as
// IndexCheck or QueueCheck land in their own files). Each check is registered
// as a steward asset; Set multi-injects them via the `inject:""` tag so adding
// a probe is a registration-only operation — no edits to Set or Run.
//
// The handler that exposes /readyz lives in the HTTP layer (Phase 5) and
// consumes (*Set).Run directly.
package health

import (
	"context"
	"time"

	"sourcecraft.dev/bigbes/lethe/internal/platform/database"
)

// perCheckTimeout bounds the time any single Checker.Check call is allowed to
// take. The HTTP /readyz handler typically has its own request budget; the
// per-check timeout here is the inner bound so a hung check cannot starve the
// rest of the set.
const perCheckTimeout = 2 * time.Second

// Checker is the contract every health probe satisfies. Implementations may
// be steward services with their own dependencies (see DBCheck).
type Checker interface {
	Name() string
	Check(ctx context.Context) error
}

// Set is the steward-managed aggregator. It multi-injects every registered
// Checker via the `inject:""` tag. Adding a new check is a matter of
// registering an asset that implements Checker — no edits to Set are needed.
type Set struct {
	Checks []Checker `inject:""`
}

// Run invokes every registered Checker sequentially, applying a per-check
// timeout via context.WithTimeout. Returns a map keyed by Checker.Name() with
// the error each check produced (nil on success), and an aggregate allOK
// flag that is true iff every check succeeded.
//
// Empty Checks slice → empty map and allOK=true. This is intentional: an
// empty set means no subsystem has declared a readiness signal yet, not that
// the system is unhealthy.
func (s *Set) Run(ctx context.Context) (map[string]error, bool) {
	results := make(map[string]error, len(s.Checks))
	allOK := true
	for _, c := range s.Checks {
		ctx2, cancel := context.WithTimeout(ctx, perCheckTimeout)
		err := c.Check(ctx2)
		cancel()
		results[c.Name()] = err
		if err != nil {
			allOK = false
		}
	}
	return results, allOK
}

// DBCheck verifies the SQLite database is reachable by pinging the
// underlying *sql.DB. It is a steward service: register it as an asset and
// it will be picked up by Set's multi-injection.
type DBCheck struct {
	DB *database.Database `inject:""`
}

// Name returns the stable identifier surfaced in the readiness response.
func (c *DBCheck) Name() string { return "database" }

// Check pings the database. The 2s timeout is supplied by Set.Run.
func (c *DBCheck) Check(ctx context.Context) error {
	return c.DB.DB.PingContext(ctx)
}

A internal/platform/health/health_test.go => internal/platform/health/health_test.go +108 -0
@@ 0,0 1,108 @@
package health

import (
	"context"
	"errors"
	"testing"
	"time"
)

// fakeChecker is a hand-built Checker for unit tests. Tests construct Set
// directly via &Set{Checks: []Checker{...}} — no steward involvement.
type fakeChecker struct {
	name  string
	err   error
	delay time.Duration
}

func (f *fakeChecker) Name() string { return f.name }

func (f *fakeChecker) Check(ctx context.Context) error {
	if f.delay > 0 {
		select {
		case <-time.After(f.delay):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return f.err
}

func TestSetRunAllOK(t *testing.T) {
	s := &Set{Checks: []Checker{
		&fakeChecker{name: "a"},
		&fakeChecker{name: "b"},
	}}
	results, allOK := s.Run(context.Background())
	if !allOK {
		t.Fatalf("allOK = false; want true")
	}
	if len(results) != 2 {
		t.Fatalf("len(results) = %d; want 2", len(results))
	}
	for name, err := range results {
		if err != nil {
			t.Errorf("results[%q] = %v; want nil", name, err)
		}
	}
}

func TestSetRunAggregatesFailures(t *testing.T) {
	boom := errors.New("boom")
	s := &Set{Checks: []Checker{
		&fakeChecker{name: "a"},
		&fakeChecker{name: "b", err: boom},
		&fakeChecker{name: "c"},
	}}
	results, allOK := s.Run(context.Background())
	if allOK {
		t.Fatalf("allOK = true; want false (one check failed)")
	}
	if got := results["a"]; got != nil {
		t.Errorf("results[a] = %v; want nil", got)
	}
	if got := results["b"]; !errors.Is(got, boom) {
		t.Errorf("results[b] = %v; want errors.Is boom", got)
	}
	if got := results["c"]; got != nil {
		t.Errorf("results[c] = %v; want nil", got)
	}
}

func TestSetRunEmptyChecksReturnsAllOK(t *testing.T) {
	s := &Set{}
	results, allOK := s.Run(context.Background())
	if !allOK {
		t.Fatalf("allOK = false; want true (empty Checks is intentional)")
	}
	if len(results) != 0 {
		t.Fatalf("len(results) = %d; want 0", len(results))
	}
}

func TestSetRunEnforcesPerCheckTimeout(t *testing.T) {
	// fakeChecker with a delay much larger than the 2s per-check budget.
	// Use 10s delay; if the timeout works the test returns in ~2s.
	s := &Set{Checks: []Checker{
		&fakeChecker{name: "slow", delay: 10 * time.Second},
	}}
	start := time.Now()
	results, allOK := s.Run(context.Background())
	elapsed := time.Since(start)

	// The 2s per-check timeout must fire. Allow generous slack for CI but
	// firmly less than the 10s delay we set.
	if elapsed >= 5*time.Second {
		t.Fatalf("Run took %s; per-check timeout did not fire", elapsed)
	}
	if allOK {
		t.Fatalf("allOK = true; want false (timeout should surface as error)")
	}
	got := results["slow"]
	if got == nil {
		t.Fatalf("results[slow] = nil; want timeout error")
	}
	if !errors.Is(got, context.DeadlineExceeded) {
		t.Errorf("results[slow] = %v; want errors.Is(context.DeadlineExceeded)", got)
	}
}

A internal/platform/health/steward_unwind_test.go => internal/platform/health/steward_unwind_test.go +77 -0
@@ 0,0 1,77 @@
package health_test

// This is the Phase 4 steward unwind canary. It verifies whether
// steward.Manager invokes Destroy on already-initialized siblings when a
// later component's Init returns an error. The lifecycle design in the
// lethe-server task assumes it does; if not, an explicit unwind step has to
// be added in main.go (Phase 9).
//
// The test is deliberately placed in package `health_test` (next to the
// health package) so it can import steward without polluting the health
// package and without introducing a brand-new package directory just for
// the canary. See the Phase 4 plan, point 16.
//
// IMPORTANT: this test is allowed to fail. A failure here is the very signal
// the dispatcher needs to plan an explicit unwind. Do NOT mark it Skip; do
// NOT add a workaround in this phase.

import (
	"context"
	"errors"
	"testing"

	"go.bigb.es/auxilia/steward"
)

// recordingService records whether Init and Destroy were called. It has no
// dependencies, so it can sit anywhere in the start order.
type recordingService struct {
	initCalled    bool
	destroyCalled bool
}

func (r *recordingService) Init(_ context.Context) error {
	r.initCalled = true
	return nil
}

func (r *recordingService) Destroy(_ context.Context) error {
	r.destroyCalled = true
	return nil
}

// failingService errors out of Init. It is registered after recordingService
// so that recordingService is already initialized at the point of failure.
type failingService struct{}

var errFailing = errors.New("failingService.Init: intentional failure")

func (f *failingService) Init(_ context.Context) error { return errFailing }

func TestStewardUnwindsOnInitFailure(t *testing.T) {
	rec := &recordingService{}
	fail := &failingService{}

	mgr := steward.NewManager()
	mgr.AddComponent(context.Background(),
		steward.MustServiceAsset(rec, steward.Root(), steward.IgnoreUnused()),
		steward.MustServiceAsset(fail, steward.Root(), steward.IgnoreUnused()),
	)

	if err := mgr.Inject(context.Background()); err != nil {
		t.Fatalf("Inject: %v", err)
	}

	initErr := mgr.Init(context.Background())
	if initErr == nil {
		t.Fatalf("expected Init to surface failingService error, got nil")
	}
	if !rec.initCalled {
		t.Fatalf("recordingService.Init was never called — registration order assumption broken")
	}
	if !rec.destroyCalled {
		// THE FINDING. Steward did not unwind. main.go must call Destroy on
		// every initialized sibling itself when Init fails.
		t.Fatalf("steward did NOT call recordingService.Destroy after sibling Init failed; explicit unwind required in main")
	}
}

A internal/platform/observability/logger.go => internal/platform/observability/logger.go +190 -0
@@ 0,0 1,190 @@
// Package observability hosts the cross-cutting steward services that wire
// structured logging and Prometheus metrics into the lethe server. Both
// services are constructed lazily by steward and have no dependencies on the
// HTTP layer; HTTP middleware (Phase 5) reads from them.
package observability

import (
	"context"
	"log/slog"
	"os"

	"go.bigb.es/auxilia/culpa"
	"go.bigb.es/auxilia/scribe"

	"sourcecraft.dev/bigbes/lethe/internal/config"
)

// maskedKeys lists the attribute keys whose values must never appear in logs
// in cleartext. The list is enforced uniformly across both formats: scribe's
// WithMaskKeys for the tint handler, and a ReplaceAttr function for the
// JSON handler. Keep this list as the single source of truth.
var maskedKeys = []string{"password", "token", "authorization", "secret", "cookie"}

// Unexported context-key types prevent accidental collisions with keys from
// other packages. Helpers below are the only intended access path.
type (
	requestIDKey struct{}
	userKey      struct{}
)

// WithRequestID attaches a request id to ctx so the log handler can stamp
// every record with it. Phase 5's request-id middleware calls this.
func WithRequestID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, requestIDKey{}, id)
}

// RequestIDFrom returns the request id previously attached with WithRequestID,
// or the empty string if none is set.
func RequestIDFrom(ctx context.Context) string {
	v, _ := ctx.Value(requestIDKey{}).(string)
	return v
}

// WithUser attaches the authenticated user to ctx. Phase 6's auth layer calls
// this; the logger picks it up automatically.
func WithUser(ctx context.Context, user string) context.Context {
	return context.WithValue(ctx, userKey{}, user)
}

// UserFrom returns the user attached with WithUser, or the empty string if
// none is set.
func UserFrom(ctx context.Context) string {
	v, _ := ctx.Value(userKey{}).(string)
	return v
}

// Logger is the steward-managed logging steward. The active *slog.Logger is
// exposed via L for components that prefer explicit injection; everything
// else can rely on slog.Default(), which Init points at L.
type Logger struct {
	Cfg config.LoggingConfig `config:""`
	L   *slog.Logger
}

// Init builds the configured handler (tint or json), wraps it in
// contextHandler so request-id and user context fields end up in every
// record, and installs the result as the package-default slog logger.
func (l *Logger) Init(_ context.Context) error {
	level, err := parseLevel(l.Cfg.Level)
	if err != nil {
		return culpa.WithCode(err, "OBS_LOGGER_INIT")
	}

	var inner slog.Handler
	switch l.Cfg.Format {
	case "tint":
		opts := []scribe.Option{
			scribe.WithWriter(os.Stderr),
			scribe.WithLevel(level),
		}
		// scribe's WithMaskKeys takes a variadic of strings.
		opts = append(opts, scribe.WithMaskKeys(maskedKeys...))
		inner = scribe.NewTintHandler(opts...)
	case "json":
		inner = slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
			Level:       level,
			ReplaceAttr: maskAttrJSON,
		})
	default:
		// Config validation already restricts Format to {tint,json}; if we
		// reach here the validator was bypassed — treat it as a programming
		// error rather than papering over silently.
		return culpa.WithCode(
			culpa.New("unsupported log format"),
			"OBS_LOGGER_INIT",
		)
	}

	logger := slog.New(&contextHandler{inner: inner})
	l.L = logger
	slog.SetDefault(logger)
	return nil
}

// parseLevel maps the validated config string to a slog.Level. Returning an
// error rather than defaulting silently — the validator should prevent any
// unknown value from arriving here.
func parseLevel(s string) (slog.Level, error) {
	switch s {
	case "debug":
		return slog.LevelDebug, nil
	case "info":
		return slog.LevelInfo, nil
	case "warn":
		return slog.LevelWarn, nil
	case "error":
		return slog.LevelError, nil
	default:
		return 0, culpa.New("unknown log level")
	}
}

// maskAttrJSON enforces the same mask-keys policy that scribe.WithMaskKeys
// applies, but for the stdlib JSON handler. Match is case-insensitive on the
// last group element to mirror scribe's behaviour.
func maskAttrJSON(_ []string, a slog.Attr) slog.Attr {
	for _, k := range maskedKeys {
		if equalFold(a.Key, k) {
			return slog.String(a.Key, "***")
		}
	}
	return a
}

// equalFold is a tiny ASCII case-insensitive compare; we avoid importing
// strings only for this. Both inputs are short attr keys.
func equalFold(a, b string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := 0; i < len(a); i++ {
		ca, cb := a[i], b[i]
		if ca >= 'A' && ca <= 'Z' {
			ca += 'a' - 'A'
		}
		if cb >= 'A' && cb <= 'Z' {
			cb += 'a' - 'A'
		}
		if ca != cb {
			return false
		}
	}
	return true
}

// contextHandler decorates an inner slog.Handler so every emitted record is
// stamped with the request id and (if present) user pulled from the record's
// context. WithAttrs/WithGroup delegate; the wrapped handler is rebuilt so
// the chain remains intact.
type contextHandler struct {
	inner slog.Handler
}

// Enabled delegates to the inner handler.
func (h *contextHandler) Enabled(ctx context.Context, lvl slog.Level) bool {
	return h.inner.Enabled(ctx, lvl)
}

// Handle adds request_id and user attributes (when present in ctx) before
// passing the record through to the inner handler.
func (h *contextHandler) Handle(ctx context.Context, r slog.Record) error {
	if id := RequestIDFrom(ctx); id != "" {
		r.AddAttrs(slog.String("request_id", id))
	}
	if u := UserFrom(ctx); u != "" {
		r.AddAttrs(slog.String("user", u))
	}
	return h.inner.Handle(ctx, r)
}

// WithAttrs returns a new contextHandler whose inner handler has the supplied
// attributes pre-bound.
func (h *contextHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	return &contextHandler{inner: h.inner.WithAttrs(attrs)}
}

// WithGroup returns a new contextHandler scoped to the named group.
func (h *contextHandler) WithGroup(name string) slog.Handler {
	return &contextHandler{inner: h.inner.WithGroup(name)}
}

A internal/platform/observability/logger_test.go => internal/platform/observability/logger_test.go +101 -0
@@ 0,0 1,101 @@
package observability

import (
	"bytes"
	"context"
	"encoding/json"
	"log/slog"
	"testing"

	"sourcecraft.dev/bigbes/lethe/internal/config"
)

// TestLoggerInitTintAndJSON exercises Init for both supported formats and
// ensures the package-default logger is replaced.
func TestLoggerInitTintAndJSON(t *testing.T) {
	prev := slog.Default()
	t.Cleanup(func() { slog.SetDefault(prev) })

	for _, format := range []string{"tint", "json"} {
		l := &Logger{Cfg: config.LoggingConfig{Level: "info", Format: format}}
		if err := l.Init(context.Background()); err != nil {
			t.Fatalf("Init(%s): %v", format, err)
		}
		if l.L == nil {
			t.Fatalf("Init(%s): L is nil", format)
		}
		if slog.Default() != l.L {
			t.Fatalf("Init(%s): slog.Default not replaced", format)
		}
	}
}

func TestLoggerInitRejectsUnknownLevelAndFormat(t *testing.T) {
	l := &Logger{Cfg: config.LoggingConfig{Level: "verbose", Format: "json"}}
	if err := l.Init(context.Background()); err == nil {
		t.Fatalf("Init: expected error for unknown level")
	}
	l = &Logger{Cfg: config.LoggingConfig{Level: "info", Format: "yaml"}}
	if err := l.Init(context.Background()); err == nil {
		t.Fatalf("Init: expected error for unknown format")
	}
}

// TestContextHandlerAddsRequestIDAndUser proves the contextHandler stamp
// fires for both keys when present, and is silent when absent.
func TestContextHandlerAddsRequestIDAndUser(t *testing.T) {
	var buf bytes.Buffer
	inner := slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})
	logger := slog.New(&contextHandler{inner: inner})

	ctx := WithUser(WithRequestID(context.Background(), "req-42"), "alice")
	logger.InfoContext(ctx, "hello")

	var rec map[string]any
	if err := json.Unmarshal(buf.Bytes(), &rec); err != nil {
		t.Fatalf("unmarshal log line: %v", err)
	}
	if rec["request_id"] != "req-42" {
		t.Fatalf("request_id = %v; want req-42", rec["request_id"])
	}
	if rec["user"] != "alice" {
		t.Fatalf("user = %v; want alice", rec["user"])
	}

	// Empty context: no request_id/user fields.
	buf.Reset()
	logger.InfoContext(context.Background(), "anon")
	rec = nil
	if err := json.Unmarshal(buf.Bytes(), &rec); err != nil {
		t.Fatalf("unmarshal anon log line: %v", err)
	}
	if _, ok := rec["request_id"]; ok {
		t.Errorf("anon: request_id present unexpectedly")
	}
	if _, ok := rec["user"]; ok {
		t.Errorf("anon: user present unexpectedly")
	}
}

// TestJSONMaskingRedactsSensitiveKeys covers the JSON ReplaceAttr path. The
// scribe tint path is library-tested by go.bigb.es/auxilia.
func TestJSONMaskingRedactsSensitiveKeys(t *testing.T) {
	var buf bytes.Buffer
	h := slog.NewJSONHandler(&buf, &slog.HandlerOptions{ReplaceAttr: maskAttrJSON})
	logger := slog.New(h)
	logger.Info("login", slog.String("password", "hunter2"), slog.String("authorization", "Bearer xyz"), slog.String("user", "alice"))

	var rec map[string]any
	if err := json.Unmarshal(buf.Bytes(), &rec); err != nil {
		t.Fatalf("unmarshal: %v", err)
	}
	if rec["password"] != "***" {
		t.Errorf("password = %v; want ***", rec["password"])
	}
	if rec["authorization"] != "***" {
		t.Errorf("authorization = %v; want ***", rec["authorization"])
	}
	if rec["user"] != "alice" {
		t.Errorf("user = %v; want alice (not masked)", rec["user"])
	}
}

A internal/platform/observability/metrics.go => internal/platform/observability/metrics.go +79 -0
@@ 0,0 1,79 @@
package observability

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
)

// Metrics is the steward-managed Prometheus steward. It owns a private
// Registry (no global registry leakage) and the lethe-specific collectors
// other layers increment. The HTTP middleware (Phase 5) records request
// counters/histograms here; the ingest service (Phase 7) increments the
// ingest counters.
//
// Registering everything via Registry.MustRegister in Init keeps wiring in
// one place. Cardinality control on HTTPRequests/HTTPDuration is enforced by
// the middleware: the route label must come from chi's RoutePattern (never
// the raw URL path).
type Metrics struct {
	Registry *prometheus.Registry

	HTTPRequests *prometheus.CounterVec
	HTTPDuration *prometheus.HistogramVec

	IngestLinesAccepted   prometheus.Counter
	IngestLinesErrored    prometheus.Counter
	IngestChunksCommitted prometheus.Counter
}

// Init builds a fresh registry, attaches the standard process and Go runtime
// collectors, and registers the lethe-specific HTTP and ingest series.
func (m *Metrics) Init(_ context.Context) error {
	m.Registry = prometheus.NewRegistry()

	m.Registry.MustRegister(
		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
		collectors.NewGoCollector(),
	)

	m.HTTPRequests = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "lethe_http_requests_total",
			Help: "Total HTTP requests handled, labelled by method, chi route pattern, and status code.",
		},
		[]string{"method", "route", "status"},
	)

	m.HTTPDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "lethe_http_request_duration_seconds",
			Help:    "HTTP request duration in seconds, labelled by method and chi route pattern.",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"method", "route"},
	)

	m.IngestLinesAccepted = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "lethe_ingest_lines_accepted_total",
		Help: "Total ingest JSONL lines accepted (validated and queued for commit).",
	})
	m.IngestLinesErrored = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "lethe_ingest_lines_errored_total",
		Help: "Total ingest JSONL lines rejected (validation, size, or schema failures).",
	})
	m.IngestChunksCommitted = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "lethe_ingest_chunks_committed_total",
		Help: "Total ingest chunks successfully committed to the database.",
	})

	m.Registry.MustRegister(
		m.HTTPRequests,
		m.HTTPDuration,
		m.IngestLinesAccepted,
		m.IngestLinesErrored,
		m.IngestChunksCommitted,
	)
	return nil
}

A internal/platform/observability/metrics_test.go => internal/platform/observability/metrics_test.go +41 -0
@@ 0,0 1,41 @@
package observability

import (
	"context"
	"testing"
)

// TestMetricsInit confirms Init returns nil and every counter/vec is wired.
// We deliberately do not assert on Prometheus exposition format — that's an
// integration concern handled when the /metrics endpoint lands in Phase 5.
func TestMetricsInit(t *testing.T) {
	m := &Metrics{}
	if err := m.Init(context.Background()); err != nil {
		t.Fatalf("Init: %v", err)
	}
	if m.Registry == nil {
		t.Fatal("Registry is nil")
	}
	if m.HTTPRequests == nil {
		t.Fatal("HTTPRequests is nil")
	}
	if m.HTTPDuration == nil {
		t.Fatal("HTTPDuration is nil")
	}
	if m.IngestLinesAccepted == nil {
		t.Fatal("IngestLinesAccepted is nil")
	}
	if m.IngestLinesErrored == nil {
		t.Fatal("IngestLinesErrored is nil")
	}
	if m.IngestChunksCommitted == nil {
		t.Fatal("IngestChunksCommitted is nil")
	}

	// Sanity: increments/observations don't panic.
	m.HTTPRequests.WithLabelValues("GET", "/healthz", "200").Inc()
	m.HTTPDuration.WithLabelValues("GET", "/healthz").Observe(0.001)
	m.IngestLinesAccepted.Inc()
	m.IngestLinesErrored.Inc()
	m.IngestChunksCommitted.Inc()
}