diff --git a/server/internal/service/backup_execution_service.go b/server/internal/service/backup_execution_service.go index 8c0e335..a514547 100644 --- a/server/internal/service/backup_execution_service.go +++ b/server/internal/service/backup_execution_service.go @@ -363,33 +363,46 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba logger.Warnf("存储目标 %s 创建客户端失败:%v", targetName, resolveErr) return } - artifact, openErr := os.Open(finalPath) - if openErr != nil { - uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: openErr.Error()} - logger.Warnf("存储目标 %s 打开备份文件失败:%v", targetName, openErr) - return - } - defer artifact.Close() logger.Infof("开始上传备份到存储目标:%s", targetName) - // hashingReader: 上传过程中同步计算字节数 + SHA-256,单次读取零额外 I/O - hr := newHashingReader(artifact) - // progressReader: 包装 hashingReader,通过 LogHub 推送实时上传进度 - pr := newProgressReader(hr, fileSize, func(bytesRead int64, speedBps float64) { - percent := float64(0) - if fileSize > 0 { - percent = float64(bytesRead) / float64(fileSize) * 100 + // 上传级重试:最多 3 次,指数退避(10s, 30s, 90s) + maxAttempts := 3 + var lastUploadErr error + var hr *hashingReader + for attempt := 1; attempt <= maxAttempts; attempt++ { + if attempt > 1 { + backoff := time.Duration(attempt*attempt) * 10 * time.Second + logger.Warnf("存储目标 %s 第 %d 次重试(等待 %v):%v", targetName, attempt, backoff, lastUploadErr) + time.Sleep(backoff) + } + artifact, openErr := os.Open(finalPath) + if openErr != nil { + uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: openErr.Error()} + logger.Warnf("存储目标 %s 打开备份文件失败:%v", targetName, openErr) + return } - s.logHub.AppendProgress(recordID, backup.ProgressInfo{ - BytesSent: bytesRead, - TotalBytes: fileSize, - Percent: percent, - SpeedBps: speedBps, - TargetName: targetName, + hr = newHashingReader(artifact) + pr := newProgressReader(hr, fileSize, func(bytesRead int64, speedBps float64) { + percent := float64(0) + if fileSize > 0 { + percent = float64(bytesRead) / float64(fileSize) * 100 + } + s.logHub.AppendProgress(recordID, backup.ProgressInfo{ + BytesSent: bytesRead, + TotalBytes: fileSize, + Percent: percent, + SpeedBps: speedBps, + TargetName: targetName, + }) }) - }) - if uploadErr := provider.Upload(ctx, storagePath, pr, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)}); uploadErr != nil { - uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: uploadErr.Error()} - logger.Warnf("存储目标 %s 上传失败:%v", targetName, uploadErr) + lastUploadErr = provider.Upload(ctx, storagePath, pr, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)}) + artifact.Close() + if lastUploadErr == nil { + break + } + } + if lastUploadErr != nil { + uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: lastUploadErr.Error()} + logger.Warnf("存储目标 %s 上传失败(已重试 %d 次):%v", targetName, maxAttempts, lastUploadErr) return } // 完整性校验:对比实际传输字节数