diff --git a/include/libs/monitorfw/taos_counter.h b/include/libs/monitorfw/taos_counter.h index 28a9eed41c..a9d196d8ec 100644 --- a/include/libs/monitorfw/taos_counter.h +++ b/include/libs/monitorfw/taos_counter.h @@ -15,7 +15,7 @@ #ifndef TAOS_COUNTER_H #define TAOS_COUNTER_H - +#include #include #include "taos_metric.h" @@ -99,4 +99,7 @@ int taos_counter_inc(taos_counter_t *self, const char **label_values); */ int taos_counter_add(taos_counter_t *self, double r_value, const char **label_values); +int taos_counter_get_vgroup_ids(taos_counter_t *self, char ***keys, int32_t **vgroup_ids, int *list_size); +int taos_counter_get_keys_size(taos_counter_t *self); +int taos_counter_delete(taos_counter_t *self, char *key); #endif // TAOS_COUNTER_H diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index be9ff56674..18b3f66a60 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -23,26 +23,27 @@ extern "C" { #endif typedef struct SDnodeMgmt { - SDnodeData *pData; - SMsgCb msgCb; - const char *path; - const char *name; - TdThread statusThread; - TdThread notifyThread; - TdThread monitorThread; - TdThread auditThread; - TdThread crashReportThread; - SSingleWorker mgmtWorker; - ProcessCreateNodeFp processCreateNodeFp; - ProcessAlterNodeTypeFp processAlterNodeTypeFp; - ProcessDropNodeFp processDropNodeFp; - SendMonitorReportFp sendMonitorReportFp; - SendAuditRecordsFp sendAuditRecordsFp; - GetVnodeLoadsFp getVnodeLoadsFp; - GetVnodeLoadsFp getVnodeLoadsLiteFp; - GetMnodeLoadsFp getMnodeLoadsFp; - GetQnodeLoadsFp getQnodeLoadsFp; - int32_t statusSeq; + SDnodeData *pData; + SMsgCb msgCb; + const char *path; + const char *name; + TdThread statusThread; + TdThread notifyThread; + TdThread monitorThread; + TdThread auditThread; + TdThread crashReportThread; + SSingleWorker mgmtWorker; + ProcessCreateNodeFp processCreateNodeFp; + ProcessAlterNodeTypeFp processAlterNodeTypeFp; + ProcessDropNodeFp processDropNodeFp; + SendMonitorReportFp sendMonitorReportFp; + MonitorCleanExpiredSamplesFp monitorCleanExpiredSamplesFp; + SendAuditRecordsFp sendAuditRecordsFp; + GetVnodeLoadsFp getVnodeLoadsFp; + GetVnodeLoadsFp getVnodeLoadsLiteFp; + GetMnodeLoadsFp getMnodeLoadsFp; + GetQnodeLoadsFp getQnodeLoadsFp; + int32_t statusSeq; } SDnodeMgmt; // dmHandle.c diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 958b411881..d7170398fb 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -65,6 +65,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->processAlterNodeTypeFp = pInput->processAlterNodeTypeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp; + pMgmt->monitorCleanExpiredSamplesFp = pInput->monitorCleanExpiredSamplesFp; pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp; pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp; pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index aeb519596d..4cfadc8f59 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -168,6 +168,7 @@ static void *dmMonitorThreadFp(void *param) { float interval = (curTime - lastTime) / 1000.0f; if (interval >= tsMonitorInterval) { (*pMgmt->sendMonitorReportFp)(); + (*pMgmt->monitorCleanExpiredSamplesFp)(); lastTime = curTime; trimCount = (trimCount + 1) % TRIM_FREQ; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 4cb6c5c724..2abf292e73 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -14,8 +14,11 @@ */ #define _DEFAULT_SOURCE +#include "taos_monitor.h" #include "vmInt.h" +extern taos_counter_t *tsInsertCounter; + void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) { pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); if (pInfo->pVloads == NULL) return; @@ -117,6 +120,34 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { taosArrayDestroy(pVloads); } +void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) { + int list_size = taos_counter_get_keys_size(tsInsertCounter); + if (list_size == 0) return; + int32_t *vgroup_ids; + char **keys; + int r = 0; + r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size); + if (r) { + dError("failed to get vgroup ids"); + return; + } + (void)taosThreadRwlockRdlock(&pMgmt->lock); + for (int i = 0; i < list_size; i++) { + int32_t vgroup_id = vgroup_ids[i]; + void *vnode = taosHashGet(pMgmt->hash, &vgroup_id, sizeof(int32_t)); + if (vnode == NULL) { + r = taos_counter_delete(tsInsertCounter, keys[i]); + if (r) { + dError("failed to delete monitor sample key:%s", keys[i]); + } + } + } + (void)taosThreadRwlockUnlock(&pMgmt->lock); + if (vgroup_ids) taosMemoryFree(vgroup_ids); + if (keys) taosMemoryFree(keys); + return; +} + static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg)); diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 9548d0cad9..5196987f28 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -128,6 +128,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); // dmMonitor.c void dmSendMonitorReport(); +void dmMonitorCleanExpiredSamples(); void dmSendAuditRecords(); void dmGetVnodeLoads(SMonVloadInfo *pInfo); void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo); diff --git a/source/dnode/mgmt/node_mgmt/inc/dmNodes.h b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h index 7d635c6bdc..2561a13b92 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmNodes.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h @@ -39,6 +39,8 @@ void vmGetVnodeLoadsLite(void *pMgmt, SMonVloadInfo *pInfo); void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo); void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo); +void vmCleanExpriedSamples(void *pMgmt); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 0a75847d96..620aed709f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -409,6 +409,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq, .processDropNodeFp = dmProcessDropNodeReq, .sendMonitorReportFp = dmSendMonitorReport, + .monitorCleanExpiredSamplesFp = dmMonitorCleanExpiredSamples, .sendAuditRecordFp = auditSendRecordsInBatch, .getVnodeLoadsFp = dmGetVnodeLoads, .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite, diff --git a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c index d3197282b6..ce0b2b59e0 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c @@ -33,8 +33,8 @@ static void dmGetMonitorBasicInfoBasic(SDnode *pDnode, SMonBasicInfo *pInfo) { } static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { - //pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / (86400000.0f); - pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) /1000.0f; + // pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / (86400000.0f); + pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / 1000.0f; pInfo->has_mnode = pDnode->wrappers[MNODE].required; pInfo->has_qnode = pDnode->wrappers[QNODE].required; pInfo->has_snode = pDnode->wrappers[SNODE].required; @@ -52,6 +52,17 @@ static void dmGetDmMonitorInfo(SDnode *pDnode) { monSetDmInfo(&dmInfo); } +void dmCleanExpriedSamples(SDnode *pDnode) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; + if (dmMarkWrapper(pWrapper) == 0) { + if (pWrapper->pMgmt != NULL) { + vmCleanExpriedSamples(pWrapper->pMgmt); + } + } + dmReleaseWrapper(pWrapper); + return; +} + static void dmGetDmMonitorInfoBasic(SDnode *pDnode) { SMonDmInfo dmInfo = {0}; dmGetMonitorBasicInfoBasic(pDnode, &dmInfo.basic); @@ -123,11 +134,17 @@ void dmSendMonitorReport() { monGenAndSendReport(); } -//Todo: put this in seperate file in the future -void dmSendAuditRecords() { - auditSendRecordsInBatch(); +void dmMonitorCleanExpiredSamples() { + if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return; + dTrace("clean monitor expired samples"); + + SDnode *pDnode = dmInstance(); + (void)dmCleanExpriedSamples(pDnode); } +// Todo: put this in seperate file in the future +void dmSendAuditRecords() { auditSendRecordsInBatch(); } + void dmGetVnodeLoads(SMonVloadInfo *pInfo) { SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 4ad4ea7c30..425f10392f 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -116,6 +116,7 @@ typedef enum { typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef void (*SendMonitorReportFp)(); +typedef void (*MonitorCleanExpiredSamplesFp)(); typedef void (*SendAuditRecordsFp)(); typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo); typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); @@ -146,21 +147,22 @@ typedef struct { } SDnodeData; typedef struct { - const char *path; - const char *name; - STfs *pTfs; - SDnodeData *pData; - SMsgCb msgCb; - ProcessCreateNodeFp processCreateNodeFp; - ProcessAlterNodeTypeFp processAlterNodeTypeFp; - ProcessDropNodeFp processDropNodeFp; - SendMonitorReportFp sendMonitorReportFp; - SendAuditRecordsFp sendAuditRecordFp; - GetVnodeLoadsFp getVnodeLoadsFp; - GetVnodeLoadsFp getVnodeLoadsLiteFp; - GetMnodeLoadsFp getMnodeLoadsFp; - GetQnodeLoadsFp getQnodeLoadsFp; - StopDnodeFp stopDnodeFp; + const char *path; + const char *name; + STfs *pTfs; + SDnodeData *pData; + SMsgCb msgCb; + ProcessCreateNodeFp processCreateNodeFp; + ProcessAlterNodeTypeFp processAlterNodeTypeFp; + ProcessDropNodeFp processDropNodeFp; + SendMonitorReportFp sendMonitorReportFp; + MonitorCleanExpiredSamplesFp monitorCleanExpiredSamplesFp; + SendAuditRecordsFp sendAuditRecordFp; + GetVnodeLoadsFp getVnodeLoadsFp; + GetVnodeLoadsFp getVnodeLoadsLiteFp; + GetMnodeLoadsFp getMnodeLoadsFp; + GetQnodeLoadsFp getQnodeLoadsFp; + StopDnodeFp stopDnodeFp; } SMgmtInputOpt; typedef struct { diff --git a/source/libs/monitorfw/inc/taos_metric_formatter_i.h b/source/libs/monitorfw/inc/taos_metric_formatter_i.h index ab60359f4a..54e683fa91 100644 --- a/source/libs/monitorfw/inc/taos_metric_formatter_i.h +++ b/source/libs/monitorfw/inc/taos_metric_formatter_i.h @@ -16,6 +16,7 @@ #ifndef TAOS_METRIC_FORMATTER_I_H #define TAOS_METRIC_FORMATTER_I_H +#include // Private #include "taos_metric_formatter_t.h" @@ -57,8 +58,8 @@ int taos_metric_formatter_load_l_value(taos_metric_formatter_t *metric_formatter /** * @brief API PRIVATE Loads the formatter with a metric sample */ -int taos_metric_formatter_load_sample(taos_metric_formatter_t *metric_formatter, taos_metric_sample_t *sample, - char *ts, char *format); +int taos_metric_formatter_load_sample(taos_metric_formatter_t *metric_formatter, taos_metric_sample_t *sample, char *ts, + char *format); /** * @brief API PRIVATE Loads a metric in the string exposition format @@ -80,4 +81,5 @@ int taos_metric_formatter_clear(taos_metric_formatter_t *self); */ char *taos_metric_formatter_dump(taos_metric_formatter_t *metric_formatter); +int32_t taos_metric_formatter_get_vgroup_id(char *key); #endif // TAOS_METRIC_FORMATTER_I_H diff --git a/source/libs/monitorfw/src/taos_counter.c b/source/libs/monitorfw/src/taos_counter.c index ef7d41cf2c..096d19c190 100644 --- a/source/libs/monitorfw/src/taos_counter.c +++ b/source/libs/monitorfw/src/taos_counter.c @@ -20,13 +20,14 @@ #include "taos_alloc.h" // Private -#include "taos_test.h" #include "taos_errors.h" #include "taos_log.h" +#include "taos_metric_formatter_i.h" #include "taos_metric_i.h" #include "taos_metric_sample_i.h" #include "taos_metric_sample_t.h" #include "taos_metric_t.h" +#include "taos_test.h" taos_counter_t *taos_counter_new(const char *name, const char *help, size_t label_key_count, const char **label_keys) { return (taos_counter_t *)taos_metric_new(TAOS_COUNTER, name, help, label_key_count, label_keys); @@ -64,3 +65,49 @@ int taos_counter_add(taos_counter_t *self, double r_value, const char **label_va if (sample == NULL) return 1; return taos_metric_sample_add(sample, r_value); } + +int taos_counter_get_keys_size(taos_counter_t *self) { return self->samples->keys->size; } + +int taos_counter_get_vgroup_ids(taos_counter_t *self, char ***keys, int32_t **vgroup_ids, int *list_size) { + TAOS_TEST_PARA(self != NULL); + if (self == NULL) return 1; + if (self->type != TAOS_COUNTER) { + TAOS_LOG(TAOS_METRIC_INCORRECT_TYPE); + return 1; + } + if (self->samples == NULL) return 1; + (void)pthread_rwlock_rdlock(self->rwlock); + taos_linked_list_t *key_list = self->samples->keys; + *list_size = key_list->size; + int r = 0; + *vgroup_ids = (int32_t *)taos_malloc(*list_size * sizeof(int32_t)); + if (vgroup_ids == NULL) { + (void)pthread_rwlock_unlock(self->rwlock); + return 1; + } + *keys = (char **)taos_malloc(*list_size * sizeof(char *)); + if (keys == NULL) { + (void)pthread_rwlock_unlock(self->rwlock); + return 1; + } + int index = 0; + for (taos_linked_list_node_t *current_key = key_list->head; current_key != NULL; current_key = current_key->next) { + char *key = (char *)current_key->item; + int32_t vgroup_id = taos_metric_formatter_get_vgroup_id(key); + (*vgroup_ids)[index] = vgroup_id; + (*keys)[index] = key; + index++; + } + (void)pthread_rwlock_unlock(self->rwlock); + return r; +} + +int taos_counter_delete(taos_counter_t *self, char *key) { + TAOS_TEST_PARA(self != NULL); + if (self == NULL) return 1; + if (self->type != TAOS_COUNTER) { + TAOS_LOG(TAOS_METRIC_INCORRECT_TYPE); + return 1; + } + return taos_map_delete(self->samples, key); +} \ No newline at end of file diff --git a/source/libs/monitorfw/src/taos_metric_formatter.c b/source/libs/monitorfw/src/taos_metric_formatter.c index a20a8d919c..cb1edd30b6 100644 --- a/source/libs/monitorfw/src/taos_metric_formatter.c +++ b/source/libs/monitorfw/src/taos_metric_formatter.c @@ -156,6 +156,21 @@ int taos_metric_formatter_load_l_value(taos_metric_formatter_t *self, const char } return 0; } +int32_t taos_metric_formatter_get_vgroup_id(char *key) { + char *start, *end; + char vgroupid[10]; + start = strstr(key, "vgroup_id=\""); + if (start) { + start += strlen("vgroup_id=\""); + end = strchr(start, '\"'); + if (end) { + strncpy(vgroupid, start, end - start); + vgroupid[end - start] = '\0'; + } + return strtol(vgroupid, NULL, 10); + } + return 0; +} /* int taos_metric_formatter_load_sample(taos_metric_formatter_t *self, taos_metric_sample_t *sample, char *ts, char *format) {