fix cycle import err

Former-commit-id: 7b1ada2f3eeaf0d03d67d2b534de2eda17e840ff
This commit is contained in:
tzwang 2024-03-30 16:44:10 +08:00
parent 96f3a16b71
commit 6b2ae7c0f5
13 changed files with 77 additions and 22 deletions

View File

@ -2,6 +2,8 @@ package schedule
import ( import (
"context" "context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
@ -24,7 +26,28 @@ func NewScheduleSubmitLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Sc
} }
func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) { func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) {
// todo: add your logic here and delete this line resp = &types.ScheduleResp{}
opt := &option.AiOption{
ResourceType: req.AiOption.ResourceType,
Tops: 0,
TaskType: req.AiOption.TaskType,
DatasetsName: req.AiOption.Datasets,
AlgorithmName: "cnn",
StrategyName: req.AiOption.Strategy,
ClusterToStaticWeight: nil,
Params: []string{
"epoch,1",
},
}
aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt)
if err != nil {
return nil, err
}
return err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl)
if err != nil {
return nil, err
}
return resp, nil
} }

View File

@ -47,7 +47,7 @@ func (l *DeleteLinkImageLogic) DeleteLinkImage(req *types.DeleteLinkImageReq) (r
return resp, nil return resp, nil
} }
storelink := storeLink.NewStoreLink(l.svcCtx, participant) storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
if storelink == nil { if storelink == nil {
return nil, nil return nil, nil
} }

View File

@ -47,7 +47,7 @@ func (l *DeleteLinkTaskLogic) DeleteLinkTask(req *types.DeleteLinkTaskReq) (resp
return resp, nil return resp, nil
} }
storelink := storeLink.NewStoreLink(l.svcCtx, participant) storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
if storelink == nil { if storelink == nil {
return nil, nil return nil, nil
} }

View File

@ -47,7 +47,7 @@ func (l *GetAISpecsLogic) GetAISpecs(req *types.GetResourceSpecsReq) (resp *type
return resp, nil return resp, nil
} }
storelink := storeLink.NewStoreLink(l.svcCtx, participant) storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
if storelink == nil { if storelink == nil {
return nil, nil return nil, nil
} }

View File

@ -47,7 +47,7 @@ func (l *GetLinkImageListLogic) GetLinkImageList(req *types.GetLinkImageListReq)
return resp, nil return resp, nil
} }
storelink := storeLink.NewStoreLink(l.svcCtx, participant) storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
if storelink == nil { if storelink == nil {
return nil, nil return nil, nil
} }

View File

@ -48,7 +48,7 @@ func (l *GetLinkTaskLogic) GetLinkTask(req *types.GetLinkTaskReq) (resp *types.G
return resp, nil return resp, nil
} }
storelink := storeLink.NewStoreLink(l.svcCtx, participant) storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
if storelink == nil { if storelink == nil {
return nil, nil return nil, nil
} }

View File

@ -48,7 +48,7 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp
return resp, nil return resp, nil
} }
storelink := storeLink.NewStoreLink(l.svcCtx, participant) storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
if storelink == nil { if storelink == nil {
return nil, nil return nil, nil
} }

View File

@ -48,7 +48,7 @@ func (l *UploadLinkImageLogic) UploadLinkImage(req *types.UploadLinkImageReq) (r
return resp, nil return resp, nil
} }
storelink := storeLink.NewStoreLink(l.svcCtx, participant) storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
if storelink == nil { if storelink == nil {
return nil, nil return nil, nil
} }

View File

@ -2,18 +2,44 @@ package database
import ( import (
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gorm.io/gorm" "gorm.io/gorm"
"time"
) )
type AiStorage struct { type AiStorage struct {
DbEngin *gorm.DB DbEngin *gorm.DB
} }
func (s *AiStorage) GetParticipants() { func (s *AiStorage) GetParticipants() (*types.ClusterListResp, error) {
var resp types.ClusterListResp var resp types.ClusterListResp
tx := s.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL ORDER BY create_time Desc").Scan(&resp.List) tx := s.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL ORDER BY create_time Desc").Scan(&resp.List)
if tx.Error != nil { if tx.Error != nil {
logx.Errorf(tx.Error.Error()) logx.Errorf(tx.Error.Error())
return nil, tx.Error
} }
return &resp, nil
}
func (s *AiStorage) SaveTask(cluster strategy.AssignedCluster) error {
// 构建主任务结构体
taskModel := models.Task{
Status: constants.Saved,
Description: "ai task",
Name: "testAi",
CommitTime: time.Now(),
}
// 保存任务数据到数据库
tx := s.DbEngin.Create(&taskModel)
if tx.Error != nil {
return tx.Error
}
return nil
}
func (s *AiStorage) UpdateTask() error {
return nil
} }

View File

@ -39,7 +39,7 @@ type Scheduler struct {
result []string //pID:子任务yamlstring 键值对 result []string //pID:子任务yamlstring 键值对
participantRpc participantservice.ParticipantService participantRpc participantservice.ParticipantService
ResourceCollector *map[string]collector.AiCollector ResourceCollector *map[string]collector.AiCollector
Storages database.Storage AiStorages *database.AiStorage
AiExecutor *map[string]executor.AiExecutor AiExecutor *map[string]executor.AiExecutor
mu sync.RWMutex mu sync.RWMutex
} }
@ -59,8 +59,8 @@ func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, partici
return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil
} }
func NewSchdlr(resourceCollector *map[string]collector.AiCollector, storages database.Storage, aiExecutor *map[string]executor.AiExecutor) *Scheduler { func NewSchdlr(resourceCollector *map[string]collector.AiCollector, storages *database.AiStorage, aiExecutor *map[string]executor.AiExecutor) *Scheduler {
return &Scheduler{ResourceCollector: resourceCollector, Storages: storages, AiExecutor: aiExecutor} return &Scheduler{ResourceCollector: resourceCollector, AiStorages: storages, AiExecutor: aiExecutor}
} }
func (s *Scheduler) SpecifyClusters() { func (s *Scheduler) SpecifyClusters() {

View File

@ -58,6 +58,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
} }
resources, err := as.findClustersWithResources() resources, err := as.findClustersWithResources()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -19,13 +19,16 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils"
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC" "gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC"
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts" "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus" "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient"
"gorm.io/gorm" "gorm.io/gorm"
"strings" "strings"
"sync" "sync"
@ -89,19 +92,19 @@ type StoreLink struct {
ILinkage Linkage ILinkage Linkage
} }
func NewStoreLink(svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *StoreLink { func NewStoreLink(octopusRpc octopusclient.Octopus, modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, aCRpc hpcacclient.HpcAC, participant *models.StorelinkCenter) *StoreLink {
switch participant.Type { switch participant.Type {
case TYPE_OCTOPUS: case TYPE_OCTOPUS:
linkStruct := NewOctopusLink(svcCtx.OctopusRpc, participant.Name, participant.Id) linkStruct := NewOctopusLink(octopusRpc, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct} return &StoreLink{ILinkage: linkStruct}
case TYPE_MODELARTS: case TYPE_MODELARTS:
linkStruct := NewModelArtsLink(svcCtx.ModelArtsRpc, svcCtx.ModelArtsImgRpc, participant.Name, participant.Id) linkStruct := NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct} return &StoreLink{ILinkage: linkStruct}
case TYPE_SHUGUANGAI: case TYPE_SHUGUANGAI:
linkStruct := NewShuguangAi(svcCtx.ACRpc, participant.Name, participant.Id) linkStruct := NewShuguangAi(aCRpc, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct} return &StoreLink{ILinkage: linkStruct}
case TYPE_SHUGUANGHPC: case TYPE_SHUGUANGHPC:
linkStruct := NewShuguangHpc(svcCtx.ACRpc, participant.Name, participant.Id) linkStruct := NewShuguangHpc(aCRpc, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct} return &StoreLink{ILinkage: linkStruct}
default: default:
return nil return nil

View File

@ -23,6 +23,8 @@ import (
"github.com/zeromicro/go-zero/zrpc" "github.com/zeromicro/go-zero/zrpc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice" "gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice"
@ -118,9 +120,9 @@ func NewServiceContext(c config.Config) *ServiceContext {
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)) aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf))
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf)) modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf))
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf)) modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf))
//aiExecutor, resourceCollector := service2.InitAiClusterMap(octopusRpc, modelArtsRpc, modelArtsImgRpc, aCRpc) aiExecutor, resourceCollector := service.InitAiClusterMap(octopusRpc, modelArtsRpc, modelArtsImgRpc, aCRpc)
//storage := &database.AiStorage{DbEngin: dbEngin} storage := &database.AiStorage{DbEngin: dbEngin}
scheduler := scheduler.NewSchdlr(nil, nil, nil) scheduler := scheduler.NewSchdlr(resourceCollector, storage, aiExecutor)
return &ServiceContext{ return &ServiceContext{
Cron: cron.New(cron.WithSeconds()), Cron: cron.New(cron.WithSeconds()),