enh(stream): add quota limitation for sink task.

This commit is contained in:
Haojun Liao 2023-09-26 23:29:54 +08:00
parent 890d658aea
commit aec15ae39f
4 changed files with 102 additions and 37 deletions

View File

@ -49,10 +49,13 @@ typedef struct SStreamContinueExecInfo {
} SStreamContinueExecInfo;
struct STokenBucket {
int32_t capacity; // total capacity
int64_t fillTimestamp;// fill timestamp
int32_t numOfToken; // total available tokens
int32_t rate; // number of token per second
int32_t numCapacity; // total capacity, available token per second
int32_t numOfToken; // total available tokens
int32_t numRate; // number of token per second
double bytesCapacity; // available capacity for maximum input size; units appear to be MiB per second (see SIZE_IN_MiB usage), not KiB — confirm
double bytesRemain; // not consumed bytes per second
double bytesRate; // number of token per second
int64_t fillTimestamp; // fill timestamp
};
struct STaskTimer {
@ -89,7 +92,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize);
int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue);
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
@ -103,7 +106,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate);
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);

View File

@ -524,6 +524,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
stDebug("s-task:%s start to extract data block from inputQ", id);
while (1) {
int32_t blockSize = 0;
int32_t numOfBlocks = 0;
SStreamQueueItem* pInput = NULL;
if (streamTaskShouldStop(&pTask->status)) {
@ -531,7 +532,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
break;
}
/*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks);
/*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
if (pInput == NULL) {
ASSERT(numOfBlocks == 0);
return 0;
@ -555,9 +556,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// here only handle the data block sink operation
if (type == STREAM_INPUT__DATA_BLOCK) {
int32_t blockSize = streamQueueItemGetSize(pInput);
pTask->execInfo.sink.dataSize += blockSize;
stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
continue;

View File

@ -20,6 +20,7 @@
#define STREAM_TASK_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50)
#define MAX_SMOOTH_BURST_RATIO 10 // smooth-burst window: up to 10 sec of bytesRate may accumulate as quota
// todo refactor:
// read data from input queue
@ -30,7 +31,9 @@ typedef struct SQueueReader {
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
} SQueueReader;
static bool streamTaskHasAvailableToken(STokenBucket* pBucket);
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket);
static void streamTaskPutbackToken(STokenBucket* pBucket);
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
static void streamQueueCleanup(SStreamQueue* pQueue) {
void* qItem = NULL;
@ -147,15 +150,19 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
}
}
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
int32_t* blockSize) {
int32_t retryTimes = 0;
int32_t MAX_RETRY_TIMES = 5;
const char* id = pTask->id.idStr;
int32_t taskLevel = pTask->info.taskLevel;
*numOfBlocks = 0;
// no available token in bucket for sink task, let's wait
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskHasAvailableToken(pTask->pTokenBucket))) {
*pInput = NULL;
*numOfBlocks = 0;
*blockSize = 0;
// no available token in bucket for sink task, let's wait for a little bit
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket))) {
stDebug("s-task:%s no available token in bucket for sink data, wait", id);
return TSDB_CODE_SUCCESS;
}
@ -172,6 +179,17 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
taosMsleep(10);
continue;
}
// restore the token to bucket
if (*numOfBlocks > 0) {
*blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) {
streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
}
} else {
streamTaskPutbackToken(pTask->pTokenBucket);
}
return TSDB_CODE_SUCCESS;
}
@ -179,17 +197,24 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
int8_t type = qItem->type;
if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE) {
const char* p = streamQueueItemGetTypeStr(qItem->type);
const char* p = streamQueueItemGetTypeStr(type);
if (*pInput == NULL) {
stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
// restore the token to bucket in case of checkpoint/trans-state msg
streamTaskPutbackToken(pTask->pTokenBucket);
*blockSize = 0;
*numOfBlocks = 1;
*pInput = qItem;
return TSDB_CODE_SUCCESS;
} else {
// previous existed blocks needs to be handle, before handle the checkpoint msg block
} else { // previous existed blocks needs to be handle, before handle the checkpoint msg block
stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
*blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) {
streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
}
streamQueueProcessFail(pTask->inputInfo.queue);
return TSDB_CODE_SUCCESS;
}
@ -198,7 +223,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
ASSERT((*numOfBlocks) == 0);
*pInput = qItem;
} else {
// todo we need to sort the data block, instead of just appending into the array list.
// merge current block failed, let's handle the already merged blocks.
void* newRet = streamMergeQueueItem(*pInput, qItem);
if (newRet == NULL) {
if (terrno != 0) {
@ -206,6 +231,11 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
tstrerror(terrno));
}
*blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) {
streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
}
streamQueueProcessFail(pTask->inputInfo.queue);
return TSDB_CODE_SUCCESS;
}
@ -218,6 +248,12 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
*blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) {
streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize);
}
return TSDB_CODE_SUCCESS;
}
}
@ -340,43 +376,68 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) {
if (cap < 50 || rate < 50 || pBucket == NULL) {
stError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate) {
if (numCap < 10 || numRate < 10 || pBucket == NULL) {
stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
return TSDB_CODE_INVALID_PARA;
}
pBucket->capacity = cap;
pBucket->rate = rate;
pBucket->numOfToken = cap;
pBucket->numCapacity = numCap;
pBucket->numOfToken = numCap;
pBucket->numRate = numRate;
pBucket->bytesRate = bytesRate;
pBucket->bytesCapacity = bytesRate * MAX_SMOOTH_BURST_RATIO;
pBucket->bytesRemain = pBucket->bytesCapacity;
pBucket->fillTimestamp = taosGetTimestampMs();
return TSDB_CODE_SUCCESS;
}
static void fillBucket(STokenBucket* pBucket) {
static void fillTokenBucket(STokenBucket* pBucket) {
int64_t now = taosGetTimestampMs();
int64_t delta = now - pBucket->fillTimestamp;
ASSERT(pBucket->numOfToken >= 0);
int32_t inc = (delta / 1000.0) * pBucket->rate;
if (inc > 0) {
if ((pBucket->numOfToken + inc) < pBucket->capacity) {
pBucket->numOfToken += inc;
int32_t incNum = (delta / 1000.0) * pBucket->numRate;
if (incNum > 0) {
if ((pBucket->numOfToken + incNum) < pBucket->numCapacity) {
pBucket->numOfToken += incNum;
} else {
pBucket->numOfToken = pBucket->capacity;
pBucket->numOfToken = pBucket->numCapacity;
}
pBucket->fillTimestamp = now;
stDebug("new token available, current:%d, inc:%d ts:%"PRId64, pBucket->numOfToken, inc, now);
stDebug("new token available, current:%d, inc:%d ts:%" PRId64, pBucket->numOfToken, incNum, now);
}
// increase the new available quota as time goes on
double incSize = (delta / 1000.0) * pBucket->bytesRate;
if (incSize > 0) {
pBucket->bytesRemain = MIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity);
stDebug("new bytes token available, current:%.2fKiB, inc:%.2fKiB ts:%" PRId64, pBucket->bytesRemain, incSize, now);
}
}
bool streamTaskHasAvailableToken(STokenBucket* pBucket) {
fillBucket(pBucket);
bool streamTaskExtractAvailableToken(STokenBucket* pBucket) {
fillTokenBucket(pBucket);
if (pBucket->numOfToken > 0) {
--pBucket->numOfToken;
return true;
if (pBucket->bytesRemain > 0) {
pBucket->numOfToken -= 1;
return true;
} else { // no available size quota now
return false;
}
} else {
return false;
}
}
// Return one unconsumed token to the bucket (used when a token was taken but
// no data block was actually dequeued), never exceeding the configured
// token capacity.
void streamTaskPutbackToken(STokenBucket* pBucket) {
  int32_t restored = pBucket->numOfToken + 1;
  pBucket->numOfToken = (restored > pBucket->numCapacity) ? pBucket->numCapacity : restored;
}
// Deduct a dequeued payload from the sink task's byte quota. `bytes` is a raw
// byte count; SIZE_IN_MiB converts it into the bucket's accounting unit.
// NOTE(review): the previous comment said "size in KB" but the macro is
// SIZE_IN_MiB — units look like MiB; confirm against SIZE_IN_MiB's definition.
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) {
  double consumed = SIZE_IN_MiB(bytes);
  pBucket->bytesRemain -= consumed;
}

View File

@ -431,7 +431,9 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
return TSDB_CODE_OUT_OF_MEMORY;
}
streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50);
// 2MiB per second for sink task
// 50 times sink operator per second
streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50, 2);
TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr);