diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2d053d04ae..eae398880b 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -178,7 +178,7 @@ typedef struct STsdbReader STsdbReader; int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num); int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, - SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr); + SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly); void tsdbReaderSetId(STsdbReader* pReader, const char* idstr); void tsdbReaderClose(STsdbReader *pReader); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 452b1f6c0b..0c4ada2cb1 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -224,6 +224,8 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, void *tsdbTbDataIterDestroy(STbDataIter *pIter); void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter); +void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum); + // STbData int32_t tsdbGetNRowsInTbData(STbData *pTbData); // tsdbFile.c ============================================================================================== diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index d34af9acae..46d0509c38 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -282,6 +282,34 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) { return true; } +int64_t tsdbCountTbDataRows(STbData *pTbData) { + SMemSkipListNode *pNode = NULL; + int64_t rowsNum = 0; + + while (true) { + pNode = SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0); + if (pNode == pTbData->sl.pTail) { + return rowsNum; + } + + rowsNum++; + } +} + +void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) { + taosRLockLatch(&pMemTable->latch); + for (int32_t i = 0; i < pMemTable->nBucket; ++i) { + STbData *pTbData = pMemTable->aBucket[i]; + + void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); + if (p == NULL) { + continue; + } + rowsNum += tsdbCountTbDataRows(pTbData); + } + taosRUnLockLatch(&pMemTable->latch); +} + static int32_t tsdbMemTableRehash(SMemTable *pMemTable) { int32_t code = 0; @@ -787,4 +815,4 @@ SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) { _exit: return aTbDataP; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c6444b5ded..8b6f318434 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3113,7 +3113,7 @@ static int32_t doSumBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) { } STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid)); - if (p == NULL || *p == NULL) { + if (p == NULL) { continue; } @@ -3189,6 +3189,10 @@ static int32_t readRowsCountFromStt(STsdbReader* pReader) { taosArrayClear(pBlockLoadInfo->aSttBlk); continue; } + for (int32_t i = 0; i < size; ++i) { + SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); + pReader->rowsNum += p->nRow; + } } else { for (int32_t i = 0; i < size; ++i) { SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); @@ -3210,6 +3214,22 @@ static int32_t readRowsCountFromStt(STsdbReader* pReader) { return code; } +static int32_t readRowsCountFromMem(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + int64_t memNum = 0, imemNum = 0; + if (pReader->pReadSnap->pMem != NULL) { + tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &memNum); + } + + if (pReader->pReadSnap->pIMem != NULL) { + tsdbMemTableCountRows(pReader->pReadSnap->pIMem, pReader->status.pTableMap, &imemNum); + } + + pReader->rowsNum += memNum + imemNum; + + return code; +} + static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; @@ -4170,7 +4190,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { // ====================================== EXPOSED APIs ====================================== int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, - SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) { + SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr, bool countOnly) { STimeWindow window = pCond->twindows; if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { pCond->twindows.skey += 1; @@ -4262,9 +4282,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL pReader->suspended = true; + if (countOnly) { + pReader->readMode = READ_MODE_COUNT_ONLY; + } - pReader->readMode = READ_MODE_COUNT_ONLY; - tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; @@ -4588,6 +4609,11 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) { } } + code = readRowsCountFromMem(pReader); + if (code != TSDB_CODE_SUCCESS) { + return false; + } + pBlock->info.rows = pReader->rowsNum; pBlock->info.id.uid = 0; pBlock->info.dataLoad = 0; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 3f519568c4..fd29df1acc 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -339,6 +339,7 @@ typedef struct STableScanInfo { int8_t scanMode; int8_t assignBlockUid; bool hasGroupByTag; + bool countOnly; } STableScanInfo; typedef struct STableMergeScanInfo { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index db58bd6f68..a1b27114b3 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1140,7 +1140,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT int32_t num = tableListGetSize(pTaskInfo->pTableInfoList); if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num, - pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 || + pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL, false) < 0 || pTableScanInfo->base.dataReader == NULL) { qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid); return -1; @@ -1192,7 +1192,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); - tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL); + tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL, false); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2af13f083d..854a2c1aaf 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -31,6 +31,9 @@ #include "thash.h" #include "ttypes.h" +int32_t scanDebug = 0; + + #define MULTI_READER_MAX_TABLE_NUM 5000 #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) @@ -785,7 +788,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ASSERT(pInfo->base.dataReader == NULL); int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, - (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo)); + (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -916,6 +919,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, goto _error; } + if (scanDebug) { + pInfo->countOnly = true; + } + taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo, optrDefaultBufFn, getTableScannerExecInfo); @@ -1007,7 +1014,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU SSDataBlock* pBlock = pTableScanInfo->pResBlock; STsdbReader* pReader = NULL; int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, - (STsdbReader**)&pReader, GET_TASKID(pTaskInfo)); + (STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false); if (code != TSDB_CODE_SUCCESS) { terrno = code; T_LONG_JMP(pTaskInfo->env, code); @@ -2601,7 +2608,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SReadHandle* pHandle = &pInfo->base.readHandle; if (NULL == source->dataReader || !source->multiReader) { - code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo)); + code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false); if (code != 0) { T_LONG_JMP(pTaskInfo->env, code); } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index f24d3523c8..d05c692f19 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2267,7 +2267,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi size_t num = tableListGetSize(pTableListInfo); void* pList = tableListGetInfo(pTableListInfo, 0); - code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str); + code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str, false); cleanupQueryTableDataCond(&cond); if (code != 0) { goto _error;