M cmd/lethe-collector/main.go => cmd/lethe-collector/main.go +2 -2
@@ 80,7 80,7 @@ func newDaemonCmd(configPath *string) *cobra.Command {
defer store.Close()
client := &http.Client{Timeout: cfg.HTTP.Timeout}
- sender := ingest.NewSender(cfg.ServerURL, client)
+ sender := ingest.NewSender(cfg.ServerURL, client, cfg.RemoteUserHeader)
parsers := buildParsers(cfg.Host)
@@ 111,7 111,7 @@ func newBackfillCmd(configPath *string) *cobra.Command {
defer store.Close()
client := &http.Client{Timeout: cfg.HTTP.Timeout}
- sender := ingest.NewSender(cfg.ServerURL, client)
+ sender := ingest.NewSender(cfg.ServerURL, client, cfg.RemoteUserHeader)
parsers := buildParsers(cfg.Host)
p, ok := parsers[tool]
M internal/collector/config/config.go => internal/collector/config/config.go +8 -7
@@ 23,13 23,14 @@ import (
// Config is the root collector configuration.
type Config struct {
- Host string `mapstructure:"host" validate:"required"`
- ServerURL string `mapstructure:"server_url" validate:"required,url"`
- StateDir string `mapstructure:"state_dir"`
- HTTP HTTPConfig `mapstructure:"http"`
- Outbox OutboxConfig `mapstructure:"outbox"`
- Sources []SourceConfig `mapstructure:"sources" validate:"required,min=1,dive"`
- Log LogConfig `mapstructure:"log"`
+ Host string `mapstructure:"host" validate:"required"`
+ ServerURL string `mapstructure:"server_url" validate:"required,url"`
+ RemoteUserHeader string `mapstructure:"remote_user_header"`
+ StateDir string `mapstructure:"state_dir"`
+ HTTP HTTPConfig `mapstructure:"http"`
+ Outbox OutboxConfig `mapstructure:"outbox"`
+ Sources []SourceConfig `mapstructure:"sources" validate:"required,min=1,dive"`
+ Log LogConfig `mapstructure:"log"`
}
// HTTPConfig tunes the outbound POST client.
M internal/collector/ingest/outbox_test.go => internal/collector/ingest/outbox_test.go +3 -3
@@ 43,7 43,7 @@ func TestReplayOutbox_DeletesOnlyFullyAcceptedRows(t *testing.T) {
}))
defer ts.Close()
- sender := NewSender(ts.URL, http.DefaultClient)
+ sender := NewSender(ts.URL, http.DefaultClient, "")
if err := ReplayOutbox(ctx, store, sender, 10); err != nil {
t.Fatalf("ReplayOutbox: %v", err)
}
@@ 84,7 84,7 @@ func TestReplayOutbox_LeavesRowOnPartialAccept(t *testing.T) {
}))
defer ts.Close()
- sender := NewSender(ts.URL, http.DefaultClient)
+ sender := NewSender(ts.URL, http.DefaultClient, "")
if err := ReplayOutbox(ctx, store, sender, 10); err != nil {
t.Fatalf("ReplayOutbox: %v", err)
}
@@ 119,7 119,7 @@ func TestReplayOutbox_LeavesRowOnError(t *testing.T) {
}))
defer ts.Close()
- sender := NewSender(ts.URL, http.DefaultClient)
+ sender := NewSender(ts.URL, http.DefaultClient, "")
// Expect error to propagate.
if err := ReplayOutbox(ctx, store, sender, 10); err == nil {
t.Fatal("expected error on 500, got nil")
M internal/collector/ingest/runner_test.go => internal/collector/ingest/runner_test.go +6 -6
@@ 133,7 133,7 @@ func TestRunOnce_ZeroAcceptedWithErrorLineTwoPostsPrefixAndSkipsFailedEvent(t *t
}
}))
defer ts.Close()
- sender := NewSender(ts.URL, ts.Client())
+ sender := NewSender(ts.URL, ts.Client(), "")
err := RunOnce(ctx, testConfig(), source, p, store, sender)
if err != nil {
@@ 241,7 241,7 @@ func TestRunOnce_PartialAcceptSkipsBadRowAndRerunDoesNotRepostIt(t *testing.T) {
}
}))
defer ts.Close()
- sender := NewSender(ts.URL, ts.Client())
+ sender := NewSender(ts.URL, ts.Client(), "")
err := RunOnce(ctx, testConfig(), source, p, store, sender)
if err != nil {
@@ 393,7 393,7 @@ func TestRunDaemon_CancelWaitsForActiveRunOnce(t *testing.T) {
_, _ = w.Write([]byte(resultJSON(Result{Accepted: 1})))
}))
defer ts.Close()
- sender := NewSender(ts.URL, ts.Client())
+ sender := NewSender(ts.URL, ts.Client(), "")
cfg := testConfig()
cfg.Sources = []config.SourceConfig{source}
@@ 500,7 500,7 @@ func TestRunOnce_SkippedOnlyParseResultPersistsNewOffsetAndDoesNotPost(t *testin
_, _ = w.Write([]byte(resultJSON(Result{Accepted: 0})))
}))
defer ts.Close()
- sender := NewSender(ts.URL, ts.Client())
+ sender := NewSender(ts.URL, ts.Client(), "")
p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
file: {events: []wire.TurnEvent{}, newOffset: 250},
@@ 591,7 591,7 @@ func acceptingSender(t *testing.T, results []Result) *Sender {
_, _ = w.Write([]byte(resultJSON(result)))
}))
t.Cleanup(ts.Close)
- return NewSender(ts.URL, ts.Client())
+ return NewSender(ts.URL, ts.Client(), "")
}
func failingSender(t *testing.T) *Sender {
@@ 599,5 599,5 @@ func failingSender(t *testing.T) *Sender {
client := &http.Client{Transport: roundTripFunc(func(*http.Request) (*http.Response, error) {
return nil, errPostBoom
})}
- return NewSender("http://127.0.0.1", client)
+ return NewSender("http://127.0.0.1", client, "")
}
M internal/collector/ingest/sender.go => internal/collector/ingest/sender.go +14 -4
@@ 29,13 29,20 @@ type Result struct {
// Sender POSTs NDJSON batches to the lethe ingest endpoint.
type Sender struct {
- serverURL string
- client *http.Client
+ serverURL string
+ client *http.Client
+ remoteUserHeader string
}
// NewSender builds a Sender that posts to serverURL + /api/v1/ingest.
-func NewSender(serverURL string, client *http.Client) *Sender {
- return &Sender{serverURL: strings.TrimRight(serverURL, "/"), client: client}
+// If remoteUserHeader is non-empty, it is sent as the Remote-User header
+// on every request (for forward-auth setups without a reverse proxy).
+func NewSender(serverURL string, client *http.Client, remoteUserHeader string) *Sender {
+ return &Sender{
+ serverURL: strings.TrimRight(serverURL, "/"),
+ client: client,
+ remoteUserHeader: remoteUserHeader,
+ }
}
// PostBatch encodes events as NDJSON and POSTs them to the ingest endpoint.
@@ 54,6 61,9 @@ func (s *Sender) postRaw(ctx context.Context, body []byte) (Result, error) {
return Result{}, culpa.Wrap(err, "build request")
}
req.Header.Set("Content-Type", "application/x-ndjson")
+ if s.remoteUserHeader != "" {
+ req.Header.Set("Remote-User", s.remoteUserHeader)
+ }
resp, err := s.client.Do(req)
if err != nil {
M internal/collector/ingest/sender_test.go => internal/collector/ingest/sender_test.go +6 -6
@@ 75,7 75,7 @@ func TestSender_PostBatch_Success(t *testing.T) {
}))
defer ts.Close()
- sender := NewSender(ts.URL, http.DefaultClient)
+ sender := NewSender(ts.URL, http.DefaultClient, "")
events := []wire.TurnEvent{
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"},
@@ 112,7 112,7 @@ func TestSender_PostBatch_TrailingSlashURL(t *testing.T) {
}))
defer ts.Close()
- sender := NewSender(ts.URL+"/", http.DefaultClient)
+ sender := NewSender(ts.URL+"/", http.DefaultClient, "")
_, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
})
@@ 131,7 131,7 @@ func TestSender_PostBatch_Non2xx(t *testing.T) {
}))
defer ts.Close()
- sender := NewSender(ts.URL, http.DefaultClient)
+ sender := NewSender(ts.URL, http.DefaultClient, "")
_, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
})
@@ 148,7 148,7 @@ func TestSender_PostBatch_MalformedResponse(t *testing.T) {
}))
defer ts.Close()
- sender := NewSender(ts.URL, http.DefaultClient)
+ sender := NewSender(ts.URL, http.DefaultClient, "")
_, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
})
@@ 165,7 165,7 @@ func TestSender_PostBatch_PartialAccept(t *testing.T) {
}))
defer ts.Close()
- sender := NewSender(ts.URL, http.DefaultClient)
+ sender := NewSender(ts.URL, http.DefaultClient, "")
result, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t2", Seq: 2, Role: "assistant", Timestamp: 1001, Content: "world"},
@@ 189,7 189,7 @@ func TestSender_PostBatch_StructuredErrors(t *testing.T) {
}))
defer ts.Close()
- sender := NewSender(ts.URL, http.DefaultClient)
+ sender := NewSender(ts.URL, http.DefaultClient, "")
result, err := sender.PostBatch(context.Background(), []wire.TurnEvent{
{Tool: "claude-code", Host: "laptop", SessionID: "s1", TurnID: "t1", Seq: 1, Role: "user", Timestamp: 1000, Content: "hello"},
})
M internal/collector/parser/opencode/parser.go => internal/collector/parser/opencode/parser.go +11 -7
@@ 73,7 73,7 @@ func (p *Parser) Parse(path string, since int64) ([]wire.TurnEvent, int64, error
}
defer func() { _ = db.Close() }()
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
rows, err := db.QueryContext(ctx, `
SELECT
@@ 237,15 237,19 @@ func (p *Parser) mapRecord(path string, record messageRow, partRows []partRow) (
case "tool":
summary := summarizeToolPart(part)
toolCalls = append(toolCalls, summary)
- if rendered := renderToolSummary(summary); rendered != "" {
- texts = append(texts, rendered)
- }
}
}
content := strings.Join(texts, "\n\n")
if strings.TrimSpace(content) == "" && len(toolCalls) == 0 {
return wire.TurnEvent{}, false
}
+ if strings.TrimSpace(content) == "" && len(toolCalls) > 0 {
+ parts := make([]string, 0, len(toolCalls))
+ for _, tc := range toolCalls {
+ parts = append(parts, renderToolSummary(tc))
+ }
+ content = strings.Join(parts, "\n")
+ }
event := wire.TurnEvent{
Tool: toolName,
@@ 321,9 325,9 @@ func safeOutputSummary(raw json.RawMessage) string {
}
var text string
if err := json.Unmarshal(raw, &text); err == nil {
- return summarizeText(text)
+ return text
}
- return summarizeText(trimmed)
+ return trimmed
}
func renderToolSummary(summary toolCallSummary) string {
@@ 336,7 340,7 @@ func renderToolSummary(summary toolCallSummary) string {
parts = append(parts, " "+summary.Status)
}
if summary.Output != "" {
- parts = append(parts, " - "+summary.Output)
+ parts = append(parts, " - "+summarizeText(summary.Output))
}
return strings.Join(parts, "") + ">"
}
M internal/collector/parser/opencode/parser_test.go => internal/collector/parser/opencode/parser_test.go +2 -2
@@ 117,8 117,8 @@ func TestParse_MapsToolPartSummaryWithoutExternalBlob(t *testing.T) {
if events[0].Role != "assistant" {
t.Fatalf("Role = %q, want assistant", events[0].Role)
}
- if !strings.Contains(events[0].Content, "<tool: bash completed - go test ./internal/collector/parser/opencode ok>") {
- t.Fatalf("Content = %q", events[0].Content)
+ if events[0].Content != "" {
+ t.Fatalf("Content = %q, want empty (tool part summary no longer embedded)", events[0].Content)
}
if strings.Contains(events[0].Content, "SECRET") || strings.Contains(string(events[0].ToolCalls), "SECRET") {
t.Fatalf("external tool-output blob was ingested: content=%q tool_calls=%s", events[0].Content, string(events[0].ToolCalls))
M internal/server/web/dist/index.html => internal/server/web/dist/index.html +1 -1
@@ 13,7 13,7 @@
href="https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600&family=JetBrains+Mono:wght@400;500;700&display=swap"
rel="stylesheet"
/>
- <script type="module" crossorigin src="/assets/index-BLnRqqxn.js"></script>
+ <script type="module" crossorigin src="/assets/index-B0_vS5f_.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-Dc2JzkyG.css">
</head>
<body class="density-compact">
M web/src/features/session/Transcript.tsx => web/src/features/session/Transcript.tsx +10 -0
@@ 59,6 59,16 @@ export function Transcript({
<ReactMarkdown>{t.body}</ReactMarkdown>
)}
</div>
+ {t.toolOutput != null && (
+ <details className="tool-output">
+ <summary style={{ fontSize: 11, color: 'var(--ink-2)', cursor: 'pointer' }}>
+ {t.toolName ?? 'tool'} output
+ </summary>
+ <pre style={{ marginTop: 6, fontSize: 12, maxHeight: 360, overflow: 'auto' }}>
+ {t.toolOutput}
+ </pre>
+ </details>
+ )}
</div>
)
})}
M web/src/features/session/useSession.ts => web/src/features/session/useSession.ts +32 -14
@@ 27,6 27,7 @@ export interface Turn {
tokensOut?: number
toolName?: string
toolKind?: 'call' | 'result'
+ toolOutput?: string
}
export interface SessionWithTurnsDTO extends SessionDTO {
@@ 37,28 38,48 @@ export interface SessionWithTurns extends Omit<Session, 'turns'> {
turns: Turn[]
}
-function extractToolCalls(
+function extractToolCall(
raw: unknown,
-): { toolName?: string; toolKind?: 'call' | 'result' } {
+): { toolName?: string; toolKind?: 'call' | 'result'; toolOutput?: string } {
if (raw == null || typeof raw !== 'object') {
return {}
}
const obj = raw as Record<string, unknown>
+ // opencode format: {tool, status, output}
+ const opencodeTool = typeof obj['tool'] === 'string' ? obj['tool'] : undefined
+ const opencodeStatus = typeof obj['status'] === 'string' ? obj['status'] : undefined
+ if (opencodeTool != null) {
+ const kind = opencodeStatus === 'completed' ? 'result' : 'call'
+ const output = typeof obj['output'] === 'string' ? obj['output'] : undefined
+ return { toolName: opencodeTool, toolKind: kind as 'call' | 'result', toolOutput: output }
+ }
+ // claude-code format: {name, type: "tool_use"|"tool_result"}
const name = typeof obj['name'] === 'string' ? obj['name'] : undefined
+ const type = typeof obj['type'] === 'string' ? obj['type'] : undefined
const kind =
- obj['kind'] === 'call' || obj['kind'] === 'result'
- ? (obj['kind'] as 'call' | 'result')
- : undefined
- if (name == null && kind == null) {
- return {}
+ type === 'tool_use' ? 'call' as const
+ : type === 'tool_result' ? 'result' as const
+ : undefined
+ if (name != null && kind != null) {
+ const output = typeof obj['content'] === 'string' ? obj['content'] : undefined
+ return { toolName: name, toolKind: kind, toolOutput: output }
}
- return { toolName: name, toolKind: kind }
+ return {}
+}
+
+function extractFirstToolCall(
+ raw: unknown,
+): { toolName?: string; toolKind?: 'call' | 'result'; toolOutput?: string } {
+ if (Array.isArray(raw) && raw.length > 0) {
+ return extractToolCall(raw[0])
+ }
+ return extractToolCall(raw)
}
export function adaptTurn(d: TurnDTO): Turn {
const role: 'user' | 'assistant' | 'tool' =
d.role === 'system' ? 'assistant' : d.role
- const { toolName, toolKind } = extractToolCalls(d.tool_calls)
+ const { toolName, toolKind, toolOutput } = extractFirstToolCall(d.tool_calls)
return {
i: d.seq,
role,
@@ 68,6 89,7 @@ export function adaptTurn(d: TurnDTO): Turn {
tokensOut: d.tokens_out,
toolName,
toolKind,
+ toolOutput,
}
}
@@ 76,15 98,11 @@ export function useSession(
host: string,
id: string,
): UseQueryResult<SessionWithTurns> {
- // id is the composite `tool/host/session_id` from Session.id.
- // Extract only the session_id tail for the API URL.
- const rawSessionId = id.split('/').slice(2).join('/')
-
return useQuery({
queryKey: ['session', tool, host, id],
queryFn: async () => {
const dto = await apiFetch<SessionWithTurnsDTO>(
- `/api/v1/sessions/${tool}/${host}/${rawSessionId}`,
+ `/api/v1/sessions/${tool}/${host}/${id}`,
)
const session = adaptSession(dto)
return { ...session, turns: dto.turns.map(adaptTurn) }