Files
dbbackup/internal/backup/incremental_postgres.go
A. Renz 914307ac8f ci: add golangci-lint config and fix formatting
- Add .golangci.yml with minimal linters (govet, ineffassign)
- Run gofmt -s and goimports on all files to fix formatting
- Disable fieldalignment and copylocks checks in govet
2025-12-11 17:53:28 +01:00

346 lines
10 KiB
Go

package backup
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"dbbackup/internal/logger"
"dbbackup/internal/metadata"
)
// PostgresIncrementalEngine implements incremental backups for PostgreSQL
type PostgresIncrementalEngine struct {
log logger.Logger
}
// NewPostgresIncrementalEngine creates a new PostgreSQL incremental backup engine
func NewPostgresIncrementalEngine(log logger.Logger) *PostgresIncrementalEngine {
return &PostgresIncrementalEngine{
log: log,
}
}
// FindChangedFiles identifies files that changed since the base backup
// This is a simple mtime-based implementation. Production should use pg_basebackup with incremental support.
func (e *PostgresIncrementalEngine) FindChangedFiles(ctx context.Context, config *IncrementalBackupConfig) ([]ChangedFile, error) {
e.log.Info("Finding changed files for incremental backup",
"base_backup", config.BaseBackupPath,
"data_dir", config.DataDirectory)
// Load base backup metadata to get timestamp
baseInfo, err := e.loadBackupInfo(config.BaseBackupPath)
if err != nil {
return nil, fmt.Errorf("failed to load base backup info: %w", err)
}
// Validate base backup is full backup
if baseInfo.BackupType != "" && baseInfo.BackupType != "full" {
return nil, fmt.Errorf("base backup must be a full backup, got: %s", baseInfo.BackupType)
}
baseTimestamp := baseInfo.Timestamp
e.log.Info("Base backup timestamp", "timestamp", baseTimestamp)
// Scan data directory for changed files
var changedFiles []ChangedFile
err = filepath.Walk(config.DataDirectory, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// Skip directories
if info.IsDir() {
return nil
}
// Skip temporary files, lock files, and sockets
if e.shouldSkipFile(path, info) {
return nil
}
// Check if file was modified after base backup
if info.ModTime().After(baseTimestamp) {
relPath, err := filepath.Rel(config.DataDirectory, path)
if err != nil {
e.log.Warn("Failed to get relative path", "path", path, "error", err)
return nil
}
changedFiles = append(changedFiles, ChangedFile{
RelativePath: relPath,
AbsolutePath: path,
Size: info.Size(),
ModTime: info.ModTime(),
})
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to scan data directory: %w", err)
}
e.log.Info("Found changed files", "count", len(changedFiles))
return changedFiles, nil
}
// shouldSkipFile determines if a file should be excluded from incremental backup
func (e *PostgresIncrementalEngine) shouldSkipFile(path string, info os.FileInfo) bool {
name := info.Name()
// Skip temporary files
if strings.HasSuffix(name, ".tmp") {
return true
}
// Skip lock files
if strings.HasSuffix(name, ".lock") || name == "postmaster.pid" {
return true
}
// Skip sockets
if info.Mode()&os.ModeSocket != 0 {
return true
}
// Skip pg_wal symlink target (WAL handled separately if needed)
if strings.Contains(path, "pg_wal") || strings.Contains(path, "pg_xlog") {
return true
}
// Skip pg_replslot (replication slots)
if strings.Contains(path, "pg_replslot") {
return true
}
// Skip postmaster.opts (runtime config, regenerated on startup)
if name == "postmaster.opts" {
return true
}
return false
}
// loadBackupInfo loads backup metadata from .meta.json file
func (e *PostgresIncrementalEngine) loadBackupInfo(backupPath string) (*metadata.BackupMetadata, error) {
// Load using metadata package
meta, err := metadata.Load(backupPath)
if err != nil {
return nil, fmt.Errorf("failed to load backup metadata: %w", err)
}
return meta, nil
}
// CreateIncrementalBackup creates a new incremental backup archive
func (e *PostgresIncrementalEngine) CreateIncrementalBackup(ctx context.Context, config *IncrementalBackupConfig, changedFiles []ChangedFile) error {
e.log.Info("Creating incremental backup",
"changed_files", len(changedFiles),
"base_backup", config.BaseBackupPath)
if len(changedFiles) == 0 {
e.log.Info("No changed files detected - skipping incremental backup")
return fmt.Errorf("no changed files since base backup")
}
// Load base backup metadata
baseInfo, err := e.loadBackupInfo(config.BaseBackupPath)
if err != nil {
return fmt.Errorf("failed to load base backup info: %w", err)
}
// Generate output filename: dbname_incr_TIMESTAMP.tar.gz
timestamp := time.Now().Format("20060102_150405")
outputFile := filepath.Join(filepath.Dir(config.BaseBackupPath),
fmt.Sprintf("%s_incr_%s.tar.gz", baseInfo.Database, timestamp))
e.log.Info("Creating incremental archive", "output", outputFile)
// Create tar.gz archive with changed files
if err := e.createTarGz(ctx, outputFile, changedFiles, config); err != nil {
return fmt.Errorf("failed to create archive: %w", err)
}
// Calculate checksum
checksum, err := e.CalculateFileChecksum(outputFile)
if err != nil {
return fmt.Errorf("failed to calculate checksum: %w", err)
}
// Get archive size
stat, err := os.Stat(outputFile)
if err != nil {
return fmt.Errorf("failed to stat archive: %w", err)
}
// Calculate total size of changed files
var totalSize int64
for _, f := range changedFiles {
totalSize += f.Size
}
// Create incremental metadata
metadata := &metadata.BackupMetadata{
Version: "2.2.0",
Timestamp: time.Now(),
Database: baseInfo.Database,
DatabaseType: baseInfo.DatabaseType,
Host: baseInfo.Host,
Port: baseInfo.Port,
User: baseInfo.User,
BackupFile: outputFile,
SizeBytes: stat.Size(),
SHA256: checksum,
Compression: "gzip",
BackupType: "incremental",
BaseBackup: filepath.Base(config.BaseBackupPath),
Incremental: &metadata.IncrementalMetadata{
BaseBackupID: baseInfo.SHA256,
BaseBackupPath: filepath.Base(config.BaseBackupPath),
BaseBackupTimestamp: baseInfo.Timestamp,
IncrementalFiles: len(changedFiles),
TotalSize: totalSize,
BackupChain: buildBackupChain(baseInfo, filepath.Base(outputFile)),
},
}
// Save metadata
if err := metadata.Save(); err != nil {
return fmt.Errorf("failed to save metadata: %w", err)
}
e.log.Info("Incremental backup created successfully",
"output", outputFile,
"size", stat.Size(),
"changed_files", len(changedFiles),
"checksum", checksum[:16]+"...")
return nil
}
// RestoreIncremental restores an incremental backup on top of a base
func (e *PostgresIncrementalEngine) RestoreIncremental(ctx context.Context, baseBackupPath, incrementalPath, targetDir string) error {
e.log.Info("Restoring incremental backup",
"base", baseBackupPath,
"incremental", incrementalPath,
"target", targetDir)
// Load incremental metadata to verify it's an incremental backup
incrInfo, err := e.loadBackupInfo(incrementalPath)
if err != nil {
return fmt.Errorf("failed to load incremental backup metadata: %w", err)
}
if incrInfo.BackupType != "incremental" {
return fmt.Errorf("backup is not incremental (type: %s)", incrInfo.BackupType)
}
if incrInfo.Incremental == nil {
return fmt.Errorf("incremental metadata missing")
}
// Verify base backup path matches metadata
expectedBase := filepath.Join(filepath.Dir(incrementalPath), incrInfo.Incremental.BaseBackupPath)
if !strings.EqualFold(filepath.Clean(baseBackupPath), filepath.Clean(expectedBase)) {
e.log.Warn("Base backup path mismatch",
"provided", baseBackupPath,
"expected", expectedBase)
// Continue anyway - user might have moved files
}
// Verify base backup exists
if _, err := os.Stat(baseBackupPath); err != nil {
return fmt.Errorf("base backup not found: %w", err)
}
// Load base backup metadata to verify it's a full backup
baseInfo, err := e.loadBackupInfo(baseBackupPath)
if err != nil {
return fmt.Errorf("failed to load base backup metadata: %w", err)
}
if baseInfo.BackupType != "full" && baseInfo.BackupType != "" {
return fmt.Errorf("base backup is not a full backup (type: %s)", baseInfo.BackupType)
}
// Verify checksums match
if incrInfo.Incremental.BaseBackupID != "" && baseInfo.SHA256 != "" {
if incrInfo.Incremental.BaseBackupID != baseInfo.SHA256 {
return fmt.Errorf("base backup checksum mismatch: expected %s, got %s",
incrInfo.Incremental.BaseBackupID, baseInfo.SHA256)
}
e.log.Info("Base backup checksum verified", "checksum", baseInfo.SHA256)
}
// Create target directory if it doesn't exist
if err := os.MkdirAll(targetDir, 0755); err != nil {
return fmt.Errorf("failed to create target directory: %w", err)
}
// Step 1: Extract base backup to target directory
e.log.Info("Extracting base backup", "output", targetDir)
if err := e.extractTarGz(ctx, baseBackupPath, targetDir); err != nil {
return fmt.Errorf("failed to extract base backup: %w", err)
}
e.log.Info("Base backup extracted successfully")
// Step 2: Extract incremental backup, overwriting changed files
e.log.Info("Applying incremental backup", "changed_files", incrInfo.Incremental.IncrementalFiles)
if err := e.extractTarGz(ctx, incrementalPath, targetDir); err != nil {
return fmt.Errorf("failed to extract incremental backup: %w", err)
}
e.log.Info("Incremental backup applied successfully")
// Step 3: Verify restoration
e.log.Info("Restore complete",
"base_backup", filepath.Base(baseBackupPath),
"incremental_backup", filepath.Base(incrementalPath),
"target_directory", targetDir,
"total_files_updated", incrInfo.Incremental.IncrementalFiles)
return nil
}
// CalculateFileChecksum computes SHA-256 hash of a file
func (e *PostgresIncrementalEngine) CalculateFileChecksum(path string) (string, error) {
file, err := os.Open(path)
if err != nil {
return "", err
}
defer file.Close()
hash := sha256.New()
if _, err := io.Copy(hash, file); err != nil {
return "", err
}
return hex.EncodeToString(hash.Sum(nil)), nil
}
// buildBackupChain constructs the backup chain from base backup to current incremental
func buildBackupChain(baseInfo *metadata.BackupMetadata, currentBackup string) []string {
chain := []string{}
// If base backup has a chain (is itself incremental), use that
if baseInfo.Incremental != nil && len(baseInfo.Incremental.BackupChain) > 0 {
chain = append(chain, baseInfo.Incremental.BackupChain...)
} else {
// Base is a full backup, start chain with it
chain = append(chain, filepath.Base(baseInfo.BackupFile))
}
// Add current incremental to chain
chain = append(chain, currentBackup)
return chain
}