Compare commits

5 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 0f7d2bf7c6 | |
| | dee0273e6a | |
| | 89769137ad | |
| | 272b0730a8 | |
| | 487293dfc9 | |
@@ -4,8 +4,8 @@ This directory contains pre-compiled binaries for the DB Backup Tool across mult
 
 ## Build Information
 
 - **Version**: 3.42.81
-- **Build Time**: 2026-01-23_08:43:59_UTC
-- **Git Commit**: 03e9cd8
+- **Build Time**: 2026-01-23_09:14:44_UTC
+- **Git Commit**: dee0273
 
 ## Recent Updates (v1.1.0)
 
 - ✅ Fixed TUI progress display with line-by-line output
go.mod (4 lines changed)
@@ -83,6 +83,8 @@ require (
 	github.com/jackc/pgpassfile v1.0.0 // indirect
 	github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
 	github.com/jackc/puddle/v2 v2.2.2 // indirect
+	github.com/klauspost/compress v1.18.3 // indirect
+	github.com/klauspost/pgzip v1.2.6 // indirect
 	github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
 	github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
 	github.com/mattn/go-colorable v0.1.13 // indirect
@@ -115,7 +117,7 @@ require (
 	go.opentelemetry.io/otel/trace v1.37.0 // indirect
 	golang.org/x/net v0.46.0 // indirect
 	golang.org/x/oauth2 v0.33.0 // indirect
-	golang.org/x/sync v0.18.0 // indirect
+	golang.org/x/sync v0.19.0 // indirect
 	golang.org/x/sys v0.38.0 // indirect
 	golang.org/x/term v0.36.0 // indirect
 	golang.org/x/text v0.30.0 // indirect
go.sum (6 lines changed)
@@ -167,6 +167,10 @@ github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk=
 github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
 github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
 github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
+github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw=
+github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
+github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU=
+github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
 github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
 github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
@@ -264,6 +268,8 @@ golang.org/x/oauth2 v0.33.0 h1:4Q+qn+E5z8gPRJfmRy7C2gGG3T4jIprK6aSYgTXGRpo=
 golang.org/x/oauth2 v0.33.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA=
 golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
 golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
+golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
+golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
 golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
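These dependency additions are the heart of the change set: github.com/klauspost/pgzip splits gzip streams into blocks and compresses or decompresses them on multiple cores, with github.com/klauspost/compress supplying the underlying codec. As a rough illustration (not code from this diff), pgzip is close to a drop-in replacement for compress/gzip:

```go
package main

import (
	"log"
	"os"
	"runtime"

	"github.com/klauspost/pgzip"
)

func main() {
	out, err := os.Create("/tmp/demo.gz") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()

	// Same interface as compress/gzip, but blocks are compressed in parallel.
	zw, err := pgzip.NewWriterLevel(out, pgzip.BestSpeed)
	if err != nil {
		log.Fatal(err)
	}
	// Tune block size and worker count; errors only on invalid arguments.
	_ = zw.SetConcurrency(1<<20, runtime.NumCPU())

	if _, err := zw.Write([]byte("hello, parallel gzip")); err != nil {
		log.Fatal(err)
	}
	if err := zw.Close(); err != nil { // flushes all pending blocks
		log.Fatal(err)
	}
}
```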
@@ -20,6 +20,7 @@ import (
 	"dbbackup/internal/cloud"
 	"dbbackup/internal/config"
 	"dbbackup/internal/database"
+	"dbbackup/internal/fs"
 	"dbbackup/internal/logger"
 	"dbbackup/internal/metadata"
 	"dbbackup/internal/metrics"
@@ -960,117 +961,26 @@ func (e *Engine) backupGlobals(ctx context.Context, tempDir string) error {
 	return os.WriteFile(globalsFile, output, 0644)
 }
 
-// createArchive creates a compressed tar archive
+// createArchive creates a compressed tar archive using parallel gzip compression
+// Uses in-process pgzip for 2-4x faster compression on multi-core systems
 func (e *Engine) createArchive(ctx context.Context, sourceDir, outputFile string) error {
-	// Use pigz for faster parallel compression if available, otherwise use standard gzip
-	compressCmd := "tar"
-	compressArgs := []string{"-czf", outputFile, "-C", sourceDir, "."}
-
 	e.log.Debug("Creating archive with parallel compression",
 		"source", sourceDir,
 		"output", outputFile,
 		"compression", e.cfg.CompressionLevel)
 
-	// Check if pigz is available for faster parallel compression
-	if _, err := exec.LookPath("pigz"); err == nil {
-		// Use pigz with number of cores for parallel compression
-		compressArgs = []string{"-cf", "-", "-C", sourceDir, "."}
-		cmd := exec.CommandContext(ctx, "tar", compressArgs...)
-
-		// Create output file
-		outFile, err := os.Create(outputFile)
-		if err != nil {
-			// Fallback to regular tar
-			goto regularTar
-		}
-		defer outFile.Close()
-
-		// Pipe to pigz for parallel compression
-		pigzCmd := exec.CommandContext(ctx, "pigz", "-p", strconv.Itoa(e.cfg.Jobs))
-
-		tarOut, err := cmd.StdoutPipe()
-		if err != nil {
-			outFile.Close()
-			// Fallback to regular tar
-			goto regularTar
-		}
-		pigzCmd.Stdin = tarOut
-		pigzCmd.Stdout = outFile
-
-		// Start both commands
-		if err := pigzCmd.Start(); err != nil {
-			outFile.Close()
-			goto regularTar
-		}
-		if err := cmd.Start(); err != nil {
-			pigzCmd.Process.Kill()
-			outFile.Close()
-			goto regularTar
-		}
-
-		// Wait for tar with proper context handling
-		tarDone := make(chan error, 1)
-		go func() {
-			tarDone <- cmd.Wait()
-		}()
-
-		var tarErr error
-		select {
-		case tarErr = <-tarDone:
-			// tar completed
-		case <-ctx.Done():
-			e.log.Warn("Archive creation cancelled - killing processes")
-			cmd.Process.Kill()
-			pigzCmd.Process.Kill()
-			<-tarDone
-			return ctx.Err()
-		}
-
-		if tarErr != nil {
-			pigzCmd.Process.Kill()
-			return fmt.Errorf("tar failed: %w", tarErr)
-		}
-
-		// Wait for pigz with proper context handling
-		pigzDone := make(chan error, 1)
-		go func() {
-			pigzDone <- pigzCmd.Wait()
-		}()
-
-		var pigzErr error
-		select {
-		case pigzErr = <-pigzDone:
-		case <-ctx.Done():
-			pigzCmd.Process.Kill()
-			<-pigzDone
-			return ctx.Err()
-		}
-
-		if pigzErr != nil {
-			return fmt.Errorf("pigz compression failed: %w", pigzErr)
-		}
-		return nil
-	}
-
-regularTar:
-	// Standard tar with gzip (fallback)
-	cmd := exec.CommandContext(ctx, compressCmd, compressArgs...)
-
-	// Stream stderr to avoid memory issues
-	// Use io.Copy to ensure goroutine completes when pipe closes
-	stderr, err := cmd.StderrPipe()
-	if err == nil {
-		go func() {
-			scanner := bufio.NewScanner(stderr)
-			for scanner.Scan() {
-				line := scanner.Text()
-				if line != "" {
-					e.log.Debug("Archive creation", "output", line)
-				}
-			}
-			// Scanner will exit when stderr pipe closes after cmd.Wait()
-		}()
-	}
-
-	if err := cmd.Run(); err != nil {
-		return fmt.Errorf("tar failed: %w", err)
-	}
-	// cmd.Run() calls Wait() which closes stderr pipe, terminating the goroutine
+	// Use in-process parallel compression with pgzip
+	err := fs.CreateTarGzParallel(ctx, sourceDir, outputFile, e.cfg.CompressionLevel, func(progress fs.CreateProgress) {
+		// Optional: log progress for large archives
+		if progress.FilesCount%100 == 0 && progress.FilesCount > 0 {
+			e.log.Debug("Archive progress", "files", progress.FilesCount, "bytes", progress.BytesWritten)
+		}
+	})
+	if err != nil {
+		return fmt.Errorf("parallel archive creation failed: %w", err)
+	}
 	return nil
 }
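For context, a minimal sketch of a call site for the new helper; the paths and log statements are illustrative, not from this diff. The single in-process call replaces the old tar-to-pigz subprocess pipeline and its goto-based fallback:

```go
package main

import (
	"context"
	"log"

	"dbbackup/internal/fs" // the package added later in this diff
	"github.com/klauspost/pgzip"
)

func main() {
	ctx := context.Background()
	// Equivalent in spirit to: tar -cf - -C /var/tmp/dump-staging . | pigz -p N > cluster.tar.gz
	err := fs.CreateTarGzParallel(ctx, "/var/tmp/dump-staging", "/backups/cluster.tar.gz",
		pgzip.DefaultCompression,
		func(p fs.CreateProgress) {
			if p.FilesCount%100 == 0 {
				log.Printf("archived %d files, %d bytes", p.FilesCount, p.BytesWritten)
			}
		})
	if err != nil {
		log.Fatalf("archive failed: %v", err)
	}
}
```

Beyond speed, the in-process approach removes the requirement that pigz be installed on the host, and makes cancellation a matter of returning ctx.Err() instead of killing child processes.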
internal/fs/extract.go (new file, 333 lines)
@@ -0,0 +1,333 @@
// Package fs provides parallel tar.gz extraction using pgzip
package fs

import (
	"archive/tar"
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"runtime"
	"strings"

	"github.com/klauspost/pgzip"
)

// ExtractProgress reports extraction progress
type ExtractProgress struct {
	CurrentFile  string
	BytesRead    int64
	TotalBytes   int64
	FilesCount   int
	CurrentIndex int
}

// ProgressCallback is called during extraction
type ProgressCallback func(progress ExtractProgress)

// ExtractTarGzParallel extracts a tar.gz archive using parallel gzip decompression.
// This is 2-4x faster than standard gzip on multi-core systems.
// Uses pgzip, which decompresses in parallel using multiple goroutines.
func ExtractTarGzParallel(ctx context.Context, archivePath, destDir string, progressCb ProgressCallback) error {
	// Open the archive
	file, err := os.Open(archivePath)
	if err != nil {
		return fmt.Errorf("cannot open archive: %w", err)
	}
	defer file.Close()

	// Get file size for progress
	stat, err := file.Stat()
	if err != nil {
		return fmt.Errorf("cannot stat archive: %w", err)
	}
	totalSize := stat.Size()

	// Create parallel gzip reader
	// Uses all available CPU cores for decompression
	gzReader, err := pgzip.NewReaderN(file, 1<<20, runtime.NumCPU()) // 1MB blocks
	if err != nil {
		return fmt.Errorf("cannot create gzip reader: %w", err)
	}
	defer gzReader.Close()

	// Create tar reader
	tarReader := tar.NewReader(gzReader)

	// Track progress
	var bytesRead int64
	var filesCount int

	// Extract each file
	for {
		// Check context
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		header, err := tarReader.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return fmt.Errorf("error reading tar: %w", err)
		}

		// Security: prevent path traversal
		targetPath := filepath.Join(destDir, header.Name)
		if !strings.HasPrefix(filepath.Clean(targetPath), filepath.Clean(destDir)) {
			return fmt.Errorf("path traversal detected: %s", header.Name)
		}

		filesCount++

		// Report progress
		if progressCb != nil {
			// Estimate bytes read from file position
			pos, _ := file.Seek(0, io.SeekCurrent)
			progressCb(ExtractProgress{
				CurrentFile:  header.Name,
				BytesRead:    pos,
				TotalBytes:   totalSize,
				FilesCount:   filesCount,
				CurrentIndex: filesCount,
			})
		}

		switch header.Typeflag {
		case tar.TypeDir:
			if err := os.MkdirAll(targetPath, 0700); err != nil {
				return fmt.Errorf("cannot create directory %s: %w", targetPath, err)
			}

		case tar.TypeReg:
			// Ensure parent directory exists
			if err := os.MkdirAll(filepath.Dir(targetPath), 0700); err != nil {
				return fmt.Errorf("cannot create parent directory: %w", err)
			}

			// Create file with secure permissions
			outFile, err := os.OpenFile(targetPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
			if err != nil {
				return fmt.Errorf("cannot create file %s: %w", targetPath, err)
			}

			// Copy the file contents; tar.Reader bounds each entry at the
			// size declared in its header, so a single entry cannot expand
			// without limit
			written, err := io.Copy(outFile, tarReader)
			outFile.Close()

			if err != nil {
				return fmt.Errorf("error writing %s: %w", targetPath, err)
			}

			bytesRead += written

		case tar.TypeSymlink:
			// Handle symlinks (validate target is within destDir)
			linkTarget := header.Linkname
			absTarget := filepath.Join(filepath.Dir(targetPath), linkTarget)
			if !strings.HasPrefix(filepath.Clean(absTarget), filepath.Clean(destDir)) {
				// Skip symlinks that point outside
				continue
			}
			if err := os.Symlink(linkTarget, targetPath); err != nil {
				// Ignore symlink errors (may not be supported)
				continue
			}

		default:
			// Skip other types (devices, etc.)
			continue
		}
	}

	return nil
}

// ExtractTarGzFast is a convenience wrapper that chooses the best extraction method.
// Uses parallel gzip if available, falls back to system tar if needed.
func ExtractTarGzFast(ctx context.Context, archivePath, destDir string, progressCb ProgressCallback) error {
	// Always use the parallel Go implementation - it's faster and more portable
	return ExtractTarGzParallel(ctx, archivePath, destDir, progressCb)
}

// CreateProgress reports archive creation progress
type CreateProgress struct {
	CurrentFile  string
	BytesWritten int64
	FilesCount   int
}

// CreateProgressCallback is called during archive creation
type CreateProgressCallback func(progress CreateProgress)

// CreateTarGzParallel creates a tar.gz archive using parallel gzip compression.
// This is 2-4x faster than standard gzip on multi-core systems.
// Uses pgzip, which compresses in parallel using multiple goroutines.
func CreateTarGzParallel(ctx context.Context, sourceDir, outputPath string, compressionLevel int, progressCb CreateProgressCallback) error {
	// Create output file
	outFile, err := os.Create(outputPath)
	if err != nil {
		return fmt.Errorf("cannot create archive: %w", err)
	}
	defer outFile.Close()

	// Create parallel gzip writer
	// Uses all available CPU cores for compression
	gzWriter, err := pgzip.NewWriterLevel(outFile, compressionLevel)
	if err != nil {
		return fmt.Errorf("cannot create gzip writer: %w", err)
	}
	// Set block size and concurrency for parallel compression;
	// a failure here is non-fatal and pgzip keeps its defaults
	_ = gzWriter.SetConcurrency(1<<20, runtime.NumCPU())
	defer gzWriter.Close()

	// Create tar writer
	tarWriter := tar.NewWriter(gzWriter)
	defer tarWriter.Close()

	var bytesWritten int64
	var filesCount int

	// Walk the source directory
	err = filepath.Walk(sourceDir, func(path string, info os.FileInfo, err error) error {
		// Check for cancellation
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if err != nil {
			return err
		}

		// Get relative path
		relPath, err := filepath.Rel(sourceDir, path)
		if err != nil {
			return err
		}

		// Skip the root directory itself
		if relPath == "." {
			return nil
		}

		// Create tar header
		header, err := tar.FileInfoHeader(info, "")
		if err != nil {
			return fmt.Errorf("cannot create header for %s: %w", relPath, err)
		}

		// Use relative path in archive
		header.Name = relPath

		// Handle symlinks
		if info.Mode()&os.ModeSymlink != 0 {
			link, err := os.Readlink(path)
			if err != nil {
				return fmt.Errorf("cannot read symlink %s: %w", path, err)
			}
			header.Linkname = link
		}

		// Write header
		if err := tarWriter.WriteHeader(header); err != nil {
			return fmt.Errorf("cannot write header for %s: %w", relPath, err)
		}

		// If it's a regular file, write its contents
		if info.Mode().IsRegular() {
			file, err := os.Open(path)
			if err != nil {
				return fmt.Errorf("cannot open %s: %w", path, err)
			}
			defer file.Close()

			written, err := io.Copy(tarWriter, file)
			if err != nil {
				return fmt.Errorf("cannot write %s: %w", path, err)
			}
			bytesWritten += written
		}

		filesCount++

		// Report progress
		if progressCb != nil {
			progressCb(CreateProgress{
				CurrentFile:  relPath,
				BytesWritten: bytesWritten,
				FilesCount:   filesCount,
			})
		}

		return nil
	})

	if err != nil {
		// Clean up partial file on error
		outFile.Close()
		os.Remove(outputPath)
		return err
	}

	// Explicitly close tar and gzip to flush all data
	if err := tarWriter.Close(); err != nil {
		return fmt.Errorf("cannot close tar writer: %w", err)
	}
	if err := gzWriter.Close(); err != nil {
		return fmt.Errorf("cannot close gzip writer: %w", err)
	}

	return nil
}

// EstimateCompressionRatio samples the archive to estimate uncompressed size.
// Returns a multiplier (e.g., 3.0 means uncompressed is ~3x the compressed size).
func EstimateCompressionRatio(archivePath string) (float64, error) {
	file, err := os.Open(archivePath)
	if err != nil {
		return 3.0, err // Default to 3x
	}
	defer file.Close()

	// Get compressed size
	stat, err := file.Stat()
	if err != nil {
		return 3.0, err
	}
	compressedSize := stat.Size()

	// Read first 1MB and measure decompression ratio
	gzReader, err := pgzip.NewReader(file)
	if err != nil {
		return 3.0, err
	}
	defer gzReader.Close()

	// Read up to 1MB of decompressed data
	buf := make([]byte, 1<<20)
	n, _ := io.ReadFull(gzReader, buf)

	if n < 1024 {
		return 3.0, nil // Not enough data, use default
	}

	// Estimate: decompressed bytes / compressed bytes consumed.
	// The file offset approximates how much compressed data was read to
	// produce the n decompressed bytes (pgzip reads ahead, so this can
	// understate the true ratio). The previous formula divided n by itself
	// and always fell through to the default.
	pos, posErr := file.Seek(0, io.SeekCurrent)
	if posErr == nil && pos > 0 && pos <= compressedSize {
		ratio := float64(n) / float64(pos)
		if ratio > 1.0 && ratio < 20.0 {
			return ratio, nil
		}
	}

	return 3.0, nil // Default
}
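The extractor's path check guards against the classic "zip slip" attack, where an archive entry named ../../... escapes the destination directory. A standalone sketch of the same check; note a stricter variant would also require a trailing path separator on the prefix so that /restore/work does not match a sibling like /restore/work-evil:

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// safe mirrors the check in ExtractTarGzParallel: join, clean, then require
// the destination prefix. A hostile entry like "../../etc/cron.d/evil"
// cleans to a path outside destDir and is rejected.
func safe(destDir, name string) bool {
	target := filepath.Join(destDir, name)
	return strings.HasPrefix(filepath.Clean(target), filepath.Clean(destDir))
}

func main() {
	fmt.Println(safe("/restore/work", "db/postgres.dump"))      // true
	fmt.Println(safe("/restore/work", "../../etc/cron.d/evil")) // false
}
```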
@@ -130,6 +130,7 @@ func (m *TmpfsManager) GetBestTmpfs(minFreeGB int) *TmpfsInfo {
 
 // GetTempDir returns a temp directory on tmpfs if available
 // Falls back to os.TempDir() if no suitable tmpfs found
+// Uses secure permissions (0700) to prevent other users from reading sensitive data
 func (m *TmpfsManager) GetTempDir(subdir string, minFreeGB int) (string, bool) {
 	best := m.GetBestTmpfs(minFreeGB)
 	if best == nil {
@@ -137,13 +138,16 @@ func (m *TmpfsManager) GetTempDir(subdir string, minFreeGB int) (string, bool) {
 		return filepath.Join(os.TempDir(), subdir), false
 	}
 
-	// Create subdir on tmpfs
+	// Create subdir on tmpfs with secure permissions (0700 = owner-only)
 	dir := filepath.Join(best.MountPoint, subdir)
-	if err := os.MkdirAll(dir, 0755); err != nil {
+	if err := os.MkdirAll(dir, 0700); err != nil {
 		// Fallback if we can't create
 		return filepath.Join(os.TempDir(), subdir), false
 	}
 
+	// Ensure permissions are correct even if dir already existed
+	os.Chmod(dir, 0700)
+
 	return dir, true
 }
 
@@ -279,3 +283,38 @@ func GetMemoryStatus() (*MemoryStatus, error) {
 
 	return status, nil
 }
+
+// SecureMkdirTemp creates a temporary directory with secure permissions (0700)
+// This prevents other users from reading sensitive database dump contents
+// Uses the specified baseDir, or os.TempDir() if empty
+func SecureMkdirTemp(baseDir, pattern string) (string, error) {
+	if baseDir == "" {
+		baseDir = os.TempDir()
+	}
+
+	// Use os.MkdirTemp for unique naming
+	dir, err := os.MkdirTemp(baseDir, pattern)
+	if err != nil {
+		return "", err
+	}
+
+	// Ensure secure permissions (0700 = owner read/write/execute only)
+	if err := os.Chmod(dir, 0700); err != nil {
+		// Try to clean up if we can't secure it
+		os.Remove(dir)
+		return "", fmt.Errorf("cannot set secure permissions: %w", err)
+	}
+
+	return dir, nil
+}
+
+// SecureWriteFile writes content to a file with secure permissions (0600)
+// This prevents other users from reading sensitive data
+func SecureWriteFile(filename string, data []byte) error {
+	// Write with restrictive permissions
+	if err := os.WriteFile(filename, data, 0600); err != nil {
+		return err
+	}
+	// Ensure permissions are correct
+	return os.Chmod(filename, 0600)
+}
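One subtlety behind SecureWriteFile's trailing Chmod: the mode passed to os.WriteFile applies only when the file is created, so overwriting an existing file keeps its old, possibly looser, permissions. A small demonstration (hypothetical path):

```go
package main

import (
	"fmt"
	"log"
	"os"
)

func main() {
	name := "/tmp/perm-demo" // hypothetical path

	// Pre-create the file with loose permissions.
	if err := os.WriteFile(name, []byte("v1"), 0644); err != nil {
		log.Fatal(err)
	}

	// Overwriting does NOT tighten the mode: 0600 is used only at creation.
	if err := os.WriteFile(name, []byte("secret"), 0600); err != nil {
		log.Fatal(err)
	}
	st, _ := os.Stat(name)
	fmt.Printf("after WriteFile: %v\n", st.Mode().Perm()) // still -rw-r--r--

	// The explicit Chmod in SecureWriteFile closes this gap.
	if err := os.Chmod(name, 0600); err != nil {
		log.Fatal(err)
	}
	st, _ = os.Stat(name)
	fmt.Printf("after Chmod: %v\n", st.Mode().Perm()) // -rw-------
}
```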
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"dbbackup/internal/config"
+	"dbbackup/internal/fs"
 	"dbbackup/internal/logger"
 )
 
@@ -226,15 +227,18 @@ func (ro *RestoreOrchestrator) extractBaseBackup(ctx context.Context, opts *Rest
 	return fmt.Errorf("unsupported backup format: %s (expected .tar.gz, .tar, or directory)", backupPath)
 }
 
-// extractTarGzBackup extracts a .tar.gz backup
+// extractTarGzBackup extracts a .tar.gz backup using parallel gzip
 func (ro *RestoreOrchestrator) extractTarGzBackup(ctx context.Context, source, dest string) error {
-	ro.log.Info("Extracting tar.gz backup...")
+	ro.log.Info("Extracting tar.gz backup with parallel gzip...")
 
-	cmd := exec.CommandContext(ctx, "tar", "-xzf", source, "-C", dest)
-	cmd.Stdout = os.Stdout
-	cmd.Stderr = os.Stderr
-
-	if err := cmd.Run(); err != nil {
+	// Use parallel extraction (2-4x faster on multi-core)
+	err := fs.ExtractTarGzParallel(ctx, source, dest, func(progress fs.ExtractProgress) {
+		if progress.TotalBytes > 0 && progress.FilesCount%100 == 0 {
+			pct := float64(progress.BytesRead) / float64(progress.TotalBytes) * 100
+			ro.log.Debug("Extraction progress", "percent", fmt.Sprintf("%.1f%%", pct))
+		}
+	})
+	if err != nil {
 		return fmt.Errorf("tar extraction failed: %w", err)
 	}
 
@@ -28,6 +28,28 @@ var PhaseWeights = map[Phase]int{
 	PhaseVerifying: 5,
 }
 
+// ProgressSnapshot is a mutex-free copy of progress state for safe reading
+type ProgressSnapshot struct {
+	Operation      string
+	ArchiveFile    string
+	Phase          Phase
+	ExtractBytes   int64
+	ExtractTotal   int64
+	DatabasesDone  int
+	DatabasesTotal int
+	CurrentDB      string
+	CurrentDBBytes int64
+	CurrentDBTotal int64
+	DatabaseSizes  map[string]int64
+	VerifyDone     int
+	VerifyTotal    int
+	StartTime      time.Time
+	PhaseStartTime time.Time
+	LastUpdateTime time.Time
+	DatabaseTimes  []time.Duration
+	Errors         []string
+}
+
 // UnifiedClusterProgress combines all progress states into one cohesive structure
 // This replaces multiple separate callbacks with a single comprehensive view
 type UnifiedClusterProgress struct {
@@ -282,22 +304,41 @@ func (p *UnifiedClusterProgress) GetETA() time.Duration {
 }
 
 // GetSnapshot returns a copy of current state (thread-safe)
+// Returns a ProgressSnapshot without the mutex to avoid copy-lock issues
-func (p *UnifiedClusterProgress) GetSnapshot() UnifiedClusterProgress {
+func (p *UnifiedClusterProgress) GetSnapshot() ProgressSnapshot {
 	p.mu.RLock()
 	defer p.mu.RUnlock()
 
-	snapshot := *p
 	// Deep copy slices/maps
-	snapshot.DatabaseTimes = make([]time.Duration, len(p.DatabaseTimes))
-	copy(snapshot.DatabaseTimes, p.DatabaseTimes)
-	snapshot.DatabaseSizes = make(map[string]int64)
+	dbTimes := make([]time.Duration, len(p.DatabaseTimes))
+	copy(dbTimes, p.DatabaseTimes)
+	dbSizes := make(map[string]int64)
 	for k, v := range p.DatabaseSizes {
-		snapshot.DatabaseSizes[k] = v
+		dbSizes[k] = v
 	}
-	snapshot.Errors = make([]string, len(p.Errors))
-	copy(snapshot.Errors, p.Errors)
+	errors := make([]string, len(p.Errors))
+	copy(errors, p.Errors)
 
-	return snapshot
+	return ProgressSnapshot{
+		Operation:      p.Operation,
+		ArchiveFile:    p.ArchiveFile,
+		Phase:          p.Phase,
+		ExtractBytes:   p.ExtractBytes,
+		ExtractTotal:   p.ExtractTotal,
+		DatabasesDone:  p.DatabasesDone,
+		DatabasesTotal: p.DatabasesTotal,
+		CurrentDB:      p.CurrentDB,
+		CurrentDBBytes: p.CurrentDBBytes,
+		CurrentDBTotal: p.CurrentDBTotal,
+		DatabaseSizes:  dbSizes,
+		VerifyDone:     p.VerifyDone,
+		VerifyTotal:    p.VerifyTotal,
+		StartTime:      p.StartTime,
+		PhaseStartTime: p.PhaseStartTime,
+		LastUpdateTime: p.LastUpdateTime,
+		DatabaseTimes:  dbTimes,
+		Errors:         errors,
+	}
 }
 
 // FormatStatus returns a formatted status string
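The motivation for ProgressSnapshot: returning UnifiedClusterProgress by value copies its embedded sync.RWMutex, which go vet's copylocks check flags, and the copy's lock state is meaningless. A minimal illustration of the pattern (types are hypothetical):

```go
package main

import "sync"

type counter struct {
	mu sync.RWMutex
	n  int
}

// Bad: returns a copy of counter, mutex included.
// go vet reports roughly: "return copies lock value: counter contains sync.RWMutex".
func (c *counter) snapshotBad() counter {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return *c
}

// Good: copy the data into a plain struct with no lock,
// as this diff does with ProgressSnapshot.
type counterSnapshot struct{ N int }

func (c *counter) snapshot() counterSnapshot {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return counterSnapshot{N: c.n}
}

func main() {
	c := &counter{n: 42}
	_ = c.snapshotBad()
	_ = c.snapshot()
}
```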
internal/restore/checkpoint.go (new file, 245 lines)
@@ -0,0 +1,245 @@
// Package restore provides checkpoint/resume capability for cluster restores
package restore

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"
)

// RestoreCheckpoint tracks progress of a cluster restore for resume capability
type RestoreCheckpoint struct {
	mu sync.RWMutex

	// Archive identification
	ArchivePath string    `json:"archive_path"`
	ArchiveSize int64     `json:"archive_size"`
	ArchiveMod  time.Time `json:"archive_modified"`

	// Progress tracking
	StartTime     time.Time         `json:"start_time"`
	LastUpdate    time.Time         `json:"last_update"`
	TotalDBs      int               `json:"total_dbs"`
	CompletedDBs  []string          `json:"completed_dbs"`
	FailedDBs     map[string]string `json:"failed_dbs"` // db -> error message
	SkippedDBs    []string          `json:"skipped_dbs"`
	GlobalsDone   bool              `json:"globals_done"`
	ExtractedPath string            `json:"extracted_path"` // Reuse extraction

	// Config at start (for validation)
	Profile      string `json:"profile"`
	CleanCluster bool   `json:"clean_cluster"`
	ParallelDBs  int    `json:"parallel_dbs"`
	Jobs         int    `json:"jobs"`
}

// CheckpointFile returns the checkpoint file path for an archive
func CheckpointFile(archivePath, workDir string) string {
	archiveName := filepath.Base(archivePath)
	if workDir != "" {
		return filepath.Join(workDir, ".dbbackup-checkpoint-"+archiveName+".json")
	}
	return filepath.Join(os.TempDir(), ".dbbackup-checkpoint-"+archiveName+".json")
}

// NewRestoreCheckpoint creates a new checkpoint for a cluster restore
func NewRestoreCheckpoint(archivePath string, totalDBs int) *RestoreCheckpoint {
	stat, _ := os.Stat(archivePath)
	var size int64
	var mod time.Time
	if stat != nil {
		size = stat.Size()
		mod = stat.ModTime()
	}

	return &RestoreCheckpoint{
		ArchivePath:  archivePath,
		ArchiveSize:  size,
		ArchiveMod:   mod,
		StartTime:    time.Now(),
		LastUpdate:   time.Now(),
		TotalDBs:     totalDBs,
		CompletedDBs: make([]string, 0),
		FailedDBs:    make(map[string]string),
		SkippedDBs:   make([]string, 0),
	}
}

// LoadCheckpoint loads an existing checkpoint file
func LoadCheckpoint(checkpointPath string) (*RestoreCheckpoint, error) {
	data, err := os.ReadFile(checkpointPath)
	if err != nil {
		return nil, err
	}

	var cp RestoreCheckpoint
	if err := json.Unmarshal(data, &cp); err != nil {
		return nil, fmt.Errorf("invalid checkpoint file: %w", err)
	}

	return &cp, nil
}

// Save persists the checkpoint to disk
func (cp *RestoreCheckpoint) Save(checkpointPath string) error {
	// Take the write lock: Save mutates LastUpdate, so a read lock here
	// would race with concurrent Mark* calls
	cp.mu.Lock()
	defer cp.mu.Unlock()

	cp.LastUpdate = time.Now()

	data, err := json.MarshalIndent(cp, "", "  ")
	if err != nil {
		return err
	}

	// Write to temp file first, then rename (atomic)
	tmpPath := checkpointPath + ".tmp"
	if err := os.WriteFile(tmpPath, data, 0600); err != nil {
		return err
	}

	return os.Rename(tmpPath, checkpointPath)
}

// MarkGlobalsDone marks globals as restored
func (cp *RestoreCheckpoint) MarkGlobalsDone() {
	cp.mu.Lock()
	defer cp.mu.Unlock()
	cp.GlobalsDone = true
}

// MarkCompleted marks a database as successfully restored
func (cp *RestoreCheckpoint) MarkCompleted(dbName string) {
	cp.mu.Lock()
	defer cp.mu.Unlock()

	// Don't add duplicates
	for _, db := range cp.CompletedDBs {
		if db == dbName {
			return
		}
	}
	cp.CompletedDBs = append(cp.CompletedDBs, dbName)
	cp.LastUpdate = time.Now()
}

// MarkFailed marks a database as failed with error message
func (cp *RestoreCheckpoint) MarkFailed(dbName, errMsg string) {
	cp.mu.Lock()
	defer cp.mu.Unlock()
	cp.FailedDBs[dbName] = errMsg
	cp.LastUpdate = time.Now()
}

// MarkSkipped marks a database as skipped (e.g., context cancelled)
func (cp *RestoreCheckpoint) MarkSkipped(dbName string) {
	cp.mu.Lock()
	defer cp.mu.Unlock()
	cp.SkippedDBs = append(cp.SkippedDBs, dbName)
}

// IsCompleted checks if a database was already restored
func (cp *RestoreCheckpoint) IsCompleted(dbName string) bool {
	cp.mu.RLock()
	defer cp.mu.RUnlock()

	for _, db := range cp.CompletedDBs {
		if db == dbName {
			return true
		}
	}
	return false
}

// IsFailed checks if a database previously failed
func (cp *RestoreCheckpoint) IsFailed(dbName string) bool {
	cp.mu.RLock()
	defer cp.mu.RUnlock()
	_, failed := cp.FailedDBs[dbName]
	return failed
}

// ValidateForResume checks if checkpoint is valid for resuming with given archive
func (cp *RestoreCheckpoint) ValidateForResume(archivePath string) error {
	stat, err := os.Stat(archivePath)
	if err != nil {
		return fmt.Errorf("cannot stat archive: %w", err)
	}

	// Check archive matches
	if stat.Size() != cp.ArchiveSize {
		return fmt.Errorf("archive size changed: checkpoint=%d, current=%d", cp.ArchiveSize, stat.Size())
	}

	if !stat.ModTime().Equal(cp.ArchiveMod) {
		return fmt.Errorf("archive modified since checkpoint: checkpoint=%s, current=%s",
			cp.ArchiveMod.Format(time.RFC3339), stat.ModTime().Format(time.RFC3339))
	}

	return nil
}

// Progress returns a human-readable progress string
func (cp *RestoreCheckpoint) Progress() string {
	cp.mu.RLock()
	defer cp.mu.RUnlock()

	completed := len(cp.CompletedDBs)
	failed := len(cp.FailedDBs)
	remaining := cp.TotalDBs - completed - failed

	return fmt.Sprintf("%d/%d completed, %d failed, %d remaining",
		completed, cp.TotalDBs, failed, remaining)
}

// RemainingDBs returns list of databases not yet completed or failed
func (cp *RestoreCheckpoint) RemainingDBs(allDBs []string) []string {
	cp.mu.RLock()
	defer cp.mu.RUnlock()

	remaining := make([]string, 0)
	for _, db := range allDBs {
		found := false
		for _, completed := range cp.CompletedDBs {
			if db == completed {
				found = true
				break
			}
		}
		if !found {
			if _, failed := cp.FailedDBs[db]; !failed {
				remaining = append(remaining, db)
			}
		}
	}
	return remaining
}

// Delete removes the checkpoint file
func (cp *RestoreCheckpoint) Delete(checkpointPath string) error {
	return os.Remove(checkpointPath)
}

// Summary returns a summary of the checkpoint state
func (cp *RestoreCheckpoint) Summary() string {
	cp.mu.RLock()
	defer cp.mu.RUnlock()

	elapsed := time.Since(cp.StartTime)
	return fmt.Sprintf(
		"Restore checkpoint: %s\n"+
			"  Started: %s (%s ago)\n"+
			"  Globals: %v\n"+
			"  Databases: %d/%d completed, %d failed\n"+
			"  Last update: %s",
		filepath.Base(cp.ArchivePath),
		cp.StartTime.Format("2006-01-02 15:04:05"),
		elapsed.Round(time.Second),
		cp.GlobalsDone,
		len(cp.CompletedDBs), cp.TotalDBs, len(cp.FailedDBs),
		cp.LastUpdate.Format("2006-01-02 15:04:05"),
	)
}
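A sketch of how a resume loop might drive this API; the database names, path, and the restoreOneDB stand-in are illustrative, not from this diff:

```go
package main

import (
	"fmt"
	"log"

	"dbbackup/internal/restore" // package added in this diff
)

func main() {
	archive := "/backups/cluster.tar.gz" // illustrative
	dbs := []string{"app", "analytics", "audit"}

	cpPath := restore.CheckpointFile(archive, "")
	cp, err := restore.LoadCheckpoint(cpPath)
	if err != nil {
		// No usable checkpoint on disk: start fresh.
		cp = restore.NewRestoreCheckpoint(archive, len(dbs))
	} else if err := cp.ValidateForResume(archive); err != nil {
		log.Printf("stale checkpoint (%v), starting fresh", err)
		cp = restore.NewRestoreCheckpoint(archive, len(dbs))
	}

	// Only databases that are neither completed nor failed are retried.
	for _, db := range cp.RemainingDBs(dbs) {
		if restoreOneDB(db) {
			cp.MarkCompleted(db)
		} else {
			cp.MarkFailed(db, "restore error")
		}
		// Persist after every database so a crash loses at most one unit of work.
		if err := cp.Save(cpPath); err != nil {
			log.Printf("checkpoint save failed: %v", err)
		}
	}
	fmt.Println(cp.Progress())
}

// restoreOneDB is a placeholder for the real per-database restore call.
func restoreOneDB(name string) bool { return true }
```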
@@ -15,6 +15,7 @@ import (
 	"strings"
 	"time"
 
+	"dbbackup/internal/fs"
 	"dbbackup/internal/logger"
 )
 
@@ -782,8 +783,8 @@ func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*Diagno
 	if stat, err := os.Stat(tempDir); err == nil && stat.IsDir() {
 		// Try extraction of a small test file first with timeout
 		testCtx, testCancel := context.WithTimeout(context.Background(), 30*time.Second)
-		testCmd := exec.CommandContext(testCtx, "tar", "-xzf", archivePath, "-C", tempDir, "--wildcards", "*.json", "--wildcards", "globals.sql")
-		testCmd.Run() // Ignore error - just try to extract metadata
+		testCmd := exec.CommandContext(testCtx, "tar", "-tzf", archivePath)
+		testCmd.Run() // Ignore error - just test if archive is readable
		testCancel()
 	}
 
@@ -791,15 +792,12 @@ func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*Diagno
 		d.log.Info("Archive listing successful", "files", len(files))
 	}
 
-	// Try full extraction - NO TIMEOUT here as large archives can take a long time
-	// Use a generous timeout (30 minutes) for very large archives
+	// Try full extraction using parallel gzip (2-4x faster on multi-core)
 	extractCtx, extractCancel := context.WithTimeout(context.Background(), 30*time.Minute)
 	defer extractCancel()
 
-	cmd := exec.CommandContext(extractCtx, "tar", "-xzf", archivePath, "-C", tempDir)
-	var stderr bytes.Buffer
-	cmd.Stderr = &stderr
-	if err := cmd.Run(); err != nil {
+	err = fs.ExtractTarGzParallel(extractCtx, archivePath, tempDir, nil)
+	if err != nil {
 		// Extraction failed
 		errResult := &DiagnoseResult{
 			FilePath: archivePath,
@@ -810,7 +808,7 @@ func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*Diagno
 			Details: &DiagnoseDetails{},
 		}
 
-		errOutput := stderr.String()
+		errOutput := err.Error()
 		if strings.Contains(errOutput, "No space left") ||
 			strings.Contains(errOutput, "cannot write") ||
 			strings.Contains(errOutput, "Disk quota exceeded") {
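Matching on err.Error() substrings works here, but it is brittle against message changes and wrapping. On Unix-like systems a sturdier alternative (not what this diff does) is errors.Is against the underlying errno, which sees through fmt.Errorf %w wrapping and *fs.PathError:

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"syscall"
)

// isDiskFull reports whether err, however deeply wrapped, is a disk-space
// error. os file errors wrap *fs.PathError wrapping syscall.Errno, so
// errors.Is unwraps through the layers. EDQUOT is Unix-specific.
func isDiskFull(err error) bool {
	return errors.Is(err, syscall.ENOSPC) || errors.Is(err, syscall.EDQUOT)
}

func main() {
	err := fmt.Errorf("error writing %s: %w", "/restore/work/db.dump",
		&os.PathError{Op: "write", Path: "/restore/work/db.dump", Err: syscall.ENOSPC})
	fmt.Println(isDiskFull(err)) // true
}
```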
@@ -19,6 +19,7 @@ import (
 	"dbbackup/internal/checks"
 	"dbbackup/internal/config"
 	"dbbackup/internal/database"
+	"dbbackup/internal/fs"
 	"dbbackup/internal/logger"
 	"dbbackup/internal/progress"
 	"dbbackup/internal/security"
@@ -1421,8 +1422,23 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string, preExtr
 			continue
 		}
 
+		// Check context before acquiring semaphore to prevent goroutine leak
+		if ctx.Err() != nil {
+			e.log.Warn("Context cancelled - stopping database restore scheduling")
+			break
+		}
+
 		wg.Add(1)
-		semaphore <- struct{}{} // Acquire
+
+		// Acquire semaphore with context awareness to prevent goroutine leak
+		select {
+		case semaphore <- struct{}{}:
+			// Acquired, proceed
+		case <-ctx.Done():
+			wg.Done()
+			e.log.Warn("Context cancelled while waiting for semaphore", "file", entry.Name())
+			continue
+		}
 
 		go func(idx int, filename string) {
 			defer wg.Done()
@@ -1829,74 +1845,31 @@ func (pr *progressReader) Read(p []byte) (n int, err error) {
 	return n, err
 }
 
-// extractArchiveShell extracts using shell tar command (faster but no progress)
+// extractArchiveShell extracts using parallel gzip (2-4x faster on multi-core)
 func (e *Engine) extractArchiveShell(ctx context.Context, archivePath, destDir string) error {
-	// Start heartbeat ticker for extraction progress
 	extractionStart := time.Now()
-	heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx)
-	heartbeatTicker := time.NewTicker(5 * time.Second)
-	defer heartbeatTicker.Stop()
-	defer cancelHeartbeat()
-
-	go func() {
-		for {
-			select {
-			case <-heartbeatTicker.C:
-				elapsed := time.Since(extractionStart)
-				e.progress.Update(fmt.Sprintf("Extracting archive... (elapsed: %s)", formatDuration(elapsed)))
-			case <-heartbeatCtx.Done():
-				return
-			}
-		}
-	}()
-
-	cmd := exec.CommandContext(ctx, "tar", "-xzf", archivePath, "-C", destDir)
-
-	// Stream stderr to avoid memory issues - tar can produce lots of output for large archives
-	stderr, err := cmd.StderrPipe()
-	if err != nil {
-		return fmt.Errorf("failed to create stderr pipe: %w", err)
-	}
-
-	if err := cmd.Start(); err != nil {
-		return fmt.Errorf("failed to start tar: %w", err)
-	}
-
-	// Discard stderr output in chunks to prevent memory buildup
-	stderrDone := make(chan struct{})
-	go func() {
-		defer close(stderrDone)
-		buf := make([]byte, 4096)
-		for {
-			_, err := stderr.Read(buf)
-			if err != nil {
-				break
-			}
-		}
-	}()
-
-	// Wait for command with proper context handling
-	cmdDone := make(chan error, 1)
-	go func() {
-		cmdDone <- cmd.Wait()
-	}()
-
-	var cmdErr error
-	select {
-	case cmdErr = <-cmdDone:
-		// Command completed
-	case <-ctx.Done():
-		e.log.Warn("Archive extraction cancelled - killing process")
-		cmd.Process.Kill()
-		<-cmdDone
-		cmdErr = ctx.Err()
-	}
-
-	<-stderrDone
-
-	if cmdErr != nil {
-		return fmt.Errorf("tar extraction failed: %w", cmdErr)
-	}
+	e.log.Info("Extracting archive with parallel gzip",
+		"archive", archivePath,
+		"dest", destDir,
+		"method", "pgzip")
+
+	// Use parallel extraction
+	err := fs.ExtractTarGzParallel(ctx, archivePath, destDir, func(progress fs.ExtractProgress) {
+		if progress.TotalBytes > 0 {
+			elapsed := time.Since(extractionStart)
+			pct := float64(progress.BytesRead) / float64(progress.TotalBytes) * 100
+			e.progress.Update(fmt.Sprintf("Extracting archive... %.1f%% (elapsed: %s)", pct, formatDuration(elapsed)))
+		}
+	})
+	if err != nil {
+		return fmt.Errorf("parallel extraction failed: %w", err)
+	}
+
+	elapsed := time.Since(extractionStart)
+	e.log.Info("Archive extraction complete", "duration", formatDuration(elapsed))
 	return nil
 }
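The semaphore fix is worth isolating: a bare send on a full channel blocks forever once the context is cancelled, leaking the scheduling loop. A self-contained sketch of the context-aware acquire pattern the engine now uses:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// acquire blocks until a worker slot is free or ctx is cancelled.
// A bare `sem <- struct{}{}` could block indefinitely after cancellation.
func acquire(ctx context.Context, sem chan struct{}) bool {
	select {
	case sem <- struct{}{}:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	sem := make(chan struct{}, 2) // at most 2 concurrent workers
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)
		if !acquire(ctx, sem) {
			wg.Done() // undo the Add, mirroring the engine's cancellation path
			fmt.Println("cancelled while waiting; skipping job", i)
			continue
		}
		go func(n int) {
			defer wg.Done()
			defer func() { <-sem }()          // release the slot
			time.Sleep(20 * time.Millisecond) // stand-in for a database restore
		}(i)
	}
	wg.Wait()
}
```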
@@ -499,14 +499,28 @@ func getMemInfo() (*memInfo, error) {
 }
 
 // TunePostgresForRestore returns SQL commands to tune PostgreSQL for low-memory restore
-func (g *LargeDBGuard) TunePostgresForRestore() []string {
+// lockBoost should be calculated based on BLOB count (use preflight.Archive.RecommendedLockBoost)
+func (g *LargeDBGuard) TunePostgresForRestore(lockBoost int) []string {
+	// Use incremental lock values, never go straight to max
+	// Minimum 2048, scale based on actual need
+	if lockBoost < 2048 {
+		lockBoost = 2048
+	}
+	// Cap at 65536 - higher values use too much shared memory
+	if lockBoost > 65536 {
+		lockBoost = 65536
+	}
+
 	return []string{
 		"ALTER SYSTEM SET work_mem = '64MB';",
 		"ALTER SYSTEM SET maintenance_work_mem = '256MB';",
 		"ALTER SYSTEM SET max_parallel_workers = 0;",
 		"ALTER SYSTEM SET max_parallel_workers_per_gather = 0;",
 		"ALTER SYSTEM SET max_parallel_maintenance_workers = 0;",
-		"ALTER SYSTEM SET max_locks_per_transaction = 65536;",
+		fmt.Sprintf("ALTER SYSTEM SET max_locks_per_transaction = %d;", lockBoost),
+		"-- Checkpoint tuning for large restores:",
+		"ALTER SYSTEM SET checkpoint_timeout = '30min';",
+		"ALTER SYSTEM SET checkpoint_completion_target = 0.9;",
 		"SELECT pg_reload_conf();",
 	}
 }
@@ -519,6 +533,8 @@ func (g *LargeDBGuard) RevertPostgresSettings() []string {
 		"ALTER SYSTEM RESET max_parallel_workers;",
 		"ALTER SYSTEM RESET max_parallel_workers_per_gather;",
 		"ALTER SYSTEM RESET max_parallel_maintenance_workers;",
+		"ALTER SYSTEM RESET checkpoint_timeout;",
+		"ALTER SYSTEM RESET checkpoint_completion_target;",
 		"SELECT pg_reload_conf();",
 	}
 }
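A hypothetical caller for the new signature; the package name, connection handling, and the lockBoost value are illustrative. One caveat worth noting: in PostgreSQL, max_locks_per_transaction can only take effect at server start, so ALTER SYSTEM records the value but pg_reload_conf() will not apply it until the next restart:

```go
package guard // hypothetical package name; the real guard lives in this repo

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
)

// applyRestoreTuning sketches driving TunePostgresForRestore: g is a
// *LargeDBGuard from this file, db an open superuser connection. 8192 would
// be a typical lockBoost from preflight BLOB analysis; the method clamps the
// value into the [2048, 65536] range either way.
func applyRestoreTuning(ctx context.Context, db *sql.DB, g *LargeDBGuard, lockBoost int) error {
	for _, stmt := range g.TunePostgresForRestore(lockBoost) {
		if strings.HasPrefix(stmt, "--") {
			continue // informational comment line, not executable SQL
		}
		if _, err := db.ExecContext(ctx, stmt); err != nil {
			return fmt.Errorf("tuning statement %q failed: %w", stmt, err)
		}
	}
	return nil
}
```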
@@ -10,6 +10,7 @@ import (
 	"strings"
 
 	"dbbackup/internal/config"
+	"dbbackup/internal/fs"
 	"dbbackup/internal/logger"
 )
 
@@ -272,21 +273,32 @@ func (s *Safety) ValidateAndExtractCluster(ctx context.Context, archivePath stri
 		workDir = s.cfg.BackupDir
 	}
 
-	tempDir, err := os.MkdirTemp(workDir, "dbbackup-cluster-extract-*")
+	// Use secure temp directory (0700 permissions) to prevent other users
+	// from reading sensitive database dump contents
+	tempDir, err := fs.SecureMkdirTemp(workDir, "dbbackup-cluster-extract-*")
 	if err != nil {
 		return "", fmt.Errorf("failed to create temp extraction directory in %s: %w", workDir, err)
 	}
 
-	// Extract using tar command (fastest method)
+	// Extract using parallel gzip (2-4x faster on multi-core systems)
 	s.log.Info("Pre-extracting cluster archive for validation and restore",
 		"archive", archivePath,
-		"dest", tempDir)
+		"dest", tempDir,
+		"method", "parallel-gzip")
 
-	cmd := exec.CommandContext(ctx, "tar", "-xzf", archivePath, "-C", tempDir)
-	output, err := cmd.CombinedOutput()
+	// Use Go's parallel extraction instead of shelling out to tar
+	// This uses pgzip for multi-core decompression
+	err = fs.ExtractTarGzParallel(ctx, archivePath, tempDir, func(progress fs.ExtractProgress) {
+		if progress.TotalBytes > 0 {
+			pct := float64(progress.BytesRead) / float64(progress.TotalBytes) * 100
+			s.log.Debug("Extraction progress",
+				"file", progress.CurrentFile,
+				"percent", fmt.Sprintf("%.1f%%", pct))
+		}
+	})
 	if err != nil {
 		os.RemoveAll(tempDir) // Cleanup on failure
-		return "", fmt.Errorf("extraction failed: %w: %s", err, string(output))
+		return "", fmt.Errorf("extraction failed: %w", err)
 	}
 
 	s.log.Info("Cluster archive extracted successfully", "location", tempDir)
@@ -47,8 +47,8 @@ type WorkDirMode int
 
 const (
 	WorkDirSystemTemp WorkDirMode = iota // Use system temp (/tmp)
-	WorkDirConfig                 // Use config.WorkDir
-	WorkDirBackup                 // Use config.BackupDir
+	WorkDirConfig                        // Use config.WorkDir
+	WorkDirBackup                        // Use config.BackupDir
 )
 
 type RestorePreviewModel struct {
@@ -69,9 +69,9 @@ type RestorePreviewModel struct {
 	checking     bool
 	canProceed   bool
 	message      string
-	saveDebugLog bool   // Save detailed error report on failure
-	debugLocks   bool   // Enable detailed lock debugging
-	workDir      string // Resolved work directory path
+	saveDebugLog bool        // Save detailed error report on failure
+	debugLocks   bool        // Enable detailed lock debugging
+	workDir      string      // Resolved work directory path
+	workDirMode  WorkDirMode // Which source is selected
 }