From 129b289bdf8f60a2df03dee4fef1165d0daab086 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 12 Jul 2022 14:34:18 +0800 Subject: [PATCH 1/3] refactor(sync): do leader transfer --- source/libs/sync/src/syncMain.c | 39 ++++++++------------------------- 1 file changed, 9 insertions(+), 30 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e1c3d4bb33..6093d622b7 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -273,16 +273,8 @@ int32_t syncLeaderTransfer(int64_t rid) { } ASSERT(rid == pSyncNode->rid); - if (pSyncNode->peersNum == 0) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - - SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0]; + int32_t ret = syncNodeLeaderTransfer(pSyncNode); taosReleaseRef(tsNodeRefId, pSyncNode->rid); - - int32_t ret = syncLeaderTransferTo(rid, newLeader); return ret; } @@ -293,25 +285,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { return -1; } ASSERT(rid == pSyncNode->rid); - int32_t ret = 0; - if (pSyncNode->replicaNum == 1) { - sError("only one replica, cannot drop leader"); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - terrno = TSDB_CODE_SYN_ONE_REPLICA; - return -1; - } - - SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); - pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort); - pMsg->newLeaderId.vgId = pSyncNode->vgId; - pMsg->newNodeInfo = newLeader; - ASSERT(pMsg != NULL); - SRpcMsg rpcMsg = {0}; - syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg); - syncLeaderTransferDestroy(pMsg); - - ret = syncNodePropose(pSyncNode, &rpcMsg, false); + int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return ret; } @@ -337,6 +312,12 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { return -1; } + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); + SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort); pMsg->newLeaderId.vgId = pSyncNode->vgId; @@ -1147,8 +1128,6 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) { syncNodeEventLog(pSyncNode, "sync close"); - // leader transfer - int32_t ret; ASSERT(pSyncNode != NULL); @@ -2643,7 +2622,7 @@ const char* syncStr(ESyncState state) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); - syncNodeEventLog(ths, "begin leader transfer"); + syncNodeEventLog(ths, "do leader transfer"); bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId)); bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && From 37ebcdeaaa68f56ca78808a64ced84bafb55a718 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 12 Jul 2022 14:36:38 +0800 Subject: [PATCH 2/3] refactor(sync): add elect case --- tests/script/tsim/sync/electTest.sim | 193 +++++++++++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 tests/script/tsim/sync/electTest.sim diff --git a/tests/script/tsim/sync/electTest.sim b/tests/script/tsim/sync/electTest.sim new file mode 100644 index 0000000000..5433434014 --- /dev/null +++ b/tests/script/tsim/sync/electTest.sim @@ -0,0 +1,193 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] +print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] +if $data[0][0] != 1 then + return -1 +endi +if $data[0][4] != ready then + goto check_dnode_ready +endi + +sql connect +sql create dnode $hostname port 7200 +sql create dnode $hostname port 7300 +sql create dnode $hostname port 7400 + +$loop_cnt = 0 +check_dnode_ready_1: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 10 then + print ====> dnodes not ready! + return -1 +endi +sql show dnodes +print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] +print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6] +if $data[0][4] != ready then + goto check_dnode_ready_1 +endi +if $data[1][4] != ready then + goto check_dnode_ready_1 +endi +if $data[2][4] != ready then + goto check_dnode_ready_1 +endi +if $data[3][4] != ready then + goto check_dnode_ready_1 +endi + +$replica = 3 +$vgroups = 1 + +print ============= create database +sql create database db replica $replica vgroups $vgroups + +$loop_cnt = 0 +check_db_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 100 then + print ====> db not ready! + return -1 +endi +sql show databases +print ===> rows: $rows +print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][6] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19] +if $rows != 3 then + return -1 +endi +if $data[2][19] != ready then + goto check_db_ready +endi + +sql use db + +$loop_cnt = 0 +check_vg_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 300 then + print ====> vgroups not ready! + return -1 +endi + +sql show vgroups +print ===> rows: $rows +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11] + +if $rows != $vgroups then + return -1 +endi + +if $data[0][4] == leader then + if $data[0][6] == follower then + if $data[0][8] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] + endi + endi +elif $data[0][6] == leader then + if $data[0][4] == follower then + if $data[0][8] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] + endi + endi +elif $data[0][8] == leader then + if $data[0][4] == follower then + if $data[0][6] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] + endi + endi +else + goto check_vg_ready +endi + + +vg_ready: +print ====> create stable/child table +sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +sql create table ct1 using stb tags(1000) + + +print ===> write 100 records +$N = 100 +$count = 0 +while $count < $N + $ms = 1591200000000 + $count + sql insert into ct1 values( $ms , $count , 2.1, 3.1) + $count = $count + 1 +endw + + +#sql flush database db; + + +sleep 3000 + + +print ===> stop dnode1 dnode2 dnode3 dnode4 +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT + + + +######################################################## +print ===> start dnode1 dnode2 dnode3 dnode4 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +sleep 3000 + +print =============== query data +sql connect +sql use db +sql select * from ct1 +print rows: $rows +print $data00 $data01 $data02 +if $rows != 100 then + return -1 +endi + +#system sh/exec.sh -n dnode1 -s stop -x SIGINT +#system sh/exec.sh -n dnode2 -s stop -x SIGINT +#system sh/exec.sh -n dnode3 -s stop -x SIGINT +#system sh/exec.sh -n dnode4 -s stop -x SIGINT +######################################################### + + + From 96f9274fef73be521a866df88b760e25c23e22bb Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 12 Jul 2022 15:04:32 +0800 Subject: [PATCH 3/3] refactor(sync): add timer routines --- source/dnode/mnode/impl/src/mndSync.c | 1 + source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- source/libs/sync/inc/syncEnv.h | 2 +- source/libs/sync/src/syncMain.c | 30 +++++++------------------- source/libs/sync/src/syncTimeout.c | 12 +++++++++-- 5 files changed, 21 insertions(+), 26 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index d77b39003a..bcf926e5ee 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -199,6 +199,7 @@ int32_t mndInitSync(SMnode *pMnode) { } // decrease election timer + setPingTimerMS(pMgmt->sync, 5000); setElectTimerMS(pMgmt->sync, 600); setHeartbeatTimerMS(pMgmt->sync, 300); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 97ce8eaab7..bdcfe208d6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -569,7 +569,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { return -1; } - setPingTimerMS(pVnode->sync, 3000); + setPingTimerMS(pVnode->sync, 5000); setElectTimerMS(pVnode->sync, 500); setHeartbeatTimerMS(pVnode->sync, 100); return 0; diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index beddec64c5..dd032f1481 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -30,7 +30,7 @@ extern "C" { #define TIMER_MAX_MS 0x7FFFFFFF #define ENV_TICK_TIMER_MS 1000 -#define PING_TIMER_MS 1000 +#define PING_TIMER_MS 5000 #define ELECT_TIMER_MS_MIN 1300 #define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 6093d622b7..abc0f53611 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1099,19 +1099,13 @@ void syncNodeStart(SSyncNode* pSyncNode) { // Raft 3.6.2 Committing entries from previous terms syncNodeAppendNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode); - - return; + } else { + syncNodeBecomeFollower(pSyncNode, "first start"); } - syncNodeBecomeFollower(pSyncNode, "first start"); - - // int32_t ret = 0; - // ret = syncNodeStartPingTimer(pSyncNode); - // ASSERT(ret == 0); - - if (gRaftDetailLog) { - syncNodeLog2("==state change become leader immediately==", pSyncNode); - } + int32_t ret = 0; + ret = syncNodeStartPingTimer(pSyncNode); + ASSERT(ret == 0); } void syncNodeStartStandBy(SSyncNode* pSyncNode) { @@ -1162,14 +1156,6 @@ void syncNodeClose(SSyncNode* pSyncNode) { pSyncNode->pNewNodeReceiver = NULL; } - /* - if (pSyncNode->pSnapshot != NULL) { - taosMemoryFree(pSyncNode->pSnapshot); - } - */ - - // tsem_destroy(&pSyncNode->restoreSem); - // free memory in syncFreeNode // taosMemoryFree(pSyncNode); } @@ -1234,7 +1220,7 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { &pSyncNode->pPingTimer); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); } else { - sError("sync env is stop, syncNodeStartPingTimer"); + sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId); } return ret; } @@ -1255,7 +1241,7 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { &pSyncNode->pElectTimer); atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); } else { - sError("sync env is stop, syncNodeStartElectTimer"); + sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId); } return ret; } @@ -1295,7 +1281,7 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { &pSyncNode->pHeartbeatTimer); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); } else { - sError("sync env is stop, syncNodeStartHeartbeatTimer"); + sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); } return ret; } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 0d3a3c3cc5..52181a3da8 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -17,6 +17,11 @@ #include "syncElection.h" #include "syncReplication.h" +int32_t syncNodeTimerRoutine(SSyncNode* ths) { + syncNodeEventLog(ths, "timer routines ... "); + return 0; +} + int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { int32_t ret = 0; syncTimeoutLog2("==syncNodeOnTimeoutCb==", pMsg); @@ -24,8 +29,11 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) { ++(ths->pingTimerCounter); + // syncNodePingAll(ths); - syncNodePingPeers(ths); + // syncNodePingPeers(ths); + + syncNodeTimerRoutine(ths); } } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { @@ -40,7 +48,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { syncNodeReplicate(ths); } } else { - sTrace("unknown timeoutType:%d", pMsg->timeoutType); + sError("vgId:%d, unknown timeout-type:%d", ths->vgId, pMsg->timeoutType); } return ret;