add redirect before syncPropose
This commit is contained in:
parent
a273559e93
commit
9a603170b4
|
@ -149,6 +149,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
|
||||||
ESyncState syncGetMyRole(int64_t rid);
|
ESyncState syncGetMyRole(int64_t rid);
|
||||||
const char* syncGetMyRoleStr(int64_t rid);
|
const char* syncGetMyRoleStr(int64_t rid);
|
||||||
SyncTerm syncGetMyTerm(int64_t rid);
|
SyncTerm syncGetMyTerm(int64_t rid);
|
||||||
|
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_SYNC_PROPOSE_SUCCESS = 0,
|
TAOS_SYNC_PROPOSE_SUCCESS = 0,
|
||||||
|
|
|
@ -149,9 +149,13 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
|
|
||||||
int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pRpc, false);
|
int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pRpc, false);
|
||||||
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
|
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
|
||||||
rsp.code = TSDB_CODE_SYN_NOT_LEADER;
|
// rsp.code = TSDB_CODE_SYN_NOT_LEADER;
|
||||||
tmsgSendRsp(&rsp);
|
// tmsgSendRsp(&rsp);
|
||||||
// SEpSet epSet; TODO
|
|
||||||
|
rsp.code = TSDB_CODE_RPC_REDIRECT;
|
||||||
|
SEpSet newEpSet;
|
||||||
|
syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet);
|
||||||
|
tmsgSendRedirectRsp(&rsp, &newEpSet);
|
||||||
|
|
||||||
} else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
|
} else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
|
||||||
rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
|
|
@ -153,6 +153,24 @@ SyncTerm syncGetMyTerm(int64_t rid) {
|
||||||
return term;
|
return term;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
|
||||||
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
|
if (pSyncNode == NULL) {
|
||||||
|
memset(pEpSet, 0, sizeof(*pEpSet));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
assert(rid == pSyncNode->rid);
|
||||||
|
pEpSet->numOfEps = 0;
|
||||||
|
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||||
|
snprintf(pEpSet->eps->fqdn, sizeof(pEpSet->eps->fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
||||||
|
pEpSet->eps->port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
||||||
|
(pEpSet->numOfEps)++;
|
||||||
|
}
|
||||||
|
pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
|
||||||
|
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
|
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue