fix: quick rsp lock issue

This commit is contained in:
dapan1121 2023-04-13 18:30:13 +08:00
parent 8ad82f07b0
commit 7800da65ae
1 changed files with 28 additions and 24 deletions

View File

@ -441,34 +441,38 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, int32_t code) {
QW_LOCK(QW_WRITE, &ctx->lock);
if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
QW_UNLOCK(QW_WRITE, &ctx->lock);
void *rsp = NULL;
int32_t dataLen = 0;
SOutputData sOutput = {0};
if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
return TSDB_CODE_SUCCESS;
}
if (rsp) {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true);
if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL) {
QW_LOCK(QW_WRITE, &ctx->lock);
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
QW_UNLOCK(QW_WRITE, &ctx->lock);
void *rsp = NULL;
int32_t dataLen = 0;
SOutputData sOutput = {0};
if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
return TSDB_CODE_SUCCESS;
}
qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
if (rsp) {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
rsp = NULL;
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true);
}
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
dataLen);
qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
dataLen);
} else {
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
} else {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
}