From 36fd0dac68fd72b6361fc3a19f227c5a2ab67250 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 5 Jun 2022 16:34:06 +0800 Subject: [PATCH] refactor: add alter-confirm while alter db --- include/util/taoserror.h | 6 ++--- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 28 ++++++++++++++------- source/dnode/mnode/impl/src/mndTrans.c | 9 ++++++- source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- tests/script/tsim/db/alter_replica_31.sim | 21 ++++++++++++++++ 5 files changed, 52 insertions(+), 14 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2d016cab9e..cbef0b46a7 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -40,9 +40,9 @@ int32_t* taosGetErrno(); #define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error //common & util -#define TSDB_CODE_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) -#define TSDB_CODE_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0002) -#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0003) +#define TSDB_CODE_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0003) +#define TSDB_CODE_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0004) +#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0005) #define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0010) #define TSDB_CODE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0011) #define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x0012) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index b1c04fd8cf..427b3d5c94 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -118,25 +118,36 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO pMsg = *(SRpcMsg **)taosArrayGet(pArray, m); code = vnodePreprocessReq(pVnode->pImpl, pMsg); - if (code == TSDB_CODE_ACTION_IN_PROGRESS) continue; - if (code != 0) { - dError("vgId:%d, msg:%p failed to write since %s", pVnode->vgId, pMsg, tstrerror(code)); - vmSendRsp(pMsg, code); + if (code == TSDB_CODE_ACTION_IN_PROGRESS) { + dTrace("vgId:%d, msg:%p in progress and no rsp", pVnode->vgId, pMsg); continue; } - code = syncPropose(sync, pMsg, false); + if (pMsg->msgType != TDMT_VND_ALTER_REPLICA) { + code = syncPropose(sync, pMsg, false); + } + if (code == TAOS_SYNC_PROPOSE_SUCCESS) { + dTrace("vgId:%d, msg:%p is proposed and no rsp", pVnode->vgId, pMsg); continue; } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { - dTrace("vgId:%d, msg:%p is redirect since not leader", pVnode->vgId, pMsg); SEpSet newEpSet = {0}; syncGetEpSet(sync, &newEpSet); - newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; + SEp *pEp = &newEpSet.eps[newEpSet.inUse]; + if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) { + newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; + } + + dTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->vgId, pMsg, + newEpSet.numOfEps, newEpSet.inUse); + for (int32_t i = 0; i < newEpSet.numOfEps; ++i) { + dTrace("vgId:%d, msg:%p ep:%s:%u", pVnode->vgId, pMsg, newEpSet.eps[i].fqdn, newEpSet.eps[i].port); + } + SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; tmsgSendRedirectRsp(&rsp, &newEpSet); } else { - dError("vgId:%d, msg:%p failed to write since %s", pVnode->vgId, pMsg, tstrerror(code)); + dError("vgId:%d, msg:%p failed to propose write since %s, code:0x%x", pVnode->vgId, pMsg, tstrerror(code), code); vmSendRsp(pMsg, code); } } @@ -251,7 +262,6 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp switch (qtype) { case QUERY_QUEUE: vnodePreprocessQueryMsg(pVnode->pImpl, pMsg); - dTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg); taosWriteQitem(pVnode->pQueryQ, pMsg); break; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index cc9cc6edc2..8c0ba65fb2 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -870,8 +870,15 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) pAction->rawWritten = 0; pAction->msgSent = 0; pAction->msgReceived = 0; + if (pAction->errCode == TSDB_CODE_RPC_REDIRECT) { + pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps; + mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage), + action, pAction->epSet.inUse); + } else { + mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action); + } pAction->errCode = 0; - mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action); + } } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 37f765d786..8f98e9cec5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -69,7 +69,7 @@ int32_t vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = syncReconfig(pVnode->sync, &cfg); if (code == TAOS_SYNC_PROPOSE_SUCCESS) { // todo refactor - SRpcMsg rsp = {.info = pMsg->info, .code = terrno}; + SRpcMsg rsp = {.info = pMsg->info, .code = 0}; tmsgSendRsp(&rsp); return TSDB_CODE_ACTION_IN_PROGRESS; } diff --git a/tests/script/tsim/db/alter_replica_31.sim b/tests/script/tsim/db/alter_replica_31.sim index 896d6708b7..cbd4183b5e 100644 --- a/tests/script/tsim/db/alter_replica_31.sim +++ b/tests/script/tsim/db/alter_replica_31.sim @@ -78,6 +78,15 @@ if $data(4)[2] != 1 then endi # v1_dnode +$hasleader = 0 +$x = 0 +step2: + $x = $x + 1 + sleep 1000 + if $x == 20 then + print ====> dnode not ready! + return -1 + endi sql show db.vgroups print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 if $data(2)[3] != 4 then @@ -89,6 +98,18 @@ endi if $data(2)[7] != 2 then return -1 endi +if $data(2)[4] == leader then + $hasleader = 1 +endi +if $data(2)[6] == leader then + $hasleader = 1 +endi +if $data(2)[8] == leader then + $hasleader = 1 +endi +if $hasleader != 1 then + goto step2 +endi sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 binary(16)) comment "abd" sql create table db.ctb using db.stb tags(101, "102")