From 1ed620e702fc70fe06cdeefcb12e37f382b3acc1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 17 Jul 2024 11:17:27 +0800 Subject: [PATCH] fix(stream): return after get value. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 58 +++++++---- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 112 ++++++++++++++++----- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 1 - 3 files changed, 124 insertions(+), 47 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index cf2d23cfc8..dcca320f7f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -48,7 +48,7 @@ typedef struct { static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo); static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); -static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); +static int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRow); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader); static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, @@ -2025,9 +2025,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SArray* pDelList = pBlockScanInfo->delSkyline; int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; + TSDBROW* pRow = NULL; + TSDBROW* piRow = NULL; - TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader); - TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); + getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader, &pRow); + getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader, &piRow); SRowKey* pSttKey = NULL; if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { @@ -2201,9 +2203,10 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan static void doForwardDataIter(SRowKey* pKey, SIterInfo* pIter, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { SRowKey rowKey = {0}; + TSDBROW* pRow = NULL; while (1) { - TSDBROW* pRow = getValidMemRow(pIter, pBlockScanInfo->delSkyline, pReader); + getValidMemRow(pIter, pBlockScanInfo->delSkyline, pReader, &pRow); if (!pIter->hasVal) { break; } @@ -2559,11 +2562,11 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI } if (pBlockScanInfo->iter.hasVal) { - pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); + getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader, &pRow); } if (pBlockScanInfo->iiter.hasVal) { - piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); + getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader, &piRow); } // two levels of mem-table does contain the valid rows @@ -2810,13 +2813,16 @@ TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}, ikey = {.ts = TSKEY_INITIAL_VAL}; bool hasKey = false, hasIKey = false; - TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader); + TSDBROW* pRow = NULL; + TSDBROW* pIRow = NULL; + + getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader, &pRow); if (pRow != NULL) { hasKey = true; key = TSDBROW_KEY(pRow); } - TSDBROW* pIRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader); + getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader, &pIRow); if (pIRow != NULL) { hasIKey = true; ikey = TSDBROW_KEY(pIRow); @@ -3742,9 +3748,11 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t return false; } -FORCE_INLINE TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) { +FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes) { + *pRes = NULL; + if (!pIter->hasVal) { - return NULL; + return TSDB_CODE_SUCCESS; } int32_t order = pReader->info.order; @@ -3754,18 +3762,20 @@ FORCE_INLINE TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, S TSDBROW_INIT_KEY(pRow, key); if (outOfTimeWindow(key.ts, &pReader->info.window)) { pIter->hasVal = false; - return NULL; + return TSDB_CODE_SUCCESS; } // it is a valid data version if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) { if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { - return pRow; + *pRes = pRow; + return TSDB_CODE_SUCCESS; } else { bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0); if (!dropped) { - return pRow; + *pRes = pRow; + return TSDB_CODE_SUCCESS; } } } @@ -3773,7 +3783,7 @@ FORCE_INLINE TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, S while (1) { pIter->hasVal = tsdbTbDataIterNext(pIter->iter); if (!pIter->hasVal) { - return NULL; + return TSDB_CODE_SUCCESS; } pRow = tsdbTbDataIterGet(pIter->iter); @@ -3781,17 +3791,19 @@ FORCE_INLINE TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, S TSDBROW_INIT_KEY(pRow, key); if (outOfTimeWindow(key.ts, &pReader->info.window)) { pIter->hasVal = false; - return NULL; + return TSDB_CODE_SUCCESS; } if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) { if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) { - return pRow; + *pRes = pRow; + return TSDB_CODE_SUCCESS; } else { bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0); if (!dropped) { - return pRow; + *pRes = pRow; + return TSDB_CODE_SUCCESS; } } } @@ -3809,7 +3821,8 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArra } // data exists but not valid - TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader); + TSDBROW* pRow = NULL; + getValidMemRow(pIter, pDelList, pReader, &pRow); if (pRow == NULL) { break; } @@ -3974,7 +3987,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt *freeTSRow = false; return TSDB_CODE_SUCCESS; } else { // has next point in mem/imem - pNextRow = getValidMemRow(pIter, pDelList, pReader); + getValidMemRow(pIter, pDelList, pReader, &pNextRow); if (pNextRow == NULL) { *pResRow = current; *freeTSRow = false; @@ -4118,8 +4131,11 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey, bool* freeTSRow) { - TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); - TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); + TSDBROW* pRow = NULL; + TSDBROW* piRow = NULL; + + getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader, &pRow); + getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader, &piRow); SArray* pDelList = pBlockScanInfo->delSkyline; uint64_t uid = pBlockScanInfo->uid; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 84f235ab25..f0918ceb22 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -37,7 +37,10 @@ static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { return TSDB_CODE_OUT_OF_MEMORY; } - taosArrayPush(pBuf->pData, &p); + void* px = taosArrayPush(pBuf->pData, &p); + if (px == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } if (remainder > 0) { @@ -45,7 +48,10 @@ static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - taosArrayPush(pBuf->pData, &p); + void* px = taosArrayPush(pBuf->pData, &p); + if (px == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } pBuf->numOfTables = numOfTables; @@ -86,7 +92,10 @@ int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { return TSDB_CODE_OUT_OF_MEMORY; } - taosArrayPush(pBuf->pData, &p); + void* px = taosArrayPush(pBuf->pData, &p); + if (px == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } if (remainder > 0) { @@ -94,7 +103,10 @@ int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - taosArrayPush(pBuf->pData, &p); + void* px = taosArrayPush(pBuf->pData, &p); + if (px == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } pBuf->numOfTables = numOfTables; @@ -214,7 +226,8 @@ void clearRowKey(SRowKey* pKey) { taosMemoryFreeClear(pKey->pks[0].pData); } -static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { +static int32_t initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { + int32_t code = 0; int32_t numOfPks = pReader->suppInfo.numOfPks; bool asc = ASCENDING_TRAVERSE(pReader->info.order); int8_t type = pReader->suppInfo.pk.type; @@ -225,18 +238,37 @@ static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader int64_t skey = pReader->info.window.skey; int64_t ts = (skey > INT64_MIN) ? (skey - 1) : skey; - initRowKey(pRowKey, ts, numOfPks, type, bytes, asc); - initRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, type, bytes, asc); + code = initRowKey(pRowKey, ts, numOfPks, type, bytes, asc); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = initRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, type, bytes, asc); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } else { int64_t ekey = pReader->info.window.ekey; int64_t ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey; - initRowKey(pRowKey, ts, numOfPks, type, bytes, asc); - initRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, type, bytes, asc); + code = initRowKey(pRowKey, ts, numOfPks, type, bytes, asc); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = initRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, type, bytes, asc); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } - initRowKey(&pScanInfo->sttRange.skey, INT64_MAX, numOfPks, type, bytes, asc); - initRowKey(&pScanInfo->sttRange.ekey, INT64_MIN, numOfPks, type, bytes, asc); + code = initRowKey(&pScanInfo->sttRange.skey, INT64_MAX, numOfPks, type, bytes, asc); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = initRowKey(&pScanInfo->sttRange.ekey, INT64_MIN, numOfPks, type, bytes, asc); + return code; } int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSHashObj* pTableMap, @@ -248,10 +280,13 @@ int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSH pScanInfo->cleanSttBlocks = false; pScanInfo->sttBlockReturned = false; - initLastProcKey(pScanInfo, pReader); + int32_t code = initLastProcKey(pScanInfo, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; - int32_t code = tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); + code = tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -607,7 +642,10 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0); for (int32_t i = 0; i < numOfBlocks; ++i) { STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i}; - taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); + void* px = taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); + if (px == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } taosArrayAddAll(pBlockIter->blockList, pTableScanInfo->pBlockList); @@ -640,11 +678,18 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 int32_t index = sup.indexPerTable[pos]++; SFileDataBlockInfo* pBlockInfo = taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); - taosArrayPush(pBlockIter->blockList, pBlockInfo); + void* px = taosArrayPush(pBlockIter->blockList, pBlockInfo); + if (px == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } STableBlockScanInfo* pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo; STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal}; - taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); + + px = taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); + if (px == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } // set data block index overflow, in order to disable the offset comparator if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) { @@ -752,7 +797,10 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_ if (record.version <= pReader->info.verRange.maxVer) { SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; - taosArrayPush(pScanInfo->pFileDelData, &delData); + void* px = taosArrayPush(pScanInfo->pFileDelData, &delData); + if (px == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } } @@ -858,7 +906,10 @@ int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM p = pMemTbData->pHead; while (p) { if (p->version <= ver) { - taosArrayPush(pMemDelData, p); + void* px = taosArrayPush(pMemDelData, p); + if (px == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } p = p->pNext; @@ -870,7 +921,10 @@ int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM p = piMemTbData->pHead; while (p) { if (p->version <= ver) { - taosArrayPush(pMemDelData, p); + void* px = taosArrayPush(pMemDelData, p); + if (px == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } p = p->pNext; } @@ -914,7 +968,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo } if (index >= pStatisBlock->numOfRecords) { - tStatisBlockDestroy(pStatisBlock); + (void) tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); return num; } @@ -924,7 +978,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) { p = &pStatisBlkArray->data[i]; if (p->minTbid.suid > suid) { - tStatisBlockDestroy(pStatisBlock); + (void) tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); return num; } @@ -944,7 +998,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo } } - tStatisBlockDestroy(pStatisBlock); + (void) tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); return num; } @@ -961,14 +1015,17 @@ static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlo } } -void doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) { +int32_t doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) { int32_t size = taosArrayGetSize(pLDIterList); if (size < numOfFileObj) { int32_t inc = numOfFileObj - size; for (int32_t k = 0; k < inc; ++k) { SLDataIter* pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); - taosArrayPush(pLDIterList, &pIter); + void* px = taosArrayPush(pLDIterList, &pIter); + if (px == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } } else if (size > numOfFileObj) { // remove unused LDataIter int32_t inc = size - numOfFileObj; @@ -978,6 +1035,8 @@ void doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) { destroyLDataIter(pIter); } } + + return TSDB_CODE_SUCCESS; } int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) { @@ -986,7 +1045,10 @@ int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) // add the list/iter placeholder while (taosArrayGetSize(pSttFileBlockIterArray) < numOfLevels) { SArray* pList = taosArrayInit(4, POINTER_BYTES); - taosArrayPush(pSttFileBlockIterArray, &pList); + void* px = taosArrayPush(pSttFileBlockIterArray, &pList); + if (px == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } for (int32_t j = 0; j < numOfLevels; ++j) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 45a2384b65..865e8e2d41 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -247,7 +247,6 @@ typedef struct SDataBlockIter { typedef struct SFileBlockDumpInfo { int32_t totalRows; int32_t rowIndex; -// STsdbRowKey lastKey; // this key should be removed bool allDumped; } SFileBlockDumpInfo;