Merge pull request #29199 from taosdata/fix/job.remain
fix: quick response invalid write and job remain issues
This commit is contained in:
commit
1cc8c6d6cc
|
@ -153,6 +153,7 @@ int32_t qwRetrieveJobInfo(QW_FPARAMS_DEF, SQWJobInfo** ppJob) {
|
||||||
|
|
||||||
if (atomic_load_8(&pJob->destroyed)) {
|
if (atomic_load_8(&pJob->destroyed)) {
|
||||||
QW_UNLOCK(QW_READ, &pJob->lock);
|
QW_UNLOCK(QW_READ, &pJob->lock);
|
||||||
|
taosHashRelease(gQueryMgmt.pJobInfo, pJob);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -449,51 +449,64 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32_t code) {
|
int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWMsg *qwMsg, int32_t code) {
|
||||||
if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL) {
|
if (QUERY_RSP_POLICY_QUICK != tsQueryRspPolicy) {
|
||||||
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
return TSDB_CODE_SUCCESS;
|
||||||
void *rsp = NULL;
|
}
|
||||||
int32_t dataLen = 0;
|
|
||||||
int32_t rawLen = 0;
|
SQWTaskCtx *ctx = NULL;
|
||||||
SOutputData sOutput = {0};
|
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput);
|
if (!QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *rsp = NULL;
|
||||||
|
int32_t dataLen = 0;
|
||||||
|
int32_t rawLen = 0;
|
||||||
|
SOutputData sOutput = {0};
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code) {
|
||||||
|
qwFreeFetchRsp(rsp);
|
||||||
|
rsp = NULL;
|
||||||
|
dataLen = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == rsp && TSDB_CODE_SUCCESS == code) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL != rsp) {
|
||||||
|
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||||
|
|
||||||
|
qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete);
|
||||||
|
if (qComplete) {
|
||||||
|
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
||||||
|
if (!ctx->dynamicTask) {
|
||||||
|
qwFreeSinkHandle(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code) {
|
|
||||||
qwFreeFetchRsp(rsp);
|
|
||||||
rsp = NULL;
|
|
||||||
dataLen = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (NULL == rsp && TSDB_CODE_SUCCESS == code) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (NULL != rsp) {
|
|
||||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
|
||||||
|
|
||||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete);
|
|
||||||
if (qComplete) {
|
|
||||||
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
|
||||||
if (!ctx->dynamicTask) {
|
|
||||||
qwFreeSinkHandle(ctx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
qwMsg->connInfo = ctx->dataConnInfo;
|
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
|
||||||
|
|
||||||
QW_ERR_RET(qwBuildAndSendFetchRsp(ctx, ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code));
|
|
||||||
rsp = NULL;
|
|
||||||
|
|
||||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
|
||||||
dataLen);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
qwMsg->connInfo = ctx->dataConnInfo;
|
||||||
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||||
|
|
||||||
|
QW_ERR_JRET(qwBuildAndSendFetchRsp(ctx, ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code));
|
||||||
|
rsp = NULL;
|
||||||
|
|
||||||
|
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||||
|
dataLen);
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
if (ctx) {
|
||||||
|
qwReleaseTaskCtx(mgmt, ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg) {
|
int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg) {
|
||||||
|
@ -748,6 +761,8 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
|
|
||||||
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
||||||
|
|
||||||
|
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||||
|
|
||||||
ctx->ctrlConnInfo = qwMsg->connInfo;
|
ctx->ctrlConnInfo = qwMsg->connInfo;
|
||||||
ctx->sId = sId;
|
ctx->sId = sId;
|
||||||
ctx->phase = -1;
|
ctx->phase = -1;
|
||||||
|
@ -767,7 +782,8 @@ _return:
|
||||||
if (code) {
|
if (code) {
|
||||||
(void)qwDropTask(QW_FPARAMS());
|
(void)qwDropTask(QW_FPARAMS());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||||
qwReleaseTaskCtx(mgmt, ctx);
|
qwReleaseTaskCtx(mgmt, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -848,7 +864,7 @@ _return:
|
||||||
input.msgType = qwMsg->msgType;
|
input.msgType = qwMsg->msgType;
|
||||||
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
||||||
|
|
||||||
QW_ERR_RET(qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code));
|
QW_ERR_RET(qwQuickRspFetchReq(QW_FPARAMS(), qwMsg, code));
|
||||||
|
|
||||||
QW_RET(TSDB_CODE_SUCCESS);
|
QW_RET(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue