Files
hmac-file-server/cmd/server/adaptive_io.go
Alexander Renz 3887feb12c Add enhanced configuration templates for adaptive I/O features
- Introduced a comprehensive configuration template (config-adaptive.toml) for adaptive I/O, enabling improved upload/download dual stack with various performance optimizations, security settings, and network resilience features.
- Created a test configuration template (test-config.toml) mirroring the adaptive configuration for testing purposes.
- Added a simple test configuration (test-simple-config.toml) for basic adaptive features testing with essential parameters.
- Included an empty Jupyter notebook (xep0363_analysis.ipynb) for future analysis related to XEP-0363.
2025-08-23 12:07:31 +00:00

1264 lines
34 KiB
Go

// adaptive_io.go - Enhanced I/O engine with adaptive buffer management and network optimization
package main
import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
// AdaptiveBufferPool manages multiple buffer pools of different sizes.
// It keeps one sync.Pool per supported buffer size and remembers which size
// is currently considered optimal.
type AdaptiveBufferPool struct {
// pools maps a buffer size in bytes to the sync.Pool serving buffers of that size.
pools map[int]*sync.Pool
// metrics is a NetworkMetrics instance created by NewAdaptiveBufferPool.
// NOTE(review): no code in this file reads or writes it after construction.
metrics *NetworkMetrics
// currentOptimalSize is the buffer size in bytes that GetOptimalBuffer hands out.
currentOptimalSize int
// mutex is taken around accesses to currentOptimalSize and lastOptimization.
mutex sync.RWMutex
// lastOptimization is set when the optimal size changes and reset to the
// zero time on an interface switch.
lastOptimization time.Time
// optimizationInterval is initialised to 30s by NewAdaptiveBufferPool.
// NOTE(review): not read anywhere in this file.
optimizationInterval time.Duration
}
// NetworkMetrics tracks performance characteristics of transfers.
type NetworkMetrics struct {
// ThroughputSamples holds recent throughput measurements; updateMetrics
// appends to it and trims it to at most 100 entries.
ThroughputSamples []ThroughputSample
// LatencySamples is pre-allocated by NewNetworkMetrics.
// NOTE(review): nothing in this file appends to it.
LatencySamples []time.Duration
// ErrorRate is an exponential moving average bumped by recordError.
ErrorRate float64
// LastUpdate is the time of the most recent throughput sample.
LastUpdate time.Time
// mutex is taken around accesses to the fields above.
mutex sync.RWMutex
}
// ThroughputSample represents a single throughput measurement.
type ThroughputSample struct {
// Timestamp is when the sample was recorded.
Timestamp time.Time
// BytesPerSec is the measured transfer rate in bytes per second.
BytesPerSec int64
// BufferSize is the I/O buffer size in bytes that was in use for the transfer.
BufferSize int
}
// StreamingEngine provides unified streaming with adaptive optimization.
type StreamingEngine struct {
// bufferPool supplies the I/O buffers used by StreamWithAdaptation.
bufferPool *AdaptiveBufferPool
// metrics collects throughput samples and the error rate for this engine.
metrics *NetworkMetrics
// resilienceManager is optional; StreamWithAdaptation nil-checks it before
// consulting pause/cancel signals.
// NOTE(review): initStreamingEngine does not set this field.
resilienceManager *NetworkResilienceManager
// interfaceManager is consulted by handleInterfaceSwitch for details about
// the newly selected interface.
interfaceManager *MultiInterfaceManager
}
// ClientProfile stores optimization data per client (keyed by client IP in
// the package-level clientProfiles map).
type ClientProfile struct {
// OptimalChunkSize defaults to 2MB in getClientProfile.
// NOTE(review): not otherwise used in this file.
OptimalChunkSize int64
// OptimalBufferSize is the buffer size in bytes recorded by
// updateClientProfile when a sample beat the running average by more than 10%.
OptimalBufferSize int
// ReliabilityScore defaults to 0.8 for a new profile.
ReliabilityScore float64
// AverageThroughput is an exponential moving average in bytes per second.
AverageThroughput int64
// LastSeen is refreshed on every updateClientProfile call.
LastSeen time.Time
// ConnectionType defaults to "unknown"; selectOptimalBuffer switches on
// "mobile", "cellular", "wifi", "ethernet" and "fiber".
ConnectionType string
// PreferredInterface is the interface name currently preferred for this client.
PreferredInterface string
// InterfaceHistory holds per-interface statistics, capped at 10 entries by
// updateInterfaceUsage.
InterfaceHistory []InterfaceUsage
}
// InterfaceUsage tracks performance per network interface for one client.
type InterfaceUsage struct {
// InterfaceName identifies the network interface this record belongs to.
InterfaceName string
// LastUsed is the time of the most recent sample for this interface.
LastUsed time.Time
// AverageThroughput is a moving average in bytes per second.
AverageThroughput int64
// ReliabilityScore is kept within [0, 1]; it starts at 0.8 and moves in
// steps of 0.1 in updateInterfaceUsage.
ReliabilityScore float64
// OptimalBufferSize is the buffer size in bytes of the last sample that beat the average.
OptimalBufferSize int
}
// Package-level state shared by the adaptive I/O handlers.
var (
// globalStreamingEngine is created by initStreamingEngine and used by the
// upload/download handlers.
globalStreamingEngine *StreamingEngine
// clientProfiles maps a client IP to its profile; access is coordinated
// through clientProfilesMutex.
clientProfiles = make(map[string]*ClientProfile)
clientProfilesMutex sync.RWMutex
// multiInterfaceManager is created by initStreamingEngine.
multiInterfaceManager *MultiInterfaceManager
)
// initStreamingEngine builds the package-level multi-interface manager and
// streaming engine, then starts their background goroutines. Interface
// monitoring is only started when enabled in the network-resilience config.
func initStreamingEngine() {
	manager := NewMultiInterfaceManager()
	multiInterfaceManager = manager

	engine := &StreamingEngine{
		bufferPool:       NewAdaptiveBufferPool(),
		metrics:          NewNetworkMetrics(),
		interfaceManager: manager,
	}
	globalStreamingEngine = engine

	// Periodic buffer-size optimization runs for the lifetime of the process.
	go engine.optimizationLoop()

	if conf.NetworkResilience.MultiInterfaceEnabled {
		go manager.StartMonitoring()
		log.Info("Multi-interface monitoring enabled")
	}

	log.Info("Adaptive streaming engine with multi-interface support initialized")
}
// NewAdaptiveBufferPool creates a new adaptive buffer pool with one
// sync.Pool per supported buffer size (16KB through 1MB). The initial optimal
// size is 64KB.
func NewAdaptiveBufferPool() *AdaptiveBufferPool {
	const kb = 1024

	abp := &AdaptiveBufferPool{
		pools:                make(map[int]*sync.Pool),
		metrics:              NewNetworkMetrics(),
		currentOptimalSize:   64 * kb,
		optimizationInterval: 30 * time.Second,
	}

	supported := []int{
		16 * kb,   // slow connections
		32 * kb,   // current default
		64 * kb,   // balanced
		128 * kb,  // fast connections
		256 * kb,  // very fast connections
		512 * kb,  // high-speed networks
		1024 * kb, // maximum for extreme cases
	}
	for i := range supported {
		bufLen := supported[i] // per-iteration copy captured by the closure below
		abp.pools[bufLen] = &sync.Pool{
			New: func() interface{} {
				// Pools store *[]byte so Get/Put do not allocate a slice header.
				b := make([]byte, bufLen)
				return &b
			},
		}
	}
	return abp
}
// NewNetworkMetrics creates a new network metrics tracker whose sample
// slices are empty but pre-sized for 100 entries each.
func NewNetworkMetrics() *NetworkMetrics {
	const sampleCapacity = 100

	m := new(NetworkMetrics)
	m.ThroughputSamples = make([]ThroughputSample, 0, sampleCapacity)
	m.LatencySamples = make([]time.Duration, 0, sampleCapacity)
	return m
}
// GetOptimalBuffer returns a buffer of the currently optimal size together
// with that size. If no pool exists for the optimal size, the 64KB pool is
// used instead. The caller should hand the buffer back via PutBuffer using
// the returned size.
func (abp *AdaptiveBufferPool) GetOptimalBuffer() (*[]byte, int) {
	abp.mutex.RLock()
	wanted := abp.currentOptimalSize
	abp.mutex.RUnlock()

	source, ok := abp.pools[wanted]
	if !ok {
		// Unknown size: fall back to the 64KB pool.
		wanted = 64 * 1024
		source = abp.pools[wanted]
	}
	return source.Get().(*[]byte), wanted
}
// PutBuffer returns a buffer to the pool registered for size. Buffers whose
// size has no pool are dropped.
func (abp *AdaptiveBufferPool) PutBuffer(bufPtr *[]byte, size int) {
	target, ok := abp.pools[size]
	if !ok {
		return
	}
	target.Put(bufPtr)
}
// StreamWithAdaptation copies src to dst using a pooled buffer chosen for the
// client, recording throughput metrics along the way.
//
// contentLength is the expected size (used only for buffer selection),
// sessionID identifies the transfer to the optional resilience manager, and
// clientIP keys the client profile and metrics.
//
// It returns the number of bytes written to dst and the first error
// encountered. io.EOF from src is treated as normal completion.
func (se *StreamingEngine) StreamWithAdaptation(
	dst io.Writer,
	src io.Reader,
	contentLength int64,
	sessionID string,
	clientIP string,
) (int64, error) {
	startTime := time.Now()

	// Pick a buffer for this client and make sure it goes back to its pool.
	profile := getClientProfile(clientIP)
	bufPtr, bufferSize := se.selectOptimalBuffer(contentLength, profile)
	defer se.bufferPool.PutBuffer(bufPtr, bufferSize)
	buf := *bufPtr

	var written int64
	// Start the metrics clock at startTime. With a zero value the very first
	// chunk would always record a sample computed over a near-zero elapsed
	// time, producing a wildly inflated throughput figure.
	lastMetricUpdate := startTime

	for {
		// Honour pause/cancel signals from the resilience manager, if any.
		if se.resilienceManager != nil {
			if uploadCtx := se.resilienceManager.GetUploadContext(sessionID); uploadCtx != nil {
				select {
				case <-uploadCtx.PauseChan:
					// While paused, also watch for cancellation so a transfer
					// that is cancelled during a pause does not block forever.
					select {
					case <-uploadCtx.ResumeChan:
					case <-uploadCtx.CancelChan:
						return written, fmt.Errorf("upload cancelled due to network issues")
					}
				case <-uploadCtx.CancelChan:
					return written, fmt.Errorf("upload cancelled due to network issues")
				default:
					// No signal pending; keep streaming.
				}
			}
		}

		n, readErr := src.Read(buf)
		if n > 0 {
			w, writeErr := dst.Write(buf[:n])
			written += int64(w)
			if writeErr == nil && w < n {
				// Mirror io.Copy: a short write without an error is still a failure.
				writeErr = io.ErrShortWrite
			}
			if writeErr != nil {
				se.recordError(clientIP, writeErr)
				return written, writeErr
			}

			// Sample throughput at most once per second.
			if time.Since(lastMetricUpdate) > time.Second {
				se.updateMetrics(written, startTime, bufferSize, clientIP)
				lastMetricUpdate = time.Now()
			}
		}

		if readErr != nil {
			if readErr == io.EOF {
				break
			}
			se.recordError(clientIP, readErr)
			return written, readErr
		}
	}

	// Final metrics update for the whole transfer.
	se.recordTransferComplete(written, time.Since(startTime), bufferSize, clientIP)
	return written, nil
}
// selectOptimalBuffer chooses a buffer for a transfer of contentLength bytes
// for the given client profile and returns it with its size.
//
// NOTE(review): the size derived below from the file size and the client
// profile is currently NOT used; the buffer always comes from
// GetOptimalBuffer (the pool-wide optimal size). Also note the initial read
// of currentOptimalSize happens without holding the pool mutex. Confirm the
// intended behaviour before wiring the computed size through: every profile
// is created with OptimalBufferSize = 64KB, so honouring it as-is would pin
// all clients to 64KB.
func (se *StreamingEngine) selectOptimalBuffer(contentLength int64, profile *ClientProfile) (*[]byte, int) {
	const (
		smallFile = 1024 * 1024       // 1MB
		largeFile = 100 * 1024 * 1024 // 100MB
	)

	candidate := se.bufferPool.currentOptimalSize

	// File-size adjustment: cap for small files, raise for very large ones.
	switch {
	case contentLength <= 0:
		// Unknown length: leave the candidate alone.
	case contentLength < smallFile:
		candidate = minInt(candidate, 64*1024)
	case contentLength > largeFile:
		candidate = maxInt(candidate, 256*1024)
	}

	// Client-profile adjustment.
	if profile != nil {
		if learned := profile.OptimalBufferSize; learned > 0 {
			candidate = learned
		}

		kind := profile.ConnectionType
		if kind == "mobile" || kind == "cellular" {
			candidate = minInt(candidate, 64*1024)
		} else if kind == "wifi" {
			candidate = minInt(candidate, 256*1024)
		} else if kind == "ethernet" || kind == "fiber" {
			candidate = maxInt(candidate, 128*1024)
		}
	}

	// The candidate is intentionally left unused to preserve existing behaviour.
	_ = candidate
	return se.bufferPool.GetOptimalBuffer()
}
// updateMetrics records a throughput sample for a transfer that has moved
// bytesTransferred bytes since startTime using a buffer of bufferSize bytes,
// and forwards the sample to the client's profile.
//
// Nothing is recorded when no time has elapsed.
func (se *StreamingEngine) updateMetrics(bytesTransferred int64, startTime time.Time, bufferSize int, clientIP string) {
	elapsed := time.Since(startTime)
	if elapsed <= 0 {
		return
	}

	// Compute bytes/second in floating point: the integer form
	// bytes * int64(time.Second) overflows int64 once a transfer exceeds
	// roughly 9.2 GB.
	throughput := int64(float64(bytesTransferred) / elapsed.Seconds())
	now := time.Now()

	se.metrics.mutex.Lock()
	se.metrics.ThroughputSamples = append(se.metrics.ThroughputSamples, ThroughputSample{
		Timestamp:   now,
		BytesPerSec: throughput,
		BufferSize:  bufferSize,
	})
	// Keep only the most recent 100 samples.
	if len(se.metrics.ThroughputSamples) > 100 {
		se.metrics.ThroughputSamples = se.metrics.ThroughputSamples[1:]
	}
	se.metrics.LastUpdate = now
	se.metrics.mutex.Unlock()

	updateClientProfile(clientIP, throughput, bufferSize)
}
// recordTransferComplete records the final metrics for a finished transfer
// of bytesTransferred bytes that took duration, and logs a summary for
// transfers larger than 10MB. Nothing is recorded for a non-positive duration.
func (se *StreamingEngine) recordTransferComplete(bytesTransferred int64, duration time.Duration, bufferSize int, clientIP string) {
	if duration <= 0 {
		return
	}

	// Floating-point rate: bytes * int64(time.Second) would overflow int64
	// for transfers above roughly 9.2 GB.
	bytesPerSec := float64(bytesTransferred) / duration.Seconds()

	// Feed the global metrics with a start time reconstructed from duration.
	se.updateMetrics(bytesTransferred, time.Now().Add(-duration), bufferSize, clientIP)

	if bytesTransferred > 10*1024*1024 {
		log.Debugf("Transfer complete: %s in %s (%.2f MB/s) using %dKB buffer",
			formatBytes(bytesTransferred),
			duration,
			bytesPerSec/(1024*1024),
			bufferSize/1024)
	}
}
// recordError folds one transfer error into the engine's error rate
// (exponential moving average, weight 0.1 per error) and logs a warning.
func (se *StreamingEngine) recordError(clientIP string, err error) {
	m := se.metrics

	m.mutex.Lock()
	m.ErrorRate = m.ErrorRate*0.9 + 0.1
	m.mutex.Unlock()

	log.Warnf("Transfer error for client %s: %v", clientIP, err)
}
// optimizationLoop re-evaluates the optimal buffer size every 30 seconds.
// It never returns and is meant to run in its own goroutine.
func (se *StreamingEngine) optimizationLoop() {
	tick := time.NewTicker(30 * time.Second)
	defer tick.Stop()

	for range tick.C {
		se.optimizeBufferSizes()
	}
}
// optimizeBufferSizes analyzes recent throughput samples and switches the
// pool's optimal buffer size to the size with the best average throughput.
//
// Only samples from the last five minutes are considered, at least 10
// samples must exist overall, and a buffer size needs at least 3 recent
// samples to be eligible.
func (se *StreamingEngine) optimizeBufferSizes() {
	se.metrics.mutex.RLock()
	samples := make([]ThroughputSample, len(se.metrics.ThroughputSamples))
	copy(samples, se.metrics.ThroughputSamples)
	se.metrics.mutex.RUnlock()

	if len(samples) < 10 {
		return // Not enough data
	}

	// Accumulate recent throughput per buffer size.
	sums := make(map[int]int64)
	counts := make(map[int]int64)
	for _, sample := range samples {
		if time.Since(sample.Timestamp) < 5*time.Minute {
			sums[sample.BufferSize] += sample.BytesPerSec
			counts[sample.BufferSize]++
		}
	}

	// Read the current size under the pool lock; writers hold it.
	se.bufferPool.mutex.RLock()
	currentSize := se.bufferPool.currentOptimalSize
	se.bufferPool.mutex.RUnlock()

	// Find the eligible buffer size with the best average throughput.
	averages := make(map[int]int64, len(sums))
	bestSize, bestAvg := currentSize, int64(0)
	for size, total := range sums {
		if counts[size] < 3 {
			continue // Not enough samples
		}
		avg := total / counts[size]
		averages[size] = avg
		if avg > bestAvg {
			bestAvg, bestSize = avg, size
		}
	}

	if bestSize == currentSize {
		return
	}

	se.bufferPool.mutex.Lock()
	oldSize := se.bufferPool.currentOptimalSize
	se.bufferPool.currentOptimalSize = bestSize
	se.bufferPool.lastOptimization = time.Now()
	se.bufferPool.mutex.Unlock()

	// Report the measured gain over the previous size when we have a baseline
	// for it; the earlier formula only reflected the ratio of buffer sizes.
	if baseline, ok := averages[oldSize]; ok && baseline > 0 {
		log.Infof("Optimized buffer size: %dKB -> %dKB (%.2f%% improvement)",
			oldSize/1024,
			bestSize/1024,
			float64(bestAvg-baseline)*100/float64(baseline))
		return
	}
	log.Infof("Optimized buffer size: %dKB -> %dKB (no recent baseline for previous size)",
		oldSize/1024,
		bestSize/1024)
}
// handleInterfaceSwitch reacts to a change of the active network interface:
// it moves client preferences from oldInterface to newInterface where the
// client has a reliable history on the new one, retunes the buffer size for
// the new interface, and resets the pool's lastOptimization timestamp.
func (se *StreamingEngine) handleInterfaceSwitch(oldInterface, newInterface string, reason SwitchReason) {
	log.Infof("Handling interface switch from %s to %s (reason: %s)",
		oldInterface, newInterface, multiInterfaceManager.switchReasonString(reason))

	clientProfilesMutex.Lock()
	for clientIP, profile := range clientProfiles {
		if profile.PreferredInterface != oldInterface {
			continue
		}
		// Only switch the preference when this client has a good track
		// record (reliability above 0.8) on the new interface.
		for i := range profile.InterfaceHistory {
			record := &profile.InterfaceHistory[i]
			if record.InterfaceName == newInterface && record.ReliabilityScore > 0.8 {
				profile.PreferredInterface = newInterface
				log.Debugf("Updated preferred interface for client %s: %s -> %s",
					clientIP, oldInterface, newInterface)
				break
			}
		}
	}
	clientProfilesMutex.Unlock()

	// Retune streaming parameters if the manager knows the new interface.
	if mgr := se.interfaceManager; mgr != nil {
		if info := mgr.GetInterfaceInfo(newInterface); info != nil {
			se.adjustParametersForInterface(info)
		}
	}

	// Reset the optimization timestamp.
	// NOTE(review): nothing in this file reads lastOptimization, so this
	// does not by itself trigger a re-optimization.
	pool := se.bufferPool
	pool.mutex.Lock()
	pool.lastOptimization = time.Time{}
	pool.mutex.Unlock()
}
// adjustParametersForInterface sets the pool's optimal buffer size from the
// interface type, bumping it one step when the interface's quality metrics
// look good. A nil interface is ignored.
func (se *StreamingEngine) adjustParametersForInterface(iface *NetworkInterface) {
	if iface == nil {
		return
	}

	const kb = 1024
	quality := iface.Quality

	// 128KB covers VPN (tunnel overhead) and any unrecognised type.
	recommended := 128 * kb
	switch iface.Type {
	case InterfaceEthernet:
		recommended = 512 * kb
		if quality != nil && quality.RTT < 10*time.Millisecond {
			recommended = 1024 * kb // very fast Ethernet
		}
	case InterfaceWiFi:
		recommended = 256 * kb
		if quality != nil && quality.Stability > 0.9 {
			recommended = 512 * kb // stable WiFi
		}
	case InterfaceLTE:
		if quality != nil && quality.PacketLoss < 1.0 {
			recommended = 256 * kb // good LTE
		}
	case InterfaceCellular:
		recommended = 64 * kb
	}

	se.bufferPool.mutex.Lock()
	se.bufferPool.currentOptimalSize = recommended
	se.bufferPool.mutex.Unlock()

	log.Debugf("Adjusted buffer size for interface %s (%s): %dKB",
		iface.Name, multiInterfaceManager.interfaceTypeString(iface.Type), recommended/1024)
}

// getClientProfile retrieves or creates a client profile
// An existing profile is returned after taking only the read lock. Otherwise
// the write lock is taken, the map is checked again in case another goroutine
// created the profile in the meantime, and a profile with default values is
// stored and returned.
func getClientProfile(clientIP string) *ClientProfile {
	clientProfilesMutex.RLock()
	existing, found := clientProfiles[clientIP]
	clientProfilesMutex.RUnlock()
	if found {
		return existing
	}

	clientProfilesMutex.Lock()
	defer clientProfilesMutex.Unlock()

	// Re-check now that we hold the write lock.
	if raced, ok := clientProfiles[clientIP]; ok {
		return raced
	}

	fresh := &ClientProfile{
		OptimalChunkSize:  2 * 1024 * 1024, // 2MB default
		OptimalBufferSize: 64 * 1024,       // 64KB default
		ReliabilityScore:  0.8,             // assume good initially
		LastSeen:          time.Now(),
		ConnectionType:    "unknown",
	}
	clientProfiles[clientIP] = fresh
	return fresh
}
// updateClientProfile folds one throughput sample (bytes per second, taken
// with a buffer of bufferSize bytes) into the profile for clientIP, creating
// the profile if needed.
func updateClientProfile(clientIP string, throughput int64, bufferSize int) {
	profile := getClientProfile(clientIP)

	clientProfilesMutex.Lock()
	defer clientProfilesMutex.Unlock()

	// Exponential moving average; the first sample seeds it directly.
	switch {
	case profile.AverageThroughput == 0:
		profile.AverageThroughput = throughput
	default:
		profile.AverageThroughput = (profile.AverageThroughput*9 + throughput) / 10
	}

	// Remember this buffer size when the sample beats the (already updated)
	// average by more than 10%.
	threshold := profile.AverageThroughput * 110 / 100
	if throughput > threshold {
		profile.OptimalBufferSize = bufferSize
	}

	// Attribute the sample to the currently active interface, if known.
	if multiInterfaceManager != nil {
		if active := multiInterfaceManager.GetActiveInterface(); active != "" {
			updateInterfaceUsage(profile, active, throughput, bufferSize)
		}
	}

	profile.LastSeen = time.Now()
}
// updateInterfaceUsage folds one throughput sample into the profile's record
// for interfaceName, creating the record if it does not exist, and promotes
// the interface to PreferredInterface when the profile has no preference yet
// or the interface clearly outperforms the profile's overall average.
//
// The caller is expected to hold whatever lock protects profile; this
// function does no locking itself.
func updateInterfaceUsage(profile *ClientProfile, interfaceName string, throughput int64, bufferSize int) {
	// Find the existing record for this interface, if any.
	var usage *InterfaceUsage
	for i := range profile.InterfaceHistory {
		if profile.InterfaceHistory[i].InterfaceName == interfaceName {
			usage = &profile.InterfaceHistory[i]
			break
		}
	}

	if usage == nil {
		profile.InterfaceHistory = append(profile.InterfaceHistory, InterfaceUsage{
			InterfaceName:     interfaceName,
			LastUsed:          time.Now(),
			AverageThroughput: throughput,
			ReliabilityScore:  0.8, // Start with good assumption
			OptimalBufferSize: bufferSize,
		})
		// Point at the record just added. Previously usage stayed nil here,
		// so a newly seen interface could never become the preferred one,
		// not even when the profile had no preference at all.
		usage = &profile.InterfaceHistory[len(profile.InterfaceHistory)-1]
	} else {
		usage.LastUsed = time.Now()
		usage.AverageThroughput = (usage.AverageThroughput*4 + throughput) / 5 // Faster adaptation

		// Reliability rises when the sample is at least 90% of the average
		// and falls otherwise, clamped to [0, 1].
		if throughput > usage.AverageThroughput*90/100 {
			usage.ReliabilityScore = minFloat64(usage.ReliabilityScore+0.1, 1.0)
		} else {
			usage.ReliabilityScore = maxFloat64(usage.ReliabilityScore-0.1, 0.0)
		}

		if throughput > usage.AverageThroughput {
			usage.OptimalBufferSize = bufferSize
		}
	}

	// Keep at most 10 records, dropping the oldest entry. The record held in
	// usage is never the dropped one: trimming only happens right after an
	// append, in which case usage is the last element.
	if len(profile.InterfaceHistory) > 10 {
		profile.InterfaceHistory = profile.InterfaceHistory[1:]
	}

	// Prefer this interface when there is no preference yet or it beats the
	// profile's overall average by more than 20%.
	if profile.PreferredInterface == "" ||
		usage.AverageThroughput > profile.AverageThroughput*120/100 {
		profile.PreferredInterface = interfaceName
	}
}
// detectConnectionType guesses the client's connection type from the
// request's User-Agent header. It returns "mobile" for mobile-looking agents,
// "cli" for curl/wget/HTTPie, and "browser" otherwise. Matching is
// case-sensitive.
func detectConnectionType(r *http.Request) string {
	ua := r.Header.Get("User-Agent")

	switch {
	case containsAny(ua, "Mobile", "Android", "iPhone", "iPad"):
		return "mobile"
	case containsAny(ua, "curl", "wget", "HTTPie"):
		return "cli"
	default:
		return "browser"
	}
}
// containsAny reports whether s contains at least one of substrings.
// Matching is case-sensitive. It returns false when no substrings are given;
// an empty substring matches any s.
func containsAny(s string, substrings ...string) bool {
	for _, substr := range substrings {
		if strings.Contains(s, substr) {
			return true
		}
	}
	return false
}
// Helper functions for adaptive I/O

// minInt returns the smaller of a and b.
func minInt(a, b int) int {
	smaller := b
	if a < b {
		smaller = a
	}
	return smaller
}
// maxInt returns the larger of a and b.
func maxInt(a, b int) int {
	larger := b
	if a > b {
		larger = a
	}
	return larger
}
// minFloat64 returns a when a < b and b otherwise (so b is returned when the
// comparison is false for any reason, including NaN operands).
func minFloat64(a, b float64) float64 {
	result := b
	if a < b {
		result = a
	}
	return result
}
// maxFloat64 returns a when a > b and b otherwise (so b is returned when the
// comparison is false for any reason, including NaN operands).
func maxFloat64(a, b float64) float64 {
	result := b
	if a > b {
		result = a
	}
	return result
}
// handleUploadWithAdaptiveIO handles a multipart POST upload (form field
// "file"), stores the file under the configured storage path using the
// adaptive streaming engine, and answers with a JSON summary.
//
// Responses: 405 for non-POST requests, 400 when the form or file part cannot
// be read, 500 when the destination cannot be created or written, 200 with a
// JSON body on success. A partially written file is removed on failure.
//
// NOTE(review): no authentication or further validation is performed in this
// handler itself — confirm it is enforced by the caller/middleware.
func handleUploadWithAdaptiveIO(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	activeConnections.Inc()
	defer activeConnections.Dec()

	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		uploadErrorsTotal.Inc()
		return
	}

	// Parse multipart form; up to 32MB is kept in memory.
	if err := r.ParseMultipartForm(32 << 20); err != nil {
		http.Error(w, fmt.Sprintf("Error parsing form: %v", err), http.StatusBadRequest)
		uploadErrorsTotal.Inc()
		return
	}

	file, header, err := r.FormFile("file")
	if err != nil {
		http.Error(w, fmt.Sprintf("Error retrieving file: %v", err), http.StatusBadRequest)
		uploadErrorsTotal.Inc()
		return
	}
	defer file.Close()

	// Derive the stored filename: an HMAC of name+time (keeping the original
	// extension) when configured, otherwise the client-supplied name.
	var filename string
	switch conf.Server.FileNaming {
	case "HMAC":
		h := hmac.New(sha256.New, []byte(conf.Security.Secret))
		h.Write([]byte(header.Filename + time.Now().String()))
		filename = hex.EncodeToString(h.Sum(nil)) + filepath.Ext(header.Filename)
	default:
		filename = header.Filename
	}

	storagePath := conf.Server.StoragePath
	if conf.ISO.Enabled {
		storagePath = conf.ISO.MountPoint
	}
	absFilename := filepath.Join(storagePath, filename)

	dst, err := os.Create(absFilename)
	if err != nil {
		http.Error(w, fmt.Sprintf("Error creating file: %v", err), http.StatusInternalServerError)
		uploadErrorsTotal.Inc()
		return
	}
	// Safety net for unexpected exits; the normal path closes explicitly
	// below and the resulting second Close is harmless.
	defer dst.Close()

	clientIP := getClientIP(r)
	sessionID := generateSessionID()
	written, err := globalStreamingEngine.StreamWithAdaptation(
		dst,
		file,
		header.Size,
		sessionID,
		clientIP,
	)

	// Close before deciding the outcome: write errors can surface only at
	// Close, and the handle must be released before the file is removed.
	if closeErr := dst.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		http.Error(w, fmt.Sprintf("Error saving file: %v", err), http.StatusInternalServerError)
		uploadErrorsTotal.Inc()
		if rmErr := os.Remove(absFilename); rmErr != nil {
			log.Warnf("Failed to remove incomplete upload %s: %v", absFilename, rmErr)
		}
		return
	}

	// Update metrics
	duration := time.Since(startTime)
	uploadDuration.Observe(duration.Seconds())
	uploadsTotal.Inc()
	uploadSizeBytes.Observe(float64(written))

	// Return success response
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	response := map[string]interface{}{
		"success":  true,
		"filename": filename,
		"size":     written,
		"duration": duration.String(),
	}
	if encErr := json.NewEncoder(w).Encode(response); encErr != nil {
		// The file is stored; only the reply to the client failed.
		log.Warnf("Failed to write upload response for %s: %v", filename, encErr)
	}

	log.Infof("Successfully uploaded %s (%s) in %s using adaptive I/O",
		filename, formatBytes(written), duration)
}
// handleDownloadWithAdaptiveIO serves the file named by the last element of
// the request path from the configured storage path, streaming it through the
// adaptive streaming engine as an attachment.
//
// Responses: 400 for an empty/invalid name or a path rejected by
// sanitizeFilePath, 404 when the file does not exist or is a directory, 500
// when it cannot be accessed or opened. Errors that occur after streaming has
// started can only be logged.
func handleDownloadWithAdaptiveIO(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	activeConnections.Inc()
	defer activeConnections.Dec()

	// Only the final path element is used as the filename.
	filename := filepath.Base(r.URL.Path)
	if filename == "." || filename == "/" {
		http.Error(w, "Invalid filename", http.StatusBadRequest)
		downloadErrorsTotal.Inc()
		return
	}

	storagePath := conf.Server.StoragePath
	if conf.ISO.Enabled {
		storagePath = conf.ISO.MountPoint
	}

	// Resolve the path through sanitizeFilePath (the previous unsanitized
	// filepath.Join result was dead code, immediately overwritten here).
	absFilename, err := sanitizeFilePath(storagePath, filename)
	if err != nil {
		http.Error(w, fmt.Sprintf("Invalid file path: %v", err), http.StatusBadRequest)
		downloadErrorsTotal.Inc()
		return
	}

	fileInfo, err := os.Stat(absFilename)
	if os.IsNotExist(err) {
		http.Error(w, "File not found", http.StatusNotFound)
		downloadErrorsTotal.Inc()
		return
	}
	if err != nil {
		http.Error(w, fmt.Sprintf("Error accessing file: %v", err), http.StatusInternalServerError)
		downloadErrorsTotal.Inc()
		return
	}
	if fileInfo.IsDir() {
		// A directory cannot be streamed; reading it would fail only after
		// the success headers had already been sent.
		http.Error(w, "File not found", http.StatusNotFound)
		downloadErrorsTotal.Inc()
		return
	}

	file, err := os.Open(absFilename)
	if err != nil {
		http.Error(w, fmt.Sprintf("Error opening file: %v", err), http.StatusInternalServerError)
		downloadErrorsTotal.Inc()
		return
	}
	defer file.Close()

	// Set headers
	w.Header().Set("Content-Disposition", "attachment; filename=\""+filepath.Base(absFilename)+"\"")
	w.Header().Set("Content-Type", "application/octet-stream")
	w.Header().Set("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))

	clientIP := getClientIP(r)
	sessionID := generateSessionID()
	n, err := globalStreamingEngine.StreamWithAdaptation(
		w,
		file,
		fileInfo.Size(),
		sessionID,
		clientIP,
	)
	if err != nil {
		// Headers are already on the wire, so the failure can only be logged.
		log.Errorf("Error during download of %s: %v", absFilename, err)
		downloadErrorsTotal.Inc()
		return
	}

	// Update metrics
	duration := time.Since(startTime)
	downloadDuration.Observe(duration.Seconds())
	downloadsTotal.Inc()
	downloadSizeBytes.Observe(float64(n))

	log.Infof("Successfully downloaded %s (%s) in %s using adaptive I/O",
		filename, formatBytes(n), duration)
}
// MultiInterfaceManager handles multiple network interfaces for seamless switching.
type MultiInterfaceManager struct {
// interfaces maps an interface name to its tracked state.
interfaces map[string]*NetworkInterface
// activeInterface is the name of the interface currently selected; empty
// until discovery has picked one.
activeInterface string
// mutex is taken around accesses to interfaces, activeInterface and switchHistory.
mutex sync.RWMutex
// switchHistory records recent switches, capped at 100 entries by switchToInterface.
switchHistory []InterfaceSwitch
// config holds the switching policy built in NewMultiInterfaceManager.
config *MultiInterfaceConfig
}
// NetworkInterface represents a network adapter tracked by MultiInterfaceManager.
type NetworkInterface struct {
// Name is the OS interface name (as reported by net.Interfaces).
Name string
// Type is the category derived from the name by detectInterfaceType.
Type InterfaceType
// Priority orders interfaces; a lower number means higher priority.
Priority int
// Quality holds measured link quality; may be nil.
Quality *InterfaceQuality
// Active is true while the interface is up and not a loopback.
Active bool
// Gateway is not populated anywhere in this file.
Gateway net.IP
// MTU is copied from the OS interface.
MTU int
// LastSeen is the last time the interface was observed up.
LastSeen time.Time
// ThroughputHistory is pre-allocated at discovery.
// NOTE(review): nothing in this file appends to it.
ThroughputHistory []ThroughputSample
}
// InterfaceType represents different types of network connections.
type InterfaceType int
// Interface categories assigned by detectInterfaceType based on the
// interface name prefix. InterfaceUnknown is used when no prefix matches.
const (
InterfaceEthernet InterfaceType = iota
InterfaceWiFi
InterfaceLTE
InterfaceCellular
InterfaceVPN
InterfaceUnknown
)
// InterfaceSwitch tracks a single interface switching event.
type InterfaceSwitch struct {
// FromInterface and ToInterface are the interface names before and after the switch.
FromInterface string
ToInterface string
// Timestamp is when the switch was recorded.
Timestamp time.Time
// Reason is why the switch happened.
Reason SwitchReason
// TransferStatus describes what happened to transfers during the switch;
// switchToInterface always records TransferStatusContinuous.
TransferStatus TransferStatus
// SessionID is left empty by switchToInterface.
SessionID string
}
// SwitchReason indicates why an interface switch occurred.
type SwitchReason int
// Switch reasons. determineSwitchReason in this file produces only
// QualityDegradation, InterfaceDown and BetterAlternative; Manual and Timeout
// are not produced by any code visible here.
const (
SwitchReasonQualityDegradation SwitchReason = iota
SwitchReasonInterfaceDown
SwitchReasonBetterAlternative
SwitchReasonManual
SwitchReasonTimeout
)
// TransferStatus indicates the status of transfers during an interface switch.
type TransferStatus int
// Transfer statuses. Only TransferStatusContinuous is assigned by code
// visible in this file.
const (
TransferStatusContinuous TransferStatus = iota
TransferStatusPaused
TransferStatusFailed
TransferStatusRetried
)
// MultiInterfaceConfig holds configuration for multi-interface support.
type MultiInterfaceConfig struct {
// Enabled mirrors conf.NetworkResilience.MultiInterfaceEnabled.
Enabled bool
// InterfacePriority lists interface names in priority order; the index in
// this slice is used as the interface's priority (lower is better).
InterfacePriority []string
// AutoSwitchEnabled gates evaluateInterfaceSwitching.
AutoSwitchEnabled bool
// SwitchThresholdLatency and SwitchThresholdPacketLoss are the limits
// above which determineSwitchReason reports quality degradation.
SwitchThresholdLatency time.Duration
SwitchThresholdPacketLoss float64
// QualityDegradationThreshold and MaxSwitchAttempts are set by
// NewMultiInterfaceManager but not read anywhere in this file.
QualityDegradationThreshold float64
MaxSwitchAttempts int
// SwitchDetectionInterval is the tick period of StartMonitoring.
SwitchDetectionInterval time.Duration
}
// NewMultiInterfaceManager creates a new multi-interface manager with an
// empty interface table and a built-in default switching policy. Only the
// Enabled flag is taken from the global configuration.
func NewMultiInterfaceManager() *MultiInterfaceManager {
	mim := &MultiInterfaceManager{
		interfaces:    make(map[string]*NetworkInterface),
		switchHistory: make([]InterfaceSwitch, 0, 100),
	}
	mim.config = &MultiInterfaceConfig{
		Enabled:                     conf.NetworkResilience.MultiInterfaceEnabled,
		InterfacePriority:           []string{"eth0", "wlan0", "wwan0", "ppp0"},
		AutoSwitchEnabled:           true,
		SwitchThresholdLatency:      500 * time.Millisecond,
		SwitchThresholdPacketLoss:   5.0,
		QualityDegradationThreshold: 0.3,
		MaxSwitchAttempts:           3,
		SwitchDetectionInterval:     2 * time.Second,
	}
	return mim
}
// StartMonitoring discovers the available interfaces once and then, on every
// SwitchDetectionInterval tick, refreshes their status and evaluates whether
// to switch. It never returns and is meant to run in its own goroutine.
func (mim *MultiInterfaceManager) StartMonitoring() {
	tick := time.NewTicker(mim.config.SwitchDetectionInterval)
	defer tick.Stop()

	mim.discoverInterfaces()

	for range tick.C {
		mim.updateInterfaceStatus()
		mim.evaluateInterfaceSwitching()
	}
}
// discoverInterfaces registers every OS network interface that is up and not
// a loopback, then picks the initial active interface if none is set yet.
// A failure to enumerate interfaces is logged and leaves the state untouched.
func (mim *MultiInterfaceManager) discoverInterfaces() {
	interfaces, err := net.Interfaces()
	if err != nil {
		log.Errorf("Failed to discover network interfaces: %v", err)
		return
	}

	mim.mutex.Lock()
	for _, iface := range interfaces {
		if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 {
			continue
		}
		netIface := &NetworkInterface{
			Name:     iface.Name,
			Type:     mim.detectInterfaceType(iface.Name),
			Priority: mim.getInterfacePriority(iface.Name),
			Active:   true,
			MTU:      iface.MTU,
			LastSeen: time.Now(),
			Quality: &InterfaceQuality{
				Name:         iface.Name,
				Connectivity: ConnectivityUnknown,
			},
			ThroughputHistory: make([]ThroughputSample, 0, 50),
		}
		mim.interfaces[iface.Name] = netIface
		log.Infof("Discovered network interface: %s (type: %s, priority: %d)",
			iface.Name, mim.interfaceTypeString(netIface.Type), netIface.Priority)
	}
	needsInitial := mim.activeInterface == ""
	mim.mutex.Unlock()

	if !needsInitial {
		return
	}

	// selectBestInterface takes the read lock itself. sync.RWMutex is not
	// reentrant, so calling it while still holding the write lock (as the
	// code did before) blocks forever. Select with the lock released, then
	// re-check under the write lock before assigning.
	best := mim.selectBestInterface()

	mim.mutex.Lock()
	if mim.activeInterface == "" {
		mim.activeInterface = best
	}
	mim.mutex.Unlock()
}
// detectInterfaceType classifies an interface by the prefix of its name.
// Rules are checked in order; names matching no rule are InterfaceUnknown.
func (mim *MultiInterfaceManager) detectInterfaceType(name string) InterfaceType {
	rules := []struct {
		prefixes [2]string
		kind     InterfaceType
	}{
		{[2]string{"eth", "en"}, InterfaceEthernet},
		{[2]string{"wlan", "wl"}, InterfaceWiFi},
		{[2]string{"wwan", "usb"}, InterfaceLTE},
		{[2]string{"ppp", "rmnet"}, InterfaceCellular},
		{[2]string{"tun", "tap"}, InterfaceVPN},
	}

	for _, rule := range rules {
		for _, prefix := range rule.prefixes {
			if strings.HasPrefix(name, prefix) {
				return rule.kind
			}
		}
	}
	return InterfaceUnknown
}
// GetActiveInterface returns the name of the currently active network
// interface, or the empty string if none has been selected.
func (mim *MultiInterfaceManager) GetActiveInterface() string {
	mim.mutex.RLock()
	active := mim.activeInterface
	mim.mutex.RUnlock()
	return active
}
// selectBestInterface returns the name of the best active interface as
// judged by isInterfaceBetter, or the empty string when no interface is
// active. It takes the manager's read lock, so callers must not already hold
// the write lock.
func (mim *MultiInterfaceManager) selectBestInterface() string {
	mim.mutex.RLock()
	defer mim.mutex.RUnlock()

	var (
		winner     *NetworkInterface
		winnerName string
	)
	for name, candidate := range mim.interfaces {
		if !candidate.Active {
			continue
		}
		if winner != nil && !mim.isInterfaceBetter(candidate, winner) {
			continue
		}
		winner, winnerName = candidate, name
	}
	return winnerName
}
// getInterfacePriority returns the priority of an interface (lower = higher
// priority). Names listed in config.InterfacePriority get their index in
// that list; all others get a default based on their detected type.
func (mim *MultiInterfaceManager) getInterfacePriority(name string) int {
	configured := mim.config.InterfacePriority
	for rank := range configured {
		if configured[rank] == name {
			return rank
		}
	}

	// Type-based defaults for interfaces not explicitly configured.
	defaults := map[InterfaceType]int{
		InterfaceEthernet: 10,
		InterfaceWiFi:     20,
		InterfaceLTE:      30,
		InterfaceCellular: 40,
		InterfaceVPN:      50,
	}
	if priority, ok := defaults[mim.detectInterfaceType(name)]; ok {
		return priority
	}
	return 100
}
// isInterfaceBetter reports whether interface a should be preferred over b.
// Priority decides first (lower number wins); on equal priority the quality
// score decides when both interfaces have quality data, otherwise a is not
// considered better.
func (mim *MultiInterfaceManager) isInterfaceBetter(a, b *NetworkInterface) bool {
	if a.Priority < b.Priority {
		return true
	}
	if a.Priority > b.Priority {
		return false
	}

	// Equal priority from here on.
	if a.Quality == nil || b.Quality == nil {
		return false
	}
	scoreA := mim.calculateInterfaceScore(a)
	scoreB := mim.calculateInterfaceScore(b)
	return scoreA > scoreB
}
// calculateInterfaceScore computes a non-negative quality score for an
// interface: a base of 100, reduced for RTT above 100ms and for packet loss,
// increased for stability, plus a bonus/penalty for the interface type.
// Interfaces without quality data score 0.
func (mim *MultiInterfaceManager) calculateInterfaceScore(iface *NetworkInterface) float64 {
	quality := iface.Quality
	if quality == nil {
		return 0.0
	}

	score := 100.0

	// Latency penalty only applies above 100ms: 0.1 point per millisecond.
	if quality.RTT > 100*time.Millisecond {
		score -= float64(quality.RTT.Milliseconds()) * 0.1
	}
	score -= quality.PacketLoss * 10
	score += quality.Stability * 50

	// Wired links are preferred; VPNs are penalised for their overhead.
	var typeBonus float64
	switch iface.Type {
	case InterfaceEthernet:
		typeBonus = 20
	case InterfaceWiFi:
		typeBonus = 10
	case InterfaceLTE:
		typeBonus = 5
	case InterfaceVPN:
		typeBonus = -10
	}
	score += typeBonus

	// Clamp at zero.
	if score > 0.0 {
		return score
	}
	return 0.0
}
// updateInterfaceStatus refreshes the Active flag, LastSeen and MTU of every
// known interface from the OS. Interfaces that are no longer up (or have
// disappeared) end up inactive. Interfaces not already known are ignored.
func (mim *MultiInterfaceManager) updateInterfaceStatus() {
	osInterfaces, err := net.Interfaces()
	if err != nil {
		log.Errorf("Failed to update interface status: %v", err)
		return
	}

	mim.mutex.Lock()
	defer mim.mutex.Unlock()

	// Assume everything is down until the OS says otherwise.
	for name := range mim.interfaces {
		mim.interfaces[name].Active = false
	}

	for i := range osInterfaces {
		osIface := &osInterfaces[i]
		isUp := osIface.Flags&net.FlagUp != 0
		isLoopback := osIface.Flags&net.FlagLoopback != 0
		if !isUp || isLoopback {
			continue
		}
		known, ok := mim.interfaces[osIface.Name]
		if !ok {
			continue
		}
		known.Active = true
		known.LastSeen = time.Now()
		known.MTU = osIface.MTU
	}
}
// evaluateInterfaceSwitching switches to the best available interface when
// automatic switching is enabled, an interface is currently active, and the
// best candidate differs from it.
func (mim *MultiInterfaceManager) evaluateInterfaceSwitching() {
	if !mim.config.AutoSwitchEnabled {
		return
	}

	active := mim.GetActiveInterface()
	if active == "" {
		return
	}

	candidate := mim.selectBestInterface()
	if candidate == "" || candidate == active {
		return
	}

	mim.switchToInterface(candidate, mim.determineSwitchReason(active, candidate))
}
// determineSwitchReason classifies why a switch away from current is being
// made: the interface is gone/inactive, its packet loss or RTT exceeds the
// configured thresholds, or simply a better alternative exists. The target
// argument is not consulted.
func (mim *MultiInterfaceManager) determineSwitchReason(current, target string) SwitchReason {
	mim.mutex.RLock()
	defer mim.mutex.RUnlock()

	active := mim.interfaces[current]
	if active == nil || !active.Active {
		return SwitchReasonInterfaceDown
	}

	if q := active.Quality; q != nil {
		lossy := q.PacketLoss > mim.config.SwitchThresholdPacketLoss
		if lossy || q.RTT > mim.config.SwitchThresholdLatency {
			return SwitchReasonQualityDegradation
		}
	}
	return SwitchReasonBetterAlternative
}
// switchToInterface makes newInterface the active interface, appends the
// event to the switch history (keeping the latest 100), logs it, and notifies
// the global streaming engine asynchronously when one exists.
func (mim *MultiInterfaceManager) switchToInterface(newInterface string, reason SwitchReason) {
	mim.mutex.Lock()
	previous := mim.activeInterface
	mim.activeInterface = newInterface
	mim.mutex.Unlock()

	event := InterfaceSwitch{
		FromInterface:  previous,
		ToInterface:    newInterface,
		Timestamp:      time.Now(),
		Reason:         reason,
		TransferStatus: TransferStatusContinuous,
	}

	mim.mutex.Lock()
	history := append(mim.switchHistory, event)
	if len(history) > 100 {
		history = history[1:]
	}
	mim.switchHistory = history
	mim.mutex.Unlock()

	log.Infof("Switched network interface: %s -> %s (reason: %s)",
		previous, newInterface, mim.switchReasonString(reason))

	// Let the streaming engine react without blocking the monitor loop.
	if engine := globalStreamingEngine; engine != nil {
		go engine.handleInterfaceSwitch(previous, newInterface, reason)
	}
}
// interfaceTypeString returns a human-readable name for an interface type;
// InterfaceUnknown and any unrecognised value yield "Unknown".
func (mim *MultiInterfaceManager) interfaceTypeString(t InterfaceType) string {
	labels := map[InterfaceType]string{
		InterfaceEthernet: "Ethernet",
		InterfaceWiFi:     "WiFi",
		InterfaceLTE:      "LTE",
		InterfaceCellular: "Cellular",
		InterfaceVPN:      "VPN",
	}
	if label, ok := labels[t]; ok {
		return label
	}
	return "Unknown"
}
// switchReasonString returns a human-readable name for a switch reason;
// unrecognised values yield "Unknown".
func (mim *MultiInterfaceManager) switchReasonString(r SwitchReason) string {
	labels := map[SwitchReason]string{
		SwitchReasonQualityDegradation: "Quality Degradation",
		SwitchReasonInterfaceDown:      "Interface Down",
		SwitchReasonBetterAlternative:  "Better Alternative",
		SwitchReasonManual:             "Manual",
		SwitchReasonTimeout:            "Timeout",
	}
	if label, ok := labels[r]; ok {
		return label
	}
	return "Unknown"
}
// GetInterfaceInfo returns the tracked interface whose Name equals
// interfaceName, or nil when there is none. The returned pointer refers to
// the manager's own record, not a copy.
func (mim *MultiInterfaceManager) GetInterfaceInfo(interfaceName string) *NetworkInterface {
	mim.mutex.RLock()
	defer mim.mutex.RUnlock()

	for _, candidate := range mim.interfaces {
		if candidate.Name != interfaceName {
			continue
		}
		return candidate
	}
	return nil
}