From 7d897369d5259aad38e41dcfa3ab0cc2094d2ea4 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 14 Mar 2022 16:27:25 +0800 Subject: [PATCH] sync index --- source/libs/sync/inc/syncInt.h | 28 ++- source/libs/sync/src/syncMain.c | 408 ++++++++++++++++---------------- 2 files changed, 220 insertions(+), 216 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index df277e2d7e..8866603b8e 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -67,9 +67,6 @@ extern "C" { } \ } -struct SRaft; -typedef struct SRaft SRaft; - struct SyncTimeout; typedef struct SyncTimeout SyncTimeout; @@ -117,8 +114,10 @@ typedef struct SSyncNode { SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; char raftStorePath[TSDB_FILENAME_LEN * 2]; - SWal* pWal; - void* rpcClient; + + // sync io + SWal* pWal; + void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); void* queue; int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); @@ -164,7 +163,7 @@ typedef struct SSyncNode { int32_t pingTimerMS; uint64_t pingTimerLogicClock; uint64_t pingTimerLogicClockUser; - TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp + TAOS_TMR_CALLBACK FpPingTimerCB; // Timer Fp uint64_t pingTimerCounter; // elect timer @@ -172,7 +171,7 @@ typedef struct SSyncNode { int32_t electTimerMS; uint64_t electTimerLogicClock; uint64_t electTimerLogicClockUser; - TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp + TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp uint64_t electTimerCounter; // heartbeat timer @@ -180,7 +179,7 @@ typedef struct SSyncNode { int32_t heartbeatTimerMS; uint64_t heartbeatTimerLogicClock; uint64_t heartbeatTimerLogicClockUser; - TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp + TAOS_TMR_CALLBACK FpHeartbeatTimerCB; // Timer Fp uint64_t heartbeatTimerCounter; // callback @@ -194,16 +193,17 @@ typedef struct SSyncNode { } SSyncNode; +// open/close -------------- SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); void syncNodeClose(SSyncNode* pSyncNode); -int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); -int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); +// ping -------------- int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); -int32_t syncNodePingAll(SSyncNode* pSyncNode); -int32_t syncNodePingPeers(SSyncNode* pSyncNode); int32_t syncNodePingSelf(SSyncNode* pSyncNode); +int32_t syncNodePingPeers(SSyncNode* pSyncNode); +int32_t syncNodePingAll(SSyncNode* pSyncNode); +// timer control -------------- int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms); @@ -211,6 +211,10 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); + +// utils -------------- +int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); +int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); cJSON* syncNode2Json(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 63fb4062e0..4caa376599 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -31,13 +31,16 @@ static int32_t tsNodeRefId = -1; // ------ local funciton --------- +// enqueue message ---- static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); +// on message ---- static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); +// raft algorithm ---- static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term); static void syncNodeBecomeFollower(SSyncNode* pSyncNode); static void syncNodeBecomeLeader(SSyncNode* pSyncNode); @@ -48,16 +51,20 @@ static void syncNodeCandidate2Follower(SSyncNode* pSyncNode); // --------------------------------- int32_t syncInit() { - sTrace("syncInit ok"); - return 0; + int32_t ret = syncEnvStart(); + return ret; } -void syncCleanUp() { sTrace("syncCleanUp ok"); } +void syncCleanUp() { + int32_t ret = syncEnvStop(); + assert(ret == 0); +} int64_t syncStart(const SSyncInfo* pSyncInfo) { + int32_t ret = 0; SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); assert(pSyncNode != NULL); - return 0; + return ret; } void syncStop(int64_t rid) { @@ -65,9 +72,13 @@ void syncStop(int64_t rid) { syncNodeClose(pSyncNode); } -int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; } +int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { + int32_t ret = 0; + return ret; +} int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { + int32_t ret = 0; SSyncNode* pSyncNode = NULL; // get pointer from rid if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak); @@ -75,11 +86,13 @@ int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); syncClientRequestDestroy(pSyncMsg); + ret = 0; + } else { sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state)); - return -1; // need define err code !! + ret = -1; // need define err code !! } - return 0; + return ret; } ESyncState syncGetMyRole(int64_t rid) { @@ -89,6 +102,7 @@ ESyncState syncGetMyRole(int64_t rid) { void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} +// open/close -------------- SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); assert(pSyncNode != NULL); @@ -162,7 +176,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pingTimerMS = PING_TIMER_MS; atomic_store_64(&pSyncNode->pingTimerLogicClock, 0); atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0); - pSyncNode->FpPingTimer = syncNodeEqPingTimer; + pSyncNode->FpPingTimerCB = syncNodeEqPingTimer; pSyncNode->pingTimerCounter = 0; // init elect timer @@ -170,7 +184,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->electTimerMS = syncUtilElectRandomMS(); atomic_store_64(&pSyncNode->electTimerLogicClock, 0); atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0); - pSyncNode->FpElectTimer = syncNodeEqElectTimer; + pSyncNode->FpElectTimerCB = syncNodeEqElectTimer; pSyncNode->electTimerCounter = 0; // init heartbeat timer @@ -178,7 +192,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS; atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0); atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0); - pSyncNode->FpHeartbeatTimer = syncNodeEqHeartbeatTimer; + pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer; pSyncNode->heartbeatTimerCounter = 0; // init callback @@ -194,10 +208,146 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { } void syncNodeClose(SSyncNode* pSyncNode) { + int32_t ret; assert(pSyncNode != NULL); + + ret = raftStoreClose(pSyncNode->pRaftStore); + assert(ret == 0); + + voteGrantedDestroy(pSyncNode->pVotesGranted); + votesRespondDestory(pSyncNode->pVotesRespond); + syncIndexMgrDestroy(pSyncNode->pNextIndex); + syncIndexMgrDestroy(pSyncNode->pMatchIndex); + logStoreDestory(pSyncNode->pLogStore); + + syncNodeStopPingTimer(pSyncNode); + syncNodeStopElectTimer(pSyncNode); + syncNodeStopHeartbeatTimer(pSyncNode); + free(pSyncNode); } +// ping -------------- +int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { + syncPingLog2((char*)"==syncNodePing==", pMsg); + int32_t ret = 0; + + SRpcMsg rpcMsg; + syncPing2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgLog2((char*)"==syncNodePing==", &rpcMsg); + + ret = syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); + return ret; +} + +int32_t syncNodePingSelf(SSyncNode* pSyncNode) { + int32_t ret = 0; + SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId); + ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg); + assert(ret == 0); + + syncPingDestroy(pMsg); + return ret; +} + +int32_t syncNodePingPeers(SSyncNode* pSyncNode) { + int32_t ret = 0; + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SRaftId destId; + syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &destId); + SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId); + ret = syncNodePing(pSyncNode, &destId, pMsg); + assert(ret == 0); + syncPingDestroy(pMsg); + } + return ret; +} + +int32_t syncNodePingAll(SSyncNode* pSyncNode) { + int32_t ret = 0; + for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) { + SRaftId destId; + syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId); + SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId); + ret = syncNodePing(pSyncNode, &destId, pMsg); + assert(ret == 0); + syncPingDestroy(pMsg); + } + return ret; +} + +// timer control -------------- +int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { + int32_t ret = 0; + taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); + return ret; +} + +int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { + int32_t ret = 0; + atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1); + taosTmrStop(pSyncNode->pPingTimer); + pSyncNode->pPingTimer = NULL; + return ret; +} + +int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { + int32_t ret = 0; + pSyncNode->electTimerMS = ms; + taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pElectTimer); + atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); + return ret; +} + +int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { + int32_t ret = 0; + atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1); + taosTmrStop(pSyncNode->pElectTimer); + pSyncNode->pElectTimer = NULL; + return ret; +} + +int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) { + int32_t ret = 0; + syncNodeStopElectTimer(pSyncNode); + syncNodeStartElectTimer(pSyncNode, ms); + return ret; +} + +int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { + int32_t ret = 0; + taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pHeartbeatTimer); + atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); + return ret; +} + +int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { + int32_t ret = 0; + atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1); + taosTmrStop(pSyncNode->pHeartbeatTimer); + pSyncNode->pHeartbeatTimer = NULL; + return ret; +} + +// utils -------------- +int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { + SEpSet epSet; + syncUtilraftId2EpSet(destRaftId, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + +int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { + SEpSet epSet; + syncUtilnodeInfo2EpSet(nodeInfo, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + cJSON* syncNode2Json(const SSyncNode* pSyncNode) { char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); @@ -253,12 +403,22 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { // tla+ server vars cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state); cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state)); + char tmpBuf[RAFT_STORE_BLOCK_SIZE]; + raftStoreSerialize(pSyncNode->pRaftStore, tmpBuf, sizeof(tmpBuf)); + cJSON_AddStringToObject(pRoot, "pRaftStore", tmpBuf); // tla+ candidate vars + cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted)); + cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond)); // tla+ leader vars + cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex)); + cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex)); // tla+ log vars + cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore)); + snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->commitIndex); + cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); // ping timer snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer); @@ -268,8 +428,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerLogicClockUser); cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimer); - cJSON_AddStringToObject(pRoot, "FpPingTimer", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB); + cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerCounter); cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf); @@ -281,8 +441,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerLogicClockUser); cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimer); - cJSON_AddStringToObject(pRoot, "FpElectTimer", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB); + cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerCounter); cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf); @@ -294,8 +454,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerLogicClockUser); cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimer); - cJSON_AddStringToObject(pRoot, "FpHeartbeatTimer", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB); + cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerCounter); cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf); @@ -327,143 +487,6 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { return serialized; } -int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { - SEpSet epSet; - syncUtilraftId2EpSet(destRaftId, &epSet); - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); - return 0; -} - -int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { - SEpSet epSet; - syncUtilnodeInfo2EpSet(nodeInfo, &epSet); - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); - return 0; -} - -int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { - sTrace("syncNodePing pSyncNode:%p ", pSyncNode); - int32_t ret = 0; - - SRpcMsg rpcMsg; - syncPing2RpcMsg(pMsg, &rpcMsg); - syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); - - { - cJSON* pJson = syncPing2Json(pMsg); - char* serialized = cJSON_Print(pJson); - sTrace("syncNodePing pMsg:%s ", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - { - SyncPing* pMsg2 = rpcMsg.pCont; - cJSON* pJson = syncPing2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - sTrace("syncNodePing rpcMsg.pCont:%s ", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - return ret; -} - -int32_t syncNodePingAll(SSyncNode* pSyncNode) { - sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode); - int32_t ret = 0; - for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) { - SRaftId destId; - syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId); - SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId); - ret = syncNodePing(pSyncNode, &destId, pMsg); - assert(ret == 0); - syncPingDestroy(pMsg); - } -} - -int32_t syncNodePingPeers(SSyncNode* pSyncNode) { - int32_t ret = 0; - for (int i = 0; i < pSyncNode->peersNum; ++i) { - SRaftId destId; - syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &destId); - SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId); - ret = syncNodePing(pSyncNode, &destId, pMsg); - assert(ret == 0); - syncPingDestroy(pMsg); - } -} - -int32_t syncNodePingSelf(SSyncNode* pSyncNode) { - int32_t ret; - SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId); - ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg); - assert(ret == 0); - syncPingDestroy(pMsg); -} - -int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { - atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); - pSyncNode->pingTimerMS = PING_TIMER_MS; - if (pSyncNode->pPingTimer == NULL) { - pSyncNode->pPingTimer = - taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager); - } else { - taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pPingTimer); - } - return 0; -} - -int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { - atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1); - pSyncNode->pingTimerMS = TIMER_MAX_MS; - return 0; -} - -int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { - pSyncNode->electTimerMS = ms; - atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); - if (pSyncNode->pElectTimer == NULL) { - pSyncNode->pElectTimer = - taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager); - } else { - taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pElectTimer); - } - return 0; -} - -int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { - atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1); - pSyncNode->electTimerMS = TIMER_MAX_MS; - return 0; -} - -int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) { - syncNodeStopElectTimer(pSyncNode); - syncNodeStartElectTimer(pSyncNode, ms); - return 0; -} - -int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { - atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); - if (pSyncNode->pHeartbeatTimer == NULL) { - pSyncNode->pHeartbeatTimer = - taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager); - } else { - taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pHeartbeatTimer); - } - return 0; -} - -int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { - atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1); - pSyncNode->heartbeatTimerMS = TIMER_MAX_MS; - return 0; -} - // for debug -------------- void syncNodePrint(SSyncNode* pObj) { char* serialized = syncNode2Str(pObj); @@ -492,61 +515,23 @@ void syncNodeLog2(char* s, SSyncNode* pObj) { } // ------ local funciton --------- -static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { - int32_t ret = 0; - sTrace("<-- syncNodeOnPingCb -->"); - - { - cJSON* pJson = syncPing2Json(pMsg); - char* serialized = cJSON_Print(pJson); - sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId); - SRpcMsg rpcMsg; - syncPingReply2RpcMsg(pMsgReply, &rpcMsg); - syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); - - return ret; -} - -static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { - int32_t ret = 0; - sTrace("<-- syncNodeOnPingReplyCb -->"); - - { - cJSON* pJson = syncPingReply2Json(pMsg); - char* serialized = cJSON_Print(pJson); - sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - return ret; -} - +// enqueue message ---- static void syncNodeEqPingTimer(void* param, void* tmrId) { - sTrace("<-- syncNodeEqPingTimer -->"); - SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) { SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), pSyncNode->pingTimerMS, pSyncNode); SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); + syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); syncTimeoutDestroy(pSyncMsg); - // reset timer ms - // pSyncNode->pingTimerMS += 100; - taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { - sTrace("syncNodeEqPingTimer: pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", pSyncNode->pingTimerLogicClock, - pSyncNode->pingTimerLogicClockUser); + sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", + pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); } } @@ -555,19 +540,18 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) { SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock), pSyncNode->electTimerMS, pSyncNode); - - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); + syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); syncTimeoutDestroy(pSyncMsg); // reset timer ms pSyncNode->electTimerMS = syncUtilElectRandomMS(); - taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { - sTrace("syncNodeEqElectTimer: electTimerLogicClock:%lu, electTimerLogicClockUser:%lu", + sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%lu, electTimerLogicClockUser:%lu", pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); } } @@ -579,23 +563,39 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock), pSyncNode->heartbeatTimerMS, pSyncNode); - SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); + syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); syncTimeoutDestroy(pSyncMsg); - // reset timer ms - // pSyncNode->heartbeatTimerMS += 100; - taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer); } else { - sTrace("syncNodeEqHeartbeatTimer: heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu", + sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu", pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); } } +// on message ---- +static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { + int32_t ret = 0; + syncPingLog2("==syncNodeOnPingCb==", pMsg); + SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pMsgReply, &rpcMsg); + syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + + return ret; +} + +static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { + int32_t ret = 0; + syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg); + return ret; +} + +// raft algorithm ---- static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { if (term > pSyncNode->pRaftStore->currentTerm) { pSyncNode->pRaftStore->currentTerm = term;