fix(stream): do not wait for the checkpoint finish before stop tasks.
This commit is contained in:
parent
7167c3c5de
commit
fc96ec6bae
|
@ -558,21 +558,12 @@ int32_t streamTaskStop(SStreamTask* pTask) {
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
// we should wait for the task complete the checkpoint operation before stop it, otherwise, the operation maybe blocked
|
taosThreadMutexLock(&pTask->lock);
|
||||||
// by the unfinished checkpoint operation, even if the leader has become the follower.
|
if (pTask->status.taskStatus == TASK_STATUS__CK) {
|
||||||
while(1) {
|
stDebug("s-task:%s in checkpoint will be discarded since task is stopped", pTask->id.idStr);
|
||||||
taosThreadMutexLock(&pTask->lock);
|
|
||||||
|
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__CK) {
|
|
||||||
stDebug("s-task:%s in checkpoint, wait for it completed for 500ms before stop task", pTask->id.idStr);
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
|
||||||
taosMsleep(500);
|
|
||||||
} else {
|
|
||||||
pTask->status.taskStatus = TASK_STATUS__STOP;
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
pTask->status.taskStatus = TASK_STATUS__STOP;
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue