From 7d5f76c6f1cf35660fcd2e3537ea62b16c1f4cec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 23 Sep 2023 11:31:14 +0800 Subject: [PATCH] fix(stream): disable auto rsp for checkpoint source. --- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 5 ++++- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/stream/src/streamDispatch.c | 1 - 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 39f3d465f2..a29d595ef7 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -224,7 +224,7 @@ int tqPushMsg(STQ*, tmsg_t msgType); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqScanWalAsync(STQ* pTq, bool ckPause); -int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b202dce8b3..840291a467 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1603,13 +1603,16 @@ FAIL: } // todo error code cannot be return, since this is invoked by an mnode-launched transaction. -int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { +int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t code = 0; + // disable auto rsp to source + pRsp->info.handle = NULL; + SStreamCheckpointSourceReq req = {0}; if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e8afc57e85..8ab1a40d4e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -602,7 +602,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp); break; case TDMT_VND_STREAM_CHECK_POINT_SOURCE: - tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg); + tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg, pRsp); break; case TDMT_VND_STREAM_TASK_UPDATE: tqProcessTaskUpdateReq(pVnode->pTq, pMsg); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5977d31e0d..e98c115e7c 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -854,7 +854,6 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf } ((SMsgHead*)pBuf)->vgId = htonl(pReq->mnodeId); - void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); tEncoderInit(&encoder, (uint8_t*)abuf, len);