From a09c553ab8e06cd5dbc15ba40e59ba3697b44ca1 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 21 Dec 2021 16:59:26 +0800 Subject: [PATCH] TD-10431 refact vnode msg processor --- include/common/taosmsg.h | 20 +++---- include/dnode/vnode/vnode.h | 29 +-------- source/dnode/mgmt/impl/src/dndVnodes.c | 82 +++++++++++--------------- source/dnode/vnode/impl/src/vnodeInt.c | 52 +++++----------- 4 files changed, 61 insertions(+), 122 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 0b85c4eb88..8229cae281 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -100,16 +100,16 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB_IN, "drop-stb-internal" ) // message from mnode to mnode // message from mnode to qnode // message from mnode to dnode -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_MNODE_IN, "alter-mnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_MNODE_IN, "alter-mnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode-internal" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode-internal" ) // message from qnode to vnode // message from qnode to mnode diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 82b059e93c..d51ac48f01 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -140,7 +140,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); * @param pRsp The response message * @return int 0 for success, -1 for failure */ -int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); /** * @brief Process a fetch message. @@ -150,7 +150,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); * @param pRsp The response message * @return int 0 for success, -1 for failure */ -int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); /** * @brief Process a consume message. @@ -160,7 +160,7 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); * @param pRsp The response message * @return int 0 for success, -1 for failure */ -int vnodeProcessConsumeMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int vnodeProcessConsumeReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); /* ------------------------ SVnodeCfg ------------------------ */ /** @@ -241,34 +241,11 @@ void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type); /* ------------------------ FOR COMPILE ------------------------ */ -#if 1 - -typedef enum { - VN_MSG_TYPE_WRITE = 1, - VN_MSG_TYPE_APPLY, - VN_MSG_TYPE_SYNC, - VN_MSG_TYPE_QUERY, - VN_MSG_TYPE_FETCH -} EVnMsgType; - -typedef struct { - int32_t curNum; - int32_t allocNum; - SRpcMsg rpcMsg[]; -} SVnodeMsg; - int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); -SVnodeMsg *vnodeInitMsg(int32_t msgNum); -int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg); -void vnodeCleanupMsg(SVnodeMsg *pMsg); -void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType); - -#endif - #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index cc4fc5b7d7..1623d032ce 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -72,8 +72,8 @@ static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode); static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode); static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg); -static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg); +static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); +static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); @@ -83,7 +83,7 @@ void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pE void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg); +static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg); static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode); @@ -802,40 +802,51 @@ static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { - vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_QUERY); +static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { + SRpcMsg *pRsp = NULL; + vnodeProcessQueryReq(pVnode->pImpl, pMsg, &pRsp); } -static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { - vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_FETCH); +static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { + SRpcMsg *pRsp = NULL; + vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp); } static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { - SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs); - SRpcMsg *pRpcMsg = NULL; + SRpcMsg *pRpcMsg = NULL; + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg)); for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pRpcMsg); - vnodeAppendMsg(pMsg, pRpcMsg); + + void *ptr = taosArrayPush(pArray, pRpcMsg); + assert(ptr != NULL); + taosFreeQitem(pRpcMsg); } - vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_WRITE); + vnodeProcessWMsgs(pVnode->pImpl, pArray); } static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { - SVnodeMsg *pMsg = NULL; + SRpcMsg *pMsg = NULL; + for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); - vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_APPLY); + + SRpcMsg *pRsp = NULL; + (void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); } } static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { - SVnodeMsg *pMsg = NULL; + SRpcMsg *pMsg = NULL; + for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); - vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_SYNC); + + SRpcMsg *pRsp = NULL; + (void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp); } } @@ -863,40 +874,13 @@ static int32_t dndWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { } } -static int32_t dndWriteVnodeMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { - int32_t code = 0; - - if (pQueue == NULL) { - code = TSDB_CODE_MSG_NOT_PROCESSED; - } else { - SVnodeMsg *pMsg = vnodeInitMsg(1); - if (pMsg == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - } else { - if (vnodeAppendMsg(pMsg, pRpcMsg) != 0) { - code = terrno; - } else { - if (taosWriteQitem(pQueue, pMsg) != 0) { - code = TSDB_CODE_OUT_OF_MEMORY; - } - } - } - } - - if (code != TSDB_CODE_SUCCESS) { - SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; - rpcSendResponse(&rsp); - rpcFreeCont(pRpcMsg->pCont); - } -} - static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { SMsgHead *pHead = (SMsgHead *)pMsg->pCont; pHead->vgId = htonl(pHead->vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId); if (pVnode == NULL) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; + SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; rpcSendResponse(&rsp); rpcFreeCont(pMsg->pCont); } @@ -920,7 +904,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteVnodeMsgToVnodeQueue(pVnode->pSyncQ, pMsg); + dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg); dndReleaseVnode(pDnode, pVnode); } } @@ -928,7 +912,7 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteVnodeMsgToVnodeQueue(pVnode->pQueryQ, pMsg); + dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg); dndReleaseVnode(pDnode, pVnode); } } @@ -936,12 +920,12 @@ void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); if (pVnode != NULL) { - dndWriteVnodeMsgToVnodeQueue(pVnode->pFetchQ, pMsg); + dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg); dndReleaseVnode(pDnode, pVnode); } } -static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg) { +static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) { return -1; @@ -1106,7 +1090,7 @@ static void dndCleanupVnodeWriteWorker(SDnode *pDnode) { static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); + pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); if (pVnode->pSyncQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -1117,7 +1101,7 @@ static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pSyncQ); + tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); pVnode->pSyncQ = NULL; } diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index 8a6fc8bf5e..5deaffe6d2 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -15,53 +15,31 @@ #define _DEFAULT_SOURCE #include "vnodeInt.h" -#include "tqueue.h" int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } -SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; } -void vnodeDrop(SVnode *pVnode) {} + int32_t vnodeCompact(SVnode *pVnode) { return 0; } + int32_t vnodeSync(SVnode *pVnode) { return 0; } int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; } -SVnodeMsg *vnodeInitMsg(int32_t msgNum) { - SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg)); - if (pMsg == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } else { - pMsg->allocNum = msgNum; - return pMsg; - } +int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + vInfo("query message is processed"); + return 0; } -int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) { - if (pMsg->curNum >= pMsg->allocNum) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg; +int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + vInfo("fetch message is processed"); + return 0; } -void vnodeCleanupMsg(SVnodeMsg *pMsg) { - for (int32_t i = 0; i < pMsg->curNum; ++i) { - rpcFreeCont(pMsg->rpcMsg[i].pCont); - } - taosFreeQitem(pMsg); +int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + vInfo("sync message is processed"); + return 0; } -void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType) { - switch (msgType) { - case VN_MSG_TYPE_WRITE: - break; - case VN_MSG_TYPE_APPLY: - break; - case VN_MSG_TYPE_SYNC: - break; - case VN_MSG_TYPE_QUERY: - break; - case VN_MSG_TYPE_FETCH: - break; - } -} \ No newline at end of file +int vnodeProcessConsumeReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + vInfo("consume message is processed"); + return 0; +}