From 1b5cf65ab9156dc7d79f3afddef52eb9cdd1f71a Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 5 Aug 2024 11:17:49 +0800 Subject: [PATCH] fix issue for function res --- source/libs/executor/src/aggregateoperator.c | 18 +++++++++-- source/libs/executor/src/cachescanoperator.c | 1 + .../libs/executor/src/countwindowoperator.c | 1 + source/libs/executor/src/executil.c | 5 ++++ source/libs/executor/src/executorInt.c | 10 +++++-- source/libs/executor/src/filloperator.c | 4 ++- source/libs/executor/src/groupoperator.c | 1 + source/libs/executor/src/scanoperator.c | 26 ++++++++++++++++ .../executor/src/streamcountwindowoperator.c | 1 + .../executor/src/streameventwindowoperator.c | 1 + source/libs/executor/src/streamfilloperator.c | 2 ++ .../executor/src/streamtimewindowoperator.c | 4 +++ source/libs/executor/src/sysscanoperator.c | 30 +++++++++++-------- source/libs/executor/src/timesliceoperator.c | 1 + source/libs/executor/src/timewindowoperator.c | 5 ++++ 15 files changed, 92 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 7e105d2260..4f6840b918 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -73,7 +73,8 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -84,6 +85,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA pOperator->exprSupp.hasWindowOrGroup = false; SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -91,6 +93,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -140,6 +143,9 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA return code; _error: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } if (pInfo != NULL) { destroyAggOperatorInfo(pInfo); } @@ -480,6 +486,10 @@ int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, ui if (taosArrayGetSize(list) == 0) { pData = getNewBufPage(pResultBuf, &pageId); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return terrno; + } pData->num = sizeof(SFilePage); } else { SPageInfo* pi = getLastPageInfo(list); @@ -496,9 +506,11 @@ int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, ui releaseBufPageInfo(pResultBuf, pi); pData = getNewBufPage(pResultBuf, &pageId); - if (pData != NULL) { - pData->num = sizeof(SFilePage); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return terrno; } + pData->num = sizeof(SFilePage); } } diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 9d49c8e9ca..cb6223f389 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -107,6 +107,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc; pInfo->pRes = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); code = extractColMatchInfo(pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index b7aa57e4b1..95b001d089 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -271,6 +271,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pCountWindowNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 5957d08a18..8e79320907 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1835,6 +1835,9 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* } SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo)); + if (!pExprs) { + return NULL; + } for (int32_t i = 0; i < (*numOfExprs); ++i) { STargetNode* pTargetNode = NULL; @@ -1937,6 +1940,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, } else { char* udfName = pExpr->pExpr->_function.pFunctNode->functionName; pCtx->udfName = taosStrdup(udfName); + QUERY_CHECK_NULL(pCtx->udfName, code, lino, _end, terrno); + code = fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet); QUERY_CHECK_CODE(code, lino, _end); } diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index bceefd2c0d..e0c180b149 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -89,6 +89,10 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i int32_t pageId = -1; if (*currentPageId == -1) { pData = getNewBufPage(pResultBuf, &pageId); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return NULL; + } pData->num = sizeof(SFilePage); } else { pData = getBufPage(pResultBuf, *currentPageId); @@ -104,9 +108,11 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i releaseBufPage(pResultBuf, pData); pData = getNewBufPage(pResultBuf, &pageId); - if (pData != NULL) { - pData->num = sizeof(SFilePage); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return NULL; } + pData->num = sizeof(SFilePage); } } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index c4ef74608a..d1f92c1245 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -454,7 +454,8 @@ static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiN int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -464,6 +465,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi } pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); pOperator->exprSupp.pExprInfo = pExprInfo; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index d88aef8fb7..ee94f435e3 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1168,6 +1168,7 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo uint32_t defaultBufsz = 0; pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz); if (code != TSDB_CODE_SUCCESS) { terrno = code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index acc3de3447..4b154af0d6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -476,17 +476,27 @@ static void freeTableCachedVal(void* param) { } static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal)); + QUERY_CHECK_NULL(pVal, code, lino, _end, terrno); pVal->pName = taosStrdup(pMetaReader->me.name); + QUERY_CHECK_NULL(pVal->pName, code, lino, _end, terrno); pVal->pTags = NULL; // only child table has tag value if (pMetaReader->me.type == TSDB_CHILD_TABLE) { STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags; pVal->pTags = taosMemoryMalloc(pTag->len); + QUERY_CHECK_NULL(pVal->pTags, code, lino, _end, terrno); memcpy(pVal->pTags, pTag, pTag->len); } +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + return NULL; + } return pVal; } @@ -581,6 +591,9 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int pHandle->api.metaReaderFn.readerReleaseLock(&mr); STableCachedVal* pVal = createTableCacheVal(&mr); + if(!pVal) { + return terrno; + } val = *pVal; freeReader = true; @@ -1356,6 +1369,8 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader; initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno); + code = prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo); QUERY_CHECK_CODE(code, lino, _error); @@ -2485,6 +2500,7 @@ static int32_t doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* of void* data = colDataGetData(pColPk, i); if (IS_VAR_DATA_TYPE(pColPk->info.type)) { void* tmq = taosMemoryMalloc(offset->primaryKey.nData + VARSTR_HEADER_SIZE); + QUERY_CHECK_NULL(tmq, code, lino, _end, terrno); memcpy(varDataVal(tmq), offset->primaryKey.pData, offset->primaryKey.nData); varDataLen(tmq) = offset->primaryKey.nData; p[i] = (*ts > offset->ts) || (func(data, tmq) > 0); @@ -2709,6 +2725,7 @@ _end: static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal* offset) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SValue val = {0}; if (hasPrimaryKey) { code = doBlockDataPrimaryKeyFilter(pBlock, offset); @@ -2725,6 +2742,7 @@ static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOff val.type = pColPk->info.type; if (IS_VAR_DATA_TYPE(pColPk->info.type)) { val.pData = taosMemoryMalloc(varDataLen(tmp)); + QUERY_CHECK_NULL(val.pData, code, lino, _end, terrno); val.nData = varDataLen(tmp); memcpy(val.pData, varDataVal(tmp), varDataLen(tmp)); } else { @@ -2732,6 +2750,11 @@ static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOff } } tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } return code; } @@ -3990,6 +4013,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* } pInfo->pRes = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes); QUERY_CHECK_CODE(code, lino, _error); @@ -4384,6 +4408,8 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p } STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal}; info.pTagVal = taosMemoryMalloc(pCur->vLen); + QUERY_CHECK_NULL(info.pTagVal, code, lino, _end, terrno); + memcpy(info.pTagVal, pCur->pVal, pCur->vLen); void* tmp = taosArrayPush(aUidTags, &info); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 6adc60b79e..c4ca4733b2 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -831,6 +831,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SExprInfo* pExprInfo = createExprInfo(pCountNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 17ef2fe41f..d1610c47e9 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -886,6 +886,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pEventNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 480814f6a0..7138a8bf21 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1352,7 +1352,9 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno); code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 7462d71a8a..49930ad19b 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1888,6 +1888,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); @@ -3700,6 +3701,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4851,6 +4853,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -5129,6 +5132,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); pInfo->interval = (SInterval){ .interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 5f4bbd66ce..e5fe457fe4 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2083,6 +2083,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + lino = __LINE__; goto _error; } @@ -2091,9 +2092,7 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo int32_t num = 0; code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); extractTbnameSlotId(pInfo, pScanNode); @@ -2101,9 +2100,11 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo pInfo->accountId = pScanPhyNode->accountId; pInfo->pUser = taosStrdup((void*)pUser); + QUERY_CHECK_NULL(pInfo->pUser, code, lino, _error, terrno); pInfo->sysInfo = pScanPhyNode->sysInfo; pInfo->showRewrite = pScanPhyNode->showRewrite; pInfo->pRes = createDataBlockFromDescNode(pDescNode); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); pInfo->pCondition = pScanNode->node.pConditions; code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); @@ -2143,7 +2144,7 @@ _error: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } - taosMemoryFreeClear(pOperator); + destroyOperator(pOperator); pTaskInfo->code = code; return code; } @@ -2173,15 +2174,19 @@ void destroySysScanOperator(void* param) { (void)tsem_destroy(&pInfo->ready); blockDataDestroy(pInfo->pRes); - const char* name = tNameGetTableName(&pInfo->name); - if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || - strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || - strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) { - if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) { - pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); - } + if (pInfo->name.type == TSDB_TABLE_NAME_T) { + const char* name = tNameGetTableName(&pInfo->name); + if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || + strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || + strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) { + if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) { + pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); + } - pInfo->pCur = NULL; + pInfo->pCur = NULL; + } + } else { + qError("pInfo->name is not initialized"); } if (pInfo->pIdx) { @@ -2694,6 +2699,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP } pInfo->pResBlock = createDataBlockFromDescNode(pBlockScanNode->node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno); code = blockDataEnsureCapacity(pInfo->pResBlock, 1); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 6eaef50491..968432b385 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1150,6 +1150,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues); pInfo->pLinearInfo = NULL; pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno); pInfo->win = pInterpPhyNode->timeRange; pInfo->interval.interval = pInterpPhyNode->interval; pInfo->current = pInfo->win.skey; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a1ec923352..a173611595 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1285,6 +1285,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode } SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); SExprSupp* pSup = &pOperator->exprSupp; @@ -1613,6 +1614,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy } SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -1684,6 +1686,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, @@ -2019,6 +2022,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&iaInfo->binfo, pResBlock); code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); QUERY_CHECK_CODE(code, lino, _error); @@ -2344,6 +2348,7 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva } SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pIntervalInfo->binfo, pResBlock); code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win); QUERY_CHECK_CODE(code, lino, _error);