package ingest
import (
"context"
"errors"
"fmt"
"log/slog"
"math"
"time"
"go.bigb.es/auxilia/async"
"go.bigb.es/auxilia/culpa"
"sourcecraft.dev/bigbes/lethe/internal/collector/config"
"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
"sourcecraft.dev/bigbes/lethe/internal/collector/state"
"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)
// replayOutboxLimit is the limit argument RunOnce passes to ReplayOutbox on
// every run.
// NOTE(review): presumably the maximum number of outbox items replayed per
// run — confirm against ReplayOutbox.
const replayOutboxLimit = 100
// RunOnce performs a single ingest pass for one configured source.
//
// The pass has three stages:
//  1. Reject a parser whose tool does not match the source's tool
//     (code RUNNER_TOOL_MISMATCH).
//  2. Replay the outbox. A replay failure aborts the pass only when ctx is
//     already done; otherwise it is logged, remembered, and the pass goes on.
//  3. Discover the source's files and ingest each one through runFile. A
//     per-file failure is logged and remembered without stopping the loop.
//
// The returned error joins every remembered failure and is nil when there
// were none. Cancellation observed between files ends the pass early with the
// context error joined onto the failures collected so far.
func RunOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) error {
	var failures []error

	if p.Tool() != src.Tool {
		mismatch := fmt.Errorf("parser tool %q does not match source tool %q", p.Tool(), src.Tool)
		return culpa.WithCode(mismatch, "RUNNER_TOOL_MISMATCH")
	}

	if replayErr := ReplayOutbox(ctx, store, sender, replayOutboxLimit); replayErr != nil {
		cancelErr := ctx.Err()
		wrapped := culpa.Wrap(replayErr, "replay outbox")
		if cancelErr != nil {
			// The context is done, so nothing further in this pass can succeed.
			return errors.Join(cancelErr, wrapped)
		}
		slog.Warn("outbox replay failed", "tool", src.Tool, "error", replayErr)
		failures = append(failures, wrapped)
	}

	discovered, discoverErr := p.Discover(src.Path)
	if discoverErr != nil {
		failures = append(failures, culpa.Wrap(discoverErr, "discover source files"))
		return errors.Join(failures...)
	}

	for _, entry := range discovered {
		if cancelErr := ctx.Err(); cancelErr != nil {
			failures = append(failures, cancelErr)
			return errors.Join(failures...)
		}

		fileErr := runFile(ctx, cfg, src, p, store, sender, entry.Path)
		if fileErr == nil {
			continue
		}
		slog.Warn("source file ingest failed", "tool", src.Tool, "source_file", entry.Path, "error", fileErr)
		failures = append(failures, fileErr)
	}

	return errors.Join(failures...)
}
// RunDaemon runs one polling loop per entry in cfg.Sources and returns once
// every loop has finished.
//
// With no configured sources it simply blocks until ctx is done and returns
// nil. Each source must have a parser registered under its tool name in
// parsers; the first source without one fails the call with code
// RUNNER_MISSING_PARSER before any loop is started. Errors from starting the
// group or waiting on it are wrapped and returned.
func RunDaemon(ctx context.Context, cfg config.Config, parsers map[string]parser.Parser, store *state.Store, sender *Sender) error {
	if len(cfg.Sources) == 0 {
		// Nothing to poll: idle until the caller cancels.
		<-ctx.Done()
		return nil
	}

	// newLoop binds one source and its parser by value so each executor polls
	// its own source regardless of how the range variable is scoped.
	newLoop := func(src config.SourceConfig, p parser.Parser) func(context.Context) (struct{}, error) {
		return func(loopCtx context.Context) (struct{}, error) {
			pollSource(loopCtx, cfg, src, p, store, sender)
			return struct{}{}, nil
		}
	}

	loops := async.NewGroup[struct{}]()
	for _, source := range cfg.Sources {
		p, found := parsers[source.Tool]
		if !found {
			missing := fmt.Errorf("missing parser for source tool %q", source.Tool)
			return culpa.WithCode(missing, "RUNNER_MISSING_PARSER")
		}
		loops.AddExecutor(newLoop(source, p))
	}

	if startErr := loops.Start(ctx); startErr != nil {
		return culpa.Wrap(startErr, "start source polling loops")
	}

	// The wait uses a background context, so it is not cut short by ctx; the
	// loops themselves stop when the context they were started with is done.
	waitErr := loops.All(context.Background())
	if waitErr != nil {
		return culpa.Wrap(waitErr, "wait for source polling loops")
	}
	return nil
}
// pollSource drives the polling loop for a single source: it runs RunOnce
// immediately, then again on every src.PollInterval tick, and returns when
// ctx is done.
//
// RunOnce failures are logged as warnings and never returned; failures that
// coincide with a done context are not logged at all.
//
// A non-positive src.PollInterval cannot drive a ticker (time.NewTicker
// panics on it). In that case the initial run still happens, the
// misconfiguration is logged as an error, and the function idles until ctx is
// done instead of panicking.
func pollSource(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) {
	runAndLog := func() {
		if ctx.Err() != nil {
			return
		}
		if err := RunOnce(ctx, cfg, src, p, store, sender); err != nil && ctx.Err() == nil {
			slog.Warn("source poll failed", "tool", src.Tool, "path", src.Path, "error", err)
		}
	}

	runAndLog()

	if src.PollInterval <= 0 {
		// Guard against the time.NewTicker panic so that one misconfigured
		// source cannot crash the goroutine and, with it, the whole process.
		slog.Error("source poll interval must be positive; periodic polling disabled",
			"tool", src.Tool, "path", src.Path, "poll_interval", src.PollInterval)
		<-ctx.Done()
		return
	}

	ticker := time.NewTicker(src.PollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			runAndLog()
		}
	}
}
// runFile ingests one source file starting from its stored offset.
//
// It loads the offset saved for (src.Tool, path), parses the file from there,
// stamps cfg.Host onto every parsed event, splits the events into batches
// bounded by src.BatchMaxLines and src.BatchMaxBytes, and posts the batches
// in order. After each successful post the stored offset is moved forward
// according to how many events the server accepted.
//
// It returns nil when there is nothing new to send or when posting stops on
// a partially accepted batch. A failed post enqueues that batch to the outbox
// and returns the post error, joined with the enqueue error if enqueueing
// also failed. A src.BatchMaxBytes larger than math.MaxInt is rejected with
// code RUNNER_BATCH_CAP.
func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, path string) error {
	startOffset, err := store.GetOffset(ctx, src.Tool, path)
	if err != nil {
		return culpa.Wrap(err, "get source offset")
	}

	parsed, endOffset, err := p.Parse(path, startOffset)
	if err != nil {
		return culpa.Wrap(err, "parse source file")
	}
	if len(parsed) == 0 {
		return nil
	}
	stampHost(parsed, cfg.Host)

	// BuildBatches takes an int byte cap; refuse values the conversion below
	// could not represent.
	if src.BatchMaxBytes > math.MaxInt {
		return culpa.WithCode(fmt.Errorf("batch max bytes %d exceeds int max", src.BatchMaxBytes), "RUNNER_BATCH_CAP")
	}
	byteCap := int(src.BatchMaxBytes)

	batches, err := BuildBatches(parsed, src.BatchMaxLines, byteCap)
	if err != nil {
		return culpa.Wrap(err, "build event batches")
	}

	for _, current := range batches {
		result, postErr := sender.PostBatch(ctx, current.Events)
		if postErr != nil {
			// Park the unsent batch in the outbox, then report the post
			// failure (plus the enqueue failure, if any).
			enqueueErr := enqueueBatch(ctx, store, src, cfg.Host, path, current.Events, cfg.Outbox.MaxBytes)
			wrapped := culpa.Wrap(postErr, "post batch")
			if enqueueErr != nil {
				return errors.Join(wrapped, enqueueErr)
			}
			return wrapped
		}

		persisted, persistErr := persistAcceptedOffset(ctx, store, src.Tool, path, parsed, endOffset, current, result.Accepted)
		if persistErr != nil {
			return persistErr
		}
		if persisted < len(current.Events) {
			// The server did not take the whole batch: stop here and leave
			// the remaining batches unsent in this pass.
			break
		}
	}
	return nil
}
// stampHost overwrites the Host field of every event in events with host.
// The slice is modified in place.
func stampHost(events []wire.TurnEvent, host string) {
	for idx := 0; idx < len(events); idx++ {
		ev := &events[idx]
		ev.Host = host
	}
}
// enqueueBatch stores a batch that could not be posted in the outbox.
//
// The events are encoded with EncodeNDJSON and enqueued as one outbox item
// tagged with the source tool, host, and source file path. After the item is
// stored, EnforceOutboxLimit is called with maxOutboxBytes. The first failing
// step is returned wrapped with a description of that step.
func enqueueBatch(ctx context.Context, store *state.Store, src config.SourceConfig, host string, path string, events []wire.TurnEvent, maxOutboxBytes int64) error {
	encoded, encodeErr := EncodeNDJSON(events)
	if encodeErr != nil {
		return culpa.Wrap(encodeErr, "encode failed batch")
	}

	item := state.OutboxItem{
		Tool:       src.Tool,
		Host:       host,
		SourceFile: path,
		Payload:    encoded,
	}
	if enqueueErr := store.Enqueue(ctx, item); enqueueErr != nil {
		return culpa.Wrap(enqueueErr, "enqueue failed batch")
	}

	if limitErr := EnforceOutboxLimit(ctx, store, maxOutboxBytes); limitErr != nil {
		return culpa.Wrap(limitErr, "enforce outbox limit")
	}
	return nil
}
// persistAcceptedOffset saves the offset for (tool, path) after the server
// has accepted the first `accepted` events of batch.
//
// The saved offset is newOffset when the accepted prefix reaches the end of
// allEvents; otherwise it is the Seq of the first event that was not
// accepted. It returns the number of events the offset was advanced past:
// accepted on success, or 0 with a nil error when accepted is 0 (nothing is
// saved). An accepted count outside [0, len(batch.Events)] fails with code
// RUNNER_ACCEPTED_INVALID.
//
// NOTE(review): this assumes batch.EventIndexes[0] is the position in
// allEvents of the batch's first event, that the batch's events are
// contiguous in allEvents, and that Seq is usable as a file offset — confirm
// against BuildBatches and the parser.
func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, accepted int) (int, error) {
	batchLen := len(batch.Events)
	switch {
	case accepted < 0, accepted > batchLen:
		return 0, culpa.WithCode(
			fmt.Errorf("server accepted %d events for batch length %d", accepted, batchLen),
			"RUNNER_ACCEPTED_INVALID",
		)
	case accepted == 0:
		return 0, nil
	}

	// Default to the end-of-parse offset; fall back to the next unaccepted
	// event's Seq when events remain after the accepted prefix.
	resume := newOffset
	if next := batch.EventIndexes[0] + accepted; next < len(allEvents) {
		resume = allEvents[next].Seq
	}

	if saveErr := store.SaveOffset(ctx, tool, path, resume); saveErr != nil {
		return 0, culpa.Wrap(saveErr, "save accepted offset")
	}
	return accepted, nil
}