Merge pull request 'updated adapterid to schedule apis' (#115) from tzwang/pcm-coordinator:master into master
Former-commit-id: 4c9cce1bec2a7c28fcc3ffaddf889fe1211095fe
This commit is contained in:
commit
9766e68075
|
@ -906,13 +906,13 @@ service pcm {
|
||||||
get /schedule/ai/getTaskTypes returns (AiTaskTypesResp)
|
get /schedule/ai/getTaskTypes returns (AiTaskTypesResp)
|
||||||
|
|
||||||
@handler ScheduleGetDatasetsHandler
|
@handler ScheduleGetDatasetsHandler
|
||||||
get /schedule/ai/getDatasets returns (AiDatasetsResp)
|
get /schedule/ai/getDatasets/:adapterId (AiDatasetsReq) returns (AiDatasetsResp)
|
||||||
|
|
||||||
@handler ScheduleGetStrategyHandler
|
@handler ScheduleGetStrategyHandler
|
||||||
get /schedule/ai/getStrategies returns (AiStrategyResp)
|
get /schedule/ai/getStrategies returns (AiStrategyResp)
|
||||||
|
|
||||||
@handler ScheduleGetAlgorithmsHandler
|
@handler ScheduleGetAlgorithmsHandler
|
||||||
get /schedule/ai/getAlgorithms/:resourceType/:taskType/:dataset (AiAlgorithmsReq) returns (AiAlgorithmsResp)
|
get /schedule/ai/getAlgorithms/:adapterId/:resourceType/:taskType/:dataset (AiAlgorithmsReq) returns (AiAlgorithmsResp)
|
||||||
|
|
||||||
@handler ScheduleSubmitHandler
|
@handler ScheduleSubmitHandler
|
||||||
post /schedule/submit (ScheduleReq) returns (ScheduleResp)
|
post /schedule/submit (ScheduleReq) returns (ScheduleResp)
|
||||||
|
|
|
@ -26,7 +26,8 @@ type (
|
||||||
|
|
||||||
AiOption {
|
AiOption {
|
||||||
TaskName string `json:"taskName"`
|
TaskName string `json:"taskName"`
|
||||||
AiClusterId string `json:"aiClusterId,optional"`
|
AdapterId string `json:"adapterId"`
|
||||||
|
AiClusterIds []string `json:"aiClusterIds"`
|
||||||
ResourceType string `json:"resourceType"`
|
ResourceType string `json:"resourceType"`
|
||||||
Tops float64 `json:"Tops,optional"`
|
Tops float64 `json:"Tops,optional"`
|
||||||
TaskType string `json:"taskType"`
|
TaskType string `json:"taskType"`
|
||||||
|
@ -47,6 +48,10 @@ type (
|
||||||
TaskTypes []string `json:"taskTypes"`
|
TaskTypes []string `json:"taskTypes"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AiDatasetsReq {
|
||||||
|
AdapterId string `path:"adapterId"`
|
||||||
|
}
|
||||||
|
|
||||||
AiDatasetsResp {
|
AiDatasetsResp {
|
||||||
Datasets []string `json:"datasets"`
|
Datasets []string `json:"datasets"`
|
||||||
}
|
}
|
||||||
|
@ -56,6 +61,7 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
AiAlgorithmsReq {
|
AiAlgorithmsReq {
|
||||||
|
AdapterId string `path:"adapterId"`
|
||||||
ResourceType string `path:"resourceType"`
|
ResourceType string `path:"resourceType"`
|
||||||
TaskType string `path:"taskType"`
|
TaskType string `path:"taskType"`
|
||||||
Dataset string `path:"dataset"`
|
Dataset string `path:"dataset"`
|
||||||
|
|
|
@ -1122,7 +1122,7 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Method: http.MethodGet,
|
Method: http.MethodGet,
|
||||||
Path: "/schedule/ai/getDatasets",
|
Path: "/schedule/ai/getDatasets/:adapterId",
|
||||||
Handler: schedule.ScheduleGetDatasetsHandler(serverCtx),
|
Handler: schedule.ScheduleGetDatasetsHandler(serverCtx),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -1132,7 +1132,7 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Method: http.MethodGet,
|
Method: http.MethodGet,
|
||||||
Path: "/schedule/ai/getAlgorithms/:resourceType/:taskType/:dataset",
|
Path: "/schedule/ai/getAlgorithms/:adapterId/:resourceType/:taskType/:dataset",
|
||||||
Handler: schedule.ScheduleGetAlgorithmsHandler(serverCtx),
|
Handler: schedule.ScheduleGetAlgorithmsHandler(serverCtx),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
|
@ -1,16 +1,24 @@
|
||||||
package schedule
|
package schedule
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/zeromicro/go-zero/rest/httpx"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/schedule"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/schedule"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||||
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
|
||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ScheduleGetDatasetsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
|
func ScheduleGetDatasetsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var req types.AiDatasetsReq
|
||||||
|
if err := httpx.Parse(r, &req); err != nil {
|
||||||
|
result.ParamErrorResult(r, w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
l := schedule.NewScheduleGetDatasetsLogic(r.Context(), svcCtx)
|
l := schedule.NewScheduleGetDatasetsLogic(r.Context(), svcCtx)
|
||||||
resp, err := l.ScheduleGetDatasets()
|
resp, err := l.ScheduleGetDatasets(&req)
|
||||||
result.HttpResult(r, w, resp, err)
|
result.HttpResult(r, w, resp, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ func NewScheduleGetAlgorithmsLogic(ctx context.Context, svcCtx *svc.ServiceConte
|
||||||
|
|
||||||
func (l *ScheduleGetAlgorithmsLogic) ScheduleGetAlgorithms(req *types.AiAlgorithmsReq) (resp *types.AiAlgorithmsResp, err error) {
|
func (l *ScheduleGetAlgorithmsLogic) ScheduleGetAlgorithms(req *types.AiAlgorithmsReq) (resp *types.AiAlgorithmsResp, err error) {
|
||||||
resp = &types.AiAlgorithmsResp{}
|
resp = &types.AiAlgorithmsResp{}
|
||||||
algorithms, err := storeLink.GetAlgorithms(l.ctx, l.svcCtx.Scheduler.ResourceCollector, req.ResourceType, req.TaskType, req.Dataset)
|
algorithms, err := storeLink.GetAlgorithms(l.ctx, l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId], req.ResourceType, req.TaskType, req.Dataset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package schedule
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
|
||||||
|
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||||
|
|
||||||
|
@ -23,9 +24,9 @@ func NewScheduleGetDatasetsLogic(ctx context.Context, svcCtx *svc.ServiceContext
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *ScheduleGetDatasetsLogic) ScheduleGetDatasets() (resp *types.AiDatasetsResp, err error) {
|
func (l *ScheduleGetDatasetsLogic) ScheduleGetDatasets(req *types.AiDatasetsReq) (resp *types.AiDatasetsResp, err error) {
|
||||||
resp = &types.AiDatasetsResp{}
|
resp = &types.AiDatasetsResp{}
|
||||||
names, err := storeLink.GetDatasetsNames(l.ctx, l.svcCtx.Scheduler.ResourceCollector)
|
names, err := storeLink.GetDatasetsNames(l.ctx, l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ func NewScheduleSubmitLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Sc
|
||||||
func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) {
|
func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) {
|
||||||
resp = &types.ScheduleResp{}
|
resp = &types.ScheduleResp{}
|
||||||
opt := &option.AiOption{
|
opt := &option.AiOption{
|
||||||
|
AdapterId: req.AiOption.AdapterId,
|
||||||
ResourceType: req.AiOption.ResourceType,
|
ResourceType: req.AiOption.ResourceType,
|
||||||
Tops: req.AiOption.Tops,
|
Tops: req.AiOption.Tops,
|
||||||
TaskType: req.AiOption.TaskType,
|
TaskType: req.AiOption.TaskType,
|
||||||
|
|
|
@ -33,6 +33,21 @@ func (s *AiStorage) GetClustersByAdapterId(id string) (*types.ClusterListResp, e
|
||||||
return &resp, nil
|
return &resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) {
|
||||||
|
var list []types.AdapterInfo
|
||||||
|
var ids []string
|
||||||
|
db := s.DbEngin.Model(&types.AdapterInfo{}).Table("t_adapter")
|
||||||
|
db = db.Where("type = ?", adapterType)
|
||||||
|
err := db.Order("create_time desc").Find(&list).Error
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, info := range list {
|
||||||
|
ids = append(ids, info.Id)
|
||||||
|
}
|
||||||
|
return ids, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *AiStorage) SaveTask(name string) error {
|
func (s *AiStorage) SaveTask(name string) error {
|
||||||
// 构建主任务结构体
|
// 构建主任务结构体
|
||||||
taskModel := models.Task{
|
taskModel := models.Task{
|
||||||
|
|
|
@ -20,8 +20,7 @@ import (
|
||||||
"github.com/zeromicro/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor"
|
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice"
|
||||||
|
@ -32,16 +31,15 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Scheduler struct {
|
type Scheduler struct {
|
||||||
task *response.TaskInfo
|
task *response.TaskInfo
|
||||||
participantIds []int64
|
participantIds []int64
|
||||||
subSchedule SubSchedule
|
subSchedule SubSchedule
|
||||||
dbEngin *gorm.DB
|
dbEngin *gorm.DB
|
||||||
result []string //pID:子任务yamlstring 键值对
|
result []string //pID:子任务yamlstring 键值对
|
||||||
participantRpc participantservice.ParticipantService
|
participantRpc participantservice.ParticipantService
|
||||||
ResourceCollector *map[string]collector.AiCollector
|
AiStorages *database.AiStorage
|
||||||
AiStorages *database.AiStorage
|
AiService *service.AiService
|
||||||
AiExecutor *map[string]executor.AiExecutor
|
mu sync.RWMutex
|
||||||
mu sync.RWMutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type SubSchedule interface {
|
type SubSchedule interface {
|
||||||
|
@ -59,8 +57,8 @@ func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, partici
|
||||||
return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil
|
return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSchdlr(resourceCollector *map[string]collector.AiCollector, storages *database.AiStorage, aiExecutor *map[string]executor.AiExecutor) *Scheduler {
|
func NewSchdlr(aiService *service.AiService, storages *database.AiStorage) *Scheduler {
|
||||||
return &Scheduler{ResourceCollector: resourceCollector, AiStorages: storages, AiExecutor: aiExecutor}
|
return &Scheduler{AiService: aiService, AiStorages: storages}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scheduler) SpecifyClusters() {
|
func (s *Scheduler) SpecifyClusters() {
|
||||||
|
|
|
@ -64,9 +64,8 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin
|
||||||
}
|
}
|
||||||
|
|
||||||
func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
|
func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
|
||||||
if as.option.AiClusterId != "" {
|
if len(as.option.ClusterIds) == 1 {
|
||||||
// TODO database operation Find
|
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
|
||||||
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: "", Replicas: 1}}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
resources, err := as.findClustersWithResources()
|
resources, err := as.findClustersWithResources()
|
||||||
|
@ -131,7 +130,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
|
||||||
var ch = make(chan *AiResult, len(clusters))
|
var ch = make(chan *AiResult, len(clusters))
|
||||||
var errCh = make(chan interface{}, len(clusters))
|
var errCh = make(chan interface{}, len(clusters))
|
||||||
|
|
||||||
executorMap := *as.AiExecutor
|
executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]
|
||||||
for _, cluster := range clusters {
|
for _, cluster := range clusters {
|
||||||
c := cluster
|
c := cluster
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
@ -202,13 +201,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
|
||||||
|
|
||||||
func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
|
func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var ch = make(chan *collector.ResourceStats, len(*as.ResourceCollector))
|
var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId])
|
||||||
var errCh = make(chan interface{}, len(*as.ResourceCollector))
|
var ch = make(chan *collector.ResourceStats, clustersNum)
|
||||||
|
var errCh = make(chan interface{}, clustersNum)
|
||||||
|
|
||||||
var resourceSpecs []*collector.ResourceStats
|
var resourceSpecs []*collector.ResourceStats
|
||||||
var errs []interface{}
|
var errs []interface{}
|
||||||
|
|
||||||
for s, resourceCollector := range *as.ResourceCollector {
|
for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
rc := resourceCollector
|
rc := resourceCollector
|
||||||
id := s
|
id := s
|
||||||
|
@ -242,7 +242,7 @@ func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats,
|
||||||
errs = append(errs, e)
|
errs = append(errs, e)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(errs) == len(*as.ResourceCollector) {
|
if len(errs) == clustersNum {
|
||||||
return nil, errors.New("get resources failed")
|
return nil, errors.New("get resources failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
package option
|
package option
|
||||||
|
|
||||||
type AiOption struct {
|
type AiOption struct {
|
||||||
AiClusterId string // shuguangAi /octopus ClusterId
|
AdapterId string
|
||||||
|
ClusterIds []string
|
||||||
TaskName string
|
TaskName string
|
||||||
ResourceType string // cpu/gpu/compute card
|
ResourceType string // cpu/gpu/compute card
|
||||||
CpuCoreNum int64
|
CpuCoreNum int64
|
||||||
|
|
|
@ -1,11 +1,14 @@
|
||||||
package service
|
package service
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/zeromicro/go-zero/zrpc"
|
||||||
"gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
|
"gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
|
||||||
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
|
||||||
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||||
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
|
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
|
||||||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
|
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
|
||||||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
|
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
|
||||||
|
@ -18,30 +21,60 @@ const (
|
||||||
SHUGUANGAI = "shuguangAi"
|
SHUGUANGAI = "shuguangAi"
|
||||||
)
|
)
|
||||||
|
|
||||||
func InitAiClusterMap(octopusRpc octopusclient.Octopus, modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, aCRpc hpcacclient.HpcAC, storages *database.AiStorage) (*map[string]executor.AiExecutor, *map[string]collector.AiCollector) {
|
type AiService struct {
|
||||||
clusters, _ := storages.GetClustersByAdapterId("1777144940459986944")
|
AiExecutorAdapterMap map[string]map[string]executor.AiExecutor
|
||||||
|
AiCollectorAdapterMap map[string]map[string]collector.AiCollector
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAiService(conf *config.Config, storages *database.AiStorage) (*AiService, error) {
|
||||||
|
var aiType = "1"
|
||||||
|
adapterIds, err := storages.GetAdapterIdsByType(aiType)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
aiService := &AiService{
|
||||||
|
AiExecutorAdapterMap: make(map[string]map[string]executor.AiExecutor),
|
||||||
|
AiCollectorAdapterMap: make(map[string]map[string]collector.AiCollector),
|
||||||
|
}
|
||||||
|
for _, id := range adapterIds {
|
||||||
|
clusters, err := storages.GetClustersByAdapterId(id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
exeClusterMap, colClusterMap := InitAiClusterMap(conf, clusters.List)
|
||||||
|
aiService.AiExecutorAdapterMap[id] = exeClusterMap
|
||||||
|
aiService.AiCollectorAdapterMap[id] = colClusterMap
|
||||||
|
}
|
||||||
|
|
||||||
|
return aiService, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) {
|
||||||
executorMap := make(map[string]executor.AiExecutor)
|
executorMap := make(map[string]executor.AiExecutor)
|
||||||
collectorMap := make(map[string]collector.AiCollector)
|
collectorMap := make(map[string]collector.AiCollector)
|
||||||
for _, c := range clusters.List {
|
for _, c := range clusters {
|
||||||
switch c.Name {
|
switch c.Name {
|
||||||
case OCTOPUS:
|
case OCTOPUS:
|
||||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||||
|
octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf))
|
||||||
octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
|
octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
|
||||||
collectorMap[c.Id] = octopus
|
collectorMap[c.Id] = octopus
|
||||||
executorMap[c.Id] = octopus
|
executorMap[c.Id] = octopus
|
||||||
case MODELARTS:
|
case MODELARTS:
|
||||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||||
|
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
|
||||||
|
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
|
||||||
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Nickname, id)
|
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Nickname, id)
|
||||||
collectorMap[c.Id] = modelarts
|
collectorMap[c.Id] = modelarts
|
||||||
executorMap[c.Id] = modelarts
|
executorMap[c.Id] = modelarts
|
||||||
case SHUGUANGAI:
|
case SHUGUANGAI:
|
||||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||||
|
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf))
|
||||||
sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
|
sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
|
||||||
collectorMap[c.Id] = sgai
|
collectorMap[c.Id] = sgai
|
||||||
executorMap[c.Id] = sgai
|
executorMap[c.Id] = sgai
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &executorMap, &collectorMap
|
return executorMap, collectorMap
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,13 +128,13 @@ func GetResourceTypes() []string {
|
||||||
return resourceTypes
|
return resourceTypes
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.AiCollector) ([]string, error) {
|
func GetDatasetsNames(ctx context.Context, collectorMap map[string]collector.AiCollector) ([]string, error) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var errCh = make(chan interface{}, len(*collectorMap))
|
var errCh = make(chan interface{}, len(collectorMap))
|
||||||
var errs []interface{}
|
var errs []interface{}
|
||||||
var names []string
|
var names []string
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
colMap := *collectorMap
|
colMap := collectorMap
|
||||||
for s, col := range colMap {
|
for s, col := range colMap {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
c := col
|
c := col
|
||||||
|
@ -200,14 +200,14 @@ func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.Ai
|
||||||
return names, nil
|
return names, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetAlgorithms(ctx context.Context, collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) {
|
func GetAlgorithms(ctx context.Context, collectorMap map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) {
|
||||||
var names []string
|
var names []string
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var errCh = make(chan interface{}, len(*collectorMap))
|
var errCh = make(chan interface{}, len(collectorMap))
|
||||||
var errs []interface{}
|
var errs []interface{}
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
|
|
||||||
colMap := *collectorMap
|
colMap := collectorMap
|
||||||
for s, col := range colMap {
|
for s, col := range colMap {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
c := col
|
c := col
|
||||||
|
|
|
@ -116,24 +116,28 @@ func NewServiceContext(c config.Config) *ServiceContext {
|
||||||
})
|
})
|
||||||
|
|
||||||
// scheduler
|
// scheduler
|
||||||
octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf))
|
//octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf))
|
||||||
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf))
|
//aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf))
|
||||||
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf))
|
//modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf))
|
||||||
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf))
|
//modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf))
|
||||||
storage := &database.AiStorage{DbEngin: dbEngin}
|
storage := &database.AiStorage{DbEngin: dbEngin}
|
||||||
aiExecutor, resourceCollector := service.InitAiClusterMap(octopusRpc, modelArtsRpc, modelArtsImgRpc, aCRpc, storage)
|
aiService, err := service.NewAiService(&c, storage)
|
||||||
scheduler := scheduler.NewSchdlr(resourceCollector, storage, aiExecutor)
|
if err != nil {
|
||||||
|
logx.Error(err.Error())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
scheduler := scheduler.NewSchdlr(aiService, storage)
|
||||||
|
|
||||||
return &ServiceContext{
|
return &ServiceContext{
|
||||||
Cron: cron.New(cron.WithSeconds()),
|
Cron: cron.New(cron.WithSeconds()),
|
||||||
DbEngin: dbEngin,
|
DbEngin: dbEngin,
|
||||||
Config: c,
|
Config: c,
|
||||||
RedisClient: redisClient,
|
RedisClient: redisClient,
|
||||||
ModelArtsRpc: modelArtsRpc,
|
ModelArtsRpc: modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf)),
|
||||||
ModelArtsImgRpc: modelArtsImgRpc,
|
ModelArtsImgRpc: imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf)),
|
||||||
CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)),
|
CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)),
|
||||||
ACRpc: aCRpc,
|
ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)),
|
||||||
OctopusRpc: octopusRpc,
|
OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)),
|
||||||
OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)),
|
OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)),
|
||||||
K8sRpc: kubernetesclient.NewKubernetes(zrpc.MustNewClient(c.K8sNativeConf)),
|
K8sRpc: kubernetesclient.NewKubernetes(zrpc.MustNewClient(c.K8sNativeConf)),
|
||||||
MonitorClient: make(map[int64]tracker.Prometheus),
|
MonitorClient: make(map[int64]tracker.Prometheus),
|
||||||
|
|
|
@ -5278,7 +5278,8 @@ type ScheduleResult struct {
|
||||||
|
|
||||||
type AiOption struct {
|
type AiOption struct {
|
||||||
TaskName string `json:"taskName"`
|
TaskName string `json:"taskName"`
|
||||||
AiClusterId string `json:"aiClusterId,optional"`
|
AdapterId string `json:"adapterId"`
|
||||||
|
AiClusterIds []string `json:"aiClusterIds"`
|
||||||
ResourceType string `json:"resourceType"`
|
ResourceType string `json:"resourceType"`
|
||||||
Tops float64 `json:"Tops,optional"`
|
Tops float64 `json:"Tops,optional"`
|
||||||
TaskType string `json:"taskType"`
|
TaskType string `json:"taskType"`
|
||||||
|
@ -5299,6 +5300,10 @@ type AiTaskTypesResp struct {
|
||||||
TaskTypes []string `json:"taskTypes"`
|
TaskTypes []string `json:"taskTypes"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type AiDatasetsReq struct {
|
||||||
|
AdapterId string `path:"adapterId"`
|
||||||
|
}
|
||||||
|
|
||||||
type AiDatasetsResp struct {
|
type AiDatasetsResp struct {
|
||||||
Datasets []string `json:"datasets"`
|
Datasets []string `json:"datasets"`
|
||||||
}
|
}
|
||||||
|
@ -5308,6 +5313,7 @@ type AiStrategyResp struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type AiAlgorithmsReq struct {
|
type AiAlgorithmsReq struct {
|
||||||
|
AdapterId string `path:"adapterId"`
|
||||||
ResourceType string `path:"resourceType"`
|
ResourceType string `path:"resourceType"`
|
||||||
TaskType string `path:"taskType"`
|
TaskType string `path:"taskType"`
|
||||||
Dataset string `path:"dataset"`
|
Dataset string `path:"dataset"`
|
||||||
|
@ -5317,6 +5323,156 @@ type AiAlgorithmsResp struct {
|
||||||
Algorithms []string `json:"algorithms"`
|
Algorithms []string `json:"algorithms"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PullTaskInfoReq struct {
|
||||||
|
AdapterId int64 `form:"adapterId"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PullTaskInfoResp struct {
|
||||||
|
HpcInfoList []*HpcInfo `json:"HpcInfoList,omitempty"`
|
||||||
|
CloudInfoList []*CloudInfo `json:"CloudInfoList,omitempty"`
|
||||||
|
AiInfoList []*AiInfo `json:"AiInfoList,omitempty"`
|
||||||
|
VmInfoList []*VmInfo `json:"VmInfoList,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type HpcInfo struct {
|
||||||
|
Id int64 `json:"id"` // id
|
||||||
|
TaskId int64 `json:"task_id"` // 任务id
|
||||||
|
JobId string `json:"job_id"` // 作业id(在第三方系统中的作业id)
|
||||||
|
AdapterId int64 `json:"adapter_id"` // 执行任务的适配器id
|
||||||
|
ClusterId int64 `json:"cluster_id"` // 执行任务的集群id
|
||||||
|
ClusterType string `json:"cluster_type"` // 执行任务的集群类型
|
||||||
|
Name string `json:"name"` // 名称
|
||||||
|
Status string `json:"status"` // 状态
|
||||||
|
CmdScript string `json:"cmd_script"`
|
||||||
|
StartTime string `json:"start_time"` // 开始时间
|
||||||
|
RunningTime int64 `json:"running_time"` // 运行时间
|
||||||
|
DerivedEs string `json:"derived_es"`
|
||||||
|
Cluster string `json:"cluster"`
|
||||||
|
BlockId int64 `json:"block_id"`
|
||||||
|
AllocNodes int64 `json:"alloc_nodes"`
|
||||||
|
AllocCpu int64 `json:"alloc_cpu"`
|
||||||
|
CardCount int64 `json:"card_count"` // 卡数
|
||||||
|
Version string `json:"version"`
|
||||||
|
Account string `json:"account"`
|
||||||
|
WorkDir string `json:"work_dir"` // 工作路径
|
||||||
|
AssocId int64 `json:"assoc_id"`
|
||||||
|
ExitCode int64 `json:"exit_code"`
|
||||||
|
WallTime string `json:"wall_time"` // 最大运行时间
|
||||||
|
Result string `json:"result"` // 运行结果
|
||||||
|
DeletedAt string `json:"deleted_at"` // 删除时间
|
||||||
|
YamlString string `json:"yaml_string"`
|
||||||
|
AppType string `json:"app_type"` // 应用类型
|
||||||
|
AppName string `json:"app_name"` // 应用名称
|
||||||
|
Queue string `json:"queue"` // 队列名称
|
||||||
|
SubmitType string `json:"submit_type"` // cmd(命令行模式)
|
||||||
|
NNode string `json:"n_node"` // 节点个数(当指定该参数时,GAP_NODE_STRING必须为"")
|
||||||
|
StdOutFile string `json:"std_out_file"` // 工作路径/std.err.%j
|
||||||
|
StdErrFile string `json:"std_err_file"` // 工作路径/std.err.%j
|
||||||
|
StdInput string `json:"std_input"`
|
||||||
|
Environment string `json:"environment"`
|
||||||
|
DeletedFlag int64 `json:"deleted_flag"` // 是否删除(0-否,1-是)
|
||||||
|
CreatedBy int64 `json:"created_by"` // 创建人
|
||||||
|
CreatedTime string `json:"created_time"` // 创建时间
|
||||||
|
UpdatedBy int64 `json:"updated_by"` // 更新人
|
||||||
|
UpdatedTime string `json:"updated_time"` // 更新时间
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloudInfo struct {
|
||||||
|
Participant int64 `json:"participant,omitempty"`
|
||||||
|
Id int64 `json:"id,omitempty"`
|
||||||
|
TaskId int64 `json:"taskId,omitempty"`
|
||||||
|
ApiVersion string `json:"apiVersion,omitempty"`
|
||||||
|
Kind string `json:"kind,omitempty"`
|
||||||
|
Namespace string `json:"namespace,omitempty"`
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
Status string `json:"status,omitempty"`
|
||||||
|
StartTime string `json:"startTime,omitempty"`
|
||||||
|
RunningTime int64 `json:"runningTime,omitempty"`
|
||||||
|
Result string `json:"result,omitempty"`
|
||||||
|
YamlString string `json:"yamlString,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type AiInfo struct {
|
||||||
|
ParticipantId int64 `json:"participantId,omitempty"`
|
||||||
|
TaskId int64 `json:"taskId,omitempty"`
|
||||||
|
ProjectId string `json:"project_id,omitempty"`
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
Status string `json:"status,omitempty"`
|
||||||
|
StartTime string `json:"startTime,omitempty"`
|
||||||
|
RunningTime int64 `json:"runningTime,omitempty"`
|
||||||
|
Result string `json:"result,omitempty"`
|
||||||
|
JobId string `json:"jobId,omitempty"`
|
||||||
|
CreateTime string `json:"createTime,omitempty"`
|
||||||
|
ImageUrl string `json:"imageUrl,omitempty"`
|
||||||
|
Command string `json:"command,omitempty"`
|
||||||
|
FlavorId string `json:"flavorId,omitempty"`
|
||||||
|
SubscriptionId string `json:"subscriptionId,omitempty"`
|
||||||
|
ItemVersionId string `json:"itemVersionId,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type VmInfo struct {
|
||||||
|
ParticipantId int64 `json:"participantId,omitempty"`
|
||||||
|
TaskId int64 `json:"taskId,omitempty"`
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
FlavorRef string `json:"flavor_ref,omitempty"`
|
||||||
|
ImageRef string `json:"image_ref,omitempty"`
|
||||||
|
NetworkUuid string `json:"network_uuid,omitempty"`
|
||||||
|
BlockUuid string `json:"block_uuid,omitempty"`
|
||||||
|
SourceType string `json:"source_type,omitempty"`
|
||||||
|
DeleteOnTermination bool `json:"delete_on_termination,omitempty"`
|
||||||
|
Status string `json:"status,omitempty"`
|
||||||
|
MinCount string `json:"min_count,omitempty"`
|
||||||
|
Platform string `json:"platform,omitempty"`
|
||||||
|
Uuid string `json:"uuid,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PushTaskInfoReq struct {
|
||||||
|
AdapterId int64 `json:"adapterId"`
|
||||||
|
HpcInfoList []*HpcInfo `json:"hpcInfoList"`
|
||||||
|
CloudInfoList []*CloudInfo `json:"cloudInfoList"`
|
||||||
|
AiInfoList []*AiInfo `json:"aiInfoList"`
|
||||||
|
VmInfoList []*VmInfo `json:"vmInfoList"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PushTaskInfoResp struct {
|
||||||
|
Code int64 `json:"code"`
|
||||||
|
Msg string `json:"msg"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PushResourceInfoReq struct {
|
||||||
|
AdapterId int64 `json:"adapterId"`
|
||||||
|
ResourceStats []ResourceStats `json:"resourceStats"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PushResourceInfoResp struct {
|
||||||
|
Code int64 `json:"code"`
|
||||||
|
Msg string `json:"msg"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ResourceStats struct {
|
||||||
|
ClusterId int64 `json:"clusterId"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
CpuCoreAvail int64 `json:"cpuCoreAvail"`
|
||||||
|
CpuCoreTotal int64 `json:"cpuCoreTotal"`
|
||||||
|
MemAvail float64 `json:"memAvail"`
|
||||||
|
MemTotal float64 `json:"memTotal"`
|
||||||
|
DiskAvail float64 `json:"diskAvail"`
|
||||||
|
DiskTotal float64 `json:"diskTotal"`
|
||||||
|
GpuAvail int64 `json:"gpuAvail"`
|
||||||
|
CardsAvail []*Card `json:"cardsAvail"`
|
||||||
|
CpuCoreHours float64 `json:"cpuCoreHours"`
|
||||||
|
Balance float64 `json:"balance"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Card struct {
|
||||||
|
Platform string `json:"platform"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
TOpsAtFp16 float64 `json:"TOpsAtFp16"`
|
||||||
|
CardHours float64 `json:"cardHours"`
|
||||||
|
CardNum int32 `json:"cardNum"`
|
||||||
|
}
|
||||||
|
|
||||||
type CreateAlertRuleReq struct {
|
type CreateAlertRuleReq struct {
|
||||||
CLusterId int64 `json:"clusterId"`
|
CLusterId int64 `json:"clusterId"`
|
||||||
ClusterName string `json:"clusterName"`
|
ClusterName string `json:"clusterName"`
|
||||||
|
|
Loading…
Reference in New Issue