From a77c6940da711b11eb85d3186b9e48357156c5b0 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 31 May 2024 15:22:38 +0800 Subject: [PATCH] when dst table has composite primary key, need to delete result --- source/libs/executor/src/streamtimewindowoperator.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 2da9ed0353..46844a2470 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1427,7 +1427,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); } - removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); + if (IS_FINAL_INTERVAL_OP(pOperator) && !pInfo->destHasPrimaryKey) { + removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); + } if (IS_FINAL_INTERVAL_OP(pOperator)) { closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator); @@ -2845,7 +2847,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated); closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs); copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc); - removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated); + if (!pInfo->destHasPrimaryKey) { + removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated); + } if (pInfo->isHistoryOp) { getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); } @@ -4131,7 +4135,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); } pOperator->status = OP_RES_TO_RETURN; - removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); + if (!pInfo->destHasPrimaryKey) { + removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); + } closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator); if (pInfo->destHasPrimaryKey && IS_NORMAL_INTERVAL_OP(pOperator)) {