package ingest

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"math"
	"time"

	"go.bigb.es/auxilia/async"
	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/collector/config"
	"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// replayOutboxLimit is the limit handed to ReplayOutbox on every RunOnce call
// (presumably the maximum number of outbox items replayed per pass).
const replayOutboxLimit = 100

// RunOnce performs a single ingest pass for one configured source.
//
// The pass first checks that the parser handles the source's tool, then
// replays the outbox, discovers the source's files, and ingests each file in
// turn via runFile.
//
// Error handling:
//   - a parser/source tool mismatch aborts immediately with code
//     RUNNER_TOOL_MISMATCH;
//   - an outbox replay failure aborts only when ctx is already done;
//     otherwise it is logged, remembered, and the pass continues;
//   - a discovery failure aborts the pass;
//   - a per-file failure is logged and remembered, and the remaining files are
//     still attempted;
//   - cancellation observed between files aborts the pass.
//
// Every remembered error is included in the joined error that is returned;
// the result is nil when nothing failed.
func RunOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) error {
	if p.Tool() != src.Tool {
		mismatch := fmt.Errorf("parser tool %q does not match source tool %q", p.Tool(), src.Tool)
		return culpa.WithCode(mismatch, "RUNNER_TOOL_MISMATCH")
	}

	var failures []error

	if replayErr := ReplayOutbox(ctx, store, sender, replayOutboxLimit); replayErr != nil {
		wrapped := culpa.Wrap(replayErr, "replay outbox")
		if cancelErr := ctx.Err(); cancelErr != nil {
			// The context is done, so there is no point in scanning files.
			return errors.Join(cancelErr, wrapped)
		}
		slog.Warn("outbox replay failed", "tool", src.Tool, "error", replayErr)
		failures = append(failures, wrapped)
	}

	files, discoverErr := p.Discover(src.Path)
	if discoverErr != nil {
		failures = append(failures, culpa.Wrap(discoverErr, "discover source files"))
		return errors.Join(failures...)
	}

	for _, file := range files {
		if cancelErr := ctx.Err(); cancelErr != nil {
			failures = append(failures, cancelErr)
			return errors.Join(failures...)
		}

		fileErr := runFile(ctx, cfg, src, p, store, sender, file.Path)
		if fileErr == nil {
			continue
		}
		// One bad file must not block the others; record it and move on.
		slog.Warn("source file ingest failed", "tool", src.Tool, "source_file", file.Path, "error", fileErr)
		failures = append(failures, fileErr)
	}

	return errors.Join(failures...)
}

// RunDaemon starts one independent polling loop per configured source and
// exits after the supplied context is canceled and all loops have stopped.
func RunDaemon(ctx context.Context, cfg config.Config, parsers map[string]parser.Parser, store *state.Store, sender *Sender) error { if len(cfg.Sources) == 0 { <-ctx.Done() return nil } group := async.NewGroup[struct{}]() for _, source := range cfg.Sources { src := source p, ok := parsers[src.Tool] if !ok { return culpa.WithCode(fmt.Errorf("missing parser for source tool %q", src.Tool), "RUNNER_MISSING_PARSER") } group.AddExecutor(func(loopCtx context.Context) (struct{}, error) { pollSource(loopCtx, cfg, src, p, store, sender) return struct{}{}, nil }) } if err := group.Start(ctx); err != nil { return culpa.Wrap(err, "start source polling loops") } if err := group.All(context.Background()); err != nil { return culpa.Wrap(err, "wait for source polling loops") } return nil } func pollSource(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) { runAndLog := func() { if ctx.Err() != nil { return } if err := RunOnce(ctx, cfg, src, p, store, sender); err != nil && ctx.Err() == nil { slog.Warn("source poll failed", "tool", src.Tool, "path", src.Path, "error", err) } } runAndLog() ticker := time.NewTicker(src.PollInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: runAndLog() } } } func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, path string) error { offset, err := store.GetOffset(ctx, src.Tool, path) if err != nil { return culpa.Wrap(err, "get source offset") } events, newOffset, err := p.Parse(path, offset) if err != nil { return culpa.Wrap(err, "parse source file") } if len(events) == 0 { return nil } stampHost(events, cfg.Host) if src.BatchMaxBytes > math.MaxInt { return culpa.WithCode(fmt.Errorf("batch max bytes %d exceeds int max", src.BatchMaxBytes), "RUNNER_BATCH_CAP") } batches, err := BuildBatches(events, src.BatchMaxLines, int(src.BatchMaxBytes)) if err != nil { return 
culpa.Wrap(err, "build event batches") } for _, batch := range batches { result, err := sender.PostBatch(ctx, batch.Events) if err != nil { if enqueueErr := enqueueBatch(ctx, store, src, cfg.Host, path, batch.Events, cfg.Outbox.MaxBytes); enqueueErr != nil { return errors.Join(culpa.Wrap(err, "post batch"), enqueueErr) } return culpa.Wrap(err, "post batch") } advanced, err := persistAcceptedOffset(ctx, store, src.Tool, path, events, newOffset, batch, result) if err != nil { return err } if advanced < len(batch.Events) { return nil } } return nil } func stampHost(events []wire.TurnEvent, host string) { for i := range events { events[i].Host = host } } func enqueueBatch(ctx context.Context, store *state.Store, src config.SourceConfig, host string, path string, events []wire.TurnEvent, maxOutboxBytes int64) error { payload, err := EncodeNDJSON(events) if err != nil { return culpa.Wrap(err, "encode failed batch") } if err := store.Enqueue(ctx, state.OutboxItem{Tool: src.Tool, Host: host, SourceFile: path, Payload: payload}); err != nil { return culpa.Wrap(err, "enqueue failed batch") } if err := EnforceOutboxLimit(ctx, store, maxOutboxBytes); err != nil { return culpa.Wrap(err, "enforce outbox limit") } return nil } func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, result Result) (int, error) { if result.Accepted < 0 || result.Accepted > len(batch.Events) { return 0, culpa.WithCode( fmt.Errorf("server accepted %d events for batch length %d", result.Accepted, len(batch.Events)), "RUNNER_ACCEPTED_INVALID", ) } if result.Accepted == len(batch.Events) && len(result.Errors) > 0 { return 0, culpa.WithCode( fmt.Errorf("server accepted all %d events but reported %d errors", len(batch.Events), len(result.Errors)), "RUNNER_ACCEPTED_MISMATCH", ) } if result.Accepted == 0 && len(result.Errors) == 0 { return 0, nil } nextGlobalIndex := batch.EventIndexes[0] + result.Accepted if 
len(result.Errors) > 0 { // Skip the failed event so it does not loop forever. nextGlobalIndex++ } var offset int64 if nextGlobalIndex >= len(allEvents) { offset = newOffset } else { offset = allEvents[nextGlobalIndex].Seq } if err := store.SaveOffset(ctx, tool, path, offset); err != nil { return 0, culpa.Wrap(err, "save accepted offset") } return result.Accepted, nil }