package ingest

import (
	"bytes"
	"context"

	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
)

// ReplayOutbox sends the oldest outbox rows (at most limit of them) to the
// server, deleting each row only when the server reports full acceptance: no
// errors, and an accepted count equal to the number of events in that row's
// payload.
//
// Rows that are only partially accepted, or that come back with server-side
// errors, are left in place so a later call can retry them. The first
// transport or store error aborts the replay and is returned wrapped; rows
// already deleted stay deleted.
func ReplayOutbox(ctx context.Context, store *state.Store, sender *Sender, limit int) error {
	rows, err := store.Oldest(ctx, limit)
	if err != nil {
		return culpa.Wrap(err, "fetch outbox rows")
	}

	for _, row := range rows {
		result, err := sender.postRaw(ctx, row.Payload)
		if err != nil {
			return culpa.Wrap(err, "replay outbox row")
		}

		if len(result.Errors) != 0 || result.Accepted != countPayloadEvents(row.Payload) {
			// Partial accept or server-side errors: leave row for next retry.
			continue
		}

		if err := store.Delete(ctx, []int64{row.ID}); err != nil {
			return culpa.Wrap(err, "delete replayed outbox row")
		}
	}
	return nil
}

// countPayloadEvents returns the number of non-blank lines in payload.
//
// Counting lines rather than newline bytes keeps the result correct when the
// last event has no trailing newline and when the payload contains blank
// lines. Assumes one event per line (newline-delimited payload) — TODO confirm
// against the code that writes outbox rows.
func countPayloadEvents(payload []byte) int {
	count := 0
	for len(payload) > 0 {
		line := payload
		if i := bytes.IndexByte(payload, '\n'); i >= 0 {
			line, payload = payload[:i], payload[i+1:]
		} else {
			payload = nil
		}
		if len(bytes.TrimSpace(line)) > 0 {
			count++
		}
	}
	return count
}

// EnforceOutboxLimit drops the oldest rows until the dropped payload bytes
// cover the amount by which the outbox exceeds maxBytes.
//
// Rows are fetched and deleted in batches, so an excess larger than one batch
// is still trimmed within a single call. The function returns early when the
// outbox is already within the limit, when no rows remain, or when ctx is
// cancelled between batches.
//
// NOTE(review): the amount to drop is measured as the sum of len(row.Payload);
// this presumes Stats().OutboxBytes is computed the same way — confirm.
func EnforceOutboxLimit(ctx context.Context, store *state.Store, maxBytes int64) error {
	// Rows fetched per pass; keeps each delete reasonably small.
	const batchSize = 1000

	st, err := store.Stats(ctx)
	if err != nil {
		return culpa.Wrap(err, "get outbox stats")
	}
	if st.OutboxBytes <= maxBytes {
		return nil
	}
	excess := st.OutboxBytes - maxBytes

	var dropped int64
	for dropped < excess {
		// Stop promptly on cancellation instead of starting another batch.
		if err := ctx.Err(); err != nil {
			return culpa.Wrap(err, "trim outbox")
		}

		rows, err := store.Oldest(ctx, batchSize)
		if err != nil {
			return culpa.Wrap(err, "fetch oldest for trim")
		}
		if len(rows) == 0 {
			return nil
		}

		deleteIDs := make([]int64, 0, len(rows))
		for _, row := range rows {
			deleteIDs = append(deleteIDs, row.ID)
			dropped += int64(len(row.Payload))
			if dropped >= excess {
				break
			}
		}

		if err := store.Delete(ctx, deleteIDs); err != nil {
			return culpa.Wrap(err, "trim outbox")
		}

		// A short batch means the outbox has no further rows to trim.
		if len(rows) < batchSize {
			return nil
		}
	}
	return nil
}