From cbc4674662fa07a3ee85f83af894d3f80fa979af Mon Sep 17 00:00:00 2001
From: zhouqunjie
Date: Wed, 13 Mar 2024 20:49:10 +0800
Subject: [PATCH] pcm-client for participant to pull and push task info

Former-commit-id: 9e0057609cdd0c9aa60a1c97d23a8012254fadd1
---
 api/client/client.go      |  21 ++++
 api/client/client_impl.go |  58 +++++++++
 api/client/task.go        |  51 ++++++++
 api/client/task_impl.go   | 236 ++++++++++++++++++++++++++++++++++++++
 api/client/types.go       |  83 +++++++++++++
 5 files changed, 449 insertions(+)
 create mode 100644 api/client/client.go
 create mode 100644 api/client/client_impl.go
 create mode 100644 api/client/task.go
 create mode 100644 api/client/task_impl.go
 create mode 100644 api/client/types.go

diff --git a/api/client/client.go b/api/client/client.go
new file mode 100644
index 00000000..1e13a835
--- /dev/null
+++ b/api/client/client.go
@@ -0,0 +1,21 @@
+package client
+
+// Options carries the settings used to build a Client.
+type Options struct {
+	// Url is copied into the client by newClient.
+	Url string
+	// DataSource is the DSN handed to the MySQL driver.
+	DataSource string
+}
+
+// Client is the entry point of the package; it hands out Task handles.
+type Client interface {
+	// Task returns a Task bound to this client.
+	Task(TaskOptions) (Task, error)
+}
+
+// NewClient opens the database described by options and returns a ready
+// Client, or an error if the connection cannot be set up.
+func NewClient(options Options) (Client, error) {
+	return newClient(options)
+}
diff --git a/api/client/client_impl.go b/api/client/client_impl.go
new file mode 100644
index 00000000..3b69c1b8
--- /dev/null
+++ b/api/client/client_impl.go
@@ -0,0 +1,58 @@
+package client
+
+import (
+	"fmt"
+	"time"
+
+	"gorm.io/driver/mysql"
+	"gorm.io/gorm"
+	"gorm.io/gorm/logger"
+	"gorm.io/gorm/schema"
+)
+
+// client is the default Client implementation, backed by a gorm connection.
+type client struct {
+	url        string
+	dataSource string
+	DbEngin    *gorm.DB
+}
+
+// Task builds a Task that shares this client's database handle.
+func (c *client) Task(options TaskOptions) (Task, error) {
+	t, err := newTask(c, &options)
+	if err != nil {
+		// Return a literal nil so callers never see a non-nil interface
+		// wrapping a nil *task.
+		return nil, err
+	}
+	return t, nil
+}
+
+// newClient opens the MySQL database named by options.DataSource, configures
+// the connection pool and wraps the handle in a client.
+func newClient(options Options) (Client, error) {
+	// Singular table names; the logger runs at Warn level.
+	dbEngin, err := gorm.Open(mysql.Open(options.DataSource), &gorm.Config{
+		NamingStrategy: schema.NamingStrategy{
+			SingularTable: true,
+		},
+		Logger: logger.Default.LogMode(logger.Warn),
+	})
+	if err != nil {
+		return nil, fmt.Errorf("opening database: %w", err)
+	}
+
+	sqlDB, err := dbEngin.DB()
+	if err != nil {
+		return nil, fmt.Errorf("getting sql.DB handle: %w", err)
+	}
+	sqlDB.SetMaxIdleConns(10)
+	sqlDB.SetMaxOpenConns(50)
+	sqlDB.SetConnMaxLifetime(time.Hour)
+
+	return &client{
+		url:        options.Url,
+		dataSource: options.DataSource,
+		DbEngin:    dbEngin,
+	}, nil
+}
diff --git a/api/client/task.go b/api/client/task.go
new file mode 100644
index 00000000..557f2052
--- /dev/null
+++ b/api/client/task.go
@@ -0,0 +1,51 @@
+package client
+
+// TaskOptions bundles the request payloads a Task is created with.
+type TaskOptions struct {
+	pullTaskInfoReq     PullTaskInfoReq
+	pushTaskInfoReq     PushTaskInfoReq
+	pushResourceInfoReq PushResourceInfoReq
+}
+
+// PullTaskInfoReq selects the adapter whose tasks should be pulled.
+type PullTaskInfoReq struct {
+	AdapterId int64 `json:"adapterId"`
+}
+
+// PullTaskInfoResp holds the pulled tasks, one list per task kind.
+type PullTaskInfoResp struct {
+	HpcInfoList   []*HpcInfo   `json:"HpcInfoList,omitempty"`
+	CloudInfoList []*CloudInfo `json:"CloudInfoList,omitempty"`
+	AiInfoList    []*AiInfo    `json:"AiInfoList,omitempty"`
+	VmInfoList    []*VmInfo    `json:"VmInfoList,omitempty"`
+}
+
+// PushTaskInfoReq carries task state reported by an adapter.
+type PushTaskInfoReq struct {
+	AdapterId     int64 `json:"adapterId"`
+	HpcInfoList   []*HpcInfo
+	CloudInfoList []*CloudInfo
+	AiInfoList    []*AiInfo
+	VmInfoList    []*VmInfo
+}
+
+// PushTaskInfoResp is the result of a PushTaskInfo call.
+type PushTaskInfoResp struct {
+	Code int64
+	Msg  string
+}
+
+// PushResourceInfoReq identifies the adapter reporting resource information.
+type PushResourceInfoReq struct {
+	AdapterId int64 `json:"adapterId"`
+}
+
+// Task exposes the pull/push operations available to a participant.
+type Task interface {
+	// PullTaskInfo returns tasks associated with the given adapter.
+	PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error)
+	// PushTaskInfo stores the task state reported by the given adapter.
+	PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error)
+	// PushResourceInfo is not implemented yet.
+	PushResourceInfo(pushResourceInfoReq PushResourceInfoReq)
+}
diff --git a/api/client/task_impl.go b/api/client/task_impl.go
new file mode 100644
index 00000000..cb716d65
--- /dev/null
+++ b/api/client/task_impl.go
@@ -0,0 +1,236 @@
+package client
+
+import (
+	"log"
+	"strings"
+	"sync"
+
+	"github.com/zeromicro/go-zero/core/logx"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
+	"gorm.io/gorm"
+)
+
+// Values of t_adapter.type that PullTaskInfo and PushTaskInfo dispatch on.
+const (
+	adapterKindCloud int32 = 0
+	adapterKindAi    int32 = 1
+	adapterKindHpc   int32 = 2
+)
+
+// task is the default Task implementation. It embeds a lock, so its methods
+// use pointer receivers and the value is never copied.
+type task struct {
+	sync.RWMutex
+	client  *client
+	options *TaskOptions
+	log     log.Logger
+}
+
+// newTask binds a task to the given client and options.
+func newTask(client *client, options *TaskOptions) (*task, error) {
+	return &task{
+		client:  client,
+		options: options,
+	}, nil
+}
+
+// adapterKind reads the type column of the t_adapter row with the given id.
+// NOTE(review): an unknown id presumably leaves kind at 0, i.e. the cloud
+// kind, instead of reporting an error - confirm whether that is intended.
+func (t *task) adapterKind(adapterId int64) (int32, error) {
+	var kind int32
+	tx := t.client.DbEngin.Raw("select type as kind from `t_adapter` where id = ?", adapterId).Scan(&kind)
+	return kind, tx.Error
+}
+
+// PullTaskInfo returns the tasks of the adapter named in the request whose
+// status is not final yet. Only the list matching the adapter's kind is
+// filled in. Database errors are returned to the caller.
+func (t *task) PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) {
+	kind, err := t.adapterKind(pullTaskInfoReq.AdapterId)
+	if err != nil {
+		return nil, err
+	}
+
+	result := PullTaskInfoResp{}
+	db := t.client.DbEngin
+	switch kind {
+	case adapterKindHpc:
+		var hpcModelList []models.TaskHpc
+		if err := findModelList(pullTaskInfoReq.AdapterId, db, &hpcModelList); err != nil {
+			return nil, err
+		}
+		utils.Convert(hpcModelList, &result.HpcInfoList)
+	case adapterKindCloud:
+		var cloudModelList []models.Cloud
+		if err := findModelList(pullTaskInfoReq.AdapterId, db, &cloudModelList); err != nil {
+			return nil, err
+		}
+		utils.Convert(cloudModelList, &result.CloudInfoList)
+	case adapterKindAi:
+		var aiModelList []models.Ai
+		if err := findModelList(pullTaskInfoReq.AdapterId, db, &aiModelList); err != nil {
+			return nil, err
+		}
+		utils.Convert(aiModelList, &result.AiInfoList)
+	}
+	return &result, nil
+}
+
+// PushTaskInfo writes the task state reported by an adapter into the table
+// matching the adapter's kind and then re-evaluates each parent task. Every
+// entry is attempted; the first database error, if any, is returned after
+// the whole batch has been processed.
+func (t *task) PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) {
+	kind, err := t.adapterKind(pushTaskInfoReq.AdapterId)
+	if err != nil {
+		return nil, err
+	}
+
+	db := t.client.DbEngin
+	var firstErr error
+	// apply runs one update and, on success, refreshes the parent task. A
+	// failure is remembered but does not stop the remaining entries.
+	apply := func(taskId int64, query string, args ...interface{}) {
+		if err := db.Exec(query, args...).Error; err != nil {
+			if firstErr == nil {
+				firstErr = err
+			}
+			return
+		}
+		syncTask(db, taskId)
+	}
+
+	switch kind {
+	case adapterKindCloud:
+		for _, cloudInfo := range pushTaskInfoReq.CloudInfoList {
+			if cloudInfo == nil {
+				continue
+			}
+			apply(cloudInfo.TaskId,
+				"update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?",
+				cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, pushTaskInfoReq.AdapterId, cloudInfo.Id)
+		}
+	case adapterKindHpc:
+		for _, hpcInfo := range pushTaskInfoReq.HpcInfoList {
+			if hpcInfo == nil {
+				continue
+			}
+			// The statement has six placeholders, so exactly six values are
+			// bound. RunningTime was previously passed as a stray seventh
+			// value, which shifted every binding after start_time.
+			// NOTE(review): if running_time should be persisted as well, add
+			// it to the SET clause together with its argument.
+			apply(hpcInfo.TaskId,
+				"update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?",
+				hpcInfo.Status, hpcInfo.StartTime, hpcInfo.JobId, pushTaskInfoReq.AdapterId, hpcInfo.TaskId, hpcInfo.Name)
+		}
+	case adapterKindAi:
+		for _, aiInfo := range pushTaskInfoReq.AiInfoList {
+			if aiInfo == nil {
+				continue
+			}
+			apply(aiInfo.TaskId,
+				"update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
+				aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, pushTaskInfoReq.AdapterId, aiInfo.TaskId, aiInfo.Name)
+		}
+	}
+
+	if firstErr != nil {
+		return nil, firstErr
+	}
+	return &PushTaskInfoResp{}, nil
+}
+
+// PushResourceInfo is not implemented yet and panics when called.
+func (t *task) PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) {
+	//TODO implement me
+	panic("implement me")
+}
+
+// findModelList loads into data the rows that belong to a cluster of the given
+// adapter and whose status is none of Deleted, Succeeded, Completed or Failed.
+// Callers pass a pointer to a slice of the model to query.
+func findModelList(participantId int64, dbEngin *gorm.DB, data interface{}) error {
+	// IN rather than = keeps the statement valid when the sub-select yields
+	// more than one cluster id.
+	return dbEngin.
+		Where("cluster_id IN (select id from t_cluster where adapter_id = ?) AND status NOT IN ?",
+			participantId, []string{"Deleted", "Succeeded", "Completed", "Failed"}).
+		Find(data).Error
+}
+
+// syncTask derives the status of parent task taskId from the distinct statuses
+// of its sub-tasks and stores it if it differs from the current one.
+func syncTask(db *gorm.DB, taskId int64) {
+	var allStatus string
+	// NOTE(review): the query joins table hpc while PushTaskInfo updates
+	// task_hpc - confirm which table holds the HPC sub-tasks.
+	tx := db.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus)
+	if tx.Error != nil {
+		logx.Error(tx.Error)
+		return
+	}
+	if allStatus == "" {
+		// No sub-task status to derive anything from.
+		return
+	}
+
+	status := ""
+	// All sub-tasks agree: the parent takes over that status.
+	statusArray := strings.Split(allStatus, ",")
+	if len(removeRepeatedElement(statusArray)) == 1 {
+		status = statusArray[0]
+	}
+	// Any failed sub-task marks the parent as failed ...
+	if strings.Contains(allStatus, constants.Failed) {
+		status = constants.Failed
+	}
+	// ... and a running sub-task takes precedence over both.
+	if strings.Contains(allStatus, constants.Running) {
+		status = constants.Running
+	}
+	if status != "" {
+		updateTask(db, taskId, status)
+	}
+}
+
+// updateTask sets the status of task taskId to status unless it already has
+// that value. Failures are logged rather than returned.
+func updateTask(db *gorm.DB, taskId int64, status string) {
+	var row models.Task
+	tx := db.Where("id = ? ", taskId).Find(&row)
+	if tx.Error != nil {
+		logx.Error(tx.Error)
+		return
+	}
+	// Nothing to do for an unknown task or an unchanged status.
+	if tx.RowsAffected == 0 || row.Status == status {
+		return
+	}
+	row.Status = status
+	if err := db.Updates(&row).Error; err != nil {
+		logx.Error(err)
+	}
+}
+
+// removeRepeatedElement returns arr without duplicates. Of several equal
+// values only the last occurrence is kept. The result is never nil.
+func removeRepeatedElement(arr []string) (newArr []string) {
+	newArr = make([]string, 0, len(arr))
+	for i, s := range arr {
+		repeated := false
+		for _, later := range arr[i+1:] {
+			if s == later {
+				repeated = true
+				break
+			}
+		}
+		if !repeated {
+			newArr = append(newArr, s)
+		}
+	}
+	return newArr
+}
diff --git a/api/client/types.go b/api/client/types.go
new file mode 100644
index 00000000..69e828ab
--- /dev/null
+++ b/api/client/types.go
@@ -0,0 +1,83 @@
+package client
+
+// HpcInfo describes one HPC sub-task as exchanged with a participant.
+type HpcInfo struct {
+	ParticipantId int64  `json:"participantId,omitempty"`
+	TaskId        int64  `json:"taskId,omitempty"`
+	JobId         string `json:"jobId,omitempty"`
+	Name          string `json:"name,omitempty"`
+	Status        string `json:"status,omitempty"`
+	StartTime     string `json:"startTime,omitempty"`
+	RunningTime   int64  `json:"runningTime,omitempty"`
+	Result        string `json:"result,omitempty"`
+	WorkDir       string `json:"workDir,omitempty"`
+	WallTime      string `json:"wallTime,omitempty"`
+	CmdScript     string `json:"cmdScript,omitempty"`
+	DerivedEs     string `json:"derivedEs,omitempty"`
+	Cluster       string `json:"cluster,omitempty"`
+	BlockId       string `json:"blockId,omitempty"`
+	AllocNodes    uint32 `json:"allocNodes,omitempty"`
+	AllocCpu      uint32 `json:"allocCpu,omitempty"`
+	Version       string `json:"version,omitempty"`
+	Account       string `json:"account,omitempty"`
+	ExitCode      uint32 `json:"exitCode,omitempty"`
+	AssocId       uint32 `json:"assocId,omitempty"`
+	AppType       string `json:"appType,omitempty"`
+	AppName       string `json:"appName,omitempty"`
+	Queue         string `json:"queue,omitempty"`
+	SubmitType    string `json:"submitType,omitempty"`
+	NNode         string `json:"nNode,omitempty"`
+	StdOutFile    string `json:"stdOutFile,omitempty"`
+	StdErrFile    string `json:"stdErrFile,omitempty"`
+	StdInput      string `json:"stdInput,omitempty"`
+	Environment   string `json:"environment,omitempty"`
+}
+
+// CloudInfo describes one cloud sub-task as exchanged with a participant.
+type CloudInfo struct {
+	Participant int64  `json:"participant,omitempty"`
+	Id          int64  `json:"id,omitempty"`
+	TaskId      int64  `json:"taskId,omitempty"`
+	ApiVersion  string `json:"apiVersion,omitempty"`
+	Kind        string `json:"kind,omitempty"`
+	Namespace   string `json:"namespace,omitempty"`
+	Name        string `json:"name,omitempty"`
+	Status      string `json:"status,omitempty"`
+	StartTime   string `json:"startTime,omitempty"`
+	RunningTime int64  `json:"runningTime,omitempty"`
+	Result      string `json:"result,omitempty"`
+	YamlString  string `json:"yamlString,omitempty"`
+}
+
+// AiInfo describes one AI sub-task as exchanged with a participant.
+type AiInfo struct {
+	ParticipantId  int64  `json:"participantId,omitempty"`
+	TaskId         int64  `json:"taskId,omitempty"`
+	ProjectId      string `json:"project_id,omitempty"`
+	Name           string `json:"name,omitempty"`
+	Status         string `json:"status,omitempty"`
+	StartTime      string `json:"startTime,omitempty"`
+	RunningTime    int64  `json:"runningTime,omitempty"`
+	Result         string `json:"result,omitempty"`
+	JobId          string `json:"jobId,omitempty"`
+	CreateTime     string `json:"createTime,omitempty"`
+	ImageUrl       string `json:"imageUrl,omitempty"`
+	Command        string `json:"command,omitempty"`
+	FlavorId       string `json:"flavorId,omitempty"`
+	SubscriptionId string `json:"subscriptionId,omitempty"`
+	ItemVersionId  string `json:"itemVersionId,omitempty"`
+}
+
+// VmInfo describes one virtual-machine sub-task as exchanged with a participant.
+type VmInfo struct {
+	ParticipantId       int64  `json:"participantId,omitempty"`
+	TaskId              int64  `json:"taskId,omitempty"`
+	Name                string `json:"name,omitempty"`
+	FlavorRef           string `json:"flavor_ref,omitempty"`
+	ImageRef            string `json:"image_ref,omitempty"`
+	NetworkUuid         string `json:"network_uuid,omitempty"`
+	BlockUuid           string `json:"block_uuid,omitempty"`
+	SourceType          string `json:"source_type,omitempty"`
+	DeleteOnTermination bool   `json:"delete_on_termination,omitempty"`
+	State               string `json:"state,omitempty"`
+}