Merge pull request 'fix clusterResouces bugs' (#217) from tzwang/pcm-coordinator:master into master
Former-commit-id: dd2de7fa06b567190ee40dbe20b58188d4c4af25
This commit is contained in:
commit
7bfbed0ce9
|
@ -271,9 +271,6 @@ func UpdateAiAdapterMaps(svc *svc.ServiceContext) {
|
|||
}
|
||||
|
||||
for _, id := range adapterIds {
|
||||
if isAdapterExist(svc, id) {
|
||||
continue
|
||||
}
|
||||
clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(id)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("###UpdateAiAdapterMaps###, error: %v \n", err.Error())
|
||||
|
@ -283,16 +280,68 @@ func UpdateAiAdapterMaps(svc *svc.ServiceContext) {
|
|||
if len(clusters.List) == 0 {
|
||||
continue
|
||||
}
|
||||
exeClusterMap, colClusterMap := InitAiClusterMap(&svc.Config, clusters.List)
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap
|
||||
svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap
|
||||
if isAdapterExist(svc, id, len(clusters.List)) {
|
||||
continue
|
||||
} else {
|
||||
if isAdapterEmpty(svc, id) {
|
||||
exeClusterMap, colClusterMap := InitAiClusterMap(&svc.Config, clusters.List)
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap
|
||||
svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap
|
||||
} else {
|
||||
UpdateClusterMaps(svc, id, clusters.List)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isAdapterExist(svc *svc.ServiceContext, id string) bool {
|
||||
func UpdateClusterMaps(svc *svc.ServiceContext, adapterId string, clusters []types.ClusterInfo) {
|
||||
for _, c := range clusters {
|
||||
_, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id]
|
||||
_, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[adapterId][c.Id]
|
||||
if !ok && !ok2 {
|
||||
switch c.Name {
|
||||
case OCTOPUS:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(svc.Config.OctopusRpcConf))
|
||||
octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = octopus
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = octopus
|
||||
case MODELARTS:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(svc.Config.ModelArtsRpcConf))
|
||||
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(svc.Config.ModelArtsImgRpcConf))
|
||||
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = modelarts
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = modelarts
|
||||
case SHUGUANGAI:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(svc.Config.ACRpcConf))
|
||||
sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = sgai
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = sgai
|
||||
}
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool {
|
||||
emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
|
||||
cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
|
||||
if ok && ok2 {
|
||||
if len(emap) == clusterNum && len(cmap) == clusterNum {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func isAdapterEmpty(svc *svc.ServiceContext, id string) bool {
|
||||
_, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
|
||||
_, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
|
||||
if ok && ok2 {
|
||||
if !ok && !ok2 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
@ -375,7 +424,7 @@ func UpdateClusterResource(svc *svc.ServiceContext) {
|
|||
}
|
||||
|
||||
if (models.TClusterResource{} == *clusterResource) {
|
||||
err = svc.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
|
||||
err = svc.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
|
||||
stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
|
|
|
@ -44,7 +44,7 @@ func AddCronGroup(svc *svc.ServiceContext) {
|
|||
UpdateAiAdapterMaps(svc)
|
||||
})
|
||||
|
||||
svc.Cron.AddFunc("@every 7h30m", func() {
|
||||
svc.Cron.AddFunc("*/59 * * * * ?", func() {
|
||||
UpdateClusterResource(svc)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -187,13 +187,18 @@ func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterR
|
|||
return &clusterResource, nil
|
||||
}
|
||||
|
||||
func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64,
|
||||
func (s *AiStorage) SaveClusterResources(adapterId string, clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64,
|
||||
memAvail float64, memTotal float64, diskAvail float64, diskTotal float64, gpuAvail float64, gpuTotal float64, cardTotal int64, topsTotal float64) error {
|
||||
cId, err := strconv.ParseInt(clusterId, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
aId, err := strconv.ParseInt(adapterId, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
clusterResource := models.TClusterResource{
|
||||
AdapterId: aId,
|
||||
ClusterId: cId,
|
||||
ClusterName: clusterName,
|
||||
ClusterType: clusterType,
|
||||
|
|
|
@ -48,6 +48,7 @@ type (
|
|||
GpuTotal float64 `db:"gpu_total"`
|
||||
CardTotal int64 `db:"card_total"` // 算力卡数量
|
||||
CardTopsTotal float64 `db:"card_tops_total"` // 算力总量tops
|
||||
AdapterId int64 `db:"adapter_id"`
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -86,14 +87,14 @@ func (m *defaultTClusterResourceModel) FindOne(ctx context.Context, clusterId in
|
|||
}
|
||||
|
||||
func (m *defaultTClusterResourceModel) Insert(ctx context.Context, data *TClusterResource) (sql.Result, error) {
|
||||
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tClusterResourceRowsExpectAutoSet)
|
||||
ret, err := m.conn.ExecCtx(ctx, query, data.ClusterId, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal)
|
||||
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tClusterResourceRowsExpectAutoSet)
|
||||
ret, err := m.conn.ExecCtx(ctx, query, data.ClusterId, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal, data.AdapterId)
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (m *defaultTClusterResourceModel) Update(ctx context.Context, data *TClusterResource) error {
|
||||
query := fmt.Sprintf("update %s set %s where `cluster_id` = ?", m.table, tClusterResourceRowsWithPlaceHolder)
|
||||
_, err := m.conn.ExecCtx(ctx, query, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal, data.ClusterId)
|
||||
_, err := m.conn.ExecCtx(ctx, query, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal, data.AdapterId, data.ClusterId)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue