package cmd
import (
"context"
"fmt"
"io"
"os/exec"
"github.com/klauspost/compress/zstd"
"github.com/spf13/cobra"
)
var dockerCmd = &cobra.Command{
Use: "docker",
Short: "Docker image cache (tar+zstd streamed to S3)",
}
var dockerExistsCmd = &cobra.Command{
Use: "exists <key>",
Short: "Exit 0 if a cached docker image is present at key, 1 if missing",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
cli, cfg, err := client()
if err != nil {
return err
}
key, err := resolveKey(args[0], cfg)
if err != nil {
return err
}
ok, err := cli.Exists(context.Background(), key)
if err != nil {
return err
}
fmt.Fprintln(cmd.OutOrStdout(), key)
if !ok {
return ErrNotFound
}
return nil
},
}
var dockerDownloadCmd = &cobra.Command{
Use: "download <key> <image:tag>",
Short: "Pull a cached image from S3 into the local docker daemon",
Long: `Streams s3://bucket/key through zstd-decode into ` + "`docker load`" + `.`,
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
cli, cfg, err := client()
if err != nil {
return err
}
key, err := resolveKey(args[0], cfg)
if err != nil {
return err
}
// The <image:tag> arg is for the caller's own ergonomics + logging;
// docker load reads the tag from the tar stream itself.
tag := args[1]
ctx := context.Background()
ok, err := cli.Exists(ctx, key)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("%w: %s", ErrNotFound, key)
}
fmt.Fprintf(cmd.ErrOrStderr(), "Cache HIT — %s → load %s\n", key, tag)
body, err := cli.Get(ctx, key)
if err != nil {
return err
}
defer body.Close()
dec, err := zstd.NewReader(body)
if err != nil {
return err
}
defer dec.Close()
return runDockerLoad(ctx, dec)
},
}
var dockerUploadCmd = &cobra.Command{
Use: "upload <key> <image:tag>",
Short: "Save a local docker image to the S3 cache",
Long: `Pipes ` + "`docker save image:tag`" + ` through zstd-encode into a streamed
S3 multipart upload. No on-disk tempfile.`,
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
cli, cfg, err := client()
if err != nil {
return err
}
key, err := resolveKey(args[0], cfg)
if err != nil {
return err
}
tag := args[1]
ctx := context.Background()
if !flagForce {
ok, err := cli.Exists(ctx, key)
if err != nil {
return err
}
if ok {
fmt.Fprintf(cmd.ErrOrStderr(), "Skipped — %s already present (use --force)\n", key)
return nil
}
}
fmt.Fprintf(cmd.ErrOrStderr(), "Saving %s → %s\n", tag, key)
return saveTagToS3(ctx, cli, key, tag)
},
}
func init() {
for _, c := range []*cobra.Command{dockerExistsCmd, dockerDownloadCmd, dockerUploadCmd} {
addS3Flags(c)
addKeyFlags(c)
}
dockerUploadCmd.Flags().BoolVar(&flagForce, "force", false, "overwrite if key already exists")
dockerCmd.AddCommand(dockerExistsCmd, dockerDownloadCmd, dockerUploadCmd)
rootCmd.AddCommand(dockerCmd)
}
// saveTagToS3 wires `docker save` stdout → zstd encoder → io.Pipe →
// s3.Put. The encoder runs in a goroutine writing into the pipe; the
// uploader goroutine reads from the pipe. Both ends signal errors back
// through pw.CloseWithError so the upload sees a clean EOF or a wrapped
// failure.
func saveTagToS3(ctx context.Context, cli s3Putter, key, tag string) error {
save := exec.CommandContext(ctx, "docker", "save", tag)
saveOut, err := save.StdoutPipe()
if err != nil {
return err
}
save.Stderr = newWarnWriter("docker save")
if err := save.Start(); err != nil {
return fmt.Errorf("docker save: %w", err)
}
pr, pw := io.Pipe()
go func() {
zw, zerr := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault))
if zerr != nil {
pw.CloseWithError(zerr)
return
}
if _, cerr := io.Copy(zw, saveOut); cerr != nil {
zw.Close()
pw.CloseWithError(cerr)
return
}
if zerr := zw.Close(); zerr != nil {
pw.CloseWithError(zerr)
return
}
pw.Close()
}()
upErr := cli.Put(ctx, key, pr)
waitErr := save.Wait()
if upErr != nil {
return upErr
}
if waitErr != nil {
return fmt.Errorf("docker save: %w", waitErr)
}
return nil
}
// runDockerLoad pipes r (already zstd-decoded) into `docker load`.
func runDockerLoad(ctx context.Context, r io.Reader) error {
load := exec.CommandContext(ctx, "docker", "load")
stdin, err := load.StdinPipe()
if err != nil {
return err
}
load.Stdout = newWarnWriter("docker load")
load.Stderr = newWarnWriter("docker load")
if err := load.Start(); err != nil {
return fmt.Errorf("docker load: %w", err)
}
_, copyErr := io.Copy(stdin, r)
stdin.Close()
waitErr := load.Wait()
if copyErr != nil {
return fmt.Errorf("docker load write: %w", copyErr)
}
if waitErr != nil {
return fmt.Errorf("docker load: %w", waitErr)
}
return nil
}
// s3Putter is satisfied by *s3.Client; declared here so saveTagToS3 is
// testable with a fake.
type s3Putter interface {
Put(ctx context.Context, key string, r io.Reader) error
}