feat: update chunked upload endpoints and enhance upload resilience with improved logging and HMAC validation
This commit is contained in:
@ -30,6 +30,25 @@ func handleChunkedUpload(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// BASIC HMAC VALIDATION - same as original handleUpload
|
||||
if conf.Security.EnableJWT {
|
||||
_, err := validateJWTFromRequest(r, conf.Security.JWTSecret)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("JWT Authentication failed: %v", err), http.StatusUnauthorized)
|
||||
uploadErrorsTotal.Inc()
|
||||
return
|
||||
}
|
||||
log.Debugf("JWT authentication successful for chunked upload: %s", r.URL.Path)
|
||||
} else {
|
||||
err := validateHMAC(r, conf.Security.Secret)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("HMAC Authentication failed: %v", err), http.StatusUnauthorized)
|
||||
uploadErrorsTotal.Inc()
|
||||
return
|
||||
}
|
||||
log.Debugf("HMAC authentication successful for chunked upload: %s", r.URL.Path)
|
||||
}
|
||||
|
||||
// Extract headers for chunked upload
|
||||
sessionID := r.Header.Get("X-Upload-Session-ID")
|
||||
chunkNumberStr := r.Header.Get("X-Chunk-Number")
|
||||
@ -122,8 +141,10 @@ func handleChunkedUpload(w http.ResponseWriter, r *http.Request) {
|
||||
defer chunkFile.Close()
|
||||
|
||||
// Copy chunk data with progress tracking
|
||||
log.Printf("DEBUG: Processing chunk %d for session %s (content-length: %d)", chunkNumber, sessionID, r.ContentLength)
|
||||
written, err := copyChunkWithResilience(chunkFile, r.Body, r.ContentLength, sessionID, chunkNumber)
|
||||
if err != nil {
|
||||
log.Printf("ERROR: Failed to save chunk %d for session %s: %v", chunkNumber, sessionID, err)
|
||||
http.Error(w, fmt.Sprintf("Error saving chunk: %v", err), http.StatusInternalServerError)
|
||||
uploadErrorsTotal.Inc()
|
||||
os.Remove(chunkPath) // Clean up failed chunk
|
||||
@ -138,15 +159,32 @@ func handleChunkedUpload(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Get updated session for completion check
|
||||
session, _ = uploadSessionStore.GetSession(sessionID)
|
||||
progress := float64(session.UploadedBytes) / float64(session.TotalSize)
|
||||
|
||||
// Debug logging for large files
|
||||
if session.TotalSize > 50*1024*1024 { // Log for files > 50MB
|
||||
log.Debugf("Chunk %d uploaded for %s: %d/%d bytes (%.1f%%)",
|
||||
chunkNumber, session.Filename, session.UploadedBytes, session.TotalSize, progress*100)
|
||||
}
|
||||
|
||||
// Check if upload is complete
|
||||
if uploadSessionStore.IsSessionComplete(sessionID) {
|
||||
isComplete := uploadSessionStore.IsSessionComplete(sessionID)
|
||||
log.Printf("DEBUG: Session %s completion check: %v (uploaded: %d, total: %d, progress: %.1f%%)",
|
||||
sessionID, isComplete, session.UploadedBytes, session.TotalSize, progress*100)
|
||||
|
||||
if isComplete {
|
||||
log.Printf("DEBUG: Starting file assembly for session %s", sessionID)
|
||||
// Assemble final file
|
||||
finalPath, err := uploadSessionStore.AssembleFile(sessionID)
|
||||
if err != nil {
|
||||
log.Printf("ERROR: File assembly failed for session %s: %v", sessionID, err)
|
||||
http.Error(w, fmt.Sprintf("Error assembling file: %v", err), http.StatusInternalServerError)
|
||||
uploadErrorsTotal.Inc()
|
||||
return
|
||||
}
|
||||
log.Printf("DEBUG: File assembly completed for session %s: %s", sessionID, finalPath)
|
||||
|
||||
// Handle deduplication if enabled (reuse existing logic)
|
||||
if conf.Server.DeduplicationEnabled {
|
||||
|
@ -603,11 +603,6 @@ func setupRouter() *http.ServeMux {
|
||||
|
||||
log.Info("HTTP router configured successfully with full protocol support (v, v2, token, v3)")
|
||||
|
||||
// Enhance router with network resilience features (non-intrusive)
|
||||
if conf.Uploads.ChunkedUploadsEnabled {
|
||||
EnhanceExistingRouter(mux)
|
||||
}
|
||||
|
||||
return mux
|
||||
}
|
||||
|
||||
|
@ -8,6 +8,21 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Global flag to prevent duplicate route registration
|
||||
var routesEnhanced bool
|
||||
|
||||
// InitializeEnhancements initializes all new features and enhances the router
|
||||
func InitializeEnhancements(router *http.ServeMux) {
|
||||
// Initialize upload resilience system
|
||||
InitializeUploadResilience()
|
||||
|
||||
// Enhance the existing router with new endpoints (only once)
|
||||
if !routesEnhanced {
|
||||
EnhanceExistingRouter(router)
|
||||
routesEnhanced = true
|
||||
}
|
||||
}
|
||||
|
||||
// InitializeUploadResilience initializes the upload resilience system
|
||||
func InitializeUploadResilience() {
|
||||
// Initialize upload session store
|
||||
@ -22,16 +37,13 @@ func InitializeUploadResilience() {
|
||||
|
||||
// EnhanceExistingRouter adds new routes without modifying existing setupRouter function
|
||||
func EnhanceExistingRouter(mux *http.ServeMux) {
|
||||
// Add chunked upload endpoints
|
||||
mux.HandleFunc("/upload/chunked", ResilientHTTPHandler(handleChunkedUpload, networkManager))
|
||||
mux.HandleFunc("/upload/status", handleUploadStatus)
|
||||
// BASIC FUNCTION: Add chunked upload endpoints directly without wrappers
|
||||
mux.HandleFunc("/chunked-upload", handleChunkedUpload)
|
||||
mux.HandleFunc("/upload-status", handleUploadStatus)
|
||||
|
||||
// Wrap existing upload handlers with resilience (optional)
|
||||
if conf.Uploads.ChunkedUploadsEnabled {
|
||||
log.Info("Enhanced upload endpoints added:")
|
||||
log.Info(" POST/PUT /upload/chunked - Chunked/resumable uploads")
|
||||
log.Info(" GET /upload/status - Upload status check")
|
||||
}
|
||||
log.Info("Enhanced upload endpoints added:")
|
||||
log.Info(" POST/PUT /chunked-upload - Chunked/resumable uploads")
|
||||
log.Info(" GET /upload-status - Upload status check")
|
||||
}
|
||||
|
||||
// UpdateConfigurationDefaults suggests better defaults without forcing changes
|
||||
@ -116,19 +128,3 @@ func GetResilienceStatus() map[string]interface{} {
|
||||
|
||||
return status
|
||||
}
|
||||
|
||||
// Non-intrusive initialization function to be called from main()
|
||||
func InitializeEnhancements() {
|
||||
// Only initialize if chunked uploads are enabled
|
||||
if conf.Uploads.ChunkedUploadsEnabled {
|
||||
InitializeUploadResilience()
|
||||
|
||||
// Start performance monitoring
|
||||
go MonitorUploadPerformance()
|
||||
|
||||
// Log configuration recommendations
|
||||
UpdateConfigurationDefaults()
|
||||
} else {
|
||||
log.Info("Chunked uploads disabled. Enable 'chunkeduploadsenabled = true' for network resilience features")
|
||||
}
|
||||
}
|
||||
|
@ -698,6 +698,9 @@ func main() {
|
||||
|
||||
router := setupRouter() // Assuming setupRouter is defined (likely in this file or router.go
|
||||
|
||||
// Initialize enhancements and enhance the router
|
||||
InitializeEnhancements(router)
|
||||
|
||||
go handleFileCleanup(&conf) // Directly call handleFileCleanup
|
||||
|
||||
readTimeout, err := time.ParseDuration(conf.Timeouts.Read) // Corrected field name
|
||||
@ -766,9 +769,6 @@ func main() {
|
||||
}
|
||||
log.Infof("Running version: %s", versionString)
|
||||
|
||||
// Initialize network resilience features (non-intrusive)
|
||||
InitializeEnhancements()
|
||||
|
||||
log.Infof("Starting HMAC file server %s...", versionString)
|
||||
if conf.Server.UnixSocket {
|
||||
socketPath := "/tmp/hmac-file-server.sock" // Use a default socket path since ListenAddress is now a port
|
||||
|
Reference in New Issue
Block a user