diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c index 54c57fbb0b..14c6250b2a 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c @@ -63,7 +63,7 @@ static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { bmSendRsp(pMsg, code); } - dTrace("msg:%p, is freed, result:0x%x:%s", pMsg, code, tstrerror(code)); + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pRpc->pCont); taosFreeQitem(pMsg); } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 57737293c5..be8ddf1430 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -160,7 +160,7 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { rpcSendResponse(&rsp); } - dTrace("msg:%p, is freed, result:0x%x:%s", pMsg, code, tstrerror(code)); + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 2d690f3306..98e234af3c 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -51,7 +51,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { mmSendRsp(pMsg, code); } - dTrace("msg:%p, is freed, result:0x%x:%s", pMsg, code, tstrerror(code)); + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -73,24 +73,31 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { } } - dTrace("msg:%p, is freed, result:0x%x:%s", pMsg, code, tstrerror(code)); + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { - dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); + dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pWorker->queue, pMsg); return 0; } -int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg); } +int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg); +} -int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); } +int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); +} -int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); } +int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); +} -int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); +int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); } int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { @@ -101,25 +108,25 @@ static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); if (pMsg == NULL) return -1; - dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, create and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); memcpy(pMsg, pRpc, sizeof(SRpcMsg)); taosWriteQitem(pWorker->queue, pMsg); return 0; } -int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { - return mmPutRpcMsgToWorker(&pMgmt->queryWorker, pRpc); +int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutRpcMsgToWorker(&pMgmt->queryWorker, pMsg); } -int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { - return mmPutRpcMsgToWorker(&pMgmt->writeWorker, pRpc); +int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutRpcMsgToWorker(&pMgmt->writeWorker, pMsg); } -int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { - return mmPutRpcMsgToWorker(&pMgmt->readWorker, pRpc); +int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg); } -int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pRpc); } +int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { SSingleWorkerCfg qCfg = { diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 6b08cc4b62..444c42717a 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -44,7 +44,7 @@ static void qmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { qmSendRsp(pMsg, code); } - dTrace("msg:%p, is freed, result:0x%x:%s", pMsg, code, tstrerror(code)); + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pRpc->pCont); taosFreeQitem(pMsg); } @@ -60,7 +60,7 @@ static void qmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { qmSendRsp(pMsg, code); } - dTrace("msg:%p, is freed, result:0x%x:%s", pMsg, code, tstrerror(code)); + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -76,7 +76,7 @@ static void qmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { qmSendRsp(pMsg, code); } - dTrace("msg:%p, is freed, result:0x%x:%s", pMsg, code, tstrerror(code)); + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -105,7 +105,7 @@ static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SR return -1; } - dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, create and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); memcpy(pMsg, pRpc, sizeof(SRpcMsg)); taosWriteQitem(pWorker->queue, pMsg); return 0; diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index a2e88eb18e..c94cf527c7 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -44,7 +44,7 @@ static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { smSendRsp(pMsg, code); } - dTrace("msg:%p, is freed, result:0x%x:%s", pMsg, code, tstrerror(code)); + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pRpc->pCont); taosFreeQitem(pMsg); } @@ -166,7 +166,7 @@ int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); return 0; } @@ -174,7 +174,7 @@ int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->monitorWorker; - dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); return 0; } @@ -187,7 +187,7 @@ int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); return 0; } @@ -195,7 +195,7 @@ int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->sharedWorker; - dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); return 0; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 4839266c0e..9f77180cd8 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -51,7 +51,7 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; - dError("msg:%p, not processed in vnode-mgmt/monitor queue", pMsg); + dError("msg:%p, not processed in vnode queue", pMsg); } if (msgType & 1u) { @@ -59,7 +59,7 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { vmSendRsp(pMsg, code); } - dTrace("msg:%p, is freed, result:0x%x:%s", pMsg, code, tstrerror(code)); + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -67,13 +67,13 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeObj *pVnode = pInfo->ahandle; - dTrace("msg:%p, will be processed in vnode-query queue", pMsg); + dTrace("msg:%p, get from vnode-query queue", pMsg); int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg); if (code != 0) { if (terrno != 0) code = terrno; vmSendRsp(pMsg, code); - dTrace("msg:%p, is freed, result:0x%x:%s", pMsg, code, tstrerror(code)); + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -82,13 +82,13 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeObj *pVnode = pInfo->ahandle; - dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg); + dTrace("msg:%p, get from vnode-fetch queue", pMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; vmSendRsp(pMsg, code); - dTrace("msg:%p, is freed, result:0x%x:%s", pMsg, code, tstrerror(code)); + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -96,7 +96,6 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg rsp; SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); if (pArray == NULL) { @@ -108,7 +107,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO SRpcMsg *pMsg = NULL; if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; - dTrace("msg:%p, will be processed in vnode-write queue", pMsg); + dTrace("msg:%p, get from vnode-write queue", pMsg); if (taosArrayPush(pArray, &pMsg) == NULL) { dTrace("msg:%p, failed to process since %s", pMsg, terrstr()); vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY); @@ -116,21 +115,12 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } for (int i = 0; i < taosArrayGetSize(pArray); i++) { - SRpcMsg *pMsg; - SRpcMsg *pRpc; + SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); + SRpcMsg rsp = {.info = pMsg->info, .pCont = NULL, .contLen = 0}; - pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); - pRpc = pMsg; - - rsp.info = pRpc->info; - rsp.pCont = NULL; - rsp.contLen = 0; - - int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pRpc, false); + int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false); if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { - // rsp.code = TSDB_CODE_SYN_NOT_LEADER; - // tmsgSendRsp(&rsp); - dTrace("syncPropose not leader redirect, vgId:%d ", syncGetVgId(vnodeGetSyncHandle(pVnode->pImpl))); + dTrace("msg:%p, is redirect since not leader, vgId:%d ", pMsg, pVnode->vgId); rsp.code = TSDB_CODE_RPC_REDIRECT; SEpSet newEpSet; syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet); @@ -160,7 +150,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg *pMsg = NULL; + SRpcMsg *pMsg = NULL; SRpcMsg rsp; for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -181,7 +171,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO // apply data into tsdb if (vnodeProcessWriteReq(pVnode->pImpl, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) { rsp.code = terrno; - dTrace("vnodeProcessWriteReq error, code:%d", terrno); + dTrace("msg:%p, process write error since %s", pMsg, terrstr()); } syncApplyMsgDestroy(pSyncApplyMsg); @@ -215,7 +205,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf SRpcMsg rsp = {0}; rsp.code = terrno; rsp.info = pMsg->info; - dTrace("vmProcessSyncQueue error, code:%d", terrno); + dTrace("msg:%p, process sync queue error since code:%s", pMsg, terrstr()); tmsgSendRsp(&rsp); } } @@ -227,18 +217,18 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg *pMsg = NULL; + SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); - dTrace("msg:%p, will be processed in vnode-merge queue", pMsg); + dTrace("msg:%p, get from vnode-merge queue", pMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; vmSendRsp(pMsg, code); - dTrace("msg:%p, is freed, result:0x%x:%s", pMsg, code, tstrerror(code)); + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -255,29 +245,29 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { - dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr()); + dError("vgId:%d, failed to put msg:%p into vnode-queue since %s", pHead->vgId, pMsg, terrstr()); return terrno != 0 ? terrno : -1; } switch (qtype) { case QUERY_QUEUE: - dTrace("msg:%p, type:%s will be written into vnode-query queue", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); taosWriteQitem(pVnode->pQueryQ, pMsg); break; case FETCH_QUEUE: - dTrace("msg:%p, type:%s will be written into vnode-fetch queue", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-fetch worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); taosWriteQitem(pVnode->pFetchQ, pMsg); break; case WRITE_QUEUE: - dTrace("msg:%p, type:%s will be written into vnode-write queue", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); taosWriteQitem(pVnode->pWriteQ, pMsg); break; case SYNC_QUEUE: - dTrace("msg:%p, type:%s will be written into vnode-sync queue", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-sync worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); taosWriteQitem(pVnode->pSyncQ, pMsg); break; case MERGE_QUEUE: - dTrace("msg:%p, type:%s will be written into vnode-merge queue", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, put into vnode-merge worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); taosWriteQitem(pVnode->pMergeQ, pMsg); break; default: @@ -312,7 +302,7 @@ int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->mgmtWorker; - dTrace("msg:%p, will be put into vnode-mgmt queue, worker:%s", pMsg, pWorker->name); + dTrace("msg:%p, put into vnode-mgmt worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pWorker->queue, pMsg); return 0; } @@ -320,7 +310,7 @@ int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->monitorWorker; - dTrace("msg:%p, will be put into vnode-monitor queue, worker:%s", pMsg, pWorker->name); + dTrace("msg:%p, put into vnode-monitor worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pWorker->queue, pMsg); return 0; } @@ -332,35 +322,33 @@ static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType q if (pVnode == NULL) return -1; SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); - int32_t code = 0; + int32_t code = 0; if (pMsg != NULL) { - dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); memcpy(pMsg, pRpc, sizeof(SRpcMsg)); - // if (pMsg->handle != NULL) assert(pMsg->refId != 0); switch (qtype) { case WRITE_QUEUE: - dTrace("msg:%p, will be put into vnode-write queue", pMsg); + dTrace("msg:%p, create and put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); taosWriteQitem(pVnode->pWriteQ, pMsg); break; case QUERY_QUEUE: - dTrace("msg:%p, will be put into vnode-query queue", pMsg); + dTrace("msg:%p, create and put into vnode-query queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); taosWriteQitem(pVnode->pQueryQ, pMsg); break; case FETCH_QUEUE: - dTrace("msg:%p, will be put into vnode-fetch queue", pMsg); + dTrace("msg:%p, create and put into vnode-fetch queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); taosWriteQitem(pVnode->pFetchQ, pMsg); break; case APPLY_QUEUE: - dTrace("msg:%p, will be put into vnode-apply queue", pMsg); + dTrace("msg:%p, create and put into vnode-apply queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); taosWriteQitem(pVnode->pApplyQ, pMsg); break; case MERGE_QUEUE: - dTrace("msg:%p, will be put into vnode-merge queue", pMsg); + dTrace("msg:%p, create and put into vnode-merge queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); taosWriteQitem(pVnode->pMergeQ, pMsg); break; case SYNC_QUEUE: - dTrace("msg:%p, will be put into vnode-sync queue", pMsg); + dTrace("msg:%p, create and put into vnode-sync queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); taosWriteQitem(pVnode->pSyncQ, pMsg); break; default: