dbbackup/internal/catalog/sync.go
Alexander Renz f69bfe7071 feat: Add enterprise DBA features for production reliability
New features implemented:

1. Backup Catalog (internal/catalog/)
   - SQLite-based backup tracking
   - Gap detection and RPO monitoring
   - Search and statistics
   - Filesystem sync

2. DR Drill Testing (internal/drill/)
   - Automated restore testing in Docker containers
   - Database validation with custom queries
   - Catalog integration for drill-tested status

3. Smart Notifications (internal/notify/)
   - Event batching with configurable intervals
   - Time-based escalation policies
   - HTML/text/Slack templates

4. Compliance Reports (internal/report/)
   - SOC2, GDPR, HIPAA, PCI-DSS, ISO27001 frameworks
   - Evidence collection from catalog
   - JSON, Markdown, HTML output formats

5. RTO/RPO Calculator (internal/rto/)
   - Recovery objective analysis
   - RTO breakdown by phase
   - Recommendations for improvement

6. Replica-Aware Backup (internal/replica/)
   - Topology detection for PostgreSQL/MySQL
   - Automatic replica selection
   - Configurable selection strategies

7. Parallel Table Backup (internal/parallel/)
   - Concurrent table dumps
   - Worker pool with progress tracking
   - Large table optimization

8. MySQL/MariaDB PITR (internal/pitr/)
   - Binary log parsing and replay
   - Point-in-time recovery support
   - Transaction filtering

CLI commands added: catalog, drill, report, rto

All changes support the goal: reliable 3 AM database recovery.
2025-12-13 20:28:55 +01:00


// Package catalog - Sync functionality for importing backups into catalog
package catalog

import (
	"context"
	"database/sql"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"

	"dbbackup/internal/metadata"
)
// SyncFromDirectory scans a directory and imports backup metadata into the catalog.
func (c *SQLiteCatalog) SyncFromDirectory(ctx context.Context, dir string) (*SyncResult, error) {
	start := time.Now()
	result := &SyncResult{}

	// Find all metadata files in the directory itself...
	pattern := filepath.Join(dir, "*.meta.json")
	matches, err := filepath.Glob(pattern)
	if err != nil {
		return nil, fmt.Errorf("failed to scan directory: %w", err)
	}

	// ...and in its immediate subdirectories.
	subPattern := filepath.Join(dir, "*", "*.meta.json")
	subMatches, _ := filepath.Glob(subPattern)
	matches = append(matches, subMatches...)

	for _, metaPath := range matches {
		// Derive the backup file path from the metadata path.
		backupPath := strings.TrimSuffix(metaPath, ".meta.json")

		// Skip orphaned metadata whose backup file is gone.
		if _, err := os.Stat(backupPath); os.IsNotExist(err) {
			result.Details = append(result.Details,
				fmt.Sprintf("SKIP: %s (backup file missing)", filepath.Base(backupPath)))
			continue
		}

		// Load metadata
		meta, err := metadata.Load(backupPath)
		if err != nil {
			result.Errors++
			result.Details = append(result.Details,
				fmt.Sprintf("ERROR: %s - %v", filepath.Base(backupPath), err))
			continue
		}

		// Check if the backup is already in the catalog.
		existing, _ := c.GetByPath(ctx, backupPath)
		if existing != nil {
			// Update only if the metadata changed.
			if existing.SHA256 != meta.SHA256 || existing.SizeBytes != meta.SizeBytes {
				entry := metadataToEntry(meta, backupPath)
				entry.ID = existing.ID
				if err := c.Update(ctx, entry); err != nil {
					result.Errors++
					result.Details = append(result.Details,
						fmt.Sprintf("ERROR updating: %s - %v", filepath.Base(backupPath), err))
				} else {
					result.Updated++
				}
			}
			continue
		}

		// Add new entry
		entry := metadataToEntry(meta, backupPath)
		if err := c.Add(ctx, entry); err != nil {
			result.Errors++
			result.Details = append(result.Details,
				fmt.Sprintf("ERROR adding: %s - %v", filepath.Base(backupPath), err))
		} else {
			result.Added++
			result.Details = append(result.Details,
				fmt.Sprintf("ADDED: %s (%s)", filepath.Base(backupPath), FormatSize(meta.SizeBytes)))
		}
	}

	// Check for removed backups (entries in the catalog whose file is no longer on disk).
	entries, _ := c.Search(ctx, &SearchQuery{})
	for _, entry := range entries {
		// Only consider entries under the directory being synced, and skip
		// entries already marked deleted so they are not re-counted on every sync.
		if entry.Status == StatusDeleted || !strings.HasPrefix(entry.BackupPath, dir) {
			continue
		}
		if _, err := os.Stat(entry.BackupPath); os.IsNotExist(err) {
			// Mark as deleted; only count it if the update succeeds.
			entry.Status = StatusDeleted
			if err := c.Update(ctx, entry); err != nil {
				result.Errors++
				continue
			}
			result.Removed++
			result.Details = append(result.Details,
				fmt.Sprintf("REMOVED: %s (file not found)", filepath.Base(entry.BackupPath)))
		}
	}

	result.Duration = time.Since(start).Seconds()
	return result, nil
}
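
// The sketch below is illustrative and not part of the original file: a
// minimal caller that runs a directory sync, prints a summary, and records
// the sync time so GetSyncStatus can report it. It assumes only the exported
// API defined in this package; how the *SQLiteCatalog is constructed is left
// to the caller.
func syncDirectoryExample(ctx context.Context, cat *SQLiteCatalog, dir string) error {
	res, err := cat.SyncFromDirectory(ctx, dir)
	if err != nil {
		return fmt.Errorf("sync failed: %w", err)
	}
	fmt.Printf("sync took %.1fs: %d added, %d updated, %d removed, %d errors\n",
		res.Duration, res.Added, res.Updated, res.Removed, res.Errors)
	return cat.SetLastSync(ctx)
}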

// SyncFromCloud imports backups from cloud storage.
func (c *SQLiteCatalog) SyncFromCloud(ctx context.Context, provider, bucket, prefix string) (*SyncResult, error) {
	// This will be implemented when integrating with the cloud package.
	// For now, return a placeholder result.
	return &SyncResult{
		Details: []string{"Cloud sync not yet implemented - use directory sync instead"},
	}, nil
}

// metadataToEntry converts backup metadata to a catalog entry.
func metadataToEntry(meta *metadata.BackupMetadata, backupPath string) *Entry {
	entry := &Entry{
		Database:     meta.Database,
		DatabaseType: meta.DatabaseType,
		Host:         meta.Host,
		Port:         meta.Port,
		BackupPath:   backupPath,
		BackupType:   meta.BackupType,
		SizeBytes:    meta.SizeBytes,
		SHA256:       meta.SHA256,
		Compression:  meta.Compression,
		Encrypted:    meta.Encrypted,
		CreatedAt:    meta.Timestamp,
		Duration:     meta.Duration,
		Status:       StatusCompleted,
		Metadata:     meta.ExtraInfo,
	}
	if entry.BackupType == "" {
		entry.BackupType = "full"
	}
	return entry
}

// ImportEntry creates a catalog entry directly from backup file info.
func (c *SQLiteCatalog) ImportEntry(ctx context.Context, backupPath string, info os.FileInfo, dbName, dbType string) error {
	entry := &Entry{
		Database:     dbName,
		DatabaseType: dbType,
		BackupPath:   backupPath,
		BackupType:   "full",
		SizeBytes:    info.Size(),
		CreatedAt:    info.ModTime(),
		Status:       StatusCompleted,
	}

	// Detect compression from the file extension.
	switch {
	case strings.HasSuffix(backupPath, ".gz"):
		entry.Compression = "gzip"
	case strings.HasSuffix(backupPath, ".lz4"):
		entry.Compression = "lz4"
	case strings.HasSuffix(backupPath, ".zst"):
		entry.Compression = "zstd"
	}

	// Check if encrypted
	if strings.Contains(backupPath, ".enc") {
		entry.Encrypted = true
	}

	// Enrich the entry from sidecar metadata if it exists.
	if meta, err := metadata.Load(backupPath); err == nil {
		entry.SHA256 = meta.SHA256
		entry.Duration = meta.Duration
		entry.Host = meta.Host
		entry.Port = meta.Port
		entry.Metadata = meta.ExtraInfo
	}

	return c.Add(ctx, entry)
}
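
// Illustrative sketch, not part of the original file: driving ImportEntry
// from a directory walk for raw dump files that have no *.meta.json sidecar.
// The ".dump" suffix and the "exampledb"/"postgres" values are placeholders;
// a real caller would derive them from its own naming convention.
func importWalkExample(ctx context.Context, cat *SQLiteCatalog, root string) error {
	return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() || !strings.HasSuffix(path, ".dump") {
			return nil
		}
		return cat.ImportEntry(ctx, path, info, "exampledb", "postgres")
	})
}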

// SyncStatus is a summary of the catalog's sync state.
type SyncStatus struct {
	LastSync       *time.Time `json:"last_sync,omitempty"`
	TotalEntries   int64      `json:"total_entries"`
	ActiveEntries  int64      `json:"active_entries"`
	DeletedEntries int64      `json:"deleted_entries"`
	Directories    []string   `json:"directories"`
}

// GetSyncStatus returns the current sync status.
func (c *SQLiteCatalog) GetSyncStatus(ctx context.Context) (*SyncStatus, error) {
	status := &SyncStatus{}

	// Get the last sync time, if one has been recorded.
	var lastSync sql.NullString
	c.db.QueryRowContext(ctx, "SELECT value FROM catalog_meta WHERE key = 'last_sync'").Scan(&lastSync)
	if lastSync.Valid {
		if t, err := time.Parse(time.RFC3339, lastSync.String); err == nil {
			status.LastSync = &t
		}
	}

	// Count entries
	c.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM backups").Scan(&status.TotalEntries)
	c.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM backups WHERE status != 'deleted'").Scan(&status.ActiveEntries)
	c.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM backups WHERE status = 'deleted'").Scan(&status.DeletedEntries)

	// Collect the distinct parent directories of active backups.
	// rtrim(x, replace(x, '/', '')) strips the basename: it trims, from the
	// right, every character that occurs among the path's non-slash
	// characters, stopping at the last '/'. Paths without a slash are
	// returned as-is.
	rows, _ := c.db.QueryContext(ctx, `
		SELECT DISTINCT
			CASE
				WHEN instr(backup_path, '/') > 0
				THEN rtrim(backup_path, replace(backup_path, '/', ''))
				ELSE backup_path
			END AS dir
		FROM backups WHERE status != 'deleted'
	`)
	if rows != nil {
		defer rows.Close()
		for rows.Next() {
			var dir string
			rows.Scan(&dir)
			status.Directories = append(status.Directories, dir)
		}
	}

	return status, nil
}
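
// Illustrative sketch, not part of the original file: using the sync status
// to flag a stale catalog, in the spirit of the RPO monitoring mentioned in
// the commit message. maxAge is caller-chosen; 24h would be a typical value.
func syncIsFreshExample(ctx context.Context, cat *SQLiteCatalog, maxAge time.Duration) (bool, error) {
	status, err := cat.GetSyncStatus(ctx)
	if err != nil {
		return false, err
	}
	if status.LastSync == nil {
		return false, nil // never synced
	}
	return time.Since(*status.LastSync) <= maxAge, nil
}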

// SetLastSync updates the last sync timestamp.
func (c *SQLiteCatalog) SetLastSync(ctx context.Context) error {
	_, err := c.db.ExecContext(ctx, `
		INSERT OR REPLACE INTO catalog_meta (key, value, updated_at)
		VALUES ('last_sync', ?, CURRENT_TIMESTAMP)
	`, time.Now().Format(time.RFC3339))
	return err
}