From 4ddd25a29ca2c69ea5b7ce04b4975062899050be Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 7 Nov 2022 20:31:26 +0800 Subject: [PATCH 1/4] enh: adjust sync propose --- source/libs/sync/inc/syncTools.h | 5 ++--- source/libs/sync/src/syncMain.c | 19 +++++++------------ source/libs/sync/src/syncMessage.c | 10 ++++------ source/libs/sync/test/syncApplyMsgTest.cpp | 2 +- .../sync/test/syncClientRequestBatchTest.cpp | 2 +- .../libs/sync/test/syncClientRequestTest.cpp | 8 ++++---- source/libs/sync/test/syncEncodeTest.cpp | 4 ++-- source/libs/sync/test/syncEntryTest.cpp | 4 ++-- source/libs/sync/test/syncRpcMsgTest.cpp | 4 ++-- source/libs/sync/test/syncSnapshotTest.cpp | 4 ++-- source/libs/sync/test/syncWriteTest.cpp | 4 ++-- 11 files changed, 29 insertions(+), 37 deletions(-) diff --git a/source/libs/sync/inc/syncTools.h b/source/libs/sync/inc/syncTools.h index f7a9c404c1..932432d1f8 100644 --- a/source/libs/sync/inc/syncTools.h +++ b/source/libs/sync/inc/syncTools.h @@ -182,9 +182,8 @@ typedef struct SyncClientRequest { char data[]; // origin RpcMsg.pCont } SyncClientRequest; -SyncClientRequest* syncClientRequestBuild(uint32_t dataLen); -SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, - int32_t vgId); // step 1 +SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen); +SyncClientRequest* syncClientRequestBuild(const SRpcMsg* pMsg, uint64_t seqNum, bool isWeak, int32_t vgId); // step 1 void syncClientRequestDestroy(SyncClientRequest* pMsg); void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen); void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e9facfb381..cb1b19ca5e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -669,13 +669,11 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { goto _END; } - SRespStub stub; - stub.createTime = taosGetTimestampMs(); - stub.rpcMsg = *pMsg; - uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); + SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg}; + uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); - SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId); - SRpcMsg rpcMsg; + SyncClientRequest* pSyncMsg = syncClientRequestBuild(pMsg, seqNum, isWeak, pSyncNode->vgId); + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); // optimized one replica @@ -696,12 +694,9 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType)); } - } else { - if (pSyncNode->syncEqMsg != NULL && (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { - ret = 0; - } else { - ret = -1; + ret = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg); + if (ret != 0) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId); } @@ -2322,7 +2317,7 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { uint32_t entryLen; char* serialized = syncEntrySerialize(pEntry, &entryLen); - SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen); + SyncClientRequest* pSyncMsg = syncClientRequestAlloc(entryLen); ASSERT(pSyncMsg->dataLen == entryLen); memcpy(pSyncMsg->data, serialized, entryLen); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index f4e9d83503..e052a7a4fa 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -831,10 +831,9 @@ void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) { } // ---- message process SyncClientRequest---- -SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { +SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) { uint32_t bytes = sizeof(SyncClientRequest) + dataLen; - SyncClientRequest* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); + SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes); pMsg->bytes = bytes; pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; pMsg->seqNum = 0; @@ -844,8 +843,8 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { } // step 1. original SRpcMsg => SyncClientRequest, add seqNum, isWeak -SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, int32_t vgId) { - SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen); +SyncClientRequest* syncClientRequestBuild(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak, int32_t vgId) { + SyncClientRequest* pMsg = syncClientRequestAlloc(pOriginalRpcMsg->contLen); pMsg->vgId = vgId; pMsg->originalRpcType = pOriginalRpcMsg->msgType; pMsg->seqNum = seqNum; @@ -891,7 +890,6 @@ SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len) // step 2. SyncClientRequest => RpcMsg, to queue void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); diff --git a/source/libs/sync/test/syncApplyMsgTest.cpp b/source/libs/sync/test/syncApplyMsgTest.cpp index 9d5e7dd8e1..ce129015c6 100644 --- a/source/libs/sync/test/syncApplyMsgTest.cpp +++ b/source/libs/sync/test/syncApplyMsgTest.cpp @@ -81,7 +81,7 @@ void test4() { void test5() { SyncApplyMsg *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncApplyMsg2RpcMsg(pMsg, &rpcMsg); SyncApplyMsg *pMsg2 = syncApplyMsgFromRpcMsg2(&rpcMsg); syncApplyMsgLog2((char *)"test5: syncClientRequest2RpcMsg -> syncApplyMsgFromRpcMsg2 ", pMsg2); diff --git a/source/libs/sync/test/syncClientRequestBatchTest.cpp b/source/libs/sync/test/syncClientRequestBatchTest.cpp index 5586b7a6ce..f07ee08b2b 100644 --- a/source/libs/sync/test/syncClientRequestBatchTest.cpp +++ b/source/libs/sync/test/syncClientRequestBatchTest.cpp @@ -59,7 +59,7 @@ void test2() { uint32_t len = pMsg->bytes; char * serialized = (char *)taosMemoryMalloc(len); syncClientRequestSerialize(pMsg, serialized, len); - SyncClientRequest *pMsg2 = syncClientRequestBuild(pMsg->dataLen); + SyncClientRequest *pMsg2 = syncClientRequestAlloc(pMsg->dataLen); syncClientRequestDeserialize(serialized, len, pMsg2); syncClientRequestLog2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2); diff --git a/source/libs/sync/test/syncClientRequestTest.cpp b/source/libs/sync/test/syncClientRequestTest.cpp index 56e53cc1c9..b6bfcc2da5 100644 --- a/source/libs/sync/test/syncClientRequestTest.cpp +++ b/source/libs/sync/test/syncClientRequestTest.cpp @@ -21,7 +21,7 @@ SyncClientRequest *createMsg() { rpcMsg.contLen = 20; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); strcpy((char *)rpcMsg.pCont, "hello rpc"); - SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true, 1000); + SyncClientRequest *pMsg = syncClientRequestBuild(&rpcMsg, 123, true, 1000); rpcFreeCont(rpcMsg.pCont); return pMsg; } @@ -37,7 +37,7 @@ void test2() { uint32_t len = pMsg->bytes; char *serialized = (char *)taosMemoryMalloc(len); syncClientRequestSerialize(pMsg, serialized, len); - SyncClientRequest *pMsg2 = syncClientRequestBuild(pMsg->dataLen); + SyncClientRequest *pMsg2 = syncClientRequestAlloc(pMsg->dataLen); syncClientRequestDeserialize(serialized, len, pMsg2); syncClientRequestLog2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2); @@ -60,7 +60,7 @@ void test3() { void test4() { SyncClientRequest *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pMsg, &rpcMsg); SyncClientRequest *pMsg2 = (SyncClientRequest *)taosMemoryMalloc(rpcMsg.contLen); syncClientRequestFromRpcMsg(&rpcMsg, pMsg2); @@ -73,7 +73,7 @@ void test4() { void test5() { SyncClientRequest *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pMsg, &rpcMsg); SyncClientRequest *pMsg2 = syncClientRequestFromRpcMsg2(&rpcMsg); syncClientRequestLog2((char *)"test5: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg2 ", pMsg2); diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index 092442bab1..c60176fbf8 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -102,12 +102,12 @@ SRpcMsg *step0() { } SyncClientRequest *step1(const SRpcMsg *pMsg) { - SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000); + SyncClientRequest *pRetMsg = syncClientRequestBuild(pMsg, 123, true, 1000); return pRetMsg; } SRpcMsg *step2(const SyncClientRequest *pMsg) { - SRpcMsg *pRetMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg)); + SRpcMsg *pRetMsg = (SRpcMsg *)taosMemoryCalloc(sizeof(SRpcMsg), 1); syncClientRequest2RpcMsg(pMsg, pRetMsg); return pRetMsg; } diff --git a/source/libs/sync/test/syncEntryTest.cpp b/source/libs/sync/test/syncEntryTest.cpp index 53ae91fc60..b274408c01 100644 --- a/source/libs/sync/test/syncEntryTest.cpp +++ b/source/libs/sync/test/syncEntryTest.cpp @@ -32,7 +32,7 @@ void test1() { } void test2() { - SyncClientRequest* pSyncMsg = syncClientRequestBuild(10); + SyncClientRequest* pSyncMsg = syncClientRequestAlloc(10); pSyncMsg->originalRpcType = 33; pSyncMsg->seqNum = 11; pSyncMsg->isWeak = 1; @@ -46,7 +46,7 @@ void test2() { } void test3() { - SyncClientRequest* pSyncMsg = syncClientRequestBuild(10); + SyncClientRequest* pSyncMsg = syncClientRequestAlloc(10); pSyncMsg->originalRpcType = 33; pSyncMsg->seqNum = 11; pSyncMsg->isWeak = 1; diff --git a/source/libs/sync/test/syncRpcMsgTest.cpp b/source/libs/sync/test/syncRpcMsgTest.cpp index 0b21f41080..941fa7eab5 100644 --- a/source/libs/sync/test/syncRpcMsgTest.cpp +++ b/source/libs/sync/test/syncRpcMsgTest.cpp @@ -47,7 +47,7 @@ SyncClientRequest *createSyncClientRequest() { rpcMsg.contLen = 20; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); strcpy((char *)rpcMsg.pCont, "hello rpc"); - SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true, 1000); + SyncClientRequest *pMsg = syncClientRequestBuild(&rpcMsg, 123, true, 1000); return pMsg; } @@ -156,7 +156,7 @@ void test7() { void test8() { SyncClientRequest *pMsg = createSyncClientRequest(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pMsg, &rpcMsg); syncRpcMsgLog2((char *)"test8", &rpcMsg); syncClientRequestDestroy(pMsg); diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 5b5c902a01..a1cedd624a 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -162,7 +162,7 @@ SRpcMsg *step0() { } SyncClientRequest *step1(const SRpcMsg *pMsg) { - SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000); + SyncClientRequest *pRetMsg = syncClientRequestBuild(pMsg, 123, true, 1000); return pRetMsg; } @@ -206,7 +206,7 @@ int main(int argc, char **argv) { for (int i = 0; i < 10; ++i) { SyncClientRequest *pSyncClientRequest = pMsg1; - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg); diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index 570b614f3b..7c5334f668 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -140,7 +140,7 @@ SRpcMsg *step0() { } SyncClientRequest *step1(const SRpcMsg *pMsg) { - SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true, 1000); + SyncClientRequest *pRetMsg = syncClientRequestBuild(pMsg, 123, true, 1000); return pRetMsg; } @@ -181,7 +181,7 @@ int main(int argc, char **argv) { for (int i = 0; i < 10; ++i) { SyncClientRequest *pSyncClientRequest = pMsg1; - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg); From 848fd58402e2db8a2bb62a3c3c7e0236e935e476 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 7 Nov 2022 21:12:43 +0800 Subject: [PATCH 2/4] enh: adjust sync propose --- source/libs/sync/src/syncMain.c | 89 ++++++++++++++---------------- source/libs/sync/src/syncMessage.c | 2 - source/libs/sync/src/syncUtil.c | 24 +------- 3 files changed, 44 insertions(+), 71 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index cb1b19ca5e..22a3ff539a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -656,64 +656,57 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { } int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { - int32_t ret = 0; - sNTrace(pSyncNode, "propose message, type:%s", TMSG_INFO(pMsg->msgType)); + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + terrno = TSDB_CODE_SYN_NOT_LEADER; + sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType)); + return -1; + } - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - // not restored, vnode enable - if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) { + // not restored, vnode enable + if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) { + terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; + sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64, + TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex); + return -1; + } + + int32_t ret = 0; + SyncClientRequest* pSyncMsg; + + // optimized one replica + if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) { + pSyncMsg = syncClientRequestBuild(pMsg, 0, isWeak, pSyncNode->vgId); + + SyncIndex retIndex; + int32_t code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, &retIndex); + if (code == 0) { + pMsg->info.conn.applyIndex = retIndex; + pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm; + ret = 1; + sTrace("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType)); + } else { ret = -1; - terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; - sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64, - pSyncNode->vgId, TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex); - goto _END; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, + TMSG_INFO(pMsg->msgType)); } - + } else { SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg}; uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); - SyncClientRequest* pSyncMsg = syncClientRequestBuild(pMsg, seqNum, isWeak, pSyncNode->vgId); - SRpcMsg rpcMsg = {0}; + pSyncMsg = syncClientRequestBuild(pMsg, seqNum, isWeak, pSyncNode->vgId); + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); - // optimized one replica - if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) { - SyncIndex retIndex; - int32_t code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, &retIndex); - if (code == 0) { - pMsg->info.conn.applyIndex = retIndex; - pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm; - rpcFreeCont(rpcMsg.pCont); - syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); - ret = 1; - sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, - TMSG_INFO(pMsg->msgType)); - } else { - ret = -1; - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, - TMSG_INFO(pMsg->msgType)); - } - } else { - ret = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg); - if (ret != 0) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId); - } + sNTrace(pSyncNode, "propose message, type:%s", TMSG_INFO(pMsg->msgType)); + ret = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg); + if (ret != 0) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + sError("vgId:%d, failed to enqueue msg since %s", pSyncNode->vgId, terrstr()); } - - syncClientRequestDestroy(pSyncMsg); - goto _END; - - } else { - ret = -1; - terrno = TSDB_CODE_SYN_NOT_LEADER; - sError("vgId:%d, sync propose not leader, %s, type:%s", pSyncNode->vgId, syncStr(pSyncNode->state), - TMSG_INFO(pMsg->msgType)); - goto _END; } -_END: + syncClientRequestDestroy(pSyncMsg); return ret; } @@ -2522,7 +2515,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); SyncTerm term = ths->pRaftStore->currentTerm; - SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index); + SSyncRaftEntry* pEntry = syncEntryBuild2(pMsg, term, index); ASSERT(pEntry != NULL); LRUHandle* h = NULL; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index e052a7a4fa..6453693576 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -836,8 +836,6 @@ SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen) { SyncClientRequest* pMsg = taosMemoryCalloc(1, bytes); pMsg->bytes = bytes; pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; - pMsg->seqNum = 0; - pMsg->isWeak = false; pMsg->dataLen = dataLen; return pMsg; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 31c649b2f3..4f6f5bdc33 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -178,29 +178,11 @@ void syncUtilMsgNtoH(void* msg) { pHead->vgId = ntohl(pHead->vgId); } -bool syncUtilUserPreCommit(tmsg_t msgType) { - if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER) { - return true; - } +bool syncUtilUserPreCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; } - return false; -} +bool syncUtilUserCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; } -bool syncUtilUserCommit(tmsg_t msgType) { - if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER) { - return true; - } - - return false; -} - -bool syncUtilUserRollback(tmsg_t msgType) { - if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER) { - return true; - } - - return false; -} +bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; } void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) { int32_t len = snprintf(buf, bufLen, "{r-num:%d, my:%d, ", pCfg->replicaNum, pCfg->myIndex); From 9914657cb031d4d3fc5a783ea844ffdcf1a51d20 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 7 Nov 2022 23:31:01 +0800 Subject: [PATCH 3/4] refactor: adjust sync hb --- source/libs/sync/inc/syncInt.h | 4 -- source/libs/sync/src/syncMain.c | 97 +++++++++++++++--------------- source/libs/sync/src/syncUtil.c | 6 +- source/libs/sync/src/syncVoteMgr.c | 24 ++++---- 4 files changed, 65 insertions(+), 66 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 6b6f14da00..3085b6d8f4 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -111,10 +111,6 @@ typedef struct SElectTimer { void* pData; } SElectTimer; -int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId); -int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); -int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); - typedef struct SPeerState { SyncIndex lastSendIndex; int64_t lastSendTime; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 22a3ff539a..3d68f294e7 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -42,6 +42,9 @@ static int32_t syncNodeEqNoop(SSyncNode* ths); static int32_t syncNodeAppendNoop(SSyncNode* ths); static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId); static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg); +static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId); +static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); +static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); int64_t syncOpen(SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); @@ -512,7 +515,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1); SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0]; - for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) { + for (int32_t i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) { if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex && (pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotIndex) { lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i]; @@ -531,7 +534,7 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1); SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0]; - for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) { + for (int32_t i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) { if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex && (pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotLastApplyIndex) { lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i]; @@ -600,7 +603,7 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { } ASSERT(rid == pSyncNode->rid); pEpSet->numOfEps = 0; - for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn); pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort; (pEpSet->numOfEps)++; @@ -621,7 +624,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { } pEpSet->numOfEps = 0; - for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn); pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort; (pEpSet->numOfEps)++; @@ -710,7 +713,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { return ret; } -int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) { +static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId) { pSyncTimer->pTimer = NULL; pSyncTimer->counter = 0; pSyncTimer->timerMS = pSyncNode->hbBaseLine; @@ -720,7 +723,7 @@ int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId de return 0; } -int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { +static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t ret = 0; if (syncIsInit()) { SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData)); @@ -737,7 +740,7 @@ int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { return ret; } -int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { +static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t ret = 0; atomic_add_fetch_64(&pSyncTimer->logicClock, 1); taosTmrStop(pSyncTimer->pTimer); @@ -837,14 +840,14 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { // init peersNum, peers, peersId pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1; - int j = 0; - for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + int32_t j = 0; + for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { if (i != pSyncNode->pRaftCfg->cfg.myIndex) { pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i]; j++; } } - for (int i = 0; i < pSyncNode->peersNum; ++i) { + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) { sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i); goto _error; @@ -853,7 +856,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { // init replicaNum, replicasId pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; - for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { if (!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) { sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i); goto _error; @@ -1002,7 +1005,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { pSyncNode->restoreFinish = false; // snapshot senders - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i); // ASSERT(pSender != NULL); (pSyncNode->senders)[i] = pSender; @@ -1133,7 +1136,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode->pFsm); } - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { if ((pSyncNode->senders)[i] != NULL) { snapshotSenderDestroy((pSyncNode->senders)[i]); (pSyncNode->senders)[i] = NULL; @@ -1178,7 +1181,7 @@ int32_t syncNodePingSelf(SSyncNode* pSyncNode) { int32_t syncNodePingPeers(SSyncNode* pSyncNode) { int32_t ret = 0; - for (int i = 0; i < pSyncNode->peersNum; ++i) { + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { SRaftId* destId = &(pSyncNode->peersId[i]); SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId); ret = syncNodePing(pSyncNode, destId, pMsg); @@ -1190,7 +1193,7 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) { int32_t syncNodePingAll(SSyncNode* pSyncNode) { int32_t ret = 0; - for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { SRaftId* destId = &(pSyncNode->replicasId[i]); SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId); ret = syncNodePing(pSyncNode, destId, pMsg); @@ -1294,7 +1297,7 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { ret = syncNodeDoStartHeartbeatTimer(pSyncNode); #endif - for (int i = 0; i < pSyncNode->peersNum; ++i) { + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i])); if (pSyncTimer != NULL) { syncHbTimerStart(pSyncNode, pSyncTimer); @@ -1313,7 +1316,7 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { pSyncNode->pHeartbeatTimer = NULL; #endif - for (int i = 0; i < pSyncNode->peersNum; ++i) { + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i])); if (pSyncTimer != NULL) { syncHbTimerStop(pSyncNode, pSyncTimer); @@ -1396,19 +1399,19 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum); cJSON* pPeers = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers); - for (int i = 0; i < pSyncNode->peersNum; ++i) { + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i])); } cJSON* pPeersId = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "peersId", pPeersId); - for (int i = 0; i < pSyncNode->peersNum; ++i) { + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i])); } cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum); cJSON* pReplicasId = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId); - for (int i = 0; i < pSyncNode->replicaNum; ++i) { + for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i])); } @@ -1505,7 +1508,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { // snapshot senders cJSON* pSenders = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "senders", pSenders); - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { cJSON_AddItemToArray(pSenders, snapshotSender2Json((pSyncNode->senders)[i])); } @@ -1530,8 +1533,8 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { } inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { - int len = 256; - char* s = (char*)taosMemoryMalloc(len); + int32_t len = 256; + char* s = (char*)taosMemoryMalloc(len); SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { @@ -1556,7 +1559,7 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { bool b1 = false; bool b2 = false; - for (int i = 0; i < config->replicaNum; ++i) { + for (int32_t i = 0; i < config->replicaNum; ++i) { if (strcmp((config->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && (config->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { b1 = true; @@ -1564,7 +1567,7 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { } } - for (int i = 0; i < config->replicaNum; ++i) { + for (int32_t i = 0; i < config->replicaNum; ++i) { SRaftId raftId; raftId.addr = syncUtilAddr2U64((config->nodeInfo)[i].nodeFqdn, (config->nodeInfo)[i].nodePort); raftId.vgId = pSyncNode->vgId; @@ -1646,7 +1649,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde SRaftId oldReplicasId[TSDB_MAX_REPLICA]; memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId)); SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { oldSenders[i] = (pSyncNode->senders)[i]; sSTrace(oldSenders[i], "snapshot sender save old"); } @@ -1657,20 +1660,20 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // init peersNum, peers, peersId pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1; - int j = 0; - for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + int32_t j = 0; + for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { if (i != pSyncNode->pRaftCfg->cfg.myIndex) { pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i]; j++; } } - for (int i = 0; i < pSyncNode->peersNum; ++i) { + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); } // init replicaNum, replicasId pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; - for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); } @@ -1685,15 +1688,15 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // reset snapshot senders // clear new - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { (pSyncNode->senders)[i] = NULL; } // reset new - for (int i = 0; i < pSyncNode->replicaNum; ++i) { + for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { // reset sender bool reset = false; - for (int j = 0; j < TSDB_MAX_REPLICA; ++j) { + for (int32_t j = 0; j < TSDB_MAX_REPLICA; ++j) { if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) { char host[128]; uint16_t port; @@ -1716,7 +1719,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde } // create new - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { if ((pSyncNode->senders)[i] == NULL) { (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); sSTrace((pSyncNode->senders)[i], "snapshot sender create new"); @@ -1724,7 +1727,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde } // free old - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { if (oldSenders[i] != NULL) { snapshotSenderDestroy(oldSenders[i]); sNTrace(pSyncNode, "snapshot sender delete old %p replica-index:%d", oldSenders[i], i); @@ -1865,7 +1868,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // set leader cache pSyncNode->leaderCache = pSyncNode->myRaftId; - for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) { + for (int32_t i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) { // maybe overwrite myself, no harm // just do it! @@ -1879,7 +1882,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->pNextIndex->index[i] = lastIndex + 1; } - for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) { + for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) { // maybe overwrite myself, no harm // just do it! pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID; @@ -1892,7 +1895,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // update sender private term SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId)); if (pMySender != NULL) { - for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) { + for (int32_t i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) { if ((pSyncNode->senders)[i]->privateTerm > pMySender->privateTerm) { pMySender->privateTerm = (pSyncNode->senders)[i]->privateTerm; } @@ -1946,7 +1949,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { bool syncNodeIsMnode(SSyncNode* pSyncNode) { return (pSyncNode->vgId == 1); } int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) { - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { pSyncNode->peerStates[i].lastSendIndex = SYNC_INDEX_INVALID; pSyncNode->peerStates[i].lastSendTime = 0; } @@ -2332,8 +2335,8 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); } static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) { - int code = 0; - int entryLen = sizeof(*pEntry) + pEntry->dataLen; + int32_t code = 0; + int32_t entryLen = sizeof(*pEntry) + pEntry->dataLen; LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen, deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW); if (status != TAOS_LRU_STATUS_OK) { @@ -2641,7 +2644,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p } int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) { - for (int i = 0; i < pNewCfg->replicaNum; ++i) { + for (int32_t i = 0; i < pNewCfg->replicaNum; ++i) { SRaftId raftId; raftId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort); raftId.vgId = ths->vgId; @@ -2772,7 +2775,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde } bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) { - for (int i = 0; i < ths->replicaNum; ++i) { + for (int32_t i = 0; i < ths->replicaNum; ++i) { if (syncUtilSameId(&((ths->replicasId)[i]), pRaftId)) { return true; } @@ -2782,7 +2785,7 @@ bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) { SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) { SSyncSnapshotSender* pSender = NULL; - for (int i = 0; i < ths->replicaNum; ++i) { + for (int32_t i = 0; i < ths->replicaNum; ++i) { if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) { pSender = (ths->senders)[i]; } @@ -2792,7 +2795,7 @@ SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) { SSyncTimer* pTimer = NULL; - for (int i = 0; i < ths->replicaNum; ++i) { + for (int32_t i = 0; i < ths->replicaNum; ++i) { if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) { pTimer = &((ths->peerHeartbeatTimerArr)[i]); } @@ -2802,7 +2805,7 @@ SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId) { SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) { SPeerState* pState = NULL; - for (int i = 0; i < ths->replicaNum; ++i) { + for (int32_t i = 0; i < ths->replicaNum; ++i) { if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) { pState = &((ths->peerStates)[i]); } @@ -2841,7 +2844,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { } } - for (int i = 0; i < pSyncNode->peersNum; ++i) { + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]); if (pSender != NULL && pSender->start) { sError("sync cannot change3"); diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 4f6f5bdc33..e82578ef4d 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -142,7 +142,7 @@ char* syncUtilPrintBin(char* ptr, uint32_t len) { memset(s, 0, len + 1); memcpy(s, ptr, len); - for (int i = 0; i < len; ++i) { + for (int32_t i = 0; i < len; ++i) { if (!syncUtilCanPrint(s[i])) { s[i] = '.'; } @@ -157,8 +157,8 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len) { memset(s, 0, len2); char* p = s; - for (int i = 0; i < len; ++i) { - int n = sprintf(p, "%d,", ptr[i]); + for (int32_t i = 0; i < len; ++i) { + int32_t n = sprintf(p, "%d,", ptr[i]); p += n; } return s; diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 5aeea23b55..ee1f83ee6a 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -74,8 +74,8 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { ASSERT(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId)); - int j = -1; - for (int i = 0; i < pVotesGranted->replicaNum; ++i) { + int32_t j = -1; + for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) { if (syncUtilSameId(&((*(pVotesGranted->replicas))[i]), &(pMsg->srcId))) { j = i; break; @@ -105,11 +105,11 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) { cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum); cJSON *pReplicas = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "replicas", pReplicas); - for (int i = 0; i < pVotesGranted->replicaNum; ++i) { + for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) { cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i])); } - int *arr = (int *)taosMemoryMalloc(sizeof(int) * pVotesGranted->replicaNum); - for (int i = 0; i < pVotesGranted->replicaNum; ++i) { + int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesGranted->replicaNum); + for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) { arr[i] = pVotesGranted->isGranted[i]; } cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum); @@ -168,7 +168,7 @@ void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode) { bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { bool ret = false; - for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) { if (syncUtilSameId(&(*pVotesRespond->replicas)[i], pRaftId) && pVotesRespond->isRespond[i]) { ret = true; break; @@ -183,7 +183,7 @@ void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *p return; } - for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) { if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) { // ASSERT(pVotesRespond->isRespond[i] == false); pVotesRespond->isRespond[i] = true; @@ -197,7 +197,7 @@ void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) { pVotesRespond->term = term; memset(pVotesRespond->isRespond, 0, sizeof(pVotesRespond->isRespond)); /* - for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) { pVotesRespond->isRespond[i] = false; } */ @@ -211,12 +211,12 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) { cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum); cJSON *pReplicas = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "replicas", pReplicas); - for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) { cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesRespond->replicas))[i])); } - int respondNum = 0; - int *arr = (int *)taosMemoryMalloc(sizeof(int) * pVotesRespond->replicaNum); - for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + int32_t respondNum = 0; + int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesRespond->replicaNum); + for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) { arr[i] = pVotesRespond->isRespond[i]; if (pVotesRespond->isRespond[i]) { respondNum++; From 29f88785194792be55247986eab6a5ba6c76b30d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 7 Nov 2022 23:32:04 +0800 Subject: [PATCH 4/4] refactor: adjust sync hb --- source/libs/sync/src/syncMain.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d07b4d8b27..0d7306e4fd 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -222,7 +222,6 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) { int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("sync begin snapshot error"); return -1; }