@@ 143,7 143,7 @@ func runFile(ctx context.Context, cfg config.Config, src config.SourceConfig, p
return culpa.Wrap(err, "post batch")
}
- advanced, err := persistAcceptedOffset(ctx, store, src.Tool, path, events, newOffset, batch, result.Accepted)
+ advanced, err := persistAcceptedOffset(ctx, store, src.Tool, path, events, newOffset, batch, result)
if err != nil {
return err
}
@@ 175,18 175,29 @@ func enqueueBatch(ctx context.Context, store *state.Store, src config.SourceConf
return nil
}
-func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, accepted int) (int, error) {
- if accepted < 0 || accepted > len(batch.Events) {
+func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string, path string, allEvents []wire.TurnEvent, newOffset int64, batch Batch, result Result) (int, error) {
+ if result.Accepted < 0 || result.Accepted > len(batch.Events) {
return 0, culpa.WithCode(
- fmt.Errorf("server accepted %d events for batch length %d", accepted, len(batch.Events)),
+ fmt.Errorf("server accepted %d events for batch length %d", result.Accepted, len(batch.Events)),
"RUNNER_ACCEPTED_INVALID",
)
}
- if accepted == 0 {
+ if result.Accepted == len(batch.Events) && len(result.Errors) > 0 {
+ return 0, culpa.WithCode(
+ fmt.Errorf("server accepted all %d events but reported %d errors", len(batch.Events), len(result.Errors)),
+ "RUNNER_ACCEPTED_MISMATCH",
+ )
+ }
+ if result.Accepted == 0 && len(result.Errors) == 0 {
return 0, nil
}
- nextGlobalIndex := batch.EventIndexes[0] + accepted
+ nextGlobalIndex := batch.EventIndexes[0] + result.Accepted
+ if len(result.Errors) > 0 {
+ // Skip the failed event so it does not loop forever.
+ nextGlobalIndex++
+ }
+
var offset int64
if nextGlobalIndex >= len(allEvents) {
offset = newOffset
@@ 197,5 208,5 @@ func persistAcceptedOffset(ctx context.Context, store *state.Store, tool string,
if err := store.SaveOffset(ctx, tool, path, offset); err != nil {
return 0, culpa.Wrap(err, "save accepted offset")
}
- return accepted, nil
+ return result.Accepted, nil
}
@@ 2,10 2,12 @@ package ingest
import (
"context"
+ "encoding/json"
"io"
"net/http"
"net/http/httptest"
"path/filepath"
+ "strings"
"sync"
"testing"
"time"
@@ 34,7 36,7 @@ func TestRunOnce_AllAcceptedPersistsNewOffsetAndDoesNotEnqueueOutbox(t *testing.
assertOutboxCount(t, ctx, store, 0)
}
-func TestRunOnce_PartialAcceptedPersistsFirstUnacceptedEventOffset(t *testing.T) {
+func TestRunOnce_PartialAcceptWithErrorSkipsFailedEvent(t *testing.T) {
ctx := context.Background()
store := openTestStore(t, ctx)
source := testSource(t, "claude-code", 10, 4096)
@@ 42,7 44,7 @@ func TestRunOnce_PartialAcceptedPersistsFirstUnacceptedEventOffset(t *testing.T)
p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")}, newOffset: 400},
})
- sender := acceptingSender(t, []Result{{Accepted: 2, Errors: []LineError{{Line: 3, Err: "bad"}}}})
+ sender := acceptingSender(t, []Result{{Accepted: 1, Errors: []LineError{{Line: 2, Err: "bad"}}}})
err := RunOnce(ctx, testConfig(), source, p, store, sender)
if err != nil {
@@ 53,7 55,7 @@ func TestRunOnce_PartialAcceptedPersistsFirstUnacceptedEventOffset(t *testing.T)
assertOutboxCount(t, ctx, store, 0)
}
-func TestRunOnce_ZeroAcceptedDoesNotPersistOffset(t *testing.T) {
+func TestRunOnce_ZeroAcceptedWithErrorSkipsFirstEvent(t *testing.T) {
ctx := context.Background()
store := openTestStore(t, ctx)
source := testSource(t, "claude-code", 10, 4096)
@@ 68,10 70,93 @@ func TestRunOnce_ZeroAcceptedDoesNotPersistOffset(t *testing.T) {
t.Fatalf("RunOnce: %v", err)
}
- assertOffset(t, ctx, store, "claude-code", file, 0)
+ assertOffset(t, ctx, store, "claude-code", file, 200)
assertOutboxCount(t, ctx, store, 0)
}
+func TestRunOnce_PartialAcceptSkipsBadRowAndRerunDoesNotRepostIt(t *testing.T) {
+ ctx := context.Background()
+ store := openTestStore(t, ctx)
+ source := testSource(t, "claude-code", 10, 4096)
+ file := filepath.Join(source.Path, "one.jsonl")
+ events := []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b"), turnEvent(300, "c")}
+ p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
+ file: {events: events, newOffset: 400},
+ })
+
+ var posted [][]wire.TurnEvent
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ body, _ := io.ReadAll(r.Body)
+ var batch []wire.TurnEvent
+ for _, line := range strings.Split(string(body), "\n") {
+ if line == "" {
+ continue
+ }
+ var ev wire.TurnEvent
+ if err := json.Unmarshal([]byte(line), &ev); err != nil {
+ t.Fatalf("unmarshal event: %v", err)
+ }
+ batch = append(batch, ev)
+ }
+ posted = append(posted, batch)
+ w.Header().Set("Content-Type", "application/json")
+ if len(posted) == 1 {
+ _, _ = w.Write([]byte(resultJSON(Result{Accepted: 1, Errors: []LineError{{Line: 2, Err: "bad"}}})))
+ } else {
+ _, _ = w.Write([]byte(resultJSON(Result{Accepted: 1})))
+ }
+ }))
+ defer ts.Close()
+ sender := NewSender(ts.URL, ts.Client())
+
+ err := RunOnce(ctx, testConfig(), source, p, store, sender)
+ if err != nil {
+ t.Fatalf("RunOnce first run: %v", err)
+ }
+ assertOffset(t, ctx, store, "claude-code", file, 300)
+
+ if len(posted) != 1 {
+ t.Fatalf("first run: expected 1 POST, got %d", len(posted))
+ }
+ if len(posted[0]) != 3 {
+ t.Errorf("first run: expected 3 events in batch, got %d", len(posted[0]))
+ }
+
+ err = RunOnce(ctx, testConfig(), source, p, store, sender)
+ if err != nil {
+ t.Fatalf("RunOnce second run: %v", err)
+ }
+ assertOffset(t, ctx, store, "claude-code", file, 400)
+
+ if len(posted) != 2 {
+ t.Fatalf("second run: expected 2 POSTs total, got %d", len(posted))
+ }
+ if len(posted[1]) != 1 {
+ t.Errorf("second run: expected 1 event in batch, got %d", len(posted[1]))
+ }
+ if posted[1][0].Content != "c" {
+ t.Errorf("second run: expected event 'c', got %q", posted[1][0].Content)
+ }
+}
+
+func TestRunOnce_FullAcceptWithErrorsReturnsError(t *testing.T) {
+ ctx := context.Background()
+ store := openTestStore(t, ctx)
+ source := testSource(t, "claude-code", 10, 4096)
+ file := filepath.Join(source.Path, "one.jsonl")
+ p := newFakeParser("claude-code", []parser.SourceFile{{Path: file}}, map[string]parseResult{
+ file: {events: []wire.TurnEvent{turnEvent(100, "a"), turnEvent(200, "b")}, newOffset: 300},
+ })
+ sender := acceptingSender(t, []Result{{Accepted: 2, Errors: []LineError{{Line: 2, Err: "bad"}}}})
+
+ err := RunOnce(ctx, testConfig(), source, p, store, sender)
+ if err == nil {
+ t.Fatal("expected RunOnce error when server accepts all but reports errors")
+ }
+
+ assertOffset(t, ctx, store, "claude-code", file, 0)
+}
+
func TestRunOnce_HardErrorEnqueuesUnsentBatchAndDoesNotAdvanceOffset(t *testing.T) {
ctx := context.Background()
store := openTestStore(t, ctx)
@@ 177,7 262,13 @@ func (p *fakeParser) Parse(path string, since int64) ([]wire.TurnEvent, int64, e
if res.err != nil {
return nil, since, res.err
}
- return res.events, res.newOffset, nil
+ var filtered []wire.TurnEvent
+ for _, ev := range res.events {
+ if ev.Seq >= since {
+ filtered = append(filtered, ev)
+ }
+ }
+ return filtered, res.newOffset, nil
}
type roundTripFunc func(*http.Request) (*http.Response, error)