diff --git a/cmd/restore.go b/cmd/restore.go index c6548ce..792add5 100755 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -13,6 +13,7 @@ import ( "dbbackup/internal/backup" "dbbackup/internal/cloud" "dbbackup/internal/database" + "dbbackup/internal/pitr" "dbbackup/internal/restore" "dbbackup/internal/security" @@ -33,6 +34,15 @@ var ( // Encryption flags restoreEncryptionKeyFile string restoreEncryptionKeyEnv string = "DBBACKUP_ENCRYPTION_KEY" + + // PITR restore flags (additional to pitr.go) + pitrBaseBackup string + pitrWALArchive string + pitrTargetDir string + pitrInclusive bool + pitrSkipExtract bool + pitrAutoStart bool + pitrMonitor bool ) // restoreCmd represents the restore command @@ -146,11 +156,61 @@ Shows information about each archive: RunE: runRestoreList, } +// restorePITRCmd performs Point-in-Time Recovery +var restorePITRCmd = &cobra.Command{ + Use: "pitr", + Short: "Point-in-Time Recovery (PITR) restore", + Long: `Restore PostgreSQL database to a specific point in time using WAL archives. + +PITR allows restoring to any point in time, not just the backup moment. +Requires a base backup and continuous WAL archives. + +Recovery Target Types: + --target-time Restore to specific timestamp + --target-xid Restore to transaction ID + --target-lsn Restore to Log Sequence Number + --target-name Restore to named restore point + --target-immediate Restore to earliest consistent point + +Examples: + # Restore to specific time + dbbackup restore pitr \\ + --base-backup /backups/base.tar.gz \\ + --wal-archive /backups/wal/ \\ + --target-time "2024-11-26 12:00:00" \\ + --target-dir /var/lib/postgresql/14/main + + # Restore to transaction ID + dbbackup restore pitr \\ + --base-backup /backups/base.tar.gz \\ + --wal-archive /backups/wal/ \\ + --target-xid 1000000 \\ + --target-dir /var/lib/postgresql/14/main \\ + --auto-start + + # Restore to LSN + dbbackup restore pitr \\ + --base-backup /backups/base.tar.gz \\ + --wal-archive /backups/wal/ \\ + --target-lsn "0/3000000" \\ + --target-dir /var/lib/postgresql/14/main + + # Restore to earliest consistent point + dbbackup restore pitr \\ + --base-backup /backups/base.tar.gz \\ + --wal-archive /backups/wal/ \\ + --target-immediate \\ + --target-dir /var/lib/postgresql/14/main +`, + RunE: runRestorePITR, +} + func init() { rootCmd.AddCommand(restoreCmd) restoreCmd.AddCommand(restoreSingleCmd) restoreCmd.AddCommand(restoreClusterCmd) restoreCmd.AddCommand(restoreListCmd) + restoreCmd.AddCommand(restorePITRCmd) // Single restore flags restoreSingleCmd.Flags().BoolVar(&restoreConfirm, "confirm", false, "Confirm and execute restore (required)") @@ -173,6 +233,26 @@ func init() { restoreClusterCmd.Flags().BoolVar(&restoreNoProgress, "no-progress", false, "Disable progress indicators") restoreClusterCmd.Flags().StringVar(&restoreEncryptionKeyFile, "encryption-key-file", "", "Path to encryption key file (required for encrypted backups)") restoreClusterCmd.Flags().StringVar(&restoreEncryptionKeyEnv, "encryption-key-env", "DBBACKUP_ENCRYPTION_KEY", "Environment variable containing encryption key") + + // PITR restore flags + restorePITRCmd.Flags().StringVar(&pitrBaseBackup, "base-backup", "", "Path to base backup file (.tar.gz) (required)") + restorePITRCmd.Flags().StringVar(&pitrWALArchive, "wal-archive", "", "Path to WAL archive directory (required)") + restorePITRCmd.Flags().StringVar(&pitrTargetTime, "target-time", "", "Restore to timestamp (YYYY-MM-DD HH:MM:SS)") + restorePITRCmd.Flags().StringVar(&pitrTargetXID, "target-xid", "", "Restore to transaction ID") + restorePITRCmd.Flags().StringVar(&pitrTargetLSN, "target-lsn", "", "Restore to LSN (e.g., 0/3000000)") + restorePITRCmd.Flags().StringVar(&pitrTargetName, "target-name", "", "Restore to named restore point") + restorePITRCmd.Flags().BoolVar(&pitrTargetImmediate, "target-immediate", false, "Restore to earliest consistent point") + restorePITRCmd.Flags().StringVar(&pitrRecoveryAction, "target-action", "promote", "Action after recovery (promote|pause|shutdown)") + restorePITRCmd.Flags().StringVar(&pitrTargetDir, "target-dir", "", "PostgreSQL data directory (required)") + restorePITRCmd.Flags().StringVar(&pitrWALSource, "timeline", "latest", "Timeline to follow (latest or timeline ID)") + restorePITRCmd.Flags().BoolVar(&pitrInclusive, "inclusive", true, "Include target transaction/time") + restorePITRCmd.Flags().BoolVar(&pitrSkipExtract, "skip-extraction", false, "Skip base backup extraction (data dir exists)") + restorePITRCmd.Flags().BoolVar(&pitrAutoStart, "auto-start", false, "Automatically start PostgreSQL after setup") + restorePITRCmd.Flags().BoolVar(&pitrMonitor, "monitor", false, "Monitor recovery progress (requires --auto-start)") + + restorePITRCmd.MarkFlagRequired("base-backup") + restorePITRCmd.MarkFlagRequired("wal-archive") + restorePITRCmd.MarkFlagRequired("target-dir") } // runRestoreSingle restores a single database @@ -605,3 +685,53 @@ func truncate(s string, max int) string { } return s[:max-3] + "..." } + +// runRestorePITR performs Point-in-Time Recovery +func runRestorePITR(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + // Parse recovery target + target, err := pitr.ParseRecoveryTarget( + pitrTargetTime, + pitrTargetXID, + pitrTargetLSN, + pitrTargetName, + pitrTargetImmediate, + pitrRecoveryAction, + pitrWALSource, + pitrInclusive, + ) + if err != nil { + return fmt.Errorf("invalid recovery target: %w", err) + } + + // Display recovery target info + log.Info("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") + log.Info(" Point-in-Time Recovery (PITR)") + log.Info("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") + log.Info("") + log.Info(target.String()) + log.Info("") + + // Create restore orchestrator + orchestrator := pitr.NewRestoreOrchestrator(cfg, log) + + // Prepare restore options + opts := &pitr.RestoreOptions{ + BaseBackupPath: pitrBaseBackup, + WALArchiveDir: pitrWALArchive, + Target: target, + TargetDataDir: pitrTargetDir, + SkipExtraction: pitrSkipExtract, + AutoStart: pitrAutoStart, + MonitorProgress: pitrMonitor, + } + + // Perform PITR restore + if err := orchestrator.RestorePointInTime(ctx, opts); err != nil { + return fmt.Errorf("PITR restore failed: %w", err) + } + + log.Info("✅ PITR restore completed successfully") + return nil +} diff --git a/internal/pitr/recovery_config.go b/internal/pitr/recovery_config.go new file mode 100644 index 0000000..0aef543 --- /dev/null +++ b/internal/pitr/recovery_config.go @@ -0,0 +1,314 @@ +package pitr + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "dbbackup/internal/logger" +) + +// RecoveryConfigGenerator generates PostgreSQL recovery configuration files +type RecoveryConfigGenerator struct { + log logger.Logger +} + +// NewRecoveryConfigGenerator creates a new recovery config generator +func NewRecoveryConfigGenerator(log logger.Logger) *RecoveryConfigGenerator { + return &RecoveryConfigGenerator{ + log: log, + } +} + +// RecoveryConfig holds all recovery configuration parameters +type RecoveryConfig struct { + // Core recovery settings + Target *RecoveryTarget + WALArchiveDir string + RestoreCommand string + + // PostgreSQL version + PostgreSQLVersion int // Major version (12, 13, 14, etc.) + + // Additional settings + PrimaryConnInfo string // For standby mode + PrimarySlotName string // Replication slot name + RecoveryMinApplyDelay string // Min delay for replay + + // Paths + DataDir string // PostgreSQL data directory +} + +// GenerateRecoveryConfig writes recovery configuration files +// PostgreSQL 12+: postgresql.auto.conf + recovery.signal +// PostgreSQL < 12: recovery.conf +func (rcg *RecoveryConfigGenerator) GenerateRecoveryConfig(config *RecoveryConfig) error { + rcg.log.Info("Generating recovery configuration", + "pg_version", config.PostgreSQLVersion, + "target_type", config.Target.Type, + "data_dir", config.DataDir) + + if config.PostgreSQLVersion >= 12 { + return rcg.generateModernRecoveryConfig(config) + } + return rcg.generateLegacyRecoveryConfig(config) +} + +// generateModernRecoveryConfig generates config for PostgreSQL 12+ +// Uses postgresql.auto.conf and recovery.signal +func (rcg *RecoveryConfigGenerator) generateModernRecoveryConfig(config *RecoveryConfig) error { + // Create recovery.signal file (empty file that triggers recovery mode) + recoverySignalPath := filepath.Join(config.DataDir, "recovery.signal") + rcg.log.Info("Creating recovery.signal file", "path", recoverySignalPath) + + signalFile, err := os.Create(recoverySignalPath) + if err != nil { + return fmt.Errorf("failed to create recovery.signal: %w", err) + } + signalFile.Close() + + // Generate postgresql.auto.conf with recovery settings + autoConfPath := filepath.Join(config.DataDir, "postgresql.auto.conf") + rcg.log.Info("Generating postgresql.auto.conf", "path", autoConfPath) + + var sb strings.Builder + sb.WriteString("# PostgreSQL recovery configuration\n") + sb.WriteString("# Generated by dbbackup for Point-in-Time Recovery\n") + sb.WriteString(fmt.Sprintf("# Target: %s\n", config.Target.Summary())) + sb.WriteString("\n") + + // Restore command + if config.RestoreCommand == "" { + config.RestoreCommand = rcg.generateRestoreCommand(config.WALArchiveDir) + } + sb.WriteString(FormatConfigLine("restore_command", config.RestoreCommand)) + sb.WriteString("\n") + + // Recovery target parameters + targetConfig := config.Target.ToPostgreSQLConfig() + for key, value := range targetConfig { + sb.WriteString(FormatConfigLine(key, value)) + sb.WriteString("\n") + } + + // Optional: Primary connection info (for standby mode) + if config.PrimaryConnInfo != "" { + sb.WriteString("\n# Standby configuration\n") + sb.WriteString(FormatConfigLine("primary_conninfo", config.PrimaryConnInfo)) + sb.WriteString("\n") + if config.PrimarySlotName != "" { + sb.WriteString(FormatConfigLine("primary_slot_name", config.PrimarySlotName)) + sb.WriteString("\n") + } + } + + // Optional: Recovery delay + if config.RecoveryMinApplyDelay != "" { + sb.WriteString(FormatConfigLine("recovery_min_apply_delay", config.RecoveryMinApplyDelay)) + sb.WriteString("\n") + } + + // Write the configuration file + if err := os.WriteFile(autoConfPath, []byte(sb.String()), 0600); err != nil { + return fmt.Errorf("failed to write postgresql.auto.conf: %w", err) + } + + rcg.log.Info("Recovery configuration generated successfully", + "signal", recoverySignalPath, + "config", autoConfPath) + + return nil +} + +// generateLegacyRecoveryConfig generates config for PostgreSQL < 12 +// Uses recovery.conf file +func (rcg *RecoveryConfigGenerator) generateLegacyRecoveryConfig(config *RecoveryConfig) error { + recoveryConfPath := filepath.Join(config.DataDir, "recovery.conf") + rcg.log.Info("Generating recovery.conf (legacy)", "path", recoveryConfPath) + + var sb strings.Builder + sb.WriteString("# PostgreSQL recovery configuration\n") + sb.WriteString("# Generated by dbbackup for Point-in-Time Recovery\n") + sb.WriteString(fmt.Sprintf("# Target: %s\n", config.Target.Summary())) + sb.WriteString("\n") + + // Restore command + if config.RestoreCommand == "" { + config.RestoreCommand = rcg.generateRestoreCommand(config.WALArchiveDir) + } + sb.WriteString(FormatConfigLine("restore_command", config.RestoreCommand)) + sb.WriteString("\n") + + // Recovery target parameters + targetConfig := config.Target.ToPostgreSQLConfig() + for key, value := range targetConfig { + sb.WriteString(FormatConfigLine(key, value)) + sb.WriteString("\n") + } + + // Optional: Primary connection info (for standby mode) + if config.PrimaryConnInfo != "" { + sb.WriteString("\n# Standby configuration\n") + sb.WriteString(FormatConfigLine("standby_mode", "on")) + sb.WriteString("\n") + sb.WriteString(FormatConfigLine("primary_conninfo", config.PrimaryConnInfo)) + sb.WriteString("\n") + if config.PrimarySlotName != "" { + sb.WriteString(FormatConfigLine("primary_slot_name", config.PrimarySlotName)) + sb.WriteString("\n") + } + } + + // Optional: Recovery delay + if config.RecoveryMinApplyDelay != "" { + sb.WriteString(FormatConfigLine("recovery_min_apply_delay", config.RecoveryMinApplyDelay)) + sb.WriteString("\n") + } + + // Write the configuration file + if err := os.WriteFile(recoveryConfPath, []byte(sb.String()), 0600); err != nil { + return fmt.Errorf("failed to write recovery.conf: %w", err) + } + + rcg.log.Info("Recovery configuration generated successfully", "file", recoveryConfPath) + + return nil +} + +// generateRestoreCommand creates a restore_command for fetching WAL files +func (rcg *RecoveryConfigGenerator) generateRestoreCommand(walArchiveDir string) string { + // The restore_command is executed by PostgreSQL to fetch WAL files + // %f = WAL filename, %p = full path to copy WAL file to + + // Try multiple extensions (.gz.enc, .enc, .gz, plain) + // This handles compressed and/or encrypted WAL files + return fmt.Sprintf(`bash -c 'for ext in .gz.enc .enc .gz ""; do [ -f "%s/%%f$ext" ] && { [ -z "$ext" ] && cp "%s/%%f$ext" "%%p" || case "$ext" in *.gz.enc) gpg -d "%s/%%f$ext" | gunzip > "%%p" ;; *.enc) gpg -d "%s/%%f$ext" > "%%p" ;; *.gz) gunzip -c "%s/%%f$ext" > "%%p" ;; esac; exit 0; }; done; exit 1'`, + walArchiveDir, walArchiveDir, walArchiveDir, walArchiveDir, walArchiveDir) +} + +// ValidateDataDirectory validates that the target directory is suitable for recovery +func (rcg *RecoveryConfigGenerator) ValidateDataDirectory(dataDir string) error { + rcg.log.Info("Validating data directory", "path", dataDir) + + // Check if directory exists + stat, err := os.Stat(dataDir) + if err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("data directory does not exist: %s", dataDir) + } + return fmt.Errorf("failed to access data directory: %w", err) + } + + if !stat.IsDir() { + return fmt.Errorf("data directory is not a directory: %s", dataDir) + } + + // Check for PG_VERSION file (indicates PostgreSQL data directory) + pgVersionPath := filepath.Join(dataDir, "PG_VERSION") + if _, err := os.Stat(pgVersionPath); err != nil { + if os.IsNotExist(err) { + rcg.log.Warn("PG_VERSION file not found - may not be a PostgreSQL data directory", "path", dataDir) + } + } + + // Check if PostgreSQL is running (postmaster.pid exists) + postmasterPid := filepath.Join(dataDir, "postmaster.pid") + if _, err := os.Stat(postmasterPid); err == nil { + return fmt.Errorf("PostgreSQL is currently running in data directory %s (postmaster.pid exists). Stop PostgreSQL before running recovery", dataDir) + } + + // Check write permissions + testFile := filepath.Join(dataDir, ".dbbackup_test_write") + if err := os.WriteFile(testFile, []byte("test"), 0600); err != nil { + return fmt.Errorf("data directory is not writable: %w", err) + } + os.Remove(testFile) + + rcg.log.Info("Data directory validation passed", "path", dataDir) + return nil +} + +// DetectPostgreSQLVersion detects the PostgreSQL version from the data directory +func (rcg *RecoveryConfigGenerator) DetectPostgreSQLVersion(dataDir string) (int, error) { + pgVersionPath := filepath.Join(dataDir, "PG_VERSION") + + content, err := os.ReadFile(pgVersionPath) + if err != nil { + return 0, fmt.Errorf("failed to read PG_VERSION: %w", err) + } + + versionStr := strings.TrimSpace(string(content)) + + // Parse major version (e.g., "14" or "14.2") + parts := strings.Split(versionStr, ".") + if len(parts) == 0 { + return 0, fmt.Errorf("invalid PG_VERSION format: %s", versionStr) + } + + var majorVersion int + if _, err := fmt.Sscanf(parts[0], "%d", &majorVersion); err != nil { + return 0, fmt.Errorf("failed to parse PostgreSQL version from '%s': %w", versionStr, err) + } + + rcg.log.Info("Detected PostgreSQL version", "version", majorVersion, "full", versionStr) + return majorVersion, nil +} + +// CleanupRecoveryFiles removes recovery configuration files (for cleanup after recovery) +func (rcg *RecoveryConfigGenerator) CleanupRecoveryFiles(dataDir string, pgVersion int) error { + rcg.log.Info("Cleaning up recovery files", "data_dir", dataDir) + + if pgVersion >= 12 { + // Remove recovery.signal + recoverySignal := filepath.Join(dataDir, "recovery.signal") + if err := os.Remove(recoverySignal); err != nil && !os.IsNotExist(err) { + rcg.log.Warn("Failed to remove recovery.signal", "error", err) + } + + // Note: postgresql.auto.conf is kept as it may contain other settings + rcg.log.Info("Removed recovery.signal file") + } else { + // Remove recovery.conf + recoveryConf := filepath.Join(dataDir, "recovery.conf") + if err := os.Remove(recoveryConf); err != nil && !os.IsNotExist(err) { + rcg.log.Warn("Failed to remove recovery.conf", "error", err) + } + rcg.log.Info("Removed recovery.conf file") + } + + // Remove recovery.done if it exists (created by PostgreSQL after successful recovery) + recoveryDone := filepath.Join(dataDir, "recovery.done") + if err := os.Remove(recoveryDone); err != nil && !os.IsNotExist(err) { + rcg.log.Warn("Failed to remove recovery.done", "error", err) + } + + return nil +} + +// BackupExistingConfig backs up existing recovery configuration (if any) +func (rcg *RecoveryConfigGenerator) BackupExistingConfig(dataDir string) error { + timestamp := fmt.Sprintf("%d", os.Getpid()) + + // Backup recovery.signal if exists (PG 12+) + recoverySignal := filepath.Join(dataDir, "recovery.signal") + if _, err := os.Stat(recoverySignal); err == nil { + backup := filepath.Join(dataDir, fmt.Sprintf("recovery.signal.bak.%s", timestamp)) + if err := os.Rename(recoverySignal, backup); err != nil { + return fmt.Errorf("failed to backup recovery.signal: %w", err) + } + rcg.log.Info("Backed up existing recovery.signal", "backup", backup) + } + + // Backup recovery.conf if exists (PG < 12) + recoveryConf := filepath.Join(dataDir, "recovery.conf") + if _, err := os.Stat(recoveryConf); err == nil { + backup := filepath.Join(dataDir, fmt.Sprintf("recovery.conf.bak.%s", timestamp)) + if err := os.Rename(recoveryConf, backup); err != nil { + return fmt.Errorf("failed to backup recovery.conf: %w", err) + } + rcg.log.Info("Backed up existing recovery.conf", "backup", backup) + } + + return nil +} diff --git a/internal/pitr/recovery_target.go b/internal/pitr/recovery_target.go new file mode 100644 index 0000000..fff64a7 --- /dev/null +++ b/internal/pitr/recovery_target.go @@ -0,0 +1,323 @@ +package pitr + +import ( + "fmt" + "regexp" + "strconv" + "strings" + "time" +) + +// RecoveryTarget represents a PostgreSQL recovery target +type RecoveryTarget struct { + Type string // "time", "xid", "lsn", "name", "immediate" + Value string // The target value (timestamp, XID, LSN, or restore point name) + Action string // "promote", "pause", "shutdown" + Timeline string // Timeline to follow ("latest" or timeline ID) + Inclusive bool // Whether target is inclusive (default: true) +} + +// RecoveryTargetType constants +const ( + TargetTypeTime = "time" + TargetTypeXID = "xid" + TargetTypeLSN = "lsn" + TargetTypeName = "name" + TargetTypeImmediate = "immediate" +) + +// RecoveryAction constants +const ( + ActionPromote = "promote" + ActionPause = "pause" + ActionShutdown = "shutdown" +) + +// ParseRecoveryTarget creates a RecoveryTarget from CLI flags +func ParseRecoveryTarget( + targetTime, targetXID, targetLSN, targetName string, + targetImmediate bool, + targetAction, timeline string, + inclusive bool, +) (*RecoveryTarget, error) { + rt := &RecoveryTarget{ + Action: targetAction, + Timeline: timeline, + Inclusive: inclusive, + } + + // Validate action + if rt.Action == "" { + rt.Action = ActionPromote // Default + } + if !isValidAction(rt.Action) { + return nil, fmt.Errorf("invalid recovery action: %s (must be promote, pause, or shutdown)", rt.Action) + } + + // Determine target type (only one can be specified) + targetsSpecified := 0 + if targetTime != "" { + rt.Type = TargetTypeTime + rt.Value = targetTime + targetsSpecified++ + } + if targetXID != "" { + rt.Type = TargetTypeXID + rt.Value = targetXID + targetsSpecified++ + } + if targetLSN != "" { + rt.Type = TargetTypeLSN + rt.Value = targetLSN + targetsSpecified++ + } + if targetName != "" { + rt.Type = TargetTypeName + rt.Value = targetName + targetsSpecified++ + } + if targetImmediate { + rt.Type = TargetTypeImmediate + rt.Value = "immediate" + targetsSpecified++ + } + + if targetsSpecified == 0 { + return nil, fmt.Errorf("no recovery target specified (use --target-time, --target-xid, --target-lsn, --target-name, or --target-immediate)") + } + if targetsSpecified > 1 { + return nil, fmt.Errorf("multiple recovery targets specified, only one allowed") + } + + // Validate the target + if err := rt.Validate(); err != nil { + return nil, err + } + + return rt, nil +} + +// Validate validates the recovery target configuration +func (rt *RecoveryTarget) Validate() error { + if rt.Type == "" { + return fmt.Errorf("recovery target type not specified") + } + + switch rt.Type { + case TargetTypeTime: + return rt.validateTime() + case TargetTypeXID: + return rt.validateXID() + case TargetTypeLSN: + return rt.validateLSN() + case TargetTypeName: + return rt.validateName() + case TargetTypeImmediate: + // Immediate has no value to validate + return nil + default: + return fmt.Errorf("unknown recovery target type: %s", rt.Type) + } +} + +// validateTime validates a timestamp target +func (rt *RecoveryTarget) validateTime() error { + if rt.Value == "" { + return fmt.Errorf("recovery target time is empty") + } + + // Try parsing various timestamp formats + formats := []string{ + "2006-01-02 15:04:05", // Standard format + "2006-01-02 15:04:05.999999", // With microseconds + "2006-01-02T15:04:05", // ISO 8601 + "2006-01-02T15:04:05Z", // ISO 8601 with UTC + "2006-01-02T15:04:05-07:00", // ISO 8601 with timezone + time.RFC3339, // RFC3339 + time.RFC3339Nano, // RFC3339 with nanoseconds + } + + var parseErr error + for _, format := range formats { + _, err := time.Parse(format, rt.Value) + if err == nil { + return nil // Successfully parsed + } + parseErr = err + } + + return fmt.Errorf("invalid timestamp format '%s': %w (expected format: YYYY-MM-DD HH:MM:SS)", rt.Value, parseErr) +} + +// validateXID validates a transaction ID target +func (rt *RecoveryTarget) validateXID() error { + if rt.Value == "" { + return fmt.Errorf("recovery target XID is empty") + } + + // XID must be a positive integer + xid, err := strconv.ParseUint(rt.Value, 10, 64) + if err != nil { + return fmt.Errorf("invalid transaction ID '%s': must be a positive integer", rt.Value) + } + + if xid == 0 { + return fmt.Errorf("invalid transaction ID 0: XID must be greater than 0") + } + + return nil +} + +// validateLSN validates a Log Sequence Number target +func (rt *RecoveryTarget) validateLSN() error { + if rt.Value == "" { + return fmt.Errorf("recovery target LSN is empty") + } + + // LSN format: XXX/XXXXXXXX (hex/hex) + // Example: 0/3000000, 1/A2000000 + lsnPattern := regexp.MustCompile(`^[0-9A-Fa-f]+/[0-9A-Fa-f]+$`) + if !lsnPattern.MatchString(rt.Value) { + return fmt.Errorf("invalid LSN format '%s': expected format XXX/XXXXXXXX (e.g., 0/3000000)", rt.Value) + } + + // Validate both parts are valid hex + parts := strings.Split(rt.Value, "/") + if len(parts) != 2 { + return fmt.Errorf("invalid LSN format '%s': must contain exactly one '/'", rt.Value) + } + + for i, part := range parts { + if _, err := strconv.ParseUint(part, 16, 64); err != nil { + return fmt.Errorf("invalid LSN component %d '%s': must be hexadecimal", i+1, part) + } + } + + return nil +} + +// validateName validates a restore point name target +func (rt *RecoveryTarget) validateName() error { + if rt.Value == "" { + return fmt.Errorf("recovery target name is empty") + } + + // PostgreSQL restore point names have some restrictions + // They should be valid identifiers + if len(rt.Value) > 63 { + return fmt.Errorf("restore point name too long: %d characters (max 63)", len(rt.Value)) + } + + // Check for invalid characters (only alphanumeric, underscore, hyphen) + validName := regexp.MustCompile(`^[a-zA-Z0-9_-]+$`) + if !validName.MatchString(rt.Value) { + return fmt.Errorf("invalid restore point name '%s': only alphanumeric, underscore, and hyphen allowed", rt.Value) + } + + return nil +} + +// isValidAction checks if the recovery action is valid +func isValidAction(action string) bool { + switch strings.ToLower(action) { + case ActionPromote, ActionPause, ActionShutdown: + return true + default: + return false + } +} + +// ToPostgreSQLConfig converts the recovery target to PostgreSQL configuration parameters +// Returns a map of config keys to values suitable for postgresql.auto.conf or recovery.conf +func (rt *RecoveryTarget) ToPostgreSQLConfig() map[string]string { + config := make(map[string]string) + + // Set recovery target based on type + switch rt.Type { + case TargetTypeTime: + config["recovery_target_time"] = rt.Value + case TargetTypeXID: + config["recovery_target_xid"] = rt.Value + case TargetTypeLSN: + config["recovery_target_lsn"] = rt.Value + case TargetTypeName: + config["recovery_target_name"] = rt.Value + case TargetTypeImmediate: + config["recovery_target"] = "immediate" + } + + // Set recovery target action + config["recovery_target_action"] = rt.Action + + // Set timeline + if rt.Timeline != "" { + config["recovery_target_timeline"] = rt.Timeline + } else { + config["recovery_target_timeline"] = "latest" + } + + // Set inclusive flag (only for time, xid, lsn targets) + if rt.Type != TargetTypeImmediate && rt.Type != TargetTypeName { + if rt.Inclusive { + config["recovery_target_inclusive"] = "true" + } else { + config["recovery_target_inclusive"] = "false" + } + } + + return config +} + +// FormatConfigLine formats a config key-value pair for PostgreSQL config files +func FormatConfigLine(key, value string) string { + // Quote values that contain spaces or special characters + needsQuoting := strings.ContainsAny(value, " \t#'\"\\") + if needsQuoting { + // Escape single quotes + value = strings.ReplaceAll(value, "'", "''") + return fmt.Sprintf("%s = '%s'", key, value) + } + return fmt.Sprintf("%s = %s", key, value) +} + +// String returns a human-readable representation of the recovery target +func (rt *RecoveryTarget) String() string { + var sb strings.Builder + + sb.WriteString("Recovery Target:\n") + sb.WriteString(fmt.Sprintf(" Type: %s\n", rt.Type)) + + if rt.Type != TargetTypeImmediate { + sb.WriteString(fmt.Sprintf(" Value: %s\n", rt.Value)) + } + + sb.WriteString(fmt.Sprintf(" Action: %s\n", rt.Action)) + + if rt.Timeline != "" { + sb.WriteString(fmt.Sprintf(" Timeline: %s\n", rt.Timeline)) + } + + if rt.Type != TargetTypeImmediate && rt.Type != TargetTypeName { + sb.WriteString(fmt.Sprintf(" Inclusive: %v\n", rt.Inclusive)) + } + + return sb.String() +} + +// Summary returns a one-line summary of the recovery target +func (rt *RecoveryTarget) Summary() string { + switch rt.Type { + case TargetTypeTime: + return fmt.Sprintf("Restore to time: %s", rt.Value) + case TargetTypeXID: + return fmt.Sprintf("Restore to transaction ID: %s", rt.Value) + case TargetTypeLSN: + return fmt.Sprintf("Restore to LSN: %s", rt.Value) + case TargetTypeName: + return fmt.Sprintf("Restore to named point: %s", rt.Value) + case TargetTypeImmediate: + return "Restore to earliest consistent point" + default: + return "Unknown recovery target" + } +} diff --git a/internal/pitr/restore.go b/internal/pitr/restore.go new file mode 100644 index 0000000..ad948a7 --- /dev/null +++ b/internal/pitr/restore.go @@ -0,0 +1,381 @@ +package pitr + +import ( + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "dbbackup/internal/config" + "dbbackup/internal/logger" +) + +// RestoreOrchestrator orchestrates Point-in-Time Recovery operations +type RestoreOrchestrator struct { + log logger.Logger + config *config.Config + configGen *RecoveryConfigGenerator +} + +// NewRestoreOrchestrator creates a new PITR restore orchestrator +func NewRestoreOrchestrator(cfg *config.Config, log logger.Logger) *RestoreOrchestrator { + return &RestoreOrchestrator{ + log: log, + config: cfg, + configGen: NewRecoveryConfigGenerator(log), + } +} + +// RestoreOptions holds options for PITR restore +type RestoreOptions struct { + BaseBackupPath string // Path to base backup file (.tar.gz, .sql, or directory) + WALArchiveDir string // Path to WAL archive directory + Target *RecoveryTarget // Recovery target + TargetDataDir string // PostgreSQL data directory to restore to + PostgreSQLBin string // Path to PostgreSQL binaries (optional, will auto-detect) + SkipExtraction bool // Skip base backup extraction (data dir already exists) + AutoStart bool // Automatically start PostgreSQL after recovery + MonitorProgress bool // Monitor recovery progress +} + +// RestorePointInTime performs a Point-in-Time Recovery +func (ro *RestoreOrchestrator) RestorePointInTime(ctx context.Context, opts *RestoreOptions) error { + ro.log.Info("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") + ro.log.Info(" Point-in-Time Recovery (PITR)") + ro.log.Info("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") + ro.log.Info("") + ro.log.Info("Target:", "summary", opts.Target.Summary()) + ro.log.Info("Base Backup:", "path", opts.BaseBackupPath) + ro.log.Info("WAL Archive:", "path", opts.WALArchiveDir) + ro.log.Info("Data Directory:", "path", opts.TargetDataDir) + ro.log.Info("") + + // Step 1: Validate inputs + if err := ro.validateInputs(opts); err != nil { + return fmt.Errorf("validation failed: %w", err) + } + + // Step 2: Extract base backup (if needed) + if !opts.SkipExtraction { + if err := ro.extractBaseBackup(ctx, opts); err != nil { + return fmt.Errorf("base backup extraction failed: %w", err) + } + } else { + ro.log.Info("Skipping base backup extraction (--skip-extraction)") + } + + // Step 3: Detect PostgreSQL version + pgVersion, err := ro.configGen.DetectPostgreSQLVersion(opts.TargetDataDir) + if err != nil { + return fmt.Errorf("failed to detect PostgreSQL version: %w", err) + } + ro.log.Info("PostgreSQL version detected", "version", pgVersion) + + // Step 4: Backup existing recovery config (if any) + if err := ro.configGen.BackupExistingConfig(opts.TargetDataDir); err != nil { + ro.log.Warn("Failed to backup existing recovery config", "error", err) + } + + // Step 5: Generate recovery configuration + recoveryConfig := &RecoveryConfig{ + Target: opts.Target, + WALArchiveDir: opts.WALArchiveDir, + PostgreSQLVersion: pgVersion, + DataDir: opts.TargetDataDir, + } + + if err := ro.configGen.GenerateRecoveryConfig(recoveryConfig); err != nil { + return fmt.Errorf("failed to generate recovery configuration: %w", err) + } + + ro.log.Info("✅ Recovery configuration generated successfully") + ro.log.Info("") + ro.log.Info("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") + ro.log.Info(" Next Steps:") + ro.log.Info("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") + ro.log.Info("") + ro.log.Info("1. Start PostgreSQL to begin recovery:") + ro.log.Info(fmt.Sprintf(" pg_ctl -D %s start", opts.TargetDataDir)) + ro.log.Info("") + ro.log.Info("2. Monitor recovery progress:") + ro.log.Info(" tail -f " + filepath.Join(opts.TargetDataDir, "log", "postgresql-*.log")) + ro.log.Info(" OR query: SELECT * FROM pg_stat_recovery_prefetch;") + ro.log.Info("") + ro.log.Info("3. After recovery completes:") + ro.log.Info(fmt.Sprintf(" - Action: %s", opts.Target.Action)) + if opts.Target.Action == ActionPromote { + ro.log.Info(" - PostgreSQL will automatically promote to primary") + } else if opts.Target.Action == ActionPause { + ro.log.Info(" - PostgreSQL will pause - manually promote with: pg_ctl promote") + } + ro.log.Info("") + ro.log.Info("Recovery configuration ready!") + ro.log.Info("") + + // Optional: Auto-start PostgreSQL + if opts.AutoStart { + if err := ro.startPostgreSQL(ctx, opts); err != nil { + ro.log.Error("Failed to start PostgreSQL", "error", err) + return fmt.Errorf("PostgreSQL startup failed: %w", err) + } + + // Optional: Monitor recovery + if opts.MonitorProgress { + if err := ro.monitorRecovery(ctx, opts); err != nil { + ro.log.Warn("Recovery monitoring encountered an issue", "error", err) + } + } + } + + return nil +} + +// validateInputs validates restore options +func (ro *RestoreOrchestrator) validateInputs(opts *RestoreOptions) error { + ro.log.Info("Validating restore options...") + + // Validate target + if opts.Target == nil { + return fmt.Errorf("recovery target not specified") + } + if err := opts.Target.Validate(); err != nil { + return fmt.Errorf("invalid recovery target: %w", err) + } + + // Validate base backup path + if !opts.SkipExtraction { + if opts.BaseBackupPath == "" { + return fmt.Errorf("base backup path not specified") + } + if _, err := os.Stat(opts.BaseBackupPath); err != nil { + return fmt.Errorf("base backup not found: %w", err) + } + } + + // Validate WAL archive directory + if opts.WALArchiveDir == "" { + return fmt.Errorf("WAL archive directory not specified") + } + if stat, err := os.Stat(opts.WALArchiveDir); err != nil { + return fmt.Errorf("WAL archive directory not accessible: %w", err) + } else if !stat.IsDir() { + return fmt.Errorf("WAL archive path is not a directory: %s", opts.WALArchiveDir) + } + + // Validate target data directory + if opts.TargetDataDir == "" { + return fmt.Errorf("target data directory not specified") + } + + // If not skipping extraction, target dir should not exist or be empty + if !opts.SkipExtraction { + if stat, err := os.Stat(opts.TargetDataDir); err == nil { + if stat.IsDir() { + entries, err := os.ReadDir(opts.TargetDataDir) + if err != nil { + return fmt.Errorf("failed to read target directory: %w", err) + } + if len(entries) > 0 { + return fmt.Errorf("target data directory is not empty: %s (use --skip-extraction if intentional)", opts.TargetDataDir) + } + } else { + return fmt.Errorf("target path exists but is not a directory: %s", opts.TargetDataDir) + } + } + } else { + // If skipping extraction, validate the data directory + if err := ro.configGen.ValidateDataDirectory(opts.TargetDataDir); err != nil { + return err + } + } + + ro.log.Info("✅ Validation passed") + return nil +} + +// extractBaseBackup extracts the base backup to the target directory +func (ro *RestoreOrchestrator) extractBaseBackup(ctx context.Context, opts *RestoreOptions) error { + ro.log.Info("Extracting base backup...", "source", opts.BaseBackupPath, "dest", opts.TargetDataDir) + + // Create target directory + if err := os.MkdirAll(opts.TargetDataDir, 0700); err != nil { + return fmt.Errorf("failed to create target directory: %w", err) + } + + // Determine backup format and extract + backupPath := opts.BaseBackupPath + + // Check if encrypted + if strings.HasSuffix(backupPath, ".enc") { + ro.log.Info("Backup is encrypted - decryption not yet implemented in PITR module") + return fmt.Errorf("encrypted backups not yet supported for PITR restore (use manual decryption)") + } + + // Check format + if strings.HasSuffix(backupPath, ".tar.gz") || strings.HasSuffix(backupPath, ".tgz") { + return ro.extractTarGzBackup(ctx, backupPath, opts.TargetDataDir) + } else if strings.HasSuffix(backupPath, ".tar") { + return ro.extractTarBackup(ctx, backupPath, opts.TargetDataDir) + } else if stat, err := os.Stat(backupPath); err == nil && stat.IsDir() { + return ro.copyDirectoryBackup(ctx, backupPath, opts.TargetDataDir) + } + + return fmt.Errorf("unsupported backup format: %s (expected .tar.gz, .tar, or directory)", backupPath) +} + +// extractTarGzBackup extracts a .tar.gz backup +func (ro *RestoreOrchestrator) extractTarGzBackup(ctx context.Context, source, dest string) error { + ro.log.Info("Extracting tar.gz backup...") + + cmd := exec.CommandContext(ctx, "tar", "-xzf", source, "-C", dest) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("tar extraction failed: %w", err) + } + + ro.log.Info("✅ Base backup extracted successfully") + return nil +} + +// extractTarBackup extracts a .tar backup +func (ro *RestoreOrchestrator) extractTarBackup(ctx context.Context, source, dest string) error { + ro.log.Info("Extracting tar backup...") + + cmd := exec.CommandContext(ctx, "tar", "-xf", source, "-C", dest) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("tar extraction failed: %w", err) + } + + ro.log.Info("✅ Base backup extracted successfully") + return nil +} + +// copyDirectoryBackup copies a directory backup +func (ro *RestoreOrchestrator) copyDirectoryBackup(ctx context.Context, source, dest string) error { + ro.log.Info("Copying directory backup...") + + cmd := exec.CommandContext(ctx, "cp", "-a", source+"/.", dest+"/") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("directory copy failed: %w", err) + } + + ro.log.Info("✅ Base backup copied successfully") + return nil +} + +// startPostgreSQL starts PostgreSQL server +func (ro *RestoreOrchestrator) startPostgreSQL(ctx context.Context, opts *RestoreOptions) error { + ro.log.Info("Starting PostgreSQL for recovery...") + + pgCtl := "pg_ctl" + if opts.PostgreSQLBin != "" { + pgCtl = filepath.Join(opts.PostgreSQLBin, "pg_ctl") + } + + cmd := exec.CommandContext(ctx, pgCtl, "-D", opts.TargetDataDir, "-l", filepath.Join(opts.TargetDataDir, "logfile"), "start") + + output, err := cmd.CombinedOutput() + if err != nil { + ro.log.Error("PostgreSQL startup failed", "output", string(output)) + return fmt.Errorf("pg_ctl start failed: %w", err) + } + + ro.log.Info("✅ PostgreSQL started successfully") + ro.log.Info("PostgreSQL is now performing recovery...") + return nil +} + +// monitorRecovery monitors recovery progress +func (ro *RestoreOrchestrator) monitorRecovery(ctx context.Context, opts *RestoreOptions) error { + ro.log.Info("Monitoring recovery progress...") + ro.log.Info("(This is a simplified monitor - check PostgreSQL logs for detailed progress)") + + // Monitor for up to 5 minutes or until context cancelled + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + timeout := time.After(5 * time.Minute) + + for { + select { + case <-ctx.Done(): + ro.log.Info("Monitoring cancelled") + return ctx.Err() + case <-timeout: + ro.log.Info("Monitoring timeout reached (5 minutes)") + ro.log.Info("Recovery may still be in progress - check PostgreSQL logs") + return nil + case <-ticker.C: + // Check if recovery is complete by looking for postmaster.pid + pidFile := filepath.Join(opts.TargetDataDir, "postmaster.pid") + if _, err := os.Stat(pidFile); err == nil { + ro.log.Info("✅ PostgreSQL is running") + + // Check if recovery files still exist + recoverySignal := filepath.Join(opts.TargetDataDir, "recovery.signal") + recoveryConf := filepath.Join(opts.TargetDataDir, "recovery.conf") + + if _, err := os.Stat(recoverySignal); os.IsNotExist(err) { + if _, err := os.Stat(recoveryConf); os.IsNotExist(err) { + ro.log.Info("✅ Recovery completed - PostgreSQL promoted to primary") + return nil + } + } + + ro.log.Info("Recovery in progress...") + } else { + ro.log.Info("PostgreSQL not yet started or crashed") + } + } + } +} + +// GetRecoveryStatus checks the current recovery status +func (ro *RestoreOrchestrator) GetRecoveryStatus(dataDir string) (string, error) { + // Check for recovery signal files + recoverySignal := filepath.Join(dataDir, "recovery.signal") + standbySignal := filepath.Join(dataDir, "standby.signal") + recoveryConf := filepath.Join(dataDir, "recovery.conf") + postmasterPid := filepath.Join(dataDir, "postmaster.pid") + + // Check if PostgreSQL is running + _, pgRunning := os.Stat(postmasterPid) + + if _, err := os.Stat(recoverySignal); err == nil { + if pgRunning == nil { + return "recovering", nil + } + return "recovery_configured", nil + } + + if _, err := os.Stat(standbySignal); err == nil { + if pgRunning == nil { + return "standby", nil + } + return "standby_configured", nil + } + + if _, err := os.Stat(recoveryConf); err == nil { + if pgRunning == nil { + return "recovering_legacy", nil + } + return "recovery_configured_legacy", nil + } + + if pgRunning == nil { + return "primary", nil + } + + return "not_configured", nil +}