From 1c89972fc6c97ef3a0df57e34eecbeeafb91bdc7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 3 Sep 2022 22:00:24 +0800 Subject: [PATCH] fix(query): add multiple blockdata structure. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 63 ++++++++++++++------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index ce36d74467..95f26161c3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -24,7 +24,8 @@ typedef struct SLDataIter { int8_t backward; SArray *aSstBlk; int32_t iSstBlk; - SBlockData bData; + SBlockData bData[2]; + int32_t loadIndex; int32_t iRow; SRowInfo rInfo; uint64_t uid; @@ -32,6 +33,15 @@ typedef struct SLDataIter { SVersionRange verRange; } SLDataIter; +static SBlockData* getCurrentBlock(SLDataIter* pIter) { + return &pIter->bData[pIter->loadIndex]; +} + +static SBlockData* getNextBlock(SLDataIter* pIter) { + pIter->loadIndex ^= 1; + return getCurrentBlock(pIter); +} + int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iSst, int8_t backward, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange) { int32_t code = 0; @@ -53,7 +63,12 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t goto _exit; } - code = tBlockDataCreate(&(*pIter)->bData); + code = tBlockDataCreate(&(*pIter)->bData[0]); + if (code) { + goto _exit; + } + + code = tBlockDataCreate(&(*pIter)->bData[1]); if (code) { goto _exit; } @@ -95,7 +110,8 @@ _exit: } void tLDataIterClose(SLDataIter *pIter) { - tBlockDataDestroy(&pIter->bData, 1); + tBlockDataDestroy(&pIter->bData[0], 1); + tBlockDataDestroy(&pIter->bData[1], 1); taosArrayDestroy(pIter->aSstBlk); taosMemoryFree(pIter); } @@ -136,24 +152,26 @@ static void findNextValidRow(SLDataIter *pIter) { bool hasVal = false; int32_t i = pIter->iRow; - for (; i < pIter->bData.nRow && i >= 0; i += step) { - if (pIter->bData.aUid != NULL) { + SBlockData* pBlockData = getCurrentBlock(pIter); + + for (; i < pBlockData->nRow && i >= 0; i += step) { + if (pBlockData->aUid != NULL) { if (!pIter->backward) { - if (pIter->bData.aUid[i] < pIter->uid) { + if (pBlockData->aUid[i] < pIter->uid) { continue; - } else if (pIter->bData.aUid[i] > pIter->uid) { + } else if (pBlockData->aUid[i] > pIter->uid) { break; } } else { - if (pIter->bData.aUid[i] > pIter->uid) { + if (pBlockData->aUid[i] > pIter->uid) { continue; - } else if (pIter->bData.aUid[i] < pIter->uid) { + } else if (pBlockData->aUid[i] < pIter->uid) { break; } } } - int64_t ts = pIter->bData.aTSKEY[i]; + int64_t ts = pBlockData->aTSKEY[i]; if (!pIter->backward) { // asc if (ts > pIter->timeWindow.ekey) { // no more data break; @@ -168,7 +186,7 @@ static void findNextValidRow(SLDataIter *pIter) { } } - int64_t ver = pIter->bData.aVersion[i]; + int64_t ver = pBlockData->aVersion[i]; if (ver < pIter->verRange.minVer) { continue; } @@ -203,14 +221,16 @@ bool tLDataIterNextRow(SLDataIter *pIter) { } int32_t iBlockL = pIter->iSstBlk; + SBlockData* pBlockData = getCurrentBlock(pIter); - if (pIter->bData.nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet - code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, &pIter->bData); + if (pBlockData->nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet + pBlockData = getNextBlock(pIter); + code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData); if (code != TSDB_CODE_SUCCESS) { goto _exit; } - pIter->iRow = (pIter->backward) ? pIter->bData.nRow : -1; + pIter->iRow = (pIter->backward) ? pBlockData->nRow : -1; } pIter->iRow += step; @@ -218,7 +238,7 @@ bool tLDataIterNextRow(SLDataIter *pIter) { while (1) { findNextValidRow(pIter); - if (pIter->iRow >= pIter->bData.nRow || pIter->iRow < 0) { + if (pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) { tLDataIterNextBlock(pIter); if (pIter->pSstBlk == NULL) { // no more data goto _exit; @@ -228,17 +248,18 @@ bool tLDataIterNextRow(SLDataIter *pIter) { } if (iBlockL != pIter->iSstBlk) { - code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, &pIter->bData); + pBlockData = getNextBlock(pIter); + code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData); if (code) { goto _exit; } - pIter->iRow = pIter->backward ? (pIter->bData.nRow - 1) : 0; + pIter->iRow = pIter->backward ? (pBlockData->nRow - 1) : 0; } } - pIter->rInfo.suid = pIter->bData.suid; - pIter->rInfo.uid = pIter->bData.uid; - pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); + pIter->rInfo.suid = pBlockData->suid; + pIter->rInfo.uid = pBlockData->uid; + pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow); _exit: if (code != TSDB_CODE_SUCCESS) { @@ -322,7 +343,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) { // compare with min in RB Tree pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt); if (pMTree->pIter && pIter) { - int32_t c = pMTree->rbt.cmprFn(pMTree->pIter->node.payload, &pIter->node.payload); + int32_t c = pMTree->rbt.cmprFn(pMTree->pIter->node.payload, pIter->node.payload); if (c > 0) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter); pMTree->pIter = NULL;