feat(restore): add weighted progress, pre-extraction disk check, parallel-dbs flag
All checks were successful
CI/CD / Test (push) Successful in 1m20s
CI/CD / Lint (push) Successful in 1m32s
CI/CD / Build & Release (push) Successful in 3m19s

Three high-value improvements for cluster restore:

1. Weighted progress by database size
   - Progress now shows percentage by data volume, not just count
   - Phase 3/3: Databases (2/7) - 45.2% by size
   - Gives more accurate ETA for clusters with varied DB sizes

2. Pre-extraction disk space check
   - Checks workdir has 3x archive size before extraction
   - Prevents partial extraction failures when disk fills mid-way
   - Clear error message with required vs available GB

3. --parallel-dbs flag for concurrent restores
   - dbbackup restore cluster archive.tar.gz --parallel-dbs=4
   - Overrides CLUSTER_PARALLELISM config setting
   - Set to 1 for sequential restore (safest for large objects)
This commit is contained in:
2026-01-16 18:31:12 +01:00
parent dd7c4da0eb
commit 698b8a761c
5 changed files with 111 additions and 14 deletions

View File

@@ -4,8 +4,8 @@ This directory contains pre-compiled binaries for the DB Backup Tool across mult
## Build Information ## Build Information
- **Version**: 3.42.50 - **Version**: 3.42.50
- **Build Time**: 2026-01-16_14:53:54_UTC - **Build Time**: 2026-01-16_15:09:21_UTC
- **Git Commit**: 5728b46 - **Git Commit**: dd7c4da
## Recent Updates (v1.1.0) ## Recent Updates (v1.1.0)
- ✅ Fixed TUI progress display with line-by-line output - ✅ Fixed TUI progress display with line-by-line output

View File

@@ -28,6 +28,7 @@ var (
restoreClean bool restoreClean bool
restoreCreate bool restoreCreate bool
restoreJobs int restoreJobs int
restoreParallelDBs int // Number of parallel database restores
restoreTarget string restoreTarget string
restoreVerbose bool restoreVerbose bool
restoreNoProgress bool restoreNoProgress bool
@@ -289,6 +290,7 @@ func init() {
restoreClusterCmd.Flags().BoolVar(&restoreForce, "force", false, "Skip safety checks and confirmations") restoreClusterCmd.Flags().BoolVar(&restoreForce, "force", false, "Skip safety checks and confirmations")
restoreClusterCmd.Flags().BoolVar(&restoreCleanCluster, "clean-cluster", false, "Drop all existing user databases before restore (disaster recovery)") restoreClusterCmd.Flags().BoolVar(&restoreCleanCluster, "clean-cluster", false, "Drop all existing user databases before restore (disaster recovery)")
restoreClusterCmd.Flags().IntVar(&restoreJobs, "jobs", 0, "Number of parallel decompression jobs (0 = auto)") restoreClusterCmd.Flags().IntVar(&restoreJobs, "jobs", 0, "Number of parallel decompression jobs (0 = auto)")
restoreClusterCmd.Flags().IntVar(&restoreParallelDBs, "parallel-dbs", 0, "Number of databases to restore in parallel (0 = use config default, 1 = sequential)")
restoreClusterCmd.Flags().StringVar(&restoreWorkdir, "workdir", "", "Working directory for extraction (use when system disk is small, e.g. /mnt/storage/restore_tmp)") restoreClusterCmd.Flags().StringVar(&restoreWorkdir, "workdir", "", "Working directory for extraction (use when system disk is small, e.g. /mnt/storage/restore_tmp)")
restoreClusterCmd.Flags().BoolVar(&restoreVerbose, "verbose", false, "Show detailed restore progress") restoreClusterCmd.Flags().BoolVar(&restoreVerbose, "verbose", false, "Show detailed restore progress")
restoreClusterCmd.Flags().BoolVar(&restoreNoProgress, "no-progress", false, "Disable progress indicators") restoreClusterCmd.Flags().BoolVar(&restoreNoProgress, "no-progress", false, "Disable progress indicators")
@@ -783,6 +785,12 @@ func runRestoreCluster(cmd *cobra.Command, args []string) error {
} }
} }
// Override cluster parallelism if --parallel-dbs is specified
if restoreParallelDBs > 0 {
cfg.ClusterParallelism = restoreParallelDBs
log.Info("Using custom parallelism for database restores", "parallel_dbs", restoreParallelDBs)
}
// Create restore engine // Create restore engine
engine := restore.New(cfg, log, db) engine := restore.New(cfg, log, db)

View File

@@ -38,6 +38,10 @@ type DatabaseProgressCallback func(done, total int, dbName string)
// Parameters: done count, total count, database name, elapsed time for current restore phase, avg duration per DB // Parameters: done count, total count, database name, elapsed time for current restore phase, avg duration per DB
type DatabaseProgressWithTimingCallback func(done, total int, dbName string, phaseElapsed, avgPerDB time.Duration) type DatabaseProgressWithTimingCallback func(done, total int, dbName string, phaseElapsed, avgPerDB time.Duration)
// DatabaseProgressByBytesCallback is called with progress weighted by database sizes (bytes)
// Parameters: bytes completed, total bytes, current database name, databases done count, total database count
type DatabaseProgressByBytesCallback func(bytesDone, bytesTotal int64, dbName string, dbDone, dbTotal int)
// Engine handles database restore operations // Engine handles database restore operations
type Engine struct { type Engine struct {
cfg *config.Config cfg *config.Config
@@ -52,6 +56,7 @@ type Engine struct {
progressCallback ProgressCallback progressCallback ProgressCallback
dbProgressCallback DatabaseProgressCallback dbProgressCallback DatabaseProgressCallback
dbProgressTimingCallback DatabaseProgressWithTimingCallback dbProgressTimingCallback DatabaseProgressWithTimingCallback
dbProgressByBytesCallback DatabaseProgressByBytesCallback
} }
// New creates a new restore engine // New creates a new restore engine
@@ -122,6 +127,11 @@ func (e *Engine) SetDatabaseProgressWithTimingCallback(cb DatabaseProgressWithTi
e.dbProgressTimingCallback = cb e.dbProgressTimingCallback = cb
} }
// SetDatabaseProgressByBytesCallback sets a callback for progress weighted by database sizes
func (e *Engine) SetDatabaseProgressByBytesCallback(cb DatabaseProgressByBytesCallback) {
e.dbProgressByBytesCallback = cb
}
// reportProgress safely calls the progress callback if set // reportProgress safely calls the progress callback if set
func (e *Engine) reportProgress(current, total int64, description string) { func (e *Engine) reportProgress(current, total int64, description string) {
if e.progressCallback != nil { if e.progressCallback != nil {
@@ -143,6 +153,13 @@ func (e *Engine) reportDatabaseProgressWithTiming(done, total int, dbName string
} }
} }
// reportDatabaseProgressByBytes safely calls the bytes-weighted callback if set
func (e *Engine) reportDatabaseProgressByBytes(bytesDone, bytesTotal int64, dbName string, dbDone, dbTotal int) {
if e.dbProgressByBytesCallback != nil {
e.dbProgressByBytesCallback(bytesDone, bytesTotal, dbName, dbDone, dbTotal)
}
}
// loggerAdapter adapts our logger to the progress.Logger interface // loggerAdapter adapts our logger to the progress.Logger interface
type loggerAdapter struct { type loggerAdapter struct {
logger logger.Logger logger logger.Logger
@@ -861,6 +878,25 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
// Create temporary extraction directory in configured WorkDir // Create temporary extraction directory in configured WorkDir
workDir := e.cfg.GetEffectiveWorkDir() workDir := e.cfg.GetEffectiveWorkDir()
tempDir := filepath.Join(workDir, fmt.Sprintf(".restore_%d", time.Now().Unix())) tempDir := filepath.Join(workDir, fmt.Sprintf(".restore_%d", time.Now().Unix()))
// Check disk space for extraction (need ~3x archive size: compressed + extracted + working space)
if archiveInfo != nil {
requiredBytes := uint64(archiveInfo.Size()) * 3
extractionCheck := checks.CheckDiskSpace(workDir)
if extractionCheck.AvailableBytes < requiredBytes {
operation.Fail("Insufficient disk space for extraction")
return fmt.Errorf("insufficient disk space for extraction in %s: need %.1f GB, have %.1f GB (archive size: %.1f GB × 3)",
workDir,
float64(requiredBytes)/(1024*1024*1024),
float64(extractionCheck.AvailableBytes)/(1024*1024*1024),
float64(archiveInfo.Size())/(1024*1024*1024))
}
e.log.Info("Disk space check for extraction passed",
"workdir", workDir,
"required_gb", float64(requiredBytes)/(1024*1024*1024),
"available_gb", float64(extractionCheck.AvailableBytes)/(1024*1024*1024))
}
if err := os.MkdirAll(tempDir, 0755); err != nil { if err := os.MkdirAll(tempDir, 0755); err != nil {
operation.Fail("Failed to create temporary directory") operation.Fail("Failed to create temporary directory")
return fmt.Errorf("failed to create temp directory in %s: %w", workDir, err) return fmt.Errorf("failed to create temp directory in %s: %w", workDir, err)
@@ -1024,12 +1060,27 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
var restoreErrorsMu sync.Mutex var restoreErrorsMu sync.Mutex
totalDBs := 0 totalDBs := 0
// Count total databases // Count total databases and calculate total bytes for weighted progress
var totalBytes int64
dbSizes := make(map[string]int64) // Map database name to dump file size
for _, entry := range entries { for _, entry := range entries {
if !entry.IsDir() { if !entry.IsDir() {
totalDBs++ totalDBs++
dumpFile := filepath.Join(dumpsDir, entry.Name())
if info, err := os.Stat(dumpFile); err == nil {
dbName := entry.Name()
dbName = strings.TrimSuffix(dbName, ".dump")
dbName = strings.TrimSuffix(dbName, ".sql.gz")
dbSizes[dbName] = info.Size()
totalBytes += info.Size()
} }
} }
}
e.log.Info("Calculated total restore size", "databases", totalDBs, "total_bytes", totalBytes)
// Track bytes completed for weighted progress
var bytesCompleted int64
var bytesCompletedMu sync.Mutex
// Create ETA estimator for database restores // Create ETA estimator for database restores
estimator := progress.NewETAEstimator("Restoring cluster", totalDBs) estimator := progress.NewETAEstimator("Restoring cluster", totalDBs)
@@ -1202,6 +1253,17 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
completedDBTimes = append(completedDBTimes, dbRestoreDuration) completedDBTimes = append(completedDBTimes, dbRestoreDuration)
completedDBTimesMu.Unlock() completedDBTimesMu.Unlock()
// Update bytes completed for weighted progress
dbSize := dbSizes[dbName]
bytesCompletedMu.Lock()
bytesCompleted += dbSize
currentBytesCompleted := bytesCompleted
currentSuccessCount := int(atomic.LoadInt32(&successCount)) + 1 // +1 because we're about to increment
bytesCompletedMu.Unlock()
// Report weighted progress (bytes-based)
e.reportDatabaseProgressByBytes(currentBytesCompleted, totalBytes, dbName, currentSuccessCount, totalDBs)
atomic.AddInt32(&successCount, 1) atomic.AddInt32(&successCount, 1)
// Small delay to ensure PostgreSQL fully closes connections before next restore // Small delay to ensure PostgreSQL fully closes connections before next restore

View File

@@ -159,6 +159,10 @@ type sharedProgressState struct {
overallPhase int overallPhase int
extractionDone bool extractionDone bool
// Weighted progress by database sizes (bytes)
dbBytesTotal int64 // Total bytes across all databases
dbBytesDone int64 // Bytes completed (sum of finished DB sizes)
// Rolling window for speed calculation // Rolling window for speed calculation
speedSamples []restoreSpeedSample speedSamples []restoreSpeedSample
} }
@@ -186,12 +190,12 @@ func clearCurrentRestoreProgress() {
currentRestoreProgressState = nil currentRestoreProgressState = nil
} }
func getCurrentRestoreProgress() (bytesTotal, bytesDone int64, description string, hasUpdate bool, dbTotal, dbDone int, speed float64, dbPhaseElapsed, dbAvgPerDB time.Duration, currentDB string, overallPhase int, extractionDone bool) { func getCurrentRestoreProgress() (bytesTotal, bytesDone int64, description string, hasUpdate bool, dbTotal, dbDone int, speed float64, dbPhaseElapsed, dbAvgPerDB time.Duration, currentDB string, overallPhase int, extractionDone bool, dbBytesTotal, dbBytesDone int64) {
currentRestoreProgressMu.Lock() currentRestoreProgressMu.Lock()
defer currentRestoreProgressMu.Unlock() defer currentRestoreProgressMu.Unlock()
if currentRestoreProgressState == nil { if currentRestoreProgressState == nil {
return 0, 0, "", false, 0, 0, 0, 0, 0, "", 0, false return 0, 0, "", false, 0, 0, 0, 0, 0, "", 0, false, 0, 0
} }
currentRestoreProgressState.mu.Lock() currentRestoreProgressState.mu.Lock()
@@ -205,7 +209,8 @@ func getCurrentRestoreProgress() (bytesTotal, bytesDone int64, description strin
currentRestoreProgressState.dbTotal, currentRestoreProgressState.dbDone, speed, currentRestoreProgressState.dbTotal, currentRestoreProgressState.dbDone, speed,
currentRestoreProgressState.dbPhaseElapsed, currentRestoreProgressState.dbAvgPerDB, currentRestoreProgressState.dbPhaseElapsed, currentRestoreProgressState.dbAvgPerDB,
currentRestoreProgressState.currentDB, currentRestoreProgressState.overallPhase, currentRestoreProgressState.currentDB, currentRestoreProgressState.overallPhase,
currentRestoreProgressState.extractionDone currentRestoreProgressState.extractionDone,
currentRestoreProgressState.dbBytesTotal, currentRestoreProgressState.dbBytesDone
} }
// calculateRollingSpeed calculates speed from recent samples (last 5 seconds) // calculateRollingSpeed calculates speed from recent samples (last 5 seconds)
@@ -359,6 +364,20 @@ func executeRestoreWithTUIProgress(parentCtx context.Context, cfg *config.Config
progressState.bytesDone = 0 progressState.bytesDone = 0
}) })
// Set up weighted (bytes-based) progress callback for accurate cluster restore progress
engine.SetDatabaseProgressByBytesCallback(func(bytesDone, bytesTotal int64, dbName string, dbDone, dbTotal int) {
progressState.mu.Lock()
defer progressState.mu.Unlock()
progressState.dbBytesDone = bytesDone
progressState.dbBytesTotal = bytesTotal
progressState.dbDone = dbDone
progressState.dbTotal = dbTotal
progressState.currentDB = dbName
progressState.overallPhase = 3
progressState.extractionDone = true
progressState.hasUpdate = true
})
// Store progress state in a package-level variable for the ticker to access // Store progress state in a package-level variable for the ticker to access
// This is a workaround because tea messages can't be sent from callbacks // This is a workaround because tea messages can't be sent from callbacks
setCurrentRestoreProgress(progressState) setCurrentRestoreProgress(progressState)
@@ -412,7 +431,7 @@ func (m RestoreExecutionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
m.elapsed = time.Since(m.startTime) m.elapsed = time.Since(m.startTime)
// Poll shared progress state for real-time updates // Poll shared progress state for real-time updates
bytesTotal, bytesDone, description, hasUpdate, dbTotal, dbDone, speed, dbPhaseElapsed, dbAvgPerDB, currentDB, overallPhase, extractionDone := getCurrentRestoreProgress() bytesTotal, bytesDone, description, hasUpdate, dbTotal, dbDone, speed, dbPhaseElapsed, dbAvgPerDB, currentDB, overallPhase, extractionDone, dbBytesTotal, dbBytesDone := getCurrentRestoreProgress()
if hasUpdate && bytesTotal > 0 && !extractionDone { if hasUpdate && bytesTotal > 0 && !extractionDone {
// Phase 1: Extraction // Phase 1: Extraction
m.bytesTotal = bytesTotal m.bytesTotal = bytesTotal
@@ -443,8 +462,16 @@ func (m RestoreExecutionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
} else { } else {
m.status = "Finalizing..." m.status = "Finalizing..."
} }
// Use weighted progress by bytes if available, otherwise use count
if dbBytesTotal > 0 {
weightedPercent := int((dbBytesDone * 100) / dbBytesTotal)
m.phase = fmt.Sprintf("Phase 3/3: Databases (%d/%d) - %.1f%% by size", dbDone, dbTotal, float64(dbBytesDone*100)/float64(dbBytesTotal))
m.progress = weightedPercent
} else {
m.phase = fmt.Sprintf("Phase 3/3: Databases (%d/%d)", dbDone, dbTotal) m.phase = fmt.Sprintf("Phase 3/3: Databases (%d/%d)", dbDone, dbTotal)
m.progress = int((dbDone * 100) / dbTotal) m.progress = int((dbDone * 100) / dbTotal)
}
} else if hasUpdate && extractionDone && dbTotal == 0 { } else if hasUpdate && extractionDone && dbTotal == 0 {
// Phase 2: Globals restore (brief phase between extraction and databases) // Phase 2: Globals restore (brief phase between extraction and databases)
m.overallPhase = 2 m.overallPhase = 2