From 53221c915cf9f4ce373ee0192e58898d660bdbfd Mon Sep 17 00:00:00 2001 From: Eugene Blikh Date: Sat, 25 Apr 2026 23:46:44 +0300 Subject: [PATCH] feat(cmd): wire server with /healthz /readyz /metrics + authed /api/v1 Phase 9 of lethe-server: thin main.go that loads config, registers every steward asset, and orchestrates Inject -> Init -> Start -> wait -> Stop -> Destroy. Compensates for the Phase 4 finding (steward.Manager does not unwind on Init failure) by tracking destroyer-implementing assets in a parallel slice and invoking Destroy in reverse registration order with a per-call timeout when Init or Start fails. Server.Start now opens its TCP listener synchronously and exposes the bound address via Addr(), so the e2e smoke can bind to 127.0.0.1:0 and discover the kernel-assigned port. Adds an end-to-end smoke test that drives the real steward graph (in-memory SQLite, real loopback listener, forward-auth) through ingest + sessions list/detail for two users with the same composite session key, proving owner isolation reaches all the way through the trust boundary. Deletes internal/platform/health/steward_unwind_test.go: the canary's purpose was to surface the unwind gap so Phase 9 could compensate, which it now does. README updated with consolidated curl quickstart (forward-auth + OIDC bearer variants), trust-chain diagram and the proxy-must-strip-Remote-* spoofing note, response-shape documentation for the API surface, and an operational notes section covering health, metrics, lifecycle and logs. --- README.md | 112 +++++- cmd/lethe/main.go | 185 +++++++++- cmd/lethe/main_e2e_test.go | 338 ++++++++++++++++++ .../platform/health/steward_unwind_test.go | 77 ---- internal/server/server.go | 29 +- 5 files changed, 637 insertions(+), 104 deletions(-) create mode 100644 cmd/lethe/main_e2e_test.go delete mode 100644 internal/platform/health/steward_unwind_test.go diff --git a/README.md b/README.md index f2247c324099d8640bdd72c9fb29d1e90926464d..958718c164080a406993837f7e7cedecdf1eb4c9 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ is just the server. ```bash cp config.example.yaml config.yaml +# edit config.yaml — at minimum set auth.allowed_users just run # or, for hot reload during development: just air @@ -29,6 +30,29 @@ just air The server reads `config.yaml` by default. Pass `-config ` to override. +Once the server is up (default bind `127.0.0.1:8080`), exercise the API: + +```bash +# Public probes (no auth): +curl http://127.0.0.1:8080/healthz +curl http://127.0.0.1:8080/readyz +curl http://127.0.0.1:8080/metrics + +# Forward-auth (the reverse proxy normally injects this header): +curl -H 'Remote-User: bigbes' http://127.0.0.1:8080/api/v1/sessions + +# Ingest one NDJSON line as that user: +printf '%s\n' '{"tool":"claude-code","host":"laptop","session_id":"s1","turn_id":"t1","seq":0,"role":"user","timestamp":1700000000,"content":"hi","session_meta":{"source_file":"/tmp/s1.jsonl"}}' \ + | curl -H 'Remote-User: bigbes' -H 'Content-Type: application/x-ndjson' \ + --data-binary @- http://127.0.0.1:8080/api/v1/ingest + +# Read the session back: +curl -H 'Remote-User: bigbes' http://127.0.0.1:8080/api/v1/sessions/claude-code/laptop/s1 + +# OIDC bearer (when auth.oidc.enabled: true): +curl -H "Authorization: Bearer $JWT" http://127.0.0.1:8080/api/v1/sessions +``` + ## Trust model `lethe` always binds `127.0.0.1`. It is not safe to expose it to the @@ -36,6 +60,25 @@ network directly. All authentication assumes a trusted reverse proxy on the same host (e.g. Caddy or Traefik) that terminates TLS and either injects auth headers or relays an OIDC bearer. +``` + client ──TLS──▶ Caddy ──forward_auth──▶ Authelia + │ │ + │ ◀── 200 + headers ────┘ + │ (Remote-User, Remote-Groups, ...) + │ + └─HTTP─▶ lethe (127.0.0.1:8080) + │ + └─▶ SQLite (./data/lethe.db) +``` + +The proxy MUST strip any `Remote-User` (and the other `Remote-*` headers) +that arrive on the inbound request before it issues the forward-auth +sub-request — otherwise a hostile client can spoof an identity by setting +the header itself. Authelia's documentation calls this out; Caddy's +`copy_headers` directive only adds Authelia's response headers, it does +not clear inbound ones, so add an explicit `header_down -Remote-*` (or +equivalent) when you pass requests through. + The implicit assumption is that nothing else on the host binds `127.0.0.1` and forges auth headers. That is the whole trust boundary for path (a) below. @@ -144,27 +187,55 @@ the proxy. Everything under `/api/v1/*` is authed. | Method | Path | Auth | Notes | |--------|-----------------------------------------------------|------|-------| -| POST | `/api/v1/ingest` | yes | NDJSON body of `TurnEvent` records. Idempotent at the turn level per owner. | -| GET | `/api/v1/sessions` | yes | Paginated. Filters: `tool`, `host`, `since`, `until`. Admins may pass `?owner=` or `?owner=*`. | +| POST | `/api/v1/ingest` | yes | `Content-Type: application/x-ndjson`; one `TurnEvent` per line. Idempotent at the turn level per owner. | +| GET | `/api/v1/sessions` | yes | Paginated. Filters: `tool`, `host`, `since`, `until`, `limit`, `offset`. Admins may pass `?owner=` or `?owner=*`. | | GET | `/api/v1/sessions/{tool}/{host}/{session_id}` | yes | Full session with turns inline. Admins may pass `?owner=`. | -| GET | `/healthz` | no | Liveness. | -| GET | `/readyz` | no | Readiness — DB ping etc. | -| GET | `/metrics` | no | Prometheus. | +| GET | `/healthz` | no | Liveness — constant 200 `ok`. | +| GET | `/readyz` | no | Readiness — runs every registered probe; 200 with `{"checks":{...}}` or 503. | +| GET | `/metrics` | no | Prometheus exposition. | + +Response shapes (success): + +- **POST `/api/v1/ingest`** → `200` with `{"accepted": , "errors": [{"line": , "error": }]}`. The endpoint always returns 200 when the body is well-formed enough to read; per-line failures are reported in `errors` and the client uses `accepted` to advance its offset bookkeeping. +- **GET `/api/v1/sessions`** → `200` with `{"sessions": [Session, ...], "limit": , "offset": }`. The echoed `limit`/`offset` reflect the (clamped) effective values. +- **GET `/api/v1/sessions/{tool}/{host}/{session_id}`** → `200` with the `Session` columns flattened at the top level plus `"turns": [Turn, ...]` ordered by `seq` ascending. The `owner` field is server-derived; the wire format has no `owner`. A non-admin caller passing `?owner=` gets 403. -Errors are RFC 7807 `application/problem+json`. +Errors are RFC 7807 `application/problem+json` with the lethe-specific +`code` extension carrying the machine-readable culpa code (e.g. +`UNAUTHORIZED`, `NOT_FOUND`, `INVALID`, `DB_OPEN`). ## Production deployment -`lethe` binds `127.0.0.1` only. Run it on the same host as a reverse -proxy that terminates TLS and injects auth headers (or relays an OIDC -bearer). Do not expose the listener to the network directly. +`lethe` binds `127.0.0.1` only — non-loopback binds are rejected at +startup with a `CONFIG_INVALID` error. Run it on the same host as a +reverse proxy (Caddy or Traefik) that terminates TLS and injects auth +headers (or relays an OIDC bearer). Never publish `:8080` directly. When run via the bundled `docker-compose.yml`, the service does not publish a port to the host — it only `expose`s `8080` on the compose -network. The reverse proxy reaches lethe through that network. +network. The reverse proxy reaches lethe through that network. The +SQLite file lives at `./data/lethe.db` on the host (mounted into the +container at `/data`); take backups against the host path. + +## Operational notes + +- **Health checks** — point your orchestrator at `GET /healthz` (constant + 200, no DB touch) for liveness and `GET /readyz` (DB ping with a 5s + budget) for readiness. +- **Metrics** — `GET /metrics` exposes Prometheus series prefixed with + `lethe_*` plus the standard process and Go runtime collectors. Scrape + it locally; do not expose `:8080` to the network. +- **Lifecycle** — on `SIGINT`/`SIGTERM` lethe runs a 15s graceful stop + (in-flight requests drain, then the listener closes), then closes the + database. If init fails before `Start` succeeds, lethe walks the + partially-initialized assets in reverse and calls `Destroy` on each so + no resource leaks. +- **Logs** — set `logging.format: json` for structured production + logging; `tint` is the friendly developer format. `request_id` and + `user` are stamped on every record originating from the HTTP layer. ## Backup @@ -188,9 +259,22 @@ PATH=/usr/bin:/bin ``` . -├── cmd/lethe/ # main, thin shell +├── cmd/lethe/ # main, thin shell + e2e smoke ├── internal/ -│ └── shared/wire/ # locked NDJSON contract shared with the collector +│ ├── config/ # YAML+env loader, validators, defaults +│ ├── domain/ +│ │ ├── ingest/ # NDJSON ingest: handler, service, repo +│ │ └── session/ # sessions read API: handler, repo +│ ├── pkg/ +│ │ ├── apierror/ # culpa-error → RFC 7807 problem renderer +│ │ └── httputil/ # NDJSON scanner, JSON write helper +│ ├── platform/ +│ │ ├── database/ # sqlx wrapper + embed.FS migrations +│ │ ├── health/ # Checker + steward-aggregated Set +│ │ └── observability/ # slog logger + Prometheus metrics +│ ├── server/ # chi router, middleware, route mount +│ │ └── auth/ # forward-auth + OIDC verifier +│ └── shared/wire/ # locked NDJSON contract (collector-shared) ├── config.example.yaml ├── Justfile ├── .air.toml @@ -198,7 +282,3 @@ PATH=/usr/bin:/bin ├── docker-compose.yml └── .golangci.yml ``` - -The rest of the layout (`internal/config`, `internal/server`, -`internal/domain/...`, `internal/platform/...`, `migrations/`) lands in -later phases. diff --git a/cmd/lethe/main.go b/cmd/lethe/main.go index 025d4523e6cd4d0352b361acaf8a20a18edac6fe..655cdee8fa316ba2d7ac981c0f678cd69b3d7ca8 100644 --- a/cmd/lethe/main.go +++ b/cmd/lethe/main.go @@ -1,18 +1,193 @@ -// Command lethe is the lethe server binary. Phase 1 ships a stub that prints -// the version and selected config path; real wiring lands in Phase 9. +// Command lethe is the lethe server binary. main.go is a thin shell: it loads +// the configuration, registers every steward asset that makes up the running +// server, and orchestrates the lifecycle (Inject -> Init -> Start -> wait for +// signal -> Stop -> Destroy). All business logic lives in the assets. +// +// Steward unwind compensation: per the Phase 4 finding, steward.Manager does +// NOT call Destroy on already-init'd siblings when a later component's Init +// returns an error. main keeps a parallel destroyer slice in registration +// order; on Init/Start failure it walks the slice in reverse and calls Destroy +// directly, swallowing individual errors so the rest of the cleanup proceeds. package main import ( + "context" "flag" "fmt" + "log/slog" + "os" + "os/signal" + "syscall" + "time" + + "go.bigb.es/auxilia/scribe" + "go.bigb.es/auxilia/steward" + + "sourcecraft.dev/bigbes/lethe/internal/config" + "sourcecraft.dev/bigbes/lethe/internal/domain/ingest" + "sourcecraft.dev/bigbes/lethe/internal/domain/session" + "sourcecraft.dev/bigbes/lethe/internal/platform/database" + "sourcecraft.dev/bigbes/lethe/internal/platform/health" + "sourcecraft.dev/bigbes/lethe/internal/platform/observability" + "sourcecraft.dev/bigbes/lethe/internal/server" + authpkg "sourcecraft.dev/bigbes/lethe/internal/server/auth" ) -const version = "0.1.0-dev" +const ( + version = "0.1.0-dev" + shutdownGrace = 15 * time.Second + perDestroyTimeout = 5 * time.Second + exitOK = 0 + exitConfigError = 1 + exitLifecycleError = 2 +) + +// destroyer is the local interface every steward asset that holds resources +// implements. The unwind compensator narrows registered services to this +// interface via type-assert; assets that hold no resources are skipped. +type destroyer interface { + Destroy(context.Context) error +} func main() { + os.Exit(run()) +} + +// run is the testable entry point. It returns an exit code rather than +// calling os.Exit directly so future tests (or a smoke harness) can exercise +// startup and shutdown without tearing down the test binary. +func run() int { configPath := flag.String("config", "config.yaml", "path to YAML config file") flag.Parse() - fmt.Printf("lethe %s\n", version) - fmt.Printf("config: %s\n", *configPath) + // Bootstrap a stderr logger before anything else so the unwind path always + // has a working slog.Default(). The Logger asset's Init (Phase 4) replaces + // this with the configured handler once it runs. + slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))) + + slog.Info("lethe starting", slog.String("version", version), slog.String("config", *configPath)) + + cfg, err := config.Load(*configPath) + if err != nil { + slog.Error("load config", scribe.Err(err)) + return exitConfigError + } + + // signal.NotifyContext converts SIGINT/SIGTERM into ctx cancellation. The + // stop closure is also called explicitly when we want to release the OS + // signal handlers (e.g. before the shutdown phase). + ctx, stopSignals := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stopSignals() + + mgr := steward.NewManager() + + // Underlying service struct pointers are tracked here in registration + // order so the unwind compensator can walk them in reverse on Init/Start + // failure. + var ( + loggerSvc = &observability.Logger{} + metricsSvc = &observability.Metrics{} + dbSvc = &database.Database{} + dbCheckSvc = &health.DBCheck{} + healthSetSvc = &health.Set{} + authSvc = &authpkg.Authenticator{} + ingestRepo = &ingest.Repository{} + ingestSvc = &ingest.Service{} + ingestHnd = &ingest.Handler{} + sessionRepo = &session.Repository{} + sessionHnd = &session.Handler{} + serverSvc = &server.Server{} + ) + + registered := []any{ + loggerSvc, metricsSvc, dbSvc, dbCheckSvc, healthSetSvc, + authSvc, ingestRepo, ingestSvc, ingestHnd, + sessionRepo, sessionHnd, serverSvc, + } + + mgr.AddComponent(ctx, + steward.MustConfigurationAsset(cfg), + steward.MustServiceAsset(loggerSvc), + steward.MustServiceAsset(metricsSvc), + steward.MustServiceAsset(dbSvc), + steward.MustServiceAsset(dbCheckSvc), + steward.MustServiceAsset(healthSetSvc), + steward.MustServiceAsset(authSvc), + steward.MustServiceAsset(ingestRepo), + steward.MustServiceAsset(ingestSvc), + steward.MustServiceAsset(ingestHnd), + steward.MustServiceAsset(sessionRepo), + steward.MustServiceAsset(sessionHnd), + steward.MustServiceAsset(serverSvc, steward.Root()), + ) + + if cfg.Auth.OIDC.Enabled { + oidcSvc := &authpkg.OIDCVerifier{} + registered = append(registered, oidcSvc) + mgr.AddComponent(ctx, steward.MustServiceAsset(oidcSvc)) + } + + if err := mgr.Inject(ctx); err != nil { + slog.Error("steward inject failed", scribe.Err(err)) + unwindOnError(registered) + return exitLifecycleError + } + + if err := mgr.Init(ctx); err != nil { + slog.Error("steward init failed", scribe.Err(err)) + unwindOnError(registered) + return exitLifecycleError + } + + if err := mgr.Start(ctx); err != nil { + slog.Error("steward start failed", scribe.Err(err)) + unwindOnError(registered) + return exitLifecycleError + } + + slog.Info("lethe ready", slog.String("bind", cfg.Server.Bind)) + + // Block until SIGINT/SIGTERM (or a parent cancellation if main is ever + // embedded). After this point we own the shutdown sequence. + <-ctx.Done() + slog.Info("signal received; shutting down") + + // Use a fresh context for shutdown — ctx is already cancelled. The 15s + // budget bounds Stop+Destroy so a stuck dependency cannot keep the + // process alive indefinitely. + stopCtx, cancel := context.WithTimeout(context.Background(), shutdownGrace) + defer cancel() + + if err := mgr.Stop(stopCtx); err != nil { + slog.Error("steward stop returned error", scribe.Err(err)) + } + if err := mgr.Destroy(context.Background()); err != nil { + slog.Error("steward destroy returned error", scribe.Err(err)) + } + + slog.Info("lethe stopped") + return exitOK +} + +// unwindOnError walks the assets in reverse registration order and calls +// Destroy on each one that implements the destroyer interface. Individual +// errors are logged and swallowed so partial cleanup proceeds. Each Destroy +// call gets its own short-lived context so a hung dependency cannot block the +// rest. This compensates for the Phase 4 finding that steward.Manager does +// not unwind on its own when Init/Start fails. +func unwindOnError(registered []any) { + for i := len(registered) - 1; i >= 0; i-- { + svc, ok := registered[i].(destroyer) + if !ok { + continue + } + ctx, cancel := context.WithTimeout(context.Background(), perDestroyTimeout) + if err := svc.Destroy(ctx); err != nil { + slog.Warn("destroy on unwind failed", + slog.String("component", fmt.Sprintf("%T", registered[i])), + scribe.Err(err), + ) + } + cancel() + } } diff --git a/cmd/lethe/main_e2e_test.go b/cmd/lethe/main_e2e_test.go new file mode 100644 index 0000000000000000000000000000000000000000..11cac6bfbf7824b3eeb01b88fedefdc9c52996dd --- /dev/null +++ b/cmd/lethe/main_e2e_test.go @@ -0,0 +1,338 @@ +// End-to-end smoke test for the lethe binary. The test assembles the same +// steward graph main wires up — but with a :memory: SQLite, a random TCP +// port, and forward-auth (no OIDC) — and drives it via the real chi router +// over a real loopback listener. Two distinct users ingest identical +// (tool, host, session_id) tuples; the test confirms both rows coexist +// under different owners and that each user only sees their own data +// through the read API. +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "testing" + "time" + + "go.bigb.es/auxilia/steward" + + "sourcecraft.dev/bigbes/lethe/internal/config" + "sourcecraft.dev/bigbes/lethe/internal/domain/ingest" + "sourcecraft.dev/bigbes/lethe/internal/domain/session" + "sourcecraft.dev/bigbes/lethe/internal/platform/database" + "sourcecraft.dev/bigbes/lethe/internal/platform/health" + "sourcecraft.dev/bigbes/lethe/internal/platform/observability" + "sourcecraft.dev/bigbes/lethe/internal/server" + authpkg "sourcecraft.dev/bigbes/lethe/internal/server/auth" +) + +// TestEndToEnd_MultiUserIsolation drives the full server graph and verifies +// the trust-model invariant: the stored owner is always the authenticated +// user, never the wire payload, and reads are scoped to that owner. +func TestEndToEnd_MultiUserIsolation(t *testing.T) { + cfg := &config.Config{ + Server: config.ServerConfig{ + // Port :0 lets the kernel assign a free port; we read it back + // from Server.Addr() once Start succeeds. + Bind: "127.0.0.1:0", + ShutdownGrace: 5 * time.Second, + }, + Database: config.DatabaseConfig{ + Path: ":memory:", + BusyTimeout: 5 * time.Second, + }, + Auth: config.AuthConfig{ + AllowedUsers: []string{"alice", "bob"}, + Admins: []string{}, + ForwardAuth: config.ForwardAuthConfig{ + Enabled: true, + UserHeader: "Remote-User", + }, + OIDC: config.OIDCConfig{Enabled: false}, + }, + Logging: config.LoggingConfig{ + Level: "info", + Format: "json", + }, + Ingest: config.IngestConfig{ + MaxBodyBytes: 16 * 1024 * 1024, + MaxTurnContentBytes: 4 * 1024 * 1024, + ChunkSize: 500, + }, + } + + mgr := steward.NewManager() + srv := &server.Server{} + + mgr.AddComponent(context.Background(), + steward.MustConfigurationAsset(cfg), + steward.MustServiceAsset(&observability.Logger{}), + steward.MustServiceAsset(&observability.Metrics{}), + steward.MustServiceAsset(&database.Database{}), + steward.MustServiceAsset(&health.DBCheck{}), + steward.MustServiceAsset(&health.Set{}), + steward.MustServiceAsset(&authpkg.Authenticator{}), + steward.MustServiceAsset(&ingest.Repository{}), + steward.MustServiceAsset(&ingest.Service{}), + steward.MustServiceAsset(&ingest.Handler{}), + steward.MustServiceAsset(&session.Repository{}), + steward.MustServiceAsset(&session.Handler{}), + steward.MustServiceAsset(srv, steward.Root()), + ) + + ctx := context.Background() + if err := mgr.Inject(ctx); err != nil { + t.Fatalf("Inject: %v", err) + } + if err := mgr.Init(ctx); err != nil { + t.Fatalf("Init: %v", err) + } + if err := mgr.Start(ctx); err != nil { + t.Fatalf("Start: %v", err) + } + t.Cleanup(func() { + stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := mgr.Stop(stopCtx); err != nil { + t.Errorf("Stop: %v", err) + } + if err := mgr.Destroy(context.Background()); err != nil { + t.Errorf("Destroy: %v", err) + } + }) + + addr := srv.Addr() + if addr == "" { + t.Fatalf("Server.Addr() empty after Start") + } + base := "http://" + addr + + // Wait for the listener to accept (Serve runs in a goroutine). + if !waitHealthy(t, base, 5*time.Second) { + t.Fatalf("server did not become healthy at %s", base) + } + + tool, host, sessionID := "claude-code", "laptop", "session-001" + + // --- alice ingest: 2 turns under (tool, host, sessionID) --- + alicePayload := makeNDJSON(tool, host, sessionID, []turnSpec{ + {turnID: "t1", seq: 0, role: "user", content: "alice question"}, + {turnID: "t2", seq: 1, role: "assistant", content: "alice answer"}, + }) + doIngest(t, base, "alice", alicePayload, 2) + + // alice GET sessions: exactly her one session. + aliceList := doListSessions(t, base, "alice") + if len(aliceList) != 1 { + t.Fatalf("alice list: got %d sessions, want 1", len(aliceList)) + } + if got := aliceList[0]["owner"]; got != "alice" { + t.Fatalf("alice list[0].owner = %v; want alice", got) + } + if got := aliceList[0]["session_id"]; got != sessionID { + t.Fatalf("alice list[0].session_id = %v; want %s", got, sessionID) + } + + // alice GET detail: her two turns, content matches. + aliceDetail := doGetSession(t, base, "alice", tool, host, sessionID) + if got := aliceDetail["owner"]; got != "alice" { + t.Fatalf("alice detail.owner = %v; want alice", got) + } + turns, _ := aliceDetail["turns"].([]any) + if len(turns) != 2 { + t.Fatalf("alice detail.turns: got %d, want 2", len(turns)) + } + if got := turns[0].(map[string]any)["content"]; got != "alice question" { + t.Fatalf("alice detail.turns[0].content = %v; want alice question", got) + } + + // --- bob ingest: same composite key, different content --- + bobPayload := makeNDJSON(tool, host, sessionID, []turnSpec{ + {turnID: "t1", seq: 0, role: "user", content: "bob question"}, + {turnID: "t2", seq: 1, role: "assistant", content: "bob answer"}, + }) + doIngest(t, base, "bob", bobPayload, 2) + + // alice still sees only her own row. + aliceList2 := doListSessions(t, base, "alice") + if len(aliceList2) != 1 || aliceList2[0]["owner"] != "alice" { + t.Fatalf("alice list after bob ingest: got %v, want exactly 1 alice session", aliceList2) + } + + // bob sees only his own row. + bobList := doListSessions(t, base, "bob") + if len(bobList) != 1 { + t.Fatalf("bob list: got %d sessions, want 1", len(bobList)) + } + if got := bobList[0]["owner"]; got != "bob" { + t.Fatalf("bob list[0].owner = %v; want bob", got) + } + + // alice detail: her content unchanged by bob's write. + aliceDetail2 := doGetSession(t, base, "alice", tool, host, sessionID) + turns2, _ := aliceDetail2["turns"].([]any) + if len(turns2) != 2 { + t.Fatalf("alice detail (post-bob).turns: got %d, want 2", len(turns2)) + } + if got := turns2[0].(map[string]any)["content"]; got != "alice question" { + t.Fatalf("alice detail (post-bob).turns[0].content = %v; want alice question (bob's write must not bleed through)", got) + } + + // bob detail: his content. + bobDetail := doGetSession(t, base, "bob", tool, host, sessionID) + if got := bobDetail["owner"]; got != "bob" { + t.Fatalf("bob detail.owner = %v; want bob", got) + } + turnsB, _ := bobDetail["turns"].([]any) + if len(turnsB) != 2 { + t.Fatalf("bob detail.turns: got %d, want 2", len(turnsB)) + } + if got := turnsB[0].(map[string]any)["content"]; got != "bob question" { + t.Fatalf("bob detail.turns[0].content = %v; want bob question", got) + } +} + +// waitHealthy polls /healthz until it returns 200 or the timeout elapses. +func waitHealthy(t *testing.T, base string, timeout time.Duration) bool { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + resp, err := http.Get(base + "/healthz") + if err == nil { + _ = resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return true + } + } + time.Sleep(20 * time.Millisecond) + } + return false +} + +// turnSpec is the per-turn knob for makeNDJSON. +type turnSpec struct { + turnID string + seq int64 + role string + content string +} + +// makeNDJSON emits one TurnEvent per spec, all sharing (tool, host, sessionID) +// and SessionMeta.SourceFile. The trust-model invariant means the wire format +// has no "owner" field; the server stamps owner from auth. +func makeNDJSON(tool, host, sessionID string, specs []turnSpec) []byte { + var buf bytes.Buffer + now := time.Now().Unix() + for _, s := range specs { + obj := map[string]any{ + "tool": tool, + "host": host, + "session_id": sessionID, + "turn_id": s.turnID, + "seq": s.seq, + "role": s.role, + "timestamp": now + s.seq, + "content": s.content, + "session_meta": map[string]any{ + "source_file": "/tmp/" + sessionID + ".jsonl", + }, + } + raw, err := json.Marshal(obj) + if err != nil { + panic(err) + } + buf.Write(raw) + buf.WriteByte('\n') + } + return buf.Bytes() +} + +// doIngest POSTs payload to /api/v1/ingest as user and asserts the result +// reports wantAccepted accepted turns with no errors. +func doIngest(t *testing.T, base, user string, payload []byte, wantAccepted int) { + t.Helper() + req, err := http.NewRequest(http.MethodPost, base+"/api/v1/ingest", bytes.NewReader(payload)) + if err != nil { + t.Fatalf("ingest %s: build req: %v", user, err) + } + req.Header.Set("Content-Type", "application/x-ndjson") + req.Header.Set("Remote-User", user) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("ingest %s: do: %v", user, err) + } + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("ingest %s: status = %d body=%s", user, resp.StatusCode, string(body)) + } + var result struct { + Accepted int `json:"accepted"` + Errors []map[string]interface{} `json:"errors"` + } + if err := json.Unmarshal(body, &result); err != nil { + t.Fatalf("ingest %s: parse body %s: %v", user, string(body), err) + } + if result.Accepted != wantAccepted { + t.Fatalf("ingest %s: accepted = %d; want %d (errors=%v)", user, result.Accepted, wantAccepted, result.Errors) + } + if len(result.Errors) != 0 { + t.Fatalf("ingest %s: unexpected errors: %v", user, result.Errors) + } +} + +// doListSessions GETs /api/v1/sessions as user and returns the parsed +// "sessions" slice. Asserts the call returned 200. +func doListSessions(t *testing.T, base, user string) []map[string]any { + t.Helper() + req, err := http.NewRequest(http.MethodGet, base+"/api/v1/sessions", nil) + if err != nil { + t.Fatalf("list %s: build req: %v", user, err) + } + req.Header.Set("Remote-User", user) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("list %s: do: %v", user, err) + } + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("list %s: status = %d body=%s", user, resp.StatusCode, string(body)) + } + var out struct { + Sessions []map[string]any `json:"sessions"` + } + if err := json.Unmarshal(body, &out); err != nil { + t.Fatalf("list %s: parse body %s: %v", user, string(body), err) + } + return out.Sessions +} + +// doGetSession GETs /api/v1/sessions/{tool}/{host}/{session_id} as user and +// returns the parsed session-with-turns object. Asserts the call returned 200. +func doGetSession(t *testing.T, base, user, tool, host, sessionID string) map[string]any { + t.Helper() + url := fmt.Sprintf("%s/api/v1/sessions/%s/%s/%s", base, tool, host, sessionID) + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + t.Fatalf("get %s: build req: %v", user, err) + } + req.Header.Set("Remote-User", user) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("get %s: do: %v", user, err) + } + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("get %s: status = %d body=%s", user, resp.StatusCode, string(body)) + } + var out map[string]any + if err := json.Unmarshal(body, &out); err != nil { + t.Fatalf("get %s: parse body %s: %v", user, string(body), err) + } + return out +} diff --git a/internal/platform/health/steward_unwind_test.go b/internal/platform/health/steward_unwind_test.go deleted file mode 100644 index 860123c4aab1db24ab00daba12056629dfb0726c..0000000000000000000000000000000000000000 --- a/internal/platform/health/steward_unwind_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package health_test - -// This is the Phase 4 steward unwind canary. It verifies whether -// steward.Manager invokes Destroy on already-initialized siblings when a -// later component's Init returns an error. The lifecycle design in the -// lethe-server task assumes it does; if not, an explicit unwind step has to -// be added in main.go (Phase 9). -// -// The test is deliberately placed in package `health_test` (next to the -// health package) so it can import steward without polluting the health -// package and without introducing a brand-new package directory just for -// the canary. See the Phase 4 plan, point 16. -// -// IMPORTANT: this test is allowed to fail. A failure here is the very signal -// the dispatcher needs to plan an explicit unwind. Do NOT mark it Skip; do -// NOT add a workaround in this phase. - -import ( - "context" - "errors" - "testing" - - "go.bigb.es/auxilia/steward" -) - -// recordingService records whether Init and Destroy were called. It has no -// dependencies, so it can sit anywhere in the start order. -type recordingService struct { - initCalled bool - destroyCalled bool -} - -func (r *recordingService) Init(_ context.Context) error { - r.initCalled = true - return nil -} - -func (r *recordingService) Destroy(_ context.Context) error { - r.destroyCalled = true - return nil -} - -// failingService errors out of Init. It is registered after recordingService -// so that recordingService is already initialized at the point of failure. -type failingService struct{} - -var errFailing = errors.New("failingService.Init: intentional failure") - -func (f *failingService) Init(_ context.Context) error { return errFailing } - -func TestStewardUnwindsOnInitFailure(t *testing.T) { - rec := &recordingService{} - fail := &failingService{} - - mgr := steward.NewManager() - mgr.AddComponent(context.Background(), - steward.MustServiceAsset(rec, steward.Root(), steward.IgnoreUnused()), - steward.MustServiceAsset(fail, steward.Root(), steward.IgnoreUnused()), - ) - - if err := mgr.Inject(context.Background()); err != nil { - t.Fatalf("Inject: %v", err) - } - - initErr := mgr.Init(context.Background()) - if initErr == nil { - t.Fatalf("expected Init to surface failingService error, got nil") - } - if !rec.initCalled { - t.Fatalf("recordingService.Init was never called — registration order assumption broken") - } - if !rec.destroyCalled { - // THE FINDING. Steward did not unwind. main.go must call Destroy on - // every initialized sibling itself when Init fails. - t.Fatalf("steward did NOT call recordingService.Destroy after sibling Init failed; explicit unwind required in main") - } -} diff --git a/internal/server/server.go b/internal/server/server.go index 013e7b109f443518fdb77526e568e6fdec846cd3..d86786eafbb619a7c31691a9533bd2e19dfd6050 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -56,8 +56,19 @@ type Server struct { Ingest *ingest.Handler `inject:""` Sessions *session.Handler `inject:""` - router *chi.Mux - httpSrv *http.Server + router *chi.Mux + httpSrv *http.Server + listener net.Listener +} + +// Addr returns the actual bound address of the listener. It is empty until +// Start has succeeded. Used by tests that bind to "127.0.0.1:0" and need the +// kernel-assigned port; production callers should rely on Cfg.Bind directly. +func (s *Server) Addr() string { + if s.listener == nil { + return "" + } + return s.listener.Addr().String() } // Init validates the bind address and constructs the chi router with the @@ -91,16 +102,22 @@ func (s *Server) Init(_ context.Context) error { return nil } -// Start spawns ListenAndServe in the background and returns immediately. -// Errors are logged; steward observes lifecycle via Stop. +// Start binds the configured listener and serves requests in the background. +// The listener is opened synchronously so a bind failure surfaces here rather +// than from a goroutine; the actual port (when Cfg.Bind ends in ":0") is then +// available via Addr(). Steward observes lifecycle via Stop. func (s *Server) Start(_ context.Context) error { + ln, err := net.Listen("tcp", s.Cfg.Bind) + if err != nil { + return culpa.WithCode(culpa.Wrap(err, "listen on bind address"), "SERVER_LISTEN") + } + s.listener = ln s.httpSrv = &http.Server{ - Addr: s.Cfg.Bind, Handler: s.router, ReadHeaderTimeout: 10 * time.Second, } go func() { - if err := s.httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + if err := s.httpSrv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) { slog.Default().Error("server crashed", scribe.Err(err)) } }()