From 00704f9da727912756acea29fa0d232a108e27cb Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 9 May 2022 18:07:26 +0800 Subject: [PATCH] refactor: remove rpc client in executor and scanoperator --- include/libs/qcom/query.h | 5 +++-- source/client/src/clientHb.c | 2 +- source/client/src/clientImpl.c | 4 ++-- source/client/src/tmq.c | 10 +++++----- source/libs/executor/src/executorimpl.c | 3 +-- source/libs/qcom/src/queryUtil.c | 17 +++++------------ source/libs/scheduler/inc/schedulerInt.h | 5 ++--- source/libs/scheduler/src/scheduler.c | 4 ++-- 8 files changed, 21 insertions(+), 29 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index d606821bae..cac60c33bb 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -150,7 +150,8 @@ int32_t cleanupTaskQueue(); */ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); -int32_t asyncSendMsgToServerExt(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *ctx); +int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, + bool persistHandle, void* ctx); /** * Asynchronously send message to server, after the response received, the callback will be incured. @@ -161,7 +162,7 @@ int32_t asyncSendMsgToServerExt(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSe * @param pInfo * @return */ -int32_t asyncSendMsgToServer(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); +int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); int32_t queryBuildUseDbOutput(SUseDbOutput* pOut, SUseDbRsp* usedbRsp); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 1e4ab0bfcb..fc39e80c1e 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -621,7 +621,7 @@ static void *hbThreadFunc(void *param) { SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo; int64_t transporterId = 0; SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); - asyncSendMsgToServer(NULL, pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); + asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); tFreeClientHbBatchReq(pReq, false); hbClearReqInfo(pAppHbMgr); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b7a7fd7121..97c7d2bad1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -218,7 +218,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); int64_t transporterId = 0; - asyncSendMsgToServer(NULL, pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg); tsem_wait(&pRequest->body.rspSem); return TSDB_CODE_SUCCESS; @@ -507,7 +507,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t SMsgSendInfo* body = buildConnectMsg(pRequest, connType); int64_t transporterId = 0; - asyncSendMsgToServer(NULL, pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); if (pRequest->code != TSDB_CODE_SUCCESS) { diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 0c630dc5f9..9280756a8a 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -589,7 +589,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); if (!async) { tsem_wait(&pParam->rspSem); @@ -666,7 +666,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); // avoid double free if msg is sent buf = NULL; @@ -773,7 +773,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(NULL, pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); tsem_wait(&pRequest->body.rspSem); @@ -1046,7 +1046,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { tscDebug("consumer %ld ask ep", tmq->consumerId); int64_t transporterId = 0; - asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); if (!async) { tsem_wait(&pParam->rspSem); @@ -1198,7 +1198,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) { tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ - asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; tmq->pollCnt++; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 59c6a7f959..d4fb14f6fc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2813,8 +2813,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf pMsgSendInfo->fp = loadRemoteDataCallback; int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pExchangeInfo->pMsgCb, pExchangeInfo->pTransporter, &pSource->addr.epSet, - &transporterId, pMsgSendInfo); + int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index e6e4ad23e1..3e3e393f5f 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -136,7 +136,8 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) return 0; } -int32_t asyncSendMsgToServerExt(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *rpcCtx) { +int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, + bool persistHandle, void* rpcCtx) { char* pMsg = rpcMallocCont(pInfo->msgInfo.len); if (NULL == pMsg) { qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); @@ -159,20 +160,12 @@ int32_t asyncSendMsgToServerExt(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSe assert(pInfo->fp != NULL); - if (pMsgCb != NULL) { - // todo in multi-process mode - ASSERT(pTransporterId == NULL || *pTransporterId == 0); - ASSERT(rpcCtx == NULL); - tmsgSendReq(pMsgCb, epSet, &rpcMsg); - } else { - ASSERT(pTransporter != NULL); - rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); - } + rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); return TSDB_CODE_SUCCESS; } -int32_t asyncSendMsgToServer(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { - return asyncSendMsgToServerExt(pMsgCb, pTransporter, epSet, pTransporterId, pInfo, false, NULL); +int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { + return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL); } char *jobTaskStatusStr(int32_t status) { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index ce3808033d..5906ee8970 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -40,9 +40,8 @@ enum { }; typedef struct SSchTrans { - void *transInst; - void *transHandle; - SMsgCb *pMsgCb; + void *transInst; + void *transHandle; } SSchTrans; typedef struct SSchHbTrans { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 088b596873..10e4255022 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1850,7 +1850,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet trans->transInst, trans->transHandle); int64_t transporterId = 0; - code = asyncSendMsgToServerExt(trans->pMsgCb, trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); + code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); if (code) { SCH_ERR_JRET(code); } @@ -1940,7 +1940,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port); - code = asyncSendMsgToServerExt(trans.pMsgCb, trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx); + code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx); if (code) { qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst, trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));