Fix: Critical OOM issue in cluster restore - stream command output instead of loading into memory

- Replaced CombinedOutput() with streaming StderrPipe() in restore engine
- Fixed executeRestoreCommand() to read stderr in 4KB chunks
- Fixed executeRestoreWithDecompression() to stream output
- Fixed extractArchive() to avoid loading tar output into memory
- Fixed restoreGlobals() to stream large globals.sql files
- Only log ERROR/FATAL messages, not all output
- Prevents out-of-memory crashes on large database restores (GB+ data)

This fixes the 'fatal error: out of memory allocating heap arena metadata'
issue when restoring large cluster backups.
This commit is contained in:
2025-11-12 12:22:32 +00:00
parent 2019591b5b
commit 3d38e909b8

View File

@@ -271,11 +271,38 @@ func (e *Engine) executeRestoreCommand(ctx context.Context, cmdArgs []string) er
fmt.Sprintf("MYSQL_PWD=%s", e.cfg.Password), fmt.Sprintf("MYSQL_PWD=%s", e.cfg.Password),
) )
// Capture output // Stream stderr to avoid memory issues with large output
output, err := cmd.CombinedOutput() // Don't use CombinedOutput() as it loads everything into memory
stderr, err := cmd.StderrPipe()
if err != nil { if err != nil {
e.log.Error("Restore command failed", "error", err, "output", string(output)) return fmt.Errorf("failed to create stderr pipe: %w", err)
return fmt.Errorf("restore failed: %w\nOutput: %s", err, string(output)) }
if err := cmd.Start(); err != nil {
return fmt.Errorf("failed to start restore command: %w", err)
}
// Read stderr in chunks to log errors without loading all into memory
buf := make([]byte, 4096)
var lastError string
for {
n, err := stderr.Read(buf)
if n > 0 {
chunk := string(buf[:n])
// Only log errors/warnings, not all output
if strings.Contains(chunk, "ERROR") || strings.Contains(chunk, "FATAL") {
lastError = chunk
e.log.Warn("Restore stderr", "output", chunk)
}
}
if err != nil {
break
}
}
if err := cmd.Wait(); err != nil {
e.log.Error("Restore command failed", "error", err, "last_error", lastError)
return fmt.Errorf("restore failed: %w", err)
} }
e.log.Info("Restore command completed successfully") e.log.Info("Restore command completed successfully")
@@ -300,10 +327,37 @@ func (e *Engine) executeRestoreWithDecompression(ctx context.Context, archivePat
fmt.Sprintf("MYSQL_PWD=%s", e.cfg.Password), fmt.Sprintf("MYSQL_PWD=%s", e.cfg.Password),
) )
output, err := cmd.CombinedOutput() // Stream stderr to avoid memory issues with large output
stderr, err := cmd.StderrPipe()
if err != nil { if err != nil {
e.log.Error("Restore with decompression failed", "error", err, "output", string(output)) return fmt.Errorf("failed to create stderr pipe: %w", err)
return fmt.Errorf("restore failed: %w\nOutput: %s", err, string(output)) }
if err := cmd.Start(); err != nil {
return fmt.Errorf("failed to start restore command: %w", err)
}
// Read stderr in chunks to log errors without loading all into memory
buf := make([]byte, 4096)
var lastError string
for {
n, err := stderr.Read(buf)
if n > 0 {
chunk := string(buf[:n])
// Only log errors/warnings, not all output
if strings.Contains(chunk, "ERROR") || strings.Contains(chunk, "FATAL") {
lastError = chunk
e.log.Warn("Restore stderr", "output", chunk)
}
}
if err != nil {
break
}
}
if err := cmd.Wait(); err != nil {
e.log.Error("Restore with decompression failed", "error", err, "last_error", lastError)
return fmt.Errorf("restore failed: %w", err)
} }
return nil return nil
@@ -528,9 +582,28 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
// extractArchive extracts a tar.gz archive // extractArchive extracts a tar.gz archive
func (e *Engine) extractArchive(ctx context.Context, archivePath, destDir string) error { func (e *Engine) extractArchive(ctx context.Context, archivePath, destDir string) error {
cmd := exec.CommandContext(ctx, "tar", "-xzf", archivePath, "-C", destDir) cmd := exec.CommandContext(ctx, "tar", "-xzf", archivePath, "-C", destDir)
output, err := cmd.CombinedOutput()
// Stream stderr to avoid memory issues - tar can produce lots of output for large archives
stderr, err := cmd.StderrPipe()
if err != nil { if err != nil {
return fmt.Errorf("tar extraction failed: %w\nOutput: %s", err, string(output)) return fmt.Errorf("failed to create stderr pipe: %w", err)
}
if err := cmd.Start(); err != nil {
return fmt.Errorf("failed to start tar: %w", err)
}
// Discard stderr output in chunks to prevent memory buildup
buf := make([]byte, 4096)
for {
_, err := stderr.Read(buf)
if err != nil {
break
}
}
if err := cmd.Wait(); err != nil {
return fmt.Errorf("tar extraction failed: %w", err)
} }
return nil return nil
} }
@@ -553,9 +626,35 @@ func (e *Engine) restoreGlobals(ctx context.Context, globalsFile string) error {
cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password)) cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password))
output, err := cmd.CombinedOutput() // Stream output to avoid memory issues with large globals.sql files
stderr, err := cmd.StderrPipe()
if err != nil { if err != nil {
return fmt.Errorf("failed to restore globals: %w\nOutput: %s", err, string(output)) return fmt.Errorf("failed to create stderr pipe: %w", err)
}
if err := cmd.Start(); err != nil {
return fmt.Errorf("failed to start psql: %w", err)
}
// Read stderr in chunks
buf := make([]byte, 4096)
var lastError string
for {
n, err := stderr.Read(buf)
if n > 0 {
chunk := string(buf[:n])
if strings.Contains(chunk, "ERROR") || strings.Contains(chunk, "FATAL") {
lastError = chunk
e.log.Warn("Globals restore stderr", "output", chunk)
}
}
if err != nil {
break
}
}
if err := cmd.Wait(); err != nil {
return fmt.Errorf("failed to restore globals: %w (last error: %s)", err, lastError)
} }
return nil return nil