From ce1a3a379c329fd1a0d0891626a257750d08e115 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Aug 2023 10:12:12 +0800 Subject: [PATCH] fix(stream): handle the case when the task not in ready state, and the checkpoint msg arrived. --- include/libs/stream/tstream.h | 7 +++-- source/dnode/vnode/src/tq/tq.c | 32 +++++++++++++++-------- source/libs/stream/src/streamCheckpoint.c | 2 ++ source/libs/stream/src/streamDispatch.c | 27 ++++++++++++------- 4 files changed, 45 insertions(+), 23 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index fdaf02864a..a59d7897ed 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -526,6 +526,7 @@ typedef struct { int32_t nodeId; int32_t mnodeId; int64_t expireTime; + int8_t success; } SStreamCheckpointSourceRsp; int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq); @@ -726,8 +727,10 @@ int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); -int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); - +int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, + SStreamTask* pTask, int8_t isSucceed); +int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, + int8_t isSucceed); #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1011a0cbfb..53764181c2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1493,17 +1493,19 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t vgId = pTq->pStreamMeta->vgId; - int32_t taskId = htonl(pRsp->upstreamTaskId); - int64_t streamId = htobe64(pRsp->streamId); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, streamId, taskId); + int32_t vgId = pTq->pStreamMeta->vgId; + pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId); + pRsp->streamId = htobe64(pRsp->streamId); + pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId); + pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pRsp->streamId, pRsp->upstreamTaskId); if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return TSDB_CODE_SUCCESS; } else { - tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId); + tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId); terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; return terrno; } @@ -1719,6 +1721,7 @@ FAIL: int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; } +// todo error code cannot be return, since this is invoked by an mnode-launched transaction. int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -1738,7 +1741,6 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { } tDecoderClear(&decoder); - // todo handle the case when the task not in ready state, and the checkpoint msg is arrived. SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, @@ -1746,14 +1748,22 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { return TSDB_CODE_SUCCESS; } + // downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req. + if (pTask->status.downstreamReady != 1) { + qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64 + ", set it failure", pTask->id.idStr, req.checkpointId); + + SRpcMsg rsp = {0}; + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; + } + int32_t total = 0; taosWLockLatch(&pMeta->lock); // set the initial value for generating check point - // set the mgmt epset info according to the checkout source msg from mnode, todo opt perf - // pMeta->mgmtInfo.epset = req.mgmtEps; - // pMeta->mgmtInfo.mnodeId = req.mnodeId; - + // set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed if (pMeta->chkptNotReadyTasks == 0) { pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList); } @@ -1764,7 +1774,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { qDebug("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg, chkpt:%" PRId64 ", total checkpoint req:%d", pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total); - code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); + code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 56f8549b36..37402a3c78 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -48,6 +48,7 @@ int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckp if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->nodeId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->success) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } @@ -59,6 +60,7 @@ int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSo if (tDecodeI32(pDecoder, &pRsp->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->nodeId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->expireTime) < 0) return -1; + if (tDecodeI8(pDecoder, &pRsp->success) < 0) return -1; tEndDecode(pDecoder); return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 355e0103d6..a41360fbd1 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -28,9 +28,9 @@ typedef struct SBlockName { } SBlockName; typedef struct { - int32_t taskId; + int32_t upStreamTaskId; + SEpSet upstreamNodeEpset; SRpcMsg msg; - SEpSet epset; } SStreamChkptReadyInfo; static void doRetryDispatchData(void* param, void* tmrId); @@ -587,9 +587,10 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { for (int32_t i = 0; i < num; ++i) { SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i); - tmsgSendReq(&pInfo->epset, &pInfo->msg); + tmsgSendReq(&pInfo->upstreamNodeEpset, &pInfo->msg); + qDebug("s-task:%s level:%d checkpoint ready msg sent to upstream:0x%x", pTask->id.idStr, pTask->info.taskLevel, - pInfo->taskId); + pInfo->upStreamTaskId); } taosArrayClear(pTask->pReadyMsgList); @@ -724,8 +725,8 @@ FAIL: return code; } -int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, - SStreamTask* pTask) { +int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, + int8_t isSucceed) { int32_t len = 0; int32_t code = 0; SEncoder encoder; @@ -737,6 +738,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa .streamId = pReq->streamId, .expireTime = pReq->expireTime, .mnodeId = pReq->mnodeId, + .success = isSucceed, }; tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code); @@ -757,16 +759,21 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa tEncodeStreamCheckpointSourceRsp(&encoder, &rsp); tEncoderClear(&encoder); + initRpcMsg(pMsg, 0, pBuf, sizeof(SMsgHead) + len); + pMsg->info = *pRpcInfo; + return 0; +} + +int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, + SStreamTask* pTask, int8_t isSucceed) { SStreamChkptReadyInfo info = {0}; - initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len); - info.msg.info = *pRpcInfo; + buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, isSucceed); if (pTask->pReadyMsgList == NULL) { pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo)); } taosArrayPush(pTask->pReadyMsgList, &info); - qDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pTask->pReadyMsgList)); return TSDB_CODE_SUCCESS; } @@ -813,7 +820,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, ASSERT(req.upstreamTaskId != 0); - SStreamChkptReadyInfo info = {.taskId = pInfo->taskId, .epset = pInfo->epSet}; + SStreamChkptReadyInfo info = {.upStreamTaskId = pInfo->taskId, .upstreamNodeEpset = pInfo->epSet}; initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead)); info.msg.info.noResp = 1; // refactor later.