From fed9efa84043eaf6303e7c86af55d7af2973cf8c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Sep 2022 18:46:50 +0800 Subject: [PATCH] fix(query): check memory malloc failure. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 79 +++++++++++++++++--------- 1 file changed, 53 insertions(+), 26 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a92e8189a1..17f30b5194 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -184,9 +184,9 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); -static void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, +static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow); -static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, +static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow); static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader); @@ -1395,7 +1395,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } - tRowMergerGetRow(&merge, &pTSRow); + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); taosMemoryFree(pTSRow); @@ -1422,7 +1426,11 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); } - tRowMergerGetRow(&merge, &pTSRow); + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); taosMemoryFree(pTSRow); @@ -1456,12 +1464,16 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); - tRowMergerGetRow(&merge, &pTSRow); + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); taosMemoryFree(pTSRow); tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; + return code; } else { ASSERT(0); return TSDB_CODE_SUCCESS; @@ -1618,12 +1630,16 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - tRowMergerGetRow(&merge, &pTSRow); + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); taosMemoryFree(pTSRow); tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; + return code; } #if 0 @@ -1907,7 +1923,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - tRowMergerGetRow(&merge, &pTSRow); + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); taosMemoryFree(pTSRow); @@ -3026,8 +3046,8 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc return TSDB_CODE_SUCCESS; } -void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, - STsdbReader* pReader, bool* freeTSRow) { +int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, + STsdbReader* pReader, bool* freeTSRow) { TSDBROW* pNextRow = NULL; TSDBROW current = *pRow; @@ -3037,19 +3057,19 @@ void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SAr if (!pIter->hasVal) { *pTSRow = current.pTSRow; *freeTSRow = false; - return; + return TSDB_CODE_SUCCESS; } else { // has next point in mem/imem pNextRow = getValidMemRow(pIter, pDelList, pReader); if (pNextRow == NULL) { *pTSRow = current.pTSRow; *freeTSRow = false; - return; + return TSDB_CODE_SUCCESS; } if (current.pTSRow->ts != pNextRow->pTSRow->ts) { *pTSRow = current.pTSRow; *freeTSRow = false; - return; + return TSDB_CODE_SUCCESS; } } } @@ -3069,13 +3089,17 @@ void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SAr tRowMergerAdd(&merge, pNextRow, pTSchema1); doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader); - tRowMergerGetRow(&merge, pTSRow); - tRowMergerClear(&merge); + int32_t code = tRowMergerGetRow(&merge, pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + tRowMergerClear(&merge); *freeTSRow = true; + return TSDB_CODE_SUCCESS; } -void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, +int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) { SRowMerger merge = {0}; @@ -3100,7 +3124,8 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); } - tRowMergerGetRow(&merge, pTSRow); + int32_t code = tRowMergerGetRow(&merge, pTSRow); + return code; } int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey, @@ -3130,28 +3155,30 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); + int32_t code = TSDB_CODE_SUCCESS; if (ik.ts != k.ts) { if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts - doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); + code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); } else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) { - doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); + code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); } } else { // ik.ts == k.ts - doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); *freeTSRow = true; + code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } - return TSDB_CODE_SUCCESS; + return code; } if (pBlockScanInfo->iter.hasVal && pRow != NULL) { - doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); - return TSDB_CODE_SUCCESS; + return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); } if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { - doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); - return TSDB_CODE_SUCCESS; + return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); } return TSDB_CODE_SUCCESS;