Former-commit-id: b4e6a91899ea1f44e7b8335940b9a9797ae6c6fe
This commit is contained in:
zhangwei 2024-06-28 09:09:15 +08:00
commit 047b9ff912
13 changed files with 106 additions and 83 deletions

View File

@ -51,8 +51,8 @@ workflow:
input:
docker_username: ((dev.docker_user))
docker_password: ((dev.docker_password))
image_name: '"registry.cn-hangzhou.aliyuncs.com/jcce/pcm-coordinator-api"'
image_tag: git_clone_0.commit_time
image_name: '"registry.cn-hangzhou.aliyuncs.com/jcce/pcm-core-api"'
image_tag: '"latest"'
registry_address: '"registry.cn-hangzhou.aliyuncs.com"'
docker_file: git_clone_0.git_path + '/api/Dockerfile'
docker_build_path: git_clone_0.git_path
@ -65,26 +65,13 @@ workflow:
- ref: end
name: 结束
task: end
needs:
- kubectl_deploy_0
- ref: kubectl_deploy_0
name: kubectl部署资源
task: kubectl_deploy@1.1.0
input:
command: '"apply"'
resource_file_path: git_clone_0.git_path + '/api'
certificate_authority_data: ((dev.k8s_cad))
server: '"https://119.45.100.73:6443"'
client_certificate_data: ((dev.k8s_ccd))
client_key_data: ((dev.k8s_ckd))
hosts: '""'
needs:
- docker_image_build_0
- ref: shell_0
name: shell
image: docker.jianmuhub.com/library/debian:buster-slim
env:
IMAGE_NAME: '"registry.cn-hangzhou.aliyuncs.com/jcce/pcm-coordinator-api"'
IMAGE_NAME: '"registry.cn-hangzhou.aliyuncs.com/jcce/pcm-core-api"'
IMAGE_TAG: git_clone_0.commit_time
SECRET_NAME: global.secret_name
PROJECT_NAME: global.project_name

View File

@ -1,15 +1,15 @@
FROM golang:1.22.4-alpine3.20 AS builder
FROM registry.cn-hangzhou.aliyuncs.com/jcce-images/golang:1.22.4-alpine3.20 AS builder
WORKDIR /app
LABEL stage=gobuilder
ENV CGO_ENABLED 0
ENV GOARCH amd64
ENV GOPROXY https://goproxy.cn,direct
COPY . .
RUN go env -w GO111MODULE=on \
&& go env -w GOPROXY=https://goproxy.cn,direct \
&& go env -w CGO_ENABLED=0
RUN go build -o pcm-coordinator-api /app/api/pcm.go
FROM alpine:3.20
COPY api/etc/ /app/
RUN go mod download && go build -o pcm-coordinator-api /app/api/pcm.go
WORKDIR /app

View File

@ -175,7 +175,7 @@ type VmInfo struct {
//DeletedAt string `json:"deletedAt,omitempty"`
VmName string `json:"vmName,omitempty"`
Replicas int64 `json:"replicas,omitempty"`
//ServerId string `json:"serverId,omitempty"`
ServerId string `json:"serverId,omitempty"`
}
type ResourceStats struct {

View File

@ -1822,8 +1822,11 @@ service AICore-api {
type (
ChatReq{
ApiUrl string `json:"apiUrl,optional"`
ApiUrl string `json:"apiUrl"`
Method string `json:"method,optional"`
ReqData map[string]interface{} `json:"reqData"`
}
ChatResult{
Resuluts string `json:"results,optional"`
}
)

View File

@ -18,7 +18,7 @@ func ProxyApiHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
}
l := ai.NewProxyApiLogic(r.Context(), svcCtx)
resp, err := l.ProxyApi(&req, w)
resp, err := l.ProxyApi(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -5,8 +5,8 @@ import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
tool "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"github.com/go-resty/resty/v2"
"github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/hws"
"net/http"
@ -30,21 +30,28 @@ func NewProxyApiLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ProxyApi
}
}
type ChatResult struct {
Results string `json:"results"`
}
const (
XProjectID = "d18190e28e3f45a281ef0b0696ec9d52"
XStage = "RELEASE"
ContentType = "application/json"
)
type ResponseData struct {
Results string `json:"results"`
}
func (l *ProxyApiLogic) ProxyApi(req *types.ChatReq, w http.ResponseWriter) (resp *types.CommonResp, err error) {
func (l *ProxyApiLogic) ProxyApi(req *types.ChatReq) (resp *types.ChatResult, err error) {
logx.Infof("【开始处理请求目标URL: %s】", req.ApiUrl)
jsonBytes, err := json.Marshal(&req.ReqData)
// 调用第三方接口的 POST 方法
thirdReq, err := http.NewRequest("POST", req.ApiUrl, bytes.NewBuffer(jsonBytes))
if err != nil {
return
logx.Errorf("【序列化请求数据失败: %v】", err)
return nil, errors.New("请求数据序列化失败")
}
resp = &types.ChatResult{}
// 构建 HTTP 请求
request, err := http.NewRequest("POST", req.ApiUrl, bytes.NewBuffer(jsonBytes))
if err != nil {
logx.Errorf("【构建 HTTP 请求失败: %v】", err)
return nil, errors.New("网络错误,请稍后重试")
}
signer := &hws.Signer{
@ -52,38 +59,33 @@ func (l *ProxyApiLogic) ProxyApi(req *types.ChatReq, w http.ResponseWriter) (res
Secret: "JWXCE9qcYbc7RjpSRIWt4WgG3ZKF6Q4lPzkJReX9",
}
if err := signer.Sign(thirdReq); err != nil {
return nil, err
if err := signer.Sign(request); err != nil {
logx.Errorf("【接口签名错误: %v】", err)
return nil, errors.New("网络错误,请稍后重试")
}
// 设置client信任所有证书
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{
Transport: tr,
client := resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
response, err := client.R().
SetHeader("X-Project-Id", XProjectID).
SetHeader("x-stage", XStage).
SetHeader("Content-Type", ContentType).
SetHeader("Authorization", request.Header.Get(hws.HeaderXAuthorization)).
SetHeader("X-Sdk-Date", request.Header.Get(hws.HeaderXDateTime)).
SetBody(jsonBytes).
SetResult(&resp).
Post(req.ApiUrl)
if err != nil {
logx.Errorf("【远程调用接口URL:%s 返回错误: %s】", req.ApiUrl, err.Error())
return nil, errors.New("网络错误,请稍后重试")
}
thirdReq.Header.Set("X-Project-Id", "d18190e28e3f45a281ef0b0696ec9d52")
thirdReq.Header.Set("x-stage", "RELEASE")
thirdReq.Header.Set("Authorization", thirdReq.Header.Get(hws.HeaderXAuthorization))
thirdReq.Header.Set("X-Sdk-Date", thirdReq.Header.Get(hws.HeaderXDateTime))
thirdReq.Header.Set("Content-Type", "application/json")
thirdResp, err := client.Do(thirdReq)
defer thirdReq.Body.Close()
var responseData ResponseData
decoder := json.NewDecoder(thirdResp.Body)
if err := decoder.Decode(&responseData); err != nil {
fmt.Println("Error decoding response:", err)
if response.StatusCode() != 200 {
logx.Errorf("【远程调用接口URL:%s 返回错误: %s】", req.ApiUrl, response.Body())
return nil, errors.New("网络错误,请稍后重试")
}
chatResult := &ChatResult{}
tool.Convert(responseData, &chatResult)
return &types.CommonResp{
Code: thirdResp.StatusCode,
Msg: "success",
Data: chatResult,
}, nil
logx.Infof("【请求处理成功目标URL: %s】", req.ApiUrl)
return resp, nil
}

View File

@ -115,7 +115,7 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere
l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "create", "任务创建中")
go l.svcCtx.Scheduler.AiService.ImageInfer(opt, id, adapterName, clusters, ts, l.ctx)
go l.svcCtx.Scheduler.AiService.ImageInfer(opt, id, adapterName, clusters, ts)
return resp, nil
}

View File

@ -1,6 +1,7 @@
package weightDistributing
import (
"errors"
"math"
)
@ -10,13 +11,18 @@ type Weight struct {
Replica int32
}
func DistributeReplicas(weights []*Weight, replicas int32) {
func DistributeReplicas(weights []*Weight, replicas int32) error {
var weightSum int32
weightSum = 0
for _, w := range weights {
weightSum += w.Weight
}
if weightSum == 0 {
return errors.New("static weights are empty")
}
weightRatio := make([]float64, len(weights))
for i, w := range weights {
weightRatio[i] = float64(w.Weight) / float64(weightSum)
@ -59,9 +65,14 @@ func DistributeReplicas(weights []*Weight, replicas int32) {
weightRatio[maxIdx]--
rest--
} else {
if weights[minIdx].Replica == 0 {
weightRatio[minIdx]++
continue
}
weights[minIdx].Replica--
weightRatio[minIdx]++
rest++
}
}
return nil
}

View File

@ -1,8 +1,7 @@
package service
import (
"context"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/zrpc"
hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config"
@ -17,6 +16,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
"net/http"
"strconv"
"sync"
)
@ -91,13 +91,14 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
return executorMap, collectorMap
}
func (as *AiService) ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*inference.ImageFile, ctx context.Context) {
func (as *AiService) ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*inference.ImageFile) {
res, err := inference.Infer(opt, id, adapterName, clusters, ts, as.AiCollectorAdapterMap, as.Storage, ctx)
r := http.Request{}
_, err := inference.Infer(opt, id, adapterName, clusters, ts, as.AiCollectorAdapterMap, as.Storage, r.Context())
if err != nil {
logx.Errorf(err.Error())
return
}
fmt.Println(res)
}
//func (a *AiService) AddCluster() error {

View File

@ -63,18 +63,23 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st
}
}
var mutex sync.Mutex
errMap := make(map[string]string)
for _, cluster := range clusters {
wg.Add(1)
c := cluster
go func() {
imageUrls, err := collectorMap[c.ClusterId].GetInferUrl(ctx, opt)
for i, _ := range imageUrls {
imageUrls[i].Url = imageUrls[i].Url + storeLink.FORWARD_SLASH + "image"
}
if err != nil {
mutex.Lock()
errMap[c.ClusterId] = err.Error()
mutex.Unlock()
wg.Done()
return
}
for i, _ := range imageUrls {
imageUrls[i].Url = imageUrls[i].Url + storeLink.FORWARD_SLASH + "image"
}
clusterName, _ := storage.GetClusterNameById(c.ClusterId)
s := struct {
@ -111,6 +116,9 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st
for _, t := range aiTaskList {
t.Status = constants.Failed
t.EndTime = time.Now().Format(time.RFC3339)
if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok {
t.Msg = errMap[strconv.Itoa(int(t.ClusterId))]
}
err := storage.UpdateAiTask(t)
if err != nil {
logx.Errorf(err.Error())
@ -142,6 +150,9 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st
if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) {
t.Status = constants.Failed
t.EndTime = time.Now().Format(time.RFC3339)
if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok {
t.Msg = errMap[strconv.Itoa(int(t.ClusterId))]
}
err := storage.UpdateAiTask(t)
if err != nil {
logx.Errorf(err.Error())

View File

@ -35,7 +35,10 @@ func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) {
weights = append(weights, weight)
}
weightDistributing.DistributeReplicas(weights, s.replicas)
err := weightDistributing.DistributeReplicas(weights, s.replicas)
if err != nil {
return nil, err
}
var results []*AssignedCluster
for _, weight := range weights {

View File

@ -2903,11 +2903,15 @@ type AiTask struct {
}
type ChatReq struct {
ApiUrl string `json:"apiUrl,optional"`
ApiUrl string `json:"apiUrl"`
Method string `json:"method,optional"`
ReqData map[string]interface{} `json:"reqData"`
}
type ChatResult struct {
Resuluts string `json:"results,optional"`
}
type StorageScreenReq struct {
}

View File

@ -55,6 +55,7 @@ type (
DeletedAt string `db:"deleted_at"` // 删除时间
VmName string `db:"vm_name"` // 虚拟机名称
Replicas int64 `db:"replicas"` // 副本数
ServerId string `db:"server_id"` // 虚拟机id
}
)
@ -93,14 +94,14 @@ func (m *defaultTaskVmModel) FindOne(ctx context.Context, id int64) (*TaskVm, er
}
func (m *defaultTaskVmModel) Insert(ctx context.Context, data *TaskVm) (sql.Result, error) {
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskVmRowsExpectAutoSet)
ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.Remark, data.DeletedAt, data.VmName, data.Replicas)
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskVmRowsExpectAutoSet)
ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.Remark, data.DeletedAt, data.VmName, data.Replicas, data.ServerId)
return ret, err
}
func (m *defaultTaskVmModel) Update(ctx context.Context, data *TaskVm) error {
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskVmRowsWithPlaceHolder)
_, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.Remark, data.DeletedAt, data.VmName, data.Replicas, data.Id)
_, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.Remark, data.DeletedAt, data.VmName, data.Replicas, data.ServerId, data.Id)
return err
}