fix(raft): crash while start vnode
This commit is contained in:
parent
758a155c92
commit
8864a9ed59
|
@ -74,12 +74,6 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// sync integration
|
|
||||||
vnodeSyncSetQ(pImpl, NULL);
|
|
||||||
vnodeSyncSetRpc(pImpl, NULL);
|
|
||||||
int32_t ret = vnodeSyncStart(pImpl);
|
|
||||||
assert(ret == 0);
|
|
||||||
|
|
||||||
taosWLockLatch(&pMgmt->latch);
|
taosWLockLatch(&pMgmt->latch);
|
||||||
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
||||||
taosWUnLockLatch(&pMgmt->latch);
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
|
@ -153,6 +147,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
||||||
pThread->failed++;
|
pThread->failed++;
|
||||||
} else {
|
} else {
|
||||||
vmOpenVnode(pMgmt, pCfg, pImpl);
|
vmOpenVnode(pMgmt, pCfg, pImpl);
|
||||||
|
vnodeStart(pImpl);
|
||||||
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
|
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||||
pThread->opened++;
|
pThread->opened++;
|
||||||
}
|
}
|
||||||
|
@ -364,10 +359,50 @@ static int32_t vmRequire(SMgmtWrapper *pWrapper, bool *required) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t vmStart(SMgmtWrapper *pWrapper) {
|
||||||
|
dDebug("vnode-mgmt start to run");
|
||||||
|
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
|
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
SVnodeObj **ppVnode = pIter;
|
||||||
|
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
||||||
|
|
||||||
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
|
vnodeStart(pVnode->pImpl);
|
||||||
|
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void vmStop(SMgmtWrapper *pWrapper) {
|
||||||
|
dDebug("vnode-mgmt start to stop");
|
||||||
|
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
SVnodeObj **ppVnode = pIter;
|
||||||
|
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
||||||
|
|
||||||
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
|
vnodeStop(pVnode->pImpl);
|
||||||
|
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
}
|
||||||
|
|
||||||
void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
|
void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
|
||||||
SMgmtFp mgmtFp = {0};
|
SMgmtFp mgmtFp = {0};
|
||||||
mgmtFp.openFp = vmInit;
|
mgmtFp.openFp = vmInit;
|
||||||
mgmtFp.closeFp = vmCleanup;
|
mgmtFp.closeFp = vmCleanup;
|
||||||
|
mgmtFp.startFp = vmStart;
|
||||||
|
mgmtFp.stopFp = vmStop;
|
||||||
mgmtFp.requiredFp = vmRequire;
|
mgmtFp.requiredFp = vmRequire;
|
||||||
|
|
||||||
vmInitMsgHandle(pWrapper);
|
vmInitMsgHandle(pWrapper);
|
||||||
|
|
|
@ -61,6 +61,9 @@ int32_t vnodeSync(SVnode *pVnode);
|
||||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||||
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
|
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
|
||||||
|
|
||||||
|
int32_t vnodeStart(SVnode *pVnode);
|
||||||
|
void vnodeStop(SVnode *pVnode);
|
||||||
|
|
||||||
int64_t vnodeGetSyncHandle(SVnode *pVnode);
|
int64_t vnodeGetSyncHandle(SVnode *pVnode);
|
||||||
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
|
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
|
||||||
|
|
||||||
|
@ -171,11 +174,6 @@ typedef struct {
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
} STableKeyInfo;
|
} STableKeyInfo;
|
||||||
|
|
||||||
// sync integration
|
|
||||||
void vnodeSyncSetQ(SVnode *pVnode, void *qHandle);
|
|
||||||
void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle);
|
|
||||||
int32_t vnodeSyncStart(SVnode *pVnode);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
#include "vnodeSync.h"
|
||||||
|
|
||||||
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||||
SVnodeInfo info = {0};
|
SVnodeInfo info = {0};
|
||||||
|
@ -171,6 +172,16 @@ void vnodeClose(SVnode *pVnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// start the sync timer after the queue is ready
|
||||||
|
int32_t vnodeStart(SVnode *pVnode) {
|
||||||
|
vnodeSyncSetQ(pVnode, NULL);
|
||||||
|
vnodeSyncSetRpc(pVnode, NULL);
|
||||||
|
vnodeSyncStart(pVnode);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void vnodeStop(SVnode *pVnode) {}
|
||||||
|
|
||||||
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
|
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
|
||||||
|
|
||||||
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; }
|
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; }
|
Loading…
Reference in New Issue