refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-06-04 10:08:18 +08:00
parent 407f79cfa6
commit 4fc21cc8b0
6 changed files with 17 additions and 11 deletions

View File

@ -319,7 +319,8 @@ typedef struct SDispatchMsgInfo {
int32_t retryCount; // retry send data count
int64_t startTs; // dispatch start time, record total elapsed time for dispatch
SArray* pRetryList; // current dispatch successfully completed node of downstream
void* pTimer; // used to dispatch data after a given time duration
void* pRetryTmr; // used to dispatch data after a given time duration
void* pRspTmr; // used to dispatch data after a given time duration
} SDispatchMsgInfo;
typedef struct STaskQueue {

View File

@ -273,7 +273,7 @@ int32_t tsCompactPullupInterval = 10;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointInterval = 60;
float tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 16;
int32_t tsStreamNodeCheckInterval = 20;
int32_t tsMaxConcurrentCheckpoint = 1;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushIntervalSec = 10;

View File

@ -345,7 +345,7 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
mndCalMqRebalance(pMnode);
}
if (sec % 30 == 0) { // send the checkpoint info every 10 sec
if (sec % 30 == 0) { // send the checkpoint info every 30 sec
mndStreamCheckpointTimer(pMnode);
}

View File

@ -1062,7 +1062,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
if (!allReady) {
mWarn("not all vnodes ready");
mWarn("not all vnodes ready, quit from vnodes status check");
taosArrayDestroy(pNodeSnapshot);
taosThreadMutexUnlock(&execInfo.lock);
return 0;

View File

@ -420,7 +420,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
}
}
stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr,
stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr,
numOfFailed, msgId);
} else {
int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId;
@ -461,10 +461,10 @@ void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) {
stTrace("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr,
waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount);
if (pTask->msgInfo.pTimer != NULL) {
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pTimer);
if (pTask->msgInfo.pRetryTmr != NULL) {
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr);
} else {
pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamTimer);
pTask->msgInfo.pRetryTmr = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamTimer);
}
}

View File

@ -228,9 +228,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->hTaskInfo.pTimer = NULL;
}
if (pTask->msgInfo.pTimer != NULL) {
/*bool ret = */taosTmrStop(pTask->msgInfo.pTimer);
pTask->msgInfo.pTimer = NULL;
if (pTask->msgInfo.pRetryTmr != NULL) {
/*bool ret = */taosTmrStop(pTask->msgInfo.pRetryTmr);
pTask->msgInfo.pRetryTmr = NULL;
}
if (pTask->inputq.queue) {
@ -1004,6 +1004,11 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
pInfo->pChkptTriggerTmr = NULL;
}
if (pInfo->pSendReadyMsgTmr != NULL) {
taosTmrStop(pInfo->pSendReadyMsgTmr);
pInfo->pSendReadyMsgTmr = NULL;
}
taosMemoryFree(pInfo);
}