~bigbes/lethe

ref: ccee575b9f5055ffbd0f82cdd70bb82934240b54 lethe/internal/collector/ingest/outbox.go -rw-r--r-- 2.2 KiB
ccee575b — Eugene Blikh feat: add GFM markdown support including tables via remark-gfm 24 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package ingest

import (
	"bytes"
	"context"
	"fmt"
	"log/slog"

	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
)

// ReplayOutbox sends up to limit of the oldest outbox rows to the server, one
// request per row, in the order returned by store.Oldest.
//
// A row is deleted only when the server reports full acceptance: the result
// carries no errors and its accepted count equals the number of events in the
// row's payload. Events are counted as non-blank newline-separated lines, so
// the count is the same whether or not the last event is newline-terminated.
// Rows that are partially accepted, or that come back with server-side
// errors, are left in place for the next replay.
//
// Processing stops at the first failure. The returned error wraps the cause
// when rows cannot be fetched, a row cannot be sent, or an accepted row cannot
// be deleted.
func ReplayOutbox(ctx context.Context, store *state.Store, sender *Sender, limit int) error {
	rows, err := store.Oldest(ctx, limit)
	if err != nil {
		return culpa.Wrap(err, "fetch outbox rows")
	}

	for _, row := range rows {
		result, err := sender.postRaw(ctx, row.Payload)
		if err != nil {
			return culpa.Wrap(err, "replay outbox row")
		}

		// Count non-blank lines rather than raw '\n' bytes: a payload whose
		// final event lacks a trailing newline (or that contains blank lines)
		// would otherwise never match the accepted count and be replayed
		// forever.
		eventCount := 0
		for rest := row.Payload; len(rest) > 0; {
			var line []byte
			line, rest, _ = bytes.Cut(rest, []byte("\n"))
			if len(bytes.TrimSpace(line)) > 0 {
				eventCount++
			}
		}

		if len(result.Errors) == 0 && result.Accepted == eventCount {
			if err := store.Delete(ctx, []int64{row.ID}); err != nil {
				return culpa.Wrap(err, "delete replayed outbox row")
			}
		}
		// Partial accept or server-side errors: leave row for next retry.
	}

	return nil
}

// EnforceOutboxLimit drops the oldest outbox rows until the total payload
// bytes reported by store.Stats are within maxBytes.
//
// Each pass re-reads the stats, fetches a batch of the oldest rows, and deletes
// the shortest prefix of that batch whose combined payload size covers the
// excess (or the whole batch if that is still not enough), then loops to
// re-check. A warning is logged for every batch that was actually deleted.
//
// It returns nil once the outbox is within the limit. It returns a wrapped
// error if stats, fetch, or delete fails, and an error coded
// OUTBOX_TRIM_INCONSISTENT if the stats still exceed the limit while no rows
// remain to delete.
func EnforceOutboxLimit(ctx context.Context, store *state.Store, maxBytes int64) error {
	// Upper bound on the number of rows fetched (and therefore deleted) per pass.
	const trimBatchSize = 1000

	for {
		st, err := store.Stats(ctx)
		if err != nil {
			return culpa.Wrap(err, "get outbox stats")
		}
		if st.OutboxBytes <= maxBytes {
			return nil
		}

		rows, err := store.Oldest(ctx, trimBatchSize)
		if err != nil {
			return culpa.Wrap(err, "fetch oldest for trim")
		}
		if len(rows) == 0 {
			// Stats claim we are over the limit but there is nothing to
			// delete; bail out instead of spinning forever.
			return culpa.WithCode(
				fmt.Errorf("outbox still exceeds limit (%d bytes > %d) with no rows remaining", st.OutboxBytes, maxBytes),
				"OUTBOX_TRIM_INCONSISTENT",
			)
		}

		// Take rows oldest-first until their payloads cover the excess.
		excess := st.OutboxBytes - maxBytes
		deleteIDs := make([]int64, 0, len(rows))
		var dropped int64
		for _, row := range rows {
			deleteIDs = append(deleteIDs, row.ID)
			dropped += int64(len(row.Payload))
			if dropped >= excess {
				break
			}
		}

		// rows is non-empty here, so deleteIDs always holds at least one ID
		// and every pass makes progress.
		if err := store.Delete(ctx, deleteIDs); err != nil {
			return culpa.Wrap(err, "trim outbox")
		}
		// Log only after the delete succeeded so the warning never reports
		// rows that are in fact still in the outbox.
		slog.Warn("outbox overflow: dropped oldest rows", "dropped_rows", len(deleteIDs), "dropped_bytes", dropped)
	}
}