fix(stream): pass down the transId of checkpoint, to make sure the downstream task can report the error transId successfully.

This commit is contained in:
Haojun Liao 2024-02-01 17:14:36 +08:00
parent 88f246b988
commit 87e543c824
1 changed files with 6 additions and 3 deletions

View File

@ -116,7 +116,6 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1);
}
// todo handle down the transId of checkpoint to sink/agg tasks.
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) {
@ -133,6 +132,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
pBlock->info.type = STREAM_CHECKPOINT;
pBlock->info.version = pTask->chkInfo.checkpointingId;
pBlock->info.window.ekey = pBlock->info.window.skey = pTask->chkInfo.transId; // NOTE: set the transId
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
@ -185,17 +185,20 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
int64_t checkpointId = pDataBlock->info.version;
int32_t transId = pDataBlock->info.window.skey;
const char* id = pTask->id.idStr;
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pTask->pMeta->vgId;
stDebug("s-task:%s vgId:%d start to handle the checkpoint block, checkpointId:%" PRId64 " ver:%" PRId64
", current checkpointingId:%" PRId64,
id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, checkpointId);
", transId:%d current checkpointingId:%" PRId64,
id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId);
// set task status
if (streamTaskGetStatus(pTask)->state != TASK_STATUS__CK) {
pTask->chkInfo.checkpointingId = checkpointId;
pTask->chkInfo.transId = transId;
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));