// network_resilience.go - Network resilience middleware without modifying core functions

package main

import (
	"bytes"
	"context"
	"io"
	"math/rand"
	"net"
	"net/http"
	"sync"
	"time"
)
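
// Note: `log` (a package-level logger, e.g. a logrus instance) and `conf`
// (the parsed server configuration providing Server.NetworkEvents and the
// Timeouts fields) are assumed to be declared elsewhere in this package;
// this file deliberately does not modify those core definitions.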

// NetworkResilienceManager handles network change detection and upload pausing
type NetworkResilienceManager struct {
	activeUploads  map[string]*UploadContext
	mutex          sync.RWMutex
	isPaused       bool
	pauseChannel   chan bool
	resumeChannel  chan bool
	lastInterfaces []net.Interface
}

// UploadContext tracks active upload state
type UploadContext struct {
	SessionID  string
	PauseChan  chan bool
	ResumeChan chan bool
	CancelChan chan bool
	IsPaused   bool
}

// NewNetworkResilienceManager creates a new network resilience manager
func NewNetworkResilienceManager() *NetworkResilienceManager {
	manager := &NetworkResilienceManager{
		activeUploads: make(map[string]*UploadContext),
		pauseChannel:  make(chan bool, 100),
		resumeChannel: make(chan bool, 100),
	}

	// Start network monitoring if enabled
	if conf.Server.NetworkEvents {
		go manager.monitorNetworkChanges()
	}

	return manager
}

// RegisterUpload registers an active upload for pause/resume functionality
func (m *NetworkResilienceManager) RegisterUpload(sessionID string) *UploadContext {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	ctx := &UploadContext{
		SessionID:  sessionID,
		PauseChan:  make(chan bool, 1),
		ResumeChan: make(chan bool, 1),
		CancelChan: make(chan bool, 1),
		IsPaused:   false,
	}

	m.activeUploads[sessionID] = ctx

	// If currently paused, immediately pause this upload
	if m.isPaused {
		select {
		case ctx.PauseChan <- true:
			ctx.IsPaused = true
		default:
		}
	}

	return ctx
}

// UnregisterUpload removes an upload from tracking
func (m *NetworkResilienceManager) UnregisterUpload(sessionID string) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if ctx, exists := m.activeUploads[sessionID]; exists {
		close(ctx.PauseChan)
		close(ctx.ResumeChan)
		close(ctx.CancelChan)
		delete(m.activeUploads, sessionID)
	}
}

// PauseAllUploads pauses all active uploads
func (m *NetworkResilienceManager) PauseAllUploads() {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.isPaused = true
	log.Info("Pausing all active uploads due to network change")

	for _, ctx := range m.activeUploads {
		if !ctx.IsPaused {
			select {
			case ctx.PauseChan <- true:
				ctx.IsPaused = true
			default:
			}
		}
	}
}

// ResumeAllUploads resumes all paused uploads
func (m *NetworkResilienceManager) ResumeAllUploads() {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.isPaused = false
	log.Info("Resuming all paused uploads")

	for _, ctx := range m.activeUploads {
		if ctx.IsPaused {
			select {
			case ctx.ResumeChan <- true:
				ctx.IsPaused = false
			default:
			}
		}
	}
}

// monitorNetworkChanges monitors for network interface changes
func (m *NetworkResilienceManager) monitorNetworkChanges() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	// Get initial interface state
	m.lastInterfaces, _ = net.Interfaces()

	for {
		select {
		case <-ticker.C:
			currentInterfaces, err := net.Interfaces()
			if err != nil {
				log.Warnf("Failed to get network interfaces: %v", err)
				continue
			}

			if m.hasNetworkChanges(m.lastInterfaces, currentInterfaces) {
				log.Info("Network change detected")
				m.PauseAllUploads()

				// Wait for network stabilization
				time.Sleep(2 * time.Second)

				m.ResumeAllUploads()
			}

			m.lastInterfaces = currentInterfaces
		}
	}
}

// hasNetworkChanges compares interface states to detect changes
func (m *NetworkResilienceManager) hasNetworkChanges(previous, current []net.Interface) bool {
	if len(previous) != len(current) {
		return true
	}

	// Create maps for comparison, skipping loopback interfaces
	oldMap := make(map[string]net.Flags)
	newMap := make(map[string]net.Flags)

	for _, iface := range previous {
		if iface.Flags&net.FlagLoopback == 0 {
			oldMap[iface.Name] = iface.Flags
		}
	}

	for _, iface := range current {
		if iface.Flags&net.FlagLoopback == 0 {
			newMap[iface.Name] = iface.Flags
		}
	}

	// Check for up/down status changes
	for name, oldFlags := range oldMap {
		newFlags, exists := newMap[name]
		if !exists || (oldFlags&net.FlagUp) != (newFlags&net.FlagUp) {
			return true
		}
	}

	// Check for new interfaces
	for name := range newMap {
		if _, exists := oldMap[name]; !exists {
			return true
		}
	}

	return false
}

// ResilientHTTPHandler wraps existing handlers with network resilience
func ResilientHTTPHandler(handler http.HandlerFunc, manager *NetworkResilienceManager) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// Check for chunked upload headers
		sessionID := r.Header.Get("X-Upload-Session-ID")

		if sessionID != "" {
			// This is a chunked upload, register for pause/resume
			uploadCtx := manager.RegisterUpload(sessionID)
			defer manager.UnregisterUpload(sessionID)

			// Create a context that can be cancelled
			ctx, cancel := context.WithCancel(r.Context())
			defer cancel()

			// Monitor for pause/resume signals in a goroutine
			go func() {
				for {
					select {
					case <-uploadCtx.PauseChan:
						log.Debugf("Upload %s paused", sessionID)
						// Note: an in-flight HTTP request cannot truly be paused;
						// the pause signal ensures the next chunk upload waits

					case <-uploadCtx.ResumeChan:
						log.Debugf("Upload %s resumed", sessionID)

					case <-uploadCtx.CancelChan:
						cancel()
						return

					case <-ctx.Done():
						return
					}
				}
			}()

			// Pass the context-aware request to the handler
			r = r.WithContext(ctx)
		}

		// Call the original handler
		handler(w, r)
	}
}

// RetryableUploadWrapper adds retry logic around upload operations.
// The request body is buffered in memory so it can be replayed on each
// attempt, and the response is buffered so failed attempts are never
// written to the client. This assumes request bodies (e.g. upload chunks)
// are small enough to hold in memory.
func RetryableUploadWrapper(originalHandler http.HandlerFunc, maxRetries int) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// Buffer the request body so it can be replayed on retries
		var bodyBytes []byte
		if r.Body != nil {
			var err error
			bodyBytes, err = io.ReadAll(r.Body)
			r.Body.Close()
			if err != nil {
				http.Error(w, "Failed to read request body", http.StatusBadRequest)
				return
			}
		}

		var lastErr error

		for attempt := 0; attempt <= maxRetries; attempt++ {
			if attempt > 0 {
				// Exponential backoff with up to 10% random jitter
				delay := time.Duration(attempt*attempt) * time.Second
				jitter := time.Duration(rand.Int63n(int64(delay/10) + 1))
				time.Sleep(delay + jitter)

				log.Infof("Retrying upload attempt %d/%d", attempt+1, maxRetries+1)
			}

			// Reset the body for this attempt
			r.Body = io.NopCloser(bytes.NewReader(bodyBytes))

			// Buffer the response so a failed attempt can be discarded
			recorder := &ResponseRecorder{
				header:     make(http.Header),
				statusCode: http.StatusOK,
			}

			// Call the original handler
			originalHandler(recorder, r)

			// Success: replay the buffered response to the real writer
			if recorder.statusCode < 400 {
				recorder.flushTo(w)
				return
			}

			lastErr = recorder.lastError

			// Don't retry on client errors (4xx); pass the handler's response through
			if recorder.statusCode >= 400 && recorder.statusCode < 500 {
				recorder.flushTo(w)
				return
			}
		}

		// All retries failed
		if lastErr != nil {
			log.Errorf("Upload failed after %d attempts: %v", maxRetries+1, lastErr)
			http.Error(w, lastErr.Error(), http.StatusInternalServerError)
		} else {
			log.Errorf("Upload failed after %d attempts", maxRetries+1)
			http.Error(w, "Upload failed after retries", http.StatusInternalServerError)
		}
	}
}

// ResponseRecorder buffers a handler's response (headers, status code and
// body) for the retry logic, so it only reaches the client when an attempt
// succeeds
type ResponseRecorder struct {
	header     http.Header
	body       bytes.Buffer
	statusCode int
	lastError  error
}

// Header returns the buffered header map
func (r *ResponseRecorder) Header() http.Header {
	return r.header
}

// WriteHeader records the status code without writing it to the client
func (r *ResponseRecorder) WriteHeader(statusCode int) {
	r.statusCode = statusCode
}

// Write buffers the response body
func (r *ResponseRecorder) Write(data []byte) (int, error) {
	n, err := r.body.Write(data)
	if err != nil {
		r.lastError = err
	}
	return n, err
}

// flushTo copies the buffered headers, status code and body to the real writer
func (r *ResponseRecorder) flushTo(w http.ResponseWriter) {
	for key, values := range r.header {
		w.Header()[key] = values
	}
	w.WriteHeader(r.statusCode)
	if _, err := w.Write(r.body.Bytes()); err != nil {
		r.lastError = err
	}
}

// ConfigureEnhancedTimeouts logs current and recommended timeout settings for mobile scenarios
func ConfigureEnhancedTimeouts() {
	// These don't modify core functions, just suggest better defaults
	log.Info("Applying enhanced timeout configuration for mobile/network switching scenarios")

	// Log current timeout settings
	log.Infof("Current ReadTimeout: %s", conf.Timeouts.Read)
	log.Infof("Current WriteTimeout: %s", conf.Timeouts.Write)
	log.Infof("Current IdleTimeout: %s", conf.Timeouts.Idle)

	// Suggest better timeouts in logs
	log.Info("Recommended timeouts for mobile scenarios:")
	log.Info("  ReadTimeout: 300s (5 minutes)")
	log.Info("  WriteTimeout: 300s (5 minutes)")
	log.Info("  IdleTimeout: 600s (10 minutes)")
	log.Info("  Update your configuration file to apply these settings")
}
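
// As a rough illustration (the exact keys depend on the surrounding project's
// configuration format and are not confirmed here), the recommended values
// above would map onto a timeouts section along these lines:
//
//	[timeouts]
//	readtimeout  = "300s"
//	writetimeout = "300s"
//	idletimeout  = "600s"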

// Global network resilience manager
var networkManager *NetworkResilienceManager

// InitializeNetworkResilience initializes the network resilience system
func InitializeNetworkResilience() {
	networkManager = NewNetworkResilienceManager()
	ConfigureEnhancedTimeouts()
	log.Info("Network resilience system initialized")
}
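
// Example wiring (illustrative sketch only; the handler name and route are
// placeholders, not part of the surrounding server):
//
//	InitializeNetworkResilience()
//	http.HandleFunc("/upload", RetryableUploadWrapper(
//		ResilientHTTPHandler(handleUpload, networkManager), 3))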