diff --git a/docs/deploy/schema.sql b/docs/deploy/schema.sql new file mode 100644 index 0000000..e85bb1b --- /dev/null +++ b/docs/deploy/schema.sql @@ -0,0 +1,55 @@ +-- Deploy模块数据库架构 +-- 部署相关的主机、实例和版本历史表 + +-- 创建hosts表:主机信息 +CREATE TABLE hosts ( + id SERIAL PRIMARY KEY, + name VARCHAR(255) UNIQUE, + ip_address VARCHAR(45) UNIQUE, + is_stopped BOOLEAN +); + +-- 创建instances表:服务实例信息 +CREATE TABLE instances ( + id VARCHAR(255) NOT NULL PRIMARY KEY, -- VARCHAR类型主键,非自增,不为空 + service_name VARCHAR(255), + service_version VARCHAR(255), + host_id VARCHAR(255), + host_ip_address VARCHAR(45), + ip_address VARCHAR(45), + port INT, + status VARCHAR(50), + is_stopped BOOLEAN, + -- 保留ip_address和port的组合唯一约束 + CONSTRAINT unique_ip_port UNIQUE (ip_address, port) +); + +-- 1. 创建service_name和service_version的联合索引 +CREATE INDEX idx_instances_service_name_version +ON instances (service_name, service_version); + +-- 2. 创建service_name和ip_address的联合索引 +CREATE INDEX idx_instances_service_name_ip +ON instances (service_name, ip_address); + +-- 3. 
创建version_histories表:版本历史记录 +CREATE TABLE version_histories ( + id SERIAL PRIMARY KEY, + instance_id VARCHAR(255), + service_name VARCHAR(255), + service_version VARCHAR(255), + status VARCHAR(50) +); + +-- 初始化主机数据 +-- 插入 jfcs1021 主机数据 +INSERT INTO hosts (name, ip_address, is_stopped) +VALUES ('jfcs1021', '10.210.10.33', false); + +-- 插入 jfcs1022 主机数据 +INSERT INTO hosts (name, ip_address, is_stopped) +VALUES ('jfcs1022', '10.210.10.30', false); + +-- 插入 jfcs1023 主机数据 +INSERT INTO hosts (name, ip_address, is_stopped) +VALUES ('jfcs1023', '10.210.10.31', false); \ No newline at end of file diff --git a/docs/service_manager/model/schema.sql b/docs/service_manager/model/schema.sql index da24ef4..0f8d2ef 100644 --- a/docs/service_manager/model/schema.sql +++ b/docs/service_manager/model/schema.sql @@ -44,17 +44,21 @@ CREATE TABLE IF NOT EXISTS service_states ( -- 部署任务表 (deploy_tasks) CREATE TABLE IF NOT EXISTS deploy_tasks ( - id VARCHAR(32) PRIMARY KEY, + service VARCHAR(255), + version VARCHAR(255), start_time TIMESTAMP, end_time TIMESTAMP, target_ratio DOUBLE PRECISION, instances JSONB DEFAULT '[]'::jsonb, - deploy_state VARCHAR(50) + deploy_state VARCHAR(50), + PRIMARY KEY (service, version), + FOREIGN KEY (service) REFERENCES services(name) ON DELETE CASCADE ); -- 创建索引以提高查询性能 CREATE INDEX IF NOT EXISTS idx_service_states_service ON service_states(service); CREATE INDEX IF NOT EXISTS idx_service_states_report_at ON service_states(service, report_at DESC); +CREATE INDEX IF NOT EXISTS idx_deploy_tasks_service ON deploy_tasks(service); CREATE INDEX IF NOT EXISTS idx_deploy_tasks_state ON deploy_tasks(deploy_state); CREATE INDEX IF NOT EXISTS idx_service_instances_service ON service_instances(service); diff --git a/internal/deploy/config.yaml b/internal/deploy/config.yaml new file mode 100644 index 0000000..69f0319 --- /dev/null +++ b/internal/deploy/config.yaml @@ -0,0 +1,24 @@ +database: + host: "localhost" + port: 5432 + user: "admin" + password: "password" + dbname: 
"zeroops" + sslmode: "disable" + +privateKey: | + -----BEGIN RSA PRIVATE KEY----- + MIICXQIBAAKBgQDZsfv1qscqYdy4vY+P4e3cAtmvppXQcRvrF1cB4drkv0haU24Y + 7m5qYtT52Kr539RdbKKdLAM6s20lWy7+5C0DgacdwYWd/7PeCELyEipZJL07Vro7 + Ate8Bfjya+wltGK9+XNUIHiumUKULW4KDx21+1NLAUeJ6PeW+DAkmJWF6QIDAQAB + AoGBAJlNxenTQj6OfCl9FMR2jlMJjtMrtQT9InQEE7m3m7bLHeC+MCJOhmNVBjaM + ZpthDORdxIZ6oCuOf6Z2+Dl35lntGFh5J7S34UP2BWzF1IyyQfySCNexGNHKT1G1 + XKQtHmtc2gWWthEg+S6ciIyw2IGrrP2Rke81vYHExPrexf0hAkEA9Izb0MiYsMCB + /jemLJB0Lb3Y/B8xjGjQFFBQT7bmwBVjvZWZVpnMnXi9sWGdgUpxsCuAIROXjZ40 + IRZ2C9EouwJBAOPjPvV8Sgw4vaseOqlJvSq/C/pIFx6RVznDGlc8bRg7SgTPpjHG + 4G+M3mVgpCX1a/EU1mB+fhiJ2LAZ/pTtY6sCQGaW9NwIWu3DRIVGCSMm0mYh/3X9 + DAcwLSJoctiODQ1Fq9rreDE5QfpJnaJdJfsIJNtX1F+L3YceeBXtW0Ynz2MCQBI8 + 9KP274Is5FkWkUFNKnuKUK4WKOuEXEO+LpR+vIhs7k6WQ8nGDd4/mujoJBr5mkrw + DPwqA3N5TMNDQVGv8gMCQQCaKGJgWYgvo3/milFfImbp+m7/Y3vCptarldXrYQWO + AQjxwc71ZGBFDITYvdgJM1MTqc8xQek1FXn1vfpy2c6O + -----END RSA PRIVATE KEY----- diff --git a/internal/deploy/config/config.go b/internal/deploy/config/config.go new file mode 100644 index 0000000..34e988a --- /dev/null +++ b/internal/deploy/config/config.go @@ -0,0 +1,44 @@ +package config + +import ( + "fmt" + "os" + + "gopkg.in/yaml.v3" +) + +// Config Deploy服务配置 +type Config struct { + Database DatabaseConfig `yaml:"database"` +} + +// DatabaseConfig 数据库配置 +type DatabaseConfig struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + User string `yaml:"user"` + Password string `yaml:"password"` + DBName string `yaml:"dbname"` + SSLMode string `yaml:"sslmode"` +} + +// GetDSN 获取PostgreSQL数据库连接字符串 +func (d *DatabaseConfig) GetDSN() string { + return fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", + d.Host, d.Port, d.User, d.Password, d.DBName, d.SSLMode) +} + +// LoadConfig 加载配置文件 +func LoadConfig(configPath string) (*Config, error) { + data, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read config file: %w", err) + } + + var 
config Config + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, fmt.Errorf("failed to parse config file: %w", err) + } + + return &config, nil +} diff --git a/internal/deploy/packages/storage-v1.0.0.tar.gz b/internal/deploy/packages/storage-v1.0.0.tar.gz new file mode 100644 index 0000000..231cf8b Binary files /dev/null and b/internal/deploy/packages/storage-v1.0.0.tar.gz differ diff --git a/internal/prometheus_adapter/config/prometheus_adapter.yml b/internal/prometheus_adapter/config/prometheus_adapter.yml index 55f0b56..0818db1 100644 --- a/internal/prometheus_adapter/config/prometheus_adapter.yml +++ b/internal/prometheus_adapter/config/prometheus_adapter.yml @@ -10,7 +10,7 @@ prometheus: # 告警 Webhook 服务配置 alert_webhook: # 监控告警模块地址 - url: "http://alert-module:8080/v1/integrations/alertmanager/webhook" + url: "http://10.210.10.33:8080/v1/integrations/alertmanager/webhook" # 轮询间隔 polling_interval: "10s" @@ -24,4 +24,4 @@ alert_rules: # 服务器配置 server: # 服务监听地址 - bind_addr: "0.0.0.0:9999" \ No newline at end of file + bind_addr: "0.0.0.0:9999" diff --git a/internal/prometheus_adapter/service/alert_service.go b/internal/prometheus_adapter/service/alert_service.go index fdd1b5c..afbcf9d 100644 --- a/internal/prometheus_adapter/service/alert_service.go +++ b/internal/prometheus_adapter/service/alert_service.go @@ -520,9 +520,16 @@ func (s *AlertService) buildExpression(rule *model.AlertRule, meta *model.AlertR } } - // 添加比较操作符和阈值 + // 添加比较操作符和阈值(检查是否已经包含) if meta.Threshold != 0 { - expr = fmt.Sprintf("%s %s %g", expr, rule.Op, meta.Threshold) + // 检查表达式是否已经包含比较操作符 + hasComparison := strings.Contains(expr, " > ") || strings.Contains(expr, " < ") || + strings.Contains(expr, " = ") || strings.Contains(expr, " != ") || + strings.Contains(expr, " >= ") || strings.Contains(expr, " <= ") + + if !hasComparison { + expr = fmt.Sprintf("%s %s %g", expr, rule.Op, meta.Threshold) + } } return expr diff --git a/internal/service_manager/api/deploy_api.go 
b/internal/service_manager/api/deploy_api.go index f5b0c7f..f31dae1 100644 --- a/internal/service_manager/api/deploy_api.go +++ b/internal/service_manager/api/deploy_api.go @@ -4,31 +4,31 @@ import ( "net/http" "strconv" - "github.com/gin-gonic/gin" + "github.com/fox-gonic/fox" "github.com/qiniu/zeroops/internal/service_manager/model" "github.com/qiniu/zeroops/internal/service_manager/service" "github.com/rs/zerolog/log" ) // setupDeployRouters 设置部署管理相关路由 -func (api *Api) setupDeployRouters(router *gin.Engine) { +func (api *Api) setupDeployRouters(router *fox.Engine) { // 部署任务基本操作 router.POST("/v1/deployments", api.CreateDeployment) router.GET("/v1/deployments", api.GetDeployments) - router.GET("/v1/deployments/:deployID", api.GetDeploymentByID) - router.POST("/v1/deployments/:deployID", api.UpdateDeployment) - router.DELETE("/v1/deployments/:deployID", api.DeleteDeployment) + router.GET("/v1/deployments/:service/:version", api.GetDeploymentByServiceAndVersion) + router.POST("/v1/deployments/:service/:version", api.UpdateDeployment) + router.DELETE("/v1/deployments/:service/:version", api.DeleteDeployment) // 部署任务控制操作 - router.POST("/v1/deployments/:deployID/pause", api.PauseDeployment) - router.POST("/v1/deployments/:deployID/continue", api.ContinueDeployment) - router.POST("/v1/deployments/:deployID/rollback", api.RollbackDeployment) + router.POST("/v1/deployments/:service/:version/pause", api.PauseDeployment) + router.POST("/v1/deployments/:service/:version/continue", api.ContinueDeployment) + router.POST("/v1/deployments/:service/:version/rollback", api.RollbackDeployment) } -// ===== 部署管理相关API ===== +// ===== 部署任务基本操作 ===== // CreateDeployment 创建发布任务(POST /v1/deployments) -func (api *Api) CreateDeployment(c *gin.Context) { +func (api *Api) CreateDeployment(c *fox.Context) { ctx := c.Request.Context() var req model.CreateDeploymentRequest @@ -48,7 +48,7 @@ func (api *Api) CreateDeployment(c *gin.Context) { return } - deployID, err := 
api.service.CreateDeployment(ctx, &req) + deployment, err := api.service.CreateDeployment(ctx, &req) if err != nil { if err == service.ErrServiceNotFound { c.JSON(http.StatusBadRequest, map[string]any{ @@ -75,26 +75,24 @@ func (api *Api) CreateDeployment(c *gin.Context) { return } - c.JSON(http.StatusCreated, map[string]any{ - "id": deployID, - "message": "deployment created successfully", - }) + c.JSON(http.StatusCreated, deployment) } -// GetDeploymentByID 获取发布任务详情(GET /v1/deployments/:deployID) -func (api *Api) GetDeploymentByID(c *gin.Context) { +// GetDeploymentByServiceAndVersion 获取发布任务详情(GET /v1/deployments/:service/:version) +func (api *Api) GetDeploymentByServiceAndVersion(c *fox.Context) { ctx := c.Request.Context() - deployID := c.Param("deployID") + serviceName := c.Param("service") + versionName := c.Param("version") - if deployID == "" { + if serviceName == "" || versionName == "" { c.JSON(http.StatusBadRequest, map[string]any{ "error": "bad request", - "message": "deployment ID is required", + "message": "service and version are required", }) return } - deployment, err := api.service.GetDeploymentByID(ctx, deployID) + deployment, err := api.service.GetDeploymentByServiceAndVersion(ctx, serviceName, versionName) if err != nil { if err == service.ErrDeploymentNotFound { c.JSON(http.StatusNotFound, map[string]any{ @@ -103,7 +101,7 @@ func (api *Api) GetDeploymentByID(c *gin.Context) { }) return } - log.Error().Err(err).Str("deployID", deployID).Msg("failed to get deployment") + log.Error().Err(err).Str("service", serviceName).Str("version", versionName).Msg("failed to get deployment") c.JSON(http.StatusInternalServerError, map[string]any{ "error": "internal server error", "message": "failed to get deployment", @@ -115,7 +113,7 @@ func (api *Api) GetDeploymentByID(c *gin.Context) { } // GetDeployments 获取发布任务列表(GET /v1/deployments) -func (api *Api) GetDeployments(c *gin.Context) { +func (api *Api) GetDeployments(c *fox.Context) { ctx := 
c.Request.Context() query := &model.DeploymentQuery{ @@ -145,15 +143,16 @@ func (api *Api) GetDeployments(c *gin.Context) { }) } -// UpdateDeployment 修改发布任务(POST /v1/deployments/:deployID) -func (api *Api) UpdateDeployment(c *gin.Context) { +// UpdateDeployment 修改发布任务(POST /v1/deployments/:service/:version) +func (api *Api) UpdateDeployment(c *fox.Context) { ctx := c.Request.Context() - deployID := c.Param("deployID") + serviceName := c.Param("service") + versionName := c.Param("version") - if deployID == "" { + if serviceName == "" || versionName == "" { c.JSON(http.StatusBadRequest, map[string]any{ "error": "bad request", - "message": "deployment ID is required", + "message": "service and version are required", }) return } @@ -167,7 +166,7 @@ func (api *Api) UpdateDeployment(c *gin.Context) { return } - err := api.service.UpdateDeployment(ctx, deployID, &req) + err := api.service.UpdateDeployment(ctx, serviceName, versionName, &req) if err != nil { if err == service.ErrDeploymentNotFound { c.JSON(http.StatusNotFound, map[string]any{ @@ -183,7 +182,7 @@ func (api *Api) UpdateDeployment(c *gin.Context) { }) return } - log.Error().Err(err).Str("deployID", deployID).Msg("failed to update deployment") + log.Error().Err(err).Str("service", serviceName).Str("version", versionName).Msg("failed to update deployment") c.JSON(http.StatusInternalServerError, map[string]any{ "error": "internal server error", "message": "failed to update deployment", @@ -196,20 +195,21 @@ func (api *Api) UpdateDeployment(c *gin.Context) { }) } -// DeleteDeployment 删除发布任务(DELETE /v1/deployments/:deployID) -func (api *Api) DeleteDeployment(c *gin.Context) { +// DeleteDeployment 删除发布任务(DELETE /v1/deployments/:service/:version) +func (api *Api) DeleteDeployment(c *fox.Context) { ctx := c.Request.Context() - deployID := c.Param("deployID") + serviceName := c.Param("service") + versionName := c.Param("version") - if deployID == "" { + if serviceName == "" || versionName == "" { 
c.JSON(http.StatusBadRequest, map[string]any{ "error": "bad request", - "message": "deployment ID is required", + "message": "service and version are required", }) return } - err := api.service.DeleteDeployment(ctx, deployID) + err := api.service.DeleteDeployment(ctx, serviceName, versionName) if err != nil { if err == service.ErrDeploymentNotFound { c.JSON(http.StatusNotFound, map[string]any{ @@ -225,7 +225,7 @@ func (api *Api) DeleteDeployment(c *gin.Context) { }) return } - log.Error().Err(err).Str("deployID", deployID).Msg("failed to delete deployment") + log.Error().Err(err).Str("service", serviceName).Str("version", versionName).Msg("failed to delete deployment") c.JSON(http.StatusInternalServerError, map[string]any{ "error": "internal server error", "message": "failed to delete deployment", @@ -238,20 +238,23 @@ func (api *Api) DeleteDeployment(c *gin.Context) { }) } -// PauseDeployment 暂停发布任务(POST /v1/deployments/:deployID/pause) -func (api *Api) PauseDeployment(c *gin.Context) { +// ===== 部署任务控制操作 ===== + +// PauseDeployment 暂停发布任务(POST /v1/deployments/:service/:version/pause) +func (api *Api) PauseDeployment(c *fox.Context) { ctx := c.Request.Context() - deployID := c.Param("deployID") + serviceName := c.Param("service") + versionName := c.Param("version") - if deployID == "" { + if serviceName == "" || versionName == "" { c.JSON(http.StatusBadRequest, map[string]any{ "error": "bad request", - "message": "deployment ID is required", + "message": "service and version are required", }) return } - err := api.service.PauseDeployment(ctx, deployID) + err := api.service.PauseDeployment(ctx, serviceName, versionName) if err != nil { if err == service.ErrDeploymentNotFound { c.JSON(http.StatusNotFound, map[string]any{ @@ -267,7 +270,7 @@ func (api *Api) PauseDeployment(c *gin.Context) { }) return } - log.Error().Err(err).Str("deployID", deployID).Msg("failed to pause deployment") + log.Error().Err(err).Str("service", serviceName).Str("version", 
versionName).Msg("failed to pause deployment") c.JSON(http.StatusInternalServerError, map[string]any{ "error": "internal server error", "message": "failed to pause deployment", @@ -280,20 +283,21 @@ func (api *Api) PauseDeployment(c *gin.Context) { }) } -// ContinueDeployment 继续发布任务(POST /v1/deployments/:deployID/continue) -func (api *Api) ContinueDeployment(c *gin.Context) { +// ContinueDeployment 继续发布任务(POST /v1/deployments/:service/:version/continue) +func (api *Api) ContinueDeployment(c *fox.Context) { ctx := c.Request.Context() - deployID := c.Param("deployID") + serviceName := c.Param("service") + versionName := c.Param("version") - if deployID == "" { + if serviceName == "" || versionName == "" { c.JSON(http.StatusBadRequest, map[string]any{ "error": "bad request", - "message": "deployment ID is required", + "message": "service and version are required", }) return } - err := api.service.ContinueDeployment(ctx, deployID) + err := api.service.ContinueDeployment(ctx, serviceName, versionName) if err != nil { if err == service.ErrDeploymentNotFound { c.JSON(http.StatusNotFound, map[string]any{ @@ -309,7 +313,7 @@ func (api *Api) ContinueDeployment(c *gin.Context) { }) return } - log.Error().Err(err).Str("deployID", deployID).Msg("failed to continue deployment") + log.Error().Err(err).Str("service", serviceName).Str("version", versionName).Msg("failed to continue deployment") c.JSON(http.StatusInternalServerError, map[string]any{ "error": "internal server error", "message": "failed to continue deployment", @@ -322,20 +326,38 @@ func (api *Api) ContinueDeployment(c *gin.Context) { }) } -// RollbackDeployment 回滚发布任务(POST /v1/deployments/:deployID/rollback) -func (api *Api) RollbackDeployment(c *gin.Context) { +// RollbackDeployment 回滚发布任务(POST /v1/deployments/:service/:version/rollback) +func (api *Api) RollbackDeployment(c *fox.Context) { ctx := c.Request.Context() - deployID := c.Param("deployID") + serviceName := c.Param("service") + versionName := 
c.Param("version") + + if serviceName == "" || versionName == "" { + c.JSON(http.StatusBadRequest, map[string]any{ + "error": "bad request", + "message": "service and version are required", + }) + return + } + + var req model.RollbackDeploymentRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, map[string]any{ + "error": "bad request", + "message": "invalid request body: " + err.Error(), + }) + return + } - if deployID == "" { + if req.TargetVersion == "" { c.JSON(http.StatusBadRequest, map[string]any{ "error": "bad request", - "message": "deployment ID is required", + "message": "target version is required", }) return } - err := api.service.RollbackDeployment(ctx, deployID) + err := api.service.RollbackDeployment(ctx, serviceName, versionName, &req) if err != nil { if err == service.ErrDeploymentNotFound { c.JSON(http.StatusNotFound, map[string]any{ @@ -351,7 +373,7 @@ func (api *Api) RollbackDeployment(c *gin.Context) { }) return } - log.Error().Err(err).Str("deployID", deployID).Msg("failed to rollback deployment") + log.Error().Err(err).Str("service", serviceName).Str("version", versionName).Msg("failed to rollback deployment") c.JSON(http.StatusInternalServerError, map[string]any{ "error": "internal server error", "message": "failed to rollback deployment", @@ -360,6 +382,6 @@ func (api *Api) RollbackDeployment(c *gin.Context) { } c.JSON(http.StatusOK, map[string]any{ - "message": "deployment rolled back successfully", + "message": "deployment rollback initiated successfully", }) } diff --git a/internal/service_manager/api/info_api.go b/internal/service_manager/api/info_api.go index 89fbfa4..0c81bec 100644 --- a/internal/service_manager/api/info_api.go +++ b/internal/service_manager/api/info_api.go @@ -23,7 +23,7 @@ func (api *Api) setupInfoRouters(router *gin.Engine) { router.DELETE("/v1/services/:service", api.DeleteService) } -// ===== 服务信息相关API ===== +// ===== 服务信息查询API ===== // GetServices 获取所有服务列表(GET /v1/services) 
func (api *Api) GetServices(c *gin.Context) { @@ -146,7 +146,7 @@ func (api *Api) GetServiceMetricTimeSeries(c *gin.Context) { c.JSON(http.StatusOK, response) } -// ===== 服务管理API(CRUD操作) ===== +// ===== 服务管理CRUD API ===== // CreateService 创建服务(POST /v1/services) func (api *Api) CreateService(c *gin.Context) { diff --git a/internal/service_manager/database/deploy_repo.go b/internal/service_manager/database/deploy_repo.go index 88f809f..974c368 100644 --- a/internal/service_manager/database/deploy_repo.go +++ b/internal/service_manager/database/deploy_repo.go @@ -11,10 +11,7 @@ import ( ) // CreateDeployment 创建发布任务 -func (d *Database) CreateDeployment(ctx context.Context, req *model.CreateDeploymentRequest) (string, error) { - // 生成唯一ID - deployID := "deploy-" + strconv.FormatInt(time.Now().UnixNano(), 36) - +func (d *Database) CreateDeployment(ctx context.Context, req *model.CreateDeploymentRequest) error { // 根据是否有计划时间决定初始状态 var initialStatus model.DeployState if req.ScheduleTime == nil { @@ -23,30 +20,40 @@ func (d *Database) CreateDeployment(ctx context.Context, req *model.CreateDeploy initialStatus = model.StatusUnrelease // 计划发布 } - query := `INSERT INTO deploy_tasks (id, start_time, end_time, target_ratio, instances, deploy_state) - VALUES ($1, $2, $3, $4, $5, $6)` + query := `INSERT INTO deploy_tasks (service, version, start_time, end_time, target_ratio, instances, deploy_state) + VALUES ($1, $2, $3, $4, $5, $6, $7)` // 默认实例为空数组 instances := []string{} instancesJSON, _ := json.Marshal(instances) - _, err := d.ExecContext(ctx, query, deployID, req.ScheduleTime, nil, 0.0, string(instancesJSON), initialStatus) - if err != nil { - return "", err - } + _, err := d.ExecContext(ctx, query, req.Service, req.Version, req.ScheduleTime, nil, 0.0, string(instancesJSON), initialStatus) + return err +} - return deployID, nil +// UpdateDeploymentStatus 更新部署任务状态 +func (d *Database) UpdateDeploymentStatus(ctx context.Context, service, version string, status model.DeployState) 
error { + query := `UPDATE deploy_tasks SET deploy_state = $1 WHERE service = $2 AND version = $3` + _, err := d.ExecContext(ctx, query, status, service, version) + return err } -// GetDeploymentByID 根据ID获取发布任务详情 -func (d *Database) GetDeploymentByID(ctx context.Context, deployID string) (*model.Deployment, error) { - query := `SELECT id, start_time, end_time, target_ratio, instances, deploy_state - FROM deploy_tasks WHERE id = $1` - row := d.QueryRowContext(ctx, query, deployID) +// UpdateDeploymentFinishTime 更新部署任务完成时间 +func (d *Database) UpdateDeploymentFinishTime(ctx context.Context, service, version string, finishTime time.Time) error { + query := `UPDATE deploy_tasks SET end_time = $1 WHERE service = $2 AND version = $3` + _, err := d.ExecContext(ctx, query, finishTime, service, version) + return err +} + +// GetDeploymentByServiceAndVersion 根据服务名和版本获取发布任务详情 +func (d *Database) GetDeploymentByServiceAndVersion(ctx context.Context, service, version string) (*model.Deployment, error) { + query := `SELECT service, version, start_time, end_time, target_ratio, instances, deploy_state + FROM deploy_tasks WHERE service = $1 AND version = $2` + row := d.QueryRowContext(ctx, query, service, version) var task model.ServiceDeployTask var instancesJSON string - if err := row.Scan(&task.ID, &task.StartTime, &task.EndTime, &task.TargetRatio, + if err := row.Scan(&task.Service, &task.Version, &task.StartTime, &task.EndTime, &task.TargetRatio, &instancesJSON, &task.DeployState); err != nil { if err == sql.ErrNoRows { return nil, nil @@ -62,7 +69,8 @@ func (d *Database) GetDeploymentByID(ctx context.Context, deployID string) (*mod } deployment := &model.Deployment{ - ID: task.ID, + Service: task.Service, + Version: task.Version, Status: task.DeployState, ScheduleTime: task.StartTime, FinishTime: task.EndTime, @@ -73,7 +81,7 @@ func (d *Database) GetDeploymentByID(ctx context.Context, deployID string) (*mod // GetDeployments 获取发布任务列表 func (d *Database) GetDeployments(ctx 
context.Context, query *model.DeploymentQuery) ([]model.Deployment, error) { - sql := `SELECT id, start_time, end_time, target_ratio, instances, deploy_state + sql := `SELECT service, version, start_time, end_time, target_ratio, instances, deploy_state FROM deploy_tasks WHERE 1=1` args := []any{} @@ -82,8 +90,10 @@ func (d *Database) GetDeployments(ctx context.Context, query *model.DeploymentQu args = append(args, query.Type) } - // 注意:新的deploy_tasks表没有service字段,暂时忽略service过滤 - // TODO: 需要根据业务逻辑决定如何处理service过滤 + if query.Service != "" { + sql += " AND service = $" + strconv.Itoa(len(args)+1) + args = append(args, query.Service) + } sql += " ORDER BY start_time DESC" @@ -102,7 +112,7 @@ func (d *Database) GetDeployments(ctx context.Context, query *model.DeploymentQu for rows.Next() { var task model.ServiceDeployTask var instancesJSON string - if err := rows.Scan(&task.ID, &task.StartTime, &task.EndTime, &task.TargetRatio, + if err := rows.Scan(&task.Service, &task.Version, &task.StartTime, &task.EndTime, &task.TargetRatio, &instancesJSON, &task.DeployState); err != nil { return nil, err } @@ -115,7 +125,8 @@ func (d *Database) GetDeployments(ctx context.Context, query *model.DeploymentQu } deployment := model.Deployment{ - ID: task.ID, + Service: task.Service, + Version: task.Version, Status: task.DeployState, ScheduleTime: task.StartTime, FinishTime: task.EndTime, @@ -128,15 +139,12 @@ func (d *Database) GetDeployments(ctx context.Context, query *model.DeploymentQu } // UpdateDeployment 修改未开始的发布任务 -func (d *Database) UpdateDeployment(ctx context.Context, deployID string, req *model.UpdateDeploymentRequest) error { +func (d *Database) UpdateDeployment(ctx context.Context, service, version string, req *model.UpdateDeploymentRequest) error { sql := `UPDATE deploy_tasks SET ` args := []any{} updates := []string{} paramIndex := 1 - // 注意:新的deploy_tasks表没有version字段,暂时忽略version更新 - // TODO: 需要根据业务逻辑决定如何处理version更新 - if req.ScheduleTime != nil { updates = append(updates, 
"start_time = $"+strconv.Itoa(paramIndex)) args = append(args, req.ScheduleTime) @@ -152,46 +160,49 @@ func (d *Database) UpdateDeployment(ctx context.Context, deployID string, req *m sql += ", " + updates[i] } - sql += " WHERE id = $" + strconv.Itoa(paramIndex) + " AND deploy_state = $" + strconv.Itoa(paramIndex+1) - args = append(args, deployID, model.StatusUnrelease) + sql += " WHERE service = $" + strconv.Itoa(paramIndex) + " AND version = $" + strconv.Itoa(paramIndex+1) + " AND deploy_state = $" + strconv.Itoa(paramIndex+2) + args = append(args, service, version, model.StatusUnrelease) _, err := d.ExecContext(ctx, sql, args...) return err } // DeleteDeployment 删除未开始的发布任务 -func (d *Database) DeleteDeployment(ctx context.Context, deployID string) error { - query := `DELETE FROM deploy_tasks WHERE id = $1 AND deploy_state = $2` - _, err := d.ExecContext(ctx, query, deployID, model.StatusUnrelease) +func (d *Database) DeleteDeployment(ctx context.Context, service, version string) error { + query := `DELETE FROM deploy_tasks WHERE service = $1 AND version = $2 AND deploy_state = $3` + _, err := d.ExecContext(ctx, query, service, version, model.StatusUnrelease) return err } // PauseDeployment 暂停正在灰度的发布任务 -func (d *Database) PauseDeployment(ctx context.Context, deployID string) error { - query := `UPDATE deploy_tasks SET deploy_state = $1 WHERE id = $2 AND deploy_state = $3` - _, err := d.ExecContext(ctx, query, model.StatusStop, deployID, model.StatusDeploying) +func (d *Database) PauseDeployment(ctx context.Context, service, version string) error { + query := `UPDATE deploy_tasks SET deploy_state = $1 WHERE service = $2 AND version = $3 AND deploy_state = $4` + _, err := d.ExecContext(ctx, query, model.StatusStop, service, version, model.StatusDeploying) return err } // ContinueDeployment 继续发布 -func (d *Database) ContinueDeployment(ctx context.Context, deployID string) error { - query := `UPDATE deploy_tasks SET deploy_state = $1 WHERE id = $2 AND deploy_state = 
$3` - _, err := d.ExecContext(ctx, query, model.StatusDeploying, deployID, model.StatusStop) +func (d *Database) ContinueDeployment(ctx context.Context, service, version string) error { + query := `UPDATE deploy_tasks SET deploy_state = $1 WHERE service = $2 AND version = $3 AND deploy_state = $4` + _, err := d.ExecContext(ctx, query, model.StatusDeploying, service, version, model.StatusStop) return err } // RollbackDeployment 回滚发布任务 -func (d *Database) RollbackDeployment(ctx context.Context, deployID string) error { - query := `UPDATE deploy_tasks SET deploy_state = $1 WHERE id = $2` - _, err := d.ExecContext(ctx, query, model.StatusRollback, deployID) +func (d *Database) RollbackDeployment(ctx context.Context, service, version string) error { + query := `UPDATE deploy_tasks SET deploy_state = $1 WHERE service = $2 AND version = $3` + _, err := d.ExecContext(ctx, query, model.StatusRollback, service, version) return err } // CheckDeploymentConflict 检查发布冲突 -// 注意:新的deploy_tasks表没有service和version字段,这个方法需要重新设计 -// TODO: 需要根据业务逻辑决定如何检查部署冲突 func (d *Database) CheckDeploymentConflict(ctx context.Context, service, version string) (bool, error) { - // 暂时返回false,表示没有冲突 - // 实际业务中可能需要通过其他方式检查冲突,比如检查正在部署的任务数量 - return false, nil + // 检查是否已存在相同服务和版本的部署任务 + query := `SELECT COUNT(*) FROM deploy_tasks WHERE service = $1 AND version = $2 AND deploy_state IN ($3, $4, $5)` + var count int + err := d.QueryRowContext(ctx, query, service, version, model.StatusDeploying, model.StatusUnrelease, model.StatusStop).Scan(&count) + if err != nil { + return false, err + } + return count > 0, nil } diff --git a/internal/service_manager/model/api.go b/internal/service_manager/model/api.go index feaacbb..87d4a8a 100644 --- a/internal/service_manager/model/api.go +++ b/internal/service_manager/model/api.go @@ -60,7 +60,6 @@ type MetricTimeSeriesQuery struct { // Deployment API响应用的发布任务 type Deployment struct { - ID string `json:"id"` Service string `json:"service"` Version string `json:"version"` 
Status DeployState `json:"status"` @@ -70,9 +69,11 @@ type Deployment struct { // CreateDeploymentRequest 创建发布任务请求 type CreateDeploymentRequest struct { - Service string `json:"service" binding:"required"` - Version string `json:"version" binding:"required"` - ScheduleTime *time.Time `json:"scheduleTime,omitempty"` // 可选参数,不填为立即发布 + Service string `json:"service" binding:"required"` + Version string `json:"version" binding:"required"` + ScheduleTime *time.Time `json:"scheduleTime,omitempty"` // 可选参数,不填为立即发布 + InstanceCount int `json:"instanceCount,omitempty"` // 可选参数,新服务部署时的实例数量 + PackageURL string `json:"packageUrl,omitempty"` // 可选参数,包下载URL,不填则自动构建 } // UpdateDeploymentRequest 修改发布任务请求 @@ -81,6 +82,12 @@ type UpdateDeploymentRequest struct { ScheduleTime *time.Time `json:"scheduleTime,omitempty"` // 新的计划发布时间 } +// RollbackDeploymentRequest 回滚发布任务请求 +type RollbackDeploymentRequest struct { + TargetVersion string `json:"targetVersion" binding:"required"` // 回滚目标版本 + PackageURL string `json:"packageUrl,omitempty"` // 可选参数,回滚包URL,不填则自动构建 +} + // DeploymentQuery 发布任务查询参数 type DeploymentQuery struct { Type DeployState `form:"type"` // deploying/stop/rollback/completed diff --git a/internal/service_manager/model/constants.go b/internal/service_manager/model/constants.go index 0d68529..c46461e 100644 --- a/internal/service_manager/model/constants.go +++ b/internal/service_manager/model/constants.go @@ -18,4 +18,5 @@ const ( StatusStop DeployState = "stop" // 暂停发布 StatusRollback DeployState = "rollback" // 已回滚 StatusCompleted DeployState = "completed" // 发布完成 + StatusError DeployState = "error" // 发布失败 ) diff --git a/internal/service_manager/model/deploy_task.go b/internal/service_manager/model/deploy_task.go index 9c91444..80e87e3 100644 --- a/internal/service_manager/model/deploy_task.go +++ b/internal/service_manager/model/deploy_task.go @@ -4,7 +4,8 @@ import "time" // ServiceDeployTask 服务部署任务信息 type ServiceDeployTask struct { - ID string `json:"id" db:"id"` // 
varchar(32) - 主键 + Service string `json:"service" db:"service"` // varchar(255) - 服务名称(联合主键) + Version string `json:"version" db:"version"` // varchar(255) - 版本号(联合主键) StartTime *time.Time `json:"startTime" db:"start_time"` // time - 开始时间 EndTime *time.Time `json:"endTime" db:"end_time"` // time - 结束时间 TargetRatio float64 `json:"targetRatio" db:"target_ratio"` // double(指导值) - 目标比例 diff --git a/internal/service_manager/service/base.go b/internal/service_manager/service/base.go index f010f2e..f35f31e 100644 --- a/internal/service_manager/service/base.go +++ b/internal/service_manager/service/base.go @@ -1,23 +1,52 @@ package service import ( + deployModel "github.com/qiniu/zeroops/internal/deploy/model" + deployService "github.com/qiniu/zeroops/internal/deploy/service" "github.com/qiniu/zeroops/internal/service_manager/database" "github.com/rs/zerolog/log" ) type Service struct { - db *database.Database + db *database.Database + deployService deployService.DeployService + instanceManager InstanceManager + deployAdapter *DeployAdapter +} + +// InstanceManager 定义实例管理接口 +type InstanceManager interface { + GetServiceInstances(serviceName string, version ...string) ([]*deployModel.InstanceInfo, error) + GetInstanceVersionHistory(instanceID string) ([]*deployModel.VersionInfo, error) } func NewService(db *database.Database) *Service { + instanceManager := &instanceManagerImpl{} service := &Service{ - db: db, + db: db, + deployService: deployService.NewDeployService(), + instanceManager: instanceManager, + deployAdapter: NewDeployAdapter(instanceManager), } - log.Info().Msg("Service initialized successfully") + log.Info().Msg("Service initialized successfully with deploy integration") return service } +// instanceManagerImpl 实现 InstanceManager 接口 +type instanceManagerImpl struct{} + +func (i *instanceManagerImpl) GetServiceInstances(serviceName string, version ...string) ([]*deployModel.InstanceInfo, error) { + // TODO: 实现获取服务实例的逻辑 + // 暂时返回空列表,实际应该查询 deploy 模块的数据库 + 
return []*deployModel.InstanceInfo{}, nil +} + +func (i *instanceManagerImpl) GetInstanceVersionHistory(instanceID string) ([]*deployModel.VersionInfo, error) { + // TODO: 实现获取实例版本历史的逻辑 + return nil, nil +} + func (s *Service) Close() error { return nil } diff --git a/internal/service_manager/service/deploy_adapter.go b/internal/service_manager/service/deploy_adapter.go new file mode 100644 index 0000000..037b4bb --- /dev/null +++ b/internal/service_manager/service/deploy_adapter.go @@ -0,0 +1,167 @@ +package service + +import ( + "fmt" + "net/http" + "os" + "strings" + + deployModel "github.com/qiniu/zeroops/internal/deploy/model" + deployService "github.com/qiniu/zeroops/internal/deploy/service" + "github.com/qiniu/zeroops/internal/service_manager/model" +) + +// DeployAdapter 部署适配器,处理 service_manager 和 deploy 模块间的参数转换 +type DeployAdapter struct { + instanceManager InstanceManager + baseURL string // 包仓库基础URL +} + +// NewDeployAdapter 创建部署适配器 +func NewDeployAdapter(instanceManager InstanceManager) *DeployAdapter { + return &DeployAdapter{ + instanceManager: instanceManager, + baseURL: "/Users/dingnanjia/workspace/mock/zeroops/internal/deploy/packages", // 本地包仓库路径 + } +} + +// BuildDeployNewServiceParams 构建新服务部署参数 +func (a *DeployAdapter) BuildDeployNewServiceParams(req *model.CreateDeploymentRequest) (*deployModel.DeployNewServiceParams, error) { + // 确定实例数量 + instanceCount := a.determineInstanceCount(req) + + // 构建包URL + packageURL := a.buildPackageURL(req.Service, req.Version, req.PackageURL) + + return &deployModel.DeployNewServiceParams{ + Service: req.Service, + Version: req.Version, + TotalNum: instanceCount, + PackageURL: packageURL, + }, nil +} + +// BuildDeployNewVersionParams 构建版本升级参数 +func (a *DeployAdapter) BuildDeployNewVersionParams(req *model.CreateDeploymentRequest) (*deployModel.DeployNewVersionParams, error) { + // 获取服务的现有实例 + instances, err := a.instanceManager.GetServiceInstances(req.Service) + if err != nil { + return nil, fmt.Errorf("failed 
to get service instances: %w", err) + } + + if len(instances) == 0 { + return nil, fmt.Errorf("no existing instances found for service %s", req.Service) + } + + // 提取实例ID列表 + instanceIDs := make([]string, len(instances)) + for i, inst := range instances { + instanceIDs[i] = inst.InstanceID + } + + // 构建包URL + packageURL := a.buildPackageURL(req.Service, req.Version, req.PackageURL) + + return &deployModel.DeployNewVersionParams{ + Service: req.Service, + Version: req.Version, + Instances: instanceIDs, + PackageURL: packageURL, + }, nil +} + +// BuildRollbackParams 构建回滚参数 +func (a *DeployAdapter) BuildRollbackParams(deployment *model.Deployment, req *model.RollbackDeploymentRequest) (*deployModel.RollbackParams, error) { + // 获取当前部署涉及的实例 + instances, err := a.instanceManager.GetServiceInstances(deployment.Service, deployment.Version) + if err != nil { + return nil, fmt.Errorf("failed to get service instances for rollback: %w", err) + } + + if len(instances) == 0 { + return nil, fmt.Errorf("no instances found for service %s version %s", deployment.Service, deployment.Version) + } + + // 提取实例ID列表 + instanceIDs := make([]string, len(instances)) + for i, inst := range instances { + instanceIDs[i] = inst.InstanceID + } + + // 构建回滚包URL + packageURL := a.buildPackageURL(deployment.Service, req.TargetVersion, req.PackageURL) + + return &deployModel.RollbackParams{ + Service: deployment.Service, + TargetVersion: req.TargetVersion, + Instances: instanceIDs, + PackageURL: packageURL, + }, nil +} + +// determineInstanceCount 确定实例数量 +func (a *DeployAdapter) determineInstanceCount(req *model.CreateDeploymentRequest) int { + // 如果请求中指定了实例数量,使用指定值 + if req.InstanceCount > 0 { + return req.InstanceCount + } + + // 否则使用默认值 + return 1 +} + +// buildPackageURL 构建包下载URL +func (a *DeployAdapter) buildPackageURL(serviceName, version, customURL string) string { + // 如果提供了自定义URL,直接使用 + if customURL != "" { + return customURL + } + + // 否则基于约定构建本地文件路径 + // 
格式:{baseURL}/{service}-{version}.tar.gz + return fmt.Sprintf("%s/%s-%s.tar.gz", + a.baseURL, serviceName, version) +} + +// ValidatePackageURL 验证包URL是否有效 +func (a *DeployAdapter) ValidatePackageURL(packageURL string) error { + // 调用 deploy 模块的验证函数 + if err := deployService.ValidatePackageURL(packageURL); err != nil { + return err + } + + // 区分HTTP URL和本地文件路径 + if strings.HasPrefix(packageURL, "http://") || strings.HasPrefix(packageURL, "https://") { + // HTTP URL检查 + resp, err := http.Head(packageURL) + if err != nil { + return fmt.Errorf("package URL not accessible: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return fmt.Errorf("package not found: HTTP %d", resp.StatusCode) + } + } else { + // 本地文件路径检查 + if _, err := os.Stat(packageURL); err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("package file not found: %s", packageURL) + } + return fmt.Errorf("package file not accessible: %w", err) + } + } + + return nil +} + +// IsNewServiceDeployment 判断是否为新服务部署 +func (a *DeployAdapter) IsNewServiceDeployment(serviceName string) (bool, error) { + instances, err := a.instanceManager.GetServiceInstances(serviceName) + if err != nil { + return false, fmt.Errorf("failed to check existing instances: %w", err) + } + + // 如果没有现有实例,则为新服务部署 + return len(instances) == 0, nil +} diff --git a/internal/service_manager/service/deploy_service.go b/internal/service_manager/service/deploy_service.go index 72b6d8b..05d3236 100644 --- a/internal/service_manager/service/deploy_service.go +++ b/internal/service_manager/service/deploy_service.go @@ -2,6 +2,7 @@ package service import ( "context" + "time" "github.com/qiniu/zeroops/internal/service_manager/model" "github.com/rs/zerolog/log" @@ -10,43 +11,143 @@ import ( // ===== 部署管理业务方法 ===== // CreateDeployment 创建发布任务 -func (s *Service) CreateDeployment(ctx context.Context, req *model.CreateDeploymentRequest) (string, error) { +func (s *Service) CreateDeployment(ctx context.Context, req 
*model.CreateDeploymentRequest) (*model.Deployment, error) { // 检查服务是否存在 service, err := s.db.GetServiceByName(ctx, req.Service) if err != nil { - return "", err + return nil, err } if service == nil { - return "", ErrServiceNotFound + return nil, ErrServiceNotFound } // 检查发布冲突 conflict, err := s.db.CheckDeploymentConflict(ctx, req.Service, req.Version) if err != nil { - return "", err + return nil, err } if conflict { - return "", ErrDeploymentConflict + return nil, ErrDeploymentConflict } - // 创建发布任务 - deployID, err := s.db.CreateDeployment(ctx, req) + // 创建发布任务记录 + err = s.db.CreateDeployment(ctx, req) if err != nil { - return "", err + return nil, err } log.Info(). - Str("deployID", deployID). Str("service", req.Service). Str("version", req.Version). Msg("deployment created successfully") - return deployID, nil + // 异步执行实际部署 + go s.executeActualDeployment(context.Background(), req) + + // 返回创建的部署信息 + deployment := &model.Deployment{ + Service: req.Service, + Version: req.Version, + Status: model.StatusDeploying, + ScheduleTime: req.ScheduleTime, + } + + return deployment, nil } -// GetDeploymentByID 获取发布任务详情 -func (s *Service) GetDeploymentByID(ctx context.Context, deployID string) (*model.Deployment, error) { - deployment, err := s.db.GetDeploymentByID(ctx, deployID) +// executeActualDeployment 执行实际部署操作 +func (s *Service) executeActualDeployment(ctx context.Context, req *model.CreateDeploymentRequest) { + // 捕获panic,防止goroutine崩溃 + defer func() { + if r := recover(); r != nil { + log.Error().Interface("panic", r).Str("service", req.Service).Str("version", req.Version).Msg("deployment execution panic") + s.db.UpdateDeploymentStatus(ctx, req.Service, req.Version, model.StatusError) + } + }() + + // 1. 
更新状态为 deploying + if err := s.db.UpdateDeploymentStatus(ctx, req.Service, req.Version, model.StatusDeploying); err != nil { + log.Error().Err(err).Str("service", req.Service).Str("version", req.Version).Msg("failed to update deployment status to deploying") + return + } + + // 2. 判断是新服务部署还是版本升级 + isNewService, err := s.deployAdapter.IsNewServiceDeployment(req.Service) + if err != nil { + log.Error().Err(err).Str("service", req.Service).Str("version", req.Version).Msg("failed to determine deployment type") + s.db.UpdateDeploymentStatus(ctx, req.Service, req.Version, model.StatusError) + return + } + + var result interface{} + + if isNewService { + // 新服务部署 + params, err := s.deployAdapter.BuildDeployNewServiceParams(req) + if err != nil { + log.Error().Err(err).Str("service", req.Service).Str("version", req.Version).Msg("failed to build deploy new service params") + s.db.UpdateDeploymentStatus(ctx, req.Service, req.Version, model.StatusError) + return + } + + // 验证包URL + if err := s.deployAdapter.ValidatePackageURL(params.PackageURL); err != nil { + log.Error().Err(err).Str("service", req.Service).Str("version", req.Version).Str("packageURL", params.PackageURL).Msg("package validation failed") + s.db.UpdateDeploymentStatus(ctx, req.Service, req.Version, model.StatusError) + return + } + + result, err = s.deployService.DeployNewService(params) + if err != nil { + log.Error().Err(err).Str("service", req.Service).Str("version", req.Version).Msg("deploy new service failed") + s.db.UpdateDeploymentStatus(ctx, req.Service, req.Version, model.StatusError) + return + } + } else { + // 版本升级 + params, err := s.deployAdapter.BuildDeployNewVersionParams(req) + if err != nil { + log.Error().Err(err).Str("service", req.Service).Str("version", req.Version).Msg("failed to build deploy new version params") + s.db.UpdateDeploymentStatus(ctx, req.Service, req.Version, model.StatusError) + return + } + + // 验证包URL + if err := s.deployAdapter.ValidatePackageURL(params.PackageURL); err 
!= nil { + log.Error().Err(err).Str("service", req.Service).Str("version", req.Version).Str("packageURL", params.PackageURL).Msg("package validation failed") + s.db.UpdateDeploymentStatus(ctx, req.Service, req.Version, model.StatusError) + return + } + + result, err = s.deployService.DeployNewVersion(params) + if err != nil { + log.Error().Err(err).Str("service", req.Service).Str("version", req.Version).Msg("deploy new version failed") + s.db.UpdateDeploymentStatus(ctx, req.Service, req.Version, model.StatusError) + return + } + } + + // 3. 部署成功,更新状态 + if err := s.db.UpdateDeploymentStatus(ctx, req.Service, req.Version, model.StatusCompleted); err != nil { + log.Error().Err(err).Str("service", req.Service).Str("version", req.Version).Msg("failed to update deployment status to completed") + return + } + + // 4. 更新完成时间 + if err := s.db.UpdateDeploymentFinishTime(ctx, req.Service, req.Version, time.Now()); err != nil { + log.Error().Err(err).Str("service", req.Service).Str("version", req.Version).Msg("failed to update deployment finish time") + } + + log.Info(). + Str("service", req.Service). + Str("version", req.Version). + Interface("result", result). 
+ Msg("deployment executed successfully") +} + +// GetDeploymentByServiceAndVersion 根据服务名和版本获取发布任务详情 +func (s *Service) GetDeploymentByServiceAndVersion(ctx context.Context, service, version string) (*model.Deployment, error) { + deployment, err := s.db.GetDeploymentByServiceAndVersion(ctx, service, version) if err != nil { return nil, err } @@ -62,9 +163,9 @@ func (s *Service) GetDeployments(ctx context.Context, query *model.DeploymentQue } // UpdateDeployment 修改发布任务 -func (s *Service) UpdateDeployment(ctx context.Context, deployID string, req *model.UpdateDeploymentRequest) error { +func (s *Service) UpdateDeployment(ctx context.Context, service, version string, req *model.UpdateDeploymentRequest) error { // 检查部署任务是否存在 - deployment, err := s.db.GetDeploymentByID(ctx, deployID) + deployment, err := s.db.GetDeploymentByServiceAndVersion(ctx, service, version) if err != nil { return err } @@ -77,22 +178,23 @@ func (s *Service) UpdateDeployment(ctx context.Context, deployID string, req *mo return ErrInvalidDeployState } - err = s.db.UpdateDeployment(ctx, deployID, req) + err = s.db.UpdateDeployment(ctx, service, version, req) if err != nil { return err } log.Info(). - Str("deployID", deployID). + Str("service", service). + Str("version", version). Msg("deployment updated successfully") return nil } // DeleteDeployment 删除发布任务 -func (s *Service) DeleteDeployment(ctx context.Context, deployID string) error { +func (s *Service) DeleteDeployment(ctx context.Context, service, version string) error { // 检查部署任务是否存在 - deployment, err := s.db.GetDeploymentByID(ctx, deployID) + deployment, err := s.db.GetDeploymentByServiceAndVersion(ctx, service, version) if err != nil { return err } @@ -105,22 +207,23 @@ func (s *Service) DeleteDeployment(ctx context.Context, deployID string) error { return ErrInvalidDeployState } - err = s.db.DeleteDeployment(ctx, deployID) + err = s.db.DeleteDeployment(ctx, service, version) if err != nil { return err } log.Info(). 
- Str("deployID", deployID). + Str("service", service). + Str("version", version). Msg("deployment deleted successfully") return nil } // PauseDeployment 暂停发布任务 -func (s *Service) PauseDeployment(ctx context.Context, deployID string) error { +func (s *Service) PauseDeployment(ctx context.Context, service, version string) error { // 检查部署任务是否存在且为正在部署状态 - deployment, err := s.db.GetDeploymentByID(ctx, deployID) + deployment, err := s.db.GetDeploymentByServiceAndVersion(ctx, service, version) if err != nil { return err } @@ -131,22 +234,23 @@ func (s *Service) PauseDeployment(ctx context.Context, deployID string) error { return ErrInvalidDeployState } - err = s.db.PauseDeployment(ctx, deployID) + err = s.db.PauseDeployment(ctx, service, version) if err != nil { return err } log.Info(). - Str("deployID", deployID). + Str("service", service). + Str("version", version). Msg("deployment paused successfully") return nil } // ContinueDeployment 继续发布任务 -func (s *Service) ContinueDeployment(ctx context.Context, deployID string) error { +func (s *Service) ContinueDeployment(ctx context.Context, service, version string) error { // 检查部署任务是否存在且为暂停状态 - deployment, err := s.db.GetDeploymentByID(ctx, deployID) + deployment, err := s.db.GetDeploymentByServiceAndVersion(ctx, service, version) if err != nil { return err } @@ -157,22 +261,23 @@ func (s *Service) ContinueDeployment(ctx context.Context, deployID string) error return ErrInvalidDeployState } - err = s.db.ContinueDeployment(ctx, deployID) + err = s.db.ContinueDeployment(ctx, service, version) if err != nil { return err } log.Info(). - Str("deployID", deployID). + Str("service", service). + Str("version", version). 
Msg("deployment continued successfully") return nil } // RollbackDeployment 回滚发布任务 -func (s *Service) RollbackDeployment(ctx context.Context, deployID string) error { +func (s *Service) RollbackDeployment(ctx context.Context, service, version string, req *model.RollbackDeploymentRequest) error { // 检查部署任务是否存在 - deployment, err := s.db.GetDeploymentByID(ctx, deployID) + deployment, err := s.db.GetDeploymentByServiceAndVersion(ctx, service, version) if err != nil { return err } @@ -185,14 +290,56 @@ func (s *Service) RollbackDeployment(ctx context.Context, deployID string) error return ErrInvalidDeployState } - err = s.db.RollbackDeployment(ctx, deployID) + // 异步执行实际回滚 + go s.executeActualRollback(context.Background(), deployment, req) + + // 更新数据库状态 + err = s.db.RollbackDeployment(ctx, service, version) if err != nil { return err } log.Info(). - Str("deployID", deployID). - Msg("deployment rolled back successfully") + Str("service", service). + Str("version", version). + Str("targetVersion", req.TargetVersion). + Msg("deployment rollback initiated successfully") return nil } + +// executeActualRollback 执行实际回滚操作 +func (s *Service) executeActualRollback(ctx context.Context, deployment *model.Deployment, req *model.RollbackDeploymentRequest) { + // 捕获panic,防止goroutine崩溃 + defer func() { + if r := recover(); r != nil { + log.Error().Interface("panic", r).Str("service", deployment.Service).Str("version", deployment.Version).Msg("rollback execution panic") + } + }() + + // 1. 构建回滚参数 + params, err := s.deployAdapter.BuildRollbackParams(deployment, req) + if err != nil { + log.Error().Err(err).Str("service", deployment.Service).Str("version", deployment.Version).Msg("failed to build rollback params") + return + } + + // 2. 
验证回滚包URL + if err := s.deployAdapter.ValidatePackageURL(params.PackageURL); err != nil { + log.Error().Err(err).Str("service", deployment.Service).Str("version", deployment.Version).Str("packageURL", params.PackageURL).Msg("rollback package validation failed") + return + } + + // 3. 执行回滚 + result, err := s.deployService.ExecuteRollback(params) + if err != nil { + log.Error().Err(err).Str("service", deployment.Service).Str("version", deployment.Version).Msg("execute rollback failed") + return + } + + log.Info(). + Str("service", deployment.Service). + Str("version", deployment.Version). + Interface("result", result). + Msg("rollback executed successfully") +} diff --git a/internal/service_manager/service/info_service.go b/internal/service_manager/service/info_service.go index c7d2311..6e003bd 100644 --- a/internal/service_manager/service/info_service.go +++ b/internal/service_manager/service/info_service.go @@ -7,7 +7,7 @@ import ( "github.com/rs/zerolog/log" ) -// ===== 服务管理业务方法 ===== +// ===== 服务信息查询方法 ===== // GetServicesResponse 获取服务列表响应 func (s *Service) GetServicesResponse(ctx context.Context) (*model.ServicesResponse, error) { @@ -115,21 +115,6 @@ func (s *Service) GetServiceAvailableVersions(ctx context.Context, serviceName, return versions, nil } -// CreateService 创建服务 -func (s *Service) CreateService(ctx context.Context, service *model.Service) error { - return s.db.CreateService(ctx, service) -} - -// UpdateService 更新服务信息 -func (s *Service) UpdateService(ctx context.Context, service *model.Service) error { - return s.db.UpdateService(ctx, service) -} - -// DeleteService 删除服务 -func (s *Service) DeleteService(ctx context.Context, name string) error { - return s.db.DeleteService(ctx, name) -} - // GetServiceMetricTimeSeries 获取服务时序指标数据 func (s *Service) GetServiceMetricTimeSeries(ctx context.Context, serviceName, metricName string, query *model.MetricTimeSeriesQuery) (*model.PrometheusQueryRangeResponse, error) { // TODO:这里应该调用实际的Prometheus或其他监控系统API @@ 
-172,3 +157,20 @@ func (s *Service) GetServiceMetricTimeSeries(ctx context.Context, serviceName, m return response, nil } + +// ===== 服务管理CRUD方法 ===== + +// CreateService 创建服务 +func (s *Service) CreateService(ctx context.Context, service *model.Service) error { + return s.db.CreateService(ctx, service) +} + +// UpdateService 更新服务信息 +func (s *Service) UpdateService(ctx context.Context, service *model.Service) error { + return s.db.UpdateService(ctx, service) +} + +// DeleteService 删除服务 +func (s *Service) DeleteService(ctx context.Context, name string) error { + return s.db.DeleteService(ctx, name) +} diff --git a/mock/s3/deployments/docker-compose.yml b/mock/s3/deployments/docker-compose.yml index 9f30f9e..9575909 100644 --- a/mock/s3/deployments/docker-compose.yml +++ b/mock/s3/deployments/docker-compose.yml @@ -103,11 +103,17 @@ services: infra-network: ipv4_address: 172.21.0.21 ports: - - "3000:3000" + - "3001:3000" environment: GF_SECURITY_ADMIN_USER: ${GRAFANA_USER:-admin} GF_SECURITY_ADMIN_PASSWORD: ${GRAFANA_PASSWORD:-admin123} GF_INSTALL_PLUGINS: grafana-piechart-panel + # 启用匿名访问 + GF_AUTH_ANONYMOUS_ENABLED: "true" + GF_AUTH_ANONYMOUS_ORG_NAME: "Main Org." 
+ GF_AUTH_ANONYMOUS_ORG_ROLE: "Admin" + GF_AUTH_BASIC_ENABLED: "false" + GF_AUTH_DISABLE_LOGIN_FORM: "true" volumes: - grafana-data:/var/lib/grafana - ./grafana/provisioning:/etc/grafana/provisioning:ro diff --git a/mock/s3/deployments/grafana/provisioning/dashboards/qps-monitoring.json b/mock/s3/deployments/grafana/provisioning/dashboards/qps-monitoring.json new file mode 100644 index 0000000..2d8a97b --- /dev/null +++ b/mock/s3/deployments/grafana/provisioning/dashboards/qps-monitoring.json @@ -0,0 +1,435 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 1, + "id": null, + "links": [], + "panels": [ + { + "datasource": "Prometheus", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 2, + "options": { + "legend": { + "calcs": ["mean", "max"], + "displayMode": "table", + "placement": "bottom" + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "8.0.0", + "targets": [ + { + "expr": "sum(rate(http_latency_count[1m])) by (service_name)", + "legendFormat": 
"{{service_name}}", + "refId": "A" + } + ], + "title": "服务 QPS (每秒请求数)", + "type": "timeseries" + }, + { + "datasource": "Prometheus", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 100 + }, + { + "color": "red", + "value": 500 + } + ] + }, + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 3, + "options": { + "orientation": "auto", + "reduceOptions": { + "values": false, + "calcs": ["lastNotNull"], + "fields": "" + }, + "showThresholdLabels": false, + "showThresholdMarkers": true + }, + "pluginVersion": "8.0.0", + "targets": [ + { + "expr": "sum(rate(http_latency_count[1m]))", + "refId": "A" + } + ], + "title": "总 QPS", + "type": "gauge" + }, + { + "datasource": "Prometheus", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 8 + }, + "id": 4, + "options": { + "legend": { + "calcs": ["mean", "max", "last"], + "displayMode": "table", + "placement": "right" + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "8.0.0", + "targets": [ + { + "expr": 
"sum(rate(http_latency_count[1m])) by (service_name, http_route)", + "legendFormat": "{{service_name}} - {{http_route}}", + "refId": "A" + } + ], + "title": "按路径的 QPS", + "type": "timeseries" + }, + { + "datasource": "Prometheus", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "bars", + "fillOpacity": 100, + "gradientMode": "none", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "normal" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 16 + }, + "id": 5, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "8.0.0", + "targets": [ + { + "expr": "sum(rate(http_latency_count[5m])) by (http_status_code)", + "legendFormat": "HTTP {{http_status_code}}", + "refId": "A" + } + ], + "title": "按状态码的 QPS", + "type": "timeseries" + }, + { + "datasource": "Prometheus", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" 
+ }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 16 + }, + "id": 6, + "options": { + "legend": { + "calcs": ["mean", "max"], + "displayMode": "table", + "placement": "bottom" + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "8.0.0", + "targets": [ + { + "expr": "histogram_quantile(0.95, sum(rate(http_latency_bucket[5m])) by (service_name, le))", + "legendFormat": "{{service_name}} P95", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(http_latency_bucket[5m])) by (service_name, le))", + "legendFormat": "{{service_name}} P99", + "refId": "B" + } + ], + "title": "HTTP 延迟 (P95/P99)", + "type": "timeseries" + } + ], + "refresh": "10s", + "schemaVersion": 27, + "style": "dark", + "tags": ["qps", "monitoring", "mock-s3"], + "templating": { + "list": [ + { + "current": { + "selected": false, + "text": "All", + "value": "$__all" + }, + "description": null, + "error": null, + "hide": 0, + "includeAll": true, + "label": "Service", + "multi": true, + "name": "service", + "options": [], + "query": "label_values(http_latency_count, service_name)", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "type": "query" + } + ] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": { + "refresh_intervals": ["5s", "10s", "30s", "1m", "5m", "15m", "30m", "1h", "2h", "1d"] + }, + "timezone": "", + "title": "MockS3 QPS 监控", + "uid": "mock-s3-qps", + "version": 1 +} \ No newline at end of file diff --git a/mock/s3/deployments/observability/prometheus.yml b/mock/s3/deployments/observability/prometheus.yml index 6c9fb04..b943670 100644 --- a/mock/s3/deployments/observability/prometheus.yml +++ b/mock/s3/deployments/observability/prometheus.yml @@ -14,7 +14,7 @@ alerting: 
#!/bin/bash

# Service packaging script (rewritten version).
# Builds the storage-service package laid out for the Floyd deployment paths.
#
# Environment overrides:
#   VERSION       package version to stamp and name the archive with (default v1.0.0)
#   PROJECT_ROOT  repository root (default: two levels above this script)

set -e

# Color definitions
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# Logging helpers
log_info() {
    echo -e "${GREEN}[INFO]${NC} $1"
}

log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

log_debug() {
    echo -e "${BLUE}[DEBUG]${NC} $1"
}

# Configuration
VERSION="${VERSION:-v1.0.0}"
SERVICE_NAME="storage"
# The script lives in scripts/deploy/, so the repository root is two levels up.
# Resolving it here removes the dependency on the caller's working directory
# and on any one developer's checkout path.
PROJECT_ROOT="${PROJECT_ROOT:-$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)}"
BUILD_DIR=""
PACKAGE_DIR=""
TEMP_DIR=""

# Create the temporary build tree: $BUILD_DIR/package and $BUILD_DIR/temp.
create_build_dir() {
    log_info "创建构建目录..."
    BUILD_DIR=$(mktemp -d)
    PACKAGE_DIR="$BUILD_DIR/package"
    TEMP_DIR="$BUILD_DIR/temp"

    mkdir -p "$PACKAGE_DIR"
    mkdir -p "$TEMP_DIR"

    log_debug "构建目录: $BUILD_DIR"
    log_debug "包目录: $PACKAGE_DIR"
    log_debug "临时目录: $TEMP_DIR"
}

# Copy the original storage config into the package as config.yaml, stamping
# the version on the way.
create_config() {
    log_info "复制原版配置文件..."

    local source_config="$PROJECT_ROOT/mock/s3/services/storage/config/storage-config.yaml"

    # Stream through sed instead of `cp` + `sed -i`: the in-place flag takes a
    # mandatory argument on BSD/macOS sed, and the old `|| true` hid that
    # failure, leaving the version unstamped.
    sed "s/version: .*/version: $VERSION/" "$source_config" > "$PACKAGE_DIR/config.yaml"

    log_debug "配置文件已复制: $PACKAGE_DIR/config.yaml"
}

# Write start.sh into the package. The heredoc is quoted, so its content is
# emitted verbatim and expanded only when start.sh runs on the target host.
create_start_script() {
    log_info "创建启动脚本..."

    cat > "$PACKAGE_DIR/start.sh" << 'EOF'
#!/bin/bash

# 服务启动脚本 - Floyd部署版本
set -e

# Floyd部署路径结构
# 服务目录: /home/qboxserver/storage/
# 版本目录: /home/qboxserver/storage/_fpkgs/版本号/
# 包目录: /home/qboxserver/storage/_package/

# 设置环境变量
export SERVICE_NAME="${SERVICE_NAME:-storage}"
export LOG_LEVEL="${LOG_LEVEL:-info}"

# Floyd部署的标准路径
FLOYD_SERVICE_DIR="/home/qboxserver/storage"
FLOYD_PACKAGE_DIR="/home/qboxserver/storage/_package"

# 切换到Floyd包目录(Floyd部署时包内容在_package目录)
cd "$FLOYD_PACKAGE_DIR" || { echo "错误: 无法切换到包目录 $FLOYD_PACKAGE_DIR"; exit 1; }

# 从_package目录的config.yaml读取端口
if [ -f "config.yaml" ]; then
    SERVICE_PORT=$(grep "port:" config.yaml | sed 's/.*port:\s*\([0-9]*\).*/\1/')
    if [ -z "$SERVICE_PORT" ]; then
        SERVICE_PORT="8080"
    fi
    echo "从_package/config.yaml读取到端口: $SERVICE_PORT"
else
    SERVICE_PORT="8080"
    echo "配置文件不存在,使用默认端口: $SERVICE_PORT"
fi
export SERVICE_PORT

# 检查配置文件
if [ ! -f "config.yaml" ]; then
    echo "错误: 配置文件 config.yaml 不存在"
    exit 1
fi

# 检查可执行文件
if [ ! -f "storage-service" ]; then
    echo "错误: 可执行文件 storage-service 不存在"
    exit 1
fi

# 设置权限
chmod +x "storage-service"

# 启动服务
echo "启动 $SERVICE_NAME 服务..."
echo "工作目录: $FLOYD_PACKAGE_DIR"
echo "端口: $SERVICE_PORT"
echo "日志级别: $LOG_LEVEL"

# 后台运行服务
nohup ./storage-service \
    --config=config.yaml \
    --port="$SERVICE_PORT" \
    --log-level="$LOG_LEVEL" \
    > storage.log 2>&1 &

# 保存PID
echo $! > storage.pid

echo "$SERVICE_NAME 服务已启动,PID: $(cat storage.pid)"

EOF

    chmod +x "$PACKAGE_DIR/start.sh"
    log_debug "启动脚本已创建: $PACKAGE_DIR/start.sh"
}

# Write stop.sh into the package (quoted heredoc, emitted verbatim).
create_stop_script() {
    log_info "创建停止脚本..."

    cat > "$PACKAGE_DIR/stop.sh" << 'EOF'
#!/bin/bash

# 服务停止脚本
set -e

# Floyd部署路径结构
# 服务目录: /home/qboxserver/storage/
# 版本目录: /home/qboxserver/storage/_fpkgs/版本号/
# 包目录: /home/qboxserver/storage/_package/

# Floyd部署的标准路径
FLOYD_SERVICE_DIR="/home/qboxserver/storage"
FLOYD_PACKAGE_DIR="/home/qboxserver/storage/_package"

# 设置环境变量
export SERVICE_NAME="${SERVICE_NAME:-storage}"

# 切换到Floyd包目录(Floyd部署时包内容在_package目录)
cd "$FLOYD_PACKAGE_DIR" || { echo "错误: 无法切换到包目录 $FLOYD_PACKAGE_DIR"; exit 1; }

# 检查PID文件
if [ -f "storage.pid" ]; then
    PID=$(cat storage.pid)
    if kill -0 "$PID" 2>/dev/null; then
        echo "停止 $SERVICE_NAME 服务 (PID: $PID)..."
        kill "$PID"

        # 等待进程结束
        for i in {1..10}; do
            if ! kill -0 "$PID" 2>/dev/null; then
                echo "$SERVICE_NAME 服务已停止"
                rm -f storage.pid
                exit 0
            fi
            sleep 1
        done

        # 强制杀死进程
        echo "强制停止 $SERVICE_NAME 服务..."
        kill -9 "$PID" 2>/dev/null || true
        rm -f storage.pid
    else
        echo "$SERVICE_NAME 服务未运行"
        rm -f storage.pid
    fi
else
    echo "$SERVICE_NAME 服务未运行"
fi
EOF

    chmod +x "$PACKAGE_DIR/stop.sh"
    log_debug "停止脚本已创建: $PACKAGE_DIR/stop.sh"
}

# Generate a minimal Go HTTP server and compile it for linux/amd64 as the
# package's storage-service binary.
create_executable() {
    log_info "创建可执行文件..."

    local executable_path="$PACKAGE_DIR/storage-service"

    # Quoted heredoc: the Go source must reach the file untouched by the shell.
    cat > "$TEMP_DIR/main.go" << 'EOF'
package main

import (
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	var (
		configFile = flag.String("config", "config.yaml", "配置文件路径")
		port       = flag.String("port", "8080", "服务端口")
		logLevel   = flag.String("log-level", "info", "日志级别")
	)
	flag.Parse()

	fmt.Printf("启动 storage 服务...\n")
	fmt.Printf("配置文件: %s\n", *configFile)
	fmt.Printf("端口: %s\n", *port)
	fmt.Printf("日志级别: %s\n", *logLevel)

	// 简单的HTTP服务器
	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK"))
	})

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("Hello from storage service!"))
	})

	server := &http.Server{
		Addr:    ":" + *port,
		Handler: nil,
	}

	// 启动服务器
	go func() {
		log.Printf("服务器启动在端口 %s", *port)
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("服务器启动失败: %v", err)
		}
	}()

	// 等待中断信号
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	<-c

	log.Println("正在关闭服务器...")
	server.Close()
	log.Println("服务器已关闭")
}
EOF

    # Build inside a subshell so the directory change does not leak into the
    # remaining steps. `go mod init` is best-effort, as before.
    (
        cd "$TEMP_DIR"
        go mod init temp 2>/dev/null || true
        CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o "$executable_path" main.go
    )

    if [ -f "$executable_path" ]; then
        chmod +x "$executable_path"
        log_debug "可执行文件已创建: $executable_path"
    else
        log_error "可执行文件创建失败"
        exit 1
    fi
}

# Archive the package directory into
# $PROJECT_ROOT/internal/deploy/packages/storage-$VERSION.tar.gz.
package_service() {
    log_info "打包服务..."

    local package_name="storage-${VERSION}.tar.gz"
    local output_dir="$PROJECT_ROOT/internal/deploy/packages"
    local package_path="$output_dir/$package_name"

    # Make sure the packages directory exists.
    mkdir -p "$output_dir"

    log_debug "项目根目录: $PROJECT_ROOT"
    log_debug "输出目录: $output_dir"
    log_debug "包路径: $package_path"
    log_debug "包目录: $PACKAGE_DIR"

    # -C archives the package directory's content without changing our cwd.
    tar -czf "$package_path" -C "$PACKAGE_DIR" .

    if [ -f "$package_path" ]; then
        log_info "服务包已创建: $package_path"
        log_debug "包大小: $(du -h "$package_path" | cut -f1)"
    else
        log_error "服务包创建失败"
        exit 1
    fi
}

# Remove the temporary build tree if it was created.
cleanup() {
    log_info "清理临时文件..."
    if [ -n "$BUILD_DIR" ] && [ -d "$BUILD_DIR" ]; then
        rm -rf "$BUILD_DIR"
        log_debug "临时目录已清理: $BUILD_DIR"
    fi
}

# Entry point: build every package component, archive it, clean up.
main() {
    log_info "开始打包 storage 服务..."

    # Guarantee cleanup when `set -e` aborts midway; cleared again after the
    # explicit cleanup below so the success path runs it exactly once.
    trap cleanup EXIT

    create_build_dir
    create_config
    create_start_script
    create_stop_script
    create_executable
    package_service
    cleanup
    trap - EXIT

    log_info "服务打包完成!"
    log_info "部署包: storage-${VERSION}.tar.gz"
}

# Run the entry point.
main "$@"
+ if curl -s "${BASE_URL}/v1/services" > /dev/null; then + log_info "ZeroOps 服务运行正常" + else + log_error "ZeroOps 服务未运行,请先启动服务" + exit 1 + fi +} + +# 部署服务 +deploy_service() { + local version="$1" + local package_file="$2" + local instance_count="${3:-2}" + + log_info "开始部署 ${SERVICE_NAME} 版本 ${version}..." + + # 检查包文件是否存在 + if [[ ! -f "$package_file" ]]; then + log_error "包文件不存在: $package_file" + return 1 + fi + + # 获取当前时间作为调度时间 + local schedule_time=$(date -u +"%Y-%m-%dT%H:%M:%SZ") + + # 构建请求数据 + local request_data=$(cat </dev/null || echo "$response" + else + log_error "查询部署列表失败" + fi +} + +# 帮助信息 +show_help() { + echo "用法: $0 [命令] [参数]" + echo "" + echo "命令:" + echo " deploy [instance_count] - 部署指定版本 (默认2个实例)" + echo " status - 查看指定版本的部署状态" + echo " list - 列出所有部署任务" + echo " check - 检查服务状态" + echo "" + echo "示例:" + echo " $0 deploy v1.0.2 - 部署 v1.0.2 版本,2个实例" + echo " $0 deploy v1.0.3 3 - 部署 v1.0.3 版本,3个实例" + echo " $0 status v1.0.2 - 查看 v1.0.2 的部署状态" + echo " $0 list - 查看所有部署" + echo "" + echo "注意: 目前只有 storage-v1.0.0.tar.gz 包文件可用" +} + +# 主函数 +main() { + case "${1:-help}" in + "deploy") + if [[ -z "$2" ]]; then + log_error "请指定版本号" + show_help + exit 1 + fi + check_service + # 注意:目前只使用 v1.0.0 的包文件 + deploy_service "$2" "${PACKAGE_PATH}/storage-v1.0.0.tar.gz" "$3" + ;; + "status") + if [[ -z "$2" ]]; then + log_error "请指定版本号" + exit 1 + fi + check_deployment_status "${SERVICE_NAME}" "$2" + ;; + "list") + list_all_deployments + ;; + "check") + check_service + ;; + "help"|*) + show_help + ;; + esac +} + +# 运行主函数 +main "$@" \ No newline at end of file diff --git a/simple_webhook_server.py b/simple_webhook_server.py new file mode 100755 index 0000000..b8e840e --- /dev/null +++ b/simple_webhook_server.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +""" +简单的 Webhook 服务器 - 使用标准库接收 Alertmanager 告警 +""" + +from http.server import HTTPServer, BaseHTTPRequestHandler +import json +from datetime import datetime +import threading +import time + +# 存储接收到的告警 +alerts_received = [] 
+alerts_lock = threading.Lock() + +class WebhookHandler(BaseHTTPRequestHandler): + def do_POST(self): + """处理 POST 请求""" + if self.path == '/v1/integrations/alertmanager/webhook': + try: + # 读取请求体 + content_length = int(self.headers['Content-Length']) + post_data = self.rfile.read(content_length) + + # 解析 JSON + data = json.loads(post_data.decode('utf-8')) + + # 记录告警 + timestamp = datetime.now().isoformat() + with alerts_lock: + alert_record = { + "timestamp": timestamp, + "data": data + } + alerts_received.append(alert_record) + + # 只保留最近100条 + if len(alerts_received) > 100: + alerts_received.pop(0) + + # 打印到控制台 + print(f"\n[{timestamp}] 收到告警:") + print(json.dumps(data, indent=2, ensure_ascii=False)) + + # 提取并显示关键信息 + if 'alerts' in data: + print("\n告警摘要:") + for alert in data['alerts']: + alert_name = alert.get('labels', {}).get('alertname', 'Unknown') + status = alert.get('status', 'Unknown') + severity = alert.get('labels', {}).get('severity', 'Unknown') + service = alert.get('labels', {}).get('service', 'N/A') + print(f" - {alert_name}: {status} (severity: {severity}, service: {service})") + + print("-" * 50) + + # 返回成功响应 + self.send_response(200) + self.send_header('Content-Type', 'application/json') + self.end_headers() + response = {"status": "success", "message": "Alert received"} + self.wfile.write(json.dumps(response).encode()) + + except Exception as e: + print(f"Error processing alert: {e}") + self.send_response(400) + self.send_header('Content-Type', 'application/json') + self.end_headers() + response = {"status": "error", "message": str(e)} + self.wfile.write(json.dumps(response).encode()) + else: + self.send_response(404) + self.end_headers() + self.wfile.write(b"Not Found") + + def do_GET(self): + """处理 GET 请求""" + if self.path == '/alerts': + # 返回接收到的告警列表 + self.send_response(200) + self.send_header('Content-Type', 'application/json') + self.end_headers() + with alerts_lock: + self.wfile.write(json.dumps(alerts_received, indent=2).encode()) + + 
elif self.path == '/health': + # 健康检查 + self.send_response(200) + self.end_headers() + self.wfile.write(b"OK") + + elif self.path == '/': + # 首页 + self.send_response(200) + self.send_header('Content-Type', 'text/html') + self.end_headers() + html = """ + + Webhook Server + +

Mock Webhook Server

+

Webhook endpoint: POST /v1/integrations/alertmanager/webhook

+

View alerts: GET /alerts

+

Health check: GET /health

+
+

Alerts received: {}

+ + + """.format(len(alerts_received)) + self.wfile.write(html.encode()) + else: + self.send_response(404) + self.end_headers() + self.wfile.write(b"Not Found") + + def log_message(self, format, *args): + """自定义日志格式""" + return # 禁用默认的日志输出,避免太多噪音 + +def run_server(port=8080): + """运行服务器""" + server_address = ('', port) + httpd = HTTPServer(server_address, WebhookHandler) + + print("=" * 60) + print("Mock Webhook Server 已启动") + print(f"监听地址: http://0.0.0.0:{port}") + print(f"Webhook 端点: POST /v1/integrations/alertmanager/webhook") + print(f"查看告警: GET /alerts") + print(f"健康检查: GET /health") + print("=" * 60) + print("\n等待接收告警...\n") + + try: + httpd.serve_forever() + except KeyboardInterrupt: + print("\n服务器已停止") + httpd.shutdown() + +if __name__ == '__main__': + import sys + + # 检查是否指定端口 + port = 8080 + if len(sys.argv) > 1: + try: + port = int(sys.argv[1]) + except ValueError: + print(f"无效的端口: {sys.argv[1]}") + sys.exit(1) + + # 检查端口是否被占用 + import socket + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = sock.connect_ex(('127.0.0.1', port)) + sock.close() + + if result == 0: + print(f"警告: 端口 {port} 已被占用") + print("你可以:") + print(f"1. 使用其他端口: python3 {sys.argv[0]} 8081") + print(f"2. 或者停止占用端口 {port} 的服务") + response = input(f"\n是否继续在端口 {port} 上启动? 
(y/N): ") + if response.lower() != 'y': + sys.exit(0) + + run_server(port) \ No newline at end of file diff --git a/test_complete_alert_flow.sh b/test_complete_alert_flow.sh new file mode 100755 index 0000000..5d63e3a --- /dev/null +++ b/test_complete_alert_flow.sh @@ -0,0 +1,141 @@ +#!/bin/bash + +# 禁用代理 +export no_proxy="10.210.10.33,10.99.181.164,localhost,127.0.0.1" +export NO_PROXY="10.210.10.33,10.99.181.164,localhost,127.0.0.1" + +echo "==================================================" +echo " 完整告警流程测试" +echo "==================================================" +echo "" +echo "告警流程:" +echo "Prometheus (10.210.10.33:9090)" +echo " ↓ [告警触发]" +echo "Adapter (10.210.10.33:9999)" +echo " ↓ [转发]" +echo "Webhook (10.99.181.164:8080)" +echo "" +echo "==================================================" + +# Step 1: 检查 Webhook 服务 +echo -e "\n[Step 1] 检查 Webhook 服务状态" +echo -n " 测试 webhook 端点: " +response=$(curl -s --noproxy "*" -X POST http://10.99.181.164:8080/v1/integrations/alertmanager/webhook \ + -H "Content-Type: application/json" \ + -d '{"test": "connectivity_check"}' \ + -o /dev/null -w "%{http_code}") +if [ "$response" = "200" ]; then + echo "✅ Webhook 服务正常 (HTTP 200)" +else + echo "❌ Webhook 服务异常 (HTTP $response)" + exit 1 +fi + +# Step 2: 检查 Adapter 服务 +echo -e "\n[Step 2] 检查 Adapter 服务状态" +echo -n " 健康检查: " +curl -s --noproxy "*" http://10.210.10.33:9999/-/healthy &>/dev/null && echo "✅ Healthy" || echo "❌ Unhealthy" +echo -n " 就绪检查: " +curl -s --noproxy "*" http://10.210.10.33:9999/-/ready &>/dev/null && echo "✅ Ready" || echo "❌ Not Ready" + +# Step 3: 注意事项提醒 +echo -e "\n[Step 3] 重要提醒" +echo " ⚠️ 确保 Adapter 服务已使用新配置重启" +echo " 配置文件: internal/prometheus_adapter/config/prometheus_adapter.yml" +echo " Webhook URL 应为: http://10.99.181.164:8080/v1/integrations/alertmanager/webhook" +echo "" +read -p " Adapter 服务是否已重启?(y/n): " -n 1 -r +echo "" +if [[ ! 
$REPLY =~ ^[Yy]$ ]]; then + echo " 请先重启 Adapter 服务后再运行测试" + exit 1 +fi + +# Step 4: 手动发送测试告警到 Adapter +echo -e "\n[Step 4] 发送测试告警到 Adapter" +timestamp=$(date -u +%Y-%m-%dT%H:%M:%SZ) +alert_data='[{ + "labels": { + "alertname": "TestAlertFlow", + "severity": "warning", + "service": "test_service", + "environment": "test", + "source": "manual_test" + }, + "annotations": { + "summary": "测试告警流程", + "description": "验证 Prometheus → Adapter → Webhook 完整链路" + }, + "startsAt": "'$timestamp'", + "generatorURL": "http://test.example.com/alerts" +}]' + +echo " 发送告警数据..." +response=$(curl -s --noproxy "*" -X POST http://10.210.10.33:9999/api/v2/alerts \ + -H "Content-Type: application/json" \ + -d "$alert_data" \ + -w "\n%{http_code}") + +http_code=$(echo "$response" | tail -n1) +body=$(echo "$response" | sed '$d') + +if [ "$http_code" = "200" ]; then + echo " ✅ Adapter 成功接收告警 (HTTP 200)" + if [ -n "$body" ]; then + echo " 响应: $body" + fi +else + echo " ❌ Adapter 接收告警失败 (HTTP $http_code)" + echo " 响应: $body" + echo "" + echo " 可能的原因:" + echo " 1. Adapter 服务未运行" + echo " 2. Adapter 配置未更新" + echo " 3. 网络连接问题" +fi + +# Step 5: 检查 Webhook 是否收到告警 +echo -e "\n[Step 5] 验证 Webhook 是否收到告警" +echo " 等待 2 秒让告警传递..." +sleep 2 + +echo " 查询 Webhook 收到的告警:" +alerts=$(curl -s --noproxy "*" http://10.99.181.164:8080/alerts) + +if [ -z "$alerts" ] || [ "$alerts" = "[]" ]; then + echo " ⚠️ Webhook 未收到任何告警" + echo "" + echo " 请检查:" + echo " 1. Adapter 配置中的 webhook URL 是否正确" + echo " 2. Adapter 服务是否已重启" + echo " 3. 查看 Adapter 日志了解详情" +else + echo " ✅ Webhook 收到告警!" 
+ echo "" + echo " 最新的告警记录:" + echo "$alerts" | jq -r '.[-1] | " 时间: \(.timestamp)\n 告警名: \(.data.alerts[0].labels.alertname // "N/A")\n 严重性: \(.data.alerts[0].labels.severity // "N/A")\n 状态: \(.data.alerts[0].status // "N/A")"' 2>/dev/null || echo "$alerts" +fi + +# Step 6: 测试 Prometheus 的活跃告警 +echo -e "\n[Step 6] 检查 Prometheus 中的活跃告警" +echo " 查询 firing 状态的告警 (前3个):" +curl -s --noproxy "*" http://10.210.10.33:9090/api/v1/alerts | \ + jq -r '.data.alerts[] | select(.state=="firing") | " - \(.labels.alertname) (\(.labels.service // "no-service"))"' 2>/dev/null | head -3 + +echo -e "\n==================================================" +echo "测试完成!" +echo "" +echo "完整流程验证:" +echo "1. Webhook 服务: ✅ 运行中 (10.99.181.164:8080)" +echo "2. Adapter 服务: ✅ 运行中 (10.210.10.33:9999)" +echo "3. 告警传递测试: $([ "$http_code" = "200" ] && echo "✅ 成功" || echo "❌ 失败")" +echo "4. Webhook 接收: $([ -n "$alerts" ] && [ "$alerts" != "[]" ] && echo "✅ 已收到告警" || echo "⚠️ 未收到告警")" +echo "" + +if [ "$http_code" = "200" ] && [ -n "$alerts" ] && [ "$alerts" != "[]" ]; then + echo "🎉 恭喜!告警流程工作正常!" +else + echo "⚠️ 告警流程存在问题,请检查上述步骤中的错误信息" +fi + +echo "==================================================" \ No newline at end of file