Compare commits

..

4 Commits

Author SHA1 Message Date
dd1db844ce fix: improve lock capacity calculation for smaller VMs
All checks were successful
- Fix boostLockCapacity: max_locks_per_transaction requires RESTART, not reload
- Calculate total lock capacity: max_locks × (max_connections + max_prepared_txns)
- Add TotalLockCapacity to preflight checks with warning if < 200,000
- Update error hints to explain capacity formula and recommend 4096+ for small VMs
- Show max_connections and total capacity in preflight summary

Fixes OOM 'out of shared memory' errors on VMs with reduced resources
2026-01-17 07:48:17 +01:00
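
The lock-capacity arithmetic this commit relies on can be sketched in a few lines of Go. This is an illustrative, standalone example under assumed settings (the helper name totalLockCapacity and the sample values are hypothetical, not dbbackup code):

package main

import "fmt"

// totalLockCapacity mirrors the formula referenced in the commit:
// capacity = max_locks_per_transaction × (max_connections + max_prepared_transactions)
func totalLockCapacity(maxLocks, maxConns, maxPreparedTxns int) int {
	return maxLocks * (maxConns + maxPreparedTxns)
}

func main() {
	// A downsized VM with max_connections=20 and the PostgreSQL default
	// max_locks_per_transaction=64 has only 64 × (20 + 0) = 1,280 lock slots.
	fmt.Println(totalLockCapacity(64, 20, 0)) // 1280
	// Even 4096 yields 81,920 on 20 connections; hitting the 200,000 target
	// with so few connections needs roughly 10,000.
	fmt.Println(totalLockCapacity(4096, 20, 0)) // 81920
}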
4ea3ec2cf8 fix(preflight): improve BLOB count detection and block restore when max_locks_per_transaction is critically low
All checks were successful
- Add higher lock boost tiers for extreme BLOB counts (100K+, 200K+)
- Calculate actual lock requirement: (totalBLOBs / max_connections) * 1.5
- Block restore with CRITICAL error when BLOB count exceeds 2x safe limit
- Improve preflight summary with PASSED/FAILED status display
- Add clearer fix instructions for max_locks_per_transaction
2026-01-17 07:25:45 +01:00
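
A minimal sketch of the sizing rule and the blocking condition described above, assuming roughly one lock per large object spread across max_connections (helper names are hypothetical, not the actual preflight code):

package preflightsketch

// recommendedMaxLocks applies the rule of thumb from the commit:
// (total BLOBs / max_connections) with a 1.5x safety margin, in integer math.
func recommendedMaxLocks(totalBLOBs, maxConns int) int {
	if maxConns == 0 {
		maxConns = 100 // PostgreSQL default
	}
	return (totalBLOBs / maxConns) * 3 / 2
}

// shouldBlockRestore reports whether the archive's BLOB count exceeds twice
// what the current setting can safely handle (the CRITICAL threshold above).
func shouldBlockRestore(totalBLOBs, currentMaxLocks, maxConns int) bool {
	if maxConns == 0 {
		maxConns = 100
	}
	maxSafeBLOBs := currentMaxLocks * maxConns
	return totalBLOBs > maxSafeBLOBs*2
}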
9200024e50 fix(restore): add context validity checks to debug cancellation issues
All checks were successful
Added explicit context checks at critical points:
1. After extraction completes - logs error if context was cancelled
2. Before database restore loop starts - catches premature cancellation

This helps diagnose issues where all database restores fail with
'context cancelled' even though extraction completed successfully.

The user reported this happening after 4h20m extraction - all 6 DBs
showed 'restore skipped (context cancelled)'. These checks will log
exactly when/where the context becomes invalid.
2026-01-16 19:36:52 +01:00
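
The pattern behind these checks, reduced to a sketch (the function names and phase split are illustrative assumptions, not the exact engine code): verify ctx.Err() at each phase boundary so a cancellation is reported where it is first observed, instead of surfacing later as per-database 'restore skipped' failures.

package restoresketch

import (
	"context"
	"fmt"
)

func runPhases(ctx context.Context, extract, restoreGlobals, restoreDatabases func(context.Context) error) error {
	if err := extract(ctx); err != nil {
		return fmt.Errorf("extraction failed: %w", err)
	}
	// Check 1: immediately after extraction completes
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("context cancelled after extraction completed: %w", err)
	}
	if err := restoreGlobals(ctx); err != nil {
		return fmt.Errorf("globals restore failed: %w", err)
	}
	// Check 2: just before the database restore loop starts
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("context cancelled before database restore: %w", err)
	}
	return restoreDatabases(ctx)
}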
698b8a761c feat(restore): add weighted progress, pre-extraction disk check, parallel-dbs flag
All checks were successful
Three high-value improvements for cluster restore:

1. Weighted progress by database size
   - Progress now shows percentage by data volume, not just count
   - Phase 3/3: Databases (2/7) - 45.2% by size
   - Gives more accurate ETA for clusters with varied DB sizes

2. Pre-extraction disk space check
   - Checks workdir has 3x archive size before extraction
   - Prevents partial extraction failures when disk fills mid-way
   - Clear error message with required vs available GB

3. --parallel-dbs flag for concurrent restores
   - dbbackup restore cluster archive.tar.gz --parallel-dbs=4
   - Overrides CLUSTER_PARALLELISM config setting
   - Set to 1 for sequential restore (safest for large objects)
2026-01-16 18:31:12 +01:00
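
How the size-weighted percentage in item 1 is derived can be shown with a small sketch (the helper below is a hypothetical standalone function, not the TUI code): progress is the share of restored bytes, so one huge database no longer makes the bar jump from 10% to 90% in a single step.

package progresssketch

import "fmt"

func weightedPhaseLabel(dbDone, dbTotal int, bytesDone, bytesTotal int64) string {
	if bytesTotal == 0 {
		// Fall back to a count-based label when dump sizes are unknown
		return fmt.Sprintf("Phase 3/3: Databases (%d/%d)", dbDone, dbTotal)
	}
	pct := float64(bytesDone) * 100 / float64(bytesTotal)
	return fmt.Sprintf("Phase 3/3: Databases (%d/%d) - %.1f%% by size", dbDone, dbTotal, pct)
}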
7 changed files with 326 additions and 44 deletions

View File

@@ -4,8 +4,8 @@ This directory contains pre-compiled binaries for the DB Backup Tool across mult
 ## Build Information
 - **Version**: 3.42.50
-- **Build Time**: 2026-01-16_14:53:54_UTC
-- **Git Commit**: 5728b46
+- **Build Time**: 2026-01-17_06:25:57_UTC
+- **Git Commit**: 4ea3ec2
 ## Recent Updates (v1.1.0)
 - ✅ Fixed TUI progress display with line-by-line output

View File

@@ -28,6 +28,7 @@ var (
 	restoreClean bool
 	restoreCreate bool
 	restoreJobs int
+	restoreParallelDBs int // Number of parallel database restores
 	restoreTarget string
 	restoreVerbose bool
 	restoreNoProgress bool
@@ -289,6 +290,7 @@ func init() {
 	restoreClusterCmd.Flags().BoolVar(&restoreForce, "force", false, "Skip safety checks and confirmations")
 	restoreClusterCmd.Flags().BoolVar(&restoreCleanCluster, "clean-cluster", false, "Drop all existing user databases before restore (disaster recovery)")
 	restoreClusterCmd.Flags().IntVar(&restoreJobs, "jobs", 0, "Number of parallel decompression jobs (0 = auto)")
+	restoreClusterCmd.Flags().IntVar(&restoreParallelDBs, "parallel-dbs", 0, "Number of databases to restore in parallel (0 = use config default, 1 = sequential)")
 	restoreClusterCmd.Flags().StringVar(&restoreWorkdir, "workdir", "", "Working directory for extraction (use when system disk is small, e.g. /mnt/storage/restore_tmp)")
 	restoreClusterCmd.Flags().BoolVar(&restoreVerbose, "verbose", false, "Show detailed restore progress")
 	restoreClusterCmd.Flags().BoolVar(&restoreNoProgress, "no-progress", false, "Disable progress indicators")
@@ -783,6 +785,12 @@ func runRestoreCluster(cmd *cobra.Command, args []string) error {
 		}
 	}
+
+	// Override cluster parallelism if --parallel-dbs is specified
+	if restoreParallelDBs > 0 {
+		cfg.ClusterParallelism = restoreParallelDBs
+		log.Info("Using custom parallelism for database restores", "parallel_dbs", restoreParallelDBs)
+	}
 	// Create restore engine
 	engine := restore.New(cfg, log, db)

View File

@@ -68,8 +68,8 @@ func ClassifyError(errorMsg string) *ErrorClassification {
Type: "critical", Type: "critical",
Category: "locks", Category: "locks",
Message: errorMsg, Message: errorMsg,
Hint: "Lock table exhausted - typically caused by large objects (BLOBs) during restore", Hint: "Lock table exhausted. Total capacity = max_locks_per_transaction × (max_connections + max_prepared_transactions). If you reduced VM size or max_connections, you need higher max_locks_per_transaction to compensate.",
Action: "Option 1: Increase max_locks_per_transaction to 1024+ in postgresql.conf (requires restart). Option 2: Update dbbackup and retry - phased restore now auto-enabled for BLOB databases", Action: "Fix: ALTER SYSTEM SET max_locks_per_transaction = 4096; then RESTART PostgreSQL. For smaller VMs with fewer connections, you need higher max_locks_per_transaction values.",
Severity: 2, Severity: 2,
} }
case "permission_denied": case "permission_denied":
@@ -142,8 +142,8 @@ func ClassifyError(errorMsg string) *ErrorClassification {
Type: "critical", Type: "critical",
Category: "locks", Category: "locks",
Message: errorMsg, Message: errorMsg,
Hint: "Lock table exhausted - typically caused by large objects (BLOBs) during restore", Hint: "Lock table exhausted. Total capacity = max_locks_per_transaction × (max_connections + max_prepared_transactions). If you reduced VM size or max_connections, you need higher max_locks_per_transaction to compensate.",
Action: "Option 1: Increase max_locks_per_transaction to 1024+ in postgresql.conf (requires restart). Option 2: Update dbbackup and retry - phased restore now auto-enabled for BLOB databases", Action: "Fix: ALTER SYSTEM SET max_locks_per_transaction = 4096; then RESTART PostgreSQL. For smaller VMs with fewer connections, you need higher max_locks_per_transaction values.",
Severity: 2, Severity: 2,
} }
} }

View File

@@ -103,7 +103,7 @@ func TestChunker_ShiftedData(t *testing.T) {
 	// Test that shifted data still shares chunks (the key CDC benefit)
 	// Use deterministic random data for reproducible test results
 	rng := mathrand.New(mathrand.NewSource(42))
 	original := make([]byte, 100*1024)
 	rng.Read(original)

View File

@@ -38,6 +38,10 @@ type DatabaseProgressCallback func(done, total int, dbName string)
 // Parameters: done count, total count, database name, elapsed time for current restore phase, avg duration per DB
 type DatabaseProgressWithTimingCallback func(done, total int, dbName string, phaseElapsed, avgPerDB time.Duration)
+
+// DatabaseProgressByBytesCallback is called with progress weighted by database sizes (bytes)
+// Parameters: bytes completed, total bytes, current database name, databases done count, total database count
+type DatabaseProgressByBytesCallback func(bytesDone, bytesTotal int64, dbName string, dbDone, dbTotal int)
 // Engine handles database restore operations
 type Engine struct {
 	cfg *config.Config
@@ -49,9 +53,10 @@ type Engine struct {
 	debugLogPath string // Path to save debug log on error
 	// TUI progress callback for detailed progress reporting
 	progressCallback ProgressCallback
 	dbProgressCallback DatabaseProgressCallback
 	dbProgressTimingCallback DatabaseProgressWithTimingCallback
+	dbProgressByBytesCallback DatabaseProgressByBytesCallback
 }
 // New creates a new restore engine
@@ -122,6 +127,11 @@ func (e *Engine) SetDatabaseProgressWithTimingCallback(cb DatabaseProgressWithTi
 	e.dbProgressTimingCallback = cb
 }
+
+// SetDatabaseProgressByBytesCallback sets a callback for progress weighted by database sizes
+func (e *Engine) SetDatabaseProgressByBytesCallback(cb DatabaseProgressByBytesCallback) {
+	e.dbProgressByBytesCallback = cb
+}
 // reportProgress safely calls the progress callback if set
 func (e *Engine) reportProgress(current, total int64, description string) {
 	if e.progressCallback != nil {
@@ -143,6 +153,13 @@ func (e *Engine) reportDatabaseProgressWithTiming(done, total int, dbName string
 	}
 }
+
+// reportDatabaseProgressByBytes safely calls the bytes-weighted callback if set
+func (e *Engine) reportDatabaseProgressByBytes(bytesDone, bytesTotal int64, dbName string, dbDone, dbTotal int) {
+	if e.dbProgressByBytesCallback != nil {
+		e.dbProgressByBytesCallback(bytesDone, bytesTotal, dbName, dbDone, dbTotal)
+	}
+}
 // loggerAdapter adapts our logger to the progress.Logger interface
 type loggerAdapter struct {
 	logger logger.Logger
@@ -861,6 +878,25 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
 	// Create temporary extraction directory in configured WorkDir
 	workDir := e.cfg.GetEffectiveWorkDir()
 	tempDir := filepath.Join(workDir, fmt.Sprintf(".restore_%d", time.Now().Unix()))
+
+	// Check disk space for extraction (need ~3x archive size: compressed + extracted + working space)
+	if archiveInfo != nil {
+		requiredBytes := uint64(archiveInfo.Size()) * 3
+		extractionCheck := checks.CheckDiskSpace(workDir)
+		if extractionCheck.AvailableBytes < requiredBytes {
+			operation.Fail("Insufficient disk space for extraction")
+			return fmt.Errorf("insufficient disk space for extraction in %s: need %.1f GB, have %.1f GB (archive size: %.1f GB × 3)",
+				workDir,
+				float64(requiredBytes)/(1024*1024*1024),
+				float64(extractionCheck.AvailableBytes)/(1024*1024*1024),
+				float64(archiveInfo.Size())/(1024*1024*1024))
+		}
+		e.log.Info("Disk space check for extraction passed",
+			"workdir", workDir,
+			"required_gb", float64(requiredBytes)/(1024*1024*1024),
+			"available_gb", float64(extractionCheck.AvailableBytes)/(1024*1024*1024))
+	}
 	if err := os.MkdirAll(tempDir, 0755); err != nil {
 		operation.Fail("Failed to create temporary directory")
 		return fmt.Errorf("failed to create temp directory in %s: %w", workDir, err)
@@ -874,6 +910,16 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
return fmt.Errorf("failed to extract archive: %w", err) return fmt.Errorf("failed to extract archive: %w", err)
} }
// Check context validity after extraction (debugging context cancellation issues)
if ctx.Err() != nil {
e.log.Error("Context cancelled after extraction - this should not happen",
"context_error", ctx.Err(),
"extraction_completed", true)
operation.Fail("Context cancelled unexpectedly")
return fmt.Errorf("context cancelled after extraction completed: %w", ctx.Err())
}
e.log.Info("Extraction completed, context still valid")
// Check if user has superuser privileges (required for ownership restoration) // Check if user has superuser privileges (required for ownership restoration)
e.progress.Update("Checking privileges...") e.progress.Update("Checking privileges...")
isSuperuser, err := e.checkSuperuser(ctx) isSuperuser, err := e.checkSuperuser(ctx)
@@ -1024,12 +1070,27 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
 	var restoreErrorsMu sync.Mutex
 	totalDBs := 0
-	// Count total databases
+	// Count total databases and calculate total bytes for weighted progress
+	var totalBytes int64
+	dbSizes := make(map[string]int64) // Map database name to dump file size
 	for _, entry := range entries {
 		if !entry.IsDir() {
 			totalDBs++
+			dumpFile := filepath.Join(dumpsDir, entry.Name())
+			if info, err := os.Stat(dumpFile); err == nil {
+				dbName := entry.Name()
+				dbName = strings.TrimSuffix(dbName, ".dump")
+				dbName = strings.TrimSuffix(dbName, ".sql.gz")
+				dbSizes[dbName] = info.Size()
+				totalBytes += info.Size()
+			}
 		}
 	}
+	e.log.Info("Calculated total restore size", "databases", totalDBs, "total_bytes", totalBytes)
+
+	// Track bytes completed for weighted progress
+	var bytesCompleted int64
+	var bytesCompletedMu sync.Mutex
 	// Create ETA estimator for database restores
 	estimator := progress.NewETAEstimator("Restoring cluster", totalDBs)
@@ -1057,6 +1118,18 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
 	var successCount, failCount int32
 	var mu sync.Mutex // Protect shared resources (progress, logger)
+
+	// CRITICAL: Check context before starting database restore loop
+	// This helps debug issues where context gets cancelled between extraction and restore
+	if ctx.Err() != nil {
+		e.log.Error("Context cancelled before database restore loop started",
+			"context_error", ctx.Err(),
+			"total_databases", totalDBs,
+			"parallelism", parallelism)
+		operation.Fail("Context cancelled before database restores could start")
+		return fmt.Errorf("context cancelled before database restore: %w", ctx.Err())
+	}
+	e.log.Info("Starting database restore loop", "databases", totalDBs, "parallelism", parallelism)
 	// Timing tracking for restore phase progress
 	restorePhaseStart := time.Now()
 	var completedDBTimes []time.Duration // Track duration for each completed DB restore
@@ -1202,8 +1275,19 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
 			completedDBTimes = append(completedDBTimes, dbRestoreDuration)
 			completedDBTimesMu.Unlock()
+
+			// Update bytes completed for weighted progress
+			dbSize := dbSizes[dbName]
+			bytesCompletedMu.Lock()
+			bytesCompleted += dbSize
+			currentBytesCompleted := bytesCompleted
+			currentSuccessCount := int(atomic.LoadInt32(&successCount)) + 1 // +1 because we're about to increment
+			bytesCompletedMu.Unlock()
+
+			// Report weighted progress (bytes-based)
+			e.reportDatabaseProgressByBytes(currentBytesCompleted, totalBytes, dbName, currentSuccessCount, totalDBs)
 			atomic.AddInt32(&successCount, 1)
 			// Small delay to ensure PostgreSQL fully closes connections before next restore
 			time.Sleep(100 * time.Millisecond)
 		}(dbIndex, entry.Name())
@@ -2041,9 +2125,10 @@ func (e *Engine) quickValidateSQLDump(archivePath string, compressed bool) error
 	return nil
 }
-// boostLockCapacity temporarily increases max_locks_per_transaction to prevent OOM
-// during large restores with many BLOBs. Returns the original value for later reset.
-// Uses ALTER SYSTEM + pg_reload_conf() so no restart is needed.
+// boostLockCapacity checks and reports on max_locks_per_transaction capacity.
+// IMPORTANT: max_locks_per_transaction requires a PostgreSQL RESTART to change!
+// This function now calculates total lock capacity based on max_connections and
+// warns the user if capacity is insufficient for the restore.
 func (e *Engine) boostLockCapacity(ctx context.Context) (int, error) {
 	// Connect to PostgreSQL to run system commands
 	connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=postgres sslmode=disable",
@@ -2061,7 +2146,7 @@ func (e *Engine) boostLockCapacity(ctx context.Context) (int, error) {
 	}
 	defer db.Close()
-	// Get current value
+	// Get current max_locks_per_transaction
 	var currentValue int
 	err = db.QueryRowContext(ctx, "SHOW max_locks_per_transaction").Scan(&currentValue)
 	if err != nil {
@@ -2074,22 +2159,56 @@ func (e *Engine) boostLockCapacity(ctx context.Context) (int, error) {
 		fmt.Sscanf(currentValueStr, "%d", &currentValue)
 	}
-	// Skip if already high enough
-	if currentValue >= 2048 {
-		e.log.Info("max_locks_per_transaction already sufficient", "value", currentValue)
-		return currentValue, nil
-	}
-	// Boost to 2048 (enough for most BLOB-heavy databases)
-	_, err = db.ExecContext(ctx, "ALTER SYSTEM SET max_locks_per_transaction = 2048")
-	if err != nil {
-		return currentValue, fmt.Errorf("failed to set max_locks_per_transaction: %w", err)
-	}
-	// Reload config without restart
-	_, err = db.ExecContext(ctx, "SELECT pg_reload_conf()")
-	if err != nil {
-		return currentValue, fmt.Errorf("failed to reload config: %w", err)
-	}
+	// Get max_connections to calculate total lock capacity
+	var maxConns int
+	if err := db.QueryRowContext(ctx, "SHOW max_connections").Scan(&maxConns); err != nil {
+		maxConns = 100 // default
+	}
+	// Get max_prepared_transactions
+	var maxPreparedTxns int
+	if err := db.QueryRowContext(ctx, "SHOW max_prepared_transactions").Scan(&maxPreparedTxns); err != nil {
+		maxPreparedTxns = 0
+	}
+	// Calculate total lock table capacity:
+	// Total locks = max_locks_per_transaction × (max_connections + max_prepared_transactions)
+	totalLockCapacity := currentValue * (maxConns + maxPreparedTxns)
+	e.log.Info("PostgreSQL lock table capacity",
+		"max_locks_per_transaction", currentValue,
+		"max_connections", maxConns,
+		"max_prepared_transactions", maxPreparedTxns,
+		"total_lock_capacity", totalLockCapacity)
+	// Minimum recommended total capacity for BLOB-heavy restores: 200,000 locks
+	minRecommendedCapacity := 200000
+	if totalLockCapacity < minRecommendedCapacity {
+		recommendedMaxLocks := minRecommendedCapacity / (maxConns + maxPreparedTxns)
+		if recommendedMaxLocks < 4096 {
+			recommendedMaxLocks = 4096
+		}
+		e.log.Warn("Lock table capacity may be insufficient for BLOB-heavy restores",
+			"current_total_capacity", totalLockCapacity,
+			"recommended_capacity", minRecommendedCapacity,
+			"current_max_locks", currentValue,
+			"recommended_max_locks", recommendedMaxLocks,
+			"note", "max_locks_per_transaction requires PostgreSQL RESTART to change")
+		// Write suggested fix to ALTER SYSTEM but warn about restart
+		_, err = db.ExecContext(ctx, fmt.Sprintf("ALTER SYSTEM SET max_locks_per_transaction = %d", recommendedMaxLocks))
+		if err != nil {
+			e.log.Warn("Could not set recommended max_locks_per_transaction (needs superuser)", "error", err)
+		} else {
+			e.log.Warn("Wrote recommended max_locks_per_transaction to postgresql.auto.conf",
+				"value", recommendedMaxLocks,
+				"action", "RESTART PostgreSQL to apply: sudo systemctl restart postgresql")
+		}
+	} else {
+		e.log.Info("Lock table capacity is sufficient",
+			"total_capacity", totalLockCapacity,
+			"max_locks_per_transaction", currentValue)
+	}
 	return currentValue, nil

View File

@@ -48,12 +48,14 @@ type LinuxChecks struct {
 // PostgreSQLChecks contains PostgreSQL configuration checks
 type PostgreSQLChecks struct {
 	MaxLocksPerTransaction int // Current setting
+	MaxPreparedTransactions int // Current setting (affects lock capacity)
+	TotalLockCapacity int // Calculated: max_locks × (max_connections + max_prepared)
 	MaintenanceWorkMem string // Current setting
 	SharedBuffers string // Current setting (info only)
 	MaxConnections int // Current setting
 	Version string // PostgreSQL version
 	IsSuperuser bool // Can we modify settings?
 }
 // ArchiveChecks contains analysis of the backup archive
@@ -201,6 +203,29 @@ func (e *Engine) checkPostgreSQL(ctx context.Context, result *PreflightResult) {
 		result.PostgreSQL.IsSuperuser = isSuperuser
 	}
+
+	// Check max_prepared_transactions for lock capacity calculation
+	var maxPreparedTxns string
+	if err := db.QueryRowContext(ctx, "SHOW max_prepared_transactions").Scan(&maxPreparedTxns); err == nil {
+		result.PostgreSQL.MaxPreparedTransactions, _ = strconv.Atoi(maxPreparedTxns)
+	}
+
+	// CRITICAL: Calculate TOTAL lock table capacity
+	// Formula: max_locks_per_transaction × (max_connections + max_prepared_transactions)
+	// This is THE key capacity metric for BLOB-heavy restores
+	maxConns := result.PostgreSQL.MaxConnections
+	if maxConns == 0 {
+		maxConns = 100 // default
+	}
+	maxPrepared := result.PostgreSQL.MaxPreparedTransactions
+	totalLockCapacity := result.PostgreSQL.MaxLocksPerTransaction * (maxConns + maxPrepared)
+	result.PostgreSQL.TotalLockCapacity = totalLockCapacity
+	e.log.Info("PostgreSQL lock table capacity",
+		"max_locks_per_transaction", result.PostgreSQL.MaxLocksPerTransaction,
+		"max_connections", maxConns,
+		"max_prepared_transactions", maxPrepared,
+		"total_lock_capacity", totalLockCapacity)
 	// CRITICAL: max_locks_per_transaction requires PostgreSQL RESTART to change!
 	// Warn users loudly about this - it's the #1 cause of "out of shared memory" errors
 	if result.PostgreSQL.MaxLocksPerTransaction < 256 {
@@ -212,10 +237,38 @@ func (e *Engine) checkPostgreSQL(ctx context.Context, result *PreflightResult) {
 		result.Warnings = append(result.Warnings,
 			fmt.Sprintf("max_locks_per_transaction=%d is low (recommend 256+). "+
 				"This setting requires PostgreSQL RESTART to change. "+
-				"BLOB-heavy databases may fail with 'out of shared memory' error.",
+				"BLOB-heavy databases may fail with 'out of shared memory' error. "+
+				"Fix: Edit postgresql.conf, set max_locks_per_transaction=2048, then restart PostgreSQL.",
 				result.PostgreSQL.MaxLocksPerTransaction))
 	}
+
+	// NEW: Check total lock capacity is sufficient for typical BLOB operations
+	// Minimum recommended: 200,000 for moderate BLOB databases
+	minRecommendedCapacity := 200000
+	if totalLockCapacity < minRecommendedCapacity {
+		recommendedMaxLocks := minRecommendedCapacity / (maxConns + maxPrepared)
+		if recommendedMaxLocks < 4096 {
+			recommendedMaxLocks = 4096
+		}
+		e.log.Warn("Total lock table capacity is LOW for BLOB-heavy restores",
+			"current_capacity", totalLockCapacity,
+			"recommended", minRecommendedCapacity,
+			"current_max_locks", result.PostgreSQL.MaxLocksPerTransaction,
+			"current_max_connections", maxConns,
+			"recommended_max_locks", recommendedMaxLocks,
+			"note", "VMs with fewer connections need higher max_locks_per_transaction")
+		result.Warnings = append(result.Warnings,
+			fmt.Sprintf("Total lock capacity=%d is low (recommend %d+). "+
+				"Capacity = max_locks_per_transaction(%d) × max_connections(%d). "+
+				"If you reduced VM size/connections, increase max_locks_per_transaction to %d. "+
+				"Fix: ALTER SYSTEM SET max_locks_per_transaction = %d; then RESTART PostgreSQL.",
+				totalLockCapacity, minRecommendedCapacity,
+				result.PostgreSQL.MaxLocksPerTransaction, maxConns,
+				recommendedMaxLocks, recommendedMaxLocks))
+	}
 	// Parse shared_buffers and warn if very low
 	sharedBuffersMB := parseMemoryToMB(result.PostgreSQL.SharedBuffers)
 	if sharedBuffersMB > 0 && sharedBuffersMB < 256 {
@@ -324,14 +377,57 @@ func (e *Engine) calculateRecommendations(result *PreflightResult) {
 	if result.Archive.TotalBlobCount > 50000 {
 		lockBoost = 16384
 	}
+	if result.Archive.TotalBlobCount > 100000 {
+		lockBoost = 32768
+	}
+	if result.Archive.TotalBlobCount > 200000 {
+		lockBoost = 65536
+	}
-	// Cap at reasonable maximum
-	if lockBoost > 16384 {
-		lockBoost = 16384
-	}
+	// For extreme cases, calculate actual requirement
+	// Rule of thumb: ~1 lock per BLOB, divided by max_connections (default 100)
+	// Add 50% safety margin
+	maxConns := result.PostgreSQL.MaxConnections
+	if maxConns == 0 {
+		maxConns = 100 // default
+	}
+	calculatedLocks := (result.Archive.TotalBlobCount / maxConns) * 3 / 2 // 1.5x safety margin
+	if calculatedLocks > lockBoost {
+		lockBoost = calculatedLocks
+	}
 	result.Archive.RecommendedLockBoost = lockBoost
+
+	// CRITICAL: Check if current max_locks_per_transaction is dangerously low for this BLOB count
+	currentLocks := result.PostgreSQL.MaxLocksPerTransaction
+	if currentLocks > 0 && result.Archive.TotalBlobCount > 0 {
+		// Estimate max BLOBs we can handle: locks * max_connections
+		maxSafeBLOBs := currentLocks * maxConns
+		if result.Archive.TotalBlobCount > maxSafeBLOBs {
+			severity := "WARNING"
+			if result.Archive.TotalBlobCount > maxSafeBLOBs*2 {
+				severity = "CRITICAL"
+				result.CanProceed = false
+			}
+			e.log.Error(fmt.Sprintf("%s: max_locks_per_transaction too low for BLOB count", severity),
+				"current_max_locks", currentLocks,
+				"total_blobs", result.Archive.TotalBlobCount,
+				"max_safe_blobs", maxSafeBLOBs,
+				"recommended_max_locks", lockBoost)
+			result.Errors = append(result.Errors,
+				fmt.Sprintf("%s: Archive contains %s BLOBs but max_locks_per_transaction=%d can only safely handle ~%s. "+
+					"Increase max_locks_per_transaction to %d in postgresql.conf and RESTART PostgreSQL.",
+					severity,
+					humanize.Comma(int64(result.Archive.TotalBlobCount)),
+					currentLocks,
+					humanize.Comma(int64(maxSafeBLOBs)),
+					lockBoost))
+		}
+	}
 	// Log recommendation
 	e.log.Info("Calculated recommended lock boost",
 		"total_blobs", result.Archive.TotalBlobCount,
@@ -365,6 +461,13 @@ func (e *Engine) printPreflightSummary(result *PreflightResult) {
 		humanize.Comma(int64(result.PostgreSQL.MaxLocksPerTransaction)),
 		humanize.Comma(int64(result.Archive.RecommendedLockBoost))),
 		true)
+	printCheck("max_connections", humanize.Comma(int64(result.PostgreSQL.MaxConnections)), true)
+
+	// Show total lock capacity with warning if low
+	totalCapacityOK := result.PostgreSQL.TotalLockCapacity >= 200000
+	printCheck("Total Lock Capacity",
+		fmt.Sprintf("%s (max_locks × max_conns)",
+			humanize.Comma(int64(result.PostgreSQL.TotalLockCapacity))),
+		totalCapacityOK)
 	printCheck("maintenance_work_mem", fmt.Sprintf("%s → 2GB (auto-boost)",
 		result.PostgreSQL.MaintenanceWorkMem), true)
 	printInfo("shared_buffers", result.PostgreSQL.SharedBuffers)
@@ -386,6 +489,14 @@ func (e *Engine) printPreflightSummary(result *PreflightResult) {
 		}
 	}
+
+	// Errors (blocking issues)
+	if len(result.Errors) > 0 {
+		fmt.Println("\n ✗ ERRORS (must fix before proceeding):")
+		for _, e := range result.Errors {
+			fmt.Printf(" • %s\n", e)
+		}
+	}
 	// Warnings
 	if len(result.Warnings) > 0 {
 		fmt.Println("\n ⚠ Warnings:")
@@ -394,6 +505,23 @@ func (e *Engine) printPreflightSummary(result *PreflightResult) {
 		}
 	}
+
+	// Final status
+	fmt.Println()
+	if !result.CanProceed {
+		fmt.Println(" ┌─────────────────────────────────────────────────────────┐")
+		fmt.Println(" │ ✗ PREFLIGHT FAILED - Cannot proceed with restore │")
+		fmt.Println(" │ Fix the errors above and try again. │")
+		fmt.Println(" └─────────────────────────────────────────────────────────┘")
+	} else if len(result.Warnings) > 0 {
+		fmt.Println(" ┌─────────────────────────────────────────────────────────┐")
+		fmt.Println(" │ ⚠ PREFLIGHT PASSED WITH WARNINGS - Proceed with care │")
+		fmt.Println(" └─────────────────────────────────────────────────────────┘")
+	} else {
+		fmt.Println(" ┌─────────────────────────────────────────────────────────┐")
+		fmt.Println(" │ ✓ PREFLIGHT PASSED - Ready to restore │")
+		fmt.Println(" └─────────────────────────────────────────────────────────┘")
+	}
 	fmt.Println(strings.Repeat("─", 60))
 	fmt.Println()
 }

View File

@@ -159,6 +159,10 @@ type sharedProgressState struct {
 	overallPhase int
 	extractionDone bool
+
+	// Weighted progress by database sizes (bytes)
+	dbBytesTotal int64 // Total bytes across all databases
+	dbBytesDone int64 // Bytes completed (sum of finished DB sizes)
 	// Rolling window for speed calculation
 	speedSamples []restoreSpeedSample
 }
@@ -186,12 +190,12 @@ func clearCurrentRestoreProgress() {
 	currentRestoreProgressState = nil
 }
-func getCurrentRestoreProgress() (bytesTotal, bytesDone int64, description string, hasUpdate bool, dbTotal, dbDone int, speed float64, dbPhaseElapsed, dbAvgPerDB time.Duration, currentDB string, overallPhase int, extractionDone bool) {
+func getCurrentRestoreProgress() (bytesTotal, bytesDone int64, description string, hasUpdate bool, dbTotal, dbDone int, speed float64, dbPhaseElapsed, dbAvgPerDB time.Duration, currentDB string, overallPhase int, extractionDone bool, dbBytesTotal, dbBytesDone int64) {
 	currentRestoreProgressMu.Lock()
 	defer currentRestoreProgressMu.Unlock()
 	if currentRestoreProgressState == nil {
-		return 0, 0, "", false, 0, 0, 0, 0, 0, "", 0, false
+		return 0, 0, "", false, 0, 0, 0, 0, 0, "", 0, false, 0, 0
 	}
 	currentRestoreProgressState.mu.Lock()
@@ -205,7 +209,8 @@ func getCurrentRestoreProgress() (bytesTotal, bytesDone int64, description strin
 		currentRestoreProgressState.dbTotal, currentRestoreProgressState.dbDone, speed,
 		currentRestoreProgressState.dbPhaseElapsed, currentRestoreProgressState.dbAvgPerDB,
 		currentRestoreProgressState.currentDB, currentRestoreProgressState.overallPhase,
-		currentRestoreProgressState.extractionDone
+		currentRestoreProgressState.extractionDone,
+		currentRestoreProgressState.dbBytesTotal, currentRestoreProgressState.dbBytesDone
 }
 // calculateRollingSpeed calculates speed from recent samples (last 5 seconds)
@@ -359,6 +364,20 @@ func executeRestoreWithTUIProgress(parentCtx context.Context, cfg *config.Config
 		progressState.bytesDone = 0
 	})
+
+	// Set up weighted (bytes-based) progress callback for accurate cluster restore progress
+	engine.SetDatabaseProgressByBytesCallback(func(bytesDone, bytesTotal int64, dbName string, dbDone, dbTotal int) {
+		progressState.mu.Lock()
+		defer progressState.mu.Unlock()
+		progressState.dbBytesDone = bytesDone
+		progressState.dbBytesTotal = bytesTotal
+		progressState.dbDone = dbDone
+		progressState.dbTotal = dbTotal
+		progressState.currentDB = dbName
+		progressState.overallPhase = 3
+		progressState.extractionDone = true
+		progressState.hasUpdate = true
+	})
 	// Store progress state in a package-level variable for the ticker to access
 	// This is a workaround because tea messages can't be sent from callbacks
 	setCurrentRestoreProgress(progressState)
@@ -412,7 +431,7 @@ func (m RestoreExecutionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
 		m.elapsed = time.Since(m.startTime)
 		// Poll shared progress state for real-time updates
-		bytesTotal, bytesDone, description, hasUpdate, dbTotal, dbDone, speed, dbPhaseElapsed, dbAvgPerDB, currentDB, overallPhase, extractionDone := getCurrentRestoreProgress()
+		bytesTotal, bytesDone, description, hasUpdate, dbTotal, dbDone, speed, dbPhaseElapsed, dbAvgPerDB, currentDB, overallPhase, extractionDone, dbBytesTotal, dbBytesDone := getCurrentRestoreProgress()
 		if hasUpdate && bytesTotal > 0 && !extractionDone {
 			// Phase 1: Extraction
 			m.bytesTotal = bytesTotal
@@ -443,8 +462,16 @@ func (m RestoreExecutionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
 			} else {
 				m.status = "Finalizing..."
 			}
-			m.phase = fmt.Sprintf("Phase 3/3: Databases (%d/%d)", dbDone, dbTotal)
-			m.progress = int((dbDone * 100) / dbTotal)
+			// Use weighted progress by bytes if available, otherwise use count
+			if dbBytesTotal > 0 {
+				weightedPercent := int((dbBytesDone * 100) / dbBytesTotal)
+				m.phase = fmt.Sprintf("Phase 3/3: Databases (%d/%d) - %.1f%% by size", dbDone, dbTotal, float64(dbBytesDone*100)/float64(dbBytesTotal))
+				m.progress = weightedPercent
+			} else {
+				m.phase = fmt.Sprintf("Phase 3/3: Databases (%d/%d)", dbDone, dbTotal)
+				m.progress = int((dbDone * 100) / dbTotal)
+			}
 		} else if hasUpdate && extractionDone && dbTotal == 0 {
 			// Phase 2: Globals restore (brief phase between extraction and databases)
 			m.overallPhase = 2