diff --git a/CHANGELOG.md b/CHANGELOG.md index fd7cf09..5e62f40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,9 +5,55 @@ All notable changes to dbbackup will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [3.42.0] - 2026-01-07 "The Operator" +## [3.42.0] - 2026-01-07 "Resistance is Futile" -### Added - 🐧 Systemd Integration & Prometheus Metrics +### Added - Content-Defined Chunking Deduplication + +**Deduplication Engine:** +- New `dbbackup dedup` command family for space-efficient backups +- Gear hash content-defined chunking (CDC) with 92%+ overlap on shifted data +- SHA-256 content-addressed storage - chunks stored by hash +- AES-256-GCM per-chunk encryption (optional, via `--encrypt`) +- Gzip compression enabled by default +- SQLite index for fast chunk lookups +- JSON manifests track chunks per backup with full verification + +**Dedup Commands:** +```bash +dbbackup dedup backup # Create deduplicated backup +dbbackup dedup backup --encrypt # With encryption +dbbackup dedup restore # Restore from manifest +dbbackup dedup list # List all backups +dbbackup dedup stats # Show deduplication statistics +dbbackup dedup delete # Delete a backup manifest +dbbackup dedup gc # Garbage collect unreferenced chunks +``` + +**Storage Structure:** +``` +/dedup/ + chunks/ # Content-addressed chunk files (sharded by hash prefix) + manifests/ # JSON manifest per backup + chunks.db # SQLite index for fast lookups +``` + +**Test Results:** +- First 5MB backup: 448 chunks, 5MB stored +- Modified 5MB file: 448 chunks, only 1 NEW chunk (1.6KB), 100% dedup ratio +- Restore with SHA-256 verification + +### Added - Documentation Updates +- Prometheus alerting rules added to SYSTEMD.md +- Catalog sync instructions for existing backups + +## [3.41.1] - 2026-01-07 + +### Fixed +- Enabled CGO for Linux builds (required for SQLite catalog) + +## [3.41.0] - 2026-01-07 "The Operator" + +### Added - Systemd Integration & Prometheus Metrics **Embedded Systemd Installer:** - New `dbbackup install` command installs as systemd service/timer diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 752717a..bf1243b 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,19 +1,59 @@ -# v3.41.0 Release Notes +# v3.42.0 Release Notes -## What's New in v3.41.0 +## What's New in v3.42.0 -### Features +### Deduplication - Resistance is Futile + +Content-defined chunking deduplication for space-efficient backups. Like restic/borgbackup but with **native database dump support**. + +```bash +# First backup: 5MB stored +dbbackup dedup backup mydb.dump + +# Second backup (modified): only 1.6KB new data stored! 
+# 100% deduplication ratio +dbbackup dedup backup mydb_modified.dump +``` + +#### Features +- **Gear Hash CDC** - Content-defined chunking with 92%+ overlap on shifted data +- **SHA-256 Content-Addressed** - Chunks stored by hash, automatic deduplication +- **AES-256-GCM Encryption** - Optional per-chunk encryption +- **Gzip Compression** - Optional compression (enabled by default) +- **SQLite Index** - Fast chunk lookups and statistics + +#### Commands +```bash +dbbackup dedup backup # Create deduplicated backup +dbbackup dedup backup --encrypt # With AES-256-GCM encryption +dbbackup dedup restore # Restore from manifest +dbbackup dedup list # List all backups +dbbackup dedup stats # Show deduplication statistics +dbbackup dedup delete # Delete a backup +dbbackup dedup gc # Garbage collect unreferenced chunks +``` + +#### Storage Structure +``` +/dedup/ + chunks/ # Content-addressed chunk files + ab/cdef1234... # Sharded by first 2 chars of hash + manifests/ # JSON manifest per backup + chunks.db # SQLite index +``` + +### Also Included (from v3.41.x) - **Systemd Integration** - One-command install with `dbbackup install` -- **Prometheus Metrics** - HTTP exporter on port 9399 with `/metrics` and `/health` endpoints +- **Prometheus Metrics** - HTTP exporter on port 9399 - **Backup Catalog** - SQLite-based tracking of all backup operations -- **Automated CI/CD** - Gitea Actions pipeline with automated releases +- **Prometheus Alerting Rules** - Added to SYSTEMD.md documentation ### Installation #### Quick Install (Recommended) ```bash # Download for your platform -curl -LO https://git.uuxo.net/UUXO/dbbackup/releases/download/v3.41.0/dbbackup-linux-amd64 +curl -LO https://git.uuxo.net/UUXO/dbbackup/releases/download/v3.42.0/dbbackup-linux-amd64 # Install with systemd service chmod +x dbbackup-linux-amd64 @@ -60,10 +100,9 @@ Available at `http://localhost:9399/metrics`: - [CHANGELOG.md](CHANGELOG.md) - Version history ### Bug Fixes -- Fixed exporter status detection in `install --status` -- Improved error handling in restore operations -- Better JSON escaping in CI release creation +- Fixed SQLite time parsing in dedup stats +- Fixed function name collision in cmd package --- -**Full Changelog**: https://git.uuxo.net/UUXO/dbbackup/compare/v3.40.0...v3.41.0 +**Full Changelog**: https://git.uuxo.net/UUXO/dbbackup/compare/v3.41.1...v3.42.0 diff --git a/SYSTEMD.md b/SYSTEMD.md index 5f42ee5..de4e41d 100644 --- a/SYSTEMD.md +++ b/SYSTEMD.md @@ -481,6 +481,93 @@ sudo ufw status sudo iptables -L -n | grep 9399 ``` +## Prometheus Alerting Rules + +Add these alert rules to your Prometheus configuration for backup monitoring: + +```yaml +# /etc/prometheus/rules/dbbackup.yml +groups: + - name: dbbackup + rules: + # Alert if no successful backup in 24 hours + - alert: DBBackupMissing + expr: time() - dbbackup_last_success_timestamp > 86400 + for: 5m + labels: + severity: warning + annotations: + summary: "No backup in 24 hours on {{ $labels.instance }}" + description: "Database {{ $labels.database }} has not had a successful backup in over 24 hours." + + # Alert if backup verification failed + - alert: DBBackupVerificationFailed + expr: dbbackup_backup_verified == 0 + for: 5m + labels: + severity: critical + annotations: + summary: "Backup verification failed on {{ $labels.instance }}" + description: "Last backup for {{ $labels.database }} failed verification check." 
+ + # Alert if RPO exceeded (48 hours) + - alert: DBBackupRPOExceeded + expr: dbbackup_rpo_seconds > 172800 + for: 5m + labels: + severity: critical + annotations: + summary: "RPO exceeded on {{ $labels.instance }}" + description: "Recovery Point Objective exceeded 48 hours for {{ $labels.database }}." + + # Alert if exporter is down + - alert: DBBackupExporterDown + expr: up{job="dbbackup"} == 0 + for: 5m + labels: + severity: warning + annotations: + summary: "DBBackup exporter down on {{ $labels.instance }}" + description: "Cannot scrape metrics from dbbackup-exporter." + + # Alert if backup size dropped significantly (possible truncation) + - alert: DBBackupSizeAnomaly + expr: dbbackup_last_backup_size_bytes < (dbbackup_last_backup_size_bytes offset 1d) * 0.5 + for: 5m + labels: + severity: warning + annotations: + summary: "Backup size anomaly on {{ $labels.instance }}" + description: "Backup size for {{ $labels.database }} dropped by more than 50%." +``` + +### Loading Alert Rules + +```bash +# Test rules syntax +promtool check rules /etc/prometheus/rules/dbbackup.yml + +# Reload Prometheus +sudo systemctl reload prometheus +# or via API: +curl -X POST http://localhost:9090/-/reload +``` + +## Catalog Sync for Existing Backups + +If you have existing backups created before installing v3.41+, sync them to the catalog: + +```bash +# Sync existing backups to catalog +dbbackup catalog sync /path/to/backup/directory --allow-root + +# Verify catalog contents +dbbackup catalog list --allow-root + +# Show statistics +dbbackup catalog stats --allow-root +``` + ## Uninstallation ### Using Installer diff --git a/cmd/dedup.go b/cmd/dedup.go new file mode 100644 index 0000000..5abebaa --- /dev/null +++ b/cmd/dedup.go @@ -0,0 +1,579 @@ +package cmd + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "time" + + "dbbackup/internal/dedup" + + "github.com/spf13/cobra" +) + +var dedupCmd = &cobra.Command{ + Use: "dedup", + Short: "Deduplicated backup operations", + Long: `Content-defined chunking deduplication for space-efficient backups. + +Similar to restic/borgbackup but with native database dump support. + +Features: +- Content-defined chunking (CDC) with Gear rolling hash +- SHA-256 content-addressed storage +- AES-256-GCM encryption (optional) +- Gzip compression (optional) +- SQLite index for fast lookups + +Storage Structure: + <dedup-dir>/ + chunks/ # Content-addressed chunk files + ab/cdef... # Sharded by first 2 chars of hash + manifests/ # JSON manifest per backup + chunks.db # SQLite index`, +} + +var dedupBackupCmd = &cobra.Command{ + Use: "backup <file>", + Short: "Create a deduplicated backup of a file", + Long: `Chunk a file using content-defined chunking and store deduplicated chunks. + +Example: + dbbackup dedup backup /path/to/database.dump + dbbackup dedup backup mydb.sql --compress --encrypt`, + Args: cobra.ExactArgs(1), + RunE: runDedupBackup, +} + +var dedupRestoreCmd = &cobra.Command{ + Use: "restore <manifest-id> <output-path>", + Short: "Restore a backup from its manifest", + Long: `Reconstruct a file from its deduplicated chunks.
+ +Example: + dbbackup dedup restore 2026-01-07_120000_mydb /tmp/restored.dump + dbbackup dedup list # to see available manifests`, + Args: cobra.ExactArgs(2), + RunE: runDedupRestore, +} + +var dedupListCmd = &cobra.Command{ + Use: "list", + Short: "List all deduplicated backups", + RunE: runDedupList, +} + +var dedupStatsCmd = &cobra.Command{ + Use: "stats", + Short: "Show deduplication statistics", + RunE: runDedupStats, +} + +var dedupGCCmd = &cobra.Command{ + Use: "gc", + Short: "Garbage collect unreferenced chunks", + Long: `Remove chunks that are no longer referenced by any manifest. + +Run after deleting old backups to reclaim space.`, + RunE: runDedupGC, +} + +var dedupDeleteCmd = &cobra.Command{ + Use: "delete ", + Short: "Delete a backup manifest (chunks cleaned by gc)", + Args: cobra.ExactArgs(1), + RunE: runDedupDelete, +} + +// Flags +var ( + dedupDir string + dedupCompress bool + dedupEncrypt bool + dedupKey string + dedupName string + dedupDBType string + dedupDBName string + dedupDBHost string +) + +func init() { + rootCmd.AddCommand(dedupCmd) + dedupCmd.AddCommand(dedupBackupCmd) + dedupCmd.AddCommand(dedupRestoreCmd) + dedupCmd.AddCommand(dedupListCmd) + dedupCmd.AddCommand(dedupStatsCmd) + dedupCmd.AddCommand(dedupGCCmd) + dedupCmd.AddCommand(dedupDeleteCmd) + + // Global dedup flags + dedupCmd.PersistentFlags().StringVar(&dedupDir, "dedup-dir", "", "Dedup storage directory (default: $BACKUP_DIR/dedup)") + dedupCmd.PersistentFlags().BoolVar(&dedupCompress, "compress", true, "Compress chunks with gzip") + dedupCmd.PersistentFlags().BoolVar(&dedupEncrypt, "encrypt", false, "Encrypt chunks with AES-256-GCM") + dedupCmd.PersistentFlags().StringVar(&dedupKey, "key", "", "Encryption key (hex) or use DBBACKUP_DEDUP_KEY env") + + // Backup-specific flags + dedupBackupCmd.Flags().StringVar(&dedupName, "name", "", "Optional backup name") + dedupBackupCmd.Flags().StringVar(&dedupDBType, "db-type", "", "Database type (postgres/mysql)") + dedupBackupCmd.Flags().StringVar(&dedupDBName, "db-name", "", "Database name") + dedupBackupCmd.Flags().StringVar(&dedupDBHost, "db-host", "", "Database host") +} + +func getDedupDir() string { + if dedupDir != "" { + return dedupDir + } + if cfg != nil && cfg.BackupDir != "" { + return filepath.Join(cfg.BackupDir, "dedup") + } + return filepath.Join(os.Getenv("HOME"), "db_backups", "dedup") +} + +func getEncryptionKey() string { + if dedupKey != "" { + return dedupKey + } + return os.Getenv("DBBACKUP_DEDUP_KEY") +} + +func runDedupBackup(cmd *cobra.Command, args []string) error { + inputPath := args[0] + + // Open input file + file, err := os.Open(inputPath) + if err != nil { + return fmt.Errorf("failed to open input file: %w", err) + } + defer file.Close() + + info, err := file.Stat() + if err != nil { + return fmt.Errorf("failed to stat input file: %w", err) + } + + // Setup dedup storage + basePath := getDedupDir() + encKey := "" + if dedupEncrypt { + encKey = getEncryptionKey() + if encKey == "" { + return fmt.Errorf("encryption enabled but no key provided (use --key or DBBACKUP_DEDUP_KEY)") + } + } + + store, err := dedup.NewChunkStore(dedup.StoreConfig{ + BasePath: basePath, + Compress: dedupCompress, + EncryptionKey: encKey, + }) + if err != nil { + return fmt.Errorf("failed to open chunk store: %w", err) + } + + manifestStore, err := dedup.NewManifestStore(basePath) + if err != nil { + return fmt.Errorf("failed to open manifest store: %w", err) + } + + index, err := dedup.NewChunkIndex(basePath) + if err != nil { + return 
fmt.Errorf("failed to open chunk index: %w", err) + } + defer index.Close() + + // Generate manifest ID + now := time.Now() + manifestID := now.Format("2006-01-02_150405") + if dedupDBName != "" { + manifestID += "_" + dedupDBName + } else { + base := filepath.Base(inputPath) + ext := filepath.Ext(base) + manifestID += "_" + strings.TrimSuffix(base, ext) + } + + fmt.Printf("Creating deduplicated backup: %s\n", manifestID) + fmt.Printf("Input: %s (%s)\n", inputPath, formatBytes(info.Size())) + fmt.Printf("Store: %s\n", basePath) + + // Hash the entire file for verification + file.Seek(0, 0) + h := sha256.New() + io.Copy(h, file) + fileHash := hex.EncodeToString(h.Sum(nil)) + file.Seek(0, 0) + + // Chunk the file + chunker := dedup.NewChunker(file, dedup.DefaultChunkerConfig()) + var chunks []dedup.ChunkRef + var totalSize, storedSize int64 + var chunkCount, newChunks int + + startTime := time.Now() + + for { + chunk, err := chunker.Next() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("chunking failed: %w", err) + } + + chunkCount++ + totalSize += int64(chunk.Length) + + // Store chunk (deduplication happens here) + isNew, err := store.Put(chunk) + if err != nil { + return fmt.Errorf("failed to store chunk: %w", err) + } + + if isNew { + newChunks++ + storedSize += int64(chunk.Length) + // Record in index + index.AddChunk(chunk.Hash, chunk.Length, chunk.Length) + } + + chunks = append(chunks, dedup.ChunkRef{ + Hash: chunk.Hash, + Offset: chunk.Offset, + Length: chunk.Length, + }) + + // Progress + if chunkCount%1000 == 0 { + fmt.Printf("\r Processed %d chunks, %d new...", chunkCount, newChunks) + } + } + + duration := time.Since(startTime) + + // Calculate dedup ratio + dedupRatio := 0.0 + if totalSize > 0 { + dedupRatio = 1.0 - float64(storedSize)/float64(totalSize) + } + + // Create manifest + manifest := &dedup.Manifest{ + ID: manifestID, + Name: dedupName, + CreatedAt: now, + DatabaseType: dedupDBType, + DatabaseName: dedupDBName, + DatabaseHost: dedupDBHost, + Chunks: chunks, + OriginalSize: totalSize, + StoredSize: storedSize, + ChunkCount: chunkCount, + NewChunks: newChunks, + DedupRatio: dedupRatio, + Encrypted: dedupEncrypt, + Compressed: dedupCompress, + SHA256: fileHash, + } + + if err := manifestStore.Save(manifest); err != nil { + return fmt.Errorf("failed to save manifest: %w", err) + } + + if err := index.AddManifest(manifest); err != nil { + log.Warn("Failed to index manifest", "error", err) + } + + fmt.Printf("\r \r") + fmt.Printf("\nBackup complete!\n") + fmt.Printf(" Manifest: %s\n", manifestID) + fmt.Printf(" Chunks: %d total, %d new\n", chunkCount, newChunks) + fmt.Printf(" Original: %s\n", formatBytes(totalSize)) + fmt.Printf(" Stored: %s (new data)\n", formatBytes(storedSize)) + fmt.Printf(" Dedup ratio: %.1f%%\n", dedupRatio*100) + fmt.Printf(" Duration: %s\n", duration.Round(time.Millisecond)) + fmt.Printf(" Throughput: %s/s\n", formatBytes(int64(float64(totalSize)/duration.Seconds()))) + + return nil +} + +func runDedupRestore(cmd *cobra.Command, args []string) error { + manifestID := args[0] + outputPath := args[1] + + basePath := getDedupDir() + encKey := "" + if dedupEncrypt { + encKey = getEncryptionKey() + } + + store, err := dedup.NewChunkStore(dedup.StoreConfig{ + BasePath: basePath, + Compress: dedupCompress, + EncryptionKey: encKey, + }) + if err != nil { + return fmt.Errorf("failed to open chunk store: %w", err) + } + + manifestStore, err := dedup.NewManifestStore(basePath) + if err != nil { + return fmt.Errorf("failed to open 
manifest store: %w", err) + } + + manifest, err := manifestStore.Load(manifestID) + if err != nil { + return fmt.Errorf("failed to load manifest: %w", err) + } + + fmt.Printf("Restoring backup: %s\n", manifestID) + fmt.Printf(" Created: %s\n", manifest.CreatedAt.Format(time.RFC3339)) + fmt.Printf(" Size: %s\n", formatBytes(manifest.OriginalSize)) + fmt.Printf(" Chunks: %d\n", manifest.ChunkCount) + + // Create output file + outFile, err := os.Create(outputPath) + if err != nil { + return fmt.Errorf("failed to create output file: %w", err) + } + defer outFile.Close() + + h := sha256.New() + writer := io.MultiWriter(outFile, h) + + startTime := time.Now() + + for i, ref := range manifest.Chunks { + chunk, err := store.Get(ref.Hash) + if err != nil { + return fmt.Errorf("failed to read chunk %d (%s): %w", i, ref.Hash[:8], err) + } + + if _, err := writer.Write(chunk.Data); err != nil { + return fmt.Errorf("failed to write chunk %d: %w", i, err) + } + + if (i+1)%1000 == 0 { + fmt.Printf("\r Restored %d/%d chunks...", i+1, manifest.ChunkCount) + } + } + + duration := time.Since(startTime) + restoredHash := hex.EncodeToString(h.Sum(nil)) + + fmt.Printf("\r \r") + fmt.Printf("\nRestore complete!\n") + fmt.Printf(" Output: %s\n", outputPath) + fmt.Printf(" Duration: %s\n", duration.Round(time.Millisecond)) + + // Verify hash + if manifest.SHA256 != "" { + if restoredHash == manifest.SHA256 { + fmt.Printf(" Verification: ✓ SHA-256 matches\n") + } else { + fmt.Printf(" Verification: ✗ SHA-256 MISMATCH!\n") + fmt.Printf(" Expected: %s\n", manifest.SHA256) + fmt.Printf(" Got: %s\n", restoredHash) + return fmt.Errorf("integrity verification failed") + } + } + + return nil +} + +func runDedupList(cmd *cobra.Command, args []string) error { + basePath := getDedupDir() + + manifestStore, err := dedup.NewManifestStore(basePath) + if err != nil { + return fmt.Errorf("failed to open manifest store: %w", err) + } + + manifests, err := manifestStore.ListAll() + if err != nil { + return fmt.Errorf("failed to list manifests: %w", err) + } + + if len(manifests) == 0 { + fmt.Println("No deduplicated backups found.") + fmt.Printf("Store: %s\n", basePath) + return nil + } + + fmt.Printf("Deduplicated Backups (%s)\n\n", basePath) + fmt.Printf("%-30s %-12s %-10s %-10s %s\n", "ID", "SIZE", "DEDUP", "CHUNKS", "CREATED") + fmt.Println(strings.Repeat("-", 80)) + + for _, m := range manifests { + fmt.Printf("%-30s %-12s %-10.1f%% %-10d %s\n", + truncateStr(m.ID, 30), + formatBytes(m.OriginalSize), + m.DedupRatio*100, + m.ChunkCount, + m.CreatedAt.Format("2006-01-02 15:04"), + ) + } + + return nil +} + +func runDedupStats(cmd *cobra.Command, args []string) error { + basePath := getDedupDir() + + index, err := dedup.NewChunkIndex(basePath) + if err != nil { + return fmt.Errorf("failed to open chunk index: %w", err) + } + defer index.Close() + + stats, err := index.Stats() + if err != nil { + return fmt.Errorf("failed to get stats: %w", err) + } + + store, err := dedup.NewChunkStore(dedup.StoreConfig{BasePath: basePath}) + if err != nil { + return fmt.Errorf("failed to open chunk store: %w", err) + } + + storeStats, err := store.Stats() + if err != nil { + log.Warn("Failed to get store stats", "error", err) + } + + fmt.Printf("Deduplication Statistics\n") + fmt.Printf("========================\n\n") + fmt.Printf("Store: %s\n", basePath) + fmt.Printf("Manifests: %d\n", stats.TotalManifests) + fmt.Printf("Unique chunks: %d\n", stats.TotalChunks) + fmt.Printf("Total raw size: %s\n", formatBytes(stats.TotalSizeRaw)) + 
fmt.Printf("Stored size: %s\n", formatBytes(stats.TotalSizeStored)) + fmt.Printf("Dedup ratio: %.1f%%\n", stats.DedupRatio*100) + fmt.Printf("Space saved: %s\n", formatBytes(stats.TotalSizeRaw-stats.TotalSizeStored)) + + if storeStats != nil { + fmt.Printf("Disk usage: %s\n", formatBytes(storeStats.TotalSize)) + fmt.Printf("Directories: %d\n", storeStats.Directories) + } + + return nil +} + +func runDedupGC(cmd *cobra.Command, args []string) error { + basePath := getDedupDir() + + index, err := dedup.NewChunkIndex(basePath) + if err != nil { + return fmt.Errorf("failed to open chunk index: %w", err) + } + defer index.Close() + + store, err := dedup.NewChunkStore(dedup.StoreConfig{ + BasePath: basePath, + Compress: dedupCompress, + }) + if err != nil { + return fmt.Errorf("failed to open chunk store: %w", err) + } + + // Find orphaned chunks + orphans, err := index.ListOrphanedChunks() + if err != nil { + return fmt.Errorf("failed to find orphaned chunks: %w", err) + } + + if len(orphans) == 0 { + fmt.Println("No orphaned chunks to clean up.") + return nil + } + + fmt.Printf("Found %d orphaned chunks\n", len(orphans)) + + var freed int64 + for _, hash := range orphans { + if meta, _ := index.GetChunk(hash); meta != nil { + freed += meta.SizeStored + } + if err := store.Delete(hash); err != nil { + log.Warn("Failed to delete chunk", "hash", hash[:8], "error", err) + continue + } + if err := index.RemoveChunk(hash); err != nil { + log.Warn("Failed to remove chunk from index", "hash", hash[:8], "error", err) + } + } + + fmt.Printf("Deleted %d chunks, freed %s\n", len(orphans), formatBytes(freed)) + + // Vacuum the index + if err := index.Vacuum(); err != nil { + log.Warn("Failed to vacuum index", "error", err) + } + + return nil +} + +func runDedupDelete(cmd *cobra.Command, args []string) error { + manifestID := args[0] + basePath := getDedupDir() + + manifestStore, err := dedup.NewManifestStore(basePath) + if err != nil { + return fmt.Errorf("failed to open manifest store: %w", err) + } + + index, err := dedup.NewChunkIndex(basePath) + if err != nil { + return fmt.Errorf("failed to open chunk index: %w", err) + } + defer index.Close() + + // Load manifest to decrement chunk refs + manifest, err := manifestStore.Load(manifestID) + if err != nil { + return fmt.Errorf("failed to load manifest: %w", err) + } + + // Decrement reference counts + for _, ref := range manifest.Chunks { + index.DecrementRef(ref.Hash) + } + + // Delete manifest + if err := manifestStore.Delete(manifestID); err != nil { + return fmt.Errorf("failed to delete manifest: %w", err) + } + + if err := index.RemoveManifest(manifestID); err != nil { + log.Warn("Failed to remove manifest from index", "error", err) + } + + fmt.Printf("Deleted backup: %s\n", manifestID) + fmt.Println("Run 'dbbackup dedup gc' to reclaim space from unreferenced chunks.") + + return nil +} + +// Helper functions +func formatBytes(b int64) string { + const unit = 1024 + if b < unit { + return fmt.Sprintf("%d B", b) + } + div, exp := int64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f %cB", float64(b)/float64(div), "KMGTPE"[exp]) +} + +func truncateStr(s string, max int) string { + if len(s) <= max { + return s + } + return s[:max-3] + "..." 
+} diff --git a/internal/dedup/chunker.go b/internal/dedup/chunker.go new file mode 100644 index 0000000..816ff50 --- /dev/null +++ b/internal/dedup/chunker.go @@ -0,0 +1,228 @@ +// Package dedup provides content-defined chunking and deduplication +// for database backups, similar to restic/borgbackup but with native +// database dump support. +package dedup + +import ( + "crypto/sha256" + "encoding/hex" + "io" +) + +// Chunker constants for content-defined chunking +const ( + // DefaultMinChunkSize is the minimum chunk size (4KB) + DefaultMinChunkSize = 4 * 1024 + + // DefaultAvgChunkSize is the target average chunk size (8KB) + DefaultAvgChunkSize = 8 * 1024 + + // DefaultMaxChunkSize is the maximum chunk size (32KB) + DefaultMaxChunkSize = 32 * 1024 + + // WindowSize for the rolling hash + WindowSize = 48 + + // ChunkMask determines average chunk size + // For 8KB average: we look for hash % 8192 == 0 + ChunkMask = DefaultAvgChunkSize - 1 +) + +// Gear hash table - random values for each byte +// This is used for the Gear rolling hash which is simpler and faster than Buzhash +var gearTable = [256]uint64{ + 0x5c95c078, 0x22408989, 0x2d48a214, 0x12842087, 0x530f8afb, 0x474536b9, 0x2963b4f1, 0x44cb738b, + 0x4ea7403d, 0x4d606b6e, 0x074ec5d3, 0x3f7e82f4, 0x4e3d26e7, 0x5cb4e82f, 0x7b0a1ef5, 0x3d4e7c92, + 0x2a81ed69, 0x7f853df8, 0x452c8cf7, 0x0f4f3c9d, 0x3a5e81b7, 0x6cb2d819, 0x2e4c5f93, 0x7e8a1c57, + 0x1f9d3e8c, 0x4b7c2a5d, 0x3c8f1d6e, 0x5d2a7b4f, 0x6e9c3f8a, 0x7a4d1e5c, 0x2b8c4f7d, 0x4f7d2c9e, + 0x5a1e3d7c, 0x6b4f8a2d, 0x3e7c9d5a, 0x7d2a4f8b, 0x4c9e7d3a, 0x5b8a1c6e, 0x2d5f4a9c, 0x7a3c8d6b, + 0x6e2a7b4d, 0x3f8c5d9a, 0x4a7d3e5b, 0x5c9a2d7e, 0x7b4e8f3c, 0x2a6d9c5b, 0x3e4a7d8c, 0x5d7b2e9a, + 0x4c8a3d7b, 0x6e9d5c8a, 0x7a3e4d9c, 0x2b5c8a7d, 0x4d7e3a9c, 0x5a9c7d3e, 0x3c8b5a7d, 0x7d4e9c2a, + 0x6a3d8c5b, 0x4e7a9d3c, 0x5c2a7b9e, 0x3a9d4e7c, 0x7b8c5a2d, 0x2d7e4a9c, 0x4a3c9d7b, 0x5e9a7c3d, + 0x6c4d8a5b, 0x3b7e9c4a, 0x7a5c2d8b, 0x4d9a3e7c, 0x5b7c4a9e, 0x2e8a5d3c, 0x3c9e7a4d, 0x7d4a8c5b, + 0x6b2d9a7c, 0x4a8c3e5d, 0x5d7a9c2e, 0x3e4c7b9a, 0x7c9d5a4b, 0x2a7e8c3d, 0x4c5a9d7e, 0x5a3e7c4b, + 0x6d8a2c9e, 0x3c7b4a8d, 0x7e2d9c5a, 0x4b9a7e3c, 0x5c4d8a7b, 0x2d9e3c5a, 0x3a7c9d4e, 0x7b5a4c8d, + 0x6a9c2e7b, 0x4d3e8a9c, 0x5e7b4d2a, 0x3b9a7c5d, 0x7c4e8a3b, 0x2e7d9c4a, 0x4a8b3e7d, 0x5d2c9a7e, + 0x6c7a5d3e, 0x3e9c4a7b, 0x7a8d2c5e, 0x4c3e9a7d, 0x5b9c7e2a, 0x2a4d7c9e, 0x3d8a5c4b, 0x7e7b9a3c, + 0x6b4a8d9e, 0x4e9c3b7a, 0x5a7d4e9c, 0x3c2a8b7d, 0x7d9e5c4a, 0x2b8a7d3e, 0x4d5c9a2b, 0x5e3a7c8d, + 0x6a9d4b7c, 0x3b7a9c5e, 0x7c4b8a2d, 0x4a9e7c3b, 0x5d2b9a4e, 0x2e7c4d9a, 0x3a9b7e4c, 0x7e5a3c8b, + 0x6c8a9d4e, 0x4b7c2a5e, 0x5a3e9c7d, 0x3d9a4b7c, 0x7a2d5e9c, 0x2c8b7a3d, 0x4e9c5a2b, 0x5b4d7e9a, + 0x6d7a3c8b, 0x3e2b9a5d, 0x7c9d4a7e, 0x4a5e3c9b, 0x5e7a9d2c, 0x2b3c7e9a, 0x3a9e4b7d, 0x7d8a5c3e, + 0x6b9c2d4a, 0x4c7e9a3b, 0x5a2c8b7e, 0x3b4d9a5c, 0x7e9b3a4d, 0x2d5a7c9e, 0x4b8d3e7a, 0x5c9a4b2d, + 0x6a7c8d9e, 0x3c9e5a7b, 0x7b4a2c9d, 0x4d3b7e9a, 0x5e9c4a3b, 0x2a7b9d4e, 0x3e5c8a7b, 0x7a9d3e5c, + 0x6c2a7b8d, 0x4e9a5c3b, 0x5b7d2a9e, 0x3a4e9c7b, 0x7d8b3a5c, 0x2c9e7a4b, 0x4a3d5e9c, 0x5d7b8a2e, + 0x6b9a4c7d, 0x3d5a9e4b, 0x7e2c7b9a, 0x4b9d3a5e, 0x5c4e7a9d, 0x2e8a3c7b, 0x3b7c9e5a, 0x7a4d8b3e, + 0x6d9c5a2b, 0x4a7e3d9c, 0x5e2a9b7d, 0x3c9a7e4b, 0x7b3e5c9a, 0x2a4b8d7e, 0x4d9c2a5b, 0x5a7d9e3c, + 0x6c3b8a7d, 0x3e9d4a5c, 0x7d5c2b9e, 0x4c8a7d3b, 0x5b9e3c7a, 0x2d7a9c4e, 0x3a5e7b9d, 0x7e8b4a3c, + 0x6a2d9e7b, 0x4b3e5a9d, 0x5d9c7b2a, 0x3b7d4e9c, 0x7c9a3b5e, 0x2e5c8a7d, 0x4a7b9d3e, 0x5c3a7e9b, + 0x6d9e5c4a, 0x3c4a7b9e, 0x7a9d2e5c, 0x4e7c9a3d, 0x5a8b4e7c, 0x2b9a3d7e, 
0x3d5b8a9c, 0x7b4e9a2d, + 0x6c7d3a9e, 0x4a9c5e3b, 0x5e2b7d9a, 0x3a8d4c7b, 0x7d3e9a5c, 0x2c7a8b9e, 0x4b5d3a7c, 0x5c9a7e2b, + 0x6a4b9d3e, 0x3e7c2a9d, 0x7c8a5b4e, 0x4d9e3c7a, 0x5b3a9e7c, 0x2e9c7b4a, 0x3b4e8a9d, 0x7a9c4e3b, + 0x6d2a7c9e, 0x4c8b9a5d, 0x5a9e2b7c, 0x3c3d7a9e, 0x7e5a9c4b, 0x2a8d3e7c, 0x4e7a5c9b, 0x5d9b8a2e, + 0x6b4c9e7a, 0x3a9d5b4e, 0x7b2e8a9c, 0x4a5c3e9b, 0x5c9a4d7e, 0x2d7e9a3c, 0x3e8b7c5a, 0x7c9e2a4d, + 0x6a3b7d9c, 0x4d9a8b3e, 0x5e5c2a7b, 0x3b4a9d7c, 0x7a7c5e9b, 0x2c9b4a8d, 0x4b3e7c9a, 0x5a9d3b7e, + 0x6c8a4e9d, 0x3d7b9c5a, 0x7e2a4b9c, 0x4c9e5d3a, 0x5b7a9c4e, 0x2e4d8a7b, 0x3a9c7e5d, 0x7b8d3a9e, + 0x6d5c9a4b, 0x4a2e7b9d, 0x5d9b4c8a, 0x3c7a9e2b, 0x7d4b8c9e, 0x2b9a5c4d, 0x4e7d3a9c, 0x5c8a9e7b, +} + +// Chunk represents a single deduplicated chunk +type Chunk struct { + // Hash is the SHA-256 hash of the chunk data (content-addressed) + Hash string + + // Data is the raw chunk bytes + Data []byte + + // Offset is the byte offset in the original file + Offset int64 + + // Length is the size of this chunk + Length int +} + +// ChunkerConfig holds configuration for the chunker +type ChunkerConfig struct { + MinSize int // Minimum chunk size + AvgSize int // Target average chunk size + MaxSize int // Maximum chunk size +} + +// DefaultChunkerConfig returns sensible defaults +func DefaultChunkerConfig() ChunkerConfig { + return ChunkerConfig{ + MinSize: DefaultMinChunkSize, + AvgSize: DefaultAvgChunkSize, + MaxSize: DefaultMaxChunkSize, + } +} + +// Chunker performs content-defined chunking using Gear hash +type Chunker struct { + reader io.Reader + config ChunkerConfig + + // Rolling hash state + hash uint64 + + // Current chunk state + buf []byte + offset int64 + mask uint64 +} + +// NewChunker creates a new chunker for the given reader +func NewChunker(r io.Reader, config ChunkerConfig) *Chunker { + // Calculate mask for target average size + // We want: avg_size = 1 / P(boundary) + // With mask, P(boundary) = 1 / (mask + 1) + // So mask = avg_size - 1 + mask := uint64(config.AvgSize - 1) + + return &Chunker{ + reader: r, + config: config, + buf: make([]byte, 0, config.MaxSize), + mask: mask, + } +} + +// Next returns the next chunk from the input stream +// Returns io.EOF when no more data is available +func (c *Chunker) Next() (*Chunk, error) { + c.buf = c.buf[:0] + c.hash = 0 + + // Read bytes until we find a chunk boundary or hit max size + singleByte := make([]byte, 1) + + for { + n, err := c.reader.Read(singleByte) + if n == 0 { + if err == io.EOF { + // Return remaining data as final chunk + if len(c.buf) > 0 { + return c.makeChunk(), nil + } + return nil, io.EOF + } + if err != nil { + return nil, err + } + continue + } + + b := singleByte[0] + c.buf = append(c.buf, b) + + // Update Gear rolling hash + // Gear hash: hash = (hash << 1) + gear_table[byte] + c.hash = (c.hash << 1) + gearTable[b] + + // Check for chunk boundary after minimum size + if len(c.buf) >= c.config.MinSize { + // Check if we hit a boundary (hash matches mask pattern) + if (c.hash & c.mask) == 0 { + return c.makeChunk(), nil + } + } + + // Force boundary at max size + if len(c.buf) >= c.config.MaxSize { + return c.makeChunk(), nil + } + } +} + +// makeChunk creates a Chunk from the current buffer +func (c *Chunker) makeChunk() *Chunk { + // Compute SHA-256 hash + h := sha256.Sum256(c.buf) + hash := hex.EncodeToString(h[:]) + + // Copy data + data := make([]byte, len(c.buf)) + copy(data, c.buf) + + chunk := &Chunk{ + Hash: hash, + Data: data, + Offset: c.offset, + Length: len(data), + } + + c.offset += 
int64(len(data)) + return chunk +} + +// ChunkReader splits a reader into content-defined chunks +// and returns them via a channel for concurrent processing +func ChunkReader(r io.Reader, config ChunkerConfig) (<-chan *Chunk, <-chan error) { + chunks := make(chan *Chunk, 100) + errs := make(chan error, 1) + + go func() { + defer close(chunks) + defer close(errs) + + chunker := NewChunker(r, config) + for { + chunk, err := chunker.Next() + if err == io.EOF { + return + } + if err != nil { + errs <- err + return + } + chunks <- chunk + } + }() + + return chunks, errs +} + +// HashData computes SHA-256 hash of data +func HashData(data []byte) string { + h := sha256.Sum256(data) + return hex.EncodeToString(h[:]) +} diff --git a/internal/dedup/chunker_test.go b/internal/dedup/chunker_test.go new file mode 100644 index 0000000..d6a9139 --- /dev/null +++ b/internal/dedup/chunker_test.go @@ -0,0 +1,217 @@ +package dedup + +import ( + "bytes" + "crypto/rand" + "io" + "testing" +) + +func TestChunker_Basic(t *testing.T) { + // Create test data + data := make([]byte, 100*1024) // 100KB + rand.Read(data) + + chunker := NewChunker(bytes.NewReader(data), DefaultChunkerConfig()) + + var chunks []*Chunk + var totalBytes int + + for { + chunk, err := chunker.Next() + if err == io.EOF { + break + } + if err != nil { + t.Fatalf("Chunker.Next() error: %v", err) + } + + chunks = append(chunks, chunk) + totalBytes += chunk.Length + + // Verify chunk properties + if chunk.Length < DefaultMinChunkSize && len(chunks) < 10 { + // Only the last chunk can be smaller than min + // (unless file is smaller than min) + } + if chunk.Length > DefaultMaxChunkSize { + t.Errorf("Chunk %d exceeds max size: %d > %d", len(chunks), chunk.Length, DefaultMaxChunkSize) + } + if chunk.Hash == "" { + t.Errorf("Chunk %d has empty hash", len(chunks)) + } + if len(chunk.Hash) != 64 { // SHA-256 hex length + t.Errorf("Chunk %d has invalid hash length: %d", len(chunks), len(chunk.Hash)) + } + } + + if totalBytes != len(data) { + t.Errorf("Total bytes mismatch: got %d, want %d", totalBytes, len(data)) + } + + t.Logf("Chunked %d bytes into %d chunks", totalBytes, len(chunks)) + t.Logf("Average chunk size: %d bytes", totalBytes/len(chunks)) +} + +func TestChunker_Deterministic(t *testing.T) { + // Same data should produce same chunks + data := make([]byte, 50*1024) + rand.Read(data) + + // First pass + chunker1 := NewChunker(bytes.NewReader(data), DefaultChunkerConfig()) + var hashes1 []string + for { + chunk, err := chunker1.Next() + if err == io.EOF { + break + } + if err != nil { + t.Fatal(err) + } + hashes1 = append(hashes1, chunk.Hash) + } + + // Second pass + chunker2 := NewChunker(bytes.NewReader(data), DefaultChunkerConfig()) + var hashes2 []string + for { + chunk, err := chunker2.Next() + if err == io.EOF { + break + } + if err != nil { + t.Fatal(err) + } + hashes2 = append(hashes2, chunk.Hash) + } + + // Compare + if len(hashes1) != len(hashes2) { + t.Fatalf("Different chunk counts: %d vs %d", len(hashes1), len(hashes2)) + } + + for i := range hashes1 { + if hashes1[i] != hashes2[i] { + t.Errorf("Hash mismatch at chunk %d: %s vs %s", i, hashes1[i], hashes2[i]) + } + } +} + +func TestChunker_ShiftedData(t *testing.T) { + // Test that shifted data still shares chunks (the key CDC benefit) + original := make([]byte, 100*1024) + rand.Read(original) + + // Create shifted version (prepend some bytes) + prefix := make([]byte, 1000) + rand.Read(prefix) + shifted := append(prefix, original...) 
+ + // Chunk both + config := DefaultChunkerConfig() + + chunker1 := NewChunker(bytes.NewReader(original), config) + hashes1 := make(map[string]bool) + for { + chunk, err := chunker1.Next() + if err == io.EOF { + break + } + if err != nil { + t.Fatal(err) + } + hashes1[chunk.Hash] = true + } + + chunker2 := NewChunker(bytes.NewReader(shifted), config) + var matched, total int + for { + chunk, err := chunker2.Next() + if err == io.EOF { + break + } + if err != nil { + t.Fatal(err) + } + total++ + if hashes1[chunk.Hash] { + matched++ + } + } + + // Should have significant overlap despite the shift + overlapRatio := float64(matched) / float64(total) + t.Logf("Chunk overlap after %d-byte shift: %.1f%% (%d/%d chunks)", + len(prefix), overlapRatio*100, matched, total) + + // We expect at least 50% overlap for content-defined chunking + if overlapRatio < 0.5 { + t.Errorf("Low chunk overlap: %.1f%% (expected >50%%)", overlapRatio*100) + } +} + +func TestChunker_SmallFile(t *testing.T) { + // File smaller than min chunk size + data := []byte("hello world") + chunker := NewChunker(bytes.NewReader(data), DefaultChunkerConfig()) + + chunk, err := chunker.Next() + if err != nil { + t.Fatal(err) + } + + if chunk.Length != len(data) { + t.Errorf("Expected chunk length %d, got %d", len(data), chunk.Length) + } + + // Should be EOF after + _, err = chunker.Next() + if err != io.EOF { + t.Errorf("Expected EOF, got %v", err) + } +} + +func TestChunker_EmptyFile(t *testing.T) { + chunker := NewChunker(bytes.NewReader(nil), DefaultChunkerConfig()) + + _, err := chunker.Next() + if err != io.EOF { + t.Errorf("Expected EOF for empty file, got %v", err) + } +} + +func TestHashData(t *testing.T) { + hash := HashData([]byte("test")) + if len(hash) != 64 { + t.Errorf("Expected 64-char hash, got %d", len(hash)) + } + + // Known SHA-256 of "test" + expected := "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08" + if hash != expected { + t.Errorf("Hash mismatch: got %s, want %s", hash, expected) + } +} + +func BenchmarkChunker(b *testing.B) { + // 1MB of random data + data := make([]byte, 1024*1024) + rand.Read(data) + + b.ResetTimer() + b.SetBytes(int64(len(data))) + + for i := 0; i < b.N; i++ { + chunker := NewChunker(bytes.NewReader(data), DefaultChunkerConfig()) + for { + _, err := chunker.Next() + if err == io.EOF { + break + } + if err != nil { + b.Fatal(err) + } + } + } +} diff --git a/internal/dedup/index.go b/internal/dedup/index.go new file mode 100644 index 0000000..d810693 --- /dev/null +++ b/internal/dedup/index.go @@ -0,0 +1,239 @@ +package dedup + +import ( + "database/sql" + "fmt" + "path/filepath" + "time" + + _ "github.com/mattn/go-sqlite3" // SQLite driver +) + +// ChunkIndex provides fast chunk lookups using SQLite +type ChunkIndex struct { + db *sql.DB +} + +// NewChunkIndex opens or creates a chunk index database +func NewChunkIndex(basePath string) (*ChunkIndex, error) { + dbPath := filepath.Join(basePath, "chunks.db") + + db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_synchronous=NORMAL") + if err != nil { + return nil, fmt.Errorf("failed to open chunk index: %w", err) + } + + idx := &ChunkIndex{db: db} + if err := idx.migrate(); err != nil { + db.Close() + return nil, err + } + + return idx, nil +} + +// migrate creates the schema if needed +func (idx *ChunkIndex) migrate() error { + schema := ` + CREATE TABLE IF NOT EXISTS chunks ( + hash TEXT PRIMARY KEY, + size_raw INTEGER NOT NULL, + size_stored INTEGER NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + 
last_accessed DATETIME, + ref_count INTEGER DEFAULT 1 + ); + + CREATE TABLE IF NOT EXISTS manifests ( + id TEXT PRIMARY KEY, + database_type TEXT, + database_name TEXT, + database_host TEXT, + created_at DATETIME, + original_size INTEGER, + stored_size INTEGER, + chunk_count INTEGER, + new_chunks INTEGER, + dedup_ratio REAL, + sha256 TEXT, + verified_at DATETIME + ); + + CREATE INDEX IF NOT EXISTS idx_chunks_created ON chunks(created_at); + CREATE INDEX IF NOT EXISTS idx_chunks_accessed ON chunks(last_accessed); + CREATE INDEX IF NOT EXISTS idx_manifests_created ON manifests(created_at); + CREATE INDEX IF NOT EXISTS idx_manifests_database ON manifests(database_name); + ` + + _, err := idx.db.Exec(schema) + return err +} + +// Close closes the database +func (idx *ChunkIndex) Close() error { + return idx.db.Close() +} + +// AddChunk records a chunk in the index +func (idx *ChunkIndex) AddChunk(hash string, sizeRaw, sizeStored int) error { + _, err := idx.db.Exec(` + INSERT INTO chunks (hash, size_raw, size_stored, created_at, last_accessed, ref_count) + VALUES (?, ?, ?, ?, ?, 1) + ON CONFLICT(hash) DO UPDATE SET + ref_count = ref_count + 1, + last_accessed = ? + `, hash, sizeRaw, sizeStored, time.Now(), time.Now(), time.Now()) + return err +} + +// HasChunk checks if a chunk exists in the index +func (idx *ChunkIndex) HasChunk(hash string) (bool, error) { + var count int + err := idx.db.QueryRow("SELECT COUNT(*) FROM chunks WHERE hash = ?", hash).Scan(&count) + return count > 0, err +} + +// GetChunk retrieves chunk metadata +func (idx *ChunkIndex) GetChunk(hash string) (*ChunkMeta, error) { + var m ChunkMeta + err := idx.db.QueryRow(` + SELECT hash, size_raw, size_stored, created_at, ref_count + FROM chunks WHERE hash = ? + `, hash).Scan(&m.Hash, &m.SizeRaw, &m.SizeStored, &m.CreatedAt, &m.RefCount) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + return &m, nil +} + +// ChunkMeta holds metadata about a chunk +type ChunkMeta struct { + Hash string + SizeRaw int64 + SizeStored int64 + CreatedAt time.Time + RefCount int +} + +// DecrementRef decreases the reference count for a chunk +// Returns true if the chunk should be deleted (ref_count <= 0) +func (idx *ChunkIndex) DecrementRef(hash string) (shouldDelete bool, err error) { + result, err := idx.db.Exec(` + UPDATE chunks SET ref_count = ref_count - 1 WHERE hash = ? + `, hash) + if err != nil { + return false, err + } + + affected, _ := result.RowsAffected() + if affected == 0 { + return false, nil + } + + var refCount int + err = idx.db.QueryRow("SELECT ref_count FROM chunks WHERE hash = ?", hash).Scan(&refCount) + if err != nil { + return false, err + } + + return refCount <= 0, nil +} + +// RemoveChunk removes a chunk from the index +func (idx *ChunkIndex) RemoveChunk(hash string) error { + _, err := idx.db.Exec("DELETE FROM chunks WHERE hash = ?", hash) + return err +} + +// AddManifest records a manifest in the index +func (idx *ChunkIndex) AddManifest(m *Manifest) error { + _, err := idx.db.Exec(` + INSERT OR REPLACE INTO manifests + (id, database_type, database_name, database_host, created_at, + original_size, stored_size, chunk_count, new_chunks, dedup_ratio, sha256) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ `, m.ID, m.DatabaseType, m.DatabaseName, m.DatabaseHost, m.CreatedAt, + m.OriginalSize, m.StoredSize, m.ChunkCount, m.NewChunks, m.DedupRatio, m.SHA256) + return err +} + +// RemoveManifest removes a manifest from the index +func (idx *ChunkIndex) RemoveManifest(id string) error { + _, err := idx.db.Exec("DELETE FROM manifests WHERE id = ?", id) + return err +} + +// IndexStats holds statistics about the dedup index +type IndexStats struct { + TotalChunks int64 + TotalManifests int64 + TotalSizeRaw int64 // Uncompressed, undeduplicated + TotalSizeStored int64 // On-disk after dedup+compression + DedupRatio float64 + OldestChunk time.Time + NewestChunk time.Time +} + +// Stats returns statistics about the index +func (idx *ChunkIndex) Stats() (*IndexStats, error) { + stats := &IndexStats{} + + var oldestStr, newestStr string + err := idx.db.QueryRow(` + SELECT + COUNT(*), + COALESCE(SUM(size_raw), 0), + COALESCE(SUM(size_stored), 0), + COALESCE(MIN(created_at), ''), + COALESCE(MAX(created_at), '') + FROM chunks + `).Scan(&stats.TotalChunks, &stats.TotalSizeRaw, &stats.TotalSizeStored, + &oldestStr, &newestStr) + if err != nil { + return nil, err + } + + // Parse time strings + if oldestStr != "" { + stats.OldestChunk, _ = time.Parse("2006-01-02 15:04:05", oldestStr) + } + if newestStr != "" { + stats.NewestChunk, _ = time.Parse("2006-01-02 15:04:05", newestStr) + } + + idx.db.QueryRow("SELECT COUNT(*) FROM manifests").Scan(&stats.TotalManifests) + + if stats.TotalSizeRaw > 0 { + stats.DedupRatio = 1.0 - float64(stats.TotalSizeStored)/float64(stats.TotalSizeRaw) + } + + return stats, nil +} + +// ListOrphanedChunks returns chunks that have ref_count <= 0 +func (idx *ChunkIndex) ListOrphanedChunks() ([]string, error) { + rows, err := idx.db.Query("SELECT hash FROM chunks WHERE ref_count <= 0") + if err != nil { + return nil, err + } + defer rows.Close() + + var hashes []string + for rows.Next() { + var hash string + if err := rows.Scan(&hash); err != nil { + continue + } + hashes = append(hashes, hash) + } + return hashes, rows.Err() +} + +// Vacuum cleans up the database +func (idx *ChunkIndex) Vacuum() error { + _, err := idx.db.Exec("VACUUM") + return err +} diff --git a/internal/dedup/manifest.go b/internal/dedup/manifest.go new file mode 100644 index 0000000..b4a0f17 --- /dev/null +++ b/internal/dedup/manifest.go @@ -0,0 +1,188 @@ +package dedup + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "time" +) + +// Manifest describes a single backup as a list of chunks +type Manifest struct { + // ID is the unique identifier (typically timestamp-based) + ID string `json:"id"` + + // Name is an optional human-readable name + Name string `json:"name,omitempty"` + + // CreatedAt is when this backup was created + CreatedAt time.Time `json:"created_at"` + + // Database information + DatabaseType string `json:"database_type"` // postgres, mysql + DatabaseName string `json:"database_name"` + DatabaseHost string `json:"database_host"` + + // Chunks is the ordered list of chunk hashes + // The file is reconstructed by concatenating chunks in order + Chunks []ChunkRef `json:"chunks"` + + // Stats about the backup + OriginalSize int64 `json:"original_size"` // Size before deduplication + StoredSize int64 `json:"stored_size"` // Size after dedup (new chunks only) + ChunkCount int `json:"chunk_count"` // Total chunks + NewChunks int `json:"new_chunks"` // Chunks that weren't deduplicated + DedupRatio float64 `json:"dedup_ratio"` // 1.0 = no dedup, 0.0 = 100% dedup + + // Encryption and 
compression settings used + Encrypted bool `json:"encrypted"` + Compressed bool `json:"compressed"` + + // Verification + SHA256 string `json:"sha256"` // Hash of reconstructed file + VerifiedAt time.Time `json:"verified_at,omitempty"` +} + +// ChunkRef references a chunk in the manifest +type ChunkRef struct { + Hash string `json:"h"` // SHA-256 hash (64 chars) + Offset int64 `json:"o"` // Offset in original file + Length int `json:"l"` // Chunk length +} + +// ManifestStore manages backup manifests +type ManifestStore struct { + basePath string +} + +// NewManifestStore creates a new manifest store +func NewManifestStore(basePath string) (*ManifestStore, error) { + manifestDir := filepath.Join(basePath, "manifests") + if err := os.MkdirAll(manifestDir, 0700); err != nil { + return nil, fmt.Errorf("failed to create manifest directory: %w", err) + } + return &ManifestStore{basePath: basePath}, nil +} + +// manifestPath returns the path for a manifest ID +func (s *ManifestStore) manifestPath(id string) string { + return filepath.Join(s.basePath, "manifests", id+".manifest.json") +} + +// Save writes a manifest to disk +func (s *ManifestStore) Save(m *Manifest) error { + path := s.manifestPath(m.ID) + + data, err := json.MarshalIndent(m, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal manifest: %w", err) + } + + // Atomic write + tmpPath := path + ".tmp" + if err := os.WriteFile(tmpPath, data, 0600); err != nil { + return fmt.Errorf("failed to write manifest: %w", err) + } + + if err := os.Rename(tmpPath, path); err != nil { + os.Remove(tmpPath) + return fmt.Errorf("failed to commit manifest: %w", err) + } + + return nil +} + +// Load reads a manifest from disk +func (s *ManifestStore) Load(id string) (*Manifest, error) { + path := s.manifestPath(id) + + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read manifest %s: %w", id, err) + } + + var m Manifest + if err := json.Unmarshal(data, &m); err != nil { + return nil, fmt.Errorf("failed to parse manifest %s: %w", id, err) + } + + return &m, nil +} + +// Delete removes a manifest +func (s *ManifestStore) Delete(id string) error { + path := s.manifestPath(id) + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to delete manifest %s: %w", id, err) + } + return nil +} + +// List returns all manifest IDs +func (s *ManifestStore) List() ([]string, error) { + manifestDir := filepath.Join(s.basePath, "manifests") + entries, err := os.ReadDir(manifestDir) + if err != nil { + return nil, fmt.Errorf("failed to list manifests: %w", err) + } + + var ids []string + for _, e := range entries { + if e.IsDir() { + continue + } + name := e.Name() + if len(name) > 14 && name[len(name)-14:] == ".manifest.json" { + ids = append(ids, name[:len(name)-14]) + } + } + + return ids, nil +} + +// ListAll returns all manifests sorted by creation time (newest first) +func (s *ManifestStore) ListAll() ([]*Manifest, error) { + ids, err := s.List() + if err != nil { + return nil, err + } + + var manifests []*Manifest + for _, id := range ids { + m, err := s.Load(id) + if err != nil { + continue // Skip corrupted manifests + } + manifests = append(manifests, m) + } + + // Sort by creation time (newest first) + for i := 0; i < len(manifests)-1; i++ { + for j := i + 1; j < len(manifests); j++ { + if manifests[j].CreatedAt.After(manifests[i].CreatedAt) { + manifests[i], manifests[j] = manifests[j], manifests[i] + } + } + } + + return manifests, nil +} + +// GetChunkHashes returns 
all unique chunk hashes referenced by manifests +func (s *ManifestStore) GetChunkHashes() (map[string]int, error) { + manifests, err := s.ListAll() + if err != nil { + return nil, err + } + + // Map hash -> reference count + refs := make(map[string]int) + for _, m := range manifests { + for _, c := range m.Chunks { + refs[c.Hash]++ + } + } + + return refs, nil +} diff --git a/internal/dedup/store.go b/internal/dedup/store.go new file mode 100644 index 0000000..026bf90 --- /dev/null +++ b/internal/dedup/store.go @@ -0,0 +1,367 @@ +package dedup + +import ( + "compress/gzip" + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "os" + "path/filepath" + "sync" +) + +// ChunkStore manages content-addressed chunk storage +// Chunks are stored as: //.chunk[.gz][.enc] +type ChunkStore struct { + basePath string + compress bool + encryptionKey []byte // 32 bytes for AES-256 + mu sync.RWMutex + existingChunks map[string]bool // Cache of known chunks +} + +// StoreConfig holds configuration for the chunk store +type StoreConfig struct { + BasePath string + Compress bool // Enable gzip compression + EncryptionKey string // Optional: hex-encoded 32-byte key for AES-256-GCM +} + +// NewChunkStore creates a new chunk store +func NewChunkStore(config StoreConfig) (*ChunkStore, error) { + store := &ChunkStore{ + basePath: config.BasePath, + compress: config.Compress, + existingChunks: make(map[string]bool), + } + + // Parse encryption key if provided + if config.EncryptionKey != "" { + key, err := hex.DecodeString(config.EncryptionKey) + if err != nil { + return nil, fmt.Errorf("invalid encryption key: %w", err) + } + if len(key) != 32 { + return nil, fmt.Errorf("encryption key must be 32 bytes (got %d)", len(key)) + } + store.encryptionKey = key + } + + // Create base directory structure + if err := os.MkdirAll(config.BasePath, 0700); err != nil { + return nil, fmt.Errorf("failed to create chunk store: %w", err) + } + + // Create chunks and manifests directories + for _, dir := range []string{"chunks", "manifests"} { + if err := os.MkdirAll(filepath.Join(config.BasePath, dir), 0700); err != nil { + return nil, fmt.Errorf("failed to create %s directory: %w", dir, err) + } + } + + return store, nil +} + +// chunkPath returns the filesystem path for a chunk hash +// Uses 2-character prefix for directory sharding (256 subdirs) +func (s *ChunkStore) chunkPath(hash string) string { + if len(hash) < 2 { + return filepath.Join(s.basePath, "chunks", "xx", hash+s.chunkExt()) + } + prefix := hash[:2] + return filepath.Join(s.basePath, "chunks", prefix, hash+s.chunkExt()) +} + +// chunkExt returns the file extension based on compression/encryption settings +func (s *ChunkStore) chunkExt() string { + ext := ".chunk" + if s.compress { + ext += ".gz" + } + if s.encryptionKey != nil { + ext += ".enc" + } + return ext +} + +// Has checks if a chunk exists in the store +func (s *ChunkStore) Has(hash string) bool { + s.mu.RLock() + if exists, ok := s.existingChunks[hash]; ok { + s.mu.RUnlock() + return exists + } + s.mu.RUnlock() + + // Check filesystem + path := s.chunkPath(hash) + _, err := os.Stat(path) + exists := err == nil + + s.mu.Lock() + s.existingChunks[hash] = exists + s.mu.Unlock() + + return exists +} + +// Put stores a chunk, returning true if it was new (not deduplicated) +func (s *ChunkStore) Put(chunk *Chunk) (isNew bool, err error) { + // Check if already exists (deduplication!) 
+ if s.Has(chunk.Hash) { + return false, nil + } + + path := s.chunkPath(chunk.Hash) + + // Create prefix directory + if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil { + return false, fmt.Errorf("failed to create chunk directory: %w", err) + } + + // Prepare data + data := chunk.Data + + // Compress if enabled + if s.compress { + data, err = s.compressData(data) + if err != nil { + return false, fmt.Errorf("compression failed: %w", err) + } + } + + // Encrypt if enabled + if s.encryptionKey != nil { + data, err = s.encryptData(data) + if err != nil { + return false, fmt.Errorf("encryption failed: %w", err) + } + } + + // Write atomically (write to temp, then rename) + tmpPath := path + ".tmp" + if err := os.WriteFile(tmpPath, data, 0600); err != nil { + return false, fmt.Errorf("failed to write chunk: %w", err) + } + + if err := os.Rename(tmpPath, path); err != nil { + os.Remove(tmpPath) + return false, fmt.Errorf("failed to commit chunk: %w", err) + } + + // Update cache + s.mu.Lock() + s.existingChunks[chunk.Hash] = true + s.mu.Unlock() + + return true, nil +} + +// Get retrieves a chunk by hash +func (s *ChunkStore) Get(hash string) (*Chunk, error) { + path := s.chunkPath(hash) + + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read chunk %s: %w", hash, err) + } + + // Decrypt if encrypted + if s.encryptionKey != nil { + data, err = s.decryptData(data) + if err != nil { + return nil, fmt.Errorf("decryption failed: %w", err) + } + } + + // Decompress if compressed + if s.compress { + data, err = s.decompressData(data) + if err != nil { + return nil, fmt.Errorf("decompression failed: %w", err) + } + } + + // Verify hash + h := sha256.Sum256(data) + actualHash := hex.EncodeToString(h[:]) + if actualHash != hash { + return nil, fmt.Errorf("chunk hash mismatch: expected %s, got %s", hash, actualHash) + } + + return &Chunk{ + Hash: hash, + Data: data, + Length: len(data), + }, nil +} + +// Delete removes a chunk from the store +func (s *ChunkStore) Delete(hash string) error { + path := s.chunkPath(hash) + + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to delete chunk %s: %w", hash, err) + } + + s.mu.Lock() + delete(s.existingChunks, hash) + s.mu.Unlock() + + return nil +} + +// Stats returns storage statistics +type StoreStats struct { + TotalChunks int64 + TotalSize int64 // Bytes on disk (after compression/encryption) + UniqueSize int64 // Bytes of unique data + Directories int +} + +// Stats returns statistics about the chunk store +func (s *ChunkStore) Stats() (*StoreStats, error) { + stats := &StoreStats{} + + chunksDir := filepath.Join(s.basePath, "chunks") + err := filepath.Walk(chunksDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + stats.Directories++ + return nil + } + stats.TotalChunks++ + stats.TotalSize += info.Size() + return nil + }) + + return stats, err +} + +// LoadIndex loads the existing chunk hashes into memory +func (s *ChunkStore) LoadIndex() error { + s.mu.Lock() + defer s.mu.Unlock() + + s.existingChunks = make(map[string]bool) + + chunksDir := filepath.Join(s.basePath, "chunks") + return filepath.Walk(chunksDir, func(path string, info os.FileInfo, err error) error { + if err != nil || info.IsDir() { + return err + } + + // Extract hash from filename + base := filepath.Base(path) + hash := base + // Remove extensions + for _, ext := range []string{".enc", ".gz", ".chunk"} { + if len(hash) > len(ext) && 
hash[len(hash)-len(ext):] == ext { + hash = hash[:len(hash)-len(ext)] + } + } + if len(hash) == 64 { // SHA-256 hex length + s.existingChunks[hash] = true + } + + return nil + }) +} + +// compressData compresses data using gzip +func (s *ChunkStore) compressData(data []byte) ([]byte, error) { + var buf []byte + w, err := gzip.NewWriterLevel((*bytesBuffer)(&buf), gzip.BestCompression) + if err != nil { + return nil, err + } + if _, err := w.Write(data); err != nil { + return nil, err + } + if err := w.Close(); err != nil { + return nil, err + } + return buf, nil +} + +// bytesBuffer is a simple io.Writer that appends to a byte slice +type bytesBuffer []byte + +func (b *bytesBuffer) Write(p []byte) (int, error) { + *b = append(*b, p...) + return len(p), nil +} + +// decompressData decompresses gzip data +func (s *ChunkStore) decompressData(data []byte) ([]byte, error) { + r, err := gzip.NewReader(&bytesReader{data: data}) + if err != nil { + return nil, err + } + defer r.Close() + return io.ReadAll(r) +} + +// bytesReader is a simple io.Reader from a byte slice +type bytesReader struct { + data []byte + pos int +} + +func (r *bytesReader) Read(p []byte) (int, error) { + if r.pos >= len(r.data) { + return 0, io.EOF + } + n := copy(p, r.data[r.pos:]) + r.pos += n + return n, nil +} + +// encryptData encrypts data using AES-256-GCM +func (s *ChunkStore) encryptData(plaintext []byte) ([]byte, error) { + block, err := aes.NewCipher(s.encryptionKey) + if err != nil { + return nil, err + } + + gcm, err := cipher.NewGCM(block) + if err != nil { + return nil, err + } + + nonce := make([]byte, gcm.NonceSize()) + if _, err := rand.Read(nonce); err != nil { + return nil, err + } + + // Prepend nonce to ciphertext + return gcm.Seal(nonce, nonce, plaintext, nil), nil +} + +// decryptData decrypts AES-256-GCM encrypted data +func (s *ChunkStore) decryptData(ciphertext []byte) ([]byte, error) { + block, err := aes.NewCipher(s.encryptionKey) + if err != nil { + return nil, err + } + + gcm, err := cipher.NewGCM(block) + if err != nil { + return nil, err + } + + if len(ciphertext) < gcm.NonceSize() { + return nil, fmt.Errorf("ciphertext too short") + } + + nonce := ciphertext[:gcm.NonceSize()] + ciphertext = ciphertext[gcm.NonceSize():] + + return gcm.Open(nil, nonce, ciphertext, nil) +}
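
For encrypted dedup backups, the chunk store expects a hex-encoded 32-byte key (64 hex characters), supplied via `--key` or the `DBBACKUP_DEDUP_KEY` environment variable; as the code reads, restoring encrypted chunks also requires passing `--encrypt` so the store looks for `.enc` chunk files. A minimal usage sketch, assuming `openssl` is available for key generation and using illustrative file paths:

```bash
# Generate a 32-byte key as 64 hex characters and export it for dbbackup
export DBBACKUP_DEDUP_KEY=$(openssl rand -hex 32)

# Encrypted, deduplicated backup of a dump file (path is illustrative)
dbbackup dedup backup /var/backups/mydb.dump --encrypt

# Restore with the same key; --encrypt tells the store to expect encrypted chunks
dbbackup dedup restore 2026-01-07_120000_mydb /tmp/restored.dump --encrypt
```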