// Package s3 wraps the AWS SDK v2 S3 client with the Garage-compatible // settings that the existing .builds/lib/ci-lib.sh shell helpers configure // via ~/.aws/config: // // [default] // region = $AWS_DEFAULT_REGION // s3 = // addressing_style = path // signature_version = s3v4 // (plus AWS_REQUEST/RESPONSE_CHECKSUM_* = when_required for Garage) package s3 import ( "context" "errors" "fmt" "io" "net/http" "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/smithy-go" ) // Client is a small surface over the S3 SDK tuned for the cacher use case. type Client struct { api *s3.Client uploader *manager.Uploader bucket string prefix string // joined to every key (with / separator) } // Options configures a new Client. type Options struct { Endpoint string Region string Bucket string Prefix string KeyID string Secret string UserAgent string // appended to the default UA for traceability } // New constructs a Client. The HTTP client uses Go's default transport; // callers can decorate later if needed (timeouts, proxies). func New(opts Options) (*Client, error) { if opts.Endpoint == "" { return nil, errors.New("s3: endpoint required") } if opts.Region == "" { return nil, errors.New("s3: region required") } if opts.Bucket == "" { return nil, errors.New("s3: bucket required") } if opts.KeyID == "" || opts.Secret == "" { return nil, errors.New("s3: credentials required") } api := s3.New(s3.Options{ Region: opts.Region, Credentials: credentials.NewStaticCredentialsProvider(opts.KeyID, opts.Secret, ""), UsePathStyle: true, // Garage needs path-style addressing. BaseEndpoint: aws.String(opts.Endpoint), // Garage doesn't implement boto3 1.36+ trailing CRC32 checksums. // Mirrors AWS_REQUEST_CHECKSUM_CALCULATION=when_required and // AWS_RESPONSE_CHECKSUM_VALIDATION=when_required from the shell. RequestChecksumCalculation: aws.RequestChecksumCalculationWhenRequired, ResponseChecksumValidation: aws.ResponseChecksumValidationWhenRequired, }) uploader := manager.NewUploader(api, func(u *manager.Uploader) { // Single-stream upload — multipart is auto for >5MiB. u.Concurrency = 1 }) return &Client{ api: api, uploader: uploader, bucket: opts.Bucket, prefix: strings.TrimSuffix(opts.Prefix, "/"), }, nil } // FullKey returns prefix/key, with single-slash semantics. func (c *Client) FullKey(key string) string { if c.prefix == "" { return strings.TrimPrefix(key, "/") } return c.prefix + "/" + strings.TrimPrefix(key, "/") } // Bucket returns the configured bucket name (for diagnostics). func (c *Client) Bucket() string { return c.bucket } // Exists returns true if the object is present. func (c *Client) Exists(ctx context.Context, key string) (bool, error) { _, err := c.api.HeadObject(ctx, &s3.HeadObjectInput{ Bucket: aws.String(c.bucket), Key: aws.String(c.FullKey(key)), }) if err == nil { return true, nil } if isNotFound(err) { return false, nil } return false, fmt.Errorf("head s3://%s/%s: %w", c.bucket, c.FullKey(key), err) } // Get streams the object body. Caller must Close the returned reader. func (c *Client) Get(ctx context.Context, key string) (io.ReadCloser, error) { out, err := c.api.GetObject(ctx, &s3.GetObjectInput{ Bucket: aws.String(c.bucket), Key: aws.String(c.FullKey(key)), }) if err != nil { if isNotFound(err) { return nil, fmt.Errorf("get s3://%s/%s: not found", c.bucket, c.FullKey(key)) } return nil, fmt.Errorf("get s3://%s/%s: %w", c.bucket, c.FullKey(key), err) } return out.Body, nil } // Put uploads body to key, streaming. Multipart is automatic for bodies // over the manager's PartSize threshold (5MiB by default). func (c *Client) Put(ctx context.Context, key string, body io.Reader) error { _, err := c.uploader.Upload(ctx, &s3.PutObjectInput{ Bucket: aws.String(c.bucket), Key: aws.String(c.FullKey(key)), Body: body, }) if err != nil { return fmt.Errorf("upload s3://%s/%s: %w", c.bucket, c.FullKey(key), err) } return nil } // Delete removes the object. Deleting a missing object returns nil // (matches `aws s3 rm` semantics). func (c *Client) Delete(ctx context.Context, key string) error { _, err := c.api.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: aws.String(c.bucket), Key: aws.String(c.FullKey(key)), }) if err != nil && !isNotFound(err) { return fmt.Errorf("delete s3://%s/%s: %w", c.bucket, c.FullKey(key), err) } return nil } // List yields object keys (relative to prefix) under sub. func (c *Client) List(ctx context.Context, sub string) ([]string, error) { fullPrefix := c.FullKey(sub) var keys []string var token *string for { out, err := c.api.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ Bucket: aws.String(c.bucket), Prefix: aws.String(fullPrefix), ContinuationToken: token, }) if err != nil { return nil, fmt.Errorf("list s3://%s/%s: %w", c.bucket, fullPrefix, err) } for _, o := range out.Contents { if o.Key == nil { continue } keys = append(keys, strings.TrimPrefix(*o.Key, c.prefix+"/")) } if out.IsTruncated == nil || !*out.IsTruncated { break } token = out.NextContinuationToken } return keys, nil } // HeadBucket is the smoke test used by `cacher doctor`. func (c *Client) HeadBucket(ctx context.Context) error { _, err := c.api.HeadBucket(ctx, &s3.HeadBucketInput{ Bucket: aws.String(c.bucket), }) if err != nil { return fmt.Errorf("head bucket %s: %w", c.bucket, err) } return nil } // isNotFound covers both HeadObject's empty-error case and the typed // NoSuchKey returned by GetObject. func isNotFound(err error) bool { var nf *types.NotFound var nsk *types.NoSuchKey if errors.As(err, &nf) || errors.As(err, &nsk) { return true } var apiErr smithy.APIError if errors.As(err, &apiErr) { switch apiErr.ErrorCode() { case "NotFound", "NoSuchKey", "404": return true } } var rerr interface{ HTTPStatusCode() int } if errors.As(err, &rerr) && rerr.HTTPStatusCode() == http.StatusNotFound { return true } return false }