add syncGetEpSet
This commit is contained in:
parent
9432dc6f6c
commit
2584b034df
|
@ -320,6 +320,27 @@ static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
|
|||
}
|
||||
}
|
||||
|
||||
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
|
||||
ASSERT(pRsp->code == TSDB_CODE_RPC_REDIRECT);
|
||||
ASSERT(pRsp->pCont == NULL);
|
||||
if (pWrapper->procType != DND_PROC_CHILD) {
|
||||
SRpcMsg resp = {0};
|
||||
SMEpSet msg = {.epSet = *pNewEpSet};
|
||||
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
|
||||
resp.pCont = rpcMallocCont(len);
|
||||
resp.contLen = len;
|
||||
tSerializeSMEpSet(resp.pCont, len, &msg);
|
||||
|
||||
resp.code = TSDB_CODE_RPC_REDIRECT;
|
||||
resp.handle = pRsp->handle;
|
||||
resp.refId = pRsp->refId;
|
||||
rpcSendResponse(&resp);
|
||||
} else {
|
||||
taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
|
||||
ASSERT(pRsp->code == TSDB_CODE_RPC_REDIRECT);
|
||||
if (pWrapper->procType != DND_PROC_CHILD) {
|
||||
|
@ -328,6 +349,7 @@ static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp
|
|||
taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
|
||||
if (pWrapper->procType != DND_PROC_CHILD) {
|
||||
|
|
|
@ -155,6 +155,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
|||
rsp.code = TSDB_CODE_RPC_REDIRECT;
|
||||
SEpSet newEpSet;
|
||||
syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet);
|
||||
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
|
||||
tmsgSendRedirectRsp(&rsp, &newEpSet);
|
||||
|
||||
} else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
|
||||
|
|
|
@ -174,11 +174,16 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
|
|||
assert(rid == pSyncNode->rid);
|
||||
pEpSet->numOfEps = 0;
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
snprintf(pEpSet->eps->fqdn, sizeof(pEpSet->eps->fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
||||
pEpSet->eps->port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
||||
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
||||
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
||||
(pEpSet->numOfEps)++;
|
||||
|
||||
sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
||||
|
||||
}
|
||||
pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
|
||||
|
||||
sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse);
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue