diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 341706ff3b..9b5134d449 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -109,8 +109,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId); -int32_t streamTaskBuildCheckpoint(SStreamTask* pTask); -int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId); int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 2a766f21ec..b54adb0f96 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -114,6 +114,7 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask) { return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1); } +// todo handle down the transId of checkpoint to sink/agg tasks. static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pChkpoint == NULL) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 9e20a2997c..8b2330fda0 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1102,7 +1102,10 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId); entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId; entry.chkpointTransId = (*pTask)->chkInfo.transId; - stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId); + + if (entry.checkpointFailed) { + stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId); + } } if ((*pTask)->exec.pWalReader != NULL) { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 267dadf109..ea5e2edc09 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -379,10 +379,11 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { stDebug("s-task:%s enter into scan-history data stage, status:%s", id, p); streamTaskStartScanHistory(pTask); - // start the related fill-history task, when current task is ready - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - streamLaunchFillHistoryTask(pTask); - } + // NOTE: there will be an deadlock if launch fill history here. +// // start the related fill-history task, when current task is ready +// if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { +// streamLaunchFillHistoryTask(pTask); +// } return TSDB_CODE_SUCCESS; } @@ -400,6 +401,13 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { int64_t initTs = pTask->execInfo.init; int64_t startTs = pTask->execInfo.start; streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true); + + // start the related fill-history task, when current task is ready + // not invoke in success callback due to the deadlock. + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + stDebug("s-task:%s try to launch related fill-history task", pTask->id.idStr); + streamLaunchFillHistoryTask(pTask); + } } static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {