diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 1623d032ce..598d6e47be 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -813,19 +813,38 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { } static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { - SRpcMsg *pRpcMsg = NULL; - SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg)); + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, (void **)&pRpcMsg); - - void *ptr = taosArrayPush(pArray, pRpcMsg); + SRpcMsg *pMsg = NULL; + taosGetQitem(qall, (void **)&pMsg); + void *ptr = taosArrayPush(pArray, &pMsg); assert(ptr != NULL); - - taosFreeQitem(pRpcMsg); } vnodeProcessWMsgs(pVnode->pImpl, pArray); + + for (size_t i = 0; i < numOfMsgs; i++) { + SRpcMsg *pRsp = NULL; + SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); + int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); + if (pRsp != NULL) { + rpcSendResponse(pRsp); + free(pRsp); + } else { + if (code != 0) code = terrno; + SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rpcRsp); + } + } + + for (size_t i = 0; i < numOfMsgs; i++) { + SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } + + taosArrayDestroy(pArray); } static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { @@ -876,6 +895,7 @@ static int32_t dndWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { SMsgHead *pHead = (SMsgHead *)pMsg->pCont; + pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId); diff --git a/source/dnode/mgmt/impl/test/stb/stb.cpp b/source/dnode/mgmt/impl/test/stb/stb.cpp index 6641060d25..c12e8eadf4 100644 --- a/source/dnode/mgmt/impl/test/stb/stb.cpp +++ b/source/dnode/mgmt/impl/test/stb/stb.cpp @@ -270,8 +270,6 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { SRpcMsg* pMsg = pClient->pRsp; ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); - - taosMsleep(100000); } SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1"); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index c0345e448e..a51312a2a9 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -344,7 +344,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = pMsg; - action.contLen = sizeof(SCreateStbInternalMsg); + action.contLen = htonl(pMsg->head.contLen); action.msgType = TSDB_MSG_TYPE_CREATE_STB_IN; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index f564afd613..5ec03f1fd3 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -70,6 +70,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType); switch (pMsg->msgType) { + case TSDB_MSG_TYPE_CREATE_STB_IN: case TSDB_MSG_TYPE_CREATE_TABLE: if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) { // TODO: handle error @@ -77,6 +78,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: maybe need to clear the requst struct break; + case TSDB_MSG_TYPE_DROP_STB_IN: case TSDB_MSG_TYPE_DROP_TABLE: if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { // TODO: handle error