From ebbf3af3b14464240c14aa8ea134e1970efba490 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 6 Feb 2023 13:46:47 +0800 Subject: [PATCH] fix: invalid msg order issue --- source/libs/qworker/src/qworker.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index fedaa96ed9..102a34517c 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -263,6 +263,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, SOutputData output = {0}; if (NULL == ctx->sinkHandle) { + pOutput->queryEnd = true; return TSDB_CODE_SUCCESS; } @@ -758,7 +759,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } QW_LOCK(QW_WRITE, &ctx->lock); - if (qComplete || (queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code) { + if (atomic_load_8((int8_t*)&ctx->queryEnd) || (queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code) { // Note: query is not running anymore QW_SET_PHASE(ctx, QW_PHASE_POST_CQUERY); QW_UNLOCK(QW_WRITE, &ctx->lock);