fix(stream): add more check in tmr.

This commit is contained in:
Haojun Liao 2024-08-03 16:34:26 +08:00
parent 27cadbbcb6
commit 02b59d0b33
3 changed files with 24 additions and 9 deletions

View File

@ -74,13 +74,6 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
}
if (pInfo->stage != stage) {
streamMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask).state;
if (status == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask);
}
streamMutexUnlock(&pTask->lock);
return TASK_UPSTREAM_NEW_STAGE;
} else if (pTask->status.downstreamReady != 1) {
stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));

View File

@ -855,6 +855,28 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
return;
}
if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
", quit, ref:%d",
id, vgId, pTmrInfo->launchChkptId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d",
id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i);

View File

@ -820,7 +820,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList);
if (pTmrInfo->launchChkptId < pActiveInfo->activeId) {
if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64
@ -832,7 +832,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
}
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) && (pActiveInfo->transId == 0) && (num == 0) && (pTask->chkInfo.startTs == 0)) {
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref);