From 24cce3477f2e2b1d0d1130b471eb171b41310062 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Feb 2025 10:00:55 +0800 Subject: [PATCH] fix(stream): update the checkpoint info for follower anyway. --- source/libs/executor/src/executor.c | 2 +- source/libs/stream/src/streamCheckpoint.c | 107 ++++++++++++++-------- 2 files changed, 69 insertions(+), 40 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1aa114a02a..b6044eaacf 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -608,7 +608,7 @@ void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) { } int32_t qExecutorInit(void) { - taosThreadOnce(&initPoolOnce, initRefPool); + (void) taosThreadOnce(&initPoolOnce, initRefPool); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a43cdd0b85..2d53c99f34 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -595,68 +595,71 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId); } -int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) { +// The checkpointInfo can be updated in the following three cases: +// 1. follower tasks; 2. leader task with status of TASK_STATUS__CK; 3. restore not completed +static int32_t doUpdateCheckpointInfoCheck(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq, + bool* pContinue) { SStreamMeta* pMeta = pTask->pMeta; int32_t vgId = pMeta->vgId; int32_t code = 0; const char* id = pTask->id.idStr; SCheckpointInfo* pInfo = &pTask->chkInfo; - streamMutexLock(&pTask->lock); + *pContinue = true; // not update the checkpoint info if the checkpointId is less than the failed checkpointId if (pReq->checkpointId < pInfo->pActiveInfo->failedId) { stWarn("s-task:%s vgId:%d not update the checkpoint-info, since update checkpointId:%" PRId64 - " is less than the failed checkpointId:%" PRId64 ", discard the update info", + " is less than the failed checkpointId:%" PRId64 ", discard", id, vgId, pReq->checkpointId, pInfo->pActiveInfo->failedId); - streamMutexUnlock(&pTask->lock); - // always return true + *pContinue = false; return TSDB_CODE_SUCCESS; } + // it's an expired checkpointInfo update msg, we still try to drop the required drop fill-history task. if (pReq->checkpointId <= pInfo->checkpointId) { stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64 " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored", id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer, pReq->transId); - streamMutexUnlock(&pTask->lock); - { // destroy the related fill-history tasks - // drop task should not in the meta-lock, and drop the related fill-history task now - if (pReq->dropRelHTask) { - code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", - id, vgId, pReq->taskId, numOfTasks); - } + { // destroy the related fill-history tasks + if (pReq->dropRelHTask) { + code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); - if (pReq->dropRelHTask) { - code = streamMetaCommit(pMeta); - } - } + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", + id, vgId, pReq->taskId, numOfTasks); + //todo: task may not exist, commit anyway, optimize this later + code = streamMetaCommit(pMeta); + } + } + + *pContinue = false; // always return true return TSDB_CODE_SUCCESS; } - SStreamTaskState pStatus = streamTaskGetStatus(pTask); + SStreamTaskState status = streamTaskGetStatus(pTask); if (!restored) { // during restore procedure, do update checkpoint-info stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, - id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, + id, vgId, status.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); } else { // not in restore status, must be in checkpoint status - if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) { - stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 - " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, - id, vgId, pStatus.name, pMeta->role, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, + if (((status.state == TASK_STATUS__CK) && (pMeta->role == NODE_ROLE_LEADER)) || + (pMeta->role == NODE_ROLE_FOLLOWER)) { + stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64 + "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, + id, vgId, status.name, pMeta->role, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); } else { stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64, - id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, + id, vgId, status.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer); } } @@ -665,14 +668,48 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pInfo->processedVer <= pReq->checkpointVer); if (!valid) { - stFatal("s-task:%s invalid checkpointId update info recv, current checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64 " discard it", - id, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId, - pReq->checkpointVer); - streamMutexUnlock(&pTask->lock); - return TSDB_CODE_STREAM_INTERNAL_ERROR; + // invalid update checkpoint info for leader, since the processedVer is greater than the checkpointVer + // It is possible for follower tasks that the processedVer is greater than the checkpointVer, and the processed info + // in follower tasks will be discarded, since the leader/follower switch happens before the checkpoint of the + // processedVer being generated. + if (pMeta->role == NODE_ROLE_LEADER) { + + stFatal("s-task:%s checkpointId update info recv, current checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64 " discard it", + id, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId, + pReq->checkpointVer); + + *pContinue = false; + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } else { + stInfo("s-task:%s vgId:%d follower recv checkpointId update info, current checkpointId:%" PRId64 + " checkpointVer:%" PRId64 " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64, + id, pMeta->vgId, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId, + pReq->checkpointVer); + } } + return TSDB_CODE_SUCCESS; +} + +int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) { + SStreamMeta* pMeta = pTask->pMeta; + int32_t vgId = pMeta->vgId; + int32_t code = 0; + const char* id = pTask->id.idStr; + SCheckpointInfo* pInfo = &pTask->chkInfo; + bool continueUpdate = true; + + streamMutexLock(&pTask->lock); + code = doUpdateCheckpointInfoCheck(pTask, restored, pReq, &continueUpdate); + + if (!continueUpdate) { + streamMutexUnlock(&pTask->lock); + return code; + } + + SStreamTaskState pStatus = streamTaskGetStatus(pTask); + // update only it is in checkpoint status, or during restore procedure. if ((pStatus.state == TASK_STATUS__CK) || (!restored) || (pMeta->role == NODE_ROLE_FOLLOWER)) { pInfo->checkpointId = pReq->checkpointId; @@ -1537,14 +1574,6 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { streamMutexLock(&pTask->lock); ETaskStatus p = streamTaskGetStatus(pTask).state; - // if (pInfo->alreadySendChkptId == true) { - // stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); - // streamMutexUnlock(&pTask->lock); - // return TSDB_CODE_SUCCESS; - // } else { - // pInfo->alreadySendChkptId = true; - // } - // streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs()); streamMutexUnlock(&pTask->lock);