fix: remaining query issues

This commit is contained in:
dapan1121 2024-12-16 18:40:27 +08:00
parent 90272cf0c7
commit 7be31eea30
5 changed files with 41 additions and 14 deletions

View File

@ -830,7 +830,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in vnode query queue is processing");
if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && !syncIsReadyForRead(pVnode->sync)) {
if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0;
}
@ -842,9 +842,21 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .pWorkerCb = pInfo->workerCb};
initStorageAPI(&handle.api);
int32_t code = TSDB_CODE_SUCCESS;
bool redirected = false;
switch (pMsg->msgType) {
case TDMT_SCH_QUERY:
if (!syncIsReadyForRead(pVnode->sync)) {
pMsg->code = (terrno) ? terrno : TSDB_CODE_SYN_NOT_LEADER;
redirected = true;
}
code = qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
if (redirected) {
vnodeRedirectRpcMsg(pVnode, pMsg, pMsg->code);
return 0;
}
return code;
case TDMT_SCH_MERGE_QUERY:
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_SCH_QUERY_CONTINUE:

View File

@ -320,9 +320,8 @@ extern SQueryMgmt gQueryMgmt;
case QW_PHASE_POST_QUERY: \
case QW_PHASE_PRE_CQUERY: \
case QW_PHASE_POST_CQUERY: \
atomic_store_8(&(ctx)->phase, _value); \
break; \
default: \
atomic_store_8(&(ctx)->phase, _value); \
break; \
} \
} while (0)

View File

@ -83,13 +83,16 @@ void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session) {
taosMemPoolDestroySession(gMemPoolHandle, session);
QW_LOCK(QW_WRITE, &pJobInfo->lock);
int32_t remainSessions = atomic_sub_fetch_32(&pJobInfo->memInfo->remainSession, 1);
if (remainSessions != 0) {
QW_UNLOCK(QW_WRITE, &pJobInfo->lock);
}
QW_TASK_DLOG("task session destoryed, remainSessions:%d", remainSessions);
if (0 == remainSessions) {
QW_LOCK(QW_WRITE, &pJobInfo->lock);
if (/*0 == taosHashGetSize(pJobInfo->pSessions) && */0 == atomic_load_32(&pJobInfo->memInfo->remainSession)) {
// if (/*0 == taosHashGetSize(pJobInfo->pSessions) && */0 == atomic_load_32(&pJobInfo->memInfo->remainSession)) {
atomic_store_8(&pJobInfo->destroyed, 1);
QW_UNLOCK(QW_WRITE, &pJobInfo->lock);
@ -98,10 +101,10 @@ void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session) {
TAOS_UNUSED(taosHashRemove(gQueryMgmt.pJobInfo, id2, sizeof(id2)));
QW_TASK_DLOG_E("the whole query job removed");
} else {
QW_TASK_DLOG("job not removed, remainSessions:%d, %d", taosHashGetSize(pJobInfo->pSessions), pJobInfo->memInfo->remainSession);
QW_UNLOCK(QW_WRITE, &pJobInfo->lock);
}
// } else {
// QW_TASK_DLOG("job not removed, remainSessions:%d, %d", taosHashGetSize(pJobInfo->pSessions), pJobInfo->memInfo->remainSession);
// QW_UNLOCK(QW_WRITE, &pJobInfo->lock);
// }
}
}
@ -147,12 +150,14 @@ int32_t qwRetrieveJobInfo(QW_FPARAMS_DEF, SQWJobInfo** ppJob) {
}
QW_LOCK(QW_READ, &pJob->lock);
if (atomic_load_8(&pJob->destroyed)) {
QW_UNLOCK(QW_READ, &pJob->lock);
continue;
}
(void)atomic_add_fetch_32(&pJob->memInfo->remainSession, 1);
QW_UNLOCK(QW_READ, &pJob->lock);
break;

View File

@ -498,14 +498,14 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
int64_t rId = msg.refId;
int32_t eId = msg.execId;
SQWMsg qwMsg = {.node = node, .msg = msg.msg, .msgLen = msg.msgLen, .connInfo = pMsg->info, .msgType = pMsg->msgType};
SQWMsg qwMsg = {.node = node, .msg = msg.msg, .msgLen = msg.msgLen, .connInfo = pMsg->info, .msgType = pMsg->msgType, .code = pMsg->code};
qwMsg.msgInfo.explain = msg.explain;
qwMsg.msgInfo.taskType = msg.taskType;
qwMsg.msgInfo.needFetch = msg.needFetch;
qwMsg.msgInfo.compressMsg = msg.compress;
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, compress:%d, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType),
msg.compress, pMsg->info.handle, msg.sql);
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, compress:%d, handle:%p, SQL:%s, code:0x%x", node, TMSG_INFO(pMsg->msgType),
msg.compress, pMsg->info.handle, msg.sql, qwMsg.code);
code = qwProcessQuery(QW_FPARAMS(), &qwMsg, msg.sql);
msg.sql = NULL;

View File

@ -567,6 +567,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_ERR_JRET(ctx->rspCode);
}
if (TSDB_CODE_SUCCESS != input->code) {
QW_TASK_ELOG("task already failed at phase %s, code:0x%x", qwPhaseStr(phase), input->code);
(void)qwDropTask(QW_FPARAMS());
QW_ERR_JRET(input->code);
}
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask));
break;
}
@ -631,6 +638,10 @@ _return:
if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code);
if (QW_PHASE_PRE_CQUERY == phase && code) {
QW_SET_PHASE(ctx, QW_PHASE_POST_CQUERY);
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
qwReleaseTaskCtx(mgmt, ctx);
}
@ -767,7 +778,7 @@ _return:
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
int32_t code = 0;
SSubplan *plan = NULL;
SQWPhaseInput input = {0};
SQWPhaseInput input = {.code = qwMsg->code};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
SQWTaskCtx *ctx = NULL;