init delete mark

This commit is contained in:
54liuyao 2024-06-03 16:41:38 +08:00
parent ca7e1344e1
commit 1ad25599ca
1 changed files with 8 additions and 7 deletions

View File

@ -677,6 +677,14 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
goto _error; goto _error;
} }
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pCountNode->window.watermark,
.calTrigger = pCountNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
.deleteMark = getDeleteMark(&pCountNode->window, 0),
};
pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId; pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0,
pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
@ -687,13 +695,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->streamAggSup.windowCount = pCountNode->windowCount; pInfo->streamAggSup.windowCount = pCountNode->windowCount;
pInfo->streamAggSup.windowSliding = pCountNode->windowSliding; pInfo->streamAggSup.windowSliding = pCountNode->windowSliding;
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pCountNode->window.watermark,
.calTrigger = pCountNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
};
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;