fix(stream): allowed continue hb msg.
This commit is contained in:
parent
71b8f67ea6
commit
a145d9db22
|
@ -558,10 +558,17 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
||||||
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
|
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
|
||||||
pInfo->checkpointTime, pReq->checkpointTs);
|
pInfo->checkpointTime, pReq->checkpointTs);
|
||||||
} else { // not in restore status, must be in checkpoint status
|
} else { // not in restore status, must be in checkpoint status
|
||||||
|
if (pStatus.state == TASK_STATUS__CK) {
|
||||||
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
|
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
|
||||||
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
|
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
|
||||||
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
|
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
|
||||||
pInfo->checkpointTime, pReq->checkpointTs);
|
pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs);
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
|
||||||
|
" checkpointVer:%" PRId64 "->%" PRId64,
|
||||||
|
id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
|
||||||
|
pReq->checkpointVer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
|
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
|
||||||
|
@ -573,12 +580,11 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
||||||
pInfo->checkpointVer = pReq->checkpointVer;
|
pInfo->checkpointVer = pReq->checkpointVer;
|
||||||
pInfo->checkpointTime = pReq->checkpointTs;
|
pInfo->checkpointTime = pReq->checkpointTs;
|
||||||
|
|
||||||
streamTaskClearCheckInfo(pTask, true);
|
|
||||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
||||||
} else {
|
|
||||||
stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus.name);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamTaskClearCheckInfo(pTask, true);
|
||||||
|
|
||||||
if (pReq->dropRelHTask) {
|
if (pReq->dropRelHTask) {
|
||||||
stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
|
stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
|
||||||
pReq->taskId, vgId, pReq->hTaskId);
|
pReq->taskId, vgId, pReq->hTaskId);
|
||||||
|
|
|
@ -297,14 +297,10 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
streamMetaRUnLock(pMeta);
|
streamMetaRUnLock(pMeta);
|
||||||
|
|
||||||
if (code != TSDB_CODE_APP_IS_STOPPING) {
|
|
||||||
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
|
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
|
||||||
"meta-hb-tmr");
|
"meta-hb-tmr");
|
||||||
} else {
|
|
||||||
stDebug("vgId:%d is stopping, not start hb again", pMeta->vgId);
|
|
||||||
}
|
|
||||||
|
|
||||||
code = taosReleaseRef(streamMetaId, rid);
|
code = taosReleaseRef(streamMetaId, rid);
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid);
|
stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue