From 0f271360d2425b69198c8d254614eef82c524c71 Mon Sep 17 00:00:00 2001 From: tzwang Date: Fri, 21 Jun 2024 20:10:51 +0800 Subject: [PATCH 1/3] updated imageinfer api Former-commit-id: 2b0e2a56398b2c10fefb359b63d0b6f1fdfa5582 --- .../logic/inference/imageinferencelogic.go | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/api/internal/logic/inference/imageinferencelogic.go b/api/internal/logic/inference/imageinferencelogic.go index 2a06f78d..30567d8b 100644 --- a/api/internal/logic/inference/imageinferencelogic.go +++ b/api/internal/logic/inference/imageinferencelogic.go @@ -217,6 +217,20 @@ func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []s return nil, tx.Error } + + //no cluster available + if len(cs) == 0 { + for _, t := range aiTaskList { + t.Status = constants.Failed + err := svcCtx.Scheduler.AiStorages.UpdateAiTask(t) + if err != nil { + logx.Errorf(tx.Error.Error()) + } + } + svcCtx.Scheduler.AiStorages.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败") + return nil, errors.New("image infer task failed") + } + //change cluster status if len(clusters) != len(cs) { var acs []*strategy.AssignedCluster @@ -275,6 +289,7 @@ func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []s }) // update succeeded cluster status + var successStatusCount int for _, c := range cs { for _, t := range aiTaskList { if c.clusterId == strconv.Itoa(int(t.ClusterId)) { @@ -283,9 +298,19 @@ func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []s if err != nil { logx.Errorf(tx.Error.Error()) } + successStatusCount++ + } else { + continue } } } + + if len(cs) == successStatusCount { + svcCtx.Scheduler.AiStorages.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败") + } else { + svcCtx.Scheduler.AiStorages.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "completed", "任务完成") + } + return results, nil } From df733bcff29db330951c594e3731100582f27f84 Mon Sep 17 00:00:00 2001 From: jagger Date: Sat, 22 Jun 2024 10:13:14 +0800 Subject: [PATCH 2/3] Dockerfile dependent upgrade Signed-off-by: jagger Former-commit-id: a37f526225a0f7a3e4350cc6dfd3c001a630200f --- api/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/Dockerfile b/api/Dockerfile index 97053deb..3c78b3fd 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.21.2-alpine3.18 AS builder +FROM golang:1.22-alpine3.18 AS builder WORKDIR /app From d6e6176294a71666ea682eaf624e15500cfde6a5 Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Sat, 22 Jun 2024 15:04:02 +0800 Subject: [PATCH 3/3] fix: update modelarts image 0622 Former-commit-id: fb842f96900b886481abeeb6c76d6f360792683e --- .../logic/inference/imageinferencelogic.go | 109 +++++++++++++++++- 1 file changed, 105 insertions(+), 4 deletions(-) diff --git a/api/internal/logic/inference/imageinferencelogic.go b/api/internal/logic/inference/imageinferencelogic.go index 30567d8b..0eac524d 100644 --- a/api/internal/logic/inference/imageinferencelogic.go +++ b/api/internal/logic/inference/imageinferencelogic.go @@ -1,8 +1,13 @@ package inference +import "C" import ( + "APIGW-go-sdk/core" + "bytes" "context" + "crypto/tls" "errors" + "fmt" "github.com/go-resty/resty/v2" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" @@ -12,6 +17,9 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "io" + "k8s.io/apimachinery/pkg/util/json" + "log" "math/rand" "mime/multipart" "net/http" @@ -334,7 +342,7 @@ func sendInferReq(images []struct { imageNum int32 }) { if len(c.urls) == 1 { - r, err := getInferResult(c.urls[0].Url, t.file, t.imageResult.ImageName) + r, err := getInferResult(c.urls[0].Url, t.file, t.imageResult.ImageName, c.clusterName) if err != nil { t.imageResult.ImageResult = err.Error() t.imageResult.ClusterName = c.clusterName @@ -352,7 +360,7 @@ func sendInferReq(images []struct { return } else { idx := rand.Intn(len(c.urls)) - r, err := getInferResult(c.urls[idx].Url, t.file, t.imageResult.ImageName) + r, err := getInferResult(c.urls[idx].Url, t.file, t.imageResult.ImageName, c.clusterName) if err != nil { t.imageResult.ImageResult = err.Error() t.imageResult.ClusterName = c.clusterName @@ -373,20 +381,113 @@ func sendInferReq(images []struct { } } -func getInferResult(url string, file multipart.File, fileName string) (string, error) { +func getInferResult(url string, file multipart.File, fileName string, clusterName string) (string, error) { + if clusterName == "鹏城云脑II-modelarts" { + r, err := getInferResultModelarts(url, file, fileName) + if err != nil { + return "", err + } + return r, nil + } var res Res req := GetRestyRequest(10) _, err := req. SetFileReader("file", fileName, file). SetResult(&res). Post(url) - if err != nil { return "", err } return res.Result, nil } +func getInferResultModelarts(url string, file multipart.File, fileName string) (string, error) { + var res Res + body, err := SendRequest("POST", url, file, fileName) + if err != nil { + return "", err + } + errjson := json.Unmarshal([]byte(body), &res) + if errjson != nil { + log.Fatalf("Error parsing JSON: %s", errjson) + } + + return res.Result, nil +} + +// SignClient AK/SK签名认证 +func SignClient(r *http.Request, writer *multipart.Writer) (*http.Client, error) { + r.Header.Add("content-type", "application/json;charset=UTF-8") + r.Header.Add("X-Project-Id", "d18190e28e3f45a281ef0b0696ec9d52") + r.Header.Add("x-stage", "RELEASE") + r.Header.Add("x-sdk-content-sha256", "UNSIGNED-PAYLOAD") + r.Header.Set("Content-Type", writer.FormDataContentType()) + s := core.Signer{ + Key: "UNEHPHO4Z7YSNPKRXFE4", + Secret: "JWXCE9qcYbc7RjpSRIWt4WgG3ZKF6Q4lPzkJReX9", + } + err := s.Sign(r) + if err != nil { + return nil, err + } + + //设置client信任所有证书 + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{ + Transport: tr, + } + return client, nil +} + +func SendRequest(method, url string, file multipart.File, fileName string) (string, error) { + /*body := &bytes.Buffer{} + writer := multipart.NewWriter(body)*/ + // 创建一个新的缓冲区以写入multipart表单 + var body bytes.Buffer + // 创建一个新的multipart writer + writer := multipart.NewWriter(&body) + // 创建一个用于写入文件的表单字段 + part, err := writer.CreateFormFile("file", fileName) // "file"是表单的字段名,第二个参数是文件名 + if err != nil { + fmt.Println("Error creating form file:", err) + } + // 将文件的内容拷贝到multipart writer中 + _, err = io.Copy(part, file) + if err != nil { + fmt.Println("Error copying file data:", err) + + } + err = writer.Close() + if err != nil { + fmt.Println("Error closing multipart writer:", err) + } + request, err := http.NewRequest(method, "https://modelarts-inference.cloudbrain2.pcl.ac.cn/v1/infers/fb0f011f-3e74-4396-ab81-20d65525d22b/image", &body) + if err != nil { + fmt.Println("Error creating new request:", err) + //return nil, err + } + signedR, err := SignClient(request, writer) + if err != nil { + fmt.Println("Error signing request:", err) + //return nil, err + } + + res, err := signedR.Do(request) + if err != nil { + fmt.Println("Error sending request:", err) + //return nil, err + } + defer res.Body.Close() + Resbody, err := io.ReadAll(res.Body) + if err != nil { + fmt.Println("Error reading response body:", err) + //return nil, err + } + return string(Resbody), nil +} + func GetRestyRequest(timeoutSeconds int64) *resty.Request { client := resty.New().SetTimeout(time.Duration(timeoutSeconds) * time.Second) request := client.R()