From eddd5b390cb4eb313094a5992e26c457a4d39e15 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 20 May 2022 12:48:15 +0800 Subject: [PATCH] refactor: adjust vm worker --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 44 ++++++++------------- 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index c3f5a2cd93..50e2256145 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -22,21 +22,19 @@ static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rsp = { .code = code, - .info = pMsg->info, .pCont = pMsg->info.rsp, .contLen = pMsg->info.rspLen, + .info = pMsg->info, }; tmsgSendRsp(&rsp); } static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeMgmt *pMgmt = pInfo->ahandle; + int32_t code = -1; + dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - int32_t code = -1; - tmsg_t msgType = pMsg->msgType; - dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(msgType)); - - switch (msgType) { + switch (pMsg->msgType) { case TDMT_MON_VM_INFO: code = vmProcessGetMonitorInfoReq(pMgmt, pMsg); break; @@ -54,7 +52,7 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { dError("msg:%p, not processed in vnode queue", pMsg); } - if (msgType & 1u) { + if (IsReq(pMsg)) { if (code != 0 && terrno != 0) code = terrno; vmSendRsp(pMsg, code); } @@ -96,7 +94,6 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); if (pArray == NULL) { dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr()); @@ -116,7 +113,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO for (int i = 0; i < taosArrayGetSize(pArray); i++) { SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); - SRpcMsg rsp = {.info = pMsg->info, .pCont = NULL, .contLen = 0}; + SRpcMsg rsp = {.info = pMsg->info}; int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false); if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { @@ -130,7 +127,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR; tmsgSendRsp(&rsp); } else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) { - // ok // send response in applyQ } else { assert(0); @@ -149,16 +145,13 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg *pMsg = NULL; - SRpcMsg rsp; for (int32_t i = 0; i < numOfMsgs; ++i) { + SRpcMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); // init response rpc msg - rsp.code = 0; - rsp.pCont = NULL; - rsp.contLen = 0; + SRpcMsg rsp = {0}; // get original rpc msg assert(pMsg->msgType == TDMT_VND_SYNC_APPLY_MSG); @@ -177,7 +170,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rpcFreeCont(originalRpcMsg.pCont); // if leader, send response - // if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) { if (pMsg->info.handle != NULL) { rsp.info = pMsg->info; tmsgSendRsp(&rsp); @@ -190,21 +182,19 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { + SRpcMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); - // todo - SRpcMsg *pRsp = NULL; - int32_t ret = vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp); - if (ret != 0) { - // if leader, send response + int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL); + if (code != 0) { if (pMsg->info.handle != NULL) { - SRpcMsg rsp = {0}; - rsp.code = terrno; - rsp.info = pMsg->info; - dTrace("msg:%p, process sync queue error since code:%s", pMsg, terrstr()); + SRpcMsg rsp = { + .code = (terrno < 0) ? terrno : code, + .info = pMsg->info, + }; + dTrace("msg:%p, failed to process sync queue since %s", pMsg, terrstr()); tmsgSendRsp(&rsp); } } @@ -216,9 +206,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { + SRpcMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); dTrace("msg:%p, get from vnode-merge queue", pMsg);