From 1157bca45fa9cff59759579368aa00fdcd5b0b44 Mon Sep 17 00:00:00 2001
From: Shengliang Guan
Date: Thu, 2 Jun 2022 13:57:12 +0800
Subject: [PATCH] enh: adjust vnode replica

Let vnodePreprocessReq() and vnodeSyncAlter() report a result code so the
vnode write-queue worker can act on it.

- vnodeSyncAlter() now returns int32_t: TSDB_CODE_INVALID_MSG when the
  request cannot be deserialized, TSDB_CODE_ACTION_IN_PROGRESS after it has
  sent the response itself following a successful syncReconfig(), and the
  syncReconfig() result otherwise.
- vnodePreprocessReq() returns that code for TDMT_VND_ALTER_REPLICA and 0
  for the other message types.
- vmProcessWriteQueue() skips messages whose preprocessing returned
  TSDB_CODE_ACTION_IN_PROGRESS, answers any other non-zero code through
  vmSendRsp(), and only then calls syncPropose(); log lines now carry vgId.
- alter_replica_13.sim: the wait-loop counter is renamed to $x.

---
 source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 55 +++++++++++----------
 source/dnode/vnode/src/inc/vnd.h            |  2 +-
 source/dnode/vnode/src/vnd/vnodeSvr.c       |  5 +-
 source/dnode/vnode/src/vnd/vnodeSync.c      | 14 +++---
 tests/script/tsim/db/alter_replica_13.sim   | 14 +++---
 5 files changed, 47 insertions(+), 43 deletions(-)

diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
index a945358d34..03db74abd7 100644
--- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
@@ -91,51 +91,52 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
 }
 
 static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
+  int32_t code = 0;
+  SRpcMsg *pMsg = NULL;
   SVnodeObj *pVnode = pInfo->ahandle;
-  SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
-  if (pArray == NULL) {
-    dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
-    return;
-  }
+  int64_t sync = vnodeGetSyncHandle(pVnode->pImpl);
+  SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg **));
 
-  for (int32_t i = 0; i < numOfMsgs; ++i) {
-    SRpcMsg *pMsg = NULL;
+  for (int32_t m = 0; m < numOfMsgs; m++) {
     if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
+    dTrace("vgId:%d, get msg:%p from vnode-write queue, type:%s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
 
-    dTrace("msg:%p, get from vnode-write queue", pMsg);
     if (taosArrayPush(pArray, &pMsg) == NULL) {
-      dTrace("msg:%p, failed to push to array since %s", pMsg, terrstr());
+      dError("vgId:%d, failed to push msg:%p to vnode-write array", pVnode->vgId, pMsg);
       vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY);
     }
   }
 
-  for (int i = 0; i < taosArrayGetSize(pArray); i++) {
-    SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
-    SRpcMsg rsp = {.info = pMsg->info};
+  for (int32_t m = 0; m < taosArrayGetSize(pArray); m++) {
+    pMsg = *(SRpcMsg **)taosArrayGet(pArray, m);
+    code = vnodePreprocessReq(pVnode->pImpl, pMsg);
 
-    vnodePreprocessReq(pVnode->pImpl, pMsg);
+    if (code == TSDB_CODE_ACTION_IN_PROGRESS) continue;
+    if (code != 0) {
+      dError("vgId:%d, msg:%p failed to process since %s", pVnode->vgId, pMsg, tstrerror(code));
+      vmSendRsp(pMsg, code);
+      continue;
+    }
 
-    int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false);
-    if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
-      dTrace("msg:%p, is redirect since not leader, vgId:%d ", pMsg, pVnode->vgId);
-      rsp.code = TSDB_CODE_RPC_REDIRECT;
-      SEpSet newEpSet;
-      syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet);
+    code = syncPropose(sync, pMsg, false);
+    if (code == TAOS_SYNC_PROPOSE_SUCCESS) {
+      continue;
+    } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
+      dTrace("vgId:%d, msg:%p is redirect since not leader", pVnode->vgId, pMsg);
+      SEpSet newEpSet = {0};
+      syncGetEpSet(sync, &newEpSet);
       newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
+      SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
       tmsgSendRedirectRsp(&rsp, &newEpSet);
-    } else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
-      rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR;
-      tmsgSendRsp(&rsp);
-    } else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) {
-      // send response in applyQ
     } else {
-      assert(0);
+      dError("vgId:%d, msg:%p failed to process since %s", pVnode->vgId, pMsg, tstrerror(code));
+      vmSendRsp(pMsg, code);
     }
   }
 
   for (int32_t i = 0; i < numOfMsgs; i++) {
-    SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
-    dTrace("msg:%p, is freed", pMsg);
+    pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
+    dTrace("vgId:%d, msg:%p, is freed", pVnode->vgId, pMsg);
     rpcFreeCont(pMsg->pCont);
     taosFreeQitem(pMsg);
   }
diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h
index a5907cf991..5f4f7e70da 100644
--- a/source/dnode/vnode/src/inc/vnd.h
+++ b/source/dnode/vnode/src/inc/vnd.h
@@ -84,7 +84,7 @@ int32_t vnodeAsyncCommit(SVnode* pVnode);
 int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
 void vnodeSyncStart(SVnode* pVnode);
 void vnodeSyncClose(SVnode* pVnode);
-void vnodeSyncAlter(SVnode* pVnode, SRpcMsg* pMsg);
+int32_t vnodeSyncAlter(SVnode* pVnode, SRpcMsg* pMsg);
 
 #ifdef __cplusplus
 }
diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c
index b66695f7c2..f26461b55f 100644
--- a/source/dnode/vnode/src/vnd/vnodeSvr.c
+++ b/source/dnode/vnode/src/vnd/vnodeSvr.c
@@ -25,6 +25,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
 static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
 
 int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
+  int32_t code = 0;
   SDecoder dc = {0};
 
   switch (pMsg->msgType) {
@@ -89,13 +90,13 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
 
     } break;
     case TDMT_VND_ALTER_REPLICA: {
-      vnodeSyncAlter(pVnode, pMsg);
+      code = vnodeSyncAlter(pVnode, pMsg);
     } break;
     default:
       break;
   }
 
-  return 0;
+  return code;
 }
 
 int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c
index 8792fbbb0c..37f765d786 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -50,13 +50,11 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
   return 0;
 }
 
-void vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) {
+int32_t vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) {
   SAlterVnodeReq req = {0};
   if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
     terrno = TSDB_CODE_INVALID_MSG;
-    vError("vgId:%d, failed to alter replica since %s", TD_VID(pVnode), terrstr());
-    SRpcMsg rsp = {.info = pMsg->info, .code = terrno};
-    tmsgSendRsp(&rsp);
+    return TSDB_CODE_INVALID_MSG;
   }
 
   vInfo("vgId:%d, start to alter vnode replica to %d", TD_VID(pVnode), req.replica);
@@ -68,11 +66,15 @@ void vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) {
     vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
   }
 
-  if (syncReconfig(pVnode->sync, &cfg) != 0) {
-    vError("vgId:%d, failed to propose sync reconfig since %s", TD_VID(pVnode), terrstr());
+  int32_t code = syncReconfig(pVnode->sync, &cfg);
+  if (code == TAOS_SYNC_PROPOSE_SUCCESS) {
+    // todo refactor
     SRpcMsg rsp = {.info = pMsg->info, .code = terrno};
     tmsgSendRsp(&rsp);
+    return TSDB_CODE_ACTION_IN_PROGRESS;
   }
+
+  return code;
 }
 
 void vnodeSyncStart(SVnode *pVnode) {
diff --git a/tests/script/tsim/db/alter_replica_13.sim b/tests/script/tsim/db/alter_replica_13.sim
index 8ab6eb64fd..7cf5c53ad4 100644
--- a/tests/script/tsim/db/alter_replica_13.sim
+++ b/tests/script/tsim/db/alter_replica_13.sim
@@ -12,11 +12,11 @@ sql connect
 print =============== step1: create dnodes
 sql create dnode $hostname port 7200
 
-$loop_cnt = 0
+$x = 0
 step1:
-  $loop_cnt = $loop_cnt + 1
+  $x = $x + 1
   sleep 1000
-  if $loop_cnt == 10 then
+  if $x == 10 then
     print ====> dnode not ready!
     return -1
   endi
@@ -73,11 +73,11 @@ print =============== step3: create dnodes
 sql create dnode $hostname port 7300
 sql create dnode $hostname port 7400
 
-$loop_cnt = 0
+$x = 0
 step3:
-  $loop_cnt = $loop_cnt + 1
+  $x = $x + 1
   sleep 1000
-  if $loop_cnt == 10 then
+  if $x == 10 then
     print ====> dnode not ready!
     return -1
   endi
@@ -118,7 +118,7 @@ if $rows != 1 then
   return -1
 endi
 
-
+return
 system sh/exec.sh -n dnode1 -s stop -x SIGINT
 system sh/exec.sh -n dnode2 -s stop -x SIGINT
 system sh/exec.sh -n dnode3 -s stop -x SIGINT