package cmd

import (
	"context"
	"fmt"
	"io"
	"os/exec"

	"github.com/klauspost/compress/zstd"
	"github.com/spf13/cobra"
)

// dockerCmd is the parent command that groups the docker image-cache
// subcommands (exists, download, upload).
var dockerCmd = &cobra.Command{
	Use:   "docker",
	Short: "Docker image cache (tar+zstd streamed to S3)",
}

// dockerExistsCmd resolves the key argument, prints the resolved key on
// stdout, and returns ErrNotFound when the object is absent.
var dockerExistsCmd = &cobra.Command{
	Use:   "exists <key>",
	Short: "Exit 0 if a cached docker image is present at key, 1 if missing",
	Args:  cobra.ExactArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		cli, cfg, err := client()
		if err != nil {
			return err
		}
		key, err := resolveKey(args[0], cfg)
		if err != nil {
			return err
		}
		ok, err := cli.Exists(dockerCmdContext(cmd), key)
		if err != nil {
			return err
		}
		// The resolved key is printed whether or not the object exists.
		fmt.Fprintln(cmd.OutOrStdout(), key)
		if !ok {
			return ErrNotFound
		}
		return nil
	},
}

// flagDockerPull backs the --pull flag of `docker download`.
var flagDockerPull bool

// dockerDownloadCmd loads a cached image from S3 into the local daemon and,
// with --pull, falls back to `docker pull` plus a best-effort cache seed on a
// miss.
var dockerDownloadCmd = &cobra.Command{
	Use:   "download <key> <image:tag>",
	Short: "Pull a cached image from S3 into the local docker daemon",
	Long: "Streams s3://bucket/key through zstd-decode into `docker load`. " +
		"With --pull, a cache miss falls back to `docker pull <image:tag>` then seeds " +
		"the S3 cache via `docker save`. " +
		"This makes a single invocation into the full cache-or-fetch pattern " +
		"callable from a CI manifest: " +
		"cacher docker download garage/v2.3.0.tar.zst dxflrs/garage:v2.3.0 --pull",
	Args: cobra.ExactArgs(2),
	RunE: func(cmd *cobra.Command, args []string) error {
		cli, cfg, err := client()
		if err != nil {
			return err
		}
		key, err := resolveKey(args[0], cfg)
		if err != nil {
			return err
		}
		// On a cache hit the <image:tag> arg is only for the caller's own
		// ergonomics + logging; docker load reads the tag from the tar
		// stream itself. On a miss with --pull it is the image that gets
		// pulled and saved.
		tag := args[1]
		ctx := dockerCmdContext(cmd)

		ok, err := cli.Exists(ctx, key)
		if err != nil {
			return err
		}
		if ok {
			fmt.Fprintf(cmd.ErrOrStderr(), "Cache HIT — %s → load %s\n", key, tag)
			body, err := cli.Get(ctx, key)
			if err != nil {
				return err
			}
			defer body.Close()
			dec, err := zstd.NewReader(body)
			if err != nil {
				return err
			}
			defer dec.Close()
			return runDockerLoad(ctx, dec)
		}

		fmt.Fprintf(cmd.ErrOrStderr(), "Cache MISS — %s\n", key)
		if !flagDockerPull {
			return fmt.Errorf("%w: %s", ErrNotFound, key)
		}

		fmt.Fprintf(cmd.ErrOrStderr(), "Pulling %s\n", tag)
		if err := runDockerPull(ctx, tag); err != nil {
			return err
		}

		// Image is now in the local daemon. Seed the S3 cache so the
		// next run hits. Treat the upload as best-effort — the pull
		// already gave us what we needed locally.
		if err := saveTagToS3(ctx, cli, key, tag); err != nil {
			fmt.Fprintf(cmd.ErrOrStderr(), "warn: seed upload failed: %v\n", err)
			return nil
		}
		fmt.Fprintf(cmd.ErrOrStderr(), "Cached → %s\n", key)
		return nil
	},
}

// dockerUploadCmd saves a local image into the S3 cache. Unless --force is
// given, an existing key is left untouched and the command succeeds.
var dockerUploadCmd = &cobra.Command{
	Use:   "upload <key> <image:tag>",
	Short: "Save a local docker image to the S3 cache",
	Long: "Pipes `docker save image:tag` through zstd-encode into a streamed S3 multipart upload. \n" +
		"No on-disk tempfile.",
	Args: cobra.ExactArgs(2),
	RunE: func(cmd *cobra.Command, args []string) error {
		cli, cfg, err := client()
		if err != nil {
			return err
		}
		key, err := resolveKey(args[0], cfg)
		if err != nil {
			return err
		}
		tag := args[1]
		ctx := dockerCmdContext(cmd)

		if !flagForce {
			ok, err := cli.Exists(ctx, key)
			if err != nil {
				return err
			}
			if ok {
				fmt.Fprintf(cmd.ErrOrStderr(), "Skipped — %s already present (use --force)\n", key)
				return nil
			}
		}

		fmt.Fprintf(cmd.ErrOrStderr(), "Saving %s → %s\n", tag, key)
		return saveTagToS3(ctx, cli, key, tag)
	},
}

// init registers the shared S3/key flags on every docker subcommand, adds the
// per-command flags, and attaches the docker command tree to rootCmd.
func init() {
	for _, c := range []*cobra.Command{dockerExistsCmd, dockerDownloadCmd, dockerUploadCmd} {
		addS3Flags(c)
		addKeyFlags(c)
	}
	dockerUploadCmd.Flags().BoolVar(&flagForce, "force", false, "overwrite if key already exists")
	dockerDownloadCmd.Flags().BoolVar(&flagDockerPull, "pull", false, "on cache miss, docker pull and seed S3")
	dockerCmd.AddCommand(dockerExistsCmd, dockerDownloadCmd, dockerUploadCmd)
	rootCmd.AddCommand(dockerCmd)
}

// dockerCmdContext returns the context attached to cmd so that cancellation
// supplied by the caller (e.g. via ExecuteContext) reaches the S3 and docker
// calls. cobra's Context() is nil when the command was never run through
// Execute and no context was set, so fall back to context.Background().
func dockerCmdContext(cmd *cobra.Command) context.Context {
	if ctx := cmd.Context(); ctx != nil {
		return ctx
	}
	return context.Background()
}

// runDockerPull invokes `docker pull <tag>`, routing the command's stdout and
// stderr through newWarnWriter so the test log isn't dominated by pull
// progress bars. A failure is wrapped with the tag that was being pulled.
func runDockerPull(ctx context.Context, tag string) error {
	pull := exec.CommandContext(ctx, "docker", "pull", tag)
	pull.Stdout = newWarnWriter("docker pull")
	pull.Stderr = newWarnWriter("docker pull")
	if err := pull.Run(); err != nil {
		return fmt.Errorf("docker pull %s: %w", tag, err)
	}
	return nil
}

// saveTagToS3 wires `docker save` stdout → zstd encoder → io.Pipe →
// s3.Put. The encoder runs in a goroutine writing into the pipe; the
// uploader goroutine reads from the pipe. Both ends signal errors back
// through pw.CloseWithError so the upload sees a clean EOF or a wrapped
// failure.
func saveTagToS3(ctx context.Context, cli s3Putter, key, tag string) error { save := exec.CommandContext(ctx, "docker", "save", tag) saveOut, err := save.StdoutPipe() if err != nil { return err } save.Stderr = newWarnWriter("docker save") if err := save.Start(); err != nil { return fmt.Errorf("docker save: %w", err) } pr, pw := io.Pipe() go func() { zw, zerr := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault)) if zerr != nil { pw.CloseWithError(zerr) return } if _, cerr := io.Copy(zw, saveOut); cerr != nil { zw.Close() pw.CloseWithError(cerr) return } if zerr := zw.Close(); zerr != nil { pw.CloseWithError(zerr) return } pw.Close() }() upErr := cli.Put(ctx, key, pr) waitErr := save.Wait() if upErr != nil { return upErr } if waitErr != nil { return fmt.Errorf("docker save: %w", waitErr) } return nil } // runDockerLoad pipes r (already zstd-decoded) into `docker load`. func runDockerLoad(ctx context.Context, r io.Reader) error { load := exec.CommandContext(ctx, "docker", "load") stdin, err := load.StdinPipe() if err != nil { return err } load.Stdout = newWarnWriter("docker load") load.Stderr = newWarnWriter("docker load") if err := load.Start(); err != nil { return fmt.Errorf("docker load: %w", err) } _, copyErr := io.Copy(stdin, r) stdin.Close() waitErr := load.Wait() if copyErr != nil { return fmt.Errorf("docker load write: %w", copyErr) } if waitErr != nil { return fmt.Errorf("docker load: %w", waitErr) } return nil } // s3Putter is satisfied by *s3.Client; declared here so saveTagToS3 is // testable with a fake. type s3Putter interface { Put(ctx context.Context, key string, r io.Reader) error }