when dst table has composite primary key, need to delete result

This commit is contained in:
54liuyao 2024-05-31 15:22:38 +08:00
parent d7ecd98cf9
commit a77c6940da
1 changed files with 9 additions and 3 deletions

View File

@ -1427,7 +1427,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
} }
if (IS_FINAL_INTERVAL_OP(pOperator) && !pInfo->destHasPrimaryKey) {
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
}
if (IS_FINAL_INTERVAL_OP(pOperator)) { if (IS_FINAL_INTERVAL_OP(pOperator)) {
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator); pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator);
@ -2845,7 +2847,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated); closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs); closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc); copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
if (!pInfo->destHasPrimaryKey) {
removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated); removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated);
}
if (pInfo->isHistoryOp) { if (pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
} }
@ -4131,7 +4135,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
if (!pInfo->destHasPrimaryKey) {
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
}
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL,
pInfo->pUpdatedMap, pInfo->pDelWins, pOperator); pInfo->pUpdatedMap, pInfo->pDelWins, pOperator);
if (pInfo->destHasPrimaryKey && IS_NORMAL_INTERVAL_OP(pOperator)) { if (pInfo->destHasPrimaryKey && IS_NORMAL_INTERVAL_OP(pOperator)) {