From 91228552fbd9eb547d1cbd09b3a61ed612101dfb Mon Sep 17 00:00:00 2001
From: Alexander Renz
Date: Wed, 7 Jan 2026 08:58:00 +0100
Subject: [PATCH] fix(backup/restore): implement DB+Go specialist recommendations

P0: Add ON_ERROR_STOP=1 to psql (fail fast, not 2.6M errors)
P1: Fix pipe deadlock in streaming compression (goroutine+context)
P1: Handle SIGPIPE (exit 141) - report compressor as root cause
P2: Validate .dump files with pg_restore --list before restore
P2: Add fsync after streaming compression for durability

Fixes potential backup hangs and improves error diagnostics.
---
 CHANGELOG.md               |  9 +++++++-
 internal/backup/engine.go  | 42 +++++++++++++++++++++++++++++++-------
 internal/restore/engine.go | 30 +++++++++++++++++++++++++--
 3 files changed, 71 insertions(+), 10 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d188f26..1fa6f3b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Detects truncated COPY blocks that cause "syntax error" failures
 - Catches corrupted backups in seconds instead of wasting 49+ minutes
 - Cluster restore pre-validates ALL dumps upfront (fail-fast approach)
+- Custom-format `.dump` files now validated with `pg_restore --list`
 
 **Improved Error Messages:**
 - Clear indication when dump file is truncated
@@ -22,6 +23,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Provides actionable error messages with root cause
 
 ### Fixed
+- **P0: psql ON_ERROR_STOP** - Added `-v ON_ERROR_STOP=1` to psql commands to fail fast on the first error instead of accumulating millions of errors
+- **P1: Pipe deadlock** - Fixed a streaming-compression deadlock when pg_dump blocks on a full pipe buffer; now uses a goroutine with proper context timeout handling
+- **P1: SIGPIPE handling** - Detect exit code 141 (broken pipe) and report compressor failure as the root cause
+- **P2: .dump validation** - Custom-format dumps now validated with `pg_restore --list` before restore
+- **P2: fsync durability** - Added `outFile.Sync()` after streaming compression to prevent truncation on power loss
 - Truncated `.sql.gz` dumps no longer waste hours on doomed restores
 - "syntax error at or near" errors now caught before restore begins
 - Cluster restores abort immediately if any dump is corrupted
@@ -29,7 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Technical Details
 - Integrated `Diagnoser` into restore pipeline for pre-validation
 - Added `quickValidateSQLDump()` for fast integrity checks
-- Pre-validation runs on all `.sql.gz` files in cluster archives
+- Pre-validation runs on all `.sql.gz` and `.dump` files in cluster archives
+- Streaming compression uses channel-based wait with context cancellation
 - Zero performance impact on valid backups (diagnosis is fast)
 
 ---

diff --git a/internal/backup/engine.go b/internal/backup/engine.go
index ef3b9b3..7a2ce60 100755
--- a/internal/backup/engine.go
+++ b/internal/backup/engine.go
@@ -1368,25 +1368,53 @@ func (e *Engine) executeWithStreamingCompression(ctx context.Context, cmdArgs []
 	// Then start pg_dump
 	if err := dumpCmd.Start(); err != nil {
+		compressCmd.Process.Kill()
 		return fmt.Errorf("failed to start pg_dump: %w", err)
 	}
 
-	// Wait for pg_dump to complete
-	dumpErr := dumpCmd.Wait()
+	// Wait for pg_dump in a goroutine to handle context timeout properly
+	// This prevents deadlock if pipe buffer fills and pg_dump blocks
+	dumpDone := make(chan error, 1)
+	go func() {
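+		// dumpDone is buffered (capacity 1), so this send never blocks and
+		// the goroutine cannot leak, whichever branch the select below takes.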
+		dumpDone <- dumpCmd.Wait()
+	}()
 
-	// Close stdout pipe to signal compressor we're done - MUST happen after Wait()
-	// but before we check for errors, so compressor gets EOF
+	var dumpErr error
+	select {
+	case dumpErr = <-dumpDone:
+		// pg_dump completed (success or failure)
+	case <-ctx.Done():
+		// Context cancelled/timeout - kill pg_dump to unblock
+		e.log.Warn("Backup timeout - killing pg_dump process")
+		dumpCmd.Process.Kill()
+		<-dumpDone // Wait for goroutine to finish
+		dumpErr = ctx.Err()
+	}
+
+	// Close stdout pipe to signal compressor we're done
+	// This MUST happen after pg_dump exits to avoid broken pipe
 	dumpStdout.Close()
 
 	// Wait for compression to complete
 	compressErr := compressCmd.Wait()
 
-	// Check errors in order
+	// Check errors - compressor failure first (it's usually the root cause)
+	if compressErr != nil {
+		e.log.Error("Compressor failed", "error", compressErr)
+		return fmt.Errorf("compression failed (check disk space): %w", compressErr)
+	}
 	if dumpErr != nil {
+		// Check for SIGPIPE (exit code 141) - indicates compressor died first
+		if exitErr, ok := dumpErr.(*exec.ExitError); ok && exitErr.ExitCode() == 141 {
+			e.log.Error("pg_dump received SIGPIPE - compressor may have failed")
+			return fmt.Errorf("pg_dump broken pipe - check disk space and compressor")
+		}
 		return fmt.Errorf("pg_dump failed: %w", dumpErr)
 	}
-	if compressErr != nil {
-		return fmt.Errorf("compression failed: %w", compressErr)
+
+	// Sync file to disk to ensure durability (prevents truncation on power loss)
+	if err := outFile.Sync(); err != nil {
+		e.log.Warn("Failed to sync output file", "error", err)
 	}
 
 	e.log.Debug("Streaming compression completed", "output", compressedFile)

diff --git a/internal/restore/engine.go b/internal/restore/engine.go
index d961f1e..af095e4 100755
--- a/internal/restore/engine.go
+++ b/internal/restore/engine.go
@@ -271,9 +271,10 @@ func (e *Engine) restorePostgreSQLSQL(ctx context.Context, archivePath, targetDB
 	}
 
 	if compressed {
-		psqlCmd := fmt.Sprintf("psql -U %s -d %s", e.cfg.User, targetDB)
+		// Use ON_ERROR_STOP=1 to fail fast on first error (prevents millions of errors on truncated dumps)
+		psqlCmd := fmt.Sprintf("psql -U %s -d %s -v ON_ERROR_STOP=1", e.cfg.User, targetDB)
 		if hostArg != "" {
-			psqlCmd = fmt.Sprintf("psql %s -U %s -d %s", hostArg, e.cfg.User, targetDB)
+			psqlCmd = fmt.Sprintf("psql %s -U %s -d %s -v ON_ERROR_STOP=1", hostArg, e.cfg.User, targetDB)
 		}
 		// Set PGPASSWORD in the bash command for password-less auth
 		cmd = []string{
@@ -288,6 +289,7 @@ func (e *Engine) restorePostgreSQLSQL(ctx context.Context, archivePath, targetDB
 			"-p", fmt.Sprintf("%d", e.cfg.Port),
 			"-U", e.cfg.User,
 			"-d", targetDB,
+			"-v", "ON_ERROR_STOP=1",
 			"-f", archivePath,
 		}
 	} else {
@@ -295,6 +297,7 @@ func (e *Engine) restorePostgreSQLSQL(ctx context.Context, archivePath, targetDB
 			"psql",
 			"-U", e.cfg.User,
 			"-d", targetDB,
+			"-v", "ON_ERROR_STOP=1",
 			"-f", archivePath,
 		}
 	}
@@ -721,6 +724,29 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
 				"truncated", result.IsTruncated,
 				"errors", result.Errors)
 			}
+		} else if strings.HasSuffix(dumpFile, ".dump") {
+			// Validate custom-format dumps using pg_restore --list
+			cmd := exec.Command("pg_restore", "--list", dumpFile)
+			output, err := cmd.CombinedOutput()
+			if err != nil {
+				dbName := strings.TrimSuffix(entry.Name(), ".dump")
+				errDetail := strings.TrimSpace(string(output))
+				if len(errDetail) > 100 {
+					errDetail = errDetail[:100] + "..."
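+					// Capped so the aggregated corruption report stays readable.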
+				}
+				// Check for truncation indicators
+				if strings.Contains(errDetail, "unexpected end") || strings.Contains(errDetail, "invalid") {
+					corruptedDumps = append(corruptedDumps, fmt.Sprintf("%s: %s", dbName, errDetail))
+					e.log.Error("CORRUPTED custom dump file detected",
+						"database", dbName,
+						"file", entry.Name(),
+						"error", errDetail)
+				} else {
+					e.log.Warn("pg_restore --list warning (may be recoverable)",
+						"file", entry.Name(),
+						"error", errDetail)
+				}
+			}
 		}
 	}
 	if len(corruptedDumps) > 0 {
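
A note on the P1 fix: the goroutine+select wait in executeWithStreamingCompression is the
standard Go pattern for supervising a piped child process under a context. The standalone
sketch below shows the same ordering rules outside the engine - reader started first,
producer killed on timeout, parent's pipe end closed only after the producer exits,
compressor error checked before the producer's, fsync before success. Everything here is
illustrative: runPipeline and the pg_dump/gzip placeholder invocations are not the engine's
real ones, and the syscall-based SIGPIPE check is Unix-only (a shell-wrapped child reports
SIGPIPE as exit 141; a directly exec'ed child reports it through the wait status instead).

// pipeline_sketch.go - illustrative, self-contained sketch; not engine code.
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/exec"
	"syscall"
	"time"
)

func runPipeline(ctx context.Context, out *os.File) error {
	producer := exec.Command("pg_dump", "--help") // placeholder producer
	compressor := exec.Command("gzip", "-c")      // placeholder compressor

	pipe, err := producer.StdoutPipe()
	if err != nil {
		return err
	}
	compressor.Stdin = pipe
	compressor.Stdout = out

	// Start the reader first so the producer never writes into nothing.
	if err := compressor.Start(); err != nil {
		return fmt.Errorf("start compressor: %w", err)
	}
	if err := producer.Start(); err != nil {
		compressor.Process.Kill()
		compressor.Wait()
		return fmt.Errorf("start producer: %w", err)
	}

	// Wait in a goroutine so a cancelled context can still unblock us even
	// if the producer is stuck writing into a full pipe buffer.
	done := make(chan error, 1)
	go func() { done <- producer.Wait() }()

	var prodErr error
	select {
	case prodErr = <-done:
	case <-ctx.Done():
		producer.Process.Kill()
		<-done // reap the wait goroutine
		prodErr = ctx.Err()
	}

	pipe.Close() // parent's read end; the compressor sees EOF once the producer exits
	compErr := compressor.Wait()

	// Compressor failure first: if it died, the producer's SIGPIPE is a
	// symptom, not the cause.
	if compErr != nil {
		return fmt.Errorf("compressor failed: %w", compErr)
	}
	if prodErr != nil {
		var exitErr *exec.ExitError
		if errors.As(prodErr, &exitErr) {
			// A shell reports SIGPIPE as exit 141 (128+13); a directly
			// exec'ed child reports it through the wait status.
			ws, ok := exitErr.Sys().(syscall.WaitStatus)
			if exitErr.ExitCode() == 141 ||
				(ok && ws.Signaled() && ws.Signal() == syscall.SIGPIPE) {
				return errors.New("producer hit a broken pipe; compressor likely died first")
			}
		}
		return fmt.Errorf("producer failed: %w", prodErr)
	}

	// Flush to stable storage before reporting success.
	return out.Sync()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	out, err := os.Create("out.gz")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer out.Close()

	if err := runPipeline(ctx, out); err != nil {
		fmt.Fprintln(os.Stderr, "pipeline:", err)
		os.Exit(1)
	}
}

The buffered done channel is what makes the timeout branch safe: the send inside the
goroutine can always complete, so killing the producer and draining the channel never
leaves a leaked goroutine behind.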