diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1c0f73249c..119edb47bc 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -954,6 +954,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int32_t code = TSDB_CODE_SUCCESS; SStreamTask* pTask = NULL; SStreamTask* pStreamTask = NULL; + char* pStatus = NULL; code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); if (pTask == NULL) { @@ -964,7 +965,29 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // do recovery step1 const char* id = pTask->id.idStr; - char* pStatus = streamTaskGetStatus(pTask).name; + streamMutexLock(&pTask->lock); + + SStreamTaskState s = streamTaskGetStatus(pTask); + pStatus = s.name; + + if ((s.state != TASK_STATUS__SCAN_HISTORY) || (pTask->status.downstreamReady == 0)) { + tqError("s-task:%s vgId:%d status:%s downstreamReady:%d not allowed/ready for scan-history data, quit", id, + pMeta->vgId, s.name, pTask->status.downstreamReady); + + streamMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } + + if (pTask->exec.pExecutor == NULL) { + tqError("s-task:%s vgId:%d executor is null, not executor scan history", id, pMeta->vgId); + + streamMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } + + streamMutexUnlock(&pTask->lock); // avoid multi-thread exec while (1) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1a49e50547..c53e1a19a3 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -595,68 +595,71 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId); } -int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) { +// The checkpointInfo can be updated in the following three cases: +// 1. follower tasks; 2. leader task with status of TASK_STATUS__CK; 3. restore not completed +static int32_t doUpdateCheckpointInfoCheck(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq, + bool* pContinue) { SStreamMeta* pMeta = pTask->pMeta; int32_t vgId = pMeta->vgId; int32_t code = 0; const char* id = pTask->id.idStr; SCheckpointInfo* pInfo = &pTask->chkInfo; - streamMutexLock(&pTask->lock); + *pContinue = true; // not update the checkpoint info if the checkpointId is less than the failed checkpointId if (pReq->checkpointId < pInfo->pActiveInfo->failedId) { stWarn("s-task:%s vgId:%d not update the checkpoint-info, since update checkpointId:%" PRId64 - " is less than the failed checkpointId:%" PRId64 ", discard the update info", + " is less than the failed checkpointId:%" PRId64 ", discard", id, vgId, pReq->checkpointId, pInfo->pActiveInfo->failedId); - streamMutexUnlock(&pTask->lock); - // always return true + *pContinue = false; return TSDB_CODE_SUCCESS; } + // it's an expired checkpointInfo update msg, we still try to drop the required drop fill-history task. if (pReq->checkpointId <= pInfo->checkpointId) { stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64 " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored", id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer, pReq->transId); - streamMutexUnlock(&pTask->lock); - { // destroy the related fill-history tasks - // drop task should not in the meta-lock, and drop the related fill-history task now + { // destroy the related fill-history tasks if (pReq->dropRelHTask) { - code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", - id, vgId, pReq->taskId, numOfTasks); - } + code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); - if (pReq->dropRelHTask) { - code = streamMetaCommit(pMeta); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", + id, vgId, pReq->taskId, numOfTasks); + + //todo: task may not exist, commit anyway, optimize this later + code = streamMetaCommit(pMeta); } } + *pContinue = false; // always return true return TSDB_CODE_SUCCESS; } - SStreamTaskState pStatus = streamTaskGetStatus(pTask); + SStreamTaskState status = streamTaskGetStatus(pTask); if (!restored) { // during restore procedure, do update checkpoint-info stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, - id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, + id, vgId, status.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); } else { // not in restore status, must be in checkpoint status - if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) { - stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 - " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, - id, vgId, pStatus.name, pMeta->role, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, + if (((status.state == TASK_STATUS__CK) && (pMeta->role == NODE_ROLE_LEADER)) || + (pMeta->role == NODE_ROLE_FOLLOWER)) { + stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64 + "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, + id, vgId, status.name, pMeta->role, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); } else { stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64, - id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, + id, vgId, status.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer); } } @@ -665,14 +668,48 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pInfo->processedVer <= pReq->checkpointVer); if (!valid) { - stFatal("s-task:%s invalid checkpointId update info recv, current checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64 " discard it", - id, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId, - pReq->checkpointVer); - streamMutexUnlock(&pTask->lock); - return TSDB_CODE_STREAM_INTERNAL_ERROR; + // invalid update checkpoint info for leader, since the processedVer is greater than the checkpointVer + // It is possible for follower tasks that the processedVer is greater than the checkpointVer, and the processed info + // in follower tasks will be discarded, since the leader/follower switch happens before the checkpoint of the + // processedVer being generated. + if (pMeta->role == NODE_ROLE_LEADER) { + + stFatal("s-task:%s checkpointId update info recv, current checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64 " discard it", + id, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId, + pReq->checkpointVer); + + *pContinue = false; + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } else { + stInfo("s-task:%s vgId:%d follower recv checkpointId update info, current checkpointId:%" PRId64 + " checkpointVer:%" PRId64 " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64, + id, pMeta->vgId, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId, + pReq->checkpointVer); + } } + return TSDB_CODE_SUCCESS; +} + +int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) { + SStreamMeta* pMeta = pTask->pMeta; + int32_t vgId = pMeta->vgId; + int32_t code = 0; + const char* id = pTask->id.idStr; + SCheckpointInfo* pInfo = &pTask->chkInfo; + bool continueUpdate = true; + + streamMutexLock(&pTask->lock); + code = doUpdateCheckpointInfoCheck(pTask, restored, pReq, &continueUpdate); + + if (!continueUpdate) { + streamMutexUnlock(&pTask->lock); + return code; + } + + SStreamTaskState pStatus = streamTaskGetStatus(pTask); + // update only it is in checkpoint status, or during restore procedure. if ((pStatus.state == TASK_STATUS__CK) || (!restored) || (pMeta->role == NODE_ROLE_FOLLOWER)) { pInfo->checkpointId = pReq->checkpointId; @@ -1537,14 +1574,6 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { streamMutexLock(&pTask->lock); ETaskStatus p = streamTaskGetStatus(pTask).state; - // if (pInfo->alreadySendChkptId == true) { - // stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); - // streamMutexUnlock(&pTask->lock); - // return TSDB_CODE_SUCCESS; - // } else { - // pInfo->alreadySendChkptId = true; - // } - // streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs()); streamMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 9c16ff036e..c40d5ef928 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -45,6 +45,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { if (numOfTasks == 0) { stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId); + + streamMetaWLock(pMeta); + streamMetaResetStartInfo(&pMeta->startInfo, vgId); + streamMetaWUnLock(pMeta); return TSDB_CODE_SUCCESS; }