diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index cde73e6fa0..48bae163d9 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -73,6 +73,12 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { return -1; } + // sync integration + vnodeSyncSetQ(pImpl, NULL); + vnodeSyncSetRpc(pImpl, NULL); + int32_t ret = vnodeSyncStart(pImpl); + assert(ret == 0); + taosWLockLatch(&pMgmt->latch); int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); taosWUnLockLatch(&pMgmt->latch); @@ -305,6 +311,12 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { goto _OVER; } + // sync integration + if (syncInit() != 0) { + dError("failed to open sync since %s", terrstr()); + return -1; + } + if (vnodeInit(tsNumOfCommitThreads) != 0) { dError("failed to init vnode since %s", terrstr()); goto _OVER; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index ff9f87d65e..00bcec7be9 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -170,6 +170,11 @@ typedef struct { uint64_t uid; } STableKeyInfo; +// sync integration +void vnodeSyncSetQ(SVnode *pVnode, void *qHandle); +void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle); +int32_t vnodeSyncStart(SVnode *pVnode); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeSync.h b/source/dnode/vnode/src/inc/vnodeSync.h index a3612bec92..6bbdc4ae72 100644 --- a/source/dnode/vnode/src/inc/vnodeSync.h +++ b/source/dnode/vnode/src/inc/vnodeSync.h @@ -24,6 +24,9 @@ int32_t vnodeSyncOpen(SVnode *pVnode); int32_t vnodeSyncStart(SVnode *pVnode); void vnodeSyncClose(SVnode *pVnode); +void vnodeSyncSetQ(SVnode *pVnode, void *qHandle); +void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle); + int32_t vnodeSyncEqMsg(void *qHandle, SRpcMsg *pMsg); int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg);