fix(stream):fix issue for stream operator decode
This commit is contained in:
parent
a032de131b
commit
681b20bdfb
|
@ -3287,9 +3287,8 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe
|
||||||
int32_t mapSize = 0;
|
int32_t mapSize = 0;
|
||||||
buf = taosDecodeFixedI32(buf, &mapSize);
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
for (int32_t i = 0; i < mapSize; i++) {
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
SSessionKey key = {0};
|
|
||||||
SResultWindowInfo winfo = {0};
|
SResultWindowInfo winfo = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &winfo.sessionWin);
|
||||||
int32_t winCode = TSDB_CODE_SUCCESS;
|
int32_t winCode = TSDB_CODE_SUCCESS;
|
||||||
code = pAggSup->stateStore.streamStateSessionAddIfNotExist(
|
code = pAggSup->stateStore.streamStateSessionAddIfNotExist(
|
||||||
pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode);
|
pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode);
|
||||||
|
@ -3298,7 +3297,7 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe
|
||||||
|
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
code =
|
code =
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4588,9 +4587,8 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
int32_t mapSize = 0;
|
int32_t mapSize = 0;
|
||||||
buf = taosDecodeFixedI32(buf, &mapSize);
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
for (int32_t i = 0; i < mapSize; i++) {
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
SSessionKey key = {0};
|
|
||||||
SResultWindowInfo winfo = {0};
|
SResultWindowInfo winfo = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &winfo.sessionWin);
|
||||||
int32_t winCode = TSDB_CODE_SUCCESS;
|
int32_t winCode = TSDB_CODE_SUCCESS;
|
||||||
code = pAggSup->stateStore.streamStateStateAddIfNotExist(
|
code = pAggSup->stateStore.streamStateStateAddIfNotExist(
|
||||||
pAggSup->pState, &winfo.sessionWin, NULL, pAggSup->stateKeySize, compareStateKey, (void**)&winfo.pStatePos,
|
pAggSup->pState, &winfo.sessionWin, NULL, pAggSup->stateKeySize, compareStateKey, (void**)&winfo.pStatePos,
|
||||||
|
@ -4599,7 +4597,7 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
|
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
code =
|
code =
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue