~bigbes/lethe

ref: 27c47142dda459f4dd552a8ceba4934eca81ff6b lethe/internal/collector/ingest/batch.go -rw-r--r-- 1.9 KiB
27c47142 — Eugene Blikh collector: add polling source runner 24 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package ingest

import (
	"fmt"

	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// Batch is a POST-sized slice of events plus their indexes in the parsed file
// result, as produced by BuildBatches.
//
// Events holds this batch's events in their original input order.
// EventIndexes is parallel to Events: EventIndexes[k] is the position of
// Events[k] in the slice that was passed to BuildBatches. This lets accepted
// counts map back to source offsets.
type Batch struct {
	Events       []wire.TurnEvent
	EventIndexes []int
}

// BuildBatches splits events into consecutive batches that respect both a
// per-batch event-count cap (maxLines) and a per-batch encoded-size cap
// (maxBytes, where each event's size is measured with encodedEventLen).
//
// Events keep their input order across and within batches, and every Batch
// records the input position of each event it holds in EventIndexes. The
// current batch is closed as soon as adding the next event would exceed
// either cap.
//
// It returns (nil, nil) for an empty events slice. Errors:
//   - coded BATCH_INVALID_CAP when maxLines or maxBytes is not positive;
//   - coded BATCH_EVENT_TOO_LARGE when a single event on its own encodes to
//     more than maxBytes (it could never fit in any batch);
//   - any error returned while encoding an event for sizing.
func BuildBatches(events []wire.TurnEvent, maxLines int, maxBytes int) ([]Batch, error) {
	if maxLines <= 0 {
		return nil, culpa.WithCode(fmt.Errorf("max lines must be positive: %d", maxLines), "BATCH_INVALID_CAP")
	}
	if maxBytes <= 0 {
		return nil, culpa.WithCode(fmt.Errorf("max bytes must be positive: %d", maxBytes), "BATCH_INVALID_CAP")
	}
	if len(events) == 0 {
		return nil, nil
	}

	// The line cap alone gives a lower bound on the batch count; the byte cap
	// can only add more. Written as 1+(n-1)/maxLines instead of
	// (n+maxLines-1)/maxLines so that a huge maxLines (e.g. math.MaxInt used
	// as "unlimited") cannot overflow into a negative capacity and make
	// make() panic. len(events) > 0 is guaranteed above.
	batches := make([]Batch, 0, 1+(len(events)-1)/maxLines)
	var current Batch
	var currentBytes int

	// flush appends the in-progress batch (if non-empty) and starts a fresh
	// one with new backing slices, so emitted batches never share storage.
	flush := func() {
		if len(current.Events) == 0 {
			return
		}
		batches = append(batches, current)
		current = Batch{}
		currentBytes = 0
	}

	for i, event := range events {
		eventBytes, err := encodedEventLen(event)
		if err != nil {
			return nil, err
		}
		if eventBytes > maxBytes {
			return nil, culpa.WithCode(
				fmt.Errorf("event %d encodes to %d bytes, above max %d", i, eventBytes, maxBytes),
				"BATCH_EVENT_TOO_LARGE",
			)
		}

		// Invariant: 0 <= currentBytes <= maxBytes, so maxBytes-currentBytes
		// cannot underflow, and unlike currentBytes+eventBytes it cannot
		// overflow either.
		lineCapReached := len(current.Events) >= maxLines
		byteCapReached := eventBytes > maxBytes-currentBytes
		if lineCapReached || byteCapReached {
			flush()
		}

		current.Events = append(current.Events, event)
		current.EventIndexes = append(current.EventIndexes, i)
		currentBytes += eventBytes
	}
	flush()

	return batches, nil
}

// encodedEventLen reports how many bytes event occupies when serialized on its
// own with EncodeNDJSON (as a one-element slice). Encoding failures are wrapped
// with context about batch sizing.
func encodedEventLen(event wire.TurnEvent) (int, error) {
	encoded, encErr := EncodeNDJSON([]wire.TurnEvent{event})
	if encErr == nil {
		return len(encoded), nil
	}
	return 0, culpa.Wrap(encErr, "encode event for batch sizing")
}