Merge pull request #25437 from taosdata/fix/TD-29721
fix(stream):save invalid delete info for final interval op
This commit is contained in:
commit
cda0087d40
|
@ -262,7 +262,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
|
||||||
if (chIds) {
|
if (chIds) {
|
||||||
int32_t childId = getChildIndex(pBlock);
|
int32_t childId = getChildIndex(pBlock);
|
||||||
if (pInvalidWins) {
|
if (pInvalidWins) {
|
||||||
qDebug("===stream===save mid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId);
|
qDebug("===stream===save invalid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId);
|
||||||
taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0);
|
taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -654,11 +654,12 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina
|
||||||
.calWin.skey = nextWin.skey,
|
.calWin.skey = nextWin.skey,
|
||||||
.calWin.ekey = nextWin.skey};
|
.calWin.ekey = nextWin.skey};
|
||||||
// add pull data request
|
// add pull data request
|
||||||
qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh);
|
qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId:%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh);
|
||||||
if (IS_MID_INTERVAL_OP(pOperator)) {
|
if (IS_MID_INTERVAL_OP(pOperator)) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
|
||||||
taosArrayPush(pInfo->pMidPullDatas, &winRes);
|
taosArrayPush(pInfo->pMidPullDatas, &winRes);
|
||||||
} else if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) {
|
} else if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) {
|
||||||
|
taosArrayPush(pInfo->pDelWins, &winRes);
|
||||||
addPullWindow(pMap, &winRes, numOfCh);
|
addPullWindow(pMap, &winRes, numOfCh);
|
||||||
if (pInfo->destHasPrimaryKey) {
|
if (pInfo->destHasPrimaryKey) {
|
||||||
tSimpleHashPut(pInfo->pDeletedMap,&winRes, sizeof(SWinKey), NULL, 0);
|
tSimpleHashPut(pInfo->pDeletedMap,&winRes, sizeof(SWinKey), NULL, 0);
|
||||||
|
@ -1328,7 +1329,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
pBlock->info.type == STREAM_CLEAR) {
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
|
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
|
||||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL);
|
SHashObj* finalMap = IS_FINAL_INTERVAL_OP(pOperator) ? pInfo->pFinalPullDataMap : NULL;
|
||||||
|
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, finalMap);
|
||||||
if (IS_FINAL_INTERVAL_OP(pOperator)) {
|
if (IS_FINAL_INTERVAL_OP(pOperator)) {
|
||||||
int32_t chId = getChildIndex(pBlock);
|
int32_t chId = getChildIndex(pBlock);
|
||||||
addRetriveWindow(delWins, pInfo, chId);
|
addRetriveWindow(delWins, pInfo, chId);
|
||||||
|
|
Loading…
Reference in New Issue