feat: Step 5 - Implement CreateIncrementalBackup() for PostgreSQL

Implemented full incremental backup creation:

internal/backup/incremental_postgres.go:
- CreateIncrementalBackup() - main entry point
- Validates base backup exists and is full backup
- Loads base backup metadata (.meta.json)
- Uses FindChangedFiles() to detect modifications
- Creates tar.gz with ONLY changed files
- Generates incremental metadata with:
  - Base backup ID (SHA-256)
  - Backup chain (base -> incr1 -> incr2...)
  - Changed file count and total size
- Saves .meta.json with full incremental metadata (example sketched below)
- Calculates SHA-256 checksum of archive
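
For concreteness, a hedged sketch of what the saved .meta.json could look
like. All values are illustrative, and the real key names depend on the
metadata package's struct tags, which this diff does not show:

{
  "version": "2.2.0",
  "database": "mydb",
  "database_type": "postgres",
  "backup_file": "/backups/mydb_incr_20251126_065132.tar.gz",
  "size_bytes": 1048576,
  "sha256": "<checksum of this archive>",
  "compression": "gzip",
  "backup_type": "incremental",
  "base_backup": "mydb_full.tar.gz",
  "incremental": {
    "base_backup_id": "<SHA-256 of the base archive>",
    "base_backup_path": "mydb_full.tar.gz",
    "incremental_files": 42,
    "total_size": 524288,
    "backup_chain": ["mydb_full.tar.gz", "mydb_incr_20251126_065132.tar.gz"]
  }
}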

internal/backup/incremental_tar.go:
- createTarGz() - creates compressed archive
- addFileToTar() - adds individual files to tar
- Handles context cancellation
- Progress logging for each file
- Preserves file permissions and timestamps

Helper Functions:
- loadBackupInfo() - loads BackupMetadata from .meta.json
- buildBackupChain() - constructs restore chain (see the sketch below)
- CalculateFileChecksum() - SHA-256 for archive
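
To make the chain construction concrete, here is a minimal sketch of how
buildBackupChain() behaves (its full body is in the diff below). It assumes
the metadata package types from this commit; all filenames are hypothetical:

package backup

import (
	"fmt"

	"dbbackup/internal/metadata"
)

// demoBackupChain is illustrative only; filenames are made up.
func demoBackupChain() {
	// Base is a full backup: the chain starts with its archive name.
	full := &metadata.BackupMetadata{BackupFile: "/backups/mydb_full.tar.gz"}
	c1 := buildBackupChain(full, "mydb_incr_1.tar.gz")
	fmt.Println(c1) // [mydb_full.tar.gz mydb_incr_1.tar.gz]

	// Base is itself an incremental: its recorded chain is extended.
	incr := &metadata.BackupMetadata{
		BackupFile:  "/backups/mydb_incr_1.tar.gz",
		Incremental: &metadata.IncrementalMetadata{BackupChain: c1},
	}
	fmt.Println(buildBackupChain(incr, "mydb_incr_2.tar.gz"))
	// [mydb_full.tar.gz mydb_incr_1.tar.gz mydb_incr_2.tar.gz]
}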

Features:
- Creates tar.gz with ONLY changed files
- Much smaller than full backup
- Links to base backup via SHA-256
- Tracks complete restore chain
- Full metadata for restore validation
- Context-aware (cancellable)

Status: READY FOR TESTING
Next: Wire into backup engine, test with real PostgreSQL data
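
As a sketch of that wiring, a hypothetical caller could look like the
following. The engine construction, the logger.Logger type name, the
(ctx, config) call shape, and any IncrementalBackupConfig fields beyond
BaseBackupPath and CompressionLevel are assumptions, not confirmed API:

package backup

import (
	"compress/gzip"
	"context"
	"time"

	"dbbackup/internal/logger"
)

// runIncremental is a hypothetical wiring example.
func runIncremental(log logger.Logger) error {
	// Bound the run; cancellation propagates into createTarGz's select loop.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
	defer cancel()

	engine := &PostgresIncrementalEngine{log: log} // construction assumed
	cfg := &IncrementalBackupConfig{
		BaseBackupPath:   "/backups/mydb_full.tar.gz", // illustrative path
		CompressionLevel: gzip.BestSpeed,
	}
	return engine.CreateIncrementalBackup(ctx, cfg)
}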
2025-11-26 06:51:32 +00:00
parent e059cc2e3a
commit 2f9d2ba339
2 changed files with 200 additions and 21 deletions

internal/backup/incremental_postgres.go

@@ -4,14 +4,15 @@ import (
 	"context"
 	"crypto/sha256"
 	"encoding/hex"
-	"encoding/json"
 	"fmt"
 	"io"
 	"os"
 	"path/filepath"
 	"strings"
+	"time"
 
 	"dbbackup/internal/logger"
+	"dbbackup/internal/metadata"
 )
 
 // PostgresIncrementalEngine implements incremental backups for PostgreSQL
@@ -39,7 +40,8 @@ func (e *PostgresIncrementalEngine) FindChangedFiles(ctx context.Context, config
 		return nil, fmt.Errorf("failed to load base backup info: %w", err)
 	}
 
-	if baseInfo.BackupType != BackupTypeFull {
+	// Validate base backup is full backup
+	if baseInfo.BackupType != "" && baseInfo.BackupType != "full" {
 		return nil, fmt.Errorf("base backup must be a full backup, got: %s", baseInfo.BackupType)
 	}
@@ -128,22 +130,15 @@ func (e *PostgresIncrementalEngine) shouldSkipFile(path string, info os.FileInfo
 	return false
 }
 
-// loadBackupInfo loads backup metadata from .info file
-func (e *PostgresIncrementalEngine) loadBackupInfo(backupPath string) (*BackupInfo, error) {
-	// Remove .tar.gz extension and add .info
-	infoPath := strings.TrimSuffix(backupPath, ".tar.gz") + ".info"
-	data, err := os.ReadFile(infoPath)
+// loadBackupInfo loads backup metadata from .meta.json file
+func (e *PostgresIncrementalEngine) loadBackupInfo(backupPath string) (*metadata.BackupMetadata, error) {
+	// Load using metadata package
+	meta, err := metadata.Load(backupPath)
 	if err != nil {
-		return nil, fmt.Errorf("failed to read info file %s: %w", infoPath, err)
+		return nil, fmt.Errorf("failed to load backup metadata: %w", err)
 	}
-
-	var info BackupInfo
-	if err := json.Unmarshal(data, &info); err != nil {
-		return nil, fmt.Errorf("failed to parse info file: %w", err)
-	}
-
-	return &info, nil
+	return meta, nil
 }
 
 // CreateIncrementalBackup creates a new incremental backup archive
@@ -152,13 +147,84 @@ func (e *PostgresIncrementalEngine) CreateIncrementalBackup(ctx context.Context,
 		"changed_files", len(changedFiles),
 		"base_backup", config.BaseBackupPath)
 
-	// TODO: Implementation in next step
-	// 1. Create tar.gz with only changed files
-	// 2. Generate metadata with base backup reference
-	// 3. Write .info file with incremental metadata
-	// 4. Calculate checksums
-	return fmt.Errorf("not implemented yet")
+	if len(changedFiles) == 0 {
+		e.log.Info("No changed files detected - skipping incremental backup")
+		return fmt.Errorf("no changed files since base backup")
+	}
+
+	// Load base backup metadata
+	baseInfo, err := e.loadBackupInfo(config.BaseBackupPath)
+	if err != nil {
+		return fmt.Errorf("failed to load base backup info: %w", err)
+	}
+
+	// Generate output filename: dbname_incr_TIMESTAMP.tar.gz
+	timestamp := time.Now().Format("20060102_150405")
+	outputFile := filepath.Join(filepath.Dir(config.BaseBackupPath),
+		fmt.Sprintf("%s_incr_%s.tar.gz", baseInfo.Database, timestamp))
+
+	e.log.Info("Creating incremental archive", "output", outputFile)
+
+	// Create tar.gz archive with changed files
+	if err := e.createTarGz(ctx, outputFile, changedFiles, config); err != nil {
+		return fmt.Errorf("failed to create archive: %w", err)
+	}
+
+	// Calculate checksum
+	checksum, err := e.CalculateFileChecksum(outputFile)
+	if err != nil {
+		return fmt.Errorf("failed to calculate checksum: %w", err)
+	}
+
+	// Get archive size
+	stat, err := os.Stat(outputFile)
+	if err != nil {
+		return fmt.Errorf("failed to stat archive: %w", err)
+	}
+
+	// Calculate total size of changed files
+	var totalSize int64
+	for _, f := range changedFiles {
+		totalSize += f.Size
+	}
+
+	// Create incremental metadata (named meta so the metadata package is not shadowed)
+	meta := &metadata.BackupMetadata{
+		Version:      "2.2.0",
+		Timestamp:    time.Now(),
+		Database:     baseInfo.Database,
+		DatabaseType: baseInfo.DatabaseType,
+		Host:         baseInfo.Host,
+		Port:         baseInfo.Port,
+		User:         baseInfo.User,
+		BackupFile:   outputFile,
+		SizeBytes:    stat.Size(),
+		SHA256:       checksum,
+		Compression:  "gzip",
+		BackupType:   "incremental",
+		BaseBackup:   filepath.Base(config.BaseBackupPath),
+		Incremental: &metadata.IncrementalMetadata{
+			BaseBackupID:        baseInfo.SHA256,
+			BaseBackupPath:      filepath.Base(config.BaseBackupPath),
+			BaseBackupTimestamp: baseInfo.Timestamp,
+			IncrementalFiles:    len(changedFiles),
+			TotalSize:           totalSize,
+			BackupChain:         buildBackupChain(baseInfo, filepath.Base(outputFile)),
+		},
+	}
+
+	// Save metadata
+	if err := meta.Save(); err != nil {
+		return fmt.Errorf("failed to save metadata: %w", err)
+	}
+
+	e.log.Info("Incremental backup created successfully",
+		"output", outputFile,
+		"size", stat.Size(),
+		"changed_files", len(changedFiles),
+		"checksum", checksum[:16]+"...")
+
+	return nil
 }
 
 // RestoreIncremental restores an incremental backup on top of a base
@@ -192,3 +258,21 @@ func (e *PostgresIncrementalEngine) CalculateFileChecksum(path string) (string,
 	return hex.EncodeToString(hash.Sum(nil)), nil
 }
 
+// buildBackupChain constructs the backup chain from base backup to current incremental
+func buildBackupChain(baseInfo *metadata.BackupMetadata, currentBackup string) []string {
+	chain := []string{}
+
+	// If base backup has a chain (is itself incremental), use that
+	if baseInfo.Incremental != nil && len(baseInfo.Incremental.BackupChain) > 0 {
+		chain = append(chain, baseInfo.Incremental.BackupChain...)
+	} else {
+		// Base is a full backup, start chain with it
+		chain = append(chain, filepath.Base(baseInfo.BackupFile))
+	}
+
+	// Add current incremental to chain
+	chain = append(chain, currentBackup)
+
+	return chain
+}

internal/backup/incremental_tar.go (new file)

@@ -0,0 +1,95 @@
package backup

import (
	"archive/tar"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"os"
)

// createTarGz creates a tar.gz archive with the specified changed files
func (e *PostgresIncrementalEngine) createTarGz(ctx context.Context, outputFile string, changedFiles []ChangedFile, config *IncrementalBackupConfig) error {
	// Create output file
	outFile, err := os.Create(outputFile)
	if err != nil {
		return fmt.Errorf("failed to create output file: %w", err)
	}
	defer outFile.Close()

	// Create gzip writer
	gzWriter, err := gzip.NewWriterLevel(outFile, config.CompressionLevel)
	if err != nil {
		return fmt.Errorf("failed to create gzip writer: %w", err)
	}
	defer gzWriter.Close()

	// Create tar writer
	tarWriter := tar.NewWriter(gzWriter)
	defer tarWriter.Close()

	// Add each changed file to archive
	for i, changedFile := range changedFiles {
		// Check context cancellation
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		e.log.Debug("Adding file to archive",
			"file", changedFile.RelativePath,
			"progress", fmt.Sprintf("%d/%d", i+1, len(changedFiles)))

		if err := e.addFileToTar(tarWriter, changedFile); err != nil {
			return fmt.Errorf("failed to add file %s: %w", changedFile.RelativePath, err)
		}
	}

	return nil
}

// addFileToTar adds a single file to the tar archive
func (e *PostgresIncrementalEngine) addFileToTar(tarWriter *tar.Writer, changedFile ChangedFile) error {
	// Open the file
	file, err := os.Open(changedFile.AbsolutePath)
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()

	// Get file info
	info, err := file.Stat()
	if err != nil {
		return fmt.Errorf("failed to stat file: %w", err)
	}

	// Warn if the file changed size since the scan; the header uses the current size
	if info.Size() != changedFile.Size {
		e.log.Warn("File size changed since scan, using current size",
			"file", changedFile.RelativePath,
			"old_size", changedFile.Size,
			"new_size", info.Size())
	}

	// Create tar header (preserves file permissions and timestamps)
	header := &tar.Header{
		Name:    changedFile.RelativePath,
		Size:    info.Size(),
		Mode:    int64(info.Mode()),
		ModTime: info.ModTime(),
	}

	// Write header
	if err := tarWriter.WriteHeader(header); err != nil {
		return fmt.Errorf("failed to write tar header: %w", err)
	}

	// Copy file content
	if _, err := io.Copy(tarWriter, file); err != nil {
		return fmt.Errorf("failed to copy file content: %w", err)
	}

	return nil
}