From 7ae6282867afec6e63cd50fa9a42376b1e152dfa Mon Sep 17 00:00:00 2001 From: zhouqunjie Date: Sat, 22 Jun 2024 14:55:43 +0800 Subject: [PATCH 1/7] task ai sub Former-commit-id: 33de4df1da4b3a2d67151112d2d30dd1cd1c6f2a --- .../logic/inference/imageinferencelogic.go | 29 +++++- pkg/models/taskaisubmodel.go | 29 ++++++ pkg/models/taskaisubmodel_gen.go | 88 +++++++++++++++++++ 3 files changed, 142 insertions(+), 4 deletions(-) create mode 100644 pkg/models/taskaisubmodel.go create mode 100644 pkg/models/taskaisubmodel_gen.go diff --git a/api/internal/logic/inference/imageinferencelogic.go b/api/internal/logic/inference/imageinferencelogic.go index 2a06f78d..3eec7d69 100644 --- a/api/internal/logic/inference/imageinferencelogic.go +++ b/api/internal/logic/inference/imageinferencelogic.go @@ -114,6 +114,9 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere return resp, nil } +var acs []*strategy.AssignedCluster +var aiTaskList []*models.TaskAi + func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []struct { imageResult *types.ImageResult file multipart.File @@ -211,7 +214,6 @@ func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []s cs = append(cs, s) } - var aiTaskList []*models.TaskAi tx := svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", id).Scan(&aiTaskList) if tx.Error != nil { return nil, tx.Error @@ -219,7 +221,6 @@ func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []s } //change cluster status if len(clusters) != len(cs) { - var acs []*strategy.AssignedCluster for _, cluster := range clusters { if contains(cs, cluster.ClusterId) { continue @@ -261,7 +262,7 @@ func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []s imageNumIdx = imageNumIdx + c.imageNum wg.Add(len(new_images)) - go sendInferReq(new_images, c, &wg, result_ch) + go sendInferReq(new_images, c, &wg, *svcCtx, result_ch) } wg.Wait() close(result_ch) @@ -297,7 +298,7 @@ func sendInferReq(images []struct { clusterId string clusterName string imageNum int32 -}, wg *sync.WaitGroup, ch chan<- *types.ImageResult) { +}, wg *sync.WaitGroup, svcCtx svc.ServiceContext, ch chan<- *types.ImageResult) { for _, image := range images { go func(t struct { imageResult *types.ImageResult @@ -339,7 +340,27 @@ func sendInferReq(images []struct { t.imageResult.ImageResult = r t.imageResult.ClusterName = c.clusterName t.imageResult.Card = c.urls[idx].Card + for _, ac := range acs { + for _, task := range aiTaskList { + if ac.ClusterId == strconv.Itoa(int(task.ClusterId)) && ac.ClusterId == t.imageResult.ClusterId { + taskAiSub := &models.TaskAiSub{ + Id: task.Id, + ImageName: t.imageResult.ImageName, + Result: t.imageResult.ImageResult, + Card: t.imageResult.Card, + ClusterId: task.ClusterId, + ClusterName: t.imageResult.ClusterName, + } + tx := svcCtx.DbEngin.Save(&taskAiSub) + if tx.Error != nil { + logx.Errorf(err.Error()) + } + } + continue + } + continue + } ch <- t.imageResult wg.Done() return diff --git a/pkg/models/taskaisubmodel.go b/pkg/models/taskaisubmodel.go new file mode 100644 index 00000000..207772c6 --- /dev/null +++ b/pkg/models/taskaisubmodel.go @@ -0,0 +1,29 @@ +package models + +import "github.com/zeromicro/go-zero/core/stores/sqlx" + +var _ TaskAiSubModel = (*customTaskAiSubModel)(nil) + +type ( + // TaskAiSubModel is an interface to be customized, add more methods here, + // and implement the added methods in customTaskAiSubModel. + TaskAiSubModel interface { + taskAiSubModel + withSession(session sqlx.Session) TaskAiSubModel + } + + customTaskAiSubModel struct { + *defaultTaskAiSubModel + } +) + +// NewTaskAiSubModel returns a model for the database table. +func NewTaskAiSubModel(conn sqlx.SqlConn) TaskAiSubModel { + return &customTaskAiSubModel{ + defaultTaskAiSubModel: newTaskAiSubModel(conn), + } +} + +func (m *customTaskAiSubModel) withSession(session sqlx.Session) TaskAiSubModel { + return NewTaskAiSubModel(sqlx.NewSqlConnFromSession(session)) +} diff --git a/pkg/models/taskaisubmodel_gen.go b/pkg/models/taskaisubmodel_gen.go new file mode 100644 index 00000000..53e48575 --- /dev/null +++ b/pkg/models/taskaisubmodel_gen.go @@ -0,0 +1,88 @@ +// Code generated by goctl. DO NOT EDIT. + +package models + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/zeromicro/go-zero/core/stores/builder" + "github.com/zeromicro/go-zero/core/stores/sqlc" + "github.com/zeromicro/go-zero/core/stores/sqlx" + "github.com/zeromicro/go-zero/core/stringx" +) + +var ( + taskAiSubFieldNames = builder.RawFieldNames(&TaskAiSub{}) + taskAiSubRows = strings.Join(taskAiSubFieldNames, ",") + taskAiSubRowsExpectAutoSet = strings.Join(stringx.Remove(taskAiSubFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",") + taskAiSubRowsWithPlaceHolder = strings.Join(stringx.Remove(taskAiSubFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?" +) + +type ( + taskAiSubModel interface { + Insert(ctx context.Context, data *TaskAiSub) (sql.Result, error) + FindOne(ctx context.Context, id int64) (*TaskAiSub, error) + Update(ctx context.Context, data *TaskAiSub) error + Delete(ctx context.Context, id int64) error + } + + defaultTaskAiSubModel struct { + conn sqlx.SqlConn + table string + } + + TaskAiSub struct { + Id int64 `db:"id"` // id + ImageName string `db:"image_name"` // 图片名称 + Result string `db:"result"` // 识别结果 + Card string `db:"card"` // 加速卡 + ClusterId int64 `db:"cluster_id"` // 集群id + ClusterName string `db:"cluster_name"` // 集群名称 + } +) + +func newTaskAiSubModel(conn sqlx.SqlConn) *defaultTaskAiSubModel { + return &defaultTaskAiSubModel{ + conn: conn, + table: "`task_ai_sub`", + } +} + +func (m *defaultTaskAiSubModel) Delete(ctx context.Context, id int64) error { + query := fmt.Sprintf("delete from %s where `id` = ?", m.table) + _, err := m.conn.ExecCtx(ctx, query, id) + return err +} + +func (m *defaultTaskAiSubModel) FindOne(ctx context.Context, id int64) (*TaskAiSub, error) { + query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", taskAiSubRows, m.table) + var resp TaskAiSub + err := m.conn.QueryRowCtx(ctx, &resp, query, id) + switch err { + case nil: + return &resp, nil + case sqlc.ErrNotFound: + return nil, ErrNotFound + default: + return nil, err + } +} + +func (m *defaultTaskAiSubModel) Insert(ctx context.Context, data *TaskAiSub) (sql.Result, error) { + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?)", m.table, taskAiSubRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.ImageName, data.Result, data.Card, data.ClusterId, data.ClusterName) + return ret, err +} + +func (m *defaultTaskAiSubModel) Update(ctx context.Context, data *TaskAiSub) error { + query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskAiSubRowsWithPlaceHolder) + _, err := m.conn.ExecCtx(ctx, query, data.ImageName, data.Result, data.Card, data.ClusterId, data.ClusterName, data.Id) + return err +} + +func (m *defaultTaskAiSubModel) tableName() string { + return m.table +} From d6e6176294a71666ea682eaf624e15500cfde6a5 Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Sat, 22 Jun 2024 15:04:02 +0800 Subject: [PATCH 2/7] fix: update modelarts image 0622 Former-commit-id: fb842f96900b886481abeeb6c76d6f360792683e --- .../logic/inference/imageinferencelogic.go | 109 +++++++++++++++++- 1 file changed, 105 insertions(+), 4 deletions(-) diff --git a/api/internal/logic/inference/imageinferencelogic.go b/api/internal/logic/inference/imageinferencelogic.go index 30567d8b..0eac524d 100644 --- a/api/internal/logic/inference/imageinferencelogic.go +++ b/api/internal/logic/inference/imageinferencelogic.go @@ -1,8 +1,13 @@ package inference +import "C" import ( + "APIGW-go-sdk/core" + "bytes" "context" + "crypto/tls" "errors" + "fmt" "github.com/go-resty/resty/v2" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" @@ -12,6 +17,9 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "io" + "k8s.io/apimachinery/pkg/util/json" + "log" "math/rand" "mime/multipart" "net/http" @@ -334,7 +342,7 @@ func sendInferReq(images []struct { imageNum int32 }) { if len(c.urls) == 1 { - r, err := getInferResult(c.urls[0].Url, t.file, t.imageResult.ImageName) + r, err := getInferResult(c.urls[0].Url, t.file, t.imageResult.ImageName, c.clusterName) if err != nil { t.imageResult.ImageResult = err.Error() t.imageResult.ClusterName = c.clusterName @@ -352,7 +360,7 @@ func sendInferReq(images []struct { return } else { idx := rand.Intn(len(c.urls)) - r, err := getInferResult(c.urls[idx].Url, t.file, t.imageResult.ImageName) + r, err := getInferResult(c.urls[idx].Url, t.file, t.imageResult.ImageName, c.clusterName) if err != nil { t.imageResult.ImageResult = err.Error() t.imageResult.ClusterName = c.clusterName @@ -373,20 +381,113 @@ func sendInferReq(images []struct { } } -func getInferResult(url string, file multipart.File, fileName string) (string, error) { +func getInferResult(url string, file multipart.File, fileName string, clusterName string) (string, error) { + if clusterName == "鹏城云脑II-modelarts" { + r, err := getInferResultModelarts(url, file, fileName) + if err != nil { + return "", err + } + return r, nil + } var res Res req := GetRestyRequest(10) _, err := req. SetFileReader("file", fileName, file). SetResult(&res). Post(url) - if err != nil { return "", err } return res.Result, nil } +func getInferResultModelarts(url string, file multipart.File, fileName string) (string, error) { + var res Res + body, err := SendRequest("POST", url, file, fileName) + if err != nil { + return "", err + } + errjson := json.Unmarshal([]byte(body), &res) + if errjson != nil { + log.Fatalf("Error parsing JSON: %s", errjson) + } + + return res.Result, nil +} + +// SignClient AK/SK签名认证 +func SignClient(r *http.Request, writer *multipart.Writer) (*http.Client, error) { + r.Header.Add("content-type", "application/json;charset=UTF-8") + r.Header.Add("X-Project-Id", "d18190e28e3f45a281ef0b0696ec9d52") + r.Header.Add("x-stage", "RELEASE") + r.Header.Add("x-sdk-content-sha256", "UNSIGNED-PAYLOAD") + r.Header.Set("Content-Type", writer.FormDataContentType()) + s := core.Signer{ + Key: "UNEHPHO4Z7YSNPKRXFE4", + Secret: "JWXCE9qcYbc7RjpSRIWt4WgG3ZKF6Q4lPzkJReX9", + } + err := s.Sign(r) + if err != nil { + return nil, err + } + + //设置client信任所有证书 + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{ + Transport: tr, + } + return client, nil +} + +func SendRequest(method, url string, file multipart.File, fileName string) (string, error) { + /*body := &bytes.Buffer{} + writer := multipart.NewWriter(body)*/ + // 创建一个新的缓冲区以写入multipart表单 + var body bytes.Buffer + // 创建一个新的multipart writer + writer := multipart.NewWriter(&body) + // 创建一个用于写入文件的表单字段 + part, err := writer.CreateFormFile("file", fileName) // "file"是表单的字段名,第二个参数是文件名 + if err != nil { + fmt.Println("Error creating form file:", err) + } + // 将文件的内容拷贝到multipart writer中 + _, err = io.Copy(part, file) + if err != nil { + fmt.Println("Error copying file data:", err) + + } + err = writer.Close() + if err != nil { + fmt.Println("Error closing multipart writer:", err) + } + request, err := http.NewRequest(method, "https://modelarts-inference.cloudbrain2.pcl.ac.cn/v1/infers/fb0f011f-3e74-4396-ab81-20d65525d22b/image", &body) + if err != nil { + fmt.Println("Error creating new request:", err) + //return nil, err + } + signedR, err := SignClient(request, writer) + if err != nil { + fmt.Println("Error signing request:", err) + //return nil, err + } + + res, err := signedR.Do(request) + if err != nil { + fmt.Println("Error sending request:", err) + //return nil, err + } + defer res.Body.Close() + Resbody, err := io.ReadAll(res.Body) + if err != nil { + fmt.Println("Error reading response body:", err) + //return nil, err + } + return string(Resbody), nil +} + func GetRestyRequest(timeoutSeconds int64) *resty.Request { client := resty.New().SetTimeout(time.Duration(timeoutSeconds) * time.Second) request := client.R() From b626a9a185d030f3cff788e0e33d73ba7b502522 Mon Sep 17 00:00:00 2001 From: zhouqunjie Date: Sat, 22 Jun 2024 15:10:09 +0800 Subject: [PATCH 3/7] Revert "task ai sub" This reverts commit 7ae6282867afec6e63cd50fa9a42376b1e152dfa [formerly 33de4df1da4b3a2d67151112d2d30dd1cd1c6f2a]. Former-commit-id: b34a6c977c1bbbbb97e2ab63754983b8906222ce --- .../logic/inference/imageinferencelogic.go | 29 +----- pkg/models/taskaisubmodel.go | 29 ------ pkg/models/taskaisubmodel_gen.go | 88 ------------------- 3 files changed, 4 insertions(+), 142 deletions(-) delete mode 100644 pkg/models/taskaisubmodel.go delete mode 100644 pkg/models/taskaisubmodel_gen.go diff --git a/api/internal/logic/inference/imageinferencelogic.go b/api/internal/logic/inference/imageinferencelogic.go index 3eec7d69..2a06f78d 100644 --- a/api/internal/logic/inference/imageinferencelogic.go +++ b/api/internal/logic/inference/imageinferencelogic.go @@ -114,9 +114,6 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere return resp, nil } -var acs []*strategy.AssignedCluster -var aiTaskList []*models.TaskAi - func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []struct { imageResult *types.ImageResult file multipart.File @@ -214,6 +211,7 @@ func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []s cs = append(cs, s) } + var aiTaskList []*models.TaskAi tx := svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", id).Scan(&aiTaskList) if tx.Error != nil { return nil, tx.Error @@ -221,6 +219,7 @@ func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []s } //change cluster status if len(clusters) != len(cs) { + var acs []*strategy.AssignedCluster for _, cluster := range clusters { if contains(cs, cluster.ClusterId) { continue @@ -262,7 +261,7 @@ func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []s imageNumIdx = imageNumIdx + c.imageNum wg.Add(len(new_images)) - go sendInferReq(new_images, c, &wg, *svcCtx, result_ch) + go sendInferReq(new_images, c, &wg, result_ch) } wg.Wait() close(result_ch) @@ -298,7 +297,7 @@ func sendInferReq(images []struct { clusterId string clusterName string imageNum int32 -}, wg *sync.WaitGroup, svcCtx svc.ServiceContext, ch chan<- *types.ImageResult) { +}, wg *sync.WaitGroup, ch chan<- *types.ImageResult) { for _, image := range images { go func(t struct { imageResult *types.ImageResult @@ -340,27 +339,7 @@ func sendInferReq(images []struct { t.imageResult.ImageResult = r t.imageResult.ClusterName = c.clusterName t.imageResult.Card = c.urls[idx].Card - for _, ac := range acs { - for _, task := range aiTaskList { - if ac.ClusterId == strconv.Itoa(int(task.ClusterId)) && ac.ClusterId == t.imageResult.ClusterId { - taskAiSub := &models.TaskAiSub{ - Id: task.Id, - ImageName: t.imageResult.ImageName, - Result: t.imageResult.ImageResult, - Card: t.imageResult.Card, - ClusterId: task.ClusterId, - ClusterName: t.imageResult.ClusterName, - } - tx := svcCtx.DbEngin.Save(&taskAiSub) - if tx.Error != nil { - logx.Errorf(err.Error()) - } - } - continue - } - continue - } ch <- t.imageResult wg.Done() return diff --git a/pkg/models/taskaisubmodel.go b/pkg/models/taskaisubmodel.go deleted file mode 100644 index 207772c6..00000000 --- a/pkg/models/taskaisubmodel.go +++ /dev/null @@ -1,29 +0,0 @@ -package models - -import "github.com/zeromicro/go-zero/core/stores/sqlx" - -var _ TaskAiSubModel = (*customTaskAiSubModel)(nil) - -type ( - // TaskAiSubModel is an interface to be customized, add more methods here, - // and implement the added methods in customTaskAiSubModel. - TaskAiSubModel interface { - taskAiSubModel - withSession(session sqlx.Session) TaskAiSubModel - } - - customTaskAiSubModel struct { - *defaultTaskAiSubModel - } -) - -// NewTaskAiSubModel returns a model for the database table. -func NewTaskAiSubModel(conn sqlx.SqlConn) TaskAiSubModel { - return &customTaskAiSubModel{ - defaultTaskAiSubModel: newTaskAiSubModel(conn), - } -} - -func (m *customTaskAiSubModel) withSession(session sqlx.Session) TaskAiSubModel { - return NewTaskAiSubModel(sqlx.NewSqlConnFromSession(session)) -} diff --git a/pkg/models/taskaisubmodel_gen.go b/pkg/models/taskaisubmodel_gen.go deleted file mode 100644 index 53e48575..00000000 --- a/pkg/models/taskaisubmodel_gen.go +++ /dev/null @@ -1,88 +0,0 @@ -// Code generated by goctl. DO NOT EDIT. - -package models - -import ( - "context" - "database/sql" - "fmt" - "strings" - - "github.com/zeromicro/go-zero/core/stores/builder" - "github.com/zeromicro/go-zero/core/stores/sqlc" - "github.com/zeromicro/go-zero/core/stores/sqlx" - "github.com/zeromicro/go-zero/core/stringx" -) - -var ( - taskAiSubFieldNames = builder.RawFieldNames(&TaskAiSub{}) - taskAiSubRows = strings.Join(taskAiSubFieldNames, ",") - taskAiSubRowsExpectAutoSet = strings.Join(stringx.Remove(taskAiSubFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",") - taskAiSubRowsWithPlaceHolder = strings.Join(stringx.Remove(taskAiSubFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?" -) - -type ( - taskAiSubModel interface { - Insert(ctx context.Context, data *TaskAiSub) (sql.Result, error) - FindOne(ctx context.Context, id int64) (*TaskAiSub, error) - Update(ctx context.Context, data *TaskAiSub) error - Delete(ctx context.Context, id int64) error - } - - defaultTaskAiSubModel struct { - conn sqlx.SqlConn - table string - } - - TaskAiSub struct { - Id int64 `db:"id"` // id - ImageName string `db:"image_name"` // 图片名称 - Result string `db:"result"` // 识别结果 - Card string `db:"card"` // 加速卡 - ClusterId int64 `db:"cluster_id"` // 集群id - ClusterName string `db:"cluster_name"` // 集群名称 - } -) - -func newTaskAiSubModel(conn sqlx.SqlConn) *defaultTaskAiSubModel { - return &defaultTaskAiSubModel{ - conn: conn, - table: "`task_ai_sub`", - } -} - -func (m *defaultTaskAiSubModel) Delete(ctx context.Context, id int64) error { - query := fmt.Sprintf("delete from %s where `id` = ?", m.table) - _, err := m.conn.ExecCtx(ctx, query, id) - return err -} - -func (m *defaultTaskAiSubModel) FindOne(ctx context.Context, id int64) (*TaskAiSub, error) { - query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", taskAiSubRows, m.table) - var resp TaskAiSub - err := m.conn.QueryRowCtx(ctx, &resp, query, id) - switch err { - case nil: - return &resp, nil - case sqlc.ErrNotFound: - return nil, ErrNotFound - default: - return nil, err - } -} - -func (m *defaultTaskAiSubModel) Insert(ctx context.Context, data *TaskAiSub) (sql.Result, error) { - query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?)", m.table, taskAiSubRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.ImageName, data.Result, data.Card, data.ClusterId, data.ClusterName) - return ret, err -} - -func (m *defaultTaskAiSubModel) Update(ctx context.Context, data *TaskAiSub) error { - query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskAiSubRowsWithPlaceHolder) - _, err := m.conn.ExecCtx(ctx, query, data.ImageName, data.Result, data.Card, data.ClusterId, data.ClusterName, data.Id) - return err -} - -func (m *defaultTaskAiSubModel) tableName() string { - return m.table -} From 3f32ddb1f689822a5f0be260081f8f0f745effa0 Mon Sep 17 00:00:00 2001 From: zhouqunjie Date: Sat, 22 Jun 2024 15:20:45 +0800 Subject: [PATCH 4/7] task ai sub Former-commit-id: 044290e63a82193bcb65c66d1948f76bc6302b2f --- api/desc/inference/inference.api | 11 ++++ api/internal/types/types.go | 11 ++++ pkg/models/taskaisubmodel.go | 29 +++++++++++ pkg/models/taskaisubmodel_gen.go | 88 ++++++++++++++++++++++++++++++++ 4 files changed, 139 insertions(+) create mode 100644 pkg/models/taskaisubmodel.go create mode 100644 pkg/models/taskaisubmodel_gen.go diff --git a/api/desc/inference/inference.api b/api/desc/inference/inference.api index 86b3ad0e..e7a04c83 100644 --- a/api/desc/inference/inference.api +++ b/api/desc/inference/inference.api @@ -44,4 +44,15 @@ type ( ImageResult string `json:"imageResult"` } + InferenceTaskDetailReq{ + aiTaskId int64 `json:"aiTaskId"` + } + + InferenceTaskDetailResp{ + imageName string `json:"imageName"` + result string `json:"result"` + card string `json:"card"` + clusterName string `json:"clusterName"` + } + ) diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 5aea737a..1e632bd3 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -5919,3 +5919,14 @@ type ImageResult struct { Card string `json:"card"` ImageResult string `json:"imageResult"` } + +type InferenceTaskDetailReq struct { + AiTaskId int64 `json:"aiTaskId"` +} + +type InferenceTaskDetailResp struct { + ImageName string `json:"imageName"` + Result string `json:"result"` + Card string `json:"card"` + ClusterName string `json:"clusterName"` +} diff --git a/pkg/models/taskaisubmodel.go b/pkg/models/taskaisubmodel.go new file mode 100644 index 00000000..207772c6 --- /dev/null +++ b/pkg/models/taskaisubmodel.go @@ -0,0 +1,29 @@ +package models + +import "github.com/zeromicro/go-zero/core/stores/sqlx" + +var _ TaskAiSubModel = (*customTaskAiSubModel)(nil) + +type ( + // TaskAiSubModel is an interface to be customized, add more methods here, + // and implement the added methods in customTaskAiSubModel. + TaskAiSubModel interface { + taskAiSubModel + withSession(session sqlx.Session) TaskAiSubModel + } + + customTaskAiSubModel struct { + *defaultTaskAiSubModel + } +) + +// NewTaskAiSubModel returns a model for the database table. +func NewTaskAiSubModel(conn sqlx.SqlConn) TaskAiSubModel { + return &customTaskAiSubModel{ + defaultTaskAiSubModel: newTaskAiSubModel(conn), + } +} + +func (m *customTaskAiSubModel) withSession(session sqlx.Session) TaskAiSubModel { + return NewTaskAiSubModel(sqlx.NewSqlConnFromSession(session)) +} diff --git a/pkg/models/taskaisubmodel_gen.go b/pkg/models/taskaisubmodel_gen.go new file mode 100644 index 00000000..42ff3ed0 --- /dev/null +++ b/pkg/models/taskaisubmodel_gen.go @@ -0,0 +1,88 @@ +// Code generated by goctl. DO NOT EDIT. + +package models + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/zeromicro/go-zero/core/stores/builder" + "github.com/zeromicro/go-zero/core/stores/sqlc" + "github.com/zeromicro/go-zero/core/stores/sqlx" + "github.com/zeromicro/go-zero/core/stringx" +) + +var ( + taskAiSubFieldNames = builder.RawFieldNames(&TaskAiSub{}) + taskAiSubRows = strings.Join(taskAiSubFieldNames, ",") + taskAiSubRowsExpectAutoSet = strings.Join(stringx.Remove(taskAiSubFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",") + taskAiSubRowsWithPlaceHolder = strings.Join(stringx.Remove(taskAiSubFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?" +) + +type ( + taskAiSubModel interface { + Insert(ctx context.Context, data *TaskAiSub) (sql.Result, error) + FindOne(ctx context.Context, id int64) (*TaskAiSub, error) + Update(ctx context.Context, data *TaskAiSub) error + Delete(ctx context.Context, id int64) error + } + + defaultTaskAiSubModel struct { + conn sqlx.SqlConn + table string + } + + TaskAiSub struct { + Id int64 `db:"id"` // id + ImageName string `db:"image_name"` // 图片名称 + Result string `db:"result"` // 识别结果 + Card sql.NullString `db:"card"` // 加速卡 + ClusterId int64 `db:"cluster_id"` // 集群id + ClusterName sql.NullString `db:"cluster_name"` // 集群名称 + } +) + +func newTaskAiSubModel(conn sqlx.SqlConn) *defaultTaskAiSubModel { + return &defaultTaskAiSubModel{ + conn: conn, + table: "`task_ai_sub`", + } +} + +func (m *defaultTaskAiSubModel) Delete(ctx context.Context, id int64) error { + query := fmt.Sprintf("delete from %s where `id` = ?", m.table) + _, err := m.conn.ExecCtx(ctx, query, id) + return err +} + +func (m *defaultTaskAiSubModel) FindOne(ctx context.Context, id int64) (*TaskAiSub, error) { + query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", taskAiSubRows, m.table) + var resp TaskAiSub + err := m.conn.QueryRowCtx(ctx, &resp, query, id) + switch err { + case nil: + return &resp, nil + case sqlc.ErrNotFound: + return nil, ErrNotFound + default: + return nil, err + } +} + +func (m *defaultTaskAiSubModel) Insert(ctx context.Context, data *TaskAiSub) (sql.Result, error) { + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?)", m.table, taskAiSubRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.ImageName, data.Result, data.Card, data.ClusterId, data.ClusterName) + return ret, err +} + +func (m *defaultTaskAiSubModel) Update(ctx context.Context, data *TaskAiSub) error { + query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskAiSubRowsWithPlaceHolder) + _, err := m.conn.ExecCtx(ctx, query, data.ImageName, data.Result, data.Card, data.ClusterId, data.ClusterName, data.Id) + return err +} + +func (m *defaultTaskAiSubModel) tableName() string { + return m.table +} From 21f591939647c506a378917524cfbd54971e8a07 Mon Sep 17 00:00:00 2001 From: jagger Date: Sat, 22 Jun 2024 15:26:17 +0800 Subject: [PATCH 5/7] fix bug Signed-off-by: jagger Former-commit-id: 1178ca75f5121fac9b359c005d8e2a34da5dca00 --- .../logic/inference/imageinferencelogic.go | 2 +- go.mod | 1 + go.sum | 2 + .../JCCE-nudt/apigw-go-sdk/core/escape.go | 42 ++++ .../JCCE-nudt/apigw-go-sdk/core/signer.go | 201 ++++++++++++++++++ vendor/modules.txt | 3 + 6 files changed, 250 insertions(+), 1 deletion(-) create mode 100644 vendor/github.com/JCCE-nudt/apigw-go-sdk/core/escape.go create mode 100644 vendor/github.com/JCCE-nudt/apigw-go-sdk/core/signer.go diff --git a/api/internal/logic/inference/imageinferencelogic.go b/api/internal/logic/inference/imageinferencelogic.go index 0eac524d..f7e1f1f9 100644 --- a/api/internal/logic/inference/imageinferencelogic.go +++ b/api/internal/logic/inference/imageinferencelogic.go @@ -2,12 +2,12 @@ package inference import "C" import ( - "APIGW-go-sdk/core" "bytes" "context" "crypto/tls" "errors" "fmt" + "github.com/JCCE-nudt/apigw-go-sdk/core" "github.com/go-resty/resty/v2" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" diff --git a/go.mod b/go.mod index b6b42b91..8899453d 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ toolchain go1.22.4 retract v0.1.20-0.20240319015239-6ae13da05255 require ( + github.com/JCCE-nudt/apigw-go-sdk v0.0.0-20230525025609-34159d6f2818 github.com/Masterminds/squirrel v1.5.4 github.com/bwmarrin/snowflake v0.3.0 github.com/ghodss/yaml v1.0.0 diff --git a/go.sum b/go.sum index 57f21dc6..56a21131 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/JCCE-nudt/apigw-go-sdk v0.0.0-20230525025609-34159d6f2818 h1:QLulhUyxPDs9FFieVZwmKAnUBLeRDhsVNehotAAL/FE= +github.com/JCCE-nudt/apigw-go-sdk v0.0.0-20230525025609-34159d6f2818/go.mod h1:j+am5/1URgsvyhOAyURFR9vH3malaW7Tq6d33OyPsnM= github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM= github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= github.com/alecthomas/kingpin/v2 v2.4.0 h1:f48lwail6p8zpO1bC4TxtqACaGqHYA22qkHjHpqDjYY= diff --git a/vendor/github.com/JCCE-nudt/apigw-go-sdk/core/escape.go b/vendor/github.com/JCCE-nudt/apigw-go-sdk/core/escape.go new file mode 100644 index 00000000..e8c76b8a --- /dev/null +++ b/vendor/github.com/JCCE-nudt/apigw-go-sdk/core/escape.go @@ -0,0 +1,42 @@ +// based on https://github.com/golang/go/blob/master/src/net/url/url.go +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package core + +func shouldEscape(c byte) bool { + if 'A' <= c && c <= 'Z' || 'a' <= c && c <= 'z' || '0' <= c && c <= '9' || c == '_' || c == '-' || c == '~' || c == '.' { + return false + } + return true +} +func escape(s string) string { + hexCount := 0 + for i := 0; i < len(s); i++ { + c := s[i] + if shouldEscape(c) { + hexCount++ + } + } + + if hexCount == 0 { + return s + } + + t := make([]byte, len(s)+2*hexCount) + j := 0 + for i := 0; i < len(s); i++ { + switch c := s[i]; { + case shouldEscape(c): + t[j] = '%' + t[j+1] = "0123456789ABCDEF"[c>>4] + t[j+2] = "0123456789ABCDEF"[c&15] + j += 3 + default: + t[j] = s[i] + j++ + } + } + return string(t) +} diff --git a/vendor/github.com/JCCE-nudt/apigw-go-sdk/core/signer.go b/vendor/github.com/JCCE-nudt/apigw-go-sdk/core/signer.go new file mode 100644 index 00000000..e9b6f76b --- /dev/null +++ b/vendor/github.com/JCCE-nudt/apigw-go-sdk/core/signer.go @@ -0,0 +1,201 @@ +// HWS API Gateway Signature +// based on https://github.com/datastream/aws/blob/master/signv4.go +// Copyright (c) 2014, Xianjie + +package core + +import ( + "bytes" + "crypto/hmac" + "crypto/sha256" + + "fmt" + "io/ioutil" + "net/http" + "sort" + "strings" + "time" +) + +const ( + DateFormat = "20060102T150405Z" + SignAlgorithm = "SDK-HMAC-SHA256" + HeaderXDateTime = "X-Sdk-Date" + HeaderXHost = "host" + HeaderXAuthorization = "Authorization" + HeaderXContentSha256 = "X-Sdk-Content-Sha256" +) + +func hmacsha256(keyByte []byte, dataStr string) ([]byte, error) { + hm := hmac.New(sha256.New, []byte(keyByte)) + if _, err := hm.Write([]byte(dataStr)); err != nil { + return nil, err + } + return hm.Sum(nil), nil +} + +// Build a CanonicalRequest from a regular request string +func CanonicalRequest(request *http.Request, signedHeaders []string) (string, error) { + var hexencode string + var err error + if hex := request.Header.Get(HeaderXContentSha256); hex != "" { + hexencode = hex + } else { + bodyData, err := RequestPayload(request) + if err != nil { + return "", err + } + hexencode, err = HexEncodeSHA256Hash(bodyData) + if err != nil { + return "", err + } + } + return fmt.Sprintf("%s\n%s\n%s\n%s\n%s\n%s", request.Method, CanonicalURI(request), CanonicalQueryString(request), CanonicalHeaders(request, signedHeaders), strings.Join(signedHeaders, ";"), hexencode), err +} + +// CanonicalURI returns request uri +func CanonicalURI(request *http.Request) string { + pattens := strings.Split(request.URL.Path, "/") + var uriSlice []string + for _, v := range pattens { + uriSlice = append(uriSlice, escape(v)) + } + urlpath := strings.Join(uriSlice, "/") + if len(urlpath) == 0 || urlpath[len(urlpath)-1] != '/' { + urlpath = urlpath + "/" + } + return urlpath +} + +// CanonicalQueryString +func CanonicalQueryString(request *http.Request) string { + var keys []string + queryMap := request.URL.Query() + for key := range queryMap { + keys = append(keys, key) + } + sort.Strings(keys) + var query []string + for _, key := range keys { + k := escape(key) + sort.Strings(queryMap[key]) + for _, v := range queryMap[key] { + kv := fmt.Sprintf("%s=%s", k, escape(v)) + query = append(query, kv) + } + } + queryStr := strings.Join(query, "&") + request.URL.RawQuery = queryStr + return queryStr +} + +// CanonicalHeaders +func CanonicalHeaders(request *http.Request, signerHeaders []string) string { + var canonicalHeaders []string + header := make(map[string][]string) + for k, v := range request.Header { + header[strings.ToLower(k)] = v + } + for _, key := range signerHeaders { + value := header[key] + if strings.EqualFold(key, HeaderXHost) { + value = []string{request.Host} + } + sort.Strings(value) + for _, v := range value { + canonicalHeaders = append(canonicalHeaders, key+":"+strings.TrimSpace(v)) + } + } + return fmt.Sprintf("%s\n", strings.Join(canonicalHeaders, "\n")) +} + +// SignedHeaders +func SignedHeaders(r *http.Request) []string { + var signedHeaders []string + for key := range r.Header { + signedHeaders = append(signedHeaders, strings.ToLower(key)) + } + sort.Strings(signedHeaders) + return signedHeaders +} + +// RequestPayload +func RequestPayload(request *http.Request) ([]byte, error) { + if request.Body == nil { + return []byte(""), nil + } + bodyByte, err := ioutil.ReadAll(request.Body) + if err != nil { + return []byte(""), err + } + request.Body = ioutil.NopCloser(bytes.NewBuffer(bodyByte)) + return bodyByte, err +} + +// Create a "String to Sign". +func StringToSign(canonicalRequest string, t time.Time) (string, error) { + hashStruct := sha256.New() + _, err := hashStruct.Write([]byte(canonicalRequest)) + if err != nil { + return "", err + } + return fmt.Sprintf("%s\n%s\n%x", + SignAlgorithm, t.UTC().Format(DateFormat), hashStruct.Sum(nil)), nil +} + +// Create the HWS Signature. +func SignStringToSign(stringToSign string, signingKey []byte) (string, error) { + hmsha, err := hmacsha256(signingKey, stringToSign) + return fmt.Sprintf("%x", hmsha), err +} + +// HexEncodeSHA256Hash returns hexcode of sha256 +func HexEncodeSHA256Hash(body []byte) (string, error) { + hashStruct := sha256.New() + if len(body) == 0 { + body = []byte("") + } + _, err := hashStruct.Write(body) + return fmt.Sprintf("%x", hashStruct.Sum(nil)), err +} + +// Get the finalized value for the "Authorization" header. The signature parameter is the output from SignStringToSign +func AuthHeaderValue(signatureStr, accessKeyStr string, signedHeaders []string) string { + return fmt.Sprintf("%s Access=%s, SignedHeaders=%s, Signature=%s", SignAlgorithm, accessKeyStr, strings.Join(signedHeaders, ";"), signatureStr) +} + +// Signature HWS meta +type Signer struct { + Key string + Secret string +} + +// SignRequest set Authorization header +func (s *Signer) Sign(request *http.Request) error { + var t time.Time + var err error + var date string + if date = request.Header.Get(HeaderXDateTime); date != "" { + t, err = time.Parse(DateFormat, date) + } + if err != nil || date == "" { + t = time.Now() + request.Header.Set(HeaderXDateTime, t.UTC().Format(DateFormat)) + } + signedHeaders := SignedHeaders(request) + canonicalRequest, err := CanonicalRequest(request, signedHeaders) + if err != nil { + return err + } + stringToSignStr, err := StringToSign(canonicalRequest, t) + if err != nil { + return err + } + signatureStr, err := SignStringToSign(stringToSignStr, []byte(s.Secret)) + if err != nil { + return err + } + authValueStr := AuthHeaderValue(signatureStr, s.Key, signedHeaders) + request.Header.Set(HeaderXAuthorization, authValueStr) + return nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index c8fb2211..34d90f1a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -2,6 +2,9 @@ ## explicit; go 1.20 filippo.io/edwards25519 filippo.io/edwards25519/field +# github.com/JCCE-nudt/apigw-go-sdk v0.0.0-20230525025609-34159d6f2818 +## explicit; go 1.17 +github.com/JCCE-nudt/apigw-go-sdk/core # github.com/Masterminds/squirrel v1.5.4 ## explicit; go 1.14 github.com/Masterminds/squirrel From 3891d1c73b404dc0696e9a9450be794510b9782c Mon Sep 17 00:00:00 2001 From: tzwang Date: Sat, 22 Jun 2024 15:38:17 +0800 Subject: [PATCH 6/7] updated imageinfer api Former-commit-id: 2f5167a75b39f49531c6ebf4e8f32354fcbd44b8 --- .../logic/inference/imageinferencelogic.go | 24 +++++++++++++++++++ pkg/models/taskaisubmodel_gen.go | 12 +++++----- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/api/internal/logic/inference/imageinferencelogic.go b/api/internal/logic/inference/imageinferencelogic.go index 0eac524d..0504be48 100644 --- a/api/internal/logic/inference/imageinferencelogic.go +++ b/api/internal/logic/inference/imageinferencelogic.go @@ -292,6 +292,26 @@ func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []s results = append(results, s) } + //save ai sub tasks + for _, r := range results { + for _, task := range aiTaskList { + if r.ClusterId == strconv.Itoa(int(task.ClusterId)) { + taskAiSub := &models.TaskAiSub{ + Id: task.Id, + ImageName: r.ImageName, + Result: r.ImageResult, + Card: r.Card, + ClusterId: task.ClusterId, + ClusterName: r.ClusterName, + } + tx := svcCtx.DbEngin.Save(&taskAiSub) + if tx.Error != nil { + logx.Errorf(err.Error()) + } + } + } + } + sort.Slice(results, func(p, q int) bool { return results[p].ClusterName < results[q].ClusterName }) @@ -345,6 +365,7 @@ func sendInferReq(images []struct { r, err := getInferResult(c.urls[0].Url, t.file, t.imageResult.ImageName, c.clusterName) if err != nil { t.imageResult.ImageResult = err.Error() + t.imageResult.ClusterId = c.clusterId t.imageResult.ClusterName = c.clusterName t.imageResult.Card = c.urls[0].Card ch <- t.imageResult @@ -352,6 +373,7 @@ func sendInferReq(images []struct { return } t.imageResult.ImageResult = r + t.imageResult.ClusterId = c.clusterId t.imageResult.ClusterName = c.clusterName t.imageResult.Card = c.urls[0].Card @@ -363,6 +385,7 @@ func sendInferReq(images []struct { r, err := getInferResult(c.urls[idx].Url, t.file, t.imageResult.ImageName, c.clusterName) if err != nil { t.imageResult.ImageResult = err.Error() + t.imageResult.ClusterId = c.clusterId t.imageResult.ClusterName = c.clusterName t.imageResult.Card = c.urls[idx].Card ch <- t.imageResult @@ -370,6 +393,7 @@ func sendInferReq(images []struct { return } t.imageResult.ImageResult = r + t.imageResult.ClusterId = c.clusterId t.imageResult.ClusterName = c.clusterName t.imageResult.Card = c.urls[idx].Card diff --git a/pkg/models/taskaisubmodel_gen.go b/pkg/models/taskaisubmodel_gen.go index 42ff3ed0..3784442e 100644 --- a/pkg/models/taskaisubmodel_gen.go +++ b/pkg/models/taskaisubmodel_gen.go @@ -35,12 +35,12 @@ type ( } TaskAiSub struct { - Id int64 `db:"id"` // id - ImageName string `db:"image_name"` // 图片名称 - Result string `db:"result"` // 识别结果 - Card sql.NullString `db:"card"` // 加速卡 - ClusterId int64 `db:"cluster_id"` // 集群id - ClusterName sql.NullString `db:"cluster_name"` // 集群名称 + Id int64 `db:"id"` // id + ImageName string `db:"image_name"` // 图片名称 + Result string `db:"result"` // 识别结果 + Card string `db:"card"` // 加速卡 + ClusterId int64 `db:"cluster_id"` // 集群id + ClusterName string `db:"cluster_name"` // 集群名称 } ) From 5757cd47a3fcfae169e4c3a4d572aaeced4c9cac Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Sat, 22 Jun 2024 15:44:11 +0800 Subject: [PATCH 7/7] fix: update modelarts access_address 0622 Former-commit-id: c913e9759a8797446ce97f99e8681fe84f01d887 --- api/internal/logic/inference/imageinferencelogic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/internal/logic/inference/imageinferencelogic.go b/api/internal/logic/inference/imageinferencelogic.go index bf1edd9e..c8f2e247 100644 --- a/api/internal/logic/inference/imageinferencelogic.go +++ b/api/internal/logic/inference/imageinferencelogic.go @@ -487,7 +487,7 @@ func SendRequest(method, url string, file multipart.File, fileName string) (stri if err != nil { fmt.Println("Error closing multipart writer:", err) } - request, err := http.NewRequest(method, "https://modelarts-inference.cloudbrain2.pcl.ac.cn/v1/infers/fb0f011f-3e74-4396-ab81-20d65525d22b/image", &body) + request, err := http.NewRequest(method, url, &body) if err != nil { fmt.Println("Error creating new request:", err) //return nil, err