Fix cluster backup OOM: streaming output, longer timeouts, parallel dumps
This commit is contained in:
@ -297,13 +297,36 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
|||||||
|
|
||||||
// Backup each database
|
// Backup each database
|
||||||
e.printf(" Backing up %d databases...\n", len(databases))
|
e.printf(" Backing up %d databases...\n", len(databases))
|
||||||
|
successCount := 0
|
||||||
|
failCount := 0
|
||||||
|
|
||||||
for i, dbName := range databases {
|
for i, dbName := range databases {
|
||||||
e.printf(" Backing up database %d/%d: %s\n", i+1, len(databases), dbName)
|
e.printf(" [%d/%d] Backing up database: %s\n", i+1, len(databases), dbName)
|
||||||
|
quietProgress.Update(fmt.Sprintf("Backing up database %d/%d: %s", i+1, len(databases), dbName))
|
||||||
|
|
||||||
|
// Check database size and warn if very large
|
||||||
|
if size, err := e.db.GetDatabaseSize(ctx, dbName); err == nil {
|
||||||
|
sizeStr := formatBytes(size)
|
||||||
|
e.printf(" Database size: %s\n", sizeStr)
|
||||||
|
if size > 10*1024*1024*1024 { // > 10GB
|
||||||
|
e.printf(" ⚠️ Large database detected - this may take a while\n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
dumpFile := filepath.Join(tempDir, "dumps", dbName+".dump")
|
dumpFile := filepath.Join(tempDir, "dumps", dbName+".dump")
|
||||||
|
|
||||||
|
// For cluster backups, use settings optimized for large databases:
|
||||||
|
// - Lower compression (faster, less memory)
|
||||||
|
// - Use parallel dumps if configured
|
||||||
|
// - Custom format with moderate compression
|
||||||
|
compressionLevel := e.cfg.CompressionLevel
|
||||||
|
if compressionLevel > 6 {
|
||||||
|
compressionLevel = 6 // Cap at 6 for cluster backups to reduce memory
|
||||||
|
}
|
||||||
|
|
||||||
options := database.BackupOptions{
|
options := database.BackupOptions{
|
||||||
Compression: e.cfg.CompressionLevel,
|
Compression: compressionLevel,
|
||||||
Parallel: 1, // Individual dumps in cluster are not parallel
|
Parallel: e.cfg.DumpJobs, // Use parallel dumps for large databases
|
||||||
Format: "custom",
|
Format: "custom",
|
||||||
Blobs: true,
|
Blobs: true,
|
||||||
NoOwner: false,
|
NoOwner: false,
|
||||||
@ -311,12 +334,28 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
cmd := e.db.BuildBackupCommand(dbName, dumpFile, options)
|
cmd := e.db.BuildBackupCommand(dbName, dumpFile, options)
|
||||||
if err := e.executeCommand(ctx, cmd, dumpFile); err != nil {
|
|
||||||
|
// Use a context with timeout for each database to prevent hangs
|
||||||
|
// Use longer timeout for huge databases (2 hours per database)
|
||||||
|
dbCtx, cancel := context.WithTimeout(ctx, 2*time.Hour)
|
||||||
|
err := e.executeCommand(dbCtx, cmd, dumpFile)
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
e.log.Warn("Failed to backup database", "database", dbName, "error", err)
|
e.log.Warn("Failed to backup database", "database", dbName, "error", err)
|
||||||
|
e.printf(" ⚠️ WARNING: Failed to backup %s: %v\n", dbName, err)
|
||||||
|
failCount++
|
||||||
// Continue with other databases
|
// Continue with other databases
|
||||||
|
} else {
|
||||||
|
if info, err := os.Stat(dumpFile); err == nil {
|
||||||
|
e.printf(" ✅ Completed %s (%s)\n", dbName, formatBytes(info.Size()))
|
||||||
|
}
|
||||||
|
successCount++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.printf(" Backup summary: %d succeeded, %d failed\n", successCount, failCount)
|
||||||
|
|
||||||
// Create archive
|
// Create archive
|
||||||
e.printf(" Creating compressed archive...\n")
|
e.printf(" Creating compressed archive...\n")
|
||||||
if err := e.createArchive(tempDir, outputFile); err != nil {
|
if err := e.createArchive(tempDir, outputFile); err != nil {
|
||||||
@ -618,10 +657,62 @@ func (e *Engine) backupGlobals(ctx context.Context, tempDir string) error {
|
|||||||
|
|
||||||
// createArchive creates a compressed tar archive
|
// createArchive creates a compressed tar archive
|
||||||
func (e *Engine) createArchive(sourceDir, outputFile string) error {
|
func (e *Engine) createArchive(sourceDir, outputFile string) error {
|
||||||
cmd := exec.Command("tar", "-czf", outputFile, "-C", sourceDir, ".")
|
// Use pigz for faster parallel compression if available, otherwise use standard gzip
|
||||||
output, err := cmd.CombinedOutput()
|
compressCmd := "tar"
|
||||||
if err != nil {
|
compressArgs := []string{"-czf", outputFile, "-C", sourceDir, "."}
|
||||||
return fmt.Errorf("tar failed: %w, output: %s", err, string(output))
|
|
||||||
|
// Check if pigz is available for faster parallel compression
|
||||||
|
if _, err := exec.LookPath("pigz"); err == nil {
|
||||||
|
// Use pigz with number of cores for parallel compression
|
||||||
|
compressArgs = []string{"-cf", "-", "-C", sourceDir, "."}
|
||||||
|
cmd := exec.Command("tar", compressArgs...)
|
||||||
|
|
||||||
|
// Pipe to pigz for parallel compression
|
||||||
|
pigzCmd := exec.Command("pigz", "-p", strconv.Itoa(e.cfg.Jobs), ">", outputFile)
|
||||||
|
|
||||||
|
tarOut, err := cmd.StdoutPipe()
|
||||||
|
if err != nil {
|
||||||
|
// Fallback to regular tar
|
||||||
|
goto regularTar
|
||||||
|
}
|
||||||
|
pigzCmd.Stdin = tarOut
|
||||||
|
|
||||||
|
if err := pigzCmd.Start(); err != nil {
|
||||||
|
goto regularTar
|
||||||
|
}
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
goto regularTar
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cmd.Wait(); err != nil {
|
||||||
|
return fmt.Errorf("tar failed: %w", err)
|
||||||
|
}
|
||||||
|
if err := pigzCmd.Wait(); err != nil {
|
||||||
|
return fmt.Errorf("pigz compression failed: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
regularTar:
|
||||||
|
// Standard tar with gzip (fallback)
|
||||||
|
cmd := exec.Command(compressCmd, compressArgs...)
|
||||||
|
|
||||||
|
// Stream stderr to avoid memory issues
|
||||||
|
stderr, err := cmd.StderrPipe()
|
||||||
|
if err == nil {
|
||||||
|
go func() {
|
||||||
|
scanner := bufio.NewScanner(stderr)
|
||||||
|
for scanner.Scan() {
|
||||||
|
line := scanner.Text()
|
||||||
|
if line != "" {
|
||||||
|
e.log.Debug("Archive creation", "output", line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cmd.Run(); err != nil {
|
||||||
|
return fmt.Errorf("tar failed: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -683,11 +774,33 @@ func (e *Engine) executeCommand(ctx context.Context, cmdArgs []string, outputFil
|
|||||||
return e.executeMySQLWithCompression(ctx, cmdArgs, outputFile)
|
return e.executeMySQLWithCompression(ctx, cmdArgs, outputFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the command
|
// Stream stderr to avoid memory issues with large databases
|
||||||
output, err := cmd.CombinedOutput()
|
stderr, err := cmd.StderrPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error("Backup command failed", "error", err, "output", string(output))
|
return fmt.Errorf("failed to create stderr pipe: %w", err)
|
||||||
return fmt.Errorf("backup command failed: %w, output: %s", err, string(output))
|
}
|
||||||
|
|
||||||
|
// Start the command
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
return fmt.Errorf("failed to start backup command: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stream stderr output (don't buffer it all in memory)
|
||||||
|
go func() {
|
||||||
|
scanner := bufio.NewScanner(stderr)
|
||||||
|
scanner.Buffer(make([]byte, 64*1024), 1024*1024) // 1MB max line size
|
||||||
|
for scanner.Scan() {
|
||||||
|
line := scanner.Text()
|
||||||
|
if line != "" {
|
||||||
|
e.log.Debug("Backup output", "line", line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for command to complete
|
||||||
|
if err := cmd.Wait(); err != nil {
|
||||||
|
e.log.Error("Backup command failed", "error", err, "database", filepath.Base(outputFile))
|
||||||
|
return fmt.Errorf("backup command failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
Reference in New Issue
Block a user