From 79be59d20c21f26385b16170d610d0c886adf6f5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Jun 2024 16:25:59 +0800 Subject: [PATCH 1/9] fix(stream): reset the status before re-send data. --- source/libs/stream/src/streamDispatch.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5595085197..789cb5cbcf 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -416,6 +416,7 @@ static void setResendInfo(SDispatchEntry* pEntry, int64_t now) { pEntry->sendTs = now; pEntry->rspTs = -1; pEntry->retryCount += 1; + pEntry->status = TSDB_CODE_SUCCESS; } static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock) { From 707fe02885c7b9515a5de2b65376b5e5eda280ab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Jun 2024 16:30:12 +0800 Subject: [PATCH 2/9] fix(stream):set the initial transId --- source/dnode/vnode/src/tqCommon/tqCommon.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 624257d4c6..0a7870ec49 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -171,7 +171,11 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM SStreamTask* pTask = *ppTask; const char* idstr = pTask->id.idStr; - if ((pMeta->updateInfo.transId != req.transId) && (pMeta->updateInfo.transId != -1)) { + if (pMeta->updateInfo.transId == -1) { // info needs to be kept till the new trans to update the nodeEp arrived. + streamMetaInitUpdateTaskList(pMeta, req.transId); + } + + if (pMeta->updateInfo.transId != req.transId) { if (req.transId < pMeta->updateInfo.transId) { tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId, pMeta->updateInfo.transId, req.transId); From 1ffec769b8d10205899d49817222b4dd929fed8b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Jun 2024 17:03:14 +0800 Subject: [PATCH 3/9] fix(stream): clear the msgId if send success, and handle the race condition problem. --- source/libs/stream/src/streamDispatch.c | 41 +++++++++++++++---------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 789cb5cbcf..83e73e8c88 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -40,6 +40,20 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { pMsg->contLen = contLen; } +static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) { + pInfo->startTs = taosGetTimestampMs(); + pInfo->rspTs = -1; + pInfo->msgId = msgId; +} + +static void clearDispatchInfo(SDispatchMsgInfo* pInfo) { + pInfo->startTs = -1; + pInfo->msgId = -1; + pInfo->rspTs = -1; +} + +static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; } + static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, int64_t dstTaskId, int32_t type) { pReq->streamId = pTask->id.streamId; @@ -225,12 +239,15 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) { destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask)); } + taosThreadMutexLock(&pMsgInfo->lock); + pMsgInfo->checkpointId = -1; pMsgInfo->transId = -1; pMsgInfo->pData = NULL; pMsgInfo->dispatchMsgType = 0; - taosThreadMutexLock(&pMsgInfo->lock); + clearDispatchInfo(pMsgInfo); + taosArrayClear(pTask->msgInfo.pSendInfo); taosThreadMutexUnlock(&pMsgInfo->lock); } @@ -643,20 +660,6 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S return 0; } -static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) { - pInfo->startTs = taosGetTimestampMs(); - pInfo->rspTs = -1; - pInfo->msgId = msgId; -} - -static void clearDispatchInfo(SDispatchMsgInfo* pInfo) { - pInfo->startTs = -1; - pInfo->msgId = -1; - pInfo->rspTs = -1; -} - -static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; } - int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); @@ -699,7 +702,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { type == STREAM_INPUT__TRANS_STATE); pTask->execInfo.dispatch += 1; + + taosThreadMutexLock(&pTask->msgInfo.lock); initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch); + taosThreadMutexUnlock(&pTask->msgInfo.lock); int32_t code = doBuildDispatchMsg(pTask, pBlock); if (code == 0) { @@ -1222,10 +1228,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; - int32_t msgId = pMsgInfo->msgId; int64_t now = taosGetTimestampMs(); int32_t totalRsp = 0; + taosThreadMutexLock(&pMsgInfo->lock); + int32_t msgId = pMsgInfo->msgId; + taosThreadMutexUnlock(&pMsgInfo->lock); + // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, From 5585a141d53ffff9940027d248932c9eccc88bda Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Jun 2024 23:16:03 +0800 Subject: [PATCH 4/9] fix(stream): set the failed id before clear the checkpoint info. --- source/libs/stream/src/streamCheckpoint.c | 4 ++-- source/libs/stream/src/streamMeta.c | 1 + source/libs/stream/src/streamTask.c | 1 - 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 52265963bb..b5e27fde87 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -707,11 +707,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { } } else { // clear the checkpoint info if failed taosThreadMutexLock(&pTask->lock); - streamTaskClearCheckInfo(pTask, false); + streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info + streamTaskClearCheckInfo(pTask, true); taosThreadMutexUnlock(&pTask->lock); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); - streamTaskSetFailedCheckpointId(pTask); stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 08e7c97150..bc07d1811a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1063,6 +1063,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.pActiveInfo->transId); } + (*pTask)->chkInfo.pActiveInfo->failedId = 0; } if ((*pTask)->exec.pWalReader != NULL) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 8abac4ce85..41dd4d7f26 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1039,7 +1039,6 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { pInfo->activeId = 0; // clear the checkpoint id - pInfo->failedId = 0; pInfo->transId = 0; pInfo->allUpstreamTriggerRecv = 0; pInfo->dispatchTrigger = false; From e74b8473f1035b0af5597327909efc2c1318a968 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Jun 2024 23:40:05 +0800 Subject: [PATCH 5/9] fix(stream): clear the active checkpoint info after report to mnode. --- source/libs/stream/src/streamCheckpoint.c | 1 - source/libs/stream/src/streamMeta.c | 19 +++++++++++-------- source/libs/stream/src/streamTask.c | 1 + 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b5e27fde87..4666cec4b6 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -708,7 +708,6 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { } else { // clear the checkpoint info if failed taosThreadMutexLock(&pTask->lock); streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info - streamTaskClearCheckInfo(pTask, true); taosThreadMutexUnlock(&pTask->lock); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index bc07d1811a..a7f73d1b52 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1053,17 +1053,20 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); } - if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) { - entry.checkpointInfo.failed = - ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; - entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId; - entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId; + SActiveCheckpointInfo* p = (*pTask)->chkInfo.pActiveInfo; + if (p->activeId != 0) { + entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0; + entry.checkpointInfo.activeId = p->activeId; + entry.checkpointInfo.activeTransId = p->transId; if (entry.checkpointInfo.failed) { - stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, - (*pTask)->chkInfo.pActiveInfo->transId); + stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d, clear the active checkpointInfo", + (*pTask)->id.idStr, p->transId); + + taosThreadMutexLock(&(*pTask)->lock); + streamTaskClearCheckInfo((*pTask), true); + taosThreadMutexUnlock(&(*pTask)->lock); } - (*pTask)->chkInfo.pActiveInfo->failedId = 0; } if ((*pTask)->exec.pWalReader != NULL) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 41dd4d7f26..ff020e88c9 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1042,6 +1042,7 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { pInfo->transId = 0; pInfo->allUpstreamTriggerRecv = 0; pInfo->dispatchTrigger = false; + pInfo->failedId = 0; taosArrayClear(pInfo->pDispatchTriggerList); taosArrayClear(pInfo->pCheckpointReadyRecvList); From 1f8991ec91830f1b7bc1a4e1b44762d00451f776 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Jun 2024 00:40:57 +0800 Subject: [PATCH 6/9] fix(stream): set the correct rsp msg position. --- source/libs/stream/src/streamCheckpoint.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 4666cec4b6..0e55e18248 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -109,8 +109,8 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, SRpcHandleInfo* pRpcInfo, int32_t code) { int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); + void* pBuf = rpcMallocCont(size); - void* pBuf = rpcMallocCont(size); SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId); @@ -129,7 +129,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId pRsp->rspCode = code; - SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = size, .info = *pRpcInfo}; + SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); return 0; } From 79085015d6ddca99d2c808001c0e459f53e8cadf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Jun 2024 00:49:02 +0800 Subject: [PATCH 7/9] fix(stream): set correct msg start position. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 0a7870ec49..40da0f0320 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -941,7 +941,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) } int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SCheckpointTriggerRsp* pRsp = pMsg->pCont; + SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId); if (pTask == NULL) { From 398a1b08acb61e2c234dcf741dbcb92461ee14f1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Jun 2024 00:39:51 +0800 Subject: [PATCH 8/9] fix(stream): set the correct rsp msg start position. --- source/libs/stream/src/streamCheckpoint.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 0e55e18248..078c6fc730 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -118,6 +118,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId pRsp->streamId = pTask->id.streamId; pRsp->upstreamTaskId = pTask->id.taskId; pRsp->taskId = dstTaskId; + pRsp->rspCode = code; if (code == TSDB_CODE_SUCCESS) { pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId; @@ -127,8 +128,6 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId pRsp->transId = -1; } - pRsp->rspCode = code; - SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); return 0; From 5c002e4bbea9c57a28c98db530da7cb978e635fd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 30 Jun 2024 00:06:11 +0800 Subject: [PATCH 9/9] fix(stream):set the srcTaskId for checkpoint-trigger block --- source/libs/stream/src/streamCheckpoint.c | 28 ++++++++++++----------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 078c6fc730..78bd5c4511 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -25,15 +25,12 @@ static int32_t deleteCheckpoint(const char* id); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, - int32_t transId); + int32_t transId, int32_t srcTaskId); static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList); static void checkpointTriggerMonitorFn(void* param, void* tmrId); -static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, - int32_t transId); - SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, - int32_t transId) { + int32_t transId, int32_t srcTaskId) { SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pChkpoint == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -41,6 +38,10 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint } pChkpoint->type = checkpointType; + if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) { + pChkpoint->srcTaskId = srcTaskId; + ASSERT(srcTaskId != 0); + } SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (pBlock == NULL) { @@ -64,8 +65,9 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint return pChkpoint; } -int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId) { - SStreamDataBlock* pCheckpoint = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId); +int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId, + int32_t srcTaskId) { + SStreamDataBlock* pCheckpoint = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId); if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) { return TSDB_CODE_OUT_OF_MEMORY; @@ -90,7 +92,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo // 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // and this is the last item in the inputQ. - return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId); + return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId, -1); } int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) { @@ -102,7 +104,8 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri return TSDB_CODE_SUCCESS; } - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId); + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId, + pRsp->upstreamTaskId); return TSDB_CODE_SUCCESS; } @@ -266,7 +269,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointTriggerBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId); + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1); streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { @@ -363,9 +366,8 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId taosThreadMutexUnlock(&pInfo->lock); if (notReady == 0) { - stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task", - id); - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId); + stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id); + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1); } return 0;