package ingest
import (
"fmt"
"go.bigb.es/auxilia/culpa"
"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)
// Batch is a POST-sized slice of events plus their indexes in the parsed file
// result. EventIndexes lets accepted counts map back to source offsets.
//
// As built by BuildBatches, Events and EventIndexes are parallel slices of
// equal length: EventIndexes[i] is the position of Events[i] within the
// events slice that was passed to BuildBatches, in ascending order.
type Batch struct {
	Events       []wire.TurnEvent
	EventIndexes []int
}
// BuildBatches splits events by both line and encoded byte caps.
//
// Events keep their input order and are grouped into consecutive batches.
// Each batch records, in Batch.EventIndexes, the index of every event it holds
// within events. The current batch is closed before adding an event that would
// push it past maxLines events or past maxBytes bytes. An event's size is the
// length of its standalone encoding as reported by encodedEventLen.
//
// NOTE(review): the byte cap assumes a batch's encoded size equals the sum of
// its events' standalone encodings (e.g. one NDJSON line per event). Confirm
// this against EncodeNDJSON.
//
// Errors:
//   - BATCH_INVALID_CAP when maxLines or maxBytes is not positive.
//   - BATCH_EVENT_TOO_LARGE when a single event alone exceeds maxBytes.
//   - Any error from sizing an event, returned as-is.
//
// On error no partial batches are returned. An empty events slice yields
// (nil, nil).
func BuildBatches(events []wire.TurnEvent, maxLines int, maxBytes int) ([]Batch, error) {
	if maxLines <= 0 {
		return nil, culpa.WithCode(fmt.Errorf("max lines must be positive: %d", maxLines), "BATCH_INVALID_CAP")
	}
	if maxBytes <= 0 {
		return nil, culpa.WithCode(fmt.Errorf("max bytes must be positive: %d", maxBytes), "BATCH_INVALID_CAP")
	}
	if len(events) == 0 {
		return nil, nil
	}

	// Capacity hint: ceil(len(events)/maxLines) is the minimum number of
	// batches. It is written as (len-1)/maxLines+1 rather than
	// (len+maxLines-1)/maxLines so that a huge maxLines (e.g. math.MaxInt
	// meaning "no line limit") cannot overflow into a negative capacity and
	// make make panic. len(events) >= 1 is guaranteed by the check above.
	batches := make([]Batch, 0, (len(events)-1)/maxLines+1)

	var current Batch
	var currentBytes int

	// flush appends the in-progress batch (if any) and starts a fresh one.
	// A fresh Batch{} is used rather than truncating, so emitted batches never
	// share backing arrays with later ones.
	flush := func() {
		if len(current.Events) == 0 {
			return
		}
		batches = append(batches, current)
		current = Batch{}
		currentBytes = 0
	}

	for i, event := range events {
		eventBytes, err := encodedEventLen(event)
		if err != nil {
			return nil, err
		}
		// An event that cannot fit even in an empty batch can never be sent.
		if eventBytes > maxBytes {
			return nil, culpa.WithCode(
				fmt.Errorf("event %d encodes to %d bytes, above max %d", i, eventBytes, maxBytes),
				"BATCH_EVENT_TOO_LARGE",
			)
		}

		// Since eventBytes <= maxBytes, the byte cap can only trip when the
		// current batch is non-empty, so flush never closes an empty batch.
		lineCapReached := len(current.Events) >= maxLines
		byteCapReached := currentBytes+eventBytes > maxBytes
		if lineCapReached || byteCapReached {
			flush()
		}

		current.Events = append(current.Events, event)
		current.EventIndexes = append(current.EventIndexes, i)
		currentBytes += eventBytes
	}
	flush()

	return batches, nil
}
// encodedEventLen reports the length in bytes of event when it is encoded
// on its own by EncodeNDJSON. BuildBatches uses this value to enforce its
// byte cap. Encoding failures are wrapped with sizing context.
func encodedEventLen(event wire.TurnEvent) (int, error) {
	single := []wire.TurnEvent{event}
	encoded, encErr := EncodeNDJSON(single)
	if encErr != nil {
		return 0, culpa.Wrap(encErr, "encode event for batch sizing")
	}
	size := len(encoded)
	return size, nil
}