From 57baceeeaf2bc939f1f2e743c05140c005878d90 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 5 Mar 2024 19:14:20 +0800 Subject: [PATCH] load operator checkpoint --- source/libs/executor/src/streamcountwindowoperator.c | 8 +++++--- source/libs/executor/src/streamtimewindowoperator.c | 1 - 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index d9ce10d38a..720734431f 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -390,10 +390,12 @@ void* doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato buf = taosDecodeFixedI32(buf, &mapSize); for (int32_t i = 0; i < mapSize; i++) { SSessionKey key = {0}; - SResultWindowInfo winfo = {0}; + SCountWindowInfo curWin = {0}; buf = decodeSSessionKey(buf, &key); - buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); - tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); + SBuffInfo buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL}; + setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo); + buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize); + tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); } // 2.twAggSup diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 00877ae1b1..3b62cc80e3 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2533,7 +2533,6 @@ int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outL void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) { buf = taosDecodeFixedBool(buf, &key->isOutput); - key->pStatePos->pRowBuff = NULL; buf = decodeSSessionKey(buf, &key->sessionWin); return buf; }