package ingest

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"math"
	"time"

	"go.bigb.es/auxilia/async"
	"go.bigb.es/auxilia/culpa"

	"sourcecraft.dev/bigbes/lethe/internal/collector/config"
	"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// replayOutboxLimit is the limit argument handed to ReplayOutbox on every
// replay pass. NOTE(review): presumably the maximum number of queued rows
// replayed per pass — confirm against ReplayOutbox.
const replayOutboxLimit = 100

// enforceAndReplay enforces the outbox size limit then replays queued rows.
//
// Both steps are always attempted unless the context is already done: a
// failure in one step is logged, wrapped, and collected so the other step can
// still run. When a step fails and ctx reports an error, the function returns
// immediately with the context error joined to the failures seen so far.
// The result is nil when both steps succeed.
func enforceAndReplay(ctx context.Context, store *state.Store, sender *Sender, maxBytes int64, src config.SourceConfig) error {
	var errs []error

	if err := EnforceOutboxLimit(ctx, store, maxBytes); err != nil {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return errors.Join(ctxErr, culpa.Wrap(err, "enforce outbox limit"))
		}
		slog.Warn("enforce outbox limit failed", "tool", src.Tool, "error", err)
		errs = append(errs, culpa.Wrap(err, "enforce outbox limit"))
	}

	if err := ReplayOutbox(ctx, store, sender, replayOutboxLimit); err != nil {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return errors.Join(append(errs, ctxErr, culpa.Wrap(err, "replay outbox"))...)
		}
		slog.Warn("outbox replay failed", "tool", src.Tool, "error", err)
		errs = append(errs, culpa.Wrap(err, "replay outbox"))
	}

	return errors.Join(errs...)
}

// RunOnce replays the safety-net outbox, scans one configured source, parses
// new complete records from persisted offsets, posts batches, and advances
// offsets only after server acceptance confirms the corresponding lines.
//
// It returns a RUNNER_TOOL_MISMATCH-coded error when the parser's tool differs
// from the source's tool. Per-file failures are logged and joined into the
// returned error without stopping the remaining files.
func RunOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) error {
	return runSourceFiles(ctx, cfg, src, p, store, sender, func(path string) error {
		return runFile(ctx, cfg, src, p, store, sender, path)
	})
}

// RunBackfillOnce is like RunOnce but starts every matched file at offset 0
// instead of the persisted offset. Accepted progress is still saved, so an
// interrupted backfill can be resumed via RunOnce.
func RunBackfillOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) error {
	return runSourceFiles(ctx, cfg, src, p, store, sender, func(path string) error {
		return runFileFromOffset(ctx, cfg, src, p, store, sender, path, 0)
	})
}

// runSourceFiles is the flow shared by RunOnce and RunBackfillOnce: verify
// that the parser matches the source, enforce/replay the outbox, discover the
// source's files, and call ingestFile for each discovered path.
//
// ingestFile is the only step that differs between the two entry points.
// Errors it returns are logged and accumulated; iteration stops early only
// when ctx is done, in which case the context error is joined to whatever was
// collected so far. A discovery failure also ends the run.
func runSourceFiles(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, ingestFile func(path string) error) error {
	var errs []error

	if p.Tool() != src.Tool {
		return culpa.WithCode(
			fmt.Errorf("parser tool %q does not match source tool %q", p.Tool(), src.Tool),
			"RUNNER_TOOL_MISMATCH",
		)
	}

	if err := enforceAndReplay(ctx, store, sender, cfg.Outbox.MaxBytes, src); err != nil {
		// An outbox failure alone does not block ingestion; a done context does.
		if ctx.Err() != nil {
			return errors.Join(append(errs, err)...)
		}
		errs = append(errs, err)
	}

	files, err := p.Discover(src.Path)
	if err != nil {
		return errors.Join(append(errs, culpa.Wrap(err, "discover source files"))...)
	}

	for _, file := range files {
		if err := ctx.Err(); err != nil {
			return errors.Join(append(errs, err)...)
		}
		if err := ingestFile(file.Path); err != nil {
			slog.Warn("source file ingest failed", "tool", src.Tool, "source_file", file.Path, "error", err)
			errs = append(errs, err)
		}
	}

	return errors.Join(errs...)
}

// RunDaemon starts one independent polling loop per configured source and
// exits after the supplied context is canceled and all loops have stopped.
func RunDaemon(ctx context.Context, cfg config.Config, parsers map[string]parser.Parser, store *state.Store, sender *Sender) error { if len(cfg.Sources) == 0 { <-ctx.Done() return nil } group := async.NewGroup[struct{}]() for _, source := range cfg.Sources { src := source p, ok := parsers[src.Tool] if !ok { return culpa.WithCode(fmt.Errorf("missing parser for source tool %q", src.Tool), "RUNNER_MISSING_PARSER") } group.AddExecutor(func(loopCtx context.Context) (struct{}, error) { pollSource(loopCtx, cfg, src, p, store, sender) return struct{}{}, nil }) } if err := group.Start(ctx); err != nil { return culpa.Wrap(err, "start source polling loops") } if err := group.All(context.Background()); err != nil { return culpa.Wrap(err, "wait for source polling loops") } return nil } func pollSource(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) { runAndLog := func() { if ctx.Err() != nil { return } // Use a bounded context independent of daemon cancellation so an // in-flight RunOnce is not aborted by SIGTERM. 
runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), cfg.HTTP.Timeout) defer cancel() if err := RunOnce(runCtx, cfg, src, p, store, sender); err != nil && runCtx.Err() == nil { slog.Warn("source poll failed", "tool", src.Tool, "path", src.Path, "error", err) } } runAndLog() ticker := time.NewTicker(src.PollInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: runAndLog() } } } func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, path string) error { offset, err := store.GetOffset(ctx, src.Tool, path) if err != nil { return culpa.Wrap(err, "get source offset") } return runFileFromOffset(ctx, cfg, src, p, store, sender, path, offset) } func runFileFromOffset(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, path string, startOffset int64) error { events, newOffset, err := p.Parse(path, startOffset) if err != nil { return culpa.Wrap(err, "parse source file") } if len(events) == 0 { if newOffset > startOffset { if err := store.SaveOffset(ctx, src.Tool, path, newOffset); err != nil { return culpa.Wrap(err, "save skipped-only offset") } } return nil } stampHost(events, cfg.Host) if src.BatchMaxBytes > math.MaxInt { return culpa.WithCode(fmt.Errorf("batch max bytes %d exceeds int max", src.BatchMaxBytes), "RUNNER_BATCH_CAP") } batches, err := BuildBatches(events, src.BatchMaxLines, int(src.BatchMaxBytes)) if err != nil { return culpa.Wrap(err, "build event batches") } for _, batch := range batches { result, err := sender.PostBatch(ctx, batch.Events) if err != nil { if enqueueErr := enqueueBatch(ctx, store, src, cfg.Host, path, batch.Events, cfg.Outbox.MaxBytes); enqueueErr != nil { return errors.Join(culpa.Wrap(err, "post batch"), enqueueErr) } return culpa.Wrap(err, "post batch") } advanced, err := persistAcceptedOffset(ctx, store, sender, cfg, src, path, events, newOffset, batch, 
result) if err != nil { return err } if advanced < len(batch.Events) { return nil } } return nil } func stampHost(events []wire.TurnEvent, host string) { for i := range events { events[i].Host = host } } func enqueueBatch(ctx context.Context, store *state.Store, src config.SourceConfig, host string, path string, events []wire.TurnEvent, maxOutboxBytes int64) error { payload, err := EncodeNDJSON(events) if err != nil { return culpa.Wrap(err, "encode failed batch") } if err := store.Enqueue(ctx, state.OutboxItem{Tool: src.Tool, Host: host, SourceFile: path, Payload: payload}); err != nil { return culpa.Wrap(err, "enqueue failed batch") } if err := EnforceOutboxLimit(ctx, store, maxOutboxBytes); err != nil { return culpa.Wrap(err, "enforce outbox limit") } return nil } func persistAcceptedOffset(ctx context.Context, store *state.Store, sender *Sender, cfg config.Config, src config.SourceConfig, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, result Result) (int, error) { if result.Accepted < 0 || result.Accepted > len(batch.Events) { return 0, culpa.WithCode( fmt.Errorf("server accepted %d events for batch length %d", result.Accepted, len(batch.Events)), "RUNNER_ACCEPTED_INVALID", ) } if result.Accepted == len(batch.Events) && len(result.Errors) > 0 { return 0, culpa.WithCode( fmt.Errorf("server accepted all %d events but reported %d errors", len(batch.Events), len(result.Errors)), "RUNNER_ACCEPTED_MISMATCH", ) } if len(result.Errors) == 0 { if result.Accepted == 0 { return 0, nil } nextGlobalIndex := batch.EventIndexes[0] + result.Accepted offset := eventOffset(allEvents, newOffset, nextGlobalIndex) if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil { return 0, culpa.Wrap(err, "save accepted offset") } return result.Accepted, nil } // There are errors — use the first error's line to identify the failed row. 
failedLine := result.Errors[0].Line if failedLine < 1 || failedLine > len(batch.Events) { return 0, culpa.WithCode( fmt.Errorf("server error line %d out of range [1,%d]", failedLine, len(batch.Events)), "RUNNER_ERROR_LINE_INVALID", ) } failedLocalIdx := failedLine - 1 if result.Accepted > failedLocalIdx { return 0, culpa.WithCode( fmt.Errorf("server accepted %d events but first error is at line %d", result.Accepted, failedLine), "RUNNER_ACCEPTED_MISMATCH", ) } // Persist any already-accepted rows. if result.Accepted > 0 { nextGlobalIndex := batch.EventIndexes[0] + result.Accepted offset := eventOffset(allEvents, newOffset, nextGlobalIndex) if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil { return 0, culpa.Wrap(err, "save accepted offset") } } // If there are uncommitted valid rows between accepted and the failed row, // post that prefix safely with the same rules. if result.Accepted < failedLocalIdx { prefix := Batch{ Events: batch.Events[result.Accepted:failedLocalIdx], EventIndexes: batch.EventIndexes[result.Accepted:failedLocalIdx], } prefixResult, err := sender.PostBatch(ctx, prefix.Events) if err != nil { if enqueueErr := enqueueBatch(ctx, store, src, cfg.Host, path, prefix.Events, cfg.Outbox.MaxBytes); enqueueErr != nil { return 0, errors.Join(culpa.Wrap(err, "post prefix batch"), enqueueErr) } return 0, culpa.Wrap(err, "post prefix batch") } prefixAdvanced, err := persistAcceptedOffset(ctx, store, sender, cfg, src, path, allEvents, newOffset, prefix, prefixResult) if err != nil { return 0, err } if prefixAdvanced < len(prefix.Events) { // The prefix itself hit an error or could not advance fully. // Return total advanced so far (accepted rows + prefix advancement). return result.Accepted + prefixAdvanced, nil } } // Skip the failed row so it does not loop forever. 
nextGlobalIndex := batch.EventIndexes[0] + failedLocalIdx + 1 offset := eventOffset(allEvents, newOffset, nextGlobalIndex) if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil { return 0, culpa.Wrap(err, "save offset past failed row") } return failedLocalIdx + 1, nil } func eventOffset(allEvents []wire.TurnEvent, newOffset int64, nextGlobalIndex int) int64 { if nextGlobalIndex >= len(allEvents) { return newOffset } return allEvents[nextGlobalIndex].Seq }