enh(stream): add parameter to limit the stream sink task.

This commit is contained in:
Haojun Liao 2023-10-16 22:52:39 +08:00
parent a1aa2c9e0c
commit fdb6ec1fa3
6 changed files with 11 additions and 19 deletions

View File

@ -194,6 +194,7 @@ extern int64_t tsWalFsyncDataSizeLimit;
extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval;
extern int32_t tsStreamCheckpointTickInterval;
extern double tsSinkDataRate;
extern int32_t tsStreamNodeCheckInterval;
extern int32_t tsTtlUnit;
extern int32_t tsTtlPushIntervalSec;
@ -202,9 +203,6 @@ extern int32_t tsTrimVDbIntervalSec;
extern int32_t tsGrantHBInterval;
extern int32_t tsUptimeInterval;
extern int32_t tsRpcRetryLimit;
extern int32_t tsRpcRetryInterval;
extern bool tsDisableStream;
extern int64_t tsStreamBufferSize;
extern bool tsFilterScalarMode;

View File

@ -714,7 +714,6 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
int32_t streamQueueGetAvailableSpace(const SStreamQueue* pQueue, int32_t* availNum, double* availSize);
// common
int32_t streamRestoreParam(SStreamTask* pTask);

View File

@ -245,6 +245,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 300;
double tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 30;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushIntervalSec = 10;
@ -651,6 +652,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddDouble(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0)
return -1;

View File

@ -122,7 +122,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate);
STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);

View File

@ -18,7 +18,7 @@
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MAX_SMOOTH_BURST_RATIO 5 // 5 sec
#define WAIT_FOR_DURATION 40
#define SINK_TASK_IDLE_DURATION 200 // 200 ms
#define OUTPUT_QUEUE_FULL_WAIT_DURATION 500 // 500 ms
// todo refactor:
// read data from input queue
@ -119,14 +119,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
return numOfItems1 + numOfItems2;
}
int32_t streamQueueGetAvailableSpace(const SStreamQueue* pQueue, int32_t* availNum, double* availSize) {
int32_t num = streamQueueGetNumOfItems(pQueue);
*availNum = STREAM_TASK_QUEUE_CAPACITY - num;
*availSize = STREAM_TASK_QUEUE_CAPACITY_IN_SIZE - taosQueueMemorySize(pQueue->pQueue);
return 0;
}
// todo: fix it: data in Qall is not included here
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
return taosQueueMemorySize(pQueue->pQueue);
@ -362,9 +354,10 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
// let's wait for there are enough space to hold this result pBlock
stDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr,
total, size);
taosMsleep(500);
stDebug("s-task:%s outputQ is full, wait for %dms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr,
OUTPUT_QUEUE_FULL_WAIT_DURATION, total, size);
taosMsleep(OUTPUT_QUEUE_FULL_WAIT_DURATION);
}
int32_t code = taosWriteQitem(pQueue, pBlock);
@ -381,7 +374,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate) {
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate) {
if (numCap < 10 || numRate < 10 || pBucket == NULL) {
stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
return TSDB_CODE_INVALID_PARA;

View File

@ -427,7 +427,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
// 2MiB per second for sink task
// 50 times sink operator per second
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, 2);
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate);
TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr);