Merge pull request #11363 from taosdata/feature/node
fix(cluster): redirect msg when mnode not deployed
This commit is contained in:
commit
a6be63e2ec
|
@ -158,7 +158,7 @@ SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndType ntype) {
|
|||
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
||||
dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount);
|
||||
} else {
|
||||
terrno = TSDB_CODE_NODE_REDIRECT;
|
||||
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||
pRetWrapper = NULL;
|
||||
}
|
||||
taosRUnLockLatch(&pWrapper->latch);
|
||||
|
@ -174,7 +174,7 @@ int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
|
|||
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
||||
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
|
||||
} else {
|
||||
terrno = TSDB_CODE_NODE_REDIRECT;
|
||||
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||
code = -1;
|
||||
}
|
||||
taosRUnLockLatch(&pWrapper->latch);
|
||||
|
|
|
@ -53,8 +53,9 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS
|
|||
int32_t code = -1;
|
||||
SNodeMsg *pMsg = NULL;
|
||||
NodeMsgFp msgFp = NULL;
|
||||
uint16_t msgType = pRpc->msgType;
|
||||
|
||||
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
|
||||
if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) {
|
||||
dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
|
||||
}
|
||||
|
||||
|
@ -84,9 +85,15 @@ _OVER:
|
|||
}
|
||||
} else {
|
||||
dError("msg:%p, failed to process since 0x%04x:%s", pMsg, code & 0XFFFF, terrstr());
|
||||
if (pRpc->msgType & 1U) {
|
||||
if (msgType & 1U) {
|
||||
if (terrno != 0) code = terrno;
|
||||
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
|
||||
if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) {
|
||||
if (msgType > TDMT_MND_MSG && msgType < TDMT_VND_MSG) {
|
||||
code = TSDB_CODE_NODE_REDIRECT;
|
||||
}
|
||||
}
|
||||
|
||||
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
|
||||
tmsgSendRsp(&rsp);
|
||||
}
|
||||
dTrace("msg:%p, is freed", pMsg);
|
||||
|
@ -348,8 +355,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *p
|
|||
}
|
||||
|
||||
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
|
||||
if (pRsp->code == TSDB_CODE_APP_NOT_READY || pRsp->code == TSDB_CODE_NODE_REDIRECT ||
|
||||
pRsp->code == TSDB_CODE_NODE_OFFLINE) {
|
||||
if (pRsp->code == TSDB_CODE_NODE_REDIRECT) {
|
||||
dmSendRedirectRsp(pWrapper->pMgmt, pRsp);
|
||||
} else {
|
||||
rpcSendResponse(pRsp);
|
||||
|
@ -442,7 +448,8 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
|
|||
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
|
||||
ProcFuncType ftype) {
|
||||
pMsg->pCont = pCont;
|
||||
dTrace("msg:%p, get from parent queue, ftype:%d handle:%p, app:%p", pMsg, ftype, pMsg->handle, pMsg->ahandle);
|
||||
dTrace("msg:%p, get from parent queue, ftype:%d handle:%p code:0x%04x mtype:%d, app:%p", pMsg, ftype, pMsg->handle,
|
||||
pMsg->code & 0xFFFF, pMsg->msgType, pMsg->ahandle);
|
||||
|
||||
switch (ftype) {
|
||||
case PROC_REGIST:
|
||||
|
|
|
@ -96,7 +96,7 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
|
||||
if (alterReq.dnodeId != pDnode->dnodeId) {
|
||||
terrno = TSDB_CODE_INVALID_OPTION;
|
||||
dError("failed to alter mnode since %s", terrstr());
|
||||
dError("failed to alter mnode since %s, dnodeId:%d input:%d", terrstr(), pDnode->dnodeId, alterReq.dnodeId);
|
||||
return -1;
|
||||
} else {
|
||||
return mmAlter(pMgmt, &alterReq);
|
||||
|
|
|
@ -112,6 +112,9 @@ static int32_t mmOpenImp(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq) {
|
|||
|
||||
if (!deployed) {
|
||||
dInfo("mnode start to deploy");
|
||||
if (pMgmt->pWrapper->procType == PROC_CHILD) {
|
||||
pMgmt->pDnode->dnodeId = 1;
|
||||
}
|
||||
mmBuildOptionForDeploy(pMgmt, &option);
|
||||
} else {
|
||||
dInfo("mnode start to open");
|
||||
|
|
|
@ -188,7 +188,7 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) {
|
|||
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_REDIRECT);
|
||||
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_NOT_DEPLOYED);
|
||||
}
|
||||
|
||||
{
|
||||
|
|
|
@ -43,7 +43,7 @@
|
|||
./test.sh -f tsim/bnode/basic1.sim
|
||||
|
||||
# ---- mnode
|
||||
./test.sh -f tsim/bnode/basic1.sim
|
||||
./test.sh -f tsim/mnode/basic1.sim
|
||||
|
||||
# ---- show
|
||||
./test.sh -f tsim/show/basic.sim
|
||||
|
@ -69,7 +69,13 @@
|
|||
|
||||
# --- for multi process mode
|
||||
./test.sh -f tsim/user/basic1.sim -m
|
||||
./test.sh -f tsim/stable/vnode3.sim -m
|
||||
./test.sh -f tsim/db/basic3.sim -m
|
||||
./test.sh -f tsim/insert/backquote.sim
|
||||
./test.sh -f tsim/parser/fourArithmetic-basic.sim -m
|
||||
./test.sh -f tsim/query/interval-offset.sim -m
|
||||
./test.sh -f tsim/tmq/basic.sim -m
|
||||
./test.sh -f tsim/stable/vnode3.sim -m
|
||||
./test.sh -f tsim/qnode/basic1.sim -m
|
||||
./test.sh -f tsim/mnode/basic1.sim -m
|
||||
|
||||
#======================b1-end===============
|
||||
|
|
|
@ -75,7 +75,6 @@ if $data02 != master then
|
|||
return -1
|
||||
endi
|
||||
|
||||
return
|
||||
print =============== create drop mnode 1
|
||||
sql_error create mnode on dnode 1
|
||||
sql_error drop mnode on dnode 1
|
||||
|
|
Loading…
Reference in New Issue