fix clusterResouces bugs
Former-commit-id: da9b51cd929ee055b9623b9a94be3dd545c67a19
This commit is contained in:
parent
f4d1587527
commit
e53e9d5800
|
@ -271,9 +271,6 @@ func UpdateAiAdapterMaps(svc *svc.ServiceContext) {
|
|||
}
|
||||
|
||||
for _, id := range adapterIds {
|
||||
if isAdapterExist(svc, id) {
|
||||
continue
|
||||
}
|
||||
clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(id)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("###UpdateAiAdapterMaps###, error: %v \n", err.Error())
|
||||
|
@ -283,16 +280,68 @@ func UpdateAiAdapterMaps(svc *svc.ServiceContext) {
|
|||
if len(clusters.List) == 0 {
|
||||
continue
|
||||
}
|
||||
if isAdapterExist(svc, id, len(clusters.List)) {
|
||||
continue
|
||||
} else {
|
||||
if isAdapterEmpty(svc, id) {
|
||||
exeClusterMap, colClusterMap := InitAiClusterMap(&svc.Config, clusters.List)
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap
|
||||
svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap
|
||||
} else {
|
||||
UpdateClusterMaps(svc, id, clusters.List)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isAdapterExist(svc *svc.ServiceContext, id string) bool {
|
||||
func UpdateClusterMaps(svc *svc.ServiceContext, adapterId string, clusters []types.ClusterInfo) {
|
||||
for _, c := range clusters {
|
||||
_, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id]
|
||||
_, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[adapterId][c.Id]
|
||||
if !ok && !ok2 {
|
||||
switch c.Name {
|
||||
case OCTOPUS:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(svc.Config.OctopusRpcConf))
|
||||
octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = octopus
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = octopus
|
||||
case MODELARTS:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(svc.Config.ModelArtsRpcConf))
|
||||
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(svc.Config.ModelArtsImgRpcConf))
|
||||
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = modelarts
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = modelarts
|
||||
case SHUGUANGAI:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(svc.Config.ACRpcConf))
|
||||
sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = sgai
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = sgai
|
||||
}
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool {
|
||||
emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
|
||||
cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
|
||||
if ok && ok2 {
|
||||
if len(emap) == clusterNum && len(cmap) == clusterNum {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func isAdapterEmpty(svc *svc.ServiceContext, id string) bool {
|
||||
_, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
|
||||
_, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
|
||||
if ok && ok2 {
|
||||
if !ok && !ok2 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
@ -375,7 +424,7 @@ func UpdateClusterResource(svc *svc.ServiceContext) {
|
|||
}
|
||||
|
||||
if (models.TClusterResource{} == *clusterResource) {
|
||||
err = svc.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
|
||||
err = svc.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
|
||||
stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
|
|
|
@ -44,7 +44,7 @@ func AddCronGroup(svc *svc.ServiceContext) {
|
|||
UpdateAiAdapterMaps(svc)
|
||||
})
|
||||
|
||||
svc.Cron.AddFunc("@every 7h30m", func() {
|
||||
svc.Cron.AddFunc("*/59 * * * * ?", func() {
|
||||
UpdateClusterResource(svc)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -187,13 +187,18 @@ func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterR
|
|||
return &clusterResource, nil
|
||||
}
|
||||
|
||||
func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64,
|
||||
func (s *AiStorage) SaveClusterResources(adapterId string, clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64,
|
||||
memAvail float64, memTotal float64, diskAvail float64, diskTotal float64, gpuAvail float64, gpuTotal float64, cardTotal int64, topsTotal float64) error {
|
||||
cId, err := strconv.ParseInt(clusterId, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
aId, err := strconv.ParseInt(adapterId, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
clusterResource := models.TClusterResource{
|
||||
AdapterId: aId,
|
||||
ClusterId: cId,
|
||||
ClusterName: clusterName,
|
||||
ClusterType: clusterType,
|
||||
|
|
Loading…
Reference in New Issue