feat: add asynCommitAiTask interface

This commit is contained in:
qiwang 2024-10-31 12:31:54 +08:00
parent 0beb2acda0
commit 8461a125da
7 changed files with 190 additions and 5 deletions

View File

@ -222,7 +222,7 @@ type (
ImageRef int64 `json:"imageRef,optional"` ImageRef int64 `json:"imageRef,optional"`
FlavorRef int64 `json:"flavorRef,optional"` FlavorRef int64 `json:"flavorRef,optional"`
Uuid int64 `json:"uuid,optional"` Uuid int64 `json:"uuid,optional"`
Replicas int64 `json:"replicas,string"` Replicas int64 `json:"replicas,optional"`
VmName string `json:"vm_name,optional"` VmName string `json:"vm_name,optional"`
} }
TaskVm { TaskVm {
@ -317,6 +317,30 @@ type (
} }
) )
type(
asynCommitAiTaskReq{
Name string `json:"name,optional"`
AdapterIds []string `json:"adapterIds,optional"`
ClusterIds []string `json:"clusterIds,optional"`
Strategy string `json:"strategy,optional"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
Replicas int64 `json:"replicas,optional"`
ImageId string `json:"imageId,optional"`
Command string `json:"command,optional"`
FlavorId string `json:"flavorId,optional"`
Status string `json:"status,optional"`
ClusterId int64 `json:"clusterId,optional"`
AdapterId string `json:"adapterId,optional"`
}
asynCommitAiTaskResp{
Code int32 `json:"code"`
Msg string `json:"msg"`
TaskId int64 `json:"taskId"`
}
)
type ( type (
scheduleTaskByYamlResp { scheduleTaskByYamlResp {
TaskId int64 `json:"taskId"` TaskId int64 `json:"taskId"`

View File

@ -43,6 +43,10 @@ service pcm {
@handler commitVmTaskHandler @handler commitVmTaskHandler
post /core/commitVmTask (commitVmTaskReq) returns (commitVmTaskResp) post /core/commitVmTask (commitVmTaskReq) returns (commitVmTaskResp)
@doc "异步提交智算任务"
@handler asynCommitAiTaskHandler
post /core/asynCommitAiTask (asynCommitAiTaskReq) returns (asynCommitAiTaskResp)
@doc "删除任务" @doc "删除任务"
@handler deleteTaskHandler @handler deleteTaskHandler
delete /core/deleteTask/:id (deleteTaskReq) delete /core/deleteTask/:id (deleteTaskReq)

View File

@ -0,0 +1,29 @@
package core
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/core"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
)
// 异步提交智算任务
func AsynCommitAiTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.AsynCommitAiTaskReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
l := core.NewAsynCommitAiTaskLogic(r.Context(), svcCtx)
resp, err := l.AsynCommitAiTask(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}

View File

@ -370,6 +370,12 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/core/assets", Path: "/core/assets",
Handler: core.NodeAssetsHandler(serverCtx), Handler: core.NodeAssetsHandler(serverCtx),
}, },
{
// 异步提交智算任务
Method: http.MethodPost,
Path: "/core/asynCommitAiTask",
Handler: core.AsynCommitAiTaskHandler(serverCtx),
},
{ {
// Center Resources top3 // Center Resources top3
Method: http.MethodGet, Method: http.MethodGet,

View File

@ -0,0 +1,101 @@
package core
import (
"context"
"github.com/pkg/errors"
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"time"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type AsynCommitAiTaskLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
// 异步提交智算任务
func NewAsynCommitAiTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *AsynCommitAiTaskLogic {
return &AsynCommitAiTaskLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *AsynCommitAiTaskLogic) AsynCommitAiTask(req *types.AsynCommitAiTaskReq) (resp *types.AsynCommitAiTaskResp, err error) {
// todo: add your logic here and delete this line
resp = &types.AsynCommitAiTaskResp{}
var adapterName string
var clusterName string
var adapterId int64
//TODO adapter
//adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64)
//taskAiAsynchronous := models.TaskAiAsynchronous{}
// 构建主任务结构体
taskModel := models.Task{
Name: req.Name,
Description: "ai asynchronous task",
CommitTime: time.Now(),
Status: "Saved",
AdapterTypeDict: "1",
}
// 保存任务数据到数据库
tx := l.svcCtx.DbEngin.Create(&taskModel)
if tx.Error != nil {
return nil, tx.Error
}
l.svcCtx.DbEngin.Raw("SELECT nickname FROM `t_cluster` where id = ?", req.ClusterId).Scan(&clusterName)
l.svcCtx.DbEngin.Raw("SELECT adapter_id FROM `t_cluster` where id = ?", req.ClusterId).Scan(&adapterId)
l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", adapterId).Scan(&adapterName)
if len(adapterName) == 0 || adapterName == "" {
return nil, errors.New("no corresponding adapter found")
}
AiAsynchronousInfo := models.TaskAiAsynchronous{
TaskId: taskModel.Id,
AdapterId: adapterId,
AdapterName: adapterName,
ClusterId: req.ClusterId,
ClusterName: clusterName,
Name: "trainJob" + utils.RandomString(10),
StartTime: time.Now().String(),
Status: "Saved",
ImageId: req.ImageId,
Command: req.Command,
FlavorId: req.FlavorId,
}
tx = l.svcCtx.DbEngin.Create(&AiAsynchronousInfo)
if tx.Error != nil {
return nil, tx.Error
}
noticeInfo := clientCore.NoticeInfo{
AdapterId: adapterId,
AdapterName: adapterName,
ClusterId: req.ClusterId,
ClusterName: clusterName,
NoticeType: "create",
TaskName: req.Name,
Incident: "任务创建中",
}
result := l.svcCtx.DbEngin.Table("t_notice").Create(&noticeInfo)
if result.Error != nil {
logx.Errorf("Task creation failure, err: %v", result.Error)
}
resp = &types.AsynCommitAiTaskResp{
Code: 200,
Msg: "success",
TaskId: taskModel.Id,
}
return resp, nil
}

View File

@ -7559,6 +7559,27 @@ type AlertListResp struct {
AlertMap map[string]interface{} `json:"alertMap"` AlertMap map[string]interface{} `json:"alertMap"`
} }
type AsynCommitAiTaskReq struct {
Name string `json:"name,optional"`
AdapterIds []string `json:"adapterIds,optional"`
ClusterIds []string `json:"clusterIds,optional"`
Strategy string `json:"strategy,optional"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
Replicas int64 `json:"replicas,optional"`
ImageId string `json:"imageId,optional"`
Command string `json:"command,optional"`
FlavorId string `json:"flavorId,optional"`
Status string `json:"status,optional"`
ClusterId int64 `json:"clusterId,optional"`
AdapterId string `json:"adapterId,optional"`
}
type AsynCommitAiTaskResp struct {
Code int32 `json:"code"`
Msg string `json:"msg"`
TaskId int64 `json:"taskId"`
}
type CancelJobReq struct { type CancelJobReq struct {
ClusterId int64 `form:"clusterId"` ClusterId int64 `form:"clusterId"`
JobId string `form:"jobId"` JobId string `form:"jobId"`
@ -7659,7 +7680,7 @@ type CommitVmTaskReq struct {
ImageRef int64 `json:"imageRef,optional"` ImageRef int64 `json:"imageRef,optional"`
FlavorRef int64 `json:"flavorRef,optional"` FlavorRef int64 `json:"flavorRef,optional"`
Uuid int64 `json:"uuid,optional"` Uuid int64 `json:"uuid,optional"`
Replicas int64 `json:"replicas,string"` Replicas int64 `json:"replicas,optional"`
VmName string `json:"vm_name,optional"` VmName string `json:"vm_name,optional"`
} }

View File

@ -16,7 +16,7 @@ import (
var ( var (
taskAiAsynchronousFieldNames = builder.RawFieldNames(&TaskAiAsynchronous{}) taskAiAsynchronousFieldNames = builder.RawFieldNames(&TaskAiAsynchronous{})
taskAiAsynchronousRows = strings.Join(taskAiAsynchronousFieldNames, ",") taskAiAsynchronousRows = strings.Join(taskAiAsynchronousFieldNames, ",")
taskAiAsynchronousRowsExpectAutoSet = strings.Join(stringx.Remove(taskAiAsynchronousFieldNames, "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",") taskAiAsynchronousRowsExpectAutoSet = strings.Join(stringx.Remove(taskAiAsynchronousFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",")
taskAiAsynchronousRowsWithPlaceHolder = strings.Join(stringx.Remove(taskAiAsynchronousFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?" taskAiAsynchronousRowsWithPlaceHolder = strings.Join(stringx.Remove(taskAiAsynchronousFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?"
) )
@ -82,8 +82,8 @@ func (m *defaultTaskAiAsynchronousModel) FindOne(ctx context.Context, id int64)
} }
func (m *defaultTaskAiAsynchronousModel) Insert(ctx context.Context, data *TaskAiAsynchronous) (sql.Result, error) { func (m *defaultTaskAiAsynchronousModel) Insert(ctx context.Context, data *TaskAiAsynchronous) (sql.Result, error) {
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiAsynchronousRowsExpectAutoSet) query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiAsynchronousRowsExpectAutoSet)
ret, err := m.conn.ExecCtx(ctx, query, data.Id, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.ImageId, data.Command, data.FlavorId, data.Status) ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.ImageId, data.Command, data.FlavorId, data.Status)
return ret, err return ret, err
} }