From be90d2c511a3cd3756eea011cbc75f4b9fe3de6d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 16 Apr 2023 23:07:54 +0800 Subject: [PATCH] fix(stream): disable stream task when no tasks exist. --- include/libs/stream/tstream.h | 1 - source/dnode/vnode/src/tq/tq.c | 11 ++++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 103f807191..9e0a2826c5 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -346,7 +346,6 @@ typedef struct SStreamMeta { int32_t vgId; SRWLatch lock; int8_t walScan; - bool quit; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a641d44dba..190a0893a8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -111,7 +111,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); - tqInitialize(pVnode->pTq); + tqInitialize(pTq); return pTq; } @@ -1281,6 +1281,13 @@ int32_t tqStartStreamTasks(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; taosWLockLatch(&pMeta->lock); + int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); + if (numOfTasks == 0) { + tqInfo("vgId:%d no stream tasks exists", vgId); + taosWUnLockLatch(&pTq->pStreamMeta->lock); + return 0; + } + pMeta->walScan += 1; if (pMeta->walScan > 1) { @@ -1297,8 +1304,6 @@ int32_t tqStartStreamTasks(STQ* pTq) { return -1; } - int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); - tqInfo("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); initOffsetForAllRestoreTasks(pTq);