feat: add database migration between servers

- Add 'migrate cluster' command for full cluster migration
- Add 'migrate single' command for single database migration
- Support PostgreSQL and MySQL database migration
- Staged migration: backup from source → restore to target
- Pre-flight checks validate connectivity before execution
- Dry-run mode by default (--confirm to execute)
- Support for --clean, --keep-backup, --exclude options
- Parallel backup/restore with configurable jobs
- Automatic cleanup of temporary backup files
This commit is contained in:
2025-12-13 18:25:28 +01:00
parent 1ccfdbcf52
commit 2becde8077
2 changed files with 1019 additions and 0 deletions

450
cmd/migrate.go Normal file
View File

@@ -0,0 +1,450 @@
package cmd
import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"dbbackup/internal/config"
"dbbackup/internal/migrate"
"github.com/spf13/cobra"
)
var (
// Source connection flags
migrateSourceHost string
migrateSourcePort int
migrateSourceUser string
migrateSourcePassword string
migrateSourceSSLMode string
// Target connection flags
migrateTargetHost string
migrateTargetPort int
migrateTargetUser string
migrateTargetPassword string
migrateTargetDatabase string
migrateTargetSSLMode string
// Migration options
migrateWorkdir string
migrateClean bool
migrateConfirm bool
migrateDryRun bool
migrateKeepBackup bool
migrateJobs int
migrateVerbose bool
migrateExclude []string
)
// migrateCmd represents the migrate command
var migrateCmd = &cobra.Command{
Use: "migrate",
Short: "Migrate databases between servers",
Long: `Migrate databases from one server to another.
This command performs a staged migration:
1. Creates a backup from the source server
2. Stores backup in a working directory
3. Restores the backup to the target server
4. Cleans up temporary files (unless --keep-backup)
Supports PostgreSQL and MySQL cluster migration or single database migration.
Examples:
# Migrate entire PostgreSQL cluster
dbbackup migrate cluster \
--source-host old-server --source-port 5432 --source-user postgres \
--target-host new-server --target-port 5432 --target-user postgres \
--confirm
# Migrate single database
dbbackup migrate single mydb \
--source-host old-server --source-user postgres \
--target-host new-server --target-user postgres \
--confirm
# Dry-run to preview migration
dbbackup migrate cluster \
--source-host old-server \
--target-host new-server \
--dry-run
`,
Run: func(cmd *cobra.Command, args []string) {
cmd.Help()
},
}
// migrateClusterCmd migrates an entire database cluster
var migrateClusterCmd = &cobra.Command{
Use: "cluster",
Short: "Migrate entire database cluster to target server",
Long: `Migrate all databases from source cluster to target server.
This command:
1. Connects to source server and lists all databases
2. Creates individual backups of each database
3. Restores each database to target server
4. Optionally cleans up backup files after successful migration
Requirements:
- Database client tools (pg_dump/pg_restore or mysqldump/mysql)
- Network access to both source and target servers
- Sufficient disk space in working directory for backups
Safety features:
- Dry-run mode by default (use --confirm to execute)
- Pre-flight checks on both servers
- Optional backup retention after migration
Examples:
# Preview migration
dbbackup migrate cluster \
--source-host old-server \
--target-host new-server
# Execute migration with cleanup of existing databases
dbbackup migrate cluster \
--source-host old-server --source-user postgres \
--target-host new-server --target-user postgres \
--clean --confirm
# Exclude specific databases
dbbackup migrate cluster \
--source-host old-server \
--target-host new-server \
--exclude template0,template1 \
--confirm
`,
RunE: runMigrateCluster,
}
// migrateSingleCmd migrates a single database
var migrateSingleCmd = &cobra.Command{
Use: "single [database-name]",
Short: "Migrate single database to target server",
Long: `Migrate a single database from source server to target server.
Examples:
# Migrate database to same name on target
dbbackup migrate single myapp_db \
--source-host old-server \
--target-host new-server \
--confirm
# Migrate to different database name
dbbackup migrate single myapp_db \
--source-host old-server \
--target-host new-server \
--target-database myapp_db_new \
--confirm
`,
Args: cobra.ExactArgs(1),
RunE: runMigrateSingle,
}
func init() {
// Add migrate command to root
rootCmd.AddCommand(migrateCmd)
// Add subcommands
migrateCmd.AddCommand(migrateClusterCmd)
migrateCmd.AddCommand(migrateSingleCmd)
// Source connection flags
migrateCmd.PersistentFlags().StringVar(&migrateSourceHost, "source-host", "localhost", "Source database host")
migrateCmd.PersistentFlags().IntVar(&migrateSourcePort, "source-port", 5432, "Source database port")
migrateCmd.PersistentFlags().StringVar(&migrateSourceUser, "source-user", "", "Source database user")
migrateCmd.PersistentFlags().StringVar(&migrateSourcePassword, "source-password", "", "Source database password")
migrateCmd.PersistentFlags().StringVar(&migrateSourceSSLMode, "source-ssl-mode", "prefer", "Source SSL mode (disable, prefer, require)")
// Target connection flags
migrateCmd.PersistentFlags().StringVar(&migrateTargetHost, "target-host", "", "Target database host (required)")
migrateCmd.PersistentFlags().IntVar(&migrateTargetPort, "target-port", 5432, "Target database port")
migrateCmd.PersistentFlags().StringVar(&migrateTargetUser, "target-user", "", "Target database user (default: same as source)")
migrateCmd.PersistentFlags().StringVar(&migrateTargetPassword, "target-password", "", "Target database password")
migrateCmd.PersistentFlags().StringVar(&migrateTargetSSLMode, "target-ssl-mode", "prefer", "Target SSL mode (disable, prefer, require)")
// Single database specific flags
migrateSingleCmd.Flags().StringVar(&migrateTargetDatabase, "target-database", "", "Target database name (default: same as source)")
// Cluster specific flags
migrateClusterCmd.Flags().StringSliceVar(&migrateExclude, "exclude", []string{}, "Databases to exclude from migration")
// Migration options
migrateCmd.PersistentFlags().StringVar(&migrateWorkdir, "workdir", "", "Working directory for backup files (default: system temp)")
migrateCmd.PersistentFlags().BoolVar(&migrateClean, "clean", false, "Drop existing databases on target before restore")
migrateCmd.PersistentFlags().BoolVar(&migrateConfirm, "confirm", false, "Confirm and execute migration (default: dry-run)")
migrateCmd.PersistentFlags().BoolVar(&migrateDryRun, "dry-run", false, "Preview migration without executing")
migrateCmd.PersistentFlags().BoolVar(&migrateKeepBackup, "keep-backup", false, "Keep backup files after successful migration")
migrateCmd.PersistentFlags().IntVar(&migrateJobs, "jobs", 4, "Parallel jobs for backup/restore")
migrateCmd.PersistentFlags().BoolVar(&migrateVerbose, "verbose", false, "Verbose output")
// Mark required flags
migrateCmd.MarkPersistentFlagRequired("target-host")
}
func runMigrateCluster(cmd *cobra.Command, args []string) error {
// Validate target host
if migrateTargetHost == "" {
return fmt.Errorf("--target-host is required")
}
// Set defaults
if migrateSourceUser == "" {
migrateSourceUser = os.Getenv("USER")
}
if migrateTargetUser == "" {
migrateTargetUser = migrateSourceUser
}
workdir := migrateWorkdir
if workdir == "" {
workdir = filepath.Join(os.TempDir(), "dbbackup-migrate")
}
// Create working directory
if err := os.MkdirAll(workdir, 0755); err != nil {
return fmt.Errorf("failed to create working directory: %w", err)
}
// Create source config
sourceCfg := config.New()
sourceCfg.Host = migrateSourceHost
sourceCfg.Port = migrateSourcePort
sourceCfg.User = migrateSourceUser
sourceCfg.Password = migrateSourcePassword
sourceCfg.SSLMode = migrateSourceSSLMode
sourceCfg.Database = "postgres" // Default connection database
sourceCfg.DatabaseType = cfg.DatabaseType
sourceCfg.BackupDir = workdir
sourceCfg.DumpJobs = migrateJobs
// Create target config
targetCfg := config.New()
targetCfg.Host = migrateTargetHost
targetCfg.Port = migrateTargetPort
targetCfg.User = migrateTargetUser
targetCfg.Password = migrateTargetPassword
targetCfg.SSLMode = migrateTargetSSLMode
targetCfg.Database = "postgres"
targetCfg.DatabaseType = cfg.DatabaseType
targetCfg.BackupDir = workdir
// Create migration engine
engine, err := migrate.NewEngine(sourceCfg, targetCfg, log)
if err != nil {
return fmt.Errorf("failed to create migration engine: %w", err)
}
defer engine.Close()
// Configure engine
engine.SetWorkDir(workdir)
engine.SetKeepBackup(migrateKeepBackup)
engine.SetJobs(migrateJobs)
engine.SetDryRun(migrateDryRun || !migrateConfirm)
engine.SetVerbose(migrateVerbose)
engine.SetCleanTarget(migrateClean)
// Setup context with cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle interrupt signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
log.Warn("Received interrupt signal, cancelling migration...")
cancel()
}()
// Connect to databases
if err := engine.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect: %w", err)
}
// Print migration plan
fmt.Println()
fmt.Println("=== Cluster Migration Plan ===")
fmt.Println()
fmt.Printf("Source: %s@%s:%d\n", migrateSourceUser, migrateSourceHost, migrateSourcePort)
fmt.Printf("Target: %s@%s:%d\n", migrateTargetUser, migrateTargetHost, migrateTargetPort)
fmt.Printf("Database Type: %s\n", cfg.DatabaseType)
fmt.Printf("Working Directory: %s\n", workdir)
fmt.Printf("Clean Target: %v\n", migrateClean)
fmt.Printf("Keep Backup: %v\n", migrateKeepBackup)
fmt.Printf("Parallel Jobs: %d\n", migrateJobs)
if len(migrateExclude) > 0 {
fmt.Printf("Excluded: %v\n", migrateExclude)
}
fmt.Println()
isDryRun := migrateDryRun || !migrateConfirm
if isDryRun {
fmt.Println("Mode: DRY-RUN (use --confirm to execute)")
fmt.Println()
return engine.PreflightCheck(ctx)
}
fmt.Println("Mode: EXECUTE")
fmt.Println()
// Execute migration
startTime := time.Now()
result, err := engine.MigrateCluster(ctx, migrateExclude)
duration := time.Since(startTime)
if err != nil {
log.Error("Migration failed", "error", err, "duration", duration)
return fmt.Errorf("migration failed: %w", err)
}
// Print results
fmt.Println()
fmt.Println("=== Migration Complete ===")
fmt.Println()
fmt.Printf("Duration: %s\n", duration.Round(time.Second))
fmt.Printf("Databases Migrated: %d\n", result.DatabaseCount)
if result.BackupPath != "" && migrateKeepBackup {
fmt.Printf("Backup Location: %s\n", result.BackupPath)
}
fmt.Println()
return nil
}
func runMigrateSingle(cmd *cobra.Command, args []string) error {
dbName := args[0]
// Validate target host
if migrateTargetHost == "" {
return fmt.Errorf("--target-host is required")
}
// Set defaults
if migrateSourceUser == "" {
migrateSourceUser = os.Getenv("USER")
}
if migrateTargetUser == "" {
migrateTargetUser = migrateSourceUser
}
targetDB := migrateTargetDatabase
if targetDB == "" {
targetDB = dbName
}
workdir := migrateWorkdir
if workdir == "" {
workdir = filepath.Join(os.TempDir(), "dbbackup-migrate")
}
// Create working directory
if err := os.MkdirAll(workdir, 0755); err != nil {
return fmt.Errorf("failed to create working directory: %w", err)
}
// Create source config
sourceCfg := config.New()
sourceCfg.Host = migrateSourceHost
sourceCfg.Port = migrateSourcePort
sourceCfg.User = migrateSourceUser
sourceCfg.Password = migrateSourcePassword
sourceCfg.SSLMode = migrateSourceSSLMode
sourceCfg.Database = dbName
sourceCfg.DatabaseType = cfg.DatabaseType
sourceCfg.BackupDir = workdir
sourceCfg.DumpJobs = migrateJobs
// Create target config
targetCfg := config.New()
targetCfg.Host = migrateTargetHost
targetCfg.Port = migrateTargetPort
targetCfg.User = migrateTargetUser
targetCfg.Password = migrateTargetPassword
targetCfg.SSLMode = migrateTargetSSLMode
targetCfg.Database = targetDB
targetCfg.DatabaseType = cfg.DatabaseType
targetCfg.BackupDir = workdir
// Create migration engine
engine, err := migrate.NewEngine(sourceCfg, targetCfg, log)
if err != nil {
return fmt.Errorf("failed to create migration engine: %w", err)
}
defer engine.Close()
// Configure engine
engine.SetWorkDir(workdir)
engine.SetKeepBackup(migrateKeepBackup)
engine.SetJobs(migrateJobs)
engine.SetDryRun(migrateDryRun || !migrateConfirm)
engine.SetVerbose(migrateVerbose)
engine.SetCleanTarget(migrateClean)
// Setup context with cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle interrupt signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
log.Warn("Received interrupt signal, cancelling migration...")
cancel()
}()
// Connect to databases
if err := engine.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect: %w", err)
}
// Print migration plan
fmt.Println()
fmt.Println("=== Single Database Migration Plan ===")
fmt.Println()
fmt.Printf("Source: %s@%s:%d/%s\n", migrateSourceUser, migrateSourceHost, migrateSourcePort, dbName)
fmt.Printf("Target: %s@%s:%d/%s\n", migrateTargetUser, migrateTargetHost, migrateTargetPort, targetDB)
fmt.Printf("Database Type: %s\n", cfg.DatabaseType)
fmt.Printf("Working Directory: %s\n", workdir)
fmt.Printf("Clean Target: %v\n", migrateClean)
fmt.Printf("Keep Backup: %v\n", migrateKeepBackup)
fmt.Println()
isDryRun := migrateDryRun || !migrateConfirm
if isDryRun {
fmt.Println("Mode: DRY-RUN (use --confirm to execute)")
fmt.Println()
return engine.PreflightCheck(ctx)
}
fmt.Println("Mode: EXECUTE")
fmt.Println()
// Execute migration
startTime := time.Now()
err = engine.MigrateSingle(ctx, dbName, targetDB)
duration := time.Since(startTime)
if err != nil {
log.Error("Migration failed", "error", err, "duration", duration)
return fmt.Errorf("migration failed: %w", err)
}
// Print results
fmt.Println()
fmt.Println("=== Migration Complete ===")
fmt.Println()
fmt.Printf("Duration: %s\n", duration.Round(time.Second))
fmt.Printf("Database: %s -> %s\n", dbName, targetDB)
fmt.Println()
return nil
}

569
internal/migrate/engine.go Normal file
View File

@@ -0,0 +1,569 @@
package migrate
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"time"
"dbbackup/internal/config"
"dbbackup/internal/database"
"dbbackup/internal/logger"
"dbbackup/internal/progress"
)
// ClusterOptions holds configuration for cluster migration
type ClusterOptions struct {
// Source connection
SourceHost string
SourcePort int
SourceUser string
SourcePassword string
SourceSSLMode string
// Target connection
TargetHost string
TargetPort int
TargetUser string
TargetPassword string
TargetSSLMode string
// Migration options
WorkDir string
CleanTarget bool
KeepBackup bool
Jobs int
CompressionLevel int
Verbose bool
DryRun bool
DatabaseType string
ExcludeDBs []string
}
// SingleOptions holds configuration for single database migration
type SingleOptions struct {
// Source connection
SourceHost string
SourcePort int
SourceUser string
SourcePassword string
SourceDatabase string
SourceSSLMode string
// Target connection
TargetHost string
TargetPort int
TargetUser string
TargetPassword string
TargetDatabase string
TargetSSLMode string
// Migration options
WorkDir string
CleanTarget bool
KeepBackup bool
Jobs int
CompressionLevel int
Verbose bool
DryRun bool
DatabaseType string
}
// Result holds the outcome of a migration
type Result struct {
DatabaseCount int
TotalBytes int64
BackupPath string
Duration time.Duration
Databases []string
}
// Engine handles database migration between servers
type Engine struct {
sourceCfg *config.Config
targetCfg *config.Config
sourceDB database.Database
targetDB database.Database
log logger.Logger
progress progress.Indicator
workDir string
keepBackup bool
jobs int
dryRun bool
verbose bool
cleanTarget bool
}
// NewEngine creates a new migration engine
func NewEngine(sourceCfg, targetCfg *config.Config, log logger.Logger) (*Engine, error) {
// Create source database connection
sourceDB, err := database.New(sourceCfg, log)
if err != nil {
return nil, fmt.Errorf("failed to create source database connection: %w", err)
}
// Create target database connection
targetDB, err := database.New(targetCfg, log)
if err != nil {
return nil, fmt.Errorf("failed to create target database connection: %w", err)
}
return &Engine{
sourceCfg: sourceCfg,
targetCfg: targetCfg,
sourceDB: sourceDB,
targetDB: targetDB,
log: log,
progress: progress.NewSpinner(),
workDir: os.TempDir(),
keepBackup: false,
jobs: 4,
dryRun: false,
verbose: false,
cleanTarget: false,
}, nil
}
// SetWorkDir sets the working directory for backup files
func (e *Engine) SetWorkDir(dir string) {
e.workDir = dir
}
// SetKeepBackup sets whether to keep backup files after migration
func (e *Engine) SetKeepBackup(keep bool) {
e.keepBackup = keep
}
// SetJobs sets the number of parallel jobs for backup/restore
func (e *Engine) SetJobs(jobs int) {
e.jobs = jobs
}
// SetDryRun sets whether to perform a dry run (no actual changes)
func (e *Engine) SetDryRun(dryRun bool) {
e.dryRun = dryRun
}
// SetVerbose sets verbose output mode
func (e *Engine) SetVerbose(verbose bool) {
e.verbose = verbose
}
// SetCleanTarget sets whether to clean target before restore
func (e *Engine) SetCleanTarget(clean bool) {
e.cleanTarget = clean
}
// Connect establishes connections to both source and target databases
func (e *Engine) Connect(ctx context.Context) error {
if err := e.sourceDB.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to source database: %w", err)
}
if err := e.targetDB.Connect(ctx); err != nil {
e.sourceDB.Close()
return fmt.Errorf("failed to connect to target database: %w", err)
}
return nil
}
// Close closes connections to both databases
func (e *Engine) Close() error {
var errs []error
if e.sourceDB != nil {
if err := e.sourceDB.Close(); err != nil {
errs = append(errs, fmt.Errorf("source close error: %w", err))
}
}
if e.targetDB != nil {
if err := e.targetDB.Close(); err != nil {
errs = append(errs, fmt.Errorf("target close error: %w", err))
}
}
if len(errs) > 0 {
return fmt.Errorf("close errors: %v", errs)
}
return nil
}
// PreflightCheck validates both source and target connections
func (e *Engine) PreflightCheck(ctx context.Context) error {
e.log.Info("Running preflight checks...")
// Create working directory
if err := os.MkdirAll(e.workDir, 0755); err != nil {
return fmt.Errorf("failed to create working directory: %w", err)
}
// Check source connection
e.log.Info("Checking source connection", "host", e.sourceCfg.Host, "port", e.sourceCfg.Port)
if err := e.sourceDB.Ping(ctx); err != nil {
return fmt.Errorf("source connection failed: %w", err)
}
fmt.Printf(" [OK] Source connection: %s:%d\n", e.sourceCfg.Host, e.sourceCfg.Port)
// Get source version
version, err := e.sourceDB.GetVersion(ctx)
if err != nil {
e.log.Warn("Could not get source version", "error", err)
} else {
fmt.Printf(" [OK] Source version: %s\n", version)
}
// List source databases
databases, err := e.sourceDB.ListDatabases(ctx)
if err != nil {
return fmt.Errorf("failed to list source databases: %w", err)
}
fmt.Printf(" [OK] Source databases: %d found\n", len(databases))
for _, db := range databases {
fmt.Printf(" - %s\n", db)
}
// Check target connection
e.log.Info("Checking target connection", "host", e.targetCfg.Host, "port", e.targetCfg.Port)
if err := e.targetDB.Ping(ctx); err != nil {
return fmt.Errorf("target connection failed: %w", err)
}
fmt.Printf(" [OK] Target connection: %s:%d\n", e.targetCfg.Host, e.targetCfg.Port)
// Get target version
targetVersion, err := e.targetDB.GetVersion(ctx)
if err != nil {
e.log.Warn("Could not get target version", "error", err)
} else {
fmt.Printf(" [OK] Target version: %s\n", targetVersion)
}
// List target databases
targetDatabases, err := e.targetDB.ListDatabases(ctx)
if err != nil {
e.log.Warn("Could not list target databases", "error", err)
} else {
fmt.Printf(" [OK] Target databases: %d existing\n", len(targetDatabases))
if e.cleanTarget && len(targetDatabases) > 0 {
fmt.Println(" [WARN] Clean mode: existing databases will be dropped")
}
}
// Check disk space in working directory
fmt.Printf(" [OK] Working directory: %s\n", e.workDir)
fmt.Println()
fmt.Println("Preflight checks passed. Use --confirm to execute migration.")
return nil
}
// MigrateSingle migrates a single database from source to target
func (e *Engine) MigrateSingle(ctx context.Context, databaseName, targetName string) error {
if targetName == "" {
targetName = databaseName
}
operation := e.log.StartOperation("Single Database Migration")
e.log.Info("Starting single database migration",
"source_db", databaseName,
"target_db", targetName,
"source_host", e.sourceCfg.Host,
"target_host", e.targetCfg.Host)
if e.dryRun {
e.log.Info("DRY RUN: Would migrate database",
"source", databaseName,
"target", targetName)
fmt.Printf("DRY RUN: Would migrate '%s' -> '%s'\n", databaseName, targetName)
return nil
}
// Phase 1: Backup from source
e.progress.Start(fmt.Sprintf("Backing up '%s' from source server", databaseName))
fmt.Printf("Phase 1: Backing up database '%s'...\n", databaseName)
backupFile, err := e.backupDatabase(ctx, databaseName)
if err != nil {
e.progress.Fail(fmt.Sprintf("Backup failed: %v", err))
operation.Fail("Backup phase failed")
return fmt.Errorf("backup phase failed: %w", err)
}
e.progress.Complete(fmt.Sprintf("Backup completed: %s", filepath.Base(backupFile)))
// Get backup size
var backupSize int64
if fi, err := os.Stat(backupFile); err == nil {
backupSize = fi.Size()
}
fmt.Printf(" Backup created: %s (%s)\n", backupFile, formatBytes(backupSize))
// Cleanup backup file after migration (unless keepBackup is set)
if !e.keepBackup {
defer func() {
if err := os.Remove(backupFile); err != nil {
e.log.Warn("Failed to cleanup backup file", "file", backupFile, "error", err)
} else {
fmt.Println(" Backup file removed")
}
}()
}
// Phase 2: Restore to target
e.progress.Start(fmt.Sprintf("Restoring '%s' to target server", targetName))
fmt.Printf("Phase 2: Restoring to database '%s'...\n", targetName)
if err := e.restoreDatabase(ctx, backupFile, targetName); err != nil {
e.progress.Fail(fmt.Sprintf("Restore failed: %v", err))
operation.Fail("Restore phase failed")
return fmt.Errorf("restore phase failed: %w", err)
}
e.progress.Complete(fmt.Sprintf("Migration completed: %s -> %s", databaseName, targetName))
fmt.Printf(" Database '%s' restored successfully\n", targetName)
operation.Complete(fmt.Sprintf("Migrated '%s' to '%s'", databaseName, targetName))
return nil
}
// MigrateCluster migrates all databases from source to target cluster
func (e *Engine) MigrateCluster(ctx context.Context, excludeDBs []string) (*Result, error) {
result := &Result{}
startTime := time.Now()
operation := e.log.StartOperation("Cluster Migration")
e.log.Info("Starting cluster migration",
"source_host", e.sourceCfg.Host,
"target_host", e.targetCfg.Host,
"excluded_dbs", excludeDBs)
// List all databases from source
databases, err := e.sourceDB.ListDatabases(ctx)
if err != nil {
operation.Fail("Failed to list source databases")
return nil, fmt.Errorf("failed to list source databases: %w", err)
}
// Filter out excluded databases
excludeMap := make(map[string]bool)
for _, db := range excludeDBs {
excludeMap[db] = true
}
var toMigrate []string
for _, db := range databases {
if !excludeMap[db] {
toMigrate = append(toMigrate, db)
}
}
e.log.Info("Databases to migrate", "count", len(toMigrate), "databases", toMigrate)
fmt.Printf("Found %d databases to migrate\n", len(toMigrate))
if e.dryRun {
e.log.Info("DRY RUN: Would migrate databases", "databases", toMigrate)
fmt.Println("DRY RUN: Would migrate the following databases:")
for _, db := range toMigrate {
fmt.Printf(" - %s\n", db)
}
result.Databases = toMigrate
result.DatabaseCount = len(toMigrate)
return result, nil
}
// Migrate each database
var failed []string
var migrated []string
for i, db := range toMigrate {
fmt.Printf("\n[%d/%d] Migrating database: %s\n", i+1, len(toMigrate), db)
e.log.Info("Migrating database", "index", i+1, "total", len(toMigrate), "database", db)
if err := e.MigrateSingle(ctx, db, db); err != nil {
e.log.Error("Failed to migrate database", "database", db, "error", err)
failed = append(failed, db)
// Continue with other databases
} else {
migrated = append(migrated, db)
}
}
result.Databases = migrated
result.DatabaseCount = len(migrated)
result.Duration = time.Since(startTime)
fmt.Printf("\nCluster migration completed in %v\n", result.Duration.Round(time.Second))
fmt.Printf(" Migrated: %d databases\n", len(migrated))
if len(failed) > 0 {
fmt.Printf(" Failed: %d databases (%v)\n", len(failed), failed)
operation.Fail(fmt.Sprintf("Migration completed with %d failures", len(failed)))
return result, fmt.Errorf("failed to migrate %d databases: %v", len(failed), failed)
}
operation.Complete(fmt.Sprintf("Cluster migration completed: %d databases", len(toMigrate)))
return result, nil
}
// backupDatabase creates a backup of the specified database from the source server
func (e *Engine) backupDatabase(ctx context.Context, databaseName string) (string, error) {
// Generate backup filename
timestamp := time.Now().Format("20060102_150405")
var outputFile string
if e.sourceCfg.IsPostgreSQL() {
outputFile = filepath.Join(e.workDir, fmt.Sprintf("migrate_%s_%s.dump", databaseName, timestamp))
} else {
outputFile = filepath.Join(e.workDir, fmt.Sprintf("migrate_%s_%s.sql.gz", databaseName, timestamp))
}
// Build backup command using database interface
options := database.BackupOptions{
Compression: 6,
Parallel: e.jobs,
Format: "custom",
Blobs: true,
}
cmdArgs := e.sourceDB.BuildBackupCommand(databaseName, outputFile, options)
if len(cmdArgs) == 0 {
return "", fmt.Errorf("failed to build backup command")
}
// Execute backup command
cmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...)
cmd.Env = e.buildSourceEnv()
if e.verbose {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
output, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("backup command failed: %w, output: %s", err, string(output))
}
// Verify backup file exists
if _, err := os.Stat(outputFile); err != nil {
return "", fmt.Errorf("backup file not created: %w", err)
}
return outputFile, nil
}
// restoreDatabase restores a backup file to the target server
func (e *Engine) restoreDatabase(ctx context.Context, backupFile, targetDB string) error {
// Ensure target database exists
exists, err := e.targetDB.DatabaseExists(ctx, targetDB)
if err != nil {
return fmt.Errorf("failed to check target database: %w", err)
}
if !exists {
e.log.Info("Creating target database", "database", targetDB)
if err := e.targetDB.CreateDatabase(ctx, targetDB); err != nil {
return fmt.Errorf("failed to create target database: %w", err)
}
} else if e.cleanTarget {
e.log.Info("Dropping and recreating target database", "database", targetDB)
if err := e.targetDB.DropDatabase(ctx, targetDB); err != nil {
e.log.Warn("Failed to drop target database", "database", targetDB, "error", err)
}
if err := e.targetDB.CreateDatabase(ctx, targetDB); err != nil {
return fmt.Errorf("failed to create target database: %w", err)
}
}
// Build restore command
options := database.RestoreOptions{
Parallel: e.jobs,
Clean: e.cleanTarget,
IfExists: true,
SingleTransaction: false,
Verbose: e.verbose,
}
cmdArgs := e.targetDB.BuildRestoreCommand(targetDB, backupFile, options)
if len(cmdArgs) == 0 {
return fmt.Errorf("failed to build restore command")
}
// Execute restore command
cmd := exec.CommandContext(ctx, cmdArgs[0], cmdArgs[1:]...)
cmd.Env = e.buildTargetEnv()
if e.verbose {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("restore command failed: %w, output: %s", err, string(output))
}
return nil
}
// buildSourceEnv builds environment variables for source database commands
func (e *Engine) buildSourceEnv() []string {
env := os.Environ()
if e.sourceCfg.IsPostgreSQL() {
env = append(env,
fmt.Sprintf("PGHOST=%s", e.sourceCfg.Host),
fmt.Sprintf("PGPORT=%d", e.sourceCfg.Port),
fmt.Sprintf("PGUSER=%s", e.sourceCfg.User),
fmt.Sprintf("PGPASSWORD=%s", e.sourceCfg.Password),
)
if e.sourceCfg.SSLMode != "" {
env = append(env, fmt.Sprintf("PGSSLMODE=%s", e.sourceCfg.SSLMode))
}
} else if e.sourceCfg.IsMySQL() {
env = append(env,
fmt.Sprintf("MYSQL_HOST=%s", e.sourceCfg.Host),
fmt.Sprintf("MYSQL_TCP_PORT=%d", e.sourceCfg.Port),
fmt.Sprintf("MYSQL_PWD=%s", e.sourceCfg.Password),
)
}
return env
}
// buildTargetEnv builds environment variables for target database commands
func (e *Engine) buildTargetEnv() []string {
env := os.Environ()
if e.targetCfg.IsPostgreSQL() {
env = append(env,
fmt.Sprintf("PGHOST=%s", e.targetCfg.Host),
fmt.Sprintf("PGPORT=%d", e.targetCfg.Port),
fmt.Sprintf("PGUSER=%s", e.targetCfg.User),
fmt.Sprintf("PGPASSWORD=%s", e.targetCfg.Password),
)
if e.targetCfg.SSLMode != "" {
env = append(env, fmt.Sprintf("PGSSLMODE=%s", e.targetCfg.SSLMode))
}
} else if e.targetCfg.IsMySQL() {
env = append(env,
fmt.Sprintf("MYSQL_HOST=%s", e.targetCfg.Host),
fmt.Sprintf("MYSQL_TCP_PORT=%d", e.targetCfg.Port),
fmt.Sprintf("MYSQL_PWD=%s", e.targetCfg.Password),
)
}
return env
}
// formatBytes formats bytes as human-readable string
func formatBytes(bytes int64) string {
const unit = 1024
if bytes < unit {
return fmt.Sprintf("%d B", bytes)
}
div, exp := int64(unit), 0
for n := bytes / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
}