diff --git a/api/etc/pcm.yaml b/api/etc/pcm.yaml index 169e9e35..c18ddba8 100644 --- a/api/etc/pcm.yaml +++ b/api/etc/pcm.yaml @@ -4,10 +4,13 @@ NacosConfig: ServerConfigs: # - IpAddr: 127.0.0.1 # Port: 8848 - - IpAddr: nacos.jcce.dev +# - IpAddr: 10.101.15.7 +# Port: 8848 + - IpAddr: 119.45.100.73 Port: 8848 ClientConfig: - NamespaceId: test + NamespaceId: tzwang +# NamespaceId: test TimeoutMs: 5000 NotLoadCacheAtStart: true LogDir: diff --git a/api/internal/config/config.go b/api/internal/config/config.go index dc137e16..c731d85f 100644 --- a/api/internal/config/config.go +++ b/api/internal/config/config.go @@ -38,6 +38,7 @@ type Config struct { CephRpcConf zrpc.RpcClientConf OpenstackRpcConf zrpc.RpcClientConf OctopusRpcConf zrpc.RpcClientConf + PcmCoreRpcConf zrpc.RpcClientConf NexusUrl string JccScheduleUrl string MinioConf struct { diff --git a/api/internal/mqs/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go index c3d49f6e..ec5c942e 100644 --- a/api/internal/mqs/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -39,7 +39,7 @@ func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { func (l *AiQueue) Consume(val string) error { // 接受消息, 根据标签筛选过滤 aiSchdl := scheduler2.NewAiScheduler(val) - schdl, err := scheduler2.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin) + schdl, err := scheduler2.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin, nil) if err != nil { return err } diff --git a/api/internal/mqs/ScheduleCloud.go b/api/internal/mqs/ScheduleCloud.go index 74c76020..d2e278a2 100644 --- a/api/internal/mqs/ScheduleCloud.go +++ b/api/internal/mqs/ScheduleCloud.go @@ -39,7 +39,7 @@ func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq { func (l *CloudMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 cloudScheduler := scheduler.NewCloudScheduler() - schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin) + schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc) if err != nil { return err } diff --git a/api/internal/mqs/ScheduleHpc.go b/api/internal/mqs/ScheduleHpc.go index 7d0a8eb0..1e188e92 100644 --- a/api/internal/mqs/ScheduleHpc.go +++ b/api/internal/mqs/ScheduleHpc.go @@ -39,7 +39,7 @@ func NewHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *HpcMq { func (l *HpcMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 hpcSchdl := scheduler2.NewHpcScheduler(val) - schdl, err := scheduler2.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin) + schdl, err := scheduler2.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin, nil) if err != nil { return err } diff --git a/api/internal/svc/servicecontext.go b/api/internal/svc/servicecontext.go index d1bf064a..2a20ab4c 100644 --- a/api/internal/svc/servicecontext.go +++ b/api/internal/svc/servicecontext.go @@ -27,6 +27,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient" "gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient" "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice" @@ -55,6 +56,7 @@ type ServiceContext struct { Downloader *s3manager.Downloader Uploader *s3manager.Uploader K8sRpc map[int64]kubernetesclient.Kubernetes + ParticipantRpc participantservice.ParticipantService } func NewServiceContext(c config.Config) *ServiceContext { @@ -101,6 +103,7 @@ func NewServiceContext(c config.Config) *ServiceContext { OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), K8sRpc: make(map[int64]kubernetesclient.Kubernetes), + ParticipantRpc: participantservice.NewParticipantService(zrpc.MustNewClient(c.PcmCoreRpcConf)), DockerClient: dockerClient, Downloader: downloader, Uploader: uploader, diff --git a/pkg/scheduler/aiScheduler.go b/pkg/scheduler/aiScheduler.go index b84b7e17..f0eb4cfa 100644 --- a/pkg/scheduler/aiScheduler.go +++ b/pkg/scheduler/aiScheduler.go @@ -41,7 +41,7 @@ func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, participantId return ai, nil } -func (as *aiScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { +func (as *aiScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) { return nil, nil } diff --git a/pkg/scheduler/cloudScheduler.go b/pkg/scheduler/cloudScheduler.go index cb270e24..c6ff47a1 100644 --- a/pkg/scheduler/cloudScheduler.go +++ b/pkg/scheduler/cloudScheduler.go @@ -36,7 +36,7 @@ func NewCloudScheduler() *cloudScheduler { return &cloudScheduler{} } -func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { +func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) { //参数为空返回 nil if len(providers) == 0 || task == nil { return nil, nil @@ -48,7 +48,7 @@ func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*alg if err != nil { return nil, err } - return taskResult, nil + return taskResult.MaxscoreStrategy, nil } func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64) (interface{}, error) { diff --git a/pkg/scheduler/commonScheduler.go b/pkg/scheduler/common.go similarity index 98% rename from pkg/scheduler/commonScheduler.go rename to pkg/scheduler/common.go index f846e688..ef6a78f5 100644 --- a/pkg/scheduler/commonScheduler.go +++ b/pkg/scheduler/common.go @@ -24,7 +24,7 @@ import ( type scheduleService interface { getNewStructForDb(task *response.TaskInfo, participantId int64) (interface{}, error) - pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) + pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) genTaskAndProviders(task *response.TaskInfo, dbEngin *gorm.DB) (*algo.Task, []*algo.Provider) } diff --git a/pkg/scheduler/hpcScheduler.go b/pkg/scheduler/hpcScheduler.go index e9649c4d..10bcb78f 100644 --- a/pkg/scheduler/hpcScheduler.go +++ b/pkg/scheduler/hpcScheduler.go @@ -42,7 +42,7 @@ func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, participantId return hpc, nil } -func (h *hpcScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { +func (h *hpcScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) { return nil, nil } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index f6ee8ee4..ac2351aa 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -15,28 +15,36 @@ package scheduler import ( + "context" "encoding/json" "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" ) +type Replicas int64 + +type ParticipantId int64 + type scheduler struct { task *response.TaskInfo participantIds []int64 scheduleService scheduleService dbEngin *gorm.DB + result map[ParticipantId]Replicas + participantRpc participantservice.ParticipantService } -func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB) (*scheduler, error) { +func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*scheduler, error) { var task *response.TaskInfo err := json.Unmarshal([]byte(val), &task) if err != nil { return nil, errors.New("create scheduler failed : " + err.Error()) } - return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin}, nil + return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil } func (s *scheduler) MatchLabels() { @@ -92,19 +100,10 @@ func (s *scheduler) AssignAndSchedule() error { return nil } - //调度结果 ParticipantId - for i, e := range strategy.ResourcePerTask { - - if len(e) != 0 { - for _, ej := range e { - if ej == 1 { - s.task.ParticipantId = providerList[i].Pid - break - } - } - } else { - continue - } + //调度结果 + err = s.assignReplicasToResult(strategy, providerList) + if err != nil { + return err } return nil @@ -130,3 +129,65 @@ func (s *scheduler) SaveToDb() error { func (s *scheduler) obtainParamsforStrategy() (*algo.Task, []*algo.Provider) { return s.scheduleService.genTaskAndProviders(s.task, s.dbEngin) } + +func (s *scheduler) checkAvailableParticipants() error { + resp, err := s.participantRpc.ListParticipant(context.Background(), nil) + if err != nil { + return err + } + + if resp.Code != 200 { + return errors.New("集群列表查询失败") + } + + var workingIds []int64 + for _, e := range resp.Data { + if e.ClientState == "UNKNOWN" { + continue + } + workingIds = append(workingIds, e.ParticipantId) + } + + for id, _ := range s.result { + if !contains(workingIds, int64(id)) { + return errors.Errorf("集群 %d 不可用", id) + } + } + + if len(s.result) == 0 { + return errors.New("可用集群为空") + } + + return nil +} + +func (s *scheduler) assignReplicasToResult(strategy *algo.Strategy, providerList []*algo.Provider) error { + + if len(strategy.Tasksolution) == 0 { + return errors.New("调度失败, 未能获取调度结果") + } + + for i, e := range strategy.Tasksolution { + if e == 0 { + continue + } + s.result[ParticipantId(providerList[i].Pid)] = Replicas(e) + } + + // 查询集群是否可用 + err := s.checkAvailableParticipants() + if err != nil { + return err + } + + return nil +} + +func contains(s []int64, e int64) bool { + for _, a := range s { + if a == e { + return true + } + } + return false +}