fix(backup/restore): implement DB+Go specialist recommendations
P0: Add ON_ERROR_STOP=1 to psql (fail fast, not 2.6M errors)
P1: Fix pipe deadlock in streaming compression (goroutine+context)
P1: Handle SIGPIPE (exit 141) - report compressor as root cause
P2: Validate .dump files with pg_restore --list before restore
P2: Add fsync after streaming compression for durability

Fixes potential hung backups and improves error diagnostics.
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Detects truncated COPY blocks that cause "syntax error" failures
 - Catches corrupted backups in seconds instead of wasting 49+ minutes
 - Cluster restore pre-validates ALL dumps upfront (fail-fast approach)
+- Custom format `.dump` files now validated with `pg_restore --list`
 
 **Improved Error Messages:**
 - Clear indication when dump file is truncated
@@ -22,6 +23,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Provides actionable error messages with root cause
 
 ### Fixed
+- **P0: psql ON_ERROR_STOP** - Added `-v ON_ERROR_STOP=1` to psql commands to fail fast on first error instead of accumulating millions of errors
+- **P1: Pipe deadlock** - Fixed streaming compression deadlock when pg_dump blocks on full pipe buffer; now uses goroutine with proper context timeout handling
+- **P1: SIGPIPE handling** - Detect exit code 141 (broken pipe) and report compressor failure as root cause
+- **P2: .dump validation** - Custom format dumps now validated with `pg_restore --list` before restore
+- **P2: fsync durability** - Added `outFile.Sync()` after streaming compression to prevent truncation on power loss
 - Truncated `.sql.gz` dumps no longer waste hours on doomed restores
 - "syntax error at or near" errors now caught before restore begins
 - Cluster restores abort immediately if any dump is corrupted
@@ -29,7 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Technical Details
 - Integrated `Diagnoser` into restore pipeline for pre-validation
 - Added `quickValidateSQLDump()` for fast integrity checks
-- Pre-validation runs on all `.sql.gz` files in cluster archives
+- Pre-validation runs on all `.sql.gz` and `.dump` files in cluster archives
+- Streaming compression uses channel-based wait with context cancellation
 - Zero performance impact on valid backups (diagnosis is fast)
 
 ---
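The pipe-deadlock fix in the diff below centers on one Go pattern: never block in `cmd.Wait()` when a context deadline must be honored; wait in a goroutine and `select` on both the done channel and the context. A minimal, self-contained sketch of that pattern (names here are illustrative, not the project's):

```go
package main

import (
	"context"
	"fmt"
	"os/exec"
	"time"
)

// waitWithContext waits for cmd but gives up when ctx expires,
// killing the process so a write blocked on a full pipe unblocks.
func waitWithContext(ctx context.Context, cmd *exec.Cmd) error {
	done := make(chan error, 1)
	go func() { done <- cmd.Wait() }()
	select {
	case err := <-done:
		return err // process exited on its own
	case <-ctx.Done():
		_ = cmd.Process.Kill() // unblock a process stuck on a full pipe
		<-done                 // reap the wait goroutine before returning
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	cmd := exec.Command("sleep", "5")
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	fmt.Println(waitWithContext(ctx, cmd)) // context deadline exceeded
}
```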
@@ -1368,25 +1368,53 @@ func (e *Engine) executeWithStreamingCompression(ctx context.Context, cmdArgs []
 
 	// Then start pg_dump
 	if err := dumpCmd.Start(); err != nil {
+		compressCmd.Process.Kill()
 		return fmt.Errorf("failed to start pg_dump: %w", err)
 	}
 
-	// Wait for pg_dump to complete
-	dumpErr := dumpCmd.Wait()
+	// Wait for pg_dump in a goroutine to handle context timeout properly
+	// This prevents deadlock if pipe buffer fills and pg_dump blocks
+	dumpDone := make(chan error, 1)
+	go func() {
+		dumpDone <- dumpCmd.Wait()
+	}()
 
-	// Close stdout pipe to signal compressor we're done - MUST happen after Wait()
-	// but before we check for errors, so compressor gets EOF
+	var dumpErr error
+	select {
+	case dumpErr = <-dumpDone:
+		// pg_dump completed (success or failure)
+	case <-ctx.Done():
+		// Context cancelled/timeout - kill pg_dump to unblock
+		e.log.Warn("Backup timeout - killing pg_dump process")
+		dumpCmd.Process.Kill()
+		<-dumpDone // Wait for goroutine to finish
+		dumpErr = ctx.Err()
+	}
+
+	// Close stdout pipe to signal compressor we're done
+	// This MUST happen after pg_dump exits to avoid broken pipe
 	dumpStdout.Close()
 
 	// Wait for compression to complete
 	compressErr := compressCmd.Wait()
 
-	// Check errors in order
+	// Check errors - compressor failure first (it's usually the root cause)
+	if compressErr != nil {
+		e.log.Error("Compressor failed", "error", compressErr)
+		return fmt.Errorf("compression failed (check disk space): %w", compressErr)
+	}
 	if dumpErr != nil {
+		// Check for SIGPIPE (exit code 141) - indicates compressor died first
+		if exitErr, ok := dumpErr.(*exec.ExitError); ok && exitErr.ExitCode() == 141 {
+			e.log.Error("pg_dump received SIGPIPE - compressor may have failed")
+			return fmt.Errorf("pg_dump broken pipe - check disk space and compressor")
+		}
		return fmt.Errorf("pg_dump failed: %w", dumpErr)
 	}
-	if compressErr != nil {
-		return fmt.Errorf("compression failed: %w", compressErr)
+
+	// Sync file to disk to ensure durability (prevents truncation on power loss)
+	if err := outFile.Sync(); err != nil {
+		e.log.Warn("Failed to sync output file", "error", err)
 	}
 
 	e.log.Debug("Streaming compression completed", "output", compressedFile)
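One caveat on the SIGPIPE check above: exit code 141 is a shell convention (128 + SIGPIPE's signal number 13). When Go execs pg_dump directly rather than through a shell, a signal death is reported via `syscall.WaitStatus` and `ExitCode()` returns -1, so a more robust detector checks both paths. A hedged sketch (the helper name is mine, not the codebase's):

```go
package main

import (
	"errors"
	"fmt"
	"os/exec"
	"syscall"
)

// diedFromSIGPIPE reports whether err describes a process killed by SIGPIPE,
// covering both the direct-exec case and the shell-wrapped (exit 141) case.
func diedFromSIGPIPE(err error) bool {
	var exitErr *exec.ExitError
	if !errors.As(err, &exitErr) {
		return false
	}
	if exitErr.ExitCode() == 141 { // shell convention: 128 + SIGPIPE(13)
		return true
	}
	if ws, ok := exitErr.Sys().(syscall.WaitStatus); ok {
		return ws.Signaled() && ws.Signal() == syscall.SIGPIPE
	}
	return false
}

func main() {
	// "false" exits with status 1, not SIGPIPE - prints false.
	err := exec.Command("false").Run()
	fmt.Println(diedFromSIGPIPE(err))
}
```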
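On the fsync change: the hunk syncs the output file's contents. A common companion step, not part of this commit, is syncing the parent directory so the new file's directory entry also survives a crash on Unix filesystems. Sketch under that assumption:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// syncFileAndDir flushes file contents, then the containing directory,
// so both the data and the directory entry survive a power loss.
func syncFileAndDir(f *os.File) error {
	if err := f.Sync(); err != nil { // flush file data + metadata
		return err
	}
	dir, err := os.Open(filepath.Dir(f.Name()))
	if err != nil {
		return err
	}
	defer dir.Close()
	return dir.Sync() // persist the directory entry (Unix filesystems)
}

func main() {
	f, err := os.Create("/tmp/backup.sql.gz")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if _, err := f.WriteString("example"); err != nil {
		panic(err)
	}
	if err := syncFileAndDir(f); err != nil {
		panic(err)
	}
	fmt.Println("file and directory entry synced")
}
```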
@@ -271,9 +271,10 @@ func (e *Engine) restorePostgreSQLSQL(ctx context.Context, archivePath, targetDB
 	}
 
 	if compressed {
-		psqlCmd := fmt.Sprintf("psql -U %s -d %s", e.cfg.User, targetDB)
+		// Use ON_ERROR_STOP=1 to fail fast on first error (prevents millions of errors on truncated dumps)
+		psqlCmd := fmt.Sprintf("psql -U %s -d %s -v ON_ERROR_STOP=1", e.cfg.User, targetDB)
 		if hostArg != "" {
-			psqlCmd = fmt.Sprintf("psql %s -U %s -d %s", hostArg, e.cfg.User, targetDB)
+			psqlCmd = fmt.Sprintf("psql %s -U %s -d %s -v ON_ERROR_STOP=1", hostArg, e.cfg.User, targetDB)
 		}
 		// Set PGPASSWORD in the bash command for password-less auth
 		cmd = []string{
@@ -288,6 +289,7 @@ func (e *Engine) restorePostgreSQLSQL(ctx context.Context, archivePath, targetDB
 			"-p", fmt.Sprintf("%d", e.cfg.Port),
 			"-U", e.cfg.User,
 			"-d", targetDB,
+			"-v", "ON_ERROR_STOP=1",
 			"-f", archivePath,
 		}
 	} else {
@@ -295,6 +297,7 @@ func (e *Engine) restorePostgreSQLSQL(ctx context.Context, archivePath, targetDB
 			"psql",
 			"-U", e.cfg.User,
 			"-d", targetDB,
+			"-v", "ON_ERROR_STOP=1",
 			"-f", archivePath,
 		}
 	}
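For reference, psql's documented exit codes make the fail-fast behavior introduced in the three hunks above easy to detect from the caller: with ON_ERROR_STOP set, a script error makes psql exit with status 3. A standalone sketch (connection details are placeholders):

```go
package main

import (
	"fmt"
	"os/exec"
)

func main() {
	cmd := exec.Command("psql",
		"-U", "postgres",
		"-d", "restore_target",
		"-v", "ON_ERROR_STOP=1",
		"-f", "dump.sql")
	if err := cmd.Run(); err != nil {
		if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == 3 {
			// Exit 3: an error occurred in the script while ON_ERROR_STOP was set.
			fmt.Println("restore aborted at first SQL error (likely truncated dump)")
			return
		}
		fmt.Println("psql failed:", err)
	}
}
```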
@@ -721,6 +724,29 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
 				"truncated", result.IsTruncated,
 				"errors", result.Errors)
 			}
+		} else if strings.HasSuffix(dumpFile, ".dump") {
+			// Validate custom format dumps using pg_restore --list
+			cmd := exec.Command("pg_restore", "--list", dumpFile)
+			output, err := cmd.CombinedOutput()
+			if err != nil {
+				dbName := strings.TrimSuffix(entry.Name(), ".dump")
+				errDetail := strings.TrimSpace(string(output))
+				if len(errDetail) > 100 {
+					errDetail = errDetail[:100] + "..."
+				}
+				// Check for truncation indicators
+				if strings.Contains(errDetail, "unexpected end") || strings.Contains(errDetail, "invalid") {
+					corruptedDumps = append(corruptedDumps, fmt.Sprintf("%s: %s", dbName, errDetail))
+					e.log.Error("CORRUPTED custom dump file detected",
+						"database", dbName,
+						"file", entry.Name(),
+						"error", errDetail)
+				} else {
+					e.log.Warn("pg_restore --list warning (may be recoverable)",
+						"file", entry.Name(),
+						"error", errDetail)
+				}
+			}
 		}
 	}
 	if len(corruptedDumps) > 0 {
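The same pre-flight check works as a standalone helper: `pg_restore --list` only reads the archive's table of contents, so it catches truncation in seconds without touching table data. A sketch (the function name is illustrative, not from the codebase):

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
	"strings"
)

// validateCustomDump returns an error if pg_restore cannot read the
// archive's table of contents (typical symptom of a truncated .dump).
func validateCustomDump(path string) error {
	out, err := exec.Command("pg_restore", "--list", path).CombinedOutput()
	if err != nil {
		return fmt.Errorf("invalid custom dump %s: %s", path, strings.TrimSpace(string(out)))
	}
	return nil
}

func main() {
	if len(os.Args) < 2 {
		fmt.Println("usage: validate <file.dump>")
		os.Exit(2)
	}
	if err := validateCustomDump(os.Args[1]); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	fmt.Println("dump TOC readable - proceeding with restore")
}
```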