diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e0fce2aa6a..162b6fb2ed 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -22,6 +22,7 @@ extern "C" { #include "query.h" #include "tcommon.h" +#include "tmsgcb.h" typedef void* qTaskInfo_t; typedef void* DataSinkHandle; @@ -29,11 +30,12 @@ struct SRpcMsg; struct SSubplan; typedef struct SReadHandle { - void* reader; - void* meta; - void* config; - void* vnode; - void* mnd; + void* reader; + void* meta; + void* config; + void* vnode; + void* mnd; + SMsgCb* pMsgCb; } SReadHandle; #define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1 diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index c1e53fa805..d606821bae 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -24,6 +24,7 @@ extern "C" { #include "thash.h" #include "tlog.h" #include "tmsg.h" +#include "tmsgcb.h" typedef enum { JOB_TASK_STATUS_NULL = 0, @@ -149,7 +150,7 @@ int32_t cleanupTaskQueue(); */ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); -int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *ctx); +int32_t asyncSendMsgToServerExt(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *ctx); /** * Asynchronously send message to server, after the response received, the callback will be incured. @@ -160,7 +161,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra * @param pInfo * @return */ -int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); +int32_t asyncSendMsgToServer(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); int32_t queryBuildUseDbOutput(SUseDbOutput* pOut, SUseDbRsp* usedbRsp); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index fc39e80c1e..1e4ab0bfcb 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -621,7 +621,7 @@ static void *hbThreadFunc(void *param) { SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo; int64_t transporterId = 0; SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); - asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); + asyncSendMsgToServer(NULL, pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); tFreeClientHbBatchReq(pReq, false); hbClearReqInfo(pAppHbMgr); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 427cd72c0f..6c6f5695e1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -219,7 +219,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); int64_t transporterId = 0; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg); + asyncSendMsgToServer(NULL, pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg); tsem_wait(&pRequest->body.rspSem); return TSDB_CODE_SUCCESS; @@ -504,7 +504,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t SMsgSendInfo* body = buildConnectMsg(pRequest, connType); int64_t transporterId = 0; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer(NULL, pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); if (pRequest->code != TSDB_CODE_SUCCESS) { diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 698c0cc4e7..b88335267e 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -589,7 +589,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); if (!async) { tsem_wait(&pParam->rspSem); @@ -666,7 +666,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); // avoid double free if msg is sent buf = NULL; @@ -773,7 +773,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + asyncSendMsgToServer(NULL, pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); tsem_wait(&pRequest->body.rspSem); @@ -1046,7 +1046,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { tscDebug("consumer %ld ask ep", tmq->consumerId); int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); if (!async) { tsem_wait(&pParam->rspSem); @@ -1198,7 +1198,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) { tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + asyncSendMsgToServer(NULL, tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; tmq->pollCnt++; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index ed9384a869..a0f523fbae 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -201,6 +201,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE_RSP, mmProcessReadMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 9329e7b15d..d3003b59c4 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -262,7 +262,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessFetchMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessMgmtMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index f3d7253e71..1858359078 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -17,6 +17,7 @@ #include "vmInt.h" +#include "qworker.h" #include "sync.h" #include "syncTools.h" @@ -50,6 +51,11 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { case TDMT_DND_DROP_VNODE: code = vmProcessDropVnodeReq(pMgmt, pMsg); break; + case TDMT_VND_FETCH_RSP: + // todo refactor + code = qWorkerProcessFetchRsp(NULL, NULL, &pMsg->rpcMsg); + pMsg->rpcMsg.pCont = NULL; // already freed in qworker + break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; dError("msg:%p, not processed in vnode-mgmt queue", pMsg); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 7aa55e2109..36cde396fa 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -20,7 +20,7 @@ int32_t mndProcessQueryMsg(SNodeMsg *pReq) { SMnode *pMnode = pReq->pNode; - SReadHandle handle = {.mnd = pMnode}; + SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb}; mTrace("msg:%p, in query queue is processing", pReq); switch (pReq->rpcMsg.msgType) { diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index be333d154a..185e2443ce 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "mndShow.h" #include "systable.h" +#include "qworker.h" #define SHOW_STEP_SIZE 100 @@ -25,6 +26,7 @@ static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); static bool mndCheckRetrieveFinished(SShowObj *pShow); static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq); +static int32_t mndProcessRetrieveSysTableRsp(SNodeMsg *pRsp); int32_t mndInitShow(SMnode *pMnode) { SShowMgmt *pMgmt = &pMnode->showMgmt; @@ -37,6 +39,7 @@ int32_t mndInitShow(SMnode *pMnode) { } mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq); + mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE_RSP, mndProcessRetrieveSysTableRsp); return 0; } @@ -175,6 +178,13 @@ static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) { taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove); } +static int32_t mndProcessRetrieveSysTableRsp(SNodeMsg *pRsp) { + mTrace("mnode-systable-retrieve-rsp is received"); + qWorkerProcessFetchRsp(NULL, NULL, &pRsp->rpcMsg); + pRsp->rpcMsg.pCont = NULL; // already freed in qworker + return 0; +} + static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { SMnode *pMnode = pReq->pNode; SShowMgmt *pMgmt = &pMnode->showMgmt; diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index b21141001a..54b29f546c 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -51,7 +51,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; } int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { qTrace("message in qnode query queue is processing"); - SReadHandle handle = {0}; + SReadHandle handle = {.pMsgCb = &pQnode->msgCb}; switch (pMsg->msgType) { case TDMT_VND_QUERY: { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 3724a98a5e..5c7dddd98f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -34,6 +34,7 @@ #include "tlockfree.h" #include "tlosertree.h" #include "tmallocator.h" +#include "tmsgcb.h" #include "tskiplist.h" #include "tstream.h" #include "ttime.h" @@ -120,7 +121,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); // sma -int32_t tsdbRegisterRSma(STsdb* pTsdb, SMeta* pMeta, SVCreateStbReq* pReq); +int32_t tsdbRegisterRSma(STsdb* pTsdb, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb); int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore); void tsdbUidStoreDestory(STbUidStore* pStore); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ffd76af5e2..147e106085 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -378,6 +378,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu SReadHandle handle = { .reader = pReadHandle, .meta = pTq->pVnode->pMeta, + .pMsgCb = &pTq->pVnode->msgCb, }; pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); @@ -857,6 +858,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { SReadHandle handle = { .reader = pExec->pExecReader[i], .meta = pTq->pVnode->pMeta, + .pMsgCb = &pTq->pVnode->msgCb, }; pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle); ASSERT(pExec->task[i]); @@ -897,6 +899,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { SReadHandle handle = { .reader = pStreamReader, .meta = pTq->pVnode->pMeta, + .pMsgCb = &pTq->pVnode->msgCb, }; pTask->exec.runners[i].inputHandle = pStreamReader; pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 856481bc5f..6d396e6438 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -1698,7 +1698,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) { * @param pReq * @return int32_t */ -int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq) { +int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq, SMsgCb *pMsgCb) { if (!pReq->rollup) { tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid); return TSDB_CODE_SUCCESS; @@ -1742,6 +1742,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq) { SReadHandle handle = { .reader = pReadHandle, .meta = pMeta, + .pMsgCb = pMsgCb, }; if (param->qmsg1) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 403c02b440..0603656080 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -142,9 +142,9 @@ _err: int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); #if 0 - SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode}; + SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; #endif - SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode}; + SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; switch (pMsg->msgType) { case TDMT_VND_QUERY: return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg); @@ -306,7 +306,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, goto _err; } - tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &req); + tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &req, &pVnode->msgCb); tCoderClear(&coder); return 0; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 3ed65f4a05..56311af8b8 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -320,6 +320,7 @@ typedef struct SExchangeInfo { SArray* pSourceDataInfo; tsem_t ready; void* pTransporter; + SMsgCb* pMsgCb; SSDataBlock* pResult; bool seqLoadData; // sequential load data or not, false by default int32_t current; @@ -389,10 +390,7 @@ typedef struct SStreamBlockScanInfo { } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { - union { - void* pTransporter; - SReadHandle readHandle; - }; + SReadHandle readHandle; SRetrieveMetaTableRsp* pRsp; SRetrieveTableReq req; @@ -659,7 +657,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup); -SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createExchangeOperatorInfo(SMsgCb *pMsgCb, const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, int32_t dataLoadFlag, const uint8_t* scanInfo, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c0ea54ce4a..f0c137e461 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2810,7 +2810,8 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf pMsgSendInfo->fp = loadRemoteDataCallback; int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); + int32_t code = asyncSendMsgToServer(pExchangeInfo->pMsgCb, pExchangeInfo->pTransporter, &pSource->addr.epSet, + &transporterId, pMsgSendInfo); return TSDB_CODE_SUCCESS; } @@ -3256,7 +3257,8 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { return TSDB_CODE_SUCCESS; } -SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createExchangeOperatorInfo(SMsgCb* pMsgCb, const SNodeList* pSources, SSDataBlock* pBlock, + SExecTaskInfo* pTaskInfo) { SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -3299,29 +3301,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL, NULL, NULL); - -#if 1 - { // todo refactor - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 0; - rpcInit.label = "EX"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = qProcessFetchRsp; - rpcInit.sessions = tsMaxConnections; - rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.user = (char*)"root"; - rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.ckey = "key"; - rpcInit.spi = 1; - rpcInit.secret = (char*)"dcc5bed04851fec854c035b2e40263b6"; - - pInfo->pTransporter = rpcOpen(&rpcInit); - if (pInfo->pTransporter == NULL) { - return NULL; // todo - } - } -#endif + pInfo->pMsgCb = pMsgCb; return pOperator; @@ -4774,7 +4754,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc); - return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo); + return createExchangeOperatorInfo(pHandle->pMsgCb, pExchange->pSrcEndPoints, pResBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2b94c5fdce..ffabfe334c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1002,7 +1002,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { pMsgSendInfo->fp = loadSysTableCallback; int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); + int32_t code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb, NULL, &pInfo->epSet, &transporterId, pMsgSendInfo); tsem_wait(&pInfo->ready); if (pTaskInfo->code) { @@ -1126,29 +1126,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe } else { tsem_init(&pInfo->ready, 0, 0); pInfo->epSet = epset; - -#if 1 - { // todo refactor - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 0; - rpcInit.label = "DB-META"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = qProcessFetchRsp; - rpcInit.sessions = tsMaxConnections; - rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.user = (char*)"root"; - rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.ckey = "key"; - rpcInit.spi = 1; - rpcInit.secret = (char*)"dcc5bed04851fec854c035b2e40263b6"; - - pInfo->pTransporter = rpcOpen(&rpcInit); - if (pInfo->pTransporter == NULL) { - return NULL; // todo - } - } -#endif + pInfo->readHandle = *(SReadHandle*)readHandle; } pOperator->name = "SysTableScanOperator"; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 288d2e5f76..e6e4ad23e1 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -136,7 +136,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) return 0; } -int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *rpcCtx) { +int32_t asyncSendMsgToServerExt(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *rpcCtx) { char* pMsg = rpcMallocCont(pInfo->msgInfo.len); if (NULL == pMsg) { qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); @@ -159,12 +159,20 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra assert(pInfo->fp != NULL); - rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); + if (pMsgCb != NULL) { + // todo in multi-process mode + ASSERT(pTransporterId == NULL || *pTransporterId == 0); + ASSERT(rpcCtx == NULL); + tmsgSendReq(pMsgCb, epSet, &rpcMsg); + } else { + ASSERT(pTransporter != NULL); + rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); + } return TSDB_CODE_SUCCESS; } -int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { - return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL); +int32_t asyncSendMsgToServer(SMsgCb *pMsgCb, void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { + return asyncSendMsgToServerExt(pMsgCb, pTransporter, epSet, pTransporterId, pInfo, false, NULL); } char *jobTaskStatusStr(int32_t status) { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 5906ee8970..ce3808033d 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -40,8 +40,9 @@ enum { }; typedef struct SSchTrans { - void *transInst; - void *transHandle; + void *transInst; + void *transHandle; + SMsgCb *pMsgCb; } SSchTrans; typedef struct SSchHbTrans { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 53772601ca..4093c6d5c2 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1850,7 +1850,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet trans->transInst, trans->transHandle); int64_t transporterId = 0; - code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); + code = asyncSendMsgToServerExt(trans->pMsgCb, trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); if (code) { SCH_ERR_JRET(code); } @@ -1940,7 +1940,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port); - code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx); + code = asyncSendMsgToServerExt(trans.pMsgCb, trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx); if (code) { qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst, trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));