diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index f6d6ae55db..4213d71db4 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2043,6 +2043,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) { + pInfo->basic.primaryPkIndex = -1; code = initIntervalDownStream(downstream, pPhyNode->type, pInfo, &pInfo->basic); QUERY_CHECK_CODE(code, lino, _error); } @@ -3876,6 +3877,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState); if (downstream) { + pInfo->basic.primaryPkIndex = -1; code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic); QUERY_CHECK_CODE(code, lino, _error); @@ -5387,6 +5389,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* taosMemoryFree(buff); } + pInfo->basic.primaryPkIndex = -1; code = initIntervalDownStream(downstream, pPhyNode->type, pInfo, &pInfo->basic); QUERY_CHECK_CODE(code, lino, _error);