Former-commit-id: 91f8be93a0fa03ce6afe65737613f7c00c253d1a
This commit is contained in:
zhangwei 2024-04-30 15:15:37 +08:00
commit fdc7d53cd9
46 changed files with 5136 additions and 104 deletions

View File

@ -1697,6 +1697,44 @@ PayloadCreateTrainJob{
jobId string `json:"jobId,optional"`
}
********************/
/******************Ai Center overview*************************/
CenterOverviewResp {
CenterNum int32 `json:"totalCenters,optional"`
TaskNum int32 `json:"totalTasks,optional"`
CardNum int32 `json:"totalCards,optional"`
PowerInTops float64 `json:"totalPower,optional"`
}
CenterQueueingResp {
Current []*CenterQueue `json:"current,optional"`
History []*CenterQueue `json:"history,optional"`
}
CenterQueue {
Name string `json:"name,optional"`
QueueingNum int32 `json:"num,optional"`
}
CenterListResp {
List []*AiCenter `json:"centerList,optional"`
}
AiCenter {
Name string `json:"name,optional"`
StackName string `json:"stack,optional"`
Version string `json:"version,optional"`
}
CenterTaskListResp {
List []*AiTask `json:"taskList,optional"`
}
AiTask {
Name string `json:"name,optional"`
status string `json:"status,optional"`
TimeElapsed int32 `json:"elapsed,optional"`
}
)
/******************create TrainIngJob end*************************/

View File

@ -59,9 +59,9 @@ type (
Type int64 `json:"type"` // 租户所属(0数算1超算2智算
DeletedFlag int64 `json:"deletedFlag"` // 是否删除
CreatedBy int64 `json:"createdBy"` // 创建人
CreatedTime string `json:"createdTime"` // 创建时间
CreateTime string `json:"createdTime"` // 创建时间
UpdatedBy int64 `json:"updatedBy"` // 更新人
UpdatedTime string `json:"updated_time"` // 更新时间
UpdateTime string `json:"updated_time"` // 更新时间
}
UpdateTenantReq {
@ -115,6 +115,6 @@ type Cloud {
StartTime string `json:"startTime"` // 开始时间
RunningTime int64 `json:"runningTime"` // 运行时长
CreatedBy int64 `json:"createdBy"` // 创建人
CreatedTime string `json:"createdTime"` // 创建时间
CreateTime string `json:"createdTime"` // 创建时间
Result string `json:"result"`
}

View File

@ -32,6 +32,23 @@ type (
}
)
type (
HomeOverviewReq {
}
HomeOverviewResp {
Code int `json:"code"`
Message string `json:"message"`
Data HomeOverviewData `json:"data"`
}
HomeOverviewData{
AdaptSum int64 `json:"adaptSum"`
ClusterSum int64 `json:"clusterSum"`
StorageSum float32 `json:"storageSum"`
TaskSum int64 `json:"taskSum"`
}
)
type remoteResp {
Code int `json:"code"`
Message string `json:"message"`
@ -95,24 +112,13 @@ type (
type (
GeneralTaskReq {
Name string `json:"name"`
ComputeType string `json:"computeType"`
TemplateId string `json:"templateId"`
AdapterId string `json:"adapterId"`
ClusterIds []string `json:"clusterIds"`
Strategy Strategy `json:"strategy"`
Name string `json:"name"`
AdapterIds []string `json:"adapterIds"`
ClusterIds []string `json:"clusterIds"`
Strategy string `json:"strategy"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
ReqBody []string `json:"reqBody"`
}
Strategy {
Name string `json:"name"`
StaticWeightList []StaticWeightList `json:"staticWeightList"`
}
StaticWeightList {
ClusterName string `json:"clusterName"`
Weight int `json:"weight"`
}
)
type deleteTaskReq {
@ -941,9 +947,9 @@ type (
Environment string `json:"environment"`
DeletedFlag int64 `json:"deleted_flag"` // 是否删除0-否1-是)
CreatedBy int64 `json:"created_by"` // 创建人
CreatedTime string `json:"created_time"` // 创建时间
CreateTime string `json:"created_time"` // 创建时间
UpdatedBy int64 `json:"updated_by"` // 更新人
UpdatedTime string `json:"updated_time"` // 更新时间
UpdateTime string `json:"updated_time"` // 更新时间
}
CloudInfo {
@ -1093,4 +1099,14 @@ type TaskStatusResp {
Failed int `json:"Failed"`
Running int `json:"Running"`
Pause int `json:"Pause"`
}
type TaskDetailsResp {
Name string `json:"name"`
description string `json:"description"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
Strategy int64 `json:"strategy,string"`
SynergyStatus int64 `json:"synergyStatus,string"`
ClusterInfos []*ClusterInfo `json:"clusterInfos"`
}

View File

@ -137,6 +137,14 @@ service pcm {
@doc "Statistical task status"
@handler countTaskStatus
get /core/task/countTaskStatus () returns (TaskStatusResp)
@doc "Home Page Overview"
@handler homeOverviewHandler
get /core/homeOverview (HomeOverviewReq) returns (HomeOverviewResp)
@doc "task details"
@handler taskDetails
get /core/task/details (FId) returns(TaskDetailsResp)
}
//hpc二级接口
@ -219,6 +227,22 @@ service pcm {
group: ai
)
service pcm {
@doc "智算中心概览"
@handler getCenterOverviewHandler
get /ai/getCenterOverview returns (CenterOverviewResp)
@doc "智算中心排队状况"
@handler getCenterQueueingHandler
get /ai/getCenterQueueing returns (CenterQueueingResp)
@doc "智算中心列表"
@handler getCenterListHandler
get /ai/getCenterList returns (CenterListResp)
@doc "智算中心任务列表"
@handler getCenterTaskListHandler
get /ai/getCenterTaskList returns (CenterTaskListResp)
@doc "查询数据集列表"
@handler listDataSetHandler
get /ai/listDataSet/:projectId (DataSetReq) returns (DataSetResp)
@ -927,6 +951,9 @@ service pcm {
@handler ScheduleSubmitHandler
post /schedule/submit (ScheduleReq) returns (ScheduleResp)
@handler ScheduleGetOverviewHandler
post /schedule/getOverview returns (ScheduleOverviewResp)
}
@server(
@ -994,7 +1021,7 @@ service pcm {
@doc "Synchronize Cluster alert Information"
@handler syncClusterAlertHandler
post /monitoring/syncClusterAlert (SyncClusterAlertReq)
post /core/syncClusterAlert (SyncClusterAlertReq)
@handler taskNumHandler
get /monitoring/task/num (taskNumReq) returns (taskNumResp)

View File

@ -24,6 +24,9 @@ type (
Msg string `json:"msg"`
}
ScheduleOverviewResp {
}
AiOption {
TaskName string `json:"taskName"`
AdapterId string `json:"adapterId"`
@ -81,4 +84,20 @@ type (
AiJobLogResp {
Log string `json:"log"`
}
AiTaskDb {
Id string `json:"id,omitempty" db:"id"`
TaskId string `json:"taskId,omitempty" db:"task_id"`
AdapterId string `json:"adapterId,omitempty" db:"adapter_id"`
ClusterId string `json:"clusterId,omitempty" db:"cluster_id"`
Name string `json:"name,omitempty" db:"name"`
Replica string `json:"replica,omitempty" db:"replica"`
ClusterTaskId string `json:"clusterTaskId,omitempty" db:"c_task_id"`
Strategy string `json:"strategy,omitempty" db:"strategy"`
Status string `json:"status,omitempty" db:"status"`
Msg string `json:"msg,omitempty" db:"msg"`
CommitTime string `json:"commitTime,omitempty" db:"commit_time"`
StartTime string `json:"startTime,omitempty" db:"start_time"`
EndTime string `json:"endTime,omitempty" db:"end_time"`
}
)

View File

@ -5,7 +5,7 @@ Port: 8999
Timeout: 50000
DB:
DataSource: root:uJpLd6u-J?HC1@(10.206.0.12:3306)/pcm?parseTime=true&loc=Local
DataSource: root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local
# DataSource: root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local
Redis:
Host: 10.206.0.12:6379

View File

@ -0,0 +1,21 @@
package ai
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/ai"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
)
func GetCenterListHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
l := ai.NewGetCenterListLogic(r.Context(), svcCtx)
resp, err := l.GetCenterList()
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}

View File

@ -0,0 +1,21 @@
package ai
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/ai"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
)
func GetCenterOverviewHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
l := ai.NewGetCenterOverviewLogic(r.Context(), svcCtx)
resp, err := l.GetCenterOverview()
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}

View File

@ -0,0 +1,21 @@
package ai
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/ai"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
)
func GetCenterQueueingHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
l := ai.NewGetCenterQueueingLogic(r.Context(), svcCtx)
resp, err := l.GetCenterQueueing()
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}

View File

@ -0,0 +1,21 @@
package ai
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/ai"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
)
func GetCenterTaskListHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
l := ai.NewGetCenterTaskListLogic(r.Context(), svcCtx)
resp, err := l.GetCenterTaskList()
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}

View File

@ -0,0 +1,28 @@
package core
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
)
func HomeOverviewHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.HomeOverviewReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
l := core.NewHomeOverviewLogic(r.Context(), svcCtx)
resp, err := l.HomeOverview(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}

View File

@ -0,0 +1,24 @@
package core
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"
)
func TaskDetailsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.FId
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := core.NewTaskDetailsLogic(r.Context(), svcCtx)
resp, err := l.TaskDetails(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -165,6 +165,16 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/core/task/countTaskStatus",
Handler: core.CountTaskStatusHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/core/homeOverview",
Handler: core.HomeOverviewHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/core/task/details",
Handler: core.TaskDetailsHandler(serverCtx),
},
},
rest.WithPrefix("/pcm/v1"),
)
@ -258,6 +268,26 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
server.AddRoutes(
[]rest.Route{
{
Method: http.MethodGet,
Path: "/ai/getCenterOverview",
Handler: ai.GetCenterOverviewHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/ai/getCenterQueueing",
Handler: ai.GetCenterQueueingHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/ai/getCenterList",
Handler: ai.GetCenterListHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/ai/getCenterTaskList",
Handler: ai.GetCenterTaskListHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/ai/listDataSet/:projectId",
@ -1155,6 +1185,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/schedule/submit",
Handler: schedule.ScheduleSubmitHandler(serverCtx),
},
{
Method: http.MethodPost,
Path: "/schedule/getOverview",
Handler: schedule.ScheduleGetOverviewHandler(serverCtx),
},
},
rest.WithPrefix("/pcm/v1"),
)
@ -1249,7 +1284,7 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
},
{
Method: http.MethodPost,
Path: "/monitoring/syncClusterAlert",
Path: "/core/syncClusterAlert",
Handler: monitoring.SyncClusterAlertHandler(serverCtx),
},
{

View File

@ -0,0 +1,21 @@
package schedule
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/schedule"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
)
func ScheduleGetOverviewHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
l := schedule.NewScheduleGetOverviewLogic(r.Context(), svcCtx)
resp, err := l.ScheduleGetOverview()
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}

View File

@ -0,0 +1,30 @@
package ai
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetCenterListLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetCenterListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterListLogic {
return &GetCenterListLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetCenterListLogic) GetCenterList() (resp *types.CenterListResp, err error) {
// todo: add your logic here and delete this line
return
}

View File

@ -0,0 +1,30 @@
package ai
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetCenterOverviewLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterOverviewLogic {
return &GetCenterOverviewLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) {
// todo: add your logic here and delete this line
return
}

View File

@ -0,0 +1,30 @@
package ai
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetCenterQueueingLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetCenterQueueingLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterQueueingLogic {
return &GetCenterQueueingLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetCenterQueueingLogic) GetCenterQueueing() (resp *types.CenterQueueingResp, err error) {
// todo: add your logic here and delete this line
return
}

View File

@ -0,0 +1,30 @@
package ai
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetCenterTaskListLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterTaskListLogic {
return &GetCenterTaskListLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskListResp, err error) {
// todo: add your logic here and delete this line
return
}

View File

@ -7,6 +7,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"io"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
@ -52,7 +53,6 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er
Status: constants.Saved,
Name: req.Name,
CommitTime: time.Now(),
NsID: "ns-admin",
YamlString: "[" + result + "]",
}
// Save the task data to the database
@ -62,7 +62,7 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er
}
var clusters []*models.CloudModel
err := l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where adapter_id = ? and id in ?", req.AdapterId, req.ClusterIds).Scan(&clusters).Error
err := l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error
if err != nil {
logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
return errors.Errorf("the cluster does not match the drive resources. Check the data")
@ -73,11 +73,12 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er
for _, s := range req.ReqBody {
sStruct := UnMarshalK8sStruct(s)
unString, _ := sStruct.MarshalJSON()
taskCloud.Id = utils.GenSnowflakeIDUint()
taskCloud.TaskId = uint(taskModel.Id)
taskCloud.AdapterId = c.AdapterId
taskCloud.ClusterId = c.Id
taskCloud.ClusterName = c.Name
taskCloud.Status = "Saved"
taskCloud.Status = "Pending"
taskCloud.YamlString = string(unString)
taskCloud.Kind = sStruct.GetKind()
taskCloud.Namespace = sStruct.GetNamespace()
@ -113,6 +114,14 @@ func UnMarshalK8sStruct(yamlString string) *unstructured.Unstructured {
if len(unstructuredObj.GetNamespace()) == 0 {
unstructuredObj.SetNamespace("default")
}
//设置副本数
if unstructuredObj.GetKind() == "Deployment" || unstructuredObj.GetKind() == "StatefulSet" {
unstructured.SetNestedField(
unstructuredObj.Object,
int64(6),
"spec", "replicas",
)
}
}
return unstructuredObj
}

View File

@ -0,0 +1,73 @@
package core
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type HomeOverviewLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewHomeOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) *HomeOverviewLogic {
return &HomeOverviewLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *HomeOverviewLogic) HomeOverview(req *types.HomeOverviewReq) (resp *types.HomeOverviewResp, err error) {
// todo: add your logic here and delete this line
resp = &types.HomeOverviewResp{}
var AdapterSum int //
var StorageSum float32 //
var ClusterSum int //
var TaskSum int //
//Task
sqlStrTask := "SELECT COUNT(*) FROM `task`"
txTask := l.svcCtx.DbEngin.Raw(sqlStrTask).Scan(&TaskSum)
if txTask.Error != nil {
logx.Error(err)
return nil, txTask.Error
}
//Storage
sqlStrStorage := "SELECT SUM(t.storage_space) as storageSum FROM `t_storage_device` t"
txStorage := l.svcCtx.DbEngin.Raw(sqlStrStorage).Scan(&StorageSum)
if txTask.Error != nil {
logx.Error(err)
return nil, txStorage.Error
}
//Cluster
sqlStrCluster := "SELECT COUNT(*) FROM `t_cluster`"
txCluster := l.svcCtx.DbEngin.Raw(sqlStrCluster).Scan(&ClusterSum)
if txTask.Error != nil {
logx.Error(err)
return nil, txCluster.Error
}
//Adapter
sqlStrAdapter := "SELECT COUNT(*) FROM `t_adapter`"
txAdapter := l.svcCtx.DbEngin.Raw(sqlStrAdapter).Scan(&AdapterSum)
if txTask.Error != nil {
logx.Error(err)
return nil, txAdapter.Error
}
resp.Data.TaskSum = int64(TaskSum)
resp.Data.StorageSum = StorageSum
resp.Data.AdaptSum = int64(AdapterSum)
resp.Data.ClusterSum = int64(ClusterSum)
resp.Code = 200
resp.Message = "Success"
return resp, nil
}

View File

@ -0,0 +1,61 @@
package core
import (
"context"
"github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gorm.io/gorm"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type TaskDetailsLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewTaskDetailsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *TaskDetailsLogic {
return &TaskDetailsLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *TaskDetailsLogic) TaskDetails(req *types.FId) (resp *types.TaskDetailsResp, err error) {
var task models.Task
if errors.Is(l.svcCtx.DbEngin.Where("id", req.Id).First(&task).Error, gorm.ErrRecordNotFound) {
return nil, errors.New("记录不存在")
}
clusterIds := make([]int64, 0)
var cList []*types.ClusterInfo
switch task.TaskTypeDict {
case 0:
l.svcCtx.DbEngin.Table("task_cloud").Select("cluster_id").Where("task_id", task.Id).Scan(&clusterIds)
case 1:
l.svcCtx.DbEngin.Table("task_ai").Select("cluster_id").Where("task_id", task.Id).Scan(&clusterIds)
case 2:
l.svcCtx.DbEngin.Table("task_hpc").Select("cluster_id").Where("task_id", task.Id).Scan(&clusterIds)
case 3:
l.svcCtx.DbEngin.Table("task_vm").Select("cluster_id").Where("task_id", task.Id).Find(&clusterIds)
}
err = l.svcCtx.DbEngin.Table("t_cluster").Where("id in ?", clusterIds).Scan(&cList).Error
if err != nil {
return resp, err
}
resp = &types.TaskDetailsResp{
Name: task.Name,
Description: task.Description,
StartTime: task.StartTime,
EndTime: task.EndTime,
Strategy: task.Strategy,
SynergyStatus: task.SynergyStatus,
ClusterInfos: cList,
}
return
}

View File

@ -26,7 +26,11 @@ func NewScheduleGetAiJobLogLogLogic(ctx context.Context, svcCtx *svc.ServiceCont
func (l *ScheduleGetAiJobLogLogLogic) ScheduleGetAiJobLogLog(req *types.AiJobLogReq) (resp *types.AiJobLogResp, err error) {
resp = &types.AiJobLogResp{}
log, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][req.ClusterId].GetTrainingTaskLog(l.ctx, req.TaskId, req.InstanceNum)
id, err := l.svcCtx.Scheduler.AiStorages.GetAiTaskIdByClusterIdAndTaskId(req.ClusterId, req.TaskId)
if err != nil {
return nil, err
}
log, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][req.ClusterId].GetTrainingTaskLog(l.ctx, id, req.InstanceNum)
if err != nil {
return nil, err
}

View File

@ -0,0 +1,30 @@
package schedule
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type ScheduleGetOverviewLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewScheduleGetOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleGetOverviewLogic {
return &ScheduleGetOverviewLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *ScheduleGetOverviewLogic) ScheduleGetOverview() (resp *types.ScheduleOverviewResp, err error) {
// todo: add your logic here and delete this line
return
}

View File

@ -6,6 +6,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"github.com/zeromicro/go-zero/core/logx"
)
@ -51,6 +52,10 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
switch opt.GetOptionType() {
case option.AI:
id, err := l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName)
if err != nil {
return nil, err
}
rs := (results).([]*schedulers.AiResult)
for _, r := range rs {
scheResult := &types.ScheduleResult{}
@ -59,12 +64,13 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
scheResult.Strategy = r.Strategy
scheResult.Replica = r.Replica
scheResult.Msg = r.Msg
err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, r.TaskId, constants.Running, r.Msg)
if err != nil {
return nil, err
}
resp.Results = append(resp.Results, scheResult)
}
err = l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName)
if err != nil {
return nil, err
}
}
return resp, nil

View File

@ -2,10 +2,12 @@ package database
import (
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gorm.io/gorm"
"strconv"
"time"
)
@ -48,7 +50,17 @@ func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) {
return ids, nil
}
func (s *AiStorage) SaveTask(name string) error {
func (s *AiStorage) GetAiTasks() ([]*types.AiTaskDb, error) {
var resp []*types.AiTaskDb
tx := s.DbEngin.Raw("select * from task_ai").Scan(&resp)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return nil, tx.Error
}
return resp, nil
}
func (s *AiStorage) SaveTask(name string) (int64, error) {
// 构建主任务结构体
taskModel := models.Task{
Status: constants.Saved,
@ -58,12 +70,52 @@ func (s *AiStorage) SaveTask(name string) error {
}
// 保存任务数据到数据库
tx := s.DbEngin.Create(&taskModel)
if tx.Error != nil {
return 0, tx.Error
}
return taskModel.Id, nil
}
func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId string, jobId string, status string, msg string) error {
// 构建主任务结构体
aId, err := strconv.ParseInt(option.AdapterId, 10, 64)
if err != nil {
return err
}
cId, err := strconv.ParseInt(clusterId, 10, 64)
if err != nil {
return err
}
aiTaskModel := models.TaskAi{
TaskId: taskId,
AdapterId: aId,
ClusterId: cId,
Name: option.TaskName,
Replica: option.Replica,
JobId: jobId,
Strategy: option.StrategyName,
Status: status,
Msg: msg,
CommitTime: time.Now(),
}
// 保存任务数据到数据库
tx := s.DbEngin.Create(&aiTaskModel)
if tx.Error != nil {
return tx.Error
}
return nil
}
func (s *AiStorage) GetAiTaskIdByClusterIdAndTaskId(clusterId string, taskId string) (string, error) {
var aiTask models.TaskAi
tx := s.DbEngin.Raw("select * from task_ai where `cluster_id` = ? and `task_id` = ?", clusterId, taskId).Scan(&aiTask)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return "", tx.Error
}
return aiTask.JobId, nil
}
func (s *AiStorage) UpdateTask() error {
return nil
}

View File

@ -129,42 +129,19 @@ func (s *Scheduler) TempAssign() error {
}
func (s *Scheduler) AssignAndSchedule(ss SubSchedule) (interface{}, error) {
//// 已指定 ParticipantId
//if s.task.ParticipantId != 0 {
// return nil
//}
//// 标签匹配以及后未找到ParticipantIds
//if len(s.participantIds) == 0 {
// return errors.New("未找到匹配的ParticipantIds")
//}
//
//// 指定或者标签匹配的结果只有一个集群,给任务信息指定
//if len(s.participantIds) == 1 {
// s.task.ParticipantId = s.participantIds[0]
// //replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
// //result := make(map[int64]string)
// //result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64)
// //s.result = result
//
// return nil
//}
//choose strategy
strategy, err := ss.PickOptimalStrategy()
if err != nil {
return nil, err
}
//schedule
clusters, err := strategy.Schedule()
if err != nil {
return nil, err
}
//集群数量不满足,指定到标签匹配后第一个集群
//if len(providerList) < 2 {
// s.task.ParticipantId = s.participantIds[0]
// return nil
//}
//assign tasks to clusters
resp, err := ss.AssignTask(clusters)
if err != nil {
return nil, err

View File

@ -26,6 +26,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus"
@ -168,32 +169,46 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
errs = append(errs, e)
}
if len(errs) == len(clusters) {
return nil, errors.New("submit task failed")
for s := range ch {
results = append(results, s)
}
if len(errs) != 0 {
var msg string
taskId, err := as.AiStorages.SaveTask(as.option.TaskName)
if err != nil {
return nil, err
}
var errmsg string
for _, err := range errs {
e := (err).(struct {
err error
clusterId string
})
msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, "", constants.Failed, msg)
if err != nil {
return nil, err
}
}
for s := range ch {
if s.Msg != "" {
msg += fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, "", constants.Failed, msg)
if err != nil {
return nil, err
}
} else {
msg += fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId)
msg := fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId)
errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, s.TaskId, constants.Succeeded, msg)
if err != nil {
return nil, err
}
}
}
return nil, errors.New(msg)
}
for s := range ch {
// TODO: database operation
results = append(results, s)
return nil, errors.New(errmsg)
}
return results, nil

View File

@ -4,6 +4,7 @@ type AiOption struct {
AdapterId string
ClusterIds []string
TaskName string
Replica int64
ResourceType string // cpu/gpu/compute card
CpuCoreNum int64
TaskType string // pytorch/tensorflow/mindspore

View File

@ -1,7 +1,12 @@
package option
type CloudOption struct {
task interface{}
Name string `json:"name"`
AdapterIds []string `json:"adapterIds"`
ClusterIds []string `json:"clusterIds"`
Strategy string `json:"strategy"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
ReqBody []string `json:"reqBody"`
}
func (c CloudOption) GetOptionType() string {

View File

@ -7,6 +7,8 @@ type AiCollector interface {
GetDatasetsSpecs(ctx context.Context) ([]*DatasetsSpecs, error)
GetAlgorithms(ctx context.Context) ([]*Algorithm, error)
GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error)
DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error)
UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error
}
type ResourceStats struct {

View File

@ -162,6 +162,14 @@ func (m *ModelArtsLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorit
return nil, nil
}
func (m *ModelArtsLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error) {
return "", nil
}
func (m *ModelArtsLink) UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error {
return nil
}
func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
return "", nil
}

View File

@ -337,6 +337,14 @@ func (o *OctopusLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm
return algorithms, nil
}
func (o *OctopusLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error) {
return "", nil
}
func (o *OctopusLink) UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error {
return nil
}
func (o *OctopusLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
instance, err := strconv.ParseInt(instanceNum, 10, 32)
if err != nil {

View File

@ -447,6 +447,14 @@ func (s *ShuguangAi) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm,
return algorithms, nil
}
func (s *ShuguangAi) DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error) {
return "", nil
}
func (s *ShuguangAi) UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error {
return nil
}
func (s *ShuguangAi) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
req := &hpcAC.GetInstanceLogReq{
TaskId: taskId,

View File

@ -25,6 +25,22 @@ type CenterIndex struct {
CenterType string `json:"centerType"`
}
type HomeOverviewReq struct {
}
type HomeOverviewResp struct {
Code int `json:"code"`
Message string `json:"message"`
Data HomeOverviewData `json:"data"`
}
type HomeOverviewData struct {
AdaptSum int64 `json:"adaptSum"`
ClusterSum int64 `json:"clusterSum"`
StorageSum float32 `json:"storageSum"`
TaskSum int64 `json:"taskSum"`
}
type RemoteResp struct {
Code int `json:"code"`
Message string `json:"message"`
@ -83,23 +99,12 @@ type Region struct {
}
type GeneralTaskReq struct {
Name string `json:"name"`
ComputeType string `json:"computeType"`
TemplateId string `json:"templateId"`
AdapterId string `json:"adapterId"`
ClusterIds []string `json:"clusterIds"`
Strategy Strategy `json:"strategy"`
ReqBody []string `json:"reqBody"`
}
type Strategy struct {
Name string `json:"name"`
StaticWeightList []StaticWeightList `json:"staticWeightList"`
}
type StaticWeightList struct {
ClusterName string `json:"clusterName"`
Weight int `json:"weight"`
Name string `json:"name"`
AdapterIds []string `json:"adapterIds"`
ClusterIds []string `json:"clusterIds"`
Strategy string `json:"strategy"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
ReqBody []string `json:"reqBody"`
}
type DeleteTaskReq struct {
@ -912,9 +917,9 @@ type HpcInfo struct {
Environment string `json:"environment"`
DeletedFlag int64 `json:"deleted_flag"` // 是否删除0-否1-是)
CreatedBy int64 `json:"created_by"` // 创建人
CreatedTime string `json:"created_time"` // 创建时间
CreateTime string `json:"created_time"` // 创建时间
UpdatedBy int64 `json:"updated_by"` // 更新人
UpdatedTime string `json:"updated_time"` // 更新时间
UpdateTime string `json:"updated_time"` // 更新时间
}
type CloudInfo struct {
@ -1059,6 +1064,16 @@ type TaskStatusResp struct {
Pause int `json:"Pause"`
}
type TaskDetailsResp struct {
Name string `json:"name"`
Description string `json:"description"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
Strategy int64 `json:"strategy,string"`
SynergyStatus int64 `json:"synergyStatus,string"`
ClusterInfos []*ClusterInfo `json:"clusterInfos"`
}
type CommitHpcTaskReq struct {
Name string `json:"name"` // paratera:jobName
Description string `json:"description,optional"`
@ -2707,6 +2722,43 @@ type Nfs struct {
ReadOnly bool `json:"readOnly,optional"`
}
type CenterOverviewResp struct {
CenterNum int32 `json:"totalCenters,optional"`
TaskNum int32 `json:"totalTasks,optional"`
CardNum int32 `json:"totalCards,optional"`
PowerInTops float64 `json:"totalPower,optional"`
}
type CenterQueueingResp struct {
Current []*CenterQueue `json:"current,optional"`
History []*CenterQueue `json:"history,optional"`
}
type CenterQueue struct {
Name string `json:"name,optional"`
QueueingNum int32 `json:"num,optional"`
}
type CenterListResp struct {
List []*AiCenter `json:"centerList,optional"`
}
type AiCenter struct {
Name string `json:"name,optional"`
StackName string `json:"stack,optional"`
Version string `json:"version,optional"`
}
type CenterTaskListResp struct {
List []*AiTask `json:"taskList,optional"`
}
type AiTask struct {
Name string `json:"name,optional"`
Status string `json:"status,optional"`
TimeElapsed int32 `json:"elapsed,optional"`
}
type StorageScreenReq struct {
}
@ -5289,9 +5341,9 @@ type TenantInfo struct {
Type int64 `json:"type"` // 租户所属(0数算1超算2智算
DeletedFlag int64 `json:"deletedFlag"` // 是否删除
CreatedBy int64 `json:"createdBy"` // 创建人
CreatedTime string `json:"createdTime"` // 创建时间
CreateTime string `json:"createdTime"` // 创建时间
UpdatedBy int64 `json:"updatedBy"` // 更新人
UpdatedTime string `json:"updated_time"` // 更新时间
UpdateTime string `json:"updated_time"` // 更新时间
}
type UpdateTenantReq struct {
@ -5345,7 +5397,7 @@ type Cloud struct {
StartTime string `json:"startTime"` // 开始时间
RunningTime int64 `json:"runningTime"` // 运行时长
CreatedBy int64 `json:"createdBy"` // 创建人
CreatedTime string `json:"createdTime"` // 创建时间
CreateTime string `json:"createdTime"` // 创建时间
Result string `json:"result"`
}
@ -5488,6 +5540,9 @@ type ScheduleResult struct {
Msg string `json:"msg"`
}
type ScheduleOverviewResp struct {
}
type AiOption struct {
TaskName string `json:"taskName"`
AdapterId string `json:"adapterId"`
@ -5546,6 +5601,22 @@ type AiJobLogResp struct {
Log string `json:"log"`
}
type AiTaskDb struct {
Id string `json:"id,omitempty" db:"id"`
TaskId string `json:"taskId,omitempty" db:"task_id"`
AdapterId string `json:"adapterId,omitempty" db:"adapter_id"`
ClusterId string `json:"clusterId,omitempty" db:"cluster_id"`
Name string `json:"name,omitempty" db:"name"`
Replica string `json:"replica,omitempty" db:"replica"`
ClusterTaskId string `json:"clusterTaskId,omitempty" db:"c_task_id"`
Strategy string `json:"strategy,omitempty" db:"strategy"`
Status string `json:"status,omitempty" db:"status"`
Msg string `json:"msg,omitempty" db:"msg"`
CommitTime string `json:"commitTime,omitempty" db:"commit_time"`
StartTime string `json:"startTime,omitempty" db:"start_time"`
EndTime string `json:"endTime,omitempty" db:"end_time"`
}
type CreateAlertRuleReq struct {
CLusterId string `json:"clusterId"`
ClusterName string `json:"clusterName"`

1159
deploy/pcm-auth.sql Normal file

File diff suppressed because one or more lines are too long

BIN
deploy/pcm-yaml.zip Normal file

Binary file not shown.

2882
deploy/pcm.sql Normal file

File diff suppressed because it is too large Load Diff

116
docs/pcm_deploy.md Normal file
View File

@ -0,0 +1,116 @@
## 1 安装部署kubekey
通过以下的命令,可以下载 KubeKey 的最新版本。您可以更改命令中的版本号来下载特定的版本。
```shell
export KKZONE=cn
curl -sfL https://get-kk.kubesphere.io | VERSION=v3.0.7 sh -
```
## 2 mysql部署及数据导入
#### 卸载已有的mariadb
`yum remove -y mariadb-server mariadb mariadb-libs`
#### 下载对应系统版本的mysql包
wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.36-1.el7.x86_64.rpm-bundle.tar
##### 解压
`tar -xvf mysql-8.0.36-1.el7.x86_64.rpm-bundle.tar`
##### 安装
```shell
rpm -ivh mysql-community-libs-8.0.36-1.el7.x86_64.rpm
rpm -ivh mysql-community-libs-compat-8.0.36-1.el7.x86_64.rpm
rpm -ivh mysql-community-client-plugins-8.0.36-1.el7.x86_64.rpm
rpm -ivh mysql-community-client-8.0.36-1.el7.x86_64.rpm
rpm -ivh mysql-community-common-8.0.36-1.el7.x86_64.rpm
rpm -ivh mysql-community-debuginfo-8.0.36-1.el7.x86_64.rpm
rpm -ivh mysql-community-devel-8.0.36-1.el7.x86_64.rpm
rpm -ivh mysql-community-server-8.0.36-1.el7.x86_64.rpm
```
##### 启动服务
`systemctl start mysqld`
##### 查看初始密码
`grep 'temporary password' /var/log/mysqld.log`
使用mysql -u root -p 登录
##### 修改密码
`ALTER USER 'root'@'localhost' IDENTIFIED BY 'Nudt!123';`
##### 配置外部访问
```sql
use mysql;
update user set host = '%' where user = 'root';
flush privileges;
```
##### 创建数据库
```sql
create database pcm;
create database pcm_auth;
```
##### 关闭防火墙
`systemctl stop firewalld`
##### 下载脚本
`wget -O pcm_auth.sql https://www.gitlink.org.cn/attachments/entries/get_file?download_url=https://www.gitlink.org.cn/api/JointCloud/pcm-coordinator/raw/deploy%2Fpcm-auth.sql?ref=master`
`wget -O pcm.sql https://www.gitlink.org.cn/attachments/entries/get_file?download_url=https://www.gitlink.org.cn/api/JointCloud/pcm-coordinator/raw/deploy%2Fpcm.sql?ref=master`
##### 执行sql脚本导入数据
`mysql -u root -p pcm < pcm.sql`
`mysql -u root -p pcm_auth < pcm_auth.sql`
## 3 安装部署k8s集群
```
export KKZONE=cn
sudo ./kk create cluster
```
执行可能会提示部分软件未安装直接yum安装即可
eg:
`sudo yum install -y conntrack`
`sudo yum install -y socat`
![](/api/attachments/3f8b9884-03b3-4e84-b408-d2ec451a533b)
然后重新执行创建集群命令执行成功后可以执行kubectl get pod 验证环境
![](/api/attachments/2e282429-d3ae-4019-8280-d6409da50b80)
## 3 部署鉴权、pcm-coordinator、前端服务
### 3.1 yaml文件下载
pcm所有服务的yaml文件包下载地址在[这里](https://www.gitlink.org.cn/attachments/entries/get_file?download_url=https://www.gitlink.org.cn/api/JointCloud/pcm-coordinator/raw/deploy%2Fpcm-yaml.zip?ref=master "这里")
或者在服务器上直接执行
```shell
wget -O yaml.zip https://www.gitlink.org.cn/attachments/entries/get_file?download_url=https://www.gitlink.org.cn/api/JointCloud/pcm-coordinator/raw/deploy%2Fpcm-yaml.zip?ref=master
```
下载完成解压
```shell
unzip yaml.zip
```
### 3.2 yaml执行完成服务、负载、配置文件的部署
#### 修改地址
需要修改配置文件中的数据库地址为mysql服务安装的地址
#### 一次性部署所有的文件
```shell
kubectl apply -f .
```
#### 或者单模块部署
##### 鉴权:
`kubectl apply -f pcm-auth.yaml`
##### C端
`kubectl apply -f pcm-core-api.yaml`
`kubectl apply -f pcm-core-rpc.yaml`
##### 前端:
`kubectl apply -f pcm-rip.yaml`
部署情况可以通过以下命令查看
`kubectl get pod`
![](/api/attachments/644de412-1155-4e07-a90d-367f63260a81)
## 4 配置驱动器、集群信息
此时前端服务可以通过服务器ip的31149端口访问到
默认账号密码为admin/Nudt@123
新建一个适配器配置成功后可以获取到对应的adapterId
![](/api/attachments/ad8e33d9-7155-4030-a813-227bb019c6e0)
将对应的id填写到对应的P端配置信息中(configmap 内容)
![](/api/attachments/f0d8ee8d-f94f-40c7-8785-58ce09c89ba0)
## 5 部署P端服务
### HPC服务端
kubectl apply -f pcm-hpc.yaml
### kubernetes适配器:
kubectl apply -f pcm-kubernetes.yaml
## 7.系统使用

2
go.mod
View File

@ -24,7 +24,7 @@ require (
github.com/robfig/cron/v3 v3.0.1
github.com/rs/zerolog v1.28.0
github.com/zeromicro/go-zero v1.6.3
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240420083915-58d6e2958aeb
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240426095603-549fefd8bece
gitlink.org.cn/JointCloud/pcm-kubernetes v0.0.0-20240301071143-347480abff2c
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240424085753-6899615e9142
gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203

4
go.sum
View File

@ -1078,8 +1078,8 @@ github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7
github.com/zeromicro/go-zero v1.5.1/go.mod h1:bGYm4XWsGN9GhDsO2O2BngpVoWjf3Eog2a5hUOMhlXs=
github.com/zeromicro/go-zero v1.6.3 h1:OL0NnHD5LdRNDolfcK9vUkJt7K8TcBE3RkzfM8poOVw=
github.com/zeromicro/go-zero v1.6.3/go.mod h1:XZL435ZxVi9MSXXtw2MRQhHgx6OoX3++MRMOE9xU70c=
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240420083915-58d6e2958aeb h1:k6mNEWKp+haQUaK2dWs/rI9OKgzJHY1/9KNKuBDN0Vw=
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240420083915-58d6e2958aeb/go.mod h1:w3Nb5TNymCItQ7K3x4Q0JLuoq9OerwAzAWT2zsPE9Xo=
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240426095603-549fefd8bece h1:W3yBnvAVV8dlRNQKYD6Mf8ySRrYsP0tPk7JjvqZzNHQ=
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240426095603-549fefd8bece/go.mod h1:w3Nb5TNymCItQ7K3x4Q0JLuoq9OerwAzAWT2zsPE9Xo=
gitlink.org.cn/JointCloud/pcm-kubernetes v0.0.0-20240301071143-347480abff2c h1:2Wl/hvaSFjh6fmCSIQhjkr9llMRREQeqcXNLZ/HPY18=
gitlink.org.cn/JointCloud/pcm-kubernetes v0.0.0-20240301071143-347480abff2c/go.mod h1:lSRfGs+PxFvw7CcndHWRd6UlLlGrZn0b0hp5cfaMNGw=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240424085753-6899615e9142 h1:+po0nesBDSWsgCySBG7eEXk7i9Ytd58wqvjL1M9y6d8=

View File

@ -6,9 +6,9 @@ import (
)
type BaseModel struct {
DeletedAt gorm.DeletedAt `gorm:"index;comment:删除时间" json:"-"` // 删除时间
CreatedBy uint `gorm:"created_by;comment:创建人" json:"createdBy"` //创建人
CreatedTime time.Time `gorm:"comment:创建时间" json:"-"` // 创建时间
UpdatedBy uint `gorm:"updated_by;comment:更新人" json:"UpdatedBy"` //创建人
UpdatedTime time.Time `gorm:"comment:更新时间" json:"-"` // 更新时间
DeletedAt gorm.DeletedAt `gorm:"index;comment:删除时间" json:"-"` // 删除时间
CreatedBy uint `gorm:"created_by;comment:创建人" json:"createdBy"` //创建人
CreateTime time.Time `gorm:"autoCreateTime:nano;comment:创建时间" json:"-"` // 创建时间
UpdatedBy uint `gorm:"updated_by;comment:更新人" json:"UpdatedBy"` //创建人
UpdateTime time.Time `gorm:"autoUpdateTime:nano;;comment:更新时间" json:"-"` // 更新时间
}

View File

@ -37,9 +37,9 @@ type File struct {
Status string `gorm:"column:status" json:"Status"` //type:string comment:hash version:2023-05-06 09:58
DeletedFlag *int `gorm:"column:deleted_flag" json:"DeletedFlag"` //type:*int comment:是否删除 version:2023-05-06 09:58
CreatedBy *int `gorm:"column:created_by" json:"CreatedBy"` //type:*int comment:创建人 version:2023-05-06 09:58
CreatedTime *time.Time `gorm:"column:created_time" json:"CreatedTime"` //type:*time.Time comment:创建时间 version:2023-05-06 09:58
CreatedTime *time.Time `gorm:"column:created_time" json:"CreateTime"` //type:*time.Time comment:创建时间 version:2023-05-06 09:58
UpdatedBy *int `gorm:"column:updated_by" json:"UpdatedBy"` //type:*int comment:更新人 version:2023-05-06 09:58
UpdatedTime *time.Time `gorm:"column:updated_time" json:"UpdatedTime"` //type:*time.Time comment:更新时间 version:2023-05-06 09:58
UpdatedTime *time.Time `gorm:"column:updated_time" json:"UpdateTime"` //type:*time.Time comment:更新时间 version:2023-05-06 09:58
}
// TableName 表名:data_set

24
pkg/models/taskaimodel.go Normal file
View File

@ -0,0 +1,24 @@
package models
import "github.com/zeromicro/go-zero/core/stores/sqlx"
var _ TaskAiModel = (*customTaskAiModel)(nil)
type (
// TaskAiModel is an interface to be customized, add more methods here,
// and implement the added methods in customTaskAiModel.
TaskAiModel interface {
taskAiModel
}
customTaskAiModel struct {
*defaultTaskAiModel
}
)
// NewTaskAiModel returns a model for the database table.
func NewTaskAiModel(conn sqlx.SqlConn) TaskAiModel {
return &customTaskAiModel{
defaultTaskAiModel: newTaskAiModel(conn),
}
}

View File

@ -0,0 +1,103 @@
// Code generated by goctl. DO NOT EDIT.
package models
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/zeromicro/go-zero/core/stores/builder"
"github.com/zeromicro/go-zero/core/stores/sqlc"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"github.com/zeromicro/go-zero/core/stringx"
)
var (
taskAiFieldNames = builder.RawFieldNames(&TaskAi{})
taskAiRows = strings.Join(taskAiFieldNames, ",")
taskAiRowsExpectAutoSet = strings.Join(stringx.Remove(taskAiFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",")
taskAiRowsWithPlaceHolder = strings.Join(stringx.Remove(taskAiFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?"
)
type (
taskAiModel interface {
Insert(ctx context.Context, data *TaskAi) (sql.Result, error)
FindOne(ctx context.Context, id int64) (*TaskAi, error)
Update(ctx context.Context, data *TaskAi) error
Delete(ctx context.Context, id int64) error
}
defaultTaskAiModel struct {
conn sqlx.SqlConn
table string
}
TaskAi struct {
Id int64 `db:"id"` // id
TaskId int64 `db:"task_id"` // 任务id
AdapterId int64 `db:"adapter_id"` // 设配器id
ClusterId int64 `db:"cluster_id"` // 集群id
Name string `db:"name"` // 任务名
Replica int64 `db:"replica"` // 执行数
JobId string `db:"job_id"` // 集群返回任务id
Strategy string `db:"strategy"` // 主任务使用策略
Status string `db:"status"` // 任务状态
Msg string `db:"msg"` // 集群返回任务信息
CommitTime time.Time `db:"commit_time"` // 提交时间
StartTime string `db:"start_time"` // 开始时间
EndTime string `db:"end_time"` // 结束时间
}
)
func newTaskAiModel(conn sqlx.SqlConn) *defaultTaskAiModel {
return &defaultTaskAiModel{
conn: conn,
table: "`task_ai`",
}
}
func (m *defaultTaskAiModel) withSession(session sqlx.Session) *defaultTaskAiModel {
return &defaultTaskAiModel{
conn: sqlx.NewSqlConnFromSession(session),
table: "`task_ai`",
}
}
func (m *defaultTaskAiModel) Delete(ctx context.Context, id int64) error {
query := fmt.Sprintf("delete from %s where `id` = ?", m.table)
_, err := m.conn.ExecCtx(ctx, query, id)
return err
}
func (m *defaultTaskAiModel) FindOne(ctx context.Context, id int64) (*TaskAi, error) {
query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", taskAiRows, m.table)
var resp TaskAi
err := m.conn.QueryRowCtx(ctx, &resp, query, id)
switch err {
case nil:
return &resp, nil
case sqlc.ErrNotFound:
return nil, ErrNotFound
default:
return nil, err
}
}
func (m *defaultTaskAiModel) Insert(ctx context.Context, data *TaskAi) (sql.Result, error) {
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskAiRowsExpectAutoSet)
ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime)
return ret, err
}
func (m *defaultTaskAiModel) Update(ctx context.Context, data *TaskAi) error {
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskAiRowsWithPlaceHolder)
_, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.AdapterId, data.ClusterId, data.Name, data.Replica, data.JobId, data.Strategy, data.Status, data.Msg, data.CommitTime, data.StartTime, data.EndTime, data.Id)
return err
}
func (m *defaultTaskAiModel) tableName() string {
return m.table
}

View File

@ -49,6 +49,7 @@ type (
Result string `db:"result"` // 作业结果
DeletedAt gorm.DeletedAt `gorm:"index"`
NsID string `db:"ns_id"`
TaskTypeDict int `db:"task_type_dict"` //任务类型(对应字典表的值)
}
)

View File

@ -41,3 +41,8 @@ func GenSnowflakeID() int64 {
func GenSnowflakeIDStr() string {
return node.Generate().String()
}
// GenSnowflakeIDStr 工作id
func GenSnowflakeIDUint() uint {
return uint(node.Generate().Int64())
}