From 47bc36872f47cb2338e29f3f24a4f90ea1bf09d1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 30 Jun 2022 13:44:26 +0800 Subject: [PATCH] feat: query redirect --- include/common/tmsg.h | 1 + source/client/src/clientImpl.c | 4 ++-- source/libs/nodes/src/nodesCodeFuncs.c | 5 +++-- source/libs/qworker/inc/qwInt.h | 20 ++++++++--------- source/libs/qworker/src/qwMsg.c | 4 +++- source/libs/qworker/src/qworker.c | 30 ++++++++++++++------------ source/libs/scheduler/src/schJob.c | 5 +++-- source/libs/scheduler/src/schRemote.c | 2 +- 8 files changed, 39 insertions(+), 32 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2876105748..4ba668cbf5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1594,6 +1594,7 @@ typedef struct { uint64_t queryId; uint64_t taskId; int64_t refId; + int32_t execId; } STaskCancelReq; typedef struct { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b20fd4b9b0..51e43f927b 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1294,10 +1294,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { */ int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start; if (pMsg->code == TSDB_CODE_SUCCESS) { - tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self, + tscDebug("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self, TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId); } else { - tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self, + tscError("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self, TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId); } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 4375a7b04c..72263feea1 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -3430,6 +3430,7 @@ static int32_t jsonToSlotDescNode(const SJson* pJson, void* pObj) { static const char* jkDownstreamSourceAddr = "Addr"; static const char* jkDownstreamSourceTaskId = "TaskId"; static const char* jkDownstreamSourceSchedId = "SchedId"; +static const char* jkDownstreamSourceExecId = "ExecId"; static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) { const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj; @@ -3442,7 +3443,7 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceSchedId, pNode->schedId); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceSchedId, pNode->execId); + code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceExecId, pNode->execId); } return code; @@ -3459,7 +3460,7 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) { code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceSchedId, &pNode->schedId); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetIntValue(pJson, jkDownstreamSourceSchedId, &pNode->execId); + code = tjsonGetIntValue(pJson, jkDownstreamSourceExecId, &pNode->execId); } return code; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 4fa2615470..6faffa13b3 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -281,22 +281,22 @@ typedef struct SQWorkerMgmt { #define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__) #define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__) -#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__) -#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__) -#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__) +#define QW_TASK_ELOG(param, ...) qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__) +#define QW_TASK_WLOG(param, ...) qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__) +#define QW_TASK_DLOG(param, ...) qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__) #define QW_TASK_DLOGL(param, ...) \ - qDebugL("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__) + qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__) -#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId) -#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId) -#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId) +#define QW_TASK_ELOG_E(param) qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId) +#define QW_TASK_WLOG_E(param) qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId) +#define QW_TASK_DLOG_E(param) qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId) #define QW_SCH_TASK_ELOG(param, ...) \ - qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__) + qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__) #define QW_SCH_TASK_WLOG(param, ...) \ - qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__) + qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__) #define QW_SCH_TASK_DLOG(param, ...) \ - qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__) + qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__) #define QW_LOCK_DEBUG(...) \ do { \ diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index cc642caa70..cc4228f7c7 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -480,11 +480,13 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in msg->queryId = be64toh(msg->queryId); msg->taskId = be64toh(msg->taskId); msg->refId = be64toh(msg->refId); + msg->execId = ntohl(msg->execId); uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; int64_t rId = msg->refId; + int32_t eId = msg->execId; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info}; @@ -598,7 +600,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SR uint64_t qId = req.queryId; uint64_t tId = req.taskId; int64_t rId = 0; - int32_t eId = 0; + int32_t eId = -1; SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info}; QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 2b23f7a27f..cd48452df2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -566,20 +566,22 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex if (gQWDebug.tmp) { -#if 0 - SEpSet epSet = {0}; - epSet.inUse = 1; - epSet.numOfEps = 3; - strcpy(epSet.eps[0].fqdn, "localhost"); - epSet.eps[0].port = 7100; - strcpy(epSet.eps[1].fqdn, "localhost"); - epSet.eps[1].port = 7200; - strcpy(epSet.eps[2].fqdn, "localhost"); - epSet.eps[2].port = 7300; - - qwDbgBuildAndSendRedirectRsp(pMsg->msgType + 1, &pMsg->info, TSDB_CODE_RPC_REDIRECT, &epSet); - gQWDebug.tmp = false; - return TSDB_CODE_SUCCESS; +#if 1 + if (TDMT_SCH_QUERY == qwMsg->msgType) { + SEpSet epSet = {0}; + epSet.inUse = 1; + epSet.numOfEps = 3; + strcpy(epSet.eps[0].fqdn, "localhost"); + epSet.eps[0].port = 7100; + strcpy(epSet.eps[1].fqdn, "localhost"); + epSet.eps[1].port = 7200; + strcpy(epSet.eps[2].fqdn, "localhost"); + epSet.eps[2].port = 7300; + + qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet); + gQWDebug.tmp = false; + return TSDB_CODE_SUCCESS; + } #else if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) { ctx->phase = QW_PHASE_POST_QUERY; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 26824738e9..29179536df 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -1276,6 +1276,9 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) { for (int32_t i = 0; i < taskNum; ++i) { STaskStatus *taskStatus = taosArrayGet(pStatusList, i); + qDebug("QID:%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", + taskStatus->queryId, taskStatus->taskId, taskStatus->execId, jobTaskStatusStr(taskStatus->status)); + SSchJob *pJob = schAcquireJob(taskStatus->refId); if (NULL == pJob) { qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId, @@ -1284,8 +1287,6 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) { continue; } - SCH_JOB_DLOG("TID:0x%" PRIx64 "EID:%d task status in server: %s", taskStatus->taskId, taskStatus->execId, jobTaskStatusStr(taskStatus->status)); - pTask = NULL; schGetTaskInJob(pJob, taskStatus->taskId, &pTask); if (NULL == pTask) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 3d547ffbf8..69e41d3111 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1062,7 +1062,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, pMsg->queryId = htobe64(pJob->queryId); pMsg->taskId = htobe64(pTask->taskId); pMsg->refId = htobe64(pJob->refId); - pMsg->execId = htobe64(pTask->execId); + pMsg->execId = htonl(pTask->execId); break; } case TDMT_SCH_QUERY_HEARTBEAT: {