Merge remote-tracking branch 'origin/3.0' into fix/dnode
This commit is contained in:
commit
bb032f13f5
|
@ -1808,37 +1808,39 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
|||
|
||||
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)param;
|
||||
if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
|
||||
atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
|
||||
SyncTimeout* pSyncMsg =
|
||||
syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
|
||||
pSyncNode->heartbeatTimerMS, pSyncNode->vgId, pSyncNode);
|
||||
SRpcMsg rpcMsg;
|
||||
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
|
||||
if (pSyncNode->FpEqMsg != NULL) {
|
||||
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
||||
if (code != 0) {
|
||||
sError("vgId:%d sync enqueue timer msg error, code:%d", pSyncNode->vgId, code);
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
return;
|
||||
if (pSyncNode->replicaNum > 1) {
|
||||
if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
|
||||
atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
|
||||
SyncTimeout* pSyncMsg =
|
||||
syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
|
||||
pSyncNode->heartbeatTimerMS, pSyncNode->vgId, pSyncNode);
|
||||
SRpcMsg rpcMsg;
|
||||
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
|
||||
if (pSyncNode->FpEqMsg != NULL) {
|
||||
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
||||
if (code != 0) {
|
||||
sError("vgId:%d sync enqueue timer msg error, code:%d", pSyncNode->vgId, code);
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
sError("syncNodeEqHeartbeatTimer FpEqMsg is NULL");
|
||||
}
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
|
||||
if (syncEnvIsStart()) {
|
||||
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||
&pSyncNode->pHeartbeatTimer);
|
||||
} else {
|
||||
sError("sync env is stop, syncNodeEqHeartbeatTimer");
|
||||
}
|
||||
} else {
|
||||
sError("syncNodeEqHeartbeatTimer FpEqMsg is NULL");
|
||||
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
|
||||
"",
|
||||
pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
|
||||
}
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
|
||||
if (syncEnvIsStart()) {
|
||||
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||
&pSyncNode->pHeartbeatTimer);
|
||||
} else {
|
||||
sError("sync env is stop, syncNodeEqHeartbeatTimer");
|
||||
}
|
||||
} else {
|
||||
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
|
||||
"",
|
||||
pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2184,7 +2186,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
|||
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
|
||||
}
|
||||
ths->restoreFinish = true;
|
||||
sDebug("vgId:%d sync event restore finish", ths->vgId);
|
||||
sDebug("vgId:%d sync event restore finish, index:%ld", ths->vgId, pEntry->index);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue