Compare commits

3 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 97c52ab9e5 | |
| | 3c9e5f04ca | |
| | 86a28b6ec5 | |
@@ -11,6 +11,7 @@ import (
     "dbbackup/internal/database"
     "dbbackup/internal/engine/native"
+    "dbbackup/internal/metadata"
     "dbbackup/internal/notify"

     "github.com/klauspost/pgzip"
@@ -163,6 +164,54 @@ func runNativeBackup(ctx context.Context, db database.Database, databaseName, ba
         "duration", backupDuration,
         "engine", result.EngineUsed)

+    // Get actual file size from disk
+    fileInfo, err := os.Stat(outputFile)
+    var actualSize int64
+    if err == nil {
+        actualSize = fileInfo.Size()
+    } else {
+        actualSize = result.BytesProcessed
+    }
+
+    // Calculate SHA256 checksum
+    sha256sum, err := metadata.CalculateSHA256(outputFile)
+    if err != nil {
+        log.Warn("Failed to calculate SHA256", "error", err)
+        sha256sum = ""
+    }
+
+    // Create and save metadata file
+    meta := &metadata.BackupMetadata{
+        Version:      "1.0",
+        Timestamp:    backupStartTime,
+        Database:     databaseName,
+        DatabaseType: dbType,
+        Host:         cfg.Host,
+        Port:         cfg.Port,
+        User:         cfg.User,
+        BackupFile:   filepath.Base(outputFile),
+        SizeBytes:    actualSize,
+        SHA256:       sha256sum,
+        Compression:  "gzip",
+        BackupType:   backupType,
+        Duration:     backupDuration.Seconds(),
+        ExtraInfo: map[string]string{
+            "engine":            result.EngineUsed,
+            "objects_processed": fmt.Sprintf("%d", result.ObjectsProcessed),
+        },
+    }
+
+    if cfg.CompressionLevel == 0 {
+        meta.Compression = "none"
+    }
+
+    metaPath := outputFile + ".meta.json"
+    if err := metadata.Save(metaPath, meta); err != nil {
+        log.Warn("Failed to save metadata", "error", err)
+    } else {
+        log.Debug("Metadata saved", "path", metaPath)
+    }
+
     // Audit log: backup completed
     auditLogger.LogBackupComplete(user, databaseName, cfg.BackupDir, result.BytesProcessed)

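The hunk above writes a `.meta.json` sidecar next to each backup, recording the archive's size and SHA256 checksum. Below is a minimal sketch of how a consumer could verify a backup against such a sidecar using only the Go standard library; the `backupMeta` struct, its JSON field names, and the `verifyBackup` helper are illustrative assumptions, not the repo's `metadata` package.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"os"
)

// backupMeta mirrors a subset of the metadata written above; the JSON tags are assumed.
type backupMeta struct {
	BackupFile string `json:"backup_file"`
	SizeBytes  int64  `json:"size_bytes"`
	SHA256     string `json:"sha256"`
}

// verifyBackup re-hashes the backup file and compares it to the sidecar.
func verifyBackup(backupPath string) error {
	raw, err := os.ReadFile(backupPath + ".meta.json")
	if err != nil {
		return fmt.Errorf("read metadata: %w", err)
	}
	var meta backupMeta
	if err := json.Unmarshal(raw, &meta); err != nil {
		return fmt.Errorf("parse metadata: %w", err)
	}

	f, err := os.Open(backupPath)
	if err != nil {
		return fmt.Errorf("open backup: %w", err)
	}
	defer f.Close()

	h := sha256.New()
	if _, err := io.Copy(h, f); err != nil {
		return fmt.Errorf("hash backup: %w", err)
	}
	sum := hex.EncodeToString(h.Sum(nil))

	if meta.SHA256 != "" && sum != meta.SHA256 {
		return fmt.Errorf("checksum mismatch: got %s, want %s", sum, meta.SHA256)
	}
	return nil
}

func main() {
	if err := verifyBackup("mydb.sql.gz"); err != nil { // example path
		fmt.Fprintln(os.Stderr, "verification failed:", err)
		os.Exit(1)
	}
	fmt.Println("backup verified")
}
```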
@@ -70,11 +70,11 @@ func (p *PostgreSQL) Connect(ctx context.Context) error {
             maxConns = 5 // minimum pool size
         }
     }
-    config.MaxConns = maxConns                 // Max concurrent connections based on --jobs
-    config.MinConns = 2                        // Keep minimum connections ready
-    config.MaxConnLifetime = 0                 // No limit on connection lifetime
-    config.MaxConnIdleTime = 0                 // No idle timeout
-    config.HealthCheckPeriod = 1 * time.Minute // Health check every minute
+    config.MaxConns = maxConns                 // Max concurrent connections based on --jobs
+    config.MinConns = 2                        // Keep minimum connections ready
+    config.MaxConnLifetime = 0                 // No limit on connection lifetime
+    config.MaxConnIdleTime = 0                 // No idle timeout
+    config.HealthCheckPeriod = 5 * time.Second // Faster health check for quicker shutdown on Ctrl+C

     // Optimize for large query results (BLOB data)
     config.ConnConfig.RuntimeParams["work_mem"] = "64MB"
@@ -97,6 +97,16 @@ func (p *PostgreSQL) Connect(ctx context.Context) error {

     p.pool = pool
     p.db = db
+
+    // CRITICAL: Start a goroutine to close the pool when context is cancelled
+    // This ensures the background health check goroutine is stopped on Ctrl+C
+    go func() {
+        <-ctx.Done()
+        if p.pool != nil {
+            p.pool.Close()
+        }
+    }()
+
     p.log.Info("Connected to PostgreSQL successfully", "driver", "pgx", "max_conns", config.MaxConns)
     return nil
 }
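The two `Connect` hunks above shorten the pool's health-check period and tie the pool's lifetime to the caller's context, so a Ctrl+C tears down pgxpool's background health-check goroutine instead of letting it delay shutdown. Here is a stripped-down sketch of the same pattern with pgx v5; the DSN and pool sizes are placeholders, and the real code keeps additional tuning such as `work_mem`.

```go
package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	// Cancel the context on Ctrl+C (SIGINT) or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	cfg, err := pgxpool.ParseConfig("postgres://user:pass@localhost:5432/db") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	cfg.MaxConns = 8 // placeholder; the real value follows --jobs
	cfg.MinConns = 2
	cfg.HealthCheckPeriod = 5 * time.Second // short period so shutdown is not delayed

	pool, err := pgxpool.NewWithConfig(ctx, cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Close the pool as soon as the context is cancelled; this also stops
	// pgxpool's background health-check goroutine.
	go func() {
		<-ctx.Done()
		pool.Close()
	}()

	// ... run backup/restore work against pool here ...

	pool.Close() // normal shutdown path
}
```

The overlap between the two shutdown paths (the signal goroutine and the normal `Close`) should be benign since pgxpool guards `Pool.Close` internally, but that is worth confirming against the pinned pgx version.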
@@ -28,6 +28,9 @@ type ParallelRestoreEngine struct {

     // Configuration
     parallelWorkers int
+
+    // Internal cancel channel to stop the pool cleanup goroutine
+    closeCh chan struct{}
 }

 // ParallelRestoreOptions configures parallel restore behavior
@@ -101,18 +104,40 @@ func NewParallelRestoreEngineWithContext(ctx context.Context, config *PostgreSQL
     poolConfig.MaxConns = int32(workers + 2)
     poolConfig.MinConns = int32(workers)

+    // CRITICAL: Reduce health check period to allow faster shutdown
+    // Default is 1 minute which causes hangs on Ctrl+C
+    poolConfig.HealthCheckPeriod = 5 * time.Second
+
     // Use the provided context so pool health checks stop when context is cancelled
     pool, err := pgxpool.NewWithConfig(ctx, poolConfig)
     if err != nil {
         return nil, fmt.Errorf("failed to create connection pool: %w", err)
     }

-    return &ParallelRestoreEngine{
+    closeCh := make(chan struct{})
+
+    engine := &ParallelRestoreEngine{
         config:          config,
         pool:            pool,
         log:             log,
         parallelWorkers: workers,
-    }, nil
+        closeCh:         closeCh,
+    }
+
+    // CRITICAL: Start a goroutine to close the pool when context is cancelled OR engine is closed
+    // This ensures the background health check goroutine is stopped on Ctrl+C or normal Close()
+    go func() {
+        select {
+        case <-ctx.Done():
+            // Context cancelled (e.g., Ctrl+C)
+            pool.Close()
+        case <-closeCh:
+            // Engine explicitly closed - pool already closed by Close()
+            return
+        }
+    }()
+
+    return engine, nil
 }

 // RestoreFile restores from a SQL file with parallel execution
@@ -510,8 +535,13 @@ func (e *ParallelRestoreEngine) executeCopy(ctx context.Context, stmt *SQLStatem
     return tag.RowsAffected(), nil
 }

-// Close closes the connection pool
+// Close closes the connection pool and stops the cleanup goroutine
 func (e *ParallelRestoreEngine) Close() error {
+    // Signal the cleanup goroutine to exit
+    if e.closeCh != nil {
+        close(e.closeCh)
+    }
+    // Close the pool
     if e.pool != nil {
         e.pool.Close()
     }
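Taken together, the last three hunks give the pool-cleanup goroutine a second exit path: it ends either when the caller's context is cancelled (Ctrl+C) or when `Close()` is called, so it no longer lingers after a normal restore. Below is a self-contained sketch of that pattern with the pgx details stripped out; the `resourcePool` and `engine` types are stand-ins for illustration, not the repo's types.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// resourcePool stands in for pgxpool.Pool in this sketch.
type resourcePool struct{ name string }

func (p *resourcePool) Close() { fmt.Println("pool closed:", p.name) }

// engine mirrors the shape of ParallelRestoreEngine for illustration only.
type engine struct {
	pool    *resourcePool
	closeCh chan struct{}
}

// newEngine wires the cleanup goroutine the same way as the constructor above:
// it exits on either context cancellation or an explicit Close().
func newEngine(ctx context.Context, pool *resourcePool) *engine {
	e := &engine{pool: pool, closeCh: make(chan struct{})}
	go func() {
		select {
		case <-ctx.Done():
			pool.Close() // Ctrl+C / caller cancellation
		case <-e.closeCh:
			return // normal Close(); the pool is closed there
		}
	}()
	return e
}

// Close stops the cleanup goroutine and releases the pool.
func (e *engine) Close() {
	close(e.closeCh)
	e.pool.Close()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	e := newEngine(ctx, &resourcePool{name: "restore"})
	// ... do restore work ...
	e.Close() // the goroutine exits via closeCh even though ctx was never cancelled

	time.Sleep(50 * time.Millisecond) // demo only: give the goroutine time to return
}
```

Closing the channel is what lets the goroutine distinguish "engine shut down normally" from "context cancelled", which is exactly the role of the `closeCh` field added in the struct hunk.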