feat: query redirect
This commit is contained in:
parent
1b3eec48bd
commit
47bc36872f
|
@ -1594,6 +1594,7 @@ typedef struct {
|
||||||
uint64_t queryId;
|
uint64_t queryId;
|
||||||
uint64_t taskId;
|
uint64_t taskId;
|
||||||
int64_t refId;
|
int64_t refId;
|
||||||
|
int32_t execId;
|
||||||
} STaskCancelReq;
|
} STaskCancelReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -1294,10 +1294,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||||
*/
|
*/
|
||||||
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
|
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
|
||||||
if (pMsg->code == TSDB_CODE_SUCCESS) {
|
if (pMsg->code == TSDB_CODE_SUCCESS) {
|
||||||
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
|
tscDebug("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
|
||||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
|
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
|
||||||
} else {
|
} else {
|
||||||
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
|
tscError("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
|
||||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
|
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3430,6 +3430,7 @@ static int32_t jsonToSlotDescNode(const SJson* pJson, void* pObj) {
|
||||||
static const char* jkDownstreamSourceAddr = "Addr";
|
static const char* jkDownstreamSourceAddr = "Addr";
|
||||||
static const char* jkDownstreamSourceTaskId = "TaskId";
|
static const char* jkDownstreamSourceTaskId = "TaskId";
|
||||||
static const char* jkDownstreamSourceSchedId = "SchedId";
|
static const char* jkDownstreamSourceSchedId = "SchedId";
|
||||||
|
static const char* jkDownstreamSourceExecId = "ExecId";
|
||||||
|
|
||||||
static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj;
|
const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj;
|
||||||
|
@ -3442,7 +3443,7 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceSchedId, pNode->schedId);
|
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceSchedId, pNode->schedId);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceSchedId, pNode->execId);
|
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceExecId, pNode->execId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -3459,7 +3460,7 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) {
|
||||||
code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceSchedId, &pNode->schedId);
|
code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceSchedId, &pNode->schedId);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetIntValue(pJson, jkDownstreamSourceSchedId, &pNode->execId);
|
code = tjsonGetIntValue(pJson, jkDownstreamSourceExecId, &pNode->execId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -281,22 +281,22 @@ typedef struct SQWorkerMgmt {
|
||||||
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
|
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
|
||||||
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
|
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
|
||||||
|
|
||||||
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
|
#define QW_TASK_ELOG(param, ...) qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
|
||||||
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
|
#define QW_TASK_WLOG(param, ...) qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
|
||||||
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
|
#define QW_TASK_DLOG(param, ...) qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
|
||||||
#define QW_TASK_DLOGL(param, ...) \
|
#define QW_TASK_DLOGL(param, ...) \
|
||||||
qDebugL("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
|
qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
|
||||||
|
|
||||||
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId)
|
#define QW_TASK_ELOG_E(param) qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
|
||||||
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId)
|
#define QW_TASK_WLOG_E(param) qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
|
||||||
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId)
|
#define QW_TASK_DLOG_E(param) qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
|
||||||
|
|
||||||
#define QW_SCH_TASK_ELOG(param, ...) \
|
#define QW_SCH_TASK_ELOG(param, ...) \
|
||||||
qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__)
|
qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__)
|
||||||
#define QW_SCH_TASK_WLOG(param, ...) \
|
#define QW_SCH_TASK_WLOG(param, ...) \
|
||||||
qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__)
|
qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__)
|
||||||
#define QW_SCH_TASK_DLOG(param, ...) \
|
#define QW_SCH_TASK_DLOG(param, ...) \
|
||||||
qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__)
|
qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__)
|
||||||
|
|
||||||
#define QW_LOCK_DEBUG(...) \
|
#define QW_LOCK_DEBUG(...) \
|
||||||
do { \
|
do { \
|
||||||
|
|
|
@ -480,11 +480,13 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
||||||
msg->queryId = be64toh(msg->queryId);
|
msg->queryId = be64toh(msg->queryId);
|
||||||
msg->taskId = be64toh(msg->taskId);
|
msg->taskId = be64toh(msg->taskId);
|
||||||
msg->refId = be64toh(msg->refId);
|
msg->refId = be64toh(msg->refId);
|
||||||
|
msg->execId = ntohl(msg->execId);
|
||||||
|
|
||||||
uint64_t sId = msg->sId;
|
uint64_t sId = msg->sId;
|
||||||
uint64_t qId = msg->queryId;
|
uint64_t qId = msg->queryId;
|
||||||
uint64_t tId = msg->taskId;
|
uint64_t tId = msg->taskId;
|
||||||
int64_t rId = msg->refId;
|
int64_t rId = msg->refId;
|
||||||
|
int32_t eId = msg->execId;
|
||||||
|
|
||||||
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
|
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
|
||||||
|
|
||||||
|
@ -598,7 +600,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SR
|
||||||
uint64_t qId = req.queryId;
|
uint64_t qId = req.queryId;
|
||||||
uint64_t tId = req.taskId;
|
uint64_t tId = req.taskId;
|
||||||
int64_t rId = 0;
|
int64_t rId = 0;
|
||||||
int32_t eId = 0;
|
int32_t eId = -1;
|
||||||
|
|
||||||
SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
|
SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
|
||||||
QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
|
QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
|
||||||
|
|
|
@ -566,20 +566,22 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
|
||||||
|
|
||||||
|
|
||||||
if (gQWDebug.tmp) {
|
if (gQWDebug.tmp) {
|
||||||
#if 0
|
#if 1
|
||||||
SEpSet epSet = {0};
|
if (TDMT_SCH_QUERY == qwMsg->msgType) {
|
||||||
epSet.inUse = 1;
|
SEpSet epSet = {0};
|
||||||
epSet.numOfEps = 3;
|
epSet.inUse = 1;
|
||||||
strcpy(epSet.eps[0].fqdn, "localhost");
|
epSet.numOfEps = 3;
|
||||||
epSet.eps[0].port = 7100;
|
strcpy(epSet.eps[0].fqdn, "localhost");
|
||||||
strcpy(epSet.eps[1].fqdn, "localhost");
|
epSet.eps[0].port = 7100;
|
||||||
epSet.eps[1].port = 7200;
|
strcpy(epSet.eps[1].fqdn, "localhost");
|
||||||
strcpy(epSet.eps[2].fqdn, "localhost");
|
epSet.eps[1].port = 7200;
|
||||||
epSet.eps[2].port = 7300;
|
strcpy(epSet.eps[2].fqdn, "localhost");
|
||||||
|
epSet.eps[2].port = 7300;
|
||||||
|
|
||||||
qwDbgBuildAndSendRedirectRsp(pMsg->msgType + 1, &pMsg->info, TSDB_CODE_RPC_REDIRECT, &epSet);
|
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet);
|
||||||
gQWDebug.tmp = false;
|
gQWDebug.tmp = false;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
#else
|
#else
|
||||||
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) {
|
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) {
|
||||||
ctx->phase = QW_PHASE_POST_QUERY;
|
ctx->phase = QW_PHASE_POST_QUERY;
|
||||||
|
|
|
@ -1276,6 +1276,9 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
|
||||||
for (int32_t i = 0; i < taskNum; ++i) {
|
for (int32_t i = 0; i < taskNum; ++i) {
|
||||||
STaskStatus *taskStatus = taosArrayGet(pStatusList, i);
|
STaskStatus *taskStatus = taosArrayGet(pStatusList, i);
|
||||||
|
|
||||||
|
qDebug("QID:%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s",
|
||||||
|
taskStatus->queryId, taskStatus->taskId, taskStatus->execId, jobTaskStatusStr(taskStatus->status));
|
||||||
|
|
||||||
SSchJob *pJob = schAcquireJob(taskStatus->refId);
|
SSchJob *pJob = schAcquireJob(taskStatus->refId);
|
||||||
if (NULL == pJob) {
|
if (NULL == pJob) {
|
||||||
qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
|
qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
|
||||||
|
@ -1284,8 +1287,6 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_JOB_DLOG("TID:0x%" PRIx64 "EID:%d task status in server: %s", taskStatus->taskId, taskStatus->execId, jobTaskStatusStr(taskStatus->status));
|
|
||||||
|
|
||||||
pTask = NULL;
|
pTask = NULL;
|
||||||
schGetTaskInJob(pJob, taskStatus->taskId, &pTask);
|
schGetTaskInJob(pJob, taskStatus->taskId, &pTask);
|
||||||
if (NULL == pTask) {
|
if (NULL == pTask) {
|
||||||
|
|
|
@ -1062,7 +1062,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
pMsg->queryId = htobe64(pJob->queryId);
|
pMsg->queryId = htobe64(pJob->queryId);
|
||||||
pMsg->taskId = htobe64(pTask->taskId);
|
pMsg->taskId = htobe64(pTask->taskId);
|
||||||
pMsg->refId = htobe64(pJob->refId);
|
pMsg->refId = htobe64(pJob->refId);
|
||||||
pMsg->execId = htobe64(pTask->execId);
|
pMsg->execId = htonl(pTask->execId);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_SCH_QUERY_HEARTBEAT: {
|
case TDMT_SCH_QUERY_HEARTBEAT: {
|
||||||
|
|
Loading…
Reference in New Issue