Add test script for large file asynchronous post-processing

- Implemented a comprehensive test script to validate the new asynchronous handling of large file uploads (>1GB).
- The script checks for immediate HTTP responses, verifies server configurations for deduplication and virus scanning, and ensures server responsiveness during rapid uploads.
- Included checks for relevant response headers and session tracking.
- Documented the problem being solved, implementation details, and next steps for deployment and monitoring.
This commit is contained in:
2025-08-26 20:20:05 +00:00
parent 1c9700e51a
commit da403de111
33 changed files with 900 additions and 119 deletions

View File

@ -267,6 +267,13 @@ func generateSessionID(userJID, filename string) string {
return fmt.Sprintf("sess_%s", hex.EncodeToString(h.Sum(nil))[:16])
}
// Generate session ID for multi-upload scenarios
func generateUploadSessionID(uploadType, userAgent, clientIP string) string {
h := sha256.New()
h.Write([]byte(fmt.Sprintf("%s:%s:%s:%d", uploadType, userAgent, clientIP, time.Now().UnixNano())))
return fmt.Sprintf("upload_%s", hex.EncodeToString(h.Sum(nil))[:16])
}
// Detect network context for intelligent switching
func detectNetworkContext(r *http.Request) string {
clientIP := getClientIP(r)
@ -2572,6 +2579,17 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
activeConnections.Inc()
defer activeConnections.Dec()
// Enhanced session handling for multi-upload scenarios (Gajim fix)
sessionID := r.Header.Get("X-Session-ID")
if sessionID == "" {
// Generate session ID for multi-upload tracking
sessionID = generateUploadSessionID("upload", r.Header.Get("User-Agent"), getClientIP(r))
}
// Set session headers for client continuation
w.Header().Set("X-Session-ID", sessionID)
w.Header().Set("X-Upload-Session-Timeout", "3600") // 1 hour
// Only allow POST method
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
@ -2807,19 +2825,19 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
// Register upload with network resilience manager for WLAN/5G switching support
var uploadCtx *UploadContext
var sessionID string
var networkSessionID string
if networkManager != nil {
sessionID = r.Header.Get("X-Upload-Session-ID")
if sessionID == "" {
sessionID = fmt.Sprintf("upload_%s_%d", getClientIP(r), time.Now().UnixNano())
networkSessionID = r.Header.Get("X-Upload-Session-ID")
if networkSessionID == "" {
networkSessionID = fmt.Sprintf("upload_%s_%d", getClientIP(r), time.Now().UnixNano())
}
uploadCtx = networkManager.RegisterUpload(sessionID)
defer networkManager.UnregisterUpload(sessionID)
log.Infof("🌐 Registered upload with network resilience: session=%s, IP=%s", sessionID, getClientIP(r))
uploadCtx = networkManager.RegisterUpload(networkSessionID)
defer networkManager.UnregisterUpload(networkSessionID)
log.Infof("🌐 Registered upload with network resilience: session=%s, IP=%s", networkSessionID, getClientIP(r))
// Add network resilience headers
w.Header().Set("X-Network-Resilience", "enabled")
w.Header().Set("X-Upload-Context-ID", sessionID)
w.Header().Set("X-Upload-Context-ID", networkSessionID)
}
// Copy file content with network resilience support and enhanced progress tracking
@ -2833,6 +2851,97 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
return
}
// ✅ CRITICAL FIX: Send immediate success response for large files (>1GB)
// This prevents client timeouts while server does post-processing
isLargeFile := header.Size > 1024*1024*1024 // 1GB threshold
if isLargeFile {
log.Infof("🚀 Large file detected (%s), sending immediate success response", formatBytes(header.Size))
// Send immediate success response to client
duration := time.Since(startTime)
uploadDuration.Observe(duration.Seconds())
uploadsTotal.Inc()
uploadSizeBytes.Observe(float64(written))
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Upload-Success", "true")
w.Header().Set("X-Upload-Duration", duration.String())
w.Header().Set("X-Large-File-Processing", "async")
w.Header().Set("X-Post-Processing", "background")
w.WriteHeader(http.StatusOK)
response := map[string]interface{}{
"success": true,
"filename": filename,
"size": written,
"duration": duration.String(),
"client_ip": getClientIP(r),
"timestamp": time.Now().Unix(),
"post_processing": "background",
}
// Add session information if available
if clientSession != nil {
response["session_id"] = clientSession.SessionID
response["connection_type"] = clientSession.ConnectionType
response["ip_count"] = len(clientSession.ClientIPs)
}
// Add user information if available
if bearerClaims != nil {
response["user"] = bearerClaims.User
}
// Send response immediately
if jsonBytes, err := json.Marshal(response); err == nil {
w.Write(jsonBytes)
} else {
fmt.Fprintf(w, `{"success": true, "filename": "%s", "size": %d, "post_processing": "background"}`, filename, written)
}
log.Infof("✅ Immediate response sent for large file %s (%s) in %s from IP %s",
filename, formatBytes(written), duration, getClientIP(r))
// Process deduplication asynchronously for large files
go func() {
if conf.Server.DeduplicationEnabled {
log.Infof("🔄 Starting background deduplication for large file: %s", filename)
ctx := context.Background()
err := handleDeduplication(ctx, absFilename)
if err != nil {
log.Warnf("⚠️ Background deduplication failed for %s: %v", absFilename, err)
} else {
log.Infof("✅ Background deduplication completed for %s", filename)
}
}
// Add to scan queue for virus scanning if enabled
if conf.ClamAV.ClamAVEnabled && len(conf.ClamAV.ScanFileExtensions) > 0 {
ext := strings.ToLower(filepath.Ext(header.Filename))
shouldScan := false
for _, scanExt := range conf.ClamAV.ScanFileExtensions {
if ext == strings.ToLower(scanExt) {
shouldScan = true
break
}
}
if shouldScan {
log.Infof("🔍 Starting background virus scan for large file: %s", filename)
err := scanFileWithClamAV(absFilename)
if err != nil {
log.Warnf("⚠️ Background virus scan failed for %s: %v", filename, err)
} else {
log.Infof("✅ Background virus scan completed for %s", filename)
}
}
}
}()
return
}
// Standard processing for small files (synchronous)
// Handle deduplication if enabled
if conf.Server.DeduplicationEnabled {
ctx := context.Background()
@ -3204,6 +3313,84 @@ func handleV3Upload(w http.ResponseWriter, r *http.Request) {
return
}
// ✅ CRITICAL FIX: Send immediate success response for large files (>1GB)
// This prevents client timeouts while server does post-processing
isLargeFile := written > 1024*1024*1024 // 1GB threshold
if isLargeFile {
log.Infof("🚀 Large file detected (%s), sending immediate success response (v3)", formatBytes(written))
// Send immediate success response to client
duration := time.Since(startTime)
uploadDuration.Observe(duration.Seconds())
uploadsTotal.Inc()
uploadSizeBytes.Observe(float64(written))
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Upload-Success", "true")
w.Header().Set("X-Upload-Duration", duration.String())
w.Header().Set("X-Large-File-Processing", "async")
w.Header().Set("X-Post-Processing", "background")
w.WriteHeader(http.StatusOK)
response := map[string]interface{}{
"success": true,
"filename": filename,
"size": written,
"duration": duration.String(),
"protocol": "v3",
"post_processing": "background",
}
// Send response immediately
if jsonBytes, err := json.Marshal(response); err == nil {
w.Write(jsonBytes)
} else {
fmt.Fprintf(w, `{"success": true, "filename": "%s", "size": %d, "post_processing": "background"}`, filename, written)
}
log.Infof("✅ Immediate response sent for large file %s (%s) in %s via v3 protocol",
filename, formatBytes(written), duration)
// Process deduplication asynchronously for large files
go func() {
if conf.Server.DeduplicationEnabled {
log.Infof("🔄 Starting background deduplication for large file (v3): %s", filename)
ctx := context.Background()
err := handleDeduplication(ctx, absFilename)
if err != nil {
log.Warnf("⚠️ Background deduplication failed for %s: %v", absFilename, err)
} else {
log.Infof("✅ Background deduplication completed for %s (v3)", filename)
}
}
// Add to scan queue for virus scanning if enabled
if conf.ClamAV.ClamAVEnabled && len(conf.ClamAV.ScanFileExtensions) > 0 {
ext := strings.ToLower(filepath.Ext(originalFilename))
shouldScan := false
for _, scanExt := range conf.ClamAV.ScanFileExtensions {
if ext == strings.ToLower(scanExt) {
shouldScan = true
break
}
}
if shouldScan {
log.Infof("🔍 Starting background virus scan for large file (v3): %s", filename)
err := scanFileWithClamAV(absFilename)
if err != nil {
log.Warnf("⚠️ Background virus scan failed for %s: %v", filename, err)
} else {
log.Infof("✅ Background virus scan completed for %s (v3)", filename)
}
}
}
}()
return
}
// Standard processing for small files (synchronous)
// Handle deduplication if enabled
if conf.Server.DeduplicationEnabled {
ctx := context.Background()
@ -3248,6 +3435,18 @@ func handleLegacyUpload(w http.ResponseWriter, r *http.Request) {
log.Infof("🔥 DEBUG: handleLegacyUpload called - method:%s path:%s query:%s", r.Method, r.URL.Path, r.URL.RawQuery)
// Enhanced session handling for multi-upload scenarios (Gajim XMPP fix)
sessionID := r.Header.Get("X-Session-ID")
if sessionID == "" {
// Generate session ID for XMPP multi-upload tracking
sessionID = generateUploadSessionID("legacy", r.Header.Get("User-Agent"), getClientIP(r))
}
// Set session headers for XMPP client continuation
w.Header().Set("X-Session-ID", sessionID)
w.Header().Set("X-Upload-Session-Timeout", "3600") // 1 hour
w.Header().Set("X-Upload-Type", "legacy-xmpp")
log.Debugf("handleLegacyUpload: Processing request to %s with query: %s", r.URL.Path, r.URL.RawQuery)
// Only allow PUT method for legacy uploads
@ -3394,6 +3593,68 @@ func handleLegacyUpload(w http.ResponseWriter, r *http.Request) {
return
}
// ✅ CRITICAL FIX: Send immediate success response for large files (>1GB)
// This prevents client timeouts while server does post-processing
isLargeFile := written > 1024*1024*1024 // 1GB threshold
if isLargeFile {
log.Infof("🚀 Large file detected (%s), sending immediate success response (legacy)", formatBytes(written))
// Send immediate success response to client
duration := time.Since(startTime)
uploadDuration.Observe(duration.Seconds())
uploadsTotal.Inc()
uploadSizeBytes.Observe(float64(written))
// Return success response (201 Created for legacy compatibility)
w.Header().Set("X-Upload-Success", "true")
w.Header().Set("X-Upload-Duration", duration.String())
w.Header().Set("X-Large-File-Processing", "async")
w.Header().Set("X-Post-Processing", "background")
w.WriteHeader(http.StatusCreated)
log.Infof("✅ Immediate response sent for large file %s (%s) in %s via legacy protocol",
filename, formatBytes(written), duration)
// Process deduplication asynchronously for large files
go func() {
if conf.Server.DeduplicationEnabled {
log.Infof("🔄 Starting background deduplication for large file (legacy): %s", filename)
ctx := context.Background()
err := handleDeduplication(ctx, absFilename)
if err != nil {
log.Warnf("⚠️ Background deduplication failed for %s: %v", absFilename, err)
} else {
log.Infof("✅ Background deduplication completed for %s (legacy)", filename)
}
}
// Add to scan queue for virus scanning if enabled
if conf.ClamAV.ClamAVEnabled && len(conf.ClamAV.ScanFileExtensions) > 0 {
ext := strings.ToLower(filepath.Ext(fileStorePath))
shouldScan := false
for _, scanExt := range conf.ClamAV.ScanFileExtensions {
if ext == strings.ToLower(scanExt) {
shouldScan = true
break
}
}
if shouldScan {
log.Infof("🔍 Starting background virus scan for large file (legacy): %s", filename)
err := scanFileWithClamAV(absFilename)
if err != nil {
log.Warnf("⚠️ Background virus scan failed for %s: %v", filename, err)
} else {
log.Infof("✅ Background virus scan completed for %s (legacy)", filename)
}
}
}
}()
return
}
// Standard processing for small files (synchronous)
// Handle deduplication if enabled
if conf.Server.DeduplicationEnabled {
ctx := context.Background()