diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a0bf9d052a..fd04bdac04 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2616,7 +2616,7 @@ int32_t doStreamIntervalEncodeOpState(void **buf, SOperatorInfo* pOperator) { tlen += encodeSPullWindowInfoArray(buf, pInfo->pPullWins); // 5.dataVersion - tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); + tlen += taosEncodeFixedI64(buf, pInfo->dataVersion); return tlen; } @@ -3086,6 +3086,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, if (res == TSDB_CODE_SUCCESS) { doStreamIntervalDecodeOpState(buff, pOperator); taosMemoryFree(buff); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); } return pOperator; @@ -4153,6 +4154,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh if (res == TSDB_CODE_SUCCESS) { doStreamSessionDecodeOpState(buff, pOperator); taosMemoryFree(buff); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId); } setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, @@ -4876,6 +4878,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys if (res == TSDB_CODE_SUCCESS) { doStreamStateDecodeOpState(buff, pOperator); taosMemoryFree(buff); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId); } setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, @@ -5732,6 +5735,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys if (res == TSDB_CODE_SUCCESS) { doStreamIntervalDecodeOpState(buff, pOperator); taosMemoryFree(buff); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); } initIntervalDownStream(downstream, pPhyNode->type, pInfo);