fix(stream): update checkpoint-info after check the failed checkpointId, and update the consensus-checkpoint id in mnode.

This commit is contained in:
Haojun Liao 2025-01-21 16:29:00 +08:00
parent df6ec3afc2
commit 0ea46585f4
3 changed files with 24 additions and 10 deletions

View File

@ -175,8 +175,8 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo);
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo);
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo);
int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId);
int64_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId);
int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId);
int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId);
int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId);
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows);

View File

@ -902,7 +902,8 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo
SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()};
memcpy(&info.req, pRestoreInfo, sizeof(info.req));
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) {
int32_t num = (int32_t) taosArrayGetSize(pInfo->pTaskList);
for (int32_t i = 0; i < num; ++i) {
SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i);
if (p == NULL) {
continue;
@ -910,10 +911,12 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo
if (p->req.taskId == info.req.taskId) {
mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64
"->%" PRId64 " total existed:%d",
pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs,
(int32_t)taosArrayGetSize(pInfo->pTaskList));
"->%" PRId64 " checkpointId:%" PRId64 " -> %" PRId64 " total existed:%d",
pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, p->req.checkpointId,
info.req.checkpointId, num);
p->req.startTs = info.req.startTs;
p->req.checkpointId = info.req.checkpointId;
p->req.transId = info.req.transId;
return;
}
}
@ -922,7 +925,7 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo
if (p == NULL) {
mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId);
} else {
int32_t num = taosArrayGetSize(pInfo->pTaskList);
num = taosArrayGetSize(pInfo->pTaskList);
mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64
" waiting tasks:%d",
pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num);
@ -934,7 +937,7 @@ void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo) {
pInfo->pTaskList = NULL;
}
int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) {
int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) {
int32_t code = 0;
int32_t numOfStreams = taosHashGetSize(pHash);
if (numOfStreams == 0) {
@ -951,7 +954,7 @@ int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) {
return code;
}
int64_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) {
int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) {
int32_t code = 0;
int32_t numOfStreams = taosHashGetSize(pHash);
if (numOfStreams == 0) {

View File

@ -604,6 +604,17 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
streamMutexLock(&pTask->lock);
// not update the checkpoint info if the checkpointId is less than the failed checkpointId
if (pReq->checkpointId < pInfo->pActiveInfo->failedId) {
stWarn("s-task:%s vgId:%d not update the checkpoint-info, since update checkpointId:%" PRId64
" is less than the failed checkpointId:%" PRId64 ", discard the update info",
id, vgId, pReq->checkpointId, pInfo->pActiveInfo->failedId);
streamMutexUnlock(&pTask->lock);
// always return true
return TSDB_CODE_SUCCESS;
}
if (pReq->checkpointId <= pInfo->checkpointId) {
stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64
" no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored",
@ -638,7 +649,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
pInfo->checkpointTime, pReq->checkpointTs);
} else { // not in restore status, must be in checkpoint status
if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs);