From 4c999f74ae663a70b9b1ad568d0e684ada1289fd Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 21 Mar 2022 20:52:17 +0800 Subject: [PATCH 1/6] Enabling qnodes to process messages --- source/dnode/mgmt/container/inc/dnd.h | 6 +- source/dnode/mgmt/container/src/dndInt.c | 3 +- .../dnode/mgmt/container/src/dndTransport.c | 54 +++++-- source/dnode/mgmt/dnode/src/dmMsg.c | 26 ++-- source/dnode/mgmt/mnode/src/mmMsg.c | 140 +++++++++--------- source/dnode/mgmt/qnode/src/qmMsg.c | 14 +- source/dnode/mgmt/snode/src/smMsg.c | 4 +- source/dnode/mgmt/vnode/src/vmMsg.c | 74 ++++----- source/dnode/mgmt/vnode/src/vmWorker.c | 2 +- 9 files changed, 184 insertions(+), 139 deletions(-) diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index 29c05d6d01..f9fa896f42 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -89,6 +89,9 @@ typedef struct { } SDnodeWorker; typedef struct SMsgHandle { + int32_t vgId; + NodeMsgFp vgIdMsgFp; + SMgmtWrapper *pVgIdWrapper; // Handle the case where the same message type is distributed to qnode or vnode NodeMsgFp msgFp; SMgmtWrapper *pWrapper; } SMsgHandle; @@ -114,6 +117,7 @@ typedef struct SMgmtWrapper { void *pMgmt; SDnode *pDnode; NodeMsgFp msgFps[TDMT_MAX]; + int32_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode SMgmtFp fp; } SMgmtWrapper; @@ -149,7 +153,7 @@ typedef struct SDnode { EDndStatus dndGetStatus(SDnode *pDnode); void dndSetStatus(SDnode *pDnode, EDndStatus stat); SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType); -void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp); +void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId); void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); void dndSendMonitorReport(SDnode *pDnode); diff --git a/source/dnode/mgmt/container/src/dndInt.c b/source/dnode/mgmt/container/src/dndInt.c index 4decc6ba87..8ad4351a88 100644 --- a/source/dnode/mgmt/container/src/dndInt.c +++ b/source/dnode/mgmt/container/src/dndInt.c @@ -65,8 +65,9 @@ void dndCleanup() { dInfo("dnode env is cleaned up"); } -void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) { +void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId) { pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; + pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId; } EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index 2593a762e8..20069d89d6 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -20,13 +20,26 @@ #define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" +static inline void dndProcessQVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, SEpSet *pEpSet) { + SMsgHead *pHead = pMsg->pCont; + int32_t vgId = htonl(pHead->vgId); + + SMgmtWrapper *pWrapper = pHandle->pWrapper; + if (vgId == pHandle->vgId && pHandle->pVgIdWrapper != NULL) { + pWrapper = pHandle->pVgIdWrapper; + } + + dTrace("msg:%s will be processed by %s, handle:%p app:%p vgId:%d", TMSG_INFO(pMsg->msgType), pWrapper->name, + pMsg->handle, pMsg->ahandle, vgId); + dndProcessRpcMsg(pWrapper, pMsg, pEpSet); +} + static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { SDnode *pDnode = parent; STransMgmt *pMgmt = &pDnode->trans; tmsg_t msgType = pRsp->msgType; if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { - // if (pRsp == NULL || pRsp->pCont == NULL) return; dTrace("rsp:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle); rpcFreeCont(pRsp->pCont); return; @@ -34,9 +47,13 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; if (pHandle->msgFp != NULL) { - dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType), - pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code)); - dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet); + if (pHandle->vgId == 0) { + dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType), + pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code)); + dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet); + } else { + dndProcessQVnodeRpcMsg(pHandle, pRsp, pEpSet); + } } else { dError("rsp:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle); rpcFreeCont(pRsp->pCont); @@ -110,9 +127,13 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; if (pHandle->msgFp != NULL) { - dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name, - pReq->handle, pReq->ahandle); - dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet); + if (pHandle->vgId == 0) { + dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name, + pReq->handle, pReq->ahandle); + dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet); + } else { + dndProcessQVnodeRpcMsg(pHandle, pReq, pEpSet); + } } else { dError("req:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle); SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle}; @@ -245,17 +266,24 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { NodeMsgFp msgFp = pWrapper->msgFps[msgIndex]; + int32_t vgId = pWrapper->msgVgIds[msgIndex]; if (msgFp == NULL) continue; SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex]; - if (pHandle->msgFp != NULL) { - dError("msg:%s has multiple process nodes, prev node:%s, curr node:%s", tMsgInfo[msgIndex], - pHandle->pWrapper->name, pWrapper->name); + if (pHandle->msgFp != NULL && pHandle->vgId == vgId) { + dError("msg:%s has multiple process nodes, prev node:%s:%d, curr node:%s:%d", tMsgInfo[msgIndex], + pHandle->pWrapper->name, pHandle->pWrapper->msgVgIds[msgIndex], pWrapper->name, vgId); return -1; } else { - dTrace("msg:%s will be processed by %s", tMsgInfo[msgIndex], pWrapper->name); - pHandle->msgFp = msgFp; - pHandle->pWrapper = pWrapper; + dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId); + if (vgId == 0) { + pHandle->msgFp = msgFp; + pHandle->pWrapper = pWrapper; + } else { + pHandle->vgId = vgId; + pHandle->vgIdMsgFp = msgFp; + pHandle->pVgIdWrapper = pWrapper; + } } } } diff --git a/source/dnode/mgmt/dnode/src/dmMsg.c b/source/dnode/mgmt/dnode/src/dmMsg.c index f451c2ab05..7876d854ea 100644 --- a/source/dnode/mgmt/dnode/src/dmMsg.c +++ b/source/dnode/mgmt/dnode/src/dmMsg.c @@ -114,19 +114,19 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { void dmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg, 0); // Requests handled by MNODE - dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0); } diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index 8dbea36c22..333c4954ba 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -75,78 +75,78 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { void mmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); // Requests handled by MNODE - dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, 0); // Requests handled by VNODE - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)mmProcessWriteMsg, 0); } diff --git a/source/dnode/mgmt/qnode/src/qmMsg.c b/source/dnode/mgmt/qnode/src/qmMsg.c index fa7d42b3e3..ddcd791b7b 100644 --- a/source/dnode/mgmt/qnode/src/qmMsg.c +++ b/source/dnode/mgmt/qnode/src/qmMsg.c @@ -54,4 +54,16 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } } -void qmInitMsgHandles(SMgmtWrapper *pWrapper) {} +void qmInitMsgHandles(SMgmtWrapper *pWrapper) { + // Requests handled by VNODE + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)qmProcessQueryMsg, 1); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)qmProcessQueryMsg, 1); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)qmProcessFetchMsg, 1); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)qmProcessFetchMsg, 1); + + dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)qmProcessFetchMsg, 1); + dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)qmProcessFetchMsg, 1); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)qmProcessFetchMsg, 1); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)qmProcessFetchMsg, 1); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)qmProcessFetchMsg, 1); +} diff --git a/source/dnode/mgmt/snode/src/smMsg.c b/source/dnode/mgmt/snode/src/smMsg.c index 1bff5597bf..0986556207 100644 --- a/source/dnode/mgmt/snode/src/smMsg.c +++ b/source/dnode/mgmt/snode/src/smMsg.c @@ -56,6 +56,6 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { void smInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by SNODE - dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg); + dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg, 0); } diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 375a83421a..9f86351985 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -243,42 +243,42 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { void vmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by VNODE - dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); } diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index b0fe8d55e1..52ee624103 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -162,7 +162,7 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { - dError("vgId:%d, failed to write msg:%p to queue since %s", pHead->vgId, pMsg, terrstr()); + dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr()); return -1; } From 5f9f8ac7e2f7ffae137e0c9f0e7a47884f0296a2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 10:03:17 +0800 Subject: [PATCH 2/6] fix double free --- source/dnode/mgmt/vnode/src/vmWorker.c | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index d5f0a9d20e..035b805d0e 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -62,11 +62,10 @@ static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); if (code != 0) { vmSendRsp(pVnode->pWrapper, pMsg, code); + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); } - - dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pMsg->rpcMsg.pCont); - taosFreeQitem(pMsg); } static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { @@ -74,12 +73,10 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); if (code != 0) { vmSendRsp(pVnode->pWrapper, pMsg, code); + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); } - - dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - // TODO: handle invalid write - /*rpcFreeCont(pMsg->rpcMsg.pCont);*/ - /*taosFreeQitem(pMsg);*/ } static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { From c5c00d595568e667a609c85e1d1c107fe9336517 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 22 Mar 2022 10:42:02 +0800 Subject: [PATCH 3/6] sync refactor --- source/libs/sync/inc/syncEnv.h | 6 +++--- source/libs/sync/inc/syncInt.h | 5 +++++ source/libs/sync/inc/syncUtil.h | 2 +- source/libs/sync/src/syncMain.c | 15 ++++++++++----- source/libs/sync/src/syncUtil.c | 5 ++++- source/libs/sync/test/syncUtilTest.cpp | 2 +- 6 files changed, 24 insertions(+), 11 deletions(-) diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 66c7c8620d..7940d7f436 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -31,10 +31,10 @@ extern "C" { #define TIMER_MAX_MS 0x7FFFFFFF #define ENV_TICK_TIMER_MS 1000 #define PING_TIMER_MS 1000 -#define ELECT_TIMER_MS_MIN 1500 -#define ELECT_TIMER_MS_MAX 3000 +#define ELECT_TIMER_MS_MIN 150 +#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) -#define HEARTBEAT_TIMER_MS 300 +#define HEARTBEAT_TIMER_MS 30 #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index eaf4d56f67..cfa87048a7 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -160,6 +160,11 @@ typedef struct SSyncNode { SSyncLogStore* pLogStore; SyncIndex commitIndex; + // timer ms init + int32_t pingBaseLine; + int32_t electBaseLine; + int32_t hbBaseLine; + // ping timer tmr_h pPingTimer; int32_t pingTimerMS; diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 1b702c2528..a4bb11afc6 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -44,7 +44,7 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); // ---- misc ---- int32_t syncUtilRand(int32_t max); -int32_t syncUtilElectRandomMS(); +int32_t syncUtilElectRandomMS(int32_t min, int32_t max); int32_t syncUtilQuorum(int32_t replicaNum); cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p); cJSON* syncUtilRaftId2Json(const SRaftId* p); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index facb0180f9..b92317ef3f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -242,9 +242,14 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { assert(pSyncNode->pLogStore != NULL); pSyncNode->commitIndex = SYNC_INDEX_INVALID; + // timer ms init + pSyncNode->pingBaseLine = PING_TIMER_MS; + pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN; + pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS; + // init ping timer pSyncNode->pPingTimer = NULL; - pSyncNode->pingTimerMS = PING_TIMER_MS; + pSyncNode->pingTimerMS = pSyncNode->pingBaseLine; atomic_store_64(&pSyncNode->pingTimerLogicClock, 0); atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0); pSyncNode->FpPingTimerCB = syncNodeEqPingTimer; @@ -252,7 +257,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init elect timer pSyncNode->pElectTimer = NULL; - pSyncNode->electTimerMS = syncUtilElectRandomMS(); + pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); atomic_store_64(&pSyncNode->electTimerLogicClock, 0); atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0); pSyncNode->FpElectTimerCB = syncNodeEqElectTimer; @@ -260,7 +265,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init heartbeat timer pSyncNode->pHeartbeatTimer = NULL; - pSyncNode->heartbeatTimerMS = HEARTBEAT_TIMER_MS; + pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine; atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0); atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0); pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer; @@ -394,7 +399,7 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - int32_t electMS = syncUtilElectRandomMS(); + int32_t electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); ret = syncNodeRestartElectTimer(pSyncNode, electMS); return ret; } @@ -763,7 +768,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { syncTimeoutDestroy(pSyncMsg); // reset timer ms - pSyncNode->electTimerMS = syncUtilElectRandomMS(); + pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 9486405331..8fc17ccb51 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -96,7 +96,10 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { int32_t syncUtilRand(int32_t max) { return taosRand() % max; } -int32_t syncUtilElectRandomMS() { return ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); } +int32_t syncUtilElectRandomMS(int32_t min, int32_t max) { + assert(min > 0 && max > 0 && max >= min); + return min + syncUtilRand(max - min); +} int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; } diff --git a/source/libs/sync/test/syncUtilTest.cpp b/source/libs/sync/test/syncUtilTest.cpp index 663db3a7b3..ae2afd2f53 100644 --- a/source/libs/sync/test/syncUtilTest.cpp +++ b/source/libs/sync/test/syncUtilTest.cpp @@ -16,7 +16,7 @@ void logTest() { void electRandomMSTest() { for (int i = 0; i < 10; ++i) { - int32_t ms = syncUtilElectRandomMS(); + int32_t ms = syncUtilElectRandomMS(150, 300); printf("syncUtilElectRandomMS: %d \n", ms); } } From fe8714fe56b14b8fd71251faa75a3e08cf5da78f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 10:46:16 +0800 Subject: [PATCH 4/6] refact worker --- include/util/tqueue.h | 3 -- include/util/tworker.h | 33 +++++++---------- source/dnode/mgmt/vnode/inc/vmInt.h | 2 +- source/dnode/mgmt/vnode/src/vmWorker.c | 10 ++--- source/util/src/tqueue.c | 48 ------------------------ source/util/src/tworker.c | 51 +------------------------- 6 files changed, 22 insertions(+), 125 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index d51184edfc..9faf90113e 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -70,10 +70,7 @@ int32_t taosGetQueueNumber(STaosQset *qset); int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp); int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp); - -int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId); void taosResetQsetThread(STaosQset *qset, void *pItem); - int32_t taosGetQueueItemsNumber(STaosQueue *queue); int32_t taosGetQsetItemsNumber(STaosQset *qset); diff --git a/include/util/tworker.h b/include/util/tworker.h index dabd1ac9b3..3da8a1db63 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -27,33 +27,33 @@ typedef struct SWWorkerPool SWWorkerPool; typedef struct SQWorker { int32_t id; // worker ID - TdThread thread; // thread + TdThread thread; // thread SQWorkerPool *pool; -} SQWorker, SFWorker; +} SQWorker; typedef struct SQWorkerPool { - int32_t max; // max number of workers - int32_t min; // min number of workers - int32_t num; // current number of workers - STaosQset *qset; - const char *name; - SQWorker *workers; + int32_t max; // max number of workers + int32_t min; // min number of workers + int32_t num; // current number of workers + STaosQset *qset; + const char *name; + SQWorker *workers; TdThreadMutex mutex; -} SQWorkerPool, SFWorkerPool; +} SQWorkerPool; typedef struct SWWorker { int32_t id; // worker id - TdThread thread; // thread + TdThread thread; // thread STaosQall *qall; STaosQset *qset; // queue set SWWorkerPool *pool; } SWWorker; typedef struct SWWorkerPool { - int32_t max; // max number of workers - int32_t nextId; // from 0 to max-1, cyclic - const char *name; - SWWorker *workers; + int32_t max; // max number of workers + int32_t nextId; // from 0 to max-1, cyclic + const char *name; + SWWorker *workers; TdThreadMutex mutex; } SWWorkerPool; @@ -62,11 +62,6 @@ void tQWorkerCleanup(SQWorkerPool *pool); STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp); void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue); -int32_t tFWorkerInit(SFWorkerPool *pool); -void tFWorkerCleanup(SFWorkerPool *pool); -STaosQueue *tFWorkerAllocQueue(SFWorkerPool *pool, void *ahandle, FItem fp); -void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue); - int32_t tWWorkerInit(SWWorkerPool *pool); void tWWorkerCleanup(SWWorkerPool *pool); STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp); diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index d670e073da..ba59258d07 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -30,7 +30,7 @@ typedef struct SVnodesMgmt { SVnodesStat state; STfs *pTfs; SQWorkerPool queryPool; - SFWorkerPool fetchPool; + SQWorkerPool fetchPool; SWWorkerPool syncPool; SWWorkerPool writePool; const char *path; diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 035b805d0e..e0632cee68 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -262,7 +262,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); - pVnode->pFetchQ = tFWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); + pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || @@ -277,7 +277,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); - tFWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); + tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); @@ -303,11 +303,11 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { pQPool->max = maxQueryThreads; if (tQWorkerInit(pQPool) != 0) return -1; - SFWorkerPool *pFPool = &pMgmt->fetchPool; + SQWorkerPool *pFPool = &pMgmt->fetchPool; pFPool->name = "vnode-fetch"; pFPool->min = minFetchThreads; pFPool->max = maxFetchThreads; - if (tFWorkerInit(pFPool) != 0) return -1; + if (tQWorkerInit(pFPool) != 0) return -1; SWWorkerPool *pWPool = &pMgmt->writePool; pWPool->name = "vnode-write"; @@ -330,7 +330,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { void vmStopWorker(SVnodesMgmt *pMgmt) { dndCleanupWorker(&pMgmt->mgmtWorker); - tFWorkerCleanup(&pMgmt->fetchPool); + tQWorkerCleanup(&pMgmt->fetchPool); tQWorkerCleanup(&pMgmt->queryPool); tWWorkerCleanup(&pMgmt->writePool); tWWorkerCleanup(&pMgmt->syncPool); diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 99675630f4..258ca1402f 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -68,7 +68,6 @@ STaosQueue *taosOpenQueue() { return NULL; } - queue->threadId = -1; uDebug("queue:%p is opened", queue); return queue; } @@ -437,53 +436,6 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand return code; } -int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId) { - STaosQnode *pNode = NULL; - int32_t code = -1; - - tsem_wait(&qset->sem); - - taosThreadMutexLock(&qset->mutex); - - for (int32_t i = 0; i < qset->numOfQueues; ++i) { - if (qset->current == NULL) qset->current = qset->head; - STaosQueue *queue = qset->current; - if (queue) qset->current = queue->next; - if (queue == NULL) break; - if (queue->head == NULL) continue; - if (queue->threadId != -1 && queue->threadId != threadId) { - code = 0; - continue; - } - - taosThreadMutexLock(&queue->mutex); - - if (queue->head) { - pNode = queue->head; - pNode->queue = queue; - queue->threadId = threadId; - *ppItem = pNode->item; - - if (ahandle) *ahandle = queue->ahandle; - if (itemFp) *itemFp = queue->itemFp; - - queue->head = pNode->next; - if (queue->head == NULL) queue->tail = NULL; - queue->numOfItems--; - atomic_sub_fetch_32(&qset->numOfItems, 1); - code = 1; - uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); - } - - taosThreadMutexUnlock(&queue->mutex); - if (pNode) break; - } - - taosThreadMutexUnlock(&qset->mutex); - - return code; -} - void taosResetQsetThread(STaosQset *qset, void *pItem) { if (pItem == NULL) return; STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 78098af3bd..7d7f819dec 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -93,7 +93,7 @@ static void *tQWorkerThreadFp(SQWorker *worker) { return NULL; } -STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, ThreadFp threadFp) { +STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { taosThreadMutexLock(&pool->mutex); STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { @@ -114,7 +114,7 @@ STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, Threa taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); - if (taosThreadCreate(&worker->thread, &thAttr, threadFp, worker) != 0) { + if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) { uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); taosCloseQueue(queue); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -134,58 +134,11 @@ STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, Threa return queue; } -STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { - return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tQWorkerThreadFp); -} - void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue); } -int32_t tFWorkerInit(SFWorkerPool *pool) { return tQWorkerInit((SQWorkerPool *)pool); } - -void tFWorkerCleanup(SFWorkerPool *pool) { tQWorkerCleanup(pool); } - -static void *tFWorkerThreadFp(SQWorker *worker) { - SQWorkerPool *pool = worker->pool; - - FItem fp = NULL; - void *msg = NULL; - void *ahandle = NULL; - int32_t code = 0; - - taosBlockSIGPIPE(); - setThreadName(pool->name); - uDebug("worker:%s:%d is running", pool->name, worker->id); - - while (1) { - code = taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id); - - if (code < 0) { - uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); - break; - } else if (code == 0) { - // uTrace("worker:%s:%d qset:%p, got no message and continue", pool->name, worker->id, pool->qset); - continue; - } - - if (fp != NULL) { - (*fp)(ahandle, msg); - } - - taosResetQsetThread(pool->qset, msg); - } - - return NULL; -} - -STaosQueue *tFWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { - return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tQWorkerThreadFp); -} - -void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueue(pool, queue); } - int32_t tWWorkerInit(SWWorkerPool *pool) { pool->nextId = 0; pool->workers = calloc(pool->max, sizeof(SWWorker)); From a4e6423b51747ba405846252989a1241f42e5d42 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Tue, 22 Mar 2022 12:01:26 +0800 Subject: [PATCH 5/6] [add tmq cases] --- tests/script/jenkins/basic.txt | 3 ++ tests/script/tsim/tmq/basic.sim | 75 +++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 tests/script/tsim/tmq/basic.sim diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 0c2b4fe1f4..cc0576e1f3 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -21,4 +21,7 @@ # ---- query ./test.sh -f tsim/query/interval.sim + +# ---- tmq +./test.sh -f tsim/tmq/basic.sim #======================b1-end=============== diff --git a/tests/script/tsim/tmq/basic.sim b/tests/script/tsim/tmq/basic.sim new file mode 100644 index 0000000000..a091e45061 --- /dev/null +++ b/tests/script/tsim/tmq/basic.sim @@ -0,0 +1,75 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c wal -v 1 +system sh/exec.sh -n dnode1 -s start +sleep 500 +sql connect + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 100 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 +if $data00 != 1 then + return -1 +endi +if $data04 != ready then + goto check_dnode_ready +endi + +#root@trd02 /data2/dnode $ tmq_demo --help +#Used to tmq_demo +# -c Configuration directory, default is +# -d The name of the database to be created, default is tmqdb +# -s The name of the super table to be created, default is stb +# -f The file of result, default is ./tmqResult.txt +# -w The path of vnode of wal, default is /data2/dnode/data/vnodes/vnode2/wal +# -t numOfThreads, default is 1 +# -n numOfTables, default is 1 +# -v numOfVgroups, default is 1 +# -a runMode, default is 0 +# -l numOfColumn, default is 1 +# -q ratio, default is 1.000000 +# -b batchNumOfRow, default is 1 +# -r totalRowsOfPerTbl, default is 10000 +# -m startTimestamp, default is 1640966400000 [2022-01-01 00:00:00] +# -g showMsgFlag, default is 0 +# +#system_content ../../debug/tests/test/c/tmq_demo -c ../../sim/tsim/cfg +system ../../debug/tests/test/c/tmq_demo -c ../../sim/tsim/cfg +print result-> $system_content + +sql show databases +print ===> $rows $data00 $data01 $data02 $data03 +if $rows != 1 then + return -1 +endi +if $data00 != tmqdb then + return -1 +endi + +sql use tmqdb +sql show tables +print ===> $rows $data00 $data01 $data02 $data03 +if $rows != 1 then + return -1 +endi +if $data00 != stb0 then + return -1 +endi + +sql select count(*) from stb0 +print ===> $rows $data00 $data01 $data02 $data03 +if $rows != 1 then + return -1 +endi +if $data00 != 10000 then + return -1 +endi +#system sh/exec.sh -n dnode1 -s stop -x SIGINT From 4091771274f95910111e46c41d9b5579363b62bf Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 14:38:26 +0800 Subject: [PATCH 6/6] refact queue --- include/util/tworker.h | 32 ++++++++++++++ source/dnode/mgmt/mnode/inc/mmInt.h | 6 +-- source/dnode/mgmt/mnode/src/mmWorker.c | 28 ++++++------ source/util/src/tqueue.c | 1 - source/util/src/tworker.c | 59 +++++++++++++++++++++++++- 5 files changed, 108 insertions(+), 18 deletions(-) diff --git a/include/util/tworker.h b/include/util/tworker.h index 3da8a1db63..91f4fbf7ff 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -67,6 +67,38 @@ void tWWorkerCleanup(SWWorkerPool *pool); STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp); void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue); +typedef struct { + const char *name; + int32_t minNum; + int32_t maxNum; + FItem fp; + void *param; +} SQWorkerAllCfg; + +typedef struct { + const char *name; + STaosQueue *queue; + SQWorkerPool pool; +} SQWorkerAll; + +typedef struct { + const char *name; + int32_t maxNum; + FItems fp; + void *param; +} SWWorkerAllCfg; + +typedef struct { + const char *name; + STaosQueue *queue; + SWWorkerPool pool; +} SWWorkerAll; + +int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg); +void tQWorkerAllCleanup(SQWorkerAll *pWorker); +int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg); +void tWWorkerAllCleanup(SWWorkerAll *pWorker); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index 1751131764..06ed637791 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -28,9 +28,9 @@ typedef struct SMnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SDnodeWorker readWorker; - SDnodeWorker writeWorker; - SDnodeWorker syncWorker; + SQWorkerAll readWorker; + SQWorkerAll writeWorker; + SQWorkerAll syncWorker; SReplica replicas[TSDB_MAX_REPLICA]; int8_t replica; int8_t selfIndex; diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index ab9f46e323..470128940b 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -42,9 +42,9 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) { +static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { @@ -59,7 +59,7 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); } -static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) { +static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { return -1; @@ -68,7 +68,7 @@ static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); pMsg->rpcMsg = *pRpc; - int32_t code = dndWriteMsgToWorker(pWorker, pMsg); + int32_t code = taosWriteQitem(pWorker->queue, pMsg); if (code != 0) { dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); @@ -89,18 +89,20 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { - if (dndInitWorker(pMgmt, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmProcessQueue) != 0) { - dError("failed to start mnode read worker since %s", terrstr()); + SQWorkerAllCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + + if (tQWorkerAllInit(&pMgmt->readWorker, &cfg) != 0) { + dError("failed to start mnode-read worker since %s", terrstr()); return -1; } - if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmProcessQueue) != 0) { - dError("failed to start mnode write worker since %s", terrstr()); + if (tQWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) { + dError("failed to start mnode-write worker since %s", terrstr()); return -1; } - if (dndInitWorker(pMgmt, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmProcessQueue) != 0) { - dError("failed to start mnode sync worker since %s", terrstr()); + if (tQWorkerAllInit(&pMgmt->syncWorker, &cfg) != 0) { + dError("failed to start mnode sync-worker since %s", terrstr()); return -1; } @@ -108,7 +110,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { } void mmStopWorker(SMnodeMgmt *pMgmt) { - dndCleanupWorker(&pMgmt->readWorker); - dndCleanupWorker(&pMgmt->writeWorker); - dndCleanupWorker(&pMgmt->syncWorker); + tQWorkerAllCleanup(&pMgmt->readWorker); + tQWorkerAllCleanup(&pMgmt->writeWorker); + tQWorkerAllCleanup(&pMgmt->syncWorker); } diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 258ca1402f..70f2871f41 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -441,7 +441,6 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) { STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); taosThreadMutexLock(&qset->mutex); - pNode->queue->threadId = -1; for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) { tsem_post(&qset->sem); } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 7d7f819dec..e05c4e0a78 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -22,7 +22,7 @@ typedef void *(*ThreadFp)(void *param); int32_t tQWorkerInit(SQWorkerPool *pool) { pool->qset = taosOpenQset(); - pool->workers = calloc(sizeof(SQWorker), pool->max); + pool->workers = calloc(pool->max, sizeof(SQWorker)); if (pool->workers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -279,3 +279,60 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue); } + +int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) { + SQWorkerPool *pPool = &pWorker->pool; + pPool->name = pCfg->name; + pPool->min = pCfg->minNum; + pPool->max = pCfg->maxNum; + if (tQWorkerInit(pPool) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp); + if (pWorker->queue == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pWorker->name = pCfg->name; + return 0; +} + +void tQWorkerAllCleanup(SQWorkerAll *pWorker) { + if (pWorker->queue == NULL) return; + + while (!taosQueueEmpty(pWorker->queue)) { + taosMsleep(10); + } + + tQWorkerCleanup(&pWorker->pool); + tQWorkerFreeQueue(&pWorker->pool, pWorker->queue); +} + +int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) { + SWWorkerPool *pPool = &pWorker->pool; + pPool->name = pCfg->name; + pPool->max = pCfg->maxNum; + if (tWWorkerInit(pPool) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp); + if (pWorker->queue == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pWorker->name = pCfg->name; + return 0; +} + +void tWWorkerAllCleanup(SWWorkerAll *pWorker) { + if (pWorker->queue == NULL) return; + + while (!taosQueueEmpty(pWorker->queue)) { + taosMsleep(10); + } + + tWWorkerCleanup(&pWorker->pool); + tWWorkerFreeQueue(&pWorker->pool, pWorker->queue); +}