update to latest stable
This commit is contained in:
parent
c1f8bc1534
commit
330956ebc7
@ -21,11 +21,10 @@ import (
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"sync"
|
||||
|
||||
"github.com/dutchcoders/go-clamd" // ClamAV integration
|
||||
"github.com/go-redis/redis/v8" // Redis integration
|
||||
"github.com/patrickmn/go-cache"
|
||||
@ -156,15 +155,15 @@ var (
|
||||
uploadSizeBytes prometheus.Histogram
|
||||
downloadSizeBytes prometheus.Histogram
|
||||
|
||||
// Constants for worker pool
|
||||
MinWorkers = 5 // Increased from 10 to 20 for better concurrency
|
||||
UploadQueueSize = 10000 // Increased from 5000 to 10000
|
||||
|
||||
// Channels
|
||||
scanQueue chan ScanTask
|
||||
ScanWorkers = 5 // Number of ClamAV scan workers
|
||||
)
|
||||
|
||||
const (
|
||||
MinFreeBytes = 1 << 30 // 1 GB
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Set default configuration values
|
||||
setDefaults()
|
||||
@ -191,6 +190,12 @@ func main() {
|
||||
}
|
||||
log.WithField("directory", conf.Server.StoragePath).Info("Store directory is ready")
|
||||
|
||||
// Check free space with retry
|
||||
err = checkFreeSpaceWithRetry(conf.Server.StoragePath, 3, 5*time.Second)
|
||||
if err != nil {
|
||||
log.Fatalf("Insufficient free space: %v", err)
|
||||
}
|
||||
|
||||
// Setup logging
|
||||
setupLogging()
|
||||
|
||||
@ -203,6 +208,7 @@ func main() {
|
||||
|
||||
// Initialize upload and scan queues
|
||||
uploadQueue = make(chan UploadTask, conf.Workers.UploadQueueSize)
|
||||
log.Infof("Upload queue initialized with size: %d", conf.Workers.UploadQueueSize)
|
||||
scanQueue = make(chan ScanTask, conf.Workers.UploadQueueSize)
|
||||
networkEvents = make(chan NetworkEvent, 100)
|
||||
log.Info("Upload, scan, and network event channels initialized.")
|
||||
@ -797,10 +803,10 @@ func uploadWorker(ctx context.Context, workerID int) {
|
||||
|
||||
// Initialize upload worker pool
|
||||
func initializeUploadWorkerPool(ctx context.Context) {
|
||||
for i := 0; i < MinWorkers; i++ {
|
||||
for i := 0; i < conf.Workers.NumWorkers; i++ {
|
||||
go uploadWorker(ctx, i)
|
||||
}
|
||||
log.Infof("Initialized %d upload workers", MinWorkers)
|
||||
log.Infof("Initialized %d upload workers", conf.Workers.NumWorkers)
|
||||
}
|
||||
|
||||
// Worker function to process scan tasks
|
||||
@ -841,10 +847,10 @@ func scanWorker(ctx context.Context, workerID int) {
|
||||
|
||||
// Initialize scan worker pool
|
||||
func initializeScanWorkerPool(ctx context.Context) {
|
||||
for i := 0; i < ScanWorkers; i++ {
|
||||
for i := 0; i < conf.ClamAV.NumScanWorkers; i++ {
|
||||
go scanWorker(ctx, i)
|
||||
}
|
||||
log.Infof("Initialized %d scan workers", ScanWorkers)
|
||||
log.Infof("Initialized %d scan workers", conf.ClamAV.NumScanWorkers)
|
||||
}
|
||||
|
||||
// Setup router with middleware
|
||||
@ -1589,7 +1595,7 @@ func MonitorRedisHealth(ctx context.Context, client *redis.Client, checkInterval
|
||||
}
|
||||
redisConnected = false
|
||||
} else {
|
||||
if !redisConnected {
|
||||
if (!redisConnected) {
|
||||
log.Info("Redis reconnected successfully")
|
||||
}
|
||||
redisConnected = true
|
||||
@ -1732,7 +1738,7 @@ func computeFileHash(filePath string) (string, error) {
|
||||
return hex.EncodeToString(hasher.Sum(nil)), nil
|
||||
}
|
||||
|
||||
// Handle multipart uploads
|
||||
// handleMultipartUpload handles multipart file uploads
|
||||
func handleMultipartUpload(w http.ResponseWriter, r *http.Request, absFilename string) error {
|
||||
err := r.ParseMultipartForm(32 << 20) // 32MB is the default used by FormFile
|
||||
if err != nil {
|
||||
@ -1866,26 +1872,12 @@ func checkStorageSpace(storagePath string, minFreeBytes int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Function to compute SHA256 checksum of a file
|
||||
func computeSHA256(filePath string) (string, error) {
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to open file for checksum: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
hasher := sha256.New()
|
||||
if _, err := io.Copy(hasher, file); err != nil {
|
||||
return "", fmt.Errorf("failed to compute checksum: %w", err)
|
||||
}
|
||||
|
||||
return hex.EncodeToString(hasher.Sum(nil)), nil
|
||||
}
|
||||
// Helper function to create formatted errors
|
||||
|
||||
// handleDeduplication handles file deduplication using SHA256 checksum and hard links
|
||||
func handleDeduplication(ctx context.Context, absFilename string) error {
|
||||
// Compute checksum of the uploaded file
|
||||
checksum, err := computeSHA256(absFilename)
|
||||
checksum, err := computeSHA256(ctx, absFilename)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to compute SHA256 for %s: %v", absFilename, err)
|
||||
return fmt.Errorf("checksum computation failed: %w", err)
|
||||
@ -1894,30 +1886,95 @@ func handleDeduplication(ctx context.Context, absFilename string) error {
|
||||
|
||||
// Check Redis for existing checksum
|
||||
existingPath, err := redisClient.Get(ctx, checksum).Result()
|
||||
if err != nil && err != redis.Nil {
|
||||
if err != nil {
|
||||
if err == redis.Nil {
|
||||
// Checksum does not exist, store it in Redis
|
||||
err = redisClient.Set(ctx, checksum, absFilename, 0).Err()
|
||||
if err != nil {
|
||||
log.Errorf("Redis error while setting checksum %s: %v", checksum, err)
|
||||
return fmt.Errorf("redis error: %w", err)
|
||||
}
|
||||
log.Infof("Stored new checksum %s for file %s", checksum, absFilename)
|
||||
return nil
|
||||
}
|
||||
// Handle other Redis errors
|
||||
log.Errorf("Redis error while fetching checksum %s: %v", checksum, err)
|
||||
return fmt.Errorf("redis error: %w", err)
|
||||
}
|
||||
|
||||
if err != redis.Nil {
|
||||
// Duplicate found, create hard link
|
||||
log.Infof("Duplicate detected: %s already exists at %s", absFilename, existingPath)
|
||||
// Checksum exists, create a hard link to the existing file
|
||||
if existingPath != absFilename {
|
||||
// Verify that existingPath exists before creating a hard link
|
||||
if _, err := os.Stat(existingPath); os.IsNotExist(err) {
|
||||
log.Errorf("Existing file for checksum %s does not exist at path %s", checksum, existingPath)
|
||||
return fmt.Errorf("existing file does not exist: %w", err)
|
||||
}
|
||||
|
||||
// Attempt to create a hard link
|
||||
err = os.Link(existingPath, absFilename)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to create hard link from %s to %s: %v", existingPath, absFilename, err)
|
||||
return fmt.Errorf("failed to create hard link: %w", err)
|
||||
}
|
||||
log.Infof("Created hard link from %s to %s", existingPath, absFilename)
|
||||
return nil
|
||||
log.Infof("Created hard link for duplicate file %s pointing to %s", absFilename, existingPath)
|
||||
} else {
|
||||
log.Infof("File %s already exists with the same checksum", absFilename)
|
||||
}
|
||||
|
||||
// No duplicate found, store checksum in Redis
|
||||
err = redisClient.Set(ctx, checksum, absFilename, 0).Err()
|
||||
if err != nil {
|
||||
log.Errorf("Failed to store checksum %s in Redis: %v", checksum, err)
|
||||
return fmt.Errorf("failed to store checksum in Redis: %w", err)
|
||||
}
|
||||
|
||||
log.Infof("Stored new file checksum in Redis: %s -> %s", checksum, absFilename)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Usage in checkFreeSpace function
|
||||
|
||||
// Function to compute SHA256 with context support
|
||||
func computeSHA256(ctx context.Context, filePath string) (string, error) {
|
||||
if filePath == "" {
|
||||
return "", fmt.Errorf("computeSHA256: filePath cannot be empty")
|
||||
}
|
||||
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to open file for checksum: %v", err)
|
||||
return "", fmt.Errorf("computeSHA256: failed to open file %s: %w", filePath, err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
hasher := sha256.New()
|
||||
reader := bufio.NewReader(file)
|
||||
|
||||
buffer := make([]byte, 4096)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", fmt.Errorf("computeSHA256: operation cancelled")
|
||||
default:
|
||||
n, err := reader.Read(buffer)
|
||||
if n > 0 {
|
||||
if _, wErr := hasher.Write(buffer[:n]); wErr != nil {
|
||||
return "", fmt.Errorf("computeSHA256: failed to write to hasher: %w", wErr)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
checksum := hex.EncodeToString(hasher.Sum(nil))
|
||||
log.Debugf("Computed SHA256 checksum for file %s: %s", filePath, checksum)
|
||||
return checksum, nil
|
||||
}
|
||||
return "", fmt.Errorf("computeSHA256: read error: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Function to check free space with retry mechanism
|
||||
func checkFreeSpaceWithRetry(path string, retries int, delay time.Duration) error {
|
||||
for i := 0; i < retries; i++ {
|
||||
if err := checkStorageSpace(path, MinFreeBytes); err != nil {
|
||||
log.Warnf("Free space check failed (attempt %d/%d): %v", i+1, retries, err)
|
||||
time.Sleep(delay)
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("checkFreeSpace: insufficient free space after %d attempts", retries)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user