Build all platforms v3.42.22
This commit is contained in:
@@ -3,7 +3,9 @@ package dedup
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3" // SQLite driver
|
||||
@@ -11,27 +13,67 @@ import (
|
||||
|
||||
// ChunkIndex provides fast chunk lookups using SQLite
|
||||
type ChunkIndex struct {
|
||||
db *sql.DB
|
||||
db *sql.DB
|
||||
dbPath string
|
||||
}
|
||||
|
||||
// NewChunkIndex opens or creates a chunk index database
|
||||
// NewChunkIndex opens or creates a chunk index database at the default location
|
||||
func NewChunkIndex(basePath string) (*ChunkIndex, error) {
|
||||
dbPath := filepath.Join(basePath, "chunks.db")
|
||||
return NewChunkIndexAt(dbPath)
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_synchronous=NORMAL")
|
||||
// NewChunkIndexAt opens or creates a chunk index database at a specific path
|
||||
// Use this to put the SQLite index on local storage when chunks are on NFS/CIFS
|
||||
func NewChunkIndexAt(dbPath string) (*ChunkIndex, error) {
|
||||
// Ensure parent directory exists
|
||||
if err := os.MkdirAll(filepath.Dir(dbPath), 0700); err != nil {
|
||||
return nil, fmt.Errorf("failed to create index directory: %w", err)
|
||||
}
|
||||
|
||||
// Add busy_timeout to handle lock contention gracefully
|
||||
db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_synchronous=NORMAL&_busy_timeout=5000")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open chunk index: %w", err)
|
||||
}
|
||||
|
||||
idx := &ChunkIndex{db: db}
|
||||
// Test the connection and check for locking issues
|
||||
if err := db.Ping(); err != nil {
|
||||
db.Close()
|
||||
if isNFSLockingError(err) {
|
||||
return nil, fmt.Errorf("database locked (common on NFS/CIFS): %w\n\n"+
|
||||
"HINT: Use --index-db to put the SQLite index on local storage:\n"+
|
||||
" dbbackup dedup ... --index-db /var/lib/dbbackup/dedup-index.db", err)
|
||||
}
|
||||
return nil, fmt.Errorf("failed to connect to chunk index: %w", err)
|
||||
}
|
||||
|
||||
idx := &ChunkIndex{db: db, dbPath: dbPath}
|
||||
if err := idx.migrate(); err != nil {
|
||||
db.Close()
|
||||
if isNFSLockingError(err) {
|
||||
return nil, fmt.Errorf("database locked during migration (common on NFS/CIFS): %w\n\n"+
|
||||
"HINT: Use --index-db to put the SQLite index on local storage:\n"+
|
||||
" dbbackup dedup ... --index-db /var/lib/dbbackup/dedup-index.db", err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return idx, nil
|
||||
}
|
||||
|
||||
// isNFSLockingError checks if an error is likely due to NFS/CIFS locking issues
|
||||
func isNFSLockingError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
errStr := err.Error()
|
||||
return strings.Contains(errStr, "database is locked") ||
|
||||
strings.Contains(errStr, "SQLITE_BUSY") ||
|
||||
strings.Contains(errStr, "cannot lock") ||
|
||||
strings.Contains(errStr, "lock protocol")
|
||||
}
|
||||
|
||||
// migrate creates the schema if needed
|
||||
func (idx *ChunkIndex) migrate() error {
|
||||
schema := `
|
||||
@@ -166,15 +208,26 @@ func (idx *ChunkIndex) RemoveManifest(id string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateManifestVerified updates the verified timestamp for a manifest
|
||||
func (idx *ChunkIndex) UpdateManifestVerified(id string, verifiedAt time.Time) error {
|
||||
_, err := idx.db.Exec("UPDATE manifests SET verified_at = ? WHERE id = ?", verifiedAt, id)
|
||||
return err
|
||||
}
|
||||
|
||||
// IndexStats holds statistics about the dedup index
|
||||
type IndexStats struct {
|
||||
TotalChunks int64
|
||||
TotalManifests int64
|
||||
TotalSizeRaw int64 // Uncompressed, undeduplicated
|
||||
TotalSizeStored int64 // On-disk after dedup+compression
|
||||
DedupRatio float64
|
||||
TotalSizeRaw int64 // Uncompressed, undeduplicated (per-chunk)
|
||||
TotalSizeStored int64 // On-disk after dedup+compression (per-chunk)
|
||||
DedupRatio float64 // Based on manifests (real dedup ratio)
|
||||
OldestChunk time.Time
|
||||
NewestChunk time.Time
|
||||
|
||||
// Manifest-based stats (accurate dedup calculation)
|
||||
TotalBackupSize int64 // Sum of all backup original sizes
|
||||
TotalNewData int64 // Sum of all new chunks stored
|
||||
SpaceSaved int64 // Difference = what dedup saved
|
||||
}
|
||||
|
||||
// Stats returns statistics about the index
|
||||
@@ -206,8 +259,22 @@ func (idx *ChunkIndex) Stats() (*IndexStats, error) {
|
||||
|
||||
idx.db.QueryRow("SELECT COUNT(*) FROM manifests").Scan(&stats.TotalManifests)
|
||||
|
||||
if stats.TotalSizeRaw > 0 {
|
||||
stats.DedupRatio = 1.0 - float64(stats.TotalSizeStored)/float64(stats.TotalSizeRaw)
|
||||
// Calculate accurate dedup ratio from manifests
|
||||
// Sum all backup original sizes and all new data stored
|
||||
err = idx.db.QueryRow(`
|
||||
SELECT
|
||||
COALESCE(SUM(original_size), 0),
|
||||
COALESCE(SUM(stored_size), 0)
|
||||
FROM manifests
|
||||
`).Scan(&stats.TotalBackupSize, &stats.TotalNewData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Calculate real dedup ratio: how much data was deduplicated across all backups
|
||||
if stats.TotalBackupSize > 0 {
|
||||
stats.DedupRatio = 1.0 - float64(stats.TotalNewData)/float64(stats.TotalBackupSize)
|
||||
stats.SpaceSaved = stats.TotalBackupSize - stats.TotalNewData
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
|
||||
@@ -36,8 +36,9 @@ type Manifest struct {
|
||||
DedupRatio float64 `json:"dedup_ratio"` // 1.0 = no dedup, 0.0 = 100% dedup
|
||||
|
||||
// Encryption and compression settings used
|
||||
Encrypted bool `json:"encrypted"`
|
||||
Compressed bool `json:"compressed"`
|
||||
Encrypted bool `json:"encrypted"`
|
||||
Compressed bool `json:"compressed"`
|
||||
Decompressed bool `json:"decompressed,omitempty"` // Input was auto-decompressed before chunking
|
||||
|
||||
// Verification
|
||||
SHA256 string `json:"sha256"` // Hash of reconstructed file
|
||||
|
||||
235
internal/dedup/metrics.go
Normal file
235
internal/dedup/metrics.go
Normal file
@@ -0,0 +1,235 @@
|
||||
package dedup
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// DedupMetrics holds deduplication statistics for Prometheus
|
||||
type DedupMetrics struct {
|
||||
// Global stats
|
||||
TotalChunks int64
|
||||
TotalManifests int64
|
||||
TotalBackupSize int64 // Sum of all backup original sizes
|
||||
TotalNewData int64 // Sum of all new chunks stored
|
||||
SpaceSaved int64 // Bytes saved by deduplication
|
||||
DedupRatio float64 // Overall dedup ratio (0-1)
|
||||
DiskUsage int64 // Actual bytes on disk
|
||||
|
||||
// Per-database stats
|
||||
ByDatabase map[string]*DatabaseDedupMetrics
|
||||
}
|
||||
|
||||
// DatabaseDedupMetrics holds per-database dedup stats
|
||||
type DatabaseDedupMetrics struct {
|
||||
Database string
|
||||
BackupCount int
|
||||
TotalSize int64
|
||||
StoredSize int64
|
||||
DedupRatio float64
|
||||
LastBackupTime time.Time
|
||||
LastVerified time.Time
|
||||
}
|
||||
|
||||
// CollectMetrics gathers dedup statistics from the index and store
|
||||
func CollectMetrics(basePath string, indexPath string) (*DedupMetrics, error) {
|
||||
var idx *ChunkIndex
|
||||
var err error
|
||||
|
||||
if indexPath != "" {
|
||||
idx, err = NewChunkIndexAt(indexPath)
|
||||
} else {
|
||||
idx, err = NewChunkIndex(basePath)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open chunk index: %w", err)
|
||||
}
|
||||
defer idx.Close()
|
||||
|
||||
store, err := NewChunkStore(StoreConfig{BasePath: basePath})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open chunk store: %w", err)
|
||||
}
|
||||
|
||||
// Get index stats
|
||||
stats, err := idx.Stats()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get index stats: %w", err)
|
||||
}
|
||||
|
||||
// Get store stats
|
||||
storeStats, err := store.Stats()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get store stats: %w", err)
|
||||
}
|
||||
|
||||
metrics := &DedupMetrics{
|
||||
TotalChunks: stats.TotalChunks,
|
||||
TotalManifests: stats.TotalManifests,
|
||||
TotalBackupSize: stats.TotalBackupSize,
|
||||
TotalNewData: stats.TotalNewData,
|
||||
SpaceSaved: stats.SpaceSaved,
|
||||
DedupRatio: stats.DedupRatio,
|
||||
DiskUsage: storeStats.TotalSize,
|
||||
ByDatabase: make(map[string]*DatabaseDedupMetrics),
|
||||
}
|
||||
|
||||
// Collect per-database metrics from manifest store
|
||||
manifestStore, err := NewManifestStore(basePath)
|
||||
if err != nil {
|
||||
return metrics, nil // Return partial metrics
|
||||
}
|
||||
|
||||
manifests, err := manifestStore.ListAll()
|
||||
if err != nil {
|
||||
return metrics, nil // Return partial metrics
|
||||
}
|
||||
|
||||
for _, m := range manifests {
|
||||
dbKey := m.DatabaseName
|
||||
if dbKey == "" {
|
||||
dbKey = "_default"
|
||||
}
|
||||
|
||||
dbMetrics, ok := metrics.ByDatabase[dbKey]
|
||||
if !ok {
|
||||
dbMetrics = &DatabaseDedupMetrics{
|
||||
Database: dbKey,
|
||||
}
|
||||
metrics.ByDatabase[dbKey] = dbMetrics
|
||||
}
|
||||
|
||||
dbMetrics.BackupCount++
|
||||
dbMetrics.TotalSize += m.OriginalSize
|
||||
dbMetrics.StoredSize += m.StoredSize
|
||||
|
||||
if m.CreatedAt.After(dbMetrics.LastBackupTime) {
|
||||
dbMetrics.LastBackupTime = m.CreatedAt
|
||||
}
|
||||
if !m.VerifiedAt.IsZero() && m.VerifiedAt.After(dbMetrics.LastVerified) {
|
||||
dbMetrics.LastVerified = m.VerifiedAt
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate per-database dedup ratios
|
||||
for _, dbMetrics := range metrics.ByDatabase {
|
||||
if dbMetrics.TotalSize > 0 {
|
||||
dbMetrics.DedupRatio = 1.0 - float64(dbMetrics.StoredSize)/float64(dbMetrics.TotalSize)
|
||||
}
|
||||
}
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
// WritePrometheusTextfile writes dedup metrics in Prometheus format
|
||||
func WritePrometheusTextfile(path string, instance string, basePath string, indexPath string) error {
|
||||
metrics, err := CollectMetrics(basePath, indexPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
output := FormatPrometheusMetrics(metrics, instance)
|
||||
|
||||
// Atomic write
|
||||
dir := filepath.Dir(path)
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create directory: %w", err)
|
||||
}
|
||||
|
||||
tmpPath := path + ".tmp"
|
||||
if err := os.WriteFile(tmpPath, []byte(output), 0644); err != nil {
|
||||
return fmt.Errorf("failed to write temp file: %w", err)
|
||||
}
|
||||
|
||||
if err := os.Rename(tmpPath, path); err != nil {
|
||||
os.Remove(tmpPath)
|
||||
return fmt.Errorf("failed to rename temp file: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// FormatPrometheusMetrics formats dedup metrics in Prometheus exposition format
|
||||
func FormatPrometheusMetrics(m *DedupMetrics, instance string) string {
|
||||
var b strings.Builder
|
||||
now := time.Now().Unix()
|
||||
|
||||
b.WriteString("# DBBackup Deduplication Prometheus Metrics\n")
|
||||
b.WriteString(fmt.Sprintf("# Generated at: %s\n", time.Now().Format(time.RFC3339)))
|
||||
b.WriteString(fmt.Sprintf("# Instance: %s\n", instance))
|
||||
b.WriteString("\n")
|
||||
|
||||
// Global dedup metrics
|
||||
b.WriteString("# HELP dbbackup_dedup_chunks_total Total number of unique chunks stored\n")
|
||||
b.WriteString("# TYPE dbbackup_dedup_chunks_total gauge\n")
|
||||
b.WriteString(fmt.Sprintf("dbbackup_dedup_chunks_total{instance=%q} %d\n", instance, m.TotalChunks))
|
||||
b.WriteString("\n")
|
||||
|
||||
b.WriteString("# HELP dbbackup_dedup_manifests_total Total number of deduplicated backups\n")
|
||||
b.WriteString("# TYPE dbbackup_dedup_manifests_total gauge\n")
|
||||
b.WriteString(fmt.Sprintf("dbbackup_dedup_manifests_total{instance=%q} %d\n", instance, m.TotalManifests))
|
||||
b.WriteString("\n")
|
||||
|
||||
b.WriteString("# HELP dbbackup_dedup_backup_bytes_total Total logical size of all backups in bytes\n")
|
||||
b.WriteString("# TYPE dbbackup_dedup_backup_bytes_total gauge\n")
|
||||
b.WriteString(fmt.Sprintf("dbbackup_dedup_backup_bytes_total{instance=%q} %d\n", instance, m.TotalBackupSize))
|
||||
b.WriteString("\n")
|
||||
|
||||
b.WriteString("# HELP dbbackup_dedup_stored_bytes_total Total unique data stored in bytes (after dedup)\n")
|
||||
b.WriteString("# TYPE dbbackup_dedup_stored_bytes_total gauge\n")
|
||||
b.WriteString(fmt.Sprintf("dbbackup_dedup_stored_bytes_total{instance=%q} %d\n", instance, m.TotalNewData))
|
||||
b.WriteString("\n")
|
||||
|
||||
b.WriteString("# HELP dbbackup_dedup_space_saved_bytes Bytes saved by deduplication\n")
|
||||
b.WriteString("# TYPE dbbackup_dedup_space_saved_bytes gauge\n")
|
||||
b.WriteString(fmt.Sprintf("dbbackup_dedup_space_saved_bytes{instance=%q} %d\n", instance, m.SpaceSaved))
|
||||
b.WriteString("\n")
|
||||
|
||||
b.WriteString("# HELP dbbackup_dedup_ratio Deduplication ratio (0-1, higher is better)\n")
|
||||
b.WriteString("# TYPE dbbackup_dedup_ratio gauge\n")
|
||||
b.WriteString(fmt.Sprintf("dbbackup_dedup_ratio{instance=%q} %.4f\n", instance, m.DedupRatio))
|
||||
b.WriteString("\n")
|
||||
|
||||
b.WriteString("# HELP dbbackup_dedup_disk_usage_bytes Actual disk usage of chunk store\n")
|
||||
b.WriteString("# TYPE dbbackup_dedup_disk_usage_bytes gauge\n")
|
||||
b.WriteString(fmt.Sprintf("dbbackup_dedup_disk_usage_bytes{instance=%q} %d\n", instance, m.DiskUsage))
|
||||
b.WriteString("\n")
|
||||
|
||||
// Per-database metrics
|
||||
if len(m.ByDatabase) > 0 {
|
||||
b.WriteString("# HELP dbbackup_dedup_database_backup_count Number of deduplicated backups per database\n")
|
||||
b.WriteString("# TYPE dbbackup_dedup_database_backup_count gauge\n")
|
||||
for _, db := range m.ByDatabase {
|
||||
b.WriteString(fmt.Sprintf("dbbackup_dedup_database_backup_count{instance=%q,database=%q} %d\n",
|
||||
instance, db.Database, db.BackupCount))
|
||||
}
|
||||
b.WriteString("\n")
|
||||
|
||||
b.WriteString("# HELP dbbackup_dedup_database_ratio Deduplication ratio per database (0-1)\n")
|
||||
b.WriteString("# TYPE dbbackup_dedup_database_ratio gauge\n")
|
||||
for _, db := range m.ByDatabase {
|
||||
b.WriteString(fmt.Sprintf("dbbackup_dedup_database_ratio{instance=%q,database=%q} %.4f\n",
|
||||
instance, db.Database, db.DedupRatio))
|
||||
}
|
||||
b.WriteString("\n")
|
||||
|
||||
b.WriteString("# HELP dbbackup_dedup_database_last_backup_timestamp Last backup timestamp per database\n")
|
||||
b.WriteString("# TYPE dbbackup_dedup_database_last_backup_timestamp gauge\n")
|
||||
for _, db := range m.ByDatabase {
|
||||
if !db.LastBackupTime.IsZero() {
|
||||
b.WriteString(fmt.Sprintf("dbbackup_dedup_database_last_backup_timestamp{instance=%q,database=%q} %d\n",
|
||||
instance, db.Database, db.LastBackupTime.Unix()))
|
||||
}
|
||||
}
|
||||
b.WriteString("\n")
|
||||
}
|
||||
|
||||
b.WriteString("# HELP dbbackup_dedup_scrape_timestamp Unix timestamp when dedup metrics were collected\n")
|
||||
b.WriteString("# TYPE dbbackup_dedup_scrape_timestamp gauge\n")
|
||||
b.WriteString(fmt.Sprintf("dbbackup_dedup_scrape_timestamp{instance=%q} %d\n", instance, now))
|
||||
|
||||
return b.String()
|
||||
}
|
||||
Reference in New Issue
Block a user