From 190c158322988847d70d1f7085a54a12825fdd71 Mon Sep 17 00:00:00 2001 From: tzwang Date: Fri, 26 Apr 2024 18:19:44 +0800 Subject: [PATCH 1/8] added get algorithmcode funcs Former-commit-id: ed06d1cb96131215df04fed7b4ffbf352d01477a --- api/internal/scheduler/service/collector/collector.go | 2 ++ api/internal/storeLink/modelarts.go | 8 ++++++++ api/internal/storeLink/octopus.go | 8 ++++++++ api/internal/storeLink/shuguangai.go | 8 ++++++++ go.mod | 2 +- go.sum | 4 ++-- 6 files changed, 29 insertions(+), 3 deletions(-) diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index e313baff..99d34b51 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -7,6 +7,8 @@ type AiCollector interface { GetDatasetsSpecs(ctx context.Context) ([]*DatasetsSpecs, error) GetAlgorithms(ctx context.Context) ([]*Algorithm, error) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) + DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error) + UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error } type ResourceStats struct { diff --git a/api/internal/storeLink/modelarts.go b/api/internal/storeLink/modelarts.go index 5a9bed87..84bc0dbf 100644 --- a/api/internal/storeLink/modelarts.go +++ b/api/internal/storeLink/modelarts.go @@ -162,6 +162,14 @@ func (m *ModelArtsLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorit return nil, nil } +func (m *ModelArtsLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error) { + return "", nil +} + +func (m *ModelArtsLink) UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error { + return nil +} + func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) { return "", nil } diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index e052f637..22781e2f 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -337,6 +337,14 @@ func (o *OctopusLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm return algorithms, nil } +func (o *OctopusLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error) { + return "", nil +} + +func (o *OctopusLink) UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error { + return nil +} + func (o *OctopusLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) { instance, err := strconv.ParseInt(instanceNum, 10, 32) if err != nil { diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index b07c3401..4a84cea4 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -447,6 +447,14 @@ func (s *ShuguangAi) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, return algorithms, nil } +func (s *ShuguangAi) DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error) { + return "", nil +} + +func (s *ShuguangAi) UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error { + return nil +} + func (s *ShuguangAi) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) { req := &hpcAC.GetInstanceLogReq{ TaskId: taskId, diff --git a/go.mod b/go.mod index a46abb60..fb46a881 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/robfig/cron/v3 v3.0.1 github.com/rs/zerolog v1.28.0 github.com/zeromicro/go-zero v1.6.3 - gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240420083915-58d6e2958aeb + gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240426095603-549fefd8bece gitlink.org.cn/JointCloud/pcm-kubernetes v0.0.0-20240301071143-347480abff2c gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240424085753-6899615e9142 gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 diff --git a/go.sum b/go.sum index 6f886672..c8224c75 100644 --- a/go.sum +++ b/go.sum @@ -1078,8 +1078,8 @@ github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7 github.com/zeromicro/go-zero v1.5.1/go.mod h1:bGYm4XWsGN9GhDsO2O2BngpVoWjf3Eog2a5hUOMhlXs= github.com/zeromicro/go-zero v1.6.3 h1:OL0NnHD5LdRNDolfcK9vUkJt7K8TcBE3RkzfM8poOVw= github.com/zeromicro/go-zero v1.6.3/go.mod h1:XZL435ZxVi9MSXXtw2MRQhHgx6OoX3++MRMOE9xU70c= -gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240420083915-58d6e2958aeb h1:k6mNEWKp+haQUaK2dWs/rI9OKgzJHY1/9KNKuBDN0Vw= -gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240420083915-58d6e2958aeb/go.mod h1:w3Nb5TNymCItQ7K3x4Q0JLuoq9OerwAzAWT2zsPE9Xo= +gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240426095603-549fefd8bece h1:W3yBnvAVV8dlRNQKYD6Mf8ySRrYsP0tPk7JjvqZzNHQ= +gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240426095603-549fefd8bece/go.mod h1:w3Nb5TNymCItQ7K3x4Q0JLuoq9OerwAzAWT2zsPE9Xo= gitlink.org.cn/JointCloud/pcm-kubernetes v0.0.0-20240301071143-347480abff2c h1:2Wl/hvaSFjh6fmCSIQhjkr9llMRREQeqcXNLZ/HPY18= gitlink.org.cn/JointCloud/pcm-kubernetes v0.0.0-20240301071143-347480abff2c/go.mod h1:lSRfGs+PxFvw7CcndHWRd6UlLlGrZn0b0hp5cfaMNGw= gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240424085753-6899615e9142 h1:+po0nesBDSWsgCySBG7eEXk7i9Ytd58wqvjL1M9y6d8= From 9bd725380e77ab2a8c421459dddac308fa245f9f Mon Sep 17 00:00:00 2001 From: tzwang Date: Sun, 28 Apr 2024 18:02:29 +0800 Subject: [PATCH 2/8] added ai center overview apis Former-commit-id: 02ea4ae3c93275c3c76f29aecba6ac7be32ef555 --- api/desc/ai/pcm-ai.api | 38 +++++++++++++++++++++++++++++ api/desc/pcm.api | 19 +++++++++++++++ api/desc/schedule/pcm-schedule.api | 3 +++ api/internal/scheduler/scheduler.go | 29 +++------------------- 4 files changed, 63 insertions(+), 26 deletions(-) diff --git a/api/desc/ai/pcm-ai.api b/api/desc/ai/pcm-ai.api index fcaf9d30..23cfff29 100644 --- a/api/desc/ai/pcm-ai.api +++ b/api/desc/ai/pcm-ai.api @@ -1697,6 +1697,44 @@ PayloadCreateTrainJob{ jobId string `json:"jobId,optional"` } ********************/ + + /******************Ai Center overview*************************/ + CenterOverviewResp { + CenterNum int32 `json:"totalCenters,optional"` + TaskNum int32 `json:"totalTasks,optional"` + CardNum int32 `json:"totalCards,optional"` + PowerInTops float64 `json:"totalPower,optional"` + } + + CenterQueueingResp { + Current []*CenterQueue `json:"current,optional"` + History []*CenterQueue `json:"history,optional"` + } + + CenterQueue { + Name string `json:"name,optional"` + QueueingNum int32 `json:"num,optional"` + } + + CenterListResp { + List []*Center `json:"centerList,optional"` + } + + Center { + Name string `json:"name,optional"` + StackName string `json:"stack,optional"` + Version string `json:"version,optional"` + } + + CenterTaskListResp { + List []*AiTask `json:"taskList,optional"` + } + + AiTask { + Name string `json:"name,optional"` + status string `json:"status,optional"` + TimeElapsed int32 `json:"elapsed,optional"` + } ) /******************create TrainIngJob end*************************/ diff --git a/api/desc/pcm.api b/api/desc/pcm.api index 12d338a0..c6734f1f 100644 --- a/api/desc/pcm.api +++ b/api/desc/pcm.api @@ -219,6 +219,22 @@ service pcm { group: ai ) service pcm { + @doc "智算中心概览" + @handler getCenterOverviewHandler + get /ai/getCenterOverview returns (CenterOverviewResp) + + @doc "智算中心排队状况" + @handler getCenterQueueingHandler + get /ai/getCenterQueueing returns (CenterQueueingResp) + + @doc "智算中心列表" + @handler getCenterListHandler + get /ai/getCenterList returns (CenterListResp) + + @doc "智算中心任务列表" + @handler getCenterTaskListHandler + get /ai/getCenterTaskList returns (CenterTaskListResp) + @doc "查询数据集列表" @handler listDataSetHandler get /ai/listDataSet/:projectId (DataSetReq) returns (DataSetResp) @@ -927,6 +943,9 @@ service pcm { @handler ScheduleSubmitHandler post /schedule/submit (ScheduleReq) returns (ScheduleResp) + + @handler ScheduleGetOverviewHandler + post /schedule/getOverview returns (ScheduleOverviewResp) } @server( diff --git a/api/desc/schedule/pcm-schedule.api b/api/desc/schedule/pcm-schedule.api index 02783746..a3068a25 100644 --- a/api/desc/schedule/pcm-schedule.api +++ b/api/desc/schedule/pcm-schedule.api @@ -24,6 +24,9 @@ type ( Msg string `json:"msg"` } + ScheduleOverviewResp { + } + AiOption { TaskName string `json:"taskName"` AdapterId string `json:"adapterId"` diff --git a/api/internal/scheduler/scheduler.go b/api/internal/scheduler/scheduler.go index d214e76a..bbdb1f23 100644 --- a/api/internal/scheduler/scheduler.go +++ b/api/internal/scheduler/scheduler.go @@ -129,42 +129,19 @@ func (s *Scheduler) TempAssign() error { } func (s *Scheduler) AssignAndSchedule(ss SubSchedule) (interface{}, error) { - //// 已指定 ParticipantId - //if s.task.ParticipantId != 0 { - // return nil - //} - //// 标签匹配以及后,未找到ParticipantIds - //if len(s.participantIds) == 0 { - // return errors.New("未找到匹配的ParticipantIds") - //} - // - //// 指定或者标签匹配的结果只有一个集群,给任务信息指定 - //if len(s.participantIds) == 1 { - // s.task.ParticipantId = s.participantIds[0] - // //replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) - // //result := make(map[int64]string) - // //result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64) - // //s.result = result - // - // return nil - //} - + //choose strategy strategy, err := ss.PickOptimalStrategy() if err != nil { return nil, err } + //schedule clusters, err := strategy.Schedule() if err != nil { return nil, err } - //集群数量不满足,指定到标签匹配后第一个集群 - //if len(providerList) < 2 { - // s.task.ParticipantId = s.participantIds[0] - // return nil - //} - + //assign tasks to clusters resp, err := ss.AssignTask(clusters) if err != nil { return nil, err From 46913fb6775693f68a058a0568c5eb4bb33fcfbe Mon Sep 17 00:00:00 2001 From: tzwang Date: Sun, 28 Apr 2024 18:21:26 +0800 Subject: [PATCH 3/8] generate ai center overview logic Former-commit-id: 1f661622ed2630972e98a283c9020a9e0fa18dad --- api/desc/ai/pcm-ai.api | 4 +- .../handler/ai/getcenterlisthandler.go | 21 ++++++++++ .../handler/ai/getcenteroverviewhandler.go | 21 ++++++++++ .../handler/ai/getcenterqueueinghandler.go | 21 ++++++++++ .../handler/ai/getcentertasklisthandler.go | 21 ++++++++++ api/internal/handler/routes.go | 25 ++++++++++++ .../schedule/schedulegetoverviewhandler.go | 21 ++++++++++ api/internal/logic/ai/getcenterlistlogic.go | 30 ++++++++++++++ .../logic/ai/getcenteroverviewlogic.go | 30 ++++++++++++++ .../logic/ai/getcenterqueueinglogic.go | 30 ++++++++++++++ .../logic/ai/getcentertasklistlogic.go | 30 ++++++++++++++ .../schedule/schedulegetoverviewlogic.go | 30 ++++++++++++++ api/internal/types/types.go | 40 +++++++++++++++++++ 13 files changed, 322 insertions(+), 2 deletions(-) create mode 100644 api/internal/handler/ai/getcenterlisthandler.go create mode 100644 api/internal/handler/ai/getcenteroverviewhandler.go create mode 100644 api/internal/handler/ai/getcenterqueueinghandler.go create mode 100644 api/internal/handler/ai/getcentertasklisthandler.go create mode 100644 api/internal/handler/schedule/schedulegetoverviewhandler.go create mode 100644 api/internal/logic/ai/getcenterlistlogic.go create mode 100644 api/internal/logic/ai/getcenteroverviewlogic.go create mode 100644 api/internal/logic/ai/getcenterqueueinglogic.go create mode 100644 api/internal/logic/ai/getcentertasklistlogic.go create mode 100644 api/internal/logic/schedule/schedulegetoverviewlogic.go diff --git a/api/desc/ai/pcm-ai.api b/api/desc/ai/pcm-ai.api index 23cfff29..b0cf10fe 100644 --- a/api/desc/ai/pcm-ai.api +++ b/api/desc/ai/pcm-ai.api @@ -1717,10 +1717,10 @@ PayloadCreateTrainJob{ } CenterListResp { - List []*Center `json:"centerList,optional"` + List []*AiCenter `json:"centerList,optional"` } - Center { + AiCenter { Name string `json:"name,optional"` StackName string `json:"stack,optional"` Version string `json:"version,optional"` diff --git a/api/internal/handler/ai/getcenterlisthandler.go b/api/internal/handler/ai/getcenterlisthandler.go new file mode 100644 index 00000000..40a7dc01 --- /dev/null +++ b/api/internal/handler/ai/getcenterlisthandler.go @@ -0,0 +1,21 @@ +package ai + +import ( + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/ai" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +func GetCenterListHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + l := ai.NewGetCenterListLogic(r.Context(), svcCtx) + resp, err := l.GetCenterList() + if err != nil { + httpx.ErrorCtx(r.Context(), w, err) + } else { + httpx.OkJsonCtx(r.Context(), w, resp) + } + } +} diff --git a/api/internal/handler/ai/getcenteroverviewhandler.go b/api/internal/handler/ai/getcenteroverviewhandler.go new file mode 100644 index 00000000..84b97958 --- /dev/null +++ b/api/internal/handler/ai/getcenteroverviewhandler.go @@ -0,0 +1,21 @@ +package ai + +import ( + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/ai" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +func GetCenterOverviewHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + l := ai.NewGetCenterOverviewLogic(r.Context(), svcCtx) + resp, err := l.GetCenterOverview() + if err != nil { + httpx.ErrorCtx(r.Context(), w, err) + } else { + httpx.OkJsonCtx(r.Context(), w, resp) + } + } +} diff --git a/api/internal/handler/ai/getcenterqueueinghandler.go b/api/internal/handler/ai/getcenterqueueinghandler.go new file mode 100644 index 00000000..6577b34c --- /dev/null +++ b/api/internal/handler/ai/getcenterqueueinghandler.go @@ -0,0 +1,21 @@ +package ai + +import ( + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/ai" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +func GetCenterQueueingHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + l := ai.NewGetCenterQueueingLogic(r.Context(), svcCtx) + resp, err := l.GetCenterQueueing() + if err != nil { + httpx.ErrorCtx(r.Context(), w, err) + } else { + httpx.OkJsonCtx(r.Context(), w, resp) + } + } +} diff --git a/api/internal/handler/ai/getcentertasklisthandler.go b/api/internal/handler/ai/getcentertasklisthandler.go new file mode 100644 index 00000000..37312b25 --- /dev/null +++ b/api/internal/handler/ai/getcentertasklisthandler.go @@ -0,0 +1,21 @@ +package ai + +import ( + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/ai" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +func GetCenterTaskListHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + l := ai.NewGetCenterTaskListLogic(r.Context(), svcCtx) + resp, err := l.GetCenterTaskList() + if err != nil { + httpx.ErrorCtx(r.Context(), w, err) + } else { + httpx.OkJsonCtx(r.Context(), w, resp) + } + } +} diff --git a/api/internal/handler/routes.go b/api/internal/handler/routes.go index 5402a03d..4db7721f 100644 --- a/api/internal/handler/routes.go +++ b/api/internal/handler/routes.go @@ -258,6 +258,26 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { server.AddRoutes( []rest.Route{ + { + Method: http.MethodGet, + Path: "/ai/getCenterOverview", + Handler: ai.GetCenterOverviewHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/ai/getCenterQueueing", + Handler: ai.GetCenterQueueingHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/ai/getCenterList", + Handler: ai.GetCenterListHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/ai/getCenterTaskList", + Handler: ai.GetCenterTaskListHandler(serverCtx), + }, { Method: http.MethodGet, Path: "/ai/listDataSet/:projectId", @@ -1155,6 +1175,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/schedule/submit", Handler: schedule.ScheduleSubmitHandler(serverCtx), }, + { + Method: http.MethodPost, + Path: "/schedule/getOverview", + Handler: schedule.ScheduleGetOverviewHandler(serverCtx), + }, }, rest.WithPrefix("/pcm/v1"), ) diff --git a/api/internal/handler/schedule/schedulegetoverviewhandler.go b/api/internal/handler/schedule/schedulegetoverviewhandler.go new file mode 100644 index 00000000..bbf66cc7 --- /dev/null +++ b/api/internal/handler/schedule/schedulegetoverviewhandler.go @@ -0,0 +1,21 @@ +package schedule + +import ( + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/schedule" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +func ScheduleGetOverviewHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + l := schedule.NewScheduleGetOverviewLogic(r.Context(), svcCtx) + resp, err := l.ScheduleGetOverview() + if err != nil { + httpx.ErrorCtx(r.Context(), w, err) + } else { + httpx.OkJsonCtx(r.Context(), w, resp) + } + } +} diff --git a/api/internal/logic/ai/getcenterlistlogic.go b/api/internal/logic/ai/getcenterlistlogic.go new file mode 100644 index 00000000..ce9db87d --- /dev/null +++ b/api/internal/logic/ai/getcenterlistlogic.go @@ -0,0 +1,30 @@ +package ai + +import ( + "context" + + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetCenterListLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetCenterListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterListLogic { + return &GetCenterListLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetCenterListLogic) GetCenterList() (resp *types.CenterListResp, err error) { + // todo: add your logic here and delete this line + + return +} diff --git a/api/internal/logic/ai/getcenteroverviewlogic.go b/api/internal/logic/ai/getcenteroverviewlogic.go new file mode 100644 index 00000000..0de00f18 --- /dev/null +++ b/api/internal/logic/ai/getcenteroverviewlogic.go @@ -0,0 +1,30 @@ +package ai + +import ( + "context" + + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetCenterOverviewLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterOverviewLogic { + return &GetCenterOverviewLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) { + // todo: add your logic here and delete this line + + return +} diff --git a/api/internal/logic/ai/getcenterqueueinglogic.go b/api/internal/logic/ai/getcenterqueueinglogic.go new file mode 100644 index 00000000..6ff23825 --- /dev/null +++ b/api/internal/logic/ai/getcenterqueueinglogic.go @@ -0,0 +1,30 @@ +package ai + +import ( + "context" + + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetCenterQueueingLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetCenterQueueingLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterQueueingLogic { + return &GetCenterQueueingLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetCenterQueueingLogic) GetCenterQueueing() (resp *types.CenterQueueingResp, err error) { + // todo: add your logic here and delete this line + + return +} diff --git a/api/internal/logic/ai/getcentertasklistlogic.go b/api/internal/logic/ai/getcentertasklistlogic.go new file mode 100644 index 00000000..96242e9b --- /dev/null +++ b/api/internal/logic/ai/getcentertasklistlogic.go @@ -0,0 +1,30 @@ +package ai + +import ( + "context" + + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetCenterTaskListLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterTaskListLogic { + return &GetCenterTaskListLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskListResp, err error) { + // todo: add your logic here and delete this line + + return +} diff --git a/api/internal/logic/schedule/schedulegetoverviewlogic.go b/api/internal/logic/schedule/schedulegetoverviewlogic.go new file mode 100644 index 00000000..f3ab02af --- /dev/null +++ b/api/internal/logic/schedule/schedulegetoverviewlogic.go @@ -0,0 +1,30 @@ +package schedule + +import ( + "context" + + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type ScheduleGetOverviewLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewScheduleGetOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleGetOverviewLogic { + return &ScheduleGetOverviewLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *ScheduleGetOverviewLogic) ScheduleGetOverview() (resp *types.ScheduleOverviewResp, err error) { + // todo: add your logic here and delete this line + + return +} diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 52164d6e..9120e293 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -2707,6 +2707,43 @@ type Nfs struct { ReadOnly bool `json:"readOnly,optional"` } +type CenterOverviewResp struct { + CenterNum int32 `json:"totalCenters,optional"` + TaskNum int32 `json:"totalTasks,optional"` + CardNum int32 `json:"totalCards,optional"` + PowerInTops float64 `json:"totalPower,optional"` +} + +type CenterQueueingResp struct { + Current []*CenterQueue `json:"current,optional"` + History []*CenterQueue `json:"history,optional"` +} + +type CenterQueue struct { + Name string `json:"name,optional"` + QueueingNum int32 `json:"num,optional"` +} + +type CenterListResp struct { + List []*AiCenter `json:"centerList,optional"` +} + +type AiCenter struct { + Name string `json:"name,optional"` + StackName string `json:"stack,optional"` + Version string `json:"version,optional"` +} + +type CenterTaskListResp struct { + List []*AiTask `json:"taskList,optional"` +} + +type AiTask struct { + Name string `json:"name,optional"` + Status string `json:"status,optional"` + TimeElapsed int32 `json:"elapsed,optional"` +} + type StorageScreenReq struct { } @@ -5488,6 +5525,9 @@ type ScheduleResult struct { Msg string `json:"msg"` } +type ScheduleOverviewResp struct { +} + type AiOption struct { TaskName string `json:"taskName"` AdapterId string `json:"adapterId"` From c151c8db07678a8ac0b350e3c2a25668bd197e92 Mon Sep 17 00:00:00 2001 From: tzwang Date: Mon, 29 Apr 2024 10:10:15 +0800 Subject: [PATCH 4/8] added ai task db model Former-commit-id: b81c68d0f5d6fdbd137c0a4c5531e23fae81c502 --- api/desc/schedule/pcm-schedule.api | 16 ++++++++++++++++ api/internal/types/types.go | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/api/desc/schedule/pcm-schedule.api b/api/desc/schedule/pcm-schedule.api index a3068a25..e1892770 100644 --- a/api/desc/schedule/pcm-schedule.api +++ b/api/desc/schedule/pcm-schedule.api @@ -84,4 +84,20 @@ type ( AiJobLogResp { Log string `json:"log"` } + + AiTaskDb { + Id string `json:"id,omitempty" db:"id"` + TaskId string `json:"taskId,omitempty" db:"task_id"` + AdapterId string `json:"adapterId,omitempty" db:"adapter_id"` + ClusterId string `json:"clusterId,omitempty" db:"cluster_id"` + Name string `json:"name,omitempty" db:"name"` + Replica string `json:"replica,omitempty" db:"replica"` + ClusterTaskId string `json:"clusterTaskId,omitempty" db:"c_task_id"` + Strategy string `json:"strategy,omitempty" db:"strategy"` + Status string `json:"status,omitempty" db:"status"` + Msg string `json:"msg,omitempty" db:"msg"` + CommitTime string `json:"commitTime,omitempty" db:"commit_time"` + StartTime string `json:"startTime,omitempty" db:"start_time"` + EndTime string `json:"endTime,omitempty" db:"end_time"` + } ) \ No newline at end of file diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 9120e293..8f20e9a2 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -5586,6 +5586,22 @@ type AiJobLogResp struct { Log string `json:"log"` } +type AiTaskDb struct { + Id string `json:"id,omitempty" db:"id"` + TaskId string `json:"taskId,omitempty" db:"task_id"` + AdapterId string `json:"adapterId,omitempty" db:"adapter_id"` + ClusterId string `json:"clusterId,omitempty" db:"cluster_id"` + Name string `json:"name,omitempty" db:"name"` + Replica string `json:"replica,omitempty" db:"replica"` + ClusterTaskId string `json:"clusterTaskId,omitempty" db:"c_task_id"` + Strategy string `json:"strategy,omitempty" db:"strategy"` + Status string `json:"status,omitempty" db:"status"` + Msg string `json:"msg,omitempty" db:"msg"` + CommitTime string `json:"commitTime,omitempty" db:"commit_time"` + StartTime string `json:"startTime,omitempty" db:"start_time"` + EndTime string `json:"endTime,omitempty" db:"end_time"` +} + type CreateAlertRuleReq struct { CLusterId string `json:"clusterId"` ClusterName string `json:"clusterName"` From 15c991ead1fc38e3e4ef92b00dd53274e6e35ceb Mon Sep 17 00:00:00 2001 From: tzwang Date: Mon, 29 Apr 2024 11:12:43 +0800 Subject: [PATCH 5/8] generate aitask model Former-commit-id: 7878a3900d5d2d077c9a8896a4866987b5ffbf99 --- pkg/models/taskaimodel.go | 24 ++++++++ pkg/models/taskaimodel_gen.go | 103 ++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+) create mode 100644 pkg/models/taskaimodel.go create mode 100644 pkg/models/taskaimodel_gen.go diff --git a/pkg/models/taskaimodel.go b/pkg/models/taskaimodel.go new file mode 100644 index 00000000..8db14d5b --- /dev/null +++ b/pkg/models/taskaimodel.go @@ -0,0 +1,24 @@ +package models + +import "github.com/zeromicro/go-zero/core/stores/sqlx" + +var _ TaskAiModel = (*customTaskAiModel)(nil) + +type ( + // TaskAiModel is an interface to be customized, add more methods here, + // and implement the added methods in customTaskAiModel. + TaskAiModel interface { + taskAiModel + } + + customTaskAiModel struct { + *defaultTaskAiModel + } +) + +// NewTaskAiModel returns a model for the database table. +func NewTaskAiModel(conn sqlx.SqlConn) TaskAiModel { + return &customTaskAiModel{ + defaultTaskAiModel: newTaskAiModel(conn), + } +} diff --git a/pkg/models/taskaimodel_gen.go b/pkg/models/taskaimodel_gen.go new file mode 100644 index 00000000..026eea1d --- /dev/null +++ b/pkg/models/taskaimodel_gen.go @@ -0,0 +1,103 @@ +// Code generated by goctl. DO NOT EDIT. + +package models + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + "github.com/zeromicro/go-zero/core/stores/builder" + "github.com/zeromicro/go-zero/core/stores/sqlc" + "github.com/zeromicro/go-zero/core/stores/sqlx" + "github.com/zeromicro/go-zero/core/stringx" +) + +var ( + taskAiFieldNames = builder.RawFieldNames(&TaskAi{}) + taskAiRows = strings.Join(taskAiFieldNames, ",") + taskAiRowsExpectAutoSet = strings.Join(stringx.Remove(taskAiFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",") + taskAiRowsWithPlaceHolder = strings.Join(stringx.Remove(taskAiFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?" +) + +type ( + taskAiModel interface { + Insert(ctx context.Context, data *TaskAi) (sql.Result, error) + FindOne(ctx context.Context, id int64) (*TaskAi, error) + Update(ctx context.Context, data *TaskAi) error + Delete(ctx context.Context, id int64) error + } + + defaultTaskAiModel struct { + conn sqlx.SqlConn + table string + } + + TaskAi struct { + Id int64 `db:"id"` // id + TaskId int64 `db:"task_id"` // 任务id + AdapterId int64 `db:"adapter_id"` // 设配器id + ClusterId int64 `db:"cluster_id"` // 集群id + Name string `db:"name"` // 任务名 + Replica int64 `db:"replica"` // 执行数 + CTaskId string `db:"c_task_id"` // 集群返回任务id + Strategy string `db:"strategy"` // 主任务使用策略 + Status string `db:"status"` // 任务状态 + Msg sql.NullString `db:"msg"` // 集群返回任务信息 + CommitTime time.Time `db:"commit_time"` // 提交时间 + StartTime time.Time `db:"start_time"` // 开始时间 + EndTime time.Time `db:"end_time"` // 结束时间 + } +) + +func newTaskAiModel(conn sqlx.SqlConn) *defaultTaskAiModel { + return &defaultTaskAiModel{ + conn: conn, + table: "`task_ai`", + } +} + +func (m *defaultTaskAiModel) withSession(session sqlx.Session) *defaultTaskAiModel { + return &defaultTaskAiModel{ + conn: sqlx.NewSqlConnFromSession(session), + table: "`task_ai`", + } +} + +func (m *defaultTaskAiModel) Delete(ctx context.Context, id int64) error { + query := fmt.Sprintf("delete from %s where `id` = ?", m.table) + _, err := m.conn.ExecCtx(ctx, query, id) + return err +} + +func (m *defaultTaskAiModel) FindOne(ctx context.Context, id int64) (*TaskAi, error) { + query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", taskAiRows, m.table) + var resp TaskAi + err := m.conn.QueryRowCtx(ctx, &resp, query, id) + switch err { + case nil: + return &resp, nil + case sqlc.ErrNotFound: + return nil, ErrNotFound + default: + return nil, err + } +} + +func (m *defaultTaskAiModel) Insert(ctx context.Context, data *TaskAi) (sql.Result, error) { + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.CTaskId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime) + return ret, err +} + +func (m *defaultTaskAiModel) Update(ctx context.Context, data *TaskAi) error { + query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskAiRowsWithPlaceHolder) + _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.CTaskId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.Id) + return err +} + +func (m *defaultTaskAiModel) tableName() string { + return m.table +} From 50083890bd463ac3c2ff8376b9cd10d6582578ab Mon Sep 17 00:00:00 2001 From: tzwang Date: Mon, 29 Apr 2024 16:39:56 +0800 Subject: [PATCH 6/8] updated aitask model Former-commit-id: 390f37794e551c366df0107ba11c6f7e3a50d002 --- pkg/models/taskaimodel_gen.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/models/taskaimodel_gen.go b/pkg/models/taskaimodel_gen.go index 026eea1d..b8a9e627 100644 --- a/pkg/models/taskaimodel_gen.go +++ b/pkg/models/taskaimodel_gen.go @@ -36,19 +36,19 @@ type ( } TaskAi struct { - Id int64 `db:"id"` // id - TaskId int64 `db:"task_id"` // 任务id - AdapterId int64 `db:"adapter_id"` // 设配器id - ClusterId int64 `db:"cluster_id"` // 集群id - Name string `db:"name"` // 任务名 - Replica int64 `db:"replica"` // 执行数 - CTaskId string `db:"c_task_id"` // 集群返回任务id - Strategy string `db:"strategy"` // 主任务使用策略 - Status string `db:"status"` // 任务状态 - Msg sql.NullString `db:"msg"` // 集群返回任务信息 - CommitTime time.Time `db:"commit_time"` // 提交时间 - StartTime time.Time `db:"start_time"` // 开始时间 - EndTime time.Time `db:"end_time"` // 结束时间 + Id int64 `db:"id"` // id + TaskId int64 `db:"task_id"` // 任务id + AdapterId int64 `db:"adapter_id"` // 设配器id + ClusterId int64 `db:"cluster_id"` // 集群id + Name string `db:"name"` // 任务名 + Replica int64 `db:"replica"` // 执行数 + CTaskId string `db:"c_task_id"` // 集群返回任务id + Strategy string `db:"strategy"` // 主任务使用策略 + Status string `db:"status"` // 任务状态 + Msg string `db:"msg"` // 集群返回任务信息 + CommitTime time.Time `db:"commit_time"` // 提交时间 + StartTime string `db:"start_time"` // 开始时间 + EndTime string `db:"end_time"` // 结束时间 } ) From 239ca4fc1798046be2dc5b757f2518dc83cc3dcf Mon Sep 17 00:00:00 2001 From: tzwang Date: Mon, 29 Apr 2024 17:45:37 +0800 Subject: [PATCH 7/8] updated aitask model Former-commit-id: 2092d2b73533dbcb11437f5b6502f90c1119dece --- pkg/models/taskaimodel_gen.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/models/taskaimodel_gen.go b/pkg/models/taskaimodel_gen.go index b8a9e627..ab0c5502 100644 --- a/pkg/models/taskaimodel_gen.go +++ b/pkg/models/taskaimodel_gen.go @@ -42,7 +42,7 @@ type ( ClusterId int64 `db:"cluster_id"` // 集群id Name string `db:"name"` // 任务名 Replica int64 `db:"replica"` // 执行数 - CTaskId string `db:"c_task_id"` // 集群返回任务id + JobId string `db:"job_id"` // 集群返回任务id Strategy string `db:"strategy"` // 主任务使用策略 Status string `db:"status"` // 任务状态 Msg string `db:"msg"` // 集群返回任务信息 @@ -88,13 +88,13 @@ func (m *defaultTaskAiModel) FindOne(ctx context.Context, id int64) (*TaskAi, er func (m *defaultTaskAiModel) Insert(ctx context.Context, data *TaskAi) (sql.Result, error) { query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.CTaskId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime) + ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime) return ret, err } func (m *defaultTaskAiModel) Update(ctx context.Context, data *TaskAi) error { query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskAiRowsWithPlaceHolder) - _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.CTaskId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.Id) + _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.Id) return err } From cee3ae17bdcac24a903324a8d0dcd9c19708ede1 Mon Sep 17 00:00:00 2001 From: tzwang Date: Mon, 29 Apr 2024 17:51:25 +0800 Subject: [PATCH 8/8] updated schedule submit func Former-commit-id: 8bfeec069c549374b47fa80c0dd4b2293937b04a --- .../schedule/schedulegetaijoblogloglogic.go | 6 ++- .../logic/schedule/schedulesubmitlogic.go | 14 +++-- api/internal/scheduler/database/aiStorage.go | 54 ++++++++++++++++++- .../scheduler/schedulers/aiScheduler.go | 39 +++++++++----- .../scheduler/schedulers/option/aiOption.go | 1 + 5 files changed, 96 insertions(+), 18 deletions(-) diff --git a/api/internal/logic/schedule/schedulegetaijoblogloglogic.go b/api/internal/logic/schedule/schedulegetaijoblogloglogic.go index da5a0c7a..e0f304de 100644 --- a/api/internal/logic/schedule/schedulegetaijoblogloglogic.go +++ b/api/internal/logic/schedule/schedulegetaijoblogloglogic.go @@ -26,7 +26,11 @@ func NewScheduleGetAiJobLogLogLogic(ctx context.Context, svcCtx *svc.ServiceCont func (l *ScheduleGetAiJobLogLogLogic) ScheduleGetAiJobLogLog(req *types.AiJobLogReq) (resp *types.AiJobLogResp, err error) { resp = &types.AiJobLogResp{} - log, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][req.ClusterId].GetTrainingTaskLog(l.ctx, req.TaskId, req.InstanceNum) + id, err := l.svcCtx.Scheduler.AiStorages.GetAiTaskIdByClusterIdAndTaskId(req.ClusterId, req.TaskId) + if err != nil { + return nil, err + } + log, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][req.ClusterId].GetTrainingTaskLog(l.ctx, id, req.InstanceNum) if err != nil { return nil, err } diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index 2b9956d1..183699f2 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -6,6 +6,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "github.com/zeromicro/go-zero/core/logx" ) @@ -51,6 +52,10 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type switch opt.GetOptionType() { case option.AI: + id, err := l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName) + if err != nil { + return nil, err + } rs := (results).([]*schedulers.AiResult) for _, r := range rs { scheResult := &types.ScheduleResult{} @@ -59,12 +64,13 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type scheResult.Strategy = r.Strategy scheResult.Replica = r.Replica scheResult.Msg = r.Msg + err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, r.TaskId, constants.Running, r.Msg) + if err != nil { + return nil, err + } resp.Results = append(resp.Results, scheResult) } - err = l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName) - if err != nil { - return nil, err - } + } return resp, nil diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index 2cf648aa..8efb98b8 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -2,10 +2,12 @@ package database import ( "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gorm.io/gorm" + "strconv" "time" ) @@ -48,7 +50,17 @@ func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) { return ids, nil } -func (s *AiStorage) SaveTask(name string) error { +func (s *AiStorage) GetAiTasks() ([]*types.AiTaskDb, error) { + var resp []*types.AiTaskDb + tx := s.DbEngin.Raw("select * from task_ai").Scan(&resp) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return nil, tx.Error + } + return resp, nil +} + +func (s *AiStorage) SaveTask(name string) (int64, error) { // 构建主任务结构体 taskModel := models.Task{ Status: constants.Saved, @@ -58,12 +70,52 @@ func (s *AiStorage) SaveTask(name string) error { } // 保存任务数据到数据库 tx := s.DbEngin.Create(&taskModel) + if tx.Error != nil { + return 0, tx.Error + } + return taskModel.Id, nil +} + +func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId string, jobId string, status string, msg string) error { + // 构建主任务结构体 + aId, err := strconv.ParseInt(option.AdapterId, 10, 64) + if err != nil { + return err + } + cId, err := strconv.ParseInt(clusterId, 10, 64) + if err != nil { + return err + } + aiTaskModel := models.TaskAi{ + TaskId: taskId, + AdapterId: aId, + ClusterId: cId, + Name: option.TaskName, + Replica: option.Replica, + JobId: jobId, + Strategy: option.StrategyName, + Status: status, + Msg: msg, + CommitTime: time.Now(), + } + // 保存任务数据到数据库 + tx := s.DbEngin.Create(&aiTaskModel) if tx.Error != nil { return tx.Error } return nil } +func (s *AiStorage) GetAiTaskIdByClusterIdAndTaskId(clusterId string, taskId string) (string, error) { + var aiTask models.TaskAi + tx := s.DbEngin.Raw("select * from task_ai where `cluster_id` = ? and `task_id` = ?", clusterId, taskId).Scan(&aiTask) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return "", tx.Error + } + return aiTask.JobId, nil +} + func (s *AiStorage) UpdateTask() error { return nil } diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index a3e3e366..af50d201 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -26,6 +26,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-octopus/octopus" @@ -168,32 +169,46 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa errs = append(errs, e) } - if len(errs) == len(clusters) { - return nil, errors.New("submit task failed") + for s := range ch { + results = append(results, s) } if len(errs) != 0 { - var msg string + taskId, err := as.AiStorages.SaveTask(as.option.TaskName) + if err != nil { + return nil, err + } + var errmsg string for _, err := range errs { e := (err).(struct { err error clusterId string }) - msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error()) + msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error()) + errmsg += msg + err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, "", constants.Failed, msg) + if err != nil { + return nil, err + } } for s := range ch { if s.Msg != "" { - msg += fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg) + msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg) + errmsg += msg + err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, "", constants.Failed, msg) + if err != nil { + return nil, err + } } else { - msg += fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId) + msg := fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId) + errmsg += msg + err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, s.TaskId, constants.Succeeded, msg) + if err != nil { + return nil, err + } } } - return nil, errors.New(msg) - } - - for s := range ch { - // TODO: database operation - results = append(results, s) + return nil, errors.New(errmsg) } return results, nil diff --git a/api/internal/scheduler/schedulers/option/aiOption.go b/api/internal/scheduler/schedulers/option/aiOption.go index f8a6495f..d2f8d3eb 100644 --- a/api/internal/scheduler/schedulers/option/aiOption.go +++ b/api/internal/scheduler/schedulers/option/aiOption.go @@ -4,6 +4,7 @@ type AiOption struct { AdapterId string ClusterIds []string TaskName string + Replica int64 ResourceType string // cpu/gpu/compute card CpuCoreNum int64 TaskType string // pytorch/tensorflow/mindspore