Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0e050b2def | |||
| 62d58c77af | |||
| c5be9bcd2b | |||
| b120f1507e | |||
| dd1db844ce | |||
| 4ea3ec2cf8 | |||
| 9200024e50 | |||
| 698b8a761c | |||
| dd7c4da0eb | |||
| b2a78cad2a | |||
| 5728b465e6 |
15
CHANGELOG.md
15
CHANGELOG.md
@@ -5,6 +5,21 @@ All notable changes to dbbackup will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [3.42.50] - 2026-01-16 "Ctrl+C Signal Handling Fix"
|
||||
|
||||
### Fixed - Proper Ctrl+C/SIGINT Handling in TUI
|
||||
- **Added tea.InterruptMsg handling** - Bubbletea v1.3+ sends `InterruptMsg` for SIGINT signals
|
||||
instead of a `KeyMsg` with "ctrl+c", causing cancellation to not work
|
||||
- **Fixed cluster restore cancellation** - Ctrl+C now properly cancels running restore operations
|
||||
- **Fixed cluster backup cancellation** - Ctrl+C now properly cancels running backup operations
|
||||
- **Added interrupt handling to main menu** - Proper cleanup on SIGINT from menu
|
||||
- **Orphaned process cleanup** - `cleanup.KillOrphanedProcesses()` called on all interrupt paths
|
||||
|
||||
### Changed
|
||||
- All TUI execution views now handle both `tea.KeyMsg` ("ctrl+c") and `tea.InterruptMsg`
|
||||
- Context cancellation properly propagates to child processes via `exec.CommandContext`
|
||||
- No zombie pg_dump/pg_restore/gzip processes left behind on cancellation
|
||||
|
||||
## [3.42.49] - 2026-01-16 "Unified Cluster Backup Progress"
|
||||
|
||||
### Added - Unified Progress Display for Cluster Backup
|
||||
|
||||
@@ -3,9 +3,9 @@
|
||||
This directory contains pre-compiled binaries for the DB Backup Tool across multiple platforms and architectures.
|
||||
|
||||
## Build Information
|
||||
- **Version**: 3.42.48
|
||||
- **Build Time**: 2026-01-16_14:32:45_UTC
|
||||
- **Git Commit**: 780beaa
|
||||
- **Version**: 3.42.50
|
||||
- **Build Time**: 2026-01-17_12:41:47_UTC
|
||||
- **Git Commit**: 62d58c7
|
||||
|
||||
## Recent Updates (v1.1.0)
|
||||
- ✅ Fixed TUI progress display with line-by-line output
|
||||
|
||||
@@ -28,6 +28,7 @@ var (
|
||||
restoreClean bool
|
||||
restoreCreate bool
|
||||
restoreJobs int
|
||||
restoreParallelDBs int // Number of parallel database restores
|
||||
restoreTarget string
|
||||
restoreVerbose bool
|
||||
restoreNoProgress bool
|
||||
@@ -289,6 +290,7 @@ func init() {
|
||||
restoreClusterCmd.Flags().BoolVar(&restoreForce, "force", false, "Skip safety checks and confirmations")
|
||||
restoreClusterCmd.Flags().BoolVar(&restoreCleanCluster, "clean-cluster", false, "Drop all existing user databases before restore (disaster recovery)")
|
||||
restoreClusterCmd.Flags().IntVar(&restoreJobs, "jobs", 0, "Number of parallel decompression jobs (0 = auto)")
|
||||
restoreClusterCmd.Flags().IntVar(&restoreParallelDBs, "parallel-dbs", 0, "Number of databases to restore in parallel (0 = use config default, 1 = sequential, -1 = auto-detect based on CPU/RAM)")
|
||||
restoreClusterCmd.Flags().StringVar(&restoreWorkdir, "workdir", "", "Working directory for extraction (use when system disk is small, e.g. /mnt/storage/restore_tmp)")
|
||||
restoreClusterCmd.Flags().BoolVar(&restoreVerbose, "verbose", false, "Show detailed restore progress")
|
||||
restoreClusterCmd.Flags().BoolVar(&restoreNoProgress, "no-progress", false, "Disable progress indicators")
|
||||
@@ -783,6 +785,17 @@ func runRestoreCluster(cmd *cobra.Command, args []string) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Override cluster parallelism if --parallel-dbs is specified
|
||||
if restoreParallelDBs == -1 {
|
||||
// Auto-detect optimal parallelism based on system resources
|
||||
autoParallel := restore.CalculateOptimalParallel()
|
||||
cfg.ClusterParallelism = autoParallel
|
||||
log.Info("Auto-detected optimal parallelism for database restores", "parallel_dbs", autoParallel, "mode", "auto")
|
||||
} else if restoreParallelDBs > 0 {
|
||||
cfg.ClusterParallelism = restoreParallelDBs
|
||||
log.Info("Using custom parallelism for database restores", "parallel_dbs", restoreParallelDBs)
|
||||
}
|
||||
|
||||
// Create restore engine
|
||||
engine := restore.New(cfg, log, db)
|
||||
|
||||
|
||||
@@ -94,7 +94,7 @@
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "dbbackup_rpo_seconds{instance=~\"$instance\"} < 86400",
|
||||
"expr": "dbbackup_rpo_seconds{instance=~\"$instance\"} < bool 604800",
|
||||
"legendFormat": "{{database}}",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
@@ -711,19 +711,6 @@
|
||||
},
|
||||
"pluginVersion": "10.2.0",
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "prometheus",
|
||||
"uid": "${DS_PROMETHEUS}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "dbbackup_rpo_seconds{instance=~\"$instance\"} < 86400",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"legendFormat": "__auto",
|
||||
"range": false,
|
||||
"refId": "Status"
|
||||
},
|
||||
{
|
||||
"datasource": {
|
||||
"type": "prometheus",
|
||||
@@ -769,26 +756,30 @@
|
||||
"Time": true,
|
||||
"Time 1": true,
|
||||
"Time 2": true,
|
||||
"Time 3": true,
|
||||
"__name__": true,
|
||||
"__name__ 1": true,
|
||||
"__name__ 2": true,
|
||||
"__name__ 3": true,
|
||||
"instance 1": true,
|
||||
"instance 2": true,
|
||||
"instance 3": true,
|
||||
"job": true,
|
||||
"job 1": true,
|
||||
"job 2": true,
|
||||
"job 3": true
|
||||
"engine 1": true,
|
||||
"engine 2": true
|
||||
},
|
||||
"indexByName": {
|
||||
"Database": 0,
|
||||
"Instance": 1,
|
||||
"Engine": 2,
|
||||
"RPO": 3,
|
||||
"Size": 4
|
||||
},
|
||||
"indexByName": {},
|
||||
"renameByName": {
|
||||
"Value #RPO": "RPO",
|
||||
"Value #Size": "Size",
|
||||
"Value #Status": "Status",
|
||||
"database": "Database",
|
||||
"instance": "Instance"
|
||||
"instance": "Instance",
|
||||
"engine": "Engine"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1275,7 +1266,7 @@
|
||||
"query": "label_values(dbbackup_rpo_seconds, instance)",
|
||||
"refId": "StandardVariableQuery"
|
||||
},
|
||||
"refresh": 1,
|
||||
"refresh": 2,
|
||||
"regex": "",
|
||||
"skipUrlSync": false,
|
||||
"sort": 1,
|
||||
|
||||
@@ -68,8 +68,8 @@ func ClassifyError(errorMsg string) *ErrorClassification {
|
||||
Type: "critical",
|
||||
Category: "locks",
|
||||
Message: errorMsg,
|
||||
Hint: "Lock table exhausted - typically caused by large objects (BLOBs) during restore",
|
||||
Action: "Option 1: Increase max_locks_per_transaction to 1024+ in postgresql.conf (requires restart). Option 2: Update dbbackup and retry - phased restore now auto-enabled for BLOB databases",
|
||||
Hint: "Lock table exhausted. Total capacity = max_locks_per_transaction × (max_connections + max_prepared_transactions). If you reduced VM size or max_connections, you need higher max_locks_per_transaction to compensate.",
|
||||
Action: "Fix: ALTER SYSTEM SET max_locks_per_transaction = 4096; then RESTART PostgreSQL. For smaller VMs with fewer connections, you need higher max_locks_per_transaction values.",
|
||||
Severity: 2,
|
||||
}
|
||||
case "permission_denied":
|
||||
@@ -142,8 +142,8 @@ func ClassifyError(errorMsg string) *ErrorClassification {
|
||||
Type: "critical",
|
||||
Category: "locks",
|
||||
Message: errorMsg,
|
||||
Hint: "Lock table exhausted - typically caused by large objects (BLOBs) during restore",
|
||||
Action: "Option 1: Increase max_locks_per_transaction to 1024+ in postgresql.conf (requires restart). Option 2: Update dbbackup and retry - phased restore now auto-enabled for BLOB databases",
|
||||
Hint: "Lock table exhausted. Total capacity = max_locks_per_transaction × (max_connections + max_prepared_transactions). If you reduced VM size or max_connections, you need higher max_locks_per_transaction to compensate.",
|
||||
Action: "Fix: ALTER SYSTEM SET max_locks_per_transaction = 4096; then RESTART PostgreSQL. For smaller VMs with fewer connections, you need higher max_locks_per_transaction values.",
|
||||
Severity: 2,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"io"
|
||||
mathrand "math/rand"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@@ -100,12 +101,15 @@ func TestChunker_Deterministic(t *testing.T) {
|
||||
|
||||
func TestChunker_ShiftedData(t *testing.T) {
|
||||
// Test that shifted data still shares chunks (the key CDC benefit)
|
||||
// Use deterministic random data for reproducible test results
|
||||
rng := mathrand.New(mathrand.NewSource(42))
|
||||
|
||||
original := make([]byte, 100*1024)
|
||||
rand.Read(original)
|
||||
rng.Read(original)
|
||||
|
||||
// Create shifted version (prepend some bytes)
|
||||
prefix := make([]byte, 1000)
|
||||
rand.Read(prefix)
|
||||
rng.Read(prefix)
|
||||
shifted := append(prefix, original...)
|
||||
|
||||
// Chunk both
|
||||
|
||||
@@ -38,6 +38,10 @@ type DatabaseProgressCallback func(done, total int, dbName string)
|
||||
// Parameters: done count, total count, database name, elapsed time for current restore phase, avg duration per DB
|
||||
type DatabaseProgressWithTimingCallback func(done, total int, dbName string, phaseElapsed, avgPerDB time.Duration)
|
||||
|
||||
// DatabaseProgressByBytesCallback is called with progress weighted by database sizes (bytes)
|
||||
// Parameters: bytes completed, total bytes, current database name, databases done count, total database count
|
||||
type DatabaseProgressByBytesCallback func(bytesDone, bytesTotal int64, dbName string, dbDone, dbTotal int)
|
||||
|
||||
// Engine handles database restore operations
|
||||
type Engine struct {
|
||||
cfg *config.Config
|
||||
@@ -49,9 +53,10 @@ type Engine struct {
|
||||
debugLogPath string // Path to save debug log on error
|
||||
|
||||
// TUI progress callback for detailed progress reporting
|
||||
progressCallback ProgressCallback
|
||||
dbProgressCallback DatabaseProgressCallback
|
||||
dbProgressTimingCallback DatabaseProgressWithTimingCallback
|
||||
progressCallback ProgressCallback
|
||||
dbProgressCallback DatabaseProgressCallback
|
||||
dbProgressTimingCallback DatabaseProgressWithTimingCallback
|
||||
dbProgressByBytesCallback DatabaseProgressByBytesCallback
|
||||
}
|
||||
|
||||
// New creates a new restore engine
|
||||
@@ -122,6 +127,11 @@ func (e *Engine) SetDatabaseProgressWithTimingCallback(cb DatabaseProgressWithTi
|
||||
e.dbProgressTimingCallback = cb
|
||||
}
|
||||
|
||||
// SetDatabaseProgressByBytesCallback sets a callback for progress weighted by database sizes
|
||||
func (e *Engine) SetDatabaseProgressByBytesCallback(cb DatabaseProgressByBytesCallback) {
|
||||
e.dbProgressByBytesCallback = cb
|
||||
}
|
||||
|
||||
// reportProgress safely calls the progress callback if set
|
||||
func (e *Engine) reportProgress(current, total int64, description string) {
|
||||
if e.progressCallback != nil {
|
||||
@@ -143,6 +153,13 @@ func (e *Engine) reportDatabaseProgressWithTiming(done, total int, dbName string
|
||||
}
|
||||
}
|
||||
|
||||
// reportDatabaseProgressByBytes safely calls the bytes-weighted callback if set
|
||||
func (e *Engine) reportDatabaseProgressByBytes(bytesDone, bytesTotal int64, dbName string, dbDone, dbTotal int) {
|
||||
if e.dbProgressByBytesCallback != nil {
|
||||
e.dbProgressByBytesCallback(bytesDone, bytesTotal, dbName, dbDone, dbTotal)
|
||||
}
|
||||
}
|
||||
|
||||
// loggerAdapter adapts our logger to the progress.Logger interface
|
||||
type loggerAdapter struct {
|
||||
logger logger.Logger
|
||||
@@ -861,6 +878,25 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
// Create temporary extraction directory in configured WorkDir
|
||||
workDir := e.cfg.GetEffectiveWorkDir()
|
||||
tempDir := filepath.Join(workDir, fmt.Sprintf(".restore_%d", time.Now().Unix()))
|
||||
|
||||
// Check disk space for extraction (need ~3x archive size: compressed + extracted + working space)
|
||||
if archiveInfo != nil {
|
||||
requiredBytes := uint64(archiveInfo.Size()) * 3
|
||||
extractionCheck := checks.CheckDiskSpace(workDir)
|
||||
if extractionCheck.AvailableBytes < requiredBytes {
|
||||
operation.Fail("Insufficient disk space for extraction")
|
||||
return fmt.Errorf("insufficient disk space for extraction in %s: need %.1f GB, have %.1f GB (archive size: %.1f GB × 3)",
|
||||
workDir,
|
||||
float64(requiredBytes)/(1024*1024*1024),
|
||||
float64(extractionCheck.AvailableBytes)/(1024*1024*1024),
|
||||
float64(archiveInfo.Size())/(1024*1024*1024))
|
||||
}
|
||||
e.log.Info("Disk space check for extraction passed",
|
||||
"workdir", workDir,
|
||||
"required_gb", float64(requiredBytes)/(1024*1024*1024),
|
||||
"available_gb", float64(extractionCheck.AvailableBytes)/(1024*1024*1024))
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
||||
operation.Fail("Failed to create temporary directory")
|
||||
return fmt.Errorf("failed to create temp directory in %s: %w", workDir, err)
|
||||
@@ -874,6 +910,16 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
return fmt.Errorf("failed to extract archive: %w", err)
|
||||
}
|
||||
|
||||
// Check context validity after extraction (debugging context cancellation issues)
|
||||
if ctx.Err() != nil {
|
||||
e.log.Error("Context cancelled after extraction - this should not happen",
|
||||
"context_error", ctx.Err(),
|
||||
"extraction_completed", true)
|
||||
operation.Fail("Context cancelled unexpectedly")
|
||||
return fmt.Errorf("context cancelled after extraction completed: %w", ctx.Err())
|
||||
}
|
||||
e.log.Info("Extraction completed, context still valid")
|
||||
|
||||
// Check if user has superuser privileges (required for ownership restoration)
|
||||
e.progress.Update("Checking privileges...")
|
||||
isSuperuser, err := e.checkSuperuser(ctx)
|
||||
@@ -1024,12 +1070,27 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
var restoreErrorsMu sync.Mutex
|
||||
totalDBs := 0
|
||||
|
||||
// Count total databases
|
||||
// Count total databases and calculate total bytes for weighted progress
|
||||
var totalBytes int64
|
||||
dbSizes := make(map[string]int64) // Map database name to dump file size
|
||||
for _, entry := range entries {
|
||||
if !entry.IsDir() {
|
||||
totalDBs++
|
||||
dumpFile := filepath.Join(dumpsDir, entry.Name())
|
||||
if info, err := os.Stat(dumpFile); err == nil {
|
||||
dbName := entry.Name()
|
||||
dbName = strings.TrimSuffix(dbName, ".dump")
|
||||
dbName = strings.TrimSuffix(dbName, ".sql.gz")
|
||||
dbSizes[dbName] = info.Size()
|
||||
totalBytes += info.Size()
|
||||
}
|
||||
}
|
||||
}
|
||||
e.log.Info("Calculated total restore size", "databases", totalDBs, "total_bytes", totalBytes)
|
||||
|
||||
// Track bytes completed for weighted progress
|
||||
var bytesCompleted int64
|
||||
var bytesCompletedMu sync.Mutex
|
||||
|
||||
// Create ETA estimator for database restores
|
||||
estimator := progress.NewETAEstimator("Restoring cluster", totalDBs)
|
||||
@@ -1057,6 +1118,18 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
var successCount, failCount int32
|
||||
var mu sync.Mutex // Protect shared resources (progress, logger)
|
||||
|
||||
// CRITICAL: Check context before starting database restore loop
|
||||
// This helps debug issues where context gets cancelled between extraction and restore
|
||||
if ctx.Err() != nil {
|
||||
e.log.Error("Context cancelled before database restore loop started",
|
||||
"context_error", ctx.Err(),
|
||||
"total_databases", totalDBs,
|
||||
"parallelism", parallelism)
|
||||
operation.Fail("Context cancelled before database restores could start")
|
||||
return fmt.Errorf("context cancelled before database restore: %w", ctx.Err())
|
||||
}
|
||||
e.log.Info("Starting database restore loop", "databases", totalDBs, "parallelism", parallelism)
|
||||
|
||||
// Timing tracking for restore phase progress
|
||||
restorePhaseStart := time.Now()
|
||||
var completedDBTimes []time.Duration // Track duration for each completed DB restore
|
||||
@@ -1202,7 +1275,21 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
completedDBTimes = append(completedDBTimes, dbRestoreDuration)
|
||||
completedDBTimesMu.Unlock()
|
||||
|
||||
// Update bytes completed for weighted progress
|
||||
dbSize := dbSizes[dbName]
|
||||
bytesCompletedMu.Lock()
|
||||
bytesCompleted += dbSize
|
||||
currentBytesCompleted := bytesCompleted
|
||||
currentSuccessCount := int(atomic.LoadInt32(&successCount)) + 1 // +1 because we're about to increment
|
||||
bytesCompletedMu.Unlock()
|
||||
|
||||
// Report weighted progress (bytes-based)
|
||||
e.reportDatabaseProgressByBytes(currentBytesCompleted, totalBytes, dbName, currentSuccessCount, totalDBs)
|
||||
|
||||
atomic.AddInt32(&successCount, 1)
|
||||
|
||||
// Small delay to ensure PostgreSQL fully closes connections before next restore
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}(dbIndex, entry.Name())
|
||||
|
||||
dbIndex++
|
||||
@@ -2038,9 +2125,10 @@ func (e *Engine) quickValidateSQLDump(archivePath string, compressed bool) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// boostLockCapacity temporarily increases max_locks_per_transaction to prevent OOM
|
||||
// during large restores with many BLOBs. Returns the original value for later reset.
|
||||
// Uses ALTER SYSTEM + pg_reload_conf() so no restart is needed.
|
||||
// boostLockCapacity checks and reports on max_locks_per_transaction capacity.
|
||||
// IMPORTANT: max_locks_per_transaction requires a PostgreSQL RESTART to change!
|
||||
// This function now calculates total lock capacity based on max_connections and
|
||||
// warns the user if capacity is insufficient for the restore.
|
||||
func (e *Engine) boostLockCapacity(ctx context.Context) (int, error) {
|
||||
// Connect to PostgreSQL to run system commands
|
||||
connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=postgres sslmode=disable",
|
||||
@@ -2058,7 +2146,7 @@ func (e *Engine) boostLockCapacity(ctx context.Context) (int, error) {
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Get current value
|
||||
// Get current max_locks_per_transaction
|
||||
var currentValue int
|
||||
err = db.QueryRowContext(ctx, "SHOW max_locks_per_transaction").Scan(¤tValue)
|
||||
if err != nil {
|
||||
@@ -2071,22 +2159,56 @@ func (e *Engine) boostLockCapacity(ctx context.Context) (int, error) {
|
||||
fmt.Sscanf(currentValueStr, "%d", ¤tValue)
|
||||
}
|
||||
|
||||
// Skip if already high enough
|
||||
if currentValue >= 2048 {
|
||||
e.log.Info("max_locks_per_transaction already sufficient", "value", currentValue)
|
||||
return currentValue, nil
|
||||
// Get max_connections to calculate total lock capacity
|
||||
var maxConns int
|
||||
if err := db.QueryRowContext(ctx, "SHOW max_connections").Scan(&maxConns); err != nil {
|
||||
maxConns = 100 // default
|
||||
}
|
||||
|
||||
// Boost to 2048 (enough for most BLOB-heavy databases)
|
||||
_, err = db.ExecContext(ctx, "ALTER SYSTEM SET max_locks_per_transaction = 2048")
|
||||
if err != nil {
|
||||
return currentValue, fmt.Errorf("failed to set max_locks_per_transaction: %w", err)
|
||||
// Get max_prepared_transactions
|
||||
var maxPreparedTxns int
|
||||
if err := db.QueryRowContext(ctx, "SHOW max_prepared_transactions").Scan(&maxPreparedTxns); err != nil {
|
||||
maxPreparedTxns = 0
|
||||
}
|
||||
|
||||
// Reload config without restart
|
||||
_, err = db.ExecContext(ctx, "SELECT pg_reload_conf()")
|
||||
if err != nil {
|
||||
return currentValue, fmt.Errorf("failed to reload config: %w", err)
|
||||
// Calculate total lock table capacity:
|
||||
// Total locks = max_locks_per_transaction × (max_connections + max_prepared_transactions)
|
||||
totalLockCapacity := currentValue * (maxConns + maxPreparedTxns)
|
||||
|
||||
e.log.Info("PostgreSQL lock table capacity",
|
||||
"max_locks_per_transaction", currentValue,
|
||||
"max_connections", maxConns,
|
||||
"max_prepared_transactions", maxPreparedTxns,
|
||||
"total_lock_capacity", totalLockCapacity)
|
||||
|
||||
// Minimum recommended total capacity for BLOB-heavy restores: 200,000 locks
|
||||
minRecommendedCapacity := 200000
|
||||
if totalLockCapacity < minRecommendedCapacity {
|
||||
recommendedMaxLocks := minRecommendedCapacity / (maxConns + maxPreparedTxns)
|
||||
if recommendedMaxLocks < 4096 {
|
||||
recommendedMaxLocks = 4096
|
||||
}
|
||||
|
||||
e.log.Warn("Lock table capacity may be insufficient for BLOB-heavy restores",
|
||||
"current_total_capacity", totalLockCapacity,
|
||||
"recommended_capacity", minRecommendedCapacity,
|
||||
"current_max_locks", currentValue,
|
||||
"recommended_max_locks", recommendedMaxLocks,
|
||||
"note", "max_locks_per_transaction requires PostgreSQL RESTART to change")
|
||||
|
||||
// Write suggested fix to ALTER SYSTEM but warn about restart
|
||||
_, err = db.ExecContext(ctx, fmt.Sprintf("ALTER SYSTEM SET max_locks_per_transaction = %d", recommendedMaxLocks))
|
||||
if err != nil {
|
||||
e.log.Warn("Could not set recommended max_locks_per_transaction (needs superuser)", "error", err)
|
||||
} else {
|
||||
e.log.Warn("Wrote recommended max_locks_per_transaction to postgresql.auto.conf",
|
||||
"value", recommendedMaxLocks,
|
||||
"action", "RESTART PostgreSQL to apply: sudo systemctl restart postgresql")
|
||||
}
|
||||
} else {
|
||||
e.log.Info("Lock table capacity is sufficient",
|
||||
"total_capacity", totalLockCapacity,
|
||||
"max_locks_per_transaction", currentValue)
|
||||
}
|
||||
|
||||
return currentValue, nil
|
||||
|
||||
@@ -16,6 +16,57 @@ import (
|
||||
"github.com/shirou/gopsutil/v3/mem"
|
||||
)
|
||||
|
||||
// CalculateOptimalParallel returns the recommended number of parallel workers
|
||||
// based on available system resources (CPU cores and RAM).
|
||||
// This is a standalone function that can be called from anywhere.
|
||||
// Returns 0 if resources cannot be detected.
|
||||
func CalculateOptimalParallel() int {
|
||||
cpuCores := runtime.NumCPU()
|
||||
|
||||
vmem, err := mem.VirtualMemory()
|
||||
if err != nil {
|
||||
// Fallback: use half of CPU cores if memory detection fails
|
||||
if cpuCores > 1 {
|
||||
return cpuCores / 2
|
||||
}
|
||||
return 1
|
||||
}
|
||||
|
||||
memAvailableGB := float64(vmem.Available) / (1024 * 1024 * 1024)
|
||||
|
||||
// Each pg_restore worker needs approximately 2-4GB of RAM
|
||||
// Use conservative 3GB per worker to avoid OOM
|
||||
const memPerWorkerGB = 3.0
|
||||
|
||||
// Calculate limits
|
||||
maxByMem := int(memAvailableGB / memPerWorkerGB)
|
||||
maxByCPU := cpuCores
|
||||
|
||||
// Use the minimum of memory and CPU limits
|
||||
recommended := maxByMem
|
||||
if maxByCPU < recommended {
|
||||
recommended = maxByCPU
|
||||
}
|
||||
|
||||
// Apply sensible bounds
|
||||
if recommended < 1 {
|
||||
recommended = 1
|
||||
}
|
||||
if recommended > 16 {
|
||||
recommended = 16 // Cap at 16 to avoid diminishing returns
|
||||
}
|
||||
|
||||
// If memory pressure is high (>80%), reduce parallelism
|
||||
if vmem.UsedPercent > 80 && recommended > 1 {
|
||||
recommended = recommended / 2
|
||||
if recommended < 1 {
|
||||
recommended = 1
|
||||
}
|
||||
}
|
||||
|
||||
return recommended
|
||||
}
|
||||
|
||||
// PreflightResult contains all preflight check results
|
||||
type PreflightResult struct {
|
||||
// Linux system checks
|
||||
@@ -35,25 +86,29 @@ type PreflightResult struct {
|
||||
|
||||
// LinuxChecks contains Linux kernel/system checks
|
||||
type LinuxChecks struct {
|
||||
ShmMax int64 // /proc/sys/kernel/shmmax
|
||||
ShmAll int64 // /proc/sys/kernel/shmall
|
||||
MemTotal uint64 // Total RAM in bytes
|
||||
MemAvailable uint64 // Available RAM in bytes
|
||||
MemUsedPercent float64 // Memory usage percentage
|
||||
ShmMaxOK bool // Is shmmax sufficient?
|
||||
ShmAllOK bool // Is shmall sufficient?
|
||||
MemAvailableOK bool // Is available RAM sufficient?
|
||||
IsLinux bool // Are we running on Linux?
|
||||
ShmMax int64 // /proc/sys/kernel/shmmax
|
||||
ShmAll int64 // /proc/sys/kernel/shmall
|
||||
MemTotal uint64 // Total RAM in bytes
|
||||
MemAvailable uint64 // Available RAM in bytes
|
||||
MemUsedPercent float64 // Memory usage percentage
|
||||
CPUCores int // Number of CPU cores
|
||||
RecommendedParallel int // Auto-calculated optimal parallel count
|
||||
ShmMaxOK bool // Is shmmax sufficient?
|
||||
ShmAllOK bool // Is shmall sufficient?
|
||||
MemAvailableOK bool // Is available RAM sufficient?
|
||||
IsLinux bool // Are we running on Linux?
|
||||
}
|
||||
|
||||
// PostgreSQLChecks contains PostgreSQL configuration checks
|
||||
type PostgreSQLChecks struct {
|
||||
MaxLocksPerTransaction int // Current setting
|
||||
MaintenanceWorkMem string // Current setting
|
||||
SharedBuffers string // Current setting (info only)
|
||||
MaxConnections int // Current setting
|
||||
Version string // PostgreSQL version
|
||||
IsSuperuser bool // Can we modify settings?
|
||||
MaxLocksPerTransaction int // Current setting
|
||||
MaxPreparedTransactions int // Current setting (affects lock capacity)
|
||||
TotalLockCapacity int // Calculated: max_locks × (max_connections + max_prepared)
|
||||
MaintenanceWorkMem string // Current setting
|
||||
SharedBuffers string // Current setting (info only)
|
||||
MaxConnections int // Current setting
|
||||
Version string // PostgreSQL version
|
||||
IsSuperuser bool // Can we modify settings?
|
||||
}
|
||||
|
||||
// ArchiveChecks contains analysis of the backup archive
|
||||
@@ -98,6 +153,7 @@ func (e *Engine) RunPreflightChecks(ctx context.Context, dumpsDir string, entrie
|
||||
// checkSystemResources uses gopsutil for cross-platform system checks
|
||||
func (e *Engine) checkSystemResources(result *PreflightResult) {
|
||||
result.Linux.IsLinux = runtime.GOOS == "linux"
|
||||
result.Linux.CPUCores = runtime.NumCPU()
|
||||
|
||||
// Get memory info (works on Linux, macOS, Windows, BSD)
|
||||
if vmem, err := mem.VirtualMemory(); err == nil {
|
||||
@@ -116,6 +172,9 @@ func (e *Engine) checkSystemResources(result *PreflightResult) {
|
||||
e.log.Warn("Could not detect system memory", "error", err)
|
||||
}
|
||||
|
||||
// Calculate recommended parallel based on resources
|
||||
result.Linux.RecommendedParallel = e.calculateRecommendedParallel(result)
|
||||
|
||||
// Linux-specific kernel checks (shmmax, shmall)
|
||||
if result.Linux.IsLinux {
|
||||
e.checkLinuxKernel(result)
|
||||
@@ -201,6 +260,29 @@ func (e *Engine) checkPostgreSQL(ctx context.Context, result *PreflightResult) {
|
||||
result.PostgreSQL.IsSuperuser = isSuperuser
|
||||
}
|
||||
|
||||
// Check max_prepared_transactions for lock capacity calculation
|
||||
var maxPreparedTxns string
|
||||
if err := db.QueryRowContext(ctx, "SHOW max_prepared_transactions").Scan(&maxPreparedTxns); err == nil {
|
||||
result.PostgreSQL.MaxPreparedTransactions, _ = strconv.Atoi(maxPreparedTxns)
|
||||
}
|
||||
|
||||
// CRITICAL: Calculate TOTAL lock table capacity
|
||||
// Formula: max_locks_per_transaction × (max_connections + max_prepared_transactions)
|
||||
// This is THE key capacity metric for BLOB-heavy restores
|
||||
maxConns := result.PostgreSQL.MaxConnections
|
||||
if maxConns == 0 {
|
||||
maxConns = 100 // default
|
||||
}
|
||||
maxPrepared := result.PostgreSQL.MaxPreparedTransactions
|
||||
totalLockCapacity := result.PostgreSQL.MaxLocksPerTransaction * (maxConns + maxPrepared)
|
||||
result.PostgreSQL.TotalLockCapacity = totalLockCapacity
|
||||
|
||||
e.log.Info("PostgreSQL lock table capacity",
|
||||
"max_locks_per_transaction", result.PostgreSQL.MaxLocksPerTransaction,
|
||||
"max_connections", maxConns,
|
||||
"max_prepared_transactions", maxPrepared,
|
||||
"total_lock_capacity", totalLockCapacity)
|
||||
|
||||
// CRITICAL: max_locks_per_transaction requires PostgreSQL RESTART to change!
|
||||
// Warn users loudly about this - it's the #1 cause of "out of shared memory" errors
|
||||
if result.PostgreSQL.MaxLocksPerTransaction < 256 {
|
||||
@@ -212,10 +294,38 @@ func (e *Engine) checkPostgreSQL(ctx context.Context, result *PreflightResult) {
|
||||
result.Warnings = append(result.Warnings,
|
||||
fmt.Sprintf("max_locks_per_transaction=%d is low (recommend 256+). "+
|
||||
"This setting requires PostgreSQL RESTART to change. "+
|
||||
"BLOB-heavy databases may fail with 'out of shared memory' error.",
|
||||
"BLOB-heavy databases may fail with 'out of shared memory' error. "+
|
||||
"Fix: Edit postgresql.conf, set max_locks_per_transaction=2048, then restart PostgreSQL.",
|
||||
result.PostgreSQL.MaxLocksPerTransaction))
|
||||
}
|
||||
|
||||
// NEW: Check total lock capacity is sufficient for typical BLOB operations
|
||||
// Minimum recommended: 200,000 for moderate BLOB databases
|
||||
minRecommendedCapacity := 200000
|
||||
if totalLockCapacity < minRecommendedCapacity {
|
||||
recommendedMaxLocks := minRecommendedCapacity / (maxConns + maxPrepared)
|
||||
if recommendedMaxLocks < 4096 {
|
||||
recommendedMaxLocks = 4096
|
||||
}
|
||||
|
||||
e.log.Warn("Total lock table capacity is LOW for BLOB-heavy restores",
|
||||
"current_capacity", totalLockCapacity,
|
||||
"recommended", minRecommendedCapacity,
|
||||
"current_max_locks", result.PostgreSQL.MaxLocksPerTransaction,
|
||||
"current_max_connections", maxConns,
|
||||
"recommended_max_locks", recommendedMaxLocks,
|
||||
"note", "VMs with fewer connections need higher max_locks_per_transaction")
|
||||
|
||||
result.Warnings = append(result.Warnings,
|
||||
fmt.Sprintf("Total lock capacity=%d is low (recommend %d+). "+
|
||||
"Capacity = max_locks_per_transaction(%d) × max_connections(%d). "+
|
||||
"If you reduced VM size/connections, increase max_locks_per_transaction to %d. "+
|
||||
"Fix: ALTER SYSTEM SET max_locks_per_transaction = %d; then RESTART PostgreSQL.",
|
||||
totalLockCapacity, minRecommendedCapacity,
|
||||
result.PostgreSQL.MaxLocksPerTransaction, maxConns,
|
||||
recommendedMaxLocks, recommendedMaxLocks))
|
||||
}
|
||||
|
||||
// Parse shared_buffers and warn if very low
|
||||
sharedBuffersMB := parseMemoryToMB(result.PostgreSQL.SharedBuffers)
|
||||
if sharedBuffersMB > 0 && sharedBuffersMB < 256 {
|
||||
@@ -324,20 +434,113 @@ func (e *Engine) calculateRecommendations(result *PreflightResult) {
|
||||
if result.Archive.TotalBlobCount > 50000 {
|
||||
lockBoost = 16384
|
||||
}
|
||||
if result.Archive.TotalBlobCount > 100000 {
|
||||
lockBoost = 32768
|
||||
}
|
||||
if result.Archive.TotalBlobCount > 200000 {
|
||||
lockBoost = 65536
|
||||
}
|
||||
|
||||
// Cap at reasonable maximum
|
||||
if lockBoost > 16384 {
|
||||
lockBoost = 16384
|
||||
// For extreme cases, calculate actual requirement
|
||||
// Rule of thumb: ~1 lock per BLOB, divided by max_connections (default 100)
|
||||
// Add 50% safety margin
|
||||
maxConns := result.PostgreSQL.MaxConnections
|
||||
if maxConns == 0 {
|
||||
maxConns = 100 // default
|
||||
}
|
||||
calculatedLocks := (result.Archive.TotalBlobCount / maxConns) * 3 / 2 // 1.5x safety margin
|
||||
if calculatedLocks > lockBoost {
|
||||
lockBoost = calculatedLocks
|
||||
}
|
||||
|
||||
result.Archive.RecommendedLockBoost = lockBoost
|
||||
|
||||
// CRITICAL: Check if current max_locks_per_transaction is dangerously low for this BLOB count
|
||||
currentLocks := result.PostgreSQL.MaxLocksPerTransaction
|
||||
if currentLocks > 0 && result.Archive.TotalBlobCount > 0 {
|
||||
// Estimate max BLOBs we can handle: locks * max_connections
|
||||
maxSafeBLOBs := currentLocks * maxConns
|
||||
|
||||
if result.Archive.TotalBlobCount > maxSafeBLOBs {
|
||||
severity := "WARNING"
|
||||
if result.Archive.TotalBlobCount > maxSafeBLOBs*2 {
|
||||
severity = "CRITICAL"
|
||||
result.CanProceed = false
|
||||
}
|
||||
|
||||
e.log.Error(fmt.Sprintf("%s: max_locks_per_transaction too low for BLOB count", severity),
|
||||
"current_max_locks", currentLocks,
|
||||
"total_blobs", result.Archive.TotalBlobCount,
|
||||
"max_safe_blobs", maxSafeBLOBs,
|
||||
"recommended_max_locks", lockBoost)
|
||||
|
||||
result.Errors = append(result.Errors,
|
||||
fmt.Sprintf("%s: Archive contains %s BLOBs but max_locks_per_transaction=%d can only safely handle ~%s. "+
|
||||
"Increase max_locks_per_transaction to %d in postgresql.conf and RESTART PostgreSQL.",
|
||||
severity,
|
||||
humanize.Comma(int64(result.Archive.TotalBlobCount)),
|
||||
currentLocks,
|
||||
humanize.Comma(int64(maxSafeBLOBs)),
|
||||
lockBoost))
|
||||
}
|
||||
}
|
||||
|
||||
// Log recommendation
|
||||
e.log.Info("Calculated recommended lock boost",
|
||||
"total_blobs", result.Archive.TotalBlobCount,
|
||||
"recommended_locks", lockBoost)
|
||||
}
|
||||
|
||||
// calculateRecommendedParallel determines optimal parallelism based on system resources
|
||||
// Returns the recommended number of parallel workers for pg_restore
|
||||
func (e *Engine) calculateRecommendedParallel(result *PreflightResult) int {
|
||||
cpuCores := result.Linux.CPUCores
|
||||
if cpuCores == 0 {
|
||||
cpuCores = runtime.NumCPU()
|
||||
}
|
||||
|
||||
memAvailableGB := float64(result.Linux.MemAvailable) / (1024 * 1024 * 1024)
|
||||
|
||||
// Each pg_restore worker needs approximately 2-4GB of RAM
|
||||
// Use conservative 3GB per worker to avoid OOM
|
||||
const memPerWorkerGB = 3.0
|
||||
|
||||
// Calculate limits
|
||||
maxByMem := int(memAvailableGB / memPerWorkerGB)
|
||||
maxByCPU := cpuCores
|
||||
|
||||
// Use the minimum of memory and CPU limits
|
||||
recommended := maxByMem
|
||||
if maxByCPU < recommended {
|
||||
recommended = maxByCPU
|
||||
}
|
||||
|
||||
// Apply sensible bounds
|
||||
if recommended < 1 {
|
||||
recommended = 1
|
||||
}
|
||||
if recommended > 16 {
|
||||
recommended = 16 // Cap at 16 to avoid diminishing returns
|
||||
}
|
||||
|
||||
// If memory pressure is high (>80%), reduce parallelism
|
||||
if result.Linux.MemUsedPercent > 80 && recommended > 1 {
|
||||
recommended = recommended / 2
|
||||
if recommended < 1 {
|
||||
recommended = 1
|
||||
}
|
||||
}
|
||||
|
||||
e.log.Info("Calculated recommended parallel",
|
||||
"cpu_cores", cpuCores,
|
||||
"mem_available_gb", fmt.Sprintf("%.1f", memAvailableGB),
|
||||
"max_by_mem", maxByMem,
|
||||
"max_by_cpu", maxByCPU,
|
||||
"recommended", recommended)
|
||||
|
||||
return recommended
|
||||
}
|
||||
|
||||
// printPreflightSummary prints a nice summary of all checks
|
||||
func (e *Engine) printPreflightSummary(result *PreflightResult) {
|
||||
fmt.Println()
|
||||
@@ -350,6 +553,8 @@ func (e *Engine) printPreflightSummary(result *PreflightResult) {
|
||||
printCheck("Total RAM", humanize.Bytes(result.Linux.MemTotal), true)
|
||||
printCheck("Available RAM", humanize.Bytes(result.Linux.MemAvailable), result.Linux.MemAvailableOK || result.Linux.MemAvailable == 0)
|
||||
printCheck("Memory Usage", fmt.Sprintf("%.1f%%", result.Linux.MemUsedPercent), result.Linux.MemUsedPercent < 85)
|
||||
printCheck("CPU Cores", fmt.Sprintf("%d", result.Linux.CPUCores), true)
|
||||
printCheck("Recommended Parallel", fmt.Sprintf("%d (auto-calculated)", result.Linux.RecommendedParallel), true)
|
||||
|
||||
// Linux-specific kernel checks
|
||||
if result.Linux.IsLinux && result.Linux.ShmMax > 0 {
|
||||
@@ -365,6 +570,13 @@ func (e *Engine) printPreflightSummary(result *PreflightResult) {
|
||||
humanize.Comma(int64(result.PostgreSQL.MaxLocksPerTransaction)),
|
||||
humanize.Comma(int64(result.Archive.RecommendedLockBoost))),
|
||||
true)
|
||||
printCheck("max_connections", humanize.Comma(int64(result.PostgreSQL.MaxConnections)), true)
|
||||
// Show total lock capacity with warning if low
|
||||
totalCapacityOK := result.PostgreSQL.TotalLockCapacity >= 200000
|
||||
printCheck("Total Lock Capacity",
|
||||
fmt.Sprintf("%s (max_locks × max_conns)",
|
||||
humanize.Comma(int64(result.PostgreSQL.TotalLockCapacity))),
|
||||
totalCapacityOK)
|
||||
printCheck("maintenance_work_mem", fmt.Sprintf("%s → 2GB (auto-boost)",
|
||||
result.PostgreSQL.MaintenanceWorkMem), true)
|
||||
printInfo("shared_buffers", result.PostgreSQL.SharedBuffers)
|
||||
@@ -386,6 +598,14 @@ func (e *Engine) printPreflightSummary(result *PreflightResult) {
|
||||
}
|
||||
}
|
||||
|
||||
// Errors (blocking issues)
|
||||
if len(result.Errors) > 0 {
|
||||
fmt.Println("\n ✗ ERRORS (must fix before proceeding):")
|
||||
for _, e := range result.Errors {
|
||||
fmt.Printf(" • %s\n", e)
|
||||
}
|
||||
}
|
||||
|
||||
// Warnings
|
||||
if len(result.Warnings) > 0 {
|
||||
fmt.Println("\n ⚠ Warnings:")
|
||||
@@ -394,6 +614,23 @@ func (e *Engine) printPreflightSummary(result *PreflightResult) {
|
||||
}
|
||||
}
|
||||
|
||||
// Final status
|
||||
fmt.Println()
|
||||
if !result.CanProceed {
|
||||
fmt.Println(" ┌─────────────────────────────────────────────────────────┐")
|
||||
fmt.Println(" │ ✗ PREFLIGHT FAILED - Cannot proceed with restore │")
|
||||
fmt.Println(" │ Fix the errors above and try again. │")
|
||||
fmt.Println(" └─────────────────────────────────────────────────────────┘")
|
||||
} else if len(result.Warnings) > 0 {
|
||||
fmt.Println(" ┌─────────────────────────────────────────────────────────┐")
|
||||
fmt.Println(" │ ⚠ PREFLIGHT PASSED WITH WARNINGS - Proceed with care │")
|
||||
fmt.Println(" └─────────────────────────────────────────────────────────┘")
|
||||
} else {
|
||||
fmt.Println(" ┌─────────────────────────────────────────────────────────┐")
|
||||
fmt.Println(" │ ✓ PREFLIGHT PASSED - Ready to restore │")
|
||||
fmt.Println(" └─────────────────────────────────────────────────────────┘")
|
||||
}
|
||||
|
||||
fmt.Println(strings.Repeat("─", 60))
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
@@ -295,6 +295,20 @@ func (m BackupExecutionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
|
||||
}
|
||||
return m, nil
|
||||
|
||||
case tea.InterruptMsg:
|
||||
// Handle Ctrl+C signal (SIGINT) - Bubbletea v1.3+ sends this instead of KeyMsg for ctrl+c
|
||||
if !m.done && !m.cancelling {
|
||||
m.cancelling = true
|
||||
m.status = "[STOP] Cancelling backup... (please wait)"
|
||||
if m.cancel != nil {
|
||||
m.cancel()
|
||||
}
|
||||
return m, nil
|
||||
} else if m.done {
|
||||
return m.parent, tea.Quit
|
||||
}
|
||||
return m, nil
|
||||
|
||||
case tea.KeyMsg:
|
||||
switch msg.String() {
|
||||
case "ctrl+c", "esc":
|
||||
@@ -441,64 +455,61 @@ func (m BackupExecutionModel) View() string {
|
||||
// Show completion summary with detailed stats
|
||||
if m.err != nil {
|
||||
s.WriteString("\n")
|
||||
s.WriteString(errorStyle.Render(" ╔══════════════════════════════════════════════════════════╗"))
|
||||
s.WriteString(errorStyle.Render("╔══════════════════════════════════════════════════════════════╗"))
|
||||
s.WriteString("\n")
|
||||
s.WriteString(errorStyle.Render(" ║ [FAIL] BACKUP FAILED ║"))
|
||||
s.WriteString(errorStyle.Render("║ [FAIL] BACKUP FAILED ║"))
|
||||
s.WriteString("\n")
|
||||
s.WriteString(errorStyle.Render(" ╚══════════════════════════════════════════════════════════╝"))
|
||||
s.WriteString(errorStyle.Render("╚══════════════════════════════════════════════════════════════╝"))
|
||||
s.WriteString("\n\n")
|
||||
s.WriteString(errorStyle.Render(fmt.Sprintf(" Error: %v", m.err)))
|
||||
s.WriteString(errorStyle.Render(fmt.Sprintf(" Error: %v", m.err)))
|
||||
s.WriteString("\n")
|
||||
} else {
|
||||
s.WriteString(successStyle.Render("╔══════════════════════════════════════════════════════════════╗"))
|
||||
s.WriteString("\n")
|
||||
s.WriteString(successStyle.Render(" ╔══════════════════════════════════════════════════════════╗"))
|
||||
s.WriteString(successStyle.Render("║ [OK] BACKUP COMPLETED SUCCESSFULLY ║"))
|
||||
s.WriteString("\n")
|
||||
s.WriteString(successStyle.Render(" ║ [OK] BACKUP COMPLETED SUCCESSFULLY ║"))
|
||||
s.WriteString("\n")
|
||||
s.WriteString(successStyle.Render(" ╚══════════════════════════════════════════════════════════╝"))
|
||||
s.WriteString(successStyle.Render("╚══════════════════════════════════════════════════════════════╝"))
|
||||
s.WriteString("\n\n")
|
||||
|
||||
// Summary section
|
||||
s.WriteString(infoStyle.Render(" ─── Summary ─────────────────────────────────────────────"))
|
||||
s.WriteString(infoStyle.Render(" ─── Summary ───────────────────────────────────────────────"))
|
||||
s.WriteString("\n\n")
|
||||
|
||||
// Backup type specific info
|
||||
switch m.backupType {
|
||||
case "cluster":
|
||||
s.WriteString(" Type: Cluster Backup\n")
|
||||
s.WriteString(" Type: Cluster Backup\n")
|
||||
if m.dbTotal > 0 {
|
||||
s.WriteString(fmt.Sprintf(" Databases: %d backed up\n", m.dbTotal))
|
||||
s.WriteString(fmt.Sprintf(" Databases: %d backed up\n", m.dbTotal))
|
||||
}
|
||||
case "single":
|
||||
s.WriteString(" Type: Single Database Backup\n")
|
||||
s.WriteString(fmt.Sprintf(" Database: %s\n", m.databaseName))
|
||||
s.WriteString(" Type: Single Database Backup\n")
|
||||
s.WriteString(fmt.Sprintf(" Database: %s\n", m.databaseName))
|
||||
case "sample":
|
||||
s.WriteString(" Type: Sample Backup\n")
|
||||
s.WriteString(fmt.Sprintf(" Database: %s\n", m.databaseName))
|
||||
s.WriteString(fmt.Sprintf(" Sample Ratio: %d\n", m.ratio))
|
||||
s.WriteString(" Type: Sample Backup\n")
|
||||
s.WriteString(fmt.Sprintf(" Database: %s\n", m.databaseName))
|
||||
s.WriteString(fmt.Sprintf(" Sample Ratio: %d\n", m.ratio))
|
||||
}
|
||||
|
||||
s.WriteString("\n")
|
||||
|
||||
// Timing section
|
||||
s.WriteString(infoStyle.Render(" ─── Timing ──────────────────────────────────────────────"))
|
||||
s.WriteString("\n\n")
|
||||
|
||||
elapsed := time.Since(m.startTime)
|
||||
s.WriteString(fmt.Sprintf(" Total Time: %s\n", formatBackupDuration(elapsed)))
|
||||
|
||||
if m.backupType == "cluster" && m.dbTotal > 0 {
|
||||
avgPerDB := elapsed / time.Duration(m.dbTotal)
|
||||
s.WriteString(fmt.Sprintf(" Avg per DB: %s\n", formatBackupDuration(avgPerDB)))
|
||||
}
|
||||
|
||||
s.WriteString("\n")
|
||||
s.WriteString(infoStyle.Render(" ─────────────────────────────────────────────────────────"))
|
||||
s.WriteString("\n")
|
||||
}
|
||||
|
||||
// Timing section (always shown, consistent with restore)
|
||||
s.WriteString(infoStyle.Render(" ─── Timing ────────────────────────────────────────────────"))
|
||||
s.WriteString("\n\n")
|
||||
|
||||
elapsed := time.Since(m.startTime)
|
||||
s.WriteString(fmt.Sprintf(" Total Time: %s\n", formatBackupDuration(elapsed)))
|
||||
|
||||
if m.backupType == "cluster" && m.dbTotal > 0 && m.err == nil {
|
||||
avgPerDB := elapsed / time.Duration(m.dbTotal)
|
||||
s.WriteString(fmt.Sprintf(" Avg per DB: %s\n", formatBackupDuration(avgPerDB)))
|
||||
}
|
||||
|
||||
s.WriteString("\n")
|
||||
s.WriteString(" [KEY] Press Enter or ESC to return to menu\n")
|
||||
s.WriteString(infoStyle.Render(" ───────────────────────────────────────────────────────────"))
|
||||
s.WriteString("\n\n")
|
||||
s.WriteString(infoStyle.Render(" [KEYS] Press Enter to continue"))
|
||||
}
|
||||
|
||||
return s.String()
|
||||
|
||||
@@ -188,6 +188,21 @@ func (m *MenuModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
|
||||
}
|
||||
return m, nil
|
||||
|
||||
case tea.InterruptMsg:
|
||||
// Handle Ctrl+C signal (SIGINT) - Bubbletea v1.3+ sends this
|
||||
if m.cancel != nil {
|
||||
m.cancel()
|
||||
}
|
||||
|
||||
// Clean up any orphaned processes before exit
|
||||
m.logger.Info("Cleaning up processes before exit (SIGINT)")
|
||||
if err := cleanup.KillOrphanedProcesses(m.logger); err != nil {
|
||||
m.logger.Warn("Failed to clean up all processes", "error", err)
|
||||
}
|
||||
|
||||
m.quitting = true
|
||||
return m, tea.Quit
|
||||
|
||||
case tea.KeyMsg:
|
||||
switch msg.String() {
|
||||
case "ctrl+c", "q":
|
||||
|
||||
@@ -159,6 +159,10 @@ type sharedProgressState struct {
|
||||
overallPhase int
|
||||
extractionDone bool
|
||||
|
||||
// Weighted progress by database sizes (bytes)
|
||||
dbBytesTotal int64 // Total bytes across all databases
|
||||
dbBytesDone int64 // Bytes completed (sum of finished DB sizes)
|
||||
|
||||
// Rolling window for speed calculation
|
||||
speedSamples []restoreSpeedSample
|
||||
}
|
||||
@@ -186,12 +190,12 @@ func clearCurrentRestoreProgress() {
|
||||
currentRestoreProgressState = nil
|
||||
}
|
||||
|
||||
func getCurrentRestoreProgress() (bytesTotal, bytesDone int64, description string, hasUpdate bool, dbTotal, dbDone int, speed float64, dbPhaseElapsed, dbAvgPerDB time.Duration, currentDB string, overallPhase int, extractionDone bool) {
|
||||
func getCurrentRestoreProgress() (bytesTotal, bytesDone int64, description string, hasUpdate bool, dbTotal, dbDone int, speed float64, dbPhaseElapsed, dbAvgPerDB time.Duration, currentDB string, overallPhase int, extractionDone bool, dbBytesTotal, dbBytesDone int64) {
|
||||
currentRestoreProgressMu.Lock()
|
||||
defer currentRestoreProgressMu.Unlock()
|
||||
|
||||
if currentRestoreProgressState == nil {
|
||||
return 0, 0, "", false, 0, 0, 0, 0, 0, "", 0, false
|
||||
return 0, 0, "", false, 0, 0, 0, 0, 0, "", 0, false, 0, 0
|
||||
}
|
||||
|
||||
currentRestoreProgressState.mu.Lock()
|
||||
@@ -205,7 +209,8 @@ func getCurrentRestoreProgress() (bytesTotal, bytesDone int64, description strin
|
||||
currentRestoreProgressState.dbTotal, currentRestoreProgressState.dbDone, speed,
|
||||
currentRestoreProgressState.dbPhaseElapsed, currentRestoreProgressState.dbAvgPerDB,
|
||||
currentRestoreProgressState.currentDB, currentRestoreProgressState.overallPhase,
|
||||
currentRestoreProgressState.extractionDone
|
||||
currentRestoreProgressState.extractionDone,
|
||||
currentRestoreProgressState.dbBytesTotal, currentRestoreProgressState.dbBytesDone
|
||||
}
|
||||
|
||||
// calculateRollingSpeed calculates speed from recent samples (last 5 seconds)
|
||||
@@ -359,6 +364,20 @@ func executeRestoreWithTUIProgress(parentCtx context.Context, cfg *config.Config
|
||||
progressState.bytesDone = 0
|
||||
})
|
||||
|
||||
// Set up weighted (bytes-based) progress callback for accurate cluster restore progress
|
||||
engine.SetDatabaseProgressByBytesCallback(func(bytesDone, bytesTotal int64, dbName string, dbDone, dbTotal int) {
|
||||
progressState.mu.Lock()
|
||||
defer progressState.mu.Unlock()
|
||||
progressState.dbBytesDone = bytesDone
|
||||
progressState.dbBytesTotal = bytesTotal
|
||||
progressState.dbDone = dbDone
|
||||
progressState.dbTotal = dbTotal
|
||||
progressState.currentDB = dbName
|
||||
progressState.overallPhase = 3
|
||||
progressState.extractionDone = true
|
||||
progressState.hasUpdate = true
|
||||
})
|
||||
|
||||
// Store progress state in a package-level variable for the ticker to access
|
||||
// This is a workaround because tea messages can't be sent from callbacks
|
||||
setCurrentRestoreProgress(progressState)
|
||||
@@ -412,7 +431,7 @@ func (m RestoreExecutionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
|
||||
m.elapsed = time.Since(m.startTime)
|
||||
|
||||
// Poll shared progress state for real-time updates
|
||||
bytesTotal, bytesDone, description, hasUpdate, dbTotal, dbDone, speed, dbPhaseElapsed, dbAvgPerDB, currentDB, overallPhase, extractionDone := getCurrentRestoreProgress()
|
||||
bytesTotal, bytesDone, description, hasUpdate, dbTotal, dbDone, speed, dbPhaseElapsed, dbAvgPerDB, currentDB, overallPhase, extractionDone, dbBytesTotal, dbBytesDone := getCurrentRestoreProgress()
|
||||
if hasUpdate && bytesTotal > 0 && !extractionDone {
|
||||
// Phase 1: Extraction
|
||||
m.bytesTotal = bytesTotal
|
||||
@@ -443,8 +462,16 @@ func (m RestoreExecutionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
|
||||
} else {
|
||||
m.status = "Finalizing..."
|
||||
}
|
||||
m.phase = fmt.Sprintf("Phase 3/3: Databases (%d/%d)", dbDone, dbTotal)
|
||||
m.progress = int((dbDone * 100) / dbTotal)
|
||||
|
||||
// Use weighted progress by bytes if available, otherwise use count
|
||||
if dbBytesTotal > 0 {
|
||||
weightedPercent := int((dbBytesDone * 100) / dbBytesTotal)
|
||||
m.phase = fmt.Sprintf("Phase 3/3: Databases (%d/%d) - %.1f%% by size", dbDone, dbTotal, float64(dbBytesDone*100)/float64(dbBytesTotal))
|
||||
m.progress = weightedPercent
|
||||
} else {
|
||||
m.phase = fmt.Sprintf("Phase 3/3: Databases (%d/%d)", dbDone, dbTotal)
|
||||
m.progress = int((dbDone * 100) / dbTotal)
|
||||
}
|
||||
} else if hasUpdate && extractionDone && dbTotal == 0 {
|
||||
// Phase 2: Globals restore (brief phase between extraction and databases)
|
||||
m.overallPhase = 2
|
||||
@@ -536,6 +563,21 @@ func (m RestoreExecutionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
|
||||
}
|
||||
return m, nil
|
||||
|
||||
case tea.InterruptMsg:
|
||||
// Handle Ctrl+C signal (SIGINT) - Bubbletea v1.3+ sends this instead of KeyMsg for ctrl+c
|
||||
if !m.done && !m.cancelling {
|
||||
m.cancelling = true
|
||||
m.status = "[STOP] Cancelling restore... (please wait)"
|
||||
m.phase = "Cancelling"
|
||||
if m.cancel != nil {
|
||||
m.cancel()
|
||||
}
|
||||
return m, nil
|
||||
} else if m.done {
|
||||
return m.parent, tea.Quit
|
||||
}
|
||||
return m, nil
|
||||
|
||||
case tea.KeyMsg:
|
||||
switch msg.String() {
|
||||
case "ctrl+c", "esc":
|
||||
|
||||
Reference in New Issue
Block a user