fix(query): fix error in query last block.
This commit is contained in:
parent
c7d7cb7d5f
commit
eaef3dffaf
|
@ -88,6 +88,7 @@ typedef struct SLastBlockReader {
|
|||
SBlockData lastBlockData;
|
||||
STimeWindow window;
|
||||
SVersionRange verRange;
|
||||
int32_t order;
|
||||
uint64_t uid;
|
||||
int16_t* rowIndex; // row index ptr, usually from the STableBlockScanInfo->indexInBlockL
|
||||
} SLastBlockReader;
|
||||
|
@ -313,11 +314,11 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap
|
|||
}
|
||||
|
||||
// init file iterator
|
||||
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32_t order, const char* idstr) {
|
||||
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader/*int32_t order, const char* idstr*/) {
|
||||
size_t numOfFileset = taosArrayGetSize(aDFileSet);
|
||||
|
||||
pIter->index = ASCENDING_TRAVERSE(order) ? -1 : numOfFileset;
|
||||
pIter->order = order;
|
||||
pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
|
||||
pIter->order = pReader->order;
|
||||
pIter->pFileList = aDFileSet;
|
||||
pIter->numOfFiles = numOfFileset;
|
||||
|
||||
|
@ -325,14 +326,18 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32
|
|||
pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
|
||||
if (pIter->pLastBlockReader == NULL) {
|
||||
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), idstr);
|
||||
tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), pReader->idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
pIter->pLastBlockReader->pBlockL = taosArrayInit(4, sizeof(SBlockL));
|
||||
SLastBlockReader* pLReader = pIter->pLastBlockReader;
|
||||
pLReader->pBlockL = taosArrayInit(4, sizeof(SBlockL));
|
||||
pLReader->order = pReader->order;
|
||||
pLReader->window = pReader->window;
|
||||
pLReader->verRange = pReader->verRange;
|
||||
}
|
||||
|
||||
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
|
||||
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1284,7 +1289,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
|
|||
|
||||
// todo here we need to each key in the last files to identify if it is really overlapped with last block
|
||||
bool overlapWithlastBlock = false;
|
||||
if (hasDataInLastBlock(pLastBlockReader)) {
|
||||
if (/*hasDataInLastBlock(pLastBlockReader)*/taosArrayGetSize(pLastBlockReader->pBlockL) > 0) {
|
||||
SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex);
|
||||
overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey);
|
||||
}
|
||||
|
@ -1364,7 +1369,6 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
|
|||
return pReader->pMemSchema;
|
||||
}
|
||||
|
||||
// todo handle desc
|
||||
static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
|
||||
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
|
||||
SRowMerger merge = {0};
|
||||
|
@ -1392,31 +1396,57 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
|
|||
minKey = key;
|
||||
}
|
||||
|
||||
// file block ---> last block -----> imem -----> mem
|
||||
bool init = false;
|
||||
if (minKey == key) {
|
||||
init = true;
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
}
|
||||
|
||||
if (minKey == tsLast) {
|
||||
if (!init) {
|
||||
// file block ---> last block -----> imem -----> mem
|
||||
if (pReader->order == TSDB_ORDER_ASC) {
|
||||
if (minKey == key) {
|
||||
init = true;
|
||||
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
|
||||
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
}
|
||||
|
||||
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
|
||||
}
|
||||
if (minKey == tsLast) {
|
||||
if (!init) {
|
||||
init = true;
|
||||
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
|
||||
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
||||
}
|
||||
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
|
||||
}
|
||||
|
||||
if (minKey == k.ts) {
|
||||
if (!init) {
|
||||
if (minKey == k.ts) {
|
||||
if (!init) {
|
||||
init = true;
|
||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||
tRowMergerInit(&merge, pRow, pSchema);
|
||||
}
|
||||
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
}
|
||||
} else {
|
||||
if (minKey == k.ts) {
|
||||
init = true;
|
||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||
tRowMergerInit(&merge, pRow, pSchema);
|
||||
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
}
|
||||
|
||||
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
if (minKey == tsLast) {
|
||||
if (!init) {
|
||||
init = true;
|
||||
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
|
||||
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
||||
}
|
||||
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
|
||||
}
|
||||
|
||||
if (minKey == key) {
|
||||
if (!init) {
|
||||
init = true;
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
}
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
}
|
||||
}
|
||||
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
|
@ -1753,23 +1783,34 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
|
|||
|
||||
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
||||
|
||||
static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, STimeWindow* pWin,
|
||||
SVersionRange* pVerRange, int16_t* startPos) {
|
||||
static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, int16_t* startPos) {
|
||||
pLastBlockReader->uid = uid;
|
||||
pLastBlockReader->window = *pWin;
|
||||
pLastBlockReader->verRange = *pVerRange;
|
||||
pLastBlockReader->rowIndex = startPos;
|
||||
|
||||
if (*startPos == -1) {
|
||||
if (ASCENDING_TRAVERSE(pLastBlockReader->order)) {
|
||||
// do nothing
|
||||
} else {
|
||||
*startPos = pLastBlockReader->lastBlockData.nRow;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#define ALL_ROWS_CHECKED_INDEX INT16_MIN
|
||||
static void setAllRowsChecked(SLastBlockReader *pLastBlockReader) {
|
||||
*pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX;
|
||||
}
|
||||
|
||||
static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
|
||||
if (*(pLastBlockReader->rowIndex) >= pLastBlockReader->lastBlockData.nRow) {
|
||||
int32_t step = (pLastBlockReader->order == TSDB_ORDER_ASC) ? 1 : -1;
|
||||
if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) {
|
||||
return false;
|
||||
}
|
||||
|
||||
*(pLastBlockReader->rowIndex) += 1;
|
||||
*(pLastBlockReader->rowIndex) += step;
|
||||
|
||||
SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
|
||||
for(int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow; ++i) {
|
||||
for(int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow && i >= 0; i += step) {
|
||||
if (pBlockData->aUid[i] != pLastBlockReader->uid) {
|
||||
continue;
|
||||
}
|
||||
|
@ -1784,12 +1825,12 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
|
|||
|
||||
// no data any more
|
||||
if (pBlockData->aTSKEY[i] > pLastBlockReader->window.ekey) {
|
||||
*(pLastBlockReader->rowIndex) = pBlockData->nRow;
|
||||
setAllRowsChecked(pLastBlockReader);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pBlockData->aVersion[i] > pLastBlockReader->verRange.maxVer) {
|
||||
*(pLastBlockReader->rowIndex) = pBlockData->nRow;
|
||||
setAllRowsChecked(pLastBlockReader);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1798,7 +1839,7 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
|
|||
}
|
||||
|
||||
// set all data is consumed in last block
|
||||
*(pLastBlockReader->rowIndex) = pBlockData->nRow;
|
||||
setAllRowsChecked(pLastBlockReader);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1817,15 +1858,14 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
|
|||
return pBlockData->aTSKEY[*pLastBlockReader->rowIndex];
|
||||
}
|
||||
|
||||
// todo handle desc order
|
||||
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
|
||||
if (*pLastBlockReader->rowIndex >= pLastBlockReader->lastBlockData.nRow) {
|
||||
if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// todo refactor
|
||||
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader *pLastBlockReader) {
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
|
||||
|
@ -1849,6 +1889,25 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
|||
if (pBlockData->nRow > 0) {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
|
||||
// no last block
|
||||
if (pLastBlockReader->lastBlockData.nRow == 0) {
|
||||
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
STSRow* pTSRow = NULL;
|
||||
SRowMerger merge = {0};
|
||||
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
|
||||
|
||||
taosMemoryFree(pTSRow);
|
||||
tRowMergerClear(&merge);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
// row in last file block
|
||||
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
if (ts < key) { // save rows in last block
|
||||
|
@ -1901,7 +1960,6 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
|||
}
|
||||
}
|
||||
} else { // only last block exists
|
||||
// only last block exits
|
||||
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
|
||||
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
|
||||
|
@ -1936,7 +1994,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
||||
initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pReader->window, &pReader->verRange, &pBlockScanInfo->indexInBlockL);
|
||||
initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pBlockScanInfo->indexInBlockL);
|
||||
// bool has = nextRowInLastBlock(pLastBlockReader); // todo handle if no data in last block
|
||||
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
|
@ -2274,12 +2332,12 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
|||
// todo opt perf by avoiding load last block repeatly
|
||||
STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
|
||||
int32_t code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) { // todo handle error
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pReader->window, &pReader->verRange, &pScanInfo->indexInBlockL);
|
||||
if (pScanInfo->indexInBlockL == -1) {
|
||||
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
|
||||
if (pScanInfo->indexInBlockL == -1 || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) {
|
||||
bool hasData = nextRowInLastBlock(pLastBlockReader);
|
||||
if (!hasData) { // current table does not have rows in last block, try next table
|
||||
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
|
||||
|
@ -2327,10 +2385,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
// load the last data block of current table
|
||||
code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// todo handle error
|
||||
return code;
|
||||
}
|
||||
|
||||
initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pReader->window, &pReader->verRange, &pScanInfo->indexInBlockL);
|
||||
initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pScanInfo->indexInBlockL);
|
||||
}
|
||||
|
||||
if (pBlockInfo == NULL) { // build data block from last data file
|
||||
|
@ -2340,7 +2398,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
tBlockDataReset(&pStatus->fileBlockData);
|
||||
code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// todo
|
||||
return code;
|
||||
}
|
||||
|
||||
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
|
||||
|
@ -2456,6 +2514,10 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
|||
return code;
|
||||
}
|
||||
|
||||
if (pReader->pResBlock->info.rows > 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// all data blocks are checked in this last block file, now let's try the next file
|
||||
if (pReader->status.pTableIter == NULL) {
|
||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||
|
@ -2881,7 +2943,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// todo support desc order
|
||||
// todo check if the rows are dropped or not
|
||||
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger) {
|
||||
while(nextRowInLastBlock(pLastBlockReader)) {
|
||||
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
|
@ -3240,7 +3302,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
|||
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
|
||||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||
|
||||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
|
||||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
|
||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
|
||||
|
||||
// no data in files, let's try buffer in memory
|
||||
|
@ -3261,8 +3323,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
|||
goto _err;
|
||||
}
|
||||
|
||||
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order,
|
||||
pPrevReader->idStr);
|
||||
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader);
|
||||
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);
|
||||
|
||||
// no data in files, let's try buffer in memory
|
||||
|
@ -3507,13 +3568,13 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
|
|||
tBlockDataReset(&pStatus->fileBlockData);
|
||||
int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pBlockScanInfo->uid, pReader->pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
//todo
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tBlockDataDestroy(&pStatus->fileBlockData, 1);
|
||||
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
@ -3555,7 +3616,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
|||
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
|
||||
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||
|
||||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
|
||||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
|
||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
|
||||
resetDataBlockScanInfo(pReader->status.pTableMap);
|
||||
|
||||
|
|
Loading…
Reference in New Issue