From ac7008c51694ab440d3117548fe4d9c4ce0c1a91 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 25 Jan 2022 16:21:23 +0800 Subject: [PATCH] fix query end bug --- source/libs/qworker/src/qworker.c | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index af422a6976..8ffa6cd60d 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -456,7 +456,7 @@ _return: QW_RET(code); } -int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { +int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { int32_t code = 0; bool qcontinue = true; SSDataBlock* pRes = NULL; @@ -485,6 +485,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { if (TASK_TYPE_TEMP == ctx->taskType) { qwFreeTaskHandle(QW_FPARAMS(), taskHandle); } + + if (queryEnd) { + *queryEnd = true; + } break; } @@ -587,12 +591,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void QW_ERR_RET(code); } - queryEnd = pOutput->queryEnd; - pOutput->queryEnd = false; - - if (DS_BUF_EMPTY == pOutput->bufStatus && queryEnd) { - pOutput->queryEnd = true; - + if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { QW_SCH_TASK_DLOG("task all fetched, status:%d", JOB_TASK_STATUS_SUCCEED); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); } @@ -996,7 +995,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { atomic_store_ptr(&ctx->sinkHandle, sinkHandle); if (pTaskInfo && sinkHandle) { - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx)); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); } _return: @@ -1083,6 +1082,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { SQWPhaseOutput output = {0}; void *rsp = NULL; int32_t dataLen = 0; + bool queryEnd = false; do { QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output)); @@ -1102,7 +1102,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { DataSinkHandle sinkHandle = ctx->sinkHandle; - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx)); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd)); if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { SOutputData sOutput = {0}; @@ -1114,10 +1114,6 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { // RC WARNING atomic_store_8(&ctx->queryContinue, 1); } - - if (sOutput.queryEnd) { - needStop = true; - } if (rsp) { qwBuildFetchRsp(rsp, &sOutput, dataLen); @@ -1131,6 +1127,10 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } } + if (queryEnd) { + needStop = true; + } + _return: if (NULL == ctx) {