Fix: Auto-detect large objects in cluster restore to prevent lock contention

- Added detectLargeObjectsInDumps() to scan dump files for BLOB/LARGE OBJECT entries
- Automatically reduces ClusterParallelism to 1 when large objects detected
- Prevents 'could not open large object' and 'max_locks_per_transaction' errors
- Sequential restore eliminates lock table exhaustion when multiple DBs have BLOBs
- Uses pg_restore -l for fast metadata scanning (checks up to 5 dumps; see the TOC-matching sketch after this list)
- Logs warning and shows user notification when parallelism adjusted
- Also includes: CLUSTER_RESTORE_COMPLIANCE.md documentation and enhanced d7030 test DB
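
For context, a minimal, illustrative sketch (not part of this commit) of the TOC matching that detectLargeObjectsInDumps performs: `pg_restore -l` prints one line per archive entry, and large-object entries are labelled BLOB/BLOBS on older PostgreSQL versions and LARGE OBJECT(S) on newer ones, which is presumably why both spellings are checked. The sample lines below are invented; real entry IDs, OIDs, and owners will differ.

```go
package main

import (
	"fmt"
	"strings"
)

// containsLargeObjects mirrors the substring check used by
// detectLargeObjectsInDumps (illustrative, not the committed code).
func containsLargeObjects(tocLine string) bool {
	return strings.Contains(tocLine, "BLOB") || strings.Contains(tocLine, "LARGE OBJECT")
}

func main() {
	// Hypothetical pg_restore -l TOC lines; IDs, OIDs, and owners are made up.
	samples := []string{
		"3055; 2613 16391 BLOB 16391 app_user",            // old-style large-object entry
		"3056; 0 0 LARGE OBJECTS  LARGE OBJECTS app_user", // new-style large-object data entry
		"215; 1259 16388 TABLE public documents app_user", // ordinary table entry
	}
	for _, line := range samples {
		fmt.Printf("%-50s -> large objects: %v\n", line, containsLargeObjects(line))
	}
}
```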
2025-11-14 13:28:50 +00:00
parent f801c7a549
commit bfce57a0b6
3 changed files with 251 additions and 11 deletions


@@ -548,11 +548,24 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
	estimator := progress.NewETAEstimator("Restoring cluster", totalDBs)
	e.progress.SetEstimator(estimator)

	// Check for large objects in dump files and adjust parallelism
	hasLargeObjects := e.detectLargeObjectsInDumps(dumpsDir, entries)

	// Use worker pool for parallel restore
	parallelism := e.cfg.ClusterParallelism
	if parallelism < 1 {
		parallelism = 1 // Ensure at least sequential
	}

	// Automatically reduce parallelism if large objects detected
	if hasLargeObjects && parallelism > 1 {
		e.log.Warn("Large objects detected in dump files - reducing parallelism to avoid lock contention",
			"original_parallelism", parallelism,
			"adjusted_parallelism", 1)
		e.progress.Update("⚠️ Large objects detected - using sequential restore to avoid lock conflicts")
		time.Sleep(2 * time.Second) // Give user time to see warning
		parallelism = 1
	}

	var successCount, failCount int32
	var failedDBsMu sync.Mutex
@@ -973,6 +986,56 @@ func (e *Engine) previewClusterRestore(archivePath string) error {
	return nil
}

// detectLargeObjectsInDumps checks if any dump files contain large objects
func (e *Engine) detectLargeObjectsInDumps(dumpsDir string, entries []os.DirEntry) bool {
	hasLargeObjects := false
	checkedCount := 0
	maxChecks := 5 // Only check first 5 dumps to avoid slowdown

	for _, entry := range entries {
		if entry.IsDir() || checkedCount >= maxChecks {
			continue
		}

		dumpFile := filepath.Join(dumpsDir, entry.Name())

		// Skip compressed SQL files (can't easily check without decompressing)
		if strings.HasSuffix(dumpFile, ".sql.gz") {
			continue
		}

		// Use pg_restore -l to list contents (fast, doesn't restore data)
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		cmd := exec.CommandContext(ctx, "pg_restore", "-l", dumpFile)
		output, err := cmd.Output()
		cancel() // Release the timeout context now; defer inside the loop would pile up until the function returns
		if err != nil {
			// If pg_restore -l fails, it might not be custom format - skip
			continue
		}
		checkedCount++

		// Check if output contains "BLOB" or "LARGE OBJECT" entries
		outputStr := string(output)
		if strings.Contains(outputStr, "BLOB") ||
			strings.Contains(outputStr, "LARGE OBJECT") {
			e.log.Info("Large objects detected in dump file", "file", entry.Name())
			hasLargeObjects = true
			// Don't break - log all files with large objects
		}
	}

	if hasLargeObjects {
		e.log.Warn("Cluster contains databases with large objects - parallel restore may cause lock contention")
	}
	return hasLargeObjects
}

// FormatBytes formats bytes to human readable format
func FormatBytes(bytes int64) string {
	const unit = 1024