diff --git a/desc/core/pcm-core.api b/desc/core/pcm-core.api index 48f2b841..873b38db 100644 --- a/desc/core/pcm-core.api +++ b/desc/core/pcm-core.api @@ -222,7 +222,7 @@ type ( ImageRef int64 `json:"imageRef,optional"` FlavorRef int64 `json:"flavorRef,optional"` Uuid int64 `json:"uuid,optional"` - Replicas int64 `json:"replicas,string"` + Replicas int64 `json:"replicas,optional"` VmName string `json:"vm_name,optional"` } TaskVm { @@ -317,6 +317,30 @@ type ( } ) +type( + asynCommitAiTaskReq{ + Name string `json:"name,optional"` + AdapterIds []string `json:"adapterIds,optional"` + ClusterIds []string `json:"clusterIds,optional"` + Strategy string `json:"strategy,optional"` + StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` + Replicas int64 `json:"replicas,optional"` + ImageId string `json:"imageId,optional"` + Command string `json:"command,optional"` + FlavorId string `json:"flavorId,optional"` + Status string `json:"status,optional"` + ClusterId int64 `json:"clusterId,optional"` + AdapterId string `json:"adapterId,optional"` + + } + + asynCommitAiTaskResp{ + Code int32 `json:"code"` + Msg string `json:"msg"` + TaskId int64 `json:"taskId"` + } +) + type ( scheduleTaskByYamlResp { TaskId int64 `json:"taskId"` diff --git a/desc/pcm.api b/desc/pcm.api index 4df0f31a..96e8f766 100644 --- a/desc/pcm.api +++ b/desc/pcm.api @@ -43,6 +43,10 @@ service pcm { @handler commitVmTaskHandler post /core/commitVmTask (commitVmTaskReq) returns (commitVmTaskResp) + @doc "异步提交智算任务" + @handler asynCommitAiTaskHandler + post /core/asynCommitAiTask (asynCommitAiTaskReq) returns (asynCommitAiTaskResp) + @doc "删除任务" @handler deleteTaskHandler delete /core/deleteTask/:id (deleteTaskReq) diff --git a/internal/handler/core/asyncommitaitaskhandler.go b/internal/handler/core/asyncommitaitaskhandler.go new file mode 100644 index 00000000..a04b067b --- /dev/null +++ b/internal/handler/core/asyncommitaitaskhandler.go @@ -0,0 +1,29 @@ +package core + +import ( + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/core" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" +) + +// 异步提交智算任务 +func AsynCommitAiTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.AsynCommitAiTaskReq + if err := httpx.Parse(r, &req); err != nil { + httpx.ErrorCtx(r.Context(), w, err) + return + } + + l := core.NewAsynCommitAiTaskLogic(r.Context(), svcCtx) + resp, err := l.AsynCommitAiTask(&req) + if err != nil { + httpx.ErrorCtx(r.Context(), w, err) + } else { + httpx.OkJsonCtx(r.Context(), w, resp) + } + } +} diff --git a/internal/handler/routes.go b/internal/handler/routes.go index 8d3c71f2..bacf662d 100644 --- a/internal/handler/routes.go +++ b/internal/handler/routes.go @@ -370,6 +370,12 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/core/assets", Handler: core.NodeAssetsHandler(serverCtx), }, + { + // 异步提交智算任务 + Method: http.MethodPost, + Path: "/core/asynCommitAiTask", + Handler: core.AsynCommitAiTaskHandler(serverCtx), + }, { // Center Resources top3 Method: http.MethodGet, diff --git a/internal/logic/core/asyncommitaitasklogic.go b/internal/logic/core/asyncommitaitasklogic.go new file mode 100644 index 00000000..d6bffe1e --- /dev/null +++ b/internal/logic/core/asyncommitaitasklogic.go @@ -0,0 +1,101 @@ +package core + +import ( + "context" + "github.com/pkg/errors" + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "time" + + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type AsynCommitAiTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +// 异步提交智算任务 +func NewAsynCommitAiTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *AsynCommitAiTaskLogic { + return &AsynCommitAiTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *AsynCommitAiTaskLogic) AsynCommitAiTask(req *types.AsynCommitAiTaskReq) (resp *types.AsynCommitAiTaskResp, err error) { + // todo: add your logic here and delete this line + resp = &types.AsynCommitAiTaskResp{} + var adapterName string + var clusterName string + var adapterId int64 + + //TODO adapter + //adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64) + //taskAiAsynchronous := models.TaskAiAsynchronous{} + + // 构建主任务结构体 + taskModel := models.Task{ + Name: req.Name, + Description: "ai asynchronous task", + CommitTime: time.Now(), + Status: "Saved", + AdapterTypeDict: "1", + } + // 保存任务数据到数据库 + tx := l.svcCtx.DbEngin.Create(&taskModel) + if tx.Error != nil { + return nil, tx.Error + } + + l.svcCtx.DbEngin.Raw("SELECT nickname FROM `t_cluster` where id = ?", req.ClusterId).Scan(&clusterName) + l.svcCtx.DbEngin.Raw("SELECT adapter_id FROM `t_cluster` where id = ?", req.ClusterId).Scan(&adapterId) + l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", adapterId).Scan(&adapterName) + if len(adapterName) == 0 || adapterName == "" { + return nil, errors.New("no corresponding adapter found") + } + + AiAsynchronousInfo := models.TaskAiAsynchronous{ + TaskId: taskModel.Id, + AdapterId: adapterId, + AdapterName: adapterName, + ClusterId: req.ClusterId, + ClusterName: clusterName, + Name: "trainJob" + utils.RandomString(10), + StartTime: time.Now().String(), + Status: "Saved", + ImageId: req.ImageId, + Command: req.Command, + FlavorId: req.FlavorId, + } + tx = l.svcCtx.DbEngin.Create(&AiAsynchronousInfo) + if tx.Error != nil { + return nil, tx.Error + } + + noticeInfo := clientCore.NoticeInfo{ + AdapterId: adapterId, + AdapterName: adapterName, + ClusterId: req.ClusterId, + ClusterName: clusterName, + NoticeType: "create", + TaskName: req.Name, + Incident: "任务创建中", + } + result := l.svcCtx.DbEngin.Table("t_notice").Create(¬iceInfo) + if result.Error != nil { + logx.Errorf("Task creation failure, err: %v", result.Error) + } + resp = &types.AsynCommitAiTaskResp{ + Code: 200, + Msg: "success", + TaskId: taskModel.Id, + } + return resp, nil +} diff --git a/internal/types/types.go b/internal/types/types.go index 2a97272e..f611dd49 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -7559,6 +7559,27 @@ type AlertListResp struct { AlertMap map[string]interface{} `json:"alertMap"` } +type AsynCommitAiTaskReq struct { + Name string `json:"name,optional"` + AdapterIds []string `json:"adapterIds,optional"` + ClusterIds []string `json:"clusterIds,optional"` + Strategy string `json:"strategy,optional"` + StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` + Replicas int64 `json:"replicas,optional"` + ImageId string `json:"imageId,optional"` + Command string `json:"command,optional"` + FlavorId string `json:"flavorId,optional"` + Status string `json:"status,optional"` + ClusterId int64 `json:"clusterId,optional"` + AdapterId string `json:"adapterId,optional"` +} + +type AsynCommitAiTaskResp struct { + Code int32 `json:"code"` + Msg string `json:"msg"` + TaskId int64 `json:"taskId"` +} + type CancelJobReq struct { ClusterId int64 `form:"clusterId"` JobId string `form:"jobId"` @@ -7659,7 +7680,7 @@ type CommitVmTaskReq struct { ImageRef int64 `json:"imageRef,optional"` FlavorRef int64 `json:"flavorRef,optional"` Uuid int64 `json:"uuid,optional"` - Replicas int64 `json:"replicas,string"` + Replicas int64 `json:"replicas,optional"` VmName string `json:"vm_name,optional"` } diff --git a/pkg/models/taskaiasynchronousmodel_gen.go b/pkg/models/taskaiasynchronousmodel_gen.go index a0157e4a..6bf6d4e8 100644 --- a/pkg/models/taskaiasynchronousmodel_gen.go +++ b/pkg/models/taskaiasynchronousmodel_gen.go @@ -16,7 +16,7 @@ import ( var ( taskAiAsynchronousFieldNames = builder.RawFieldNames(&TaskAiAsynchronous{}) taskAiAsynchronousRows = strings.Join(taskAiAsynchronousFieldNames, ",") - taskAiAsynchronousRowsExpectAutoSet = strings.Join(stringx.Remove(taskAiAsynchronousFieldNames, "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",") + taskAiAsynchronousRowsExpectAutoSet = strings.Join(stringx.Remove(taskAiAsynchronousFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",") taskAiAsynchronousRowsWithPlaceHolder = strings.Join(stringx.Remove(taskAiAsynchronousFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?" ) @@ -82,8 +82,8 @@ func (m *defaultTaskAiAsynchronousModel) FindOne(ctx context.Context, id int64) } func (m *defaultTaskAiAsynchronousModel) Insert(ctx context.Context, data *TaskAiAsynchronous) (sql.Result, error) { - query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiAsynchronousRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.Id, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.ImageId, data.Command, data.FlavorId, data.Status) + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiAsynchronousRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.ImageId, data.Command, data.FlavorId, data.Status) return ret, err }