From 318c3b7cf78915363a2c4c9610eb7fb8cc4ccb90 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 4 Nov 2021 14:18:03 +0800 Subject: [PATCH] add apply callback --- include/server/vnode/vnode.h | 10 ++++++++-- source/dnode/mgmt/src/dnodeVnodes.c | 23 ++++++++++++++++++++++- source/dnode/vnode/impl/src/vnodeInt.c | 2 +- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index ca0706f0b9..6db0eb86f6 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -55,13 +55,19 @@ typedef enum { VN_MSG_TYPE_FETCH } EVMType; -typedef struct SVnodeMsg { +typedef struct { int32_t curNum; int32_t allocNum; SRpcMsg rpcMsg[]; } SVnodeMsg; -int32_t vnodeInit(); +typedef struct { + void (*SendMsgToDnode)(SEpSet *pEpSet, SRpcMsg *pMsg); + void (*SendMsgToMnode)(SRpcMsg *pMsg); + int32_t (*PutMsgIntoApplyQueue)(int32_t vgId, SVnodeMsg *pMsg); +} SVnodePara; + +int32_t vnodeInit(SVnodePara); void vnodeCleanup(); SVnode *vnodeOpen(int32_t vgId, const char *path); diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c index 72048a15a5..46b79c84a0 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/src/dnodeVnodes.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "dnodeVnodes.h" +#include "dnodeTransport.h" #include "thash.h" #include "tqueue.h" #include "tstep.h" @@ -666,6 +667,17 @@ void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { } } +static int32_t dnodePutMsgIntoVnodeApplyQueue(int32_t vgId, SVnodeMsg *pMsg) { + SVnodeObj *pVnode = dnodeAcquireVnode(vgId); + if (pVnode == NULL) { + return terrno; + } + + int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg); + dnodeReleaseVnode(pVnode); + return code; +} + static int32_t dnodeInitVnodeMgmtWorker() { SWorkerPool *pPool = &tsVnodes.mgmtPool; pPool->name = "vnode-mgmt"; @@ -811,11 +823,20 @@ static int32_t dnodeInitVnodeSyncWorker() { static void dnodeCleanupVnodeSyncWorker() { tMWorkerCleanup(&tsVnodes.syncPool); } +static int32_t dnodeInitVnodeModule() { + SVnodePara para; + para.SendMsgToDnode = dnodeSendMsgToDnode; + para.SendMsgToMnode = dnodeSendMsgToMnode; + para.PutMsgIntoApplyQueue = dnodePutMsgIntoVnodeApplyQueue; + + return vnodeInit(para); +} + int32_t dnodeInitVnodes() { dInfo("dnode-vnodes start to init"); SSteps *pSteps = taosStepInit(3, dnodeReportStartup); - taosStepAdd(pSteps, "dnode-vnode-env", vnodeInit, vnodeCleanup); + taosStepAdd(pSteps, "dnode-vnode-env", dnodeInitVnodeModule, vnodeCleanup); taosStepAdd(pSteps, "dnode-vnode-mgmt", dnodeInitVnodeMgmtWorker, dnodeCleanupVnodeMgmtWorker); taosStepAdd(pSteps, "dnode-vnode-read", dnodeInitVnodeReadWorker, dnodeCleanupVnodeReadWorker); taosStepAdd(pSteps, "dnode-vnode-write", dnodeInitVnodeWriteWorker, dnodeCleanupVnodeWriteWorker); diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index c345f2e1b9..e08cc47aa1 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -17,7 +17,7 @@ #include "vnodeInt.h" #include "tqueue.h" -int32_t vnodeInit() { return 0; } +int32_t vnodeInit(SVnodePara para) { return 0; } void vnodeCleanup() {} SVnode *vnodeOpen(int32_t vgId, const char *path) { return NULL; }