// Package ingest handles HTTP posting of event batches to the lethe server // and outbox replay for durability. package ingest import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "go.bigb.es/auxilia/culpa" "sourcecraft.dev/bigbes/lethe/internal/shared/wire" ) // Result is the server's response to a batch ingest POST. type Result struct { Accepted int `json:"accepted"` Errors []string `json:"errors"` } // Sender POSTs NDJSON batches to the lethe ingest endpoint. type Sender struct { serverURL string client *http.Client } // NewSender builds a Sender that posts to serverURL + /api/v1/ingest. func NewSender(serverURL string, client *http.Client) *Sender { return &Sender{serverURL: serverURL, client: client} } // PostBatch encodes events as NDJSON and POSTs them to the ingest endpoint. func (s *Sender) PostBatch(ctx context.Context, events []wire.TurnEvent) (Result, error) { body, err := EncodeNDJSON(events) if err != nil { return Result{}, culpa.Wrap(err, "encode ndjson") } return s.postRaw(ctx, body) } // postRaw sends raw NDJSON bytes and decodes the server's response. func (s *Sender) postRaw(ctx context.Context, body []byte) (Result, error) { req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.serverURL+"/api/v1/ingest", bytes.NewReader(body)) if err != nil { return Result{}, culpa.Wrap(err, "build request") } req.Header.Set("Content-Type", "application/x-ndjson") resp, err := s.client.Do(req) if err != nil { return Result{}, culpa.Wrap(err, "post batch") } defer func() { _ = resp.Body.Close() }() if resp.StatusCode < 200 || resp.StatusCode >= 300 { body, _ := io.ReadAll(resp.Body) return Result{}, culpa.WithCode( fmt.Errorf("ingest %s: %s (body: %s)", resp.Status, string(body), resp.Status), "INGEST_HTTP", ) } var result Result if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return Result{}, culpa.Wrap(err, "decode ingest response") } return result, nil } // EncodeNDJSON serialises a slice of TurnEvent into newline-delimited JSON. // Each event is one JSON object per line; the final line ends with a newline. func EncodeNDJSON(events []wire.TurnEvent) ([]byte, error) { if len(events) == 0 { return nil, nil } var buf bytes.Buffer enc := json.NewEncoder(&buf) for _, ev := range events { if err := enc.Encode(ev); err != nil { return nil, culpa.Wrap(err, "encode event") } } return buf.Bytes(), nil }