diff --git a/source/dnode/mgmt/impl/test/db/db.cpp b/source/dnode/mgmt/impl/test/db/db.cpp index 89e6603d16..231c3177a5 100644 --- a/source/dnode/mgmt/impl/test/db/db.cpp +++ b/source/dnode/mgmt/impl/test/db/db.cpp @@ -206,6 +206,8 @@ TEST_F(DndTestDb, 01_ShowDb) { TEST_F(DndTestDb, 02_CreateDb) { { + taosMsleep(1100); + SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg)); strcpy(pReq->db, "1.d1"); pReq->numOfVgroups = htonl(2); diff --git a/source/dnode/mnode/impl/inc/mndDnode.h b/source/dnode/mnode/impl/inc/mndDnode.h index 04ceb2820a..764dfbffc1 100644 --- a/source/dnode/mnode/impl/inc/mndDnode.h +++ b/source/dnode/mnode/impl/inc/mndDnode.h @@ -28,6 +28,7 @@ SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId); void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode); SEpSet mndGetDnodeEpset(SDnodeObj *pDnode); int32_t mndGetDnodeSize(SMnode *pMnode); +bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index ec5f68a713..791f43c69c 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -218,6 +218,15 @@ int32_t mndGetDnodeSize(SMnode *pMnode) { return sdbGetSize(pSdb, SDB_DNODE); } +bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode) { + int64_t ms = taosGetTimestampMs(); + int64_t interval = ABS(pDnode->lastAccessTime - ms); + if (interval > 3000 * pMnode->cfg.statusInterval) { + return false; + } + return true; +} + static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) { SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 023171cdfe..b2a2903f09 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -152,18 +152,73 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { sdbRelease(pSdb, pVgroup); } -static int32_t mndGetDefaultVgroupSize(SMnode *pMnode) { return 4; } +static int32_t mndGetDefaultVgroupSize(SMnode *pMnode) { + // todo + return 2; +} -int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb) { - if (pDb->numOfVgroups == -1) { - pDb->numOfVgroups = mndGetDefaultVgroupSize(pMnode); +static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup) { + SSdb *pSdb = pMnode->pSdb; + int32_t allocedVnodes = 0; + void *pIter = NULL; + + while (allocedVnodes < pVgroup->replica) { + SDnodeObj *pDnode = NULL; + pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); + if (pIter == NULL) break; + + // todo + if (mndIsDnodeInReadyStatus(pMnode, pDnode)) { + SVnodeGid *pVgid = &pVgroup->vnodeGid[allocedVnodes]; + pVgid->dnodeId = pDnode->id; + pVgid->role = TAOS_SYNC_STATE_FOLLOWER; + allocedVnodes++; + } + sdbRelease(pSdb, pDnode); } - if (pDb->numOfVgroups < TSDB_MIN_VNODES_PER_DB || pDb->numOfVgroups > TSDB_MAX_VNODES_PER_DB) { + if (allocedVnodes != pVgroup->replica) { + terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; + return -1; + } + return 0; +} + +int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb) { + if (pDb->numOfVgroups != -1 && + (pDb->numOfVgroups < TSDB_MIN_VNODES_PER_DB || pDb->numOfVgroups > TSDB_MAX_VNODES_PER_DB)) { terrno = TSDB_CODE_MND_INVALID_DB_OPTION; return -1; } + if (pDb->numOfVgroups == -1) { + pDb->numOfVgroups = mndGetDefaultVgroupSize(pMnode); + if (pDb->numOfVgroups < 0) { + terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; + return -1; + } + } + + int32_t alloceVgroups = 0; + int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); + + while (alloceVgroups < pDb->numOfVgroups) { + SVgObj vgObj = {0}; + vgObj.vgId == maxVgId++; + vgObj.createdTime = taosGetTimestampMs(); + vgObj.updateTime = vgObj.createdTime; + vgObj.version = 0; + memcpy(vgObj.dbName, pDb->name, TSDB_FULL_DB_NAME_LEN); + vgObj.replica = pDb->cfg.replications; + + if (mndGetAvailableDnode(pMnode, &vgObj) != 0) { + terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; + return -1; + } + + alloceVgroups++; + } + return 0; }