diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 26c6631ee4..d07a302920 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -319,7 +319,8 @@ typedef struct SDispatchMsgInfo { int32_t retryCount; // retry send data count int64_t startTs; // dispatch start time, record total elapsed time for dispatch SArray* pRetryList; // current dispatch successfully completed node of downstream - void* pTimer; // used to dispatch data after a given time duration + void* pRetryTmr; // used to dispatch data after a given time duration + void* pRspTmr; // used to dispatch data after a given time duration } SDispatchMsgInfo; typedef struct STaskQueue { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 2fefeb4cf2..f034244c69 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -273,7 +273,7 @@ int32_t tsCompactPullupInterval = 10; int32_t tsMqRebalanceInterval = 2; int32_t tsStreamCheckpointInterval = 60; float tsSinkDataRate = 2.0; -int32_t tsStreamNodeCheckInterval = 16; +int32_t tsStreamNodeCheckInterval = 20; int32_t tsMaxConcurrentCheckpoint = 1; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index c82007fb59..cad8c6d745 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -345,7 +345,7 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) { mndCalMqRebalance(pMnode); } - if (sec % 30 == 0) { // send the checkpoint info every 10 sec + if (sec % 30 == 0) { // send the checkpoint info every 30 sec mndStreamCheckpointTimer(pMnode); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index db4b345536..e108ba557a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1062,7 +1062,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { bool allReady = true; SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); if (!allReady) { - mWarn("not all vnodes ready"); + mWarn("not all vnodes ready, quit from vnodes status check"); taosArrayDestroy(pNodeSnapshot); taosThreadMutexUnlock(&execInfo.lock); return 0; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 42a4e4e8fb..fb5f1e33c5 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -420,7 +420,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { } } - stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, + stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfFailed, msgId); } else { int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId; @@ -461,10 +461,10 @@ void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) { stTrace("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr, waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount); - if (pTask->msgInfo.pTimer != NULL) { - taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pTimer); + if (pTask->msgInfo.pRetryTmr != NULL) { + taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr); } else { - pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamTimer); + pTask->msgInfo.pRetryTmr = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamTimer); } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7fb45a884c..834daf15d0 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -228,9 +228,9 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->hTaskInfo.pTimer = NULL; } - if (pTask->msgInfo.pTimer != NULL) { - /*bool ret = */taosTmrStop(pTask->msgInfo.pTimer); - pTask->msgInfo.pTimer = NULL; + if (pTask->msgInfo.pRetryTmr != NULL) { + /*bool ret = */taosTmrStop(pTask->msgInfo.pRetryTmr); + pTask->msgInfo.pRetryTmr = NULL; } if (pTask->inputq.queue) { @@ -1004,6 +1004,11 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { pInfo->pChkptTriggerTmr = NULL; } + if (pInfo->pSendReadyMsgTmr != NULL) { + taosTmrStop(pInfo->pSendReadyMsgTmr); + pInfo->pSendReadyMsgTmr = NULL; + } + taosMemoryFree(pInfo); }