fix(stream): check the error and continue computing; fix a memory leak.

This commit is contained in:
Haojun Liao 2024-12-26 10:42:34 +08:00
parent ac6c6278b7
commit 44db20492c
2 changed files with 12 additions and 14 deletions

View File

@@ -954,7 +954,6 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64
", quit", ", quit",
id, vgId, pTmrInfo->launchChkptId); id, vgId, pTmrInfo->launchChkptId);
@@ -963,13 +962,11 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t n
// active checkpoint info is cleared for now // active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) { if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr", id, vgId); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr", id, vgId);
return -1; return -1;
} }
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id, stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id,
vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num); vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num);
return -1; return -1;
@@ -998,6 +995,7 @@ static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, in
void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId); void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId);
if (p == NULL) { if (p == NULL) {
stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId); stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId);
taosArrayDestroy(pTmp);
return terrno; return terrno;
} else { } else {
stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, level, stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, level,
@@ -1047,13 +1045,13 @@ static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t
} }
} }
static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray* pNotRspList) { static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray** pNotRspList) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
SArray* pList = pActiveInfo->pReadyMsgList; SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList); int32_t num = taosArrayGetSize(pList);
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
int32_t checkpointId = pActiveInfo->activeId; int64_t checkpointId = pActiveInfo->activeId;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t notRsp = 0; int32_t notRsp = 0;
@@ -1062,18 +1060,17 @@ static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray*
return code; return code;
} }
code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); code = doFindNotConfirmUpstream(pNotRspList, pList, num, vgId, pTask->info.taskLevel, id);
if (code) { if (code) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr", id, tstrerror(code)); stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr", id, tstrerror(code));
return code; return code;
} }
notRsp = taosArrayGetSize(pNotRspList); notRsp = taosArrayGetSize(*pNotRspList);
if (notRsp == 0) { if (notRsp == 0) {
streamClearChkptReadyMsg(pActiveInfo); streamClearChkptReadyMsg(pActiveInfo);
} else { } else {
doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList); doSendChkptReadyMsg(pTask, *pNotRspList, checkpointId, pList);
} }
return code; return code;
@@ -1137,10 +1134,12 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) {
} }
streamMutexLock(&pActiveInfo->lock); streamMutexLock(&pActiveInfo->lock);
code = chkptReadyMsgSendHelper(pTask, param, pNotRspList); code = chkptReadyMsgSendHelper(pTask, param, &pNotRspList);
streamMutexUnlock(&pActiveInfo->lock); streamMutexUnlock(&pActiveInfo->lock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
streamCleanBeforeQuitTmr(pTmrInfo, param);
streamMetaReleaseTask(pTask->pMeta, pTask); streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotRspList); taosArrayDestroy(pNotRspList);
return; return;
@@ -1176,7 +1175,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pList); int32_t num = taosArrayGetSize(pList);
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d", id, num, stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d not send chkpt-ready msg", id, num,
(int32_t)taosArrayGetSize(pTask->upstreamInfo.pList)); (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList));
streamMutexUnlock(&pActiveInfo->lock); streamMutexUnlock(&pActiveInfo->lock);
return TSDB_CODE_STREAM_INTERNAL_ERROR; return TSDB_CODE_STREAM_INTERNAL_ERROR;
@@ -1200,7 +1199,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id); stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id);
} }
} else { } else {
stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id); stError("s-task:%s failed to prepare the checkpoint-ready msg, try next time in 10s", id);
} }
} }

View File

@@ -915,8 +915,7 @@ int32_t streamResumeTask(SStreamTask* pTask) {
while (1) { while (1) {
code = doStreamExecTask(pTask); code = doStreamExecTask(pTask);
if (code) { if (code) {
stError("s-task:%s failed to exec stream task, code:%s", id, tstrerror(code)); stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code));
return code;
} }
// check if continue // check if continue
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);