From 1d435faaa2530011173d735e11dcc61fa6e8b7f2 Mon Sep 17 00:00:00 2001 From: dmchen Date: Mon, 3 Mar 2025 17:42:19 +0800 Subject: [PATCH] ehn:TD-33933-decouple-send-heartbeat --- include/util/ttimer.h | 3 +++ source/libs/sync/src/syncMain.c | 12 +++++------ source/util/src/tsched.c | 2 ++ source/util/src/ttimer.c | 38 +++++++++++++++++++++++++-------- 4 files changed, 40 insertions(+), 15 deletions(-) diff --git a/include/util/ttimer.h b/include/util/ttimer.h index 53a8f0a19f..3c0b716f58 100644 --- a/include/util/ttimer.h +++ b/include/util/ttimer.h @@ -43,6 +43,9 @@ bool taosTmrIsStopped(tmr_h* timerId); bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId); +bool taosTmrResetPriority(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId, + uint8_t priority); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index cb7eb59bd0..7ee29d278c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1014,8 +1014,8 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64 " at %d", pSyncNode->vgId, pData->rid, pData->destId.addr, pSyncTimer->timerMS); - bool stopped = taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid), syncEnv()->pTimerManager, - &pSyncTimer->pTimer); + bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid), + syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2); if (stopped) { sError("vgId:%d, failed to reset hb timer success", pSyncNode->vgId); return TSDB_CODE_SYN_INTERNAL_ERROR; @@ -1663,8 +1663,8 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t code = 0; if (syncIsInit()) { - bool stopped = taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid, - syncEnv()->pTimerManager, &pSyncNode->pPingTimer); + bool stopped = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid, + syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2); if (stopped) { sError("vgId:%d, failed to reset ping timer, ms:%d", pSyncNode->vgId, pSyncNode->pingTimerMS); return TSDB_CODE_SYN_INTERNAL_ERROR; @@ -2801,8 +2801,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { if (syncIsInit()) { sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS); - bool stopped = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid, - syncEnv()->pTimerManager, &pSyncTimer->pTimer); + bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid, + syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2); if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code)); } else { diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index 8c708ac6b5..108168db55 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -147,6 +147,8 @@ void *taosProcessSchedQueue(void *scheduler) { char name[16] = {0}; snprintf(name, tListLen(name), "%s-taskQ", pSched->label); setThreadName(name); + int64_t pid = taosGetSelfPthreadId(); + uInfo("scheduler %s is started, thread:%" PRId64, pSched->label, pid); while (1) { if ((ret = tsem_wait(&pSched->fullSem)) != 0) { diff --git a/source/util/src/ttimer.c b/source/util/src/ttimer.c index ec300c5206..d6f47fb81b 100644 --- a/source/util/src/ttimer.c +++ b/source/util/src/ttimer.c @@ -89,6 +89,7 @@ typedef struct tmr_obj_t { }; TAOS_TMR_CALLBACK fp; void* param; + uint8_t priority; } tmr_obj_t; typedef struct timer_list_t { @@ -118,6 +119,7 @@ static TdThreadMutex tmrCtrlMutex; static tmr_ctrl_t* tmrCtrls; static tmr_ctrl_t* unusedTmrCtrl = NULL; static void* tmrQhandle; +static void* tmrQhandleHigh; static int32_t numOfTmrCtrl = 0; int32_t taosTmrThreads = 1; @@ -316,22 +318,30 @@ static void addToExpired(tmr_obj_t* head) { schedMsg.msg = NULL; schedMsg.ahandle = head; schedMsg.thandle = NULL; - if (taosScheduleTask(tmrQhandle, &schedMsg) != 0) { - tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to queue.", head->ctrl->label, id); + if (head->priority == 1) { + if (taosScheduleTask(tmrQhandle, &schedMsg) != 0) { + tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to queue.", head->ctrl->label, id); + } + } else if (head->priority == 2) { + if (taosScheduleTask(tmrQhandleHigh, &schedMsg) != 0) { + tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to high level queue.", head->ctrl->label, id); + } } - tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id); + tmrDebug("timer[id=%" PRIuPTR "] has been added to queue priority:%d.", id, head->priority); head = next; } } -static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, tmr_ctrl_t* ctrl) { +static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, tmr_ctrl_t* ctrl, + uint8_t priority) { uintptr_t id = getNextTimerId(); timer->id = id; timer->state = TIMER_STATE_WAITING; timer->fp = fp; timer->param = param; timer->ctrl = ctrl; + timer->priority = priority; addTimer(timer); const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started"; @@ -349,7 +359,7 @@ static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t ms return id; } -tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle) { +tmr_h taosTmrStartPriority(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, uint8_t priority) { tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle; if (ctrl == NULL || ctrl->label[0] == 0) { return NULL; @@ -361,7 +371,11 @@ tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* ha return NULL; } - return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl); + return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl, priority); +} + +tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle) { + return taosTmrStartPriority(fp, mseconds, param, handle, 1); } static void taosTimerLoopFunc(int32_t signo) { @@ -488,7 +502,8 @@ bool taosTmrIsStopped(tmr_h* timerId) { return (state == TIMER_STATE_CANCELED) || (state == TIMER_STATE_STOPPED); } -bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) { +bool taosTmrResetPriority(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, + uint8_t priority) { tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle; if (ctrl == NULL || ctrl->label[0] == 0) { return false; @@ -509,7 +524,7 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* han } if (timer == NULL) { - *pTmrId = taosTmrStart(fp, mseconds, param, handle); + *pTmrId = taosTmrStartPriority(fp, mseconds, param, handle, priority); if (NULL == *pTmrId) { stopped = true; } @@ -530,11 +545,15 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* han uError("timer refCount=%d not expected 1", timer->refCount); } memset(timer, 0, sizeof(*timer)); - *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl); + *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl, priority); return stopped; } +bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) { + return taosTmrResetPriority(fp, mseconds, param, handle, pTmrId, 1); +} + static int32_t taosTmrModuleInit(void) { tmrCtrls = taosMemoryMalloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl); if (tmrCtrls == NULL) { @@ -578,6 +597,7 @@ static int32_t taosTmrModuleInit(void) { } tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL); + tmrQhandleHigh = taosInitScheduler(10000, taosTmrThreads, "high-tmr", NULL); if (taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK) != 0) { tmrError("failed to initialize timer"); }