Merge pull request #24473 from taosdata/fix/TD-28319

set delete for session op
This commit is contained in:
Haojun Liao 2024-01-16 10:02:06 +08:00 committed by GitHub
commit 8f6256dfae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 10 additions and 9 deletions

View File

@ -2893,6 +2893,14 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
goto _error; goto _error;
} }
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
.deleteMark = getDeleteMark(&pSessionNode->window, 0),
};
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap, code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap,
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle,
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
@ -2900,13 +2908,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
goto _error; goto _error;
} }
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
};
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
@ -3775,6 +3776,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
.calTrigger = pStateNode->window.triggerType, .calTrigger = pStateNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
.deleteMark = getDeleteMark(&pStateNode->window, 0),
}; };
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

View File

@ -527,8 +527,7 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
} }
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN int64_t mark = (pFileState->deleteMark == INT64_MAX) ? INT64_MIN : pFileState->maxTs - pFileState->deleteMark;
: pFileState->maxTs - pFileState->deleteMark;
clearExpiredRowBuff(pFileState, mark, false); clearExpiredRowBuff(pFileState, mark, false);
return pFileState->usedBuffs; return pFileState->usedBuffs;
} }