diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 60b8734080..a28b63a0ba 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -88,7 +88,8 @@ int32_t mndInitStream(SMnode *pMnode) { static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, const SStreamTask *pTask, SMStreamDoCheckpointMsg *pMsg); -static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId); +static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, + int64_t streamId, int32_t taskId); void mndCleanupStream(SMnode *pMnode) {} @@ -853,7 +854,7 @@ static int32_t mndCreateCheckpoint(SMnode *pMnode, int32_t vgId, SList *pStreamL taosArrayPush(stream, &pStream); } - if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, vgId, checkpointId) < 0) { + if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, vgId, checkpointId, 0, 0) < 0) { mndReleaseVgroup(pMnode, pVgObj); for (int i = 0; i < taosArrayGetSize(stream); i++) { SStreamObj *p = taosArrayGetP(stream, i); @@ -936,13 +937,14 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con return 0; } -static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId) { +static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, + int64_t streamId, int32_t taskId) { SStreamCheckpointSourceReq req = {0}; req.checkpointId = checkpointId; req.nodeId = nodeId; req.expireTime = -1; - req.streamId = 0; // pTask->id.streamId; - req.taskId = 0; // pTask->id.taskId; + req.streamId = streamId; // pTask->id.streamId; + req.taskId = taskId; // pTask->id.taskId; int32_t code; int32_t blen; @@ -1013,7 +1015,8 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre void *buf; int32_t tlen; - if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId) < 0) { + if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, + pTask->id.taskId) < 0) { mndReleaseVgroup(pMnode, pVgObj); taosRUnLockLatch(&pStream->lock); mndTransDrop(pTrans);