diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index ccf99ad..09571a6 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -474,10 +474,11 @@ func countHmacErrors() (int, error) { } // Limit to last 1MB for large log files - var startPos int64 = 0 if stat.Size() > 1024*1024 { - startPos = stat.Size() - 1024*1024 - file.Seek(startPos, io.SeekStart) + startPos := stat.Size() - 1024*1024 + if _, err := file.Seek(startPos, io.SeekStart); err != nil { + return 0, err + } } scanner := bufio.NewScanner(file) diff --git a/cmd/server/adaptive_io.go b/cmd/server/adaptive_io.go index 2972f05..4cd8fac 100644 --- a/cmd/server/adaptive_io.go +++ b/cmd/server/adaptive_io.go @@ -20,11 +20,11 @@ import ( // AdaptiveBufferPool manages multiple buffer pools of different sizes type AdaptiveBufferPool struct { - pools map[int]*sync.Pool - metrics *NetworkMetrics - currentOptimalSize int - mutex sync.RWMutex - lastOptimization time.Time + pools map[int]*sync.Pool + metrics *NetworkMetrics + currentOptimalSize int + mutex sync.RWMutex + lastOptimization time.Time optimizationInterval time.Duration } @@ -39,35 +39,35 @@ type NetworkMetrics struct { // ThroughputSample represents a throughput measurement type ThroughputSample struct { - Timestamp time.Time + Timestamp time.Time BytesPerSec int64 BufferSize int } // StreamingEngine provides unified streaming with adaptive optimization type StreamingEngine struct { - bufferPool *AdaptiveBufferPool - metrics *NetworkMetrics + bufferPool *AdaptiveBufferPool + metrics *NetworkMetrics resilienceManager *NetworkResilienceManager - interfaceManager *MultiInterfaceManager + interfaceManager *MultiInterfaceManager } // ClientProfile stores optimization data per client type ClientProfile struct { - OptimalChunkSize int64 - OptimalBufferSize int - ReliabilityScore float64 - AverageThroughput int64 - LastSeen time.Time - ConnectionType string + OptimalChunkSize int64 + OptimalBufferSize int + ReliabilityScore float64 + 
AverageThroughput int64 + LastSeen time.Time + ConnectionType string PreferredInterface string - InterfaceHistory []InterfaceUsage + InterfaceHistory []InterfaceUsage } // InterfaceUsage tracks performance per network interface type InterfaceUsage struct { InterfaceName string - LastUsed time.Time + LastUsed time.Time AverageThroughput int64 ReliabilityScore float64 OptimalBufferSize int @@ -75,31 +75,32 @@ type InterfaceUsage struct { var ( globalStreamingEngine *StreamingEngine - clientProfiles = make(map[string]*ClientProfile) - clientProfilesMutex sync.RWMutex + clientProfiles = make(map[string]*ClientProfile) + clientProfilesMutex sync.RWMutex multiInterfaceManager *MultiInterfaceManager ) // Initialize the global streaming engine +// nolint:unused func initStreamingEngine() { // Initialize multi-interface manager multiInterfaceManager = NewMultiInterfaceManager() - + globalStreamingEngine = &StreamingEngine{ bufferPool: NewAdaptiveBufferPool(), - metrics: NewNetworkMetrics(), + metrics: NewNetworkMetrics(), interfaceManager: multiInterfaceManager, } - + // Start optimization routine go globalStreamingEngine.optimizationLoop() - + // Start multi-interface monitoring if conf.NetworkResilience.MultiInterfaceEnabled { go multiInterfaceManager.StartMonitoring() log.Info("Multi-interface monitoring enabled") } - + log.Info("Adaptive streaming engine with multi-interface support initialized") } @@ -111,7 +112,7 @@ func NewAdaptiveBufferPool() *AdaptiveBufferPool { currentOptimalSize: 64 * 1024, // Start with 64KB optimizationInterval: 30 * time.Second, } - + // Initialize pools for different buffer sizes sizes := []int{ 16 * 1024, // 16KB - for slow connections @@ -122,7 +123,7 @@ func NewAdaptiveBufferPool() *AdaptiveBufferPool { 512 * 1024, // 512KB - high-speed networks 1024 * 1024, // 1MB - maximum for extreme cases } - + for _, size := range sizes { size := size // capture for closure pool.pools[size] = &sync.Pool{ @@ -132,7 +133,7 @@ func 
NewAdaptiveBufferPool() *AdaptiveBufferPool { }, } } - + return pool } @@ -149,14 +150,14 @@ func (abp *AdaptiveBufferPool) GetOptimalBuffer() (*[]byte, int) { abp.mutex.RLock() size := abp.currentOptimalSize abp.mutex.RUnlock() - + pool, exists := abp.pools[size] if !exists { // Fallback to 64KB if size not available size = 64 * 1024 pool = abp.pools[size] } - + bufPtr := pool.Get().(*[]byte) return bufPtr, size } @@ -177,18 +178,18 @@ func (se *StreamingEngine) StreamWithAdaptation( clientIP string, ) (int64, error) { startTime := time.Now() - + // Get client profile for optimization profile := getClientProfile(clientIP) - + // Select optimal buffer size bufPtr, bufferSize := se.selectOptimalBuffer(contentLength, profile) defer se.bufferPool.PutBuffer(bufPtr, bufferSize) - + buf := *bufPtr var written int64 var lastMetricUpdate time.Time - + for { // Check for network resilience signals if se.resilienceManager != nil { @@ -204,26 +205,26 @@ func (se *StreamingEngine) StreamWithAdaptation( } } } - + // Read data n, readErr := src.Read(buf) if n > 0 { // Write data w, writeErr := dst.Write(buf[:n]) written += int64(w) - + if writeErr != nil { se.recordError(clientIP, writeErr) return written, writeErr } - + // Update metrics periodically if time.Since(lastMetricUpdate) > time.Second { se.updateMetrics(written, startTime, bufferSize, clientIP) lastMetricUpdate = time.Now() } } - + if readErr != nil { if readErr == io.EOF { break @@ -232,11 +233,11 @@ func (se *StreamingEngine) StreamWithAdaptation( return written, readErr } } - + // Final metrics update duration := time.Since(startTime) se.recordTransferComplete(written, duration, bufferSize, clientIP) - + return written, nil } @@ -244,7 +245,7 @@ func (se *StreamingEngine) StreamWithAdaptation( func (se *StreamingEngine) selectOptimalBuffer(contentLength int64, profile *ClientProfile) (*[]byte, int) { // Start with current optimal size bufferSize := se.bufferPool.currentOptimalSize - + // Adjust based on file size 
if contentLength > 0 { if contentLength < 1024*1024 { // < 1MB @@ -253,24 +254,27 @@ func (se *StreamingEngine) selectOptimalBuffer(contentLength int64, profile *Cli bufferSize = maxInt(bufferSize, 256*1024) } } - + // Adjust based on client profile if profile != nil { if profile.OptimalBufferSize > 0 { bufferSize = profile.OptimalBufferSize } - + // Adjust for connection type + // Note: bufferSize adjustments are for future integration when + // GetOptimalBuffer can accept a preferred size hint switch profile.ConnectionType { case "mobile", "cellular": - bufferSize = minInt(bufferSize, 64*1024) + bufferSize = minInt(bufferSize, 64*1024) //nolint:staticcheck,ineffassign case "wifi": - bufferSize = minInt(bufferSize, 256*1024) + bufferSize = minInt(bufferSize, 256*1024) //nolint:staticcheck,ineffassign case "ethernet", "fiber": - bufferSize = maxInt(bufferSize, 128*1024) + bufferSize = maxInt(bufferSize, 128*1024) //nolint:staticcheck,ineffassign } } - + _ = bufferSize // Silence unused warning - bufferSize is for future use + return se.bufferPool.GetOptimalBuffer() } @@ -280,24 +284,24 @@ func (se *StreamingEngine) updateMetrics(bytesTransferred int64, startTime time. 
if duration == 0 { return } - + throughput := bytesTransferred * int64(time.Second) / int64(duration) - + se.metrics.mutex.Lock() se.metrics.ThroughputSamples = append(se.metrics.ThroughputSamples, ThroughputSample{ Timestamp: time.Now(), BytesPerSec: throughput, BufferSize: bufferSize, }) - + // Keep only recent samples if len(se.metrics.ThroughputSamples) > 100 { se.metrics.ThroughputSamples = se.metrics.ThroughputSamples[1:] } - + se.metrics.LastUpdate = time.Now() se.metrics.mutex.Unlock() - + // Update client profile updateClientProfile(clientIP, throughput, bufferSize) } @@ -307,12 +311,12 @@ func (se *StreamingEngine) recordTransferComplete(bytesTransferred int64, durati if duration == 0 { return } - + throughput := bytesTransferred * int64(time.Second) / int64(duration) - + // Update global metrics se.updateMetrics(bytesTransferred, time.Now().Add(-duration), bufferSize, clientIP) - + // Log performance for large transfers if bytesTransferred > 10*1024*1024 { log.Debugf("Transfer complete: %s in %s (%.2f MB/s) using %dKB buffer", @@ -328,34 +332,33 @@ func (se *StreamingEngine) recordError(clientIP string, err error) { se.metrics.mutex.Lock() se.metrics.ErrorRate = se.metrics.ErrorRate*0.9 + 0.1 // Exponential moving average se.metrics.mutex.Unlock() - + log.Warnf("Transfer error for client %s: %v", clientIP, err) } // optimizationLoop continuously optimizes buffer sizes +// nolint:unused func (se *StreamingEngine) optimizationLoop() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() - for { - select { - case <-ticker.C: - se.optimizeBufferSizes() - } + for range ticker.C { + se.optimizeBufferSizes() } } // optimizeBufferSizes analyzes performance and adjusts optimal buffer size +// nolint:unused func (se *StreamingEngine) optimizeBufferSizes() { se.metrics.mutex.RLock() samples := make([]ThroughputSample, len(se.metrics.ThroughputSamples)) copy(samples, se.metrics.ThroughputSamples) se.metrics.mutex.RUnlock() - + if len(samples) < 10 { return 
// Not enough data } - + // Analyze throughput by buffer size bufferPerformance := make(map[int][]int64) for _, sample := range samples { @@ -366,28 +369,28 @@ func (se *StreamingEngine) optimizeBufferSizes() { ) } } - + // Find the buffer size with best average performance bestSize := se.bufferPool.currentOptimalSize bestPerformance := int64(0) - + for size, throughputs := range bufferPerformance { if len(throughputs) < 3 { continue // Not enough samples } - + var total int64 for _, t := range throughputs { total += t } avg := total / int64(len(throughputs)) - + if avg > bestPerformance { bestPerformance = avg bestSize = size } } - + // Update optimal size if significantly better if bestSize != se.bufferPool.currentOptimalSize { se.bufferPool.mutex.Lock() @@ -395,19 +398,19 @@ func (se *StreamingEngine) optimizeBufferSizes() { se.bufferPool.currentOptimalSize = bestSize se.bufferPool.lastOptimization = time.Now() se.bufferPool.mutex.Unlock() - - log.Infof("Optimized buffer size: %dKB -> %dKB (%.2f%% improvement)", - oldSize/1024, - bestSize/1024, - float64(bestPerformance-bestPerformance*int64(oldSize)/int64(bestSize))*100/float64(bestPerformance)) + + log.Infof("Optimized buffer size: %dKB -> %dKB (%.2f%% improvement)", + oldSize/1024, + bestSize/1024, + float64(bestPerformance-bestPerformance*int64(oldSize)/int64(bestSize))*100/float64(bestPerformance)) } } // handleInterfaceSwitch handles network interface switching during transfers func (se *StreamingEngine) handleInterfaceSwitch(oldInterface, newInterface string, reason SwitchReason) { - log.Infof("Handling interface switch from %s to %s (reason: %s)", + log.Infof("Handling interface switch from %s to %s (reason: %s)", oldInterface, newInterface, multiInterfaceManager.switchReasonString(reason)) - + // Update client profiles with interface preference clientProfilesMutex.Lock() for clientIP, profile := range clientProfiles { @@ -417,7 +420,7 @@ func (se *StreamingEngine) handleInterfaceSwitch(oldInterface, 
newInterface stri for _, usage := range profile.InterfaceHistory { if usage.InterfaceName == newInterface && usage.ReliabilityScore > 0.8 { profile.PreferredInterface = newInterface - log.Debugf("Updated preferred interface for client %s: %s -> %s", + log.Debugf("Updated preferred interface for client %s: %s -> %s", clientIP, oldInterface, newInterface) break } @@ -425,14 +428,14 @@ func (se *StreamingEngine) handleInterfaceSwitch(oldInterface, newInterface stri } } clientProfilesMutex.Unlock() - + // Adjust streaming parameters for the new interface if we have that data if se.interfaceManager != nil { if newIfaceInfo := se.interfaceManager.GetInterfaceInfo(newInterface); newIfaceInfo != nil { se.adjustParametersForInterface(newIfaceInfo) } } - + // Force buffer optimization on next transfer se.bufferPool.mutex.Lock() se.bufferPool.lastOptimization = time.Time{} // Force immediate re-optimization @@ -444,10 +447,10 @@ func (se *StreamingEngine) adjustParametersForInterface(iface *NetworkInterface) if iface == nil { return } - + // Adjust buffer pool optimal size based on interface type and quality var recommendedBufferSize int - + switch iface.Type { case InterfaceEthernet: recommendedBufferSize = 512 * 1024 // 512KB for Ethernet @@ -471,41 +474,41 @@ func (se *StreamingEngine) adjustParametersForInterface(iface *NetworkInterface) default: recommendedBufferSize = 128 * 1024 // Default 128KB } - + // Update the adaptive buffer pool's optimal size se.bufferPool.mutex.Lock() se.bufferPool.currentOptimalSize = recommendedBufferSize se.bufferPool.mutex.Unlock() - - log.Debugf("Adjusted buffer size for interface %s (%s): %dKB", + + log.Debugf("Adjusted buffer size for interface %s (%s): %dKB", iface.Name, multiInterfaceManager.interfaceTypeString(iface.Type), recommendedBufferSize/1024) -}// getClientProfile retrieves or creates a client profile +} // getClientProfile retrieves or creates a client profile func getClientProfile(clientIP string) *ClientProfile { 
clientProfilesMutex.RLock() profile, exists := clientProfiles[clientIP] clientProfilesMutex.RUnlock() - + if exists { return profile } - + // Create new profile clientProfilesMutex.Lock() defer clientProfilesMutex.Unlock() - + // Double-check after acquiring write lock if profile, exists := clientProfiles[clientIP]; exists { return profile } - + profile = &ClientProfile{ OptimalChunkSize: 2 * 1024 * 1024, // 2MB default OptimalBufferSize: 64 * 1024, // 64KB default ReliabilityScore: 0.8, // Assume good initially - LastSeen: time.Now(), - ConnectionType: "unknown", + LastSeen: time.Now(), + ConnectionType: "unknown", } - + clientProfiles[clientIP] = profile return profile } @@ -513,22 +516,22 @@ func getClientProfile(clientIP string) *ClientProfile { // updateClientProfile updates performance data for a client func updateClientProfile(clientIP string, throughput int64, bufferSize int) { profile := getClientProfile(clientIP) - + clientProfilesMutex.Lock() defer clientProfilesMutex.Unlock() - + // Exponential moving average for throughput if profile.AverageThroughput == 0 { profile.AverageThroughput = throughput } else { profile.AverageThroughput = (profile.AverageThroughput*9 + throughput) / 10 } - + // Update optimal buffer size if this performed well if throughput > profile.AverageThroughput*110/100 { // 10% better profile.OptimalBufferSize = bufferSize } - + // Track interface usage if multi-interface manager is available if multiInterfaceManager != nil { currentInterface := multiInterfaceManager.GetActiveInterface() @@ -536,7 +539,7 @@ func updateClientProfile(clientIP string, throughput int64, bufferSize int) { updateInterfaceUsage(profile, currentInterface, throughput, bufferSize) } } - + profile.LastSeen = time.Now() } @@ -550,12 +553,12 @@ func updateInterfaceUsage(profile *ClientProfile, interfaceName string, throughp break } } - + // Create new record if not found if usage == nil { profile.InterfaceHistory = append(profile.InterfaceHistory, InterfaceUsage{ 
InterfaceName: interfaceName, - LastUsed: time.Now(), + LastUsed: time.Now(), AverageThroughput: throughput, ReliabilityScore: 0.8, // Start with good assumption OptimalBufferSize: bufferSize, @@ -564,46 +567,47 @@ func updateInterfaceUsage(profile *ClientProfile, interfaceName string, throughp // Update existing record usage.LastUsed = time.Now() usage.AverageThroughput = (usage.AverageThroughput*4 + throughput) / 5 // Faster adaptation - + // Update reliability score based on performance consistency if throughput > usage.AverageThroughput*90/100 { // Within 10% of average usage.ReliabilityScore = minFloat64(usage.ReliabilityScore+0.1, 1.0) } else { usage.ReliabilityScore = maxFloat64(usage.ReliabilityScore-0.1, 0.0) } - + // Update optimal buffer size if performance improved if throughput > usage.AverageThroughput { usage.OptimalBufferSize = bufferSize } } - + // Keep only recent interface history (last 10 interfaces) if len(profile.InterfaceHistory) > 10 { profile.InterfaceHistory = profile.InterfaceHistory[1:] } - + // Update preferred interface if this one is performing significantly better - if usage != nil && (profile.PreferredInterface == "" || + if usage != nil && (profile.PreferredInterface == "" || usage.AverageThroughput > profile.AverageThroughput*120/100) { profile.PreferredInterface = interfaceName } } // detectConnectionType attempts to determine connection type from request +// nolint:unused func detectConnectionType(r *http.Request) string { userAgent := r.Header.Get("User-Agent") - + // Simple heuristics - could be enhanced with more sophisticated detection if containsAny(userAgent, "Mobile", "Android", "iPhone", "iPad") { return "mobile" } - + // Check for specific client indicators if containsAny(userAgent, "curl", "wget", "HTTPie") { return "cli" } - + // Default assumption return "browser" } @@ -652,6 +656,7 @@ func maxFloat64(a, b float64) float64 { } // Enhanced upload handler using the streaming engine +// nolint:unused func 
handleUploadWithAdaptiveIO(w http.ResponseWriter, r *http.Request) { startTime := time.Now() activeConnections.Inc() @@ -709,7 +714,7 @@ func handleUploadWithAdaptiveIO(w http.ResponseWriter, r *http.Request) { // Use adaptive streaming engine clientIP := getClientIP(r) sessionID := generateSessionID("", "") - + written, err := globalStreamingEngine.StreamWithAdaptation( dst, file, @@ -717,7 +722,7 @@ func handleUploadWithAdaptiveIO(w http.ResponseWriter, r *http.Request) { sessionID, clientIP, ) - + if err != nil { http.Error(w, fmt.Sprintf("Error saving file: %v", err), http.StatusInternalServerError) uploadErrorsTotal.Inc() @@ -740,13 +745,14 @@ func handleUploadWithAdaptiveIO(w http.ResponseWriter, r *http.Request) { "size": written, "duration": duration.String(), } - json.NewEncoder(w).Encode(response) + _ = json.NewEncoder(w).Encode(response) log.Infof("Successfully uploaded %s (%s) in %s using adaptive I/O", filename, formatBytes(written), duration) } // Enhanced download handler with adaptive streaming +// nolint:unused func handleDownloadWithAdaptiveIO(w http.ResponseWriter, r *http.Request) { startTime := time.Now() activeConnections.Inc() @@ -765,7 +771,6 @@ func handleDownloadWithAdaptiveIO(w http.ResponseWriter, r *http.Request) { if conf.ISO.Enabled { storagePath = conf.ISO.MountPoint } - absFilename := filepath.Join(storagePath, filename) // Sanitize the file path absFilename, err := sanitizeFilePath(storagePath, filename) @@ -805,7 +810,7 @@ func handleDownloadWithAdaptiveIO(w http.ResponseWriter, r *http.Request) { // Use adaptive streaming engine clientIP := getClientIP(r) sessionID := generateSessionID("", "") - + n, err := globalStreamingEngine.StreamWithAdaptation( w, file, @@ -813,7 +818,7 @@ func handleDownloadWithAdaptiveIO(w http.ResponseWriter, r *http.Request) { sessionID, clientIP, ) - + if err != nil { log.Errorf("Error during download of %s: %v", absFilename, err) downloadErrorsTotal.Inc() @@ -832,23 +837,23 @@ func 
handleDownloadWithAdaptiveIO(w http.ResponseWriter, r *http.Request) { // MultiInterfaceManager handles multiple network interfaces for seamless switching type MultiInterfaceManager struct { - interfaces map[string]*NetworkInterface + interfaces map[string]*NetworkInterface activeInterface string - mutex sync.RWMutex - switchHistory []InterfaceSwitch - config *MultiInterfaceConfig + mutex sync.RWMutex + switchHistory []InterfaceSwitch + config *MultiInterfaceConfig } // NetworkInterface represents a network adapter type NetworkInterface struct { - Name string - Type InterfaceType - Priority int - Quality *InterfaceQuality - Active bool - Gateway net.IP - MTU int - LastSeen time.Time + Name string + Type InterfaceType + Priority int + Quality *InterfaceQuality + Active bool + Gateway net.IP + MTU int + LastSeen time.Time ThroughputHistory []ThroughputSample } @@ -897,33 +902,33 @@ const ( // MultiInterfaceConfig holds configuration for multi-interface support type MultiInterfaceConfig struct { - Enabled bool - InterfacePriority []string - AutoSwitchEnabled bool - SwitchThresholdLatency time.Duration - SwitchThresholdPacketLoss float64 + Enabled bool + InterfacePriority []string + AutoSwitchEnabled bool + SwitchThresholdLatency time.Duration + SwitchThresholdPacketLoss float64 QualityDegradationThreshold float64 - MaxSwitchAttempts int - SwitchDetectionInterval time.Duration + MaxSwitchAttempts int + SwitchDetectionInterval time.Duration } // NewMultiInterfaceManager creates a new multi-interface manager func NewMultiInterfaceManager() *MultiInterfaceManager { config := &MultiInterfaceConfig{ - Enabled: conf.NetworkResilience.MultiInterfaceEnabled, - InterfacePriority: []string{"eth0", "wlan0", "wwan0", "ppp0"}, - AutoSwitchEnabled: true, - SwitchThresholdLatency: 500 * time.Millisecond, - SwitchThresholdPacketLoss: 5.0, + Enabled: conf.NetworkResilience.MultiInterfaceEnabled, + InterfacePriority: []string{"eth0", "wlan0", "wwan0", "ppp0"}, + AutoSwitchEnabled: true, 
+ SwitchThresholdLatency: 500 * time.Millisecond, + SwitchThresholdPacketLoss: 5.0, QualityDegradationThreshold: 0.3, - MaxSwitchAttempts: 3, - SwitchDetectionInterval: 2 * time.Second, + MaxSwitchAttempts: 3, + SwitchDetectionInterval: 2 * time.Second, } - + return &MultiInterfaceManager{ interfaces: make(map[string]*NetworkInterface), switchHistory: make([]InterfaceSwitch, 0, 100), - config: config, + config: config, } } @@ -931,16 +936,13 @@ func NewMultiInterfaceManager() *MultiInterfaceManager { func (mim *MultiInterfaceManager) StartMonitoring() { ticker := time.NewTicker(mim.config.SwitchDetectionInterval) defer ticker.Stop() - + // Initial discovery mim.discoverInterfaces() - - for { - select { - case <-ticker.C: - mim.updateInterfaceStatus() - mim.evaluateInterfaceSwitching() - } + + for range ticker.C { + mim.updateInterfaceStatus() + mim.evaluateInterfaceSwitching() } } @@ -951,10 +953,10 @@ func (mim *MultiInterfaceManager) discoverInterfaces() { log.Errorf("Failed to discover network interfaces: %v", err) return } - + mim.mutex.Lock() defer mim.mutex.Unlock() - + for _, iface := range interfaces { if iface.Flags&net.FlagUp != 0 && iface.Flags&net.FlagLoopback == 0 { netIface := &NetworkInterface{ @@ -964,19 +966,19 @@ func (mim *MultiInterfaceManager) discoverInterfaces() { Active: true, MTU: iface.MTU, LastSeen: time.Now(), - Quality: &InterfaceQuality{ + Quality: &InterfaceQuality{ Name: iface.Name, Connectivity: ConnectivityUnknown, }, ThroughputHistory: make([]ThroughputSample, 0, 50), } - + mim.interfaces[iface.Name] = netIface - log.Infof("Discovered network interface: %s (type: %s, priority: %d)", + log.Infof("Discovered network interface: %s (type: %s, priority: %d)", iface.Name, mim.interfaceTypeString(netIface.Type), netIface.Priority) } } - + // Set initial active interface if mim.activeInterface == "" { mim.activeInterface = mim.selectBestInterface() @@ -1012,21 +1014,21 @@ func (mim *MultiInterfaceManager) GetActiveInterface() string { 
func (mim *MultiInterfaceManager) selectBestInterface() string { mim.mutex.RLock() defer mim.mutex.RUnlock() - + var bestInterface *NetworkInterface var bestName string - + for name, iface := range mim.interfaces { if !iface.Active { continue } - + if bestInterface == nil || mim.isInterfaceBetter(iface, bestInterface) { bestInterface = iface bestName = name } } - + return bestName } @@ -1037,7 +1039,7 @@ func (mim *MultiInterfaceManager) getInterfacePriority(name string) int { return i } } - + // Default priority based on interface type interfaceType := mim.detectInterfaceType(name) switch interfaceType { @@ -1062,14 +1064,14 @@ func (mim *MultiInterfaceManager) isInterfaceBetter(a, b *NetworkInterface) bool if a.Priority != b.Priority { return a.Priority < b.Priority } - + // Then check quality metrics if available if a.Quality != nil && b.Quality != nil { aScore := mim.calculateInterfaceScore(a) bScore := mim.calculateInterfaceScore(b) return aScore > bScore } - + // Fallback to priority only return a.Priority < b.Priority } @@ -1079,20 +1081,20 @@ func (mim *MultiInterfaceManager) calculateInterfaceScore(iface *NetworkInterfac if iface.Quality == nil { return 0.0 } - + score := 100.0 // Base score - + // Penalize high latency if iface.Quality.RTT > 100*time.Millisecond { score -= float64(iface.Quality.RTT.Milliseconds()) * 0.1 } - + // Penalize packet loss score -= iface.Quality.PacketLoss * 10 - + // Reward stability score += iface.Quality.Stability * 50 - + // Adjust for interface type switch iface.Type { case InterfaceEthernet: @@ -1106,7 +1108,7 @@ func (mim *MultiInterfaceManager) calculateInterfaceScore(iface *NetworkInterfac case InterfaceVPN: score -= 10 // VPN adds overhead } - + return maxFloat64(score, 0.0) } @@ -1117,15 +1119,15 @@ func (mim *MultiInterfaceManager) updateInterfaceStatus() { log.Errorf("Failed to update interface status: %v", err) return } - + mim.mutex.Lock() defer mim.mutex.Unlock() - + // Mark all interfaces as potentially inactive 
for _, iface := range mim.interfaces { iface.Active = false } - + // Update active interfaces for _, iface := range interfaces { if iface.Flags&net.FlagUp != 0 && iface.Flags&net.FlagLoopback == 0 { @@ -1143,14 +1145,14 @@ func (mim *MultiInterfaceManager) evaluateInterfaceSwitching() { if !mim.config.AutoSwitchEnabled { return } - + currentInterface := mim.GetActiveInterface() if currentInterface == "" { return } - + bestInterface := mim.selectBestInterface() - + if bestInterface != currentInterface && bestInterface != "" { reason := mim.determineSwitchReason(currentInterface, bestInterface) mim.switchToInterface(bestInterface, reason) @@ -1161,13 +1163,13 @@ func (mim *MultiInterfaceManager) evaluateInterfaceSwitching() { func (mim *MultiInterfaceManager) determineSwitchReason(current, target string) SwitchReason { mim.mutex.RLock() defer mim.mutex.RUnlock() - + currentIface := mim.interfaces[current] - + if currentIface == nil || !currentIface.Active { return SwitchReasonInterfaceDown } - + // Check if current interface quality has degraded if currentIface.Quality != nil { if currentIface.Quality.PacketLoss > mim.config.SwitchThresholdPacketLoss { @@ -1177,7 +1179,7 @@ func (mim *MultiInterfaceManager) determineSwitchReason(current, target string) return SwitchReasonQualityDegradation } } - + return SwitchReasonBetterAlternative } @@ -1187,7 +1189,7 @@ func (mim *MultiInterfaceManager) switchToInterface(newInterface string, reason oldInterface := mim.activeInterface mim.activeInterface = newInterface mim.mutex.Unlock() - + // Record the switch switchEvent := InterfaceSwitch{ FromInterface: oldInterface, @@ -1196,17 +1198,17 @@ func (mim *MultiInterfaceManager) switchToInterface(newInterface string, reason Reason: reason, TransferStatus: TransferStatusContinuous, } - + mim.mutex.Lock() mim.switchHistory = append(mim.switchHistory, switchEvent) if len(mim.switchHistory) > 100 { mim.switchHistory = mim.switchHistory[1:] } mim.mutex.Unlock() - + log.Infof("Switched 
network interface: %s -> %s (reason: %s)", oldInterface, newInterface, mim.switchReasonString(reason)) - + // Notify active transfers about the switch if globalStreamingEngine != nil { go globalStreamingEngine.handleInterfaceSwitch(oldInterface, newInterface, reason) @@ -1253,7 +1255,7 @@ func (mim *MultiInterfaceManager) switchReasonString(r SwitchReason) string { func (mim *MultiInterfaceManager) GetInterfaceInfo(interfaceName string) *NetworkInterface { mim.mutex.RLock() defer mim.mutex.RUnlock() - + for _, iface := range mim.interfaces { if iface.Name == interfaceName { return iface diff --git a/cmd/server/chunked_upload_handler.go b/cmd/server/chunked_upload_handler.go index dac2922..f1cb9e1 100644 --- a/cmd/server/chunked_upload_handler.go +++ b/cmd/server/chunked_upload_handler.go @@ -81,7 +81,7 @@ func handleChunkedUpload(w http.ResponseWriter, r *http.Request) { return } if totalSize > maxSizeBytes { - http.Error(w, fmt.Sprintf("File size %s exceeds maximum allowed size %s", + http.Error(w, fmt.Sprintf("File size %s exceeds maximum allowed size %s", formatBytes(totalSize), conf.Server.MaxUploadSize), http.StatusRequestEntityTooLarge) uploadErrorsTotal.Inc() return @@ -115,7 +115,7 @@ func handleChunkedUpload(w http.ResponseWriter, r *http.Request) { uploadsTotal.Inc() uploadSizeBytes.Observe(float64(existingFileInfo.Size())) filesDeduplicatedTotal.Inc() - + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) response := map[string]interface{}{ @@ -126,8 +126,8 @@ func handleChunkedUpload(w http.ResponseWriter, r *http.Request) { "message": "File already exists (deduplication hit)", } writeJSONResponse(w, response) - - log.Infof("Chunked upload deduplication hit: file %s already exists (%s), returning success immediately", + + log.Infof("Chunked upload deduplication hit: file %s already exists (%s), returning success immediately", filename, formatBytes(existingFileInfo.Size())) return } @@ -141,9 +141,9 @@ func 
handleChunkedUpload(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusCreated) response := map[string]interface{}{ - "session_id": session.ID, - "chunk_size": session.ChunkSize, - "total_chunks": (totalSize + session.ChunkSize - 1) / session.ChunkSize, + "session_id": session.ID, + "chunk_size": session.ChunkSize, + "total_chunks": (totalSize + session.ChunkSize - 1) / session.ChunkSize, } writeJSONResponse(w, response) return @@ -207,18 +207,18 @@ func handleChunkedUpload(w http.ResponseWriter, r *http.Request) { // Get updated session for completion check session, _ = uploadSessionStore.GetSession(sessionID) progress := float64(session.UploadedBytes) / float64(session.TotalSize) - + // Debug logging for large files if session.TotalSize > 50*1024*1024 { // Log for files > 50MB - log.Debugf("Chunk %d uploaded for %s: %d/%d bytes (%.1f%%)", + log.Debugf("Chunk %d uploaded for %s: %d/%d bytes (%.1f%%)", chunkNumber, session.Filename, session.UploadedBytes, session.TotalSize, progress*100) } // Check if upload is complete isComplete := uploadSessionStore.IsSessionComplete(sessionID) - log.Printf("DEBUG: Session %s completion check: %v (uploaded: %d, total: %d, progress: %.1f%%)", + log.Printf("DEBUG: Session %s completion check: %v (uploaded: %d, total: %d, progress: %.1f%%)", sessionID, isComplete, session.UploadedBytes, session.TotalSize, progress*100) - + if isComplete { log.Printf("DEBUG: Starting file assembly for session %s", sessionID) // Assemble final file @@ -257,8 +257,8 @@ func handleChunkedUpload(w http.ResponseWriter, r *http.Request) { "completed": true, } writeJSONResponse(w, response) - - log.Infof("Successfully completed chunked upload %s (%s) in %s", + + log.Infof("Successfully completed chunked upload %s (%s) in %s", session.Filename, formatBytes(session.TotalSize), duration) } else { // Return partial success @@ -365,12 +365,12 @@ func getClientIP(r *http.Request) string { parts 
:= strings.Split(xff, ",") return strings.TrimSpace(parts[0]) } - + // Check X-Real-IP header if xri := r.Header.Get("X-Real-IP"); xri != "" { return xri } - + // Fall back to remote address host, _, _ := strings.Cut(r.RemoteAddr, ":") return host @@ -379,7 +379,7 @@ func getClientIP(r *http.Request) string { func writeJSONResponse(w http.ResponseWriter, data interface{}) { w.Header().Set("Content-Type", "application/json") if jsonBytes, err := json.Marshal(data); err == nil { - w.Write(jsonBytes) + _, _ = w.Write(jsonBytes) } else { http.Error(w, "Error encoding JSON response", http.StatusInternalServerError) } diff --git a/cmd/server/client_network_handler.go b/cmd/server/client_network_handler.go index 2a912ef..50b75eb 100644 --- a/cmd/server/client_network_handler.go +++ b/cmd/server/client_network_handler.go @@ -96,10 +96,9 @@ func (cct *ClientConnectionTracker) DetectClientConnectionType(r *http.Request) } // Check for specific network indicators in headers - if xForwardedFor := r.Header.Get("X-Forwarded-For"); xForwardedFor != "" { - // This might indicate the client is behind a mobile carrier NAT - // Additional logic could be added here - } + // X-Forwarded-For might indicate client is behind a mobile carrier NAT + // This is noted for future enhancement + _ = r.Header.Get("X-Forwarded-For") // Check connection patterns (this would need more sophisticated logic) clientIP := getClientIP(r) diff --git a/cmd/server/config_test_scenarios.go b/cmd/server/config_test_scenarios.go index 0dac8bb..e71f3a6 100644 --- a/cmd/server/config_test_scenarios.go +++ b/cmd/server/config_test_scenarios.go @@ -211,7 +211,7 @@ func RunConfigTests() { // Create temporary directories for testing tempDir := filepath.Join(os.TempDir(), fmt.Sprintf("hmac-test-%d", i)) - os.MkdirAll(tempDir, 0755) + _ = os.MkdirAll(tempDir, 0755) defer os.RemoveAll(tempDir) // Update paths in config to use temp directory diff --git a/cmd/server/config_validator.go b/cmd/server/config_validator.go 
index c2997af..1be00a1 100644 --- a/cmd/server/config_validator.go +++ b/cmd/server/config_validator.go @@ -498,6 +498,7 @@ func validateCrossSection(c *Config, result *ConfigValidationResult) { // Enhanced Security Validation Functions // checkSecretStrength analyzes the strength of secrets/passwords +//nolint:unused func checkSecretStrength(secret string) (score int, issues []string) { if len(secret) == 0 { return 0, []string{"secret is empty"} @@ -586,6 +587,7 @@ func checkSecretStrength(secret string) (score int, issues []string) { } // hasRepeatedChars checks if a string has excessive repeated characters +//nolint:unused func hasRepeatedChars(s string) bool { if len(s) < 4 { return false @@ -601,6 +603,7 @@ func hasRepeatedChars(s string) bool { } // isDefaultOrExampleSecret checks if a secret appears to be a default/example value +//nolint:unused func isDefaultOrExampleSecret(secret string) bool { defaultSecrets := []string{ "your-secret-key-here", @@ -642,6 +645,7 @@ func isDefaultOrExampleSecret(secret string) bool { } // calculateEntropy calculates the Shannon entropy of a string +//nolint:unused func calculateEntropy(s string) float64 { if len(s) == 0 { return 0 @@ -668,6 +672,7 @@ func calculateEntropy(s string) float64 { } // validateSecretSecurity performs comprehensive secret security validation +//nolint:unused func validateSecretSecurity(fieldName, secret string, result *ConfigValidationResult) { if secret == "" { return // Already handled by other validators diff --git a/cmd/server/helpers.go b/cmd/server/helpers.go index 2a03166..716357f 100644 --- a/cmd/server/helpers.go +++ b/cmd/server/helpers.go @@ -29,11 +29,11 @@ import ( // WorkerPool represents a pool of workers type WorkerPool struct { - workers int - taskQueue chan UploadTask - scanQueue chan ScanTask - ctx context.Context - cancel context.CancelFunc + workers int + taskQueue chan UploadTask + scanQueue chan ScanTask + ctx context.Context + cancel context.CancelFunc } // 
NewWorkerPool creates a new worker pool @@ -148,37 +148,37 @@ func handleDeduplication(ctx context.Context, absFilename string) error { confMutex.RLock() dedupEnabled := conf.Server.DeduplicationEnabled && conf.Deduplication.Enabled confMutex.RUnlock() - + if !dedupEnabled { log.Debugf("Deduplication disabled, skipping for file: %s", absFilename) return nil } - + // Check file size and skip deduplication for very large files (performance optimization) fileInfo, err := os.Stat(absFilename) if err != nil { log.Warnf("Failed to get file size for deduplication: %v", err) return nil // Don't fail upload, just skip deduplication } - + // Parse maxsize from config, default to 500MB if not set confMutex.RLock() maxDedupSizeStr := conf.Deduplication.MaxSize confMutex.RUnlock() - + maxDedupSize := int64(500 * 1024 * 1024) // Default 500MB if maxDedupSizeStr != "" { if parsedSize, parseErr := parseSize(maxDedupSizeStr); parseErr == nil { maxDedupSize = parsedSize } } - + if fileInfo.Size() > maxDedupSize { - log.Infof("File %s (%d bytes) exceeds deduplication size limit (%d bytes), skipping deduplication", + log.Infof("File %s (%d bytes) exceeds deduplication size limit (%d bytes), skipping deduplication", absFilename, fileInfo.Size(), maxDedupSize) return nil } - + log.Infof("Starting deduplication for file %s (%d bytes)", absFilename, fileInfo.Size()) checksum, err := computeSHA256(ctx, absFilename) @@ -215,18 +215,20 @@ func handleDeduplication(ctx context.Context, absFilename string) error { } if err := os.Link(existingPath, absFilename); err != nil { - log.Warnf("Failed to create link after deduplication: %v", err) + log.Warnf("Failed to create link after deduplication: %v", err) // Try to restore original file if restoreErr := os.Rename(existingPath, absFilename); restoreErr != nil { log.Errorf("Failed to restore file after deduplication error: %v", restoreErr) } return nil // Don't fail upload } - + log.Infof("Successfully deduplicated file %s", absFilename) return nil 
} +// handleISOContainer handles ISO container operations +// nolint:unused func handleISOContainer(absFilename string) error { isoPath := filepath.Join(conf.ISO.MountPoint, "container.iso") if err := CreateISOContainer([]string{absFilename}, isoPath, conf.ISO.Size, conf.ISO.Charset); err != nil { @@ -329,9 +331,9 @@ func logSystemInfo() { if err != nil { log.Warnf("Failed to get memory stats: %v", err) } else { - log.Infof("System Memory: Total=%s, Available=%s, Used=%.1f%%", - formatBytes(int64(memStats.Total)), - formatBytes(int64(memStats.Available)), + log.Infof("System Memory: Total=%s, Available=%s, Used=%.1f%%", + formatBytes(int64(memStats.Total)), + formatBytes(int64(memStats.Available)), memStats.UsedPercent) } @@ -342,7 +344,7 @@ func logSystemInfo() { log.Infof("CPU: %s, Cores=%d", cpuStats[0].ModelName, len(cpuStats)) } - log.Infof("Go Runtime: Version=%s, NumCPU=%d, NumGoroutine=%d", + log.Infof("Go Runtime: Version=%s, NumCPU=%d, NumGoroutine=%d", runtime.Version(), runtime.NumCPU(), runtime.NumGoroutine()) } @@ -479,30 +481,30 @@ func scanFileWithClamAV(filename string) error { if err != nil { return fmt.Errorf("failed to get file size: %w", err) } - + // Parse maxscansize from config, default to 200MB if not set confMutex.RLock() maxScanSizeStr := conf.ClamAV.MaxScanSize confMutex.RUnlock() - + maxScanSize := int64(200 * 1024 * 1024) // Default 200MB if maxScanSizeStr != "" { if parsedSize, parseErr := parseSize(maxScanSizeStr); parseErr == nil { maxScanSize = parsedSize } } - + if fileInfo.Size() > maxScanSize { - log.Infof("File %s (%d bytes) exceeds ClamAV scan limit (%d bytes), skipping scan", + log.Infof("File %s (%d bytes) exceeds ClamAV scan limit (%d bytes), skipping scan", filename, fileInfo.Size(), maxScanSize) return nil } - + // Also check file extension - only scan configured dangerous types confMutex.RLock() scanExtensions := conf.ClamAV.ScanFileExtensions confMutex.RUnlock() - + if len(scanExtensions) > 0 { ext := 
strings.ToLower(filepath.Ext(filename)) shouldScan := false @@ -519,14 +521,14 @@ func scanFileWithClamAV(filename string) error { } log.Infof("Scanning file %s (%d bytes) with ClamAV", filename, fileInfo.Size()) - + result, err := clamClient.ScanFile(filename) if err != nil { return fmt.Errorf("ClamAV scan failed: %w", err) } // Handle the result channel with timeout based on file size - timeout := 10 * time.Second // Base timeout + timeout := 10 * time.Second // Base timeout if fileInfo.Size() > 10*1024*1024 { // 10MB+ timeout = 30 * time.Second } @@ -558,7 +560,7 @@ func initClamAV(socketPath string) (*clamd.Clamd, error) { } client := clamd.NewClamd(socketPath) - + // Test connection err := client.Ping() if err != nil { @@ -591,6 +593,7 @@ func initRedis() { } // monitorNetwork monitors network events +// nolint:unused func monitorNetwork(ctx context.Context) { log.Info("Starting network monitoring") ticker := time.NewTicker(30 * time.Second) @@ -630,9 +633,10 @@ func monitorNetwork(ctx context.Context) { } // handleNetworkEvents handles network events +// nolint:unused func handleNetworkEvents(ctx context.Context) { log.Info("Starting network event handler") - + for { select { case <-ctx.Done(): @@ -673,7 +677,7 @@ func updateSystemMetrics(ctx context.Context) { // setupRouter sets up HTTP routes func setupRouter() *http.ServeMux { mux := http.NewServeMux() - + // Add CORS middleware wrapper - Enhanced for multi-upload scenarios corsWrapper := func(handler http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { @@ -684,23 +688,23 @@ func setupRouter() *http.ServeMux { w.Header().Set("Access-Control-Expose-Headers", "Content-Length, Content-Range, X-Upload-Status, X-Session-ID, Location, ETag") w.Header().Set("Access-Control-Max-Age", "86400") w.Header().Set("Access-Control-Allow-Credentials", "false") - + // Handle OPTIONS preflight for all endpoints if r.Method == http.MethodOptions { log.Infof("🔍 CORS DEBUG: OPTIONS 
preflight for %s from origin %s", r.URL.Path, r.Header.Get("Origin")) w.WriteHeader(http.StatusOK) return } - + handler(w, r) } } - + mux.HandleFunc("/upload", corsWrapper(handleUpload)) mux.HandleFunc("/download/", corsWrapper(handleDownload)) mux.HandleFunc("/health", corsWrapper(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) - w.Write([]byte("OK")) + _, _ = w.Write([]byte("OK")) })) if conf.Server.MetricsEnabled { @@ -711,7 +715,7 @@ func setupRouter() *http.ServeMux { // This must be added last as it matches all paths mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { log.Infof("🔍 ROUTER DEBUG: Catch-all handler called - method:%s path:%s query:%s", r.Method, r.URL.Path, r.URL.RawQuery) - + // Enhanced CORS headers for all responses - Multi-upload compatible w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST, DELETE, OPTIONS, HEAD") @@ -719,41 +723,41 @@ func setupRouter() *http.ServeMux { w.Header().Set("Access-Control-Expose-Headers", "Content-Length, Content-Range, X-Upload-Status, X-Session-ID, Location, ETag") w.Header().Set("Access-Control-Max-Age", "86400") w.Header().Set("Access-Control-Allow-Credentials", "false") - + // Handle CORS preflight requests (fix for Gajim "bad gateway" error) if r.Method == http.MethodOptions { log.Infof("🔍 ROUTER DEBUG: Handling CORS preflight (OPTIONS) request for %s", r.URL.Path) w.WriteHeader(http.StatusOK) return } - + // Handle PUT requests for all upload protocols if r.Method == http.MethodPut { query := r.URL.Query() - - log.Infof("🔍 ROUTER DEBUG: Query parameters - v:%s v2:%s v3:%s token:%s expires:%s", + + log.Infof("🔍 ROUTER DEBUG: Query parameters - v:%s v2:%s v3:%s token:%s expires:%s", query.Get("v"), query.Get("v2"), query.Get("v3"), query.Get("token"), query.Get("expires")) - + // Check if this is a v3 request (mod_http_upload_external) if query.Get("v3") != "" && query.Get("expires") != "" { 
log.Info("🔍 ROUTER DEBUG: Routing to handleV3Upload") handleV3Upload(w, r) return } - + // Check if this is a legacy protocol request (v, v2, token) if query.Get("v") != "" || query.Get("v2") != "" || query.Get("token") != "" { log.Info("🔍 ROUTER DEBUG: Routing to handleLegacyUpload") handleLegacyUpload(w, r) return } - + // Handle regular PUT uploads (non-XMPP) - route to general upload handler log.Info("🔍 ROUTER DEBUG: PUT request with no protocol parameters - routing to handlePutUpload") handlePutUpload(w, r) return } - + // Handle GET/HEAD requests for downloads if r.Method == http.MethodGet || r.Method == http.MethodHead { // Only handle download requests if the path looks like a file @@ -763,13 +767,13 @@ func setupRouter() *http.ServeMux { return } } - + // For all other requests, return 404 http.NotFound(w, r) }) log.Info("HTTP router configured successfully with full protocol support (v, v2, token, v3)") - + return mux } @@ -831,7 +835,7 @@ func NewProgressWriter(dst io.Writer, total int64, filename string) *ProgressWri percentage := float64(written) / float64(total) * 100 sizeMiB := float64(written) / (1024 * 1024) totalMiB := float64(total) / (1024 * 1024) - log.Infof("Upload progress for %s: %.1f%% (%.1f/%.1f MiB)", + log.Infof("Upload progress for %s: %.1f%% (%.1f/%.1f MiB)", filepath.Base(filename), percentage, sizeMiB, totalMiB) } }, @@ -845,38 +849,38 @@ func (pw *ProgressWriter) Write(p []byte) (int, error) { if err != nil { return n, err } - + pw.written += int64(n) - + // Report progress every 30 seconds or every 50MB for large files now := time.Now() shouldReport := false - + if pw.total > 100*1024*1024 { // Files larger than 100MB - shouldReport = now.Sub(pw.lastReport) > 30*time.Second || - (pw.written%(50*1024*1024) == 0 && pw.written > 0) + shouldReport = now.Sub(pw.lastReport) > 30*time.Second || + (pw.written%(50*1024*1024) == 0 && pw.written > 0) } else if pw.total > 10*1024*1024 { // Files larger than 10MB shouldReport = 
now.Sub(pw.lastReport) > 10*time.Second || - (pw.written%(10*1024*1024) == 0 && pw.written > 0) + (pw.written%(10*1024*1024) == 0 && pw.written > 0) } - + if shouldReport && pw.onProgress != nil { pw.onProgress(pw.written, pw.total, pw.filename) pw.lastReport = now } - + return n, err } // copyWithProgress copies data from src to dst with progress reporting func copyWithProgress(dst io.Writer, src io.Reader, total int64, filename string) (int64, error) { progressWriter := NewProgressWriter(dst, total, filename) - + // Use a pooled buffer for efficient copying bufPtr := bufferPool.Get().(*[]byte) defer bufferPool.Put(bufPtr) buf := *bufPtr - + return io.CopyBuffer(progressWriter, src, buf) } @@ -930,7 +934,7 @@ func handlePutUpload(w http.ResponseWriter, r *http.Request) { return } if r.ContentLength > maxSizeBytes { - http.Error(w, fmt.Sprintf("File size %s exceeds maximum allowed size %s", + http.Error(w, fmt.Sprintf("File size %s exceeds maximum allowed size %s", formatBytes(r.ContentLength), conf.Server.MaxUploadSize), http.StatusRequestEntityTooLarge) uploadErrorsTotal.Inc() return @@ -1007,7 +1011,7 @@ func handlePutUpload(w http.ResponseWriter, r *http.Request) { // Return success response w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - + if err := json.NewEncoder(w).Encode(response); err != nil { log.Errorf("Failed to encode response: %v", err) } @@ -1016,6 +1020,6 @@ func handlePutUpload(w http.ResponseWriter, r *http.Request) { requestDuration := time.Since(startTime) uploadDuration.Observe(requestDuration.Seconds()) uploadsTotal.Inc() - + log.Infof("PUT upload completed: %s (%d bytes) in %v", filename, written, requestDuration) } diff --git a/cmd/server/integration.go b/cmd/server/integration.go index 555419f..9d7f70d 100644 --- a/cmd/server/integration.go +++ b/cmd/server/integration.go @@ -70,34 +70,31 @@ func MonitorUploadPerformance() { ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() - for { - select { 
- case <-ticker.C: - // Log upload session statistics - if uploadSessionStore != nil { - uploadSessionStore.mutex.RLock() - activeSessionsCount := len(uploadSessionStore.sessions) - uploadSessionStore.mutex.RUnlock() - - if activeSessionsCount > 0 { - log.Infof("Active upload sessions: %d", activeSessionsCount) - } - } + for range ticker.C { + // Log upload session statistics + if uploadSessionStore != nil { + uploadSessionStore.mutex.RLock() + activeSessionsCount := len(uploadSessionStore.sessions) + uploadSessionStore.mutex.RUnlock() - // Log network resilience status - if networkManager != nil { - networkManager.mutex.RLock() - activeUploadsCount := len(networkManager.activeUploads) - isPaused := networkManager.isPaused - networkManager.mutex.RUnlock() - - if activeUploadsCount > 0 { - status := "active" - if isPaused { - status = "paused" - } - log.Infof("Network resilience: %d uploads %s", activeUploadsCount, status) + if activeSessionsCount > 0 { + log.Infof("Active upload sessions: %d", activeSessionsCount) + } + } + + // Log network resilience status + if networkManager != nil { + networkManager.mutex.RLock() + activeUploadsCount := len(networkManager.activeUploads) + isPaused := networkManager.isPaused + networkManager.mutex.RUnlock() + + if activeUploadsCount > 0 { + status := "active" + if isPaused { + status = "paused" } + log.Infof("Network resilience: %d uploads %s", activeUploadsCount, status) } } } diff --git a/cmd/server/main.go b/cmd/server/main.go index 42e2ab0..7e3e783 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -57,6 +57,14 @@ type NetworkResilientSession struct { LastActivity time.Time `json:"last_activity"` } +// contextKey is a custom type for context keys to avoid collisions +type contextKey string + +// Context keys +const ( + responseWriterKey contextKey = "responseWriter" +) + // NetworkEvent tracks network transitions during session type NetworkEvent struct { Timestamp time.Time `json:"timestamp"` @@ -275,6 +283,7 @@ 
func generateUploadSessionID(uploadType, userAgent, clientIP string) string { } // Detect network context for intelligent switching +//nolint:unused func detectNetworkContext(r *http.Request) string { clientIP := getClientIP(r) userAgent := r.Header.Get("User-Agent") @@ -612,8 +621,8 @@ var ( conf Config versionString string log = logrus.New() - fileInfoCache *cache.Cache - fileMetadataCache *cache.Cache + fileInfoCache *cache.Cache //nolint:unused + fileMetadataCache *cache.Cache //nolint:unused clamClient *clamd.Clamd redisClient *redis.Client redisConnected bool @@ -642,7 +651,7 @@ var ( isoMountErrorsTotal prometheus.Counter workerPool *WorkerPool - networkEvents chan NetworkEvent + networkEvents chan NetworkEvent //nolint:unused workerAdjustmentsTotal prometheus.Counter workerReAdjustmentsTotal prometheus.Counter @@ -662,9 +671,12 @@ var semaphore = make(chan struct{}, maxConcurrentOperations) // Global client connection tracker for multi-interface support var clientTracker *ClientConnectionTracker +//nolint:unused var logMessages []string +//nolint:unused var logMu sync.Mutex +//nolint:unused func flushLogMessages() { logMu.Lock() defer logMu.Unlock() @@ -770,6 +782,7 @@ func initializeNetworkProtocol(forceProtocol string) (*net.Dialer, error) { } } +//nolint:unused var dualStackClient *http.Client func main() { @@ -1165,6 +1178,8 @@ func main() { go handleFileCleanup(&conf) } +// printExampleConfig prints an example configuration file +//nolint:unused func printExampleConfig() { fmt.Print(` [server] @@ -1261,6 +1276,8 @@ version = "3.3.0" `) } +// getExampleConfigString returns an example configuration string +//nolint:unused func getExampleConfigString() string { return `[server] listen_address = ":8080" @@ -1439,6 +1456,8 @@ func monitorWorkerPerformance(ctx context.Context, server *ServerConfig, w *Work } } +// readConfig reads configuration from a file +//nolint:unused func readConfig(configFilename string, conf *Config) error { 
viper.SetConfigFile(configFilename) if err := viper.ReadInConfig(); err != nil { @@ -1451,6 +1470,8 @@ func readConfig(configFilename string, conf *Config) error { return nil } +// setDefaults sets default configuration values +// nolint:unused func setDefaults() { viper.SetDefault("server.listen_address", ":8080") viper.SetDefault("server.storage_path", "./uploads") @@ -2604,7 +2625,7 @@ func handleUpload(w http.ResponseWriter, r *http.Request) { if strings.HasPrefix(authHeader, "Bearer ") { // Bearer token authentication with session recovery for network switching // Store response writer in context for session headers - ctx := context.WithValue(r.Context(), "responseWriter", w) + ctx := context.WithValue(r.Context(), responseWriterKey, w) r = r.WithContext(ctx) claims, err := validateBearerTokenWithSession(r, conf.Security.Secret) @@ -2805,7 +2826,7 @@ func handleUpload(w http.ResponseWriter, r *http.Request) { "message": "File already exists (deduplication hit)", "upload_time": duration.String(), } - json.NewEncoder(w).Encode(response) + _ = json.NewEncoder(w).Encode(response) log.Infof("💾 Deduplication hit: file %s already exists (%s), returning success immediately (IP: %s)", filename, formatBytes(existingFileInfo.Size()), getClientIP(r)) @@ -2895,7 +2916,7 @@ func handleUpload(w http.ResponseWriter, r *http.Request) { // Send response immediately if jsonBytes, err := json.Marshal(response); err == nil { - w.Write(jsonBytes) + _, _ = w.Write(jsonBytes) } else { fmt.Fprintf(w, `{"success": true, "filename": "%s", "size": %d, "post_processing": "background"}`, filename, written) } @@ -2988,7 +3009,7 @@ func handleUpload(w http.ResponseWriter, r *http.Request) { // Create JSON response if jsonBytes, err := json.Marshal(response); err == nil { - w.Write(jsonBytes) + _, _ = w.Write(jsonBytes) } else { fmt.Fprintf(w, `{"success": true, "filename": "%s", "size": %d}`, filename, written) } @@ -3286,7 +3307,7 @@ func handleV3Upload(w http.ResponseWriter, r 
*http.Request) { "size": existingFileInfo.Size(), "message": "File already exists (deduplication hit)", } - json.NewEncoder(w).Encode(response) + _ = json.NewEncoder(w).Encode(response) log.Infof("Deduplication hit: file %s already exists (%s), returning success immediately", filename, formatBytes(existingFileInfo.Size())) @@ -3344,7 +3365,7 @@ func handleV3Upload(w http.ResponseWriter, r *http.Request) { // Send response immediately if jsonBytes, err := json.Marshal(response); err == nil { - w.Write(jsonBytes) + _, _ = w.Write(jsonBytes) } else { fmt.Fprintf(w, `{"success": true, "filename": "%s", "size": %d, "post_processing": "background"}`, filename, written) } @@ -3419,7 +3440,7 @@ func handleV3Upload(w http.ResponseWriter, r *http.Request) { // Create JSON response if jsonBytes, err := json.Marshal(response); err == nil { - w.Write(jsonBytes) + _, _ = w.Write(jsonBytes) } else { fmt.Fprintf(w, `{"success": true, "filename": "%s", "size": %d}`, filename, written) } diff --git a/cmd/server/network_resilience.go b/cmd/server/network_resilience.go index cf12c99..d1c8373 100644 --- a/cmd/server/network_resilience.go +++ b/cmd/server/network_resilience.go @@ -388,11 +388,8 @@ func (m *NetworkResilienceManager) monitorNetworkQuality() { log.Info("Starting network quality monitoring") - for { - select { - case <-ticker.C: - m.updateNetworkQuality() - } + for range ticker.C { + m.updateNetworkQuality() } } @@ -629,27 +626,24 @@ func (m *NetworkResilienceManager) monitorNetworkChanges() { // Get initial interface state m.lastInterfaces, _ = net.Interfaces() - for { - select { - case <-ticker.C: - currentInterfaces, err := net.Interfaces() - if err != nil { - log.Warnf("Failed to get network interfaces: %v", err) - continue - } - - if m.hasNetworkChanges(m.lastInterfaces, currentInterfaces) { - log.Info("Network change detected") - m.PauseAllUploads() - - // Wait for network stabilization - time.Sleep(2 * time.Second) - - m.ResumeAllUploads() - } - - m.lastInterfaces = 
currentInterfaces + for range ticker.C { + currentInterfaces, err := net.Interfaces() + if err != nil { + log.Warnf("Failed to get network interfaces: %v", err) + continue } + + if m.hasNetworkChanges(m.lastInterfaces, currentInterfaces) { + log.Info("Network change detected") + m.PauseAllUploads() + + // Wait for network stabilization + time.Sleep(2 * time.Second) + + m.ResumeAllUploads() + } + + m.lastInterfaces = currentInterfaces } } diff --git a/cmd/server/queue_resilience.go b/cmd/server/queue_resilience.go index 8dab27c..de740e6 100644 --- a/cmd/server/queue_resilience.go +++ b/cmd/server/queue_resilience.go @@ -35,7 +35,7 @@ type RobustQueue struct { lowPriority chan QueueItem // Worker management - workers []*QueueWorker + workers []*QueueWorker //nolint:unused workerHealth map[int]*WorkerHealth healthMutex sync.RWMutex @@ -108,10 +108,10 @@ type WorkerHealth struct { // QueueWorker represents a queue worker type QueueWorker struct { ID int - queue *RobustQueue - health *WorkerHealth - ctx context.Context - cancel context.CancelFunc + queue *RobustQueue //nolint:unused + health *WorkerHealth //nolint:unused + ctx context.Context //nolint:unused + cancel context.CancelFunc //nolint:unused } // NewRobustQueue creates a new robust queue with timeout resilience @@ -383,7 +383,7 @@ func (q *RobustQueue) ageSpecificQueue(source, target chan QueueItem, now time.T case source <- item: default: // Both queues full, move to spillover - q.spilloverEnqueue(item) + _ = q.spilloverEnqueue(item) } } } else { @@ -391,7 +391,7 @@ func (q *RobustQueue) ageSpecificQueue(source, target chan QueueItem, now time.T select { case source <- item: default: - q.spilloverEnqueue(item) + _ = q.spilloverEnqueue(item) } } default: diff --git a/cmd/server/upload_session.go b/cmd/server/upload_session.go index c937d23..13de571 100644 --- a/cmd/server/upload_session.go +++ b/cmd/server/upload_session.go @@ -49,7 +49,7 @@ func NewUploadSessionStore(tempDir string) *UploadSessionStore { } // 
Create temp directory if it doesn't exist - os.MkdirAll(tempDir, 0755) + _ = os.MkdirAll(tempDir, 0755) // Start cleanup routine go store.cleanupExpiredSessions() @@ -64,7 +64,7 @@ func (s *UploadSessionStore) CreateSession(filename string, totalSize int64, cli sessionID := generateSessionID("", filename) tempDir := filepath.Join(s.tempDir, sessionID) - os.MkdirAll(tempDir, 0755) + _ = os.MkdirAll(tempDir, 0755) session := &ChunkedUploadSession{ ID: sessionID, @@ -245,7 +245,7 @@ func (s *UploadSessionStore) persistSession(session *ChunkedUploadSession) { // Fallback to disk persistence sessionFile := filepath.Join(s.tempDir, session.ID+".session") data, _ := json.Marshal(session) - os.WriteFile(sessionFile, data, 0644) + _ = os.WriteFile(sessionFile, data, 0644) } } @@ -289,18 +289,15 @@ func (s *UploadSessionStore) cleanupExpiredSessions() { ticker := time.NewTicker(1 * time.Hour) defer ticker.Stop() - for { - select { - case <-ticker.C: - s.mutex.Lock() - now := time.Now() - for sessionID, session := range s.sessions { - if now.Sub(session.LastActivity) > 24*time.Hour { - s.CleanupSession(sessionID) - } + for range ticker.C { + s.mutex.Lock() + now := time.Now() + for sessionID, session := range s.sessions { + if now.Sub(session.LastActivity) > 24*time.Hour { + s.CleanupSession(sessionID) } - s.mutex.Unlock() } + s.mutex.Unlock() } } @@ -315,6 +312,8 @@ func getChunkSize() int64 { return 5 * 1024 * 1024 // 5MB default } +// randomString generates a random string of given length +//nolint:unused func randomString(n int) string { const charset = "abcdefghijklmnopqrstuvwxyz0123456789" b := make([]byte, n) @@ -324,6 +323,8 @@ func randomString(n int) string { return string(b) } +// copyFileContent copies content from src to dst file +//nolint:unused func copyFileContent(dst, src *os.File) (int64, error) { // Use the existing buffer pool for efficiency bufPtr := bufferPool.Get().(*[]byte)