added adapterId for ai scheduler submit func
Former-commit-id: 94e73c0fd326bf84d26196d97cc70654994ceca2
This commit is contained in:
parent
a440f41fbf
commit
ea7216e153
|
@ -26,7 +26,7 @@ func NewScheduleGetAlgorithmsLogic(ctx context.Context, svcCtx *svc.ServiceConte
|
|||
|
||||
func (l *ScheduleGetAlgorithmsLogic) ScheduleGetAlgorithms(req *types.AiAlgorithmsReq) (resp *types.AiAlgorithmsResp, err error) {
|
||||
resp = &types.AiAlgorithmsResp{}
|
||||
algorithms, err := storeLink.GetAlgorithms(l.ctx, l.svcCtx.Scheduler.ResourceCollector, req.ResourceType, req.TaskType, req.Dataset)
|
||||
algorithms, err := storeLink.GetAlgorithms(l.ctx, l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap["1777144940459986944"], req.ResourceType, req.TaskType, req.Dataset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ func NewScheduleGetDatasetsLogic(ctx context.Context, svcCtx *svc.ServiceContext
|
|||
|
||||
func (l *ScheduleGetDatasetsLogic) ScheduleGetDatasets() (resp *types.AiDatasetsResp, err error) {
|
||||
resp = &types.AiDatasetsResp{}
|
||||
names, err := storeLink.GetDatasetsNames(l.ctx, l.svcCtx.Scheduler.ResourceCollector)
|
||||
names, err := storeLink.GetDatasetsNames(l.ctx, l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap["1777144940459986944"])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -33,6 +33,21 @@ func (s *AiStorage) GetClustersByAdapterId(id string) (*types.ClusterListResp, e
|
|||
return &resp, nil
|
||||
}
|
||||
|
||||
func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) {
|
||||
var list []types.AdapterInfo
|
||||
var ids []string
|
||||
db := s.DbEngin.Model(&types.AdapterInfo{}).Table("t_adapter")
|
||||
db = db.Where("type = ?", adapterType)
|
||||
err := db.Order("create_time desc").Find(&list).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, info := range list {
|
||||
ids = append(ids, info.Id)
|
||||
}
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func (s *AiStorage) SaveTask(name string) error {
|
||||
// 构建主任务结构体
|
||||
taskModel := models.Task{
|
||||
|
|
|
@ -20,8 +20,7 @@ import (
|
|||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice"
|
||||
|
@ -32,16 +31,15 @@ import (
|
|||
)
|
||||
|
||||
type Scheduler struct {
|
||||
task *response.TaskInfo
|
||||
participantIds []int64
|
||||
subSchedule SubSchedule
|
||||
dbEngin *gorm.DB
|
||||
result []string //pID:子任务yamlstring 键值对
|
||||
participantRpc participantservice.ParticipantService
|
||||
ResourceCollector *map[string]collector.AiCollector
|
||||
AiStorages *database.AiStorage
|
||||
AiExecutor *map[string]executor.AiExecutor
|
||||
mu sync.RWMutex
|
||||
task *response.TaskInfo
|
||||
participantIds []int64
|
||||
subSchedule SubSchedule
|
||||
dbEngin *gorm.DB
|
||||
result []string //pID:子任务yamlstring 键值对
|
||||
participantRpc participantservice.ParticipantService
|
||||
AiStorages *database.AiStorage
|
||||
AiService *service.AiService
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
type SubSchedule interface {
|
||||
|
@ -59,8 +57,8 @@ func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, partici
|
|||
return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil
|
||||
}
|
||||
|
||||
func NewSchdlr(resourceCollector *map[string]collector.AiCollector, storages *database.AiStorage, aiExecutor *map[string]executor.AiExecutor) *Scheduler {
|
||||
return &Scheduler{ResourceCollector: resourceCollector, AiStorages: storages, AiExecutor: aiExecutor}
|
||||
func NewSchdlr(aiService *service.AiService, storages *database.AiStorage) *Scheduler {
|
||||
return &Scheduler{AiService: aiService, AiStorages: storages}
|
||||
}
|
||||
|
||||
func (s *Scheduler) SpecifyClusters() {
|
||||
|
|
|
@ -64,9 +64,8 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin
|
|||
}
|
||||
|
||||
func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
|
||||
if as.option.AiClusterId != "" {
|
||||
// TODO database operation Find
|
||||
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: "", Replicas: 1}}, nil
|
||||
if len(as.option.ClusterIds) == 1 {
|
||||
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
|
||||
}
|
||||
|
||||
resources, err := as.findClustersWithResources()
|
||||
|
@ -131,7 +130,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
|
|||
var ch = make(chan *AiResult, len(clusters))
|
||||
var errCh = make(chan interface{}, len(clusters))
|
||||
|
||||
executorMap := *as.AiExecutor
|
||||
executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]
|
||||
for _, cluster := range clusters {
|
||||
c := cluster
|
||||
wg.Add(1)
|
||||
|
@ -202,13 +201,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
|
|||
|
||||
func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
|
||||
var wg sync.WaitGroup
|
||||
var ch = make(chan *collector.ResourceStats, len(*as.ResourceCollector))
|
||||
var errCh = make(chan interface{}, len(*as.ResourceCollector))
|
||||
var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId])
|
||||
var ch = make(chan *collector.ResourceStats, clustersNum)
|
||||
var errCh = make(chan interface{}, clustersNum)
|
||||
|
||||
var resourceSpecs []*collector.ResourceStats
|
||||
var errs []interface{}
|
||||
|
||||
for s, resourceCollector := range *as.ResourceCollector {
|
||||
for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] {
|
||||
wg.Add(1)
|
||||
rc := resourceCollector
|
||||
id := s
|
||||
|
@ -242,7 +242,7 @@ func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats,
|
|||
errs = append(errs, e)
|
||||
}
|
||||
|
||||
if len(errs) == len(*as.ResourceCollector) {
|
||||
if len(errs) == clustersNum {
|
||||
return nil, errors.New("get resources failed")
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
package option
|
||||
|
||||
type AiOption struct {
|
||||
AiClusterId string // shuguangAi /octopus ClusterId
|
||||
AdapterId string
|
||||
ClusterIds []string
|
||||
TaskName string
|
||||
ResourceType string // cpu/gpu/compute card
|
||||
CpuCoreNum int64
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"github.com/zeromicro/go-zero/zrpc"
|
||||
"gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
|
||||
|
@ -18,30 +21,60 @@ const (
|
|||
SHUGUANGAI = "shuguangAi"
|
||||
)
|
||||
|
||||
func InitAiClusterMap(octopusRpc octopusclient.Octopus, modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, aCRpc hpcacclient.HpcAC, storages *database.AiStorage) (*map[string]executor.AiExecutor, *map[string]collector.AiCollector) {
|
||||
clusters, _ := storages.GetClustersByAdapterId("1777144940459986944")
|
||||
type AiService struct {
|
||||
AiExecutorAdapterMap map[string]map[string]executor.AiExecutor
|
||||
AiCollectorAdapterMap map[string]map[string]collector.AiCollector
|
||||
}
|
||||
|
||||
func NewAiService(conf *config.Config, storages *database.AiStorage) (*AiService, error) {
|
||||
var aiType = "1"
|
||||
adapterIds, err := storages.GetAdapterIdsByType(aiType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
aiService := &AiService{
|
||||
AiExecutorAdapterMap: make(map[string]map[string]executor.AiExecutor),
|
||||
AiCollectorAdapterMap: make(map[string]map[string]collector.AiCollector),
|
||||
}
|
||||
for _, id := range adapterIds {
|
||||
clusters, err := storages.GetClustersByAdapterId(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exeClusterMap, colClusterMap := InitAiClusterMap(conf, clusters.List)
|
||||
aiService.AiExecutorAdapterMap[id] = exeClusterMap
|
||||
aiService.AiCollectorAdapterMap[id] = colClusterMap
|
||||
}
|
||||
|
||||
return aiService, nil
|
||||
}
|
||||
|
||||
func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) {
|
||||
executorMap := make(map[string]executor.AiExecutor)
|
||||
collectorMap := make(map[string]collector.AiCollector)
|
||||
for _, c := range clusters.List {
|
||||
for _, c := range clusters {
|
||||
switch c.Name {
|
||||
case OCTOPUS:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf))
|
||||
octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
|
||||
collectorMap[c.Id] = octopus
|
||||
executorMap[c.Id] = octopus
|
||||
case MODELARTS:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
|
||||
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
|
||||
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Nickname, id)
|
||||
collectorMap[c.Id] = modelarts
|
||||
executorMap[c.Id] = modelarts
|
||||
case SHUGUANGAI:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf))
|
||||
sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
|
||||
collectorMap[c.Id] = sgai
|
||||
executorMap[c.Id] = sgai
|
||||
}
|
||||
}
|
||||
|
||||
return &executorMap, &collectorMap
|
||||
return executorMap, collectorMap
|
||||
}
|
||||
|
|
|
@ -128,13 +128,13 @@ func GetResourceTypes() []string {
|
|||
return resourceTypes
|
||||
}
|
||||
|
||||
func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.AiCollector) ([]string, error) {
|
||||
func GetDatasetsNames(ctx context.Context, collectorMap map[string]collector.AiCollector) ([]string, error) {
|
||||
var wg sync.WaitGroup
|
||||
var errCh = make(chan interface{}, len(*collectorMap))
|
||||
var errCh = make(chan interface{}, len(collectorMap))
|
||||
var errs []interface{}
|
||||
var names []string
|
||||
var mu sync.Mutex
|
||||
colMap := *collectorMap
|
||||
colMap := collectorMap
|
||||
for s, col := range colMap {
|
||||
wg.Add(1)
|
||||
c := col
|
||||
|
@ -200,14 +200,14 @@ func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.Ai
|
|||
return names, nil
|
||||
}
|
||||
|
||||
func GetAlgorithms(ctx context.Context, collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) {
|
||||
func GetAlgorithms(ctx context.Context, collectorMap map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) {
|
||||
var names []string
|
||||
var wg sync.WaitGroup
|
||||
var errCh = make(chan interface{}, len(*collectorMap))
|
||||
var errCh = make(chan interface{}, len(collectorMap))
|
||||
var errs []interface{}
|
||||
var mu sync.Mutex
|
||||
|
||||
colMap := *collectorMap
|
||||
colMap := collectorMap
|
||||
for s, col := range colMap {
|
||||
wg.Add(1)
|
||||
c := col
|
||||
|
|
|
@ -116,24 +116,28 @@ func NewServiceContext(c config.Config) *ServiceContext {
|
|||
})
|
||||
|
||||
// scheduler
|
||||
octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf))
|
||||
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf))
|
||||
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf))
|
||||
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf))
|
||||
//octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf))
|
||||
//aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf))
|
||||
//modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf))
|
||||
//modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf))
|
||||
storage := &database.AiStorage{DbEngin: dbEngin}
|
||||
aiExecutor, resourceCollector := service.InitAiClusterMap(octopusRpc, modelArtsRpc, modelArtsImgRpc, aCRpc, storage)
|
||||
scheduler := scheduler.NewSchdlr(resourceCollector, storage, aiExecutor)
|
||||
aiService, err := service.NewAiService(&c, storage)
|
||||
if err != nil {
|
||||
logx.Error(err.Error())
|
||||
return nil
|
||||
}
|
||||
scheduler := scheduler.NewSchdlr(aiService, storage)
|
||||
|
||||
return &ServiceContext{
|
||||
Cron: cron.New(cron.WithSeconds()),
|
||||
DbEngin: dbEngin,
|
||||
Config: c,
|
||||
RedisClient: redisClient,
|
||||
ModelArtsRpc: modelArtsRpc,
|
||||
ModelArtsImgRpc: modelArtsImgRpc,
|
||||
ModelArtsRpc: modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf)),
|
||||
ModelArtsImgRpc: imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf)),
|
||||
CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)),
|
||||
ACRpc: aCRpc,
|
||||
OctopusRpc: octopusRpc,
|
||||
ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)),
|
||||
OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)),
|
||||
OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)),
|
||||
K8sRpc: kubernetesclient.NewKubernetes(zrpc.MustNewClient(c.K8sNativeConf)),
|
||||
MonitorClient: make(map[int64]tracker.Prometheus),
|
||||
|
|
Loading…
Reference in New Issue