diff --git a/api/internal/cron/aiTask.go b/api/internal/cron/aiCronTask.go
similarity index 80%
rename from api/internal/cron/aiTask.go
rename to api/internal/cron/aiCronTask.go
index bb970c74..b783dfc2 100644
--- a/api/internal/cron/aiTask.go
+++ b/api/internal/cron/aiCronTask.go
@@ -327,3 +327,80 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
 
 	return executorMap, collectorMap
 }
+
+// UpdateClusterResource refreshes the persisted resource figures of every cluster
+// attached to an adapter of type "1". For each cluster it asks the matching
+// collector for live statistics and then saves a new record or updates the
+// existing one. The work is best effort: a cluster whose lookup, collection or
+// write fails is skipped without reporting, and the call returns once every
+// per-cluster goroutine has finished.
+func UpdateClusterResource(svc *svc.ServiceContext) {
+	list, err := svc.Scheduler.AiStorages.GetAdaptersByType("1")
+	if err != nil {
+		return
+	}
+	var wg sync.WaitGroup
+	for _, adapter := range list {
+		// Copy the loop variable: before Go 1.22 every iteration shares one
+		// variable, so the goroutines below could otherwise observe a later adapter.
+		a := adapter
+		clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(a.Id)
+		if err != nil {
+			continue
+		}
+		for _, cluster := range clusters.List {
+			c := cluster
+			clusterResource, err := svc.Scheduler.AiStorages.GetClusterResourcesById(c.Id)
+			if err != nil || clusterResource == nil {
+				// A nil record would panic when dereferenced below.
+				continue
+			}
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+
+				collector, ok := svc.Scheduler.AiService.AiCollectorAdapterMap[a.Id][c.Id]
+				if !ok {
+					return
+				}
+				// An empty request yields the background context.
+				h := http.Request{}
+				stat, err := collector.GetResourceStats(h.Context())
+				if err != nil || stat == nil {
+					return
+				}
+				clusterType, err := strconv.ParseInt(a.Type, 10, 64)
+				if err != nil {
+					return
+				}
+				var cardTotal int64
+				var topsTotal float64
+				for _, card := range stat.CardsAvail {
+					cardTotal += int64(card.CardNum)
+					topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
+				}
+
+				if (models.TClusterResource{} == *clusterResource) {
+					// An all-zero record is treated as "nothing stored yet".
+					// Best effort: a failed save is dropped, as there is no caller to tell.
+					_ = svc.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
+						stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
+					return
+				}
+
+				clusterResource.CardTotal = cardTotal
+				clusterResource.CardTopsTotal = topsTotal
+				clusterResource.CpuAvail = float64(stat.CpuCoreAvail)
+				clusterResource.CpuTotal = float64(stat.CpuCoreTotal)
+				clusterResource.MemAvail = stat.MemAvail
+				clusterResource.MemTotal = stat.MemTotal
+				clusterResource.DiskAvail = stat.DiskAvail
+				clusterResource.DiskTotal = stat.DiskTotal
+
+				// Best effort as well: an update error is dropped.
+				_ = svc.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
+			}()
+		}
+	}
+	wg.Wait()
+}
diff --git a/api/internal/cron/cron.go b/api/internal/cron/cron.go
index be7de1a7..07669fcb 100644
--- a/api/internal/cron/cron.go
+++ b/api/internal/cron/cron.go
@@ -44,4 +44,7 @@ func AddCronGroup(svc *svc.ServiceContext) {
 		UpdateAiAdapterMaps(svc)
 	})
 
+	svc.Cron.AddFunc("30 21 * * *", func() {
+		UpdateClusterResource(svc)
+	})
 }
diff --git a/api/internal/logic/ai/getcenteroverviewlogic.go b/api/internal/logic/ai/getcenteroverviewlogic.go
index e6a45106..bcec6855 100644
--- a/api/internal/logic/ai/getcenteroverviewlogic.go
+++ b/api/internal/logic/ai/getcenteroverviewlogic.go
@@ -137,6 +137,13 @@ func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan
 		} else {
 			clusterResource.CardTotal = cardTotal
 			clusterResource.CardTopsTotal = topsTotal
+			clusterResource.CpuAvail = float64(stat.CpuCoreAvail)
+			clusterResource.CpuTotal = float64(stat.CpuCoreTotal)
+			clusterResource.MemAvail = stat.MemAvail
+			clusterResource.MemTotal = stat.MemTotal
+			clusterResource.DiskAvail = stat.DiskAvail
+			clusterResource.DiskTotal = stat.DiskTotal
+
 			err := l.svcCtx.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
 			if err != nil {
 				mu.Unlock()
diff --git a/api/internal/logic/core/syncclusterloadlogic.go b/api/internal/logic/core/syncclusterloadlogic.go
index abad70e5..5a84e321 100644
--- a/api/internal/logic/core/syncclusterloadlogic.go
+++ b/api/internal/logic/core/syncclusterloadlogic.go
@@ -2,12 +2,11 @@ package core
 
 import (
 	"context"
+	"github.com/zeromicro/go-zero/core/logx"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
-	"strconv"
-
-	"github.com/zeromicro/go-zero/core/logx"
+	tool "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
 )
 
 type SyncClusterLoadLogic struct {
@@ -25,24 +24,17 @@ func NewSyncClusterLoadLogic(ctx context.Context, svcCtx *svc.ServiceContext) *S
 }
 
+// SyncClusterLoad forwards every cluster load record in req to the tracker
+// package, which publishes it as Prometheus gauges. It always returns nil.
 func (l *SyncClusterLoadLogic) SyncClusterLoad(req *types.SyncClusterLoadReq) error {
-	if len(req.ClusterLoadRecords) != 0 {
-		for _, record := range req.ClusterLoadRecords {
-			tracker.ClusterCpuUtilisationGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuUtilisation)
-			tracker.ClusterCpuAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuAvail)
-			tracker.ClusterCpuTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuTotal)
-
-			tracker.ClusterMemoryUtilisationGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryUtilisation)
-			tracker.ClusterMemoryAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryAvail)
-			tracker.ClusterMemoryTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryTotal)
-
-			tracker.ClusterDiskUtilisationGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskUtilisation)
-			tracker.ClusterDiskAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskAvail)
-			tracker.ClusterDiskTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskTotal)
-
-			tracker.ClusterPodUtilisationGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.PodsUtilisation)
-			tracker.ClusterPodCountGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(float64(record.PodsCount))
-			tracker.ClusterPodTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(float64(record.PodsTotal))
-		}
+	// Publish each reported record on its own: converting the whole request into a
+	// single tracker.ClusterLoadRecord would ignore the ClusterLoadRecords list.
+	// Ranging over a nil or empty list simply does nothing.
+	for _, record := range req.ClusterLoadRecords {
+		var param tracker.ClusterLoadRecord
+		// NOTE(review): assumes tool.Convert copies matching fields from its first
+		// argument into the second — confirm against pkg/utils.
+		tool.Convert(record, &param)
+		tracker.SyncClusterLoad(param)
 	}
 	return nil
 }
diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go
index 8b0ad7ca..4fcda695 100644
--- a/api/internal/scheduler/database/aiStorage.go
+++ b/api/internal/scheduler/database/aiStorage.go
@@ -6,6 +6,7 @@ import (
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
 	"gorm.io/gorm"
 	"strconv"
 	"time"
@@ -211,6 +212,19 @@ func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, c
 	if tx.Error != nil {
 		return tx.Error
 	}
+	// Mirror the stored figures to the Prometheus gauges.
+	// NOTE(review): AdapterId and the utilisation/pod fields are not set here, so
+	// tracker.SyncClusterLoad receives zero for them — confirm that is intended.
+	param := tracker.ClusterLoadRecord{
+		ClusterName: clusterName,
+		CpuAvail:    cpuAvail,
+		CpuTotal:    cpuTotal,
+		MemoryAvail: memAvail,
+		MemoryTotal: memTotal,
+		DiskAvail:   diskAvail,
+		DiskTotal:   diskTotal,
+	}
+	tracker.SyncClusterLoad(param)
 	return nil
 }
 
@@ -219,6 +233,19 @@ func (s *AiStorage) UpdateClusterResources(clusterResource *models.TClusterResou
 	if tx.Error != nil {
 		return tx.Error
 	}
+	// Mirror the stored figures to the Prometheus gauges.
+	// NOTE(review): AdapterId and the utilisation/pod fields are not set here, so
+	// tracker.SyncClusterLoad receives zero for them — confirm that is intended.
+	param := tracker.ClusterLoadRecord{
+		ClusterName: clusterResource.ClusterName,
+		CpuAvail:    clusterResource.CpuAvail,
+		CpuTotal:    clusterResource.CpuTotal,
+		MemoryAvail: clusterResource.MemAvail,
+		MemoryTotal: clusterResource.MemTotal,
+		DiskAvail:   clusterResource.DiskAvail,
+		DiskTotal:   clusterResource.DiskTotal,
+	}
+	tracker.SyncClusterLoad(param)
 	return nil
 }
 
diff --git a/pkg/tracker/tracker.go b/pkg/tracker/tracker.go
index 7941f74b..2e8384a6 100644
--- a/pkg/tracker/tracker.go
+++ b/pkg/tracker/tracker.go
@@ -107,6 +107,26 @@ var (
 	}
 )
 
+// ClusterLoadRecord holds one cluster's CPU, memory, disk and pod figures
+// together with the cluster name and adapter id that SyncClusterLoad uses as
+// gauge labels.
+type ClusterLoadRecord struct {
+	AdapterId         int64   `json:"adapterId,optional"`
+	ClusterName       string  `json:"clusterName,optional"`
+	CpuAvail          float64 `json:"cpuAvail,optional"`
+	CpuTotal          float64 `json:"cpuTotal,optional"`
+	CpuUtilisation    float64 `json:"cpuUtilisation,optional"`
+	MemoryAvail       float64 `json:"memoryAvail,optional"`
+	MemoryUtilisation float64 `json:"memoryUtilisation,optional"`
+	MemoryTotal       float64 `json:"memoryTotal,optional"`
+	DiskAvail         float64 `json:"diskAvail,optional"`
+	DiskTotal         float64 `json:"diskTotal,optional"`
+	DiskUtilisation   float64 `json:"diskUtilisation,optional"`
+	PodsUtilisation   float64 `json:"podsUtilisation,optional"`
+	PodsCount         int64   `json:"podsCount,optional"`
+	PodsTotal         int64   `json:"podsTotal,optional"`
+}
+
 func init() {
 	prometheus.MustRegister(metrics...)
 }
@@ -302,3 +322,29 @@ func (p Prometheus) GetRawData(expr string, o QueryOption) (model.Value, error)
 	}
 	return value, nil
 }
+
+// SyncClusterLoad publishes record to the cluster load gauges, labelling each
+// sample with the record's cluster name and its adapter id in decimal. All
+// twelve gauges are written on every call, so a field left at its zero value
+// is published as zero.
+func SyncClusterLoad(record ClusterLoadRecord) {
+	// Both label values are shared by every gauge, so build them once.
+	cluster := record.ClusterName
+	adapter := strconv.FormatInt(record.AdapterId, 10)
+
+	ClusterCpuUtilisationGauge.WithLabelValues(cluster, adapter).Set(record.CpuUtilisation)
+	ClusterCpuAvailGauge.WithLabelValues(cluster, adapter).Set(record.CpuAvail)
+	ClusterCpuTotalGauge.WithLabelValues(cluster, adapter).Set(record.CpuTotal)
+
+	ClusterMemoryUtilisationGauge.WithLabelValues(cluster, adapter).Set(record.MemoryUtilisation)
+	ClusterMemoryAvailGauge.WithLabelValues(cluster, adapter).Set(record.MemoryAvail)
+	ClusterMemoryTotalGauge.WithLabelValues(cluster, adapter).Set(record.MemoryTotal)
+
+	ClusterDiskUtilisationGauge.WithLabelValues(cluster, adapter).Set(record.DiskUtilisation)
+	ClusterDiskAvailGauge.WithLabelValues(cluster, adapter).Set(record.DiskAvail)
+	ClusterDiskTotalGauge.WithLabelValues(cluster, adapter).Set(record.DiskTotal)
+
+	ClusterPodUtilisationGauge.WithLabelValues(cluster, adapter).Set(record.PodsUtilisation)
+	ClusterPodCountGauge.WithLabelValues(cluster, adapter).Set(float64(record.PodsCount))
+	ClusterPodTotalGauge.WithLabelValues(cluster, adapter).Set(float64(record.PodsTotal))
+}