commit
fe5a6dab4a
|
@ -202,6 +202,17 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
code = vnodeStart(pImpl);
|
||||
if (code != 0) {
|
||||
tFreeSCreateVnodeReq(&createReq);
|
||||
dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr());
|
||||
vnodeClose(pImpl);
|
||||
vnodeDestroy(path, pMgmt->pTfs);
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
code = vmWriteVnodesToFile(pMgmt);
|
||||
if (code != 0) {
|
||||
tFreeSCreateVnodeReq(&createReq);
|
||||
|
|
|
@ -74,12 +74,6 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// sync integration
|
||||
vnodeSyncSetQ(pImpl, NULL);
|
||||
vnodeSyncSetRpc(pImpl, NULL);
|
||||
int32_t ret = vnodeSyncStart(pImpl);
|
||||
assert(ret == 0);
|
||||
|
||||
taosWLockLatch(&pMgmt->latch);
|
||||
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
||||
taosWUnLockLatch(&pMgmt->latch);
|
||||
|
@ -153,6 +147,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
|||
pThread->failed++;
|
||||
} else {
|
||||
vmOpenVnode(pMgmt, pCfg, pImpl);
|
||||
//vnodeStart(pImpl);
|
||||
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||
pThread->opened++;
|
||||
}
|
||||
|
@ -364,10 +359,52 @@ static int32_t vmRequire(SMgmtWrapper *pWrapper, bool *required) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vmStart(SMgmtWrapper *pWrapper) {
|
||||
dDebug("vnode-mgmt start to run");
|
||||
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
|
||||
|
||||
taosRLockLatch(&pMgmt->latch);
|
||||
|
||||
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||
while (pIter) {
|
||||
SVnodeObj **ppVnode = pIter;
|
||||
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
||||
|
||||
SVnodeObj *pVnode = *ppVnode;
|
||||
vnodeStart(pVnode->pImpl);
|
||||
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||
}
|
||||
|
||||
taosRUnLockLatch(&pMgmt->latch);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void vmStop(SMgmtWrapper *pWrapper) {
|
||||
#if 0
|
||||
dDebug("vnode-mgmt start to stop");
|
||||
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
|
||||
taosRLockLatch(&pMgmt->latch);
|
||||
|
||||
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||
while (pIter) {
|
||||
SVnodeObj **ppVnode = pIter;
|
||||
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
||||
|
||||
SVnodeObj *pVnode = *ppVnode;
|
||||
vnodeStop(pVnode->pImpl);
|
||||
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||
}
|
||||
|
||||
taosRUnLockLatch(&pMgmt->latch);
|
||||
#endif
|
||||
}
|
||||
|
||||
void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
|
||||
SMgmtFp mgmtFp = {0};
|
||||
mgmtFp.openFp = vmInit;
|
||||
mgmtFp.closeFp = vmCleanup;
|
||||
mgmtFp.startFp = vmStart;
|
||||
mgmtFp.stopFp = vmStop;
|
||||
mgmtFp.requiredFp = vmRequire;
|
||||
|
||||
vmInitMsgHandle(pWrapper);
|
||||
|
@ -396,4 +433,4 @@ void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) {
|
|||
}
|
||||
|
||||
taosRUnLockLatch(&pMgmt->latch);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,6 +61,9 @@ int32_t vnodeSync(SVnode *pVnode);
|
|||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
|
||||
|
||||
int32_t vnodeStart(SVnode *pVnode);
|
||||
void vnodeStop(SVnode *pVnode);
|
||||
|
||||
int64_t vnodeGetSyncHandle(SVnode *pVnode);
|
||||
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
|
||||
|
||||
|
@ -171,11 +174,6 @@ typedef struct {
|
|||
uint64_t uid;
|
||||
} STableKeyInfo;
|
||||
|
||||
// sync integration
|
||||
void vnodeSyncSetQ(SVnode *pVnode, void *qHandle);
|
||||
void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle);
|
||||
int32_t vnodeSyncStart(SVnode *pVnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "vnodeInt.h"
|
||||
#include "vnodeSync.h"
|
||||
|
||||
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||
SVnodeInfo info = {0};
|
||||
|
@ -171,6 +172,16 @@ void vnodeClose(SVnode *pVnode) {
|
|||
}
|
||||
}
|
||||
|
||||
// start the sync timer after the queue is ready
|
||||
int32_t vnodeStart(SVnode *pVnode) {
|
||||
vnodeSyncSetQ(pVnode, NULL);
|
||||
vnodeSyncSetRpc(pVnode, NULL);
|
||||
vnodeSyncStart(pVnode);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeStop(SVnode *pVnode) {}
|
||||
|
||||
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
|
||||
|
||||
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; }
|
Loading…
Reference in New Issue