diff --git a/internal/restore/engine.go b/internal/restore/engine.go index 0074272..a7a261e 100644 --- a/internal/restore/engine.go +++ b/internal/restore/engine.go @@ -162,7 +162,7 @@ func (e *Engine) restorePostgreSQLDump(ctx context.Context, archivePath, targetD NoPrivileges: true, SingleTransaction: true, } - + cmd := e.db.BuildRestoreCommand(targetDB, archivePath, opts) if compressed { @@ -178,18 +178,18 @@ func (e *Engine) restorePostgreSQLDumpWithOwnership(ctx context.Context, archive // Build restore command with ownership control opts := database.RestoreOptions{ Parallel: 1, - Clean: false, // We already dropped the database + Clean: false, // We already dropped the database NoOwner: !preserveOwnership, // Preserve ownership if we're superuser NoPrivileges: !preserveOwnership, // Preserve privileges if we're superuser SingleTransaction: true, } - - e.log.Info("Restoring database", - "database", targetDB, + + e.log.Info("Restoring database", + "database", targetDB, "preserveOwnership", preserveOwnership, "noOwner", opts.NoOwner, "noPrivileges", opts.NoPrivileges) - + cmd := e.db.BuildRestoreCommand(targetDB, archivePath, opts) if compressed { @@ -204,13 +204,13 @@ func (e *Engine) restorePostgreSQLDumpWithOwnership(ctx context.Context, archive func (e *Engine) restorePostgreSQLSQL(ctx context.Context, archivePath, targetDB string, compressed bool) error { // Use psql for SQL scripts var cmd []string - + // For localhost, omit -h to use Unix socket (avoids Ident auth issues) hostArg := "" if e.cfg.Host != "localhost" && e.cfg.Host != "" { hostArg = fmt.Sprintf("-h %s -p %d", e.cfg.Host, e.cfg.Port) } - + if compressed { psqlCmd := fmt.Sprintf("psql -U %s -d %s", e.cfg.User, targetDB) if hostArg != "" { @@ -447,7 +447,7 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error { e.log.Warn("Could not verify superuser status", "error", err) isSuperuser = false // Assume not superuser if check fails } - + if !isSuperuser { e.log.Warn("Current user is not a superuser - database ownership may not be fully restored") e.progress.Update("⚠️ Warning: Non-superuser - ownership restoration limited") @@ -493,14 +493,14 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error { var failedDBs []string totalDBs := 0 - + // Count total databases for _, entry := range entries { if !entry.IsDir() { totalDBs++ } } - + // Create ETA estimator for database restores estimator := progress.NewETAEstimator("Restoring cluster", totalDBs) e.progress.SetEstimator(estimator) @@ -510,28 +510,28 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error { if parallelism < 1 { parallelism = 1 // Ensure at least sequential } - + var successCount, failCount int32 var failedDBsMu sync.Mutex var mu sync.Mutex // Protect shared resources (progress, logger) - + // Create semaphore to limit concurrency semaphore := make(chan struct{}, parallelism) var wg sync.WaitGroup - + dbIndex := 0 for _, entry := range entries { if entry.IsDir() { continue } - + wg.Add(1) semaphore <- struct{}{} // Acquire - + go func(idx int, filename string) { defer wg.Done() defer func() { <-semaphore }() // Release - + // Update estimator progress (thread-safe) mu.Lock() estimator.UpdateProgress(idx) @@ -543,7 +543,7 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error { dbName = strings.TrimSuffix(dbName, ".sql.gz") dbProgress := 15 + int(float64(idx)/float64(totalDBs)*85.0) - + mu.Lock() statusMsg := fmt.Sprintf("Restoring database %s (%d/%d)", dbName, idx+1, totalDBs) e.progress.Update(statusMsg) @@ -569,7 +569,7 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error { // STEP 3: Restore with ownership preservation if superuser preserveOwnership := isSuperuser isCompressedSQL := strings.HasSuffix(dumpFile, ".sql.gz") - + var restoreErr error if isCompressedSQL { e.log.Info("Detected compressed SQL format, using psql + gunzip", "file", dumpFile) @@ -578,7 +578,7 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error { e.log.Info("Detected custom dump format, using pg_restore", "file", dumpFile) restoreErr = e.restorePostgreSQLDumpWithOwnership(ctx, dumpFile, dbName, false, preserveOwnership) } - + if restoreErr != nil { e.log.Error("Failed to restore database", "name", dbName, "error", restoreErr) failedDBsMu.Lock() @@ -590,13 +590,13 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error { atomic.AddInt32(&successCount, 1) }(dbIndex, entry.Name()) - + dbIndex++ } - + // Wait for all restores to complete wg.Wait() - + successCountFinal := int(atomic.LoadInt32(&successCount)) failCountFinal := int(atomic.LoadInt32(&failCount)) @@ -615,7 +615,7 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error { // extractArchive extracts a tar.gz archive func (e *Engine) extractArchive(ctx context.Context, archivePath, destDir string) error { cmd := exec.CommandContext(ctx, "tar", "-xzf", archivePath, "-C", destDir) - + // Stream stderr to avoid memory issues - tar can produce lots of output for large archives stderr, err := cmd.StderrPipe() if err != nil { @@ -649,12 +649,12 @@ func (e *Engine) restoreGlobals(ctx context.Context, globalsFile string) error { "-d", "postgres", "-f", globalsFile, } - + // Only add -h flag if host is not localhost (to use Unix socket for peer auth) if e.cfg.Host != "localhost" && e.cfg.Host != "127.0.0.1" && e.cfg.Host != "" { args = append([]string{"-h", e.cfg.Host}, args...) } - + cmd := exec.CommandContext(ctx, "psql", args...) cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password)) @@ -701,22 +701,22 @@ func (e *Engine) checkSuperuser(ctx context.Context) (bool, error) { "-d", "postgres", "-tAc", "SELECT usesuper FROM pg_user WHERE usename = current_user", } - + // Only add -h flag if host is not localhost (to use Unix socket for peer auth) if e.cfg.Host != "localhost" && e.cfg.Host != "127.0.0.1" && e.cfg.Host != "" { args = append([]string{"-h", e.cfg.Host}, args...) } - + cmd := exec.CommandContext(ctx, "psql", args...) - + // Always set PGPASSWORD (empty string is fine for peer/ident auth) cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password)) - + output, err := cmd.CombinedOutput() if err != nil { return false, fmt.Errorf("failed to check superuser status: %w", err) } - + isSuperuser := strings.TrimSpace(string(output)) == "t" return isSuperuser, nil } @@ -729,30 +729,30 @@ func (e *Engine) terminateConnections(ctx context.Context, dbName string) error WHERE datname = '%s' AND pid <> pg_backend_pid() `, dbName) - + args := []string{ "-p", fmt.Sprintf("%d", e.cfg.Port), "-U", e.cfg.User, "-d", "postgres", "-tAc", query, } - + // Only add -h flag if host is not localhost (to use Unix socket for peer auth) if e.cfg.Host != "localhost" && e.cfg.Host != "127.0.0.1" && e.cfg.Host != "" { args = append([]string{"-h", e.cfg.Host}, args...) } - + cmd := exec.CommandContext(ctx, "psql", args...) - + // Always set PGPASSWORD (empty string is fine for peer/ident auth) cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password)) - + output, err := cmd.CombinedOutput() if err != nil { e.log.Warn("Failed to terminate connections", "database", dbName, "error", err, "output", string(output)) // Don't fail - database might not exist or have no connections } - + return nil } @@ -762,10 +762,10 @@ func (e *Engine) dropDatabaseIfExists(ctx context.Context, dbName string) error if err := e.terminateConnections(ctx, dbName); err != nil { e.log.Warn("Could not terminate connections", "database", dbName, "error", err) } - + // Wait a moment for connections to terminate time.Sleep(500 * time.Millisecond) - + // Drop the database args := []string{ "-p", fmt.Sprintf("%d", e.cfg.Port), @@ -773,22 +773,22 @@ func (e *Engine) dropDatabaseIfExists(ctx context.Context, dbName string) error "-d", "postgres", "-c", fmt.Sprintf("DROP DATABASE IF EXISTS \"%s\"", dbName), } - + // Only add -h flag if host is not localhost (to use Unix socket for peer auth) if e.cfg.Host != "localhost" && e.cfg.Host != "127.0.0.1" && e.cfg.Host != "" { args = append([]string{"-h", e.cfg.Host}, args...) } - + cmd := exec.CommandContext(ctx, "psql", args...) - + // Always set PGPASSWORD (empty string is fine for peer/ident auth) cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password)) - + output, err := cmd.CombinedOutput() if err != nil { return fmt.Errorf("failed to drop database '%s': %w\nOutput: %s", dbName, err, string(output)) } - + e.log.Info("Dropped existing database", "name", dbName) return nil } @@ -803,23 +803,23 @@ func (e *Engine) ensureDatabaseExists(ctx context.Context, dbName string) error "-d", database, "-tAc", query, } - + // Only add -h flag if host is not localhost (to use Unix socket for peer auth) if e.cfg.Host != "localhost" && e.cfg.Host != "127.0.0.1" && e.cfg.Host != "" { args = append([]string{"-h", e.cfg.Host}, args...) } - + cmd := exec.CommandContext(ctx, "psql", args...) - + // Always set PGPASSWORD (empty string is fine for peer/ident auth) cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password)) - + return cmd } // Check if database exists checkCmd := buildPsqlCmd(ctx, "postgres", fmt.Sprintf("SELECT 1 FROM pg_database WHERE datname = '%s'", dbName)) - + output, err := checkCmd.CombinedOutput() if err != nil { e.log.Warn("Database existence check failed", "name", dbName, "error", err, "output", string(output)) @@ -834,29 +834,29 @@ func (e *Engine) ensureDatabaseExists(ctx context.Context, dbName string) error // Database doesn't exist, create it e.log.Info("Creating database", "name", dbName) - + createArgs := []string{ "-p", fmt.Sprintf("%d", e.cfg.Port), "-U", e.cfg.User, "-d", "postgres", "-c", fmt.Sprintf("CREATE DATABASE \"%s\"", dbName), } - + // Only add -h flag if host is not localhost (to use Unix socket for peer auth) if e.cfg.Host != "localhost" && e.cfg.Host != "127.0.0.1" && e.cfg.Host != "" { createArgs = append([]string{"-h", e.cfg.Host}, createArgs...) } - + createCmd := exec.CommandContext(ctx, "psql", createArgs...) - + // Always set PGPASSWORD (empty string is fine for peer/ident auth) createCmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password)) output, err = createCmd.CombinedOutput() if err != nil { - // Log the error but don't fail - pg_restore might handle it + // Log the error and include the psql output in the returned error to aid debugging e.log.Warn("Database creation failed", "name", dbName, "error", err, "output", string(output)) - return fmt.Errorf("failed to create database '%s': %w", dbName, err) + return fmt.Errorf("failed to create database '%s': %w (output: %s)", dbName, err, strings.TrimSpace(string(output))) } e.log.Info("Successfully created database", "name", dbName)