alloc vgroups
This commit is contained in:
parent
c9f8a1109f
commit
a881adcfe4
|
@ -139,7 +139,7 @@ void dmnWaitSignal() {
|
||||||
void dmnInitOption(SDnodeOpt *pOption) {
|
void dmnInitOption(SDnodeOpt *pOption) {
|
||||||
pOption->sver = 30000000; //3.0.0.0
|
pOption->sver = 30000000; //3.0.0.0
|
||||||
pOption->numOfCores = tsNumOfCores;
|
pOption->numOfCores = tsNumOfCores;
|
||||||
pOption->numOfSupportVnodes = 1;
|
pOption->numOfSupportVnodes = 16;
|
||||||
pOption->numOfCommitThreads = 1;
|
pOption->numOfCommitThreads = 1;
|
||||||
pOption->statusInterval = tsStatusInterval;
|
pOption->statusInterval = tsStatusInterval;
|
||||||
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore;
|
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore;
|
||||||
|
|
|
@ -124,6 +124,7 @@ typedef struct {
|
||||||
int64_t rebootTime;
|
int64_t rebootTime;
|
||||||
int64_t lastAccessTime;
|
int64_t lastAccessTime;
|
||||||
int32_t accessTimes;
|
int32_t accessTimes;
|
||||||
|
int16_t numOfVnodes;
|
||||||
int16_t numOfSupportVnodes;
|
int16_t numOfSupportVnodes;
|
||||||
int16_t numOfCores;
|
int16_t numOfCores;
|
||||||
EDndStatus status;
|
EDndStatus status;
|
||||||
|
|
|
@ -442,7 +442,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
||||||
}
|
}
|
||||||
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
mTrace("trans:%d, sync to other nodes", pTrans->id);
|
mDebug("trans:%d, sync to other nodes", pTrans->id);
|
||||||
int32_t code = mndSyncPropose(pMnode, pRaw);
|
int32_t code = mndSyncPropose(pMnode, pRaw);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
|
||||||
|
@ -450,7 +450,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("trans:%d, sync finished", pTrans->id);
|
mDebug("trans:%d, sync finished", pTrans->id);
|
||||||
|
|
||||||
code = sdbWrite(pMnode->pSdb, pRaw);
|
code = sdbWrite(pMnode->pSdb, pRaw);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
|
|
@ -86,7 +86,6 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
|
||||||
for (int8_t i = 0; i < pVgroup->replica; ++i) {
|
for (int8_t i = 0; i < pVgroup->replica; ++i) {
|
||||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
||||||
SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId)
|
SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pVgid->role)
|
|
||||||
}
|
}
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE)
|
SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE)
|
||||||
SDB_SET_DATALEN(pRaw, dataPos);
|
SDB_SET_DATALEN(pRaw, dataPos);
|
||||||
|
@ -121,7 +120,6 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
|
||||||
for (int8_t i = 0; i < pVgroup->replica; ++i) {
|
for (int8_t i = 0; i < pVgroup->replica; ++i) {
|
||||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
||||||
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgid->dnodeId)
|
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgid->dnodeId)
|
||||||
SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pVgid->role)
|
|
||||||
}
|
}
|
||||||
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_VGROUP_RESERVE_SIZE)
|
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_VGROUP_RESERVE_SIZE)
|
||||||
|
|
||||||
|
@ -237,44 +235,95 @@ SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *p
|
||||||
return pDrop;
|
return pDrop;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup) {
|
static SArray *mndBuildDnodesArray(SMnode *pMnode) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int32_t allocedVnodes = 0;
|
int32_t numOfDnodes = mndGetDnodeSize(pMnode);
|
||||||
void *pIter = NULL;
|
SArray *pArray = taosArrayInit(numOfDnodes, sizeof(SDnodeObj));
|
||||||
|
if (pArray == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
while (allocedVnodes < pVgroup->replica) {
|
void *pIter = NULL;
|
||||||
|
while (1) {
|
||||||
SDnodeObj *pDnode = NULL;
|
SDnodeObj *pDnode = NULL;
|
||||||
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
// todo
|
int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
|
||||||
if (mndIsDnodeInReadyStatus(pMnode, pDnode)) {
|
|
||||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[allocedVnodes];
|
bool isMnode = mndIsMnode(pMnode, pDnode->id);
|
||||||
pVgid->dnodeId = pDnode->id;
|
if (isMnode) {
|
||||||
if (pVgroup->replica == 1) {
|
pDnode->numOfVnodes++;
|
||||||
pVgid->role = TAOS_SYNC_STATE_LEADER;
|
|
||||||
} else {
|
|
||||||
pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
|
|
||||||
}
|
|
||||||
allocedVnodes++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool isReady = mndIsDnodeInReadyStatus(pMnode, pDnode);
|
||||||
|
if (isReady) {
|
||||||
|
taosArrayPush(pArray, pDnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
mDebug("dnode:%d, numOfVnodes:%d numOfSupportVnodes:%d isMnode:%d ready:%d", pDnode->id, numOfVnodes,
|
||||||
|
pDnode->numOfSupportVnodes, isMnode, isReady);
|
||||||
sdbRelease(pSdb, pDnode);
|
sdbRelease(pSdb, pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (allocedVnodes != pVgroup->replica) {
|
return pArray;
|
||||||
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
}
|
||||||
return -1;
|
|
||||||
|
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
|
||||||
|
float d1Score = (float)pDnode1->numOfVnodes / pDnode1->numOfSupportVnodes;
|
||||||
|
float d2Score = (float)pDnode2->numOfVnodes / pDnode2->numOfSupportVnodes;
|
||||||
|
return d1Score > d2Score ? 0 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
int32_t allocedVnodes = 0;
|
||||||
|
void *pIter = NULL;
|
||||||
|
|
||||||
|
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
||||||
|
|
||||||
|
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||||
|
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
|
||||||
|
SDnodeObj *pDnode = taosArrayGet(pArray, v);
|
||||||
|
if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) {
|
||||||
|
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pVgid->dnodeId = pDnode->id;
|
||||||
|
if (pVgroup->replica == 1) {
|
||||||
|
pVgid->role = TAOS_SYNC_STATE_LEADER;
|
||||||
|
} else {
|
||||||
|
pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
|
||||||
|
}
|
||||||
|
|
||||||
|
mDebug("db:%s, vgId:%d, vindex:%d dnodeId:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId);
|
||||||
|
pDnode->numOfVnodes++;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
|
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
|
||||||
SVgObj *pVgroups = calloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
|
int32_t code = -1;
|
||||||
|
SArray *pArray = NULL;
|
||||||
|
SVgObj *pVgroups = NULL;
|
||||||
|
|
||||||
|
pVgroups = calloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
|
||||||
if (pVgroups == NULL) {
|
if (pVgroups == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
goto ALLOC_VGROUP_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pArray = mndBuildDnodesArray(pMnode);
|
||||||
|
if (pArray == NULL) {
|
||||||
|
goto ALLOC_VGROUP_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
mDebug("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
|
||||||
|
pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);
|
||||||
|
|
||||||
int32_t allocedVgroups = 0;
|
int32_t allocedVgroups = 0;
|
||||||
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
||||||
uint32_t hashMin = 0;
|
uint32_t hashMin = 0;
|
||||||
|
@ -298,17 +347,23 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
|
||||||
pVgroup->dbUid = pDb->uid;
|
pVgroup->dbUid = pDb->uid;
|
||||||
pVgroup->replica = pDb->cfg.replications;
|
pVgroup->replica = pDb->cfg.replications;
|
||||||
|
|
||||||
if (mndGetAvailableDnode(pMnode, pVgroup) != 0) {
|
if (mndGetAvailableDnode(pMnode, pVgroup, pArray) != 0) {
|
||||||
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
||||||
free(pVgroups);
|
goto ALLOC_VGROUP_OVER;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
allocedVgroups++;
|
allocedVgroups++;
|
||||||
}
|
}
|
||||||
|
|
||||||
*ppVgroups = pVgroups;
|
*ppVgroups = pVgroups;
|
||||||
return 0;
|
code = 0;
|
||||||
|
|
||||||
|
mDebug("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);
|
||||||
|
|
||||||
|
ALLOC_VGROUP_OVER:
|
||||||
|
if (code != 0) free(pVgroups);
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) {
|
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) {
|
||||||
|
|
Loading…
Reference in New Issue