From 51bee72807351202e5d325e8c0e200680cdb634c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 10 Sep 2024 12:58:08 +0800 Subject: [PATCH] fix(stream):not reset the failed checkpointId --- source/libs/stream/src/streamCheckpoint.c | 25 ++++++++++++++---- source/libs/stream/src/streamDispatch.c | 28 ++++++++++++++------- source/libs/stream/src/streamTask.c | 6 +++-- tests/script/tsim/stream/pauseAndResume.sim | 5 +++- 4 files changed, 47 insertions(+), 17 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e091d0f34b..242e12f591 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -235,6 +235,16 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock return code; } + if (pActiveInfo->failedId >= checkpointId) { + stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64 + "discard the checkpoint-trigger block", + id, vgId, checkpointId, transId, pActiveInfo->failedId); + streamMutexUnlock(&pTask->lock); + + streamFreeQitem((SStreamQueueItem*)pBlock); + return code; + } + if (pTask->chkInfo.checkpointId == checkpointId) { { // send checkpoint-ready msg to upstream SRpcMsg msg = {0}; @@ -531,15 +541,20 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream } void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + pTask->chkInfo.startTs = 0; // clear the recorded start time streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks - streamMutexLock(&pTask->chkInfo.pActiveInfo->lock); - streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo); + streamMutexLock(&pInfo->lock); + streamTaskClearActiveInfo(pInfo); if (clearChkpReadyMsg) { - streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo); + streamClearChkptReadyMsg(pInfo); } - streamMutexUnlock(&pTask->chkInfo.pActiveInfo->lock); + streamMutexUnlock(&pInfo->lock); + + stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%"PRId64", current checkpointId:%"PRId64, + pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId); } int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) { @@ -669,7 +684,7 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr); } else { pInfo->failedId = pInfo->activeId; - stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, pInfo->activeId, + stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d)", pTask->id.idStr, pInfo->activeId, pInfo->transId); } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 2a32a1d522..4926dcb69d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -726,9 +726,10 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } int32_t streamDispatchStreamBlock(SStreamTask* pTask) { - const char* id = pTask->id.idStr; - int32_t code = 0; - SStreamDataBlock* pBlock = NULL; + const char* id = pTask->id.idStr; + int32_t code = 0; + SStreamDataBlock* pBlock = NULL; + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue); if (numOfElems > 0) { @@ -746,10 +747,15 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return 0; } - if (pTask->chkInfo.pActiveInfo->dispatchTrigger) { - stDebug("s-task:%s already send checkpoint-trigger, no longer dispatch any other data", id); - atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); - return 0; + if (pInfo->dispatchTrigger) { + if ((pInfo->activeId != 0) && (pInfo->failedId < pInfo->activeId)) { + stDebug("s-task:%s already send checkpoint-trigger, no longer dispatch any other data", id); + atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); + return 0; + } else { + stDebug("s-task:%s dispatch trigger set, and ignore since current active checkpointId:%" PRId64 " failed", id, + pInfo->activeId); + } } if (pTask->msgInfo.pData != NULL) { @@ -788,8 +794,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { // outputQ should be empty here, otherwise, set the checkpoint failed due to the retrieve req happens if (streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) > 0) { - stError("s-task:%s items are still in outputQ due to downstream retrieve, failed to init trigger dispatch", - pTask->id.idStr); + stError( + "s-task:%s items are still in outputQ due to downstream retrieve, failed to init and discard " + "checkpoint-trigger dispatch", + pTask->id.idStr); streamTaskSetCheckpointFailed(pTask); clearBufferedDispatchMsg(pTask); continue; @@ -1478,6 +1486,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t numOfFailed = 0; bool triggerDispatchRsp = false; + taosMsleep(500); + // we only set the dispatch msg info for current checkpoint trans streamMutexLock(&pTask->lock); triggerDispatchRsp = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) && diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 6d8f90f7f6..355a586e52 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1190,6 +1190,7 @@ void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t c pTask->chkInfo.pActiveInfo->transId = transId; pTask->chkInfo.pActiveInfo->activeId = checkpointId; pTask->chkInfo.pActiveInfo->failedId = checkpointId; + stDebug("s-task:%s set failed checkpointId:%"PRId64, pTask->id.idStr, checkpointId); } int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) { @@ -1239,12 +1240,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { taosMemoryFree(pInfo); } +//NOTE: clear the checkpoint id, and keep the failed id void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { - pInfo->activeId = 0; // clear the checkpoint id + pInfo->activeId = 0; pInfo->transId = 0; pInfo->allUpstreamTriggerRecv = 0; pInfo->dispatchTrigger = false; - pInfo->failedId = 0; +// pInfo->failedId = 0; taosArrayClear(pInfo->pDispatchTriggerList); taosArrayClear(pInfo->pCheckpointReadyRecvList); diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index c2998dea30..1f4caf5c03 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -109,7 +109,7 @@ endi print ===== idle for 70 sec for checkpoint gen sleep 70000 -print ===== idle 60 sec completed , continue +print ===== idle 70 sec completed , continue print ===== step 1 over @@ -127,9 +127,12 @@ sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 wate sql_error create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt2 as select _wstart, count(*) c1, sum(a) c3 from t1 interval(10s); sql create stream if not exists streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt2 as select _wstart, count(*) c1, sum(a) c3 from t1 interval(10s); +print start to check stream status + sleep 1000 run tsim/stream/checkTaskStatus.sim +print pause stream2 sql pause stream streams2; sql insert into t1 values(1648791213001,1,12,3,1.0);