@@ -2,6 +2,7 @@ package restore

import (
	"context"
	"database/sql"
	"fmt"
	"os"
	"os/exec"
@@ -17,6 +18,8 @@ import (
	"dbbackup/internal/logger"
	"dbbackup/internal/progress"
	"dbbackup/internal/security"

	_ "github.com/jackc/pgx/v5/stdlib" // PostgreSQL driver
)

// Engine handles database restore operations
@@ -223,7 +226,18 @@ func (e *Engine) restorePostgreSQLDump(ctx context.Context, archivePath, targetD

// restorePostgreSQLDumpWithOwnership restores from a PostgreSQL custom-format dump with ownership control
func (e *Engine) restorePostgreSQLDumpWithOwnership(ctx context.Context, archivePath, targetDB string, compressed bool, preserveOwnership bool) error {
	// Build restore command with ownership control.
	// Check if the dump contains large objects (BLOBs) - if so, use a phased restore
	// to prevent lock table exhaustion ("out of shared memory" /
	// max_locks_per_transaction errors).
	hasLargeObjects := e.checkDumpHasLargeObjects(archivePath)

	if hasLargeObjects {
		e.log.Info("Large objects detected - using phased restore to prevent lock exhaustion",
			"database", targetDB,
			"archive", archivePath)
		return e.restorePostgreSQLDumpPhased(ctx, archivePath, targetDB, preserveOwnership)
	}

	// Standard restore for dumps without large objects
	opts := database.RestoreOptions{
		Parallel: 1,
		Clean:    false, // We already dropped the database
@@ -249,6 +263,113 @@ func (e *Engine) restorePostgreSQLDumpWithOwnership(ctx context.Context, archive
	return e.executeRestoreCommand(ctx, cmd)
}
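
// Why large objects exhaust the lock table: per the PostgreSQL docs, the shared
// lock table has room for roughly
//
//	max_locks_per_transaction * (max_connections + max_prepared_transactions)
//
// locks. With the defaults (64, 100, 0) that is ~6,400 slots, and pg_restore
// holds a lock per restored large object within a transaction, so a dump with,
// say, 100,000 BLOBs blows through the default table and fails with
// "out of shared memory". (The counts are illustrative; the sizing formula is
// the documented one.)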

// restorePostgreSQLDumpPhased performs a multi-phase restore to prevent lock table exhaustion.
// Phase 1: pre-data  (schema, types, functions)
// Phase 2: data      (table data and large-object contents)
// Phase 3: post-data (indexes, constraints, triggers)
//
// pg_restore's --section flag only accepts these three sections; large objects
// are restored as part of the data section. Running each section as a separate
// pg_restore invocation commits and releases locks between phases, which
// prevents "out of shared memory" failures on BLOB-heavy dumps.
func (e *Engine) restorePostgreSQLDumpPhased(ctx context.Context, archivePath, targetDB string, preserveOwnership bool) error {
	e.log.Info("Starting phased restore for database with large objects",
		"database", targetDB,
		"archive", archivePath)

	// Phase definitions with --section flag
	phases := []struct {
		name    string
		section string
		desc    string
	}{
		{"pre-data", "pre-data", "Schema, types, functions"},
		{"data", "data", "Table data and large objects"},
		{"post-data", "post-data", "Indexes, constraints, triggers"},
	}

	for i, phase := range phases {
		e.log.Info(fmt.Sprintf("Phase %d/%d: Restoring %s", i+1, len(phases), phase.name),
			"database", targetDB,
			"section", phase.section,
			"description", phase.desc)

		if err := e.restoreSection(ctx, archivePath, targetDB, phase.section, preserveOwnership); err != nil {
			// Tolerate errors that are known to be harmless (e.g. "already exists")
			if e.isIgnorableError(err.Error()) {
				e.log.Warn(fmt.Sprintf("Phase %d completed with ignorable errors", i+1),
					"section", phase.section,
					"error", err)
				continue
			}
			return fmt.Errorf("phase %d (%s) failed: %w", i+1, phase.name, err)
		}

		e.log.Info(fmt.Sprintf("Phase %d/%d completed successfully", i+1, len(phases)),
			"section", phase.section)
	}

	e.log.Info("Phased restore completed successfully", "database", targetDB)
	return nil
}
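
// For reference, the three phases amount to running pg_restore once per
// section. A sketch of the generated commands for a local restore with
// preserveOwnership == false (user, database, and archive names are
// illustrative):
//
//	pg_restore -U postgres --section=pre-data  --no-owner --no-privileges --no-data-for-failed-tables --dbname=app app.pgdump
//	pg_restore -U postgres --section=data      --no-owner --no-privileges --no-data-for-failed-tables --dbname=app app.pgdump
//	pg_restore -U postgres --section=post-data --no-owner --no-privileges --no-data-for-failed-tables --dbname=app app.pgdump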

// restoreSection restores a specific section of a PostgreSQL dump
func (e *Engine) restoreSection(ctx context.Context, archivePath, targetDB, section string, preserveOwnership bool) error {
	// Build pg_restore command with --section flag
	args := []string{"pg_restore"}

	// Connection parameters
	if e.cfg.Host != "localhost" {
		args = append(args, "-h", e.cfg.Host)
		args = append(args, "-p", fmt.Sprintf("%d", e.cfg.Port))
		args = append(args, "--no-password")
	}
	args = append(args, "-U", e.cfg.User)

	// Section-specific restore
	args = append(args, "--section="+section)

	// Options
	if !preserveOwnership {
		args = append(args, "--no-owner", "--no-privileges")
	}

	// Skip data for failed tables (prevents cascading errors)
	args = append(args, "--no-data-for-failed-tables")

	// Database and input
	args = append(args, "--dbname="+targetDB)
	args = append(args, archivePath)

	return e.executeRestoreCommand(ctx, args)
}
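
// Note on --no-data-for-failed-tables: by default pg_restore still copies a
// table's data even when the CREATE TABLE for it failed (typically because the
// table already exists), which can produce duplicate rows; with this flag
// pg_restore skips that data instead, so one failed object does not cascade
// into corrupted contents.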

// checkDumpHasLargeObjects checks if a PostgreSQL custom dump contains large objects (BLOBs)
func (e *Engine) checkDumpHasLargeObjects(archivePath string) bool {
	// Use pg_restore -l to list contents without restoring
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	cmd := exec.CommandContext(ctx, "pg_restore", "-l", archivePath)
	output, err := cmd.Output()

	if err != nil {
		// If listing fails, assume no large objects (safer to use the standard restore)
		e.log.Debug("Could not list dump contents, assuming no large objects", "error", err)
		return false
	}

	outputStr := string(output)

	// Check for BLOB / LARGE OBJECT indicators in the table of contents
	if strings.Contains(outputStr, "BLOB") ||
		strings.Contains(outputStr, "LARGE OBJECT") ||
		strings.Contains(outputStr, " BLOBS ") ||
		strings.Contains(outputStr, "lo_create") {
		return true
	}

	return false
}
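
// Illustrative pg_restore -l TOC lines that this check matches (OIDs vary, and
// exact formatting differs across versions; PostgreSQL 16+ labels these
// entries LARGE OBJECT rather than BLOB):
//
//	3541; 2613 16493 BLOB 16493 postgres
//	3541; 2613 16493 LARGE OBJECT 16493 postgres    (PostgreSQL 16+)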

// restorePostgreSQLSQL restores from a plain-SQL PostgreSQL script
func (e *Engine) restorePostgreSQLSQL(ctx context.Context, archivePath, targetDB string, compressed bool) error {
	// Pre-validate the SQL dump to detect truncation BEFORE attempting the restore
@@ -807,6 +928,27 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
	}
	e.log.Info("All dump files passed validation")

	// AUTO-TUNE: Boost PostgreSQL lock capacity for large restores.
	// This prevents "out of shared memory" / max_locks_per_transaction errors.
	e.progress.Update("Tuning PostgreSQL for large restore...")
	originalLockValue, tuneErr := e.boostLockCapacity(ctx)
	if tuneErr != nil {
		e.log.Warn("Could not boost lock capacity - restore may fail on BLOB-heavy databases",
			"error", tuneErr)
	} else {
		e.log.Info("Boosted max_locks_per_transaction for restore",
			"original", originalLockValue,
			"boosted", 2048)
		// Ensure we reset lock capacity when done (even on failure)
		defer func() {
			if resetErr := e.resetLockCapacity(ctx, originalLockValue); resetErr != nil {
				e.log.Warn("Could not reset lock capacity", "error", resetErr)
			} else {
				e.log.Info("Reset max_locks_per_transaction to original value", "value", originalLockValue)
			}
		}()
	}

	var failedDBs []string
	totalDBs := 0

@@ -1499,3 +1641,92 @@ func (e *Engine) quickValidateSQLDump(archivePath string, compressed bool) error
	e.log.Debug("SQL dump validation passed", "path", archivePath)
	return nil
}

// boostLockCapacity temporarily increases max_locks_per_transaction to prevent
// "out of shared memory" errors during large restores with many BLOBs. It
// returns the original value for later reset.
//
// NOTE: max_locks_per_transaction can only be changed at server start, so
// ALTER SYSTEM + pg_reload_conf() stages the new value in postgresql.auto.conf
// but the running server keeps the old limit until PostgreSQL is restarted.
func (e *Engine) boostLockCapacity(ctx context.Context) (int, error) {
	// Connect to PostgreSQL to run system commands
	connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=postgres sslmode=disable",
		e.cfg.Host, e.cfg.Port, e.cfg.User, e.cfg.Password)

	// For localhost, use the Unix socket
	if e.cfg.Host == "localhost" || e.cfg.Host == "" {
		connStr = fmt.Sprintf("user=%s password=%s dbname=postgres sslmode=disable",
			e.cfg.User, e.cfg.Password)
	}

	db, err := sql.Open("pgx", connStr)
	if err != nil {
		return 0, fmt.Errorf("failed to connect: %w", err)
	}
	defer db.Close()

	// Get the current value. SHOW returns text, so scanning into an int relies
	// on driver conversion; fall back to parsing the string form if that fails.
	var currentValue int
	err = db.QueryRowContext(ctx, "SHOW max_locks_per_transaction").Scan(&currentValue)
	if err != nil {
		var currentValueStr string
		err = db.QueryRowContext(ctx, "SHOW max_locks_per_transaction").Scan(&currentValueStr)
		if err != nil {
			return 0, fmt.Errorf("failed to get current max_locks_per_transaction: %w", err)
		}
		fmt.Sscanf(currentValueStr, "%d", &currentValue)
	}

	// Skip if already high enough
	if currentValue >= 2048 {
		e.log.Info("max_locks_per_transaction already sufficient", "value", currentValue)
		return currentValue, nil
	}

	// Boost to 2048 (enough for most BLOB-heavy databases)
	_, err = db.ExecContext(ctx, "ALTER SYSTEM SET max_locks_per_transaction = 2048")
	if err != nil {
		return currentValue, fmt.Errorf("failed to set max_locks_per_transaction: %w", err)
	}

	// Reload the configuration (applies reloadable settings; this parameter
	// itself still requires a restart to take effect)
	_, err = db.ExecContext(ctx, "SELECT pg_reload_conf()")
	if err != nil {
		return currentValue, fmt.Errorf("failed to reload config: %w", err)
	}

	return currentValue, nil
}
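
// The SQL issued above, in order (a sketch; the boosted value is the code's 2048):
//
//	SHOW max_locks_per_transaction;                     -- read the current limit
//	ALTER SYSTEM SET max_locks_per_transaction = 2048;  -- write postgresql.auto.conf
//	SELECT pg_reload_conf();                            -- signal a config reload
//
// ALTER SYSTEM requires superuser (or, on PostgreSQL 15+, a role granted ALTER
// SYSTEM on the parameter), which is why RestoreCluster treats a failure here
// as a warning rather than aborting the restore.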

// resetLockCapacity restores the original max_locks_per_transaction value
func (e *Engine) resetLockCapacity(ctx context.Context, originalValue int) error {
	connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=postgres sslmode=disable",
		e.cfg.Host, e.cfg.Port, e.cfg.User, e.cfg.Password)

	if e.cfg.Host == "localhost" || e.cfg.Host == "" {
		connStr = fmt.Sprintf("user=%s password=%s dbname=postgres sslmode=disable",
			e.cfg.User, e.cfg.Password)
	}

	db, err := sql.Open("pgx", connStr)
	if err != nil {
		return fmt.Errorf("failed to connect: %w", err)
	}
	defer db.Close()

	// Reset to the original value (or use RESET to fall back to the server default)
	if originalValue == 64 { // Default value
		_, err = db.ExecContext(ctx, "ALTER SYSTEM RESET max_locks_per_transaction")
	} else {
		_, err = db.ExecContext(ctx, fmt.Sprintf("ALTER SYSTEM SET max_locks_per_transaction = %d", originalValue))
	}
	if err != nil {
		return fmt.Errorf("failed to reset max_locks_per_transaction: %w", err)
	}

	// Reload config
	_, err = db.ExecContext(ctx, "SELECT pg_reload_conf()")
	if err != nil {
		return fmt.Errorf("failed to reload config: %w", err)
	}

	return nil
}
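
// Verification sketch - pg_settings exposes whether a staged change is still
// waiting for a restart (the pending_restart column has existed since
// PostgreSQL 9.5):
//
//	SELECT name, setting, pending_restart
//	FROM pg_settings
//	WHERE name = 'max_locks_per_transaction';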