feat: v2.0 Sprint 2 - Cloud Storage Support (Part 1)
- Add AWS SDK v2 for S3 integration - Implement cloud.Backend interface for multi-provider support - Add full S3 backend with upload/download/list/delete - Support MinIO and Backblaze B2 (S3-compatible) - Implement progress tracking for uploads/downloads - Add cloud commands: upload, download, list, delete New commands: - dbbackup cloud upload [files] - Upload backups to cloud - dbbackup cloud download [remote] [local] - Download from cloud - dbbackup cloud list [prefix] - List cloud backups - dbbackup cloud delete [remote] - Delete from cloud Configuration via flags or environment: - --cloud-provider, --cloud-bucket, --cloud-region - --cloud-endpoint (for MinIO/B2) - --cloud-access-key, --cloud-secret-key New packages: - internal/cloud - Cloud storage abstraction layer
This commit is contained in:
324
internal/cloud/s3.go
Normal file
324
internal/cloud/s3.go
Normal file
@@ -0,0 +1,324 @@
|
||||
package cloud
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
)
|
||||
|
||||
// S3Backend implements the Backend interface for AWS S3 and compatible services
|
||||
type S3Backend struct {
|
||||
client *s3.Client
|
||||
bucket string
|
||||
prefix string
|
||||
config *Config
|
||||
}
|
||||
|
||||
// NewS3Backend creates a new S3 backend
|
||||
func NewS3Backend(cfg *Config) (*S3Backend, error) {
|
||||
if err := cfg.Validate(); err != nil {
|
||||
return nil, fmt.Errorf("invalid config: %w", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Build AWS config
|
||||
var awsCfg aws.Config
|
||||
var err error
|
||||
|
||||
if cfg.AccessKey != "" && cfg.SecretKey != "" {
|
||||
// Use explicit credentials
|
||||
credsProvider := credentials.NewStaticCredentialsProvider(
|
||||
cfg.AccessKey,
|
||||
cfg.SecretKey,
|
||||
"",
|
||||
)
|
||||
|
||||
awsCfg, err = config.LoadDefaultConfig(ctx,
|
||||
config.WithCredentialsProvider(credsProvider),
|
||||
config.WithRegion(cfg.Region),
|
||||
)
|
||||
} else {
|
||||
// Use default credential chain (environment, IAM role, etc.)
|
||||
awsCfg, err = config.LoadDefaultConfig(ctx,
|
||||
config.WithRegion(cfg.Region),
|
||||
)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load AWS config: %w", err)
|
||||
}
|
||||
|
||||
// Create S3 client with custom options
|
||||
clientOptions := []func(*s3.Options){
|
||||
func(o *s3.Options) {
|
||||
if cfg.Endpoint != "" {
|
||||
o.BaseEndpoint = aws.String(cfg.Endpoint)
|
||||
}
|
||||
if cfg.PathStyle {
|
||||
o.UsePathStyle = true
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
client := s3.NewFromConfig(awsCfg, clientOptions...)
|
||||
|
||||
return &S3Backend{
|
||||
client: client,
|
||||
bucket: cfg.Bucket,
|
||||
prefix: cfg.Prefix,
|
||||
config: cfg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Name returns the backend name
|
||||
func (s *S3Backend) Name() string {
|
||||
return "s3"
|
||||
}
|
||||
|
||||
// buildKey creates the full S3 key from filename
|
||||
func (s *S3Backend) buildKey(filename string) string {
|
||||
if s.prefix == "" {
|
||||
return filename
|
||||
}
|
||||
return filepath.Join(s.prefix, filename)
|
||||
}
|
||||
|
||||
// Upload uploads a file to S3
|
||||
func (s *S3Backend) Upload(ctx context.Context, localPath, remotePath string, progress ProgressCallback) error {
|
||||
// Open local file
|
||||
file, err := os.Open(localPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Get file size
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to stat file: %w", err)
|
||||
}
|
||||
fileSize := stat.Size()
|
||||
|
||||
// Create progress reader
|
||||
var reader io.Reader = file
|
||||
if progress != nil {
|
||||
reader = NewProgressReader(file, fileSize, progress)
|
||||
}
|
||||
|
||||
// Build S3 key
|
||||
key := s.buildKey(remotePath)
|
||||
|
||||
// Upload to S3
|
||||
_, err = s.client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(key),
|
||||
Body: reader,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to upload to S3: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Download downloads a file from S3
|
||||
func (s *S3Backend) Download(ctx context.Context, remotePath, localPath string, progress ProgressCallback) error {
|
||||
// Build S3 key
|
||||
key := s.buildKey(remotePath)
|
||||
|
||||
// Get object size first
|
||||
size, err := s.GetSize(ctx, remotePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get object size: %w", err)
|
||||
}
|
||||
|
||||
// Download from S3
|
||||
result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(key),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download from S3: %w", err)
|
||||
}
|
||||
defer result.Body.Close()
|
||||
|
||||
// Create local file
|
||||
if err := os.MkdirAll(filepath.Dir(localPath), 0755); err != nil {
|
||||
return fmt.Errorf("failed to create directory: %w", err)
|
||||
}
|
||||
|
||||
outFile, err := os.Create(localPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create local file: %w", err)
|
||||
}
|
||||
defer outFile.Close()
|
||||
|
||||
// Copy with progress tracking
|
||||
var reader io.Reader = result.Body
|
||||
if progress != nil {
|
||||
reader = NewProgressReader(result.Body, size, progress)
|
||||
}
|
||||
|
||||
_, err = io.Copy(outFile, reader)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write file: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// List lists all backup files in S3
|
||||
func (s *S3Backend) List(ctx context.Context, prefix string) ([]BackupInfo, error) {
|
||||
// Build full prefix
|
||||
fullPrefix := s.buildKey(prefix)
|
||||
|
||||
// List objects
|
||||
result, err := s.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Prefix: aws.String(fullPrefix),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list objects: %w", err)
|
||||
}
|
||||
|
||||
// Convert to BackupInfo
|
||||
var backups []BackupInfo
|
||||
for _, obj := range result.Contents {
|
||||
if obj.Key == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
key := *obj.Key
|
||||
name := filepath.Base(key)
|
||||
|
||||
// Skip if it's just a directory marker
|
||||
if strings.HasSuffix(key, "/") {
|
||||
continue
|
||||
}
|
||||
|
||||
info := BackupInfo{
|
||||
Key: key,
|
||||
Name: name,
|
||||
Size: *obj.Size,
|
||||
LastModified: *obj.LastModified,
|
||||
}
|
||||
|
||||
if obj.ETag != nil {
|
||||
info.ETag = *obj.ETag
|
||||
}
|
||||
|
||||
if obj.StorageClass != "" {
|
||||
info.StorageClass = string(obj.StorageClass)
|
||||
} else {
|
||||
info.StorageClass = "STANDARD"
|
||||
}
|
||||
|
||||
backups = append(backups, info)
|
||||
}
|
||||
|
||||
return backups, nil
|
||||
}
|
||||
|
||||
// Delete deletes a file from S3
|
||||
func (s *S3Backend) Delete(ctx context.Context, remotePath string) error {
|
||||
key := s.buildKey(remotePath)
|
||||
|
||||
_, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(key),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete object: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Exists checks if a file exists in S3
|
||||
func (s *S3Backend) Exists(ctx context.Context, remotePath string) (bool, error) {
|
||||
key := s.buildKey(remotePath)
|
||||
|
||||
_, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(key),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
// Check if it's a "not found" error
|
||||
if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "404") {
|
||||
return false, nil
|
||||
}
|
||||
return false, fmt.Errorf("failed to check object existence: %w", err)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// GetSize returns the size of a remote file
|
||||
func (s *S3Backend) GetSize(ctx context.Context, remotePath string) (int64, error) {
|
||||
key := s.buildKey(remotePath)
|
||||
|
||||
result, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(key),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to get object metadata: %w", err)
|
||||
}
|
||||
|
||||
if result.ContentLength == nil {
|
||||
return 0, fmt.Errorf("content length not available")
|
||||
}
|
||||
|
||||
return *result.ContentLength, nil
|
||||
}
|
||||
|
||||
// BucketExists checks if the bucket exists and is accessible
|
||||
func (s *S3Backend) BucketExists(ctx context.Context) (bool, error) {
|
||||
_, err := s.client.HeadBucket(ctx, &s3.HeadBucketInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "404") {
|
||||
return false, nil
|
||||
}
|
||||
return false, fmt.Errorf("failed to check bucket: %w", err)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// CreateBucket creates the bucket if it doesn't exist
|
||||
func (s *S3Backend) CreateBucket(ctx context.Context) error {
|
||||
exists, err := s.BucketExists(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = s.client.CreateBucket(ctx, &s3.CreateBucketInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create bucket: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user