load operator checkpoint

This commit is contained in:
54liuyao 2024-03-05 19:37:54 +08:00
parent 57baceeeaf
commit bd430a031a
2 changed files with 10 additions and 0 deletions

View File

@ -406,6 +406,7 @@ void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
if (!pInfo) {
return buf;
}
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
// 4.checksum
int32_t dataLen = len - sizeof(uint32_t);
@ -423,6 +424,8 @@ void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
SSessionKey key = {0};
SResultWindowInfo winfo = {0};
buf = decodeSSessionKey(buf, &key);
pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &winfo.sessionWin, pAggSup->gap,
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
}

View File

@ -2590,6 +2590,7 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
if (!pInfo) {
return buf;
}
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
// 5.checksum
if (isParent) {
@ -2608,6 +2609,8 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
SSessionKey key = {0};
SResultWindowInfo winfo = {0};
buf = decodeSSessionKey(buf, &key);
pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &winfo.sessionWin, pAggSup->gap,
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
}
@ -3537,6 +3540,7 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
if (!pInfo) {
return buf;
}
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
// 5.checksum
if (isParent) {
@ -3555,6 +3559,9 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
SSessionKey key = {0};
SResultWindowInfo winfo = {0};
buf = decodeSSessionKey(buf, &key);
pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &winfo.sessionWin, NULL,
pAggSup->stateKeySize, compareStateKey,
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
}