Merge pull request #27747 from taosdata/fix/3.0/TD-31869
fix client taskqueueWorkerpool stuck problem when querypolicy = 4
This commit is contained in:
commit
f3ef229c69
|
@ -348,6 +348,8 @@ extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSi
|
|||
void* (*mallocFp)(int64_t));
|
||||
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize);
|
||||
|
||||
void* getTaskPoolWorkerCb();
|
||||
|
||||
#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE
|
||||
#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE
|
||||
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
|
||||
|
|
|
@ -72,6 +72,7 @@ typedef struct SQWMsg {
|
|||
int32_t msgLen;
|
||||
SQWMsgInfo msgInfo;
|
||||
SRpcHandleInfo connInfo;
|
||||
void *pWorkerCb;
|
||||
} SQWMsg;
|
||||
|
||||
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb);
|
||||
|
|
|
@ -67,6 +67,7 @@ typedef struct SSchedulerReq {
|
|||
SExecResult* pExecRes;
|
||||
void** pFetchRes;
|
||||
int8_t source;
|
||||
void* pWorkerCb;
|
||||
} SSchedulerReq;
|
||||
|
||||
int32_t schedulerInit(void);
|
||||
|
|
|
@ -841,6 +841,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
|||
.chkKillParam = (void*)pRequest->self,
|
||||
.pExecRes = &res,
|
||||
.source = pRequest->source,
|
||||
.pWorkerCb = getTaskPoolWorkerCb(),
|
||||
};
|
||||
|
||||
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
||||
|
@ -1358,6 +1359,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
|||
.chkKillParam = (void*)pRequest->self,
|
||||
.pExecRes = NULL,
|
||||
.source = pRequest->source,
|
||||
.pWorkerCb = getTaskPoolWorkerCb(),
|
||||
};
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
||||
|
|
|
@ -1363,6 +1363,19 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
|
|||
doAsyncQuery(pUserReq, true);
|
||||
}
|
||||
|
||||
typedef struct SAsyncFetchParam {
|
||||
SRequestObj *pReq;
|
||||
__taos_async_fn_t fp;
|
||||
void *param;
|
||||
} SAsyncFetchParam;
|
||||
|
||||
static int32_t doAsyncFetch(void* pParam) {
|
||||
SAsyncFetchParam *param = pParam;
|
||||
taosAsyncFetchImpl(param->pReq, param->fp, param->param);
|
||||
taosMemoryFree(param);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||
if (res == NULL || fp == NULL) {
|
||||
tscError("taos_fetch_rows_a invalid paras");
|
||||
|
@ -1370,6 +1383,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
|||
}
|
||||
if (!TD_RES_QUERY(res)) {
|
||||
tscError("taos_fetch_rows_a res is NULL");
|
||||
fp(param, res, TSDB_CODE_APP_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1379,7 +1393,20 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
|||
return;
|
||||
}
|
||||
|
||||
taosAsyncFetchImpl(pRequest, fp, param);
|
||||
SAsyncFetchParam* pParam = taosMemoryCalloc(1, sizeof(SAsyncFetchParam));
|
||||
if (!pParam) {
|
||||
fp(param, res, terrno);
|
||||
return;
|
||||
}
|
||||
pParam->pReq = pRequest;
|
||||
pParam->fp = fp;
|
||||
pParam->param = param;
|
||||
int32_t code = taosAsyncExec(doAsyncFetch, pParam, NULL);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosMemoryFree(pParam);
|
||||
fp(param, res, code);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||
|
|
|
@ -671,3 +671,7 @@ void freeDbCfgInfo(SDbCfgInfo* pInfo) {
|
|||
}
|
||||
taosMemoryFree(pInfo);
|
||||
}
|
||||
|
||||
void* getTaskPoolWorkerCb() {
|
||||
return taskQueue.wrokrerPool.pCb;
|
||||
}
|
||||
|
|
|
@ -1478,6 +1478,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64
|
|||
ctx->explainRes = explainRes;
|
||||
|
||||
rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb));
|
||||
rHandle.pWorkerCb = qwMsg->pWorkerCb;
|
||||
if (NULL == rHandle.pMsgCb) {
|
||||
QW_ERR_JRET(terrno);
|
||||
}
|
||||
|
|
|
@ -306,6 +306,7 @@ typedef struct SSchJob {
|
|||
char *sql;
|
||||
SQueryProfileSummary summary;
|
||||
int8_t source;
|
||||
void *pWorkerCb;
|
||||
} SSchJob;
|
||||
|
||||
typedef struct SSchTaskCtx {
|
||||
|
|
|
@ -836,6 +836,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
|
|||
pJob->userRes.execFp = pReq->execFp;
|
||||
pJob->userRes.cbParam = pReq->cbParam;
|
||||
pJob->source = pReq->source;
|
||||
pJob->pWorkerCb = pReq->pWorkerCb;
|
||||
|
||||
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
|
||||
qDebug("qid:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
|
||||
|
|
|
@ -1134,6 +1134,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
qwMsg.msg = pTask->plan;
|
||||
qwMsg.msgType = pTask->plan->msgType;
|
||||
qwMsg.connInfo.handle = pJob->conn.pTrans;
|
||||
qwMsg.pWorkerCb = pJob->pWorkerCb;
|
||||
|
||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
|
||||
|
|
Loading…
Reference in New Issue