fix(tsdb): return code for tMergeTreeNext

This commit is contained in:
Haojun Liao 2024-08-19 11:01:54 +08:00
parent ee36bd741f
commit d0e31f711f
4 changed files with 58 additions and 38 deletions

View File

@ -898,7 +898,7 @@ typedef struct SSttDataInfoForTable {
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pTableInfo);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree);
int32_t tMergeTreeNext(SMergeTree *pMTree, bool* pHasNext);
void tMergeTreePinSttBlock(SMergeTree *pMTree);
void tMergeTreeUnpinSttBlock(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);

View File

@ -2245,17 +2245,18 @@ static int32_t lastIterClose(SFSLastIter **iter) {
}
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
int32_t code = 0;
bool hasVal = false;
int32_t code = tMergeTreeNext(iter->pMergeTree, &hasVal);
if (code != 0) {
return code;
}
bool hasVal = tMergeTreeNext(iter->pMergeTree);
if (!hasVal) {
*ppRow = NULL;
TAOS_RETURN(code);
}
*ppRow = tMergeTreeGetRow(iter->pMergeTree);
TAOS_RETURN(code);
}

View File

@ -1102,13 +1102,21 @@ void tMergeTreeUnpinSttBlock(SMergeTree *pMTree) {
tLDataIterUnpinSttBlock(pIter, pMTree->idStr);
}
bool tMergeTreeNext(SMergeTree *pMTree) {
int32_t tMergeTreeNext(SMergeTree *pMTree, bool *pHasNext) {
int32_t code = 0;
if (pHasNext == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (pMTree->pIter) {
SLDataIter *pIter = pMTree->pIter;
bool hasVal = false;
int32_t code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
bool hasVal = false;
code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
if (!hasVal || (code != 0)) {
if (code == TSDB_CODE_FILE_CORRUPTED) {
code = 0; // suppress the file corrupt error to enable all queries within this cluster can run without failed.
}
pMTree->pIter = NULL;
}
@ -1117,7 +1125,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
if (pMTree->pIter && pIter) {
int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node);
if (c > 0) {
(void) tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
(void)tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
pMTree->pIter = NULL;
} else {
ASSERT(c);
@ -1132,7 +1140,8 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
}
}
return pMTree->pIter != NULL;
*pHasNext = (pMTree->pIter != NULL);
return code;
}
void tMergeTreeClose(SMergeTree *pMTree) {

View File

@ -1759,14 +1759,22 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
return code;
}
static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot,
SVersionRange* pVerRange) {
static int32_t nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot,
SVersionRange* pVerRange) {
int32_t code = 0;
int32_t order = pSttBlockReader->order;
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
SRowKey* pNextProc = &pScanInfo->sttKeyInfo.nextProcKey;
while (1) {
bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree);
bool hasVal = false;
code = tMergeTreeNext(&pSttBlockReader->mergeTree, &hasVal);
if (code) {
tsdbError("failed to iter the next row in stt-file merge tree, code:%s, %s", tstrerror(code),
pSttBlockReader->mergeTree.idStr);
return code;
}
if (!hasVal) { // the next value will be the accessed key in stt
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
@ -1779,7 +1787,6 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
memset(pNextProc->pks[0].pData, 0, pNextProc->pks[0].nData);
}
}
return false;
}
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
@ -1798,13 +1805,13 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange,
pSttBlockReader->numOfPks > 0)) {
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
return true;
}
} else {
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
return true;
}
}
return code;
}
static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBlock(&pSttBlockReader->mergeTree); }
@ -2380,14 +2387,13 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
return true;
}
static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
bool hasData = true;
static void initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
int32_t order = pReader->info.order;
bool asc = ASCENDING_TRAVERSE(order);
// the stt block reader has been initialized for this table.
if (pSttBlockReader->uid == pScanInfo->uid) {
return hasDataInSttBlock(pScanInfo);
return;
}
if (pSttBlockReader->uid != 0) {
@ -2396,9 +2402,14 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
pSttBlockReader->uid = pScanInfo->uid;
// second time init stt block reader
// second or third time init stt block reader
if (pScanInfo->cleanSttBlocks && (pReader->info.execMode == READER_EXEC_ROWS)) {
return !pScanInfo->sttBlockReturned;
// only allowed to retrieve clean stt blocks for count once
if (pScanInfo->sttBlockReturned) {
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
tsdbDebug("uid:%" PRIu64 " set no stt-file data after stt-block retrieved", pScanInfo->uid, pReader->idStr);
}
return;
}
STimeWindow w = pSttBlockReader->window;
@ -2435,28 +2446,28 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))};
if (info.pKeyRangeList == NULL) {
pReader->code = terrno;
return false;
return;
}
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(info.pKeyRangeList);
pReader->code = code;
return false;
return;
}
code = initMemDataIterator(pScanInfo, pReader);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(info.pKeyRangeList);
pReader->code = code;
return false;
return;
}
code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(info.pKeyRangeList);
pReader->code = code;
return false;
return;
}
if (conf.rspRows) {
@ -2484,27 +2495,26 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
SRowKey* p = asc ? &pScanInfo->sttRange.skey : &pScanInfo->sttRange.ekey;
tRowKeyAssign(&pScanInfo->sttKeyInfo.nextProcKey, p);
hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
} else { // not clean stt blocks
INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window
pScanInfo->sttBlockReturned = false;
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
}
} else {
pScanInfo->cleanSttBlocks = false;
INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window
pScanInfo->sttBlockReturned = false;
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
}
pScanInfo->sttBlockReturned = false;
taosArrayDestroy(info.pKeyRangeList);
int64_t el = taosGetTimestampUs() - st;
pReader->cost.initSttBlockReader += (el / 1000.0);
tsdbDebug("init stt block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr);
return hasData;
if (code != 0) {
pReader->code = code;
}
}
static bool hasDataInSttBlock(STableBlockScanInfo* pInfo) { return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; }
@ -2772,7 +2782,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
SBlockData* pBlockData = &pReader->status.fileBlockData;
(void) initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader);
initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader);
if (pReader->code != 0) {
code = pReader->code;
goto _end;
@ -3190,12 +3200,12 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
continue;
}
bool hasDataInSttFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
if (pReader->code != TSDB_CODE_SUCCESS) {
return pReader->code;
}
if (!hasDataInSttFile) {
if (!hasDataInSttBlock(pScanInfo)) {
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
@ -3287,7 +3297,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
}
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
(void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
if (pReader->code != 0) {
return pReader->code;
}
@ -3331,7 +3341,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
int64_t st = taosGetTimestampUs();
// let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files
(void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
if (pReader->code != 0) {
return pReader->code;
}