fix/TD-32622-add-closed-hash-fix-case
This commit is contained in:
parent
9f111b60ab
commit
7a6c21814e
|
@ -95,7 +95,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
|
||||||
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict);
|
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict);
|
||||||
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
|
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
|
||||||
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
|
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
|
||||||
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal);
|
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed);
|
||||||
|
|
||||||
// vmHandle.c
|
// vmHandle.c
|
||||||
SArray *vmGetMsgHandles();
|
SArray *vmGetMsgHandles();
|
||||||
|
|
|
@ -535,7 +535,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
||||||
|
|
||||||
bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
|
bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
|
||||||
vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal);
|
vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
|
||||||
|
|
||||||
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
||||||
char path[TSDB_FILENAME_LEN] = {0};
|
char path[TSDB_FILENAME_LEN] = {0};
|
||||||
|
@ -683,7 +683,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
dInfo("vgId:%d, close vnode", srcVgId);
|
dInfo("vgId:%d, close vnode", srcVgId);
|
||||||
vmCloseVnode(pMgmt, pVnode, true);
|
vmCloseVnode(pMgmt, pVnode, true, true);
|
||||||
|
|
||||||
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
||||||
char srcPath[TSDB_FILENAME_LEN] = {0};
|
char srcPath[TSDB_FILENAME_LEN] = {0};
|
||||||
|
@ -792,7 +792,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
||||||
|
|
||||||
bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
|
bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
|
||||||
vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal);
|
vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
|
||||||
|
|
||||||
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
||||||
char path[TSDB_FILENAME_LEN] = {0};
|
char path[TSDB_FILENAME_LEN] = {0};
|
||||||
|
@ -860,7 +860,7 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
vmCloseVnode(pMgmt, pVnode, false);
|
vmCloseVnode(pMgmt, pVnode, false, false);
|
||||||
if (vmWriteVnodeListToFile(pMgmt) != 0) {
|
if (vmWriteVnodeListToFile(pMgmt) != 0) {
|
||||||
dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
|
dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
|
||||||
}
|
}
|
||||||
|
|
|
@ -183,6 +183,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
vmFreeVnodeObj(&pOld);
|
vmFreeVnodeObj(&pOld);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
|
||||||
r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
|
r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
|
||||||
if (r != 0) {
|
if (r != 0) {
|
||||||
dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
|
dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
|
||||||
|
@ -192,7 +193,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) {
|
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
|
||||||
char path[TSDB_FILENAME_LEN] = {0};
|
char path[TSDB_FILENAME_LEN] = {0};
|
||||||
bool atExit = true;
|
bool atExit = true;
|
||||||
|
|
||||||
|
@ -205,15 +206,20 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
||||||
if (r != 0) {
|
if (r != 0) {
|
||||||
dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
|
dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
|
||||||
}
|
}
|
||||||
|
if (keepClosed) {
|
||||||
SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
|
SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
|
||||||
|
(void)memset(pClosedVnode, 0, sizeof(SVnodeObj));
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
dError("vgId:%d, failed to alloc vnode since %s", pVnode->vgId, terrstr());
|
dError("vgId:%d, failed to alloc vnode since %s", pVnode->vgId, terrstr());
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
*pClosedVnode = *pVnode;
|
pClosedVnode->vgId = pVnode->vgId;
|
||||||
|
pClosedVnode->dropped = pVnode->dropped;
|
||||||
|
pClosedVnode->vgVersion = pVnode->vgVersion;
|
||||||
|
pClosedVnode->diskPrimary = pVnode->diskPrimary;
|
||||||
|
pClosedVnode->toVgId = pVnode->toVgId;
|
||||||
|
|
||||||
SVnodeObj *pOld = NULL;
|
SVnodeObj *pOld = NULL;
|
||||||
r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
||||||
|
@ -223,10 +229,12 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
||||||
if (pOld) {
|
if (pOld) {
|
||||||
vmFreeVnodeObj(&pOld);
|
vmFreeVnodeObj(&pOld);
|
||||||
}
|
}
|
||||||
|
dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
|
||||||
r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
|
r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
|
||||||
if (r != 0) {
|
if (r != 0) {
|
||||||
dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
|
dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||||
|
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
vmReleaseVnode(pMgmt, pVnode);
|
||||||
|
@ -508,7 +516,7 @@ static void *vmCloseVnodeInThread(void *param) {
|
||||||
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
||||||
tmsgReportStartup("vnode-close", stepDesc);
|
tmsgReportStartup("vnode-close", stepDesc);
|
||||||
|
|
||||||
vmCloseVnode(pMgmt, pVnode, false);
|
vmCloseVnode(pMgmt, pVnode, false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
|
dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
|
||||||
|
|
Loading…
Reference in New Issue