From 35b97cbd19edac6746c28e7fa658ff44bf42947b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 8 Sep 2023 23:50:41 +0800 Subject: [PATCH] fix(stream): set correct token in bucket. --- source/libs/stream/src/streamDispatch.c | 13 ++++++++++--- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamQueue.c | 8 ++++---- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 916cc6e9ee..d04f55628c 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1001,10 +1001,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // so the TASK_INPUT_STATUS_BLOCKED is rsp if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream - pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time + double el = 0; + if (pTask->msgInfo.blockingTs == 0) { + pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time + } else { + el = (taosGetTimestampMs() - pTask->msgInfo.blockingTs) / 1000.0; + } + int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); - qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data, ref:%d", - id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, ref); + qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 + " wait for %dms and retry dispatch data, total wait:%.2fSec ref:%d", + id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, el, ref); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // pipeline send data in output queue // this message has been sent successfully, let's try next one. diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ff667fa778..158727efea 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -84,7 +84,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i } if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { - qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr); + qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry exec task", pTask->id.idStr); taosMsleep(1000); continue; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 609a67fb95..b65063f49a 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -347,6 +347,7 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t ra static void fillBucket(STokenBucket* pBucket) { int64_t now = taosGetTimestampMs(); int64_t delta = now - pBucket->fillTimestamp; + ASSERT(pBucket->numOfToken >= 0); int32_t inc = (delta / 1000.0) * pBucket->rate; if (inc > 0) { @@ -363,10 +364,9 @@ static void fillBucket(STokenBucket* pBucket) { bool streamTaskHasAvailableToken(STokenBucket* pBucket) { fillBucket(pBucket); - bool hasToken = (--pBucket->numOfToken) > 0; - if (hasToken) { - qDebug("remain token:%d", pBucket->numOfToken); - return true; + if (pBucket->numOfToken > 0) { + qDebug("remain token:%d", pBucket->numOfToken-1); + return --pBucket->numOfToken; } else { return false; }