From 698b8a761c2dc6b3368cc389bc5c7a8b1f3f0e59 Mon Sep 17 00:00:00 2001
From: Alexander Renz
Date: Fri, 16 Jan 2026 18:31:12 +0100
Subject: [PATCH] feat(restore): add weighted progress, pre-extraction disk
 check, parallel-dbs flag

Three high-value improvements for cluster restore:

1. Weighted progress by database size
   - Progress now shows percentage by data volume, not just count
   - Example: Phase 3/3: Databases (2/7) - 45.2% by size
   - Gives a more accurate ETA for clusters with varied DB sizes

2. Pre-extraction disk space check
   - Checks that the workdir has 3x the archive size free before extraction
   - Prevents partial extraction failures when the disk fills midway
   - Clear error message with required vs. available GB

3. --parallel-dbs flag for concurrent restores
   - dbbackup restore cluster archive.tar.gz --parallel-dbs=4
   - Overrides the CLUSTER_PARALLELISM config setting
   - Set to 1 for sequential restore (safest for large objects)
---
 bin/README.md                  |  4 +-
 cmd/restore.go                 |  8 ++++
 internal/dedup/chunker_test.go |  2 +-
 internal/restore/engine.go     | 72 +++++++++++++++++++++++++++++++---
 internal/tui/restore_exec.go   | 39 +++++++++++++++---
 5 files changed, 111 insertions(+), 14 deletions(-)

diff --git a/bin/README.md b/bin/README.md
index 8aa450c..1869d8d 100644
--- a/bin/README.md
+++ b/bin/README.md
@@ -4,8 +4,8 @@ This directory contains pre-compiled binaries for the DB Backup Tool across mult
 ## Build Information
 
 - **Version**: 3.42.50
-- **Build Time**: 2026-01-16_14:53:54_UTC
-- **Git Commit**: 5728b46
+- **Build Time**: 2026-01-16_15:09:21_UTC
+- **Git Commit**: dd7c4da
 
 ## Recent Updates (v1.1.0)
 - ✅ Fixed TUI progress display with line-by-line output
diff --git a/cmd/restore.go b/cmd/restore.go
index 4eabb6e..bee259e 100755
--- a/cmd/restore.go
+++ b/cmd/restore.go
@@ -28,6 +28,7 @@ var (
 	restoreClean       bool
 	restoreCreate      bool
 	restoreJobs        int
+	restoreParallelDBs int // Number of parallel database restores
 	restoreTarget      string
 	restoreVerbose     bool
 	restoreNoProgress  bool
@@ -289,6 +290,7 @@ func init() {
 	restoreClusterCmd.Flags().BoolVar(&restoreForce, "force", false, "Skip safety checks and confirmations")
 	restoreClusterCmd.Flags().BoolVar(&restoreCleanCluster, "clean-cluster", false, "Drop all existing user databases before restore (disaster recovery)")
 	restoreClusterCmd.Flags().IntVar(&restoreJobs, "jobs", 0, "Number of parallel decompression jobs (0 = auto)")
+	restoreClusterCmd.Flags().IntVar(&restoreParallelDBs, "parallel-dbs", 0, "Number of databases to restore in parallel (0 = use config default, 1 = sequential)")
 	restoreClusterCmd.Flags().StringVar(&restoreWorkdir, "workdir", "", "Working directory for extraction (use when system disk is small, e.g. /mnt/storage/restore_tmp)")
/mnt/storage/restore_tmp)") restoreClusterCmd.Flags().BoolVar(&restoreVerbose, "verbose", false, "Show detailed restore progress") restoreClusterCmd.Flags().BoolVar(&restoreNoProgress, "no-progress", false, "Disable progress indicators") @@ -783,6 +785,12 @@ func runRestoreCluster(cmd *cobra.Command, args []string) error { } } + // Override cluster parallelism if --parallel-dbs is specified + if restoreParallelDBs > 0 { + cfg.ClusterParallelism = restoreParallelDBs + log.Info("Using custom parallelism for database restores", "parallel_dbs", restoreParallelDBs) + } + // Create restore engine engine := restore.New(cfg, log, db) diff --git a/internal/dedup/chunker_test.go b/internal/dedup/chunker_test.go index dee2f36..16b3854 100644 --- a/internal/dedup/chunker_test.go +++ b/internal/dedup/chunker_test.go @@ -103,7 +103,7 @@ func TestChunker_ShiftedData(t *testing.T) { // Test that shifted data still shares chunks (the key CDC benefit) // Use deterministic random data for reproducible test results rng := mathrand.New(mathrand.NewSource(42)) - + original := make([]byte, 100*1024) rng.Read(original) diff --git a/internal/restore/engine.go b/internal/restore/engine.go index daa077a..e956010 100755 --- a/internal/restore/engine.go +++ b/internal/restore/engine.go @@ -38,6 +38,10 @@ type DatabaseProgressCallback func(done, total int, dbName string) // Parameters: done count, total count, database name, elapsed time for current restore phase, avg duration per DB type DatabaseProgressWithTimingCallback func(done, total int, dbName string, phaseElapsed, avgPerDB time.Duration) +// DatabaseProgressByBytesCallback is called with progress weighted by database sizes (bytes) +// Parameters: bytes completed, total bytes, current database name, databases done count, total database count +type DatabaseProgressByBytesCallback func(bytesDone, bytesTotal int64, dbName string, dbDone, dbTotal int) + // Engine handles database restore operations type Engine struct { cfg *config.Config @@ -49,9 +53,10 @@ type Engine struct { debugLogPath string // Path to save debug log on error // TUI progress callback for detailed progress reporting - progressCallback ProgressCallback - dbProgressCallback DatabaseProgressCallback - dbProgressTimingCallback DatabaseProgressWithTimingCallback + progressCallback ProgressCallback + dbProgressCallback DatabaseProgressCallback + dbProgressTimingCallback DatabaseProgressWithTimingCallback + dbProgressByBytesCallback DatabaseProgressByBytesCallback } // New creates a new restore engine @@ -122,6 +127,11 @@ func (e *Engine) SetDatabaseProgressWithTimingCallback(cb DatabaseProgressWithTi e.dbProgressTimingCallback = cb } +// SetDatabaseProgressByBytesCallback sets a callback for progress weighted by database sizes +func (e *Engine) SetDatabaseProgressByBytesCallback(cb DatabaseProgressByBytesCallback) { + e.dbProgressByBytesCallback = cb +} + // reportProgress safely calls the progress callback if set func (e *Engine) reportProgress(current, total int64, description string) { if e.progressCallback != nil { @@ -143,6 +153,13 @@ func (e *Engine) reportDatabaseProgressWithTiming(done, total int, dbName string } } +// reportDatabaseProgressByBytes safely calls the bytes-weighted callback if set +func (e *Engine) reportDatabaseProgressByBytes(bytesDone, bytesTotal int64, dbName string, dbDone, dbTotal int) { + if e.dbProgressByBytesCallback != nil { + e.dbProgressByBytesCallback(bytesDone, bytesTotal, dbName, dbDone, dbTotal) + } +} + // loggerAdapter adapts our logger to the progress.Logger 
 type loggerAdapter struct {
 	logger logger.Logger
 }
@@ -861,6 +878,25 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
 	// Create temporary extraction directory in configured WorkDir
 	workDir := e.cfg.GetEffectiveWorkDir()
 	tempDir := filepath.Join(workDir, fmt.Sprintf(".restore_%d", time.Now().Unix()))
+
+	// Check disk space for extraction (need ~3x archive size: compressed + extracted + working space)
+	if archiveInfo != nil {
+		requiredBytes := uint64(archiveInfo.Size()) * 3
+		extractionCheck := checks.CheckDiskSpace(workDir)
+		if extractionCheck.AvailableBytes < requiredBytes {
+			operation.Fail("Insufficient disk space for extraction")
+			return fmt.Errorf("insufficient disk space for extraction in %s: need %.1f GB, have %.1f GB (archive size: %.1f GB × 3)",
+				workDir,
+				float64(requiredBytes)/(1024*1024*1024),
+				float64(extractionCheck.AvailableBytes)/(1024*1024*1024),
+				float64(archiveInfo.Size())/(1024*1024*1024))
+		}
+		e.log.Info("Disk space check for extraction passed",
+			"workdir", workDir,
+			"required_gb", float64(requiredBytes)/(1024*1024*1024),
+			"available_gb", float64(extractionCheck.AvailableBytes)/(1024*1024*1024))
+	}
+
 	if err := os.MkdirAll(tempDir, 0755); err != nil {
 		operation.Fail("Failed to create temporary directory")
 		return fmt.Errorf("failed to create temp directory in %s: %w", workDir, err)
@@ -1024,12 +1060,27 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
 	var restoreErrorsMu sync.Mutex
 	totalDBs := 0
 
-	// Count total databases
+	// Count total databases and calculate total bytes for weighted progress
+	var totalBytes int64
+	dbSizes := make(map[string]int64) // Map database name to dump file size
 	for _, entry := range entries {
 		if !entry.IsDir() {
 			totalDBs++
+			dumpFile := filepath.Join(dumpsDir, entry.Name())
+			if info, err := os.Stat(dumpFile); err == nil {
+				dbName := entry.Name()
+				dbName = strings.TrimSuffix(dbName, ".dump")
+				dbName = strings.TrimSuffix(dbName, ".sql.gz")
+				dbSizes[dbName] = info.Size()
+				totalBytes += info.Size()
+			}
 		}
 	}
+	e.log.Info("Calculated total restore size", "databases", totalDBs, "total_bytes", totalBytes)
+
+	// Track bytes completed for weighted progress
+	var bytesCompleted int64
+	var bytesCompletedMu sync.Mutex
 
 	// Create ETA estimator for database restores
 	estimator := progress.NewETAEstimator("Restoring cluster", totalDBs)
@@ -1202,8 +1253,19 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
 			completedDBTimes = append(completedDBTimes, dbRestoreDuration)
 			completedDBTimesMu.Unlock()
 
+			// Update bytes completed for weighted progress
+			dbSize := dbSizes[dbName]
+			bytesCompletedMu.Lock()
+			bytesCompleted += dbSize
+			currentBytesCompleted := bytesCompleted
+			currentSuccessCount := int(atomic.LoadInt32(&successCount)) + 1 // +1 because we're about to increment
+			bytesCompletedMu.Unlock()
+
+			// Report weighted progress (bytes-based)
+			e.reportDatabaseProgressByBytes(currentBytesCompleted, totalBytes, dbName, currentSuccessCount, totalDBs)
+
 			atomic.AddInt32(&successCount, 1)
-
+
 			// Small delay to ensure PostgreSQL fully closes connections before next restore
 			time.Sleep(100 * time.Millisecond)
 		}(dbIndex, entry.Name())
diff --git a/internal/tui/restore_exec.go b/internal/tui/restore_exec.go
index e120151..2206598 100755
--- a/internal/tui/restore_exec.go
+++ b/internal/tui/restore_exec.go
@@ -159,6 +159,10 @@ type sharedProgressState struct {
 	overallPhase   int
 	extractionDone bool
 
+	// Weighted progress by database sizes (bytes)
+	dbBytesTotal int64 // Total bytes across all databases
+	dbBytesDone  int64 // Bytes completed (sum of finished DB sizes)
+
 	// Rolling window for speed calculation
 	speedSamples []restoreSpeedSample
 }
@@ -186,12 +190,12 @@ func clearCurrentRestoreProgress() {
 	currentRestoreProgressState = nil
 }
 
-func getCurrentRestoreProgress() (bytesTotal, bytesDone int64, description string, hasUpdate bool, dbTotal, dbDone int, speed float64, dbPhaseElapsed, dbAvgPerDB time.Duration, currentDB string, overallPhase int, extractionDone bool) {
+func getCurrentRestoreProgress() (bytesTotal, bytesDone int64, description string, hasUpdate bool, dbTotal, dbDone int, speed float64, dbPhaseElapsed, dbAvgPerDB time.Duration, currentDB string, overallPhase int, extractionDone bool, dbBytesTotal, dbBytesDone int64) {
 	currentRestoreProgressMu.Lock()
 	defer currentRestoreProgressMu.Unlock()
 
 	if currentRestoreProgressState == nil {
-		return 0, 0, "", false, 0, 0, 0, 0, 0, "", 0, false
+		return 0, 0, "", false, 0, 0, 0, 0, 0, "", 0, false, 0, 0
 	}
 
 	currentRestoreProgressState.mu.Lock()
@@ -205,7 +209,8 @@ func getCurrentRestoreProgress() (bytesTotal, bytesDone int64, description strin
 		currentRestoreProgressState.dbTotal, currentRestoreProgressState.dbDone, speed,
 		currentRestoreProgressState.dbPhaseElapsed, currentRestoreProgressState.dbAvgPerDB,
 		currentRestoreProgressState.currentDB, currentRestoreProgressState.overallPhase,
-		currentRestoreProgressState.extractionDone
+		currentRestoreProgressState.extractionDone,
+		currentRestoreProgressState.dbBytesTotal, currentRestoreProgressState.dbBytesDone
 }
 
 // calculateRollingSpeed calculates speed from recent samples (last 5 seconds)
@@ -359,6 +364,20 @@ func executeRestoreWithTUIProgress(parentCtx context.Context, cfg *config.Config
 		progressState.bytesDone = 0
 	})
 
+	// Set up weighted (bytes-based) progress callback for accurate cluster restore progress
+	engine.SetDatabaseProgressByBytesCallback(func(bytesDone, bytesTotal int64, dbName string, dbDone, dbTotal int) {
+		progressState.mu.Lock()
+		defer progressState.mu.Unlock()
+		progressState.dbBytesDone = bytesDone
+		progressState.dbBytesTotal = bytesTotal
+		progressState.dbDone = dbDone
+		progressState.dbTotal = dbTotal
+		progressState.currentDB = dbName
+		progressState.overallPhase = 3
+		progressState.extractionDone = true
+		progressState.hasUpdate = true
+	})
+
 	// Store progress state in a package-level variable for the ticker to access
 	// This is a workaround because tea messages can't be sent from callbacks
 	setCurrentRestoreProgress(progressState)
@@ -412,7 +431,7 @@ func (m RestoreExecutionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
 		m.elapsed = time.Since(m.startTime)
 
 		// Poll shared progress state for real-time updates
-		bytesTotal, bytesDone, description, hasUpdate, dbTotal, dbDone, speed, dbPhaseElapsed, dbAvgPerDB, currentDB, overallPhase, extractionDone := getCurrentRestoreProgress()
+		bytesTotal, bytesDone, description, hasUpdate, dbTotal, dbDone, speed, dbPhaseElapsed, dbAvgPerDB, currentDB, overallPhase, extractionDone, dbBytesTotal, dbBytesDone := getCurrentRestoreProgress()
 		if hasUpdate && bytesTotal > 0 && !extractionDone {
 			// Phase 1: Extraction
 			m.bytesTotal = bytesTotal
@@ -443,8 +462,16 @@
 			} else {
 				m.status = "Finalizing..."
 			}
-			m.phase = fmt.Sprintf("Phase 3/3: Databases (%d/%d)", dbDone, dbTotal)
-			m.progress = int((dbDone * 100) / dbTotal)
+
+			// Use weighted progress by bytes if available, otherwise use count
+			if dbBytesTotal > 0 {
+				weightedPercent := int((dbBytesDone * 100) / dbBytesTotal)
+				m.phase = fmt.Sprintf("Phase 3/3: Databases (%d/%d) - %.1f%% by size", dbDone, dbTotal, float64(dbBytesDone*100)/float64(dbBytesTotal))
+				m.progress = weightedPercent
+			} else {
+				m.phase = fmt.Sprintf("Phase 3/3: Databases (%d/%d)", dbDone, dbTotal)
+				m.progress = int((dbDone * 100) / dbTotal)
+			}
 		} else if hasUpdate && extractionDone && dbTotal == 0 {
 			// Phase 2: Globals restore (brief phase between extraction and databases)
 			m.overallPhase = 2
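
Reviewer notes (not part of the patch):

A minimal, standalone sketch of the bytes-weighted progress arithmetic the
TUI now displays. The database sizes below are hypothetical; the point is
how "% by size" diverges from "% by count" when one dump dominates:

	package main

	import "fmt"

	func main() {
		// Hypothetical dump sizes: one 40 GB database dwarfs six small ones.
		sizes := []int64{40 << 30, 1 << 30, 1 << 30, 2 << 30, 1 << 30, 1 << 30, 2 << 30}

		var total, done int64
		for _, s := range sizes {
			total += s
		}

		for i, s := range sizes {
			done += s // a database finished; credit its full size
			countPct := (i + 1) * 100 / len(sizes)
			sizePct := float64(done*100) / float64(total) // guard total > 0, as the patch does
			fmt.Printf("Databases (%d/%d) - %d%% by count vs %.1f%% by size\n",
				i+1, len(sizes), countPct, sizePct)
		}
	}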
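
The 3x preflight rule, sketched with syscall.Statfs standing in for the
internal checks.CheckDiskSpace helper (Linux-only; the workdir and archive
paths are hypothetical):

	package main

	import (
		"fmt"
		"os"
		"syscall"
	)

	func main() {
		workDir, archive := "/mnt/storage/restore_tmp", "cluster.tar.gz" // hypothetical

		info, err := os.Stat(archive)
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}

		// Free bytes available to unprivileged users on workDir's filesystem.
		var st syscall.Statfs_t
		if err := syscall.Statfs(workDir, &st); err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		avail := st.Bavail * uint64(st.Bsize)

		// Same rule as the patch: compressed + extracted + working space ~ 3x.
		required := uint64(info.Size()) * 3
		if avail < required {
			fmt.Printf("insufficient disk space: need %.1f GB, have %.1f GB\n",
				float64(required)/(1<<30), float64(avail)/(1<<30))
			os.Exit(1)
		}
		fmt.Println("disk space check passed")
	}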
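
The diff only shows --parallel-dbs overriding cfg.ClusterParallelism; the
worker pool that consumes it is outside these hunks. A common Go shape for
bounding N concurrent restores (a sketch, not the engine's actual code):

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	func main() {
		dbs := []string{"app", "analytics", "audit", "staging"} // hypothetical
		parallel := 2 // what --parallel-dbs=2 would set; 1 degenerates to sequential

		sem := make(chan struct{}, parallel) // at most `parallel` restores in flight
		var wg sync.WaitGroup
		for _, db := range dbs {
			wg.Add(1)
			sem <- struct{}{} // acquire a slot before spawning
			go func(name string) {
				defer wg.Done()
				defer func() { <-sem }() // release the slot when done
				fmt.Println("restoring", name)
				time.Sleep(100 * time.Millisecond) // stand-in for pg_restore
			}(db)
		}
		wg.Wait()
	}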