refactor(stream): move the generate checkpoint procedure out of lock, to avoid blocking heartbeat, and resulting in leader/follower switch.
This commit is contained in:
parent
7b56df0adc
commit
b6707fbbd2
|
@ -910,9 +910,12 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
|||
int64_t startTs = pTask->chkInfo.startTs;
|
||||
int64_t ckId = pTask->chkInfo.pActiveInfo->activeId;
|
||||
const char* id = pTask->id.idStr;
|
||||
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
||||
streamMutexLock(&pTask->lock);
|
||||
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
// sink task does not need to save the status, and generated the checkpoint
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||
stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);
|
||||
|
|
|
@ -698,6 +698,8 @@ static int32_t doHandleChkptBlock(SStreamTask* pTask) {
|
|||
|
||||
streamMutexLock(&pTask->lock);
|
||||
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
if (pState.state == TASK_STATUS__CK) { // todo other thread may change the status
|
||||
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
|
||||
code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
|
||||
|
@ -715,7 +717,7 @@ static int32_t doHandleChkptBlock(SStreamTask* pTask) {
|
|||
}
|
||||
}
|
||||
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
// streamMutexUnlock(&pTask->lock);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue