fix client taskqueueWorkerpool stuck problem when querypolicy = 4
This commit is contained in:
parent
9f70866ddb
commit
b39b6589c4
|
@ -348,6 +348,8 @@ extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSi
|
||||||
void* (*mallocFp)(int64_t));
|
void* (*mallocFp)(int64_t));
|
||||||
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize);
|
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize);
|
||||||
|
|
||||||
|
void* getTaskPoolWorkerCb();
|
||||||
|
|
||||||
#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE
|
#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE
|
||||||
#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE
|
#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE
|
||||||
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
|
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
|
||||||
|
|
|
@ -72,6 +72,7 @@ typedef struct SQWMsg {
|
||||||
int32_t msgLen;
|
int32_t msgLen;
|
||||||
SQWMsgInfo msgInfo;
|
SQWMsgInfo msgInfo;
|
||||||
SRpcHandleInfo connInfo;
|
SRpcHandleInfo connInfo;
|
||||||
|
void *pWorkerCb;
|
||||||
} SQWMsg;
|
} SQWMsg;
|
||||||
|
|
||||||
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb);
|
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb);
|
||||||
|
|
|
@ -67,6 +67,7 @@ typedef struct SSchedulerReq {
|
||||||
SExecResult* pExecRes;
|
SExecResult* pExecRes;
|
||||||
void** pFetchRes;
|
void** pFetchRes;
|
||||||
int8_t source;
|
int8_t source;
|
||||||
|
void* pWorkerCb;
|
||||||
} SSchedulerReq;
|
} SSchedulerReq;
|
||||||
|
|
||||||
int32_t schedulerInit(void);
|
int32_t schedulerInit(void);
|
||||||
|
|
|
@ -841,6 +841,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
.chkKillParam = (void*)pRequest->self,
|
.chkKillParam = (void*)pRequest->self,
|
||||||
.pExecRes = &res,
|
.pExecRes = &res,
|
||||||
.source = pRequest->source,
|
.source = pRequest->source,
|
||||||
|
.pWorkerCb = getTaskPoolWorkerCb(),
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
||||||
|
@ -1358,6 +1359,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
||||||
.chkKillParam = (void*)pRequest->self,
|
.chkKillParam = (void*)pRequest->self,
|
||||||
.pExecRes = NULL,
|
.pExecRes = NULL,
|
||||||
.source = pRequest->source,
|
.source = pRequest->source,
|
||||||
|
.pWorkerCb = getTaskPoolWorkerCb(),
|
||||||
};
|
};
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
||||||
|
|
|
@ -671,3 +671,7 @@ void freeDbCfgInfo(SDbCfgInfo* pInfo) {
|
||||||
}
|
}
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void* getTaskPoolWorkerCb() {
|
||||||
|
return taskQueue.wrokrerPool.pCb;
|
||||||
|
}
|
||||||
|
|
|
@ -1478,6 +1478,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64
|
||||||
ctx->explainRes = explainRes;
|
ctx->explainRes = explainRes;
|
||||||
|
|
||||||
rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb));
|
rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb));
|
||||||
|
rHandle.pWorkerCb = qwMsg->pWorkerCb;
|
||||||
if (NULL == rHandle.pMsgCb) {
|
if (NULL == rHandle.pMsgCb) {
|
||||||
QW_ERR_JRET(terrno);
|
QW_ERR_JRET(terrno);
|
||||||
}
|
}
|
||||||
|
|
|
@ -306,6 +306,7 @@ typedef struct SSchJob {
|
||||||
char *sql;
|
char *sql;
|
||||||
SQueryProfileSummary summary;
|
SQueryProfileSummary summary;
|
||||||
int8_t source;
|
int8_t source;
|
||||||
|
void *pWorkerCb;
|
||||||
} SSchJob;
|
} SSchJob;
|
||||||
|
|
||||||
typedef struct SSchTaskCtx {
|
typedef struct SSchTaskCtx {
|
||||||
|
|
|
@ -836,6 +836,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
|
||||||
pJob->userRes.execFp = pReq->execFp;
|
pJob->userRes.execFp = pReq->execFp;
|
||||||
pJob->userRes.cbParam = pReq->cbParam;
|
pJob->userRes.cbParam = pReq->cbParam;
|
||||||
pJob->source = pReq->source;
|
pJob->source = pReq->source;
|
||||||
|
pJob->pWorkerCb = pReq->pWorkerCb;
|
||||||
|
|
||||||
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
|
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
|
||||||
qDebug("qid:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
|
qDebug("qid:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
|
||||||
|
|
|
@ -1134,6 +1134,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
qwMsg.msg = pTask->plan;
|
qwMsg.msg = pTask->plan;
|
||||||
qwMsg.msgType = pTask->plan->msgType;
|
qwMsg.msgType = pTask->plan->msgType;
|
||||||
qwMsg.connInfo.handle = pJob->conn.pTrans;
|
qwMsg.connInfo.handle = pJob->conn.pTrans;
|
||||||
|
qwMsg.pWorkerCb = pJob->pWorkerCb;
|
||||||
|
|
||||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||||
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
|
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
|
||||||
|
|
Loading…
Reference in New Issue