Files
dbbackup/internal/backup/incremental_postgres.go
Renz b18e9e9ec9 feat: Step 6 - Implement RestoreIncremental() for PostgreSQL
Implemented full restoration of incremental backups:

internal/backup/incremental_postgres.go:
- RestoreIncremental() - main entry point
- Validates incremental backup metadata (.meta.json)
- Verifies base backup exists and is full backup
- Verifies checksums match (BaseBackupID == base SHA256)
- Extracts base backup to target directory first
- Applies incremental on top (overwrites changed files)
- Context cancellation support
- Comprehensive error handling:
  - Missing base backup
  - Wrong backup type (not incremental)
  - Checksum mismatch
  - Missing metadata

internal/backup/incremental_extract.go:
- extractTarGz() - extracts tar.gz archives (see the sketch after this list)
- Handles regular files, directories, symlinks
- Preserves file permissions and timestamps
- Progress logging every 100 files
- Context-aware (cancellable)
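
A minimal sketch of the extraction logic, for orientation only. The real extractTarGz in incremental_extract.go is a method on PostgresIncrementalEngine and additionally restores symlinks and logs progress every 100 files; the sketch below is a simplified free function (the name extractTarGzSketch is illustrative) and assumes imports of archive/tar, compress/gzip, context, fmt, io, os, and path/filepath:

// Hedged sketch only - not the implementation in incremental_extract.go.
func extractTarGzSketch(ctx context.Context, archivePath, targetDir string) error {
	f, err := os.Open(archivePath)
	if err != nil {
		return fmt.Errorf("failed to open archive: %w", err)
	}
	defer f.Close()

	gz, err := gzip.NewReader(f)
	if err != nil {
		return fmt.Errorf("failed to open gzip stream: %w", err)
	}
	defer gz.Close()

	tr := tar.NewReader(gz)
	for {
		// Stop early if the caller cancelled the context
		if err := ctx.Err(); err != nil {
			return err
		}

		hdr, err := tr.Next()
		if err == io.EOF {
			return nil // archive fully extracted
		}
		if err != nil {
			return fmt.Errorf("failed to read tar entry: %w", err)
		}

		// NOTE: a production version should also sanitize hdr.Name against path traversal
		dest := filepath.Join(targetDir, hdr.Name)
		switch hdr.Typeflag {
		case tar.TypeDir:
			if err := os.MkdirAll(dest, os.FileMode(hdr.Mode)); err != nil {
				return err
			}
		case tar.TypeReg:
			if err := os.MkdirAll(filepath.Dir(dest), 0755); err != nil {
				return err
			}
			out, err := os.OpenFile(dest, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(hdr.Mode))
			if err != nil {
				return err
			}
			if _, err := io.Copy(out, tr); err != nil {
				out.Close()
				return err
			}
			out.Close()
			// Preserve the original modification time
			if err := os.Chtimes(dest, hdr.ModTime, hdr.ModTime); err != nil {
				return err
			}
		default:
			// symlinks and other entry types are omitted in this sketch
		}
	}
}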

Restore Logic (see the usage sketch after these steps):
1. Load incremental metadata from .meta.json
2. Verify base backup exists and checksums match
3. Extract base backup (full restore)
4. Extract incremental backup (apply changed files)
5. Log completion with file counts
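
A hedged usage sketch of the new entry point (not code from this commit; the paths are placeholders, and the caller is assumed to import context, fmt, dbbackup/internal/backup, and dbbackup/internal/logger):

func restoreExample(ctx context.Context, log logger.Logger) error {
	engine := backup.NewPostgresIncrementalEngine(log)
	if err := engine.RestoreIncremental(ctx,
		"/backups/mydb_20251126_060000.tar.gz",      // base (full) backup
		"/backups/mydb_incr_20251126_070000.tar.gz", // incremental to apply on top
		"/var/lib/postgresql/data",                  // target directory
	); err != nil {
		return fmt.Errorf("incremental restore failed: %w", err)
	}
	return nil
}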

Features:
- Validates backup chain integrity
- Checksum verification for safety
- Handles base backup path mismatch (warning)
- Creates target directory if missing
- Preserves file attributes (perms, mtime)
- Detailed logging at each step

Status: READY FOR TESTING
Next: Write integration test (Step 7)
2025-11-26 07:04:34 +00:00

346 lines · 10 KiB · Go

package backup

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"time"

	"dbbackup/internal/logger"
	"dbbackup/internal/metadata"
)

// PostgresIncrementalEngine implements incremental backups for PostgreSQL
type PostgresIncrementalEngine struct {
	log logger.Logger
}

// NewPostgresIncrementalEngine creates a new PostgreSQL incremental backup engine
func NewPostgresIncrementalEngine(log logger.Logger) *PostgresIncrementalEngine {
	return &PostgresIncrementalEngine{
		log: log,
	}
}

// FindChangedFiles identifies files that changed since the base backup.
// This is a simple mtime-based implementation; production should use pg_basebackup with incremental support.
func (e *PostgresIncrementalEngine) FindChangedFiles(ctx context.Context, config *IncrementalBackupConfig) ([]ChangedFile, error) {
	e.log.Info("Finding changed files for incremental backup",
		"base_backup", config.BaseBackupPath,
		"data_dir", config.DataDirectory)

	// Load base backup metadata to get timestamp
	baseInfo, err := e.loadBackupInfo(config.BaseBackupPath)
	if err != nil {
		return nil, fmt.Errorf("failed to load base backup info: %w", err)
	}

	// Validate base backup is a full backup
	if baseInfo.BackupType != "" && baseInfo.BackupType != "full" {
		return nil, fmt.Errorf("base backup must be a full backup, got: %s", baseInfo.BackupType)
	}

	baseTimestamp := baseInfo.Timestamp
	e.log.Info("Base backup timestamp", "timestamp", baseTimestamp)

	// Scan data directory for changed files
	var changedFiles []ChangedFile
	err = filepath.Walk(config.DataDirectory, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		// Honor context cancellation while walking
		if err := ctx.Err(); err != nil {
			return err
		}

		// Skip directories
		if info.IsDir() {
			return nil
		}

		// Skip temporary files, lock files, and sockets
		if e.shouldSkipFile(path, info) {
			return nil
		}

		// Check if file was modified after base backup
		if info.ModTime().After(baseTimestamp) {
			relPath, err := filepath.Rel(config.DataDirectory, path)
			if err != nil {
				e.log.Warn("Failed to get relative path", "path", path, "error", err)
				return nil
			}
			changedFiles = append(changedFiles, ChangedFile{
				RelativePath: relPath,
				AbsolutePath: path,
				Size:         info.Size(),
				ModTime:      info.ModTime(),
			})
		}
		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("failed to scan data directory: %w", err)
	}

	e.log.Info("Found changed files", "count", len(changedFiles))
	return changedFiles, nil
}

// shouldSkipFile determines if a file should be excluded from incremental backup
func (e *PostgresIncrementalEngine) shouldSkipFile(path string, info os.FileInfo) bool {
	name := info.Name()

	// Skip temporary files
	if strings.HasSuffix(name, ".tmp") {
		return true
	}

	// Skip lock files
	if strings.HasSuffix(name, ".lock") || name == "postmaster.pid" {
		return true
	}

	// Skip sockets
	if info.Mode()&os.ModeSocket != 0 {
		return true
	}

	// Skip pg_wal symlink target (WAL handled separately if needed)
	if strings.Contains(path, "pg_wal") || strings.Contains(path, "pg_xlog") {
		return true
	}

	// Skip pg_replslot (replication slots)
	if strings.Contains(path, "pg_replslot") {
		return true
	}

	// Skip postmaster.opts (runtime config, regenerated on startup)
	if name == "postmaster.opts" {
		return true
	}

	return false
}

// loadBackupInfo loads backup metadata from .meta.json file
func (e *PostgresIncrementalEngine) loadBackupInfo(backupPath string) (*metadata.BackupMetadata, error) {
	// Load using metadata package
	meta, err := metadata.Load(backupPath)
	if err != nil {
		return nil, fmt.Errorf("failed to load backup metadata: %w", err)
	}
	return meta, nil
}

// CreateIncrementalBackup creates a new incremental backup archive
func (e *PostgresIncrementalEngine) CreateIncrementalBackup(ctx context.Context, config *IncrementalBackupConfig, changedFiles []ChangedFile) error {
	e.log.Info("Creating incremental backup",
		"changed_files", len(changedFiles),
		"base_backup", config.BaseBackupPath)

	if len(changedFiles) == 0 {
		e.log.Info("No changed files detected - skipping incremental backup")
		return fmt.Errorf("no changed files since base backup")
	}

	// Load base backup metadata
	baseInfo, err := e.loadBackupInfo(config.BaseBackupPath)
	if err != nil {
		return fmt.Errorf("failed to load base backup info: %w", err)
	}

	// Generate output filename: dbname_incr_TIMESTAMP.tar.gz
	timestamp := time.Now().Format("20060102_150405")
	outputFile := filepath.Join(filepath.Dir(config.BaseBackupPath),
		fmt.Sprintf("%s_incr_%s.tar.gz", baseInfo.Database, timestamp))

	e.log.Info("Creating incremental archive", "output", outputFile)

	// Create tar.gz archive with changed files
	if err := e.createTarGz(ctx, outputFile, changedFiles, config); err != nil {
		return fmt.Errorf("failed to create archive: %w", err)
	}

	// Calculate checksum
	checksum, err := e.CalculateFileChecksum(outputFile)
	if err != nil {
		return fmt.Errorf("failed to calculate checksum: %w", err)
	}

	// Get archive size
	stat, err := os.Stat(outputFile)
	if err != nil {
		return fmt.Errorf("failed to stat archive: %w", err)
	}

	// Calculate total size of changed files
	var totalSize int64
	for _, f := range changedFiles {
		totalSize += f.Size
	}

	// Create incremental metadata
	meta := &metadata.BackupMetadata{
		Version:      "2.2.0",
		Timestamp:    time.Now(),
		Database:     baseInfo.Database,
		DatabaseType: baseInfo.DatabaseType,
		Host:         baseInfo.Host,
		Port:         baseInfo.Port,
		User:         baseInfo.User,
		BackupFile:   outputFile,
		SizeBytes:    stat.Size(),
		SHA256:       checksum,
		Compression:  "gzip",
		BackupType:   "incremental",
		BaseBackup:   filepath.Base(config.BaseBackupPath),
		Incremental: &metadata.IncrementalMetadata{
			BaseBackupID:        baseInfo.SHA256,
			BaseBackupPath:      filepath.Base(config.BaseBackupPath),
			BaseBackupTimestamp: baseInfo.Timestamp,
			IncrementalFiles:    len(changedFiles),
			TotalSize:           totalSize,
			BackupChain:         buildBackupChain(baseInfo, filepath.Base(outputFile)),
		},
	}

	// Save metadata
	if err := meta.Save(); err != nil {
		return fmt.Errorf("failed to save metadata: %w", err)
	}

	e.log.Info("Incremental backup created successfully",
		"output", outputFile,
		"size", stat.Size(),
		"changed_files", len(changedFiles),
		"checksum", checksum[:16]+"...")

	return nil
}

// RestoreIncremental restores an incremental backup on top of a base backup
func (e *PostgresIncrementalEngine) RestoreIncremental(ctx context.Context, baseBackupPath, incrementalPath, targetDir string) error {
	e.log.Info("Restoring incremental backup",
		"base", baseBackupPath,
		"incremental", incrementalPath,
		"target", targetDir)

	// Load incremental metadata to verify it's an incremental backup
	incrInfo, err := e.loadBackupInfo(incrementalPath)
	if err != nil {
		return fmt.Errorf("failed to load incremental backup metadata: %w", err)
	}
	if incrInfo.BackupType != "incremental" {
		return fmt.Errorf("backup is not incremental (type: %s)", incrInfo.BackupType)
	}
	if incrInfo.Incremental == nil {
		return fmt.Errorf("incremental metadata missing")
	}

	// Verify base backup path matches metadata
	expectedBase := filepath.Join(filepath.Dir(incrementalPath), incrInfo.Incremental.BaseBackupPath)
	if !strings.EqualFold(filepath.Clean(baseBackupPath), filepath.Clean(expectedBase)) {
		e.log.Warn("Base backup path mismatch",
			"provided", baseBackupPath,
			"expected", expectedBase)
		// Continue anyway - user might have moved files
	}

	// Verify base backup exists
	if _, err := os.Stat(baseBackupPath); err != nil {
		return fmt.Errorf("base backup not found: %w", err)
	}

	// Load base backup metadata to verify it's a full backup
	baseInfo, err := e.loadBackupInfo(baseBackupPath)
	if err != nil {
		return fmt.Errorf("failed to load base backup metadata: %w", err)
	}
	if baseInfo.BackupType != "full" && baseInfo.BackupType != "" {
		return fmt.Errorf("base backup is not a full backup (type: %s)", baseInfo.BackupType)
	}

	// Verify checksums match
	if incrInfo.Incremental.BaseBackupID != "" && baseInfo.SHA256 != "" {
		if incrInfo.Incremental.BaseBackupID != baseInfo.SHA256 {
			return fmt.Errorf("base backup checksum mismatch: expected %s, got %s",
				incrInfo.Incremental.BaseBackupID, baseInfo.SHA256)
		}
		e.log.Info("Base backup checksum verified", "checksum", baseInfo.SHA256)
	}

	// Create target directory if it doesn't exist
	if err := os.MkdirAll(targetDir, 0755); err != nil {
		return fmt.Errorf("failed to create target directory: %w", err)
	}

	// Step 1: Extract base backup to target directory
	e.log.Info("Extracting base backup", "output", targetDir)
	if err := e.extractTarGz(ctx, baseBackupPath, targetDir); err != nil {
		return fmt.Errorf("failed to extract base backup: %w", err)
	}
	e.log.Info("Base backup extracted successfully")

	// Step 2: Extract incremental backup, overwriting changed files
	e.log.Info("Applying incremental backup", "changed_files", incrInfo.Incremental.IncrementalFiles)
	if err := e.extractTarGz(ctx, incrementalPath, targetDir); err != nil {
		return fmt.Errorf("failed to extract incremental backup: %w", err)
	}
	e.log.Info("Incremental backup applied successfully")

	// Step 3: Log completion with file counts
	e.log.Info("Restore complete",
		"base_backup", filepath.Base(baseBackupPath),
		"incremental_backup", filepath.Base(incrementalPath),
		"target_directory", targetDir,
		"total_files_updated", incrInfo.Incremental.IncrementalFiles)

	return nil
}

// CalculateFileChecksum computes SHA-256 hash of a file
func (e *PostgresIncrementalEngine) CalculateFileChecksum(path string) (string, error) {
	file, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer file.Close()

	hash := sha256.New()
	if _, err := io.Copy(hash, file); err != nil {
		return "", err
	}
	return hex.EncodeToString(hash.Sum(nil)), nil
}

// buildBackupChain constructs the backup chain from base backup to current incremental
func buildBackupChain(baseInfo *metadata.BackupMetadata, currentBackup string) []string {
	chain := []string{}

	// If base backup has a chain (is itself incremental), use that
	if baseInfo.Incremental != nil && len(baseInfo.Incremental.BackupChain) > 0 {
		chain = append(chain, baseInfo.Incremental.BackupChain...)
	} else {
		// Base is a full backup, start chain with it
		chain = append(chain, filepath.Base(baseInfo.BackupFile))
	}

	// Add current incremental to chain
	chain = append(chain, currentBackup)
	return chain
}