From 7895c492bac52507cb944621fd324008f801d5bf Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 19 Apr 2022 02:37:45 +0000 Subject: [PATCH] refact vnod3 --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 7 ++++--- source/dnode/vnode/inc/vnode.h | 4 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 22 +++++++++------------ 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 2fb29ce944..8338d20206 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -97,6 +97,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; + int64_t version; SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); if (pArray == NULL) { @@ -115,7 +116,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } } - vnodePreprocessWriteReqs(pVnode->pImpl, pArray); + vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version); numOfMsgs = taosArrayGetSize(pArray); for (int32_t i = 0; i < numOfMsgs; i++) { @@ -123,7 +124,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRsp = NULL; - int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, &pRsp); + int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &pRsp); if (pRsp != NULL) { pRsp->ahandle = pRpc->ahandle; tmsgSendRsp(pRsp); @@ -153,7 +154,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO // todo SRpcMsg *pRsp = NULL; - (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); + // (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); } } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 12e22154e8..2a9f540ffb 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -48,8 +48,8 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodeClose(SVnode *pVnode); -void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs); -int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version); +int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg **pRsp); int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 928a2d4e4b..e7a3e1b29c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -20,30 +20,28 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq); static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp); -void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) { +int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { SNodeMsg *pMsg; SRpcMsg *pRpc; + *version = pVnode->state.processed; for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i); pRpc = &pMsg->rpcMsg; // set request version - void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead)); - int64_t ver = pVnode->state.processed++; - taosEncodeFixedI64(&pBuf, ver); - - if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { - // TODO: handle error - /*ASSERT(false);*/ + if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr()); + return -1; } } walFsync(pVnode->pWal, false); + + return 0; } -int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { +int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg **pRsp) { void *ptr = NULL; int ret; @@ -58,9 +56,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } // todo: change the interface here - int64_t ver; - taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver); - if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) { + if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { // TODO: handle error } @@ -128,7 +124,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { break; } - pVnode->state.applied = ver; + pVnode->state.applied = version; // Check if it needs to commit if (vnodeShouldCommit(pVnode)) {