diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 989adf84ac..7842077d88 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -37,7 +37,9 @@ typedef struct SVnodeMgmt { SSingleWorker mgmtMultiWorker; SHashObj *hash; SHashObj *closedHash; + SHashObj *creatingHash; TdThreadRwlock lock; + TdThreadMutex mutex; SVnodesStat state; STfs *pTfs; TdThread thread; @@ -96,6 +98,7 @@ SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict); void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl); void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed); +void vmRemoveFromCreatingHash(SVnodeMgmt *pMgmt, int32_t vgId); // vmHandle.c SArray *vmGetMsgHandles(); @@ -113,6 +116,7 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt); int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes); int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes); +int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes); // vmWorker.c int32_t vmStartWorker(SVnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index 7566b69c02..b4453ad6fc 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -67,6 +67,54 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod return 0; } +int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) { + (void)taosThreadRwlockRdlock(&pMgmt->lock); + + int32_t num = 0; + int32_t size = taosHashGetSize(pMgmt->hash); + int32_t creatingSize = taosHashGetSize(pMgmt->creatingHash); + size += creatingSize; + SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *)); + if (pVnodes == NULL) { + (void)taosThreadRwlockUnlock(&pMgmt->lock); + return terrno; + } + + void *pIter = taosHashIterate(pMgmt->hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + SVnodeObj *pVnode = *ppVnode; + if (pVnode && num < size) { + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d,acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount); + pVnodes[num++] = (*ppVnode); + pIter = taosHashIterate(pMgmt->hash, pIter); + } else { + taosHashCancelIterate(pMgmt->hash, pIter); + } + } + + pIter = taosHashIterate(pMgmt->creatingHash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + SVnodeObj *pVnode = *ppVnode; + if (pVnode && num < size) { + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount); + pVnodes[num++] = (*ppVnode); + pIter = taosHashIterate(pMgmt->creatingHash, pIter); + } else { + taosHashCancelIterate(pMgmt->creatingHash, pIter); + } + } + (void)taosThreadRwlockUnlock(&pMgmt->lock); + + *numOfVnodes = num; + *ppVnodes = pVnodes; + + return 0; +} + int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) { (void)taosThreadRwlockRdlock(&pMgmt->lock); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 006f44b349..90b3f0025d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -381,6 +381,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { if (vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs) < 0) { dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr()); vmReleaseVnode(pMgmt, pVnode); + vmRemoveFromCreatingHash(pMgmt, req.vgId); (void)tFreeSCreateVnodeReq(&req); code = terrno != 0 ? terrno : -1; return code; @@ -422,6 +423,8 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } _OVER: + vmRemoveFromCreatingHash(pMgmt, req.vgId); + if (code != 0) { int32_t r = 0; r = taosThreadRwlockWrlock(&pMgmt->lock); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 682c179270..2ee607949d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "vmInt.h" #include "libs/function/tudf.h" +#include "osMemory.h" #include "tfs.h" #include "vnd.h" @@ -62,10 +63,20 @@ int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { int32_t numOfVnodes = 0; SVnodeObj **ppVnodes = NULL; - code = vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes); + code = taosThreadMutexLock(&pMgmt->mutex); if (code != 0) { return code; } + + code = vmGetAllVnodeListFromHashWithCreating(pMgmt, &numOfVnodes, &ppVnodes); + if (code != 0) { + int32_t r = taosThreadMutexUnlock(&pMgmt->mutex); + if (r != 0) { + dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r)); + } + return code; + } + for (int32_t v = 0; v < numOfVnodes; v++) { SVnodeObj *pVnode = ppVnodes[v]; disks[pVnode->diskPrimary] += 1; @@ -81,6 +92,51 @@ int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { } } + SVnodeObj *pCreatingVnode = taosMemoryCalloc(1, sizeof(SVnodeObj)); + if (pCreatingVnode == NULL) { + code = -1; + if (terrno != 0) code = terrno; + dError("failed to alloc vnode since %s", tstrerror(code)); + int32_t r = taosThreadMutexUnlock(&pMgmt->mutex); + if (r != 0) { + dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r)); + } + goto _OVER; + } + (void)memset(pCreatingVnode, 0, sizeof(SVnodeObj)); + + pCreatingVnode->vgId = vgId; + pCreatingVnode->diskPrimary = diskId; + + code = taosThreadRwlockWrlock(&pMgmt->lock); + if (code != 0) { + int32_t r = taosThreadMutexUnlock(&pMgmt->mutex); + if (r != 0) { + dError("vgId:%d, failed to unlock mutex since %s", vgId, tstrerror(r)); + } + taosMemoryFree(pCreatingVnode); + goto _OVER; + } + + dTrace("vgId:%d, put vnode into creating hash, pCreatingVnode:%p", vgId, pCreatingVnode); + code = taosHashPut(pMgmt->creatingHash, &vgId, sizeof(int32_t), &pCreatingVnode, sizeof(SVnodeObj *)); + if (code != 0) { + dError("vgId:%d, failed to put vnode to creatingHash", vgId); + taosMemoryFree(pCreatingVnode); + } + + int32_t r = taosThreadRwlockUnlock(&pMgmt->lock); + if (r != 0) { + dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r)); + } + + code = taosThreadMutexUnlock(&pMgmt->mutex); + if (code != 0) { + goto _OVER; + } + +_OVER: + for (int32_t i = 0; i < numOfVnodes; ++i) { if (ppVnodes == NULL || ppVnodes[i] == NULL) continue; vmReleaseVnode(pMgmt, ppVnodes[i]); @@ -89,8 +145,13 @@ int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { taosMemoryFree(ppVnodes); } - dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes); - return diskId; + if (code != 0) { + dError("vgId:%d, failed to alloc disk since %s", vgId, tstrerror(code)); + return code; + } else { + dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes); + return diskId; + } } SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) { @@ -216,12 +277,12 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, } if (keepClosed) { SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj)); - (void)memset(pClosedVnode, 0, sizeof(SVnodeObj)); - if (pVnode == NULL) { - dError("vgId:%d, failed to alloc vnode since %s", pVnode->vgId, terrstr()); + if (pClosedVnode == NULL) { + dError("failed to alloc vnode since %s", terrstr()); (void)taosThreadRwlockUnlock(&pMgmt->lock); return; } + (void)memset(pClosedVnode, 0, sizeof(SVnodeObj)); pClosedVnode->vgId = pVnode->vgId; pClosedVnode->dropped = pVnode->dropped; @@ -427,11 +488,18 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { pMgmt->closedHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (pMgmt->hash == NULL) { + if (pMgmt->closedHash == NULL) { dError("failed to init vnode closed hash since %s", terrstr()); return TSDB_CODE_OUT_OF_MEMORY; } + pMgmt->creatingHash = + taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pMgmt->creatingHash == NULL) { + dError("failed to init vnode creatingHash hash since %s", terrstr()); + return TSDB_CODE_OUT_OF_MEMORY; + } + SWrapperCfg *pCfgs = NULL; int32_t numOfVnodes = 0; if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) { @@ -509,6 +577,30 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { return 0; } +void vmRemoveFromCreatingHash(SVnodeMgmt *pMgmt, int32_t vgId) { + (void)taosThreadRwlockWrlock(&pMgmt->lock); + SVnodeObj *pOld = NULL; + int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld); + if (r != 0) { + dError("vgId:%d, failed to get vnode from creating Hash", vgId); + } + if (pOld) { + dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld); + vmFreeVnodeObj(&pOld); + } + dTrace("vgId:%d, remove from creating Hash", vgId); + r = taosHashRemove(pMgmt->creatingHash, &vgId, sizeof(int32_t)); + if (r != 0) { + dError("vgId:%d, failed to remove vnode from hash", vgId); + } + (void)taosThreadRwlockUnlock(&pMgmt->lock); + +_OVER: + if (r != 0) { + dError("vgId:%d, failed to remove vnode from creatingHash since %s", vgId, tstrerror(r)); + } +} + static void *vmCloseVnodeInThread(void *param) { SVnodeThread *pThread = param; SVnodeMgmt *pMgmt = pThread->pMgmt; @@ -614,6 +706,18 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) { pMgmt->closedHash = NULL; } + pIter = taosHashIterate(pMgmt->creatingHash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + vmFreeVnodeObj(ppVnode); + pIter = taosHashIterate(pMgmt->creatingHash, pIter); + } + + if (pMgmt->creatingHash != NULL) { + taosHashCleanup(pMgmt->creatingHash); + pMgmt->creatingHash = NULL; + } + dInfo("total vnodes:%d are all closed", numOfVnodes); } @@ -622,6 +726,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) { vmStopWorker(pMgmt); vnodeCleanup(); (void)taosThreadRwlockDestroy(&pMgmt->lock); + (void)taosThreadMutexDestroy(&pMgmt->mutex); (void)taosThreadMutexDestroy(&pMgmt->fileLock); taosMemoryFree(pMgmt); } @@ -714,6 +819,12 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { goto _OVER; } + code = taosThreadMutexInit(&pMgmt->mutex, NULL); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _OVER; + } + code = taosThreadMutexInit(&pMgmt->fileLock, NULL); if (code != 0) { code = TAOS_SYSTEM_ERROR(errno); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 1e3975a556..ed842f6463 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -351,6 +351,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_privilege_all.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel.py +,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel_createdb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/ttl.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/ttlChangeOnWrite.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/compress_tsz1.py diff --git a/tests/system-test/0-others/multilevel_createdb.py b/tests/system-test/0-others/multilevel_createdb.py new file mode 100644 index 0000000000..70131a760b --- /dev/null +++ b/tests/system-test/0-others/multilevel_createdb.py @@ -0,0 +1,89 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + + +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * +from util.sqlset import * +import glob + +def scanFiles(pattern): + res = [] + for f in glob.iglob(pattern): + res += [f] + return res + +def checkFiles(pattern, state): + res = scanFiles(pattern) + tdLog.info(res) + num = len(res) + if num: + if state: + tdLog.info("%s: %d files exist. expect: files exist" % (pattern, num)) + else: + tdLog.exit("%s: %d files exist. expect: files not exist." % (pattern, num)) + else: + if state: + tdLog.exit("%s: %d files exist. expect: files exist" % (pattern, num)) + else: + tdLog.info("%s: %d files exist. expect: files not exist." % (pattern, num)) + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + self.setsql = TDSetSql() + + def basic(self): + tdLog.info("============== basic test ===============") + cfg={ + '/mnt/data1 0 1 0' : 'dataDir', + '/mnt/data2 0 0 0' : 'dataDir', + '/mnt/data3 0 0 0' : 'dataDir', + '/mnt/data4 0 0 0' : 'dataDir' + } + tdSql.createDir('/mnt/data1') + tdSql.createDir('/mnt/data2') + tdSql.createDir('/mnt/data3') + tdSql.createDir('/mnt/data4') + + tdDnodes.stop(1) + tdDnodes.deploy(1,cfg) + tdDnodes.start(1) + + checkFiles(r'/mnt/data1/*/*',1) + checkFiles(r'/mnt/data2/*/*',0) + + tdSql.execute('create database nws vgroups 20 stt_trigger 1 wal_level 1 wal_retention_period 0') + + checkFiles(r'/mnt/data1/vnode/*/wal',5) + checkFiles(r'/mnt/data2/vnode/*/wal',5) + checkFiles(r'/mnt/data3/vnode/*/wal',5) + checkFiles(r'/mnt/data4/vnode/*/wal',5) + + def run(self): + self.basic() + + + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file