From 86ca5ce21050e29eea90c97ac8a59b6fc457517c Mon Sep 17 00:00:00 2001 From: cadem Date: Fri, 24 Mar 2023 09:13:20 +0800 Subject: [PATCH] fix/trigger election by sync msg --- include/common/tmsg.h | 6 ++-- include/common/tmsgdef.h | 2 +- include/libs/sync/sync.h | 2 +- source/common/src/tmsg.c | 4 +-- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 2 +- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 1 - source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 22 +------------ source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 3 -- source/dnode/mnode/impl/src/mndVgroup.c | 34 ++++++++++++++------- source/dnode/vnode/inc/vnode.h | 1 - source/dnode/vnode/src/vnd/vnodeOpen.c | 4 --- source/libs/sync/src/syncMain.c | 20 ++++++++---- 12 files changed, 46 insertions(+), 55 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ba8ab07ba8..d7f9e16d87 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1670,10 +1670,10 @@ int32_t tDeserializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceV typedef struct { int32_t vgId; -} SForceElectionReq; +} SForceBecomeFollowerReq; -int32_t tSerializeSForceElectionReq(void* buf, int32_t bufLen, SForceElectionReq* pReq); -int32_t tDeserializeSForceElectionReq(void* buf, int32_t bufLen, SForceElectionReq* pReq); +int32_t tSerializeSForceBecomeFollowerReq(void* buf, int32_t bufLen, SForceBecomeFollowerReq* pReq); +int32_t tDeserializeSForceBecomeFollowerReq(void* buf, int32_t bufLen, SForceBecomeFollowerReq* pReq); typedef struct { int32_t vgId; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 67fd832f13..65bb367bfa 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -83,7 +83,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_DND_FORCE_ELECTION, "balance-force-election", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) @@ -288,6 +287,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT, "sync-pre-snapshot", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 337aba567e..a86e97ea78 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -246,7 +246,7 @@ bool syncIsReadyForRead(int64_t rid); bool syncSnapshotSending(int64_t rid); bool syncSnapshotRecving(int64_t rid); int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq); -int32_t syncLeaderForceElection(int64_t rid); +int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg); SSyncState syncGetState(int64_t rid); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4b3b22f06e..ccc3ceae7b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4569,7 +4569,7 @@ int32_t tDeserializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq * return 0; } -int32_t tSerializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq *pReq) { +int32_t tSerializeSForceBecomeFollowerReq(void *buf, int32_t bufLen, SForceBecomeFollowerReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -4582,7 +4582,7 @@ int32_t tSerializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq return tlen; } -int32_t tDeserializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq *pReq) { +int32_t tDeserializeSForceBecomeFollowerReq(void *buf, int32_t bufLen, SForceBecomeFollowerReq *pReq) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 0fa889169e..b96a0ffd42 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -92,7 +92,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_FORCE_ELECTION_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -205,6 +204,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index d10565975b..e3fa2964b7 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -86,7 +86,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemo // vmHandle.c SArray *vmGetMsgHandles(); int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmProcessForceElectionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 92429a4157..11ccff3198 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -304,26 +304,6 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } -int32_t vmProcessForceElectionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg){ - SForceElectionReq req = {0}; - - if (tDeserializeSForceElectionReq(pMsg->pCont, pMsg->contLen, &req) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); - if (pVnode == NULL) { - dError("vgId:%d, failed to alter hashrange since %s", req.vgId, terrstr()); - terrno = TSDB_CODE_VND_NOT_EXIST; - return -1; - } - - vnodeForceElection(pVnode->pImpl); - - return 0; -} - int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SAlterVnodeHashRangeReq req = {0}; if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) { @@ -568,7 +548,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_FORCE_ELECTION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; @@ -586,6 +565,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index e0f141639e..7aa1c9f56a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -37,9 +37,6 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { case TDMT_DND_CREATE_VNODE: code = vmProcessCreateVnodeReq(pMgmt, pMsg); break; - case TDMT_DND_FORCE_ELECTION: - code = vmProcessForceElectionReq(pMgmt, pMsg); - break; case TDMT_DND_DROP_VNODE: code = vmProcessDropVnodeReq(pMgmt, pMsg); break; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 2851b56395..31924e0471 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -61,7 +61,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_DND_FORCE_ELECTION_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_SYNC_FORCE_FOLLOWER_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg); @@ -1779,17 +1779,18 @@ _OVER: return code; } -static void *mndBuildSForceElectionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, +static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) { - SForceElectionReq balanceReq = { + SForceBecomeFollowerReq balanceReq = { .vgId = pVgroup->vgId, }; - int32_t contLen = tSerializeSForceElectionReq(NULL, 0, &balanceReq); + int32_t contLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq); if (contLen < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + contLen += sizeof(SMsgHead); void *pReq = taosMemoryMalloc(contLen); if (pReq == NULL) { @@ -1797,9 +1798,13 @@ static void *mndBuildSForceElectionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t return NULL; } - tSerializeSForceElectionReq((char *)pReq, contLen, &balanceReq); + SMsgHead *pHead = pReq; + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgroup->vgId); + + tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), contLen, &balanceReq); *pContLen = contLen; - return pReq; + return pReq; } int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) { @@ -1811,12 +1816,12 @@ int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj * mndReleaseDnode(pMnode, pDnode); int32_t contLen = 0; - void *pReq = mndBuildSForceElectionReq(pMnode, pVgroup, dnodeId, &contLen); + void *pReq = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &contLen); if (pReq == NULL) return -1; action.pCont = pReq; action.contLen = contLen; - action.msgType = TDMT_DND_FORCE_ELECTION; + action.msgType = TDMT_SYNC_FORCE_FOLLOWER; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); @@ -1832,13 +1837,20 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra int32_t vgid = pVgroup->vgId; int8_t replica = pVgroup->replica; - if(pVgroup->replica <= 1) { + if(pVgroup->replica <= 1) { mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica); return -1; } - int32_t index = vgid%replica; - int32_t dnodeId = pVgroup->vnodeGid[index].dnodeId; + int32_t dnodeId = pVgroup->vnodeGid[0].dnodeId; + + for(int i = 0; i < replica; i++) + { + if(pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER){ + dnodeId = pVgroup->vnodeGid[i].dnodeId; + break; + } + } bool exist = false; bool online = false; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index f3094b39fe..2d053d04ae 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -56,7 +56,6 @@ void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodePreClose(SVnode *pVnode); void vnodePostClose(SVnode *pVnode); -void vnodeForceElection(SVnode *pVnode); void vnodeSyncCheckTimeout(SVnode *pVnode); void vnodeClose(SVnode *pVnode); int32_t vnodeSyncCommit(SVnode *pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index c5f9412461..c7d155be0d 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -380,10 +380,6 @@ void vnodePreClose(SVnode *pVnode) { void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); } -void vnodeForceElection(SVnode *pVnode) { - syncLeaderForceElection(pVnode->sync); -} - void vnodeClose(SVnode *pVnode) { if (pVnode) { tsem_wait(&pVnode->canCommit); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8d7426f699..c9b997af2e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -206,6 +206,9 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { case TDMT_SYNC_LOCAL_CMD: code = syncNodeOnLocalCmd(pSyncNode, pMsg); break; + case TDMT_SYNC_FORCE_FOLLOWER: + code = syncForceBecomeFollower(pSyncNode, pMsg); + break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; code = -1; @@ -228,13 +231,18 @@ int32_t syncLeaderTransfer(int64_t rid) { return ret; } -int32_t syncLeaderForceElection(int64_t rid) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) return -1; +int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) { + syncNodeBecomeFollower(ths, "force election"); - int32_t ret = syncNodeElect(pSyncNode); - syncNodeRelease(pSyncNode); - return ret; + SRpcMsg rsp = { + .code = 0, + .pCont = pRpcMsg->info.rsp, + .contLen = pRpcMsg->info.rspLen, + .info = pRpcMsg->info, + }; + tmsgSendRsp(&rsp); + + return 0; } int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {