Implemented full incremental backup restoration.

internal/backup/incremental_postgres.go:
- RestoreIncremental() - main entry point
  - Validates incremental backup metadata (.meta.json)
  - Verifies the base backup exists and is a full backup
  - Verifies checksums match (BaseBackupID == base SHA256)
  - Extracts the base backup to the target directory first
  - Applies the incremental on top (overwrites changed files)
  - Context cancellation support
- Comprehensive error handling:
  - Missing base backup
  - Wrong backup type (not incremental)
  - Checksum mismatch
  - Missing metadata

internal/backup/incremental_extract.go:
- extractTarGz() - extracts tar.gz archives
  - Handles regular files, directories, symlinks
  - Preserves file permissions and timestamps
  - Progress logging every 100 files
  - Context-aware (cancellable)

Restore Logic (usage sketch below):
1. Load incremental metadata from .meta.json
2. Verify the base backup exists and checksums match
3. Extract the base backup (full restore)
4. Extract the incremental backup (apply changed files)
5. Log completion with file counts

Features:
✅ Validates backup chain integrity
✅ Checksum verification for safety
✅ Handles base backup path mismatch (warning only)
✅ Creates the target directory if missing
✅ Preserves file attributes (perms, mtime)
✅ Detailed logging at each step

Status: READY FOR TESTING
Next: Write integration test (Step 7)
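A minimal sketch of how a caller might drive the restore path (the eng, ctx, and log values and all paths here are hypothetical):

	eng := NewPostgresIncrementalEngine(log)
	err := eng.RestoreIncremental(ctx,
		"/backups/mydb_full_20240101_120000.tar.gz", // base (full) backup
		"/backups/mydb_incr_20240102_030000.tar.gz", // incremental on top
		"/var/lib/postgresql/restore_target",        // target data directory
	)
	if err != nil {
		// handle or propagate the restore failure
	}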
package backup

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"time"

	"dbbackup/internal/logger"
	"dbbackup/internal/metadata"
)

// PostgresIncrementalEngine implements incremental backups for PostgreSQL
type PostgresIncrementalEngine struct {
	log logger.Logger
}

// NewPostgresIncrementalEngine creates a new PostgreSQL incremental backup engine
func NewPostgresIncrementalEngine(log logger.Logger) *PostgresIncrementalEngine {
	return &PostgresIncrementalEngine{
		log: log,
	}
}

// FindChangedFiles identifies files that changed since the base backup.
// This is a simple mtime-based implementation; production should use
// pg_basebackup with incremental support.
func (e *PostgresIncrementalEngine) FindChangedFiles(ctx context.Context, config *IncrementalBackupConfig) ([]ChangedFile, error) {
	e.log.Info("Finding changed files for incremental backup",
		"base_backup", config.BaseBackupPath,
		"data_dir", config.DataDirectory)

	// Load base backup metadata to get its timestamp
	baseInfo, err := e.loadBackupInfo(config.BaseBackupPath)
	if err != nil {
		return nil, fmt.Errorf("failed to load base backup info: %w", err)
	}

	// Validate that the base backup is a full backup
	if baseInfo.BackupType != "" && baseInfo.BackupType != "full" {
		return nil, fmt.Errorf("base backup must be a full backup, got: %s", baseInfo.BackupType)
	}

	baseTimestamp := baseInfo.Timestamp
	e.log.Info("Base backup timestamp", "timestamp", baseTimestamp)

	// Scan the data directory for changed files
	var changedFiles []ChangedFile

	err = filepath.Walk(config.DataDirectory, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		// Honor context cancellation while walking a potentially large tree
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		// Skip directories
		if info.IsDir() {
			return nil
		}

		// Skip temporary files, lock files, and sockets
		if e.shouldSkipFile(path, info) {
			return nil
		}

		// Check if the file was modified after the base backup
		if info.ModTime().After(baseTimestamp) {
			relPath, err := filepath.Rel(config.DataDirectory, path)
			if err != nil {
				e.log.Warn("Failed to get relative path", "path", path, "error", err)
				return nil
			}

			changedFiles = append(changedFiles, ChangedFile{
				RelativePath: relPath,
				AbsolutePath: path,
				Size:         info.Size(),
				ModTime:      info.ModTime(),
			})
		}

		return nil
	})

	if err != nil {
		return nil, fmt.Errorf("failed to scan data directory: %w", err)
	}

	e.log.Info("Found changed files", "count", len(changedFiles))
	return changedFiles, nil
}
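
// Example of a reported change (hypothetical values; the relation filename
// is illustrative): a heap file touched after the base backup's timestamp
// would surface as
//
//	ChangedFile{
//		RelativePath: "base/16384/2619",
//		AbsolutePath: "/var/lib/postgresql/data/base/16384/2619",
//		Size:         8192,
//		ModTime:      time.Date(2024, 1, 2, 3, 0, 0, 0, time.UTC),
//	}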

// shouldSkipFile determines if a file should be excluded from incremental backup
func (e *PostgresIncrementalEngine) shouldSkipFile(path string, info os.FileInfo) bool {
	name := info.Name()

	// Skip temporary files
	if strings.HasSuffix(name, ".tmp") {
		return true
	}

	// Skip lock files
	if strings.HasSuffix(name, ".lock") || name == "postmaster.pid" {
		return true
	}

	// Skip sockets
	if info.Mode()&os.ModeSocket != 0 {
		return true
	}

	// Skip WAL paths (pg_wal, or pg_xlog on older versions); WAL is handled separately if needed
	if strings.Contains(path, "pg_wal") || strings.Contains(path, "pg_xlog") {
		return true
	}

	// Skip pg_replslot (replication slots)
	if strings.Contains(path, "pg_replslot") {
		return true
	}

	// Skip postmaster.opts (runtime config, regenerated on startup)
	if name == "postmaster.opts" {
		return true
	}

	return false
}

// loadBackupInfo loads backup metadata from the .meta.json file
func (e *PostgresIncrementalEngine) loadBackupInfo(backupPath string) (*metadata.BackupMetadata, error) {
	meta, err := metadata.Load(backupPath)
	if err != nil {
		return nil, fmt.Errorf("failed to load backup metadata: %w", err)
	}

	return meta, nil
}

// CreateIncrementalBackup creates a new incremental backup archive
func (e *PostgresIncrementalEngine) CreateIncrementalBackup(ctx context.Context, config *IncrementalBackupConfig, changedFiles []ChangedFile) error {
	e.log.Info("Creating incremental backup",
		"changed_files", len(changedFiles),
		"base_backup", config.BaseBackupPath)

	if len(changedFiles) == 0 {
		e.log.Info("No changed files since base backup - nothing to archive")
		return fmt.Errorf("no changed files since base backup")
	}

	// Load base backup metadata
	baseInfo, err := e.loadBackupInfo(config.BaseBackupPath)
	if err != nil {
		return fmt.Errorf("failed to load base backup info: %w", err)
	}

	// Generate the output filename: dbname_incr_TIMESTAMP.tar.gz
	timestamp := time.Now().Format("20060102_150405")
	outputFile := filepath.Join(filepath.Dir(config.BaseBackupPath),
		fmt.Sprintf("%s_incr_%s.tar.gz", baseInfo.Database, timestamp))

	e.log.Info("Creating incremental archive", "output", outputFile)

	// Create a tar.gz archive containing the changed files
	if err := e.createTarGz(ctx, outputFile, changedFiles, config); err != nil {
		return fmt.Errorf("failed to create archive: %w", err)
	}

	// Calculate the archive checksum
	checksum, err := e.CalculateFileChecksum(outputFile)
	if err != nil {
		return fmt.Errorf("failed to calculate checksum: %w", err)
	}

	// Get the archive size
	stat, err := os.Stat(outputFile)
	if err != nil {
		return fmt.Errorf("failed to stat archive: %w", err)
	}

	// Calculate the total size of the changed files
	var totalSize int64
	for _, f := range changedFiles {
		totalSize += f.Size
	}

	// Create incremental metadata (named incrMeta so it does not shadow the metadata package)
	incrMeta := &metadata.BackupMetadata{
		Version:      "2.2.0",
		Timestamp:    time.Now(),
		Database:     baseInfo.Database,
		DatabaseType: baseInfo.DatabaseType,
		Host:         baseInfo.Host,
		Port:         baseInfo.Port,
		User:         baseInfo.User,
		BackupFile:   outputFile,
		SizeBytes:    stat.Size(),
		SHA256:       checksum,
		Compression:  "gzip",
		BackupType:   "incremental",
		BaseBackup:   filepath.Base(config.BaseBackupPath),
		Incremental: &metadata.IncrementalMetadata{
			BaseBackupID:        baseInfo.SHA256,
			BaseBackupPath:      filepath.Base(config.BaseBackupPath),
			BaseBackupTimestamp: baseInfo.Timestamp,
			IncrementalFiles:    len(changedFiles),
			TotalSize:           totalSize,
			BackupChain:         buildBackupChain(baseInfo, filepath.Base(outputFile)),
		},
	}

	// Save the metadata alongside the archive
	if err := incrMeta.Save(); err != nil {
		return fmt.Errorf("failed to save metadata: %w", err)
	}

	e.log.Info("Incremental backup created successfully",
		"output", outputFile,
		"size", stat.Size(),
		"changed_files", len(changedFiles),
		"checksum", checksum[:16]+"...")

	return nil
}
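
// A typical create cycle, sketched with hypothetical paths (eng is a
// *PostgresIncrementalEngine; IncrementalBackupConfig fields as used above):
//
//	cfg := &IncrementalBackupConfig{
//		BaseBackupPath: "/backups/mydb_full_20240101_120000.tar.gz",
//		DataDirectory:  "/var/lib/postgresql/data",
//	}
//	changed, err := eng.FindChangedFiles(ctx, cfg)
//	if err == nil && len(changed) > 0 {
//		err = eng.CreateIncrementalBackup(ctx, cfg, changed)
//	}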

// RestoreIncremental restores an incremental backup on top of a base backup
func (e *PostgresIncrementalEngine) RestoreIncremental(ctx context.Context, baseBackupPath, incrementalPath, targetDir string) error {
	e.log.Info("Restoring incremental backup",
		"base", baseBackupPath,
		"incremental", incrementalPath,
		"target", targetDir)

	// Load incremental metadata to verify this really is an incremental backup
	incrInfo, err := e.loadBackupInfo(incrementalPath)
	if err != nil {
		return fmt.Errorf("failed to load incremental backup metadata: %w", err)
	}

	if incrInfo.BackupType != "incremental" {
		return fmt.Errorf("backup is not incremental (type: %s)", incrInfo.BackupType)
	}

	if incrInfo.Incremental == nil {
		return fmt.Errorf("incremental metadata missing")
	}

	// Verify the provided base backup path matches the metadata
	expectedBase := filepath.Join(filepath.Dir(incrementalPath), incrInfo.Incremental.BaseBackupPath)
	if !strings.EqualFold(filepath.Clean(baseBackupPath), filepath.Clean(expectedBase)) {
		e.log.Warn("Base backup path mismatch",
			"provided", baseBackupPath,
			"expected", expectedBase)
		// Continue anyway - the user might have moved files
	}

	// Verify the base backup exists
	if _, err := os.Stat(baseBackupPath); err != nil {
		return fmt.Errorf("base backup not found: %w", err)
	}

	// Load base backup metadata to verify it is a full backup
	baseInfo, err := e.loadBackupInfo(baseBackupPath)
	if err != nil {
		return fmt.Errorf("failed to load base backup metadata: %w", err)
	}

	if baseInfo.BackupType != "full" && baseInfo.BackupType != "" {
		return fmt.Errorf("base backup is not a full backup (type: %s)", baseInfo.BackupType)
	}

	// Verify checksums match
	if incrInfo.Incremental.BaseBackupID != "" && baseInfo.SHA256 != "" {
		if incrInfo.Incremental.BaseBackupID != baseInfo.SHA256 {
			return fmt.Errorf("base backup checksum mismatch: expected %s, got %s",
				incrInfo.Incremental.BaseBackupID, baseInfo.SHA256)
		}
		e.log.Info("Base backup checksum verified", "checksum", baseInfo.SHA256)
	}

	// Create the target directory if it doesn't exist
	if err := os.MkdirAll(targetDir, 0755); err != nil {
		return fmt.Errorf("failed to create target directory: %w", err)
	}

	// Step 1: Extract the base backup to the target directory
	e.log.Info("Extracting base backup", "output", targetDir)
	if err := e.extractTarGz(ctx, baseBackupPath, targetDir); err != nil {
		return fmt.Errorf("failed to extract base backup: %w", err)
	}
	e.log.Info("Base backup extracted successfully")

	// Step 2: Extract the incremental backup, overwriting changed files
	e.log.Info("Applying incremental backup", "changed_files", incrInfo.Incremental.IncrementalFiles)
	if err := e.extractTarGz(ctx, incrementalPath, targetDir); err != nil {
		return fmt.Errorf("failed to extract incremental backup: %w", err)
	}
	e.log.Info("Incremental backup applied successfully")

	// Step 3: Report the completed restoration
	e.log.Info("Restore complete",
		"base_backup", filepath.Base(baseBackupPath),
		"incremental_backup", filepath.Base(incrementalPath),
		"target_directory", targetDir,
		"total_files_updated", incrInfo.Incremental.IncrementalFiles)

	return nil
}

// CalculateFileChecksum computes the SHA-256 hash of a file
func (e *PostgresIncrementalEngine) CalculateFileChecksum(path string) (string, error) {
	file, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer file.Close()

	hash := sha256.New()
	if _, err := io.Copy(hash, file); err != nil {
		return "", err
	}

	return hex.EncodeToString(hash.Sum(nil)), nil
}

// buildBackupChain constructs the backup chain from the base backup to the current incremental
func buildBackupChain(baseInfo *metadata.BackupMetadata, currentBackup string) []string {
	chain := []string{}

	// If the base backup has a chain (it is itself incremental), extend that chain
	if baseInfo.Incremental != nil && len(baseInfo.Incremental.BackupChain) > 0 {
		chain = append(chain, baseInfo.Incremental.BackupChain...)
	} else {
		// Base is a full backup; start the chain with it
		chain = append(chain, filepath.Base(baseInfo.BackupFile))
	}

	// Add the current incremental to the chain
	chain = append(chain, currentBackup)

	return chain
}
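
// Chain construction example (filenames illustrative): with a full backup
// "mydb_full.tar.gz" as base, the first incremental yields the chain
// ["mydb_full.tar.gz", "mydb_incr_1.tar.gz"]; chaining a second incremental
// off the first extends it:
//
//	buildBackupChain(firstIncrInfo, "mydb_incr_2.tar.gz")
//	// => ["mydb_full.tar.gz", "mydb_incr_1.tar.gz", "mydb_incr_2.tar.gz"]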