load operator checkpoint
This commit is contained in:
parent
4d1b556bc5
commit
57baceeeaf
|
@ -390,10 +390,12 @@ void* doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
buf = taosDecodeFixedI32(buf, &mapSize);
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
for (int32_t i = 0; i < mapSize; i++) {
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
SResultWindowInfo winfo = {0};
|
SCountWindowInfo curWin = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &key);
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
SBuffInfo buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo);
|
||||||
|
buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize);
|
||||||
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2.twAggSup
|
// 2.twAggSup
|
||||||
|
|
|
@ -2533,7 +2533,6 @@ int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outL
|
||||||
|
|
||||||
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
|
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
|
||||||
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
||||||
key->pStatePos->pRowBuff = NULL;
|
|
||||||
buf = decodeSSessionKey(buf, &key->sessionWin);
|
buf = decodeSSessionKey(buf, &key->sessionWin);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue