fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-08-05 11:57:18 +08:00
parent 9a2ee54719
commit 40537001a2
18 changed files with 234 additions and 110 deletions

View File

@ -174,9 +174,9 @@ SArray* makeColumnArrayFromList(SNodeList* pNodeList);
int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
int32_t type, SColMatchInfo* pMatchInfo);
int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId);
int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode);
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId);
int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode);
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset,
SFunctionStateStore* pStore);
@ -197,9 +197,6 @@ char* getStreamOpName(uint16_t opType);
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr);
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr);
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols);
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta);

View File

@ -73,7 +73,13 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;
int32_t code = 0;
int32_t num = 0;
SExprInfo* pExprInfo = NULL;
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = NULL;
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@ -89,29 +95,23 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
TSDB_CHECK_CODE(code, lino, _error);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = NULL;
if (pAggNode->pExprs != NULL) {
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
TSDB_CHECK_CODE(code, lino, _error);
}
code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;

View File

@ -200,7 +200,9 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
if (pScanNode->scan.pScanPseudoCols != NULL) {
SExprSupp* p = &pInfo->pseudoExprSup;
p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs);
code = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->pExprInfo, &p->numOfExprs);
TSDB_CHECK_CODE(code, lino, _error);
p->pCtx =
createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
}

View File

@ -256,14 +256,19 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
if (pCountWindowNode->window.pExprs != NULL) {
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pCountWindowNode->window.pExprs, NULL, &numOfScalarExpr);
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pCountWindowNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
}
size_t keyBufSize = 0;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &num);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
@ -286,6 +291,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
if (pInfo->windowCount != pInfo->windowSliding) {
numOfItem = pInfo->windowCount / pInfo->windowSliding + 1;
}
pInfo->countSup.pWinStates = taosArrayInit_s(itemSize, numOfItem);
if (!pInfo->countSup.pWinStates) {
goto _error;

View File

@ -84,7 +84,10 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
if (pEventWindowNode->window.pExprs != NULL) {
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pEventWindowNode->window.pExprs, NULL, &numOfScalarExpr);
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pEventWindowNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
}
@ -95,7 +98,10 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &num);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,

View File

@ -1822,7 +1822,10 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
return pExprs;
}
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) {
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
QRY_OPTR_CHECK(pExprInfo);
int32_t code = 0;
int32_t numOfFuncs = LIST_LENGTH(pNodeList);
int32_t numOfGroupKeys = 0;
if (pGroupKeys != NULL) {
@ -1831,10 +1834,13 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
*numOfExprs = numOfFuncs + numOfGroupKeys;
if (*numOfExprs == 0) {
return NULL;
return code;
}
SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
if (pExprs == NULL) {
return terrno;
}
for (int32_t i = 0; i < (*numOfExprs); ++i) {
STargetNode* pTargetNode = NULL;
@ -1845,15 +1851,16 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
}
SExprInfo* pExp = &pExprs[i];
int32_t code = createExprFromTargetNode(pExp, pTargetNode);
code = createExprFromTargetNode(pExp, pTargetNode);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pExprs);
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return NULL;
return code;
}
}
return pExprs;
*pExprInfo = pExprs;
return code;
}
// set the output buffer for the selectivity + tag query

View File

@ -455,6 +455,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@ -464,21 +465,23 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
}
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pExprInfo, &pInfo->numOfExpr);
QUERY_CHECK_CODE(code, lino, _error);
pOperator->exprSupp.pExprInfo = pExprInfo;
SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
pNoFillSupp->pExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->numOfExprs);
code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->pExprInfo, &pNoFillSupp->numOfExprs);
QUERY_CHECK_CODE(code, lino, _error);
code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
QUERY_CHECK_CODE(code, lino, _error);
code =
initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
QUERY_CHECK_CODE(code, lino, _error);
SInterval* pInterval =
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType

View File

@ -560,7 +560,8 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = NULL;
if (pAggNode->pExprs != NULL) {
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
QUERY_CHECK_CODE(code, lino, _error);
}
pInfo->pGroupCols = NULL;
@ -578,7 +579,11 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
QUERY_CHECK_CODE(code, lino, _error);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
@ -1125,42 +1130,42 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
pTaskInfo->code = code = TSDB_CODE_OUT_OF_MEMORY;
pTaskInfo->code = code = terrno;
goto _error;
}
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols);
pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys);
if (pPartNode->needBlockOutputTsOrder) {
SBlockOrderInfo order = {.order = ORDER_ASC, .pColData = NULL, .nullFirst = false, .slotId = pPartNode->tsSlotId};
pInfo->pOrderInfoArr = taosArrayInit(1, sizeof(SBlockOrderInfo));
if (!pInfo->pOrderInfoArr) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
pTaskInfo->code = terrno;
goto _error;
}
void* tmp = taosArrayPush(pInfo->pOrderInfoArr, &order);
QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
}
if (pPartNode->pExprs != NULL) {
int32_t num = 0;
SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
SExprInfo* pExprInfo1 = NULL;
code = createExprInfo(pPartNode->pExprs, NULL, &pExprInfo1, &num);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
pTaskInfo->code = terrno;
goto _error;
}
QUERY_CHECK_CODE(code, lino, _error);
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
if (pInfo->pGroupSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
pTaskInfo->code = terrno;
goto _error;
}
@ -1170,22 +1175,17 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc);
code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
pTaskInfo->code = code;
goto _error;
}
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_DISKSPACE;
pTaskInfo->code = terrno;
qError("Create partition operator info failed since %s, tempDir:%s", terrstr(), tsTempDir);
goto _error;
}
code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
pTaskInfo->code = code;
goto _error;
}
@ -1195,8 +1195,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity);
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
pTaskInfo->code = code;
goto _error;
}
@ -1210,8 +1208,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
pTaskInfo->code = code;
goto _error;
}
@ -1224,7 +1220,7 @@ _error:
}
pTaskInfo->code = code;
taosMemoryFreeClear(pOperator);
return code;
TAOS_RETURN(code);
}
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
@ -1663,7 +1659,10 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
if (pPartNode->part.pExprs != NULL) {
int32_t num = 0;
SExprInfo* pCalExprInfo = createExprInfo(pPartNode->part.pExprs, NULL, &num);
SExprInfo* pCalExprInfo = NULL;
code = createExprInfo(pPartNode->part.pExprs, NULL, &pCalExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
}
@ -1724,7 +1723,9 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
QUERY_CHECK_CODE(code, lino, _error);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pPartNode->part.pTargets, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED,
pInfo, pTaskInfo);

View File

@ -108,9 +108,13 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
int32_t lino = 0;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols);
TSDB_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc);
TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = pResBlock;
@ -258,14 +262,13 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = pInfo->pRes;
SSDataBlock* pFinalRes = pProjectInfo->pFinalRes;
int32_t code = 0;
int64_t st = 0;
int32_t order = pInfo->inputTsOrder;
int32_t scanFlag = 0;
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = pInfo->pRes;
SSDataBlock* pFinalRes = pProjectInfo->pFinalRes;
int32_t code = 0;
int64_t st = 0;
int32_t order = pInfo->inputTsOrder;
int32_t scanFlag = 0;
blockDataCleanup(pFinalRes);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -465,11 +468,16 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
int32_t numOfExpr = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
TSDB_CHECK_CODE(code, lino, _error);
if (pPhyNode->pExprs != NULL) {
int32_t num = 0;
SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num);
SExprInfo* pSExpr = NULL;
code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;

View File

@ -1338,7 +1338,10 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
if (pScanNode->pScanPseudoCols != NULL) {
SExprSupp* pSup = &pInfo->base.pseudoSup;
pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
pSup->pExprInfo = NULL;
code = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs);
QUERY_CHECK_CODE(code, lino, _error);
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
&pTaskInfo->storageAPI.functionStore);
}
@ -3981,13 +3984,12 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
// create the pseduo columns info
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr);
QUERY_CHECK_CODE(code, lino, _error);
}
code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes);
@ -4539,7 +4541,11 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p
SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;
int32_t numOfExprs = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &pExprInfo, &numOfExprs);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
@ -5694,7 +5700,9 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
SExprSupp* pSup = &pInfo->base.pseudoSup;
pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs);
QUERY_CHECK_CODE(code, lino, _error);
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
&pTaskInfo->storageAPI.functionStore);
}

View File

@ -60,6 +60,8 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
QRY_OPTR_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@ -71,7 +73,9 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
int32_t numOfCols = 0;
pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
code = createExprInfo(pSortNode->pExprs, NULL, &pOperator->exprSupp.pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
pOperator->exprSupp.numOfExprs = numOfCols;
int32_t numOfOutputCols = 0;
code =
@ -770,7 +774,9 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo
SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pSortPhyNode->pExprs, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
pSup->pExprInfo = pExprInfo;
pSup->numOfExprs = numOfCols;

View File

@ -823,13 +823,19 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
initResultSizeInfo(&pOperator->resultInfo, 4096);
if (pCountNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pCountNode->window.pExprs, NULL, &numOfScalar);
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pCountNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
}
SExprSupp* pExpSup = &pOperator->exprSupp;
SExprInfo* pExprInfo = createExprInfo(pCountNode->window.pFuncs, NULL, &numOfCols);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -864,7 +864,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
initResultSizeInfo(&pOperator->resultInfo, 4096);
if (pEventNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pEventNode->window.pExprs, NULL, &numOfScalar);
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pEventNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -884,7 +887,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
SExprSupp* pExpSup = &pOperator->exprSupp;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pEventNode->window.pFuncs, NULL, &numOfCols);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pEventNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -1190,7 +1190,11 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
}
pFillSup->numOfFillCols = numOfFillCols;
int32_t numOfNotFillCols = 0;
SExprInfo* noFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols);
SExprInfo* noFillExprInfo = NULL;
code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExprInfo, &numOfNotFillCols);
QUERY_CHECK_CODE(code, lino, _end);
pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols,
(const SNodeListNode*)(pPhyFillNode->pValues));
pFillSup->type = convertFillType(pPhyFillNode->mode);
@ -1201,7 +1205,10 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
code = initResultBuf(pFillSup);
QUERY_CHECK_CODE(code, lino, _end);
SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols);
SExprInfo* noFillExpr = NULL;
code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExpr, &numOfNotFillCols);
QUERY_CHECK_CODE(code, lino, _end);
code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols, &pAPI->functionStore);
QUERY_CHECK_CODE(code, lino, _end);
@ -1343,7 +1350,11 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
SInterval* pInterval = &((SStreamIntervalOperatorInfo*)downstream->info)->interval;
int32_t numOfFillCols = 0;
SExprInfo* pFillExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &numOfFillCols);
SExprInfo* pFillExprInfo = NULL;
code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI);
if (!pInfo->pFillSup) {
code = TSDB_CODE_FAILED;

View File

@ -1880,13 +1880,20 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
initResultSizeInfo(&pOperator->resultInfo, 4096);
if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
}
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock);
@ -3690,7 +3697,10 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
initResultSizeInfo(&pOperator->resultInfo, 4096);
if (pSessionNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -3698,7 +3708,10 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
}
SExprSupp* pExpSup = &pOperator->exprSupp;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
@ -4831,7 +4844,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
initResultSizeInfo(&pOperator->resultInfo, 4096);
if (pStateNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar);
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
}
@ -4849,7 +4865,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
SExprSupp* pExpSup = &pOperator->exprSupp;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
@ -5126,7 +5145,10 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
}
SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
pInfo->interval = (SInterval){
@ -5174,7 +5196,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
}

View File

@ -2716,7 +2716,10 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
pInfo->uid = (pBlockScanNode->suid != 0) ? pBlockScanNode->suid : pBlockScanNode->uid;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -1126,13 +1126,19 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
SExprSupp* pSup = &pOperator->exprSupp;
int32_t numOfExprs = 0;
SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pInterpPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(pSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
if (pInterpPhyNode->pExprs != NULL) {
int32_t num = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num);
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pInterpPhyNode->pExprs, NULL, &pScalarExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
}

View File

@ -1298,7 +1298,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
QUERY_CHECK_CODE(code, lino, _error);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
&pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
@ -1336,7 +1339,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
if (pPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar);
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -1578,7 +1584,10 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy
if (pStateNode->window.pExprs != NULL) {
int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr);
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -1603,7 +1612,10 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
@ -1682,7 +1694,10 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock);
@ -1709,7 +1724,10 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
if (pSessionNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -2012,7 +2030,9 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
initResultSizeInfo(&pOperator->resultInfo, 512);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
@ -2312,7 +2332,9 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
}
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
SInterval interval = {.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,