fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-08-05 18:02:32 +08:00
parent 6e1d061ce4
commit 3772f25840
9 changed files with 52 additions and 16 deletions

View File

@ -1620,6 +1620,7 @@ void blockDataFreeRes(SSDataBlock* pBlock) {
if (pBlock == NULL){
return;
}
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);

View File

@ -26,7 +26,7 @@
#define T_LONG_JMP(_obj, _c) \
do { \
ASSERT((_c) != 1); \
ASSERT((_c) != 1); \
longjmp((_obj), (_c)); \
} while (0)

View File

@ -1692,8 +1692,11 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
SDataType* pType = &pColNode->node.resType;
pExp->base.resSchema =
createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName);
pExp->base.pParam[0].pCol =
createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno);
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
} else if (type == QUERY_NODE_VALUE) {
pExp->pExpr->nodeType = QUERY_NODE_VALUE;
@ -1761,6 +1764,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
pExp->base.pParam[j].pCol =
createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno);
} else if (p1->type == QUERY_NODE_VALUE) {
SValueNode* pvn = (SValueNode*)p1;
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
@ -2437,7 +2441,6 @@ void tableListDestroy(STableListInfo* pTableListInfo) {
}
taosArrayDestroy(pTableListInfo->pTableList);
pTableListInfo->pTableList = NULL;
taosMemoryFreeClear(pTableListInfo->groupOffset);
taosHashCleanup(pTableListInfo->map);

View File

@ -1139,8 +1139,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
int32_t numOfCols = 0;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys);

View File

@ -594,9 +594,11 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS
SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc);
TSDB_CHECK_NULL(pInputBlock, code, lino, _error, terrno);
initResultSizeInfo(&pOperator->resultInfo, 1024);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
@ -620,6 +622,8 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS
case MERGE_TYPE_NON_SORT: {
SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
initResultSizeInfo(&pOperator->resultInfo, 1024);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _error);
@ -629,6 +633,8 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS
case MERGE_TYPE_COLUMNS: {
SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo;
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
initResultSizeInfo(&pOperator->resultInfo, 1);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _error);

View File

@ -64,6 +64,10 @@ int32_t doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC
p->id.queryId = queryId;
p->id.taskId = taskId;
p->id.str = taosMemoryMalloc(64);
if (p->id.str == NULL) {
return terrno;
}
buildTaskId(taskId, queryId, p->id.str);
p->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo));
if (p->id.str == NULL || p->schemaInfos == NULL) {
@ -174,9 +178,16 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
}
if (schemaInfo.sw == NULL) {
return terrno;
}
pAPI->metaReaderFn.clearReader(&mr);
schemaInfo.qsw = extractQueriedColumnSchema(pScanNode);
if (schemaInfo.qsw == NULL) {
return terrno;
}
void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo);
return (p != NULL)? TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY;
}
@ -186,7 +197,14 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);
SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pqSw == NULL) {
return NULL;
}
pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema));
if (pqSw->pSchema == NULL) {
return NULL;
}
for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);

View File

@ -182,8 +182,8 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
}
int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) {
*pBlock = NULL;
if (pSortHandle->pDataBlock == NULL) {
*pBlock = NULL;
return TSDB_CODE_SUCCESS;
}
return createOneDataBlock(pSortHandle->pDataBlock, false, pBlock);
@ -2478,7 +2478,9 @@ static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) {
return code;
}
return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
ASSERT(code != 1);
return code;
}
void tsortClose(SSortHandle* pHandle) {
@ -2808,19 +2810,24 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle
}
int32_t tsortOpen(SSortHandle* pHandle) {
int32_t code = 0;
if (pHandle->opened) {
return 0;
return code;
}
if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
return TSDB_CODE_INVALID_PARA;
if (pHandle == NULL || pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
code = TSDB_CODE_INVALID_PARA;
return code;
}
pHandle->opened = true;
if (tsortIsPQSortApplicable(pHandle))
return tsortOpenForPQSort(pHandle);
else
return tsortOpenForBufMergeSort(pHandle);
if (tsortIsPQSortApplicable(pHandle)) {
code = tsortOpenForPQSort(pHandle);
} else {
code = tsortOpenForBufMergeSort(pHandle);
}
return code;
}
int32_t tsortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) {

View File

@ -95,11 +95,12 @@ int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) {
tsize = (newSize == tsize) ? (tsize + 2) : newSize;
}
pArray->pData = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize);
if (pArray->pData == NULL) {
char* p = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize);
if (p == NULL) {
return terrno;
}
pArray->pData = p;
pArray->capacity = tsize;
}
return 0;

View File

@ -39,7 +39,7 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources,
(SMultiwayMergeTreeInfo*)taosMemoryCalloc(1, sizeof(SMultiwayMergeTreeInfo) + sizeof(STreeNode) * totalEntries);
if (pTreeInfo == NULL) {
uError("allocate memory for loser-tree failed. reason:%s", strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
return terrno;
}
pTreeInfo->pNode = (STreeNode*)(((char*)pTreeInfo) + sizeof(SMultiwayMergeTreeInfo));