diff --git a/api/client/types.go b/api/client/types.go index e2b490f9..6e729cd5 100644 --- a/api/client/types.go +++ b/api/client/types.go @@ -156,23 +156,28 @@ type AiInfo struct { } type VmInfo struct { - TaskId int64 `json:"taskId,omitempty"` - Name string `json:"name,omitempty"` - AdapterId int64 `json:"adapterId,omitempty,optional"` - AdapterName string `json:"adapterName,omitempty,optional"` - ClusterId int64 `json:"clusterId,omitempty,optional"` - ClusterName string `json:"clusterName,omitempty,optional"` - FlavorRef string `json:"flavor_ref,omitempty"` - ImageRef string `json:"image_ref,omitempty"` - NetworkUuid string `json:"network_uuid,omitempty"` - BlockUuid string `json:"block_uuid,omitempty"` - SourceType string `json:"source_type,omitempty"` - DeleteOnTermination bool `json:"delete_on_termination,omitempty"` - Status string `json:"Status,omitempty"` - StartTime string `json:"startTime,omitempty"` - Platform string `json:"platform,omitempty"` - VmName string `json:"vm_name,omitempty"` - ServerId string `json:"server_id,omitempty"` + Id int64 `json:"id,omitempty"` + TaskId int64 `json:"taskId,omitempty"` + Name string `json:"name,omitempty"` + AdapterId int64 `json:"adapterId,omitempty,optional"` + AdapterName string `json:"adapterName,omitempty,optional"` + ClusterId int64 `json:"clusterId,omitempty,optional"` + ClusterName string `json:"clusterName,omitempty,optional"` + FlavorRef string `json:"flavorRef,omitempty"` + ImageRef string `json:"imageRef,omitempty"` + Status string `json:"status,omitempty"` + Platform string `json:"platform,omitempty"` + Description string `json:"description,omitempty"` // 描述 + AvailabilityZone string `json:"availabilityZone,omitempty"` + MinCount int64 `json:"minCount,omitempty"` + Uuid string `json:"uuid,omitempty"` + StartTime string `json:"startTime,omitempty"` + RunningTime string `json:"runningTime,omitempty"` + Result string `json:"result,omitempty"` + DeletedAt string `json:"deletedAt,omitempty"` + VmName string 
`json:"vmName,omitempty"` + Replicas int64 `json:"replicas,omitempty"` + ServerId string `json:"serverId,omitempty"` } type ResourceStats struct { diff --git a/api/desc/core/pcm-core.api b/api/desc/core/pcm-core.api index e5e144d4..5e2ed765 100644 --- a/api/desc/core/pcm-core.api +++ b/api/desc/core/pcm-core.api @@ -215,17 +215,17 @@ type ( type ( commitVmTaskReq { - Name string `json:"name"` + Name string `json:"name"` AdapterIds []string `json:"adapterIds,optional"` - ClusterIds []string `json:"clusterIds"` - Strategy string `json:"strategy"` + ClusterIds []string `json:"clusterIds"` + Strategy string `json:"strategy"` StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` - MinCount int64 `json:"min_count,optional"` - ImageRef int64 `json:"imageRef,optional"` - FlavorRef int64 `json:"flavorRef,optional"` - Uuid int64 `json:"uuid,optional"` - Replicas int32 `json:"replicas,string"` - VmName string `json:"vm_name,optional"` + MinCount int64 `json:"min_count,optional"` + ImageRef int64 `json:"imageRef,optional"` + FlavorRef int64 `json:"flavorRef,optional"` + Uuid int64 `json:"uuid,optional"` + Replicas int64 `json:"replicas,string"` + VmName string `json:"vm_name,optional"` } TaskVm { Image string `json:"image"` @@ -1234,12 +1234,24 @@ type TaskStatusResp { Saved int `json:"Saved"` } -type TaskDetailsResp { - Name string `json:"name"` - description string `json:"description"` - StartTime string `json:"startTime"` - EndTime string `json:"endTime"` - Strategy int64 `json:"strategy"` - SynergyStatus int64 `json:"synergyStatus"` - ClusterInfos []*ClusterInfo `json:"clusterInfos"` -} \ No newline at end of file +type ( + TaskDetailsResp { + Name string `json:"name"` + description string `json:"description"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + Strategy int64 `json:"strategy"` + SynergyStatus int64 `json:"synergyStatus"` + ClusterInfos []*ClusterInfo `json:"clusterInfos"` + SubTaskInfos []*SubTaskInfo 
`json:"subTaskInfos"` + } + + SubTaskInfo{ + Id string `json:"id" db:"id"` + Name string `json:"name" db:"name"` + ClusterId string `json:"clusterId" db:"cluster_id"` + ClusterName string `json:"clusterName" db:"cluster_name"` + Status string `json:"status" db:"status"` + Remark string `json:"remark" db:"remark"` + } +) \ No newline at end of file diff --git a/api/etc/pcm.yaml b/api/etc/pcm.yaml index 6b156f01..127b6896 100644 --- a/api/etc/pcm.yaml +++ b/api/etc/pcm.yaml @@ -42,7 +42,7 @@ ACRpcConf: # Endpoints: # - 127.0.0.1:8888 NonBlock: true - Timeout: 20000 + Timeout: 50000 #rpc CephRpcConf: diff --git a/api/internal/cron/aiCronTask.go b/api/internal/cron/aiCronTask.go new file mode 100644 index 00000000..b325e6be --- /dev/null +++ b/api/internal/cron/aiCronTask.go @@ -0,0 +1,479 @@ +package cron + +import ( + "errors" + "fmt" + "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/zrpc" + "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" + "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" + "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "net/http" + "strconv" + "sync" + "time" +) + +const ( + OCTOPUS = "octopus" + MODELARTS = "modelarts" + SHUGUANGAI = "shuguangAi" +) + +func GetTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) { + 
limit := 10 + offset := 0 + var list []*types.TaskModel + db := svc.DbEngin.Model(&types.TaskModel{}).Table("task") + + db = db.Where("deleted_at is null") + + //count total + var total int64 + err := db.Count(&total).Error + db.Limit(limit).Offset(offset) + + if err != nil { + return nil, err + } + err = db.Order("created_time desc").Find(&list).Error + if err != nil { + return nil, err + } + return list, nil +} + +func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) { + list := make([]*types.TaskModel, len(tasklist)) + copy(list, tasklist) + for i := len(list) - 1; i >= 0; i-- { + if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed { + list = append(list[:i], list[i+1:]...) + } + } + + if len(list) == 0 { + return + } + + task := list[0] + for i := range list { + earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) + latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime) + if latest.Before(earliest) { + task = list[i] + } + } + + var aiTaskList []*models.TaskAi + tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? 
", task.Id).Scan(&aiTaskList) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return + } + + if len(aiTaskList) == 0 { + return + } + + var wg sync.WaitGroup + for _, aitask := range aiTaskList { + t := aitask + if t.Status == constants.Completed || t.Status == constants.Failed { + continue + } + wg.Add(1) + go func() { + h := http.Request{} + trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId) + if err != nil { + if status.Code(err) == codes.DeadlineExceeded { + msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) + logx.Errorf(errors.New(msg).Error()) + wg.Done() + return + } + + msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) + logx.Errorf(errors.New(msg).Error()) + wg.Done() + return + } + if trainingTask == nil { + wg.Done() + return + } + switch trainingTask.Status { + case constants.Running: + if t.Status != trainingTask.Status { + svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "running", "任务运行中") + t.Status = trainingTask.Status + } + case constants.Failed: + if t.Status != trainingTask.Status { + svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "failed", "任务失败") + t.Status = trainingTask.Status + } + case constants.Completed: + if t.Status != trainingTask.Status { + svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "completed", "任务完成") + t.Status = trainingTask.Status + } + default: + if t.Status != trainingTask.Status { + 
svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "pending", "任务pending") + t.Status = trainingTask.Status + } + } + t.StartTime = trainingTask.Start + t.EndTime = trainingTask.End + err = svc.Scheduler.AiStorages.UpdateAiTask(t) + if err != nil { + msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) + logx.Errorf(errors.New(msg).Error()) + wg.Done() + return + } + wg.Done() + }() + } + wg.Wait() +} + +func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) { + list := make([]*types.TaskModel, len(tasklist)) + copy(list, tasklist) + for i := len(list) - 1; i >= 0; i-- { + if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed { + list = append(list[:i], list[i+1:]...) + } + } + + if len(list) == 0 { + return + } + + task := list[0] + for i := range list { + earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) + latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime) + if latest.Before(earliest) { + task = list[i] + } + } + + var aiTask []*models.TaskAi + tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? 
", task.Id).Scan(&aiTask) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return + } + + if len(aiTask) == 0 { + tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return + } + return + } + + if len(aiTask) == 1 { + if aiTask[0].Status == constants.Completed { + task.Status = constants.Succeeded + } else { + task.Status = aiTask[0].Status + } + task.StartTime = aiTask[0].StartTime + task.EndTime = aiTask[0].EndTime + task.UpdatedTime = time.Now().Format(constants.Layout) + tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return + } + return + } + + for i := len(aiTask) - 1; i >= 0; i-- { + if aiTask[i].StartTime == "" { + task.Status = aiTask[i].Status + aiTask = append(aiTask[:i], aiTask[i+1:]...) + } + } + + if len(aiTask) == 0 { + task.UpdatedTime = time.Now().Format(constants.Layout) + tx = svc.DbEngin.Table("task").Model(task).Updates(task) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return + } + return + } + + start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local) + end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local) + + var status string + var count int + for _, a := range aiTask { + s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local) + e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local) + + if s.Before(start) { + start = s + } + + if e.After(end) { + end = e + } + + if a.Status == constants.Failed { + status = a.Status + break + } + + if a.Status == constants.Pending { + status = a.Status + continue + } + + if a.Status == constants.Running { + status = a.Status + continue + } + + if a.Status == constants.Completed { + count++ + continue + } + } + + if count == len(aiTask) { + status = constants.Succeeded + } + + if status != "" { + task.Status = status + 
task.StartTime = start.Format(constants.Layout) + task.EndTime = end.Format(constants.Layout) + } + + task.UpdatedTime = time.Now().Format(constants.Layout) + tx = svc.DbEngin.Table("task").Model(task).Updates(task) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return + } +} + +func UpdateAiAdapterMaps(svc *svc.ServiceContext) { + var aiType = "1" + adapterIds, err := svc.Scheduler.AiStorages.GetAdapterIdsByType(aiType) + if err != nil { + msg := fmt.Sprintf("###UpdateAiAdapterMaps###, error: %v \n", err.Error()) + logx.Errorf(errors.New(msg).Error()) + return + } + if len(adapterIds) == 0 { + return + } + + for _, id := range adapterIds { + clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(id) + if err != nil { + msg := fmt.Sprintf("###UpdateAiAdapterMaps###, error: %v \n", err.Error()) + logx.Errorf(errors.New(msg).Error()) + return + } + if len(clusters.List) == 0 { + continue + } + if isAdapterExist(svc, id, len(clusters.List)) { + continue + } else { + if isAdapterEmpty(svc, id) { + exeClusterMap, colClusterMap := InitAiClusterMap(&svc.Config, clusters.List) + svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap + svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap + } else { + UpdateClusterMaps(svc, id, clusters.List) + } + } + } +} + +func UpdateClusterMaps(svc *svc.ServiceContext, adapterId string, clusters []types.ClusterInfo) { + for _, c := range clusters { + _, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] + _, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[adapterId][c.Id] + if !ok && !ok2 { + switch c.Name { + case OCTOPUS: + id, _ := strconv.ParseInt(c.Id, 10, 64) + octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(svc.Config.OctopusRpcConf)) + octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id) + svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = octopus + svc.Scheduler.AiService.AiCollectorAdapterMap[adapterId][c.Id] = octopus + case 
MODELARTS: + id, _ := strconv.ParseInt(c.Id, 10, 64) + modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(svc.Config.ModelArtsRpcConf)) + modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(svc.Config.ModelArtsImgRpcConf)) + modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname) + svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = modelarts + svc.Scheduler.AiService.AiCollectorAdapterMap[adapterId][c.Id] = modelarts + case SHUGUANGAI: + id, _ := strconv.ParseInt(c.Id, 10, 64) + aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(svc.Config.ACRpcConf)) + sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id) + svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = sgai + svc.Scheduler.AiService.AiCollectorAdapterMap[adapterId][c.Id] = sgai + } + } else { + continue + } + } + +} + +func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool { + emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id] + cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id] + if ok && ok2 { + if len(emap) == clusterNum && len(cmap) == clusterNum { + return true + } + } + return false +} + +func isAdapterEmpty(svc *svc.ServiceContext, id string) bool { + _, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id] + _, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id] + if !ok && !ok2 { + return true + } + return false +} + +func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) { + executorMap := make(map[string]executor.AiExecutor) + collectorMap := make(map[string]collector.AiCollector) + for _, c := range clusters { + switch c.Name { + case OCTOPUS: + id, _ := strconv.ParseInt(c.Id, 10, 64) + octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf)) + octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id) + collectorMap[c.Id] = octopus + 
executorMap[c.Id] = octopus + case MODELARTS: + id, _ := strconv.ParseInt(c.Id, 10, 64) + modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf)) + modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf)) + modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname) + collectorMap[c.Id] = modelarts + executorMap[c.Id] = modelarts + case SHUGUANGAI: + id, _ := strconv.ParseInt(c.Id, 10, 64) + aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf)) + sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id) + collectorMap[c.Id] = sgai + executorMap[c.Id] = sgai + } + } + + return executorMap, collectorMap +} + +func UpdateClusterResource(svc *svc.ServiceContext) { + list, err := svc.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + return + } + var wg sync.WaitGroup + for _, adapter := range list { + clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id) + if err != nil { + continue + } + for _, cluster := range clusters.List { + c := cluster + clusterResource, err := svc.Scheduler.AiStorages.GetClusterResourcesById(c.Id) + if err != nil { + continue + } + wg.Add(1) + go func() { + _, ok := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id] + if !ok { + wg.Done() + return + } + h := http.Request{} + stat, err := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(h.Context()) + if err != nil { + wg.Done() + return + } + if stat == nil { + wg.Done() + return + } + clusterType, err := strconv.ParseInt(adapter.Type, 10, 64) + if err != nil { + wg.Done() + return + } + var cardTotal int64 + var topsTotal float64 + for _, card := range stat.CardsAvail { + cardTotal += int64(card.CardNum) + topsTotal += card.TOpsAtFp16 * float64(card.CardNum) + } + + if (models.TClusterResource{} == *clusterResource) { + err = svc.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, 
c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal), + stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal) + if err != nil { + wg.Done() + return + } + } else { + if stat.CpuCoreTotal == 0 || stat.MemTotal == 0 || stat.DiskTotal == 0 { + wg.Done() + return + } + clusterResource.CardTotal = cardTotal + clusterResource.CardTopsTotal = topsTotal + clusterResource.CpuAvail = float64(stat.CpuCoreAvail) + clusterResource.CpuTotal = float64(stat.CpuCoreTotal) + clusterResource.MemAvail = stat.MemAvail + clusterResource.MemTotal = stat.MemTotal + clusterResource.DiskAvail = stat.DiskAvail + clusterResource.DiskTotal = stat.DiskTotal + + err := svc.Scheduler.AiStorages.UpdateClusterResources(clusterResource) + if err != nil { + wg.Done() + return + } + } + wg.Done() + }() + } + } + wg.Wait() +} diff --git a/api/internal/cron/cron.go b/api/internal/cron/cron.go new file mode 100644 index 00000000..00480db9 --- /dev/null +++ b/api/internal/cron/cron.go @@ -0,0 +1,50 @@ +/* + + Copyright (c) [2023] [pcm] + [pcm-coordinator] is licensed under Mulan PSL v2. + You can use this software according to the terms and conditions of the Mulan PSL v2. + You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 + THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + See the Mulan PSL v2 for more details. + +*/ + +package cron + +import ( + "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +func AddCronGroup(svc *svc.ServiceContext) { + // 删除三天前的监控信息 + svc.Cron.AddFunc("0 0 0 ? * ? 
", func() { + ClearMetricsData(svc) + }) + + // 同步任务信息到core端 + svc.Cron.AddFunc("*/5 * * * * ?", func() { + SyncParticipantRpc(svc) + }) + + svc.Cron.AddFunc("*/5 * * * * ?", func() { + list, err := GetTaskList(svc) + if err != nil { + logx.Errorf(err.Error()) + return + } + UpdateTaskStatus(svc, list) + UpdateAiTaskStatus(svc, list) + }) + + svc.Cron.AddFunc("*/5 * * * * ?", func() { + UpdateAiAdapterMaps(svc) + }) + + svc.Cron.AddFunc("*/59 * * * * ?", func() { + UpdateClusterResource(svc) + }) +} diff --git a/api/internal/logic/ai/getcenteroverviewlogic.go b/api/internal/logic/ai/getcenteroverviewlogic.go index e6a45106..0363edb7 100644 --- a/api/internal/logic/ai/getcenteroverviewlogic.go +++ b/api/internal/logic/ai/getcenteroverviewlogic.go @@ -79,7 +79,6 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview case <-time.After(1 * time.Second): return resp, nil } - } func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) { @@ -127,7 +126,7 @@ func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan mu.Lock() if (models.TClusterResource{} == *clusterResource) { - err = l.svcCtx.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal), + err = l.svcCtx.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal), stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal) if err != nil { mu.Unlock() @@ -135,8 +134,19 @@ func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan return } } else { + if stat.CpuCoreTotal == 0 || stat.MemTotal == 0 || stat.DiskTotal == 0 { + wg.Done() + return + } clusterResource.CardTotal = cardTotal clusterResource.CardTopsTotal = topsTotal + clusterResource.CpuAvail = 
float64(stat.CpuCoreAvail) + clusterResource.CpuTotal = float64(stat.CpuCoreTotal) + clusterResource.MemAvail = stat.MemAvail + clusterResource.MemTotal = stat.MemTotal + clusterResource.DiskAvail = stat.DiskAvail + clusterResource.DiskTotal = stat.DiskTotal + err := l.svcCtx.Scheduler.AiStorages.UpdateClusterResources(clusterResource) if err != nil { mu.Unlock() diff --git a/api/internal/logic/cloud/commitgeneraltasklogic.go b/api/internal/logic/cloud/commitgeneraltasklogic.go index 14240bc0..ac35e1b8 100644 --- a/api/internal/logic/cloud/commitgeneraltasklogic.go +++ b/api/internal/logic/cloud/commitgeneraltasklogic.go @@ -106,6 +106,7 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er sStruct := UnMarshalK8sStruct(s, int64(r.Replica)) unString, _ := sStruct.MarshalJSON() taskCloud.Id = utils.GenSnowflakeIDUint() + taskCloud.Name = sStruct.GetName() + "-" + sStruct.GetKind() taskCloud.TaskId = uint(taskModel.Id) clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64) taskCloud.AdapterId = uint(adapterId) diff --git a/api/internal/logic/core/commitvmtasklogic.go b/api/internal/logic/core/commitvmtasklogic.go index 84375e9a..dd0908ca 100644 --- a/api/internal/logic/core/commitvmtasklogic.go +++ b/api/internal/logic/core/commitvmtasklogic.go @@ -119,7 +119,8 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type logx.Errorf("CommitGeneralTask() => sql execution error: %v", err) //return errors.Errorf("the cluster does not match the drive resources. 
Check the data"), nil }*/ - taskVm.Name = req.VmName + taskVm.Name = req.Name + taskVm.TaskId = taskModel.Id taskVm.Status = "Saved" taskVm.StartTime = time.Now().String() taskVm.MinCount = req.MinCount @@ -133,6 +134,8 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type l.svcCtx.DbEngin.Raw("select name from t_cluster where id= ?", r.ClusterId).Scan(&clusterName) taskVm.ClusterName = clusterName taskVm.ClusterId, err = strconv.ParseInt(clusterId, 10, 64) + taskVm.VmName = req.VmName + taskVm.Replicas = req.Replicas if err != nil { fmt.Println("Error converting string to int64:", err) return diff --git a/api/internal/logic/core/pagelisttasklogic.go b/api/internal/logic/core/pagelisttasklogic.go index 108b3f78..96ccf437 100644 --- a/api/internal/logic/core/pagelisttasklogic.go +++ b/api/internal/logic/core/pagelisttasklogic.go @@ -80,6 +80,7 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa for _, ch := range chs { select { case <-ch: + case <-time.After(1 * time.Second): } } return diff --git a/api/internal/logic/core/pulltaskinfologic.go b/api/internal/logic/core/pulltaskinfologic.go index 9581659e..79fff62a 100644 --- a/api/internal/logic/core/pulltaskinfologic.go +++ b/api/internal/logic/core/pulltaskinfologic.go @@ -55,12 +55,25 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie } } case 0: - var cloudModelList []cloud.TaskCloudModel - err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &cloudModelList) - if err != nil { - return nil, err + var resourceType int32 + l.svcCtx.DbEngin.Raw("select resource_type as resourceType from `t_adapter` where id = ?", req.AdapterId).Scan(&resourceType) + switch resourceType { + case 01: + var cloudModelList []cloud.TaskCloudModel + err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &cloudModelList) + if err != nil { + return nil, err + } + utils.Convert(cloudModelList, &resp.CloudInfoList) + case 02: + var vmModelList 
[]models.TaskVm + err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &vmModelList) + if err != nil { + return nil, err + } + utils.Convert(vmModelList, &resp.VmInfoList) } - utils.Convert(cloudModelList, &resp.CloudInfoList) + case 1: var aiModelList []models.Ai err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &aiModelList) @@ -68,13 +81,6 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie return nil, err } utils.Convert(aiModelList, &resp.AiInfoList) - case 3: - var vmModelList []models.TaskVm - err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &vmModelList) - if err != nil { - return nil, err - } - utils.Convert(vmModelList, &resp.VmInfoList) } return &resp, nil } diff --git a/api/internal/logic/core/pushtaskinfologic.go b/api/internal/logic/core/pushtaskinfologic.go index 63169a86..0d5e8bd4 100644 --- a/api/internal/logic/core/pushtaskinfologic.go +++ b/api/internal/logic/core/pushtaskinfologic.go @@ -33,26 +33,46 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie l.svcCtx.DbEngin.Raw("select type as kind from t_adapter where id = ?", req.AdapterId).Scan(&kind) switch kind { case 0: - for _, cloudInfo := range req.CloudInfoList { - var taskId uint - result := l.svcCtx.DbEngin.Table("task_cloud").Select("task_id").Where("task_id = ?", cloudInfo.TaskId).Find(&taskId) - if errors.Is(result.Error, gorm.ErrRecordNotFound) { - return nil, errors.New("Record does not exist") + var resourceType int32 + l.svcCtx.DbEngin.Raw("select resource_type as resourceType from `t_adapter` where id = ?", req.AdapterId).Scan(&resourceType) + switch resourceType { + case 01: + for _, cloudInfo := range req.CloudInfoList { + var taskId uint + result := l.svcCtx.DbEngin.Table("task_cloud").Select("task_id").Where("task_id = ?", cloudInfo.TaskId).Find(&taskId) + if errors.Is(result.Error, gorm.ErrRecordNotFound) { + return nil, errors.New("Record does not exist") + } + l.svcCtx.DbEngin.Exec("update task_cloud 
set status = ?,start_time = ?,result = ? where task_id = ?", + cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, cloudInfo.TaskId) + var taskName string + l.svcCtx.DbEngin.Raw("select name as kind from task where id = ?", taskId).Scan(&taskName) + noticeInfo := clientCore.NoticeInfo{ + TaskId: cloudInfo.TaskId, + AdapterId: cloudInfo.AdapterId, + AdapterName: cloudInfo.AdapterName, + ClusterId: cloudInfo.ClusterId, + ClusterName: cloudInfo.ClusterName, + TaskName: taskName, + } + syncTask(l.svcCtx.DbEngin, noticeInfo) } - l.svcCtx.DbEngin.Exec("update task_cloud set status = ?,start_time = ?,result = ? where task_id = ?", - cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, cloudInfo.TaskId) - var taskName string - l.svcCtx.DbEngin.Raw("select name as kind from task where id = ?", taskId).Scan(&taskName) - noticeInfo := clientCore.NoticeInfo{ - TaskId: cloudInfo.TaskId, - AdapterId: cloudInfo.AdapterId, - AdapterName: cloudInfo.AdapterName, - ClusterId: cloudInfo.ClusterId, - ClusterName: cloudInfo.ClusterName, - TaskName: taskName, + case 02: + for _, vmInfo := range req.VmInfoList { + l.svcCtx.DbEngin.Exec("update task_vm set status = ?,start_time = ? where participant_id = ? and task_id = ? and name = ?", + vmInfo.Status, vmInfo.StartTime, req.AdapterId, vmInfo.TaskId, vmInfo.Name) + noticeInfo := clientCore.NoticeInfo{ + TaskId: vmInfo.TaskId, + AdapterId: vmInfo.AdapterId, + AdapterName: vmInfo.AdapterName, + ClusterId: vmInfo.ClusterId, + ClusterName: vmInfo.ClusterName, + TaskName: vmInfo.Name, + } + syncTask(l.svcCtx.DbEngin, noticeInfo) } - syncTask(l.svcCtx.DbEngin, noticeInfo) } + case 2: for _, hpcInfo := range req.HpcInfoList { l.svcCtx.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? 
and name = ?", @@ -81,20 +101,6 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie } syncTask(l.svcCtx.DbEngin, noticeInfo) } - case 3: - for _, vmInfo := range req.VmInfoList { - l.svcCtx.DbEngin.Exec("update task_vm set status = ?,start_time = ? where participant_id = ? and task_id = ? and name = ?", - vmInfo.Status, vmInfo.StartTime, req.AdapterId, vmInfo.TaskId, vmInfo.Name) - noticeInfo := clientCore.NoticeInfo{ - TaskId: vmInfo.TaskId, - AdapterId: vmInfo.AdapterId, - AdapterName: vmInfo.AdapterName, - ClusterId: vmInfo.ClusterId, - ClusterName: vmInfo.ClusterName, - TaskName: vmInfo.Name, - } - syncTask(l.svcCtx.DbEngin, noticeInfo) - } } return &resp, nil } diff --git a/api/internal/logic/core/taskdetailslogic.go b/api/internal/logic/core/taskdetailslogic.go index c08edaaa..0a8c7dbd 100644 --- a/api/internal/logic/core/taskdetailslogic.go +++ b/api/internal/logic/core/taskdetailslogic.go @@ -32,18 +32,22 @@ func (l *TaskDetailsLogic) TaskDetails(req *types.FId) (resp *types.TaskDetailsR if errors.Is(l.svcCtx.DbEngin.Where("id", req.Id).First(&task).Error, gorm.ErrRecordNotFound) { return nil, errors.New("记录不存在") } - clusterIds := make([]int64, 0) + clusterIds := make([]string, 0) var cList []*types.ClusterInfo + var subList []*types.SubTaskInfo switch task.AdapterTypeDict { case 0: - l.svcCtx.DbEngin.Table("task_cloud").Select("cluster_id").Where("task_id", task.Id).Scan(&clusterIds) - if len(clusterIds) <= 0 { - l.svcCtx.DbEngin.Table("task_vm").Select("cluster_id").Where("task_id", task.Id).Find(&clusterIds) + l.svcCtx.DbEngin.Table("task_cloud").Where("task_id", task.Id).Scan(&subList) + if len(subList) <= 0 { + l.svcCtx.DbEngin.Table("task_vm").Where("task_id", task.Id).Find(&subList) } case 1: - l.svcCtx.DbEngin.Table("task_ai").Select("cluster_id").Where("task_id", task.Id).Scan(&clusterIds) + l.svcCtx.DbEngin.Table("task_ai").Where("task_id", task.Id).Scan(&subList) case 2: - 
l.svcCtx.DbEngin.Table("task_hpc").Select("cluster_id").Where("task_id", task.Id).Scan(&clusterIds) + l.svcCtx.DbEngin.Table("task_hpc").Where("task_id", task.Id).Scan(&subList) + } + for _, sub := range subList { + clusterIds = append(clusterIds, sub.ClusterId) } err = l.svcCtx.DbEngin.Table("t_cluster").Where("id in ?", clusterIds).Scan(&cList).Error if err != nil { @@ -51,5 +55,6 @@ func (l *TaskDetailsLogic) TaskDetails(req *types.FId) (resp *types.TaskDetailsR } utils.Convert(&task, &resp) resp.ClusterInfos = cList + resp.SubTaskInfos = subList return } diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index 5ce7a6ee..64a1fbc4 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -64,7 +64,13 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type synergystatus = 1 } strategyCode, err := l.svcCtx.Scheduler.AiStorages.GetStrategyCode(req.AiOption.Strategy) - + if err != nil { + return nil, err + } + adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(rs[0].AdapterId) + if err != nil { + return nil, err + } id, err := l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName, strategyCode, synergystatus) if err != nil { return nil, err @@ -84,11 +90,13 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(r.ClusterId) - err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, clusterName, r.JobId, constants.Saved, r.Msg) + err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, adapterName, r.ClusterId, clusterName, r.JobId, constants.Saved, r.Msg) if err != nil { return nil, err } + l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(r.AdapterId, adapterName, r.ClusterId, clusterName, r.TaskName, "create", "任务创建中") + resp.Results = append(resp.Results, scheResult) } diff --git 
a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index d99b8690..68e11316 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -2,6 +2,7 @@ package database import ( "github.com/zeromicro/go-zero/core/logx" + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -46,6 +47,16 @@ func (s *AiStorage) GetClusterNameById(id string) (string, error) { return name, nil } +func (s *AiStorage) GetAdapterNameById(id string) (string, error) { + var name string + tx := s.DbEngin.Raw("select `name` from t_adapter where `id` = ?", id).Scan(&name) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return "", tx.Error + } + return name, nil +} + func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) { var list []types.AdapterInfo var ids []string @@ -102,7 +113,7 @@ func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int6 return taskModel.Id, nil } -func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId string, clusterName string, jobId string, status string, msg string) error { +func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, adapterName string, clusterId string, clusterName string, jobId string, status string, msg string) error { // 构建主任务结构体 aId, err := strconv.ParseInt(option.AdapterId, 10, 64) if err != nil { @@ -116,6 +127,7 @@ func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId aiTaskModel := models.TaskAi{ TaskId: taskId, AdapterId: aId, + AdapterName: adapterName, ClusterId: cId, ClusterName: clusterName, Name: option.TaskName, @@ -187,13 +199,18 @@ func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterR 
return &clusterResource, nil } -func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64, +func (s *AiStorage) SaveClusterResources(adapterId string, clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64, memAvail float64, memTotal float64, diskAvail float64, diskTotal float64, gpuAvail float64, gpuTotal float64, cardTotal int64, topsTotal float64) error { cId, err := strconv.ParseInt(clusterId, 10, 64) if err != nil { return err } + aId, err := strconv.ParseInt(adapterId, 10, 64) + if err != nil { + return err + } clusterResource := models.TClusterResource{ + AdapterId: aId, ClusterId: cId, ClusterName: clusterName, ClusterType: clusterType, @@ -212,26 +229,45 @@ func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, c if tx.Error != nil { return tx.Error } - // prometheus param := tracker.ClusterLoadRecord{ - ClusterName: clusterName, - CpuAvail: cpuAvail, - CpuTotal: cpuTotal, - MemoryAvail: memAvail, - MemoryTotal: memTotal, - DiskAvail: diskAvail, - DiskTotal: diskTotal, + AdapterId: aId, + ClusterName: clusterName, + CpuAvail: cpuAvail, + CpuTotal: cpuTotal, + CpuUtilisation: clusterResource.CpuAvail / clusterResource.CpuTotal, + MemoryAvail: memAvail, + MemoryTotal: memTotal, + MemoryUtilisation: clusterResource.MemAvail / clusterResource.MemTotal, + DiskAvail: diskAvail, + DiskTotal: diskTotal, + DiskUtilisation: clusterResource.DiskAvail / clusterResource.DiskTotal, } tracker.SyncClusterLoad(param) return nil } func (s *AiStorage) UpdateClusterResources(clusterResource *models.TClusterResource) error { - tx := s.DbEngin.Model(clusterResource).Updates(clusterResource) + tx := s.DbEngin.Where("cluster_id = ?", clusterResource.ClusterId).Updates(clusterResource) + if tx.Error != nil { return tx.Error } + // prometheus + param := tracker.ClusterLoadRecord{ + AdapterId: clusterResource.AdapterId, + ClusterName: 
clusterResource.ClusterName, + CpuAvail: clusterResource.CpuAvail, + CpuTotal: clusterResource.CpuTotal, + CpuUtilisation: clusterResource.CpuAvail / clusterResource.CpuTotal, + MemoryAvail: clusterResource.MemAvail, + MemoryTotal: clusterResource.MemTotal, + MemoryUtilisation: clusterResource.MemAvail / clusterResource.MemTotal, + DiskAvail: clusterResource.DiskAvail, + DiskTotal: clusterResource.DiskTotal, + DiskUtilisation: clusterResource.DiskAvail / clusterResource.DiskTotal, + } + tracker.SyncClusterLoad(param) return nil } @@ -257,3 +293,28 @@ func (s *AiStorage) GetStrategyCode(name string) (int64, error) { } return strategy, nil } + +func (s *AiStorage) AddNoticeInfo(adapterId string, adapterName string, clusterId string, clusterName string, taskName string, noticeType string, incident string) { + aId, err := strconv.ParseInt(adapterId, 10, 64) + if err != nil { + return + } + cId, err := strconv.ParseInt(clusterId, 10, 64) + if err != nil { + return + } + noticeInfo := clientCore.NoticeInfo{ + AdapterId: aId, + AdapterName: adapterName, + ClusterId: cId, + ClusterName: clusterName, + NoticeType: noticeType, + TaskName: taskName, + Incident: incident, + CreatedTime: time.Now(), + } + result := s.DbEngin.Table("t_notice").Create(&noticeInfo) + if result.Error != nil { + logx.Errorf("Task creation failure, err: %v", result.Error) + } +} diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 7979f966..201f565d 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -45,6 +45,8 @@ type AiScheduler struct { } type AiResult struct { + AdapterId string + TaskName string JobId string ClusterId string Strategy string @@ -190,6 +192,8 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa result, _ = convertType(resp) mu.Unlock() + result.AdapterId = opt.AdapterId + result.TaskName = opt.TaskName result.Replica =
c.Replicas result.ClusterId = c.ClusterId result.Strategy = as.option.StrategyName @@ -222,6 +226,10 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa if err != nil { return nil, errors.New("database add failed: " + err.Error()) } + adapterName, err := as.AiStorages.GetAdapterNameById(as.option.AdapterId) + if err != nil { + return nil, err + } var errmsg string for _, err := range errs { @@ -234,7 +242,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId) - err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, clusterName, "", constants.Failed, msg) + err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, e.clusterId, clusterName, "", constants.Failed, msg) if err != nil { return nil, errors.New("database add failed: " + err.Error()) } @@ -246,14 +254,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa if s.Msg != "" { msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg) errmsg += msg - err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, "", constants.Failed, msg) + err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, "", constants.Failed, msg) if err != nil { return nil, errors.New("database add failed: " + err.Error()) } } else { msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId) errmsg += msg - err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, s.JobId, constants.Saved, msg) + err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, s.JobId, constants.Saved, msg) if err != nil { return nil, errors.New("database add failed: " + err.Error()) } diff --git a/api/internal/scheduler/service/aiService.go b/api/internal/scheduler/service/aiService.go index 0567e4a3..3b34bd5a 100644 --- 
a/api/internal/scheduler/service/aiService.go +++ b/api/internal/scheduler/service/aiService.go @@ -13,6 +13,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" "strconv" + "sync" ) const ( @@ -24,6 +25,8 @@ const ( type AiService struct { AiExecutorAdapterMap map[string]map[string]executor.AiExecutor AiCollectorAdapterMap map[string]map[string]collector.AiCollector + Storage *database.AiStorage + mu sync.Mutex } func NewAiService(conf *config.Config, storages *database.AiStorage) (*AiService, error) { @@ -35,12 +38,16 @@ func NewAiService(conf *config.Config, storages *database.AiStorage) (*AiService aiService := &AiService{ AiExecutorAdapterMap: make(map[string]map[string]executor.AiExecutor), AiCollectorAdapterMap: make(map[string]map[string]collector.AiCollector), + Storage: storages, } for _, id := range adapterIds { clusters, err := storages.GetClustersByAdapterId(id) if err != nil { return nil, err } + if len(clusters.List) == 0 { + continue + } exeClusterMap, colClusterMap := InitAiClusterMap(conf, clusters.List) aiService.AiExecutorAdapterMap[id] = exeClusterMap aiService.AiCollectorAdapterMap[id] = colClusterMap @@ -78,3 +85,11 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st return executorMap, collectorMap } + +//func (a *AiService) AddCluster() error { +// +//} +// +//func (a *AiService) AddAdapter() error { +// +//} diff --git a/api/internal/svc/servicecontext.go b/api/internal/svc/servicecontext.go index ce687280..21334e33 100644 --- a/api/internal/svc/servicecontext.go +++ b/api/internal/svc/servicecontext.go @@ -87,7 +87,7 @@ func NewServiceContext(c config.Config) *ServiceContext { NamingStrategy: schema.NamingStrategy{ SingularTable: true, // 使用单数表名,启用该选项,此时,`User` 的表名应该是 `t_user` }, - Logger: logger.Default.LogMode(logger.Error), + Logger: logger.Default.LogMode(logger.Info), }) if err != nil { logx.Errorf("数据库连接失败, err%v", err) 
diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 1539f520..e8f7fb77 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -202,7 +202,7 @@ type CommitVmTaskReq struct { ImageRef int64 `json:"imageRef,optional"` FlavorRef int64 `json:"flavorRef,optional"` Uuid int64 `json:"uuid,optional"` - Replicas int32 `json:"replicas,string"` + Replicas int64 `json:"replicas,string"` VmName string `json:"vm_name,optional"` } @@ -1166,6 +1166,16 @@ type TaskDetailsResp struct { Strategy int64 `json:"strategy"` SynergyStatus int64 `json:"synergyStatus"` ClusterInfos []*ClusterInfo `json:"clusterInfos"` + SubTaskInfos []*SubTaskInfo `json:"subTaskInfos"` +} + +type SubTaskInfo struct { + Id string `json:"id" db:"id"` + Name string `json:"name" db:"name"` + ClusterId string `json:"clusterId" db:"cluster_id"` + ClusterName string `json:"clusterName" db:"cluster_name"` + Status string `json:"status" db:"status"` + Remark string `json:"remark" db:"remark"` } type CommitHpcTaskReq struct { diff --git a/pkg/models/cloud/task_cloud.go b/pkg/models/cloud/task_cloud.go index 5cb1e1c9..39a51afe 100644 --- a/pkg/models/cloud/task_cloud.go +++ b/pkg/models/cloud/task_cloud.go @@ -7,6 +7,7 @@ import ( type TaskCloudModel struct { Id uint `json:"id" gorm:"primarykey;not null;comment:id"` + Name string `json:"name" gorm:"null;comment:名称"` TaskId uint `json:"taskId" gorm:"not null;comment:task表id"` AdapterId uint `json:"adapterId" gorm:"not null;comment:适配器id"` AdapterName string `json:"adapterName" gorm:"not null;comment:适配器名称"` diff --git a/pkg/models/taskvmmodel_gen.go b/pkg/models/taskvmmodel_gen.go index fa749168..9389a2c8 100644 --- a/pkg/models/taskvmmodel_gen.go +++ b/pkg/models/taskvmmodel_gen.go @@ -37,7 +37,7 @@ type ( TaskVm struct { Id int64 `db:"id"` // id TaskId int64 `db:"task_id"` // 任务id - Name string `db:"name"` // 虚拟机名称 + Name string `db:"name"` // 任务名称 AdapterId int64 `db:"adapter_id"` // 执行任务的适配器id AdapterName string 
`db:"adapter_name"` // 适配器名称 ClusterId int64 `db:"cluster_id"` // 执行任务的集群id @@ -53,7 +53,10 @@ type ( StartTime string `db:"start_time"` // 开始时间 RunningTime string `db:"running_time"` // 运行时间 Result string `db:"result"` // 运行结果 + Remark string `db:"remark"` // 备注 DeletedAt string `db:"deleted_at"` // 删除时间 + VmName string `db:"vm_name"` // 虚拟机名称 + Replicas int64 `db:"replicas"` // 副本数 } ) @@ -92,14 +95,14 @@ func (m *defaultTaskVmModel) FindOne(ctx context.Context, id int64) (*TaskVm, er } func (m *defaultTaskVmModel) Insert(ctx context.Context, data *TaskVm) (sql.Result, error) { - query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskVmRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.DeletedAt) + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskVmRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.Remark, data.DeletedAt, data.VmName, data.Replicas) return ret, err } func (m *defaultTaskVmModel) Update(ctx context.Context, data *TaskVm) error { query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskVmRowsWithPlaceHolder) - _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, 
data.Uuid, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.Id) + _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.Remark, data.DeletedAt, data.VmName, data.Replicas, data.Id) return err } diff --git a/pkg/models/tclusterresourcemodel_gen.go b/pkg/models/tclusterresourcemodel_gen.go index 1bb40107..f02722ff 100644 --- a/pkg/models/tclusterresourcemodel_gen.go +++ b/pkg/models/tclusterresourcemodel_gen.go @@ -48,6 +48,7 @@ type ( GpuTotal float64 `db:"gpu_total"` CardTotal int64 `db:"card_total"` // 算力卡数量 CardTopsTotal float64 `db:"card_tops_total"` // 算力总量tops + AdapterId int64 `db:"adapter_id"` } ) @@ -86,14 +87,14 @@ func (m *defaultTClusterResourceModel) FindOne(ctx context.Context, clusterId in } func (m *defaultTClusterResourceModel) Insert(ctx context.Context, data *TClusterResource) (sql.Result, error) { - query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tClusterResourceRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.ClusterId, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal) + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tClusterResourceRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.ClusterId, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal, data.AdapterId) return ret, err } func (m *defaultTClusterResourceModel) Update(ctx context.Context, data 
*TClusterResource) error { query := fmt.Sprintf("update %s set %s where `cluster_id` = ?", m.table, tClusterResourceRowsWithPlaceHolder) - _, err := m.conn.ExecCtx(ctx, query, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal, data.ClusterId) + _, err := m.conn.ExecCtx(ctx, query, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal, data.AdapterId, data.ClusterId) return err }