diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a83c6caa1c..ca10465ed4 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -150,6 +150,7 @@ ESyncState syncGetMyRole(int64_t rid); const char* syncGetMyRoleStr(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid); void syncGetEpSet(int64_t rid, SEpSet* pEpSet); +int32_t syncGetVgId(int64_t rid); typedef enum { TAOS_SYNC_PROPOSE_SUCCESS = 0, diff --git a/source/dnode/mgmt/implement/src/dmTransport.c b/source/dnode/mgmt/implement/src/dmTransport.c index 7bfb497d8c..7abf5d65cd 100644 --- a/source/dnode/mgmt/implement/src/dmTransport.c +++ b/source/dnode/mgmt/implement/src/dmTransport.c @@ -310,7 +310,7 @@ static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { } static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) { - ASSERT(pRsp->code == TSDB_CODE_NODE_REDIRECT); + ASSERT(pRsp->code == TSDB_CODE_RPC_REDIRECT); if (pWrapper->procType != DND_PROC_CHILD) { rpcSendRedirectRsp(pRsp->handle, pNewEpSet); } else { @@ -402,6 +402,14 @@ SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper) { return cfg; } +bool rpcRfp(int32_t code) { + if (code == TSDB_CODE_RPC_REDIRECT) { + return true; + } else { + return false; + } +} + static int32_t dmInitClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; @@ -416,6 +424,7 @@ static int32_t dmInitClient(SDnode *pDnode) { rpcInit.ckey = INTERNAL_CKEY; rpcInit.spi = 1; rpcInit.parent = pDnode; + rpcInit.rfp = rpcRfp; char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index dfb5c56ea5..bfeec26d23 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -151,7 +151,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { // rsp.code = TSDB_CODE_SYN_NOT_LEADER; // tmsgSendRsp(&rsp); - + dTrace("syncPropose not leader redirect, vgId:%d ", syncGetVgId(vnodeGetSyncHandle(pVnode->pImpl))); rsp.code = TSDB_CODE_RPC_REDIRECT; SEpSet newEpSet; syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index fbb906e4d7..857b572cab 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -141,6 +141,18 @@ const char* syncGetMyRoleStr(int64_t rid) { return s; } +int32_t syncGetVgId(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return TAOS_SYNC_STATE_ERROR; + } + assert(rid == pSyncNode->rid); + int32_t vgId = pSyncNode->vgId; + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return vgId; +} + SyncTerm syncGetMyTerm(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { @@ -278,6 +290,8 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { } int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { + sTrace("syncPropose msgType:%d ", pMsg->msgType); + int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) {