From 344b8aaf4860e0b973b0761029adace74bbf2ab5 Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Tue, 12 Dec 2023 11:10:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: 6c24bf2812d4f15eac35a47056ecc9a2958d81a8 --- rpc/etc/pcmcore.yaml | 31 ++++---- rpc/internal/cron/cronlogic.go | 79 --------------------- rpc/internal/logic/pcmcore/syncinfologic.go | 66 +++++++++++++++-- rpc/pb/pcmCore.proto | 13 ++-- rpc/pcmCore/pcmCore.pb.go | 13 +++- rpc/pcmcore.go | 26 +------ 6 files changed, 96 insertions(+), 132 deletions(-) delete mode 100644 rpc/internal/cron/cronlogic.go diff --git a/rpc/etc/pcmcore.yaml b/rpc/etc/pcmcore.yaml index 65c58022..163b0bab 100644 --- a/rpc/etc/pcmcore.yaml +++ b/rpc/etc/pcmcore.yaml @@ -1,15 +1,16 @@ -NacosConfig: - DataId: pcm-core-rpc.yaml - Group: DEFAULT_GROUP - ServerConfigs: -# - IpAddr: 127.0.0.1 -# Port: 8848 - - IpAddr: nacos.jcce.dev - Port: 8848 - ClientConfig: - NamespaceId: test - TimeoutMs: 5000 - NotLoadCacheAtStart: true - LogDir: - CacheDir: - LogLevel: info \ No newline at end of file +Name: pcm.core.rpc +ListenOn: 0.0.0.0:2004 + +Timeout: 15000 # 15s,设置rpc服务的响应的超时时间,若超过15s还未返回则结束请求 + +DB: + DataSource: root:uJpLd6u-J?HC1@(10.206.0.12:3306)/pcm?parseTime=true + +SnowflakeConf: + MachineId: 1 + +RedisConf: + Host: 10.206.0.7:6379 + Pass: redisPW123 + Type: node + Tls: false \ No newline at end of file diff --git a/rpc/internal/cron/cronlogic.go b/rpc/internal/cron/cronlogic.go deleted file mode 100644 index 208744f8..00000000 --- a/rpc/internal/cron/cronlogic.go +++ /dev/null @@ -1,79 +0,0 @@ -/* - - Copyright (c) [2023] [pcm] - [pcm-coordinator] is licensed under Mulan PSL v2. - You can use this software according to the terms and conditions of the Mulan PSL v2. - You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 - THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, - EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, - MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. - See the Mulan PSL v2 for more details. - -*/ - -package cron - -import ( - "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/svc" - "gorm.io/gorm" - "strings" -) - -func InitCron(svc *svc.ServiceContext) { - svc.Cron.Start() - svc.Cron.AddFunc("*/5 * * * * ?", func() { - - var tasks []models.Task - svc.DbEngin.Where("status not in ?", []string{constants.Deleted, constants.Succeeded, constants.Completed, constants.Failed}).Find(&tasks) - for _, task := range tasks { - var allStatus string - tx := svc.DbEngin.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", task.Id).Scan(&allStatus) - if tx.Error != nil { - logx.Error(tx.Error) - } - // 子状态统一则修改主任务状态 - statusArray := strings.Split(allStatus, ",") - if len(removeRepeatedElement(statusArray)) == 1 { - updateTask(svc.DbEngin, &task, statusArray[0]) - continue - } - // 子任务包含失败状态 主任务则失败 - if strings.Contains(allStatus, constants.Failed) { - updateTask(svc.DbEngin, &task, constants.Failed) - continue - } - if strings.Contains(allStatus, constants.Running) { - updateTask(svc.DbEngin, &task, constants.Running) - } - } - - }) -} - -func updateTask(dbEngin *gorm.DB, task *models.Task, status string) { - if task.Status != status { - task.Status = status - dbEngin.Updates(&task) - } -} - -func removeRepeatedElement(arr []string) (newArr []string) { - newArr = make([]string, 0) - for i := 0; i < len(arr); i++ { - repeat := false - for j := i + 1; j < len(arr); j++ { - if arr[i] == arr[j] { - repeat = true - break - } - } - if !repeat { - newArr = append(newArr, arr[i]) - } - } - return -} diff --git a/rpc/internal/logic/pcmcore/syncinfologic.go b/rpc/internal/logic/pcmcore/syncinfologic.go index 222a3562..8acb5c9e 100644 --- a/rpc/internal/logic/pcmcore/syncinfologic.go +++ b/rpc/internal/logic/pcmcore/syncinfologic.go @@ -17,8 +17,11 @@ package pcmcorelogic import ( "context" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore" + "gorm.io/gorm" + "strings" "time" "github.com/zeromicro/go-zero/core/logx" @@ -54,20 +57,73 @@ func (l *SyncInfoLogic) SyncInfo(in *pcmCore.SyncInfoReq) (*pcmCore.SyncInfoResp switch kind { case constants.CLOUD: for _, cloudInfo := range in.CloudInfoList { - l.svcCtx.DbEngin.Exec("update cloud set status = ?,start_time = ?,running_time = ?,result = ? where participant_id = ? and task_id = ? and namespace = ? and name = ?", - cloudInfo.Status, cloudInfo.StartTime, cloudInfo.RunningTime, cloudInfo.Result, in.ParticipantId, cloudInfo.TaskId, cloudInfo.Namespace, cloudInfo.Name) + l.svcCtx.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?", + cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, in.ParticipantId, cloudInfo.Id) + syncTask(l.svcCtx.DbEngin, cloudInfo.TaskId) } case constants.HPC: for _, hpcInfo := range in.HpcInfoList { - l.svcCtx.DbEngin.Exec("update hpc set status = ?,start_time = ?,running_time = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", + l.svcCtx.DbEngin.Exec("update hpc set status = ?,start_time = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, in.ParticipantId, hpcInfo.TaskId, hpcInfo.Name) + syncTask(l.svcCtx.DbEngin, hpcInfo.TaskId) } case constants.AI: for _, aiInfo := range in.AiInfoList { - l.svcCtx.DbEngin.Exec("update ai set status = ?,start_time = ?,running_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", - aiInfo.Status, aiInfo.StartTime, aiInfo.RunningTime, aiInfo.ProjectId, aiInfo.JobId, in.ParticipantId, aiInfo.TaskId, aiInfo.Name) + l.svcCtx.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", + aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, in.ParticipantId, aiInfo.TaskId, aiInfo.Name) + syncTask(l.svcCtx.DbEngin, aiInfo.TaskId) } } return &pcmCore.SyncInfoResp{}, nil } + +func syncTask(gorm *gorm.DB, taskId int64) { + + var allStatus string + tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus) + if tx.Error != nil { + logx.Error(tx.Error) + } + // 子状态统一则修改主任务状态 + statusArray := strings.Split(allStatus, ",") + if len(removeRepeatedElement(statusArray)) == 1 { + updateTask(gorm, taskId, statusArray[0]) + + } + // 子任务包含失败状态 主任务则失败 + if strings.Contains(allStatus, constants.Failed) { + updateTask(gorm, taskId, constants.Failed) + + } + if strings.Contains(allStatus, constants.Running) { + updateTask(gorm, taskId, constants.Running) + } + +} + +func updateTask(gorm *gorm.DB, taskId int64, status string) { + var task models.Task + gorm.Where("id = ? ", taskId).Find(&task) + if task.Status != status { + task.Status = status + gorm.Updates(&task) + } +} + +func removeRepeatedElement(arr []string) (newArr []string) { + newArr = make([]string, 0) + for i := 0; i < len(arr); i++ { + repeat := false + for j := i + 1; j < len(arr); j++ { + if arr[i] == arr[j] { + repeat = true + break + } + } + if !repeat { + newArr = append(newArr, arr[i]) + } + } + return +} diff --git a/rpc/pb/pcmCore.proto b/rpc/pb/pcmCore.proto index 8b682075..dc27ed3b 100644 --- a/rpc/pb/pcmCore.proto +++ b/rpc/pb/pcmCore.proto @@ -42,18 +42,19 @@ message CloudInfo { int64 runningTime = 9; string result = 10; string yamlString = 11; + int64 id = 12; } message VmInfo { int64 participantId = 1; int64 taskId = 2; string name = 3; - string flavor_ref =4; - string image_ref =5; - string network_uuid=6; - string block_uuid=7; - string source_type=8; - bool delete_on_termination=9; + string flavor_ref = 4; + string image_ref = 5; + string network_uuid = 6; + string block_uuid = 7; + string source_type = 8; + bool delete_on_termination = 9; string state = 10; } diff --git a/rpc/pcmCore/pcmCore.pb.go b/rpc/pcmCore/pcmCore.pb.go index 0c230fe4..7d0b4f1a 100644 --- a/rpc/pcmCore/pcmCore.pb.go +++ b/rpc/pcmCore/pcmCore.pb.go @@ -323,6 +323,7 @@ type CloudInfo struct { RunningTime int64 `protobuf:"varint,9,opt,name=runningTime,proto3" json:"runningTime,omitempty"` Result string `protobuf:"bytes,10,opt,name=result,proto3" json:"result,omitempty"` YamlString string `protobuf:"bytes,11,opt,name=yamlString,proto3" json:"yamlString,omitempty"` + Id int64 `protobuf:"varint,12,opt,name=id,proto3" json:"id,omitempty"` } func (x *CloudInfo) Reset() { @@ -434,6 +435,13 @@ func (x *CloudInfo) GetYamlString() string { return "" } +func (x *CloudInfo) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + type VmInfo struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2542,7 +2550,7 @@ var file_pcmCore_proto_rawDesc = []byte{ 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x69, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x69, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, - 0x22, 0xbb, 0x02, 0x0a, 0x09, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x20, + 0x22, 0xcb, 0x02, 0x0a, 0x09, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x20, 0x0a, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, @@ -2561,7 +2569,8 @@ var file_pcmCore_proto_rawDesc = []byte{ 0x6e, 0x67, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x79, 0x61, 0x6d, 0x6c, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x18, 0x0b, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0a, 0x79, 0x61, 0x6d, 0x6c, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x22, 0xc3, + 0x28, 0x09, 0x52, 0x0a, 0x79, 0x61, 0x6d, 0x6c, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x69, 0x64, 0x22, 0xc3, 0x02, 0x0a, 0x06, 0x56, 0x6d, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x24, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, diff --git a/rpc/pcmcore.go b/rpc/pcmcore.go index f1353023..6212e64c 100644 --- a/rpc/pcmcore.go +++ b/rpc/pcmcore.go @@ -21,13 +21,11 @@ import ( "github.com/zeromicro/go-zero/core/service" "github.com/zeromicro/go-zero/zrpc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/config" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/cron" participantserviceServer "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/server/participantservice" pcmcoreServer "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/server/pcmcore" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore" "gitlink.org.cn/jcce-pcm/utils/interceptor/rpcserver" - commonConfig "gitlink.org.cn/jcce-pcm/utils/nacos" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) @@ -38,28 +36,8 @@ func main() { flag.Parse() - var bootstrapConfig commonConfig.BootstrapConfig - conf.MustLoad(*configFile, &bootstrapConfig) - - //解析业务配置 var c config.Config - nacosConfig := bootstrapConfig.NacosConfig - - serviceConfigContent := nacosConfig.InitConfig(func(data string) { - err := conf.LoadFromYamlBytes([]byte(data), &c) - if err != nil { - panic(err) - } - }) - err := conf.LoadFromYamlBytes([]byte(serviceConfigContent), &c) - if err != nil { - panic(err) - } - // start log component - logx.MustSetup(c.LogConf) - // 注册到nacos - nacosConfig.Discovery(&c.RpcServerConf) - + conf.MustLoad(*configFile, &c) ctx := svc.NewServiceContext(c) s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) { @@ -74,8 +52,6 @@ func main() { s.AddUnaryInterceptors(rpcserver.LoggerInterceptor) defer s.Stop() - // 初始化定时任务 - cron.InitCron(ctx) logx.Infof("Starting rpc server at %s...\n", c.ListenOn) s.Start() }