From fe9df727f35ead1e7b6b97af06a6cc1d906a477d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 14 Apr 2023 09:15:14 +0800 Subject: [PATCH] fix: fetch rsp message type issue --- source/libs/qworker/inc/qwInt.h | 3 ++- source/libs/qworker/src/qwDbg.c | 8 ++++---- source/libs/qworker/src/qworker.c | 16 ++++++++-------- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index f198b73c7d..4bc357d7dd 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -120,7 +120,8 @@ typedef struct SQWTaskCtx { int8_t explain; int8_t needFetch; int8_t localExec; - int32_t msgType; + int32_t queryMsgType; + int32_t fetchMsgType; int32_t level; uint64_t sId; diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 59e63e9eae..f2e48918ab 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -126,10 +126,10 @@ void qwDbgDumpTasksInfo(SQWorker *mgmt) { void *key = taosHashGetKey(pIter, NULL); QW_GET_QTID(key, qId, tId, eId); - QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, " + QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, queryMsgType:%d, " "sId:%" PRId64 ", level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, " "rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d", - ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType, + ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->queryMsgType, ctx->sId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue, ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName, ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY], @@ -259,9 +259,9 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) { static int32_t ignoreTime = 0; if (++ignoreTime > 10 && 0 == taosRand() % 9) { - if (ctx->msgType == TDMT_SCH_FETCH) { + if (ctx->fetchMsgType == TDMT_SCH_FETCH) { qwBuildAndSendErrorRsp(TDMT_SCH_LINK_BROKEN, &ctx->ctrlConnInfo, TSDB_CODE_RPC_BROKEN_LINK); - qwBuildAndSendErrorRsp(ctx->msgType + 1, &ctx->dataConnInfo, TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); + qwBuildAndSendErrorRsp(ctx->fetchMsgType + 1, &ctx->dataConnInfo, TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); *rsped = true; taosSsleep(3); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index f92f31fb1c..533575a62f 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -464,7 +464,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i qwMsg->connInfo = ctx->dataConnInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code); rsp = NULL; QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), @@ -626,7 +626,7 @@ _return: if (QW_PHASE_POST_QUERY == phase && ctx && !ctx->queryRsped) { bool rsped = false; - SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; + SQWMsg qwMsg = {.msgType = ctx->queryMsgType, .connInfo = ctx->ctrlConnInfo}; qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); if (!rsped) { @@ -704,7 +704,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { ctx->taskType = qwMsg->msgInfo.taskType; ctx->explain = qwMsg->msgInfo.explain; ctx->needFetch = qwMsg->msgInfo.needFetch; - ctx->msgType = qwMsg->msgType; + ctx->queryMsgType = qwMsg->msgType; ctx->localExec = false; // QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); @@ -793,7 +793,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwMsg->connInfo = ctx->dataConnInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code); rsp = NULL; QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, @@ -815,7 +815,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { rsp = NULL; qwMsg->connInfo = ctx->dataConnInfo; - qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, NULL, 0, code); + qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0); } @@ -849,7 +849,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); - ctx->msgType = qwMsg->msgType; + ctx->fetchMsgType = qwMsg->msgType; ctx->dataConnInfo = qwMsg->connInfo; SOutputData sOutput = {0}; @@ -1328,7 +1328,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64 ctx->taskType = qwMsg->msgInfo.taskType; ctx->explain = qwMsg->msgInfo.explain; ctx->needFetch = qwMsg->msgInfo.needFetch; - ctx->msgType = qwMsg->msgType; + ctx->queryMsgType = qwMsg->msgType; ctx->localExec = true; ctx->explainRes = explainRes; @@ -1383,7 +1383,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64 QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); - ctx->msgType = TDMT_SCH_MERGE_FETCH; + ctx->fetchMsgType = TDMT_SCH_MERGE_FETCH; ctx->explainRes = explainRes; SOutputData sOutput = {0};