根据标签匹配任务

Former-commit-id: cf60ab6a9ef1dc5c19bf442199bca280d30270b0
This commit is contained in:
zhangwei 2023-07-26 14:56:22 +08:00
parent fffd0e3382
commit 77dfd5c84c
18 changed files with 537 additions and 465 deletions

View File

@ -181,7 +181,7 @@ type (
}
TaskYaml {
TaskId int64 `yaml:"taskId"`
serviceName string `yaml:"serviceName"`
taskType string `yaml:"taskType"`
metadata interface{} `yaml:"metadata"`
}
)

View File

@ -308,6 +308,5 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
},
},
rest.WithPrefix("/pcm/v1"),
rest.WithMaxBytes(1111111111),
)
}

View File

@ -56,12 +56,12 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa
logx.Error(err)
return err
}
switch task.ServiceName {
case "kubeNative":
switch task.TaskType {
case "cloud":
l.svcCtx.ScheduleCloudClient.Push(string(reqMessage))
case "ac", "th":
case "hpc":
l.svcCtx.ScheduleHpcClient.Push(string(reqMessage))
case "modelArts":
case "ai":
l.svcCtx.ScheduleAiClient.Push(string(reqMessage))
}
}

View File

@ -61,7 +61,7 @@ func (l *ScheduleTaskLogic) ScheduleTask(req *types.ScheduleTaskReq) (err error)
logx.Error(err)
return err
}
switch task.ServiceName {
switch task.TaskType {
case "kubeNative":
l.svcCtx.ScheduleCloudClient.Push(string(reqMessage))
case "ac", "th":

View File

@ -32,7 +32,6 @@ func (l *ScheduleAiMq) Consume(_, val string) error {
ai := model.Ai{
TaskId: task.TaskId,
Status: "Saved",
ServiceName: task.ServiceName,
YamlString: val,
}
tool.Convert(task.Metadata, &ai)

View File

@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/model"
@ -60,7 +61,6 @@ func UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud {
Kind: unstructureObj.GetKind(),
Namespace: unstructureObj.GetNamespace(),
Status: "Saved",
ServiceName: "kubeNative",
}
}
return cloud
@ -69,6 +69,10 @@ func UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud {
func (l *ScheduleCloudMq) Consume(_, val string) error {
var task *types.TaskInfo
json.Unmarshal([]byte(val), &task)
participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task)
if err != nil {
return err
}
// 构建提交作业到云算的结构体
bytes, err := json.Marshal(task.Metadata)
if err != nil {
@ -76,6 +80,9 @@ func (l *ScheduleCloudMq) Consume(_, val string) error {
}
cloud := UnMarshalK8sStruct(string(bytes), task.TaskId)
cloud.YamlString = string(bytes)
if len(participantId) != 0 {
cloud.ParticipantId = participantId[0]
}
// 存储数据
tx := l.svcCtx.DbEngin.Create(&cloud)
if tx.Error != nil {

View File

@ -33,7 +33,7 @@ func (l *ScheduleHpcMq) Consume(_, val string) error {
hpc := model.Hpc{
TaskId: task.TaskId,
Status: "Saved",
ServiceName: task.ServiceName,
ServiceName: task.TaskType,
YamlString: val,
}
tool.Convert(task.Metadata, &hpc)

View File

@ -0,0 +1 @@
package scheduler

View File

@ -0,0 +1,62 @@
package scheduler
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gorm.io/gorm"
"math/rand"
"time"
)
func MatchLabels(dbEngin *gorm.DB, task *types.TaskInfo) ([]int64, error) {
var ids []int64
count := 0
for key := range task.MatchLabels {
var participantId []int64
dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, task.MatchLabels[key]).Scan(&participantId)
if count == 0 {
ids = participantId
}
if len(participantId) == 0 || len(ids) == 0 {
return nil, nil
}
ids = intersect(ids, participantId)
count++
}
return micsSlice(ids, 1), nil
}
// 求交集
func intersect(slice1, slice2 []int64) []int64 {
m := make(map[int64]int)
nn := make([]int64, 0)
for _, v := range slice1 {
m[v]++
}
for _, v := range slice2 {
times, _ := m[v]
if times == 1 {
nn = append(nn, v)
}
}
return nn
}
func micsSlice(origin []int64, count int) []int64 {
tmpOrigin := make([]int64, len(origin))
copy(tmpOrigin, origin)
//一定要seed
rand.Seed(time.Now().Unix())
rand.Shuffle(len(tmpOrigin), func(i int, j int) {
tmpOrigin[i], tmpOrigin[j] = tmpOrigin[j], tmpOrigin[i]
})
result := make([]int64, 0, count)
for index, value := range tmpOrigin {
if index == count {
break
}
result = append(result, value)
}
return result
}

View File

@ -162,7 +162,7 @@ type ScheduleTaskByYamlReq struct {
type TaskYaml struct {
TaskId int64 `yaml:"taskId"`
ServiceName string `yaml:"serviceName"`
TaskType string `yaml:"taskType"`
Metadata interface{} `yaml:"metadata"`
}
@ -176,7 +176,8 @@ type ScheduleTaskReq struct {
type TaskInfo struct {
TaskId int64 `json:"taskId,optional"`
ServiceName string `json:"serviceName"`
TaskType string `json:"taskType"`
MatchLabels map[string]string `json:"matchLabels"`
Metadata interface{} `json:"metadata"`
}

View File

@ -64,7 +64,6 @@ func main() {
for _, mq := range services {
serviceGroup.Add(mq)
}
logx.Infof("Starting server at %s:%d...\n", c.Host, c.Port)
serviceGroup.Start()

View File

@ -38,6 +38,7 @@ type (
Ai struct {
Id int64 `db:"id"` // id
TaskId int64 `db:"task_id"` // 任务id
ParticipantId int64 `db:"participant_id"` // 集群静态信息id
ProjectId string `db:"project_id"` // 项目id
Name string `db:"name"` // 名称
Status string `db:"status"` // 状态

View File

@ -38,6 +38,7 @@ type (
Cloud struct {
Id int64 `db:"id"` // id
TaskId int64 `db:"task_id"` // 任务id
ParticipantId int64 `db:"participant_id"` // 集群静态信息id
ApiVersion string `db:"api_version"`
Name string `db:"name"` // 名称
Namespace string `db:"namespace"` // 命名空间

View File

@ -38,6 +38,7 @@ type (
Hpc struct {
Id int64 `db:"id"` // id
TaskId int64 `db:"task_id"` // 任务id
ParticipantId int64 `db:"participant_id"` // 集群静态信息id
JobId string `db:"job_id"` // 作业id
ServiceName string `db:"service_name"` // 服务名称
Name string `db:"name"` // 名称

View File

@ -4,7 +4,7 @@ package pcmCore;
option go_package = "/pcmCore";
message SyncInfoReq {
string serviceName = 1;
int64 participantId = 1;
string kind = 2;
repeated HpcInfo HpcInfoList = 3;
repeated CloudInfo CloudInfoList = 4;
@ -12,7 +12,7 @@ message SyncInfoReq {
}
message AiInfo {
string serviceName = 1;
int64 participantId = 1;
int64 taskId = 2;
string project_id = 3;
string name = 4;
@ -30,7 +30,7 @@ message AiInfo {
}
message CloudInfo {
string serviceName = 1;
int64 participant = 1;
int64 taskId = 2;
string apiVersion = 3;
string kind = 4;
@ -44,7 +44,7 @@ message CloudInfo {
}
message HpcInfo {
string serviceName = 1;
int64 participantId = 1;
int64 taskId = 2;
string jobId = 3;
@ -74,7 +74,7 @@ message SyncInfoResp{
message InfoListReq{
string kind = 1;
string serviceName = 2;
string participantId = 2;
}
message InfoListResp{
@ -158,7 +158,7 @@ message NodePhyInfo {
message ParticipantHeartbeatReq{
int64 participantId = 1; //participantId
string host = 2; //host
int32 port = 3; //port
string port = 3; //port
}
// participant

File diff suppressed because it is too large Load Diff

View File

@ -1,8 +1,8 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.23.4
// source: pb/pcmCore.proto
// - protoc v3.19.4
// source: pcmCore.proto
package pcmCore
@ -146,7 +146,7 @@ var PcmCore_ServiceDesc = grpc.ServiceDesc{
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pb/pcmCore.proto",
Metadata: "pcmCore.proto",
}
const (
@ -277,5 +277,5 @@ var ParticipantService_ServiceDesc = grpc.ServiceDesc{
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pb/pcmCore.proto",
Metadata: "pcmCore.proto",
}