~bigbes/lethe

ref: 2d9d2b8ec08ee09cc64c5d925ab85716b1d7d1fb lethe/internal/domain/search/repository.go -rw-r--r-- 9.1 KiB
2d9d2b8e — Eugene Blikh search: add /api/v1/search API and opencode collector parser 23 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
package search

import (
	"context"
	"crypto/sha256"
	"database/sql"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"strings"

	"go.bigb.es/auxilia/culpa"

	"sourcecraft.dev/bigbes/lethe/internal/domain/session"
	"sourcecraft.dev/bigbes/lethe/internal/platform/database"
)

// Labels reported in Row.MatchSource, naming which FTS table produced a hit.
const (
	SourceTurn       = "turn"
	SourceToolOutput = "tool_output"
)

// cursorVersion is embedded in every encoded cursor; DecodeCursor rejects any
// cursor that carries a different version.
const cursorVersion = 1

// Result is the repository response shape for paginated search output.
//
// Results holds at most Limit rows in ranking order, and Limit echoes the
// requested page size. NextCursor is set only when more rows exist beyond this
// page. Pass it back as Filter.Cursor, with the same query and filters, to
// fetch the next page.
type Result struct {
	Results    []Row  `json:"results"`
	Limit      int    `json:"limit"`
	NextCursor string `json:"next_cursor,omitempty"`

	// Rows is always populated by Search with the same rows as Results, but it
	// is excluded from JSON output. NOTE(review): appears redundant with
	// Results; confirm whether any caller still reads it.
	Rows []Row `json:"-"`
}

// Row is one matched turn. Snippet is generated by SQLite FTS5 and uses marker
// runes around hits; it never contains HTML markup added by the repository.
//
// Field sources:
//   - Owner, Tool, Host, SessionID, TurnID, Seq, Role and Timestamp come from
//     the turns table.
//   - WorkingDir comes from the owning sessions row; it is presumably nil when
//     that column is NULL.
//   - Rank is the FTS5 bm25() score of the best-matching source. Rows are
//     ordered by Rank ascending, so smaller (more negative) scores sort first.
//   - MatchSource is SourceTurn or SourceToolOutput and names the FTS table
//     that produced the kept hit.
//   - Snippet wraps each hit in U+0002 … U+0003 and marks elided text with
//     "…".
//   - RowID is the turns rowid. It is not serialized to JSON and serves only
//     as the final pagination tie-breaker.
type Row struct {
	Owner       string  `db:"owner"      json:"owner"`
	Tool        string  `db:"tool"       json:"tool"`
	Host        string  `db:"host"       json:"host"`
	SessionID   string  `db:"session_id" json:"session_id"`
	WorkingDir  *string `db:"working_dir" json:"working_dir,omitempty"`
	TurnID      string  `db:"turn_id"    json:"turn_id"`
	Seq         int64   `db:"seq"        json:"seq"`
	Role        string  `db:"role"       json:"role"`
	Timestamp   int64   `db:"timestamp"  json:"timestamp"`
	Rank        float64 `db:"rank"       json:"rank"`
	MatchSource string  `db:"match_source" json:"match_source"`
	Snippet     string  `db:"snippet"    json:"snippet"`
	RowID       int64   `db:"rowid"      json:"-"`
}

// Filter controls query text, owner scoping, optional filters, and pagination.
//
// Fields:
//   - Query is bound verbatim as the FTS5 MATCH expression and must not be
//     blank.
//   - Owner scoping: AllOwners searches every owner. Otherwise SpecificOwner,
//     when set, restricts results to that owner. Failing both, results are
//     limited to Owner.User.
//   - IncludeToolOutputs also searches tool outputs in addition to prose turns.
//   - Tool and Host are optional exact-match filters.
//   - Since and Until are inclusive bounds on the turn timestamp.
//   - Limit is the page size. A non-positive value yields an empty page.
//   - Cursor is the NextCursor of a previous page. It is accepted only when
//     every other field except Limit matches the request that produced it,
//     with Query compared after trimming surrounding whitespace.
type Filter struct {
	Owner              session.OwnerScope
	Query              string
	IncludeToolOutputs bool
	Tool               *string
	Host               *string
	Since              *int64
	Until              *int64
	Limit              int
	Cursor             string
}

// Cursor is the stable keyset position for the last row returned by a page.
//
// Its fields mirror the search ORDER BY: rank ASC, timestamp DESC, turn_id
// ASC, match_source ASC, rowid ASC. The next page starts strictly after this
// tuple. Cursors travel to clients only in the opaque form produced by
// EncodeCursor and are validated by DecodeCursor.
type Cursor struct {
	Rank        float64 `json:"rank"`
	Timestamp   int64   `json:"timestamp"`
	TurnID      string  `json:"turn_id"`
	MatchSource string  `json:"match_source"`
	RowID       int64   `json:"rowid,omitempty"`
}

// Repository is the SQL steward for FTS search. It is read-only: Search only
// builds SELECT statements over the existing FTS tables, joined with the turns
// and sessions tables.
//
// Database is populated through the `inject` struct tag, presumably by the
// application's dependency container. Confirm this before constructing a
// Repository by hand.
type Repository struct {
	Database *database.Database `inject:""`
}

// Init is a no-op that always returns nil. The repository keeps no state
// beyond its injected Database, so there is nothing to prepare.
func (r *Repository) Init(context.Context) error {
	return nil
}

// Search runs an FTS5 query over prose turns and, when f.IncludeToolOutputs is
// set, over tool outputs as well. If a turn matches in both sources, only its
// best-ranked hit is kept, with deterministic tie-breakers.
//
// Results are paginated by keyset. When more rows remain, the returned
// NextCursor resumes strictly after the last row of this page.
//
// Errors:
//   - A blank query, a rejected cursor, or an FTS syntax error yields an
//     INVALID-coded error.
//   - Any other database failure is wrapped with the DB_QUERY code.
//
// A non-positive f.Limit returns an empty page without querying the database.
func (r *Repository) Search(ctx context.Context, f Filter) (*Result, error) {
	if strings.TrimSpace(f.Query) == "" {
		return nil, invalid("search query is empty")
	}
	if f.Limit <= 0 {
		return &Result{Results: []Row{}, Limit: f.Limit, Rows: []Row{}}, nil
	}

	var after *Cursor
	if f.Cursor != "" {
		decoded, err := DecodeCursor(f.Cursor, f)
		if err != nil {
			return nil, err
		}
		after = &decoded
	}

	query, args := buildSearchSQL(f, after)
	found := make([]Row, 0)
	if err := r.Database.DB.SelectContext(ctx, &found, query, args...); err != nil {
		if isFTSError(err) {
			return nil, invalid("invalid search query")
		}
		return nil, culpa.WithCode(culpa.Wrap(err, "search"), "DB_QUERY")
	}

	// The statement asks for one row beyond the limit. If that extra row is
	// absent, this is the final page.
	if len(found) <= f.Limit {
		return &Result{Results: found, Limit: f.Limit, Rows: found}, nil
	}

	page := found[:f.Limit]
	tail := page[len(page)-1]
	next, err := EncodeCursor(Cursor{
		Rank:        tail.Rank,
		Timestamp:   tail.Timestamp,
		TurnID:      tail.TurnID,
		MatchSource: tail.MatchSource,
		RowID:       tail.RowID,
	}, f)
	if err != nil {
		return nil, err
	}
	return &Result{Results: page, Limit: f.Limit, NextCursor: next, Rows: page}, nil
}

// buildSearchSQL assembles the search statement and its positional arguments.
//
// Statement structure:
//   - candidates: one FTS5 SELECT per source. Turns are always searched; tool
//     outputs only when f.IncludeToolOutputs is set. Each SELECT is already
//     filtered by owner, tool and host.
//   - ranked: numbers candidates within each rowid, so that rn = 1 is the
//     best-ranked hit when the same rowid matched in more than one source.
//   - final SELECT: joins turns and sessions for metadata, then applies the
//     Since/Until bounds. When after is non-nil it also applies the keyset
//     predicate.
//
// It requests f.Limit+1 rows so the caller can tell whether another page
// exists. Arguments are appended in exactly the order their placeholders
// appear in the SQL text, so any edit must keep the two in step.
func buildSearchSQL(f Filter, after *Cursor) (string, []any) {
	// One extra row signals that a next page exists.
	limit := f.Limit + 1
	var sb strings.Builder
	args := make([]any, 0, 12)

	sb.WriteString(`WITH candidates AS (`)
	appendFTSSelect(&sb, &args, "turns_fts", SourceTurn, 0, f)
	if f.IncludeToolOutputs {
		sb.WriteString(` UNION ALL `)
		appendFTSSelect(&sb, &args, "tool_outputs_fts", SourceToolOutput, 1, f)
	}
	sb.WriteString(`), ranked AS (`)
	// Dedup per rowid keeps the lowest bm25 rank. On a rank tie, match_source
	// ASC wins, which favours "tool_output" over "turn". The rowid term is
	// constant within a partition.
	// NOTE(review): the source_priority column emitted by appendFTSSelect
	// (turn=0, tool_output=1) is not used here. Confirm which source should win
	// a tie. The partition and the JOIN below also assume tool_outputs_fts
	// shares rowids with turns.
	sb.WriteString(`SELECT *, ROW_NUMBER() OVER (PARTITION BY rowid ORDER BY rank ASC, match_source ASC, rowid ASC) AS rn FROM candidates`)
	sb.WriteString(`) SELECT t.owner, t.tool, t.host, t.session_id, s.working_dir, t.turn_id, t.seq, t.role, t.timestamp, ranked.rank, ranked.match_source, ranked.snippet, ranked.rowid`)
	sb.WriteString(` FROM ranked JOIN turns t ON t.rowid = ranked.rowid`)
	sb.WriteString(` JOIN sessions s ON s.owner = t.owner AND s.tool = t.tool AND s.host = t.host AND s.session_id = t.session_id`)
	sb.WriteString(` WHERE ranked.rn = 1`)
	// Time bounds are inclusive and apply to the turn, not the FTS row.
	if f.Since != nil {
		sb.WriteString(` AND t.timestamp >= ?`)
		args = append(args, *f.Since)
	}
	if f.Until != nil {
		sb.WriteString(` AND t.timestamp <= ?`)
		args = append(args, *f.Until)
	}
	if after != nil {
		// Keyset predicate: keep rows strictly after the cursor under the
		// ORDER BY below (rank ASC, timestamp DESC, turn_id ASC,
		// match_source ASC, rowid ASC). It is expanded into one disjunct per
		// tie-break level. Note timestamp uses "<" because it sorts DESC.
		sb.WriteString(` AND (`)
		sb.WriteString(`ranked.rank > ?`)
		sb.WriteString(` OR (ranked.rank = ? AND t.timestamp < ?)`)
		sb.WriteString(` OR (ranked.rank = ? AND t.timestamp = ? AND t.turn_id > ?)`)
		sb.WriteString(` OR (ranked.rank = ? AND t.timestamp = ? AND t.turn_id = ? AND ranked.match_source > ?)`)
		sb.WriteString(` OR (ranked.rank = ? AND t.timestamp = ? AND t.turn_id = ? AND ranked.match_source = ? AND ranked.rowid > ?)`)
		sb.WriteString(`)`)
		args = append(args,
			after.Rank,
			after.Rank, after.Timestamp,
			after.Rank, after.Timestamp, after.TurnID,
			after.Rank, after.Timestamp, after.TurnID, after.MatchSource,
			after.Rank, after.Timestamp, after.TurnID, after.MatchSource, after.RowID,
		)
	}
	// Total order. It must stay in sync with the keyset predicate above and
	// with the fields captured in Cursor.
	sb.WriteString(` ORDER BY ranked.rank ASC, t.timestamp DESC, t.turn_id ASC, ranked.match_source ASC, ranked.rowid ASC LIMIT ?`)
	args = append(args, limit)
	return sb.String(), args
}

// appendFTSSelect writes one candidate SELECT against the given FTS5 table.
// Each selected row carries:
//   - the matching rowid;
//   - the source priority and its match_source label;
//   - the bm25 score as rank;
//   - a snippet of column 0 (hits wrapped in char(2)/char(3), elisions shown
//     as '…', at most 12 tokens).
//
// The query text is bound as a parameter, followed by the owner/tool/host
// filters. table and source are interpolated into the SQL, so they must be
// trusted constants.
func appendFTSSelect(sb *strings.Builder, args *[]any, table, source string, priority int, f Filter) {
	const selectTmpl = `SELECT rowid, %[1]d AS source_priority, '%[2]s' AS match_source, bm25(%[3]s) AS rank, snippet(%[3]s, 0, char(2), char(3), '…', 12) AS snippet FROM %[3]s WHERE %[3]s MATCH ?`
	fmt.Fprintf(sb, selectTmpl, priority, source, table)
	*args = append(*args, f.Query)
	appendFTSFilters(sb, args, table, f)
}

// appendFTSFilters appends the owner, tool and host restrictions for one FTS
// table as extra AND terms, binding each value as a parameter.
//
// Owner scoping follows f.Owner:
//   - AllOwners adds no owner term.
//   - Otherwise the owner is SpecificOwner when set, falling back to User.
func appendFTSFilters(sb *strings.Builder, args *[]any, table string, f Filter) {
	where := func(column string, value any) {
		fmt.Fprintf(sb, ` AND %s.%s = ?`, table, column)
		*args = append(*args, value)
	}

	if !f.Owner.AllOwners {
		owner := f.Owner.User
		if f.Owner.SpecificOwner != nil {
			owner = *f.Owner.SpecificOwner
		}
		where("owner", owner)
	}
	if f.Tool != nil {
		where("tool", *f.Tool)
	}
	if f.Host != nil {
		where("host", *f.Host)
	}
}

// EncodeCursor returns an opaque cursor bound to the normalized search tuple.
//
// Encoding: the payload (version, filterHash(f), c) is JSON-marshalled and
// then base64url-encoded without padding. JSON failures are reported with the
// INTERNAL code.
//
// NOTE(review): the cursor is not signed, so clients can forge positions. This
// is tolerable only because every cursor value is bound as a SQL parameter.
func EncodeCursor(c Cursor, f Filter) (string, error) {
	raw, err := json.Marshal(cursorPayload{
		Version:    cursorVersion,
		FilterHash: filterHash(f),
		Cursor:     c,
	})
	if err != nil {
		return "", culpa.WithCode(culpa.Wrap(err, "encode search cursor"), "INTERNAL")
	}
	return base64.RawURLEncoding.EncodeToString(raw), nil
}

// DecodeCursor parses an opaque cursor and rejects cursors created for another
// normalized query/filter tuple.
//
// Every failure returns the same INVALID "invalid search cursor" error. The
// failure cases are:
//   - invalid base64 or JSON;
//   - a wrong version;
//   - a filter-hash mismatch;
//   - an empty turn_id or match_source, or a non-positive rowid.
func DecodeCursor(raw string, f Filter) (Cursor, error) {
	const msg = "invalid search cursor"

	blob, err := base64.RawURLEncoding.DecodeString(raw)
	if err != nil {
		return Cursor{}, invalid(msg)
	}
	var payload cursorPayload
	if err = json.Unmarshal(blob, &payload); err != nil {
		return Cursor{}, invalid(msg)
	}

	// Cases are evaluated in order and stop at the first match. The filter
	// hash is therefore computed only once the version check has passed.
	switch c := payload.Cursor; {
	case payload.Version != cursorVersion,
		payload.FilterHash != filterHash(f),
		c.TurnID == "",
		c.MatchSource == "",
		c.RowID <= 0:
		return Cursor{}, invalid(msg)
	}
	return payload.Cursor, nil
}

// cursorPayload is the JSON document carried inside an opaque cursor before
// base64url encoding; the short keys keep cursors compact. DecodeCursor accepts
// it only when Version equals cursorVersion and FilterHash equals filterHash of
// the current Filter.
type cursorPayload struct {
	Version    int    `json:"v"`
	FilterHash string `json:"h"`
	Cursor     Cursor `json:"c"`
}

// normalizedFilter is the canonical form of a Filter that filterHash
// JSON-encodes and digests to bind cursors to a query. It omits Limit and
// Cursor, so a cursor stays valid when only the page size changes.
//
// NOTE: field order and JSON tags determine the hash. Changing either
// invalidates every outstanding cursor.
type normalizedFilter struct {
	OwnerUser          string  `json:"owner_user"`
	OwnerAllOwners     bool    `json:"owner_all_owners"`
	OwnerSpecificOwner *string `json:"owner_specific_owner"`
	Query              string  `json:"query"`
	IncludeToolOutputs bool    `json:"include_tool_outputs"`
	Tool               *string `json:"tool"`
	Host               *string `json:"host"`
	Since              *int64  `json:"since"`
	Until              *int64  `json:"until"`
}

// filterHash returns the hex-encoded SHA-256 of f's normalized JSON form. The
// normalized form trims surrounding whitespace from the query and drops Limit
// and Cursor. Two filters that differ only in those respects hash equally.
func filterHash(f Filter) string {
	canonical, err := json.Marshal(normalizedFilter{
		Query:              strings.TrimSpace(f.Query),
		OwnerUser:          f.Owner.User,
		OwnerAllOwners:     f.Owner.AllOwners,
		OwnerSpecificOwner: f.Owner.SpecificOwner,
		IncludeToolOutputs: f.IncludeToolOutputs,
		Tool:               f.Tool,
		Host:               f.Host,
		Since:              f.Since,
		Until:              f.Until,
	})
	if err != nil {
		// normalizedFilter holds only strings, bools and pointers to strings
		// or int64s, so Marshal cannot fail. Reaching here is a programming
		// error.
		panic(err)
	}
	digest := sha256.Sum256(canonical)
	return hex.EncodeToString(digest[:])
}

// invalid builds a client-facing validation error with message msg and the
// "INVALID" code. It is used for blank queries, rejected cursors and FTS
// syntax errors.
func invalid(msg string) error {
	base := culpa.New(msg)
	return culpa.WithCode(base, "INVALID")
}

// isFTSError reports whether err looks like SQLite rejecting the user-supplied
// FTS5 MATCH expression rather than a genuine database failure. Examples are
// "fts5: syntax error near ..." or an unterminated quoted string.
//
// Detection is a case-insensitive substring match on the error message; no
// driver-specific error codes are inspected. nil and sql.ErrNoRows, including
// wrapped forms, are never treated as FTS errors.
func isFTSError(err error) bool {
	// errors.Is(nil, …) is false, so without this guard err.Error() would
	// panic on a nil error.
	if err == nil || errors.Is(err, sql.ErrNoRows) {
		return false
	}
	msg := strings.ToLower(err.Error())
	for _, marker := range [...]string{"fts5", "fts syntax", "malformed match", "unterminated string"} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}