New features implemented:

1. Backup Catalog (internal/catalog/)
   - SQLite-based backup tracking
   - Gap detection and RPO monitoring
   - Search and statistics
   - Filesystem sync
2. DR Drill Testing (internal/drill/)
   - Automated restore testing in Docker containers
   - Database validation with custom queries
   - Catalog integration for drill-tested status
3. Smart Notifications (internal/notify/)
   - Event batching with configurable intervals
   - Time-based escalation policies (see the config sketch after this list)
   - HTML/text/Slack templates
4. Compliance Reports (internal/report/)
   - SOC2, GDPR, HIPAA, PCI-DSS, ISO27001 frameworks
   - Evidence collection from catalog
   - JSON, Markdown, HTML output formats
5. RTO/RPO Calculator (internal/rto/)
   - Recovery objective analysis
   - RTO breakdown by phase
   - Recommendations for improvement
6. Replica-Aware Backup (internal/replica/)
   - Topology detection for PostgreSQL/MySQL
   - Automatic replica selection
   - Configurable selection strategies
7. Parallel Table Backup (internal/parallel/)
   - Concurrent table dumps
   - Worker pool with progress tracking
   - Large table optimization
8. MySQL/MariaDB PITR (internal/pitr/)
   - Binary log parsing and replay
   - Point-in-time recovery support
   - Transaction filtering

CLI commands added: catalog, drill, report, rto

All changes support the goal: reliable 3 AM database recovery.
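The escalation portion of the notify package is reproduced below. It defines an `EscalationConfig` made up of ordered, time-delayed `EscalationLevel` entries; a custom three-tier policy built from those types might look roughly like the following sketch. The `exampleEscalationPolicy` name, recipient addresses, and webhook URL are illustrative placeholders, not part of the package.

```go
// Hypothetical three-tier policy for backup failures, built from the
// EscalationConfig and EscalationLevel types defined in the code below.
// Lives inside package notify; addresses and the webhook URL are placeholders.
func exampleEscalationPolicy() EscalationConfig {
	return EscalationConfig{
		Enabled:        true,
		RepeatInterval: 30 * time.Minute,
		MaxRepeats:     3,
		Levels: []EscalationLevel{
			{Name: "primary", Delay: 0, Severity: SeverityError, Recipients: []string{"oncall@example.com"}},
			{Name: "secondary", Delay: 15 * time.Minute, Severity: SeverityError, Recipients: []string{"dba-team@example.com"}},
			{Name: "critical", Delay: 30 * time.Minute, Severity: SeverityCritical, Webhook: "https://hooks.example.com/pager"},
		},
	}
}
```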
// Package notify - Escalation for critical events
package notify

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// EscalationConfig configures notification escalation
type EscalationConfig struct {
	Enabled         bool              // Enable escalation
	Levels          []EscalationLevel // Escalation levels
	AcknowledgeURL  string            // URL to acknowledge alerts
	CooldownPeriod  time.Duration     // Cooldown between escalations
	RepeatInterval  time.Duration     // Repeat unacknowledged alerts
	MaxRepeats      int               // Maximum repeat attempts
	TrackingEnabled bool              // Track escalation state
}

// EscalationLevel defines an escalation tier
type EscalationLevel struct {
	Name       string        // Level name (e.g., "primary", "secondary", "manager")
	Delay      time.Duration // Delay before escalating to this level
	Recipients []string      // Email recipients for this level
	Webhook    string        // Webhook URL for this level
	Severity   Severity      // Minimum severity to escalate
	Message    string        // Custom message template
}

// DefaultEscalationConfig returns sensible defaults
func DefaultEscalationConfig() EscalationConfig {
	return EscalationConfig{
		Enabled:        false,
		CooldownPeriod: 15 * time.Minute,
		RepeatInterval: 30 * time.Minute,
		MaxRepeats:     3,
		Levels: []EscalationLevel{
			{
				Name:     "primary",
				Delay:    0,
				Severity: SeverityError,
			},
			{
				Name:     "secondary",
				Delay:    15 * time.Minute,
				Severity: SeverityError,
			},
			{
				Name:     "critical",
				Delay:    30 * time.Minute,
				Severity: SeverityCritical,
			},
		},
	}
}

// EscalationState tracks escalation for an alert
type EscalationState struct {
	AlertID        string     `json:"alert_id"`
	Event          *Event     `json:"event"`
	CurrentLevel   int        `json:"current_level"`
	StartedAt      time.Time  `json:"started_at"`
	LastEscalation time.Time  `json:"last_escalation"`
	RepeatCount    int        `json:"repeat_count"`
	Acknowledged   bool       `json:"acknowledged"`
	AcknowledgedBy string     `json:"acknowledged_by,omitempty"`
	AcknowledgedAt *time.Time `json:"acknowledged_at,omitempty"`
	Resolved       bool       `json:"resolved"`
}

// Escalator manages alert escalation
type Escalator struct {
	config  EscalationConfig
	manager *Manager
	alerts  map[string]*EscalationState
	mu      sync.RWMutex
	ctx     context.Context
	cancel  context.CancelFunc
	ticker  *time.Ticker
}

// NewEscalator creates a new escalation manager
func NewEscalator(config EscalationConfig, manager *Manager) *Escalator {
	ctx, cancel := context.WithCancel(context.Background())
	e := &Escalator{
		config:  config,
		manager: manager,
		alerts:  make(map[string]*EscalationState),
		ctx:     ctx,
		cancel:  cancel,
	}

	if config.Enabled {
		e.ticker = time.NewTicker(time.Minute)
		go e.runEscalationLoop()
	}

	return e
}

// Handle processes an event for potential escalation
func (e *Escalator) Handle(event *Event) {
	if !e.config.Enabled {
		return
	}

	// Only escalate errors and critical events
	if severityOrder(event.Severity) < severityOrder(SeverityError) {
		return
	}

	// Generate alert ID
	alertID := e.generateAlertID(event)

	e.mu.Lock()
	defer e.mu.Unlock()

	// Check if alert already exists
	if existing, ok := e.alerts[alertID]; ok {
		if !existing.Acknowledged && !existing.Resolved {
			// Alert already being escalated
			return
		}
	}

	// Create new escalation state
	state := &EscalationState{
		AlertID:        alertID,
		Event:          event,
		CurrentLevel:   0,
		StartedAt:      time.Now(),
		LastEscalation: time.Now(),
	}

	e.alerts[alertID] = state

	// Send immediate notification to first level
	e.notifyLevel(state, 0)
}

// generateAlertID creates a unique ID for an alert
func (e *Escalator) generateAlertID(event *Event) string {
	return fmt.Sprintf("%s_%s_%s",
		event.Type,
		event.Database,
		event.Hostname)
}

// notifyLevel sends notification for a specific escalation level
func (e *Escalator) notifyLevel(state *EscalationState, level int) {
	if level >= len(e.config.Levels) {
		return
	}

	lvl := e.config.Levels[level]

	// Create escalated event
	escalatedEvent := &Event{
		Type:      state.Event.Type,
		Severity:  state.Event.Severity,
		Timestamp: time.Now(),
		Database:  state.Event.Database,
		Hostname:  state.Event.Hostname,
		Message:   e.formatEscalationMessage(state, lvl),
		Details:   make(map[string]string),
	}

	escalatedEvent.Details["escalation_level"] = lvl.Name
	escalatedEvent.Details["alert_id"] = state.AlertID
	escalatedEvent.Details["escalation_time"] = fmt.Sprintf("%d", int(time.Since(state.StartedAt).Minutes()))
	escalatedEvent.Details["original_message"] = state.Event.Message

	if state.Event.Error != "" {
		escalatedEvent.Error = state.Event.Error
	}

	// Send via manager
	e.manager.Notify(escalatedEvent)

	state.CurrentLevel = level
	state.LastEscalation = time.Now()
}

// formatEscalationMessage creates an escalation message
func (e *Escalator) formatEscalationMessage(state *EscalationState, level EscalationLevel) string {
	if level.Message != "" {
		return level.Message
	}

	elapsed := time.Since(state.StartedAt)
	return fmt.Sprintf("🚨 ESCALATION [%s] - Alert unacknowledged for %s\n\n%s",
		level.Name,
		formatDuration(elapsed),
		state.Event.Message)
}

// runEscalationLoop checks for alerts that need escalation
func (e *Escalator) runEscalationLoop() {
	for {
		select {
		case <-e.ctx.Done():
			return
		case <-e.ticker.C:
			e.checkEscalations()
		}
	}
}

// checkEscalations checks all alerts for needed escalation
func (e *Escalator) checkEscalations() {
	e.mu.Lock()
	defer e.mu.Unlock()

	now := time.Now()

	for _, state := range e.alerts {
		if state.Acknowledged || state.Resolved {
			continue
		}

		// Check if we need to escalate to next level
		nextLevel := state.CurrentLevel + 1
		if nextLevel < len(e.config.Levels) {
			lvl := e.config.Levels[nextLevel]
			if now.Sub(state.StartedAt) >= lvl.Delay {
				e.notifyLevel(state, nextLevel)
			}
		}

		// Check if we need to repeat the alert
		if state.RepeatCount < e.config.MaxRepeats {
			if now.Sub(state.LastEscalation) >= e.config.RepeatInterval {
				e.notifyLevel(state, state.CurrentLevel)
				state.RepeatCount++
			}
		}
	}
}

// Acknowledge acknowledges an alert
func (e *Escalator) Acknowledge(alertID, user string) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	state, ok := e.alerts[alertID]
	if !ok {
		return fmt.Errorf("alert not found: %s", alertID)
	}

	now := time.Now()
	state.Acknowledged = true
	state.AcknowledgedBy = user
	state.AcknowledgedAt = &now

	return nil
}

// Resolve resolves an alert
func (e *Escalator) Resolve(alertID string) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	state, ok := e.alerts[alertID]
	if !ok {
		return fmt.Errorf("alert not found: %s", alertID)
	}

	state.Resolved = true
	return nil
}

// GetActiveAlerts returns all active (unacknowledged, unresolved) alerts
func (e *Escalator) GetActiveAlerts() []*EscalationState {
	e.mu.RLock()
	defer e.mu.RUnlock()

	var active []*EscalationState
	for _, state := range e.alerts {
		if !state.Acknowledged && !state.Resolved {
			active = append(active, state)
		}
	}
	return active
}

// GetAlert returns a specific alert
func (e *Escalator) GetAlert(alertID string) (*EscalationState, bool) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	state, ok := e.alerts[alertID]
	return state, ok
}

// CleanupOld removes old resolved/acknowledged alerts
func (e *Escalator) CleanupOld(maxAge time.Duration) int {
	e.mu.Lock()
	defer e.mu.Unlock()

	now := time.Now()
	removed := 0

	for id, state := range e.alerts {
		if (state.Acknowledged || state.Resolved) && now.Sub(state.StartedAt) > maxAge {
			delete(e.alerts, id)
			removed++
		}
	}

	return removed
}

// Stop stops the escalator
func (e *Escalator) Stop() {
	e.cancel()
	if e.ticker != nil {
		e.ticker.Stop()
	}
}

// EscalatorStats holds escalator statistics
type EscalatorStats struct {
	ActiveAlerts       int  `json:"active_alerts"`
	AcknowledgedAlerts int  `json:"acknowledged_alerts"`
	ResolvedAlerts     int  `json:"resolved_alerts"`
	EscalationEnabled  bool `json:"escalation_enabled"`
	LevelCount         int  `json:"level_count"`
}

// Stats returns escalator statistics
func (e *Escalator) Stats() EscalatorStats {
	e.mu.RLock()
	defer e.mu.RUnlock()

	stats := EscalatorStats{
		EscalationEnabled: e.config.Enabled,
		LevelCount:        len(e.config.Levels),
	}

	for _, state := range e.alerts {
		if state.Resolved {
			stats.ResolvedAlerts++
		} else if state.Acknowledged {
			stats.AcknowledgedAlerts++
		} else {
			stats.ActiveAlerts++
		}
	}

	return stats
}

// formatDuration renders a duration as a short human-readable string,
// e.g. "45s", "12m", or "1h 30m".
func formatDuration(d time.Duration) string {
	if d < time.Minute {
		return fmt.Sprintf("%.0fs", d.Seconds())
	}
	if d < time.Hour {
		return fmt.Sprintf("%.0fm", d.Minutes())
	}
	// Split into whole hours plus remaining minutes.
	hours := int(d.Hours())
	minutes := int(d.Minutes()) % 60
	return fmt.Sprintf("%dh %dm", hours, minutes)
}
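Putting the pieces above together, a call site might drive the escalator roughly as in the following sketch. It assumes a `*Manager` and an originating `*Event` are supplied by the rest of the notify package; `runEscalationSketch` and the "oncall-engineer" user are illustrative names, not part of the package.

```go
// Sketch (inside package notify): how the escalator is wired up and
// acknowledged. mgr and ev are assumed to come from the rest of the
// notify package.
func runEscalationSketch(mgr *Manager, ev *Event) {
	cfg := DefaultEscalationConfig()
	cfg.Enabled = true // defaults ship with escalation disabled

	esc := NewEscalator(cfg, mgr)
	defer esc.Stop()

	// An error/critical event enters escalation: level 0 is notified
	// immediately, later levels fire from the background loop as their
	// Delay elapses without acknowledgement.
	esc.Handle(ev)

	// An operator acknowledges the page, stopping further escalation
	// and repeats for that alert.
	for _, alert := range esc.GetActiveAlerts() {
		_ = esc.Acknowledge(alert.AlertID, "oncall-engineer")
	}
}
```

Note that because the background loop ticks once per minute, level delays and `RepeatInterval` are effectively honored at minute granularity.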