From a9f1cff14f0273f2358ca5da981500b8a63389bc Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 5 Aug 2024 17:08:21 +0800 Subject: [PATCH] fix issue --- source/libs/executor/src/exchangeoperator.c | 1 + source/libs/executor/src/executil.c | 11 ++++++- source/libs/executor/src/mergeoperator.c | 4 +++ source/libs/executor/src/operator.c | 30 +++++++++++++++++++ source/libs/executor/src/projectoperator.c | 1 + source/libs/executor/src/scanoperator.c | 2 ++ source/libs/executor/src/sortoperator.c | 2 ++ .../executor/src/streameventwindowoperator.c | 2 +- 8 files changed, 51 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 13096cc696..ed63df6f4e 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -408,6 +408,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo QUERY_CHECK_CODE(code, lino, _error); pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno); pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES); QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno); pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 045bda14b6..57f0fd4325 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1473,19 +1473,28 @@ _error: } int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SSubplan* pSubplan = (SSubplan*)node; SScanPhysiNode pNode = {0}; pNode.suid = suid; pNode.uid = suid; pNode.tableType = TSDB_SUPER_TABLE; STableListInfo* pTableListInfo = tableListCreate(); + QUERY_CHECK_NULL(pTableListInfo, code, lino, _end, terrno); uint8_t digest[17] = {0}; - int code = + code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL, pTableListInfo, digest, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI); + QUERY_CHECK_CODE(code, lino, _end); *tableList = pTableListInfo->pTableList; pTableListInfo->pTableList = NULL; tableListDestroy(pTableListInfo); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } return code; } diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 3b390c8719..6e242339ad 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -594,9 +594,11 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo; initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc); + TSDB_CHECK_NULL(pInputBlock, code, lino, _error, terrno); initResultSizeInfo(&pOperator->resultInfo, 1024); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -620,6 +622,7 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS case MERGE_TYPE_NON_SORT: { SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo; pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); initResultSizeInfo(&pOperator->resultInfo, 1024); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); TSDB_CHECK_CODE(code, lino, _error); @@ -629,6 +632,7 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS case MERGE_TYPE_COLUMNS: { SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo; pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); initResultSizeInfo(&pOperator->resultInfo, 1); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); TSDB_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 701ed0ddbc..4f1b2b1fdd 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -295,6 +295,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } STableListInfo* pTableListInfo = tableListCreate(); + if (!pTableListInfo) { + pTaskInfo->code = terrno; + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return terrno; + } code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo); if (code) { @@ -330,6 +335,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (!pTableListInfo) { + pTaskInfo->code = terrno; + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return terrno; + } code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); @@ -362,6 +372,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (!pTableListInfo) { + pTaskInfo->code = terrno; + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return terrno; + } if (pHandle->vnode) { code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, @@ -385,6 +400,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (!pTableListInfo) { + pTaskInfo->code = terrno; + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return terrno; + } if (!pTagScanPhyNode->onlyMetaCtbIdx) { code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); @@ -398,6 +418,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (!pTableListInfo) { + pTaskInfo->code = terrno; + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return terrno; + } if (pBlockNode->tableType == TSDB_SUPER_TABLE) { SArray* pList = taosArrayInit(4, sizeof(uint64_t)); @@ -436,6 +461,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (!pTableListInfo) { + pTaskInfo->code = terrno; + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return terrno; + } code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 7185f74254..3f86291d6a 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -485,6 +485,7 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc); + TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno); // Make sure the size of SSDataBlock will never exceed the size of 2MB. int32_t TWOMB = 2 * 1024 * 1024; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fd99735d0e..4fa0bb9e24 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3694,6 +3694,7 @@ int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo QRY_OPTR_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -3703,6 +3704,7 @@ int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo } pInfo->pTableListInfo = tableListCreate(); + QUERY_CHECK_NULL(pInfo->pTableListInfo, code, lino, _end, terrno); pInfo->vnode = pHandle->vnode; pInfo->pAPI = &pTaskInfo->storageAPI; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 2f62d406d7..010f53cd4c 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -101,6 +101,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN } pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->binfo.pRes , code, lino, _error, terrno); pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); if (pSortNode->calcGroupId) { @@ -789,6 +790,7 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->binfo.pRes , code, lino, _error, terrno); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); TSDB_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 54fddf843b..201b19d2cb 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -899,7 +899,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error);