refactor(stream): move the generate checkpoint procedure out of lock, to avoid blocking heartbeat, and resulting in leader/follower switch.
This commit is contained in:
parent
2d4697d32f
commit
8c671f51fb
|
@ -910,9 +910,12 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
int64_t startTs = pTask->chkInfo.startTs;
|
int64_t startTs = pTask->chkInfo.startTs;
|
||||||
int64_t ckId = pTask->chkInfo.pActiveInfo->activeId;
|
int64_t ckId = pTask->chkInfo.pActiveInfo->activeId;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
|
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
|
||||||
|
streamMutexLock(&pTask->lock);
|
||||||
|
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
|
||||||
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
// sink task does not need to save the status, and generated the checkpoint
|
// sink task does not need to save the status, and generated the checkpoint
|
||||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||||
stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);
|
stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);
|
||||||
|
|
|
@ -698,6 +698,8 @@ static int32_t doHandleChkptBlock(SStreamTask* pTask) {
|
||||||
|
|
||||||
streamMutexLock(&pTask->lock);
|
streamMutexLock(&pTask->lock);
|
||||||
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
||||||
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
if (pState.state == TASK_STATUS__CK) { // todo other thread may change the status
|
if (pState.state == TASK_STATUS__CK) { // todo other thread may change the status
|
||||||
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
|
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
|
||||||
code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
|
code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
|
||||||
|
@ -715,7 +717,7 @@ static int32_t doHandleChkptBlock(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMutexUnlock(&pTask->lock);
|
// streamMutexUnlock(&pTask->lock);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue