From be97e73406faa80e87cb9e76fac0d9c2d8444fa7 Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Wed, 23 Oct 2024 11:03:41 +0800 Subject: [PATCH 1/8] fix: Restore taskAi field Former-commit-id: 5745f4a539f53dc8f903fe8054286d0452d8949d --- pkg/models/taskaimodel_gen.go | 54 +++++++++++++++-------------------- 1 file changed, 23 insertions(+), 31 deletions(-) diff --git a/pkg/models/taskaimodel_gen.go b/pkg/models/taskaimodel_gen.go index b4608705..5c58decc 100644 --- a/pkg/models/taskaimodel_gen.go +++ b/pkg/models/taskaimodel_gen.go @@ -35,34 +35,26 @@ type ( } TaskAi struct { - Id int64 `db:"id"` // id - TaskId int64 `db:"task_id"` // 任务id - AdapterId int64 `db:"adapter_id"` // 适配器id - AdapterName string `db:"adapter_name"` // 适配器名称 - ClusterId int64 `db:"cluster_id"` // 集群id - ClusterName string `db:"cluster_name"` // 集群名称 - Name string `db:"name"` // 任务名 - Replica int64 `db:"replica"` // 执行数 - JobId string `db:"job_id"` // 集群返回任务id - Strategy string `db:"strategy"` // 主任务使用策略 - Status string `db:"status"` // 任务状态 - Msg string `db:"msg"` // 集群返回任务信息 - CommitTime time.Time `db:"commit_time"` // 提交时间 - StartTime string `db:"start_time"` // 开始时间 - EndTime string `db:"end_time"` // 结束时间 - TaskType string `db:"task_type"` - DeletedAt time.Time `db:"deleted_at"` - Card string `db:"card"` - Remark string `db:"remark"` // 备注 - InferUrl string `db:"infer_url"` - ModelName string `db:"model_name"` - AlgorithmId string `db:"algorithm_id"` // 算法id - ImageId string `db:"image_id"` // 镜像id - Command string `db:"command"` // 启动命令 - Environments string `db:"environments"` // 训练作业的环境变量 - Parameters string `db:"parameters"` // 训练作业的运行参数 - FlavorId string `db:"flavor_id"` // 规格id - MetadataName string `db:"metadata_name"` // 训练作业名称 + Id int64 `db:"id"` // id + TaskId int64 `db:"task_id"` // 任务id + AdapterId int64 `db:"adapter_id"` // 适配器id + AdapterName string `db:"adapter_name"` // 适配器名称 + ClusterId int64 `db:"cluster_id"` // 集群id + ClusterName string 
`db:"cluster_name"` // 集群名称 + Name string `db:"name"` // 任务名 + Replica int64 `db:"replica"` // 执行数 + JobId string `db:"job_id"` // 集群返回任务id + Strategy string `db:"strategy"` // 主任务使用策略 + Status string `db:"status"` // 任务状态 + Msg string `db:"msg"` // 集群返回任务信息 + CommitTime time.Time `db:"commit_time"` // 提交时间 + StartTime string `db:"start_time"` // 开始时间 + EndTime string `db:"end_time"` // 结束时间 + TaskType string `db:"task_type"` + DeletedAt *time.Time `db:"deleted_at"` + Card string `db:"card"` + InferUrl string `db:"infer_url"` + ModelName string `db:"model_name"` } ) @@ -94,14 +86,14 @@ func (m *defaultTaskAiModel) FindOne(ctx context.Context, id int64) (*TaskAi, er } func (m *defaultTaskAiModel) Insert(ctx context.Context, data *TaskAi) (sql.Result, error) { - query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.TaskType, data.DeletedAt, data.Card, data.Remark, data.InferUrl, data.ModelName, data.AlgorithmId, data.ImageId, data.Command, data.Environments, data.Parameters, data.FlavorId, data.MetadataName) + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.TaskType, data.DeletedAt, data.Card, data.InferUrl, data.ModelName) return ret, err } func (m *defaultTaskAiModel) Update(ctx context.Context, data *TaskAi) error { query := fmt.Sprintf("update %s set %s where `id` = ?", 
m.table, taskAiRowsWithPlaceHolder) - _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.TaskType, data.DeletedAt, data.Card, data.Remark, data.InferUrl, data.ModelName, data.AlgorithmId, data.ImageId, data.Command, data.Environments, data.Parameters, data.FlavorId, data.MetadataName, data.Id) + _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.TaskType, data.DeletedAt, data.Card, data.InferUrl, data.ModelName, data.Id) return err } From 86a670e59bb0378ad4df100677222e664073129a Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Wed, 23 Oct 2024 11:11:23 +0800 Subject: [PATCH 2/8] fix: Restore taskAi field Former-commit-id: bc6af6d16b53319c959ab5dc79296bdcd1808905 --- internal/logic/core/pulltaskinfologic.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/internal/logic/core/pulltaskinfologic.go b/internal/logic/core/pulltaskinfologic.go index 12ba381e..a727f766 100644 --- a/internal/logic/core/pulltaskinfologic.go +++ b/internal/logic/core/pulltaskinfologic.go @@ -2,18 +2,14 @@ package core import ( "context" - "encoding/json" - "fmt" "github.com/jinzhu/copier" + "github.com/zeromicro/go-zero/core/logx" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gorm.io/gorm" - "log" - - "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" ) type PullTaskInfoLogic 
struct { @@ -84,7 +80,7 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie return nil, err } utils.Convert(aiModelList, &resp.AiInfoList) - if len(resp.AiInfoList) > 0 { + /*if len(resp.AiInfoList) > 0 { for i, aiInfo := range aiModelList { if resp.AiInfoList[i].Environments != "" { // 定义一个map来存储解析后的JSON数据 @@ -116,7 +112,7 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie } } - } + }*/ } return &resp, nil } From b7a8602155675e0f30e1733f62099b0eef3fadca Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Wed, 23 Oct 2024 14:25:15 +0800 Subject: [PATCH 3/8] fix: add task_ai_modelarts Former-commit-id: 9e9675e3bfa20f095abb9bd7424c9e382041ccf0 --- internal/logic/core/pulltaskinfologic.go | 2 +- internal/storeLink/modelarts.go | 19 +++-- pkg/models/taskaimodelartsmodel.go | 29 +++++++ pkg/models/taskaimodelartsmodel_gen.go | 98 ++++++++++++++++++++++++ 4 files changed, 139 insertions(+), 9 deletions(-) create mode 100644 pkg/models/taskaimodelartsmodel.go create mode 100644 pkg/models/taskaimodelartsmodel_gen.go diff --git a/internal/logic/core/pulltaskinfologic.go b/internal/logic/core/pulltaskinfologic.go index a727f766..d13e1ca0 100644 --- a/internal/logic/core/pulltaskinfologic.go +++ b/internal/logic/core/pulltaskinfologic.go @@ -74,7 +74,7 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie } case 1: - var aiModelList []models.TaskAi + var aiModelList []models.TaskAiModelarts err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &aiModelList) if err != nil { return nil, err diff --git a/internal/storeLink/modelarts.go b/internal/storeLink/modelarts.go index 29f63753..4ce605b7 100644 --- a/internal/storeLink/modelarts.go +++ b/internal/storeLink/modelarts.go @@ -22,6 +22,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" 
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" @@ -63,10 +64,6 @@ type Version struct { Major, Minor, Patch int } -type AiStorage struct { - DbEngin *gorm.DB -} - // ParseVersion 从字符串解析版本号 func ParseVersion(versionStr string) (*Version, error) { parts := strings.Split(versionStr, ".") @@ -177,13 +174,19 @@ func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd stri NodeCount: 1, }, }, - Platform: m.platform, + //Platform: m.platform, + Platform: "modelarts-CloudBrain2", } resp, err := m.modelArtsRpc.CreateTrainingJob(ctx, req) - //tx := m.DbEngin.Create(adapterId) - /*if tx.Error != nil { + aiModelarts := models.TaskAiModelarts{} + aiModelarts.ImageId = imageId + aiModelarts.FlavorId = resourceId + aiModelarts.Cmd = cmd + //aiModelarts.TaskId = + tx := m.DbEngin.Table("task_ai_modelarts").Create(&aiModelarts) + if tx.Error != nil { return tx.Error, nil - }*/ + } if err != nil { return nil, err diff --git a/pkg/models/taskaimodelartsmodel.go b/pkg/models/taskaimodelartsmodel.go new file mode 100644 index 00000000..e225f9ae --- /dev/null +++ b/pkg/models/taskaimodelartsmodel.go @@ -0,0 +1,29 @@ +package models + +import "github.com/zeromicro/go-zero/core/stores/sqlx" + +var _ TaskAiModelartsModel = (*customTaskAiModelartsModel)(nil) + +type ( + // TaskAiModelartsModel is an interface to be customized, add more methods here, + // and implement the added methods in customTaskAiModelartsModel. + TaskAiModelartsModel interface { + taskAiModelartsModel + withSession(session sqlx.Session) TaskAiModelartsModel + } + + customTaskAiModelartsModel struct { + *defaultTaskAiModelartsModel + } +) + +// NewTaskAiModelartsModel returns a model for the database table. 
+func NewTaskAiModelartsModel(conn sqlx.SqlConn) TaskAiModelartsModel { + return &customTaskAiModelartsModel{ + defaultTaskAiModelartsModel: newTaskAiModelartsModel(conn), + } +} + +func (m *customTaskAiModelartsModel) withSession(session sqlx.Session) TaskAiModelartsModel { + return NewTaskAiModelartsModel(sqlx.NewSqlConnFromSession(session)) +} diff --git a/pkg/models/taskaimodelartsmodel_gen.go b/pkg/models/taskaimodelartsmodel_gen.go new file mode 100644 index 00000000..244604bd --- /dev/null +++ b/pkg/models/taskaimodelartsmodel_gen.go @@ -0,0 +1,98 @@ +// Code generated by goctl. DO NOT EDIT. + +package models + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/zeromicro/go-zero/core/stores/builder" + "github.com/zeromicro/go-zero/core/stores/sqlx" + "github.com/zeromicro/go-zero/core/stringx" +) + +var ( + taskAiModelartsFieldNames = builder.RawFieldNames(&TaskAiModelarts{}) + taskAiModelartsRows = strings.Join(taskAiModelartsFieldNames, ",") + taskAiModelartsRowsExpectAutoSet = strings.Join(stringx.Remove(taskAiModelartsFieldNames, "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",") + taskAiModelartsRowsWithPlaceHolder = strings.Join(stringx.Remove(taskAiModelartsFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?" 
+) + +type ( + taskAiModelartsModel interface { + Insert(ctx context.Context, data *TaskAiModelarts) (sql.Result, error) + FindOne(ctx context.Context, id int64) (*TaskAiModelarts, error) + Update(ctx context.Context, data *TaskAiModelarts) error + Delete(ctx context.Context, id int64) error + } + + defaultTaskAiModelartsModel struct { + conn sqlx.SqlConn + table string + } + + TaskAiModelarts struct { + Id int64 `db:"id"` // 训练作业资源规格id + TaskId int64 `db:"task_id"` // 任务id + AdapterId int64 `db:"adapter_id"` // 执行任务的适配器id + AdapterName string `db:"adapter_name"` // 适配器名称 + ClusterId int64 `db:"cluster_id"` // 集群id + ClusterName string `db:"cluster_name"` // 集群名称 + Name string `db:"name"` // 任务名 + Replica int64 `db:"replica"` // 执行数 + JobId string `db:"job_id"` // 集群返回任务id + StartTime string `db:"start_time"` // 开始时间 + RunningTime string `db:"running_time"` // 运行时间 + Result string `db:"result"` // 运行结果 + DeletedAt string `db:"deleted_at"` // 删除时间 + ImageId string `db:"image_id"` // 镜像id + Cmd string `db:"cmd"` // 命令行 + FlavorId string `db:"flavor_id"` // 训练作业资源规格id + Status string `db:"status"` // 任务状态 + } +) + +func newTaskAiModelartsModel(conn sqlx.SqlConn) *defaultTaskAiModelartsModel { + return &defaultTaskAiModelartsModel{ + conn: conn, + table: "`task_ai_modelarts`", + } +} + +func (m *defaultTaskAiModelartsModel) Delete(ctx context.Context, id int64) error { + query := fmt.Sprintf("delete from %s where `id` = ?", m.table) + _, err := m.conn.ExecCtx(ctx, query, id) + return err +} + +func (m *defaultTaskAiModelartsModel) FindOne(ctx context.Context, id int64) (*TaskAiModelarts, error) { + query := fmt.Sprintf("select %s from %s where `id` = ? 
limit 1", taskAiModelartsRows, m.table) + var resp TaskAiModelarts + err := m.conn.QueryRowCtx(ctx, &resp, query, id) + switch err { + case nil: + return &resp, nil + case sqlx.ErrNotFound: + return nil, ErrNotFound + default: + return nil, err + } +} + +func (m *defaultTaskAiModelartsModel) Insert(ctx context.Context, data *TaskAiModelarts) (sql.Result, error) { + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiModelartsRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.Id, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.ImageId, data.Cmd, data.FlavorId, data.Status) + return ret, err +} + +func (m *defaultTaskAiModelartsModel) Update(ctx context.Context, data *TaskAiModelarts) error { + query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskAiModelartsRowsWithPlaceHolder) + _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.ImageId, data.Cmd, data.FlavorId, data.Status, data.Id) + return err +} + +func (m *defaultTaskAiModelartsModel) tableName() string { + return m.table +} From 26ac5724000dcbcecb124836e43a6cc0e2ca19df Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Thu, 24 Oct 2024 15:05:55 +0800 Subject: [PATCH 4/8] fix: add task_ai_async Former-commit-id: 55fdcb01ab8719726afec147431087c2f2d284f8 --- client/types.go | 100 +++++++++++----------- internal/logic/core/pulltaskinfologic.go | 35 +------- internal/logic/core/pushtaskinfologic.go | 4 +- internal/storeLink/modelarts.go | 16 +--- pkg/models/taskaiasynchronousmodel.go | 29 +++++++ pkg/models/taskaiasynchronousmodel_gen.go | 98 +++++++++++++++++++++ 6 files changed, 181 insertions(+), 101 
deletions(-) create mode 100644 pkg/models/taskaiasynchronousmodel.go create mode 100644 pkg/models/taskaiasynchronousmodel_gen.go diff --git a/client/types.go b/client/types.go index 97d96867..80f842be 100644 --- a/client/types.go +++ b/client/types.go @@ -136,61 +136,61 @@ type CloudInfo struct { } type AiInfo struct { - Id int64 `json:"id"` // id - AdapterId int64 `json:"adapterId,omitempty,optional"` - AdapterName string `json:"adapterName,omitempty,optional"` - ClusterId int64 `json:"clusterId,omitempty,optional"` - ClusterIds []int64 `json:"clusterIds,omitempty,optional"` - TaskId int64 `json:"taskId,omitempty"` - TaskName string `json:"taskName,omitempty"` - Replica int32 `json:"replica,omitempty"` - ResourceType string `json:"resourceType,omitempty"` - CpuCoreNum int32 `json:"cpuCoreNum,omitempty"` - TaskType string `json:"taskType,omitempty"` - DatasetsName string `json:"datasetsName,omitempty"` - ProjectId string `json:"project_id,omitempty"` - StrategyName string `json:"strategyName,omitempty"` - ClusterToStaticWeight map[string]int32 `json:"clusterToStaticWeight,omitempty"` - Tops float64 `json:"tops,omitempty"` - ComputeCard string `json:"computeCard,omitempty,optional"` - CodeType string `json:"codeType,omitempty,optional"` - ClusterName string `json:"clusterName,omitempty,optional"` - ModelName string `json:"ModelName,omitempty,optional"` - AlgorithmName string `json:"algorithmName,omitempty,optional"` - Strategy string `json:"strategy,omitempty"` + Id int64 `json:"id"` // id + AdapterId int64 `json:"adapterId,omitempty,optional"` + AdapterName string `json:"adapterName,omitempty,optional"` + ClusterId int64 `json:"clusterId,omitempty,optional"` + ClusterIds []int64 `json:"clusterIds,omitempty,optional"` + TaskId int64 `json:"taskId,omitempty"` + ClusterName string `json:"clusterName,omitempty,optional"` + ImageId string `json:"imageId,omitempty"` + ResourceId string `json:"resourceId,omitempty"` + AlgorithmId string `json:"algorithmId,omitempty"` + 
MetadataName string `json:"metadataName,omitempty"` + Command string `json:"command,omitempty"` + Environments string `json:"environments,omitempty"` + Parameters string `json:"parameters,omitempty"` - ImageId string `json:"imageId,omitempty"` - SpecId string `json:"specId,omitempty"` - DatasetsId string `json:"datasetsId,omitempty"` - CodeId string `json:"codeId,omitempty"` - ResourceId string `json:"resourceId,omitempty"` - AlgorithmId string `json:"algorithmId,omitempty"` - MetadataName string `json:"metadataName,omitempty"` + Name string `json:"name,omitempty"` + Status string `json:"status,omitempty"` + StartTime string `json:"startTime,omitempty"` + //RunningTime int64 `json:"runningTime,omitempty"` + JobId string `json:"jobId,omitempty"` + FlavorId string `json:"flavorId,omitempty"` + //TaskName string `json:"taskName,omitempty"` + //Replica int32 `json:"replica,omitempty"` + //ResourceType string `json:"resourceType,omitempty"` + //CpuCoreNum int32 `json:"cpuCoreNum,omitempty"` + //TaskType string `json:"taskType,omitempty"` + //DatasetsName string `json:"datasetsName,omitempty"` + //ProjectId string `json:"project_id,omitempty"` + //StrategyName string `json:"strategyName,omitempty"` + //ClusterToStaticWeight map[string]int32 `json:"clusterToStaticWeight,omitempty"` + //Tops float64 `json:"tops,omitempty"` + //ComputeCard string `json:"computeCard,omitempty,optional"` + //CodeType string `json:"codeType,omitempty,optional"` - Cmd string `json:"cmd,omitempty"` - Envs []string `json:"envs,omitempty"` - Params []string `json:"params,omitempty"` - Environments string `json:"environments,omitempty"` - Parameters string `json:"parameters,omitempty"` + //ModelName string `json:"ModelName,omitempty,optional"` + //AlgorithmName string `json:"algorithmName,omitempty,optional"` + //Strategy string `json:"strategy,omitempty"` + //Envs []string `json:"envs,omitempty"` + //Params []string `json:"params,omitempty"` - Name string `json:"name,omitempty"` - Status string 
`json:"status,omitempty"` - StartTime string `json:"startTime,omitempty"` - RunningTime int64 `json:"runningTime,omitempty"` - Result string `json:"result,omitempty"` - JobId string `json:"jobId,omitempty"` + //SpecId string `json:"specId,omitempty"` + //DatasetsId string `json:"datasetsId,omitempty"` + //CodeId string `json:"codeId,omitempty"` + //Result string `json:"result,omitempty"` - Datasets string `json:"datasets,omitempty"` - AlgorithmCode string `json:"algorithmCode,omitempty"` - Image string `json:"image,omitempty"` + //Datasets string `json:"datasets,omitempty"` + //AlgorithmCode string `json:"algorithmCode,omitempty"` + //Image string `json:"image,omitempty"` - CreateTime string `json:"createTime,omitempty"` - ImageUrl string `json:"imageUrl,omitempty"` - Command string `json:"command,omitempty"` - FlavorId string `json:"flavorId,omitempty"` - SubscriptionId string `json:"subscriptionId,omitempty"` - ItemVersionId string `json:"itemVersionId,omitempty"` - ObsUrl string `json:"obsUrl,omitempty"` + //CreateTime string `json:"createTime,omitempty"` + //ImageUrl string `json:"imageUrl,omitempty"` + + //SubscriptionId string `json:"subscriptionId,omitempty"` + //ItemVersionId string `json:"itemVersionId,omitempty"` + //ObsUrl string `json:"obsUrl,omitempty"` } type VmInfo struct { diff --git a/internal/logic/core/pulltaskinfologic.go b/internal/logic/core/pulltaskinfologic.go index d13e1ca0..671e0770 100644 --- a/internal/logic/core/pulltaskinfologic.go +++ b/internal/logic/core/pulltaskinfologic.go @@ -74,45 +74,12 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie } case 1: - var aiModelList []models.TaskAiModelarts + var aiModelList []models.TaskAiAsynchronous err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &aiModelList) if err != nil { return nil, err } utils.Convert(aiModelList, &resp.AiInfoList) - /*if len(resp.AiInfoList) > 0 { - for i, aiInfo := range aiModelList { - if resp.AiInfoList[i].Environments != "" { - 
// 定义一个map来存储解析后的JSON数据 - var result map[string]interface{} - // 解析JSON字符串 - err := json.Unmarshal([]byte(resp.AiInfoList[i].Environments), &result) - if err != nil { - log.Fatalf("Error parsing JSON: %v", err) - } - // 如果你需要将解析后的map再次转换为JSON字符串,可以使用json.MarshalIndent - formattedJSON, err := json.MarshalIndent(result, "", " ") - aiInfo.Environments = string(formattedJSON) - fmt.Println(aiInfo.Environments) - resp.AiInfoList[i].Environments = aiInfo.Environments - } - if resp.AiInfoList[i].Parameters != "" { - // 定义一个map来存储解析后的JSON数据 - var result []interface{} - // 解析JSON字符串 - err := json.Unmarshal([]byte(resp.AiInfoList[i].Parameters), &result) - if err != nil { - log.Fatalf("Error parsing JSON: %v", err) - } - // 如果你需要将解析后的map再次转换为JSON字符串,可以使用json.MarshalIndent - formattedJSON, err := json.MarshalIndent(result, "", " ") - aiInfo.Parameters = string(formattedJSON) - fmt.Println(aiInfo.Parameters) - resp.AiInfoList[i].Parameters = aiInfo.Parameters - } - - } - }*/ } return &resp, nil } diff --git a/internal/logic/core/pushtaskinfologic.go b/internal/logic/core/pushtaskinfologic.go index 7e0f28d5..bd028241 100644 --- a/internal/logic/core/pushtaskinfologic.go +++ b/internal/logic/core/pushtaskinfologic.go @@ -89,8 +89,8 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie } case 1: for _, aiInfo := range req.AiInfoList { - l.svcCtx.DbEngin.Exec("update task_ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", - aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, req.AdapterId, aiInfo.TaskId, aiInfo.Name) + l.svcCtx.DbEngin.Exec("update task_ai_asynchronous set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? 
and name = ?", + aiInfo.Status, aiInfo.StartTime, aiInfo.JobId, aiInfo.ClusterId, aiInfo.TaskId, aiInfo.Name) noticeInfo := clientCore.NoticeInfo{ TaskId: aiInfo.TaskId, AdapterId: aiInfo.AdapterId, diff --git a/internal/storeLink/modelarts.go b/internal/storeLink/modelarts.go index 4ce605b7..86fedfe1 100644 --- a/internal/storeLink/modelarts.go +++ b/internal/storeLink/modelarts.go @@ -22,14 +22,12 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts" modelartsclient "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts" - "gorm.io/gorm" "log" "mime/multipart" "strconv" @@ -56,7 +54,6 @@ type ModelArtsLink struct { Version string ModelId string ModelType string - DbEngin *gorm.DB } // Version 结构体表示版本号 @@ -174,20 +171,9 @@ func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd stri NodeCount: 1, }, }, - //Platform: m.platform, - Platform: "modelarts-CloudBrain2", + Platform: m.platform, } resp, err := m.modelArtsRpc.CreateTrainingJob(ctx, req) - aiModelarts := models.TaskAiModelarts{} - aiModelarts.ImageId = imageId - aiModelarts.FlavorId = resourceId - aiModelarts.Cmd = cmd - //aiModelarts.TaskId = - tx := m.DbEngin.Table("task_ai_modelarts").Create(&aiModelarts) - if tx.Error != nil { - return tx.Error, nil - } - if err != nil { return nil, err } diff --git a/pkg/models/taskaiasynchronousmodel.go b/pkg/models/taskaiasynchronousmodel.go new file mode 100644 index 00000000..8943c365 --- /dev/null +++ 
b/pkg/models/taskaiasynchronousmodel.go @@ -0,0 +1,29 @@ +package models + +import "github.com/zeromicro/go-zero/core/stores/sqlx" + +var _ TaskAiAsynchronousModel = (*customTaskAiAsynchronousModel)(nil) + +type ( + // TaskAiAsynchronousModel is an interface to be customized, add more methods here, + // and implement the added methods in customTaskAiAsynchronousModel. + TaskAiAsynchronousModel interface { + taskAiAsynchronousModel + withSession(session sqlx.Session) TaskAiAsynchronousModel + } + + customTaskAiAsynchronousModel struct { + *defaultTaskAiAsynchronousModel + } +) + +// NewTaskAiAsynchronousModel returns a model for the database table. +func NewTaskAiAsynchronousModel(conn sqlx.SqlConn) TaskAiAsynchronousModel { + return &customTaskAiAsynchronousModel{ + defaultTaskAiAsynchronousModel: newTaskAiAsynchronousModel(conn), + } +} + +func (m *customTaskAiAsynchronousModel) withSession(session sqlx.Session) TaskAiAsynchronousModel { + return NewTaskAiAsynchronousModel(sqlx.NewSqlConnFromSession(session)) +} diff --git a/pkg/models/taskaiasynchronousmodel_gen.go b/pkg/models/taskaiasynchronousmodel_gen.go new file mode 100644 index 00000000..cca85a77 --- /dev/null +++ b/pkg/models/taskaiasynchronousmodel_gen.go @@ -0,0 +1,98 @@ +// Code generated by goctl. DO NOT EDIT. 
+ +package models + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/zeromicro/go-zero/core/stores/builder" + "github.com/zeromicro/go-zero/core/stores/sqlx" + "github.com/zeromicro/go-zero/core/stringx" +) + +var ( + taskAiAsynchronousFieldNames = builder.RawFieldNames(&TaskAiAsynchronous{}) + taskAiAsynchronousRows = strings.Join(taskAiAsynchronousFieldNames, ",") + taskAiAsynchronousRowsExpectAutoSet = strings.Join(stringx.Remove(taskAiAsynchronousFieldNames, "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",") + taskAiAsynchronousRowsWithPlaceHolder = strings.Join(stringx.Remove(taskAiAsynchronousFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?" +) + +type ( + taskAiAsynchronousModel interface { + Insert(ctx context.Context, data *TaskAiAsynchronous) (sql.Result, error) + FindOne(ctx context.Context, id int64) (*TaskAiAsynchronous, error) + Update(ctx context.Context, data *TaskAiAsynchronous) error + Delete(ctx context.Context, id int64) error + } + + defaultTaskAiAsynchronousModel struct { + conn sqlx.SqlConn + table string + } + + TaskAiAsynchronous struct { + Id int64 `db:"id"` // 训练作业资源规格id + TaskId int64 `db:"task_id"` // 任务id + AdapterId int64 `db:"adapter_id"` // 执行任务的适配器id + AdapterName string `db:"adapter_name"` // 适配器名称 + ClusterId int64 `db:"cluster_id"` // 集群id + ClusterName string `db:"cluster_name"` // 集群名称 + Name string `db:"name"` // 任务名 + Replica int64 `db:"replica"` // 执行数 + JobId string `db:"job_id"` // 集群返回任务id + StartTime string `db:"start_time"` // 开始时间 + RunningTime string `db:"running_time"` // 运行时间 + Result string `db:"result"` // 运行结果 + DeletedAt string `db:"deleted_at"` // 删除时间 + ImageId string `db:"image_id"` // 镜像id + Cmd string `db:"cmd"` // 命令行 + FlavorId string `db:"flavor_id"` // 训练作业资源规格id + Status string `db:"status"` // 任务状态 + } +) + +func newTaskAiAsynchronousModel(conn 
sqlx.SqlConn) *defaultTaskAiAsynchronousModel { + return &defaultTaskAiAsynchronousModel{ + conn: conn, + table: "`task_ai_asynchronous`", + } +} + +func (m *defaultTaskAiAsynchronousModel) Delete(ctx context.Context, id int64) error { + query := fmt.Sprintf("delete from %s where `id` = ?", m.table) + _, err := m.conn.ExecCtx(ctx, query, id) + return err +} + +func (m *defaultTaskAiAsynchronousModel) FindOne(ctx context.Context, id int64) (*TaskAiAsynchronous, error) { + query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", taskAiAsynchronousRows, m.table) + var resp TaskAiAsynchronous + err := m.conn.QueryRowCtx(ctx, &resp, query, id) + switch err { + case nil: + return &resp, nil + case sqlx.ErrNotFound: + return nil, ErrNotFound + default: + return nil, err + } +} + +func (m *defaultTaskAiAsynchronousModel) Insert(ctx context.Context, data *TaskAiAsynchronous) (sql.Result, error) { + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiAsynchronousRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.Id, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.ImageId, data.Cmd, data.FlavorId, data.Status) + return ret, err +} + +func (m *defaultTaskAiAsynchronousModel) Update(ctx context.Context, data *TaskAiAsynchronous) error { + query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskAiAsynchronousRowsWithPlaceHolder) + _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.ImageId, data.Cmd, data.FlavorId, data.Status, data.Id) + return err +} + +func (m *defaultTaskAiAsynchronousModel) tableName() string { + return m.table +} From 
1bbb35d3cf819f4855ed3639c3a75cfe8e8ddf17 Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Thu, 24 Oct 2024 15:35:16 +0800 Subject: [PATCH 5/8] fix: update AiInfo task_ai Former-commit-id: 8d900ea964c3d21e34eb015849f373d68e452fe6 --- client/types.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/client/types.go b/client/types.go index 80f842be..ca6987c9 100644 --- a/client/types.go +++ b/client/types.go @@ -136,20 +136,20 @@ type CloudInfo struct { } type AiInfo struct { - Id int64 `json:"id"` // id - AdapterId int64 `json:"adapterId,omitempty,optional"` - AdapterName string `json:"adapterName,omitempty,optional"` - ClusterId int64 `json:"clusterId,omitempty,optional"` - ClusterIds []int64 `json:"clusterIds,omitempty,optional"` - TaskId int64 `json:"taskId,omitempty"` - ClusterName string `json:"clusterName,omitempty,optional"` - ImageId string `json:"imageId,omitempty"` - ResourceId string `json:"resourceId,omitempty"` - AlgorithmId string `json:"algorithmId,omitempty"` - MetadataName string `json:"metadataName,omitempty"` - Command string `json:"command,omitempty"` - Environments string `json:"environments,omitempty"` - Parameters string `json:"parameters,omitempty"` + Id int64 `json:"id"` // id + AdapterId int64 `json:"adapterId,omitempty,optional"` + AdapterName string `json:"adapterName,omitempty,optional"` + ClusterId int64 `json:"clusterId,omitempty,optional"` + ClusterIds []int64 `json:"clusterIds,omitempty,optional"` + TaskId int64 `json:"taskId,omitempty"` + ClusterName string `json:"clusterName,omitempty,optional"` + ImageId string `json:"imageId,omitempty"` + //ResourceId string `json:"resourceId,omitempty"` + AlgorithmId string `json:"algorithmId,omitempty"` + MetadataName string `json:"metadataName,omitempty"` + Command string `json:"command,omitempty"` + Environments string `json:"environments,omitempty"` + Parameters string `json:"parameters,omitempty"` Name string 
`json:"name,omitempty"` Status string `json:"status,omitempty"` From 92428bcc069308d9fdc8d7a41893fe62c042858f Mon Sep 17 00:00:00 2001 From: tzwang Date: Thu, 24 Oct 2024 16:14:29 +0800 Subject: [PATCH 6/8] fix GetAdaptersByModel bugs Former-commit-id: 1dc4ccd87fc22d7df3ebc427caede9aa69a82a0b --- .../logic/inference/getadaptersbymodellogic.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/internal/logic/inference/getadaptersbymodellogic.go b/internal/logic/inference/getadaptersbymodellogic.go index fe6a3c2b..fed1abde 100644 --- a/internal/logic/inference/getadaptersbymodellogic.go +++ b/internal/logic/inference/getadaptersbymodellogic.go @@ -38,7 +38,19 @@ func (l *GetAdaptersByModelLogic) GetAdaptersByModel(req *types.GetAdaptersByMod } for _, cluster := range clusters.List { - exist := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[adapter.Id][cluster.Id].CheckModelExistence(l.ctx, req.ModelName, req.ModelType) + + cmap, found := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[adapter.Id] + if !found { + continue + } + + iCluster, found := cmap[cluster.Id] + if !found { + continue + } + + exist := iCluster.CheckModelExistence(l.ctx, req.ModelName, req.ModelType) + if exist { c := &types.ClusterAvail{ ClusterId: cluster.Id, From 1da1c53ac4b13fca00b249357371951b5446309a Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Thu, 24 Oct 2024 16:54:14 +0800 Subject: [PATCH 7/8] fix: update AiInfo task_ai_async Former-commit-id: 9ecc944d36e6fe7fafcc6760572bf90069663570 --- client/types.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/types.go b/client/types.go index ca6987c9..a2f8df49 100644 --- a/client/types.go +++ b/client/types.go @@ -145,8 +145,8 @@ type AiInfo struct { ClusterName string `json:"clusterName,omitempty,optional"` ImageId string `json:"imageId,omitempty"` //ResourceId string `json:"resourceId,omitempty"` - AlgorithmId string `json:"algorithmId,omitempty"` - MetadataName
string `json:"metadataName,omitempty"` + AlgorithmId string `json:"algorithmId,omitempty"` + //MetadataName string `json:"metadataName,omitempty"` Command string `json:"command,omitempty"` Environments string `json:"environments,omitempty"` Parameters string `json:"parameters,omitempty"` From 92451c41d43e0827ffa0a9be7cbaba5522516a0b Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Thu, 24 Oct 2024 17:41:50 +0800 Subject: [PATCH 8/8] fix: update AiInfo task_ai_async and Former-commit-id: c4a6452facc465db3c0753ea96e8e972b7eb1bf9 --- client/types.go | 24 +++++++++++------------ pkg/models/taskaiasynchronousmodel_gen.go | 6 +++--- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/client/types.go b/client/types.go index a2f8df49..5c494b26 100644 --- a/client/types.go +++ b/client/types.go @@ -136,20 +136,20 @@ type CloudInfo struct { } type AiInfo struct { - Id int64 `json:"id"` // id - AdapterId int64 `json:"adapterId,omitempty,optional"` - AdapterName string `json:"adapterName,omitempty,optional"` - ClusterId int64 `json:"clusterId,omitempty,optional"` - ClusterIds []int64 `json:"clusterIds,omitempty,optional"` - TaskId int64 `json:"taskId,omitempty"` - ClusterName string `json:"clusterName,omitempty,optional"` - ImageId string `json:"imageId,omitempty"` + Id int64 `json:"id"` // id + AdapterId int64 `json:"adapterId,omitempty,optional"` + AdapterName string `json:"adapterName,omitempty,optional"` + ClusterId int64 `json:"clusterId,omitempty,optional"` + //ClusterIds []int64 `json:"clusterIds,omitempty,optional"` + TaskId int64 `json:"taskId,omitempty"` + ClusterName string `json:"clusterName,omitempty,optional"` + ImageId string `json:"imageId,omitempty"` //ResourceId string `json:"resourceId,omitempty"` - AlgorithmId string `json:"algorithmId,omitempty"` + //AlgorithmId string `json:"algorithmId,omitempty"` //MetadataName string `json:"metadataName,omitempty"` - Command string `json:"command,omitempty"` - Environments string 
`json:"environments,omitempty"` - Parameters string `json:"parameters,omitempty"` + Command string `json:"command,omitempty"` + //Environments string `json:"environments,omitempty"` + //Parameters string `json:"parameters,omitempty"` Name string `json:"name,omitempty"` Status string `json:"status,omitempty"` diff --git a/pkg/models/taskaiasynchronousmodel_gen.go b/pkg/models/taskaiasynchronousmodel_gen.go index cca85a77..a0157e4a 100644 --- a/pkg/models/taskaiasynchronousmodel_gen.go +++ b/pkg/models/taskaiasynchronousmodel_gen.go @@ -48,7 +48,7 @@ type ( Result string `db:"result"` // 运行结果 DeletedAt string `db:"deleted_at"` // 删除时间 ImageId string `db:"image_id"` // 镜像id - Cmd string `db:"cmd"` // 命令行 + Command string `db:"command"` // 命令行 FlavorId string `db:"flavor_id"` // 训练作业资源规格id Status string `db:"status"` // 任务状态 } @@ -83,13 +83,13 @@ func (m *defaultTaskAiAsynchronousModel) FindOne(ctx context.Context, id int64) func (m *defaultTaskAiAsynchronousModel) Insert(ctx context.Context, data *TaskAiAsynchronous) (sql.Result, error) { query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiAsynchronousRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.Id, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.ImageId, data.Cmd, data.FlavorId, data.Status) + ret, err := m.conn.ExecCtx(ctx, query, data.Id, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.ImageId, data.Command, data.FlavorId, data.Status) return ret, err } func (m *defaultTaskAiAsynchronousModel) Update(ctx context.Context, data *TaskAiAsynchronous) error { query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskAiAsynchronousRowsWithPlaceHolder) - 
_, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.ImageId, data.Cmd, data.FlavorId, data.Status, data.Id) + _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.Name, data.Replica, data.JobId, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.ImageId, data.Command, data.FlavorId, data.Status, data.Id) return err }