Why wrap external tools when you can BE the tool?

New physical backup engines:
• MySQL Clone Plugin - native 8.0.17+ physical backup
• Filesystem Snapshots - LVM/ZFS/Btrfs orchestration
• Binlog Streaming - continuous backup with seconds RPO
• Parallel Cloud Upload - stream directly to S3, skip local disk (see the sketch below)

Smart engine selection automatically picks the optimal strategy based on:
- MySQL version and edition
- Available filesystem features
- Database size
- Cloud connectivity

Zero external dependencies. Single binary. Enterprise capabilities. Commercial backup vendors: we need to talk.
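For example, the parallel upload engine shown in full below can stream a backup straight into S3 without touching local disk. A minimal sketch, assuming the parallel package is imported from this module and backupStream is any io.Reader producing the backup data (context, io and log come from the standard library):

	cfg := parallel.DefaultConfig()
	cfg.Bucket = "my-backups"
	cfg.Key = "mysql/full-2025-01-01.backup"
	cfg.Region = "us-east-1"

	streamer, err := parallel.NewCloudStreamer(cfg)
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	if err := streamer.Start(ctx); err != nil {
		log.Fatal(err)
	}
	if _, err := io.Copy(streamer, backupStream); err != nil {
		streamer.Cancel()
		log.Fatal(err)
	}
	location, err := streamer.Complete(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("uploaded to %s", location)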
630 lines · 13 KiB · Go
// Package parallel provides parallel cloud streaming capabilities
package parallel

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// Config holds parallel upload configuration
type Config struct {
	// Bucket is the S3 bucket name
	Bucket string

	// Key is the object key
	Key string

	// Region is the AWS region
	Region string

	// Endpoint is optional custom endpoint (for MinIO, etc.)
	Endpoint string

	// PartSize is the size of each part in bytes (default 10 MiB)
	PartSize int64

	// WorkerCount is the number of parallel upload workers
	WorkerCount int

	// BufferSize is the size of the part channel buffer
	BufferSize int

	// ChecksumEnabled enables SHA-256 checksums per part
	ChecksumEnabled bool

	// RetryCount is the number of retries per part
	RetryCount int

	// RetryDelay is the base delay between retries (multiplied by the attempt number)
	RetryDelay time.Duration

	// ServerSideEncryption sets the encryption algorithm
	ServerSideEncryption string

	// KMSKeyID is the KMS key for encryption
	KMSKeyID string
}

// DefaultConfig returns default configuration
func DefaultConfig() Config {
	return Config{
		PartSize:        10 * 1024 * 1024, // 10 MiB
		WorkerCount:     4,
		BufferSize:      8,
		ChecksumEnabled: true,
		RetryCount:      3,
		RetryDelay:      time.Second,
	}
}
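// Note: S3 multipart uploads allow at most 10,000 parts, and every part
// except the last must be at least 5 MiB, so the default 10 MiB PartSize
// caps a single streamed object at roughly 100 GiB. A sketch of raising
// PartSize for larger backups (expectedSize is a placeholder for the
// estimated backup size in bytes):
//
//	const maxParts = 10000
//	cfg := DefaultConfig()
//	if minPart := expectedSize/maxParts + 1; minPart > cfg.PartSize {
//		cfg.PartSize = minPart
//	}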
// part represents a part to upload
type part struct {
	Number int32
	Data   []byte
	Hash   string // hex-encoded SHA-256 of Data (computed when ChecksumEnabled; not attached to the S3 request)
}

// partResult represents the result of uploading a part
type partResult struct {
	Number int32
	ETag   string
	Error  error
}

// CloudStreamer provides parallel streaming uploads to S3
type CloudStreamer struct {
	cfg    Config
	client *s3.Client

	mu       sync.Mutex
	uploadID string
	key      string

	// Channels for worker pool
	partsCh   chan part
	resultsCh chan partResult
	workers   sync.WaitGroup
	cancel    context.CancelFunc

	// Current part buffer
	buffer     []byte
	bufferLen  int
	partNumber int32

	// Results tracking
	results      map[int32]string // partNumber -> ETag
	resultsMu    sync.RWMutex
	uploadErrors []error

	// Metrics
	bytesUploaded int64
	partsUploaded int64
	startTime     time.Time
}

// NewCloudStreamer creates a new parallel cloud streamer
func NewCloudStreamer(cfg Config) (*CloudStreamer, error) {
	if cfg.Bucket == "" {
		return nil, fmt.Errorf("bucket required")
	}
	if cfg.Key == "" {
		return nil, fmt.Errorf("key required")
	}

	// Apply defaults
	if cfg.PartSize == 0 {
		cfg.PartSize = 10 * 1024 * 1024
	}
	if cfg.WorkerCount == 0 {
		cfg.WorkerCount = 4
	}
	if cfg.BufferSize == 0 {
		cfg.BufferSize = cfg.WorkerCount * 2
	}
	if cfg.RetryCount == 0 {
		cfg.RetryCount = 3
	}
	if cfg.RetryDelay == 0 {
		cfg.RetryDelay = time.Second
	}

	// Load AWS config
	opts := []func(*config.LoadOptions) error{
		config.WithRegion(cfg.Region),
	}

	awsCfg, err := config.LoadDefaultConfig(context.Background(), opts...)
	if err != nil {
		return nil, fmt.Errorf("failed to load AWS config: %w", err)
	}

	// Create S3 client
	clientOpts := []func(*s3.Options){}
	if cfg.Endpoint != "" {
		clientOpts = append(clientOpts, func(o *s3.Options) {
			o.BaseEndpoint = aws.String(cfg.Endpoint)
			o.UsePathStyle = true
		})
	}

	client := s3.NewFromConfig(awsCfg, clientOpts...)

	return &CloudStreamer{
		cfg:     cfg,
		client:  client,
		buffer:  make([]byte, cfg.PartSize),
		results: make(map[int32]string),
	}, nil
}

// Start initiates the multipart upload and starts workers
func (cs *CloudStreamer) Start(ctx context.Context) error {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	cs.startTime = time.Now()

	// Create multipart upload
	input := &s3.CreateMultipartUploadInput{
		Bucket: aws.String(cs.cfg.Bucket),
		Key:    aws.String(cs.cfg.Key),
	}

	if cs.cfg.ServerSideEncryption != "" {
		input.ServerSideEncryption = types.ServerSideEncryption(cs.cfg.ServerSideEncryption)
	}
	if cs.cfg.KMSKeyID != "" {
		input.SSEKMSKeyId = aws.String(cs.cfg.KMSKeyID)
	}

	result, err := cs.client.CreateMultipartUpload(ctx, input)
	if err != nil {
		return fmt.Errorf("failed to create multipart upload: %w", err)
	}

	cs.uploadID = *result.UploadId
	cs.key = *result.Key

	// Create channels
	cs.partsCh = make(chan part, cs.cfg.BufferSize)
	cs.resultsCh = make(chan partResult, cs.cfg.BufferSize)

	// Create cancellable context
	workerCtx, cancel := context.WithCancel(ctx)
	cs.cancel = cancel

	// Start workers
	for i := 0; i < cs.cfg.WorkerCount; i++ {
		cs.workers.Add(1)
		go cs.worker(workerCtx, i)
	}

	// Start result collector
	go cs.collectResults()

	return nil
}

// worker uploads parts from the channel
func (cs *CloudStreamer) worker(ctx context.Context, id int) {
	defer cs.workers.Done()

	for {
		select {
		case <-ctx.Done():
			return
		case p, ok := <-cs.partsCh:
			if !ok {
				return
			}

			etag, err := cs.uploadPart(ctx, p)
			cs.resultsCh <- partResult{
				Number: p.Number,
				ETag:   etag,
				Error:  err,
			}
		}
	}
}

// uploadPart uploads a single part with retries and linear backoff
func (cs *CloudStreamer) uploadPart(ctx context.Context, p part) (string, error) {
	var lastErr error

	for attempt := 0; attempt <= cs.cfg.RetryCount; attempt++ {
		if attempt > 0 {
			select {
			case <-ctx.Done():
				return "", ctx.Err()
			case <-time.After(cs.cfg.RetryDelay * time.Duration(attempt)):
			}
		}

		input := &s3.UploadPartInput{
			Bucket:     aws.String(cs.cfg.Bucket),
			Key:        aws.String(cs.cfg.Key),
			UploadId:   aws.String(cs.uploadID),
			PartNumber: aws.Int32(p.Number),
			Body:       newBytesReader(p.Data),
		}

		result, err := cs.client.UploadPart(ctx, input)
		if err != nil {
			lastErr = err
			continue
		}

		atomic.AddInt64(&cs.bytesUploaded, int64(len(p.Data)))
		atomic.AddInt64(&cs.partsUploaded, 1)

		return *result.ETag, nil
	}

	return "", fmt.Errorf("failed after %d retries: %w", cs.cfg.RetryCount, lastErr)
}

// collectResults collects results from workers
func (cs *CloudStreamer) collectResults() {
	for result := range cs.resultsCh {
		cs.resultsMu.Lock()
		if result.Error != nil {
			cs.uploadErrors = append(cs.uploadErrors, result.Error)
		} else {
			cs.results[result.Number] = result.ETag
		}
		cs.resultsMu.Unlock()
	}
}

// Write implements io.Writer for streaming data. It is not safe for
// concurrent use; it is intended to be driven by a single producer
// (for example via io.Copy).
func (cs *CloudStreamer) Write(p []byte) (int, error) {
	written := 0

	for len(p) > 0 {
		// Calculate how much we can write to the buffer
		available := int(cs.cfg.PartSize) - cs.bufferLen
		toWrite := len(p)
		if toWrite > available {
			toWrite = available
		}

		// Copy to buffer
		copy(cs.buffer[cs.bufferLen:], p[:toWrite])
		cs.bufferLen += toWrite
		written += toWrite
		p = p[toWrite:]

		// If buffer is full, send part
		if cs.bufferLen >= int(cs.cfg.PartSize) {
			if err := cs.sendPart(); err != nil {
				return written, err
			}
		}
	}

	return written, nil
}

// sendPart sends the current buffer as a part
func (cs *CloudStreamer) sendPart() error {
	if cs.bufferLen == 0 {
		return nil
	}

	cs.partNumber++

	// Copy buffer data
	data := make([]byte, cs.bufferLen)
	copy(data, cs.buffer[:cs.bufferLen])

	// Calculate hash if enabled (carried on the part for local verification;
	// not attached to the S3 request)
	var hash string
	if cs.cfg.ChecksumEnabled {
		h := sha256.Sum256(data)
		hash = hex.EncodeToString(h[:])
	}

	// Send to workers
	cs.partsCh <- part{
		Number: cs.partNumber,
		Data:   data,
		Hash:   hash,
	}

	// Reset buffer
	cs.bufferLen = 0

	return nil
}

// Complete finishes the upload
func (cs *CloudStreamer) Complete(ctx context.Context) (string, error) {
	// Send any remaining data
	if cs.bufferLen > 0 {
		if err := cs.sendPart(); err != nil {
			return "", err
		}
	}

	// Close parts channel and wait for workers
	close(cs.partsCh)
	cs.workers.Wait()
	close(cs.resultsCh)

	// Check for errors
	cs.resultsMu.RLock()
	if len(cs.uploadErrors) > 0 {
		err := cs.uploadErrors[0]
		cs.resultsMu.RUnlock()
		// Abort upload
		cs.abort(ctx)
		return "", err
	}

	// Build completed parts list
	parts := make([]types.CompletedPart, 0, len(cs.results))
	for num, etag := range cs.results {
		parts = append(parts, types.CompletedPart{
			PartNumber: aws.Int32(num),
			ETag:       aws.String(etag),
		})
	}
	cs.resultsMu.RUnlock()

	// Sort parts by number
	sortParts(parts)

	// Complete multipart upload
	result, err := cs.client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(cs.cfg.Bucket),
		Key:      aws.String(cs.cfg.Key),
		UploadId: aws.String(cs.uploadID),
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: parts,
		},
	})
	if err != nil {
		cs.abort(ctx)
		return "", fmt.Errorf("failed to complete upload: %w", err)
	}

	location := ""
	if result.Location != nil {
		location = *result.Location
	}

	return location, nil
}

// abort aborts the multipart upload
func (cs *CloudStreamer) abort(ctx context.Context) {
	if cs.uploadID == "" {
		return
	}

	cs.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
		Bucket:   aws.String(cs.cfg.Bucket),
		Key:      aws.String(cs.cfg.Key),
		UploadId: aws.String(cs.uploadID),
	})
}

// Cancel cancels the upload
func (cs *CloudStreamer) Cancel() error {
	if cs.cancel != nil {
		cs.cancel()
	}
	cs.abort(context.Background())
	return nil
}

// Progress returns upload progress
func (cs *CloudStreamer) Progress() Progress {
	return Progress{
		BytesUploaded: atomic.LoadInt64(&cs.bytesUploaded),
		PartsUploaded: atomic.LoadInt64(&cs.partsUploaded),
		TotalParts:    int64(cs.partNumber),
		Duration:      time.Since(cs.startTime),
	}
}

// Progress represents upload progress
type Progress struct {
	BytesUploaded int64
	PartsUploaded int64
	TotalParts    int64
	Duration      time.Duration
}

// Speed returns the upload speed in bytes per second
func (p Progress) Speed() float64 {
	if p.Duration == 0 {
		return 0
	}
	return float64(p.BytesUploaded) / p.Duration.Seconds()
}
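// Progress and Speed can be polled while a copy is in flight, for example
// from a ticker goroutine (a sketch; cs is an already-started *CloudStreamer):
//
//	go func() {
//		t := time.NewTicker(5 * time.Second)
//		defer t.Stop()
//		for range t.C {
//			p := cs.Progress()
//			fmt.Printf("parts=%d bytes=%d speed=%.1f MB/s\n",
//				p.PartsUploaded, p.BytesUploaded, p.Speed()/1e6)
//		}
//	}()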
// bytesReader wraps a byte slice as an io.ReadSeekCloser
type bytesReader struct {
	data []byte
	pos  int
}

func newBytesReader(data []byte) *bytesReader {
	return &bytesReader{data: data}
}

func (r *bytesReader) Read(p []byte) (int, error) {
	if r.pos >= len(r.data) {
		return 0, io.EOF
	}
	n := copy(p, r.data[r.pos:])
	r.pos += n
	return n, nil
}

func (r *bytesReader) Seek(offset int64, whence int) (int64, error) {
	var newPos int64
	switch whence {
	case io.SeekStart:
		newPos = offset
	case io.SeekCurrent:
		newPos = int64(r.pos) + offset
	case io.SeekEnd:
		newPos = int64(len(r.data)) + offset
	}
	if newPos < 0 || newPos > int64(len(r.data)) {
		return 0, fmt.Errorf("invalid seek position")
	}
	r.pos = int(newPos)
	return newPos, nil
}

func (r *bytesReader) Close() error {
	return nil
}

// sortParts sorts completed parts by number
func sortParts(parts []types.CompletedPart) {
	for i := range parts {
		for j := i + 1; j < len(parts); j++ {
			if *parts[i].PartNumber > *parts[j].PartNumber {
				parts[i], parts[j] = parts[j], parts[i]
			}
		}
	}
}
// MultiFileUploader uploads multiple files in parallel
type MultiFileUploader struct {
	cfg       Config
	client    *s3.Client
	semaphore chan struct{}
}

// NewMultiFileUploader creates a new multi-file uploader
func NewMultiFileUploader(cfg Config) (*MultiFileUploader, error) {
	// Apply defaults so a zero WorkerCount cannot produce an unbuffered
	// semaphore (which would block Upload forever)
	if cfg.WorkerCount == 0 {
		cfg.WorkerCount = 4
	}
	if cfg.PartSize == 0 {
		cfg.PartSize = 10 * 1024 * 1024
	}

	// Load AWS config
	awsCfg, err := config.LoadDefaultConfig(context.Background(),
		config.WithRegion(cfg.Region),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to load AWS config: %w", err)
	}

	clientOpts := []func(*s3.Options){}
	if cfg.Endpoint != "" {
		clientOpts = append(clientOpts, func(o *s3.Options) {
			o.BaseEndpoint = aws.String(cfg.Endpoint)
			o.UsePathStyle = true
		})
	}

	client := s3.NewFromConfig(awsCfg, clientOpts...)

	return &MultiFileUploader{
		cfg:       cfg,
		client:    client,
		semaphore: make(chan struct{}, cfg.WorkerCount),
	}, nil
}

// UploadFile represents a file to upload
type UploadFile struct {
	Key    string
	Reader io.Reader
	Size   int64
}

// UploadResult represents the result of an upload
type UploadResult struct {
	Key      string
	Location string
	Error    error
}

// Upload uploads multiple files in parallel
func (u *MultiFileUploader) Upload(ctx context.Context, files []UploadFile) []UploadResult {
	results := make([]UploadResult, len(files))
	var wg sync.WaitGroup

	for i, file := range files {
		wg.Add(1)
		go func(idx int, f UploadFile) {
			defer wg.Done()

			// Acquire semaphore
			select {
			case u.semaphore <- struct{}{}:
				defer func() { <-u.semaphore }()
			case <-ctx.Done():
				results[idx] = UploadResult{Key: f.Key, Error: ctx.Err()}
				return
			}

			// Upload file
			location, err := u.uploadFile(ctx, f)
			results[idx] = UploadResult{
				Key:      f.Key,
				Location: location,
				Error:    err,
			}
		}(i, file)
	}

	wg.Wait()
	return results
}

// uploadFile uploads a single file
func (u *MultiFileUploader) uploadFile(ctx context.Context, file UploadFile) (string, error) {
	// For small files, use PutObject
	if file.Size < u.cfg.PartSize {
		data, err := io.ReadAll(file.Reader)
		if err != nil {
			return "", err
		}

		if _, err := u.client.PutObject(ctx, &s3.PutObjectInput{
			Bucket: aws.String(u.cfg.Bucket),
			Key:    aws.String(file.Key),
			Body:   newBytesReader(data),
		}); err != nil {
			return "", err
		}

		return fmt.Sprintf("s3://%s/%s", u.cfg.Bucket, file.Key), nil
	}

	// For large files, use multipart upload
	cfg := u.cfg
	cfg.Key = file.Key

	streamer, err := NewCloudStreamer(cfg)
	if err != nil {
		return "", err
	}

	if err := streamer.Start(ctx); err != nil {
		return "", err
	}

	if _, err := io.Copy(streamer, file.Reader); err != nil {
		streamer.Cancel()
		return "", err
	}

	return streamer.Complete(ctx)
}
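// Illustrative use of MultiFileUploader for pushing several backup artifacts
// concurrently (a sketch; ctx, dataReader, dataSize, logReader and logSize
// are placeholders supplied by the caller):
//
//	up, err := NewMultiFileUploader(Config{
//		Bucket:      "my-backups",
//		Region:      "us-east-1",
//		WorkerCount: 4,
//	})
//	if err != nil {
//		return err
//	}
//	results := up.Upload(ctx, []UploadFile{
//		{Key: "mysql/data.tar", Reader: dataReader, Size: dataSize},
//		{Key: "mysql/binlog.tar", Reader: logReader, Size: logSize},
//	})
//	for _, r := range results {
//		if r.Error != nil {
//			return fmt.Errorf("upload %s: %w", r.Key, r.Error)
//		}
//	}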