Add enhanced configuration templates for adaptive I/O features
- Introduced a comprehensive configuration template (config-adaptive.toml) for adaptive I/O, enabling improved upload/download dual stack with various performance optimizations, security settings, and network resilience features. - Created a test configuration template (test-config.toml) mirroring the adaptive configuration for testing purposes. - Added a simple test configuration (test-simple-config.toml) for basic adaptive features testing with essential parameters. - Included an empty Jupyter notebook (xep0363_analysis.ipynb) for future analysis related to XEP-0363.
This commit is contained in:
1263
cmd/server/adaptive_io.go
Normal file
1263
cmd/server/adaptive_io.go
Normal file
File diff suppressed because it is too large
Load Diff
309
cmd/server/client_network_handler.go
Normal file
309
cmd/server/client_network_handler.go
Normal file
@ -0,0 +1,309 @@
|
||||
// client_network_handler.go - Handles clients with multiple network interfaces
|
||||
// This is the CORRECT implementation focusing on CLIENT multi-interface support
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ClientConnectionTracker manages clients that switch between network interfaces
|
||||
type ClientConnectionTracker struct {
|
||||
sessions map[string]*ClientSession // sessionID -> session info
|
||||
ipToSession map[string]string // IP -> sessionID for quick lookup
|
||||
mutex sync.RWMutex
|
||||
config *ClientNetworkConfig
|
||||
}
|
||||
|
||||
// ClientSession represents a client that may connect from multiple IPs/interfaces
|
||||
type ClientSession struct {
|
||||
SessionID string
|
||||
ClientIPs []string // All IPs this session has used
|
||||
ConnectionType string // mobile, wifi, ethernet, unknown
|
||||
LastSeen time.Time
|
||||
UploadInfo *UploadSessionInfo
|
||||
NetworkQuality float64 // 0-100 quality score
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
// UploadSessionInfo tracks upload progress across network switches
|
||||
type UploadSessionInfo struct {
|
||||
FileName string
|
||||
TotalSize int64
|
||||
UploadedBytes int64
|
||||
ChunkSize int64
|
||||
LastChunkID int
|
||||
Chunks map[int]bool // chunkID -> received
|
||||
Started time.Time
|
||||
LastActivity time.Time
|
||||
}
|
||||
|
||||
// ClientNetworkConfig holds configuration for client network handling
|
||||
type ClientNetworkConfig struct {
|
||||
SessionBasedTracking bool `toml:"session_based_tracking" mapstructure:"session_based_tracking"`
|
||||
AllowIPChanges bool `toml:"allow_ip_changes" mapstructure:"allow_ip_changes"`
|
||||
SessionMigrationTimeout time.Duration // Will be parsed from string in main.go
|
||||
MaxIPChangesPerSession int `toml:"max_ip_changes_per_session" mapstructure:"max_ip_changes_per_session"`
|
||||
ClientConnectionDetection bool `toml:"client_connection_detection" mapstructure:"client_connection_detection"`
|
||||
AdaptToClientNetwork bool `toml:"adapt_to_client_network" mapstructure:"adapt_to_client_network"`
|
||||
}
|
||||
|
||||
// ConnectionType represents different client connection types
|
||||
type ConnectionType int
|
||||
|
||||
const (
|
||||
ConnectionUnknown ConnectionType = iota
|
||||
ConnectionMobile // LTE/5G
|
||||
ConnectionWiFi // WiFi
|
||||
ConnectionEthernet // Wired
|
||||
)
|
||||
|
||||
func (ct ConnectionType) String() string {
|
||||
switch ct {
|
||||
case ConnectionMobile:
|
||||
return "mobile"
|
||||
case ConnectionWiFi:
|
||||
return "wifi"
|
||||
case ConnectionEthernet:
|
||||
return "ethernet"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
// NewClientConnectionTracker creates a new tracker for multi-interface clients
|
||||
func NewClientConnectionTracker(config *ClientNetworkConfig) *ClientConnectionTracker {
|
||||
return &ClientConnectionTracker{
|
||||
sessions: make(map[string]*ClientSession),
|
||||
ipToSession: make(map[string]string),
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
// DetectClientConnectionType analyzes the request to determine client connection type
|
||||
func (cct *ClientConnectionTracker) DetectClientConnectionType(r *http.Request) string {
|
||||
// Check User-Agent for mobile indicators
|
||||
userAgent := strings.ToLower(r.Header.Get("User-Agent"))
|
||||
|
||||
// Mobile detection
|
||||
if containsAny(userAgent, "mobile", "android", "iphone", "ipad", "phone") {
|
||||
return "mobile"
|
||||
}
|
||||
|
||||
// Check for specific network indicators in headers
|
||||
if xForwardedFor := r.Header.Get("X-Forwarded-For"); xForwardedFor != "" {
|
||||
// This might indicate the client is behind a mobile carrier NAT
|
||||
// Additional logic could be added here
|
||||
}
|
||||
|
||||
// Check connection patterns (this would need more sophisticated logic)
|
||||
clientIP := getClientIP(r)
|
||||
if cct.isLikelyMobileIP(clientIP) {
|
||||
return "mobile"
|
||||
}
|
||||
|
||||
// Default assumption for unknown
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
// TrackClientSession tracks a client session across potential IP changes
|
||||
func (cct *ClientConnectionTracker) TrackClientSession(sessionID string, clientIP string, r *http.Request) *ClientSession {
|
||||
cct.mutex.Lock()
|
||||
defer cct.mutex.Unlock()
|
||||
|
||||
// Check if this IP is already associated with a different session
|
||||
if existingSessionID, exists := cct.ipToSession[clientIP]; exists && existingSessionID != sessionID {
|
||||
// This IP was previously used by a different session
|
||||
// This could indicate a client that switched networks
|
||||
if cct.config.AllowIPChanges {
|
||||
// Remove old association
|
||||
delete(cct.ipToSession, clientIP)
|
||||
}
|
||||
}
|
||||
|
||||
// Get or create session
|
||||
session, exists := cct.sessions[sessionID]
|
||||
if !exists {
|
||||
session = &ClientSession{
|
||||
SessionID: sessionID,
|
||||
ClientIPs: []string{clientIP},
|
||||
ConnectionType: cct.DetectClientConnectionType(r),
|
||||
LastSeen: time.Now(),
|
||||
NetworkQuality: 100.0, // Start with good quality
|
||||
}
|
||||
cct.sessions[sessionID] = session
|
||||
} else {
|
||||
session.mutex.Lock()
|
||||
// Add this IP if it's not already tracked
|
||||
if !contains(session.ClientIPs, clientIP) {
|
||||
if len(session.ClientIPs) < cct.config.MaxIPChangesPerSession {
|
||||
session.ClientIPs = append(session.ClientIPs, clientIP)
|
||||
fmt.Printf("Client session %s now using new IP: %s (total IPs: %d)\n",
|
||||
sessionID, clientIP, len(session.ClientIPs))
|
||||
}
|
||||
}
|
||||
session.LastSeen = time.Now()
|
||||
session.mutex.Unlock()
|
||||
}
|
||||
|
||||
// Update IP to session mapping
|
||||
cct.ipToSession[clientIP] = sessionID
|
||||
|
||||
return session
|
||||
}
|
||||
|
||||
// GetOptimalChunkSize returns the optimal chunk size for a client's connection type
|
||||
func (cct *ClientConnectionTracker) GetOptimalChunkSize(session *ClientSession) int64 {
|
||||
switch session.ConnectionType {
|
||||
case "mobile":
|
||||
return 256 * 1024 // 256KB for mobile/LTE
|
||||
case "wifi":
|
||||
return 2 * 1024 * 1024 // 2MB for WiFi
|
||||
case "ethernet":
|
||||
return 8 * 1024 * 1024 // 8MB for ethernet
|
||||
default:
|
||||
return 1 * 1024 * 1024 // 1MB default
|
||||
}
|
||||
}
|
||||
|
||||
// GetOptimalTimeout returns the optimal timeout for a client's connection type
|
||||
func (cct *ClientConnectionTracker) GetOptimalTimeout(session *ClientSession, baseTimeout time.Duration) time.Duration {
|
||||
switch session.ConnectionType {
|
||||
case "mobile":
|
||||
return time.Duration(float64(baseTimeout) * 2.0) // 2x timeout for mobile
|
||||
case "wifi":
|
||||
return baseTimeout // Standard timeout for WiFi
|
||||
case "ethernet":
|
||||
return time.Duration(float64(baseTimeout) * 0.8) // 0.8x timeout for ethernet
|
||||
default:
|
||||
return baseTimeout
|
||||
}
|
||||
}
|
||||
|
||||
// HandleClientReconnection handles when a client reconnects from a different IP
|
||||
func (cct *ClientConnectionTracker) HandleClientReconnection(sessionID string, newIP string, r *http.Request) error {
|
||||
cct.mutex.Lock()
|
||||
defer cct.mutex.Unlock()
|
||||
|
||||
session, exists := cct.sessions[sessionID]
|
||||
if !exists {
|
||||
return fmt.Errorf("session %s not found", sessionID)
|
||||
}
|
||||
|
||||
session.mutex.Lock()
|
||||
defer session.mutex.Unlock()
|
||||
|
||||
// Check if this is actually a new IP
|
||||
if contains(session.ClientIPs, newIP) {
|
||||
// Client reconnected from known IP
|
||||
session.LastSeen = time.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
// This is a new IP for this session - client likely switched networks
|
||||
if len(session.ClientIPs) >= cct.config.MaxIPChangesPerSession {
|
||||
return fmt.Errorf("session %s exceeded maximum IP changes (%d)",
|
||||
sessionID, cct.config.MaxIPChangesPerSession)
|
||||
}
|
||||
|
||||
// Add new IP and update connection type
|
||||
session.ClientIPs = append(session.ClientIPs, newIP)
|
||||
session.ConnectionType = cct.DetectClientConnectionType(r)
|
||||
session.LastSeen = time.Now()
|
||||
|
||||
// Update IP mapping
|
||||
cct.ipToSession[newIP] = sessionID
|
||||
|
||||
fmt.Printf("Client session %s reconnected from new IP %s (connection type: %s)\n",
|
||||
sessionID, newIP, session.ConnectionType)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResumeUpload handles resuming an upload when client switches networks
|
||||
func (cct *ClientConnectionTracker) ResumeUpload(sessionID string, uploadInfo *UploadSessionInfo) error {
|
||||
cct.mutex.RLock()
|
||||
session, exists := cct.sessions[sessionID]
|
||||
cct.mutex.RUnlock()
|
||||
|
||||
if !exists {
|
||||
return fmt.Errorf("session %s not found for upload resume", sessionID)
|
||||
}
|
||||
|
||||
session.mutex.Lock()
|
||||
session.UploadInfo = uploadInfo
|
||||
session.LastSeen = time.Now()
|
||||
session.mutex.Unlock()
|
||||
|
||||
fmt.Printf("Resumed upload for session %s: %s (%d/%d bytes)\n",
|
||||
sessionID, uploadInfo.FileName, uploadInfo.UploadedBytes, uploadInfo.TotalSize)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CleanupStaleSession removes sessions that haven't been seen recently
|
||||
func (cct *ClientConnectionTracker) CleanupStaleSessions() {
|
||||
cct.mutex.Lock()
|
||||
defer cct.mutex.Unlock()
|
||||
|
||||
cutoff := time.Now().Add(-cct.config.SessionMigrationTimeout)
|
||||
|
||||
for sessionID, session := range cct.sessions {
|
||||
if session.LastSeen.Before(cutoff) {
|
||||
// Remove from IP mappings
|
||||
for _, ip := range session.ClientIPs {
|
||||
delete(cct.ipToSession, ip)
|
||||
}
|
||||
// Remove session
|
||||
delete(cct.sessions, sessionID)
|
||||
fmt.Printf("Cleaned up stale session: %s\n", sessionID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// isLikelyMobileIP attempts to determine if an IP is from a mobile carrier
|
||||
func (cct *ClientConnectionTracker) isLikelyMobileIP(ip string) bool {
|
||||
// This is a simplified check - in practice, you'd check against
|
||||
// known mobile carrier IP ranges
|
||||
|
||||
parsedIP := net.ParseIP(ip)
|
||||
if parsedIP == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Example: Some mobile carriers use specific IP ranges
|
||||
// This would need to be populated with actual carrier ranges
|
||||
mobileRanges := []string{
|
||||
"10.0.0.0/8", // Some carriers use 10.x for mobile
|
||||
"172.16.0.0/12", // Some carriers use 172.x for mobile
|
||||
}
|
||||
|
||||
for _, rangeStr := range mobileRanges {
|
||||
_, cidr, err := net.ParseCIDR(rangeStr)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if cidr.Contains(parsedIP) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// Helper function to start cleanup routine
|
||||
func (cct *ClientConnectionTracker) StartCleanupRoutine() {
|
||||
go func() {
|
||||
ticker := time.NewTicker(5 * time.Minute) // Clean up every 5 minutes
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
cct.CleanupStaleSessions()
|
||||
}
|
||||
}()
|
||||
}
|
@ -224,13 +224,33 @@ type BuildConfig struct {
|
||||
}
|
||||
|
||||
type NetworkResilienceConfig struct {
|
||||
FastDetection bool `toml:"fast_detection" mapstructure:"fast_detection"`
|
||||
QualityMonitoring bool `toml:"quality_monitoring" mapstructure:"quality_monitoring"`
|
||||
PredictiveSwitching bool `toml:"predictive_switching" mapstructure:"predictive_switching"`
|
||||
MobileOptimizations bool `toml:"mobile_optimizations" mapstructure:"mobile_optimizations"`
|
||||
DetectionInterval string `toml:"detection_interval" mapstructure:"detection_interval"`
|
||||
QualityCheckInterval string `toml:"quality_check_interval" mapstructure:"quality_check_interval"`
|
||||
MaxDetectionInterval string `toml:"max_detection_interval" mapstructure:"max_detection_interval"`
|
||||
FastDetection bool `toml:"fast_detection" mapstructure:"fast_detection"`
|
||||
QualityMonitoring bool `toml:"quality_monitoring" mapstructure:"quality_monitoring"`
|
||||
PredictiveSwitching bool `toml:"predictive_switching" mapstructure:"predictive_switching"`
|
||||
MobileOptimizations bool `toml:"mobile_optimizations" mapstructure:"mobile_optimizations"`
|
||||
DetectionInterval string `toml:"detection_interval" mapstructure:"detection_interval"`
|
||||
QualityCheckInterval string `toml:"quality_check_interval" mapstructure:"quality_check_interval"`
|
||||
MaxDetectionInterval string `toml:"max_detection_interval" mapstructure:"max_detection_interval"`
|
||||
|
||||
// Multi-interface support
|
||||
MultiInterfaceEnabled bool `toml:"multi_interface_enabled" mapstructure:"multi_interface_enabled"`
|
||||
InterfacePriority []string `toml:"interface_priority" mapstructure:"interface_priority"`
|
||||
AutoSwitchEnabled bool `toml:"auto_switch_enabled" mapstructure:"auto_switch_enabled"`
|
||||
SwitchThresholdLatency string `toml:"switch_threshold_latency" mapstructure:"switch_threshold_latency"`
|
||||
SwitchThresholdPacketLoss float64 `toml:"switch_threshold_packet_loss" mapstructure:"switch_threshold_packet_loss"`
|
||||
QualityDegradationThreshold float64 `toml:"quality_degradation_threshold" mapstructure:"quality_degradation_threshold"`
|
||||
MaxSwitchAttempts int `toml:"max_switch_attempts" mapstructure:"max_switch_attempts"`
|
||||
SwitchDetectionInterval string `toml:"switch_detection_interval" mapstructure:"switch_detection_interval"`
|
||||
}
|
||||
|
||||
// ClientNetworkConfigTOML is used for loading from TOML where timeout is a string
|
||||
type ClientNetworkConfigTOML struct {
|
||||
SessionBasedTracking bool `toml:"session_based_tracking" mapstructure:"session_based_tracking"`
|
||||
AllowIPChanges bool `toml:"allow_ip_changes" mapstructure:"allow_ip_changes"`
|
||||
SessionMigrationTimeout string `toml:"session_migration_timeout" mapstructure:"session_migration_timeout"`
|
||||
MaxIPChangesPerSession int `toml:"max_ip_changes_per_session" mapstructure:"max_ip_changes_per_session"`
|
||||
ClientConnectionDetection bool `toml:"client_connection_detection" mapstructure:"client_connection_detection"`
|
||||
AdaptToClientNetwork bool `toml:"adapt_to_client_network" mapstructure:"adapt_to_client_network"`
|
||||
}
|
||||
|
||||
// This is the main Config struct to be used
|
||||
@ -249,7 +269,8 @@ type Config struct {
|
||||
Workers WorkersConfig `mapstructure:"workers"`
|
||||
File FileConfig `mapstructure:"file"`
|
||||
Build BuildConfig `mapstructure:"build"`
|
||||
NetworkResilience NetworkResilienceConfig `mapstructure:"network_resilience"`
|
||||
NetworkResilience NetworkResilienceConfig `mapstructure:"network_resilience"`
|
||||
ClientNetwork ClientNetworkConfigTOML `mapstructure:"client_network_support"`
|
||||
}
|
||||
|
||||
type UploadTask struct {
|
||||
@ -350,6 +371,9 @@ const maxConcurrentOperations = 10
|
||||
|
||||
var semaphore = make(chan struct{}, maxConcurrentOperations)
|
||||
|
||||
// Global client connection tracker for multi-interface support
|
||||
var clientTracker *ClientConnectionTracker
|
||||
|
||||
var logMessages []string
|
||||
var logMu sync.Mutex
|
||||
|
||||
@ -564,6 +588,37 @@ func main() {
|
||||
|
||||
// Perform comprehensive configuration validation
|
||||
validationResult := ValidateConfigComprehensive(&conf)
|
||||
|
||||
// Initialize client connection tracker for multi-interface support
|
||||
clientNetworkConfig := &ClientNetworkConfig{
|
||||
SessionBasedTracking: conf.ClientNetwork.SessionBasedTracking,
|
||||
AllowIPChanges: conf.ClientNetwork.AllowIPChanges,
|
||||
MaxIPChangesPerSession: conf.ClientNetwork.MaxIPChangesPerSession,
|
||||
AdaptToClientNetwork: conf.ClientNetwork.AdaptToClientNetwork,
|
||||
}
|
||||
|
||||
// Parse session migration timeout
|
||||
if conf.ClientNetwork.SessionMigrationTimeout != "" {
|
||||
if timeout, err := time.ParseDuration(conf.ClientNetwork.SessionMigrationTimeout); err == nil {
|
||||
clientNetworkConfig.SessionMigrationTimeout = timeout
|
||||
} else {
|
||||
clientNetworkConfig.SessionMigrationTimeout = 5 * time.Minute // default
|
||||
}
|
||||
} else {
|
||||
clientNetworkConfig.SessionMigrationTimeout = 5 * time.Minute // default
|
||||
}
|
||||
|
||||
// Set defaults if not configured
|
||||
if clientNetworkConfig.MaxIPChangesPerSession == 0 {
|
||||
clientNetworkConfig.MaxIPChangesPerSession = 10
|
||||
}
|
||||
|
||||
// Initialize the client tracker
|
||||
clientTracker = NewClientConnectionTracker(clientNetworkConfig)
|
||||
if clientTracker != nil {
|
||||
clientTracker.StartCleanupRoutine()
|
||||
log.Info("Client multi-interface support initialized")
|
||||
}
|
||||
PrintValidationResults(validationResult)
|
||||
|
||||
if validationResult.HasErrors() {
|
||||
@ -1417,6 +1472,12 @@ func validateV3HMAC(r *http.Request, secret string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// generateSessionID creates a unique session ID for client tracking
|
||||
func generateSessionID() string {
|
||||
return fmt.Sprintf("session_%d_%x", time.Now().UnixNano(),
|
||||
sha256.Sum256([]byte(fmt.Sprintf("%d%s", time.Now().UnixNano(), conf.Security.Secret))))[:16]
|
||||
}
|
||||
|
||||
// handleUpload handles file uploads.
|
||||
func handleUpload(w http.ResponseWriter, r *http.Request) {
|
||||
startTime := time.Now()
|
||||
@ -1449,6 +1510,30 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debugf("HMAC authentication successful for upload request: %s", r.URL.Path)
|
||||
}
|
||||
|
||||
// Client multi-interface tracking
|
||||
var clientSession *ClientSession
|
||||
if clientTracker != nil && conf.ClientNetwork.SessionBasedTracking {
|
||||
// Generate or extract session ID (from headers, form data, or create new)
|
||||
sessionID := r.Header.Get("X-Upload-Session-ID")
|
||||
if sessionID == "" {
|
||||
// Check if there's a session ID in form data
|
||||
sessionID = r.FormValue("session_id")
|
||||
}
|
||||
if sessionID == "" {
|
||||
// Generate new session ID
|
||||
sessionID = generateSessionID()
|
||||
}
|
||||
|
||||
clientIP := getClientIP(r)
|
||||
clientSession = clientTracker.TrackClientSession(sessionID, clientIP, r)
|
||||
|
||||
// Add session ID to response headers for client to use in subsequent requests
|
||||
w.Header().Set("X-Upload-Session-ID", sessionID)
|
||||
|
||||
log.Debugf("Client session tracking: %s from IP %s (connection type: %s)",
|
||||
sessionID, clientIP, clientSession.ConnectionType)
|
||||
}
|
||||
|
||||
// Parse multipart form
|
||||
err := r.ParseMultipartForm(32 << 20) // 32MB max memory
|
||||
if err != nil {
|
||||
|
@ -288,6 +288,17 @@ func (m *NetworkResilienceManager) UnregisterUpload(sessionID string) {
|
||||
}
|
||||
}
|
||||
|
||||
// GetUploadContext retrieves the upload context for a given session ID
|
||||
func (m *NetworkResilienceManager) GetUploadContext(sessionID string) *UploadContext {
|
||||
m.mutex.RLock()
|
||||
defer m.mutex.RUnlock()
|
||||
|
||||
if ctx, exists := m.activeUploads[sessionID]; exists {
|
||||
return ctx
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PauseAllUploads pauses all active uploads
|
||||
func (m *NetworkResilienceManager) PauseAllUploads() {
|
||||
m.mutex.Lock()
|
||||
|
@ -305,10 +305,6 @@ func (s *UploadSessionStore) cleanupExpiredSessions() {
|
||||
}
|
||||
|
||||
// Helper functions
|
||||
func generateSessionID() string {
|
||||
return fmt.Sprintf("%d_%s", time.Now().Unix(), randomString(16))
|
||||
}
|
||||
|
||||
func getChunkSize() int64 {
|
||||
// Default 5MB chunks, configurable
|
||||
if conf.Uploads.ChunkSize != "" {
|
||||
|
Reference in New Issue
Block a user