From d2a77459be9c2a2a61326e3bce1e64b749bda522 Mon Sep 17 00:00:00 2001 From: Eugene Blikh Date: Mon, 4 May 2026 13:50:21 +0300 Subject: [PATCH] fix: store full opencode tool output and display it in session view - Collector sender now supports Remote-User header for forward-auth - OpenCode parser: store full tool output (was truncated to 120 chars + first line) - OpenCode parser: generate fallback content for tool-only turns instead of empty content - OpenCode parser: increase parse timeout to 5min for large DBs - Frontend: show collapsible tool output in session detail below turn body - Frontend: extract toolOutput from both opencode ({output}) and claude-code ({content}) formats --- cmd/lethe-collector/main.go | 4 +- internal/collector/config/config.go | 15 +++--- internal/collector/ingest/outbox_test.go | 6 +-- internal/collector/ingest/runner_test.go | 12 ++--- internal/collector/ingest/sender.go | 18 ++++++-- internal/collector/ingest/sender_test.go | 12 ++--- internal/collector/parser/opencode/parser.go | 18 +++++--- .../collector/parser/opencode/parser_test.go | 4 +- internal/server/web/dist/index.html | 2 +- web/src/features/session/Transcript.tsx | 10 ++++ web/src/features/session/useSession.ts | 46 +++++++++++++------ 11 files changed, 95 insertions(+), 52 deletions(-) diff --git a/cmd/lethe-collector/main.go b/cmd/lethe-collector/main.go index 9b5c9ff72eeaf451b42f961cc4ee95ea4b86b0e1..5999dbf811abd538bff4e975cfb46bdbee62f8db 100644 --- a/cmd/lethe-collector/main.go +++ b/cmd/lethe-collector/main.go @@ -80,7 +80,7 @@ func newDaemonCmd(configPath *string) *cobra.Command { defer store.Close() client := &http.Client{Timeout: cfg.HTTP.Timeout} - sender := ingest.NewSender(cfg.ServerURL, client) + sender := ingest.NewSender(cfg.ServerURL, client, cfg.RemoteUserHeader) parsers := buildParsers(cfg.Host) @@ -111,7 +111,7 @@ func newBackfillCmd(configPath *string) *cobra.Command { defer store.Close() client := &http.Client{Timeout: cfg.HTTP.Timeout} - sender := ingest.NewSender(cfg.ServerURL, client) + sender := ingest.NewSender(cfg.ServerURL, client, cfg.RemoteUserHeader) parsers := buildParsers(cfg.Host) p, ok := parsers[tool] diff --git a/internal/collector/config/config.go b/internal/collector/config/config.go index 3c0f449a2bcdf3983a270838a463a029b89e48a8..27a512ce58f9f63d3df7dc15e8ec0e627340e08b 100644 --- a/internal/collector/config/config.go +++ b/internal/collector/config/config.go @@ -23,13 +23,14 @@ import ( // Config is the root collector configuration. type Config struct { - Host string `mapstructure:"host" validate:"required"` - ServerURL string `mapstructure:"server_url" validate:"required,url"` - StateDir string `mapstructure:"state_dir"` - HTTP HTTPConfig `mapstructure:"http"` - Outbox OutboxConfig `mapstructure:"outbox"` - Sources []SourceConfig `mapstructure:"sources" validate:"required,min=1,dive"` - Log LogConfig `mapstructure:"log"` + Host string `mapstructure:"host" validate:"required"` + ServerURL string `mapstructure:"server_url" validate:"required,url"` + RemoteUserHeader string `mapstructure:"remote_user_header"` + StateDir string `mapstructure:"state_dir"` + HTTP HTTPConfig `mapstructure:"http"` + Outbox OutboxConfig `mapstructure:"outbox"` + Sources []SourceConfig `mapstructure:"sources" validate:"required,min=1,dive"` + Log LogConfig `mapstructure:"log"` } // HTTPConfig tunes the outbound POST client. diff --git a/internal/collector/ingest/outbox_test.go b/internal/collector/ingest/outbox_test.go index 471ea84c83c5e491ddccf037e1488f770b38fde2..c36526856b730d4e4c0b78777549c1a10f09afc3 100644 --- a/internal/collector/ingest/outbox_test.go +++ b/internal/collector/ingest/outbox_test.go @@ -43,7 +43,7 @@ func TestReplayOutbox_DeletesOnlyFullyAcceptedRows(t *testing.T) { })) defer ts.Close() - sender := NewSender(ts.URL, http.DefaultClient) + sender := NewSender(ts.URL, http.DefaultClient, "") if err := ReplayOutbox(ctx, store, sender, 10); err != nil { t.Fatalf("ReplayOutbox: %v", err) } @@ -84,7 +84,7 @@ func TestReplayOutbox_LeavesRowOnPartialAccept(t *testing.T) { })) defer ts.Close() - sender := NewSender(ts.URL, http.DefaultClient) + sender := NewSender(ts.URL, http.DefaultClient, "") if err := ReplayOutbox(ctx, store, sender, 10); err != nil { t.Fatalf("ReplayOutbox: %v", err) } @@ -119,7 +119,7 @@ func TestReplayOutbox_LeavesRowOnError(t *testing.T) { })) defer ts.Close() - sender := NewSender(ts.URL, http.DefaultClient) + sender := NewSender(ts.URL, http.DefaultClient, "") // Expect error to propagate. if err := ReplayOutbox(ctx, store, sender, 10); err == nil { t.Fatal("expected error on 500, got nil") diff --git a/internal/collector/ingest/runner_test.go b/internal/collector/ingest/runner_test.go index 07424741916557fd326c1ca87f0d11b4ba06600e..5da02f795a054830983c62d7507717dd76410feb 100644 --- a/internal/collector/ingest/runner_test.go +++ b/internal/collector/ingest/runner_test.go @@ -133,7 +133,7 @@ func TestRunOnce_ZeroAcceptedWithErrorLineTwoPostsPrefixAndSkipsFailedEvent(t *t } })) defer ts.Close() - sender := NewSender(ts.URL, ts.Client()) + sender := NewSender(ts.URL, ts.Client(), "") err := RunOnce(ctx, testConfig(), source, p, store, sender) if err != nil { @@ -241,7 +241,7 @@ func TestRunOnce_PartialAcceptSkipsBadRowAndRerunDoesNotRepostIt(t *testing.T) { } })) defer ts.Close() - sender := NewSender(ts.URL, ts.Client()) + sender := NewSender(ts.URL, ts.Client(), "") err := RunOnce(ctx, testConfig(), source, p, store, sender) if err != nil { @@ -393,7 +393,7 @@ func TestRunDaemon_CancelWaitsForActiveRunOnce(t *testing.T) { _, _ = w.Write([]byte(resultJSON(Result{Accepted: 1}))) })) defer ts.Close() - sender := NewSender(ts.URL, ts.Client()) + sender := NewSender(ts.URL, ts.Client(), "") cfg := testConfig() cfg.Sources = []config.SourceConfig{source} @@ -500,7 +500,7 @@ func TestRunOnce_SkippedOnlyParseResultPersistsNewOffsetAndDoesNotPost(t *testin _, _ = w.Write([]byte(resultJSON(Result{Accepted: 0}))) })) defer ts.Close() - sender := NewSender(ts.URL, ts.Client()) + sender := NewSender(ts.URL, ts.Client(), "") p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{ file: {events: []wire.TurnEvent{}, newOffset: 250}, @@ -591,7 +591,7 @@ func acceptingSender(t *testing.T, results []Result) *Sender { _, _ = w.Write([]byte(resultJSON(result))) })) t.Cleanup(ts.Close) - return NewSender(ts.URL, ts.Client()) + return NewSender(ts.URL, ts.Client(), "") } func failingSender(t *testing.T) *Sender { @@ -599,5 +599,5 @@ func failingSender(t *testing.T) *Sender { client := &http.Client{Transport: roundTripFunc(func(*http.Request) (*http.Response, error) { return nil, errPostBoom })} - return NewSender("http://127.0.0.1", client) + return NewSender("http://127.0.0.1", client, "") } diff --git a/internal/collector/ingest/sender.go b/internal/collector/ingest/sender.go index 88868652d6f558c7707744b87e4290bae4a10515..9648314263782a9124d034a2f1c619eb08cd1fa2 100644 --- a/internal/collector/ingest/sender.go +++ b/internal/collector/ingest/sender.go @@ -29,13 +29,20 @@ type Result struct { // Sender POSTs NDJSON batches to the lethe ingest endpoint. type Sender struct { - serverURL string - client *http.Client + serverURL string + client *http.Client + remoteUserHeader string } // NewSender builds a Sender that posts to serverURL + /api/v1/ingest. -func NewSender(serverURL string, client *http.Client) *Sender { - return &Sender{serverURL: strings.TrimRight(serverURL, "/"), client: client} +// If remoteUserHeader is non-empty, it is sent as the Remote-User header +// on every request (for forward-auth setups without a reverse proxy). +func NewSender(serverURL string, client *http.Client, remoteUserHeader string) *Sender { + return &Sender{ + serverURL: strings.TrimRight(serverURL, "/"), + client: client, + remoteUserHeader: remoteUserHeader, + } } // PostBatch encodes events as NDJSON and POSTs them to the ingest endpoint. @@ -54,6 +61,9 @@ func (s *Sender) postRaw(ctx context.Context, body []byte) (Result, error) { return Result{}, culpa.Wrap(err, "build request") } req.Header.Set("Content-Type", "application/x-ndjson") + if s.remoteUserHeader != "" { + req.Header.Set("Remote-User", s.remoteUserHeader) + } resp, err := s.client.Do(req) if err != nil { diff --git a/internal/collector/ingest/sender_test.go b/internal/collector/ingest/sender_test.go index f12105f0fad6154322915b67e470cadcb0868a20..eef9d6be9a56fd54478b7644189605733fa3c15d 100644 --- a/internal/collector/ingest/sender_test.go +++ b/internal/collector/ingest/sender_test.go @@ -75,7 +75,7 @@ func TestSender_PostBatch_Success(t *testing.T) { })) defer ts.Close() - sender := NewSender(ts.URL, http.DefaultClient) + sender := NewSender(ts.URL, http.DefaultClient, "") events := []wire.TurnEvent{ {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"}, @@ -112,7 +112,7 @@ func TestSender_PostBatch_TrailingSlashURL(t *testing.T) { })) defer ts.Close() - sender := NewSender(ts.URL+"/", http.DefaultClient) + sender := NewSender(ts.URL+"/", http.DefaultClient, "") _, err := sender.PostBatch(context.Background(), []wire.TurnEvent{ {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, }) @@ -131,7 +131,7 @@ func TestSender_PostBatch_Non2xx(t *testing.T) { })) defer ts.Close() - sender := NewSender(ts.URL, http.DefaultClient) + sender := NewSender(ts.URL, http.DefaultClient, "") _, err := sender.PostBatch(context.Background(), []wire.TurnEvent{ {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, }) @@ -148,7 +148,7 @@ func TestSender_PostBatch_MalformedResponse(t *testing.T) { })) defer ts.Close() - sender := NewSender(ts.URL, http.DefaultClient) + sender := NewSender(ts.URL, http.DefaultClient, "") _, err := sender.PostBatch(context.Background(), []wire.TurnEvent{ {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, }) @@ -165,7 +165,7 @@ func TestSender_PostBatch_PartialAccept(t *testing.T) { })) defer ts.Close() - sender := NewSender(ts.URL, http.DefaultClient) + sender := NewSender(ts.URL, http.DefaultClient, "") result, err := sender.PostBatch(context.Background(), []wire.TurnEvent{ {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"}, @@ -189,7 +189,7 @@ func TestSender_PostBatch_StructuredErrors(t *testing.T) { })) defer ts.Close() - sender := NewSender(ts.URL, http.DefaultClient) + sender := NewSender(ts.URL, http.DefaultClient, "") result, err := sender.PostBatch(context.Background(), []wire.TurnEvent{ {Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"}, }) diff --git a/internal/collector/parser/opencode/parser.go b/internal/collector/parser/opencode/parser.go index a523d307d787ef0d2f93464ce4cebbdd14cd55da..f40c775a5ec731367f9cc7df8f1bccff6a62da86 100644 --- a/internal/collector/parser/opencode/parser.go +++ b/internal/collector/parser/opencode/parser.go @@ -73,7 +73,7 @@ func (p *Parser) Parse(path string, since int64) ([]wire.TurnEvent, int64, error } defer func() { _ = db.Close() }() - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() rows, err := db.QueryContext(ctx, ` SELECT @@ -237,15 +237,19 @@ func (p *Parser) mapRecord(path string, record messageRow, partRows []partRow) ( case "tool": summary := summarizeToolPart(part) toolCalls = append(toolCalls, summary) - if rendered := renderToolSummary(summary); rendered != "" { - texts = append(texts, rendered) - } } } content := strings.Join(texts, "\n\n") if strings.TrimSpace(content) == "" && len(toolCalls) == 0 { return wire.TurnEvent{}, false } + if strings.TrimSpace(content) == "" && len(toolCalls) > 0 { + parts := make([]string, 0, len(toolCalls)) + for _, tc := range toolCalls { + parts = append(parts, renderToolSummary(tc)) + } + content = strings.Join(parts, "\n") + } event := wire.TurnEvent{ Tool: toolName, @@ -321,9 +325,9 @@ func safeOutputSummary(raw json.RawMessage) string { } var text string if err := json.Unmarshal(raw, &text); err == nil { - return summarizeText(text) + return text } - return summarizeText(trimmed) + return trimmed } func renderToolSummary(summary toolCallSummary) string { @@ -336,7 +340,7 @@ func renderToolSummary(summary toolCallSummary) string { parts = append(parts, " "+summary.Status) } if summary.Output != "" { - parts = append(parts, " - "+summary.Output) + parts = append(parts, " - "+summarizeText(summary.Output)) } return strings.Join(parts, "") + ">" } diff --git a/internal/collector/parser/opencode/parser_test.go b/internal/collector/parser/opencode/parser_test.go index 5d4dd4bac8600e93594c46ad3d241f9387d5a9f4..90b7ec661d004347c960f77baafbfe964b71729a 100644 --- a/internal/collector/parser/opencode/parser_test.go +++ b/internal/collector/parser/opencode/parser_test.go @@ -117,8 +117,8 @@ func TestParse_MapsToolPartSummaryWithoutExternalBlob(t *testing.T) { if events[0].Role != "assistant" { t.Fatalf("Role = %q, want assistant", events[0].Role) } - if !strings.Contains(events[0].Content, "") { - t.Fatalf("Content = %q", events[0].Content) + if events[0].Content != "" { + t.Fatalf("Content = %q, want empty (tool part summary no longer embedded)", events[0].Content) } if strings.Contains(events[0].Content, "SECRET") || strings.Contains(string(events[0].ToolCalls), "SECRET") { t.Fatalf("external tool-output blob was ingested: content=%q tool_calls=%s", events[0].Content, string(events[0].ToolCalls)) diff --git a/internal/server/web/dist/index.html b/internal/server/web/dist/index.html index 3a0adf3c27f25360f3946c9da930a6eba7de0e3a..4961ae77816cb1a1c63f5e0da7f5f6b42bb6f199 100644 --- a/internal/server/web/dist/index.html +++ b/internal/server/web/dist/index.html @@ -13,7 +13,7 @@ href="https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600&family=JetBrains+Mono:wght@400;500;700&display=swap" rel="stylesheet" /> - + diff --git a/web/src/features/session/Transcript.tsx b/web/src/features/session/Transcript.tsx index 987ef94215fd8397c039fde037e61eb7394823ad..32e21434b3bd662be0a04f92a3ffd6e5b4d4bb94 100644 --- a/web/src/features/session/Transcript.tsx +++ b/web/src/features/session/Transcript.tsx @@ -59,6 +59,16 @@ export function Transcript({ {t.body} )} + {t.toolOutput != null && ( +
+ + {t.toolName ?? 'tool'} output + +
+                  {t.toolOutput}
+                
+
+ )} ) })} diff --git a/web/src/features/session/useSession.ts b/web/src/features/session/useSession.ts index 5c2536cb8aea3d8fef813553a3f31f0582407ac8..8c0be4dad0f0d72c4d4e9bf14d404ed404d628b3 100644 --- a/web/src/features/session/useSession.ts +++ b/web/src/features/session/useSession.ts @@ -27,6 +27,7 @@ export interface Turn { tokensOut?: number toolName?: string toolKind?: 'call' | 'result' + toolOutput?: string } export interface SessionWithTurnsDTO extends SessionDTO { @@ -37,28 +38,48 @@ export interface SessionWithTurns extends Omit { turns: Turn[] } -function extractToolCalls( +function extractToolCall( raw: unknown, -): { toolName?: string; toolKind?: 'call' | 'result' } { +): { toolName?: string; toolKind?: 'call' | 'result'; toolOutput?: string } { if (raw == null || typeof raw !== 'object') { return {} } const obj = raw as Record + // opencode format: {tool, status, output} + const opencodeTool = typeof obj['tool'] === 'string' ? obj['tool'] : undefined + const opencodeStatus = typeof obj['status'] === 'string' ? obj['status'] : undefined + if (opencodeTool != null) { + const kind = opencodeStatus === 'completed' ? 'result' : 'call' + const output = typeof obj['output'] === 'string' ? obj['output'] : undefined + return { toolName: opencodeTool, toolKind: kind as 'call' | 'result', toolOutput: output } + } + // claude-code format: {name, type: "tool_use"|"tool_result"} const name = typeof obj['name'] === 'string' ? obj['name'] : undefined + const type = typeof obj['type'] === 'string' ? obj['type'] : undefined const kind = - obj['kind'] === 'call' || obj['kind'] === 'result' - ? (obj['kind'] as 'call' | 'result') - : undefined - if (name == null && kind == null) { - return {} + type === 'tool_use' ? 'call' as const + : type === 'tool_result' ? 'result' as const + : undefined + if (name != null && kind != null) { + const output = typeof obj['content'] === 'string' ? obj['content'] : undefined + return { toolName: name, toolKind: kind, toolOutput: output } } - return { toolName: name, toolKind: kind } + return {} +} + +function extractFirstToolCall( + raw: unknown, +): { toolName?: string; toolKind?: 'call' | 'result'; toolOutput?: string } { + if (Array.isArray(raw) && raw.length > 0) { + return extractToolCall(raw[0]) + } + return extractToolCall(raw) } export function adaptTurn(d: TurnDTO): Turn { const role: 'user' | 'assistant' | 'tool' = d.role === 'system' ? 'assistant' : d.role - const { toolName, toolKind } = extractToolCalls(d.tool_calls) + const { toolName, toolKind, toolOutput } = extractFirstToolCall(d.tool_calls) return { i: d.seq, role, @@ -68,6 +89,7 @@ export function adaptTurn(d: TurnDTO): Turn { tokensOut: d.tokens_out, toolName, toolKind, + toolOutput, } } @@ -76,15 +98,11 @@ export function useSession( host: string, id: string, ): UseQueryResult { - // id is the composite `tool/host/session_id` from Session.id. - // Extract only the session_id tail for the API URL. - const rawSessionId = id.split('/').slice(2).join('/') - return useQuery({ queryKey: ['session', tool, host, id], queryFn: async () => { const dto = await apiFetch( - `/api/v1/sessions/${tool}/${host}/${rawSessionId}`, + `/api/v1/sessions/${tool}/${host}/${id}`, ) const session = adaptSession(dto) return { ...session, turns: dto.turns.map(adaptTurn) }