From cd0722fe1f52641d1312d01369ae7e87d4a8a980 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Jul 2024 16:38:05 +0800 Subject: [PATCH] fix(stream): check return value. --- source/libs/executor/inc/tsort.h | 8 ++--- source/libs/executor/src/tsort.c | 53 ++++++++++++++++++++++++-------- 2 files changed, 45 insertions(+), 16 deletions(-) diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index c6f3986776..cf1464ccd7 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -165,13 +165,13 @@ void tsortGetValue(STupleHandle* pVHandle, int32_t colId, void** pVal); * @return */ uint64_t tsortGetGroupId(STupleHandle* pVHandle); -void* tsortGetBlockInfo(STupleHandle* pVHandle); +void tsortGetBlockInfo(STupleHandle* pVHandle, SDataBlockInfo* pInfo); /** * * @param pSortHandle * @return */ -SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle); +int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock); /** * return the sort execution information. @@ -215,8 +215,8 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke */ void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param); -int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, - int32_t leftRowIndex, int32_t rightRowIndex, void* pOrder); +int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, int32_t leftRowIndex, int32_t rightRowIndex, + void* pOrder); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 33596c4802..b4e5a9a01a 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -167,8 +167,17 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) { return t + *tupleOffset(t, colIdx); } -SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) { - return createOneDataBlock(pSortHandle->pDataBlock, false); +int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) { + if (pBlock == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + *pBlock = createOneDataBlock(pSortHandle->pDataBlock, false); + if (*pBlock == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } else { + return TSDB_CODE_SUCCESS; + } } #define AllocatedTupleType 0 @@ -1099,7 +1108,11 @@ int32_t tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupl } pBlock->info.dataLoad = 1; - pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; + + SDataBlockInfo info = {0}; + tsortGetBlockInfo(pTupleHandle, &info); + + pBlock->info.scanFlag = info.scanFlag; pBlock->info.rows += 1; } else { @@ -1121,7 +1134,10 @@ int32_t tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupl } pBlock->info.dataLoad = 1; - pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; + SDataBlockInfo info = {0}; + tsortGetBlockInfo(pTupleHandle, &info); + + pBlock->info.scanFlag = info.scanFlag; pBlock->info.rows += 1; } @@ -1463,35 +1479,41 @@ static int32_t initRowIdSort(SSortHandle* pHandle) { SColumnInfoData pkCol = {0}; SSDataBlock* pSortInput = createDataBlock(); - pHandle->pDataBlock = pSortInput; - SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1); int32_t code = blockDataAppendColInfo(pSortInput, &tsCol); if (code) { + blockDataDestroy(pSortInput); return code; } SColumnInfoData regionIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2); code = blockDataAppendColInfo(pSortInput, ®ionIdCol); if (code) { + blockDataDestroy(pSortInput); return code; } SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); code = blockDataAppendColInfo(pSortInput, &offsetCol); if (code) { + blockDataDestroy(pSortInput); return code; } SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4); code = blockDataAppendColInfo(pSortInput, &lengthCol); if (code) { + blockDataDestroy(pSortInput); return code; } if (pHandle->bSortPk) { pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5); code = blockDataAppendColInfo(pSortInput, &pkCol); + if (code) { + blockDataDestroy(pSortInput); + return code; + } } blockDataDestroy(pHandle->pDataBlock); @@ -1990,12 +2012,13 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { bool bExtractedBlock = false; bool bSkipBlock = false; if (pBlk != NULL && pHandle->mergeLimit > 0) { - pBlk = NULL; - - code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &pBlk); + SSDataBlock* p = NULL; + code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &p); if (bSkipBlock || code != 0) { continue; } + + pBlk = p; } if (pBlk != NULL) { @@ -2416,15 +2439,21 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { while (1) { // fetch data SSDataBlock* pBlock = pHandle->fetchfp(source->param); - if (NULL == pBlock) break; + if (NULL == pBlock) { + break; + } if (pHandle->beforeFp != NULL) { pHandle->beforeFp(pBlock, pHandle->param); } + if (pHandle->pDataBlock == NULL) { pHandle->pDataBlock = createOneDataBlock(pBlock, false); } - if (pHandle->pDataBlock == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + if (pHandle->pDataBlock == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } size_t colNum = blockDataGetNumOfCols(pBlock); @@ -2581,7 +2610,7 @@ void tsortGetValue(STupleHandle* pVHandle, int32_t colIndex, void** pVal) { } uint64_t tsortGetGroupId(STupleHandle* pVHandle) { return pVHandle->pBlock->info.id.groupId; } -void* tsortGetBlockInfo(STupleHandle* pVHandle) { return &pVHandle->pBlock->info; } +void tsortGetBlockInfo(STupleHandle* pVHandle, SDataBlockInfo* pBlockInfo) { *pBlockInfo = pVHandle->pBlock->info; } SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) { SSortExecInfo info = {0};