From 3d38e909b868a50e435783d0edffdd067b44cdfd Mon Sep 17 00:00:00 2001
From: Renz
Date: Wed, 12 Nov 2025 12:22:32 +0000
Subject: [PATCH] Fix: Critical OOM issue in cluster restore - stream command
 output instead of loading into memory

- Replaced CombinedOutput() with streaming StderrPipe() in restore engine
- Fixed executeRestoreCommand() to read stderr in 4KB chunks
- Fixed executeRestoreWithDecompression() to stream output
- Fixed extractArchive() to avoid loading tar output into memory
- Fixed restoreGlobals() to stream large globals.sql files
- Only log ERROR/FATAL messages, not all output
- Prevents out-of-memory crashes on large database restores (GB+ data)

This fixes the 'fatal error: out of memory allocating heap arena metadata'
issue when restoring large cluster backups.
---
 internal/restore/engine.go | 121 +++++++++++++++++++++++++++++++++----
 1 file changed, 110 insertions(+), 11 deletions(-)

diff --git a/internal/restore/engine.go b/internal/restore/engine.go
index 2d227d6..0472e61 100644
--- a/internal/restore/engine.go
+++ b/internal/restore/engine.go
@@ -271,11 +271,38 @@ func (e *Engine) executeRestoreCommand(ctx context.Context, cmdArgs []string) er
 		fmt.Sprintf("MYSQL_PWD=%s", e.cfg.Password),
 	)
 
-	// Capture output
-	output, err := cmd.CombinedOutput()
+	// Stream stderr to avoid memory issues with large output
+	// Don't use CombinedOutput() as it loads everything into memory
+	stderr, err := cmd.StderrPipe()
 	if err != nil {
-		e.log.Error("Restore command failed", "error", err, "output", string(output))
-		return fmt.Errorf("restore failed: %w\nOutput: %s", err, string(output))
+		return fmt.Errorf("failed to create stderr pipe: %w", err)
+	}
+
+	if err := cmd.Start(); err != nil {
+		return fmt.Errorf("failed to start restore command: %w", err)
+	}
+
+	// Read stderr in chunks to log errors without loading all into memory
+	buf := make([]byte, 4096)
+	var lastError string
+	for {
+		n, err := stderr.Read(buf)
+		if n > 0 {
+			chunk := string(buf[:n])
+			// Only log errors/warnings, not all output
+			if strings.Contains(chunk, "ERROR") || strings.Contains(chunk, "FATAL") {
+				lastError = chunk
+				e.log.Warn("Restore stderr", "output", chunk)
+			}
+		}
+		if err != nil {
+			break
+		}
+	}
+
+	if err := cmd.Wait(); err != nil {
+		e.log.Error("Restore command failed", "error", err, "last_error", lastError)
+		return fmt.Errorf("restore failed: %w", err)
 	}
 
 	e.log.Info("Restore command completed successfully")
@@ -300,10 +327,37 @@ func (e *Engine) executeRestoreWithDecompression(ctx context.Context, archivePat
 		fmt.Sprintf("MYSQL_PWD=%s", e.cfg.Password),
 	)
 
-	output, err := cmd.CombinedOutput()
+	// Stream stderr to avoid memory issues with large output
+	stderr, err := cmd.StderrPipe()
 	if err != nil {
-		e.log.Error("Restore with decompression failed", "error", err, "output", string(output))
-		return fmt.Errorf("restore failed: %w\nOutput: %s", err, string(output))
+		return fmt.Errorf("failed to create stderr pipe: %w", err)
+	}
+
+	if err := cmd.Start(); err != nil {
+		return fmt.Errorf("failed to start restore command: %w", err)
+	}
+
+	// Read stderr in chunks to log errors without loading all into memory
+	buf := make([]byte, 4096)
+	var lastError string
+	for {
+		n, err := stderr.Read(buf)
+		if n > 0 {
+			chunk := string(buf[:n])
+			// Only log errors/warnings, not all output
+			if strings.Contains(chunk, "ERROR") || strings.Contains(chunk, "FATAL") {
+				lastError = chunk
+				e.log.Warn("Restore stderr", "output", chunk)
+			}
+		}
+		if err != nil {
+			break
+		}
+	}
+
+	if err := cmd.Wait(); err != nil {
+		e.log.Error("Restore with decompression failed", "error", err, "last_error", lastError)
+		return fmt.Errorf("restore failed: %w", err)
 	}
 
 	return nil
@@ -528,9 +582,28 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
 // extractArchive extracts a tar.gz archive
 func (e *Engine) extractArchive(ctx context.Context, archivePath, destDir string) error {
 	cmd := exec.CommandContext(ctx, "tar", "-xzf", archivePath, "-C", destDir)
-	output, err := cmd.CombinedOutput()
+
+	// Stream stderr to avoid memory issues - tar can produce lots of output for large archives
+	stderr, err := cmd.StderrPipe()
 	if err != nil {
-		return fmt.Errorf("tar extraction failed: %w\nOutput: %s", err, string(output))
+		return fmt.Errorf("failed to create stderr pipe: %w", err)
+	}
+
+	if err := cmd.Start(); err != nil {
+		return fmt.Errorf("failed to start tar: %w", err)
+	}
+
+	// Discard stderr output in chunks to prevent memory buildup
+	buf := make([]byte, 4096)
+	for {
+		_, err := stderr.Read(buf)
+		if err != nil {
+			break
+		}
+	}
+
+	if err := cmd.Wait(); err != nil {
+		return fmt.Errorf("tar extraction failed: %w", err)
 	}
 	return nil
 }
@@ -553,9 +626,35 @@ func (e *Engine) restoreGlobals(ctx context.Context, globalsFile string) error {
 	cmd.Env = append(os.Environ(),
 		fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password))
 
-	output, err := cmd.CombinedOutput()
+	// Stream output to avoid memory issues with large globals.sql files
+	stderr, err := cmd.StderrPipe()
 	if err != nil {
-		return fmt.Errorf("failed to restore globals: %w\nOutput: %s", err, string(output))
+		return fmt.Errorf("failed to create stderr pipe: %w", err)
+	}
+
+	if err := cmd.Start(); err != nil {
+		return fmt.Errorf("failed to start psql: %w", err)
+	}
+
+	// Read stderr in chunks
+	buf := make([]byte, 4096)
+	var lastError string
+	for {
+		n, err := stderr.Read(buf)
+		if n > 0 {
+			chunk := string(buf[:n])
+			if strings.Contains(chunk, "ERROR") || strings.Contains(chunk, "FATAL") {
+				lastError = chunk
+				e.log.Warn("Globals restore stderr", "output", chunk)
+			}
+		}
+		if err != nil {
+			break
+		}
+	}
+
+	if err := cmd.Wait(); err != nil {
+		return fmt.Errorf("failed to restore globals: %w (last error: %s)", err, lastError)
 	}
 
 	return nil
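
The 4KB stderr-draining loop now appears three times (executeRestoreCommand,
executeRestoreWithDecompression, restoreGlobals), and a chunk-based
strings.Contains() scan can miss an ERROR/FATAL marker that happens to
straddle a chunk boundary. Below is a minimal sketch of one way to factor the
loop into a single line-oriented helper; the name drainStderr and its wiring
are illustrative, not part of this patch:

import (
	"bufio"
	"io"
	"strings"
)

// drainStderr is a sketch, not code from this patch. It reads a command's
// stderr line by line with bounded memory, invokes onError for each line
// containing ERROR or FATAL, and returns the last such line. Line-oriented
// reads avoid splitting a marker across two fixed-size chunks.
func drainStderr(r io.Reader, onError func(line string)) string {
	var lastError string
	scanner := bufio.NewScanner(r)
	// Cap the per-line buffer so one pathological line cannot grow unbounded.
	scanner.Buffer(make([]byte, 64*1024), 64*1024)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.Contains(line, "ERROR") || strings.Contains(line, "FATAL") {
			lastError = line
			if onError != nil {
				onError(line)
			}
		}
	}
	// Drain whatever remains (e.g. after an over-long line stops the scanner)
	// so the child process cannot block on a full stderr pipe before Wait().
	io.Copy(io.Discard, r)
	return lastError
}

Each call site between Start() and Wait() would then reduce to something like:

	lastError := drainStderr(stderr, func(line string) {
		e.log.Warn("Restore stderr", "output", line)
	})
	if err := cmd.Wait(); err != nil {
		e.log.Error("Restore command failed", "error", err, "last_error", lastError)
		return fmt.Errorf("restore failed: %w", err)
	}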