From 7ce3d2c200896656c15a1876602a70b111f0ea06 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 7 Oct 2023 14:54:11 +0800 Subject: [PATCH] fix(stream): quit from loop when input queue is full. --- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- source/libs/stream/inc/streamInt.h | 8 ++++---- source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamQueue.c | 22 +++++++++++----------- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 8bee7d80a2..3dbb957151 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -311,7 +311,6 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0; qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); -// /*int32_t code = */streamSchedExec(pTask); return true; } else { qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal", @@ -390,6 +389,7 @@ static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32 } } else { tqError("s-task:%s append input queue failed, code: too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer); + break; } } } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 3ee88aaa1b..4cd8319a07 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -67,9 +67,9 @@ struct STokenBucket { int32_t numCapacity; // total capacity, available token per second int32_t numOfToken; // total available tokens int32_t numRate; // number of token per second - double bytesCapacity; // available capacity for maximum input size, KiloBytes per Second - double bytesRemain; // not consumed bytes per second - double bytesRate; // number of token per second + double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second + double quotaRemain; // not consumed bytes per second + double quotaRate; // number of token per second int64_t fillTimestamp; // fill timestamp }; @@ -122,7 +122,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask); -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate); +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate); STaskId streamTaskExtractKey(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 05a39fa3b8..b646b470ef 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -891,7 +891,7 @@ void metaHbToMnode(void* param, void* tmrId) { entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { - entry.sinkQuota = (*pTask)->pTokenBucket->bytesRate; + entry.sinkQuota = (*pTask)->pTokenBucket->quotaRate; entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 2975d1f0f3..8b595c2593 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -380,7 +380,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc return TSDB_CODE_SUCCESS; } -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate) { +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate) { if (numCap < 10 || numRate < 10 || pBucket == NULL) { stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate); return TSDB_CODE_INVALID_PARA; @@ -390,15 +390,15 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t pBucket->numOfToken = numCap; pBucket->numRate = numRate; - pBucket->bytesRate = bytesRate; - pBucket->bytesCapacity = bytesRate * MAX_SMOOTH_BURST_RATIO; - pBucket->bytesRemain = pBucket->bytesCapacity; + pBucket->quotaRate = quotaRate; + pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO; + pBucket->quotaRemain = pBucket->quotaCapacity; pBucket->fillTimestamp = taosGetTimestampMs(); return TSDB_CODE_SUCCESS; } -static void fillTokenBucket(STokenBucket* pBucket) { +static void fillTokenBucket(STokenBucket* pBucket, const char* id) { int64_t now = taosGetTimestampMs(); int64_t delta = now - pBucket->fillTimestamp; ASSERT(pBucket->numOfToken >= 0); @@ -410,15 +410,15 @@ static void fillTokenBucket(STokenBucket* pBucket) { } // increase the new available quota as time goes on - double incSize = (delta / 1000.0) * pBucket->bytesRate; + double incSize = (delta / 1000.0) * pBucket->quotaRate; if (incSize > 0) { - pBucket->bytesRemain = TMIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity); + pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity); } if (incNum > 0) { stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64 - " wait for %.2f Sec", - pBucket->numOfToken, incNum, pBucket->bytesRemain, incSize, now, delta / 1000.0); + " idle for %.2f Sec, %s", + pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta / 1000.0, id); } } @@ -426,7 +426,7 @@ bool streamTaskExtractAvailableToken(STokenBucket* pBucket) { fillTokenBucket(pBucket); if (pBucket->numOfToken > 0) { - if (pBucket->bytesRemain > 0) { + if (pBucket->quotaRemain > 0) { pBucket->numOfToken -= 1; return true; } else { // no available size quota now @@ -443,5 +443,5 @@ void streamTaskPutbackToken(STokenBucket* pBucket) { // size in KB void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { - pBucket->bytesRemain -= SIZE_IN_MiB(bytes); + pBucket->quotaRemain -= SIZE_IN_MiB(bytes); } \ No newline at end of file