From e7149cce3a96120c0f14beb0fc9219ad1d0696c0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 7 Sep 2022 13:18:34 +0800 Subject: [PATCH] fix(query): opt perf in query last file blocks. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 134 +++++++++++++++----- source/dnode/vnode/src/tsdb/tsdbRead.c | 108 +++++++++++----- 2 files changed, 174 insertions(+), 68 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 950c9348af..a721cfdbfc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -120,6 +120,92 @@ static SBlockData* loadBlockIfMissing(SLDataIter *pIter) { return NULL; } +// find the earliest block that contains the required records +static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk* pBlockList, int32_t backward) { + int32_t i = index; + int32_t step = backward? 1:-1; + while (uid >= pBlockList[i].minUid && uid <= pBlockList[i].maxUid && i >= 0) { + i += step; + } + return i - step; +} + +static int32_t binarySearchForStartBlock(SSttBlk*pBlockList, int32_t num, uint64_t uid, int32_t backward) { + int32_t midPos = -1; + if (num <= 0) { + return -1; + } + + int32_t firstPos = 0; + int32_t lastPos = num - 1; + + // find the first position which is bigger than the key + if ((uid > pBlockList[lastPos].maxUid) || (uid < pBlockList[firstPos].minUid)) { + return -1; + } + + while (1) { + if (uid >= pBlockList[firstPos].minUid && uid <= pBlockList[firstPos].maxUid) { + return findEarliestIndex(firstPos, uid, pBlockList, backward); + } + + if (uid > pBlockList[lastPos].maxUid || uid < pBlockList[firstPos].minUid) { + return -1; + } + + int32_t numOfRows = lastPos - firstPos + 1; + midPos = (numOfRows >> 1u) + firstPos; + + if (uid < pBlockList[midPos].minUid) { + lastPos = midPos - 1; + } else if (uid > pBlockList[midPos].maxUid) { + firstPos = midPos + 1; + } else { + return findEarliestIndex(midPos, uid, pBlockList, backward); + } + } +} + +static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t* uidList, int32_t backward) { + int32_t i = index; + int32_t step = backward? 1:-1; + while (uid == uidList[i] && i >= 0) { + i += step; + } + return i - step; +} + +static int32_t binarySearchForStartRowIndex(uint64_t* uidList, int32_t num, uint64_t uid, int32_t backward) { + int32_t firstPos = 0; + int32_t lastPos = num - 1; + + // find the first position which is bigger than the key + if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) { + return -1; + } + + while (1) { + if (uid == uidList[firstPos]) { + return findEarliestRow(firstPos, uid, uidList, backward); + } + + if (uid > uidList[lastPos] || uid < uidList[firstPos]) { + return -1; + } + + int32_t numOfRows = lastPos - firstPos + 1; + int32_t midPos = (numOfRows >> 1u) + firstPos; + + if (uid < uidList[midPos]) { + lastPos = midPos - 1; + } else if (uid > uidList[midPos]) { + firstPos = midPos + 1; + } else { + return findEarliestRow(midPos, uid, uidList, backward); + } + } +} + int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo) { int32_t code = 0; @@ -147,33 +233,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); // find the start block - int32_t index = -1; - if (!backward) { // asc - for (int32_t i = 0; i < size; ++i) { - SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); - if (p->suid != suid) { - continue; - } - - if (p->minUid <= uid && p->maxUid >= uid) { - index = i; - break; - } - } - } else { // desc - for (int32_t i = size - 1; i >= 0; --i) { - SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); - if (p->suid != suid) { - continue; - } - - if (p->minUid <= uid && p->maxUid >= uid) { - index = i; - break; - } - } - } - + int32_t index = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward); (*pIter)->iSttBlk = index; if (index != -1) { (*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk); @@ -193,7 +253,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) { pIter->iSttBlk += step; int32_t index = -1; - size_t size = taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk); + size_t size = pIter->pBlockLoadInfo->aSttBlk->size;//taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk); for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) { SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i); if ((!pIter->backward) && p->minUid > pIter->uid) { @@ -232,9 +292,8 @@ void tLDataIterNextBlock(SLDataIter *pIter) { } } - if (index == -1) { - pIter->pSttBlk = NULL; - } else { + pIter->pSttBlk = NULL; + if (index != -1) { pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk); } } @@ -247,18 +306,23 @@ static void findNextValidRow(SLDataIter *pIter) { SBlockData *pBlockData = loadBlockIfMissing(pIter); + // mostly we only need to find the start position for a given table + if ((((i == 0) && (!pIter->backward)) || (i == pBlockData->nRow - 1 && pIter->backward)) && pBlockData->aUid != NULL) { + i = binarySearchForStartRowIndex((uint64_t*)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward); + } + for (; i < pBlockData->nRow && i >= 0; i += step) { if (pBlockData->aUid != NULL) { if (!pIter->backward) { - if (pBlockData->aUid[i] < pIter->uid) { + /*if (pBlockData->aUid[i] < pIter->uid) { continue; - } else if (pBlockData->aUid[i] > pIter->uid) { + } else */if (pBlockData->aUid[i] > pIter->uid) { break; } } else { - if (pBlockData->aUid[i] > pIter->uid) { + /*if (pBlockData->aUid[i] > pIter->uid) { continue; - } else if (pBlockData->aUid[i] < pIter->uid) { + } else */if (pBlockData->aUid[i] < pIter->uid) { break; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index facfdb9a62..d205ca13e2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1238,6 +1238,38 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB return false; } +static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { + while (1) { + bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); + if (!hasVal) { + return false; + } + + TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + TSDBKEY k = TSDBROW_KEY(&row); + if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) { + return true; + } + } +} + +static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader, + STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) { + bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo); + if (hasVal) { + int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); + if (next1 != ts) { + doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow); + return true; + } + } else { + doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow); + return true; + } + + return false; +} + static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) { // always set the newest schema version in pReader->pSchema if (pReader->pSchema == NULL) { @@ -1385,29 +1417,55 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, bool mergeBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); + int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); STSRow* pTSRow = NULL; SRowMerger merge = {0}; + TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); + // only last block exists + if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) { + if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) { + return TSDB_CODE_SUCCESS; + } else { + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); - // merge with block data if ts == key - if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) { - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + // merge with block data if ts == key + if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) { + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + } + + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + } + } else { // not merge block data + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); + + // merge with block data if ts == key + if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) { + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + } + + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); } - int32_t code = tRowMergerGetRow(&merge, &pTSRow); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); return TSDB_CODE_SUCCESS; } @@ -1858,21 +1916,6 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } -static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { - while (1) { - bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); - if (!hasVal) { - return false; - } - - TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - TSDBKEY k = TSDBROW_KEY(&row); - if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) { - return true; - } - } -} - static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { // the last block reader has been initialized for this table. if (pLBlockReader->uid == pScanInfo->uid) { @@ -1906,8 +1949,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - TSDBKEY key = TSDBROW_KEY(&row); - return key.ts; + return TSDBROW_TS(&row); } static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }