package ingest

import (
	"fmt"

	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// Batch is a POST-sized slice of events plus their indexes in the parsed file
// result. EventIndexes lets accepted counts map back to source offsets.
//
// Events and EventIndexes are filled in lockstep by BuildBatches, so
// EventIndexes[k] is the position of Events[k] in the slice that was passed in.
type Batch struct {
	Events       []wire.TurnEvent
	EventIndexes []int
}

// BuildBatches splits events by both line and encoded byte caps.
//
// Events keep their input order. A new batch is started whenever adding the
// next event would put more than maxLines events in the current batch or push
// the running sum of per-event encoded sizes above maxBytes.
//
// It returns an error when maxLines or maxBytes is not positive (code
// "BATCH_INVALID_CAP"), when an event cannot be encoded, or when a single
// event on its own encodes to more than maxBytes (code
// "BATCH_EVENT_TOO_LARGE"). An empty input yields a nil slice and a nil error.
func BuildBatches(events []wire.TurnEvent, maxLines int, maxBytes int) ([]Batch, error) {
	if maxLines <= 0 {
		return nil, culpa.WithCode(fmt.Errorf("max lines must be positive: %d", maxLines), "BATCH_INVALID_CAP")
	}
	if maxBytes <= 0 {
		return nil, culpa.WithCode(fmt.Errorf("max bytes must be positive: %d", maxBytes), "BATCH_INVALID_CAP")
	}
	if len(events) == 0 {
		return nil, nil
	}

	// Capacity hint: ceil(len(events) / maxLines). It is written as
	// 1 + (len-1)/maxLines rather than (len+maxLines-1)/maxLines because the
	// latter overflows int when maxLines is close to math.MaxInt, producing a
	// negative capacity that makes make() panic. The byte cap can still
	// produce more batches than this; append grows the slice in that case.
	batches := make([]Batch, 0, 1+(len(events)-1)/maxLines)

	var current Batch
	var currentBytes int

	// flush moves the in-progress batch into the result and starts a fresh
	// one. Resetting current to the zero value drops its slices, so a flushed
	// batch never shares backing storage with the next one.
	flush := func() {
		if len(current.Events) == 0 {
			return
		}
		batches = append(batches, current)
		current = Batch{}
		currentBytes = 0
	}

	for i, event := range events {
		eventBytes, err := encodedEventLen(event)
		if err != nil {
			return nil, err
		}
		// An event that cannot fit even in an empty batch is a hard error;
		// it is never emitted as an oversized batch.
		if eventBytes > maxBytes {
			return nil, culpa.WithCode(
				fmt.Errorf("event %d encodes to %d bytes, above max %d", i, eventBytes, maxBytes),
				"BATCH_EVENT_TOO_LARGE",
			)
		}

		// Both caps are checked before appending, so the current batch is
		// closed first and this event becomes the first entry of the next.
		lineCapReached := len(current.Events) >= maxLines
		byteCapReached := currentBytes+eventBytes > maxBytes
		if lineCapReached || byteCapReached {
			flush()
		}

		current.Events = append(current.Events, event)
		current.EventIndexes = append(current.EventIndexes, i)
		currentBytes += eventBytes
	}

	// Emit whatever is left in the trailing, partially filled batch.
	flush()
	return batches, nil
}

// encodedEventLen returns the number of bytes EncodeNDJSON produces for a
// one-element slice holding event. An encoding failure is wrapped with context
// and returned with a zero length.
//
// NOTE(review): BuildBatches adds these per-event sizes together, which
// assumes a multi-event encoding is the concatenation of the single-event
// encodings — confirm against EncodeNDJSON.
func encodedEventLen(event wire.TurnEvent) (int, error) {
	data, err := EncodeNDJSON([]wire.TurnEvent{event})
	if err != nil {
		return 0, culpa.Wrap(err, "encode event for batch sizing")
	}
	return len(data), nil
}