~bigbes/lethe

ref: 9094d79e008f69af42107022da2bfcf6f8d7aea8 lethe/internal/collector/state/store.go -rw-r--r-- 4.8 KiB
9094d79e — Eugene Blikh collector: enforce outbox cap before replay 24 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package state

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/jmoiron/sqlx"
	"go.bigb.es/auxilia/culpa"
	_ "modernc.org/sqlite"
)

// Store is the local SQLite persistence for offsets and outbox.
//
// It records per-(tool, source file) ingestion offsets in the
// ingestion_state table and queues events awaiting delivery in the outbox
// table. Construct it with Open. Close tolerates a nil db, but every other
// method uses db directly and therefore requires a Store returned by Open.
type Store struct {
	db *sqlx.DB
}

// OutboxItem is an event waiting to be POSTed to the server.
//
// Enqueue stores Tool, Host, SourceFile and Payload in the outbox columns
// tool, host, source_file and payload respectively; it fills created_at
// itself, so the item carries no timestamp.
type OutboxItem struct {
	Tool       string
	Host       string
	SourceFile string
	Payload    []byte
}

// OutboxRow is an item already persisted in the outbox.
//
// Rows are read back by Oldest; the db tags map each field to its outbox
// column. ID is the row id that Oldest orders by and Delete accepts.
// CreatedAt is the Unix time in seconds written by Enqueue.
type OutboxRow struct {
	ID         int64  `db:"id"`
	Tool       string `db:"tool"`
	Host       string `db:"host"`
	SourceFile string `db:"source_file"`
	Payload    []byte `db:"payload"`
	CreatedAt  int64  `db:"created_at"`
}

// Stats summarises the current store contents.
//
// OutboxCount is the number of outbox rows. OutboxBytes is the sum of
// SQLite LENGTH(payload) over those rows (0 when the outbox is empty).
// SourceOffsets is the number of rows in ingestion_state, i.e. how many
// (tool, source file) pairs have a saved offset.
type Stats struct {
	OutboxCount   int64
	OutboxBytes   int64
	SourceOffsets int64
}

// Open prepares the on-disk state database at path and returns a Store
// backed by it.
//
// Steps, each reported with its own culpa error code on failure:
//   - STATE_MKDIR: create the parent directory of path (mode 0o750) if absent;
//   - STATE_OPEN: connect through the "sqlite" driver using the DSN from
//     buildDSN (sqlx.ConnectContext also pings the database);
//   - STATE_MIGRATE: run applyMigrations. On this failure the freshly opened
//     handle is closed before returning.
func Open(ctx context.Context, path string) (*Store, error) {
	if mkErr := os.MkdirAll(filepath.Dir(path), 0o750); mkErr != nil {
		return nil, culpa.WithCode(culpa.Wrap(mkErr, "create state directory"), "STATE_MKDIR")
	}

	conn, openErr := sqlx.ConnectContext(ctx, "sqlite", buildDSN(path))
	if openErr != nil {
		return nil, culpa.WithCode(culpa.Wrap(openErr, "open state database"), "STATE_OPEN")
	}

	if migErr := applyMigrations(ctx, conn); migErr != nil {
		// Best-effort cleanup: the migration failure is the error worth
		// reporting, so a Close error is deliberately dropped.
		_ = conn.Close()
		return nil, culpa.WithCode(culpa.Wrap(migErr, "apply state migrations"), "STATE_MIGRATE")
	}

	return &Store{db: conn}, nil
}

// Close releases the underlying database.
//
// It is a no-op returning nil when called on a nil *Store or on a Store
// whose db was never set, so callers may defer it unconditionally (e.g.
// before checking Open's error, which returns a nil *Store).
func (s *Store) Close() error {
	if s == nil || s.db == nil {
		return nil
	}
	return s.db.Close()
}

// DB returns the raw sqlx handle behind the store.
//
// It exists so tests can inspect the schema directly; production code
// should use the Store methods instead of reaching through this handle.
func (s *Store) DB() *sqlx.DB {
	return s.db
}

// GetOffset reports the last_offset stored for the (tool, sourceFile) pair.
//
// A pair with no row yet is not an error: it yields offset 0, i.e. "start
// from the beginning". Any other query failure is wrapped as "get offset".
func (s *Store) GetOffset(ctx context.Context, tool, sourceFile string) (int64, error) {
	const selectOffset = `SELECT last_offset FROM ingestion_state WHERE tool = ? AND source_file = ?`

	var last int64
	switch err := s.db.GetContext(ctx, &last, selectOffset, tool, sourceFile); {
	case err == nil:
		return last, nil
	case errors.Is(err, sql.ErrNoRows):
		return 0, nil
	default:
		return 0, culpa.Wrap(err, "get offset")
	}
}

// SaveOffset records offset as the last_offset for (tool, sourceFile).
//
// The row is inserted if absent; otherwise last_offset and updated_at are
// overwritten (ON CONFLICT on (tool, source_file)). updated_at is the
// current Unix time in seconds.
func (s *Store) SaveOffset(ctx context.Context, tool, sourceFile string, offset int64) error {
	const upsertOffset = `
		INSERT INTO ingestion_state (tool, source_file, last_offset, updated_at)
		VALUES (?, ?, ?, ?)
		ON CONFLICT (tool, source_file) DO UPDATE SET
			last_offset = excluded.last_offset,
			updated_at = excluded.updated_at
	`

	updatedAt := time.Now().Unix()
	if _, err := s.db.ExecContext(ctx, upsertOffset, tool, sourceFile, offset, updatedAt); err != nil {
		return culpa.Wrap(err, "save offset")
	}
	return nil
}

// Enqueue appends item to the outbox as a new row.
//
// created_at is stamped with the current Unix time in seconds; the row id
// is assigned by the database.
func (s *Store) Enqueue(ctx context.Context, item OutboxItem) error {
	const insertItem = `
		INSERT INTO outbox (tool, host, source_file, payload, created_at)
		VALUES (?, ?, ?, ?, ?)
	`

	createdAt := time.Now().Unix()
	if _, err := s.db.ExecContext(ctx, insertItem,
		item.Tool, item.Host, item.SourceFile, item.Payload, createdAt); err != nil {
		return culpa.Wrap(err, "enqueue outbox item")
	}
	return nil
}

// Oldest fetches at most limit outbox rows, lowest id first.
//
// The original contract treats ascending id as creation order.
// NOTE(review): SQLite treats a negative LIMIT as "no limit", so callers
// should pass a positive value.
func (s *Store) Oldest(ctx context.Context, limit int) ([]OutboxRow, error) {
	const selectOldest = `
		SELECT id, tool, host, source_file, payload, created_at
		FROM outbox
		ORDER BY id ASC
		LIMIT ?
	`

	var batch []OutboxRow
	if err := s.db.SelectContext(ctx, &batch, selectOldest, limit); err != nil {
		return nil, culpa.Wrap(err, "list oldest outbox items")
	}
	return batch, nil
}

// Delete removes the outbox rows whose ids are listed.
//
// An empty ids slice is a no-op. sqlx.In expands the single IN (?)
// placeholder into one placeholder per id, and Rebind adapts them to the
// driver's bindvar style.
// NOTE(review): a very large ids slice could exceed SQLite's bound-parameter
// limit; callers presumably pass batches obtained from Oldest.
func (s *Store) Delete(ctx context.Context, ids []int64) error {
	if len(ids) == 0 {
		return nil
	}

	stmt, bound, inErr := sqlx.In(`DELETE FROM outbox WHERE id IN (?)`, ids)
	if inErr != nil {
		return culpa.Wrap(inErr, "build delete query")
	}
	if _, execErr := s.db.ExecContext(ctx, s.db.Rebind(stmt), bound...); execErr != nil {
		return culpa.Wrap(execErr, "delete outbox items")
	}
	return nil
}

// Stats returns row counts and byte totals.
//
// It reports the number of outbox rows, the sum of LENGTH(payload) over
// those rows (0 when empty), and the number of rows in ingestion_state.
//
// All three figures are computed by a single SELECT of scalar subqueries.
// SQLite evaluates one statement inside one read transaction, so the values
// are mutually consistent even while Enqueue/Delete run concurrently on
// other pooled connections. On error the zero Stats is returned rather
// than a partially filled one.
func (s *Store) Stats(ctx context.Context) (Stats, error) {
	var st Stats
	err := s.db.QueryRowContext(ctx, `
		SELECT
			(SELECT COUNT(*) FROM outbox),
			(SELECT COALESCE(SUM(LENGTH(payload)), 0) FROM outbox),
			(SELECT COUNT(*) FROM ingestion_state)
	`).Scan(&st.OutboxCount, &st.OutboxBytes, &st.SourceOffsets)
	if err != nil {
		return Stats{}, culpa.Wrap(err, "collect store stats")
	}
	return st, nil
}

// buildDSN turns a filesystem path into a modernc.org/sqlite DSN by
// appending connection pragmas as _pragma query parameters:
//   - journal_mode(WAL)
//   - busy_timeout(5000): wait up to 5000 ms on a locked database
//   - foreign_keys(on)
//   - synchronous(NORMAL)
//
// NOTE(review): path is not escaped. The driver appears to split the DSN at
// the first '?', so a path containing '?' would be mis-parsed — confirm
// callers never pass one.
func buildDSN(path string) string {
	const pragmas = "_pragma=journal_mode(WAL)" +
		"&_pragma=busy_timeout(5000)" +
		"&_pragma=foreign_keys(on)" +
		"&_pragma=synchronous(NORMAL)"
	return fmt.Sprintf("%s?%s", path, pragmas)
}