fix: quick rsp issue
This commit is contained in:
parent
8ecf5bdb33
commit
8ad82f07b0
|
@ -254,6 +254,7 @@ bool qwTaskNotInExec(SQWTaskCtx *ctx) {
|
|||
return false;
|
||||
}
|
||||
|
||||
|
||||
int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
|
||||
int32_t taskNum = 0;
|
||||
|
||||
|
@ -438,6 +439,44 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, int32_t code) {
|
||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||
if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||
|
||||
void *rsp = NULL;
|
||||
int32_t dataLen = 0;
|
||||
SOutputData sOutput = {0};
|
||||
if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (rsp) {
|
||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||
|
||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
||||
if (qComplete) {
|
||||
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
||||
}
|
||||
|
||||
qwMsg->connInfo = ctx->dataConnInfo;
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||
|
||||
qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
|
||||
rsp = NULL;
|
||||
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||
dataLen);
|
||||
} else {
|
||||
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
|
||||
int32_t code = 0;
|
||||
SQWTaskCtx *ctx = NULL;
|
||||
|
@ -705,32 +744,7 @@ _return:
|
|||
input.msgType = qwMsg->msgType;
|
||||
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
||||
|
||||
if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
||||
void *rsp = NULL;
|
||||
int32_t dataLen = 0;
|
||||
SOutputData sOutput = {0};
|
||||
if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (rsp) {
|
||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||
|
||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
||||
if (qComplete) {
|
||||
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
||||
}
|
||||
|
||||
qwMsg->connInfo = ctx->dataConnInfo;
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||
|
||||
qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
|
||||
rsp = NULL;
|
||||
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||
dataLen);
|
||||
}
|
||||
}
|
||||
qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code);
|
||||
|
||||
QW_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
@ -900,6 +914,8 @@ _return:
|
|||
qwFreeFetchRsp(rsp);
|
||||
rsp = NULL;
|
||||
}
|
||||
} else {
|
||||
qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code);
|
||||
}
|
||||
|
||||
QW_RET(TSDB_CODE_SUCCESS);
|
||||
|
|
Loading…
Reference in New Issue