pcm-coordinator/internal/cron/aiCronTask.go

94 lines
2.5 KiB
Go

package cron
import (
"errors"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
)
func GetTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) {
limit := 10
offset := 0
var list []*types.TaskModel
db := svc.DbEngin.Model(&types.TaskModel{}).Table("task")
db = db.Where("deleted_at is null")
//count total
var total int64
err := db.Count(&total).Error
db.Limit(limit).Offset(offset)
if err != nil {
return nil, err
}
err = db.Order("created_time desc").Find(&list).Error
if err != nil {
return nil, err
}
return list, nil
}
func UpdateAiAdapterMaps(svc *svc.ServiceContext) {
var aiType = "1"
adapterIds, err := svc.Scheduler.AiStorages.GetAdapterIdsByType(aiType)
if err != nil {
msg := fmt.Sprintf("###UpdateAiAdapterMaps###, error: %v \n", err.Error())
logx.Errorf(errors.New(msg).Error())
return
}
if len(adapterIds) == 0 {
return
}
for _, id := range adapterIds {
clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(id)
if err != nil {
msg := fmt.Sprintf("###UpdateAiAdapterMaps###, error: %v \n", err.Error())
logx.Errorf(errors.New(msg).Error())
return
}
if len(clusters.List) == 0 {
continue
}
if isAdapterExist(svc, id, len(clusters.List)) {
continue
} else {
if isAdapterEmpty(svc, id) {
exeClusterMap, colClusterMap, inferMap := service.InitAiClusterMap(&svc.Config, clusters.List)
svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap
svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap
svc.Scheduler.AiService.InferenceAdapterMap[id] = inferMap
} else {
svc.Scheduler.AiService.UpdateClusterMaps(&svc.Config, id, clusters.List)
}
}
}
}
func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool {
emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
imap, ok3 := svc.Scheduler.AiService.InferenceAdapterMap[id]
if ok && ok2 && ok3 {
if len(emap) == clusterNum && len(cmap) == clusterNum && len(imap) == clusterNum {
return true
}
}
return false
}
func isAdapterEmpty(svc *svc.ServiceContext, id string) bool {
_, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
_, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
_, ok3 := svc.Scheduler.AiService.InferenceAdapterMap[id]
if !ok && !ok2 && !ok3 {
return true
}
return false
}