~bigbes/lethe

ref: ccee575b9f5055ffbd0f82cdd70bb82934240b54 lethe/internal/collector/parser/opencode/parser.go -rw-r--r-- 9.8 KiB
ccee575b — Eugene Blikh feat: add GFM markdown support including tables via remark-gfm 23 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
package opencode

import (
	"context"
	"crypto/sha256"
	"database/sql"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"

	"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"

	_ "modernc.org/sqlite"
)

// toolName is the identifier under which opencode transcripts are reported,
// both to the collector (via Tool) and in every emitted event.
const toolName = "opencode"

// Parser converts records from opencode's SQLite transcript store into lethe
// wire events. The zero value has an empty host; use New to set one.
type Parser struct {
	host string // copied into every event's Host field
}

// New returns a Parser whose emitted events are all labelled with host.
func New(host string) *Parser {
	p := new(Parser)
	p.host = host
	return p
}

// Tool reports the collector-facing name of this parser.
func (p *Parser) Tool() string { return toolName }

// Discover locates the opencode SQLite database for root. root may be the
// opencode data directory (e.g. ~/.local/share/opencode) containing
// opencode.db, or the path of the database file itself.
//
// A root that cannot be stat'ed is reported as an error. A directory without
// an opencode.db file, an opencode.db that is a directory, or a file root
// with any other base name all yield no files and no error.
func (p *Parser) Discover(root string) ([]parser.SourceFile, error) {
	info, err := os.Stat(root)
	if err != nil {
		return nil, err
	}

	dbPath := root
	if info.IsDir() {
		// Directory root: the database must sit directly inside it.
		dbPath = filepath.Join(root, "opencode.db")
		dbInfo, statErr := os.Stat(dbPath)
		switch {
		case os.IsNotExist(statErr):
			return nil, nil
		case statErr != nil:
			return nil, statErr
		case dbInfo.IsDir():
			return nil, nil
		}
		info = dbInfo
	} else if filepath.Base(root) != "opencode.db" {
		// File root: accept it only when it is the opencode database itself.
		return nil, nil
	}

	return []parser.SourceFile{{Path: dbPath, Size: info.Size()}}, nil
}

// Parse reads opencode messages whose message.rowid is at least since and
// maps each one, together with its parts, to a wire.TurnEvent. Messages that
// mapRecord rejects are skipped but still count as consumed.
//
// since and the returned offset are message.rowid markers: since is the next
// rowid to scan, and the returned offset is one past the last row that was
// completely handled (scanned, parts loaded, mapped or skipped). The offset
// is advanced only after that point. If an error interrupts the scan, the
// returned events and offset therefore agree, and a caller resuming from the
// offset re-reads the failing row instead of silently skipping it.
// Timestamps come from message.time_created, not from the rowid.
func (p *Parser) Parse(path string, since int64) ([]wire.TurnEvent, int64, error) {
	db, err := sql.Open("sqlite", readOnlyDSN(path))
	if err != nil {
		return nil, since, fmt.Errorf("open opencode database %q: %w", path, err)
	}
	defer func() { _ = db.Close() }()

	// One deadline bounds the whole scan, including the per-message part queries.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	rows, err := db.QueryContext(ctx, `
SELECT
    m.rowid,
    m.id,
    m.session_id,
    m.time_created,
    m.data,
    s.directory,
    s.time_created,
    s.slug,
    s.version
FROM message m
JOIN session s ON s.id = m.session_id
WHERE m.rowid >= ?
ORDER BY m.rowid ASC`, since)
	if err != nil {
		return nil, since, fmt.Errorf("query opencode messages: %w", err)
	}
	defer func() { _ = rows.Close() }()

	events := make([]wire.TurnEvent, 0)
	next := since
	for rows.Next() {
		var record messageRow
		if err := rows.Scan(&record.RowID, &record.ID, &record.SessionID, &record.TimeCreated, &record.Data, &record.Directory, &record.SessionCreated, &record.Slug, &record.Version); err != nil {
			return events, next, fmt.Errorf("scan opencode message row: %w", err)
		}
		// Parts are fetched with a separate query while the outer cursor is
		// still open.
		parts, err := loadParts(ctx, db, record.ID)
		if err != nil {
			return events, next, fmt.Errorf("load parts for opencode message %q: %w", record.ID, err)
		}
		if event, ok := p.mapRecord(path, record, parts); ok {
			events = append(events, event)
		}
		// Only now is the row fully consumed. Advancing before loadParts would
		// let a caller that persists partial progress skip a row whose parts
		// failed to load.
		if candidate := record.RowID + 1; candidate > next {
			next = candidate
		}
	}
	if err := rows.Err(); err != nil {
		return events, next, fmt.Errorf("iterate opencode messages: %w", err)
	}
	return events, next, nil
}

// messageRow is one result row of the message/session join issued by Parse.
// Fields follow the SELECT column order; adjacent fields of the same type
// share a declaration.
type messageRow struct {
	RowID           int64  // message.rowid, the incremental scan cursor
	ID, SessionID   string // message.id, message.session_id
	TimeCreated     int64  // message.time_created
	Data, Directory string // message.data (JSON document), session.directory
	SessionCreated  int64  // session.time_created
	Slug, Version   string // session.slug, session.version
}

// partRow is one row of the part table as read by loadParts.
type partRow struct {
	ID          string // part.id
	TimeCreated int64  // part.time_created
	Data        string // part.data (JSON document)
}

// messageData is the subset of an opencode message.data JSON document that
// this parser decodes. Keys not listed here are ignored when decoding.
type messageData struct {
	Role       string    `json:"role"`
	Agent      string    `json:"agent"`
	Mode       string    `json:"mode"`
	ModelID    string    `json:"modelID"`
	ProviderID string    `json:"providerID"`
	Tokens     tokenData `json:"tokens"`
	Cost       float64   `json:"cost"`
	Path       pathData  `json:"path"`
	Time       timeData  `json:"time"`

	// Raw holds a private copy of the undecoded document; it is filled in
	// after decoding, never from the JSON itself. The "-" tag matters:
	// encoding/json matches untagged field names case-insensitively, so a
	// "raw" key in the document would otherwise be decoded into this field.
	Raw json.RawMessage `json:"-"`
}

// tokenData holds the token counts found under message.data "tokens".
type tokenData struct {
	Input  int64 `json:"input"`
	Output int64 `json:"output"`
}

// pathData holds the directories found under message.data "path".
type pathData struct {
	CWD  string `json:"cwd"`
	Root string `json:"root"`
}

// timeData holds message.data "time". It is not read elsewhere in this file.
// NOTE(review): presumably Unix milliseconds like message.time_created; confirm.
type timeData struct {
	Created   int64 `json:"created"`
	Completed int64 `json:"completed"`
}

// partData is the subset of an opencode part.data JSON document that this
// parser decodes. Type selects the part kind ("text", "reasoning", "tool",
// ...). State is kept undecoded and interpreted only for tool parts.
type partData struct {
	Type   string          `json:"type"`
	Text   string          `json:"text"`
	Tool   string          `json:"tool"`
	CallID string          `json:"callID"`
	State  json.RawMessage `json:"state"`

	// Raw holds a private copy of the undecoded document; it is filled in
	// after decoding, never from the JSON itself. The "-" tag prevents a
	// case-insensitive "raw" key from being decoded into it.
	Raw json.RawMessage `json:"-"`
}

// toolState is the subset of a tool part's "state" object that is decoded.
// Output may be a JSON string or any other JSON value.
type toolState struct {
	Status string          `json:"status"`
	Output json.RawMessage `json:"output"`
}

// toolCallSummary describes one tool invocation. A slice of these is
// JSON-encoded into the event's ToolCalls, so the field order and JSON names
// below define that emitted format. Empty fields are omitted.
type toolCallSummary struct {
	Tool   string `json:"tool,omitempty"`
	CallID string `json:"call_id,omitempty"`
	Status string `json:"status,omitempty"`
	Output string `json:"output,omitempty"`
}

// loadParts returns every part row of message messageID, ordered by
// time_created and then id. The result is never nil on success. Any query,
// scan or iteration error aborts the load and returns a nil slice.
func loadParts(ctx context.Context, db *sql.DB, messageID string) ([]partRow, error) {
	const query = `
SELECT id, time_created, data
FROM part
WHERE message_id = ?
ORDER BY time_created ASC, id ASC`

	rs, err := db.QueryContext(ctx, query, messageID)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rs.Close() }()

	out := []partRow{}
	for rs.Next() {
		row := partRow{}
		if scanErr := rs.Scan(&row.ID, &row.TimeCreated, &row.Data); scanErr != nil {
			return nil, scanErr
		}
		out = append(out, row)
	}
	if iterErr := rs.Err(); iterErr != nil {
		return nil, iterErr
	}
	return out, nil
}

// mapRecord turns one message row and its parts into a wire.TurnEvent.
//
// It reports false when the message data JSON does not decode, when the role
// is not "user", "assistant" or "system", or when no part contributes text or
// a tool call. Text and reasoning parts are trimmed and joined with blank
// lines to form Content. When there is no text at all, the rendered tool
// summaries, one per line, are used instead. Tool parts are also serialized
// into ToolCalls, and the raw message JSON becomes the event Metadata.
func (p *Parser) mapRecord(path string, record messageRow, partRows []partRow) (wire.TurnEvent, bool) {
	var none wire.TurnEvent

	msg, ok := parseMessageData(record.Data)
	if !ok {
		return none, false
	}
	role := strings.TrimSpace(msg.Role)
	switch role {
	case "user", "assistant", "system":
	default:
		return none, false
	}

	var texts []string
	var toolCalls []toolCallSummary
	for i := range partRows {
		part, parsed := parsePartData(partRows[i].Data)
		if !parsed {
			continue // a malformed part is skipped; the message itself is kept
		}
		switch part.Type {
		case "tool":
			toolCalls = append(toolCalls, summarizeToolPart(part))
		case "text", "reasoning":
			if body := strings.TrimSpace(part.Text); body != "" {
				texts = append(texts, body)
			}
		}
	}

	// Every entry in texts is trimmed and non-empty, so "no textual content"
	// is exactly len(texts) == 0.
	var content string
	switch {
	case len(texts) > 0:
		content = strings.Join(texts, "\n\n")
	case len(toolCalls) > 0:
		lines := make([]string, len(toolCalls))
		for i := range toolCalls {
			lines[i] = renderToolSummary(toolCalls[i])
		}
		content = strings.Join(lines, "\n")
	default:
		return none, false
	}

	// time_created values are divided by 1000 below; presumably Unix
	// milliseconds converted to seconds (confirm against opencode's schema).
	event := wire.TurnEvent{
		Tool:      toolName,
		Host:      p.host,
		SessionID: record.SessionID,
		TurnID:    turnIDFor(record),
		Seq:       record.RowID,
		Role:      role,
		Timestamp: record.TimeCreated / 1000,
		Content:   content,
		SessionMeta: wire.SessionMeta{
			WorkingDir: workingDirFor(record, msg),
			SourceFile: path,
			StartedAt:  int64PtrOrNil(record.SessionCreated / 1000),
			Metadata:   sessionMetadata(record),
		},
		Metadata: json.RawMessage(record.Data),
	}

	if model := strings.TrimSpace(msg.ModelID); model != "" {
		event.Model = &model
	}
	// int64PtrOrNil yields nil for zero counts, leaving those fields unset.
	event.TokensIn = int64PtrOrNil(msg.Tokens.Input)
	event.TokensOut = int64PtrOrNil(msg.Tokens.Output)
	if cost := msg.Cost; cost != 0 {
		event.CostUSD = &cost
	}
	if len(toolCalls) > 0 {
		// Marshalling a slice of string-only structs cannot fail; the check is
		// defensive and leaves ToolCalls unset if it ever did.
		if payload, err := json.Marshal(toolCalls); err == nil {
			event.ToolCalls = payload
		}
	}
	return event, true
}

// parseMessageData decodes a message.data JSON document. On success the
// returned value also carries a private copy of the raw document in Raw. On
// malformed JSON it returns the zero messageData and false.
func parseMessageData(raw string) (messageData, bool) {
	payload := []byte(raw)
	var msg messageData
	if json.Unmarshal(payload, &msg) != nil {
		return messageData{}, false
	}
	msg.Raw = cloneRaw(payload)
	return msg, true
}

// parsePartData decodes a part.data JSON document. On success the returned
// value also carries a private copy of the raw document in Raw. On malformed
// JSON it returns the zero partData and false.
func parsePartData(raw string) (partData, bool) {
	payload := []byte(raw)
	var part partData
	if json.Unmarshal(payload, &part) != nil {
		return partData{}, false
	}
	part.Raw = cloneRaw(payload)
	return part, true
}

// summarizeToolPart builds the summary of a tool part: trimmed tool name and
// call id, plus the trimmed status and output from its state object. The
// status and output are left empty when the state is absent or not a
// decodable object.
func summarizeToolPart(part partData) toolCallSummary {
	out := toolCallSummary{
		Tool:   strings.TrimSpace(part.Tool),
		CallID: strings.TrimSpace(part.CallID),
	}
	if len(part.State) == 0 {
		return out
	}
	var st toolState
	if err := json.Unmarshal(part.State, &st); err != nil {
		return out
	}
	out.Status = strings.TrimSpace(st.Status)
	out.Output = safeOutputSummary(st.Output)
	return out
}

// safeOutputSummary renders a tool's raw JSON output as text. Missing,
// blank or null output becomes "". A JSON string is returned decoded, in
// full and untruncated. Any other JSON value is returned as its
// whitespace-trimmed source text.
func safeOutputSummary(raw json.RawMessage) string {
	src := strings.TrimSpace(string(raw))
	switch src {
	case "", "null":
		return ""
	}
	var decoded string
	if json.Unmarshal(raw, &decoded) != nil {
		return src
	}
	return decoded
}

// renderToolSummary formats a tool call as a single bracketed line:
//
//	<tool: NAME[ STATUS][ - FIRST-LINE-OF-OUTPUT]>
//
// NAME falls back to "tool" when the summary has no tool name. The output is
// condensed with summarizeText.
func renderToolSummary(summary toolCallSummary) string {
	name := summary.Tool
	if name == "" {
		name = "tool"
	}

	var b strings.Builder
	b.WriteString("<tool: ")
	b.WriteString(name)
	if summary.Status != "" {
		b.WriteString(" ")
		b.WriteString(summary.Status)
	}
	if summary.Output != "" {
		b.WriteString(" - ")
		b.WriteString(summarizeText(summary.Output))
	}
	b.WriteString(">")
	return b.String()
}

// sessionMetadata encodes the session slug and version as a JSON object,
// omitting empty values. It returns nil when both are empty, so no metadata
// is attached.
func sessionMetadata(record messageRow) json.RawMessage {
	if record.Slug == "" && record.Version == "" {
		return nil
	}
	encoded, err := json.Marshal(struct {
		Slug    string `json:"slug,omitempty"`
		Version string `json:"version,omitempty"`
	}{record.Slug, record.Version})
	if err != nil {
		return nil
	}
	return encoded
}

// workingDirFor picks the event's working directory. It returns the first
// non-blank of the session directory, the message's path.cwd and its
// path.root, trimmed of surrounding whitespace, or nil if all three are blank.
func workingDirFor(record messageRow, msg messageData) *string {
	candidates := [...]string{record.Directory, msg.Path.CWD, msg.Path.Root}
	for i := range candidates {
		if dir := strings.TrimSpace(candidates[i]); dir != "" {
			return &dir
		}
	}
	return nil
}

// turnIDFor returns the turn identifier for a message. It uses the opencode
// message id when that is non-blank, returned exactly as stored. Otherwise it
// falls back to a 16-hex-character id derived from the SHA-256 of
// "<session id>|<time_created>".
func turnIDFor(record messageRow) string {
	if strings.TrimSpace(record.ID) == "" {
		key := fmt.Sprintf("%s|%d", record.SessionID, record.TimeCreated)
		digest := sha256.Sum256([]byte(key))
		return hex.EncodeToString(digest[:8])
	}
	return record.ID
}

// summarizeText condenses text to one display line. It trims the input,
// keeps only the first line (trimmed again), and caps the result at 120
// characters. A longer line is cut to 117 characters plus "...".
//
// Length is measured in runes, not bytes. Byte slicing could split a
// multi-byte UTF-8 sequence and emit invalid UTF-8 (e.g. for Cyrillic or
// emoji tool output).
func summarizeText(text string) string {
	const (
		maxRunes  = 120
		keepRunes = maxRunes - len("...")
	)
	line := strings.TrimSpace(text)
	if idx := strings.IndexByte(line, '\n'); idx >= 0 {
		line = line[:idx]
	}
	line = strings.TrimSpace(line)
	// A string's rune count never exceeds its byte length, so short lines
	// skip the []rune conversion entirely.
	if len(line) > maxRunes {
		if runes := []rune(line); len(runes) > maxRunes {
			line = string(runes[:keepRunes]) + "..."
		}
	}
	return line
}

// int64PtrOrNil returns a pointer to a fresh copy of v. It returns nil when v
// is zero, so zero values are treated as "absent".
func int64PtrOrNil(v int64) *int64 {
	if v != 0 {
		out := v
		return &out
	}
	return nil
}

// cloneRaw returns an independent copy of raw with exactly len(raw) bytes.
// It returns nil when raw is nil or empty.
func cloneRaw(raw json.RawMessage) json.RawMessage {
	if len(raw) > 0 {
		dup := make(json.RawMessage, len(raw))
		copy(dup, raw)
		return dup
	}
	return nil
}

// readOnlyDSN builds the driver DSN for opening path read-only as a SQLite
// URI filename ("file:...?mode=ro"), with a 5000 ms busy timeout pragma.
//
// SQLite decodes %HH escapes in a URI path and treats '?' and '#' as the
// start of the query and fragment. A path containing '%', '?' or '#'
// therefore used to be mangled, or had its tail mistaken for options. Those
// three characters are percent-encoded so the path survives verbatim.
// strings.Replacer substitutes in a single pass, so "%25" is never
// re-escaped.
//
// NOTE(review): Windows drive-letter paths are passed through unchanged, as
// before.
func readOnlyDSN(path string) string {
	escaped := strings.NewReplacer("%", "%25", "?", "%3F", "#", "%23").Replace(path)
	return "file:" + escaped + "?mode=ro&_pragma=busy_timeout(5000)"
}

// Compile-time assertion that *Parser satisfies the collector's parser.Parser
// interface; the build fails here if a required method is missing.
var _ parser.Parser = (*Parser)(nil)