Merge remote-tracking branch 'origin/fix/dnode' into fix/mnode
This commit is contained in:
commit
856c0eed7c
|
@ -123,6 +123,11 @@ typedef enum {
|
|||
TRN_POLICY_RETRY = 1,
|
||||
} ETrnPolicy;
|
||||
|
||||
typedef enum {
|
||||
TRN_EXEC_PARALLEL = 0,
|
||||
TRN_EXEC_ONE_BY_ONE = 1,
|
||||
} ETrnExecType;
|
||||
|
||||
typedef enum {
|
||||
DND_REASON_ONLINE = 0,
|
||||
DND_REASON_STATUS_MSG_TIMEOUT,
|
||||
|
@ -151,6 +156,7 @@ typedef struct {
|
|||
ETrnStage stage;
|
||||
ETrnPolicy policy;
|
||||
ETrnType type;
|
||||
ETrnExecType parallel;
|
||||
int32_t code;
|
||||
int32_t failedTimes;
|
||||
SRpcHandleInfo rpcInfo;
|
||||
|
|
|
@ -57,6 +57,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
|
|||
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
|
||||
void mndTransSetCb(STrans *pTrans, ETrnFuncType startFunc, ETrnFuncType stopFunc, void *param, int32_t paramLen);
|
||||
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
|
||||
void mndTransSetExecOneByOne(STrans *pTrans);
|
||||
|
||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
|
||||
void mndTransProcessRsp(SRpcMsg *pRsp);
|
||||
|
|
|
@ -312,25 +312,6 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
|
|||
createEpset.eps[0].port = pDnode->port;
|
||||
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
|
||||
{
|
||||
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
|
||||
void *pReq = taosMemoryMalloc(contLen);
|
||||
tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
|
||||
|
||||
STransAction action = {
|
||||
.epSet = alterEpset,
|
||||
.pCont = pReq,
|
||||
.contLen = contLen,
|
||||
.msgType = TDMT_DND_ALTER_MNODE,
|
||||
.acceptableCode = 0,
|
||||
};
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
|
||||
void *pReq = taosMemoryMalloc(contLen);
|
||||
|
@ -350,6 +331,25 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
|
|||
}
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
|
||||
void *pReq = taosMemoryMalloc(contLen);
|
||||
tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
|
||||
|
||||
STransAction action = {
|
||||
.epSet = alterEpset,
|
||||
.pCont = pReq,
|
||||
.contLen = contLen,
|
||||
.msgType = TDMT_DND_ALTER_MNODE,
|
||||
.acceptableCode = 0,
|
||||
};
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -365,6 +365,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
|
|||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
|
||||
mndTransSetExecOneByOne(pTrans);
|
||||
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
|
||||
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
|
||||
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
|
||||
|
@ -536,7 +537,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
|
|||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
|
||||
|
||||
mndTransSetExecOneByOne(pTrans);
|
||||
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
|
||||
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
|
||||
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
|
||||
|
|
|
@ -507,6 +507,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
|
||||
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
|
||||
mndTransSetDbInfo(pTrans, pDb);
|
||||
mndTransSetExecOneByOne(pTrans);
|
||||
|
||||
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
||||
|
|
|
@ -140,6 +140,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
SDB_SET_INT16(pRaw, dataPos, stage, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
|
||||
|
@ -245,12 +246,15 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
int16_t stage = 0;
|
||||
int16_t policy = 0;
|
||||
int16_t type = 0;
|
||||
int16_t parallel = 0;
|
||||
SDB_GET_INT16(pRaw, dataPos, &stage, _OVER)
|
||||
SDB_GET_INT16(pRaw, dataPos, &policy, _OVER)
|
||||
SDB_GET_INT16(pRaw, dataPos, &type, _OVER)
|
||||
SDB_GET_INT16(pRaw, dataPos, ¶llel, _OVER)
|
||||
pTrans->stage = stage;
|
||||
pTrans->policy = policy;
|
||||
pTrans->type = type;
|
||||
pTrans->parallel = parallel;
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
|
||||
|
@ -665,6 +669,8 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) {
|
|||
memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
void mndTransSetExecOneByOne(STrans *pTrans) { pTrans->parallel = TRN_EXEC_ONE_BY_ONE; }
|
||||
|
||||
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
||||
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
|
||||
if (pRaw == NULL) {
|
||||
|
@ -970,7 +976,18 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
|
|||
for (int32_t action = 0; action < numOfActions; ++action) {
|
||||
STransAction *pAction = taosArrayGet(pArray, action);
|
||||
if (pAction == NULL) continue;
|
||||
if (pAction->msgSent) continue;
|
||||
|
||||
if (pAction->msgSent) {
|
||||
if (pAction->msgReceived) {
|
||||
continue;
|
||||
} else {
|
||||
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int64_t signature = pTrans->id;
|
||||
signature = (signature << 32);
|
||||
|
@ -990,6 +1007,9 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
|
|||
pAction->msgSent = 1;
|
||||
pAction->msgReceived = 0;
|
||||
pAction->errCode = 0;
|
||||
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
pAction->msgSent = 0;
|
||||
pAction->msgReceived = 0;
|
||||
|
|
|
@ -60,7 +60,9 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId,
|
|||
return;
|
||||
}
|
||||
}
|
||||
assert(0);
|
||||
|
||||
// maybe config change
|
||||
// assert(0);
|
||||
}
|
||||
|
||||
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
|
||||
|
|
|
@ -981,6 +981,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop) {
|
||||
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
|
||||
pSyncNode->pRaftCfg->cfg = *newConfig;
|
||||
int32_t ret = 0;
|
||||
|
||||
|
@ -1014,6 +1015,15 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDro
|
|||
|
||||
// isDrop
|
||||
*isDrop = true;
|
||||
bool IamInOld, IamInNew;
|
||||
for (int i = 0; i < oldConfig.replicaNum; ++i) {
|
||||
if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
|
||||
(oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
|
||||
*isDrop = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < newConfig->replicaNum; ++i) {
|
||||
if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
|
||||
(newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
|
||||
|
|
|
@ -55,7 +55,7 @@
|
|||
./test.sh -f tsim/bnode/basic1.sim
|
||||
|
||||
# ---- mnode
|
||||
#./test.sh -f tsim/mnode/basic1.sim
|
||||
./test.sh -f tsim/mnode/basic1.sim
|
||||
./test.sh -f tsim/mnode/basic2.sim
|
||||
|
||||
# ---- show
|
||||
|
|
|
@ -88,7 +88,7 @@ sql show mnodes
|
|||
print $data(1)[0] $data(1)[1] $data(1)[2]
|
||||
print $data(2)[0] $data(2)[1] $data(2)[2]
|
||||
|
||||
if $rows != 2 then
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data(1)[0] != 1 then
|
||||
|
@ -97,16 +97,16 @@ endi
|
|||
if $data(1)[2] != LEADER then
|
||||
return -1
|
||||
endi
|
||||
if $data(2)[0] != NULL then
|
||||
if $data(2)[0] != null then
|
||||
goto step2
|
||||
endi
|
||||
if $data(2)[2] != NULL then
|
||||
if $data(2)[2] != null then
|
||||
goto step2
|
||||
endi
|
||||
|
||||
sleep 2000
|
||||
|
||||
print =============== create drop mnodes
|
||||
print =============== create mnodes
|
||||
sql create mnode on dnode 2
|
||||
sql show mnodes
|
||||
if $rows != 2 then
|
||||
|
|
Loading…
Reference in New Issue