~bigbes/lethe

ref: 27c47142dda459f4dd552a8ceba4934eca81ff6b lethe/internal/collector/ingest/runner.go -rw-r--r-- 5.9 KiB
27c47142 — Eugene Blikh collector: add polling source runner 24 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package ingest

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"math"
	"time"

	"go.bigb.es/auxilia/async"
	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/collector/config"
	"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// replayOutboxLimit is the limit value RunOnce hands to ReplayOutbox at the
// start of every pass.
// NOTE(review): presumably the maximum number of queued outbox items replayed
// per pass — confirm against ReplayOutbox.
const replayOutboxLimit = 100

// RunOnce performs a single ingest pass for one configured source: it replays
// the outbox, discovers the source's files, and ingests each file through
// runFile, which posts batches and moves the persisted offset forward only
// for events the server accepted.
//
// A parser/source tool mismatch and a discovery failure end the pass at once.
// A replay failure ends the pass only when ctx is already done; otherwise it
// is logged and remembered. Per-file failures are logged and remembered
// without stopping the loop, and the loop stops early once ctx is done. All
// remembered errors are combined with errors.Join, so the result is nil only
// when nothing failed.
func RunOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) error {
	if p.Tool() != src.Tool {
		mismatch := fmt.Errorf("parser tool %q does not match source tool %q", p.Tool(), src.Tool)
		return culpa.WithCode(mismatch, "RUNNER_TOOL_MISMATCH")
	}

	var failures []error

	if replayErr := ReplayOutbox(ctx, store, sender, replayOutboxLimit); replayErr != nil {
		// A replay that failed because of shutdown is fatal for this pass;
		// any other replay failure must not block fresh ingestion.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return errors.Join(ctxErr, culpa.Wrap(replayErr, "replay outbox"))
		}
		slog.Warn("outbox replay failed", "tool", src.Tool, "error", replayErr)
		failures = append(failures, culpa.Wrap(replayErr, "replay outbox"))
	}

	discovered, discoverErr := p.Discover(src.Path)
	if discoverErr != nil {
		failures = append(failures, culpa.Wrap(discoverErr, "discover source files"))
		return errors.Join(failures...)
	}

	for _, found := range discovered {
		if ctxErr := ctx.Err(); ctxErr != nil {
			failures = append(failures, ctxErr)
			return errors.Join(failures...)
		}

		fileErr := runFile(ctx, cfg, src, p, store, sender, found.Path)
		if fileErr == nil {
			continue
		}
		// One bad file must not prevent the remaining files from being ingested.
		slog.Warn("source file ingest failed", "tool", src.Tool, "source_file", found.Path, "error", fileErr)
		failures = append(failures, fileErr)
	}

	return errors.Join(failures...)
}

// RunDaemon registers one pollSource loop per entry in cfg.Sources, starts
// them with ctx, and then waits for the group to finish.
//
// With no sources configured it simply blocks until ctx is done and returns
// nil. A source whose tool has no entry in parsers makes RunDaemon return a
// RUNNER_MISSING_PARSER error before any loop is started. Failures reported by
// the group while starting or waiting are returned wrapped.
func RunDaemon(ctx context.Context, cfg config.Config, parsers map[string]parser.Parser, store *state.Store, sender *Sender) error {
	if len(cfg.Sources) == 0 {
		// Nothing to poll: stay idle until shutdown is requested.
		<-ctx.Done()
		return nil
	}

	loops := async.NewGroup[struct{}]()
	for _, source := range cfg.Sources {
		src := source // per-iteration copy captured by the executor closure
		srcParser, found := parsers[src.Tool]
		if !found {
			missing := fmt.Errorf("missing parser for source tool %q", src.Tool)
			return culpa.WithCode(missing, "RUNNER_MISSING_PARSER")
		}

		poll := func(loopCtx context.Context) (struct{}, error) {
			pollSource(loopCtx, cfg, src, srcParser, store, sender)
			return struct{}{}, nil
		}
		loops.AddExecutor(poll)
	}

	startErr := loops.Start(ctx)
	if startErr != nil {
		return culpa.Wrap(startErr, "start source polling loops")
	}

	// The wait is given context.Background() rather than ctx.
	// NOTE(review): presumably so the wait outlasts cancellation of ctx and
	// only ends once every loop has returned — confirm against async.Group.
	waitErr := loops.All(context.Background())
	if waitErr != nil {
		return culpa.Wrap(waitErr, "wait for source polling loops")
	}
	return nil
}

// pollSource runs RunOnce for src immediately and then once per
// src.PollInterval tick until ctx is done. It returns nothing: a failed pass
// is logged at warn level, unless ctx has been canceled in the meantime, in
// which case the failure is dropped silently.
//
// time.NewTicker panics when given a non-positive duration, which would take
// the whole process down from inside a polling goroutine. A non-positive
// src.PollInterval is therefore reported once and periodic polling is
// disabled for this source; the function still blocks until ctx is done.
func pollSource(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) {
	runAndLog := func() {
		// Skip the pass entirely once shutdown has been requested.
		if ctx.Err() != nil {
			return
		}
		if err := RunOnce(ctx, cfg, src, p, store, sender); err != nil && ctx.Err() == nil {
			slog.Warn("source poll failed", "tool", src.Tool, "path", src.Path, "error", err)
		}
	}

	// First pass happens right away instead of waiting a full interval.
	runAndLog()

	if src.PollInterval <= 0 {
		slog.Warn("source poll interval is not positive, periodic polling disabled",
			"tool", src.Tool, "path", src.Path, "poll_interval", src.PollInterval)
		<-ctx.Done()
		return
	}

	ticker := time.NewTicker(src.PollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			runAndLog()
		}
	}
}

// runFile ingests one source file: it loads the persisted offset for
// (src.Tool, path), parses from that offset, stamps cfg.Host onto the parsed
// events, splits them into batches, and posts the batches in order.
//
// After each successful post the offset is persisted through
// persistAcceptedOffset. If the server accepted fewer events than the batch
// held, the remaining batches are left for a later pass and nil is returned.
// If a post fails, that batch is written to the outbox via enqueueBatch and
// the post error is returned (joined with the enqueue error when enqueueing
// fails as well); the offset is not touched for that batch.
//
// Returns nil when there is nothing new to send.
func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, path string) error {
	startOffset, err := store.GetOffset(ctx, src.Tool, path)
	if err != nil {
		return culpa.Wrap(err, "get source offset")
	}

	parsed, endOffset, err := p.Parse(path, startOffset)
	switch {
	case err != nil:
		return culpa.Wrap(err, "parse source file")
	case len(parsed) == 0:
		return nil
	}
	stampHost(parsed, cfg.Host)

	// Guard the narrowing conversion to int that follows.
	if src.BatchMaxBytes > math.MaxInt {
		tooLarge := fmt.Errorf("batch max bytes %d exceeds int max", src.BatchMaxBytes)
		return culpa.WithCode(tooLarge, "RUNNER_BATCH_CAP")
	}
	maxBytes := int(src.BatchMaxBytes)

	chunks, err := BuildBatches(parsed, src.BatchMaxLines, maxBytes)
	if err != nil {
		return culpa.Wrap(err, "build event batches")
	}

	for _, chunk := range chunks {
		reply, postErr := sender.PostBatch(ctx, chunk.Events)
		if postErr != nil {
			// Park the undelivered batch in the outbox before reporting.
			enqueueErr := enqueueBatch(ctx, store, src, cfg.Host, path, chunk.Events, cfg.Outbox.MaxBytes)
			if enqueueErr == nil {
				return culpa.Wrap(postErr, "post batch")
			}
			return errors.Join(culpa.Wrap(postErr, "post batch"), enqueueErr)
		}

		confirmed, saveErr := persistAcceptedOffset(ctx, store, src.Tool, path, parsed, endOffset, chunk, reply.Accepted)
		if saveErr != nil {
			return saveErr
		}
		if confirmed < len(chunk.Events) {
			// Partial acceptance: stop here so later batches are not sent
			// ahead of events the server has not taken yet.
			break
		}
	}

	return nil
}

// stampHost overwrites the Host field of every event in events with host.
// The slice elements are modified in place.
func stampHost(events []wire.TurnEvent, host string) {
	for idx := 0; idx < len(events); idx++ {
		event := &events[idx]
		event.Host = host
	}
}

// enqueueBatch stores a batch that could not be posted in the outbox. It
// encodes events with EncodeNDJSON, enqueues the payload as a state.OutboxItem
// tagged with src.Tool, host and path, and then calls EnforceOutboxLimit with
// maxOutboxBytes. The first step that fails aborts the sequence and its error
// is returned wrapped; nil means all three steps succeeded.
func enqueueBatch(ctx context.Context, store *state.Store, src config.SourceConfig, host string, path string, events []wire.TurnEvent, maxOutboxBytes int64) error {
	encoded, encodeErr := EncodeNDJSON(events)
	if encodeErr != nil {
		return culpa.Wrap(encodeErr, "encode failed batch")
	}

	item := state.OutboxItem{
		Tool:       src.Tool,
		Host:       host,
		SourceFile: path,
		Payload:    encoded,
	}
	if enqueueErr := store.Enqueue(ctx, item); enqueueErr != nil {
		return culpa.Wrap(enqueueErr, "enqueue failed batch")
	}

	limitErr := EnforceOutboxLimit(ctx, store, maxOutboxBytes)
	if limitErr != nil {
		return culpa.Wrap(limitErr, "enforce outbox limit")
	}
	return nil
}

// persistAcceptedOffset saves the offset that corresponds to the events of
// batch the server accepted and reports how many events were confirmed.
//
// accepted must lie in [0, len(batch.Events)]; anything else yields a
// RUNNER_ACCEPTED_INVALID error. When accepted is zero nothing is saved and
// (0, nil) is returned. Otherwise the saved offset is the Seq of the event at
// index batch.EventIndexes[0]+accepted in allEvents, or newOffset when that
// index is past the end of allEvents. On a save failure the count returned is
// 0 together with the wrapped error.
//
// NOTE(review): assumes batch.EventIndexes[0] is the position in allEvents of
// the batch's first event and that a batch covers a contiguous run of
// allEvents — confirm against BuildBatches.
func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, accepted int) (int, error) {
	switch {
	case accepted < 0 || accepted > len(batch.Events):
		invalid := fmt.Errorf("server accepted %d events for batch length %d", accepted, len(batch.Events))
		return 0, culpa.WithCode(invalid, "RUNNER_ACCEPTED_INVALID")
	case accepted == 0:
		return 0, nil
	}

	// Default to the end of the parsed region; fall back to the first
	// unaccepted event's Seq when such an event exists.
	resume := newOffset
	if firstPending := batch.EventIndexes[0] + accepted; firstPending < len(allEvents) {
		resume = allEvents[firstPending].Seq
	}

	saveErr := store.SaveOffset(ctx, tool, path, resume)
	if saveErr != nil {
		return 0, culpa.Wrap(saveErr, "save accepted offset")
	}
	return accepted, nil
}