From bc7f7f5a1a604f8cc22131ad0244134e139eae55 Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Thu, 24 Aug 2023 14:51:56 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E5=BA=A6=E6=A8=A1=E5=9D=97=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: 3df2c09c694bc7c4034e9903e484ca5d8adbd6ec --- api/internal/config/config.go | 6 ++++++ api/internal/mqs/kq/ScheduleCloud.go | 8 ++++---- api/internal/pkg/scheduler/aiScheduler.go | 4 ++-- api/internal/pkg/scheduler/cloudScheduler.go | 8 +++++--- api/internal/pkg/scheduler/hpcScheduler.go | 4 ++-- api/internal/pkg/scheduler/scheduler.go | 6 +++--- api/internal/svc/servicecontext.go | 7 +++++++ 7 files changed, 29 insertions(+), 14 deletions(-) diff --git a/api/internal/config/config.go b/api/internal/config/config.go index 8ffc6adb..f01e508e 100644 --- a/api/internal/config/config.go +++ b/api/internal/config/config.go @@ -44,4 +44,10 @@ type Config struct { Username string Password string } + SnowflakeConf SnowflakeConf +} + +// SnowflakeConf 雪花算法机器id配置 +type SnowflakeConf struct { + MachineId int64 `json:"machineId"` } diff --git a/api/internal/mqs/kq/ScheduleCloud.go b/api/internal/mqs/kq/ScheduleCloud.go index bc5b96a3..64e192ab 100644 --- a/api/internal/mqs/kq/ScheduleCloud.go +++ b/api/internal/mqs/kq/ScheduleCloud.go @@ -65,15 +65,15 @@ func UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud { func (l *ScheduleCloudMq) Consume(_, val string) error { // 接受消息, 根据标签筛选过滤 - cloudSchdl := scheduler.NewCloudScheduler() - schdl, err := scheduler.NewScheduler(cloudSchdl, val) + cloudScheduler := scheduler.NewCloudScheduler() + scheduler, err := scheduler.NewScheduler(cloudScheduler, val) if err != nil { return err } - schdl.MatchLabels(l.svcCtx.DbEngin) + scheduler.MatchLabels(l.svcCtx.DbEngin) // 存储数据 - err = schdl.SaveToDb(l.svcCtx.DbEngin) + err = scheduler.SaveToDb(l.svcCtx.DbEngin) if err != nil { return err } diff --git a/api/internal/pkg/scheduler/aiScheduler.go b/api/internal/pkg/scheduler/aiScheduler.go index 2b181662..e2a0a46c 100644 --- a/api/internal/pkg/scheduler/aiScheduler.go +++ b/api/internal/pkg/scheduler/aiScheduler.go @@ -15,7 +15,7 @@ func NewAiScheduler(val string) *aiScheduler { return &aiScheduler{yamlString: val} } -func (cs aiScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) { +func (cs *aiScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) { ai := model.Ai{ ParticipantId: participantIds[0], TaskId: task.TaskId, @@ -26,6 +26,6 @@ func (cs aiScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []i return ai, nil } -func (cs aiScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { +func (cs *aiScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { return nil, nil } diff --git a/api/internal/pkg/scheduler/cloudScheduler.go b/api/internal/pkg/scheduler/cloudScheduler.go index 4b2c208d..3d2407c6 100644 --- a/api/internal/pkg/scheduler/cloudScheduler.go +++ b/api/internal/pkg/scheduler/cloudScheduler.go @@ -6,6 +6,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/algo" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" "gitlink.org.cn/jcce-pcm/pcm-coordinator/model" + "gitlink.org.cn/jcce-pcm/utils/tool" "io" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -20,7 +21,7 @@ func NewCloudScheduler() *cloudScheduler { return &cloudScheduler{} } -func (cs cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { +func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { strategy := algo.NewK8sStrategy(task, providers...) taskResult, err := algo.ScheduleWithFullCollaboration(strategy, strategy.ProviderList) if err != nil { @@ -29,12 +30,13 @@ func (cs cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo return taskResult, nil } -func (cs cloudScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) { +func (cs *cloudScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) { bytes, err := json.Marshal(task.Metadata) if err != nil { return nil, err } cloud := cs.UnMarshalK8sStruct(string(bytes), task.TaskId) + cloud.Id = tool.GenSnowflakeID() cloud.YamlString = string(bytes) if len(participantIds) != 0 { cloud.ParticipantId = participantIds[0] @@ -42,7 +44,7 @@ func (cs cloudScheduler) getNewStructForDb(task *types.TaskInfo, participantIds return cloud, nil } -func (cs cloudScheduler) UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud { +func (cs *cloudScheduler) UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud { var cloud model.Cloud d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) var err error diff --git a/api/internal/pkg/scheduler/hpcScheduler.go b/api/internal/pkg/scheduler/hpcScheduler.go index 15443e0d..0f8f7620 100644 --- a/api/internal/pkg/scheduler/hpcScheduler.go +++ b/api/internal/pkg/scheduler/hpcScheduler.go @@ -15,7 +15,7 @@ func NewHpcScheduler(val string) *hpcScheduler { return &hpcScheduler{yamlString: val} } -func (h hpcScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) { +func (h *hpcScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) { hpc := model.Hpc{ TaskId: task.TaskId, Status: "Saved", @@ -26,6 +26,6 @@ func (h hpcScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []i return hpc, nil } -func (cs hpcScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { +func (cs *hpcScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { return nil, nil } diff --git a/api/internal/pkg/scheduler/scheduler.go b/api/internal/pkg/scheduler/scheduler.go index 17ede9ec..2cbafe85 100644 --- a/api/internal/pkg/scheduler/scheduler.go +++ b/api/internal/pkg/scheduler/scheduler.go @@ -23,7 +23,7 @@ func NewScheduler(scheduleService scheduleService, val string) (*scheduler, erro return &scheduler{task: task, scheduleService: scheduleService}, nil } -func (s scheduler) MatchLabels(dbEngin *gorm.DB) { +func (s *scheduler) MatchLabels(dbEngin *gorm.DB) { //if len(task.MatchLabels) != 0 { // participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task) // if err != nil { @@ -47,11 +47,11 @@ func (s scheduler) MatchLabels(dbEngin *gorm.DB) { s.participantIds = micsSlice(ids, 1) } -func (s scheduler) AssignAndSchedule() { +func (s *scheduler) AssignAndSchedule() { } -func (s scheduler) SaveToDb(dbEngin *gorm.DB) error { +func (s *scheduler) SaveToDb(dbEngin *gorm.DB) error { if len(s.participantIds) == 0 { return errors.New("participantIds 为空") } diff --git a/api/internal/svc/servicecontext.go b/api/internal/svc/servicecontext.go index 25d72ce1..5867d93e 100644 --- a/api/internal/svc/servicecontext.go +++ b/api/internal/svc/servicecontext.go @@ -17,6 +17,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelartsclient" "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient" "gitlink.org.cn/jcce-pcm/pcm-participant-slurm/hpcthclient" + "gitlink.org.cn/jcce-pcm/utils/tool" "gorm.io/driver/mysql" "gorm.io/gorm" "gorm.io/gorm/schema" @@ -49,6 +50,12 @@ func NewServiceContext(c config.Config) *ServiceContext { DisableSSL: aws.Bool(false), //是否禁用https,这里表示不禁用,即使用HTTPS S3ForcePathStyle: aws.Bool(true), //使用路径样式而非虚拟主机样式,区别请参考:https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html }) + //添加snowflake支持 + err := tool.InitSnowflake(c.SnowflakeConf.MachineId) + if err != nil { + logx.Errorf("InitSnowflake err: ", err) + panic("InitSnowflake err") + } downloader := s3manager.NewDownloader(session) uploader := s3manager.NewUploader(session) //启动Gorm支持