refactor(sync): add SElectTimer
This commit is contained in:
parent
9caea0f944
commit
49af601e19
|
@ -79,6 +79,12 @@ typedef struct SSyncTimer {
|
||||||
void* pData;
|
void* pData;
|
||||||
} SSyncTimer;
|
} SSyncTimer;
|
||||||
|
|
||||||
|
typedef struct SElectTimer {
|
||||||
|
uint64_t logicClock;
|
||||||
|
SSyncNode* pSyncNode;
|
||||||
|
void* pData;
|
||||||
|
} SElectTimer;
|
||||||
|
|
||||||
int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
|
int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
|
||||||
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
|
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
|
||||||
int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
|
int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
|
||||||
|
|
|
@ -1564,7 +1564,13 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
if (syncEnvIsStart()) {
|
if (syncEnvIsStart()) {
|
||||||
pSyncNode->electTimerMS = ms;
|
pSyncNode->electTimerMS = ms;
|
||||||
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
|
||||||
|
SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer));
|
||||||
|
pElectTimer->logicClock = pSyncNode->electTimerLogicClock;
|
||||||
|
pElectTimer->pSyncNode = pSyncNode;
|
||||||
|
pElectTimer->pData = NULL;
|
||||||
|
|
||||||
|
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, gSyncEnv->pTimerManager,
|
||||||
&pSyncNode->pElectTimer);
|
&pSyncNode->pElectTimer);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
@ -2776,10 +2782,11 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)param;
|
SElectTimer* pElectTimer = (SElectTimer*)param;
|
||||||
|
SSyncNode* pSyncNode = pElectTimer->pSyncNode;
|
||||||
|
|
||||||
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
|
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pSyncNode->electTimerMS,
|
||||||
pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode);
|
pSyncNode->vgId, pSyncNode);
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
||||||
if (pSyncNode->FpEqMsg != NULL) {
|
if (pSyncNode->FpEqMsg != NULL) {
|
||||||
|
@ -2788,6 +2795,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
||||||
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
|
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
syncTimeoutDestroy(pSyncMsg);
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
taosMemoryFree(pElectTimer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2800,7 +2808,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
||||||
} else {
|
} else {
|
||||||
sTrace("syncNodeEqElectTimer FpEqMsg is NULL");
|
sTrace("syncNodeEqElectTimer FpEqMsg is NULL");
|
||||||
}
|
}
|
||||||
|
|
||||||
syncTimeoutDestroy(pSyncMsg);
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
taosMemoryFree(pElectTimer);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
// reset timer ms
|
// reset timer ms
|
||||||
|
|
Loading…
Reference in New Issue