Merge pull request #29989 from taosdata/ehn-TD-33933-decouple-send-heartbeat

ehn:TD-33933-decouple-send-heartbeat
This commit is contained in:
Simon Guan 2025-03-05 18:55:09 +08:00 committed by GitHub
commit 393f49141f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 48 additions and 15 deletions

View File

@ -43,6 +43,9 @@ bool taosTmrIsStopped(tmr_h* timerId);
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId);
bool taosTmrResetPriority(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId,
uint8_t priority);
#ifdef __cplusplus
}
#endif

View File

@ -1014,8 +1014,8 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64 " at %d", pSyncNode->vgId, pData->rid,
pData->destId.addr, pSyncTimer->timerMS);
bool stopped = taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid), syncEnv()->pTimerManager,
&pSyncTimer->pTimer);
bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
if (stopped) {
sError("vgId:%d, failed to reset hb timer success", pSyncNode->vgId);
return TSDB_CODE_SYN_INTERNAL_ERROR;
@ -1663,8 +1663,8 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
int32_t code = 0;
if (syncIsInit()) {
bool stopped = taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
syncEnv()->pTimerManager, &pSyncNode->pPingTimer);
bool stopped = taosTmrResetPriority(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
syncEnv()->pTimerManager, &pSyncNode->pPingTimer, 2);
if (stopped) {
sError("vgId:%d, failed to reset ping timer, ms:%d", pSyncNode->vgId, pSyncNode->pingTimerMS);
return TSDB_CODE_SYN_INTERNAL_ERROR;
@ -2801,8 +2801,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
if (syncIsInit()) {
sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
bool stopped = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
syncEnv()->pTimerManager, &pSyncTimer->pTimer);
bool stopped = taosTmrResetPriority(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
syncEnv()->pTimerManager, &pSyncTimer->pTimer, 2);
if (stopped) sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
} else {

View File

@ -147,6 +147,8 @@ void *taosProcessSchedQueue(void *scheduler) {
char name[16] = {0};
snprintf(name, tListLen(name), "%s-taskQ", pSched->label);
setThreadName(name);
int64_t pid = taosGetSelfPthreadId();
uInfo("scheduler %s is started, thread:%" PRId64, name, pid);
while (1) {
if ((ret = tsem_wait(&pSched->fullSem)) != 0) {

View File

@ -61,6 +61,7 @@ typedef struct tmr_obj_t {
};
TAOS_TMR_CALLBACK fp;
void* param;
uint8_t priority;
} tmr_obj_t;
typedef struct timer_list_t {
@ -90,6 +91,7 @@ static TdThreadMutex tmrCtrlMutex;
static tmr_ctrl_t* tmrCtrls;
static tmr_ctrl_t* unusedTmrCtrl = NULL;
static void* tmrQhandle;
static void* tmrQhandleHigh;
static int32_t numOfTmrCtrl = 0;
int32_t taosTmrThreads = 1;
@ -288,22 +290,35 @@ static void addToExpired(tmr_obj_t* head) {
schedMsg.msg = NULL;
schedMsg.ahandle = head;
schedMsg.thandle = NULL;
if (taosScheduleTask(tmrQhandle, &schedMsg) != 0) {
tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to queue.", head->ctrl->label, id);
uint8_t priority = head->priority;
if (priority == 1) {
if (taosScheduleTask(tmrQhandle, &schedMsg) != 0) {
tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to queue.", head->ctrl->label, id);
}
} else if (priority == 2) {
if (taosScheduleTask(tmrQhandleHigh, &schedMsg) != 0) {
tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to high level queue.", head->ctrl->label, id);
}
}
else{
tmrError("%s invalid priority level %d for timer[id=%" PRIuPTR "].", head->ctrl->label, priority, id);
}
tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id);
tmrDebug("timer[id=%" PRIuPTR "] has been added to queue priority:%d.", id, priority);
head = next;
}
}
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, tmr_ctrl_t* ctrl) {
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, tmr_ctrl_t* ctrl,
uint8_t priority) {
uintptr_t id = getNextTimerId();
timer->id = id;
timer->state = TIMER_STATE_WAITING;
timer->fp = fp;
timer->param = param;
timer->ctrl = ctrl;
timer->priority = priority;
addTimer(timer);
const char* fmt = "%s timer[id=%" PRIuPTR ", fp=%p, param=%p] started";
@ -321,7 +336,7 @@ static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int32_t ms
return id;
}
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle) {
tmr_h taosTmrStartPriority(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, uint8_t priority) {
tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
if (ctrl == NULL || ctrl->label[0] == 0) {
return NULL;
@ -333,7 +348,11 @@ tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* ha
return NULL;
}
return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl, priority);
}
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle) {
return taosTmrStartPriority(fp, mseconds, param, handle, 1);
}
static void taosTimerLoopFunc(int32_t signo) {
@ -460,7 +479,8 @@ bool taosTmrIsStopped(tmr_h* timerId) {
return (state == TIMER_STATE_CANCELED) || (state == TIMER_STATE_STOPPED);
}
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) {
bool taosTmrResetPriority(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId,
uint8_t priority) {
tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
if (ctrl == NULL || ctrl->label[0] == 0) {
return false;
@ -481,7 +501,7 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* han
}
if (timer == NULL) {
*pTmrId = taosTmrStart(fp, mseconds, param, handle);
*pTmrId = taosTmrStartPriority(fp, mseconds, param, handle, priority);
if (NULL == *pTmrId) {
stopped = true;
}
@ -502,11 +522,15 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* han
uError("timer refCount=%d not expected 1", timer->refCount);
}
memset(timer, 0, sizeof(*timer));
*pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
*pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl, priority);
return stopped;
}
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId) {
return taosTmrResetPriority(fp, mseconds, param, handle, pTmrId, 1);
}
static int32_t taosTmrModuleInit(void) {
tmrCtrls = taosMemoryMalloc(sizeof(tmr_ctrl_t) * tsMaxTmrCtrl);
if (tmrCtrls == NULL) {
@ -550,6 +574,7 @@ static int32_t taosTmrModuleInit(void) {
}
tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
tmrQhandleHigh = taosInitScheduler(10000, taosTmrThreads, "high-tmr", NULL);
if (taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK) != 0) {
tmrError("failed to initialize timer");
}
@ -633,6 +658,9 @@ void taosTmrCleanUp(void* handle) {
taosCleanUpScheduler(tmrQhandle);
taosMemoryFreeClear(tmrQhandle);
taosCleanUpScheduler(tmrQhandleHigh);
taosMemoryFreeClear(tmrQhandleHigh);
for (int32_t i = 0; i < tListLen(wheels); i++) {
time_wheel_t* wheel = wheels + i;
(void)taosThreadMutexDestroy(&wheel->mutex);