add redirect
This commit is contained in:
parent
23c69aff61
commit
4d40af9929
|
@ -150,6 +150,7 @@ ESyncState syncGetMyRole(int64_t rid);
|
||||||
const char* syncGetMyRoleStr(int64_t rid);
|
const char* syncGetMyRoleStr(int64_t rid);
|
||||||
SyncTerm syncGetMyTerm(int64_t rid);
|
SyncTerm syncGetMyTerm(int64_t rid);
|
||||||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
||||||
|
int32_t syncGetVgId(int64_t rid);
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_SYNC_PROPOSE_SUCCESS = 0,
|
TAOS_SYNC_PROPOSE_SUCCESS = 0,
|
||||||
|
|
|
@ -310,7 +310,7 @@ static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
|
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
|
||||||
ASSERT(pRsp->code == TSDB_CODE_NODE_REDIRECT);
|
ASSERT(pRsp->code == TSDB_CODE_RPC_REDIRECT);
|
||||||
if (pWrapper->procType != DND_PROC_CHILD) {
|
if (pWrapper->procType != DND_PROC_CHILD) {
|
||||||
rpcSendRedirectRsp(pRsp->handle, pNewEpSet);
|
rpcSendRedirectRsp(pRsp->handle, pNewEpSet);
|
||||||
} else {
|
} else {
|
||||||
|
@ -402,6 +402,14 @@ SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper) {
|
||||||
return cfg;
|
return cfg;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool rpcRfp(int32_t code) {
|
||||||
|
if (code == TSDB_CODE_RPC_REDIRECT) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t dmInitClient(SDnode *pDnode) {
|
static int32_t dmInitClient(SDnode *pDnode) {
|
||||||
SDnodeTrans *pTrans = &pDnode->trans;
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
|
|
||||||
|
@ -416,6 +424,7 @@ static int32_t dmInitClient(SDnode *pDnode) {
|
||||||
rpcInit.ckey = INTERNAL_CKEY;
|
rpcInit.ckey = INTERNAL_CKEY;
|
||||||
rpcInit.spi = 1;
|
rpcInit.spi = 1;
|
||||||
rpcInit.parent = pDnode;
|
rpcInit.parent = pDnode;
|
||||||
|
rpcInit.rfp = rpcRfp;
|
||||||
|
|
||||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||||
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
|
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
|
||||||
|
|
|
@ -151,7 +151,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
|
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
|
||||||
// rsp.code = TSDB_CODE_SYN_NOT_LEADER;
|
// rsp.code = TSDB_CODE_SYN_NOT_LEADER;
|
||||||
// tmsgSendRsp(&rsp);
|
// tmsgSendRsp(&rsp);
|
||||||
|
dTrace("syncPropose not leader redirect, vgId:%d ", syncGetVgId(vnodeGetSyncHandle(pVnode->pImpl)));
|
||||||
rsp.code = TSDB_CODE_RPC_REDIRECT;
|
rsp.code = TSDB_CODE_RPC_REDIRECT;
|
||||||
SEpSet newEpSet;
|
SEpSet newEpSet;
|
||||||
syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet);
|
syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet);
|
||||||
|
|
|
@ -141,6 +141,18 @@ const char* syncGetMyRoleStr(int64_t rid) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncGetVgId(int64_t rid) {
|
||||||
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
|
if (pSyncNode == NULL) {
|
||||||
|
return TAOS_SYNC_STATE_ERROR;
|
||||||
|
}
|
||||||
|
assert(rid == pSyncNode->rid);
|
||||||
|
int32_t vgId = pSyncNode->vgId;
|
||||||
|
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return vgId;
|
||||||
|
}
|
||||||
|
|
||||||
SyncTerm syncGetMyTerm(int64_t rid) {
|
SyncTerm syncGetMyTerm(int64_t rid) {
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
|
@ -278,6 +290,8 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
|
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
|
||||||
|
sTrace("syncPropose msgType:%d ", pMsg->msgType);
|
||||||
|
|
||||||
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
|
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue