From a1d71f8fa73181a9b813c260dbbf82bfc51dfa52 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 25 Jul 2022 13:28:31 +0800 Subject: [PATCH 1/2] ci(stream): recover stream ci --- tests/script/jenkins/basic.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index a606311f3c..0c19e4a2fe 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -224,7 +224,7 @@ # ---- stream ./test.sh -f tsim/stream/basic0.sim -#./test.sh -f tsim/stream/basic1.sim +./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic2.sim ./test.sh -f tsim/stream/drop_stream.sim ./test.sh -f tsim/stream/distributeInterval0.sim From 2fcfa22520826752e919ff2e17a64b66fd06db30 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 25 Jul 2022 14:10:46 +0800 Subject: [PATCH 2/2] fix: fix out of block in data sink issue --- source/libs/qworker/src/qworker.c | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index ebccb7950c..d77e42388b 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -66,7 +66,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { return TSDB_CODE_SUCCESS; } -int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { +int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t code = 0; bool qcontinue = true; SSDataBlock *pRes = NULL; @@ -104,8 +104,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); - if (queryEnd) { - *queryEnd = true; + if (queryStop) { + *queryStop = true; } break; @@ -125,6 +125,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue); if (!qcontinue) { + if (queryStop) { + *queryStop = true; + } + break; } @@ -566,7 +570,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { SQWPhaseInput input = {0}; void *rsp = NULL; int32_t dataLen = 0; - bool queryEnd = false; + bool queryStop = false; do { QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); @@ -576,7 +580,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { atomic_store_8((int8_t *)&ctx->queryInQueue, 0); atomic_store_8((int8_t *)&ctx->queryContinue, 0); - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd)); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop)); if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { SOutputData sOutput = {0}; @@ -627,7 +631,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } QW_LOCK(QW_WRITE, &ctx->lock); - if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { + if (queryStop || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { // Note: query is not running anymore QW_SET_PHASE(ctx, 0); QW_UNLOCK(QW_WRITE, &ctx->lock);