enh(stream): add parameter to limit the stream sink task.

This commit is contained in:
Haojun Liao 2023-10-16 22:52:39 +08:00
parent 62c58ecd8b
commit 164159f293
6 changed files with 11 additions and 19 deletions

View File

@ -194,6 +194,7 @@ extern int64_t tsWalFsyncDataSizeLimit;
extern int32_t tsTransPullupInterval; extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval; extern int32_t tsMqRebalanceInterval;
extern int32_t tsStreamCheckpointTickInterval; extern int32_t tsStreamCheckpointTickInterval;
extern double tsSinkDataRate;
extern int32_t tsStreamNodeCheckInterval; extern int32_t tsStreamNodeCheckInterval;
extern int32_t tsTtlUnit; extern int32_t tsTtlUnit;
extern int32_t tsTtlPushIntervalSec; extern int32_t tsTtlPushIntervalSec;
@ -202,9 +203,6 @@ extern int32_t tsTrimVDbIntervalSec;
extern int32_t tsGrantHBInterval; extern int32_t tsGrantHBInterval;
extern int32_t tsUptimeInterval; extern int32_t tsUptimeInterval;
extern int32_t tsRpcRetryLimit;
extern int32_t tsRpcRetryInterval;
extern bool tsDisableStream; extern bool tsDisableStream;
extern int64_t tsStreamBufferSize; extern int64_t tsStreamBufferSize;
extern bool tsFilterScalarMode; extern bool tsFilterScalarMode;

View File

@ -714,7 +714,6 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
int32_t streamQueueGetAvailableSpace(const SStreamQueue* pQueue, int32_t* availNum, double* availSize);
// common // common
int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask);

View File

@ -245,6 +245,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
int32_t tsTransPullupInterval = 2; int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 300; int32_t tsStreamCheckpointTickInterval = 300;
double tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 30; int32_t tsStreamNodeCheckInterval = 30;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;
int32_t tsTtlPushIntervalSec = 10; int32_t tsTtlPushIntervalSec = 10;
@ -651,6 +652,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddDouble(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0)
return -1; return -1;

View File

@ -122,7 +122,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate);
STaskId streamTaskExtractKey(const SStreamTask* pTask); STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);

View File

@ -18,7 +18,7 @@
#define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_STREAM_EXEC_BATCH_NUM 32
#define MAX_SMOOTH_BURST_RATIO 5 // 5 sec #define MAX_SMOOTH_BURST_RATIO 5 // 5 sec
#define WAIT_FOR_DURATION 40 #define WAIT_FOR_DURATION 40
#define SINK_TASK_IDLE_DURATION 200 // 200 ms #define OUTPUT_QUEUE_FULL_WAIT_DURATION 500 // 500 ms
// todo refactor: // todo refactor:
// read data from input queue // read data from input queue
@ -119,14 +119,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
return numOfItems1 + numOfItems2; return numOfItems1 + numOfItems2;
} }
int32_t streamQueueGetAvailableSpace(const SStreamQueue* pQueue, int32_t* availNum, double* availSize) {
int32_t num = streamQueueGetNumOfItems(pQueue);
*availNum = STREAM_TASK_QUEUE_CAPACITY - num;
*availSize = STREAM_TASK_QUEUE_CAPACITY_IN_SIZE - taosQueueMemorySize(pQueue->pQueue);
return 0;
}
// todo: fix it: data in Qall is not included here // todo: fix it: data in Qall is not included here
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) { int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
return taosQueueMemorySize(pQueue->pQueue); return taosQueueMemorySize(pQueue->pQueue);
@ -362,9 +354,10 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue); int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
// let's wait for there are enough space to hold this result pBlock // let's wait for there are enough space to hold this result pBlock
stDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, stDebug("s-task:%s outputQ is full, wait for %dms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr,
total, size); OUTPUT_QUEUE_FULL_WAIT_DURATION, total, size);
taosMsleep(500);
taosMsleep(OUTPUT_QUEUE_FULL_WAIT_DURATION);
} }
int32_t code = taosWriteQitem(pQueue, pBlock); int32_t code = taosWriteQitem(pQueue, pBlock);
@ -381,7 +374,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate) { int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate) {
if (numCap < 10 || numRate < 10 || pBucket == NULL) { if (numCap < 10 || numRate < 10 || pBucket == NULL) {
stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate); stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;

View File

@ -427,7 +427,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
// 2MiB per second for sink task // 2MiB per second for sink task
// 50 times sink operator per second // 50 times sink operator per second
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, 2); streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate);
TdThreadMutexAttr attr = {0}; TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr); int code = taosThreadMutexAttrInit(&attr);