diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 27bab8677f..1bac3e2dcf 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -267,7 +267,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { int32_t code = 0; taosRLockLatch(&pWrapper->latch); - if (pWrapper->deployed || (InParentProc(pWrapper->proc.ptype) && pWrapper->required)) { + if (pWrapper->deployed /* || (OnlyInParentProc(pWrapper->proc.ptype) && pWrapper->required) */) { int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount); } else { diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index e9928bec59..4608b43851 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -73,7 +73,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { SMgmtOutputOpt output = {0}; SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); - if (pWrapper->ntype == DNODE || OnlyInChildProc(pWrapper->proc.ptype)) { + if (pWrapper->ntype == DNODE || InChildProc(pWrapper->proc.ptype)) { tmsgSetDefaultMsgCb(&input.msgCb); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmProc.c b/source/dnode/mgmt/node_mgmt/src/dmProc.c index 187d129fac..c00c105b3b 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmProc.c +++ b/source/dnode/mgmt/node_mgmt/src/dmProc.c @@ -185,7 +185,7 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe taosThreadMutexUnlock(&queue->mutex); tsem_post(&queue->sem); - dTrace("node:%s, push %s msg:%p %d cont:%p %d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, + dTrace("node:%s, push %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, pBody, bodyLen, pos, queue->items); return 0; } @@ -269,8 +269,8 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe *pBodyLen = rawBodyLen; *pFuncType = (EProcFuncType)ftype; - dTrace("node:%s, pop %s msg:%p %d body:%p %d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, rawHeadLen, - pBody, rawBodyLen, pos, queue->items); + dTrace("node:%s, pop %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, pBody, + bodyLen, pos, queue->items); return 1; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index e242bf2849..af2cb71cf6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -82,24 +82,27 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { if (msgType == TDMT_DND_NET_TEST) { dmProcessNetTestReq(pDnode, pRpc); - code = 0; - goto _OVER; + return; } else if (msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || msgType == TDMT_VND_FETCH_RSP) { code = qWorkerProcessFetchRsp(NULL, NULL, pRpc); pRpc->pCont = NULL; // will be freed in qworker - code = 0; - goto _OVER; + return; } else { } if (pDnode->status != DND_STAT_RUNNING) { if (msgType == TDMT_DND_SERVER_STATUS) { dmProcessServerStartupStatus(pDnode, pRpc); - code = 0; } else { - terrno = TSDB_CODE_APP_NOT_READY; + SRpcMsg rspMsg = { + .handle = pRpc->handle, + .code = TSDB_CODE_APP_NOT_READY, + .ahandle = pRpc->ahandle, + .refId = pRpc->refId, + }; + rpcSendResponse(&rspMsg); } - goto _OVER; + return; } if (isReq && pRpc->pCont == NULL) { @@ -282,15 +285,17 @@ static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SR } static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { - if (!InChildProc(pWrapper->proc.ptype)) { - dmSendRpcRsp(pWrapper->pDnode, pRsp); - } else { + if (InChildProc(pWrapper->proc.ptype)) { dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP); + } else { + dmSendRpcRsp(pWrapper->pDnode, pRsp); } } static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) { - if (!InChildProc(pWrapper->proc.ptype)) { + if (InChildProc(pWrapper->proc.ptype)) { + dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP); + } else { SRpcMsg rsp = {0}; SMEpSet msg = {.epSet = *pNewEpSet}; int32_t len = tSerializeSMEpSet(NULL, 0, &msg); @@ -302,25 +307,23 @@ static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp rsp.handle = pRsp->handle; rsp.refId = pRsp->refId; rpcSendResponse(&rsp); - } else { - dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP); } } static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { - if (!InChildProc(pWrapper->proc.ptype)) { - rpcRegisterBrokenLinkArg(pMsg); - } else { + if (InChildProc(pWrapper->proc.ptype)) { dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST); + } else { + rpcRegisterBrokenLinkArg(pMsg); } } static inline void dmReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) { - if (!InChildProc(pWrapper->proc.ptype)) { - rpcReleaseHandle(handle, type); - } else { + if (InChildProc(pWrapper->proc.ptype)) { SRpcMsg msg = {.handle = handle, .code = type}; dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE); + } else { + rpcReleaseHandle(handle, type); } }