@@ 22,6 22,7 @@ is just the server.
```bash
cp config.example.yaml config.yaml
+# edit config.yaml — at minimum set auth.allowed_users
just run
# or, for hot reload during development:
just air
@@ 29,6 30,29 @@ just air
The server reads `config.yaml` by default. Pass `-config <path>` to override.
+Once the server is up (default bind `127.0.0.1:8080`), exercise the API:
+
+```bash
+# Public probes (no auth):
+curl http://127.0.0.1:8080/healthz
+curl http://127.0.0.1:8080/readyz
+curl http://127.0.0.1:8080/metrics
+
+# Forward-auth (the reverse proxy normally injects this header):
+curl -H 'Remote-User: bigbes' http://127.0.0.1:8080/api/v1/sessions
+
+# Ingest one NDJSON line as that user:
+printf '%s\n' '{"tool":"claude-code","host":"laptop","session_id":"s1","turn_id":"t1","seq":0,"role":"user","timestamp":1700000000,"content":"hi","session_meta":{"source_file":"/tmp/s1.jsonl"}}' \
+ | curl -H 'Remote-User: bigbes' -H 'Content-Type: application/x-ndjson' \
+ --data-binary @- http://127.0.0.1:8080/api/v1/ingest
+
+# Read the session back:
+curl -H 'Remote-User: bigbes' http://127.0.0.1:8080/api/v1/sessions/claude-code/laptop/s1
+
+# OIDC bearer (when auth.oidc.enabled: true):
+curl -H "Authorization: Bearer $JWT" http://127.0.0.1:8080/api/v1/sessions
+```
+
## Trust model
`lethe` always binds `127.0.0.1`. It is not safe to expose it to the
@@ 36,6 60,25 @@ network directly. All authentication assumes a trusted reverse proxy on
the same host (e.g. Caddy or Traefik) that terminates TLS and either
injects auth headers or relays an OIDC bearer.
+```
+ client ──TLS──▶ Caddy ──forward_auth──▶ Authelia
+ │ │
+ │ ◀── 200 + headers ────┘
+ │ (Remote-User, Remote-Groups, ...)
+ │
+ └─HTTP─▶ lethe (127.0.0.1:8080)
+ │
+ └─▶ SQLite (./data/lethe.db)
+```
+
+The proxy MUST strip any `Remote-User` (and the other `Remote-*` headers)
+that arrive on the inbound request before it issues the forward-auth
+sub-request — otherwise a hostile client can spoof an identity by setting
+the header itself. Authelia's documentation calls this out; Caddy's
+`copy_headers` directive only adds Authelia's response headers, it does
+not clear inbound ones, so add an explicit `header_down -Remote-*` (or
+equivalent) when you pass requests through.
+
The implicit assumption is that nothing else on the host binds
`127.0.0.1` and forges auth headers. That is the whole trust boundary
for path (a) below.
@@ 144,27 187,55 @@ the proxy. Everything under `/api/v1/*` is authed.
| Method | Path | Auth | Notes |
|--------|-----------------------------------------------------|------|-------|
-| POST | `/api/v1/ingest` | yes | NDJSON body of `TurnEvent` records. Idempotent at the turn level per owner. |
-| GET | `/api/v1/sessions` | yes | Paginated. Filters: `tool`, `host`, `since`, `until`. Admins may pass `?owner=<user>` or `?owner=*`. |
+| POST | `/api/v1/ingest` | yes | `Content-Type: application/x-ndjson`; one `TurnEvent` per line. Idempotent at the turn level per owner. |
+| GET | `/api/v1/sessions` | yes | Paginated. Filters: `tool`, `host`, `since`, `until`, `limit`, `offset`. Admins may pass `?owner=<user>` or `?owner=*`. |
| GET | `/api/v1/sessions/{tool}/{host}/{session_id}` | yes | Full session with turns inline. Admins may pass `?owner=<user>`. |
-| GET | `/healthz` | no | Liveness. |
-| GET | `/readyz` | no | Readiness — DB ping etc. |
-| GET | `/metrics` | no | Prometheus. |
+| GET | `/healthz` | no | Liveness — constant 200 `ok`. |
+| GET | `/readyz` | no | Readiness — runs every registered probe; 200 with `{"checks":{...}}` or 503. |
+| GET | `/metrics` | no | Prometheus exposition. |
+
+Response shapes (success):
+
+- **POST `/api/v1/ingest`** → `200` with `{"accepted": <int>, "errors": [{"line": <int>, "error": <string>}]}`. The endpoint always returns 200 when the body is well-formed enough to read; per-line failures are reported in `errors` and the client uses `accepted` to advance its offset bookkeeping.
+- **GET `/api/v1/sessions`** → `200` with `{"sessions": [Session, ...], "limit": <int>, "offset": <int>}`. The echoed `limit`/`offset` reflect the (clamped) effective values.
+- **GET `/api/v1/sessions/{tool}/{host}/{session_id}`** → `200` with the `Session` columns flattened at the top level plus `"turns": [Turn, ...]` ordered by `seq` ascending.
The `owner` field is server-derived; the wire format has no `owner`. A
non-admin caller passing `?owner=` gets 403.
-Errors are RFC 7807 `application/problem+json`.
+Errors are RFC 7807 `application/problem+json` with the lethe-specific
+`code` extension carrying the machine-readable culpa code (e.g.
+`UNAUTHORIZED`, `NOT_FOUND`, `INVALID`, `DB_OPEN`).
## Production deployment
-`lethe` binds `127.0.0.1` only. Run it on the same host as a reverse
-proxy that terminates TLS and injects auth headers (or relays an OIDC
-bearer). Do not expose the listener to the network directly.
+`lethe` binds `127.0.0.1` only — non-loopback binds are rejected at
+startup with a `CONFIG_INVALID` error. Run it on the same host as a
+reverse proxy (Caddy or Traefik) that terminates TLS and injects auth
+headers (or relays an OIDC bearer). Never publish `:8080` directly.
When run via the bundled `docker-compose.yml`, the service does not
publish a port to the host — it only `expose`s `8080` on the compose
-network. The reverse proxy reaches lethe through that network.
+network. The reverse proxy reaches lethe through that network. The
+SQLite file lives at `./data/lethe.db` on the host (mounted into the
+container at `/data`); take backups against the host path.
+
+## Operational notes
+
+- **Health checks** — point your orchestrator at `GET /healthz` (constant
+ 200, no DB touch) for liveness and `GET /readyz` (DB ping with a 5s
+ budget) for readiness.
+- **Metrics** — `GET /metrics` exposes Prometheus series prefixed with
+ `lethe_*` plus the standard process and Go runtime collectors. Scrape
+ it locally; do not expose `:8080` to the network.
+- **Lifecycle** — on `SIGINT`/`SIGTERM` lethe runs a 15s graceful stop
+ (in-flight requests drain, then the listener closes), then closes the
+ database. If init fails before `Start` succeeds, lethe walks the
+ partially-initialized assets in reverse and calls `Destroy` on each so
+ no resource leaks.
+- **Logs** — set `logging.format: json` for structured production
+ logging; `tint` is the friendly developer format. `request_id` and
+ `user` are stamped on every record originating from the HTTP layer.
## Backup
@@ 188,9 259,22 @@ PATH=/usr/bin:/bin
```
.
-├── cmd/lethe/ # main, thin shell
+├── cmd/lethe/ # main, thin shell + e2e smoke
├── internal/
-│ └── shared/wire/ # locked NDJSON contract shared with the collector
+│ ├── config/ # YAML+env loader, validators, defaults
+│ ├── domain/
+│ │ ├── ingest/ # NDJSON ingest: handler, service, repo
+│ │ └── session/ # sessions read API: handler, repo
+│ ├── pkg/
+│ │ ├── apierror/ # culpa-error → RFC 7807 problem renderer
+│ │ └── httputil/ # NDJSON scanner, JSON write helper
+│ ├── platform/
+│ │ ├── database/ # sqlx wrapper + embed.FS migrations
+│ │ ├── health/ # Checker + steward-aggregated Set
+│ │ └── observability/ # slog logger + Prometheus metrics
+│ ├── server/ # chi router, middleware, route mount
+│ │ └── auth/ # forward-auth + OIDC verifier
+│ └── shared/wire/ # locked NDJSON contract (collector-shared)
├── config.example.yaml
├── Justfile
├── .air.toml
@@ 198,7 282,3 @@ PATH=/usr/bin:/bin
├── docker-compose.yml
└── .golangci.yml
```
-
-The rest of the layout (`internal/config`, `internal/server`,
-`internal/domain/...`, `internal/platform/...`, `migrations/`) lands in
-later phases.
@@ 1,18 1,193 @@
-// Command lethe is the lethe server binary. Phase 1 ships a stub that prints
-// the version and selected config path; real wiring lands in Phase 9.
+// Command lethe is the lethe server binary. main.go is a thin shell: it loads
+// the configuration, registers every steward asset that makes up the running
+// server, and orchestrates the lifecycle (Inject -> Init -> Start -> wait for
+// signal -> Stop -> Destroy). All business logic lives in the assets.
+//
+// Steward unwind compensation: per the Phase 4 finding, steward.Manager does
+// NOT call Destroy on already-init'd siblings when a later component's Init
+// returns an error. main keeps a parallel destroyer slice in registration
+// order; on Init/Start failure it walks the slice in reverse and calls Destroy
+// directly, swallowing individual errors so the rest of the cleanup proceeds.
package main
import (
+ "context"
"flag"
"fmt"
+ "log/slog"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "go.bigb.es/auxilia/scribe"
+ "go.bigb.es/auxilia/steward"
+
+ "sourcecraft.dev/bigbes/lethe/internal/config"
+ "sourcecraft.dev/bigbes/lethe/internal/domain/ingest"
+ "sourcecraft.dev/bigbes/lethe/internal/domain/session"
+ "sourcecraft.dev/bigbes/lethe/internal/platform/database"
+ "sourcecraft.dev/bigbes/lethe/internal/platform/health"
+ "sourcecraft.dev/bigbes/lethe/internal/platform/observability"
+ "sourcecraft.dev/bigbes/lethe/internal/server"
+ authpkg "sourcecraft.dev/bigbes/lethe/internal/server/auth"
)
-const version = "0.1.0-dev"
+const (
+ version = "0.1.0-dev"
+ shutdownGrace = 15 * time.Second
+ perDestroyTimeout = 5 * time.Second
+ exitOK = 0
+ exitConfigError = 1
+ exitLifecycleError = 2
+)
+
+// destroyer is the local interface every steward asset that holds resources
+// implements. The unwind compensator narrows registered services to this
+// interface via type-assert; assets that hold no resources are skipped.
+type destroyer interface {
+ Destroy(context.Context) error
+}
func main() {
+ os.Exit(run())
+}
+
+// run is the testable entry point. It returns an exit code rather than
+// calling os.Exit directly so future tests (or a smoke harness) can exercise
+// startup and shutdown without tearing down the test binary.
+func run() int {
configPath := flag.String("config", "config.yaml", "path to YAML config file")
flag.Parse()
- fmt.Printf("lethe %s\n", version)
- fmt.Printf("config: %s\n", *configPath)
+ // Bootstrap a stderr logger before anything else so the unwind path always
+ // has a working slog.Default(). The Logger asset's Init (Phase 4) replaces
+ // this with the configured handler once it runs.
+ slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})))
+
+ slog.Info("lethe starting", slog.String("version", version), slog.String("config", *configPath))
+
+ cfg, err := config.Load(*configPath)
+ if err != nil {
+ slog.Error("load config", scribe.Err(err))
+ return exitConfigError
+ }
+
+ // signal.NotifyContext converts SIGINT/SIGTERM into ctx cancellation. The
+ // stop closure is also called explicitly when we want to release the OS
+ // signal handlers (e.g. before the shutdown phase).
+ ctx, stopSignals := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+ defer stopSignals()
+
+ mgr := steward.NewManager()
+
+ // Underlying service struct pointers are tracked here in registration
+ // order so the unwind compensator can walk them in reverse on Init/Start
+ // failure.
+ var (
+ loggerSvc = &observability.Logger{}
+ metricsSvc = &observability.Metrics{}
+ dbSvc = &database.Database{}
+ dbCheckSvc = &health.DBCheck{}
+ healthSetSvc = &health.Set{}
+ authSvc = &authpkg.Authenticator{}
+ ingestRepo = &ingest.Repository{}
+ ingestSvc = &ingest.Service{}
+ ingestHnd = &ingest.Handler{}
+ sessionRepo = &session.Repository{}
+ sessionHnd = &session.Handler{}
+ serverSvc = &server.Server{}
+ )
+
+ registered := []any{
+ loggerSvc, metricsSvc, dbSvc, dbCheckSvc, healthSetSvc,
+ authSvc, ingestRepo, ingestSvc, ingestHnd,
+ sessionRepo, sessionHnd, serverSvc,
+ }
+
+ mgr.AddComponent(ctx,
+ steward.MustConfigurationAsset(cfg),
+ steward.MustServiceAsset(loggerSvc),
+ steward.MustServiceAsset(metricsSvc),
+ steward.MustServiceAsset(dbSvc),
+ steward.MustServiceAsset(dbCheckSvc),
+ steward.MustServiceAsset(healthSetSvc),
+ steward.MustServiceAsset(authSvc),
+ steward.MustServiceAsset(ingestRepo),
+ steward.MustServiceAsset(ingestSvc),
+ steward.MustServiceAsset(ingestHnd),
+ steward.MustServiceAsset(sessionRepo),
+ steward.MustServiceAsset(sessionHnd),
+ steward.MustServiceAsset(serverSvc, steward.Root()),
+ )
+
+ if cfg.Auth.OIDC.Enabled {
+ oidcSvc := &authpkg.OIDCVerifier{}
+ registered = append(registered, oidcSvc)
+ mgr.AddComponent(ctx, steward.MustServiceAsset(oidcSvc))
+ }
+
+ if err := mgr.Inject(ctx); err != nil {
+ slog.Error("steward inject failed", scribe.Err(err))
+ unwindOnError(registered)
+ return exitLifecycleError
+ }
+
+ if err := mgr.Init(ctx); err != nil {
+ slog.Error("steward init failed", scribe.Err(err))
+ unwindOnError(registered)
+ return exitLifecycleError
+ }
+
+ if err := mgr.Start(ctx); err != nil {
+ slog.Error("steward start failed", scribe.Err(err))
+ unwindOnError(registered)
+ return exitLifecycleError
+ }
+
+ slog.Info("lethe ready", slog.String("bind", cfg.Server.Bind))
+
+ // Block until SIGINT/SIGTERM (or a parent cancellation if main is ever
+ // embedded). After this point we own the shutdown sequence.
+ <-ctx.Done()
+ slog.Info("signal received; shutting down")
+
+ // Use a fresh context for shutdown — ctx is already cancelled. The 15s
+ // budget bounds Stop+Destroy so a stuck dependency cannot keep the
+ // process alive indefinitely.
+ stopCtx, cancel := context.WithTimeout(context.Background(), shutdownGrace)
+ defer cancel()
+
+ if err := mgr.Stop(stopCtx); err != nil {
+ slog.Error("steward stop returned error", scribe.Err(err))
+ }
+ if err := mgr.Destroy(context.Background()); err != nil {
+ slog.Error("steward destroy returned error", scribe.Err(err))
+ }
+
+ slog.Info("lethe stopped")
+ return exitOK
+}
+
+// unwindOnError walks the assets in reverse registration order and calls
+// Destroy on each one that implements the destroyer interface. Individual
+// errors are logged and swallowed so partial cleanup proceeds. Each Destroy
+// call gets its own short-lived context so a hung dependency cannot block the
+// rest. This compensates for the Phase 4 finding that steward.Manager does
+// not unwind on its own when Init/Start fails.
+func unwindOnError(registered []any) {
+ for i := len(registered) - 1; i >= 0; i-- {
+ svc, ok := registered[i].(destroyer)
+ if !ok {
+ continue
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), perDestroyTimeout)
+ if err := svc.Destroy(ctx); err != nil {
+ slog.Warn("destroy on unwind failed",
+ slog.String("component", fmt.Sprintf("%T", registered[i])),
+ scribe.Err(err),
+ )
+ }
+ cancel()
+ }
}
@@ 0,0 1,338 @@
+// End-to-end smoke test for the lethe binary. The test assembles the same
+// steward graph main wires up — but with a :memory: SQLite, a random TCP
+// port, and forward-auth (no OIDC) — and drives it via the real chi router
+// over a real loopback listener. Two distinct users ingest identical
+// (tool, host, session_id) tuples; the test confirms both rows coexist
+// under different owners and that each user only sees their own data
+// through the read API.
+package main
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "testing"
+ "time"
+
+ "go.bigb.es/auxilia/steward"
+
+ "sourcecraft.dev/bigbes/lethe/internal/config"
+ "sourcecraft.dev/bigbes/lethe/internal/domain/ingest"
+ "sourcecraft.dev/bigbes/lethe/internal/domain/session"
+ "sourcecraft.dev/bigbes/lethe/internal/platform/database"
+ "sourcecraft.dev/bigbes/lethe/internal/platform/health"
+ "sourcecraft.dev/bigbes/lethe/internal/platform/observability"
+ "sourcecraft.dev/bigbes/lethe/internal/server"
+ authpkg "sourcecraft.dev/bigbes/lethe/internal/server/auth"
+)
+
+// TestEndToEnd_MultiUserIsolation drives the full server graph and verifies
+// the trust-model invariant: the stored owner is always the authenticated
+// user, never the wire payload, and reads are scoped to that owner.
+func TestEndToEnd_MultiUserIsolation(t *testing.T) {
+ cfg := &config.Config{
+ Server: config.ServerConfig{
+ // Port :0 lets the kernel assign a free port; we read it back
+ // from Server.Addr() once Start succeeds.
+ Bind: "127.0.0.1:0",
+ ShutdownGrace: 5 * time.Second,
+ },
+ Database: config.DatabaseConfig{
+ Path: ":memory:",
+ BusyTimeout: 5 * time.Second,
+ },
+ Auth: config.AuthConfig{
+ AllowedUsers: []string{"alice", "bob"},
+ Admins: []string{},
+ ForwardAuth: config.ForwardAuthConfig{
+ Enabled: true,
+ UserHeader: "Remote-User",
+ },
+ OIDC: config.OIDCConfig{Enabled: false},
+ },
+ Logging: config.LoggingConfig{
+ Level: "info",
+ Format: "json",
+ },
+ Ingest: config.IngestConfig{
+ MaxBodyBytes: 16 * 1024 * 1024,
+ MaxTurnContentBytes: 4 * 1024 * 1024,
+ ChunkSize: 500,
+ },
+ }
+
+ mgr := steward.NewManager()
+ srv := &server.Server{}
+
+ mgr.AddComponent(context.Background(),
+ steward.MustConfigurationAsset(cfg),
+ steward.MustServiceAsset(&observability.Logger{}),
+ steward.MustServiceAsset(&observability.Metrics{}),
+ steward.MustServiceAsset(&database.Database{}),
+ steward.MustServiceAsset(&health.DBCheck{}),
+ steward.MustServiceAsset(&health.Set{}),
+ steward.MustServiceAsset(&authpkg.Authenticator{}),
+ steward.MustServiceAsset(&ingest.Repository{}),
+ steward.MustServiceAsset(&ingest.Service{}),
+ steward.MustServiceAsset(&ingest.Handler{}),
+ steward.MustServiceAsset(&session.Repository{}),
+ steward.MustServiceAsset(&session.Handler{}),
+ steward.MustServiceAsset(srv, steward.Root()),
+ )
+
+ ctx := context.Background()
+ if err := mgr.Inject(ctx); err != nil {
+ t.Fatalf("Inject: %v", err)
+ }
+ if err := mgr.Init(ctx); err != nil {
+ t.Fatalf("Init: %v", err)
+ }
+ if err := mgr.Start(ctx); err != nil {
+ t.Fatalf("Start: %v", err)
+ }
+ t.Cleanup(func() {
+ stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := mgr.Stop(stopCtx); err != nil {
+ t.Errorf("Stop: %v", err)
+ }
+ if err := mgr.Destroy(context.Background()); err != nil {
+ t.Errorf("Destroy: %v", err)
+ }
+ })
+
+ addr := srv.Addr()
+ if addr == "" {
+ t.Fatalf("Server.Addr() empty after Start")
+ }
+ base := "http://" + addr
+
+ // Wait for the listener to accept (Serve runs in a goroutine).
+ if !waitHealthy(t, base, 5*time.Second) {
+ t.Fatalf("server did not become healthy at %s", base)
+ }
+
+ tool, host, sessionID := "claude-code", "laptop", "session-001"
+
+ // --- alice ingest: 2 turns under (tool, host, sessionID) ---
+ alicePayload := makeNDJSON(tool, host, sessionID, []turnSpec{
+ {turnID: "t1", seq: 0, role: "user", content: "alice question"},
+ {turnID: "t2", seq: 1, role: "assistant", content: "alice answer"},
+ })
+ doIngest(t, base, "alice", alicePayload, 2)
+
+ // alice GET sessions: exactly her one session.
+ aliceList := doListSessions(t, base, "alice")
+ if len(aliceList) != 1 {
+ t.Fatalf("alice list: got %d sessions, want 1", len(aliceList))
+ }
+ if got := aliceList[0]["owner"]; got != "alice" {
+ t.Fatalf("alice list[0].owner = %v; want alice", got)
+ }
+ if got := aliceList[0]["session_id"]; got != sessionID {
+ t.Fatalf("alice list[0].session_id = %v; want %s", got, sessionID)
+ }
+
+ // alice GET detail: her two turns, content matches.
+ aliceDetail := doGetSession(t, base, "alice", tool, host, sessionID)
+ if got := aliceDetail["owner"]; got != "alice" {
+ t.Fatalf("alice detail.owner = %v; want alice", got)
+ }
+ turns, _ := aliceDetail["turns"].([]any)
+ if len(turns) != 2 {
+ t.Fatalf("alice detail.turns: got %d, want 2", len(turns))
+ }
+ if got := turns[0].(map[string]any)["content"]; got != "alice question" {
+ t.Fatalf("alice detail.turns[0].content = %v; want alice question", got)
+ }
+
+ // --- bob ingest: same composite key, different content ---
+ bobPayload := makeNDJSON(tool, host, sessionID, []turnSpec{
+ {turnID: "t1", seq: 0, role: "user", content: "bob question"},
+ {turnID: "t2", seq: 1, role: "assistant", content: "bob answer"},
+ })
+ doIngest(t, base, "bob", bobPayload, 2)
+
+ // alice still sees only her own row.
+ aliceList2 := doListSessions(t, base, "alice")
+ if len(aliceList2) != 1 || aliceList2[0]["owner"] != "alice" {
+ t.Fatalf("alice list after bob ingest: got %v, want exactly 1 alice session", aliceList2)
+ }
+
+ // bob sees only his own row.
+ bobList := doListSessions(t, base, "bob")
+ if len(bobList) != 1 {
+ t.Fatalf("bob list: got %d sessions, want 1", len(bobList))
+ }
+ if got := bobList[0]["owner"]; got != "bob" {
+ t.Fatalf("bob list[0].owner = %v; want bob", got)
+ }
+
+ // alice detail: her content unchanged by bob's write.
+ aliceDetail2 := doGetSession(t, base, "alice", tool, host, sessionID)
+ turns2, _ := aliceDetail2["turns"].([]any)
+ if len(turns2) != 2 {
+ t.Fatalf("alice detail (post-bob).turns: got %d, want 2", len(turns2))
+ }
+ if got := turns2[0].(map[string]any)["content"]; got != "alice question" {
+ t.Fatalf("alice detail (post-bob).turns[0].content = %v; want alice question (bob's write must not bleed through)", got)
+ }
+
+ // bob detail: his content.
+ bobDetail := doGetSession(t, base, "bob", tool, host, sessionID)
+ if got := bobDetail["owner"]; got != "bob" {
+ t.Fatalf("bob detail.owner = %v; want bob", got)
+ }
+ turnsB, _ := bobDetail["turns"].([]any)
+ if len(turnsB) != 2 {
+ t.Fatalf("bob detail.turns: got %d, want 2", len(turnsB))
+ }
+ if got := turnsB[0].(map[string]any)["content"]; got != "bob question" {
+ t.Fatalf("bob detail.turns[0].content = %v; want bob question", got)
+ }
+}
+
+// waitHealthy polls /healthz until it returns 200 or the timeout elapses.
+func waitHealthy(t *testing.T, base string, timeout time.Duration) bool {
+ t.Helper()
+ deadline := time.Now().Add(timeout)
+ for time.Now().Before(deadline) {
+ resp, err := http.Get(base + "/healthz")
+ if err == nil {
+ _ = resp.Body.Close()
+ if resp.StatusCode == http.StatusOK {
+ return true
+ }
+ }
+ time.Sleep(20 * time.Millisecond)
+ }
+ return false
+}
+
+// turnSpec is the per-turn knob for makeNDJSON.
+type turnSpec struct {
+ turnID string
+ seq int64
+ role string
+ content string
+}
+
+// makeNDJSON emits one TurnEvent per spec, all sharing (tool, host, sessionID)
+// and SessionMeta.SourceFile. The trust-model invariant means the wire format
+// has no "owner" field; the server stamps owner from auth.
+func makeNDJSON(tool, host, sessionID string, specs []turnSpec) []byte {
+ var buf bytes.Buffer
+ now := time.Now().Unix()
+ for _, s := range specs {
+ obj := map[string]any{
+ "tool": tool,
+ "host": host,
+ "session_id": sessionID,
+ "turn_id": s.turnID,
+ "seq": s.seq,
+ "role": s.role,
+ "timestamp": now + s.seq,
+ "content": s.content,
+ "session_meta": map[string]any{
+ "source_file": "/tmp/" + sessionID + ".jsonl",
+ },
+ }
+ raw, err := json.Marshal(obj)
+ if err != nil {
+ panic(err)
+ }
+ buf.Write(raw)
+ buf.WriteByte('\n')
+ }
+ return buf.Bytes()
+}
+
+// doIngest POSTs payload to /api/v1/ingest as user and asserts the result
+// reports wantAccepted accepted turns with no errors.
+func doIngest(t *testing.T, base, user string, payload []byte, wantAccepted int) {
+ t.Helper()
+ req, err := http.NewRequest(http.MethodPost, base+"/api/v1/ingest", bytes.NewReader(payload))
+ if err != nil {
+ t.Fatalf("ingest %s: build req: %v", user, err)
+ }
+ req.Header.Set("Content-Type", "application/x-ndjson")
+ req.Header.Set("Remote-User", user)
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ t.Fatalf("ingest %s: do: %v", user, err)
+ }
+ body, _ := io.ReadAll(resp.Body)
+ _ = resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ t.Fatalf("ingest %s: status = %d body=%s", user, resp.StatusCode, string(body))
+ }
+ var result struct {
+ Accepted int `json:"accepted"`
+ Errors []map[string]interface{} `json:"errors"`
+ }
+ if err := json.Unmarshal(body, &result); err != nil {
+ t.Fatalf("ingest %s: parse body %s: %v", user, string(body), err)
+ }
+ if result.Accepted != wantAccepted {
+ t.Fatalf("ingest %s: accepted = %d; want %d (errors=%v)", user, result.Accepted, wantAccepted, result.Errors)
+ }
+ if len(result.Errors) != 0 {
+ t.Fatalf("ingest %s: unexpected errors: %v", user, result.Errors)
+ }
+}
+
+// doListSessions GETs /api/v1/sessions as user and returns the parsed
+// "sessions" slice. Asserts the call returned 200.
+func doListSessions(t *testing.T, base, user string) []map[string]any {
+ t.Helper()
+ req, err := http.NewRequest(http.MethodGet, base+"/api/v1/sessions", nil)
+ if err != nil {
+ t.Fatalf("list %s: build req: %v", user, err)
+ }
+ req.Header.Set("Remote-User", user)
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ t.Fatalf("list %s: do: %v", user, err)
+ }
+ body, _ := io.ReadAll(resp.Body)
+ _ = resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ t.Fatalf("list %s: status = %d body=%s", user, resp.StatusCode, string(body))
+ }
+ var out struct {
+ Sessions []map[string]any `json:"sessions"`
+ }
+ if err := json.Unmarshal(body, &out); err != nil {
+ t.Fatalf("list %s: parse body %s: %v", user, string(body), err)
+ }
+ return out.Sessions
+}
+
+// doGetSession GETs /api/v1/sessions/{tool}/{host}/{session_id} as user and
+// returns the parsed session-with-turns object. Asserts the call returned 200.
+func doGetSession(t *testing.T, base, user, tool, host, sessionID string) map[string]any {
+ t.Helper()
+ url := fmt.Sprintf("%s/api/v1/sessions/%s/%s/%s", base, tool, host, sessionID)
+ req, err := http.NewRequest(http.MethodGet, url, nil)
+ if err != nil {
+ t.Fatalf("get %s: build req: %v", user, err)
+ }
+ req.Header.Set("Remote-User", user)
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ t.Fatalf("get %s: do: %v", user, err)
+ }
+ body, _ := io.ReadAll(resp.Body)
+ _ = resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ t.Fatalf("get %s: status = %d body=%s", user, resp.StatusCode, string(body))
+ }
+ var out map[string]any
+ if err := json.Unmarshal(body, &out); err != nil {
+ t.Fatalf("get %s: parse body %s: %v", user, string(body), err)
+ }
+ return out
+}