Former-commit-id: 19b1b24a6cc9dad88bf59e60e967f307d1e096d3
This commit is contained in:
zhangwei 2024-02-05 11:05:26 +08:00
commit a0c9f23255
77 changed files with 2213 additions and 455 deletions

View File

@ -10,12 +10,6 @@ global:
required: false
type: STRING
hidden: false
- ref: nacos_host
name: nacos_host
value: '"10.206.0.12"'
required: false
type: STRING
hidden: false
- ref: secret_name
name: ""
value: '"jcce-aliyuncs"'
@ -45,8 +39,8 @@ workflow:
name: git clone
task: git_clone@1.2.6
input:
remote_url: '"https://gitlink.org.cn/jcce-pcm/pcm-coordinator.git"'
ref: '"refs/heads/JCOS"'
remote_url: '"https://gitlink.org.cn/JointCloud/pcm-coordinator.git"'
ref: '"refs/heads/master"'
commit_id: '""'
depth: 1
needs:
@ -93,14 +87,12 @@ workflow:
IMAGE_NAME: '"registry.cn-hangzhou.aliyuncs.com/jcce/pcm-coordinator-api"'
IMAGE_TAG: git_clone_0.commit_time
SECRET_NAME: global.secret_name
NACOS_HOST: global.nacos_host
PROJECT_NAME: global.project_name
PROJECT_PATH: git_clone_0.git_path
script:
- cd ${PROJECT_PATH}/api
- sed -i "s#image_name#${IMAGE_NAME}:${IMAGE_TAG}#" ${PROJECT_NAME}.yaml
- sed -i "s#secret_name#${SECRET_NAME}#" ${PROJECT_NAME}.yaml
- sed -i "s#nacos_host#${NACOS_HOST}#" ${PROJECT_NAME}.yaml
- cat ${PROJECT_NAME}.yaml
needs:
- git_clone_0

View File

@ -10,12 +10,6 @@ global:
required: false
type: STRING
hidden: false
- ref: nacos_host
name: nacos_host
value: '"10.206.0.12"'
required: false
type: STRING
hidden: false
- ref: secret_name
name: ""
value: '"jcce-aliyuncs"'
@ -45,8 +39,8 @@ workflow:
name: git clone
task: git_clone@1.2.6
input:
remote_url: '"https://gitlink.org.cn/jcce-pcm/pcm-coordinator.git"'
ref: '"refs/heads/JCOS"'
remote_url: '"https://gitlink.org.cn/JointCloud/pcm-coordinator.git"'
ref: '"refs/heads/master"'
commit_id: '""'
depth: 1
needs:
@ -93,14 +87,12 @@ workflow:
IMAGE_NAME: '"registry.cn-hangzhou.aliyuncs.com/jcce/pcm-coordinator-rpc"'
IMAGE_TAG: git_clone_0.commit_time
SECRET_NAME: global.secret_name
NACOS_HOST: global.nacos_host
PROJECT_NAME: global.project_name
PROJECT_PATH: git_clone_0.git_path
script:
- cd ${PROJECT_PATH}/rpc
- sed -i "s#image_name#${IMAGE_NAME}:${IMAGE_TAG}#" ${PROJECT_NAME}.yaml
- sed -i "s#secret_name#${SECRET_NAME}#" ${PROJECT_NAME}.yaml
- sed -i "s#nacos_host#${NACOS_HOST}#" ${PROJECT_NAME}.yaml
- cat ${PROJECT_NAME}.yaml
needs:
- git_clone_0

View File

@ -673,3 +673,95 @@ type (
Data interface{} `json:"data,omitempty"`
}
)
type (
AdapterReq {
Id string `json:"id,optional" db:"id"`
Name string `json:"name,optional"`
Type string `json:"type,optional"`
Nickname string `json:"nickname,optional"`
Version string `json:"version,optional"`
Server string `json:"server,optional"`
}
AdapterDelReq {
Id string `form:"id,optional" db:"id"`
}
AdapterInfo {
Id string `json:"id,omitempty" db:"id"`
Name string `json:"name,omitempty" db:"name"`
Type string `json:"type,omitempty" db:"type"`
Nickname string `json:"nickname,omitempty" db:"nickname"`
Version string `json:"version,omitempty" db:"version"`
Server string `json:"server,omitempty" db:"server"`
CreateTime string `json:"createTime,omitempty" db:"create_time" gorm:"autoCreateTime"`
}
AdapterResp {
Code int `json:"code,omitempty"`
Msg string `json:"msg,omitempty"`
Data AdapterInfo `json:"data,omitempty"`
}
AdapterListResp {
Code int `json:"code,omitempty"`
Msg string `json:"msg,omitempty"`
Data []AdapterInfo `json:"data,omitempty"`
}
)
type ClusterReq {
Id string `json:"id,optional"`
AdapterId string `json:"adapterId,optional"`
Name string `json:"name,optional"`
Nickname string `json:"nickname,optional"`
Description string `json:"description,optional"`
Server string `json:"server,optional"`
MonitorServer string `json:"monitorServer,optional"`
Username string `json:"username,optional"`
Password string `json:"password,optional"`
Token string `json:"token,optional"`
Ak string `json:"ak,optional"`
Sk string `json:"sk,optional"`
Region string `json:"region,optional"`
ProjectId string `json:"projectId,optional"`
Version string `json:"version,optional"`
Label string `json:"label,optional"`
OwnerId string `json:"ownerId,omitempty,optional"`
AuthType string `json:"authType,optional"`
}
type ClusterDelReq {
Id string `form:"id,optional"`
}
type ClusterInfo {
Id string `json:"id,omitempty" db:"id"`
AdapterId string `json:"adapterId,omitempty" db:"adapter_id"`
Name string `json:"name,omitempty" db:"name"`
Nickname string `json:"nickname,omitempty" db:"nickname"`
Description string `json:"description,omitempty" db:"description"`
Server string `json:"server,omitempty" db:"server"`
MonitorServer string `json:"monitorServer,omitempty" db:"monitor_server"`
Username string `json:"username,omitempty" db:"username"`
Password string `json:"password,omitempty" db:"password"`
Token string `json:"token,omitempty" db:"token"`
Ak string `json:"ak,omitempty" db:"ak"`
Sk string `json:"sk,omitempty" db:"sk"`
Region string `json:"region,omitempty" db:"region"`
ProjectId string `json:"projectId,omitempty" db:"project_id"`
Version string `json:"version,omitempty" db:"version"`
Label string `json:"label,omitempty" db:"label"`
OwnerId string `json:"ownerId,omitempty" db:"owner_id"`
AuthType string `json:"authType,omitempty" db:"auth_type"`
CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
}
type ClusterResp {
Code int `json:"code,omitempty"`
Msg string `json:"msg,omitempty"`
Data ClusterInfo `json:"data,omitempty"`
}
type ClusterListResp {
Code int `json:"code,omitempty"`
Msg string `json:"msg,omitempty"`
Data []ClusterInfo `json:"data,omitempty"`
}

View File

@ -572,3 +572,41 @@ service pcm {
@handler StartAppByAppName
put /apps/startApp (DeleteAppReq) returns (AppResp)
}
// 接口
@server(
prefix: pcm/v1
group : adapters
)
service pcm {
@handler AdaptersListHandler
get /adapter/list (AdapterReq) returns (AdapterListResp)
@handler CreateAdapterHandler
post /adapter/create (AdapterReq) returns (AdapterResp)
@handler UpdateAdapterHandler
put /adapter/update (AdapterReq) returns (AdapterResp)
@handler DeleteAdapterHandler
delete /adapter/delete (AdapterDelReq) returns (AdapterResp)
@handler GetAdapterHandler
get /adapter/get (AdapterDelReq) returns (AdapterResp)
@handler ClusterListHandler
get /adapter/cluster/list (ClusterReq) returns (ClusterListResp)
@handler CreateClusterHandler
post /adapter/cluster/create (ClusterReq) returns (ClusterResp)
@handler UpdateClusterHandler
put /adapter/cluster/update (ClusterReq) returns (ClusterResp)
@handler DeleteClusterHandler
delete /adapter/cluster/delete (ClusterDelReq) returns (ClusterResp)
@handler GetClusterHandler
get /adapter/cluster/get (ClusterDelReq) returns (ClusterResp)
}

View File

@ -5,7 +5,7 @@ Port: 8999
Timeout: 50000
DB:
DataSource: root:uJpLd6u-J?HC1@(119.45.100.73:3306)/pcm?parseTime=true&loc=Local
DataSource: root:uJpLd6u-J?HC1@(10.206.0.12:3306)/pcm?parseTime=true&loc=Local
Redis:
Host: 10.206.0.12:6379
@ -19,7 +19,7 @@ Cache:
K8sNativeConf:
# target: nacos://10.206.0.12:8848/pcm.kubenative.rpc?timeout=30s&namespaceid=test&groupname=DEFAULT_GROUP&appName=pcm.core.api
Endpoints:
- pcm-participant-kubernetes-service:2003
- 127.0.0.1:2003
NonBlock: true
#rpc
@ -53,9 +53,9 @@ ACRpcConf:
#rpc
CephRpcConf:
target: nacos://10.206.0.12:8848/pcm.ceph.rpc?timeout=30s&namespaceid=test&groupname=DEFAULT_GROUP&appName=pcm.core.api
# Endpoints:
# - 127.0.0.1:8888
# target: nacos://10.206.0.12:8848/pcm.ceph.rpc?timeout=30s&namespaceid=test&groupname=DEFAULT_GROUP&appName=pcm.core.api
Endpoints:
- pcm-participant-ceph-service:2008
NonBlock: true
Timeout: 50000

View File

@ -0,0 +1,24 @@
package adapters
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/adapters"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"net/http"
)
func AdaptersListHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.AdapterReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := adapters.NewAdaptersListLogic(r.Context(), svcCtx)
resp, err := l.AdaptersList(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -0,0 +1,24 @@
package adapters
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/adapters"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"net/http"
)
func ClusterListHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.ClusterReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := adapters.NewClusterListLogic(r.Context(), svcCtx)
resp, err := l.ClusterList(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -0,0 +1,24 @@
package adapters
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/adapters"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"net/http"
)
func CreateAdapterHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.AdapterReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := adapters.NewCreateAdapterLogic(r.Context(), svcCtx)
resp, err := l.CreateAdapter(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -0,0 +1,24 @@
package adapters
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/adapters"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"net/http"
)
func CreateClusterHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.ClusterReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := adapters.NewCreateClusterLogic(r.Context(), svcCtx)
resp, err := l.CreateCluster(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -0,0 +1,24 @@
package adapters
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/adapters"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"net/http"
)
func DeleteAdapterHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.AdapterDelReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := adapters.NewDeleteAdapterLogic(r.Context(), svcCtx)
resp, err := l.DeleteAdapter(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -0,0 +1,24 @@
package adapters
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/adapters"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"net/http"
)
func DeleteClusterHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.ClusterDelReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := adapters.NewDeleteClusterLogic(r.Context(), svcCtx)
resp, err := l.DeleteCluster(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -0,0 +1,24 @@
package adapters
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/adapters"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"net/http"
)
func GetAdapterHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.AdapterDelReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := adapters.NewGetAdapterLogic(r.Context(), svcCtx)
resp, err := l.GetAdapter(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -0,0 +1,24 @@
package adapters
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/adapters"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"net/http"
)
func GetClusterHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.ClusterDelReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := adapters.NewGetClusterLogic(r.Context(), svcCtx)
resp, err := l.GetCluster(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -0,0 +1,24 @@
package adapters
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/adapters"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"net/http"
)
func UpdateAdapterHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.AdapterReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := adapters.NewUpdateAdapterLogic(r.Context(), svcCtx)
resp, err := l.UpdateAdapter(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -0,0 +1,24 @@
package adapters
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/adapters"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"net/http"
)
func UpdateClusterHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.ClusterReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := adapters.NewUpdateClusterLogic(r.Context(), svcCtx)
resp, err := l.UpdateCluster(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -4,6 +4,7 @@ package handler
import (
"net/http"
adapters "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/handler/adapters"
ai "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/handler/ai"
apps "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/handler/apps"
cloud "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/handler/cloud"
@ -692,4 +693,60 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
},
rest.WithPrefix("/pcm/v1"),
)
server.AddRoutes(
[]rest.Route{
{
Method: http.MethodGet,
Path: "/adapter/list",
Handler: adapters.AdaptersListHandler(serverCtx),
},
{
Method: http.MethodPost,
Path: "/adapter/create",
Handler: adapters.CreateAdapterHandler(serverCtx),
},
{
Method: http.MethodPut,
Path: "/adapter/update",
Handler: adapters.UpdateAdapterHandler(serverCtx),
},
{
Method: http.MethodDelete,
Path: "/adapter/delete",
Handler: adapters.DeleteAdapterHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/adapter/get",
Handler: adapters.GetAdapterHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/adapter/cluster/list",
Handler: adapters.ClusterListHandler(serverCtx),
},
{
Method: http.MethodPost,
Path: "/adapter/cluster/create",
Handler: adapters.CreateClusterHandler(serverCtx),
},
{
Method: http.MethodPut,
Path: "/adapter/cluster/update",
Handler: adapters.UpdateClusterHandler(serverCtx),
},
{
Method: http.MethodDelete,
Path: "/adapter/cluster/delete",
Handler: adapters.DeleteClusterHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/adapter/cluster/get",
Handler: adapters.GetClusterHandler(serverCtx),
},
},
rest.WithPrefix("/pcm/v1"),
)
}

View File

@ -0,0 +1,34 @@
package adapters
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type AdaptersListLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewAdaptersListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *AdaptersListLogic {
return &AdaptersListLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *AdaptersListLogic) AdaptersList(req *types.AdapterReq) (resp *types.AdapterListResp, err error) {
resp = &types.AdapterListResp{}
tx := l.svcCtx.DbEngin.Raw("select * from t_adapter where `deleted_at` IS NULL ORDER BY create_time Desc").Scan(&resp.Data)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return nil, tx.Error
}
return resp, nil
}

View File

@ -0,0 +1,34 @@
package adapters
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type ClusterListLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewClusterListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ClusterListLogic {
return &ClusterListLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *ClusterListLogic) ClusterList(req *types.ClusterReq) (resp *types.ClusterListResp, err error) {
resp = &types.ClusterListResp{}
tx := l.svcCtx.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL ORDER BY create_time Desc").Scan(&resp.Data)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return nil, tx.Error
}
return resp, nil
}

View File

@ -0,0 +1,35 @@
package adapters
import (
"context"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"time"
)
type CreateAdapterLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewCreateAdapterLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateAdapterLogic {
return &CreateAdapterLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *CreateAdapterLogic) CreateAdapter(req *types.AdapterReq) (resp *types.AdapterResp, err error) {
adapter := types.AdapterInfo{}
utils.Convert(req, &adapter)
adapter.Id = utils.GenSnowflakeIDStr()
adapter.CreateTime = time.Now().Format("2006-01-02 15:04:05")
tx := l.svcCtx.DbEngin.Table("t_adapter").Create(&adapter)
fmt.Print(tx.Error)
return
}

View File

@ -0,0 +1,41 @@
package adapters
import (
"context"
"errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"time"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type CreateClusterLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewCreateClusterLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateClusterLogic {
return &CreateClusterLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *CreateClusterLogic) CreateCluster(req *types.ClusterReq) (resp *types.ClusterResp, err error) {
cluster := types.ClusterInfo{}
utils.Convert(req, &cluster)
cluster.Id = utils.GenSnowflakeIDStr()
cluster.CreateTime = time.Now().Format("2006-01-02 15:04:05")
cluster.OwnerId = "0"
tx := l.svcCtx.DbEngin.Table("t_cluster").Create(&cluster)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return nil, errors.New("cluster create failed")
}
return
}

View File

@ -0,0 +1,44 @@
package adapters
import (
"context"
"github.com/pkg/errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type DeleteAdapterLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewDeleteAdapterLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DeleteAdapterLogic {
return &DeleteAdapterLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *DeleteAdapterLogic) DeleteAdapter(req *types.AdapterDelReq) (resp *types.AdapterResp, err error) {
var sId int64
l.svcCtx.DbEngin.Table("t_adapter").Raw("select a.id from t_cluster c left join t_adapter a on c.adapter_id=a.id where a.id = ? ", req.Id).Scan(&sId)
if sId != 0 {
return nil, errors.New("Delete failed,The adapter is associated with a cluster")
}
db := l.svcCtx.DbEngin.Table("t_adapter").Where("id = ?", req.Id).First(&types.AdapterInfo{})
if db.Error != nil {
logx.Errorf("err %v", db.Error.Error())
return nil, errors.New("Adapter does not exist")
}
tx := l.svcCtx.DbEngin.Table("t_adapter").Delete(types.AdapterInfo{}, req.Id)
if tx.Error != nil {
logx.Errorf("err %v", db.Error.Error())
return nil, errors.New("Delete adapter failed")
}
return
}

View File

@ -0,0 +1,39 @@
package adapters
import (
"context"
"github.com/pkg/errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type DeleteClusterLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewDeleteClusterLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DeleteClusterLogic {
return &DeleteClusterLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *DeleteClusterLogic) DeleteCluster(req *types.ClusterDelReq) (resp *types.ClusterResp, err error) {
db := l.svcCtx.DbEngin.Table("t_cluster").Where("id = ?", req.Id).First(&types.ClusterInfo{})
if db.Error != nil {
logx.Errorf("err %v", db.Error.Error())
return nil, errors.New("Cluster does not exist")
}
tx := l.svcCtx.DbEngin.Table("t_cluster").Delete(types.AdapterInfo{}, req.Id)
if tx.Error != nil {
logx.Errorf("err %v", db.Error.Error())
return nil, errors.New("Delete Cluster failed")
}
return
}

View File

@ -0,0 +1,35 @@
package adapters
import (
"context"
"github.com/pkg/errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetAdapterLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetAdapterLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetAdapterLogic {
return &GetAdapterLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetAdapterLogic) GetAdapter(req *types.AdapterDelReq) (resp *types.AdapterResp, err error) {
resp = &types.AdapterResp{}
db := l.svcCtx.DbEngin.Table("t_adapter").Where("id = ?", req.Id).First(&resp.Data)
if db.Error != nil {
logx.Errorf("err %v", db.Error.Error())
return nil, errors.New("Adapter does not exist")
}
return
}

View File

@ -0,0 +1,36 @@
package adapters
import (
"context"
"github.com/pkg/errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetClusterLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetClusterLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetClusterLogic {
return &GetClusterLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetClusterLogic) GetCluster(req *types.ClusterDelReq) (resp *types.ClusterResp, err error) {
resp = &types.ClusterResp{}
db := l.svcCtx.DbEngin.Table("t_cluster").Where("id = ?", req.Id).First(&resp.Data)
if db.Error != nil {
logx.Errorf("err %v", db.Error.Error())
return nil, errors.New("Adapter does not exist")
}
return
}

View File

@ -0,0 +1,30 @@
package adapters
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type UpdateAdapterLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewUpdateAdapterLogic(ctx context.Context, svcCtx *svc.ServiceContext) *UpdateAdapterLogic {
return &UpdateAdapterLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *UpdateAdapterLogic) UpdateAdapter(req *types.AdapterReq) (resp *types.AdapterResp, err error) {
// todo: add your logic here and delete this line
return
}

View File

@ -0,0 +1,30 @@
package adapters
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type UpdateClusterLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewUpdateClusterLogic(ctx context.Context, svcCtx *svc.ServiceContext) *UpdateClusterLogic {
return &UpdateClusterLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *UpdateClusterLogic) UpdateCluster(req *types.ClusterReq) (resp *types.ClusterResp, err error) {
// todo: add your logic here and delete this line
return
}

View File

@ -57,9 +57,18 @@ func (l *DeleteLinkImageLogic) DeleteLinkImage(req *types.DeleteLinkImageReq) (r
return nil, err
}
if img == nil {
return nil, nil
return nil, storeLink.ERROR_RESP_EMPTY
}
imgResp := img.(types.DeleteLinkImageResp)
return &imgResp, nil
resp = &types.DeleteLinkImageResp{}
//转换成统一返回类型
imgResp, err := storeLink.ConvertType(img, resp, participant)
if err != nil {
return nil, err
}
if imgResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return imgResp.(*types.DeleteLinkImageResp), nil
}

View File

@ -57,9 +57,18 @@ func (l *DeleteLinkTaskLogic) DeleteLinkTask(req *types.DeleteLinkTaskReq) (resp
return nil, err
}
if task == nil {
return nil, nil
return nil, storeLink.ERROR_RESP_EMPTY
}
taskResp := task.(types.DeleteLinkTaskResp)
return &taskResp, nil
resp = &types.DeleteLinkTaskResp{}
//转换成统一返回类型
taskResp, err := storeLink.ConvertType(task, resp, participant)
if err != nil {
return nil, err
}
if taskResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return taskResp.(*types.DeleteLinkTaskResp), nil
}

View File

@ -57,9 +57,18 @@ func (l *GetAISpecsLogic) GetAISpecs(req *types.GetResourceSpecsReq) (resp *type
return nil, err
}
if specs == nil {
return nil, nil
return nil, storeLink.ERROR_RESP_EMPTY
}
specsResp := specs.(types.GetResourceSpecsResp)
return &specsResp, nil
resp = &types.GetResourceSpecsResp{}
//转换成统一返回类型
specsResp, err := storeLink.ConvertType(specs, resp, participant)
if err != nil {
return nil, err
}
if specsResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return specsResp.(*types.GetResourceSpecsResp), nil
}

View File

@ -57,9 +57,18 @@ func (l *GetLinkImageListLogic) GetLinkImageList(req *types.GetLinkImageListReq)
return nil, err
}
if list == nil {
return nil, nil
return nil, storeLink.ERROR_RESP_EMPTY
}
listResp := list.(types.GetLinkImageListResp)
return &listResp, nil
resp = &types.GetLinkImageListResp{}
//转换成统一返回类型
listResp, err := storeLink.ConvertType(list, resp, participant)
if err != nil {
return nil, err
}
if listResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return listResp.(*types.GetLinkImageListResp), nil
}

View File

@ -58,9 +58,18 @@ func (l *GetLinkTaskLogic) GetLinkTask(req *types.GetLinkTaskReq) (resp *types.G
return nil, err
}
if task == nil {
return nil, nil
return nil, storeLink.ERROR_RESP_EMPTY
}
taskResp := task.(types.GetLinkTaskResp)
return &taskResp, nil
resp = &types.GetLinkTaskResp{}
//转换成统一返回类型
taskResp, err := storeLink.ConvertType(task, resp, participant)
if err != nil {
return nil, err
}
if taskResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return taskResp.(*types.GetLinkTaskResp), nil
}

View File

@ -67,11 +67,20 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp
envs = append(envs, env)
}
}
task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId)
task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "")
if err != nil {
return nil, err
}
taskResp := task.(types.SubmitLinkTaskResp)
return &taskResp, nil
resp = &types.SubmitLinkTaskResp{}
//转换成统一返回类型
taskResp, err := storeLink.ConvertType(task, resp, participant)
if err != nil {
return nil, err
}
if taskResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return taskResp.(*types.SubmitLinkTaskResp), nil
}

View File

@ -58,9 +58,18 @@ func (l *UploadLinkImageLogic) UploadLinkImage(req *types.UploadLinkImageReq) (r
return nil, err
}
if img == nil {
return nil, nil
return nil, storeLink.ERROR_RESP_EMPTY
}
imgResp := img.(types.UploadLinkImageResp)
return &imgResp, nil
resp = &types.UploadLinkImageResp{}
//转换成统一返回类型
imgResp, err := storeLink.ConvertType(img, resp, participant)
if err != nil {
return nil, err
}
if imgResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return imgResp.(*types.UploadLinkImageResp), nil
}

View File

@ -16,8 +16,10 @@ package mqs
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
scheduler2 "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler"
)
/*
@ -25,36 +27,29 @@ import (
Listening to the payment flow status change notification message queue
*/
type AiQueue struct {
ctx context.Context
svcCtx *svc.ServiceContext
ctx context.Context
svcCtx *svc.ServiceContext
scheduler *scheduler.Scheduler
}
func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue {
aiExecutorMap, aiCollectorMap := service.InitAiClusterMap(ctx, svcCtx)
return &AiQueue{
ctx: ctx,
svcCtx: svcCtx,
ctx: ctx,
svcCtx: svcCtx,
scheduler: scheduler.NewScheduler2(aiCollectorMap, nil, aiExecutorMap),
}
}
func (l *AiQueue) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
aiSchdl := scheduler2.NewAiScheduler(val)
schdl, err := scheduler2.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin, nil)
if err != nil {
return err
}
schdl.MatchLabels()
aiSchdl, _ := schedulers.NewAiScheduler(val, l.scheduler)
// 调度算法
err = schdl.AssignAndSchedule()
err := l.scheduler.AssignAndSchedule(aiSchdl)
if err != nil {
return err
}
// 存储数据
err = schdl.SaveToDb()
if err != nil {
return err
}
return nil
}

View File

@ -16,8 +16,9 @@ package mqs
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler"
)
/*
@ -37,7 +38,7 @@ func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq {
func (l *CloudMq) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
cloudScheduler := scheduler.NewCloudScheduler()
cloudScheduler := schedulers.NewCloudScheduler()
schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc)
if err != nil {
return err

View File

@ -17,7 +17,6 @@ package mqs
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
scheduler2 "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler"
)
/*
@ -37,24 +36,5 @@ func NewHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *HpcMq {
}
func (l *HpcMq) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
hpcSchdl := scheduler2.NewHpcScheduler(val)
schdl, err := scheduler2.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin, nil)
if err != nil {
return err
}
schdl.MatchLabels()
// 调度算法
err = schdl.AssignAndSchedule()
if err != nil {
return err
}
// 存储数据
err = schdl.SaveToDb()
if err != nil {
return err
}
return nil
}

View File

@ -12,22 +12,23 @@
*/
package scheduler
package common
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies"
"math/rand"
"time"
)
type scheduleService interface {
getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error)
pickOptimalStrategy() (strategies.Strategy, error)
type SubSchedule interface {
GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error)
PickOptimalStrategy() (strategy.Strategy, error)
AssignTask(clusters []*strategy.AssignedCluster) error
}
// 求交集
func intersect(slice1, slice2 []int64) []int64 {
func Intersect(slice1, slice2 []int64) []int64 {
m := make(map[int64]int)
nn := make([]int64, 0)
for _, v := range slice1 {
@ -43,7 +44,7 @@ func intersect(slice1, slice2 []int64) []int64 {
return nn
}
func micsSlice(origin []int64, count int) []int64 {
func MicsSlice(origin []int64, count int) []int64 {
tmpOrigin := make([]int64, len(origin))
copy(tmpOrigin, origin)
//一定要seed

View File

@ -1,6 +1,9 @@
package database
import "gorm.io/gorm"
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gorm.io/gorm"
)
type CloudStorage struct {
dbEngin *gorm.DB
@ -10,8 +13,8 @@ func NewCloudStorage(dbEngin *gorm.DB) *CloudStorage {
return &CloudStorage{dbEngin: dbEngin}
}
func (c *CloudStorage) GetProviderParams() ([]providerParams, error) {
var proParams []providerParams
func (c *CloudStorage) GetProviderParams() ([]entity.ProviderParams, error) {
var proParams []entity.ProviderParams
sqlstr := "SELECT SUM(a.disk_avail) as disk_avail,SUM(a.mem_avail) as mem_avail,SUM(a.cpu_total * a.cpu_usable) as cpu_avail,participant_id from (SELECT * from sc_node_avail_info where created_time in (SELECT MAX(created_time) as time from sc_node_avail_info where deleted_flag = 0 GROUP BY participant_id,node_name)) a GROUP BY a.participant_id"
c.dbEngin.Raw(sqlstr).Scan(&proParams)
if len(proParams) == 0 {
@ -20,9 +23,12 @@ func (c *CloudStorage) GetProviderParams() ([]providerParams, error) {
return proParams, nil
}
type providerParams struct {
Disk_avail float64
Mem_avail float64
Cpu_avail float64
Participant_id int64
func (c *CloudStorage) FindAvailableParticipants() ([]entity.Participant, error) {
var parts []entity.Participant
sqlstr := "select id as participant_id, name as name from sc_participant_phy_info"
c.dbEngin.Raw(sqlstr).Scan(&parts)
if len(parts) == 0 {
return nil, nil
}
return parts, nil
}

View File

@ -0,0 +1,13 @@
package database
import "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
type Storage interface {
GetProviderParams() ([]entity.ProviderParams, error)
FindAvailableParticipants() ([]entity.Participant, error)
}
type NSIDSpecified interface {
Storage
NSID() error
}

View File

@ -0,0 +1,19 @@
package entity
type ProviderParams struct {
Disk_avail float64
Mem_avail float64
Cpu_avail float64
Participant_id int64
}
type Participant struct {
Name string
Participant_id int64
}
type WeightP struct {
Participant_id int64
Weight int32
Name string
}

View File

@ -18,32 +18,45 @@ import (
"encoding/json"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/common"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/database"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
"gorm.io/gorm"
"sigs.k8s.io/yaml"
"strings"
"sync"
)
type scheduler struct {
task *response.TaskInfo
participantIds []int64
scheduleService scheduleService
dbEngin *gorm.DB
result []string //pID:子任务yamlstring 键值对
participantRpc participantservice.ParticipantService
type Scheduler struct {
task *response.TaskInfo
participantIds []int64
subSchedule common.SubSchedule
dbEngin *gorm.DB
result []string //pID:子任务yamlstring 键值对
participantRpc participantservice.ParticipantService
ResourceCollector *map[string]collector.ResourceCollector
Storages database.Storage
AiExecutor *map[string]executor.AiExecutor
mu sync.RWMutex
}
func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*scheduler, error) {
func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) {
var task *response.TaskInfo
err := json.Unmarshal([]byte(val), &task)
if err != nil {
return nil, errors.New("create scheduler failed : " + err.Error())
}
return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil
return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil
}
func (s *scheduler) SpecifyClusters() {
func NewScheduler2(resourceCollector *map[string]collector.ResourceCollector, storages database.Storage, aiExecutor *map[string]executor.AiExecutor) *Scheduler {
return &Scheduler{ResourceCollector: resourceCollector, Storages: storages, AiExecutor: aiExecutor}
}
func (s *Scheduler) SpecifyClusters() {
// 如果已指定集群名通过数据库查询后返回p端ip列表
if len(s.task.Clusters) != 0 {
s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", s.task.Clusters).Scan(&s.participantIds)
@ -51,7 +64,7 @@ func (s *scheduler) SpecifyClusters() {
}
}
func (s *scheduler) SpecifyNsID() {
func (s *Scheduler) SpecifyNsID() {
// 未指定集群名只指定nsID
if len(s.task.Clusters) == 0 {
if len(s.task.NsID) != 0 {
@ -67,7 +80,7 @@ func (s *scheduler) SpecifyNsID() {
}
}
func (s *scheduler) MatchLabels() {
func (s *Scheduler) MatchLabels() {
var ids []int64
count := 0
@ -80,7 +93,7 @@ func (s *scheduler) MatchLabels() {
if count == 0 {
ids = participantIds
}
ids = intersect(ids, participantIds)
ids = common.Intersect(ids, participantIds)
count++
}
s.participantIds = ids
@ -90,7 +103,7 @@ func (s *scheduler) MatchLabels() {
}
// TempAssign todo 屏蔽原调度算法
func (s *scheduler) TempAssign() error {
func (s *Scheduler) TempAssign() error {
//需要判断task中的资源类型针对metadata中的多个kind做不同处理
//输入副本数和集群列表最终结果输出为pID对应副本数量列表针对多个kind需要做拆分和重新拼接组合
@ -110,33 +123,33 @@ func (s *scheduler) TempAssign() error {
return nil
}
func (s *scheduler) AssignAndSchedule() error {
// 已指定 ParticipantId
if s.task.ParticipantId != 0 {
return nil
}
// 标签匹配以及后未找到ParticipantIds
if len(s.participantIds) == 0 {
return errors.New("未找到匹配的ParticipantIds")
}
func (s *Scheduler) AssignAndSchedule(ss common.SubSchedule) error {
//// 已指定 ParticipantId
//if s.task.ParticipantId != 0 {
// return nil
//}
//// 标签匹配以及后未找到ParticipantIds
//if len(s.participantIds) == 0 {
// return errors.New("未找到匹配的ParticipantIds")
//}
//
//// 指定或者标签匹配的结果只有一个集群,给任务信息指定
//if len(s.participantIds) == 1 {
// s.task.ParticipantId = s.participantIds[0]
// //replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
// //result := make(map[int64]string)
// //result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64)
// //s.result = result
//
// return nil
//}
// 指定或者标签匹配的结果只有一个集群,给任务信息指定
if len(s.participantIds) == 1 {
s.task.ParticipantId = s.participantIds[0]
//replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
//result := make(map[int64]string)
//result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64)
//s.result = result
return nil
}
strategy, err := s.scheduleService.pickOptimalStrategy()
strategy, err := ss.PickOptimalStrategy()
if err != nil {
return err
}
_, err = strategy.Schedule()
clusters, err := strategy.Schedule()
if err != nil {
return err
}
@ -147,15 +160,20 @@ func (s *scheduler) AssignAndSchedule() error {
// return nil
//}
err = ss.AssignTask(clusters)
if err != nil {
return err
}
return nil
}
func (s *scheduler) SaveToDb() error {
func (s *Scheduler) SaveToDb() error {
for _, participantId := range s.participantIds {
for _, resource := range s.task.Metadata {
structForDb, err := s.scheduleService.getNewStructForDb(s.task, resource, participantId)
structForDb, err := s.subSchedule.GetNewStructForDb(s.task, resource, participantId)
if err != nil {
return err
}

View File

@ -0,0 +1,100 @@
/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package schedulers
import (
"errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
)
type AiScheduler struct {
yamlString string
task *response.TaskInfo
*scheduler.Scheduler
option *option.AiOption
}
func NewAiScheduler(val string, scheduler *scheduler.Scheduler) (*AiScheduler, error) {
return &AiScheduler{yamlString: val, Scheduler: scheduler}, nil
}
func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
ai := models.Ai{
ParticipantId: participantId,
TaskId: task.TaskId,
Status: "Saved",
YamlString: as.yamlString,
}
utils.Convert(task.Metadata, &ai)
return ai, nil
}
func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
resources, err := as.findClustersWithResources()
if err != nil {
return nil, err
}
if len(resources) == 0 {
return nil, errors.New("no cluster has resources")
}
params := &param.Params{Resources: resources}
if len(resources) < 2 /*|| as.task */ {
strategy := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params /*, Replicas: 1*/})
return strategy, nil
}
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params})
return strategy, nil
}
func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
if clusters == nil {
return errors.New("clusters is nil")
}
executorMap := *as.AiExecutor
for _, cluster := range clusters {
_, err := executorMap[cluster.Name].Execute(as.option)
if err != nil {
// TODO: database operation
}
// TODO: database operation
}
return nil
}
func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceSpecs, error) {
var resourceSpecs []*collector.ResourceSpecs
for _, resourceCollector := range *as.ResourceCollector {
spec, err := resourceCollector.GetResourceSpecs()
if err != nil {
continue
}
resourceSpecs = append(resourceSpecs, spec)
}
if len(resourceSpecs) == 0 {
return nil, errors.New("no resource found")
}
return resourceSpecs, nil
}

View File

@ -12,15 +12,16 @@
*/
package scheduler
package schedulers
import (
"bytes"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/database"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"io"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -37,17 +38,14 @@ func NewCloudScheduler() *CloudScheduler {
return &CloudScheduler{}
}
func (cs *CloudScheduler) pickOptimalStrategy() (strategies.Strategy, error) {
task, providerList, err := cs.genTaskAndProviders()
if err != nil {
return nil, nil
}
func (cs *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
//获取所有计算中心
//调度算法
strategy := strategies.NewPricingStrategy(task, providerList...)
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{})
return strategy, nil
}
func (cs *CloudScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
func (cs *CloudScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
cloud := cs.UnMarshalK8sStruct(resource, task.TaskId, task.NsID)
cloud.Id = utils.GenSnowflakeID()
cloud.NsID = task.NsID
@ -116,3 +114,7 @@ func (cs *CloudScheduler) genTaskAndProviders() (*providerPricing.Task, []*provi
return nil, providerList, nil
}
func (cs *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return nil
}

View File

@ -12,14 +12,14 @@
*/
package scheduler
package schedulers
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
)
@ -31,7 +31,7 @@ func NewHpcScheduler(val string) *HpcScheduler {
return &HpcScheduler{yamlString: val}
}
func (h *HpcScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
func (h *HpcScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
hpc := models.Hpc{}
utils.Convert(task.Metadata, &hpc)
hpc.Id = utils.GenSnowflakeID()
@ -42,10 +42,14 @@ func (h *HpcScheduler) getNewStructForDb(task *response.TaskInfo, resource strin
return hpc, nil
}
func (h *HpcScheduler) pickOptimalStrategy() (strategies.Strategy, error) {
func (h *HpcScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
return nil, nil
}
func (h *HpcScheduler) genTaskAndProviders(task *response.TaskInfo) (*providerPricing.Task, []*providerPricing.Provider) {
return nil, nil
}
func (h *HpcScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return nil
}

View File

@ -0,0 +1,20 @@
package option
type AiOption struct {
AiType string // shuguangAi/octopus
ResourceType string // cpu/gpu/compute card
TaskType string // pytorch/tensorflow
ImageId string
SpecId string
DatasetsId string
CodeId string
ResourceId string
Cmd string
Envs []string
Params []string
Datasets string
Code string
}

View File

@ -0,0 +1,24 @@
package schedulers
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
)
type VmScheduler struct {
}
func (v VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
//TODO implement me
panic("implement me")
}
func (v VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
//TODO implement me
panic("implement me")
}
func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
//TODO implement me
panic("implement me")
}

View File

@ -0,0 +1,47 @@
package service
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
)
const (
OCTOPUS = "Octopus"
MODELARTS = "Modelarts"
SHUGUANGAI = "ShuguangAi"
)
var (
AiTypeMap = map[string]string{
"Hanwuji": OCTOPUS,
"Suiyan": OCTOPUS,
"Sailingsi": OCTOPUS,
"Modelarts-CloudBrain2": MODELARTS,
"ShuguangAi": SHUGUANGAI,
}
)
func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.AiExecutor, *map[string]collector.ResourceCollector) {
executorMap := make(map[string]executor.AiExecutor)
collectorMap := make(map[string]collector.ResourceCollector)
for k, v := range AiTypeMap {
switch v {
case OCTOPUS:
octopus := storeLink.NewOctopusLink(ctx, svcCtx, k, 0)
collectorMap[k] = octopus
executorMap[k] = octopus
case MODELARTS:
modelarts := storeLink.NewModelArtsLink(ctx, svcCtx, k, 0)
collectorMap[k] = modelarts
executorMap[k] = modelarts
case SHUGUANGAI:
sgai := storeLink.NewShuguangAi(ctx, svcCtx, k, 0)
collectorMap[k] = sgai
executorMap[k] = sgai
}
}
return &executorMap, &collectorMap
}

View File

@ -0,0 +1,22 @@
package collector
type ResourceCollector interface {
GetResourceSpecs() (*ResourceSpecs, error)
}
type ResourceSpecs struct {
ParticipantId int64
Name string
CpuAvail float64
MemAvail float64
DiskAvail float64
GpuAvail float64
CardAvail []Card
Balance float64
}
type Card struct {
Type string
Name string
TOpsAtFp16 float64
}

View File

@ -0,0 +1,11 @@
package executor
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
)
type AiExecutor interface {
Execute(option *option.AiOption) (interface{}, error)
storeLink.Linkage
}

View File

@ -0,0 +1,8 @@
package strategy
type DynamicResourcesStrategy struct {
}
func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
return nil, nil
}

View File

@ -0,0 +1,9 @@
package param
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
)
type Params struct {
Resources []*collector.ResourceSpecs
}

View File

@ -0,0 +1,23 @@
package param
import "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
type ReplicationParams struct {
Replicas int32
*Params
}
func (r *ReplicationParams) GetReplicas() int32 {
return r.Replicas
}
func (r *ReplicationParams) GetParticipants() []*entity.Participant {
var participants []*entity.Participant
for _, resource := range r.Resources {
participants = append(participants, &entity.Participant{
Participant_id: resource.ParticipantId,
Name: resource.Name,
})
}
return participants
}

View File

@ -0,0 +1,32 @@
package param
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
)
type ResourcePricingParams struct {
replicas int32
task *providerPricing.Task
*Params
}
func (r *ResourcePricingParams) GetReplicas() int32 {
return r.replicas
}
func (r *ResourcePricingParams) GetTask() *providerPricing.Task {
return r.task
}
func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider {
var providerList []*providerPricing.Provider
for _, resource := range r.Resources {
provider := providerPricing.NewProvider(
resource.ParticipantId,
resource.CpuAvail,
resource.MemAvail,
resource.DiskAvail, 0.0, 0.0, 0.0)
providerList = append(providerList, provider)
}
return providerList
}

View File

@ -0,0 +1,35 @@
package strategy
import (
"github.com/pkg/errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param"
)
type ReplicationStrategy struct {
replicas int32
participants []*entity.Participant
}
func NewReplicationStrategy(params *param.ReplicationParams) *ReplicationStrategy {
return &ReplicationStrategy{replicas: params.GetReplicas(),
participants: params.GetParticipants(),
}
}
func (ps *ReplicationStrategy) Schedule() ([]*AssignedCluster, error) {
if ps.replicas < 1 {
return nil, errors.New("replicas must be greater than 0")
}
if ps.participants == nil {
return nil, errors.New("participantId must be set")
}
var results []*AssignedCluster
for _, p := range ps.participants {
cluster := &AssignedCluster{ParticipantId: p.Participant_id, Name: p.Name, Replicas: ps.replicas}
results = append(results, cluster)
}
return results, nil
}

View File

@ -12,11 +12,12 @@
*/
package strategies
package strategy
import (
"errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param"
)
type PricingStrategy struct {
@ -25,7 +26,9 @@ type PricingStrategy struct {
StrategyList []*providerPricing.Strategy
}
func NewPricingStrategy(task *providerPricing.Task, providers ...*providerPricing.Provider) *PricingStrategy {
func NewPricingStrategy(params *param.ResourcePricingParams) *PricingStrategy {
providers := params.GetProviders()
task := params.GetTask()
var providerList []*providerPricing.Provider
var res [][]int

View File

@ -0,0 +1,75 @@
package strategy
import (
"errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
)
type StaticWeightStrategy struct {
// TODO: add fields
//每个
num int32
weights []entity.WeightP
}
func NewStaticWeightStrategy(weights []entity.WeightP, replicas int32) *StaticWeightStrategy {
return &StaticWeightStrategy{weights: weights,
num: replicas,
}
}
func (ps *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) {
// TODO: implement the scheduling logic return nil, nil
if ps.num < 1 {
return nil, errors.New("numbers must be greater than 0")
}
if ps.weights == nil {
return nil, errors.New("weight must be set")
}
var weightSum int32
weightSum = 0
for _, w := range ps.weights {
weightSum += w.Weight
}
weightRatio := make([]float64, len(ps.weights))
for i, w := range ps.weights {
weightRatio[i] = float64(w.Weight) / float64(weightSum)
}
var rest = ps.num
var results []*AssignedCluster
for i := 0; i < len(ps.weights); i++ {
var n = int(float64(ps.num) * weightRatio[i])
rest -= int32(n)
cluster := &AssignedCluster{ParticipantId: ps.weights[i].Participant_id, Name: ps.weights[i].Name, Replicas: int32(n)}
results = append(results, cluster)
}
if rest != 0 {
if rest < 0 { // 如果差值小于0需要增加某些元素的值
for i := len(ps.weights) - 1; rest < 0 && i >= 0; i-- {
if results[i].Replicas < ps.weights[i].Weight {
results[i].Replicas++
rest++
}
}
} else {
for i := len(ps.weights) - 1; rest > 0 && i >= 0; i-- {
if results[i].Replicas < ps.weights[i].Weight {
results[i].Replicas--
rest--
}
}
}
}
return results, nil
}

View File

@ -1,4 +1,4 @@
package strategies
package strategy
type Strategy interface {
Schedule() ([]*AssignedCluster, error)
@ -9,6 +9,3 @@ type AssignedCluster struct {
Name string
Replicas int32
}
type Options struct {
}

View File

@ -0,0 +1,106 @@
package test
import (
"fmt"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param"
"testing"
)
func TestReplication(t *testing.T) {
parts := []entity.Participant{
{Name: "test1", Participant_id: 1},
{Name: "test2", Participant_id: 2},
{Name: "test3", Participant_id: 3},
}
rsc := []*collector.ResourceSpecs{
{
ParticipantId: 1,
Name: "test1",
},
{
ParticipantId: 1,
Name: "test2"},
{
ParticipantId: 1,
Name: "test3"},
}
tests := []struct {
name string
replica int32
ps []entity.Participant
res []*collector.ResourceSpecs
}{
{
name: "test1",
replica: 1,
ps: parts,
},
{
name: "test2",
replica: 2,
ps: parts,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
params := &param.Params{Resources: rsc}
repl := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params, Replicas: tt.replica})
schedule, err := repl.Schedule()
if err != nil {
return
}
for _, cluster := range schedule {
fmt.Println(cluster)
}
})
}
}
func TestStaticWeight(t *testing.T) {
parts := []entity.WeightP{
{Name: "p1", Participant_id: 1, Weight: 3},
{Name: "p2", Participant_id: 2, Weight: 5},
{Name: "p3", Participant_id: 3, Weight: 2},
}
tests := []struct {
name string
replica int32
ps []entity.WeightP
}{
{
name: "test1",
replica: 1,
ps: parts,
},
{
name: "test2",
replica: 5,
ps: parts,
},
{
name: "test2",
replica: 6,
ps: parts,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
repl := strategy.NewStaticWeightStrategy(tt.ps, tt.replica)
schedule, err := repl.Schedule()
if err != nil {
return
}
for _, cluster := range schedule {
fmt.Println(cluster)
}
})
}
}

View File

@ -16,8 +16,9 @@ package storeLink
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"strconv"
@ -25,16 +26,16 @@ import (
)
type ModelArtsLink struct {
ctx context.Context
svcCtx *svc.ServiceContext
platform string
pageIndex int32
pageSize int32
participant *models.StorelinkCenter
ctx context.Context
svcCtx *svc.ServiceContext
platform string
participantId int64
pageIndex int32
pageSize int32
}
func NewModelArtsLink(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *ModelArtsLink {
return &ModelArtsLink{ctx: ctx, svcCtx: svcCtx, participant: participant, platform: participant.Name, pageIndex: 1, pageSize: 100}
func NewModelArtsLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ModelArtsLink {
return &ModelArtsLink{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
}
func (o *ModelArtsLink) UploadImage(path string) (interface{}, error) {
@ -59,16 +60,10 @@ func (o *ModelArtsLink) QueryImageList() (interface{}, error) {
return nil, err
}
//转换成统一返回类型
imgListResp, err := ConvertType[modelarts.ListReposDetailsResp](resp, nil)
if err != nil {
return nil, err
}
return imgListResp, nil
return resp, nil
}
func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
// modelArts提交任务
environments := make(map[string]string)
parameters := make([]*modelarts.ParametersTrainJob, 0)
@ -110,13 +105,7 @@ func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, pa
return nil, err
}
//转换成统一返回类型
submitResp, err := ConvertType[modelarts.CreateTrainingJobResp](resp, nil)
if err != nil {
return nil, err
}
return submitResp, nil
return resp, nil
}
func (o *ModelArtsLink) QueryTask(taskId string) (interface{}, error) {
@ -130,13 +119,7 @@ func (o *ModelArtsLink) QueryTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
taskResp, err := ConvertType[modelarts.JobResponse](resp, o.participant)
if err != nil {
return nil, err
}
return taskResp, nil
return resp, nil
}
func (o *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) {
@ -150,13 +133,7 @@ func (o *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
deleteResp, err := ConvertType[modelarts.DeleteTrainingJobResp](resp, nil)
if err != nil {
return nil, err
}
return deleteResp, nil
return resp, nil
}
func (o *ModelArtsLink) QuerySpecs() (interface{}, error) {
@ -169,11 +146,17 @@ func (o *ModelArtsLink) QuerySpecs() (interface{}, error) {
return nil, err
}
//转换成统一返回类型
specsResp, err := ConvertType[modelarts.TrainingJobFlavorsResp](resp, o.participant)
return resp, nil
}
func (o *ModelArtsLink) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}
func (o *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) {
task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType)
if err != nil {
return nil, err
}
return specsResp, nil
return task, nil
}

View File

@ -16,19 +16,21 @@ package storeLink
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
"strings"
)
type OctopusLink struct {
ctx context.Context
svcCtx *svc.ServiceContext
pageIndex int32
pageSize int32
participant *models.StorelinkCenter
ctx context.Context
svcCtx *svc.ServiceContext
pageIndex int32
pageSize int32
platform string
participantId int64
}
const (
@ -38,14 +40,14 @@ const (
RESOURCE_POOL = "common-pool"
)
func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *OctopusLink {
return &OctopusLink{ctx: ctx, svcCtx: svcCtx, participant: participant, pageIndex: 1, pageSize: 100}
func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *OctopusLink {
return &OctopusLink{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
}
func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
// octopus创建镜像
createReq := &octopus.CreateImageReq{
Platform: o.participant.Name,
Platform: o.platform,
CreateImage: &octopus.CreateImage{
SourceType: 1,
ImageName: IMG_NAME_PREFIX + utils.RandomString(7),
@ -59,7 +61,7 @@ func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
// octopus上传镜像
uploadReq := &octopus.UploadImageReq{
Platform: o.participant.Name,
Platform: o.platform,
ImageId: createResp.Payload.ImageId,
Params: &octopus.UploadImageParam{
Domain: "",
@ -73,19 +75,13 @@ func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
// Todo 实际上传
//转换成统一返回类型
resp, err := ConvertType[octopus.UploadImageResp](uploadResp, nil)
if err != nil {
return nil, err
}
return resp, nil
return uploadResp, nil
}
func (o *OctopusLink) DeleteImage(imageId string) (interface{}, error) {
// octopus删除镜像
req := &octopus.DeleteImageReq{
Platform: o.participant.Name,
Platform: o.platform,
ImageId: imageId,
}
resp, err := o.svcCtx.OctopusRpc.DeleteImage(o.ctx, req)
@ -93,19 +89,13 @@ func (o *OctopusLink) DeleteImage(imageId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
deleteResp, err := ConvertType[octopus.DeleteImageResp](resp, nil)
if err != nil {
return nil, err
}
return deleteResp, nil
return resp, nil
}
func (o *OctopusLink) QueryImageList() (interface{}, error) {
// octopus获取镜像列表
req := &octopus.GetUserImageListReq{
Platform: o.participant.Name,
Platform: o.platform,
PageIndex: o.pageIndex,
PageSize: o.pageSize,
}
@ -114,16 +104,10 @@ func (o *OctopusLink) QueryImageList() (interface{}, error) {
return nil, err
}
//转换成统一返回类型
imgListResp, err := ConvertType[octopus.GetUserImageListResp](resp, nil)
if err != nil {
return nil, err
}
return imgListResp, nil
return resp, nil
}
func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
// octopus提交任务
// python参数
@ -144,7 +128,7 @@ func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, para
}
req := &octopus.CreateTrainJobReq{
Platform: o.participant.Name,
Platform: o.platform,
Params: &octopus.CreateTrainJobParam{
ImageId: imageId,
Name: TASK_NAME_PREFIX + UNDERSCORE + utils.RandomString(10),
@ -167,19 +151,13 @@ func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, para
return nil, err
}
//转换成统一返回类型
submitResp, err := ConvertType[octopus.CreateTrainJobResp](resp, nil)
if err != nil {
return nil, err
}
return submitResp, nil
return resp, nil
}
func (o *OctopusLink) QueryTask(taskId string) (interface{}, error) {
// octopus获取任务
req := &octopus.GetTrainJobReq{
Platform: o.participant.Name,
Platform: o.platform,
Id: taskId,
}
resp, err := o.svcCtx.OctopusRpc.GetTrainJob(o.ctx, req)
@ -187,19 +165,13 @@ func (o *OctopusLink) QueryTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
taskResp, err := ConvertType[octopus.GetTrainJobResp](resp, nil)
if err != nil {
return nil, err
}
return taskResp, nil
return resp, nil
}
func (o *OctopusLink) DeleteTask(taskId string) (interface{}, error) {
// octopus删除任务
req := &octopus.DeleteTrainJobReq{
Platform: o.participant.Name,
Platform: o.platform,
JobIds: []string{taskId},
}
resp, err := o.svcCtx.OctopusRpc.DeleteTrainJob(o.ctx, req)
@ -207,19 +179,13 @@ func (o *OctopusLink) DeleteTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
deleteResp, err := ConvertType[octopus.DeleteTrainJobResp](resp, nil)
if err != nil {
return nil, err
}
return deleteResp, nil
return resp, nil
}
func (o *OctopusLink) QuerySpecs() (interface{}, error) {
// octopus查询资源规格
req := &octopus.GetResourceSpecsReq{
Platform: o.participant.Name,
Platform: o.platform,
ResourcePool: "common-pool",
}
resp, err := o.svcCtx.OctopusRpc.GetResourceSpecs(o.ctx, req)
@ -227,11 +193,17 @@ func (o *OctopusLink) QuerySpecs() (interface{}, error) {
return nil, err
}
//转换成统一返回类型
specsResp, err := ConvertType[octopus.GetResourceSpecsResp](resp, o.participant)
return resp, nil
}
func (o *OctopusLink) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}
func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) {
task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType)
if err != nil {
return nil, err
}
return specsResp, nil
return task, nil
}

View File

@ -7,16 +7,16 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"strconv"
"strings"
)
type ShuguangHpc struct {
ctx context.Context
svcCtx *svc.ServiceContext
participant *models.StorelinkCenter
ctx context.Context
svcCtx *svc.ServiceContext
platform string
participantId int64
}
const (
@ -128,8 +128,8 @@ type ResourceSpec struct {
GAP_NDCU string
}
func NewShuguangHpc(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *ShuguangHpc {
return &ShuguangHpc{ctx: ctx, svcCtx: svcCtx, participant: participant}
func NewShuguangHpc(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ShuguangHpc {
return &ShuguangHpc{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id}
}
func (s ShuguangHpc) UploadImage(path string) (interface{}, error) {
@ -144,7 +144,7 @@ func (s ShuguangHpc) QueryImageList() (interface{}, error) {
return nil, nil
}
func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
// shuguangHpc提交任务
//判断是否resourceId匹配自定义资源Id
@ -199,13 +199,7 @@ func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, param
return nil, err
}
//转换成统一返回类型
submitResp, err := ConvertType[hpcAC.SubmitJobResp](resp, nil)
if err != nil {
return nil, err
}
return submitResp, nil
return resp, nil
}
@ -221,30 +215,21 @@ func (s ShuguangHpc) QueryTask(taskId string) (interface{}, error) {
//实时作业检查是否成功
if respC.Data != nil && respC.Data.JobEndTime != "" {
taskRespC, err := ConvertType[hpcAC.GetJobDetailResp](respC, nil)
return respC, nil
} else {
//历史作业
reqH := &hpcAC.HistoryJobDetailReq{
JobId: taskId,
JobmanagerId: strconv.Itoa(StrJobManagerID),
}
respH, err := s.svcCtx.ACRpc.HistoryJobDetail(s.ctx, reqH)
if err != nil {
return nil, err
}
return taskRespC, nil
}
//历史作业
reqH := &hpcAC.HistoryJobDetailReq{
JobId: taskId,
JobmanagerId: strconv.Itoa(StrJobManagerID),
return respH, nil
}
respH, err := s.svcCtx.ACRpc.HistoryJobDetail(s.ctx, reqH)
if err != nil {
return nil, err
}
taskRespH, err := ConvertType[hpcAC.HistoryJobDetailResp](respH, nil)
if err != nil {
return nil, err
}
return taskRespH, nil
}
func (s ShuguangHpc) QuerySpecs() (interface{}, error) {
@ -254,8 +239,8 @@ func (s ShuguangHpc) QuerySpecs() (interface{}, error) {
var respec types.ResourceSpecSl
respec.SpecId = k
respec.SpecName = v
respec.ParticipantId = s.participant.Id
respec.ParticipantName = s.participant.Name
respec.ParticipantId = s.participantId
respec.ParticipantName = s.platform
resp.ResourceSpecs = append(resp.ResourceSpecs, &respec)
}
@ -273,13 +258,7 @@ func (s ShuguangHpc) DeleteTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
taskResp, err := ConvertType[hpcAC.DeleteJobResp](resp, nil)
if err != nil {
return nil, err
}
return taskResp, nil
return resp, nil
}
func updateRequestByResourceId(resourceId string, req *hpcAC.SubmitJobReq) {

View File

@ -18,16 +18,19 @@ import (
"context"
"errors"
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"strconv"
"strings"
)
type ShuguangAi struct {
ctx context.Context
svcCtx *svc.ServiceContext
participant *models.StorelinkCenter
ctx context.Context
svcCtx *svc.ServiceContext
platform string
participantId int64
}
const (
@ -47,8 +50,8 @@ const (
PythonCodePath = "/work/home/acgnnmfbwo/111111/py/test.py"
)
func NewShuguangAi(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *ShuguangAi {
return &ShuguangAi{ctx: ctx, svcCtx: svcCtx, participant: participant}
func NewShuguangAi(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ShuguangAi {
return &ShuguangAi{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id}
}
func (s *ShuguangAi) UploadImage(path string) (interface{}, error) {
@ -70,18 +73,10 @@ func (s *ShuguangAi) QueryImageList() (interface{}, error) {
return nil, err
}
//转换成统一返回类型
imgListResp, err := ConvertType[hpcAC.GetImageListAiResp](resp, nil)
if err != nil {
return nil, err
}
return imgListResp, nil
return resp, nil
}
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
// shuguangAi提交任务
func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
//判断是否resourceId匹配自定义资源Id
if resourceId != SHUGUANGAI_CUSTOM_RESOURCE_ID {
return nil, errors.New("shuguangAi资源Id不存在")
@ -133,13 +128,19 @@ func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, param
return nil, err
}
//转换成统一返回类型
submitResp, err := ConvertType[hpcAC.SubmitTaskAiResp](resp, nil)
if err != nil {
return nil, err
}
return resp, nil
}
return submitResp, nil
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
// shuguangAi提交任务
if aiType == PYTORCH {
task, err := s.SubmitPytorchTask(imageId, cmd, envs, params, resourceId)
if err != nil {
return nil, err
}
return task, nil
}
return nil, errors.New("shuguangAi不支持的任务类型")
}
func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) {
@ -152,13 +153,7 @@ func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
taskResp, err := ConvertType[hpcAC.GetPytorchTaskResp](resp, nil)
if err != nil {
return nil, err
}
return taskResp, nil
return resp, nil
}
func (s *ShuguangAi) DeleteTask(taskId string) (interface{}, error) {
@ -171,13 +166,7 @@ func (s *ShuguangAi) DeleteTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
deleteResp, err := ConvertType[hpcAC.DeleteTaskAiResp](resp, nil)
if err != nil {
return nil, err
}
return deleteResp, nil
return resp, nil
}
func (o *ShuguangAi) QuerySpecs() (interface{}, error) {
@ -191,11 +180,39 @@ func (o *ShuguangAi) QuerySpecs() (interface{}, error) {
return nil, err
}
//转换成统一返回类型
specsResp, err := ConvertType[hpcAC.GetResourceSpecResp](specs, o.participant)
return specs, nil
}
func (o *ShuguangAi) GetResourceSpecs() (*collector.ResourceSpecs, error) {
userReq := &hpcAC.GetUserInfoReq{}
userinfo, err := o.svcCtx.ACRpc.GetUserInfo(o.ctx, userReq)
if err != nil {
return nil, err
}
limitReq := &hpcAC.QueueReq{}
_, err = o.svcCtx.ACRpc.QueryUserQuotasLimit(o.ctx, limitReq)
if err != nil {
return nil, err
}
diskReq := &hpcAC.ParaStorQuotaReq{}
_, err = o.svcCtx.ACRpc.ParaStorQuota(o.ctx, diskReq)
if err != nil {
return nil, err
}
return specsResp, nil
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
_ = &collector.ResourceSpecs{
ParticipantId: o.participantId,
Name: o.platform,
Balance: balance,
}
return nil, nil
}
func (o *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) {
task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType)
if err != nil {
return nil, err
}
return task, nil
}

View File

@ -31,7 +31,7 @@ type Linkage interface {
UploadImage(path string) (interface{}, error)
DeleteImage(imageId string) (interface{}, error)
QueryImageList() (interface{}, error)
SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error)
SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error)
QueryTask(taskId string) (interface{}, error)
QuerySpecs() (interface{}, error)
DeleteTask(taskId string) (interface{}, error)
@ -65,6 +65,8 @@ var (
"3": SHUGUANGAI,
"4": SHUGUANGHPC,
}
ERROR_RESP_EMPTY = errors.New("resp empty error")
ERROR_CONVERT_EMPTY = errors.New("convert empty error")
)
type StoreLink struct {
@ -74,16 +76,16 @@ type StoreLink struct {
func NewStoreLink(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *StoreLink {
switch participant.Type {
case TYPE_OCTOPUS:
linkStruct := NewOctopusLink(ctx, svcCtx, participant)
linkStruct := NewOctopusLink(ctx, svcCtx, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct}
case TYPE_MODELARTS:
linkStruct := NewModelArtsLink(ctx, svcCtx, participant)
linkStruct := NewModelArtsLink(ctx, svcCtx, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct}
case TYPE_SHUGUANGAI:
linkStruct := NewShuguangAi(ctx, svcCtx, participant)
linkStruct := NewShuguangAi(ctx, svcCtx, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct}
case TYPE_SHUGUANGHPC:
linkStruct := NewShuguangHpc(ctx, svcCtx, participant)
linkStruct := NewShuguangHpc(ctx, svcCtx, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct}
default:
return nil
@ -102,7 +104,431 @@ func GetParticipantById(partId int64, dbEngin *gorm.DB) *models.StorelinkCenter
return &participant
}
func ConvertType[T any](in *T, participant *models.StorelinkCenter) (interface{}, error) {
func ConvertType(in interface{}, out interface{}, participant *models.StorelinkCenter) (interface{}, error) {
switch (interface{})(in).(type) {
case *octopus.UploadImageResp:
inresp := (interface{})(in).(*octopus.UploadImageResp)
switch (interface{})(out).(type) {
case *types.UploadLinkImageResp:
resp := (interface{})(out).(*types.UploadLinkImageResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
return resp, nil
}
return resp, nil
}
return nil, nil
case *octopus.DeleteImageResp:
inresp := (interface{})(in).(*octopus.DeleteImageResp)
switch (interface{})(out).(type) {
case *types.DeleteLinkImageResp:
resp := (interface{})(out).(*types.DeleteLinkImageResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
return resp, nil
}
return resp, nil
}
return nil, nil
case *octopus.GetUserImageListResp:
inresp := (interface{})(in).(*octopus.GetUserImageListResp)
switch (interface{})(out).(type) {
case *types.GetLinkImageListResp:
resp := (interface{})(out).(*types.GetLinkImageListResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
resp.Images = nil
return resp, nil
}
for _, v := range inresp.Payload.Images {
var image types.ImageSl
image.ImageId = v.Image.Id
image.ImageName = v.Image.ImageName
image.ImageStatus = OctImgStatus[v.Image.ImageStatus]
resp.Images = append(resp.Images, &image)
}
return resp, nil
}
return nil, nil
case *modelarts.ListReposDetailsResp:
inresp := (interface{})(in).(*modelarts.ListReposDetailsResp)
switch (interface{})(out).(type) {
case *types.GetLinkImageListResp:
resp := (interface{})(out).(*types.GetLinkImageListResp)
if inresp.Errors != nil {
resp.Success = false
resp.ErrorMsg = inresp.Errors[0].ErrorMessage
resp.Images = nil
return resp, nil
}
resp.Success = true
for _, v := range inresp.Items {
for _, r := range v.Tags {
var image types.ImageSl
image.ImageId = v.Namespace + "/" + v.Name + ":" + r
image.ImageName = v.Name
image.ImageStatus = "created"
resp.Images = append(resp.Images, &image)
}
}
return resp, nil
}
return nil, nil
case *hpcAC.GetImageListAiResp:
inresp := (interface{})(in).(*hpcAC.GetImageListAiResp)
switch (interface{})(out).(type) {
case *types.GetLinkImageListResp:
resp := (interface{})(out).(*types.GetLinkImageListResp)
if inresp.Code == "0" {
resp.Success = true
for _, img := range inresp.Data {
var image types.ImageSl
image.ImageId = img.ImageId
image.ImageName = img.Version
image.ImageStatus = "created"
resp.Images = append(resp.Images, &image)
}
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
resp.Images = nil
}
return resp, nil
}
return nil, nil
case *octopus.CreateTrainJobResp:
inresp := (interface{})(in).(*octopus.CreateTrainJobResp)
switch (interface{})(out).(type) {
case *types.SubmitLinkTaskResp:
resp := (interface{})(out).(*types.SubmitLinkTaskResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
return resp, nil
}
resp.TaskId = inresp.Payload.JobId
return resp, nil
}
return nil, nil
case *modelarts.CreateTrainingJobResp:
inresp := (interface{})(in).(*modelarts.CreateTrainingJobResp)
switch (interface{})(out).(type) {
case *types.SubmitLinkTaskResp:
resp := (interface{})(out).(*types.SubmitLinkTaskResp)
if inresp.ErrorMsg != "" {
resp.ErrorMsg = inresp.ErrorMsg
resp.Success = false
return resp, nil
}
resp.Success = true
resp.TaskId = inresp.Metadata.Id
return resp, nil
}
return nil, nil
case *hpcAC.SubmitTaskAiResp:
inresp := (interface{})(in).(*hpcAC.SubmitTaskAiResp)
switch (interface{})(out).(type) {
case *types.SubmitLinkTaskResp:
resp := (interface{})(out).(*types.SubmitLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
resp.TaskId = inresp.Data
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
}
return resp, nil
}
return nil, nil
case *hpcAC.SubmitJobResp:
inresp := (interface{})(in).(*hpcAC.SubmitJobResp)
switch (interface{})(out).(type) {
case *types.SubmitLinkTaskResp:
resp := (interface{})(out).(*types.SubmitLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
resp.TaskId = inresp.Data
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
}
return resp, nil
}
return nil, nil
case *octopus.GetTrainJobResp:
inresp := (interface{})(in).(*octopus.GetTrainJobResp)
switch (interface{})(out).(type) {
case *types.GetLinkTaskResp:
resp := (interface{})(out).(*types.GetLinkTaskResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
return resp, nil
}
var task types.TaskSl
task.TaskId = inresp.Payload.TrainJob.Id
task.TaskName = inresp.Payload.TrainJob.Name
task.StartedAt = inresp.Payload.TrainJob.StartedAt
task.CompletedAt = inresp.Payload.TrainJob.CompletedAt
task.TaskStatus = inresp.Payload.TrainJob.Status
resp.Task = &task
return resp, nil
}
return nil, nil
case *modelarts.JobResponse:
inresp := (interface{})(in).(*modelarts.JobResponse)
switch (interface{})(out).(type) {
case *types.GetLinkTaskResp:
resp := (interface{})(out).(*types.GetLinkTaskResp)
if inresp.ErrorMsg != "" {
resp.ErrorMsg = inresp.ErrorMsg
resp.Success = false
return resp, nil
}
resp.Success = true
resp.Task = &types.TaskSl{}
resp.Task.TaskId = inresp.Metadata.Id
resp.Task.TaskName = inresp.Metadata.Name
resp.Task.StartedAt = int64(inresp.Status.StartTime)
resp.Task.CompletedAt = int64(inresp.Status.Duration)
resp.Task.TaskStatus = inresp.Status.Phase
return resp, nil
}
return nil, nil
case *hpcAC.GetPytorchTaskResp:
inresp := (interface{})(in).(*hpcAC.GetPytorchTaskResp)
switch (interface{})(out).(type) {
case *types.GetLinkTaskResp:
resp := (interface{})(out).(*types.GetLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
var task types.TaskSl
task.TaskId = inresp.Data.Id
task.TaskName = inresp.Data.TaskName
task.TaskStatus = inresp.Data.Status
task.StartedAt = timeutils.StringToUnixTime(inresp.Data.StartTime)
task.CompletedAt = timeutils.StringToUnixTime(inresp.Data.EndTime)
resp.Task = &task
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
resp.Task = nil
}
return resp, nil
}
return nil, nil
case *hpcAC.GetJobDetailResp:
inresp := (interface{})(in).(*hpcAC.GetJobDetailResp)
switch (interface{})(out).(type) {
case *types.GetLinkTaskResp:
resp := (interface{})(out).(*types.GetLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
var task types.TaskSl
task.TaskId = inresp.Data.JobId
task.TaskName = inresp.Data.JobName
task.TaskStatus = AcStatus[inresp.Data.JobStatus]
task.StartedAt = timeutils.StringToUnixTime(inresp.Data.JobStartTime)
task.CompletedAt = timeutils.StringToUnixTime(inresp.Data.JobEndTime)
resp.Task = &task
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
resp.Task = nil
}
return resp, nil
}
return nil, nil
case *hpcAC.HistoryJobDetailResp:
inresp := (interface{})(in).(*hpcAC.HistoryJobDetailResp)
switch (interface{})(out).(type) {
case *types.GetLinkTaskResp:
resp := (interface{})(out).(*types.GetLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
var task types.TaskSl
task.TaskId = inresp.Data.JobId
task.TaskName = inresp.Data.JobName
task.TaskStatus = AcStatus[inresp.Data.JobState]
task.StartedAt = timeutils.StringToUnixTime(inresp.Data.JobStartTime)
task.CompletedAt = timeutils.StringToUnixTime(inresp.Data.JobEndTime)
resp.Task = &task
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
resp.Task = nil
}
return resp, nil
}
return nil, nil
case *octopus.DeleteTrainJobResp:
inresp := (interface{})(in).(*octopus.DeleteTrainJobResp)
switch (interface{})(out).(type) {
case *types.DeleteLinkTaskResp:
resp := (interface{})(out).(*types.DeleteLinkTaskResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
return resp, nil
}
return resp, nil
}
return nil, nil
case *modelarts.DeleteTrainingJobResp:
inresp := (interface{})(in).(*modelarts.DeleteTrainingJobResp)
switch (interface{})(out).(type) {
case *types.DeleteLinkTaskResp:
resp := (interface{})(out).(*types.DeleteLinkTaskResp)
if inresp.ErrorMsg != "" {
resp.ErrorMsg = inresp.ErrorMsg
resp.Success = false
return resp, nil
}
resp.Success = true
return resp, nil
}
return nil, nil
case *hpcAC.DeleteTaskAiResp:
inresp := (interface{})(in).(*hpcAC.DeleteTaskAiResp)
switch (interface{})(out).(type) {
case *types.DeleteLinkTaskResp:
resp := (interface{})(out).(*types.DeleteLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
}
return resp, nil
}
return nil, nil
case *hpcAC.DeleteJobResp:
inresp := (interface{})(in).(*hpcAC.DeleteJobResp)
switch (interface{})(out).(type) {
case *types.DeleteLinkTaskResp:
resp := (interface{})(out).(*types.DeleteLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
}
return resp, nil
}
return nil, nil
case *octopus.GetResourceSpecsResp:
inresp := (interface{})(in).(*octopus.GetResourceSpecsResp)
switch (interface{})(out).(type) {
case *types.GetResourceSpecsResp:
resp := (interface{})(out).(*types.GetResourceSpecsResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ResourceSpecs = nil
return resp, nil
}
for _, spec := range inresp.TrainResourceSpecs {
var respec types.ResourceSpecSl
respec.SpecId = spec.Id
respec.SpecName = spec.Name
respec.ParticipantId = participant.Id
respec.ParticipantName = participant.Name
respec.SpecPrice = spec.Price
resp.ResourceSpecs = append(resp.ResourceSpecs, &respec)
}
return resp, nil
}
return nil, nil
case *hpcAC.GetResourceSpecResp:
inresp := (interface{})(in).(*hpcAC.GetResourceSpecResp)
switch (interface{})(out).(type) {
case *types.GetResourceSpecsResp:
resp := (interface{})(out).(*types.GetResourceSpecsResp)
if inresp.Code != "0" {
resp.Success = false
resp.ResourceSpecs = nil
} else {
var spec types.ResourceSpecSl
resp.Success = true
spec.ParticipantName = participant.Name
spec.ParticipantId = participant.Id
spec.SpecName = SHUGUANGAI_CUSTOM_RESOURCE_NAME
spec.SpecId = SHUGUANGAI_CUSTOM_RESOURCE_ID
resp.ResourceSpecs = append(resp.ResourceSpecs, &spec)
}
return resp, nil
}
return nil, nil
case *modelarts.TrainingJobFlavorsResp:
inresp := (interface{})(in).(*modelarts.TrainingJobFlavorsResp)
switch (interface{})(out).(type) {
case *types.GetResourceSpecsResp:
resp := (interface{})(out).(*types.GetResourceSpecsResp)
resp.Success = true
if inresp.Flavors == nil {
resp.Success = false
resp.ResourceSpecs = nil
return resp, nil
}
for _, spec := range inresp.Flavors {
var respec types.ResourceSpecSl
respec.SpecId = spec.FlavorId
respec.SpecName = spec.FlavorName
respec.ParticipantId = participant.Id
respec.ParticipantName = participant.Name
respec.SpecPrice = 0
resp.ResourceSpecs = append(resp.ResourceSpecs, &respec)
}
return resp, nil
}
return nil, nil
default:
return nil, errors.New("type convert fail")
}
}
func ConvertTypeOld[T any](in *T, participant *models.StorelinkCenter) (interface{}, error) {
switch (interface{})(in).(type) {
case *octopus.UploadImageResp:

View File

@ -637,6 +637,100 @@ type AppResp struct {
Data interface{} `json:"data,omitempty"`
}
type AdapterReq struct {
Id string `json:"id,optional" db:"id"`
Name string `json:"name,optional"`
Type string `json:"type,optional"`
Nickname string `json:"nickname,optional"`
Version string `json:"version,optional"`
Server string `json:"server,optional"`
}
type AdapterDelReq struct {
Id string `form:"id,optional" db:"id"`
}
type AdapterInfo struct {
Id string `json:"id,omitempty" db:"id"`
Name string `json:"name,omitempty" db:"name"`
Type string `json:"type,omitempty" db:"type"`
Nickname string `json:"nickname,omitempty" db:"nickname"`
Version string `json:"version,omitempty" db:"version"`
Server string `json:"server,omitempty" db:"server"`
CreateTime string `json:"createTime,omitempty" db:"create_time" gorm:"autoCreateTime"`
}
type AdapterResp struct {
Code int `json:"code,omitempty"`
Msg string `json:"msg,omitempty"`
Data AdapterInfo `json:"data,omitempty"`
}
type AdapterListResp struct {
Code int `json:"code,omitempty"`
Msg string `json:"msg,omitempty"`
Data []AdapterInfo `json:"data,omitempty"`
}
type ClusterReq struct {
Id string `json:"id,optional"`
AdapterId string `json:"adapterId,optional"`
Name string `json:"name,optional"`
Nickname string `json:"nickname,optional"`
Description string `json:"description,optional"`
Server string `json:"server,optional"`
MonitorServer string `json:"monitorServer,optional"`
Username string `json:"username,optional"`
Password string `json:"password,optional"`
Token string `json:"token,optional"`
Ak string `json:"ak,optional"`
Sk string `json:"sk,optional"`
Region string `json:"region,optional"`
ProjectId string `json:"projectId,optional"`
Version string `json:"version,optional"`
Label string `json:"label,optional"`
OwnerId string `json:"ownerId,omitempty,optional"`
AuthType string `json:"authType,optional"`
}
type ClusterDelReq struct {
Id string `form:"id,optional"`
}
type ClusterInfo struct {
Id string `json:"id,omitempty" db:"id"`
AdapterId string `json:"adapterId,omitempty" db:"adapter_id"`
Name string `json:"name,omitempty" db:"name"`
Nickname string `json:"nickname,omitempty" db:"nickname"`
Description string `json:"description,omitempty" db:"description"`
Server string `json:"server,omitempty" db:"server"`
MonitorServer string `json:"monitorServer,omitempty" db:"monitor_server"`
Username string `json:"username,omitempty" db:"username"`
Password string `json:"password,omitempty" db:"password"`
Token string `json:"token,omitempty" db:"token"`
Ak string `json:"ak,omitempty" db:"ak"`
Sk string `json:"sk,omitempty" db:"sk"`
Region string `json:"region,omitempty" db:"region"`
ProjectId string `json:"projectId,omitempty" db:"project_id"`
Version string `json:"version,omitempty" db:"version"`
Label string `json:"label,omitempty" db:"label"`
OwnerId string `json:"ownerId,omitempty" db:"owner_id"`
AuthType string `json:"authType,omitempty" db:"auth_type"`
CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
}
type ClusterResp struct {
Code int `json:"code,omitempty"`
Msg string `json:"msg,omitempty"`
Data ClusterInfo `json:"data,omitempty"`
}
type ClusterListResp struct {
Code int `json:"code,omitempty"`
Msg string `json:"msg,omitempty"`
Data []ClusterInfo `json:"data,omitempty"`
}
type Job struct {
SlurmVersion string `json:"slurmVersion"`
Name string `json:"name"`

View File

@ -16,10 +16,6 @@ spec:
labels:
k8s-app: pcm-coordinator-api
spec:
hostAliases:
- hostnames:
- nacos.jcce.dev
ip: nacos_host
imagePullSecrets:
- name: secret_name
containers:

View File

@ -1,54 +0,0 @@
/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package scheduler
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
)
type AiScheduler struct {
yamlString string
collector collector.ResourceCollector
}
func NewAiScheduler(val string) *AiScheduler {
return &AiScheduler{yamlString: val}
}
func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
ai := models.Ai{
ParticipantId: participantId,
TaskId: task.TaskId,
Status: "Saved",
YamlString: as.yamlString,
}
utils.Convert(task.Metadata, &ai)
return ai, nil
}
func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) {
//a, b := as.genTaskAndProviders()
return nil, nil
}
func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider) {
return nil, nil
}

View File

@ -1,8 +0,0 @@
package collector
type AiCollector struct {
}
func (a *AiCollector) getResourceSpecs() {
}

View File

@ -1,8 +0,0 @@
package collector
type ResourceCollector interface {
getResourceSpecs()
}
type ResourceSpecs struct {
}

View File

@ -1,11 +0,0 @@
package database
type Storage interface {
GetProviderParams() ([]providerParams, error)
FindAvailableCluster()
}
type NSIDSpecified interface {
Storage
NSID() error
}

View File

@ -1,12 +0,0 @@
package strategies
type ReplicationStrategy struct {
}
func (ps *ReplicationStrategy) findAvailableCLuster() error {
return nil
}
func (ps *ReplicationStrategy) Schedule() ([]*AssignedCluster, error) {
return nil, nil
}

View File

@ -36,3 +36,8 @@ func InitSnowflake(machineID int64) (err error) {
func GenSnowflakeID() int64 {
return node.Generate().Int64()
}
// machineId 工作id
func GenSnowflakeIDStr() string {
return node.Generate().String()
}

View File

@ -16,10 +16,6 @@ spec:
labels:
k8s-app: pcm-coordinator-rpc
spec:
hostAliases:
- hostnames:
- nacos.jcce.dev
ip: nacos_host
imagePullSecrets:
- name: secret_name
containers: