From bc2cc6487339248fb39eb68036412431148a28cc Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Fri, 16 Jun 2023 18:17:56 +0800 Subject: [PATCH] fix: show queries returned less rows when one conn has more than 100 queries --- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndProfile.c | 187 +++++++++++++---------- 2 files changed, 107 insertions(+), 81 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 3859dd688d..e913ecb6ab 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -467,6 +467,7 @@ typedef struct { int8_t replica; int16_t numOfColumns; int32_t numOfRows; + int32_t curIterPackedRows; void* pIter; SMnode* pMnode; STableMetaRsp* pMeta; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 01dd223b5f..0bfab227c4 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -764,11 +764,101 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl return numOfRows; } +/** + * @param pConn the conn queries pack from + * @param[out] pBlock the block data packed into + * @param offset skip [offset] queries in pConn + * @param rowsToPack at most rows to pack + * @return rows packed +*/ +static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBlock* pBlock, uint32_t offset, uint32_t rowsToPack) { + int32_t cols = 0; + taosRLockLatch(&pConn->queryLock); + int32_t numOfQueries = taosArrayGetSize(pConn->pQueries); + if (NULL == pConn->pQueries || numOfQueries <= offset) { + taosRUnLockLatch(&pConn->queryLock); + return 0; + } + + int32_t i = offset; + for (; i < numOfQueries && (i - offset) < rowsToPack; ++i) { + int32_t curRowIndex = pBlock->info.rows; + SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i); + cols = 0; + + char queryId[26 + VARSTR_HEADER_SIZE] = {0}; + sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid); + varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]); + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)queryId, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->queryId, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pConn->id, false); + + char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE]; + STR_TO_VARSTR(app, pConn->app); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)app, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pConn->pid, false); + + char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(user, pConn->user); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)user, false); + + char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0}; + sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port); + varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)endpoint, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->stime, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->useconds, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->stableQuery, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->subPlanNum, false); + + char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0}; + int32_t strSize = sizeof(subStatus); + int32_t offset = VARSTR_HEADER_SIZE; + for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) { + if (i) { + offset += snprintf(subStatus + offset, strSize - offset - 1, ","); + } + SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i); + offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status); + } + varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, subStatus, false); + + char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(sql, pQuery->sql); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)sql, false); + + pBlock->info.rows++; + } + + taosRUnLockLatch(&pConn->queryLock); + return i - offset; +} + static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SMnode * pMnode = pReq->info.node; + SSdb * pSdb = pMnode->pSdb; int32_t numOfRows = 0; - int32_t cols = 0; SConnObj *pConn = NULL; if (pShow->pIter == NULL) { @@ -776,6 +866,16 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p pShow->pIter = taosCacheCreateIter(pMgmt->connCache); } + // means fetched some data last time for this conn + if (pShow->curIterPackedRows > 0) { + size_t len = 0; + pConn = taosCacheIterGetData(pShow->pIter, &len); + if (pConn && (taosArrayGetSize(pConn->pQueries) > pShow->curIterPackedRows)) { + numOfRows = packQueriesIntoBlock(pShow, pConn, pBlock, pShow->curIterPackedRows, rows); + pShow->curIterPackedRows += numOfRows; + } + } + while (numOfRows < rows) { pConn = mndGetNextConn(pMnode, pShow->pIter); if (pConn == NULL) { @@ -783,85 +883,10 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p break; } - taosRLockLatch(&pConn->queryLock); - if (NULL == pConn->pQueries || taosArrayGetSize(pConn->pQueries) <= 0) { - taosRUnLockLatch(&pConn->queryLock); - continue; - } - - int32_t numOfQueries = taosArrayGetSize(pConn->pQueries); - for (int32_t i = 0; i < numOfQueries && numOfRows < rows; ++i) { - SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i); - cols = 0; - - char queryId[26 + VARSTR_HEADER_SIZE] = {0}; - sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid); - varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]); - SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)queryId, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->queryId, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConn->id, false); - - char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE]; - STR_TO_VARSTR(app, pConn->app); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)app, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConn->pid, false); - - char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_TO_VARSTR(user, pConn->user); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)user, false); - - char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0}; - sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port); - varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)endpoint, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->stime, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->useconds, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false); - - char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0}; - int32_t strSize = sizeof(subStatus); - int32_t offset = VARSTR_HEADER_SIZE; - for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) { - if (i) { - offset += snprintf(subStatus + offset, strSize - offset - 1, ","); - } - SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i); - offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status); - } - varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, subStatus, false); - - char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_TO_VARSTR(sql, pQuery->sql); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)sql, false); - - numOfRows++; - } - - taosRUnLockLatch(&pConn->queryLock); + int32_t packedRows = packQueriesIntoBlock(pShow, pConn, pBlock, 0, rows - numOfRows); + pShow->curIterPackedRows = packedRows; + numOfRows += packedRows; } - pShow->numOfRows += numOfRows; return numOfRows; }