pcm-client for participant to pull and push task info
Former-commit-id: 9e0057609cdd0c9aa60a1c97d23a8012254fadd1
This commit is contained in:
parent
aa6b343f34
commit
cbc4674662
|
@ -0,0 +1,13 @@
|
|||
package client
|
||||
|
||||
type Options struct {
|
||||
Url string
|
||||
DataSource string
|
||||
}
|
||||
type Client interface {
|
||||
Task(TaskOptions) (Task, error)
|
||||
}
|
||||
|
||||
func NewClient(options Options) (Client, error) {
|
||||
return newClient(options)
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"gorm.io/driver/mysql"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/logger"
|
||||
"gorm.io/gorm/schema"
|
||||
"time"
|
||||
)
|
||||
|
||||
type client struct {
|
||||
url string
|
||||
dataSource string
|
||||
DbEngin *gorm.DB
|
||||
}
|
||||
|
||||
func (c *client) Task(options TaskOptions) (Task, error) {
|
||||
task, _ := newTask(c, &options)
|
||||
return task, nil
|
||||
}
|
||||
|
||||
func newClient(options Options) (Client, error) {
|
||||
//init dbEngine
|
||||
dbEngin, _ := gorm.Open(mysql.Open(options.DataSource), &gorm.Config{
|
||||
NamingStrategy: schema.NamingStrategy{
|
||||
SingularTable: true,
|
||||
},
|
||||
Logger: logger.Default.LogMode(logger.Warn),
|
||||
})
|
||||
sqlDB, _ := dbEngin.DB()
|
||||
sqlDB.SetMaxIdleConns(10)
|
||||
sqlDB.SetMaxOpenConns(50)
|
||||
sqlDB.SetConnMaxLifetime(time.Hour)
|
||||
c := &client{
|
||||
url: options.Url,
|
||||
dataSource: options.DataSource,
|
||||
DbEngin: dbEngin,
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package client
|
||||
|
||||
type TaskOptions struct {
|
||||
pullTaskInfoReq PullTaskInfoReq
|
||||
pushTaskInfoReq PushTaskInfoReq
|
||||
pushResourceInfoReq PushResourceInfoReq
|
||||
}
|
||||
|
||||
type PullTaskInfoReq struct {
|
||||
AdapterId int64 `json:"adapterId"`
|
||||
}
|
||||
|
||||
type PullTaskInfoResp struct {
|
||||
HpcInfoList []*HpcInfo `json:"HpcInfoList,omitempty"`
|
||||
CloudInfoList []*CloudInfo `json:"CloudInfoList,omitempty"`
|
||||
AiInfoList []*AiInfo `json:"AiInfoList,omitempty"`
|
||||
VmInfoList []*VmInfo `json:"VmInfoList,omitempty"`
|
||||
}
|
||||
|
||||
type PushTaskInfoReq struct {
|
||||
AdapterId int64 `json:"adapterId"`
|
||||
HpcInfoList []*HpcInfo
|
||||
CloudInfoList []*CloudInfo
|
||||
AiInfoList []*AiInfo
|
||||
VmInfoList []*VmInfo
|
||||
}
|
||||
|
||||
type PushTaskInfoResp struct {
|
||||
Code int64
|
||||
Msg string
|
||||
}
|
||||
|
||||
type PushResourceInfoReq struct {
|
||||
AdapterId int64 `json:"adapterId"`
|
||||
}
|
||||
|
||||
type Task interface {
|
||||
PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error)
|
||||
PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error)
|
||||
PushResourceInfo(pushResourceInfoReq PushResourceInfoReq)
|
||||
}
|
|
@ -0,0 +1,143 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
|
||||
"gorm.io/gorm"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type task struct {
|
||||
sync.RWMutex
|
||||
client *client
|
||||
options *TaskOptions
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
func newTask(client *client, options *TaskOptions) (*task, error) {
|
||||
task := &task{
|
||||
RWMutex: sync.RWMutex{},
|
||||
client: client,
|
||||
options: options,
|
||||
log: log.Logger{},
|
||||
}
|
||||
return task, nil
|
||||
}
|
||||
|
||||
func (t task) PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) {
|
||||
result := PullTaskInfoResp{}
|
||||
// 查询p端类型
|
||||
var kind int32
|
||||
t.client.DbEngin.Raw("select type as kind from `t_adapter` where id = ?", pullTaskInfoReq.AdapterId).Scan(&kind)
|
||||
// 查询云智超中的数据列表
|
||||
switch kind {
|
||||
case 2:
|
||||
var hpcModelList []models.TaskHpc
|
||||
findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &hpcModelList)
|
||||
utils.Convert(hpcModelList, &result.HpcInfoList)
|
||||
case 0:
|
||||
var cloudModelList []models.Cloud
|
||||
findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &cloudModelList)
|
||||
utils.Convert(cloudModelList, &result.CloudInfoList)
|
||||
case 1:
|
||||
var aiModelList []models.Ai
|
||||
findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &aiModelList)
|
||||
utils.Convert(aiModelList, &result.AiInfoList)
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (t task) PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) {
|
||||
// 查询p端类型
|
||||
var kind int32
|
||||
t.client.DbEngin.Raw("select type as kind from t_adapter where id = ?", pushTaskInfoReq.AdapterId).Scan(&kind)
|
||||
switch kind {
|
||||
case 0:
|
||||
for _, cloudInfo := range pushTaskInfoReq.CloudInfoList {
|
||||
t.client.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?",
|
||||
cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, pushTaskInfoReq.AdapterId, cloudInfo.Id)
|
||||
syncTask(t.client.DbEngin, cloudInfo.TaskId)
|
||||
}
|
||||
case 2:
|
||||
for _, hpcInfo := range pushTaskInfoReq.HpcInfoList {
|
||||
t.client.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?",
|
||||
hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, pushTaskInfoReq.AdapterId, hpcInfo.TaskId, hpcInfo.Name)
|
||||
syncTask(t.client.DbEngin, hpcInfo.TaskId)
|
||||
}
|
||||
case 1:
|
||||
for _, aiInfo := range pushTaskInfoReq.AiInfoList {
|
||||
t.client.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
|
||||
aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, pushTaskInfoReq.AdapterId, aiInfo.TaskId, aiInfo.Name)
|
||||
syncTask(t.client.DbEngin, aiInfo.TaskId)
|
||||
}
|
||||
}
|
||||
|
||||
return &PushTaskInfoResp{}, nil
|
||||
}
|
||||
|
||||
func (t task) PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func findModelList(participantId int64, dbEngin *gorm.DB, data interface{}) error {
|
||||
tx := dbEngin.Where("cluster_id = (select id from t_cluster where adapter_id = ?) AND status NOT IN ?", participantId, []string{"Deleted", "Succeeded", "Completed", "Failed"}).Find(data)
|
||||
if tx.Error != nil {
|
||||
return tx.Error
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func syncTask(gorm *gorm.DB, taskId int64) {
|
||||
|
||||
var allStatus string
|
||||
tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus)
|
||||
if tx.Error != nil {
|
||||
logx.Error(tx.Error)
|
||||
}
|
||||
// 子状态统一则修改主任务状态
|
||||
statusArray := strings.Split(allStatus, ",")
|
||||
if len(removeRepeatedElement(statusArray)) == 1 {
|
||||
updateTask(gorm, taskId, statusArray[0])
|
||||
|
||||
}
|
||||
// 子任务包含失败状态 主任务则失败
|
||||
if strings.Contains(allStatus, constants.Failed) {
|
||||
updateTask(gorm, taskId, constants.Failed)
|
||||
|
||||
}
|
||||
if strings.Contains(allStatus, constants.Running) {
|
||||
updateTask(gorm, taskId, constants.Running)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func updateTask(gorm *gorm.DB, taskId int64, status string) {
|
||||
var task models.Task
|
||||
gorm.Where("id = ? ", taskId).Find(&task)
|
||||
if task.Status != status {
|
||||
task.Status = status
|
||||
gorm.Updates(&task)
|
||||
}
|
||||
}
|
||||
|
||||
func removeRepeatedElement(arr []string) (newArr []string) {
|
||||
newArr = make([]string, 0)
|
||||
for i := 0; i < len(arr); i++ {
|
||||
repeat := false
|
||||
for j := i + 1; j < len(arr); j++ {
|
||||
if arr[i] == arr[j] {
|
||||
repeat = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !repeat {
|
||||
newArr = append(newArr, arr[i])
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
package client
|
||||
|
||||
type HpcInfo struct {
|
||||
ParticipantId int64 `json:"participantId,omitempty"`
|
||||
TaskId int64 `json:"taskId,omitempty"`
|
||||
JobId string `json:"jobId,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Status string `json:"status,omitempty"`
|
||||
StartTime string `json:"startTime,omitempty"`
|
||||
RunningTime int64 `json:"runningTime,omitempty"`
|
||||
Result string `json:"result,omitempty"`
|
||||
WorkDir string `json:"workDir,omitempty"`
|
||||
WallTime string `json:"wallTime,omitempty"`
|
||||
CmdScript string `json:"cmdScript,omitempty"`
|
||||
DerivedEs string `json:"derivedEs,omitempty"`
|
||||
Cluster string `json:"cluster,omitempty"`
|
||||
BlockId string `json:"blockId,omitempty"`
|
||||
AllocNodes uint32 `json:"allocNodes,omitempty"`
|
||||
AllocCpu uint32 `json:"allocCpu,omitempty"`
|
||||
Version string `json:"version,omitempty"`
|
||||
Account string `json:"account,omitempty"`
|
||||
ExitCode uint32 `json:"exitCode,omitempty"`
|
||||
AssocId uint32 `json:"assocId,omitempty"`
|
||||
AppType string `json:"appType,omitempty"`
|
||||
AppName string `json:"appName,omitempty"`
|
||||
Queue string `json:"queue,omitempty"`
|
||||
SubmitType string `json:"submitType,omitempty"`
|
||||
NNode string `json:"nNode,omitempty"`
|
||||
StdOutFile string `json:"stdOutFile,omitempty"`
|
||||
StdErrFile string `json:"stdErrFile,omitempty"`
|
||||
StdInput string `json:"stdInput,omitempty"`
|
||||
Environment string `json:"environment,omitempty"`
|
||||
}
|
||||
|
||||
type CloudInfo struct {
|
||||
Participant int64 `json:"participant,omitempty"`
|
||||
Id int64 `json:"id,omitempty"`
|
||||
TaskId int64 `json:"taskId,omitempty"`
|
||||
ApiVersion string `json:"apiVersion,omitempty"`
|
||||
Kind string `json:"kind,omitempty"`
|
||||
Namespace string `json:"namespace,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Status string `json:"status,omitempty"`
|
||||
StartTime string `json:"startTime,omitempty"`
|
||||
RunningTime int64 `json:"runningTime,omitempty"`
|
||||
Result string `json:"result,omitempty"`
|
||||
YamlString string `json:"yamlString,omitempty"`
|
||||
}
|
||||
|
||||
type AiInfo struct {
|
||||
ParticipantId int64 `json:"participantId,omitempty"`
|
||||
TaskId int64 `json:"taskId,omitempty"`
|
||||
ProjectId string `json:"project_id,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Status string `json:"status,omitempty"`
|
||||
StartTime string `json:"startTime,omitempty"`
|
||||
RunningTime int64 `json:"runningTime,omitempty"`
|
||||
Result string `json:"result,omitempty"`
|
||||
JobId string `json:"jobId,omitempty"`
|
||||
CreateTime string `json:"createTime,omitempty"`
|
||||
ImageUrl string `json:"imageUrl,omitempty"`
|
||||
Command string `json:"command,omitempty"`
|
||||
FlavorId string `json:"flavorId,omitempty"`
|
||||
SubscriptionId string `json:"subscriptionId,omitempty"`
|
||||
ItemVersionId string `json:"itemVersionId,omitempty"`
|
||||
}
|
||||
|
||||
type VmInfo struct {
|
||||
ParticipantId int64 `json:"participantId,omitempty"`
|
||||
TaskId int64 `json:"taskId,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
FlavorRef string `json:"flavor_ref,omitempty"`
|
||||
ImageRef string `json:"image_ref,omitempty"`
|
||||
NetworkUuid string `json:"network_uuid,omitempty"`
|
||||
BlockUuid string `json:"block_uuid,omitempty"`
|
||||
SourceType string `json:"source_type,omitempty"`
|
||||
DeleteOnTermination bool `json:"delete_on_termination,omitempty"`
|
||||
State string `json:"state,omitempty"`
|
||||
}
|
Loading…
Reference in New Issue