From 6b2ae7c0f516256d8173fe12cec4acf280eb1374 Mon Sep 17 00:00:00 2001 From: tzwang Date: Sat, 30 Mar 2024 16:44:10 +0800 Subject: [PATCH] fix cycle import err Former-commit-id: 7b1ada2f3eeaf0d03d67d2b534de2eda17e840ff --- .../logic/schedule/schedulesubmitlogic.go | 27 ++++++++++++++++-- .../logic/storelink/deletelinkimagelogic.go | 2 +- .../logic/storelink/deletelinktasklogic.go | 2 +- .../logic/storelink/getaispecslogic.go | 2 +- .../logic/storelink/getlinkimagelistlogic.go | 2 +- .../logic/storelink/getlinktasklogic.go | 2 +- .../logic/storelink/submitlinktasklogic.go | 2 +- .../logic/storelink/uploadlinkimagelogic.go | 2 +- api/internal/scheduler/database/aiStorage.go | 28 ++++++++++++++++++- api/internal/scheduler/scheduler.go | 6 ++-- .../scheduler/schedulers/aiScheduler.go | 1 + api/internal/storeLink/storeLink.go | 15 ++++++---- api/internal/svc/servicecontext.go | 8 ++++-- 13 files changed, 77 insertions(+), 22 deletions(-) diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index fad58867..fc7469f9 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -2,6 +2,8 @@ package schedule import ( "context" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -24,7 +26,28 @@ func NewScheduleSubmitLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Sc } func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) { - // todo: add your logic here and delete this line + resp = &types.ScheduleResp{} + opt := &option.AiOption{ + ResourceType: req.AiOption.ResourceType, + Tops: 0, + TaskType: req.AiOption.TaskType, + DatasetsName: req.AiOption.Datasets, + AlgorithmName: "cnn", + StrategyName: req.AiOption.Strategy, + ClusterToStaticWeight: nil, + Params: []string{ + "epoch,1", + }, + } + aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt) + if err != nil { + return nil, err + } - return + err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) + if err != nil { + return nil, err + } + + return resp, nil } diff --git a/api/internal/logic/storelink/deletelinkimagelogic.go b/api/internal/logic/storelink/deletelinkimagelogic.go index 25eb2f4d..6b2f91e5 100644 --- a/api/internal/logic/storelink/deletelinkimagelogic.go +++ b/api/internal/logic/storelink/deletelinkimagelogic.go @@ -47,7 +47,7 @@ func (l *DeleteLinkImageLogic) DeleteLinkImage(req *types.DeleteLinkImageReq) (r return resp, nil } - storelink := storeLink.NewStoreLink(l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } diff --git a/api/internal/logic/storelink/deletelinktasklogic.go b/api/internal/logic/storelink/deletelinktasklogic.go index 8f377f62..1321f983 100644 --- a/api/internal/logic/storelink/deletelinktasklogic.go +++ b/api/internal/logic/storelink/deletelinktasklogic.go @@ -47,7 +47,7 @@ func (l *DeleteLinkTaskLogic) DeleteLinkTask(req *types.DeleteLinkTaskReq) (resp return resp, nil } - storelink := storeLink.NewStoreLink(l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } diff --git a/api/internal/logic/storelink/getaispecslogic.go b/api/internal/logic/storelink/getaispecslogic.go index 62dc6a06..2ba8ee38 100644 --- a/api/internal/logic/storelink/getaispecslogic.go +++ b/api/internal/logic/storelink/getaispecslogic.go @@ -47,7 +47,7 @@ func (l *GetAISpecsLogic) GetAISpecs(req *types.GetResourceSpecsReq) (resp *type return resp, nil } - storelink := storeLink.NewStoreLink(l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } diff --git a/api/internal/logic/storelink/getlinkimagelistlogic.go b/api/internal/logic/storelink/getlinkimagelistlogic.go index 48e37d7b..9e811da8 100644 --- a/api/internal/logic/storelink/getlinkimagelistlogic.go +++ b/api/internal/logic/storelink/getlinkimagelistlogic.go @@ -47,7 +47,7 @@ func (l *GetLinkImageListLogic) GetLinkImageList(req *types.GetLinkImageListReq) return resp, nil } - storelink := storeLink.NewStoreLink(l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } diff --git a/api/internal/logic/storelink/getlinktasklogic.go b/api/internal/logic/storelink/getlinktasklogic.go index 24e7b3b7..97aab7af 100644 --- a/api/internal/logic/storelink/getlinktasklogic.go +++ b/api/internal/logic/storelink/getlinktasklogic.go @@ -48,7 +48,7 @@ func (l *GetLinkTaskLogic) GetLinkTask(req *types.GetLinkTaskReq) (resp *types.G return resp, nil } - storelink := storeLink.NewStoreLink(l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } diff --git a/api/internal/logic/storelink/submitlinktasklogic.go b/api/internal/logic/storelink/submitlinktasklogic.go index 971d75fc..5e8f9d14 100644 --- a/api/internal/logic/storelink/submitlinktasklogic.go +++ b/api/internal/logic/storelink/submitlinktasklogic.go @@ -48,7 +48,7 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp return resp, nil } - storelink := storeLink.NewStoreLink(l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } diff --git a/api/internal/logic/storelink/uploadlinkimagelogic.go b/api/internal/logic/storelink/uploadlinkimagelogic.go index 5bd7514a..60beade5 100644 --- a/api/internal/logic/storelink/uploadlinkimagelogic.go +++ b/api/internal/logic/storelink/uploadlinkimagelogic.go @@ -48,7 +48,7 @@ func (l *UploadLinkImageLogic) UploadLinkImage(req *types.UploadLinkImageReq) (r return resp, nil } - storelink := storeLink.NewStoreLink(l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index aa984f16..edcc5969 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -2,18 +2,44 @@ package database import ( "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gorm.io/gorm" + "time" ) type AiStorage struct { DbEngin *gorm.DB } -func (s *AiStorage) GetParticipants() { +func (s *AiStorage) GetParticipants() (*types.ClusterListResp, error) { var resp types.ClusterListResp tx := s.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL ORDER BY create_time Desc").Scan(&resp.List) if tx.Error != nil { logx.Errorf(tx.Error.Error()) + return nil, tx.Error } + return &resp, nil +} + +func (s *AiStorage) SaveTask(cluster strategy.AssignedCluster) error { + // 构建主任务结构体 + taskModel := models.Task{ + Status: constants.Saved, + Description: "ai task", + Name: "testAi", + CommitTime: time.Now(), + } + // 保存任务数据到数据库 + tx := s.DbEngin.Create(&taskModel) + if tx.Error != nil { + return tx.Error + } + return nil +} + +func (s *AiStorage) UpdateTask() error { + return nil } diff --git a/api/internal/scheduler/scheduler.go b/api/internal/scheduler/scheduler.go index 8c3265db..281788e6 100644 --- a/api/internal/scheduler/scheduler.go +++ b/api/internal/scheduler/scheduler.go @@ -39,7 +39,7 @@ type Scheduler struct { result []string //pID:子任务yamlstring 键值对 participantRpc participantservice.ParticipantService ResourceCollector *map[string]collector.AiCollector - Storages database.Storage + AiStorages *database.AiStorage AiExecutor *map[string]executor.AiExecutor mu sync.RWMutex } @@ -59,8 +59,8 @@ func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, partici return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil } -func NewSchdlr(resourceCollector *map[string]collector.AiCollector, storages database.Storage, aiExecutor *map[string]executor.AiExecutor) *Scheduler { - return &Scheduler{ResourceCollector: resourceCollector, Storages: storages, AiExecutor: aiExecutor} +func NewSchdlr(resourceCollector *map[string]collector.AiCollector, storages *database.AiStorage, aiExecutor *map[string]executor.AiExecutor) *Scheduler { + return &Scheduler{ResourceCollector: resourceCollector, AiStorages: storages, AiExecutor: aiExecutor} } func (s *Scheduler) SpecifyClusters() { diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index bbb2b59c..024ef7ae 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -58,6 +58,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { } resources, err := as.findClustersWithResources() + if err != nil { return nil, err } diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index 62a1079d..ce00a540 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -19,13 +19,16 @@ import ( "github.com/pkg/errors" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" "gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC" + "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice" "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts" "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus" + "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient" "gorm.io/gorm" "strings" "sync" @@ -89,19 +92,19 @@ type StoreLink struct { ILinkage Linkage } -func NewStoreLink(svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *StoreLink { +func NewStoreLink(octopusRpc octopusclient.Octopus, modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, aCRpc hpcacclient.HpcAC, participant *models.StorelinkCenter) *StoreLink { switch participant.Type { case TYPE_OCTOPUS: - linkStruct := NewOctopusLink(svcCtx.OctopusRpc, participant.Name, participant.Id) + linkStruct := NewOctopusLink(octopusRpc, participant.Name, participant.Id) return &StoreLink{ILinkage: linkStruct} case TYPE_MODELARTS: - linkStruct := NewModelArtsLink(svcCtx.ModelArtsRpc, svcCtx.ModelArtsImgRpc, participant.Name, participant.Id) + linkStruct := NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, participant.Name, participant.Id) return &StoreLink{ILinkage: linkStruct} case TYPE_SHUGUANGAI: - linkStruct := NewShuguangAi(svcCtx.ACRpc, participant.Name, participant.Id) + linkStruct := NewShuguangAi(aCRpc, participant.Name, participant.Id) return &StoreLink{ILinkage: linkStruct} case TYPE_SHUGUANGHPC: - linkStruct := NewShuguangHpc(svcCtx.ACRpc, participant.Name, participant.Id) + linkStruct := NewShuguangHpc(aCRpc, participant.Name, participant.Id) return &StoreLink{ILinkage: linkStruct} default: return nil diff --git a/api/internal/svc/servicecontext.go b/api/internal/svc/servicecontext.go index 95dc0a46..4bec99a7 100644 --- a/api/internal/svc/servicecontext.go +++ b/api/internal/svc/servicecontext.go @@ -23,6 +23,8 @@ import ( "github.com/zeromicro/go-zero/zrpc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice" @@ -118,9 +120,9 @@ func NewServiceContext(c config.Config) *ServiceContext { aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)) modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf)) modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf)) - //aiExecutor, resourceCollector := service2.InitAiClusterMap(octopusRpc, modelArtsRpc, modelArtsImgRpc, aCRpc) - //storage := &database.AiStorage{DbEngin: dbEngin} - scheduler := scheduler.NewSchdlr(nil, nil, nil) + aiExecutor, resourceCollector := service.InitAiClusterMap(octopusRpc, modelArtsRpc, modelArtsImgRpc, aCRpc) + storage := &database.AiStorage{DbEngin: dbEngin} + scheduler := scheduler.NewSchdlr(resourceCollector, storage, aiExecutor) return &ServiceContext{ Cron: cron.New(cron.WithSeconds()),