~bigbes/lethe

ref: f903c872196ad6f6bf020e59637993f119438d7f lethe/internal/domain/ingest/service.go -rw-r--r-- 7.4 KiB
f903c872 — Eugene Blikh web: scaffold vite/react/ts project, port design tokens and primitives a month ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
package ingest

import (
	"context"
	"encoding/json"
	"io"
	"log/slog"

	"github.com/jmoiron/sqlx"
	"go.bigb.es/auxilia/culpa"
	"go.bigb.es/auxilia/scribe"

	"sourcecraft.dev/bigbes/lethe/internal/config"
	"sourcecraft.dev/bigbes/lethe/internal/pkg/httputil"
	"sourcecraft.dev/bigbes/lethe/internal/platform/database"
	"sourcecraft.dev/bigbes/lethe/internal/platform/observability"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// validRoles enumerates every value TurnEvent.Role may take; validateTurn
// rejects any other role before the turn reaches the database. Keep this set
// in sync with the role list spelled out in validateTurn's error message.
var validRoles = map[string]struct{}{"user": {}, "assistant": {}, "tool": {}, "system": {}}

// maxSourceFileBytes is the upper bound, in bytes, on SessionMeta.SourceFile.
// The server never interprets source_file, but it still refuses absurdly long
// values so that stored rows and log lines stay a reasonable size.
const maxSourceFileBytes = 1 << 10

// LineError describes a single NDJSON line that failed JSON parsing,
// validation, or chunk commit. It is stable wire output: the handler embeds
// the list directly into the 200 response body.
//
// Line is 1-based, counted over the lines yielded by the NDJSON reader. For a
// failed chunk commit it is the first line of that chunk, which is not
// necessarily the row that violated the constraint. Err is the
// human-readable failure text.
type LineError struct {
	Line int    `json:"line"`
	Err  string `json:"error"`
}

// Result is the outcome of one Ingest call. Accepted is the count of turns
// actually committed to the database; Errors lists per-line failures in the
// order they occurred. Per the spec the ingest endpoint returns 200 even when
// Errors is non-empty — the client uses Accepted to advance its offset.
//
// Errors carries no omitempty option, so a Result whose Errors slice is nil
// encodes as "errors": null rather than an empty array.
// NOTE(review): confirm clients tolerate null here.
type Result struct {
	Accepted int         `json:"accepted"`
	Errors   []LineError `json:"errors"`
}

// Service is the steward-managed ingest pipeline. Its fields are filled in by
// the container through the struct tags (presumably `config:""` binds a
// config section and `inject:""` wires a dependency — verify against the
// steward documentation):
//
//   - Cfg supplies ChunkSize (turns per transaction) and MaxTurnContentBytes
//     (per-turn content cap), both read by Ingest.
//   - Repo is the SQL boundary; Ingest opens transactions on Repo.Database.DB
//     and writes through Repo.UpsertChunk.
//   - Log emits per-chunk failure notices at WARN.
//   - Metrics counts accepted/errored lines and committed chunks.
type Service struct {
	Cfg     config.IngestConfig    `config:""`
	Repo    *Repository            `inject:""`
	Log     *observability.Logger  `inject:""`
	Metrics *observability.Metrics `inject:""`
}

// Init implements the steward Initer hook. The Service keeps no state of its
// own beyond the injected dependencies, so there is nothing to prepare and it
// always succeeds.
func (*Service) Init(context.Context) error {
	return nil
}

// validateTurn checks one decoded TurnEvent against the wire contract. It
// reports only the first violation, as a culpa error coded "INVALID"; Ingest
// turns that into a LineError for the offending line.
//
// Checks run top to bottom in a fixed order, so the reported field is
// deterministic:
//  1. identity fields (tool, host, session_id, turn_id, role);
//  2. role membership in validRoles;
//  3. timestamp;
//  4. content presence and size;
//  5. session_meta.source_file presence and size.
//
// Seq is deliberately not checked: zero is a legal first sequence number on
// the wire.
func validateTurn(t wire.TurnEvent, maxContentBytes int64) error {
	const codeInvalid = "INVALID"

	_, knownRole := validRoles[t.Role]
	contentLen := int64(len(t.Content))

	switch {
	case t.Tool == "":
		return culpa.WithCode(culpa.New("tool is required"), codeInvalid)
	case t.Host == "":
		return culpa.WithCode(culpa.New("host is required"), codeInvalid)
	case t.SessionID == "":
		return culpa.WithCode(culpa.New("session_id is required"), codeInvalid)
	case t.TurnID == "":
		return culpa.WithCode(culpa.New("turn_id is required"), codeInvalid)
	case t.Role == "":
		// Reported separately so an empty role gets a clearer message than
		// the set-membership check below would give.
		return culpa.WithCode(culpa.New("role is required"), codeInvalid)
	case !knownRole:
		return culpa.WithCode(culpa.Errorf("role %q is not one of {user, assistant, tool, system}", t.Role), codeInvalid)
	case t.Timestamp == 0:
		return culpa.WithCode(culpa.New("timestamp is required"), codeInvalid)
	case contentLen == 0:
		return culpa.WithCode(culpa.New("content is required"), codeInvalid)
	case contentLen > maxContentBytes:
		return culpa.WithCode(culpa.Errorf("content exceeds %d bytes", maxContentBytes), codeInvalid)
	case t.SessionMeta.SourceFile == "":
		return culpa.WithCode(culpa.New("session_meta.source_file is required"), codeInvalid)
	case len(t.SessionMeta.SourceFile) > maxSourceFileBytes:
		return culpa.WithCode(culpa.Errorf("session_meta.source_file exceeds %d bytes", maxSourceFileBytes), codeInvalid)
	}
	return nil
}

// Ingest streams NDJSON from body, validates each line, and commits valid
// turns in chunks of Cfg.ChunkSize, one transaction per chunk. A ChunkSize
// below 1 is treated as 1, so a misconfiguration cannot panic the buffer
// allocation.
//
// Partial-accept semantics: lines that committed in earlier chunks remain
// committed even when a later line fails — Accepted reflects what is actually
// in the DB. A per-line failure (JSON parse, validation, FK) appends a
// LineError and stops processing; subsequent lines are not touched.
//
// On a JSON or validation failure, the valid turns already buffered ahead of
// the bad line are committed first. Accepted therefore covers every valid
// line before the first rejected one, wherever the chunk boundaries fall. An
// FK/constraint failure rolls back its whole chunk, so none of that chunk's
// lines are counted.
//
// A non-line failure (DB closed, body cap exceeded, scanner I/O) returns the
// (still-truthful) Result alongside the error so the Handler can map it to a
// 5xx or 413.
func (s *Service) Ingest(ctx context.Context, owner string, body io.Reader, maxBytes int64) (Result, error) {
	// Clamp so make() below cannot panic on a negative capacity.
	chunkSize := max(s.Cfg.ChunkSize, 1)

	var (
		result   Result
		chunk    = make([]wire.TurnEvent, 0, chunkSize)
		startsAt = make([]int, 0, chunkSize) // line number of each buffered turn
		lineNum  int
	)

	// commitChunk writes the buffered turns in one transaction. On success it
	// credits Accepted and empties the buffers. On failure it leaves the
	// buffers intact so the caller can report the chunk's first line.
	commitChunk := func() error {
		if len(chunk) == 0 {
			return nil
		}
		firstLine := startsAt[0]
		first := chunk[0]

		txErr := database.InTx(ctx, s.Repo.Database.DB, func(tx *sqlx.Tx) error {
			return s.Repo.UpsertChunk(ctx, tx, owner, chunk)
		})
		if txErr != nil {
			s.Metrics.IngestLinesErrored.Inc()
			s.Log.L.WarnContext(ctx, "ingest chunk commit failed",
				slog.String("owner", owner),
				slog.Int("line", firstLine),
				slog.String("tool", first.Tool),
				slog.String("host", first.Host),
				slog.String("session_id", first.SessionID),
				scribe.Err(txErr),
			)
			return txErr
		}

		n := len(chunk)
		result.Accepted += n
		s.Metrics.IngestLinesAccepted.Add(float64(n))
		s.Metrics.IngestChunksCommitted.Inc()

		// Reset buffers without releasing the underlying capacity.
		chunk = chunk[:0]
		startsAt = startsAt[:0]
		return nil
	}

	// handleChunkErr turns a commit failure into the final return value.
	// Per-row DB errors become a LineError in a 200 response; anything else
	// is escalated to the caller.
	handleChunkErr := func(err error) (Result, error) {
		// Take the failing chunk's first line BEFORE reset; commitChunk
		// leaves buffers untouched on error so startsAt still has it.
		failedLine := lineNum
		if len(startsAt) > 0 {
			failedLine = startsAt[0]
		}
		if isLineLevelDBError(err) {
			result.Errors = append(result.Errors, LineError{
				Line: failedLine,
				Err:  err.Error(),
			})
			return result, nil
		}
		return result, err
	}

	// rejectLine records a JSON/validation failure on line and ends the
	// ingest. It commits the valid turns buffered ahead of that line first.
	// If that commit fails, the commit error is reported instead, because it
	// concerns an earlier line.
	rejectLine := func(line int, msg string) (Result, error) {
		if err := commitChunk(); err != nil {
			return handleChunkErr(err)
		}
		s.Metrics.IngestLinesErrored.Inc()
		result.Errors = append(result.Errors, LineError{
			Line: line,
			Err:  msg,
		})
		return result, nil
	}

	for raw, scanErr := range httputil.ReadNDJSONLines(body, maxBytes) {
		if scanErr != nil {
			// Body cap or oversized line: caller maps to 413.
			return result, scanErr
		}
		lineNum++

		var event wire.TurnEvent
		if err := json.Unmarshal(raw, &event); err != nil {
			return rejectLine(lineNum, "invalid json: "+err.Error())
		}
		if err := validateTurn(event, s.Cfg.MaxTurnContentBytes); err != nil {
			return rejectLine(lineNum, err.Error())
		}

		chunk = append(chunk, event)
		startsAt = append(startsAt, lineNum)

		if len(chunk) >= chunkSize {
			if err := commitChunk(); err != nil {
				return handleChunkErr(err)
			}
		}
	}

	// Flush the trailing partial chunk.
	if err := commitChunk(); err != nil {
		return handleChunkErr(err)
	}

	return result, nil
}

// isLineLevelDBError reports whether err carries one of the per-row database
// codes, DB_FOREIGN_KEY or DB_CONSTRAINT. Such failures go back to the client
// as a LineError inside the 200 response instead of failing the whole request
// with a 5xx. The accepted codes are meant to mirror the ones
// Repository.UpsertChunk attaches via wrapSQLErr.
//
// An error with no culpa code detail, or whose code is not a string, is not
// line-level.
func isLineLevelDBError(err error) bool {
	var detail culpa.CodeDetail
	if found := culpa.FindDetail(err, &detail); !found {
		return false
	}
	// A non-string code yields "" here, which matches neither constant below.
	code, _ := detail.Code.(string)
	return code == "DB_FOREIGN_KEY" || code == "DB_CONSTRAINT"
}