From 2efd155adfde56d3e4c22fb353748d4b13d3e37a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 21 Sep 2023 15:53:21 +0800 Subject: [PATCH] fix(stream): wait for the task checkpoint before stop. --- source/dnode/vnode/src/tq/tqUtil.c | 9 +++++++-- source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- source/libs/stream/src/streamTask.c | 17 ++++++++++++++++- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 189b5f2f7f..fac54cfd81 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -37,14 +37,19 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { } void tqUpdateNodeStage(STQ* pTq, bool isLeader) { - SSyncState state = syncGetState(pTq->pVnode->sync); + SSyncState state = syncGetState(pTq->pVnode->sync); SStreamMeta* pMeta = pTq->pStreamMeta; - tqInfo("vgId:%d update the meta stage:%"PRId64", prev:%"PRId64" leader:%d", pMeta->vgId, state.term, pMeta->stage, isLeader); + int64_t stage = pMeta->stage; pMeta->stage = state.term; pMeta->leader = isLeader; if (isLeader) { + tqInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId, + state.term, stage, isLeader); streamMetaStartHb(pMeta); + } else { + tqInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d", pMeta->vgId, state.term, stage, + isLeader); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 42acdd2b40..cf299dc79c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -612,10 +612,10 @@ static void vnodeBecomeLearner(const SSyncFSM *pFsm) { static void vnodeBecomeLeader(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; + vDebug("vgId:%d, become leader", pVnode->config.vgId); if (pVnode->pTq) { tqUpdateNodeStage(pVnode->pTq, true); } - vDebug("vgId:%d, become leader", pVnode->config.vgId); } static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c328ff4bbc..777e93da47 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -543,7 +543,22 @@ int32_t streamTaskStop(SStreamTask* pTask) { int64_t st = taosGetTimestampMs(); const char* id = pTask->id.idStr; - pTask->status.taskStatus = TASK_STATUS__STOP; + // we should wait for the task complete the checkpoint operation before stop it, otherwise, the operation maybe blocked + // by the unfinished checkpoint operation, even if the leader has become the follower. + while(1) { + taosThreadMutexLock(&pTask->lock); + + if (pTask->status.taskStatus == TASK_STATUS__CK) { + stDebug("s-task:%s in checkpoint, wait for it completed for 500ms before stop task", pTask->id.idStr); + taosThreadMutexUnlock(&pTask->lock); + taosMsleep(500); + } else { + pTask->status.taskStatus = TASK_STATUS__STOP; + taosThreadMutexUnlock(&pTask->lock); + break; + } + } + qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) {