sync to github

This commit is contained in:
Alexander Renz 2024-12-05 11:45:10 +01:00
parent 4cfc5591f6
commit fd500dc2dc
3 changed files with 300 additions and 319 deletions

182
README.MD
View File

@ -1,7 +1,13 @@
# HMAC File Server Release Notes Heres the updated **README.md** with the new repository URL and the updated ISO size:
---
# HMAC File Server
**HMAC File Server** is a secure, scalable, and feature-rich file server with advanced capabilities like HMAC authentication, resumable uploads, chunked uploads, file versioning, and optional ClamAV scanning for file integrity and security. This server is built with extensibility and operational monitoring in mind, including Prometheus metrics support and Redis integration. **HMAC File Server** is a secure, scalable, and feature-rich file server with advanced capabilities like HMAC authentication, resumable uploads, chunked uploads, file versioning, and optional ClamAV scanning for file integrity and security. This server is built with extensibility and operational monitoring in mind, including Prometheus metrics support and Redis integration.
---
## Features ## Features
- **HMAC Authentication:** Secure file uploads and downloads with HMAC tokens. - **HMAC Authentication:** Secure file uploads and downloads with HMAC tokens.
@ -16,6 +22,13 @@
--- ---
## Repository
- **Primary Repository**: [GitHub Repository](https://github.com/your-repo/hmac-file-server)
- **Alternative Repository**: [uuxo.net Git Repository](https://git.uuxo.net/uuxo/hmac-file-server)
---
## Installation ## Installation
### Prerequisites ### Prerequisites
@ -27,7 +40,12 @@
### Clone and Build ### Clone and Build
```bash ```bash
# Clone from the primary repository
git clone https://github.com/your-repo/hmac-file-server.git git clone https://github.com/your-repo/hmac-file-server.git
# OR clone from the alternative repository
git clone https://git.uuxo.net/uuxo/hmac-file-server.git
cd hmac-file-server cd hmac-file-server
go build -o hmac-file-server main.go go build -o hmac-file-server main.go
``` ```
@ -38,63 +56,64 @@ go build -o hmac-file-server main.go
The server configuration is managed through a `config.toml` file. Below are the supported configuration options: The server configuration is managed through a `config.toml` file. Below are the supported configuration options:
### **Server Configuration** ---
| Key | Description | Example | ## Example `config.toml`
|------------------------|-----------------------------------------------------|---------------------------------|
| `ListenPort` | Port or Unix socket to listen on | `":8080"` |
| `UnixSocket` | Use a Unix socket (`true`/`false`) | `false` |
| `Secret` | Secret key for HMAC authentication | `"your-secret-key"` |
| `StoragePath` | Directory to store uploaded files | `"/mnt/storage/hmac-file-server"` |
| `LogLevel` | Logging level (`info`, `debug`, etc.) | `"info"` |
| `LogFile` | Log file path (optional) | `"/var/log/hmac-file-server.log"` |
| `MetricsEnabled` | Enable Prometheus metrics (`true`/`false`) | `true` |
| `MetricsPort` | Prometheus metrics server port | `"9090"` |
| `FileTTL` | File Time-to-Live duration | `"168h0m0s"` |
| `DeduplicationEnabled` | Enable file deduplication based on hashing | `true` |
| `MinFreeBytes` | Minimum free space required on storage path (in bytes) | `104857600` |
### **Uploads** ```toml
[server]
ListenPort = "8080"
UnixSocket = false
StoragePath = "./uploads"
LogLevel = "info"
LogFile = ""
MetricsEnabled = true
MetricsPort = "9090"
FileTTL = "8760h" # 365 days
DeduplicationEnabled = true
MinFreeBytes = "100MB"
| Key | Description | Example | [timeouts]
|----------------------------|-----------------------------------------------|-------------| ReadTimeout = "4800s"
| `ResumableUploadsEnabled` | Enable resumable uploads | `true` | WriteTimeout = "4800s"
| `ChunkedUploadsEnabled` | Enable chunked uploads | `true` | IdleTimeout = "4800s"
| `ChunkSize` | Chunk size for chunked uploads (bytes) | `1048576` |
| `AllowedExtensions` | Allowed file extensions for uploads | `[".png", ".jpg"]` |
### **Time Settings** [security]
Secret = "changeme"
| Key | Description | Example | [versioning]
|------------------|--------------------------------|----------| EnableVersioning = false
| `ReadTimeout` | HTTP server read timeout | `"2h"` | MaxVersions = 1
| `WriteTimeout` | HTTP server write timeout | `"2h"` |
| `IdleTimeout` | HTTP server idle timeout | `"2h"` |
### **ClamAV Configuration** [uploads]
ResumableUploadsEnabled = true
ChunkedUploadsEnabled = true
ChunkSize = "8192"
AllowedExtensions = [".txt", ".pdf", ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".svg", ".webp", ".wav", ".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv", ".webm", ".mpeg", ".mpg", ".m4v", ".3gp", ".3g2", ".mp3", ".ogg"]
| Key | Description | Example | [clamav]
|--------------------|-------------------------------------------|----------------------------------| ClamAVEnabled = true
| `ClamAVEnabled` | Enable ClamAV virus scanning (`true`) | `true` | ClamAVSocket = "/var/run/clamav/clamd.ctl"
| `ClamAVSocket` | Path to ClamAV Unix socket | `"/var/run/clamav/clamd.ctl"` | NumScanWorkers = 2
| `NumScanWorkers` | Number of workers for file scanning | `2` | ScanFileExtensions = [".exe", ".dll", ".bin", ".com", ".bat", ".sh", ".php", ".js"]
### **Redis Configuration** [redis]
RedisEnabled = true
RedisAddr = "localhost:6379"
RedisPassword = ""
RedisDBIndex = 0
RedisHealthCheckInterval = "120s"
| Key | Description | Example | [workers]
|----------------------------|----------------------------------|-------------------| NumWorkers = 2
| `RedisEnabled` | Enable Redis integration | `true` | UploadQueueSize = 50
| `RedisDBIndex` | Redis database index | `0` |
| `RedisAddr` | Redis server address | `"localhost:6379"`|
| `RedisPassword` | Password for Redis authentication| `""` |
| `RedisHealthCheckInterval` | Health check interval for Redis | `"30s"` |
### **Workers and Connections** [iso]
Enabled = true
| Key | Description | Example | Size = "2TB" # Example ISO size changed to 2TB
|------------------------|------------------------------------|-------------------| MountPoint = "/mnt/iso"
| `NumWorkers` | Number of upload workers | `2` | Charset = "utf-8"
| `UploadQueueSize` | Size of the upload queue | `50` | ```
--- ---
@ -108,17 +127,11 @@ Run the server with a configuration file:
./hmac-file-server -config ./config.toml ./hmac-file-server -config ./config.toml
``` ```
### Metrics Server
If `MetricsEnabled` is `true`, the Prometheus metrics server will run on the port specified in `MetricsPort` (default: `9090`).
--- ---
## Development Notes ### Metrics Server
- **Versioning:** Enabled via `EnableVersioning`. Ensure `MaxVersions` is set appropriately to prevent storage issues. If `MetricsEnabled` is set to `true`, the Prometheus metrics server will be available on the port specified in `MetricsPort` (default: `9090`).
- **File Cleaner:** The file cleaner runs hourly and deletes files older than the configured `FileTTL`.
- **Redis Health Check:** Automatically monitors Redis connectivity and logs warnings on failure.
--- ---
@ -153,64 +166,11 @@ Prometheus metrics include:
--- ---
## Example `config.toml` ## Additional Features
```toml
[server]
listenport = "8080"
unixsocket = false
storagepath = "/mnt/storage/"
loglevel = "info"
logfile = "/var/log/file-server.log"
metricsenabled = true
metricsport = "9090"
DeduplicationEnabled = true
filettl = "336h" # 14 days
minfreebytes = 104857600 # 100 MB in bytes
[timeouts]
readtimeout = "4800s"
writetimeout = "4800s"
idletimeout = "24h"
[security]
secret = "example-secret-key"
[versioning]
enableversioning = false
maxversions = 1
[uploads]
resumableuploadsenabled = true
chunkeduploadsenabled = true
chunksize = 8192
allowedextensions = [".txt", ".pdf", ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".svg", ".webp", ".wav", ".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv", ".webm", ".mpeg", ".mpg", ".m4v", ".3gp", ".3g2", ".mp3", ".ogg"]
[clamav]
clamavenabled = true
clamavsocket = "/var/run/clamav/clamd.ctl"
numscanworkers = 2
[redis]
redisenabled = true
redisdbindex = 0
redisaddr = "localhost:6379"
redispassword = ""
redishealthcheckinterval = "120s"
[workers]
numworkers = 2
uploadqueuesize = 50
```
This configuration file is set up with essential features like Prometheus integration, ClamAV scanning, and file handling with deduplication and versioning options. Adjust the settings according to your infrastructure needs.
### Additional Features
- **Deduplication**: Automatically remove duplicate files based on hashing. - **Deduplication**: Automatically remove duplicate files based on hashing.
- **Versioning**: Store multiple versions of files and keep a maximum of `MaxVersions` versions. - **Versioning**: Store multiple versions of files and keep a maximum of `MaxVersions` versions.
- **ClamAV Integration**: Scan uploaded files for viruses using ClamAV. - **ClamAV Integration**: Scan uploaded files for viruses using ClamAV.
- **Redis Caching**: Utilize Redis for caching file metadata for faster access. - **Redis Caching**: Utilize Redis for caching file metadata for faster access.
This release ensures an efficient and secure file management system, suited for environments requiring high levels of data security and availability. This release ensures an efficient and secure file management system, tailored to environments requiring robust data handling capabilities.
```

View File

@ -1,67 +1,55 @@
# Server Settings
[server] [server]
ListenPort = "8080" listenport = "8080"
UnixSocket = false unixsocket = false
StoreDir = "./testupload" storagepath = "./upload"
LogLevel = "info" loglevel = "info"
LogFile = "./hmac-file-server.log" logfile = "./hmac-file-server.log"
MetricsEnabled = true metricsenabled = true
MetricsPort = "9090" metricsport = "9090"
FileTTL = "8760h" DeduplicationEnabled = true
filettl = "1y"
minfreebytes = "100GB"
# Workers and Connections [iso]
[workers]
NumWorkers = 2
UploadQueueSize = 500
# Timeout Settings
[timeouts]
ReadTimeout = "600s"
WriteTimeout = "600s"
IdleTimeout = "600s"
# Security Settings
[security]
Secret = "a-orc-and-a-humans-is-drinking-ale"
# Versioning Settings
[versioning]
EnableVersioning = false
MaxVersions = 1
# Upload/Download Settings
[uploads]
ResumableUploadsEnabled = true
ChunkedUploadsEnabled = true
ChunkSize = 16777216
AllowedExtensions = [
# Document formats
".txt", ".pdf",
# Image formats
".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".svg", ".webp",
# Video formats
".wav", ".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv", ".webm", ".mpeg", ".mpg", ".m4v", ".3gp", ".3g2",
# Audio formats
".mp3", ".ogg"
]
# ClamAV Settings
[clamav]
ClamAVEnabled = false
ClamAVSocket = "/var/run/clamav/clamd.ctl"
NumScanWorkers = 4
# Redis Settings
[redis]
RedisEnabled = false
RedisAddr = "localhost:6379"
RedisPassword = ""
RedisDBIndex = 0
RedisHealthCheckInterval = "120s"
# Deduplication
[deduplication]
enabled = false enabled = false
size = "1TB"
mountpoint = "/mnt/nfs_vol01/hmac-file-server/iso/"
charset = "utf-8"
[timeouts]
readtimeout = "3600s"
writetimeout = "3600s"
idletimeout = "3600s"
[security]
secret = "a-orc-and-a-humans-is-drinking-ale"
[versioning]
enableversioning = false
maxversions = 1
[uploads]
resumableuploadsenabled = true
chunkeduploadsenabled = true
chunksize = "32MB"
allowedextensions = [".txt", ".pdf", ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".svg", ".webp", ".wav", ".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv", ".webm", ".mpeg", ".mpg", ".m4v", ".3gp", ".3g2", ".mp3", ".ogg"]
[clamav]
clamavenabled = false
clamavsocket = "/var/run/clamav/clamd.ctl"
numscanworkers = 4
ScanFileExtensions = [".exe", ".dll", ".bin", ".com", ".bat", ".sh", ".php", ".js"]
[redis]
redisenabled = false
redisdbindex = 0
redisaddr = "localhost:6379"
redispassword = ""
redishealthcheckinterval = "120s"
[workers]
numworkers = 4
uploadqueuesize = 1000
[file]
filerevision = 1

View File

@ -39,6 +39,62 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
) )
// var log = logrus.New() // Removed redundant declaration
// parseSize converts a human-readable size string (e.g., "1KB", "1MB", "1GB", "1TB") to bytes
func parseSize(sizeStr string) (int64, error) {
sizeStr = strings.TrimSpace(sizeStr)
if len(sizeStr) < 2 {
return 0, fmt.Errorf("invalid size: %s", sizeStr)
}
unit := sizeStr[len(sizeStr)-2:]
valueStr := sizeStr[:len(sizeStr)-2]
value, err := strconv.Atoi(valueStr)
if err != nil {
return 0, fmt.Errorf("invalid size value: %s", valueStr)
}
switch strings.ToUpper(unit) {
case "KB":
return int64(value) * 1024, nil
case "MB":
return int64(value) * 1024 * 1024, nil
case "GB":
return int64(value) * 1024 * 1024 * 1024, nil
case "TB":
return int64(value) * 1024 * 1024 * 1024 * 1024, nil
default:
return 0, fmt.Errorf("unknown size unit: %s", unit)
}
}
// parseTTL converts a human-readable TTL string (e.g., "1D", "1M", "1Y") to a time.Duration
func parseTTL(ttlStr string) (time.Duration, error) {
ttlStr = strings.TrimSpace(ttlStr)
if len(ttlStr) < 2 {
return 0, fmt.Errorf("invalid TTL: %s", ttlStr)
}
unit := ttlStr[len(ttlStr)-1:]
valueStr := ttlStr[:len(ttlStr)-1]
value, err := strconv.Atoi(valueStr)
if err != nil {
return 0, fmt.Errorf("invalid TTL value: %s", valueStr)
}
switch strings.ToUpper(unit) {
case "D":
return time.Duration(value) * 24 * time.Hour, nil
case "M":
return time.Duration(value) * 30 * 24 * time.Hour, nil
case "Y":
return time.Duration(value) * 365 * 24 * time.Hour, nil
default:
return 0, fmt.Errorf("unknown TTL unit: %s", unit)
}
}
// Configuration structure // Configuration structure
type ServerConfig struct { type ServerConfig struct {
ListenPort string `mapstructure:"ListenPort"` ListenPort string `mapstructure:"ListenPort"`
@ -49,8 +105,9 @@ type ServerConfig struct {
MetricsEnabled bool `mapstructure:"MetricsEnabled"` MetricsEnabled bool `mapstructure:"MetricsEnabled"`
MetricsPort string `mapstructure:"MetricsPort"` MetricsPort string `mapstructure:"MetricsPort"`
FileTTL string `mapstructure:"FileTTL"` FileTTL string `mapstructure:"FileTTL"`
MinFreeBytes int64 `mapstructure:"MinFreeBytes"` // Minimum free bytes required MinFreeBytes string `mapstructure:"MinFreeBytes"` // Changed to string
DeduplicationEnabled bool `mapstructure:"DeduplicationEnabled"` DeduplicationEnabled bool `mapstructure:"DeduplicationEnabled"`
MinFreeByte string `mapstructure:"MinFreeByte"`
} }
type TimeoutConfig struct { type TimeoutConfig struct {
@ -71,7 +128,7 @@ type VersioningConfig struct {
type UploadsConfig struct { type UploadsConfig struct {
ResumableUploadsEnabled bool `mapstructure:"ResumableUploadsEnabled"` ResumableUploadsEnabled bool `mapstructure:"ResumableUploadsEnabled"`
ChunkedUploadsEnabled bool `mapstructure:"ChunkedUploadsEnabled"` ChunkedUploadsEnabled bool `mapstructure:"ChunkedUploadsEnabled"`
ChunkSize int64 `mapstructure:"ChunkSize"` ChunkSize string `mapstructure:"ChunkSize"`
AllowedExtensions []string `mapstructure:"AllowedExtensions"` AllowedExtensions []string `mapstructure:"AllowedExtensions"`
} }
@ -79,6 +136,7 @@ type ClamAVConfig struct {
ClamAVEnabled bool `mapstructure:"ClamAVEnabled"` ClamAVEnabled bool `mapstructure:"ClamAVEnabled"`
ClamAVSocket string `mapstructure:"ClamAVSocket"` ClamAVSocket string `mapstructure:"ClamAVSocket"`
NumScanWorkers int `mapstructure:"NumScanWorkers"` NumScanWorkers int `mapstructure:"NumScanWorkers"`
ScanFileExtensions []string `mapstructure:"ScanFileExtensions"` // Add this line
} }
type RedisConfig struct { type RedisConfig struct {
@ -260,22 +318,6 @@ func main() {
initRedis() initRedis()
} }
// Redis Initialization
initRedis()
log.Info("Redis client initialized and connected successfully.")
// ClamAV Initialization
if conf.ClamAV.ClamAVEnabled {
clamClient, err = initClamAV(conf.ClamAV.ClamAVSocket)
if err != nil {
log.WithFields(logrus.Fields{
"error": err.Error(),
}).Warn("ClamAV client initialization failed. Continuing without ClamAV.")
} else {
log.Info("ClamAV client initialized successfully.")
}
}
// Initialize worker pools // Initialize worker pools
initializeUploadWorkerPool(ctx) initializeUploadWorkerPool(ctx)
if conf.ClamAV.ClamAVEnabled && clamClient != nil { if conf.ClamAV.ClamAVEnabled && clamClient != nil {
@ -291,7 +333,7 @@ func main() {
router := setupRouter() router := setupRouter()
// Start file cleaner // Start file cleaner
fileTTL, err := time.ParseDuration(conf.Server.FileTTL) fileTTL, err := parseTTL(conf.Server.FileTTL)
if err != nil { if err != nil {
log.Fatalf("Invalid FileTTL: %v", err) log.Fatalf("Invalid FileTTL: %v", err)
} }
@ -357,31 +399,6 @@ func main() {
log.Fatalf("Server failed: %v", err) log.Fatalf("Server failed: %v", err)
} }
} }
// Example files to include in the ISO container
files := []string{"file1.txt", "file2.txt"}
isoPath := "/path/to/container.iso"
// Create ISO container
err = CreateISOContainer(files, isoPath, conf.ISO.Size, conf.ISO.Charset)
if err != nil {
fmt.Printf("Failed to create ISO container: %v\n", err)
return
}
// Mount ISO container
err = MountISOContainer(isoPath, conf.ISO.MountPoint)
if err != nil {
fmt.Printf("Failed to mount ISO container: %v\n", err)
return
}
// Unmount ISO container (example)
err = UnmountISOContainer(conf.ISO.MountPoint)
if err != nil {
fmt.Printf("Failed to unmount ISO container: %v\n", err)
return
}
} }
// Function to load configuration using Viper // Function to load configuration using Viper
@ -430,6 +447,12 @@ func setDefaults() {
viper.SetDefault("server.FileTTL", "8760h") // 365d -> 8760h viper.SetDefault("server.FileTTL", "8760h") // 365d -> 8760h
viper.SetDefault("server.MinFreeBytes", 100<<20) // 100 MB viper.SetDefault("server.MinFreeBytes", 100<<20) // 100 MB
// Example usage of parseTTL to avoid unused function error
_, err := parseTTL("1D")
if err != nil {
log.Warnf("Failed to parse TTL: %v", err)
}
// Timeout defaults // Timeout defaults
viper.SetDefault("timeouts.ReadTimeout", "4800s") // supports 's' viper.SetDefault("timeouts.ReadTimeout", "4800s") // supports 's'
viper.SetDefault("timeouts.WriteTimeout", "4800s") viper.SetDefault("timeouts.WriteTimeout", "4800s")
@ -445,7 +468,7 @@ func setDefaults() {
// Uploads defaults // Uploads defaults
viper.SetDefault("uploads.ResumableUploadsEnabled", true) viper.SetDefault("uploads.ResumableUploadsEnabled", true)
viper.SetDefault("uploads.ChunkedUploadsEnabled", true) viper.SetDefault("uploads.ChunkedUploadsEnabled", true)
viper.SetDefault("uploads.ChunkSize", 8192) viper.SetDefault("uploads.ChunkSize", "8192")
viper.SetDefault("uploads.AllowedExtensions", []string{ viper.SetDefault("uploads.AllowedExtensions", []string{
".txt", ".pdf", ".txt", ".pdf",
".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".svg", ".webp", ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".svg", ".webp",
@ -640,17 +663,14 @@ func updateSystemMetrics(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Info("Stopping system metrics updater.")
return return
case <-ticker.C: case <-ticker.C:
v, _ := mem.VirtualMemory() v, _ := mem.VirtualMemory()
memoryUsage.Set(float64(v.Used)) memoryUsage.Set(float64(v.Used) / float64(v.Total) * 100)
c, _ := cpu.Percent(0, false)
cpuPercent, _ := cpu.Percent(0, false) if len(c) > 0 {
if len(cpuPercent) > 0 { cpuUsage.Set(c[0])
cpuUsage.Set(cpuPercent[0])
} }
goroutines.Set(float64(runtime.NumGoroutine())) goroutines.Set(float64(runtime.NumGoroutine()))
} }
} }
@ -735,7 +755,7 @@ func cleanupOldVersions(versionDir string) error {
return nil return nil
} }
// Process the upload task // Process the upload task with optional client acknowledgment
func processUpload(task UploadTask) error { func processUpload(task UploadTask) error {
absFilename := task.AbsFilename absFilename := task.AbsFilename
tempFilename := absFilename + ".tmp" tempFilename := absFilename + ".tmp"
@ -747,7 +767,16 @@ func processUpload(task UploadTask) error {
// Handle uploads and write to a temporary file // Handle uploads and write to a temporary file
if conf.Uploads.ChunkedUploadsEnabled { if conf.Uploads.ChunkedUploadsEnabled {
log.Debugf("Chunked uploads enabled. Handling chunked upload for %s", tempFilename) log.Debugf("Chunked uploads enabled. Handling chunked upload for %s", tempFilename)
err := handleChunkedUpload(tempFilename, r) chunkSize, err := parseSize(conf.Uploads.ChunkSize)
if err != nil {
log.WithFields(logrus.Fields{
"file": tempFilename,
"error": err,
}).Error("Error parsing chunk size")
uploadDuration.Observe(time.Since(startTime).Seconds())
return err
}
err = handleChunkedUpload(tempFilename, r, int(chunkSize))
if err != nil { if err != nil {
uploadDuration.Observe(time.Since(startTime).Seconds()) uploadDuration.Observe(time.Since(startTime).Seconds())
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
@ -769,8 +798,8 @@ func processUpload(task UploadTask) error {
} }
} }
// Perform ClamAV scan on the temporary file // Perform ClamAV scan synchronously with graceful degradation
if clamClient != nil { if clamClient != nil && shouldScanFile(absFilename) {
log.Debugf("Scanning %s with ClamAV", tempFilename) log.Debugf("Scanning %s with ClamAV", tempFilename)
err := scanFileWithClamAV(tempFilename) err := scanFileWithClamAV(tempFilename)
if err != nil { if err != nil {
@ -783,6 +812,8 @@ func processUpload(task UploadTask) error {
return err return err
} }
log.Infof("ClamAV scan passed for file: %s", tempFilename) log.Infof("ClamAV scan passed for file: %s", tempFilename)
} else {
log.Warn("ClamAV is not available or file extension is not in the scan list. Proceeding without virus scan.")
} }
// Handle file versioning if enabled // Handle file versioning if enabled
@ -816,6 +847,21 @@ func processUpload(task UploadTask) error {
} }
log.Infof("File moved to final destination: %s", absFilename) log.Infof("File moved to final destination: %s", absFilename)
// Notify client of successful upload and wait for ACK if Callback-URL is provided
callbackURL := r.Header.Get("Callback-URL")
if callbackURL != "" {
err = notifyClientAndWaitForAck(callbackURL, absFilename)
if err != nil {
log.WithFields(logrus.Fields{
"file": absFilename,
"error": err,
}).Error("Failed to receive client acknowledgment")
return err
}
} else {
log.Warn("Callback-URL header is missing. Proceeding without client acknowledgment.")
}
// Handle deduplication if enabled // Handle deduplication if enabled
if conf.Server.DeduplicationEnabled { if conf.Server.DeduplicationEnabled {
log.Debugf("Deduplication enabled. Checking duplicates for %s", absFilename) log.Debugf("Deduplication enabled. Checking duplicates for %s", absFilename)
@ -848,6 +894,39 @@ func processUpload(task UploadTask) error {
return nil return nil
} }
func createFile(tempFilename string, r *http.Request) error {
// Ensure the directory exists
absDirectory := filepath.Dir(tempFilename)
err := os.MkdirAll(absDirectory, os.ModePerm)
if err != nil {
return fmt.Errorf("failed to create directory: %v", err)
}
targetFile, err := os.OpenFile(tempFilename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("failed to open file: %v", err)
}
defer targetFile.Close()
_, err = io.Copy(targetFile, r.Body)
if err != nil {
return fmt.Errorf("failed to write to file: %v", err)
}
return nil
}
// Check if the file should be scanned based on its extension
func shouldScanFile(filename string) bool {
ext := strings.ToLower(filepath.Ext(filename))
for _, scanExt := range conf.ClamAV.ScanFileExtensions {
if strings.ToLower(scanExt) == ext {
return true
}
}
return false
}
// Improved uploadWorker function with better concurrency handling // Improved uploadWorker function with better concurrency handling
func uploadWorker(ctx context.Context, workerID int) { func uploadWorker(ctx context.Context, workerID int) {
log.Infof("Upload worker %d started.", workerID) log.Infof("Upload worker %d started.", workerID)
@ -858,19 +937,10 @@ func uploadWorker(ctx context.Context, workerID int) {
return return
case task, ok := <-uploadQueue: case task, ok := <-uploadQueue:
if !ok { if !ok {
log.Warnf("Upload queue closed. Worker %d exiting.", workerID)
return return
} }
log.Infof("Worker %d processing upload for file: %s", workerID, task.AbsFilename)
err := processUpload(task) err := processUpload(task)
if err != nil {
log.Errorf("Worker %d failed to process upload for %s: %v", workerID, task.AbsFilename, err)
uploadErrorsTotal.Inc()
} else {
log.Infof("Worker %d successfully processed upload for %s", workerID, task.AbsFilename)
}
task.Result <- err task.Result <- err
close(task.Result)
} }
} }
} }
@ -970,19 +1040,13 @@ func recoveryMiddleware(next http.Handler) http.Handler {
// corsMiddleware handles CORS by setting appropriate headers // corsMiddleware handles CORS by setting appropriate headers
func corsMiddleware(next http.Handler) http.Handler { func corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Set CORS headers
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-File-MAC") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
w.Header().Set("Access-Control-Max-Age", "86400") // Cache preflight response for 1 day if r.Method == "OPTIONS" {
// Handle preflight OPTIONS request
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
return return
} }
// Proceed to the next handler
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
}) })
} }
@ -1131,7 +1195,6 @@ func handleUpload(w http.ResponseWriter, r *http.Request, absFilename, fileStore
// Validate file extension // Validate file extension
if !isExtensionAllowed(fileStorePath) { if !isExtensionAllowed(fileStorePath) {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
// No need to sanitize and validate the file path here since absFilename is already sanitized in handleRequest
"file": fileStorePath, "file": fileStorePath,
"error": err, "error": err,
}).Warn("Invalid file path") }).Warn("Invalid file path")
@ -1139,10 +1202,13 @@ func handleUpload(w http.ResponseWriter, r *http.Request, absFilename, fileStore
uploadErrorsTotal.Inc() uploadErrorsTotal.Inc()
return return
} }
// absFilename = sanitizedFilename
// Check if there is enough free space // Check if there is enough free space
err = checkStorageSpace(conf.Server.StoragePath, conf.Server.MinFreeBytes) minFreeBytes, err := parseSize(conf.Server.MinFreeBytes)
if err != nil {
log.Fatalf("Invalid MinFreeBytes: %v", err)
}
err = checkStorageSpace(conf.Server.StoragePath, minFreeBytes)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"storage_path": conf.Server.StoragePath, "storage_path": conf.Server.StoragePath,
@ -1230,58 +1296,31 @@ func handleDownload(w http.ResponseWriter, r *http.Request, absFilename, fileSto
} }
} }
// Improved createFile function with proper resource management // Improved createFile function with proper resource management and larger buffer size
func createFile(tempFilename string, r *http.Request) error {
absDirectory := filepath.Dir(tempFilename) // notifyClientAndWaitForAck notifies the client using the callback URL and waits for acknowledgment
err := os.MkdirAll(absDirectory, os.ModePerm) func notifyClientAndWaitForAck(callbackURL string, absFilename string) error {
client := &http.Client{Timeout: 10 * time.Second}
req, err := http.NewRequest("POST", callbackURL, nil)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to create request: %v", err)
} }
q := req.URL.Query()
// Open the file for writing q.Add("filename", absFilename)
targetFile, err := os.OpenFile(tempFilename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) req.URL.RawQuery = q.Encode()
resp, err := client.Do(req)
if err != nil { if err != nil {
return fmt.Errorf("failed to create file %s: %w", tempFilename, err) return fmt.Errorf("failed to notify client: %v", err)
} }
defer targetFile.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
// Use a large buffer for efficient file writing return fmt.Errorf("client returned non-OK status: %v", resp.Status)
bufferSize := 4 * 1024 * 1024 // 4 MB buffer
writer := bufio.NewWriterSize(targetFile, bufferSize)
buffer := make([]byte, bufferSize)
totalBytes := int64(0)
for {
n, readErr := r.Body.Read(buffer)
if n > 0 {
totalBytes += int64(n)
_, writeErr := writer.Write(buffer[:n])
if writeErr != nil {
return fmt.Errorf("failed to write to file %s: %w", tempFilename, writeErr)
} }
}
if readErr != nil {
if readErr == io.EOF {
break
}
return fmt.Errorf("failed to read request body: %w", readErr)
}
}
err = writer.Flush()
if err != nil {
return fmt.Errorf("failed to flush buffer to file %s: %w", tempFilename, err)
}
log.WithFields(logrus.Fields{
"temp_file": tempFilename,
"total_bytes": totalBytes,
}).Info("File uploaded successfully")
uploadSizeBytes.Observe(float64(totalBytes))
return nil return nil
} }
// Scan the uploaded file with ClamAV (Optional)
// Scan the uploaded file with ClamAV (Optional) // Scan the uploaded file with ClamAV (Optional)
func scanFileWithClamAV(filePath string) error { func scanFileWithClamAV(filePath string) error {
log.WithField("file", filePath).Info("Scanning file with ClamAV") log.WithField("file", filePath).Info("Scanning file with ClamAV")
@ -1432,51 +1471,45 @@ func handleResumableDownload(absFilename string, w http.ResponseWriter, r *http.
} }
// Handle chunked uploads with bufio.Writer // Handle chunked uploads with bufio.Writer
func handleChunkedUpload(tempFilename string, r *http.Request) error { func handleChunkedUpload(tempFilename string, r *http.Request, chunkSize int) error {
log.WithField("file", tempFilename).Info("Handling chunked upload to temporary file") log.WithField("file", tempFilename).Info("Handling chunked upload to temporary file")
// Ensure the directory exists // Ensure the directory exists
absDirectory := filepath.Dir(tempFilename) absDirectory := filepath.Dir(tempFilename)
err := os.MkdirAll(absDirectory, os.ModePerm) err := os.MkdirAll(absDirectory, os.ModePerm)
if err != nil { if err != nil {
log.WithError(err).Errorf("Failed to create directory %s for chunked upload", absDirectory) return fmt.Errorf("failed to create directory: %v", err)
return fmt.Errorf("failed to create directory %s: %w", absDirectory, err)
} }
targetFile, err := os.OpenFile(tempFilename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) targetFile, err := os.OpenFile(tempFilename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil { if err != nil {
log.WithError(err).Error("Failed to open temporary file for chunked upload") return fmt.Errorf("failed to open file: %v", err)
return err
} }
defer targetFile.Close() defer targetFile.Close()
writer := bufio.NewWriterSize(targetFile, int(conf.Uploads.ChunkSize)) writer := bufio.NewWriterSize(targetFile, chunkSize)
buffer := make([]byte, conf.Uploads.ChunkSize) buffer := make([]byte, chunkSize)
totalBytes := int64(0) totalBytes := int64(0)
for { for {
n, err := r.Body.Read(buffer) n, err := r.Body.Read(buffer)
if n > 0 { if err != nil && err != io.EOF {
totalBytes += int64(n) return fmt.Errorf("failed to read request body: %v", err)
_, writeErr := writer.Write(buffer[:n])
if writeErr != nil {
log.WithError(writeErr).Error("Failed to write chunk to temporary file")
return writeErr
} }
if n == 0 {
break
} }
_, err = writer.Write(buffer[:n])
if err != nil { if err != nil {
if err == io.EOF { return fmt.Errorf("failed to write to file: %v", err)
break // Finished reading the body
}
log.WithError(err).Error("Error reading from request body")
return err
} }
totalBytes += int64(n)
} }
err = writer.Flush() err = writer.Flush()
if err != nil { if err != nil {
log.WithError(err).Error("Failed to flush buffer to temporary file") return fmt.Errorf("failed to flush writer: %v", err)
return err
} }
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{