~bigbes/lethe

ref: 72508a536dd54662af89cce0002f6f3dee1ed0f8 lethe/internal/collector/ingest/sender.go -rw-r--r-- 2.6 KiB
72508a53 — Eugene Blikh docs: refresh search and opencode plan 24 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// Package ingest handles HTTP posting of event batches to the lethe server
// and outbox replay for durability.
package ingest

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"

	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// LineError is the server's per-line failure shape.
type LineError struct {
	Line int    `json:"line"`
	Err  string `json:"error"`
}

// Result is the server's response to a batch ingest POST.
type Result struct {
	Accepted int         `json:"accepted"`
	Errors   []LineError `json:"errors"`
}

// Sender POSTs NDJSON batches to the lethe ingest endpoint.
type Sender struct {
	serverURL string
	client    *http.Client
}

// NewSender builds a Sender that posts to serverURL + /api/v1/ingest.
func NewSender(serverURL string, client *http.Client) *Sender {
	return &Sender{serverURL: strings.TrimRight(serverURL, "/"), client: client}
}

// PostBatch encodes events as NDJSON and POSTs them to the ingest endpoint.
func (s *Sender) PostBatch(ctx context.Context, events []wire.TurnEvent) (Result, error) {
	body, err := EncodeNDJSON(events)
	if err != nil {
		return Result{}, culpa.Wrap(err, "encode ndjson")
	}
	return s.postRaw(ctx, body)
}

// postRaw sends raw NDJSON bytes and decodes the server's response.
func (s *Sender) postRaw(ctx context.Context, body []byte) (Result, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.serverURL+"/api/v1/ingest", bytes.NewReader(body))
	if err != nil {
		return Result{}, culpa.Wrap(err, "build request")
	}
	req.Header.Set("Content-Type", "application/x-ndjson")

	resp, err := s.client.Do(req)
	if err != nil {
		return Result{}, culpa.Wrap(err, "post batch")
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		body, _ := io.ReadAll(resp.Body)
		return Result{}, culpa.WithCode(
			fmt.Errorf("ingest %s (body: %s)", resp.Status, string(body)),
			"INGEST_HTTP",
		)
	}

	var result Result
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return Result{}, culpa.Wrap(err, "decode ingest response")
	}
	return result, nil
}

// EncodeNDJSON serialises a slice of TurnEvent into newline-delimited JSON.
// Each event is one JSON object per line; the final line ends with a newline.
func EncodeNDJSON(events []wire.TurnEvent) ([]byte, error) {
	if len(events) == 0 {
		return nil, nil
	}
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, ev := range events {
		if err := enc.Encode(ev); err != nil {
			return nil, culpa.Wrap(err, "encode event")
		}
	}
	return buf.Bytes(), nil
}