~bigbes/ci-cacher

ref: 202baab18cf97619495b606f1959c080150b124f ci-cacher/internal/s3/client.go -rw-r--r-- 7.6 KiB
202baab1 — Eugene Blikh publish.yml: override hut pages origin to public URL 2 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
// Package s3 wraps the AWS SDK v2 S3 client with the Garage-compatible
// settings that the existing .builds/lib/ci-lib.sh shell helpers configure
// via ~/.aws/config:
//
//	[default]
//	region = $AWS_DEFAULT_REGION
//	s3 =
//	    addressing_style = path
//	    signature_version = s3v4
//	(plus AWS_REQUEST/RESPONSE_CHECKSUM_* = when_required for Garage)
package s3

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/aws/smithy-go"
)

// Client is a thin wrapper over the AWS SDK S3 client, specialised for the
// cacher: every operation targets one bucket, and every key is namespaced
// under an optional prefix.
type Client struct {
	// api issues the individual S3 requests (Head/Get/Delete/List).
	api *s3.Client
	// uploader streams Put bodies through the SDK upload manager.
	uploader *manager.Uploader

	bucket string // bucket every request targets
	prefix string // joined to every key with a "/" separator (see FullKey)
}

// Options carries the settings New uses to build a Client. New rejects an
// Options whose Endpoint, Region, Bucket, KeyID or Secret is empty; Prefix
// and UserAgent may be left blank.
type Options struct {
	Endpoint string // S3-compatible endpoint URL
	Region   string // signing region
	Bucket   string // bucket all operations target
	Prefix   string // optional key namespace, joined to keys with "/"
	KeyID    string // access key ID
	Secret   string // secret access key

	// UserAgent is an optional caller identifier meant to be appended to
	// the default User-Agent for traceability.
	UserAgent string
}

// New validates opts and builds a Client tuned for Garage: path-style
// addressing, static credentials, and request/response checksums only when
// an operation requires them.
//
// Endpoint, Region, Bucket, KeyID and Secret are required; each missing one
// yields an "s3: ... required" error. Prefix and UserAgent are optional.
// A single trailing "/" is trimmed from Prefix.
//
// When set, opts.UserAgent is passed to the SDK as its AppID, which the SDK
// appends to the User-Agent header so requests can be traced to the caller.
//
// No custom HTTP client is installed (no explicit timeouts or proxies);
// configure one here if that becomes necessary.
func New(opts Options) (*Client, error) {
	if opts.Endpoint == "" {
		return nil, errors.New("s3: endpoint required")
	}
	if opts.Region == "" {
		return nil, errors.New("s3: region required")
	}
	if opts.Bucket == "" {
		return nil, errors.New("s3: bucket required")
	}
	if opts.KeyID == "" || opts.Secret == "" {
		return nil, errors.New("s3: credentials required")
	}

	api := s3.New(s3.Options{
		Region:       opts.Region,
		Credentials:  credentials.NewStaticCredentialsProvider(opts.KeyID, opts.Secret, ""),
		UsePathStyle: true, // Garage needs path-style addressing.
		BaseEndpoint: aws.String(opts.Endpoint),
		AppID:        opts.UserAgent, // appended by the SDK to the User-Agent header
		// Garage doesn't support the trailing CRC32 checksums newer AWS SDKs
		// (e.g. boto3 1.36+) add by default. Mirrors
		// AWS_REQUEST_CHECKSUM_CALCULATION=when_required and
		// AWS_RESPONSE_CHECKSUM_VALIDATION=when_required from the shell.
		RequestChecksumCalculation: aws.RequestChecksumCalculationWhenRequired,
		ResponseChecksumValidation: aws.ResponseChecksumValidationWhenRequired,
	})

	uploader := manager.NewUploader(api, func(u *manager.Uploader) {
		// Upload parts one at a time; the manager still switches to
		// multipart on its own for bodies larger than its PartSize.
		u.Concurrency = 1
	})

	return &Client{
		api:      api,
		uploader: uploader,
		bucket:   opts.Bucket,
		prefix:   strings.TrimSuffix(opts.Prefix, "/"),
	}, nil
}

// FullKey maps a caller-relative key to the object key stored in the
// bucket. One leading "/" is dropped from key. If the client has a prefix,
// it is prepended with exactly one "/" separator.
func (c *Client) FullKey(key string) string {
	rel := strings.TrimPrefix(key, "/")
	if c.prefix != "" {
		return c.prefix + "/" + rel
	}
	return rel
}

// Bucket reports the name of the bucket this Client targets; it is meant
// for diagnostics and log output.
func (c *Client) Bucket() string {
	return c.bucket
}

// SetPrefix replaces the client's key prefix in place, trimming a single
// trailing "/". `cacher list --root` relies on it to step outside the
// project namespace. The write to c is not synchronized.
func (c *Client) SetPrefix(prefix string) {
	trimmed := strings.TrimSuffix(prefix, "/")
	c.prefix = trimmed
}

// Exists reports whether key is present, using HeadObject. A not-found
// response (as classified by isNotFound) yields (false, nil). Any other
// failure is returned wrapped with the object's s3:// URL.
func (c *Client) Exists(ctx context.Context, key string) (bool, error) {
	full := c.FullKey(key)
	_, err := c.api.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(full),
	})
	switch {
	case err == nil:
		return true, nil
	case isNotFound(err):
		return false, nil
	default:
		return false, fmt.Errorf("head s3://%s/%s: %w", c.bucket, full, err)
	}
}

// ErrNotFound is wrapped by the error Get returns when the requested object
// does not exist, so callers can detect a cache miss with errors.Is.
var ErrNotFound = errors.New("not found")

// Get opens a streaming reader over the object at key. The caller must Close
// the returned reader.
//
// If the object is missing, the error wraps ErrNotFound and reads
// "get s3://<bucket>/<key>: not found". Other failures wrap the SDK error.
func (c *Client) Get(ctx context.Context, key string) (io.ReadCloser, error) {
	full := c.FullKey(key)
	out, err := c.api.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(full),
	})
	if err != nil {
		if isNotFound(err) {
			// Wrap a sentinel (not the SDK error): the message stays terse,
			// but errors.Is(err, ErrNotFound) works for callers.
			return nil, fmt.Errorf("get s3://%s/%s: %w", c.bucket, full, ErrNotFound)
		}
		return nil, fmt.Errorf("get s3://%s/%s: %w", c.bucket, full, err)
	}
	return out.Body, nil
}

// Put streams body to key through the SDK upload manager. Multipart kicks in
// automatically once the body exceeds the manager's PartSize (5MiB by
// default). Failures are wrapped with the object's s3:// URL.
func (c *Client) Put(ctx context.Context, key string, body io.Reader) error {
	full := c.FullKey(key)
	in := &s3.PutObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(full),
		Body:   body,
	}
	if _, err := c.uploader.Upload(ctx, in); err != nil {
		return fmt.Errorf("upload s3://%s/%s: %w", c.bucket, full, err)
	}
	return nil
}

// Delete removes the object at key. An error that isNotFound classifies as
// "missing" is treated as success, matching `aws s3 rm` semantics. Other
// failures are wrapped with the object's s3:// URL.
func (c *Client) Delete(ctx context.Context, key string) error {
	full := c.FullKey(key)
	_, err := c.api.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(full),
	})
	if err == nil || isNotFound(err) {
		return nil
	}
	return fmt.Errorf("delete s3://%s/%s: %w", c.bucket, full, err)
}

// ListResult is the output of List. It splits objects found at the listed
// level ("files") from common prefixes ("directories"). A recursive listing
// never fills Prefixes; every object under the requested path lands in Keys.
type ListResult struct {
	// Keys are object keys, relative to the client prefix.
	Keys []string
	// Prefixes are common prefixes, each ending in "/", relative to the
	// client prefix.
	Prefixes []string
}

// List enumerates objects under sub, which is relative to the client prefix.
//
// When recursive is false, the request is delimited by "/" so the output
// matches `aws s3 ls`. Objects directly under sub go to Keys, and deeper
// levels surface as "directory" entries in Prefixes. When recursive is true,
// every object whose key starts with sub is returned in Keys.
//
// Pages are followed via ContinuationToken until the listing is complete.
// If the server reports a truncated page without a new continuation token,
// List returns an error rather than re-requesting the same page forever.
// On any error the returned ListResult is zero.
func (c *Client) List(ctx context.Context, sub string, recursive bool) (ListResult, error) {
	fullPrefix := c.FullKey(sub)
	var delim *string
	if !recursive {
		// With a delimiter, the prefix must end with it to get "directory"
		// semantics; otherwise S3 treats the prefix as a literal substring
		// match ("foo" would also match "foobar/...").
		if fullPrefix != "" && !strings.HasSuffix(fullPrefix, "/") {
			fullPrefix += "/"
		}
		delim = aws.String("/")
	}

	// Returned keys and prefixes are made relative to the client prefix.
	trim := ""
	if c.prefix != "" {
		trim = c.prefix + "/"
	}

	var res ListResult
	var token *string
	for {
		out, err := c.api.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
			Bucket:            aws.String(c.bucket),
			Prefix:            aws.String(fullPrefix),
			Delimiter:         delim,
			ContinuationToken: token,
		})
		if err != nil {
			return ListResult{}, fmt.Errorf("list s3://%s/%s: %w", c.bucket, fullPrefix, err)
		}
		for _, o := range out.Contents {
			if o.Key != nil {
				res.Keys = append(res.Keys, strings.TrimPrefix(*o.Key, trim))
			}
		}
		for _, p := range out.CommonPrefixes {
			if p.Prefix != nil {
				res.Prefixes = append(res.Prefixes, strings.TrimPrefix(*p.Prefix, trim))
			}
		}
		if !aws.ToBool(out.IsTruncated) {
			break
		}
		// A truncated page must carry a fresh token. A nil/empty token would
		// restart the listing from the first page, and a repeated one would
		// re-fetch the same page: either way the loop would never end.
		next := aws.ToString(out.NextContinuationToken)
		if next == "" || (token != nil && *token == next) {
			return ListResult{}, fmt.Errorf("list s3://%s/%s: truncated page without a new continuation token", c.bucket, fullPrefix)
		}
		token = aws.String(next)
	}
	return res, nil
}

// PingBucket is the connectivity check behind `cacher doctor`. Garage
// answers HeadBucket with 403 even for valid credentials, so this issues a
// ListObjectsV2 capped at a single key instead. That mirrors the
// `aws s3 ls s3://$bucket/` call in the shell helper this replaces.
func (c *Client) PingBucket(ctx context.Context) error {
	in := &s3.ListObjectsV2Input{
		Bucket:  aws.String(c.bucket),
		MaxKeys: aws.Int32(1),
	}
	if _, err := c.api.ListObjectsV2(ctx, in); err != nil {
		return fmt.Errorf("ping bucket %s: %w", c.bucket, err)
	}
	return nil
}

// isNotFound reports whether err means "the object does not exist". It
// checks, in order:
//   - the typed *types.NotFound (HeadObject's empty-body 404) and
//     *types.NoSuchKey (GetObject);
//   - generic API error codes "NotFound", "NoSuchKey" and "404";
//   - finally, a bare HTTP 404 status.
//
// An API error with code "NoSuchBucket" is deliberately NOT treated as
// not-found, even though it is also an HTTP 404. A missing or misnamed
// bucket is a configuration error, and classifying it as a missing object
// would hide it: Get would report a plain miss and Delete would silently
// succeed.
func isNotFound(err error) bool {
	var nf *types.NotFound
	var nsk *types.NoSuchKey
	if errors.As(err, &nf) || errors.As(err, &nsk) {
		return true
	}
	var apiErr smithy.APIError
	if errors.As(err, &apiErr) {
		switch apiErr.ErrorCode() {
		case "NotFound", "NoSuchKey", "404":
			return true
		case "NoSuchBucket":
			// Must short-circuit before the generic HTTP 404 fallback below.
			return false
		}
	}
	var rerr interface{ HTTPStatusCode() int }
	return errors.As(err, &rerr) && rerr.HTTPStatusCode() == http.StatusNotFound
}