fix(stream): add more check in tmr.
This commit is contained in:
parent
7f04e2cfb9
commit
077745390d
|
@ -74,13 +74,6 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
|
|||
}
|
||||
|
||||
if (pInfo->stage != stage) {
|
||||
streamMutexLock(&pTask->lock);
|
||||
ETaskStatus status = streamTaskGetStatus(pTask).state;
|
||||
if (status == TASK_STATUS__CK) {
|
||||
streamTaskSetFailedCheckpointId(pTask);
|
||||
}
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
return TASK_UPSTREAM_NEW_STAGE;
|
||||
} else if (pTask->status.downstreamReady != 1) {
|
||||
stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));
|
||||
|
|
|
@ -838,6 +838,28 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
|||
return;
|
||||
}
|
||||
|
||||
if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) {
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
|
||||
", quit, ref:%d",
|
||||
id, vgId, pTmrInfo->launchChkptId, ref);
|
||||
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
// active checkpoint info is cleared for now
|
||||
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) {
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d",
|
||||
id, vgId, ref);
|
||||
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
||||
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i);
|
||||
|
||||
|
|
|
@ -819,8 +819,19 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
|
|||
SArray* pList = pActiveInfo->pReadyMsgList;
|
||||
int32_t num = taosArrayGetSize(pList);
|
||||
|
||||
if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64
|
||||
", quit, ref:%d",
|
||||
id, vgId, pTmrInfo->launchChkptId, ref);
|
||||
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
// active checkpoint info is cleared for now
|
||||
if ((pActiveInfo->activeId == 0) && (pActiveInfo->transId == 0) && (num == 0) && (pTask->chkInfo.startTs == 0)) {
|
||||
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) {
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref);
|
||||
|
|
Loading…
Reference in New Issue