~bigbes/lethe

ref: 96e95ab9e44d2234ab036319836a5087eb4c2a2f lethe/internal/collector/ingest/runner.go -rw-r--r-- 11.3 KiB
96e95ab9 — Eugene Blikh fix: add tool column to search table; remove conversation bleed from comments 23 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
package ingest

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"math"
	"time"

	"go.bigb.es/auxilia/async"
	"go.bigb.es/auxilia/culpa"
	"sourcecraft.dev/bigbes/lethe/internal/collector/config"
	"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// replayOutboxLimit is the limit argument handed to ReplayOutbox each time
// enforceAndReplay runs. Presumably it caps how many queued outbox rows are
// replayed per pass — TODO confirm against ReplayOutbox.
const replayOutboxLimit = 100

// enforceAndReplay runs the two outbox maintenance steps in order:
// EnforceOutboxLimit with maxBytes, then ReplayOutbox with replayOutboxLimit.
//
// A failing step does not stop the other one from running unless ctx is
// already done at that point; in that case the context error is returned
// immediately, joined with the wrapped step error (and any failure collected
// from the earlier step). Otherwise each failure is logged with the source's
// tool and collected, and the joined collection is returned, which is nil
// when both steps succeed.
func enforceAndReplay(ctx context.Context, store *state.Store, sender *Sender, maxBytes int64, src config.SourceConfig) error {
	var failures []error

	if limitErr := EnforceOutboxLimit(ctx, store, maxBytes); limitErr != nil {
		cancelErr := ctx.Err()
		if cancelErr != nil {
			return errors.Join(cancelErr, culpa.Wrap(limitErr, "enforce outbox limit"))
		}
		slog.Warn("enforce outbox limit failed", "tool", src.Tool, "error", limitErr)
		failures = append(failures, culpa.Wrap(limitErr, "enforce outbox limit"))
	}

	replayErr := ReplayOutbox(ctx, store, sender, replayOutboxLimit)
	if replayErr == nil {
		return errors.Join(failures...)
	}
	if cancelErr := ctx.Err(); cancelErr != nil {
		failures = append(failures, cancelErr, culpa.Wrap(replayErr, "replay outbox"))
		return errors.Join(failures...)
	}
	slog.Warn("outbox replay failed", "tool", src.Tool, "error", replayErr)
	failures = append(failures, culpa.Wrap(replayErr, "replay outbox"))
	return errors.Join(failures...)
}

// RunOnce performs one ingest pass over a single configured source.
//
// The pass first checks that the parser's tool matches the source's tool and
// fails with code RUNNER_TOOL_MISMATCH if not. It then runs enforceAndReplay
// on the outbox, discovers the source's files through the parser, and ingests
// each file via runFile, which starts from the offset persisted in the store.
//
// Outbox and per-file failures are collected (per-file ones are also logged)
// and returned joined at the end, so one bad file does not stop the others.
// The pass ends early when file discovery fails or when ctx is done, either
// right after the outbox step or between files.
func RunOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) error {
	if p.Tool() != src.Tool {
		mismatch := fmt.Errorf("parser tool %q does not match source tool %q", p.Tool(), src.Tool)
		return culpa.WithCode(mismatch, "RUNNER_TOOL_MISMATCH")
	}

	var failures []error

	if outboxErr := enforceAndReplay(ctx, store, sender, cfg.Outbox.MaxBytes, src); outboxErr != nil {
		// A done context makes the rest of the pass pointless; stop here.
		if ctx.Err() != nil {
			return errors.Join(outboxErr)
		}
		failures = append(failures, outboxErr)
	}

	discovered, discoverErr := p.Discover(src.Path)
	if discoverErr != nil {
		failures = append(failures, culpa.Wrap(discoverErr, "discover source files"))
		return errors.Join(failures...)
	}

	for _, entry := range discovered {
		if cancelErr := ctx.Err(); cancelErr != nil {
			failures = append(failures, cancelErr)
			return errors.Join(failures...)
		}

		fileErr := runFile(ctx, cfg, src, p, store, sender, entry.Path)
		if fileErr == nil {
			continue
		}
		slog.Warn("source file ingest failed", "tool", src.Tool, "source_file", entry.Path, "error", fileErr)
		failures = append(failures, fileErr)
	}

	return errors.Join(failures...)
}

// RunBackfillOnce performs one ingest pass over a single configured source,
// reading every discovered file from offset 0 rather than from the offset
// persisted in the store. Offsets are still saved as the pass progresses (see
// runFileFromOffset), so an interrupted backfill can be continued by RunOnce.
//
// Apart from the starting offset the pass has the same shape as RunOnce: a
// parser/source tool mismatch fails with code RUNNER_TOOL_MISMATCH, the
// outbox is enforced and replayed first, per-file failures are logged and
// collected, and the pass ends early on a discovery failure or a done ctx.
func RunBackfillOnce(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) error {
	if p.Tool() != src.Tool {
		mismatch := fmt.Errorf("parser tool %q does not match source tool %q", p.Tool(), src.Tool)
		return culpa.WithCode(mismatch, "RUNNER_TOOL_MISMATCH")
	}

	var failures []error

	if outboxErr := enforceAndReplay(ctx, store, sender, cfg.Outbox.MaxBytes, src); outboxErr != nil {
		// A done context makes the rest of the pass pointless; stop here.
		if ctx.Err() != nil {
			return errors.Join(outboxErr)
		}
		failures = append(failures, outboxErr)
	}

	discovered, discoverErr := p.Discover(src.Path)
	if discoverErr != nil {
		failures = append(failures, culpa.Wrap(discoverErr, "discover source files"))
		return errors.Join(failures...)
	}

	for _, entry := range discovered {
		if cancelErr := ctx.Err(); cancelErr != nil {
			failures = append(failures, cancelErr)
			return errors.Join(failures...)
		}

		// Always begin at the start of the file, ignoring any stored offset.
		fileErr := runFileFromOffset(ctx, cfg, src, p, store, sender, entry.Path, 0)
		if fileErr == nil {
			continue
		}
		slog.Warn("source file ingest failed", "tool", src.Tool, "source_file", entry.Path, "error", fileErr)
		failures = append(failures, fileErr)
	}

	return errors.Join(failures...)
}

// RunDaemon registers one pollSource loop per entry in cfg.Sources, starts
// them as a group under ctx, and returns once the group reports completion.
//
// With no configured sources it simply blocks until ctx is done and returns
// nil. If a source's tool has no entry in parsers, it returns an error with
// code RUNNER_MISSING_PARSER before any loop is started. Errors from starting
// or waiting on the group are wrapped and returned.
func RunDaemon(ctx context.Context, cfg config.Config, parsers map[string]parser.Parser, store *state.Store, sender *Sender) error {
	// Nothing to poll: just hold until shutdown is requested.
	if len(cfg.Sources) == 0 {
		<-ctx.Done()
		return nil
	}

	loops := async.NewGroup[struct{}]()
	for _, configured := range cfg.Sources {
		// Per-iteration copies so every closure captures its own source and
		// parser regardless of the Go version's loop-variable semantics.
		src := configured
		srcParser, found := parsers[src.Tool]
		if !found {
			missing := fmt.Errorf("missing parser for source tool %q", src.Tool)
			return culpa.WithCode(missing, "RUNNER_MISSING_PARSER")
		}
		loops.AddExecutor(func(loopCtx context.Context) (struct{}, error) {
			pollSource(loopCtx, cfg, src, srcParser, store, sender)
			return struct{}{}, nil
		})
	}

	startErr := loops.Start(ctx)
	if startErr != nil {
		return culpa.Wrap(startErr, "start source polling loops")
	}

	// The wait uses a background context, so it is not cut short when ctx is
	// canceled; it returns when the group itself reports completion.
	waitErr := loops.All(context.Background())
	if waitErr != nil {
		return culpa.Wrap(waitErr, "wait for source polling loops")
	}
	return nil
}

// pollSource runs RunOnce for src immediately and then once per
// src.PollInterval tick, returning when ctx is canceled.
//
// Each run gets its own context that is detached from ctx's cancellation and
// bounded by cfg.HTTP.Timeout, so a run that has already started is allowed
// to finish (or time out) after shutdown is requested; no new run is started
// once ctx is done.
//
// A non-positive src.PollInterval would make time.NewTicker panic. In that
// case the initial run still happens, a warning is logged, and the function
// waits for ctx to be canceled without polling again.
func pollSource(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender) {
	runAndLog := func() {
		if ctx.Err() != nil {
			return
		}
		// Use a bounded context independent of daemon cancellation so an
		// in-flight RunOnce is not aborted by SIGTERM.
		runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), cfg.HTTP.Timeout)
		defer cancel()
		// NOTE(review): runCtx is detached from ctx, so runCtx.Err() is only
		// non-nil here when the per-run deadline expired. Failures of runs
		// that timed out are therefore not logged at this level — confirm
		// that this suppression is intended.
		if err := RunOnce(runCtx, cfg, src, p, store, sender); err != nil && runCtx.Err() == nil {
			slog.Warn("source poll failed", "tool", src.Tool, "path", src.Path, "error", err)
		}
	}

	runAndLog()

	// time.NewTicker panics when given a non-positive duration; degrade to a
	// single run instead of crashing the polling goroutine.
	if src.PollInterval <= 0 {
		slog.Warn("source poll interval is not positive; polling disabled after initial run",
			"tool", src.Tool, "path", src.Path, "poll_interval", src.PollInterval)
		<-ctx.Done()
		return
	}

	ticker := time.NewTicker(src.PollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			runAndLog()
		}
	}
}

// runFile ingests a single source file starting from the offset the store
// has recorded for (src.Tool, path). A failure to read that offset is wrapped
// and returned without touching the file; otherwise the work is delegated to
// runFileFromOffset.
func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, path string) error {
	persisted, lookupErr := store.GetOffset(ctx, src.Tool, path)
	if lookupErr == nil {
		return runFileFromOffset(ctx, cfg, src, p, store, sender, path, persisted)
	}
	return culpa.Wrap(lookupErr, "get source offset")
}

// runFileFromOffset parses path from startOffset, stamps the parsed events
// with cfg.Host, splits them into batches, and posts the batches in order.
//
// When parsing yields no events but the parser reports an offset beyond
// startOffset, that offset is saved so the skipped region is not read again.
// When a post fails, the batch is handed to enqueueBatch and the post error
// is returned (joined with the enqueue error if that also fails). After a
// successful post, persistAcceptedOffset decides how far the stored offset
// moves; if it reports fewer events than the batch contained, the remaining
// batches are left for a later pass and nil is returned.
func runFileFromOffset(ctx context.Context, cfg config.Config, src config.SourceConfig, p parser.Parser, store *state.Store, sender *Sender, path string, startOffset int64) error {
	parsed, endOffset, parseErr := p.Parse(path, startOffset)
	if parseErr != nil {
		return culpa.Wrap(parseErr, "parse source file")
	}

	if len(parsed) == 0 {
		// Nothing to send; only record progress if the parser moved forward.
		if endOffset <= startOffset {
			return nil
		}
		if saveErr := store.SaveOffset(ctx, src.Tool, path, endOffset); saveErr != nil {
			return culpa.Wrap(saveErr, "save skipped-only offset")
		}
		return nil
	}

	stampHost(parsed, cfg.Host)

	// BuildBatches takes an int byte cap; refuse values that would not fit.
	if src.BatchMaxBytes > math.MaxInt {
		tooLarge := fmt.Errorf("batch max bytes %d exceeds int max", src.BatchMaxBytes)
		return culpa.WithCode(tooLarge, "RUNNER_BATCH_CAP")
	}
	byteCap := int(src.BatchMaxBytes)
	batches, buildErr := BuildBatches(parsed, src.BatchMaxLines, byteCap)
	if buildErr != nil {
		return culpa.Wrap(buildErr, "build event batches")
	}

	for _, batch := range batches {
		result, postErr := sender.PostBatch(ctx, batch.Events)
		if postErr != nil {
			enqueueErr := enqueueBatch(ctx, store, src, cfg.Host, path, batch.Events, cfg.Outbox.MaxBytes)
			if enqueueErr != nil {
				return errors.Join(culpa.Wrap(postErr, "post batch"), enqueueErr)
			}
			return culpa.Wrap(postErr, "post batch")
		}

		advanced, persistErr := persistAcceptedOffset(ctx, store, sender, cfg, src, path, parsed, endOffset, batch, result)
		switch {
		case persistErr != nil:
			return persistErr
		case advanced < len(batch.Events):
			// Partial progress: stop here and let a later pass resume.
			return nil
		}
	}

	return nil
}

// stampHost overwrites the Host field of every event in events with host.
// The slice elements are modified in place.
func stampHost(events []wire.TurnEvent, host string) {
	for idx := 0; idx < len(events); idx++ {
		event := &events[idx]
		event.Host = host
	}
}

// enqueueBatch stores a batch that could not be posted in the outbox. The
// events are encoded with EncodeNDJSON, enqueued as one state.OutboxItem
// tagged with the source's tool, the host and the source file path, and then
// EnforceOutboxLimit is applied with maxOutboxBytes. The first step that
// fails is wrapped and returned; later steps are not attempted.
func enqueueBatch(ctx context.Context, store *state.Store, src config.SourceConfig, host string, path string, events []wire.TurnEvent, maxOutboxBytes int64) error {
	encoded, encodeErr := EncodeNDJSON(events)
	if encodeErr != nil {
		return culpa.Wrap(encodeErr, "encode failed batch")
	}

	item := state.OutboxItem{
		Tool:       src.Tool,
		Host:       host,
		SourceFile: path,
		Payload:    encoded,
	}
	if queueErr := store.Enqueue(ctx, item); queueErr != nil {
		return culpa.Wrap(queueErr, "enqueue failed batch")
	}

	limitErr := EnforceOutboxLimit(ctx, store, maxOutboxBytes)
	if limitErr == nil {
		return nil
	}
	return culpa.Wrap(limitErr, "enforce outbox limit")
}

// persistAcceptedOffset reconciles the server's result for one posted batch
// with the stored offset for (src.Tool, path) and returns how many of the
// batch's leading events have been dealt with (accepted, or skipped because
// the server rejected them).
//
// allEvents and newOffset describe the whole parse the batch came from; they
// are only used, via eventOffset, to translate "index of the next event" into
// an offset to save. The function assumes batch.EventIndexes holds
// consecutive indexes into allEvents, since it derives the next index as
// batch.EventIndexes[0] plus a count — TODO confirm against BuildBatches.
//
// Without reported errors, the offset following the accepted events is saved
// and result.Accepted is returned (nothing is saved when it is 0). With
// reported errors, only result.Errors[0] is consulted: accepted rows are
// persisted, any rows between the accepted rows and the failed row are
// re-posted as a smaller batch and reconciled recursively, and if that prefix
// advances completely the offset is moved past the failed row.
//
// Inconsistent results (Accepted out of range, all events accepted while
// errors are reported, an error line outside the batch, or Accepted reaching
// past the first failed row) produce a coded error. Whenever an error is
// returned the count is 0, even if an earlier SaveOffset in the same call
// succeeded.
func persistAcceptedOffset(ctx context.Context, store *state.Store, sender *Sender, cfg config.Config, src config.SourceConfig, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, result Result) (int, error) {
	// Sanity-check the server's counters before trusting them for offsets.
	if result.Accepted < 0 || result.Accepted > len(batch.Events) {
		return 0, culpa.WithCode(
			fmt.Errorf("server accepted %d events for batch length %d", result.Accepted, len(batch.Events)),
			"RUNNER_ACCEPTED_INVALID",
		)
	}
	if result.Accepted == len(batch.Events) && len(result.Errors) > 0 {
		return 0, culpa.WithCode(
			fmt.Errorf("server accepted all %d events but reported %d errors", len(batch.Events), len(result.Errors)),
			"RUNNER_ACCEPTED_MISMATCH",
		)
	}

	// No errors reported: advance exactly as far as the server accepted.
	if len(result.Errors) == 0 {
		if result.Accepted == 0 {
			// Nothing accepted and nothing rejected; leave the offset alone.
			return 0, nil
		}
		nextGlobalIndex := batch.EventIndexes[0] + result.Accepted
		offset := eventOffset(allEvents, newOffset, nextGlobalIndex)
		if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil {
			return 0, culpa.Wrap(err, "save accepted offset")
		}
		return result.Accepted, nil
	}

	// There are errors — use the first error's line to identify the failed row.
	// Line is treated as 1-based within the batch (valid range [1, len]).
	failedLine := result.Errors[0].Line
	if failedLine < 1 || failedLine > len(batch.Events) {
		return 0, culpa.WithCode(
			fmt.Errorf("server error line %d out of range [1,%d]", failedLine, len(batch.Events)),
			"RUNNER_ERROR_LINE_INVALID",
		)
	}
	failedLocalIdx := failedLine - 1

	// The accepted rows must all come before the first failed row.
	if result.Accepted > failedLocalIdx {
		return 0, culpa.WithCode(
			fmt.Errorf("server accepted %d events but first error is at line %d", result.Accepted, failedLine),
			"RUNNER_ACCEPTED_MISMATCH",
		)
	}

	// Persist any already-accepted rows.
	if result.Accepted > 0 {
		nextGlobalIndex := batch.EventIndexes[0] + result.Accepted
		offset := eventOffset(allEvents, newOffset, nextGlobalIndex)
		if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil {
			return 0, culpa.Wrap(err, "save accepted offset")
		}
	}

	// If there are uncommitted valid rows between accepted and the failed row,
	// post that prefix safely with the same rules.
	if result.Accepted < failedLocalIdx {
		// The prefix excludes the failed row, so it is strictly shorter than
		// batch and the recursion below terminates.
		prefix := Batch{
			Events:       batch.Events[result.Accepted:failedLocalIdx],
			EventIndexes: batch.EventIndexes[result.Accepted:failedLocalIdx],
		}
		prefixResult, err := sender.PostBatch(ctx, prefix.Events)
		if err != nil {
			// Same fallback as a failed top-level post: park the prefix in the
			// outbox and report the post error.
			if enqueueErr := enqueueBatch(ctx, store, src, cfg.Host, path, prefix.Events, cfg.Outbox.MaxBytes); enqueueErr != nil {
				return 0, errors.Join(culpa.Wrap(err, "post prefix batch"), enqueueErr)
			}
			return 0, culpa.Wrap(err, "post prefix batch")
		}
		prefixAdvanced, err := persistAcceptedOffset(ctx, store, sender, cfg, src, path, allEvents, newOffset, prefix, prefixResult)
		if err != nil {
			return 0, err
		}
		if prefixAdvanced < len(prefix.Events) {
			// The prefix itself hit an error or could not advance fully.
			// Return total advanced so far (accepted rows + prefix advancement).
			return result.Accepted + prefixAdvanced, nil
		}
	}

	// Skip the failed row so it does not loop forever.
	nextGlobalIndex := batch.EventIndexes[0] + failedLocalIdx + 1
	offset := eventOffset(allEvents, newOffset, nextGlobalIndex)
	if err := store.SaveOffset(ctx, src.Tool, path, offset); err != nil {
		return 0, culpa.Wrap(err, "save offset past failed row")
	}
	// Count covers everything up to and including the skipped failed row.
	return failedLocalIdx + 1, nil
}

// eventOffset returns the offset to store when the next event to process is
// allEvents[nextGlobalIndex]: that event's Seq value if the index is within
// allEvents, or newOffset when the index is at or past the end (i.e. every
// parsed event has been dealt with).
func eventOffset(allEvents []wire.TurnEvent, newOffset int64, nextGlobalIndex int) int64 {
	if nextGlobalIndex < len(allEvents) {
		return allEvents[nextGlobalIndex].Seq
	}
	return newOffset
}