From afb036b7d2660d39d44cece3a331b9dbdd56e4e0 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 23 Aug 2023 09:08:40 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E5=BA=A6=E7=BB=93=E6=9E=84=E4=BF=AE?= =?UTF-8?q?=E6=94=B92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: 01bea42264affc352108506de887062830b7fdf7 --- api/internal/mqs/kq/ScheduleAi.go | 27 +++++----------- api/internal/mqs/kq/ScheduleCloud.go | 29 +++++------------ api/internal/mqs/kq/ScheduleHpc.go | 36 +++++++--------------- api/internal/pkg/scheduler/hpcScheduler.go | 6 +++- api/internal/pkg/scheduler/scheduler.go | 21 ++++++++----- 5 files changed, 46 insertions(+), 73 deletions(-) diff --git a/api/internal/mqs/kq/ScheduleAi.go b/api/internal/mqs/kq/ScheduleAi.go index 02b050ed..afad6171 100644 --- a/api/internal/mqs/kq/ScheduleAi.go +++ b/api/internal/mqs/kq/ScheduleAi.go @@ -2,12 +2,8 @@ package kq import ( "context" - "encoding/json" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/pkg/scheduler" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/model" - "gitlink.org.cn/jcce-pcm/utils/tool" ) /* @@ -27,25 +23,18 @@ func NewScheduleAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleA } func (l *ScheduleAiMq) Consume(_, val string) error { - // 接受消息 - var task *types.TaskInfo - json.Unmarshal([]byte(val), &task) - - participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task) + // 接受消息, 根据标签筛选过滤 + aiSchdl := scheduler.NewAiScheduler(val) + schdl, err := scheduler.NewScheduler(aiSchdl, val) if err != nil { return err } - ai := model.Ai{ - ParticipantId: participantId[0], - TaskId: task.TaskId, - Status: "Saved", - YamlString: val, - } - tool.Convert(task.Metadata, &ai) + schdl.MatchLabels(l.svcCtx.DbEngin) + // 存储数据 - tx := l.svcCtx.DbEngin.Create(&ai) - if tx.Error != nil { - return tx.Error + err = schdl.SaveToDb(l.svcCtx.DbEngin) + if err != nil { + return err } return nil } diff --git a/api/internal/mqs/kq/ScheduleCloud.go b/api/internal/mqs/kq/ScheduleCloud.go index 60dfcf37..bc5b96a3 100644 --- a/api/internal/mqs/kq/ScheduleCloud.go +++ b/api/internal/mqs/kq/ScheduleCloud.go @@ -3,11 +3,8 @@ package kq import ( "bytes" "context" - "encoding/json" - "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/pkg/scheduler" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" "gitlink.org.cn/jcce-pcm/pcm-coordinator/model" "io" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -67,28 +64,18 @@ func UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud { } func (l *ScheduleCloudMq) Consume(_, val string) error { - var task *types.TaskInfo - json.Unmarshal([]byte(val), &task) - // 根据标签筛选过滤 - participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task) + // 接受消息, 根据标签筛选过滤 + cloudSchdl := scheduler.NewCloudScheduler() + schdl, err := scheduler.NewScheduler(cloudSchdl, val) if err != nil { return err } - // 构建提交作业到云算的结构体 - bytes, err := json.Marshal(task.Metadata) - if err != nil { - return err - } - cloud := UnMarshalK8sStruct(string(bytes), task.TaskId) - cloud.YamlString = string(bytes) - if len(participantId) != 0 { - cloud.ParticipantId = participantId[0] - } + schdl.MatchLabels(l.svcCtx.DbEngin) + // 存储数据 - tx := l.svcCtx.DbEngin.Create(&cloud) - if tx.Error != nil { - logx.Error(tx.Error) - return tx.Error + err = schdl.SaveToDb(l.svcCtx.DbEngin) + if err != nil { + return err } return nil } diff --git a/api/internal/mqs/kq/ScheduleHpc.go b/api/internal/mqs/kq/ScheduleHpc.go index a0d24a1c..bed6b053 100644 --- a/api/internal/mqs/kq/ScheduleHpc.go +++ b/api/internal/mqs/kq/ScheduleHpc.go @@ -2,12 +2,8 @@ package kq import ( "context" - "encoding/json" - "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/pkg/scheduler" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/model" - "gitlink.org.cn/jcce-pcm/utils/tool" ) /* @@ -27,28 +23,18 @@ func NewScheduleHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *Schedule } func (l *ScheduleHpcMq) Consume(_, val string) error { - // 接受消息 - var task *types.TaskInfo - json.Unmarshal([]byte(val), &task) - //if len(task.MatchLabels) != 0 { - // participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task) - // if err != nil { - // return err - // } - //} - - hpc := model.Hpc{ - TaskId: task.TaskId, - Status: "Saved", - //ParticipantId: participantId[0], - YamlString: val, + // 接受消息, 根据标签筛选过滤 + hpcSchdl := scheduler.NewHpcScheduler(val) + schdl, err := scheduler.NewScheduler(hpcSchdl, val) + if err != nil { + return err } - tool.Convert(task.Metadata, &hpc) + schdl.MatchLabels(l.svcCtx.DbEngin) + // 存储数据 - tx := l.svcCtx.DbEngin.Create(&hpc) - if tx.Error != nil { - logx.Error(tx.Error) - return tx.Error + err = schdl.SaveToDb(l.svcCtx.DbEngin) + if err != nil { + return err } return nil } diff --git a/api/internal/pkg/scheduler/hpcScheduler.go b/api/internal/pkg/scheduler/hpcScheduler.go index c1f8ce1d..c1dba3cd 100644 --- a/api/internal/pkg/scheduler/hpcScheduler.go +++ b/api/internal/pkg/scheduler/hpcScheduler.go @@ -10,7 +10,11 @@ type hpcScheduler struct { yamlString string } -func (h hpcScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int) (interface{}, error) { +func NewHpcScheduler(val string) *hpcScheduler { + return &hpcScheduler{yamlString: val} +} + +func (h hpcScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) { hpc := model.Hpc{ TaskId: task.TaskId, Status: "Saved", diff --git a/api/internal/pkg/scheduler/scheduler.go b/api/internal/pkg/scheduler/scheduler.go index f4705602..4c953239 100644 --- a/api/internal/pkg/scheduler/scheduler.go +++ b/api/internal/pkg/scheduler/scheduler.go @@ -14,20 +14,27 @@ type scheduler struct { scheduleService scheduleService } -func NewScheduler(task *types.TaskInfo, val string) (*scheduler, error) { +func NewScheduler(scheduleService scheduleService, val string) (*scheduler, error) { + var task *types.TaskInfo err := json.Unmarshal([]byte(val), &task) if err != nil { return nil, errors.New("create scheduler failed : " + err.Error()) } - return &scheduler{task: task}, nil + return &scheduler{task: task, scheduleService: scheduleService}, nil } -func (s scheduler) matchLabels(dbEngin *gorm.DB, task *types.TaskInfo) { +func (s scheduler) MatchLabels(dbEngin *gorm.DB) { + //if len(task.MatchLabels) != 0 { + // participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task) + // if err != nil { + // return err + // } + //} var ids []int64 count := 0 - for key := range task.MatchLabels { + for key := range s.task.MatchLabels { var participantId []int64 - dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, task.MatchLabels[key]).Scan(&participantId) + dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantId) if count == 0 { ids = participantId } @@ -40,7 +47,7 @@ func (s scheduler) matchLabels(dbEngin *gorm.DB, task *types.TaskInfo) { s.participantIds = micsSlice(ids, 1) } -func (s scheduler) saveToDb(dbEngin *gorm.DB) error { +func (s scheduler) SaveToDb(dbEngin *gorm.DB) error { if len(s.participantIds) == 0 { return errors.New("participantIds 为空") } @@ -48,7 +55,7 @@ func (s scheduler) saveToDb(dbEngin *gorm.DB) error { if err != nil { return err } - tx := dbEngin.Create(&structForDb) + tx := dbEngin.Create(structForDb) if tx.Error != nil { logx.Error(tx.Error) return tx.Error