diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 4644c38ec4..dc125f5371 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -194,6 +194,7 @@ extern int64_t tsWalFsyncDataSizeLimit; extern int32_t tsTransPullupInterval; extern int32_t tsMqRebalanceInterval; extern int32_t tsStreamCheckpointTickInterval; +extern double tsSinkDataRate; extern int32_t tsStreamNodeCheckInterval; extern int32_t tsTtlUnit; extern int32_t tsTtlPushIntervalSec; @@ -202,9 +203,6 @@ extern int32_t tsTrimVDbIntervalSec; extern int32_t tsGrantHBInterval; extern int32_t tsUptimeInterval; -extern int32_t tsRpcRetryLimit; -extern int32_t tsRpcRetryInterval; - extern bool tsDisableStream; extern int64_t tsStreamBufferSize; extern bool tsFilterScalarMode; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 629efa00b3..6cbe6ad7da 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -714,7 +714,6 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); -int32_t streamQueueGetAvailableSpace(const SStreamQueue* pQueue, int32_t* availNum, double* availSize); // common int32_t streamRestoreParam(SStreamTask* pTask); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3d7b38161a..77684552d4 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -245,6 +245,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; int32_t tsStreamCheckpointTickInterval = 300; +double tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 30; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; @@ -651,6 +652,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddDouble(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 4cd8319a07..d98fa2f436 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -122,7 +122,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask); -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate); +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate); STaskId streamTaskExtractKey(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index ae285046ef..e396ac77b4 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -18,7 +18,7 @@ #define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_SMOOTH_BURST_RATIO 5 // 5 sec #define WAIT_FOR_DURATION 40 -#define SINK_TASK_IDLE_DURATION 200 // 200 ms +#define OUTPUT_QUEUE_FULL_WAIT_DURATION 500 // 500 ms // todo refactor: // read data from input queue @@ -119,14 +119,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) { return numOfItems1 + numOfItems2; } -int32_t streamQueueGetAvailableSpace(const SStreamQueue* pQueue, int32_t* availNum, double* availSize) { - int32_t num = streamQueueGetNumOfItems(pQueue); - *availNum = STREAM_TASK_QUEUE_CAPACITY - num; - - *availSize = STREAM_TASK_QUEUE_CAPACITY_IN_SIZE - taosQueueMemorySize(pQueue->pQueue); - return 0; -} - // todo: fix it: data in Qall is not included here int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) { return taosQueueMemorySize(pQueue->pQueue); @@ -362,9 +354,10 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); // let's wait for there are enough space to hold this result pBlock - stDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, - total, size); - taosMsleep(500); + stDebug("s-task:%s outputQ is full, wait for %dms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, + OUTPUT_QUEUE_FULL_WAIT_DURATION, total, size); + + taosMsleep(OUTPUT_QUEUE_FULL_WAIT_DURATION); } int32_t code = taosWriteQitem(pQueue, pBlock); @@ -381,7 +374,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc return TSDB_CODE_SUCCESS; } -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate) { +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate) { if (numCap < 10 || numRate < 10 || pBucket == NULL) { stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate); return TSDB_CODE_INVALID_PARA; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 57103e5a96..fd9902406d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -427,7 +427,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i // 2MiB per second for sink task // 50 times sink operator per second - streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, 2); + streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate); TdThreadMutexAttr attr = {0}; int code = taosThreadMutexAttrInit(&attr);