From 6c641cff39e7cb4d7947953e6ae4870be423e6df Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Apr 2023 09:37:16 +0800 Subject: [PATCH] fix(stream): fix error in start stream tasks. --- source/dnode/vnode/src/tq/tq.c | 2 -- source/dnode/vnode/src/tq/tqRestore.c | 6 +++--- source/libs/stream/src/streamMeta.c | 2 +- tests/script/tsim/stream/basic1.sim | 2 +- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8634029dd8..ae52db163f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1016,8 +1016,6 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { atomic_add_fetch_32(pRefBlock->dataRef, 1); if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) { - qError("s-task:%s stream task append submit into input queue failed", pTask->id.idStr); - atomic_sub_fetch_32(pRef, 1); taosFreeQitem(pRefBlock); continue; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 741e633ae4..20bbcf2eeb 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -15,7 +15,7 @@ #include "tq.h" -static int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, bool* pScanIdle); +static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); // this function should be executed by stream threads. // there is a case that the WAL increases more fast than the restore procedure, and this restore procedure @@ -31,7 +31,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { // check all restore tasks bool shouldIdle = true; - doCreateReqsByScanWal(pTq->pStreamMeta, &shouldIdle); + createStreamRunReq(pTq->pStreamMeta, &shouldIdle); int32_t times = 0; @@ -76,7 +76,7 @@ static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) { return pTaskIdList; } -int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, bool* pScanIdle) { +int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; bool noNewDataInWal = true; int32_t vgId = pStreamMeta->vgId; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7f06892a9f..065e9d280f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -192,7 +192,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask != NULL) { - if (streamTaskShouldStop(&(*ppTask)->status)) { + if (!streamTaskShouldStop(&(*ppTask)->status)) { atomic_add_fetch_32(&(*ppTask)->refCnt, 1); taosRUnLockLatch(&pMeta->lock); return *ppTask; diff --git a/tests/script/tsim/stream/basic1.sim b/tests/script/tsim/stream/basic1.sim index 15ca6bf7c9..65032817b3 100644 --- a/tests/script/tsim/stream/basic1.sim +++ b/tests/script/tsim/stream/basic1.sim @@ -37,7 +37,7 @@ if $loop_count == 20 then endi if $rows != 4 then - print =====rows=$rows, expect 4 + print =====rows=$rows expect 4 goto loop0 endi