From 87e543c8240655c9f14882c963316ffc27730dfd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Feb 2024 17:14:36 +0800 Subject: [PATCH] fix(stream): pass down the transId of checkpoint, to make sure the downstream task can report the error transId successfully. --- source/libs/stream/src/streamCheckpoint.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 50a010d779..f45904f036 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -116,7 +116,6 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask) { return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1); } -// todo handle down the transId of checkpoint to sink/agg tasks. static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pChkpoint == NULL) { @@ -133,6 +132,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint pBlock->info.type = STREAM_CHECKPOINT; pBlock->info.version = pTask->chkInfo.checkpointingId; + pBlock->info.window.ekey = pBlock->info.window.skey = pTask->chkInfo.transId; // NOTE: set the transId pBlock->info.rows = 1; pBlock->info.childId = pTask->info.selfChildId; @@ -185,17 +185,20 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); int64_t checkpointId = pDataBlock->info.version; + int32_t transId = pDataBlock->info.window.skey; const char* id = pTask->id.idStr; int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = pTask->pMeta->vgId; stDebug("s-task:%s vgId:%d start to handle the checkpoint block, checkpointId:%" PRId64 " ver:%" PRId64 - ", current checkpointingId:%" PRId64, - id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, checkpointId); + ", transId:%d current checkpointingId:%" PRId64, + id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId); // set task status if (streamTaskGetStatus(pTask)->state != TASK_STATUS__CK) { pTask->chkInfo.checkpointingId = checkpointId; + pTask->chkInfo.transId = transId; + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));