fix(stream): wait for the task checkpoint before stop.
This commit is contained in:
parent
e4073bd9a8
commit
2efd155adf
|
@ -39,12 +39,17 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
|
||||||
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
|
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
|
||||||
SSyncState state = syncGetState(pTq->pVnode->sync);
|
SSyncState state = syncGetState(pTq->pVnode->sync);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
tqInfo("vgId:%d update the meta stage:%"PRId64", prev:%"PRId64" leader:%d", pMeta->vgId, state.term, pMeta->stage, isLeader);
|
int64_t stage = pMeta->stage;
|
||||||
|
|
||||||
pMeta->stage = state.term;
|
pMeta->stage = state.term;
|
||||||
pMeta->leader = isLeader;
|
pMeta->leader = isLeader;
|
||||||
if (isLeader) {
|
if (isLeader) {
|
||||||
|
tqInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId,
|
||||||
|
state.term, stage, isLeader);
|
||||||
streamMetaStartHb(pMeta);
|
streamMetaStartHb(pMeta);
|
||||||
|
} else {
|
||||||
|
tqInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d", pMeta->vgId, state.term, stage,
|
||||||
|
isLeader);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -612,10 +612,10 @@ static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
|
||||||
|
|
||||||
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
|
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
|
vDebug("vgId:%d, become leader", pVnode->config.vgId);
|
||||||
if (pVnode->pTq) {
|
if (pVnode->pTq) {
|
||||||
tqUpdateNodeStage(pVnode->pTq, true);
|
tqUpdateNodeStage(pVnode->pTq, true);
|
||||||
}
|
}
|
||||||
vDebug("vgId:%d, become leader", pVnode->config.vgId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
|
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
|
||||||
|
|
|
@ -543,7 +543,22 @@ int32_t streamTaskStop(SStreamTask* pTask) {
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
|
// we should wait for the task complete the checkpoint operation before stop it, otherwise, the operation maybe blocked
|
||||||
|
// by the unfinished checkpoint operation, even if the leader has become the follower.
|
||||||
|
while(1) {
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
|
if (pTask->status.taskStatus == TASK_STATUS__CK) {
|
||||||
|
stDebug("s-task:%s in checkpoint, wait for it completed for 500ms before stop task", pTask->id.idStr);
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
taosMsleep(500);
|
||||||
|
} else {
|
||||||
pTask->status.taskStatus = TASK_STATUS__STOP;
|
pTask->status.taskStatus = TASK_STATUS__STOP;
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||||
|
|
||||||
while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) {
|
while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) {
|
||||||
|
|
Loading…
Reference in New Issue