From 0c62f5417a9ce26974a55efc5d4fcff32da757e0 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 22 Apr 2024 18:05:35 +0800 Subject: [PATCH] save invalid delete info for final interval op --- source/libs/executor/src/streamtimewindowoperator.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 5c79ffa6de..0e3e74f16f 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -262,7 +262,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa if (chIds) { int32_t childId = getChildIndex(pBlock); if (pInvalidWins) { - qDebug("===stream===save mid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId); + qDebug("===stream===save invalid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId); taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0); } @@ -654,11 +654,12 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request - qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh); + qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId:%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh); if (IS_MID_INTERVAL_OP(pOperator)) { SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info; taosArrayPush(pInfo->pMidPullDatas, &winRes); } else if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) { + taosArrayPush(pInfo->pDelWins, &winRes); addPullWindow(pMap, &winRes, numOfCh); if (pInfo->destHasPrimaryKey) { tSimpleHashPut(pInfo->pDeletedMap,&winRes, sizeof(SWinKey), NULL, 0); @@ -1328,7 +1329,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL); + SHashObj* finalMap = IS_FINAL_INTERVAL_OP(pOperator) ? pInfo->pFinalPullDataMap : NULL; + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, finalMap); if (IS_FINAL_INTERVAL_OP(pOperator)) { int32_t chId = getChildIndex(pBlock); addRetriveWindow(delWins, pInfo, chId);