From a69b870260511ddc93d3005f46c4642a7736533f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Nov 2023 16:34:47 +0800 Subject: [PATCH 1/3] fix(stream): wait for task to be normal and then send data block. --- source/libs/stream/src/streamDispatch.c | 33 ++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 6bb15dfd23..21fedfc78b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1039,6 +1039,25 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { return 0; } +static void dispatchDataInFuture(void* param, void* tmrId) { + SStreamTask* pTask = param; + if (streamTaskShouldStop(pTask)) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); + return; + } + + ETaskStatus status = streamTaskGetStatus(pTask, NULL); + if (status == TASK_STATUS__CK) { + stDebug("s-task:%s in checkpoint status, wait for 500ms to dispatch data downstream", pTask->id.idStr); + taosTmrReset(doRetryDispatchData, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); + } else { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start to dispatch data, jump out of timer, ref:%d", pTask->id.idStr, ref); + streamDispatchStreamBlock(pTask); + } +} + // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); @@ -1059,7 +1078,19 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); // otherwise, continue dispatch the first block to down stream task in pipeline - streamDispatchStreamBlock(pTask); + ETaskStatus status = streamTaskGetStatus(pTask, NULL); + if (status == TASK_STATUS__CK) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref); + if (pTask->msgInfo.pTimer != NULL) { + taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); + } else { + pTask->msgInfo.pTimer = taosTmrStart(dispatchDataInFuture, 500, pTask, streamEnv.timer); + } + } else { + streamDispatchStreamBlock(pTask); + } + return 0; } From f0f4b988a13ad90e5ba9b90df0cc4ddc950612d3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Nov 2023 17:00:50 +0800 Subject: [PATCH 2/3] fix(stream): fix syntax error. --- source/libs/stream/src/streamDispatch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 21fedfc78b..b3e3cb554d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1050,7 +1050,7 @@ static void dispatchDataInFuture(void* param, void* tmrId) { ETaskStatus status = streamTaskGetStatus(pTask, NULL); if (status == TASK_STATUS__CK) { stDebug("s-task:%s in checkpoint status, wait for 500ms to dispatch data downstream", pTask->id.idStr); - taosTmrReset(doRetryDispatchData, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); + taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); } else { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start to dispatch data, jump out of timer, ref:%d", pTask->id.idStr, ref); From bc779edda943c78b168155da1bcd20c57553b74a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Nov 2023 17:34:47 +0800 Subject: [PATCH 3/3] fix(stream): fix bug in delay send. --- source/libs/stream/src/streamDispatch.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b3e3cb554d..42280b0d0f 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1061,7 +1061,11 @@ static void dispatchDataInFuture(void* param, void* tmrId) { // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); + + bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); + pTask->msgInfo.pData = NULL; + pTask->msgInfo.dispatchMsgType = 0; int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; @@ -1078,8 +1082,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); // otherwise, continue dispatch the first block to down stream task in pipeline - ETaskStatus status = streamTaskGetStatus(pTask, NULL); - if (status == TASK_STATUS__CK) { + if (delayDispatch) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref); if (pTask->msgInfo.pTimer != NULL) {