fix: Typo
This commit is contained in:
@@ -162,7 +162,7 @@ func (e *Engine) restorePostgreSQLDump(ctx context.Context, archivePath, targetD
|
||||
NoPrivileges: true,
|
||||
SingleTransaction: true,
|
||||
}
|
||||
|
||||
|
||||
cmd := e.db.BuildRestoreCommand(targetDB, archivePath, opts)
|
||||
|
||||
if compressed {
|
||||
@@ -178,18 +178,18 @@ func (e *Engine) restorePostgreSQLDumpWithOwnership(ctx context.Context, archive
|
||||
// Build restore command with ownership control
|
||||
opts := database.RestoreOptions{
|
||||
Parallel: 1,
|
||||
Clean: false, // We already dropped the database
|
||||
Clean: false, // We already dropped the database
|
||||
NoOwner: !preserveOwnership, // Preserve ownership if we're superuser
|
||||
NoPrivileges: !preserveOwnership, // Preserve privileges if we're superuser
|
||||
SingleTransaction: true,
|
||||
}
|
||||
|
||||
e.log.Info("Restoring database",
|
||||
"database", targetDB,
|
||||
|
||||
e.log.Info("Restoring database",
|
||||
"database", targetDB,
|
||||
"preserveOwnership", preserveOwnership,
|
||||
"noOwner", opts.NoOwner,
|
||||
"noPrivileges", opts.NoPrivileges)
|
||||
|
||||
|
||||
cmd := e.db.BuildRestoreCommand(targetDB, archivePath, opts)
|
||||
|
||||
if compressed {
|
||||
@@ -204,13 +204,13 @@ func (e *Engine) restorePostgreSQLDumpWithOwnership(ctx context.Context, archive
|
||||
func (e *Engine) restorePostgreSQLSQL(ctx context.Context, archivePath, targetDB string, compressed bool) error {
|
||||
// Use psql for SQL scripts
|
||||
var cmd []string
|
||||
|
||||
|
||||
// For localhost, omit -h to use Unix socket (avoids Ident auth issues)
|
||||
hostArg := ""
|
||||
if e.cfg.Host != "localhost" && e.cfg.Host != "" {
|
||||
hostArg = fmt.Sprintf("-h %s -p %d", e.cfg.Host, e.cfg.Port)
|
||||
}
|
||||
|
||||
|
||||
if compressed {
|
||||
psqlCmd := fmt.Sprintf("psql -U %s -d %s", e.cfg.User, targetDB)
|
||||
if hostArg != "" {
|
||||
@@ -447,7 +447,7 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
e.log.Warn("Could not verify superuser status", "error", err)
|
||||
isSuperuser = false // Assume not superuser if check fails
|
||||
}
|
||||
|
||||
|
||||
if !isSuperuser {
|
||||
e.log.Warn("Current user is not a superuser - database ownership may not be fully restored")
|
||||
e.progress.Update("⚠️ Warning: Non-superuser - ownership restoration limited")
|
||||
@@ -493,14 +493,14 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
|
||||
var failedDBs []string
|
||||
totalDBs := 0
|
||||
|
||||
|
||||
// Count total databases
|
||||
for _, entry := range entries {
|
||||
if !entry.IsDir() {
|
||||
totalDBs++
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Create ETA estimator for database restores
|
||||
estimator := progress.NewETAEstimator("Restoring cluster", totalDBs)
|
||||
e.progress.SetEstimator(estimator)
|
||||
@@ -510,28 +510,28 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
if parallelism < 1 {
|
||||
parallelism = 1 // Ensure at least sequential
|
||||
}
|
||||
|
||||
|
||||
var successCount, failCount int32
|
||||
var failedDBsMu sync.Mutex
|
||||
var mu sync.Mutex // Protect shared resources (progress, logger)
|
||||
|
||||
|
||||
// Create semaphore to limit concurrency
|
||||
semaphore := make(chan struct{}, parallelism)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
|
||||
dbIndex := 0
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
wg.Add(1)
|
||||
semaphore <- struct{}{} // Acquire
|
||||
|
||||
|
||||
go func(idx int, filename string) {
|
||||
defer wg.Done()
|
||||
defer func() { <-semaphore }() // Release
|
||||
|
||||
|
||||
// Update estimator progress (thread-safe)
|
||||
mu.Lock()
|
||||
estimator.UpdateProgress(idx)
|
||||
@@ -543,7 +543,7 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
dbName = strings.TrimSuffix(dbName, ".sql.gz")
|
||||
|
||||
dbProgress := 15 + int(float64(idx)/float64(totalDBs)*85.0)
|
||||
|
||||
|
||||
mu.Lock()
|
||||
statusMsg := fmt.Sprintf("Restoring database %s (%d/%d)", dbName, idx+1, totalDBs)
|
||||
e.progress.Update(statusMsg)
|
||||
@@ -569,7 +569,7 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
// STEP 3: Restore with ownership preservation if superuser
|
||||
preserveOwnership := isSuperuser
|
||||
isCompressedSQL := strings.HasSuffix(dumpFile, ".sql.gz")
|
||||
|
||||
|
||||
var restoreErr error
|
||||
if isCompressedSQL {
|
||||
e.log.Info("Detected compressed SQL format, using psql + gunzip", "file", dumpFile)
|
||||
@@ -578,7 +578,7 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
e.log.Info("Detected custom dump format, using pg_restore", "file", dumpFile)
|
||||
restoreErr = e.restorePostgreSQLDumpWithOwnership(ctx, dumpFile, dbName, false, preserveOwnership)
|
||||
}
|
||||
|
||||
|
||||
if restoreErr != nil {
|
||||
e.log.Error("Failed to restore database", "name", dbName, "error", restoreErr)
|
||||
failedDBsMu.Lock()
|
||||
@@ -590,13 +590,13 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
|
||||
atomic.AddInt32(&successCount, 1)
|
||||
}(dbIndex, entry.Name())
|
||||
|
||||
|
||||
dbIndex++
|
||||
}
|
||||
|
||||
|
||||
// Wait for all restores to complete
|
||||
wg.Wait()
|
||||
|
||||
|
||||
successCountFinal := int(atomic.LoadInt32(&successCount))
|
||||
failCountFinal := int(atomic.LoadInt32(&failCount))
|
||||
|
||||
@@ -615,7 +615,7 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
|
||||
// extractArchive extracts a tar.gz archive
|
||||
func (e *Engine) extractArchive(ctx context.Context, archivePath, destDir string) error {
|
||||
cmd := exec.CommandContext(ctx, "tar", "-xzf", archivePath, "-C", destDir)
|
||||
|
||||
|
||||
// Stream stderr to avoid memory issues - tar can produce lots of output for large archives
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
@@ -649,12 +649,12 @@ func (e *Engine) restoreGlobals(ctx context.Context, globalsFile string) error {
|
||||
"-d", "postgres",
|
||||
"-f", globalsFile,
|
||||
}
|
||||
|
||||
|
||||
// Only add -h flag if host is not localhost (to use Unix socket for peer auth)
|
||||
if e.cfg.Host != "localhost" && e.cfg.Host != "127.0.0.1" && e.cfg.Host != "" {
|
||||
args = append([]string{"-h", e.cfg.Host}, args...)
|
||||
}
|
||||
|
||||
|
||||
cmd := exec.CommandContext(ctx, "psql", args...)
|
||||
|
||||
cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password))
|
||||
@@ -701,22 +701,22 @@ func (e *Engine) checkSuperuser(ctx context.Context) (bool, error) {
|
||||
"-d", "postgres",
|
||||
"-tAc", "SELECT usesuper FROM pg_user WHERE usename = current_user",
|
||||
}
|
||||
|
||||
|
||||
// Only add -h flag if host is not localhost (to use Unix socket for peer auth)
|
||||
if e.cfg.Host != "localhost" && e.cfg.Host != "127.0.0.1" && e.cfg.Host != "" {
|
||||
args = append([]string{"-h", e.cfg.Host}, args...)
|
||||
}
|
||||
|
||||
|
||||
cmd := exec.CommandContext(ctx, "psql", args...)
|
||||
|
||||
|
||||
// Always set PGPASSWORD (empty string is fine for peer/ident auth)
|
||||
cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password))
|
||||
|
||||
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to check superuser status: %w", err)
|
||||
}
|
||||
|
||||
|
||||
isSuperuser := strings.TrimSpace(string(output)) == "t"
|
||||
return isSuperuser, nil
|
||||
}
|
||||
@@ -729,30 +729,30 @@ func (e *Engine) terminateConnections(ctx context.Context, dbName string) error
|
||||
WHERE datname = '%s'
|
||||
AND pid <> pg_backend_pid()
|
||||
`, dbName)
|
||||
|
||||
|
||||
args := []string{
|
||||
"-p", fmt.Sprintf("%d", e.cfg.Port),
|
||||
"-U", e.cfg.User,
|
||||
"-d", "postgres",
|
||||
"-tAc", query,
|
||||
}
|
||||
|
||||
|
||||
// Only add -h flag if host is not localhost (to use Unix socket for peer auth)
|
||||
if e.cfg.Host != "localhost" && e.cfg.Host != "127.0.0.1" && e.cfg.Host != "" {
|
||||
args = append([]string{"-h", e.cfg.Host}, args...)
|
||||
}
|
||||
|
||||
|
||||
cmd := exec.CommandContext(ctx, "psql", args...)
|
||||
|
||||
|
||||
// Always set PGPASSWORD (empty string is fine for peer/ident auth)
|
||||
cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password))
|
||||
|
||||
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
e.log.Warn("Failed to terminate connections", "database", dbName, "error", err, "output", string(output))
|
||||
// Don't fail - database might not exist or have no connections
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -762,10 +762,10 @@ func (e *Engine) dropDatabaseIfExists(ctx context.Context, dbName string) error
|
||||
if err := e.terminateConnections(ctx, dbName); err != nil {
|
||||
e.log.Warn("Could not terminate connections", "database", dbName, "error", err)
|
||||
}
|
||||
|
||||
|
||||
// Wait a moment for connections to terminate
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
|
||||
// Drop the database
|
||||
args := []string{
|
||||
"-p", fmt.Sprintf("%d", e.cfg.Port),
|
||||
@@ -773,22 +773,22 @@ func (e *Engine) dropDatabaseIfExists(ctx context.Context, dbName string) error
|
||||
"-d", "postgres",
|
||||
"-c", fmt.Sprintf("DROP DATABASE IF EXISTS \"%s\"", dbName),
|
||||
}
|
||||
|
||||
|
||||
// Only add -h flag if host is not localhost (to use Unix socket for peer auth)
|
||||
if e.cfg.Host != "localhost" && e.cfg.Host != "127.0.0.1" && e.cfg.Host != "" {
|
||||
args = append([]string{"-h", e.cfg.Host}, args...)
|
||||
}
|
||||
|
||||
|
||||
cmd := exec.CommandContext(ctx, "psql", args...)
|
||||
|
||||
|
||||
// Always set PGPASSWORD (empty string is fine for peer/ident auth)
|
||||
cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password))
|
||||
|
||||
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to drop database '%s': %w\nOutput: %s", dbName, err, string(output))
|
||||
}
|
||||
|
||||
|
||||
e.log.Info("Dropped existing database", "name", dbName)
|
||||
return nil
|
||||
}
|
||||
@@ -803,23 +803,23 @@ func (e *Engine) ensureDatabaseExists(ctx context.Context, dbName string) error
|
||||
"-d", database,
|
||||
"-tAc", query,
|
||||
}
|
||||
|
||||
|
||||
// Only add -h flag if host is not localhost (to use Unix socket for peer auth)
|
||||
if e.cfg.Host != "localhost" && e.cfg.Host != "127.0.0.1" && e.cfg.Host != "" {
|
||||
args = append([]string{"-h", e.cfg.Host}, args...)
|
||||
}
|
||||
|
||||
|
||||
cmd := exec.CommandContext(ctx, "psql", args...)
|
||||
|
||||
|
||||
// Always set PGPASSWORD (empty string is fine for peer/ident auth)
|
||||
cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password))
|
||||
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
// Check if database exists
|
||||
checkCmd := buildPsqlCmd(ctx, "postgres", fmt.Sprintf("SELECT 1 FROM pg_database WHERE datname = '%s'", dbName))
|
||||
|
||||
|
||||
output, err := checkCmd.CombinedOutput()
|
||||
if err != nil {
|
||||
e.log.Warn("Database existence check failed", "name", dbName, "error", err, "output", string(output))
|
||||
@@ -834,29 +834,29 @@ func (e *Engine) ensureDatabaseExists(ctx context.Context, dbName string) error
|
||||
|
||||
// Database doesn't exist, create it
|
||||
e.log.Info("Creating database", "name", dbName)
|
||||
|
||||
|
||||
createArgs := []string{
|
||||
"-p", fmt.Sprintf("%d", e.cfg.Port),
|
||||
"-U", e.cfg.User,
|
||||
"-d", "postgres",
|
||||
"-c", fmt.Sprintf("CREATE DATABASE \"%s\"", dbName),
|
||||
}
|
||||
|
||||
|
||||
// Only add -h flag if host is not localhost (to use Unix socket for peer auth)
|
||||
if e.cfg.Host != "localhost" && e.cfg.Host != "127.0.0.1" && e.cfg.Host != "" {
|
||||
createArgs = append([]string{"-h", e.cfg.Host}, createArgs...)
|
||||
}
|
||||
|
||||
|
||||
createCmd := exec.CommandContext(ctx, "psql", createArgs...)
|
||||
|
||||
|
||||
// Always set PGPASSWORD (empty string is fine for peer/ident auth)
|
||||
createCmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSWORD=%s", e.cfg.Password))
|
||||
|
||||
output, err = createCmd.CombinedOutput()
|
||||
if err != nil {
|
||||
// Log the error but don't fail - pg_restore might handle it
|
||||
// Log the error and include the psql output in the returned error to aid debugging
|
||||
e.log.Warn("Database creation failed", "name", dbName, "error", err, "output", string(output))
|
||||
return fmt.Errorf("failed to create database '%s': %w", dbName, err)
|
||||
return fmt.Errorf("failed to create database '%s': %w (output: %s)", dbName, err, strings.TrimSpace(string(output)))
|
||||
}
|
||||
|
||||
e.log.Info("Successfully created database", "name", dbName)
|
||||
|
||||
Reference in New Issue
Block a user