add checkpoint

This commit is contained in:
yihaoDeng 2023-07-07 10:13:31 +00:00
parent b03ca31a7f
commit 2dc041929a
1 changed files with 9 additions and 6 deletions

View File

@ -88,7 +88,8 @@ int32_t mndInitStream(SMnode *pMnode) {
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, const SStreamTask *pTask, static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, const SStreamTask *pTask,
SMStreamDoCheckpointMsg *pMsg); SMStreamDoCheckpointMsg *pMsg);
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId); static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId);
void mndCleanupStream(SMnode *pMnode) {} void mndCleanupStream(SMnode *pMnode) {}
@ -853,7 +854,7 @@ static int32_t mndCreateCheckpoint(SMnode *pMnode, int32_t vgId, SList *pStreamL
taosArrayPush(stream, &pStream); taosArrayPush(stream, &pStream);
} }
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, vgId, checkpointId) < 0) { if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, vgId, checkpointId, 0, 0) < 0) {
mndReleaseVgroup(pMnode, pVgObj); mndReleaseVgroup(pMnode, pVgObj);
for (int i = 0; i < taosArrayGetSize(stream); i++) { for (int i = 0; i < taosArrayGetSize(stream); i++) {
SStreamObj *p = taosArrayGetP(stream, i); SStreamObj *p = taosArrayGetP(stream, i);
@ -936,13 +937,14 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con
return 0; return 0;
} }
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId) { static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId) {
SStreamCheckpointSourceReq req = {0}; SStreamCheckpointSourceReq req = {0};
req.checkpointId = checkpointId; req.checkpointId = checkpointId;
req.nodeId = nodeId; req.nodeId = nodeId;
req.expireTime = -1; req.expireTime = -1;
req.streamId = 0; // pTask->id.streamId; req.streamId = streamId; // pTask->id.streamId;
req.taskId = 0; // pTask->id.taskId; req.taskId = taskId; // pTask->id.taskId;
int32_t code; int32_t code;
int32_t blen; int32_t blen;
@ -1013,7 +1015,8 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
void *buf; void *buf;
int32_t tlen; int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId) < 0) { if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId) < 0) {
mndReleaseVgroup(pMnode, pVgObj); mndReleaseVgroup(pMnode, pVgObj);
taosRUnLockLatch(&pStream->lock); taosRUnLockLatch(&pStream->lock);
mndTransDrop(pTrans); mndTransDrop(pTrans);