From 4ea49fe28578c225a1a3f3961eac669e96cd2272 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 22 Nov 2023 23:49:16 +0800 Subject: [PATCH] fix(stream): fix the race condition in dispatching data. --- include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamCheckpoint.c | 2 ++ source/libs/stream/src/streamDispatch.c | 36 +++++++---------------- source/libs/stream/src/streamExec.c | 1 + 4 files changed, 14 insertions(+), 26 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d48665d351..8277d4749e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -305,6 +305,7 @@ typedef struct SCheckpointInfo { int64_t processedVer; // already processed ver, that has generated results version. int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t failedId; // record the latest failed checkpoint id + bool dispatchCheckpointTrigger; } SCheckpointInfo; typedef struct SStreamStatus { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 6201329b95..031bb812de 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -158,6 +158,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock); if (code == 0) { + ASSERT(pTask->chkInfo.dispatchCheckpointTrigger == false); streamDispatchStreamBlock(pTask); } else { stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code)); @@ -278,6 +279,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) { pTask->chkInfo.startTs = 0; // clear the recorded start time pTask->checkpointNotReadyTasks = 0; pTask->checkpointAlignCnt = 0; + pTask->chkInfo.dispatchCheckpointTrigger = false; streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 1f9b21becc..1306d9f746 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -593,6 +593,12 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return 0; } + if (pTask->chkInfo.dispatchCheckpointTrigger) { + stDebug("s-task:%s already send checkpoint trigger, not dispatch anymore", id); + atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); + return 0; + } + ASSERT(pTask->msgInfo.pData == NULL); stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status); @@ -1039,30 +1045,14 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { return 0; } -static void dispatchDataInFuture(void* param, void* tmrId) { - SStreamTask* pTask = param; - if (streamTaskShouldStop(pTask)) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); - return; - } - - ETaskStatus status = streamTaskGetStatus(pTask, NULL); - if (status == TASK_STATUS__CK) { - stDebug("s-task:%s in checkpoint status, wait for 500ms to dispatch data downstream", pTask->id.idStr); - taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); - } else { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start to dispatch data, jump out of timer, ref:%d", pTask->id.idStr, ref); - streamDispatchStreamBlock(pTask); - } -} - // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); + if (delayDispatch) { + pTask->chkInfo.dispatchCheckpointTrigger = true; + } pTask->msgInfo.pData = NULL; pTask->msgInfo.dispatchMsgType = 0; @@ -1083,13 +1073,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId // otherwise, continue dispatch the first block to down stream task in pipeline if (delayDispatch) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref); - if (pTask->msgInfo.pTimer != NULL) { - taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); - } else { - pTask->msgInfo.pTimer = taosTmrStart(dispatchDataInFuture, 500, pTask, streamEnv.timer); - } + return 0; } else { streamDispatchStreamBlock(pTask); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index cae537a860..43875319b7 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -48,6 +48,7 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl return code; } + // checkpoint trigger will be checked streamDispatchStreamBlock(pTask); }