fix(stream):add jump condition for loop.
This commit is contained in:
commit
29afc2ad6a
|
@ -39,7 +39,7 @@ int32_t tqScanWal(STQ* pTq) {
|
||||||
bool shouldIdle = true;
|
bool shouldIdle = true;
|
||||||
doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle);
|
doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle);
|
||||||
|
|
||||||
if (shouldIdle) {
|
// if (shouldIdle) {
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
int32_t times = (--pMeta->walScanCounter);
|
int32_t times = (--pMeta->walScanCounter);
|
||||||
ASSERT(pMeta->walScanCounter >= 0);
|
ASSERT(pMeta->walScanCounter >= 0);
|
||||||
|
@ -50,7 +50,7 @@ int32_t tqScanWal(STQ* pTq) {
|
||||||
} else {
|
} else {
|
||||||
tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
|
tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
|
||||||
}
|
}
|
||||||
}
|
// }
|
||||||
|
|
||||||
taosMsleep(SCAN_WAL_IDLE_DURATION);
|
taosMsleep(SCAN_WAL_IDLE_DURATION);
|
||||||
}
|
}
|
||||||
|
|
|
@ -846,6 +846,7 @@ bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWind
|
||||||
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
|
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
|
||||||
void resetUnCloseSessionWinInfo(SSHashObj* winMap);
|
void resetUnCloseSessionWinInfo(SSHashObj* winMap);
|
||||||
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
|
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
|
||||||
|
void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup);
|
||||||
|
|
||||||
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
|
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
|
||||||
void* decodeSSessionKey(void* buf, SSessionKey* key);
|
void* decodeSSessionKey(void* buf, SSessionKey* key);
|
||||||
|
|
|
@ -654,6 +654,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
|
||||||
if (downstream->fpSet.reloadStreamStateFn) {
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
downstream->fpSet.reloadStreamStateFn(downstream);
|
downstream->fpSet.reloadStreamStateFn(downstream);
|
||||||
}
|
}
|
||||||
|
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
|
|
|
@ -2761,6 +2761,18 @@ void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void reloadAggSupFromDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup) {
|
||||||
|
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
|
||||||
|
|
||||||
|
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
|
reloadAggSupFromDownStream(downstream->pDownstream[0], pAggSup);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamScanInfo* pScanInfo = downstream->info;
|
||||||
|
pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
|
||||||
|
}
|
||||||
|
|
||||||
void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
|
void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
@ -2792,6 +2804,7 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
|
||||||
if (downstream->fpSet.reloadStreamStateFn) {
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
downstream->fpSet.reloadStreamStateFn(downstream);
|
downstream->fpSet.reloadStreamStateFn(downstream);
|
||||||
}
|
}
|
||||||
|
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamSessionReloadState(SOperatorInfo* pOperator) {
|
void streamSessionReloadState(SOperatorInfo* pOperator) {
|
||||||
|
@ -2844,6 +2857,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
|
||||||
if (downstream->fpSet.reloadStreamStateFn) {
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
downstream->fpSet.reloadStreamStateFn(downstream);
|
downstream->fpSet.reloadStreamStateFn(downstream);
|
||||||
}
|
}
|
||||||
|
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
|
@ -3726,6 +3740,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
|
||||||
if (downstream->fpSet.reloadStreamStateFn) {
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
downstream->fpSet.reloadStreamStateFn(downstream);
|
downstream->fpSet.reloadStreamStateFn(downstream);
|
||||||
}
|
}
|
||||||
|
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
|
|
Loading…
Reference in New Issue