~bigbes/ci-cacher

ref: 891eddce3a799dfa25fdce5029ef8eb7114033de ci-cacher/internal/s3/client.go -rw-r--r-- 6.2 KiB
891eddce — Eugene Blikh Initial cacher v0.0.1-dev — S3-backed CI cache helper 2 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
// Package s3 wraps the AWS SDK v2 S3 client with the Garage-compatible
// settings that the existing .builds/lib/ci-lib.sh shell helpers configure
// via ~/.aws/config:
//
//	[default]
//	region = $AWS_DEFAULT_REGION
//	s3 =
//	    addressing_style = path
//	    signature_version = s3v4
//	(plus AWS_REQUEST/RESPONSE_CHECKSUM_* = when_required for Garage)
package s3

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/aws/smithy-go"
)

// Client is a small surface over the S3 SDK tuned for the cacher use case.
//
// A Client is bound to one bucket and an optional key prefix and is built
// by New. Its fields:
//   - api: the SDK client behind Exists, Get, Delete, List and HeadBucket.
//   - uploader: the SDK upload manager that Put sends bodies through.
//   - bucket: the bucket every request targets.
//   - prefix: the key prefix; New stores it with one trailing "/" removed,
//     and FullKey joins it to keys with a "/" separator.
type Client struct {
	api      *s3.Client
	uploader *manager.Uploader
	bucket   string
	prefix   string // joined to every key (with / separator)
}

// Options configures a new Client.
//
// New returns an error when Endpoint, Region, Bucket, KeyID or Secret is
// empty. Prefix is optional: New trims one trailing "/" from it and the
// result is prepended to every object key. KeyID and Secret are handed to
// the SDK as static credentials with an empty session token.
type Options struct {
	Endpoint  string
	Region    string
	Bucket    string
	Prefix    string
	KeyID     string
	Secret    string
	UserAgent string // appended to the default UA for traceability
}

// New constructs a Client from opts.
//
// Endpoint, Region, Bucket, KeyID and Secret are required; a missing value
// yields an error and a nil Client. Prefix is optional and is stored with one
// trailing "/" removed. UserAgent, when non-empty, is passed to the SDK as its
// application ID, which the SDK appends to the User-Agent header.
//
// No HTTP client is configured here, so the SDK's default client is used;
// callers can decorate later if needed (timeouts, proxies).
func New(opts Options) (*Client, error) {
	switch {
	case opts.Endpoint == "":
		return nil, errors.New("s3: endpoint required")
	case opts.Region == "":
		return nil, errors.New("s3: region required")
	case opts.Bucket == "":
		return nil, errors.New("s3: bucket required")
	case opts.KeyID == "" || opts.Secret == "":
		return nil, errors.New("s3: credentials required")
	}

	api := s3.New(s3.Options{
		Region:       opts.Region,
		Credentials:  credentials.NewStaticCredentialsProvider(opts.KeyID, opts.Secret, ""),
		UsePathStyle: true, // Garage needs path-style addressing.
		BaseEndpoint: aws.String(opts.Endpoint),
		// Tag requests with the caller-supplied identifier. Previously
		// Options.UserAgent was accepted but never applied; an empty
		// value leaves the default User-Agent untouched.
		AppID: opts.UserAgent,
		// Garage doesn't implement boto3 1.36+ trailing CRC32 checksums.
		// Mirrors AWS_REQUEST_CHECKSUM_CALCULATION=when_required and
		// AWS_RESPONSE_CHECKSUM_VALIDATION=when_required from the shell.
		RequestChecksumCalculation: aws.RequestChecksumCalculationWhenRequired,
		ResponseChecksumValidation: aws.ResponseChecksumValidationWhenRequired,
	})

	uploader := manager.NewUploader(api, func(u *manager.Uploader) {
		// Single-stream upload — multipart is auto for >5MiB.
		u.Concurrency = 1
	})

	return &Client{
		api:      api,
		uploader: uploader,
		bucket:   opts.Bucket,
		prefix:   strings.TrimSuffix(opts.Prefix, "/"),
	}, nil
}

// FullKey maps a caller-relative key to the object key used in the bucket.
// One leading "/" is dropped from key; when the client has a prefix the
// result is prefix + "/" + key, otherwise it is the trimmed key alone.
func (c *Client) FullKey(key string) string {
	rel := strings.TrimPrefix(key, "/")
	if c.prefix != "" {
		return c.prefix + "/" + rel
	}
	return rel
}

// Bucket reports the name of the bucket this client was configured with
// (for diagnostics).
func (c *Client) Bucket() string {
	return c.bucket
}

// Exists reports whether an object is stored under key (after prefixing).
// A not-found response is a normal false result with a nil error; any other
// failure is returned wrapped with the s3:// location.
func (c *Client) Exists(ctx context.Context, key string) (bool, error) {
	objKey := c.FullKey(key)
	input := &s3.HeadObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(objKey),
	}
	_, err := c.api.HeadObject(ctx, input)
	switch {
	case err == nil:
		return true, nil
	case isNotFound(err):
		return false, nil
	default:
		return false, fmt.Errorf("head s3://%s/%s: %w", c.bucket, objKey, err)
	}
}

// Get opens the object stored under key and returns its body as a stream.
// The caller owns the returned reader and must Close it.
//
// A missing object produces a plain "not found" error that does not wrap the
// SDK error; every other failure wraps the underlying error.
func (c *Client) Get(ctx context.Context, key string) (io.ReadCloser, error) {
	objKey := c.FullKey(key)
	resp, err := c.api.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(objKey),
	})
	if err == nil {
		return resp.Body, nil
	}
	if isNotFound(err) {
		return nil, fmt.Errorf("get s3://%s/%s: not found", c.bucket, objKey)
	}
	return nil, fmt.Errorf("get s3://%s/%s: %w", c.bucket, objKey, err)
}

// Put streams body into the object stored under key via the upload manager.
// The manager switches to multipart on its own for bodies over its PartSize
// threshold (5MiB by default). Failures are wrapped with the s3:// location.
func (c *Client) Put(ctx context.Context, key string, body io.Reader) error {
	objKey := c.FullKey(key)
	input := &s3.PutObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(objKey),
		Body:   body,
	}
	if _, err := c.uploader.Upload(ctx, input); err != nil {
		return fmt.Errorf("upload s3://%s/%s: %w", c.bucket, objKey, err)
	}
	return nil
}

// Delete removes the object stored under key. A not-found response is
// treated as success (matches `aws s3 rm` semantics); other failures are
// wrapped with the s3:// location.
func (c *Client) Delete(ctx context.Context, key string) error {
	objKey := c.FullKey(key)
	input := &s3.DeleteObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(objKey),
	}
	_, err := c.api.DeleteObject(ctx, input)
	if err == nil || isNotFound(err) {
		return nil
	}
	return fmt.Errorf("delete s3://%s/%s: %w", c.bucket, objKey, err)
}

// List returns the keys of all objects whose full key starts with
// FullKey(sub), with the client's prefix and its "/" separator stripped from
// each returned key. Entries the server returns without a key are skipped.
//
// Paging is delegated to the SDK paginator, which stops when the server
// reports no further pages or supplies no usable continuation token. The
// previous hand-rolled loop would re-request the first page forever if a
// truncated response carried a nil token.
//
// Any page failure aborts the listing and is returned wrapped with the
// s3:// location; keys collected so far are discarded.
func (c *Client) List(ctx context.Context, sub string) ([]string, error) {
	fullPrefix := c.FullKey(sub)
	// Loop-invariant: the leading part removed from every returned key. With
	// an empty client prefix this is just "/", mirroring FullKey's trimming.
	trim := c.prefix + "/"

	pager := s3.NewListObjectsV2Paginator(c.api, &s3.ListObjectsV2Input{
		Bucket: aws.String(c.bucket),
		Prefix: aws.String(fullPrefix),
	})

	var keys []string
	for pager.HasMorePages() {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("list s3://%s/%s: %w", c.bucket, fullPrefix, err)
		}
		for _, obj := range page.Contents {
			if obj.Key == nil {
				continue
			}
			keys = append(keys, strings.TrimPrefix(*obj.Key, trim))
		}
	}
	return keys, nil
}

// HeadBucket issues a HeadBucket request against the configured bucket and
// returns any failure wrapped with the bucket name. It is the smoke test
// used by `cacher doctor`.
func (c *Client) HeadBucket(ctx context.Context) error {
	input := &s3.HeadBucketInput{Bucket: aws.String(c.bucket)}
	if _, err := c.api.HeadBucket(ctx, input); err != nil {
		return fmt.Errorf("head bucket %s: %w", c.bucket, err)
	}
	return nil
}

// isNotFound reports whether err, or anything it wraps, means "object does
// not exist". Three shapes are recognised, in order:
//
//  1. the typed SDK errors NotFound and NoSuchKey;
//  2. any API error whose code is "NotFound", "NoSuchKey" or "404";
//  3. any error exposing HTTPStatusCode() that reports 404.
func isNotFound(err error) bool {
	var (
		notFound  *types.NotFound
		noSuchKey *types.NoSuchKey
	)
	switch {
	case errors.As(err, &notFound), errors.As(err, &noSuchKey):
		return true
	}

	var coded smithy.APIError
	if errors.As(err, &coded) {
		if code := coded.ErrorCode(); code == "NotFound" || code == "NoSuchKey" || code == "404" {
			return true
		}
	}

	// Last resort: fall back to the raw HTTP status of the response.
	var withStatus interface{ HTTPStatusCode() int }
	return errors.As(err, &withStatus) && withStatus.HTTPStatusCode() == http.StatusNotFound
}