enh(query): support merge multiple last files.

This commit is contained in:
Haojun Liao 2022-09-01 11:56:56 +08:00
parent 4271911b50
commit 1cfd3e74f9
3 changed files with 296 additions and 110 deletions

View File

@ -64,6 +64,7 @@ typedef struct STsdbReadSnap STsdbReadSnap;
typedef struct SBlockInfo SBlockInfo; typedef struct SBlockInfo SBlockInfo;
typedef struct SSmaInfo SSmaInfo; typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol; typedef struct SBlockCol SBlockCol;
typedef struct SVersionRange SVersionRange;
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) #define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
#define TSDB_MAX_SUBBLOCKS 8 #define TSDB_MAX_SUBBLOCKS 8
@ -306,6 +307,12 @@ size_t tsdbCacheGetCapacity(SVnode *pVnode);
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema); int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
struct SLDataIter;
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iLast, int8_t backward, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pRange);
void tLDataIterClose(struct SLDataIter *pIter);
bool tLDataIterNextRow(struct SLDataIter *pIter);
// structs ======================= // structs =======================
struct STsdbFS { struct STsdbFS {
SDelFile *pDelFile; SDelFile *pDelFile;
@ -329,6 +336,11 @@ struct TSDBKEY {
TSKEY ts; TSKEY ts;
}; };
struct SVersionRange {
uint64_t minVer;
uint64_t maxVer;
};
typedef struct SMemSkipListNode SMemSkipListNode; typedef struct SMemSkipListNode SMemSkipListNode;
struct SMemSkipListNode { struct SMemSkipListNode {
int8_t level; int8_t level;
@ -626,6 +638,19 @@ typedef struct {
TSDBROW row; TSDBROW row;
} SRowInfo; } SRowInfo;
typedef struct SMergeTree {
int8_t backward;
SRBTreeNode *pNode;
SRBTree rbt;
struct SLDataIter *pIter;
SDataFReader* pLFileReader;
} SMergeTree;
void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange);
void tMergeTreeAddIter(SMergeTree *pMTree, struct SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree* pMTree);
TSDBROW tMergeTreeGetRow(SMergeTree* pMTree);
// ========== inline functions ========== // ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1; TSDBKEY *pKey1 = (TSDBKEY *)p1;

View File

@ -15,12 +15,12 @@
#include "tsdb.h" #include "tsdb.h"
#define INITIAL_IROW_INDEX (-1)
// SLDataIter ================================================= // SLDataIter =================================================
typedef struct { typedef struct SLDataIter {
SRBTreeNode node; SRBTreeNode node;
SBlockL *pBlockL; SBlockL *pBlockL;
SRowInfo *pRowInfo;
SDataFReader *pReader; SDataFReader *pReader;
int32_t iLast; int32_t iLast;
int8_t backward; int8_t backward;
@ -29,31 +29,54 @@ typedef struct {
SBlockData bData; SBlockData bData;
int32_t iRow; int32_t iRow;
SRowInfo rInfo; SRowInfo rInfo;
uint64_t uid;
STimeWindow timeWindow;
SVersionRange verRange;
} SLDataIter; } SLDataIter;
int32_t tLDataIterOpen(SLDataIter *pIter, SDataFReader *pReader, int32_t iLast, int8_t backward) { int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iLast, int8_t backward, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pRange) {
int32_t code = 0; int32_t code = 0;
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
pIter->pReader = pReader; (*pIter)->uid = uid;
pIter->iLast = iLast; (*pIter)->timeWindow = *pTimeWindow;
pIter->backward = backward; (*pIter)->verRange = *pRange;
(*pIter)->pReader = pReader;
pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); (*pIter)->iLast = iLast;
if (pIter->aBlockL == NULL) { (*pIter)->backward = backward;
(*pIter)->aBlockL = taosArrayInit(0, sizeof(SBlockL));
if ((*pIter)->aBlockL == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
code = tBlockDataCreate(&pIter->bData); code = tBlockDataCreate(&(*pIter)->bData);
if (code) goto _exit; if (code) {
goto _exit;
}
code = tsdbReadBlockL(pReader, iLast, pIter->aBlockL); code = tsdbReadBlockL(pReader, iLast, (*pIter)->aBlockL);
if (code) goto _exit; if (code) {
goto _exit;
}
if (backward) { size_t size = taosArrayGetSize((*pIter)->aBlockL);
pIter->iBlockL = taosArrayGetSize(pIter->aBlockL) - 1;
} else { // find the start block
pIter->iBlockL = 0; // todo handle the desc
int32_t index = -1;
for(int32_t i = 0; i < size; ++i) {
SBlockL *p = taosArrayGet((*pIter)->aBlockL, i);
if (p->minUid <= uid && p->maxUid >= uid) {
index = i;
break;
}
}
(*pIter)->iBlockL = index;
if (index != -1) {
(*pIter)->pBlockL = taosArrayGet((*pIter)->aBlockL, (*pIter)->iBlockL);
} }
_exit: _exit:
@ -74,15 +97,93 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
pIter->iBlockL++; pIter->iBlockL++;
} }
if (pIter->iBlockL >= 0 && pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { // todo handle desc order check.
pIter->pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); int32_t index = -1;
} else { size_t size = taosArrayGetSize(pIter->aBlockL);
pIter->pBlockL = NULL; for(int32_t i = pIter->iBlockL; i < size; ++i) {
SBlockL *p = taosArrayGet(pIter->aBlockL, i);
if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
index = i;
break;
}
if (p->minUid > pIter->uid) {
break;
} }
} }
int32_t tLDataIterNextRow(SLDataIter *pIter) { if (index == -1) {
pIter->pBlockL = NULL;
} else {
pIter->pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL);
}
}
static void findNextValidRow(SLDataIter* pIter) {
int32_t step = pIter->backward? -1:1;
bool hasVal = false;
int32_t i = pIter->iRow;
for (; i < pIter->bData.nRow && i >= 0; i += step) {
if (pIter->bData.aUid != NULL) {
if (!pIter->backward) {
if (pIter->bData.aUid[i] < pIter->uid) {
continue;
} else if (pIter->bData.aUid[i] > pIter->uid) {
break;
}
} else {
if (pIter->bData.aUid[i] > pIter->uid) {
continue;
} else if (pIter->bData.aUid[i] < pIter->uid) {
break;
}
}
}
int64_t ts = pIter->bData.aTSKEY[i];
if (ts < pIter->timeWindow.skey) {
continue;
}
int64_t ver = pIter->bData.aVersion[i];
if (ver < pIter->verRange.minVer) {
continue;
}
// no data any more, todo opt handle desc case
if (ts > pIter->timeWindow.ekey) {
continue;
}
// todo opt handle desc case
if (ver > pIter->verRange.maxVer) {
continue;
}
// todo handle delete soon
#if 0
TSDBKEY k = {.ts = ts, .version = ver};
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) {
continue;
}
#endif
hasVal = true;
break;
}
pIter->iRow = (hasVal)? i:-1;
}
bool tLDataIterNextRow(SLDataIter *pIter) {
int32_t code = 0; int32_t code = 0;
int32_t step = pIter->backward? -1:1;
// no qualified last file block in current file, no need to fetch row
if (pIter->pBlockL == NULL) {
return false;
}
int32_t iBlockL = pIter->iBlockL; int32_t iBlockL = pIter->iBlockL;
if (pIter->backward) { if (pIter->backward) {
@ -91,18 +192,38 @@ int32_t tLDataIterNextRow(SLDataIter *pIter) {
tLDataIterNextBlock(pIter); tLDataIterNextBlock(pIter);
} }
} else { } else {
pIter->iRow++; if (pIter->bData.nRow == 0 && pIter->pBlockL != NULL) { // current block not loaded yet
if (pIter->iRow >= pIter->bData.nRow) { code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pIter->pBlockL, &pIter->bData);
pIter->iBlockL++; if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
pIter->iRow = (pIter->backward)? pIter->bData.nRow:-1;
}
pIter->iRow += step;
findNextValidRow(pIter);
if (pIter->iRow >= pIter->bData.nRow || pIter->iRow < 0) {
tLDataIterNextBlock(pIter); tLDataIterNextBlock(pIter);
if (pIter->pBlockL == NULL) { // no more data
goto _exit;
}
} }
} }
if (iBlockL != pIter->iBlockL) { if (iBlockL != pIter->iBlockL) {
if (pIter->pBlockL) { if (pIter->pBlockL) {
code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pIter->pBlockL, &pIter->bData); code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pIter->pBlockL, &pIter->bData);
if (code) goto _exit; if (code) {
pIter->iRow = 0; goto _exit;
}
pIter->iRow = pIter->backward? (pIter->bData.nRow-1):0;
findNextValidRow(pIter);
if (pIter->iRow >= pIter->bData.nRow || pIter->iRow < 0) {
// todo try next block
}
} else { } else {
// no more data // no more data
goto _exit; goto _exit;
@ -114,7 +235,11 @@ int32_t tLDataIterNextRow(SLDataIter *pIter) {
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
_exit: _exit:
return code; if (code != TSDB_CODE_SUCCESS) {
return false;
} else {
return pIter->pBlockL != NULL;
}
} }
SRowInfo *tLDataIterGet(SLDataIter *pIter) { SRowInfo *tLDataIterGet(SLDataIter *pIter) {
@ -123,12 +248,6 @@ SRowInfo *tLDataIterGet(SLDataIter *pIter) {
} }
// SMergeTree ================================================= // SMergeTree =================================================
typedef struct {
int8_t backward;
SRBTreeNode *pNode;
SRBTree rbt;
} SMergeTree;
static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
SLDataIter *pIter1 = (SLDataIter *)p1; SLDataIter *pIter1 = (SLDataIter *)p1;
SLDataIter *pIter2 = (SLDataIter *)p2; SLDataIter *pIter2 = (SLDataIter *)p2;
@ -139,10 +258,61 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
return 0; return 0;
} }
void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward) { void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange) {
pMTree->backward = backward; pMTree->backward = backward;
pMTree->pNode = NULL; pMTree->pNode = NULL;
pMTree->pIter = NULL;
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
struct SLDataIter* pIterList[TSDB_DEFAULT_LAST_FILE] = {0};
for(int32_t i = 0; i < pFReader->pSet->nLastF; ++i) { // open all last file
/*int32_t code = */tLDataIterOpen(&pIterList[i], pFReader, i, 0, uid, pTimeWindow, pVerRange);
bool hasVal = tLDataIterNextRow(pIterList[i]);
if (hasVal) {
tMergeTreeAddIter(pMTree, pIterList[i]);
}
}
} }
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); } void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); }
bool tMergeTreeNext(SMergeTree* pMTree) {
int32_t code = TSDB_CODE_SUCCESS;
if (pMTree->pIter) {
SLDataIter *pIter = pMTree->pIter;
bool hasVal = tLDataIterNextRow(pIter);
if (!hasVal) {
pMTree->pIter = NULL;
}
// compare with min in RB Tree
pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
if (pMTree->pIter && pIter) {
int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->rInfo.row, &pIter->rInfo.row);
if (c > 0) {
tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
pMTree->pIter = NULL;
} else {
ASSERT(c);
}
}
}
if (pMTree->pIter == NULL) {
pMTree->pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
if (pMTree->pIter) {
tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
}
}
return pMTree->pIter != NULL;
}
TSDBROW tMergeTreeGetRow(SMergeTree* pMTree) {
return pMTree->pIter->rInfo.row;
}
void tMergeTreeClose(SMergeTree* pMTree) {
}

View File

@ -83,11 +83,6 @@ typedef struct SBlockLoadSuppInfo {
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
} SBlockLoadSuppInfo; } SBlockLoadSuppInfo;
typedef struct SVersionRange {
uint64_t minVer;
uint64_t maxVer;
} SVersionRange;
typedef struct SLastBlockReader { typedef struct SLastBlockReader {
SArray* pBlockL; SArray* pBlockL;
int32_t currentBlockIndex; int32_t currentBlockIndex;
@ -96,7 +91,7 @@ typedef struct SLastBlockReader {
SVersionRange verRange; SVersionRange verRange;
int32_t order; int32_t order;
uint64_t uid; uint64_t uid;
int16_t* rowIndex; // row index ptr, usually from the STableBlockScanInfo->indexInBlockL SMergeTree mergeTree;
} SLastBlockReader; } SLastBlockReader;
typedef struct SFilesetIter { typedef struct SFilesetIter {
@ -352,7 +347,6 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet,
pLReader->order = pReader->order; pLReader->order = pReader->order;
pLReader->window = pReader->window; pLReader->window = pReader->window;
pLReader->verRange = pReader->verRange; pLReader->verRange = pReader->verRange;
pLReader->currentBlockIndex = -1;
int32_t code = tBlockDataCreate(&pLReader->lastBlockData); int32_t code = tBlockDataCreate(&pLReader->lastBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1346,7 +1340,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tRowMerge(&merge, &fRow1); tRowMerge(&merge, &fRow1);
} else { } else {
@ -1375,7 +1369,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tRowMerge(&merge, &fRow1); tRowMerge(&merge, &fRow1);
} else { } else {
@ -1407,14 +1401,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader, static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
bool mergeBlockData) { bool mergeBlockData) {
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tRowMergerInit(&merge, &fRow, pReader->pSchema); tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
@ -1548,7 +1540,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tRowMerge(&merge, &fRow1); tRowMerge(&merge, &fRow1);
} else { } else {
@ -1598,7 +1590,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tRowMerge(&merge, &fRow1); tRowMerge(&merge, &fRow1);
} else { } else {
@ -1803,10 +1795,13 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, int16_t* startPos) { static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, int16_t* startPos, SDataFReader* pFReader) {
pLastBlockReader->uid = uid; // the last block reader has been initialized for this table.
pLastBlockReader->rowIndex = startPos; if (pLastBlockReader->uid == uid) {
return true;
}
pLastBlockReader->uid = uid;
if (*startPos == -1) { if (*startPos == -1) {
if (ASCENDING_TRAVERSE(pLastBlockReader->order)) { if (ASCENDING_TRAVERSE(pLastBlockReader->order)) {
// do nothing // do nothing
@ -1814,19 +1809,24 @@ static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid
*startPos = pLastBlockReader->lastBlockData.nRow; *startPos = pLastBlockReader->lastBlockData.nRow;
} }
} }
/*int32_t code = */ tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC),
pFReader, uid, &pLastBlockReader->window, &pLastBlockReader->verRange);
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
return hasVal;
} }
static void setAllRowsChecked(SLastBlockReader* pLastBlockReader) { static void setAllRowsChecked(SLastBlockReader* pLastBlockReader) {
*pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX; // *pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX;
} }
static bool nextRowInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { static bool nextRowInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
bool asc = ASCENDING_TRAVERSE(pLastBlockReader->order); // if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) {
int32_t step = (asc) ? 1 : -1; // return false;
if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { // }
return false; return tMergeTreeNext(&pLastBlockReader->mergeTree);
}
#if 0
*(pLastBlockReader->rowIndex) += step; *(pLastBlockReader->rowIndex) += step;
SBlockData* pBlockData = &pLastBlockReader->lastBlockData; SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
@ -1879,20 +1879,17 @@ static bool nextRowInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
// set all data is consumed in last block // set all data is consumed in last block
setAllRowsChecked(pLastBlockReader); setAllRowsChecked(pLastBlockReader);
return false; return false;
#endif
} }
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
SBlockData* pBlockData = &pLastBlockReader->lastBlockData; TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
return pBlockData->aTSKEY[*pLastBlockReader->rowIndex]; TSDBKEY key = TSDBROW_KEY(&row);
return key.ts;
} }
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { return pLastBlockReader->mergeTree.pIter != NULL;
return false;
}
ASSERT(pLastBlockReader->lastBlockData.nRow > 0);
return true;
} }
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
@ -1985,6 +1982,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
} }
} }
bool hasBlockLData = hasDataInLastBlock(pLastBlockReader); bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);
// no data in last block and block, no need to proceed. // no data in last block and block, no need to proceed.
@ -2248,6 +2246,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#if 0
static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader) { STsdbReader* pReader) {
SArray* pBlocks = pLastBlockReader->pBlockL; SArray* pBlocks = pLastBlockReader->pBlockL;
@ -2308,6 +2307,7 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif
static int32_t uidComparFunc(const void* p1, const void* p2) { static int32_t uidComparFunc(const void* p1, const void* p2) {
uint64_t pu1 = *(uint64_t*)p1; uint64_t pu1 = *(uint64_t*)p1;
@ -2401,26 +2401,14 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
while (1) { while (1) {
// load the last data block of current table // load the last data block of current table
STableBlockScanInfo* pScanInfo = pStatus->pTableIter; STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader); // code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader);
if (code != TSDB_CODE_SUCCESS) { // if (code != TSDB_CODE_SUCCESS) {
return code; // return code;
} // }
if (pLastBlockReader->currentBlockIndex != -1) { bool hasVal =
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL, pReader->pFileReader);
int32_t index = pScanInfo->indexInBlockL; if (!hasVal) {
if (index == INITIAL_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) {
bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo);
if (!hasData) { // current table does not have rows in last block, try next table
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
}
continue;
}
}
} else { // no data in last block, try next table
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
if (!hasNexTable) { if (!hasNexTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -2428,6 +2416,26 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
continue; continue;
} }
// int32_t index = pScanInfo->indexInBlockL;
// if (index == INITIAL_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) {
// bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo);
// if (!hasData) { // current table does not have rows in last block, try next table
// bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
// if (!hasNexTable) {
// return TSDB_CODE_SUCCESS;
// }
// continue;
// }
// }
// } else { // no data in last block, try next table
// bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
// if (!hasNexTable) {
// return TSDB_CODE_SUCCESS;
// }
// continue;
// }
code = doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
@ -2446,7 +2454,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
} }
static int32_t doBuildDataBlock(STsdbReader* pReader) { static int32_t doBuildDataBlock(STsdbReader* pReader) {
TSDBKEY key = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SBlock* pBlock = NULL; SBlock* pBlock = NULL;
@ -2466,22 +2473,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pBlock = getCurrentBlock(pBlockIter); pBlock = getCurrentBlock(pBlockIter);
} }
{ TSDBKEY key = getCurrentKeyInBuf(pScanInfo, pReader);
key = getCurrentKeyInBuf(pScanInfo, pReader);
// load the last data block of current table
code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// note: the lastblock may be null here
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
if (pScanInfo->indexInBlockL == INITIAL_ROW_INDEX_VAL ||
pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) {
bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo);
}
}
if (pBlockInfo == NULL) { // build data block from last data file if (pBlockInfo == NULL) { // build data block from last data file
ASSERT(pBlockIter->numOfBlocks == 0); ASSERT(pBlockIter->numOfBlocks == 0);
@ -2594,7 +2586,6 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
} }
SLastBlockReader* pLReader = pReader->status.fileIter.pLastBlockReader; SLastBlockReader* pLReader = pReader->status.fileIter.pLastBlockReader;
pLReader->currentBlockIndex = -1;
// set the correct start position according to the query time window // set the correct start position according to the query time window
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
@ -2660,8 +2651,8 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
bool hasNext = blockIteratorNext(&pReader->status.blockIter); bool hasNext = blockIteratorNext(&pReader->status.blockIter);
if (hasNext) { // check for the next block in the block accessed order list if (hasNext) { // check for the next block in the block accessed order list
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
} else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) {
0) { // data blocks in current file are exhausted, let's try the next file now // data blocks in current file are exhausted, let's try the next file now
tBlockDataReset(&pReader->status.fileBlockData); tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order); resetDataBlockIterator(pBlockIter, pReader->order);
goto _begin; goto _begin;
@ -3024,7 +3015,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
while (nextRowInLastBlock(pLastBlockReader, pScanInfo)) { while (nextRowInLastBlock(pLastBlockReader, pScanInfo)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) { if (next1 == ts) {
TSDBROW fRow1 = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, *pLastBlockReader->rowIndex); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tRowMerge(pMerger, &fRow1); tRowMerge(pMerger, &fRow1);
} else { } else {
break; break;