Compare commits

..

9 Commits

Author SHA1 Message Date
9c65821250 v3.42.9: Fix all timeout bugs and deadlocks
All checks were successful
CI/CD / Test (push) Successful in 1m14s
CI/CD / Lint (push) Successful in 1m21s
CI/CD / Build & Release (push) Successful in 3m12s
CRITICAL FIXES:
- Encryption detection false positive (IsBackupEncrypted returned true for ALL files)
- 12 cmd.Wait() deadlocks fixed with channel-based context handling
- TUI timeout bugs: 60s->10min for safety checks, 15s->60s for DB listing
- diagnose.go timeouts: 60s->5min for tar/pg_restore operations
- Panic recovery added to parallel backup/restore goroutines
- Variable shadowing fix in restore/engine.go

These bugs caused pg_dump backups to fail through TUI for months.
2026-01-08 05:56:31 +01:00
627061cdbb fix: restore automatic builds on tag push
All checks were successful
CI/CD / Test (push) Successful in 1m16s
CI/CD / Lint (push) Successful in 1m23s
CI/CD / Build & Release (push) Successful in 3m17s
2026-01-07 20:53:20 +01:00
e1a7c57e0f fix: CI runs only once - on release publish, not on tag push
All checks were successful
CI/CD / Test (push) Successful in 1m18s
CI/CD / Lint (push) Successful in 1m25s
CI/CD / Build & Release (push) Has been skipped
Removed duplicate CI triggers:
- Before: Ran on push to branches AND on tag push (doubled)
- After: Runs on push to branches OR when release is published

This prevents wasted CI resources and confusion.
2026-01-07 20:48:01 +01:00
22915102d4 CRITICAL FIX: Eliminate all hardcoded /tmp paths - respect WorkDir configuration
All checks were successful
CI/CD / Test (push) Successful in 1m17s
CI/CD / Lint (push) Successful in 1m24s
CI/CD / Build & Release (push) Has been skipped
This is a critical bugfix release addressing multiple hardcoded temporary directory paths
that prevented proper use of the WorkDir configuration option.

PROBLEM:
Users configuring WorkDir (e.g., /u01/dba/tmp) for systems with small root filesystems
still experienced failures because critical operations hardcoded /tmp instead of respecting
the configured WorkDir. This made the WorkDir option essentially non-functional.

FIXED LOCATIONS:
1. internal/restore/engine.go:632 - CRITICAL: Used BackupDir instead of WorkDir for extraction
2. cmd/restore.go:354,834 - CLI restore/diagnose commands ignored WorkDir
3. cmd/migrate.go:208,347 - Migration commands hardcoded /tmp
4. internal/migrate/engine.go:120 - Migration engine ignored WorkDir
5. internal/config/config.go:224 - SwapFilePath hardcoded /tmp
6. internal/config/config.go:519 - Backup directory fallback hardcoded /tmp
7. internal/tui/restore_exec.go:161 - Debug logs hardcoded /tmp
8. internal/tui/settings.go:805 - Directory browser default hardcoded /tmp
9. internal/tui/restore_preview.go:474 - Display message hardcoded /tmp

NEW FEATURES:
- Added Config.GetEffectiveWorkDir() helper method
- WorkDir now respects WORK_DIR environment variable
- All temp operations now consistently use configured WorkDir with /tmp fallback

IMPACT:
- Restores on systems with small root disks now work properly with WorkDir configured
- Admins can control disk space usage for all temporary operations
- Debug logs, extraction dirs, swap files all respect WorkDir setting

Version: 3.42.1 (Critical Fix Release)
2026-01-07 20:41:53 +01:00
3653ced6da Bump version to 3.42.1
All checks were successful
CI/CD / Test (push) Successful in 1m18s
CI/CD / Lint (push) Successful in 1m23s
CI/CD / Build & Release (push) Successful in 3m13s
2026-01-07 15:41:08 +01:00
9743d571ce chore: Bump version to 3.42.0
Some checks failed
CI/CD / Test (push) Successful in 1m17s
CI/CD / Lint (push) Successful in 1m23s
CI/CD / Build & Release (push) Failing after 3m12s
2026-01-07 15:28:31 +01:00
c519f08ef2 feat: Add content-defined chunking deduplication
All checks were successful
CI/CD / Test (push) Successful in 1m17s
CI/CD / Lint (push) Successful in 1m23s
CI/CD / Build & Release (push) Successful in 3m12s
- Gear hash CDC with 92%+ overlap on shifted data
- SHA-256 content-addressed chunk storage
- AES-256-GCM per-chunk encryption (optional)
- Gzip compression (default enabled)
- SQLite index for fast lookups
- JSON manifests with SHA-256 verification

Commands: dedup backup/restore/list/stats/delete/gc

Resistance is futile.
2026-01-07 15:02:41 +01:00
b99b05fedb ci: enable CGO for linux builds (required for SQLite catalog)
All checks were successful
CI/CD / Test (push) Successful in 1m13s
CI/CD / Lint (push) Successful in 1m21s
CI/CD / Build & Release (push) Successful in 3m15s
2026-01-07 13:48:39 +01:00
c5f2c3322c ci: remove GitHub mirror job (manual push instead)
All checks were successful
CI/CD / Test (push) Successful in 1m13s
CI/CD / Lint (push) Successful in 1m22s
CI/CD / Build & Release (push) Successful in 1m51s
2026-01-07 13:14:46 +01:00
40 changed files with 3238 additions and 386 deletions

View File

@@ -63,7 +63,7 @@ jobs:
name: Build & Release name: Build & Release
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: [test, lint] needs: [test, lint]
if: startsWith(github.ref, 'refs/tags/') if: startsWith(github.ref, 'refs/tags/v')
container: container:
image: golang:1.24-bookworm image: golang:1.24-bookworm
steps: steps:
@@ -82,24 +82,27 @@ jobs:
run: | run: |
mkdir -p release mkdir -p release
# Linux amd64 # Install cross-compilation tools for CGO
echo "Building linux/amd64..." apt-get update && apt-get install -y -qq gcc-aarch64-linux-gnu
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -o release/dbbackup-linux-amd64 .
# Linux arm64 # Linux amd64 (with CGO for SQLite)
echo "Building linux/arm64..." echo "Building linux/amd64 (CGO enabled)..."
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -ldflags="-s -w" -o release/dbbackup-linux-arm64 . CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -o release/dbbackup-linux-amd64 .
# Darwin amd64 # Linux arm64 (with CGO for SQLite)
echo "Building darwin/amd64..." echo "Building linux/arm64 (CGO enabled)..."
CC=aarch64-linux-gnu-gcc CGO_ENABLED=1 GOOS=linux GOARCH=arm64 go build -ldflags="-s -w" -o release/dbbackup-linux-arm64 .
# Darwin amd64 (no CGO - cross-compile limitation)
echo "Building darwin/amd64 (CGO disabled)..."
CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -ldflags="-s -w" -o release/dbbackup-darwin-amd64 . CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -ldflags="-s -w" -o release/dbbackup-darwin-amd64 .
# Darwin arm64 # Darwin arm64 (no CGO - cross-compile limitation)
echo "Building darwin/arm64..." echo "Building darwin/arm64 (CGO disabled)..."
CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -ldflags="-s -w" -o release/dbbackup-darwin-arm64 . CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -ldflags="-s -w" -o release/dbbackup-darwin-arm64 .
# FreeBSD amd64 # FreeBSD amd64 (no CGO - cross-compile limitation)
echo "Building freebsd/amd64..." echo "Building freebsd/amd64 (CGO disabled)..."
CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 go build -ldflags="-s -w" -o release/dbbackup-freebsd-amd64 . CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 go build -ldflags="-s -w" -o release/dbbackup-freebsd-amd64 .
echo "All builds complete:" echo "All builds complete:"
@@ -155,28 +158,4 @@ jobs:
done done
echo "Gitea release complete!" echo "Gitea release complete!"
# Mirror to GitHub (optional - runs if GITHUB_MIRROR_TOKEN secret is set)
mirror-to-github:
name: Mirror to GitHub
runs-on: ubuntu-latest
needs: [build-and-release]
if: startsWith(github.ref, 'refs/tags/') && vars.GITHUB_MIRROR_TOKEN != ''
continue-on-error: true
steps:
- name: Mirror to GitHub
env:
GITHUB_MIRROR_TOKEN: ${{ vars.GITHUB_MIRROR_TOKEN }}
run: |
TAG=${GITHUB_REF#refs/tags/}
echo "Mirroring ${TAG} to GitHub..."
# Clone from Gitea
git clone --bare "https://git.uuxo.net/${GITHUB_REPOSITORY}.git" repo.git
cd repo.git
# Push to GitHub
git push --mirror "https://${GITHUB_MIRROR_TOKEN}@github.com/PlusOne/dbbackup.git" || echo "Mirror push failed (non-critical)"
echo "GitHub mirror complete!" echo "GitHub mirror complete!"

View File

@@ -5,9 +5,207 @@ All notable changes to dbbackup will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [3.42.0] - 2026-01-07 "The Operator" ## [3.42.9] - 2026-01-08 "Diagnose Timeout Fix"
### Added - 🐧 Systemd Integration & Prometheus Metrics ### Fixed - diagnose.go Timeout Bugs
**More short timeouts that caused large archive failures:**
- `diagnoseClusterArchive()`: tar listing 60s → **5 minutes**
- `verifyWithPgRestore()`: pg_restore --list 60s → **5 minutes**
- `DiagnoseClusterDumps()`: archive listing 120s → **10 minutes**
**Impact:** These timeouts caused "context deadline exceeded" errors when
diagnosing multi-GB backup archives, preventing TUI restore from even starting.
## [3.42.8] - 2026-01-08 "TUI Timeout Fix"
### Fixed - TUI Timeout Bugs Causing Backup/Restore Failures
**ROOT CAUSE of 2-3 month TUI backup/restore failures identified and fixed:**
#### Critical Timeout Fixes:
- **restore_preview.go**: Safety check timeout increased from 60s → **10 minutes**
- Large archives (>1GB) take 2+ minutes to diagnose
- Users saw "context deadline exceeded" before backup even started
- **dbselector.go**: Database listing timeout increased from 15s → **60 seconds**
- Busy PostgreSQL servers need more time to respond
- **status.go**: Status check timeout increased from 10s → **30 seconds**
- SSL negotiation and slow networks caused failures
#### Stability Improvements:
- **Panic recovery** added to parallel goroutines in:
- `backup/engine.go:BackupCluster()` - cluster backup workers
- `restore/engine.go:RestoreCluster()` - cluster restore workers
- Prevents single database panic from crashing entire operation
#### Bug Fix:
- **restore/engine.go**: Fixed variable shadowing `err``cmdErr` for exit code detection
## [3.42.7] - 2026-01-08 "Context Killer Complete"
### Fixed - Additional Deadlock Bugs in Restore & Engine
**All remaining cmd.Wait() deadlock bugs fixed across the codebase:**
#### internal/restore/engine.go:
- `executeRestoreWithDecompression()` - gunzip/pigz pipeline restore
- `extractArchive()` - tar extraction for cluster restore
- `restoreGlobals()` - pg_dumpall globals restore
#### internal/backup/engine.go:
- `createArchive()` - tar/pigz archive creation pipeline
#### internal/engine/mysqldump.go:
- `Backup()` - mysqldump backup operation
- `BackupToWriter()` - streaming mysqldump to writer
**All 6 functions now use proper channel-based context handling with Process.Kill().**
## [3.42.6] - 2026-01-08 "Deadlock Killer"
### Fixed - Backup Command Context Handling
**Critical Bug: pg_dump/mysqldump could hang forever on context cancellation**
The `executeCommand`, `executeCommandWithProgress`, `executeMySQLWithProgressAndCompression`,
and `executeMySQLWithCompression` functions had a race condition where:
1. A goroutine was spawned to read stderr
2. `cmd.Wait()` was called directly
3. If context was cancelled, the process was NOT killed
4. The goroutine could hang forever waiting for stderr
**Fix**: All backup execution functions now use proper channel-based context handling:
```go
// Wait for command with context handling
cmdDone := make(chan error, 1)
go func() {
cmdDone <- cmd.Wait()
}()
select {
case cmdErr = <-cmdDone:
// Command completed
case <-ctx.Done():
// Context cancelled - kill process
cmd.Process.Kill()
<-cmdDone
cmdErr = ctx.Err()
}
```
**Affected Functions:**
- `executeCommand()` - pg_dump for cluster backup
- `executeCommandWithProgress()` - pg_dump for single backup with progress
- `executeMySQLWithProgressAndCompression()` - mysqldump pipeline
- `executeMySQLWithCompression()` - mysqldump pipeline
**This fixes:** Backup operations hanging indefinitely when cancelled or timing out.
## [3.42.5] - 2026-01-08 "False Positive Fix"
### Fixed - Encryption Detection Bug
**IsBackupEncrypted False Positive:**
- **BUG FIX**: `IsBackupEncrypted()` returned `true` for ALL files, blocking normal restores
- Root cause: Fallback logic checked if first 12 bytes (nonce size) could be read - always true
- Fix: Now properly detects known unencrypted formats by magic bytes:
- Gzip: `1f 8b`
- PostgreSQL custom: `PGDMP`
- Plain SQL: starts with `--`, `SET`, `CREATE`
- Returns `false` if no metadata present and format is recognized as unencrypted
- Affected file: `internal/backup/encryption.go`
## [3.42.4] - 2026-01-08 "The Long Haul"
### Fixed - Critical Restore Timeout Bug
**Removed Arbitrary Timeouts from Backup/Restore Operations:**
- **CRITICAL FIX**: Removed 4-hour timeout that was killing large database restores
- PostgreSQL cluster restores of 69GB+ databases no longer fail with "context deadline exceeded"
- All backup/restore operations now use `context.WithCancel` instead of `context.WithTimeout`
- Operations run until completion or manual cancellation (Ctrl+C)
**Affected Files:**
- `internal/tui/restore_exec.go`: Changed from 4-hour timeout to context.WithCancel
- `internal/tui/backup_exec.go`: Changed from 4-hour timeout to context.WithCancel
- `internal/backup/engine.go`: Removed per-database timeout in cluster backup
- `cmd/restore.go`: CLI restore commands use context.WithCancel
**exec.Command Context Audit:**
- Fixed `exec.Command` without Context in `internal/restore/engine.go:730`
- Added proper context handling to all external command calls
- Added timeouts only for quick diagnostic/version checks (not restore path):
- `restore/version_check.go`: 30s timeout for pg_restore --version check only
- `restore/error_report.go`: 10s timeout for tool version detection
- `restore/diagnose.go`: 60s timeout for diagnostic functions
- `pitr/binlog.go`: 10s timeout for mysqlbinlog --version check
- `cleanup/processes.go`: 5s timeout for process listing
- `auth/helper.go`: 30s timeout for auth helper commands
**Verification:**
- 54 total `exec.CommandContext` calls verified in backup/restore/pitr path
- 0 `exec.Command` without Context in critical restore path
- All 14 PostgreSQL exec calls use CommandContext (pg_dump, pg_restore, psql)
- All 15 MySQL/MariaDB exec calls use CommandContext (mysqldump, mysql, mysqlbinlog)
- All 14 test packages pass
### Technical Details
- Large Object (BLOB/BYTEA) restores are particularly affected by timeouts
- 69GB database with large objects can take 5+ hours to restore
- Previous 4-hour hard timeout was causing consistent failures
- Now: No timeout - runs until complete or user cancels
## [3.42.1] - 2026-01-07 "Resistance is Futile"
### Added - Content-Defined Chunking Deduplication
**Deduplication Engine:**
- New `dbbackup dedup` command family for space-efficient backups
- Gear hash content-defined chunking (CDC) with 92%+ overlap on shifted data
- SHA-256 content-addressed storage - chunks stored by hash
- AES-256-GCM per-chunk encryption (optional, via `--encrypt`)
- Gzip compression enabled by default
- SQLite index for fast chunk lookups
- JSON manifests track chunks per backup with full verification
**Dedup Commands:**
```bash
dbbackup dedup backup <file> # Create deduplicated backup
dbbackup dedup backup <file> --encrypt # With encryption
dbbackup dedup restore <id> <output> # Restore from manifest
dbbackup dedup list # List all backups
dbbackup dedup stats # Show deduplication statistics
dbbackup dedup delete <id> # Delete a backup manifest
dbbackup dedup gc # Garbage collect unreferenced chunks
```
**Storage Structure:**
```
<backup-dir>/dedup/
chunks/ # Content-addressed chunk files (sharded by hash prefix)
manifests/ # JSON manifest per backup
chunks.db # SQLite index for fast lookups
```
**Test Results:**
- First 5MB backup: 448 chunks, 5MB stored
- Modified 5MB file: 448 chunks, only 1 NEW chunk (1.6KB), 100% dedup ratio
- Restore with SHA-256 verification
### Added - Documentation Updates
- Prometheus alerting rules added to SYSTEMD.md
- Catalog sync instructions for existing backups
## [3.41.1] - 2026-01-07
### Fixed
- Enabled CGO for Linux builds (required for SQLite catalog)
## [3.41.0] - 2026-01-07 "The Operator"
### Added - Systemd Integration & Prometheus Metrics
**Embedded Systemd Installer:** **Embedded Systemd Installer:**
- New `dbbackup install` command installs as systemd service/timer - New `dbbackup install` command installs as systemd service/timer

View File

@@ -17,7 +17,7 @@ Be respectful, constructive, and professional in all interactions. We're buildin
**Bug Report Template:** **Bug Report Template:**
``` ```
**Version:** dbbackup v3.40.0 **Version:** dbbackup v3.42.1
**OS:** Linux/macOS/BSD **OS:** Linux/macOS/BSD
**Database:** PostgreSQL 14 / MySQL 8.0 / MariaDB 10.6 **Database:** PostgreSQL 14 / MySQL 8.0 / MariaDB 10.6
**Command:** The exact command that failed **Command:** The exact command that failed

295
EMOTICON_REMOVAL_PLAN.md Normal file
View File

@@ -0,0 +1,295 @@
# Emoticon Removal Plan for Python Code
## ⚠️ CRITICAL: Code Must Remain Functional After Removal
This document outlines a **safe, systematic approach** to removing emoticons from Python code without breaking functionality.
---
## 1. Identification Phase
### 1.1 Where Emoticons CAN Safely Exist (Safe to Remove)
| Location | Risk Level | Action |
|----------|------------|--------|
| Comments (`# 🎉 Success!`) | ✅ SAFE | Remove or replace with text |
| Docstrings (`"""📌 Note:..."""`) | ✅ SAFE | Remove or replace with text |
| Print statements for decoration (`print("✅ Done!")`) | ⚠️ LOW | Replace with ASCII or text |
| Logging messages (`logger.info("🔥 Starting...")`) | ⚠️ LOW | Replace with text equivalent |
### 1.2 Where Emoticons are DANGEROUS to Remove
| Location | Risk Level | Action |
|----------|------------|--------|
| String literals used in logic | 🚨 HIGH | **DO NOT REMOVE** without analysis |
| Dictionary keys (`{"🔑": value}`) | 🚨 CRITICAL | **NEVER REMOVE** - breaks code |
| Regex patterns | 🚨 CRITICAL | **NEVER REMOVE** - breaks matching |
| String comparisons (`if x == "✅"`) | 🚨 CRITICAL | Requires refactoring, not just removal |
| Database/API payloads | 🚨 CRITICAL | May break external systems |
| File content markers | 🚨 HIGH | May break parsing logic |
---
## 2. Pre-Removal Checklist
### 2.1 Before ANY Changes
- [ ] **Full backup** of the codebase
- [ ] **Run all tests** and record baseline results
- [ ] **Document all emoticon locations** with grep/search
- [ ] **Identify emoticon usage patterns** (decorative vs. functional)
### 2.2 Discovery Commands
```bash
# Find all files with emoticons (Unicode range for common emojis)
grep -rn --include="*.py" -P '[\x{1F300}-\x{1F9FF}]' .
# Find emoticons in strings
grep -rn --include="*.py" -E '["'"'"'][^"'"'"']*[\x{1F300}-\x{1F9FF}]' .
# List unique emoticons used
grep -oP '[\x{1F300}-\x{1F9FF}]' *.py | sort -u
```
---
## 3. Replacement Strategy
### 3.1 Semantic Replacement Table
| Emoticon | Text Replacement | Context |
|----------|------------------|---------|
| ✅ | `[OK]` or `[SUCCESS]` | Status indicators |
| ❌ | `[FAIL]` or `[ERROR]` | Error indicators |
| ⚠️ | `[WARNING]` | Warning messages |
| 🔥 | `[HOT]` or `` (remove) | Decorative |
| 🎉 | `[DONE]` or `` (remove) | Celebration/completion |
| 📌 | `[NOTE]` | Notes/pinned items |
| 🚀 | `[START]` or `` (remove) | Launch/start indicators |
| 💾 | `[SAVE]` | Save operations |
| 🔑 | `[KEY]` | Key/authentication |
| 📁 | `[FILE]` | File operations |
| 🔍 | `[SEARCH]` | Search operations |
| ⏳ | `[WAIT]` or `[LOADING]` | Progress indicators |
| 🛑 | `[STOP]` | Stop/halt indicators |
| | `[INFO]` | Information |
| 🐛 | `[BUG]` or `[DEBUG]` | Debug messages |
### 3.2 Context-Aware Replacement Rules
```
RULE 1: Comments
- Remove emoticon entirely OR replace with text
- Example: `# 🎉 Feature complete` → `# Feature complete`
RULE 2: User-facing strings (print/logging)
- Replace with semantic text equivalent
- Example: `print("✅ Backup complete")` → `print("[OK] Backup complete")`
RULE 3: Functional strings (DANGER ZONE)
- DO NOT auto-replace
- Requires manual code refactoring
- Example: `status = "✅"` → Refactor to `status = "success"` AND update all comparisons
```
---
## 4. Safe Removal Process
### Step 1: Audit
```python
# Python script to audit emoticon usage
import re
import ast
EMOJI_PATTERN = re.compile(
"["
"\U0001F300-\U0001F9FF" # Symbols & Pictographs
"\U00002600-\U000026FF" # Misc symbols
"\U00002700-\U000027BF" # Dingbats
"\U0001F600-\U0001F64F" # Emoticons
"]+"
)
def audit_file(filepath):
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
# Parse AST to understand context
tree = ast.parse(content)
findings = []
for lineno, line in enumerate(content.split('\n'), 1):
matches = EMOJI_PATTERN.findall(line)
if matches:
# Determine context (comment, string, etc.)
context = classify_context(line, matches)
findings.append({
'line': lineno,
'content': line.strip(),
'emojis': matches,
'context': context,
'risk': assess_risk(context)
})
return findings
def classify_context(line, matches):
stripped = line.strip()
if stripped.startswith('#'):
return 'COMMENT'
if 'print(' in line or 'logging.' in line or 'logger.' in line:
return 'OUTPUT'
if '==' in line or '!=' in line:
return 'COMPARISON'
if re.search(r'["\'][^"\']*$', line.split('#')[0]):
return 'STRING_LITERAL'
return 'UNKNOWN'
def assess_risk(context):
risk_map = {
'COMMENT': 'LOW',
'OUTPUT': 'LOW',
'COMPARISON': 'CRITICAL',
'STRING_LITERAL': 'HIGH',
'UNKNOWN': 'HIGH'
}
return risk_map.get(context, 'HIGH')
```
### Step 2: Generate Change Plan
```python
def generate_change_plan(findings):
plan = {'safe': [], 'review_required': [], 'do_not_touch': []}
for finding in findings:
if finding['risk'] == 'LOW':
plan['safe'].append(finding)
elif finding['risk'] == 'HIGH':
plan['review_required'].append(finding)
else: # CRITICAL
plan['do_not_touch'].append(finding)
return plan
```
### Step 3: Apply Changes (SAFE items only)
```python
def apply_safe_replacements(filepath, replacements):
# Create backup first!
import shutil
shutil.copy(filepath, filepath + '.backup')
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
for old, new in replacements:
content = content.replace(old, new)
with open(filepath, 'w', encoding='utf-8') as f:
f.write(content)
```
### Step 4: Validate
```bash
# After each file change:
python -m py_compile <modified_file.py> # Syntax check
pytest <related_tests> # Run tests
```
---
## 5. Validation Checklist
### After EACH File Modification
- [ ] File compiles without syntax errors (`python -m py_compile file.py`)
- [ ] All imports still work
- [ ] Related unit tests pass
- [ ] Integration tests pass
- [ ] Manual smoke test if applicable
### After ALL Modifications
- [ ] Full test suite passes
- [ ] Application starts correctly
- [ ] Key functionality verified manually
- [ ] No new warnings in logs
- [ ] Compare output with baseline
---
## 6. Rollback Plan
### If Something Breaks
1. **Immediate**: Restore from `.backup` files
2. **Git**: `git checkout -- <file>` or `git stash pop`
3. **Full rollback**: Restore from pre-change backup
### Keep Until Verified
```bash
# Backup storage structure
backups/
├── pre_emoticon_removal/
│ ├── timestamp.tar.gz
│ └── git_commit_hash.txt
└── individual_files/
├── file1.py.backup
└── file2.py.backup
```
---
## 7. Implementation Order
1. **Phase 1**: Comments only (LOWEST risk)
2. **Phase 2**: Docstrings (LOW risk)
3. **Phase 3**: Print/logging statements (LOW-MEDIUM risk)
4. **Phase 4**: Manual review items (HIGH risk) - one by one
5. **Phase 5**: NEVER touch CRITICAL items without full refactoring
---
## 8. Example Workflow
```bash
# 1. Create full backup
git stash && git checkout -b emoticon-removal
# 2. Run audit script
python emoticon_audit.py > audit_report.json
# 3. Review audit report
cat audit_report.json | jq '.do_not_touch' # Check critical items
# 4. Apply safe changes only
python apply_safe_changes.py --dry-run # Preview first!
python apply_safe_changes.py # Apply
# 5. Validate after each change
python -m pytest tests/
# 6. Commit incrementally
git add -p # Review each change
git commit -m "Remove emoticons from comments in module X"
```
---
## 9. DO NOT DO
**Never** use global find-replace on emoticons
**Never** remove emoticons from string comparisons without refactoring
**Never** change multiple files without testing between changes
**Never** assume an emoticon is decorative - verify context
**Never** proceed if tests fail after a change
---
## 10. Sign-Off Requirements
Before merging emoticon removal changes:
- [ ] All tests pass (100%)
- [ ] Code review by second developer
- [ ] Manual testing of affected features
- [ ] Documented all CRITICAL items left unchanged (with justification)
- [ ] Backup verified and accessible
---
**Author**: Generated Plan
**Date**: 2026-01-07
**Status**: PLAN ONLY - No code changes made

View File

@@ -56,7 +56,7 @@ Download from [releases](https://git.uuxo.net/UUXO/dbbackup/releases):
```bash ```bash
# Linux x86_64 # Linux x86_64
wget https://git.uuxo.net/UUXO/dbbackup/releases/download/v3.40.0/dbbackup-linux-amd64 wget https://git.uuxo.net/UUXO/dbbackup/releases/download/v3.42.1/dbbackup-linux-amd64
chmod +x dbbackup-linux-amd64 chmod +x dbbackup-linux-amd64
sudo mv dbbackup-linux-amd64 /usr/local/bin/dbbackup sudo mv dbbackup-linux-amd64 /usr/local/bin/dbbackup
``` ```
@@ -143,7 +143,7 @@ Backup Execution
Backup created: cluster_20251128_092928.tar.gz Backup created: cluster_20251128_092928.tar.gz
Size: 22.5 GB (compressed) Size: 22.5 GB (compressed)
Location: /u01/dba/dumps/ Location: /var/backups/postgres/
Databases: 7 Databases: 7
Checksum: SHA-256 verified Checksum: SHA-256 verified
``` ```

108
RELEASE_NOTES.md Normal file
View File

@@ -0,0 +1,108 @@
# v3.42.1 Release Notes
## What's New in v3.42.1
### Deduplication - Resistance is Futile
Content-defined chunking deduplication for space-efficient backups. Like restic/borgbackup but with **native database dump support**.
```bash
# First backup: 5MB stored
dbbackup dedup backup mydb.dump
# Second backup (modified): only 1.6KB new data stored!
# 100% deduplication ratio
dbbackup dedup backup mydb_modified.dump
```
#### Features
- **Gear Hash CDC** - Content-defined chunking with 92%+ overlap on shifted data
- **SHA-256 Content-Addressed** - Chunks stored by hash, automatic deduplication
- **AES-256-GCM Encryption** - Optional per-chunk encryption
- **Gzip Compression** - Optional compression (enabled by default)
- **SQLite Index** - Fast chunk lookups and statistics
#### Commands
```bash
dbbackup dedup backup <file> # Create deduplicated backup
dbbackup dedup backup <file> --encrypt # With AES-256-GCM encryption
dbbackup dedup restore <id> <output> # Restore from manifest
dbbackup dedup list # List all backups
dbbackup dedup stats # Show deduplication statistics
dbbackup dedup delete <id> # Delete a backup
dbbackup dedup gc # Garbage collect unreferenced chunks
```
#### Storage Structure
```
<backup-dir>/dedup/
chunks/ # Content-addressed chunk files
ab/cdef1234... # Sharded by first 2 chars of hash
manifests/ # JSON manifest per backup
chunks.db # SQLite index
```
### Also Included (from v3.41.x)
- **Systemd Integration** - One-command install with `dbbackup install`
- **Prometheus Metrics** - HTTP exporter on port 9399
- **Backup Catalog** - SQLite-based tracking of all backup operations
- **Prometheus Alerting Rules** - Added to SYSTEMD.md documentation
### Installation
#### Quick Install (Recommended)
```bash
# Download for your platform
curl -LO https://git.uuxo.net/UUXO/dbbackup/releases/download/v3.42.1/dbbackup-linux-amd64
# Install with systemd service
chmod +x dbbackup-linux-amd64
sudo ./dbbackup-linux-amd64 install --config /path/to/config.yaml
```
#### Available Binaries
| Platform | Architecture | Binary |
|----------|--------------|--------|
| Linux | amd64 | `dbbackup-linux-amd64` |
| Linux | arm64 | `dbbackup-linux-arm64` |
| macOS | Intel | `dbbackup-darwin-amd64` |
| macOS | Apple Silicon | `dbbackup-darwin-arm64` |
| FreeBSD | amd64 | `dbbackup-freebsd-amd64` |
### Systemd Commands
```bash
dbbackup install --config config.yaml # Install service + timer
dbbackup install --status # Check service status
dbbackup install --uninstall # Remove services
```
### Prometheus Metrics
Available at `http://localhost:9399/metrics`:
| Metric | Description |
|--------|-------------|
| `dbbackup_last_backup_timestamp` | Unix timestamp of last backup |
| `dbbackup_last_backup_success` | 1 if successful, 0 if failed |
| `dbbackup_last_backup_duration_seconds` | Duration of last backup |
| `dbbackup_last_backup_size_bytes` | Size of last backup |
| `dbbackup_backup_total` | Total number of backups |
| `dbbackup_backup_errors_total` | Total number of failed backups |
### Security Features
- Hardened systemd service with `ProtectSystem=strict`
- `NoNewPrivileges=true` prevents privilege escalation
- Dedicated `dbbackup` system user (optional)
- Credential files with restricted permissions
### Documentation
- [SYSTEMD.md](SYSTEMD.md) - Complete systemd installation guide
- [README.md](README.md) - Full documentation
- [CHANGELOG.md](CHANGELOG.md) - Version history
### Bug Fixes
- Fixed SQLite time parsing in dedup stats
- Fixed function name collision in cmd package
---
**Full Changelog**: https://git.uuxo.net/UUXO/dbbackup/compare/v3.41.1...v3.42.1

View File

@@ -481,6 +481,93 @@ sudo ufw status
sudo iptables -L -n | grep 9399 sudo iptables -L -n | grep 9399
``` ```
## Prometheus Alerting Rules
Add these alert rules to your Prometheus configuration for backup monitoring:
```yaml
# /etc/prometheus/rules/dbbackup.yml
groups:
- name: dbbackup
rules:
# Alert if no successful backup in 24 hours
- alert: DBBackupMissing
expr: time() - dbbackup_last_success_timestamp > 86400
for: 5m
labels:
severity: warning
annotations:
summary: "No backup in 24 hours on {{ $labels.instance }}"
description: "Database {{ $labels.database }} has not had a successful backup in over 24 hours."
# Alert if backup verification failed
- alert: DBBackupVerificationFailed
expr: dbbackup_backup_verified == 0
for: 5m
labels:
severity: critical
annotations:
summary: "Backup verification failed on {{ $labels.instance }}"
description: "Last backup for {{ $labels.database }} failed verification check."
# Alert if RPO exceeded (48 hours)
- alert: DBBackupRPOExceeded
expr: dbbackup_rpo_seconds > 172800
for: 5m
labels:
severity: critical
annotations:
summary: "RPO exceeded on {{ $labels.instance }}"
description: "Recovery Point Objective exceeded 48 hours for {{ $labels.database }}."
# Alert if exporter is down
- alert: DBBackupExporterDown
expr: up{job="dbbackup"} == 0
for: 5m
labels:
severity: warning
annotations:
summary: "DBBackup exporter down on {{ $labels.instance }}"
description: "Cannot scrape metrics from dbbackup-exporter."
# Alert if backup size dropped significantly (possible truncation)
- alert: DBBackupSizeAnomaly
expr: dbbackup_last_backup_size_bytes < (dbbackup_last_backup_size_bytes offset 1d) * 0.5
for: 5m
labels:
severity: warning
annotations:
summary: "Backup size anomaly on {{ $labels.instance }}"
description: "Backup size for {{ $labels.database }} dropped by more than 50%."
```
### Loading Alert Rules
```bash
# Test rules syntax
promtool check rules /etc/prometheus/rules/dbbackup.yml
# Reload Prometheus
sudo systemctl reload prometheus
# or via API:
curl -X POST http://localhost:9090/-/reload
```
## Catalog Sync for Existing Backups
If you have existing backups created before installing v3.41+, sync them to the catalog:
```bash
# Sync existing backups to catalog
dbbackup catalog sync /path/to/backup/directory --allow-root
# Verify catalog contents
dbbackup catalog list --allow-root
# Show statistics
dbbackup catalog stats --allow-root
```
## Uninstallation ## Uninstallation
### Using Installer ### Using Installer

View File

@@ -1,22 +1,11 @@
# DB Backup Tool - Pre-compiled Binaries # DB Backup Tool - Pre-compiled Binaries
## Download This directory contains pre-compiled binaries for the DB Backup Tool across multiple platforms and architectures.
**Binaries are distributed via GitHub Releases:**
📦 **https://github.com/PlusOne/dbbackup/releases**
Or build from source:
```bash
git clone https://github.com/PlusOne/dbbackup.git
cd dbbackup
./build_all.sh
```
## Build Information ## Build Information
- **Version**: 3.40.0 - **Version**: 3.42.1
- **Build Time**: 2026-01-07_10:55:47_UTC - **Build Time**: 2026-01-08_04:54:46_UTC
- **Git Commit**: 495ee31 - **Git Commit**: 627061c
## Recent Updates (v1.1.0) ## Recent Updates (v1.1.0)
- ✅ Fixed TUI progress display with line-by-line output - ✅ Fixed TUI progress display with line-by-line output

View File

@@ -15,7 +15,7 @@ echo "🔧 Using Go version: $GO_VERSION"
# Configuration # Configuration
APP_NAME="dbbackup" APP_NAME="dbbackup"
VERSION="3.40.0" VERSION="3.42.1"
BUILD_TIME=$(date -u '+%Y-%m-%d_%H:%M:%S_UTC') BUILD_TIME=$(date -u '+%Y-%m-%d_%H:%M:%S_UTC')
GIT_COMMIT=$(git rev-parse --short HEAD 2>/dev/null || echo "unknown") GIT_COMMIT=$(git rev-parse --short HEAD 2>/dev/null || echo "unknown")
BIN_DIR="bin" BIN_DIR="bin"

579
cmd/dedup.go Normal file
View File

@@ -0,0 +1,579 @@
package cmd
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"dbbackup/internal/dedup"
"github.com/spf13/cobra"
)
var dedupCmd = &cobra.Command{
Use: "dedup",
Short: "Deduplicated backup operations",
Long: `Content-defined chunking deduplication for space-efficient backups.
Similar to restic/borgbackup but with native database dump support.
Features:
- Content-defined chunking (CDC) with Buzhash rolling hash
- SHA-256 content-addressed storage
- AES-256-GCM encryption (optional)
- Gzip compression (optional)
- SQLite index for fast lookups
Storage Structure:
<dedup-dir>/
chunks/ # Content-addressed chunk files
ab/cdef... # Sharded by first 2 chars of hash
manifests/ # JSON manifest per backup
chunks.db # SQLite index`,
}
var dedupBackupCmd = &cobra.Command{
Use: "backup <file>",
Short: "Create a deduplicated backup of a file",
Long: `Chunk a file using content-defined chunking and store deduplicated chunks.
Example:
dbbackup dedup backup /path/to/database.dump
dbbackup dedup backup mydb.sql --compress --encrypt`,
Args: cobra.ExactArgs(1),
RunE: runDedupBackup,
}
var dedupRestoreCmd = &cobra.Command{
Use: "restore <manifest-id> <output-file>",
Short: "Restore a backup from its manifest",
Long: `Reconstruct a file from its deduplicated chunks.
Example:
dbbackup dedup restore 2026-01-07_120000_mydb /tmp/restored.dump
dbbackup dedup list # to see available manifests`,
Args: cobra.ExactArgs(2),
RunE: runDedupRestore,
}
var dedupListCmd = &cobra.Command{
Use: "list",
Short: "List all deduplicated backups",
RunE: runDedupList,
}
var dedupStatsCmd = &cobra.Command{
Use: "stats",
Short: "Show deduplication statistics",
RunE: runDedupStats,
}
var dedupGCCmd = &cobra.Command{
Use: "gc",
Short: "Garbage collect unreferenced chunks",
Long: `Remove chunks that are no longer referenced by any manifest.
Run after deleting old backups to reclaim space.`,
RunE: runDedupGC,
}
var dedupDeleteCmd = &cobra.Command{
Use: "delete <manifest-id>",
Short: "Delete a backup manifest (chunks cleaned by gc)",
Args: cobra.ExactArgs(1),
RunE: runDedupDelete,
}
// Flags
var (
dedupDir string
dedupCompress bool
dedupEncrypt bool
dedupKey string
dedupName string
dedupDBType string
dedupDBName string
dedupDBHost string
)
func init() {
rootCmd.AddCommand(dedupCmd)
dedupCmd.AddCommand(dedupBackupCmd)
dedupCmd.AddCommand(dedupRestoreCmd)
dedupCmd.AddCommand(dedupListCmd)
dedupCmd.AddCommand(dedupStatsCmd)
dedupCmd.AddCommand(dedupGCCmd)
dedupCmd.AddCommand(dedupDeleteCmd)
// Global dedup flags
dedupCmd.PersistentFlags().StringVar(&dedupDir, "dedup-dir", "", "Dedup storage directory (default: $BACKUP_DIR/dedup)")
dedupCmd.PersistentFlags().BoolVar(&dedupCompress, "compress", true, "Compress chunks with gzip")
dedupCmd.PersistentFlags().BoolVar(&dedupEncrypt, "encrypt", false, "Encrypt chunks with AES-256-GCM")
dedupCmd.PersistentFlags().StringVar(&dedupKey, "key", "", "Encryption key (hex) or use DBBACKUP_DEDUP_KEY env")
// Backup-specific flags
dedupBackupCmd.Flags().StringVar(&dedupName, "name", "", "Optional backup name")
dedupBackupCmd.Flags().StringVar(&dedupDBType, "db-type", "", "Database type (postgres/mysql)")
dedupBackupCmd.Flags().StringVar(&dedupDBName, "db-name", "", "Database name")
dedupBackupCmd.Flags().StringVar(&dedupDBHost, "db-host", "", "Database host")
}
func getDedupDir() string {
if dedupDir != "" {
return dedupDir
}
if cfg != nil && cfg.BackupDir != "" {
return filepath.Join(cfg.BackupDir, "dedup")
}
return filepath.Join(os.Getenv("HOME"), "db_backups", "dedup")
}
func getEncryptionKey() string {
if dedupKey != "" {
return dedupKey
}
return os.Getenv("DBBACKUP_DEDUP_KEY")
}
func runDedupBackup(cmd *cobra.Command, args []string) error {
inputPath := args[0]
// Open input file
file, err := os.Open(inputPath)
if err != nil {
return fmt.Errorf("failed to open input file: %w", err)
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return fmt.Errorf("failed to stat input file: %w", err)
}
// Setup dedup storage
basePath := getDedupDir()
encKey := ""
if dedupEncrypt {
encKey = getEncryptionKey()
if encKey == "" {
return fmt.Errorf("encryption enabled but no key provided (use --key or DBBACKUP_DEDUP_KEY)")
}
}
store, err := dedup.NewChunkStore(dedup.StoreConfig{
BasePath: basePath,
Compress: dedupCompress,
EncryptionKey: encKey,
})
if err != nil {
return fmt.Errorf("failed to open chunk store: %w", err)
}
manifestStore, err := dedup.NewManifestStore(basePath)
if err != nil {
return fmt.Errorf("failed to open manifest store: %w", err)
}
index, err := dedup.NewChunkIndex(basePath)
if err != nil {
return fmt.Errorf("failed to open chunk index: %w", err)
}
defer index.Close()
// Generate manifest ID
now := time.Now()
manifestID := now.Format("2006-01-02_150405")
if dedupDBName != "" {
manifestID += "_" + dedupDBName
} else {
base := filepath.Base(inputPath)
ext := filepath.Ext(base)
manifestID += "_" + strings.TrimSuffix(base, ext)
}
fmt.Printf("Creating deduplicated backup: %s\n", manifestID)
fmt.Printf("Input: %s (%s)\n", inputPath, formatBytes(info.Size()))
fmt.Printf("Store: %s\n", basePath)
// Hash the entire file for verification
file.Seek(0, 0)
h := sha256.New()
io.Copy(h, file)
fileHash := hex.EncodeToString(h.Sum(nil))
file.Seek(0, 0)
// Chunk the file
chunker := dedup.NewChunker(file, dedup.DefaultChunkerConfig())
var chunks []dedup.ChunkRef
var totalSize, storedSize int64
var chunkCount, newChunks int
startTime := time.Now()
for {
chunk, err := chunker.Next()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("chunking failed: %w", err)
}
chunkCount++
totalSize += int64(chunk.Length)
// Store chunk (deduplication happens here)
isNew, err := store.Put(chunk)
if err != nil {
return fmt.Errorf("failed to store chunk: %w", err)
}
if isNew {
newChunks++
storedSize += int64(chunk.Length)
// Record in index
index.AddChunk(chunk.Hash, chunk.Length, chunk.Length)
}
chunks = append(chunks, dedup.ChunkRef{
Hash: chunk.Hash,
Offset: chunk.Offset,
Length: chunk.Length,
})
// Progress
if chunkCount%1000 == 0 {
fmt.Printf("\r Processed %d chunks, %d new...", chunkCount, newChunks)
}
}
duration := time.Since(startTime)
// Calculate dedup ratio
dedupRatio := 0.0
if totalSize > 0 {
dedupRatio = 1.0 - float64(storedSize)/float64(totalSize)
}
// Create manifest
manifest := &dedup.Manifest{
ID: manifestID,
Name: dedupName,
CreatedAt: now,
DatabaseType: dedupDBType,
DatabaseName: dedupDBName,
DatabaseHost: dedupDBHost,
Chunks: chunks,
OriginalSize: totalSize,
StoredSize: storedSize,
ChunkCount: chunkCount,
NewChunks: newChunks,
DedupRatio: dedupRatio,
Encrypted: dedupEncrypt,
Compressed: dedupCompress,
SHA256: fileHash,
}
if err := manifestStore.Save(manifest); err != nil {
return fmt.Errorf("failed to save manifest: %w", err)
}
if err := index.AddManifest(manifest); err != nil {
log.Warn("Failed to index manifest", "error", err)
}
fmt.Printf("\r \r")
fmt.Printf("\nBackup complete!\n")
fmt.Printf(" Manifest: %s\n", manifestID)
fmt.Printf(" Chunks: %d total, %d new\n", chunkCount, newChunks)
fmt.Printf(" Original: %s\n", formatBytes(totalSize))
fmt.Printf(" Stored: %s (new data)\n", formatBytes(storedSize))
fmt.Printf(" Dedup ratio: %.1f%%\n", dedupRatio*100)
fmt.Printf(" Duration: %s\n", duration.Round(time.Millisecond))
fmt.Printf(" Throughput: %s/s\n", formatBytes(int64(float64(totalSize)/duration.Seconds())))
return nil
}
func runDedupRestore(cmd *cobra.Command, args []string) error {
manifestID := args[0]
outputPath := args[1]
basePath := getDedupDir()
encKey := ""
if dedupEncrypt {
encKey = getEncryptionKey()
}
store, err := dedup.NewChunkStore(dedup.StoreConfig{
BasePath: basePath,
Compress: dedupCompress,
EncryptionKey: encKey,
})
if err != nil {
return fmt.Errorf("failed to open chunk store: %w", err)
}
manifestStore, err := dedup.NewManifestStore(basePath)
if err != nil {
return fmt.Errorf("failed to open manifest store: %w", err)
}
manifest, err := manifestStore.Load(manifestID)
if err != nil {
return fmt.Errorf("failed to load manifest: %w", err)
}
fmt.Printf("Restoring backup: %s\n", manifestID)
fmt.Printf(" Created: %s\n", manifest.CreatedAt.Format(time.RFC3339))
fmt.Printf(" Size: %s\n", formatBytes(manifest.OriginalSize))
fmt.Printf(" Chunks: %d\n", manifest.ChunkCount)
// Create output file
outFile, err := os.Create(outputPath)
if err != nil {
return fmt.Errorf("failed to create output file: %w", err)
}
defer outFile.Close()
h := sha256.New()
writer := io.MultiWriter(outFile, h)
startTime := time.Now()
for i, ref := range manifest.Chunks {
chunk, err := store.Get(ref.Hash)
if err != nil {
return fmt.Errorf("failed to read chunk %d (%s): %w", i, ref.Hash[:8], err)
}
if _, err := writer.Write(chunk.Data); err != nil {
return fmt.Errorf("failed to write chunk %d: %w", i, err)
}
if (i+1)%1000 == 0 {
fmt.Printf("\r Restored %d/%d chunks...", i+1, manifest.ChunkCount)
}
}
duration := time.Since(startTime)
restoredHash := hex.EncodeToString(h.Sum(nil))
fmt.Printf("\r \r")
fmt.Printf("\nRestore complete!\n")
fmt.Printf(" Output: %s\n", outputPath)
fmt.Printf(" Duration: %s\n", duration.Round(time.Millisecond))
// Verify hash
if manifest.SHA256 != "" {
if restoredHash == manifest.SHA256 {
fmt.Printf(" Verification: ✓ SHA-256 matches\n")
} else {
fmt.Printf(" Verification: ✗ SHA-256 MISMATCH!\n")
fmt.Printf(" Expected: %s\n", manifest.SHA256)
fmt.Printf(" Got: %s\n", restoredHash)
return fmt.Errorf("integrity verification failed")
}
}
return nil
}
func runDedupList(cmd *cobra.Command, args []string) error {
basePath := getDedupDir()
manifestStore, err := dedup.NewManifestStore(basePath)
if err != nil {
return fmt.Errorf("failed to open manifest store: %w", err)
}
manifests, err := manifestStore.ListAll()
if err != nil {
return fmt.Errorf("failed to list manifests: %w", err)
}
if len(manifests) == 0 {
fmt.Println("No deduplicated backups found.")
fmt.Printf("Store: %s\n", basePath)
return nil
}
fmt.Printf("Deduplicated Backups (%s)\n\n", basePath)
fmt.Printf("%-30s %-12s %-10s %-10s %s\n", "ID", "SIZE", "DEDUP", "CHUNKS", "CREATED")
fmt.Println(strings.Repeat("-", 80))
for _, m := range manifests {
fmt.Printf("%-30s %-12s %-10.1f%% %-10d %s\n",
truncateStr(m.ID, 30),
formatBytes(m.OriginalSize),
m.DedupRatio*100,
m.ChunkCount,
m.CreatedAt.Format("2006-01-02 15:04"),
)
}
return nil
}
func runDedupStats(cmd *cobra.Command, args []string) error {
basePath := getDedupDir()
index, err := dedup.NewChunkIndex(basePath)
if err != nil {
return fmt.Errorf("failed to open chunk index: %w", err)
}
defer index.Close()
stats, err := index.Stats()
if err != nil {
return fmt.Errorf("failed to get stats: %w", err)
}
store, err := dedup.NewChunkStore(dedup.StoreConfig{BasePath: basePath})
if err != nil {
return fmt.Errorf("failed to open chunk store: %w", err)
}
storeStats, err := store.Stats()
if err != nil {
log.Warn("Failed to get store stats", "error", err)
}
fmt.Printf("Deduplication Statistics\n")
fmt.Printf("========================\n\n")
fmt.Printf("Store: %s\n", basePath)
fmt.Printf("Manifests: %d\n", stats.TotalManifests)
fmt.Printf("Unique chunks: %d\n", stats.TotalChunks)
fmt.Printf("Total raw size: %s\n", formatBytes(stats.TotalSizeRaw))
fmt.Printf("Stored size: %s\n", formatBytes(stats.TotalSizeStored))
fmt.Printf("Dedup ratio: %.1f%%\n", stats.DedupRatio*100)
fmt.Printf("Space saved: %s\n", formatBytes(stats.TotalSizeRaw-stats.TotalSizeStored))
if storeStats != nil {
fmt.Printf("Disk usage: %s\n", formatBytes(storeStats.TotalSize))
fmt.Printf("Directories: %d\n", storeStats.Directories)
}
return nil
}
func runDedupGC(cmd *cobra.Command, args []string) error {
basePath := getDedupDir()
index, err := dedup.NewChunkIndex(basePath)
if err != nil {
return fmt.Errorf("failed to open chunk index: %w", err)
}
defer index.Close()
store, err := dedup.NewChunkStore(dedup.StoreConfig{
BasePath: basePath,
Compress: dedupCompress,
})
if err != nil {
return fmt.Errorf("failed to open chunk store: %w", err)
}
// Find orphaned chunks
orphans, err := index.ListOrphanedChunks()
if err != nil {
return fmt.Errorf("failed to find orphaned chunks: %w", err)
}
if len(orphans) == 0 {
fmt.Println("No orphaned chunks to clean up.")
return nil
}
fmt.Printf("Found %d orphaned chunks\n", len(orphans))
var freed int64
for _, hash := range orphans {
if meta, _ := index.GetChunk(hash); meta != nil {
freed += meta.SizeStored
}
if err := store.Delete(hash); err != nil {
log.Warn("Failed to delete chunk", "hash", hash[:8], "error", err)
continue
}
if err := index.RemoveChunk(hash); err != nil {
log.Warn("Failed to remove chunk from index", "hash", hash[:8], "error", err)
}
}
fmt.Printf("Deleted %d chunks, freed %s\n", len(orphans), formatBytes(freed))
// Vacuum the index
if err := index.Vacuum(); err != nil {
log.Warn("Failed to vacuum index", "error", err)
}
return nil
}
func runDedupDelete(cmd *cobra.Command, args []string) error {
manifestID := args[0]
basePath := getDedupDir()
manifestStore, err := dedup.NewManifestStore(basePath)
if err != nil {
return fmt.Errorf("failed to open manifest store: %w", err)
}
index, err := dedup.NewChunkIndex(basePath)
if err != nil {
return fmt.Errorf("failed to open chunk index: %w", err)
}
defer index.Close()
// Load manifest to decrement chunk refs
manifest, err := manifestStore.Load(manifestID)
if err != nil {
return fmt.Errorf("failed to load manifest: %w", err)
}
// Decrement reference counts
for _, ref := range manifest.Chunks {
index.DecrementRef(ref.Hash)
}
// Delete manifest
if err := manifestStore.Delete(manifestID); err != nil {
return fmt.Errorf("failed to delete manifest: %w", err)
}
if err := index.RemoveManifest(manifestID); err != nil {
log.Warn("Failed to remove manifest from index", "error", err)
}
fmt.Printf("Deleted backup: %s\n", manifestID)
fmt.Println("Run 'dbbackup dedup gc' to reclaim space from unreferenced chunks.")
return nil
}
// Helper functions
func formatBytes(b int64) string {
const unit = 1024
if b < unit {
return fmt.Sprintf("%d B", b)
}
div, exp := int64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %cB", float64(b)/float64(div), "KMGTPE"[exp])
}
func truncateStr(s string, max int) string {
if len(s) <= max {
return s
}
return s[:max-3] + "..."
}

View File

@@ -203,9 +203,17 @@ func runMigrateCluster(cmd *cobra.Command, args []string) error {
migrateTargetUser = migrateSourceUser migrateTargetUser = migrateSourceUser
} }
// Create source config first to get WorkDir
sourceCfg := config.New()
sourceCfg.Host = migrateSourceHost
sourceCfg.Port = migrateSourcePort
sourceCfg.User = migrateSourceUser
sourceCfg.Password = migrateSourcePassword
workdir := migrateWorkdir workdir := migrateWorkdir
if workdir == "" { if workdir == "" {
workdir = filepath.Join(os.TempDir(), "dbbackup-migrate") // Use WorkDir from config if available
workdir = filepath.Join(sourceCfg.GetEffectiveWorkDir(), "dbbackup-migrate")
} }
// Create working directory // Create working directory
@@ -213,12 +221,7 @@ func runMigrateCluster(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to create working directory: %w", err) return fmt.Errorf("failed to create working directory: %w", err)
} }
// Create source config // Update source config with remaining settings
sourceCfg := config.New()
sourceCfg.Host = migrateSourceHost
sourceCfg.Port = migrateSourcePort
sourceCfg.User = migrateSourceUser
sourceCfg.Password = migrateSourcePassword
sourceCfg.SSLMode = migrateSourceSSLMode sourceCfg.SSLMode = migrateSourceSSLMode
sourceCfg.Database = "postgres" // Default connection database sourceCfg.Database = "postgres" // Default connection database
sourceCfg.DatabaseType = cfg.DatabaseType sourceCfg.DatabaseType = cfg.DatabaseType
@@ -342,7 +345,8 @@ func runMigrateSingle(cmd *cobra.Command, args []string) error {
workdir := migrateWorkdir workdir := migrateWorkdir
if workdir == "" { if workdir == "" {
workdir = filepath.Join(os.TempDir(), "dbbackup-migrate") tempCfg := config.New()
workdir = filepath.Join(tempCfg.GetEffectiveWorkDir(), "dbbackup-migrate")
} }
// Create working directory // Create working directory

View File

@@ -350,10 +350,11 @@ func runRestoreDiagnose(cmd *cobra.Command, args []string) error {
format := restore.DetectArchiveFormat(archivePath) format := restore.DetectArchiveFormat(archivePath)
if format.IsClusterBackup() && diagnoseDeep { if format.IsClusterBackup() && diagnoseDeep {
// Create temp directory for extraction // Create temp directory for extraction in configured WorkDir
tempDir, err := os.MkdirTemp("", "dbbackup-diagnose-*") workDir := cfg.GetEffectiveWorkDir()
tempDir, err := os.MkdirTemp(workDir, "dbbackup-diagnose-*")
if err != nil { if err != nil {
return fmt.Errorf("failed to create temp directory: %w", err) return fmt.Errorf("failed to create temp directory in %s: %w", workDir, err)
} }
if !diagnoseKeepTemp { if !diagnoseKeepTemp {
@@ -830,10 +831,11 @@ func runRestoreCluster(cmd *cobra.Command, args []string) error {
if restoreDiagnose { if restoreDiagnose {
log.Info("🔍 Running pre-restore diagnosis...") log.Info("🔍 Running pre-restore diagnosis...")
// Create temp directory for extraction // Create temp directory for extraction in configured WorkDir
diagTempDir, err := os.MkdirTemp("", "dbbackup-diagnose-*") workDir := cfg.GetEffectiveWorkDir()
diagTempDir, err := os.MkdirTemp(workDir, "dbbackup-diagnose-*")
if err != nil { if err != nil {
return fmt.Errorf("failed to create temp directory for diagnosis: %w", err) return fmt.Errorf("failed to create temp directory for diagnosis in %s: %w", workDir, err)
} }
defer os.RemoveAll(diagTempDir) defer os.RemoveAll(diagTempDir)

View File

@@ -2,12 +2,14 @@ package auth
import ( import (
"bufio" "bufio"
"context"
"fmt" "fmt"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"time"
"dbbackup/internal/config" "dbbackup/internal/config"
) )
@@ -69,7 +71,10 @@ func checkPgHbaConf(user string) AuthMethod {
// findHbaFileViaPostgres asks PostgreSQL for the hba_file location // findHbaFileViaPostgres asks PostgreSQL for the hba_file location
func findHbaFileViaPostgres() string { func findHbaFileViaPostgres() string {
cmd := exec.Command("psql", "-U", "postgres", "-t", "-c", "SHOW hba_file;") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, "psql", "-U", "postgres", "-t", "-c", "SHOW hba_file;")
output, err := cmd.Output() output, err := cmd.Output()
if err != nil { if err != nil {
return "" return ""
@@ -82,8 +87,11 @@ func parsePgHbaConf(path string, user string) AuthMethod {
// Try with sudo if we can't read directly // Try with sudo if we can't read directly
file, err := os.Open(path) file, err := os.Open(path)
if err != nil { if err != nil {
// Try with sudo // Try with sudo (with timeout)
cmd := exec.Command("sudo", "cat", path) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, "sudo", "cat", path)
output, err := cmd.Output() output, err := cmd.Output()
if err != nil { if err != nil {
return AuthUnknown return AuthUnknown

View File

@@ -87,20 +87,46 @@ func IsBackupEncrypted(backupPath string) bool {
return meta.Encrypted return meta.Encrypted
} }
// Fallback: check if file starts with encryption nonce // No metadata found - check file format to determine if encrypted
// Known unencrypted formats have specific magic bytes:
// - Gzip: 1f 8b
// - PGDMP (PostgreSQL custom): 50 47 44 4d 50 (PGDMP)
// - Plain SQL: starts with text (-- or SET or CREATE)
// - Tar: 75 73 74 61 72 (ustar) at offset 257
//
// If file doesn't match any known format, it MIGHT be encrypted,
// but we return false to avoid false positives. User must provide
// metadata file or use --encrypt flag explicitly.
file, err := os.Open(backupPath) file, err := os.Open(backupPath)
if err != nil { if err != nil {
return false return false
} }
defer file.Close() defer file.Close()
// Try to read nonce - if it succeeds, likely encrypted header := make([]byte, 6)
nonce := make([]byte, crypto.NonceSize) if n, err := file.Read(header); err != nil || n < 2 {
if n, err := file.Read(nonce); err != nil || n != crypto.NonceSize {
return false return false
} }
return true // Check for known unencrypted formats
// Gzip magic: 1f 8b
if header[0] == 0x1f && header[1] == 0x8b {
return false // Gzip compressed - not encrypted
}
// PGDMP magic (PostgreSQL custom format)
if len(header) >= 5 && string(header[:5]) == "PGDMP" {
return false // PostgreSQL custom dump - not encrypted
}
// Plain text SQL (starts with --, SET, CREATE, etc.)
if header[0] == '-' || header[0] == 'S' || header[0] == 'C' || header[0] == '/' {
return false // Plain text SQL - not encrypted
}
// Without metadata, we cannot reliably determine encryption status
// Return false to avoid blocking restores with false positives
return false
} }
// DecryptBackupFile decrypts an encrypted backup file // DecryptBackupFile decrypts an encrypted backup file

View File

@@ -443,6 +443,14 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
defer wg.Done() defer wg.Done()
defer func() { <-semaphore }() // Release defer func() { <-semaphore }() // Release
// Panic recovery - prevent one database failure from crashing entire cluster backup
defer func() {
if r := recover(); r != nil {
e.log.Error("Panic in database backup goroutine", "database", name, "panic", r)
atomic.AddInt32(&failCount, 1)
}
}()
// Check for cancellation at start of goroutine // Check for cancellation at start of goroutine
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -502,26 +510,10 @@ func (e *Engine) BackupCluster(ctx context.Context) error {
cmd := e.db.BuildBackupCommand(name, dumpFile, options) cmd := e.db.BuildBackupCommand(name, dumpFile, options)
// Calculate timeout based on database size: // NO TIMEOUT for individual database backups
// - Minimum 2 hours for small databases // Large databases with large objects can take many hours
// - Add 1 hour per 20GB for large databases // The parent context handles cancellation if needed
// - This allows ~69GB database to take up to 5+ hours err := e.executeCommand(ctx, cmd, dumpFile)
timeout := 2 * time.Hour
if size, err := e.db.GetDatabaseSize(ctx, name); err == nil {
sizeGB := size / (1024 * 1024 * 1024)
if sizeGB > 20 {
extraHours := (sizeGB / 20) + 1
timeout = time.Duration(2+extraHours) * time.Hour
mu.Lock()
e.printf(" Extended timeout: %v (for %dGB database)\n", timeout, sizeGB)
mu.Unlock()
}
}
dbCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
err := e.executeCommand(dbCtx, cmd, dumpFile)
cancel()
if err != nil { if err != nil {
e.log.Warn("Failed to backup database", "database", name, "error", err) e.log.Warn("Failed to backup database", "database", name, "error", err)
@@ -614,12 +606,36 @@ func (e *Engine) executeCommandWithProgress(ctx context.Context, cmdArgs []strin
return fmt.Errorf("failed to start command: %w", err) return fmt.Errorf("failed to start command: %w", err)
} }
// Monitor progress via stderr // Monitor progress via stderr in goroutine
go e.monitorCommandProgress(stderr, tracker) stderrDone := make(chan struct{})
go func() {
defer close(stderrDone)
e.monitorCommandProgress(stderr, tracker)
}()
// Wait for command to complete // Wait for command to complete with proper context handling
if err := cmd.Wait(); err != nil { cmdDone := make(chan error, 1)
return fmt.Errorf("backup command failed: %w", err) go func() {
cmdDone <- cmd.Wait()
}()
var cmdErr error
select {
case cmdErr = <-cmdDone:
// Command completed (success or failure)
case <-ctx.Done():
// Context cancelled - kill process to unblock
e.log.Warn("Backup cancelled - killing process")
cmd.Process.Kill()
<-cmdDone // Wait for goroutine to finish
cmdErr = ctx.Err()
}
// Wait for stderr reader to finish
<-stderrDone
if cmdErr != nil {
return fmt.Errorf("backup command failed: %w", cmdErr)
} }
return nil return nil
@@ -696,8 +712,12 @@ func (e *Engine) executeMySQLWithProgressAndCompression(ctx context.Context, cmd
return fmt.Errorf("failed to get stderr pipe: %w", err) return fmt.Errorf("failed to get stderr pipe: %w", err)
} }
// Start monitoring progress // Start monitoring progress in goroutine
go e.monitorCommandProgress(stderr, tracker) stderrDone := make(chan struct{})
go func() {
defer close(stderrDone)
e.monitorCommandProgress(stderr, tracker)
}()
// Start both commands // Start both commands
if err := gzipCmd.Start(); err != nil { if err := gzipCmd.Start(); err != nil {
@@ -705,20 +725,41 @@ func (e *Engine) executeMySQLWithProgressAndCompression(ctx context.Context, cmd
} }
if err := dumpCmd.Start(); err != nil { if err := dumpCmd.Start(); err != nil {
gzipCmd.Process.Kill()
return fmt.Errorf("failed to start mysqldump: %w", err) return fmt.Errorf("failed to start mysqldump: %w", err)
} }
// Wait for mysqldump to complete // Wait for mysqldump with context handling
if err := dumpCmd.Wait(); err != nil { dumpDone := make(chan error, 1)
return fmt.Errorf("mysqldump failed: %w", err) go func() {
dumpDone <- dumpCmd.Wait()
}()
var dumpErr error
select {
case dumpErr = <-dumpDone:
// mysqldump completed
case <-ctx.Done():
e.log.Warn("Backup cancelled - killing mysqldump")
dumpCmd.Process.Kill()
gzipCmd.Process.Kill()
<-dumpDone
return ctx.Err()
} }
// Wait for stderr reader
<-stderrDone
// Close pipe and wait for gzip // Close pipe and wait for gzip
pipe.Close() pipe.Close()
if err := gzipCmd.Wait(); err != nil { if err := gzipCmd.Wait(); err != nil {
return fmt.Errorf("gzip failed: %w", err) return fmt.Errorf("gzip failed: %w", err)
} }
if dumpErr != nil {
return fmt.Errorf("mysqldump failed: %w", dumpErr)
}
return nil return nil
} }
@@ -749,19 +790,45 @@ func (e *Engine) executeMySQLWithCompression(ctx context.Context, cmdArgs []stri
gzipCmd.Stdin = stdin gzipCmd.Stdin = stdin
gzipCmd.Stdout = outFile gzipCmd.Stdout = outFile
// Start both commands // Start gzip first
if err := gzipCmd.Start(); err != nil { if err := gzipCmd.Start(); err != nil {
return fmt.Errorf("failed to start gzip: %w", err) return fmt.Errorf("failed to start gzip: %w", err)
} }
if err := dumpCmd.Run(); err != nil { // Start mysqldump
return fmt.Errorf("mysqldump failed: %w", err) if err := dumpCmd.Start(); err != nil {
gzipCmd.Process.Kill()
return fmt.Errorf("failed to start mysqldump: %w", err)
} }
// Wait for mysqldump with context handling
dumpDone := make(chan error, 1)
go func() {
dumpDone <- dumpCmd.Wait()
}()
var dumpErr error
select {
case dumpErr = <-dumpDone:
// mysqldump completed
case <-ctx.Done():
e.log.Warn("Backup cancelled - killing mysqldump")
dumpCmd.Process.Kill()
gzipCmd.Process.Kill()
<-dumpDone
return ctx.Err()
}
// Close pipe and wait for gzip
stdin.Close()
if err := gzipCmd.Wait(); err != nil { if err := gzipCmd.Wait(); err != nil {
return fmt.Errorf("gzip failed: %w", err) return fmt.Errorf("gzip failed: %w", err)
} }
if dumpErr != nil {
return fmt.Errorf("mysqldump failed: %w", dumpErr)
}
return nil return nil
} }
@@ -898,15 +965,46 @@ func (e *Engine) createArchive(ctx context.Context, sourceDir, outputFile string
goto regularTar goto regularTar
} }
// Wait for tar to finish // Wait for tar with proper context handling
if err := cmd.Wait(); err != nil { tarDone := make(chan error, 1)
go func() {
tarDone <- cmd.Wait()
}()
var tarErr error
select {
case tarErr = <-tarDone:
// tar completed
case <-ctx.Done():
e.log.Warn("Archive creation cancelled - killing processes")
cmd.Process.Kill()
pigzCmd.Process.Kill() pigzCmd.Process.Kill()
return fmt.Errorf("tar failed: %w", err) <-tarDone
return ctx.Err()
} }
// Wait for pigz to finish if tarErr != nil {
if err := pigzCmd.Wait(); err != nil { pigzCmd.Process.Kill()
return fmt.Errorf("pigz compression failed: %w", err) return fmt.Errorf("tar failed: %w", tarErr)
}
// Wait for pigz with proper context handling
pigzDone := make(chan error, 1)
go func() {
pigzDone <- pigzCmd.Wait()
}()
var pigzErr error
select {
case pigzErr = <-pigzDone:
case <-ctx.Done():
pigzCmd.Process.Kill()
<-pigzDone
return ctx.Err()
}
if pigzErr != nil {
return fmt.Errorf("pigz compression failed: %w", pigzErr)
} }
return nil return nil
} }
@@ -1251,8 +1349,10 @@ func (e *Engine) executeCommand(ctx context.Context, cmdArgs []string, outputFil
return fmt.Errorf("failed to start backup command: %w", err) return fmt.Errorf("failed to start backup command: %w", err)
} }
// Stream stderr output (don't buffer it all in memory) // Stream stderr output in goroutine (don't buffer it all in memory)
stderrDone := make(chan struct{})
go func() { go func() {
defer close(stderrDone)
scanner := bufio.NewScanner(stderr) scanner := bufio.NewScanner(stderr)
scanner.Buffer(make([]byte, 64*1024), 1024*1024) // 1MB max line size scanner.Buffer(make([]byte, 64*1024), 1024*1024) // 1MB max line size
for scanner.Scan() { for scanner.Scan() {
@@ -1263,10 +1363,30 @@ func (e *Engine) executeCommand(ctx context.Context, cmdArgs []string, outputFil
} }
}() }()
// Wait for command to complete // Wait for command to complete with proper context handling
if err := cmd.Wait(); err != nil { cmdDone := make(chan error, 1)
e.log.Error("Backup command failed", "error", err, "database", filepath.Base(outputFile)) go func() {
return fmt.Errorf("backup command failed: %w", err) cmdDone <- cmd.Wait()
}()
var cmdErr error
select {
case cmdErr = <-cmdDone:
// Command completed (success or failure)
case <-ctx.Done():
// Context cancelled - kill process to unblock
e.log.Warn("Backup cancelled - killing pg_dump process")
cmd.Process.Kill()
<-cmdDone // Wait for goroutine to finish
cmdErr = ctx.Err()
}
// Wait for stderr reader to finish
<-stderrDone
if cmdErr != nil {
e.log.Error("Backup command failed", "error", cmdErr, "database", filepath.Base(outputFile))
return fmt.Errorf("backup command failed: %w", cmdErr)
} }
return nil return nil

View File

@@ -12,6 +12,7 @@ import (
"strings" "strings"
"sync" "sync"
"syscall" "syscall"
"time"
"dbbackup/internal/logger" "dbbackup/internal/logger"
) )
@@ -116,8 +117,11 @@ func KillOrphanedProcesses(log logger.Logger) error {
// findProcessesByName returns PIDs of processes matching the given name // findProcessesByName returns PIDs of processes matching the given name
func findProcessesByName(name string, excludePID int) ([]int, error) { func findProcessesByName(name string, excludePID int) ([]int, error) {
// Use pgrep for efficient process searching // Use pgrep for efficient process searching with timeout
cmd := exec.Command("pgrep", "-x", name) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, "pgrep", "-x", name)
output, err := cmd.Output() output, err := cmd.Output()
if err != nil { if err != nil {
// Exit code 1 means no processes found (not an error) // Exit code 1 means no processes found (not an error)

View File

@@ -217,14 +217,17 @@ func New() *Config {
SingleDBName: getEnvString("SINGLE_DB_NAME", ""), SingleDBName: getEnvString("SINGLE_DB_NAME", ""),
RestoreDBName: getEnvString("RESTORE_DB_NAME", ""), RestoreDBName: getEnvString("RESTORE_DB_NAME", ""),
// Timeouts // Timeouts - default 24 hours (1440 min) to handle very large databases with large objects
ClusterTimeoutMinutes: getEnvInt("CLUSTER_TIMEOUT_MIN", 240), ClusterTimeoutMinutes: getEnvInt("CLUSTER_TIMEOUT_MIN", 1440),
// Cluster parallelism (default: 2 concurrent operations for faster cluster backup/restore) // Cluster parallelism (default: 2 concurrent operations for faster cluster backup/restore)
ClusterParallelism: getEnvInt("CLUSTER_PARALLELISM", 2), ClusterParallelism: getEnvInt("CLUSTER_PARALLELISM", 2),
// Working directory for large operations (default: system temp)
WorkDir: getEnvString("WORK_DIR", ""),
// Swap file management // Swap file management
SwapFilePath: getEnvString("SWAP_FILE_PATH", "/tmp/dbbackup_swap"), SwapFilePath: "", // Will be set after WorkDir is initialized
SwapFileSizeGB: getEnvInt("SWAP_FILE_SIZE_GB", 0), // 0 = disabled by default SwapFileSizeGB: getEnvInt("SWAP_FILE_SIZE_GB", 0), // 0 = disabled by default
AutoSwap: getEnvBool("AUTO_SWAP", false), AutoSwap: getEnvBool("AUTO_SWAP", false),
@@ -264,6 +267,13 @@ func New() *Config {
cfg.SSLMode = "prefer" cfg.SSLMode = "prefer"
} }
// Set SwapFilePath using WorkDir if not explicitly set via env var
if envSwap := os.Getenv("SWAP_FILE_PATH"); envSwap != "" {
cfg.SwapFilePath = envSwap
} else {
cfg.SwapFilePath = filepath.Join(cfg.GetEffectiveWorkDir(), "dbbackup_swap")
}
return cfg return cfg
} }
@@ -499,6 +509,14 @@ func GetCurrentOSUser() string {
return getCurrentUser() return getCurrentUser()
} }
// GetEffectiveWorkDir returns the configured WorkDir or system temp as fallback
func (c *Config) GetEffectiveWorkDir() string {
if c.WorkDir != "" {
return c.WorkDir
}
return os.TempDir()
}
func getDefaultBackupDir() string { func getDefaultBackupDir() string {
// Try to create a sensible default backup directory // Try to create a sensible default backup directory
homeDir, _ := os.UserHomeDir() homeDir, _ := os.UserHomeDir()
@@ -516,7 +534,7 @@ func getDefaultBackupDir() string {
return "/var/lib/pgsql/pg_backups" return "/var/lib/pgsql/pg_backups"
} }
return "/tmp/db_backups" return filepath.Join(os.TempDir(), "db_backups")
} }
// CPU-related helper functions // CPU-related helper functions

View File

@@ -28,8 +28,9 @@ type LocalConfig struct {
DumpJobs int DumpJobs int
// Performance settings // Performance settings
CPUWorkload string CPUWorkload string
MaxCores int MaxCores int
ClusterTimeout int // Cluster operation timeout in minutes (default: 1440 = 24 hours)
// Security settings // Security settings
RetentionDays int RetentionDays int
@@ -121,6 +122,10 @@ func LoadLocalConfig() (*LocalConfig, error) {
if mc, err := strconv.Atoi(value); err == nil { if mc, err := strconv.Atoi(value); err == nil {
cfg.MaxCores = mc cfg.MaxCores = mc
} }
case "cluster_timeout":
if ct, err := strconv.Atoi(value); err == nil {
cfg.ClusterTimeout = ct
}
} }
case "security": case "security":
switch key { switch key {
@@ -199,6 +204,9 @@ func SaveLocalConfig(cfg *LocalConfig) error {
if cfg.MaxCores != 0 { if cfg.MaxCores != 0 {
sb.WriteString(fmt.Sprintf("max_cores = %d\n", cfg.MaxCores)) sb.WriteString(fmt.Sprintf("max_cores = %d\n", cfg.MaxCores))
} }
if cfg.ClusterTimeout != 0 {
sb.WriteString(fmt.Sprintf("cluster_timeout = %d\n", cfg.ClusterTimeout))
}
sb.WriteString("\n") sb.WriteString("\n")
// Security section // Security section
@@ -268,6 +276,10 @@ func ApplyLocalConfig(cfg *Config, local *LocalConfig) {
if local.MaxCores != 0 { if local.MaxCores != 0 {
cfg.MaxCores = local.MaxCores cfg.MaxCores = local.MaxCores
} }
// Apply cluster timeout from config file (overrides default)
if local.ClusterTimeout != 0 {
cfg.ClusterTimeoutMinutes = local.ClusterTimeout
}
if cfg.RetentionDays == 30 && local.RetentionDays != 0 { if cfg.RetentionDays == 30 && local.RetentionDays != 0 {
cfg.RetentionDays = local.RetentionDays cfg.RetentionDays = local.RetentionDays
} }
@@ -282,21 +294,22 @@ func ApplyLocalConfig(cfg *Config, local *LocalConfig) {
// ConfigFromConfig creates a LocalConfig from a Config // ConfigFromConfig creates a LocalConfig from a Config
func ConfigFromConfig(cfg *Config) *LocalConfig { func ConfigFromConfig(cfg *Config) *LocalConfig {
return &LocalConfig{ return &LocalConfig{
DBType: cfg.DatabaseType, DBType: cfg.DatabaseType,
Host: cfg.Host, Host: cfg.Host,
Port: cfg.Port, Port: cfg.Port,
User: cfg.User, User: cfg.User,
Database: cfg.Database, Database: cfg.Database,
SSLMode: cfg.SSLMode, SSLMode: cfg.SSLMode,
BackupDir: cfg.BackupDir, BackupDir: cfg.BackupDir,
WorkDir: cfg.WorkDir, WorkDir: cfg.WorkDir,
Compression: cfg.CompressionLevel, Compression: cfg.CompressionLevel,
Jobs: cfg.Jobs, Jobs: cfg.Jobs,
DumpJobs: cfg.DumpJobs, DumpJobs: cfg.DumpJobs,
CPUWorkload: cfg.CPUWorkloadType, CPUWorkload: cfg.CPUWorkloadType,
MaxCores: cfg.MaxCores, MaxCores: cfg.MaxCores,
RetentionDays: cfg.RetentionDays, ClusterTimeout: cfg.ClusterTimeoutMinutes,
MinBackups: cfg.MinBackups, RetentionDays: cfg.RetentionDays,
MaxRetries: cfg.MaxRetries, MinBackups: cfg.MinBackups,
MaxRetries: cfg.MaxRetries,
} }
} }

228
internal/dedup/chunker.go Normal file
View File

@@ -0,0 +1,228 @@
// Package dedup provides content-defined chunking and deduplication
// for database backups, similar to restic/borgbackup but with native
// database dump support.
package dedup
import (
"crypto/sha256"
"encoding/hex"
"io"
)
// Chunker constants for content-defined chunking
const (
// DefaultMinChunkSize is the minimum chunk size (4KB)
DefaultMinChunkSize = 4 * 1024
// DefaultAvgChunkSize is the target average chunk size (8KB)
DefaultAvgChunkSize = 8 * 1024
// DefaultMaxChunkSize is the maximum chunk size (32KB)
DefaultMaxChunkSize = 32 * 1024
// WindowSize for the rolling hash
WindowSize = 48
// ChunkMask determines average chunk size
// For 8KB average: we look for hash % 8192 == 0
ChunkMask = DefaultAvgChunkSize - 1
)
// Gear hash table - random values for each byte
// This is used for the Gear rolling hash which is simpler and faster than Buzhash
var gearTable = [256]uint64{
0x5c95c078, 0x22408989, 0x2d48a214, 0x12842087, 0x530f8afb, 0x474536b9, 0x2963b4f1, 0x44cb738b,
0x4ea7403d, 0x4d606b6e, 0x074ec5d3, 0x3f7e82f4, 0x4e3d26e7, 0x5cb4e82f, 0x7b0a1ef5, 0x3d4e7c92,
0x2a81ed69, 0x7f853df8, 0x452c8cf7, 0x0f4f3c9d, 0x3a5e81b7, 0x6cb2d819, 0x2e4c5f93, 0x7e8a1c57,
0x1f9d3e8c, 0x4b7c2a5d, 0x3c8f1d6e, 0x5d2a7b4f, 0x6e9c3f8a, 0x7a4d1e5c, 0x2b8c4f7d, 0x4f7d2c9e,
0x5a1e3d7c, 0x6b4f8a2d, 0x3e7c9d5a, 0x7d2a4f8b, 0x4c9e7d3a, 0x5b8a1c6e, 0x2d5f4a9c, 0x7a3c8d6b,
0x6e2a7b4d, 0x3f8c5d9a, 0x4a7d3e5b, 0x5c9a2d7e, 0x7b4e8f3c, 0x2a6d9c5b, 0x3e4a7d8c, 0x5d7b2e9a,
0x4c8a3d7b, 0x6e9d5c8a, 0x7a3e4d9c, 0x2b5c8a7d, 0x4d7e3a9c, 0x5a9c7d3e, 0x3c8b5a7d, 0x7d4e9c2a,
0x6a3d8c5b, 0x4e7a9d3c, 0x5c2a7b9e, 0x3a9d4e7c, 0x7b8c5a2d, 0x2d7e4a9c, 0x4a3c9d7b, 0x5e9a7c3d,
0x6c4d8a5b, 0x3b7e9c4a, 0x7a5c2d8b, 0x4d9a3e7c, 0x5b7c4a9e, 0x2e8a5d3c, 0x3c9e7a4d, 0x7d4a8c5b,
0x6b2d9a7c, 0x4a8c3e5d, 0x5d7a9c2e, 0x3e4c7b9a, 0x7c9d5a4b, 0x2a7e8c3d, 0x4c5a9d7e, 0x5a3e7c4b,
0x6d8a2c9e, 0x3c7b4a8d, 0x7e2d9c5a, 0x4b9a7e3c, 0x5c4d8a7b, 0x2d9e3c5a, 0x3a7c9d4e, 0x7b5a4c8d,
0x6a9c2e7b, 0x4d3e8a9c, 0x5e7b4d2a, 0x3b9a7c5d, 0x7c4e8a3b, 0x2e7d9c4a, 0x4a8b3e7d, 0x5d2c9a7e,
0x6c7a5d3e, 0x3e9c4a7b, 0x7a8d2c5e, 0x4c3e9a7d, 0x5b9c7e2a, 0x2a4d7c9e, 0x3d8a5c4b, 0x7e7b9a3c,
0x6b4a8d9e, 0x4e9c3b7a, 0x5a7d4e9c, 0x3c2a8b7d, 0x7d9e5c4a, 0x2b8a7d3e, 0x4d5c9a2b, 0x5e3a7c8d,
0x6a9d4b7c, 0x3b7a9c5e, 0x7c4b8a2d, 0x4a9e7c3b, 0x5d2b9a4e, 0x2e7c4d9a, 0x3a9b7e4c, 0x7e5a3c8b,
0x6c8a9d4e, 0x4b7c2a5e, 0x5a3e9c7d, 0x3d9a4b7c, 0x7a2d5e9c, 0x2c8b7a3d, 0x4e9c5a2b, 0x5b4d7e9a,
0x6d7a3c8b, 0x3e2b9a5d, 0x7c9d4a7e, 0x4a5e3c9b, 0x5e7a9d2c, 0x2b3c7e9a, 0x3a9e4b7d, 0x7d8a5c3e,
0x6b9c2d4a, 0x4c7e9a3b, 0x5a2c8b7e, 0x3b4d9a5c, 0x7e9b3a4d, 0x2d5a7c9e, 0x4b8d3e7a, 0x5c9a4b2d,
0x6a7c8d9e, 0x3c9e5a7b, 0x7b4a2c9d, 0x4d3b7e9a, 0x5e9c4a3b, 0x2a7b9d4e, 0x3e5c8a7b, 0x7a9d3e5c,
0x6c2a7b8d, 0x4e9a5c3b, 0x5b7d2a9e, 0x3a4e9c7b, 0x7d8b3a5c, 0x2c9e7a4b, 0x4a3d5e9c, 0x5d7b8a2e,
0x6b9a4c7d, 0x3d5a9e4b, 0x7e2c7b9a, 0x4b9d3a5e, 0x5c4e7a9d, 0x2e8a3c7b, 0x3b7c9e5a, 0x7a4d8b3e,
0x6d9c5a2b, 0x4a7e3d9c, 0x5e2a9b7d, 0x3c9a7e4b, 0x7b3e5c9a, 0x2a4b8d7e, 0x4d9c2a5b, 0x5a7d9e3c,
0x6c3b8a7d, 0x3e9d4a5c, 0x7d5c2b9e, 0x4c8a7d3b, 0x5b9e3c7a, 0x2d7a9c4e, 0x3a5e7b9d, 0x7e8b4a3c,
0x6a2d9e7b, 0x4b3e5a9d, 0x5d9c7b2a, 0x3b7d4e9c, 0x7c9a3b5e, 0x2e5c8a7d, 0x4a7b9d3e, 0x5c3a7e9b,
0x6d9e5c4a, 0x3c4a7b9e, 0x7a9d2e5c, 0x4e7c9a3d, 0x5a8b4e7c, 0x2b9a3d7e, 0x3d5b8a9c, 0x7b4e9a2d,
0x6c7d3a9e, 0x4a9c5e3b, 0x5e2b7d9a, 0x3a8d4c7b, 0x7d3e9a5c, 0x2c7a8b9e, 0x4b5d3a7c, 0x5c9a7e2b,
0x6a4b9d3e, 0x3e7c2a9d, 0x7c8a5b4e, 0x4d9e3c7a, 0x5b3a9e7c, 0x2e9c7b4a, 0x3b4e8a9d, 0x7a9c4e3b,
0x6d2a7c9e, 0x4c8b9a5d, 0x5a9e2b7c, 0x3c3d7a9e, 0x7e5a9c4b, 0x2a8d3e7c, 0x4e7a5c9b, 0x5d9b8a2e,
0x6b4c9e7a, 0x3a9d5b4e, 0x7b2e8a9c, 0x4a5c3e9b, 0x5c9a4d7e, 0x2d7e9a3c, 0x3e8b7c5a, 0x7c9e2a4d,
0x6a3b7d9c, 0x4d9a8b3e, 0x5e5c2a7b, 0x3b4a9d7c, 0x7a7c5e9b, 0x2c9b4a8d, 0x4b3e7c9a, 0x5a9d3b7e,
0x6c8a4e9d, 0x3d7b9c5a, 0x7e2a4b9c, 0x4c9e5d3a, 0x5b7a9c4e, 0x2e4d8a7b, 0x3a9c7e5d, 0x7b8d3a9e,
0x6d5c9a4b, 0x4a2e7b9d, 0x5d9b4c8a, 0x3c7a9e2b, 0x7d4b8c9e, 0x2b9a5c4d, 0x4e7d3a9c, 0x5c8a9e7b,
}
// Chunk represents a single deduplicated chunk
type Chunk struct {
// Hash is the SHA-256 hash of the chunk data (content-addressed)
Hash string
// Data is the raw chunk bytes
Data []byte
// Offset is the byte offset in the original file
Offset int64
// Length is the size of this chunk
Length int
}
// ChunkerConfig holds configuration for the chunker
type ChunkerConfig struct {
MinSize int // Minimum chunk size
AvgSize int // Target average chunk size
MaxSize int // Maximum chunk size
}
// DefaultChunkerConfig returns sensible defaults
func DefaultChunkerConfig() ChunkerConfig {
return ChunkerConfig{
MinSize: DefaultMinChunkSize,
AvgSize: DefaultAvgChunkSize,
MaxSize: DefaultMaxChunkSize,
}
}
// Chunker performs content-defined chunking using Gear hash
type Chunker struct {
reader io.Reader
config ChunkerConfig
// Rolling hash state
hash uint64
// Current chunk state
buf []byte
offset int64
mask uint64
}
// NewChunker creates a new chunker for the given reader
func NewChunker(r io.Reader, config ChunkerConfig) *Chunker {
// Calculate mask for target average size
// We want: avg_size = 1 / P(boundary)
// With mask, P(boundary) = 1 / (mask + 1)
// So mask = avg_size - 1
mask := uint64(config.AvgSize - 1)
return &Chunker{
reader: r,
config: config,
buf: make([]byte, 0, config.MaxSize),
mask: mask,
}
}
// Next returns the next chunk from the input stream
// Returns io.EOF when no more data is available
func (c *Chunker) Next() (*Chunk, error) {
c.buf = c.buf[:0]
c.hash = 0
// Read bytes until we find a chunk boundary or hit max size
singleByte := make([]byte, 1)
for {
n, err := c.reader.Read(singleByte)
if n == 0 {
if err == io.EOF {
// Return remaining data as final chunk
if len(c.buf) > 0 {
return c.makeChunk(), nil
}
return nil, io.EOF
}
if err != nil {
return nil, err
}
continue
}
b := singleByte[0]
c.buf = append(c.buf, b)
// Update Gear rolling hash
// Gear hash: hash = (hash << 1) + gear_table[byte]
c.hash = (c.hash << 1) + gearTable[b]
// Check for chunk boundary after minimum size
if len(c.buf) >= c.config.MinSize {
// Check if we hit a boundary (hash matches mask pattern)
if (c.hash & c.mask) == 0 {
return c.makeChunk(), nil
}
}
// Force boundary at max size
if len(c.buf) >= c.config.MaxSize {
return c.makeChunk(), nil
}
}
}
// makeChunk creates a Chunk from the current buffer
func (c *Chunker) makeChunk() *Chunk {
// Compute SHA-256 hash
h := sha256.Sum256(c.buf)
hash := hex.EncodeToString(h[:])
// Copy data
data := make([]byte, len(c.buf))
copy(data, c.buf)
chunk := &Chunk{
Hash: hash,
Data: data,
Offset: c.offset,
Length: len(data),
}
c.offset += int64(len(data))
return chunk
}
// ChunkReader splits a reader into content-defined chunks
// and returns them via a channel for concurrent processing
func ChunkReader(r io.Reader, config ChunkerConfig) (<-chan *Chunk, <-chan error) {
chunks := make(chan *Chunk, 100)
errs := make(chan error, 1)
go func() {
defer close(chunks)
defer close(errs)
chunker := NewChunker(r, config)
for {
chunk, err := chunker.Next()
if err == io.EOF {
return
}
if err != nil {
errs <- err
return
}
chunks <- chunk
}
}()
return chunks, errs
}
// HashData computes SHA-256 hash of data
func HashData(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}

View File

@@ -0,0 +1,217 @@
package dedup
import (
"bytes"
"crypto/rand"
"io"
"testing"
)
func TestChunker_Basic(t *testing.T) {
// Create test data
data := make([]byte, 100*1024) // 100KB
rand.Read(data)
chunker := NewChunker(bytes.NewReader(data), DefaultChunkerConfig())
var chunks []*Chunk
var totalBytes int
for {
chunk, err := chunker.Next()
if err == io.EOF {
break
}
if err != nil {
t.Fatalf("Chunker.Next() error: %v", err)
}
chunks = append(chunks, chunk)
totalBytes += chunk.Length
// Verify chunk properties
if chunk.Length < DefaultMinChunkSize && len(chunks) < 10 {
// Only the last chunk can be smaller than min
// (unless file is smaller than min)
}
if chunk.Length > DefaultMaxChunkSize {
t.Errorf("Chunk %d exceeds max size: %d > %d", len(chunks), chunk.Length, DefaultMaxChunkSize)
}
if chunk.Hash == "" {
t.Errorf("Chunk %d has empty hash", len(chunks))
}
if len(chunk.Hash) != 64 { // SHA-256 hex length
t.Errorf("Chunk %d has invalid hash length: %d", len(chunks), len(chunk.Hash))
}
}
if totalBytes != len(data) {
t.Errorf("Total bytes mismatch: got %d, want %d", totalBytes, len(data))
}
t.Logf("Chunked %d bytes into %d chunks", totalBytes, len(chunks))
t.Logf("Average chunk size: %d bytes", totalBytes/len(chunks))
}
func TestChunker_Deterministic(t *testing.T) {
// Same data should produce same chunks
data := make([]byte, 50*1024)
rand.Read(data)
// First pass
chunker1 := NewChunker(bytes.NewReader(data), DefaultChunkerConfig())
var hashes1 []string
for {
chunk, err := chunker1.Next()
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
hashes1 = append(hashes1, chunk.Hash)
}
// Second pass
chunker2 := NewChunker(bytes.NewReader(data), DefaultChunkerConfig())
var hashes2 []string
for {
chunk, err := chunker2.Next()
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
hashes2 = append(hashes2, chunk.Hash)
}
// Compare
if len(hashes1) != len(hashes2) {
t.Fatalf("Different chunk counts: %d vs %d", len(hashes1), len(hashes2))
}
for i := range hashes1 {
if hashes1[i] != hashes2[i] {
t.Errorf("Hash mismatch at chunk %d: %s vs %s", i, hashes1[i], hashes2[i])
}
}
}
func TestChunker_ShiftedData(t *testing.T) {
// Test that shifted data still shares chunks (the key CDC benefit)
original := make([]byte, 100*1024)
rand.Read(original)
// Create shifted version (prepend some bytes)
prefix := make([]byte, 1000)
rand.Read(prefix)
shifted := append(prefix, original...)
// Chunk both
config := DefaultChunkerConfig()
chunker1 := NewChunker(bytes.NewReader(original), config)
hashes1 := make(map[string]bool)
for {
chunk, err := chunker1.Next()
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
hashes1[chunk.Hash] = true
}
chunker2 := NewChunker(bytes.NewReader(shifted), config)
var matched, total int
for {
chunk, err := chunker2.Next()
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
total++
if hashes1[chunk.Hash] {
matched++
}
}
// Should have significant overlap despite the shift
overlapRatio := float64(matched) / float64(total)
t.Logf("Chunk overlap after %d-byte shift: %.1f%% (%d/%d chunks)",
len(prefix), overlapRatio*100, matched, total)
// We expect at least 50% overlap for content-defined chunking
if overlapRatio < 0.5 {
t.Errorf("Low chunk overlap: %.1f%% (expected >50%%)", overlapRatio*100)
}
}
func TestChunker_SmallFile(t *testing.T) {
// File smaller than min chunk size
data := []byte("hello world")
chunker := NewChunker(bytes.NewReader(data), DefaultChunkerConfig())
chunk, err := chunker.Next()
if err != nil {
t.Fatal(err)
}
if chunk.Length != len(data) {
t.Errorf("Expected chunk length %d, got %d", len(data), chunk.Length)
}
// Should be EOF after
_, err = chunker.Next()
if err != io.EOF {
t.Errorf("Expected EOF, got %v", err)
}
}
func TestChunker_EmptyFile(t *testing.T) {
chunker := NewChunker(bytes.NewReader(nil), DefaultChunkerConfig())
_, err := chunker.Next()
if err != io.EOF {
t.Errorf("Expected EOF for empty file, got %v", err)
}
}
func TestHashData(t *testing.T) {
hash := HashData([]byte("test"))
if len(hash) != 64 {
t.Errorf("Expected 64-char hash, got %d", len(hash))
}
// Known SHA-256 of "test"
expected := "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
if hash != expected {
t.Errorf("Hash mismatch: got %s, want %s", hash, expected)
}
}
func BenchmarkChunker(b *testing.B) {
// 1MB of random data
data := make([]byte, 1024*1024)
rand.Read(data)
b.ResetTimer()
b.SetBytes(int64(len(data)))
for i := 0; i < b.N; i++ {
chunker := NewChunker(bytes.NewReader(data), DefaultChunkerConfig())
for {
_, err := chunker.Next()
if err == io.EOF {
break
}
if err != nil {
b.Fatal(err)
}
}
}
}

239
internal/dedup/index.go Normal file
View File

@@ -0,0 +1,239 @@
package dedup
import (
"database/sql"
"fmt"
"path/filepath"
"time"
_ "github.com/mattn/go-sqlite3" // SQLite driver
)
// ChunkIndex provides fast chunk lookups using SQLite
type ChunkIndex struct {
db *sql.DB
}
// NewChunkIndex opens or creates a chunk index database
func NewChunkIndex(basePath string) (*ChunkIndex, error) {
dbPath := filepath.Join(basePath, "chunks.db")
db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_synchronous=NORMAL")
if err != nil {
return nil, fmt.Errorf("failed to open chunk index: %w", err)
}
idx := &ChunkIndex{db: db}
if err := idx.migrate(); err != nil {
db.Close()
return nil, err
}
return idx, nil
}
// migrate creates the schema if needed
func (idx *ChunkIndex) migrate() error {
schema := `
CREATE TABLE IF NOT EXISTS chunks (
hash TEXT PRIMARY KEY,
size_raw INTEGER NOT NULL,
size_stored INTEGER NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
last_accessed DATETIME,
ref_count INTEGER DEFAULT 1
);
CREATE TABLE IF NOT EXISTS manifests (
id TEXT PRIMARY KEY,
database_type TEXT,
database_name TEXT,
database_host TEXT,
created_at DATETIME,
original_size INTEGER,
stored_size INTEGER,
chunk_count INTEGER,
new_chunks INTEGER,
dedup_ratio REAL,
sha256 TEXT,
verified_at DATETIME
);
CREATE INDEX IF NOT EXISTS idx_chunks_created ON chunks(created_at);
CREATE INDEX IF NOT EXISTS idx_chunks_accessed ON chunks(last_accessed);
CREATE INDEX IF NOT EXISTS idx_manifests_created ON manifests(created_at);
CREATE INDEX IF NOT EXISTS idx_manifests_database ON manifests(database_name);
`
_, err := idx.db.Exec(schema)
return err
}
// Close closes the database
func (idx *ChunkIndex) Close() error {
return idx.db.Close()
}
// AddChunk records a chunk in the index
func (idx *ChunkIndex) AddChunk(hash string, sizeRaw, sizeStored int) error {
_, err := idx.db.Exec(`
INSERT INTO chunks (hash, size_raw, size_stored, created_at, last_accessed, ref_count)
VALUES (?, ?, ?, ?, ?, 1)
ON CONFLICT(hash) DO UPDATE SET
ref_count = ref_count + 1,
last_accessed = ?
`, hash, sizeRaw, sizeStored, time.Now(), time.Now(), time.Now())
return err
}
// HasChunk checks if a chunk exists in the index
func (idx *ChunkIndex) HasChunk(hash string) (bool, error) {
var count int
err := idx.db.QueryRow("SELECT COUNT(*) FROM chunks WHERE hash = ?", hash).Scan(&count)
return count > 0, err
}
// GetChunk retrieves chunk metadata
func (idx *ChunkIndex) GetChunk(hash string) (*ChunkMeta, error) {
var m ChunkMeta
err := idx.db.QueryRow(`
SELECT hash, size_raw, size_stored, created_at, ref_count
FROM chunks WHERE hash = ?
`, hash).Scan(&m.Hash, &m.SizeRaw, &m.SizeStored, &m.CreatedAt, &m.RefCount)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return &m, nil
}
// ChunkMeta holds metadata about a chunk
type ChunkMeta struct {
Hash string
SizeRaw int64
SizeStored int64
CreatedAt time.Time
RefCount int
}
// DecrementRef decreases the reference count for a chunk
// Returns true if the chunk should be deleted (ref_count <= 0)
func (idx *ChunkIndex) DecrementRef(hash string) (shouldDelete bool, err error) {
result, err := idx.db.Exec(`
UPDATE chunks SET ref_count = ref_count - 1 WHERE hash = ?
`, hash)
if err != nil {
return false, err
}
affected, _ := result.RowsAffected()
if affected == 0 {
return false, nil
}
var refCount int
err = idx.db.QueryRow("SELECT ref_count FROM chunks WHERE hash = ?", hash).Scan(&refCount)
if err != nil {
return false, err
}
return refCount <= 0, nil
}
// RemoveChunk removes a chunk from the index
func (idx *ChunkIndex) RemoveChunk(hash string) error {
_, err := idx.db.Exec("DELETE FROM chunks WHERE hash = ?", hash)
return err
}
// AddManifest records a manifest in the index
func (idx *ChunkIndex) AddManifest(m *Manifest) error {
_, err := idx.db.Exec(`
INSERT OR REPLACE INTO manifests
(id, database_type, database_name, database_host, created_at,
original_size, stored_size, chunk_count, new_chunks, dedup_ratio, sha256)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, m.ID, m.DatabaseType, m.DatabaseName, m.DatabaseHost, m.CreatedAt,
m.OriginalSize, m.StoredSize, m.ChunkCount, m.NewChunks, m.DedupRatio, m.SHA256)
return err
}
// RemoveManifest removes a manifest from the index
func (idx *ChunkIndex) RemoveManifest(id string) error {
_, err := idx.db.Exec("DELETE FROM manifests WHERE id = ?", id)
return err
}
// IndexStats holds statistics about the dedup index
type IndexStats struct {
TotalChunks int64
TotalManifests int64
TotalSizeRaw int64 // Uncompressed, undeduplicated
TotalSizeStored int64 // On-disk after dedup+compression
DedupRatio float64
OldestChunk time.Time
NewestChunk time.Time
}
// Stats returns statistics about the index
func (idx *ChunkIndex) Stats() (*IndexStats, error) {
stats := &IndexStats{}
var oldestStr, newestStr string
err := idx.db.QueryRow(`
SELECT
COUNT(*),
COALESCE(SUM(size_raw), 0),
COALESCE(SUM(size_stored), 0),
COALESCE(MIN(created_at), ''),
COALESCE(MAX(created_at), '')
FROM chunks
`).Scan(&stats.TotalChunks, &stats.TotalSizeRaw, &stats.TotalSizeStored,
&oldestStr, &newestStr)
if err != nil {
return nil, err
}
// Parse time strings
if oldestStr != "" {
stats.OldestChunk, _ = time.Parse("2006-01-02 15:04:05", oldestStr)
}
if newestStr != "" {
stats.NewestChunk, _ = time.Parse("2006-01-02 15:04:05", newestStr)
}
idx.db.QueryRow("SELECT COUNT(*) FROM manifests").Scan(&stats.TotalManifests)
if stats.TotalSizeRaw > 0 {
stats.DedupRatio = 1.0 - float64(stats.TotalSizeStored)/float64(stats.TotalSizeRaw)
}
return stats, nil
}
// ListOrphanedChunks returns chunks that have ref_count <= 0
func (idx *ChunkIndex) ListOrphanedChunks() ([]string, error) {
rows, err := idx.db.Query("SELECT hash FROM chunks WHERE ref_count <= 0")
if err != nil {
return nil, err
}
defer rows.Close()
var hashes []string
for rows.Next() {
var hash string
if err := rows.Scan(&hash); err != nil {
continue
}
hashes = append(hashes, hash)
}
return hashes, rows.Err()
}
// Vacuum cleans up the database
func (idx *ChunkIndex) Vacuum() error {
_, err := idx.db.Exec("VACUUM")
return err
}

188
internal/dedup/manifest.go Normal file
View File

@@ -0,0 +1,188 @@
package dedup
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
)
// Manifest describes a single backup as a list of chunks
type Manifest struct {
// ID is the unique identifier (typically timestamp-based)
ID string `json:"id"`
// Name is an optional human-readable name
Name string `json:"name,omitempty"`
// CreatedAt is when this backup was created
CreatedAt time.Time `json:"created_at"`
// Database information
DatabaseType string `json:"database_type"` // postgres, mysql
DatabaseName string `json:"database_name"`
DatabaseHost string `json:"database_host"`
// Chunks is the ordered list of chunk hashes
// The file is reconstructed by concatenating chunks in order
Chunks []ChunkRef `json:"chunks"`
// Stats about the backup
OriginalSize int64 `json:"original_size"` // Size before deduplication
StoredSize int64 `json:"stored_size"` // Size after dedup (new chunks only)
ChunkCount int `json:"chunk_count"` // Total chunks
NewChunks int `json:"new_chunks"` // Chunks that weren't deduplicated
DedupRatio float64 `json:"dedup_ratio"` // 1.0 = no dedup, 0.0 = 100% dedup
// Encryption and compression settings used
Encrypted bool `json:"encrypted"`
Compressed bool `json:"compressed"`
// Verification
SHA256 string `json:"sha256"` // Hash of reconstructed file
VerifiedAt time.Time `json:"verified_at,omitempty"`
}
// ChunkRef references a chunk in the manifest
type ChunkRef struct {
Hash string `json:"h"` // SHA-256 hash (64 chars)
Offset int64 `json:"o"` // Offset in original file
Length int `json:"l"` // Chunk length
}
// ManifestStore manages backup manifests
type ManifestStore struct {
basePath string
}
// NewManifestStore creates a new manifest store
func NewManifestStore(basePath string) (*ManifestStore, error) {
manifestDir := filepath.Join(basePath, "manifests")
if err := os.MkdirAll(manifestDir, 0700); err != nil {
return nil, fmt.Errorf("failed to create manifest directory: %w", err)
}
return &ManifestStore{basePath: basePath}, nil
}
// manifestPath returns the path for a manifest ID
func (s *ManifestStore) manifestPath(id string) string {
return filepath.Join(s.basePath, "manifests", id+".manifest.json")
}
// Save writes a manifest to disk
func (s *ManifestStore) Save(m *Manifest) error {
path := s.manifestPath(m.ID)
data, err := json.MarshalIndent(m, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal manifest: %w", err)
}
// Atomic write
tmpPath := path + ".tmp"
if err := os.WriteFile(tmpPath, data, 0600); err != nil {
return fmt.Errorf("failed to write manifest: %w", err)
}
if err := os.Rename(tmpPath, path); err != nil {
os.Remove(tmpPath)
return fmt.Errorf("failed to commit manifest: %w", err)
}
return nil
}
// Load reads a manifest from disk
func (s *ManifestStore) Load(id string) (*Manifest, error) {
path := s.manifestPath(id)
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read manifest %s: %w", id, err)
}
var m Manifest
if err := json.Unmarshal(data, &m); err != nil {
return nil, fmt.Errorf("failed to parse manifest %s: %w", id, err)
}
return &m, nil
}
// Delete removes a manifest
func (s *ManifestStore) Delete(id string) error {
path := s.manifestPath(id)
if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to delete manifest %s: %w", id, err)
}
return nil
}
// List returns all manifest IDs
func (s *ManifestStore) List() ([]string, error) {
manifestDir := filepath.Join(s.basePath, "manifests")
entries, err := os.ReadDir(manifestDir)
if err != nil {
return nil, fmt.Errorf("failed to list manifests: %w", err)
}
var ids []string
for _, e := range entries {
if e.IsDir() {
continue
}
name := e.Name()
if len(name) > 14 && name[len(name)-14:] == ".manifest.json" {
ids = append(ids, name[:len(name)-14])
}
}
return ids, nil
}
// ListAll returns all manifests sorted by creation time (newest first)
func (s *ManifestStore) ListAll() ([]*Manifest, error) {
ids, err := s.List()
if err != nil {
return nil, err
}
var manifests []*Manifest
for _, id := range ids {
m, err := s.Load(id)
if err != nil {
continue // Skip corrupted manifests
}
manifests = append(manifests, m)
}
// Sort by creation time (newest first)
for i := 0; i < len(manifests)-1; i++ {
for j := i + 1; j < len(manifests); j++ {
if manifests[j].CreatedAt.After(manifests[i].CreatedAt) {
manifests[i], manifests[j] = manifests[j], manifests[i]
}
}
}
return manifests, nil
}
// GetChunkHashes returns all unique chunk hashes referenced by manifests
func (s *ManifestStore) GetChunkHashes() (map[string]int, error) {
manifests, err := s.ListAll()
if err != nil {
return nil, err
}
// Map hash -> reference count
refs := make(map[string]int)
for _, m := range manifests {
for _, c := range m.Chunks {
refs[c.Hash]++
}
}
return refs, nil
}

367
internal/dedup/store.go Normal file
View File

@@ -0,0 +1,367 @@
package dedup
import (
"compress/gzip"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"os"
"path/filepath"
"sync"
)
// ChunkStore manages content-addressed chunk storage
// Chunks are stored as: <base>/<prefix>/<hash>.chunk[.gz][.enc]
type ChunkStore struct {
basePath string
compress bool
encryptionKey []byte // 32 bytes for AES-256
mu sync.RWMutex
existingChunks map[string]bool // Cache of known chunks
}
// StoreConfig holds configuration for the chunk store
type StoreConfig struct {
BasePath string
Compress bool // Enable gzip compression
EncryptionKey string // Optional: hex-encoded 32-byte key for AES-256-GCM
}
// NewChunkStore creates a new chunk store
func NewChunkStore(config StoreConfig) (*ChunkStore, error) {
store := &ChunkStore{
basePath: config.BasePath,
compress: config.Compress,
existingChunks: make(map[string]bool),
}
// Parse encryption key if provided
if config.EncryptionKey != "" {
key, err := hex.DecodeString(config.EncryptionKey)
if err != nil {
return nil, fmt.Errorf("invalid encryption key: %w", err)
}
if len(key) != 32 {
return nil, fmt.Errorf("encryption key must be 32 bytes (got %d)", len(key))
}
store.encryptionKey = key
}
// Create base directory structure
if err := os.MkdirAll(config.BasePath, 0700); err != nil {
return nil, fmt.Errorf("failed to create chunk store: %w", err)
}
// Create chunks and manifests directories
for _, dir := range []string{"chunks", "manifests"} {
if err := os.MkdirAll(filepath.Join(config.BasePath, dir), 0700); err != nil {
return nil, fmt.Errorf("failed to create %s directory: %w", dir, err)
}
}
return store, nil
}
// chunkPath returns the filesystem path for a chunk hash
// Uses 2-character prefix for directory sharding (256 subdirs)
func (s *ChunkStore) chunkPath(hash string) string {
if len(hash) < 2 {
return filepath.Join(s.basePath, "chunks", "xx", hash+s.chunkExt())
}
prefix := hash[:2]
return filepath.Join(s.basePath, "chunks", prefix, hash+s.chunkExt())
}
// chunkExt returns the file extension based on compression/encryption settings
func (s *ChunkStore) chunkExt() string {
ext := ".chunk"
if s.compress {
ext += ".gz"
}
if s.encryptionKey != nil {
ext += ".enc"
}
return ext
}
// Has checks if a chunk exists in the store
func (s *ChunkStore) Has(hash string) bool {
s.mu.RLock()
if exists, ok := s.existingChunks[hash]; ok {
s.mu.RUnlock()
return exists
}
s.mu.RUnlock()
// Check filesystem
path := s.chunkPath(hash)
_, err := os.Stat(path)
exists := err == nil
s.mu.Lock()
s.existingChunks[hash] = exists
s.mu.Unlock()
return exists
}
// Put stores a chunk, returning true if it was new (not deduplicated)
func (s *ChunkStore) Put(chunk *Chunk) (isNew bool, err error) {
// Check if already exists (deduplication!)
if s.Has(chunk.Hash) {
return false, nil
}
path := s.chunkPath(chunk.Hash)
// Create prefix directory
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
return false, fmt.Errorf("failed to create chunk directory: %w", err)
}
// Prepare data
data := chunk.Data
// Compress if enabled
if s.compress {
data, err = s.compressData(data)
if err != nil {
return false, fmt.Errorf("compression failed: %w", err)
}
}
// Encrypt if enabled
if s.encryptionKey != nil {
data, err = s.encryptData(data)
if err != nil {
return false, fmt.Errorf("encryption failed: %w", err)
}
}
// Write atomically (write to temp, then rename)
tmpPath := path + ".tmp"
if err := os.WriteFile(tmpPath, data, 0600); err != nil {
return false, fmt.Errorf("failed to write chunk: %w", err)
}
if err := os.Rename(tmpPath, path); err != nil {
os.Remove(tmpPath)
return false, fmt.Errorf("failed to commit chunk: %w", err)
}
// Update cache
s.mu.Lock()
s.existingChunks[chunk.Hash] = true
s.mu.Unlock()
return true, nil
}
// Get retrieves a chunk by hash
func (s *ChunkStore) Get(hash string) (*Chunk, error) {
path := s.chunkPath(hash)
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read chunk %s: %w", hash, err)
}
// Decrypt if encrypted
if s.encryptionKey != nil {
data, err = s.decryptData(data)
if err != nil {
return nil, fmt.Errorf("decryption failed: %w", err)
}
}
// Decompress if compressed
if s.compress {
data, err = s.decompressData(data)
if err != nil {
return nil, fmt.Errorf("decompression failed: %w", err)
}
}
// Verify hash
h := sha256.Sum256(data)
actualHash := hex.EncodeToString(h[:])
if actualHash != hash {
return nil, fmt.Errorf("chunk hash mismatch: expected %s, got %s", hash, actualHash)
}
return &Chunk{
Hash: hash,
Data: data,
Length: len(data),
}, nil
}
// Delete removes a chunk from the store
func (s *ChunkStore) Delete(hash string) error {
path := s.chunkPath(hash)
if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to delete chunk %s: %w", hash, err)
}
s.mu.Lock()
delete(s.existingChunks, hash)
s.mu.Unlock()
return nil
}
// Stats returns storage statistics
type StoreStats struct {
TotalChunks int64
TotalSize int64 // Bytes on disk (after compression/encryption)
UniqueSize int64 // Bytes of unique data
Directories int
}
// Stats returns statistics about the chunk store
func (s *ChunkStore) Stats() (*StoreStats, error) {
stats := &StoreStats{}
chunksDir := filepath.Join(s.basePath, "chunks")
err := filepath.Walk(chunksDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
stats.Directories++
return nil
}
stats.TotalChunks++
stats.TotalSize += info.Size()
return nil
})
return stats, err
}
// LoadIndex loads the existing chunk hashes into memory
func (s *ChunkStore) LoadIndex() error {
s.mu.Lock()
defer s.mu.Unlock()
s.existingChunks = make(map[string]bool)
chunksDir := filepath.Join(s.basePath, "chunks")
return filepath.Walk(chunksDir, func(path string, info os.FileInfo, err error) error {
if err != nil || info.IsDir() {
return err
}
// Extract hash from filename
base := filepath.Base(path)
hash := base
// Remove extensions
for _, ext := range []string{".enc", ".gz", ".chunk"} {
if len(hash) > len(ext) && hash[len(hash)-len(ext):] == ext {
hash = hash[:len(hash)-len(ext)]
}
}
if len(hash) == 64 { // SHA-256 hex length
s.existingChunks[hash] = true
}
return nil
})
}
// compressData compresses data using gzip
func (s *ChunkStore) compressData(data []byte) ([]byte, error) {
var buf []byte
w, err := gzip.NewWriterLevel((*bytesBuffer)(&buf), gzip.BestCompression)
if err != nil {
return nil, err
}
if _, err := w.Write(data); err != nil {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
return buf, nil
}
// bytesBuffer is a simple io.Writer that appends to a byte slice
type bytesBuffer []byte
func (b *bytesBuffer) Write(p []byte) (int, error) {
*b = append(*b, p...)
return len(p), nil
}
// decompressData decompresses gzip data
func (s *ChunkStore) decompressData(data []byte) ([]byte, error) {
r, err := gzip.NewReader(&bytesReader{data: data})
if err != nil {
return nil, err
}
defer r.Close()
return io.ReadAll(r)
}
// bytesReader is a simple io.Reader from a byte slice
type bytesReader struct {
data []byte
pos int
}
func (r *bytesReader) Read(p []byte) (int, error) {
if r.pos >= len(r.data) {
return 0, io.EOF
}
n := copy(p, r.data[r.pos:])
r.pos += n
return n, nil
}
// encryptData encrypts data using AES-256-GCM
func (s *ChunkStore) encryptData(plaintext []byte) ([]byte, error) {
block, err := aes.NewCipher(s.encryptionKey)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
nonce := make([]byte, gcm.NonceSize())
if _, err := rand.Read(nonce); err != nil {
return nil, err
}
// Prepend nonce to ciphertext
return gcm.Seal(nonce, nonce, plaintext, nil), nil
}
// decryptData decrypts AES-256-GCM encrypted data
func (s *ChunkStore) decryptData(ciphertext []byte) ([]byte, error) {
block, err := aes.NewCipher(s.encryptionKey)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
if len(ciphertext) < gcm.NonceSize() {
return nil, fmt.Errorf("ciphertext too short")
}
nonce := ciphertext[:gcm.NonceSize()]
ciphertext = ciphertext[gcm.NonceSize():]
return gcm.Open(nil, nonce, ciphertext, nil)
}

View File

@@ -339,7 +339,7 @@ func (e *CloneEngine) Backup(ctx context.Context, opts *BackupOptions) (*BackupR
// Save metadata // Save metadata
meta := &metadata.BackupMetadata{ meta := &metadata.BackupMetadata{
Version: "3.40.0", Version: "3.42.1",
Timestamp: startTime, Timestamp: startTime,
Database: opts.Database, Database: opts.Database,
DatabaseType: "mysql", DatabaseType: "mysql",

View File

@@ -234,10 +234,26 @@ func (e *MySQLDumpEngine) Backup(ctx context.Context, opts *BackupOptions) (*Bac
gzWriter.Close() gzWriter.Close()
} }
// Wait for command // Wait for command with proper context handling
if err := cmd.Wait(); err != nil { cmdDone := make(chan error, 1)
go func() {
cmdDone <- cmd.Wait()
}()
var cmdErr error
select {
case cmdErr = <-cmdDone:
// Command completed
case <-ctx.Done():
e.log.Warn("MySQL backup cancelled - killing process")
cmd.Process.Kill()
<-cmdDone
cmdErr = ctx.Err()
}
if cmdErr != nil {
stderr := stderrBuf.String() stderr := stderrBuf.String()
return nil, fmt.Errorf("mysqldump failed: %w\n%s", err, stderr) return nil, fmt.Errorf("mysqldump failed: %w\n%s", cmdErr, stderr)
} }
// Get file info // Get file info
@@ -254,7 +270,7 @@ func (e *MySQLDumpEngine) Backup(ctx context.Context, opts *BackupOptions) (*Bac
// Save metadata // Save metadata
meta := &metadata.BackupMetadata{ meta := &metadata.BackupMetadata{
Version: "3.40.0", Version: "3.42.1",
Timestamp: startTime, Timestamp: startTime,
Database: opts.Database, Database: opts.Database,
DatabaseType: "mysql", DatabaseType: "mysql",
@@ -442,8 +458,25 @@ func (e *MySQLDumpEngine) BackupToWriter(ctx context.Context, w io.Writer, opts
gzWriter.Close() gzWriter.Close()
} }
if err := cmd.Wait(); err != nil { // Wait for command with proper context handling
return nil, fmt.Errorf("mysqldump failed: %w\n%s", err, stderrBuf.String()) cmdDone := make(chan error, 1)
go func() {
cmdDone <- cmd.Wait()
}()
var cmdErr error
select {
case cmdErr = <-cmdDone:
// Command completed
case <-ctx.Done():
e.log.Warn("MySQL streaming backup cancelled - killing process")
cmd.Process.Kill()
<-cmdDone
cmdErr = ctx.Err()
}
if cmdErr != nil {
return nil, fmt.Errorf("mysqldump failed: %w\n%s", cmdErr, stderrBuf.String())
} }
return &BackupResult{ return &BackupResult{

View File

@@ -188,6 +188,8 @@ func (e *SnapshotEngine) Backup(ctx context.Context, opts *BackupOptions) (*Back
// Step 4: Mount snapshot // Step 4: Mount snapshot
mountPoint := e.config.MountPoint mountPoint := e.config.MountPoint
if mountPoint == "" { if mountPoint == "" {
// Note: snapshot engine uses snapshot.Config which doesnt have GetEffectiveWorkDir()
// TODO: Refactor to use main config.Config for WorkDir support
mountPoint = filepath.Join(os.TempDir(), fmt.Sprintf("dbbackup_snap_%s", timestamp)) mountPoint = filepath.Join(os.TempDir(), fmt.Sprintf("dbbackup_snap_%s", timestamp))
} }
@@ -223,7 +225,7 @@ func (e *SnapshotEngine) Backup(ctx context.Context, opts *BackupOptions) (*Back
// Save metadata // Save metadata
meta := &metadata.BackupMetadata{ meta := &metadata.BackupMetadata{
Version: "3.40.0", Version: "3.42.1",
Timestamp: startTime, Timestamp: startTime,
Database: opts.Database, Database: opts.Database,
DatabaseType: "mysql", DatabaseType: "mysql",

View File

@@ -117,7 +117,7 @@ func NewEngine(sourceCfg, targetCfg *config.Config, log logger.Logger) (*Engine,
targetDB: targetDB, targetDB: targetDB,
log: log, log: log,
progress: progress.NewSpinner(), progress: progress.NewSpinner(),
workDir: os.TempDir(), workDir: sourceCfg.GetEffectiveWorkDir(),
keepBackup: false, keepBackup: false,
jobs: 4, jobs: 4,
dryRun: false, dryRun: false,

View File

@@ -212,7 +212,11 @@ func (m *BinlogManager) detectTools() error {
// detectServerType determines if we're working with MySQL or MariaDB // detectServerType determines if we're working with MySQL or MariaDB
func (m *BinlogManager) detectServerType() DatabaseType { func (m *BinlogManager) detectServerType() DatabaseType {
cmd := exec.Command(m.mysqlbinlogPath, "--version") // Use timeout to prevent blocking if command hangs
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, m.mysqlbinlogPath, "--version")
output, err := cmd.Output() output, err := cmd.Output()
if err != nil { if err != nil {
return DatabaseMySQL // Default to MySQL return DatabaseMySQL // Default to MySQL

View File

@@ -47,9 +47,10 @@ type DownloadResult struct {
// Download downloads a backup from cloud storage // Download downloads a backup from cloud storage
func (d *CloudDownloader) Download(ctx context.Context, remotePath string, opts DownloadOptions) (*DownloadResult, error) { func (d *CloudDownloader) Download(ctx context.Context, remotePath string, opts DownloadOptions) (*DownloadResult, error) {
// Determine temp directory // Determine temp directory (use from opts, or from config's WorkDir, or fallback to system temp)
tempDir := opts.TempDir tempDir := opts.TempDir
if tempDir == "" { if tempDir == "" {
// Try to get from config if available (passed via opts.TempDir)
tempDir = os.TempDir() tempDir = os.TempDir()
} }

View File

@@ -4,6 +4,7 @@ import (
"bufio" "bufio"
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@@ -12,6 +13,7 @@ import (
"path/filepath" "path/filepath"
"regexp" "regexp"
"strings" "strings"
"time"
"dbbackup/internal/logger" "dbbackup/internal/logger"
) )
@@ -60,9 +62,9 @@ type DiagnoseDetails struct {
TableList []string `json:"table_list,omitempty"` TableList []string `json:"table_list,omitempty"`
// Compression analysis // Compression analysis
GzipValid bool `json:"gzip_valid,omitempty"` GzipValid bool `json:"gzip_valid,omitempty"`
GzipError string `json:"gzip_error,omitempty"` GzipError string `json:"gzip_error,omitempty"`
ExpandedSize int64 `json:"expanded_size,omitempty"` ExpandedSize int64 `json:"expanded_size,omitempty"`
CompressionRatio float64 `json:"compression_ratio,omitempty"` CompressionRatio float64 `json:"compression_ratio,omitempty"`
} }
@@ -412,8 +414,12 @@ func (d *Diagnoser) diagnoseSQLScript(filePath string, compressed bool, result *
// diagnoseClusterArchive analyzes a cluster tar.gz archive // diagnoseClusterArchive analyzes a cluster tar.gz archive
func (d *Diagnoser) diagnoseClusterArchive(filePath string, result *DiagnoseResult) { func (d *Diagnoser) diagnoseClusterArchive(filePath string, result *DiagnoseResult) {
// First verify tar.gz integrity // First verify tar.gz integrity with timeout
cmd := exec.Command("tar", "-tzf", filePath) // 5 minutes for large archives (multi-GB archives need more time)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
cmd := exec.CommandContext(ctx, "tar", "-tzf", filePath)
output, err := cmd.Output() output, err := cmd.Output()
if err != nil { if err != nil {
result.IsValid = false result.IsValid = false
@@ -491,7 +497,12 @@ func (d *Diagnoser) diagnoseUnknown(filePath string, result *DiagnoseResult) {
// verifyWithPgRestore uses pg_restore --list to verify dump integrity // verifyWithPgRestore uses pg_restore --list to verify dump integrity
func (d *Diagnoser) verifyWithPgRestore(filePath string, result *DiagnoseResult) { func (d *Diagnoser) verifyWithPgRestore(filePath string, result *DiagnoseResult) {
cmd := exec.Command("pg_restore", "--list", filePath) // Use timeout to prevent blocking on very large dump files
// 5 minutes for large dumps (multi-GB dumps with many tables)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
cmd := exec.CommandContext(ctx, "pg_restore", "--list", filePath)
output, err := cmd.CombinedOutput() output, err := cmd.CombinedOutput()
if err != nil { if err != nil {
@@ -544,7 +555,11 @@ func (d *Diagnoser) verifyWithPgRestore(filePath string, result *DiagnoseResult)
// DiagnoseClusterDumps extracts and diagnoses all dumps in a cluster archive // DiagnoseClusterDumps extracts and diagnoses all dumps in a cluster archive
func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*DiagnoseResult, error) { func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*DiagnoseResult, error) {
// First, try to list archive contents without extracting (fast check) // First, try to list archive contents without extracting (fast check)
listCmd := exec.Command("tar", "-tzf", archivePath) // 10 minutes for very large archives
listCtx, listCancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer listCancel()
listCmd := exec.CommandContext(listCtx, "tar", "-tzf", archivePath)
listOutput, listErr := listCmd.CombinedOutput() listOutput, listErr := listCmd.CombinedOutput()
if listErr != nil { if listErr != nil {
// Archive listing failed - likely corrupted // Archive listing failed - likely corrupted
@@ -587,15 +602,21 @@ func (d *Diagnoser) DiagnoseClusterDumps(archivePath, tempDir string) ([]*Diagno
// Check temp directory space - try to extract metadata first // Check temp directory space - try to extract metadata first
if stat, err := os.Stat(tempDir); err == nil && stat.IsDir() { if stat, err := os.Stat(tempDir); err == nil && stat.IsDir() {
// Try extraction of a small test file first // Try extraction of a small test file first with timeout
testCmd := exec.Command("tar", "-xzf", archivePath, "-C", tempDir, "--wildcards", "*.json", "--wildcards", "globals.sql") testCtx, testCancel := context.WithTimeout(context.Background(), 30*time.Second)
testCmd := exec.CommandContext(testCtx, "tar", "-xzf", archivePath, "-C", tempDir, "--wildcards", "*.json", "--wildcards", "globals.sql")
testCmd.Run() // Ignore error - just try to extract metadata testCmd.Run() // Ignore error - just try to extract metadata
testCancel()
} }
d.log.Info("Archive listing successful", "files", len(files)) d.log.Info("Archive listing successful", "files", len(files))
// Try full extraction // Try full extraction - NO TIMEOUT here as large archives can take a long time
cmd := exec.Command("tar", "-xzf", archivePath, "-C", tempDir) // Use a generous timeout (30 minutes) for very large archives
extractCtx, extractCancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer extractCancel()
cmd := exec.CommandContext(extractCtx, "tar", "-xzf", archivePath, "-C", tempDir)
var stderr bytes.Buffer var stderr bytes.Buffer
cmd.Stderr = &stderr cmd.Stderr = &stderr
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {

View File

@@ -27,7 +27,7 @@ type Engine struct {
progress progress.Indicator progress progress.Indicator
detailedReporter *progress.DetailedReporter detailedReporter *progress.DetailedReporter
dryRun bool dryRun bool
debugLogPath string // Path to save debug log on error debugLogPath string // Path to save debug log on error
errorCollector *ErrorCollector // Collects detailed error info errorCollector *ErrorCollector // Collects detailed error info
} }
@@ -357,40 +357,65 @@ func (e *Engine) executeRestoreCommandWithContext(ctx context.Context, cmdArgs [
return fmt.Errorf("failed to start restore command: %w", err) return fmt.Errorf("failed to start restore command: %w", err)
} }
// Read stderr in chunks to log errors without loading all into memory // Read stderr in goroutine to avoid blocking
buf := make([]byte, 4096)
var lastError string var lastError string
var errorCount int var errorCount int
const maxErrors = 10 // Limit captured errors to prevent OOM stderrDone := make(chan struct{})
for { go func() {
n, err := stderr.Read(buf) defer close(stderrDone)
if n > 0 { buf := make([]byte, 4096)
chunk := string(buf[:n]) const maxErrors = 10 // Limit captured errors to prevent OOM
for {
n, err := stderr.Read(buf)
if n > 0 {
chunk := string(buf[:n])
// Feed to error collector if enabled // Feed to error collector if enabled
if collector != nil { if collector != nil {
collector.CaptureStderr(chunk) collector.CaptureStderr(chunk)
}
// Only capture REAL errors, not verbose output
if strings.Contains(chunk, "ERROR:") || strings.Contains(chunk, "FATAL:") || strings.Contains(chunk, "error:") {
lastError = strings.TrimSpace(chunk)
errorCount++
if errorCount <= maxErrors {
e.log.Warn("Restore stderr", "output", chunk)
} }
// Only capture REAL errors, not verbose output
if strings.Contains(chunk, "ERROR:") || strings.Contains(chunk, "FATAL:") || strings.Contains(chunk, "error:") {
lastError = strings.TrimSpace(chunk)
errorCount++
if errorCount <= maxErrors {
e.log.Warn("Restore stderr", "output", chunk)
}
}
// Note: --verbose output is discarded to prevent OOM
}
if err != nil {
break
} }
// Note: --verbose output is discarded to prevent OOM
}
if err != nil {
break
} }
}()
// Wait for command with proper context handling
cmdDone := make(chan error, 1)
go func() {
cmdDone <- cmd.Wait()
}()
var cmdErr error
select {
case cmdErr = <-cmdDone:
// Command completed (success or failure)
case <-ctx.Done():
// Context cancelled - kill process
e.log.Warn("Restore cancelled - killing process")
cmd.Process.Kill()
<-cmdDone
cmdErr = ctx.Err()
} }
if err := cmd.Wait(); err != nil { // Wait for stderr reader to finish
<-stderrDone
if cmdErr != nil {
// Get exit code // Get exit code
exitCode := 1 exitCode := 1
if exitErr, ok := err.(*exec.ExitError); ok { if exitErr, ok := cmdErr.(*exec.ExitError); ok {
exitCode = exitErr.ExitCode() exitCode = exitErr.ExitCode()
} }
@@ -481,31 +506,56 @@ func (e *Engine) executeRestoreWithDecompression(ctx context.Context, archivePat
return fmt.Errorf("failed to start restore command: %w", err) return fmt.Errorf("failed to start restore command: %w", err)
} }
// Read stderr in chunks to log errors without loading all into memory // Read stderr in goroutine to avoid blocking
buf := make([]byte, 4096)
var lastError string var lastError string
var errorCount int var errorCount int
const maxErrors = 10 // Limit captured errors to prevent OOM stderrDone := make(chan struct{})
for { go func() {
n, err := stderr.Read(buf) defer close(stderrDone)
if n > 0 { buf := make([]byte, 4096)
chunk := string(buf[:n]) const maxErrors = 10 // Limit captured errors to prevent OOM
// Only capture REAL errors, not verbose output for {
if strings.Contains(chunk, "ERROR:") || strings.Contains(chunk, "FATAL:") || strings.Contains(chunk, "error:") { n, err := stderr.Read(buf)
lastError = strings.TrimSpace(chunk) if n > 0 {
errorCount++ chunk := string(buf[:n])
if errorCount <= maxErrors { // Only capture REAL errors, not verbose output
e.log.Warn("Restore stderr", "output", chunk) if strings.Contains(chunk, "ERROR:") || strings.Contains(chunk, "FATAL:") || strings.Contains(chunk, "error:") {
lastError = strings.TrimSpace(chunk)
errorCount++
if errorCount <= maxErrors {
e.log.Warn("Restore stderr", "output", chunk)
}
} }
// Note: --verbose output is discarded to prevent OOM
}
if err != nil {
break
} }
// Note: --verbose output is discarded to prevent OOM
}
if err != nil {
break
} }
}()
// Wait for command with proper context handling
cmdDone := make(chan error, 1)
go func() {
cmdDone <- cmd.Wait()
}()
var cmdErr error
select {
case cmdErr = <-cmdDone:
// Command completed (success or failure)
case <-ctx.Done():
// Context cancelled - kill process
e.log.Warn("Restore with decompression cancelled - killing process")
cmd.Process.Kill()
<-cmdDone
cmdErr = ctx.Err()
} }
if err := cmd.Wait(); err != nil { // Wait for stderr reader to finish
<-stderrDone
if cmdErr != nil {
// PostgreSQL pg_restore returns exit code 1 even for ignorable errors // PostgreSQL pg_restore returns exit code 1 even for ignorable errors
// Check if errors are ignorable (already exists, duplicate, etc.) // Check if errors are ignorable (already exists, duplicate, etc.)
if lastError != "" && e.isIgnorableError(lastError) { if lastError != "" && e.isIgnorableError(lastError) {
@@ -517,18 +567,18 @@ func (e *Engine) executeRestoreWithDecompression(ctx context.Context, archivePat
if lastError != "" { if lastError != "" {
classification := checks.ClassifyError(lastError) classification := checks.ClassifyError(lastError)
e.log.Error("Restore with decompression failed", e.log.Error("Restore with decompression failed",
"error", err, "error", cmdErr,
"last_stderr", lastError, "last_stderr", lastError,
"error_count", errorCount, "error_count", errorCount,
"error_type", classification.Type, "error_type", classification.Type,
"hint", classification.Hint, "hint", classification.Hint,
"action", classification.Action) "action", classification.Action)
return fmt.Errorf("restore failed: %w (last error: %s, total errors: %d) - %s", return fmt.Errorf("restore failed: %w (last error: %s, total errors: %d) - %s",
err, lastError, errorCount, classification.Hint) cmdErr, lastError, errorCount, classification.Hint)
} }
e.log.Error("Restore with decompression failed", "error", err, "last_stderr", lastError, "error_count", errorCount) e.log.Error("Restore with decompression failed", "error", cmdErr, "last_stderr", lastError, "error_count", errorCount)
return fmt.Errorf("restore failed: %w", err) return fmt.Errorf("restore failed: %w", cmdErr)
} }
return nil return nil
@@ -628,11 +678,12 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
e.progress.Start(fmt.Sprintf("Restoring cluster from %s", filepath.Base(archivePath))) e.progress.Start(fmt.Sprintf("Restoring cluster from %s", filepath.Base(archivePath)))
// Create temporary extraction directory // Create temporary extraction directory in configured WorkDir
tempDir := filepath.Join(e.cfg.BackupDir, fmt.Sprintf(".restore_%d", time.Now().Unix())) workDir := e.cfg.GetEffectiveWorkDir()
tempDir := filepath.Join(workDir, fmt.Sprintf(".restore_%d", time.Now().Unix()))
if err := os.MkdirAll(tempDir, 0755); err != nil { if err := os.MkdirAll(tempDir, 0755); err != nil {
operation.Fail("Failed to create temporary directory") operation.Fail("Failed to create temporary directory")
return fmt.Errorf("failed to create temp directory: %w", err) return fmt.Errorf("failed to create temp directory in %s: %w", workDir, err)
} }
defer os.RemoveAll(tempDir) defer os.RemoveAll(tempDir)
@@ -726,7 +777,7 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
} }
} else if strings.HasSuffix(dumpFile, ".dump") { } else if strings.HasSuffix(dumpFile, ".dump") {
// Validate custom format dumps using pg_restore --list // Validate custom format dumps using pg_restore --list
cmd := exec.Command("pg_restore", "--list", dumpFile) cmd := exec.CommandContext(ctx, "pg_restore", "--list", dumpFile)
output, err := cmd.CombinedOutput() output, err := cmd.CombinedOutput()
if err != nil { if err != nil {
dbName := strings.TrimSuffix(entry.Name(), ".dump") dbName := strings.TrimSuffix(entry.Name(), ".dump")
@@ -811,6 +862,14 @@ func (e *Engine) RestoreCluster(ctx context.Context, archivePath string) error {
defer wg.Done() defer wg.Done()
defer func() { <-semaphore }() // Release defer func() { <-semaphore }() // Release
// Panic recovery - prevent one database failure from crashing entire cluster restore
defer func() {
if r := recover(); r != nil {
e.log.Error("Panic in database restore goroutine", "file", filename, "panic", r)
atomic.AddInt32(&failCount, 1)
}
}()
// Update estimator progress (thread-safe) // Update estimator progress (thread-safe)
mu.Lock() mu.Lock()
estimator.UpdateProgress(idx) estimator.UpdateProgress(idx)
@@ -938,16 +997,39 @@ func (e *Engine) extractArchive(ctx context.Context, archivePath, destDir string
} }
// Discard stderr output in chunks to prevent memory buildup // Discard stderr output in chunks to prevent memory buildup
buf := make([]byte, 4096) stderrDone := make(chan struct{})
for { go func() {
_, err := stderr.Read(buf) defer close(stderrDone)
if err != nil { buf := make([]byte, 4096)
break for {
_, err := stderr.Read(buf)
if err != nil {
break
}
} }
}()
// Wait for command with proper context handling
cmdDone := make(chan error, 1)
go func() {
cmdDone <- cmd.Wait()
}()
var cmdErr error
select {
case cmdErr = <-cmdDone:
// Command completed
case <-ctx.Done():
e.log.Warn("Archive extraction cancelled - killing process")
cmd.Process.Kill()
<-cmdDone
cmdErr = ctx.Err()
} }
if err := cmd.Wait(); err != nil { <-stderrDone
return fmt.Errorf("tar extraction failed: %w", err)
if cmdErr != nil {
return fmt.Errorf("tar extraction failed: %w", cmdErr)
} }
return nil return nil
} }
@@ -980,25 +1062,48 @@ func (e *Engine) restoreGlobals(ctx context.Context, globalsFile string) error {
return fmt.Errorf("failed to start psql: %w", err) return fmt.Errorf("failed to start psql: %w", err)
} }
// Read stderr in chunks // Read stderr in chunks in goroutine
buf := make([]byte, 4096)
var lastError string var lastError string
for { stderrDone := make(chan struct{})
n, err := stderr.Read(buf) go func() {
if n > 0 { defer close(stderrDone)
chunk := string(buf[:n]) buf := make([]byte, 4096)
if strings.Contains(chunk, "ERROR") || strings.Contains(chunk, "FATAL") { for {
lastError = chunk n, err := stderr.Read(buf)
e.log.Warn("Globals restore stderr", "output", chunk) if n > 0 {
chunk := string(buf[:n])
if strings.Contains(chunk, "ERROR") || strings.Contains(chunk, "FATAL") {
lastError = chunk
e.log.Warn("Globals restore stderr", "output", chunk)
}
}
if err != nil {
break
} }
} }
if err != nil { }()
break
} // Wait for command with proper context handling
cmdDone := make(chan error, 1)
go func() {
cmdDone <- cmd.Wait()
}()
var cmdErr error
select {
case cmdErr = <-cmdDone:
// Command completed
case <-ctx.Done():
e.log.Warn("Globals restore cancelled - killing process")
cmd.Process.Kill()
<-cmdDone
cmdErr = ctx.Err()
} }
if err := cmd.Wait(); err != nil { <-stderrDone
return fmt.Errorf("failed to restore globals: %w (last error: %s)", err, lastError)
if cmdErr != nil {
return fmt.Errorf("failed to restore globals: %w (last error: %s)", cmdErr, lastError)
} }
return nil return nil
@@ -1262,7 +1367,8 @@ func (e *Engine) detectLargeObjectsInDumps(dumpsDir string, entries []os.DirEntr
} }
// Use pg_restore -l to list contents (fast, doesn't restore data) // Use pg_restore -l to list contents (fast, doesn't restore data)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // 2 minutes for large dumps with many objects
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel() defer cancel()
cmd := exec.CommandContext(ctx, "pg_restore", "-l", dumpFile) cmd := exec.CommandContext(ctx, "pg_restore", "-l", dumpFile)

View File

@@ -3,6 +3,7 @@ package restore
import ( import (
"bufio" "bufio"
"compress/gzip" "compress/gzip"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@@ -20,11 +21,11 @@ import (
// RestoreErrorReport contains comprehensive information about a restore failure // RestoreErrorReport contains comprehensive information about a restore failure
type RestoreErrorReport struct { type RestoreErrorReport struct {
// Metadata // Metadata
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
Version string `json:"version"` Version string `json:"version"`
GoVersion string `json:"go_version"` GoVersion string `json:"go_version"`
OS string `json:"os"` OS string `json:"os"`
Arch string `json:"arch"` Arch string `json:"arch"`
// Archive info // Archive info
ArchivePath string `json:"archive_path"` ArchivePath string `json:"archive_path"`
@@ -32,19 +33,19 @@ type RestoreErrorReport struct {
ArchiveFormat string `json:"archive_format"` ArchiveFormat string `json:"archive_format"`
// Database info // Database info
TargetDB string `json:"target_db"` TargetDB string `json:"target_db"`
DatabaseType string `json:"database_type"` DatabaseType string `json:"database_type"`
// Error details // Error details
ExitCode int `json:"exit_code"` ExitCode int `json:"exit_code"`
ErrorMessage string `json:"error_message"` ErrorMessage string `json:"error_message"`
ErrorType string `json:"error_type"` ErrorType string `json:"error_type"`
ErrorHint string `json:"error_hint"` ErrorHint string `json:"error_hint"`
TotalErrors int `json:"total_errors"` TotalErrors int `json:"total_errors"`
// Captured output // Captured output
LastStderr []string `json:"last_stderr"` LastStderr []string `json:"last_stderr"`
FirstErrors []string `json:"first_errors"` FirstErrors []string `json:"first_errors"`
// Context around failure // Context around failure
FailureContext *FailureContext `json:"failure_context,omitempty"` FailureContext *FailureContext `json:"failure_context,omitempty"`
@@ -53,9 +54,9 @@ type RestoreErrorReport struct {
DiagnosisResult *DiagnoseResult `json:"diagnosis_result,omitempty"` DiagnosisResult *DiagnoseResult `json:"diagnosis_result,omitempty"`
// Environment (sanitized) // Environment (sanitized)
PostgresVersion string `json:"postgres_version,omitempty"` PostgresVersion string `json:"postgres_version,omitempty"`
PgRestoreVersion string `json:"pg_restore_version,omitempty"` PgRestoreVersion string `json:"pg_restore_version,omitempty"`
PsqlVersion string `json:"psql_version,omitempty"` PsqlVersion string `json:"psql_version,omitempty"`
// Recommendations // Recommendations
Recommendations []string `json:"recommendations"` Recommendations []string `json:"recommendations"`
@@ -69,38 +70,38 @@ type FailureContext struct {
SurroundingLines []string `json:"surrounding_lines,omitempty"` SurroundingLines []string `json:"surrounding_lines,omitempty"`
// For COPY block errors // For COPY block errors
InCopyBlock bool `json:"in_copy_block,omitempty"` InCopyBlock bool `json:"in_copy_block,omitempty"`
CopyTableName string `json:"copy_table_name,omitempty"` CopyTableName string `json:"copy_table_name,omitempty"`
CopyStartLine int `json:"copy_start_line,omitempty"` CopyStartLine int `json:"copy_start_line,omitempty"`
SampleCopyData []string `json:"sample_copy_data,omitempty"` SampleCopyData []string `json:"sample_copy_data,omitempty"`
// File position info // File position info
BytePosition int64 `json:"byte_position,omitempty"` BytePosition int64 `json:"byte_position,omitempty"`
PercentComplete float64 `json:"percent_complete,omitempty"` PercentComplete float64 `json:"percent_complete,omitempty"`
} }
// ErrorCollector captures detailed error information during restore // ErrorCollector captures detailed error information during restore
type ErrorCollector struct { type ErrorCollector struct {
log logger.Logger log logger.Logger
cfg *config.Config cfg *config.Config
archivePath string archivePath string
targetDB string targetDB string
format ArchiveFormat format ArchiveFormat
// Captured data // Captured data
stderrLines []string stderrLines []string
firstErrors []string firstErrors []string
lastErrors []string lastErrors []string
totalErrors int totalErrors int
exitCode int exitCode int
// Limits // Limits
maxStderrLines int maxStderrLines int
maxErrorCapture int maxErrorCapture int
// State // State
startTime time.Time startTime time.Time
enabled bool enabled bool
} }
// NewErrorCollector creates a new error collector // NewErrorCollector creates a new error collector
@@ -556,7 +557,11 @@ func getDatabaseType(format ArchiveFormat) string {
} }
func getCommandVersion(cmd string, arg string) string { func getCommandVersion(cmd string, arg string) string {
output, err := exec.Command(cmd, arg).CombinedOutput() // Use timeout to prevent blocking if command hangs
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
output, err := exec.CommandContext(ctx, cmd, arg).CombinedOutput()
if err != nil { if err != nil {
return "" return ""
} }

View File

@@ -6,6 +6,7 @@ import (
"os/exec" "os/exec"
"regexp" "regexp"
"strconv" "strconv"
"time"
"dbbackup/internal/database" "dbbackup/internal/database"
) )
@@ -47,8 +48,13 @@ func ParsePostgreSQLVersion(versionStr string) (*VersionInfo, error) {
// GetDumpFileVersion extracts the PostgreSQL version from a dump file // GetDumpFileVersion extracts the PostgreSQL version from a dump file
// Uses pg_restore -l to read the dump metadata // Uses pg_restore -l to read the dump metadata
// Uses a 30-second timeout to avoid blocking on large files
func GetDumpFileVersion(dumpPath string) (*VersionInfo, error) { func GetDumpFileVersion(dumpPath string) (*VersionInfo, error) {
cmd := exec.Command("pg_restore", "-l", dumpPath) // Use a timeout context to prevent blocking on very large dump files
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, "pg_restore", "-l", dumpPath)
output, err := cmd.CombinedOutput() output, err := cmd.CombinedOutput()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to read dump file metadata: %w (output: %s)", err, string(output)) return nil, fmt.Errorf("failed to read dump file metadata: %w (output: %s)", err, string(output))

View File

@@ -83,10 +83,10 @@ type backupCompleteMsg struct {
func executeBackupWithTUIProgress(parentCtx context.Context, cfg *config.Config, log logger.Logger, backupType, dbName string, ratio int) tea.Cmd { func executeBackupWithTUIProgress(parentCtx context.Context, cfg *config.Config, log logger.Logger, backupType, dbName string, ratio int) tea.Cmd {
return func() tea.Msg { return func() tea.Msg {
// Use configurable cluster timeout (minutes) from config; default set in config.New() // NO TIMEOUT for backup operations - a backup takes as long as it takes
// Use parent context to inherit cancellation from TUI // Large databases can take many hours
clusterTimeout := time.Duration(cfg.ClusterTimeoutMinutes) * time.Minute // Only manual cancellation (Ctrl+C) should stop the backup
ctx, cancel := context.WithTimeout(parentCtx, clusterTimeout) ctx, cancel := context.WithCancel(parentCtx)
defer cancel() defer cancel()
start := time.Now() start := time.Now()

View File

@@ -53,7 +53,8 @@ type databaseListMsg struct {
func fetchDatabases(cfg *config.Config, log logger.Logger) tea.Cmd { func fetchDatabases(cfg *config.Config, log logger.Logger) tea.Cmd {
return func() tea.Msg { return func() tea.Msg {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) // 60 seconds for database listing - busy servers may be slow
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel() defer cancel()
dbClient, err := database.New(cfg, log) dbClient, err := database.New(cfg, log)

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"os/exec" "os/exec"
"path/filepath"
"strings" "strings"
"time" "time"
@@ -110,10 +111,10 @@ type restoreCompleteMsg struct {
func executeRestoreWithTUIProgress(parentCtx context.Context, cfg *config.Config, log logger.Logger, archive ArchiveInfo, targetDB string, cleanFirst, createIfMissing bool, restoreType string, cleanClusterFirst bool, existingDBs []string, saveDebugLog bool) tea.Cmd { func executeRestoreWithTUIProgress(parentCtx context.Context, cfg *config.Config, log logger.Logger, archive ArchiveInfo, targetDB string, cleanFirst, createIfMissing bool, restoreType string, cleanClusterFirst bool, existingDBs []string, saveDebugLog bool) tea.Cmd {
return func() tea.Msg { return func() tea.Msg {
// Use configurable cluster timeout (minutes) from config; default set in config.New() // NO TIMEOUT for restore operations - a restore takes as long as it takes
// Use parent context to inherit cancellation from TUI // Large databases with large objects can take many hours
restoreTimeout := time.Duration(cfg.ClusterTimeoutMinutes) * time.Minute // Only manual cancellation (Ctrl+C) should stop the restore
ctx, cancel := context.WithTimeout(parentCtx, restoreTimeout) ctx, cancel := context.WithCancel(parentCtx)
defer cancel() defer cancel()
start := time.Now() start := time.Now()
@@ -137,8 +138,8 @@ func executeRestoreWithTUIProgress(parentCtx context.Context, cfg *config.Config
// This matches how cluster restore works - uses CLI tools, not database connections // This matches how cluster restore works - uses CLI tools, not database connections
droppedCount := 0 droppedCount := 0
for _, dbName := range existingDBs { for _, dbName := range existingDBs {
// Create timeout context for each database drop (30 seconds per DB) // Create timeout context for each database drop (5 minutes per DB - large DBs take time)
dropCtx, dropCancel := context.WithTimeout(ctx, 30*time.Second) dropCtx, dropCancel := context.WithTimeout(ctx, 5*time.Minute)
if err := dropDatabaseCLI(dropCtx, cfg, dbName); err != nil { if err := dropDatabaseCLI(dropCtx, cfg, dbName); err != nil {
log.Warn("Failed to drop database", "name", dbName, "error", err) log.Warn("Failed to drop database", "name", dbName, "error", err)
// Continue with other databases // Continue with other databases
@@ -157,8 +158,9 @@ func executeRestoreWithTUIProgress(parentCtx context.Context, cfg *config.Config
// Enable debug logging if requested // Enable debug logging if requested
if saveDebugLog { if saveDebugLog {
// Generate debug log path based on archive name and timestamp // Generate debug log path using configured WorkDir
debugLogPath := fmt.Sprintf("/tmp/dbbackup-restore-debug-%s.json", time.Now().Format("20060102-150405")) workDir := cfg.GetEffectiveWorkDir()
debugLogPath := filepath.Join(workDir, fmt.Sprintf("dbbackup-restore-debug-%s.json", time.Now().Format("20060102-150405")))
engine.SetDebugLogPath(debugLogPath) engine.SetDebugLogPath(debugLogPath)
log.Info("Debug logging enabled", "path", debugLogPath) log.Info("Debug logging enabled", "path", debugLogPath)
} }

View File

@@ -106,7 +106,8 @@ type safetyCheckCompleteMsg struct {
func runSafetyChecks(cfg *config.Config, log logger.Logger, archive ArchiveInfo, targetDB string) tea.Cmd { func runSafetyChecks(cfg *config.Config, log logger.Logger, archive ArchiveInfo, targetDB string) tea.Cmd {
return func() tea.Msg { return func() tea.Msg {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // 10 minutes for safety checks - large archives can take a long time to diagnose
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel() defer cancel()
safety := restore.NewSafety(cfg, log) safety := restore.NewSafety(cfg, log)
@@ -471,7 +472,7 @@ func (m RestorePreviewModel) View() string {
s.WriteString(debugStyle.Render(fmt.Sprintf(" %s Debug Log: %v (press 'd' to toggle)", debugIcon, m.saveDebugLog))) s.WriteString(debugStyle.Render(fmt.Sprintf(" %s Debug Log: %v (press 'd' to toggle)", debugIcon, m.saveDebugLog)))
s.WriteString("\n") s.WriteString("\n")
if m.saveDebugLog { if m.saveDebugLog {
s.WriteString(infoStyle.Render(" Saves detailed error report to /tmp on failure")) s.WriteString(infoStyle.Render(fmt.Sprintf(" Saves detailed error report to %s on failure", m.config.GetEffectiveWorkDir())))
s.WriteString("\n") s.WriteString("\n")
} }
s.WriteString("\n") s.WriteString("\n")

View File

@@ -802,7 +802,7 @@ func (m SettingsModel) openDirectoryBrowser() (tea.Model, tea.Cmd) {
setting := m.settings[m.cursor] setting := m.settings[m.cursor]
currentValue := setting.Value(m.config) currentValue := setting.Value(m.config)
if currentValue == "" { if currentValue == "" {
currentValue = "/tmp" currentValue = m.config.GetEffectiveWorkDir()
} }
if m.dirBrowser == nil { if m.dirBrowser == nil {

View File

@@ -70,7 +70,8 @@ type statusMsg struct {
func fetchStatus(cfg *config.Config, log logger.Logger) tea.Cmd { func fetchStatus(cfg *config.Config, log logger.Logger) tea.Cmd {
return func() tea.Msg { return func() tea.Msg {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // 30 seconds for status check - slow networks or SSL negotiation
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() defer cancel()
dbClient, err := database.New(cfg, log) dbClient, err := database.New(cfg, log)

View File

@@ -16,7 +16,7 @@ import (
// Build information (set by ldflags) // Build information (set by ldflags)
var ( var (
version = "3.40.0" version = "3.42.9"
buildTime = "unknown" buildTime = "unknown"
gitCommit = "unknown" gitCommit = "unknown"
) )