diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 1176094630..a83c6caa1c 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -149,6 +149,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); ESyncState syncGetMyRole(int64_t rid); const char* syncGetMyRoleStr(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid); +void syncGetEpSet(int64_t rid, SEpSet* pEpSet); typedef enum { TAOS_SYNC_PROPOSE_SUCCESS = 0, diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 040df158e4..dfb5c56ea5 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -149,9 +149,13 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pRpc, false); if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { - rsp.code = TSDB_CODE_SYN_NOT_LEADER; - tmsgSendRsp(&rsp); - // SEpSet epSet; TODO + // rsp.code = TSDB_CODE_SYN_NOT_LEADER; + // tmsgSendRsp(&rsp); + + rsp.code = TSDB_CODE_RPC_REDIRECT; + SEpSet newEpSet; + syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet); + tmsgSendRedirectRsp(&rsp, &newEpSet); } else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) { rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 2dda089349..fbb906e4d7 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -153,6 +153,24 @@ SyncTerm syncGetMyTerm(int64_t rid) { return term; } +void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + memset(pEpSet, 0, sizeof(*pEpSet)); + return; + } + assert(rid == pSyncNode->rid); + pEpSet->numOfEps = 0; + for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + snprintf(pEpSet->eps->fqdn, sizeof(pEpSet->eps->fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn); + pEpSet->eps->port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort; + (pEpSet->numOfEps)++; + } + pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex; + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); +} + int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) {