diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index eb5a432235..18f7ed061a 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -548,17 +548,9 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
   return code;
 }
 
-static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) {
-  SStreamStatus* pStatus = &pTask->status;
-
-  pStatus->schedIdleTime = idleTime;
-  pStatus->lastExecTs = taosGetTimestampMs();
-}
-
-static void clearTaskSchedInfo(SStreamTask* pTask) {
-  SStreamStatus* pStatus = &pTask->status;
-  pStatus->schedIdleTime = 0;
-}
+static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
+static void clearTaskSchedInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; }
+static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }
 
 /**
  * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
@@ -574,21 +566,28 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
     int32_t           blockSize = 0;
     int32_t           numOfBlocks = 0;
     SStreamQueueItem* pInput = NULL;
+
     if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask)->state == TASK_STATUS__UNINIT)) {
       stDebug("s-task:%s stream task is stopped", id);
-      break;
+      return 0;
     }
 
     if (streamQueueIsFull(pTask->outputq.queue)) {
       stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id);
       setTaskSchedInfo(pTask, 500);
-      break;
+      return 0;
     }
 
     if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
-      stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr);
+      stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", id);
       setTaskSchedInfo(pTask, 1000);
-      break;
+      return 0;
+    }
+
+    if (taosGetTimestampMs() - pTask->status.lastExecTs < 50) {
+      stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id);
+      setTaskSchedInfo(pTask, 50);
+      return 0;
     }
 
     /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
@@ -597,9 +596,8 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
       return 0;
     }
-    int32_t type = pInput->type;
-
     // dispatch checkpoint msg to all downstream tasks
+    int32_t type = pInput->type;
     if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
       streamProcessCheckpointBlock(pTask, (SStreamDataBlock*)pInput);
       continue;
     }
@@ -646,7 +644,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
       if (ver != pInfo->processedVer) {
         stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
                 " ckpt:%" PRId64,
-                pTask->id.idStr, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
+                id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
         pInfo->processedVer = ver;
       }
 
@@ -659,7 +657,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
       // todo add lock
       SStreamTaskState* pState = streamTaskGetStatus(pTask);
       if (pState->state == TASK_STATUS__CK) {
-        stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, pState->name);
+        stDebug("s-task:%s checkpoint block received, set status:%s", id, pState->name);
         streamTaskBuildCheckpoint(pTask);
       } else {  // todo refactor
@@ -672,8 +670,8 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
 
         if (code != TSDB_CODE_SUCCESS) {
           // todo: let's retry send rsp to upstream/mnode
-          stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", pTask->id.idStr,
-                  0, tstrerror(code));
+          stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0,
+                  tstrerror(code));
         }
       }
 
@@ -774,19 +772,24 @@ int32_t streamResumeTask(SStreamTask* pTask) {
 
     // check if this task needs to be idle for a while
     if (pTask->status.schedIdleTime > 0) {
-      stDebug("s-task:%s idled, and will be invoked in %dms", id, pTask->status.schedIdleTime);
       schedTaskInFuture(pTask);
 
       taosThreadMutexUnlock(&pTask->lock);
+      setLastExecTs(pTask, taosGetTimestampMs());
       return 0;
     } else {
       int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
+
       if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
         atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
         taosThreadMutexUnlock(&pTask->lock);
 
+        setLastExecTs(pTask, taosGetTimestampMs());
+
         char* p = streamTaskGetStatus(pTask)->name;
-        stDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, p, pTask->status.schedStatus);
+        stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p,
+                pTask->status.schedStatus, pTask->status.lastExecTs);
+
         return 0;
       }
     }
diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c
index 5f55285ab1..d3287a6b96 100644
--- a/source/libs/stream/src/streamQueue.c
+++ b/source/libs/stream/src/streamQueue.c
@@ -17,8 +17,7 @@
 
 #define MAX_STREAM_EXEC_BATCH_NUM 32
 #define MAX_SMOOTH_BURST_RATIO    5  // 5 sec
-#define WAIT_FOR_DURATION         40
-#define OUTPUT_QUEUE_FULL_WAIT_DURATION 500  // 500 ms
+#define WAIT_FOR_DURATION 10
 
 // todo refactor:
 // read data from input queue
@@ -161,7 +160,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
     // no available token in bucket for sink task, let's wait for a little bit
     if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
       stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
-      taosMsleep(10);
+      taosMsleep(WAIT_FOR_DURATION);
       return TSDB_CODE_SUCCESS;
     }
 
@@ -173,11 +172,10 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
 
     SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue);
     if (qItem == NULL) {
-      if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
-        taosMsleep(WAIT_FOR_DURATION);
-        // todo remove it
-        continue;
-      }
+//      if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
+//        taosMsleep(WAIT_FOR_DURATION);
+//        continue;
+//      }
 
       // restore the token to bucket
       if (*numOfBlocks > 0) {
@@ -344,25 +342,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
 // the result should be put into the outputQ in any cases, the result may be lost otherwise.
 int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
   STaosQueue* pQueue = pTask->outputq.queue->pQueue;
-
-#if 0
-  // wait for the output queue is available for new data to dispatch
-  while (streamQueueIsFull(pTask->outputq.queue)) {
-    if (streamTaskShouldStop(pTask)) {
-      stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
-      return TSDB_CODE_STREAM_EXEC_CANCELLED;
-    }
-
-    int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
-    double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
-    // let's wait for there are enough space to hold this result pBlock
-    stDebug("s-task:%s outputQ is full, wait for %dms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr,
-            OUTPUT_QUEUE_FULL_WAIT_DURATION, total, size);
-
-    taosMsleep(OUTPUT_QUEUE_FULL_WAIT_DURATION);
-  }
-#endif
-
   int32_t code = taosWriteQitem(pQueue, pBlock);
   int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
 
diff --git a/tests/system-test/8-stream/at_once_state_window.py b/tests/system-test/8-stream/at_once_state_window.py
index fa9f4ddd78..60a4f4e890 100644
--- a/tests/system-test/8-stream/at_once_state_window.py
+++ b/tests/system-test/8-stream/at_once_state_window.py
@@ -59,6 +59,9 @@ class TDTestCase:
         self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} partition by {partition} {partition_elm_alias} state_window({stream_state_window})', trigger_mode="at_once", subtable_value=tb_subtable_value, fill_history_value=fill_history_value)
         range_times = self.tdCom.range_count
         state_window_max = self.tdCom.dataDict['state_window_max']
+
+        time.sleep(2)
+
         for i in range(range_times):
             state_window_value = random.randint(int((i)*state_window_max/range_times), int((i+1)*state_window_max/range_times))
             for i in range(2, range_times+3):