144 lines
4.6 KiB
Go
144 lines
4.6 KiB
Go
package client
|
|
|
|
import (
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
|
|
"gorm.io/gorm"
|
|
"log"
|
|
"strings"
|
|
"sync"
|
|
)
|
|
|
|
type task struct {
|
|
sync.RWMutex
|
|
client *client
|
|
options *TaskOptions
|
|
log log.Logger
|
|
}
|
|
|
|
func newTask(client *client, options *TaskOptions) (*task, error) {
|
|
task := &task{
|
|
RWMutex: sync.RWMutex{},
|
|
client: client,
|
|
options: options,
|
|
log: log.Logger{},
|
|
}
|
|
return task, nil
|
|
}
|
|
|
|
func (t task) PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) {
|
|
result := PullTaskInfoResp{}
|
|
// 查询p端类型
|
|
var kind int32
|
|
t.client.DbEngin.Raw("select type as kind from `t_adapter` where id = ?", pullTaskInfoReq.AdapterId).Scan(&kind)
|
|
// 查询云智超中的数据列表
|
|
switch kind {
|
|
case 2:
|
|
var hpcModelList []models.TaskHpc
|
|
findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &hpcModelList)
|
|
utils.Convert(hpcModelList, &result.HpcInfoList)
|
|
case 0:
|
|
var cloudModelList []models.Cloud
|
|
findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &cloudModelList)
|
|
utils.Convert(cloudModelList, &result.CloudInfoList)
|
|
case 1:
|
|
var aiModelList []models.Ai
|
|
findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &aiModelList)
|
|
utils.Convert(aiModelList, &result.AiInfoList)
|
|
}
|
|
return &result, nil
|
|
}
|
|
|
|
func (t task) PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) {
|
|
// 查询p端类型
|
|
var kind int32
|
|
t.client.DbEngin.Raw("select type as kind from t_adapter where id = ?", pushTaskInfoReq.AdapterId).Scan(&kind)
|
|
switch kind {
|
|
case 0:
|
|
for _, cloudInfo := range pushTaskInfoReq.CloudInfoList {
|
|
t.client.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?",
|
|
cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, pushTaskInfoReq.AdapterId, cloudInfo.Id)
|
|
syncTask(t.client.DbEngin, cloudInfo.TaskId)
|
|
}
|
|
case 2:
|
|
for _, hpcInfo := range pushTaskInfoReq.HpcInfoList {
|
|
t.client.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?",
|
|
hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, pushTaskInfoReq.AdapterId, hpcInfo.TaskId, hpcInfo.Name)
|
|
syncTask(t.client.DbEngin, hpcInfo.TaskId)
|
|
}
|
|
case 1:
|
|
for _, aiInfo := range pushTaskInfoReq.AiInfoList {
|
|
t.client.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
|
|
aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, pushTaskInfoReq.AdapterId, aiInfo.TaskId, aiInfo.Name)
|
|
syncTask(t.client.DbEngin, aiInfo.TaskId)
|
|
}
|
|
}
|
|
|
|
return &PushTaskInfoResp{}, nil
|
|
}
|
|
|
|
func (t task) PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) {
|
|
//TODO implement me
|
|
panic("implement me")
|
|
}
|
|
|
|
func findModelList(participantId int64, dbEngin *gorm.DB, data interface{}) error {
|
|
tx := dbEngin.Where("cluster_id = (select id from t_cluster where adapter_id = ?) AND status NOT IN ?", participantId, []string{"Deleted", "Succeeded", "Completed", "Failed"}).Find(data)
|
|
if tx.Error != nil {
|
|
return tx.Error
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func syncTask(gorm *gorm.DB, taskId int64) {
|
|
|
|
var allStatus string
|
|
tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus)
|
|
if tx.Error != nil {
|
|
logx.Error(tx.Error)
|
|
}
|
|
// 子状态统一则修改主任务状态
|
|
statusArray := strings.Split(allStatus, ",")
|
|
if len(removeRepeatedElement(statusArray)) == 1 {
|
|
updateTask(gorm, taskId, statusArray[0])
|
|
|
|
}
|
|
// 子任务包含失败状态 主任务则失败
|
|
if strings.Contains(allStatus, constants.Failed) {
|
|
updateTask(gorm, taskId, constants.Failed)
|
|
|
|
}
|
|
if strings.Contains(allStatus, constants.Running) {
|
|
updateTask(gorm, taskId, constants.Running)
|
|
}
|
|
|
|
}
|
|
|
|
func updateTask(gorm *gorm.DB, taskId int64, status string) {
|
|
var task models.Task
|
|
gorm.Where("id = ? ", taskId).Find(&task)
|
|
if task.Status != status {
|
|
task.Status = status
|
|
gorm.Updates(&task)
|
|
}
|
|
}
|
|
|
|
func removeRepeatedElement(arr []string) (newArr []string) {
|
|
newArr = make([]string, 0)
|
|
for i := 0; i < len(arr); i++ {
|
|
repeat := false
|
|
for j := i + 1; j < len(arr); j++ {
|
|
if arr[i] == arr[j] {
|
|
repeat = true
|
|
break
|
|
}
|
|
}
|
|
if !repeat {
|
|
newArr = append(newArr, arr[i])
|
|
}
|
|
}
|
|
return
|
|
}
|