fix query end bug

This commit is contained in:
dapan1121 2022-01-25 16:21:23 +08:00
parent 16cd734b72
commit ac7008c516
1 changed files with 13 additions and 13 deletions

View File

@ -456,7 +456,7 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
int32_t code = 0; int32_t code = 0;
bool qcontinue = true; bool qcontinue = true;
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
@ -485,6 +485,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
if (TASK_TYPE_TEMP == ctx->taskType) { if (TASK_TYPE_TEMP == ctx->taskType) {
qwFreeTaskHandle(QW_FPARAMS(), taskHandle); qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
} }
if (queryEnd) {
*queryEnd = true;
}
break; break;
} }
@ -587,12 +591,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
QW_ERR_RET(code); QW_ERR_RET(code);
} }
queryEnd = pOutput->queryEnd; if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
pOutput->queryEnd = false;
if (DS_BUF_EMPTY == pOutput->bufStatus && queryEnd) {
pOutput->queryEnd = true;
QW_SCH_TASK_DLOG("task all fetched, status:%d", JOB_TASK_STATUS_SUCCEED); QW_SCH_TASK_DLOG("task all fetched, status:%d", JOB_TASK_STATUS_SUCCEED);
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
} }
@ -996,7 +995,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
atomic_store_ptr(&ctx->sinkHandle, sinkHandle); atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
if (pTaskInfo && sinkHandle) { if (pTaskInfo && sinkHandle) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
} }
_return: _return:
@ -1083,6 +1082,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
SQWPhaseOutput output = {0}; SQWPhaseOutput output = {0};
void *rsp = NULL; void *rsp = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
bool queryEnd = false;
do { do {
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output));
@ -1102,7 +1102,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
DataSinkHandle sinkHandle = ctx->sinkHandle; DataSinkHandle sinkHandle = ctx->sinkHandle;
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd));
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
SOutputData sOutput = {0}; SOutputData sOutput = {0};
@ -1114,10 +1114,6 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
// RC WARNING // RC WARNING
atomic_store_8(&ctx->queryContinue, 1); atomic_store_8(&ctx->queryContinue, 1);
} }
if (sOutput.queryEnd) {
needStop = true;
}
if (rsp) { if (rsp) {
qwBuildFetchRsp(rsp, &sOutput, dataLen); qwBuildFetchRsp(rsp, &sOutput, dataLen);
@ -1131,6 +1127,10 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
} }
if (queryEnd) {
needStop = true;
}
_return: _return:
if (NULL == ctx) { if (NULL == ctx) {