diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index 4e4ba6c0d7..1dc36f1304 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -130,17 +130,17 @@ typedef struct SFileBlockDumpInfo {
   bool allDumped;
 } SFileBlockDumpInfo;
 
-typedef struct SUidOrderCheckInfo {
+typedef struct SUidOrderedList {
   uint64_t* tableUidList; // access table uid list in uid ascending order list
   int32_t currentIndex; // index in table uid list
-} SUidOrderCheckInfo;
+} SUidOrderedList;
 
 typedef struct SReaderStatus {
   bool loadFromFile; // check file stage
   bool composedDataBlock; // the returned data block is a composed block or not
   SHashObj* pTableMap; // SHash
   STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
-  SUidOrderCheckInfo uidCheckInfo; // check all table in uid order
+  SUidOrderedList uidCheckInfo; // check all table in uid order
   SFileBlockDumpInfo fBlockDumpInfo;
   SDFileSet* pCurrentFileset; // current opened file set
   SBlockData fileBlockData;
@@ -311,6 +311,16 @@ static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
   return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
 }
 
+static int32_t uidComparFunc(const void* p1, const void* p2) {  // ascending comparator over uint64_t uids
+  uint64_t pu1 = *(uint64_t*)p1;
+  uint64_t pu2 = *(uint64_t*)p2;
+  if (pu1 == pu2) {
+    return 0;
+  } else {
+    return (pu1 < pu2) ? -1 : 1;
+  }
+}
+
 // NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
 static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) {
   // allocate buffer in order to load data blocks from file
@@ -324,9 +334,20 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
   int64_t st = taosGetTimestampUs();
   initBlockScanInfoBuf(pBuf, numOfTables);
 
+  SUidOrderedList* pOrderedCheckInfo = &pTsdbReader->status.uidCheckInfo;
+
+  pOrderedCheckInfo->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
+  if (pOrderedCheckInfo->tableUidList == NULL) {
+    return NULL;  // NOTE(review): confirm nothing allocated earlier in this function leaks on this path
+  }
+  pOrderedCheckInfo->currentIndex = 0;
+
   for (int32_t j = 0; j < numOfTables; ++j) {
     STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
+
     pScanInfo->uid = idList[j].uid;
+    pOrderedCheckInfo->tableUidList[j] = idList[j].uid;
+
     if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
       int64_t skey = pTsdbReader->window.skey;
       pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
@@ -340,6 +361,8 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
               pScanInfo->lastKey, pTsdbReader->idStr);
   }
 
+  taosSort(pOrderedCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);  // doLoadBlockIndex walks this list in ascending uid order
+
   pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
   tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
             (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
@@ -663,28 +686,42 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
   int64_t et1 = taosGetTimestampUs();
 
   SBlockIdx* pBlockIdx = NULL;
-  for (int32_t i = 0; i < num; ++i) {
+  SUidOrderedList* pList = &pReader->status.uidCheckInfo;
+
+  int32_t i = 0, j = 0;
+  while(i < num && j < numOfTables) {
     pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
-
-    // uid check
     if (pBlockIdx->suid != pReader->suid) {
+      i += 1;
       continue;
     }
 
-    // this block belongs to a table that is not queried.
-    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
-    if (p == NULL) {
+    if (pBlockIdx->uid < pList->tableUidList[j]) {
+      i += 1;
       continue;
     }
 
-    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
-    if (pScanInfo->pBlockList == NULL) {
-      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
+    if (pBlockIdx->uid == pList->tableUidList[j]) {
+      i += 1;
+      j += 1;
+
+      // this block belongs to a queried table, so its scan info must be present in the table map
+      void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
+      if (p == NULL) {
+        tsdbError("failed to locate the tableBlockScan Info in hashmap, uid:%"PRIu64", %s", pBlockIdx->uid, pReader->idStr);
+        return TSDB_CODE_APP_ERROR;
+      }
+
+      STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
+      if (pScanInfo->pBlockList == NULL) {
+        pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
+      }
+
+      taosArrayPush(pIndexList, pBlockIdx);
     }
 
-    taosArrayPush(pIndexList, pBlockIdx);
-    if (taosArrayGetSize(pIndexList) == numOfTables) {
-      break;
+    if (j < numOfTables && pBlockIdx->uid > pList->tableUidList[j]) {  // j may equal numOfTables after the match above
+      j += 1;
     }
   }
 
@@ -2682,17 +2719,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
   return TSDB_CODE_SUCCESS;
 }
 
-static int32_t uidComparFunc(const void* p1, const void* p2) {
-  uint64_t pu1 = *(uint64_t*)p1;
-  uint64_t pu2 = *(uint64_t*)p2;
-  if (pu1 == pu2) {
-    return 0;
-  } else {
-    return (pu1 < pu2) ? -1 : 1;
-  }
-}
-
-static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus, int32_t order) {
+static void extractOrderedTableUidList(SUidOrderedList* pOrderCheckInfo, SReaderStatus* pStatus, int32_t order) {
   int32_t index = 0;
   int32_t total = taosHashGetSize(pStatus->pTableMap);
 
@@ -2718,7 +2745,7 @@ static void resetScanBlockLastBlockDelIndex(SReaderStatus* pStatus, int32_t orde
   }
 }
 
-static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbReader* pReader) {
+static int32_t initOrderCheckInfo(SUidOrderedList* pOrderCheckInfo, STsdbReader* pReader) {
   SReaderStatus* pStatus = &pReader->status;
 
   int32_t total = taosHashGetSize(pStatus->pTableMap);
@@ -2742,7 +2769,7 @@ static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbRead
       uint64_t uid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
       pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
 
-      // the tableMap has already updated
+      // the tableMap has already been updated, let's also update the order list
       if (pStatus->pTableIter == NULL) {
         void* p = taosMemoryRealloc(pOrderCheckInfo->tableUidList, total * sizeof(uint64_t));
         if (p == NULL) {
@@ -2761,7 +2788,15 @@ static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbRead
   return TSDB_CODE_SUCCESS;
 }
 
-static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
+static void resetTableListIndex(SReaderStatus *pStatus) {  // rewind the uid cursor and table iterator to the first uid
+  SUidOrderedList* pList = &pStatus->uidCheckInfo;
+
+  pList->currentIndex = 0;
+  bool hasTable = (pList->tableUidList != NULL) && (taosHashGetSize(pStatus->pTableMap) > 0);  // never read slot 0 of an empty list
+  pStatus->pTableIter = hasTable ? taosHashGet(pStatus->pTableMap, &pList->tableUidList[0], sizeof(uint64_t)) : NULL;
+}
+
+static bool moveToNextTable(SUidOrderedList* pOrderedCheckInfo, SReaderStatus* pStatus) {
   pOrderedCheckInfo->currentIndex += 1;
   if (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap)) {
     pStatus->pTableIter = NULL;
@@ -2777,10 +2812,9 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
   SReaderStatus* pStatus = &pReader->status;
   SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
 
-  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;
-  int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pReader);
-  if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
-    return code;
+  SUidOrderedList* pOrderedCheckInfo = &pStatus->uidCheckInfo;
+  if (taosHashGetSize(pStatus->pTableMap) == 0) {
+    return TSDB_CODE_SUCCESS;
   }
 
   SSDataBlock* pResBlock = pReader->pResBlock;
@@ -3028,6 +3062,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
   } else { // no block data, only last block exists
     tBlockDataReset(&pReader->status.fileBlockData);
     resetDataBlockIterator(pBlockIter, pReader->order);
+    resetTableListIndex(&pReader->status);
   }
 
   // set the correct start position according to the query time window
@@ -3069,6 +3104,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
       // this file does not have data files, let's start check the last block file if exists
       if (pBlockIter->numOfBlocks == 0) {
         resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
+        resetTableListIndex(&pReader->status);
         goto _begin;
       }
     }
@@ -3101,6 +3137,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
           tBlockDataReset(&pReader->status.fileBlockData);
           resetDataBlockIterator(pBlockIter, pReader->order);
           resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
+          resetTableListIndex(&pReader->status);
           goto _begin;
         } else {
           code = initForFirstBlockInFile(pReader, pBlockIter);
@@ -3113,6 +3150,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
             // this file does not have blocks, let's start check the last block file
             if (pBlockIter->numOfBlocks == 0) {
               resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
+              resetTableListIndex(&pReader->status);
               goto _begin;
             }
           }
@@ -3774,11 +3812,15 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
   ASSERT(size >= num);
 
   taosHashClear(pReader->status.pTableMap);
+  SUidOrderedList* pUidList = &pReader->status.uidCheckInfo;
+  pUidList->currentIndex = 0;
 
   STableKeyInfo* pList = (STableKeyInfo*)pTableList;
   for (int32_t i = 0; i < num; ++i) {
     STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i);
     pInfo->uid = pList[i].uid;
+    pUidList->tableUidList[i] = pList[i].uid;  // NOTE(review): not re-sorted here; doLoadBlockIndex assumes ascending uids - confirm callers pass a sorted list
+
     taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
   }
 
@@ -3825,13 +3867,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
     pCond->twindows.ekey -= 1;
   }
 
-  int32_t capacity = 0;
-  if (pResBlock == NULL) {
-    capacity = 4096;
-  } else {
-    capacity = pResBlock->info.capacity;
-  }
-
+  int32_t capacity = (pResBlock == NULL)? 4096:pResBlock->info.capacity;
   int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
   if (code != TSDB_CODE_SUCCESS) {
     goto _err;
@@ -4304,12 +4340,14 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
     return TSDB_CODE_SUCCESS;
   }
 
-  SDataBlockIter* pBlockIter = &pReader->status.blockIter;
+  SReaderStatus* pStatus = &pReader->status;
+
+  SDataBlockIter* pBlockIter = &pStatus->blockIter;
 
   pReader->order = pCond->order;
   pReader->type = TIMEWINDOW_RANGE_CONTAINED;
-  pReader->status.loadFromFile = true;
-  pReader->status.pTableIter = NULL;
+  pStatus->loadFromFile = true;
+  pStatus->pTableIter = NULL;
   pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
 
   // allocate buffer in order to load data blocks from file
@@ -4318,19 +4356,20 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
   pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
   tsdbDataFReaderClose(&pReader->pFileReader);
 
-  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
+  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
 
-  initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
+  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
   resetDataBlockIterator(pBlockIter, pReader->order);
+  resetTableListIndex(&pReader->status);
 
   int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
-  resetAllDataBlockScanInfo(pReader->status.pTableMap, ts);
+  resetAllDataBlockScanInfo(pStatus->pTableMap, ts);
 
   int32_t code = 0;
 
   // no data in files, let's try buffer in memory
-  if (pReader->status.fileIter.numOfFiles == 0) {
-    pReader->status.loadFromFile = false;
+  if (pStatus->fileIter.numOfFiles == 0) {
+    pStatus->loadFromFile = false;
   } else {
     code = initForFirstBlockInFile(pReader, pBlockIter);
     if (code != TSDB_CODE_SUCCESS) {
@@ -4408,7 +4447,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
       hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
     } else {
       code = initForFirstBlockInFile(pReader, pBlockIter);
-      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
+      if ((code != TSDB_CODE_SUCCESS) || (pStatus->loadFromFile == false)) {
         break;
       }
 