From 9dbefbe5432940c4f75a20eaa93166583af35c9f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 16 Dec 2021 11:37:41 +0800 Subject: [PATCH] TD-10431 alloc vgroup --- source/dnode/mnode/impl/inc/mndDef.h | 6 +++- source/dnode/mnode/impl/inc/mndVgroup.h | 2 +- source/dnode/mnode/impl/src/mndDb.c | 39 ++++++++++++++--------- source/dnode/mnode/impl/src/mndVgroup.c | 42 +++++++++++++++++++------ 4 files changed, 63 insertions(+), 26 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index e0633bb6b6..428f499e1e 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -207,8 +207,10 @@ typedef struct { int64_t createdTime; int64_t updateTime; int64_t uid; - int32_t version; + int32_t cfgVersion; + int32_t vgVersion; int32_t numOfVgroups; + int8_t hashMethod; // default is 1 SDbCfg cfg; } SDbObj; @@ -222,6 +224,8 @@ typedef struct { int64_t createdTime; int64_t updateTime; int32_t version; + int32_t hashBegin; + int32_t hashEnd; char dbName[TSDB_FULL_DB_NAME_LEN]; int32_t numOfTables; int32_t numOfTimeSeries; diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index e905d65616..7ee087929d 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -26,7 +26,7 @@ int32_t mndInitVgroup(SMnode *pMnode); void mndCleanupVgroup(SMnode *pMnode); SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId); void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup); -int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb); +int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index e9a9744915..efe500d414 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -75,8 +75,10 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { SDB_SET_INT64(pRaw, 
dataPos, pDb->createdTime) SDB_SET_INT64(pRaw, dataPos, pDb->updateTime) SDB_SET_INT64(pRaw, dataPos, pDb->uid) - SDB_SET_INT32(pRaw, dataPos, pDb->version) + SDB_SET_INT32(pRaw, dataPos, pDb->cfgVersion) + SDB_SET_INT32(pRaw, dataPos, pDb->vgVersion) SDB_SET_INT32(pRaw, dataPos, pDb->numOfVgroups) + SDB_SET_INT8(pRaw, dataPos, pDb->hashMethod) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.cacheBlockSize) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.totalBlocks) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile) @@ -120,8 +122,10 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->createdTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->updateTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->uid) - SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->version) + SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfgVersion) + SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->vgVersion) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->numOfVgroups) + SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->hashMethod) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.cacheBlockSize) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.totalBlocks) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysPerFile) @@ -345,6 +349,9 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat dbObj.updateTime = dbObj.createdTime; dbObj.uid = mndGenerateUid(dbObj.name, TSDB_FULL_DB_NAME_LEN); dbObj.numOfVgroups = pCreate->numOfVgroups; + dbObj.hashMethod = 1; + dbObj.cfgVersion = 0; + dbObj.vgVersion = 0; dbObj.cfg = (SDbCfg){.cacheBlockSize = pCreate->cacheBlockSize, .totalBlocks = pCreate->totalBlocks, .daysPerFile = pCreate->daysPerFile, @@ -376,50 +383,53 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat return -1; } - if (mndAllocVgroup(pMnode, &dbObj) != 0) { + SVgObj *pVgroups = NULL; + if (mndAllocVgroup(pMnode, &dbObj, &pVgroups) != 0) { mError("db:%s, failed to create since %s", pCreate->db, terrstr()); return -1; } + int32_t code = -1; + STrans *pTrans = 
mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); if (pTrans == NULL) { mError("db:%s, failed to create since %s", pCreate->db, terrstr()); - return -1; + goto CREATE_DB_OVER; } mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db); SSdbRaw *pRedoRaw = mndDbActionEncode(&dbObj); if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + goto CREATE_DB_OVER; } sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING); SSdbRaw *pUndoRaw = mndDbActionEncode(&dbObj); if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + goto CREATE_DB_OVER; } sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED); SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + goto CREATE_DB_OVER; } sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + goto CREATE_DB_OVER; } + code = 0; + +CREATE_DB_OVER: + free(pVgroups); mndTransDrop(pTrans); - return 0; + return code; } static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) { @@ -583,7 +593,8 @@ static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) { return code; } - dbObj.version++; + dbObj.cfgVersion++; + dbObj.updateTime = taosGetTimestampMs(); code = mndUpdateDb(pMnode, pMsg, pDb, &dbObj); mndReleaseDb(pMnode, pDb); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index b2a2903f09..1fba3f7998 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ 
-79,6 +79,8 @@ static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime) SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime) SDB_SET_INT32(pRaw, dataPos, pVgroup->version) + SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin) + SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd) SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_FULL_DB_NAME_LEN) SDB_SET_INT8(pRaw, dataPos, pVgroup->replica) for (int8_t i = 0; i < pVgroup->replica; ++i) { @@ -111,6 +113,8 @@ static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->createdTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->updateTime) SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->version) + SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashBegin) + SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashEnd) SDB_GET_BINARY(pRaw, pRow, dataPos, pVgroup->dbName, TSDB_FULL_DB_NAME_LEN) SDB_GET_INT8(pRaw, pRow, dataPos, &pVgroup->replica) for (int8_t i = 0; i < pVgroup->replica; ++i) { @@ -184,7 +188,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup) { return 0; } -int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb) { +int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { if (pDb->numOfVgroups != -1 && (pDb->numOfVgroups < TSDB_MIN_VNODES_PER_DB || pDb->numOfVgroups > TSDB_MAX_VNODES_PER_DB)) { terrno = TSDB_CODE_MND_INVALID_DB_OPTION; @@ -199,26 +203,44 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb) { } } + SVgObj *pVgroups = calloc(pDb->numOfVgroups, sizeof(SVgObj)); + if (pVgroups == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + int32_t alloceVgroups = 0; int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); + int32_t hashMin = 0; + int32_t hashMax = INT32_MAX; + int32_t hashInterval = (hashMax - hashMin) / pDb->numOfVgroups; - while (alloceVgroups < pDb->numOfVgroups) { - SVgObj vgObj = {0}; - vgObj.vgId == maxVgId++; - vgObj.createdTime = taosGetTimestampMs(); - 
vgObj.updateTime = vgObj.createdTime; - vgObj.version = 0; - memcpy(vgObj.dbName, pDb->name, TSDB_FULL_DB_NAME_LEN); - vgObj.replica = pDb->cfg.replications; + for (int32_t v = 0; v < pDb->numOfVgroups; v++) { + SVgObj *pVgroup = &pVgroups[v]; + pVgroup->vgId = maxVgId++; + pVgroup->createdTime = taosGetTimestampMs(); + pVgroup->updateTime = pVgroup->createdTime; + pVgroup->version = 0; + pVgroup->hashBegin = hashMin + hashInterval * v; + if (v == pDb->numOfVgroups - 1) { + pVgroup->hashEnd = hashMax; + } else { + pVgroup->hashEnd = hashMin + hashInterval * (v + 1); + } - if (mndGetAvailableDnode(pMnode, &vgObj) != 0) { + memcpy(pVgroup->dbName, pDb->name, TSDB_FULL_DB_NAME_LEN); + pVgroup->replica = pDb->cfg.replications; + + if (mndGetAvailableDnode(pMnode, pVgroup) != 0) { terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; + free(pVgroups); return -1; } alloceVgroups++; } + *ppVgroups = pVgroups; return 0; }