update imageinference

Former-commit-id: e184c0f785466f4b0e626d0cc8a3ca86f5346f5e
This commit is contained in:
tzwang 2024-07-16 17:43:34 +08:00
parent 4120d60511
commit 4f3418eb28
6 changed files with 819 additions and 392 deletions

View File

@ -5,7 +5,7 @@ import (
"errors" "errors"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference/imageInference"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
@ -45,7 +45,7 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere
StaticWeightMap: req.StaticWeightMap, StaticWeightMap: req.StaticWeightMap,
} }
var ts []*inference.ImageFile var ts []*imageInference.ImageFile
uploadedFiles := r.MultipartForm.File uploadedFiles := r.MultipartForm.File
@ -65,7 +65,7 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere
defer file.Close() defer file.Close()
var ir types.ImageResult var ir types.ImageResult
ir.ImageName = header.Filename ir.ImageName = header.Filename
t := inference.ImageFile{ t := imageInference.ImageFile{
ImageResult: &ir, ImageResult: &ir,
File: file, File: file,
} }
@ -134,11 +134,12 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere
} }
go func() { go func() {
r := http.Request{} ic, err := imageInference.NewImageClassification(ts, clusters, opt, l.svcCtx.Scheduler.AiStorages, l.svcCtx.Scheduler.AiService.InferenceAdapterMap, id, adapterName)
_, err := inference.ImageInfer(opt, id, adapterName, clusters, ts, l.svcCtx.Scheduler.AiService.InferenceAdapterMap, l.svcCtx.Scheduler.AiStorages, r.Context())
if err != nil { if err != nil {
logx.Errorf(err.Error()) logx.Errorf(err.Error())
return
} }
ic.Classify()
}() }()
return resp, nil return resp, nil

View File

@ -0,0 +1,419 @@
package imageInference
import (
"encoding/json"
"errors"
"github.com/go-resty/resty/v2"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"log"
"math/rand"
"mime/multipart"
"net/http"
"sort"
"strconv"
"sync"
"time"
)
const (
IMAGE = "image"
FORWARD_SLASH = "/"
)
// ImageClassificationInterface is implemented by types that can run an
// image-classification inference job and return one result per input image.
type ImageClassificationInterface interface {
	Classify() ([]*types.ImageResult, error)
}
// ImageFile pairs an uploaded multipart file with the result record that is
// filled in once inference for that image completes.
type ImageFile struct {
	ImageResult *types.ImageResult // destination for the inference outcome
	File        multipart.File     // uploaded image content
}
// FilteredCluster is a cluster that passed filtering (its inference URLs
// could be resolved) together with how many images it should process.
type FilteredCluster struct {
	urls        []*inference.InferUrl // serving endpoints, already suffixed with "/image"
	clusterId   string
	clusterName string
	imageNum    int32 // number of images assigned to this cluster (replica count)
}
// ImageClassification carries everything needed to run one image-classification
// task across a set of assigned clusters: the uploaded files, the scheduling
// option, storage access, and the per-adapter inference clients.
type ImageClassification struct {
	files        []*ImageFile
	clusters     []*strategy.AssignedCluster
	opt          *option.InferOption
	storage      *database.AiStorage
	inferAdapter map[string]map[string]inference.Inference // adapterId -> clusterId -> client
	errMap       map[string]string                         // clusterId -> filter-stage error text
	taskId       int64
	adapterName  string
	aiTaskList   []*models.TaskAi // task rows loaded for taskId at construction time
}
// NewImageClassification builds an ImageClassification for the given task,
// loading the task's AI task rows from storage up front.
//
// Returns an error when the rows cannot be loaded or when no row exists for
// taskId. (The previous version returned (nil, nil) in the empty case, which
// forced every caller to nil-check or crash.)
func NewImageClassification(files []*ImageFile,
	clusters []*strategy.AssignedCluster,
	opt *option.InferOption,
	storage *database.AiStorage,
	inferAdapter map[string]map[string]inference.Inference,
	taskId int64,
	adapterName string) (*ImageClassification, error) {
	aiTaskList, err := storage.GetAiTaskListById(taskId)
	if err != nil {
		return nil, err
	}
	if len(aiTaskList) == 0 {
		return nil, errors.New("no ai task found for task id " + strconv.FormatInt(taskId, 10))
	}
	return &ImageClassification{
		files:        files,
		clusters:     clusters,
		opt:          opt,
		storage:      storage,
		inferAdapter: inferAdapter,
		taskId:       taskId,
		adapterName:  adapterName,
		errMap:       make(map[string]string),
		aiTaskList:   aiTaskList,
	}, nil
}
// Classify runs the full pipeline: resolve reachable clusters, persist the
// per-cluster task status, then fan the images out for inference.
func (i *ImageClassification) Classify() ([]*types.ImageResult, error) {
	clusters, err := i.filterClusters()
	if err != nil {
		return nil, err
	}
	if err = i.updateStatus(clusters); err != nil {
		return nil, err
	}
	return i.inferImages(clusters)
}
// filterClusters concurrently resolves the inference URL of every assigned
// cluster. Clusters whose URL cannot be resolved (or that have no registered
// inference adapter) are dropped, and the reason is recorded in i.errMap
// keyed by cluster id for updateStatus to report.
func (i *ImageClassification) filterClusters() ([]*FilteredCluster, error) {
	var wg sync.WaitGroup
	ch := make(chan *FilteredCluster, len(i.clusters))
	var mutex sync.Mutex // guards errMap writes from concurrent goroutines

	inferMap := i.inferAdapter[i.opt.AdapterId]
	for _, cluster := range i.clusters {
		wg.Add(1)
		c := cluster // capture loop variable for the goroutine
		go func() {
			defer wg.Done() // released on every path, including future early returns
			// Guard the map lookup: calling GetInferUrl on a missing (nil)
			// adapter entry would panic inside this goroutine.
			inf, ok := inferMap[c.ClusterId]
			if !ok || inf == nil {
				mutex.Lock()
				i.errMap[c.ClusterId] = "no inference adapter registered for cluster " + c.ClusterId
				mutex.Unlock()
				return
			}
			// NOTE(review): a zero http.Request yields context.Background();
			// consider threading a real context through so callers can cancel.
			r := http.Request{}
			imageUrls, err := inf.GetInferUrl(r.Context(), i.opt)
			if err != nil {
				mutex.Lock()
				i.errMap[c.ClusterId] = err.Error()
				mutex.Unlock()
				return
			}
			// idx (not i) so the receiver is not shadowed inside the loop.
			for idx := range imageUrls {
				imageUrls[idx].Url = imageUrls[idx].Url + FORWARD_SLASH + IMAGE
			}
			clusterName, _ := i.storage.GetClusterNameById(c.ClusterId)
			ch <- &FilteredCluster{
				urls:        imageUrls,
				clusterId:   c.ClusterId,
				clusterName: clusterName,
				imageNum:    c.Replicas,
			}
		}()
	}
	wg.Wait()
	close(ch)

	var cs []*FilteredCluster
	for s := range ch {
		cs = append(cs, s)
	}
	return cs, nil
}
// inferImages fans the images out across the filtered clusters, gathers one
// result per image, persists each result as an AI sub-task, and finally
// updates the overall task status / notice record.
//
// Each cluster receives a contiguous chunk of the image list whose length
// equals its replica count (imageNum).
// NOTE(review): assumes the replica counts sum to len(i.files); if they
// exceed it, the slice expression below panics — confirm the scheduler
// guarantees this invariant.
func (i *ImageClassification) inferImages(cs []*FilteredCluster) ([]*types.ImageResult, error) {
	var wg sync.WaitGroup
	ch := make(chan *types.ImageResult, len(i.files))
	var results []*types.ImageResult
	limit := make(chan bool, 7) // bounds concurrent inference requests

	var imageNumIdx int32 = 0
	var imageNumIdxEnd int32 = 0
	for _, c := range cs {
		new_images := make([]*ImageFile, len(i.files))
		copy(new_images, i.files)
		imageNumIdxEnd = imageNumIdxEnd + c.imageNum
		new_images = new_images[imageNumIdx:imageNumIdxEnd]
		imageNumIdx = imageNumIdx + c.imageNum
		wg.Add(len(new_images))
		go sendInferReq(new_images, c, &wg, ch, limit)
	}
	wg.Wait()
	close(ch)

	for s := range ch {
		results = append(results, s)
	}
	// Deterministic ordering for the caller / UI.
	sort.Slice(results, func(p, q int) bool {
		return results[p].ClusterName < results[q].ClusterName
	})

	// Save one AI sub-task row per image result.
	for _, r := range results {
		for _, task := range i.aiTaskList {
			if r.ClusterId == strconv.Itoa(int(task.ClusterId)) {
				taskAiSub := models.TaskAiSub{
					TaskId:      i.taskId,
					TaskName:    task.Name,
					TaskAiId:    task.TaskId,
					TaskAiName:  task.Name,
					ImageName:   r.ImageName,
					Result:      r.ImageResult,
					Card:        r.Card,
					ClusterId:   task.ClusterId,
					ClusterName: r.ClusterName,
				}
				// Log instead of panicking: this runs inside a goroutine, and
				// a panic here would kill the whole process over one failed
				// database insert.
				if err := i.storage.SaveAiTaskImageSubTask(&taskAiSub); err != nil {
					logx.Errorf(err.Error())
				}
			}
		}
	}

	// Mark clusters that produced results as completed.
	// NOTE(review): successStatusCount increments once per matching task, so
	// the comparison with len(cs) assumes exactly one task row per cluster —
	// confirm against how tasks are saved.
	var successStatusCount int
	for _, c := range cs {
		for _, t := range i.aiTaskList {
			if c.clusterId == strconv.Itoa(int(t.ClusterId)) {
				t.Status = constants.Completed
				t.EndTime = time.Now().Format(time.RFC3339)
				if err := i.storage.UpdateAiTask(t); err != nil {
					logx.Errorf(err.Error())
				}
				successStatusCount++
			}
		}
	}
	if len(cs) == successStatusCount {
		i.storage.AddNoticeInfo(i.opt.AdapterId, i.adapterName, "", "", i.opt.TaskName, "completed", "任务完成")
	} else {
		i.storage.AddNoticeInfo(i.opt.AdapterId, i.adapterName, "", "", i.opt.TaskName, "failed", "任务失败")
	}
	return results, nil
}
// updateStatus reconciles the persisted AI task rows with the outcome of
// cluster filtering:
//   - no cluster reachable  -> all tasks Failed, "failed" notice, error returned
//   - some clusters dropped -> dropped tasks Failed, surviving tasks Running
//   - all clusters usable   -> all tasks Running, "running" notice
//
// Error text recorded during filtering (i.errMap, keyed by cluster id) is
// copied onto the failed task rows.
func (i *ImageClassification) updateStatus(cs []*FilteredCluster) error {
	// No cluster available: fail every task and bail out.
	if len(cs) == 0 {
		for _, t := range i.aiTaskList {
			t.Status = constants.Failed
			t.EndTime = time.Now().Format(time.RFC3339)
			if msg, ok := i.errMap[strconv.Itoa(int(t.ClusterId))]; ok {
				t.Msg = msg
			}
			if err := i.storage.UpdateAiTask(t); err != nil {
				logx.Errorf(err.Error())
			}
		}
		i.storage.AddNoticeInfo(i.opt.AdapterId, i.adapterName, "", "", i.opt.TaskName, "failed", "任务失败")
		return errors.New("image infer task failed")
	}

	if len(i.clusters) != len(cs) {
		// Split the assigned clusters into the ones that survived filtering
		// (rcs, still running) and the ones that did not (acs, failed).
		var acs []*strategy.AssignedCluster
		var rcs []*strategy.AssignedCluster
		for _, cluster := range i.clusters {
			if contains(cs, cluster.ClusterId) {
				rcs = append(rcs, cluster)
			} else {
				acs = append(acs, cluster)
			}
		}
		// Mark dropped clusters' tasks as failed, carrying the filter error.
		for _, ac := range acs {
			for _, t := range i.aiTaskList {
				if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) {
					t.Status = constants.Failed
					t.EndTime = time.Now().Format(time.RFC3339)
					if msg, ok := i.errMap[strconv.Itoa(int(t.ClusterId))]; ok {
						t.Msg = msg
					}
					if err := i.storage.UpdateAiTask(t); err != nil {
						logx.Errorf(err.Error())
					}
				}
			}
		}
		// Mark surviving clusters' tasks as running.
		for _, ac := range rcs {
			for _, t := range i.aiTaskList {
				if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) {
					t.Status = constants.Running
					if err := i.storage.UpdateAiTask(t); err != nil {
						logx.Errorf(err.Error())
					}
				}
			}
		}
		i.storage.AddNoticeInfo(i.opt.AdapterId, i.adapterName, "", "", i.opt.TaskName, "failed", "任务失败")
		return nil
	}

	// Every assigned cluster is usable: everything is running.
	for _, t := range i.aiTaskList {
		t.Status = constants.Running
		if err := i.storage.UpdateAiTask(t); err != nil {
			logx.Errorf(err.Error())
		}
	}
	i.storage.AddNoticeInfo(i.opt.AdapterId, i.adapterName, "", "", i.opt.TaskName, "running", "任务运行中")
	return nil
}
// sendInferReq posts every image to one of the cluster's serving URLs (random
// pick when there are several), delivering one result per image on ch.
//
// wg must be pre-incremented by len(images); limit is a semaphore bounding
// concurrent requests across all clusters.
//
// Fixes over the previous version:
//   - The loop itself also received from limit after spawning each goroutine,
//     stealing the token the goroutine was supposed to release. That left the
//     semaphore unable to bound concurrency and leaked goroutines blocked on
//     <-limit after wg.Done(). Only the goroutine releases the slot now.
//   - The single-URL and multi-URL branches were byte-for-byte duplicates
//     except for the index choice; they are unified.
func sendInferReq(images []*ImageFile, cluster *FilteredCluster, wg *sync.WaitGroup, ch chan<- *types.ImageResult, limit chan bool) {
	for _, image := range images {
		limit <- true // acquire a concurrency slot
		go func(t *ImageFile, c *FilteredCluster) {
			defer func() {
				wg.Done()
				<-limit // release the slot exactly once, on every path
			}()
			// Pick the serving endpoint: the only one, or a random one.
			idx := 0
			if len(c.urls) > 1 {
				idx = rand.Intn(len(c.urls))
			}
			t.ImageResult.ClusterId = c.clusterId
			t.ImageResult.ClusterName = c.clusterName
			t.ImageResult.Card = c.urls[idx].Card
			r, err := getInferResult(c.urls[idx].Url, t.File, t.ImageResult.ImageName, c.clusterName)
			if err != nil {
				// Record the failure as the image's result text, matching the
				// original behavior.
				t.ImageResult.ImageResult = err.Error()
			} else {
				t.ImageResult.ImageResult = r
			}
			ch <- t.ImageResult
		}(image, cluster)
	}
}
// getInferResult uploads one image to the given inference URL and returns the
// textual classification result. The ModelArts cluster needs a dedicated
// request format and is dispatched to its own helper.
func getInferResult(url string, file multipart.File, fileName string, clusterName string) (string, error) {
	if clusterName == "鹏城云脑II-modelarts" {
		return getInferResultModelarts(url, file, fileName)
	}
	var res Res
	if _, err := GetRestyRequest(20).
		SetFileReader("file", fileName, file).
		SetResult(&res).
		Post(url); err != nil {
		return "", err
	}
	return res.Result, nil
}
func getInferResultModelarts(url string, file multipart.File, fileName string) (string, error) {
var res Res
/* req := GetRestyRequest(20)
_, err := req.
SetFileReader("file", fileName, file).
SetHeaders(map[string]string{
"ak": "UNEHPHO4Z7YSNPKRXFE4",
"sk": "JWXCE9qcYbc7RjpSRIWt4WgG3ZKF6Q4lPzkJReX9",
}).
SetResult(&res).
Post(url)
if err != nil {
return "", err
}*/
body, err := utils.SendRequest("POST", url, file, fileName)
if err != nil {
return "", err
}
errjson := json.Unmarshal([]byte(body), &res)
if errjson != nil {
log.Fatalf("Error parsing JSON: %s", errjson)
}
return res.Result, nil
}
// GetRestyRequest returns a fresh resty request whose client is configured
// with the given timeout in seconds.
func GetRestyRequest(timeoutSeconds int64) *resty.Request {
	// A new client per call keeps the timeout configurable per call site.
	return resty.New().
		SetTimeout(time.Duration(timeoutSeconds) * time.Second).
		R()
}
// Res models the inference service's JSON response payload.
type Res struct {
	Result string `json:"result"`
}
// contains reports whether any filtered cluster carries the cluster id e.
func contains(cs []*FilteredCluster, e string) bool {
	for idx := range cs {
		if cs[idx].clusterId == e {
			return true
		}
	}
	return false
}

View File

@ -0,0 +1,9 @@
package imageInference
import "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
// ImageInference describes the internal three-phase pipeline of an image
// inference job: resolve usable clusters, run the per-image inference, and
// persist task status updates.
type ImageInference interface {
	filterClusters() ([]*FilteredCluster, error)
	inferImages(cs []*FilteredCluster) ([]*types.ImageResult, error)
	updateStatus(cs []*FilteredCluster) error
}

View File

@ -0,0 +1,19 @@
package imageInference
import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
)
// ImageToText holds the inputs for an image-to-text inference job.
// NOTE(review): mirrors ImageClassification but currently has no methods —
// presumably a placeholder for a future implementation; confirm before use.
type ImageToText struct {
	files        []*ImageFile
	clusters     []*strategy.AssignedCluster
	opt          *option.InferOption
	storage      *database.AiStorage
	inferAdapter map[string]map[string]inference.Inference // adapterId -> clusterId -> client
	errMap       map[string]string                         // clusterId -> error text
	taskId       int64
	adapterName  string
}

View File

@ -2,28 +2,12 @@ package inference
import ( import (
"context" "context"
"encoding/json"
"errors"
"github.com/go-resty/resty/v2"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"log"
"math/rand"
"mime/multipart"
"sort"
"strconv"
"sync"
"time"
) )
type Inference interface { type Inference interface {
GetInferUrl(ctx context.Context, option *option.InferOption) ([]*InferUrl, error) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*InferUrl, error)
//GetInferDeployInstanceList(ctx context.Context, option *option.InferOption)
} }
type InferUrl struct { type InferUrl struct {
@ -31,373 +15,367 @@ type InferUrl struct {
Card string Card string
} }
type ImageFile struct { //func ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*ImageFile, inferAdapterMap map[string]map[string]Inference, storage *database.AiStorage, ctx context.Context) ([]*types.ImageResult, error) {
ImageResult *types.ImageResult //
File multipart.File // //for i := len(clusters) - 1; i >= 0; i-- {
} // // if clusters[i].Replicas == 0 {
// // clusters = append(clusters[:i], clusters[i+1:]...)
func ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*ImageFile, inferAdapterMap map[string]map[string]Inference, storage *database.AiStorage, ctx context.Context) ([]*types.ImageResult, error) { // // }
// //}
//for i := len(clusters) - 1; i >= 0; i-- { // var wg sync.WaitGroup
// if clusters[i].Replicas == 0 { // var cluster_ch = make(chan struct {
// clusters = append(clusters[:i], clusters[i+1:]...) // urls []*InferUrl
// } // clusterId string
//} // clusterName string
// imageNum int32
var wg sync.WaitGroup // }, len(clusters))
var cluster_ch = make(chan struct { //
urls []*InferUrl // var cs []struct {
clusterId string // urls []*InferUrl
clusterName string // clusterId string
imageNum int32 // clusterName string
}, len(clusters)) // imageNum int32
// }
var cs []struct { // inferMap := inferAdapterMap[opt.AdapterId]
urls []*InferUrl //
clusterId string // ////save taskai
clusterName string // //for _, c := range clusters {
imageNum int32 // // clusterName, _ := storage.GetClusterNameById(c.ClusterId)
} // // opt.Replica = c.Replicas
inferMap := inferAdapterMap[opt.AdapterId] // // err := storage.SaveAiTask(id, opt, adapterName, c.ClusterId, clusterName, "", constants.Saved, "")
// // if err != nil {
////save taskai // // return nil, err
//for _, c := range clusters { // // }
// clusterName, _ := storage.GetClusterNameById(c.ClusterId) // //}
// opt.Replica = c.Replicas //
// err := storage.SaveAiTask(id, opt, adapterName, c.ClusterId, clusterName, "", constants.Saved, "") // var mutex sync.Mutex
// if err != nil { // errMap := make(map[string]string)
// return nil, err // for _, cluster := range clusters {
// } // wg.Add(1)
//} // c := cluster
// go func() {
var mutex sync.Mutex // imageUrls, err := inferMap[c.ClusterId].GetInferUrl(ctx, opt)
errMap := make(map[string]string) // if err != nil {
for _, cluster := range clusters { // mutex.Lock()
wg.Add(1) // errMap[c.ClusterId] = err.Error()
c := cluster // mutex.Unlock()
go func() { // wg.Done()
imageUrls, err := inferMap[c.ClusterId].GetInferUrl(ctx, opt) // return
if err != nil { // }
mutex.Lock() // for i, _ := range imageUrls {
errMap[c.ClusterId] = err.Error() // imageUrls[i].Url = imageUrls[i].Url + "/" + "image"
mutex.Unlock() // }
wg.Done() // clusterName, _ := storage.GetClusterNameById(c.ClusterId)
return //
} // s := struct {
for i, _ := range imageUrls { // urls []*InferUrl
imageUrls[i].Url = imageUrls[i].Url + "/" + "image" // clusterId string
} // clusterName string
clusterName, _ := storage.GetClusterNameById(c.ClusterId) // imageNum int32
// }{
s := struct { // urls: imageUrls,
urls []*InferUrl // clusterId: c.ClusterId,
clusterId string // clusterName: clusterName,
clusterName string // imageNum: c.Replicas,
imageNum int32 // }
}{ //
urls: imageUrls, // cluster_ch <- s
clusterId: c.ClusterId, // wg.Done()
clusterName: clusterName, // return
imageNum: c.Replicas, // }()
} // }
// wg.Wait()
cluster_ch <- s // close(cluster_ch)
wg.Done() //
return // for s := range cluster_ch {
}() // cs = append(cs, s)
} // }
wg.Wait() //
close(cluster_ch) // aiTaskList, err := storage.GetAiTaskListById(id)
// if err != nil {
for s := range cluster_ch { // return nil, err
cs = append(cs, s) // }
} //
// //no cluster available
aiTaskList, err := storage.GetAiTaskListById(id) // if len(cs) == 0 {
if err != nil { // for _, t := range aiTaskList {
return nil, err // t.Status = constants.Failed
} // t.EndTime = time.Now().Format(time.RFC3339)
// if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok {
//no cluster available // t.Msg = errMap[strconv.Itoa(int(t.ClusterId))]
if len(cs) == 0 { // }
for _, t := range aiTaskList { // err := storage.UpdateAiTask(t)
t.Status = constants.Failed // if err != nil {
t.EndTime = time.Now().Format(time.RFC3339) // logx.Errorf(err.Error())
if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok { // }
t.Msg = errMap[strconv.Itoa(int(t.ClusterId))] // }
} // storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败")
err := storage.UpdateAiTask(t) // return nil, errors.New("image infer task failed")
if err != nil { // }
logx.Errorf(err.Error()) //
} // //change cluster status
} // if len(clusters) != len(cs) {
storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败") // var acs []*strategy.AssignedCluster
return nil, errors.New("image infer task failed") // var rcs []*strategy.AssignedCluster
} // for _, cluster := range clusters {
// if contains(cs, cluster.ClusterId) {
//change cluster status // var ac *strategy.AssignedCluster
if len(clusters) != len(cs) { // ac = cluster
var acs []*strategy.AssignedCluster // rcs = append(rcs, ac)
var rcs []*strategy.AssignedCluster // } else {
for _, cluster := range clusters { // var ac *strategy.AssignedCluster
if contains(cs, cluster.ClusterId) { // ac = cluster
var ac *strategy.AssignedCluster // acs = append(acs, ac)
ac = cluster // }
rcs = append(rcs, ac) // }
} else { //
var ac *strategy.AssignedCluster // // update failed cluster status
ac = cluster // for _, ac := range acs {
acs = append(acs, ac) // for _, t := range aiTaskList {
} // if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) {
} // t.Status = constants.Failed
// t.EndTime = time.Now().Format(time.RFC3339)
// update failed cluster status // if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok {
for _, ac := range acs { // t.Msg = errMap[strconv.Itoa(int(t.ClusterId))]
for _, t := range aiTaskList { // }
if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { // err := storage.UpdateAiTask(t)
t.Status = constants.Failed // if err != nil {
t.EndTime = time.Now().Format(time.RFC3339) // logx.Errorf(err.Error())
if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok { // }
t.Msg = errMap[strconv.Itoa(int(t.ClusterId))] // }
} // }
err := storage.UpdateAiTask(t) // }
if err != nil { //
logx.Errorf(err.Error()) // // update running cluster status
} // for _, ac := range rcs {
} // for _, t := range aiTaskList {
} // if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) {
} // t.Status = constants.Running
// err := storage.UpdateAiTask(t)
// update running cluster status // if err != nil {
for _, ac := range rcs { // logx.Errorf(err.Error())
for _, t := range aiTaskList { // }
if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { // }
t.Status = constants.Running // }
err := storage.UpdateAiTask(t) // }
if err != nil { // storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败")
logx.Errorf(err.Error()) // } else {
} // for _, t := range aiTaskList {
} // t.Status = constants.Running
} // err := storage.UpdateAiTask(t)
} // if err != nil {
storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败") // logx.Errorf(err.Error())
} else { // }
for _, t := range aiTaskList { // }
t.Status = constants.Running // storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "running", "任务运行中")
err := storage.UpdateAiTask(t) // }
if err != nil { //
logx.Errorf(err.Error()) // var result_ch = make(chan *types.ImageResult, len(ts))
} // var results []*types.ImageResult
} // limit := make(chan bool, 7)
storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "running", "任务运行中") //
} // var imageNumIdx int32 = 0
// var imageNumIdxEnd int32 = 0
var result_ch = make(chan *types.ImageResult, len(ts)) // for _, c := range cs {
var results []*types.ImageResult // new_images := make([]*ImageFile, len(ts))
limit := make(chan bool, 7) // copy(new_images, ts)
//
var imageNumIdx int32 = 0 // imageNumIdxEnd = imageNumIdxEnd + c.imageNum
var imageNumIdxEnd int32 = 0 // new_images = new_images[imageNumIdx:imageNumIdxEnd]
for _, c := range cs { // imageNumIdx = imageNumIdx + c.imageNum
new_images := make([]*ImageFile, len(ts)) //
copy(new_images, ts) // wg.Add(len(new_images))
// go sendInferReq(new_images, c, &wg, result_ch, limit)
imageNumIdxEnd = imageNumIdxEnd + c.imageNum // }
new_images = new_images[imageNumIdx:imageNumIdxEnd] // wg.Wait()
imageNumIdx = imageNumIdx + c.imageNum // close(result_ch)
//
wg.Add(len(new_images)) // for s := range result_ch {
go sendInferReq(new_images, c, &wg, result_ch, limit) // results = append(results, s)
} // }
wg.Wait() //
close(result_ch) // sort.Slice(results, func(p, q int) bool {
// return results[p].ClusterName < results[q].ClusterName
for s := range result_ch { // })
results = append(results, s) //
} // //save ai sub tasks
// for _, r := range results {
sort.Slice(results, func(p, q int) bool { // for _, task := range aiTaskList {
return results[p].ClusterName < results[q].ClusterName // if r.ClusterId == strconv.Itoa(int(task.ClusterId)) {
}) // taskAiSub := models.TaskAiSub{
// TaskId: id,
//save ai sub tasks // TaskName: task.Name,
for _, r := range results { // TaskAiId: task.TaskId,
for _, task := range aiTaskList { // TaskAiName: task.Name,
if r.ClusterId == strconv.Itoa(int(task.ClusterId)) { // ImageName: r.ImageName,
taskAiSub := models.TaskAiSub{ // Result: r.ImageResult,
TaskId: id, // Card: r.Card,
TaskName: task.Name, // ClusterId: task.ClusterId,
TaskAiId: task.TaskId, // ClusterName: r.ClusterName,
TaskAiName: task.Name, // }
ImageName: r.ImageName, // err := storage.SaveAiTaskImageSubTask(&taskAiSub)
Result: r.ImageResult, // if err != nil {
Card: r.Card, // panic(err)
ClusterId: task.ClusterId, // }
ClusterName: r.ClusterName, // }
} // }
err := storage.SaveAiTaskImageSubTask(&taskAiSub) // }
if err != nil { //
panic(err) // // update succeeded cluster status
} // var successStatusCount int
} // for _, c := range cs {
} // for _, t := range aiTaskList {
} // if c.clusterId == strconv.Itoa(int(t.ClusterId)) {
// t.Status = constants.Completed
// update succeeded cluster status // t.EndTime = time.Now().Format(time.RFC3339)
var successStatusCount int // err := storage.UpdateAiTask(t)
for _, c := range cs { // if err != nil {
for _, t := range aiTaskList { // logx.Errorf(err.Error())
if c.clusterId == strconv.Itoa(int(t.ClusterId)) { // }
t.Status = constants.Completed // successStatusCount++
t.EndTime = time.Now().Format(time.RFC3339) // } else {
err := storage.UpdateAiTask(t) // continue
if err != nil { // }
logx.Errorf(err.Error()) // }
} // }
successStatusCount++ //
} else { // if len(cs) == successStatusCount {
continue // storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "completed", "任务完成")
} // } else {
} // storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败")
} // }
//
if len(cs) == successStatusCount { // return results, nil
storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "completed", "任务完成") //}
} else { //
storage.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "failed", "任务失败") //func sendInferReq(images []*ImageFile, cluster struct {
} // urls []*InferUrl
// clusterId string
return results, nil // clusterName string
} // imageNum int32
//}, wg *sync.WaitGroup, ch chan<- *types.ImageResult, limit chan bool) {
func sendInferReq(images []*ImageFile, cluster struct { // for _, image := range images {
urls []*InferUrl // limit <- true
clusterId string // go func(t *ImageFile, c struct {
clusterName string // urls []*InferUrl
imageNum int32 // clusterId string
}, wg *sync.WaitGroup, ch chan<- *types.ImageResult, limit chan bool) { // clusterName string
for _, image := range images { // imageNum int32
limit <- true // }) {
go func(t *ImageFile, c struct { // if len(c.urls) == 1 {
urls []*InferUrl // r, err := getInferResult(c.urls[0].Url, t.File, t.ImageResult.ImageName, c.clusterName)
clusterId string // if err != nil {
clusterName string // t.ImageResult.ImageResult = err.Error()
imageNum int32 // t.ImageResult.ClusterId = c.clusterId
}) { // t.ImageResult.ClusterName = c.clusterName
if len(c.urls) == 1 { // t.ImageResult.Card = c.urls[0].Card
r, err := getInferResult(c.urls[0].Url, t.File, t.ImageResult.ImageName, c.clusterName) // ch <- t.ImageResult
if err != nil { // wg.Done()
t.ImageResult.ImageResult = err.Error() // <-limit
t.ImageResult.ClusterId = c.clusterId // return
t.ImageResult.ClusterName = c.clusterName // }
t.ImageResult.Card = c.urls[0].Card // t.ImageResult.ImageResult = r
ch <- t.ImageResult // t.ImageResult.ClusterId = c.clusterId
wg.Done() // t.ImageResult.ClusterName = c.clusterName
<-limit // t.ImageResult.Card = c.urls[0].Card
return //
} // ch <- t.ImageResult
t.ImageResult.ImageResult = r // wg.Done()
t.ImageResult.ClusterId = c.clusterId // <-limit
t.ImageResult.ClusterName = c.clusterName // return
t.ImageResult.Card = c.urls[0].Card // } else {
// idx := rand.Intn(len(c.urls))
ch <- t.ImageResult // r, err := getInferResult(c.urls[idx].Url, t.File, t.ImageResult.ImageName, c.clusterName)
wg.Done() // if err != nil {
<-limit // t.ImageResult.ImageResult = err.Error()
return // t.ImageResult.ClusterId = c.clusterId
} else { // t.ImageResult.ClusterName = c.clusterName
idx := rand.Intn(len(c.urls)) // t.ImageResult.Card = c.urls[idx].Card
r, err := getInferResult(c.urls[idx].Url, t.File, t.ImageResult.ImageName, c.clusterName) // ch <- t.ImageResult
if err != nil { // wg.Done()
t.ImageResult.ImageResult = err.Error() // <-limit
t.ImageResult.ClusterId = c.clusterId // return
t.ImageResult.ClusterName = c.clusterName // }
t.ImageResult.Card = c.urls[idx].Card // t.ImageResult.ImageResult = r
ch <- t.ImageResult // t.ImageResult.ClusterId = c.clusterId
wg.Done() // t.ImageResult.ClusterName = c.clusterName
<-limit // t.ImageResult.Card = c.urls[idx].Card
return //
} // ch <- t.ImageResult
t.ImageResult.ImageResult = r // wg.Done()
t.ImageResult.ClusterId = c.clusterId // <-limit
t.ImageResult.ClusterName = c.clusterName // return
t.ImageResult.Card = c.urls[idx].Card // }
// }(image, cluster)
ch <- t.ImageResult // <-limit
wg.Done() // }
<-limit //}
return //
} //func getInferResult(url string, file multipart.File, fileName string, clusterName string) (string, error) {
}(image, cluster) // if clusterName == "鹏城云脑II-modelarts" {
<-limit // r, err := getInferResultModelarts(url, file, fileName)
} // if err != nil {
} // return "", err
// }
func getInferResult(url string, file multipart.File, fileName string, clusterName string) (string, error) { // return r, nil
if clusterName == "鹏城云脑II-modelarts" { // }
r, err := getInferResultModelarts(url, file, fileName) // var res Res
if err != nil { // req := GetRestyRequest(20)
return "", err // _, err := req.
} // SetFileReader("file", fileName, file).
return r, nil // SetResult(&res).
} // Post(url)
var res Res // if err != nil {
req := GetRestyRequest(20) // return "", err
_, err := req. // }
SetFileReader("file", fileName, file). // return res.Result, nil
SetResult(&res). //}
Post(url) //
if err != nil { //func getInferResultModelarts(url string, file multipart.File, fileName string) (string, error) {
return "", err // var res Res
} // /* req := GetRestyRequest(20)
return res.Result, nil // _, err := req.
} // SetFileReader("file", fileName, file).
// SetHeaders(map[string]string{
func getInferResultModelarts(url string, file multipart.File, fileName string) (string, error) { // "ak": "UNEHPHO4Z7YSNPKRXFE4",
var res Res // "sk": "JWXCE9qcYbc7RjpSRIWt4WgG3ZKF6Q4lPzkJReX9",
/* req := GetRestyRequest(20) // }).
_, err := req. // SetResult(&res).
SetFileReader("file", fileName, file). // Post(url)
SetHeaders(map[string]string{ // if err != nil {
"ak": "UNEHPHO4Z7YSNPKRXFE4", // return "", err
"sk": "JWXCE9qcYbc7RjpSRIWt4WgG3ZKF6Q4lPzkJReX9", // }*/
}). // body, err := utils.SendRequest("POST", url, file, fileName)
SetResult(&res). // if err != nil {
Post(url) // return "", err
if err != nil { // }
return "", err // errjson := json.Unmarshal([]byte(body), &res)
}*/ // if errjson != nil {
body, err := utils.SendRequest("POST", url, file, fileName) // log.Fatalf("Error parsing JSON: %s", errjson)
if err != nil { // }
return "", err // return res.Result, nil
} //}
errjson := json.Unmarshal([]byte(body), &res) //
if errjson != nil { //func GetRestyRequest(timeoutSeconds int64) *resty.Request {
log.Fatalf("Error parsing JSON: %s", errjson) // client := resty.New().SetTimeout(time.Duration(timeoutSeconds) * time.Second)
} // request := client.R()
return res.Result, nil // return request
} //}
//
func GetRestyRequest(timeoutSeconds int64) *resty.Request { //type Res struct {
client := resty.New().SetTimeout(time.Duration(timeoutSeconds) * time.Second) // Result string `json:"result"`
request := client.R() //}
return request //
} //func contains(cs []struct {
// urls []*InferUrl
type Res struct { // clusterId string
Result string `json:"result"` // clusterName string
} // imageNum int32
//}, e string) bool {
func contains(cs []struct { // for _, c := range cs {
urls []*InferUrl // if c.clusterId == e {
clusterId string // return true
clusterName string // }
imageNum int32 // }
}, e string) bool { // return false
for _, c := range cs { //}
if c.clusterId == e {
return true
}
}
return false
}

View File

@ -0,0 +1 @@
package textInference