From 7895c492bac52507cb944621fd324008f801d5bf Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 19 Apr 2022 02:37:45 +0000 Subject: [PATCH 1/2] refact vnod3 --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 7 ++++--- source/dnode/vnode/inc/vnode.h | 4 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 22 +++++++++------------ 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 2fb29ce944..8338d20206 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -97,6 +97,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; + int64_t version; SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); if (pArray == NULL) { @@ -115,7 +116,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } } - vnodePreprocessWriteReqs(pVnode->pImpl, pArray); + vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version); numOfMsgs = taosArrayGetSize(pArray); for (int32_t i = 0; i < numOfMsgs; i++) { @@ -123,7 +124,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRsp = NULL; - int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, &pRsp); + int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &pRsp); if (pRsp != NULL) { pRsp->ahandle = pRpc->ahandle; tmsgSendRsp(pRsp); @@ -153,7 +154,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO // todo SRpcMsg *pRsp = NULL; - (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); + // (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); } } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 12e22154e8..2a9f540ffb 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -48,8 +48,8 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodeClose(SVnode *pVnode); -void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs); -int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version); +int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg **pRsp); int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 928a2d4e4b..e7a3e1b29c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -20,30 +20,28 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq); static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp); -void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) { +int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { SNodeMsg *pMsg; SRpcMsg *pRpc; + *version = pVnode->state.processed; for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i); pRpc = &pMsg->rpcMsg; // set request version - void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead)); - int64_t ver = pVnode->state.processed++; - taosEncodeFixedI64(&pBuf, ver); - - if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { - // TODO: handle error - /*ASSERT(false);*/ + if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr()); + return -1; } } walFsync(pVnode->pWal, false); + + return 0; } -int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { +int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg **pRsp) { void *ptr = NULL; int ret; @@ -58,9 +56,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } // todo: change the interface here - int64_t ver; - taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver); - if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) { + if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { // TODO: handle error } @@ -128,7 +124,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { break; } - pVnode->state.applied = ver; + pVnode->state.applied = version; // Check if it needs to commit if (vnodeShouldCommit(pVnode)) { From f8d1f5a813fa80ecc9d8e9052ab7630e06c084cb Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 19 Apr 2022 03:17:23 +0000 Subject: [PATCH 2/2] refact vnode --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 15 +++++++++--- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 26 +++++++-------------- 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 8338d20206..777f9eb36e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -122,17 +122,26 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); SRpcMsg *pRpc = &pMsg->rpcMsg; - SRpcMsg *pRsp = NULL; + SRpcMsg rsp; - int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &pRsp); + rsp.pCont = NULL; + rsp.contLen = 0; + rsp.code = 0; + rsp.handle = pRpc->handle; + rsp.ahandle = pRpc->ahandle; + + int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp); + tmsgSendRsp(&rsp); + +#if 0 if (pRsp != NULL) { pRsp->ahandle = pRpc->ahandle; - tmsgSendRsp(pRsp); taosMemoryFree(pRsp); } else { if (code != 0 && terrno != 0) code = terrno; vmSendRsp(pVnode->pWrapper, pMsg, code); } +#endif } for (int32_t i = 0; i < numOfMsgs; i++) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2a9f540ffb..834d11fc20 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -49,7 +49,7 @@ void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodeClose(SVnode *pVnode); int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version); -int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg **pRsp); +int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e7a3e1b29c..603b92d4a0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -16,7 +16,7 @@ #include "vnodeInt.h" static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq); -static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp); +static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp); static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq); static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp); @@ -41,7 +41,7 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { return 0; } -int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg **pRsp) { +int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { void *ptr = NULL; int ret; @@ -65,6 +65,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ret = vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); return 0; case TDMT_VND_CREATE_TABLE: + pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; return vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp); case TDMT_VND_ALTER_STB: return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); @@ -74,14 +75,8 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg case TDMT_VND_DROP_TABLE: break; case TDMT_VND_SUBMIT: - /*printf("vnode %d write data %ld\n", TD_VID(pVnode), ver);*/ - if (pVnode->config.streamMode == 0) { - *pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg)); - (*pRsp)->handle = pMsg->handle; - (*pRsp)->ahandle = pMsg->ahandle; - return vnodeProcessSubmitReq(pVnode, ptr, *pRsp); - } - break; + pRsp->msgType = TDMT_VND_SUBMIT_RSP; + return vnodeProcessSubmitReq(pVnode, ptr, pRsp); case TDMT_VND_MQ_SET_CONN: { if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { // TODO: handle error @@ -218,7 +213,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) { return 0; } -static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp) { +static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp) { SVCreateTbBatchReq vCreateTbBatchReq = {0}; SVCreateTbBatchRsp vCreateTbBatchRsp = {0}; tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq); @@ -270,12 +265,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp); taosArrayDestroy(vCreateTbBatchRsp.rspList); - *pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg)); - (*pRsp)->msgType = TDMT_VND_CREATE_TABLE_RSP; - (*pRsp)->pCont = msg; - (*pRsp)->contLen = contLen; - (*pRsp)->handle = pMsg->handle; - (*pRsp)->ahandle = pMsg->ahandle; + pRsp->pCont = msg; + pRsp->contLen = contLen; } return 0; @@ -308,7 +299,6 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg } // encode the response (TODO) - pRsp->msgType = TDMT_VND_SUBMIT_RSP; pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp)); memcpy(pRsp->pCont, &rsp, sizeof(rsp)); pRsp->contLen = sizeof(SSubmitRsp);