diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 2ab703df95..b4c75da131 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -48,7 +48,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo // @pSource one execution location of this group of datasource subplans int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource); -int32_t qClearSubplanExecutionNode(SSubplan* pSubplan); +void qClearSubplanExecutionNode(SSubplan* pSubplan); // Convert to subplan to string for the scheduler to send to the executor int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 5767926e5e..92131e354a 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -135,6 +135,7 @@ typedef struct STableMetaOutput { } STableMetaOutput; typedef struct SDataBuf { + int32_t msgType; void* pData; uint32_t len; void* handle; @@ -147,7 +148,7 @@ typedef struct STargetInfo { int32_t vgId; } STargetInfo; -typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); +typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code); typedef int32_t (*__async_exec_fn_t)(void* param); typedef struct SRequestConnInfo { diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 2de630e181..48fa2d7938 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -262,7 +262,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return TSDB_CODE_SUCCESS; } -static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) { +static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { static int32_t emptyRspNum = 0; if (code != 0) { taosMemoryFreeClear(param); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 68a225f64c..b20fd4b9b0 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1307,7 +1307,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet); - SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet}; + SDataBuf buf = {.msgType = pMsg->msgType, .len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet}; if (pMsg->contLen > 0) { buf.pData = taosMemoryCalloc(1, pMsg->contLen); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 5c30df4ae2..ce866d2c20 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -26,7 +26,7 @@ static void setErrno(SRequestObj* pRequest, int32_t code) { terrno = code; } -int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; setErrno(pRequest, code); @@ -39,7 +39,7 @@ int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) { return code; } -int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pMsg->pData); @@ -116,7 +116,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) { return pMsgSendInfo; } -int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) { // todo rsp with the vnode id list SRequestObj* pRequest = param; taosMemoryFree(pMsg->pData); @@ -132,7 +132,7 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { return code; } -int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; if (TSDB_CODE_MND_DB_NOT_EXIST == code) { @@ -211,7 +211,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } -int32_t processCreateSTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { assert(pMsg != NULL && param != NULL); SRequestObj* pRequest = param; @@ -229,7 +229,7 @@ int32_t processCreateSTableRsp(void* param, const SDataBuf* pMsg, int32_t code) return code; } -int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); @@ -250,7 +250,7 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { return code; } -int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); @@ -357,7 +357,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { return TSDB_CODE_SUCCESS; } -int32_t processShowVariablesRsp(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 637a7ee5dd..165deaa6c5 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -372,7 +372,7 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } -int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) { +int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) { SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; // push into array @@ -862,7 +862,7 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { } } -int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; /*tmq_t* tmq = pParam->tmq;*/ @@ -1116,7 +1116,7 @@ int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { } #endif -int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqClientVg* pVg = pParam->pVg; SMqClientTopic* pTopic = pParam->pTopic; @@ -1368,7 +1368,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { } #endif -int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = pParam->tmq; int8_t async = pParam->async; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 5911daaa2b..1f22eefddf 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -111,7 +111,7 @@ SArray *qmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 75e69e3716..4e581fd28e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -86,7 +86,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { return; case TDMT_MND_SYSTABLE_RETRIEVE_RSP: case TDMT_DND_SYSTABLE_RETRIEVE_RSP: - case TDMT_VND_FETCH_RSP: + case TDMT_SCH_FETCH_RSP: qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0); return; case TDMT_MND_STATUS_RSP: diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 5909d6f599..cfc63b083d 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -88,7 +88,7 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { case TDMT_SCH_FETCH: code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts); break; - case TDMT_VND_FETCH_RSP: + case TDMT_SCH_FETCH_RSP: code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg, ts); break; case TDMT_SCH_CANCEL_TASK: diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 64fa6c705d..d0a1b5417f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -246,7 +246,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { switch (pMsg->msgType) { case TDMT_SCH_FETCH: return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0); - case TDMT_VND_FETCH_RSP: + case TDMT_SCH_FETCH_RSP: return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_SCH_CANCEL_TASK: return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 304da88888..59ad009527 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -241,7 +241,7 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, } -int32_t ctgHandleMsgCallback(void *param, const SDataBuf *pMsg, int32_t rspCode) { +int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param; int32_t code = 0; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9b38f46ca6..c6f1096bfe 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1933,7 +1933,7 @@ typedef struct SFetchRspHandleWrapper { int32_t sourceIndex; } SFetchRspHandleWrapper; -int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param; SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c51ef44154..146b6ca76b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1322,7 +1322,7 @@ static void getDBNameFromCondition(SNode* pCondition, const char* dbName) { nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName); } -static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) { SOperatorInfo* operator=(SOperatorInfo*) param; SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info; if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 2368b13dd6..4edd7a8a6e 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -82,6 +82,7 @@ extern SQWDebug gQWDebug; typedef struct SQWMsg { void *node; int32_t code; + int32_t msgType; char *msg; int32_t msgLen; SRpcHandleInfo connInfo; @@ -100,6 +101,7 @@ typedef struct SQWHbInfo { typedef struct SQWPhaseInput { int32_t code; + int32_t msgType; } SQWPhaseInput; typedef struct SQWPhaseOutput { @@ -119,6 +121,7 @@ typedef struct SQWTaskCtx { int8_t phase; int8_t taskType; int8_t explain; + int32_t queryType; bool queryFetched; bool queryEnd; diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 8c7c030dce..9e9d1f44cb 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -39,7 +39,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i int32_t code); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); -int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo); +int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo); int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 8c1371fce9..70a6a70c44 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -43,7 +43,7 @@ void qwFreeFetchRsp(void *msg) { } } -int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) { +int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) { SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->code = code; if (tbInfo) { @@ -53,7 +53,7 @@ int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* } SRpcMsg rpcRsp = { - .msgType = TDMT_VND_QUERY_RSP, + .msgType = rspType, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = code, @@ -73,7 +73,7 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn tSerializeSExplainRsp(pRsp, contLen, &rsp); SRpcMsg rpcRsp = { - .msgType = TDMT_VND_EXPLAIN_RSP, + .msgType = TDMT_SCH_EXPLAIN_RSP, .pCont = pRsp, .contLen = contLen, .code = 0, @@ -92,7 +92,7 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); SRpcMsg rpcRsp = { - .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP, + .msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP, .contLen = contLen, .pCont = pRsp, .code = code, @@ -112,7 +112,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i } SRpcMsg rpcRsp = { - .msgType = TDMT_VND_FETCH_RSP, + .msgType = TDMT_SCH_FETCH_RSP, .pCont = pRsp, .contLen = sizeof(*pRsp) + dataLength, .code = code, @@ -129,7 +129,7 @@ int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) { pRsp->code = code; SRpcMsg rpcRsp = { - .msgType = TDMT_VND_CANCEL_TASK_RSP, + .msgType = TDMT_SCH_CANCEL_TASK_RSP, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = code, @@ -145,7 +145,7 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) { pRsp->code = code; SRpcMsg rpcRsp = { - .msgType = TDMT_VND_DROP_TASK_RSP, + .msgType = TDMT_SCH_DROP_TASK_RSP, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = code, @@ -325,9 +325,9 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int uint64_t tId = msg->taskId; int64_t rId = msg->refId; - SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; + SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info, .msgType = pMsg->msgType}; char * sql = strndup(msg->msg, msg->sqlLen); - QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql); + QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, sql:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql); QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain, sql)); QW_SCH_TASK_DLOG("processQuery end, node:%p", node); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 4250785f7c..8ab293d0ad 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -417,17 +417,10 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp } if (QW_PHASE_POST_QUERY == phase) { -#if 0 - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) { - readyConnection = &ctx->connInfo; - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); - } -#else connInfo = ctx->ctrlConnInfo; rspConnection = &connInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); -#endif } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { @@ -458,8 +451,8 @@ _return: } if (rspConnection) { - qwBuildAndSendQueryRsp(rspConnection, code, ctx ? &ctx->tbInfo : NULL); - QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code)); + qwBuildAndSendQueryRsp(input->msgType + 1, rspConnection, code, ctx ? &ctx->tbInfo : NULL); + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code)); } if (ctx) { @@ -530,8 +523,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); - atomic_store_8(&ctx->taskType, taskType); - atomic_store_8(&ctx->explain, explain); + ctx->taskType = taskType; + ctx->explain = explain; + ctx->queryType = qwMsg->msgType; QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); @@ -571,6 +565,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex _return: input.code = code; + input.msgType = qwMsg->msgType; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); // if (!queryRsped) { diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 7c7d262167..bc37400249 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -202,7 +202,8 @@ void qwtSendReqToDnode(void* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) void qwtRpcSendResponse(const SRpcMsg *pRsp) { switch (pRsp->msgType) { - case TDMT_VND_QUERY_RSP: { + case TDMT_SCH_QUERY_RSP: + case TDMT_SCH_MERGE_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont; if (pRsp->code) { @@ -213,7 +214,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { rpcFreeCont(rsp); break; } - case TDMT_VND_FETCH_RSP: { + case TDMT_SCH_FETCH_RSP: { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont; if (0 == pRsp->code && 0 == rsp->completed) { @@ -229,7 +230,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { break; } - case TDMT_VND_DROP_TASK_RSP: { + case TDMT_SCH_DROP_TASK_RSP: { STaskDropRsp *rsp = (STaskDropRsp *)pRsp->pCont; rpcFreeCont(rsp); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 4e36bd3f10..fb0c43b0ff 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -353,7 +353,7 @@ void schFreeJobImpl(void *job); int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx); int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask); int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans); -int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code); +int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code); void schFreeRpcCtx(SRpcCtx *pCtx); int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp); bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus); @@ -383,7 +383,8 @@ char* schGetOpStr(SCH_OP_TYPE type); int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync); int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob); int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes); -int32_t schUpdateTaskCandidateAddr(SSchTask *pTask, SEpSet* pEpSet); +int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet); +int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index fd27193b0f..643594f4e0 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -682,9 +682,9 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schUpdateTaskCandidateAddr(SSchTask *pTask, SEpSet* pEpSet) { +int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet) { if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) { - SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", (pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs): 0)); + SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs): 0)); SCH_ERR_RET(TSDB_CODE_APP_ERROR); } @@ -1685,7 +1685,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) { return TSDB_CODE_SUCCESS; } - SCH_TASK_DLOG("task will be redirected now, status:%d", SCH_GET_TASK_STATUS_STR(pTask)); + SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); @@ -1735,7 +1735,7 @@ _return: SCH_RET(code); } -int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SDataBuf* pData, int32_t rspCode) { +int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) { int32_t code = 0; if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { @@ -1744,7 +1744,7 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SData SCH_ERR_JRET(rspCode); } - SCH_ERR_JRET(schUpdateTaskCandidateAddr(pTask, pData->pEpSet)); + SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); } schDoTaskRedirect(pJob, pTask, rspCode); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 76d2061936..91f5ff979c 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -28,9 +28,10 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy int32_t reqMsgType = msgType - 1; switch (msgType) { case TDMT_SCH_LINK_BROKEN: - case TDMT_VND_EXPLAIN_RSP: + case TDMT_SCH_EXPLAIN_RSP: return TSDB_CODE_SUCCESS; - case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp + case TDMT_SCH_MERGE_QUERY_RSP: + case TDMT_SCH_QUERY_RSP: // query_rsp may be processed later than ready_rsp if (lastMsgType != reqMsgType && -1 != lastMsgType) { SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); @@ -43,7 +44,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); return TSDB_CODE_SUCCESS; - case TDMT_VND_FETCH_RSP: + case TDMT_SCH_FETCH_RSP: if (lastMsgType != reqMsgType && -1 != lastMsgType) { SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); @@ -238,7 +239,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } - case TDMT_VND_QUERY_RSP: { + case TDMT_SCH_QUERY_RSP: + case TDMT_SCH_MERGE_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)msg; SCH_ERR_JRET(rspCode); @@ -255,7 +257,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } - case TDMT_VND_EXPLAIN_RSP: { + case TDMT_SCH_EXPLAIN_RSP: { SCH_ERR_JRET(rspCode); if (NULL == msg) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); @@ -285,7 +287,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch } break; } - case TDMT_VND_FETCH_RSP: { + case TDMT_SCH_FETCH_RSP: { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; SCH_ERR_JRET(rspCode); @@ -333,8 +335,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch schProcessOnDataFetched(pJob); break; } - case TDMT_VND_DROP_TASK_RSP: { - // SHOULD NEVER REACH HERE + case TDMT_SCH_DROP_TASK_RSP: { + // NEVER REACH HERE SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); break; @@ -358,8 +360,9 @@ _return: } -int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) { +int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; + int32_t msgType = pMsg->msgType; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; SSchTask *pTask = NULL; @@ -393,8 +396,8 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType)); - if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || ((rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL) && msgSize > 0)) { - code = schHandleRedirect(pJob, pTask, msgType, pMsg, rspCode); + if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || ((rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL) && pMsg->len > 0)) { + code = schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode); goto _return; } @@ -416,46 +419,14 @@ _return: SCH_RET(code); } -int32_t schHandleSubmitCallback(void *param, const SDataBuf *pMsg, int32_t code) { - return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code); -} - -int32_t schHandleCreateTbCallback(void *param, const SDataBuf *pMsg, int32_t code) { - return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code); -} - -int32_t schHandleDropTbCallback(void *param, const SDataBuf *pMsg, int32_t code) { - return schHandleCallback(param, pMsg, TDMT_VND_DROP_TABLE_RSP, code); -} - -int32_t schHandleAlterTbCallback(void *param, const SDataBuf *pMsg, int32_t code) { - return schHandleCallback(param, pMsg, TDMT_VND_ALTER_TABLE_RSP, code); -} - -int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) { - return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code); -} - -int32_t schHandleDeleteCallback(void *param, const SDataBuf *pMsg, int32_t code) { - return schHandleCallback(param, pMsg, TDMT_VND_DELETE_RSP, code); -} - -int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) { - return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code); -} - -int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code) { - return schHandleCallback(param, pMsg, TDMT_VND_EXPLAIN_RSP, code); -} - -int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) { +int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId, code); taosMemoryFreeClear(param); return TSDB_CODE_SUCCESS; } -int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t code) { +int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param; rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); @@ -468,7 +439,7 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId, NULL)); } else { - SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code)); + SCH_ERR_RET(schHandleCallback(param, pMsg, code)); } return TSDB_CODE_SUCCESS; @@ -564,29 +535,15 @@ _return: int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { switch (msgType) { case TDMT_VND_CREATE_TABLE: - *fp = schHandleCreateTbCallback; - break; case TDMT_VND_DROP_TABLE: - *fp = schHandleDropTbCallback; - break; case TDMT_VND_ALTER_TABLE: - *fp = schHandleAlterTbCallback; - break; case TDMT_VND_SUBMIT: - *fp = schHandleSubmitCallback; - break; case TDMT_SCH_QUERY: case TDMT_SCH_MERGE_QUERY: - *fp = schHandleQueryCallback; - break; case TDMT_VND_DELETE: - *fp = schHandleDeleteCallback; - break; case TDMT_SCH_EXPLAIN: - *fp = schHandleExplainCallback; - break; case TDMT_SCH_FETCH: - *fp = schHandleFetchCallback; + *fp = schHandleCallback; break; case TDMT_SCH_DROP_TASK: *fp = schHandleDropCallback; @@ -700,7 +657,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - int32_t msgType = TDMT_VND_QUERY_HEARTBEAT_RSP; + int32_t msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP; __async_send_cb_fn_t fp = NULL; SCH_ERR_JRET(schGetCallbackFp(TDMT_SCH_QUERY_HEARTBEAT, &fp)); @@ -730,7 +687,7 @@ _return: SCH_RET(code); } -int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) { +int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchedulerHbRsp rsp = {0}; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; @@ -794,7 +751,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, TDMT_SCH_EXPLAIN, &trans, false, &pExplainMsgSendInfo)); - int32_t msgType = TDMT_VND_EXPLAIN_RSP; + int32_t msgType = TDMT_SCH_EXPLAIN_RSP; SRpcCtxVal ctxVal = {.val = pExplainMsgSendInfo, .clone = schCloneSMsgSendInfo}; if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 43822e4f5b..22b08e9037 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -412,7 +412,7 @@ void *schtCreateFetchRspThread(void *param) { rsp->completed = 1; rsp->numOfRows = 10; - code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0); + code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_SCH_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0); schReleaseJob(job); @@ -445,7 +445,7 @@ void *schtFetchRspThread(void *aa) { dataBuf.pData = rsp; dataBuf.len = sizeof(*rsp); - code = schHandleCallback(param, &dataBuf, TDMT_VND_FETCH_RSP, 0); + code = schHandleCallback(param, &dataBuf, TDMT_SCH_FETCH_RSP, 0); assert(code == 0 || code); } @@ -547,7 +547,7 @@ void* schtRunJobThread(void *aa) { dataBuf.pData = &rsp; dataBuf.len = sizeof(rsp); - code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0); + code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0); assert(code == 0 || code); pIter = taosHashIterate(execTasks, pIter); @@ -566,7 +566,7 @@ void* schtRunJobThread(void *aa) { dataBuf.pData = &rsp; dataBuf.len = sizeof(rsp); - code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0); + code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0); assert(code == 0 || code); pIter = taosHashIterate(execTasks, pIter); @@ -677,7 +677,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(pJob->execTasks, pIter); @@ -688,7 +688,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(pJob->execTasks, pIter); @@ -780,7 +780,7 @@ TEST(queryTest, readyFirstCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(pJob->execTasks, pIter); @@ -791,7 +791,7 @@ TEST(queryTest, readyFirstCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(pJob->execTasks, pIter); @@ -898,7 +898,7 @@ TEST(queryTest, flowCtrlCase) { if (task->lastMsgType == TDMT_SCH_QUERY) { SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); } else { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index f53507c8aa..aacae590a0 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1114,4 +1114,6 @@ _return2: rpcFreeCont(msg->pCont); } +int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; } + #endif