From 40537001a2bab42ca1bf529ce1e3b10e647a0df3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 11:57:18 +0800 Subject: [PATCH] fix(stream): check return value. --- source/libs/executor/inc/executil.h | 9 ++-- source/libs/executor/src/aggregateoperator.c | 30 +++++------ source/libs/executor/src/cachescanoperator.c | 4 +- .../libs/executor/src/countwindowoperator.c | 10 +++- .../libs/executor/src/eventwindowoperator.c | 10 +++- source/libs/executor/src/executil.c | 17 +++++-- source/libs/executor/src/filloperator.c | 19 ++++--- source/libs/executor/src/groupoperator.c | 51 ++++++++++--------- source/libs/executor/src/projectoperator.c | 30 +++++++---- source/libs/executor/src/scanoperator.c | 22 +++++--- source/libs/executor/src/sortoperator.c | 10 +++- .../executor/src/streamcountwindowoperator.c | 10 +++- .../executor/src/streameventwindowoperator.c | 10 +++- source/libs/executor/src/streamfilloperator.c | 17 +++++-- .../executor/src/streamtimewindowoperator.c | 42 ++++++++++++--- source/libs/executor/src/sysscanoperator.c | 5 +- source/libs/executor/src/timesliceoperator.c | 10 +++- source/libs/executor/src/timewindowoperator.c | 38 +++++++++++--- 18 files changed, 234 insertions(+), 110 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index f3ceb33f64..2adc863baf 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -174,9 +174,9 @@ SArray* makeColumnArrayFromList(SNodeList* pNodeList); int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type, SColMatchInfo* pMatchInfo); -int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId); -int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); -SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); +int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId); +int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); +int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset, SFunctionStateStore* pStore); @@ -197,9 +197,6 @@ char* getStreamOpName(uint16_t opType); void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr); void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr); -void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order); -void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery); - TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols); void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 7e105d2260..7c63120fcf 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -73,7 +73,13 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); - int32_t code = 0; + int32_t lino = 0; + int32_t code = 0; + int32_t num = 0; + SExprInfo* pExprInfo = NULL; + int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = NULL; + SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -89,29 +95,23 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num); + TSDB_CHECK_CODE(code, lino, _error); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); - int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = NULL; if (pAggNode->pExprs != NULL) { - pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); + code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + TSDB_CHECK_CODE(code, lino, _error); } code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 9d49c8e9ca..81d55ec092 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -200,7 +200,9 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl if (pScanNode->scan.pScanPseudoCols != NULL) { SExprSupp* p = &pInfo->pseudoExprSup; - p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs); + code = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->pExprInfo, &p->numOfExprs); + TSDB_CHECK_CODE(code, lino, _error); + p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); } diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index b7aa57e4b1..8d2ad4cbad 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -256,14 +256,19 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy if (pCountWindowNode->window.pExprs != NULL) { int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pCountWindowNode->window.pExprs, NULL, &numOfScalarExpr); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pCountWindowNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + QUERY_CHECK_CODE(code, lino, _error); code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } size_t keyBufSize = 0; int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + initResultSizeInfo(&pOperator->resultInfo, 4096); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, @@ -286,6 +291,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy if (pInfo->windowCount != pInfo->windowSliding) { numOfItem = pInfo->windowCount / pInfo->windowSliding + 1; } + pInfo->countSup.pWinStates = taosArrayInit_s(itemSize, numOfItem); if (!pInfo->countSup.pWinStates) { goto _error; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 629afbbb8e..6a39cac525 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -84,7 +84,10 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy if (pEventWindowNode->window.pExprs != NULL) { int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pEventWindowNode->window.pExprs, NULL, &numOfScalarExpr); + SExprInfo* pScalarExprInfo = NULL; + + code = createExprInfo(pEventWindowNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + QUERY_CHECK_CODE(code, lino, _error); code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } @@ -95,7 +98,10 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + initResultSizeInfo(&pOperator->resultInfo, 4096); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 5957d08a18..cdc8cc6dd5 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1822,7 +1822,10 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) { return pExprs; } -SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) { +int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) { + QRY_OPTR_CHECK(pExprInfo); + + int32_t code = 0; int32_t numOfFuncs = LIST_LENGTH(pNodeList); int32_t numOfGroupKeys = 0; if (pGroupKeys != NULL) { @@ -1831,10 +1834,13 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* *numOfExprs = numOfFuncs + numOfGroupKeys; if (*numOfExprs == 0) { - return NULL; + return code; } SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo)); + if (pExprs == NULL) { + return terrno; + } for (int32_t i = 0; i < (*numOfExprs); ++i) { STargetNode* pTargetNode = NULL; @@ -1845,15 +1851,16 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* } SExprInfo* pExp = &pExprs[i]; - int32_t code = createExprFromTargetNode(pExp, pTargetNode); + code = createExprFromTargetNode(pExp, pTargetNode); if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pExprs); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - return NULL; + return code; } } - return pExprs; + *pExprInfo = pExprs; + return code; } // set the output buffer for the selectivity + tag query diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index c4ef74608a..d88e09273f 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -455,6 +455,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); int32_t code = 0; + int32_t lino = 0; SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -464,21 +465,23 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi } pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); - SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); + SExprInfo* pExprInfo = NULL; + + code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pExprInfo, &pInfo->numOfExpr); + QUERY_CHECK_CODE(code, lino, _error); + pOperator->exprSupp.pExprInfo = pExprInfo; SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp; - pNoFillSupp->pExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->numOfExprs); + code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->pExprInfo, &pNoFillSupp->numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); code = initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index d88aef8fb7..43b2f5ab6d 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -560,7 +560,8 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo int32_t numOfScalarExpr = 0; SExprInfo* pScalarExprInfo = NULL; if (pAggNode->pExprs != NULL) { - pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); + code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + QUERY_CHECK_CODE(code, lino, _error); } pInfo->pGroupCols = NULL; @@ -578,7 +579,11 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo QUERY_CHECK_CODE(code, lino, _error); int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + SExprInfo* pExprInfo = NULL; + + code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -1125,42 +1130,42 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - pTaskInfo->code = code = TSDB_CODE_OUT_OF_MEMORY; + pTaskInfo->code = code = terrno; goto _error; } int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + + code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols); + pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys); if (pPartNode->needBlockOutputTsOrder) { SBlockOrderInfo order = {.order = ORDER_ASC, .pColData = NULL, .nullFirst = false, .slotId = pPartNode->tsSlotId}; pInfo->pOrderInfoArr = taosArrayInit(1, sizeof(SBlockOrderInfo)); if (!pInfo->pOrderInfoArr) { - terrno = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = terrno; goto _error; } + void* tmp = taosArrayPush(pInfo->pOrderInfoArr, &order); QUERY_CHECK_NULL(tmp, code, lino, _error, terrno); } if (pPartNode->pExprs != NULL) { int32_t num = 0; - SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num); + SExprInfo* pExprInfo1 = NULL; + code = createExprInfo(pPartNode->pExprs, NULL, &pExprInfo1, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = terrno; - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); } _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); if (pInfo->pGroupSet == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - pTaskInfo->code = terrno; goto _error; } @@ -1170,22 +1175,17 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc); code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = code; goto _error; } if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_DISKSPACE; - pTaskInfo->code = terrno; qError("Create partition operator info failed since %s, tempDir:%s", terrstr(), tsTempDir); goto _error; } code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = code; goto _error; } @@ -1195,8 +1195,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity); code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = code; goto _error; } @@ -1210,8 +1208,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - pTaskInfo->code = code; goto _error; } @@ -1224,7 +1220,7 @@ _error: } pTaskInfo->code = code; taosMemoryFreeClear(pOperator); - return code; + TAOS_RETURN(code); } int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, @@ -1663,7 +1659,10 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart if (pPartNode->part.pExprs != NULL) { int32_t num = 0; - SExprInfo* pCalExprInfo = createExprInfo(pPartNode->part.pExprs, NULL, &num); + SExprInfo* pCalExprInfo = NULL; + code = createExprInfo(pPartNode->part.pExprs, NULL, &pCalExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } @@ -1724,7 +1723,9 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart QUERY_CHECK_CODE(code, lino, _error); int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pPartNode->part.pTargets, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 295180652d..7185f74254 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -108,9 +108,13 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* int32_t lino = 0; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols); + TSDB_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc); + TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = pResBlock; @@ -258,14 +262,13 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { SProjectOperatorInfo* pProjectInfo = pOperator->info; SOptrBasicInfo* pInfo = &pProjectInfo->binfo; - - SExprSupp* pSup = &pOperator->exprSupp; - SSDataBlock* pRes = pInfo->pRes; - SSDataBlock* pFinalRes = pProjectInfo->pFinalRes; - int32_t code = 0; - int64_t st = 0; - int32_t order = pInfo->inputTsOrder; - int32_t scanFlag = 0; + SExprSupp* pSup = &pOperator->exprSupp; + SSDataBlock* pRes = pInfo->pRes; + SSDataBlock* pFinalRes = pProjectInfo->pFinalRes; + int32_t code = 0; + int64_t st = 0; + int32_t order = pInfo->inputTsOrder; + int32_t scanFlag = 0; blockDataCleanup(pFinalRes); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -465,11 +468,16 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode; int32_t numOfExpr = 0; - SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr); + TSDB_CHECK_CODE(code, lino, _error); if (pPhyNode->pExprs != NULL) { int32_t num = 0; - SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num); + SExprInfo* pSExpr = NULL; + code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index acc3de3447..d491ffb524 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1338,7 +1338,10 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa if (pScanNode->pScanPseudoCols != NULL) { SExprSupp* pSup = &pInfo->base.pseudoSup; - pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs); + pSup->pExprInfo = NULL; + code = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); } @@ -3981,13 +3984,12 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* // create the pseduo columns info if (pTableScanNode->scan.pScanPseudoCols != NULL) { - pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); + code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr); + QUERY_CHECK_CODE(code, lino, _error); } code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); pInfo->pRes = createDataBlockFromDescNode(pDescNode); code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes); @@ -4539,7 +4541,11 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc; int32_t numOfExprs = 0; - SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); + SExprInfo* pExprInfo = NULL; + + code = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &pExprInfo, &numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -5694,7 +5700,9 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR if (pTableScanNode->scan.pScanPseudoCols != NULL) { SExprSupp* pSup = &pInfo->base.pseudoSup; - pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); + code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index a0c56df49c..a08787d358 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -60,6 +60,8 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN QRY_OPTR_CHECK(pOptrInfo); int32_t code = 0; + int32_t lino = 0; + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -71,7 +73,9 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; - pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); + code = createExprInfo(pSortNode->pExprs, NULL, &pOperator->exprSupp.pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + pOperator->exprSupp.numOfExprs = numOfCols; int32_t numOfOutputCols = 0; code = @@ -770,7 +774,9 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pSortPhyNode->pExprs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); pSup->pExprInfo = pExprInfo; pSup->numOfExprs = numOfCols; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 6adc60b79e..8ac73b44f6 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -823,13 +823,19 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* initResultSizeInfo(&pOperator->resultInfo, 4096); if (pCountNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pCountNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pCountNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } SExprSupp* pExpSup = &pOperator->exprSupp; - SExprInfo* pExprInfo = createExprInfo(pCountNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 17ef2fe41f..1311216c06 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -864,7 +864,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* initResultSizeInfo(&pOperator->resultInfo, 4096); if (pEventNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pEventNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pEventNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -884,7 +887,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pEventNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pEventNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 480814f6a0..39aedd9d59 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1190,7 +1190,11 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod } pFillSup->numOfFillCols = numOfFillCols; int32_t numOfNotFillCols = 0; - SExprInfo* noFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols); + SExprInfo* noFillExprInfo = NULL; + + code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExprInfo, &numOfNotFillCols); + QUERY_CHECK_CODE(code, lino, _end); + pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols, (const SNodeListNode*)(pPhyFillNode->pValues)); pFillSup->type = convertFillType(pPhyFillNode->mode); @@ -1201,7 +1205,10 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod code = initResultBuf(pFillSup); QUERY_CHECK_CODE(code, lino, _end); - SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols); + SExprInfo* noFillExpr = NULL; + code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExpr, &numOfNotFillCols); + QUERY_CHECK_CODE(code, lino, _end); + code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols, &pAPI->functionStore); QUERY_CHECK_CODE(code, lino, _end); @@ -1343,7 +1350,11 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi SInterval* pInterval = &((SStreamIntervalOperatorInfo*)downstream->info)->interval; int32_t numOfFillCols = 0; - SExprInfo* pFillExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &numOfFillCols); + SExprInfo* pFillExprInfo = NULL; + + code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI); if (!pInfo->pFillSup) { code = TSDB_CODE_FAILED; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 7462d71a8a..3c696a1be8 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1880,13 +1880,20 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN initResultSizeInfo(&pOperator->resultInfo, 4096); if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + + code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); @@ -3690,7 +3697,10 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode initResultSizeInfo(&pOperator->resultInfo, 4096); if (pSessionNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -3698,7 +3708,10 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode } SExprSupp* pExpSup = &pOperator->exprSupp; - SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -4831,7 +4844,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* initResultSizeInfo(&pOperator->resultInfo, 4096); if (pStateNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } @@ -4849,7 +4865,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -5126,7 +5145,10 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); + + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); pInfo->interval = (SInterval){ @@ -5174,7 +5196,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + + code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 5f4bbd66ce..90c760136e 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2716,7 +2716,10 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP pInfo->uid = (pBlockScanNode->suid != 0) ? pBlockScanNode->suid : pBlockScanNode->uid; int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 6eaef50491..99a66efecb 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1126,13 +1126,19 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN SExprSupp* pSup = &pOperator->exprSupp; int32_t numOfExprs = 0; - SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pInterpPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(pSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); if (pInterpPhyNode->pExprs != NULL) { int32_t num = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pInterpPhyNode->pExprs, NULL, &pScalarExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a1ec923352..989eb97327 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1298,7 +1298,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode QUERY_CHECK_CODE(code, lino, _error); int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -1336,7 +1339,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode if (pPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -1578,7 +1584,10 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy if (pStateNode->window.pExprs != NULL) { int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -1603,7 +1612,10 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + initResultSizeInfo(&pOperator->resultInfo, 4096); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, @@ -1682,7 +1694,10 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); @@ -1709,7 +1724,10 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh if (pSessionNode->window.pExprs != NULL) { int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar); + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -2012,7 +2030,9 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge initResultSizeInfo(&pOperator->resultInfo, 512); int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); @@ -2312,7 +2332,9 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva } int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding,