From 24ec06bfe4b85ba6ec53757072042903bb5ec114 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Sun, 25 Feb 2024 19:28:32 +0800 Subject: [PATCH 1/4] reset close window state --- source/libs/executor/src/streamcountwindowoperator.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 294c2730df..3daefcdc7e 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -287,6 +287,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + curWin.winInfo.pStatePos->beUpdated = true; SSessionKey key = {0}; getSessionHashKey(&curWin.winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); @@ -330,6 +331,7 @@ static SSDataBlock* buildCountResult(SOperatorInfo* pOperator) { int32_t doStreamCountEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { SStreamCountAggOperatorInfo* pInfo = pOperator->info; + return 0; if (!pInfo) { return 0; } From 27219f70afb35ab44259ddbcdce37f5d55067d4c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Sun, 25 Feb 2024 19:30:56 +0800 Subject: [PATCH 2/4] reset close window state --- source/libs/executor/src/streamcountwindowoperator.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 3daefcdc7e..2d523a49fd 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -331,7 +331,6 @@ static SSDataBlock* buildCountResult(SOperatorInfo* pOperator) { int32_t doStreamCountEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { SStreamCountAggOperatorInfo* pInfo = pOperator->info; - return 0; if (!pInfo) { return 0; } From 9d560c494d655e723f41501a1b5320947e9f4332 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Sun, 25 Feb 2024 19:56:38 +0800 Subject: [PATCH 3/4] reset close window state --- source/libs/executor/src/streamcountwindowoperator.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 2d523a49fd..70aee38264 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -290,6 +290,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl curWin.winInfo.pStatePos->beUpdated = true; SSessionKey key = {0}; getSessionHashKey(&curWin.winInfo.sessionWin, &key); + qDebug("===stream=== window close %" PRId64, key.win.skey); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); } @@ -345,7 +346,7 @@ int32_t doStreamCountEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOper size_t keyLen = 0; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { - void* key = taosHashGetKey(pIte, &keyLen); + void* key = tSimpleHashGetKey(pIte, &keyLen); tlen += encodeSSessionKey(buf, key); tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); } From 176afbb74a8db188cd5f2e9d50b9b65eda259029 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Sun, 25 Feb 2024 20:07:04 +0800 Subject: [PATCH 4/4] reset close window state --- source/libs/executor/src/streamcountwindowoperator.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 70aee38264..93bff984d3 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -290,7 +290,6 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl curWin.winInfo.pStatePos->beUpdated = true; SSessionKey key = {0}; getSessionHashKey(&curWin.winInfo.sessionWin, &key); - qDebug("===stream=== window close %" PRId64, key.win.skey); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); }