Merge pull request #6128 from taosdata/fix/TD-4081-v3
[TD-4081]<fix>: fix vnode dropping
This commit is contained in:
commit
d070bcd756
|
@ -389,7 +389,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
|
|||
if (pCfg->compression < 0) pCfg->compression = tsCompression;
|
||||
if (pCfg->walLevel < 0) pCfg->walLevel = tsWAL;
|
||||
if (pCfg->replications < 0) pCfg->replications = tsReplications;
|
||||
if (pCfg->quorum < 0) pCfg->quorum = tsQuorum;
|
||||
if (pCfg->quorum < 0) pCfg->quorum = MIN(tsQuorum, pCfg->replications);
|
||||
if (pCfg->update < 0) pCfg->update = tsUpdate;
|
||||
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = tsCacheLastRow;
|
||||
if (pCfg->dbType < 0) pCfg->dbType = 0;
|
||||
|
|
|
@ -120,12 +120,14 @@ int32_t vnodeDrop(int32_t vgId) {
|
|||
vDebug("vgId:%d, failed to drop, vnode not find", vgId);
|
||||
return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
}
|
||||
if (pVnode->dropped) {
|
||||
vnodeRelease(pVnode);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||
pVnode->dropped = 1;
|
||||
|
||||
// remove from hash, so new messages wont be consumed
|
||||
vnodeRemoveFromHash(pVnode);
|
||||
vnodeRelease(pVnode);
|
||||
vnodeCleanupInMWorker(pVnode);
|
||||
|
||||
|
@ -390,6 +392,10 @@ int32_t vnodeOpen(int32_t vgId) {
|
|||
int32_t vnodeClose(int32_t vgId) {
|
||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||
if (pVnode == NULL) return 0;
|
||||
if (pVnode->dropped) {
|
||||
vnodeRelease(pVnode);
|
||||
return 0;
|
||||
}
|
||||
|
||||
vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode);
|
||||
vnodeRemoveFromHash(pVnode);
|
||||
|
@ -475,6 +481,8 @@ void vnodeCleanUp(SVnodeObj *pVnode) {
|
|||
|
||||
vnodeSetClosingStatus(pVnode);
|
||||
|
||||
vnodeRemoveFromHash(pVnode);
|
||||
|
||||
// stop replication module
|
||||
if (pVnode->sync > 0) {
|
||||
int64_t sync = pVnode->sync;
|
||||
|
|
|
@ -117,14 +117,17 @@ static SVReadMsg *vnodeBuildVReadMsg(SVnodeObj *pVnode, void *pCont, int32_t con
|
|||
}
|
||||
|
||||
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) {
|
||||
SVnodeObj *pVnode = vparam;
|
||||
if (pVnode->dropped) {
|
||||
return TSDB_CODE_APP_NOT_READY;
|
||||
}
|
||||
|
||||
SVReadMsg *pRead = vnodeBuildVReadMsg(vparam, pCont, contLen, qtype, rparam);
|
||||
if (pRead == NULL) {
|
||||
assert(terrno != 0);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SVnodeObj *pVnode = vparam;
|
||||
|
||||
int32_t code = vnodeCheckRead(pVnode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosFreeQitem(pRead);
|
||||
|
|
|
@ -66,6 +66,9 @@ static bool vnodeSetClosingStatusImp(SVnodeObj* pVnode) {
|
|||
}
|
||||
|
||||
bool vnodeSetClosingStatus(SVnodeObj* pVnode) {
|
||||
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
|
||||
return true;
|
||||
|
||||
while (!vnodeSetClosingStatusImp(pVnode)) {
|
||||
taosMsleep(1);
|
||||
}
|
||||
|
|
|
@ -55,6 +55,11 @@ void vnodeNotifyRole(int32_t vgId, int8_t role) {
|
|||
vTrace("vgId:%d, vnode not found while notify role", vgId);
|
||||
return;
|
||||
}
|
||||
if (pVnode->dropped) {
|
||||
vTrace("vgId:%d, vnode dropped while notify role", vgId);
|
||||
vnodeRelease(pVnode);
|
||||
return;
|
||||
}
|
||||
|
||||
vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]);
|
||||
pVnode->role = role;
|
||||
|
@ -75,6 +80,11 @@ void vnodeCtrlFlow(int32_t vgId, int32_t level) {
|
|||
vTrace("vgId:%d, vnode not found while flow ctrl", vgId);
|
||||
return;
|
||||
}
|
||||
if (pVnode->dropped) {
|
||||
vTrace("vgId:%d, vnode dropped while flow ctrl", vgId);
|
||||
vnodeRelease(pVnode);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pVnode->flowctrlLevel != level) {
|
||||
vDebug("vgId:%d, set flowctrl level from %d to %d", pVnode->vgId, pVnode->flowctrlLevel, level);
|
||||
|
@ -129,6 +139,7 @@ int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rpara
|
|||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||
if (pVnode == NULL) {
|
||||
vError("vgId:%d, vnode not found while write to cache", vgId);
|
||||
vnodeRelease(pVnode);
|
||||
return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
}
|
||||
|
||||
|
|
|
@ -386,4 +386,6 @@ void vnodeWaitWriteCompleted(SVnodeObj *pVnode) {
|
|||
vTrace("vgId:%d, queued wmsg num:%d", pVnode->vgId, pVnode->queuedWMsg);
|
||||
taosMsleep(10);
|
||||
}
|
||||
|
||||
taosMsleep(900);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue