Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e0cdcb28be | |||
| 22a7b9e81e | |||
| c71889be47 | |||
| 222bdbef58 | |||
| f7e9fa64f0 | |||
| f153e61dbf |
94
PITR.md
94
PITR.md
@@ -584,6 +584,100 @@ Document your recovery procedure:
|
||||
9. Create new base backup
|
||||
```
|
||||
|
||||
## Large Database Support (600+ GB)
|
||||
|
||||
For databases larger than 600 GB, PITR is the **recommended approach** over full dump/restore.
|
||||
|
||||
### Why PITR Works Better for Large DBs
|
||||
|
||||
| Approach | 600 GB Database | Recovery Time (RTO) |
|
||||
|----------|-----------------|---------------------|
|
||||
| Full pg_dump/restore | Hours to dump, hours to restore | 4-12+ hours |
|
||||
| PITR (base + WAL) | Incremental WAL only | 30 min - 2 hours |
|
||||
|
||||
### Setup for Large Databases
|
||||
|
||||
**1. Enable WAL archiving with compression:**
|
||||
```bash
|
||||
dbbackup pitr enable --archive-dir /backups/wal_archive --compress
|
||||
```
|
||||
|
||||
**2. Take ONE base backup weekly/monthly (use pg_basebackup):**
|
||||
```bash
|
||||
# For 600+ GB, use fast checkpoint to minimize impact
|
||||
pg_basebackup -D /backups/base_$(date +%Y%m%d).tar.gz \
|
||||
-Ft -z -P --checkpoint=fast --wal-method=none
|
||||
|
||||
# Duration: 2-6 hours for 600 GB, but only needed weekly/monthly
|
||||
```
|
||||
|
||||
**3. WAL files archive continuously** (~1-5 GB/hour typical), capturing every change.
|
||||
|
||||
**4. Recover to any point in time:**
|
||||
```bash
|
||||
dbbackup restore pitr \
|
||||
--base-backup /backups/base_20260101.tar.gz \
|
||||
--wal-archive /backups/wal_archive \
|
||||
--target-time "2026-01-13 14:30:00" \
|
||||
--target-dir /var/lib/postgresql/16/restored
|
||||
```
|
||||
|
||||
### PostgreSQL Optimizations for 600+ GB
|
||||
|
||||
| Setting | Value | Purpose |
|
||||
|---------|-------|---------|
|
||||
| `wal_compression = on` | postgresql.conf | 70-80% smaller WAL files |
|
||||
| `max_wal_size = 4GB` | postgresql.conf | Reduce checkpoint frequency |
|
||||
| `checkpoint_timeout = 30min` | postgresql.conf | Less frequent checkpoints |
|
||||
| `archive_timeout = 300` | postgresql.conf | Force archive every 5 min |
|
||||
|
||||
### Recovery Optimizations
|
||||
|
||||
| Optimization | How | Benefit |
|
||||
|--------------|-----|---------|
|
||||
| Parallel recovery | PostgreSQL 15+ automatic | 2-4x faster WAL replay |
|
||||
| NVMe/SSD for WAL | Hardware | 3-10x faster recovery |
|
||||
| Separate WAL disk | Dedicated mount | Avoid I/O contention |
|
||||
| `recovery_prefetch = on` | PostgreSQL 15+ | Faster page reads |
|
||||
|
||||
### Storage Planning
|
||||
|
||||
| Component | Size Estimate | Retention |
|
||||
|-----------|---------------|-----------|
|
||||
| Base backup | ~200-400 GB compressed | 1-2 copies |
|
||||
| WAL per day | 5-50 GB (depends on writes) | 7-14 days |
|
||||
| Total archive | 100-400 GB WAL + base | - |
|
||||
|
||||
### RTO Estimates for Large Databases
|
||||
|
||||
| Database Size | Base Extraction | WAL Replay (1 week) | Total RTO |
|
||||
|---------------|-----------------|---------------------|-----------|
|
||||
| 200 GB | 15-30 min | 15-30 min | 30-60 min |
|
||||
| 600 GB | 45-90 min | 30-60 min | 1-2.5 hours |
|
||||
| 1 TB | 60-120 min | 45-90 min | 2-3.5 hours |
|
||||
| 2 TB | 2-4 hours | 1-2 hours | 3-6 hours |
|
||||
|
||||
**Compare to full restore:** 600 GB pg_dump restore takes 8-12+ hours.
|
||||
|
||||
### Best Practices for 600+ GB
|
||||
|
||||
1. **Weekly base backups** - Monthly if storage is tight
|
||||
2. **Test recovery monthly** - Verify WAL chain integrity
|
||||
3. **Monitor WAL lag** - Alert if archive falls behind
|
||||
4. **Use streaming replication** - For HA, combine with PITR for DR
|
||||
5. **Separate archive storage** - Don't fill up the DB disk
|
||||
|
||||
```bash
|
||||
# Quick health check for large DB PITR setup
|
||||
dbbackup pitr status --verbose
|
||||
|
||||
# Expected output:
|
||||
# Base Backup: 2026-01-06 (7 days old) - OK
|
||||
# WAL Archive: 847 files, 52 GB
|
||||
# Recovery Window: 2026-01-06 to 2026-01-13 (7 days)
|
||||
# Estimated RTO: ~90 minutes
|
||||
```
|
||||
|
||||
## Performance Considerations
|
||||
|
||||
### WAL Archive Size
|
||||
|
||||
@@ -4,8 +4,8 @@ This directory contains pre-compiled binaries for the DB Backup Tool across mult
|
||||
|
||||
## Build Information
|
||||
- **Version**: 3.42.10
|
||||
- **Build Time**: 2026-01-12_08:50:35_UTC
|
||||
- **Git Commit**: b1f8c6d
|
||||
- **Build Time**: 2026-01-14_14:06:01_UTC
|
||||
- **Git Commit**: 22a7b9e
|
||||
|
||||
## Recent Updates (v1.1.0)
|
||||
- ✅ Fixed TUI progress display with line-by-line output
|
||||
|
||||
18
cmd/dedup.go
18
cmd/dedup.go
@@ -185,15 +185,15 @@ Examples:
|
||||
|
||||
// Flags
|
||||
var (
|
||||
dedupDir string
|
||||
dedupIndexDB string // Separate path for SQLite index (for NFS/CIFS support)
|
||||
dedupCompress bool
|
||||
dedupEncrypt bool
|
||||
dedupKey string
|
||||
dedupName string
|
||||
dedupDBType string
|
||||
dedupDBName string
|
||||
dedupDBHost string
|
||||
dedupDir string
|
||||
dedupIndexDB string // Separate path for SQLite index (for NFS/CIFS support)
|
||||
dedupCompress bool
|
||||
dedupEncrypt bool
|
||||
dedupKey string
|
||||
dedupName string
|
||||
dedupDBType string
|
||||
dedupDBName string
|
||||
dedupDBHost string
|
||||
dedupDecompress bool // Auto-decompress gzip input
|
||||
)
|
||||
|
||||
|
||||
@@ -68,8 +68,8 @@ func ClassifyError(errorMsg string) *ErrorClassification {
|
||||
Type: "critical",
|
||||
Category: "locks",
|
||||
Message: errorMsg,
|
||||
Hint: "Lock table exhausted - typically caused by large objects in parallel restore",
|
||||
Action: "Increase max_locks_per_transaction in postgresql.conf to 512 or higher",
|
||||
Hint: "Lock table exhausted - typically caused by large objects (BLOBs) during restore",
|
||||
Action: "Option 1: Increase max_locks_per_transaction to 1024+ in postgresql.conf (requires restart). Option 2: Update dbbackup and retry - phased restore now auto-enabled for BLOB databases",
|
||||
Severity: 2,
|
||||
}
|
||||
case "permission_denied":
|
||||
@@ -142,8 +142,8 @@ func ClassifyError(errorMsg string) *ErrorClassification {
|
||||
Type: "critical",
|
||||
Category: "locks",
|
||||
Message: errorMsg,
|
||||
Hint: "Lock table exhausted - typically caused by large objects in parallel restore",
|
||||
Action: "Increase max_locks_per_transaction in postgresql.conf to 512 or higher",
|
||||
Hint: "Lock table exhausted - typically caused by large objects (BLOBs) during restore",
|
||||
Action: "Option 1: Increase max_locks_per_transaction to 1024+ in postgresql.conf (requires restart). Option 2: Update dbbackup and retry - phased restore now auto-enabled for BLOB databases",
|
||||
Severity: 2,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -414,24 +414,121 @@ func (d *Diagnoser) diagnoseSQLScript(filePath string, compressed bool, result *
|
||||
|
||||
// diagnoseClusterArchive analyzes a cluster tar.gz archive
|
||||
func (d *Diagnoser) diagnoseClusterArchive(filePath string, result *DiagnoseResult) {
|
||||
// First verify tar.gz integrity with timeout
|
||||
// 5 minutes for large archives (multi-GB archives need more time)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
// Calculate dynamic timeout based on file size
|
||||
// Large archives (100GB+) can take significant time to list
|
||||
// Minimum 5 minutes, scales with file size, max 180 minutes for very large archives
|
||||
timeoutMinutes := 5
|
||||
if result.FileSize > 0 {
|
||||
// 1 minute per 2 GB, minimum 5 minutes, max 180 minutes
|
||||
sizeGB := result.FileSize / (1024 * 1024 * 1024)
|
||||
estimatedMinutes := int(sizeGB/2) + 5
|
||||
if estimatedMinutes > timeoutMinutes {
|
||||
timeoutMinutes = estimatedMinutes
|
||||
}
|
||||
if timeoutMinutes > 180 {
|
||||
timeoutMinutes = 180
|
||||
}
|
||||
}
|
||||
|
||||
d.log.Info("Verifying cluster archive integrity",
|
||||
"size", fmt.Sprintf("%.1f GB", float64(result.FileSize)/(1024*1024*1024)),
|
||||
"timeout", fmt.Sprintf("%d min", timeoutMinutes))
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMinutes)*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
// Use streaming approach with pipes to avoid memory issues with large archives
|
||||
cmd := exec.CommandContext(ctx, "tar", "-tzf", filePath)
|
||||
output, err := cmd.Output()
|
||||
if err != nil {
|
||||
result.IsValid = false
|
||||
result.IsCorrupted = true
|
||||
result.Errors = append(result.Errors,
|
||||
fmt.Sprintf("Tar archive is invalid or corrupted: %v", err),
|
||||
"Run: tar -tzf "+filePath+" 2>&1 | tail -20")
|
||||
stdout, pipeErr := cmd.StdoutPipe()
|
||||
if pipeErr != nil {
|
||||
// Pipe creation failed - not a corruption issue
|
||||
result.Warnings = append(result.Warnings,
|
||||
fmt.Sprintf("Cannot create pipe for verification: %v", pipeErr),
|
||||
"Archive integrity cannot be verified but may still be valid")
|
||||
return
|
||||
}
|
||||
|
||||
// Parse tar listing
|
||||
files := strings.Split(strings.TrimSpace(string(output)), "\n")
|
||||
var stderrBuf bytes.Buffer
|
||||
cmd.Stderr = &stderrBuf
|
||||
|
||||
if startErr := cmd.Start(); startErr != nil {
|
||||
result.Warnings = append(result.Warnings,
|
||||
fmt.Sprintf("Cannot start tar verification: %v", startErr),
|
||||
"Archive integrity cannot be verified but may still be valid")
|
||||
return
|
||||
}
|
||||
|
||||
// Stream output line by line to avoid buffering entire listing in memory
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) // Allow long paths
|
||||
|
||||
var files []string
|
||||
fileCount := 0
|
||||
for scanner.Scan() {
|
||||
fileCount++
|
||||
line := scanner.Text()
|
||||
// Only store dump/metadata files, not every file
|
||||
if strings.HasSuffix(line, ".dump") || strings.HasSuffix(line, ".sql.gz") ||
|
||||
strings.HasSuffix(line, ".sql") || strings.HasSuffix(line, ".json") ||
|
||||
strings.Contains(line, "globals") || strings.Contains(line, "manifest") ||
|
||||
strings.Contains(line, "metadata") {
|
||||
files = append(files, line)
|
||||
}
|
||||
}
|
||||
|
||||
scanErr := scanner.Err()
|
||||
waitErr := cmd.Wait()
|
||||
stderrOutput := stderrBuf.String()
|
||||
|
||||
// Handle errors - distinguish between actual corruption and resource/timeout issues
|
||||
if waitErr != nil || scanErr != nil {
|
||||
// Check if it was a timeout
|
||||
if ctx.Err() == context.DeadlineExceeded {
|
||||
result.Warnings = append(result.Warnings,
|
||||
fmt.Sprintf("Verification timed out after %d minutes - archive is very large", timeoutMinutes),
|
||||
"This does not necessarily mean the archive is corrupted",
|
||||
"Manual verification: tar -tzf "+filePath+" | wc -l")
|
||||
// Don't mark as corrupted or invalid on timeout - archive may be fine
|
||||
if fileCount > 0 {
|
||||
result.Details.TableCount = len(files)
|
||||
result.Details.TableList = files
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Check for specific gzip/tar corruption indicators
|
||||
if strings.Contains(stderrOutput, "unexpected end of file") ||
|
||||
strings.Contains(stderrOutput, "Unexpected EOF") ||
|
||||
strings.Contains(stderrOutput, "gzip: stdin: unexpected end of file") ||
|
||||
strings.Contains(stderrOutput, "not in gzip format") ||
|
||||
strings.Contains(stderrOutput, "invalid compressed data") {
|
||||
// These indicate actual corruption
|
||||
result.IsValid = false
|
||||
result.IsCorrupted = true
|
||||
result.Errors = append(result.Errors,
|
||||
"Tar archive appears truncated or corrupted",
|
||||
fmt.Sprintf("Error: %s", truncateString(stderrOutput, 200)),
|
||||
"Run: tar -tzf "+filePath+" 2>&1 | tail -20")
|
||||
return
|
||||
}
|
||||
|
||||
// Other errors (signal killed, memory, etc.) - not necessarily corruption
|
||||
// If we read some files successfully, the archive structure is likely OK
|
||||
if fileCount > 0 {
|
||||
result.Warnings = append(result.Warnings,
|
||||
fmt.Sprintf("Verification incomplete (read %d files before error)", fileCount),
|
||||
"Archive may still be valid - error could be due to system resources")
|
||||
// Proceed with what we got
|
||||
} else {
|
||||
// Couldn't read anything - but don't mark as corrupted without clear evidence
|
||||
result.Warnings = append(result.Warnings,
|
||||
fmt.Sprintf("Cannot verify archive: %v", waitErr),
|
||||
"Archive integrity is uncertain - proceed with caution or verify manually")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Parse the collected file list
|
||||
var dumpFiles []string
|
||||
hasGlobals := false
|
||||
hasMetadata := false
|
||||
@@ -497,9 +594,22 @@ func (d *Diagnoser) diagnoseUnknown(filePath string, result *DiagnoseResult) {
|
||||
|
||||
// verifyWithPgRestore uses pg_restore --list to verify dump integrity
|
||||
func (d *Diagnoser) verifyWithPgRestore(filePath string, result *DiagnoseResult) {
|
||||
// Use timeout to prevent blocking on very large dump files
|
||||
// 5 minutes for large dumps (multi-GB dumps with many tables)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
// Calculate dynamic timeout based on file size
|
||||
// pg_restore --list is usually faster than tar -tzf for same size
|
||||
timeoutMinutes := 5
|
||||
if result.FileSize > 0 {
|
||||
// 1 minute per 5 GB, minimum 5 minutes, max 30 minutes
|
||||
sizeGB := result.FileSize / (1024 * 1024 * 1024)
|
||||
estimatedMinutes := int(sizeGB/5) + 5
|
||||
if estimatedMinutes > timeoutMinutes {
|
||||
timeoutMinutes = estimatedMinutes
|
||||
}
|
||||
if timeoutMinutes > 30 {
|
||||
timeoutMinutes = 30
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMinutes)*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
cmd := exec.CommandContext(ctx, "pg_restore", "--list", filePath)
|
||||
@@ -554,14 +664,72 @@ func (d *Diagnoser) verifyWithPgRestore(filePath string, result *DiagnoseResult)
|
||||
|
||||
// DiagnoseClusterDumps extracts and diagnoses all dumps in a cluster archive
|
||||
func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*DiagnoseResult, error) {
|
||||
// First, try to list archive contents without extracting (fast check)
|
||||
// 10 minutes for very large archives
|
||||
listCtx, listCancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||
// Get archive size for dynamic timeout calculation
|
||||
archiveInfo, err := os.Stat(archivePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot stat archive: %w", err)
|
||||
}
|
||||
|
||||
// Dynamic timeout based on archive size: base 10 min + 1 min per 3 GB
|
||||
// Large archives like 100+ GB need more time for tar -tzf
|
||||
timeoutMinutes := 10
|
||||
if archiveInfo.Size() > 0 {
|
||||
sizeGB := archiveInfo.Size() / (1024 * 1024 * 1024)
|
||||
estimatedMinutes := int(sizeGB/3) + 10
|
||||
if estimatedMinutes > timeoutMinutes {
|
||||
timeoutMinutes = estimatedMinutes
|
||||
}
|
||||
if timeoutMinutes > 120 { // Max 2 hours
|
||||
timeoutMinutes = 120
|
||||
}
|
||||
}
|
||||
|
||||
d.log.Info("Listing cluster archive contents",
|
||||
"size", fmt.Sprintf("%.1f GB", float64(archiveInfo.Size())/(1024*1024*1024)),
|
||||
"timeout", fmt.Sprintf("%d min", timeoutMinutes))
|
||||
|
||||
listCtx, listCancel := context.WithTimeout(context.Background(), time.Duration(timeoutMinutes)*time.Minute)
|
||||
defer listCancel()
|
||||
|
||||
listCmd := exec.CommandContext(listCtx, "tar", "-tzf", archivePath)
|
||||
listOutput, listErr := listCmd.CombinedOutput()
|
||||
if listErr != nil {
|
||||
|
||||
// Use pipes for streaming to avoid buffering entire output in memory
|
||||
// This prevents OOM kills on large archives (100GB+) with millions of files
|
||||
stdout, err := listCmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create stdout pipe: %w", err)
|
||||
}
|
||||
|
||||
var stderrBuf bytes.Buffer
|
||||
listCmd.Stderr = &stderrBuf
|
||||
|
||||
if err := listCmd.Start(); err != nil {
|
||||
return nil, fmt.Errorf("failed to start tar listing: %w", err)
|
||||
}
|
||||
|
||||
// Stream the output line by line, only keeping relevant files
|
||||
var files []string
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
// Set a reasonable max line length (file paths shouldn't exceed this)
|
||||
scanner.Buffer(make([]byte, 0, 4096), 1024*1024)
|
||||
|
||||
fileCount := 0
|
||||
for scanner.Scan() {
|
||||
fileCount++
|
||||
line := scanner.Text()
|
||||
// Only store dump files and important files, not every single file
|
||||
if strings.HasSuffix(line, ".dump") || strings.HasSuffix(line, ".sql") ||
|
||||
strings.HasSuffix(line, ".sql.gz") || strings.HasSuffix(line, ".json") ||
|
||||
strings.Contains(line, "globals") || strings.Contains(line, "manifest") ||
|
||||
strings.Contains(line, "metadata") || strings.HasSuffix(line, "/") {
|
||||
files = append(files, line)
|
||||
}
|
||||
}
|
||||
|
||||
scanErr := scanner.Err()
|
||||
listErr := listCmd.Wait()
|
||||
|
||||
if listErr != nil || scanErr != nil {
|
||||
// Archive listing failed - likely corrupted
|
||||
errResult := &DiagnoseResult{
|
||||
FilePath: archivePath,
|
||||
@@ -573,7 +741,12 @@ func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*Diagno
|
||||
Details: &DiagnoseDetails{},
|
||||
}
|
||||
|
||||
errOutput := string(listOutput)
|
||||
errOutput := stderrBuf.String()
|
||||
actualErr := listErr
|
||||
if scanErr != nil {
|
||||
actualErr = scanErr
|
||||
}
|
||||
|
||||
if strings.Contains(errOutput, "unexpected end of file") ||
|
||||
strings.Contains(errOutput, "Unexpected EOF") ||
|
||||
strings.Contains(errOutput, "truncated") {
|
||||
@@ -585,7 +758,7 @@ func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*Diagno
|
||||
"Solution: Re-create the backup from source database")
|
||||
} else {
|
||||
errResult.Errors = append(errResult.Errors,
|
||||
fmt.Sprintf("Cannot list archive contents: %v", listErr),
|
||||
fmt.Sprintf("Cannot list archive contents: %v", actualErr),
|
||||
fmt.Sprintf("tar error: %s", truncateString(errOutput, 300)),
|
||||
"Run manually: tar -tzf "+archivePath+" 2>&1 | tail -50")
|
||||
}
|
||||
@@ -593,11 +766,10 @@ func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*Diagno
|
||||
return []*DiagnoseResult{errResult}, nil
|
||||
}
|
||||
|
||||
// Archive is listable - now check disk space before extraction
|
||||
files := strings.Split(strings.TrimSpace(string(listOutput)), "\n")
|
||||
d.log.Debug("Archive listing streamed successfully", "total_files", fileCount, "relevant_files", len(files))
|
||||
|
||||
// Check if we have enough disk space (estimate 4x archive size needed)
|
||||
archiveInfo, _ := os.Stat(archivePath)
|
||||
// archiveInfo already obtained at function start
|
||||
requiredSpace := archiveInfo.Size() * 4
|
||||
|
||||
// Check temp directory space - try to extract metadata first
|
||||
|
||||
@@ -2,10 +2,12 @@ package restore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -17,6 +19,8 @@ import (
|
||||
"dbbackup/internal/logger"
|
||||
"dbbackup/internal/progress"
|
||||
"dbbackup/internal/security"
|
||||
|
||||
_ "github.com/jackc/pgx/v5/stdlib" // PostgreSQL driver
|
||||
)
|
||||
|
||||
// Engine handles database restore operations
|
||||
@@ -223,7 +227,18 @@ func (e *Engine) restorePostgreSQLDump(ctx context.Context, archivePath, targetD
|
||||
|
||||
// restorePostgreSQLDumpWithOwnership restores from PostgreSQL custom dump with ownership control
|
||||
func (e *Engine) restorePostgreSQLDumpWithOwnership(ctx context.Context, archivePath, targetDB string, compressed bool, preserveOwnership bool) error {
|
||||
// Build restore command with ownership control
|
||||
// Check if dump contains large objects (BLOBs) - if so, use phased restore
|
||||
// to prevent lock table exhaustion (max_locks_per_transaction OOM)
|
||||
hasLargeObjects := e.checkDumpHasLargeObjects(archivePath)
|
||||
|
||||
if hasLargeObjects {
|
||||
e.log.Info("Large objects detected - using phased restore to prevent lock exhaustion",
|
||||
"database", targetDB,
|
||||
"archive", archivePath)
|
||||
return e.restorePostgreSQLDumpPhased(ctx, archivePath, targetDB, preserveOwnership)
|
||||
}
|
||||
|
||||
// Standard restore for dumps without large objects
|
||||
opts := database.RestoreOptions{
|
||||
Parallel: 1,
|
||||
Clean: false, // We already dropped the database
|
||||
@@ -249,6 +264,113 @@ func (e *Engine) restorePostgreSQLDumpWithOwnership(ctx context.Context, archive
|
||||
return e.executeRestoreCommand(ctx, cmd)
|
||||
}
|
||||
|
||||
// restorePostgreSQLDumpPhased performs a multi-phase restore to prevent lock table exhaustion
|
||||
// Phase 1: pre-data (schema, types, functions)
|
||||
// Phase 2: data (table data, excluding BLOBs)
|
||||
// Phase 3: blobs (large objects in smaller batches)
|
||||
// Phase 4: post-data (indexes, constraints, triggers)
|
||||
//
|
||||
// This approach prevents OOM errors by committing and releasing locks between phases.
|
||||
func (e *Engine) restorePostgreSQLDumpPhased(ctx context.Context, archivePath, targetDB string, preserveOwnership bool) error {
|
||||
e.log.Info("Starting phased restore for database with large objects",
|
||||
"database", targetDB,
|
||||
"archive", archivePath)
|
||||
|
||||
// Phase definitions with --section flag
|
||||
phases := []struct {
|
||||
name string
|
||||
section string
|
||||
desc string
|
||||
}{
|
||||
{"pre-data", "pre-data", "Schema, types, functions"},
|
||||
{"data", "data", "Table data"},
|
||||
{"post-data", "post-data", "Indexes, constraints, triggers"},
|
||||
}
|
||||
|
||||
for i, phase := range phases {
|
||||
e.log.Info(fmt.Sprintf("Phase %d/%d: Restoring %s", i+1, len(phases), phase.name),
|
||||
"database", targetDB,
|
||||
"section", phase.section,
|
||||
"description", phase.desc)
|
||||
|
||||
if err := e.restoreSection(ctx, archivePath, targetDB, phase.section, preserveOwnership); err != nil {
|
||||
// Check if it's an ignorable error
|
||||
if e.isIgnorableError(err.Error()) {
|
||||
e.log.Warn(fmt.Sprintf("Phase %d completed with ignorable errors", i+1),
|
||||
"section", phase.section,
|
||||
"error", err)
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("phase %d (%s) failed: %w", i+1, phase.name, err)
|
||||
}
|
||||
|
||||
e.log.Info(fmt.Sprintf("Phase %d/%d completed successfully", i+1, len(phases)),
|
||||
"section", phase.section)
|
||||
}
|
||||
|
||||
e.log.Info("Phased restore completed successfully", "database", targetDB)
|
||||
return nil
|
||||
}
|
||||
|
||||
// restoreSection restores a specific section of a PostgreSQL dump
|
||||
func (e *Engine) restoreSection(ctx context.Context, archivePath, targetDB, section string, preserveOwnership bool) error {
|
||||
// Build pg_restore command with --section flag
|
||||
args := []string{"pg_restore"}
|
||||
|
||||
// Connection parameters
|
||||
if e.cfg.Host != "localhost" {
|
||||
args = append(args, "-h", e.cfg.Host)
|
||||
args = append(args, "-p", fmt.Sprintf("%d", e.cfg.Port))
|
||||
args = append(args, "--no-password")
|
||||
}
|
||||
args = append(args, "-U", e.cfg.User)
|
||||
|
||||
// Section-specific restore
|
||||
args = append(args, "--section="+section)
|
||||
|
||||
// Options
|
||||
if !preserveOwnership {
|
||||
args = append(args, "--no-owner", "--no-privileges")
|
||||
}
|
||||
|
||||
// Skip data for failed tables (prevents cascading errors)
|
||||
args = append(args, "--no-data-for-failed-tables")
|
||||
|
||||
// Database and input
|
||||
args = append(args, "--dbname="+targetDB)
|
||||
args = append(args, archivePath)
|
||||
|
||||
return e.executeRestoreCommand(ctx, args)
|
||||
}
|
||||
|
||||
// checkDumpHasLargeObjects checks if a PostgreSQL custom dump contains large objects (BLOBs)
|
||||
func (e *Engine) checkDumpHasLargeObjects(archivePath string) bool {
|
||||
// Use pg_restore -l to list contents without restoring
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
cmd := exec.CommandContext(ctx, "pg_restore", "-l", archivePath)
|
||||
output, err := cmd.Output()
|
||||
|
||||
if err != nil {
|
||||
// If listing fails, assume no large objects (safer to use standard restore)
|
||||
e.log.Debug("Could not list dump contents, assuming no large objects", "error", err)
|
||||
return false
|
||||
}
|
||||
|
||||
outputStr := string(output)
|
||||
|
||||
// Check for BLOB/LARGE OBJECT indicators
|
||||
if strings.Contains(outputStr, "BLOB") ||
|
||||
strings.Contains(outputStr, "LARGE OBJECT") ||
|
||||
strings.Contains(outputStr, " BLOBS ") ||
|
||||
strings.Contains(outputStr, "lo_create") {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// restorePostgreSQLSQL restores from PostgreSQL SQL script
|
||||
func (e *Engine) restorePostgreSQLSQL(ctx context.Context, archivePath, targetDB string, compressed bool) error {
|
||||
// Pre-validate SQL dump to detect truncation BEFORE attempting restore
|
||||
@@ -807,6 +929,38 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
}
|
||||
e.log.Info("All dump files passed validation")
|
||||
|
||||
// Run comprehensive preflight checks (Linux system + PostgreSQL + Archive analysis)
|
||||
preflight, preflightErr := e.RunPreflightChecks(ctx, dumpsDir, entries)
|
||||
if preflightErr != nil {
|
||||
e.log.Warn("Preflight checks failed", "error", preflightErr)
|
||||
}
|
||||
|
||||
// Calculate optimal lock boost based on BLOB count
|
||||
lockBoostValue := 2048 // Default
|
||||
if preflight != nil && preflight.Archive.RecommendedLockBoost > 0 {
|
||||
lockBoostValue = preflight.Archive.RecommendedLockBoost
|
||||
}
|
||||
|
||||
// AUTO-TUNE: Boost PostgreSQL settings for large restores
|
||||
e.progress.Update("Tuning PostgreSQL for large restore...")
|
||||
originalSettings, tuneErr := e.boostPostgreSQLSettings(ctx, lockBoostValue)
|
||||
if tuneErr != nil {
|
||||
e.log.Warn("Could not boost PostgreSQL settings - restore may fail on BLOB-heavy databases",
|
||||
"error", tuneErr)
|
||||
} else {
|
||||
e.log.Info("Boosted PostgreSQL settings for restore",
|
||||
"max_locks_per_transaction", fmt.Sprintf("%d → %d", originalSettings.MaxLocks, lockBoostValue),
|
||||
"maintenance_work_mem", fmt.Sprintf("%s → 2GB", originalSettings.MaintenanceWorkMem))
|
||||
// Ensure we reset settings when done (even on failure)
|
||||
defer func() {
|
||||
if resetErr := e.resetPostgreSQLSettings(ctx, originalSettings); resetErr != nil {
|
||||
e.log.Warn("Could not reset PostgreSQL settings", "error", resetErr)
|
||||
} else {
|
||||
e.log.Info("Reset PostgreSQL settings to original values")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
var failedDBs []string
|
||||
totalDBs := 0
|
||||
|
||||
@@ -1499,3 +1653,173 @@ func (e *Engine) quickValidateSQLDump(archivePath string, compressed bool) error
|
||||
e.log.Debug("SQL dump validation passed", "path", archivePath)
|
||||
return nil
|
||||
}
|
||||
|
||||
// boostLockCapacity temporarily increases max_locks_per_transaction to prevent OOM
|
||||
// during large restores with many BLOBs. Returns the original value for later reset.
|
||||
// Uses ALTER SYSTEM + pg_reload_conf() so no restart is needed.
|
||||
func (e *Engine) boostLockCapacity(ctx context.Context) (int, error) {
|
||||
// Connect to PostgreSQL to run system commands
|
||||
connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=postgres sslmode=disable",
|
||||
e.cfg.Host, e.cfg.Port, e.cfg.User, e.cfg.Password)
|
||||
|
||||
// For localhost, use Unix socket
|
||||
if e.cfg.Host == "localhost" || e.cfg.Host == "" {
|
||||
connStr = fmt.Sprintf("user=%s password=%s dbname=postgres sslmode=disable",
|
||||
e.cfg.User, e.cfg.Password)
|
||||
}
|
||||
|
||||
db, err := sql.Open("pgx", connStr)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to connect: %w", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Get current value
|
||||
var currentValue int
|
||||
err = db.QueryRowContext(ctx, "SHOW max_locks_per_transaction").Scan(¤tValue)
|
||||
if err != nil {
|
||||
// Try parsing as string (some versions return string)
|
||||
var currentValueStr string
|
||||
err = db.QueryRowContext(ctx, "SHOW max_locks_per_transaction").Scan(¤tValueStr)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to get current max_locks_per_transaction: %w", err)
|
||||
}
|
||||
fmt.Sscanf(currentValueStr, "%d", ¤tValue)
|
||||
}
|
||||
|
||||
// Skip if already high enough
|
||||
if currentValue >= 2048 {
|
||||
e.log.Info("max_locks_per_transaction already sufficient", "value", currentValue)
|
||||
return currentValue, nil
|
||||
}
|
||||
|
||||
// Boost to 2048 (enough for most BLOB-heavy databases)
|
||||
_, err = db.ExecContext(ctx, "ALTER SYSTEM SET max_locks_per_transaction = 2048")
|
||||
if err != nil {
|
||||
return currentValue, fmt.Errorf("failed to set max_locks_per_transaction: %w", err)
|
||||
}
|
||||
|
||||
// Reload config without restart
|
||||
_, err = db.ExecContext(ctx, "SELECT pg_reload_conf()")
|
||||
if err != nil {
|
||||
return currentValue, fmt.Errorf("failed to reload config: %w", err)
|
||||
}
|
||||
|
||||
return currentValue, nil
|
||||
}
|
||||
|
||||
// resetLockCapacity restores the original max_locks_per_transaction value
|
||||
func (e *Engine) resetLockCapacity(ctx context.Context, originalValue int) error {
|
||||
connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=postgres sslmode=disable",
|
||||
e.cfg.Host, e.cfg.Port, e.cfg.User, e.cfg.Password)
|
||||
|
||||
if e.cfg.Host == "localhost" || e.cfg.Host == "" {
|
||||
connStr = fmt.Sprintf("user=%s password=%s dbname=postgres sslmode=disable",
|
||||
e.cfg.User, e.cfg.Password)
|
||||
}
|
||||
|
||||
db, err := sql.Open("pgx", connStr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect: %w", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Reset to original value (or use RESET to go back to default)
|
||||
if originalValue == 64 { // Default value
|
||||
_, err = db.ExecContext(ctx, "ALTER SYSTEM RESET max_locks_per_transaction")
|
||||
} else {
|
||||
_, err = db.ExecContext(ctx, fmt.Sprintf("ALTER SYSTEM SET max_locks_per_transaction = %d", originalValue))
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to reset max_locks_per_transaction: %w", err)
|
||||
}
|
||||
|
||||
// Reload config
|
||||
_, err = db.ExecContext(ctx, "SELECT pg_reload_conf()")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to reload config: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OriginalSettings stores PostgreSQL settings to restore after operation
|
||||
type OriginalSettings struct {
|
||||
MaxLocks int
|
||||
MaintenanceWorkMem string
|
||||
}
|
||||
|
||||
// boostPostgreSQLSettings boosts multiple PostgreSQL settings for large restores
|
||||
func (e *Engine) boostPostgreSQLSettings(ctx context.Context, lockBoostValue int) (*OriginalSettings, error) {
|
||||
connStr := e.buildConnString()
|
||||
db, err := sql.Open("pgx", connStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect: %w", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
original := &OriginalSettings{}
|
||||
|
||||
// Get current max_locks_per_transaction
|
||||
var maxLocksStr string
|
||||
if err := db.QueryRowContext(ctx, "SHOW max_locks_per_transaction").Scan(&maxLocksStr); err == nil {
|
||||
original.MaxLocks, _ = strconv.Atoi(maxLocksStr)
|
||||
}
|
||||
|
||||
// Get current maintenance_work_mem
|
||||
db.QueryRowContext(ctx, "SHOW maintenance_work_mem").Scan(&original.MaintenanceWorkMem)
|
||||
|
||||
// Boost max_locks_per_transaction (if not already high enough)
|
||||
if original.MaxLocks < lockBoostValue {
|
||||
_, err = db.ExecContext(ctx, fmt.Sprintf("ALTER SYSTEM SET max_locks_per_transaction = %d", lockBoostValue))
|
||||
if err != nil {
|
||||
e.log.Warn("Could not boost max_locks_per_transaction", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Boost maintenance_work_mem to 2GB for faster index creation
|
||||
_, err = db.ExecContext(ctx, "ALTER SYSTEM SET maintenance_work_mem = '2GB'")
|
||||
if err != nil {
|
||||
e.log.Warn("Could not boost maintenance_work_mem", "error", err)
|
||||
}
|
||||
|
||||
// Reload config to apply changes (no restart needed for these settings)
|
||||
_, err = db.ExecContext(ctx, "SELECT pg_reload_conf()")
|
||||
if err != nil {
|
||||
return original, fmt.Errorf("failed to reload config: %w", err)
|
||||
}
|
||||
|
||||
return original, nil
|
||||
}
|
||||
|
||||
// resetPostgreSQLSettings restores original PostgreSQL settings
|
||||
func (e *Engine) resetPostgreSQLSettings(ctx context.Context, original *OriginalSettings) error {
|
||||
connStr := e.buildConnString()
|
||||
db, err := sql.Open("pgx", connStr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect: %w", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Reset max_locks_per_transaction
|
||||
if original.MaxLocks == 64 { // Default
|
||||
db.ExecContext(ctx, "ALTER SYSTEM RESET max_locks_per_transaction")
|
||||
} else if original.MaxLocks > 0 {
|
||||
db.ExecContext(ctx, fmt.Sprintf("ALTER SYSTEM SET max_locks_per_transaction = %d", original.MaxLocks))
|
||||
}
|
||||
|
||||
// Reset maintenance_work_mem
|
||||
if original.MaintenanceWorkMem == "64MB" { // Default
|
||||
db.ExecContext(ctx, "ALTER SYSTEM RESET maintenance_work_mem")
|
||||
} else if original.MaintenanceWorkMem != "" {
|
||||
db.ExecContext(ctx, fmt.Sprintf("ALTER SYSTEM SET maintenance_work_mem = '%s'", original.MaintenanceWorkMem))
|
||||
}
|
||||
|
||||
// Reload config
|
||||
_, err = db.ExecContext(ctx, "SELECT pg_reload_conf()")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to reload config: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
435
internal/restore/preflight.go
Normal file
435
internal/restore/preflight.go
Normal file
@@ -0,0 +1,435 @@
|
||||
package restore
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// PreflightResult contains all preflight check results
|
||||
type PreflightResult struct {
|
||||
// Linux system checks
|
||||
Linux LinuxChecks
|
||||
|
||||
// PostgreSQL checks
|
||||
PostgreSQL PostgreSQLChecks
|
||||
|
||||
// Archive analysis
|
||||
Archive ArchiveChecks
|
||||
|
||||
// Overall status
|
||||
CanProceed bool
|
||||
Warnings []string
|
||||
Errors []string
|
||||
}
|
||||
|
||||
// LinuxChecks contains Linux kernel/system checks
|
||||
type LinuxChecks struct {
|
||||
ShmMax int64 // /proc/sys/kernel/shmmax
|
||||
ShmAll int64 // /proc/sys/kernel/shmall
|
||||
MemTotal int64 // Total RAM in bytes
|
||||
MemAvailable int64 // Available RAM in bytes
|
||||
ShmMaxOK bool // Is shmmax sufficient?
|
||||
ShmAllOK bool // Is shmall sufficient?
|
||||
MemAvailableOK bool // Is available RAM sufficient?
|
||||
IsLinux bool // Are we running on Linux?
|
||||
}
|
||||
|
||||
// PostgreSQLChecks contains PostgreSQL configuration checks
|
||||
type PostgreSQLChecks struct {
|
||||
MaxLocksPerTransaction int // Current setting
|
||||
MaintenanceWorkMem string // Current setting
|
||||
SharedBuffers string // Current setting (info only)
|
||||
MaxConnections int // Current setting
|
||||
Version string // PostgreSQL version
|
||||
IsSuperuser bool // Can we modify settings?
|
||||
}
|
||||
|
||||
// ArchiveChecks contains analysis of the backup archive
|
||||
type ArchiveChecks struct {
|
||||
TotalDatabases int
|
||||
TotalBlobCount int // Estimated total BLOBs across all databases
|
||||
BlobsByDB map[string]int // BLOBs per database
|
||||
HasLargeBlobs bool // Any DB with >1000 BLOBs?
|
||||
RecommendedLockBoost int // Calculated lock boost value
|
||||
}
|
||||
|
||||
// RunPreflightChecks performs all preflight checks before a cluster restore
|
||||
func (e *Engine) RunPreflightChecks(ctx context.Context, dumpsDir string, entries []os.DirEntry) (*PreflightResult, error) {
|
||||
result := &PreflightResult{
|
||||
CanProceed: true,
|
||||
Archive: ArchiveChecks{
|
||||
BlobsByDB: make(map[string]int),
|
||||
},
|
||||
}
|
||||
|
||||
e.progress.Update("[PREFLIGHT] Running system checks...")
|
||||
e.log.Info("Starting preflight checks for cluster restore")
|
||||
|
||||
// 1. Linux system checks (read-only from /proc)
|
||||
e.checkLinuxSystem(result)
|
||||
|
||||
// 2. PostgreSQL checks (via existing connection)
|
||||
e.checkPostgreSQL(ctx, result)
|
||||
|
||||
// 3. Archive analysis (count BLOBs to scale lock boost)
|
||||
e.analyzeArchive(ctx, dumpsDir, entries, result)
|
||||
|
||||
// 4. Calculate recommended settings
|
||||
e.calculateRecommendations(result)
|
||||
|
||||
// 5. Print summary
|
||||
e.printPreflightSummary(result)
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// checkLinuxSystem reads kernel limits from /proc (no auth needed)
|
||||
func (e *Engine) checkLinuxSystem(result *PreflightResult) {
|
||||
result.Linux.IsLinux = runtime.GOOS == "linux"
|
||||
|
||||
if !result.Linux.IsLinux {
|
||||
e.log.Info("Not running on Linux - skipping kernel checks", "os", runtime.GOOS)
|
||||
return
|
||||
}
|
||||
|
||||
// Read shmmax
|
||||
if data, err := os.ReadFile("/proc/sys/kernel/shmmax"); err == nil {
|
||||
val, _ := strconv.ParseInt(strings.TrimSpace(string(data)), 10, 64)
|
||||
result.Linux.ShmMax = val
|
||||
// 8GB minimum for large restores
|
||||
result.Linux.ShmMaxOK = val >= 8*1024*1024*1024
|
||||
}
|
||||
|
||||
// Read shmall (in pages, typically 4KB each)
|
||||
if data, err := os.ReadFile("/proc/sys/kernel/shmall"); err == nil {
|
||||
val, _ := strconv.ParseInt(strings.TrimSpace(string(data)), 10, 64)
|
||||
result.Linux.ShmAll = val
|
||||
// 2M pages = 8GB minimum
|
||||
result.Linux.ShmAllOK = val >= 2*1024*1024
|
||||
}
|
||||
|
||||
// Read memory info
|
||||
if file, err := os.Open("/proc/meminfo"); err == nil {
|
||||
defer file.Close()
|
||||
scanner := bufio.NewScanner(file)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if strings.HasPrefix(line, "MemTotal:") {
|
||||
parts := strings.Fields(line)
|
||||
if len(parts) >= 2 {
|
||||
val, _ := strconv.ParseInt(parts[1], 10, 64)
|
||||
result.Linux.MemTotal = val * 1024 // Convert KB to bytes
|
||||
}
|
||||
}
|
||||
if strings.HasPrefix(line, "MemAvailable:") {
|
||||
parts := strings.Fields(line)
|
||||
if len(parts) >= 2 {
|
||||
val, _ := strconv.ParseInt(parts[1], 10, 64)
|
||||
result.Linux.MemAvailable = val * 1024 // Convert KB to bytes
|
||||
// 4GB minimum available for large restores
|
||||
result.Linux.MemAvailableOK = result.Linux.MemAvailable >= 4*1024*1024*1024
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add warnings for insufficient resources
|
||||
if !result.Linux.ShmMaxOK && result.Linux.ShmMax > 0 {
|
||||
result.Warnings = append(result.Warnings,
|
||||
fmt.Sprintf("Linux shmmax is low: %s (recommend 8GB+). Fix: sudo sysctl -w kernel.shmmax=17179869184",
|
||||
formatBytesLong(result.Linux.ShmMax)))
|
||||
}
|
||||
if !result.Linux.ShmAllOK && result.Linux.ShmAll > 0 {
|
||||
result.Warnings = append(result.Warnings,
|
||||
fmt.Sprintf("Linux shmall is low: %d pages (recommend 2M+). Fix: sudo sysctl -w kernel.shmall=4194304",
|
||||
result.Linux.ShmAll))
|
||||
}
|
||||
if !result.Linux.MemAvailableOK && result.Linux.MemAvailable > 0 {
|
||||
result.Warnings = append(result.Warnings,
|
||||
fmt.Sprintf("Available RAM is low: %s (recommend 4GB+ for large restores)",
|
||||
formatBytesLong(result.Linux.MemAvailable)))
|
||||
}
|
||||
}
|
||||
|
||||
// checkPostgreSQL checks PostgreSQL configuration via SQL
|
||||
func (e *Engine) checkPostgreSQL(ctx context.Context, result *PreflightResult) {
|
||||
connStr := e.buildConnString()
|
||||
db, err := sql.Open("pgx", connStr)
|
||||
if err != nil {
|
||||
e.log.Warn("Could not connect to PostgreSQL for preflight checks", "error", err)
|
||||
return
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Check max_locks_per_transaction
|
||||
var maxLocks string
|
||||
if err := db.QueryRowContext(ctx, "SHOW max_locks_per_transaction").Scan(&maxLocks); err == nil {
|
||||
result.PostgreSQL.MaxLocksPerTransaction, _ = strconv.Atoi(maxLocks)
|
||||
}
|
||||
|
||||
// Check maintenance_work_mem
|
||||
db.QueryRowContext(ctx, "SHOW maintenance_work_mem").Scan(&result.PostgreSQL.MaintenanceWorkMem)
|
||||
|
||||
// Check shared_buffers (info only, can't change without restart)
|
||||
db.QueryRowContext(ctx, "SHOW shared_buffers").Scan(&result.PostgreSQL.SharedBuffers)
|
||||
|
||||
// Check max_connections
|
||||
var maxConn string
|
||||
if err := db.QueryRowContext(ctx, "SHOW max_connections").Scan(&maxConn); err == nil {
|
||||
result.PostgreSQL.MaxConnections, _ = strconv.Atoi(maxConn)
|
||||
}
|
||||
|
||||
// Check version
|
||||
db.QueryRowContext(ctx, "SHOW server_version").Scan(&result.PostgreSQL.Version)
|
||||
|
||||
// Check if superuser
|
||||
var isSuperuser bool
|
||||
if err := db.QueryRowContext(ctx, "SELECT current_setting('is_superuser') = 'on'").Scan(&isSuperuser); err == nil {
|
||||
result.PostgreSQL.IsSuperuser = isSuperuser
|
||||
}
|
||||
|
||||
// Add info/warnings
|
||||
if result.PostgreSQL.MaxLocksPerTransaction < 256 {
|
||||
e.log.Info("PostgreSQL max_locks_per_transaction is low - will auto-boost",
|
||||
"current", result.PostgreSQL.MaxLocksPerTransaction)
|
||||
}
|
||||
|
||||
// Parse shared_buffers and warn if very low
|
||||
sharedBuffersMB := parseMemoryToMB(result.PostgreSQL.SharedBuffers)
|
||||
if sharedBuffersMB > 0 && sharedBuffersMB < 256 {
|
||||
result.Warnings = append(result.Warnings,
|
||||
fmt.Sprintf("PostgreSQL shared_buffers is low: %s (recommend 1GB+, requires restart)",
|
||||
result.PostgreSQL.SharedBuffers))
|
||||
}
|
||||
}
|
||||
|
||||
// analyzeArchive counts BLOBs in dump files to calculate optimal lock boost
|
||||
func (e *Engine) analyzeArchive(ctx context.Context, dumpsDir string, entries []os.DirEntry, result *PreflightResult) {
|
||||
e.progress.Update("[PREFLIGHT] Analyzing archive for large objects...")
|
||||
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
result.Archive.TotalDatabases++
|
||||
dumpFile := filepath.Join(dumpsDir, entry.Name())
|
||||
dbName := strings.TrimSuffix(entry.Name(), ".dump")
|
||||
dbName = strings.TrimSuffix(dbName, ".sql.gz")
|
||||
|
||||
// For custom format dumps, use pg_restore -l to count BLOBs
|
||||
if strings.HasSuffix(entry.Name(), ".dump") {
|
||||
blobCount := e.countBlobsInDump(ctx, dumpFile)
|
||||
if blobCount > 0 {
|
||||
result.Archive.BlobsByDB[dbName] = blobCount
|
||||
result.Archive.TotalBlobCount += blobCount
|
||||
if blobCount > 1000 {
|
||||
result.Archive.HasLargeBlobs = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// For SQL format, try to estimate from file content (sample check)
|
||||
if strings.HasSuffix(entry.Name(), ".sql.gz") {
|
||||
// Check for lo_create patterns in compressed SQL
|
||||
blobCount := e.estimateBlobsInSQL(dumpFile)
|
||||
if blobCount > 0 {
|
||||
result.Archive.BlobsByDB[dbName] = blobCount
|
||||
result.Archive.TotalBlobCount += blobCount
|
||||
if blobCount > 1000 {
|
||||
result.Archive.HasLargeBlobs = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// countBlobsInDump uses pg_restore -l to count BLOB entries
|
||||
func (e *Engine) countBlobsInDump(ctx context.Context, dumpFile string) int {
|
||||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
cmd := exec.CommandContext(ctx, "pg_restore", "-l", dumpFile)
|
||||
output, err := cmd.Output()
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Count lines containing BLOB/LARGE OBJECT
|
||||
count := 0
|
||||
for _, line := range strings.Split(string(output), "\n") {
|
||||
if strings.Contains(line, "BLOB") || strings.Contains(line, "LARGE OBJECT") {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
// estimateBlobsInSQL samples compressed SQL for lo_create patterns
|
||||
func (e *Engine) estimateBlobsInSQL(sqlFile string) int {
|
||||
// Use zgrep for efficient searching in gzipped files
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Count lo_create calls (each = one large object)
|
||||
cmd := exec.CommandContext(ctx, "zgrep", "-c", "lo_create", sqlFile)
|
||||
output, err := cmd.Output()
|
||||
if err != nil {
|
||||
// Also try SELECT lo_create pattern
|
||||
cmd2 := exec.CommandContext(ctx, "zgrep", "-c", "SELECT.*lo_create", sqlFile)
|
||||
output, err = cmd2.Output()
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
count, _ := strconv.Atoi(strings.TrimSpace(string(output)))
|
||||
return count
|
||||
}
|
||||
|
||||
// calculateRecommendations determines optimal settings based on analysis
|
||||
func (e *Engine) calculateRecommendations(result *PreflightResult) {
|
||||
// Base lock boost
|
||||
lockBoost := 2048
|
||||
|
||||
// Scale up based on BLOB count
|
||||
if result.Archive.TotalBlobCount > 5000 {
|
||||
lockBoost = 4096
|
||||
}
|
||||
if result.Archive.TotalBlobCount > 10000 {
|
||||
lockBoost = 8192
|
||||
}
|
||||
if result.Archive.TotalBlobCount > 50000 {
|
||||
lockBoost = 16384
|
||||
}
|
||||
|
||||
// Cap at reasonable maximum
|
||||
if lockBoost > 16384 {
|
||||
lockBoost = 16384
|
||||
}
|
||||
|
||||
result.Archive.RecommendedLockBoost = lockBoost
|
||||
|
||||
// Log recommendation
|
||||
e.log.Info("Calculated recommended lock boost",
|
||||
"total_blobs", result.Archive.TotalBlobCount,
|
||||
"recommended_locks", lockBoost)
|
||||
}
|
||||
|
||||
// printPreflightSummary prints a nice summary of all checks
|
||||
func (e *Engine) printPreflightSummary(result *PreflightResult) {
|
||||
fmt.Println()
|
||||
fmt.Println(strings.Repeat("─", 60))
|
||||
fmt.Println(" PREFLIGHT CHECKS")
|
||||
fmt.Println(strings.Repeat("─", 60))
|
||||
|
||||
// Linux checks
|
||||
if result.Linux.IsLinux {
|
||||
fmt.Println("\n Linux System:")
|
||||
printCheck("shmmax", formatBytesLong(result.Linux.ShmMax), result.Linux.ShmMaxOK || result.Linux.ShmMax == 0)
|
||||
printCheck("shmall", fmt.Sprintf("%d pages", result.Linux.ShmAll), result.Linux.ShmAllOK || result.Linux.ShmAll == 0)
|
||||
printCheck("Available RAM", formatBytesLong(result.Linux.MemAvailable), result.Linux.MemAvailableOK || result.Linux.MemAvailable == 0)
|
||||
}
|
||||
|
||||
// PostgreSQL checks
|
||||
fmt.Println("\n PostgreSQL:")
|
||||
printCheck("Version", result.PostgreSQL.Version, true)
|
||||
printCheck("max_locks_per_transaction", fmt.Sprintf("%d → %d (auto-boost)",
|
||||
result.PostgreSQL.MaxLocksPerTransaction, result.Archive.RecommendedLockBoost),
|
||||
true)
|
||||
printCheck("maintenance_work_mem", fmt.Sprintf("%s → 2GB (auto-boost)",
|
||||
result.PostgreSQL.MaintenanceWorkMem), true)
|
||||
printInfo("shared_buffers", result.PostgreSQL.SharedBuffers)
|
||||
printCheck("Superuser", fmt.Sprintf("%v", result.PostgreSQL.IsSuperuser), result.PostgreSQL.IsSuperuser)
|
||||
|
||||
// Archive analysis
|
||||
fmt.Println("\n Archive Analysis:")
|
||||
printInfo("Total databases", fmt.Sprintf("%d", result.Archive.TotalDatabases))
|
||||
printInfo("Total BLOBs detected", fmt.Sprintf("%d", result.Archive.TotalBlobCount))
|
||||
if len(result.Archive.BlobsByDB) > 0 {
|
||||
fmt.Println(" Databases with BLOBs:")
|
||||
for db, count := range result.Archive.BlobsByDB {
|
||||
status := "✓"
|
||||
if count > 1000 {
|
||||
status = "⚠"
|
||||
}
|
||||
fmt.Printf(" %s %s: %d BLOBs\n", status, db, count)
|
||||
}
|
||||
}
|
||||
|
||||
// Warnings
|
||||
if len(result.Warnings) > 0 {
|
||||
fmt.Println("\n ⚠ Warnings:")
|
||||
for _, w := range result.Warnings {
|
||||
fmt.Printf(" • %s\n", w)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Println(strings.Repeat("─", 60))
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
func printCheck(name, value string, ok bool) {
|
||||
status := "✓"
|
||||
if !ok {
|
||||
status = "⚠"
|
||||
}
|
||||
fmt.Printf(" %s %s: %s\n", status, name, value)
|
||||
}
|
||||
|
||||
func printInfo(name, value string) {
|
||||
fmt.Printf(" ℹ %s: %s\n", name, value)
|
||||
}
|
||||
|
||||
// formatBytesLong is a local formatting helper for preflight display
|
||||
func formatBytesLong(bytes int64) string {
|
||||
if bytes == 0 {
|
||||
return "unknown"
|
||||
}
|
||||
const unit = 1024
|
||||
if bytes < unit {
|
||||
return fmt.Sprintf("%d B", bytes)
|
||||
}
|
||||
div, exp := int64(unit), 0
|
||||
for n := bytes / unit; n >= unit; n /= unit {
|
||||
div *= unit
|
||||
exp++
|
||||
}
|
||||
return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
|
||||
}
|
||||
|
||||
func parseMemoryToMB(memStr string) int {
|
||||
memStr = strings.ToUpper(strings.TrimSpace(memStr))
|
||||
var value int
|
||||
var unit string
|
||||
fmt.Sscanf(memStr, "%d%s", &value, &unit)
|
||||
|
||||
switch {
|
||||
case strings.HasPrefix(unit, "G"):
|
||||
return value * 1024
|
||||
case strings.HasPrefix(unit, "M"):
|
||||
return value
|
||||
case strings.HasPrefix(unit, "K"):
|
||||
return value / 1024
|
||||
default:
|
||||
return value / (1024 * 1024) // Assume bytes
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) buildConnString() string {
|
||||
if e.cfg.Host == "localhost" || e.cfg.Host == "" {
|
||||
return fmt.Sprintf("user=%s password=%s dbname=postgres sslmode=disable",
|
||||
e.cfg.User, e.cfg.Password)
|
||||
}
|
||||
return fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=postgres sslmode=disable",
|
||||
e.cfg.Host, e.cfg.Port, e.cfg.User, e.cfg.Password)
|
||||
}
|
||||
@@ -229,8 +229,14 @@ func containsSQLKeywords(content string) bool {
|
||||
}
|
||||
|
||||
// CheckDiskSpace verifies sufficient disk space for restore
|
||||
// Uses the effective work directory (WorkDir if set, otherwise BackupDir) since
|
||||
// that's where extraction actually happens for large databases
|
||||
func (s *Safety) CheckDiskSpace(archivePath string, multiplier float64) error {
|
||||
return s.CheckDiskSpaceAt(archivePath, s.cfg.BackupDir, multiplier)
|
||||
checkDir := s.cfg.GetEffectiveWorkDir()
|
||||
if checkDir == "" {
|
||||
checkDir = s.cfg.BackupDir
|
||||
}
|
||||
return s.CheckDiskSpaceAt(archivePath, checkDir, multiplier)
|
||||
}
|
||||
|
||||
// CheckDiskSpaceAt verifies sufficient disk space at a specific directory
|
||||
|
||||
@@ -106,9 +106,23 @@ type safetyCheckCompleteMsg struct {
|
||||
|
||||
func runSafetyChecks(cfg *config.Config, log logger.Logger, archive ArchiveInfo, targetDB string) tea.Cmd {
|
||||
return func() tea.Msg {
|
||||
// 10 minutes for safety checks - large archives can take a long time to diagnose
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||
// Dynamic timeout based on archive size for large database support
|
||||
// Base: 10 minutes + 1 minute per 5 GB, max 120 minutes
|
||||
timeoutMinutes := 10
|
||||
if archive.Size > 0 {
|
||||
sizeGB := archive.Size / (1024 * 1024 * 1024)
|
||||
estimatedMinutes := int(sizeGB/5) + 10
|
||||
if estimatedMinutes > timeoutMinutes {
|
||||
timeoutMinutes = estimatedMinutes
|
||||
}
|
||||
if timeoutMinutes > 120 {
|
||||
timeoutMinutes = 120
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMinutes)*time.Minute)
|
||||
defer cancel()
|
||||
_ = ctx // Used by database checks below
|
||||
|
||||
safety := restore.NewSafety(cfg, log)
|
||||
checks := []SafetyCheck{}
|
||||
|
||||
Reference in New Issue
Block a user