avoid delete data

This commit is contained in:
liuyao 2023-05-10 09:11:04 +08:00
parent e4e177d7fe
commit 62ccd4bf16
2 changed files with 9 additions and 2 deletions

View File

@ -2565,7 +2565,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
addRetriveWindow(delWins, pInfo); addRetriveWindow(delWins, pInfo);
taosArrayAddAll(pInfo->pDelWins, delWins); if (pBlock->info.type != STREAM_CLEAR) {
taosArrayAddAll(pInfo->pDelWins, delWins);
}
taosArrayDestroy(delWins); taosArrayDestroy(delWins);
continue; continue;
} }
@ -2577,6 +2579,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->pDelRes->info.rows != 0) { if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
if (pBlock->info.type == STREAM_CLEAR) {
pInfo->pDelRes->info.type = STREAM_CLEAR;
} else {
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
}
return pInfo->pDelRes; return pInfo->pDelRes;
} }

View File

@ -274,7 +274,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qDebug("===stream===try agian batchSize:%d", batchSize); qDebug("===stream===try agian batchSize:%d", batchSize);
continue; continue;
} }
qDebug("===stream===break batchSize:%d", batchSize);
break; break;
} }