From c0d3c155283cfaa5658ea51b84f6df59bf37be42 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Nov 2023 16:34:47 +0800 Subject: [PATCH] fix(stream): wait for task to be normal and then send data block. --- source/libs/stream/src/streamDispatch.c | 33 ++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5665e7a917..9492981306 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1039,6 +1039,25 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { return 0; } +static void dispatchDataInFuture(void* param, void* tmrId) { + SStreamTask* pTask = param; + if (streamTaskShouldStop(pTask)) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); + return; + } + + ETaskStatus status = streamTaskGetStatus(pTask, NULL); + if (status == TASK_STATUS__CK) { + stDebug("s-task:%s in checkpoint status, wait for 500ms to dispatch data downstream", pTask->id.idStr); + taosTmrReset(doRetryDispatchData, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); + } else { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start to dispatch data, jump out of timer, ref:%d", pTask->id.idStr, ref); + streamDispatchStreamBlock(pTask); + } +} + // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); @@ -1059,7 +1078,19 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); // otherwise, continue dispatch the first block to down stream task in pipeline - streamDispatchStreamBlock(pTask); + ETaskStatus status = streamTaskGetStatus(pTask, NULL); + if (status == TASK_STATUS__CK) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref); + if (pTask->msgInfo.pTimer != NULL) { + taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); + } else { + pTask->msgInfo.pTimer = taosTmrStart(dispatchDataInFuture, 500, pTask, streamEnv.timer); + } + } else { + streamDispatchStreamBlock(pTask); + } + return 0; }