dbbackup/internal/cloud/s3.go
Renz 8929004abc feat: v2.0 Sprint 3 - Multipart Upload, Testing & Documentation (Part 2)
Sprint 3 Complete - Cloud Storage Full Implementation:

New Features:
- Multipart upload for large files (>100MB)
- Automatic part size (10MB) and concurrency (10 concurrent parts)
- MinIO testing infrastructure
- Comprehensive integration test script
- Complete cloud storage documentation

New Files:
- CLOUD.md - Complete cloud storage guide (580+ lines)
- docker-compose.minio.yml - MinIO + PostgreSQL + MySQL test setup
- scripts/test_cloud_storage.sh - Full integration test suite

Multipart Upload:
- Automatic for files >100MB
- 10MB part size for optimal performance
- 10 concurrent parts for faster uploads
- Progress tracking for multipart transfers
- AWS S3 Upload Manager integration
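
For callers nothing changes at the call site; a minimal in-package sketch (helper name and paths are hypothetical, nil skips the progress callback):

    func uploadExample(ctx context.Context, backend *S3Backend) error {
        // Upload chooses the strategy automatically: files over 100MB go
        // through the 10MB x 10-part concurrent uploader, smaller files
        // use a single PutObject.
        return backend.Upload(ctx, "/var/backups/mydb.dump", "mydb.dump", nil)
    }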

Testing Infrastructure:
- docker-compose.minio.yml:
  * MinIO S3-compatible storage
  * PostgreSQL 16 test database
  * MySQL 8.0 test database
  * Automatic bucket creation
  * Health checks for all services

- test_cloud_storage.sh (14 test scenarios; a minimal Go connectivity sketch follows the list):
  1. Service startup and health checks
  2. Test database creation with sample data
  3. Local backup creation
  4. Cloud upload to MinIO
  5. Cloud list verification
  6. Backup with cloud URI
  7. Database drop for restore test
  8. Restore from cloud URI
  9. Data verification after restore
  10. Cloud backup integrity verification
  11. Cleanup dry-run test
  12. Multiple backups creation
  13. Actual cleanup test
  14. Large file multipart upload (>100MB)
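
For a quick connectivity check outside the shell suite, a minimal in-package Go probe against the MinIO stack (the endpoint and minioadmin credentials are assumed docker-compose defaults; the function name is hypothetical):

    func probeMinIO(ctx context.Context) error {
        backend, err := NewS3Backend(&Config{
            Bucket:    "backups",
            Region:    "us-east-1",
            Endpoint:  "http://localhost:9000",
            AccessKey: "minioadmin",
            SecretKey: "minioadmin",
            PathStyle: true, // path-style addressing for MinIO
        })
        if err != nil {
            return err
        }
        // Create the bucket if absent, then confirm it is reachable
        if err := backend.CreateBucket(ctx); err != nil {
            return err
        }
        ok, err := backend.BucketExists(ctx)
        if err != nil {
            return err
        }
        if !ok {
            return fmt.Errorf("bucket %q not reachable", "backups")
        }
        return nil
    }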

Documentation (CLOUD.md):
- Quick start guide
- URI syntax documentation
- Configuration methods (4 approaches)
- All cloud commands with examples
- Provider-specific setup (AWS S3, MinIO, B2, GCS)
- Multipart upload details
- Progress tracking
- Metadata synchronization
- Best practices (security, performance, reliability)
- Troubleshooting guide
- Real-world examples
- FAQ section

Sprint 3 COMPLETE!
Total implementation: 100% of requirements met

Cloud storage features now at 100%:
- URI parser and support (see the URI sketch below)
- Backup/restore/verify/cleanup integration
- Multipart uploads
- Testing infrastructure
- Comprehensive documentation
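
The URI parser itself lives outside this file; as an illustrative sketch of how an s3:// URI decomposes into bucket and key (standard library net/url and strings only; the repo's actual parser may differ):

    func splitS3URI(raw string) (bucket, key string, err error) {
        u, err := url.Parse(raw) // e.g. "s3://backups/prod/mydb.dump"
        if err != nil {
            return "", "", err
        }
        if u.Scheme != "s3" {
            return "", "", fmt.Errorf("not an s3 URI: %s", raw)
        }
        // Host is the bucket; the path without its leading slash is the key
        return u.Host, strings.TrimPrefix(u.Path, "/"), nil
    }
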
2025-11-25 20:39:34 +00:00


package cloud

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path"
	"path/filepath"
	"strings"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// S3Backend implements the Backend interface for AWS S3 and compatible services
type S3Backend struct {
	client *s3.Client
	bucket string
	prefix string
	config *Config
}

// NewS3Backend creates a new S3 backend
func NewS3Backend(cfg *Config) (*S3Backend, error) {
	if err := cfg.Validate(); err != nil {
		return nil, fmt.Errorf("invalid config: %w", err)
	}

	ctx := context.Background()

	// Build AWS config
	var awsCfg aws.Config
	var err error

	if cfg.AccessKey != "" && cfg.SecretKey != "" {
		// Use explicit credentials
		credsProvider := credentials.NewStaticCredentialsProvider(
			cfg.AccessKey,
			cfg.SecretKey,
			"",
		)
		awsCfg, err = config.LoadDefaultConfig(ctx,
			config.WithCredentialsProvider(credsProvider),
			config.WithRegion(cfg.Region),
		)
	} else {
		// Use default credential chain (environment, IAM role, etc.)
		awsCfg, err = config.LoadDefaultConfig(ctx,
			config.WithRegion(cfg.Region),
		)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to load AWS config: %w", err)
	}

	// Create S3 client with custom options (endpoint override and
	// path-style addressing for S3-compatible services such as MinIO)
	clientOptions := []func(*s3.Options){
		func(o *s3.Options) {
			if cfg.Endpoint != "" {
				o.BaseEndpoint = aws.String(cfg.Endpoint)
			}
			if cfg.PathStyle {
				o.UsePathStyle = true
			}
		},
	}

	client := s3.NewFromConfig(awsCfg, clientOptions...)

	return &S3Backend{
		client: client,
		bucket: cfg.Bucket,
		prefix: cfg.Prefix,
		config: cfg,
	}, nil
}

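// exampleAWSBackend is an illustrative sketch, not part of the original
// file: with AccessKey/SecretKey left empty, NewS3Backend falls through to
// the default AWS credential chain (environment variables, shared config,
// IAM role), so plain AWS S3 needs only bucket, region, and prefix.
func exampleAWSBackend() (*S3Backend, error) {
	return NewS3Backend(&Config{
		Bucket: "my-backups",
		Region: "us-east-1",
		Prefix: "prod",
	})
}
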
// Name returns the backend name
func (s *S3Backend) Name() string {
	return "s3"
}

// buildKey creates the full S3 key from filename
func (s *S3Backend) buildKey(filename string) string {
	if s.prefix == "" {
		return filename
	}
	// Use path.Join rather than filepath.Join: S3 keys always use forward
	// slashes, regardless of the host OS
	return path.Join(s.prefix, filename)
}

// Upload uploads a file to S3 with multipart support for large files
func (s *S3Backend) Upload(ctx context.Context, localPath, remotePath string, progress ProgressCallback) error {
	// Open local file
	file, err := os.Open(localPath)
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()

	// Get file size
	stat, err := file.Stat()
	if err != nil {
		return fmt.Errorf("failed to stat file: %w", err)
	}
	fileSize := stat.Size()

	// Build S3 key
	key := s.buildKey(remotePath)

	// Use multipart upload for files larger than 100MB
	const multipartThreshold = 100 * 1024 * 1024 // 100 MB
	if fileSize > multipartThreshold {
		return s.uploadMultipart(ctx, file, key, fileSize, progress)
	}

	// Simple upload for smaller files
	return s.uploadSimple(ctx, file, key, fileSize, progress)
}

// uploadSimple performs a simple single-part upload
func (s *S3Backend) uploadSimple(ctx context.Context, file *os.File, key string, fileSize int64, progress ProgressCallback) error {
	// Create progress reader
	var reader io.Reader = file
	if progress != nil {
		reader = NewProgressReader(file, fileSize, progress)
	}

	// Upload to S3
	_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
		Body:   reader,
	})
	if err != nil {
		return fmt.Errorf("failed to upload to S3: %w", err)
	}
	return nil
}

// uploadMultipart performs a multipart upload for large files
func (s *S3Backend) uploadMultipart(ctx context.Context, file *os.File, key string, fileSize int64, progress ProgressCallback) error {
	// Create uploader with custom options
	uploader := manager.NewUploader(s.client, func(u *manager.Uploader) {
		// Part size: 10MB
		u.PartSize = 10 * 1024 * 1024
		// Upload up to 10 parts concurrently
		u.Concurrency = 10
		// Clean up already-uploaded parts if the upload fails, so
		// incomplete multipart uploads don't accumulate storage costs
		u.LeavePartsOnError = false
	})

	// Wrap file with progress reader. Note: the wrapper hides the file's
	// io.ReaderAt, so the manager reads parts sequentially into buffers;
	// the parts themselves are still uploaded concurrently.
	var reader io.Reader = file
	if progress != nil {
		reader = NewProgressReader(file, fileSize, progress)
	}

	// Upload with multipart
	_, err := uploader.Upload(ctx, &s3.PutObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
		Body:   reader,
	})
	if err != nil {
		return fmt.Errorf("multipart upload failed: %w", err)
	}
	return nil
}

// Download downloads a file from S3
func (s *S3Backend) Download(ctx context.Context, remotePath, localPath string, progress ProgressCallback) error {
	// Build S3 key
	key := s.buildKey(remotePath)

	// Get object size first (for progress reporting)
	size, err := s.GetSize(ctx, remotePath)
	if err != nil {
		return fmt.Errorf("failed to get object size: %w", err)
	}

	// Download from S3
	result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return fmt.Errorf("failed to download from S3: %w", err)
	}
	defer result.Body.Close()

	// Create local file (and any missing parent directories)
	if err := os.MkdirAll(filepath.Dir(localPath), 0755); err != nil {
		return fmt.Errorf("failed to create directory: %w", err)
	}
	outFile, err := os.Create(localPath)
	if err != nil {
		return fmt.Errorf("failed to create local file: %w", err)
	}
	defer outFile.Close()

	// Copy with progress tracking
	var reader io.Reader = result.Body
	if progress != nil {
		reader = NewProgressReader(result.Body, size, progress)
	}
	if _, err := io.Copy(outFile, reader); err != nil {
		return fmt.Errorf("failed to write file: %w", err)
	}
	return nil
}

// List lists all backup files in S3, following pagination for buckets
// holding more than one page of objects
func (s *S3Backend) List(ctx context.Context, prefix string) ([]BackupInfo, error) {
	// Build full prefix
	fullPrefix := s.buildKey(prefix)

	// List objects page by page (a single ListObjectsV2 call returns at
	// most 1000 keys)
	paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
		Bucket: aws.String(s.bucket),
		Prefix: aws.String(fullPrefix),
	})

	// Convert to BackupInfo
	var backups []BackupInfo
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to list objects: %w", err)
		}
		for _, obj := range page.Contents {
			if obj.Key == nil {
				continue
			}
			key := *obj.Key
			// Skip if it's just a directory marker
			if strings.HasSuffix(key, "/") {
				continue
			}
			info := BackupInfo{
				Key:  key,
				Name: path.Base(key), // S3 keys use forward slashes
			}
			if obj.Size != nil {
				info.Size = *obj.Size
			}
			if obj.LastModified != nil {
				info.LastModified = *obj.LastModified
			}
			if obj.ETag != nil {
				info.ETag = *obj.ETag
			}
			if obj.StorageClass != "" {
				info.StorageClass = string(obj.StorageClass)
			} else {
				info.StorageClass = "STANDARD"
			}
			backups = append(backups, info)
		}
	}
	return backups, nil
}

// Delete deletes a file from S3
func (s *S3Backend) Delete(ctx context.Context, remotePath string) error {
	key := s.buildKey(remotePath)
	_, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return fmt.Errorf("failed to delete object: %w", err)
	}
	return nil
}

// Exists checks if a file exists in S3
func (s *S3Backend) Exists(ctx context.Context, remotePath string) (bool, error) {
	key := s.buildKey(remotePath)
	_, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		// HeadObject reports a missing key as types.NotFound; treat that
		// as "does not exist" rather than an error
		var notFound *types.NotFound
		if errors.As(err, &notFound) {
			return false, nil
		}
		return false, fmt.Errorf("failed to check object existence: %w", err)
	}
	return true, nil
}

// GetSize returns the size of a remote file
func (s *S3Backend) GetSize(ctx context.Context, remotePath string) (int64, error) {
	key := s.buildKey(remotePath)
	result, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return 0, fmt.Errorf("failed to get object metadata: %w", err)
	}
	if result.ContentLength == nil {
		return 0, fmt.Errorf("content length not available")
	}
	return *result.ContentLength, nil
}

// BucketExists checks if the bucket exists and is accessible
func (s *S3Backend) BucketExists(ctx context.Context) (bool, error) {
	_, err := s.client.HeadBucket(ctx, &s3.HeadBucketInput{
		Bucket: aws.String(s.bucket),
	})
	if err != nil {
		var notFound *types.NotFound
		if errors.As(err, &notFound) {
			return false, nil
		}
		return false, fmt.Errorf("failed to check bucket: %w", err)
	}
	return true, nil
}

// CreateBucket creates the bucket if it doesn't exist
func (s *S3Backend) CreateBucket(ctx context.Context) error {
	exists, err := s.BucketExists(ctx)
	if err != nil {
		return err
	}
	if exists {
		return nil
	}

	input := &s3.CreateBucketInput{
		Bucket: aws.String(s.bucket),
	}
	// Outside us-east-1, S3 requires an explicit location constraint;
	// S3-compatible services generally accept it as well
	if s.config.Region != "" && s.config.Region != "us-east-1" {
		input.CreateBucketConfiguration = &types.CreateBucketConfiguration{
			LocationConstraint: types.BucketLocationConstraint(s.config.Region),
		}
	}
	if _, err := s.client.CreateBucket(ctx, input); err != nil {
		return fmt.Errorf("failed to create bucket: %w", err)
	}
	return nil
}

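// exampleRoundTrip is an illustrative sketch, not part of the original file:
// a full backup round trip using only methods defined above. Paths and key
// names are hypothetical.
func exampleRoundTrip(ctx context.Context, s *S3Backend) error {
	// Ensure the bucket exists (no-op when it already does)
	if err := s.CreateBucket(ctx); err != nil {
		return err
	}
	// Upload; multipart engages automatically above the 100MB threshold
	if err := s.Upload(ctx, "/var/backups/mydb.dump", "mydb.dump", nil); err != nil {
		return err
	}
	// List everything under the configured prefix
	backups, err := s.List(ctx, "")
	if err != nil {
		return err
	}
	for _, b := range backups {
		fmt.Printf("%s\t%d bytes\t%s\n", b.Name, b.Size, b.LastModified)
	}
	// Download back to a local path
	return s.Download(ctx, "mydb.dump", "/tmp/restore/mydb.dump", nil)
}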