From 185fa75376c6fc9de82c02d9b4da50fbd49be1c7 Mon Sep 17 00:00:00 2001
From: Haojun Liao
Date: Wed, 3 Jan 2024 17:59:42 +0800
Subject: [PATCH] enh(stream): add task into timer to be idle during exec when
 the corresponding output queue is full.

---
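With this change a stream task no longer blocks its worker thread when its output
queue fills up. streamTaskPutDataIntoOutputQ() used to sleep and retry in a loop
(that loop is now disabled under "#if 0"); instead, doStreamExecTask() stops early,
records an idle period via setTaskSchedInfo(), and the task is re-scheduled through
a timer once that period has elapsed. Run requests now carry an explicit reqType
field, replacing the negative-taskId sentinels, and STaskSchedInfo keeps two timers:
pDelayTimer (the existing trigger timer, renamed from pTimer) and pIdleTimer (the
new idle/resume timer).

The resulting control flow, heavily condensed (locking, ref counting, logging and
error handling omitted; see the streamExec.c and tqCommon.c hunks below for the
real code):

    // 1) worker: when the outputQ is full, ask to idle instead of blocking
    if (streamQueueIsFull(pTask->outputq.queue)) {
      setTaskSchedInfo(pTask, 500);   // schedIdleTime = 500ms, lastExecTs = now
      break;
    }

    // 2) streamResumeTask(): schedIdleTime > 0, so arm the one-shot idle timer
    schedTaskInFuture(pTask);         // taosTmrStart(doStreamExecTaskHelper, schedIdleTime, ...)

    // 3) timer callback: hand the task back to the stream queue as a run request
    pRunReq->reqType = STREAM_EXEC_T_RESUME_TASK;
    tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);

    // 4) tqStreamTaskProcessRunReq(): dispatch on reqType and resume execution
    if (type == STREAM_EXEC_T_RESUME_TASK) {
      streamResumeTask(pTask);
    }
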
 include/libs/stream/tstream.h              | 23 ++--
 source/dnode/mnode/impl/src/mndStream.c    |  4 +-
 source/dnode/vnode/src/tq/tq.c             |  2 +-
 source/dnode/vnode/src/tq/tqStreamTask.c   |  6 +-
 source/dnode/vnode/src/tqCommon/tqCommon.c | 24 +++--
 source/libs/stream/src/stream.c            | 12 +--
 source/libs/stream/src/streamExec.c        | 118 +++++++++++++++++----
 source/libs/stream/src/streamMeta.c        |  4 +-
 source/libs/stream/src/streamQueue.c       | 11 +-
 source/libs/stream/src/streamTask.c        |  6 +-
 10 files changed, 158 insertions(+), 52 deletions(-)

diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index a6dac7f5ba..79fc2a87a7 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -50,10 +50,11 @@ extern "C" {
     (_t)->hTaskInfo.id.streamId = 0; \
   } while (0)
 
-#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
-#define STREAM_EXEC_START_ALL_TASKS_ID (-2)
-#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
-#define STREAM_EXEC_STOP_ALL_TASKS_ID (-4)
+#define STREAM_EXEC_T_EXTRACT_WAL_DATA (-1)
+#define STREAM_EXEC_T_START_ALL_TASKS (-2)
+#define STREAM_EXEC_T_RESTART_ALL_TASKS (-3)
+#define STREAM_EXEC_T_STOP_ALL_TASKS (-4)
+#define STREAM_EXEC_T_RESUME_TASK (-5)
 
 typedef struct SStreamTask SStreamTask;
 typedef struct SStreamQueue SStreamQueue;
@@ -81,14 +82,12 @@ typedef enum ETaskStatus {
   TASK_STATUS__HALT,   // pause, but not be manipulated by user command
   TASK_STATUS__PAUSE,  // pause
   TASK_STATUS__CK,     // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
-//  TASK_STATUS__STREAM_SCAN_HISTORY,
 } ETaskStatus;
 
 enum {
   TASK_SCHED_STATUS__INACTIVE = 1,
   TASK_SCHED_STATUS__WAITING,
   TASK_SCHED_STATUS__ACTIVE,
-  TASK_SCHED_STATUS__FAILED,
   TASK_SCHED_STATUS__DROPPING,
 };
 
@@ -322,10 +321,11 @@ typedef struct SStreamStatus {
   int8_t  taskStatus;
   int8_t  downstreamReady;  // downstream tasks are all ready now, if this flag is set
   int8_t  schedStatus;
-  int8_t  keepTaskStatus;
-  bool    appendTranstateBlock;  // has append the transfer state data block already, todo: remove it
+  int32_t schedIdleTime;  // idle time before invoke again
+  int64_t lastExecTs;     // last exec time stamp
+  int8_t  statusBackup;
+  bool    appendTranstateBlock;  // has append the transfer state data block already
   int32_t timerActive;  // timer is active
-  int8_t  allowedAddInTimer;  // allowed to add into timer
   int32_t inScanHistorySentinel;
 } SStreamStatus;
 
@@ -366,7 +366,8 @@ typedef struct STaskQueue {
 
 typedef struct STaskSchedInfo {
   int8_t  status;
-  void*   pTimer;
+  tmr_h   pDelayTimer;
+  tmr_h   pIdleTimer;
 } STaskSchedInfo;
 
 typedef struct SSinkRecorder {
@@ -541,6 +542,7 @@ typedef struct {
   SMsgHead head;
   int64_t  streamId;
  int32_t  taskId;
+  int32_t  reqType;
 } SStreamTaskRunReq;
 
 struct SStreamDispatchReq {
@@ -764,6 +766,7 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t
 
 void    streamTaskInputFail(SStreamTask* pTask);
 int32_t streamExecTask(SStreamTask* pTask);
+int32_t streamResumeTask(SStreamTask* pTask);
 int32_t streamSchedExec(SStreamTask* pTask);
 bool    streamTaskShouldStop(const SStreamTask* pStatus);
 bool    streamTaskShouldPause(const SStreamTask* pStatus);
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 3c700665d3..777a2a70b2 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -1847,7 +1847,7 @@ int32_t mndPauseAllStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStre
       }
 
       if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
-        atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
+        atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
         atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
       }
     }
@@ -2005,7 +2005,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStr
       }
 
      if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
-        atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
+        atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
      }
     }
   }
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 65cc56c0b4..2533c721ac 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -1068,7 +1068,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
 
   int32_t taskId = pReq->taskId;
 
-  if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) {  // all tasks are extracted submit data from the wal
+  if (taskId == STREAM_EXEC_T_EXTRACT_WAL_DATA) {  // all tasks are extracted submit data from the wal
     tqScanWal(pTq);
     return 0;
   }
diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c
index aba3b0945f..b11e231c62 100644
--- a/source/dnode/vnode/src/tq/tqStreamTask.c
+++ b/source/dnode/vnode/src/tq/tqStreamTask.c
@@ -114,7 +114,8 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
 
   pRunReq->head.vgId = vgId;
   pRunReq->streamId = 0;
-  pRunReq->taskId = STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID;
+  pRunReq->taskId = 0;
+  pRunReq->reqType = STREAM_EXEC_T_EXTRACT_WAL_DATA;
 
   SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
   tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
@@ -138,7 +139,8 @@ int32_t tqStopStreamTasksAsync(STQ* pTq) {
 
   pRunReq->head.vgId = vgId;
   pRunReq->streamId = 0;
-  pRunReq->taskId = STREAM_EXEC_STOP_ALL_TASKS_ID;
+  pRunReq->taskId = 0;
+  pRunReq->reqType = STREAM_EXEC_T_STOP_ALL_TASKS;
 
   SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
   tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c
index a3d860fd78..b869d67bed 100644
--- a/source/dnode/vnode/src/tqCommon/tqCommon.c
+++ b/source/dnode/vnode/src/tqCommon/tqCommon.c
@@ -42,7 +42,8 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
   tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
   pRunReq->head.vgId = vgId;
   pRunReq->streamId = 0;
-  pRunReq->taskId = restart ? STREAM_EXEC_RESTART_ALL_TASKS_ID : STREAM_EXEC_START_ALL_TASKS_ID;
+  pRunReq->taskId = 0;
+  pRunReq->reqType = restart ? STREAM_EXEC_T_RESTART_ALL_TASKS : STREAM_EXEC_T_START_ALL_TASKS;
 
   SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
   tmsgPutToQueue(cb, STREAM_QUEUE, &msg);
@@ -675,21 +676,30 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
 int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
   SStreamTaskRunReq* pReq = pMsg->pCont;
 
-  int32_t taskId = pReq->taskId;
+  int32_t type = pReq->reqType;
   int32_t vgId = pMeta->vgId;
 
-  if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
+  if (type == STREAM_EXEC_T_START_ALL_TASKS) {
     streamMetaStartAllTasks(pMeta);
     return 0;
-  } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
+  } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
     restartStreamTasks(pMeta, isLeader);
     return 0;
-  } else if (taskId == STREAM_EXEC_STOP_ALL_TASKS_ID) {
+  } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
     streamMetaStopAllTasks(pMeta);
     return 0;
+  } else if (type == STREAM_EXEC_T_RESUME_TASK) {
+    SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
+    if (pTask != NULL) {
+      ASSERT(streamTaskReadyToRun(pTask, NULL));
+      streamResumeTask(pTask);
+    }
+
+    streamMetaReleaseTask(pMeta, pTask);
+    return 0;
   }
 
-  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
+  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
   if (pTask != NULL) {  // even in halt status, the data in inputQ must be processed
     char* p = NULL;
     if (streamTaskReadyToRun(pTask, &p)) {
@@ -706,7 +716,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
     return 0;
   } else {  // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
     // todo add one function to handle this
-    tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId);
+    tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, pReq->taskId);
     return -1;
   }
 }
diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c
index 1bef42bf14..f383f0f31d 100644
--- a/source/libs/stream/src/stream.c
+++ b/source/libs/stream/src/stream.c
@@ -57,7 +57,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
     if (pTrigger == NULL) {
      stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
               nextTrigger);
-      taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer);
+      taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
       return;
     }
 
@@ -68,7 +68,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
      stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
               nextTrigger);
-      taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer);
+      taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
       return;
     }
 
@@ -77,7 +77,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
 
     int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
     if (code != TSDB_CODE_SUCCESS) {
-      taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer);
+      taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
       return;
     }
 
@@ -85,17 +85,17 @@
     }
   }
 
-  taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer);
+  taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
 }
 
 int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
   if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
     int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
-    ASSERT(ref == 2 && pTask->schedInfo.pTimer == NULL);
+    ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL);
 
     stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam);
 
-    pTask->schedInfo.pTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
+    pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
     pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
   }
 
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index b9839dfc0c..70b7ec3309 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -549,6 +549,13 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
   return code;
 }
 
+static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) {
+  SStreamStatus* pStatus = &pTask->status;
+
+  pStatus->schedIdleTime = idleTime;
+  pStatus->lastExecTs = taosGetTimestampMs();
+}
+
 /**
  * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
  * appropriate batch of blocks should be handled in 5 to 10 sec.
@@ -568,6 +575,12 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
       break;
     }
 
+    if (streamQueueIsFull(pTask->outputq.queue)) {
+      stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id);
+      setTaskSchedInfo(pTask, 500);
+      break;
+    }
+
     /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
     if (pInput == NULL) {
       ASSERT(numOfBlocks == 0);
@@ -582,7 +595,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
       continue;
     }
 
-    if (pInput->type == STREAM_INPUT__TRANS_STATE) {
+    if (type == STREAM_INPUT__TRANS_STATE) {
      streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput);
       continue;
     }
@@ -671,27 +684,85 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
 }
 
 bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
-  ETaskStatus st = streamTaskGetStatus(pTask, NULL);
-  return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__STREAM_SCAN_HISTORY*/ ||
-          st == TASK_STATUS__CK);
+  ETaskStatus st = streamTaskGetStatus(pTask, pStatus);
+  return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK);
 }
 
-int32_t streamExecTask(SStreamTask* pTask) {
-  // this function may be executed by multi-threads, so status check is required.
+static void doStreamExecTaskHelper(void* param, void* tmrId) {
+  SStreamTask* pTask = (SStreamTask*)param;
+
+  char* p = NULL;
+  ETaskStatus status = streamTaskGetStatus(pTask, &p);
+  if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
+    streamTaskSetSchedStatusInactive(pTask);
+
+    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+    stDebug("s-task:%s status:%s not resume task, ref:%d", pTask->id.idStr, p, ref);
+
+    streamMetaReleaseTask(pTask->pMeta, pTask);
+    return;
+  }
+
+  // task resume running
+  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
+  if (pRunReq == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    /*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
+
+    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+    stError("failed to create msg to resume s-task:%s, reason out of memory, ref:%d", pTask->id.idStr, ref);
+
+    streamMetaReleaseTask(pTask->pMeta, pTask);
+    return;
+  }
+
+  pRunReq->head.vgId = pTask->info.nodeId;
+  pRunReq->streamId = pTask->id.streamId;
+  pRunReq->taskId = pTask->id.taskId;
+  pRunReq->reqType = STREAM_EXEC_T_RESUME_TASK;
+
+  int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+  stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pTask->id.idStr, pTask->status.schedIdleTime, ref);
+
+  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
+  tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
+
+  // release the task ref count
+  streamMetaReleaseTask(pTask->pMeta, pTask);
+}
+
+static int32_t schedTaskInFuture(SStreamTask* pTask) {
+  int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
+  stDebug("s-task:%s task should idle, add into timer to retry in %dms, ref:%d",
+          pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref);
+
+  // add one ref count for task
+  SStreamTask* pAddRefTask = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
+
+  if (pTask->schedInfo.pIdleTimer == NULL) {
+    pTask->schedInfo.pIdleTimer = taosTmrStart(doStreamExecTaskHelper, pTask->status.schedIdleTime, pTask, streamTimer);
+  } else {
+    taosTmrReset(doStreamExecTaskHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer);
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
+int32_t streamResumeTask(SStreamTask* pTask) {
+  ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__ACTIVE);
   const char* id = pTask->id.idStr;
 
-  int8_t schedStatus = streamTaskSetSchedStatusActive(pTask);
-  if (schedStatus == TASK_SCHED_STATUS__WAITING) {
-    while (1) {
-      int32_t code = doStreamExecTask(pTask);
-      if (code < 0) {  // todo this status should be removed
-        atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
-        return -1;
-      }
+  while (1) {
+    /*int32_t code = */doStreamExecTask(pTask);
+    taosThreadMutexLock(&pTask->lock);
 
-      taosThreadMutexLock(&pTask->lock);
-      if ((streamQueueGetNumOfItems(pTask->inputq.queue) == 0) || streamTaskShouldStop(pTask) ||
-          streamTaskShouldPause(pTask)) {
+    // check if this task needs to be idle for a while
+    if (pTask->status.schedIdleTime > 0) {
+      stDebug("s-task:%s idled, and will be invoked in %dms", id, pTask->status.schedIdleTime);
+      schedTaskInFuture(pTask);
+    } else {
+      int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
+      if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
         atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
         taosThreadMutexUnlock(&pTask->lock);
@@ -700,8 +771,19 @@ int32_t streamExecTask(SStreamTask* pTask) {
         stDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, p, pTask->status.schedStatus);
         return 0;
       }
-      taosThreadMutexUnlock(&pTask->lock);
     }
+
+    taosThreadMutexUnlock(&pTask->lock);
+  }
+}
+
+int32_t streamExecTask(SStreamTask* pTask) {
+  // this function may be executed by multi-threads, so status check is required.
+  const char* id = pTask->id.idStr;
+
+  int8_t schedStatus = streamTaskSetSchedStatusActive(pTask);
+  if (schedStatus == TASK_SCHED_STATUS__WAITING) {
+    streamResumeTask(pTask);
   } else {
     char* p = NULL;
     streamTaskGetStatus(pTask, &p);
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index 9c94799237..dc2e3bd651 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -452,7 +452,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
     // release the ref by timer
     if (p->info.triggerParam != 0 && p->info.fillHistory == 0) {  // one more ref in timer
       stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt);
-      taosTmrStop(p->schedInfo.pTimer);
+      taosTmrStop(p->schedInfo.pDelayTimer);
       p->info.triggerParam = 0;
       streamMetaReleaseTask(pMeta, p);
     }
@@ -730,7 +730,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
 
     if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
       stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
-      taosTmrStop(pTask->schedInfo.pTimer);
+      taosTmrStop(pTask->schedInfo.pDelayTimer);
       pTask->info.triggerParam = 0;
       streamMetaReleaseTask(pMeta, pTask);
     }
diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c
index d1610362f9..459efb838c 100644
--- a/source/libs/stream/src/streamQueue.c
+++ b/source/libs/stream/src/streamQueue.c
@@ -343,6 +343,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
 int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
   STaosQueue* pQueue = pTask->outputq.queue->pQueue;
 
+#if 0
   // wait for the output queue is available for new data to dispatch
   while (streamQueueIsFull(pTask->outputq.queue)) {
     if (streamTaskShouldStop(pTask)) {
@@ -358,6 +359,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
 
     taosMsleep(OUTPUT_QUEUE_FULL_WAIT_DURATION);
   }
+#endif
 
   int32_t code = taosWriteQitem(pQueue, pBlock);
 
@@ -367,7 +369,14 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
     stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
             pTask->id.idStr, total + 1, size, tstrerror(code));
   } else {
-    stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
+    if (streamQueueIsFull(pTask->outputq.queue)) {
+      stWarn(
+          "s-task:%s outputQ is full(outputQ items:%d, size:%.2fMiB), set the output status BLOCKING, wait for 500ms "
+          "after handle this batch of blocks",
+          pTask->id.idStr, total, size);
+    } else {
+      stDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
+    }
   }
 
   return TSDB_CODE_SUCCESS;
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index 4d343a484d..32fd5cbb39 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -328,9 +328,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
     taosMsleep(100);
   }
 
-  if (pTask->schedInfo.pTimer != NULL) {
-    taosTmrStop(pTask->schedInfo.pTimer);
-    pTask->schedInfo.pTimer = NULL;
+  if (pTask->schedInfo.pDelayTimer != NULL) {
+    taosTmrStop(pTask->schedInfo.pDelayTimer);
+    pTask->schedInfo.pDelayTimer = NULL;
   }
 
   if (pTask->hTaskInfo.pTimer != NULL) {