From ac6b121348d2e772be9984146e5eff9b42fe740e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 29 Mar 2022 13:39:55 +0800 Subject: [PATCH 1/4] shm --- include/common/tmsgcb.h | 3 +- include/util/tprocess.h | 2 +- source/common/src/tmsgcb.c | 8 ++- source/dnode/mgmt/container/inc/dnd.h | 3 ++ source/dnode/mgmt/container/src/dndExec.c | 11 +++- .../dnode/mgmt/container/src/dndTransport.c | 51 ++++++++++++------- source/libs/qworker/src/qworker.c | 4 +- 7 files changed, 57 insertions(+), 25 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 50ff735f86..3ccbbf4c47 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -56,13 +56,14 @@ typedef struct { ReleaseHandleFp releaseHandleFp; } SMsgCb; +void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); -void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type); +void tmsgReleaseHandle(void* handle, int8_t type); #ifdef __cplusplus } diff --git a/include/util/tprocess.h b/include/util/tprocess.h index be6d58615e..51ce0243b7 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -22,7 +22,7 @@ extern "C" { #endif -typedef enum { PROC_REQ, PROC_RSP, PROC_REGISTER } ProcFuncType; +typedef enum { PROC_REQ, PROC_RSP, PROC_REG, PROC_RELEASE } ProcFuncType; typedef struct SProcQueue SProcQueue; typedef struct SProcObj SProcObj; diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index 2b68aab95a..857a501625 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -16,6 +16,10 @@ #define _DEFAULT_SOURCE #include "tmsgcb.h" +static SMsgCb tsDefaultMsgCb; + +void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; } + int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq); } @@ -38,6 +42,6 @@ void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { (*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg); } -void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type) { - (*pMsgCb->releaseHandleFp)(pMsgCb->pWrapper, handle, type); +void tmsgReleaseHandle(void* handle, int8_t type) { + (*tsDefaultMsgCb.releaseHandleFp)(tsDefaultMsgCb.pWrapper, handle, type); } \ No newline at end of file diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index 3cdde09532..4c6a9cf1ab 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -33,6 +33,7 @@ #include "tthread.h" #include "ttime.h" #include "tworker.h" +#include "tmsgcb.h" #include "dnode.h" #include "monitor.h" @@ -140,6 +141,8 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type); +SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); diff --git a/source/dnode/mgmt/container/src/dndExec.c b/source/dnode/mgmt/container/src/dndExec.c index 412fa388b6..56e64d3aed 100644 --- a/source/dnode/mgmt/container/src/dndExec.c +++ b/source/dnode/mgmt/container/src/dndExec.c @@ -77,6 +77,8 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; + SMsgCb msgCb = dndCreateMsgcb(pWrapper); + tmsgSetDefaultMsgCb(&msgCb); if (taosMkDir(pWrapper->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -146,9 +148,13 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t pMsg->ahandle); switch (ftype) { - case PROC_REGISTER: + case PROC_REG: rpcRegisterBrokenLinkArg(pMsg); break; + case PROC_RELEASE: + rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code); + rpcFreeCont(pCont); + break; default: dndSendRpcRsp(pWrapper, pMsg); break; @@ -164,6 +170,9 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; + SMsgCb msgCb = dndCreateMsgcb(pWrapper); + tmsgSetDefaultMsgCb(&msgCb); + if (taosMkDir(pWrapper->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index baf25b73d6..626dd6babb 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -363,29 +363,44 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { } void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { - if (pWrapper->procType == PROC_CHILD) { - int32_t code = -1; - do { - code = taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP); - if (code != 0) { - taosMsleep(10); - } - } while (code != 0); - } else { + if (pWrapper->procType != PROC_CHILD) { dndSendRpcRsp(pWrapper, pRsp); + } else { + while (taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP) != 0) { + taosMsleep(1); + } } } void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { - if (pWrapper->procType == PROC_CHILD) { - int32_t code = -1; - do { - code = taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGISTER); - if (code != 0) { - taosMsleep(10); - } - } while (code != 0); - } else { + if (pWrapper->procType != PROC_CHILD) { rpcRegisterBrokenLinkArg(pMsg); + } else { + while (taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REG) != 0) { + taosMsleep(1); + } } +} + +void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) { + if (pWrapper->procType != PROC_CHILD) { + rpcReleaseHandle(handle, type); + } else { + SRpcMsg msg = {.handle = handle, .code = type}; + while (taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE) != 0) { + taosMsleep(1); + } + } +} + +SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) { + SMsgCb msgCb = { + .pWrapper = pWrapper, + .registerBrokenLinkArgFp = dndRegisterBrokenLinkArg, + .releaseHandleFp = dndReleaseHandle, + .sendMnodeReqFp = dndSendReqToMnode, + .sendReqFp = dndSendReqToDnode, + .sendRspFp = dndSendRsp, + }; + return msgCb; } \ No newline at end of file diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 133a80b154..7612ca5704 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -432,7 +432,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { - rpcReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER); + tmsgReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER); ctx->connInfo.handle = NULL; qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle); @@ -1282,7 +1282,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { QW_LOCK(QW_WRITE, &sch->hbConnLock); if (sch->hbConnInfo.handle) { - rpcReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); + tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); } memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); From 232404fa86c157ec7e13808b20691ee170ed745b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 29 Mar 2022 15:21:48 +0800 Subject: [PATCH 2/4] shm queue --- source/util/src/tprocess.c | 49 ++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 2675c6cdf2..b2d4d1eddc 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -208,16 +208,11 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { } static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody, - int32_t rawBodyLen, ProcFuncType funcType) { + int32_t rawBodyLen, ProcFuncType ftype) { const int32_t headLen = CEIL8(rawHeadLen); const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; - if (headLen <= 0 || bodyLen <= 0) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - taosThreadMutexLock(pQueue->mutex); if (fullLen > pQueue->avail) { taosThreadMutexUnlock(pQueue->mutex); @@ -225,13 +220,14 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t return -1; } + const int32_t pos = pQueue->tail; if (pQueue->tail < pQueue->total) { - *(int16_t *)(pQueue->pBuffer + pQueue->head) = headLen; - *(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType; - *(int32_t *)(pQueue->pBuffer + pQueue->head + 4) = bodyLen; + *(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen; + *(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype; + *(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = bodyLen; } else { *(int16_t *)(pQueue->pBuffer) = headLen; - *(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType; + *(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype; *(int32_t *)(pQueue->pBuffer + 4) = bodyLen; } @@ -250,19 +246,19 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t memcpy(pQueue->pBuffer + headLen, pBody, rawBodyLen); pQueue->tail = headLen + bodyLen; } else if (remain < 8 + headLen) { - memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, remain - 8); + memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, remain - 8); memcpy(pQueue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8)); memcpy(pQueue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen); pQueue->tail = headLen - (remain - 8) + bodyLen; - } else if (remain < 8 + bodyLen) { - memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, rawHeadLen); - memcpy(pQueue->pBuffer + pQueue->head + 8 + headLen, pBody, remain - 8 - headLen); + } else if (remain < 8 + headLen + bodyLen) { + memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen); + memcpy(pQueue->pBuffer + pQueue->tail + 8 + headLen, pBody, remain - 8 - headLen); memcpy(pQueue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen)); pQueue->tail = bodyLen - (remain - 8 - headLen); } else { - memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, rawHeadLen); - memcpy(pQueue->pBuffer + pQueue->head + headLen + 8, pBody, rawBodyLen); - pQueue->tail = pQueue->head + headLen + bodyLen + 8; + memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen); + memcpy(pQueue->pBuffer + pQueue->tail + headLen + 8, pBody, rawBodyLen); + pQueue->tail = pQueue->tail + headLen + bodyLen + 8; } } @@ -271,8 +267,8 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t taosThreadMutexUnlock(pQueue->mutex); tsem_post(&pQueue->sem); - uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items, - headLen, pHead, bodyLen, pBody, funcType); + uTrace("proc:%s, push msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype, + pQueue->items, headLen, pHead, bodyLen, pBody); return 0; } @@ -312,6 +308,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea return -1; } + const int32_t pos = pQueue->head; if (pQueue->head < pQueue->tail) { memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, bodyLen); @@ -331,7 +328,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea memcpy(pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8)); memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen); pQueue->head = headLen - (remain - 8) + bodyLen; - } else if (remain < 8 + bodyLen) { + } else if (remain < 8 + headLen + bodyLen) { memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen); memcpy(pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen)); @@ -353,8 +350,8 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea *pBodyLen = bodyLen; *pFuncType = (ProcFuncType)ftype; - uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items, - headLen, pHead, bodyLen, pBody, ftype); + uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype, + pQueue->items, headLen, pHead, bodyLen, pBody); return 1; } @@ -472,11 +469,11 @@ void taosProcCleanup(SProcObj *pProc) { } int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - ProcFuncType funcType) { - return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType); + ProcFuncType ftype) { + return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype); } int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - ProcFuncType funcType) { - return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType); + ProcFuncType ftype) { + return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype); } From d76a0c9b6d32f0f09186ad95d1a7aab85d8df584 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 29 Mar 2022 17:15:48 +0800 Subject: [PATCH 3/4] shm --- include/common/tmsgcb.h | 2 +- source/common/src/tmsgcb.c | 2 +- source/dnode/mgmt/container/inc/dnd.h | 1 - source/dnode/mgmt/container/src/dndExec.c | 29 ++++++++----- .../dnode/mgmt/container/src/dndTransport.c | 28 ++++++------- source/dnode/mgmt/vnode/src/vmMsg.c | 3 +- source/dnode/mnode/impl/src/mndTrans.c | 6 +-- source/dnode/vnode/src/tq/tq.c | 6 +-- source/dnode/vnode/src/vnd/vnodeQuery.c | 6 +-- source/libs/qworker/inc/qworkerMsg.h | 12 +++--- source/libs/qworker/src/qworker.c | 32 +++++++------- source/libs/qworker/src/qworkerMsg.c | 42 +++++++++---------- 12 files changed, 86 insertions(+), 83 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 3ccbbf4c47..cb59599d9a 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -61,7 +61,7 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); -void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp); +void tmsgSendRsp(const SRpcMsg* pRsp); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); void tmsgReleaseHandle(void* handle, int8_t type); diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index 857a501625..e90634a604 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -36,7 +36,7 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) { return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq); } -void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } +void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); } void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { (*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg); diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index 4c6a9cf1ab..57f127ec26 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -141,7 +141,6 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type); SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/container/src/dndExec.c b/source/dnode/mgmt/container/src/dndExec.c index 56e64d3aed..5a9077a937 100644 --- a/source/dnode/mgmt/container/src/dndExec.c +++ b/source/dnode/mgmt/container/src/dndExec.c @@ -122,8 +122,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t ProcFuncType ftype) { SRpcMsg *pRpc = &pMsg->rpcMsg; pRpc->pCont = pCont; - dTrace("msg:%p, get from child queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle, - pRpc->ahandle); + dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle); NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; int32_t code = (*msgFp)(pWrapper, pMsg); @@ -144,8 +143,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, ProcFuncType ftype) { pMsg->pCont = pCont; - dTrace("msg:%p, get from parent queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pMsg->msgType), pMsg->handle, - pMsg->ahandle); + dTrace("msg:%p, get from parent queue, handle:%p app:%p", pMsg, pMsg->handle, pMsg->ahandle); switch (ftype) { case PROC_REG: @@ -155,6 +153,9 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code); rpcFreeCont(pCont); break; + case PROC_REQ: + // todo send to dnode + dndSendReqToMnode(pWrapper, pMsg); default: dndSendRpcRsp(pWrapper, pMsg); break; @@ -220,15 +221,23 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { dndClearNodesExecpt(pDnode, n); dInfo("node:%s, will be initialized in child process", pWrapper->name); - dndOpenNode(pWrapper); + if (dndOpenNode(pWrapper) != 0) { + dInfo("node:%s, failed to init in child process since %s", pWrapper->name, terrstr()); + return -1; + } + + if (taosProcRun(pProc) != 0) { + dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); + return -1; + } + break; } else { dInfo("node:%s, will not start in parent process, child pid:%d", pWrapper->name, taosProcChildId(pProc)); pWrapper->procType = PROC_PARENT; - } - - if (taosProcRun(pProc) != 0) { - dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); - return -1; + if (taosProcRun(pProc) != 0) { + dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); + return -1; + } } } diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index 626dd6babb..07ea0309a8 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -320,8 +320,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *p } int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { - if (pWrapper->procType == PROC_CHILD) { - } else { + if (pWrapper->procType != PROC_CHILD) { SDnode *pDnode = pWrapper->pDnode; if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { terrno = TSDB_CODE_DND_OFFLINE; @@ -329,23 +328,24 @@ int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg return -1; } return dndSendRpcReq(&pDnode->trans, pEpSet, pReq); + } else { + while (taosProcPutToParentQ(pWrapper->pProc, pReq, sizeof(SRpcMsg), pReq->pCont, pReq->contLen, PROC_REQ) != 0) { + taosMsleep(1); + } } } int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { - if (pWrapper->procType == PROC_CHILD) { - } else { - SDnode *pDnode = pWrapper->pDnode; - STransMgmt *pTrans = &pDnode->trans; - SEpSet epSet = {0}; + SDnode *pDnode = pWrapper->pDnode; + STransMgmt *pTrans = &pDnode->trans; + SEpSet epSet = {0}; - SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE); - if (pWrapper != NULL) { - dmGetMnodeEpSet(pWrapper->pMgmt, &epSet); - dndReleaseWrapper(pWrapper); - } - return dndSendRpcReq(pTrans, &epSet, pReq); + SMgmtWrapper *pWrapper2 = dndAcquireWrapper(pDnode, DNODE); + if (pWrapper2 != NULL) { + dmGetMnodeEpSet(pWrapper2->pMgmt, &epSet); + dndReleaseWrapper(pWrapper2); } + return dndSendRpcReq(pTrans, &epSet, pReq); } void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { @@ -382,7 +382,7 @@ void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { } } -void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) { +static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) { if (pWrapper->procType != PROC_CHILD) { rpcReleaseHandle(handle, type); } else { diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index bcc494fd07..707cc6b4bf 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -68,12 +68,13 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SWrapperCfg wrapperCfg = {0}; vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg); +#if 0 if (createReq.dnodeId != pMgmt->pDnode->dnodeId) { terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION; dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); return -1; } - +#endif SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId); if (pVnode != NULL) { dDebug("vgId:%d, already exist", createReq.vgId); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index d59ed3e605..4e54d56c09 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -771,7 +771,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { .ahandle = pTrans->rpcAHandle, .pCont = rpcCont, .contLen = pTrans->rpcRspLen}; - tmsgSendRsp(&pMnode->msgCb, &rspMsg); + tmsgSendRsp(&rspMsg); pTrans->rpcHandle = NULL; pTrans->rpcRsp = NULL; pTrans->rpcRspLen = 0; @@ -898,7 +898,7 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr pAction->msgReceived = 0; pAction->errCode = 0; } else { - mDebug("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr()); + mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr()); return -1; } } @@ -938,7 +938,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA return errCode; } } else { - mDebug("trans:%d, %d of %d actions executed, code:0x%04x", pTrans->id, numOfReceived, numOfActions, errCode & 0XFFFF); + mDebug("trans:%d, %d of %d actions executing", pTrans->id, numOfReceived, numOfActions); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index efc7ac80e9..b61484bf08 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -275,7 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; - rpcSendResponse(pMsg); + tmsgSendRsp(pMsg); return 0; } @@ -340,7 +340,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; - rpcSendResponse(pMsg); + tmsgSendRsp(pMsg); return 0; } } else { @@ -367,7 +367,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; - rpcSendResponse(pMsg); + tmsgSendRsp(pMsg); /*}*/ return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 74d7558e0d..8ebdad48f7 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -205,8 +205,7 @@ _exit: rpcMsg.contLen = rspLen; rpcMsg.code = code; - rpcSendResponse(&rpcMsg); - + tmsgSendRsp(&rpcMsg); return TSDB_CODE_SUCCESS; } @@ -276,8 +275,7 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { .code = 0, }; - rpcSendResponse(&rpcMsg); - + tmsgSendRsp(&rpcMsg); taosArrayDestroyEx(pArray, freeItemHelper); return 0; } diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index 2ccd255909..313b9459e2 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -30,18 +30,18 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); -int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); -int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); -int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, +int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code); +int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code); +int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn); -int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); -int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); +int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code); +int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); -int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); +int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); #ifdef __cplusplus diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 7612ca5704..34159a1355 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -764,7 +764,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_ERR_JRET(qwDropTask(QW_FPARAMS())); dropConnection = NULL; - qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code); + qwBuildAndSendDropRsp(&ctx->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); @@ -802,7 +802,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_ERR_JRET(qwDropTask(QW_FPARAMS())); dropConnection = NULL; - qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code); + qwBuildAndSendDropRsp(&ctx->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); @@ -830,12 +830,12 @@ _return: } if (dropConnection) { - qwBuildAndSendDropRsp(&mgmt->msgCb, dropConnection, code); + qwBuildAndSendDropRsp(dropConnection, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code)); } if (cancelConnection) { - qwBuildAndSendCancelRsp(&mgmt->msgCb, cancelConnection, code); + qwBuildAndSendCancelRsp(cancelConnection, code); QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code)); } @@ -886,7 +886,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code); + qwBuildAndSendDropRsp(&ctx->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); QW_ERR_JRET(qwDropTask(QW_FPARAMS())); @@ -918,7 +918,7 @@ _return: } if (readyConnection) { - qwBuildAndSendReadyRsp(&mgmt->msgCb, readyConnection, code); + qwBuildAndSendReadyRsp(readyConnection, code); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code)); } @@ -970,7 +970,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - QW_ERR_JRET(qwBuildAndSendQueryRsp(&mgmt->msgCb, &qwMsg->connInfo, code)); + QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code)); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); queryRsped = true; @@ -988,7 +988,7 @@ _return: code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); if (!queryRsped) { - qwBuildAndSendQueryRsp(&mgmt->msgCb, &qwMsg->connInfo, code); + qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } @@ -1051,7 +1051,7 @@ _return: } if (needRsp) { - qwBuildAndSendReadyRsp(&mgmt->msgCb, &qwMsg->connInfo, code); + qwBuildAndSendReadyRsp(&qwMsg->connInfo, code); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } @@ -1095,7 +1095,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwMsg->connInfo = ctx->connInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, dataLen, code); + qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); } else { atomic_store_8((int8_t*)&ctx->queryContinue, 1); @@ -1114,7 +1114,7 @@ _return: rsp = NULL; qwMsg->connInfo = ctx->connInfo; - qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, 0, code); + qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0); } @@ -1195,7 +1195,7 @@ _return: } if (code || rsp) { - qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, dataLen, code); + qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); } @@ -1226,7 +1226,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING); } else if (ctx->phase > 0) { - qwBuildAndSendDropRsp(&mgmt->msgCb, &qwMsg->connInfo, code); + qwBuildAndSendDropRsp(&qwMsg->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_ERR_JRET(qwDropTask(QW_FPARAMS())); @@ -1261,7 +1261,7 @@ _return: } if (TSDB_CODE_SUCCESS != code) { - qwBuildAndSendDropRsp(&mgmt->msgCb, &qwMsg->connInfo, code); + qwBuildAndSendDropRsp(&qwMsg->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } @@ -1297,7 +1297,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { _return: - qwBuildAndSendHbRsp(&mgmt->msgCb, &qwMsg->connInfo, &rsp, code); + qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_RET(TSDB_CODE_SUCCESS); @@ -1351,7 +1351,7 @@ _return: QW_UNLOCK(QW_READ, &mgmt->schLock); for (int32_t j = 0; j < i; ++j) { - qwBuildAndSendHbRsp(&mgmt->msgCb, &rspList[j].connInfo, &rspList[j].rsp, code); + qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code), (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); tFreeSSchedulerHbRsp(&rspList[j].rsp); diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 78553462b8..80ae013653 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -46,7 +46,7 @@ void qwFreeFetchRsp(void *msg) { } } -int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { SQueryTableRsp rsp = {.code = code}; int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); @@ -62,12 +62,12 @@ int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t .code = code, }; - tmsgSendRsp(pMsgCb, &rpcRsp); + tmsgSendRsp(&rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); pRsp->code = code; @@ -80,12 +80,12 @@ int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t .code = code, }; - tmsgSendRsp(pMsgCb, &rpcRsp); + tmsgSendRsp(&rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { +int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); void *pRsp = rpcMallocCont(contLen); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); @@ -99,12 +99,12 @@ int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SScheduler .code = code, }; - tmsgSendRsp(pMsgCb, &rpcRsp); + tmsgSendRsp(&rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { +int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { if (NULL == pRsp) { pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp)); @@ -120,12 +120,12 @@ int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrie .code = code, }; - tmsgSendRsp(pMsgCb, &rpcRsp); + tmsgSendRsp(&rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); pRsp->code = code; @@ -138,11 +138,11 @@ int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_ .code = code, }; - tmsgSendRsp(pMsgCb, &rpcRsp); + tmsgSendRsp(&rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) { STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); pRsp->code = code; @@ -155,11 +155,11 @@ int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t .code = code, }; - tmsgSendRsp(pMsgCb, &rpcRsp); + tmsgSendRsp(&rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendShowRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, int32_t code) { +int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { int32_t numOfCols = 6; SVShowTablesRsp showRsp = {0}; @@ -210,11 +210,11 @@ int32_t qwBuildAndSendShowRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, int32_t code) .code = code, }; - tmsgSendRsp(pMsgCb, &rpcMsg); + tmsgSendRsp(&rpcMsg); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendShowFetchRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) { +int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) { SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp)); int32_t handle = htonl(pFetchReq->id); @@ -227,7 +227,7 @@ int32_t qwBuildAndSendShowFetchRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, SVShowTa .code = 0, }; - tmsgSendRsp(pMsgCb, &rpcMsg); + tmsgSendRsp(&rpcMsg); return TSDB_CODE_SUCCESS; } @@ -498,7 +498,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { _return: - QW_ERR_RET(qwBuildAndSendCancelRsp(&mgmt->msgCb, &qwMsg.connInfo, code)); + QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code)); QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code)); return TSDB_CODE_SUCCESS; @@ -584,11 +584,9 @@ int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - SQWorkerMgmt *pMgmt = qWorkerMgmt; - int32_t code = 0; SVShowTablesReq *pReq = pMsg->pCont; - QW_RET(qwBuildAndSendShowRsp(&pMgmt->msgCb, pMsg, code)); + QW_RET(qwBuildAndSendShowRsp(pMsg, code)); } int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { @@ -596,8 +594,6 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) return TSDB_CODE_QRY_INVALID_INPUT; } - SQWorkerMgmt *pMgmt = qWorkerMgmt; - SVShowTablesFetchReq *pFetchReq = pMsg->pCont; - QW_RET(qwBuildAndSendShowFetchRsp(&pMgmt->msgCb, pMsg, pFetchReq)); + QW_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq)); } From 457f5450e662fda5a017f0cac65fb25b02977a7e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 29 Mar 2022 17:46:03 +0800 Subject: [PATCH 4/4] minor changes --- source/dnode/mgmt/vnode/src/vmMsg.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 707cc6b4bf..bcc494fd07 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -68,13 +68,12 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SWrapperCfg wrapperCfg = {0}; vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg); -#if 0 if (createReq.dnodeId != pMgmt->pDnode->dnodeId) { terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION; dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); return -1; } -#endif + SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId); if (pVnode != NULL) { dDebug("vgId:%d, already exist", createReq.vgId);