From 4f3418eb28f226e8355b73b1dd681b9a4ff5e395 Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 16 Jul 2024 17:43:34 +0800 Subject: [PATCH] update imageinference Former-commit-id: e184c0f785466f4b0e626d0cc8a3ca86f5346f5e --- .../logic/inference/imageinferencelogic.go | 11 +- .../imageInference/imageClassification.go | 419 ++++++++++ .../imageInference/imageInference.go | 9 + .../inference/imageInference/imageToText.go | 19 + .../scheduler/service/inference/inference.go | 752 +++++++++--------- .../inference/textInference/textInference.go | 1 + 6 files changed, 819 insertions(+), 392 deletions(-) create mode 100644 internal/scheduler/service/inference/imageInference/imageClassification.go create mode 100644 internal/scheduler/service/inference/imageInference/imageInference.go create mode 100644 internal/scheduler/service/inference/imageInference/imageToText.go create mode 100644 internal/scheduler/service/inference/textInference/textInference.go diff --git a/internal/logic/inference/imageinferencelogic.go b/internal/logic/inference/imageinferencelogic.go index 9dbfc6f5..d381b0f2 100644 --- a/internal/logic/inference/imageinferencelogic.go +++ b/internal/logic/inference/imageinferencelogic.go @@ -5,7 +5,7 @@ import ( "errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference/imageInference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" @@ -45,7 +45,7 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere StaticWeightMap: req.StaticWeightMap, } - var ts []*inference.ImageFile + var ts []*imageInference.ImageFile uploadedFiles := r.MultipartForm.File @@ -65,7 +65,7 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere defer file.Close() var ir types.ImageResult ir.ImageName = header.Filename - t := inference.ImageFile{ + t := imageInference.ImageFile{ ImageResult: &ir, File: file, } @@ -134,11 +134,12 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere } go func() { - r := http.Request{} - _, err := inference.ImageInfer(opt, id, adapterName, clusters, ts, l.svcCtx.Scheduler.AiService.InferenceAdapterMap, l.svcCtx.Scheduler.AiStorages, r.Context()) + ic, err := imageInference.NewImageClassification(ts, clusters, opt, l.svcCtx.Scheduler.AiStorages, l.svcCtx.Scheduler.AiService.InferenceAdapterMap, id, adapterName) if err != nil { logx.Errorf(err.Error()) + return } + ic.Classify() }() return resp, nil diff --git a/internal/scheduler/service/inference/imageInference/imageClassification.go b/internal/scheduler/service/inference/imageInference/imageClassification.go new file mode 100644 index 00000000..a9d10912 --- /dev/null +++ b/internal/scheduler/service/inference/imageInference/imageClassification.go @@ -0,0 +1,419 @@ +package imageInference + +import ( + "encoding/json" + "errors" + "github.com/go-resty/resty/v2" + "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "log" + "math/rand" + "mime/multipart" + "net/http" + "sort" + "strconv" + "sync" + "time" +) + +const ( + IMAGE = "image" + FORWARD_SLASH = "/" +) + +type ImageClassificationInterface interface { + Classify() ([]*types.ImageResult, error) +} + +type ImageFile struct { + ImageResult *types.ImageResult + File multipart.File +} + +type FilteredCluster struct { + urls []*inference.InferUrl + clusterId string + clusterName string + imageNum int32 +} + +type ImageClassification struct { + files []*ImageFile + clusters []*strategy.AssignedCluster + opt *option.InferOption + storage *database.AiStorage + inferAdapter map[string]map[string]inference.Inference + errMap map[string]string + taskId int64 + adapterName string + aiTaskList []*models.TaskAi +} + +func NewImageClassification(files []*ImageFile, + clusters []*strategy.AssignedCluster, + opt *option.InferOption, + storage *database.AiStorage, + inferAdapter map[string]map[string]inference.Inference, + taskId int64, + adapterName string) (*ImageClassification, error) { + + aiTaskList, err := storage.GetAiTaskListById(taskId) + if err != nil || len(aiTaskList) == 0 { + return nil, err + } + return &ImageClassification{ + files: files, + clusters: clusters, + opt: opt, + storage: storage, + inferAdapter: inferAdapter, + taskId: taskId, + adapterName: adapterName, + errMap: make(map[string]string), + aiTaskList: aiTaskList, + }, nil +} + +func (i *ImageClassification) Classify() ([]*types.ImageResult, error) { + clusters, err := i.filterClusters() + if err != nil { + return nil, err + } + err = i.updateStatus(clusters) + if err != nil { + return nil, err + } + results, err := i.inferImages(clusters) + if err != nil { + return nil, err + } + return results, nil +} + +func (i *ImageClassification) filterClusters() ([]*FilteredCluster, error) { + var wg sync.WaitGroup + var ch = make(chan *FilteredCluster, len(i.clusters)) + var cs []*FilteredCluster + var mutex sync.Mutex + + inferMap := i.inferAdapter[i.opt.AdapterId] + + for _, cluster := range i.clusters { + wg.Add(1) + c := cluster + go func() { + r := http.Request{} + imageUrls, err := inferMap[c.ClusterId].GetInferUrl(r.Context(), i.opt) + if err != nil { + mutex.Lock() + i.errMap[c.ClusterId] = err.Error() + mutex.Unlock() + wg.Done() + return + } + for i, _ := range imageUrls { + imageUrls[i].Url = imageUrls[i].Url + FORWARD_SLASH + IMAGE + } + clusterName, _ := i.storage.GetClusterNameById(c.ClusterId) + + var f FilteredCluster + f.urls = imageUrls + f.clusterId = c.ClusterId + f.clusterName = clusterName + f.imageNum = c.Replicas + + ch <- &f + wg.Done() + return + }() + } + wg.Wait() + close(ch) + + for s := range ch { + cs = append(cs, s) + } + return cs, nil +} + +func (i *ImageClassification) inferImages(cs []*FilteredCluster) ([]*types.ImageResult, error) { + var wg sync.WaitGroup + var ch = make(chan *types.ImageResult, len(i.files)) + var results []*types.ImageResult + limit := make(chan bool, 7) + + var imageNumIdx int32 = 0 + var imageNumIdxEnd int32 = 0 + for _, c := range cs { + new_images := make([]*ImageFile, len(i.files)) + copy(new_images, i.files) + + imageNumIdxEnd = imageNumIdxEnd + c.imageNum + new_images = new_images[imageNumIdx:imageNumIdxEnd] + imageNumIdx = imageNumIdx + c.imageNum + + wg.Add(len(new_images)) + go sendInferReq(new_images, c, &wg, ch, limit) + } + wg.Wait() + close(ch) + + for s := range ch { + results = append(results, s) + } + + sort.Slice(results, func(p, q int) bool { + return results[p].ClusterName < results[q].ClusterName + }) + + //save ai sub tasks + for _, r := range results { + for _, task := range i.aiTaskList { + if r.ClusterId == strconv.Itoa(int(task.ClusterId)) { + taskAiSub := models.TaskAiSub{ + TaskId: i.taskId, + TaskName: task.Name, + TaskAiId: task.TaskId, + TaskAiName: task.Name, + ImageName: r.ImageName, + Result: r.ImageResult, + Card: r.Card, + ClusterId: task.ClusterId, + ClusterName: r.ClusterName, + } + err := i.storage.SaveAiTaskImageSubTask(&taskAiSub) + if err != nil { + panic(err) + } + } + } + } + + // update succeeded cluster status + var successStatusCount int + for _, c := range cs { + for _, t := range i.aiTaskList { + if c.clusterId == strconv.Itoa(int(t.ClusterId)) { + t.Status = constants.Completed + t.EndTime = time.Now().Format(time.RFC3339) + err := i.storage.UpdateAiTask(t) + if err != nil { + logx.Errorf(err.Error()) + } + successStatusCount++ + } else { + continue + } + } + } + + if len(cs) == successStatusCount { + i.storage.AddNoticeInfo(i.opt.AdapterId, i.adapterName, "", "", i.opt.TaskName, "completed", "任务完成") + } else { + i.storage.AddNoticeInfo(i.opt.AdapterId, i.adapterName, "", "", i.opt.TaskName, "failed", "任务失败") + } + + return results, nil +} + +func (i *ImageClassification) updateStatus(cs []*FilteredCluster) error { + + //no cluster available + if len(cs) == 0 { + for _, t := range i.aiTaskList { + t.Status = constants.Failed + t.EndTime = time.Now().Format(time.RFC3339) + if _, ok := i.errMap[strconv.Itoa(int(t.ClusterId))]; ok { + t.Msg = i.errMap[strconv.Itoa(int(t.ClusterId))] + } + err := i.storage.UpdateAiTask(t) + if err != nil { + logx.Errorf(err.Error()) + } + } + i.storage.AddNoticeInfo(i.opt.AdapterId, i.adapterName, "", "", i.opt.TaskName, "failed", "任务失败") + return errors.New("image infer task failed") + } + + //change cluster status + if len(i.clusters) != len(cs) { + var acs []*strategy.AssignedCluster + var rcs []*strategy.AssignedCluster + for _, cluster := range i.clusters { + if contains(cs, cluster.ClusterId) { + var ac *strategy.AssignedCluster + ac = cluster + rcs = append(rcs, ac) + } else { + var ac *strategy.AssignedCluster + ac = cluster + acs = append(acs, ac) + } + } + + // update failed cluster status + for _, ac := range acs { + for _, t := range i.aiTaskList { + if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { + t.Status = constants.Failed + t.EndTime = time.Now().Format(time.RFC3339) + if _, ok := i.errMap[strconv.Itoa(int(t.ClusterId))]; ok { + t.Msg = i.errMap[strconv.Itoa(int(t.ClusterId))] + } + err := i.storage.UpdateAiTask(t) + if err != nil { + logx.Errorf(err.Error()) + } + } + } + } + + // update running cluster status + for _, ac := range rcs { + for _, t := range i.aiTaskList { + if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { + t.Status = constants.Running + err := i.storage.UpdateAiTask(t) + if err != nil { + logx.Errorf(err.Error()) + } + } + } + } + i.storage.AddNoticeInfo(i.opt.AdapterId, i.adapterName, "", "", i.opt.TaskName, "failed", "任务失败") + } else { + for _, t := range i.aiTaskList { + t.Status = constants.Running + err := i.storage.UpdateAiTask(t) + if err != nil { + logx.Errorf(err.Error()) + } + } + i.storage.AddNoticeInfo(i.opt.AdapterId, i.adapterName, "", "", i.opt.TaskName, "running", "任务运行中") + } + return nil +} + +func sendInferReq(images []*ImageFile, cluster *FilteredCluster, wg *sync.WaitGroup, ch chan<- *types.ImageResult, limit chan bool) { + for _, image := range images { + limit <- true + go func(t *ImageFile, c *FilteredCluster) { + if len(c.urls) == 1 { + r, err := getInferResult(c.urls[0].Url, t.File, t.ImageResult.ImageName, c.clusterName) + if err != nil { + t.ImageResult.ImageResult = err.Error() + t.ImageResult.ClusterId = c.clusterId + t.ImageResult.ClusterName = c.clusterName + t.ImageResult.Card = c.urls[0].Card + ch <- t.ImageResult + wg.Done() + <-limit + return + } + t.ImageResult.ImageResult = r + t.ImageResult.ClusterId = c.clusterId + t.ImageResult.ClusterName = c.clusterName + t.ImageResult.Card = c.urls[0].Card + + ch <- t.ImageResult + wg.Done() + <-limit + return + } else { + idx := rand.Intn(len(c.urls)) + r, err := getInferResult(c.urls[idx].Url, t.File, t.ImageResult.ImageName, c.clusterName) + if err != nil { + t.ImageResult.ImageResult = err.Error() + t.ImageResult.ClusterId = c.clusterId + t.ImageResult.ClusterName = c.clusterName + t.ImageResult.Card = c.urls[idx].Card + ch <- t.ImageResult + wg.Done() + <-limit + return + } + t.ImageResult.ImageResult = r + t.ImageResult.ClusterId = c.clusterId + t.ImageResult.ClusterName = c.clusterName + t.ImageResult.Card = c.urls[idx].Card + + ch <- t.ImageResult + wg.Done() + <-limit + return + } + }(image, cluster) + <-limit + } +} + +func getInferResult(url string, file multipart.File, fileName string, clusterName string) (string, error) { + if clusterName == "鹏城云脑II-modelarts" { + r, err := getInferResultModelarts(url, file, fileName) + if err != nil { + return "", err + } + return r, nil + } + var res Res + req := GetRestyRequest(20) + _, err := req. + SetFileReader("file", fileName, file). + SetResult(&res). + Post(url) + if err != nil { + return "", err + } + return res.Result, nil +} + +func getInferResultModelarts(url string, file multipart.File, fileName string) (string, error) { + var res Res + /* req := GetRestyRequest(20) + _, err := req. + SetFileReader("file", fileName, file). + SetHeaders(map[string]string{ + "ak": "UNEHPHO4Z7YSNPKRXFE4", + "sk": "JWXCE9qcYbc7RjpSRIWt4WgG3ZKF6Q4lPzkJReX9", + }). + SetResult(&res). + Post(url) + if err != nil { + return "", err + }*/ + body, err := utils.SendRequest("POST", url, file, fileName) + if err != nil { + return "", err + } + errjson := json.Unmarshal([]byte(body), &res) + if errjson != nil { + log.Fatalf("Error parsing JSON: %s", errjson) + } + return res.Result, nil +} + +func GetRestyRequest(timeoutSeconds int64) *resty.Request { + client := resty.New().SetTimeout(time.Duration(timeoutSeconds) * time.Second) + request := client.R() + return request +} + +type Res struct { + Result string `json:"result"` +} + +func contains(cs []*FilteredCluster, e string) bool { + for _, c := range cs { + if c.clusterId == e { + return true + } + } + return false +} diff --git a/internal/scheduler/service/inference/imageInference/imageInference.go b/internal/scheduler/service/inference/imageInference/imageInference.go new file mode 100644 index 00000000..e82da39a --- /dev/null +++ b/internal/scheduler/service/inference/imageInference/imageInference.go @@ -0,0 +1,9 @@ +package imageInference + +import "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + +type ImageInference interface { + filterClusters() ([]*FilteredCluster, error) + inferImages(cs []*FilteredCluster) ([]*types.ImageResult, error) + updateStatus(cs []*FilteredCluster) error +} diff --git a/internal/scheduler/service/inference/imageInference/imageToText.go b/internal/scheduler/service/inference/imageInference/imageToText.go new file mode 100644 index 00000000..66c68e99 --- /dev/null +++ b/internal/scheduler/service/inference/imageInference/imageToText.go @@ -0,0 +1,19 @@ +package imageInference + +import ( + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" +) + +type ImageToText struct { + files []*ImageFile + clusters []*strategy.AssignedCluster + opt *option.InferOption + storage *database.AiStorage + inferAdapter map[string]map[string]inference.Inference + errMap map[string]string + taskId int64 + adapterName string +} diff --git a/internal/scheduler/service/inference/inference.go b/internal/scheduler/service/inference/inference.go index 1361c3c5..c962778c 100644 --- a/internal/scheduler/service/inference/inference.go +++ b/internal/scheduler/service/inference/inference.go @@ -2,28 +2,12 @@ package inference import ( "context" - "encoding/json" - "errors" - "github.com/go-resty/resty/v2" - "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" - "log" - "math/rand" - "mime/multipart" - "sort" - "strconv" - "sync" - "time" ) type Inference interface { GetInferUrl(ctx context.Context, option *option.InferOption) ([]*InferUrl, error) + //GetInferDeployInstanceList(ctx context.Context, option *option.InferOption) } type InferUrl struct { @@ -31,373 +15,367 @@ type InferUrl struct { Card string } -type ImageFile struct { - ImageResult *types.ImageResult - File multipart.File -} - -func ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*ImageFile, inferAdapterMap map[string]map[string]Inference, storage *database.AiStorage, ctx context.Context) ([]*types.ImageResult, error) { - - //for i := len(clusters) - 1; i >= 0; i-- { - // if clusters[i].Replicas == 0 { - // clusters = append(clusters[:i], clusters[i+1:]...) - // } - //} - - var wg sync.WaitGroup - var cluster_ch = make(chan struct { - urls []*InferUrl - clusterId string - clusterName string - imageNum int32 - }, len(clusters)) - - var cs []struct { - urls []*InferUrl - clusterId string - clusterName string - imageNum int32 - } - inferMap := inferAdapterMap[opt.AdapterId] - - ////save taskai - //for _, c := range clusters { - // clusterName, _ := storage.GetClusterNameById(c.ClusterId) - // opt.Replica = c.Replicas - // err := storage.SaveAiTask(id, opt, adapterName, c.ClusterId, clusterName, "", constants.Saved, "") - // if err != nil { - // return nil, err - // } - //} - - var mutex sync.Mutex - errMap := make(map[string]string) - for _, cluster := range clusters { - wg.Add(1) - c := cluster - go func() { - imageUrls, err := inferMap[c.ClusterId].GetInferUrl(ctx, opt) - if err != nil { - mutex.Lock() - errMap[c.ClusterId] = err.Error() - mutex.Unlock() - wg.Done() - return - } - for i, _ := range imageUrls { - imageUrls[i].Url = imageUrls[i].Url + "/" + "image" - } - clusterName, _ := storage.GetClusterNameById(c.ClusterId) - - s := struct { - urls []*InferUrl - clusterId string - clusterName string - imageNum int32 - }{ - urls: imageUrls, - clusterId: c.ClusterId, - clusterName: clusterName, - imageNum: c.Replicas, - } - - cluster_ch <- s - wg.Done() - return - }() - } - wg.Wait() - close(cluster_ch) - - for s := range cluster_ch { - cs = append(cs, s) - } - - aiTaskList, err := storage.GetAiTaskListById(id) - if err != nil { - return nil, err - } - - //no cluster available - if len(cs) == 0 { - for _, t := range aiTaskList { - t.Status = constants.Failed - t.EndTime = time.Now().Format(time.RFC3339) - if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok { - t.Msg = errMap[strconv.Itoa(int(t.ClusterId))] - } - err := storage.UpdateAiTask(t) - if err != nil { - logx.Errorf(err.Error()) - } - } - storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败") - return nil, errors.New("image infer task failed") - } - - //change cluster status - if len(clusters) != len(cs) { - var acs []*strategy.AssignedCluster - var rcs []*strategy.AssignedCluster - for _, cluster := range clusters { - if contains(cs, cluster.ClusterId) { - var ac *strategy.AssignedCluster - ac = cluster - rcs = append(rcs, ac) - } else { - var ac *strategy.AssignedCluster - ac = cluster - acs = append(acs, ac) - } - } - - // update failed cluster status - for _, ac := range acs { - for _, t := range aiTaskList { - if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { - t.Status = constants.Failed - t.EndTime = time.Now().Format(time.RFC3339) - if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok { - t.Msg = errMap[strconv.Itoa(int(t.ClusterId))] - } - err := storage.UpdateAiTask(t) - if err != nil { - logx.Errorf(err.Error()) - } - } - } - } - - // update running cluster status - for _, ac := range rcs { - for _, t := range aiTaskList { - if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { - t.Status = constants.Running - err := storage.UpdateAiTask(t) - if err != nil { - logx.Errorf(err.Error()) - } - } - } - } - storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败") - } else { - for _, t := range aiTaskList { - t.Status = constants.Running - err := storage.UpdateAiTask(t) - if err != nil { - logx.Errorf(err.Error()) - } - } - storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "running", "任务运行中") - } - - var result_ch = make(chan *types.ImageResult, len(ts)) - var results []*types.ImageResult - limit := make(chan bool, 7) - - var imageNumIdx int32 = 0 - var imageNumIdxEnd int32 = 0 - for _, c := range cs { - new_images := make([]*ImageFile, len(ts)) - copy(new_images, ts) - - imageNumIdxEnd = imageNumIdxEnd + c.imageNum - new_images = new_images[imageNumIdx:imageNumIdxEnd] - imageNumIdx = imageNumIdx + c.imageNum - - wg.Add(len(new_images)) - go sendInferReq(new_images, c, &wg, result_ch, limit) - } - wg.Wait() - close(result_ch) - - for s := range result_ch { - results = append(results, s) - } - - sort.Slice(results, func(p, q int) bool { - return results[p].ClusterName < results[q].ClusterName - }) - - //save ai sub tasks - for _, r := range results { - for _, task := range aiTaskList { - if r.ClusterId == strconv.Itoa(int(task.ClusterId)) { - taskAiSub := models.TaskAiSub{ - TaskId: id, - TaskName: task.Name, - TaskAiId: task.TaskId, - TaskAiName: task.Name, - ImageName: r.ImageName, - Result: r.ImageResult, - Card: r.Card, - ClusterId: task.ClusterId, - ClusterName: r.ClusterName, - } - err := storage.SaveAiTaskImageSubTask(&taskAiSub) - if err != nil { - panic(err) - } - } - } - } - - // update succeeded cluster status - var successStatusCount int - for _, c := range cs { - for _, t := range aiTaskList { - if c.clusterId == strconv.Itoa(int(t.ClusterId)) { - t.Status = constants.Completed - t.EndTime = time.Now().Format(time.RFC3339) - err := storage.UpdateAiTask(t) - if err != nil { - logx.Errorf(err.Error()) - } - successStatusCount++ - } else { - continue - } - } - } - - if len(cs) == successStatusCount { - storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "completed", "任务完成") - } else { - storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败") - } - - return results, nil -} - -func sendInferReq(images []*ImageFile, cluster struct { - urls []*InferUrl - clusterId string - clusterName string - imageNum int32 -}, wg *sync.WaitGroup, ch chan<- *types.ImageResult, limit chan bool) { - for _, image := range images { - limit <- true - go func(t *ImageFile, c struct { - urls []*InferUrl - clusterId string - clusterName string - imageNum int32 - }) { - if len(c.urls) == 1 { - r, err := getInferResult(c.urls[0].Url, t.File, t.ImageResult.ImageName, c.clusterName) - if err != nil { - t.ImageResult.ImageResult = err.Error() - t.ImageResult.ClusterId = c.clusterId - t.ImageResult.ClusterName = c.clusterName - t.ImageResult.Card = c.urls[0].Card - ch <- t.ImageResult - wg.Done() - <-limit - return - } - t.ImageResult.ImageResult = r - t.ImageResult.ClusterId = c.clusterId - t.ImageResult.ClusterName = c.clusterName - t.ImageResult.Card = c.urls[0].Card - - ch <- t.ImageResult - wg.Done() - <-limit - return - } else { - idx := rand.Intn(len(c.urls)) - r, err := getInferResult(c.urls[idx].Url, t.File, t.ImageResult.ImageName, c.clusterName) - if err != nil { - t.ImageResult.ImageResult = err.Error() - t.ImageResult.ClusterId = c.clusterId - t.ImageResult.ClusterName = c.clusterName - t.ImageResult.Card = c.urls[idx].Card - ch <- t.ImageResult - wg.Done() - <-limit - return - } - t.ImageResult.ImageResult = r - t.ImageResult.ClusterId = c.clusterId - t.ImageResult.ClusterName = c.clusterName - t.ImageResult.Card = c.urls[idx].Card - - ch <- t.ImageResult - wg.Done() - <-limit - return - } - }(image, cluster) - <-limit - } -} - -func getInferResult(url string, file multipart.File, fileName string, clusterName string) (string, error) { - if clusterName == "鹏城云脑II-modelarts" { - r, err := getInferResultModelarts(url, file, fileName) - if err != nil { - return "", err - } - return r, nil - } - var res Res - req := GetRestyRequest(20) - _, err := req. - SetFileReader("file", fileName, file). - SetResult(&res). - Post(url) - if err != nil { - return "", err - } - return res.Result, nil -} - -func getInferResultModelarts(url string, file multipart.File, fileName string) (string, error) { - var res Res - /* req := GetRestyRequest(20) - _, err := req. - SetFileReader("file", fileName, file). - SetHeaders(map[string]string{ - "ak": "UNEHPHO4Z7YSNPKRXFE4", - "sk": "JWXCE9qcYbc7RjpSRIWt4WgG3ZKF6Q4lPzkJReX9", - }). - SetResult(&res). - Post(url) - if err != nil { - return "", err - }*/ - body, err := utils.SendRequest("POST", url, file, fileName) - if err != nil { - return "", err - } - errjson := json.Unmarshal([]byte(body), &res) - if errjson != nil { - log.Fatalf("Error parsing JSON: %s", errjson) - } - return res.Result, nil -} - -func GetRestyRequest(timeoutSeconds int64) *resty.Request { - client := resty.New().SetTimeout(time.Duration(timeoutSeconds) * time.Second) - request := client.R() - return request -} - -type Res struct { - Result string `json:"result"` -} - -func contains(cs []struct { - urls []*InferUrl - clusterId string - clusterName string - imageNum int32 -}, e string) bool { - for _, c := range cs { - if c.clusterId == e { - return true - } - } - return false -} +//func ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*ImageFile, inferAdapterMap map[string]map[string]Inference, storage *database.AiStorage, ctx context.Context) ([]*types.ImageResult, error) { +// +// //for i := len(clusters) - 1; i >= 0; i-- { +// // if clusters[i].Replicas == 0 { +// // clusters = append(clusters[:i], clusters[i+1:]...) +// // } +// //} +// var wg sync.WaitGroup +// var cluster_ch = make(chan struct { +// urls []*InferUrl +// clusterId string +// clusterName string +// imageNum int32 +// }, len(clusters)) +// +// var cs []struct { +// urls []*InferUrl +// clusterId string +// clusterName string +// imageNum int32 +// } +// inferMap := inferAdapterMap[opt.AdapterId] +// +// ////save taskai +// //for _, c := range clusters { +// // clusterName, _ := storage.GetClusterNameById(c.ClusterId) +// // opt.Replica = c.Replicas +// // err := storage.SaveAiTask(id, opt, adapterName, c.ClusterId, clusterName, "", constants.Saved, "") +// // if err != nil { +// // return nil, err +// // } +// //} +// +// var mutex sync.Mutex +// errMap := make(map[string]string) +// for _, cluster := range clusters { +// wg.Add(1) +// c := cluster +// go func() { +// imageUrls, err := inferMap[c.ClusterId].GetInferUrl(ctx, opt) +// if err != nil { +// mutex.Lock() +// errMap[c.ClusterId] = err.Error() +// mutex.Unlock() +// wg.Done() +// return +// } +// for i, _ := range imageUrls { +// imageUrls[i].Url = imageUrls[i].Url + "/" + "image" +// } +// clusterName, _ := storage.GetClusterNameById(c.ClusterId) +// +// s := struct { +// urls []*InferUrl +// clusterId string +// clusterName string +// imageNum int32 +// }{ +// urls: imageUrls, +// clusterId: c.ClusterId, +// clusterName: clusterName, +// imageNum: c.Replicas, +// } +// +// cluster_ch <- s +// wg.Done() +// return +// }() +// } +// wg.Wait() +// close(cluster_ch) +// +// for s := range cluster_ch { +// cs = append(cs, s) +// } +// +// aiTaskList, err := storage.GetAiTaskListById(id) +// if err != nil { +// return nil, err +// } +// +// //no cluster available +// if len(cs) == 0 { +// for _, t := range aiTaskList { +// t.Status = constants.Failed +// t.EndTime = time.Now().Format(time.RFC3339) +// if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok { +// t.Msg = errMap[strconv.Itoa(int(t.ClusterId))] +// } +// err := storage.UpdateAiTask(t) +// if err != nil { +// logx.Errorf(err.Error()) +// } +// } +// storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败") +// return nil, errors.New("image infer task failed") +// } +// +// //change cluster status +// if len(clusters) != len(cs) { +// var acs []*strategy.AssignedCluster +// var rcs []*strategy.AssignedCluster +// for _, cluster := range clusters { +// if contains(cs, cluster.ClusterId) { +// var ac *strategy.AssignedCluster +// ac = cluster +// rcs = append(rcs, ac) +// } else { +// var ac *strategy.AssignedCluster +// ac = cluster +// acs = append(acs, ac) +// } +// } +// +// // update failed cluster status +// for _, ac := range acs { +// for _, t := range aiTaskList { +// if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { +// t.Status = constants.Failed +// t.EndTime = time.Now().Format(time.RFC3339) +// if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok { +// t.Msg = errMap[strconv.Itoa(int(t.ClusterId))] +// } +// err := storage.UpdateAiTask(t) +// if err != nil { +// logx.Errorf(err.Error()) +// } +// } +// } +// } +// +// // update running cluster status +// for _, ac := range rcs { +// for _, t := range aiTaskList { +// if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { +// t.Status = constants.Running +// err := storage.UpdateAiTask(t) +// if err != nil { +// logx.Errorf(err.Error()) +// } +// } +// } +// } +// storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败") +// } else { +// for _, t := range aiTaskList { +// t.Status = constants.Running +// err := storage.UpdateAiTask(t) +// if err != nil { +// logx.Errorf(err.Error()) +// } +// } +// storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "running", "任务运行中") +// } +// +// var result_ch = make(chan *types.ImageResult, len(ts)) +// var results []*types.ImageResult +// limit := make(chan bool, 7) +// +// var imageNumIdx int32 = 0 +// var imageNumIdxEnd int32 = 0 +// for _, c := range cs { +// new_images := make([]*ImageFile, len(ts)) +// copy(new_images, ts) +// +// imageNumIdxEnd = imageNumIdxEnd + c.imageNum +// new_images = new_images[imageNumIdx:imageNumIdxEnd] +// imageNumIdx = imageNumIdx + c.imageNum +// +// wg.Add(len(new_images)) +// go sendInferReq(new_images, c, &wg, result_ch, limit) +// } +// wg.Wait() +// close(result_ch) +// +// for s := range result_ch { +// results = append(results, s) +// } +// +// sort.Slice(results, func(p, q int) bool { +// return results[p].ClusterName < results[q].ClusterName +// }) +// +// //save ai sub tasks +// for _, r := range results { +// for _, task := range aiTaskList { +// if r.ClusterId == strconv.Itoa(int(task.ClusterId)) { +// taskAiSub := models.TaskAiSub{ +// TaskId: id, +// TaskName: task.Name, +// TaskAiId: task.TaskId, +// TaskAiName: task.Name, +// ImageName: r.ImageName, +// Result: r.ImageResult, +// Card: r.Card, +// ClusterId: task.ClusterId, +// ClusterName: r.ClusterName, +// } +// err := storage.SaveAiTaskImageSubTask(&taskAiSub) +// if err != nil { +// panic(err) +// } +// } +// } +// } +// +// // update succeeded cluster status +// var successStatusCount int +// for _, c := range cs { +// for _, t := range aiTaskList { +// if c.clusterId == strconv.Itoa(int(t.ClusterId)) { +// t.Status = constants.Completed +// t.EndTime = time.Now().Format(time.RFC3339) +// err := storage.UpdateAiTask(t) +// if err != nil { +// logx.Errorf(err.Error()) +// } +// successStatusCount++ +// } else { +// continue +// } +// } +// } +// +// if len(cs) == successStatusCount { +// storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "completed", "任务完成") +// } else { +// storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败") +// } +// +// return results, nil +//} +// +//func sendInferReq(images []*ImageFile, cluster struct { +// urls []*InferUrl +// clusterId string +// clusterName string +// imageNum int32 +//}, wg *sync.WaitGroup, ch chan<- *types.ImageResult, limit chan bool) { +// for _, image := range images { +// limit <- true +// go func(t *ImageFile, c struct { +// urls []*InferUrl +// clusterId string +// clusterName string +// imageNum int32 +// }) { +// if len(c.urls) == 1 { +// r, err := getInferResult(c.urls[0].Url, t.File, t.ImageResult.ImageName, c.clusterName) +// if err != nil { +// t.ImageResult.ImageResult = err.Error() +// t.ImageResult.ClusterId = c.clusterId +// t.ImageResult.ClusterName = c.clusterName +// t.ImageResult.Card = c.urls[0].Card +// ch <- t.ImageResult +// wg.Done() +// <-limit +// return +// } +// t.ImageResult.ImageResult = r +// t.ImageResult.ClusterId = c.clusterId +// t.ImageResult.ClusterName = c.clusterName +// t.ImageResult.Card = c.urls[0].Card +// +// ch <- t.ImageResult +// wg.Done() +// <-limit +// return +// } else { +// idx := rand.Intn(len(c.urls)) +// r, err := getInferResult(c.urls[idx].Url, t.File, t.ImageResult.ImageName, c.clusterName) +// if err != nil { +// t.ImageResult.ImageResult = err.Error() +// t.ImageResult.ClusterId = c.clusterId +// t.ImageResult.ClusterName = c.clusterName +// t.ImageResult.Card = c.urls[idx].Card +// ch <- t.ImageResult +// wg.Done() +// <-limit +// return +// } +// t.ImageResult.ImageResult = r +// t.ImageResult.ClusterId = c.clusterId +// t.ImageResult.ClusterName = c.clusterName +// t.ImageResult.Card = c.urls[idx].Card +// +// ch <- t.ImageResult +// wg.Done() +// <-limit +// return +// } +// }(image, cluster) +// <-limit +// } +//} +// +//func getInferResult(url string, file multipart.File, fileName string, clusterName string) (string, error) { +// if clusterName == "鹏城云脑II-modelarts" { +// r, err := getInferResultModelarts(url, file, fileName) +// if err != nil { +// return "", err +// } +// return r, nil +// } +// var res Res +// req := GetRestyRequest(20) +// _, err := req. +// SetFileReader("file", fileName, file). +// SetResult(&res). +// Post(url) +// if err != nil { +// return "", err +// } +// return res.Result, nil +//} +// +//func getInferResultModelarts(url string, file multipart.File, fileName string) (string, error) { +// var res Res +// /* req := GetRestyRequest(20) +// _, err := req. +// SetFileReader("file", fileName, file). +// SetHeaders(map[string]string{ +// "ak": "UNEHPHO4Z7YSNPKRXFE4", +// "sk": "JWXCE9qcYbc7RjpSRIWt4WgG3ZKF6Q4lPzkJReX9", +// }). +// SetResult(&res). +// Post(url) +// if err != nil { +// return "", err +// }*/ +// body, err := utils.SendRequest("POST", url, file, fileName) +// if err != nil { +// return "", err +// } +// errjson := json.Unmarshal([]byte(body), &res) +// if errjson != nil { +// log.Fatalf("Error parsing JSON: %s", errjson) +// } +// return res.Result, nil +//} +// +//func GetRestyRequest(timeoutSeconds int64) *resty.Request { +// client := resty.New().SetTimeout(time.Duration(timeoutSeconds) * time.Second) +// request := client.R() +// return request +//} +// +//type Res struct { +// Result string `json:"result"` +//} +// +//func contains(cs []struct { +// urls []*InferUrl +// clusterId string +// clusterName string +// imageNum int32 +//}, e string) bool { +// for _, c := range cs { +// if c.clusterId == e { +// return true +// } +// } +// return false +//} diff --git a/internal/scheduler/service/inference/textInference/textInference.go b/internal/scheduler/service/inference/textInference/textInference.go new file mode 100644 index 00000000..29d62ad7 --- /dev/null +++ b/internal/scheduler/service/inference/textInference/textInference.go @@ -0,0 +1 @@ +package textInference