Merge pull request #28496 from taosdata/fix/TD-32622-add-closed-hash
fix/TD-32622-add-closed-hash
This commit is contained in:
commit
c65f6c3e79
|
@ -36,6 +36,7 @@ typedef struct SVnodeMgmt {
|
|||
SSingleWorker mgmtWorker;
|
||||
SSingleWorker mgmtMultiWorker;
|
||||
SHashObj *hash;
|
||||
SHashObj *closedHash;
|
||||
TdThreadRwlock lock;
|
||||
SVnodesStat state;
|
||||
STfs *pTfs;
|
||||
|
@ -94,7 +95,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
|
|||
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict);
|
||||
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
|
||||
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
|
||||
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal);
|
||||
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed);
|
||||
|
||||
// vmHandle.c
|
||||
SArray *vmGetMsgHandles();
|
||||
|
@ -111,6 +112,7 @@ int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
|||
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
|
||||
int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt);
|
||||
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes);
|
||||
int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes);
|
||||
|
||||
// vmWorker.c
|
||||
int32_t vmStartWorker(SVnodeMgmt *pMgmt);
|
||||
|
|
|
@ -19,6 +19,54 @@
|
|||
|
||||
#define MAX_CONTENT_LEN 2 * 1024 * 1024
|
||||
|
||||
int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
|
||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
|
||||
int32_t num = 0;
|
||||
int32_t size = taosHashGetSize(pMgmt->hash);
|
||||
int32_t closedSize = taosHashGetSize(pMgmt->closedHash);
|
||||
size += closedSize;
|
||||
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
|
||||
if (pVnodes == NULL) {
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||
while (pIter) {
|
||||
SVnodeObj **ppVnode = pIter;
|
||||
SVnodeObj *pVnode = *ppVnode;
|
||||
if (pVnode && num < size) {
|
||||
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||
// dTrace("vgId:%d, acquire vnode list, ref:%d", pVnode->vgId, refCount);
|
||||
pVnodes[num++] = (*ppVnode);
|
||||
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||
} else {
|
||||
taosHashCancelIterate(pMgmt->hash, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pMgmt->closedHash, NULL);
|
||||
while (pIter) {
|
||||
SVnodeObj **ppVnode = pIter;
|
||||
SVnodeObj *pVnode = *ppVnode;
|
||||
if (pVnode && num < size) {
|
||||
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||
// dTrace("vgId:%d, acquire vnode list, ref:%d", pVnode->vgId, refCount);
|
||||
pVnodes[num++] = (*ppVnode);
|
||||
pIter = taosHashIterate(pMgmt->closedHash, pIter);
|
||||
} else {
|
||||
taosHashCancelIterate(pMgmt->closedHash, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
*numOfVnodes = num;
|
||||
*ppVnodes = pVnodes;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
|
||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
|
||||
|
@ -217,7 +265,7 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
int32_t numOfVnodes = 0;
|
||||
TAOS_CHECK_GOTO(vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes), &lino, _OVER);
|
||||
TAOS_CHECK_GOTO(vmGetAllVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes), &lino, _OVER);
|
||||
|
||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
pJson = tjsonCreateObject();
|
||||
|
|
|
@ -538,7 +538,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
||||
|
||||
bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
|
||||
vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal);
|
||||
vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
|
||||
|
||||
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
||||
char path[TSDB_FILENAME_LEN] = {0};
|
||||
|
@ -686,7 +686,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
dInfo("vgId:%d, close vnode", srcVgId);
|
||||
vmCloseVnode(pMgmt, pVnode, true);
|
||||
vmCloseVnode(pMgmt, pVnode, true, false);
|
||||
|
||||
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
||||
char srcPath[TSDB_FILENAME_LEN] = {0};
|
||||
|
@ -795,7 +795,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
|
||||
|
||||
bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
|
||||
vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal);
|
||||
vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
|
||||
|
||||
int32_t diskPrimary = wrapperCfg.diskPrimary;
|
||||
char path[TSDB_FILENAME_LEN] = {0};
|
||||
|
@ -863,7 +863,7 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
vmCloseVnode(pMgmt, pVnode, false);
|
||||
vmCloseVnode(pMgmt, pVnode, false, false);
|
||||
if (vmWriteVnodeListToFile(pMgmt) != 0) {
|
||||
dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
|
||||
}
|
||||
|
|
|
@ -166,16 +166,34 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
|||
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
||||
SVnodeObj *pOld = NULL;
|
||||
int32_t r = taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
|
||||
}
|
||||
if (pOld) {
|
||||
vmFreeVnodeObj(&pOld);
|
||||
}
|
||||
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
|
||||
|
||||
pOld = NULL;
|
||||
r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
|
||||
}
|
||||
if (pOld) {
|
||||
vmFreeVnodeObj(&pOld);
|
||||
}
|
||||
|
||||
dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
|
||||
r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
|
||||
}
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) {
|
||||
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
|
||||
char path[TSDB_FILENAME_LEN] = {0};
|
||||
bool atExit = true;
|
||||
|
||||
|
@ -185,7 +203,40 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
|||
|
||||
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
||||
int32_t r = taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
|
||||
}
|
||||
if (keepClosed) {
|
||||
SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
|
||||
(void)memset(pClosedVnode, 0, sizeof(SVnodeObj));
|
||||
if (pVnode == NULL) {
|
||||
dError("vgId:%d, failed to alloc vnode since %s", pVnode->vgId, terrstr());
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
return;
|
||||
}
|
||||
|
||||
pClosedVnode->vgId = pVnode->vgId;
|
||||
pClosedVnode->dropped = pVnode->dropped;
|
||||
pClosedVnode->vgVersion = pVnode->vgVersion;
|
||||
pClosedVnode->diskPrimary = pVnode->diskPrimary;
|
||||
pClosedVnode->toVgId = pVnode->toVgId;
|
||||
|
||||
SVnodeObj *pOld = NULL;
|
||||
r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
|
||||
}
|
||||
if (pOld) {
|
||||
vmFreeVnodeObj(&pOld);
|
||||
}
|
||||
dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
|
||||
r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
|
||||
if (r != 0) {
|
||||
dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
|
||||
}
|
||||
}
|
||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||
|
||||
vmReleaseVnode(pMgmt, pVnode);
|
||||
|
||||
if (pVnode->failed) {
|
||||
|
@ -362,9 +413,15 @@ static void *vmOpenVnodeInThread(void *param) {
|
|||
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
||||
pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (pMgmt->hash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
dError("failed to init vnode hash since %s", terrstr());
|
||||
return -1;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pMgmt->closedHash =
|
||||
taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (pMgmt->hash == NULL) {
|
||||
dError("failed to init vnode closed hash since %s", terrstr());
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SWrapperCfg *pCfgs = NULL;
|
||||
|
@ -459,7 +516,7 @@ static void *vmCloseVnodeInThread(void *param) {
|
|||
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
||||
tmsgReportStartup("vnode-close", stepDesc);
|
||||
|
||||
vmCloseVnode(pMgmt, pVnode, false);
|
||||
vmCloseVnode(pMgmt, pVnode, false, false);
|
||||
}
|
||||
|
||||
dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
|
||||
|
@ -537,6 +594,18 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
|
|||
pMgmt->hash = NULL;
|
||||
}
|
||||
|
||||
void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
|
||||
while (pIter) {
|
||||
SVnodeObj **ppVnode = pIter;
|
||||
vmFreeVnodeObj(ppVnode);
|
||||
pIter = taosHashIterate(pMgmt->closedHash, pIter);
|
||||
}
|
||||
|
||||
if (pMgmt->closedHash != NULL) {
|
||||
taosHashCleanup(pMgmt->closedHash);
|
||||
pMgmt->closedHash = NULL;
|
||||
}
|
||||
|
||||
dInfo("total vnodes:%d are all closed", numOfVnodes);
|
||||
}
|
||||
|
||||
|
|
|
@ -400,8 +400,8 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
|
|||
pSdb->commitTerm = pSdb->applyTerm;
|
||||
pSdb->commitConfig = pSdb->applyConfig;
|
||||
memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
|
||||
mInfo("read sdb file:%s success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, pSdb->commitIndex,
|
||||
pSdb->commitTerm, pSdb->commitConfig);
|
||||
mInfo("vgId:1, trans:0, read sdb file:%s success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file,
|
||||
pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig);
|
||||
|
||||
_OVER:
|
||||
if ((ret = taosCloseFile(&pFile)) != 0) {
|
||||
|
@ -573,7 +573,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) {
|
|||
pSdb->commitIndex = pSdb->applyIndex;
|
||||
pSdb->commitTerm = pSdb->applyTerm;
|
||||
pSdb->commitConfig = pSdb->applyConfig;
|
||||
mInfo("write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
|
||||
mInfo("vgId:1, trans:0, write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64
|
||||
" file:%s",
|
||||
pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
|
||||
}
|
||||
|
||||
|
@ -610,8 +611,8 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
|
|||
if (code != 0) {
|
||||
mError("failed to write sdb file since %s", tstrerror(code));
|
||||
} else {
|
||||
mInfo("write sdb file success, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64, pSdb->applyIndex,
|
||||
pSdb->applyTerm, pSdb->applyConfig);
|
||||
mInfo("vgId:1, trans:0, write sdb file success, apply index:%" PRId64 ", term:%" PRId64 ", config:%" PRId64,
|
||||
pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig);
|
||||
}
|
||||
(void)taosThreadMutexUnlock(&pSdb->filelock);
|
||||
return code;
|
||||
|
|
Loading…
Reference in New Issue