v3.42.9: Fix all timeout bugs and deadlocks
CRITICAL FIXES:
- Encryption detection false positive (IsBackupEncrypted returned true for ALL files)
- 12 cmd.Wait() deadlocks fixed with channel-based context handling
- TUI timeout bugs: 60s -> 10min for safety checks, 15s -> 60s for DB listing
- diagnose.go timeouts: 60s -> 5min for tar/pg_restore operations
- Panic recovery added to parallel backup/restore goroutines
- Variable shadowing fix in restore/engine.go

These bugs caused pg_dump backups to fail through the TUI for months.
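For reference, a minimal sketch of the channel-based wait pattern these fixes apply (the helper name runWithContext is illustrative, not code from this diff): run cmd.Wait() in a goroutine that reports on a buffered channel, then select between that channel and ctx.Done(), killing the process on cancellation so the wait can never block forever. The parallel cluster goroutines additionally defer a recover() so a panic while handling one database cannot crash the whole run.

package main

import (
	"context"
	"fmt"
	"os/exec"
	"time"
)

// runWithContext waits for cmd without risking a permanent block: Wait()
// runs in a goroutine, and cancellation kills the process so that
// goroutine can always finish.
func runWithContext(ctx context.Context, cmd *exec.Cmd) error {
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("failed to start command: %w", err)
	}

	done := make(chan error, 1) // buffered so the goroutine never leaks
	go func() {
		done <- cmd.Wait()
	}()

	select {
	case err := <-done:
		return err // command finished on its own (success or failure)
	case <-ctx.Done():
		cmd.Process.Kill() // unblock Wait()
		<-done             // reap the process before returning
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	if err := runWithContext(ctx, exec.Command("sleep", "5")); err != nil {
		fmt.Println("command failed:", err) // context deadline exceeded
	}
}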
@@ -2,12 +2,14 @@ package auth

import (
	"bufio"
	"context"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"dbbackup/internal/config"
)

@@ -69,7 +71,10 @@ func checkPgHbaConf(user string) AuthMethod {

// findHbaFileViaPostgres asks PostgreSQL for the hba_file location
func findHbaFileViaPostgres() string {
	cmd := exec.Command("psql", "-U", "postgres", "-t", "-c", "SHOW hba_file;")
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	cmd := exec.CommandContext(ctx, "psql", "-U", "postgres", "-t", "-c", "SHOW hba_file;")
	output, err := cmd.Output()
	if err != nil {
		return ""

@@ -82,8 +87,11 @@ func parsePgHbaConf(path string, user string) AuthMethod {
	// Try with sudo if we can't read directly
	file, err := os.Open(path)
	if err != nil {
		// Try with sudo
		cmd := exec.Command("sudo", "cat", path)
		// Try with sudo (with timeout)
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		cmd := exec.CommandContext(ctx, "sudo", "cat", path)
		output, err := cmd.Output()
		if err != nil {
			return AuthUnknown

@@ -87,20 +87,46 @@ func IsBackupEncrypted(backupPath string) bool {
		return meta.Encrypted
	}

	// Fallback: check if file starts with encryption nonce
	// No metadata found - check file format to determine if encrypted
	// Known unencrypted formats have specific magic bytes:
	// - Gzip: 1f 8b
	// - PGDMP (PostgreSQL custom): 50 47 44 4d 50 (PGDMP)
	// - Plain SQL: starts with text (-- or SET or CREATE)
	// - Tar: 75 73 74 61 72 (ustar) at offset 257
	//
	// If file doesn't match any known format, it MIGHT be encrypted,
	// but we return false to avoid false positives. User must provide
	// metadata file or use --encrypt flag explicitly.
	file, err := os.Open(backupPath)
	if err != nil {
		return false
	}
	defer file.Close()

	// Try to read nonce - if it succeeds, likely encrypted
	nonce := make([]byte, crypto.NonceSize)
	if n, err := file.Read(nonce); err != nil || n != crypto.NonceSize {
	header := make([]byte, 6)
	if n, err := file.Read(header); err != nil || n < 2 {
		return false
	}

	return true
	// Check for known unencrypted formats
	// Gzip magic: 1f 8b
	if header[0] == 0x1f && header[1] == 0x8b {
		return false // Gzip compressed - not encrypted
	}

	// PGDMP magic (PostgreSQL custom format)
	if len(header) >= 5 && string(header[:5]) == "PGDMP" {
		return false // PostgreSQL custom dump - not encrypted
	}

	// Plain text SQL (starts with --, SET, CREATE, etc.)
	if header[0] == '-' || header[0] == 'S' || header[0] == 'C' || header[0] == '/' {
		return false // Plain text SQL - not encrypted
	}

	// Without metadata, we cannot reliably determine encryption status
	// Return false to avoid blocking restores with false positives
	return false
}

// DecryptBackupFile decrypts an encrypted backup file

||||
@@ -443,6 +443,14 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
defer wg.Done()
|
||||
defer func() { <-semaphore }() // Release
|
||||
|
||||
// Panic recovery - prevent one database failure from crashing entire cluster backup
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
e.log.Error("Panic in database backup goroutine", "database", name, "panic", r)
|
||||
atomic.AddInt32(&failCount, 1)
|
||||
}
|
||||
}()
|
||||
|
||||
// Check for cancellation at start of goroutine
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -502,26 +510,10 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
|
||||
cmd := e.db.BuildBackupCommand(name, dumpFile, options)
|
||||
|
||||
// Calculate timeout based on database size:
|
||||
// - Minimum 2 hours for small databases
|
||||
// - Add 1 hour per 20GB for large databases
|
||||
// - This allows ~69GB database to take up to 5+ hours
|
||||
timeout := 2 * time.Hour
|
||||
if size, err := e.db.GetDatabaseSize(ctx, name); err == nil {
|
||||
sizeGB := size / (1024 * 1024 * 1024)
|
||||
if sizeGB > 20 {
|
||||
extraHours := (sizeGB / 20) + 1
|
||||
timeout = time.Duration(2+extraHours) * time.Hour
|
||||
mu.Lock()
|
||||
e.printf(" Extended timeout: %v (for %dGB database)\n", timeout, sizeGB)
|
||||
mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
dbCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
err := e.executeCommand(dbCtx, cmd, dumpFile)
|
||||
cancel()
|
||||
// NO TIMEOUT for individual database backups
|
||||
// Large databases with large objects can take many hours
|
||||
// The parent context handles cancellation if needed
|
||||
err := e.executeCommand(ctx, cmd, dumpFile)
|
||||
|
||||
if err != nil {
|
||||
e.log.Warn("Failed to backup database", "database", name, "error", err)
|
||||
@@ -614,12 +606,36 @@ func (e *Engine) executeCommandWithProgress(ctx context.Context, cmdArgs []strin
|
||||
return fmt.Errorf("failed to start command: %w", err)
|
||||
}
|
||||
|
||||
// Monitor progress via stderr
|
||||
go e.monitorCommandProgress(stderr, tracker)
|
||||
// Monitor progress via stderr in goroutine
|
||||
stderrDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(stderrDone)
|
||||
e.monitorCommandProgress(stderr, tracker)
|
||||
}()
|
||||
|
||||
// Wait for command to complete
|
||||
if err := cmd.Wait(); err != nil {
|
||||
return fmt.Errorf("backup command failed: %w", err)
|
||||
// Wait for command to complete with proper context handling
|
||||
cmdDone := make(chan error, 1)
|
||||
go func() {
|
||||
cmdDone <- cmd.Wait()
|
||||
}()
|
||||
|
||||
var cmdErr error
|
||||
select {
|
||||
case cmdErr = <-cmdDone:
|
||||
// Command completed (success or failure)
|
||||
case <-ctx.Done():
|
||||
// Context cancelled - kill process to unblock
|
||||
e.log.Warn("Backup cancelled - killing process")
|
||||
cmd.Process.Kill()
|
||||
<-cmdDone // Wait for goroutine to finish
|
||||
cmdErr = ctx.Err()
|
||||
}
|
||||
|
||||
// Wait for stderr reader to finish
|
||||
<-stderrDone
|
||||
|
||||
if cmdErr != nil {
|
||||
return fmt.Errorf("backup command failed: %w", cmdErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -696,8 +712,12 @@ func (e *Engine) executeMySQLWithProgressAndCompression(ctx context.Context, cmd
|
||||
return fmt.Errorf("failed to get stderr pipe: %w", err)
|
||||
}
|
||||
|
||||
// Start monitoring progress
|
||||
go e.monitorCommandProgress(stderr, tracker)
|
||||
// Start monitoring progress in goroutine
|
||||
stderrDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(stderrDone)
|
||||
e.monitorCommandProgress(stderr, tracker)
|
||||
}()
|
||||
|
||||
// Start both commands
|
||||
if err := gzipCmd.Start(); err != nil {
|
||||
@@ -705,20 +725,41 @@ func (e *Engine) executeMySQLWithProgressAndCompression(ctx context.Context, cmd
|
||||
}
|
||||
|
||||
if err := dumpCmd.Start(); err != nil {
|
||||
gzipCmd.Process.Kill()
|
||||
return fmt.Errorf("failed to start mysqldump: %w", err)
|
||||
}
|
||||
|
||||
// Wait for mysqldump to complete
|
||||
if err := dumpCmd.Wait(); err != nil {
|
||||
return fmt.Errorf("mysqldump failed: %w", err)
|
||||
// Wait for mysqldump with context handling
|
||||
dumpDone := make(chan error, 1)
|
||||
go func() {
|
||||
dumpDone <- dumpCmd.Wait()
|
||||
}()
|
||||
|
||||
var dumpErr error
|
||||
select {
|
||||
case dumpErr = <-dumpDone:
|
||||
// mysqldump completed
|
||||
case <-ctx.Done():
|
||||
e.log.Warn("Backup cancelled - killing mysqldump")
|
||||
dumpCmd.Process.Kill()
|
||||
gzipCmd.Process.Kill()
|
||||
<-dumpDone
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
// Wait for stderr reader
|
||||
<-stderrDone
|
||||
|
||||
// Close pipe and wait for gzip
|
||||
pipe.Close()
|
||||
if err := gzipCmd.Wait(); err != nil {
|
||||
return fmt.Errorf("gzip failed: %w", err)
|
||||
}
|
||||
|
||||
if dumpErr != nil {
|
||||
return fmt.Errorf("mysqldump failed: %w", dumpErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -749,19 +790,45 @@ func (e *Engine) executeMySQLWithCompression(ctx context.Context, cmdArgs []stri
|
||||
gzipCmd.Stdin = stdin
|
||||
gzipCmd.Stdout = outFile
|
||||
|
||||
// Start both commands
|
||||
// Start gzip first
|
||||
if err := gzipCmd.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start gzip: %w", err)
|
||||
}
|
||||
|
||||
if err := dumpCmd.Run(); err != nil {
|
||||
return fmt.Errorf("mysqldump failed: %w", err)
|
||||
// Start mysqldump
|
||||
if err := dumpCmd.Start(); err != nil {
|
||||
gzipCmd.Process.Kill()
|
||||
return fmt.Errorf("failed to start mysqldump: %w", err)
|
||||
}
|
||||
|
||||
// Wait for mysqldump with context handling
|
||||
dumpDone := make(chan error, 1)
|
||||
go func() {
|
||||
dumpDone <- dumpCmd.Wait()
|
||||
}()
|
||||
|
||||
var dumpErr error
|
||||
select {
|
||||
case dumpErr = <-dumpDone:
|
||||
// mysqldump completed
|
||||
case <-ctx.Done():
|
||||
e.log.Warn("Backup cancelled - killing mysqldump")
|
||||
dumpCmd.Process.Kill()
|
||||
gzipCmd.Process.Kill()
|
||||
<-dumpDone
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
// Close pipe and wait for gzip
|
||||
stdin.Close()
|
||||
if err := gzipCmd.Wait(); err != nil {
|
||||
return fmt.Errorf("gzip failed: %w", err)
|
||||
}
|
||||
|
||||
if dumpErr != nil {
|
||||
return fmt.Errorf("mysqldump failed: %w", dumpErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -898,15 +965,46 @@ func (e *Engine) createArchive(ctx context.Context, sourceDir, outputFile string
|
||||
goto regularTar
|
||||
}
|
||||
|
||||
// Wait for tar to finish
|
||||
if err := cmd.Wait(); err != nil {
|
||||
// Wait for tar with proper context handling
|
||||
tarDone := make(chan error, 1)
|
||||
go func() {
|
||||
tarDone <- cmd.Wait()
|
||||
}()
|
||||
|
||||
var tarErr error
|
||||
select {
|
||||
case tarErr = <-tarDone:
|
||||
// tar completed
|
||||
case <-ctx.Done():
|
||||
e.log.Warn("Archive creation cancelled - killing processes")
|
||||
cmd.Process.Kill()
|
||||
pigzCmd.Process.Kill()
|
||||
return fmt.Errorf("tar failed: %w", err)
|
||||
<-tarDone
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
// Wait for pigz to finish
|
||||
if err := pigzCmd.Wait(); err != nil {
|
||||
return fmt.Errorf("pigz compression failed: %w", err)
|
||||
if tarErr != nil {
|
||||
pigzCmd.Process.Kill()
|
||||
return fmt.Errorf("tar failed: %w", tarErr)
|
||||
}
|
||||
|
||||
// Wait for pigz with proper context handling
|
||||
pigzDone := make(chan error, 1)
|
||||
go func() {
|
||||
pigzDone <- pigzCmd.Wait()
|
||||
}()
|
||||
|
||||
var pigzErr error
|
||||
select {
|
||||
case pigzErr = <-pigzDone:
|
||||
case <-ctx.Done():
|
||||
pigzCmd.Process.Kill()
|
||||
<-pigzDone
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
if pigzErr != nil {
|
||||
return fmt.Errorf("pigz compression failed: %w", pigzErr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1251,8 +1349,10 @@ func (e *Engine) executeCommand(ctx context.Context, cmdArgs []string, outputFil
|
||||
return fmt.Errorf("failed to start backup command: %w", err)
|
||||
}
|
||||
|
||||
// Stream stderr output (don't buffer it all in memory)
|
||||
// Stream stderr output in goroutine (don't buffer it all in memory)
|
||||
stderrDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(stderrDone)
|
||||
scanner := bufio.NewScanner(stderr)
|
||||
scanner.Buffer(make([]byte, 64*1024), 1024*1024) // 1MB max line size
|
||||
for scanner.Scan() {
|
||||
@@ -1263,10 +1363,30 @@ func (e *Engine) executeCommand(ctx context.Context, cmdArgs []string, outputFil
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for command to complete
|
||||
if err := cmd.Wait(); err != nil {
|
||||
e.log.Error("Backup command failed", "error", err, "database", filepath.Base(outputFile))
|
||||
return fmt.Errorf("backup command failed: %w", err)
|
||||
// Wait for command to complete with proper context handling
|
||||
cmdDone := make(chan error, 1)
|
||||
go func() {
|
||||
cmdDone <- cmd.Wait()
|
||||
}()
|
||||
|
||||
var cmdErr error
|
||||
select {
|
||||
case cmdErr = <-cmdDone:
|
||||
// Command completed (success or failure)
|
||||
case <-ctx.Done():
|
||||
// Context cancelled - kill process to unblock
|
||||
e.log.Warn("Backup cancelled - killing pg_dump process")
|
||||
cmd.Process.Kill()
|
||||
<-cmdDone // Wait for goroutine to finish
|
||||
cmdErr = ctx.Err()
|
||||
}
|
||||
|
||||
// Wait for stderr reader to finish
|
||||
<-stderrDone
|
||||
|
||||
if cmdErr != nil {
|
||||
e.log.Error("Backup command failed", "error", cmdErr, "database", filepath.Base(outputFile))
|
||||
return fmt.Errorf("backup command failed: %w", cmdErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"dbbackup/internal/logger"
|
||||
)
|
||||
@@ -116,8 +117,11 @@ func KillOrphanedProcesses(log logger.Logger) error {
|
||||
|
||||
// findProcessesByName returns PIDs of processes matching the given name
|
||||
func findProcessesByName(name string, excludePID int) ([]int, error) {
|
||||
// Use pgrep for efficient process searching
|
||||
cmd := exec.Command("pgrep", "-x", name)
|
||||
// Use pgrep for efficient process searching with timeout
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
cmd := exec.CommandContext(ctx, "pgrep", "-x", name)
|
||||
output, err := cmd.Output()
|
||||
if err != nil {
|
||||
// Exit code 1 means no processes found (not an error)
|
||||
|
||||
@@ -217,8 +217,8 @@ func New() *Config {
|
||||
SingleDBName: getEnvString("SINGLE_DB_NAME", ""),
|
||||
RestoreDBName: getEnvString("RESTORE_DB_NAME", ""),
|
||||
|
||||
// Timeouts
|
||||
ClusterTimeoutMinutes: getEnvInt("CLUSTER_TIMEOUT_MIN", 240),
|
||||
// Timeouts - default 24 hours (1440 min) to handle very large databases with large objects
|
||||
ClusterTimeoutMinutes: getEnvInt("CLUSTER_TIMEOUT_MIN", 1440),
|
||||
|
||||
// Cluster parallelism (default: 2 concurrent operations for faster cluster backup/restore)
|
||||
ClusterParallelism: getEnvInt("CLUSTER_PARALLELISM", 2),
|
||||
@@ -227,7 +227,7 @@ func New() *Config {
|
||||
WorkDir: getEnvString("WORK_DIR", ""),
|
||||
|
||||
// Swap file management
|
||||
SwapFilePath: "", // Will be set after WorkDir is initialized
|
||||
SwapFilePath: "", // Will be set after WorkDir is initialized
|
||||
SwapFileSizeGB: getEnvInt("SWAP_FILE_SIZE_GB", 0), // 0 = disabled by default
|
||||
AutoSwap: getEnvBool("AUTO_SWAP", false),
|
||||
|
||||
|
||||
@@ -28,8 +28,9 @@ type LocalConfig struct {
|
||||
DumpJobs int
|
||||
|
||||
// Performance settings
|
||||
CPUWorkload string
|
||||
MaxCores int
|
||||
CPUWorkload string
|
||||
MaxCores int
|
||||
ClusterTimeout int // Cluster operation timeout in minutes (default: 1440 = 24 hours)
|
||||
|
||||
// Security settings
|
||||
RetentionDays int
|
||||
@@ -121,6 +122,10 @@ func LoadLocalConfig() (*LocalConfig, error) {
|
||||
if mc, err := strconv.Atoi(value); err == nil {
|
||||
cfg.MaxCores = mc
|
||||
}
|
||||
case "cluster_timeout":
|
||||
if ct, err := strconv.Atoi(value); err == nil {
|
||||
cfg.ClusterTimeout = ct
|
||||
}
|
||||
}
|
||||
case "security":
|
||||
switch key {
|
||||
@@ -199,6 +204,9 @@ func SaveLocalConfig(cfg *LocalConfig) error {
|
||||
if cfg.MaxCores != 0 {
|
||||
sb.WriteString(fmt.Sprintf("max_cores = %d\n", cfg.MaxCores))
|
||||
}
|
||||
if cfg.ClusterTimeout != 0 {
|
||||
sb.WriteString(fmt.Sprintf("cluster_timeout = %d\n", cfg.ClusterTimeout))
|
||||
}
|
||||
sb.WriteString("\n")
|
||||
|
||||
// Security section
|
||||
@@ -268,6 +276,10 @@ func ApplyLocalConfig(cfg *Config, local *LocalConfig) {
|
||||
if local.MaxCores != 0 {
|
||||
cfg.MaxCores = local.MaxCores
|
||||
}
|
||||
// Apply cluster timeout from config file (overrides default)
|
||||
if local.ClusterTimeout != 0 {
|
||||
cfg.ClusterTimeoutMinutes = local.ClusterTimeout
|
||||
}
|
||||
if cfg.RetentionDays == 30 && local.RetentionDays != 0 {
|
||||
cfg.RetentionDays = local.RetentionDays
|
||||
}
|
||||
@@ -282,21 +294,22 @@ func ApplyLocalConfig(cfg *Config, local *LocalConfig) {
|
||||
// ConfigFromConfig creates a LocalConfig from a Config
|
||||
func ConfigFromConfig(cfg *Config) *LocalConfig {
|
||||
return &LocalConfig{
|
||||
DBType: cfg.DatabaseType,
|
||||
Host: cfg.Host,
|
||||
Port: cfg.Port,
|
||||
User: cfg.User,
|
||||
Database: cfg.Database,
|
||||
SSLMode: cfg.SSLMode,
|
||||
BackupDir: cfg.BackupDir,
|
||||
WorkDir: cfg.WorkDir,
|
||||
Compression: cfg.CompressionLevel,
|
||||
Jobs: cfg.Jobs,
|
||||
DumpJobs: cfg.DumpJobs,
|
||||
CPUWorkload: cfg.CPUWorkloadType,
|
||||
MaxCores: cfg.MaxCores,
|
||||
RetentionDays: cfg.RetentionDays,
|
||||
MinBackups: cfg.MinBackups,
|
||||
MaxRetries: cfg.MaxRetries,
|
||||
DBType: cfg.DatabaseType,
|
||||
Host: cfg.Host,
|
||||
Port: cfg.Port,
|
||||
User: cfg.User,
|
||||
Database: cfg.Database,
|
||||
SSLMode: cfg.SSLMode,
|
||||
BackupDir: cfg.BackupDir,
|
||||
WorkDir: cfg.WorkDir,
|
||||
Compression: cfg.CompressionLevel,
|
||||
Jobs: cfg.Jobs,
|
||||
DumpJobs: cfg.DumpJobs,
|
||||
CPUWorkload: cfg.CPUWorkloadType,
|
||||
MaxCores: cfg.MaxCores,
|
||||
ClusterTimeout: cfg.ClusterTimeoutMinutes,
|
||||
RetentionDays: cfg.RetentionDays,
|
||||
MinBackups: cfg.MinBackups,
|
||||
MaxRetries: cfg.MaxRetries,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,10 +234,26 @@ func (e *MySQLDumpEngine) Backup(ctx context.Context, opts *BackupOptions) (*Bac
|
||||
gzWriter.Close()
|
||||
}
|
||||
|
||||
// Wait for command
|
||||
if err := cmd.Wait(); err != nil {
|
||||
// Wait for command with proper context handling
|
||||
cmdDone := make(chan error, 1)
|
||||
go func() {
|
||||
cmdDone <- cmd.Wait()
|
||||
}()
|
||||
|
||||
var cmdErr error
|
||||
select {
|
||||
case cmdErr = <-cmdDone:
|
||||
// Command completed
|
||||
case <-ctx.Done():
|
||||
e.log.Warn("MySQL backup cancelled - killing process")
|
||||
cmd.Process.Kill()
|
||||
<-cmdDone
|
||||
cmdErr = ctx.Err()
|
||||
}
|
||||
|
||||
if cmdErr != nil {
|
||||
stderr := stderrBuf.String()
|
||||
return nil, fmt.Errorf("mysqldump failed: %w\n%s", err, stderr)
|
||||
return nil, fmt.Errorf("mysqldump failed: %w\n%s", cmdErr, stderr)
|
||||
}
|
||||
|
||||
// Get file info
|
||||
@@ -442,8 +458,25 @@ func (e *MySQLDumpEngine) BackupToWriter(ctx context.Context, w io.Writer, opts
|
||||
gzWriter.Close()
|
||||
}
|
||||
|
||||
if err := cmd.Wait(); err != nil {
|
||||
return nil, fmt.Errorf("mysqldump failed: %w\n%s", err, stderrBuf.String())
|
||||
// Wait for command with proper context handling
|
||||
cmdDone := make(chan error, 1)
|
||||
go func() {
|
||||
cmdDone <- cmd.Wait()
|
||||
}()
|
||||
|
||||
var cmdErr error
|
||||
select {
|
||||
case cmdErr = <-cmdDone:
|
||||
// Command completed
|
||||
case <-ctx.Done():
|
||||
e.log.Warn("MySQL streaming backup cancelled - killing process")
|
||||
cmd.Process.Kill()
|
||||
<-cmdDone
|
||||
cmdErr = ctx.Err()
|
||||
}
|
||||
|
||||
if cmdErr != nil {
|
||||
return nil, fmt.Errorf("mysqldump failed: %w\n%s", cmdErr, stderrBuf.String())
|
||||
}
|
||||
|
||||
return &BackupResult{
|
||||
|
||||
@@ -212,7 +212,11 @@ func (m *BinlogManager) detectTools() error {
|
||||
|
||||
// detectServerType determines if we're working with MySQL or MariaDB
|
||||
func (m *BinlogManager) detectServerType() DatabaseType {
|
||||
cmd := exec.Command(m.mysqlbinlogPath, "--version")
|
||||
// Use timeout to prevent blocking if command hangs
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
cmd := exec.CommandContext(ctx, m.mysqlbinlogPath, "--version")
|
||||
output, err := cmd.Output()
|
||||
if err != nil {
|
||||
return DatabaseMySQL // Default to MySQL
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -12,6 +13,7 @@ import (
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"dbbackup/internal/logger"
|
||||
)
|
||||
@@ -60,9 +62,9 @@ type DiagnoseDetails struct {
|
||||
TableList []string `json:"table_list,omitempty"`
|
||||
|
||||
// Compression analysis
|
||||
GzipValid bool `json:"gzip_valid,omitempty"`
|
||||
GzipError string `json:"gzip_error,omitempty"`
|
||||
ExpandedSize int64 `json:"expanded_size,omitempty"`
|
||||
GzipValid bool `json:"gzip_valid,omitempty"`
|
||||
GzipError string `json:"gzip_error,omitempty"`
|
||||
ExpandedSize int64 `json:"expanded_size,omitempty"`
|
||||
CompressionRatio float64 `json:"compression_ratio,omitempty"`
|
||||
}
|
||||
|
||||
@@ -157,7 +159,7 @@ func (d *Diagnoser) diagnosePgDump(filePath string, result *DiagnoseResult) {
|
||||
result.IsCorrupted = true
|
||||
result.Details.HasPGDMPSignature = false
|
||||
result.Details.FirstBytes = fmt.Sprintf("%q", header[:minInt(n, 20)])
|
||||
result.Errors = append(result.Errors,
|
||||
result.Errors = append(result.Errors,
|
||||
"Missing PGDMP signature - file is NOT PostgreSQL custom format",
|
||||
"This file may be SQL format incorrectly named as .dump",
|
||||
"Try: file "+filePath+" to check actual file type")
|
||||
@@ -185,7 +187,7 @@ func (d *Diagnoser) diagnosePgDumpGz(filePath string, result *DiagnoseResult) {
|
||||
result.IsCorrupted = true
|
||||
result.Details.GzipValid = false
|
||||
result.Details.GzipError = err.Error()
|
||||
result.Errors = append(result.Errors,
|
||||
result.Errors = append(result.Errors,
|
||||
fmt.Sprintf("Invalid gzip format: %v", err),
|
||||
"The file may be truncated or corrupted during transfer")
|
||||
return
|
||||
@@ -210,7 +212,7 @@ func (d *Diagnoser) diagnosePgDumpGz(filePath string, result *DiagnoseResult) {
|
||||
} else {
|
||||
result.Details.HasPGDMPSignature = false
|
||||
result.Details.FirstBytes = fmt.Sprintf("%q", header[:minInt(n, 20)])
|
||||
|
||||
|
||||
// Check if it's actually SQL content
|
||||
content := string(header[:n])
|
||||
if strings.Contains(content, "PostgreSQL") || strings.Contains(content, "pg_dump") ||
|
||||
@@ -233,7 +235,7 @@ func (d *Diagnoser) diagnosePgDumpGz(filePath string, result *DiagnoseResult) {
|
||||
// Verify full gzip stream integrity by reading to end
|
||||
file.Seek(0, 0)
|
||||
gz, _ = gzip.NewReader(file)
|
||||
|
||||
|
||||
var totalRead int64
|
||||
buf := make([]byte, 32*1024)
|
||||
for {
|
||||
@@ -255,7 +257,7 @@ func (d *Diagnoser) diagnosePgDumpGz(filePath string, result *DiagnoseResult) {
|
||||
}
|
||||
}
|
||||
gz.Close()
|
||||
|
||||
|
||||
result.Details.ExpandedSize = totalRead
|
||||
if result.FileSize > 0 {
|
||||
result.Details.CompressionRatio = float64(totalRead) / float64(result.FileSize)
|
||||
@@ -392,7 +394,7 @@ func (d *Diagnoser) diagnoseSQLScript(filePath string, compressed bool, result *
|
||||
lastCopyTable, copyStartLine),
|
||||
"The backup was truncated during data export",
|
||||
"This explains the 'syntax error' during restore - COPY data is being interpreted as SQL")
|
||||
|
||||
|
||||
if len(copyDataSamples) > 0 {
|
||||
result.Errors = append(result.Errors,
|
||||
fmt.Sprintf("Sample orphaned data: %s", copyDataSamples[0]))
|
||||
@@ -412,8 +414,12 @@ func (d *Diagnoser) diagnoseSQLScript(filePath string, compressed bool, result *
|
||||
|
||||
// diagnoseClusterArchive analyzes a cluster tar.gz archive
|
||||
func (d *Diagnoser) diagnoseClusterArchive(filePath string, result *DiagnoseResult) {
|
||||
// First verify tar.gz integrity
|
||||
cmd := exec.Command("tar", "-tzf", filePath)
|
||||
// First verify tar.gz integrity with timeout
|
||||
// 5 minutes for large archives (multi-GB archives need more time)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
cmd := exec.CommandContext(ctx, "tar", "-tzf", filePath)
|
||||
output, err := cmd.Output()
|
||||
if err != nil {
|
||||
result.IsValid = false
|
||||
@@ -491,13 +497,18 @@ func (d *Diagnoser) diagnoseUnknown(filePath string, result *DiagnoseResult) {
|
||||
|
||||
// verifyWithPgRestore uses pg_restore --list to verify dump integrity
|
||||
func (d *Diagnoser) verifyWithPgRestore(filePath string, result *DiagnoseResult) {
|
||||
cmd := exec.Command("pg_restore", "--list", filePath)
|
||||
// Use timeout to prevent blocking on very large dump files
|
||||
// 5 minutes for large dumps (multi-GB dumps with many tables)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
cmd := exec.CommandContext(ctx, "pg_restore", "--list", filePath)
|
||||
output, err := cmd.CombinedOutput()
|
||||
|
||||
if err != nil {
|
||||
result.Details.PgRestoreListable = false
|
||||
result.Details.PgRestoreError = string(output)
|
||||
|
||||
|
||||
// Check for specific errors
|
||||
errStr := string(output)
|
||||
if strings.Contains(errStr, "unexpected end of file") ||
|
||||
@@ -544,7 +555,11 @@ func (d *Diagnoser) verifyWithPgRestore(filePath string, result *DiagnoseResult)
|
||||
// DiagnoseClusterDumps extracts and diagnoses all dumps in a cluster archive
|
||||
func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*DiagnoseResult, error) {
|
||||
// First, try to list archive contents without extracting (fast check)
|
||||
listCmd := exec.Command("tar", "-tzf", archivePath)
|
||||
// 10 minutes for very large archives
|
||||
listCtx, listCancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||
defer listCancel()
|
||||
|
||||
listCmd := exec.CommandContext(listCtx, "tar", "-tzf", archivePath)
|
||||
listOutput, listErr := listCmd.CombinedOutput()
|
||||
if listErr != nil {
|
||||
// Archive listing failed - likely corrupted
|
||||
@@ -557,9 +572,9 @@ func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*Diagno
|
||||
IsCorrupted: true,
|
||||
Details: &DiagnoseDetails{},
|
||||
}
|
||||
|
||||
|
||||
errOutput := string(listOutput)
|
||||
if strings.Contains(errOutput, "unexpected end of file") ||
|
||||
if strings.Contains(errOutput, "unexpected end of file") ||
|
||||
strings.Contains(errOutput, "Unexpected EOF") ||
|
||||
strings.Contains(errOutput, "truncated") {
|
||||
errResult.IsTruncated = true
|
||||
@@ -574,28 +589,34 @@ func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*Diagno
|
||||
fmt.Sprintf("tar error: %s", truncateString(errOutput, 300)),
|
||||
"Run manually: tar -tzf "+archivePath+" 2>&1 | tail -50")
|
||||
}
|
||||
|
||||
|
||||
return []*DiagnoseResult{errResult}, nil
|
||||
}
|
||||
|
||||
// Archive is listable - now check disk space before extraction
|
||||
files := strings.Split(strings.TrimSpace(string(listOutput)), "\n")
|
||||
|
||||
|
||||
// Check if we have enough disk space (estimate 4x archive size needed)
|
||||
archiveInfo, _ := os.Stat(archivePath)
|
||||
requiredSpace := archiveInfo.Size() * 4
|
||||
|
||||
|
||||
// Check temp directory space - try to extract metadata first
|
||||
if stat, err := os.Stat(tempDir); err == nil && stat.IsDir() {
|
||||
// Try extraction of a small test file first
|
||||
testCmd := exec.Command("tar", "-xzf", archivePath, "-C", tempDir, "--wildcards", "*.json", "--wildcards", "globals.sql")
|
||||
// Try extraction of a small test file first with timeout
|
||||
testCtx, testCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
testCmd := exec.CommandContext(testCtx, "tar", "-xzf", archivePath, "-C", tempDir, "--wildcards", "*.json", "--wildcards", "globals.sql")
|
||||
testCmd.Run() // Ignore error - just try to extract metadata
|
||||
testCancel()
|
||||
}
|
||||
|
||||
|
||||
d.log.Info("Archive listing successful", "files", len(files))
|
||||
|
||||
// Try full extraction
|
||||
cmd := exec.Command("tar", "-xzf", archivePath, "-C", tempDir)
|
||||
|
||||
// Try full extraction - use a generous timeout (30 minutes), since very large archives can take a long time
|
||||
extractCtx, extractCancel := context.WithTimeout(context.Background(), 30*time.Minute)
|
||||
defer extractCancel()
|
||||
|
||||
cmd := exec.CommandContext(extractCtx, "tar", "-xzf", archivePath, "-C", tempDir)
|
||||
var stderr bytes.Buffer
|
||||
cmd.Stderr = &stderr
|
||||
if err := cmd.Run(); err != nil {
|
||||
@@ -608,14 +629,14 @@ func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*Diagno
|
||||
IsValid: false,
|
||||
Details: &DiagnoseDetails{},
|
||||
}
|
||||
|
||||
|
||||
errOutput := stderr.String()
|
||||
if strings.Contains(errOutput, "No space left") ||
|
||||
if strings.Contains(errOutput, "No space left") ||
|
||||
strings.Contains(errOutput, "cannot write") ||
|
||||
strings.Contains(errOutput, "Disk quota exceeded") {
|
||||
errResult.Errors = append(errResult.Errors,
|
||||
"INSUFFICIENT DISK SPACE to extract archive for diagnosis",
|
||||
fmt.Sprintf("Archive size: %s (needs ~%s for extraction)",
|
||||
fmt.Sprintf("Archive size: %s (needs ~%s for extraction)",
|
||||
formatBytes(archiveInfo.Size()), formatBytes(requiredSpace)),
|
||||
"Use CLI diagnosis instead: dbbackup restore diagnose "+archivePath,
|
||||
"Or use --workdir flag to specify a location with more space")
|
||||
@@ -634,7 +655,7 @@ func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*Diagno
|
||||
fmt.Sprintf("Extraction failed: %v", err),
|
||||
fmt.Sprintf("tar error: %s", truncateString(errOutput, 300)))
|
||||
}
|
||||
|
||||
|
||||
// Still report what files we found in the listing
|
||||
var dumpFiles []string
|
||||
for _, f := range files {
|
||||
@@ -648,7 +669,7 @@ func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*Diagno
|
||||
errResult.Warnings = append(errResult.Warnings,
|
||||
fmt.Sprintf("Archive contains %d database dumps (listing only)", len(dumpFiles)))
|
||||
}
|
||||
|
||||
|
||||
return []*DiagnoseResult{errResult}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ type Engine struct {
|
||||
progress progress.Indicator
|
||||
detailedReporter *progress.DetailedReporter
|
||||
dryRun bool
|
||||
debugLogPath string // Path to save debug log on error
|
||||
debugLogPath string // Path to save debug log on error
|
||||
errorCollector *ErrorCollector // Collects detailed error info
|
||||
}
|
||||
|
||||
@@ -357,43 +357,68 @@ func (e *Engine) executeRestoreCommandWithContext(ctx context.Context, cmdArgs [
|
||||
return fmt.Errorf("failed to start restore command: %w", err)
|
||||
}
|
||||
|
||||
// Read stderr in chunks to log errors without loading all into memory
|
||||
buf := make([]byte, 4096)
|
||||
// Read stderr in goroutine to avoid blocking
|
||||
var lastError string
|
||||
var errorCount int
|
||||
const maxErrors = 10 // Limit captured errors to prevent OOM
|
||||
for {
|
||||
n, err := stderr.Read(buf)
|
||||
if n > 0 {
|
||||
chunk := string(buf[:n])
|
||||
|
||||
// Feed to error collector if enabled
|
||||
if collector != nil {
|
||||
collector.CaptureStderr(chunk)
|
||||
}
|
||||
|
||||
// Only capture REAL errors, not verbose output
|
||||
if strings.Contains(chunk, "ERROR:") || strings.Contains(chunk, "FATAL:") || strings.Contains(chunk, "error:") {
|
||||
lastError = strings.TrimSpace(chunk)
|
||||
errorCount++
|
||||
if errorCount <= maxErrors {
|
||||
e.log.Warn("Restore stderr", "output", chunk)
|
||||
stderrDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(stderrDone)
|
||||
buf := make([]byte, 4096)
|
||||
const maxErrors = 10 // Limit captured errors to prevent OOM
|
||||
for {
|
||||
n, err := stderr.Read(buf)
|
||||
if n > 0 {
|
||||
chunk := string(buf[:n])
|
||||
|
||||
// Feed to error collector if enabled
|
||||
if collector != nil {
|
||||
collector.CaptureStderr(chunk)
|
||||
}
|
||||
|
||||
// Only capture REAL errors, not verbose output
|
||||
if strings.Contains(chunk, "ERROR:") || strings.Contains(chunk, "FATAL:") || strings.Contains(chunk, "error:") {
|
||||
lastError = strings.TrimSpace(chunk)
|
||||
errorCount++
|
||||
if errorCount <= maxErrors {
|
||||
e.log.Warn("Restore stderr", "output", chunk)
|
||||
}
|
||||
}
|
||||
// Note: --verbose output is discarded to prevent OOM
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
// Note: --verbose output is discarded to prevent OOM
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for command with proper context handling
|
||||
cmdDone := make(chan error, 1)
|
||||
go func() {
|
||||
cmdDone <- cmd.Wait()
|
||||
}()
|
||||
|
||||
var cmdErr error
|
||||
select {
|
||||
case cmdErr = <-cmdDone:
|
||||
// Command completed (success or failure)
|
||||
case <-ctx.Done():
|
||||
// Context cancelled - kill process
|
||||
e.log.Warn("Restore cancelled - killing process")
|
||||
cmd.Process.Kill()
|
||||
<-cmdDone
|
||||
cmdErr = ctx.Err()
|
||||
}
|
||||
|
||||
if err := cmd.Wait(); err != nil {
|
||||
// Wait for stderr reader to finish
|
||||
<-stderrDone
|
||||
|
||||
if cmdErr != nil {
|
||||
// Get exit code
|
||||
exitCode := 1
|
||||
if exitErr, ok := err.(*exec.ExitError); ok {
|
||||
if exitErr, ok := cmdErr.(*exec.ExitError); ok {
|
||||
exitCode = exitErr.ExitCode()
|
||||
}
|
||||
|
||||
|
||||
// PostgreSQL pg_restore returns exit code 1 even for ignorable errors
|
||||
// Check if errors are ignorable (already exists, duplicate, etc.)
|
||||
if lastError != "" && e.isIgnorableError(lastError) {
|
||||
@@ -427,10 +452,10 @@ func (e *Engine) executeRestoreCommandWithContext(ctx context.Context, cmdArgs [
|
||||
errType,
|
||||
errHint,
|
||||
)
|
||||
|
||||
|
||||
// Print report to console
|
||||
collector.PrintReport(report)
|
||||
|
||||
|
||||
// Save to file
|
||||
if e.debugLogPath != "" {
|
||||
if saveErr := collector.SaveReport(report, e.debugLogPath); saveErr != nil {
|
||||
@@ -481,31 +506,56 @@ func (e *Engine) executeRestoreWithDecompression(ctx context.Context, archivePat
|
||||
return fmt.Errorf("failed to start restore command: %w", err)
|
||||
}
|
||||
|
||||
// Read stderr in chunks to log errors without loading all into memory
|
||||
buf := make([]byte, 4096)
|
||||
// Read stderr in goroutine to avoid blocking
|
||||
var lastError string
|
||||
var errorCount int
|
||||
const maxErrors = 10 // Limit captured errors to prevent OOM
|
||||
for {
|
||||
n, err := stderr.Read(buf)
|
||||
if n > 0 {
|
||||
chunk := string(buf[:n])
|
||||
// Only capture REAL errors, not verbose output
|
||||
if strings.Contains(chunk, "ERROR:") || strings.Contains(chunk, "FATAL:") || strings.Contains(chunk, "error:") {
|
||||
lastError = strings.TrimSpace(chunk)
|
||||
errorCount++
|
||||
if errorCount <= maxErrors {
|
||||
e.log.Warn("Restore stderr", "output", chunk)
|
||||
stderrDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(stderrDone)
|
||||
buf := make([]byte, 4096)
|
||||
const maxErrors = 10 // Limit captured errors to prevent OOM
|
||||
for {
|
||||
n, err := stderr.Read(buf)
|
||||
if n > 0 {
|
||||
chunk := string(buf[:n])
|
||||
// Only capture REAL errors, not verbose output
|
||||
if strings.Contains(chunk, "ERROR:") || strings.Contains(chunk, "FATAL:") || strings.Contains(chunk, "error:") {
|
||||
lastError = strings.TrimSpace(chunk)
|
||||
errorCount++
|
||||
if errorCount <= maxErrors {
|
||||
e.log.Warn("Restore stderr", "output", chunk)
|
||||
}
|
||||
}
|
||||
// Note: --verbose output is discarded to prevent OOM
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
// Note: --verbose output is discarded to prevent OOM
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for command with proper context handling
|
||||
cmdDone := make(chan error, 1)
|
||||
go func() {
|
||||
cmdDone <- cmd.Wait()
|
||||
}()
|
||||
|
||||
var cmdErr error
|
||||
select {
|
||||
case cmdErr = <-cmdDone:
|
||||
// Command completed (success or failure)
|
||||
case <-ctx.Done():
|
||||
// Context cancelled - kill process
|
||||
e.log.Warn("Restore with decompression cancelled - killing process")
|
||||
cmd.Process.Kill()
|
||||
<-cmdDone
|
||||
cmdErr = ctx.Err()
|
||||
}
|
||||
|
||||
if err := cmd.Wait(); err != nil {
|
||||
// Wait for stderr reader to finish
|
||||
<-stderrDone
|
||||
|
||||
if cmdErr != nil {
|
||||
// PostgreSQL pg_restore returns exit code 1 even for ignorable errors
|
||||
// Check if errors are ignorable (already exists, duplicate, etc.)
|
||||
if lastError != "" && e.isIgnorableError(lastError) {
|
||||
@@ -517,18 +567,18 @@ func (e *Engine) executeRestoreWithDecompression(ctx context.Context, archivePat
|
||||
if lastError != "" {
|
||||
classification := checks.ClassifyError(lastError)
|
||||
e.log.Error("Restore with decompression failed",
|
||||
"error", err,
|
||||
"error", cmdErr,
|
||||
"last_stderr", lastError,
|
||||
"error_count", errorCount,
|
||||
"error_type", classification.Type,
|
||||
"hint", classification.Hint,
|
||||
"action", classification.Action)
|
||||
return fmt.Errorf("restore failed: %w (last error: %s, total errors: %d) - %s",
|
||||
err, lastError, errorCount, classification.Hint)
|
||||
cmdErr, lastError, errorCount, classification.Hint)
|
||||
}
|
||||
|
||||
e.log.Error("Restore with decompression failed", "error", err, "last_stderr", lastError, "error_count", errorCount)
|
||||
return fmt.Errorf("restore failed: %w", err)
|
||||
e.log.Error("Restore with decompression failed", "error", cmdErr, "last_stderr", lastError, "error_count", errorCount)
|
||||
return fmt.Errorf("restore failed: %w", cmdErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -727,7 +777,7 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
}
|
||||
} else if strings.HasSuffix(dumpFile, ".dump") {
|
||||
// Validate custom format dumps using pg_restore --list
|
||||
cmd := exec.Command("pg_restore", "--list", dumpFile)
|
||||
cmd := exec.CommandContext(ctx, "pg_restore", "--list", dumpFile)
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
dbName := strings.TrimSuffix(entry.Name(), ".dump")
|
||||
@@ -812,6 +862,14 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
defer wg.Done()
|
||||
defer func() { <-semaphore }() // Release
|
||||
|
||||
// Panic recovery - prevent one database failure from crashing entire cluster restore
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
e.log.Error("Panic in database restore goroutine", "file", filename, "panic", r)
|
||||
atomic.AddInt32(&failCount, 1)
|
||||
}
|
||||
}()
|
||||
|
||||
// Update estimator progress (thread-safe)
|
||||
mu.Lock()
|
||||
estimator.UpdateProgress(idx)
|
||||
@@ -939,16 +997,39 @@ func (e *Engine) extractArchive(ctx context.Context, archivePath, destDir string
|
||||
}
|
||||
|
||||
// Discard stderr output in chunks to prevent memory buildup
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
_, err := stderr.Read(buf)
|
||||
if err != nil {
|
||||
break
|
||||
stderrDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(stderrDone)
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
_, err := stderr.Read(buf)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for command with proper context handling
|
||||
cmdDone := make(chan error, 1)
|
||||
go func() {
|
||||
cmdDone <- cmd.Wait()
|
||||
}()
|
||||
|
||||
var cmdErr error
|
||||
select {
|
||||
case cmdErr = <-cmdDone:
|
||||
// Command completed
|
||||
case <-ctx.Done():
|
||||
e.log.Warn("Archive extraction cancelled - killing process")
|
||||
cmd.Process.Kill()
|
||||
<-cmdDone
|
||||
cmdErr = ctx.Err()
|
||||
}
|
||||
|
||||
if err := cmd.Wait(); err != nil {
|
||||
return fmt.Errorf("tar extraction failed: %w", err)
|
||||
<-stderrDone
|
||||
|
||||
if cmdErr != nil {
|
||||
return fmt.Errorf("tar extraction failed: %w", cmdErr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -981,25 +1062,48 @@ func (e *Engine) restoreGlobals(ctx context.Context, globalsFile string) error {
|
||||
return fmt.Errorf("failed to start psql: %w", err)
|
||||
}
|
||||
|
||||
// Read stderr in chunks
|
||||
buf := make([]byte, 4096)
|
||||
// Read stderr in chunks in goroutine
|
||||
var lastError string
|
||||
for {
|
||||
n, err := stderr.Read(buf)
|
||||
if n > 0 {
|
||||
chunk := string(buf[:n])
|
||||
if strings.Contains(chunk, "ERROR") || strings.Contains(chunk, "FATAL") {
|
||||
lastError = chunk
|
||||
e.log.Warn("Globals restore stderr", "output", chunk)
|
||||
stderrDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(stderrDone)
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := stderr.Read(buf)
|
||||
if n > 0 {
|
||||
chunk := string(buf[:n])
|
||||
if strings.Contains(chunk, "ERROR") || strings.Contains(chunk, "FATAL") {
|
||||
lastError = chunk
|
||||
e.log.Warn("Globals restore stderr", "output", chunk)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for command with proper context handling
|
||||
cmdDone := make(chan error, 1)
|
||||
go func() {
|
||||
cmdDone <- cmd.Wait()
|
||||
}()
|
||||
|
||||
var cmdErr error
|
||||
select {
|
||||
case cmdErr = <-cmdDone:
|
||||
// Command completed
|
||||
case <-ctx.Done():
|
||||
e.log.Warn("Globals restore cancelled - killing process")
|
||||
cmd.Process.Kill()
|
||||
<-cmdDone
|
||||
cmdErr = ctx.Err()
|
||||
}
|
||||
|
||||
if err := cmd.Wait(); err != nil {
|
||||
return fmt.Errorf("failed to restore globals: %w (last error: %s)", err, lastError)
|
||||
<-stderrDone
|
||||
|
||||
if cmdErr != nil {
|
||||
return fmt.Errorf("failed to restore globals: %w (last error: %s)", cmdErr, lastError)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -1263,7 +1367,8 @@ func (e *Engine) detectLargeObjectsInDumps(dumpsDir string, entries []os.DirEntr
|
||||
}
|
||||
|
||||
// Use pg_restore -l to list contents (fast, doesn't restore data)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
// 2 minutes for large dumps with many objects
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
cmd := exec.CommandContext(ctx, "pg_restore", "-l", dumpFile)
|
||||
|
||||
@@ -3,6 +3,7 @@ package restore
|
||||
import (
|
||||
"bufio"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -20,43 +21,43 @@ import (
|
||||
// RestoreErrorReport contains comprehensive information about a restore failure
|
||||
type RestoreErrorReport struct {
|
||||
// Metadata
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Version string `json:"version"`
|
||||
GoVersion string `json:"go_version"`
|
||||
OS string `json:"os"`
|
||||
Arch string `json:"arch"`
|
||||
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Version string `json:"version"`
|
||||
GoVersion string `json:"go_version"`
|
||||
OS string `json:"os"`
|
||||
Arch string `json:"arch"`
|
||||
|
||||
// Archive info
|
||||
ArchivePath string `json:"archive_path"`
|
||||
ArchiveSize int64 `json:"archive_size"`
|
||||
ArchiveFormat string `json:"archive_format"`
|
||||
|
||||
|
||||
// Database info
|
||||
TargetDB string `json:"target_db"`
|
||||
DatabaseType string `json:"database_type"`
|
||||
|
||||
TargetDB string `json:"target_db"`
|
||||
DatabaseType string `json:"database_type"`
|
||||
|
||||
// Error details
|
||||
ExitCode int `json:"exit_code"`
|
||||
ErrorMessage string `json:"error_message"`
|
||||
ErrorType string `json:"error_type"`
|
||||
ErrorHint string `json:"error_hint"`
|
||||
TotalErrors int `json:"total_errors"`
|
||||
|
||||
ExitCode int `json:"exit_code"`
|
||||
ErrorMessage string `json:"error_message"`
|
||||
ErrorType string `json:"error_type"`
|
||||
ErrorHint string `json:"error_hint"`
|
||||
TotalErrors int `json:"total_errors"`
|
||||
|
||||
// Captured output
|
||||
LastStderr []string `json:"last_stderr"`
|
||||
FirstErrors []string `json:"first_errors"`
|
||||
|
||||
LastStderr []string `json:"last_stderr"`
|
||||
FirstErrors []string `json:"first_errors"`
|
||||
|
||||
// Context around failure
|
||||
FailureContext *FailureContext `json:"failure_context,omitempty"`
|
||||
|
||||
|
||||
// Diagnosis results
|
||||
DiagnosisResult *DiagnoseResult `json:"diagnosis_result,omitempty"`
|
||||
|
||||
|
||||
// Environment (sanitized)
|
||||
PostgresVersion string `json:"postgres_version,omitempty"`
|
||||
PostgresVersion string `json:"postgres_version,omitempty"`
|
||||
PgRestoreVersion string `json:"pg_restore_version,omitempty"`
|
||||
PsqlVersion string `json:"psql_version,omitempty"`
|
||||
|
||||
PsqlVersion string `json:"psql_version,omitempty"`
|
||||
|
||||
// Recommendations
|
||||
Recommendations []string `json:"recommendations"`
|
||||
}
|
||||
@@ -67,40 +68,40 @@ type FailureContext struct {
|
||||
FailedLine int `json:"failed_line,omitempty"`
|
||||
FailedStatement string `json:"failed_statement,omitempty"`
|
||||
SurroundingLines []string `json:"surrounding_lines,omitempty"`
|
||||
|
||||
|
||||
// For COPY block errors
|
||||
InCopyBlock bool `json:"in_copy_block,omitempty"`
|
||||
CopyTableName string `json:"copy_table_name,omitempty"`
|
||||
CopyStartLine int `json:"copy_start_line,omitempty"`
|
||||
SampleCopyData []string `json:"sample_copy_data,omitempty"`
|
||||
|
||||
InCopyBlock bool `json:"in_copy_block,omitempty"`
|
||||
CopyTableName string `json:"copy_table_name,omitempty"`
|
||||
CopyStartLine int `json:"copy_start_line,omitempty"`
|
||||
SampleCopyData []string `json:"sample_copy_data,omitempty"`
|
||||
|
||||
// File position info
|
||||
BytePosition int64 `json:"byte_position,omitempty"`
|
||||
PercentComplete float64 `json:"percent_complete,omitempty"`
|
||||
BytePosition int64 `json:"byte_position,omitempty"`
|
||||
PercentComplete float64 `json:"percent_complete,omitempty"`
|
||||
}
|
||||
|
||||
// ErrorCollector captures detailed error information during restore
|
||||
type ErrorCollector struct {
|
||||
log logger.Logger
|
||||
cfg *config.Config
|
||||
archivePath string
|
||||
targetDB string
|
||||
format ArchiveFormat
|
||||
|
||||
log logger.Logger
|
||||
cfg *config.Config
|
||||
archivePath string
|
||||
targetDB string
|
||||
format ArchiveFormat
|
||||
|
||||
// Captured data
|
||||
stderrLines []string
|
||||
firstErrors []string
|
||||
lastErrors []string
|
||||
totalErrors int
|
||||
exitCode int
|
||||
|
||||
stderrLines []string
|
||||
firstErrors []string
|
||||
lastErrors []string
|
||||
totalErrors int
|
||||
exitCode int
|
||||
|
||||
// Limits
|
||||
maxStderrLines int
|
||||
maxErrorCapture int
|
||||
|
||||
|
||||
// State
|
||||
startTime time.Time
|
||||
enabled bool
|
||||
startTime time.Time
|
||||
enabled bool
|
||||
}
|
||||
|
||||
// NewErrorCollector creates a new error collector
|
||||
@@ -126,30 +127,30 @@ func (ec *ErrorCollector) CaptureStderr(chunk string) {
|
||||
if !ec.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
lines := strings.Split(chunk, "\n")
|
||||
for _, line := range lines {
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
// Store last N lines of stderr
|
||||
if len(ec.stderrLines) >= ec.maxStderrLines {
|
||||
// Shift array, drop oldest
|
||||
ec.stderrLines = ec.stderrLines[1:]
|
||||
}
|
||||
ec.stderrLines = append(ec.stderrLines, line)
|
||||
|
||||
|
||||
// Check if this is an error line
|
||||
if isErrorLine(line) {
|
||||
ec.totalErrors++
|
||||
|
||||
|
||||
// Capture first N errors
|
||||
if len(ec.firstErrors) < ec.maxErrorCapture {
|
||||
ec.firstErrors = append(ec.firstErrors, line)
|
||||
}
|
||||
|
||||
|
||||
// Keep last N errors (ring buffer style)
|
||||
if len(ec.lastErrors) >= ec.maxErrorCapture {
|
||||
ec.lastErrors = ec.lastErrors[1:]
|
||||
@@ -184,36 +185,36 @@ func (ec *ErrorCollector) GenerateReport(errMessage string, errType string, errH
|
||||
LastStderr: ec.stderrLines,
|
||||
FirstErrors: ec.firstErrors,
|
||||
}
|
||||
|
||||
|
||||
// Get archive size
|
||||
if stat, err := os.Stat(ec.archivePath); err == nil {
|
||||
report.ArchiveSize = stat.Size()
|
||||
}
|
||||
|
||||
|
||||
// Get tool versions
|
||||
report.PostgresVersion = getCommandVersion("postgres", "--version")
|
||||
report.PgRestoreVersion = getCommandVersion("pg_restore", "--version")
|
||||
report.PsqlVersion = getCommandVersion("psql", "--version")
|
||||
|
||||
|
||||
// Analyze failure context
|
||||
report.FailureContext = ec.analyzeFailureContext()
|
||||
|
||||
|
||||
// Run diagnosis if not already done
|
||||
diagnoser := NewDiagnoser(ec.log, false)
|
||||
if diagResult, err := diagnoser.DiagnoseFile(ec.archivePath); err == nil {
|
||||
report.DiagnosisResult = diagResult
|
||||
}
|
||||
|
||||
|
||||
// Generate recommendations
|
||||
report.Recommendations = ec.generateRecommendations(report)
|
||||
|
||||
|
||||
return report
|
||||
}
|
||||
|
||||
// analyzeFailureContext extracts context around the failure
|
||||
func (ec *ErrorCollector) analyzeFailureContext() *FailureContext {
|
||||
ctx := &FailureContext{}
|
||||
|
||||
|
||||
// Look for line number in errors
|
||||
for _, errLine := range ec.lastErrors {
|
||||
if lineNum := extractLineNumber(errLine); lineNum > 0 {
|
||||
@@ -221,7 +222,7 @@ func (ec *ErrorCollector) analyzeFailureContext() *FailureContext {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Look for COPY-related errors
|
||||
for _, errLine := range ec.lastErrors {
|
||||
if strings.Contains(errLine, "COPY") || strings.Contains(errLine, "syntax error") {
|
||||
@@ -233,12 +234,12 @@ func (ec *ErrorCollector) analyzeFailureContext() *FailureContext {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// If we have a line number, try to get surrounding context from the dump
|
||||
if ctx.FailedLine > 0 && ec.archivePath != "" {
|
||||
ctx.SurroundingLines = ec.getSurroundingLines(ctx.FailedLine, 5)
|
||||
}
|
||||
|
||||
|
||||
return ctx
|
||||
}
|
||||
|
||||
@@ -246,13 +247,13 @@ func (ec *ErrorCollector) analyzeFailureContext() *FailureContext {
|
||||
func (ec *ErrorCollector) getSurroundingLines(lineNum int, context int) []string {
|
||||
var reader io.Reader
|
||||
var lines []string
|
||||
|
||||
|
||||
file, err := os.Open(ec.archivePath)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
|
||||
// Handle compressed files
|
||||
if strings.HasSuffix(ec.archivePath, ".gz") {
|
||||
gz, err := gzip.NewReader(file)
|
||||
@@ -264,19 +265,19 @@ func (ec *ErrorCollector) getSurroundingLines(lineNum int, context int) []string
|
||||
} else {
|
||||
reader = file
|
||||
}
|
||||
|
||||
|
||||
scanner := bufio.NewScanner(reader)
|
||||
buf := make([]byte, 0, 1024*1024)
|
||||
scanner.Buffer(buf, 10*1024*1024)
|
||||
|
||||
|
||||
currentLine := 0
|
||||
startLine := lineNum - context
|
||||
endLine := lineNum + context
|
||||
|
||||
|
||||
if startLine < 1 {
|
||||
startLine = 1
|
||||
}
|
||||
|
||||
|
||||
for scanner.Scan() {
|
||||
currentLine++
|
||||
if currentLine >= startLine && currentLine <= endLine {
|
||||
@@ -290,18 +291,18 @@ func (ec *ErrorCollector) getSurroundingLines(lineNum int, context int) []string
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return lines
|
||||
}
|
||||
|
||||
// generateRecommendations provides actionable recommendations based on the error
|
||||
func (ec *ErrorCollector) generateRecommendations(report *RestoreErrorReport) []string {
|
||||
var recs []string
|
||||
|
||||
|
||||
// Check diagnosis results
|
||||
if report.DiagnosisResult != nil {
|
||||
if report.DiagnosisResult.IsTruncated {
|
||||
recs = append(recs,
|
||||
recs = append(recs,
|
||||
"CRITICAL: Backup file is truncated/incomplete",
|
||||
"Action: Re-run the backup for the affected database",
|
||||
"Check: Verify disk space was available during backup",
|
||||
@@ -317,14 +318,14 @@ func (ec *ErrorCollector) generateRecommendations(report *RestoreErrorReport) []
|
||||
}
|
||||
if report.DiagnosisResult.Details != nil && report.DiagnosisResult.Details.UnterminatedCopy {
|
||||
recs = append(recs,
|
||||
fmt.Sprintf("ISSUE: COPY block for table '%s' was not terminated",
|
||||
fmt.Sprintf("ISSUE: COPY block for table '%s' was not terminated",
|
||||
report.DiagnosisResult.Details.LastCopyTable),
|
||||
"Cause: Backup was interrupted during data export",
|
||||
"Action: Re-run backup ensuring it completes fully",
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Check error patterns
|
||||
if report.TotalErrors > 1000000 {
|
||||
recs = append(recs,
|
||||
@@ -333,7 +334,7 @@ func (ec *ErrorCollector) generateRecommendations(report *RestoreErrorReport) []
|
||||
"Check: Verify dump format matches restore command",
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
// Check for common error types
|
||||
errLower := strings.ToLower(report.ErrorMessage)
|
||||
if strings.Contains(errLower, "syntax error") {
|
||||
@@ -343,7 +344,7 @@ func (ec *ErrorCollector) generateRecommendations(report *RestoreErrorReport) []
|
||||
"Check: Run 'dbbackup restore diagnose <archive>' for detailed analysis",
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
if strings.Contains(errLower, "permission denied") {
|
||||
recs = append(recs,
|
||||
"ISSUE: Permission denied",
|
||||
@@ -351,7 +352,7 @@ func (ec *ErrorCollector) generateRecommendations(report *RestoreErrorReport) []
|
||||
"Action: For ownership preservation, use a superuser account",
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
if strings.Contains(errLower, "does not exist") {
|
||||
recs = append(recs,
|
||||
"ISSUE: Missing object reference",
|
||||
@@ -359,7 +360,7 @@ func (ec *ErrorCollector) generateRecommendations(report *RestoreErrorReport) []
|
||||
"Action: Check if target database was created",
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
if len(recs) == 0 {
|
||||
recs = append(recs,
|
||||
"Run 'dbbackup restore diagnose <archive>' for detailed analysis",
|
||||
@@ -367,7 +368,7 @@ func (ec *ErrorCollector) generateRecommendations(report *RestoreErrorReport) []
|
||||
"Review the PostgreSQL/MySQL logs on the target server",
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
return recs
|
||||
}
|
||||
|
||||
@@ -378,18 +379,18 @@ func (ec *ErrorCollector) SaveReport(report *RestoreErrorReport, outputPath stri
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create directory: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Marshal to JSON with indentation
|
||||
data, err := json.MarshalIndent(report, "", " ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal report: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Write file
|
||||
if err := os.WriteFile(outputPath, data, 0644); err != nil {
|
||||
return fmt.Errorf("failed to write report: %w", err)
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@@ -399,35 +400,35 @@ func (ec *ErrorCollector) PrintReport(report *RestoreErrorReport) {
fmt.Println(strings.Repeat("═", 70))
fmt.Println(" 🔴 RESTORE ERROR REPORT")
fmt.Println(strings.Repeat("═", 70))

fmt.Printf("\n📅 Timestamp: %s\n", report.Timestamp.Format("2006-01-02 15:04:05"))
fmt.Printf("📦 Archive: %s\n", filepath.Base(report.ArchivePath))
fmt.Printf("📊 Format: %s\n", report.ArchiveFormat)
fmt.Printf("🎯 Target DB: %s\n", report.TargetDB)
fmt.Printf("⚠️ Exit Code: %d\n", report.ExitCode)
fmt.Printf("❌ Total Errors: %d\n", report.TotalErrors)

fmt.Println("\n" + strings.Repeat("─", 70))
fmt.Println("ERROR DETAILS:")
fmt.Println(strings.Repeat("─", 70))

fmt.Printf("\nType: %s\n", report.ErrorType)
fmt.Printf("Message: %s\n", report.ErrorMessage)
if report.ErrorHint != "" {
fmt.Printf("Hint: %s\n", report.ErrorHint)
}

// Show failure context
if report.FailureContext != nil && report.FailureContext.FailedLine > 0 {
fmt.Println("\n" + strings.Repeat("─", 70))
fmt.Println("FAILURE CONTEXT:")
fmt.Println(strings.Repeat("─", 70))

fmt.Printf("\nFailed at line: %d\n", report.FailureContext.FailedLine)
if report.FailureContext.InCopyBlock {
fmt.Printf("Inside COPY block for table: %s\n", report.FailureContext.CopyTableName)
}

if len(report.FailureContext.SurroundingLines) > 0 {
fmt.Println("\nSurrounding lines:")
for _, line := range report.FailureContext.SurroundingLines {
@@ -435,13 +436,13 @@ func (ec *ErrorCollector) PrintReport(report *RestoreErrorReport) {
}
}
}

// Show first few errors
if len(report.FirstErrors) > 0 {
fmt.Println("\n" + strings.Repeat("─", 70))
fmt.Println("FIRST ERRORS:")
fmt.Println(strings.Repeat("─", 70))

for i, err := range report.FirstErrors {
if i >= 5 {
fmt.Printf("... and %d more\n", len(report.FirstErrors)-5)
@@ -450,13 +451,13 @@ func (ec *ErrorCollector) PrintReport(report *RestoreErrorReport) {
fmt.Printf(" %d. %s\n", i+1, truncateString(err, 100))
}
}

// Show diagnosis summary
if report.DiagnosisResult != nil && !report.DiagnosisResult.IsValid {
fmt.Println("\n" + strings.Repeat("─", 70))
fmt.Println("DIAGNOSIS:")
fmt.Println(strings.Repeat("─", 70))

if report.DiagnosisResult.IsTruncated {
fmt.Println(" ❌ File is TRUNCATED")
}
@@ -470,21 +471,21 @@ func (ec *ErrorCollector) PrintReport(report *RestoreErrorReport) {
fmt.Printf(" • %s\n", err)
}
}

// Show recommendations
fmt.Println("\n" + strings.Repeat("─", 70))
fmt.Println("💡 RECOMMENDATIONS:")
fmt.Println(strings.Repeat("─", 70))

for _, rec := range report.Recommendations {
fmt.Printf(" • %s\n", rec)
}

// Show tool versions
fmt.Println("\n" + strings.Repeat("─", 70))
fmt.Println("ENVIRONMENT:")
fmt.Println(strings.Repeat("─", 70))

fmt.Printf(" OS: %s/%s\n", report.OS, report.Arch)
fmt.Printf(" Go: %s\n", report.GoVersion)
if report.PgRestoreVersion != "" {
@@ -493,15 +494,15 @@ func (ec *ErrorCollector) PrintReport(report *RestoreErrorReport) {
if report.PsqlVersion != "" {
fmt.Printf(" psql: %s\n", report.PsqlVersion)
}

fmt.Println(strings.Repeat("═", 70))
}

// Helper functions

func isErrorLine(line string) bool {
return strings.Contains(line, "ERROR:") ||
strings.Contains(line, "FATAL:") ||
return strings.Contains(line, "ERROR:") ||
strings.Contains(line, "FATAL:") ||
strings.Contains(line, "error:") ||
strings.Contains(line, "PANIC:")
}
@@ -556,7 +557,11 @@ func getDatabaseType(format ArchiveFormat) string {
}

func getCommandVersion(cmd string, arg string) string {
output, err := exec.Command(cmd, arg).CombinedOutput()
// Use timeout to prevent blocking if command hangs
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

output, err := exec.CommandContext(ctx, cmd, arg).CombinedOutput()
if err != nil {
return ""
}

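The getCommandVersion change above is the commit's recurring pattern: wrap a one-shot external command in a context with a deadline so a hung binary cannot block the caller. A minimal sketch of that pattern, assuming only the standard library; the function name is illustrative:

package main

import (
	"context"
	"fmt"
	"os/exec"
	"time"
)

// commandVersion runs "name arg" with a 10-second budget.
// CommandContext kills the process if the context expires before it exits.
func commandVersion(name, arg string) string {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	out, err := exec.CommandContext(ctx, name, arg).CombinedOutput()
	if err != nil {
		return ""
	}
	return string(out)
}

func main() {
	fmt.Println(commandVersion("psql", "--version"))
}
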
@@ -6,6 +6,7 @@ import (
"os/exec"
"regexp"
"strconv"
"time"

"dbbackup/internal/database"
)
@@ -47,8 +48,13 @@ func ParsePostgreSQLVersion(versionStr string) (*VersionInfo, error) {

// GetDumpFileVersion extracts the PostgreSQL version from a dump file
// Uses pg_restore -l to read the dump metadata
// Uses a 30-second timeout to avoid blocking on large files
func GetDumpFileVersion(dumpPath string) (*VersionInfo, error) {
cmd := exec.Command("pg_restore", "-l", dumpPath)
// Use a timeout context to prevent blocking on very large dump files
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

cmd := exec.CommandContext(ctx, "pg_restore", "-l", dumpPath)
output, err := cmd.CombinedOutput()
if err != nil {
return nil, fmt.Errorf("failed to read dump file metadata: %w (output: %s)", err, string(output))

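On top of the 30-second bound above, a caller may want to tell a timeout apart from a genuine pg_restore failure. A hedged sketch of how that could look, checking ctx.Err() for context.DeadlineExceeded; listDumpContents is a made-up name, not a function in this repository:

package main

import (
	"context"
	"errors"
	"fmt"
	"os/exec"
	"time"
)

// listDumpContents runs pg_restore -l with a deadline and reports a timeout
// distinctly from other failures.
func listDumpContents(path string) ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	out, err := exec.CommandContext(ctx, "pg_restore", "-l", path).CombinedOutput()
	if errors.Is(ctx.Err(), context.DeadlineExceeded) {
		return nil, fmt.Errorf("pg_restore -l timed out after 30s on %s", path)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to read dump metadata: %w (output: %s)", err, out)
	}
	return out, nil
}

func main() {
	if out, err := listDumpContents("/var/backups/db.dump"); err != nil {
		fmt.Println("error:", err)
	} else {
		fmt.Printf("%d bytes of TOC\n", len(out))
	}
}
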
@@ -83,10 +83,10 @@ type backupCompleteMsg struct {

func executeBackupWithTUIProgress(parentCtx context.Context, cfg *config.Config, log logger.Logger, backupType, dbName string, ratio int) tea.Cmd {
return func() tea.Msg {
// Use configurable cluster timeout (minutes) from config; default set in config.New()
// Use parent context to inherit cancellation from TUI
clusterTimeout := time.Duration(cfg.ClusterTimeoutMinutes) * time.Minute
ctx, cancel := context.WithTimeout(parentCtx, clusterTimeout)
// NO TIMEOUT for backup operations - a backup takes as long as it takes
// Large databases can take many hours
// Only manual cancellation (Ctrl+C) should stop the backup
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

start := time.Now()

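The backup path above deliberately swaps WithTimeout for WithCancel: the run is bounded only by explicit cancellation, never by a timer. A self-contained sketch of that shape, with a stand-in runBackup and SIGINT wired to cancel (names are illustrative):

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"
)

// runBackup stands in for hours of pg_dump work; it stops early if ctx is cancelled.
func runBackup(ctx context.Context) error {
	select {
	case <-time.After(2 * time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// Derive from a parent context; no WithTimeout on purpose.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Cancel on Ctrl+C so the operator can still stop the run.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	go func() { <-sig; cancel() }()

	if err := runBackup(ctx); err != nil {
		fmt.Println("backup stopped:", err)
		return
	}
	fmt.Println("backup finished")
}
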
@@ -53,7 +53,8 @@ type databaseListMsg struct {

func fetchDatabases(cfg *config.Config, log logger.Logger) tea.Cmd {
return func() tea.Msg {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
// 60 seconds for database listing - busy servers may be slow
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

dbClient, err := database.New(cfg, log)

@@ -111,10 +111,10 @@ type restoreCompleteMsg struct {

func executeRestoreWithTUIProgress(parentCtx context.Context, cfg *config.Config, log logger.Logger, archive ArchiveInfo, targetDB string, cleanFirst, createIfMissing bool, restoreType string, cleanClusterFirst bool, existingDBs []string, saveDebugLog bool) tea.Cmd {
return func() tea.Msg {
// Use configurable cluster timeout (minutes) from config; default set in config.New()
// Use parent context to inherit cancellation from TUI
restoreTimeout := time.Duration(cfg.ClusterTimeoutMinutes) * time.Minute
ctx, cancel := context.WithTimeout(parentCtx, restoreTimeout)
// NO TIMEOUT for restore operations - a restore takes as long as it takes
// Large databases with large objects can take many hours
// Only manual cancellation (Ctrl+C) should stop the restore
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

start := time.Now()
@@ -138,8 +138,8 @@ func executeRestoreWithTUIProgress(parentCtx context.Context, cfg *config.Config
// This matches how cluster restore works - uses CLI tools, not database connections
droppedCount := 0
for _, dbName := range existingDBs {
// Create timeout context for each database drop (30 seconds per DB)
dropCtx, dropCancel := context.WithTimeout(ctx, 30*time.Second)
// Create timeout context for each database drop (5 minutes per DB - large DBs take time)
dropCtx, dropCancel := context.WithTimeout(ctx, 5*time.Minute)
if err := dropDatabaseCLI(dropCtx, cfg, dbName); err != nil {
log.Warn("Failed to drop database", "name", dbName, "error", err)
// Continue with other databases

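In the drop loop above, each database gets its own 5-minute child context. A sketch of that per-iteration pattern; cancel is called at the end of each pass rather than deferred, since deferred cancels inside a loop would only run when the whole function returns. dropDatabase is a placeholder for the CLI call:

package main

import (
	"context"
	"fmt"
	"time"
)

// dropDatabase stands in for dropping one database via CLI tools.
func dropDatabase(ctx context.Context, name string) error {
	select {
	case <-time.After(100 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	parent := context.Background()
	for _, name := range []string{"app", "reports", "staging"} {
		dropCtx, cancel := context.WithTimeout(parent, 5*time.Minute)
		if err := dropDatabase(dropCtx, name); err != nil {
			fmt.Println("failed to drop", name+":", err) // continue with the rest
		}
		cancel() // release this iteration's timer immediately
	}
}
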
@@ -106,7 +106,8 @@ type safetyCheckCompleteMsg struct {

func runSafetyChecks(cfg *config.Config, log logger.Logger, archive ArchiveInfo, targetDB string) tea.Cmd {
return func() tea.Msg {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
// 10 minutes for safety checks - large archives can take a long time to diagnose
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

safety := restore.NewSafety(cfg, log)
@@ -444,7 +445,7 @@ func (m RestorePreviewModel) View() string {
// Advanced Options
s.WriteString(archiveHeaderStyle.Render("⚙️ Advanced Options"))
s.WriteString("\n")

// Work directory option
workDirIcon := "✗"
workDirStyle := infoStyle
@@ -460,7 +461,7 @@ func (m RestorePreviewModel) View() string {
s.WriteString(infoStyle.Render(" ⚠️ Large archives need more space than /tmp may have"))
s.WriteString("\n")
}

// Debug log option
debugIcon := "✗"
debugStyle := infoStyle

@@ -70,7 +70,8 @@ type statusMsg struct {

func fetchStatus(cfg *config.Config, log logger.Logger) tea.Cmd {
return func() tea.Msg {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// 30 seconds for status check - slow networks or SSL negotiation
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

dbClient, err := database.New(cfg, log)