diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index a0f523fbae..ed9384a869 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -201,7 +201,6 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE_RSP, mmProcessReadMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0196dd0cec..c5919e06b6 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -289,7 +289,6 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessMgmtMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 6cfcf67a99..2baa8b8942 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -16,7 +16,6 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -#include "qworker.h" #include "sync.h" #include "syncTools.h" @@ -52,11 +51,6 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { case TDMT_DND_DROP_VNODE: code = vmProcessDropVnodeReq(pMgmt, pMsg); break; - case TDMT_VND_FETCH_RSP: - // todo refactor - code = qWorkerProcessFetchRsp(NULL, NULL, &pMsg->rpcMsg); - pMsg->rpcMsg.pCont = NULL; // already freed in qworker - break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; dError("msg:%p, not processed in vnode-mgmt/monitor queue", pMsg); diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 185e2443ce..be333d154a 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -16,7 +16,6 @@ #define _DEFAULT_SOURCE #include "mndShow.h" #include "systable.h" -#include "qworker.h" #define SHOW_STEP_SIZE 100 @@ -26,7 +25,6 @@ static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); static bool mndCheckRetrieveFinished(SShowObj *pShow); static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq); -static int32_t mndProcessRetrieveSysTableRsp(SNodeMsg *pRsp); int32_t mndInitShow(SMnode *pMnode) { SShowMgmt *pMgmt = &pMnode->showMgmt; @@ -39,7 +37,6 @@ int32_t mndInitShow(SMnode *pMnode) { } mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq); - mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE_RSP, mndProcessRetrieveSysTableRsp); return 0; } @@ -178,13 +175,6 @@ static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) { taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove); } -static int32_t mndProcessRetrieveSysTableRsp(SNodeMsg *pRsp) { - mTrace("mnode-systable-retrieve-rsp is received"); - qWorkerProcessFetchRsp(NULL, NULL, &pRsp->rpcMsg); - pRsp->rpcMsg.pCont = NULL; // already freed in qworker - return 0; -} - static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { SMnode *pMnode = pReq->pNode; SShowMgmt *pMgmt = &pMnode->showMgmt; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1292fed4fc..f607e0367e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -916,6 +916,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { SReadHandle handle = { .reader = pStreamReader, .meta = pTq->pVnode->pMeta, + .pMsgCb = &pTq->pVnode->msgCb, }; pTask->exec.runners[i].inputHandle = pStreamReader; pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);