From 681b20bdfb0d57f9f9f56aa7b7b9f74840638acb Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 20 Sep 2024 16:41:14 +0800 Subject: [PATCH] fix(stream):fix issue for stream operator decode --- source/libs/executor/src/streamtimewindowoperator.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 4213d71db4..2d027e8fef 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -3287,9 +3287,8 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe int32_t mapSize = 0; buf = taosDecodeFixedI32(buf, &mapSize); for (int32_t i = 0; i < mapSize; i++) { - SSessionKey key = {0}; SResultWindowInfo winfo = {0}; - buf = decodeSSessionKey(buf, &key); + buf = decodeSSessionKey(buf, &winfo.sessionWin); int32_t winCode = TSDB_CODE_SUCCESS; code = pAggSup->stateStore.streamStateSessionAddIfNotExist( pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode); @@ -3298,7 +3297,7 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); code = - tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); + tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); QUERY_CHECK_CODE(code, lino, _end); } @@ -4588,9 +4587,8 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera int32_t mapSize = 0; buf = taosDecodeFixedI32(buf, &mapSize); for (int32_t i = 0; i < mapSize; i++) { - SSessionKey key = {0}; SResultWindowInfo winfo = {0}; - buf = decodeSSessionKey(buf, &key); + buf = decodeSSessionKey(buf, &winfo.sessionWin); int32_t winCode = TSDB_CODE_SUCCESS; code = pAggSup->stateStore.streamStateStateAddIfNotExist( pAggSup->pState, &winfo.sessionWin, NULL, pAggSup->stateKeySize, compareStateKey, (void**)&winfo.pStatePos, @@ -4599,7 +4597,7 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); code = - tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); + tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); QUERY_CHECK_CODE(code, lino, _end); }