diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 0052917eb1..310617670f 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -441,34 +441,38 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, int32_t code) { - QW_LOCK(QW_WRITE, &ctx->lock); - if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - QW_UNLOCK(QW_WRITE, &ctx->lock); - - void *rsp = NULL; - int32_t dataLen = 0; - SOutputData sOutput = {0}; - if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) { - return TSDB_CODE_SUCCESS; - } - - if (rsp) { - bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); - - qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); - if (qComplete) { - atomic_store_8((int8_t *)&ctx->queryEnd, true); + if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL) { + QW_LOCK(QW_WRITE, &ctx->lock); + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); + QW_UNLOCK(QW_WRITE, &ctx->lock); + + void *rsp = NULL; + int32_t dataLen = 0; + SOutputData sOutput = {0}; + if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) { + return TSDB_CODE_SUCCESS; } - qwMsg->connInfo = ctx->dataConnInfo; - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); + if (rsp) { + bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); - qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); - rsp = NULL; + qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); + if (qComplete) { + atomic_store_8((int8_t *)&ctx->queryEnd, true); + } - QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), - dataLen); + qwMsg->connInfo = ctx->dataConnInfo; + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); + + qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + rsp = NULL; + + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), + dataLen); + } else { + QW_UNLOCK(QW_WRITE, &ctx->lock); + } } else { QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); }