# Conflicts:
#	api/desc/pcm.api


Former-commit-id: 9163707ba3f4c77f3fef173cec2491f1edfd2a94
zhangwei 2024-05-11 18:25:47 +08:00
commit 81f81d209e
56 changed files with 1719 additions and 476 deletions

.gitignore

@@ -27,5 +27,4 @@ buf.lock
 configs/tenanter.yaml
 log/
-/go_build_gitlink_org_cn_JCCE_PCM
 /cache/


@@ -1,21 +1,9 @@
 pcm-core-api:
-	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o pcm-core-api adaptor/PCM-CORE/api/pcm.go
+	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o pcm-core-api api/pcm.go
 pcm-core-rpc:
-	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o pcm-core-rpc adaptor/PCM-CORE/rpc/pcmcore.go
-pcm-ac-rpc:
-	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o pcm-ac adaptor/PCM-HPC/PCM-AC/rpc/hpcac.go
-pcm-kubenative-rpc:
-	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o pcm-kubenative adaptor/PCM-K8S/PCM-K8S-NATIVE/rpc/pcmkubenative.go
-pcm-modelarts-rpc:
-	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o pcm-modelarts adaptor/PCM-AI/PCM-MODELARTS/rpc/pcmmodelarts.go
-pcm-ceph-rpc:
-	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o pcm-ceph adaptor/PCM-STORAGE/PCM-CEPH/rpc/pcmceph.go
-all-build: pcm-core-rpc pcm-core-api pcm-ac-rpc pcm-kubenative-rpc pcm-modelarts-rpc pcm-ceph-rpc
-.PHONY: pcm-core-rpc pcm-core-api pcm-ac-rpc pcm-kubenative-rpc pcm-modelarts-rpc pcm-ceph-rpc
+	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o pcm-core-rpc rpc/pcmcore.go
+all-build: pcm-core-rpc pcm-core-api
+.PHONY: pcm-core-rpc pcm-core-api
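With the adaptor-specific targets removed, this Makefile now only builds the two coordinator binaries. A typical invocation from the repository root (assuming GNU make) would be:

```bash
# build both coordinator binaries via the aggregate target
make all-build

# or build them individually
make pcm-core-api   # produces ./pcm-core-api from api/pcm.go
make pcm-core-rpc   # produces ./pcm-core-rpc from rpc/pcmcore.go
```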


@@ -1,4 +1,5 @@
-![PCM](https://www.gitlink.org.cn/images/avatars/Organization/123822?t=1689062058)
+<img src="https://www.gitlink.org.cn/images/avatars/Organization/123822?t=1689062058" alt="PCM" style="float:center" />
 <p align="center">
 <a href="https://www.gitlink.org.cn/JointCloud/pcm-coordinator/tree/master/docs">Docs</a> |
 <a href="https://www.gitlink.org.cn/JointCloud/pcm-coordinator/tree/master/docs">简体中文</a> |
@@ -7,35 +8,32 @@
 ## What is Jointcloud and PCM
-&emsp;&emsp;Jointcloud Cooperation Environment (JCCE) comprises Distributed Cloud Trading, Distributed Cloud Community, Distributed Cloud Supervision, and a Blockchain-based Distributed Accounting System. JCCE, with its supporting technologies related to information and value exchange, facilitates breaking the information asymmetry among participants in cloud collaboration. It creates a win-win opportunity for all involved parties and provides robust support for innovative business models in the cloud collaboration computing environment.
+Jointcloud Cooperation Environment (JCCE) comprises Distributed Cloud Trading, Distributed Cloud Community, Distributed Cloud Supervision, and a Blockchain-based Distributed Accounting System. JCCE, with its supporting technologies related to information and value exchange, facilitates breaking the information asymmetry among participants in cloud collaboration. It creates a win-win opportunity for all involved parties and provides robust support for innovative business models in the cloud collaboration computing environment.
-&emsp;&emsp;The vision of the Peer Cooperation Mechanism (PCM) is to build a set of collaboration mechanisms in the cloud environment where all service providers and demand-side entities can autonomously participate, achieve peer-to-peer transactions, and establish efficient connections and invocations in a non-intrusive manner. PCM focuses more on collaboration among clouds rather than individual clouds themselves.
+The vision of the Peer Cooperation Mechanism (PCM) is to build a set of collaboration mechanisms in the cloud environment where all service providers and demand-side entities can autonomously participate, achieve peer-to-peer transactions, and establish efficient connections and invocations in a non-intrusive manner. PCM focuses more on collaboration among clouds rather than individual clouds themselves.
-&emsp;&emsp;PCM is built upon a standardized software-defined framework. Cloud service providers with physical cloud resources can autonomously define PCM rule frameworks. Other entities adhering to these rule frameworks can then participate in collaborative transactions. Subsequently, tailored virtual private clouds can be created for domain-specific applications, meeting the diverse requirements of various cloud participants, such as central, edge, and terminal infrastructure, enabling them to directly participate in collaboration and transactions.
+PCM is built upon a standardized software-defined framework. Cloud service providers with physical cloud resources can autonomously define PCM rule frameworks. Other entities adhering to these rule frameworks can then participate in collaborative transactions. Subsequently, tailored virtual private clouds can be created for domain-specific applications, meeting the diverse requirements of various cloud participants, such as central, edge, and terminal infrastructure, enabling them to directly participate in collaboration and transactions.
 ## Real-world Issues Addressed by PCM:
-- **Performance Improvement**:
+- **Performance Improvement**
 Developers in the cloud collaboration environment experience performance enhancement upon joining the PCM framework. They can generate code for the internal functionality structure and most specification definitions using descriptive language, allowing them to focus on business logic development without concerning themselves with underlying management functions. The framework offers features such as microservices management, multi-language code generation, and model bridging, reducing coding workload and entry barriers while improving efficiency.
-- **Platform Lock-in Resolution**:
+- **Platform Lock-in Resolution**
 The PCM effectively addresses platform lock-in issues through multi-cloud adaptation, standardized interfaces, and abstraction layers. This enables cross-platform operations, deployment, and interaction. Standardized interfaces simplify the work for developers, lowering the risk of platform lock-in.
-- **Reduced Code Development Complexity**:
+- **Reduced Code Development Complexity**
 The PCM development framework lowers the complexity of development by providing structured development patterns, ready-made components, and documentation support. Developers of different skill levels can collaborate more fairly. The framework can save approximately 50% of development time, with junior programmers completing tasks that originally required one person-month in about 15 person-days. Features such as automatic generation tools, code templates, and component reuse allow developers to focus more on business logic implementation. There is no need for additional training of advanced developers, saving time, and reducing labor costs while improving the return on investment.
-- **Reduced Code Development Workload**:
+- **Reduced Code Development Workload**
 The PCM framework offers automation features and code generation tools, reducing manual code writing and improving development speed and code quality. With the framework's auto-generation tools, over half of the code is generated, achieving a low code rate of approximately 63%. Developers primarily work on writing descriptive language files to generate the basic structure and then adjust and write logic functions. This enables teams to deliver products more quickly, iterate based on business optimization and user feedback, and be more agile.
 ## Architecture
-&emsp;&emsp;The Coordinator is the core component of the framework, providing heterogeneous abstraction for different Participant-side technology stacks. The framework, oriented towards the user side, primarily involves two major functionalities: resource changes (task submission) and centralized display. After task submission, PCM achieves dynamic task flow through a scheduling center deployed on cloud computing, intelligent computing, and supercomputing clouds. In the centralized display section, PCM mainly collects and aggregates information about resources and tasks from multiple Participant services through Tracker and actively reported by the Participant side. It provides users with a standardized unified interface. Users can view the information provided by PCM on the frontend page or directly access data through the interface.
+The Coordinator is the core component of the framework, providing heterogeneous abstraction for different Participant-side technology stacks. The framework, oriented towards the user side, primarily involves two major functionalities: resource changes (task submission) and centralized display. After task submission, PCM achieves dynamic task flow through a scheduling center deployed on cloud computing, intelligent computing, and supercomputing clouds. In the centralized display section, PCM mainly collects and aggregates information about resources and tasks from multiple Participant services through Tracker and actively reported by the Participant side. It provides users with a standardized unified interface. Users can view the information provided by PCM on the frontend page or directly access data through the interface.
 ![PCM架构](docs/images/arch-eng.png)
 ## PCM deploy
@@ -43,13 +41,13 @@
 The development environment for PCM requires the installation of Go version 1.18 or above. Please refer to the following instructions to locally start the Kubernetes-related services.
 The command to fetch the project is as follows:
-```
+```bash
 git clone https://gitlink.org.cn/JointCloud/pcm-coordinator.git
 ```
-&emsp;&emsp;After executing the following command, the Kubernetes RPC service will be launched locally. For its specific functionalities, please refer to the description in the architecture design mentioned earlier.
+After executing the following command, the Kubernetes RPC service will be launched locally. For its specific functionalities, please refer to the description in the architecture design mentioned earlier.
-```
+``` bash
 # get required packages
 go mod tidy
@@ -57,8 +55,8 @@ go mod tidy
 go run pcm-coordinator/rpc/pcmcore.go
 ```
 #### coordinator-api
-&emsp;&emsp;The template for the configuration content of the API service is as follows, where the config information for each Participant (P-side) can be configured as needed based on the actual situation.
+The template for the configuration content of the API service is as follows, where the config information for each Participant (P-side) can be configured as needed based on the actual situation.
-```
+``` bash
 Name: pcm.core.api
 Host: 0.0.0.0
 Port: 8999
@@ -78,8 +76,8 @@ THRpcConf:
 ModelArtsRpcConf:
   Endpoints:
     - 127.0.0.1:2002
   NonBlock: true
 ```
-&emsp;&emsp;After executing the following command, the Kubernetes API service will be launched locally. Once the service is started, users can make HTTP requests to its interfaces for various functional calls.
+After executing the following command, the Kubernetes API service will be launched locally. Once the service is started, users can make HTTP requests to its interfaces for various functional calls.
-```
+``` bash
 # get required packages
 go mod tidy
@@ -89,10 +87,10 @@ go run pcm-coordinator/api/pcm.go
 ## Upcoming Plans
-- Pluginization of Scheduling Algorithms and Definition of Basic Resource Templates
+- Pluginization of Scheduling Algorithms and Definition of Basic Resource Templates.
-- Fundamental Definition of Resource Operation Types
+- Fundamental Definition of Resource Operation Types.
-- Rapid Development Mode
+- Rapid Development Mode.
-- Implementation of First-level Scheduling
+- Implementation of First-level Scheduling.
 ## JoinContribute
-&emsp;&emsp;We look forward to your opinions and contributions. Welcome all friends to provide corrections and improvements to the project, collectively building an efficient and stable cloud collaboration mechanism.
+We look forward to your opinions and contributions. Welcome all friends to provide corrections and improvements to the project, collectively building an efficient and stable cloud collaboration mechanism.
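As an illustrative smoke test (not part of the README), once the API service above is listening on port 8999 you can exercise one of the dictionary endpoints this commit registers under the /pcm/v1 prefix; the exact response contents depend on the deployment:

```bash
# query the public image dictionary (route added later in this commit)
curl http://localhost:8999/pcm/v1/core/getPublicImage
# response shape suggested by PublicImageResp:
# {"code": ..., "message": "...", "imageRDict": [{"id": ..., "public_image_name": "..."}]}
```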


@@ -1,40 +1,29 @@
 FROM golang:1.21.2-alpine3.18 AS builder
-LABEL stage=gobuilder
-ENV CGO_ENABLED 0
-ENV GOARCH amd64
-ENV GOPROXY https://goproxy.cn,direct
-RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.sjtug.sjtu.edu.cn/g' /etc/apk/repositories && \
-    apk update --no-cache && apk add --no-cache tzdata
 WORKDIR /app
-ADD go.mod .
-ADD go.sum .
-RUN go mod download
 COPY . .
+COPY api/etc/ /app/
+RUN go env -w GO111MODULE=on \
+    && go env -w GOPROXY=https://goproxy.cn,direct \
+    && go env -w CGO_ENABLED=0 \
+    && go mod download
 RUN go build -o pcm-coordinator-api /app/api/pcm.go
-FROM alpine:3.16.2
+FROM alpine:3.18
 WORKDIR /app
-#修改alpine源为上海交通大学
 RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.sjtug.sjtu.edu.cn/g' /etc/apk/repositories && \
-    apk update && \
-    apk upgrade && \
-    apk add --no-cache ca-certificates && update-ca-certificates && \
-    apk add --update tzdata && \
+    apk add --no-cache ca-certificates tzdata && \
+    update-ca-certificates && \
     rm -rf /var/cache/apk/*
-COPY --from=builder /app/pcm-coordinator-api .
-COPY api/etc/pcm.yaml .
+COPY --from=builder /app/pcm-coordinator-api /app/
+COPY --from=builder /app/api/etc/pcm.yaml /app/
 ENV TZ=Asia/Shanghai
 EXPOSE 8999
-ENTRYPOINT ./pcm-coordinator-api -f pcm.yaml
+ENTRYPOINT ["./pcm-coordinator-api", "-f", "pcm.yaml"]


@@ -5,6 +5,12 @@ import (
 	"time"
 )
 
+var StatusMapping = map[string][]string{
+	"Running":   {"RUNNING", "RUNNING", "CONFIGURING", "COMPLETING"},
+	"Succeeded": {"COMPLETED"},
+	"Failed":    {"FAILED", "TIMEOUT", "DEADLINE", "OUT_OF_MEMORY", "BOOT_FAIL", "CANCELLED"},
+}
+
 type PullTaskInfoReq struct {
 	AdapterId int64 `form:"adapterId"`
 }
@@ -44,6 +50,7 @@ type NoticeInfo struct {
 	ClusterId   int64     `json:"clusterId"`
 	ClusterName string    `json:"clusterName"`
 	NoticeType  string    `json:"noticeType"`
+	TaskId      int64     `json:"taskId"`
 	TaskName    string    `json:"taskName"`
 	Incident    string    `json:"incident"`
 	CreatedTime time.Time `json:"createdTime"`
@@ -68,11 +75,13 @@ type PushNoticeResp struct {
 }
 type HpcInfo struct {
 	Id          int64  `json:"id"`           // id
 	TaskId      int64  `json:"task_id"`      // 任务id
 	JobId       string `json:"job_id"`       // 作业id(在第三方系统中的作业id)
 	AdapterId   int64  `json:"adapter_id"`   // 执行任务的适配器id
-	ClusterId   int64  `json:"cluster_id"`   // 执行任务的集群id
+	AdapterName string `json:"adapterName,omitempty,optional"`
+	ClusterId   int64  `json:"cluster_id"`   // 执行任务的集群id
+	ClusterName string `json:"clusterName,omitempty,optional"`
 	ClusterType string `json:"cluster_type"` // 执行任务的集群类型
 	Name        string `json:"name"`         // 名称
 	Status      string `json:"status"`       // 状态
@@ -113,8 +122,9 @@ type HpcInfo struct {
 type CloudInfo struct {
 	Id          uint   `json:"id,omitempty,optional"`
 	TaskId      int64  `json:"taskId,omitempty,optional"`
-	AdapterId   uint   `json:"adapterId,omitempty,optional"`
-	ClusterId   uint   `json:"clusterId,omitempty,optional"`
+	AdapterId   int64  `json:"adapterId,omitempty,optional"`
+	AdapterName string `json:"adapterName,omitempty,optional"`
+	ClusterId   int64  `json:"clusterId,omitempty,optional"`
 	ClusterName string `json:"clusterName,omitempty,optional"`
 	Kind        string `json:"kind,omitempty,optional"`
 	Status      string `json:"status,omitempty,optional"`
@@ -125,9 +135,12 @@ type CloudInfo struct {
 }
 type AiInfo struct {
-	ParticipantId int64  `json:"participantId,omitempty"`
 	TaskId        int64  `json:"taskId,omitempty"`
 	ProjectId     string `json:"project_id,omitempty"`
+	AdapterId     int64  `json:"adapterId,omitempty,optional"`
+	AdapterName   string `json:"adapterName,omitempty,optional"`
+	ClusterId     int64  `json:"clusterId,omitempty,optional"`
+	ClusterName   string `json:"clusterName,omitempty,optional"`
 	Name          string `json:"name,omitempty"`
 	Status        string `json:"status,omitempty"`
 	StartTime     string `json:"startTime,omitempty"`
@@ -143,9 +156,12 @@ type AiInfo struct {
 }
 type VmInfo struct {
-	ParticipantId int64  `json:"participantId,omitempty"`
 	TaskId        int64  `json:"taskId,omitempty"`
 	Name          string `json:"name,omitempty"`
+	AdapterId     int64  `json:"adapterId,omitempty,optional"`
+	AdapterName   string `json:"adapterName,omitempty,optional"`
+	ClusterId     int64  `json:"clusterId,omitempty,optional"`
+	ClusterName   string `json:"clusterName,omitempty,optional"`
 	FlavorRef     string `json:"flavor_ref,omitempty"`
 	ImageRef      string `json:"image_ref,omitempty"`
 	NetworkUuid   string `json:"network_uuid,omitempty"`


@@ -49,6 +49,52 @@ type (
	}
)
type (
PublicImageReq {
}
PublicImageResp {
Code int `json:"code"`
Message string `json:"message"`
ImageDict []ImageDict `json:"imageRDict"`
}
ImageDict {
Id int `json:"id"`
PublicImageName string `json:"public_image_name"`
}
)
type (
PublicFlavorReq {
}
PublicFlavorResp {
Code int `json:"code"`
Message string `json:"message"`
FlavorDict []FlavorDict `json:"flavorDict"`
}
FlavorDict {
Id int `json:"id"`
PublicFlavorName string `json:"public_flavor_name"`
}
)
type (
PublicNetworkReq {
}
PublicNetworkResp {
Code int `json:"code"`
Message string `json:"message"`
NetworkDict []NetworkDict `json:"networkDict"`
}
NetworkDict {
Id int `json:"id"`
PublicNetworkName string `json:"public_netWork_name"`
}
)
type remoteResp {
	Code    int    `json:"code"`
	Message string `json:"message"`
@@ -113,13 +159,25 @@ type (
type (
	GeneralTaskReq {
		Name            string           `json:"name"`
		AdapterIds      []string         `json:"adapterIds"`
		ClusterIds      []string         `json:"clusterIds"`
		Strategy        string           `json:"strategy"`
		StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
		ReqBody         []string         `json:"reqBody"`
		Replicas        int64            `json:"replicas,string"`
	}
PodLogsReq {
TaskId string `json:"taskId"`
TaskName string `json:"taskName"`
ClusterId string `json:"clusterId"`
ClusterName string `json:"clusterName"`
AdapterId string `json:"adapterId"`
AdapterName string `json:"adapterName"`
PodName string `json:"podName"`
stream bool `json:"stream"`
}
)
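Given the route `post /cloud/pod/logs (PodLogsReq) returns (string)` registered further down (under the /pcm/v1 prefix), a request built from the fields above might look like the following sketch; all values are placeholders, and the PodLogs logic in this commit is still a stub that returns an empty body:

```bash
# illustrative call to the new pod-logs endpoint (placeholder IDs and names)
curl -X POST http://localhost:8999/pcm/v1/cloud/pod/logs \
  -H 'Content-Type: application/json' \
  -d '{"taskId":"1001","taskName":"demo-task","clusterId":"1","clusterName":"demo-cluster","adapterId":"1","adapterName":"demo-adapter","podName":"demo-pod-0","stream":false}'
```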
type deleteTaskReq {
@@ -157,15 +215,30 @@
 type (
 	commitVmTaskReq {
-		// Name string `json:"name"`
-		// NsID string `json:"nsID"`
+		Name            string           `json:"name"`
+		AdapterIds      []string         `json:"adapterIds,optional"`
+		ClusterIds      []string         `json:"clusterIds"`
+		Strategy        string           `json:"strategy"`
+		StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
+		MinCount        int64            `json:"min_count,optional"`
+		ImageRef        int64            `json:"imageRef,optional"`
+		FlavorRef       int64            `json:"flavorRef,optional"`
+		Uuid            int64            `json:"uuid,optional"`
+		//Replicas int64 `json:"replicas,string"`
+		VmName          string           `json:"vm_name,optional"`
 		// Replicas int64 `json:"replicas,optional"`
 		// MatchLabels map[string]string `json:"matchLabels,optional"`
-		// AdapterId string `json:"adapterId,optional"`
 		// ClusterType string `json:"clusterType,optional"`
 		// //Virtual Machine Section
-		CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
-		VmOption        *VmOption               `json:"vmOption,optional"`
+		//CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
+		//VmOption *VmOption `json:"vmOption,optional"`
 	}
+	TaskVm {
+		ImageRef  string `json:"imageRef"`
+		FlavorRef string `json:"flavorRef"`
+		Uuid      string `json:"uuid"`
+		Platform  string `json:"platform"`
+	}
 	VmOption {
 		AdapterId string `json:"adapterId"`
@@ -179,23 +252,6 @@
 		MatchLabels     map[string]string       `json:"matchLabels,optional"`
 		StaticWeightMap map[string]int32        `json:"staticWeightMap,optional"`
 		CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
-		// Id int64 `json:"id"`
-		// ParticipantId int64 `json:"participantId"`
-		// TaskId int64 `json:"taskId"`
-		// AdapterId int64 `json:"adapterId"`
-		// ClusterId int64 `json:"clusterId"`
-		// FlavorRef string `json:"flavorRef"`
-		// ImageRef string `json:"imageRef"`
-		// Status string `json:"status"`
-		// Platform string `json:"platform"`
-		// Description string `json:"description"`
-		// AvailabilityZone string `json:"availabilityZone"`
-		// MinCount int64 `json:"minCount"`
-		// Uuid string `json:"uuid"`
-		// StartTime string `json:"startTime"`
-		// RunningTime string `json:"runningTime"`
-		// Result string `json:"result"`
-		// DeletedAt string `json:"deletedAt"`
 	}
 	CreateMulDomainServer {
@@ -338,7 +394,7 @@ type (
 	}
 	TaskModel {
-		Id          int64  `json:"id,omitempty" db:"id"`                   // id
+		Id          int64  `json:"id,omitempty,string" db:"id"`            // id
 		Name        string `json:"name,omitempty" db:"name"`               // 作业名称
 		Description string `json:"description,omitempty" db:"description"` // 作业描述
 		Status      string `json:"status,omitempty" db:"status"`           // 作业状态
@@ -354,6 +410,7 @@ type (
 		NsID       string `json:"nsId,omitempty" db:"ns_id"`
 		TenantId   string `json:"tenantId,omitempty" db:"tenant_id"`
 		CreateTime string `json:"createTime,omitempty" db:"create_time" gorm:"autoCreateTime"`
+		AdapterTypeDict int `json:"adapterTypeDict" db:"create_time" gorm:"adapter_type_dict"` //任务类型(对应字典表的值
 	}
 )
@@ -1173,7 +1230,7 @@ type TaskStatusResp {
 	Succeeded int `json:"Succeeded"`
 	Failed    int `json:"Failed"`
 	Running   int `json:"Running"`
-	Pause     int `json:"Pause"`
+	Saved     int `json:"Saved"`
 }
 type TaskDetailsResp {
@@ -1181,7 +1238,7 @@ type TaskDetailsResp {
 	description string `json:"description"`
 	StartTime   string `json:"startTime"`
 	EndTime     string `json:"endTime"`
-	Strategy      int64          `json:"strategy,string"`
-	SynergyStatus int64          `json:"synergyStatus,string"`
+	Strategy      int64          `json:"strategy"`
+	SynergyStatus int64          `json:"synergyStatus"`
 	ClusterInfos  []*ClusterInfo `json:"clusterInfos"`
 }


@@ -14,7 +14,7 @@ type (
 		Description string            `json:"description,optional"`
 		TenantId    int64             `json:"tenantId,optional"`
 		TaskId      int64             `json:"taskId,optional"`
-		AdapterId   string            `json:"adapterId,optional"`
+		AdapterIds  []string          `json:"adapterId"`
 		MatchLabels map[string]string `json:"matchLabels,optional"`
 		CardCount   int64             `json:"cardCount,optional"`
 		WorkDir     string            `json:"workDir,optional"` //paratera:workingDir


@@ -145,6 +145,18 @@ service pcm {
	@doc "task details"
	@handler taskDetails
	get /core/task/details (FId) returns(TaskDetailsResp)
@doc "Get Public Image"
@handler getPublicImageHandler
get /core/getPublicImage (PublicImageReq) returns (PublicImageResp)
@doc "Get Public Flavor"
@handler getPublicFlavorHandler
get /core/getPublicFlavor (PublicFlavorReq) returns (PublicFlavorResp)
@doc "Get Public Network"
@handler getPublicNetworkHandler
get /core/getPublicNetwork (PublicNetworkReq) returns (PublicNetworkResp)
}

//hpc二级接口
@@ -220,6 +232,8 @@ service pcm {
	@handler commitGeneralTask
	post /cloud/task/create (GeneralTaskReq) returns ()
@handler podLogs
post /cloud/pod/logs (PodLogsReq) returns (string)
}

//智算二级接口
@@ -955,6 +969,15 @@ service pcm {
	@handler ScheduleGetOverviewHandler
	post /schedule/getOverview returns (ScheduleOverviewResp)
@handler DownloadAlgothmCodeHandler
get /schedule/downloadAlgorithmCode (DownloadAlgorithmCodeReq) returns (DownloadAlgorithmCodeResp)
@handler UploadAlgothmCodeHandler
post /schedule/uploadAlgorithmCode (UploadAlgorithmCodeReq) returns (UploadAlgorithmCodeResp)
@handler GetComputeCardsByClusterHandler
get /schedule/getComputeCardsByCluster/:adapterId/:clusterId (GetComputeCardsByClusterReq) returns (GetComputeCardsByClusterResp)
}

@server(
@@ -1004,9 +1027,6 @@ service pcm {
	@handler CreateAlertRuleHandler
	post /monitoring/alert/rule (CreateAlertRuleReq)
-	@handler DeleteAlertRuleHandler
-	delete /monitoring/alert/rule (DeleteAlertRuleReq)
	@doc "alert rules"
	@handler alertRulesHandler
	get /monitoring/alert/rule (AlertRulesReq) returns (AlertRulesResp)


@@ -100,4 +100,41 @@ type (
		StartTime string `json:"startTime,omitempty" db:"start_time"`
		EndTime   string `json:"endTime,omitempty" db:"end_time"`
	}
DownloadAlgorithmCodeReq {
AdapterId string `form:"adapterId"`
ClusterId string `form:"clusterId"`
ResourceType string `form:"resourceType"`
Card string `form:"card"`
TaskType string `form:"taskType"`
Dataset string `form:"dataset"`
Algorithm string `form:"algorithm"`
}
DownloadAlgorithmCodeResp {
Code string `json:"code"`
}
UploadAlgorithmCodeReq {
AdapterId string `json:"adapterId"`
ClusterId string `json:"clusterId"`
ResourceType string `json:"resourceType"`
Card string `json:"card"`
TaskType string `json:"taskType"`
Dataset string `json:"dataset"`
Algorithm string `json:"algorithm"`
Code string `json:"code"`
}
UploadAlgorithmCodeResp {
}
GetComputeCardsByClusterReq {
AdapterId string `path:"adapterId"`
ClusterId string `path:"clusterId"`
}
GetComputeCardsByClusterResp {
Cards []string `json:"cards"`
}
)
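These request/response definitions pair with the three /schedule routes added to the service block above. For the path-parameter route, a call could look like the sketch below (adapter and cluster IDs are placeholders); the download route takes its DownloadAlgorithmCodeReq fields as query parameters, and the values shown are placeholders as well:

```bash
# list compute cards for one cluster; response shape per GetComputeCardsByClusterResp: {"cards":["..."]}
curl http://localhost:8999/pcm/v1/schedule/getComputeCardsByCluster/1/101

# fetch algorithm code; all query values are placeholders
curl "http://localhost:8999/pcm/v1/schedule/downloadAlgorithmCode?adapterId=1&clusterId=101&resourceType=CARD&card=NPU&taskType=train&dataset=demo&algorithm=demo"
```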


@ -0,0 +1,24 @@
package cloud
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/cloud"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"
)
func PodLogsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.PodLogsReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := cloud.NewPodLogsLogic(r.Context(), svcCtx, w)
resp, err := l.PodLogs(&req, w)
result.HttpResult(r, w, resp, err)
}
}


@ -0,0 +1,28 @@
package core
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
)
func GetPublicFlavorHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.PublicFlavorReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
l := core.NewGetPublicFlavorLogic(r.Context(), svcCtx)
resp, err := l.GetPublicFlavor(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}


@ -0,0 +1,28 @@
package core
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
)
func GetPublicImageHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.PublicImageReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
l := core.NewGetPublicImageLogic(r.Context(), svcCtx)
resp, err := l.GetPublicImage(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}


@ -0,0 +1,28 @@
package core
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
)
func GetPublicNetworkHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.PublicNetworkReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
l := core.NewGetPublicNetworkLogic(r.Context(), svcCtx)
resp, err := l.GetPublicNetwork(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}


@@ -175,6 +175,21 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
			Path:    "/core/task/details",
			Handler: core.TaskDetailsHandler(serverCtx),
		},
{
Method: http.MethodGet,
Path: "/core/getPublicImage",
Handler: core.GetPublicImageHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/core/getPublicFlavor",
Handler: core.GetPublicFlavorHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/core/getPublicNetwork",
Handler: core.GetPublicNetworkHandler(serverCtx),
},
		},
		rest.WithPrefix("/pcm/v1"),
	)
@@ -262,6 +277,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
			Path:    "/cloud/task/create",
			Handler: cloud.CommitGeneralTaskHandler(serverCtx),
		},
{
Method: http.MethodPost,
Path: "/cloud/pod/logs",
Handler: cloud.PodLogsHandler(serverCtx),
},
		},
		rest.WithPrefix("/pcm/v1"),
	)
@@ -1190,6 +1210,21 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
			Path:    "/schedule/getOverview",
			Handler: schedule.ScheduleGetOverviewHandler(serverCtx),
		},
{
Method: http.MethodGet,
Path: "/schedule/downloadAlgorithmCode",
Handler: schedule.DownloadAlgothmCodeHandler(serverCtx),
},
{
Method: http.MethodPost,
Path: "/schedule/uploadAlgorithmCode",
Handler: schedule.UploadAlgothmCodeHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/schedule/getComputeCardsByCluster/:adapterId/:clusterId",
Handler: schedule.GetComputeCardsByClusterHandler(serverCtx),
},
		},
		rest.WithPrefix("/pcm/v1"),
	)


@ -0,0 +1,25 @@
package schedule
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/schedule"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
)
func DownloadAlgothmCodeHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.DownloadAlgorithmCodeReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := schedule.NewDownloadAlgothmCodeLogic(r.Context(), svcCtx)
resp, err := l.DownloadAlgorithmCode(&req)
result.HttpResult(r, w, resp, err)
}
}


@ -0,0 +1,28 @@
package schedule
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/schedule"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
)
func GetComputeCardsByClusterHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.GetComputeCardsByClusterReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
l := schedule.NewGetComputeCardsByClusterLogic(r.Context(), svcCtx)
resp, err := l.GetComputeCardsByCluster(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}


@ -0,0 +1,25 @@
package schedule
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/schedule"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
)
func UploadAlgothmCodeHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.UploadAlgorithmCodeReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := schedule.NewUploadAlgothmCodeLogic(r.Context(), svcCtx)
resp, err := l.UploadAlgorithmCode(&req)
result.HttpResult(r, w, resp, err)
}
}


@@ -2,10 +2,13 @@ package ai
import (
	"context"
	"github.com/zeromicro/go-zero/core/logx"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
	"strconv"
	"sync"
	"time"
)
type GetCenterOverviewLogic struct { type GetCenterOverviewLogic struct {
@ -24,6 +27,8 @@ func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext)
func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) { func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) {
resp = &types.CenterOverviewResp{} resp = &types.CenterOverviewResp{}
var mu sync.RWMutex
ch := make(chan struct{})
var centerNum int32 var centerNum int32
var taskNum int32 var taskNum int32
@ -37,6 +42,8 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
centerNum = int32(len(adapterList)) centerNum = int32(len(adapterList))
resp.CenterNum = centerNum resp.CenterNum = centerNum
go l.updateClusterResource(&mu, ch, adapterList)
for _, adapter := range adapterList { for _, adapter := range adapterList {
taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
if err != nil { if err != nil {
@ -52,7 +59,10 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
continue continue
} }
for _, cluster := range clusters.List { for _, cluster := range clusters.List {
mu.RLock()
clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id) clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id)
mu.RUnlock()
if err != nil { if err != nil {
continue continue
} }
@ -60,9 +70,76 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
totalTops += clusterResource.CardTopsTotal totalTops += clusterResource.CardTopsTotal
} }
} }
resp.CardNum = cardNum
resp.CardNum = centerNum
resp.PowerInTops = totalTops resp.PowerInTops = totalTops
return resp, nil select {
case _ = <-ch:
return resp, nil
case <-time.After(2 * time.Second):
return resp, nil
}
}
func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
var wg sync.WaitGroup
for _, adapter := range list {
clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
if err != nil {
continue
}
for _, cluster := range clusters.List {
c := cluster
mu.RLock()
clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(c.Id)
mu.RUnlock()
if err != nil {
continue
}
wg.Add(1)
go func() {
stat, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(l.ctx)
if err != nil {
wg.Done()
return
}
clusterType, err := strconv.ParseInt(adapter.Type, 10, 64)
if err != nil {
wg.Done()
return
}
var cardTotal int64
var topsTotal float64
for _, card := range stat.CardsAvail {
cardTotal += int64(card.CardNum)
topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
}
mu.Lock()
if (models.TClusterResource{} == *clusterResource) {
err = l.svcCtx.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
if err != nil {
mu.Unlock()
wg.Done()
return
}
} else {
clusterResource.CardTotal = cardTotal
clusterResource.CardTopsTotal = topsTotal
err := l.svcCtx.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
if err != nil {
mu.Unlock()
wg.Done()
return
}
}
mu.Unlock()
wg.Done()
}()
}
}
wg.Wait()
ch <- struct{}{}
}


@@ -3,6 +3,8 @@ package ai
import (
	"context"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
	"strconv"
	"sync"
	"time"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
@@ -17,8 +19,6 @@ type GetCenterTaskListLogic struct {
	svcCtx *svc.ServiceContext
}
-const layout = "2006-01-02 15:04:05"
func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterTaskListLogic {
	return &GetCenterTaskListLogic{
		Logger: logx.WithContext(ctx),
@ -29,25 +29,40 @@ func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext)
func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskListResp, err error) { func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskListResp, err error) {
resp = &types.CenterTaskListResp{} resp = &types.CenterTaskListResp{}
var mu sync.RWMutex
ch := make(chan struct{})
adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1")
if err != nil { if err != nil {
return nil, err return nil, err
} }
go l.updateAiTaskStatus(&mu, ch, adapterList)
for _, adapter := range adapterList { for _, adapter := range adapterList {
mu.RLock()
taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
mu.RUnlock()
if err != nil { if err != nil {
continue continue
} }
for _, task := range taskList { for _, task := range taskList {
var elapsed time.Duration var elapsed time.Duration
start, _ := time.Parse(layout, task.CommitTime) switch task.Status {
if task.Status != constants.Completed { case constants.Completed:
elapsed = start.Sub(time.Now()) end, err := time.ParseInLocation(constants.Layout, task.EndTime, time.Local)
} else { if err != nil {
end, _ := time.Parse(layout, task.EndTime) elapsed = time.Duration(0)
elapsed = start.Sub(end) }
start, err := time.ParseInLocation(constants.Layout, task.StartTime, time.Local)
if err != nil {
elapsed = time.Duration(0)
}
elapsed = end.Sub(start)
case constants.Running:
elapsed = time.Now().Sub(task.CommitTime)
default:
elapsed = 0
} }
t := &types.AiTask{ t := &types.AiTask{
@ -59,5 +74,48 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList
} }
} }
return resp, nil select {
case _ = <-ch:
return resp, nil
case <-time.After(2 * time.Second):
return resp, nil
}
}
func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
var wg sync.WaitGroup
for _, adapter := range list {
taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
if err != nil {
continue
}
for _, task := range taskList {
t := task
if t.Status == constants.Completed {
continue
}
wg.Add(1)
go func() {
trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId)
if err != nil {
wg.Done()
return
}
t.Status = trainingTask.Status
t.StartTime = trainingTask.Start
t.EndTime = trainingTask.End
mu.Lock()
err = l.svcCtx.Scheduler.AiStorages.UpdateAiTask(t)
mu.Unlock()
if err != nil {
wg.Done()
return
}
wg.Done()
}()
}
}
wg.Wait()
ch <- struct{}{}
}


@@ -89,16 +89,18 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) error {
 	//查询调度策略
 	err = tx.Raw(sqlStr, req.Strategy).Scan(&strategy).Error
 	taskModel := models.Task{
 		Id:         utils.GenSnowflakeID(),
-		Status:     constants.Pending,
+		Status:     constants.Saved,
 		Name:       req.Name,
 		CommitTime: time.Now(),
 		YamlString: strings.Join(req.ReqBody, "\n---\n"),
-		TaskTypeDict:    0,
+		AdapterTypeDict: 0,
 		SynergyStatus:   synergyStatus,
 		Strategy:        strategy,
 	}
 	var taskClouds []cloud.TaskCloudModel
+	adapterName := ""
+	tx.Table("t_adapter").Select("name").Where("id=?", adapterId).Find(&adapterName)
 	for _, r := range rs {
 		for _, s := range req.ReqBody {
 			sStruct := UnMarshalK8sStruct(s, int64(r.Replica))
@@ -107,17 +109,16 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) error {
 			taskCloud.TaskId = uint(taskModel.Id)
 			clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64)
 			taskCloud.AdapterId = uint(adapterId)
+			taskCloud.AdapterName = adapterName
 			taskCloud.ClusterId = uint(clusterId)
 			taskCloud.ClusterName = r.ClusterName
-			taskCloud.Status = constants.Pending
+			taskCloud.Status = constants.Saved
 			taskCloud.YamlString = string(unString)
 			taskCloud.Kind = sStruct.GetKind()
 			taskCloud.Namespace = sStruct.GetNamespace()
 			taskClouds = append(taskClouds, taskCloud)
 		}
 	}
-	adapterName := ""
-	tx.Table("t_adapter").Select("name").Where("id=?", adapterId).Find(&adapterName)
 	noticeInfo := clientCore.NoticeInfo{
 		AdapterId:   int64(adapterId),
 		AdapterName: adapterName,


@ -0,0 +1,30 @@
package cloud
import (
"context"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"net/http"
)
type PodLogsLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
w http.ResponseWriter
}
func NewPodLogsLogic(ctx context.Context, svcCtx *svc.ServiceContext, w http.ResponseWriter) *PodLogsLogic {
return &PodLogsLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
w: w,
}
}
func (l *PodLogsLogic) PodLogs(req *types.PodLogsReq, w http.ResponseWriter) (resp string, err error) {
// todo: add your logic here and delete this line
return
}


@ -1,65 +0,0 @@
package core
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
tool "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"k8s.io/apimachinery/pkg/util/json"
"time"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type CommitHpcTaskLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitHpcTaskLogic {
return &CommitHpcTaskLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {
// 构建主任务结构体
taskModel := models.Task{
Status: constants.Saved,
Description: req.Description,
Name: req.Name,
CommitTime: time.Now(),
}
// 保存任务数据到数据库
tx := l.svcCtx.DbEngin.Create(&taskModel)
if tx.Error != nil {
return nil, tx.Error
}
hpc := models.Hpc{}
tool.Convert(req, &hpc)
mqInfo := response.TaskInfo{
TaskId: taskModel.Id,
TaskType: "hpc",
MatchLabels: req.MatchLabels,
//Metadata: hpc,
}
req.TaskId = taskModel.Id
// 将任务数据转换成消息体
reqMessage, err := json.Marshal(mqInfo)
if err != nil {
logx.Error(err)
return nil, err
}
publish := l.svcCtx.RedisClient.Publish(context.Background(), mqInfo.TaskType, reqMessage)
if publish.Err() != nil {
return nil, publish.Err()
}
return
}


@@ -3,12 +3,14 @@ package core
import (
	"context"
	"fmt"
	clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
	"strconv"
	"time"
@ -32,78 +34,116 @@ func NewCommitVmTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Comm
func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *types.CommitVmTaskResp, err error) { func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *types.CommitVmTaskResp, err error) {
// todo: add your logic here and delete this line // todo: add your logic here and delete this line
resp = &types.CommitVmTaskResp{} resp = &types.CommitVmTaskResp{}
tx := l.svcCtx.DbEngin.Begin()
//Building the main task structure //Building the main task structure
opt := &option.VmOption{ defer func() {
AdapterId: req.VmOption.AdapterId, if p := recover(); p != nil {
Replicas: req.VmOption.Replicas, tx.Rollback()
Strategy: req.VmOption.Strategy, logx.Error(p)
ClusterToStaticWeight: req.VmOption.StaticWeightMap, } else if tx.Error != nil {
Status: constants.Saved, logx.Info("rollback, error", tx.Error)
MatchLabels: req.VmOption.MatchLabels, tx.Rollback()
StaticWeightMap: req.VmOption.StaticWeightMap, } else {
Name: req.VmOption.Name, tx = tx.Commit()
CommitTime: time.Now(), logx.Info("commit success")
}
}()
//TODO adapter
adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64)
var clusters []*models.VmModel
err2 := l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error
if err2 != nil {
logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
} }
taskModel := models.Task{
Status: constants.Saved,
Name: req.VmOption.Name,
CommitTime: time.Now(),
Description: "vm task",
}
// Save task data to database
tx := l.svcCtx.DbEngin.Create(&taskModel)
if tx.Error != nil {
return nil, tx.Error
}
//var clusters []*models.VmModel
//err2 := l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.VmOption.AdapterId, req.VmOption.VmClusterIds).Scan(&clusters).Error
//if err2 != nil {
// logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
// //return errors.Errorf("the cluster does not match the drive resources. Check the data"), nil
//}
taskVm := models.TaskVm{} taskVm := models.TaskVm{}
//TODO 执行策略返回集群跟 Replica //TODO 执行策略返回集群跟 Replica
/*opt := &option.VmOption{} opt := &option.VmOption{}
utils.Convert(&req, &opt)*/ utils.Convert(&req, &opt)
// 2、Initialize scheduler // 2、Initialize scheduler
vmSchdl, err := schedulers.NewVmScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, l.svcCtx.DbEngin, l.svcCtx.PromClient) vmSchdl, _ := schedulers.NewVmScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, l.svcCtx.DbEngin, l.svcCtx.PromClient)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// 3、Return scheduling results // 3、Return scheduling results
results, err := l.svcCtx.Scheduler.AssignAndSchedule(vmSchdl) results, err := l.svcCtx.Scheduler.AssignAndSchedule(vmSchdl)
if err != nil { if err != nil {
logx.Errorf("AssignAndSchedule() => execution error: %v", err)
return nil, err return nil, err
} }
rs := (results).([]*schedulers.VmResult) rs := (results).([]*schedulers.VmResult)
var synergyStatus int64
if len(rs) > 1 {
synergyStatus = 1
}
var strategy int64
sqlStr := `select t_dict_item.item_value
from t_dict
left join t_dict_item on t_dict.id = t_dict_item.dict_id
where item_text = ?
and t_dict.dict_code = 'schedule_Strategy'`
//查询调度策略
err = tx.Raw(sqlStr, req.Strategy).Scan(&strategy).Error
taskModel := models.Task{
Id: utils.GenSnowflakeID(),
Status: constants.Saved,
Name: req.Name,
CommitTime: time.Now(),
Description: "vm task",
AdapterTypeDict: 3,
SynergyStatus: synergyStatus,
Strategy: strategy,
}
//var taskVms models.TaskVm
var VmObject types.TaskVm
for _, r := range rs { for _, r := range rs {
for _, CreateMulServer := range req.CreateMulServer { for _, clusterId := range req.ClusterIds {
if r.Replica > 0 && r.ClusterId == CreateMulServer.ClusterId { if r.Replica > 0 && r.ClusterId == clusterId {
fmt.Println("", req.CreateMulServer) fmt.Println("", clusterId)
var clusterIds []int64 sql := `SELECT vi.image_id as imageRef,vf.flavor_id as flavorRef,vn.network_id as uuid,vi.cluster_name as platform FROM
l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? ", req.VmOption.AdapterId).Scan(&clusterIds) vm_flavor vf
if len(clusterIds) == 0 || clusterIds == nil { LEFT JOIN vm_image vi ON vf.cluster_id = vi.cluster_id
return nil, nil LEFT JOIN vm_network vn ON vf.cluster_id = vn.cluster_id
WHERE
vi.cluster_id = ?
AND vf.public_flavor_id = ?
AND vi.public_image_id = ?
AND vn.public_network_id = ?`
// err2 := l.svcCtx.DbEngin.Raw(sql, clusterId, req.FlavorRef, req.ImageRef, req.Uuid).Scan(&taskVm).Error
txVm := l.svcCtx.DbEngin.Raw(sql, clusterId, req.FlavorRef, req.ImageRef, req.Uuid).Scan(&VmObject)
if txVm.Error != nil {
logx.Error(err)
return nil, txVm.Error
} }
adapterId, _ := strconv.ParseUint(req.VmOption.AdapterId, 10, 64) if err2 != nil {
taskVm.AdapterId = int64(adapterId) logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64) //return errors.Errorf("the cluster does not match the drive resources. Check the data"), nil
taskVm.ClusterId = int64(clusterId) }
taskVm.Name = req.VmOption.Name taskVm.Name = req.VmName
taskVm.TaskId = taskModel.Id
clusterId, _ = strconv.ParseUint(r.ClusterId, 10, 64)
taskVm.ClusterId = int64(clusterId)
taskVm.Status = "Saved" taskVm.Status = "Saved"
taskVm.StartTime = time.Now().String() taskVm.StartTime = time.Now().String()
taskVm.MinCount = CreateMulServer.Min_count taskVm.MinCount = req.MinCount
taskVm.ImageRef = CreateMulServer.ImageRef /* sqlImage := "SELECT image_id FROM `vm_image_dict` vm left join vm_image vi on vm.id=vi.public_image_id where cluster_id =? AND public_image_id = ?"
taskVm.FlavorRef = CreateMulServer.FlavorRef txImage := l.svcCtx.DbEngin.Raw(sqlImage, clusterId, req.ImageRef).Scan(&ImageRef)
taskVm.Uuid = CreateMulServer.Uuid if txImage.Error != nil {
taskVm.Platform = CreateMulServer.Platform logx.Error(err)
return nil, txImage.Error
}*/
taskVm.ImageRef = VmObject.ImageRef
/* sqlFlavor := "SELECT * FROM `vm_flavor_dict` vm left join vm_flavor vf on vm.id=vf.public_flavor_id where cluster_id =? AND public_flavor_id = ?"
txFlavor := l.svcCtx.DbEngin.Raw(sqlFlavor, clusterId, req.FlavorRef).Scan(&FlavorRef)
if txFlavor.Error != nil {
logx.Error(err)
return nil, txFlavor.Error
}*/
taskVm.FlavorRef = VmObject.FlavorRef
/* sqlNetwork := "SELECT * FROM `vm_network_dict` vm left join vm_network vi on vm.id=vi.public_network_id where cluster_id =? AND public_network_id = ?"
txNetwork := l.svcCtx.DbEngin.Raw(sqlNetwork, clusterId, req.Uuid).Scan(&NetworkRef)
if txNetwork.Error != nil {
logx.Error(err)
return nil, txNetwork.Error
}*/
taskVm.Uuid = VmObject.Uuid
taskVm.Platform = VmObject.Platform
tx = l.svcCtx.DbEngin.Create(&taskVm) tx = l.svcCtx.DbEngin.Create(&taskVm)
if tx.Error != nil { if tx.Error != nil {
return nil, tx.Error return nil, tx.Error
@ -111,8 +151,29 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type
} }
} }
} }
adapterName := ""
tx.Table("t_adapter").Select("name").Where("id=?", adapterId).Find(&adapterName)
noticeInfo := clientCore.NoticeInfo{
AdapterId: int64(adapterId),
AdapterName: adapterName,
NoticeType: "create",
TaskName: req.Name,
Incident: "任务创建中",
CreatedTime: time.Now(),
}
// Save task data to database
tf := l.svcCtx.DbEngin.Create(&taskModel)
if tf.Error != nil {
return nil, tf.Error
}
result := l.svcCtx.DbEngin.Table("task_vm").Create(&taskVm)
result = l.svcCtx.DbEngin.Table("t_notice").Create(&noticeInfo)
if result.Error != nil {
logx.Errorf("Task creation failure, err: %v", result.Error)
}
resp.Code = 200 resp.Code = 200
resp.Msg = "Success" resp.Msg = "Success"
return resp, nil return resp, nil
} }
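The handler above resolves each cluster's local image, flavor and network IDs from the public dictionary IDs before persisting the task_vm record. A minimal sketch of that lookup is shown below, assuming the vm_flavor/vm_image/vm_network tables and column aliases used in the raw SQL above; the helper name and struct are illustrative, not part of this commit.

package example

import "gorm.io/gorm"

// vmRefs mirrors the columns selected by the mapping query in CommitVmTask.
type vmRefs struct {
	ImageRef  string `gorm:"column:imageRef"`
	FlavorRef string `gorm:"column:flavorRef"`
	Uuid      string `gorm:"column:uuid"`
	Platform  string `gorm:"column:platform"`
}

// resolveVmRefs translates the public (cross-cluster) flavor/image/network IDs
// into the IDs known to a single cluster.
func resolveVmRefs(db *gorm.DB, clusterId string, flavorRef, imageRef, networkId int64) (*vmRefs, error) {
	const q = `SELECT vi.image_id AS imageRef, vf.flavor_id AS flavorRef, vn.network_id AS uuid, vi.cluster_name AS platform
	FROM vm_flavor vf
	LEFT JOIN vm_image vi ON vf.cluster_id = vi.cluster_id
	LEFT JOIN vm_network vn ON vf.cluster_id = vn.cluster_id
	WHERE vi.cluster_id = ? AND vf.public_flavor_id = ? AND vi.public_image_id = ? AND vn.public_network_id = ?`
	var refs vmRefs
	if err := db.Raw(q, clusterId, flavorRef, imageRef, networkId).Scan(&refs).Error; err != nil {
		return nil, err
	}
	return &refs, nil
}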


@ -30,7 +30,7 @@ func (l *CountTaskStatusLogic) CountTaskStatus() (resp *types.TaskStatusResp, er
COUNT(CASE WHEN status = 'Succeeded' THEN 1 END) AS Succeeded, COUNT(CASE WHEN status = 'Succeeded' THEN 1 END) AS Succeeded,
COUNT(CASE WHEN status = 'Failed' THEN 1 END) AS Failed, COUNT(CASE WHEN status = 'Failed' THEN 1 END) AS Failed,
COUNT(CASE WHEN status = 'Running' THEN 1 END) AS Running, COUNT(CASE WHEN status = 'Running' THEN 1 END) AS Running,
COUNT(CASE WHEN status = 'Pause' THEN 1 END) AS Pause COUNT(CASE WHEN status = 'Saved' THEN 1 END) AS Saved
FROM task;` FROM task;`
err = l.svcCtx.DbEngin.Raw(sqlStr).Scan(&resp).Error err = l.svcCtx.DbEngin.Raw(sqlStr).Scan(&resp).Error
if err != nil { if err != nil {


@ -0,0 +1,40 @@
package core
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetPublicFlavorLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetPublicFlavorLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetPublicFlavorLogic {
return &GetPublicFlavorLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetPublicFlavorLogic) GetPublicFlavor(req *types.PublicFlavorReq) (resp *types.PublicFlavorResp, err error) {
// todo: add your logic here and delete this line
resp = &types.PublicFlavorResp{}
var flavorDict []types.FlavorDict
sqlStrTask := "SELECT * FROM `vm_flavor_dict`"
txTask := l.svcCtx.DbEngin.Raw(sqlStrTask).Scan(&flavorDict)
if txTask.Error != nil {
logx.Error(err)
return nil, txTask.Error
}
resp.Code = 200
resp.Message = "success"
resp.FlavorDict = flavorDict
return resp, nil
}


@ -0,0 +1,40 @@
package core
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetPublicImageLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetPublicImageLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetPublicImageLogic {
return &GetPublicImageLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetPublicImageLogic) GetPublicImage(req *types.PublicImageReq) (resp *types.PublicImageResp, err error) {
// todo: add your logic here and delete this line
resp = &types.PublicImageResp{}
var imageDict []types.ImageDict
sqlStrTask := "SELECT * FROM `vm_image_dict`"
txTask := l.svcCtx.DbEngin.Raw(sqlStrTask).Scan(&imageDict)
if txTask.Error != nil {
logx.Error(err)
return nil, txTask.Error
}
resp.Code = 200
resp.Message = "success"
resp.ImageDict = imageDict
return resp, nil
}


@ -0,0 +1,39 @@
package core
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetPublicNetworkLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetPublicNetworkLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetPublicNetworkLogic {
return &GetPublicNetworkLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetPublicNetworkLogic) GetPublicNetwork(req *types.PublicNetworkReq) (resp *types.PublicNetworkResp, err error) {
// todo: add your logic here and delete this line
resp = &types.PublicNetworkResp{}
var networkDict []types.NetworkDict
sqlStrTask := "SELECT * FROM `vm_network_dict`"
txTask := l.svcCtx.DbEngin.Raw(sqlStrTask).Scan(&networkDict)
if txTask.Error != nil {
logx.Error(err)
return nil, txTask.Error
}
resp.Code = 200
resp.Message = "success"
resp.NetworkDict = networkDict
return resp, nil
}
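The three public-dictionary endpoints above (flavor, image, network) differ only in the table they read and the row type they return. A possible consolidation is sketched below; the helper is hypothetical and the table names are taken from the handlers above.

package example

import "gorm.io/gorm"

// listDict loads every row of a dictionary table into dest (a pointer to a slice).
func listDict(db *gorm.DB, table string, dest interface{}) error {
	return db.Table(table).Find(dest).Error
}

// Usage sketch:
//   var flavors []types.FlavorDict
//   err := listDict(db, "vm_flavor_dict", &flavors)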


@ -2,10 +2,11 @@ package core
import ( import (
"context" "context"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils"
"time"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
) )
@ -28,7 +29,7 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
limit := req.PageSize limit := req.PageSize
offset := req.PageSize * (req.PageNum - 1) offset := req.PageSize * (req.PageNum - 1)
resp = &types.PageResult{} resp = &types.PageResult{}
var list []types.TaskModel var list []*types.TaskModel
db := l.svcCtx.DbEngin.Model(&types.TaskModel{}).Table("task") db := l.svcCtx.DbEngin.Model(&types.TaskModel{}).Table("task")
db = db.Where("deleted_at is null") db = db.Where("deleted_at is null")
@ -48,8 +49,18 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
if err != nil { if err != nil {
return nil, result.NewDefaultError(err.Error()) return nil, result.NewDefaultError(err.Error())
} }
for _, model := range list {
resp.List = list if model.StartTime != "" && model.EndTime == "" {
startTime := timeutils.TimeStringToGoTime(model.StartTime)
model.RunningTime = int64(time.Now().Sub(startTime).Seconds())
}
if model.StartTime != "" && model.EndTime != "" {
startTime := timeutils.TimeStringToGoTime(model.StartTime)
endTime := timeutils.TimeStringToGoTime(model.EndTime)
model.RunningTime = int64(endTime.Sub(startTime).Seconds())
}
}
resp.List = &list
resp.PageSize = req.PageSize resp.PageSize = req.PageSize
resp.PageNum = req.PageNum resp.PageNum = req.PageNum
resp.Total = total resp.Total = total
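The loop added above derives RunningTime from the start and end strings: while a task has no end time it is measured against the current time, otherwise against the end time. Below is a standalone sketch of that rule, assuming the "2006-01-02 15:04:05" layout used elsewhere in the project (timeutils.TimeStringToGoTime is assumed to parse the same format).

package example

import "time"

const layout = "2006-01-02 15:04:05"

// runningSeconds reproduces the elapsed-time rule used when listing tasks.
func runningSeconds(start, end string, now time.Time) int64 {
	if start == "" {
		return 0
	}
	s, err := time.ParseInLocation(layout, start, time.Local)
	if err != nil {
		return 0
	}
	if end == "" {
		return int64(now.Sub(s).Seconds()) // still running
	}
	e, err := time.ParseInLocation(layout, end, time.Local)
	if err != nil {
		return 0
	}
	return int64(e.Sub(s).Seconds()) // finished
}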


@ -41,87 +41,168 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie
} }
l.svcCtx.DbEngin.Exec("update task_cloud set status = ?,start_time = ?,result = ? where task_id = ?", l.svcCtx.DbEngin.Exec("update task_cloud set status = ?,start_time = ?,result = ? where task_id = ?",
cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, cloudInfo.TaskId) cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, cloudInfo.TaskId)
syncTask(l.svcCtx.DbEngin, int64(taskId)) var taskName string
l.svcCtx.DbEngin.Raw("select name as kind from task where id = ?", taskId).Scan(&taskName)
noticeInfo := clientCore.NoticeInfo{
TaskId: cloudInfo.TaskId,
AdapterId: cloudInfo.AdapterId,
AdapterName: cloudInfo.AdapterName,
ClusterId: cloudInfo.ClusterId,
ClusterName: cloudInfo.ClusterName,
TaskName: taskName,
}
syncTask(l.svcCtx.DbEngin, noticeInfo)
} }
case 2: case 2:
for _, hpcInfo := range req.HpcInfoList { for _, hpcInfo := range req.HpcInfoList {
l.svcCtx.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?", l.svcCtx.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?",
hpcInfo.Status, hpcInfo.StartTime, hpcInfo.JobId, hpcInfo.ClusterId, hpcInfo.TaskId, hpcInfo.Name) hpcInfo.Status, hpcInfo.StartTime, hpcInfo.JobId, hpcInfo.ClusterId, hpcInfo.TaskId, hpcInfo.Name)
syncTask(l.svcCtx.DbEngin, hpcInfo.TaskId) noticeInfo := clientCore.NoticeInfo{
TaskId: hpcInfo.TaskId,
AdapterId: hpcInfo.AdapterId,
AdapterName: hpcInfo.AdapterName,
ClusterId: hpcInfo.ClusterId,
ClusterName: hpcInfo.ClusterName,
TaskName: hpcInfo.Name,
}
syncTask(l.svcCtx.DbEngin, noticeInfo)
} }
case 1: case 1:
for _, aiInfo := range req.AiInfoList { for _, aiInfo := range req.AiInfoList {
l.svcCtx.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", l.svcCtx.DbEngin.Exec("update task_ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, req.AdapterId, aiInfo.TaskId, aiInfo.Name) aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, req.AdapterId, aiInfo.TaskId, aiInfo.Name)
syncTask(l.svcCtx.DbEngin, aiInfo.TaskId) noticeInfo := clientCore.NoticeInfo{
TaskId: aiInfo.TaskId,
AdapterId: aiInfo.AdapterId,
AdapterName: aiInfo.AdapterName,
ClusterId: aiInfo.ClusterId,
ClusterName: aiInfo.ClusterName,
TaskName: aiInfo.Name,
}
syncTask(l.svcCtx.DbEngin, noticeInfo)
} }
case 3: case 3:
for _, vmInfo := range req.VmInfoList { for _, vmInfo := range req.VmInfoList {
l.svcCtx.DbEngin.Exec("update task_vm set status = ?,start_time = ? where participant_id = ? and task_id = ? and name = ?", l.svcCtx.DbEngin.Exec("update task_vm set status = ?,start_time = ? where participant_id = ? and task_id = ? and name = ?",
vmInfo.Status, vmInfo.StartTime, req.AdapterId, vmInfo.TaskId, vmInfo.Name) vmInfo.Status, vmInfo.StartTime, req.AdapterId, vmInfo.TaskId, vmInfo.Name)
syncTask(l.svcCtx.DbEngin, vmInfo.TaskId) noticeInfo := clientCore.NoticeInfo{
TaskId: vmInfo.TaskId,
AdapterId: vmInfo.AdapterId,
AdapterName: vmInfo.AdapterName,
ClusterId: vmInfo.ClusterId,
ClusterName: vmInfo.ClusterName,
TaskName: vmInfo.Name,
}
syncTask(l.svcCtx.DbEngin, noticeInfo)
} }
} }
return &resp, nil return &resp, nil
} }
func syncTask(gorm *gorm.DB, taskId int64) { func syncTask(gorm *gorm.DB, noticeInfo clientCore.NoticeInfo) {
var allStatus string var allStatus string
tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join task_cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus) tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join task_hpc h on t.id = h.task_id left join task_cloud c on t.id = c.task_id left join task_ai a on t.id = a.task_id where t.id = ?", noticeInfo.TaskId).Scan(&allStatus)
if tx.Error != nil { if tx.Error != nil {
logx.Error(tx.Error) logx.Error(tx.Error)
} }
// 子状态统一则修改主任务状态 allStatus = strings.ToUpper(allStatus)
statusArray := strings.Split(allStatus, ",") for pcmStatus, ProviderStatus := range clientCore.StatusMapping {
if len(removeRepeatedElement(statusArray)) == 1 { for _, originalStatus := range ProviderStatus {
updateTask(gorm, taskId, statusArray[0]) // if Failed type status appears in subTask then update mainTask to Failed
if pcmStatus == "Failed" && strings.Contains(allStatus, originalStatus) {
updateTask(gorm, noticeInfo.TaskId, constants.Failed)
noticeInfo := clientCore.NoticeInfo{
AdapterId: noticeInfo.AdapterId,
AdapterName: noticeInfo.AdapterName,
ClusterId: noticeInfo.ClusterId,
ClusterName: noticeInfo.ClusterName,
NoticeType: "failed",
TaskName: noticeInfo.TaskName,
Incident: "任务执行失败,请查看日志!",
CreatedTime: time.Now(),
}
gorm.Table("t_notice").Create(&noticeInfo)
return
// no Failed type status in subTask,if Saved type status appears in subTask then update mainTask to Saved
} else if pcmStatus == "Saved" && strings.Contains(allStatus, originalStatus) {
if getTaskStatus(gorm, noticeInfo.TaskId) != "Saved" {
updateTask(gorm, noticeInfo.TaskId, constants.Saved)
noticeInfo := clientCore.NoticeInfo{
AdapterId: noticeInfo.AdapterId,
AdapterName: noticeInfo.AdapterName,
ClusterId: noticeInfo.ClusterId,
ClusterName: noticeInfo.ClusterName,
NoticeType: "saved",
TaskName: noticeInfo.TaskName,
Incident: "任务已处于队列中!",
CreatedTime: time.Now(),
}
gorm.Table("t_notice").Create(&noticeInfo)
return
} else {
return
}
// no Failed and Saved type status in subTask,if Running type status appears in subTask then update mainTask to Running
} else if pcmStatus == "Running" && strings.Contains(allStatus, originalStatus) {
if getTaskStatus(gorm, noticeInfo.TaskId) != "Running" {
updateTask(gorm, noticeInfo.TaskId, constants.Running)
noticeInfo := clientCore.NoticeInfo{
AdapterId: noticeInfo.AdapterId,
AdapterName: noticeInfo.AdapterName,
ClusterId: noticeInfo.ClusterId,
ClusterName: noticeInfo.ClusterName,
NoticeType: "running",
TaskName: noticeInfo.TaskName,
Incident: "任务状态切换为运行中!",
CreatedTime: time.Now(),
}
gorm.Table("t_notice").Create(&noticeInfo)
return
} else {
return
}
// at last, mainTask should be succeeded
} else {
if strings.Contains(allStatus, originalStatus) {
updateTask(gorm, noticeInfo.TaskId, constants.Succeeded)
noticeInfo := clientCore.NoticeInfo{
AdapterId: noticeInfo.AdapterId,
AdapterName: noticeInfo.AdapterName,
ClusterId: noticeInfo.ClusterId,
ClusterName: noticeInfo.ClusterName,
NoticeType: "succeeded",
TaskName: noticeInfo.TaskName,
Incident: "任务执行完成!",
CreatedTime: time.Now(),
}
gorm.Table("t_notice").Create(&noticeInfo)
return
}
}
}
} }
// 子任务包含失败状态 主任务则失败
if strings.Contains(allStatus, constants.Failed) {
updateTask(gorm, taskId, constants.Failed)
}
if strings.Contains(allStatus, constants.Running) {
updateTaskRunning(gorm, taskId, constants.Running)
}
} }
func updateTask(gorm *gorm.DB, taskId int64, status string) { func updateTask(gorm *gorm.DB, taskId int64, status string) {
now := time.Now()
var task models.Task var task models.Task
gorm.Where("id = ? ", taskId).Find(&task) gorm.Where("id = ? ", taskId).Find(&task)
if task.Status != status { if task.Status != status {
task.Status = status task.Status = status
if status == constants.Running {
task.StartTime = &now
}
if task.Status == constants.Failed || task.Status == constants.Succeeded {
task.EndTime = &now
}
gorm.Updates(&task) gorm.Updates(&task)
} }
} }
func updateTaskRunning(gorm *gorm.DB, taskId int64, status string) { func getTaskStatus(gorm *gorm.DB, taskId int64) (status string) {
var task models.Task var task models.Task
gorm.Where("id = ? ", taskId).Find(&task) gorm.Where("id = ? ", taskId).Find(&task)
if task.Status != status { return task.Status
task.Status = status
task.StartTime = time.Now().Format("2006-01-02 15:04:05")
gorm.Updates(&task)
}
}
func removeRepeatedElement(arr []string) (newArr []string) {
newArr = make([]string, 0)
for i := 0; i < len(arr); i++ {
repeat := false
for j := i + 1; j < len(arr); j++ {
if arr[i] == arr[j] {
repeat = true
break
}
}
if !repeat {
newArr = append(newArr, arr[i])
}
}
return
} }
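syncTask above concatenates every sub-task status into one string and maps provider-specific statuses onto PCM statuses with the precedence Failed > Saved > Running > Succeeded before writing a t_notice row. A simplified sketch of that precedence is shown below; the mapping table is an illustrative stand-in for clientCore.StatusMapping, and the explicit ordering avoids relying on Go's randomized map iteration order.

package example

import "strings"

// Illustrative mapping from PCM status to provider status keywords.
var statusMapping = map[string][]string{
	"Failed":    {"FAILED", "ERROR"},
	"Saved":     {"SAVED", "PENDING"},
	"Running":   {"RUNNING"},
	"Succeeded": {"SUCCEEDED", "COMPLETED"},
}

// deriveTaskStatus picks the main-task status from the concatenated sub-task statuses.
func deriveTaskStatus(allSubStatuses string) string {
	all := strings.ToUpper(allSubStatuses)
	for _, pcm := range []string{"Failed", "Saved", "Running", "Succeeded"} {
		for _, provider := range statusMapping[pcm] {
			if strings.Contains(all, provider) {
				return pcm
			}
		}
	}
	return "Succeeded"
}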


@ -3,11 +3,11 @@ package core
import ( import (
"context" "context"
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gorm.io/gorm"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gorm.io/gorm"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
) )
@ -27,14 +27,14 @@ func NewTaskDetailsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *TaskD
} }
func (l *TaskDetailsLogic) TaskDetails(req *types.FId) (resp *types.TaskDetailsResp, err error) { func (l *TaskDetailsLogic) TaskDetails(req *types.FId) (resp *types.TaskDetailsResp, err error) {
resp = &types.TaskDetailsResp{}
var task models.Task task := &models.Task{}
if errors.Is(l.svcCtx.DbEngin.Where("id", req.Id).First(&task).Error, gorm.ErrRecordNotFound) { if errors.Is(l.svcCtx.DbEngin.Where("id", req.Id).First(&task).Error, gorm.ErrRecordNotFound) {
return nil, errors.New("记录不存在") return nil, errors.New("记录不存在")
} }
clusterIds := make([]int64, 0) clusterIds := make([]int64, 0)
var cList []*types.ClusterInfo var cList []*types.ClusterInfo
switch task.TaskTypeDict { switch task.AdapterTypeDict {
case 0: case 0:
l.svcCtx.DbEngin.Table("task_cloud").Select("cluster_id").Where("task_id", task.Id).Scan(&clusterIds) l.svcCtx.DbEngin.Table("task_cloud").Select("cluster_id").Where("task_id", task.Id).Scan(&clusterIds)
case 1: case 1:
@ -48,14 +48,7 @@ func (l *TaskDetailsLogic) TaskDetails(req *types.FId) (resp *types.TaskDetailsR
if err != nil { if err != nil {
return resp, err return resp, err
} }
resp = &types.TaskDetailsResp{ utils.Convert(&task, &resp)
Name: task.Name, resp.ClusterInfos = cList
Description: task.Description,
StartTime: task.StartTime,
EndTime: task.EndTime,
Strategy: task.Strategy,
SynergyStatus: task.SynergyStatus,
ClusterInfos: cList,
}
return return
} }


@ -17,6 +17,7 @@ package core
import ( import (
"context" "context"
"fmt" "fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"strconv" "strconv"
"time" "time"
@ -55,6 +56,11 @@ func (l *TaskListLogic) TaskList(req *types.TaskListReq) (resp *types.TaskListRe
if len(tasks) == 0 { if len(tasks) == 0 {
return nil, nil return nil, nil
} }
// Update AI task status
var ch = make(chan struct{})
go l.updateAitaskStatus(tasks, ch)
// 查询任务总数 // 查询任务总数
l.svcCtx.DbEngin.Model(&models.Task{}).Count(&resp.TotalCount) l.svcCtx.DbEngin.Model(&models.Task{}).Count(&resp.TotalCount)
@ -93,13 +99,12 @@ func (l *TaskListLogic) TaskList(req *types.TaskListReq) (resp *types.TaskListRe
pStatus = "Normal" pStatus = "Normal"
} }
} }
resp.Tasks = append(resp.Tasks, types.Task{ resp.Tasks = append(resp.Tasks, types.Task{
Id: task.Id, Id: task.Id,
Name: task.Name, Name: task.Name,
Status: task.Status, Status: task.Status,
StartTime: task.StartTime, StartTime: task.StartTime.Format("2006-01-02 15:04:05"),
EndTime: task.EndTime, EndTime: task.EndTime.Format("2006-01-02 15:04:05"),
ParticipantId: pInfo.Id, ParticipantId: pInfo.Id,
ParticipantName: pInfo.Name, ParticipantName: pInfo.Name,
ParticipantStatus: pStatus, ParticipantStatus: pStatus,
@ -107,5 +112,64 @@ func (l *TaskListLogic) TaskList(req *types.TaskListReq) (resp *types.TaskListRe
} }
return select {
case _ = <-ch:
return resp, nil
case <-time.After(1 * time.Second):
return resp, nil
}
}
func (l *TaskListLogic) updateAitaskStatus(tasks []models.Task, ch chan<- struct{}) {
for _, task := range tasks {
if task.AdapterTypeDict != 1 {
continue
}
if task.Status == constants.Succeeded {
continue
}
var aiTask []*models.TaskAi
tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return
}
start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
var status = constants.Succeeded
for _, a := range aiTask {
s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
if s.Before(start) {
start = s
}
if e.After(end) {
end = e
}
if a.Status == constants.Failed {
status = a.Status
break
}
if a.Status == constants.Running {
status = a.Status
continue
}
}
task.Status = status
task.StartTime = &start
task.EndTime = &end
tx = l.svcCtx.DbEngin.Updates(task)
if tx.Error != nil {
return
}
}
ch <- struct{}{}
} }
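updateAitaskStatus above folds the per-cluster AI sub-tasks into a single time window (earliest start, latest end) and a single status in which Failed dominates, Running comes next and Succeeded is the default. A minimal sketch of that fold, with SubTask standing in for models.TaskAi and timestamps already parsed:

package example

import "time"

type SubTask struct {
	Status string
	Start  time.Time
	End    time.Time
}

// foldAiSubTasks returns the aggregated status and time window of a task's sub-tasks.
func foldAiSubTasks(subs []SubTask) (status string, start, end time.Time) {
	status = "Succeeded"
	if len(subs) == 0 {
		return
	}
	start, end = subs[0].Start, subs[0].End
	for _, s := range subs {
		if s.Start.Before(start) {
			start = s.Start
		}
		if s.End.After(end) {
			end = s.End
		}
		if s.Status == "Failed" {
			status = "Failed" // a single failed sub-task fails the whole task
			break
		}
		if s.Status == "Running" {
			status = "Running"
		}
	}
	return
}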


@ -2,6 +2,7 @@ package hpc
import ( import (
"context" "context"
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/json"
@ -32,11 +33,15 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
// 构建主任务结构体 // 构建主任务结构体
taskModel := models.Task{ taskModel := models.Task{
Status: constants.Saved, Name: req.Name,
Description: req.Description, Description: req.Description,
Name: req.Name, Status: constants.Saved,
CommitTime: time.Now(), Strategy: 0,
SynergyStatus: 0,
CommitTime: time.Now(),
AdapterTypeDict: 2,
} }
// 保存任务数据到数据库 // 保存任务数据到数据库
tx := l.svcCtx.DbEngin.Create(&taskModel) tx := l.svcCtx.DbEngin.Create(&taskModel)
if tx.Error != nil { if tx.Error != nil {
@ -44,17 +49,29 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
} }
var clusterIds []int64 var clusterIds []int64
l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? and label = ?", req.AdapterId, req.ClusterType).Scan(&clusterIds) l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id in ? and label = ?", req.AdapterIds, req.ClusterType).Scan(&clusterIds)
if len(clusterIds) == 0 || clusterIds == nil {
resp.Code = 400
resp.Msg = "no cluster found"
return resp, nil
}
var clusterName string
var adapterId int64
var adapterName string
clusterId := clusterIds[rand.Intn(len(clusterIds))]
l.svcCtx.DbEngin.Raw("SELECT nickname FROM `t_cluster` where id = ?", clusterId).Scan(&clusterName)
l.svcCtx.DbEngin.Raw("SELECT adapter_id FROM `t_cluster` where id = ?", clusterId).Scan(&adapterId)
l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", adapterId).Scan(&adapterName)
env, _ := json.Marshal(req.Environment) env, _ := json.Marshal(req.Environment)
if len(clusterIds) == 0 || clusterIds == nil {
return nil, nil
}
hpcInfo := models.TaskHpc{ hpcInfo := models.TaskHpc{
TaskId: taskModel.Id, TaskId: taskModel.Id,
ClusterId: clusterIds[rand.Intn(len(clusterIds))], AdapterId: uint(adapterId),
AdapterName: adapterName,
ClusterId: uint(clusterId),
ClusterName: clusterName,
Name: taskModel.Name, Name: taskModel.Name,
Status: "Saved", Status: "Saved",
CmdScript: req.CmdScript, CmdScript: req.CmdScript,
@ -82,6 +99,20 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
if tx.Error != nil { if tx.Error != nil {
return nil, tx.Error return nil, tx.Error
} }
noticeInfo := clientCore.NoticeInfo{
AdapterId: adapterId,
AdapterName: adapterName,
ClusterId: clusterId,
ClusterName: clusterName,
NoticeType: "create",
TaskName: req.Name,
Incident: "任务创建中",
CreatedTime: time.Now(),
}
result := l.svcCtx.DbEngin.Table("t_notice").Create(&noticeInfo)
if result.Error != nil {
logx.Errorf("Task creation failure, err: %v", result.Error)
}
// todo mq task manage // todo mq task manage
//reqMessage, err := json.Marshal(mqInfo) //reqMessage, err := json.Marshal(mqInfo)
//if err != nil { //if err != nil {
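The handler above picks one cluster at random from the clusters matching the adapter and label, then issues three single-column lookups for the cluster nickname, adapter id and adapter name. A hypothetical consolidation of those lookups into one join is sketched below (not part of this commit; table and column names follow the queries above).

package example

import "gorm.io/gorm"

type clusterAdapter struct {
	ClusterName string
	AdapterId   int64
	AdapterName string
}

// lookupClusterAdapter fetches the cluster nickname together with its adapter in one query.
func lookupClusterAdapter(db *gorm.DB, clusterId int64) (*clusterAdapter, error) {
	var ca clusterAdapter
	err := db.Raw(`SELECT c.nickname AS cluster_name, c.adapter_id AS adapter_id, a.name AS adapter_name
	FROM t_cluster c LEFT JOIN t_adapter a ON a.id = c.adapter_id
	WHERE c.id = ?`, clusterId).Scan(&ca).Error
	if err != nil {
		return nil, err
	}
	return &ca, nil
}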


@ -0,0 +1,37 @@
package schedule
import (
"context"
"strings"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type DownloadAlgothmCodeLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewDownloadAlgothmCodeLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DownloadAlgothmCodeLogic {
return &DownloadAlgothmCodeLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *DownloadAlgothmCodeLogic) DownloadAlgorithmCode(req *types.DownloadAlgorithmCodeReq) (resp *types.DownloadAlgorithmCodeResp, err error) {
resp = &types.DownloadAlgorithmCodeResp{}
code, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][req.ClusterId].DownloadAlgorithmCode(l.ctx,
req.ResourceType, strings.ToLower(req.Card), req.TaskType, req.Dataset, req.Algorithm)
if err != nil {
return nil, err
}
resp.Code = code
return resp, nil
}


@ -0,0 +1,35 @@
package schedule
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetComputeCardsByClusterLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetComputeCardsByClusterLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetComputeCardsByClusterLogic {
return &GetComputeCardsByClusterLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetComputeCardsByClusterLogic) GetComputeCardsByCluster(req *types.GetComputeCardsByClusterReq) (resp *types.GetComputeCardsByClusterResp, err error) {
resp = &types.GetComputeCardsByClusterResp{}
cards, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][req.ClusterId].GetComputeCards(l.ctx)
if err != nil {
return nil, err
}
resp.Cards = cards
return resp, nil
}


@ -54,11 +54,18 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
switch opt.GetOptionType() { switch opt.GetOptionType() {
case option.AI: case option.AI:
id, err := l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName) rs := (results).([]*schedulers.AiResult)
var synergystatus int64
if len(rs) > 1 {
synergystatus = 1
}
strategyCode, err := l.svcCtx.Scheduler.AiStorages.GetStrategyCode(req.AiOption.Strategy)
id, err := l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName, strategyCode, synergystatus)
if err != nil { if err != nil {
return nil, err return nil, err
} }
rs := (results).([]*schedulers.AiResult)
for _, r := range rs { for _, r := range rs {
scheResult := &types.ScheduleResult{} scheResult := &types.ScheduleResult{}
scheResult.ClusterId = r.ClusterId scheResult.ClusterId = r.ClusterId
@ -66,7 +73,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
scheResult.Strategy = r.Strategy scheResult.Strategy = r.Strategy
scheResult.Replica = r.Replica scheResult.Replica = r.Replica
scheResult.Msg = r.Msg scheResult.Msg = r.Msg
err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, r.TaskId, constants.Running, r.Msg) err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, r.TaskId, constants.Saved, r.Msg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
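The change above records whether a task is synergetic (scheduled onto more than one cluster) and stores the strategy's dictionary code, with sub-tasks now saved as Saved instead of Running. A small sketch of the synergy flag, assuming the result shape returned by the AI scheduler:

package example

type aiResult struct {
	ClusterId string
	TaskId    string
	Replica   int32
}

// synergyStatus is 1 when more than one cluster cooperates on the same task.
func synergyStatus(results []*aiResult) int64 {
	if len(results) > 1 {
		return 1
	}
	return 0
}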


@ -0,0 +1,35 @@
package schedule
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type UploadAlgothmCodeLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewUploadAlgothmCodeLogic(ctx context.Context, svcCtx *svc.ServiceContext) *UploadAlgothmCodeLogic {
return &UploadAlgothmCodeLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *UploadAlgothmCodeLogic) UploadAlgorithmCode(req *types.UploadAlgorithmCodeReq) (resp *types.UploadAlgorithmCodeResp, err error) {
resp = &types.UploadAlgorithmCodeResp{}
err = l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][req.ClusterId].UploadAlgorithmCode(l.ctx,
req.ResourceType, req.Card, req.TaskType, req.Dataset, req.Algorithm, req.Code)
if err != nil {
return nil, err
}
return resp, nil
}


@ -61,8 +61,8 @@ func (s *AiStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo,
return list, nil return list, nil
} }
func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*types.AiTaskDb, error) { func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, error) {
var resp []*types.AiTaskDb var resp []*models.TaskAi
tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp) tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp)
if tx.Error != nil { if tx.Error != nil {
logx.Errorf(tx.Error.Error()) logx.Errorf(tx.Error.Error())
@ -71,13 +71,16 @@ func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*types.AiTaskDb,
return resp, nil return resp, nil
} }
func (s *AiStorage) SaveTask(name string) (int64, error) { func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int64) (int64, error) {
// 构建主任务结构体 // 构建主任务结构体
taskModel := models.Task{ taskModel := models.Task{
Status: constants.Saved, Status: constants.Saved,
Description: "ai task", Description: "ai task",
Name: name, Name: name,
CommitTime: time.Now(), SynergyStatus: synergyStatus,
Strategy: strategyCode,
AdapterTypeDict: 1,
CommitTime: time.Now(),
} }
// 保存任务数据到数据库 // 保存任务数据到数据库
tx := s.DbEngin.Create(&taskModel) tx := s.DbEngin.Create(&taskModel)
@ -169,6 +172,61 @@ func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterR
return &clusterResource, nil return &clusterResource, nil
} }
func (s *AiStorage) UpdateTask() error { func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64,
memAvail float64, memTotal float64, diskAvail float64, diskTotal float64, gpuAvail float64, gpuTotal float64, cardTotal int64, topsTotal float64) error {
cId, err := strconv.ParseInt(clusterId, 10, 64)
if err != nil {
return err
}
clusterResource := models.TClusterResource{
ClusterId: cId,
ClusterName: clusterName,
ClusterType: clusterType,
CpuAvail: cpuAvail,
CpuTotal: cpuTotal,
MemAvail: memAvail,
MemTotal: memTotal,
DiskAvail: diskAvail,
DiskTotal: diskTotal,
GpuAvail: gpuAvail,
GpuTotal: gpuTotal,
CardTotal: cardTotal,
CardTopsTotal: topsTotal,
}
tx := s.DbEngin.Create(&clusterResource)
if tx.Error != nil {
return tx.Error
}
return nil return nil
} }
func (s *AiStorage) UpdateClusterResources(clusterResource *models.TClusterResource) error {
tx := s.DbEngin.Updates(clusterResource)
if tx.Error != nil {
return tx.Error
}
return nil
}
func (s *AiStorage) UpdateAiTask(task *models.TaskAi) error {
tx := s.DbEngin.Updates(task)
if tx.Error != nil {
return tx.Error
}
return nil
}
func (s *AiStorage) GetStrategyCode(name string) (int64, error) {
var strategy int64
sqlStr := `select t_dict_item.item_value
from t_dict
left join t_dict_item on t_dict.id = t_dict_item.dict_id
where item_text = ?
and t_dict.dict_code = 'schedule_Strategy'`
// Look up the scheduling strategy code
err := s.DbEngin.Raw(sqlStr, name).Scan(&strategy).Error
if err != nil {
return strategy, err
}
return strategy, nil
}


@ -55,10 +55,10 @@ func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Schedu
func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
ai := models.Ai{ ai := models.Ai{
ParticipantId: participantId, AdapterId: participantId,
TaskId: task.TaskId, TaskId: task.TaskId,
Status: "Saved", Status: "Saved",
YamlString: as.yamlString, YamlString: as.yamlString,
} }
utils.Convert(task.Metadata, &ai) utils.Convert(task.Metadata, &ai)
return ai, nil return ai, nil
@ -174,10 +174,16 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
} }
if len(errs) != 0 { if len(errs) != 0 {
taskId, err := as.AiStorages.SaveTask(as.option.TaskName) var synergystatus int64
if err != nil { if len(clusters) > 1 {
return nil, err synergystatus = 1
} }
strategyCode, err := as.AiStorages.GetStrategyCode(as.option.StrategyName)
taskId, err := as.AiStorages.SaveTask(as.option.TaskName, strategyCode, synergystatus)
if err != nil {
return nil, errors.New("database add failed: " + err.Error())
}
var errmsg string var errmsg string
for _, err := range errs { for _, err := range errs {
e := (err).(struct { e := (err).(struct {
@ -188,7 +194,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
errmsg += msg errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, "", constants.Failed, msg) err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, "", constants.Failed, msg)
if err != nil { if err != nil {
return nil, err return nil, errors.New("database add failed: " + err.Error())
} }
} }
for s := range ch { for s := range ch {
@ -197,14 +203,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
errmsg += msg errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, "", constants.Failed, msg) err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, "", constants.Failed, msg)
if err != nil { if err != nil {
return nil, err return nil, errors.New("database add failed: " + err.Error())
} }
} else { } else {
msg := fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId) msg := fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId)
errmsg += msg errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, s.TaskId, constants.Succeeded, msg) err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, s.TaskId, constants.Succeeded, msg)
if err != nil { if err != nil {
return nil, err return nil, errors.New("database add failed: " + err.Error())
} }
} }
} }


@ -24,6 +24,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
@ -60,7 +61,7 @@ func (as *CloudScheduler) GetNewStructForDb(task *response.TaskInfo, resource st
c := cloud.TaskCloudModel{ c := cloud.TaskCloudModel{
AdapterId: uint(participantId), AdapterId: uint(participantId),
TaskId: uint(task.TaskId), TaskId: uint(task.TaskId),
Status: "Pending", Status: constants.Saved,
YamlString: as.yamlString, YamlString: as.yamlString,
} }
utils.Convert(task.Metadata, &c) utils.Convert(task.Metadata, &c)


@ -83,7 +83,7 @@ func (vm *VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
return strategy, nil*/ return strategy, nil*/
case strategy.STATIC_WEIGHT: case strategy.STATIC_WEIGHT:
//todo resources should match cluster StaticWeightMap //todo resources should match cluster StaticWeightMap
strategy := strategy.NewStaticWeightStrategy(vm.option.ClusterToStaticWeight, 1) strategy := strategy.NewStaticWeightStrategy(vm.option.StaticWeightMap, 1)
return strategy, nil return strategy, nil
} }


@ -7,8 +7,10 @@ type AiCollector interface {
GetDatasetsSpecs(ctx context.Context) ([]*DatasetsSpecs, error) GetDatasetsSpecs(ctx context.Context) ([]*DatasetsSpecs, error)
GetAlgorithms(ctx context.Context) ([]*Algorithm, error) GetAlgorithms(ctx context.Context) ([]*Algorithm, error)
GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error)
DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error) GetTrainingTask(ctx context.Context, taskId string) (*Task, error)
UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error)
UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error
GetComputeCards(ctx context.Context) ([]string, error)
} }
type ResourceStats struct { type ResourceStats struct {
@ -21,6 +23,7 @@ type ResourceStats struct {
DiskAvail float64 DiskAvail float64
DiskTotal float64 DiskTotal float64
GpuAvail int64 GpuAvail int64
GpuTotal int64
CardsAvail []*Card CardsAvail []*Card
CpuCoreHours float64 CpuCoreHours float64
Balance float64 Balance float64
@ -45,3 +48,10 @@ type Algorithm struct {
Platform string Platform string
TaskType string TaskType string
} }
type Task struct {
Id string
Start string
End string
Status string
}
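The AiCollector interface above gains card-aware upload/download, GetComputeCards and GetTrainingTask (returning the new Task struct). A partial, hedged test double covering only the newly added methods is sketched below; it is not one of the real participant adapters and does not implement the full interface.

package example

import "context"

type Task struct {
	Id     string
	Start  string
	End    string
	Status string
}

type fakeCollector struct {
	cards []string
}

func (f *fakeCollector) GetComputeCards(ctx context.Context) ([]string, error) {
	return f.cards, nil
}

func (f *fakeCollector) GetTrainingTask(ctx context.Context, taskId string) (*Task, error) {
	return &Task{Id: taskId, Status: "Running"}, nil
}

func (f *fakeCollector) DownloadAlgorithmCode(ctx context.Context, resourceType, card, taskType, dataset, algorithm string) (string, error) {
	return "", nil
}

func (f *fakeCollector) UploadAlgorithmCode(ctx context.Context, resourceType, card, taskType, dataset, algorithm, code string) error {
	return nil
}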


@ -162,11 +162,15 @@ func (m *ModelArtsLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorit
return nil, nil return nil, nil
} }
func (m *ModelArtsLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error) { func (m *ModelArtsLink) GetComputeCards(ctx context.Context) ([]string, error) {
return nil, nil
}
func (m *ModelArtsLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
return "", nil return "", nil
} }
func (m *ModelArtsLink) UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error { func (m *ModelArtsLink) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
return nil return nil
} }
@ -174,6 +178,10 @@ func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, i
return "", nil return "", nil
} }
func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
return nil, nil
}
func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
err := m.GenerateSubmitParams(ctx, option) err := m.GenerateSubmitParams(ctx, option)
if err != nil { if err != nil {


@ -19,12 +19,14 @@ import (
"errors" "errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus" "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
"math" "math"
"strconv" "strconv"
"strings" "strings"
"time"
) )
type OctopusLink struct { type OctopusLink struct {
@ -44,20 +46,24 @@ const (
SUIYUAN = "suiyuan" SUIYUAN = "suiyuan"
SAILINGSI = "sailingsi" SAILINGSI = "sailingsi"
MLU = "MLU" MLU = "MLU"
BIV100 = "BI-V100"
CAMBRICONMLU290 = 256 CAMBRICONMLU290 = 256
GCU = "GCU" GCU = "GCU"
ENFLAME = "enflame" ENFLAME = "enflame"
EnflameT20 = 128 EnflameT20 = 128
BASE_TOPS = 128 BASE_TOPS = 128
CAMBRICON = "cambricon" CAMBRICON = "cambricon"
TIANSHU = "天数"
TRAIN_CMD = "cd /code; python train.py" TRAIN_CMD = "cd /code; python train.py"
VERSION = "V1" VERSION = "V1"
DOMAIN = "http://192.168.242.41:8001/"
) )
var ( var (
cardAliasMap = map[string]string{ cardAliasMap = map[string]string{
MLU: CAMBRICON, MLU: CAMBRICON,
GCU: ENFLAME, GCU: ENFLAME,
BIV100: TIANSHU,
} }
cardTopsMap = map[string]float64{ cardTopsMap = map[string]float64{
MLU: CAMBRICONMLU290, MLU: CAMBRICONMLU290,
@ -337,11 +343,91 @@ func (o *OctopusLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm
return algorithms, nil return algorithms, nil
} }
func (o *OctopusLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error) { func (o *OctopusLink) GetComputeCards(ctx context.Context) ([]string, error) {
return "", nil var cards []string
for s, _ := range cardAliasMap {
cards = append(cards, s)
}
return cards, nil
} }
func (o *OctopusLink) UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error { func (o *OctopusLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
var name string
if resourceType == CARD {
name = dataset + UNDERSCORE + algorithm + UNDERSCORE + card
} else {
name = dataset + UNDERSCORE + algorithm + UNDERSCORE + CPU
}
req := &octopus.GetMyAlgorithmListReq{
Platform: o.platform,
PageIndex: o.pageIndex,
PageSize: o.pageSize,
}
resp, err := o.octopusRpc.GetMyAlgorithmList(ctx, req)
if err != nil {
return "", err
}
if !resp.Success {
return "", errors.New("failed to get algorithmList")
}
var algorithmId string
for _, a := range resp.Payload.Algorithms {
if strings.ToLower(a.FrameworkName) != taskType {
continue
}
if a.AlgorithmName == name {
algorithmId = a.AlgorithmId
break
}
}
if algorithmId == "" {
return "", errors.New("algorithmId not found")
}
dcReq := &octopus.DownloadCompressReq{
Platform: o.platform,
Version: VERSION,
AlgorithmId: algorithmId,
}
dcResp, err := o.octopusRpc.DownloadCompress(ctx, dcReq)
if err != nil {
return "", err
}
if !dcResp.Success {
return "", errors.New(dcResp.Error.Message)
}
daReq := &octopus.DownloadAlgorithmReq{
Platform: o.platform,
Version: VERSION,
AlgorithmId: algorithmId,
CompressAt: dcResp.Payload.CompressAt,
Domain: DOMAIN,
}
daResp, err := o.octopusRpc.DownloadAlgorithm(ctx, daReq)
if err != nil {
return "", err
}
if !daResp.Success {
return "", errors.New(dcResp.Error.Message)
}
urlReq := &octopus.AlgorithmUrlReq{
Platform: o.platform,
Url: daResp.Payload.DownloadUrl,
}
urlResp, err := o.octopusRpc.DownloadAlgorithmUrl(ctx, urlReq)
if err != nil {
return "", err
}
return urlResp.Algorithm, nil
}
func (o *OctopusLink) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
return nil return nil
} }
@ -364,6 +450,35 @@ func (o *OctopusLink) GetTrainingTaskLog(ctx context.Context, taskId string, ins
return resp.Content, nil return resp.Content, nil
} }
func (o *OctopusLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
resp, err := o.QueryTask(ctx, taskId)
if err != nil {
return nil, err
}
jobresp := (resp).(*octopus.GetTrainJobResp)
if !jobresp.Success {
return nil, errors.New(jobresp.Error.Message)
}
var task collector.Task
task.Id = jobresp.Payload.TrainJob.Id
task.Start = time.Unix(jobresp.Payload.TrainJob.StartedAt, 0).Format(constants.Layout)
task.End = time.Unix(jobresp.Payload.TrainJob.CompletedAt, 0).Format(constants.Layout)
switch jobresp.Payload.TrainJob.Status {
case "succeeded":
task.Status = constants.Completed
case "failed":
task.Status = constants.Failed
case "running":
task.Status = constants.Running
case "stopped":
task.Status = constants.Stopped
default:
task.Status = "undefined"
}
return &task, nil
}
func (o *OctopusLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { func (o *OctopusLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
err := o.GenerateSubmitParams(ctx, option) err := o.GenerateSubmitParams(ctx, option)
if err != nil { if err != nil {
@ -522,16 +637,6 @@ func (o *OctopusLink) generateImageId(ctx context.Context, option *option.AiOpti
} }
func (o *OctopusLink) generateAlgorithmId(ctx context.Context, option *option.AiOption) error { func (o *OctopusLink) generateAlgorithmId(ctx context.Context, option *option.AiOption) error {
// temporarily set algorithm to cnn
if option.AlgorithmName == "" {
switch option.DatasetsName {
case "cifar10":
option.AlgorithmName = "cnn"
case "mnist":
option.AlgorithmName = "fcn"
}
}
req := &octopus.GetMyAlgorithmListReq{ req := &octopus.GetMyAlgorithmListReq{
Platform: o.platform, Platform: o.platform,
PageIndex: o.pageIndex, PageIndex: o.pageIndex,


@ -447,11 +447,38 @@ func (s *ShuguangAi) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm,
return algorithms, nil return algorithms, nil
} }
func (s *ShuguangAi) DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error) { func (s *ShuguangAi) GetComputeCards(ctx context.Context) ([]string, error) {
return "", nil var cards []string
cards = append(cards, DCU)
return cards, nil
} }
func (s *ShuguangAi) UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error { func (s *ShuguangAi) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
algoName := dataset + DASH + algorithm
req := &hpcAC.GetFileReq{
Path: ALGORITHM_DIR + FORWARD_SLASH + taskType + FORWARD_SLASH + algoName + FORWARD_SLASH + TRAIN_FILE,
}
resp, err := s.aCRpc.GetFile(ctx, req)
if err != nil {
return "", err
}
return resp.Content, nil
}
func (s *ShuguangAi) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
algoName := dataset + DASH + algorithm
req := &hpcAC.UploadFileReq{
Path: ALGORITHM_DIR + FORWARD_SLASH + taskType + FORWARD_SLASH + algoName + FORWARD_SLASH,
Cover: "cover",
File: code,
}
_, err := s.aCRpc.UploadFile(ctx, req)
if err != nil {
return err
}
return nil return nil
} }
@ -473,6 +500,24 @@ func (s *ShuguangAi) GetTrainingTaskLog(ctx context.Context, taskId string, inst
return resp.Data.Content, nil return resp.Data.Content, nil
} }
func (s *ShuguangAi) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
resp, err := s.QueryTask(ctx, taskId)
if err != nil {
return nil, err
}
jobresp := (resp).(*hpcAC.GetPytorchTaskResp)
if jobresp.Code != "0" {
return nil, errors.New(jobresp.Msg)
}
var task collector.Task
task.Id = jobresp.Data.Id
task.Start = jobresp.Data.StartTime
task.End = jobresp.Data.EndTime
task.Status = jobresp.Data.Status
return &task, nil
}
func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
err := s.GenerateSubmitParams(ctx, option) err := s.GenerateSubmitParams(ctx, option)
if err != nil { if err != nil {


@ -41,6 +41,48 @@ type HomeOverviewData struct {
TaskSum int64 `json:"taskSum"` TaskSum int64 `json:"taskSum"`
} }
type PublicImageReq struct {
}
type PublicImageResp struct {
Code int `json:"code"`
Message string `json:"message"`
ImageDict []ImageDict `json:"imageDict"`
}
type ImageDict struct {
Id int `json:"id"`
PublicImageName string `json:"public_image_name"`
}
type PublicFlavorReq struct {
}
type PublicFlavorResp struct {
Code int `json:"code"`
Message string `json:"message"`
FlavorDict []FlavorDict `json:"flavorDict"`
}
type FlavorDict struct {
Id int `json:"id"`
PublicFlavorName string `json:"public_flavor_name"`
}
type PublicNetworkReq struct {
}
type PublicNetworkResp struct {
Code int `json:"code"`
Message string `json:"message"`
NetworkDict []NetworkDict `json:"networkDict"`
}
type NetworkDict struct {
Id int `json:"id"`
PublicNetworkName string `json:"public_netWork_name"`
}
type RemoteResp struct { type RemoteResp struct {
Code int `json:"code"` Code int `json:"code"`
Message string `json:"message"` Message string `json:"message"`
@ -108,6 +150,17 @@ type GeneralTaskReq struct {
Replicas int64 `json:"replicas,string"` Replicas int64 `json:"replicas,string"`
} }
type PodLogsReq struct {
TaskId string `json:"taskId"`
TaskName string `json:"taskName"`
ClusterId string `json:"clusterId"`
ClusterName string `json:"clusterName"`
AdapterId string `json:"adapterId"`
AdapterName string `json:"adapterName"`
PodName string `json:"podName"`
Stream bool `json:"stream"`
}
type DeleteTaskReq struct { type DeleteTaskReq struct {
Id int64 `path:"id"` Id int64 `path:"id"`
} }
@ -140,8 +193,23 @@ type TaskYaml struct {
} }
type CommitVmTaskReq struct { type CommitVmTaskReq struct {
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"` Name string `json:"name"`
VmOption *VmOption `json:"vmOption,optional"` AdapterIds []string `json:"adapterIds,optional"`
ClusterIds []string `json:"clusterIds"`
Strategy string `json:"strategy"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
MinCount int64 `json:"min_count,optional"`
ImageRef int64 `json:"imageRef,optional"`
FlavorRef int64 `json:"flavorRef,optional"`
Uuid int64 `json:"uuid,optional"`
VmName string `json:"vm_name,optional"`
}
type TaskVm struct {
ImageRef string `json:"imageRef"`
FlavorRef string `json:"flavorRef"`
Uuid string `json:"uuid"`
Platform string `json:"platform"`
} }
type VmOption struct { type VmOption struct {
@ -261,22 +329,23 @@ type PageTaskReq struct {
} }
type TaskModel struct { type TaskModel struct {
Id int64 `json:"id,omitempty" db:"id"` // id Id int64 `json:"id,omitempty,string" db:"id"` // id
Name string `json:"name,omitempty" db:"name"` // 作业名称 Name string `json:"name,omitempty" db:"name"` // 作业名称
Description string `json:"description,omitempty" db:"description"` // 作业描述 Description string `json:"description,omitempty" db:"description"` // 作业描述
Status string `json:"status,omitempty" db:"status"` // 作业状态 Status string `json:"status,omitempty" db:"status"` // 作业状态
Strategy int64 `json:"strategy" db:"strategy"` // 策略 Strategy int64 `json:"strategy" db:"strategy"` // 策略
SynergyStatus int64 `json:"synergyStatus" db:"synergy_status"` // 协同状态0-未协同、1-已协同) SynergyStatus int64 `json:"synergyStatus" db:"synergy_status"` // 协同状态0-未协同、1-已协同)
CommitTime string `json:"commitTime,omitempty" db:"commit_time"` // 提交时间 CommitTime string `json:"commitTime,omitempty" db:"commit_time"` // 提交时间
StartTime string `json:"startTime,omitempty" db:"start_time"` // 开始时间 StartTime string `json:"startTime,omitempty" db:"start_time"` // 开始时间
EndTime string `json:"endTime,omitempty" db:"end_time"` // 结束运行时间 EndTime string `json:"endTime,omitempty" db:"end_time"` // 结束运行时间
RunningTime int64 `json:"runningTime" db:"running_time"` // 已运行时间(单位秒) RunningTime int64 `json:"runningTime" db:"running_time"` // 已运行时间(单位秒)
YamlString string `json:"yamlString,omitempty" db:"yaml_string"` YamlString string `json:"yamlString,omitempty" db:"yaml_string"`
Result string `json:"result,omitempty" db:"result"` // 作业结果 Result string `json:"result,omitempty" db:"result"` // 作业结果
DeletedAt string `json:"deletedAt,omitempty" gorm:"index" db:"deleted_at"` DeletedAt string `json:"deletedAt,omitempty" gorm:"index" db:"deleted_at"`
NsID string `json:"nsId,omitempty" db:"ns_id"` NsID string `json:"nsId,omitempty" db:"ns_id"`
TenantId string `json:"tenantId,omitempty" db:"tenant_id"` TenantId string `json:"tenantId,omitempty" db:"tenant_id"`
CreateTime string `json:"createTime,omitempty" db:"create_time" gorm:"autoCreateTime"` CreateTime string `json:"createTime,omitempty" db:"create_time" gorm:"autoCreateTime"`
AdapterTypeDict int `json:"adapterTypeDict" db:"create_time" gorm:"adapter_type_dict"` //任务类型(对应字典表的值
} }
type TaskDetailReq struct { type TaskDetailReq struct {
@ -1077,7 +1146,7 @@ type TaskStatusResp struct {
Succeeded int `json:"Succeeded"` Succeeded int `json:"Succeeded"`
Failed int `json:"Failed"` Failed int `json:"Failed"`
Running int `json:"Running"` Running int `json:"Running"`
Pause int `json:"Pause"` Saved int `json:"Saved"`
} }
type TaskDetailsResp struct { type TaskDetailsResp struct {
@ -1085,8 +1154,8 @@ type TaskDetailsResp struct {
Description string `json:"description"` Description string `json:"description"`
StartTime string `json:"startTime"` StartTime string `json:"startTime"`
EndTime string `json:"endTime"` EndTime string `json:"endTime"`
Strategy int64 `json:"strategy,string"` Strategy int64 `json:"strategy"`
SynergyStatus int64 `json:"synergyStatus,string"` SynergyStatus int64 `json:"synergyStatus"`
ClusterInfos []*ClusterInfo `json:"clusterInfos"` ClusterInfos []*ClusterInfo `json:"clusterInfos"`
} }
@ -1095,7 +1164,7 @@ type CommitHpcTaskReq struct {
Description string `json:"description,optional"` Description string `json:"description,optional"`
TenantId int64 `json:"tenantId,optional"` TenantId int64 `json:"tenantId,optional"`
TaskId int64 `json:"taskId,optional"` TaskId int64 `json:"taskId,optional"`
AdapterId string `json:"adapterId,optional"` AdapterIds []string `json:"adapterIds"`
MatchLabels map[string]string `json:"matchLabels,optional"` MatchLabels map[string]string `json:"matchLabels,optional"`
CardCount int64 `json:"cardCount,optional"` CardCount int64 `json:"cardCount,optional"`
WorkDir string `json:"workDir,optional"` //paratera:workingDir WorkDir string `json:"workDir,optional"` //paratera:workingDir
@ -5633,6 +5702,43 @@ type AiTaskDb struct {
EndTime string `json:"endTime,omitempty" db:"end_time"` EndTime string `json:"endTime,omitempty" db:"end_time"`
} }
type DownloadAlgorithmCodeReq struct {
AdapterId string `form:"adapterId"`
ClusterId string `form:"clusterId"`
ResourceType string `form:"resourceType"`
Card string `form:"card"`
TaskType string `form:"taskType"`
Dataset string `form:"dataset"`
Algorithm string `form:"algorithm"`
}
type DownloadAlgorithmCodeResp struct {
Code string `json:"code"`
}
type UploadAlgorithmCodeReq struct {
AdapterId string `json:"adapterId"`
ClusterId string `json:"clusterId"`
ResourceType string `json:"resourceType"`
Card string `json:"card"`
TaskType string `json:"taskType"`
Dataset string `json:"dataset"`
Algorithm string `json:"algorithm"`
Code string `json:"code"`
}
type UploadAlgorithmCodeResp struct {
}
type GetComputeCardsByClusterReq struct {
AdapterId string `path:"adapterId"`
ClusterId string `path:"clusterId"`
}
type GetComputeCardsByClusterResp struct {
Cards []string `json:"cards"`
}
type CreateAlertRuleReq struct { type CreateAlertRuleReq struct {
CLusterId string `json:"clusterId"` CLusterId string `json:"clusterId"`
ClusterName string `json:"clusterName"` ClusterName string `json:"clusterName"`

go.mod

@@ -26,7 +26,7 @@ require (
 	github.com/zeromicro/go-zero v1.6.3
 	gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240426095603-549fefd8bece
 	gitlink.org.cn/JointCloud/pcm-kubernetes v0.0.0-20240301071143-347480abff2c
-	gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240424085753-6899615e9142
+	gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240510133934-6a5526289b35
 	gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203
 	gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5
 	gitlink.org.cn/jcce-pcm/pcm-participant-ceph v0.0.0-20230904090036-24fc730ec87d

go.sum

@@ -1082,8 +1082,8 @@ gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240426095603-549fefd8bece h1:W3yBnvAVV
 gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240426095603-549fefd8bece/go.mod h1:w3Nb5TNymCItQ7K3x4Q0JLuoq9OerwAzAWT2zsPE9Xo=
 gitlink.org.cn/JointCloud/pcm-kubernetes v0.0.0-20240301071143-347480abff2c h1:2Wl/hvaSFjh6fmCSIQhjkr9llMRREQeqcXNLZ/HPY18=
 gitlink.org.cn/JointCloud/pcm-kubernetes v0.0.0-20240301071143-347480abff2c/go.mod h1:lSRfGs+PxFvw7CcndHWRd6UlLlGrZn0b0hp5cfaMNGw=
-gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240424085753-6899615e9142 h1:+po0nesBDSWsgCySBG7eEXk7i9Ytd58wqvjL1M9y6d8=
-gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240424085753-6899615e9142/go.mod h1:QOD5+/l2D+AYBjF2h5T0mdJyfGAmF78QmeKdbBXbjLQ=
+gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240510133934-6a5526289b35 h1:E2QfpS3Y0FjR8Zyv5l2Ti/2NetQFqHG66c8+T/+J1u0=
+gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240510133934-6a5526289b35/go.mod h1:QOD5+/l2D+AYBjF2h5T0mdJyfGAmF78QmeKdbBXbjLQ=
 gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 h1:s6PsZ1+bev294IWdZRlV7mnOwI1+UzFcldVW/BqhQzI=
 gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203/go.mod h1:i2rrbMQ+Fve345BY9Heh4MUqVTAimZQElQhzzRee5B8=
 gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5 h1:+/5vnzkJBfMRnya1NrhOzlroUtRa5ePiYbPKlHLoLV0=


@@ -27,4 +27,5 @@ const (
 	WaitPause = "WaitPause"
 	WaitStart = "WaitStart"
 	Pending   = "Pending"
+	Stopped   = "Stopped"
 )

pkg/constants/time.go (new file)

@@ -0,0 +1,3 @@
+package constants
+
+const Layout = "2006-01-02 15:04:05"
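The new shared Layout constant follows Go's reference-time convention. Below is a minimal, self-contained sketch of how such a constant is typically used with time.Format and time.Parse; the surrounding program is illustrative, not repository code.

```go
package main

import (
	"fmt"
	"time"
)

// Layout mirrors pkg/constants/time.go: Go's reference time written in the target format.
const Layout = "2006-01-02 15:04:05"

func main() {
	// Format the current time with the shared layout.
	fmt.Println("formatted:", time.Now().Format(Layout))

	// Parse a string produced with the same layout back into a time.Time.
	t, err := time.Parse(Layout, "2024-05-11 18:25:47")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println("parsed:", t)
}
```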


@@ -36,19 +36,22 @@ type (
 	}
 	Ai struct {
 		Id            int64        `db:"id"`             // id
 		TaskId        int64        `db:"task_id"`        // task id
-		ParticipantId int64        `db:"participant_id"` // cluster static info id
+		AdapterId     int64        `db:"adapter_id"`     // adapter id
+		AdapterName   string       `db:"adapter_name"`   // adapter name
+		ClusterId     int64        `db:"cluster_id"`     // cluster id
+		ClusterName   string       `db:"cluster_name"`   // cluster name
 		ProjectId     string       `db:"project_id"`     // project id
 		Name          string       `db:"name"`           // name
 		Status        string       `db:"status"`         // status
 		StartTime     string       `db:"start_time"`     // start time
 		RunningTime   int64        `db:"running_time"`   // running time
 		CreatedBy     int64        `db:"created_by"`     // created by
 		CreatedTime   sql.NullTime `db:"created_time"`   // created time
 		UpdatedBy     int64        `db:"updated_by"`     // updated by
 		UpdatedTime   sql.NullTime `db:"updated_time"`   // updated time
 		DeletedFlag   int64        `db:"deleted_flag"`   // deleted (0 = no, 1 = yes)
 		Result        string       `db:"result"`
 		YamlString    string       `db:"yaml_string"`
 		JobId         string       `db:"job_id"`


@@ -9,6 +9,7 @@ type TaskCloudModel struct {
 	Id          uint   `json:"id" gorm:"primarykey;not null;comment:id"`
 	TaskId      uint   `json:"taskId" gorm:"not null;comment:task表id"`
 	AdapterId   uint   `json:"adapterId" gorm:"not null;comment:适配器id"`
+	AdapterName string `json:"adapterName" gorm:"not null;comment:适配器名称"`
 	ClusterId   uint   `json:"clusterId" gorm:"not null;comment:集群id"`
 	ClusterName string `json:"clusterName" gorm:"not null;comment:集群名称"`
 	Kind        string `json:"kind" gorm:"comment:种类"`


@@ -36,12 +36,15 @@ type (
 	}
 	TaskHpc struct {
 		Id          int64  `db:"id"`      // id
 		TaskId      int64  `db:"task_id"` // task id
 		JobId       string `db:"job_id"`  // job id (the job's id in the third-party system)
-		ClusterId   int64  `db:"cluster_id"` // id of the cluster executing the task
+		AdapterId   uint   `db:"adapter_id"`   // adapter id
+		AdapterName string `db:"adapter_name"` // adapter name
+		ClusterId   uint   `db:"cluster_id"`   // cluster id
+		ClusterName string `db:"cluster_name"` // cluster name
 		Name        string `db:"name"`   // name
 		Status      string `db:"status"` // status
 		CmdScript   string `db:"cmd_script"`
 		StartTime   string `db:"start_time"`   // start time
 		RunningTime int64  `db:"running_time"` // running time


@@ -35,21 +35,21 @@ type (
 	}
 	Task struct {
 		Id              int64          `db:"id"`             // id
 		Name            string         `db:"name"`           // job name
 		Description     string         `db:"description"`    // job description
 		Status          string         `db:"status"`         // job status
 		Strategy        int64          `db:"strategy"`       // strategy
 		SynergyStatus   int64          `db:"synergy_status"` // synergy status (0 = not synergized, 1 = synergized)
 		CommitTime      time.Time      `db:"commit_time"`    // commit time
-		StartTime       string         `db:"start_time"`     // start time
-		EndTime         string         `db:"end_time"`       // end time
+		StartTime       *time.Time     `db:"start_time"`     // start time
+		EndTime         *time.Time     `db:"end_time"`       // end time
 		RunningTime     int64          `db:"running_time"`   // elapsed running time (seconds)
 		YamlString      string         `db:"yaml_string"`
 		Result          string         `db:"result"`         // job result
 		DeletedAt       gorm.DeletedAt `gorm:"index"`
 		NsID            string         `db:"ns_id"`
-		TaskTypeDict    int            `db:"task_type_dict"`    // task type (value from the dict table)
+		AdapterTypeDict int            `db:"adapter_type_dict"` // task type (value from the dict table)
 	}
 )
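Switching StartTime and EndTime from string to *time.Time lets an unset value map to SQL NULL instead of an empty string. A small illustrative sketch of the pointer handling this implies (a trimmed-down struct, not the repository's model):

```go
package main

import (
	"fmt"
	"time"
)

// Task is a cut-down illustration: pointer time fields distinguish "not set" (nil) from a real timestamp.
type Task struct {
	Name      string
	StartTime *time.Time
	EndTime   *time.Time
}

func main() {
	t := Task{Name: "demo"}

	// A nil pointer means the task has not started yet (NULL in the database).
	if t.StartTime == nil {
		fmt.Println("task not started")
	}

	// Setting the field requires taking the address of a time.Time value.
	now := time.Now()
	t.StartTime = &now
	fmt.Println("started at:", t.StartTime.Format("2006-01-02 15:04:05"))
}
```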


@@ -35,17 +35,18 @@ type (
 	}
 	TaskVm struct {
 		Id               int64  `db:"id"` // id
-		ParticipantId    int64  `db:"participant_id"` // participant (P-side) id
 		TaskId           int64  `db:"task_id"`      // task id
 		Name             string `db:"name"`         // VM name
 		AdapterId        int64  `db:"adapter_id"`   // id of the adapter executing the task
+		AdapterName      string `db:"adapter_name"` // adapter name
 		ClusterId        int64  `db:"cluster_id"`   // id of the cluster executing the task
+		ClusterName      string `db:"cluster_name"` // cluster name
 		FlavorRef        string `db:"flavor_ref"`   // flavor reference
 		ImageRef         string `db:"image_ref"`    // image reference
 		Status           string `db:"status"`       // status
 		Platform         string `db:"platform"`     // platform
 		Description      string `db:"description"`  // description
 		AvailabilityZone string `db:"availability_zone"`
 		MinCount         int64  `db:"min_count"` // count
 		Uuid             string `db:"uuid"`      // network id
@@ -91,14 +92,14 @@ func (m *defaultTaskVmModel) FindOne(ctx context.Context, id int64) (*TaskVm, error)
 }

 func (m *defaultTaskVmModel) Insert(ctx context.Context, data *TaskVm) (sql.Result, error) {
-	query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskVmRowsExpectAutoSet)
-	ret, err := m.conn.ExecCtx(ctx, query, data.ParticipantId, data.TaskId, data.Name, data.AdapterId, data.ClusterId, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.DeletedAt)
+	query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskVmRowsExpectAutoSet)
+	ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.DeletedAt)
 	return ret, err
 }

 func (m *defaultTaskVmModel) Update(ctx context.Context, data *TaskVm) error {
 	query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskVmRowsWithPlaceHolder)
-	_, err := m.conn.ExecCtx(ctx, query, data.ParticipantId, data.TaskId, data.Name, data.AdapterId, data.ClusterId, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.Id)
+	_, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.Id)
 	return err
 }
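With adapter_name and cluster_name added and participant_id dropped, the hand-written INSERT now carries 18 placeholders that must stay aligned with the ExecCtx argument list. Below is a self-contained sketch of a quick sanity check for that alignment; it is illustrative only and not part of the repository.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Mirrors the updated insert: 18 columns after adding adapter_name and
	// cluster_name and dropping participant_id.
	query := "insert into task_vm (...) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"

	// Illustrative argument names in the same order as the Insert call.
	args := []string{
		"TaskId", "Name", "AdapterId", "AdapterName", "ClusterId", "ClusterName",
		"FlavorRef", "ImageRef", "Status", "Platform", "Description", "AvailabilityZone",
		"MinCount", "Uuid", "StartTime", "RunningTime", "Result", "DeletedAt",
	}

	placeholders := strings.Count(query, "?")
	if placeholders != len(args) {
		fmt.Printf("mismatch: %d placeholders vs %d args\n", placeholders, len(args))
		return
	}
	fmt.Printf("ok: %d placeholders match %d args\n", placeholders, len(args))
}
```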


@@ -19,7 +19,9 @@ import (
 )

 var timeTemplates = []string{
+	"2006-01-02T15:04:05Z07:00", // RFC3339
 	"2006-01-02 15:04:05", // common format
+	"2006/01/02T15:04:05Z07:00", // RFC3339 with slashes
 	"2006/01/02 15:04:05",
 	"2006-01-02",
 	"2006/01/02",


@@ -1,37 +1,27 @@
 FROM golang:1.21.2-alpine3.18 AS builder
-LABEL stage=gobuilder
-
-ENV CGO_ENABLED 0
-ENV GOARCH amd64
-ENV GOPROXY https://goproxy.cn,direct
-
-RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.sjtug.sjtu.edu.cn/g' /etc/apk/repositories && \
-    apk update --no-cache && apk add --no-cache tzdata

 WORKDIR /app

-ADD go.mod .
-ADD go.sum .
-RUN go mod download
-
 COPY . .
+COPY rpc/etc/ /app/
+
+RUN go env -w GO111MODULE=on \
+    && go env -w GOPROXY=https://goproxy.cn,direct \
+    && go env -w CGO_ENABLED=0 \
+    && go mod download

 RUN go build -o pcm-coordinator-rpc /app/rpc/pcmcore.go

-FROM alpine:3.16.2
+FROM alpine:3.18

 WORKDIR /app

-# switch the alpine mirror to Shanghai Jiao Tong University
 RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.sjtug.sjtu.edu.cn/g' /etc/apk/repositories && \
-    apk update && \
-    apk upgrade && \
-    apk add --no-cache ca-certificates && update-ca-certificates && \
-    apk add --update tzdata && \
+    apk add --no-cache ca-certificates tzdata && \
+    update-ca-certificates && \
     rm -rf /var/cache/apk/*

-COPY --from=builder /app/pcm-coordinator-rpc .
-COPY rpc/etc/pcmcore.yaml .
+COPY --from=builder /app/pcm-coordinator-api /app/
+COPY --from=builder /app/api/etc/pcm.yaml /app/

 ENV TZ=Asia/Shanghai