reset update info

This commit is contained in:
54liuyao 2024-01-02 14:16:34 +08:00
parent 4368bfb20a
commit afc2a346e5
3 changed files with 17 additions and 0 deletions

View File

@ -845,6 +845,7 @@ void resetWinRange(STimeWindow* winRange);
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts); bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts);
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval); int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
void resetUnCloseSessionWinInfo(SSHashObj* winMap); void resetUnCloseSessionWinInfo(SSHashObj* winMap);
void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup);
int32_t encodeSSessionKey(void** buf, SSessionKey* key); int32_t encodeSSessionKey(void** buf, SSessionKey* key);
void* decodeSSessionKey(void* buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key);

View File

@ -654,6 +654,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
if (downstream->fpSet.reloadStreamStateFn) { if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream); downstream->fpSet.reloadStreamStateFn(downstream);
} }
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
} }
SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,

View File

@ -2759,6 +2759,18 @@ void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey,
} }
} }
void reloadAggSupFromDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup) {
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
reloadAggSupFromDownStream(downstream->pDownstream[0], pAggSup);
return;
}
SStreamScanInfo* pScanInfo = downstream->info;
pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
}
void streamSessionSemiReloadState(SOperatorInfo* pOperator) { void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
@ -2790,6 +2802,7 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
if (downstream->fpSet.reloadStreamStateFn) { if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream); downstream->fpSet.reloadStreamStateFn(downstream);
} }
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
} }
void streamSessionReloadState(SOperatorInfo* pOperator) { void streamSessionReloadState(SOperatorInfo* pOperator) {
@ -2842,6 +2855,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
if (downstream->fpSet.reloadStreamStateFn) { if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream); downstream->fpSet.reloadStreamStateFn(downstream);
} }
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
} }
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
@ -3724,6 +3738,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
if (downstream->fpSet.reloadStreamStateFn) { if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream); downstream->fpSet.reloadStreamStateFn(downstream);
} }
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
} }
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,