sync to github
This commit is contained in:
parent
330956ebc7
commit
4cfc5591f6
@ -16,6 +16,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
"os/exec"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
@ -97,6 +98,15 @@ type FileConfig struct {
|
|||||||
FileRevision int `mapstructure:"FileRevision"`
|
FileRevision int `mapstructure:"FileRevision"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Configuration structure for ISO settings
|
||||||
|
type ISOConfig struct {
|
||||||
|
Enabled bool `mapstructure:"enabled"`
|
||||||
|
Size string `mapstructure:"size"`
|
||||||
|
MountPoint string `mapstructure:"mountpoint"`
|
||||||
|
Charset string `mapstructure:"charset"` // Add this line
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add ISO configuration to the main configuration structure
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Server ServerConfig `mapstructure:"server"`
|
Server ServerConfig `mapstructure:"server"`
|
||||||
Timeouts TimeoutConfig `mapstructure:"timeouts"`
|
Timeouts TimeoutConfig `mapstructure:"timeouts"`
|
||||||
@ -107,6 +117,7 @@ type Config struct {
|
|||||||
Redis RedisConfig `mapstructure:"redis"`
|
Redis RedisConfig `mapstructure:"redis"`
|
||||||
Workers WorkersConfig `mapstructure:"workers"`
|
Workers WorkersConfig `mapstructure:"workers"`
|
||||||
File FileConfig `mapstructure:"file"`
|
File FileConfig `mapstructure:"file"`
|
||||||
|
ISO ISOConfig `mapstructure:"iso"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// UploadTask represents a file upload task
|
// UploadTask represents a file upload task
|
||||||
@ -180,6 +191,14 @@ func main() {
|
|||||||
}
|
}
|
||||||
log.Info("Configuration loaded successfully.")
|
log.Info("Configuration loaded successfully.")
|
||||||
|
|
||||||
|
// Verify and create ISO container if it doesn't exist
|
||||||
|
if conf.ISO.Enabled {
|
||||||
|
err = verifyAndCreateISOContainer()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("ISO container verification failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize file info cache
|
// Initialize file info cache
|
||||||
fileInfoCache = cache.New(5*time.Minute, 10*time.Minute)
|
fileInfoCache = cache.New(5*time.Minute, 10*time.Minute)
|
||||||
|
|
||||||
@ -338,6 +357,31 @@ func main() {
|
|||||||
log.Fatalf("Server failed: %v", err)
|
log.Fatalf("Server failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Example files to include in the ISO container
|
||||||
|
files := []string{"file1.txt", "file2.txt"}
|
||||||
|
isoPath := "/path/to/container.iso"
|
||||||
|
|
||||||
|
// Create ISO container
|
||||||
|
err = CreateISOContainer(files, isoPath, conf.ISO.Size, conf.ISO.Charset)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Failed to create ISO container: %v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mount ISO container
|
||||||
|
err = MountISOContainer(isoPath, conf.ISO.MountPoint)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Failed to mount ISO container: %v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unmount ISO container (example)
|
||||||
|
err = UnmountISOContainer(conf.ISO.MountPoint)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Failed to unmount ISO container: %v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Function to load configuration using Viper
|
// Function to load configuration using Viper
|
||||||
@ -427,6 +471,12 @@ func setDefaults() {
|
|||||||
|
|
||||||
// Deduplication defaults
|
// Deduplication defaults
|
||||||
viper.SetDefault("deduplication.Enabled", true)
|
viper.SetDefault("deduplication.Enabled", true)
|
||||||
|
|
||||||
|
// ISO defaults
|
||||||
|
viper.SetDefault("iso.Enabled", true)
|
||||||
|
viper.SetDefault("iso.Size", "1GB")
|
||||||
|
viper.SetDefault("iso.MountPoint", "/mnt/iso")
|
||||||
|
viper.SetDefault("iso.Charset", "utf-8") // Add this line
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate configuration fields
|
// Validate configuration fields
|
||||||
@ -462,6 +512,19 @@ func validateConfig(conf *Config) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Validate ISO configuration
|
||||||
|
if conf.ISO.Enabled {
|
||||||
|
if conf.ISO.Size == "" {
|
||||||
|
return fmt.Errorf("ISO size must be set")
|
||||||
|
}
|
||||||
|
if conf.ISO.MountPoint == "" {
|
||||||
|
return fmt.Errorf("ISO mount point must be set")
|
||||||
|
}
|
||||||
|
if conf.ISO.Charset == "" {
|
||||||
|
return fmt.Errorf("ISO charset must be set")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Add more validations as needed
|
// Add more validations as needed
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -765,6 +828,17 @@ func processUpload(task UploadTask) error {
|
|||||||
log.Infof("Deduplication handled successfully for file: %s", absFilename)
|
log.Infof("Deduplication handled successfully for file: %s", absFilename)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle ISO container if enabled
|
||||||
|
if conf.ISO.Enabled {
|
||||||
|
err = handleISOContainer(absFilename)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("ISO container handling failed")
|
||||||
|
uploadErrorsTotal.Inc()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Infof("ISO container handled successfully for file: %s", absFilename)
|
||||||
|
}
|
||||||
|
|
||||||
log.WithFields(logrus.Fields{
|
log.WithFields(logrus.Fields{
|
||||||
"file": absFilename,
|
"file": absFilename,
|
||||||
}).Info("File uploaded and processed successfully")
|
}).Info("File uploaded and processed successfully")
|
||||||
@ -774,7 +848,7 @@ func processUpload(task UploadTask) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// uploadWorker processes upload tasks from the uploadQueue
|
// Improved uploadWorker function with better concurrency handling
|
||||||
func uploadWorker(ctx context.Context, workerID int) {
|
func uploadWorker(ctx context.Context, workerID int) {
|
||||||
log.Infof("Upload worker %d started.", workerID)
|
log.Infof("Upload worker %d started.", workerID)
|
||||||
defer log.Infof("Upload worker %d stopped.", workerID)
|
defer log.Infof("Upload worker %d stopped.", workerID)
|
||||||
@ -801,7 +875,7 @@ func uploadWorker(ctx context.Context, workerID int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize upload worker pool
|
// Improved initializeUploadWorkerPool function
|
||||||
func initializeUploadWorkerPool(ctx context.Context) {
|
func initializeUploadWorkerPool(ctx context.Context) {
|
||||||
for i := 0; i < conf.Workers.NumWorkers; i++ {
|
for i := 0; i < conf.Workers.NumWorkers; i++ {
|
||||||
go uploadWorker(ctx, i)
|
go uploadWorker(ctx, i)
|
||||||
@ -1156,19 +1230,17 @@ func handleDownload(w http.ResponseWriter, r *http.Request, absFilename, fileSto
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the file for upload with buffered Writer
|
// Improved createFile function with proper resource management
|
||||||
func createFile(tempFilename string, r *http.Request) error {
|
func createFile(tempFilename string, r *http.Request) error {
|
||||||
absDirectory := filepath.Dir(tempFilename)
|
absDirectory := filepath.Dir(tempFilename)
|
||||||
err := os.MkdirAll(absDirectory, os.ModePerm)
|
err := os.MkdirAll(absDirectory, os.ModePerm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Errorf("Failed to create directory %s", absDirectory)
|
return err
|
||||||
return fmt.Errorf("failed to create directory %s: %w", absDirectory, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open the file for writing
|
// Open the file for writing
|
||||||
targetFile, err := os.OpenFile(tempFilename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
targetFile, err := os.OpenFile(tempFilename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Errorf("Failed to create file %s", tempFilename)
|
|
||||||
return fmt.Errorf("failed to create file %s: %w", tempFilename, err)
|
return fmt.Errorf("failed to create file %s: %w", tempFilename, err)
|
||||||
}
|
}
|
||||||
defer targetFile.Close()
|
defer targetFile.Close()
|
||||||
@ -1185,7 +1257,6 @@ func createFile(tempFilename string, r *http.Request) error {
|
|||||||
totalBytes += int64(n)
|
totalBytes += int64(n)
|
||||||
_, writeErr := writer.Write(buffer[:n])
|
_, writeErr := writer.Write(buffer[:n])
|
||||||
if writeErr != nil {
|
if writeErr != nil {
|
||||||
log.WithError(writeErr).Errorf("Failed to write to file %s", tempFilename)
|
|
||||||
return fmt.Errorf("failed to write to file %s: %w", tempFilename, writeErr)
|
return fmt.Errorf("failed to write to file %s: %w", tempFilename, writeErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1193,14 +1264,12 @@ func createFile(tempFilename string, r *http.Request) error {
|
|||||||
if readErr == io.EOF {
|
if readErr == io.EOF {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
log.WithError(readErr).Error("Failed to read request body")
|
|
||||||
return fmt.Errorf("failed to read request body: %w", readErr)
|
return fmt.Errorf("failed to read request body: %w", readErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = writer.Flush()
|
err = writer.Flush()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Errorf("Failed to flush buffer to file %s", tempFilename)
|
|
||||||
return fmt.Errorf("failed to flush buffer to file %s: %w", tempFilename, err)
|
return fmt.Errorf("failed to flush buffer to file %s: %w", tempFilename, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1595,7 +1664,7 @@ func MonitorRedisHealth(ctx context.Context, client *redis.Client, checkInterval
|
|||||||
}
|
}
|
||||||
redisConnected = false
|
redisConnected = false
|
||||||
} else {
|
} else {
|
||||||
if (!redisConnected) {
|
if !redisConnected {
|
||||||
log.Info("Redis reconnected successfully")
|
log.Info("Redis reconnected successfully")
|
||||||
}
|
}
|
||||||
redisConnected = true
|
redisConnected = true
|
||||||
@ -1738,7 +1807,7 @@ func computeFileHash(filePath string) (string, error) {
|
|||||||
return hex.EncodeToString(hasher.Sum(nil)), nil
|
return hex.EncodeToString(hasher.Sum(nil)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleMultipartUpload handles multipart file uploads
|
// handleMultipartUpload handles multipart uploads
|
||||||
func handleMultipartUpload(w http.ResponseWriter, r *http.Request, absFilename string) error {
|
func handleMultipartUpload(w http.ResponseWriter, r *http.Request, absFilename string) error {
|
||||||
err := r.ParseMultipartForm(32 << 20) // 32MB is the default used by FormFile
|
err := r.ParseMultipartForm(32 << 20) // 32MB is the default used by FormFile
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1978,3 +2047,137 @@ func checkFreeSpaceWithRetry(path string, retries int, delay time.Duration) erro
|
|||||||
}
|
}
|
||||||
return fmt.Errorf("checkFreeSpace: insufficient free space after %d attempts", retries)
|
return fmt.Errorf("checkFreeSpace: insufficient free space after %d attempts", retries)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CreateISOContainer creates an ISO container with the specified size
|
||||||
|
func CreateISOContainer(files []string, isoPath string, size string, charset string) error {
|
||||||
|
args := []string{"-o", isoPath, "-V", "ISO_CONTAINER", "-J", "-R", "-input-charset", charset}
|
||||||
|
args = append(args, files...)
|
||||||
|
cmd := exec.Command("genisoimage", args...)
|
||||||
|
cmd.Stdout = os.Stdout
|
||||||
|
cmd.Stderr = os.Stderr
|
||||||
|
return cmd.Run()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Improved logging and error handling
|
||||||
|
func MountISOContainer(isoPath string, mountPoint string) error {
|
||||||
|
// Ensure the mount point directory exists
|
||||||
|
if err := os.MkdirAll(mountPoint, os.ModePerm); err != nil {
|
||||||
|
return fmt.Errorf("failed to create mount point directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var output []byte
|
||||||
|
var err error
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
cmd := exec.Command("mount", "-o", "loop,ro", isoPath, mountPoint)
|
||||||
|
output, err = cmd.CombinedOutput()
|
||||||
|
if err == nil {
|
||||||
|
log.Infof("ISO container mounted successfully at %s", mountPoint)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
log.Warnf("Failed to mount ISO container (attempt %d/3): %v, output: %s", i+1, err, string(output))
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("failed to mount ISO container: %w, output: %s", err, string(output))
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmountISOContainer unmounts the ISO container from the specified mount point
|
||||||
|
func UnmountISOContainer(mountPoint string) error {
|
||||||
|
cmd := exec.Command("umount", mountPoint)
|
||||||
|
cmd.Stdout = os.Stdout
|
||||||
|
cmd.Stderr = os.Stderr
|
||||||
|
return cmd.Run()
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleISOContainer(absFilename string) error {
|
||||||
|
isoPath := filepath.Join(conf.ISO.MountPoint, "container.iso")
|
||||||
|
|
||||||
|
// Create ISO container
|
||||||
|
err := CreateISOContainer([]string{absFilename}, isoPath, conf.ISO.Size, conf.ISO.Charset)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create ISO container: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure the mount point directory exists
|
||||||
|
if err := os.MkdirAll(conf.ISO.MountPoint, os.ModePerm); err != nil {
|
||||||
|
return fmt.Errorf("failed to create mount point directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mount ISO container
|
||||||
|
err = MountISOContainer(isoPath, conf.ISO.MountPoint)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to mount ISO container: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unmount ISO container (example)
|
||||||
|
err = UnmountISOContainer(conf.ISO.MountPoint)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to unmount ISO container: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify and create ISO container if it doesn't exist
|
||||||
|
func verifyAndCreateISOContainer() error {
|
||||||
|
isoPath := filepath.Join(conf.ISO.MountPoint, "container.iso")
|
||||||
|
|
||||||
|
// Check if ISO file exists
|
||||||
|
if exists, _ := fileExists(isoPath); !exists {
|
||||||
|
log.Infof("ISO container does not exist. Creating new ISO container at %s", isoPath)
|
||||||
|
|
||||||
|
// Example files to include in the ISO container
|
||||||
|
files := []string{conf.Server.StoragePath}
|
||||||
|
|
||||||
|
// Create ISO container
|
||||||
|
err := CreateISOContainer(files, isoPath, conf.ISO.Size, conf.ISO.Charset)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create ISO container: %w", err)
|
||||||
|
}
|
||||||
|
log.Infof("ISO container created successfully at %s", isoPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify ISO file consistency
|
||||||
|
err := verifyISOFile(isoPath)
|
||||||
|
if err != nil {
|
||||||
|
// Handle corrupted ISO file
|
||||||
|
files := []string{conf.Server.StoragePath}
|
||||||
|
err = handleCorruptedISOFile(isoPath, files, conf.ISO.Size, conf.ISO.Charset)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to handle corrupted ISO file: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure the mount point directory exists
|
||||||
|
if err := os.MkdirAll(conf.ISO.MountPoint, os.ModePerm); err != nil {
|
||||||
|
return fmt.Errorf("failed to create mount point directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mount ISO container
|
||||||
|
err = MountISOContainer(isoPath, conf.ISO.MountPoint)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to mount ISO container: %w", err)
|
||||||
|
}
|
||||||
|
log.Infof("ISO container mounted successfully at %s", conf.ISO.MountPoint)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify ISO file consistency using a checksum
|
||||||
|
func verifyISOFile(isoPath string) error {
|
||||||
|
cmd := exec.Command("isoinfo", "-i", isoPath, "-d")
|
||||||
|
output, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to verify ISO file: %w, output: %s", err, string(output))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle corrupted ISO file by recreating it
|
||||||
|
func handleCorruptedISOFile(isoPath string, files []string, size string, charset string) error {
|
||||||
|
log.Warnf("ISO file %s is corrupted. Recreating it.", isoPath)
|
||||||
|
err := CreateISOContainer(files, isoPath, size, charset)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to recreate ISO container: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user