diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b4fcf1edc9..722ab2018e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -954,7 +954,6 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n int32_t vgId = pTask->pMeta->vgId; if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 ", quit", id, vgId, pTmrInfo->launchChkptId); @@ -963,13 +962,11 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr", id, vgId); return -1; } if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id, vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num); return -1; @@ -998,6 +995,7 @@ static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, in void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId); if (p == NULL) { stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId); + taosArrayDestroy(pTmp); return terrno; } else { stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, level, @@ -1047,13 +1045,13 @@ static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t } } -static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray* pNotRspList) { +static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray** pNotRspList) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; SArray* pList = pActiveInfo->pReadyMsgList; int32_t num = taosArrayGetSize(pList); int32_t vgId = pTask->pMeta->vgId; - int32_t checkpointId = pActiveInfo->activeId; + int64_t checkpointId = pActiveInfo->activeId; const char* id = pTask->id.idStr; int32_t notRsp = 0; @@ -1062,18 +1060,17 @@ static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray* return code; } - code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); + code = doFindNotConfirmUpstream(pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); if (code) { - streamCleanBeforeQuitTmr(pTmrInfo, param); stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr", id, tstrerror(code)); return code; } - notRsp = taosArrayGetSize(pNotRspList); + notRsp = taosArrayGetSize(*pNotRspList); if (notRsp == 0) { streamClearChkptReadyMsg(pActiveInfo); } else { - doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList); + doSendChkptReadyMsg(pTask, *pNotRspList, checkpointId, pList); } return code; @@ -1137,10 +1134,12 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) { } streamMutexLock(&pActiveInfo->lock); - code = chkptReadyMsgSendHelper(pTask, param, pNotRspList); + code = chkptReadyMsgSendHelper(pTask, param, &pNotRspList); streamMutexUnlock(&pActiveInfo->lock); if (code != TSDB_CODE_SUCCESS) { + streamCleanBeforeQuitTmr(pTmrInfo, param); + streamMetaReleaseTask(pTask->pMeta, pTask); taosArrayDestroy(pNotRspList); return; @@ -1176,7 +1175,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pList); if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { - stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d", id, num, + stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d not send chkpt-ready msg", id, num, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList)); streamMutexUnlock(&pActiveInfo->lock); return TSDB_CODE_STREAM_INTERNAL_ERROR; @@ -1200,7 +1199,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id); } } else { - stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id); + stError("s-task:%s failed to prepare the checkpoint-ready msg, try next time in 10s", id); } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 85f287f301..89cb4153fe 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -915,8 +915,7 @@ int32_t streamResumeTask(SStreamTask* pTask) { while (1) { code = doStreamExecTask(pTask); if (code) { - stError("s-task:%s failed to exec stream task, code:%s", id, tstrerror(code)); - return code; + stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code)); } // check if continue streamMutexLock(&pTask->lock);