sync index

This commit is contained in:
Minghao Li 2022-03-14 16:27:25 +08:00
parent 1a78917767
commit 7d897369d5
2 changed files with 220 additions and 216 deletions

View File

@ -67,9 +67,6 @@ extern "C" {
} \ } \
} }
struct SRaft;
typedef struct SRaft SRaft;
struct SyncTimeout; struct SyncTimeout;
typedef struct SyncTimeout SyncTimeout; typedef struct SyncTimeout SyncTimeout;
@ -117,8 +114,10 @@ typedef struct SSyncNode {
SSyncCfg syncCfg; SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN]; char path[TSDB_FILENAME_LEN];
char raftStorePath[TSDB_FILENAME_LEN * 2]; char raftStorePath[TSDB_FILENAME_LEN * 2];
SWal* pWal;
void* rpcClient; // sync io
SWal* pWal;
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
void* queue; void* queue;
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
@ -164,7 +163,7 @@ typedef struct SSyncNode {
int32_t pingTimerMS; int32_t pingTimerMS;
uint64_t pingTimerLogicClock; uint64_t pingTimerLogicClock;
uint64_t pingTimerLogicClockUser; uint64_t pingTimerLogicClockUser;
TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp TAOS_TMR_CALLBACK FpPingTimerCB; // Timer Fp
uint64_t pingTimerCounter; uint64_t pingTimerCounter;
// elect timer // elect timer
@ -172,7 +171,7 @@ typedef struct SSyncNode {
int32_t electTimerMS; int32_t electTimerMS;
uint64_t electTimerLogicClock; uint64_t electTimerLogicClock;
uint64_t electTimerLogicClockUser; uint64_t electTimerLogicClockUser;
TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp
uint64_t electTimerCounter; uint64_t electTimerCounter;
// heartbeat timer // heartbeat timer
@ -180,7 +179,7 @@ typedef struct SSyncNode {
int32_t heartbeatTimerMS; int32_t heartbeatTimerMS;
uint64_t heartbeatTimerLogicClock; uint64_t heartbeatTimerLogicClock;
uint64_t heartbeatTimerLogicClockUser; uint64_t heartbeatTimerLogicClockUser;
TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp TAOS_TMR_CALLBACK FpHeartbeatTimerCB; // Timer Fp
uint64_t heartbeatTimerCounter; uint64_t heartbeatTimerCounter;
// callback // callback
@ -194,16 +193,17 @@ typedef struct SSyncNode {
} SSyncNode; } SSyncNode;
// open/close --------------
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void syncNodeClose(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode);
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); // ping --------------
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
int32_t syncNodePingAll(SSyncNode* pSyncNode);
int32_t syncNodePingPeers(SSyncNode* pSyncNode);
int32_t syncNodePingSelf(SSyncNode* pSyncNode); int32_t syncNodePingSelf(SSyncNode* pSyncNode);
int32_t syncNodePingPeers(SSyncNode* pSyncNode);
int32_t syncNodePingAll(SSyncNode* pSyncNode);
// timer control --------------
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
@ -211,6 +211,10 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
cJSON* syncNode2Json(const SSyncNode* pSyncNode); cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode);

View File

@ -31,13 +31,16 @@
static int32_t tsNodeRefId = -1; static int32_t tsNodeRefId = -1;
// ------ local funciton --------- // ------ local funciton ---------
// enqueue message ----
static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
// on message ----
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
// raft algorithm ----
static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term); static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
static void syncNodeBecomeFollower(SSyncNode* pSyncNode); static void syncNodeBecomeFollower(SSyncNode* pSyncNode);
static void syncNodeBecomeLeader(SSyncNode* pSyncNode); static void syncNodeBecomeLeader(SSyncNode* pSyncNode);
@ -48,16 +51,20 @@ static void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
// --------------------------------- // ---------------------------------
int32_t syncInit() { int32_t syncInit() {
sTrace("syncInit ok"); int32_t ret = syncEnvStart();
return 0; return ret;
} }
void syncCleanUp() { sTrace("syncCleanUp ok"); } void syncCleanUp() {
int32_t ret = syncEnvStop();
assert(ret == 0);
}
int64_t syncStart(const SSyncInfo* pSyncInfo) { int64_t syncStart(const SSyncInfo* pSyncInfo) {
int32_t ret = 0;
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
return 0; return ret;
} }
void syncStop(int64_t rid) { void syncStop(int64_t rid) {
@ -65,9 +72,13 @@ void syncStop(int64_t rid) {
syncNodeClose(pSyncNode); syncNodeClose(pSyncNode);
} }
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; } int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
int32_t ret = 0;
return ret;
}
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0;
SSyncNode* pSyncNode = NULL; // get pointer from rid SSyncNode* pSyncNode = NULL; // get pointer from rid
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak); SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak);
@ -75,11 +86,13 @@ int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
ret = 0;
} else { } else {
sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state)); sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state));
return -1; // need define err code !! ret = -1; // need define err code !!
} }
return 0; return ret;
} }
ESyncState syncGetMyRole(int64_t rid) { ESyncState syncGetMyRole(int64_t rid) {
@ -89,6 +102,7 @@ ESyncState syncGetMyRole(int64_t rid) {
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}
// open/close --------------
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
@ -162,7 +176,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->pingTimerMS = PING_TIMER_MS; pSyncNode->pingTimerMS = PING_TIMER_MS;
atomic_store_64(&pSyncNode->pingTimerLogicClock, 0); atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0); atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
pSyncNode->FpPingTimer = syncNodeEqPingTimer; pSyncNode->FpPingTimerCB = syncNodeEqPingTimer;
pSyncNode->pingTimerCounter = 0; pSyncNode->pingTimerCounter = 0;
// init elect timer // init elect timer
@ -170,7 +184,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->electTimerMS = syncUtilElectRandomMS(); pSyncNode->electTimerMS = syncUtilElectRandomMS();
atomic_store_64(&pSyncNode->electTimerLogicClock, 0); atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0); atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
pSyncNode->FpElectTimer = syncNodeEqElectTimer; pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
pSyncNode->electTimerCounter = 0; pSyncNode->electTimerCounter = 0;
// init heartbeat timer // init heartbeat timer
@ -178,7 +192,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS; pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS;
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0); atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
pSyncNode->FpHeartbeatTimer = syncNodeEqHeartbeatTimer; pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
pSyncNode->heartbeatTimerCounter = 0; pSyncNode->heartbeatTimerCounter = 0;
// init callback // init callback
@ -194,10 +208,146 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
} }
void syncNodeClose(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) {
int32_t ret;
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
ret = raftStoreClose(pSyncNode->pRaftStore);
assert(ret == 0);
voteGrantedDestroy(pSyncNode->pVotesGranted);
votesRespondDestory(pSyncNode->pVotesRespond);
syncIndexMgrDestroy(pSyncNode->pNextIndex);
syncIndexMgrDestroy(pSyncNode->pMatchIndex);
logStoreDestory(pSyncNode->pLogStore);
syncNodeStopPingTimer(pSyncNode);
syncNodeStopElectTimer(pSyncNode);
syncNodeStopHeartbeatTimer(pSyncNode);
free(pSyncNode); free(pSyncNode);
} }
// ping --------------
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
syncPingLog2((char*)"==syncNodePing==", pMsg);
int32_t ret = 0;
SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodePing==", &rpcMsg);
ret = syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
return ret;
}
int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
int32_t ret = 0;
SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId);
ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
assert(ret == 0);
syncPingDestroy(pMsg);
return ret;
}
int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId destId;
syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &destId);
SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId);
ret = syncNodePing(pSyncNode, &destId, pMsg);
assert(ret == 0);
syncPingDestroy(pMsg);
}
return ret;
}
int32_t syncNodePingAll(SSyncNode* pSyncNode) {
int32_t ret = 0;
for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
SRaftId destId;
syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId);
SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId);
ret = syncNodePing(pSyncNode, &destId, pMsg);
assert(ret == 0);
syncPingDestroy(pMsg);
}
return ret;
}
// timer control --------------
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
return ret;
}
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
taosTmrStop(pSyncNode->pPingTimer);
pSyncNode->pPingTimer = NULL;
return ret;
}
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
int32_t ret = 0;
pSyncNode->electTimerMS = ms;
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pElectTimer);
atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
return ret;
}
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);
taosTmrStop(pSyncNode->pElectTimer);
pSyncNode->pElectTimer = NULL;
return ret;
}
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
int32_t ret = 0;
syncNodeStopElectTimer(pSyncNode);
syncNodeStartElectTimer(pSyncNode, ms);
return ret;
}
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pHeartbeatTimer);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
return ret;
}
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
taosTmrStop(pSyncNode->pHeartbeatTimer);
pSyncNode->pHeartbeatTimer = NULL;
return ret;
}
// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
cJSON* syncNode2Json(const SSyncNode* pSyncNode) { cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
char u64buf[128]; char u64buf[128];
cJSON* pRoot = cJSON_CreateObject(); cJSON* pRoot = cJSON_CreateObject();
@ -253,12 +403,22 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
// tla+ server vars // tla+ server vars
cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state); cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state)); cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
char tmpBuf[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pSyncNode->pRaftStore, tmpBuf, sizeof(tmpBuf));
cJSON_AddStringToObject(pRoot, "pRaftStore", tmpBuf);
// tla+ candidate vars // tla+ candidate vars
cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted));
cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond));
// tla+ leader vars // tla+ leader vars
cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex));
cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex));
// tla+ log vars // tla+ log vars
cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore));
snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->commitIndex);
cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
// ping timer // ping timer
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer);
@ -268,8 +428,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf); cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerLogicClockUser); snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerLogicClockUser);
cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf); cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimer); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB);
cJSON_AddStringToObject(pRoot, "FpPingTimer", u64buf); cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerCounter); snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerCounter);
cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf); cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf);
@ -281,8 +441,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf); cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerLogicClockUser); snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerLogicClockUser);
cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf); cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimer); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
cJSON_AddStringToObject(pRoot, "FpElectTimer", u64buf); cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerCounter); snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerCounter);
cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf); cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf);
@ -294,8 +454,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf); cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerLogicClockUser); snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerLogicClockUser);
cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf); cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimer); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB);
cJSON_AddStringToObject(pRoot, "FpHeartbeatTimer", u64buf); cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerCounter); snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerCounter);
cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf); cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);
@ -327,143 +487,6 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
return serialized; return serialized;
} }
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
return 0;
}
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
sTrace("syncNodePing pSyncNode:%p ", pSyncNode);
int32_t ret = 0;
SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
{
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodePing pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
{
SyncPing* pMsg2 = rpcMsg.pCont;
cJSON* pJson = syncPing2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodePing rpcMsg.pCont:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
return ret;
}
int32_t syncNodePingAll(SSyncNode* pSyncNode) {
sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode);
int32_t ret = 0;
for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
SRaftId destId;
syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId);
SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId);
ret = syncNodePing(pSyncNode, &destId, pMsg);
assert(ret == 0);
syncPingDestroy(pMsg);
}
}
int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId destId;
syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &destId);
SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId);
ret = syncNodePing(pSyncNode, &destId, pMsg);
assert(ret == 0);
syncPingDestroy(pMsg);
}
}
int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
int32_t ret;
SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId);
ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
assert(ret == 0);
syncPingDestroy(pMsg);
}
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
pSyncNode->pingTimerMS = PING_TIMER_MS;
if (pSyncNode->pPingTimer == NULL) {
pSyncNode->pPingTimer =
taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
} else {
taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
}
return 0;
}
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
pSyncNode->pingTimerMS = TIMER_MAX_MS;
return 0;
}
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
pSyncNode->electTimerMS = ms;
atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
if (pSyncNode->pElectTimer == NULL) {
pSyncNode->pElectTimer =
taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager);
} else {
taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pElectTimer);
}
return 0;
}
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);
pSyncNode->electTimerMS = TIMER_MAX_MS;
return 0;
}
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
syncNodeStopElectTimer(pSyncNode);
syncNodeStartElectTimer(pSyncNode, ms);
return 0;
}
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
if (pSyncNode->pHeartbeatTimer == NULL) {
pSyncNode->pHeartbeatTimer =
taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager);
} else {
taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pHeartbeatTimer);
}
return 0;
}
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
pSyncNode->heartbeatTimerMS = TIMER_MAX_MS;
return 0;
}
// for debug -------------- // for debug --------------
void syncNodePrint(SSyncNode* pObj) { void syncNodePrint(SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj); char* serialized = syncNode2Str(pObj);
@ -492,61 +515,23 @@ void syncNodeLog2(char* s, SSyncNode* pObj) {
} }
// ------ local funciton --------- // ------ local funciton ---------
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { // enqueue message ----
int32_t ret = 0;
sTrace("<-- syncNodeOnPingCb -->");
{
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId);
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
return ret;
}
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
int32_t ret = 0;
sTrace("<-- syncNodeOnPingReplyCb -->");
{
cJSON* pJson = syncPingReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
return ret;
}
static void syncNodeEqPingTimer(void* param, void* tmrId) { static void syncNodeEqPingTimer(void* param, void* tmrId) {
sTrace("<-- syncNodeEqPingTimer -->");
SSyncNode* pSyncNode = (SSyncNode*)param; SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) { if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock),
pSyncNode->pingTimerMS, pSyncNode); pSyncNode->pingTimerMS, pSyncNode);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
// reset timer ms
// pSyncNode->pingTimerMS += 100;
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer); &pSyncNode->pPingTimer);
} else { } else {
sTrace("syncNodeEqPingTimer: pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", pSyncNode->pingTimerLogicClock, sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu",
pSyncNode->pingTimerLogicClockUser); pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
} }
} }
@ -555,19 +540,18 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) { if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) {
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock), SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
pSyncNode->electTimerMS, pSyncNode); pSyncNode->electTimerMS, pSyncNode);
SRpcMsg rpcMsg;
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
// reset timer ms // reset timer ms
pSyncNode->electTimerMS = syncUtilElectRandomMS(); pSyncNode->electTimerMS = syncUtilElectRandomMS();
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer); &pSyncNode->pPingTimer);
} else { } else {
sTrace("syncNodeEqElectTimer: electTimerLogicClock:%lu, electTimerLogicClockUser:%lu", sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%lu, electTimerLogicClockUser:%lu",
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
} }
} }
@ -579,23 +563,39 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
SyncTimeout* pSyncMsg = SyncTimeout* pSyncMsg =
syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock), syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
pSyncNode->heartbeatTimerMS, pSyncNode); pSyncNode->heartbeatTimerMS, pSyncNode);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
// reset timer ms
// pSyncNode->heartbeatTimerMS += 100;
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pHeartbeatTimer); &pSyncNode->pHeartbeatTimer);
} else { } else {
sTrace("syncNodeEqHeartbeatTimer: heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu", sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu",
pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
} }
} }
// on message ----
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
int32_t ret = 0;
syncPingLog2("==syncNodeOnPingCb==", pMsg);
SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId);
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
return ret;
}
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
int32_t ret = 0;
syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg);
return ret;
}
// raft algorithm ----
static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) { if (term > pSyncNode->pRaftStore->currentTerm) {
pSyncNode->pRaftStore->currentTerm = term; pSyncNode->pRaftStore->currentTerm = term;