package opencode
import (
"context"
"crypto/sha256"
"database/sql"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
_ "modernc.org/sqlite"
)
// toolName is the identifier returned by Parser.Tool and written to the Tool
// field of every event this parser emits.
const toolName = "opencode"
// Parser maps opencode SQLite transcript records into lethe wire events.
type Parser struct {
// host is copied into the Host field of every event built by mapRecord.
host string
}
// New returns a Parser whose emitted events all carry host in their Host field.
func New(host string) *Parser {
	created := new(Parser)
	created.host = host
	return created
}
// Tool returns the collector-facing tool name, i.e. the toolName constant
// ("opencode"). It does not read any Parser state.
func (p *Parser) Tool() string {
return toolName
}
// Discover locates the opencode SQLite database for root.
//
// root is either the opencode data directory (for example
// ~/.local/share/opencode) or the path of the database file itself:
//   - a non-directory root is returned as the only source file when its base
//     name is "opencode.db"; any other file yields no sources;
//   - a directory root yields <root>/opencode.db when that path exists and is
//     not itself a directory.
//
// A missing database below an existing directory is not an error and yields a
// nil slice. A failure to stat root, or any stat failure on the database path
// other than "does not exist", is returned to the caller unchanged.
func (p *Parser) Discover(root string) ([]parser.SourceFile, error) {
	const dbName = "opencode.db"

	rootInfo, err := os.Stat(root)
	if err != nil {
		return nil, err
	}

	if !rootInfo.IsDir() {
		// root is a plain file: accept it only if it is the database itself.
		if filepath.Base(root) == dbName {
			return []parser.SourceFile{{Path: root, Size: rootInfo.Size()}}, nil
		}
		return nil, nil
	}

	candidate := filepath.Join(root, dbName)
	dbInfo, err := os.Stat(candidate)
	switch {
	case err == nil:
		// The entry exists; its kind is validated below.
	case os.IsNotExist(err):
		return nil, nil
	default:
		return nil, err
	}

	if dbInfo.IsDir() {
		return nil, nil
	}
	return []parser.SourceFile{{Path: candidate, Size: dbInfo.Size()}}, nil
}
// Parse reads opencode messages from the SQLite database at path, starting at
// the message.rowid marker since, and maps them to wire events.
//
// since is the first rowid to scan. The returned marker is one past the last
// row that was fully processed (scanned, its parts loaded, and offered to
// mapRecord), so it can be passed back as since on the next call. Rows that
// mapRecord rejects still advance the marker. message.time_created remains
// the event timestamp; rowid is used only for ordering and resumption.
//
// The database is opened read-only through readOnlyDSN and the whole scan,
// including the per-message part queries issued by loadParts, is bounded by a
// five minute timeout.
//
// On error the events collected so far are returned together with a marker
// that is at or before the first row that was not fully processed, so a
// caller that persists the marker retries that row instead of skipping it.
// Returned errors wrap the underlying driver error.
func (p *Parser) Parse(path string, since int64) ([]wire.TurnEvent, int64, error) {
	db, err := sql.Open("sqlite", readOnlyDSN(path))
	if err != nil {
		return nil, since, fmt.Errorf("opening opencode database %q: %w", path, err)
	}
	// The handle is read-only, so a close failure leaves nothing to report.
	defer func() { _ = db.Close() }()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	rows, err := db.QueryContext(ctx, `
		SELECT
			m.rowid,
			m.id,
			m.session_id,
			m.time_created,
			m.data,
			s.directory,
			s.time_created,
			s.slug,
			s.version
		FROM message m
		JOIN session s ON s.id = m.session_id
		WHERE m.rowid >= ?
		ORDER BY m.rowid ASC`, since)
	if err != nil {
		return nil, since, fmt.Errorf("querying opencode messages from rowid %d: %w", since, err)
	}
	defer func() { _ = rows.Close() }()

	events := make([]wire.TurnEvent, 0)
	next := since
	for rows.Next() {
		var record messageRow
		if err := rows.Scan(
			&record.RowID,
			&record.ID,
			&record.SessionID,
			&record.TimeCreated,
			&record.Data,
			&record.Directory,
			&record.SessionCreated,
			&record.Slug,
			&record.Version,
		); err != nil {
			return events, next, fmt.Errorf("scanning opencode message row: %w", err)
		}

		parts, err := loadParts(ctx, db, record.ID)
		if err != nil {
			// next still points at or before this row, so it is retried later.
			return events, next, fmt.Errorf("loading parts for opencode message %q: %w", record.ID, err)
		}

		if event, ok := p.mapRecord(path, record, parts); ok {
			events = append(events, event)
		}

		// Advance the marker only after the row has been fully handled; doing
		// it earlier would make a failed parts lookup skip this message.
		if candidate := record.RowID + 1; candidate > next {
			next = candidate
		}
	}
	if err := rows.Err(); err != nil {
		return events, next, fmt.Errorf("iterating opencode messages: %w", err)
	}
	return events, next, nil
}
// messageRow holds one result row of the message/session join issued by Parse.
// Fields are listed in the same order as the SELECT columns they are scanned
// from.
type messageRow struct {
// RowID is message.rowid; Parse uses it as the resume marker and mapRecord as
// the event Seq.
RowID int64
// ID is message.id; it is the key used to look up the message's parts.
ID string
// SessionID is message.session_id.
SessionID string
// TimeCreated is message.time_created. mapRecord divides it by 1000 for the
// event timestamp (presumably milliseconds to seconds; confirm against the
// opencode schema).
TimeCreated int64
// Data is message.data, a JSON document decoded by parseMessageData.
Data string
// Directory is session.directory, the first choice for the working directory.
Directory string
// SessionCreated is session.time_created, divided by 1000 for StartedAt.
SessionCreated int64
// Slug is session.slug and Version is session.version; both end up in the
// session metadata JSON.
Slug string
Version string
}
// partRow holds one row of the part table as read by loadParts.
type partRow struct {
// ID is part.id; it is the tie-breaker in loadParts' ordering.
ID string
// TimeCreated is part.time_created, the primary sort key in loadParts.
TimeCreated int64
// Data is part.data, a JSON document decoded by parsePartData.
Data string
}
// messageData is the subset of the message.data JSON document that this
// parser decodes.
type messageData struct {
// Role is trimmed by mapRecord and must be "user", "assistant" or "system"
// for the message to produce an event.
Role string `json:"role"`
// Agent and Mode are decoded but not read by the code visible in this file.
Agent string `json:"agent"`
Mode string `json:"mode"`
// ModelID, when non-blank after trimming, becomes the event's Model.
ModelID string `json:"modelID"`
// ProviderID is decoded but not read by the code visible in this file.
ProviderID string `json:"providerID"`
// Tokens and Cost populate the event's TokensIn/TokensOut and CostUSD when
// they are non-zero.
Tokens tokenData `json:"tokens"`
Cost float64 `json:"cost"`
// Path supplies fallback working directories when the session row has none.
Path pathData `json:"path"`
// Time is decoded but not read by the code visible in this file.
Time timeData `json:"time"`
// Raw is a private copy of the undecoded JSON, assigned by parseMessageData
// after a successful decode.
Raw json.RawMessage
}
// tokenData is the "tokens" object of a message; zero values are treated by
// mapRecord as "not reported".
type tokenData struct {
// Input feeds the event's TokensIn.
Input int64 `json:"input"`
// Output feeds the event's TokensOut.
Output int64 `json:"output"`
}
// pathData is the "path" object of a message. workingDirFor consults CWD and
// then Root, in that order, when the session directory is blank.
type pathData struct {
CWD string `json:"cwd"`
Root string `json:"root"`
}
// timeData is the "time" object of a message. Neither field is read by the
// code visible in this file; the units are not established here (presumably
// the same epoch units as message.time_created; confirm before relying on it).
type timeData struct {
Created int64 `json:"created"`
Completed int64 `json:"completed"`
}
// partData is the subset of the part.data JSON document that this parser
// decodes.
type partData struct {
// Type selects the handling in mapRecord: "text" and "reasoning" contribute
// Text to the event content, "tool" contributes a tool-call summary, and any
// other value is ignored.
Type string `json:"type"`
// Text is the body of a text or reasoning part.
Text string `json:"text"`
// Tool and CallID identify a tool part; both are trimmed by summarizeToolPart.
Tool string `json:"tool"`
CallID string `json:"callID"`
// State is kept undecoded here and parsed into toolState by summarizeToolPart.
State json.RawMessage `json:"state"`
// Raw is a private copy of the undecoded JSON, assigned by parsePartData
// after a successful decode.
Raw json.RawMessage
}
// toolState is the "state" object of a tool part, decoded by summarizeToolPart.
type toolState struct {
// Status is trimmed and copied into the tool-call summary.
Status string `json:"status"`
// Output stays undecoded so safeOutputSummary can accept either a JSON string
// or any other JSON value.
Output json.RawMessage `json:"output"`
}
// toolCallSummary is the condensed form of a tool part. mapRecord marshals a
// slice of these into the event's ToolCalls, and renderToolSummary turns one
// into a single line of content. Empty fields are omitted from the JSON.
type toolCallSummary struct {
Tool string `json:"tool,omitempty"`
CallID string `json:"call_id,omitempty"`
Status string `json:"status,omitempty"`
Output string `json:"output,omitempty"`
}
// loadParts fetches every part row attached to messageID, ordered by
// time_created and then id so the parts come back in a stable order.
//
// A message without parts yields an empty, non-nil slice. Any query, scan or
// iteration error discards the rows read so far and is returned unchanged.
func loadParts(ctx context.Context, db *sql.DB, messageID string) ([]partRow, error) {
	const query = `
SELECT id, time_created, data
FROM part
WHERE message_id = ?
ORDER BY time_created ASC, id ASC`

	cursor, err := db.QueryContext(ctx, query, messageID)
	if err != nil {
		return nil, err
	}
	defer func() { _ = cursor.Close() }()

	collected := []partRow{}
	for cursor.Next() {
		var (
			id      string
			created int64
			data    string
		)
		if scanErr := cursor.Scan(&id, &created, &data); scanErr != nil {
			return nil, scanErr
		}
		collected = append(collected, partRow{ID: id, TimeCreated: created, Data: data})
	}
	if iterErr := cursor.Err(); iterErr != nil {
		return nil, iterErr
	}
	return collected, nil
}
// mapRecord converts one message row and its parts into a wire event.
//
// The second result is false, and the event is the zero value, when the
// message JSON does not decode, when the trimmed role is not "user",
// "assistant" or "system", or when the parts yield neither text nor tool
// calls.
//
// Content is the trimmed text of every "text" and "reasoning" part joined by
// blank lines. If there is no text, content falls back to one rendered
// summary line per tool call. Parts whose JSON does not decode, and part
// types other than text/reasoning/tool, are ignored. The raw message JSON is
// attached as the event metadata, and tool calls (if any) are attached as
// marshaled JSON.
func (p *Parser) mapRecord(path string, record messageRow, partRows []partRow) (wire.TurnEvent, bool) {
	var none wire.TurnEvent

	msg, decoded := parseMessageData(record.Data)
	if !decoded {
		return none, false
	}

	role := strings.TrimSpace(msg.Role)
	switch role {
	case "user", "assistant", "system":
		// Supported roles; keep going.
	default:
		return none, false
	}

	snippets := []string{}
	calls := []toolCallSummary{}
	for i := range partRows {
		part, valid := parsePartData(partRows[i].Data)
		if !valid {
			continue
		}
		if part.Type == "tool" {
			calls = append(calls, summarizeToolPart(part))
			continue
		}
		if part.Type != "text" && part.Type != "reasoning" {
			continue
		}
		if trimmed := strings.TrimSpace(part.Text); trimmed != "" {
			snippets = append(snippets, trimmed)
		}
	}

	content := strings.Join(snippets, "\n\n")
	if strings.TrimSpace(content) == "" {
		if len(calls) == 0 {
			return none, false
		}
		// No prose at all: describe the turn by its tool calls instead.
		rendered := make([]string, len(calls))
		for i, call := range calls {
			rendered[i] = renderToolSummary(call)
		}
		content = strings.Join(rendered, "\n")
	}

	var event wire.TurnEvent
	event.Tool = toolName
	event.Host = p.host
	event.SessionID = record.SessionID
	event.TurnID = turnIDFor(record)
	event.Seq = record.RowID
	event.Role = role
	event.Timestamp = record.TimeCreated / 1000
	event.Content = content
	event.SessionMeta = wire.SessionMeta{
		WorkingDir: workingDirFor(record, msg),
		SourceFile: path,
		StartedAt:  int64PtrOrNil(record.SessionCreated / 1000),
		Metadata:   sessionMetadata(record),
	}
	event.Metadata = json.RawMessage(record.Data)

	if model := strings.TrimSpace(msg.ModelID); model != "" {
		event.Model = &model
	}
	if tokensIn := msg.Tokens.Input; tokensIn != 0 {
		event.TokensIn = &tokensIn
	}
	if tokensOut := msg.Tokens.Output; tokensOut != 0 {
		event.TokensOut = &tokensOut
	}
	if cost := msg.Cost; cost != 0 {
		event.CostUSD = &cost
	}
	if len(calls) > 0 {
		// A marshal failure leaves ToolCalls unset rather than failing the turn.
		if encoded, err := json.Marshal(calls); err == nil {
			event.ToolCalls = encoded
		}
	}
	return event, true
}
// parseMessageData decodes the JSON stored in message.data. It reports false
// with a zero messageData when the document does not decode; on success the
// result's Raw field holds an independent copy of the input bytes.
func parseMessageData(raw string) (messageData, bool) {
	payload := []byte(raw)
	decoded := messageData{}
	if json.Unmarshal(payload, &decoded) != nil {
		return messageData{}, false
	}
	decoded.Raw = cloneRaw(payload)
	return decoded, true
}
// parsePartData decodes the JSON stored in part.data. It reports false with a
// zero partData when the document does not decode; on success the result's
// Raw field holds an independent copy of the input bytes.
func parsePartData(raw string) (partData, bool) {
	payload := []byte(raw)
	decoded := partData{}
	if json.Unmarshal(payload, &decoded) != nil {
		return partData{}, false
	}
	decoded.Raw = cloneRaw(payload)
	return decoded, true
}
// summarizeToolPart condenses a tool part into a toolCallSummary. Tool and
// CallID are always filled in (trimmed); Status and Output are added only
// when the part carries a state object that decodes as toolState.
func summarizeToolPart(part partData) toolCallSummary {
	var out toolCallSummary
	out.Tool = strings.TrimSpace(part.Tool)
	out.CallID = strings.TrimSpace(part.CallID)

	if len(part.State) == 0 {
		return out
	}
	state := toolState{}
	if err := json.Unmarshal(part.State, &state); err != nil {
		// An undecodable state is ignored; the call is still reported.
		return out
	}
	out.Status = strings.TrimSpace(state.Status)
	out.Output = safeOutputSummary(state.Output)
	return out
}
// safeOutputSummary turns a raw JSON tool output into display text. Missing,
// blank and null outputs yield "". A JSON string is returned decoded (without
// quotes or escapes); any other JSON value is returned as its trimmed source
// text.
func safeOutputSummary(raw json.RawMessage) string {
	switch literal := strings.TrimSpace(string(raw)); literal {
	case "", "null":
		return ""
	default:
		var decoded string
		if json.Unmarshal(raw, &decoded) != nil {
			// Not a JSON string: fall back to the literal JSON text.
			return literal
		}
		return decoded
	}
}
// renderToolSummary formats a tool call as a single marker of the form
// "<tool: NAME STATUS - OUTPUT>".
//
// NAME falls back to "tool" when the summary has no tool name. STATUS is
// omitted when empty. OUTPUT is the one-line excerpt produced by
// summarizeText and is omitted, together with its " - " separator, when that
// excerpt is empty. This covers whitespace-only output, which would otherwise
// render as a dangling "<tool: NAME - >".
func renderToolSummary(summary toolCallSummary) string {
	label := summary.Tool
	if label == "" {
		label = "tool"
	}

	rendered := fmt.Sprintf("<tool: %s", label)
	if summary.Status != "" {
		rendered += " " + summary.Status
	}
	// Test the excerpt, not the raw output: summarizeText trims, so a
	// non-empty Output can still produce nothing worth showing.
	if excerpt := summarizeText(summary.Output); excerpt != "" {
		rendered += " - " + excerpt
	}
	return rendered + ">"
}
// sessionMetadata encodes the session slug and version as a JSON object,
// omitting whichever of the two is empty. It returns nil when both are empty
// (the object would be "{}") or when marshaling fails.
func sessionMetadata(record messageRow) json.RawMessage {
	type meta struct {
		Slug    string `json:"slug,omitempty"`
		Version string `json:"version,omitempty"`
	}

	encoded, err := json.Marshal(meta{Slug: record.Slug, Version: record.Version})
	switch {
	case err != nil:
		return nil
	case string(encoded) == "{}":
		return nil
	}
	return encoded
}
// workingDirFor picks the working directory for an event: the session
// directory, then the message's path.cwd, then its path.root. The first value
// that is non-blank after trimming wins and is returned trimmed; nil means
// none of them was set.
func workingDirFor(record messageRow, msg messageData) *string {
	candidates := [...]string{record.Directory, msg.Path.CWD, msg.Path.Root}
	for i := range candidates {
		dir := strings.TrimSpace(candidates[i])
		if dir == "" {
			continue
		}
		return &dir
	}
	return nil
}
// turnIDFor returns the turn identifier for a message. The message id is used
// verbatim when it is not blank. Otherwise the id is derived from the session
// id and creation time: the first 8 bytes of the SHA-256 of
// "<session>|<time_created>", hex-encoded (16 characters).
func turnIDFor(record messageRow) string {
	if strings.TrimSpace(record.ID) == "" {
		seed := fmt.Sprintf("%s|%d", record.SessionID, record.TimeCreated)
		digest := sha256.Sum256([]byte(seed))
		return hex.EncodeToString(digest[:8])
	}
	return record.ID
}
// summarizeText reduces text to a short single-line excerpt: the first line
// of the trimmed input, itself trimmed, capped at 120 bytes. Longer lines are
// cut to at most 117 bytes and suffixed with "...". Blank input yields "".
//
// The cut is moved back to the start of a UTF-8 sequence so that a multi-byte
// character is never split, which would otherwise leave invalid UTF-8 in the
// excerpt.
func summarizeText(text string) string {
	const (
		maxBytes = 120
		ellipsis = "..."
	)

	line := strings.TrimSpace(text)
	if idx := strings.IndexByte(line, '\n'); idx >= 0 {
		line = strings.TrimSpace(line[:idx])
	}
	if len(line) <= maxBytes {
		return line
	}

	cut := maxBytes - len(ellipsis)
	// line[cut] is the first dropped byte. While it is a UTF-8 continuation
	// byte (10xxxxxx) the cut sits inside a character, so step back. A valid
	// sequence has at most three continuation bytes; the bound keeps
	// malformed input from dragging the cut arbitrarily far.
	for back := 0; back < 3 && cut > 0 && line[cut]&0xC0 == 0x80; back++ {
		cut--
	}
	return line[:cut] + ellipsis
}
// int64PtrOrNil returns a pointer to a copy of v, or nil when v is zero, so
// that zero can stand for "value not present".
func int64PtrOrNil(v int64) *int64 {
	var ptr *int64
	if v != 0 {
		ptr = &v
	}
	return ptr
}
// cloneRaw returns an independent copy of raw so later changes to the source
// bytes do not affect the result. Empty or nil input yields nil.
func cloneRaw(raw json.RawMessage) json.RawMessage {
	if len(raw) == 0 {
		return nil
	}
	return append(json.RawMessage(nil), raw...)
}
// readOnlyDSN builds the SQLite "file:" URI used to open the database at path
// read-only (mode=ro) with a 5000 ms busy timeout.
//
// Characters that have a special meaning in a URI are percent-encoded first:
// "%" (escape introducer), "?" (start of the query part) and "#" (start of
// the fragment). Without this, a path containing any of them is cut short or
// decoded into a different file name. Paths without those characters produce
// exactly the same DSN as before.
//
// NOTE(review): Windows-style paths (backslashes, drive letters) are passed
// through as-is, as they were before; confirm whether that platform matters.
func readOnlyDSN(path string) string {
	// "%" is listed first for readability only; Replacer scans the input once,
	// so the "%" it inserts is never escaped again.
	escaped := strings.NewReplacer("%", "%25", "?", "%3f", "#", "%23").Replace(path)
	return "file:" + escaped + "?mode=ro&_pragma=busy_timeout(5000)"
}
// Compile-time assertion that *Parser implements the parser.Parser interface.
var _ parser.Parser = (*Parser)(nil)