From 7eb0e42bb9906fe0c11df5d28ddae2af193b0fb2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Dec 2023 19:02:52 +0800 Subject: [PATCH 1/3] fix(stream): handle error when checkpoint is interrupted by nodeUpdate. --- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/libs/stream/src/streamDispatch.c | 2 +- source/libs/stream/src/streamExec.c | 25 ++++++++++++++++++++++--- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c8cc4f9d54..33f937a909 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1294,8 +1294,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) if (status == TASK_STATUS__CK) { ASSERT(pTask->chkInfo.checkpointingId == req.checkpointId); tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64 - " already received, ignore this msg and continue process checkpoint", - pTask->id.idStr, pTask->chkInfo.checkpointingId); + " transId:%d already received, ignore this msg and continue process checkpoint", + pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index bfa269b0d5..00a8940b6a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -746,7 +746,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { taosArrayClear(pTask->pReadyMsgList); stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); } else { - stDebug("s-task:%s level:%d already send rsp to mnode", pTask->id.idStr, pTask->info.taskLevel); + stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel); } taosThreadMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 25f32195be..5822b18bc3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -625,10 +625,29 @@ int32_t doStreamExecTask(SStreamTask* pTask) { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. if (type == STREAM_INPUT__CHECKPOINT) { + + // todo add lock char* p = NULL; - streamTaskGetStatus(pTask, &p); - stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p); - streamTaskBuildCheckpoint(pTask); + ETaskStatus s = streamTaskGetStatus(pTask, &p); + if (s == TASK_STATUS__CK) { + stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p); + streamTaskBuildCheckpoint(pTask); + } else { + // todo refactor + int32_t code = 0; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + code = streamTaskSendCheckpointSourceRsp(pTask); + } else { + code = streamTaskSendCheckpointReadyMsg(pTask); + } + + if (code != TSDB_CODE_SUCCESS) { + // todo: let's retry send rsp to upstream/mnode + stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr, + 0, tstrerror(code)); + } + } + return 0; } } From 646f24659e94336b67906c4f7181e0d76b2e7fd6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Dec 2023 19:16:49 +0800 Subject: [PATCH 2/3] fix(stream): fix syntax error. --- source/libs/stream/src/streamExec.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5822b18bc3..5b4f603f87 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -643,7 +643,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { // todo: let's retry send rsp to upstream/mnode - stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr, + stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", pTask->id.idStr, 0, tstrerror(code)); } } From 1659164926c8c862819be06dfced8af8142c27d6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Dec 2023 23:00:18 +0800 Subject: [PATCH 3/3] fix(stream): remove invalid checkpoint id check. --- source/dnode/vnode/src/tq/tq.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 33f937a909..58f1fe954a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1292,7 +1292,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // check if the checkpoint msg already sent or not. if (status == TASK_STATUS__CK) { - ASSERT(pTask->chkInfo.checkpointingId == req.checkpointId); tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64 " transId:%d already received, ignore this msg and continue process checkpoint", pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId);