diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c1c10b2fc2..e3dbaa8803 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -485,6 +485,10 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe return code; } +typedef struct SMStreamCheckpointReadyRspMsg { + int8_t placeholder; +}SMStreamCheckpointReadyRspMsg; + int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -513,6 +517,18 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) streamProcessCheckpointReadyMsg(pTask); streamMetaReleaseTask(pMeta, pTask); + + { // send checkpoint ready rsp + SRpcMsg rsp = {.code = 0, .info = pMsg->info, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + SMsgHead* pHead = rsp.pCont; + pHead->vgId = htonl(req.downstreamNodeId); + + tmsgSendRsp(&rsp); + + pMsg->info.handle = NULL; // disable auto rsp + } + return code; }