diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index be3da64c6a..08d0a5e486 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -194,7 +194,7 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask); void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo); -void streamClearChkptReadyMsg(SStreamTask* pTask); +void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo); EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize); int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 8e97d05f7f..ad33d44b15 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -405,7 +405,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo); streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks if (clearChkpReadyMsg) { - streamClearChkptReadyMsg(pTask); + streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo); } } @@ -696,14 +696,16 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { // TODO: monitoring the checkpoint-report msg // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null. - if (code == TSDB_CODE_SUCCESS && (pTask->pMsgCb != NULL)) { - code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask); - } else { // clear the checkpoint info if failed + if (code == TSDB_CODE_SUCCESS) { + if (pTask->pMsgCb != NULL) { + code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask); + } + } else { // clear the checkpoint info if failed taosThreadMutexLock(&pTask->lock); streamTaskClearCheckInfo(pTask, false); - code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); taosThreadMutexUnlock(&pTask->lock); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); streamTaskSetFailedCheckpointId(pTask); stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index afce6e47c4..0f5559df89 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -856,7 +856,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { "and quit from timer, ref:%d", id, vgId, ref); - streamClearChkptReadyMsg(pTask); + streamClearChkptReadyMsg(pActiveInfo); taosThreadMutexUnlock(&pActiveInfo->lock); streamMetaReleaseTask(pTask->pMeta, pTask); } @@ -1128,8 +1128,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, return 0; } -void streamClearChkptReadyMsg(SStreamTask* pTask) { - SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; +void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) { if (pActiveInfo == NULL) { return; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4a6e98f4a0..70e3790209 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -254,7 +254,7 @@ void tFreeStreamTask(SStreamTask* pTask) { walCloseReader(pTask->exec.pWalReader); } - streamClearChkptReadyMsg(pTask); + streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo); if (pTask->msgInfo.pData != NULL) { clearBufferedDispatchMsg(pTask);