fix(stream): update the checkpoint info for follower anyway.

This commit is contained in:
Haojun Liao 2025-02-21 10:00:55 +08:00
parent 306f6ebc93
commit 24cce3477f
2 changed files with 69 additions and 40 deletions

View File

@ -608,7 +608,7 @@ void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
}
int32_t qExecutorInit(void) {
taosThreadOnce(&initPoolOnce, initRefPool);
(void) taosThreadOnce(&initPoolOnce, initRefPool);
return TSDB_CODE_SUCCESS;
}

View File

@ -595,68 +595,71 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId);
}
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
// The checkpointInfo can be updated in the following three cases:
// 1. follower tasks; 2. leader task with status of TASK_STATUS__CK; 3. restore not completed
static int32_t doUpdateCheckpointInfoCheck(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq,
bool* pContinue) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
const char* id = pTask->id.idStr;
SCheckpointInfo* pInfo = &pTask->chkInfo;
streamMutexLock(&pTask->lock);
*pContinue = true;
// not update the checkpoint info if the checkpointId is less than the failed checkpointId
if (pReq->checkpointId < pInfo->pActiveInfo->failedId) {
stWarn("s-task:%s vgId:%d not update the checkpoint-info, since update checkpointId:%" PRId64
" is less than the failed checkpointId:%" PRId64 ", discard the update info",
" is less than the failed checkpointId:%" PRId64 ", discard",
id, vgId, pReq->checkpointId, pInfo->pActiveInfo->failedId);
streamMutexUnlock(&pTask->lock);
// always return success; the update is simply discarded
*pContinue = false;
return TSDB_CODE_SUCCESS;
}
// it's an expired checkpointInfo update msg; we still try to drop the related fill-history task if required.
if (pReq->checkpointId <= pInfo->checkpointId) {
stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64
" no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored",
id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
pReq->transId);
streamMutexUnlock(&pTask->lock);
{ // destroy the related fill-history tasks
// dropping a task should not be done while holding the meta-lock; drop the related fill-history task now
if (pReq->dropRelHTask) {
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
id, vgId, pReq->taskId, numOfTasks);
}
{ // destroy the related fill-history tasks
if (pReq->dropRelHTask) {
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
if (pReq->dropRelHTask) {
code = streamMetaCommit(pMeta);
}
}
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
id, vgId, pReq->taskId, numOfTasks);
//todo: task may not exist, commit anyway, optimize this later
code = streamMetaCommit(pMeta);
}
}
*pContinue = false;
// always return success; the expired update is ignored
return TSDB_CODE_SUCCESS;
}
SStreamTaskState pStatus = streamTaskGetStatus(pTask);
SStreamTaskState status = streamTaskGetStatus(pTask);
if (!restored) { // during restore procedure, do update checkpoint-info
stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
id, vgId, status.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
pInfo->checkpointTime, pReq->checkpointTs);
} else { // not in restore status, must be in checkpoint status
if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus.name, pMeta->role, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
if (((status.state == TASK_STATUS__CK) && (pMeta->role == NODE_ROLE_LEADER)) ||
(pMeta->role == NODE_ROLE_FOLLOWER)) {
stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64
"->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, status.name, pMeta->role, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs);
} else {
stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64,
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
id, vgId, status.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
pReq->checkpointVer);
}
}
@ -665,14 +668,48 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
pInfo->processedVer <= pReq->checkpointVer);
if (!valid) {
stFatal("s-task:%s invalid checkpointId update info recv, current checkpointId:%" PRId64 " checkpointVer:%" PRId64
" processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64 " discard it",
id, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId,
pReq->checkpointVer);
streamMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
// invalid update checkpoint info for leader, since the processedVer is greater than the checkpointVer
// It is possible for follower tasks that the processedVer is greater than the checkpointVer, and the processed info
// in follower tasks will be discarded, since the leader/follower switch happens before the checkpoint of the
// processedVer being generated.
if (pMeta->role == NODE_ROLE_LEADER) {
stFatal("s-task:%s checkpointId update info recv, current checkpointId:%" PRId64 " checkpointVer:%" PRId64
" processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64 " discard it",
id, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId,
pReq->checkpointVer);
*pContinue = false;
return TSDB_CODE_STREAM_INTERNAL_ERROR;
} else {
stInfo("s-task:%s vgId:%d follower recv checkpointId update info, current checkpointId:%" PRId64
" checkpointVer:%" PRId64 " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64,
id, pMeta->vgId, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId,
pReq->checkpointVer);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
const char* id = pTask->id.idStr;
SCheckpointInfo* pInfo = &pTask->chkInfo;
bool continueUpdate = true;
streamMutexLock(&pTask->lock);
code = doUpdateCheckpointInfoCheck(pTask, restored, pReq, &continueUpdate);
if (!continueUpdate) {
streamMutexUnlock(&pTask->lock);
return code;
}
SStreamTaskState pStatus = streamTaskGetStatus(pTask);
// update only if the task is in checkpoint status, during the restore procedure, or on a follower node.
if ((pStatus.state == TASK_STATUS__CK) || (!restored) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
pInfo->checkpointId = pReq->checkpointId;
@ -1537,14 +1574,6 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
streamMutexLock(&pTask->lock);
ETaskStatus p = streamTaskGetStatus(pTask).state;
// if (pInfo->alreadySendChkptId == true) {
// stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id);
// streamMutexUnlock(&pTask->lock);
// return TSDB_CODE_SUCCESS;
// } else {
// pInfo->alreadySendChkptId = true;
// }
//
streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs());
streamMutexUnlock(&pTask->lock);