diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ab5b07581a..85084a0b81 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -898,7 +898,7 @@ typedef struct SSttDataInfoForTable { int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pTableInfo); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); -bool tMergeTreeNext(SMergeTree *pMTree); +int32_t tMergeTreeNext(SMergeTree *pMTree, bool* pHasNext); void tMergeTreePinSttBlock(SMergeTree *pMTree); void tMergeTreeUnpinSttBlock(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index aa92597211..70e6e1ee2a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -2245,17 +2245,18 @@ static int32_t lastIterClose(SFSLastIter **iter) { } static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) { - int32_t code = 0; + bool hasVal = false; + int32_t code = tMergeTreeNext(iter->pMergeTree, &hasVal); + if (code != 0) { + return code; + } - bool hasVal = tMergeTreeNext(iter->pMergeTree); if (!hasVal) { *ppRow = NULL; - TAOS_RETURN(code); } *ppRow = tMergeTreeGetRow(iter->pMergeTree); - TAOS_RETURN(code); } diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 4729b912a7..8bfc066731 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -1102,13 +1102,21 @@ void tMergeTreeUnpinSttBlock(SMergeTree *pMTree) { tLDataIterUnpinSttBlock(pIter, pMTree->idStr); } -bool tMergeTreeNext(SMergeTree *pMTree) { +int32_t tMergeTreeNext(SMergeTree *pMTree, bool *pHasNext) { + int32_t code = 0; + if (pHasNext == NULL) { + return TSDB_CODE_INVALID_PARA; + } + if (pMTree->pIter) { SLDataIter *pIter = pMTree->pIter; - - bool hasVal = false; - int32_t code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal); + bool hasVal = false; + code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal); if (!hasVal || (code != 0)) { + if (code == TSDB_CODE_FILE_CORRUPTED) { + code = 0; // suppress the file corrupt error to enable all queries within this cluster can run without failed. + } + pMTree->pIter = NULL; } @@ -1117,7 +1125,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) { if (pMTree->pIter && pIter) { int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node); if (c > 0) { - (void) tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter); + (void)tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter); pMTree->pIter = NULL; } else { ASSERT(c); @@ -1132,7 +1140,8 @@ bool tMergeTreeNext(SMergeTree *pMTree) { } } - return pMTree->pIter != NULL; + *pHasNext = (pMTree->pIter != NULL); + return code; } void tMergeTreeClose(SMergeTree *pMTree) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 9a9c74a3a0..607d96bcbc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1759,14 +1759,22 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB return code; } -static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot, - SVersionRange* pVerRange) { +static int32_t nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot, + SVersionRange* pVerRange) { + int32_t code = 0; int32_t order = pSttBlockReader->order; int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; SRowKey* pNextProc = &pScanInfo->sttKeyInfo.nextProcKey; while (1) { - bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree); + bool hasVal = false; + code = tMergeTreeNext(&pSttBlockReader->mergeTree, &hasVal); + if (code) { + tsdbError("failed to iter the next row in stt-file merge tree, code:%s, %s", tstrerror(code), + pSttBlockReader->mergeTree.idStr); + return code; + } + if (!hasVal) { // the next value will be the accessed key in stt pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; @@ -1779,7 +1787,6 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc memset(pNextProc->pks[0].pData, 0, pNextProc->pks[0].nData); } } - return false; } TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); @@ -1798,13 +1805,13 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange, pSttBlockReader->numOfPks > 0)) { pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; - return true; } } else { pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; - return true; } } + + return code; } static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBlock(&pSttBlockReader->mergeTree); } @@ -2380,14 +2387,13 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable return true; } -static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { - bool hasData = true; +static void initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { int32_t order = pReader->info.order; bool asc = ASCENDING_TRAVERSE(order); // the stt block reader has been initialized for this table. if (pSttBlockReader->uid == pScanInfo->uid) { - return hasDataInSttBlock(pScanInfo); + return; } if (pSttBlockReader->uid != 0) { @@ -2396,9 +2402,14 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan pSttBlockReader->uid = pScanInfo->uid; - // second time init stt block reader + // second or third time init stt block reader if (pScanInfo->cleanSttBlocks && (pReader->info.execMode == READER_EXEC_ROWS)) { - return !pScanInfo->sttBlockReturned; + // only allowed to retrieve clean stt blocks for count once + if (pScanInfo->sttBlockReturned) { + pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; + tsdbDebug("uid:%" PRIu64 " set no stt-file data after stt-block retrieved", pScanInfo->uid, pReader->idStr); + } + return; } STimeWindow w = pSttBlockReader->window; @@ -2435,28 +2446,28 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))}; if (info.pKeyRangeList == NULL) { pReader->code = terrno; - return false; + return; } int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(info.pKeyRangeList); pReader->code = code; - return false; + return; } code = initMemDataIterator(pScanInfo, pReader); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(info.pKeyRangeList); pReader->code = code; - return false; + return; } code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(info.pKeyRangeList); pReader->code = code; - return false; + return; } if (conf.rspRows) { @@ -2484,27 +2495,26 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan SRowKey* p = asc ? &pScanInfo->sttRange.skey : &pScanInfo->sttRange.ekey; tRowKeyAssign(&pScanInfo->sttKeyInfo.nextProcKey, p); - - hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA); } else { // not clean stt blocks INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window - pScanInfo->sttBlockReturned = false; - hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); + code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); } } else { pScanInfo->cleanSttBlocks = false; INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window - pScanInfo->sttBlockReturned = false; - hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); + code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange); } + pScanInfo->sttBlockReturned = false; taosArrayDestroy(info.pKeyRangeList); int64_t el = taosGetTimestampUs() - st; pReader->cost.initSttBlockReader += (el / 1000.0); tsdbDebug("init stt block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr); - return hasData; + if (code != 0) { + pReader->code = code; + } } static bool hasDataInSttBlock(STableBlockScanInfo* pInfo) { return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; } @@ -2772,7 +2782,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } SBlockData* pBlockData = &pReader->status.fileBlockData; - (void) initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader); + initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader); if (pReader->code != 0) { code = pReader->code; goto _end; @@ -3190,12 +3200,12 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { continue; } - bool hasDataInSttFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + initSttBlockReader(pSttBlockReader, pScanInfo, pReader); if (pReader->code != TSDB_CODE_SUCCESS) { return pReader->code; } - if (!hasDataInSttFile) { + if (!hasDataInSttBlock(pScanInfo)) { bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; @@ -3287,7 +3297,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) { - (void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + initSttBlockReader(pSttBlockReader, pScanInfo, pReader); if (pReader->code != 0) { return pReader->code; } @@ -3331,7 +3341,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { int64_t st = taosGetTimestampUs(); // let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files - (void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + initSttBlockReader(pSttBlockReader, pScanInfo, pReader); if (pReader->code != 0) { return pReader->code; }