CRITICAL FIXES: - Encryption detection false positive (IsBackupEncrypted returned true for ALL files) - 12 cmd.Wait() deadlocks fixed with channel-based context handling - TUI timeout bugs: 60s->10min for safety checks, 15s->60s for DB listing - diagnose.go timeouts: 60s->5min for tar/pg_restore operations - Panic recovery added to parallel backup/restore goroutines - Variable shadowing fix in restore/engine.go These bugs caused pg_dump backups to fail through TUI for months.
870 lines
23 KiB
Go
870 lines
23 KiB
Go
// Package pitr provides Point-in-Time Recovery functionality
|
|
// This file contains MySQL/MariaDB binary log handling
|
|
package pitr
|
|
|
|
import (
|
|
"bufio"
|
|
"compress/gzip"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"regexp"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// BinlogPosition represents a MySQL binary log position
|
|
type BinlogPosition struct {
|
|
File string `json:"file"` // Binary log filename (e.g., "mysql-bin.000042")
|
|
Position uint64 `json:"position"` // Byte position in the file
|
|
GTID string `json:"gtid,omitempty"` // GTID set (if available)
|
|
ServerID uint32 `json:"server_id,omitempty"`
|
|
}
|
|
|
|
// String returns a string representation of the binlog position
|
|
func (p *BinlogPosition) String() string {
|
|
if p.GTID != "" {
|
|
return fmt.Sprintf("%s:%d (GTID: %s)", p.File, p.Position, p.GTID)
|
|
}
|
|
return fmt.Sprintf("%s:%d", p.File, p.Position)
|
|
}
|
|
|
|
// IsZero returns true if the position is unset
|
|
func (p *BinlogPosition) IsZero() bool {
|
|
return p.File == "" && p.Position == 0 && p.GTID == ""
|
|
}
|
|
|
|
// Compare compares two binlog positions
|
|
// Returns -1 if p < other, 0 if equal, 1 if p > other
|
|
func (p *BinlogPosition) Compare(other LogPosition) int {
|
|
o, ok := other.(*BinlogPosition)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
|
|
// Compare by file first
|
|
fileComp := compareBinlogFiles(p.File, o.File)
|
|
if fileComp != 0 {
|
|
return fileComp
|
|
}
|
|
|
|
// Then by position within file
|
|
if p.Position < o.Position {
|
|
return -1
|
|
} else if p.Position > o.Position {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// ParseBinlogPosition parses a binlog position string
|
|
// Format: "filename:position" or "filename:position:gtid"
|
|
func ParseBinlogPosition(s string) (*BinlogPosition, error) {
|
|
parts := strings.SplitN(s, ":", 3)
|
|
if len(parts) < 2 {
|
|
return nil, fmt.Errorf("invalid binlog position format: %s (expected file:position)", s)
|
|
}
|
|
|
|
pos, err := strconv.ParseUint(parts[1], 10, 64)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid position value: %s", parts[1])
|
|
}
|
|
|
|
bp := &BinlogPosition{
|
|
File: parts[0],
|
|
Position: pos,
|
|
}
|
|
|
|
if len(parts) == 3 {
|
|
bp.GTID = parts[2]
|
|
}
|
|
|
|
return bp, nil
|
|
}
|
|
|
|
// MarshalJSON serializes the binlog position to JSON
|
|
func (p *BinlogPosition) MarshalJSON() ([]byte, error) {
|
|
type Alias BinlogPosition
|
|
return json.Marshal((*Alias)(p))
|
|
}
|
|
|
|
// compareBinlogFiles compares two binlog filenames numerically
|
|
func compareBinlogFiles(a, b string) int {
|
|
numA := extractBinlogNumber(a)
|
|
numB := extractBinlogNumber(b)
|
|
|
|
if numA < numB {
|
|
return -1
|
|
} else if numA > numB {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// extractBinlogNumber extracts the numeric suffix from a binlog filename
|
|
func extractBinlogNumber(filename string) int {
|
|
// Match pattern like mysql-bin.000042
|
|
re := regexp.MustCompile(`\.(\d+)$`)
|
|
matches := re.FindStringSubmatch(filename)
|
|
if len(matches) < 2 {
|
|
return 0
|
|
}
|
|
num, _ := strconv.Atoi(matches[1])
|
|
return num
|
|
}
|
|
|
|
// BinlogFile represents a binary log file with metadata
|
|
type BinlogFile struct {
|
|
Name string `json:"name"`
|
|
Path string `json:"path"`
|
|
Size int64 `json:"size"`
|
|
ModTime time.Time `json:"mod_time"`
|
|
StartTime time.Time `json:"start_time,omitempty"` // First event timestamp
|
|
EndTime time.Time `json:"end_time,omitempty"` // Last event timestamp
|
|
StartPos uint64 `json:"start_pos"`
|
|
EndPos uint64 `json:"end_pos"`
|
|
GTID string `json:"gtid,omitempty"`
|
|
ServerID uint32 `json:"server_id,omitempty"`
|
|
Format string `json:"format,omitempty"` // ROW, STATEMENT, MIXED
|
|
Archived bool `json:"archived"`
|
|
ArchiveDir string `json:"archive_dir,omitempty"`
|
|
}
|
|
|
|
// BinlogArchiveInfo contains metadata about an archived binlog
|
|
type BinlogArchiveInfo struct {
|
|
OriginalFile string `json:"original_file"`
|
|
ArchivePath string `json:"archive_path"`
|
|
Size int64 `json:"size"`
|
|
Compressed bool `json:"compressed"`
|
|
Encrypted bool `json:"encrypted"`
|
|
Checksum string `json:"checksum"`
|
|
ArchivedAt time.Time `json:"archived_at"`
|
|
StartPos uint64 `json:"start_pos"`
|
|
EndPos uint64 `json:"end_pos"`
|
|
StartTime time.Time `json:"start_time"`
|
|
EndTime time.Time `json:"end_time"`
|
|
GTID string `json:"gtid,omitempty"`
|
|
}
|
|
|
|
// BinlogManager handles binary log operations
|
|
type BinlogManager struct {
|
|
mysqlbinlogPath string
|
|
binlogDir string
|
|
archiveDir string
|
|
compression bool
|
|
encryption bool
|
|
encryptionKey []byte
|
|
serverType DatabaseType // mysql or mariadb
|
|
}
|
|
|
|
// BinlogManagerConfig holds configuration for BinlogManager
|
|
type BinlogManagerConfig struct {
|
|
BinlogDir string
|
|
ArchiveDir string
|
|
Compression bool
|
|
Encryption bool
|
|
EncryptionKey []byte
|
|
}
|
|
|
|
// NewBinlogManager creates a new BinlogManager
|
|
func NewBinlogManager(config BinlogManagerConfig) (*BinlogManager, error) {
|
|
m := &BinlogManager{
|
|
binlogDir: config.BinlogDir,
|
|
archiveDir: config.ArchiveDir,
|
|
compression: config.Compression,
|
|
encryption: config.Encryption,
|
|
encryptionKey: config.EncryptionKey,
|
|
}
|
|
|
|
// Find mysqlbinlog executable
|
|
if err := m.detectTools(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return m, nil
|
|
}
|
|
|
|
// detectTools finds MySQL/MariaDB tools and determines server type
|
|
func (m *BinlogManager) detectTools() error {
|
|
// Try mariadb-binlog first (MariaDB)
|
|
if path, err := exec.LookPath("mariadb-binlog"); err == nil {
|
|
m.mysqlbinlogPath = path
|
|
m.serverType = DatabaseMariaDB
|
|
return nil
|
|
}
|
|
|
|
// Fall back to mysqlbinlog (MySQL or older MariaDB)
|
|
if path, err := exec.LookPath("mysqlbinlog"); err == nil {
|
|
m.mysqlbinlogPath = path
|
|
// Check if it's actually MariaDB's version
|
|
m.serverType = m.detectServerType()
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("mysqlbinlog or mariadb-binlog not found in PATH")
|
|
}
|
|
|
|
// detectServerType determines if we're working with MySQL or MariaDB
|
|
func (m *BinlogManager) detectServerType() DatabaseType {
|
|
// Use timeout to prevent blocking if command hangs
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
cmd := exec.CommandContext(ctx, m.mysqlbinlogPath, "--version")
|
|
output, err := cmd.Output()
|
|
if err != nil {
|
|
return DatabaseMySQL // Default to MySQL
|
|
}
|
|
|
|
if strings.Contains(strings.ToLower(string(output)), "mariadb") {
|
|
return DatabaseMariaDB
|
|
}
|
|
return DatabaseMySQL
|
|
}
|
|
|
|
// ServerType returns the detected server type
|
|
func (m *BinlogManager) ServerType() DatabaseType {
|
|
return m.serverType
|
|
}
|
|
|
|
// DiscoverBinlogs finds all binary log files in the configured directory
|
|
func (m *BinlogManager) DiscoverBinlogs(ctx context.Context) ([]BinlogFile, error) {
|
|
if m.binlogDir == "" {
|
|
return nil, fmt.Errorf("binlog directory not configured")
|
|
}
|
|
|
|
entries, err := os.ReadDir(m.binlogDir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("reading binlog directory: %w", err)
|
|
}
|
|
|
|
var binlogs []BinlogFile
|
|
binlogPattern := regexp.MustCompile(`^[a-zA-Z0-9_-]+-bin\.\d{6}$`)
|
|
|
|
for _, entry := range entries {
|
|
if entry.IsDir() {
|
|
continue
|
|
}
|
|
|
|
// Check if it matches binlog naming convention
|
|
if !binlogPattern.MatchString(entry.Name()) {
|
|
continue
|
|
}
|
|
|
|
info, err := entry.Info()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
binlog := BinlogFile{
|
|
Name: entry.Name(),
|
|
Path: filepath.Join(m.binlogDir, entry.Name()),
|
|
Size: info.Size(),
|
|
ModTime: info.ModTime(),
|
|
}
|
|
|
|
// Get binlog metadata using mysqlbinlog
|
|
if err := m.enrichBinlogMetadata(ctx, &binlog); err != nil {
|
|
// Log but don't fail - we can still use basic info
|
|
binlog.StartPos = 4 // Magic number size
|
|
}
|
|
|
|
binlogs = append(binlogs, binlog)
|
|
}
|
|
|
|
// Sort by file number
|
|
sort.Slice(binlogs, func(i, j int) bool {
|
|
return compareBinlogFiles(binlogs[i].Name, binlogs[j].Name) < 0
|
|
})
|
|
|
|
return binlogs, nil
|
|
}
|
|
|
|
// enrichBinlogMetadata extracts metadata from a binlog file
|
|
func (m *BinlogManager) enrichBinlogMetadata(ctx context.Context, binlog *BinlogFile) error {
|
|
// Use mysqlbinlog to read header and extract timestamps
|
|
cmd := exec.CommandContext(ctx, m.mysqlbinlogPath,
|
|
"--no-defaults",
|
|
"--start-position=4",
|
|
"--stop-position=1000", // Just read header area
|
|
binlog.Path,
|
|
)
|
|
|
|
output, err := cmd.Output()
|
|
if err != nil {
|
|
// Try without position limits
|
|
cmd = exec.CommandContext(ctx, m.mysqlbinlogPath,
|
|
"--no-defaults",
|
|
"-v", // Verbose mode for more info
|
|
binlog.Path,
|
|
)
|
|
output, _ = cmd.Output()
|
|
}
|
|
|
|
// Parse output for metadata
|
|
m.parseBinlogOutput(string(output), binlog)
|
|
|
|
// Get file size for end position
|
|
if binlog.EndPos == 0 {
|
|
binlog.EndPos = uint64(binlog.Size)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// parseBinlogOutput parses mysqlbinlog output to extract metadata
|
|
func (m *BinlogManager) parseBinlogOutput(output string, binlog *BinlogFile) {
|
|
lines := strings.Split(output, "\n")
|
|
|
|
// Pattern for timestamp: #YYMMDD HH:MM:SS
|
|
timestampRe := regexp.MustCompile(`#(\d{6})\s+(\d{1,2}:\d{2}:\d{2})`)
|
|
// Pattern for server_id
|
|
serverIDRe := regexp.MustCompile(`server id\s+(\d+)`)
|
|
// Pattern for end_log_pos
|
|
endPosRe := regexp.MustCompile(`end_log_pos\s+(\d+)`)
|
|
// Pattern for binlog format
|
|
formatRe := regexp.MustCompile(`binlog_format=(\w+)`)
|
|
// Pattern for GTID
|
|
gtidRe := regexp.MustCompile(`SET @@SESSION.GTID_NEXT=\s*'([^']+)'`)
|
|
mariaGtidRe := regexp.MustCompile(`GTID\s+(\d+-\d+-\d+)`)
|
|
|
|
var firstTimestamp, lastTimestamp time.Time
|
|
var maxEndPos uint64
|
|
|
|
for _, line := range lines {
|
|
// Extract timestamps
|
|
if matches := timestampRe.FindStringSubmatch(line); len(matches) == 3 {
|
|
// Parse YYMMDD format
|
|
dateStr := matches[1]
|
|
timeStr := matches[2]
|
|
if t, err := time.Parse("060102 15:04:05", dateStr+" "+timeStr); err == nil {
|
|
if firstTimestamp.IsZero() {
|
|
firstTimestamp = t
|
|
}
|
|
lastTimestamp = t
|
|
}
|
|
}
|
|
|
|
// Extract server_id
|
|
if matches := serverIDRe.FindStringSubmatch(line); len(matches) == 2 {
|
|
if id, err := strconv.ParseUint(matches[1], 10, 32); err == nil {
|
|
binlog.ServerID = uint32(id)
|
|
}
|
|
}
|
|
|
|
// Extract end_log_pos (track max for EndPos)
|
|
if matches := endPosRe.FindStringSubmatch(line); len(matches) == 2 {
|
|
if pos, err := strconv.ParseUint(matches[1], 10, 64); err == nil {
|
|
if pos > maxEndPos {
|
|
maxEndPos = pos
|
|
}
|
|
}
|
|
}
|
|
|
|
// Extract format
|
|
if matches := formatRe.FindStringSubmatch(line); len(matches) == 2 {
|
|
binlog.Format = matches[1]
|
|
}
|
|
|
|
// Extract GTID (MySQL format)
|
|
if matches := gtidRe.FindStringSubmatch(line); len(matches) == 2 {
|
|
binlog.GTID = matches[1]
|
|
}
|
|
|
|
// Extract GTID (MariaDB format)
|
|
if matches := mariaGtidRe.FindStringSubmatch(line); len(matches) == 2 {
|
|
binlog.GTID = matches[1]
|
|
}
|
|
}
|
|
|
|
if !firstTimestamp.IsZero() {
|
|
binlog.StartTime = firstTimestamp
|
|
}
|
|
if !lastTimestamp.IsZero() {
|
|
binlog.EndTime = lastTimestamp
|
|
}
|
|
if maxEndPos > 0 {
|
|
binlog.EndPos = maxEndPos
|
|
}
|
|
}
|
|
|
|
// GetCurrentPosition retrieves the current binary log position from MySQL
|
|
func (m *BinlogManager) GetCurrentPosition(ctx context.Context, dsn string) (*BinlogPosition, error) {
|
|
// This would typically connect to MySQL and run SHOW MASTER STATUS
|
|
// For now, return an error indicating it needs to be called with a connection
|
|
return nil, fmt.Errorf("GetCurrentPosition requires a database connection - use MySQLPITR.GetCurrentPosition instead")
|
|
}
|
|
|
|
// ArchiveBinlog archives a single binlog file to the archive directory
|
|
func (m *BinlogManager) ArchiveBinlog(ctx context.Context, binlog *BinlogFile) (*BinlogArchiveInfo, error) {
|
|
if m.archiveDir == "" {
|
|
return nil, fmt.Errorf("archive directory not configured")
|
|
}
|
|
|
|
// Ensure archive directory exists
|
|
if err := os.MkdirAll(m.archiveDir, 0750); err != nil {
|
|
return nil, fmt.Errorf("creating archive directory: %w", err)
|
|
}
|
|
|
|
archiveName := binlog.Name
|
|
if m.compression {
|
|
archiveName += ".gz"
|
|
}
|
|
archivePath := filepath.Join(m.archiveDir, archiveName)
|
|
|
|
// Check if already archived
|
|
if _, err := os.Stat(archivePath); err == nil {
|
|
return nil, fmt.Errorf("binlog already archived: %s", archivePath)
|
|
}
|
|
|
|
// Open source file
|
|
src, err := os.Open(binlog.Path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("opening binlog: %w", err)
|
|
}
|
|
defer src.Close()
|
|
|
|
// Create destination file
|
|
dst, err := os.OpenFile(archivePath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0640)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating archive file: %w", err)
|
|
}
|
|
defer dst.Close()
|
|
|
|
var writer io.Writer = dst
|
|
var gzWriter *gzip.Writer
|
|
|
|
if m.compression {
|
|
gzWriter = gzip.NewWriter(dst)
|
|
writer = gzWriter
|
|
defer gzWriter.Close()
|
|
}
|
|
|
|
// TODO: Add encryption layer if enabled
|
|
if m.encryption && len(m.encryptionKey) > 0 {
|
|
// Encryption would be added here
|
|
}
|
|
|
|
// Copy file content
|
|
written, err := io.Copy(writer, src)
|
|
if err != nil {
|
|
os.Remove(archivePath) // Cleanup on error
|
|
return nil, fmt.Errorf("copying binlog: %w", err)
|
|
}
|
|
|
|
// Close gzip writer to flush
|
|
if gzWriter != nil {
|
|
if err := gzWriter.Close(); err != nil {
|
|
os.Remove(archivePath)
|
|
return nil, fmt.Errorf("closing gzip writer: %w", err)
|
|
}
|
|
}
|
|
|
|
// Get final archive size
|
|
archiveInfo, err := os.Stat(archivePath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getting archive info: %w", err)
|
|
}
|
|
|
|
// Calculate checksum (simple for now - could use SHA256)
|
|
checksum := fmt.Sprintf("size:%d", written)
|
|
|
|
return &BinlogArchiveInfo{
|
|
OriginalFile: binlog.Name,
|
|
ArchivePath: archivePath,
|
|
Size: archiveInfo.Size(),
|
|
Compressed: m.compression,
|
|
Encrypted: m.encryption,
|
|
Checksum: checksum,
|
|
ArchivedAt: time.Now(),
|
|
StartPos: binlog.StartPos,
|
|
EndPos: binlog.EndPos,
|
|
StartTime: binlog.StartTime,
|
|
EndTime: binlog.EndTime,
|
|
GTID: binlog.GTID,
|
|
}, nil
|
|
}
|
|
|
|
// ListArchivedBinlogs returns all archived binlog files
|
|
func (m *BinlogManager) ListArchivedBinlogs(ctx context.Context) ([]BinlogArchiveInfo, error) {
|
|
if m.archiveDir == "" {
|
|
return nil, fmt.Errorf("archive directory not configured")
|
|
}
|
|
|
|
entries, err := os.ReadDir(m.archiveDir)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return []BinlogArchiveInfo{}, nil
|
|
}
|
|
return nil, fmt.Errorf("reading archive directory: %w", err)
|
|
}
|
|
|
|
var archives []BinlogArchiveInfo
|
|
metadataPath := filepath.Join(m.archiveDir, "metadata.json")
|
|
|
|
// Try to load metadata file for enriched info
|
|
metadata := m.loadArchiveMetadata(metadataPath)
|
|
|
|
for _, entry := range entries {
|
|
if entry.IsDir() || entry.Name() == "metadata.json" {
|
|
continue
|
|
}
|
|
|
|
info, err := entry.Info()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
originalName := entry.Name()
|
|
compressed := false
|
|
if strings.HasSuffix(originalName, ".gz") {
|
|
originalName = strings.TrimSuffix(originalName, ".gz")
|
|
compressed = true
|
|
}
|
|
|
|
archive := BinlogArchiveInfo{
|
|
OriginalFile: originalName,
|
|
ArchivePath: filepath.Join(m.archiveDir, entry.Name()),
|
|
Size: info.Size(),
|
|
Compressed: compressed,
|
|
ArchivedAt: info.ModTime(),
|
|
}
|
|
|
|
// Enrich from metadata if available
|
|
if meta, ok := metadata[originalName]; ok {
|
|
archive.StartPos = meta.StartPos
|
|
archive.EndPos = meta.EndPos
|
|
archive.StartTime = meta.StartTime
|
|
archive.EndTime = meta.EndTime
|
|
archive.GTID = meta.GTID
|
|
archive.Checksum = meta.Checksum
|
|
}
|
|
|
|
archives = append(archives, archive)
|
|
}
|
|
|
|
// Sort by file number
|
|
sort.Slice(archives, func(i, j int) bool {
|
|
return compareBinlogFiles(archives[i].OriginalFile, archives[j].OriginalFile) < 0
|
|
})
|
|
|
|
return archives, nil
|
|
}
|
|
|
|
// loadArchiveMetadata loads the metadata.json file if it exists
|
|
func (m *BinlogManager) loadArchiveMetadata(path string) map[string]BinlogArchiveInfo {
|
|
result := make(map[string]BinlogArchiveInfo)
|
|
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return result
|
|
}
|
|
|
|
var archives []BinlogArchiveInfo
|
|
if err := json.Unmarshal(data, &archives); err != nil {
|
|
return result
|
|
}
|
|
|
|
for _, a := range archives {
|
|
result[a.OriginalFile] = a
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// SaveArchiveMetadata saves metadata for all archived binlogs
|
|
func (m *BinlogManager) SaveArchiveMetadata(archives []BinlogArchiveInfo) error {
|
|
if m.archiveDir == "" {
|
|
return fmt.Errorf("archive directory not configured")
|
|
}
|
|
|
|
metadataPath := filepath.Join(m.archiveDir, "metadata.json")
|
|
data, err := json.MarshalIndent(archives, "", " ")
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling metadata: %w", err)
|
|
}
|
|
|
|
return os.WriteFile(metadataPath, data, 0640)
|
|
}
|
|
|
|
// ValidateBinlogChain validates the integrity of the binlog chain
|
|
func (m *BinlogManager) ValidateBinlogChain(ctx context.Context, binlogs []BinlogFile) (*ChainValidation, error) {
|
|
result := &ChainValidation{
|
|
Valid: true,
|
|
LogCount: len(binlogs),
|
|
}
|
|
|
|
if len(binlogs) == 0 {
|
|
result.Warnings = append(result.Warnings, "no binlog files found")
|
|
return result, nil
|
|
}
|
|
|
|
// Sort binlogs by file number
|
|
sorted := make([]BinlogFile, len(binlogs))
|
|
copy(sorted, binlogs)
|
|
sort.Slice(sorted, func(i, j int) bool {
|
|
return compareBinlogFiles(sorted[i].Name, sorted[j].Name) < 0
|
|
})
|
|
|
|
result.StartPos = &BinlogPosition{
|
|
File: sorted[0].Name,
|
|
Position: sorted[0].StartPos,
|
|
GTID: sorted[0].GTID,
|
|
}
|
|
result.EndPos = &BinlogPosition{
|
|
File: sorted[len(sorted)-1].Name,
|
|
Position: sorted[len(sorted)-1].EndPos,
|
|
GTID: sorted[len(sorted)-1].GTID,
|
|
}
|
|
|
|
// Check for gaps in sequence
|
|
var prevNum int
|
|
var prevName string
|
|
var prevServerID uint32
|
|
|
|
for i, binlog := range sorted {
|
|
result.TotalSize += binlog.Size
|
|
|
|
num := extractBinlogNumber(binlog.Name)
|
|
|
|
if i > 0 {
|
|
// Check sequence continuity
|
|
if num != prevNum+1 {
|
|
gap := LogGap{
|
|
After: prevName,
|
|
Before: binlog.Name,
|
|
Reason: fmt.Sprintf("missing binlog file(s) %d to %d", prevNum+1, num-1),
|
|
}
|
|
result.Gaps = append(result.Gaps, gap)
|
|
result.Valid = false
|
|
}
|
|
|
|
// Check server_id consistency
|
|
if binlog.ServerID != 0 && prevServerID != 0 && binlog.ServerID != prevServerID {
|
|
result.Warnings = append(result.Warnings,
|
|
fmt.Sprintf("server_id changed from %d to %d at %s (possible master failover)",
|
|
prevServerID, binlog.ServerID, binlog.Name))
|
|
}
|
|
}
|
|
|
|
prevNum = num
|
|
prevName = binlog.Name
|
|
if binlog.ServerID != 0 {
|
|
prevServerID = binlog.ServerID
|
|
}
|
|
}
|
|
|
|
if len(result.Gaps) > 0 {
|
|
result.Errors = append(result.Errors,
|
|
fmt.Sprintf("found %d gap(s) in binlog chain", len(result.Gaps)))
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// ReplayBinlogs replays binlog events to a target time or position
|
|
func (m *BinlogManager) ReplayBinlogs(ctx context.Context, opts ReplayOptions) error {
|
|
if len(opts.BinlogFiles) == 0 {
|
|
return fmt.Errorf("no binlog files specified")
|
|
}
|
|
|
|
// Build mysqlbinlog command
|
|
args := []string{"--no-defaults"}
|
|
|
|
// Add start position if specified
|
|
if opts.StartPosition != nil && !opts.StartPosition.IsZero() {
|
|
startPos, ok := opts.StartPosition.(*BinlogPosition)
|
|
if ok && startPos.Position > 0 {
|
|
args = append(args, fmt.Sprintf("--start-position=%d", startPos.Position))
|
|
}
|
|
}
|
|
|
|
// Add stop time or position
|
|
if opts.StopTime != nil && !opts.StopTime.IsZero() {
|
|
args = append(args, fmt.Sprintf("--stop-datetime=%s", opts.StopTime.Format("2006-01-02 15:04:05")))
|
|
}
|
|
|
|
if opts.StopPosition != nil && !opts.StopPosition.IsZero() {
|
|
stopPos, ok := opts.StopPosition.(*BinlogPosition)
|
|
if ok && stopPos.Position > 0 {
|
|
args = append(args, fmt.Sprintf("--stop-position=%d", stopPos.Position))
|
|
}
|
|
}
|
|
|
|
// Add binlog files
|
|
args = append(args, opts.BinlogFiles...)
|
|
|
|
if opts.DryRun {
|
|
// Just decode and show SQL
|
|
args = append([]string{args[0]}, append([]string{"-v"}, args[1:]...)...)
|
|
cmd := exec.CommandContext(ctx, m.mysqlbinlogPath, args...)
|
|
output, err := cmd.Output()
|
|
if err != nil {
|
|
return fmt.Errorf("parsing binlogs: %w", err)
|
|
}
|
|
if opts.Output != nil {
|
|
opts.Output.Write(output)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Pipe to mysql for replay
|
|
mysqlCmd := exec.CommandContext(ctx, "mysql",
|
|
"-u", opts.MySQLUser,
|
|
"-p"+opts.MySQLPass,
|
|
"-h", opts.MySQLHost,
|
|
"-P", strconv.Itoa(opts.MySQLPort),
|
|
)
|
|
|
|
binlogCmd := exec.CommandContext(ctx, m.mysqlbinlogPath, args...)
|
|
|
|
// Pipe mysqlbinlog output to mysql
|
|
pipe, err := binlogCmd.StdoutPipe()
|
|
if err != nil {
|
|
return fmt.Errorf("creating pipe: %w", err)
|
|
}
|
|
mysqlCmd.Stdin = pipe
|
|
|
|
// Capture stderr for error reporting
|
|
var binlogStderr, mysqlStderr strings.Builder
|
|
binlogCmd.Stderr = &binlogStderr
|
|
mysqlCmd.Stderr = &mysqlStderr
|
|
|
|
// Start commands
|
|
if err := binlogCmd.Start(); err != nil {
|
|
return fmt.Errorf("starting mysqlbinlog: %w", err)
|
|
}
|
|
if err := mysqlCmd.Start(); err != nil {
|
|
binlogCmd.Process.Kill()
|
|
return fmt.Errorf("starting mysql: %w", err)
|
|
}
|
|
|
|
// Wait for completion
|
|
binlogErr := binlogCmd.Wait()
|
|
mysqlErr := mysqlCmd.Wait()
|
|
|
|
if binlogErr != nil {
|
|
return fmt.Errorf("mysqlbinlog failed: %w\nstderr: %s", binlogErr, binlogStderr.String())
|
|
}
|
|
if mysqlErr != nil {
|
|
return fmt.Errorf("mysql replay failed: %w\nstderr: %s", mysqlErr, mysqlStderr.String())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ReplayOptions holds options for replaying binlog files
|
|
type ReplayOptions struct {
|
|
BinlogFiles []string // Files to replay (in order)
|
|
StartPosition LogPosition // Start from this position
|
|
StopTime *time.Time // Stop at this time
|
|
StopPosition LogPosition // Stop at this position
|
|
DryRun bool // Just show what would be done
|
|
Output io.Writer // For dry-run output
|
|
MySQLHost string // MySQL host for replay
|
|
MySQLPort int // MySQL port
|
|
MySQLUser string // MySQL user
|
|
MySQLPass string // MySQL password
|
|
Database string // Limit to specific database
|
|
StopOnError bool // Stop on first error
|
|
}
|
|
|
|
// FindBinlogsInRange finds binlog files containing events within a time range
|
|
func (m *BinlogManager) FindBinlogsInRange(ctx context.Context, binlogs []BinlogFile, start, end time.Time) []BinlogFile {
|
|
var result []BinlogFile
|
|
|
|
for _, b := range binlogs {
|
|
// Include if binlog time range overlaps with requested range
|
|
if b.EndTime.IsZero() && b.StartTime.IsZero() {
|
|
// No timestamp info, include to be safe
|
|
result = append(result, b)
|
|
continue
|
|
}
|
|
|
|
// Check for overlap
|
|
binlogStart := b.StartTime
|
|
binlogEnd := b.EndTime
|
|
if binlogEnd.IsZero() {
|
|
binlogEnd = time.Now() // Assume current file goes to now
|
|
}
|
|
|
|
if !binlogStart.After(end) && !binlogEnd.Before(start) {
|
|
result = append(result, b)
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// WatchBinlogs monitors for new binlog files and archives them
|
|
func (m *BinlogManager) WatchBinlogs(ctx context.Context, interval time.Duration, callback func(*BinlogFile)) error {
|
|
if m.binlogDir == "" {
|
|
return fmt.Errorf("binlog directory not configured")
|
|
}
|
|
|
|
// Get initial list
|
|
known := make(map[string]struct{})
|
|
binlogs, err := m.DiscoverBinlogs(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, b := range binlogs {
|
|
known[b.Name] = struct{}{}
|
|
}
|
|
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-ticker.C:
|
|
binlogs, err := m.DiscoverBinlogs(ctx)
|
|
if err != nil {
|
|
continue // Log error but keep watching
|
|
}
|
|
|
|
for _, b := range binlogs {
|
|
if _, exists := known[b.Name]; !exists {
|
|
// New binlog found
|
|
known[b.Name] = struct{}{}
|
|
if callback != nil {
|
|
callback(&b)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ParseBinlogIndex reads the binlog index file
|
|
func (m *BinlogManager) ParseBinlogIndex(indexPath string) ([]string, error) {
|
|
file, err := os.Open(indexPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("opening index file: %w", err)
|
|
}
|
|
defer file.Close()
|
|
|
|
var binlogs []string
|
|
scanner := bufio.NewScanner(file)
|
|
for scanner.Scan() {
|
|
line := strings.TrimSpace(scanner.Text())
|
|
if line != "" {
|
|
binlogs = append(binlogs, line)
|
|
}
|
|
}
|
|
|
|
if err := scanner.Err(); err != nil {
|
|
return nil, fmt.Errorf("reading index file: %w", err)
|
|
}
|
|
|
|
return binlogs, nil
|
|
}
|