diff --git a/CHANGELOG.md b/CHANGELOG.md index 72122cf..7bd04b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,25 @@ All notable changes to dbbackup will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [3.42.33] - 2026-01-14 "Exponential Backoff Retry" + +### Added - cenkalti/backoff for Cloud Operation Retry +- **Exponential backoff retry** for all cloud operations (S3, Azure, GCS) +- **Retry configurations**: + - `DefaultRetryConfig()` - 5 retries, 500ms→30s backoff, 5 min max + - `AggressiveRetryConfig()` - 10 retries, 1s→60s backoff, 15 min max + - `QuickRetryConfig()` - 3 retries, 100ms→5s backoff, 30s max +- **Smart error classification**: + - `IsPermanentError()` - Auth/bucket errors (no retry) + - `IsRetryableError()` - Timeout/network errors (retry) +- **Retry logging** - Each retry attempt is logged with wait duration + +### Changed +- S3 simple upload, multipart upload, download now retry on transient failures +- Azure simple upload, download now retry on transient failures +- GCS upload, download now retry on transient failures +- Large file multipart uploads use `AggressiveRetryConfig()` (more retries) + ## [3.42.32] - 2026-01-14 "Cross-Platform Colors" ### Added - fatih/color for Cross-Platform Terminal Colors diff --git a/bin/README.md b/bin/README.md index 673b8c2..47c3ed3 100644 --- a/bin/README.md +++ b/bin/README.md @@ -3,9 +3,9 @@ This directory contains pre-compiled binaries for the DB Backup Tool across multiple platforms and architectures. ## Build Information -- **Version**: 3.42.31 -- **Build Time**: 2026-01-14_15:07:12_UTC -- **Git Commit**: dc6dfd8 +- **Version**: 3.42.32 +- **Build Time**: 2026-01-14_15:13:08_UTC +- **Git Commit**: 6a24ee3 ## Recent Updates (v1.1.0) - ✅ Fixed TUI progress display with line-by-line output diff --git a/go.mod b/go.mod index d916893..ddd26b9 100755 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.41.2 // indirect github.com/aws/smithy-go v1.23.2 // indirect github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/charmbracelet/colorprofile v0.2.3-0.20250311203215-f60798e515dc // indirect github.com/charmbracelet/x/ansi v0.10.1 // indirect diff --git a/go.sum b/go.sum index a3e6c8b..5de978d 100755 --- a/go.sum +++ b/go.sum @@ -84,6 +84,8 @@ github.com/aws/smithy-go v1.23.2 h1:Crv0eatJUQhaManss33hS5r40CG3ZFH+21XSkqMrIUM= github.com/aws/smithy-go v1.23.2/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/charmbracelet/bubbles v0.21.0 h1:9TdC97SdRVg/1aaXNVWfFH3nnLAwOXr8Fn6u6mfQdFs= diff --git a/internal/cloud/azure.go b/internal/cloud/azure.go index 25706cf..c4fbafe 100644 --- a/internal/cloud/azure.go +++ b/internal/cloud/azure.go @@ -151,37 +151,46 @@ func (a 
*AzureBackend) Upload(ctx context.Context, localPath, remotePath string, return a.uploadSimple(ctx, file, blobName, fileSize, progress) } -// uploadSimple uploads a file using simple upload (single request) +// uploadSimple uploads a file using simple upload (single request) with retry func (a *AzureBackend) uploadSimple(ctx context.Context, file *os.File, blobName string, fileSize int64, progress ProgressCallback) error { - blockBlobClient := a.client.ServiceClient().NewContainerClient(a.containerName).NewBlockBlobClient(blobName) + return RetryOperationWithNotify(ctx, DefaultRetryConfig(), func() error { + // Reset file position for retry + if _, err := file.Seek(0, 0); err != nil { + return fmt.Errorf("failed to reset file position: %w", err) + } - // Wrap reader with progress tracking - reader := NewProgressReader(file, fileSize, progress) + blockBlobClient := a.client.ServiceClient().NewContainerClient(a.containerName).NewBlockBlobClient(blobName) - // Calculate MD5 hash for integrity - hash := sha256.New() - teeReader := io.TeeReader(reader, hash) + // Wrap reader with progress tracking + reader := NewProgressReader(file, fileSize, progress) - _, err := blockBlobClient.UploadStream(ctx, teeReader, &blockblob.UploadStreamOptions{ - BlockSize: 4 * 1024 * 1024, // 4MB blocks + // Calculate MD5 hash for integrity + hash := sha256.New() + teeReader := io.TeeReader(reader, hash) + + _, err := blockBlobClient.UploadStream(ctx, teeReader, &blockblob.UploadStreamOptions{ + BlockSize: 4 * 1024 * 1024, // 4MB blocks + }) + if err != nil { + return fmt.Errorf("failed to upload blob: %w", err) + } + + // Store checksum as metadata + checksum := hex.EncodeToString(hash.Sum(nil)) + metadata := map[string]*string{ + "sha256": &checksum, + } + + _, err = blockBlobClient.SetMetadata(ctx, metadata, nil) + if err != nil { + // Non-fatal: upload succeeded but metadata failed + fmt.Fprintf(os.Stderr, "Warning: failed to set blob metadata: %v\n", err) + } + + return nil + }, func(err error, duration time.Duration) { + fmt.Printf("[Azure] Upload retry in %v: %v\n", duration, err) }) - if err != nil { - return fmt.Errorf("failed to upload blob: %w", err) - } - - // Store checksum as metadata - checksum := hex.EncodeToString(hash.Sum(nil)) - metadata := map[string]*string{ - "sha256": &checksum, - } - - _, err = blockBlobClient.SetMetadata(ctx, metadata, nil) - if err != nil { - // Non-fatal: upload succeeded but metadata failed - fmt.Fprintf(os.Stderr, "Warning: failed to set blob metadata: %v\n", err) - } - - return nil } // uploadBlocks uploads a file using block blob staging (for large files) @@ -251,7 +260,7 @@ func (a *AzureBackend) uploadBlocks(ctx context.Context, file *os.File, blobName return nil } -// Download downloads a file from Azure Blob Storage +// Download downloads a file from Azure Blob Storage with retry func (a *AzureBackend) Download(ctx context.Context, remotePath, localPath string, progress ProgressCallback) error { blobName := strings.TrimPrefix(remotePath, "/") blockBlobClient := a.client.ServiceClient().NewContainerClient(a.containerName).NewBlockBlobClient(blobName) @@ -264,30 +273,34 @@ func (a *AzureBackend) Download(ctx context.Context, remotePath, localPath strin fileSize := *props.ContentLength - // Download blob - resp, err := blockBlobClient.DownloadStream(ctx, nil) - if err != nil { - return fmt.Errorf("failed to download blob: %w", err) - } - defer resp.Body.Close() + return RetryOperationWithNotify(ctx, DefaultRetryConfig(), func() error { + // Download blob + resp, err 
:= blockBlobClient.DownloadStream(ctx, nil) + if err != nil { + return fmt.Errorf("failed to download blob: %w", err) + } + defer resp.Body.Close() - // Create local file - file, err := os.Create(localPath) - if err != nil { - return fmt.Errorf("failed to create file: %w", err) - } - defer file.Close() + // Create/truncate local file + file, err := os.Create(localPath) + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + defer file.Close() - // Wrap reader with progress tracking - reader := NewProgressReader(resp.Body, fileSize, progress) + // Wrap reader with progress tracking + reader := NewProgressReader(resp.Body, fileSize, progress) - // Copy with progress - _, err = io.Copy(file, reader) - if err != nil { - return fmt.Errorf("failed to write file: %w", err) - } + // Copy with progress + _, err = io.Copy(file, reader) + if err != nil { + return fmt.Errorf("failed to write file: %w", err) + } - return nil + return nil + }, func(err error, duration time.Duration) { + fmt.Printf("[Azure] Download retry in %v: %v\n", duration, err) + }) } // Delete deletes a file from Azure Blob Storage diff --git a/internal/cloud/gcs.go b/internal/cloud/gcs.go index 07a01fa..ea81d2c 100644 --- a/internal/cloud/gcs.go +++ b/internal/cloud/gcs.go @@ -89,7 +89,7 @@ func (g *GCSBackend) Name() string { return "gcs" } -// Upload uploads a file to Google Cloud Storage +// Upload uploads a file to Google Cloud Storage with retry func (g *GCSBackend) Upload(ctx context.Context, localPath, remotePath string, progress ProgressCallback) error { file, err := os.Open(localPath) if err != nil { @@ -106,45 +106,54 @@ func (g *GCSBackend) Upload(ctx context.Context, localPath, remotePath string, p // Remove leading slash from remote path objectName := strings.TrimPrefix(remotePath, "/") - bucket := g.client.Bucket(g.bucketName) - object := bucket.Object(objectName) + return RetryOperationWithNotify(ctx, DefaultRetryConfig(), func() error { + // Reset file position for retry + if _, err := file.Seek(0, 0); err != nil { + return fmt.Errorf("failed to reset file position: %w", err) + } - // Create writer with automatic chunking for large files - writer := object.NewWriter(ctx) - writer.ChunkSize = 16 * 1024 * 1024 // 16MB chunks for streaming + bucket := g.client.Bucket(g.bucketName) + object := bucket.Object(objectName) - // Wrap reader with progress tracking and hash calculation - hash := sha256.New() - reader := NewProgressReader(io.TeeReader(file, hash), fileSize, progress) + // Create writer with automatic chunking for large files + writer := object.NewWriter(ctx) + writer.ChunkSize = 16 * 1024 * 1024 // 16MB chunks for streaming - // Upload with progress tracking - _, err = io.Copy(writer, reader) - if err != nil { - writer.Close() - return fmt.Errorf("failed to upload object: %w", err) - } + // Wrap reader with progress tracking and hash calculation + hash := sha256.New() + reader := NewProgressReader(io.TeeReader(file, hash), fileSize, progress) - // Close writer (finalizes upload) - if err := writer.Close(); err != nil { - return fmt.Errorf("failed to finalize upload: %w", err) - } + // Upload with progress tracking + _, err = io.Copy(writer, reader) + if err != nil { + writer.Close() + return fmt.Errorf("failed to upload object: %w", err) + } - // Store checksum as metadata - checksum := hex.EncodeToString(hash.Sum(nil)) - _, err = object.Update(ctx, storage.ObjectAttrsToUpdate{ - Metadata: map[string]string{ - "sha256": checksum, - }, + // Close writer (finalizes upload) + if err := 
writer.Close(); err != nil { + return fmt.Errorf("failed to finalize upload: %w", err) + } + + // Store checksum as metadata + checksum := hex.EncodeToString(hash.Sum(nil)) + _, err = object.Update(ctx, storage.ObjectAttrsToUpdate{ + Metadata: map[string]string{ + "sha256": checksum, + }, + }) + if err != nil { + // Non-fatal: upload succeeded but metadata failed + fmt.Fprintf(os.Stderr, "Warning: failed to set object metadata: %v\n", err) + } + + return nil + }, func(err error, duration time.Duration) { + fmt.Printf("[GCS] Upload retry in %v: %v\n", duration, err) }) - if err != nil { - // Non-fatal: upload succeeded but metadata failed - fmt.Fprintf(os.Stderr, "Warning: failed to set object metadata: %v\n", err) - } - - return nil } -// Download downloads a file from Google Cloud Storage +// Download downloads a file from Google Cloud Storage with retry func (g *GCSBackend) Download(ctx context.Context, remotePath, localPath string, progress ProgressCallback) error { objectName := strings.TrimPrefix(remotePath, "/") @@ -159,30 +168,34 @@ func (g *GCSBackend) Download(ctx context.Context, remotePath, localPath string, fileSize := attrs.Size - // Create reader - reader, err := object.NewReader(ctx) - if err != nil { - return fmt.Errorf("failed to download object: %w", err) - } - defer reader.Close() + return RetryOperationWithNotify(ctx, DefaultRetryConfig(), func() error { + // Create reader + reader, err := object.NewReader(ctx) + if err != nil { + return fmt.Errorf("failed to download object: %w", err) + } + defer reader.Close() - // Create local file - file, err := os.Create(localPath) - if err != nil { - return fmt.Errorf("failed to create file: %w", err) - } - defer file.Close() + // Create/truncate local file + file, err := os.Create(localPath) + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + defer file.Close() - // Wrap reader with progress tracking - progressReader := NewProgressReader(reader, fileSize, progress) + // Wrap reader with progress tracking + progressReader := NewProgressReader(reader, fileSize, progress) - // Copy with progress - _, err = io.Copy(file, progressReader) - if err != nil { - return fmt.Errorf("failed to write file: %w", err) - } + // Copy with progress + _, err = io.Copy(file, progressReader) + if err != nil { + return fmt.Errorf("failed to write file: %w", err) + } - return nil + return nil + }, func(err error, duration time.Duration) { + fmt.Printf("[GCS] Download retry in %v: %v\n", duration, err) + }) } // Delete deletes a file from Google Cloud Storage diff --git a/internal/cloud/retry.go b/internal/cloud/retry.go new file mode 100644 index 0000000..8f32d12 --- /dev/null +++ b/internal/cloud/retry.go @@ -0,0 +1,257 @@ +package cloud + +import ( + "context" + "fmt" + "net" + "strings" + "time" + + "github.com/cenkalti/backoff/v4" +) + +// RetryConfig configures retry behavior +type RetryConfig struct { + MaxRetries int // Maximum number of retries (0 = unlimited) + InitialInterval time.Duration // Initial backoff interval + MaxInterval time.Duration // Maximum backoff interval + MaxElapsedTime time.Duration // Maximum total time for retries + Multiplier float64 // Backoff multiplier +} + +// DefaultRetryConfig returns sensible defaults for cloud operations +func DefaultRetryConfig() *RetryConfig { + return &RetryConfig{ + MaxRetries: 5, + InitialInterval: 500 * time.Millisecond, + MaxInterval: 30 * time.Second, + MaxElapsedTime: 5 * time.Minute, + Multiplier: 2.0, + } +} + +// AggressiveRetryConfig returns config for 
critical operations that need more retries +func AggressiveRetryConfig() *RetryConfig { + return &RetryConfig{ + MaxRetries: 10, + InitialInterval: 1 * time.Second, + MaxInterval: 60 * time.Second, + MaxElapsedTime: 15 * time.Minute, + Multiplier: 1.5, + } +} + +// QuickRetryConfig returns config for operations that should fail fast +func QuickRetryConfig() *RetryConfig { + return &RetryConfig{ + MaxRetries: 3, + InitialInterval: 100 * time.Millisecond, + MaxInterval: 5 * time.Second, + MaxElapsedTime: 30 * time.Second, + Multiplier: 2.0, + } +} + +// RetryOperation executes an operation with exponential backoff retry +func RetryOperation(ctx context.Context, cfg *RetryConfig, operation func() error) error { + if cfg == nil { + cfg = DefaultRetryConfig() + } + + // Create exponential backoff + expBackoff := backoff.NewExponentialBackOff() + expBackoff.InitialInterval = cfg.InitialInterval + expBackoff.MaxInterval = cfg.MaxInterval + expBackoff.MaxElapsedTime = cfg.MaxElapsedTime + expBackoff.Multiplier = cfg.Multiplier + expBackoff.Reset() + + // Wrap with max retries if specified + var b backoff.BackOff = expBackoff + if cfg.MaxRetries > 0 { + b = backoff.WithMaxRetries(expBackoff, uint64(cfg.MaxRetries)) + } + + // Add context support + b = backoff.WithContext(b, ctx) + + // Track attempts for logging + attempt := 0 + + // Wrap operation to handle permanent vs retryable errors + wrappedOp := func() error { + attempt++ + err := operation() + if err == nil { + return nil + } + + // Check if error is permanent (should not retry) + if IsPermanentError(err) { + return backoff.Permanent(err) + } + + return err + } + + return backoff.Retry(wrappedOp, b) +} + +// RetryOperationWithNotify executes an operation with retry and calls notify on each retry +func RetryOperationWithNotify(ctx context.Context, cfg *RetryConfig, operation func() error, notify func(err error, duration time.Duration)) error { + if cfg == nil { + cfg = DefaultRetryConfig() + } + + // Create exponential backoff + expBackoff := backoff.NewExponentialBackOff() + expBackoff.InitialInterval = cfg.InitialInterval + expBackoff.MaxInterval = cfg.MaxInterval + expBackoff.MaxElapsedTime = cfg.MaxElapsedTime + expBackoff.Multiplier = cfg.Multiplier + expBackoff.Reset() + + // Wrap with max retries if specified + var b backoff.BackOff = expBackoff + if cfg.MaxRetries > 0 { + b = backoff.WithMaxRetries(expBackoff, uint64(cfg.MaxRetries)) + } + + // Add context support + b = backoff.WithContext(b, ctx) + + // Wrap operation to handle permanent vs retryable errors + wrappedOp := func() error { + err := operation() + if err == nil { + return nil + } + + // Check if error is permanent (should not retry) + if IsPermanentError(err) { + return backoff.Permanent(err) + } + + return err + } + + return backoff.RetryNotify(wrappedOp, b, notify) +} + +// IsPermanentError returns true if the error should not be retried +func IsPermanentError(err error) bool { + if err == nil { + return false + } + + errStr := strings.ToLower(err.Error()) + + // Authentication/authorization errors - don't retry + permanentPatterns := []string{ + "access denied", + "forbidden", + "unauthorized", + "invalid credentials", + "invalid access key", + "invalid secret", + "no such bucket", + "bucket not found", + "container not found", + "nosuchbucket", + "nosuchkey", + "invalid argument", + "malformed", + "invalid request", + "permission denied", + "access control", + "policy", + } + + for _, pattern := range permanentPatterns { + if strings.Contains(errStr, pattern) { + 
return true + } + } + + return false +} + +// IsRetryableError returns true if the error is transient and should be retried +func IsRetryableError(err error) bool { + if err == nil { + return false + } + + // Network errors are typically retryable + var netErr net.Error + if ok := isNetError(err, &netErr); ok { + return netErr.Timeout() || netErr.Temporary() + } + + errStr := strings.ToLower(err.Error()) + + // Transient errors - should retry + retryablePatterns := []string{ + "timeout", + "connection reset", + "connection refused", + "connection closed", + "eof", + "broken pipe", + "temporary failure", + "service unavailable", + "internal server error", + "bad gateway", + "gateway timeout", + "too many requests", + "rate limit", + "throttl", + "slowdown", + "try again", + "retry", + } + + for _, pattern := range retryablePatterns { + if strings.Contains(errStr, pattern) { + return true + } + } + + return false +} + +// isNetError checks if err wraps a net.Error +func isNetError(err error, target *net.Error) bool { + for err != nil { + if ne, ok := err.(net.Error); ok { + *target = ne + return true + } + // Try to unwrap + if unwrapper, ok := err.(interface{ Unwrap() error }); ok { + err = unwrapper.Unwrap() + } else { + break + } + } + return false +} + +// WithRetry is a helper that wraps a function with default retry logic +func WithRetry(ctx context.Context, operationName string, fn func() error) error { + notify := func(err error, duration time.Duration) { + // Log retry attempts (caller can provide their own logger if needed) + fmt.Printf("[RETRY] %s failed, retrying in %v: %v\n", operationName, duration, err) + } + + return RetryOperationWithNotify(ctx, DefaultRetryConfig(), fn, notify) +} + +// WithRetryConfig is a helper that wraps a function with custom retry config +func WithRetryConfig(ctx context.Context, cfg *RetryConfig, operationName string, fn func() error) error { + notify := func(err error, duration time.Duration) { + fmt.Printf("[RETRY] %s failed, retrying in %v: %v\n", operationName, duration, err) + } + + return RetryOperationWithNotify(ctx, cfg, fn, notify) +} diff --git a/internal/cloud/s3.go b/internal/cloud/s3.go index d0da4c8..d1ef467 100644 --- a/internal/cloud/s3.go +++ b/internal/cloud/s3.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -123,63 +124,81 @@ func (s *S3Backend) Upload(ctx context.Context, localPath, remotePath string, pr return s.uploadSimple(ctx, file, key, fileSize, progress) } -// uploadSimple performs a simple single-part upload +// uploadSimple performs a simple single-part upload with retry func (s *S3Backend) uploadSimple(ctx context.Context, file *os.File, key string, fileSize int64, progress ProgressCallback) error { - // Create progress reader - var reader io.Reader = file - if progress != nil { - reader = NewProgressReader(file, fileSize, progress) - } + return RetryOperationWithNotify(ctx, DefaultRetryConfig(), func() error { + // Reset file position for retry + if _, err := file.Seek(0, 0); err != nil { + return fmt.Errorf("failed to reset file position: %w", err) + } - // Upload to S3 - _, err := s.client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: aws.String(s.bucket), - Key: aws.String(key), - Body: reader, + // Create progress reader + var reader io.Reader = file + if progress != nil { + reader = NewProgressReader(file, fileSize, progress) + } + + // Upload to S3 + _, err := s.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: 
aws.String(s.bucket), + Key: aws.String(key), + Body: reader, + }) + + if err != nil { + return fmt.Errorf("failed to upload to S3: %w", err) + } + + return nil + }, func(err error, duration time.Duration) { + fmt.Printf("[S3] Upload retry in %v: %v\n", duration, err) }) - - if err != nil { - return fmt.Errorf("failed to upload to S3: %w", err) - } - - return nil } -// uploadMultipart performs a multipart upload for large files +// uploadMultipart performs a multipart upload for large files with retry func (s *S3Backend) uploadMultipart(ctx context.Context, file *os.File, key string, fileSize int64, progress ProgressCallback) error { - // Create uploader with custom options - uploader := manager.NewUploader(s.client, func(u *manager.Uploader) { - // Part size: 10MB - u.PartSize = 10 * 1024 * 1024 + return RetryOperationWithNotify(ctx, AggressiveRetryConfig(), func() error { + // Reset file position for retry + if _, err := file.Seek(0, 0); err != nil { + return fmt.Errorf("failed to reset file position: %w", err) + } - // Upload up to 10 parts concurrently - u.Concurrency = 10 + // Create uploader with custom options + uploader := manager.NewUploader(s.client, func(u *manager.Uploader) { + // Part size: 10MB + u.PartSize = 10 * 1024 * 1024 - // Leave parts on failure for debugging - u.LeavePartsOnError = false + // Upload up to 10 parts concurrently + u.Concurrency = 10 + + // Leave parts on failure for debugging + u.LeavePartsOnError = false + }) + + // Wrap file with progress reader + var reader io.Reader = file + if progress != nil { + reader = NewProgressReader(file, fileSize, progress) + } + + // Upload with multipart + _, err := uploader.Upload(ctx, &s3.PutObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(key), + Body: reader, + }) + + if err != nil { + return fmt.Errorf("multipart upload failed: %w", err) + } + + return nil + }, func(err error, duration time.Duration) { + fmt.Printf("[S3] Multipart upload retry in %v: %v\n", duration, err) }) - - // Wrap file with progress reader - var reader io.Reader = file - if progress != nil { - reader = NewProgressReader(file, fileSize, progress) - } - - // Upload with multipart - _, err := uploader.Upload(ctx, &s3.PutObjectInput{ - Bucket: aws.String(s.bucket), - Key: aws.String(key), - Body: reader, - }) - - if err != nil { - return fmt.Errorf("multipart upload failed: %w", err) - } - - return nil } -// Download downloads a file from S3 +// Download downloads a file from S3 with retry func (s *S3Backend) Download(ctx context.Context, remotePath, localPath string, progress ProgressCallback) error { // Build S3 key key := s.buildKey(remotePath) @@ -190,39 +209,44 @@ func (s *S3Backend) Download(ctx context.Context, remotePath, localPath string, return fmt.Errorf("failed to get object size: %w", err) } - // Download from S3 - result, err := s.client.GetObject(ctx, &s3.GetObjectInput{ - Bucket: aws.String(s.bucket), - Key: aws.String(key), - }) - if err != nil { - return fmt.Errorf("failed to download from S3: %w", err) - } - defer result.Body.Close() - - // Create local file + // Create directory for local file if err := os.MkdirAll(filepath.Dir(localPath), 0755); err != nil { return fmt.Errorf("failed to create directory: %w", err) } - outFile, err := os.Create(localPath) - if err != nil { - return fmt.Errorf("failed to create local file: %w", err) - } - defer outFile.Close() + return RetryOperationWithNotify(ctx, DefaultRetryConfig(), func() error { + // Download from S3 + result, err := s.client.GetObject(ctx, 
&s3.GetObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(key), + }) + if err != nil { + return fmt.Errorf("failed to download from S3: %w", err) + } + defer result.Body.Close() - // Copy with progress tracking - var reader io.Reader = result.Body - if progress != nil { - reader = NewProgressReader(result.Body, size, progress) - } + // Create/truncate local file + outFile, err := os.Create(localPath) + if err != nil { + return fmt.Errorf("failed to create local file: %w", err) + } + defer outFile.Close() - _, err = io.Copy(outFile, reader) - if err != nil { - return fmt.Errorf("failed to write file: %w", err) - } + // Copy with progress tracking + var reader io.Reader = result.Body + if progress != nil { + reader = NewProgressReader(result.Body, size, progress) + } - return nil + _, err = io.Copy(outFile, reader) + if err != nil { + return fmt.Errorf("failed to write file: %w", err) + } + + return nil + }, func(err error, duration time.Duration) { + fmt.Printf("[S3] Download retry in %v: %v\n", duration, err) + }) } // List lists all backup files in S3 diff --git a/main.go b/main.go index 90cf1d0..6d7ba8b 100755 --- a/main.go +++ b/main.go @@ -16,7 +16,7 @@ import ( // Build information (set by ldflags) var ( - version = "3.42.32" + version = "3.42.33" buildTime = "unknown" gitCommit = "unknown" )
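
Usage sketch (not part of the diff): how code elsewhere in `internal/cloud` might compose the helpers this change adds — `WithRetryConfig`, `WithRetry`, `AggressiveRetryConfig`, and `IsPermanentError` from `retry.go`. The `syncBackup`, `doUpload`, and `doVerify` names below are hypothetical stand-ins for real backend calls, not functions introduced by this commit.

```go
// Hypothetical caller inside the cloud package, showing the two convenience
// wrappers added in retry.go. doUpload and doVerify stand in for real
// backend operations (e.g. an S3 PutObject or a metadata read).
package cloud

import (
	"context"
	"fmt"
)

func syncBackup(ctx context.Context, doUpload, doVerify func() error) error {
	// Large transfer: aggressive profile (10 retries, 1s→60s backoff, 15 min cap).
	if err := WithRetryConfig(ctx, AggressiveRetryConfig(), "backup upload", doUpload); err != nil {
		return fmt.Errorf("upload gave up after retries: %w", err)
	}

	// Lightweight verification: default profile (5 retries, 500ms→30s backoff, 5 min cap).
	if err := WithRetry(ctx, "backup verify", doVerify); err != nil {
		// Auth/bucket problems are classified as permanent and surfaced
		// without consuming the retry budget.
		if IsPermanentError(err) {
			return fmt.Errorf("verify failed permanently: %w", err)
		}
		return err
	}
	return nil
}
```

Note the idempotency pattern the per-backend callbacks rely on: each upload attempt rewinds its source with `file.Seek(0, 0)` and each download attempt recreates the local file with `os.Create`, so a partially transferred attempt does not corrupt the retry that follows it.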