add apply callback

This commit is contained in:
Shengliang Guan 2021-11-04 14:18:03 +08:00
parent 2f09d49aa4
commit 318c3b7cf7
3 changed files with 31 additions and 4 deletions

View File

@ -55,13 +55,19 @@ typedef enum {
VN_MSG_TYPE_FETCH
} EVMType;
typedef struct SVnodeMsg {
typedef struct {
int32_t curNum;
int32_t allocNum;
SRpcMsg rpcMsg[];
} SVnodeMsg;
int32_t vnodeInit();
typedef struct {
void (*SendMsgToDnode)(SEpSet *pEpSet, SRpcMsg *pMsg);
void (*SendMsgToMnode)(SRpcMsg *pMsg);
int32_t (*PutMsgIntoApplyQueue)(int32_t vgId, SVnodeMsg *pMsg);
} SVnodePara;
int32_t vnodeInit(SVnodePara);
void vnodeCleanup();
SVnode *vnodeOpen(int32_t vgId, const char *path);

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "dnodeVnodes.h"
#include "dnodeTransport.h"
#include "thash.h"
#include "tqueue.h"
#include "tstep.h"
@ -666,6 +667,17 @@ void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
}
}
static int32_t dnodePutMsgIntoVnodeApplyQueue(int32_t vgId, SVnodeMsg *pMsg) {
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode == NULL) {
return terrno;
}
int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg);
dnodeReleaseVnode(pVnode);
return code;
}
static int32_t dnodeInitVnodeMgmtWorker() {
SWorkerPool *pPool = &tsVnodes.mgmtPool;
pPool->name = "vnode-mgmt";
@ -811,11 +823,20 @@ static int32_t dnodeInitVnodeSyncWorker() {
static void dnodeCleanupVnodeSyncWorker() { tMWorkerCleanup(&tsVnodes.syncPool); }
static int32_t dnodeInitVnodeModule() {
SVnodePara para;
para.SendMsgToDnode = dnodeSendMsgToDnode;
para.SendMsgToMnode = dnodeSendMsgToMnode;
para.PutMsgIntoApplyQueue = dnodePutMsgIntoVnodeApplyQueue;
return vnodeInit(para);
}
int32_t dnodeInitVnodes() {
dInfo("dnode-vnodes start to init");
SSteps *pSteps = taosStepInit(3, dnodeReportStartup);
taosStepAdd(pSteps, "dnode-vnode-env", vnodeInit, vnodeCleanup);
taosStepAdd(pSteps, "dnode-vnode-env", dnodeInitVnodeModule, vnodeCleanup);
taosStepAdd(pSteps, "dnode-vnode-mgmt", dnodeInitVnodeMgmtWorker, dnodeCleanupVnodeMgmtWorker);
taosStepAdd(pSteps, "dnode-vnode-read", dnodeInitVnodeReadWorker, dnodeCleanupVnodeReadWorker);
taosStepAdd(pSteps, "dnode-vnode-write", dnodeInitVnodeWriteWorker, dnodeCleanupVnodeWriteWorker);

View File

@ -17,7 +17,7 @@
#include "vnodeInt.h"
#include "tqueue.h"
int32_t vnodeInit() { return 0; }
int32_t vnodeInit(SVnodePara para) { return 0; }
void vnodeCleanup() {}
SVnode *vnodeOpen(int32_t vgId, const char *path) { return NULL; }