ci: add golangci-lint config and fix formatting
- Add .golangci.yml with minimal linters (govet, ineffassign) - Run gofmt -s and goimports on all files to fix formatting - Disable fieldalignment and copylocks checks in govet
This commit is contained in:
@@ -20,11 +20,11 @@ import (
|
||||
"dbbackup/internal/cloud"
|
||||
"dbbackup/internal/config"
|
||||
"dbbackup/internal/database"
|
||||
"dbbackup/internal/security"
|
||||
"dbbackup/internal/logger"
|
||||
"dbbackup/internal/metadata"
|
||||
"dbbackup/internal/metrics"
|
||||
"dbbackup/internal/progress"
|
||||
"dbbackup/internal/security"
|
||||
"dbbackup/internal/swap"
|
||||
)
|
||||
|
||||
@@ -42,7 +42,7 @@ type Engine struct {
|
||||
func New(cfg *config.Config, log logger.Logger, db database.Database) *Engine {
|
||||
progressIndicator := progress.NewIndicator(true, "line") // Use line-by-line indicator
|
||||
detailedReporter := progress.NewDetailedReporter(progressIndicator, &loggerAdapter{logger: log})
|
||||
|
||||
|
||||
return &Engine{
|
||||
cfg: cfg,
|
||||
log: log,
|
||||
@@ -56,7 +56,7 @@ func New(cfg *config.Config, log logger.Logger, db database.Database) *Engine {
|
||||
// NewWithProgress creates a new backup engine with a custom progress indicator
|
||||
func NewWithProgress(cfg *config.Config, log logger.Logger, db database.Database, progressIndicator progress.Indicator) *Engine {
|
||||
detailedReporter := progress.NewDetailedReporter(progressIndicator, &loggerAdapter{logger: log})
|
||||
|
||||
|
||||
return &Engine{
|
||||
cfg: cfg,
|
||||
log: log,
|
||||
@@ -73,9 +73,9 @@ func NewSilent(cfg *config.Config, log logger.Logger, db database.Database, prog
|
||||
if progressIndicator == nil {
|
||||
progressIndicator = progress.NewNullIndicator()
|
||||
}
|
||||
|
||||
|
||||
detailedReporter := progress.NewDetailedReporter(progressIndicator, &loggerAdapter{logger: log})
|
||||
|
||||
|
||||
return &Engine{
|
||||
cfg: cfg,
|
||||
log: log,
|
||||
@@ -126,16 +126,16 @@ func (e *Engine) BackupSingle(ctx context.Context, databaseName string) error {
|
||||
// Start detailed operation tracking
|
||||
operationID := generateOperationID()
|
||||
tracker := e.detailedReporter.StartOperation(operationID, databaseName, "backup")
|
||||
|
||||
|
||||
// Add operation details
|
||||
tracker.SetDetails("database", databaseName)
|
||||
tracker.SetDetails("type", "single")
|
||||
tracker.SetDetails("compression", strconv.Itoa(e.cfg.CompressionLevel))
|
||||
tracker.SetDetails("format", "custom")
|
||||
|
||||
|
||||
// Start preparing backup directory
|
||||
prepStep := tracker.AddStep("prepare", "Preparing backup directory")
|
||||
|
||||
|
||||
// Validate and sanitize backup directory path
|
||||
validBackupDir, err := security.ValidateBackupPath(e.cfg.BackupDir)
|
||||
if err != nil {
|
||||
@@ -144,7 +144,7 @@ func (e *Engine) BackupSingle(ctx context.Context, databaseName string) error {
|
||||
return fmt.Errorf("invalid backup directory path: %w", err)
|
||||
}
|
||||
e.cfg.BackupDir = validBackupDir
|
||||
|
||||
|
||||
if err := os.MkdirAll(e.cfg.BackupDir, 0755); err != nil {
|
||||
err = fmt.Errorf("failed to create backup directory %s. Check write permissions or use --backup-dir to specify writable location: %w", e.cfg.BackupDir, err)
|
||||
prepStep.Fail(err)
|
||||
@@ -153,20 +153,20 @@ func (e *Engine) BackupSingle(ctx context.Context, databaseName string) error {
|
||||
}
|
||||
prepStep.Complete("Backup directory prepared")
|
||||
tracker.UpdateProgress(10, "Backup directory prepared")
|
||||
|
||||
|
||||
// Generate timestamp and filename
|
||||
timestamp := time.Now().Format("20060102_150405")
|
||||
var outputFile string
|
||||
|
||||
|
||||
if e.cfg.IsPostgreSQL() {
|
||||
outputFile = filepath.Join(e.cfg.BackupDir, fmt.Sprintf("db_%s_%s.dump", databaseName, timestamp))
|
||||
} else {
|
||||
outputFile = filepath.Join(e.cfg.BackupDir, fmt.Sprintf("db_%s_%s.sql.gz", databaseName, timestamp))
|
||||
}
|
||||
|
||||
|
||||
tracker.SetDetails("output_file", outputFile)
|
||||
tracker.UpdateProgress(20, "Generated backup filename")
|
||||
|
||||
|
||||
// Build backup command
|
||||
cmdStep := tracker.AddStep("command", "Building backup command")
|
||||
options := database.BackupOptions{
|
||||
@@ -177,15 +177,15 @@ func (e *Engine) BackupSingle(ctx context.Context, databaseName string) error {
|
||||
NoOwner: false,
|
||||
NoPrivileges: false,
|
||||
}
|
||||
|
||||
|
||||
cmd := e.db.BuildBackupCommand(databaseName, outputFile, options)
|
||||
cmdStep.Complete("Backup command prepared")
|
||||
tracker.UpdateProgress(30, "Backup command prepared")
|
||||
|
||||
|
||||
// Execute backup command with progress monitoring
|
||||
execStep := tracker.AddStep("execute", "Executing database backup")
|
||||
tracker.UpdateProgress(40, "Starting database backup...")
|
||||
|
||||
|
||||
if err := e.executeCommandWithProgress(ctx, cmd, outputFile, tracker); err != nil {
|
||||
err = fmt.Errorf("backup failed for %s: %w. Check database connectivity and disk space", databaseName, err)
|
||||
execStep.Fail(err)
|
||||
@@ -194,7 +194,7 @@ func (e *Engine) BackupSingle(ctx context.Context, databaseName string) error {
|
||||
}
|
||||
execStep.Complete("Database backup completed")
|
||||
tracker.UpdateProgress(80, "Database backup completed")
|
||||
|
||||
|
||||
// Verify backup file
|
||||
verifyStep := tracker.AddStep("verify", "Verifying backup file")
|
||||
if info, err := os.Stat(outputFile); err != nil {
|
||||
@@ -209,7 +209,7 @@ func (e *Engine) BackupSingle(ctx context.Context, databaseName string) error {
|
||||
verifyStep.Complete(fmt.Sprintf("Backup file verified: %s", size))
|
||||
tracker.UpdateProgress(90, fmt.Sprintf("Backup verified: %s", size))
|
||||
}
|
||||
|
||||
|
||||
// Calculate and save checksum
|
||||
checksumStep := tracker.AddStep("checksum", "Calculating SHA-256 checksum")
|
||||
if checksum, err := security.ChecksumFile(outputFile); err != nil {
|
||||
@@ -223,7 +223,7 @@ func (e *Engine) BackupSingle(ctx context.Context, databaseName string) error {
|
||||
e.log.Info("Backup checksum", "sha256", checksum)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Create metadata file
|
||||
metaStep := tracker.AddStep("metadata", "Creating metadata file")
|
||||
if err := e.createMetadata(outputFile, databaseName, "single", ""); err != nil {
|
||||
@@ -232,12 +232,12 @@ func (e *Engine) BackupSingle(ctx context.Context, databaseName string) error {
|
||||
} else {
|
||||
metaStep.Complete("Metadata file created")
|
||||
}
|
||||
|
||||
|
||||
// Record metrics for observability
|
||||
if info, err := os.Stat(outputFile); err == nil && metrics.GlobalMetrics != nil {
|
||||
metrics.GlobalMetrics.RecordOperation("backup_single", databaseName, time.Now().Add(-time.Minute), info.Size(), true, 0)
|
||||
}
|
||||
|
||||
|
||||
// Cloud upload if enabled
|
||||
if e.cfg.CloudEnabled && e.cfg.CloudAutoUpload {
|
||||
if err := e.uploadToCloud(ctx, outputFile, tracker); err != nil {
|
||||
@@ -245,39 +245,39 @@ func (e *Engine) BackupSingle(ctx context.Context, databaseName string) error {
|
||||
// Don't fail the backup if cloud upload fails
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Complete operation
|
||||
tracker.UpdateProgress(100, "Backup operation completed successfully")
|
||||
tracker.Complete(fmt.Sprintf("Single database backup completed: %s", filepath.Base(outputFile)))
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// BackupSample performs a sample database backup
|
||||
func (e *Engine) BackupSample(ctx context.Context, databaseName string) error {
|
||||
operation := e.log.StartOperation("Sample Database Backup")
|
||||
|
||||
|
||||
// Ensure backup directory exists
|
||||
if err := os.MkdirAll(e.cfg.BackupDir, 0755); err != nil {
|
||||
operation.Fail("Failed to create backup directory")
|
||||
return fmt.Errorf("failed to create backup directory: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Generate timestamp and filename
|
||||
timestamp := time.Now().Format("20060102_150405")
|
||||
outputFile := filepath.Join(e.cfg.BackupDir,
|
||||
outputFile := filepath.Join(e.cfg.BackupDir,
|
||||
fmt.Sprintf("sample_%s_%s%d_%s.sql", databaseName, e.cfg.SampleStrategy, e.cfg.SampleValue, timestamp))
|
||||
|
||||
|
||||
operation.Update("Starting sample database backup")
|
||||
e.progress.Start(fmt.Sprintf("Creating sample backup of '%s' (%s=%d)", databaseName, e.cfg.SampleStrategy, e.cfg.SampleValue))
|
||||
|
||||
|
||||
// For sample backups, we need to get the schema first, then sample data
|
||||
if err := e.createSampleBackup(ctx, databaseName, outputFile); err != nil {
|
||||
e.progress.Fail(fmt.Sprintf("Sample backup failed: %v", err))
|
||||
operation.Fail("Sample backup failed")
|
||||
return fmt.Errorf("sample backup failed: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Check output file
|
||||
if info, err := os.Stat(outputFile); err != nil {
|
||||
e.progress.Fail("Sample backup file not created")
|
||||
@@ -288,12 +288,12 @@ func (e *Engine) BackupSample(ctx context.Context, databaseName string) error {
|
||||
e.progress.Complete(fmt.Sprintf("Sample backup completed: %s (%s)", filepath.Base(outputFile), size))
|
||||
operation.Complete(fmt.Sprintf("Sample backup created: %s (%s)", outputFile, size))
|
||||
}
|
||||
|
||||
|
||||
// Create metadata file
|
||||
if err := e.createMetadata(outputFile, databaseName, "sample", e.cfg.SampleStrategy); err != nil {
|
||||
e.log.Warn("Failed to create metadata file", "error", err)
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -302,19 +302,19 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
if !e.cfg.IsPostgreSQL() {
|
||||
return fmt.Errorf("cluster backup is only supported for PostgreSQL")
|
||||
}
|
||||
|
||||
|
||||
operation := e.log.StartOperation("Cluster Backup")
|
||||
|
||||
|
||||
// Setup swap file if configured
|
||||
var swapMgr *swap.Manager
|
||||
if e.cfg.AutoSwap && e.cfg.SwapFileSizeGB > 0 {
|
||||
swapMgr = swap.NewManager(e.cfg.SwapFilePath, e.cfg.SwapFileSizeGB, e.log)
|
||||
|
||||
|
||||
if swapMgr.IsSupported() {
|
||||
e.log.Info("Setting up temporary swap file for large backup",
|
||||
"path", e.cfg.SwapFilePath,
|
||||
e.log.Info("Setting up temporary swap file for large backup",
|
||||
"path", e.cfg.SwapFilePath,
|
||||
"size_gb", e.cfg.SwapFileSizeGB)
|
||||
|
||||
|
||||
if err := swapMgr.Setup(); err != nil {
|
||||
e.log.Warn("Failed to setup swap file (continuing without it)", "error", err)
|
||||
} else {
|
||||
@@ -329,7 +329,7 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
e.log.Warn("Swap file management not supported on this platform", "os", swapMgr)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Use appropriate progress indicator based on silent mode
|
||||
var quietProgress progress.Indicator
|
||||
if e.silent {
|
||||
@@ -340,42 +340,42 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
quietProgress = progress.NewQuietLineByLine()
|
||||
quietProgress.Start("Starting cluster backup (all databases)")
|
||||
}
|
||||
|
||||
|
||||
// Ensure backup directory exists
|
||||
if err := os.MkdirAll(e.cfg.BackupDir, 0755); err != nil {
|
||||
operation.Fail("Failed to create backup directory")
|
||||
quietProgress.Fail("Failed to create backup directory")
|
||||
return fmt.Errorf("failed to create backup directory: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Check disk space before starting backup (cached for performance)
|
||||
e.log.Info("Checking disk space availability")
|
||||
spaceCheck := checks.CheckDiskSpaceCached(e.cfg.BackupDir)
|
||||
|
||||
|
||||
if !e.silent {
|
||||
// Show disk space status in CLI mode
|
||||
fmt.Println("\n" + checks.FormatDiskSpaceMessage(spaceCheck))
|
||||
}
|
||||
|
||||
|
||||
if spaceCheck.Critical {
|
||||
operation.Fail("Insufficient disk space")
|
||||
quietProgress.Fail("Insufficient disk space - free up space and try again")
|
||||
return fmt.Errorf("insufficient disk space: %.1f%% used, operation blocked", spaceCheck.UsedPercent)
|
||||
}
|
||||
|
||||
|
||||
if spaceCheck.Warning {
|
||||
e.log.Warn("Low disk space - backup may fail if database is large",
|
||||
e.log.Warn("Low disk space - backup may fail if database is large",
|
||||
"available_gb", float64(spaceCheck.AvailableBytes)/(1024*1024*1024),
|
||||
"used_percent", spaceCheck.UsedPercent)
|
||||
}
|
||||
|
||||
|
||||
// Generate timestamp and filename
|
||||
timestamp := time.Now().Format("20060102_150405")
|
||||
outputFile := filepath.Join(e.cfg.BackupDir, fmt.Sprintf("cluster_%s.tar.gz", timestamp))
|
||||
tempDir := filepath.Join(e.cfg.BackupDir, fmt.Sprintf(".cluster_%s", timestamp))
|
||||
|
||||
|
||||
operation.Update("Starting cluster backup")
|
||||
|
||||
|
||||
// Create temporary directory
|
||||
if err := os.MkdirAll(filepath.Join(tempDir, "dumps"), 0755); err != nil {
|
||||
operation.Fail("Failed to create temporary directory")
|
||||
@@ -383,7 +383,7 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
return fmt.Errorf("failed to create temp directory: %w", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir)
|
||||
|
||||
|
||||
// Backup globals
|
||||
e.printf(" Backing up global objects...\n")
|
||||
if err := e.backupGlobals(ctx, tempDir); err != nil {
|
||||
@@ -391,7 +391,7 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
operation.Fail("Global backup failed")
|
||||
return fmt.Errorf("failed to backup globals: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Get list of databases
|
||||
e.printf(" Getting database list...\n")
|
||||
databases, err := e.db.ListDatabases(ctx)
|
||||
@@ -400,31 +400,31 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
operation.Fail("Database listing failed")
|
||||
return fmt.Errorf("failed to list databases: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Create ETA estimator for database backups
|
||||
estimator := progress.NewETAEstimator("Backing up cluster", len(databases))
|
||||
quietProgress.SetEstimator(estimator)
|
||||
|
||||
|
||||
// Backup each database
|
||||
parallelism := e.cfg.ClusterParallelism
|
||||
if parallelism < 1 {
|
||||
parallelism = 1 // Ensure at least sequential
|
||||
}
|
||||
|
||||
|
||||
if parallelism == 1 {
|
||||
e.printf(" Backing up %d databases sequentially...\n", len(databases))
|
||||
} else {
|
||||
e.printf(" Backing up %d databases with %d parallel workers...\n", len(databases), parallelism)
|
||||
}
|
||||
|
||||
|
||||
// Use worker pool for parallel backup
|
||||
var successCount, failCount int32
|
||||
var mu sync.Mutex // Protect shared resources (printf, estimator)
|
||||
|
||||
|
||||
// Create semaphore to limit concurrency
|
||||
semaphore := make(chan struct{}, parallelism)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
|
||||
for i, dbName := range databases {
|
||||
// Check if context is cancelled before starting new backup
|
||||
select {
|
||||
@@ -435,14 +435,14 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
return fmt.Errorf("backup cancelled: %w", ctx.Err())
|
||||
default:
|
||||
}
|
||||
|
||||
|
||||
wg.Add(1)
|
||||
semaphore <- struct{}{} // Acquire
|
||||
|
||||
|
||||
go func(idx int, name string) {
|
||||
defer wg.Done()
|
||||
defer func() { <-semaphore }() // Release
|
||||
|
||||
|
||||
// Check for cancellation at start of goroutine
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -451,14 +451,14 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
|
||||
// Update estimator progress (thread-safe)
|
||||
mu.Lock()
|
||||
estimator.UpdateProgress(idx)
|
||||
e.printf(" [%d/%d] Backing up database: %s\n", idx+1, len(databases), name)
|
||||
quietProgress.Update(fmt.Sprintf("Backing up database %d/%d: %s", idx+1, len(databases), name))
|
||||
mu.Unlock()
|
||||
|
||||
|
||||
// Check database size and warn if very large
|
||||
if size, err := e.db.GetDatabaseSize(ctx, name); err == nil {
|
||||
sizeStr := formatBytes(size)
|
||||
@@ -469,17 +469,17 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
}
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
|
||||
dumpFile := filepath.Join(tempDir, "dumps", name+".dump")
|
||||
|
||||
|
||||
compressionLevel := e.cfg.CompressionLevel
|
||||
if compressionLevel > 6 {
|
||||
compressionLevel = 6
|
||||
}
|
||||
|
||||
|
||||
format := "custom"
|
||||
parallel := e.cfg.DumpJobs
|
||||
|
||||
|
||||
if size, err := e.db.GetDatabaseSize(ctx, name); err == nil {
|
||||
if size > 5*1024*1024*1024 {
|
||||
format = "plain"
|
||||
@@ -490,7 +490,7 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
options := database.BackupOptions{
|
||||
Compression: compressionLevel,
|
||||
Parallel: parallel,
|
||||
@@ -499,14 +499,14 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
NoOwner: false,
|
||||
NoPrivileges: false,
|
||||
}
|
||||
|
||||
|
||||
cmd := e.db.BuildBackupCommand(name, dumpFile, options)
|
||||
|
||||
|
||||
dbCtx, cancel := context.WithTimeout(ctx, 2*time.Hour)
|
||||
defer cancel()
|
||||
err := e.executeCommand(dbCtx, cmd, dumpFile)
|
||||
cancel()
|
||||
|
||||
|
||||
if err != nil {
|
||||
e.log.Warn("Failed to backup database", "database", name, "error", err)
|
||||
mu.Lock()
|
||||
@@ -526,15 +526,15 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
}
|
||||
}(i, dbName)
|
||||
}
|
||||
|
||||
|
||||
// Wait for all backups to complete
|
||||
wg.Wait()
|
||||
|
||||
|
||||
successCountFinal := int(atomic.LoadInt32(&successCount))
|
||||
failCountFinal := int(atomic.LoadInt32(&failCount))
|
||||
|
||||
|
||||
e.printf(" Backup summary: %d succeeded, %d failed\n", successCountFinal, failCountFinal)
|
||||
|
||||
|
||||
// Create archive
|
||||
e.printf(" Creating compressed archive...\n")
|
||||
if err := e.createArchive(ctx, tempDir, outputFile); err != nil {
|
||||
@@ -542,7 +542,7 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
operation.Fail("Archive creation failed")
|
||||
return fmt.Errorf("failed to create archive: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Check output file
|
||||
if info, err := os.Stat(outputFile); err != nil {
|
||||
quietProgress.Fail("Cluster backup archive not created")
|
||||
@@ -553,12 +553,12 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
||||
quietProgress.Complete(fmt.Sprintf("Cluster backup completed: %s (%s)", filepath.Base(outputFile), size))
|
||||
operation.Complete(fmt.Sprintf("Cluster backup created: %s (%s)", outputFile, size))
|
||||
}
|
||||
|
||||
|
||||
// Create cluster metadata file
|
||||
if err := e.createClusterMetadata(outputFile, databases, successCountFinal, failCountFinal); err != nil {
|
||||
e.log.Warn("Failed to create cluster metadata file", "error", err)
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -567,11 +567,11 @@ func (e *Engine) executeCommandWithProgress(ctx context.Context, cmdArgs []strin
|
||||
if len(cmdArgs) == 0 {
|
||||
return fmt.Errorf("empty command")
|
||||
}
|
||||
|
||||
|
||||
e.log.Debug("Executing backup command with progress", "cmd", cmdArgs[0], "args", cmdArgs[1:])
|
||||
|
||||
|
||||
cmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...)
|
||||
|
||||
|
||||
// Set environment variables for database tools
|
||||
cmd.Env = os.Environ()
|
||||
if e.cfg.Password != "" {
|
||||
@@ -581,51 +581,51 @@ func (e *Engine) executeCommandWithProgress(ctx context.Context, cmdArgs []strin
|
||||
cmd.Env = append(cmd.Env, "MYSQL_PWD="+e.cfg.Password)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// For MySQL, handle compression and redirection differently
|
||||
if e.cfg.IsMySQL() && e.cfg.CompressionLevel > 0 {
|
||||
return e.executeMySQLWithProgressAndCompression(ctx, cmdArgs, outputFile, tracker)
|
||||
}
|
||||
|
||||
|
||||
// Get stderr pipe for progress monitoring
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get stderr pipe: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Start the command
|
||||
if err := cmd.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start command: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Monitor progress via stderr
|
||||
go e.monitorCommandProgress(stderr, tracker)
|
||||
|
||||
|
||||
// Wait for command to complete
|
||||
if err := cmd.Wait(); err != nil {
|
||||
return fmt.Errorf("backup command failed: %w", err)
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// monitorCommandProgress monitors command output for progress information
|
||||
func (e *Engine) monitorCommandProgress(stderr io.ReadCloser, tracker *progress.OperationTracker) {
|
||||
defer stderr.Close()
|
||||
|
||||
|
||||
scanner := bufio.NewScanner(stderr)
|
||||
scanner.Buffer(make([]byte, 64*1024), 1024*1024) // 64KB initial, 1MB max for performance
|
||||
progressBase := 40 // Start from 40% since command preparation is done
|
||||
progressBase := 40 // Start from 40% since command preparation is done
|
||||
progressIncrement := 0
|
||||
|
||||
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
e.log.Debug("Command output", "line", line)
|
||||
|
||||
|
||||
// Increment progress gradually based on output
|
||||
if progressBase < 75 {
|
||||
progressIncrement++
|
||||
@@ -634,7 +634,7 @@ func (e *Engine) monitorCommandProgress(stderr io.ReadCloser, tracker *progress.
|
||||
tracker.UpdateProgress(progressBase, "Processing data...")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Look for specific progress indicators
|
||||
if strings.Contains(line, "COPY") {
|
||||
tracker.UpdateProgress(progressBase+5, "Copying table data...")
|
||||
@@ -654,55 +654,55 @@ func (e *Engine) executeMySQLWithProgressAndCompression(ctx context.Context, cmd
|
||||
if e.cfg.Password != "" {
|
||||
dumpCmd.Env = append(dumpCmd.Env, "MYSQL_PWD="+e.cfg.Password)
|
||||
}
|
||||
|
||||
|
||||
// Create gzip command
|
||||
gzipCmd := exec.CommandContext(ctx, "gzip", fmt.Sprintf("-%d", e.cfg.CompressionLevel))
|
||||
|
||||
|
||||
// Create output file
|
||||
outFile, err := os.Create(outputFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create output file: %w", err)
|
||||
}
|
||||
defer outFile.Close()
|
||||
|
||||
|
||||
// Set up pipeline: mysqldump | gzip > outputfile
|
||||
pipe, err := dumpCmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create pipe: %w", err)
|
||||
}
|
||||
|
||||
|
||||
gzipCmd.Stdin = pipe
|
||||
gzipCmd.Stdout = outFile
|
||||
|
||||
|
||||
// Get stderr for progress monitoring
|
||||
stderr, err := dumpCmd.StderrPipe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get stderr pipe: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Start monitoring progress
|
||||
go e.monitorCommandProgress(stderr, tracker)
|
||||
|
||||
|
||||
// Start both commands
|
||||
if err := gzipCmd.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start gzip: %w", err)
|
||||
}
|
||||
|
||||
|
||||
if err := dumpCmd.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start mysqldump: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Wait for mysqldump to complete
|
||||
if err := dumpCmd.Wait(); err != nil {
|
||||
return fmt.Errorf("mysqldump failed: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Close pipe and wait for gzip
|
||||
pipe.Close()
|
||||
if err := gzipCmd.Wait(); err != nil {
|
||||
return fmt.Errorf("gzip failed: %w", err)
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -714,17 +714,17 @@ func (e *Engine) executeMySQLWithCompression(ctx context.Context, cmdArgs []stri
|
||||
if e.cfg.Password != "" {
|
||||
dumpCmd.Env = append(dumpCmd.Env, "MYSQL_PWD="+e.cfg.Password)
|
||||
}
|
||||
|
||||
|
||||
// Create gzip command
|
||||
gzipCmd := exec.CommandContext(ctx, "gzip", fmt.Sprintf("-%d", e.cfg.CompressionLevel))
|
||||
|
||||
|
||||
// Create output file
|
||||
outFile, err := os.Create(outputFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create output file: %w", err)
|
||||
}
|
||||
defer outFile.Close()
|
||||
|
||||
|
||||
// Set up pipeline: mysqldump | gzip > outputfile
|
||||
stdin, err := dumpCmd.StdoutPipe()
|
||||
if err != nil {
|
||||
@@ -732,20 +732,20 @@ func (e *Engine) executeMySQLWithCompression(ctx context.Context, cmdArgs []stri
|
||||
}
|
||||
gzipCmd.Stdin = stdin
|
||||
gzipCmd.Stdout = outFile
|
||||
|
||||
|
||||
// Start both commands
|
||||
if err := gzipCmd.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start gzip: %w", err)
|
||||
}
|
||||
|
||||
|
||||
if err := dumpCmd.Run(); err != nil {
|
||||
return fmt.Errorf("mysqldump failed: %w", err)
|
||||
}
|
||||
|
||||
|
||||
if err := gzipCmd.Wait(); err != nil {
|
||||
return fmt.Errorf("gzip failed: %w", err)
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -757,23 +757,23 @@ func (e *Engine) createSampleBackup(ctx context.Context, databaseName, outputFil
|
||||
// 2. Get list of tables
|
||||
// 3. For each table, run sampling query
|
||||
// 4. Combine into single SQL file
|
||||
|
||||
|
||||
// For now, we'll use a simple approach with schema-only backup first
|
||||
// Then add sample data
|
||||
|
||||
|
||||
file, err := os.Create(outputFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create sample backup file: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
|
||||
// Write header
|
||||
fmt.Fprintf(file, "-- Sample Database Backup\n")
|
||||
fmt.Fprintf(file, "-- Database: %s\n", databaseName)
|
||||
fmt.Fprintf(file, "-- Strategy: %s = %d\n", e.cfg.SampleStrategy, e.cfg.SampleValue)
|
||||
fmt.Fprintf(file, "-- Created: %s\n", time.Now().Format(time.RFC3339))
|
||||
fmt.Fprintf(file, "-- WARNING: This backup may have referential integrity issues!\n\n")
|
||||
|
||||
|
||||
// For PostgreSQL, we can use pg_dump --schema-only first
|
||||
if e.cfg.IsPostgreSQL() {
|
||||
// Get schema
|
||||
@@ -781,61 +781,61 @@ func (e *Engine) createSampleBackup(ctx context.Context, databaseName, outputFil
|
||||
SchemaOnly: true,
|
||||
Format: "plain",
|
||||
})
|
||||
|
||||
|
||||
cmd := exec.CommandContext(ctx, schemaCmd[0], schemaCmd[1:]...)
|
||||
cmd.Env = os.Environ()
|
||||
if e.cfg.Password != "" {
|
||||
cmd.Env = append(cmd.Env, "PGPASSWORD="+e.cfg.Password)
|
||||
}
|
||||
cmd.Stdout = file
|
||||
|
||||
|
||||
if err := cmd.Run(); err != nil {
|
||||
return fmt.Errorf("failed to export schema: %w", err)
|
||||
}
|
||||
|
||||
|
||||
fmt.Fprintf(file, "\n-- Sample data follows\n\n")
|
||||
|
||||
|
||||
// Get tables and sample data
|
||||
tables, err := e.db.ListTables(ctx, databaseName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to list tables: %w", err)
|
||||
}
|
||||
|
||||
|
||||
strategy := database.SampleStrategy{
|
||||
Type: e.cfg.SampleStrategy,
|
||||
Value: e.cfg.SampleValue,
|
||||
}
|
||||
|
||||
|
||||
for _, table := range tables {
|
||||
fmt.Fprintf(file, "-- Data for table: %s\n", table)
|
||||
sampleQuery := e.db.BuildSampleQuery(databaseName, table, strategy)
|
||||
fmt.Fprintf(file, "\\copy (%s) TO STDOUT\n\n", sampleQuery)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// backupGlobals creates a backup of global PostgreSQL objects
|
||||
func (e *Engine) backupGlobals(ctx context.Context, tempDir string) error {
|
||||
globalsFile := filepath.Join(tempDir, "globals.sql")
|
||||
|
||||
|
||||
cmd := exec.CommandContext(ctx, "pg_dumpall", "--globals-only")
|
||||
if e.cfg.Host != "localhost" {
|
||||
cmd.Args = append(cmd.Args, "-h", e.cfg.Host, "-p", fmt.Sprintf("%d", e.cfg.Port))
|
||||
}
|
||||
cmd.Args = append(cmd.Args, "-U", e.cfg.User)
|
||||
|
||||
|
||||
cmd.Env = os.Environ()
|
||||
if e.cfg.Password != "" {
|
||||
cmd.Env = append(cmd.Env, "PGPASSWORD="+e.cfg.Password)
|
||||
}
|
||||
|
||||
|
||||
output, err := cmd.Output()
|
||||
if err != nil {
|
||||
return fmt.Errorf("pg_dumpall failed: %w", err)
|
||||
}
|
||||
|
||||
|
||||
return os.WriteFile(globalsFile, output, 0644)
|
||||
}
|
||||
|
||||
@@ -844,13 +844,13 @@ func (e *Engine) createArchive(ctx context.Context, sourceDir, outputFile string
|
||||
// Use pigz for faster parallel compression if available, otherwise use standard gzip
|
||||
compressCmd := "tar"
|
||||
compressArgs := []string{"-czf", outputFile, "-C", sourceDir, "."}
|
||||
|
||||
|
||||
// Check if pigz is available for faster parallel compression
|
||||
if _, err := exec.LookPath("pigz"); err == nil {
|
||||
// Use pigz with number of cores for parallel compression
|
||||
compressArgs = []string{"-cf", "-", "-C", sourceDir, "."}
|
||||
cmd := exec.CommandContext(ctx, "tar", compressArgs...)
|
||||
|
||||
|
||||
// Create output file
|
||||
outFile, err := os.Create(outputFile)
|
||||
if err != nil {
|
||||
@@ -858,10 +858,10 @@ func (e *Engine) createArchive(ctx context.Context, sourceDir, outputFile string
|
||||
goto regularTar
|
||||
}
|
||||
defer outFile.Close()
|
||||
|
||||
|
||||
// Pipe to pigz for parallel compression
|
||||
pigzCmd := exec.CommandContext(ctx, "pigz", "-p", strconv.Itoa(e.cfg.Jobs))
|
||||
|
||||
|
||||
tarOut, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
outFile.Close()
|
||||
@@ -870,7 +870,7 @@ func (e *Engine) createArchive(ctx context.Context, sourceDir, outputFile string
|
||||
}
|
||||
pigzCmd.Stdin = tarOut
|
||||
pigzCmd.Stdout = outFile
|
||||
|
||||
|
||||
// Start both commands
|
||||
if err := pigzCmd.Start(); err != nil {
|
||||
outFile.Close()
|
||||
@@ -881,13 +881,13 @@ func (e *Engine) createArchive(ctx context.Context, sourceDir, outputFile string
|
||||
outFile.Close()
|
||||
goto regularTar
|
||||
}
|
||||
|
||||
|
||||
// Wait for tar to finish
|
||||
if err := cmd.Wait(); err != nil {
|
||||
pigzCmd.Process.Kill()
|
||||
return fmt.Errorf("tar failed: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Wait for pigz to finish
|
||||
if err := pigzCmd.Wait(); err != nil {
|
||||
return fmt.Errorf("pigz compression failed: %w", err)
|
||||
@@ -898,7 +898,7 @@ func (e *Engine) createArchive(ctx context.Context, sourceDir, outputFile string
|
||||
regularTar:
|
||||
// Standard tar with gzip (fallback)
|
||||
cmd := exec.CommandContext(ctx, compressCmd, compressArgs...)
|
||||
|
||||
|
||||
// Stream stderr to avoid memory issues
|
||||
// Use io.Copy to ensure goroutine completes when pipe closes
|
||||
stderr, err := cmd.StderrPipe()
|
||||
@@ -914,7 +914,7 @@ regularTar:
|
||||
// Scanner will exit when stderr pipe closes after cmd.Wait()
|
||||
}()
|
||||
}
|
||||
|
||||
|
||||
if err := cmd.Run(); err != nil {
|
||||
return fmt.Errorf("tar failed: %w", err)
|
||||
}
|
||||
@@ -925,26 +925,26 @@ regularTar:
|
||||
// createMetadata creates a metadata file for the backup
|
||||
func (e *Engine) createMetadata(backupFile, database, backupType, strategy string) error {
|
||||
startTime := time.Now()
|
||||
|
||||
|
||||
// Get backup file information
|
||||
info, err := os.Stat(backupFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to stat backup file: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Calculate SHA-256 checksum
|
||||
sha256, err := metadata.CalculateSHA256(backupFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to calculate checksum: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Get database version
|
||||
ctx := context.Background()
|
||||
dbVersion, _ := e.db.GetVersion(ctx)
|
||||
if dbVersion == "" {
|
||||
dbVersion = "unknown"
|
||||
}
|
||||
|
||||
|
||||
// Determine compression format
|
||||
compressionFormat := "none"
|
||||
if e.cfg.CompressionLevel > 0 {
|
||||
@@ -954,7 +954,7 @@ func (e *Engine) createMetadata(backupFile, database, backupType, strategy strin
|
||||
compressionFormat = fmt.Sprintf("gzip-%d", e.cfg.CompressionLevel)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Create backup metadata
|
||||
meta := &metadata.BackupMetadata{
|
||||
Version: "2.0",
|
||||
@@ -973,18 +973,18 @@ func (e *Engine) createMetadata(backupFile, database, backupType, strategy strin
|
||||
Duration: time.Since(startTime).Seconds(),
|
||||
ExtraInfo: make(map[string]string),
|
||||
}
|
||||
|
||||
|
||||
// Add strategy for sample backups
|
||||
if strategy != "" {
|
||||
meta.ExtraInfo["sample_strategy"] = strategy
|
||||
meta.ExtraInfo["sample_value"] = fmt.Sprintf("%d", e.cfg.SampleValue)
|
||||
}
|
||||
|
||||
|
||||
// Save metadata
|
||||
if err := meta.Save(); err != nil {
|
||||
return fmt.Errorf("failed to save metadata: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Also save legacy .info file for backward compatibility
|
||||
legacyMetaFile := backupFile + ".info"
|
||||
legacyContent := fmt.Sprintf(`{
|
||||
@@ -998,39 +998,39 @@ func (e *Engine) createMetadata(backupFile, database, backupType, strategy strin
|
||||
"compression": %d,
|
||||
"size_bytes": %d
|
||||
}`, backupType, database, startTime.Format("20060102_150405"),
|
||||
e.cfg.Host, e.cfg.Port, e.cfg.User, e.cfg.DatabaseType,
|
||||
e.cfg.Host, e.cfg.Port, e.cfg.User, e.cfg.DatabaseType,
|
||||
e.cfg.CompressionLevel, info.Size())
|
||||
|
||||
|
||||
if err := os.WriteFile(legacyMetaFile, []byte(legacyContent), 0644); err != nil {
|
||||
e.log.Warn("Failed to save legacy metadata file", "error", err)
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// createClusterMetadata creates metadata for cluster backups
|
||||
func (e *Engine) createClusterMetadata(backupFile string, databases []string, successCount, failCount int) error {
|
||||
startTime := time.Now()
|
||||
|
||||
|
||||
// Get backup file information
|
||||
info, err := os.Stat(backupFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to stat backup file: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Calculate SHA-256 checksum for archive
|
||||
sha256, err := metadata.CalculateSHA256(backupFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to calculate checksum: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Get database version
|
||||
ctx := context.Background()
|
||||
dbVersion, _ := e.db.GetVersion(ctx)
|
||||
if dbVersion == "" {
|
||||
dbVersion = "unknown"
|
||||
}
|
||||
|
||||
|
||||
// Create cluster metadata
|
||||
clusterMeta := &metadata.ClusterMetadata{
|
||||
Version: "2.0",
|
||||
@@ -1050,7 +1050,7 @@ func (e *Engine) createClusterMetadata(backupFile string, databases []string, su
|
||||
"database_version": dbVersion,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
// Add database names to metadata
|
||||
for _, dbName := range databases {
|
||||
dbMeta := metadata.BackupMetadata{
|
||||
@@ -1061,12 +1061,12 @@ func (e *Engine) createClusterMetadata(backupFile string, databases []string, su
|
||||
}
|
||||
clusterMeta.Databases = append(clusterMeta.Databases, dbMeta)
|
||||
}
|
||||
|
||||
|
||||
// Save cluster metadata
|
||||
if err := clusterMeta.Save(backupFile); err != nil {
|
||||
return fmt.Errorf("failed to save cluster metadata: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Also save legacy .info file for backward compatibility
|
||||
legacyMetaFile := backupFile + ".info"
|
||||
legacyContent := fmt.Sprintf(`{
|
||||
@@ -1085,18 +1085,18 @@ func (e *Engine) createClusterMetadata(backupFile string, databases []string, su
|
||||
}`, startTime.Format("20060102_150405"),
|
||||
e.cfg.Host, e.cfg.Port, e.cfg.User, e.cfg.DatabaseType,
|
||||
e.cfg.CompressionLevel, info.Size(), len(databases), successCount, failCount)
|
||||
|
||||
|
||||
if err := os.WriteFile(legacyMetaFile, []byte(legacyContent), 0644); err != nil {
|
||||
e.log.Warn("Failed to save legacy cluster metadata file", "error", err)
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// uploadToCloud uploads a backup file to cloud storage
|
||||
func (e *Engine) uploadToCloud(ctx context.Context, backupFile string, tracker *progress.OperationTracker) error {
|
||||
uploadStep := tracker.AddStep("cloud_upload", "Uploading to cloud storage")
|
||||
|
||||
|
||||
// Create cloud backend
|
||||
cloudCfg := &cloud.Config{
|
||||
Provider: e.cfg.CloudProvider,
|
||||
@@ -1111,23 +1111,23 @@ func (e *Engine) uploadToCloud(ctx context.Context, backupFile string, tracker *
|
||||
Timeout: 300,
|
||||
MaxRetries: 3,
|
||||
}
|
||||
|
||||
|
||||
backend, err := cloud.NewBackend(cloudCfg)
|
||||
if err != nil {
|
||||
uploadStep.Fail(fmt.Errorf("failed to create cloud backend: %w", err))
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
// Get file info
|
||||
info, err := os.Stat(backupFile)
|
||||
if err != nil {
|
||||
uploadStep.Fail(fmt.Errorf("failed to stat backup file: %w", err))
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
filename := filepath.Base(backupFile)
|
||||
e.log.Info("Uploading backup to cloud", "file", filename, "size", cloud.FormatSize(info.Size()))
|
||||
|
||||
|
||||
// Progress callback
|
||||
var lastPercent int
|
||||
progressCallback := func(transferred, total int64) {
|
||||
@@ -1137,14 +1137,14 @@ func (e *Engine) uploadToCloud(ctx context.Context, backupFile string, tracker *
|
||||
lastPercent = percent
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Upload to cloud
|
||||
err = backend.Upload(ctx, backupFile, filename, progressCallback)
|
||||
if err != nil {
|
||||
uploadStep.Fail(fmt.Errorf("cloud upload failed: %w", err))
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
// Also upload metadata file
|
||||
metaFile := backupFile + ".meta.json"
|
||||
if _, err := os.Stat(metaFile); err == nil {
|
||||
@@ -1154,10 +1154,10 @@ func (e *Engine) uploadToCloud(ctx context.Context, backupFile string, tracker *
|
||||
// Don't fail if metadata upload fails
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
uploadStep.Complete(fmt.Sprintf("Uploaded to %s/%s/%s", backend.Name(), e.cfg.CloudBucket, filename))
|
||||
e.log.Info("Backup uploaded to cloud", "provider", backend.Name(), "bucket", e.cfg.CloudBucket, "file", filename)
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1166,9 +1166,9 @@ func (e *Engine) executeCommand(ctx context.Context, cmdArgs []string, outputFil
|
||||
if len(cmdArgs) == 0 {
|
||||
return fmt.Errorf("empty command")
|
||||
}
|
||||
|
||||
|
||||
e.log.Debug("Executing backup command", "cmd", cmdArgs[0], "args", cmdArgs[1:])
|
||||
|
||||
|
||||
// Check if pg_dump will write to stdout (which means we need to handle piping to compressor).
|
||||
// BuildBackupCommand omits --file when format==plain AND compression==0, causing pg_dump
|
||||
// to write to stdout. In that case we must pipe to external compressor.
|
||||
@@ -1192,28 +1192,28 @@ func (e *Engine) executeCommand(ctx context.Context, cmdArgs []string, outputFil
|
||||
if isPlainFormat && !hasFileFlag {
|
||||
usesStdout = true
|
||||
}
|
||||
|
||||
e.log.Debug("Backup command analysis",
|
||||
"plain_format", isPlainFormat,
|
||||
"has_file_flag", hasFileFlag,
|
||||
|
||||
e.log.Debug("Backup command analysis",
|
||||
"plain_format", isPlainFormat,
|
||||
"has_file_flag", hasFileFlag,
|
||||
"uses_stdout", usesStdout,
|
||||
"output_file", outputFile)
|
||||
|
||||
|
||||
// For MySQL, handle compression differently
|
||||
if e.cfg.IsMySQL() && e.cfg.CompressionLevel > 0 {
|
||||
return e.executeMySQLWithCompression(ctx, cmdArgs, outputFile)
|
||||
}
|
||||
|
||||
|
||||
// For plain format writing to stdout, use streaming compression
|
||||
if usesStdout {
|
||||
e.log.Debug("Using streaming compression for large database")
|
||||
return e.executeWithStreamingCompression(ctx, cmdArgs, outputFile)
|
||||
}
|
||||
|
||||
|
||||
// For custom format, pg_dump handles everything (writes directly to file)
|
||||
// NO GO BUFFERING - pg_dump writes directly to disk
|
||||
cmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...)
|
||||
|
||||
|
||||
// Set environment variables for database tools
|
||||
cmd.Env = os.Environ()
|
||||
if e.cfg.Password != "" {
|
||||
@@ -1223,18 +1223,18 @@ func (e *Engine) executeCommand(ctx context.Context, cmdArgs []string, outputFil
|
||||
cmd.Env = append(cmd.Env, "MYSQL_PWD="+e.cfg.Password)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Stream stderr to avoid memory issues with large databases
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create stderr pipe: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Start the command
|
||||
if err := cmd.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start backup command: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Stream stderr output (don't buffer it all in memory)
|
||||
go func() {
|
||||
scanner := bufio.NewScanner(stderr)
|
||||
@@ -1246,13 +1246,13 @@ func (e *Engine) executeCommand(ctx context.Context, cmdArgs []string, outputFil
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
// Wait for command to complete
|
||||
if err := cmd.Wait(); err != nil {
|
||||
e.log.Error("Backup command failed", "error", err, "database", filepath.Base(outputFile))
|
||||
return fmt.Errorf("backup command failed: %w", err)
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1260,7 +1260,7 @@ func (e *Engine) executeCommand(ctx context.Context, cmdArgs []string, outputFil
|
||||
// Uses: pg_dump | pigz > file.sql.gz (zero-copy streaming)
|
||||
func (e *Engine) executeWithStreamingCompression(ctx context.Context, cmdArgs []string, outputFile string) error {
|
||||
e.log.Debug("Using streaming compression for large database")
|
||||
|
||||
|
||||
// Derive compressed output filename. If the output was named *.dump we replace that
|
||||
// with *.sql.gz; otherwise append .gz to the provided output file so we don't
|
||||
// accidentally create unwanted double extensions.
|
||||
@@ -1273,43 +1273,43 @@ func (e *Engine) executeWithStreamingCompression(ctx context.Context, cmdArgs []
|
||||
} else {
|
||||
compressedFile = outputFile + ".gz"
|
||||
}
|
||||
|
||||
|
||||
// Create pg_dump command
|
||||
dumpCmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...)
|
||||
dumpCmd.Env = os.Environ()
|
||||
if e.cfg.Password != "" && e.cfg.IsPostgreSQL() {
|
||||
dumpCmd.Env = append(dumpCmd.Env, "PGPASSWORD="+e.cfg.Password)
|
||||
}
|
||||
|
||||
|
||||
// Check for pigz (parallel gzip)
|
||||
compressor := "gzip"
|
||||
compressorArgs := []string{"-c"}
|
||||
|
||||
|
||||
if _, err := exec.LookPath("pigz"); err == nil {
|
||||
compressor = "pigz"
|
||||
compressorArgs = []string{"-p", strconv.Itoa(e.cfg.Jobs), "-c"}
|
||||
e.log.Debug("Using pigz for parallel compression", "threads", e.cfg.Jobs)
|
||||
}
|
||||
|
||||
|
||||
// Create compression command
|
||||
compressCmd := exec.CommandContext(ctx, compressor, compressorArgs...)
|
||||
|
||||
|
||||
// Create output file
|
||||
outFile, err := os.Create(compressedFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create output file: %w", err)
|
||||
}
|
||||
defer outFile.Close()
|
||||
|
||||
|
||||
// Set up pipeline: pg_dump | pigz > file.sql.gz
|
||||
dumpStdout, err := dumpCmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create dump stdout pipe: %w", err)
|
||||
}
|
||||
|
||||
|
||||
compressCmd.Stdin = dumpStdout
|
||||
compressCmd.Stdout = outFile
|
||||
|
||||
|
||||
// Capture stderr from both commands
|
||||
dumpStderr, err := dumpCmd.StderrPipe()
|
||||
if err != nil {
|
||||
@@ -1319,7 +1319,7 @@ func (e *Engine) executeWithStreamingCompression(ctx context.Context, cmdArgs []
|
||||
if err != nil {
|
||||
e.log.Warn("Failed to capture compress stderr", "error", err)
|
||||
}
|
||||
|
||||
|
||||
// Stream stderr output
|
||||
if dumpStderr != nil {
|
||||
go func() {
|
||||
@@ -1332,7 +1332,7 @@ func (e *Engine) executeWithStreamingCompression(ctx context.Context, cmdArgs []
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
|
||||
if compressStderr != nil {
|
||||
go func() {
|
||||
scanner := bufio.NewScanner(compressStderr)
|
||||
@@ -1344,30 +1344,30 @@ func (e *Engine) executeWithStreamingCompression(ctx context.Context, cmdArgs []
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
|
||||
// Start compression first
|
||||
if err := compressCmd.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start compressor: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Then start pg_dump
|
||||
if err := dumpCmd.Start(); err != nil {
|
||||
return fmt.Errorf("failed to start pg_dump: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Wait for pg_dump to complete
|
||||
if err := dumpCmd.Wait(); err != nil {
|
||||
return fmt.Errorf("pg_dump failed: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Close stdout pipe to signal compressor we're done
|
||||
dumpStdout.Close()
|
||||
|
||||
|
||||
// Wait for compression to complete
|
||||
if err := compressCmd.Wait(); err != nil {
|
||||
return fmt.Errorf("compression failed: %w", err)
|
||||
}
|
||||
|
||||
|
||||
e.log.Debug("Streaming compression completed", "output", compressedFile)
|
||||
return nil
|
||||
}
|
||||
@@ -1384,4 +1384,4 @@ func formatBytes(bytes int64) string {
|
||||
exp++
|
||||
}
|
||||
return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user