enh(stream): add token bucket to limit the rate of sink operation.

This commit is contained in:
Haojun Liao 2023-09-07 09:58:48 +08:00
parent 2b84e0b02e
commit b240e91c91
5 changed files with 64 additions and 2 deletions

View File

@ -326,6 +326,13 @@ typedef struct {
int64_t sinkStart;
} STaskTimestamp;
typedef struct STokenBucket {
int32_t capacity; // total capacity
int64_t fillTimestamp;// fill timestamp
int32_t numOfToken; // total available tokens
int32_t rate; // number of token per second
} STokenBucket;
struct SStreamTask {
int64_t ver;
SStreamTaskId id;
@ -354,6 +361,7 @@ struct SStreamTask {
STaskSinkFetch fetchSink;
};
SSinkTaskRecorder sinkRecorder;
STokenBucket tokenBucket;
void* launchTaskTimer;
SMsgCb* pMsgCb; // msg handle

View File

@ -25,10 +25,12 @@ typedef struct STableSinkInfo {
} STableSinkInfo;
static int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid,
SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData);
SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData);
static int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid);
static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
static void fillBucket(STokenBucket* pBucket);
static bool hasAvailableToken(STokenBucket* pBucket);
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) {
@ -692,4 +694,4 @@ int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbF
taosArrayDestroy(pVals);
return code;
}
}

View File

@ -77,6 +77,8 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate);
#ifdef __cplusplus
}
#endif

View File

@ -29,6 +29,8 @@ typedef struct SQueueReader {
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
} SQueueReader;
static bool streamTaskHasAvailableToken(STokenBucket* pBucket);
static void streamQueueCleanup(SStreamQueue* pQueue) {
void* qItem = NULL;
while ((qItem = streamQueueNextItem(pQueue)) != NULL) {
@ -175,6 +177,14 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
return TSDB_CODE_SUCCESS;
}
STokenBucket* pBucket = &pTask->tokenBucket;
bool has = streamTaskHasAvailableToken(pBucket);
if (!has) { // no available token in th bucket, ignore this execution
qInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr,
pBucket->capacity, pBucket->rate);
return TSDB_CODE_SUCCESS;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
if (qItem == NULL) {
qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
@ -320,3 +330,42 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return 0;
}
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) {
if (cap < 100 || rate < 50 || pBucket == NULL) {
qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate);
return TSDB_CODE_INVALID_PARA;
}
pBucket->capacity = cap;
pBucket->rate = rate;
pBucket->numOfToken = cap;
pBucket->fillTimestamp = taosGetTimestampMs();
return TSDB_CODE_SUCCESS;
}
static void fillBucket(STokenBucket* pBucket) {
int64_t now = taosGetTimestampMs();
int64_t delta = now - pBucket->fillTimestamp;
int32_t inc = (delta / 1000.0) * pBucket->rate;
if (inc > 0) {
if ((pBucket->numOfToken + inc) < pBucket->capacity) {
pBucket->numOfToken += inc;
} else {
pBucket->numOfToken = pBucket->capacity;
}
pBucket->fillTimestamp = now;
}
}
bool streamTaskHasAvailableToken(STokenBucket* pBucket) {
fillBucket(pBucket);
bool hasToken = pBucket->numOfToken > 0;
if (hasToken) {
return true;
} else {
return false;
}
}

View File

@ -385,6 +385,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->dataRange.range.minVer = ver;
pTask->pMsgCb = pMsgCb;
streamTaskInitTokenBucket(&pTask->tokenBucket, 400, 200);
taosThreadMutexInit(&pTask->lock, NULL);
streamTaskOpenAllUpstreamInput(pTask);