fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-07-23 16:38:05 +08:00
parent 7497f193c7
commit cd0722fe1f
2 changed files with 45 additions and 16 deletions

View File

@ -165,13 +165,13 @@ void tsortGetValue(STupleHandle* pVHandle, int32_t colId, void** pVal);
* @return
*/
uint64_t tsortGetGroupId(STupleHandle* pVHandle);
void* tsortGetBlockInfo(STupleHandle* pVHandle);
void tsortGetBlockInfo(STupleHandle* pVHandle, SDataBlockInfo* pInfo);
/**
*
* @param pSortHandle
* @return
*/
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle);
int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock);
/**
* return the sort execution information.
@ -215,8 +215,8 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke
*/
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param);
int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
int32_t leftRowIndex, int32_t rightRowIndex, void* pOrder);
int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, int32_t leftRowIndex, int32_t rightRowIndex,
void* pOrder);
#ifdef __cplusplus
}
#endif

View File

@ -167,8 +167,17 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
return t + *tupleOffset(t, colIdx);
}
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
return createOneDataBlock(pSortHandle->pDataBlock, false);
int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) {
if (pBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
*pBlock = createOneDataBlock(pSortHandle->pDataBlock, false);
if (*pBlock == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
} else {
return TSDB_CODE_SUCCESS;
}
}
#define AllocatedTupleType 0
@ -1099,7 +1108,11 @@ int32_t tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupl
}
pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
SDataBlockInfo info = {0};
tsortGetBlockInfo(pTupleHandle, &info);
pBlock->info.scanFlag = info.scanFlag;
pBlock->info.rows += 1;
} else {
@ -1121,7 +1134,10 @@ int32_t tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupl
}
pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
SDataBlockInfo info = {0};
tsortGetBlockInfo(pTupleHandle, &info);
pBlock->info.scanFlag = info.scanFlag;
pBlock->info.rows += 1;
}
@ -1463,35 +1479,41 @@ static int32_t initRowIdSort(SSortHandle* pHandle) {
SColumnInfoData pkCol = {0};
SSDataBlock* pSortInput = createDataBlock();
pHandle->pDataBlock = pSortInput;
SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
int32_t code = blockDataAppendColInfo(pSortInput, &tsCol);
if (code) {
blockDataDestroy(pSortInput);
return code;
}
SColumnInfoData regionIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
code = blockDataAppendColInfo(pSortInput, &regionIdCol);
if (code) {
blockDataDestroy(pSortInput);
return code;
}
SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
code = blockDataAppendColInfo(pSortInput, &offsetCol);
if (code) {
blockDataDestroy(pSortInput);
return code;
}
SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4);
code = blockDataAppendColInfo(pSortInput, &lengthCol);
if (code) {
blockDataDestroy(pSortInput);
return code;
}
if (pHandle->bSortPk) {
pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5);
code = blockDataAppendColInfo(pSortInput, &pkCol);
if (code) {
blockDataDestroy(pSortInput);
return code;
}
}
blockDataDestroy(pHandle->pDataBlock);
@ -1990,12 +2012,13 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
bool bExtractedBlock = false;
bool bSkipBlock = false;
if (pBlk != NULL && pHandle->mergeLimit > 0) {
pBlk = NULL;
code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &pBlk);
SSDataBlock* p = NULL;
code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &p);
if (bSkipBlock || code != 0) {
continue;
}
pBlk = p;
}
if (pBlk != NULL) {
@ -2416,15 +2439,21 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
while (1) {
// fetch data
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
if (NULL == pBlock) break;
if (NULL == pBlock) {
break;
}
if (pHandle->beforeFp != NULL) {
pHandle->beforeFp(pBlock, pHandle->param);
}
if (pHandle->pDataBlock == NULL) {
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
}
if (pHandle->pDataBlock == NULL) return TSDB_CODE_OUT_OF_MEMORY;
if (pHandle->pDataBlock == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
size_t colNum = blockDataGetNumOfCols(pBlock);
@ -2581,7 +2610,7 @@ void tsortGetValue(STupleHandle* pVHandle, int32_t colIndex, void** pVal) {
}
uint64_t tsortGetGroupId(STupleHandle* pVHandle) { return pVHandle->pBlock->info.id.groupId; }
void* tsortGetBlockInfo(STupleHandle* pVHandle) { return &pVHandle->pBlock->info; }
void tsortGetBlockInfo(STupleHandle* pVHandle, SDataBlockInfo* pBlockInfo) { *pBlockInfo = pVHandle->pBlock->info; }
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
SSortExecInfo info = {0};