fix(stream): wait for 50ms when no token in bucket for sink task.

This commit is contained in:
Haojun Liao 2024-04-16 14:42:21 +08:00
parent 810eb22eb2
commit 8bdd43c305
6 changed files with 32 additions and 21 deletions

View File

@ -247,11 +247,11 @@ typedef enum {
TASK_SCANHISTORY_CONT = 0x1,
TASK_SCANHISTORY_QUIT = 0x2,
TASK_SCANHISTORY_REXEC = 0x3,
} EScanHistoryRet;
} EScanHistoryCode;
typedef struct {
EScanHistoryRet ret;
int32_t idleTime;
EScanHistoryCode ret;
int32_t idleTime;
} SScanhistoryDataInfo;
typedef struct {
@ -811,7 +811,7 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq*
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration);
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);

View File

@ -936,7 +936,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
streamReExecScanHistoryFuture(pTask, retInfo.idleTime);
streamExecScanHistoryInFuture(pTask, retInfo.idleTime);
} else {
SStreamTaskState* p = streamTaskGetStatus(pTask);
ETaskStatus s = p->state;

View File

@ -87,6 +87,11 @@ struct SStreamQueue {
int8_t status;
};
typedef enum {
EXEC_CONTINUE = 0x0,
EXEC_AFTER_IDLE = 0x1,
} EExtractDataCode;
extern void* streamTimer;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
@ -125,7 +130,7 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask);
void streamClearChkptReadyMsg(SStreamTask* pTask);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
int32_t* blockSize);
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);

View File

@ -20,6 +20,7 @@
#define STREAM_RESULT_DUMP_THRESHOLD 300
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data
#define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms
#define MIN_INVOKE_INTERVAL 50 // 50ms
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
@ -580,16 +581,21 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
return 0;
}
if (taosGetTimestampMs() - pTask->status.lastExecTs < 50) {
if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) {
stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id);
setTaskSchedInfo(pTask, 50);
setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL);
return 0;
}
/*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
if (pInput == NULL) {
ASSERT(numOfBlocks == 0);
return 0;
EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
if (ret == EXEC_AFTER_IDLE) {
ASSERT(pInput == NULL && numOfBlocks == 0);
setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL);
} else {
if (pInput == NULL) {
ASSERT(numOfBlocks == 0);
return 0;
}
}
// dispatch checkpoint msg to all downstream tasks

View File

@ -145,7 +145,7 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
}
}
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
int32_t* blockSize) {
const char* id = pTask->id.idStr;
int32_t taskLevel = pTask->info.taskLevel;
@ -157,13 +157,13 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
// no available token in bucket for sink task, let's wait for a little bit
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
return TSDB_CODE_SUCCESS;
return EXEC_AFTER_IDLE;
}
while (1) {
if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) {
stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks);
return TSDB_CODE_SUCCESS;
return EXEC_CONTINUE;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue);
@ -179,7 +179,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
}
return TSDB_CODE_SUCCESS;
return EXEC_CONTINUE;
}
// do not merge blocks for sink node and check point data block
@ -196,7 +196,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
*blockSize = 0;
*numOfBlocks = 1;
*pInput = qItem;
return TSDB_CODE_SUCCESS;
return EXEC_CONTINUE;
} else { // previous existed blocks needs to be handle, before handle the checkpoint msg block
stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
*blockSize = streamQueueItemGetSize(*pInput);
@ -205,7 +205,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
}
streamQueueProcessFail(pTask->inputq.queue);
return TSDB_CODE_SUCCESS;
return EXEC_CONTINUE;
}
} else {
if (*pInput == NULL) {
@ -226,7 +226,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
}
streamQueueProcessFail(pTask->inputq.queue);
return TSDB_CODE_SUCCESS;
return EXEC_CONTINUE;
}
*pInput = newRet;
@ -243,7 +243,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
}
return TSDB_CODE_SUCCESS;
return EXEC_CONTINUE;
}
}
}

View File

@ -109,7 +109,7 @@ static void doReExecScanhistory(void* param, void* tmrId) {
}
}
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) {
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
if (numOfTicks <= 0) {
numOfTicks = 1;