diff --git a/.devops/api.yml b/.devops/api.yml index ecf4f08a..1ecd581d 100644 --- a/.devops/api.yml +++ b/.devops/api.yml @@ -51,8 +51,8 @@ workflow: input: docker_username: ((dev.docker_user)) docker_password: ((dev.docker_password)) - image_name: '"registry.cn-hangzhou.aliyuncs.com/jcce/pcm-coordinator-api"' - image_tag: git_clone_0.commit_time + image_name: '"registry.cn-hangzhou.aliyuncs.com/jcce/pcm-core-api"' + image_tag: '"latest"' registry_address: '"registry.cn-hangzhou.aliyuncs.com"' docker_file: git_clone_0.git_path + '/api/Dockerfile' docker_build_path: git_clone_0.git_path @@ -65,26 +65,13 @@ workflow: - ref: end name: 结束 task: end - needs: - - kubectl_deploy_0 - - ref: kubectl_deploy_0 - name: kubectl部署资源 - task: kubectl_deploy@1.1.0 - input: - command: '"apply"' - resource_file_path: git_clone_0.git_path + '/api' - certificate_authority_data: ((dev.k8s_cad)) - server: '"https://119.45.100.73:6443"' - client_certificate_data: ((dev.k8s_ccd)) - client_key_data: ((dev.k8s_ckd)) - hosts: '""' needs: - docker_image_build_0 - ref: shell_0 name: shell image: docker.jianmuhub.com/library/debian:buster-slim env: - IMAGE_NAME: '"registry.cn-hangzhou.aliyuncs.com/jcce/pcm-coordinator-api"' + IMAGE_NAME: '"registry.cn-hangzhou.aliyuncs.com/jcce/pcm-core-api"' IMAGE_TAG: git_clone_0.commit_time SECRET_NAME: global.secret_name PROJECT_NAME: global.project_name diff --git a/api/Dockerfile b/api/Dockerfile index a27e861c..80416fda 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -1,15 +1,15 @@ -FROM golang:1.22.4-alpine3.20 AS builder +FROM registry.cn-hangzhou.aliyuncs.com/jcce-images/golang:1.22.4-alpine3.20 AS builder WORKDIR /app +LABEL stage=gobuilder +ENV CGO_ENABLED 0 +ENV GOARCH amd64 +ENV GOPROXY https://goproxy.cn,direct + COPY . . - -RUN go env -w GO111MODULE=on \ -&& go env -w GOPROXY=https://goproxy.cn,direct \ -&& go env -w CGO_ENABLED=0 -RUN go build -o pcm-coordinator-api /app/api/pcm.go - -FROM alpine:3.20 +COPY api/etc/ /app/ +RUN go mod download && go build -o pcm-coordinator-api /app/api/pcm.go WORKDIR /app diff --git a/api/client/types.go b/api/client/types.go index 2aa28e22..bf5c676d 100644 --- a/api/client/types.go +++ b/api/client/types.go @@ -175,7 +175,7 @@ type VmInfo struct { //DeletedAt string `json:"deletedAt,omitempty"` VmName string `json:"vmName,omitempty"` Replicas int64 `json:"replicas,omitempty"` - //ServerId string `json:"serverId,omitempty"` + ServerId string `json:"serverId,omitempty"` } type ResourceStats struct { diff --git a/api/desc/ai/pcm-ai.api b/api/desc/ai/pcm-ai.api index fefc7ea3..8d2ccfe3 100644 --- a/api/desc/ai/pcm-ai.api +++ b/api/desc/ai/pcm-ai.api @@ -1822,8 +1822,11 @@ service AICore-api { type ( ChatReq{ - ApiUrl string `json:"apiUrl,optional"` + ApiUrl string `json:"apiUrl"` Method string `json:"method,optional"` ReqData map[string]interface{} `json:"reqData"` } + ChatResult{ + Resuluts string `json:"results,optional"` + } ) \ No newline at end of file diff --git a/api/internal/handler/ai/proxyapihandler.go b/api/internal/handler/ai/proxyapihandler.go index cbb732e2..b86ce9d2 100644 --- a/api/internal/handler/ai/proxyapihandler.go +++ b/api/internal/handler/ai/proxyapihandler.go @@ -18,7 +18,7 @@ func ProxyApiHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { } l := ai.NewProxyApiLogic(r.Context(), svcCtx) - resp, err := l.ProxyApi(&req, w) + resp, err := l.ProxyApi(&req) result.HttpResult(r, w, resp, err) } } diff --git a/api/internal/logic/ai/proxyapilogic.go b/api/internal/logic/ai/proxyapilogic.go index 1e7235b0..adec665f 100644 --- a/api/internal/logic/ai/proxyapilogic.go +++ b/api/internal/logic/ai/proxyapilogic.go @@ -5,8 +5,8 @@ import ( "context" "crypto/tls" "encoding/json" - "fmt" - tool "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "github.com/go-resty/resty/v2" + "github.com/pkg/errors" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/hws" "net/http" @@ -30,21 +30,28 @@ func NewProxyApiLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ProxyApi } } -type ChatResult struct { - Results string `json:"results"` -} +const ( + XProjectID = "d18190e28e3f45a281ef0b0696ec9d52" + XStage = "RELEASE" + ContentType = "application/json" +) -type ResponseData struct { - Results string `json:"results"` -} - -func (l *ProxyApiLogic) ProxyApi(req *types.ChatReq, w http.ResponseWriter) (resp *types.CommonResp, err error) { +func (l *ProxyApiLogic) ProxyApi(req *types.ChatReq) (resp *types.ChatResult, err error) { + logx.Infof("【开始处理请求,目标URL: %s】", req.ApiUrl) jsonBytes, err := json.Marshal(&req.ReqData) - // 调用第三方接口的 POST 方法 - thirdReq, err := http.NewRequest("POST", req.ApiUrl, bytes.NewBuffer(jsonBytes)) if err != nil { - return + logx.Errorf("【序列化请求数据失败: %v】", err) + return nil, errors.New("请求数据序列化失败") + } + + resp = &types.ChatResult{} + + // 构建 HTTP 请求 + request, err := http.NewRequest("POST", req.ApiUrl, bytes.NewBuffer(jsonBytes)) + if err != nil { + logx.Errorf("【构建 HTTP 请求失败: %v】", err) + return nil, errors.New("网络错误,请稍后重试") } signer := &hws.Signer{ @@ -52,38 +59,33 @@ func (l *ProxyApiLogic) ProxyApi(req *types.ChatReq, w http.ResponseWriter) (res Secret: "JWXCE9qcYbc7RjpSRIWt4WgG3ZKF6Q4lPzkJReX9", } - if err := signer.Sign(thirdReq); err != nil { - return nil, err + if err := signer.Sign(request); err != nil { + logx.Errorf("【接口签名错误: %v】", err) + return nil, errors.New("网络错误,请稍后重试") } - // 设置client信任所有证书 - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client := &http.Client{ - Transport: tr, + client := resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + + response, err := client.R(). + SetHeader("X-Project-Id", XProjectID). + SetHeader("x-stage", XStage). + SetHeader("Content-Type", ContentType). + SetHeader("Authorization", request.Header.Get(hws.HeaderXAuthorization)). + SetHeader("X-Sdk-Date", request.Header.Get(hws.HeaderXDateTime)). + SetBody(jsonBytes). + SetResult(&resp). + Post(req.ApiUrl) + + if err != nil { + logx.Errorf("【远程调用接口URL:%s, 返回错误: %s】", req.ApiUrl, err.Error()) + return nil, errors.New("网络错误,请稍后重试") } - thirdReq.Header.Set("X-Project-Id", "d18190e28e3f45a281ef0b0696ec9d52") - thirdReq.Header.Set("x-stage", "RELEASE") - thirdReq.Header.Set("Authorization", thirdReq.Header.Get(hws.HeaderXAuthorization)) - thirdReq.Header.Set("X-Sdk-Date", thirdReq.Header.Get(hws.HeaderXDateTime)) - thirdReq.Header.Set("Content-Type", "application/json") - - thirdResp, err := client.Do(thirdReq) - - defer thirdReq.Body.Close() - var responseData ResponseData - decoder := json.NewDecoder(thirdResp.Body) - if err := decoder.Decode(&responseData); err != nil { - fmt.Println("Error decoding response:", err) + if response.StatusCode() != 200 { + logx.Errorf("【远程调用接口URL:%s, 返回错误: %s】", req.ApiUrl, response.Body()) + return nil, errors.New("网络错误,请稍后重试") } - chatResult := &ChatResult{} - tool.Convert(responseData, &chatResult) - return &types.CommonResp{ - Code: thirdResp.StatusCode, - Msg: "success", - Data: chatResult, - }, nil + logx.Infof("【请求处理成功,目标URL: %s】", req.ApiUrl) + return resp, nil } diff --git a/api/internal/logic/inference/imageinferencelogic.go b/api/internal/logic/inference/imageinferencelogic.go index f10a3c53..e37265ab 100644 --- a/api/internal/logic/inference/imageinferencelogic.go +++ b/api/internal/logic/inference/imageinferencelogic.go @@ -115,7 +115,7 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "create", "任务创建中") - go l.svcCtx.Scheduler.AiService.ImageInfer(opt, id, adapterName, clusters, ts, l.ctx) + go l.svcCtx.Scheduler.AiService.ImageInfer(opt, id, adapterName, clusters, ts) return resp, nil } diff --git a/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go b/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go index 83c15723..ad592dc4 100644 --- a/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go +++ b/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go @@ -1,6 +1,7 @@ package weightDistributing import ( + "errors" "math" ) @@ -10,13 +11,18 @@ type Weight struct { Replica int32 } -func DistributeReplicas(weights []*Weight, replicas int32) { +func DistributeReplicas(weights []*Weight, replicas int32) error { + var weightSum int32 weightSum = 0 for _, w := range weights { weightSum += w.Weight } + if weightSum == 0 { + return errors.New("static weights are empty") + } + weightRatio := make([]float64, len(weights)) for i, w := range weights { weightRatio[i] = float64(w.Weight) / float64(weightSum) @@ -59,9 +65,14 @@ func DistributeReplicas(weights []*Weight, replicas int32) { weightRatio[maxIdx]-- rest-- } else { + if weights[minIdx].Replica == 0 { + weightRatio[minIdx]++ + continue + } weights[minIdx].Replica-- weightRatio[minIdx]++ rest++ } } + return nil } diff --git a/api/internal/scheduler/service/aiService.go b/api/internal/scheduler/service/aiService.go index 1e1d9df0..6d07a8e7 100644 --- a/api/internal/scheduler/service/aiService.go +++ b/api/internal/scheduler/service/aiService.go @@ -1,8 +1,7 @@ package service import ( - "context" - "fmt" + "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/zrpc" hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config" @@ -17,6 +16,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" + "net/http" "strconv" "sync" ) @@ -91,13 +91,14 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st return executorMap, collectorMap } -func (as *AiService) ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*inference.ImageFile, ctx context.Context) { +func (as *AiService) ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*inference.ImageFile) { - res, err := inference.Infer(opt, id, adapterName, clusters, ts, as.AiCollectorAdapterMap, as.Storage, ctx) + r := http.Request{} + _, err := inference.Infer(opt, id, adapterName, clusters, ts, as.AiCollectorAdapterMap, as.Storage, r.Context()) if err != nil { + logx.Errorf(err.Error()) return } - fmt.Println(res) } //func (a *AiService) AddCluster() error { diff --git a/api/internal/scheduler/service/inference/imageInfer.go b/api/internal/scheduler/service/inference/imageInfer.go index 732072b2..2cf1e713 100644 --- a/api/internal/scheduler/service/inference/imageInfer.go +++ b/api/internal/scheduler/service/inference/imageInfer.go @@ -63,18 +63,23 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st } } + var mutex sync.Mutex + errMap := make(map[string]string) for _, cluster := range clusters { wg.Add(1) c := cluster go func() { imageUrls, err := collectorMap[c.ClusterId].GetInferUrl(ctx, opt) - for i, _ := range imageUrls { - imageUrls[i].Url = imageUrls[i].Url + storeLink.FORWARD_SLASH + "image" - } if err != nil { + mutex.Lock() + errMap[c.ClusterId] = err.Error() + mutex.Unlock() wg.Done() return } + for i, _ := range imageUrls { + imageUrls[i].Url = imageUrls[i].Url + storeLink.FORWARD_SLASH + "image" + } clusterName, _ := storage.GetClusterNameById(c.ClusterId) s := struct { @@ -111,6 +116,9 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st for _, t := range aiTaskList { t.Status = constants.Failed t.EndTime = time.Now().Format(time.RFC3339) + if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok { + t.Msg = errMap[strconv.Itoa(int(t.ClusterId))] + } err := storage.UpdateAiTask(t) if err != nil { logx.Errorf(err.Error()) @@ -142,6 +150,9 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { t.Status = constants.Failed t.EndTime = time.Now().Format(time.RFC3339) + if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok { + t.Msg = errMap[strconv.Itoa(int(t.ClusterId))] + } err := storage.UpdateAiTask(t) if err != nil { logx.Errorf(err.Error()) diff --git a/api/internal/scheduler/strategy/staticWeight.go b/api/internal/scheduler/strategy/staticWeight.go index 8b3108e9..f106b608 100644 --- a/api/internal/scheduler/strategy/staticWeight.go +++ b/api/internal/scheduler/strategy/staticWeight.go @@ -35,7 +35,10 @@ func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) { weights = append(weights, weight) } - weightDistributing.DistributeReplicas(weights, s.replicas) + err := weightDistributing.DistributeReplicas(weights, s.replicas) + if err != nil { + return nil, err + } var results []*AssignedCluster for _, weight := range weights { diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 5ab49701..82be107b 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -2903,11 +2903,15 @@ type AiTask struct { } type ChatReq struct { - ApiUrl string `json:"apiUrl,optional"` + ApiUrl string `json:"apiUrl"` Method string `json:"method,optional"` ReqData map[string]interface{} `json:"reqData"` } +type ChatResult struct { + Resuluts string `json:"results,optional"` +} + type StorageScreenReq struct { } diff --git a/pkg/models/taskvmmodel_gen.go b/pkg/models/taskvmmodel_gen.go index 697ffbe9..74f6d4e6 100644 --- a/pkg/models/taskvmmodel_gen.go +++ b/pkg/models/taskvmmodel_gen.go @@ -55,6 +55,7 @@ type ( DeletedAt string `db:"deleted_at"` // 删除时间 VmName string `db:"vm_name"` // 虚拟机名称 Replicas int64 `db:"replicas"` // 副本数 + ServerId string `db:"server_id"` // 虚拟机id } ) @@ -93,14 +94,14 @@ func (m *defaultTaskVmModel) FindOne(ctx context.Context, id int64) (*TaskVm, er } func (m *defaultTaskVmModel) Insert(ctx context.Context, data *TaskVm) (sql.Result, error) { - query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskVmRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.Remark, data.DeletedAt, data.VmName, data.Replicas) + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskVmRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.Remark, data.DeletedAt, data.VmName, data.Replicas, data.ServerId) return ret, err } func (m *defaultTaskVmModel) Update(ctx context.Context, data *TaskVm) error { query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskVmRowsWithPlaceHolder) - _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.Remark, data.DeletedAt, data.VmName, data.Replicas, data.Id) + _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.Remark, data.DeletedAt, data.VmName, data.Replicas, data.ServerId, data.Id) return err }