diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 1b3311b400..192b4cebbc 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -41,14 +41,16 @@ typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueTy typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp); +typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg *pMsg); typedef struct { - SMgmtWrapper* pWrapper; - PutToQueueFp queueFps[QUEUE_MAX]; - GetQueueSizeFp qsizeFp; - SendReqFp sendReqFp; - SendMnodeReqFp sendMnodeReqFp; - SendRspFp sendRspFp; + SMgmtWrapper* pWrapper; + PutToQueueFp queueFps[QUEUE_MAX]; + GetQueueSizeFp qsizeFp; + SendReqFp sendReqFp; + SendMnodeReqFp sendMnodeReqFp; + SendRspFp sendRspFp; + RegisterBrokenLinkArgFp registerBrokenLinkArgFp; } SMsgCb; int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); @@ -56,6 +58,7 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp); +void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/include/util/tprocess.h b/include/util/tprocess.h index a0be38a3ad..052a6619c8 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -22,11 +22,14 @@ extern "C" { #endif +typedef enum { PROC_REQ, PROC_RSP, PROC_REGISTER } ProcFuncType; + typedef struct SProcQueue SProcQueue; typedef struct SProcObj SProcObj; typedef void *(*ProcMallocFp)(int32_t contLen); typedef void *(*ProcFreeFp)(void *pCont); -typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); +typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, + ProcFuncType ftype); typedef struct { int32_t childQueueSize; @@ -52,9 +55,10 @@ int32_t taosProcRun(SProcObj *pProc); void taosProcStop(SProcObj *pProc); bool taosProcIsChild(SProcObj *pProc); int32_t taosProcChildId(SProcObj *pProc); - -int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); -int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); +int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, + ProcFuncType ftype); +int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, + ProcFuncType ftype); #ifdef __cplusplus } diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index 98ee1b679d..328a0c2b0c 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -33,3 +33,7 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) { } void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } + +void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { + (*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg); +} \ No newline at end of file diff --git a/source/dnode/mgmt/bnode/src/bmInt.c b/source/dnode/mgmt/bnode/src/bmInt.c index e2506ab383..4b87f4463c 100644 --- a/source/dnode/mgmt/bnode/src/bmInt.c +++ b/source/dnode/mgmt/bnode/src/bmInt.c @@ -24,6 +24,7 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; + msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index 7c06e08dff..6968ae4609 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -139,6 +139,7 @@ void dndSendMonitorReport(SDnode *pDnode); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg); void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); +void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index 8ea496b2fb..d10835b67f 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -56,6 +56,7 @@ void dndCleanupServer(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode); +void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/container/src/dndExec.c b/source/dnode/mgmt/container/src/dndExec.c index 8ffa53f034..61868588da 100644 --- a/source/dnode/mgmt/container/src/dndExec.c +++ b/source/dnode/mgmt/container/src/dndExec.c @@ -116,11 +116,12 @@ static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { } } -static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { +static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, + ProcFuncType ftype) { SRpcMsg *pRpc = &pMsg->rpcMsg; pRpc->pCont = pCont; - dTrace("msg:%p, get from child process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), - pRpc->handle, pRpc->ahandle); + dTrace("msg:%p, get from child process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle, + pRpc->ahandle); NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; int32_t code = (*msgFp)(pWrapper, pMsg); @@ -138,13 +139,21 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t } } -static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, int32_t msgLen, void *pCont, int32_t contLen) { - pRpc->pCont = pCont; - dTrace("msg:%p, get from parent process queue, type:%s handle:%p app:%p", pRpc, TMSG_INFO(pRpc->msgType), - pRpc->handle, pRpc->ahandle); +static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, + ProcFuncType ftype) { + pMsg->pCont = pCont; + dTrace("msg:%p, get from parent process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pMsg->msgType), + pMsg->handle, pMsg->ahandle); - dndSendRsp(pWrapper, pRpc); - taosMemoryFree(pRpc); + switch (ftype) { + case PROC_REGISTER: + rpcRegisterBrokenLinkArg(pMsg); + break; + default: + dndSendRpcRsp(pWrapper, pMsg); + break; + } + taosMemoryFree(pMsg); } static int32_t dndRunInMultiProcess(SDnode *pDnode) { diff --git a/source/dnode/mgmt/container/src/dndMsg.c b/source/dnode/mgmt/container/src/dndMsg.c index e4ecbe6af8..3aafa5a5e3 100644 --- a/source/dnode/mgmt/container/src/dndMsg.c +++ b/source/dnode/mgmt/container/src/dndMsg.c @@ -68,7 +68,7 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { } else if (pWrapper->procType == PROC_PARENT) { dTrace("msg:%p, is created and will put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user); - code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen); + code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ); } else { dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user); diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index b7d0cf26c0..797e2535d6 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { } } -static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { +void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { if (pRsp->code == TSDB_CODE_APP_NOT_READY) { SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE); if (pDnodeWrapper != NULL) { @@ -366,7 +366,7 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { if (pWrapper->procType == PROC_CHILD) { int32_t code = -1; do { - code = taosProcPutToParentQueue(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen); + code = taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP); if (code != 0) { taosMsleep(10); } @@ -375,3 +375,17 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { dndSendRpcRsp(pWrapper, pRsp); } } + +void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { + if (pWrapper->procType == PROC_CHILD) { + int32_t code = -1; + do { + code = taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGISTER); + if (code != 0) { + taosMsleep(10); + } + } while (code != 0); + } else { + rpcRegisterBrokenLinkArg(pMsg); + } +} \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index f5a3252fa2..3d15c9b9ae 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -52,6 +52,7 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; + msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/qnode/src/qmInt.c b/source/dnode/mgmt/qnode/src/qmInt.c index c8cb7258c3..11c80a2904 100644 --- a/source/dnode/mgmt/qnode/src/qmInt.c +++ b/source/dnode/mgmt/qnode/src/qmInt.c @@ -27,6 +27,7 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; + msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/snode/src/smInt.c b/source/dnode/mgmt/snode/src/smInt.c index 351f7d656e..a639fc76bb 100644 --- a/source/dnode/mgmt/snode/src/smInt.c +++ b/source/dnode/mgmt/snode/src/smInt.c @@ -24,6 +24,7 @@ static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; + msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 464e789e46..e40c2658e4 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -137,6 +137,7 @@ static void *vmOpenVnodeFunc(void *param) { msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; + msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 8090319582..bcc494fd07 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -91,6 +91,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; + msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; vnodeCfg.msgCb = msgCb; vnodeCfg.pTfs = pMgmt->pTfs; diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index ff9527f7b9..76c42581f1 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -287,7 +287,7 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { .code = TSDB_CODE_RPC_NETWORK_UNAVAIL, }; - rpcRegisterBrokenLinkArg(&pMsg); + tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg); return TSDB_CODE_SUCCESS; } diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 650794971c..fcc5a0bcb8 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -207,7 +207,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { } } -static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHeadLen, char *pBody, int32_t rawBodyLen) { +static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int16_t rawHeadLen, char *pBody, int32_t rawBodyLen, + ProcFuncType funcType) { const int32_t headLen = CEIL8(rawHeadLen); const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; @@ -225,10 +226,12 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea } if (pQueue->tail < pQueue->total) { - *(int32_t *)(pQueue->pBuffer + pQueue->head) = headLen; + *(int16_t *)(pQueue->pBuffer + pQueue->head) = headLen; + *(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType; *(int32_t *)(pQueue->pBuffer + pQueue->head + 4) = bodyLen; } else { - *(int32_t *)(pQueue->pBuffer) = headLen; + *(int16_t *)(pQueue->pBuffer) = headLen; + *(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType; *(int32_t *)(pQueue->pBuffer + 4) = bodyLen; } @@ -268,13 +271,13 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea taosThreadMutexUnlock(pQueue->mutex); tsem_post(&pQueue->sem); - uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p", pQueue->name, pQueue, pQueue->items, - headLen, pHead, bodyLen, pBody); + uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items, + headLen, pHead, bodyLen, pBody, funcType); return 0; } -static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHeadLen, void **ppBody, - int32_t *pBodyLen) { +static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHeadLen, void **ppBody, + int32_t *pBodyLen, ProcFuncType *pFuncType) { tsem_wait(&pQueue->sem); taosThreadMutexLock(pQueue->mutex); @@ -285,13 +288,16 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea return 0; } - int32_t headLen = 0; + int16_t headLen = 0; + int8_t ftype = 0; int32_t bodyLen = 0; if (pQueue->head < pQueue->total) { - headLen = *(int32_t *)(pQueue->pBuffer + pQueue->head); + headLen = *(int16_t *)(pQueue->pBuffer + pQueue->head); + ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2); bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4); } else { - headLen = *(int32_t *)(pQueue->pBuffer); + headLen = *(int16_t *)(pQueue->pBuffer); + headLen = *(int8_t *)(pQueue->pBuffer + 2); bodyLen = *(int32_t *)(pQueue->pBuffer + 4); } @@ -345,9 +351,10 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea *ppBody = pBody; *pHeadLen = headLen; *pBodyLen = bodyLen; + *pFuncType = (ProcFuncType)ftype; - uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p", pQueue->name, pQueue, pQueue->items, - headLen, pHead, bodyLen, pBody); + uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items, + headLen, pHead, bodyLen, pBody, ftype); return 1; } @@ -400,12 +407,14 @@ static void taosProcThreadLoop(SProcQueue *pQueue) { ProcConsumeFp consumeFp = pQueue->consumeFp; void *pParent = pQueue->pParent; void *pHead, *pBody; - int32_t headLen, bodyLen; + int16_t headLen; + ProcFuncType ftype; + int32_t bodyLen; uDebug("proc:%s, start to get msg from queue:%p", pQueue->name, pQueue); while (1) { - int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen); + int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype); if (numOfMsgs == 0) { uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pQueue->name, pQueue); break; @@ -414,7 +423,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue) { taosMsleep(1); continue; } else { - (*consumeFp)(pParent, pHead, headLen, pBody, bodyLen); + (*consumeFp)(pParent, pHead, headLen, pBody, bodyLen, ftype); } } } @@ -462,10 +471,12 @@ void taosProcCleanup(SProcObj *pProc) { } } -int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen) { - return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen); +int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, + ProcFuncType funcType) { + return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType); } -int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen) { - return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen); +int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, + ProcFuncType funcType) { + return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType); }