diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 950c9348af..d660dfa99d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -120,6 +120,92 @@ static SBlockData* loadBlockIfMissing(SLDataIter *pIter) { return NULL; } +// find the earliest block that contains the required records +static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk* pBlockList, int32_t num, int32_t backward) { + int32_t i = index; + int32_t step = backward? 1:-1; + while (i >= 0 && i < num && uid >= pBlockList[i].minUid && uid <= pBlockList[i].maxUid) { + i += step; + } + return i - step; +} + +static int32_t binarySearchForStartBlock(SSttBlk*pBlockList, int32_t num, uint64_t uid, int32_t backward) { + int32_t midPos = -1; + if (num <= 0) { + return -1; + } + + int32_t firstPos = 0; + int32_t lastPos = num - 1; + + // find the first position which is bigger than the key + if ((uid > pBlockList[lastPos].maxUid) || (uid < pBlockList[firstPos].minUid)) { + return -1; + } + + while (1) { + if (uid >= pBlockList[firstPos].minUid && uid <= pBlockList[firstPos].maxUid) { + return findEarliestIndex(firstPos, uid, pBlockList, num, backward); + } + + if (uid > pBlockList[lastPos].maxUid || uid < pBlockList[firstPos].minUid) { + return -1; + } + + int32_t numOfRows = lastPos - firstPos + 1; + midPos = (numOfRows >> 1u) + firstPos; + + if (uid < pBlockList[midPos].minUid) { + lastPos = midPos - 1; + } else if (uid > pBlockList[midPos].maxUid) { + firstPos = midPos + 1; + } else { + return findEarliestIndex(midPos, uid, pBlockList, num, backward); + } + } +} + +static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t* uidList, int32_t num, int32_t backward) { + int32_t i = index; + int32_t step = backward? 1:-1; + while (i >= 0 && i < num && uid == uidList[i]) { + i += step; + } + return i - step; +} + +static int32_t binarySearchForStartRowIndex(uint64_t* uidList, int32_t num, uint64_t uid, int32_t backward) { + int32_t firstPos = 0; + int32_t lastPos = num - 1; + + // find the first position which is bigger than the key + if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) { + return -1; + } + + while (1) { + if (uid == uidList[firstPos]) { + return findEarliestRow(firstPos, uid, uidList, num, backward); + } + + if (uid > uidList[lastPos] || uid < uidList[firstPos]) { + return -1; + } + + int32_t numOfRows = lastPos - firstPos + 1; + int32_t midPos = (numOfRows >> 1u) + firstPos; + + if (uid < uidList[midPos]) { + lastPos = midPos - 1; + } else if (uid > uidList[midPos]) { + firstPos = midPos + 1; + } else { + return findEarliestRow(midPos, uid, uidList, num, backward); + } + } +} + int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo) { int32_t code = 0; @@ -141,39 +227,25 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk); if (code) { goto _exit; + } else { + size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); + SArray* pTmp = taosArrayInit(size, sizeof(SSttBlk)); + for(int32_t i = 0; i < size; ++i) { + SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); + if (p->suid == suid) { + taosArrayPush(pTmp, p); + } + } + + taosArrayDestroy(pBlockLoadInfo->aSttBlk); + pBlockLoadInfo->aSttBlk = pTmp; } } size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); // find the start block - int32_t index = -1; - if (!backward) { // asc - for (int32_t i = 0; i < size; ++i) { - SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); - if (p->suid != suid) { - continue; - } - - if (p->minUid <= uid && p->maxUid >= uid) { - index = i; - break; - } - } - } else { // desc - for (int32_t i = size - 1; i >= 0; --i) { - SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); - if (p->suid != suid) { - continue; - } - - if (p->minUid <= uid && p->maxUid >= uid) { - index = i; - break; - } - } - } - + int32_t index = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward); (*pIter)->iSttBlk = index; if (index != -1) { (*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk); @@ -193,7 +265,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) { pIter->iSttBlk += step; int32_t index = -1; - size_t size = taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk); + size_t size = pIter->pBlockLoadInfo->aSttBlk->size;//taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk); for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) { SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i); if ((!pIter->backward) && p->minUid > pIter->uid) { @@ -232,9 +304,8 @@ void tLDataIterNextBlock(SLDataIter *pIter) { } } - if (index == -1) { - pIter->pSttBlk = NULL; - } else { + pIter->pSttBlk = NULL; + if (index != -1) { pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk); } } @@ -247,18 +318,27 @@ static void findNextValidRow(SLDataIter *pIter) { SBlockData *pBlockData = loadBlockIfMissing(pIter); + // mostly we only need to find the start position for a given table + if ((((i == 0) && (!pIter->backward)) || (i == pBlockData->nRow - 1 && pIter->backward)) && pBlockData->aUid != NULL) { + i = binarySearchForStartRowIndex((uint64_t*)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward); + if (i == -1) { + pIter->iRow = -1; + return; + } + } + for (; i < pBlockData->nRow && i >= 0; i += step) { if (pBlockData->aUid != NULL) { if (!pIter->backward) { - if (pBlockData->aUid[i] < pIter->uid) { + /*if (pBlockData->aUid[i] < pIter->uid) { continue; - } else if (pBlockData->aUid[i] > pIter->uid) { + } else */if (pBlockData->aUid[i] > pIter->uid) { break; } } else { - if (pBlockData->aUid[i] > pIter->uid) { + /*if (pBlockData->aUid[i] > pIter->uid) { continue; - } else if (pBlockData->aUid[i] < pIter->uid) { + } else */if (pBlockData->aUid[i] < pIter->uid) { break; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ea488d8f1c..dea17f92da 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1238,6 +1238,38 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB return false; } +static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { + while (1) { + bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); + if (!hasVal) { + return false; + } + + TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + TSDBKEY k = TSDBROW_KEY(&row); + if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) { + return true; + } + } +} + +static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader, + STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) { + bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo); + if (hasVal) { + int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); + if (next1 != ts) { + doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow); + return true; + } + } else { + doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow); + return true; + } + + return false; +} + static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) { // always set the newest schema version in pReader->pSchema if (pReader->pSchema == NULL) { @@ -1389,25 +1421,54 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STSRow* pTSRow = NULL; SRowMerger merge = {0}; + TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); + // only last block exists + if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) { + if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) { + return TSDB_CODE_SUCCESS; + } else { + tRowMergerInit(&merge, &fRow, pReader->pSchema); - // merge with block data if ts == key - if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) { - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + tRowMerge(&merge, &fRow1); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); + + // merge with block data if ts == key + if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) { + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + } + + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + } + } else { // not merge block data + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); + + // merge with block data if ts == key + if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) { + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + } + + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); } - int32_t code = tRowMergerGetRow(&merge, &pTSRow); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); return TSDB_CODE_SUCCESS; } @@ -1858,21 +1919,6 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } -static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { - while (1) { - bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); - if (!hasVal) { - return false; - } - - TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - TSDBKEY k = TSDBROW_KEY(&row); - if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) { - return true; - } - } -} - static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { // the last block reader has been initialized for this table. if (pLBlockReader->uid == pScanInfo->uid) { @@ -1906,8 +1952,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - TSDBKEY key = TSDBROW_KEY(&row); - return key.ts; + return TSDBROW_TS(&row); } static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; } @@ -3080,11 +3125,16 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j); + if (pData->cid < pCol->info.colId) { + j += 1; + continue; + } + if (pData->cid == pCol->info.colId) { tColDataGetValue(pData, rowIndex, &cv); doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo); j += 1; - } else { // the specified column does not exist in file block, fill with null data + } else if (pData->cid > pCol->info.colId) { // the specified column does not exist in file block, fill with null data colDataAppendNULL(pCol, outputRowIndex); } @@ -3206,11 +3256,18 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl } } + // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here. if (pCond->suid != 0) { - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, pCond->endVersion); + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, /*pCond->endVersion*/ -1); + if (pReader->pSchema == NULL) { + tsdbError("failed to get table schema, suid:%"PRIu64", ver:%"PRId64" , %s", pReader->suid, -1, pReader->idStr); + } } else if (taosArrayGetSize(pTableList) > 0) { STableKeyInfo* pKey = taosArrayGet(pTableList, 0); - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, pCond->endVersion); + pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, /*pCond->endVersion*/ -1); + if (pReader->pSchema == NULL) { + tsdbError("failed to get table schema, uid:%"PRIu64", ver:%"PRId64" , %s", pKey->uid, -1, pReader->idStr); + } } int32_t numOfTables = taosArrayGetSize(pTableList); diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 11df13d451..8987ba3bbd 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -333,7 +333,7 @@ python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py python3 ./test.py -f 7-tmq/tmq_taosx.py -# python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py +python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py #------------querPolicy 2-----------