~bigbes/lethe

ref: 0ff155ae175906dbebf30fa57a71c0935280ad62 lethe/internal/domain/ingest/repository_test.go -rw-r--r-- 10.5 KiB
0ff155ae — Eugene Blikh docs(task): record execution conclusion across all 9 phases a month ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
package ingest_test

import (
	"context"
	"encoding/json"
	"strings"
	"testing"
	"time"

	"github.com/jmoiron/sqlx"
	_ "modernc.org/sqlite"

	"sourcecraft.dev/bigbes/lethe/internal/config"
	"sourcecraft.dev/bigbes/lethe/internal/domain/ingest"
	"sourcecraft.dev/bigbes/lethe/internal/platform/database"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

// newTestDatabase constructs a database.Database configured with the
// ":memory:" path and a five-second busy timeout, calls Init on it, and
// schedules Destroy through t.Cleanup. The test is aborted if Init fails.
// The *Database itself is returned because Repository is wired with it.
func newTestDatabase(t *testing.T) *database.Database {
	t.Helper()

	cfg := config.DatabaseConfig{
		Path:        ":memory:",
		BusyTimeout: 5 * time.Second,
	}
	db := &database.Database{Cfg: cfg}

	err := db.Init(context.Background())
	if err != nil {
		t.Fatalf("database.Init: %v", err)
	}

	// Teardown is best-effort: a Destroy error is deliberately discarded.
	t.Cleanup(func() {
		_ = db.Destroy(context.Background())
	})
	return db
}

// newRepo returns an initialized ingest.Repository backed by a fresh database
// from newTestDatabase, together with that database's *sqlx.DB handle so tests
// can query tables directly. A failing repo.Init aborts the test.
func newRepo(t *testing.T) (*ingest.Repository, *sqlx.DB) {
	t.Helper()

	store := newTestDatabase(t)
	r := &ingest.Repository{Database: store}
	err := r.Init(context.Background())
	if err != nil {
		t.Fatalf("repo.Init: %v", err)
	}
	return r, store.DB
}

// ptrInt64 returns a pointer to a freshly allocated copy of v.
func ptrInt64(v int64) *int64 {
	p := new(int64)
	*p = v
	return p
}

// ptrString returns a pointer to a freshly allocated copy of v.
func ptrString(v string) *string {
	p := new(string)
	*p = v
	return p
}

// ptrFloat64 returns a pointer to a freshly allocated copy of v.
func ptrFloat64(v float64) *float64 {
	p := new(float64)
	*p = v
	return p
}

// turn assembles a wire.TurnEvent from the given tool, host, session id, turn
// id, sequence number, timestamp and content. Role is fixed to "user" and
// SessionMeta.SourceFile to "/tmp/x.jsonl"; all remaining fields keep their
// zero value so callers can set them on the returned copy.
func turn(tool, host, sid, tid string, seq int64, ts int64, content string) wire.TurnEvent {
	var ev wire.TurnEvent

	ev.Tool = tool
	ev.Host = host
	ev.SessionID = sid
	ev.TurnID = tid
	ev.Seq = seq
	ev.Role = "user"
	ev.Timestamp = ts
	ev.Content = content
	ev.SessionMeta.SourceFile = "/tmp/x.jsonl"

	return ev
}

// runUpsert calls repo.UpsertChunk for owner and turns inside database.InTx on
// db, and fails the test immediately if the transaction returns an error.
func runUpsert(t *testing.T, repo *ingest.Repository, db *sqlx.DB, owner string, turns []wire.TurnEvent) {
	t.Helper()

	ctx := context.Background()
	write := func(tx *sqlx.Tx) error {
		return repo.UpsertChunk(ctx, tx, owner, turns)
	}
	err := database.InTx(ctx, db, write)
	if err != nil {
		t.Fatalf("upsert: %v", err)
	}
}

// TestRepository_UpsertChunk_FirstWriteWinsSession upserts two turns of the
// same session carrying different working directories and checks that the
// sessions row still holds the working_dir supplied by the first upsert.
func TestRepository_UpsertChunk_FirstWriteWinsSession(t *testing.T) {
	const query = `SELECT working_dir FROM sessions WHERE owner=? AND tool=? AND host=? AND session_id=?`

	repo, db := newRepo(t)

	first := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "hello")
	first.SessionMeta.WorkingDir = ptrString("/A")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{first})

	second := turn("cc", "phoebe", "s1", "tB", 2, 1700000100, "world")
	second.SessionMeta.WorkingDir = ptrString("/B")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{second})

	var got string
	err := db.Get(&got, query, "alice", "cc", "phoebe", "s1")
	if err != nil {
		t.Fatalf("query: %v", err)
	}
	if got != "/A" {
		t.Fatalf("expected first-write working_dir /A, got %q", got)
	}
}

// TestRepository_UpsertChunk_LastWriteWinsTurn upserts the same turn_id twice
// with different content, role and seq, and checks that the turns row carries
// the values from the second upsert.
func TestRepository_UpsertChunk_LastWriteWinsTurn(t *testing.T) {
	repo, db := newRepo(t)

	original := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "first content")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{original})

	replacement := turn("cc", "phoebe", "s1", "tA", 2, 1700000050, "second content")
	replacement.Role = "assistant"
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{replacement})

	var (
		content string
		role    string
		seq     int64
	)
	row := db.QueryRow(`SELECT content, role, seq FROM turns WHERE owner=? AND turn_id=?`, "alice", "tA")
	if err := row.Scan(&content, &role, &seq); err != nil {
		t.Fatalf("query: %v", err)
	}

	overwritten := content == "second content" && role == "assistant" && seq == 2
	if !overwritten {
		t.Fatalf("expected last-write columns; got content=%q role=%q seq=%d", content, role, seq)
	}
}

// TestRepository_UpsertChunk_EndedAtMax checks that sessions.ended_at is kept
// when a later upsert carries an older turn timestamp, and is advanced when a
// later upsert carries a newer one.
func TestRepository_UpsertChunk_EndedAtMax(t *testing.T) {
	const endedAtQuery = `SELECT ended_at FROM sessions WHERE owner=? AND session_id=?`

	repo, db := newRepo(t)

	// Baseline turn at 1700000100 with an explicit session StartedAt.
	base := turn("cc", "phoebe", "s1", "tA", 1, 1700000100, "first")
	base.SessionMeta.StartedAt = ptrInt64(1700000000)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{base})

	// An older timestamp than the stored ended_at must leave it untouched.
	older := turn("cc", "phoebe", "s1", "tB", 2, 1699999999, "earlier")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{older})

	var got int64
	err := db.Get(&got, endedAtQuery, "alice", "s1")
	if err != nil {
		t.Fatalf("q1: %v", err)
	}
	if got != 1700000100 {
		t.Fatalf("expected ended_at preserved at 1700000100, got %d", got)
	}

	// A newer timestamp must move ended_at forward.
	newer := turn("cc", "phoebe", "s1", "tC", 3, 1700000500, "later")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{newer})

	err = db.Get(&got, endedAtQuery, "alice", "s1")
	if err != nil {
		t.Fatalf("q2: %v", err)
	}
	if got != 1700000500 {
		t.Fatalf("expected ended_at extended to 1700000500, got %d", got)
	}
}

// TestRepository_UpsertChunk_StartedAtFallbackMinTurnTimestamp checks that,
// with SessionMeta.StartedAt left nil, sessions.started_at is taken from the
// turn timestamp, and that a later upsert with an earlier timestamp does not
// change it.
func TestRepository_UpsertChunk_StartedAtFallbackMinTurnTimestamp(t *testing.T) {
	const startedAtQuery = `SELECT started_at FROM sessions WHERE owner=? AND session_id=?`

	repo, db := newRepo(t)

	// No explicit StartedAt: the turn timestamp is the only source.
	first := turn("cc", "phoebe", "s1", "tA", 1, 1700000050, "x")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{first})

	var got int64
	err := db.Get(&got, startedAtQuery, "alice", "s1")
	if err != nil {
		t.Fatalf("q: %v", err)
	}
	if got != 1700000050 {
		t.Fatalf("expected started_at fallback 1700000050, got %d", got)
	}

	// An earlier-stamped turn arriving afterwards must not rewrite the start
	// (first-write-wins on start).
	earlier := turn("cc", "phoebe", "s1", "tB", 2, 1700000010, "earlier")
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{earlier})

	err = db.Get(&got, startedAtQuery, "alice", "s1")
	if err != nil {
		t.Fatalf("q2: %v", err)
	}
	if got != 1700000050 {
		t.Fatalf("expected started_at preserved at 1700000050, got %d", got)
	}
}

// TestRepository_UpsertChunk_PerOwnerIsolation upserts the identical turn for
// owners "alice" and "bob" and checks that both the sessions and the turns
// tables end up with two rows for it.
func TestRepository_UpsertChunk_PerOwnerIsolation(t *testing.T) {
	repo, db := newRepo(t)

	shared := []wire.TurnEvent{turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "shared")}
	runUpsert(t, repo, db, "alice", shared)
	runUpsert(t, repo, db, "bob", shared)

	var sessionCount int
	err := db.Get(&sessionCount, `SELECT COUNT(*) FROM sessions WHERE tool='cc' AND host='phoebe' AND session_id='s1'`)
	if err != nil {
		t.Fatalf("q: %v", err)
	}
	if sessionCount != 2 {
		t.Fatalf("expected two sessions across owners, got %d", sessionCount)
	}

	var turnCount int
	err = db.Get(&turnCount, `SELECT COUNT(*) FROM turns WHERE turn_id='tA'`)
	if err != nil {
		t.Fatalf("q2: %v", err)
	}
	if turnCount != 2 {
		t.Fatalf("expected two turns across owners, got %d", turnCount)
	}
}

// TestRepository_UpsertChunk_FKViolationPropagatesAsLineLevelCode checks that
// inserting a turn whose (owner, tool, host, session_id) has no matching
// sessions row is rejected with a foreign-key error.
func TestRepository_UpsertChunk_FKViolationPropagatesAsLineLevelCode(t *testing.T) {
	// Repository alone cannot produce an FK violation — sessions are upserted
	// by the same call. We synthesize the violation by inserting a turn
	// referencing a (owner, tool, host, session_id) that does not exist,
	// using raw SQL on a transaction, so the error the Service would have to
	// classify as line-level is observable here.
	d := newTestDatabase(t)
	tx, err := d.DB.BeginTxx(context.Background(), nil)
	if err != nil {
		t.Fatalf("begin: %v", err)
	}
	// The transaction is never committed; the rollback error is irrelevant.
	defer func() { _ = tx.Rollback() }()

	_, err = tx.Exec(`
		INSERT INTO turns (owner, tool, host, session_id, turn_id, seq, role, timestamp, content)
		VALUES ('alice','cc','phoebe','ghost','tX',1,'user',1,'orphan')`)
	if err == nil {
		t.Fatalf("expected FK violation, got nil")
	}
	// A bare non-nil check would also pass on unrelated failures (missing
	// table, wrong column name, NOT NULL/CHECK violation). SQLite reports a
	// foreign-key failure with the text "FOREIGN KEY constraint failed".
	if !strings.Contains(err.Error(), "FOREIGN KEY") {
		t.Fatalf("expected FK violation, got unrelated error: %v", err)
	}
}

// TestRepository_UpsertChunk_OwnerBoundFromParameter decodes a JSON payload
// that carries an extra "owner":"evil" key into a wire.TurnEvent, upserts it
// with owner "alice", and checks that the stored turn belongs to "alice" and
// that no turn is stored for "evil".
func TestRepository_UpsertChunk_OwnerBoundFromParameter(t *testing.T) {
	// The "owner" key is smuggled into the payload on purpose; the stored row
	// must reflect the UpsertChunk parameter, not anything from the JSON.
	const payload = `{"owner":"evil","tool":"cc","host":"phoebe","session_id":"s1","turn_id":"tA","seq":1,"role":"user","timestamp":1700000000,"content":"hi","session_meta":{"source_file":"/tmp/x.jsonl"}}`

	repo, db := newRepo(t)

	var ev wire.TurnEvent
	err := json.Unmarshal([]byte(payload), &ev)
	if err != nil {
		t.Fatalf("unmarshal: %v", err)
	}
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{ev})

	var storedOwner string
	err = db.Get(&storedOwner, `SELECT owner FROM turns WHERE turn_id='tA'`)
	if err != nil {
		t.Fatalf("q: %v", err)
	}
	if storedOwner != "alice" {
		t.Fatalf("expected stored owner=alice, got %q", storedOwner)
	}

	var evilRows int
	err = db.Get(&evilRows, `SELECT COUNT(*) FROM turns WHERE owner='evil'`)
	if err != nil {
		t.Fatalf("q evil: %v", err)
	}
	if evilRows != 0 {
		t.Fatalf("expected zero rows for owner=evil, got %d", evilRows)
	}
}

// TestRepository_UpsertChunk_FTSPopulated upserts one turn with content
// "find the needle" and a ToolCalls payload naming "shell", then checks that
// turns_fts yields one match for 'needle' and tool_outputs_fts yields one
// match for 'shell', both restricted to owner "alice".
func TestRepository_UpsertChunk_FTSPopulated(t *testing.T) {
	repo, db := newRepo(t)

	ev := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "find the needle")
	ev.ToolCalls = json.RawMessage(`{"name":"shell","args":"ls"}`)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{ev})

	var hits int

	err := db.Get(&hits, `SELECT COUNT(*) FROM turns_fts WHERE owner='alice' AND turns_fts MATCH 'needle'`)
	if err != nil {
		t.Fatalf("q turns_fts: %v", err)
	}
	if hits != 1 {
		t.Fatalf("expected 1 turns_fts hit, got %d", hits)
	}

	err = db.Get(&hits, `SELECT COUNT(*) FROM tool_outputs_fts WHERE owner='alice' AND tool_outputs_fts MATCH 'shell'`)
	if err != nil {
		t.Fatalf("q tool_outputs_fts: %v", err)
	}
	if hits != 1 {
		t.Fatalf("expected 1 tool_outputs_fts hit, got %d", hits)
	}
}

// TestRepository_UpsertChunk_MetadataPersisted stores a turn whose Metadata
// and SessionMeta.Metadata are raw JSON documents and checks that the metadata
// columns of the turns and sessions rows read back as exactly the same text.
func TestRepository_UpsertChunk_MetadataPersisted(t *testing.T) {
	const (
		turnMeta    = `{"k":"v"}`
		sessionMeta = `{"sk":"sv"}`
	)

	repo, db := newRepo(t)

	ev := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "x")
	ev.Metadata = json.RawMessage(turnMeta)
	ev.SessionMeta.Metadata = json.RawMessage(sessionMeta)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{ev})

	var gotTurn, gotSession string

	turnRow := db.QueryRow(`SELECT metadata FROM turns WHERE turn_id='tA'`)
	if err := turnRow.Scan(&gotTurn); err != nil {
		t.Fatalf("q tm: %v", err)
	}
	if gotTurn != turnMeta {
		t.Fatalf("turn metadata roundtrip failed: %q", gotTurn)
	}

	sessionRow := db.QueryRow(`SELECT metadata FROM sessions WHERE session_id='s1'`)
	if err := sessionRow.Scan(&gotSession); err != nil {
		t.Fatalf("q sm: %v", err)
	}
	if gotSession != sessionMeta {
		t.Fatalf("session metadata roundtrip failed: %q", gotSession)
	}
}

// TestRepository_UpsertChunk_OptionalPointersStored sets the pointer-typed
// Model, TokensIn, TokensOut and CostUSD fields on a turn, upserts it, and
// checks that the model, tokens_in, tokens_out and cost_usd columns read back
// with the same values.
func TestRepository_UpsertChunk_OptionalPointersStored(t *testing.T) {
	repo, db := newRepo(t)

	ev := turn("cc", "phoebe", "s1", "tA", 1, 1700000000, "x")
	ev.Model = ptrString("gpt-x")
	ev.TokensIn = ptrInt64(10)
	ev.TokensOut = ptrInt64(20)
	ev.CostUSD = ptrFloat64(0.12)
	runUpsert(t, repo, db, "alice", []wire.TurnEvent{ev})

	var model string
	var tokensIn, tokensOut int64
	var cost float64

	row := db.QueryRow(`SELECT model, tokens_in, tokens_out, cost_usd FROM turns WHERE turn_id='tA'`)
	err := row.Scan(&model, &tokensIn, &tokensOut, &cost)
	if err != nil {
		t.Fatalf("q: %v", err)
	}

	roundTripped := model == "gpt-x" && tokensIn == 10 && tokensOut == 20 && cost == 0.12
	if !roundTripped {
		t.Fatalf("optional fields not round-tripped: %v %v %v %v", model, tokensIn, tokensOut, cost)
	}
}