package ingest
import (
"context"
"errors"
"fmt"
"log/slog"
"math"
"time"
"go.bigb.es/auxilia/async"
"go.bigb.es/auxilia/culpa"
"sourcecraft.dev/bigbes/lethe/internal/collector/config"
"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
"sourcecraft.dev/bigbes/lethe/internal/collector/state"
"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)
const (
	// replayOutboxLimit is the limit handed to ReplayOutbox on every
	// enforceAndReplay call (presumably the maximum number of queued outbox
	// items replayed per pass — confirm against ReplayOutbox).
	replayOutboxLimit = 100
)
// enforceAndReplay first trims the outbox to maxBytes via EnforceOutboxLimit
// and then replays queued items via ReplayOutbox (limited by
// replayOutboxLimit).
//
// A failure in either step is logged with the source tool and collected, and
// the second step still runs; everything collected is returned joined (nil when
// both steps succeed). If ctx has already been canceled when a step fails, the
// function returns right away with the context error joined to the step's
// error and anything collected earlier, without logging.
func enforceAndReplay(ctx context.Context, store *state.Store, sender *Sender, maxBytes int64, src config.SourceConfig) error {
	var collected []error

	if enforceErr := EnforceOutboxLimit(ctx, store, maxBytes); enforceErr != nil {
		if cause := ctx.Err(); cause != nil {
			return errors.Join(cause, culpa.Wrap(enforceErr, "enforce outbox limit"))
		}
		slog.Warn("enforce outbox limit failed", "tool", src.Tool, "error", enforceErr)
		collected = append(collected, culpa.Wrap(enforceErr, "enforce outbox limit"))
	}

	replayErr := ReplayOutbox(ctx, store, sender, replayOutboxLimit)
	if replayErr == nil {
		return errors.Join(collected...)
	}
	if cause := ctx.Err(); cause != nil {
		collected = append(collected, cause, culpa.Wrap(replayErr, "replay outbox"))
		return errors.Join(collected...)
	}
	slog.Warn("outbox replay failed", "tool", src.Tool, "error", replayErr)
	collected = append(collected, culpa.Wrap(replayErr, "replay outbox"))
	return errors.Join(collected...)
}
// RunOnce replays the safety-net outbox, scans one configured source, parses
// new complete records from persisted offsets, posts batches, and advances
// offsets only after server acceptance confirms the corresponding lines.
//
// It fails fast with RUNNER_TOOL_MISMATCH when p does not belong to src.
// Per-file failures are logged and joined into the returned error; the scan
// stops early only when ctx is canceled or file discovery fails.
func RunOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) error {
	return runSourceFiles(ctx, cfg, src, p, store, sender, func(path string) error {
		return runFile(ctx, cfg, src, p, store, sender, path)
	})
}

// RunBackfillOnce is like RunOnce but starts every matched file at offset 0
// instead of the persisted offset. Accepted progress is still saved, so an
// interrupted backfill can be resumed via RunOnce.
func RunBackfillOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) error {
	return runSourceFiles(ctx, cfg, src, p, store, sender, func(path string) error {
		return runFileFromOffset(ctx, cfg, src, p, store, sender, path, 0)
	})
}

// runSourceFiles is the single pass shared by RunOnce and RunBackfillOnce;
// only the per-file step (ingestFile) differs between them.
//
// It verifies that p belongs to src, trims and replays the outbox, discovers
// files with p.Discover(src.Path), and calls ingestFile for each discovered
// path. Outbox failures and per-file failures (the latter also logged) are
// collected and returned joined. A canceled ctx or a discovery failure ends
// the pass immediately, returning whatever has been collected so far.
func runSourceFiles(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, ingestFile func(path string) error) error {
	if p.Tool() != src.Tool {
		return culpa.WithCode(
			fmt.Errorf("parser tool %q does not match source tool %q", p.Tool(), src.Tool),
			"RUNNER_TOOL_MISMATCH",
		)
	}

	var errs []error
	if err := enforceAndReplay(ctx, store, sender, cfg.Outbox.MaxBytes, src); err != nil {
		// With a canceled context there is no point scanning files; the
		// returned error already carries the context error.
		if ctx.Err() != nil {
			return err
		}
		errs = append(errs, err)
	}

	files, err := p.Discover(src.Path)
	if err != nil {
		return errors.Join(append(errs, culpa.Wrap(err, "discover source files"))...)
	}
	for _, file := range files {
		if err := ctx.Err(); err != nil {
			return errors.Join(append(errs, err)...)
		}
		if err := ingestFile(file.Path); err != nil {
			slog.Warn("source file ingest failed", "tool", src.Tool, "source_file", file.Path, "error", err)
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}
// RunDaemon runs one polling loop (pollSource) per entry in cfg.Sources and
// returns once ctx is canceled and the loop group has finished.
//
// Every source's tool must have a parser in parsers; the first source without
// one aborts with RUNNER_MISSING_PARSER before the group is started. With no
// sources configured, RunDaemon just blocks until ctx is canceled and returns
// nil. The loops never report errors themselves (pollSource only logs), so a
// non-nil result comes only from starting or waiting on the group.
func RunDaemon(ctx context.Context, cfg config.Config, parsers map[string]parser.Parser, store *state.Store, sender *Sender) error {
	if len(cfg.Sources) == 0 {
		<-ctx.Done()
		return nil
	}

	loops := async.NewGroup[struct{}]()
	for _, entry := range cfg.Sources {
		srcCfg := entry
		srcParser, found := parsers[srcCfg.Tool]
		if !found {
			return culpa.WithCode(fmt.Errorf("missing parser for source tool %q", srcCfg.Tool), "RUNNER_MISSING_PARSER")
		}
		loops.AddExecutor(func(loopCtx context.Context) (struct{}, error) {
			pollSource(loopCtx, cfg, srcCfg, srcParser, store, sender)
			return struct{}{}, nil
		})
	}

	if startErr := loops.Start(ctx); startErr != nil {
		return culpa.Wrap(startErr, "start source polling loops")
	}
	// Waiting uses a background context: shutdown is driven by ctx reaching
	// the loops, and each loop finishes its in-flight run before returning.
	if waitErr := loops.All(context.Background()); waitErr != nil {
		return culpa.Wrap(waitErr, "wait for source polling loops")
	}
	return nil
}
// pollSource runs RunOnce for src once immediately and then on every
// src.PollInterval tick, returning when ctx is canceled. No new run starts
// after ctx is canceled.
//
// Each run gets a context detached from ctx's cancellation and bounded only by
// cfg.HTTP.Timeout, so a shutdown signal lets an in-flight run finish (and
// save its offsets) instead of aborting it part-way. Run errors are logged,
// never returned.
func pollSource(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) {
	runAndLog := func() {
		if ctx.Err() != nil {
			return
		}
		// Use a bounded context independent of daemon cancellation so an
		// in-flight RunOnce is not aborted by SIGTERM.
		runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), cfg.HTTP.Timeout)
		defer cancel()
		err := RunOnce(runCtx, cfg, src, p, store, sender)
		if err == nil {
			return
		}
		// runCtx is detached from ctx, so a non-nil runCtx.Err() can only mean
		// the run exceeded cfg.HTTP.Timeout. Such runs are logged as well
		// (flagged via timed_out) so a poll that never completes within its
		// budget stays visible to operators.
		slog.Warn("source poll failed",
			"tool", src.Tool,
			"path", src.Path,
			"timed_out", errors.Is(runCtx.Err(), context.DeadlineExceeded),
			"error", err,
		)
	}

	runAndLog()
	// NOTE(review): time.NewTicker panics on a non-positive interval; this
	// assumes config validation guarantees src.PollInterval > 0.
	ticker := time.NewTicker(src.PollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			runAndLog()
		}
	}
}
// runFile ingests path starting from the offset stored for (src.Tool, path),
// delegating all parsing and posting to runFileFromOffset.
func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, path string) error {
	start, lookupErr := store.GetOffset(ctx, src.Tool, path)
	if lookupErr != nil {
		return culpa.Wrap(lookupErr, "get source offset")
	}
	return runFileFromOffset(ctx, cfg, src, p, store, sender, path, start)
}
// runFileFromOffset parses path from startOffset, stamps every parsed event
// with cfg.Host, splits the events into batches bounded by src.BatchMaxLines
// and src.BatchMaxBytes, and posts the batches in order.
//
// If a post fails, the batch is queued in the outbox (see enqueueBatch) and the
// post error is returned, joined with the queueing error if that also failed.
// After each successful post the accepted progress is persisted by
// persistAcceptedOffset; as soon as a batch is not fully settled the function
// stops without error, leaving the rest for a later run. An empty parse result
// is a no-op.
func runFileFromOffset(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, path string, startOffset int64) error {
	parsed, endOffset, parseErr := p.Parse(path, startOffset)
	if parseErr != nil {
		return culpa.Wrap(parseErr, "parse source file")
	}
	if len(parsed) == 0 {
		return nil
	}
	stampHost(parsed, cfg.Host)

	// BuildBatches takes an int byte cap; refuse values that would truncate.
	if src.BatchMaxBytes > math.MaxInt {
		return culpa.WithCode(fmt.Errorf("batch max bytes %d exceeds int max", src.BatchMaxBytes), "RUNNER_BATCH_CAP")
	}
	byteCap := int(src.BatchMaxBytes)
	chunks, buildErr := BuildBatches(parsed, src.BatchMaxLines, byteCap)
	if buildErr != nil {
		return culpa.Wrap(buildErr, "build event batches")
	}

	for _, chunk := range chunks {
		res, postErr := sender.PostBatch(ctx, chunk.Events)
		if postErr != nil {
			if queueErr := enqueueBatch(ctx, store, src, cfg.Host, path, chunk.Events, cfg.Outbox.MaxBytes); queueErr != nil {
				return errors.Join(culpa.Wrap(postErr, "post batch"), queueErr)
			}
			return culpa.Wrap(postErr, "post batch")
		}
		settled, persistErr := persistAcceptedOffset(ctx, store, sender, cfg, src, path, parsed, endOffset, chunk, res)
		if persistErr != nil {
			return persistErr
		}
		// A partially settled batch means later events must wait for the next
		// run, which resumes from the offset just saved.
		if settled < len(chunk.Events) {
			break
		}
	}
	return nil
}
// stampHost sets the Host field of every event to host, modifying the slice
// elements in place.
func stampHost(events []wire.TurnEvent, host string) {
	for idx := 0; idx < len(events); idx++ {
		events[idx].Host = host
	}
}
// enqueueBatch encodes events with EncodeNDJSON and stores the payload in the
// outbox, tagged with the source tool, host, and source file. It then
// re-applies the outbox size limit (maxOutboxBytes) so the queue cannot grow
// past it.
func enqueueBatch(ctx context.Context, store *state.Store, src config.SourceConfig, host string, path string, events []wire.TurnEvent, maxOutboxBytes int64) error {
	payload, encodeErr := EncodeNDJSON(events)
	if encodeErr != nil {
		return culpa.Wrap(encodeErr, "encode failed batch")
	}

	item := state.OutboxItem{
		Tool:       src.Tool,
		Host:       host,
		SourceFile: path,
		Payload:    payload,
	}
	if enqueueErr := store.Enqueue(ctx, item); enqueueErr != nil {
		return culpa.Wrap(enqueueErr, "enqueue failed batch")
	}

	if limitErr := EnforceOutboxLimit(ctx, store, maxOutboxBytes); limitErr != nil {
		return culpa.Wrap(limitErr, "enforce outbox limit")
	}
	return nil
}
// persistAcceptedOffset records how far a posted batch was settled by the
// server and returns how many of batch's events, counted from its start, are
// now settled (accepted, or deliberately skipped after a server error).
//
// allEvents and newOffset describe the whole parse the batch was cut from.
// Positions inside the batch are mapped back into allEvents as
// batch.EventIndexes[0]+k, so the indexes of one batch are assumed to be
// contiguous.
//
// Outcomes:
//   - no server errors: the offset is saved just past the accepted prefix
//     (nothing is saved when zero events were accepted);
//   - server errors: result.Errors[0].Line (1-based within the batch) marks
//     the failed row. Rows accepted before it are saved. Rows between the
//     accepted prefix and the failed row are re-posted as a smaller batch and
//     handled recursively; a failed re-post is queued in the outbox and
//     returned as an error. Once that prefix is fully settled, the offset is
//     moved past the failed row, which is logged and skipped so it cannot
//     block the file forever.
//
// A return value below len(batch.Events) tells the caller to stop and let a
// later run resume from the saved offset. Inconsistent server results are
// reported as RUNNER_ACCEPTED_INVALID, RUNNER_ACCEPTED_MISMATCH or
// RUNNER_ERROR_LINE_INVALID.
func persistAcceptedOffset(ctx context.Context, store *state.Store, sender *Sender, cfg config.Config, src config.SourceConfig, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, result Result) (int, error) {
	if result.Accepted < 0 || result.Accepted > len(batch.Events) {
		return 0, culpa.WithCode(
			fmt.Errorf("server accepted %d events for batch length %d", result.Accepted, len(batch.Events)),
			"RUNNER_ACCEPTED_INVALID",
		)
	}
	if result.Accepted == len(batch.Events) && len(result.Errors) > 0 {
		return 0, culpa.WithCode(
			fmt.Errorf("server accepted all %d events but reported %d errors", len(batch.Events), len(result.Errors)),
			"RUNNER_ACCEPTED_MISMATCH",
		)
	}
	if len(result.Errors) == 0 {
		if result.Accepted == 0 {
			return 0, nil
		}
		nextGlobalIndex := batch.EventIndexes[0] + result.Accepted
		offset := eventOffset(allEvents, newOffset, nextGlobalIndex)
		if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil {
			return 0, culpa Wrap(err, "save accepted offset")
		}
		return result.Accepted, nil
	}
	// There are errors — use the first error's line to identify the failed row.
	// NOTE(review): assumes result.Errors is ordered by line; otherwise
	// Errors[0] may not be the earliest failed row.
	failedLine := result.Errors[0].Line
	if failedLine < 1 || failedLine > len(batch.Events) {
		return 0, culpa.WithCode(
			fmt.Errorf("server error line %d out of range [1,%d]", failedLine, len(batch.Events)),
			"RUNNER_ERROR_LINE_INVALID",
		)
	}
	failedLocalIdx := failedLine - 1
	if result.Accepted > failedLocalIdx {
		return 0, culpa.WithCode(
			fmt.Errorf("server accepted %d events but first error is at line %d", result.Accepted, failedLine),
			"RUNNER_ACCEPTED_MISMATCH",
		)
	}
	// Persist any already-accepted rows.
	if result.Accepted > 0 {
		nextGlobalIndex := batch.EventIndexes[0] + result.Accepted
		offset := eventOffset(allEvents, newOffset, nextGlobalIndex)
		if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil {
			return 0, culpa.Wrap(err, "save accepted offset")
		}
	}
	// If there are uncommitted valid rows between accepted and the failed row,
	// post that prefix safely with the same rules.
	if result.Accepted < failedLocalIdx {
		prefix := Batch{
			Events:       batch.Events[result.Accepted:failedLocalIdx],
			EventIndexes: batch.EventIndexes[result.Accepted:failedLocalIdx],
		}
		prefixResult, err := sender.PostBatch(ctx, prefix.Events)
		if err != nil {
			if enqueueErr := enqueueBatch(ctx, store, src, cfg.Host, path, prefix.Events, cfg.Outbox.MaxBytes); enqueueErr != nil {
				return 0, errors.Join(culpa.Wrap(err, "post prefix batch"), enqueueErr)
			}
			return 0, culpa.Wrap(err, "post prefix batch")
		}
		prefixAdvanced, err := persistAcceptedOffset(ctx, store, sender, cfg, src, path, allEvents, newOffset, prefix, prefixResult)
		if err != nil {
			return 0, err
		}
		if prefixAdvanced < len(prefix.Events) {
			// The prefix itself hit an error or could not advance fully.
			// Return total advanced so far (accepted rows + prefix advancement).
			return result.Accepted + prefixAdvanced, nil
		}
	}
	// Skip the failed row so it does not loop forever.
	nextGlobalIndex := batch.EventIndexes[0] + failedLocalIdx + 1
	offset := eventOffset(allEvents, newOffset, nextGlobalIndex)
	if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil {
		return 0, culpa.Wrap(err, "save offset past failed row")
	}
	// The saved offset now lies past the rejected row, so later RunOnce passes
	// will not retry it; leave a trace instead of dropping it silently.
	slog.Warn("skipped event rejected by server",
		"tool", src.Tool,
		"source_file", path,
		"batch_line", failedLine,
		"server_error", result.Errors[0],
	)
	return failedLocalIdx + 1, nil
}
// eventOffset returns the offset from which a later parse should resume so
// that event nextGlobalIndex is the first one delivered again. For an index
// inside allEvents it returns that event's Seq. Callers use Seq as a resume
// offset, so it presumably records where the event starts in the source file
// (confirm with the parser). Once every event is settled (index at or past the
// end), it returns newOffset, the end offset reported by the parse.
func eventOffset(allEvents []wire.TurnEvent, newOffset int64, nextGlobalIndex int) int64 {
	if nextGlobalIndex < len(allEvents) {
		return allEvents[nextGlobalIndex].Seq
	}
	return newOffset
}