diff --git a/include/libs/monitorfw/taos_counter.h b/include/libs/monitorfw/taos_counter.h index 28a9eed41c..a9d196d8ec 100644 --- a/include/libs/monitorfw/taos_counter.h +++ b/include/libs/monitorfw/taos_counter.h @@ -15,7 +15,7 @@ #ifndef TAOS_COUNTER_H #define TAOS_COUNTER_H - +#include #include #include "taos_metric.h" @@ -99,4 +99,7 @@ int taos_counter_inc(taos_counter_t *self, const char **label_values); */ int taos_counter_add(taos_counter_t *self, double r_value, const char **label_values); +int taos_counter_get_vgroup_ids(taos_counter_t *self, char ***keys, int32_t **vgroup_ids, int *list_size); +int taos_counter_get_keys_size(taos_counter_t *self); +int taos_counter_delete(taos_counter_t *self, char *key); #endif // TAOS_COUNTER_H diff --git a/include/util/tdef.h b/include/util/tdef.h index a750074953..46a0d01457 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -499,7 +499,7 @@ typedef enum ELogicConditionType { #ifdef WINDOWS #define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections. #else -#define TSDB_MAX_RPC_THREADS 10 +#define TSDB_MAX_RPC_THREADS 50 #endif #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index e2f6eb5ff4..a208cedd65 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -349,8 +349,8 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { STscObj* pTscObj = pRequest->pTscObj; SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); - int64_t transporterId = 0; - TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg)); + // int64_t transporterId = 0; + TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg)); (void)tsem_wait(&pRequest->body.rspSem); return TSDB_CODE_SUCCESS; } @@ -406,8 +406,8 @@ int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { SAppInstInfo* pAppInfo = getAppInfo(pRequest); SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); - int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg); + // int64_t transporterId = 0; + int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg); if (code) { doRequestCallback(pRequest, code); } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 721f995719..27c1878dc8 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -296,9 +296,8 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa pSendInfo->fp = fetchWhiteListCallbackFn; pSendInfo->msgType = TDMT_MND_GET_USER_WHITELIST; - int64_t transportId = 0; - SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp); - if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, &transportId, pSendInfo)) { + SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp); + if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, NULL, pSendInfo)) { tscWarn("failed to async send msg to server"); } releaseTscObj(connId); @@ -860,9 +859,9 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { return pResInfo->pCol[columnIndex].offset; } -int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows){ - if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || - columnIndex < 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { +int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows) { + if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || columnIndex < 0 || TD_RES_TMQ_META(res) || + TD_RES_TMQ_BATCH_META(res)) { return TSDB_CODE_INVALID_PARA; } @@ -875,22 +874,22 @@ int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *r TAOS_FIELD *pField = &pResInfo->userFields[columnIndex]; SResultColumn *pCol = &pResInfo->pCol[columnIndex]; - if (*rows > pResInfo->numOfRows){ + if (*rows > pResInfo->numOfRows) { *rows = pResInfo->numOfRows; } if (IS_VAR_DATA_TYPE(pField->type)) { - for(int i = 0; i < *rows; i++){ - if(pCol->offset[i] == -1){ + for (int i = 0; i < *rows; i++) { + if (pCol->offset[i] == -1) { result[i] = true; - }else{ + } else { result[i] = false; } } - }else{ - for(int i = 0; i < *rows; i++){ - if (colDataIsNull_f(pCol->nullbitmap, i)){ + } else { + for (int i = 0; i < *rows; i++) { + if (colDataIsNull_f(pCol->nullbitmap, i)) { result[i] = true; - }else{ + } else { result[i] = false; } } diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 612f57ecdd..9ed6512352 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -113,15 +113,15 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId); } MonitorSlowLogData tmp = {.clusterId = p->clusterId, - .type = p->type, - .fileName = p->fileName, - .pFile = p->pFile, - .offset = p->offset, - .data = NULL}; + .type = p->type, + .fileName = p->fileName, + .pFile = p->pFile, + .offset = p->offset, + .data = NULL}; if (monitorPutData2MonitorQueue(tmp) == 0) { p->fileName = NULL; } else { - if(taosCloseFile(&(p->pFile)) != 0) { + if (taosCloseFile(&(p->pFile)) != 0) { tscError("failed to close file:%p", p->pFile); } } @@ -165,8 +165,8 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO pInfo->requestId = tGenIdPI64(); pInfo->requestObjRefId = 0; - int64_t transporterId = 0; - return asyncSendMsgToServer(pTransporter, epSet, &transporterId, pInfo); + // int64_t transporterId = 0; + return asyncSendMsgToServer(pTransporter, epSet, NULL, pInfo); FAILED: if (taosCloseFile(&(((MonitorSlowLogData*)param)->pFile)) != 0) { @@ -286,7 +286,7 @@ void monitorCreateClient(int64_t clusterId) { return; - fail: +fail: destroyMonitorClient(&pMonitor); taosWUnLockLatch(&monitorLock); } @@ -302,7 +302,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys); if (newCounter == NULL) return; MonitorClient* pMonitor = *ppMonitor; - if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0){ + if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) { tscError("failed to add metric to collector"); (void)taos_counter_destroy(newCounter); goto end; @@ -315,7 +315,7 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, newCounter); - end: +end: taosWUnLockLatch(&monitorLock); } @@ -338,13 +338,13 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char** tscError("monitorCounterInc not found pCounter %" PRIx64 ":%s.", clusterId, counterName); goto end; } - if (taos_counter_inc(*ppCounter, label_values) != 0){ + if (taos_counter_inc(*ppCounter, label_values) != 0) { tscError("monitorCounterInc failed to inc %" PRIx64 ":%s.", clusterId, counterName); goto end; } tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName); - end: +end: taosWUnLockLatch(&monitorLock); } @@ -413,7 +413,7 @@ static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) { return NULL; } - if((size <= *offset)){ + if ((size <= *offset)) { tscError("invalid size:%" PRId64 ", offset:%" PRId64, size, *offset); terrno = TSDB_CODE_TSC_INTERNAL_ERROR; return NULL; @@ -510,13 +510,13 @@ static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offs } SEpSet ep = getEpSet_s(&pInst->mgmtEp); char* data = readFile(pFile, offset, size); - if(data == NULL) return terrno; + if (data == NULL) return terrno; return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName, pInst->pTransporter, &ep); } static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) { - if (fileName == NULL){ + if (fileName == NULL) { return; } int64_t size = getFileSize(*fileName); @@ -525,10 +525,11 @@ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, Td tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName); } else { int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName); - if (code == 0){ + if (code == 0) { tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log succ, clusterId:%" PRId64, clusterId); - }else{ - tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId, code); + } else { + tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId, + code); } *fileName = NULL; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3927172b61..783815d97f 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -552,9 +552,9 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse pMsgSendInfo->fp = tmqCommitCb; pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; - int64_t transporterId = 0; + // int64_t transporterId = 0; (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); - code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo); if (code != 0) { (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); return code; @@ -955,8 +955,7 @@ void tmqSendHbReq(void* param, void* tmrId) { SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); - int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); if (code != 0) { tscError("tmqSendHbReq asyncSendMsgToServer failed"); } @@ -1436,8 +1435,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); - int64_t transporterId = 0; - code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); if (code != 0) { goto FAIL; } @@ -2044,10 +2042,10 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p sendInfo->fp = tmqPollCb; sendInfo->msgType = TDMT_VND_TMQ_CONSUME; - int64_t transporterId = 0; - char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; + // int64_t transporterId = 0; + char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); - code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo); tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); if (code != 0) { @@ -3221,8 +3219,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep sendInfo->fp = tmCommittedCb; sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO; - int64_t transporterId = 0; - code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo); if (code != 0) { (void)tsem2_destroy(&pParam->sem); taosMemoryFree(pParam); @@ -3498,13 +3495,13 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a sendInfo->fp = tmqGetWalInfoCb; sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO; - int64_t transporterId = 0; - char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; + // int64_t transporterId = 0; + char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset); tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s,QID:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); - code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo); if (code != 0) { goto end; } @@ -3668,8 +3665,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ sendInfo->fp = tmqSeekCb; sendInfo->msgType = TDMT_VND_TMQ_SEEK; - int64_t transporterId = 0; - code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); if (code != 0) { (void)tsem2_destroy(&pParam->sem); taosMemoryFree(pParam); diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index be9ff56674..18b3f66a60 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -23,26 +23,27 @@ extern "C" { #endif typedef struct SDnodeMgmt { - SDnodeData *pData; - SMsgCb msgCb; - const char *path; - const char *name; - TdThread statusThread; - TdThread notifyThread; - TdThread monitorThread; - TdThread auditThread; - TdThread crashReportThread; - SSingleWorker mgmtWorker; - ProcessCreateNodeFp processCreateNodeFp; - ProcessAlterNodeTypeFp processAlterNodeTypeFp; - ProcessDropNodeFp processDropNodeFp; - SendMonitorReportFp sendMonitorReportFp; - SendAuditRecordsFp sendAuditRecordsFp; - GetVnodeLoadsFp getVnodeLoadsFp; - GetVnodeLoadsFp getVnodeLoadsLiteFp; - GetMnodeLoadsFp getMnodeLoadsFp; - GetQnodeLoadsFp getQnodeLoadsFp; - int32_t statusSeq; + SDnodeData *pData; + SMsgCb msgCb; + const char *path; + const char *name; + TdThread statusThread; + TdThread notifyThread; + TdThread monitorThread; + TdThread auditThread; + TdThread crashReportThread; + SSingleWorker mgmtWorker; + ProcessCreateNodeFp processCreateNodeFp; + ProcessAlterNodeTypeFp processAlterNodeTypeFp; + ProcessDropNodeFp processDropNodeFp; + SendMonitorReportFp sendMonitorReportFp; + MonitorCleanExpiredSamplesFp monitorCleanExpiredSamplesFp; + SendAuditRecordsFp sendAuditRecordsFp; + GetVnodeLoadsFp getVnodeLoadsFp; + GetVnodeLoadsFp getVnodeLoadsLiteFp; + GetMnodeLoadsFp getMnodeLoadsFp; + GetQnodeLoadsFp getQnodeLoadsFp; + int32_t statusSeq; } SDnodeMgmt; // dmHandle.c diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 958b411881..d7170398fb 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -65,6 +65,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->processAlterNodeTypeFp = pInput->processAlterNodeTypeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp; + pMgmt->monitorCleanExpiredSamplesFp = pInput->monitorCleanExpiredSamplesFp; pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp; pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp; pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index aeb519596d..4cfadc8f59 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -168,6 +168,7 @@ static void *dmMonitorThreadFp(void *param) { float interval = (curTime - lastTime) / 1000.0f; if (interval >= tsMonitorInterval) { (*pMgmt->sendMonitorReportFp)(); + (*pMgmt->monitorCleanExpiredSamplesFp)(); lastTime = curTime; trimCount = (trimCount + 1) % TRIM_FREQ; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 4cb6c5c724..2abf292e73 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -14,8 +14,11 @@ */ #define _DEFAULT_SOURCE +#include "taos_monitor.h" #include "vmInt.h" +extern taos_counter_t *tsInsertCounter; + void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) { pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); if (pInfo->pVloads == NULL) return; @@ -117,6 +120,34 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { taosArrayDestroy(pVloads); } +void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) { + int list_size = taos_counter_get_keys_size(tsInsertCounter); + if (list_size == 0) return; + int32_t *vgroup_ids; + char **keys; + int r = 0; + r = taos_counter_get_vgroup_ids(tsInsertCounter, &keys, &vgroup_ids, &list_size); + if (r) { + dError("failed to get vgroup ids"); + return; + } + (void)taosThreadRwlockRdlock(&pMgmt->lock); + for (int i = 0; i < list_size; i++) { + int32_t vgroup_id = vgroup_ids[i]; + void *vnode = taosHashGet(pMgmt->hash, &vgroup_id, sizeof(int32_t)); + if (vnode == NULL) { + r = taos_counter_delete(tsInsertCounter, keys[i]); + if (r) { + dError("failed to delete monitor sample key:%s", keys[i]); + } + } + } + (void)taosThreadRwlockUnlock(&pMgmt->lock); + if (vgroup_ids) taosMemoryFree(vgroup_ids); + if (keys) taosMemoryFree(keys); + return; +} + static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg)); diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 9548d0cad9..5196987f28 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -128,6 +128,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); // dmMonitor.c void dmSendMonitorReport(); +void dmMonitorCleanExpiredSamples(); void dmSendAuditRecords(); void dmGetVnodeLoads(SMonVloadInfo *pInfo); void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo); diff --git a/source/dnode/mgmt/node_mgmt/inc/dmNodes.h b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h index 7d635c6bdc..2561a13b92 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmNodes.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmNodes.h @@ -39,6 +39,8 @@ void vmGetVnodeLoadsLite(void *pMgmt, SMonVloadInfo *pInfo); void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo); void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo); +void vmCleanExpriedSamples(void *pMgmt); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 0a75847d96..620aed709f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -409,6 +409,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq, .processDropNodeFp = dmProcessDropNodeReq, .sendMonitorReportFp = dmSendMonitorReport, + .monitorCleanExpiredSamplesFp = dmMonitorCleanExpiredSamples, .sendAuditRecordFp = auditSendRecordsInBatch, .getVnodeLoadsFp = dmGetVnodeLoads, .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite, diff --git a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c index d3197282b6..ce0b2b59e0 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c @@ -33,8 +33,8 @@ static void dmGetMonitorBasicInfoBasic(SDnode *pDnode, SMonBasicInfo *pInfo) { } static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { - //pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / (86400000.0f); - pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) /1000.0f; + // pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / (86400000.0f); + pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / 1000.0f; pInfo->has_mnode = pDnode->wrappers[MNODE].required; pInfo->has_qnode = pDnode->wrappers[QNODE].required; pInfo->has_snode = pDnode->wrappers[SNODE].required; @@ -52,6 +52,17 @@ static void dmGetDmMonitorInfo(SDnode *pDnode) { monSetDmInfo(&dmInfo); } +void dmCleanExpriedSamples(SDnode *pDnode) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; + if (dmMarkWrapper(pWrapper) == 0) { + if (pWrapper->pMgmt != NULL) { + vmCleanExpriedSamples(pWrapper->pMgmt); + } + } + dmReleaseWrapper(pWrapper); + return; +} + static void dmGetDmMonitorInfoBasic(SDnode *pDnode) { SMonDmInfo dmInfo = {0}; dmGetMonitorBasicInfoBasic(pDnode, &dmInfo.basic); @@ -123,11 +134,17 @@ void dmSendMonitorReport() { monGenAndSendReport(); } -//Todo: put this in seperate file in the future -void dmSendAuditRecords() { - auditSendRecordsInBatch(); +void dmMonitorCleanExpiredSamples() { + if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return; + dTrace("clean monitor expired samples"); + + SDnode *pDnode = dmInstance(); + (void)dmCleanExpriedSamples(pDnode); } +// Todo: put this in seperate file in the future +void dmSendAuditRecords() { auditSendRecordsInBatch(); } + void dmGetVnodeLoads(SMonVloadInfo *pInfo) { SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 4ad4ea7c30..425f10392f 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -116,6 +116,7 @@ typedef enum { typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef void (*SendMonitorReportFp)(); +typedef void (*MonitorCleanExpiredSamplesFp)(); typedef void (*SendAuditRecordsFp)(); typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo); typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); @@ -146,21 +147,22 @@ typedef struct { } SDnodeData; typedef struct { - const char *path; - const char *name; - STfs *pTfs; - SDnodeData *pData; - SMsgCb msgCb; - ProcessCreateNodeFp processCreateNodeFp; - ProcessAlterNodeTypeFp processAlterNodeTypeFp; - ProcessDropNodeFp processDropNodeFp; - SendMonitorReportFp sendMonitorReportFp; - SendAuditRecordsFp sendAuditRecordFp; - GetVnodeLoadsFp getVnodeLoadsFp; - GetVnodeLoadsFp getVnodeLoadsLiteFp; - GetMnodeLoadsFp getMnodeLoadsFp; - GetQnodeLoadsFp getQnodeLoadsFp; - StopDnodeFp stopDnodeFp; + const char *path; + const char *name; + STfs *pTfs; + SDnodeData *pData; + SMsgCb msgCb; + ProcessCreateNodeFp processCreateNodeFp; + ProcessAlterNodeTypeFp processAlterNodeTypeFp; + ProcessDropNodeFp processDropNodeFp; + SendMonitorReportFp sendMonitorReportFp; + MonitorCleanExpiredSamplesFp monitorCleanExpiredSamplesFp; + SendAuditRecordsFp sendAuditRecordFp; + GetVnodeLoadsFp getVnodeLoadsFp; + GetVnodeLoadsFp getVnodeLoadsLiteFp; + GetMnodeLoadsFp getMnodeLoadsFp; + GetQnodeLoadsFp getQnodeLoadsFp; + StopDnodeFp stopDnodeFp; } SMgmtInputOpt; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndPerfSchema.c b/source/dnode/mnode/impl/src/mndPerfSchema.c index 8ff9f8f27e..da5e201c05 100644 --- a/source/dnode/mnode/impl/src/mndPerfSchema.c +++ b/source/dnode/mnode/impl/src/mndPerfSchema.c @@ -42,7 +42,7 @@ int32_t mndPerfsInitMeta(SHashObj *hash) { int32_t code = 0; STableMetaRsp meta = {0}; - tstrncpy(meta.dbFName, TSDB_INFORMATION_SCHEMA_DB, sizeof(meta.dbFName)); + tstrncpy(meta.dbFName, TSDB_PERFORMANCE_SCHEMA_DB, sizeof(meta.dbFName)); meta.tableType = TSDB_SYSTEM_TABLE; meta.sversion = 1; meta.tversion = 1; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 7244f2d7c9..576ba736b5 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -68,7 +68,11 @@ SSdb *sdbInit(SSdbOpt *pOption) { void sdbCleanup(SSdb *pSdb) { mInfo("start to cleanup sdb"); - (void)sdbWriteFile(pSdb, 0); + int32_t code = 0; + + if ((code = sdbWriteFile(pSdb, 0)) != 0) { + mError("failed to write sdb file since %s", tstrerror(code)); + } if (pSdb->currDir != NULL) { taosMemoryFreeClear(pSdb->currDir); diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index dec37bfbf1..94826405dc 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -258,8 +258,11 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { if (code != 0) { mError("failed to read sdb file:%s head since %s", file, tstrerror(code)); taosMemoryFree(pRaw); - (void)taosCloseFile(&pFile); - return -1; + int32_t ret = 0; + if ((ret = taosCloseFile(&pFile)) != 0) { + mError("failed to close sdb file:%s since %s", file, tstrerror(ret)); + } + return code; } int64_t tableVer[SDB_MAX] = {0}; @@ -361,7 +364,9 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { pSdb->commitTerm, pSdb->commitConfig); _OVER: - (void)taosCloseFile(&pFile); + if ((ret = taosCloseFile(&pFile)) != 0) { + mError("failed to close sdb file:%s since %s", file, tstrerror(ret)); + } sdbFreeRaw(pRaw); TAOS_RETURN(code); @@ -404,8 +409,11 @@ static int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) { code = sdbWriteFileHead(pSdb, pFile); if (code != 0) { mError("failed to write sdb file:%s head since %s", tmpfile, tstrerror(code)); - (void)taosCloseFile(&pFile); - return -1; + int32_t ret = 0; + if ((ret = taosCloseFile(&pFile)) != 0) { + mError("failed to close sdb file:%s since %s", tmpfile, tstrerror(ret)); + } + return code; } for (int32_t i = SDB_MAX - 1; i >= 0; --i) { @@ -613,12 +621,18 @@ static void sdbCloseIter(SSdbIter *pIter) { if (pIter == NULL) return; if (pIter->file != NULL) { - (void)taosCloseFile(&pIter->file); + int32_t ret = 0; + if ((ret = taosCloseFile(&pIter->file)) != 0) { + mError("failed to close sdb file since %s", tstrerror(ret)); + } pIter->file = NULL; } if (pIter->name != NULL) { - (void)taosRemoveFile(pIter->name); + int32_t ret = 0; + if ((ret = taosRemoveFile(pIter->name)) != 0) { + mError("failed to remove sdb file:%s since %s", pIter->name, tstrerror(ret)); + } taosMemoryFree(pIter->name); pIter->name = NULL; } diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 0e41e0732f..5a2d1c981c 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -174,12 +174,12 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * if (insertFp != NULL) { code = (*insertFp)(pSdb, pRow->pObj); if (code != 0) { - if (terrno == 0) terrno = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; - code = terrno; - (void)taosHashRemove(hash, pRow->pObj, keySize); + if (taosHashRemove(hash, pRow->pObj, keySize) != 0) { + mError("failed to remove row from hash"); + } sdbFreeRow(pSdb, pRow, false); - terrno = code; sdbUnLock(pSdb, type); + terrno = code; return terrno; } } diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 44342b0ac9..5ed97c8db6 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -61,14 +61,14 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) { if (code) { pInserter->submitRes.code = code; } - + if (code == TSDB_CODE_SUCCESS) { pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2)); if (NULL == pInserter->submitRes.pRsp) { pInserter->submitRes.code = terrno; goto _return; } - + SDecoder coder = {0}; tDecoderInit(&coder, pMsg->pData, pMsg->len); code = tDecodeSSubmitRsp2(&coder, pInserter->submitRes.pRsp); @@ -108,7 +108,7 @@ _return: (void)tsem_post(&pInserter->ready); taosMemoryFree(pMsg->pData); - + return TSDB_CODE_SUCCESS; } @@ -136,8 +136,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int pMsgSendInfo->msgType = TDMT_VND_SUBMIT; pMsgSendInfo->fp = inserterCallback; - int64_t transporterId = 0; - return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo); + return asyncSendMsgToServer(pTransporter, pEpset, NULL, pMsgSendInfo); } static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) { @@ -166,7 +165,7 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int } else { taosMemoryFree(pBuf); } - + return code; } @@ -228,14 +227,15 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; goto _end; } - void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); + void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY if (pColInfoData->info.type != pCol->type) { - qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k, pCol->type); + qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k, + pCol->type); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; goto _end; } @@ -331,11 +331,11 @@ _end: tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); } - + return terrno; } *ppReq = pReq; - + return TSDB_CODE_SUCCESS; } @@ -462,7 +462,8 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat inserter->explain = pInserterNode->explain; int64_t suid = 0; - int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid); + int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, + &inserter->pSchema, &suid); if (code) { terrno = code; goto _return; @@ -484,9 +485,9 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); if (NULL == inserter->pCols) { - goto _return; + goto _return; } - + SNode* pNode = NULL; int32_t i = 0; FOREACH(pNode, pInserterNode->pCols) { diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 7e22e38c95..85a8b035da 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1575,8 +1575,8 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { SMetaReader mr = {0}; pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); - code = doSetUserTableMetaInfo(&pAPI->metaReaderFn, &pAPI->metaFn, pInfo->readHandle.vnode, &mr, *uid, dbname, vgId, p, - numOfRows, GET_TASKID(pTaskInfo)); + code = doSetUserTableMetaInfo(&pAPI->metaReaderFn, &pAPI->metaFn, pInfo->readHandle.vnode, &mr, *uid, dbname, vgId, + p, numOfRows, GET_TASKID(pTaskInfo)); pAPI->metaReaderFn.clearReader(&mr); QUERY_CHECK_CODE(code, lino, _end); @@ -2170,8 +2170,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca pMsgSendInfo->fp = loadSysTableCallback; pMsgSendInfo->requestId = pTaskInfo->id.queryId; - int64_t transporterId = 0; - code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo); + code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, NULL, pMsgSendInfo); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); pTaskInfo->code = code; @@ -2880,7 +2879,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP code = tableListGetSize(pTableListInfo, &num); QUERY_CHECK_CODE(code, lino, _error); - void* pList = tableListGetInfo(pTableListInfo, 0); + void* pList = tableListGetInfo(pTableListInfo, 0); code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, (void**)&pInfo->pHandle, pTaskInfo->id.str, NULL); diff --git a/source/libs/monitorfw/inc/taos_metric_formatter_i.h b/source/libs/monitorfw/inc/taos_metric_formatter_i.h index ab60359f4a..54e683fa91 100644 --- a/source/libs/monitorfw/inc/taos_metric_formatter_i.h +++ b/source/libs/monitorfw/inc/taos_metric_formatter_i.h @@ -16,6 +16,7 @@ #ifndef TAOS_METRIC_FORMATTER_I_H #define TAOS_METRIC_FORMATTER_I_H +#include // Private #include "taos_metric_formatter_t.h" @@ -57,8 +58,8 @@ int taos_metric_formatter_load_l_value(taos_metric_formatter_t *metric_formatter /** * @brief API PRIVATE Loads the formatter with a metric sample */ -int taos_metric_formatter_load_sample(taos_metric_formatter_t *metric_formatter, taos_metric_sample_t *sample, - char *ts, char *format); +int taos_metric_formatter_load_sample(taos_metric_formatter_t *metric_formatter, taos_metric_sample_t *sample, char *ts, + char *format); /** * @brief API PRIVATE Loads a metric in the string exposition format @@ -80,4 +81,5 @@ int taos_metric_formatter_clear(taos_metric_formatter_t *self); */ char *taos_metric_formatter_dump(taos_metric_formatter_t *metric_formatter); +int32_t taos_metric_formatter_get_vgroup_id(char *key); #endif // TAOS_METRIC_FORMATTER_I_H diff --git a/source/libs/monitorfw/src/taos_counter.c b/source/libs/monitorfw/src/taos_counter.c index ef7d41cf2c..096d19c190 100644 --- a/source/libs/monitorfw/src/taos_counter.c +++ b/source/libs/monitorfw/src/taos_counter.c @@ -20,13 +20,14 @@ #include "taos_alloc.h" // Private -#include "taos_test.h" #include "taos_errors.h" #include "taos_log.h" +#include "taos_metric_formatter_i.h" #include "taos_metric_i.h" #include "taos_metric_sample_i.h" #include "taos_metric_sample_t.h" #include "taos_metric_t.h" +#include "taos_test.h" taos_counter_t *taos_counter_new(const char *name, const char *help, size_t label_key_count, const char **label_keys) { return (taos_counter_t *)taos_metric_new(TAOS_COUNTER, name, help, label_key_count, label_keys); @@ -64,3 +65,49 @@ int taos_counter_add(taos_counter_t *self, double r_value, const char **label_va if (sample == NULL) return 1; return taos_metric_sample_add(sample, r_value); } + +int taos_counter_get_keys_size(taos_counter_t *self) { return self->samples->keys->size; } + +int taos_counter_get_vgroup_ids(taos_counter_t *self, char ***keys, int32_t **vgroup_ids, int *list_size) { + TAOS_TEST_PARA(self != NULL); + if (self == NULL) return 1; + if (self->type != TAOS_COUNTER) { + TAOS_LOG(TAOS_METRIC_INCORRECT_TYPE); + return 1; + } + if (self->samples == NULL) return 1; + (void)pthread_rwlock_rdlock(self->rwlock); + taos_linked_list_t *key_list = self->samples->keys; + *list_size = key_list->size; + int r = 0; + *vgroup_ids = (int32_t *)taos_malloc(*list_size * sizeof(int32_t)); + if (vgroup_ids == NULL) { + (void)pthread_rwlock_unlock(self->rwlock); + return 1; + } + *keys = (char **)taos_malloc(*list_size * sizeof(char *)); + if (keys == NULL) { + (void)pthread_rwlock_unlock(self->rwlock); + return 1; + } + int index = 0; + for (taos_linked_list_node_t *current_key = key_list->head; current_key != NULL; current_key = current_key->next) { + char *key = (char *)current_key->item; + int32_t vgroup_id = taos_metric_formatter_get_vgroup_id(key); + (*vgroup_ids)[index] = vgroup_id; + (*keys)[index] = key; + index++; + } + (void)pthread_rwlock_unlock(self->rwlock); + return r; +} + +int taos_counter_delete(taos_counter_t *self, char *key) { + TAOS_TEST_PARA(self != NULL); + if (self == NULL) return 1; + if (self->type != TAOS_COUNTER) { + TAOS_LOG(TAOS_METRIC_INCORRECT_TYPE); + return 1; + } + return taos_map_delete(self->samples, key); +} \ No newline at end of file diff --git a/source/libs/monitorfw/src/taos_metric_formatter.c b/source/libs/monitorfw/src/taos_metric_formatter.c index a20a8d919c..cb1edd30b6 100644 --- a/source/libs/monitorfw/src/taos_metric_formatter.c +++ b/source/libs/monitorfw/src/taos_metric_formatter.c @@ -156,6 +156,21 @@ int taos_metric_formatter_load_l_value(taos_metric_formatter_t *self, const char } return 0; } +int32_t taos_metric_formatter_get_vgroup_id(char *key) { + char *start, *end; + char vgroupid[10]; + start = strstr(key, "vgroup_id=\""); + if (start) { + start += strlen("vgroup_id=\""); + end = strchr(start, '\"'); + if (end) { + strncpy(vgroupid, start, end - start); + vgroupid[end - start] = '\0'; + } + return strtol(vgroupid, NULL, 10); + } + return 0; +} /* int taos_metric_formatter_load_sample(taos_metric_formatter_t *self, taos_metric_sample_t *sample, char *ts, char *format) { diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 207bd91bd9..b5b660a51b 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -574,7 +574,7 @@ int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) { goto PROCESS_META_OVER; } - if (0 != strcmp(metaRsp.dbFName, TSDB_INFORMATION_SCHEMA_DB) && + if (!IS_SYS_DBNAME(metaRsp.dbFName) && !tIsValidSchema(metaRsp.pSchemas, metaRsp.numOfColumns, metaRsp.numOfTags)) { code = TSDB_CODE_TSC_INVALID_VALUE; goto PROCESS_META_OVER; diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 382b83012d..696222784e 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -5238,22 +5238,20 @@ int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, *pResultStatus = FILTER_RESULT_ALL_QUALIFIED; return TSDB_CODE_SUCCESS; } - + int32_t code = TSDB_CODE_SUCCESS; SScalarParam output = {0}; SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)}; - int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + FLT_ERR_JRET(sclCreateColumnInfoData(&type, pSrc->info.rows, &output)); if (info->scalarMode) { SArray *pList = taosArrayInit(1, POINTER_BYTES); if (NULL == pList) { - FLT_ERR_RET(terrno); + FLT_ERR_JRET(terrno); } if (NULL == taosArrayPush(pList, &pSrc)) { - FLT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + taosArrayDestroy(pList); + FLT_ERR_JRET(terrno); } code = scalarCalculate(info->sclCtx.node, pList, &output); @@ -5261,7 +5259,7 @@ int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, *p = output.columnData; - FLT_ERR_RET(code); + FLT_ERR_JRET(code); if (output.numOfQualified == output.numOfRows) { *pResultStatus = FILTER_RESULT_ALL_QUALIFIED; @@ -5277,11 +5275,12 @@ int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, output.numOfRows = pSrc->info.rows; if (*p == NULL) { - return TSDB_CODE_APP_ERROR; + fltError("filterExecute failed, column data is NULL"); + FLT_ERR_JRET(TSDB_CODE_APP_ERROR); } bool keepAll = false; - FLT_ERR_RET((info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified, &keepAll)); + FLT_ERR_JRET((info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified, &keepAll)); // todo this should be return during filter procedure if (keepAll) { @@ -5304,6 +5303,10 @@ int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, } return TSDB_CODE_SUCCESS; +_return: + sclFreeParam(&output); + *p = NULL; + return code; } typedef struct SClassifyConditionCxt { diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 84f0ffc8cb..95ea6944e3 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -272,19 +272,19 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool); } \ } while (0) -#define ASYNC_CHECK_HANDLE(exh1, id) \ - do { \ - if (id > 0) { \ - SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \ - if (exh2 == NULL || id != exh2->refId) { \ - tDebug("ref:%" PRId64 " already released" PRIu64, id); \ - code = terrno; \ - goto _return1; \ - } \ - } else { \ - tWarn("invalid handle to release"); \ - goto _return2; \ - } \ +#define ASYNC_CHECK_HANDLE(exh1, id) \ + do { \ + if (id > 0) { \ + SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), id); \ + if (exh2 == NULL || id != exh2->refId) { \ + tDebug("ref:%" PRId64 " already released", id); \ + code = terrno; \ + goto _return1; \ + } \ + } else { \ + tDebug("invalid handle to release"); \ + goto _return2; \ + } \ } while (0) int32_t transInitBuffer(SConnBuffer* buf); @@ -443,6 +443,7 @@ int32_t transReleaseExHandle(int32_t refMgt, int64_t refId); void transDestroyExHandle(void* handle); int32_t transGetRefMgt(); +int32_t transGetSvrRefMgt(); int32_t transGetInstMgt(); int32_t transGetSyncMsgMgt(); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 1758ca65cc..1329489be6 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -20,6 +20,7 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static int32_t refMgt; +static int32_t svrRefMgt; static int32_t instMgt; static int32_t transSyncMsgMgt; @@ -704,12 +705,14 @@ bool transEpSetIsEqual2(SEpSet* a, SEpSet* b) { static void transInitEnv() { refMgt = transOpenRefMgt(50000, transDestroyExHandle); + svrRefMgt = transOpenRefMgt(50000, transDestroyExHandle); instMgt = taosOpenRef(50, rpcCloseImpl); transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg); (void)uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); } static void transDestroyEnv() { transCloseRefMgt(refMgt); + transCloseRefMgt(svrRefMgt); transCloseRefMgt(instMgt); transCloseRefMgt(transSyncMsgMgt); } @@ -724,6 +727,7 @@ int32_t transInit() { } int32_t transGetRefMgt() { return refMgt; } +int32_t transGetSvrRefMgt() { return svrRefMgt; } int32_t transGetInstMgt() { return instMgt; } int32_t transGetSyncMsgMgt() { return transSyncMsgMgt; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 53a7dee7be..11aa468b19 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -373,6 +373,7 @@ static bool uvHandleReq(SSvrConn* pConn) { STrans* pTransInst = pConn->pTransInst; SWorkThrd* pThrd = pConn->hostThrd; + int8_t acquire = 0; STransMsgHead* pHead = NULL; int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1; @@ -459,7 +460,13 @@ static bool uvHandleReq(SSvrConn* pConn) { // 2. once send out data, cli conn released to conn pool immediately // 3. not mixed with persist transMsg.info.ahandle = (void*)pHead->ahandle; - transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId); + + if (pHead->noResp == 1) { + transMsg.info.handle = NULL; + } else { + transMsg.info.handle = (void*)transAcquireExHandle(transGetSvrRefMgt(), pConn->refId); + acquire = 1; + } transMsg.info.refId = pConn->refId; transMsg.info.traceId = pHead->traceId; transMsg.info.cliVer = htonl(pHead->compatibilityVer); @@ -468,10 +475,10 @@ static bool uvHandleReq(SSvrConn* pConn) { tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn, pConn->refId); - if (transMsg.info.handle == NULL) { - tError("%s handle %p conn:%p handle failed to init" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn); - return false; - } + // if (transMsg.info.handle == NULL) { + // tError("%s handle %p conn:%p handle failed to init" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn); + // return false; + // } if (pHead->noResp == 1) { transMsg.info.refId = -1; @@ -483,7 +490,7 @@ static bool uvHandleReq(SSvrConn* pConn) { pConnInfo->clientPort = pConn->port; tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); - (void)transReleaseExHandle(transGetRefMgt(), pConn->refId); + if (acquire) transReleaseExHandle(transGetSvrRefMgt(), pConn->refId); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); return true; @@ -770,15 +777,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { SExHandle* exh1 = transMsg.info.handle; int64_t refId = transMsg.info.refId; - SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); + SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), refId); if (exh2 == NULL || exh1 != exh2) { tTrace("handle except msg %p, ignore it", exh1); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); destroySmsg(msg); continue; } msg->pConn = exh1->handle; - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); (*transAsyncHandle[msg->type])(msg, pThrd); } } @@ -874,15 +881,15 @@ static void uvPrepareCb(uv_prepare_t* handle) { SExHandle* exh1 = transMsg.info.handle; int64_t refId = transMsg.info.refId; - SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); + SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), refId); if (exh2 == NULL || exh1 != exh2) { tTrace("handle except msg %p, ignore it", exh1); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); destroySmsg(msg); continue; } msg->pConn = exh1->handle; - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); (*transAsyncHandle[msg->type])(msg, pThrd); } } @@ -1215,14 +1222,14 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { exh->handle = pConn; exh->pThrd = pThrd; - exh->refId = transAddExHandle(transGetRefMgt(), exh); + exh->refId = transAddExHandle(transGetSvrRefMgt(), exh); if (exh->refId < 0) { TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); } QUEUE_INIT(&exh->q); - SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); + SExHandle* pSelf = transAcquireExHandle(transGetSvrRefMgt(), exh->refId); if (pSelf != exh) { TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); } @@ -1284,8 +1291,8 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) { } static int32_t reallocConnRef(SSvrConn* conn) { if (conn->refId > 0) { - (void)transReleaseExHandle(transGetRefMgt(), conn->refId); - (void)transRemoveExHandle(transGetRefMgt(), conn->refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), conn->refId); + (void)transRemoveExHandle(transGetSvrRefMgt(), conn->refId); } // avoid app continue to send msg on invalid handle SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); @@ -1295,14 +1302,14 @@ static int32_t reallocConnRef(SSvrConn* conn) { exh->handle = conn; exh->pThrd = conn->hostThrd; - exh->refId = transAddExHandle(transGetRefMgt(), exh); + exh->refId = transAddExHandle(transGetSvrRefMgt(), exh); if (exh->refId < 0) { taosMemoryFree(exh); return TSDB_CODE_REF_INVALID_ID; } QUEUE_INIT(&exh->q); - SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); + SExHandle* pSelf = transAcquireExHandle(transGetSvrRefMgt(), exh->refId); if (pSelf != exh) { tError("conn %p failed to acquire handle", conn); taosMemoryFree(exh); @@ -1321,8 +1328,8 @@ static void uvDestroyConn(uv_handle_t* handle) { } SWorkThrd* thrd = conn->hostThrd; - (void)transReleaseExHandle(transGetRefMgt(), conn->refId); - (void)transRemoveExHandle(transGetRefMgt(), conn->refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), conn->refId); + (void)transRemoveExHandle(transGetSvrRefMgt(), conn->refId); STrans* pTransInst = thrd->pTransInst; tDebug("%s conn %p destroy", transLabel(pTransInst), conn); @@ -1752,15 +1759,15 @@ int32_t transReleaseSrvHandle(void* handle) { tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return code; } - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return 0; _return1: tDebug("handle %p failed to send to release handle", exh); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return code; _return2: tDebug("handle %p failed to send to release handle", exh); @@ -1803,17 +1810,17 @@ int32_t transSendResponse(const STransMsg* msg) { tGDebug("conn %p start to send resp (1/2)", exh->handle); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return code; } - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return 0; _return1: tDebug("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return code; _return2: tDebug("handle %p failed to send resp", exh); @@ -1848,17 +1855,17 @@ int32_t transRegisterMsg(const STransMsg* msg) { tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return code; } - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return 0; _return1: tDebug("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return code; _return2: tDebug("handle %p failed to register brokenlink", exh);