~bigbes/lethe

ref: 50654f96ce59bec2ca3200cd54806704472ff21f lethe/internal/domain/ingest/repository.go -rw-r--r-- 5.8 KiB
50654f96 — Eugene Blikh web: wire display settings UI 30 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Package ingest implements the NDJSON ingest pipeline for lethe: per-line
// validation against the wire contract, chunked transactional commit with
// per-line partial-accept semantics, and the trust-model invariant that the
// stored owner is always the authenticated user (never the wire payload).
//
// Layering inside the package:
//   - Repository owns the raw SQL: per-chunk UPSERT of sessions/turns inside a
//     caller-supplied transaction. It does not begin or commit the tx.
//   - Service owns the streaming, validation, and chunking policy. It opens
//     and commits one tx per chunk, increments metrics, and converts SQL
//     errors into per-line LineError entries (or hard 5xx for non-line
//     failures like a closed DB).
//   - Handler is the HTTP boundary: enforces Content-Type, body cap, and
//     auth-derived owner; renders RFC 7807 problems via apierror.Render.
package ingest

import (
	"context"
	"errors"

	"github.com/jmoiron/sqlx"
	"go.bigb.es/auxilia/culpa"
	"modernc.org/sqlite"
	sqlite3 "modernc.org/sqlite/lib"

	"sourcecraft.dev/bigbes/lethe/internal/platform/database"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// Repository is the steward-managed SQL layer for ingest. It keeps no state
// of its own: Init is a no-op, and every UpsertChunk call runs on a
// transaction supplied by the caller (the Service).
type Repository struct {
	// Database carries the `inject` tag, so it is presumably populated by the
	// project's dependency injector. UpsertChunk does not read it; all writes
	// go through the caller-supplied transaction.
	Database *database.Database `inject:""`
}

// Init is the steward Initer hook for Repository. There is nothing to set up:
// the Repository holds no state of its own and its Database dependency is
// injected, so Init ignores its context and always reports success.
func (r *Repository) Init(_ context.Context) error {
	return nil
}

// upsertSessionStmt UPSERTs a row in `sessions` keyed by the composite PK
// (owner, tool, host, session_id). It takes nine positional placeholders, in
// the column order listed in the INSERT.
//
// On conflict only `ended_at` is touched: it is widened to the larger of the
// stored and incoming values via the two-argument scalar MAX. Every other
// column (`started_at`, `working_dir`, `source_file`, `metadata`) is absent
// from the SET list, so the existing row keeps its first-written values.
//
// NOTE(review): SQLite's multi-argument MAX yields NULL when any argument is
// NULL, so this presumably relies on `ended_at` never being NULL — confirm
// against the schema.
const upsertSessionStmt = `
INSERT INTO sessions
    (owner, tool, host, session_id, started_at, ended_at, working_dir, source_file, metadata)
VALUES
    (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (owner, tool, host, session_id) DO UPDATE SET
    ended_at = MAX(sessions.ended_at, excluded.ended_at)
`

// upsertTurnStmt UPSERTs a row in `turns` keyed by the composite PK
// (owner, tool, host, session_id, turn_id). It takes fifteen positional
// placeholders, in the column order listed in the INSERT.
//
// Conflict policy is last-write-wins: every non-key column is overwritten
// with the incoming (`excluded`) value, so re-sending a turn replaces its
// stored content rather than merging with it. The key columns themselves are
// never rewritten.
const upsertTurnStmt = `
INSERT INTO turns
    (owner, tool, host, session_id, turn_id, seq, role, timestamp,
     content, model, tokens_in, tokens_out, cost_usd, tool_calls, metadata)
VALUES
    (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (owner, tool, host, session_id, turn_id) DO UPDATE SET
    seq        = excluded.seq,
    role       = excluded.role,
    timestamp  = excluded.timestamp,
    content    = excluded.content,
    model      = excluded.model,
    tokens_in  = excluded.tokens_in,
    tokens_out = excluded.tokens_out,
    cost_usd   = excluded.cost_usd,
    tool_calls = excluded.tool_calls,
    metadata   = excluded.metadata
`

// UpsertChunk persists chunk inside the transaction handed in by the caller.
// Events are processed in slice order; for each one the parent session row is
// upserted first, then the turn row. The transaction lifecycle stays with the
// caller (the Service): nothing here begins, commits, or rolls back tx.
//
// The first statement that fails stops the loop and its error is returned
// after classification by wrapSQLErr. Statements that already ran remain
// pending on tx for the caller to commit or roll back.
//
// Trust model: the `owner` column of both tables is always bound from the
// owner argument and never from the event. wire.TurnEvent carries no `owner`
// JSON field, so a stray `"owner":"evil"` smuggled into a line is discarded
// by json.Unmarshal and cannot reach the SQL below. Multi-tenant isolation
// depends on this invariant.
func (r *Repository) UpsertChunk(ctx context.Context, tx *sqlx.Tx, owner string, chunk []wire.TurnEvent) error {
	for i := range chunk {
		ev := &chunk[i]

		// The session start defaults to this turn's timestamp unless the
		// event carries an explicit session start time.
		sessionStart := ev.Timestamp
		if explicit := ev.SessionMeta.StartedAt; explicit != nil {
			sessionStart = *explicit
		}

		// Bind order follows the column list of upsertSessionStmt. owner comes
		// from the function parameter (trust model invariant).
		sessionArgs := []any{
			owner, ev.Tool, ev.Host, ev.SessionID,
			sessionStart,
			// ended_at starts at this turn's timestamp on first write and is
			// widened by MAX on conflict.
			ev.Timestamp,
			ev.SessionMeta.WorkingDir,
			ev.SessionMeta.SourceFile,
			rawMessageOrNil(ev.SessionMeta.Metadata),
		}
		if _, err := tx.ExecContext(ctx, upsertSessionStmt, sessionArgs...); err != nil {
			return wrapSQLErr(err, "upsert session")
		}

		// Bind order follows the column list of upsertTurnStmt. owner again
		// comes from the function parameter (trust model invariant).
		turnArgs := []any{
			owner, ev.Tool, ev.Host, ev.SessionID, ev.TurnID,
			ev.Seq, ev.Role, ev.Timestamp, ev.Content,
			ev.Model, ev.TokensIn, ev.TokensOut, ev.CostUSD,
			rawMessageOrNil(ev.ToolCalls),
			rawMessageOrNil(ev.Metadata),
		}
		if _, err := tx.ExecContext(ctx, upsertTurnStmt, turnArgs...); err != nil {
			return wrapSQLErr(err, "upsert turn")
		}
	}
	return nil
}

// rawMessageOrNil converts a raw JSON payload into a SQL bind value. An
// absent payload (nil or zero-length) and the bare JSON literal `null` both
// map to nil, so the column holds a real SQL NULL rather than the text "null"
// or "". Any other payload is bound as a string holding the original bytes.
//
// The explicit `null` check matters because encoding/json passes the bytes
// `null` to a json.RawMessage field when the wire value is an explicit null;
// a length check alone would store that as the four-character string "null".
// Only the exact token is matched: a JSON string such as `"null"` (with
// quotes) is ordinary data and is stored unchanged.
func rawMessageOrNil(m []byte) any {
	if len(m) == 0 || string(m) == "null" {
		return nil
	}
	return string(m)
}

// wrapSQLErr classifies SQLite-driver errors so the Service can decide
// whether to attribute them to the offending line (FK / constraint failures)
// or to surface them as a hard 5xx (DB closed, IO error). Any unrecognized
// failure is wrapped under DB_UPSERT.
func wrapSQLErr(err error, msg string) error {
	var se *sqlite.Error
	if errors.As(err, &se) {
		switch se.Code() {
		case sqlite3.SQLITE_CONSTRAINT_FOREIGNKEY:
			return culpa.WithCode(culpa.Wrap(err, msg), "DB_FOREIGN_KEY")
		case sqlite3.SQLITE_CONSTRAINT,
			sqlite3.SQLITE_CONSTRAINT_CHECK,
			sqlite3.SQLITE_CONSTRAINT_NOTNULL,
			sqlite3.SQLITE_CONSTRAINT_PRIMARYKEY,
			sqlite3.SQLITE_CONSTRAINT_UNIQUE:
			return culpa.WithCode(culpa.Wrap(err, msg), "DB_CONSTRAINT")
		}
	}
	return culpa.WithCode(culpa.Wrap(err, msg), "DB_UPSERT")
}