package main
import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"path/filepath"
"syscall"
"github.com/spf13/cobra"
"go.bigb.es/auxilia/scribe"
"sourcecraft.dev/bigbes/lethe/internal/collector/config"
"sourcecraft.dev/bigbes/lethe/internal/collector/ingest"
"sourcecraft.dev/bigbes/lethe/internal/collector/parser"
"sourcecraft.dev/bigbes/lethe/internal/collector/parser/claudecode"
"sourcecraft.dev/bigbes/lethe/internal/collector/parser/opencode"
"sourcecraft.dev/bigbes/lethe/internal/collector/state"
)
// main builds the root command, runs it, and exits with status 1 when
// execution returns an error. The error itself is not printed here; reporting
// is left to the command's own error handling.
func main() {
	root := newRootCmd()
	if root.Execute() == nil {
		return
	}
	os.Exit(1)
}
// defaultConfigPath returns the default location of the collector config:
// <home>/.config/lethe/collector.yaml, where <home> is the directory reported
// by os.UserHomeDir.
//
// If the home directory cannot be determined, a "~"-prefixed path is returned
// verbatim; this function performs no tilde expansion on it.
func defaultConfigPath() string {
	const fallback = "~/.config/lethe/collector.yaml"

	if homeDir, err := os.UserHomeDir(); err == nil {
		return filepath.Join(homeDir, ".config", "lethe", "collector.yaml")
	}
	return fallback
}
// newRootCmd assembles the "lethe-collector" root command.
//
// It registers a persistent --config flag (default: defaultConfigPath()) and
// attaches the daemon, backfill and status subcommands. Each subcommand is
// handed a pointer to the flag's storage and dereferences it when it runs, so
// it observes whatever value the flag holds at that point.
func newRootCmd() *cobra.Command {
	cfgPath := new(string)

	cmd := &cobra.Command{
		Use:   "lethe-collector",
		Short: "Ingest assistant transcripts into lethe",
	}

	flags := cmd.PersistentFlags()
	flags.StringVar(cfgPath, "config", defaultConfigPath(), "path to collector config file")

	// Registration order matches the order the subcommands are listed here.
	builders := []func(*string) *cobra.Command{
		newDaemonCmd,
		newBackfillCmd,
		newStatusCmd,
	}
	for _, build := range builders {
		cmd.AddCommand(build(cfgPath))
	}

	return cmd
}
// newDaemonCmd builds the "daemon" subcommand.
//
// configPath points at the storage of the --config flag; it is dereferenced
// when the command runs. The command:
//
//  1. loads the collector config,
//  2. builds a logger from cfg.Log and installs it as the slog default,
//  3. derives a context that is cancelled on SIGINT or SIGTERM,
//  4. opens the state store at <StateDir>/state.db,
//  5. hands control to ingest.RunDaemon and returns its result.
//
// Every setup failure is returned wrapped with a short description of the
// step that failed.
func newDaemonCmd(configPath *string) *cobra.Command {
	return &cobra.Command{
		Use:   "daemon",
		Short: "Run the collector daemon",
		RunE: func(cmd *cobra.Command, args []string) error {
			cfg, err := config.Load(*configPath)
			if err != nil {
				return fmt.Errorf("load config: %w", err)
			}

			logger, err := buildLogger(cfg.Log.Level, cfg.Log.Format)
			if err != nil {
				return fmt.Errorf("setup logger: %w", err)
			}
			slog.SetDefault(logger)

			// Root the signal context in the command's own context so that a
			// context supplied through cobra (e.g. ExecuteContext) is honoured,
			// matching the other subcommands. Fall back to Background when the
			// command carries no context at all.
			parent := cmd.Context()
			if parent == nil {
				parent = context.Background()
			}
			ctx, stop := signal.NotifyContext(parent, syscall.SIGINT, syscall.SIGTERM)
			defer stop()

			store, err := state.Open(ctx, filepath.Join(cfg.StateDir, "state.db"))
			if err != nil {
				return fmt.Errorf("open state: %w", err)
			}
			defer store.Close()

			client := &http.Client{Timeout: cfg.HTTP.Timeout}
			sender := ingest.NewSender(cfg.ServerURL, client, cfg.RemoteUserHeader)
			parsers := buildParsers(cfg.Host)

			return ingest.RunDaemon(ctx, *cfg, parsers, store, sender)
		},
	}
}
// newBackfillCmd builds the "backfill <tool>" subcommand, which runs a single
// ingest pass over every configured source whose Tool equals the argument.
//
// configPath points at the storage of the --config flag; it is dereferenced
// when the command runs.
//
// The tool name is validated (a parser must be registered and at least one
// source must be configured for it) before the state store is opened or the
// HTTP client is built, so a mistyped tool name fails without touching state.
// The logger from cfg.Log is installed as the slog default, as the daemon
// command does. The run context is cancelled on SIGINT or SIGTERM so deferred
// cleanup still runs on interruption.
//
// Per-source failures do not stop the remaining sources; they are collected
// and returned together, joined into a single error. Once the context has been
// cancelled, no further sources are started.
func newBackfillCmd(configPath *string) *cobra.Command {
	return &cobra.Command{
		Use:   "backfill <tool>",
		Short: "One-shot ingest for a specific tool",
		Args:  cobra.ExactArgs(1),
		RunE: func(cmd *cobra.Command, args []string) error {
			tool := args[0]

			cfg, err := config.Load(*configPath)
			if err != nil {
				return fmt.Errorf("load config: %w", err)
			}

			// Fail fast on bad input before opening any resources.
			p, ok := buildParsers(cfg.Host)[tool]
			if !ok {
				return fmt.Errorf("no parser registered for tool %q", tool)
			}
			configured := false
			for _, src := range cfg.Sources {
				if src.Tool == tool {
					configured = true
					break
				}
			}
			if !configured {
				return fmt.Errorf("no configured source for tool %q", tool)
			}

			logger, err := buildLogger(cfg.Log.Level, cfg.Log.Format)
			if err != nil {
				return fmt.Errorf("setup logger: %w", err)
			}
			slog.SetDefault(logger)

			// Cancel on SIGINT/SIGTERM so an interrupted backfill unwinds
			// through the deferred Close instead of being killed mid-run.
			parent := cmd.Context()
			if parent == nil {
				parent = context.Background()
			}
			ctx, stop := signal.NotifyContext(parent, syscall.SIGINT, syscall.SIGTERM)
			defer stop()

			store, err := state.Open(ctx, filepath.Join(cfg.StateDir, "state.db"))
			if err != nil {
				return fmt.Errorf("open state: %w", err)
			}
			defer store.Close()

			client := &http.Client{Timeout: cfg.HTTP.Timeout}
			sender := ingest.NewSender(cfg.ServerURL, client, cfg.RemoteUserHeader)

			var errs []error
			for _, src := range cfg.Sources {
				if src.Tool != tool {
					continue
				}
				// Do not start another source after cancellation.
				if err := ctx.Err(); err != nil {
					errs = append(errs, fmt.Errorf("backfill interrupted: %w", err))
					break
				}
				if err := ingest.RunBackfillOnce(ctx, *cfg, src, p, store, sender); err != nil {
					errs = append(errs, fmt.Errorf("backfill %s (%s): %w", src.Tool, src.Path, err))
				}
			}

			if len(errs) > 0 {
				return fmt.Errorf("backfill completed with errors: %w", errors.Join(errs...))
			}
			return nil
		},
	}
}
// newStatusCmd builds the "status" subcommand, which prints a plain-text
// report of the collector's state.
//
// configPath points at the storage of the --config flag; it is dereferenced
// when the command runs.
//
// The report is written to the command's output writer (cmd.OutOrStdout(),
// which is the process stdout unless a different writer was set on the
// command) and contains:
//
//   - host, server URL and state DB path from the config,
//   - outbox and offset counters from store.Stats,
//   - for each configured source, every file the tool's parser discovers,
//     with its stored offset and the remaining bytes (file size minus offset,
//     clamped at zero).
//
// Failures to load config, open the store or read stats abort the command.
// Per-source and per-file problems are printed inline and do not affect the
// returned error.
func newStatusCmd(configPath *string) *cobra.Command {
	return &cobra.Command{
		Use:   "status",
		Short: "Print collector status",
		RunE: func(cmd *cobra.Command, args []string) error {
			cfg, err := config.Load(*configPath)
			if err != nil {
				return fmt.Errorf("load config: %w", err)
			}

			ctx := cmd.Context()
			dbPath := filepath.Join(cfg.StateDir, "state.db")

			store, err := state.Open(ctx, dbPath)
			if err != nil {
				return fmt.Errorf("open state: %w", err)
			}
			defer store.Close()

			stats, err := store.Stats(ctx)
			if err != nil {
				return fmt.Errorf("get stats: %w", err)
			}

			// Write through cobra's writer rather than os.Stdout directly so
			// the output can be redirected or captured via the command.
			out := cmd.OutOrStdout()

			fmt.Fprintf(out, "host: %s\n", cfg.Host)
			fmt.Fprintf(out, "server_url: %s\n", cfg.ServerURL)
			fmt.Fprintf(out, "state_db: %s\n", dbPath)
			fmt.Fprintf(out, "outbox_rows: %d\n", stats.OutboxCount)
			fmt.Fprintf(out, "outbox_bytes: %d\n", stats.OutboxBytes)
			fmt.Fprintf(out, "source_offsets: %d\n", stats.SourceOffsets)
			fmt.Fprintln(out)

			parsers := buildParsers(cfg.Host)
			for _, src := range cfg.Sources {
				fmt.Fprintf(out, "source: %s (%s)\n", src.Tool, src.Path)

				p, ok := parsers[src.Tool]
				if !ok {
					fmt.Fprintf(out, " offset: N/A (no parser for %s)\n", src.Tool)
					continue
				}

				files, err := p.Discover(src.Path)
				if err != nil {
					fmt.Fprintf(out, " discover error: %v\n", err)
					continue
				}
				if len(files) == 0 {
					fmt.Fprint(out, " (no files found)\n")
					continue
				}

				for _, f := range files {
					off, err := store.GetOffset(ctx, src.Tool, f.Path)
					if err != nil {
						fmt.Fprintf(out, " %s: error=%v\n", f.Path, err)
						continue
					}
					// A stored offset beyond the current file size is
					// reported as zero lag rather than a negative number.
					lagBytes := f.Size - off
					if lagBytes < 0 {
						lagBytes = 0
					}
					fmt.Fprintf(out, " %s: offset=%d lag_bytes=%d\n", f.Path, off, lagBytes)
				}
			}
			return nil
		},
	}
}
// buildParsers returns the set of available transcript parsers keyed by tool
// name ("claude-code" and "opencode"). Each parser is constructed with the
// given host value.
func buildParsers(host string) map[string]parser.Parser {
	registry := make(map[string]parser.Parser, 2)
	registry["claude-code"] = claudecode.New(host)
	registry["opencode"] = opencode.New(host)
	return registry
}
// buildLogger constructs a slog.Logger that writes to os.Stderr.
//
// levelStr must be exactly one of "debug", "info", "warn" or "error".
// format selects the handler:
//
//   - "tint":  scribe's tint handler
//   - "json":  slog.NewJSONHandler
//   - "human": slog.NewTextHandler
//
// The level is validated before the format, so when both are invalid the
// returned error names the level. Any unrecognised value yields a nil logger
// and an error quoting the offending string.
func buildLogger(levelStr, format string) (*slog.Logger, error) {
	levels := map[string]slog.Level{
		"debug": slog.LevelDebug,
		"info":  slog.LevelInfo,
		"warn":  slog.LevelWarn,
		"error": slog.LevelError,
	}
	lvl, known := levels[levelStr]
	if !known {
		return nil, fmt.Errorf("unknown log level %q", levelStr)
	}

	switch format {
	case "tint":
		return slog.New(scribe.NewTintHandler(
			scribe.WithWriter(os.Stderr),
			scribe.WithLevel(lvl),
		)), nil
	case "json":
		return slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: lvl})), nil
	case "human":
		return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: lvl})), nil
	}
	return nil, fmt.Errorf("unknown log format %q", format)
}