Merge pull request #17613 from taosdata/enh/TD-19460
enh: redistribute vgroup
This commit is contained in:
commit
9520ef4b41
|
@ -307,6 +307,10 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
||||||
}
|
}
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
vmReleaseVnode(pMgmt, pVnode);
|
||||||
}
|
}
|
||||||
|
if (size < 0) {
|
||||||
|
dError("vgId:%d, can't get size from queue since %s, qtype:%d", vgId, terrstr(), qtype);
|
||||||
|
size = 0;
|
||||||
|
}
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -918,13 +918,19 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
|
||||||
sendRsp = true;
|
sendRsp = true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 6) {
|
if (pTrans->stage == TRN_STAGE_REDO_ACTION && ((code == TSDB_CODE_APP_NOT_READY && pTrans->failedTimes > 60) ||
|
||||||
|
(code != TSDB_CODE_APP_NOT_READY && pTrans->failedTimes > 6))) {
|
||||||
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
|
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
|
||||||
sendRsp = true;
|
sendRsp = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!sendRsp) return;
|
if (!sendRsp) {
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
mInfo("trans:%d, send rsp, stage:%s failedTimes:%d code:0x%x", pTrans->id, mndTransStr(pTrans->stage),
|
||||||
|
pTrans->failedTimes, code);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t size = taosArrayGetSize(pTrans->pRpcArray);
|
int32_t size = taosArrayGetSize(pTrans->pRpcArray);
|
||||||
if (size <= 0) return;
|
if (size <= 0) return;
|
||||||
|
|
|
@ -260,6 +260,12 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d strict:%d", createReq.vgId, createReq.replica,
|
||||||
|
createReq.selfIndex, createReq.strict);
|
||||||
|
for (int32_t i = 0; i < createReq.replica; ++i) {
|
||||||
|
mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
|
int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
|
||||||
if (contLen < 0) {
|
if (contLen < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -293,6 +299,7 @@ static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pV
|
||||||
alterReq.strict = pDb->cfg.strict;
|
alterReq.strict = pDb->cfg.strict;
|
||||||
alterReq.cacheLast = pDb->cfg.cacheLast;
|
alterReq.cacheLast = pDb->cfg.cacheLast;
|
||||||
|
|
||||||
|
mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);
|
||||||
int32_t contLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
|
int32_t contLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
|
||||||
if (contLen < 0) {
|
if (contLen < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -342,7 +349,7 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
alterReq.replica = pVgroup->replica;
|
alterReq.replica = pVgroup->replica;
|
||||||
mInfo("vgId:%d, start to alter vnode, replica:%d selfIndex:%d strict:%d", alterReq.vgId, alterReq.replica,
|
mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d strict:%d", alterReq.vgId, alterReq.replica,
|
||||||
alterReq.selfIndex, alterReq.strict);
|
alterReq.selfIndex, alterReq.strict);
|
||||||
for (int32_t i = 0; i < alterReq.replica; ++i) {
|
for (int32_t i = 0; i < alterReq.replica; ++i) {
|
||||||
mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
|
mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
|
||||||
|
@ -377,6 +384,7 @@ void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgOb
|
||||||
memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);
|
memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
dropReq.dbUid = pDb->uid;
|
dropReq.dbUid = pDb->uid;
|
||||||
|
|
||||||
|
mInfo("vgId:%d, build drop vnode req", dropReq.vgId);
|
||||||
int32_t contLen = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
|
int32_t contLen = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
|
||||||
if (contLen < 0) {
|
if (contLen < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1185,9 +1193,21 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb
|
||||||
pGid->dnodeId = newDnodeId;
|
pGid->dnodeId = newDnodeId;
|
||||||
pGid->syncState = TAOS_SYNC_STATE_ERROR;
|
pGid->syncState = TAOS_SYNC_STATE_ERROR;
|
||||||
|
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid) != 0) return -1;
|
if (pVgroup->replica == 2) {
|
||||||
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, -1) != 0) return -1;
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid) != 0) return -1;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[0].dnodeId) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
||||||
|
} else if (pVgroup->replica == 4) {
|
||||||
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[0].dnodeId) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[1].dnodeId) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[2].dnodeId) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
||||||
|
} else {
|
||||||
|
mError("vgId:%d, failed to add 1 vnode since invalid replica:%d", pVgroup->vgId, pVgroup->replica);
|
||||||
|
terrno = TSDB_CODE_MND_APP_ERROR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1212,9 +1232,21 @@ static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, S
|
||||||
memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid));
|
memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid));
|
||||||
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
|
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
|
||||||
|
|
||||||
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, -1) != 0) return -1;
|
if (pVgroup->replica == 1) {
|
||||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
|
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[0].dnodeId) != 0) return -1;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
||||||
|
} else if (pVgroup->replica == 3) {
|
||||||
|
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[0].dnodeId) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[1].dnodeId) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[2].dnodeId) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
||||||
|
} else {
|
||||||
|
mError("vgId:%d, failed to remove 1 vnode since invalid replica:%d", pVgroup->vgId, pVgroup->replica);
|
||||||
|
terrno = TSDB_CODE_MND_APP_ERROR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1334,9 +1366,6 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
|
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
|
||||||
#if 1
|
|
||||||
return TSDB_CODE_OPS_NOT_SUPPORT;
|
|
||||||
#else
|
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SDnodeObj *pNew1 = NULL;
|
SDnodeObj *pNew1 = NULL;
|
||||||
SDnodeObj *pNew2 = NULL;
|
SDnodeObj *pNew2 = NULL;
|
||||||
|
@ -1530,7 +1559,6 @@ _OVER:
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
|
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
|
||||||
|
@ -1868,7 +1896,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SArray *pArray = NULL;
|
SArray *pArray = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
int64_t curMs = taosGetTimestampMs();
|
int64_t curMs = taosGetTimestampMs();
|
||||||
|
|
||||||
SBalanceVgroupReq req = {0};
|
SBalanceVgroupReq req = {0};
|
||||||
|
|
|
@ -137,6 +137,8 @@ int32_t taosQueueItemSize(STaosQueue *queue) {
|
||||||
taosThreadMutexLock(&queue->mutex);
|
taosThreadMutexLock(&queue->mutex);
|
||||||
int32_t numOfItems = queue->numOfItems;
|
int32_t numOfItems = queue->numOfItems;
|
||||||
taosThreadMutexUnlock(&queue->mutex);
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
|
|
||||||
|
uTrace("queue:%p, numOfItems:%d memOfItems:%" PRId64, queue, queue->numOfItems, queue->memOfItems);
|
||||||
return numOfItems;
|
return numOfItems;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,11 +50,11 @@
|
||||||
# unsupport ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim
|
# unsupport ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim
|
||||||
# unsupport ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim
|
# unsupport ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim
|
||||||
./test.sh -f tsim/dnode/offline_reason.sim
|
./test.sh -f tsim/dnode/offline_reason.sim
|
||||||
# unsupport ./test.sh -f tsim/dnode/redistribute_vgroup_replica1.sim
|
./test.sh -f tsim/dnode/redistribute_vgroup_replica1.sim
|
||||||
# unsupport ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||||
# unsupport ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||||
# unsupport ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v2.sim
|
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v2.sim
|
||||||
# unsupport ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v3.sim
|
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v3.sim
|
||||||
# unsupport ./test.sh -f tsim/dnode/vnode_clean.sim
|
# unsupport ./test.sh -f tsim/dnode/vnode_clean.sim
|
||||||
./test.sh -f tsim/dnode/use_dropped_dnode.sim
|
./test.sh -f tsim/dnode/use_dropped_dnode.sim
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue