From 93f546f50d198a26884da3abf729dcc5997cdd28 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 10 Jul 2023 19:33:31 +0800 Subject: [PATCH] stream operator decode --- source/libs/executor/src/timewindowoperator.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a0bf9d052a..fd04bdac04 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2616,7 +2616,7 @@ int32_t doStreamIntervalEncodeOpState(void **buf, SOperatorInfo* pOperator) { tlen += encodeSPullWindowInfoArray(buf, pInfo->pPullWins); // 5.dataVersion - tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); + tlen += taosEncodeFixedI64(buf, pInfo->dataVersion); return tlen; } @@ -3086,6 +3086,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, if (res == TSDB_CODE_SUCCESS) { doStreamIntervalDecodeOpState(buff, pOperator); taosMemoryFree(buff); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); } return pOperator; @@ -4153,6 +4154,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh if (res == TSDB_CODE_SUCCESS) { doStreamSessionDecodeOpState(buff, pOperator); taosMemoryFree(buff); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId); } setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, @@ -4876,6 +4878,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys if (res == TSDB_CODE_SUCCESS) { doStreamStateDecodeOpState(buff, pOperator); taosMemoryFree(buff); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId); } setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, @@ -5732,6 +5735,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys if (res == TSDB_CODE_SUCCESS) { doStreamIntervalDecodeOpState(buff, pOperator); taosMemoryFree(buff); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); } initIntervalDownStream(downstream, pPhyNode->type, pInfo);