From 3e34c4cd2645bd9ceaeadb4eeaff5fbc7ec70824 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 3 Dec 2021 17:22:44 +0800 Subject: [PATCH] TD-10431 process mnode profile --- source/dnode/mnode/impl/src/mndProfile.c | 53 +++++++++- source/dnode/mnode/impl/src/mndShow.c | 126 ++++++++++++----------- 2 files changed, 118 insertions(+), 61 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index d70d045a04..eb9926cfb6 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -248,6 +248,41 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { return 0; } +static int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pMsg) { + pConn->numOfQueries = 0; + pConn->numOfStreams = 0; + int32_t numOfQueries = htonl(pMsg->numOfQueries); + int32_t numOfStreams = htonl(pMsg->numOfStreams); + + if (numOfQueries > 0) { + if (pConn->pQueries == NULL) { + pConn->pQueries = calloc(sizeof(SQueryDesc), QUERY_STREAM_SAVE_SIZE); + } + + pConn->numOfQueries = MIN(QUERY_STREAM_SAVE_SIZE, numOfQueries); + + int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc); + if (saveSize > 0 && pConn->pQueries != NULL) { + memcpy(pConn->pQueries, pMsg->pData, saveSize); + } + } + + if (numOfStreams > 0) { + if (pConn->pStreams == NULL) { + pConn->pStreams = calloc(sizeof(SStreamDesc), QUERY_STREAM_SAVE_SIZE); + } + + pConn->numOfStreams = MIN(QUERY_STREAM_SAVE_SIZE, numOfStreams); + + int32_t saveSize = pConn->numOfStreams * sizeof(SStreamDesc); + if (saveSize > 0 && pConn->pStreams != NULL) { + memcpy(pConn->pStreams, pMsg->pData + numOfQueries * sizeof(SQueryDesc), saveSize); + } + } + + return TSDB_CODE_SUCCESS; +} + static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SHeartBeatMsg *pReq = pMsg->rpcMsg.pCont; @@ -279,8 +314,24 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) { return -1; } + mnodeSaveQueryStreamList(pConn, pReq); + if (pConn->killed != 0) { + pRsp->killConnection = 1; + } + + if (pConn->streamId != 0) { + pRsp->streamId = htonl(pConn->streamId); + pConn->streamId = 0; + } + + if (pConn->queryId != 0) { + pRsp->queryId = htonl(pConn->queryId); + pConn->queryId = 0; + } + pRsp->connId = htonl(pConn->connId); - pRsp->killConnection = pConn->killed; + pRsp->totalDnodes = htnol(1); + pRsp->onlineDnodes = htonl(1); mndGetMnodeEpSet(pMnode, &pRsp->epSet); mndReleaseConn(pMnode, pConn); diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index ff9f2a9805..e652f94957 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -40,7 +40,72 @@ int32_t mndInitShow(SMnode *pMnode) { return 0; } -void mndCleanupShow(SMnode *pMnode) {} +void mndCleanupShow(SMnode *pMnode) { + SShowMgmt *pMgmt = &pMnode->showMgmt; + if (pMgmt->cache != NULL) { + taosCacheCleanup(pMgmt->cache); + pMgmt->cache = NULL; + } +} + +static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow) { + TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow; + + SShowMgmt *pMgmt = &pMnode->showMgmt; + SShowObj **ppShow = taosCacheAcquireByKey(pMgmt->cache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE)); + if (ppShow) { + mTrace("show:%d, data:%p acquired from cache", pShow->id, ppShow); + return 0; + } + + return -1; +} + +static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) { + SMnode *pMnode = pShow->pMnode; + SShowMgmt *pMgmt = &pMnode->showMgmt; + SShowObj **ppShow = (SShowObj **)pShow->ppShow; + taosCacheRelease(pMgmt->cache, (void **)(&ppShow), forceRemove); + mDebug("show:%d, data:%p released from cache, force:%d", pShow->id, ppShow, forceRemove); +} + +static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow) { + SShowMgmt *pMgmt = &pMnode->showMgmt; + int32_t lifeSpan = pMnode->shellActivityTimer * 6 * 1000; + + TSDB_CACHE_PTR_TYPE val = (TSDB_CACHE_PTR_TYPE)pShow; + pShow->id = atomic_add_fetch_32(&pMgmt->showId, 1); + SShowObj **ppShow = + taosCachePut(pMgmt->cache, &val, sizeof(TSDB_CACHE_PTR_TYPE), &pShow, sizeof(TSDB_CACHE_PTR_TYPE), lifeSpan); + if (ppShow == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("show:%d, failed to put into cache", pShow->id); + return -1; + } + + mTrace("show:%d, data:%p put into cache", pShow->id, ppShow); + return 0; +} + +static void mndFreeShowObj(void *ppShow) { + SShowObj *pShow = *(SShowObj **)ppShow; + SMnode *pMnode = pShow->pMnode; + SShowMgmt *pMgmt = &pMnode->showMgmt; + + ShowFreeIterFp freeFp = pMgmt->freeIterFps[pShow->type]; + if (freeFp != NULL) { + if (pShow->pVgIter != NULL) { + // only used in 'show vnodes "ep"' + (*freeFp)(pMnode, pShow->pVgIter); + } + if (pShow->pIter != NULL) { + (*freeFp)(pMnode, pShow->pIter); + } + } + + mDebug("show:%d, data:%p destroyed", pShow->id, ppShow); + tfree(pShow); +} static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) { SMnode *pMnode = pMnodeMsg->pMnode; @@ -246,65 +311,6 @@ static bool mndCheckRetrieveFinished(SShowObj *pShow) { return false; } -static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow) { - TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow; - - SShowMgmt *pMgmt = &pMnode->showMgmt; - SShowObj **ppShow = taosCacheAcquireByKey(pMgmt->cache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE)); - if (ppShow) { - mTrace("show:%d, data:%p acquired from cache", pShow->id, ppShow); - return 0; - } - - return -1; -} - -static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) { - SMnode *pMnode = pShow->pMnode; - SShowMgmt *pMgmt = &pMnode->showMgmt; - SShowObj **ppShow = (SShowObj **)pShow->ppShow; - taosCacheRelease(pMgmt->cache, (void **)(&ppShow), forceRemove); - mDebug("show:%d, data:%p released from cache, force:%d", pShow->id, ppShow, forceRemove); -} - -static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow) { - SShowMgmt *pMgmt = &pMnode->showMgmt; - int32_t lifeSpan = pMnode->shellActivityTimer * 6 * 1000; - - TSDB_CACHE_PTR_TYPE val = (TSDB_CACHE_PTR_TYPE)pShow; - pShow->id = atomic_add_fetch_32(&pMgmt->showId, 1); - SShowObj **ppShow = - taosCachePut(pMgmt->cache, &val, sizeof(TSDB_CACHE_PTR_TYPE), &pShow, sizeof(TSDB_CACHE_PTR_TYPE), lifeSpan); - if (ppShow == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("show:%d, failed to put into cache", pShow->id); - return -1; - } - - mTrace("show:%d, data:%p put into cache", pShow->id, ppShow); - return 0; -} - -static void mndFreeShowObj(void *ppShow) { - SShowObj *pShow = *(SShowObj **)ppShow; - SMnode *pMnode = pShow->pMnode; - SShowMgmt *pMgmt = &pMnode->showMgmt; - - ShowFreeIterFp freeFp = pMgmt->freeIterFps[pShow->type]; - if (freeFp != NULL) { - if (pShow->pVgIter != NULL) { - // only used in 'show vnodes "ep"' - (*freeFp)(pMnode, pShow->pVgIter); - } - if (pShow->pIter != NULL) { - (*freeFp)(pMnode, pShow->pIter); - } - } - - mDebug("show:%d, data:%p destroyed", pShow->id, ppShow); - tfree(pShow); -} - void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) { if (rows < capacity) { for (int32_t i = 0; i < numOfCols; ++i) {