- Exponential backoff retry for all cloud operations (S3, Azure, GCS)
- RetryConfig presets: Default (5x), Aggressive (10x), Quick (3x)
- Smart error classification: IsPermanentError, IsRetryableError
- Automatic file position reset on upload retry
- Retry logging with wait duration
- Multipart uploads use aggressive retry (more tolerance)
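A minimal caller sketch for orientation (assumptions: the module import path is hypothetical, and the `Config` literal uses the exported fields this file reads — `Bucket`, `Prefix`, `Region`, `AccessKey`, `SecretKey`, `Endpoint`, `PathStyle`):

```go
package main

import (
	"context"
	"log"

	"example.com/backup/cloud" // hypothetical module path
)

func main() {
	backend, err := cloud.NewS3Backend(&cloud.Config{
		Bucket: "my-backups",
		Prefix: "nightly",
		Region: "us-east-1",
		// AccessKey/SecretKey omitted: the default AWS credential chain is used.
	})
	if err != nil {
		log.Fatalf("init backend: %v", err)
	}

	// Upload retries automatically with exponential backoff; files over
	// 100 MB switch to multipart upload with the aggressive retry preset.
	// A nil ProgressCallback disables progress reporting.
	if err := backend.Upload(context.Background(), "/tmp/db.dump", "db.dump", nil); err != nil {
		log.Fatalf("upload: %v", err)
	}
}
```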
package cloud

import (
	"context"
	"fmt"
	"io"
	"os"
	"path"
	"path/filepath"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
// S3Backend implements the Backend interface for AWS S3 and S3-compatible services.
type S3Backend struct {
	client *s3.Client
	bucket string
	prefix string
	config *Config
}
// NewS3Backend creates a new S3 backend from the given configuration.
func NewS3Backend(cfg *Config) (*S3Backend, error) {
	if err := cfg.Validate(); err != nil {
		return nil, fmt.Errorf("invalid config: %w", err)
	}

	ctx := context.Background()

	// Build the AWS config.
	var awsCfg aws.Config
	var err error

	if cfg.AccessKey != "" && cfg.SecretKey != "" {
		// Use explicit static credentials.
		credsProvider := credentials.NewStaticCredentialsProvider(
			cfg.AccessKey,
			cfg.SecretKey,
			"",
		)

		awsCfg, err = config.LoadDefaultConfig(ctx,
			config.WithCredentialsProvider(credsProvider),
			config.WithRegion(cfg.Region),
		)
	} else {
		// Fall back to the default credential chain (environment, IAM role, etc.).
		awsCfg, err = config.LoadDefaultConfig(ctx,
			config.WithRegion(cfg.Region),
		)
	}

	if err != nil {
		return nil, fmt.Errorf("failed to load AWS config: %w", err)
	}

	// Create the S3 client with custom options (endpoint override, path-style addressing).
	clientOptions := []func(*s3.Options){
		func(o *s3.Options) {
			if cfg.Endpoint != "" {
				o.BaseEndpoint = aws.String(cfg.Endpoint)
			}
			if cfg.PathStyle {
				o.UsePathStyle = true
			}
		},
	}

	client := s3.NewFromConfig(awsCfg, clientOptions...)

	return &S3Backend{
		client: client,
		bucket: cfg.Bucket,
		prefix: cfg.Prefix,
		config: cfg,
	}, nil
}
// Name returns the backend name.
func (s *S3Backend) Name() string {
	return "s3"
}

// buildKey joins the configured prefix and filename into the full S3 key.
// path.Join (not filepath.Join) is used so keys always use forward slashes,
// regardless of the host OS.
func (s *S3Backend) buildKey(filename string) string {
	if s.prefix == "" {
		return filename
	}
	return path.Join(s.prefix, filename)
}
// Upload uploads a file to S3, with multipart support for large files.
func (s *S3Backend) Upload(ctx context.Context, localPath, remotePath string, progress ProgressCallback) error {
	// Open the local file.
	file, err := os.Open(localPath)
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()

	// Get the file size.
	stat, err := file.Stat()
	if err != nil {
		return fmt.Errorf("failed to stat file: %w", err)
	}
	fileSize := stat.Size()

	// Build the S3 key.
	key := s.buildKey(remotePath)

	// Use multipart upload for files larger than 100 MB.
	const multipartThreshold = 100 * 1024 * 1024 // 100 MB

	if fileSize > multipartThreshold {
		return s.uploadMultipart(ctx, file, key, fileSize, progress)
	}

	// Simple single-part upload for smaller files.
	return s.uploadSimple(ctx, file, key, fileSize, progress)
}
// uploadSimple performs a simple single-part upload with retry.
func (s *S3Backend) uploadSimple(ctx context.Context, file *os.File, key string, fileSize int64, progress ProgressCallback) error {
	return RetryOperationWithNotify(ctx, DefaultRetryConfig(), func() error {
		// Reset the file position so a retry re-reads from the start.
		if _, err := file.Seek(0, io.SeekStart); err != nil {
			return fmt.Errorf("failed to reset file position: %w", err)
		}

		// Wrap the file with a progress reader if a callback was provided.
		var reader io.Reader = file
		if progress != nil {
			reader = NewProgressReader(file, fileSize, progress)
		}

		// Upload to S3.
		_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
			Bucket: aws.String(s.bucket),
			Key:    aws.String(key),
			Body:   reader,
		})
		if err != nil {
			return fmt.Errorf("failed to upload to S3: %w", err)
		}

		return nil
	}, func(err error, duration time.Duration) {
		fmt.Printf("[S3] Upload retry in %v: %v\n", duration, err)
	})
}
// uploadMultipart performs a multipart upload for large files with retry.
func (s *S3Backend) uploadMultipart(ctx context.Context, file *os.File, key string, fileSize int64, progress ProgressCallback) error {
	return RetryOperationWithNotify(ctx, AggressiveRetryConfig(), func() error {
		// Reset the file position so a retry re-reads from the start.
		if _, err := file.Seek(0, io.SeekStart); err != nil {
			return fmt.Errorf("failed to reset file position: %w", err)
		}

		// Create an uploader with custom options.
		uploader := manager.NewUploader(s.client, func(u *manager.Uploader) {
			// Part size: 10 MB.
			u.PartSize = 10 * 1024 * 1024

			// Upload up to 10 parts concurrently.
			u.Concurrency = 10

			// Abort and clean up uploaded parts on failure
			// (set to true to keep them for debugging).
			u.LeavePartsOnError = false
		})

		// Wrap the file with a progress reader if a callback was provided.
		var reader io.Reader = file
		if progress != nil {
			reader = NewProgressReader(file, fileSize, progress)
		}

		// Upload with multipart.
		_, err := uploader.Upload(ctx, &s3.PutObjectInput{
			Bucket: aws.String(s.bucket),
			Key:    aws.String(key),
			Body:   reader,
		})
		if err != nil {
			return fmt.Errorf("multipart upload failed: %w", err)
		}

		return nil
	}, func(err error, duration time.Duration) {
		fmt.Printf("[S3] Multipart upload retry in %v: %v\n", duration, err)
	})
}
// Download downloads a file from S3 with retry.
func (s *S3Backend) Download(ctx context.Context, remotePath, localPath string, progress ProgressCallback) error {
	// Build the S3 key.
	key := s.buildKey(remotePath)

	// Get the object size first, for progress reporting.
	size, err := s.GetSize(ctx, remotePath)
	if err != nil {
		return fmt.Errorf("failed to get object size: %w", err)
	}

	// Create the directory for the local file.
	if err := os.MkdirAll(filepath.Dir(localPath), 0o755); err != nil {
		return fmt.Errorf("failed to create directory: %w", err)
	}

	return RetryOperationWithNotify(ctx, DefaultRetryConfig(), func() error {
		// Download from S3.
		result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
			Bucket: aws.String(s.bucket),
			Key:    aws.String(key),
		})
		if err != nil {
			return fmt.Errorf("failed to download from S3: %w", err)
		}
		defer result.Body.Close()

		// Create/truncate the local file, so each retry starts over.
		outFile, err := os.Create(localPath)
		if err != nil {
			return fmt.Errorf("failed to create local file: %w", err)
		}
		defer outFile.Close()

		// Copy with progress tracking.
		var reader io.Reader = result.Body
		if progress != nil {
			reader = NewProgressReader(result.Body, size, progress)
		}

		if _, err := io.Copy(outFile, reader); err != nil {
			return fmt.Errorf("failed to write file: %w", err)
		}

		return nil
	}, func(err error, duration time.Duration) {
		fmt.Printf("[S3] Download retry in %v: %v\n", duration, err)
	})
}
// List lists all backup files in S3 under the given prefix.
func (s *S3Backend) List(ctx context.Context, prefix string) ([]BackupInfo, error) {
	// Build the full prefix.
	fullPrefix := s.buildKey(prefix)

	// List objects, following continuation tokens so results beyond the
	// first 1000 keys are included.
	paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
		Bucket: aws.String(s.bucket),
		Prefix: aws.String(fullPrefix),
	})

	var backups []BackupInfo
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to list objects: %w", err)
		}

		// Convert to BackupInfo, guarding the SDK's pointer fields.
		for _, obj := range page.Contents {
			if obj.Key == nil {
				continue
			}

			key := *obj.Key
			name := path.Base(key)

			// Skip directory markers.
			if strings.HasSuffix(key, "/") {
				continue
			}

			info := BackupInfo{
				Key:  key,
				Name: name,
			}
			if obj.Size != nil {
				info.Size = *obj.Size
			}
			if obj.LastModified != nil {
				info.LastModified = *obj.LastModified
			}
			if obj.ETag != nil {
				info.ETag = *obj.ETag
			}

			info.StorageClass = "STANDARD"
			if obj.StorageClass != "" {
				info.StorageClass = string(obj.StorageClass)
			}

			backups = append(backups, info)
		}
	}

	return backups, nil
}
// Delete deletes a file from S3.
func (s *S3Backend) Delete(ctx context.Context, remotePath string) error {
	key := s.buildKey(remotePath)

	_, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return fmt.Errorf("failed to delete object: %w", err)
	}

	return nil
}
// Exists checks whether a file exists in S3.
func (s *S3Backend) Exists(ctx context.Context, remotePath string) (bool, error) {
	key := s.buildKey(remotePath)

	_, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		// Treat "not found" as a non-error result. String matching keeps this
		// working across S3-compatible services with differing error shapes.
		if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "404") {
			return false, nil
		}
		return false, fmt.Errorf("failed to check object existence: %w", err)
	}

	return true, nil
}
// GetSize returns the size in bytes of a remote file.
func (s *S3Backend) GetSize(ctx context.Context, remotePath string) (int64, error) {
	key := s.buildKey(remotePath)

	result, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return 0, fmt.Errorf("failed to get object metadata: %w", err)
	}

	if result.ContentLength == nil {
		return 0, fmt.Errorf("content length not available")
	}

	return *result.ContentLength, nil
}
// BucketExists checks whether the bucket exists and is accessible.
func (s *S3Backend) BucketExists(ctx context.Context) (bool, error) {
	_, err := s.client.HeadBucket(ctx, &s3.HeadBucketInput{
		Bucket: aws.String(s.bucket),
	})
	if err != nil {
		if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "404") {
			return false, nil
		}
		return false, fmt.Errorf("failed to check bucket: %w", err)
	}

	return true, nil
}
// CreateBucket creates the bucket if it doesn't already exist.
func (s *S3Backend) CreateBucket(ctx context.Context) error {
	exists, err := s.BucketExists(ctx)
	if err != nil {
		return err
	}
	if exists {
		return nil
	}

	input := &s3.CreateBucketInput{
		Bucket: aws.String(s.bucket),
	}

	// Outside us-east-1, S3 requires an explicit location constraint when
	// creating a bucket.
	if s.config.Region != "" && s.config.Region != "us-east-1" {
		input.CreateBucketConfiguration = &types.CreateBucketConfiguration{
			LocationConstraint: types.BucketLocationConstraint(s.config.Region),
		}
	}

	if _, err := s.client.CreateBucket(ctx, input); err != nil {
		return fmt.Errorf("failed to create bucket: %w", err)
	}

	return nil
}
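
// deleteWithRetry is a caller-side sketch, not part of the original file:
// unlike Upload and Download above, List, Delete, and Exists are issued
// without the retry wrapper. If retry semantics are wanted there too, the
// same package helpers can wrap them, for example:
func deleteWithRetry(ctx context.Context, b *S3Backend, remotePath string) error {
	return RetryOperationWithNotify(ctx, DefaultRetryConfig(), func() error {
		return b.Delete(ctx, remotePath)
	}, func(err error, wait time.Duration) {
		fmt.Printf("[S3] Delete retry in %v: %v\n", wait, err)
	})
}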