From 423d06dce3301fbbf8917a5cd7839b082ee0db63 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 26 Jan 2022 01:20:48 +0000 Subject: [PATCH] minor changes --- include/libs/executor/executor.h | 2 +- source/libs/executor/src/executorimpl.c | 6 +++--- source/libs/qworker/src/qworkerMsg.c | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index b8a5f28ceb..7187248962 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -209,7 +209,7 @@ void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle); */ void** qDeregisterQInfo(void* pMgmt, void* pQInfo); -void qProcessFetchRsp(struct SRpcMsg* pMsg); +void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index edce69f026..4571d8ff32 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5119,7 +5119,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { tfree(pMsgBody); } -void qProcessFetchRsp(SRpcMsg* pMsg) { +void qProcessFetchRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle; assert(pMsg->ahandle != NULL); @@ -5289,14 +5289,14 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pOperator->exec = doLoadRemoteData; pOperator->pTaskInfo = pTaskInfo; -#if 0 +#if 1 { // todo refactor SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = "TSC"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processRspMsg; + rpcInit.cfp = qProcessFetchRsp; rpcInit.sessions = tsMaxConnections; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.user = (char *)"root"; diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 92e1edc139..e68c2d8540 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -422,7 +422,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { - qProcessFetchRsp(pMsg); + qProcessFetchRsp(NULL, pMsg, NULL); return TSDB_CODE_SUCCESS; }