add pull and push task api

Former-commit-id: 40540a6dd07fa468ceeb2e6f3e3790032b05bed4
This commit is contained in:
zhouqunjie 2024-03-19 09:47:55 +08:00
parent 3279fde8c4
commit d3ff595ea1
7 changed files with 309 additions and 128 deletions

View File

@ -6,6 +6,12 @@ type TaskOptions struct {
pushResourceInfoReq PushResourceInfoReq pushResourceInfoReq PushResourceInfoReq
} }
type Task interface {
PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error)
PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error)
PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) error
}
type PullTaskInfoReq struct { type PullTaskInfoReq struct {
AdapterId int64 `json:"adapterId"` AdapterId int64 `json:"adapterId"`
} }
@ -33,9 +39,3 @@ type PushTaskInfoResp struct {
type PushResourceInfoReq struct { type PushResourceInfoReq struct {
AdapterId int64 `json:"adapterId"` AdapterId int64 `json:"adapterId"`
} }
type Task interface {
PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error)
PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error)
PushResourceInfo(pushResourceInfoReq PushResourceInfoReq)
}

View File

@ -1,13 +1,10 @@
package client package client
import ( import (
"github.com/jinzhu/copier" "io/ioutil"
"github.com/zeromicro/go-zero/core/logx" "k8s.io/apimachinery/pkg/util/json"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gorm.io/gorm"
"log" "log"
"net/http"
"strings" "strings"
"sync" "sync"
) )
@ -29,128 +26,47 @@ func newTask(client *client, options *TaskOptions) (*task, error) {
return task, nil return task, nil
} }
func (t task) PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) { func (t *task) PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) {
result := PullTaskInfoResp{}
// 查询p端类型
var kind int32
t.client.DbEngin.Raw("select type as kind from `t_adapter` where id = ?", pullTaskInfoReq.AdapterId).Scan(&kind)
// 查询云智超中的数据列表
switch kind {
case 2:
var hpcModelList []models.TaskHpc
findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &hpcModelList)
utils.Convert(hpcModelList, &result.HpcInfoList)
if len(result.HpcInfoList) > 0 {
for i, hpcInfo := range hpcModelList {
err := copier.CopyWithOption(result.HpcInfoList[i], hpcInfo, copier.Option{Converters: utils.Converters})
if err != nil {
return nil, err
}
var clusterType string
t.client.DbEngin.Raw("SELECT label FROM `t_cluster` where id = ? ", hpcInfo.ClusterId).Scan(&clusterType)
result.HpcInfoList[i].ClusterType = clusterType url := t.client.url + "/pcm/v1/core/pullTaskInfo"
} method := "GET"
} infoReq := PullTaskInfoReq{AdapterId: pullTaskInfoReq.AdapterId}
case 0: jsonStr, _ := json.Marshal(infoReq)
var cloudModelList []models.Cloud payload := strings.NewReader(string(jsonStr))
findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &cloudModelList)
utils.Convert(cloudModelList, &result.CloudInfoList) client := &http.Client{}
case 1: req, _ := http.NewRequest(method, url, payload)
var aiModelList []models.Ai req.Header.Add("Content-Type", "application/json")
findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &aiModelList) res, _ := client.Do(req)
utils.Convert(aiModelList, &result.AiInfoList) defer res.Body.Close()
}
return &result, nil body, _ := ioutil.ReadAll(res.Body)
var resp PullTaskInfoResp
json.Unmarshal(body, &resp)
return &resp, nil
} }
func (t task) PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) { func (t *task) PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) {
// 查询p端类型
var kind int32
t.client.DbEngin.Raw("select type as kind from t_adapter where id = ?", pushTaskInfoReq.AdapterId).Scan(&kind)
switch kind {
case 0:
for _, cloudInfo := range pushTaskInfoReq.CloudInfoList {
t.client.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?",
cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, pushTaskInfoReq.AdapterId, cloudInfo.Id)
syncTask(t.client.DbEngin, cloudInfo.TaskId)
}
case 2:
for _, hpcInfo := range pushTaskInfoReq.HpcInfoList {
t.client.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?",
hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, pushTaskInfoReq.AdapterId, hpcInfo.TaskId, hpcInfo.Name)
syncTask(t.client.DbEngin, hpcInfo.TaskId)
}
case 1:
for _, aiInfo := range pushTaskInfoReq.AiInfoList {
t.client.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, pushTaskInfoReq.AdapterId, aiInfo.TaskId, aiInfo.Name)
syncTask(t.client.DbEngin, aiInfo.TaskId)
}
}
return &PushTaskInfoResp{}, nil url := t.client.url + "/pcm/v1/core/pushTaskInfo"
method := "POST"
infoReq := PullTaskInfoReq{AdapterId: pushTaskInfoReq.AdapterId}
jsonStr, _ := json.Marshal(infoReq)
payload := strings.NewReader(string(jsonStr))
client := &http.Client{}
req, _ := http.NewRequest(method, url, payload)
req.Header.Add("Content-Type", "application/json")
res, _ := client.Do(req)
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var resp PushTaskInfoResp
json.Unmarshal(body, &resp)
return &resp, nil
} }
func (t task) PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) { func (t *task) PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) error {
//TODO implement me //TODO implement me
panic("implement me") panic("implement me")
} }
func findModelList(participantId int64, dbEngin *gorm.DB, data interface{}) error {
tx := dbEngin.Where("cluster_id = (select id from t_cluster where adapter_id = ?) AND status NOT IN ?", participantId, []string{"Deleted", "Succeeded", "Completed", "Failed"}).Find(data)
if tx.Error != nil {
return tx.Error
}
return nil
}
func syncTask(gorm *gorm.DB, taskId int64) {
var allStatus string
tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus)
if tx.Error != nil {
logx.Error(tx.Error)
}
// 子状态统一则修改主任务状态
statusArray := strings.Split(allStatus, ",")
if len(removeRepeatedElement(statusArray)) == 1 {
updateTask(gorm, taskId, statusArray[0])
}
// 子任务包含失败状态 主任务则失败
if strings.Contains(allStatus, constants.Failed) {
updateTask(gorm, taskId, constants.Failed)
}
if strings.Contains(allStatus, constants.Running) {
updateTask(gorm, taskId, constants.Running)
}
}
func updateTask(gorm *gorm.DB, taskId int64, status string) {
var task models.Task
gorm.Where("id = ? ", taskId).Find(&task)
if task.Status != status {
task.Status = status
gorm.Updates(&task)
}
}
func removeRepeatedElement(arr []string) (newArr []string) {
newArr = make([]string, 0)
for i := 0; i < len(arr); i++ {
repeat := false
for j := i + 1; j < len(arr); j++ {
if arr[i] == arr[j] {
repeat = true
break
}
}
if !repeat {
newArr = append(newArr, arr[i])
}
}
return
}

View File

@ -0,0 +1,28 @@
package core
import (
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
)
func PullTaskInfoHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req clientCore.PullTaskInfoReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
l := core.NewPullTaskInfoLogic(r.Context(), svcCtx)
resp, err := l.PullTaskInfo(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}

View File

@ -0,0 +1,28 @@
package core
import (
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
)
func PushTaskInfoHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req clientCore.PushTaskInfoReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
l := core.NewPushTaskInfoLogic(r.Context(), svcCtx)
resp, err := l.PushTaskInfo(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}

View File

@ -134,6 +134,16 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/core/metrics", Path: "/core/metrics",
Handler: core.MetricsHandler(serverCtx), Handler: core.MetricsHandler(serverCtx),
}, },
{
Method: http.MethodGet,
Path: "/core/pullTaskInfo",
Handler: core.PullTaskInfoHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/core/pushTaskInfo",
Handler: core.PushTaskInfoHandler(serverCtx),
},
}, },
rest.WithPrefix("/pcm/v1"), rest.WithPrefix("/pcm/v1"),
) )

View File

@ -0,0 +1,94 @@
package core
import (
"context"
"github.com/jinzhu/copier"
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gorm.io/gorm"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
)
type PullTaskInfoLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewPullTaskInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PullTaskInfoLogic {
return &PullTaskInfoLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clientCore.PullTaskInfoResp, error) {
//opt := clientPCM.Options{
// Url: "http://localhost:8999",
// DataSource: "root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local",
//}
//coreCli, _ := clientPCM.NewClient(opt)
//taskOpt := clientPCM.TaskOptions{}
//coreTask, _ := coreCli.Task(taskOpt)
//adapterId := 1706858330967773111
//// 查询core端分发下来的任务列表
//pullReq := types.PullTaskInfoReq{
// AdapterId: int64(adapterId),
//}
//hpcList, _ := coreTask.PullTaskInfo(pullReq)
//println(hpcList)
// 查询p端类型
resp := clientCore.PullTaskInfoResp{}
var kind int32
l.svcCtx.DbEngin.Raw("select type as kind from `t_adapter` where id = ?", req.AdapterId).Scan(&kind)
// 查询云智超中的数据列表
switch kind {
case 2:
var hpcModelList []models.TaskHpc
err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &hpcModelList)
if err != nil {
return nil, err
}
utils.Convert(hpcModelList, &resp.HpcInfoList)
if len(resp.HpcInfoList) > 0 {
for i, hpcInfo := range hpcModelList {
err := copier.CopyWithOption(resp.HpcInfoList[i], hpcInfo, copier.Option{Converters: utils.Converters})
if err != nil {
return nil, err
}
var clusterType string
l.svcCtx.DbEngin.Raw("SELECT label FROM `t_cluster` where id = ? ", hpcInfo.ClusterId).Scan(&clusterType)
resp.HpcInfoList[i].ClusterType = clusterType
}
}
case 0:
var cloudModelList []models.Cloud
err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &cloudModelList)
if err != nil {
return nil, err
}
utils.Convert(cloudModelList, &resp.CloudInfoList)
case 1:
var aiModelList []models.Ai
err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &aiModelList)
if err != nil {
return nil, err
}
utils.Convert(aiModelList, &resp.AiInfoList)
}
return &resp, nil
}
func findModelList(participantId int64, dbEngin *gorm.DB, data interface{}) error {
tx := dbEngin.Where("cluster_id = (select id from t_cluster where adapter_id = ?) AND status NOT IN ?", participantId, []string{"Deleted", "Succeeded", "Completed", "Failed"}).Find(data)
if tx.Error != nil {
return tx.Error
}
return nil
}

View File

@ -0,0 +1,105 @@
package core
import (
"context"
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gorm.io/gorm"
"strings"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
)
type PushTaskInfoLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewPushTaskInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PushTaskInfoLogic {
return &PushTaskInfoLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clientCore.PushTaskInfoResp, error) {
resp := clientCore.PushTaskInfoResp{}
var kind int32
l.svcCtx.DbEngin.Raw("select type as kind from t_adapter where id = ?", req.AdapterId).Scan(&kind)
switch kind {
case 0:
for _, cloudInfo := range req.CloudInfoList {
l.svcCtx.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?",
cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, req.AdapterId, cloudInfo.Id)
syncTask(l.svcCtx.DbEngin, cloudInfo.TaskId)
}
case 2:
for _, hpcInfo := range req.HpcInfoList {
l.svcCtx.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?",
hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, req.AdapterId, hpcInfo.TaskId, hpcInfo.Name)
syncTask(l.svcCtx.DbEngin, hpcInfo.TaskId)
}
case 1:
for _, aiInfo := range req.AiInfoList {
l.svcCtx.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, req.AdapterId, aiInfo.TaskId, aiInfo.Name)
syncTask(l.svcCtx.DbEngin, aiInfo.TaskId)
}
}
return &resp, nil
}
func syncTask(gorm *gorm.DB, taskId int64) {
var allStatus string
tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus)
if tx.Error != nil {
logx.Error(tx.Error)
}
// 子状态统一则修改主任务状态
statusArray := strings.Split(allStatus, ",")
if len(removeRepeatedElement(statusArray)) == 1 {
updateTask(gorm, taskId, statusArray[0])
}
// 子任务包含失败状态 主任务则失败
if strings.Contains(allStatus, constants.Failed) {
updateTask(gorm, taskId, constants.Failed)
}
if strings.Contains(allStatus, constants.Running) {
updateTask(gorm, taskId, constants.Running)
}
}
func updateTask(gorm *gorm.DB, taskId int64, status string) {
var task models.Task
gorm.Where("id = ? ", taskId).Find(&task)
if task.Status != status {
task.Status = status
gorm.Updates(&task)
}
}
func removeRepeatedElement(arr []string) (newArr []string) {
newArr = make([]string, 0)
for i := 0; i < len(arr); i++ {
repeat := false
for j := i + 1; j < len(arr); j++ {
if arr[i] == arr[j] {
repeat = true
break
}
}
if !repeat {
newArr = append(newArr, arr[i])
}
}
return
}