Merge branch 'feature/wal' into feature/sim
This commit is contained in:
commit
1e4a66d883
|
@ -44,7 +44,7 @@ static void bnUnLock() {
|
||||||
|
|
||||||
static bool bnCheckFree(SDnodeObj *pDnode) {
|
static bool bnCheckFree(SDnodeObj *pDnode) {
|
||||||
if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) {
|
if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) {
|
||||||
mError("dnode:%d, status:%s not available", pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status));
|
mError("dnode:%d, status:%s not available", pDnode->dnodeId, dnodeStatus[pDnode->status]);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,13 +92,12 @@ static void bnDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void bnSwapVnodeGid(SVnodeGid *pVnodeGid1, SVnodeGid *pVnodeGid2) {
|
static void bnSwapVnodeGid(SVnodeGid *pVnodeGid1, SVnodeGid *pVnodeGid2) {
|
||||||
// SVnodeGid tmp = *pVnodeGid1;
|
SVnodeGid tmp = *pVnodeGid1;
|
||||||
// *pVnodeGid1 = *pVnodeGid2;
|
*pVnodeGid1 = *pVnodeGid2;
|
||||||
// *pVnodeGid2 = tmp;
|
*pVnodeGid2 = tmp;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t bnAllocVnodes(SVgObj *pVgroup) {
|
int32_t bnAllocVnodes(SVgObj *pVgroup) {
|
||||||
static int32_t randIndex = 0;
|
|
||||||
int32_t dnode = 0;
|
int32_t dnode = 0;
|
||||||
int32_t vnodes = 0;
|
int32_t vnodes = 0;
|
||||||
|
|
||||||
|
@ -120,8 +119,7 @@ int32_t bnAllocVnodes(SVgObj *pVgroup) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
mDebug("dnode:%d, is not selected, status:%s vnodes:%d disk:%fGB role:%d", pDnode->dnodeId,
|
mDebug("dnode:%d, is not selected, status:%s vnodes:%d disk:%fGB role:%d", pDnode->dnodeId,
|
||||||
mnodeGetDnodeStatusStr(pDnode->status), pDnode->openVnodes, pDnode->diskAvailable,
|
dnodeStatus[pDnode->status], pDnode->openVnodes, pDnode->diskAvailable, pDnode->alternativeRole);
|
||||||
pDnode->alternativeRole);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -137,7 +135,7 @@ int32_t bnAllocVnodes(SVgObj *pVgroup) {
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = mnodeGetNextDnode(pIter, &pDnode);
|
pIter = mnodeGetNextDnode(pIter, &pDnode);
|
||||||
if (pDnode == NULL) break;
|
if (pDnode == NULL) break;
|
||||||
mDebug("dnode:%d, status:%s vnodes:%d disk:%fGB role:%d", pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status),
|
mDebug("dnode:%d, status:%s vnodes:%d disk:%fGB role:%d", pDnode->dnodeId, dnodeStatus[pDnode->status],
|
||||||
pDnode->openVnodes, pDnode->diskAvailable, pDnode->alternativeRole);
|
pDnode->openVnodes, pDnode->diskAvailable, pDnode->alternativeRole);
|
||||||
mnodeDecDnodeRef(pDnode);
|
mnodeDecDnodeRef(pDnode);
|
||||||
}
|
}
|
||||||
|
@ -149,36 +147,6 @@ int32_t bnAllocVnodes(SVgObj *pVgroup) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* make the choice more random.
|
|
||||||
* replica 1: no choice
|
|
||||||
* replica 2: there are 2 combinations
|
|
||||||
* replica 3 or larger: there are 6 combinations
|
|
||||||
*/
|
|
||||||
if (pVgroup->numOfVnodes == 1) {
|
|
||||||
} else if (pVgroup->numOfVnodes == 2) {
|
|
||||||
if (randIndex++ % 2 == 0) {
|
|
||||||
bnSwapVnodeGid(pVgroup->vnodeGid, pVgroup->vnodeGid + 1);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
int32_t randVal = randIndex++ % 6;
|
|
||||||
if (randVal == 1) { // 1, 0, 2
|
|
||||||
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
|
|
||||||
} else if (randVal == 2) { // 1, 2, 0
|
|
||||||
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
|
|
||||||
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
|
|
||||||
} else if (randVal == 3) { // 2, 1, 0
|
|
||||||
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
|
|
||||||
} else if (randVal == 4) { // 2, 0, 1
|
|
||||||
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
|
|
||||||
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
|
|
||||||
}
|
|
||||||
if (randVal == 5) { // 0, 2, 1
|
|
||||||
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
|
|
||||||
} else {
|
|
||||||
} // 0, 1, 2
|
|
||||||
}
|
|
||||||
|
|
||||||
bnReleaseDnodes();
|
bnReleaseDnodes();
|
||||||
bnUnLock();
|
bnUnLock();
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -214,44 +182,8 @@ static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
|
||||||
static int32_t bnRemoveVnode(SVgObj *pVgroup) {
|
static int32_t bnRemoveVnode(SVgObj *pVgroup) {
|
||||||
if (pVgroup->numOfVnodes <= 1) return -1;
|
if (pVgroup->numOfVnodes <= 1) return -1;
|
||||||
|
|
||||||
SVnodeGid *pRmVnode = NULL;
|
SVnodeGid *pSelVnode = &pVgroup->vnodeGid[pVgroup->numOfVnodes - 1];
|
||||||
SVnodeGid *pSelVnode = NULL;
|
mDebug("vgId:%d, vnode in dnode:%d will be dropped", pVgroup->vgId, pSelVnode->dnodeId);
|
||||||
int32_t maxScore = 0;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
|
||||||
SVnodeGid *pVnode = &(pVgroup->vnodeGid[i]);
|
|
||||||
SDnodeObj *pDnode = mnodeGetDnode(pVnode->dnodeId);
|
|
||||||
|
|
||||||
if (pDnode == NULL) {
|
|
||||||
mError("vgId:%d, dnode:%d not exist, remove it", pVgroup->vgId, pVnode->dnodeId);
|
|
||||||
pRmVnode = pVnode;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pDnode->status == TAOS_DN_STATUS_DROPPING) {
|
|
||||||
mDebug("vgId:%d, dnode:%d in dropping state", pVgroup->vgId, pVnode->dnodeId);
|
|
||||||
pRmVnode = pVnode;
|
|
||||||
} else if (pVnode->dnodeId == pVgroup->lbDnodeId) {
|
|
||||||
mDebug("vgId:%d, dnode:%d in updating state", pVgroup->vgId, pVnode->dnodeId);
|
|
||||||
pRmVnode = pVnode;
|
|
||||||
} else {
|
|
||||||
if (pSelVnode == NULL) {
|
|
||||||
pSelVnode = pVnode;
|
|
||||||
maxScore = pDnode->score;
|
|
||||||
} else {
|
|
||||||
if (maxScore < pDnode->score) {
|
|
||||||
pSelVnode = pVnode;
|
|
||||||
maxScore = pDnode->score;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
mnodeDecDnodeRef(pDnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pRmVnode != NULL) {
|
|
||||||
pSelVnode = pRmVnode;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!bnCheckVgroupReady(pVgroup, pSelVnode)) {
|
if (!bnCheckVgroupReady(pVgroup, pSelVnode)) {
|
||||||
mDebug("vgId:%d, is not ready", pVgroup->vgId);
|
mDebug("vgId:%d, is not ready", pVgroup->vgId);
|
||||||
|
@ -275,36 +207,42 @@ static bool bnCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
static SDnodeObj *bnGetAvailDnode(SVgObj *pVgroup) {
|
||||||
* desc: add vnode to vgroup, find a new one if dest dnode is null
|
for (int32_t i = 0; i < tsBnDnodes.size; ++i) {
|
||||||
**/
|
SDnodeObj *pDnode = tsBnDnodes.list[i];
|
||||||
|
if (bnCheckDnodeInVgroup(pDnode, pVgroup)) continue;
|
||||||
|
if (!bnCheckFree(pDnode)) continue;
|
||||||
|
|
||||||
|
mDebug("vgId:%d, add vnode to dnode:%d", pVgroup->vgId, pDnode->dnodeId);
|
||||||
|
return pDnode;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) {
|
static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) {
|
||||||
if (pDestDnode == NULL) {
|
if (pDestDnode == NULL || pSrcDnode == pDestDnode) {
|
||||||
for (int32_t i = 0; i < tsBnDnodes.size; ++i) {
|
return TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||||
SDnodeObj *pDnode = tsBnDnodes.list[i];
|
}
|
||||||
if (pDnode == pSrcDnode) continue;
|
|
||||||
if (bnCheckDnodeInVgroup(pDnode, pVgroup)) continue;
|
SVnodeGid vnodeGids[TSDB_MAX_REPLICA];
|
||||||
if (!bnCheckFree(pDnode)) continue;
|
memcpy(&vnodeGids, &pVgroup->vnodeGid, sizeof(SVnodeGid) * TSDB_MAX_REPLICA);
|
||||||
|
|
||||||
pDestDnode = pDnode;
|
int32_t numOfVnodes = pVgroup->numOfVnodes;
|
||||||
mDebug("vgId:%d, add vnode to dnode:%d", pVgroup->vgId, pDnode->dnodeId);
|
vnodeGids[numOfVnodes].dnodeId = pDestDnode->dnodeId;
|
||||||
|
vnodeGids[numOfVnodes].pDnode = pDestDnode;
|
||||||
|
numOfVnodes++;
|
||||||
|
|
||||||
|
for (int32_t v = 0; v < numOfVnodes; ++v) {
|
||||||
|
if (pSrcDnode != NULL && pSrcDnode->dnodeId == vnodeGids[v].dnodeId) {
|
||||||
|
bnSwapVnodeGid(&vnodeGids[v], &vnodeGids[numOfVnodes - 1]);
|
||||||
|
pVgroup->lbDnodeId = pSrcDnode->dnodeId;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDestDnode == NULL) {
|
memcpy(&pVgroup->vnodeGid, &vnodeGids, sizeof(SVnodeGid) * TSDB_MAX_REPLICA);
|
||||||
return TSDB_CODE_MND_DNODE_NOT_EXIST;
|
pVgroup->numOfVnodes = numOfVnodes;
|
||||||
}
|
|
||||||
|
|
||||||
SVnodeGid *pVnodeGid = pVgroup->vnodeGid + pVgroup->numOfVnodes;
|
|
||||||
pVnodeGid->dnodeId = pDestDnode->dnodeId;
|
|
||||||
pVnodeGid->pDnode = pDestDnode;
|
|
||||||
pVgroup->numOfVnodes++;
|
|
||||||
|
|
||||||
if (pSrcDnode != NULL) {
|
|
||||||
pVgroup->lbDnodeId = pSrcDnode->dnodeId;
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic_add_fetch_32(&pDestDnode->openVnodes, 1);
|
atomic_add_fetch_32(&pDestDnode->openVnodes, 1);
|
||||||
|
|
||||||
mnodeUpdateVgroup(pVgroup);
|
mnodeUpdateVgroup(pVgroup);
|
||||||
|
@ -315,16 +253,16 @@ static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDes
|
||||||
static bool bnMonitorBalance() {
|
static bool bnMonitorBalance() {
|
||||||
if (tsBnDnodes.size < 2) return false;
|
if (tsBnDnodes.size < 2) return false;
|
||||||
|
|
||||||
|
mDebug("monitor dnodes for balance, avail:%d", tsBnDnodes.size);
|
||||||
for (int32_t src = tsBnDnodes.size - 1; src >= 0; --src) {
|
for (int32_t src = tsBnDnodes.size - 1; src >= 0; --src) {
|
||||||
SDnodeObj *pDnode = tsBnDnodes.list[src];
|
SDnodeObj *pDnode = tsBnDnodes.list[src];
|
||||||
mDebug("%d-dnode:%d, state:%s, score:%.1f, numOfCores:%d, openVnodes:%d", tsBnDnodes.size - src - 1,
|
mDebug("%d-dnode:%d, state:%s, score:%.1f, cores:%d, vnodes:%d", tsBnDnodes.size - src - 1, pDnode->dnodeId,
|
||||||
pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status), pDnode->score, pDnode->numOfCores,
|
dnodeStatus[pDnode->status], pDnode->score, pDnode->numOfCores, pDnode->openVnodes);
|
||||||
pDnode->openVnodes);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
float scoresDiff = tsBnDnodes.list[tsBnDnodes.size - 1]->score - tsBnDnodes.list[0]->score;
|
float scoresDiff = tsBnDnodes.list[tsBnDnodes.size - 1]->score - tsBnDnodes.list[0]->score;
|
||||||
if (scoresDiff < 0.01) {
|
if (scoresDiff < 0.01) {
|
||||||
mDebug("all dnodes:%d is already balanced, scoresDiff:%f", tsBnDnodes.size, scoresDiff);
|
mDebug("all dnodes:%d is already balanced, scoreDiff:%.1f", tsBnDnodes.size, scoresDiff);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -412,7 +350,13 @@ static int32_t bnMonitorVgroups() {
|
||||||
} else if (vgReplica < dbReplica) {
|
} else if (vgReplica < dbReplica) {
|
||||||
mInfo("vgId:%d, replica:%d numOfVnodes:%d, try add one vnode", pVgroup->vgId, dbReplica, vgReplica);
|
mInfo("vgId:%d, replica:%d numOfVnodes:%d, try add one vnode", pVgroup->vgId, dbReplica, vgReplica);
|
||||||
hasUpdatingVgroup = true;
|
hasUpdatingVgroup = true;
|
||||||
code = bnAddVnode(pVgroup, NULL, NULL);
|
|
||||||
|
SDnodeObj *pAvailDnode = bnGetAvailDnode(pVgroup);
|
||||||
|
if (pAvailDnode == NULL) {
|
||||||
|
code = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||||
|
} else {
|
||||||
|
code = bnAddVnode(pVgroup, NULL, pAvailDnode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mnodeDecVgroupRef(pVgroup);
|
mnodeDecVgroupRef(pVgroup);
|
||||||
|
|
|
@ -299,7 +299,7 @@ static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
STR_TO_VARSTR(pWrite, mnodeGetDnodeStatusStr(pDnode->status));
|
STR_TO_VARSTR(pWrite, dnodeStatus[pDnode->status]);
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
|
|
|
@ -129,7 +129,8 @@ static void *dnodeProcessMgmtQueue(void *wparam) {
|
||||||
static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
|
static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
||||||
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
||||||
pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion);
|
pCreate->cfg.dbCfgVersion = htonl(pCreate->cfg.dbCfgVersion);
|
||||||
|
pCreate->cfg.vgCfgVersion = htonl(pCreate->cfg.vgCfgVersion);
|
||||||
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
|
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
|
||||||
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
|
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
|
||||||
pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks);
|
pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks);
|
||||||
|
|
|
@ -518,14 +518,15 @@ typedef struct SRetrieveTableRsp {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t cfgVersion;
|
int32_t dbCfgVersion;
|
||||||
int64_t totalStorage;
|
int64_t totalStorage;
|
||||||
int64_t compStorage;
|
int64_t compStorage;
|
||||||
int64_t pointsWritten;
|
int64_t pointsWritten;
|
||||||
uint8_t status;
|
uint8_t status;
|
||||||
uint8_t role;
|
uint8_t role;
|
||||||
uint8_t replica;
|
uint8_t replica;
|
||||||
uint8_t reserved[5];
|
uint8_t reserved;
|
||||||
|
int32_t vgCfgVersion;
|
||||||
} SVnodeLoad;
|
} SVnodeLoad;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -641,7 +642,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint32_t vgId;
|
uint32_t vgId;
|
||||||
int32_t cfgVersion;
|
int32_t dbCfgVersion;
|
||||||
int32_t maxTables;
|
int32_t maxTables;
|
||||||
int32_t cacheBlockSize;
|
int32_t cacheBlockSize;
|
||||||
int32_t totalBlocks;
|
int32_t totalBlocks;
|
||||||
|
@ -660,7 +661,8 @@ typedef struct {
|
||||||
int8_t wals;
|
int8_t wals;
|
||||||
int8_t quorum;
|
int8_t quorum;
|
||||||
int8_t update;
|
int8_t update;
|
||||||
int8_t reserved[15];
|
int8_t reserved[11];
|
||||||
|
int32_t vgCfgVersion;
|
||||||
} SVnodeCfg;
|
} SVnodeCfg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -144,7 +144,8 @@ typedef struct SVgObj {
|
||||||
int8_t status;
|
int8_t status;
|
||||||
int8_t reserved0[4];
|
int8_t reserved0[4];
|
||||||
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
|
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
|
||||||
int8_t reserved1[12];
|
int32_t vgCfgVersion;
|
||||||
|
int8_t reserved1[8];
|
||||||
int8_t updateEnd[4];
|
int8_t updateEnd[4];
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
int32_t numOfTables;
|
int32_t numOfTables;
|
||||||
|
@ -181,7 +182,7 @@ typedef struct SDbObj {
|
||||||
int8_t reserved0[4];
|
int8_t reserved0[4];
|
||||||
char acct[TSDB_USER_LEN];
|
char acct[TSDB_USER_LEN];
|
||||||
int64_t createdTime;
|
int64_t createdTime;
|
||||||
int32_t cfgVersion;
|
int32_t dbCfgVersion;
|
||||||
SDbCfg cfg;
|
SDbCfg cfg;
|
||||||
int8_t status;
|
int8_t status;
|
||||||
int8_t reserved1[11];
|
int8_t reserved1[11];
|
||||||
|
|
|
@ -55,12 +55,12 @@ typedef enum EDnodeOfflineReason {
|
||||||
TAOS_DN_OFF_OTHERS
|
TAOS_DN_OFF_OTHERS
|
||||||
} EDnodeOfflineReason;
|
} EDnodeOfflineReason;
|
||||||
|
|
||||||
|
extern char* dnodeStatus[];
|
||||||
|
extern char* dnodeRoles[];
|
||||||
|
|
||||||
int32_t mnodeInitDnodes();
|
int32_t mnodeInitDnodes();
|
||||||
void mnodeCleanupDnodes();
|
void mnodeCleanupDnodes();
|
||||||
|
|
||||||
char* mnodeGetDnodeStatusStr(int32_t dnodeStatus);
|
|
||||||
void mgmtMonitorDnodeModule();
|
|
||||||
|
|
||||||
int32_t mnodeGetDnodesNum();
|
int32_t mnodeGetDnodesNum();
|
||||||
int32_t mnodeGetOnlinDnodesCpuCoreNum();
|
int32_t mnodeGetOnlinDnodesCpuCoreNum();
|
||||||
int32_t mnodeGetOnlineDnodesNum();
|
int32_t mnodeGetOnlineDnodesNum();
|
||||||
|
|
|
@ -1015,7 +1015,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
|
||||||
|
|
||||||
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
|
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
|
||||||
pDb->cfg = newCfg;
|
pDb->cfg = newCfg;
|
||||||
pDb->cfgVersion++;
|
pDb->dbCfgVersion++;
|
||||||
SSdbRow row = {
|
SSdbRow row = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.pTable = tsDbSdb,
|
.pTable = tsDbSdb,
|
||||||
|
|
|
@ -63,7 +63,6 @@ static int32_t mnodeGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
|
||||||
static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||||
static int32_t mnodeGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
static int32_t mnodeGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
||||||
static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||||
static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole);
|
|
||||||
static void mnodeUpdateDnodeEps();
|
static void mnodeUpdateDnodeEps();
|
||||||
|
|
||||||
static char* offlineReason[] = {
|
static char* offlineReason[] = {
|
||||||
|
@ -557,7 +556,8 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
|
||||||
for (int32_t j = 0; j < openVnodes; ++j) {
|
for (int32_t j = 0; j < openVnodes; ++j) {
|
||||||
SVnodeLoad *pVload = &pStatus->load[j];
|
SVnodeLoad *pVload = &pStatus->load[j];
|
||||||
pVload->vgId = htonl(pVload->vgId);
|
pVload->vgId = htonl(pVload->vgId);
|
||||||
pVload->cfgVersion = htonl(pVload->cfgVersion);
|
pVload->dbCfgVersion = htonl(pVload->dbCfgVersion);
|
||||||
|
pVload->vgCfgVersion = htonl(pVload->vgCfgVersion);
|
||||||
|
|
||||||
SVgObj *pVgroup = mnodeGetVgroup(pVload->vgId);
|
SVgObj *pVgroup = mnodeGetVgroup(pVload->vgId);
|
||||||
if (pVgroup == NULL) {
|
if (pVgroup == NULL) {
|
||||||
|
@ -833,12 +833,12 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
char* status = mnodeGetDnodeStatusStr(pDnode->status);
|
char* status = dnodeStatus[pDnode->status];
|
||||||
STR_TO_VARSTR(pWrite, status);
|
STR_TO_VARSTR(pWrite, status);
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
char* role = mnodeGetDnodeAlternativeRoleStr(pDnode->alternativeRole);
|
char* role = dnodeRoles[pDnode->alternativeRole];
|
||||||
STR_TO_VARSTR(pWrite, role);
|
STR_TO_VARSTR(pWrite, role);
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
|
@ -1154,21 +1154,17 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* mnodeGetDnodeStatusStr(int32_t dnodeStatus) {
|
char* dnodeStatus[] = {
|
||||||
switch (dnodeStatus) {
|
"offline",
|
||||||
case TAOS_DN_STATUS_OFFLINE: return "offline";
|
"dropping",
|
||||||
case TAOS_DN_STATUS_DROPPING: return "dropping";
|
"balancing",
|
||||||
case TAOS_DN_STATUS_BALANCING: return "balancing";
|
"ready",
|
||||||
case TAOS_DN_STATUS_READY: return "ready";
|
"undefined"
|
||||||
default: return "undefined";
|
};
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole) {
|
char* dnodeRoles[] = {
|
||||||
switch (alternativeRole) {
|
"any",
|
||||||
case TAOS_DN_ALTERNATIVE_ROLE_ANY: return "any";
|
"mnode",
|
||||||
case TAOS_DN_ALTERNATIVE_ROLE_MNODE: return "mnode";
|
"vnode",
|
||||||
case TAOS_DN_ALTERNATIVE_ROLE_VNODE: return "vnode";
|
"any"
|
||||||
default:return "any";
|
};
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -256,6 +256,8 @@ SVgObj *mnodeGetVgroup(int32_t vgId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void mnodeUpdateVgroup(SVgObj *pVgroup) {
|
void mnodeUpdateVgroup(SVgObj *pVgroup) {
|
||||||
|
pVgroup->vgCfgVersion++;
|
||||||
|
|
||||||
SSdbRow row = {
|
SSdbRow row = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.pTable = tsVgroupSdb,
|
.pTable = tsVgroupSdb,
|
||||||
|
@ -339,10 +341,11 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
|
||||||
pVgroup->pointsWritten = htobe64(pVload->pointsWritten);
|
pVgroup->pointsWritten = htobe64(pVload->pointsWritten);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVload->cfgVersion != pVgroup->pDb->cfgVersion || pVload->replica != pVgroup->numOfVnodes) {
|
if (pVload->dbCfgVersion != pVgroup->pDb->dbCfgVersion || pVload->replica != pVgroup->numOfVnodes ||
|
||||||
mError("dnode:%d, vgId:%d, vnode cfgVersion:%d repica:%d not match with mnode cfgVersion:%d replica:%d",
|
pVload->vgCfgVersion != pVgroup->vgCfgVersion) {
|
||||||
pDnode->dnodeId, pVload->vgId, pVload->cfgVersion, pVload->replica, pVgroup->pDb->cfgVersion,
|
mError("dnode:%d, vgId:%d, vnode cfgVersion:%d:%d repica:%d not match with mnode cfgVersion:%d:%d replica:%d",
|
||||||
pVgroup->numOfVnodes);
|
pDnode->dnodeId, pVload->vgId, pVload->dbCfgVersion, pVload->vgCfgVersion, pVload->replica,
|
||||||
|
pVgroup->pDb->dbCfgVersion, pVgroup->vgCfgVersion, pVgroup->numOfVnodes);
|
||||||
mnodeSendAlterVgroupMsg(pVgroup);
|
mnodeSendAlterVgroupMsg(pVgroup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -840,7 +843,8 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
|
||||||
|
|
||||||
SVnodeCfg *pCfg = &pVnode->cfg;
|
SVnodeCfg *pCfg = &pVnode->cfg;
|
||||||
pCfg->vgId = htonl(pVgroup->vgId);
|
pCfg->vgId = htonl(pVgroup->vgId);
|
||||||
pCfg->cfgVersion = htonl(pDb->cfgVersion);
|
pCfg->dbCfgVersion = htonl(pDb->dbCfgVersion);
|
||||||
|
pCfg->vgCfgVersion = htonl(pVgroup->vgCfgVersion);
|
||||||
pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
|
pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
|
||||||
pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks);
|
pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks);
|
||||||
pCfg->maxTables = htonl(maxTables + 1);
|
pCfg->maxTables = htonl(maxTables + 1);
|
||||||
|
|
|
@ -56,7 +56,8 @@ typedef struct {
|
||||||
int64_t sync;
|
int64_t sync;
|
||||||
void * events;
|
void * events;
|
||||||
void * cq; // continuous query
|
void * cq; // continuous query
|
||||||
int32_t cfgVersion;
|
int32_t dbCfgVersion;
|
||||||
|
int32_t vgCfgVersion;
|
||||||
STsdbCfg tsdbCfg;
|
STsdbCfg tsdbCfg;
|
||||||
SSyncCfg syncCfg;
|
SSyncCfg syncCfg;
|
||||||
SWalCfg walCfg;
|
SWalCfg walCfg;
|
||||||
|
|
|
@ -22,7 +22,8 @@
|
||||||
|
|
||||||
static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
|
static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
|
||||||
tstrncpy(pVnode->db, vnodeMsg->db, sizeof(pVnode->db));
|
tstrncpy(pVnode->db, vnodeMsg->db, sizeof(pVnode->db));
|
||||||
pVnode->cfgVersion = vnodeMsg->cfg.cfgVersion;
|
pVnode->dbCfgVersion = vnodeMsg->cfg.dbCfgVersion;
|
||||||
|
pVnode->vgCfgVersion = vnodeMsg->cfg.vgCfgVersion;
|
||||||
pVnode->tsdbCfg.cacheBlockSize = vnodeMsg->cfg.cacheBlockSize;
|
pVnode->tsdbCfg.cacheBlockSize = vnodeMsg->cfg.cacheBlockSize;
|
||||||
pVnode->tsdbCfg.totalBlocks = vnodeMsg->cfg.totalBlocks;
|
pVnode->tsdbCfg.totalBlocks = vnodeMsg->cfg.totalBlocks;
|
||||||
pVnode->tsdbCfg.daysPerFile = vnodeMsg->cfg.daysPerFile;
|
pVnode->tsdbCfg.daysPerFile = vnodeMsg->cfg.daysPerFile;
|
||||||
|
@ -95,12 +96,19 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
||||||
}
|
}
|
||||||
tstrncpy(vnodeMsg.db, db->valuestring, sizeof(vnodeMsg.db));
|
tstrncpy(vnodeMsg.db, db->valuestring, sizeof(vnodeMsg.db));
|
||||||
|
|
||||||
cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion");
|
cJSON *dbCfgVersion = cJSON_GetObjectItem(root, "cfgVersion");
|
||||||
if (!cfgVersion || cfgVersion->type != cJSON_Number) {
|
if (!dbCfgVersion || dbCfgVersion->type != cJSON_Number) {
|
||||||
vError("vgId:%d, failed to read %s, cfgVersion not found", pVnode->vgId, file);
|
vError("vgId:%d, failed to read %s, cfgVersion not found", pVnode->vgId, file);
|
||||||
goto PARSE_VCFG_ERROR;
|
goto PARSE_VCFG_ERROR;
|
||||||
}
|
}
|
||||||
vnodeMsg.cfg.cfgVersion = cfgVersion->valueint;
|
vnodeMsg.cfg.dbCfgVersion = dbCfgVersion->valueint;
|
||||||
|
|
||||||
|
cJSON *vgCfgVersion = cJSON_GetObjectItem(root, "vgCfgVersion");
|
||||||
|
if (!vgCfgVersion || vgCfgVersion->type != cJSON_Number) {
|
||||||
|
vError("vgId:%d, failed to read %s, vgCfgVersion not found", pVnode->vgId, file);
|
||||||
|
goto PARSE_VCFG_ERROR;
|
||||||
|
}
|
||||||
|
vnodeMsg.cfg.vgCfgVersion = vgCfgVersion->valueint;
|
||||||
|
|
||||||
cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
|
cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
|
||||||
if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
|
if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
|
||||||
|
@ -278,7 +286,8 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) {
|
||||||
|
|
||||||
len += snprintf(content + len, maxLen - len, "{\n");
|
len += snprintf(content + len, maxLen - len, "{\n");
|
||||||
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pMsg->db);
|
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pMsg->db);
|
||||||
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pMsg->cfg.cfgVersion);
|
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pMsg->cfg.dbCfgVersion);
|
||||||
|
len += snprintf(content + len, maxLen - len, " \"vgCfgVersion\": %d,\n", pMsg->cfg.vgCfgVersion);
|
||||||
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pMsg->cfg.cacheBlockSize);
|
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pMsg->cfg.cacheBlockSize);
|
||||||
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pMsg->cfg.totalBlocks);
|
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pMsg->cfg.totalBlocks);
|
||||||
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pMsg->cfg.daysPerFile);
|
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pMsg->cfg.daysPerFile);
|
||||||
|
|
|
@ -154,7 +154,7 @@ int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) {
|
||||||
SVnodeObj *pVnode = vparam;
|
SVnodeObj *pVnode = vparam;
|
||||||
|
|
||||||
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
|
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
|
||||||
// cfgVersion can be corrected by status msg
|
// dbCfgVersion can be corrected by status msg
|
||||||
if (!vnodeSetUpdatingStatus(pVnode)) {
|
if (!vnodeSetUpdatingStatus(pVnode)) {
|
||||||
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
|
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -134,7 +134,8 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
|
||||||
|
|
||||||
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
|
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
|
||||||
pLoad->vgId = htonl(pVnode->vgId);
|
pLoad->vgId = htonl(pVnode->vgId);
|
||||||
pLoad->cfgVersion = htonl(pVnode->cfgVersion);
|
pLoad->dbCfgVersion = htonl(pVnode->dbCfgVersion);
|
||||||
|
pLoad->vgCfgVersion = htonl(pVnode->vgCfgVersion);
|
||||||
pLoad->totalStorage = htobe64(totalStorage);
|
pLoad->totalStorage = htobe64(totalStorage);
|
||||||
pLoad->compStorage = htobe64(compStorage);
|
pLoad->compStorage = htobe64(compStorage);
|
||||||
pLoad->pointsWritten = htobe64(pointsWritten);
|
pLoad->pointsWritten = htobe64(pointsWritten);
|
||||||
|
|
Loading…
Reference in New Issue