diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 8338d20206..777f9eb36e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -122,17 +122,26 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); SRpcMsg *pRpc = &pMsg->rpcMsg; - SRpcMsg *pRsp = NULL; + SRpcMsg rsp; - int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &pRsp); + rsp.pCont = NULL; + rsp.contLen = 0; + rsp.code = 0; + rsp.handle = pRpc->handle; + rsp.ahandle = pRpc->ahandle; + + int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp); + tmsgSendRsp(&rsp); + +#if 0 if (pRsp != NULL) { pRsp->ahandle = pRpc->ahandle; - tmsgSendRsp(pRsp); taosMemoryFree(pRsp); } else { if (code != 0 && terrno != 0) code = terrno; vmSendRsp(pVnode->pWrapper, pMsg, code); } +#endif } for (int32_t i = 0; i < numOfMsgs; i++) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2a9f540ffb..834d11fc20 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -49,7 +49,7 @@ void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodeClose(SVnode *pVnode); int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version); -int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg **pRsp); +int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e7a3e1b29c..603b92d4a0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -16,7 +16,7 @@ #include "vnodeInt.h" static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq); -static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp); +static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp); static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq); static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp); @@ -41,7 +41,7 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { return 0; } -int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg **pRsp) { +int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { void *ptr = NULL; int ret; @@ -65,6 +65,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ret = vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); return 0; case TDMT_VND_CREATE_TABLE: + pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; return vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp); case TDMT_VND_ALTER_STB: return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); @@ -74,14 +75,8 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg case TDMT_VND_DROP_TABLE: break; case TDMT_VND_SUBMIT: - /*printf("vnode %d write data %ld\n", TD_VID(pVnode), ver);*/ - if (pVnode->config.streamMode == 0) { - *pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg)); - (*pRsp)->handle = pMsg->handle; - (*pRsp)->ahandle = pMsg->ahandle; - return vnodeProcessSubmitReq(pVnode, ptr, *pRsp); - } - break; + pRsp->msgType = TDMT_VND_SUBMIT_RSP; + return vnodeProcessSubmitReq(pVnode, ptr, pRsp); case TDMT_VND_MQ_SET_CONN: { if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { // TODO: handle error @@ -218,7 +213,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) { return 0; } -static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp) { +static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp) { SVCreateTbBatchReq vCreateTbBatchReq = {0}; SVCreateTbBatchRsp vCreateTbBatchRsp = {0}; tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq); @@ -270,12 +265,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp); taosArrayDestroy(vCreateTbBatchRsp.rspList); - *pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg)); - (*pRsp)->msgType = TDMT_VND_CREATE_TABLE_RSP; - (*pRsp)->pCont = msg; - (*pRsp)->contLen = contLen; - (*pRsp)->handle = pMsg->handle; - (*pRsp)->ahandle = pMsg->ahandle; + pRsp->pCont = msg; + pRsp->contLen = contLen; } return 0; @@ -308,7 +299,6 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg } // encode the response (TODO) - pRsp->msgType = TDMT_VND_SUBMIT_RSP; pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp)); memcpy(pRsp->pCont, &rsp, sizeof(rsp)); pRsp->contLen = sizeof(SSubmitRsp);