调度算法

Former-commit-id: 067588372806f5583ea7b8f56e771fb0d2e1da01
This commit is contained in:
tzwang 2023-08-21 11:11:10 +08:00
parent 8cf6dd1a77
commit 82b18814ad
5 changed files with 380 additions and 0 deletions

View File

@ -0,0 +1,241 @@
package algo
import (
"errors"
"gonum.org/v1/gonum/mat"
"math"
)
type pcmStrategy struct {
ProviderList []*Provider
Task *Task
StrategyList []*Strategy
}
func NewPcmStrategy(task *Task, providers ...*Provider) *pcmStrategy {
var providerList []*Provider
var res [][]int
for _, p := range providers {
p.GenMaxResourceNum(task)
providerList = append(providerList, p)
}
back_trace_task(task.Replicas, 0, providerList, 0, &res, 0)
var strategyList []*Strategy
for _, r := range res {
var path []int
var pathlist [][]int
var resourcePerProvider []int
for j, p := range providerList {
if r[j] > p.MaxReplicas {
resourcePerProvider = append(resourcePerProvider, p.MaxReplicas)
} else {
resourcePerProvider = append(resourcePerProvider, r[j])
}
}
back_trace_resource(resourcePerProvider, 0, path, &pathlist)
strategy := NewStrategy()
strategy.Tasksolution = r
strategy.Resourcesolution = pathlist
strategyList = append(strategyList, strategy)
}
return &pcmStrategy{ProviderList: providerList, Task: task, StrategyList: strategyList}
}
func (ps pcmStrategy) computeMaxScore() (*Task, error) {
maxStrategy := NewStrategy()
var maxprofit float64
//先计算出最大的利润值
for _, strategy := range ps.StrategyList {
for _, resourceSolu := range strategy.Resourcesolution {
profit := computeProfit(ps.Task, strategy.Tasksolution, resourceSolu, ps.ProviderList)
if profit > maxprofit {
maxprofit = profit
}
}
}
for _, strategy := range ps.StrategyList {
for _, resourceSolu := range strategy.Resourcesolution {
profit := computeProfit(ps.Task, strategy.Tasksolution, resourceSolu, ps.ProviderList)
highDegree := computeHighDegree(ps.Task, resourceSolu, ps.ProviderList)
valueSum := profit/maxprofit + highDegree
//将每个确定任务分配策略的最高的策略得分存储到里面
if valueSum > maxStrategy.ValueSum {
strategy.Profit = profit
strategy.HighDegree = highDegree
}
if valueSum > maxStrategy.ValueSum {
maxStrategy.ValueSum = valueSum
maxStrategy.Tasksolution = strategy.Tasksolution
newResourceSolu := [][]int{}
newResourceSolu = append(newResourceSolu, resourceSolu)
maxStrategy.Resourcesolution = newResourceSolu
maxStrategy.Profit = profit
maxStrategy.HighDegree = highDegree
}
}
}
if len(ps.ProviderList) == 0 {
return nil, errors.New("empty providers")
}
ps.Task.MaxscoreStrategy = maxStrategy // 记录该任务的最终分配策略
return ps.Task, nil
}
func computeProfit(task *Task, tasksolution []int, resourcesolution []int, providerList []*Provider) float64 {
var timeexecution int //记录任务的实际最大执行时间
var costSum float64 //该任务在多个云厂商所需支付的成本总价
for i, provider := range providerList {
//如果该厂商分的任务为0则直接跳过该厂商循环到下一厂商
if tasksolution[i] == 0 {
continue
}
//先计算下该云厂商的执行时间ddl并替换任务的最大执行时间,向上取整
t := math.Ceil(float64(tasksolution[i])/float64(resourcesolution[i])) * float64(task.Time)
if int(t) > timeexecution {
timeexecution = int(t)
}
//计算前几份资源多执行任务
forOneMoreTaskNUm := tasksolution[i] % resourcesolution[i]
for j := 0; j < resourcesolution[i]; j++ {
if j < forOneMoreTaskNUm {
t = math.Ceil(float64(tasksolution[i])/float64(resourcesolution[i])) * float64(task.Time)
} else {
t = math.Floor(float64(tasksolution[i])/float64(resourcesolution[i])) * float64(task.Time)
}
//如果这份资源分的的任务数
cost := (provider.CpuCost*task.Cpu + provider.MemCost*task.Mem + provider.DiskCost*task.Disk) * t * (math.Pow(float64(j+1), math.Log2(provider.LearnIndex)))
costSum += cost
}
}
//计算用户的支付价格pay
pay := task.Pr
if timeexecution == task.Time { //没有排队等待,且只有一个副本直接执行或者多个副本完全并行执行
if pay < costSum {
pay = costSum
}
} else if timeexecution >= task.T0 && timeexecution <= task.T1 { //有排队时间或者任务存在串行执行
if task.T1 == task.T0 { //仅有一个副本,时间中有排队时间
e := math.Exp(float64(-task.B) * float64(timeexecution-task.T1))
pay = (1 - 1/(1+e)) * task.Pr
} else { //多个副本
e := math.Exp(float64(-task.B) * float64(timeexecution-task.T1) / float64(task.T1-task.T0))
pay = (1 - 1/(1+e)) * task.Pr
}
if pay < costSum {
pay = costSum
}
} else { //超出用户满意度的完全串行时间
pay = 1 / 2 * task.Pr
if pay < costSum {
pay = costSum
}
}
profitSum := pay - costSum
return profitSum
}
func computeHighDegree(task *Task, resourcesolution []int, providerList []*Provider) float64 {
var highDegreeSum float64
// 依次计算每个云厂商的资源可用度
for i, provider := range providerList {
// 定义两个四维向量
// 未来任务资源需求比例
futureDemand := mat.NewVecDense(3, []float64{1, 1, 1})
// 定义假设按此方案分配后的剩余资源可用量,时间虽然有差异,但是先按那个时刻算吧,这里可能还要改一下
nowLeft_cpu := provider.CpuAvail - task.Cpu*float64(resourcesolution[i])
nowLeft_mem := provider.MemAvail - task.Mem*float64(resourcesolution[i])
nowLeft_disk := provider.DiskAvail - task.Disk*float64(resourcesolution[i])
nowLeft := mat.NewVecDense(3, []float64{nowLeft_cpu, nowLeft_mem, nowLeft_disk})
// 使用余弦相似度计算两个比值的相近度
// 计算向量的内积
dot_product := mat.Dot(futureDemand, nowLeft)
// 计算向量的模长
magnitude1 := mat.Norm(futureDemand, 2)
magnitude2 := mat.Norm(nowLeft, 2)
// 计算余弦相似度
cosine_similarity := dot_product / (magnitude1 * magnitude2)
highDegreeSum += cosine_similarity
}
return highDegreeSum / float64(len(providerList))
}
func back_trace_task(ReplicaNum int, DoneReplicasNum int, providerList []*Provider, staclu int, res *[][]int, sum int) {
//var count int = 0
pnum := len(providerList)
//所有的任务数都已经进行分配
if DoneReplicasNum == ReplicaNum {
var a []int
for i := 0; i < pnum; i++ {
a = append(a, providerList[i].CurReplicas)
}
*res = append(*res, a)
//(*res)[0] = append((*res)[0], a)
//count += 1
return
}
//遍历完所有的云厂商序号
if staclu >= pnum {
return
}
if providerList[staclu].CurReplicas < providerList[staclu].MaxTaskCanRun {
providerList[staclu].CurReplicas += 1
back_trace_task(ReplicaNum, DoneReplicasNum+1, providerList, staclu, res, sum)
providerList[staclu].CurReplicas -= 1
back_trace_task(ReplicaNum, DoneReplicasNum, providerList, staclu+1, res, sum)
} else {
back_trace_task(ReplicaNum, DoneReplicasNum, providerList, staclu+1, res, sum)
}
}
func back_trace_resource(list []int, i int, path []int, pathlist *[][]int) {
if i == len(list) {
var pathCopy = make([]int, len(path))
copy(pathCopy, path)
*pathlist = append(*pathlist, pathCopy)
return
}
if list[i] == 0 {
path = append(path, 0)
back_trace_resource(list, i+1, path, pathlist)
path = path[:len(path)-1]
} else {
for j := 1; j < list[i]+1; j++ {
path = append(path, j)
back_trace_resource(list, i+1, path, pathlist)
path = path[:len(path)-1]
}
}
}

View File

@ -0,0 +1,60 @@
package algo
import "math"
type Provider struct {
Pid int
CpuSum float64
MemSum float64
DiskSum float64
CpuCost float64
MemCost float64
DiskCost float64
CpuAvail float64
MemAvail float64
DiskAvail float64
CurReplicas int
MaxReplicas int
MaxTaskCanRun int //记录该厂商针对当前可以接收的最大任务数
TrustDegree float64 //信任度取值为0到1默认初始为1
LearnIndex float64 //云厂商生产成本的学习指数
PartWill float64 //合作意愿
ProfitSum float64
ProfitPerTask map[int]float64
}
func NewProvider(Pid int, CpuSum float64, MemSum float64, DiskSum float64, CpuCost float64, MemCost float64, DiskCost float64) *Provider {
return &Provider{
Pid: Pid,
CpuSum: CpuSum,
MemSum: MemSum,
DiskSum: DiskSum,
CpuCost: CpuCost,
MemCost: MemCost,
DiskCost: DiskCost,
CpuAvail: CpuSum,
MemAvail: MemSum,
DiskAvail: DiskSum,
LearnIndex: 0.9,
PartWill: 1,
}
}
func (p *Provider) GenMaxResourceNum(t *Task) {
p.MaxReplicas = int(math.Floor(p.CpuAvail / t.Cpu))
if p.MaxReplicas > int(math.Floor(p.MemAvail/t.Mem)) {
p.MaxReplicas = int(math.Floor(p.MemAvail / t.Mem))
}
if p.MaxReplicas > int(math.Floor(p.DiskAvail/t.Disk)) {
p.MaxReplicas = int(math.Floor(p.DiskAvail / t.Disk))
}
if p.MaxReplicas > 0 {
p.MaxTaskCanRun = t.Replicas
}
}
func (p Provider) name() {
}

View File

@ -0,0 +1,35 @@
package algo
type scheduleService interface {
computeMaxScore() (*Task, error)
}
func ScheduleWithFullCollaboration(scheduleService scheduleService, ProviderList []*Provider) (*Task, error) {
task, err := scheduleService.computeMaxScore()
if err != nil {
return nil, err
}
//计算任务i的resourcePerTask属性
for i, _ := range ProviderList {
tasksolu := task.MaxscoreStrategy.Tasksolution[i] // 第j个提供商分到的任务数
resourcesolu := task.MaxscoreStrategy.Resourcesolution[0][i] // 第j个提供商分到的资源数
// 在第j个云提供商处声明一个长度为资源数的链表
resourcePerTaskPerProviders := make([]int, resourcesolu)
if tasksolu > 0 {
for tasksolu > 0 {
for j := 0; j < resourcesolu; j++ {
resourcePerTaskPerProviders[j] += 1
tasksolu -= 1
}
}
} else if tasksolu == 0 {
resourcePerTaskPerProviders = []int{0}
}
task.ResourcePerTask = append(task.ResourcePerTask, resourcePerTaskPerProviders)
}
return task, nil
}

View File

@ -0,0 +1,13 @@
package algo
type Strategy struct {
Tasksolution []int //对应1种任务分配方案
Resourcesolution [][]int //有多种资源购买方案
ValueSum float64 //记录该参与联盟最大策略的总分值
Profit float64 //记录该参与联盟最大策略的利润
HighDegree float64 //记录该参与联盟最大策略的高可用度值
}
func NewStrategy() *Strategy {
return &Strategy{}
}

31
api/internal/algo/task.go Normal file
View File

@ -0,0 +1,31 @@
package algo
type Task struct {
Tid int
Replicas int //副本数
Cpu float64
Mem float64
Disk float64
Time int //单个副本的运行时间
T0 int
T1 int
Pr float64 //延迟最低用户满意度为1时用户的支付价格
B int
MaxscoreStrategy *Strategy
ResourcePerTask [][]int //存储调度后每个云厂商的具体占用资源剩余执行时间
}
func NewTask(id int, replicas int, cpu float64, mem float64, disk float64, time int, t0 int, t1 int, Pr float64) *Task {
return &Task{
Tid: id,
Replicas: replicas,
Cpu: cpu,
Mem: mem,
Disk: disk,
Time: time,
T0: t0,
T1: t1,
Pr: Pr,
B: 5,
}
}