~bigbes/lethe

ref: f3118d95bdf88c114346b0b2b43fad44e564a484 lethe/internal/collector/ingest/batch_test.go -rw-r--r-- 2.0 KiB
f3118d95 — Eugene Blikh collector: preserve valid rows around ingest errors 24 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package ingest

import (
	"testing"

	"sourcecraft.dev/bigbes/lethe/internal/shared/wire"
)

func TestBuildBatches_SplitsByMaxLines(t *testing.T) {
	events := []wire.TurnEvent{
		turnEvent(0, "a"),
		turnEvent(10, "b"),
		turnEvent(20, "c"),
	}

	batches, err := BuildBatches(events, 2, 1024)
	if err != nil {
		t.Fatalf("BuildBatches: %v", err)
	}

	if len(batches) != 2 {
		t.Fatalf("len(batches) = %d, want 2", len(batches))
	}
	assertBatch(t, batches[0], []int{0, 1})
	assertBatch(t, batches[1], []int{2})
}

func TestBuildBatches_SplitsByMaxBytes(t *testing.T) {
	events := []wire.TurnEvent{
		turnEvent(0, "a"),
		turnEvent(10, "b"),
		turnEvent(20, "c"),
	}
	firstSize := mustEncodedLen(t, events[:1])

	batches, err := BuildBatches(events, 100, firstSize+1)
	if err != nil {
		t.Fatalf("BuildBatches: %v", err)
	}

	if len(batches) != 3 {
		t.Fatalf("len(batches) = %d, want 3", len(batches))
	}
	assertBatch(t, batches[0], []int{0})
	assertBatch(t, batches[1], []int{1})
	assertBatch(t, batches[2], []int{2})
}

func TestBuildBatches_RejectsNonPositiveCaps(t *testing.T) {
	events := []wire.TurnEvent{turnEvent(0, "a")}

	if _, err := BuildBatches(events, 0, 1024); err == nil {
		t.Fatal("expected error for zero maxLines")
	}
	if _, err := BuildBatches(events, 1, 0); err == nil {
		t.Fatal("expected error for zero maxBytes")
	}
}

func assertBatch(t *testing.T, batch Batch, wantIndexes []int) {
	t.Helper()
	if len(batch.Events) != len(wantIndexes) {
		t.Fatalf("len(batch.Events) = %d, want %d", len(batch.Events), len(wantIndexes))
	}
	if len(batch.EventIndexes) != len(wantIndexes) {
		t.Fatalf("len(batch.EventIndexes) = %d, want %d", len(batch.EventIndexes), len(wantIndexes))
	}
	for i, want := range wantIndexes {
		if batch.EventIndexes[i] != want {
			t.Errorf("EventIndexes[%d] = %d, want %d", i, batch.EventIndexes[i], want)
		}
	}
}

func mustEncodedLen(t *testing.T, events []wire.TurnEvent) int {
	t.Helper()
	data, err := EncodeNDJSON(events)
	if err != nil {
		t.Fatalf("EncodeNDJSON: %v", err)
	}
	return len(data)
}