TD-10431 process mnode profile
This commit is contained in:
parent
56a4b9d23d
commit
3e34c4cd26
|
@ -248,6 +248,41 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pMsg) {
|
||||
pConn->numOfQueries = 0;
|
||||
pConn->numOfStreams = 0;
|
||||
int32_t numOfQueries = htonl(pMsg->numOfQueries);
|
||||
int32_t numOfStreams = htonl(pMsg->numOfStreams);
|
||||
|
||||
if (numOfQueries > 0) {
|
||||
if (pConn->pQueries == NULL) {
|
||||
pConn->pQueries = calloc(sizeof(SQueryDesc), QUERY_STREAM_SAVE_SIZE);
|
||||
}
|
||||
|
||||
pConn->numOfQueries = MIN(QUERY_STREAM_SAVE_SIZE, numOfQueries);
|
||||
|
||||
int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc);
|
||||
if (saveSize > 0 && pConn->pQueries != NULL) {
|
||||
memcpy(pConn->pQueries, pMsg->pData, saveSize);
|
||||
}
|
||||
}
|
||||
|
||||
if (numOfStreams > 0) {
|
||||
if (pConn->pStreams == NULL) {
|
||||
pConn->pStreams = calloc(sizeof(SStreamDesc), QUERY_STREAM_SAVE_SIZE);
|
||||
}
|
||||
|
||||
pConn->numOfStreams = MIN(QUERY_STREAM_SAVE_SIZE, numOfStreams);
|
||||
|
||||
int32_t saveSize = pConn->numOfStreams * sizeof(SStreamDesc);
|
||||
if (saveSize > 0 && pConn->pStreams != NULL) {
|
||||
memcpy(pConn->pStreams, pMsg->pData + numOfQueries * sizeof(SQueryDesc), saveSize);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SHeartBeatMsg *pReq = pMsg->rpcMsg.pCont;
|
||||
|
@ -279,8 +314,24 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
mnodeSaveQueryStreamList(pConn, pReq);
|
||||
if (pConn->killed != 0) {
|
||||
pRsp->killConnection = 1;
|
||||
}
|
||||
|
||||
if (pConn->streamId != 0) {
|
||||
pRsp->streamId = htonl(pConn->streamId);
|
||||
pConn->streamId = 0;
|
||||
}
|
||||
|
||||
if (pConn->queryId != 0) {
|
||||
pRsp->queryId = htonl(pConn->queryId);
|
||||
pConn->queryId = 0;
|
||||
}
|
||||
|
||||
pRsp->connId = htonl(pConn->connId);
|
||||
pRsp->killConnection = pConn->killed;
|
||||
pRsp->totalDnodes = htnol(1);
|
||||
pRsp->onlineDnodes = htonl(1);
|
||||
mndGetMnodeEpSet(pMnode, &pRsp->epSet);
|
||||
mndReleaseConn(pMnode, pConn);
|
||||
|
||||
|
|
|
@ -40,7 +40,72 @@ int32_t mndInitShow(SMnode *pMnode) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void mndCleanupShow(SMnode *pMnode) {}
|
||||
void mndCleanupShow(SMnode *pMnode) {
|
||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
||||
if (pMgmt->cache != NULL) {
|
||||
taosCacheCleanup(pMgmt->cache);
|
||||
pMgmt->cache = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow) {
|
||||
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow;
|
||||
|
||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
||||
SShowObj **ppShow = taosCacheAcquireByKey(pMgmt->cache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE));
|
||||
if (ppShow) {
|
||||
mTrace("show:%d, data:%p acquired from cache", pShow->id, ppShow);
|
||||
return 0;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
|
||||
SMnode *pMnode = pShow->pMnode;
|
||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
||||
SShowObj **ppShow = (SShowObj **)pShow->ppShow;
|
||||
taosCacheRelease(pMgmt->cache, (void **)(&ppShow), forceRemove);
|
||||
mDebug("show:%d, data:%p released from cache, force:%d", pShow->id, ppShow, forceRemove);
|
||||
}
|
||||
|
||||
static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow) {
|
||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
||||
int32_t lifeSpan = pMnode->shellActivityTimer * 6 * 1000;
|
||||
|
||||
TSDB_CACHE_PTR_TYPE val = (TSDB_CACHE_PTR_TYPE)pShow;
|
||||
pShow->id = atomic_add_fetch_32(&pMgmt->showId, 1);
|
||||
SShowObj **ppShow =
|
||||
taosCachePut(pMgmt->cache, &val, sizeof(TSDB_CACHE_PTR_TYPE), &pShow, sizeof(TSDB_CACHE_PTR_TYPE), lifeSpan);
|
||||
if (ppShow == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("show:%d, failed to put into cache", pShow->id);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mTrace("show:%d, data:%p put into cache", pShow->id, ppShow);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void mndFreeShowObj(void *ppShow) {
|
||||
SShowObj *pShow = *(SShowObj **)ppShow;
|
||||
SMnode *pMnode = pShow->pMnode;
|
||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
||||
|
||||
ShowFreeIterFp freeFp = pMgmt->freeIterFps[pShow->type];
|
||||
if (freeFp != NULL) {
|
||||
if (pShow->pVgIter != NULL) {
|
||||
// only used in 'show vnodes "ep"'
|
||||
(*freeFp)(pMnode, pShow->pVgIter);
|
||||
}
|
||||
if (pShow->pIter != NULL) {
|
||||
(*freeFp)(pMnode, pShow->pIter);
|
||||
}
|
||||
}
|
||||
|
||||
mDebug("show:%d, data:%p destroyed", pShow->id, ppShow);
|
||||
tfree(pShow);
|
||||
}
|
||||
|
||||
static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
|
||||
SMnode *pMnode = pMnodeMsg->pMnode;
|
||||
|
@ -246,65 +311,6 @@ static bool mndCheckRetrieveFinished(SShowObj *pShow) {
|
|||
return false;
|
||||
}
|
||||
|
||||
static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow) {
|
||||
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow;
|
||||
|
||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
||||
SShowObj **ppShow = taosCacheAcquireByKey(pMgmt->cache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE));
|
||||
if (ppShow) {
|
||||
mTrace("show:%d, data:%p acquired from cache", pShow->id, ppShow);
|
||||
return 0;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
|
||||
SMnode *pMnode = pShow->pMnode;
|
||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
||||
SShowObj **ppShow = (SShowObj **)pShow->ppShow;
|
||||
taosCacheRelease(pMgmt->cache, (void **)(&ppShow), forceRemove);
|
||||
mDebug("show:%d, data:%p released from cache, force:%d", pShow->id, ppShow, forceRemove);
|
||||
}
|
||||
|
||||
static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow) {
|
||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
||||
int32_t lifeSpan = pMnode->shellActivityTimer * 6 * 1000;
|
||||
|
||||
TSDB_CACHE_PTR_TYPE val = (TSDB_CACHE_PTR_TYPE)pShow;
|
||||
pShow->id = atomic_add_fetch_32(&pMgmt->showId, 1);
|
||||
SShowObj **ppShow =
|
||||
taosCachePut(pMgmt->cache, &val, sizeof(TSDB_CACHE_PTR_TYPE), &pShow, sizeof(TSDB_CACHE_PTR_TYPE), lifeSpan);
|
||||
if (ppShow == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("show:%d, failed to put into cache", pShow->id);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mTrace("show:%d, data:%p put into cache", pShow->id, ppShow);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void mndFreeShowObj(void *ppShow) {
|
||||
SShowObj *pShow = *(SShowObj **)ppShow;
|
||||
SMnode *pMnode = pShow->pMnode;
|
||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
||||
|
||||
ShowFreeIterFp freeFp = pMgmt->freeIterFps[pShow->type];
|
||||
if (freeFp != NULL) {
|
||||
if (pShow->pVgIter != NULL) {
|
||||
// only used in 'show vnodes "ep"'
|
||||
(*freeFp)(pMnode, pShow->pVgIter);
|
||||
}
|
||||
if (pShow->pIter != NULL) {
|
||||
(*freeFp)(pMnode, pShow->pIter);
|
||||
}
|
||||
}
|
||||
|
||||
mDebug("show:%d, data:%p destroyed", pShow->id, ppShow);
|
||||
tfree(pShow);
|
||||
}
|
||||
|
||||
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
|
||||
if (rows < capacity) {
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
|
|
Loading…
Reference in New Issue