diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 0e1a4bc98e..5bf151fced 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -36,6 +36,7 @@ typedef struct SVnodeMgmt { SSingleWorker mgmtWorker; SSingleWorker mgmtMultiWorker; SHashObj *hash; + SHashObj *closedHash; TdThreadRwlock lock; SVnodesStat state; STfs *pTfs; @@ -111,6 +112,7 @@ int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt); int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes); +int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes); // vmWorker.c int32_t vmStartWorker(SVnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index 215a057618..c3f103d45c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -19,6 +19,54 @@ #define MAX_CONTENT_LEN 2 * 1024 * 1024 +int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) { + (void)taosThreadRwlockRdlock(&pMgmt->lock); + + int32_t num = 0; + int32_t size = taosHashGetSize(pMgmt->hash); + int32_t closedSize = taosHashGetSize(pMgmt->closedHash); + size += closedSize; + SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *)); + if (pVnodes == NULL) { + (void)taosThreadRwlockUnlock(&pMgmt->lock); + return terrno; + } + + void *pIter = taosHashIterate(pMgmt->hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + SVnodeObj *pVnode = *ppVnode; + if (pVnode && num < size) { + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + // dTrace("vgId:%d, acquire vnode list, ref:%d", pVnode->vgId, refCount); + pVnodes[num++] = (*ppVnode); + pIter = taosHashIterate(pMgmt->hash, pIter); + } else { + taosHashCancelIterate(pMgmt->hash, pIter); + } + } + + pIter = taosHashIterate(pMgmt->closedHash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + SVnodeObj *pVnode = *ppVnode; + if (pVnode && num < size) { + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + // dTrace("vgId:%d, acquire vnode list, ref:%d", pVnode->vgId, refCount); + pVnodes[num++] = (*ppVnode); + pIter = taosHashIterate(pMgmt->closedHash, pIter); + } else { + taosHashCancelIterate(pMgmt->closedHash, pIter); + } + } + + (void)taosThreadRwlockUnlock(&pMgmt->lock); + *numOfVnodes = num; + *ppVnodes = pVnodes; + + return 0; +} + int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) { (void)taosThreadRwlockRdlock(&pMgmt->lock); @@ -216,7 +264,7 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) { } int32_t numOfVnodes = 0; - TAOS_CHECK_GOTO(vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes), &lino, _OVER); + TAOS_CHECK_GOTO(vmGetAllVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes), &lino, _OVER); // terrno = TSDB_CODE_OUT_OF_MEMORY; pJson = tjsonCreateObject(); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 20618dbdf3..55d42646d4 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -166,10 +166,27 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { (void)taosThreadRwlockWrlock(&pMgmt->lock); SVnodeObj *pOld = NULL; int32_t r = taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld); + if (r != 0) { + dError("vgId:%d, failed to get vnode from hash", pVnode->vgId); + } if (pOld) { vmFreeVnodeObj(&pOld); } int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); + + pOld = NULL; + r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld); + if (r != 0) { + dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId); + } + if (pOld) { + vmFreeVnodeObj(&pOld); + } + + r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t)); + if (r != 0) { + dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId); + } (void)taosThreadRwlockUnlock(&pMgmt->lock); return code; @@ -185,7 +202,33 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) (void)taosThreadRwlockWrlock(&pMgmt->lock); int32_t r = taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); + if (r != 0) { + dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId); + } + + SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj)); + if (pVnode == NULL) { + dError("vgId:%d, failed to alloc vnode since %s", pVnode->vgId, terrstr()); + (void)taosThreadRwlockUnlock(&pMgmt->lock); + return; + } + + *pClosedVnode = *pVnode; + + SVnodeObj *pOld = NULL; + r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld); + if (r != 0) { + dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId); + } + if (pOld) { + vmFreeVnodeObj(&pOld); + } + r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *)); + if (r != 0) { + dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId); + } (void)taosThreadRwlockUnlock(&pMgmt->lock); + vmReleaseVnode(pMgmt, pVnode); if (pVnode->failed) { @@ -362,9 +405,15 @@ static void *vmOpenVnodeInThread(void *param) { static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (pMgmt->hash == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; dError("failed to init vnode hash since %s", terrstr()); - return -1; + return TSDB_CODE_OUT_OF_MEMORY; + } + + pMgmt->closedHash = + taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pMgmt->hash == NULL) { + dError("failed to init vnode closed hash since %s", terrstr()); + return TSDB_CODE_OUT_OF_MEMORY; } SWrapperCfg *pCfgs = NULL; @@ -537,6 +586,11 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) { pMgmt->hash = NULL; } + if (pMgmt->closedHash != NULL) { + taosHashCleanup(pMgmt->closedHash); + pMgmt->closedHash = NULL; + } + dInfo("total vnodes:%d are all closed", numOfVnodes); } diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 227ff15da9..474b22cca0 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -400,8 +400,8 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { pSdb->commitTerm = pSdb->applyTerm; pSdb->commitConfig = pSdb->applyConfig; memcpy(pSdb->tableVer, tableVer, sizeof(tableVer)); - mInfo("read sdb file:%s success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, pSdb->commitIndex, - pSdb->commitTerm, pSdb->commitConfig); + mInfo("vgId:1, trans:0, read sdb file:%s success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, + pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig); _OVER: if ((ret = taosCloseFile(&pFile)) != 0) { @@ -573,7 +573,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) { pSdb->commitIndex = pSdb->applyIndex; pSdb->commitTerm = pSdb->applyTerm; pSdb->commitConfig = pSdb->applyConfig; - mInfo("write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", + mInfo("vgId:1, trans:0, write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 + " file:%s", pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile); } @@ -610,8 +611,8 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) { if (code != 0) { mError("failed to write sdb file since %s", tstrerror(code)); } else { - mInfo("write sdb file success, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64, pSdb->applyIndex, - pSdb->applyTerm, pSdb->applyConfig); + mInfo("vgId:1, trans:0, write sdb file success, apply index:%" PRId64 ", term:%" PRId64 ", config:%" PRId64, + pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig); } (void)taosThreadMutexUnlock(&pSdb->filelock); return code;