package ingest
import (
"bytes"
"context"
"go.bigb.es/auxilia/culpa"
"sourcecraft.dev/bigbes/lethe/internal/collector/state"
)
// ReplayOutbox re-posts the rows returned by store.Oldest(ctx, limit) through
// sender, one row at a time, in the order the store returns them.
//
// A row is removed from the store only when the server response for it lists
// no errors and its accepted count matches the number of events in the row's
// payload. Rows that are partially accepted, or that come back with
// server-side errors, stay in the outbox so a later call can retry them.
//
// The first failure to fetch, post, or delete aborts the replay and is
// returned wrapped; rows after the failing one are not attempted.
func ReplayOutbox(ctx context.Context, store *state.Store, sender *Sender, limit int) error {
	pending, fetchErr := store.Oldest(ctx, limit)
	if fetchErr != nil {
		return culpa.Wrap(fetchErr, "fetch outbox rows")
	}

	newline := []byte("\n")
	for _, entry := range pending {
		resp, sendErr := sender.postRaw(ctx, entry.Payload)
		if sendErr != nil {
			return culpa.Wrap(sendErr, "replay outbox row")
		}

		// The event count is the number of newline bytes in the payload.
		// NOTE(review): this presumes every event, including the last one, is
		// newline-terminated — confirm against the code that writes payloads.
		expected := bytes.Count(entry.Payload, newline)

		if len(resp.Errors) != 0 || resp.Accepted != expected {
			// Not fully accepted: keep the row for the next retry.
			continue
		}

		delErr := store.Delete(ctx, []int64{entry.ID})
		if delErr != nil {
			return culpa.Wrap(delErr, "delete replayed outbox row")
		}
	}

	return nil
}
// EnforceOutboxLimit drops the oldest outbox rows until the total payload
// bytes reported by store.Stats are within maxBytes.
//
// The amount to remove is computed once, as OutboxBytes - maxBytes. Rows are
// then fetched oldest-first in batches of up to 1000 and deleted until the
// summed payload length of the deleted rows covers that excess, or until the
// store has no more rows. Because whole rows are deleted, slightly more than
// the excess may be removed.
//
// Trimming continues across batches: a single batch may not hold enough bytes
// to cover the excess, and stopping after one batch would leave the outbox
// over its limit while still reporting success.
//
// It returns nil when the outbox is already within the limit. Any error from
// the store (stats, fetch, or delete) is returned wrapped, and rows deleted
// before the error stay deleted.
//
// NOTE(review): this assumes OutboxBytes is measured in the same units as
// len(row.Payload) — confirm against state.Store.Stats.
func EnforceOutboxLimit(ctx context.Context, store *state.Store, maxBytes int64) error {
	// batchSize caps how many rows are loaded into memory per trim pass.
	const batchSize = 1000

	st, err := store.Stats(ctx)
	if err != nil {
		return culpa.Wrap(err, "get outbox stats")
	}
	if st.OutboxBytes <= maxBytes {
		return nil
	}
	excess := st.OutboxBytes - maxBytes

	var dropped int64
	for dropped < excess {
		rows, err := store.Oldest(ctx, batchSize)
		if err != nil {
			return culpa.Wrap(err, "fetch oldest for trim")
		}
		if len(rows) == 0 {
			// Nothing left to delete; avoid spinning on an empty outbox.
			return nil
		}

		deleteIDs := make([]int64, 0, len(rows))
		for _, row := range rows {
			deleteIDs = append(deleteIDs, row.ID)
			dropped += int64(len(row.Payload))
			if dropped >= excess {
				break
			}
		}

		if err := store.Delete(ctx, deleteIDs); err != nil {
			return culpa.Wrap(err, "trim outbox")
		}

		if len(rows) < batchSize {
			// A short batch means the outbox held no further rows.
			return nil
		}
	}
	return nil
}