Files
dbbackup/internal/pitr/mysql.go
Alexander Renz f69bfe7071 feat: Add enterprise DBA features for production reliability
New features implemented:

1. Backup Catalog (internal/catalog/)
   - SQLite-based backup tracking
   - Gap detection and RPO monitoring
   - Search and statistics
   - Filesystem sync

2. DR Drill Testing (internal/drill/)
   - Automated restore testing in Docker containers
   - Database validation with custom queries
   - Catalog integration for drill-tested status

3. Smart Notifications (internal/notify/)
   - Event batching with configurable intervals
   - Time-based escalation policies
   - HTML/text/Slack templates

4. Compliance Reports (internal/report/)
   - SOC2, GDPR, HIPAA, PCI-DSS, ISO27001 frameworks
   - Evidence collection from catalog
   - JSON, Markdown, HTML output formats

5. RTO/RPO Calculator (internal/rto/)
   - Recovery objective analysis
   - RTO breakdown by phase
   - Recommendations for improvement

6. Replica-Aware Backup (internal/replica/)
   - Topology detection for PostgreSQL/MySQL
   - Automatic replica selection
   - Configurable selection strategies

7. Parallel Table Backup (internal/parallel/)
   - Concurrent table dumps
   - Worker pool with progress tracking
   - Large table optimization

8. MySQL/MariaDB PITR (internal/pitr/)
   - Binary log parsing and replay
   - Point-in-time recovery support
   - Transaction filtering

CLI commands added: catalog, drill, report, rto

All changes support the goal: reliable 3 AM database recovery.
2025-12-13 20:28:55 +01:00

925 lines
24 KiB
Go

// Package pitr provides Point-in-Time Recovery functionality
// This file contains the MySQL/MariaDB PITR provider implementation
package pitr
import (
"bufio"
"compress/gzip"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
)
// MySQLPITR implements PITRProvider for MySQL and MariaDB
type MySQLPITR struct {
db *sql.DB
config MySQLPITRConfig
binlogManager *BinlogManager
serverType DatabaseType
serverVersion string
serverID uint32
gtidMode bool
}
// MySQLPITRConfig holds configuration for MySQL PITR
type MySQLPITRConfig struct {
// Connection settings
Host string `json:"host"`
Port int `json:"port"`
User string `json:"user"`
Password string `json:"password,omitempty"`
Socket string `json:"socket,omitempty"`
// Paths
DataDir string `json:"data_dir"`
BinlogDir string `json:"binlog_dir"`
ArchiveDir string `json:"archive_dir"`
RestoreDir string `json:"restore_dir"`
// Archive settings
ArchiveInterval time.Duration `json:"archive_interval"`
RetentionDays int `json:"retention_days"`
Compression bool `json:"compression"`
CompressionLevel int `json:"compression_level"`
Encryption bool `json:"encryption"`
EncryptionKey []byte `json:"-"`
// Behavior settings
RequireRowFormat bool `json:"require_row_format"`
RequireGTID bool `json:"require_gtid"`
FlushLogsOnBackup bool `json:"flush_logs_on_backup"`
LockTables bool `json:"lock_tables"`
SingleTransaction bool `json:"single_transaction"`
}
// NewMySQLPITR creates a new MySQL PITR provider
func NewMySQLPITR(db *sql.DB, config MySQLPITRConfig) (*MySQLPITR, error) {
m := &MySQLPITR{
db: db,
config: config,
}
// Detect server type and version
if err := m.detectServerInfo(); err != nil {
return nil, fmt.Errorf("detecting server info: %w", err)
}
// Initialize binlog manager
binlogConfig := BinlogManagerConfig{
BinlogDir: config.BinlogDir,
ArchiveDir: config.ArchiveDir,
Compression: config.Compression,
Encryption: config.Encryption,
EncryptionKey: config.EncryptionKey,
}
var err error
m.binlogManager, err = NewBinlogManager(binlogConfig)
if err != nil {
return nil, fmt.Errorf("creating binlog manager: %w", err)
}
return m, nil
}
// detectServerInfo detects MySQL/MariaDB version and configuration
func (m *MySQLPITR) detectServerInfo() error {
// Get version
var version string
err := m.db.QueryRow("SELECT VERSION()").Scan(&version)
if err != nil {
return fmt.Errorf("getting version: %w", err)
}
m.serverVersion = version
// Detect MariaDB vs MySQL
if strings.Contains(strings.ToLower(version), "mariadb") {
m.serverType = DatabaseMariaDB
} else {
m.serverType = DatabaseMySQL
}
// Get server_id
var serverID int
err = m.db.QueryRow("SELECT @@server_id").Scan(&serverID)
if err == nil {
m.serverID = uint32(serverID)
}
// Check GTID mode
if m.serverType == DatabaseMySQL {
var gtidMode string
err = m.db.QueryRow("SELECT @@gtid_mode").Scan(&gtidMode)
if err == nil {
m.gtidMode = strings.ToUpper(gtidMode) == "ON"
}
} else {
// MariaDB uses different variables
var gtidPos string
err = m.db.QueryRow("SELECT @@gtid_current_pos").Scan(&gtidPos)
m.gtidMode = err == nil && gtidPos != ""
}
return nil
}
// DatabaseType returns the database type this provider handles
func (m *MySQLPITR) DatabaseType() DatabaseType {
return m.serverType
}
// Enable enables PITR for the MySQL database
func (m *MySQLPITR) Enable(ctx context.Context, config PITREnableConfig) error {
// Check current binlog settings
status, err := m.Status(ctx)
if err != nil {
return fmt.Errorf("checking status: %w", err)
}
var issues []string
// Check if binlog is enabled
var logBin string
if err := m.db.QueryRowContext(ctx, "SELECT @@log_bin").Scan(&logBin); err != nil {
return fmt.Errorf("checking log_bin: %w", err)
}
if logBin != "1" && strings.ToUpper(logBin) != "ON" {
issues = append(issues, "binary logging is not enabled (log_bin=OFF)")
issues = append(issues, " Add to my.cnf: log_bin = mysql-bin")
}
// Check binlog format
if m.config.RequireRowFormat && status.LogLevel != "ROW" {
issues = append(issues, fmt.Sprintf("binlog_format is %s, not ROW", status.LogLevel))
issues = append(issues, " Add to my.cnf: binlog_format = ROW")
}
// Check GTID mode if required
if m.config.RequireGTID && !m.gtidMode {
issues = append(issues, "GTID mode is not enabled")
if m.serverType == DatabaseMySQL {
issues = append(issues, " Add to my.cnf: gtid_mode = ON, enforce_gtid_consistency = ON")
} else {
issues = append(issues, " MariaDB: GTIDs are automatically managed with log_slave_updates")
}
}
// Check expire_logs_days (don't want logs expiring before we archive them)
var expireDays int
m.db.QueryRowContext(ctx, "SELECT @@expire_logs_days").Scan(&expireDays)
if expireDays > 0 && expireDays < config.RetentionDays {
issues = append(issues,
fmt.Sprintf("expire_logs_days (%d) is less than retention days (%d)",
expireDays, config.RetentionDays))
}
if len(issues) > 0 {
return fmt.Errorf("PITR requirements not met:\n - %s", strings.Join(issues, "\n - "))
}
// Update archive configuration
m.config.ArchiveDir = config.ArchiveDir
m.config.RetentionDays = config.RetentionDays
m.config.ArchiveInterval = config.ArchiveInterval
m.config.Compression = config.Compression
m.config.Encryption = config.Encryption
m.config.EncryptionKey = config.EncryptionKey
// Create archive directory
if err := os.MkdirAll(config.ArchiveDir, 0750); err != nil {
return fmt.Errorf("creating archive directory: %w", err)
}
// Save configuration
configPath := filepath.Join(config.ArchiveDir, "pitr_config.json")
configData, _ := json.MarshalIndent(map[string]interface{}{
"enabled": true,
"server_type": m.serverType,
"server_version": m.serverVersion,
"server_id": m.serverID,
"gtid_mode": m.gtidMode,
"archive_dir": config.ArchiveDir,
"retention_days": config.RetentionDays,
"archive_interval": config.ArchiveInterval.String(),
"compression": config.Compression,
"encryption": config.Encryption,
"created_at": time.Now().Format(time.RFC3339),
}, "", " ")
if err := os.WriteFile(configPath, configData, 0640); err != nil {
return fmt.Errorf("saving config: %w", err)
}
return nil
}
// Disable disables PITR for the MySQL database
func (m *MySQLPITR) Disable(ctx context.Context) error {
configPath := filepath.Join(m.config.ArchiveDir, "pitr_config.json")
// Check if config exists
if _, err := os.Stat(configPath); os.IsNotExist(err) {
return fmt.Errorf("PITR is not enabled (no config file found)")
}
// Update config to disabled
configData, _ := json.MarshalIndent(map[string]interface{}{
"enabled": false,
"disabled_at": time.Now().Format(time.RFC3339),
}, "", " ")
if err := os.WriteFile(configPath, configData, 0640); err != nil {
return fmt.Errorf("updating config: %w", err)
}
return nil
}
// Status returns the current PITR status
func (m *MySQLPITR) Status(ctx context.Context) (*PITRStatus, error) {
status := &PITRStatus{
DatabaseType: m.serverType,
ArchiveDir: m.config.ArchiveDir,
}
// Check if PITR is enabled via config file
configPath := filepath.Join(m.config.ArchiveDir, "pitr_config.json")
if data, err := os.ReadFile(configPath); err == nil {
var config map[string]interface{}
if json.Unmarshal(data, &config) == nil {
if enabled, ok := config["enabled"].(bool); ok {
status.Enabled = enabled
}
}
}
// Get binlog format
var binlogFormat string
if err := m.db.QueryRowContext(ctx, "SELECT @@binlog_format").Scan(&binlogFormat); err == nil {
status.LogLevel = binlogFormat
}
// Get current position
pos, err := m.GetCurrentPosition(ctx)
if err == nil {
status.Position = pos
}
// Get archive stats
if m.config.ArchiveDir != "" {
archives, err := m.binlogManager.ListArchivedBinlogs(ctx)
if err == nil {
status.ArchiveCount = len(archives)
for _, a := range archives {
status.ArchiveSize += a.Size
if a.ArchivedAt.After(status.LastArchived) {
status.LastArchived = a.ArchivedAt
}
}
}
}
status.ArchiveMethod = "manual" // MySQL doesn't have automatic archiving like PostgreSQL
return status, nil
}
// GetCurrentPosition retrieves the current binary log position
func (m *MySQLPITR) GetCurrentPosition(ctx context.Context) (*BinlogPosition, error) {
pos := &BinlogPosition{}
// Use SHOW MASTER STATUS for current position
rows, err := m.db.QueryContext(ctx, "SHOW MASTER STATUS")
if err != nil {
return nil, fmt.Errorf("getting master status: %w", err)
}
defer rows.Close()
if rows.Next() {
var file string
var position uint64
var binlogDoDB, binlogIgnoreDB, executedGtidSet sql.NullString
cols, _ := rows.Columns()
switch len(cols) {
case 5: // MySQL 5.6+
err = rows.Scan(&file, &position, &binlogDoDB, &binlogIgnoreDB, &executedGtidSet)
case 4: // Older versions
err = rows.Scan(&file, &position, &binlogDoDB, &binlogIgnoreDB)
default:
err = rows.Scan(&file, &position)
}
if err != nil {
return nil, fmt.Errorf("scanning master status: %w", err)
}
pos.File = file
pos.Position = position
pos.ServerID = m.serverID
if executedGtidSet.Valid {
pos.GTID = executedGtidSet.String
}
} else {
return nil, fmt.Errorf("no master status available (is binary logging enabled?)")
}
// For MariaDB, get GTID position differently
if m.serverType == DatabaseMariaDB && pos.GTID == "" {
var gtidPos string
if err := m.db.QueryRowContext(ctx, "SELECT @@gtid_current_pos").Scan(&gtidPos); err == nil {
pos.GTID = gtidPos
}
}
return pos, nil
}
// CreateBackup creates a PITR-capable backup with position recording
func (m *MySQLPITR) CreateBackup(ctx context.Context, opts BackupOptions) (*PITRBackupInfo, error) {
// Get position BEFORE flushing logs
startPos, err := m.GetCurrentPosition(ctx)
if err != nil {
return nil, fmt.Errorf("getting start position: %w", err)
}
// Optionally flush logs to start a new binlog file
if opts.FlushLogs || m.config.FlushLogsOnBackup {
if _, err := m.db.ExecContext(ctx, "FLUSH BINARY LOGS"); err != nil {
return nil, fmt.Errorf("flushing binary logs: %w", err)
}
// Get new position after flush
startPos, err = m.GetCurrentPosition(ctx)
if err != nil {
return nil, fmt.Errorf("getting position after flush: %w", err)
}
}
// Build mysqldump command
dumpArgs := []string{
"--single-transaction",
"--routines",
"--triggers",
"--events",
"--master-data=2", // Include binlog position as comment
}
if m.config.FlushLogsOnBackup {
dumpArgs = append(dumpArgs, "--flush-logs")
}
// Add connection params
if m.config.Host != "" {
dumpArgs = append(dumpArgs, "-h", m.config.Host)
}
if m.config.Port > 0 {
dumpArgs = append(dumpArgs, "-P", strconv.Itoa(m.config.Port))
}
if m.config.User != "" {
dumpArgs = append(dumpArgs, "-u", m.config.User)
}
if m.config.Password != "" {
dumpArgs = append(dumpArgs, "-p"+m.config.Password)
}
if m.config.Socket != "" {
dumpArgs = append(dumpArgs, "-S", m.config.Socket)
}
// Add database selection
if opts.Database != "" {
dumpArgs = append(dumpArgs, opts.Database)
} else {
dumpArgs = append(dumpArgs, "--all-databases")
}
// Create output file
timestamp := time.Now().Format("20060102_150405")
backupName := fmt.Sprintf("mysql_pitr_%s.sql", timestamp)
if opts.Compression {
backupName += ".gz"
}
backupPath := filepath.Join(opts.OutputPath, backupName)
if err := os.MkdirAll(opts.OutputPath, 0750); err != nil {
return nil, fmt.Errorf("creating output directory: %w", err)
}
// Run mysqldump
cmd := exec.CommandContext(ctx, "mysqldump", dumpArgs...)
// Create output file
outFile, err := os.Create(backupPath)
if err != nil {
return nil, fmt.Errorf("creating backup file: %w", err)
}
defer outFile.Close()
var writer io.WriteCloser = outFile
if opts.Compression {
gzWriter := NewGzipWriter(outFile, opts.CompressionLvl)
writer = gzWriter
defer gzWriter.Close()
}
cmd.Stdout = writer
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
os.Remove(backupPath)
return nil, fmt.Errorf("mysqldump failed: %w", err)
}
// Close writers
if opts.Compression {
writer.Close()
}
// Get file size
info, err := os.Stat(backupPath)
if err != nil {
return nil, fmt.Errorf("getting backup info: %w", err)
}
// Serialize position for JSON storage
posJSON, _ := json.Marshal(startPos)
backupInfo := &PITRBackupInfo{
BackupFile: backupPath,
DatabaseType: m.serverType,
DatabaseName: opts.Database,
Timestamp: time.Now(),
ServerVersion: m.serverVersion,
ServerID: int(m.serverID),
Position: startPos,
PositionJSON: string(posJSON),
SizeBytes: info.Size(),
Compressed: opts.Compression,
Encrypted: opts.Encryption,
}
// Save metadata alongside backup
metadataPath := backupPath + ".meta"
metaData, _ := json.MarshalIndent(backupInfo, "", " ")
os.WriteFile(metadataPath, metaData, 0640)
return backupInfo, nil
}
// Restore performs a point-in-time restore
func (m *MySQLPITR) Restore(ctx context.Context, backup *PITRBackupInfo, target RestoreTarget) error {
// Step 1: Restore base backup
if err := m.restoreBaseBackup(ctx, backup); err != nil {
return fmt.Errorf("restoring base backup: %w", err)
}
// Step 2: If target time is after backup time, replay binlogs
if target.Type == RestoreTargetImmediate {
return nil // Just restore to backup point
}
// Parse start position from backup
var startPos BinlogPosition
if err := json.Unmarshal([]byte(backup.PositionJSON), &startPos); err != nil {
return fmt.Errorf("parsing backup position: %w", err)
}
// Step 3: Find binlogs to replay
binlogs, err := m.binlogManager.DiscoverBinlogs(ctx)
if err != nil {
return fmt.Errorf("discovering binlogs: %w", err)
}
// Find archived binlogs too
archivedBinlogs, _ := m.binlogManager.ListArchivedBinlogs(ctx)
var filesToReplay []string
// Determine which binlogs to replay based on target
switch target.Type {
case RestoreTargetTime:
if target.Time == nil {
return fmt.Errorf("target time not specified")
}
// Find binlogs in range
relevantBinlogs := m.binlogManager.FindBinlogsInRange(ctx, binlogs, backup.Timestamp, *target.Time)
for _, b := range relevantBinlogs {
filesToReplay = append(filesToReplay, b.Path)
}
// Also check archives
for _, a := range archivedBinlogs {
if compareBinlogFiles(a.OriginalFile, startPos.File) >= 0 {
if !a.EndTime.IsZero() && !a.EndTime.Before(backup.Timestamp) && !a.StartTime.After(*target.Time) {
filesToReplay = append(filesToReplay, a.ArchivePath)
}
}
}
case RestoreTargetPosition:
if target.Position == nil {
return fmt.Errorf("target position not specified")
}
targetPos, ok := target.Position.(*BinlogPosition)
if !ok {
return fmt.Errorf("invalid target position type")
}
// Find binlogs from start to target position
for _, b := range binlogs {
if compareBinlogFiles(b.Name, startPos.File) >= 0 &&
compareBinlogFiles(b.Name, targetPos.File) <= 0 {
filesToReplay = append(filesToReplay, b.Path)
}
}
}
if len(filesToReplay) == 0 {
// Nothing to replay, backup is already at or past target
return nil
}
// Step 4: Replay binlogs
replayOpts := ReplayOptions{
BinlogFiles: filesToReplay,
StartPosition: &startPos,
DryRun: target.DryRun,
MySQLHost: m.config.Host,
MySQLPort: m.config.Port,
MySQLUser: m.config.User,
MySQLPass: m.config.Password,
StopOnError: target.StopOnErr,
}
if target.Type == RestoreTargetTime && target.Time != nil {
replayOpts.StopTime = target.Time
}
if target.Type == RestoreTargetPosition && target.Position != nil {
replayOpts.StopPosition = target.Position
}
if target.DryRun {
replayOpts.Output = os.Stdout
}
return m.binlogManager.ReplayBinlogs(ctx, replayOpts)
}
// restoreBaseBackup restores the base MySQL backup
func (m *MySQLPITR) restoreBaseBackup(ctx context.Context, backup *PITRBackupInfo) error {
// Build mysql command
mysqlArgs := []string{}
if m.config.Host != "" {
mysqlArgs = append(mysqlArgs, "-h", m.config.Host)
}
if m.config.Port > 0 {
mysqlArgs = append(mysqlArgs, "-P", strconv.Itoa(m.config.Port))
}
if m.config.User != "" {
mysqlArgs = append(mysqlArgs, "-u", m.config.User)
}
if m.config.Password != "" {
mysqlArgs = append(mysqlArgs, "-p"+m.config.Password)
}
if m.config.Socket != "" {
mysqlArgs = append(mysqlArgs, "-S", m.config.Socket)
}
// Prepare input
var input io.Reader
backupFile, err := os.Open(backup.BackupFile)
if err != nil {
return fmt.Errorf("opening backup file: %w", err)
}
defer backupFile.Close()
input = backupFile
// Handle compressed backups
if backup.Compressed || strings.HasSuffix(backup.BackupFile, ".gz") {
gzReader, err := NewGzipReader(backupFile)
if err != nil {
return fmt.Errorf("creating gzip reader: %w", err)
}
defer gzReader.Close()
input = gzReader
}
// Run mysql
cmd := exec.CommandContext(ctx, "mysql", mysqlArgs...)
cmd.Stdin = input
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
return cmd.Run()
}
// ListRecoveryPoints lists available recovery points/ranges
func (m *MySQLPITR) ListRecoveryPoints(ctx context.Context) ([]RecoveryWindow, error) {
var windows []RecoveryWindow
// Find all backup metadata files
backupPattern := filepath.Join(m.config.ArchiveDir, "..", "*", "*.meta")
metaFiles, _ := filepath.Glob(backupPattern)
// Also check default backup locations
additionalPaths := []string{
filepath.Join(m.config.ArchiveDir, "*.meta"),
filepath.Join(m.config.RestoreDir, "*.meta"),
}
for _, p := range additionalPaths {
matches, _ := filepath.Glob(p)
metaFiles = append(metaFiles, matches...)
}
// Get current binlogs
binlogs, err := m.binlogManager.DiscoverBinlogs(ctx)
if err != nil {
binlogs = []BinlogFile{}
}
// Get archived binlogs
archivedBinlogs, _ := m.binlogManager.ListArchivedBinlogs(ctx)
for _, metaFile := range metaFiles {
data, err := os.ReadFile(metaFile)
if err != nil {
continue
}
var backup PITRBackupInfo
if err := json.Unmarshal(data, &backup); err != nil {
continue
}
// Parse position
var startPos BinlogPosition
json.Unmarshal([]byte(backup.PositionJSON), &startPos)
window := RecoveryWindow{
BaseBackup: backup.BackupFile,
BackupTime: backup.Timestamp,
StartTime: backup.Timestamp,
StartPosition: &startPos,
}
// Find binlogs available after this backup
var relevantBinlogs []string
var latestTime time.Time
var latestPos *BinlogPosition
for _, b := range binlogs {
if compareBinlogFiles(b.Name, startPos.File) >= 0 {
relevantBinlogs = append(relevantBinlogs, b.Name)
if !b.EndTime.IsZero() && b.EndTime.After(latestTime) {
latestTime = b.EndTime
latestPos = &BinlogPosition{
File: b.Name,
Position: b.EndPos,
GTID: b.GTID,
}
}
}
}
for _, a := range archivedBinlogs {
if compareBinlogFiles(a.OriginalFile, startPos.File) >= 0 {
relevantBinlogs = append(relevantBinlogs, a.OriginalFile)
if !a.EndTime.IsZero() && a.EndTime.After(latestTime) {
latestTime = a.EndTime
latestPos = &BinlogPosition{
File: a.OriginalFile,
Position: a.EndPos,
GTID: a.GTID,
}
}
}
}
window.LogFiles = relevantBinlogs
if !latestTime.IsZero() {
window.EndTime = latestTime
} else {
window.EndTime = time.Now()
}
window.EndPosition = latestPos
// Check for gaps
validation, _ := m.binlogManager.ValidateBinlogChain(ctx, binlogs)
if validation != nil {
window.HasGaps = !validation.Valid
for _, gap := range validation.Gaps {
window.GapDetails = append(window.GapDetails, gap.Reason)
}
}
windows = append(windows, window)
}
return windows, nil
}
// ValidateChain validates the log chain integrity
func (m *MySQLPITR) ValidateChain(ctx context.Context, from, to time.Time) (*ChainValidation, error) {
// Discover all binlogs
binlogs, err := m.binlogManager.DiscoverBinlogs(ctx)
if err != nil {
return nil, fmt.Errorf("discovering binlogs: %w", err)
}
// Filter to time range
relevant := m.binlogManager.FindBinlogsInRange(ctx, binlogs, from, to)
// Validate chain
return m.binlogManager.ValidateBinlogChain(ctx, relevant)
}
// ArchiveNewBinlogs archives any binlog files that haven't been archived yet
func (m *MySQLPITR) ArchiveNewBinlogs(ctx context.Context) ([]BinlogArchiveInfo, error) {
// Get current binlogs
binlogs, err := m.binlogManager.DiscoverBinlogs(ctx)
if err != nil {
return nil, fmt.Errorf("discovering binlogs: %w", err)
}
// Get already archived
archived, _ := m.binlogManager.ListArchivedBinlogs(ctx)
archivedSet := make(map[string]struct{})
for _, a := range archived {
archivedSet[a.OriginalFile] = struct{}{}
}
// Get current binlog file (don't archive the active one)
currentPos, _ := m.GetCurrentPosition(ctx)
currentFile := ""
if currentPos != nil {
currentFile = currentPos.File
}
var newArchives []BinlogArchiveInfo
for i := range binlogs {
b := &binlogs[i]
// Skip if already archived
if _, exists := archivedSet[b.Name]; exists {
continue
}
// Skip the current active binlog
if b.Name == currentFile {
continue
}
// Archive
archiveInfo, err := m.binlogManager.ArchiveBinlog(ctx, b)
if err != nil {
// Log but continue
continue
}
newArchives = append(newArchives, *archiveInfo)
}
// Update metadata
if len(newArchives) > 0 {
allArchived, _ := m.binlogManager.ListArchivedBinlogs(ctx)
m.binlogManager.SaveArchiveMetadata(allArchived)
}
return newArchives, nil
}
// PurgeBinlogs purges old binlog files based on retention policy
func (m *MySQLPITR) PurgeBinlogs(ctx context.Context) error {
if m.config.RetentionDays <= 0 {
return fmt.Errorf("retention days not configured")
}
cutoff := time.Now().AddDate(0, 0, -m.config.RetentionDays)
// Get archived binlogs
archived, err := m.binlogManager.ListArchivedBinlogs(ctx)
if err != nil {
return fmt.Errorf("listing archived binlogs: %w", err)
}
for _, a := range archived {
if a.ArchivedAt.Before(cutoff) {
os.Remove(a.ArchivePath)
}
}
return nil
}
// GzipWriter is a helper for gzip compression
type GzipWriter struct {
w *gzip.Writer
}
func NewGzipWriter(w io.Writer, level int) *GzipWriter {
if level <= 0 {
level = gzip.DefaultCompression
}
gw, _ := gzip.NewWriterLevel(w, level)
return &GzipWriter{w: gw}
}
func (g *GzipWriter) Write(p []byte) (int, error) {
return g.w.Write(p)
}
func (g *GzipWriter) Close() error {
return g.w.Close()
}
// GzipReader is a helper for gzip decompression
type GzipReader struct {
r *gzip.Reader
}
func NewGzipReader(r io.Reader) (*GzipReader, error) {
gr, err := gzip.NewReader(r)
if err != nil {
return nil, err
}
return &GzipReader{r: gr}, nil
}
func (g *GzipReader) Read(p []byte) (int, error) {
return g.r.Read(p)
}
func (g *GzipReader) Close() error {
return g.r.Close()
}
// ExtractBinlogPositionFromDump extracts the binlog position from a mysqldump file
func ExtractBinlogPositionFromDump(dumpPath string) (*BinlogPosition, error) {
file, err := os.Open(dumpPath)
if err != nil {
return nil, fmt.Errorf("opening dump file: %w", err)
}
defer file.Close()
var reader io.Reader = file
if strings.HasSuffix(dumpPath, ".gz") {
gzReader, err := gzip.NewReader(file)
if err != nil {
return nil, fmt.Errorf("creating gzip reader: %w", err)
}
defer gzReader.Close()
reader = gzReader
}
// Look for CHANGE MASTER TO or -- CHANGE MASTER TO comment
// Pattern: -- CHANGE MASTER TO MASTER_LOG_FILE='mysql-bin.000042', MASTER_LOG_POS=1234;
scanner := NewLimitedScanner(reader, 1000) // Only scan first 1000 lines
posPattern := regexp.MustCompile(`MASTER_LOG_FILE='([^']+)',\s*MASTER_LOG_POS=(\d+)`)
for scanner.Scan() {
line := scanner.Text()
if matches := posPattern.FindStringSubmatch(line); len(matches) == 3 {
pos, _ := strconv.ParseUint(matches[2], 10, 64)
return &BinlogPosition{
File: matches[1],
Position: pos,
}, nil
}
}
return nil, fmt.Errorf("binlog position not found in dump file")
}
// LimitedScanner wraps bufio.Scanner with a line limit
type LimitedScanner struct {
scanner *bufio.Scanner
limit int
count int
}
func NewLimitedScanner(r io.Reader, limit int) *LimitedScanner {
return &LimitedScanner{
scanner: bufio.NewScanner(r),
limit: limit,
}
}
func (s *LimitedScanner) Scan() bool {
if s.count >= s.limit {
return false
}
s.count++
return s.scanner.Scan()
}
func (s *LimitedScanner) Text() string {
return s.scanner.Text()
}