From 167657c1926a1959327887657c3d6d48be30029a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 19 Apr 2022 09:07:42 +0000 Subject: [PATCH] refactor: sync --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 57 +++++++++++---------- source/dnode/vnode/inc/vnode.h | 6 ++- source/dnode/vnode/src/inc/vnodeInt.h | 4 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 2 + 4 files changed, 38 insertions(+), 31 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 754a174b6b..17347eb59a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -97,7 +97,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - int64_t version; + SRpcMsg rsp; SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); if (pArray == NULL) { @@ -116,13 +116,15 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } } +#if 1 + int64_t version; + vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version); numOfMsgs = taosArrayGetSize(pArray); for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); SRpcMsg *pRpc = &pMsg->rpcMsg; - SRpcMsg rsp; rsp.pCont = NULL; rsp.contLen = 0; @@ -132,20 +134,9 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp); tmsgSendRsp(&rsp); - -#if 0 - if (pRsp != NULL) { - pRsp->ahandle = pRpc->ahandle; - taosMemoryFree(pRsp); - } else { - if (code != 0 && terrno != 0) code = terrno; - vmSendRsp(pVnode->pWrapper, pMsg, code); - } -#endif } - +#else // sync integration response - /* for (int i = 0; i < taosArrayGetSize(pArray); i++) { SNodeMsg *pMsg; SRpcMsg *pRpc; @@ -153,18 +144,18 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); pRpc = &pMsg->rpcMsg; - // set request version - void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead)); - // int64_t ver = pVnode->pImpl->state.processed++; // ??????? - int64_t ver; - taosEncodeFixedI64(&pBuf, ver); + rsp.ahandle = pRpc->ahandle; + rsp.handle = pRpc->handle; + rsp.pCont = NULL; + rsp.contLen = 0; - int32_t ret = syncPropose(pVnode->pImpl->sync, pRpc, false); + int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pRpc, false); if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { - // not leader - // send response + rsp.code = -1; + tmsgSendRsp(&rsp); } else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) { - // send response + rsp.code = -2; + tmsgSendRsp(&rsp); } else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) { // ok // send response in applyQ @@ -172,7 +163,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO assert(0); } } - */ +#endif for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); @@ -187,13 +178,25 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; SNodeMsg *pMsg = NULL; + SRpcMsg rsp; for (int32_t i = 0; i < numOfMsgs; ++i) { +#if 0 taosGetQitem(qall, (void **)&pMsg); - // todo - SRpcMsg *pRsp = NULL; - // (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); + if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) { + rsp.ahandle = pMsg->rpcMsg.ahandle; + rsp.handle = pMsg->rpcMsg.handle; + rsp.code = 0; + rsp.pCont = NULL; + rsp.contLen = 0; + + if (vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &rsp) < 0) { + rsp.code = terrno; + tmsgSendRsp(&rsp); + } + } +#endif } } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 4e42927be0..ff9f87d65e 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -21,10 +21,10 @@ #include "tqueue.h" #include "trpc.h" +#include "sync.h" #include "tarray.h" #include "tfs.h" #include "wal.h" -#include "sync.h" #include "tcommon.h" #include "tfs.h" @@ -61,6 +61,8 @@ int32_t vnodeSync(SVnode *pVnode); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); +int64_t vnodeGetSyncHandle(SVnode *pVnode); + // meta typedef struct SMeta SMeta; // todo: remove typedef struct SMTbCursor SMTbCursor; @@ -148,7 +150,7 @@ struct SVnodeCfg { bool isWeak; STsdbCfg tsdbCfg; SWalCfg walCfg; - SSyncCfg syncCfg; // sync integration + SSyncCfg syncCfg; // sync integration uint32_t hashBegin; uint32_t hashEnd; int8_t hashMethod; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 55b1473e4e..06ec03741c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -54,8 +54,8 @@ typedef struct SQWorkerMgmt SQHandle; #define VNODE_META_DIR "meta" #define VNODE_TSDB_DIR "tsdb" -#define VNODE_TQ_DIR "tq" -#define VNODE_WAL_DIR "wal" +#define VNODE_TQ_DIR "tq" +#define VNODE_WAL_DIR "wal" typedef struct { int8_t streamType; // sma or other diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index b61bb48db3..ae5c99ff03 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -165,3 +165,5 @@ void vnodeClose(SVnode *pVnode) { taosMemoryFree(pVnode); } } + +int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } \ No newline at end of file