From b39b6589c4fb61e0236da4705f0b51867fcc909f Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Mon, 9 Sep 2024 10:02:47 +0800 Subject: [PATCH] fix client taskqueueWorkerpool stuck problem when querypolicy = 4 --- include/libs/qcom/query.h | 2 ++ include/libs/qworker/qworker.h | 1 + include/libs/scheduler/scheduler.h | 1 + source/client/src/clientImpl.c | 2 ++ source/libs/qcom/src/queryUtil.c | 4 ++++ source/libs/qworker/src/qworker.c | 1 + source/libs/scheduler/inc/schInt.h | 1 + source/libs/scheduler/src/schJob.c | 1 + source/libs/scheduler/src/schTask.c | 1 + 9 files changed, 14 insertions(+) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 6e2b83dce7..c8c4fcad44 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -348,6 +348,8 @@ extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSi void* (*mallocFp)(int64_t)); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize); +void* getTaskPoolWorkerCb(); + #define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE #define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index ac27534ab0..83daf0376c 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -72,6 +72,7 @@ typedef struct SQWMsg { int32_t msgLen; SQWMsgInfo msgInfo; SRpcHandleInfo connInfo; + void *pWorkerCb; } SQWMsg; int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 8ab3b898ca..b98170f168 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -67,6 +67,7 @@ typedef struct SSchedulerReq { SExecResult* pExecRes; void** pFetchRes; int8_t source; + void* pWorkerCb; } SSchedulerReq; int32_t schedulerInit(void); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 34a82d4c42..b9a00f4434 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -841,6 +841,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList .chkKillParam = (void*)pRequest->self, .pExecRes = &res, .source = pRequest->source, + .pWorkerCb = getTaskPoolWorkerCb(), }; int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob); @@ -1358,6 +1359,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat .chkKillParam = (void*)pRequest->self, .pExecRes = NULL, .source = pRequest->source, + .pWorkerCb = getTaskPoolWorkerCb(), }; if (TSDB_CODE_SUCCESS == code) { code = schedulerExecJob(&req, &pRequest->body.queryJob); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 34821d05cd..7c7439eac3 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -671,3 +671,7 @@ void freeDbCfgInfo(SDbCfgInfo* pInfo) { } taosMemoryFree(pInfo); } + +void* getTaskPoolWorkerCb() { + return taskQueue.wrokrerPool.pCb; +} diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 2526ac73bc..1a9d3e7ba9 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1478,6 +1478,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64 ctx->explainRes = explainRes; rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb)); + rHandle.pWorkerCb = qwMsg->pWorkerCb; if (NULL == rHandle.pMsgCb) { QW_ERR_JRET(terrno); } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index f12d0fd246..3a25f37895 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -306,6 +306,7 @@ typedef struct SSchJob { char *sql; SQueryProfileSummary summary; int8_t source; + void *pWorkerCb; } SSchJob; typedef struct SSchTaskCtx { diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index cf4a132eb5..8e2fbb878d 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -836,6 +836,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { pJob->userRes.execFp = pReq->execFp; pJob->userRes.cbParam = pReq->cbParam; pJob->source = pReq->source; + pJob->pWorkerCb = pReq->pWorkerCb; if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) { qDebug("qid:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 729c5066ac..59b8954a48 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -1134,6 +1134,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { qwMsg.msg = pTask->plan; qwMsg.msgType = pTask->plan->msgType; qwMsg.connInfo.handle = pJob->conn.pTrans; + qwMsg.pWorkerCb = pJob->pWorkerCb; if (SCH_IS_EXPLAIN_JOB(pJob)) { explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));