refactor: remove rpc client in executor and scanoperator

This commit is contained in:
Shengliang Guan 2022-05-09 17:03:44 +08:00
parent 3f472fceda
commit 5f33d88cb2
5 changed files with 1 additions and 18 deletions

View File

@ -201,7 +201,6 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE_RSP, mmProcessReadMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, DEFAULT_HANDLE);

View File

@ -289,7 +289,6 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessMgmtMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, DEFAULT_HANDLE);

View File

@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "vmInt.h" #include "vmInt.h"
#include "qworker.h"
#include "sync.h" #include "sync.h"
#include "syncTools.h" #include "syncTools.h"
@ -52,11 +51,6 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
case TDMT_DND_DROP_VNODE: case TDMT_DND_DROP_VNODE:
code = vmProcessDropVnodeReq(pMgmt, pMsg); code = vmProcessDropVnodeReq(pMgmt, pMsg);
break; break;
case TDMT_VND_FETCH_RSP:
// todo refactor
code = qWorkerProcessFetchRsp(NULL, NULL, &pMsg->rpcMsg);
pMsg->rpcMsg.pCont = NULL; // already freed in qworker
break;
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dError("msg:%p, not processed in vnode-mgmt/monitor queue", pMsg); dError("msg:%p, not processed in vnode-mgmt/monitor queue", pMsg);

View File

@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndShow.h" #include "mndShow.h"
#include "systable.h" #include "systable.h"
#include "qworker.h"
#define SHOW_STEP_SIZE 100 #define SHOW_STEP_SIZE 100
@ -26,7 +25,6 @@ static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId);
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
static bool mndCheckRetrieveFinished(SShowObj *pShow); static bool mndCheckRetrieveFinished(SShowObj *pShow);
static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq); static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq);
static int32_t mndProcessRetrieveSysTableRsp(SNodeMsg *pRsp);
int32_t mndInitShow(SMnode *pMnode) { int32_t mndInitShow(SMnode *pMnode) {
SShowMgmt *pMgmt = &pMnode->showMgmt; SShowMgmt *pMgmt = &pMnode->showMgmt;
@ -39,7 +37,6 @@ int32_t mndInitShow(SMnode *pMnode) {
} }
mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq); mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq);
mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE_RSP, mndProcessRetrieveSysTableRsp);
return 0; return 0;
} }
@ -178,13 +175,6 @@ static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove); taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove);
} }
static int32_t mndProcessRetrieveSysTableRsp(SNodeMsg *pRsp) {
mTrace("mnode-systable-retrieve-rsp is received");
qWorkerProcessFetchRsp(NULL, NULL, &pRsp->rpcMsg);
pRsp->rpcMsg.pCont = NULL; // already freed in qworker
return 0;
}
static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
SShowMgmt *pMgmt = &pMnode->showMgmt; SShowMgmt *pMgmt = &pMnode->showMgmt;

View File

@ -916,6 +916,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
SReadHandle handle = { SReadHandle handle = {
.reader = pStreamReader, .reader = pStreamReader,
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
}; };
pTask->exec.runners[i].inputHandle = pStreamReader; pTask->exec.runners[i].inputHandle = pStreamReader;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);