package opencode import ( "context" "crypto/sha256" "database/sql" "encoding/hex" "encoding/json" "fmt" "os" "path/filepath" "strings" "time" "sourcecraft.dev/bigbes/lethe/internal/collector/parser" "sourcecraft.dev/bigbes/lethe/internal/shared/wire" _ "modernc.org/sqlite" ) const toolName = "opencode" // Parser maps opencode SQLite transcript records into lethe wire events. type Parser struct { host string } // New builds a parser that stamps every emitted event with host. func New(host string) *Parser { return &Parser{host: host} } // Tool returns the collector-facing tool name. func (p *Parser) Tool() string { return toolName } // Discover returns the canonical opencode SQLite database below root. The root // may be ~/.local/share/opencode or the database path itself. func (p *Parser) Discover(root string) ([]parser.SourceFile, error) { info, err := os.Stat(root) if err != nil { return nil, err } if !info.IsDir() { if filepath.Base(root) != "opencode.db" { return nil, nil } return []parser.SourceFile{{Path: root, Size: info.Size()}}, nil } dbPath := filepath.Join(root, "opencode.db") info, err = os.Stat(dbPath) if err != nil { if os.IsNotExist(err) { return nil, nil } return nil, err } if info.IsDir() { return nil, nil } return []parser.SourceFile{{Path: dbPath, Size: info.Size()}}, nil } // Parse reads opencode messages starting at since. since and the returned marker // are message.rowid markers: since is the next rowid to scan, returned offset is // one past the last scanned row, and message.time_created remains the timestamp. func (p *Parser) Parse(path string, since int64) ([]wire.TurnEvent, int64, error) { db, err := sql.Open("sqlite", readOnlyDSN(path)) if err != nil { return nil, since, err } defer func() { _ = db.Close() }() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() rows, err := db.QueryContext(ctx, ` SELECT m.rowid, m.id, m.session_id, m.time_created, m.data, s.directory, s.time_created, s.slug, s.version FROM message m JOIN session s ON s.id = m.session_id WHERE m.rowid >= ? ORDER BY m.rowid ASC`, since) if err != nil { return nil, since, err } defer func() { _ = rows.Close() }() events := make([]wire.TurnEvent, 0) next := since for rows.Next() { var record messageRow if err := rows.Scan(&record.RowID, &record.ID, &record.SessionID, &record.TimeCreated, &record.Data, &record.Directory, &record.SessionCreated, &record.Slug, &record.Version); err != nil { return events, next, err } if candidate := record.RowID + 1; candidate > next { next = candidate } parts, err := loadParts(ctx, db, record.ID) if err != nil { return events, next, err } event, ok := p.mapRecord(path, record, parts) if ok { events = append(events, event) } } if err := rows.Err(); err != nil { return events, next, err } return events, next, nil } type messageRow struct { RowID int64 ID string SessionID string TimeCreated int64 Data string Directory string SessionCreated int64 Slug string Version string } type partRow struct { ID string TimeCreated int64 Data string } type messageData struct { Role string `json:"role"` Agent string `json:"agent"` Mode string `json:"mode"` ModelID string `json:"modelID"` ProviderID string `json:"providerID"` Tokens tokenData `json:"tokens"` Cost float64 `json:"cost"` Path pathData `json:"path"` Time timeData `json:"time"` Raw json.RawMessage } type tokenData struct { Input int64 `json:"input"` Output int64 `json:"output"` } type pathData struct { CWD string `json:"cwd"` Root string `json:"root"` } type timeData struct { Created int64 `json:"created"` Completed int64 `json:"completed"` } type partData struct { Type string `json:"type"` Text string `json:"text"` Tool string `json:"tool"` CallID string `json:"callID"` State json.RawMessage `json:"state"` Raw json.RawMessage } type toolState struct { Status string `json:"status"` Output json.RawMessage `json:"output"` } type toolCallSummary struct { Tool string `json:"tool,omitempty"` CallID string `json:"call_id,omitempty"` Status string `json:"status,omitempty"` Output string `json:"output,omitempty"` } func loadParts(ctx context.Context, db *sql.DB, messageID string) ([]partRow, error) { rows, err := db.QueryContext(ctx, ` SELECT id, time_created, data FROM part WHERE message_id = ? ORDER BY time_created ASC, id ASC`, messageID) if err != nil { return nil, err } defer func() { _ = rows.Close() }() parts := make([]partRow, 0) for rows.Next() { var part partRow if err := rows.Scan(&part.ID, &part.TimeCreated, &part.Data); err != nil { return nil, err } parts = append(parts, part) } if err := rows.Err(); err != nil { return nil, err } return parts, nil } func (p *Parser) mapRecord(path string, record messageRow, partRows []partRow) (wire.TurnEvent, bool) { msg, ok := parseMessageData(record.Data) if !ok { return wire.TurnEvent{}, false } role := strings.TrimSpace(msg.Role) if role != "user" && role != "assistant" && role != "system" { return wire.TurnEvent{}, false } texts := make([]string, 0) toolCalls := make([]toolCallSummary, 0) for _, row := range partRows { part, ok := parsePartData(row.Data) if !ok { continue } switch part.Type { case "text", "reasoning": if text := strings.TrimSpace(part.Text); text != "" { texts = append(texts, text) } case "tool": summary := summarizeToolPart(part) toolCalls = append(toolCalls, summary) } } content := strings.Join(texts, "\n\n") if strings.TrimSpace(content) == "" && len(toolCalls) == 0 { return wire.TurnEvent{}, false } if strings.TrimSpace(content) == "" && len(toolCalls) > 0 { parts := make([]string, 0, len(toolCalls)) for _, tc := range toolCalls { parts = append(parts, renderToolSummary(tc)) } content = strings.Join(parts, "\n") } event := wire.TurnEvent{ Tool: toolName, Host: p.host, SessionID: record.SessionID, TurnID: turnIDFor(record), Seq: record.RowID, Role: role, Timestamp: record.TimeCreated / 1000, Content: content, SessionMeta: wire.SessionMeta{ WorkingDir: workingDirFor(record, msg), SourceFile: path, StartedAt: int64PtrOrNil(record.SessionCreated / 1000), Metadata: sessionMetadata(record), }, Metadata: json.RawMessage(record.Data), } if model := strings.TrimSpace(msg.ModelID); model != "" { event.Model = &model } if msg.Tokens.Input != 0 { event.TokensIn = int64PtrOrNil(msg.Tokens.Input) } if msg.Tokens.Output != 0 { event.TokensOut = int64PtrOrNil(msg.Tokens.Output) } if msg.Cost != 0 { cost := msg.Cost event.CostUSD = &cost } if len(toolCalls) > 0 { payload, err := json.Marshal(toolCalls) if err == nil { event.ToolCalls = payload } } return event, true } func parseMessageData(raw string) (messageData, bool) { var msg messageData if err := json.Unmarshal([]byte(raw), &msg); err != nil { return messageData{}, false } msg.Raw = cloneRaw([]byte(raw)) return msg, true } func parsePartData(raw string) (partData, bool) { var part partData if err := json.Unmarshal([]byte(raw), &part); err != nil { return partData{}, false } part.Raw = cloneRaw([]byte(raw)) return part, true } func summarizeToolPart(part partData) toolCallSummary { summary := toolCallSummary{Tool: strings.TrimSpace(part.Tool), CallID: strings.TrimSpace(part.CallID)} var state toolState if len(part.State) > 0 && json.Unmarshal(part.State, &state) == nil { summary.Status = strings.TrimSpace(state.Status) summary.Output = safeOutputSummary(state.Output) } return summary } func safeOutputSummary(raw json.RawMessage) string { trimmed := strings.TrimSpace(string(raw)) if trimmed == "" || trimmed == "null" { return "" } var text string if err := json.Unmarshal(raw, &text); err == nil { return text } return trimmed } func renderToolSummary(summary toolCallSummary) string { label := summary.Tool if label == "" { label = "tool" } parts := []string{fmt.Sprintf("" } func sessionMetadata(record messageRow) json.RawMessage { payload := struct { Slug string `json:"slug,omitempty"` Version string `json:"version,omitempty"` }{Slug: record.Slug, Version: record.Version} b, err := json.Marshal(payload) if err != nil || string(b) == "{}" { return nil } return b } func workingDirFor(record messageRow, msg messageData) *string { for _, value := range []string{record.Directory, msg.Path.CWD, msg.Path.Root} { if value = strings.TrimSpace(value); value != "" { return &value } } return nil } func turnIDFor(record messageRow) string { if strings.TrimSpace(record.ID) != "" { return record.ID } sum := sha256.Sum256([]byte(fmt.Sprintf("%s|%d", record.SessionID, record.TimeCreated))) return hex.EncodeToString(sum[:8]) } func summarizeText(text string) string { text = strings.TrimSpace(text) if text == "" { return "" } line := text if idx := strings.IndexByte(line, '\n'); idx >= 0 { line = line[:idx] } line = strings.TrimSpace(line) if len(line) > 120 { line = line[:117] + "..." } return line } func int64PtrOrNil(v int64) *int64 { if v == 0 { return nil } return &v } func cloneRaw(raw json.RawMessage) json.RawMessage { if len(raw) == 0 { return nil } out := make([]byte, len(raw)) copy(out, raw) return out } func readOnlyDSN(path string) string { return "file:" + path + "?mode=ro&_pragma=busy_timeout(5000)" } var _ parser.Parser = (*Parser)(nil)