commit
5e2b0ee299
|
@ -149,6 +149,9 @@ _error:
|
|||
}
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
|
|
@ -344,6 +344,9 @@ _error:
|
|||
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
|
|
@ -147,6 +147,9 @@ _error:
|
|||
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
|
|
@ -1814,6 +1814,10 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
||||
code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
nodesDestroyNode((SNode*)res);
|
||||
res = NULL;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
#endif
|
||||
|
@ -1945,7 +1949,7 @@ int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo**
|
|||
SExprInfo* pExp = &pExprs[i];
|
||||
code = createExprFromTargetNode(pExp, pTargetNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFreeClear(pExprs);
|
||||
destroyExprInfo(pExprs, *numOfExprs);
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -579,6 +579,9 @@ _error:
|
|||
pTaskInfo->code = code;
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
return code;
|
||||
|
|
|
@ -1510,6 +1510,9 @@ _error:
|
|||
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && pDownstream != NULL && (*pDownstream) != NULL) {
|
||||
destroyOperator(*pDownstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
|
|
@ -561,6 +561,10 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
|
|||
}
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
pInfo->pGroupCols = NULL;
|
||||
code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
int32_t numOfScalarExpr = 0;
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
if (pAggNode->pExprs != NULL) {
|
||||
|
@ -568,10 +572,6 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
pInfo->pGroupCols = NULL;
|
||||
code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -618,6 +618,9 @@ _error:
|
|||
|
||||
if (pOperator) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
|
||||
|
@ -1162,6 +1165,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
|
|||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
|
||||
pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys);
|
||||
|
||||
|
@ -1227,8 +1232,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
|
|||
|
||||
setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo,
|
||||
pTaskInfo);
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
|
@ -1248,6 +1251,9 @@ _error:
|
|||
pTaskInfo->code = code;
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
TAOS_RETURN(code);
|
||||
|
@ -1797,6 +1803,9 @@ _error:
|
|||
if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo);
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
|
|
|
@ -107,10 +107,6 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
|
|||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
int32_t lino = 0;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc);
|
||||
TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
|
@ -148,6 +144,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
|
|||
}
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
@ -182,6 +183,9 @@ _error:
|
|||
if (pInfo != NULL) destroyProjectOperatorInfo(pInfo);
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
@ -470,11 +474,6 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
|
||||
|
||||
int32_t numOfExpr = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
if (pPhyNode->pExprs != NULL) {
|
||||
int32_t num = 0;
|
||||
SExprInfo* pSExpr = NULL;
|
||||
|
@ -501,6 +500,11 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
code = blockDataEnsureCapacity(pResBlock, numOfRows);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
int32_t numOfExpr = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
|
||||
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
@ -534,6 +538,9 @@ _error:
|
|||
if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo);
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
|
|
@ -166,6 +166,9 @@ _error:
|
|||
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
@ -841,6 +844,9 @@ _error:
|
|||
}
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
return code;
|
||||
|
|
|
@ -834,12 +834,14 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
}
|
||||
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -863,7 +865,6 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno);
|
||||
|
@ -928,6 +929,9 @@ _error:
|
|||
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
|
|
@ -895,14 +895,16 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pEventNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -986,6 +988,9 @@ _error:
|
|||
if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo);
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
|
|
@ -1370,6 +1370,9 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
|
|||
code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI);
|
||||
if (!pInfo->pFillSup) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
|
@ -1440,9 +1443,6 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
|
|||
code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->srcRowIndex = -1;
|
||||
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
|
||||
pTaskInfo);
|
||||
|
@ -1463,6 +1463,9 @@ _error:
|
|||
if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo);
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
|
|
@ -1896,11 +1896,6 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
@ -1914,6 +1909,12 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
|
||||
|
||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -2018,6 +2019,9 @@ _error:
|
|||
if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo);
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
@ -3739,13 +3743,15 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
}
|
||||
}
|
||||
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -3771,7 +3777,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
if (pSessionNode->window.pTsEnd) {
|
||||
pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId;
|
||||
}
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
|
||||
|
@ -3843,6 +3849,9 @@ _error:
|
|||
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
@ -4102,6 +4111,9 @@ _error:
|
|||
}
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
@ -4915,14 +4927,16 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -4998,6 +5012,9 @@ _error:
|
|||
if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo);
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
@ -5206,10 +5223,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
|
||||
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
@ -5253,6 +5266,9 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState,
|
||||
&pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -5337,6 +5353,9 @@ _error:
|
|||
if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo);
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
|
|
@ -570,8 +570,8 @@ int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFi
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(pFillInfo->next.pRowVal);
|
||||
taosArrayDestroy(pFillInfo->prev.pRowVal);
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pFillInfo = taosDestroyFillInfo(pFillInfo);
|
||||
}
|
||||
(*ppFillInfo) = pFillInfo;
|
||||
return code;
|
||||
|
@ -764,6 +764,7 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
|
|||
SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index);
|
||||
QUERY_CHECK_NULL(pv, code, lino, _end, terrno);
|
||||
code = nodesValueNodeToVariant(pv, &pFillCol[i].fillVal);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _end;
|
||||
|
|
|
@ -1212,6 +1212,9 @@ _error:
|
|||
if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo);
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
@ -1249,10 +1252,11 @@ void destroyTimeSliceOperatorInfo(void* param) {
|
|||
}
|
||||
|
||||
cleanupExprSupp(&pInfo->scalarSup);
|
||||
|
||||
for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) {
|
||||
taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal);
|
||||
if (pInfo->pFillColInfo != NULL) {
|
||||
for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) {
|
||||
taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal);
|
||||
}
|
||||
taosMemoryFree(pInfo->pFillColInfo);
|
||||
}
|
||||
taosMemoryFree(pInfo->pFillColInfo);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
|
|
@ -1421,6 +1421,9 @@ _error:
|
|||
}
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
@ -1700,6 +1703,9 @@ _error:
|
|||
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
@ -1740,15 +1746,15 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
|
|||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -1796,6 +1802,9 @@ _error:
|
|||
if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
@ -2113,6 +2122,9 @@ _error:
|
|||
if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
@ -2380,11 +2392,6 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
|
|||
goto _error;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SInterval interval = {.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
|
@ -2408,6 +2415,11 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
|
|||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
|
||||
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2450,6 +2462,9 @@ _error:
|
|||
|
||||
if (pOperator != NULL) {
|
||||
pOperator->info = NULL;
|
||||
if (pOperator->pDownstream == NULL && downstream != NULL) {
|
||||
destroyOperator(downstream);
|
||||
}
|
||||
destroyOperator(pOperator);
|
||||
}
|
||||
pTaskInfo->code = code;
|
||||
|
|
Loading…
Reference in New Issue