diff --git a/api/client/task.go b/api/client/task.go index 557f2052..086249f7 100644 --- a/api/client/task.go +++ b/api/client/task.go @@ -6,6 +6,12 @@ type TaskOptions struct { pushResourceInfoReq PushResourceInfoReq } +type Task interface { + PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) + PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) + PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) error +} + type PullTaskInfoReq struct { AdapterId int64 `json:"adapterId"` } @@ -33,9 +39,3 @@ type PushTaskInfoResp struct { type PushResourceInfoReq struct { AdapterId int64 `json:"adapterId"` } - -type Task interface { - PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) - PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) - PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) -} diff --git a/api/client/task_impl.go b/api/client/task_impl.go index 8b141cea..a339c298 100644 --- a/api/client/task_impl.go +++ b/api/client/task_impl.go @@ -1,13 +1,10 @@ package client import ( - "github.com/jinzhu/copier" - "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" - "gorm.io/gorm" + "io/ioutil" + "k8s.io/apimachinery/pkg/util/json" "log" + "net/http" "strings" "sync" ) @@ -29,128 +26,47 @@ func newTask(client *client, options *TaskOptions) (*task, error) { return task, nil } -func (t task) PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) { - result := PullTaskInfoResp{} - // 查询p端类型 - var kind int32 - t.client.DbEngin.Raw("select type as kind from `t_adapter` where id = ?", pullTaskInfoReq.AdapterId).Scan(&kind) - // 查询云智超中的数据列表 - switch kind { - case 2: - var hpcModelList []models.TaskHpc - findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &hpcModelList) - utils.Convert(hpcModelList, &result.HpcInfoList) - if len(result.HpcInfoList) > 0 { - for i, hpcInfo := range hpcModelList { - err := copier.CopyWithOption(result.HpcInfoList[i], hpcInfo, copier.Option{Converters: utils.Converters}) - if err != nil { - return nil, err - } - var clusterType string - t.client.DbEngin.Raw("SELECT label FROM `t_cluster` where id = ? ", hpcInfo.ClusterId).Scan(&clusterType) +func (t *task) PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) { - result.HpcInfoList[i].ClusterType = clusterType - } - } - case 0: - var cloudModelList []models.Cloud - findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &cloudModelList) - utils.Convert(cloudModelList, &result.CloudInfoList) - case 1: - var aiModelList []models.Ai - findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &aiModelList) - utils.Convert(aiModelList, &result.AiInfoList) - } - return &result, nil + url := t.client.url + "/pcm/v1/core/pullTaskInfo" + method := "GET" + infoReq := PullTaskInfoReq{AdapterId: pullTaskInfoReq.AdapterId} + jsonStr, _ := json.Marshal(infoReq) + payload := strings.NewReader(string(jsonStr)) + + client := &http.Client{} + req, _ := http.NewRequest(method, url, payload) + req.Header.Add("Content-Type", "application/json") + res, _ := client.Do(req) + defer res.Body.Close() + + body, _ := ioutil.ReadAll(res.Body) + var resp PullTaskInfoResp + json.Unmarshal(body, &resp) + return &resp, nil } -func (t task) PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) { - // 查询p端类型 - var kind int32 - t.client.DbEngin.Raw("select type as kind from t_adapter where id = ?", pushTaskInfoReq.AdapterId).Scan(&kind) - switch kind { - case 0: - for _, cloudInfo := range pushTaskInfoReq.CloudInfoList { - t.client.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?", - cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, pushTaskInfoReq.AdapterId, cloudInfo.Id) - syncTask(t.client.DbEngin, cloudInfo.TaskId) - } - case 2: - for _, hpcInfo := range pushTaskInfoReq.HpcInfoList { - t.client.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?", - hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, pushTaskInfoReq.AdapterId, hpcInfo.TaskId, hpcInfo.Name) - syncTask(t.client.DbEngin, hpcInfo.TaskId) - } - case 1: - for _, aiInfo := range pushTaskInfoReq.AiInfoList { - t.client.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", - aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, pushTaskInfoReq.AdapterId, aiInfo.TaskId, aiInfo.Name) - syncTask(t.client.DbEngin, aiInfo.TaskId) - } - } +func (t *task) PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) { - return &PushTaskInfoResp{}, nil + url := t.client.url + "/pcm/v1/core/pushTaskInfo" + method := "POST" + infoReq := PullTaskInfoReq{AdapterId: pushTaskInfoReq.AdapterId} + jsonStr, _ := json.Marshal(infoReq) + payload := strings.NewReader(string(jsonStr)) + + client := &http.Client{} + req, _ := http.NewRequest(method, url, payload) + req.Header.Add("Content-Type", "application/json") + res, _ := client.Do(req) + defer res.Body.Close() + + body, _ := ioutil.ReadAll(res.Body) + var resp PushTaskInfoResp + json.Unmarshal(body, &resp) + return &resp, nil } -func (t task) PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) { +func (t *task) PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) error { //TODO implement me panic("implement me") } - -func findModelList(participantId int64, dbEngin *gorm.DB, data interface{}) error { - tx := dbEngin.Where("cluster_id = (select id from t_cluster where adapter_id = ?) AND status NOT IN ?", participantId, []string{"Deleted", "Succeeded", "Completed", "Failed"}).Find(data) - if tx.Error != nil { - return tx.Error - } - return nil -} - -func syncTask(gorm *gorm.DB, taskId int64) { - - var allStatus string - tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus) - if tx.Error != nil { - logx.Error(tx.Error) - } - // 子状态统一则修改主任务状态 - statusArray := strings.Split(allStatus, ",") - if len(removeRepeatedElement(statusArray)) == 1 { - updateTask(gorm, taskId, statusArray[0]) - - } - // 子任务包含失败状态 主任务则失败 - if strings.Contains(allStatus, constants.Failed) { - updateTask(gorm, taskId, constants.Failed) - - } - if strings.Contains(allStatus, constants.Running) { - updateTask(gorm, taskId, constants.Running) - } - -} - -func updateTask(gorm *gorm.DB, taskId int64, status string) { - var task models.Task - gorm.Where("id = ? ", taskId).Find(&task) - if task.Status != status { - task.Status = status - gorm.Updates(&task) - } -} - -func removeRepeatedElement(arr []string) (newArr []string) { - newArr = make([]string, 0) - for i := 0; i < len(arr); i++ { - repeat := false - for j := i + 1; j < len(arr); j++ { - if arr[i] == arr[j] { - repeat = true - break - } - } - if !repeat { - newArr = append(newArr, arr[i]) - } - } - return -} diff --git a/api/internal/handler/core/pulltaskinfohandler.go b/api/internal/handler/core/pulltaskinfohandler.go new file mode 100644 index 00000000..e1777968 --- /dev/null +++ b/api/internal/handler/core/pulltaskinfohandler.go @@ -0,0 +1,28 @@ +package core + +import ( + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +func PullTaskInfoHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req clientCore.PullTaskInfoReq + if err := httpx.Parse(r, &req); err != nil { + httpx.ErrorCtx(r.Context(), w, err) + return + } + + l := core.NewPullTaskInfoLogic(r.Context(), svcCtx) + resp, err := l.PullTaskInfo(&req) + if err != nil { + httpx.ErrorCtx(r.Context(), w, err) + } else { + httpx.OkJsonCtx(r.Context(), w, resp) + } + } +} diff --git a/api/internal/handler/core/pushtaskinfohandler.go b/api/internal/handler/core/pushtaskinfohandler.go new file mode 100644 index 00000000..fa7f5e91 --- /dev/null +++ b/api/internal/handler/core/pushtaskinfohandler.go @@ -0,0 +1,28 @@ +package core + +import ( + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +func PushTaskInfoHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req clientCore.PushTaskInfoReq + if err := httpx.Parse(r, &req); err != nil { + httpx.ErrorCtx(r.Context(), w, err) + return + } + + l := core.NewPushTaskInfoLogic(r.Context(), svcCtx) + resp, err := l.PushTaskInfo(&req) + if err != nil { + httpx.ErrorCtx(r.Context(), w, err) + } else { + httpx.OkJsonCtx(r.Context(), w, resp) + } + } +} diff --git a/api/internal/handler/routes.go b/api/internal/handler/routes.go index 799d3e73..f01f8ea0 100644 --- a/api/internal/handler/routes.go +++ b/api/internal/handler/routes.go @@ -134,6 +134,16 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/core/metrics", Handler: core.MetricsHandler(serverCtx), }, + { + Method: http.MethodGet, + Path: "/core/pullTaskInfo", + Handler: core.PullTaskInfoHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/core/pushTaskInfo", + Handler: core.PushTaskInfoHandler(serverCtx), + }, }, rest.WithPrefix("/pcm/v1"), ) diff --git a/api/internal/logic/core/pulltaskinfologic.go b/api/internal/logic/core/pulltaskinfologic.go new file mode 100644 index 00000000..438475d4 --- /dev/null +++ b/api/internal/logic/core/pulltaskinfologic.go @@ -0,0 +1,94 @@ +package core + +import ( + "context" + "github.com/jinzhu/copier" + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "gorm.io/gorm" + + "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +type PullTaskInfoLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewPullTaskInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PullTaskInfoLogic { + return &PullTaskInfoLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clientCore.PullTaskInfoResp, error) { + //opt := clientPCM.Options{ + // Url: "http://localhost:8999", + // DataSource: "root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local", + //} + //coreCli, _ := clientPCM.NewClient(opt) + //taskOpt := clientPCM.TaskOptions{} + //coreTask, _ := coreCli.Task(taskOpt) + //adapterId := 1706858330967773111 + //// 查询core端分发下来的任务列表 + //pullReq := types.PullTaskInfoReq{ + // AdapterId: int64(adapterId), + //} + //hpcList, _ := coreTask.PullTaskInfo(pullReq) + //println(hpcList) + // 查询p端类型 + resp := clientCore.PullTaskInfoResp{} + + var kind int32 + l.svcCtx.DbEngin.Raw("select type as kind from `t_adapter` where id = ?", req.AdapterId).Scan(&kind) + // 查询云智超中的数据列表 + switch kind { + case 2: + var hpcModelList []models.TaskHpc + err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &hpcModelList) + if err != nil { + return nil, err + } + utils.Convert(hpcModelList, &resp.HpcInfoList) + if len(resp.HpcInfoList) > 0 { + for i, hpcInfo := range hpcModelList { + err := copier.CopyWithOption(resp.HpcInfoList[i], hpcInfo, copier.Option{Converters: utils.Converters}) + if err != nil { + return nil, err + } + var clusterType string + l.svcCtx.DbEngin.Raw("SELECT label FROM `t_cluster` where id = ? ", hpcInfo.ClusterId).Scan(&clusterType) + + resp.HpcInfoList[i].ClusterType = clusterType + } + } + case 0: + var cloudModelList []models.Cloud + err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &cloudModelList) + if err != nil { + return nil, err + } + utils.Convert(cloudModelList, &resp.CloudInfoList) + case 1: + var aiModelList []models.Ai + err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &aiModelList) + if err != nil { + return nil, err + } + utils.Convert(aiModelList, &resp.AiInfoList) + } + return &resp, nil +} + +func findModelList(participantId int64, dbEngin *gorm.DB, data interface{}) error { + tx := dbEngin.Where("cluster_id = (select id from t_cluster where adapter_id = ?) AND status NOT IN ?", participantId, []string{"Deleted", "Succeeded", "Completed", "Failed"}).Find(data) + if tx.Error != nil { + return tx.Error + } + return nil +} diff --git a/api/internal/logic/core/pushtaskinfologic.go b/api/internal/logic/core/pushtaskinfologic.go new file mode 100644 index 00000000..6a882aee --- /dev/null +++ b/api/internal/logic/core/pushtaskinfologic.go @@ -0,0 +1,105 @@ +package core + +import ( + "context" + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gorm.io/gorm" + "strings" + + "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +type PushTaskInfoLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewPushTaskInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PushTaskInfoLogic { + return &PushTaskInfoLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clientCore.PushTaskInfoResp, error) { + resp := clientCore.PushTaskInfoResp{} + var kind int32 + l.svcCtx.DbEngin.Raw("select type as kind from t_adapter where id = ?", req.AdapterId).Scan(&kind) + switch kind { + case 0: + for _, cloudInfo := range req.CloudInfoList { + l.svcCtx.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?", + cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, req.AdapterId, cloudInfo.Id) + syncTask(l.svcCtx.DbEngin, cloudInfo.TaskId) + } + case 2: + for _, hpcInfo := range req.HpcInfoList { + l.svcCtx.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?", + hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, req.AdapterId, hpcInfo.TaskId, hpcInfo.Name) + syncTask(l.svcCtx.DbEngin, hpcInfo.TaskId) + } + case 1: + for _, aiInfo := range req.AiInfoList { + l.svcCtx.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", + aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, req.AdapterId, aiInfo.TaskId, aiInfo.Name) + syncTask(l.svcCtx.DbEngin, aiInfo.TaskId) + } + } + + return &resp, nil +} + +func syncTask(gorm *gorm.DB, taskId int64) { + + var allStatus string + tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus) + if tx.Error != nil { + logx.Error(tx.Error) + } + // 子状态统一则修改主任务状态 + statusArray := strings.Split(allStatus, ",") + if len(removeRepeatedElement(statusArray)) == 1 { + updateTask(gorm, taskId, statusArray[0]) + + } + // 子任务包含失败状态 主任务则失败 + if strings.Contains(allStatus, constants.Failed) { + updateTask(gorm, taskId, constants.Failed) + + } + if strings.Contains(allStatus, constants.Running) { + updateTask(gorm, taskId, constants.Running) + } + +} + +func updateTask(gorm *gorm.DB, taskId int64, status string) { + var task models.Task + gorm.Where("id = ? ", taskId).Find(&task) + if task.Status != status { + task.Status = status + gorm.Updates(&task) + } +} + +func removeRepeatedElement(arr []string) (newArr []string) { + newArr = make([]string, 0) + for i := 0; i < len(arr); i++ { + repeat := false + for j := i + 1; j < len(arr); j++ { + if arr[i] == arr[j] { + repeat = true + break + } + } + if !repeat { + newArr = append(newArr, arr[i]) + } + } + return +}