// Package ingest handles HTTP posting of event batches to the lethe server
// and outbox replay for durability.
package ingest
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"go.bigb.es/auxilia/culpa"
"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)
// LineError is the server's per-line failure shape.
// Values of this type are decoded from the "errors" array of a Result.
type LineError struct {
// Line is read from the "line" JSON field.
// NOTE(review): presumably the position of the rejected NDJSON line within
// the posted batch; whether it is 0- or 1-based is not visible here — confirm
// against the server.
Line int `json:"line"`
// Err is read from the "error" JSON field.
// NOTE(review): presumably the server's failure text for that line — confirm.
Err string `json:"error"`
}
// Result is the server's response to a batch ingest POST.
// postRaw decodes it from the JSON body of a 2xx response.
type Result struct {
// Accepted is read from the "accepted" JSON field.
// NOTE(review): presumably the number of lines the server stored — confirm.
Accepted int `json:"accepted"`
// Errors is read from the "errors" JSON field, one LineError per entry.
// NOTE(review): presumably empty or absent when every line was accepted — confirm.
Errors []LineError `json:"errors"`
}
// Sender POSTs NDJSON batches to the lethe ingest endpoint.
type Sender struct {
	// serverURL is the base URL the ingest path is appended to. NewSender
	// stores it without trailing slashes.
	serverURL string
	// client performs the HTTP round trips. NewSender never leaves it nil.
	client *http.Client
}

// NewSender builds a Sender that posts to serverURL + /api/v1/ingest.
//
// Trailing slashes on serverURL are dropped, so "http://host" and
// "http://host/" both resolve to "http://host/api/v1/ingest" rather than
// producing a double slash in the request path.
//
// A nil client falls back to http.DefaultClient instead of causing a nil
// pointer dereference on the first request. http.DefaultClient has no
// timeout of its own, so callers relying on the fallback should bound each
// call through the context they pass in.
func NewSender(serverURL string, client *http.Client) *Sender {
	if client == nil {
		client = http.DefaultClient
	}
	// Hand-rolled trim keeps this block free of any import the file does not
	// already have.
	for len(serverURL) > 0 && serverURL[len(serverURL)-1] == '/' {
		serverURL = serverURL[:len(serverURL)-1]
	}
	return &Sender{serverURL: serverURL, client: client}
}
// PostBatch encodes events as NDJSON and POSTs them to the ingest endpoint.
//
// An empty or nil events slice is answered locally with a zero Result and a
// nil error; no HTTP request is made because there is nothing to ingest.
// Otherwise the returned Result is the one decoded by postRaw. The error is
// non-nil when encoding fails or when postRaw reports a failure.
func (s *Sender) PostBatch(ctx context.Context, events []wire.TurnEvent) (Result, error) {
	if len(events) == 0 {
		// EncodeNDJSON yields a nil body for an empty batch; skip the round
		// trip rather than POSTing an empty payload.
		return Result{}, nil
	}
	body, err := EncodeNDJSON(events)
	if err != nil {
		return Result{}, culpa.Wrap(err, "encode ndjson")
	}
	return s.postRaw(ctx, body)
}
// postRaw sends raw NDJSON bytes and decodes the server's response.
//
// The request is a POST to s.serverURL + "/api/v1/ingest" with Content-Type
// application/x-ndjson, bound to ctx. Outcomes:
//   - building the request or performing the round trip fails: the error is
//     returned wrapped;
//   - the status is outside 2xx: an error tagged with code "INGEST_HTTP" is
//     returned, carrying the status line and at most errBodyLimit bytes of
//     the response body;
//   - the status is 2xx: the body is decoded as JSON into a Result.
func (s *Sender) postRaw(ctx context.Context, body []byte) (Result, error) {
	const (
		// errBodyLimit caps how much of a failed response is copied into the
		// error text, so a misbehaving server cannot make us buffer an
		// arbitrarily large body.
		errBodyLimit = 4 << 10
		// drainLimit caps how much unread body is discarded before closing.
		drainLimit = 64 << 10
	)

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.serverURL+"/api/v1/ingest", bytes.NewReader(body))
	if err != nil {
		return Result{}, culpa.Wrap(err, "build request")
	}
	req.Header.Set("Content-Type", "application/x-ndjson")

	resp, err := s.client.Do(req)
	if err != nil {
		return Result{}, culpa.Wrap(err, "post batch")
	}
	defer func() {
		// Read the (bounded) remainder so the transport can reuse the
		// keep-alive connection. Drain and close errors are not actionable.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, drainLimit))
		_ = resp.Body.Close()
	}()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// Best effort: the status is the real failure, so a read error here
		// only shortens the diagnostic snippet.
		snippet, _ := io.ReadAll(io.LimitReader(resp.Body, errBodyLimit))
		return Result{}, culpa.WithCode(
			fmt.Errorf("ingest %s (body: %s)", resp.Status, string(snippet)),
			"INGEST_HTTP",
		)
	}

	var result Result
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return Result{}, culpa.Wrap(err, "decode ingest response")
	}
	return result, nil
}
// EncodeNDJSON renders events as newline-delimited JSON: one JSON object per
// event, each terminated by a newline (including the last one).
//
// An empty or nil slice yields a nil byte slice and a nil error. If any event
// fails to encode, the wrapped error is returned and no bytes are returned.
func EncodeNDJSON(events []wire.TurnEvent) ([]byte, error) {
	if len(events) == 0 {
		return nil, nil
	}

	out := new(bytes.Buffer)
	lineWriter := json.NewEncoder(out)
	for i := range events {
		// Encode appends the trailing newline after every value.
		encErr := lineWriter.Encode(events[i])
		if encErr != nil {
			return nil, culpa.Wrap(encErr, "encode event")
		}
	}
	return out.Bytes(), nil
}