dbbackup/internal/engine/binlog/s3_target.go
Alexander Renz f033b02cec
fix(build): move EstimateBackupSize to platform-independent file
Fixes Windows, OpenBSD, and NetBSD builds by extracting
EstimateBackupSize from disk_check.go (which has build tags
excluding those platforms) to a new estimate.go file.
2025-12-13 21:55:39 +01:00
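As a rough illustration of the split described above (the file names come from the commit message; the build constraint, package name, and function signature below are assumptions, not the repository's actual code):

    // disk_check.go: keeps its platform-specific build constraint
    //go:build !windows && !openbsd && !netbsd

    package backup

    // estimate.go: no build constraint, so it compiles on every platform
    package backup

    // EstimateBackupSize returns a rough size estimate for a backup
    // (illustrative signature and body only).
    func EstimateBackupSize(dataSizeBytes int64) int64 {
    	return dataSizeBytes
    }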

package binlog
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
// S3Target writes binlog events to S3 as JSON Lines, using multipart uploads
type S3Target struct {
client *s3.Client
bucket string
prefix string
region string
partSize int64
mu sync.Mutex
buffer *bytes.Buffer
bufferSize int
currentKey string
uploadID string
parts []types.CompletedPart
partNumber int32
fileNum int
healthy bool
lastErr error
lastWrite time.Time
}
// NewS3Target creates a new S3 target using the default AWS credential chain and the given region
func NewS3Target(bucket, prefix, region string) (*S3Target, error) {
if bucket == "" {
return nil, fmt.Errorf("bucket required for S3 target")
}
// Load AWS config
cfg, err := config.LoadDefaultConfig(context.Background(),
config.WithRegion(region),
)
if err != nil {
return nil, fmt.Errorf("failed to load AWS config: %w", err)
}
client := s3.NewFromConfig(cfg)
return &S3Target{
client: client,
bucket: bucket,
prefix: prefix,
region: region,
partSize: 10 * 1024 * 1024, // 10MB parts
buffer: bytes.NewBuffer(nil),
healthy: true,
}, nil
}
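// Usage sketch (illustrative, not part of the original file; bucket, prefix
// and region are placeholder values):
//
//	target, err := NewS3Target("my-bucket", "binlog/", "us-east-1")
//	if err != nil {
//		// handle error
//	}
//	fmt.Println(target.Name()) // prints: s3://my-bucket/binlog/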
// Name returns the target name
func (s *S3Target) Name() string {
return fmt.Sprintf("s3://%s/%s", s.bucket, s.prefix)
}
// Type returns the target type
func (s *S3Target) Type() string {
return "s3"
}
// Write buffers events as JSON Lines and uploads a multipart part once the buffer reaches partSize
func (s *S3Target) Write(ctx context.Context, events []*Event) error {
s.mu.Lock()
defer s.mu.Unlock()
// Write events to buffer
for _, ev := range events {
data, err := json.Marshal(ev)
if err != nil {
// Skip events that cannot be marshaled rather than failing the whole batch
continue
}
data = append(data, '\n')
s.buffer.Write(data)
s.bufferSize += len(data)
}
// Upload a part once the buffer reaches the part-size threshold
if int64(s.bufferSize) >= s.partSize {
if err := s.uploadPart(ctx); err != nil {
s.healthy = false
s.lastErr = err
return err
}
}
s.healthy = true
s.lastWrite = time.Now()
return nil
}
// uploadPart uploads the buffered data as the next part of the current multipart
// upload, starting a new upload (and object key) if none is in progress.
// The caller must hold s.mu.
func (s *S3Target) uploadPart(ctx context.Context) error {
if s.bufferSize == 0 {
return nil
}
// Start multipart upload if not started
if s.uploadID == "" {
s.fileNum++
s.currentKey = fmt.Sprintf("%sbinlog_%s_%04d.jsonl",
s.prefix,
time.Now().Format("20060102_150405"),
s.fileNum)
result, err := s.client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.currentKey),
})
if err != nil {
return fmt.Errorf("failed to create multipart upload: %w", err)
}
s.uploadID = *result.UploadId
s.parts = nil
s.partNumber = 0
}
// Upload part
s.partNumber++
result, err := s.client.UploadPart(ctx, &s3.UploadPartInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.currentKey),
UploadId: aws.String(s.uploadID),
PartNumber: aws.Int32(s.partNumber),
Body: bytes.NewReader(s.buffer.Bytes()),
})
if err != nil {
return fmt.Errorf("failed to upload part: %w", err)
}
s.parts = append(s.parts, types.CompletedPart{
ETag: result.ETag,
PartNumber: aws.Int32(s.partNumber),
})
// Reset buffer
s.buffer.Reset()
s.bufferSize = 0
return nil
}
// Flush uploads any remaining buffered data and completes the current multipart upload
func (s *S3Target) Flush(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
// Upload remaining buffer
if s.bufferSize > 0 {
if err := s.uploadPart(ctx); err != nil {
return err
}
}
// Complete multipart upload
if s.uploadID != "" && len(s.parts) > 0 {
_, err := s.client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.currentKey),
UploadId: aws.String(s.uploadID),
MultipartUpload: &types.CompletedMultipartUpload{
Parts: s.parts,
},
})
if err != nil {
return fmt.Errorf("failed to complete upload: %w", err)
}
// Reset for next file
s.uploadID = ""
s.parts = nil
s.partNumber = 0
}
return nil
}
// Close flushes any buffered data and closes the target
func (s *S3Target) Close() error {
return s.Flush(context.Background())
}
// Healthy returns target health status
func (s *S3Target) Healthy() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.healthy
}
// S3StreamingTarget wraps S3Target and rotates to a new S3 object once the
// estimated file size reaches rotateSize
type S3StreamingTarget struct {
*S3Target
rotateSize int64
currentSize int64
}
// NewS3StreamingTarget creates an S3 target with file rotation
func NewS3StreamingTarget(bucket, prefix, region string, rotateSize int64) (*S3StreamingTarget, error) {
base, err := NewS3Target(bucket, prefix, region)
if err != nil {
return nil, err
}
if rotateSize == 0 {
rotateSize = 1024 * 1024 * 1024 // 1GB default
}
return &S3StreamingTarget{
S3Target: base,
rotateSize: rotateSize,
}, nil
}
// Write flushes and rotates to a new object once the estimated size reaches
// rotateSize, then delegates to S3Target.Write. The size bookkeeping is not
// protected by the mutex, so a single writer is assumed.
func (s *S3StreamingTarget) Write(ctx context.Context, events []*Event) error {
// Check if we need to rotate
if s.currentSize >= s.rotateSize {
if err := s.Flush(ctx); err != nil {
return err
}
s.currentSize = 0
}
// Track the approximate object size using the raw event payload lengths
for _, ev := range events {
s.currentSize += int64(len(ev.RawData))
}
return s.S3Target.Write(ctx, events)
}
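// Usage sketch for the streaming target (illustrative, not part of the
// original file; assumes *Event values are produced elsewhere in this package):
//
//	target, err := NewS3StreamingTarget("my-bucket", "binlog/", "us-east-1", 512*1024*1024) // rotate at ~512MB
//	if err != nil {
//		// handle error
//	}
//	defer target.Close() // Close flushes and completes the last multipart upload
//
//	for batch := range batches { // hypothetical channel of []*Event
//		if err := target.Write(ctx, batch); err != nil {
//			// handle write error
//		}
//	}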