fix(query): add multiple blockdata structure.

This commit is contained in:
Haojun Liao 2022-09-03 22:00:24 +08:00
parent 957550c472
commit 1c89972fc6
1 changed files with 42 additions and 21 deletions

View File

@ -24,7 +24,8 @@ typedef struct SLDataIter {
int8_t backward;
SArray *aSstBlk;
int32_t iSstBlk;
SBlockData bData;
SBlockData bData[2];
int32_t loadIndex;
int32_t iRow;
SRowInfo rInfo;
uint64_t uid;
@ -32,6 +33,15 @@ typedef struct SLDataIter {
SVersionRange verRange;
} SLDataIter;
static SBlockData* getCurrentBlock(SLDataIter* pIter) {
return &pIter->bData[pIter->loadIndex];
}
static SBlockData* getNextBlock(SLDataIter* pIter) {
pIter->loadIndex ^= 1;
return getCurrentBlock(pIter);
}
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iSst, int8_t backward, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pRange) {
int32_t code = 0;
@ -53,7 +63,12 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
goto _exit;
}
code = tBlockDataCreate(&(*pIter)->bData);
code = tBlockDataCreate(&(*pIter)->bData[0]);
if (code) {
goto _exit;
}
code = tBlockDataCreate(&(*pIter)->bData[1]);
if (code) {
goto _exit;
}
@ -95,7 +110,8 @@ _exit:
}
void tLDataIterClose(SLDataIter *pIter) {
tBlockDataDestroy(&pIter->bData, 1);
tBlockDataDestroy(&pIter->bData[0], 1);
tBlockDataDestroy(&pIter->bData[1], 1);
taosArrayDestroy(pIter->aSstBlk);
taosMemoryFree(pIter);
}
@ -136,24 +152,26 @@ static void findNextValidRow(SLDataIter *pIter) {
bool hasVal = false;
int32_t i = pIter->iRow;
for (; i < pIter->bData.nRow && i >= 0; i += step) {
if (pIter->bData.aUid != NULL) {
SBlockData* pBlockData = getCurrentBlock(pIter);
for (; i < pBlockData->nRow && i >= 0; i += step) {
if (pBlockData->aUid != NULL) {
if (!pIter->backward) {
if (pIter->bData.aUid[i] < pIter->uid) {
if (pBlockData->aUid[i] < pIter->uid) {
continue;
} else if (pIter->bData.aUid[i] > pIter->uid) {
} else if (pBlockData->aUid[i] > pIter->uid) {
break;
}
} else {
if (pIter->bData.aUid[i] > pIter->uid) {
if (pBlockData->aUid[i] > pIter->uid) {
continue;
} else if (pIter->bData.aUid[i] < pIter->uid) {
} else if (pBlockData->aUid[i] < pIter->uid) {
break;
}
}
}
int64_t ts = pIter->bData.aTSKEY[i];
int64_t ts = pBlockData->aTSKEY[i];
if (!pIter->backward) { // asc
if (ts > pIter->timeWindow.ekey) { // no more data
break;
@ -168,7 +186,7 @@ static void findNextValidRow(SLDataIter *pIter) {
}
}
int64_t ver = pIter->bData.aVersion[i];
int64_t ver = pBlockData->aVersion[i];
if (ver < pIter->verRange.minVer) {
continue;
}
@ -203,14 +221,16 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
}
int32_t iBlockL = pIter->iSstBlk;
SBlockData* pBlockData = getCurrentBlock(pIter);
if (pIter->bData.nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet
code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, &pIter->bData);
if (pBlockData->nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet
pBlockData = getNextBlock(pIter);
code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
pIter->iRow = (pIter->backward) ? pIter->bData.nRow : -1;
pIter->iRow = (pIter->backward) ? pBlockData->nRow : -1;
}
pIter->iRow += step;
@ -218,7 +238,7 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
while (1) {
findNextValidRow(pIter);
if (pIter->iRow >= pIter->bData.nRow || pIter->iRow < 0) {
if (pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
tLDataIterNextBlock(pIter);
if (pIter->pSstBlk == NULL) { // no more data
goto _exit;
@ -228,17 +248,18 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
}
if (iBlockL != pIter->iSstBlk) {
code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, &pIter->bData);
pBlockData = getNextBlock(pIter);
code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData);
if (code) {
goto _exit;
}
pIter->iRow = pIter->backward ? (pIter->bData.nRow - 1) : 0;
pIter->iRow = pIter->backward ? (pBlockData->nRow - 1) : 0;
}
}
pIter->rInfo.suid = pIter->bData.suid;
pIter->rInfo.uid = pIter->bData.uid;
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
pIter->rInfo.suid = pBlockData->suid;
pIter->rInfo.uid = pBlockData->uid;
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
_exit:
if (code != TSDB_CODE_SUCCESS) {
@ -322,7 +343,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
// compare with min in RB Tree
pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
if (pMTree->pIter && pIter) {
int32_t c = pMTree->rbt.cmprFn(pMTree->pIter->node.payload, &pIter->node.payload);
int32_t c = pMTree->rbt.cmprFn(pMTree->pIter->node.payload, pIter->node.payload);
if (c > 0) {
tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
pMTree->pIter = NULL;