P0: Add ON_ERROR_STOP=1 to psql (fail fast, not 2.6M errors)
P1: Fix pipe deadlock in streaming compression (goroutine+context)
P1: Handle SIGPIPE (exit 141) - report compressor as root cause
P2: Validate .dump files with pg_restore --list before restore
P2: Add fsync after streaming compression for durability

Fixes potential hung backups and improves error diagnostics.
package backup

import (
	"bufio"
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"dbbackup/internal/checks"
	"dbbackup/internal/cloud"
	"dbbackup/internal/config"
	"dbbackup/internal/database"
	"dbbackup/internal/logger"
	"dbbackup/internal/metadata"
	"dbbackup/internal/metrics"
	"dbbackup/internal/progress"
	"dbbackup/internal/security"
	"dbbackup/internal/swap"
)

// Engine handles backup operations
type Engine struct {
	cfg              *config.Config
	log              logger.Logger
	db               database.Database
	progress         progress.Indicator
	detailedReporter *progress.DetailedReporter
	silent           bool // Silent mode for TUI
}

// New creates a new backup engine
func New(cfg *config.Config, log logger.Logger, db database.Database) *Engine {
	progressIndicator := progress.NewIndicator(true, "line") // Use line-by-line indicator
	detailedReporter := progress.NewDetailedReporter(progressIndicator, &loggerAdapter{logger: log})

	return &Engine{
		cfg:              cfg,
		log:              log,
		db:               db,
		progress:         progressIndicator,
		detailedReporter: detailedReporter,
		silent:           false,
	}
}

// NewWithProgress creates a new backup engine with a custom progress indicator
func NewWithProgress(cfg *config.Config, log logger.Logger, db database.Database, progressIndicator progress.Indicator) *Engine {
	detailedReporter := progress.NewDetailedReporter(progressIndicator, &loggerAdapter{logger: log})

	return &Engine{
		cfg:              cfg,
		log:              log,
		db:               db,
		progress:         progressIndicator,
		detailedReporter: detailedReporter,
		silent:           false,
	}
}

// NewSilent creates a new backup engine in silent mode (for TUI)
func NewSilent(cfg *config.Config, log logger.Logger, db database.Database, progressIndicator progress.Indicator) *Engine {
	// If no indicator provided, use null indicator (no output)
	if progressIndicator == nil {
		progressIndicator = progress.NewNullIndicator()
	}

	detailedReporter := progress.NewDetailedReporter(progressIndicator, &loggerAdapter{logger: log})

	return &Engine{
		cfg:              cfg,
		log:              log,
		db:               db,
		progress:         progressIndicator,
		detailedReporter: detailedReporter,
		silent:           true, // Silent mode enabled
	}
}

// loggerAdapter adapts our logger to the progress.Logger interface
type loggerAdapter struct {
	logger logger.Logger
}

func (la *loggerAdapter) Info(msg string, args ...any) {
	la.logger.Info(msg, args...)
}

func (la *loggerAdapter) Warn(msg string, args ...any) {
	la.logger.Warn(msg, args...)
}

func (la *loggerAdapter) Error(msg string, args ...any) {
	la.logger.Error(msg, args...)
}

func (la *loggerAdapter) Debug(msg string, args ...any) {
	la.logger.Debug(msg, args...)
}

// printf prints to stdout only if not in silent mode
func (e *Engine) printf(format string, args ...interface{}) {
	if !e.silent {
		fmt.Printf(format, args...)
	}
}

// generateOperationID creates a unique operation ID
func generateOperationID() string {
	bytes := make([]byte, 8)
	if _, err := rand.Read(bytes); err != nil {
		// crypto/rand should never fail in practice; fall back to a
		// timestamp-derived ID rather than silently using zeroed bytes
		return strconv.FormatInt(time.Now().UnixNano(), 16)
	}
	return hex.EncodeToString(bytes)
}

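// Illustrative output: an 8-byte ID encodes to 16 hex characters, e.g.
// "3a7bd3e2360a3d29" (example value, not from a real run).
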
// BackupSingle performs a single database backup with detailed progress tracking
func (e *Engine) BackupSingle(ctx context.Context, databaseName string) error {
	// Start detailed operation tracking
	operationID := generateOperationID()
	tracker := e.detailedReporter.StartOperation(operationID, databaseName, "backup")

	// Add operation details
	tracker.SetDetails("database", databaseName)
	tracker.SetDetails("type", "single")
	tracker.SetDetails("compression", strconv.Itoa(e.cfg.CompressionLevel))
	tracker.SetDetails("format", "custom")

	// Start preparing backup directory
	prepStep := tracker.AddStep("prepare", "Preparing backup directory")

	// Validate and sanitize backup directory path
	validBackupDir, err := security.ValidateBackupPath(e.cfg.BackupDir)
	if err != nil {
		prepStep.Fail(fmt.Errorf("invalid backup directory path: %w", err))
		tracker.Fail(fmt.Errorf("invalid backup directory path: %w", err))
		return fmt.Errorf("invalid backup directory path: %w", err)
	}
	e.cfg.BackupDir = validBackupDir

	if err := os.MkdirAll(e.cfg.BackupDir, 0755); err != nil {
		err = fmt.Errorf("failed to create backup directory %s. Check write permissions or use --backup-dir to specify a writable location: %w", e.cfg.BackupDir, err)
		prepStep.Fail(err)
		tracker.Fail(err)
		return err
	}
	prepStep.Complete("Backup directory prepared")
	tracker.UpdateProgress(10, "Backup directory prepared")

	// Generate timestamp and filename
	timestamp := time.Now().Format("20060102_150405")
	var outputFile string

	if e.cfg.IsPostgreSQL() {
		outputFile = filepath.Join(e.cfg.BackupDir, fmt.Sprintf("db_%s_%s.dump", databaseName, timestamp))
	} else {
		outputFile = filepath.Join(e.cfg.BackupDir, fmt.Sprintf("db_%s_%s.sql.gz", databaseName, timestamp))
	}

	tracker.SetDetails("output_file", outputFile)
	tracker.UpdateProgress(20, "Generated backup filename")

	// Build backup command
	cmdStep := tracker.AddStep("command", "Building backup command")
	options := database.BackupOptions{
		Compression:  e.cfg.CompressionLevel,
		Parallel:     e.cfg.DumpJobs,
		Format:       "custom",
		Blobs:        true,
		NoOwner:      false,
		NoPrivileges: false,
	}

	cmd := e.db.BuildBackupCommand(databaseName, outputFile, options)
	cmdStep.Complete("Backup command prepared")
	tracker.UpdateProgress(30, "Backup command prepared")

	// Execute backup command with progress monitoring
	execStep := tracker.AddStep("execute", "Executing database backup")
	tracker.UpdateProgress(40, "Starting database backup...")

	if err := e.executeCommandWithProgress(ctx, cmd, outputFile, tracker); err != nil {
		err = fmt.Errorf("backup failed for %s: %w. Check database connectivity and disk space", databaseName, err)
		execStep.Fail(err)
		tracker.Fail(err)
		return err
	}
	execStep.Complete("Database backup completed")
	tracker.UpdateProgress(80, "Database backup completed")

	// Verify backup file
	verifyStep := tracker.AddStep("verify", "Verifying backup file")
	if info, err := os.Stat(outputFile); err != nil {
		err = fmt.Errorf("backup file not created at %s. Backup command may have failed silently: %w", outputFile, err)
		verifyStep.Fail(err)
		tracker.Fail(err)
		return err
	} else {
		size := formatBytes(info.Size())
		tracker.SetDetails("file_size", size)
		tracker.SetByteProgress(info.Size(), info.Size())
		verifyStep.Complete(fmt.Sprintf("Backup file verified: %s", size))
		tracker.UpdateProgress(90, fmt.Sprintf("Backup verified: %s", size))
	}

	// Calculate and save checksum
	checksumStep := tracker.AddStep("checksum", "Calculating SHA-256 checksum")
	if checksum, err := security.ChecksumFile(outputFile); err != nil {
		e.log.Warn("Failed to calculate checksum", "error", err)
		checksumStep.Fail(fmt.Errorf("checksum calculation failed: %w", err))
	} else {
		if err := security.SaveChecksum(outputFile, checksum); err != nil {
			e.log.Warn("Failed to save checksum", "error", err)
		} else {
			checksumStep.Complete(fmt.Sprintf("Checksum: %s", checksum[:16]+"..."))
			e.log.Info("Backup checksum", "sha256", checksum)
		}
	}

	// Create metadata file
	metaStep := tracker.AddStep("metadata", "Creating metadata file")
	if err := e.createMetadata(outputFile, databaseName, "single", ""); err != nil {
		e.log.Warn("Failed to create metadata file", "error", err)
		metaStep.Fail(fmt.Errorf("metadata creation failed: %w", err))
	} else {
		metaStep.Complete("Metadata file created")
	}

	// Record metrics for observability
	if info, err := os.Stat(outputFile); err == nil && metrics.GlobalMetrics != nil {
		metrics.GlobalMetrics.RecordOperation("backup_single", databaseName, time.Now().Add(-time.Minute), info.Size(), true, 0)
	}

	// Cloud upload if enabled
	if e.cfg.CloudEnabled && e.cfg.CloudAutoUpload {
		if err := e.uploadToCloud(ctx, outputFile, tracker); err != nil {
			e.log.Warn("Cloud upload failed", "error", err)
			// Don't fail the backup if cloud upload fails
		}
	}

	// Complete operation
	tracker.UpdateProgress(100, "Backup operation completed successfully")
	tracker.Complete(fmt.Sprintf("Single database backup completed: %s", filepath.Base(outputFile)))

	return nil
}

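// Usage sketch (hypothetical wiring; the concrete constructors for the config,
// logger, and database values live in their respective internal packages):
//
//	eng := backup.New(cfg, log, db)
//	if err := eng.BackupSingle(ctx, "appdb"); err != nil {
//		log.Error("backup failed", "error", err)
//	}
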
// BackupSample performs a sample database backup
func (e *Engine) BackupSample(ctx context.Context, databaseName string) error {
	operation := e.log.StartOperation("Sample Database Backup")

	// Ensure backup directory exists
	if err := os.MkdirAll(e.cfg.BackupDir, 0755); err != nil {
		operation.Fail("Failed to create backup directory")
		return fmt.Errorf("failed to create backup directory: %w", err)
	}

	// Generate timestamp and filename
	timestamp := time.Now().Format("20060102_150405")
	outputFile := filepath.Join(e.cfg.BackupDir,
		fmt.Sprintf("sample_%s_%s%d_%s.sql", databaseName, e.cfg.SampleStrategy, e.cfg.SampleValue, timestamp))

	operation.Update("Starting sample database backup")
	e.progress.Start(fmt.Sprintf("Creating sample backup of '%s' (%s=%d)", databaseName, e.cfg.SampleStrategy, e.cfg.SampleValue))

	// For sample backups, we need to get the schema first, then sample data
	if err := e.createSampleBackup(ctx, databaseName, outputFile); err != nil {
		e.progress.Fail(fmt.Sprintf("Sample backup failed: %v", err))
		operation.Fail("Sample backup failed")
		return fmt.Errorf("sample backup failed: %w", err)
	}

	// Check output file
	if info, err := os.Stat(outputFile); err != nil {
		e.progress.Fail("Sample backup file not created")
		operation.Fail("Sample backup file not found")
		return fmt.Errorf("sample backup file not created: %w", err)
	} else {
		size := formatBytes(info.Size())
		e.progress.Complete(fmt.Sprintf("Sample backup completed: %s (%s)", filepath.Base(outputFile), size))
		operation.Complete(fmt.Sprintf("Sample backup created: %s (%s)", outputFile, size))
	}

	// Create metadata file
	if err := e.createMetadata(outputFile, databaseName, "sample", e.cfg.SampleStrategy); err != nil {
		e.log.Warn("Failed to create metadata file", "error", err)
	}

	return nil
}

// BackupCluster performs a full cluster backup (PostgreSQL only)
func (e *Engine) BackupCluster(ctx context.Context) error {
	if !e.cfg.IsPostgreSQL() {
		return fmt.Errorf("cluster backup is only supported for PostgreSQL")
	}

	operation := e.log.StartOperation("Cluster Backup")

	// Setup swap file if configured
	var swapMgr *swap.Manager
	if e.cfg.AutoSwap && e.cfg.SwapFileSizeGB > 0 {
		swapMgr = swap.NewManager(e.cfg.SwapFilePath, e.cfg.SwapFileSizeGB, e.log)

		if swapMgr.IsSupported() {
			e.log.Info("Setting up temporary swap file for large backup",
				"path", e.cfg.SwapFilePath,
				"size_gb", e.cfg.SwapFileSizeGB)

			if err := swapMgr.Setup(); err != nil {
				e.log.Warn("Failed to setup swap file (continuing without it)", "error", err)
			} else {
				// Ensure cleanup on exit
				defer func() {
					if err := swapMgr.Cleanup(); err != nil {
						e.log.Warn("Failed to cleanup swap file", "error", err)
					}
				}()
			}
		} else {
			e.log.Warn("Swap file management not supported on this platform")
		}
	}

	// Use appropriate progress indicator based on silent mode
	var quietProgress progress.Indicator
	if e.silent {
		// In silent mode (TUI), use null indicator - no stdout output at all
		quietProgress = progress.NewNullIndicator()
	} else {
		// In CLI mode, use quiet line-by-line output
		quietProgress = progress.NewQuietLineByLine()
		quietProgress.Start("Starting cluster backup (all databases)")
	}

	// Ensure backup directory exists
	if err := os.MkdirAll(e.cfg.BackupDir, 0755); err != nil {
		operation.Fail("Failed to create backup directory")
		quietProgress.Fail("Failed to create backup directory")
		return fmt.Errorf("failed to create backup directory: %w", err)
	}

	// Check disk space before starting backup (cached for performance)
	e.log.Info("Checking disk space availability")
	spaceCheck := checks.CheckDiskSpaceCached(e.cfg.BackupDir)

	if !e.silent {
		// Show disk space status in CLI mode
		fmt.Println("\n" + checks.FormatDiskSpaceMessage(spaceCheck))
	}

	if spaceCheck.Critical {
		operation.Fail("Insufficient disk space")
		quietProgress.Fail("Insufficient disk space - free up space and try again")
		return fmt.Errorf("insufficient disk space: %.1f%% used, operation blocked", spaceCheck.UsedPercent)
	}

	if spaceCheck.Warning {
		e.log.Warn("Low disk space - backup may fail if database is large",
			"available_gb", float64(spaceCheck.AvailableBytes)/(1024*1024*1024),
			"used_percent", spaceCheck.UsedPercent)
	}

	// Generate timestamp and filename
	timestamp := time.Now().Format("20060102_150405")
	outputFile := filepath.Join(e.cfg.BackupDir, fmt.Sprintf("cluster_%s.tar.gz", timestamp))
	tempDir := filepath.Join(e.cfg.BackupDir, fmt.Sprintf(".cluster_%s", timestamp))

	operation.Update("Starting cluster backup")

	// Create temporary directory
	if err := os.MkdirAll(filepath.Join(tempDir, "dumps"), 0755); err != nil {
		operation.Fail("Failed to create temporary directory")
		quietProgress.Fail("Failed to create temporary directory")
		return fmt.Errorf("failed to create temp directory: %w", err)
	}
	defer os.RemoveAll(tempDir)

	// Backup globals
	e.printf("  Backing up global objects...\n")
	if err := e.backupGlobals(ctx, tempDir); err != nil {
		quietProgress.Fail(fmt.Sprintf("Failed to backup globals: %v", err))
		operation.Fail("Global backup failed")
		return fmt.Errorf("failed to backup globals: %w", err)
	}

	// Get list of databases
	e.printf("  Getting database list...\n")
	databases, err := e.db.ListDatabases(ctx)
	if err != nil {
		quietProgress.Fail(fmt.Sprintf("Failed to list databases: %v", err))
		operation.Fail("Database listing failed")
		return fmt.Errorf("failed to list databases: %w", err)
	}

	// Create ETA estimator for database backups
	estimator := progress.NewETAEstimator("Backing up cluster", len(databases))
	quietProgress.SetEstimator(estimator)

	// Backup each database
	parallelism := e.cfg.ClusterParallelism
	if parallelism < 1 {
		parallelism = 1 // Ensure at least sequential
	}

	if parallelism == 1 {
		e.printf("  Backing up %d databases sequentially...\n", len(databases))
	} else {
		e.printf("  Backing up %d databases with %d parallel workers...\n", len(databases), parallelism)
	}

	// Use worker pool for parallel backup
	var successCount, failCount int32
	var mu sync.Mutex // Protect shared resources (printf, estimator)

	// Create semaphore to limit concurrency
	semaphore := make(chan struct{}, parallelism)
	var wg sync.WaitGroup

	for i, dbName := range databases {
		// Check if context is cancelled before starting a new backup
		select {
		case <-ctx.Done():
			e.log.Info("Backup cancelled by user")
			quietProgress.Fail("Backup cancelled by user (Ctrl+C)")
			operation.Fail("Backup cancelled")
			return fmt.Errorf("backup cancelled: %w", ctx.Err())
		default:
		}

		wg.Add(1)
		semaphore <- struct{}{} // Acquire

		go func(idx int, name string) {
			defer wg.Done()
			defer func() { <-semaphore }() // Release

			// Check for cancellation at start of goroutine
			select {
			case <-ctx.Done():
				e.log.Info("Database backup cancelled", "database", name)
				atomic.AddInt32(&failCount, 1)
				return
			default:
			}

			// Update estimator progress (thread-safe)
			mu.Lock()
			estimator.UpdateProgress(idx)
			e.printf("  [%d/%d] Backing up database: %s\n", idx+1, len(databases), name)
			quietProgress.Update(fmt.Sprintf("Backing up database %d/%d: %s", idx+1, len(databases), name))
			mu.Unlock()

			// Fetch the database size once; it drives the size warning, the
			// format selection, and the timeout calculation below
			size, sizeErr := e.db.GetDatabaseSize(ctx, name)
			if sizeErr == nil {
				sizeStr := formatBytes(size)
				mu.Lock()
				e.printf("    Database size: %s\n", sizeStr)
				if size > 10*1024*1024*1024 { // > 10GB
					e.printf("    ⚠️  Large database detected - this may take a while\n")
				}
				mu.Unlock()
			}

			dumpFile := filepath.Join(tempDir, "dumps", name+".dump")

			compressionLevel := e.cfg.CompressionLevel
			if compressionLevel > 6 {
				compressionLevel = 6
			}

			format := "custom"
			parallel := e.cfg.DumpJobs

			if sizeErr == nil && size > 5*1024*1024*1024 {
				format = "plain"
				compressionLevel = 0
				parallel = 0
				mu.Lock()
				e.printf("    Using plain format + external compression (optimal for large DBs)\n")
				mu.Unlock()
			}

			options := database.BackupOptions{
				Compression:  compressionLevel,
				Parallel:     parallel,
				Format:       format,
				Blobs:        true,
				NoOwner:      false,
				NoPrivileges: false,
			}

			cmd := e.db.BuildBackupCommand(name, dumpFile, options)

			// Calculate timeout based on database size:
			// - Minimum 2 hours for small databases
			// - Add 1 hour per 20GB for large databases
			// - Example: a 69GB database gets 2h + (69/20 + 1)h = 6h
			timeout := 2 * time.Hour
			if sizeErr == nil {
				sizeGB := size / (1024 * 1024 * 1024)
				if sizeGB > 20 {
					extraHours := (sizeGB / 20) + 1
					timeout = time.Duration(2+extraHours) * time.Hour
					mu.Lock()
					e.printf("    Extended timeout: %v (for %dGB database)\n", timeout, sizeGB)
					mu.Unlock()
				}
			}

			dbCtx, cancel := context.WithTimeout(ctx, timeout)
			defer cancel()
			err := e.executeCommand(dbCtx, cmd, dumpFile)

			if err != nil {
				e.log.Warn("Failed to backup database", "database", name, "error", err)
				mu.Lock()
				e.printf("    ⚠️  WARNING: Failed to backup %s: %v\n", name, err)
				mu.Unlock()
				atomic.AddInt32(&failCount, 1)
			} else {
				// Streaming compression may have produced .sql.gz instead of .dump
				compressedCandidate := strings.TrimSuffix(dumpFile, ".dump") + ".sql.gz"
				mu.Lock()
				if info, err := os.Stat(compressedCandidate); err == nil {
					e.printf("    ✅ Completed %s (%s)\n", name, formatBytes(info.Size()))
				} else if info, err := os.Stat(dumpFile); err == nil {
					e.printf("    ✅ Completed %s (%s)\n", name, formatBytes(info.Size()))
				}
				mu.Unlock()
				atomic.AddInt32(&successCount, 1)
			}
		}(i, dbName)
	}

	// Wait for all backups to complete
	wg.Wait()

	successCountFinal := int(atomic.LoadInt32(&successCount))
	failCountFinal := int(atomic.LoadInt32(&failCount))

	e.printf("  Backup summary: %d succeeded, %d failed\n", successCountFinal, failCountFinal)

	// Create archive
	e.printf("  Creating compressed archive...\n")
	if err := e.createArchive(ctx, tempDir, outputFile); err != nil {
		quietProgress.Fail(fmt.Sprintf("Failed to create archive: %v", err))
		operation.Fail("Archive creation failed")
		return fmt.Errorf("failed to create archive: %w", err)
	}

	// Check output file
	if info, err := os.Stat(outputFile); err != nil {
		quietProgress.Fail("Cluster backup archive not created")
		operation.Fail("Cluster backup archive not found")
		return fmt.Errorf("cluster backup archive not created: %w", err)
	} else {
		size := formatBytes(info.Size())
		quietProgress.Complete(fmt.Sprintf("Cluster backup completed: %s (%s)", filepath.Base(outputFile), size))
		operation.Complete(fmt.Sprintf("Cluster backup created: %s (%s)", outputFile, size))
	}

	// Create cluster metadata file
	if err := e.createClusterMetadata(outputFile, databases, successCountFinal, failCountFinal); err != nil {
		e.log.Warn("Failed to create cluster metadata file", "error", err)
	}

	return nil
}

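// The cluster loop above bounds concurrency with a buffered channel used as a
// counting semaphore. A minimal, self-contained sketch of the same pattern
// (names here are illustrative, not part of this package's API):
//
//	func runBounded(ctx context.Context, jobs []string, limit int, work func(string)) {
//		sem := make(chan struct{}, limit)
//		var wg sync.WaitGroup
//		for _, j := range jobs {
//			if ctx.Err() != nil {
//				break // stop launching new work once cancelled
//			}
//			wg.Add(1)
//			sem <- struct{}{} // acquire a slot
//			go func(j string) {
//				defer wg.Done()
//				defer func() { <-sem }() // release the slot
//				work(j)
//			}(j)
//		}
//		wg.Wait()
//	}
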
// executeCommandWithProgress executes a backup command with real-time progress monitoring
func (e *Engine) executeCommandWithProgress(ctx context.Context, cmdArgs []string, outputFile string, tracker *progress.OperationTracker) error {
	if len(cmdArgs) == 0 {
		return fmt.Errorf("empty command")
	}

	e.log.Debug("Executing backup command with progress", "cmd", cmdArgs[0], "args", cmdArgs[1:])

	// For MySQL, compression and redirection are handled by a dedicated pipeline
	if e.cfg.IsMySQL() && e.cfg.CompressionLevel > 0 {
		return e.executeMySQLWithProgressAndCompression(ctx, cmdArgs, outputFile, tracker)
	}

	cmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...)

	// Set environment variables for database tools
	cmd.Env = os.Environ()
	if e.cfg.Password != "" {
		if e.cfg.IsPostgreSQL() {
			cmd.Env = append(cmd.Env, "PGPASSWORD="+e.cfg.Password)
		} else if e.cfg.IsMySQL() {
			cmd.Env = append(cmd.Env, "MYSQL_PWD="+e.cfg.Password)
		}
	}

	// Get stderr pipe for progress monitoring
	stderr, err := cmd.StderrPipe()
	if err != nil {
		return fmt.Errorf("failed to get stderr pipe: %w", err)
	}

	// Start the command
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("failed to start command: %w", err)
	}

	// Monitor progress via stderr
	go e.monitorCommandProgress(stderr, tracker)

	// Wait for command to complete
	if err := cmd.Wait(); err != nil {
		return fmt.Errorf("backup command failed: %w", err)
	}

	return nil
}

// monitorCommandProgress monitors command output for progress information
func (e *Engine) monitorCommandProgress(stderr io.ReadCloser, tracker *progress.OperationTracker) {
	defer stderr.Close()

	scanner := bufio.NewScanner(stderr)
	scanner.Buffer(make([]byte, 64*1024), 1024*1024) // 64KB initial, 1MB max for performance
	progressBase := 40                               // Start from 40% since command preparation is done
	progressIncrement := 0

	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			continue
		}

		e.log.Debug("Command output", "line", line)

		// Increment progress gradually based on output
		if progressBase < 75 {
			progressIncrement++
			if progressIncrement%5 == 0 { // Update every 5 lines
				progressBase += 2
				tracker.UpdateProgress(progressBase, "Processing data...")
			}
		}

		// Look for specific progress indicators
		if strings.Contains(line, "COPY") {
			tracker.UpdateProgress(progressBase+5, "Copying table data...")
		} else if strings.Contains(line, "completed") {
			tracker.UpdateProgress(75, "Backup nearly complete...")
		} else if strings.Contains(line, "done") {
			tracker.UpdateProgress(78, "Finalizing backup...")
		}
	}
}

// executeMySQLWithProgressAndCompression handles MySQL backup with compression and progress
func (e *Engine) executeMySQLWithProgressAndCompression(ctx context.Context, cmdArgs []string, outputFile string, tracker *progress.OperationTracker) error {
	// Create mysqldump command
	dumpCmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...)
	dumpCmd.Env = os.Environ()
	if e.cfg.Password != "" {
		dumpCmd.Env = append(dumpCmd.Env, "MYSQL_PWD="+e.cfg.Password)
	}

	// Create gzip command
	gzipCmd := exec.CommandContext(ctx, "gzip", fmt.Sprintf("-%d", e.cfg.CompressionLevel))

	// Create output file
	outFile, err := os.Create(outputFile)
	if err != nil {
		return fmt.Errorf("failed to create output file: %w", err)
	}
	defer outFile.Close()

	// Set up pipeline: mysqldump | gzip > outputfile
	pipe, err := dumpCmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("failed to create pipe: %w", err)
	}

	gzipCmd.Stdin = pipe
	gzipCmd.Stdout = outFile

	// Get stderr for progress monitoring
	stderr, err := dumpCmd.StderrPipe()
	if err != nil {
		return fmt.Errorf("failed to get stderr pipe: %w", err)
	}

	// Start monitoring progress
	go e.monitorCommandProgress(stderr, tracker)

	// Start the consumer (gzip) before the producer so the pipe is drained
	if err := gzipCmd.Start(); err != nil {
		return fmt.Errorf("failed to start gzip: %w", err)
	}

	if err := dumpCmd.Start(); err != nil {
		gzipCmd.Process.Kill() // don't leave gzip blocked on a pipe that will never be written
		gzipCmd.Wait()
		return fmt.Errorf("failed to start mysqldump: %w", err)
	}

	// Wait for mysqldump to complete
	if err := dumpCmd.Wait(); err != nil {
		return fmt.Errorf("mysqldump failed: %w", err)
	}

	// Close pipe and wait for gzip
	pipe.Close()
	if err := gzipCmd.Wait(); err != nil {
		return fmt.Errorf("gzip failed: %w", err)
	}

	return nil
}

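// Ordering note for the pipeline above: the consumer (gzip) must be running
// before mysqldump produces output, otherwise the OS pipe buffer (64 KiB by
// default on Linux) fills up and the producer blocks indefinitely.
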
// executeMySQLWithCompression handles MySQL backup with compression
func (e *Engine) executeMySQLWithCompression(ctx context.Context, cmdArgs []string, outputFile string) error {
	// Create mysqldump command
	dumpCmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...)
	dumpCmd.Env = os.Environ()
	if e.cfg.Password != "" {
		dumpCmd.Env = append(dumpCmd.Env, "MYSQL_PWD="+e.cfg.Password)
	}

	// Create gzip command
	gzipCmd := exec.CommandContext(ctx, "gzip", fmt.Sprintf("-%d", e.cfg.CompressionLevel))

	// Create output file
	outFile, err := os.Create(outputFile)
	if err != nil {
		return fmt.Errorf("failed to create output file: %w", err)
	}
	defer outFile.Close()

	// Set up pipeline: mysqldump | gzip > outputfile
	pipe, err := dumpCmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("failed to create pipe: %w", err)
	}
	gzipCmd.Stdin = pipe
	gzipCmd.Stdout = outFile

	// Start the consumer (gzip) before running the producer
	if err := gzipCmd.Start(); err != nil {
		return fmt.Errorf("failed to start gzip: %w", err)
	}

	if err := dumpCmd.Run(); err != nil {
		gzipCmd.Process.Kill() // don't leak a gzip process blocked on stdin
		gzipCmd.Wait()
		return fmt.Errorf("mysqldump failed: %w", err)
	}

	if err := gzipCmd.Wait(); err != nil {
		return fmt.Errorf("gzip failed: %w", err)
	}

	return nil
}

// createSampleBackup creates a sample backup with a reduced dataset
func (e *Engine) createSampleBackup(ctx context.Context, databaseName, outputFile string) error {
	// This is a simplified implementation. A full implementation would:
	// 1. Export schema
	// 2. Get list of tables
	// 3. For each table, run a sampling query
	// 4. Combine everything into a single SQL file
	//
	// For now, we use a simple approach: schema-only backup first, then
	// sample data.

	file, err := os.Create(outputFile)
	if err != nil {
		return fmt.Errorf("failed to create sample backup file: %w", err)
	}
	defer file.Close()

	// Write header
	fmt.Fprintf(file, "-- Sample Database Backup\n")
	fmt.Fprintf(file, "-- Database: %s\n", databaseName)
	fmt.Fprintf(file, "-- Strategy: %s = %d\n", e.cfg.SampleStrategy, e.cfg.SampleValue)
	fmt.Fprintf(file, "-- Created: %s\n", time.Now().Format(time.RFC3339))
	fmt.Fprintf(file, "-- WARNING: This backup may have referential integrity issues!\n\n")

	// For PostgreSQL, we can use pg_dump --schema-only first
	if e.cfg.IsPostgreSQL() {
		// Get schema
		schemaCmd := e.db.BuildBackupCommand(databaseName, "/dev/stdout", database.BackupOptions{
			SchemaOnly: true,
			Format:     "plain",
		})

		cmd := exec.CommandContext(ctx, schemaCmd[0], schemaCmd[1:]...)
		cmd.Env = os.Environ()
		if e.cfg.Password != "" {
			cmd.Env = append(cmd.Env, "PGPASSWORD="+e.cfg.Password)
		}
		cmd.Stdout = file

		if err := cmd.Run(); err != nil {
			return fmt.Errorf("failed to export schema: %w", err)
		}

		fmt.Fprintf(file, "\n-- Sample data follows\n\n")

		// Get tables and sample data
		tables, err := e.db.ListTables(ctx, databaseName)
		if err != nil {
			return fmt.Errorf("failed to list tables: %w", err)
		}

		strategy := database.SampleStrategy{
			Type:  e.cfg.SampleStrategy,
			Value: e.cfg.SampleValue,
		}

		for _, table := range tables {
			fmt.Fprintf(file, "-- Data for table: %s\n", table)
			sampleQuery := e.db.BuildSampleQuery(databaseName, table, strategy)
			fmt.Fprintf(file, "\\copy (%s) TO STDOUT\n\n", sampleQuery)
		}
	}

	return nil
}

// backupGlobals creates a backup of global PostgreSQL objects
func (e *Engine) backupGlobals(ctx context.Context, tempDir string) error {
	globalsFile := filepath.Join(tempDir, "globals.sql")

	cmd := exec.CommandContext(ctx, "pg_dumpall", "--globals-only")
	if e.cfg.Host != "localhost" {
		cmd.Args = append(cmd.Args, "-h", e.cfg.Host, "-p", fmt.Sprintf("%d", e.cfg.Port))
	}
	cmd.Args = append(cmd.Args, "-U", e.cfg.User)

	cmd.Env = os.Environ()
	if e.cfg.Password != "" {
		cmd.Env = append(cmd.Env, "PGPASSWORD="+e.cfg.Password)
	}

	output, err := cmd.Output()
	if err != nil {
		return fmt.Errorf("pg_dumpall failed: %w", err)
	}

	return os.WriteFile(globalsFile, output, 0644)
}

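// Restore-side note (sketch, not implemented in this file): globals.sql is
// replayed with psql during restore, which is where the commit message's P0
// item applies — fail fast on the first error instead of accumulating
// millions of them:
//
//	psql -v ON_ERROR_STOP=1 -f globals.sql postgres
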
// createArchive creates a compressed tar archive. It prefers pigz for faster
// parallel compression when available, falling back to standard tar+gzip.
// (The previous goto-based fallback jumped over variable declarations, which
// Go rejects at compile time; the two paths are now separate helpers.)
func (e *Engine) createArchive(ctx context.Context, sourceDir, outputFile string) error {
	if _, err := exec.LookPath("pigz"); err == nil {
		done, err := e.createArchiveWithPigz(ctx, sourceDir, outputFile)
		if done {
			return err
		}
		// Pipeline setup failed before compression started; fall back
		e.log.Warn("pigz pipeline setup failed, falling back to tar+gzip", "error", err)
	}
	return e.createArchiveWithGzip(ctx, sourceDir, outputFile)
}

// createArchiveWithPigz streams `tar -cf - | pigz -p N > outputFile`.
// It returns done=false when pipeline setup failed and the caller should
// fall back to plain tar+gzip.
func (e *Engine) createArchiveWithPigz(ctx context.Context, sourceDir, outputFile string) (bool, error) {
	tarCmd := exec.CommandContext(ctx, "tar", "-cf", "-", "-C", sourceDir, ".")

	outFile, err := os.Create(outputFile)
	if err != nil {
		return false, err
	}
	defer outFile.Close()

	// Use pigz with the configured job count for parallel compression
	pigzCmd := exec.CommandContext(ctx, "pigz", "-p", strconv.Itoa(e.cfg.Jobs))

	tarOut, err := tarCmd.StdoutPipe()
	if err != nil {
		return false, err
	}
	pigzCmd.Stdin = tarOut
	pigzCmd.Stdout = outFile

	// Start the consumer (pigz) first, then the producer (tar)
	if err := pigzCmd.Start(); err != nil {
		return false, err
	}
	if err := tarCmd.Start(); err != nil {
		pigzCmd.Process.Kill()
		return false, err
	}

	// Wait for tar to finish
	if err := tarCmd.Wait(); err != nil {
		pigzCmd.Process.Kill()
		return true, fmt.Errorf("tar failed: %w", err)
	}

	// Wait for pigz to finish
	if err := pigzCmd.Wait(); err != nil {
		return true, fmt.Errorf("pigz compression failed: %w", err)
	}
	return true, nil
}

// createArchiveWithGzip runs the standard `tar -czf` fallback.
func (e *Engine) createArchiveWithGzip(ctx context.Context, sourceDir, outputFile string) error {
	cmd := exec.CommandContext(ctx, "tar", "-czf", outputFile, "-C", sourceDir, ".")

	// Stream stderr so archive diagnostics are logged without buffering
	// everything in memory; the goroutine exits when the pipe closes
	stderr, err := cmd.StderrPipe()
	if err == nil {
		go func() {
			scanner := bufio.NewScanner(stderr)
			for scanner.Scan() {
				if line := scanner.Text(); line != "" {
					e.log.Debug("Archive creation", "output", line)
				}
			}
		}()
	}

	if err := cmd.Run(); err != nil {
		return fmt.Errorf("tar failed: %w", err)
	}
	// cmd.Run() calls Wait(), which closes the stderr pipe and ends the goroutine
	return nil
}

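// Equivalent shell pipeline for the pigz path above (illustrative, with
// placeholders for the actual arguments):
//
//	tar -cf - -C <sourceDir> . | pigz -p <jobs> > <outputFile>
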
// createMetadata creates a metadata file for the backup
func (e *Engine) createMetadata(backupFile, database, backupType, strategy string) error {
	startTime := time.Now()

	// Get backup file information
	info, err := os.Stat(backupFile)
	if err != nil {
		return fmt.Errorf("failed to stat backup file: %w", err)
	}

	// Calculate SHA-256 checksum
	sha256, err := metadata.CalculateSHA256(backupFile)
	if err != nil {
		return fmt.Errorf("failed to calculate checksum: %w", err)
	}

	// Get database version
	ctx := context.Background()
	dbVersion, _ := e.db.GetVersion(ctx)
	if dbVersion == "" {
		dbVersion = "unknown"
	}

	// Determine compression format
	compressionFormat := "none"
	if e.cfg.CompressionLevel > 0 {
		if e.cfg.Jobs > 1 {
			compressionFormat = fmt.Sprintf("pigz-%d", e.cfg.CompressionLevel)
		} else {
			compressionFormat = fmt.Sprintf("gzip-%d", e.cfg.CompressionLevel)
		}
	}

	// Create backup metadata
	meta := &metadata.BackupMetadata{
		Version:         "2.0",
		Timestamp:       startTime,
		Database:        database,
		DatabaseType:    e.cfg.DatabaseType,
		DatabaseVersion: dbVersion,
		Host:            e.cfg.Host,
		Port:            e.cfg.Port,
		User:            e.cfg.User,
		BackupFile:      backupFile,
		SizeBytes:       info.Size(),
		SHA256:          sha256,
		Compression:     compressionFormat,
		BackupType:      backupType,
		Duration:        time.Since(startTime).Seconds(),
		ExtraInfo:       make(map[string]string),
	}

	// Add strategy for sample backups
	if strategy != "" {
		meta.ExtraInfo["sample_strategy"] = strategy
		meta.ExtraInfo["sample_value"] = fmt.Sprintf("%d", e.cfg.SampleValue)
	}

	// Save metadata
	if err := meta.Save(); err != nil {
		return fmt.Errorf("failed to save metadata: %w", err)
	}

	// Also save legacy .info file for backward compatibility
	legacyMetaFile := backupFile + ".info"
	legacyContent := fmt.Sprintf(`{
  "type": "%s",
  "database": "%s",
  "timestamp": "%s",
  "host": "%s",
  "port": %d,
  "user": "%s",
  "db_type": "%s",
  "compression": %d,
  "size_bytes": %d
}`, backupType, database, startTime.Format("20060102_150405"),
		e.cfg.Host, e.cfg.Port, e.cfg.User, e.cfg.DatabaseType,
		e.cfg.CompressionLevel, info.Size())

	if err := os.WriteFile(legacyMetaFile, []byte(legacyContent), 0644); err != nil {
		e.log.Warn("Failed to save legacy metadata file", "error", err)
	}

	return nil
}

// createClusterMetadata creates metadata for cluster backups
func (e *Engine) createClusterMetadata(backupFile string, databases []string, successCount, failCount int) error {
	startTime := time.Now()

	// Get backup file information
	info, err := os.Stat(backupFile)
	if err != nil {
		return fmt.Errorf("failed to stat backup file: %w", err)
	}

	// Calculate SHA-256 checksum for archive
	sha256, err := metadata.CalculateSHA256(backupFile)
	if err != nil {
		return fmt.Errorf("failed to calculate checksum: %w", err)
	}

	// Get database version
	ctx := context.Background()
	dbVersion, _ := e.db.GetVersion(ctx)
	if dbVersion == "" {
		dbVersion = "unknown"
	}

	// Create cluster metadata
	clusterMeta := &metadata.ClusterMetadata{
		Version:      "2.0",
		Timestamp:    startTime,
		ClusterName:  fmt.Sprintf("%s:%d", e.cfg.Host, e.cfg.Port),
		DatabaseType: e.cfg.DatabaseType,
		Host:         e.cfg.Host,
		Port:         e.cfg.Port,
		Databases:    make([]metadata.BackupMetadata, 0),
		TotalSize:    info.Size(),
		Duration:     time.Since(startTime).Seconds(),
		ExtraInfo: map[string]string{
			"database_count":   fmt.Sprintf("%d", len(databases)),
			"success_count":    fmt.Sprintf("%d", successCount),
			"failure_count":    fmt.Sprintf("%d", failCount),
			"archive_sha256":   sha256,
			"database_version": dbVersion,
		},
	}

	// Add database names to metadata
	for _, dbName := range databases {
		dbMeta := metadata.BackupMetadata{
			Database:        dbName,
			DatabaseType:    e.cfg.DatabaseType,
			DatabaseVersion: dbVersion,
			Timestamp:       startTime,
		}
		clusterMeta.Databases = append(clusterMeta.Databases, dbMeta)
	}

	// Save cluster metadata
	if err := clusterMeta.Save(backupFile); err != nil {
		return fmt.Errorf("failed to save cluster metadata: %w", err)
	}

	// Also save legacy .info file for backward compatibility
	legacyMetaFile := backupFile + ".info"
	legacyContent := fmt.Sprintf(`{
  "type": "cluster",
  "database": "cluster",
  "timestamp": "%s",
  "host": "%s",
  "port": %d,
  "user": "%s",
  "db_type": "%s",
  "compression": %d,
  "size_bytes": %d,
  "database_count": %d,
  "success_count": %d,
  "failure_count": %d
}`, startTime.Format("20060102_150405"),
		e.cfg.Host, e.cfg.Port, e.cfg.User, e.cfg.DatabaseType,
		e.cfg.CompressionLevel, info.Size(), len(databases), successCount, failCount)

	if err := os.WriteFile(legacyMetaFile, []byte(legacyContent), 0644); err != nil {
		e.log.Warn("Failed to save legacy cluster metadata file", "error", err)
	}

	return nil
}

// uploadToCloud uploads a backup file to cloud storage
func (e *Engine) uploadToCloud(ctx context.Context, backupFile string, tracker *progress.OperationTracker) error {
	uploadStep := tracker.AddStep("cloud_upload", "Uploading to cloud storage")

	// Create cloud backend
	cloudCfg := &cloud.Config{
		Provider:   e.cfg.CloudProvider,
		Bucket:     e.cfg.CloudBucket,
		Region:     e.cfg.CloudRegion,
		Endpoint:   e.cfg.CloudEndpoint,
		AccessKey:  e.cfg.CloudAccessKey,
		SecretKey:  e.cfg.CloudSecretKey,
		Prefix:     e.cfg.CloudPrefix,
		UseSSL:     true,
		PathStyle:  e.cfg.CloudProvider == "minio",
		Timeout:    300,
		MaxRetries: 3,
	}

	backend, err := cloud.NewBackend(cloudCfg)
	if err != nil {
		uploadStep.Fail(fmt.Errorf("failed to create cloud backend: %w", err))
		return err
	}

	// Get file info
	info, err := os.Stat(backupFile)
	if err != nil {
		uploadStep.Fail(fmt.Errorf("failed to stat backup file: %w", err))
		return err
	}

	filename := filepath.Base(backupFile)
	e.log.Info("Uploading backup to cloud", "file", filename, "size", cloud.FormatSize(info.Size()))

	// Progress callback
	var lastPercent int
	progressCallback := func(transferred, total int64) {
		percent := int(float64(transferred) / float64(total) * 100)
		if percent != lastPercent && percent%10 == 0 {
			e.log.Debug("Upload progress", "percent", percent, "transferred", cloud.FormatSize(transferred), "total", cloud.FormatSize(total))
			lastPercent = percent
		}
	}

	// Upload to cloud
	err = backend.Upload(ctx, backupFile, filename, progressCallback)
	if err != nil {
		uploadStep.Fail(fmt.Errorf("cloud upload failed: %w", err))
		return err
	}

	// Also upload metadata file
	metaFile := backupFile + ".meta.json"
	if _, err := os.Stat(metaFile); err == nil {
		metaFilename := filepath.Base(metaFile)
		if err := backend.Upload(ctx, metaFile, metaFilename, nil); err != nil {
			e.log.Warn("Failed to upload metadata file", "error", err)
			// Don't fail if metadata upload fails
		}
	}

	uploadStep.Complete(fmt.Sprintf("Uploaded to %s/%s/%s", backend.Name(), e.cfg.CloudBucket, filename))
	e.log.Info("Backup uploaded to cloud", "provider", backend.Name(), "bucket", e.cfg.CloudBucket, "file", filename)

	return nil
}

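// Note on the progress callback above: it logs only when the integer percent
// crosses a multiple of 10, so even a multi-gigabyte upload produces at most
// about ten debug lines rather than one per transferred chunk.
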
// executeCommand executes a backup command (optimized for huge databases)
func (e *Engine) executeCommand(ctx context.Context, cmdArgs []string, outputFile string) error {
	if len(cmdArgs) == 0 {
		return fmt.Errorf("empty command")
	}

	e.log.Debug("Executing backup command", "cmd", cmdArgs[0], "args", cmdArgs[1:])

	// Detect whether pg_dump will write to stdout (which means we must pipe
	// the output to a compressor). BuildBackupCommand omits --file when
	// format==plain AND compression==0, causing pg_dump to write to stdout.
	isPlainFormat := false
	hasFileFlag := false

	for _, arg := range cmdArgs {
		if strings.HasPrefix(arg, "--format=") && strings.Contains(arg, "plain") {
			isPlainFormat = true
		}
		if arg == "-Fp" {
			isPlainFormat = true
		}
		if arg == "--file" || strings.HasPrefix(arg, "--file=") {
			hasFileFlag = true
		}
	}

	// If plain format and no --file specified, pg_dump writes to stdout
	usesStdout := isPlainFormat && !hasFileFlag

	e.log.Debug("Backup command analysis",
		"plain_format", isPlainFormat,
		"has_file_flag", hasFileFlag,
		"uses_stdout", usesStdout,
		"output_file", outputFile)

	// For MySQL, handle compression differently
	if e.cfg.IsMySQL() && e.cfg.CompressionLevel > 0 {
		return e.executeMySQLWithCompression(ctx, cmdArgs, outputFile)
	}

	// For plain format writing to stdout, use streaming compression
	if usesStdout {
		e.log.Debug("Using streaming compression for large database")
		return e.executeWithStreamingCompression(ctx, cmdArgs, outputFile)
	}

	// For custom format, pg_dump handles everything and writes directly to
	// the file - no Go-side buffering involved
	cmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...)

	// Set environment variables for database tools
	cmd.Env = os.Environ()
	if e.cfg.Password != "" {
		if e.cfg.IsPostgreSQL() {
			cmd.Env = append(cmd.Env, "PGPASSWORD="+e.cfg.Password)
		} else if e.cfg.IsMySQL() {
			cmd.Env = append(cmd.Env, "MYSQL_PWD="+e.cfg.Password)
		}
	}

	// Stream stderr to avoid memory issues with large databases
	stderr, err := cmd.StderrPipe()
	if err != nil {
		return fmt.Errorf("failed to create stderr pipe: %w", err)
	}

	// Start the command
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("failed to start backup command: %w", err)
	}

	// Stream stderr output (don't buffer it all in memory)
	go func() {
		scanner := bufio.NewScanner(stderr)
		scanner.Buffer(make([]byte, 64*1024), 1024*1024) // 1MB max line size
		for scanner.Scan() {
			if line := scanner.Text(); line != "" {
				e.log.Debug("Backup output", "line", line)
			}
		}
	}()

	// Wait for command to complete
	if err := cmd.Wait(); err != nil {
		e.log.Error("Backup command failed", "error", err, "database", filepath.Base(outputFile))
		return fmt.Errorf("backup command failed: %w", err)
	}

	return nil
}

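// The two pg_dump invocation shapes distinguished above (illustrative):
//
//	pg_dump --format=custom --file=db.dump ...  → pg_dump writes the file itself
//	pg_dump --format=plain ...                  → no --file flag: SQL goes to
//	                                              stdout and must be piped to a
//	                                              compressor
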
// executeWithStreamingCompression handles plain-format dumps with external compression.
// Pipeline: pg_dump | pigz > file.sql.gz (zero-copy streaming)
func (e *Engine) executeWithStreamingCompression(ctx context.Context, cmdArgs []string, outputFile string) error {
	e.log.Debug("Using streaming compression for large database")

	// Derive the compressed output filename. If the output was named *.dump,
	// replace that suffix with *.sql.gz; otherwise append .gz so we don't
	// accidentally create unwanted double extensions.
	var compressedFile string
	if strings.HasSuffix(strings.ToLower(outputFile), ".dump") {
		compressedFile = strings.TrimSuffix(outputFile, ".dump") + ".sql.gz"
	} else {
		compressedFile = outputFile + ".gz"
	}

	// Create pg_dump command
	dumpCmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...)
	dumpCmd.Env = os.Environ()
	if e.cfg.Password != "" && e.cfg.IsPostgreSQL() {
		dumpCmd.Env = append(dumpCmd.Env, "PGPASSWORD="+e.cfg.Password)
	}

	// Check for pigz (parallel gzip)
	compressor := "gzip"
	compressorArgs := []string{"-c"}

	if _, err := exec.LookPath("pigz"); err == nil {
		compressor = "pigz"
		compressorArgs = []string{"-p", strconv.Itoa(e.cfg.Jobs), "-c"}
		e.log.Debug("Using pigz for parallel compression", "threads", e.cfg.Jobs)
	}

	// Create compression command
	compressCmd := exec.CommandContext(ctx, compressor, compressorArgs...)

	// Create output file
	outFile, err := os.Create(compressedFile)
	if err != nil {
		return fmt.Errorf("failed to create output file: %w", err)
	}
	defer outFile.Close()

	// Set up pipeline: pg_dump | pigz > file.sql.gz
	dumpStdout, err := dumpCmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("failed to create dump stdout pipe: %w", err)
	}

	compressCmd.Stdin = dumpStdout
	compressCmd.Stdout = outFile

	// Capture stderr from both commands
	dumpStderr, err := dumpCmd.StderrPipe()
	if err != nil {
		e.log.Warn("Failed to capture dump stderr", "error", err)
	}
	compressStderr, err := compressCmd.StderrPipe()
	if err != nil {
		e.log.Warn("Failed to capture compress stderr", "error", err)
	}

	// Stream stderr output
	if dumpStderr != nil {
		go func() {
			scanner := bufio.NewScanner(dumpStderr)
			for scanner.Scan() {
				if line := scanner.Text(); line != "" {
					e.log.Debug("pg_dump", "output", line)
				}
			}
		}()
	}

	if compressStderr != nil {
		go func() {
			scanner := bufio.NewScanner(compressStderr)
			for scanner.Scan() {
				if line := scanner.Text(); line != "" {
					e.log.Debug("compression", "output", line)
				}
			}
		}()
	}

	// Start the consumer (compressor) first so the pipe is drained
	if err := compressCmd.Start(); err != nil {
		return fmt.Errorf("failed to start compressor: %w", err)
	}

	// Then start pg_dump
	if err := dumpCmd.Start(); err != nil {
		compressCmd.Process.Kill()
		return fmt.Errorf("failed to start pg_dump: %w", err)
	}

	// Wait for pg_dump in a goroutine so a context timeout can still be
	// handled. This prevents a deadlock when the pipe buffer fills and
	// pg_dump blocks on a stalled compressor.
	dumpDone := make(chan error, 1)
	go func() {
		dumpDone <- dumpCmd.Wait()
	}()

	var dumpErr error
	select {
	case dumpErr = <-dumpDone:
		// pg_dump completed (success or failure)
	case <-ctx.Done():
		// Context cancelled/timed out - kill pg_dump to unblock
		e.log.Warn("Backup timeout - killing pg_dump process")
		dumpCmd.Process.Kill()
		<-dumpDone // Wait for the goroutine to finish
		dumpErr = ctx.Err()
	}

	// Close the stdout pipe to signal the compressor that we're done.
	// This MUST happen after pg_dump exits to avoid a broken pipe.
	dumpStdout.Close()

	// Wait for compression to complete
	compressErr := compressCmd.Wait()

	// Check errors - compressor failure first (it's usually the root cause)
	if compressErr != nil {
		e.log.Error("Compressor failed", "error", compressErr)
		return fmt.Errorf("compression failed (check disk space): %w", compressErr)
	}
	if dumpErr != nil {
		// Exit code 141 = 128 + SIGPIPE(13): pg_dump was killed writing to a
		// closed pipe, which means the compressor died first
		if exitErr, ok := dumpErr.(*exec.ExitError); ok && exitErr.ExitCode() == 141 {
			e.log.Error("pg_dump received SIGPIPE - compressor may have failed")
			return fmt.Errorf("pg_dump broken pipe - check disk space and compressor")
		}
		return fmt.Errorf("pg_dump failed: %w", dumpErr)
	}

	// Sync the file to disk for durability (prevents truncation on power loss)
	if err := outFile.Sync(); err != nil {
		e.log.Warn("Failed to sync output file", "error", err)
	}

	e.log.Debug("Streaming compression completed", "output", compressedFile)
	return nil
}

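// The commit message's P2 item calls for validating .dump files with
// `pg_restore --list` before restore. A minimal sketch of such a check - this
// helper is an illustration and is not wired into any caller in this file:
func (e *Engine) validateDumpFile(ctx context.Context, dumpFile string) error {
	// pg_restore --list prints the archive's table of contents without
	// restoring anything; a non-zero exit indicates a corrupt or truncated dump
	cmd := exec.CommandContext(ctx, "pg_restore", "--list", dumpFile)
	cmd.Stdout = io.Discard // only the exit status matters here
	var stderrBuf strings.Builder
	cmd.Stderr = &stderrBuf
	if err := cmd.Run(); err != nil {
		return fmt.Errorf("pg_restore --list failed for %s: %s: %w",
			dumpFile, strings.TrimSpace(stderrBuf.String()), err)
	}
	return nil
}
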
// formatBytes formats a byte count in human-readable form
func formatBytes(bytes int64) string {
	const unit = 1024
	if bytes < unit {
		return fmt.Sprintf("%d B", bytes)
	}
	div, exp := int64(unit), 0
	for n := bytes / unit; n >= unit; n /= unit {
		div *= unit
		exp++
	}
	return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
}

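// Worked examples: formatBytes(512) == "512 B", formatBytes(1536) == "1.5 KB",
// formatBytes(10*1024*1024*1024) == "10.0 GB".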