diff --git a/api/internal/cron/aiCronTask.go b/api/internal/cron/aiCronTask.go index 0330e01f..80a9d877 100644 --- a/api/internal/cron/aiCronTask.go +++ b/api/internal/cron/aiCronTask.go @@ -271,9 +271,6 @@ func UpdateAiAdapterMaps(svc *svc.ServiceContext) { } for _, id := range adapterIds { - if isAdapterExist(svc, id) { - continue - } clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(id) if err != nil { msg := fmt.Sprintf("###UpdateAiAdapterMaps###, error: %v \n", err.Error()) @@ -283,16 +280,68 @@ func UpdateAiAdapterMaps(svc *svc.ServiceContext) { if len(clusters.List) == 0 { continue } - exeClusterMap, colClusterMap := InitAiClusterMap(&svc.Config, clusters.List) - svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap - svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap + if isAdapterExist(svc, id, len(clusters.List)) { + continue + } else { + if isAdapterEmpty(svc, id) { + exeClusterMap, colClusterMap := InitAiClusterMap(&svc.Config, clusters.List) + svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap + svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap + } else { + UpdateClusterMaps(svc, id, clusters.List) + } + } } } -func isAdapterExist(svc *svc.ServiceContext, id string) bool { +func UpdateClusterMaps(svc *svc.ServiceContext, adapterId string, clusters []types.ClusterInfo) { + for _, c := range clusters { + _, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] + _, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[adapterId][c.Id] + if !ok && !ok2 { + switch c.Name { + case OCTOPUS: + id, _ := strconv.ParseInt(c.Id, 10, 64) + octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(svc.Config.OctopusRpcConf)) + octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id) + svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = octopus + svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = octopus + case MODELARTS: + id, _ := strconv.ParseInt(c.Id, 10, 64) + modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(svc.Config.ModelArtsRpcConf)) + modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(svc.Config.ModelArtsImgRpcConf)) + modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname) + svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = modelarts + svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = modelarts + case SHUGUANGAI: + id, _ := strconv.ParseInt(c.Id, 10, 64) + aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(svc.Config.ACRpcConf)) + sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id) + svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = sgai + svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = sgai + } + } else { + continue + } + } + +} + +func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool { + emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id] + cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id] + if ok && ok2 { + if len(emap) == clusterNum && len(cmap) == clusterNum { + return true + } + } + return false +} + +func isAdapterEmpty(svc *svc.ServiceContext, id string) bool { _, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id] _, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id] - if ok && ok2 { + if !ok && !ok2 { return true } return false @@ -375,7 +424,7 @@ func UpdateClusterResource(svc *svc.ServiceContext) { } if (models.TClusterResource{} == *clusterResource) { - err = svc.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal), + err = svc.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal), stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal) if err != nil { wg.Done() diff --git a/api/internal/cron/cron.go b/api/internal/cron/cron.go index a7e682ac..00480db9 100644 --- a/api/internal/cron/cron.go +++ b/api/internal/cron/cron.go @@ -44,7 +44,7 @@ func AddCronGroup(svc *svc.ServiceContext) { UpdateAiAdapterMaps(svc) }) - svc.Cron.AddFunc("@every 7h30m", func() { + svc.Cron.AddFunc("*/59 * * * * ?", func() { UpdateClusterResource(svc) }) } diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index d88edca2..ee3d8680 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -187,13 +187,18 @@ func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterR return &clusterResource, nil } -func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64, +func (s *AiStorage) SaveClusterResources(adapterId string, clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64, memAvail float64, memTotal float64, diskAvail float64, diskTotal float64, gpuAvail float64, gpuTotal float64, cardTotal int64, topsTotal float64) error { cId, err := strconv.ParseInt(clusterId, 10, 64) if err != nil { return err } + aId, err := strconv.ParseInt(adapterId, 10, 64) + if err != nil { + return err + } clusterResource := models.TClusterResource{ + AdapterId: aId, ClusterId: cId, ClusterName: clusterName, ClusterType: clusterType,