fix issue
This commit is contained in:
parent
cf796a45c1
commit
a82c220645
|
@ -561,6 +561,10 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
|
|||
}
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
pInfo->pGroupCols = NULL;
|
||||
code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
int32_t numOfScalarExpr = 0;
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
if (pAggNode->pExprs != NULL) {
|
||||
|
@ -568,10 +572,6 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
pInfo->pGroupCols = NULL;
|
||||
code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -1165,6 +1165,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
|
|||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
|
||||
pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys);
|
||||
|
||||
|
@ -1230,8 +1232,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
|
|||
|
||||
setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo,
|
||||
pTaskInfo);
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
|
|
|
@ -474,11 +474,6 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
|
||||
|
||||
int32_t numOfExpr = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
if (pPhyNode->pExprs != NULL) {
|
||||
int32_t num = 0;
|
||||
SExprInfo* pSExpr = NULL;
|
||||
|
@ -505,6 +500,11 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
code = blockDataEnsureCapacity(pResBlock, numOfRows);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
int32_t numOfExpr = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
|
||||
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
|
|
@ -834,12 +834,14 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
}
|
||||
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -863,7 +865,6 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno);
|
||||
|
|
|
@ -895,14 +895,16 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pEventNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
|
|
@ -1370,6 +1370,9 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
|
|||
code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI);
|
||||
if (!pInfo->pFillSup) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
|
@ -1440,9 +1443,6 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
|
|||
code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->srcRowIndex = -1;
|
||||
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
|
||||
pTaskInfo);
|
||||
|
|
|
@ -1896,11 +1896,6 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
@ -1914,6 +1909,12 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
|
||||
|
||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -3742,13 +3743,15 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
}
|
||||
}
|
||||
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -3774,7 +3777,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
if (pSessionNode->window.pTsEnd) {
|
||||
pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId;
|
||||
}
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
|
||||
|
@ -4924,14 +4927,16 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -5218,10 +5223,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
|
||||
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
@ -5265,6 +5266,9 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState,
|
||||
&pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
|
|
@ -764,6 +764,7 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
|
|||
SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index);
|
||||
QUERY_CHECK_NULL(pv, code, lino, _end, terrno);
|
||||
code = nodesValueNodeToVariant(pv, &pFillCol[i].fillVal);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _end;
|
||||
|
|
|
@ -1746,15 +1746,15 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
|
|||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
|
||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -2392,11 +2392,6 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
|
|||
goto _error;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SInterval interval = {.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
|
@ -2420,6 +2415,11 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
|
|||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
|
||||
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
Loading…
Reference in New Issue