diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 189b5f2f7f..fac54cfd81 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -37,14 +37,19 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { } void tqUpdateNodeStage(STQ* pTq, bool isLeader) { - SSyncState state = syncGetState(pTq->pVnode->sync); + SSyncState state = syncGetState(pTq->pVnode->sync); SStreamMeta* pMeta = pTq->pStreamMeta; - tqInfo("vgId:%d update the meta stage:%"PRId64", prev:%"PRId64" leader:%d", pMeta->vgId, state.term, pMeta->stage, isLeader); + int64_t stage = pMeta->stage; pMeta->stage = state.term; pMeta->leader = isLeader; if (isLeader) { + tqInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId, + state.term, stage, isLeader); streamMetaStartHb(pMeta); + } else { + tqInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d", pMeta->vgId, state.term, stage, + isLeader); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 42acdd2b40..cf299dc79c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -612,10 +612,10 @@ static void vnodeBecomeLearner(const SSyncFSM *pFsm) { static void vnodeBecomeLeader(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; + vDebug("vgId:%d, become leader", pVnode->config.vgId); if (pVnode->pTq) { tqUpdateNodeStage(pVnode->pTq, true); } - vDebug("vgId:%d, become leader", pVnode->config.vgId); } static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c328ff4bbc..777e93da47 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -543,7 +543,22 @@ int32_t streamTaskStop(SStreamTask* pTask) { int64_t st = taosGetTimestampMs(); const char* id = pTask->id.idStr; - pTask->status.taskStatus = TASK_STATUS__STOP; + // we should wait for the task complete the checkpoint operation before stop it, otherwise, the operation maybe blocked + // by the unfinished checkpoint operation, even if the leader has become the follower. + while(1) { + taosThreadMutexLock(&pTask->lock); + + if (pTask->status.taskStatus == TASK_STATUS__CK) { + stDebug("s-task:%s in checkpoint, wait for it completed for 500ms before stop task", pTask->id.idStr); + taosThreadMutexUnlock(&pTask->lock); + taosMsleep(500); + } else { + pTask->status.taskStatus = TASK_STATUS__STOP; + taosThreadMutexUnlock(&pTask->lock); + break; + } + } + qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) {