diff --git a/cmd/restore.go b/cmd/restore.go index af1a8e2..df461b5 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -262,6 +262,8 @@ func runRestoreSingle(cmd *cobra.Command, args []string) error { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + defer signal.Stop(sigChan) // Ensure signal cleanup on exit + go func() { <-sigChan log.Warn("Restore interrupted by user") @@ -356,6 +358,8 @@ func runRestoreCluster(cmd *cobra.Command, args []string) error { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + defer signal.Stop(sigChan) // Ensure signal cleanup on exit + go func() { <-sigChan log.Warn("Restore interrupted by user") diff --git a/internal/backup/engine.go b/internal/backup/engine.go index 3128a03..8e14697 100644 --- a/internal/backup/engine.go +++ b/internal/backup/engine.go @@ -12,6 +12,8 @@ import ( "path/filepath" "strconv" "strings" + "sync" + "sync/atomic" "time" "dbbackup/internal/config" @@ -338,90 +340,115 @@ func (e *Engine) BackupCluster(ctx context.Context) error { quietProgress.SetEstimator(estimator) // Backup each database - e.printf(" Backing up %d databases...\n", len(databases)) - successCount := 0 - failCount := 0 - - for i, dbName := range databases { - // Update estimator progress - estimator.UpdateProgress(i) - - e.printf(" [%d/%d] Backing up database: %s\n", i+1, len(databases), dbName) - quietProgress.Update(fmt.Sprintf("Backing up database %d/%d: %s", i+1, len(databases), dbName)) - - // Check database size and warn if very large - if size, err := e.db.GetDatabaseSize(ctx, dbName); err == nil { - sizeStr := formatBytes(size) - e.printf(" Database size: %s\n", sizeStr) - if size > 10*1024*1024*1024 { // > 10GB - e.printf(" ⚠️ Large database detected - this may take a while\n") - } - } - - dumpFile := filepath.Join(tempDir, "dumps", dbName+".dump") - - // For cluster backups, use settings optimized for large databases: - // - Lower compression (faster, less memory) - // - Use parallel dumps if configured - // - Smart format selection based on size - - compressionLevel := e.cfg.CompressionLevel - if compressionLevel > 6 { - compressionLevel = 6 // Cap at 6 for cluster backups to reduce memory - } - - // Determine optimal format based on database size - format := "custom" - parallel := e.cfg.DumpJobs - - // For large databases (>5GB), use plain format with external compression - // This avoids pg_dump's custom format memory overhead - if size, err := e.db.GetDatabaseSize(ctx, dbName); err == nil { - if size > 5*1024*1024*1024 { // > 5GB - format = "plain" // Plain SQL format - compressionLevel = 0 // Disable pg_dump compression - parallel = 0 // Plain format doesn't support parallel - e.printf(" Using plain format + external compression (optimal for large DBs)\n") - } - } - - options := database.BackupOptions{ - Compression: compressionLevel, - Parallel: parallel, - Format: format, - Blobs: true, - NoOwner: false, - NoPrivileges: false, - } - - cmd := e.db.BuildBackupCommand(dbName, dumpFile, options) - - // Use a context with timeout for each database to prevent hangs - // Use longer timeout for huge databases (2 hours per database) - dbCtx, cancel := context.WithTimeout(ctx, 2*time.Hour) - defer cancel() // Ensure cancel is called even if executeCommand panics - err := e.executeCommand(dbCtx, cmd, dumpFile) - cancel() // Also call immediately for early cleanup - - if err != nil { - e.log.Warn("Failed to backup database", "database", dbName, "error", err) - e.printf(" ⚠️ 
WARNING: Failed to backup %s: %v\n", dbName, err) - failCount++ - // Continue with other databases - } else { - // If streaming compression was used the compressed file may have a different name - // (e.g. .sql.gz). Prefer compressed file size when present, fall back to dumpFile. - compressedCandidate := strings.TrimSuffix(dumpFile, ".dump") + ".sql.gz" - if info, err := os.Stat(compressedCandidate); err == nil { - e.printf(" ✅ Completed %s (%s)\n", dbName, formatBytes(info.Size())) - } else if info, err := os.Stat(dumpFile); err == nil { - e.printf(" ✅ Completed %s (%s)\n", dbName, formatBytes(info.Size())) - } - successCount++ - } + parallelism := e.cfg.ClusterParallelism + if parallelism < 1 { + parallelism = 1 // Ensure at least sequential } - e.printf(" Backup summary: %d succeeded, %d failed\n", successCount, failCount) + if parallelism == 1 { + e.printf(" Backing up %d databases sequentially...\n", len(databases)) + } else { + e.printf(" Backing up %d databases with %d parallel workers...\n", len(databases), parallelism) + } + + // Use worker pool for parallel backup + var successCount, failCount int32 + var mu sync.Mutex // Protect shared resources (printf, estimator) + + // Create semaphore to limit concurrency + semaphore := make(chan struct{}, parallelism) + var wg sync.WaitGroup + + for i, dbName := range databases { + wg.Add(1) + semaphore <- struct{}{} // Acquire + + go func(idx int, name string) { + defer wg.Done() + defer func() { <-semaphore }() // Release + + // Update estimator progress (thread-safe) + mu.Lock() + estimator.UpdateProgress(idx) + e.printf(" [%d/%d] Backing up database: %s\n", idx+1, len(databases), name) + quietProgress.Update(fmt.Sprintf("Backing up database %d/%d: %s", idx+1, len(databases), name)) + mu.Unlock() + + // Check database size and warn if very large + if size, err := e.db.GetDatabaseSize(ctx, name); err == nil { + sizeStr := formatBytes(size) + mu.Lock() + e.printf(" Database size: %s\n", sizeStr) + if size > 10*1024*1024*1024 { // > 10GB + e.printf(" ⚠️ Large database detected - this may take a while\n") + } + mu.Unlock() + } + + dumpFile := filepath.Join(tempDir, "dumps", name+".dump") + + compressionLevel := e.cfg.CompressionLevel + if compressionLevel > 6 { + compressionLevel = 6 + } + + format := "custom" + parallel := e.cfg.DumpJobs + + if size, err := e.db.GetDatabaseSize(ctx, name); err == nil { + if size > 5*1024*1024*1024 { + format = "plain" + compressionLevel = 0 + parallel = 0 + mu.Lock() + e.printf(" Using plain format + external compression (optimal for large DBs)\n") + mu.Unlock() + } + } + + options := database.BackupOptions{ + Compression: compressionLevel, + Parallel: parallel, + Format: format, + Blobs: true, + NoOwner: false, + NoPrivileges: false, + } + + cmd := e.db.BuildBackupCommand(name, dumpFile, options) + + dbCtx, cancel := context.WithTimeout(ctx, 2*time.Hour) + defer cancel() + err := e.executeCommand(dbCtx, cmd, dumpFile) + cancel() + + if err != nil { + e.log.Warn("Failed to backup database", "database", name, "error", err) + mu.Lock() + e.printf(" ⚠️ WARNING: Failed to backup %s: %v\n", name, err) + mu.Unlock() + atomic.AddInt32(&failCount, 1) + } else { + compressedCandidate := strings.TrimSuffix(dumpFile, ".dump") + ".sql.gz" + mu.Lock() + if info, err := os.Stat(compressedCandidate); err == nil { + e.printf(" ✅ Completed %s (%s)\n", name, formatBytes(info.Size())) + } else if info, err := os.Stat(dumpFile); err == nil { + e.printf(" ✅ Completed %s (%s)\n", name, formatBytes(info.Size())) + } + mu.Unlock() 
+ atomic.AddInt32(&successCount, 1) + } + }(i, dbName) + } + + // Wait for all backups to complete + wg.Wait() + + successCountFinal := int(atomic.LoadInt32(&successCount)) + failCountFinal := int(atomic.LoadInt32(&failCount)) + + e.printf(" Backup summary: %d succeeded, %d failed\n", successCountFinal, failCountFinal) // Create archive e.printf(" Creating compressed archive...\n") diff --git a/internal/config/config.go b/internal/config/config.go index 82a7bfe..fdae46c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -57,6 +57,9 @@ type Config struct { // Timeouts (in minutes) ClusterTimeoutMinutes int + // Cluster parallelism + ClusterParallelism int // Number of concurrent databases during cluster operations (0 = sequential) + // Swap file management (for large backups) SwapFilePath string // Path to temporary swap file SwapFileSizeGB int // Size in GB (0 = disabled) @@ -144,6 +147,9 @@ func New() *Config { // Timeouts ClusterTimeoutMinutes: getEnvInt("CLUSTER_TIMEOUT_MIN", 240), + // Cluster parallelism (default: 2 concurrent operations for faster cluster backup/restore) + ClusterParallelism: getEnvInt("CLUSTER_PARALLELISM", 2), + // Swap file management SwapFilePath: getEnvString("SWAP_FILE_PATH", "/tmp/dbbackup_swap"), SwapFileSizeGB: getEnvInt("SWAP_FILE_SIZE_GB", 0), // 0 = disabled by default diff --git a/internal/progress/progress.go b/internal/progress/progress.go index 75d2e97..51d0580 100644 --- a/internal/progress/progress.go +++ b/internal/progress/progress.go @@ -45,13 +45,16 @@ func (s *Spinner) Start(message string) { s.active = true go func() { + ticker := time.NewTicker(s.interval) + defer ticker.Stop() + i := 0 lastMessage := "" for { select { case <-s.stopCh: return - default: + case <-ticker.C: if s.active { displayMsg := s.message @@ -70,7 +73,6 @@ func (s *Spinner) Start(message string) { fmt.Fprintf(s.writer, "\r%s", currentFrame) } i++ - time.Sleep(s.interval) } } } @@ -132,12 +134,15 @@ func (d *Dots) Start(message string) { fmt.Fprint(d.writer, message) go func() { + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + count := 0 for { select { case <-d.stopCh: return - default: + case <-ticker.C: if d.active { fmt.Fprint(d.writer, ".") count++ @@ -145,7 +150,6 @@ func (d *Dots) Start(message string) { // Reset dots fmt.Fprint(d.writer, "\r"+d.message) } - time.Sleep(500 * time.Millisecond) } } } diff --git a/internal/restore/engine.go b/internal/restore/engine.go index 0472e61..0074272 100644 --- a/internal/restore/engine.go +++ b/internal/restore/engine.go @@ -7,6 +7,8 @@ import ( "os/exec" "path/filepath" "strings" + "sync" + "sync/atomic" "time" "dbbackup/internal/config" @@ -489,8 +491,6 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error { return fmt.Errorf("failed to read dumps directory: %w", err) } - successCount := 0 - failCount := 0 var failedDBs []string totalDBs := 0 @@ -505,77 +505,110 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error { estimator := progress.NewETAEstimator("Restoring cluster", totalDBs) e.progress.SetEstimator(estimator) - for i, entry := range entries { + // Use worker pool for parallel restore + parallelism := e.cfg.ClusterParallelism + if parallelism < 1 { + parallelism = 1 // Ensure at least sequential + } + + var successCount, failCount int32 + var failedDBsMu sync.Mutex + var mu sync.Mutex // Protect shared resources (progress, logger) + + // Create semaphore to limit concurrency + semaphore := make(chan struct{}, 
parallelism) + var wg sync.WaitGroup + + dbIndex := 0 + for _, entry := range entries { if entry.IsDir() { continue } - // Update estimator progress - estimator.UpdateProgress(i) - - dumpFile := filepath.Join(dumpsDir, entry.Name()) - // Strip file extensions to get database name (.dump or .sql.gz) - dbName := entry.Name() - dbName = strings.TrimSuffix(dbName, ".dump") - dbName = strings.TrimSuffix(dbName, ".sql.gz") - - // Calculate progress percentage for logging - dbProgress := 15 + int(float64(i)/float64(totalDBs)*85.0) + wg.Add(1) + semaphore <- struct{}{} // Acquire - statusMsg := fmt.Sprintf("Restoring database %s (%d/%d)", dbName, i+1, totalDBs) - e.progress.Update(statusMsg) - e.log.Info("Restoring database", "name", dbName, "file", dumpFile, "progress", dbProgress) + go func(idx int, filename string) { + defer wg.Done() + defer func() { <-semaphore }() // Release + + // Update estimator progress (thread-safe) + mu.Lock() + estimator.UpdateProgress(idx) + mu.Unlock() - // STEP 1: Drop existing database completely (clean slate) - e.log.Info("Dropping existing database for clean restore", "name", dbName) - if err := e.dropDatabaseIfExists(ctx, dbName); err != nil { - e.log.Warn("Could not drop existing database", "name", dbName, "error", err) - // Continue anyway - database might not exist - } + dumpFile := filepath.Join(dumpsDir, filename) + dbName := filename + dbName = strings.TrimSuffix(dbName, ".dump") + dbName = strings.TrimSuffix(dbName, ".sql.gz") - // STEP 2: Create fresh database (pg_restore will handle ownership if we have privileges) - if err := e.ensureDatabaseExists(ctx, dbName); err != nil { - e.log.Error("Failed to create database", "name", dbName, "error", err) - failedDBs = append(failedDBs, fmt.Sprintf("%s: failed to create database: %v", dbName, err)) - failCount++ - continue - } + dbProgress := 15 + int(float64(idx)/float64(totalDBs)*85.0) + + mu.Lock() + statusMsg := fmt.Sprintf("Restoring database %s (%d/%d)", dbName, idx+1, totalDBs) + e.progress.Update(statusMsg) + e.log.Info("Restoring database", "name", dbName, "file", dumpFile, "progress", dbProgress) + mu.Unlock() - // STEP 3: Restore with ownership preservation if superuser - // Detect if this is a .sql.gz file (plain SQL) or .dump file (custom format) - preserveOwnership := isSuperuser - isCompressedSQL := strings.HasSuffix(dumpFile, ".sql.gz") + // STEP 1: Drop existing database completely (clean slate) + e.log.Info("Dropping existing database for clean restore", "name", dbName) + if err := e.dropDatabaseIfExists(ctx, dbName); err != nil { + e.log.Warn("Could not drop existing database", "name", dbName, "error", err) + } + + // STEP 2: Create fresh database + if err := e.ensureDatabaseExists(ctx, dbName); err != nil { + e.log.Error("Failed to create database", "name", dbName, "error", err) + failedDBsMu.Lock() + failedDBs = append(failedDBs, fmt.Sprintf("%s: failed to create database: %v", dbName, err)) + failedDBsMu.Unlock() + atomic.AddInt32(&failCount, 1) + return + } + + // STEP 3: Restore with ownership preservation if superuser + preserveOwnership := isSuperuser + isCompressedSQL := strings.HasSuffix(dumpFile, ".sql.gz") + + var restoreErr error + if isCompressedSQL { + e.log.Info("Detected compressed SQL format, using psql + gunzip", "file", dumpFile) + restoreErr = e.restorePostgreSQLSQL(ctx, dumpFile, dbName, true) + } else { + e.log.Info("Detected custom dump format, using pg_restore", "file", dumpFile) + restoreErr = e.restorePostgreSQLDumpWithOwnership(ctx, dumpFile, dbName, false, 
preserveOwnership) + } + + if restoreErr != nil { + e.log.Error("Failed to restore database", "name", dbName, "error", restoreErr) + failedDBsMu.Lock() + failedDBs = append(failedDBs, fmt.Sprintf("%s: %v", dbName, restoreErr)) + failedDBsMu.Unlock() + atomic.AddInt32(&failCount, 1) + return + } + + atomic.AddInt32(&successCount, 1) + }(dbIndex, entry.Name()) - var restoreErr error - if isCompressedSQL { - // Plain SQL compressed - use psql with gunzip - e.log.Info("Detected compressed SQL format, using psql + gunzip", "file", dumpFile) - restoreErr = e.restorePostgreSQLSQL(ctx, dumpFile, dbName, true) - } else { - // Custom format - use pg_restore - e.log.Info("Detected custom dump format, using pg_restore", "file", dumpFile) - restoreErr = e.restorePostgreSQLDumpWithOwnership(ctx, dumpFile, dbName, false, preserveOwnership) - } - - if restoreErr != nil { - e.log.Error("Failed to restore database", "name", dbName, "error", restoreErr) - failedDBs = append(failedDBs, fmt.Sprintf("%s: %v", dbName, restoreErr)) - failCount++ - continue - } - - successCount++ + dbIndex++ } + + // Wait for all restores to complete + wg.Wait() + + successCountFinal := int(atomic.LoadInt32(&successCount)) + failCountFinal := int(atomic.LoadInt32(&failCount)) - if failCount > 0 { + if failCountFinal > 0 { failedList := strings.Join(failedDBs, "; ") - e.progress.Fail(fmt.Sprintf("Cluster restore completed with errors: %d succeeded, %d failed", successCount, failCount)) - operation.Complete(fmt.Sprintf("Partial restore: %d succeeded, %d failed", successCount, failCount)) - return fmt.Errorf("cluster restore completed with %d failures: %s", failCount, failedList) + e.progress.Fail(fmt.Sprintf("Cluster restore completed with errors: %d succeeded, %d failed", successCountFinal, failCountFinal)) + operation.Complete(fmt.Sprintf("Partial restore: %d succeeded, %d failed", successCountFinal, failCountFinal)) + return fmt.Errorf("cluster restore completed with %d failures: %s", failCountFinal, failedList) } - e.progress.Complete(fmt.Sprintf("Cluster restored successfully: %d databases", successCount)) - operation.Complete(fmt.Sprintf("Restored %d databases from cluster archive", successCount)) + e.progress.Complete(fmt.Sprintf("Cluster restored successfully: %d databases", successCountFinal)) + operation.Complete(fmt.Sprintf("Restored %d databases from cluster archive", successCountFinal)) return nil }
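
Reviewer note: the same bounded-concurrency shape appears in both BackupCluster and RestoreCluster above — a buffered channel used as a semaphore capped at ClusterParallelism, a sync.WaitGroup to join the workers, atomic counters for the success/fail tallies, and a mutex serializing console output. The following is a minimal, self-contained sketch of that pattern only; backupOne and the database list are hypothetical stand-ins, not code from this repository.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// backupOne is a hypothetical stand-in for the per-database work
// (building and executing a dump command in the real engine).
func backupOne(name string) error {
	if name == "bad" {
		return fmt.Errorf("simulated failure")
	}
	return nil
}

func main() {
	databases := []string{"app", "analytics", "bad", "audit"}

	parallelism := 2 // would come from cfg.ClusterParallelism
	if parallelism < 1 {
		parallelism = 1 // fall back to sequential
	}

	var successCount, failCount int32
	var mu sync.Mutex                             // serializes console output
	semaphore := make(chan struct{}, parallelism) // caps in-flight workers
	var wg sync.WaitGroup

	for i, dbName := range databases {
		wg.Add(1)
		semaphore <- struct{}{} // acquire a slot before launching the worker

		go func(idx int, name string) {
			defer wg.Done()
			defer func() { <-semaphore }() // release the slot

			if err := backupOne(name); err != nil {
				atomic.AddInt32(&failCount, 1)
				mu.Lock()
				fmt.Printf("  [%d/%d] %s failed: %v\n", idx+1, len(databases), name, err)
				mu.Unlock()
				return
			}
			atomic.AddInt32(&successCount, 1)
			mu.Lock()
			fmt.Printf("  [%d/%d] %s done\n", idx+1, len(databases), name)
			mu.Unlock()
		}(i, dbName)
	}

	wg.Wait()
	fmt.Printf("  summary: %d succeeded, %d failed\n",
		atomic.LoadInt32(&successCount), atomic.LoadInt32(&failCount))
}

Acquiring the semaphore slot before spawning the goroutine (as the patch does) also throttles goroutine creation itself, so at most parallelism workers ever exist, which matters when the per-database work holds file handles and database connections.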
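
The progress.go hunks replace a time.Sleep inside a select's default branch with a time.Ticker, so the spinner goroutine blocks between frames instead of busy-polling the stop channel. A minimal sketch of that select shape, assuming a hypothetical render callback in place of the spinner's frame-drawing code:

package main

import (
	"fmt"
	"time"
)

// runTicker calls render once per tick until stopCh is closed.
// render is a hypothetical stand-in for the spinner's drawing logic.
func runTicker(stopCh <-chan struct{}, interval time.Duration, render func(frame int)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release ticker resources when the goroutine exits

	frame := 0
	for {
		select {
		case <-stopCh:
			return // stop requested; exit without drawing another frame
		case <-ticker.C:
			render(frame) // one frame per tick, no default branch spinning
			frame++
		}
	}
}

func main() {
	stop := make(chan struct{})
	go runTicker(stop, 100*time.Millisecond, func(frame int) {
		fmt.Printf("\rworking %d", frame)
	})
	time.Sleep(550 * time.Millisecond)
	close(stop)
	fmt.Println()
}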