From 8bdd43c3057b53d7e5ad7e08efe39598d966ca49 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 Apr 2024 14:42:21 +0800 Subject: [PATCH] fix(stream): wait for 50ms when no token in bucket for sink task. --- include/libs/stream/tstream.h | 8 ++++---- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/stream/inc/streamInt.h | 7 ++++++- source/libs/stream/src/streamExec.c | 18 ++++++++++++------ source/libs/stream/src/streamQueue.c | 16 ++++++++-------- source/libs/stream/src/streamStart.c | 2 +- 6 files changed, 32 insertions(+), 21 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8bced20ca3..5e647c0f9e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -247,11 +247,11 @@ typedef enum { TASK_SCANHISTORY_CONT = 0x1, TASK_SCANHISTORY_QUIT = 0x2, TASK_SCANHISTORY_REXEC = 0x3, -} EScanHistoryRet; +} EScanHistoryCode; typedef struct { - EScanHistoryRet ret; - int32_t idleTime; + EScanHistoryCode ret; + int32_t idleTime; } SScanhistoryDataInfo; typedef struct { @@ -811,7 +811,7 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); -int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration); +int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 30ca4c7a36..ef3e643926 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -936,7 +936,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { atomic_store_32(&pTask->status.inScanHistorySentinel, 0); if (retInfo.ret == TASK_SCANHISTORY_REXEC) { - streamReExecScanHistoryFuture(pTask, retInfo.idleTime); + streamExecScanHistoryInFuture(pTask, retInfo.idleTime); } else { SStreamTaskState* p = streamTaskGetStatus(pTask); ETaskStatus s = p->state; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index ff56ae76b0..d1cc4fb710 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -87,6 +87,11 @@ struct SStreamQueue { int8_t status; }; +typedef enum { + EXEC_CONTINUE = 0x0, + EXEC_AFTER_IDLE = 0x1, +} EExtractDataCode; + extern void* streamTimer; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; @@ -125,7 +130,7 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask); void streamClearChkptReadyMsg(SStreamTask* pTask); -int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, +EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize); int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 24d1bf7603..d55382be83 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -20,6 +20,7 @@ #define STREAM_RESULT_DUMP_THRESHOLD 300 #define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data #define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms +#define MIN_INVOKE_INTERVAL 50 // 50ms static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); @@ -580,16 +581,21 @@ int32_t doStreamExecTask(SStreamTask* pTask) { return 0; } - if (taosGetTimestampMs() - pTask->status.lastExecTs < 50) { + if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) { stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id); - setTaskSchedInfo(pTask, 50); + setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL); return 0; } - /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); - if (pInput == NULL) { - ASSERT(numOfBlocks == 0); - return 0; + EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); + if (ret == EXEC_AFTER_IDLE) { + ASSERT(pInput == NULL && numOfBlocks == 0); + setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL); + } else { + if (pInput == NULL) { + ASSERT(numOfBlocks == 0); + return 0; + } } // dispatch checkpoint msg to all downstream tasks diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 0936d410bf..9f79501471 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -145,7 +145,7 @@ const char* streamQueueItemGetTypeStr(int32_t type) { } } -int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, +EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize) { const char* id = pTask->id.idStr; int32_t taskLevel = pTask->info.taskLevel; @@ -157,13 +157,13 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) { stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); - return TSDB_CODE_SUCCESS; + return EXEC_AFTER_IDLE; } while (1) { if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) { stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks); - return TSDB_CODE_SUCCESS; + return EXEC_CONTINUE; } SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue); @@ -179,7 +179,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu streamTaskPutbackToken(pTask->outputInfo.pTokenBucket); } - return TSDB_CODE_SUCCESS; + return EXEC_CONTINUE; } // do not merge blocks for sink node and check point data block @@ -196,7 +196,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu *blockSize = 0; *numOfBlocks = 1; *pInput = qItem; - return TSDB_CODE_SUCCESS; + return EXEC_CONTINUE; } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks); *blockSize = streamQueueItemGetSize(*pInput); @@ -205,7 +205,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } streamQueueProcessFail(pTask->inputq.queue); - return TSDB_CODE_SUCCESS; + return EXEC_CONTINUE; } } else { if (*pInput == NULL) { @@ -226,7 +226,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } streamQueueProcessFail(pTask->inputq.queue); - return TSDB_CODE_SUCCESS; + return EXEC_CONTINUE; } *pInput = newRet; @@ -243,7 +243,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); } - return TSDB_CODE_SUCCESS; + return EXEC_CONTINUE; } } } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 3abca307da..276997e5a8 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -109,7 +109,7 @@ static void doReExecScanhistory(void* param, void* tmrId) { } } -int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) { +int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE; if (numOfTicks <= 0) { numOfTicks = 1;