init delete mark

This commit is contained in:
54liuyao 2023-12-08 16:28:18 +08:00
parent 82e9e26e07
commit 7570fa758b
3 changed files with 20 additions and 7 deletions

View File

@ -636,6 +636,7 @@ typedef struct SStreamEventAggOperatorInfo {
bool isHistoryOp; bool isHistoryOp;
SArray* historyWins; SArray* historyWins;
bool reCkBlock; bool reCkBlock;
bool recvGetAll;
SSDataBlock* pCheckpointRes; SSDataBlock* pCheckpointRes;
SFilterInfo* pStartCondInfo; SFilterInfo* pStartCondInfo;
SFilterInfo* pEndCondInfo; SFilterInfo* pEndCondInfo;
@ -837,6 +838,8 @@ void compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeW
int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
void resetWinRange(STimeWindow* winRange); void resetWinRange(STimeWindow* winRange);
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts); bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts);
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
void resetUnCloseSessionWinInfo(SSHashObj* winMap);
int32_t encodeSSessionKey(void** buf, SSessionKey* key); int32_t encodeSSessionKey(void** buf, SSessionKey* key);
void* decodeSSessionKey(void* buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key);

View File

@ -486,6 +486,11 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
return pInfo->pCheckpointRes; return pInfo->pCheckpointRes;
} }
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -510,6 +515,7 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted); deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL) { } else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated); getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
continue; continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
@ -672,6 +678,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
.calTrigger = pEventNode->window.triggerType, .calTrigger = pEventNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
.deleteMark = getDeleteMark(&pEventNode->window, 0),
}; };
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
@ -720,6 +727,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->reCkBlock = false; pInfo->reCkBlock = false;
pInfo->recvGetAll = false;
// for stream // for stream
void* buff = NULL; void* buff = NULL;

View File

@ -1345,12 +1345,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return buildIntervalResult(pOperator); return buildIntervalResult(pOperator);
} }
static int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) { int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) {
if (pIntervalPhyNode->window.deleteMark <= 0) { if (pWinPhyNode->deleteMark <= 0) {
return DEAULT_DELETE_MARK; return DEAULT_DELETE_MARK;
} }
int64_t deleteMark = TMAX(pIntervalPhyNode->window.deleteMark, pIntervalPhyNode->window.watermark); int64_t deleteMark = TMAX(pWinPhyNode->deleteMark, pWinPhyNode->watermark);
deleteMark = TMAX(deleteMark, pIntervalPhyNode->interval); deleteMark = TMAX(deleteMark, interval);
return deleteMark; return deleteMark;
} }
@ -1442,7 +1442,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
.deleteMark = getDeleteMark(pIntervalPhyNode), .deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval),
.deleteMarkSaved = 0, .deleteMarkSaved = 0,
.calTriggerSaved = 0, .calTriggerSaved = 0,
}; };
@ -2565,7 +2565,7 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
taosMemoryFree(buf); taosMemoryFree(buf);
} }
static void resetUnCloseSessionWinInfo(SSHashObj* winMap) { void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
void* pIte = NULL; void* pIte = NULL;
int32_t iter = 0; int32_t iter = 0;
while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) { while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) {
@ -2864,6 +2864,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
.calTrigger = pSessionNode->window.triggerType, .calTrigger = pSessionNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
.deleteMark = getDeleteMark(&pSessionNode->window, 0),
}; };
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
@ -3732,6 +3733,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
.calTrigger = pStateNode->window.triggerType, .calTrigger = pStateNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
.deleteMark = getDeleteMark(&pStateNode->window, 0),
}; };
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
@ -3963,7 +3965,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
.deleteMark = getDeleteMark(pIntervalPhyNode)}; .deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)};
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");