fix(backup/restore): implement DB+Go specialist recommendations

P0: Add ON_ERROR_STOP=1 to psql (fail fast, not 2.6M errors)
P1: Fix pipe deadlock in streaming compression (goroutine+context)
P1: Handle SIGPIPE (exit 141) - report compressor as root cause
P2: Validate .dump files with pg_restore --list before restore
P2: Add fsync after streaming compression for durability

Fixes potential hung backups and improves error diagnostics.
This commit is contained in:
2026-01-07 08:58:00 +01:00
parent 9ee55309bd
commit 91228552fb
3 changed files with 71 additions and 10 deletions

View File

@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Detects truncated COPY blocks that cause "syntax error" failures
- Catches corrupted backups in seconds instead of wasting 49+ minutes
- Cluster restore pre-validates ALL dumps upfront (fail-fast approach)
- Custom format `.dump` files now validated with `pg_restore --list`
**Improved Error Messages:**
- Clear indication when dump file is truncated
@@ -22,6 +23,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Provides actionable error messages with root cause
### Fixed
- **P0: psql ON_ERROR_STOP** - Added `-v ON_ERROR_STOP=1` to psql commands to fail fast on first error instead of accumulating millions of errors
- **P1: Pipe deadlock** - Fixed streaming compression deadlock when pg_dump blocks on full pipe buffer; now uses goroutine with proper context timeout handling
- **P1: SIGPIPE handling** - Detect exit code 141 (broken pipe) and report compressor failure as root cause
- **P2: .dump validation** - Custom format dumps now validated with `pg_restore --list` before restore
- **P2: fsync durability** - Added `outFile.Sync()` after streaming compression to prevent truncation on power loss
- Truncated `.sql.gz` dumps no longer waste hours on doomed restores
- "syntax error at or near" errors now caught before restore begins
- Cluster restores abort immediately if any dump is corrupted
@@ -29,7 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Technical Details
- Integrated `Diagnoser` into restore pipeline for pre-validation
- Added `quickValidateSQLDump()` for fast integrity checks
- Pre-validation runs on all `.sql.gz` files in cluster archives
- Pre-validation runs on all `.sql.gz` and `.dump` files in cluster archives
- Streaming compression uses channel-based wait with context cancellation
- Zero performance impact on valid backups (diagnosis is fast)
---

View File

@@ -1368,25 +1368,53 @@ func (e *Engine) executeWithStreamingCompression(ctx context.Context, cmdArgs []
// Then start pg_dump
if err := dumpCmd.Start(); err != nil {
compressCmd.Process.Kill()
return fmt.Errorf("failed to start pg_dump: %w", err)
}
// Wait for pg_dump to complete
dumpErr := dumpCmd.Wait()
// Wait for pg_dump in a goroutine to handle context timeout properly
// This prevents deadlock if pipe buffer fills and pg_dump blocks
dumpDone := make(chan error, 1)
go func() {
dumpDone <- dumpCmd.Wait()
}()
// Close stdout pipe to signal compressor we're done - MUST happen after Wait()
// but before we check for errors, so compressor gets EOF
var dumpErr error
select {
case dumpErr = <-dumpDone:
// pg_dump completed (success or failure)
case <-ctx.Done():
// Context cancelled/timeout - kill pg_dump to unblock
e.log.Warn("Backup timeout - killing pg_dump process")
dumpCmd.Process.Kill()
<-dumpDone // Wait for goroutine to finish
dumpErr = ctx.Err()
}
// Close stdout pipe to signal compressor we're done
// This MUST happen after pg_dump exits to avoid broken pipe
dumpStdout.Close()
// Wait for compression to complete
compressErr := compressCmd.Wait()
// Check errors in order
// Check errors - compressor failure first (it's usually the root cause)
if compressErr != nil {
e.log.Error("Compressor failed", "error", compressErr)
return fmt.Errorf("compression failed (check disk space): %w", compressErr)
}
if dumpErr != nil {
// Check for SIGPIPE (exit code 141) - indicates compressor died first
if exitErr, ok := dumpErr.(*exec.ExitError); ok && exitErr.ExitCode() == 141 {
e.log.Error("pg_dump received SIGPIPE - compressor may have failed")
return fmt.Errorf("pg_dump broken pipe - check disk space and compressor")
}
return fmt.Errorf("pg_dump failed: %w", dumpErr)
}
if compressErr != nil {
return fmt.Errorf("compression failed: %w", compressErr)
// Sync file to disk to ensure durability (prevents truncation on power loss)
if err := outFile.Sync(); err != nil {
e.log.Warn("Failed to sync output file", "error", err)
}
e.log.Debug("Streaming compression completed", "output", compressedFile)

View File

@@ -271,9 +271,10 @@ func (e *Engine) restorePostgreSQLSQL(ctx context.Context, archivePath, targetDB
}
if compressed {
psqlCmd := fmt.Sprintf("psql -U %s -d %s", e.cfg.User, targetDB)
// Use ON_ERROR_STOP=1 to fail fast on first error (prevents millions of errors on truncated dumps)
psqlCmd := fmt.Sprintf("psql -U %s -d %s -v ON_ERROR_STOP=1", e.cfg.User, targetDB)
if hostArg != "" {
psqlCmd = fmt.Sprintf("psql %s -U %s -d %s", hostArg, e.cfg.User, targetDB)
psqlCmd = fmt.Sprintf("psql %s -U %s -d %s -v ON_ERROR_STOP=1", hostArg, e.cfg.User, targetDB)
}
// Set PGPASSWORD in the bash command for password-less auth
cmd = []string{
@@ -288,6 +289,7 @@ func (e *Engine) restorePostgreSQLSQL(ctx context.Context, archivePath, targetDB
"-p", fmt.Sprintf("%d", e.cfg.Port),
"-U", e.cfg.User,
"-d", targetDB,
"-v", "ON_ERROR_STOP=1",
"-f", archivePath,
}
} else {
@@ -295,6 +297,7 @@ func (e *Engine) restorePostgreSQLSQL(ctx context.Context, archivePath, targetDB
"psql",
"-U", e.cfg.User,
"-d", targetDB,
"-v", "ON_ERROR_STOP=1",
"-f", archivePath,
}
}
@@ -721,6 +724,29 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
"truncated", result.IsTruncated,
"errors", result.Errors)
}
} else if strings.HasSuffix(dumpFile, ".dump") {
// Validate custom format dumps using pg_restore --list
cmd := exec.Command("pg_restore", "--list", dumpFile)
output, err := cmd.CombinedOutput()
if err != nil {
dbName := strings.TrimSuffix(entry.Name(), ".dump")
errDetail := strings.TrimSpace(string(output))
if len(errDetail) > 100 {
errDetail = errDetail[:100] + "..."
}
// Check for truncation indicators
if strings.Contains(errDetail, "unexpected end") || strings.Contains(errDetail, "invalid") {
corruptedDumps = append(corruptedDumps, fmt.Sprintf("%s: %s", dbName, errDetail))
e.log.Error("CORRUPTED custom dump file detected",
"database", dbName,
"file", entry.Name(),
"error", errDetail)
} else {
e.log.Warn("pg_restore --list warning (may be recoverable)",
"file", entry.Name(),
"error", errDetail)
}
}
}
}
if len(corruptedDumps) > 0 {