Merge pull request #28003 from taosdata/fix/ly_stream_3.0
fix(stream):fix issue for stream operator decode
This commit is contained in:
commit
2f698f525e
|
@ -3287,9 +3287,8 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe
|
|||
int32_t mapSize = 0;
|
||||
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||
for (int32_t i = 0; i < mapSize; i++) {
|
||||
SSessionKey key = {0};
|
||||
SResultWindowInfo winfo = {0};
|
||||
buf = decodeSSessionKey(buf, &key);
|
||||
buf = decodeSSessionKey(buf, &winfo.sessionWin);
|
||||
int32_t winCode = TSDB_CODE_SUCCESS;
|
||||
code = pAggSup->stateStore.streamStateSessionAddIfNotExist(
|
||||
pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode);
|
||||
|
@ -3298,7 +3297,7 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe
|
|||
|
||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||
code =
|
||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
|
@ -4588,9 +4587,8 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
|||
int32_t mapSize = 0;
|
||||
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||
for (int32_t i = 0; i < mapSize; i++) {
|
||||
SSessionKey key = {0};
|
||||
SResultWindowInfo winfo = {0};
|
||||
buf = decodeSSessionKey(buf, &key);
|
||||
buf = decodeSSessionKey(buf, &winfo.sessionWin);
|
||||
int32_t winCode = TSDB_CODE_SUCCESS;
|
||||
code = pAggSup->stateStore.streamStateStateAddIfNotExist(
|
||||
pAggSup->pState, &winfo.sessionWin, NULL, pAggSup->stateKeySize, compareStateKey, (void**)&winfo.pStatePos,
|
||||
|
@ -4599,7 +4597,7 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
|||
|
||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||
code =
|
||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue