fix: quick response invalid write and job remain issues

This commit is contained in:
dapan1121 2024-12-18 13:48:34 +08:00
parent 8d0ca8d24e
commit 8fb4e390b2
2 changed files with 60 additions and 43 deletions

View File

@ -153,6 +153,7 @@ int32_t qwRetrieveJobInfo(QW_FPARAMS_DEF, SQWJobInfo** ppJob) {
if (atomic_load_8(&pJob->destroyed)) {
QW_UNLOCK(QW_READ, &pJob->lock);
taosHashRelease(gQueryMgmt.pJobInfo, pJob);
continue;
}

View File

@ -449,9 +449,18 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
return TSDB_CODE_SUCCESS;
}
int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32_t code) {
if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL) {
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWMsg *qwMsg, int32_t code) {
if (QUERY_RSP_POLICY_QUICK != tsQueryRspPolicy) {
return TSDB_CODE_SUCCESS;
}
SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
if (!QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
goto _return;
}
void *rsp = NULL;
int32_t dataLen = 0;
int32_t rawLen = 0;
@ -467,7 +476,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32
}
if (NULL == rsp && TSDB_CODE_SUCCESS == code) {
return TSDB_CODE_SUCCESS;
goto _return;
}
if (NULL != rsp) {
@ -485,15 +494,19 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32
qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
QW_ERR_RET(qwBuildAndSendFetchRsp(ctx, ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code));
QW_ERR_JRET(qwBuildAndSendFetchRsp(ctx, ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code));
rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
dataLen);
}
_return:
if (ctx) {
qwReleaseTaskCtx(mgmt, ctx);
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg) {
@ -748,6 +761,8 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
ctx->ctrlConnInfo = qwMsg->connInfo;
ctx->sId = sId;
ctx->phase = -1;
@ -768,6 +783,7 @@ _return:
(void)qwDropTask(QW_FPARAMS());
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
qwReleaseTaskCtx(mgmt, ctx);
}
@ -848,7 +864,7 @@ _return:
input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
QW_ERR_RET(qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code));
QW_ERR_RET(qwQuickRspFetchReq(QW_FPARAMS(), qwMsg, code));
QW_RET(TSDB_CODE_SUCCESS);
}