From aec15ae39f9e631b6a8b66b7cbba4badfb976cea Mon Sep 17 00:00:00 2001
From: Haojun Liao
Date: Tue, 26 Sep 2023 23:29:54 +0800
Subject: [PATCH] enh(stream): add quota limitation for sink task.

---
 source/libs/stream/inc/streamInt.h   |  15 ++--
 source/libs/stream/src/streamExec.c  |   5 +-
 source/libs/stream/src/streamQueue.c | 115 ++++++++++++++++++++-------
 source/libs/stream/src/streamTask.c  |   4 +-
 4 files changed, 102 insertions(+), 37 deletions(-)

diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h
index dbe868b54f..dc5cf497db 100644
--- a/source/libs/stream/inc/streamInt.h
+++ b/source/libs/stream/inc/streamInt.h
@@ -49,10 +49,13 @@ typedef struct SStreamContinueExecInfo {
 } SStreamContinueExecInfo;

 struct STokenBucket {
-  int32_t capacity;      // total capacity
-  int64_t fillTimestamp;// fill timestamp
-  int32_t numOfToken;    // total available tokens
-  int32_t rate;          // number of token per second
+  int32_t numCapacity;   // total capacity, i.e. the maximum number of tokens the bucket can hold
+  int32_t numOfToken;    // currently available tokens
+  int32_t numRate;       // number of tokens added per second
+  double  bytesCapacity; // maximum quota of input data that can be accumulated, in MiB
+  double  bytesRemain;   // remaining (not yet consumed) quota of input data, in MiB
+  double  bytesRate;     // quota added per second, in MiB
+  int64_t fillTimestamp; // timestamp of the last refill
 };

 struct STaskTimer {
@@ -89,7 +92,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
 int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
 int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);

-int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks);
+int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize);
 int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue);
 int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
 void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
@@ -103,7 +106,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
 int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
 int32_t streamTransferStateToStreamTask(SStreamTask* pTask);

-int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate);
+int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate);

 SStreamQueue* streamQueueOpen(int64_t cap);
 void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index e46b094f60..a6f7ac27d4 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -524,6 +524,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
   stDebug("s-task:%s start to extract data block from inputQ", id);

   while (1) {
+    int32_t           blockSize = 0;
     int32_t           numOfBlocks = 0;
     SStreamQueueItem* pInput = NULL;
     if (streamTaskShouldStop(&pTask->status)) {
@@ -531,7 +532,7 @@
       break;
     }

-    /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks);
+    /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
     if (pInput == NULL) {
       ASSERT(numOfBlocks == 0);
       return 0;
     }
@@ -555,9 +556,7 @@
     // here only handle the data block sink operation
     if (type == STREAM_INPUT__DATA_BLOCK) {
-      int32_t blockSize = streamQueueItemGetSize(pInput);
       pTask->execInfo.sink.dataSize += blockSize;
-      stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
       doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
       continue;
diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c
index 61453cb54e..708b0572a4 100644
--- a/source/libs/stream/src/streamQueue.c
+++ b/source/libs/stream/src/streamQueue.c
@@ -20,6 +20,7 @@
 #define STREAM_TASK_QUEUE_CAPACITY                20480
 #define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE  (30)
 #define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50)
+#define MAX_SMOOTH_BURST_RATIO                    10  // up to 10 seconds worth of quota may accumulate

 // todo refactor:
 // read data from input queue
@@ -30,7 +31,9 @@ typedef struct SQueueReader {
   int32_t  waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
 } SQueueReader;

-static bool streamTaskHasAvailableToken(STokenBucket* pBucket);
+static bool streamTaskExtractAvailableToken(STokenBucket* pBucket);
+static void streamTaskPutbackToken(STokenBucket* pBucket);
+static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);

 static void streamQueueCleanup(SStreamQueue* pQueue) {
   void* qItem = NULL;
@@ -147,15 +150,19 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
   }
 }

-int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
+int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
+                                    int32_t* blockSize) {
   int32_t     retryTimes = 0;
   int32_t     MAX_RETRY_TIMES = 5;
   const char* id = pTask->id.idStr;
   int32_t     taskLevel = pTask->info.taskLevel;
-  *numOfBlocks = 0;

-  // no available token in bucket for sink task, let's wait
-  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskHasAvailableToken(pTask->pTokenBucket))) {
+  *pInput = NULL;
+  *numOfBlocks = 0;
+  *blockSize = 0;
+
+  // no available token in bucket for sink task, let's wait for a little bit
+  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket))) {
     stDebug("s-task:%s no available token in bucket for sink data, wait", id);
     return TSDB_CODE_SUCCESS;
   }
@@ -172,6 +179,17 @@
         taosMsleep(10);
         continue;
       }
+
+      // consume the quota for the extracted data, or restore the token if nothing was extracted
+      if (*numOfBlocks > 0) {
+        *blockSize = streamQueueItemGetSize(*pInput);
+        if (taskLevel == TASK_LEVEL__SINK) {
+          streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
+        }
+      } else {
+        streamTaskPutbackToken(pTask->pTokenBucket);
+      }
+
       return TSDB_CODE_SUCCESS;
     }

@@ -179,17 +197,24 @@
     int8_t type = qItem->type;
     if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
         type == STREAM_INPUT__TRANS_STATE) {
-      const char* p = streamQueueItemGetTypeStr(qItem->type);
+      const char* p = streamQueueItemGetTypeStr(type);

       if (*pInput == NULL) {
         stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
+
+        // restore the token to bucket in case of checkpoint/trans-state msg
+        streamTaskPutbackToken(pTask->pTokenBucket);
+
+        *blockSize = 0;
         *numOfBlocks = 1;
         *pInput = qItem;
         return TSDB_CODE_SUCCESS;
-      } else {
-        // previous existed blocks needs to be handle, before handle the checkpoint msg block
+      } else {  // previously extracted blocks need to be handled before the checkpoint msg block
         stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
+        *blockSize = streamQueueItemGetSize(*pInput);
+        if (taskLevel == TASK_LEVEL__SINK) {
+          streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
+        }
+
         streamQueueProcessFail(pTask->inputInfo.queue);
         return TSDB_CODE_SUCCESS;
       }
@@ -198,7 +223,7 @@
       ASSERT((*numOfBlocks) == 0);
       *pInput = qItem;
     } else {
-      // todo we need to sort the data block, instead of just appending into the array list.
+      // merge current block failed, let's handle the already merged blocks.
       void* newRet = streamMergeQueueItem(*pInput, qItem);
       if (newRet == NULL) {
         if (terrno != 0) {
@@ -206,6 +231,11 @@
                   tstrerror(terrno));
         }

+        *blockSize = streamQueueItemGetSize(*pInput);
+        if (taskLevel == TASK_LEVEL__SINK) {
+          streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
+        }
+
         streamQueueProcessFail(pTask->inputInfo.queue);
         return TSDB_CODE_SUCCESS;
       }
@@ -218,6 +248,12 @@
       if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
         stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
+
+        *blockSize = streamQueueItemGetSize(*pInput);
+        if (taskLevel == TASK_LEVEL__SINK) {
+          streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
+        }
+
         return TSDB_CODE_SUCCESS;
       }
     }
@@ -340,43 +376,68 @@
   return TSDB_CODE_SUCCESS;
 }

-int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) {
-  if (cap < 50 || rate < 50 || pBucket == NULL) {
-    stError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate);
+int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate) {
+  if (numCap < 10 || numRate < 10 || pBucket == NULL) {
+    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
     return TSDB_CODE_INVALID_PARA;
   }

-  pBucket->capacity = cap;
-  pBucket->rate = rate;
-  pBucket->numOfToken = cap;
+  pBucket->numCapacity = numCap;
+  pBucket->numOfToken = numCap;
+  pBucket->numRate = numRate;
+
+  pBucket->bytesRate = bytesRate;
+  pBucket->bytesCapacity = bytesRate * MAX_SMOOTH_BURST_RATIO;
+  pBucket->bytesRemain = pBucket->bytesCapacity;
+
   pBucket->fillTimestamp = taosGetTimestampMs();
   return TSDB_CODE_SUCCESS;
 }

-static void fillBucket(STokenBucket* pBucket) {
+static void fillTokenBucket(STokenBucket* pBucket) {
   int64_t now = taosGetTimestampMs();
   int64_t delta = now - pBucket->fillTimestamp;
   ASSERT(pBucket->numOfToken >= 0);

-  int32_t inc = (delta / 1000.0) * pBucket->rate;
-  if (inc > 0) {
-    if ((pBucket->numOfToken + inc) < pBucket->capacity) {
-      pBucket->numOfToken += inc;
+  int32_t incNum = (delta / 1000.0) * pBucket->numRate;
+  if (incNum > 0) {
+    if ((pBucket->numOfToken + incNum) < pBucket->numCapacity) {
+      pBucket->numOfToken += incNum;
     } else {
-      pBucket->numOfToken = pBucket->capacity;
+      pBucket->numOfToken = pBucket->numCapacity;
     }

     pBucket->fillTimestamp = now;
-    stDebug("new token available, current:%d, inc:%d ts:%"PRId64, pBucket->numOfToken, inc, now);
+    stDebug("new token available, current:%d, inc:%d ts:%" PRId64, pBucket->numOfToken, incNum, now);
+  }
+
+  // increase the new available quota as time goes on
+  double incSize = (delta / 1000.0) * pBucket->bytesRate;
+  if (incSize > 0) {
+    pBucket->bytesRemain = MIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity);
+    stDebug("new bytes token available, current:%.2fMiB, inc:%.2fMiB ts:%" PRId64, pBucket->bytesRemain, incSize, now);
+  }
 }

-bool streamTaskHasAvailableToken(STokenBucket* pBucket) {
-  fillBucket(pBucket);
+bool streamTaskExtractAvailableToken(STokenBucket* pBucket) {
+  fillTokenBucket(pBucket);
+
   if (pBucket->numOfToken > 0) {
-    --pBucket->numOfToken;
-    return true;
+    if (pBucket->bytesRemain > 0) {
+      pBucket->numOfToken -= 1;
+      return true;
+    } else {  // no available size quota now
+      return false;
+    }
   } else {
     return false;
   }
+}
+
+void streamTaskPutbackToken(STokenBucket* pBucket) {
+  pBucket->numOfToken = MIN(pBucket->numOfToken + 1, pBucket->numCapacity);
+}
+
+// bytes: size of the consumed data in bytes; it is converted to MiB before being deducted from the quota
+void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) {
+  pBucket->bytesRemain -= SIZE_IN_MiB(bytes);
 }
\ No newline at end of file
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index e5088e9c69..ea7e89cf1b 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -431,7 +431,9 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
     return TSDB_CODE_OUT_OF_MEMORY;
   }

-  streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50);
+  // the sink task is allowed at most 2MiB of data per second
+  // and at most 50 sink operations per second
+  streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50, 2);

   TdThreadMutexAttr attr = {0};
   int code = taosThreadMutexAttrInit(&attr);
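
Note (illustrative sketch, not part of the patch): the change turns the sink task's token bucket into a two-dimensional limiter. streamTaskExtractAvailableToken() admits an operation only when an operation token and some byte quota are both available, streamTaskPutbackToken() returns the token when nothing was actually dequeued or when a checkpoint/trans-state message is handled, and streamTaskConsumeQuota() charges the quota after the real block size is known, so one oversized batch may drive the quota negative and briefly stall the sink. The standalone C program below mirrors that flow with simplified, hypothetical names (TokenBucket, bucketTryTake, bucketPutBack, bucketConsumeBytes) and a fake clock; it is not TDengine code, but it uses the same 50 ops/sec and 2 MiB/sec figures the patch configures for sink tasks.

/*
 * Illustrative sketch only -- not part of the patch and not TDengine API.
 * The names below are made-up stand-ins for STokenBucket and the helpers added above.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define SMOOTH_BURST_RATIO 10  /* up to 10 seconds worth of byte quota may accumulate */

typedef struct {
  int32_t numCapacity;   /* maximum number of operation tokens */
  int32_t numOfToken;    /* currently available operation tokens */
  int32_t numRate;       /* tokens added per second */
  double  bytesCapacity; /* maximum byte quota, in MiB */
  double  bytesRemain;   /* remaining byte quota, in MiB */
  double  bytesRate;     /* byte quota added per second, in MiB */
  int64_t fillTimestamp; /* timestamp (ms) of the last token refill */
} TokenBucket;

static void bucketInit(TokenBucket* b, int32_t numCap, int32_t numRate, double bytesRate, int64_t nowMs) {
  b->numCapacity = numCap;
  b->numOfToken = numCap;
  b->numRate = numRate;
  b->bytesRate = bytesRate;
  b->bytesCapacity = bytesRate * SMOOTH_BURST_RATIO;
  b->bytesRemain = b->bytesCapacity;
  b->fillTimestamp = nowMs;
}

/* refill both dimensions from the elapsed time; like the patch, the timestamp only
 * advances once at least one whole operation token has been produced */
static void bucketRefill(TokenBucket* b, int64_t nowMs) {
  double  elapsedSec = (nowMs - b->fillTimestamp) / 1000.0;
  int32_t incNum = (int32_t)(elapsedSec * b->numRate);
  if (incNum > 0) {
    b->numOfToken = (b->numOfToken + incNum < b->numCapacity) ? (b->numOfToken + incNum) : b->numCapacity;
    b->fillTimestamp = nowMs;
  }

  double incBytes = elapsedSec * b->bytesRate;
  if (incBytes > 0) {
    double v = b->bytesRemain + incBytes;
    b->bytesRemain = (v < b->bytesCapacity) ? v : b->bytesCapacity;
  }
}

/* a sink operation is admitted only when an operation token AND some byte quota are left */
static bool bucketTryTake(TokenBucket* b, int64_t nowMs) {
  bucketRefill(b, nowMs);
  if (b->numOfToken > 0 && b->bytesRemain > 0) {
    b->numOfToken -= 1;
    return true;
  }
  return false;
}

/* return the token, e.g. when no data block was actually extracted from the input queue */
static void bucketPutBack(TokenBucket* b) {
  if (b->numOfToken < b->numCapacity) {
    b->numOfToken += 1;
  }
}

/* charge the byte quota once the real size of the extracted data is known (may go negative) */
static void bucketConsumeBytes(TokenBucket* b, double sizeMiB) { b->bytesRemain -= sizeMiB; }

int main(void) {
  TokenBucket b;
  int64_t     now = 0;              /* fake millisecond clock for the demo */
  bucketInit(&b, 50, 50, 2.0, now); /* same numbers the patch uses for sink tasks */

  int32_t accepted = 0;
  for (int32_t i = 0; i < 10; ++i) { /* pretend each batch is 5 MiB: the 20 MiB quota admits only 4 */
    if (bucketTryTake(&b, now)) {
      bucketConsumeBytes(&b, 5.0);
      accepted++;
    }
  }
  printf("accepted %d of 10 writes, remaining quota %.2f MiB\n", accepted, b.bytesRemain);

  now += 3000; /* three seconds later, 6 MiB of quota has been added back */
  printf("after 3s, next write admitted: %s\n", bucketTryTake(&b, now) ? "yes" : "no");
  return 0;
}

Sizing the byte capacity as rate * MAX_SMOOTH_BURST_RATIO lets the sink absorb a short burst (up to about 10 seconds worth of data) while still converging to the configured per-second rate over time.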