From 9d735e7f8baa5b3776afa1f27d96b4ac9601a43c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 Nov 2023 18:04:09 +0800 Subject: [PATCH] fix(stream): reduce the sink throughput. and fix the bug in status check. --- source/dnode/mnode/impl/src/mndStream.c | 7 +++++-- source/libs/stream/src/streamTask.c | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f727fe68fc..36d48144bc 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2775,8 +2775,11 @@ static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStream if (pTaskEntry->id.streamId == pId->streamId) { numOfTotal++; - if (pTaskEntry->id.taskId != pId->taskId && pTaskEntry->status == TASK_STATUS__READY) { - numOfReady++; + if (pTaskEntry->id.taskId != pId->taskId) { + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); + if (pEntry->status == TASK_STATUS__READY) { + numOfReady++; + } } } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 24228c0307..e1000bc68e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -448,7 +448,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i // 2MiB per second for sink task // 50 times sink operator per second - streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate, pTask->id.idStr); + streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr); TdThreadMutexAttr attr = {0}; int code = taosThreadMutexAttrInit(&attr);