diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 2bb2609ae9..aba3b0945f 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -39,7 +39,7 @@ int32_t tqScanWal(STQ* pTq) { bool shouldIdle = true; doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle); - if (shouldIdle) { +// if (shouldIdle) { streamMetaWLock(pMeta); int32_t times = (--pMeta->walScanCounter); ASSERT(pMeta->walScanCounter >= 0); @@ -50,7 +50,7 @@ int32_t tqScanWal(STQ* pTq) { } else { tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION); } - } +// } taosMsleep(SCAN_WAL_IDLE_DURATION); } diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index e98777544b..64235fff0c 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -846,6 +846,7 @@ bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWind int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval); void resetUnCloseSessionWinInfo(SSHashObj* winMap); void setStreamOperatorCompleted(struct SOperatorInfo* pOperator); +void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup); int32_t encodeSSessionKey(void** buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 97a0f5b147..8aca76597b 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -654,6 +654,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) { if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); } + reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup); } SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index e79016bcff..b2d0f25466 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2761,6 +2761,18 @@ void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, } } +void reloadAggSupFromDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup) { + SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore; + + if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + reloadAggSupFromDownStream(downstream->pDownstream[0], pAggSup); + return; + } + + SStreamScanInfo* pScanInfo = downstream->info; + pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo; +} + void streamSessionSemiReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; @@ -2792,6 +2804,7 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) { if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); } + reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup); } void streamSessionReloadState(SOperatorInfo* pOperator) { @@ -2844,6 +2857,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); } + reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup); } SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, @@ -3726,6 +3740,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) { if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); } + reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup); } SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,