From af1f61cd3902761b6655456ed60147cb128047dc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Feb 2024 19:05:30 +0800 Subject: [PATCH 1/3] fix(stream): fix the compatible issue when the fill-history exists. --- source/libs/stream/src/streamTask.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f3494377d6..c0cd297286 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -494,6 +494,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pRange->range.maxVer = ver; pRange->range.minVer = ver; } else { + // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task + // is set at the mnode. if (pTask->info.fillHistory == 1) { pChkInfo->checkpointVer = pRange->range.maxVer; pChkInfo->processedVer = pRange->range.maxVer; @@ -502,6 +504,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pChkInfo->checkpointVer = pRange->range.minVer - 1; pChkInfo->processedVer = pRange->range.minVer - 1; pChkInfo->nextProcessVer = pRange->range.minVer; + + { // for compatible purpose, remove it later + if (pRange->range.minVer == 0) { + pChkInfo->checkpointVer = 0; + pChkInfo->processedVer = 0; + stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr); + } + } } } } From d8ffa65b62c215f883fda66cb45456067198bfbd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 23 Feb 2024 09:12:27 +0800 Subject: [PATCH 2/3] fix(stream): fix the compatible issue when the fill-history exists. --- source/libs/stream/src/streamTask.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c0cd297286..b63dc50836 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -509,6 +509,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i if (pRange->range.minVer == 0) { pChkInfo->checkpointVer = 0; pChkInfo->processedVer = 0; + pChkInfo->nextProcessVer = 1; stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr); } } From 4248da14e7ed9ce03fe99611cf939cfed61fefed Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 23 Feb 2024 10:02:54 +0800 Subject: [PATCH 3/3] fix(stream): inc the default stream threads for each vnode. --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ee85a909e7..7a5d554b97 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfVnodeQueryThreads = 4; -float tsRatioOfVnodeStreamThreads = 0.5F; +float tsRatioOfVnodeStreamThreads = 1.5F; int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfQnodeQueryThreads = 4;