From 30eb8d50b000a84d8ad62a8316c63cfef612eede Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 28 Mar 2022 19:44:53 +0800 Subject: [PATCH 1/4] shm --- include/libs/transport/trpc.h | 12 +++++---- source/dnode/mgmt/container/src/dndMsg.c | 2 ++ source/dnode/mnode/impl/src/mndProfile.c | 31 +++++++++++------------- source/util/src/tprocess.c | 6 ++++- 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 8125de7647..54813a77a3 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -49,11 +49,13 @@ typedef struct SRpcMsg { } SRpcMsg; typedef struct { - char user[TSDB_USER_LEN]; - SRpcMsg rpcMsg; - int32_t rspLen; - void * pRsp; - void * pNode; + char user[TSDB_USER_LEN]; + uint32_t clientIp; + uint16_t clientPort; + SRpcMsg rpcMsg; + int32_t rspLen; + void *pRsp; + void *pNode; } SNodeMsg; typedef struct SRpcInit { diff --git a/source/dnode/mgmt/container/src/dndMsg.c b/source/dnode/mgmt/container/src/dndMsg.c index 5da1d73034..e4ecbe6af8 100644 --- a/source/dnode/mgmt/container/src/dndMsg.c +++ b/source/dnode/mgmt/container/src/dndMsg.c @@ -42,6 +42,8 @@ static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) { } memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); + pMsg->clientIp = connInfo.clientIp; + pMsg->clientPort = connInfo.clientPort; memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg)); return 0; } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index bf378a4d43..6c38d8626c 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -44,7 +44,8 @@ typedef struct { SQueryDesc *pQueries; } SConnObj; -static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime); +static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid, + const char *app, int64_t startTime); static void mndFreeConn(SConnObj *pConn); static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); @@ -94,7 +95,8 @@ void mndCleanupProfile(SMnode *pMnode) { } } -static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime) { +static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid, + const char *app, int64_t startTime) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1); @@ -104,8 +106,8 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, SConnObj connObj = {.id = connId, .appStartTimeMs = startTime, .pid = pid, - .ip = pInfo->clientIp, - .port = pInfo->clientPort, + .ip = ip, + .port = port, .killed = 0, .loginTimeMs = taosGetTimestampMs(), .lastAccessTimeMs = 0, @@ -114,17 +116,17 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, .pQueries = NULL}; connObj.lastAccessTimeMs = connObj.loginTimeMs; - tstrncpy(connObj.user, pInfo->user, TSDB_USER_LEN); + tstrncpy(connObj.user, user, TSDB_USER_LEN); tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN); int32_t keepTime = tsShellActivityTimer * 3; SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000); if (pConn == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("conn:%d, failed to put into cache since %s, user:%s", connId, pInfo->user, terrstr()); + mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr()); return NULL; } else { - mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, pInfo->user); + mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, user); return pConn; } } @@ -184,20 +186,14 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { SConnObj *pConn = NULL; int32_t code = -1; SConnectReq connReq = {0}; + char ip[30] = {0}; if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; goto CONN_OVER; } - SRpcConnInfo info = {0}; - if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) { - mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr()); - goto CONN_OVER; - } - - char ip[30]; - taosIp2String(info.clientIp, ip); + taosIp2String(pReq->clientIp, ip); pUser = mndAcquireUser(pMnode, pReq->user); if (pUser == NULL) { @@ -216,7 +212,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { } } - pConn = mndCreateConn(pMnode, &info, connReq.pid, connReq.app, connReq.startTime); + pConn = + mndCreateConn(pMnode, pReq->user, pReq->clientIp, pReq->clientPort, connReq.pid, connReq.app, connReq.startTime); if (pConn == NULL) { mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr()); goto CONN_OVER; @@ -241,7 +238,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { pReq->rspLen = contLen; pReq->pRsp = pRsp; - mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, connReq.app); + mDebug("user:%s, login from %s, conn:%d, app:%s", pReq->user, ip, pConn->id, connReq.app); code = 0; diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index d06aab6fda..650794971c 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -24,7 +24,6 @@ #include #define SHM_DEFAULT_SIZE (20 * 1024 * 1024) -#define CEIL8(n) (ceil((float)(n) / 8) * 8) typedef void *(*ProcThreadFp)(void *param); typedef struct SProcQueue { @@ -58,6 +57,11 @@ typedef struct SProcObj { bool stopFlag; } SProcObj; +static inline int32_t CEIL8(int32_t v) { + const int32_t c = ceil((float)(v) / 8) * 8; + return c < 8 ? 8 : c; +} + static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) { TdThreadMutex *pMutex = NULL; TdThreadMutexAttr mattr = {0}; From dffa1a7285f688ed9548f3b795f2fec8777a1df4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 28 Mar 2022 19:55:45 +0800 Subject: [PATCH 2/4] adjust logs --- source/dnode/mnode/impl/src/mndQuery.c | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index e93a0d9b17..40b4f60bd4 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -14,15 +14,15 @@ */ #include "mndQuery.h" -#include "mndMnode.h" #include "executor.h" +#include "mndMnode.h" #include "qworker.h" int32_t mndProcessQueryMsg(SNodeMsg *pReq) { - mTrace("message in query queue is processing"); - SMnode *pMnode = pReq->pNode; + SMnode *pMnode = pReq->pNode; SReadHandle handle = {0}; + mTrace("msg:%p, in query queue is processing", pReq); switch (pReq->rpcMsg.msgType) { case TDMT_VND_QUERY: return qWorkerProcessQueryMsg(&handle, pMnode->pQuery, &pReq->rpcMsg); @@ -35,9 +35,9 @@ int32_t mndProcessQueryMsg(SNodeMsg *pReq) { } int32_t mndProcessFetchMsg(SNodeMsg *pReq) { - mTrace("message in fetch queue is processing"); SMnode *pMnode = pReq->pNode; - + mTrace("msg:%p, in fetch queue is processing", pReq); + switch (pReq->rpcMsg.msgType) { case TDMT_VND_FETCH: return qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, &pReq->rpcMsg); @@ -52,9 +52,9 @@ int32_t mndProcessFetchMsg(SNodeMsg *pReq) { } int32_t mndInitQuery(SMnode *pMnode) { - int32_t code = qWorkerInit(NODE_TYPE_MNODE, MND_VGID, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb); - if (code) { - return code; + if (qWorkerInit(NODE_TYPE_MNODE, MND_VGID, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) { + mError("failed to init qworker in mnode since %s", terrstr()); + return -1; } mndSetMsgHandle(pMnode, TDMT_VND_QUERY, mndProcessQueryMsg); @@ -67,4 +67,3 @@ int32_t mndInitQuery(SMnode *pMnode) { } void mndCleanupQuery(SMnode *pMnode) { qWorkerDestroy((void **)&pMnode->pQuery); } - From f82afcfe4dd2c274ecae2afa162065cb64f852fe Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 28 Mar 2022 20:45:52 +0800 Subject: [PATCH 3/4] registerBrokenLinkArgFp --- include/common/tmsgcb.h | 15 +++--- include/util/tprocess.h | 12 +++-- source/common/src/tmsgcb.c | 4 ++ source/dnode/mgmt/bnode/src/bmInt.c | 1 + source/dnode/mgmt/container/inc/dnd.h | 1 + source/dnode/mgmt/container/inc/dndInt.h | 1 + source/dnode/mgmt/container/src/dndExec.c | 27 ++++++---- source/dnode/mgmt/container/src/dndMsg.c | 2 +- .../dnode/mgmt/container/src/dndTransport.c | 18 ++++++- source/dnode/mgmt/mnode/src/mmInt.c | 1 + source/dnode/mgmt/qnode/src/qmInt.c | 1 + source/dnode/mgmt/snode/src/smInt.c | 1 + source/dnode/mgmt/vnode/src/vmInt.c | 1 + source/dnode/mgmt/vnode/src/vmMsg.c | 1 + source/libs/qworker/src/qworkerMsg.c | 2 +- source/util/src/tprocess.c | 49 ++++++++++++------- 16 files changed, 95 insertions(+), 42 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 1b3311b400..192b4cebbc 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -41,14 +41,16 @@ typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueTy typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp); +typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg *pMsg); typedef struct { - SMgmtWrapper* pWrapper; - PutToQueueFp queueFps[QUEUE_MAX]; - GetQueueSizeFp qsizeFp; - SendReqFp sendReqFp; - SendMnodeReqFp sendMnodeReqFp; - SendRspFp sendRspFp; + SMgmtWrapper* pWrapper; + PutToQueueFp queueFps[QUEUE_MAX]; + GetQueueSizeFp qsizeFp; + SendReqFp sendReqFp; + SendMnodeReqFp sendMnodeReqFp; + SendRspFp sendRspFp; + RegisterBrokenLinkArgFp registerBrokenLinkArgFp; } SMsgCb; int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); @@ -56,6 +58,7 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp); +void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/include/util/tprocess.h b/include/util/tprocess.h index a0be38a3ad..052a6619c8 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -22,11 +22,14 @@ extern "C" { #endif +typedef enum { PROC_REQ, PROC_RSP, PROC_REGISTER } ProcFuncType; + typedef struct SProcQueue SProcQueue; typedef struct SProcObj SProcObj; typedef void *(*ProcMallocFp)(int32_t contLen); typedef void *(*ProcFreeFp)(void *pCont); -typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); +typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, + ProcFuncType ftype); typedef struct { int32_t childQueueSize; @@ -52,9 +55,10 @@ int32_t taosProcRun(SProcObj *pProc); void taosProcStop(SProcObj *pProc); bool taosProcIsChild(SProcObj *pProc); int32_t taosProcChildId(SProcObj *pProc); - -int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); -int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); +int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, + ProcFuncType ftype); +int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, + ProcFuncType ftype); #ifdef __cplusplus } diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index 98ee1b679d..328a0c2b0c 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -33,3 +33,7 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) { } void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } + +void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { + (*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg); +} \ No newline at end of file diff --git a/source/dnode/mgmt/bnode/src/bmInt.c b/source/dnode/mgmt/bnode/src/bmInt.c index e2506ab383..4b87f4463c 100644 --- a/source/dnode/mgmt/bnode/src/bmInt.c +++ b/source/dnode/mgmt/bnode/src/bmInt.c @@ -24,6 +24,7 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; + msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index 7c06e08dff..6968ae4609 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -139,6 +139,7 @@ void dndSendMonitorReport(SDnode *pDnode); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg); void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); +void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index 8ea496b2fb..d10835b67f 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -56,6 +56,7 @@ void dndCleanupServer(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode); +void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/container/src/dndExec.c b/source/dnode/mgmt/container/src/dndExec.c index 8ffa53f034..61868588da 100644 --- a/source/dnode/mgmt/container/src/dndExec.c +++ b/source/dnode/mgmt/container/src/dndExec.c @@ -116,11 +116,12 @@ static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { } } -static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { +static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, + ProcFuncType ftype) { SRpcMsg *pRpc = &pMsg->rpcMsg; pRpc->pCont = pCont; - dTrace("msg:%p, get from child process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), - pRpc->handle, pRpc->ahandle); + dTrace("msg:%p, get from child process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle, + pRpc->ahandle); NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; int32_t code = (*msgFp)(pWrapper, pMsg); @@ -138,13 +139,21 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t } } -static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, int32_t msgLen, void *pCont, int32_t contLen) { - pRpc->pCont = pCont; - dTrace("msg:%p, get from parent process queue, type:%s handle:%p app:%p", pRpc, TMSG_INFO(pRpc->msgType), - pRpc->handle, pRpc->ahandle); +static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, + ProcFuncType ftype) { + pMsg->pCont = pCont; + dTrace("msg:%p, get from parent process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pMsg->msgType), + pMsg->handle, pMsg->ahandle); - dndSendRsp(pWrapper, pRpc); - taosMemoryFree(pRpc); + switch (ftype) { + case PROC_REGISTER: + rpcRegisterBrokenLinkArg(pMsg); + break; + default: + dndSendRpcRsp(pWrapper, pMsg); + break; + } + taosMemoryFree(pMsg); } static int32_t dndRunInMultiProcess(SDnode *pDnode) { diff --git a/source/dnode/mgmt/container/src/dndMsg.c b/source/dnode/mgmt/container/src/dndMsg.c index e4ecbe6af8..3aafa5a5e3 100644 --- a/source/dnode/mgmt/container/src/dndMsg.c +++ b/source/dnode/mgmt/container/src/dndMsg.c @@ -68,7 +68,7 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { } else if (pWrapper->procType == PROC_PARENT) { dTrace("msg:%p, is created and will put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user); - code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen); + code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ); } else { dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user); diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index b7d0cf26c0..797e2535d6 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { } } -static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { +void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { if (pRsp->code == TSDB_CODE_APP_NOT_READY) { SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE); if (pDnodeWrapper != NULL) { @@ -366,7 +366,7 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { if (pWrapper->procType == PROC_CHILD) { int32_t code = -1; do { - code = taosProcPutToParentQueue(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen); + code = taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP); if (code != 0) { taosMsleep(10); } @@ -375,3 +375,17 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { dndSendRpcRsp(pWrapper, pRsp); } } + +void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { + if (pWrapper->procType == PROC_CHILD) { + int32_t code = -1; + do { + code = taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGISTER); + if (code != 0) { + taosMsleep(10); + } + } while (code != 0); + } else { + rpcRegisterBrokenLinkArg(pMsg); + } +} \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index f5a3252fa2..3d15c9b9ae 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -52,6 +52,7 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; + msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/qnode/src/qmInt.c b/source/dnode/mgmt/qnode/src/qmInt.c index c8cb7258c3..11c80a2904 100644 --- a/source/dnode/mgmt/qnode/src/qmInt.c +++ b/source/dnode/mgmt/qnode/src/qmInt.c @@ -27,6 +27,7 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; + msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/snode/src/smInt.c b/source/dnode/mgmt/snode/src/smInt.c index 351f7d656e..a639fc76bb 100644 --- a/source/dnode/mgmt/snode/src/smInt.c +++ b/source/dnode/mgmt/snode/src/smInt.c @@ -24,6 +24,7 @@ static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; + msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 464e789e46..e40c2658e4 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -137,6 +137,7 @@ static void *vmOpenVnodeFunc(void *param) { msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; + msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 8090319582..bcc494fd07 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -91,6 +91,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; + msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; vnodeCfg.msgCb = msgCb; vnodeCfg.pTfs = pMgmt->pTfs; diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index ff9527f7b9..76c42581f1 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -287,7 +287,7 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { .code = TSDB_CODE_RPC_NETWORK_UNAVAIL, }; - rpcRegisterBrokenLinkArg(&pMsg); + tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg); return TSDB_CODE_SUCCESS; } diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 650794971c..fcc5a0bcb8 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -207,7 +207,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { } } -static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHeadLen, char *pBody, int32_t rawBodyLen) { +static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int16_t rawHeadLen, char *pBody, int32_t rawBodyLen, + ProcFuncType funcType) { const int32_t headLen = CEIL8(rawHeadLen); const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; @@ -225,10 +226,12 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea } if (pQueue->tail < pQueue->total) { - *(int32_t *)(pQueue->pBuffer + pQueue->head) = headLen; + *(int16_t *)(pQueue->pBuffer + pQueue->head) = headLen; + *(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType; *(int32_t *)(pQueue->pBuffer + pQueue->head + 4) = bodyLen; } else { - *(int32_t *)(pQueue->pBuffer) = headLen; + *(int16_t *)(pQueue->pBuffer) = headLen; + *(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType; *(int32_t *)(pQueue->pBuffer + 4) = bodyLen; } @@ -268,13 +271,13 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea taosThreadMutexUnlock(pQueue->mutex); tsem_post(&pQueue->sem); - uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p", pQueue->name, pQueue, pQueue->items, - headLen, pHead, bodyLen, pBody); + uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items, + headLen, pHead, bodyLen, pBody, funcType); return 0; } -static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHeadLen, void **ppBody, - int32_t *pBodyLen) { +static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHeadLen, void **ppBody, + int32_t *pBodyLen, ProcFuncType *pFuncType) { tsem_wait(&pQueue->sem); taosThreadMutexLock(pQueue->mutex); @@ -285,13 +288,16 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea return 0; } - int32_t headLen = 0; + int16_t headLen = 0; + int8_t ftype = 0; int32_t bodyLen = 0; if (pQueue->head < pQueue->total) { - headLen = *(int32_t *)(pQueue->pBuffer + pQueue->head); + headLen = *(int16_t *)(pQueue->pBuffer + pQueue->head); + ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2); bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4); } else { - headLen = *(int32_t *)(pQueue->pBuffer); + headLen = *(int16_t *)(pQueue->pBuffer); + headLen = *(int8_t *)(pQueue->pBuffer + 2); bodyLen = *(int32_t *)(pQueue->pBuffer + 4); } @@ -345,9 +351,10 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea *ppBody = pBody; *pHeadLen = headLen; *pBodyLen = bodyLen; + *pFuncType = (ProcFuncType)ftype; - uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p", pQueue->name, pQueue, pQueue->items, - headLen, pHead, bodyLen, pBody); + uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items, + headLen, pHead, bodyLen, pBody, ftype); return 1; } @@ -400,12 +407,14 @@ static void taosProcThreadLoop(SProcQueue *pQueue) { ProcConsumeFp consumeFp = pQueue->consumeFp; void *pParent = pQueue->pParent; void *pHead, *pBody; - int32_t headLen, bodyLen; + int16_t headLen; + ProcFuncType ftype; + int32_t bodyLen; uDebug("proc:%s, start to get msg from queue:%p", pQueue->name, pQueue); while (1) { - int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen); + int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype); if (numOfMsgs == 0) { uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pQueue->name, pQueue); break; @@ -414,7 +423,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue) { taosMsleep(1); continue; } else { - (*consumeFp)(pParent, pHead, headLen, pBody, bodyLen); + (*consumeFp)(pParent, pHead, headLen, pBody, bodyLen, ftype); } } } @@ -462,10 +471,12 @@ void taosProcCleanup(SProcObj *pProc) { } } -int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen) { - return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen); +int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, + ProcFuncType funcType) { + return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType); } -int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen) { - return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen); +int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, + ProcFuncType funcType) { + return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType); } From 681dbc015071811a00f074b999a7e3bd5d41ed30 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 28 Mar 2022 21:24:59 +0800 Subject: [PATCH 4/4] remove rpcSendResponse in qnode --- source/dnode/mgmt/container/src/dndExec.c | 6 +-- source/dnode/mnode/impl/src/mndTrans.c | 8 ++-- source/libs/qworker/inc/qworkerMsg.h | 19 +++++---- source/libs/qworker/src/qworker.c | 47 ++++++++++++----------- source/libs/qworker/src/qworkerMsg.c | 45 +++++++++++----------- source/util/src/tprocess.c | 2 +- tests/script/tsim/insert/basic1.sim | 2 +- 7 files changed, 65 insertions(+), 64 deletions(-) diff --git a/source/dnode/mgmt/container/src/dndExec.c b/source/dnode/mgmt/container/src/dndExec.c index 61868588da..412fa388b6 100644 --- a/source/dnode/mgmt/container/src/dndExec.c +++ b/source/dnode/mgmt/container/src/dndExec.c @@ -120,7 +120,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t ProcFuncType ftype) { SRpcMsg *pRpc = &pMsg->rpcMsg; pRpc->pCont = pCont; - dTrace("msg:%p, get from child process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle, + dTrace("msg:%p, get from child queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle, pRpc->ahandle); NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; @@ -142,8 +142,8 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, ProcFuncType ftype) { pMsg->pCont = pCont; - dTrace("msg:%p, get from parent process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pMsg->msgType), - pMsg->handle, pMsg->ahandle); + dTrace("msg:%p, get from parent queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pMsg->msgType), pMsg->handle, + pMsg->ahandle); switch (ftype) { case PROC_REGISTER: diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 3314211437..d59ed3e605 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -54,7 +54,7 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); static void mndTransExecute(SMnode *pMnode, STrans *pTrans); -static void mndTransSendRpcRsp(STrans *pTrans); +static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans); static int32_t mndProcessTransReq(SNodeMsg *pReq); static int32_t mndProcessKillTransReq(SNodeMsg *pReq); @@ -737,7 +737,7 @@ static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { return 0; } -static void mndTransSendRpcRsp(STrans *pTrans) { +static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { bool sendRsp = false; if (pTrans->stage == TRN_STAGE_FINISHED) { @@ -771,7 +771,7 @@ static void mndTransSendRpcRsp(STrans *pTrans) { .ahandle = pTrans->rpcAHandle, .pCont = rpcCont, .contLen = pTrans->rpcRspLen}; - rpcSendResponse(&rspMsg); + tmsgSendRsp(&pMnode->msgCb, &rspMsg); pTrans->rpcHandle = NULL; pTrans->rpcRsp = NULL; pTrans->rpcRspLen = 0; @@ -1158,7 +1158,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { } } - mndTransSendRpcRsp(pTrans); + mndTransSendRpcRsp(pMnode, pTrans); } static int32_t mndProcessTransReq(SNodeMsg *pReq) { diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index be1d47a189..2ccd255909 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -30,21 +30,20 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); -int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code); -int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code); -int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); -void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); +int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); +int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); +int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, + int32_t code); +void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn); -int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code); -int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code); -void qwFreeFetchRsp(void *msg); +int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); +int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); +void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); -int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); +int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); - - #ifdef __cplusplus } #endif diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 71e7415ea5..133a80b154 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -763,8 +763,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu dropConnection = &ctx->connInfo; QW_ERR_JRET(qwDropTask(QW_FPARAMS())); dropConnection = NULL; - - qwBuildAndSendDropRsp(&ctx->connInfo, code); + + qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); @@ -802,9 +802,9 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_ERR_JRET(qwDropTask(QW_FPARAMS())); dropConnection = NULL; - qwBuildAndSendDropRsp(&ctx->connInfo, code); + qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); - + QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } @@ -830,12 +830,12 @@ _return: } if (dropConnection) { - qwBuildAndSendDropRsp(dropConnection, code); + qwBuildAndSendDropRsp(&mgmt->msgCb, dropConnection, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code)); } if (cancelConnection) { - qwBuildAndSendCancelRsp(cancelConnection, code); + qwBuildAndSendCancelRsp(&mgmt->msgCb, cancelConnection, code); QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code)); } @@ -886,9 +886,9 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - qwBuildAndSendDropRsp(&ctx->connInfo, code); + qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); - + QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } @@ -918,7 +918,7 @@ _return: } if (readyConnection) { - qwBuildAndSendReadyRsp(readyConnection, code); + qwBuildAndSendReadyRsp(&mgmt->msgCb, readyConnection, code); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code)); } @@ -970,7 +970,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code)); + QW_ERR_JRET(qwBuildAndSendQueryRsp(&mgmt->msgCb, &qwMsg->connInfo, code)); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); queryRsped = true; @@ -986,9 +986,9 @@ _return: input.code = code; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); - + if (!queryRsped) { - qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); + qwBuildAndSendQueryRsp(&mgmt->msgCb, &qwMsg->connInfo, code); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } @@ -1051,7 +1051,7 @@ _return: } if (needRsp) { - qwBuildAndSendReadyRsp(&qwMsg->connInfo, code); + qwBuildAndSendReadyRsp(&mgmt->msgCb, &qwMsg->connInfo, code); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } @@ -1095,7 +1095,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwMsg->connInfo = ctx->connInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); + qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, dataLen, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); } else { atomic_store_8((int8_t*)&ctx->queryContinue, 1); @@ -1114,7 +1114,7 @@ _return: rsp = NULL; qwMsg->connInfo = ctx->connInfo; - qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code); + qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, 0, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0); } @@ -1195,7 +1195,7 @@ _return: } if (code || rsp) { - qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); + qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, dataLen, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); } @@ -1226,9 +1226,9 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING); } else if (ctx->phase > 0) { - qwBuildAndSendDropRsp(&qwMsg->connInfo, code); + qwBuildAndSendDropRsp(&mgmt->msgCb, &qwMsg->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); - + QW_ERR_JRET(qwDropTask(QW_FPARAMS())); rsped = true; } else { @@ -1241,7 +1241,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); } - + _return: if (code) { @@ -1261,7 +1261,7 @@ _return: } if (TSDB_CODE_SUCCESS != code) { - qwBuildAndSendDropRsp(&qwMsg->connInfo, code); + qwBuildAndSendDropRsp(&mgmt->msgCb, &qwMsg->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } @@ -1297,7 +1297,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { _return: - qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); + qwBuildAndSendHbRsp(&mgmt->msgCb, &qwMsg->connInfo, &rsp, code); QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_RET(TSDB_CODE_SUCCESS); @@ -1351,8 +1351,9 @@ _return: QW_UNLOCK(QW_READ, &mgmt->schLock); for (int32_t j = 0; j < i; ++j) { - qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); - QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code), (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); + qwBuildAndSendHbRsp(&mgmt->msgCb, &rspList[j].connInfo, &rspList[j].rsp, code); + QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code), + (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); tFreeSSchedulerHbRsp(&rspList[j].rsp); } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 76c42581f1..78553462b8 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -46,7 +46,7 @@ void qwFreeFetchRsp(void *msg) { } } -int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { SQueryTableRsp rsp = {.code = code}; int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); @@ -62,12 +62,12 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { .code = code, }; - rpcSendResponse(&rpcRsp); + tmsgSendRsp(pMsgCb, &rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); pRsp->code = code; @@ -80,12 +80,12 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { .code = code, }; - rpcSendResponse(&rpcRsp); + tmsgSendRsp(pMsgCb, &rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { +int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); void *pRsp = rpcMallocCont(contLen); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); @@ -99,12 +99,12 @@ int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_ .code = code, }; - rpcSendResponse(&rpcRsp); + tmsgSendRsp(pMsgCb, &rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { +int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { if (NULL == pRsp) { pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp)); @@ -120,12 +120,12 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3 .code = code, }; - rpcSendResponse(&rpcRsp); + tmsgSendRsp(pMsgCb, &rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); pRsp->code = code; @@ -138,11 +138,11 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { .code = code, }; - rpcSendResponse(&rpcRsp); + tmsgSendRsp(pMsgCb, &rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); pRsp->code = code; @@ -155,11 +155,11 @@ int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) { .code = code, }; - rpcSendResponse(&rpcRsp); + tmsgSendRsp(pMsgCb, &rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { +int32_t qwBuildAndSendShowRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, int32_t code) { int32_t numOfCols = 6; SVShowTablesRsp showRsp = {0}; @@ -210,11 +210,11 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { .code = code, }; - rpcSendResponse(&rpcMsg); + tmsgSendRsp(pMsgCb, &rpcMsg); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) { +int32_t qwBuildAndSendShowFetchRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) { SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp)); int32_t handle = htonl(pFetchReq->id); @@ -227,7 +227,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe .code = 0, }; - rpcSendResponse(&rpcMsg); + tmsgSendRsp(pMsgCb, &rpcMsg); return TSDB_CODE_SUCCESS; } @@ -498,7 +498,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { _return: - QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code)); + QW_ERR_RET(qwBuildAndSendCancelRsp(&mgmt->msgCb, &qwMsg.connInfo, code)); QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code)); return TSDB_CODE_SUCCESS; @@ -579,15 +579,16 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } - int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } + SQWorkerMgmt *pMgmt = qWorkerMgmt; + int32_t code = 0; SVShowTablesReq *pReq = pMsg->pCont; - QW_RET(qwBuildAndSendShowRsp(pMsg, code)); + QW_RET(qwBuildAndSendShowRsp(&pMgmt->msgCb, pMsg, code)); } int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { @@ -595,8 +596,8 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) return TSDB_CODE_QRY_INVALID_INPUT; } + SQWorkerMgmt *pMgmt = qWorkerMgmt; + SVShowTablesFetchReq *pFetchReq = pMsg->pCont; - QW_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq)); + QW_RET(qwBuildAndSendShowFetchRsp(&pMgmt->msgCb, pMsg, pFetchReq)); } - - diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index fcc5a0bcb8..4dce5f560f 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -297,7 +297,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4); } else { headLen = *(int16_t *)(pQueue->pBuffer); - headLen = *(int8_t *)(pQueue->pBuffer + 2); + ftype = *(int8_t *)(pQueue->pBuffer + 2); bodyLen = *(int32_t *)(pQueue->pBuffer + 4); } diff --git a/tests/script/tsim/insert/basic1.sim b/tests/script/tsim/insert/basic1.sim index 653a44a18a..3a3f8d000e 100644 --- a/tests/script/tsim/insert/basic1.sim +++ b/tests/script/tsim/insert/basic1.sim @@ -41,7 +41,7 @@ print =============== insert data, mode1: one row one table in sql print =============== insert data, mode1: mulit rows one table in sql print =============== insert data, mode1: one rows mulit table in sql print =============== insert data, mode1: mulit rows mulit table in sql -sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) +sql insert into c1 values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) print =============== query data