Merge pull request #29615 from taosdata/fix/disp_lost

fix(stream): check the checkpoint-report transId and checkpointId
This commit is contained in:
Shengliang Guan 2025-01-23 09:47:36 +08:00 committed by GitHub
commit f6f08fa846
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 137 additions and 31 deletions

View File

@ -159,6 +159,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
SVgroupChangeInfo *pInfo); SVgroupChangeInfo *pInfo);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
bool isNodeUpdateTransActive();
int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter); int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter);
void destroyStreamTaskIter(SStreamTaskIter *pIter); void destroyStreamTaskIter(SStreamTaskIter *pIter);
@ -175,8 +176,8 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo); int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo);
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo); void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo);
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo);
int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId); int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId);
int64_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId); int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId);
int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId); int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId);
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows); int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows);

View File

@ -65,8 +65,6 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
static void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo); static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
static void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream); SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
@ -801,6 +799,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno); TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
} }
// check for the taskEp update trans
if (isNodeUpdateTransActive()) {
mError("stream:%s failed to create stream, node update trans is active", createReq.name);
code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
goto _OVER;
}
SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB); SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
if (pSourceDb == NULL) { if (pSourceDb == NULL) {
code = terrno; code = terrno;
@ -2416,8 +2421,8 @@ static bool validateChkptReport(const SCheckpointReport *pReport, int64_t report
return true; return true;
} }
static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) { static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const SCheckpointReport *pReport) {
bool valid = validateChkptReport(pReport, reportChkptId); bool valid = validateChkptReport(pReport, reportedChkptId);
if (!valid) { if (!valid) {
return; return;
} }
@ -2433,7 +2438,7 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC
mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard", mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
pReport->taskId, p->checkpointId, pReport->checkpointId); pReport->taskId, p->checkpointId, pReport->checkpointId);
} else if (p->checkpointId < pReport->checkpointId) { // expired checkpoint-report msg, update it } else if (p->checkpointId < pReport->checkpointId) { // expired checkpoint-report msg, update it
mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64, mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64,
pReport->taskId, p->checkpointId, pReport->checkpointId); pReport->taskId, p->checkpointId, pReport->checkpointId);
// update the checkpoint report info // update the checkpoint report info
@ -2465,7 +2470,8 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC
mError("failed to put into task list, taskId:0x%x", pReport->taskId); mError("failed to put into task list, taskId:0x%x", pReport->taskId);
} else { } else {
int32_t size = taosArrayGetSize(pList); int32_t size = taosArrayGetSize(pList);
mDebug("stream:0x%" PRIx64 " %d tasks has send checkpoint-report", pReport->streamId, size); mDebug("stream:0x%" PRIx64 " taskId:0x%x checkpoint-report recv, %d tasks has send checkpoint-report",
pReport->streamId, pReport->taskId, size);
} }
} }
@ -2491,7 +2497,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
" checkpointVer:%" PRId64 " transId:%d", " checkpointVer:%" PRId64 " transId:%d",
req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId); req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);
// register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. // register to the stream task done map, if all tasks has sent these kinds of message, start the checkpoint trans.
streamMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
@ -2500,7 +2506,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId); mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);
// not in meta-store yet, try to acquire the task in exec buffer // not in meta-store yet, try to acquire the task in exec buffer
// the checkpoint req arrives too soon before the completion of the create stream trans. // the checkpoint req arrives too soon before the completion of the creation of stream trans.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (p == NULL) { if (p == NULL) {
@ -2533,7 +2539,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
} }
int32_t total = taosArrayGetSize(pInfo->pTaskList); int32_t total = taosArrayGetSize(pInfo->pTaskList);
if (total == numOfTasks) { // all tasks has send the reqs if (total == numOfTasks) { // all tasks have sent the reqs
mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64 mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
" will be issued soon", " will be issued soon",
req.streamId, pStream->name, total, req.checkpointId); req.streamId, pStream->name, total, req.checkpointId);

View File

@ -292,6 +292,25 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg
return mndTransAppendRedoAction(pTrans, &action); return mndTransAppendRedoAction(pTrans, &action);
} }
bool isNodeUpdateTransActive() {
bool exist = false;
void *pIter = NULL;
streamMutexLock(&execInfo.lock);
while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
if (strcmp(pTransInfo->name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
mDebug("stream:0x%" PRIx64 " %s st:%" PRId64 " is in task nodeEp update, create new stream not allowed",
pTransInfo->streamId, pTransInfo->name, pTransInfo->startTime);
exist = true;
}
}
streamMutexUnlock(&execInfo.lock);
return exist;
}
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
void *pIter = NULL; void *pIter = NULL;

View File

@ -658,6 +658,72 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
return 0; return 0;
} }
static int32_t allTasksSendChkptReport(SChkptReportInfo* pReportInfo, int32_t numOfTasks, const char* pName) {
int64_t checkpointId = -1;
int32_t transId = -1;
int32_t taskId = -1;
int32_t existed = (int32_t)taosArrayGetSize(pReportInfo->pTaskList);
if (existed != numOfTasks) {
mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pReportInfo->streamId, pName,
existed, numOfTasks, numOfTasks - existed);
return -1;
}
// acquire current active checkpointId, and do cross-check checkpointId info in exec.pTaskList
for(int32_t i = 0; i < numOfTasks; ++i) {
STaskChkptInfo *pInfo = taosArrayGet(pReportInfo->pTaskList, i);
if (pInfo == NULL) {
continue;
}
if (checkpointId == -1) {
checkpointId = pInfo->checkpointId;
transId = pInfo->transId;
taskId = pInfo->taskId;
} else if (checkpointId != pInfo->checkpointId) {
mError("stream:0x%" PRIx64
" checkpointId in checkpoint-report list are not identical, type 1 taskId:0x%x checkpointId:%" PRId64
", type 2 taskId:0x%x checkpointId:%" PRId64,
pReportInfo->streamId, taskId, checkpointId, pInfo->taskId, pInfo->checkpointId);
return -1;
}
}
// check for the correct checkpointId for current task info in STaskChkptInfo
STaskChkptInfo *p = taosArrayGet(pReportInfo->pTaskList, 0);
STaskId id = {.streamId = p->streamId, .taskId = p->taskId};
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
// cross-check failed, there must be something unknown wrong
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId));
if (pTransInfo == NULL) {
mWarn("stream:0x%" PRIx64 " no active trans exists for checkpoint transId:%d, it may have been cleared already",
id.streamId, transId);
if (pe->checkpointInfo.activeId != 0 && pe->checkpointInfo.activeId != checkpointId) {
mWarn("stream:0x%" PRIx64 " active checkpointId is not equalled to the required, current:%" PRId64
", req:%" PRId64 " recheck next time",
id.streamId, pe->checkpointInfo.activeId, checkpointId);
return -1;
} else {
// do nothing
}
} else {
if (pTransInfo->transId != transId) {
mError("stream:0x%" PRIx64
" checkpoint-report list info are expired, active transId:%d trans in list:%d, recheck next time",
id.streamId, pTransInfo->transId, transId);
return -1;
}
}
mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", id.streamId,
pName, numOfTasks);
return TSDB_CODE_SUCCESS;
}
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
void *pIter = NULL; void *pIter = NULL;
@ -668,6 +734,7 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
} }
mDebug("start to scan checkpoint report info"); mDebug("start to scan checkpoint report info");
streamMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
@ -693,30 +760,27 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
} }
int32_t total = mndGetNumOfStreamTasks(pStream); int32_t total = mndGetNumOfStreamTasks(pStream);
int32_t existed = (int32_t)taosArrayGetSize(px->pTaskList); int32_t ret = allTasksSendChkptReport(px, total, pStream->name);
if (ret == 0) {
if (total == existed) {
mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info",
pStream->uid, pStream->name, total);
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
if (code == 0) { if (code == 0) {
code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList); code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry
taosArrayClear(px->pTaskList); taosArrayClear(px->pTaskList);
mInfo("stream:0x%" PRIx64 " clear checkpoint-report list and update the report checkpointId from:%" PRId64
" to %" PRId64,
pInfo->streamId, px->reportChkpt, pInfo->checkpointId);
px->reportChkpt = pInfo->checkpointId; px->reportChkpt = pInfo->checkpointId;
mDebug("stream:0x%" PRIx64 " clear checkpoint-report list", pInfo->streamId);
} else { } else {
mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet", mDebug("stream:0x%" PRIx64 " not launch chkpt-info update trans, due to checkpoint not finished yet",
pInfo->streamId); pInfo->streamId);
} }
sdbRelease(pMnode->pSdb, pStream);
break; break;
} else { } else {
mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", pInfo->streamId); mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", pInfo->streamId);
} }
} else {
mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name,
existed, total, total - existed);
} }
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -743,6 +807,8 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
streamMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pDropped); taosArrayDestroy(pDropped);
mDebug("end to scan checkpoint report info")
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -836,7 +902,8 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo
SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()}; SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()};
memcpy(&info.req, pRestoreInfo, sizeof(info.req)); memcpy(&info.req, pRestoreInfo, sizeof(info.req));
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) { int32_t num = (int32_t) taosArrayGetSize(pInfo->pTaskList);
for (int32_t i = 0; i < num; ++i) {
SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i); SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i);
if (p == NULL) { if (p == NULL) {
continue; continue;
@ -844,10 +911,12 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo
if (p->req.taskId == info.req.taskId) { if (p->req.taskId == info.req.taskId) {
mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64 mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64
"->%" PRId64 " total existed:%d", "->%" PRId64 " checkpointId:%" PRId64 " -> %" PRId64 " total existed:%d",
pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, p->req.checkpointId,
(int32_t)taosArrayGetSize(pInfo->pTaskList)); info.req.checkpointId, num);
p->req.startTs = info.req.startTs; p->req.startTs = info.req.startTs;
p->req.checkpointId = info.req.checkpointId;
p->req.transId = info.req.transId;
return; return;
} }
} }
@ -856,7 +925,7 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo
if (p == NULL) { if (p == NULL) {
mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId); mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId);
} else { } else {
int32_t num = taosArrayGetSize(pInfo->pTaskList); num = taosArrayGetSize(pInfo->pTaskList);
mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64 mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64
" waiting tasks:%d", " waiting tasks:%d",
pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num); pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num);
@ -868,7 +937,7 @@ void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo) {
pInfo->pTaskList = NULL; pInfo->pTaskList = NULL;
} }
int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) { int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) {
int32_t code = 0; int32_t code = 0;
int32_t numOfStreams = taosHashGetSize(pHash); int32_t numOfStreams = taosHashGetSize(pHash);
if (numOfStreams == 0) { if (numOfStreams == 0) {
@ -885,7 +954,7 @@ int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) {
return code; return code;
} }
int64_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) { int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) {
int32_t code = 0; int32_t code = 0;
int32_t numOfStreams = taosHashGetSize(pHash); int32_t numOfStreams = taosHashGetSize(pHash);
if (numOfStreams == 0) { if (numOfStreams == 0) {

View File

@ -604,6 +604,17 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
// not update the checkpoint info if the checkpointId is less than the failed checkpointId
if (pReq->checkpointId < pInfo->pActiveInfo->failedId) {
stWarn("s-task:%s vgId:%d not update the checkpoint-info, since update checkpointId:%" PRId64
" is less than the failed checkpointId:%" PRId64 ", discard the update info",
id, vgId, pReq->checkpointId, pInfo->pActiveInfo->failedId);
streamMutexUnlock(&pTask->lock);
// always return true
return TSDB_CODE_SUCCESS;
}
if (pReq->checkpointId <= pInfo->checkpointId) { if (pReq->checkpointId <= pInfo->checkpointId) {
stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64 stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64
" no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored", " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored",
@ -638,9 +649,9 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
pInfo->checkpointTime, pReq->checkpointTs); pInfo->checkpointTime, pReq->checkpointTs);
} else { // not in restore status, must be in checkpoint status } else { // not in restore status, must be in checkpoint status
if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) { if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, id, vgId, pStatus.name, pMeta->role, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs);
} else { } else {
stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64