From f0e4d2f085e59663684d7abfc1f85619892cd109 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 5 Aug 2024 12:22:25 +0800 Subject: [PATCH] fix malloc issue --- source/libs/executor/src/cachescanoperator.c | 2 + .../libs/executor/src/eventwindowoperator.c | 3 ++ source/libs/executor/src/exchangeoperator.c | 5 +++ source/libs/executor/src/executil.c | 28 ++++++++++-- source/libs/executor/src/executor.c | 1 + source/libs/executor/src/executorInt.c | 3 ++ source/libs/executor/src/filloperator.c | 3 ++ source/libs/executor/src/groupoperator.c | 10 +++++ source/libs/executor/src/scanoperator.c | 17 ++++++++ source/libs/executor/src/sortoperator.c | 3 ++ .../executor/src/streamcountwindowoperator.c | 4 ++ .../executor/src/streameventwindowoperator.c | 8 ++++ source/libs/executor/src/streamfilloperator.c | 19 ++++++-- .../executor/src/streamtimewindowoperator.c | 43 +++++++++++++++++++ source/libs/executor/src/sysscanoperator.c | 11 +++++ source/libs/executor/src/timesliceoperator.c | 4 ++ source/libs/executor/test/executorTests.cpp | 2 + 17 files changed, 160 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 9d49c8e9ca..137f05c356 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -165,6 +165,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl int32_t capacity = 0; pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); + QUERY_CHECK_NULL(pInfo->pUidList, code, lino, _error, terrno); // partition by tbname if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) { @@ -203,6 +204,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs); p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(p->pCtx, code, lino, _error, terrno); } setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 629afbbb8e..90a0c7927e 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -238,6 +238,9 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi SExprSupp* pExprSup, SAggSupporter* pAggSup) { if (*pResult == NULL) { SResultRow* p = taosMemoryCalloc(1, pAggSup->resultRowSize); + if (!p) { + return terrno; + } pResultRowInfo->cur = (SResultRowPosition){.pageId = p->pageId, .offset = p->offset}; *pResult = p; } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 5af8df8f06..2f018d7a69 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -307,6 +307,9 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const int32_t len = strlen(id) + 1; pInfo->pTaskId = taosMemoryCalloc(1, len); + if (!pInfo->pTaskId) { + return terrno; + } strncpy(pInfo->pTaskId, id, len); for (int32_t i = 0; i < numOfSources; ++i) { SSourceDataInfo dataInfo = {0}; @@ -389,7 +392,9 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc); pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno); pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno); SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self}; code = qAppendTaskStopInfo(pTaskInfo, &stopInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 5957d08a18..d0f0fce8fc 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -144,6 +144,7 @@ int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, void* pData = NULL; pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES); + QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno); size_t keyLen = 0; int32_t iter = 0; @@ -353,9 +354,15 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { } else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) { int32_t len = ((const STag*)p)->len; res->datum.p = taosMemoryCalloc(len + 1, 1); + if (NULL == res->datum.p) { + return DEAL_RES_ERROR; + } memcpy(res->datum.p, p, len); } else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) { res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1); + if (NULL == res->datum.p) { + return DEAL_RES_ERROR; + } memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData); varDataSetLen(res->datum.p, tagVal.nData); } else { @@ -378,6 +385,9 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { int32_t len = strlen(mr->me.name); res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1); + if (NULL == res->datum.p) { + return DEAL_RES_ERROR; + } memcpy(varDataVal(res->datum.p), mr->me.name, len); varDataSetLen(res->datum.p, len); nodesDestroyNode(*pNode); @@ -856,6 +866,7 @@ static SArray* getTableNameList(const SNodeListNode* pList) { // remove the duplicates SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*)); + QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno); void* tmp = taosArrayPush(pNewList, taosArrayGet(pTbList, 0)); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); @@ -1739,6 +1750,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam)); + QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); pExp->base.numOfParams = numOfParam; for (int32_t j = 0; j < numOfParam; ++j) { @@ -1760,6 +1772,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { SOperatorNode* pOpNode = (SOperatorNode*)pNode; pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); + QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); pExp->base.numOfParams = 1; SDataType* pType = &pOpNode->node.resType; @@ -1771,6 +1784,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode; pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); + QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); pExp->base.numOfParams = 1; SDataType* pType = &pCaseNode->node.resType; @@ -1781,9 +1795,8 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { pExp->pExpr->nodeType = QUERY_NODE_OPERATOR; SLogicConditionNode* pCond = (SLogicConditionNode*)pNode; pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); - if (!pExp->base.pParam) { - code = terrno; - } + QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno); + if (TSDB_CODE_SUCCESS == code) { pExp->base.numOfParams = 1; SDataType* pType = &pCond->node.resType; @@ -1808,6 +1821,9 @@ int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) { SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) { *numOfExprs = LIST_LENGTH(pNodeList); SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo)); + if (!pExprs) { + return NULL; + } for (int32_t i = 0; i < (*numOfExprs); ++i) { SExprInfo* pExp = &pExprs[i]; @@ -2068,6 +2084,9 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo)); + if (!pCond->colList) { + return terrno; + } pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols); if (pCond->colList == NULL || pCond->pSlotList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -2448,6 +2467,9 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) { int32_t size = taosArrayGetSize(pTableListInfo->pTableList); SArray* pList = taosArrayInit(4, sizeof(int32_t)); + if (!pList) { + return terrno; + } STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0); uint64_t gid = pInfo->groupId; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 3ee08d8fb0..ef76f14aa9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -373,6 +373,7 @@ static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); + QUERY_CHECK_NULL(qa, code, lino, _end, terrno); int32_t numOfUids = taosArrayGetSize(tableIdList); if (numOfUids == 0) { (*ppArrayRes) = qa; diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index bceefd2c0d..acd500810c 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -409,6 +409,9 @@ static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SF SColumnDataAgg* da = NULL; if (pInput->pColumnDataAgg[paramIndex] == NULL) { da = taosMemoryCalloc(1, sizeof(SColumnDataAgg)); + if (!da) { + return terrno; + } pInput->pColumnDataAgg[paramIndex] = da; if (da == NULL) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index c4ef74608a..964b331598 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -397,6 +397,9 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t pInfo->win.ekey = win.skey; } pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); + if (pInfo->p) { + return terrno; + } if (pInfo->pFillInfo == NULL || pInfo->p == NULL) { taosMemoryFree(pInfo->pFillInfo); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index d88aef8fb7..4371879cdc 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -854,6 +854,9 @@ _end: int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t)); + if (!offset) { + return NULL; + } offset[0] = sizeof(int32_t) + sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format @@ -1193,6 +1196,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf), blockDataGetSerialMetaSize(taosArrayGetSize(pInfo->binfo.pRes->pDataBlock))); pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity); + QUERY_CHECK_NULL(pInfo->columnOffset, code, lino, _error, terrno); + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { terrno = code; @@ -1431,6 +1436,7 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat SPartitionDataInfo newParData = {0}; newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen); newParData.rowIds = taosArrayInit(64, sizeof(int32_t)); + QUERY_CHECK_NULL(newParData.rowIds, code, lino, _end, terrno); void* tmp = taosArrayPush(newParData.rowIds, &i); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); @@ -1594,6 +1600,9 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (!pBlock) { + return NULL; + } pBlock->info.hasVarCol = false; pBlock->info.id.groupId = 0; pBlock->info.rows = 0; @@ -1601,6 +1610,7 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { pBlock->info.watermark = INT64_MIN; pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); + QUERY_CHECK_NULL(pBlock->pDataBlock, code, lino, _end, terrno); SColumnInfoData infoData = {0}; infoData.info.type = TSDB_DATA_TYPE_VARCHAR; if (tbName->numOfExprs > 0) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index acc3de3447..2daf9e1fc7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1280,6 +1280,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder)); + if (!pRecorder) { + return terrno; + } STableScanInfo* pTableScanInfo = pOptr->info; *pRecorder = pTableScanInfo->base.readRecorder; *pOptrExplain = pRecorder; @@ -1341,6 +1344,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs); pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(pSup->pCtx, code, lino, _error, terrno); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; @@ -3007,6 +3011,9 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) } void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); + if (!pUpInfo) { + return; + } int32_t code = pInfo->stateStore.updateInfoDeserialize(buf, tlen, pUpInfo); if (code == TSDB_CODE_SUCCESS) { pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); @@ -3466,6 +3473,7 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray** int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); + QUERY_CHECK_NULL(tableIdList, code, lino, _end, terrno); // Transfer the Array of STableKeyInfo into uid list. size_t size = tableListGetSize(pTableListInfo); @@ -3861,6 +3869,8 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pInfo->primaryKeyIndex = -1; int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList); pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); + QUERY_CHECK_NULL(pColIds, code, lino, _error, terrno); + for (int32_t i = 0; i < numOfOutput; ++i) { SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i); @@ -5286,6 +5296,7 @@ int32_t generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order, SArray** ppS int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); + QUERY_CHECK_NULL(pSortInfo, code, lino, _end, terrno); SBlockOrderInfo biTs = {0}; SBlockOrderInfo biPk = {0}; @@ -5374,9 +5385,11 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { QUERY_CHECK_CODE(code, lino, _end); STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); + QUERY_CHECK_NULL(param, code, lino, _end, terrno); param->pOperator = pOperator; SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + QUERY_CHECK_NULL(ps, code, lino, _end, terrno); ps->param = param; ps->onlyRef = false; code = tsortAddSource(pInfo->pSortHandle, ps); @@ -5653,6 +5666,9 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla ASSERT(pOptr != NULL); // TODO: merge these two info into one struct STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo)); + if (!execInfo) { + return terrno; + } STableMergeScanInfo* pInfo = pOptr->info; execInfo->blockRecorder = pInfo->base.readRecorder; execInfo->sortExecInfo = pInfo->sortExecInfo; @@ -5697,6 +5713,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(pSup->pCtx, code, lino, _error, terrno); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index a0c56df49c..a4e1e0b648 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -60,6 +60,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN QRY_OPTR_CHECK(pOptrInfo); int32_t code = 0; + int32_t lino = 0; SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -89,6 +90,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN pOperator->exprSupp.pCtx = createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno); initResultSizeInfo(&pOperator->resultInfo, 1024); code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { @@ -778,6 +780,7 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo initResultSizeInfo(&pOperator->resultInfo, 1024); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 6adc60b79e..27809f8a69 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -651,10 +651,12 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pStUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -857,6 +859,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->binfo.pRes = pResBlock; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); @@ -878,6 +881,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->recvGetAll = false; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimayKey; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 17ef2fe41f..30032b37ff 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -619,10 +619,12 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SEventWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pSeUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -738,6 +740,9 @@ void streamEventReleaseState(SOperatorInfo* pOperator) { int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); int32_t resSize = winSize + sizeof(TSKEY); char* pBuff = taosMemoryCalloc(1, resSize); + if (!pBuff) { + return ; + } memcpy(pBuff, pInfo->historyWins->pData, winSize); memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); qDebug("===stream=== event window operator relase state. save result count:%d", @@ -780,10 +785,12 @@ void streamEventReloadState(SOperatorInfo* pOperator) { if (!pInfo->pSeUpdated && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } if (!pInfo->pSeDeleted && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } for (int32_t i = 0; i < num; i++) { SEventWindowInfo curInfo = {0}; @@ -897,6 +904,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 480814f6a0..c0949ec012 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -604,6 +604,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* SPoint cur = {0}; cur.key = pFillInfo->current; cur.val = taosMemoryCalloc(1, pCell->bytes); + QUERY_CHECK_NULL(cur.val, code, lino, _end, terrno); taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type); code = colDataSetVal(pColData, index, (const char*)cur.val, false); QUERY_CHECK_CODE(code, lino, _end); @@ -683,15 +684,24 @@ _end: } } -void keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol, +int32_t keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol, int32_t rowId, uint64_t groupId, int32_t rowSize) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; TSKEY ts = tsCol[rowId]; pFillInfo->nextRowKey = ts; SResultRowData tmpNextRow = {.key = ts}; tmpNextRow.pRowVal = taosMemoryCalloc(1, rowSize); + QUERY_CHECK_NULL(tmpNextRow.pRowVal, code, lino, _end, terrno); transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow); keepResultInDiscBuf(pOperator, groupId, &tmpNextRow, rowSize); taosMemoryFreeClear(tmpNextRow.pRowVal); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static void doFillResults(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, @@ -721,13 +731,15 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { pInfo->srcRowIndex++; if (pInfo->srcRowIndex == 0) { - keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); + code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); + QUERY_CHECK_CODE(code, lino, _end); pInfo->srcRowIndex++; } while (pInfo->srcRowIndex < pBlock->info.rows) { TSKEY ts = tsCol[pInfo->srcRowIndex]; - keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); + code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); + QUERY_CHECK_CODE(code, lino, _end); doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes); if (pInfo->pRes->info.rows == pInfo->pRes->info.capacity) { code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol); @@ -1207,6 +1219,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pFillSup->pResMap = tSimpleHashInit(16, hashFn); + QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno); pFillSup->hasDelete = false; _end: diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 7462d71a8a..24490f3e1a 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -647,6 +647,7 @@ int32_t addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SArray* childIds = taosArrayInit(8, sizeof(int32_t)); + QUERY_CHECK_NULL(childIds, code, lino, _end, terrno); for (int32_t i = 0; i < size; i++) { void* tmp = taosArrayPush(childIds, &i); if (!tmp) { @@ -1579,10 +1580,12 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno); } while (1) { @@ -1609,6 +1612,7 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); + QUERY_CHECK_NULL(delWins, code, lino, _end, terrno); SHashObj* finalMap = IS_FINAL_INTERVAL_OP(pOperator) ? pInfo->pFinalPullDataMap : NULL; code = doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, finalMap); QUERY_CHECK_CODE(code, lino, _end); @@ -1891,6 +1895,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN initBasicInfo(&pInfo->binfo, pResBlock); pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + QUERY_CHECK_NULL(pInfo->pState, code, lino, _error, terrno); qInfo("open state %p", pInfo->pState); pAPI->stateStore.streamStateCopyBackend(pTaskInfo->streamInfo.pState, pInfo->pState); //*(pInfo->pState) = *(pTaskInfo->streamInfo.pState); @@ -1914,6 +1919,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN } pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo)); + QUERY_CHECK_NULL(pInfo->pPullWins, code, lino, _error, terrno); pInfo->pullIndex = 0; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK); @@ -1929,6 +1935,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->delIndex = 0; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno); pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; pInfo->numOfDatapack = 0; @@ -1953,7 +1960,9 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->clearState = false; pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pMidPullDatas, code, lino, _error, terrno); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey; pOperator->operatorType = pPhyNode->type; @@ -2129,6 +2138,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput); pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + if (!pSup->pState) { + return terrno; + } *(pSup->pState) = *pState; pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); @@ -2138,6 +2150,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pSup->pResultRows = tSimpleHashInit(32, hashFn); + if (!pSup->pResultRows) { + return terrno; + } for (int32_t i = 0; i < numOfOutput; ++i) { pExpSup->pCtx[i].saveHandle.pState = pSup->pState; @@ -3340,10 +3355,12 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pStUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -3356,6 +3373,7 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); + QUERY_CHECK_NULL(pWins, code, lino, _end, terrno); // gap must be 0 code = doDeleteTimeWindows(pAggSup, pBlock, pWins); QUERY_CHECK_CODE(code, lino, _end); @@ -3496,6 +3514,9 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) { int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); int32_t resSize = winSize + sizeof(TSKEY); char* pBuff = taosMemoryCalloc(1, resSize); + if (!pBuff) { + return; + } memcpy(pBuff, pInfo->historyWins->pData, winSize); memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, @@ -3623,6 +3644,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { if (!pInfo->pStUpdated && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; @@ -3731,6 +3753,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->order = TSDB_ORDER_ASC; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); QUERY_CHECK_CODE(code, lino, _error); @@ -3757,6 +3780,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->recvGetAll = false; pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, @@ -3886,10 +3910,12 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock* SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pStUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -4008,6 +4034,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys if (numOfChild > 0) { pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); + QUERY_CHECK_NULL(pInfo->pChildren, code, lino, _error, terrno); for (int32_t i = 0; i < numOfChild; i++) { SOperatorInfo* pChildOp = NULL; code = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0, pHandle, &pChildOp); @@ -4598,10 +4625,12 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pSeUpdated) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -4702,6 +4731,9 @@ void streamStateReleaseState(SOperatorInfo* pOperator) { int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); int32_t resSize = winSize + sizeof(TSKEY); char* pBuff = taosMemoryCalloc(1, resSize); + if (!pBuff) { + return ; + } memcpy(pBuff, pInfo->historyWins->pData, winSize); memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); qDebug("===stream=== relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins)); @@ -4752,10 +4784,12 @@ void streamStateReloadState(SOperatorInfo* pOperator) { if (!pInfo->pSeUpdated && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno); } if (!pInfo->pSeDeleted && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _end, terrno); } for (int32_t i = 0; i < num; i++) { SStateWindowInfo curInfo = {0}; @@ -4865,6 +4899,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _error, terrno); pInfo->pDelIterator = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); @@ -4891,6 +4926,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->recvGetAll = false; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey; setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, @@ -4985,11 +5021,13 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno); } while (1) { @@ -5164,6 +5202,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + QUERY_CHECK_NULL(pInfo->pState, code, lino, _error, terrno); *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); @@ -5181,6 +5220,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->invertible = false; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno); pInfo->delIndex = 0; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); @@ -5221,6 +5261,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey; // for stream @@ -5453,10 +5494,12 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* if (!pInfo->pUpdated) { pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); } if (!pInfo->pUpdatedMap) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn); + QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno); } while (1) { diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 5f4bbd66ce..80b4b646bf 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1772,8 +1772,10 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { .pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode, .pAPI = &pTaskInfo->storageAPI}; SSysTableIndex* idx = taosMemoryMalloc(sizeof(SSysTableIndex)); + QUERY_CHECK_NULL(idx, code, lino, _end, terrno); idx->init = 0; idx->uids = taosArrayInit(128, sizeof(int64_t)); + QUERY_CHECK_NULL(idx->uids, code, lino, _end, terrno); idx->lastIdx = 0; pInfo->pIdx = idx; // set idx arg @@ -1992,6 +1994,9 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); char* buf1 = taosMemoryCalloc(1, contLen); + if (!buf1) { + return NULL; + } int32_t tempRes = tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); if (tempRes < 0) { code = terrno; @@ -2470,12 +2475,18 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) { bool hasIdx = false; bool hasRslt = true; SArray* mRslt = taosArrayInit(len, POINTER_BYTES); + if (!mRslt) { + return terrno; + } SListCell* cell = pList->pHead; for (int i = 0; i < len; i++) { if (cell == NULL) break; SArray* aRslt = taosArrayInit(16, sizeof(int64_t)); + if (!aRslt) { + return terrno; + } ret = optSysTabFilteImpl(arg, cell->pNode, aRslt); if (ret == 0) { diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 6eaef50491..00f88934d9 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -666,6 +666,9 @@ static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExp pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type; pInfo->pPrevGroupKey->isNull = false; pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes); + if (!pInfo->pPrevGroupKey->pData) { + return terrno; + } } } @@ -1168,6 +1171,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN if (IS_VAR_DATA_TYPE(pInfo->pkCol.type)) { pInfo->prevKey.pks[0].pData = taosMemoryCalloc(1, pInfo->pkCol.bytes); + QUERY_CHECK_NULL(pInfo->prevKey.pks[0].pData, code, lino, _error, terrno); } } diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 5b34075ecf..ff33732b23 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -198,6 +198,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type, int32_t numOfCols) { SOperatorInfo* pOperator = static_cast(taosMemoryCalloc(1, sizeof(SOperatorInfo))); + ASSERT(!pOperator); pOperator->name = "dummyInputOpertor4Test"; if (numOfCols == 1) { @@ -207,6 +208,7 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_ } SDummyInputInfo* pInfo = (SDummyInputInfo*)taosMemoryCalloc(1, sizeof(SDummyInputInfo)); + ASSERT(!pInfo); pInfo->totalPages = numOfBlocks; pInfo->startVal = startVal; pInfo->numOfRowsPerPage = rowsPerPage;