diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index da027d05a6..91196f31e0 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -585,12 +585,18 @@ void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, i } else { // TASK_DOWNSTREAM_NOT_READY if (p->rspTs == 0) { // not response yet if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. - (void)taosArrayPush(pTimeoutList, &p->taskId); + void* px = taosArrayPush(pTimeoutList, &p->taskId); + if (px == NULL) { + stError("s-task:%s failed to record time out task:0x%x", id, p->taskId); + } } else { // el < CHECK_NOT_RSP_DURATION (*numOfNotRsp) += 1; // do nothing and continue waiting for their rsp } } else { - (void)taosArrayPush(pNotReadyList, &p->taskId); + void* px = taosArrayPush(pNotReadyList, &p->taskId); + if (px == NULL) { + stError("s-task:%s failed to record not ready task:0x%x", id, p->taskId); + } } } } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index da882505f4..d0bf24bd03 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -973,7 +973,10 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { if (!recved) { // make sure the inputQ is opened for not recv upstream checkpoint-trigger message streamTaskOpenUpstreamInput(pTask, pInfo->taskId); - (void)taosArrayPush(pNotSendList, pInfo); + void* px = taosArrayPush(pNotSendList, pInfo); + if (px == NULL) { + stError("s-task:%s failed to record not send info, code: out of memory", id); + } } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 59210fe99f..429d78f61f 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -474,7 +474,10 @@ static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t streamMutexLock(&pMsgInfo->lock); } - (void)taosArrayPush(pMsgInfo->pSendInfo, &entry); + void* p = taosArrayPush(pMsgInfo->pSendInfo, &entry); + if (p == NULL) { + stError("failed to add dispatch info"); + } if (lock) { streamMutexUnlock(&pMsgInfo->lock); @@ -671,8 +674,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); // failed to put into name buffer, no need to do anything - if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { - (void)tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); + if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { // allow error, and do nothing + int32_t code = tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); } } @@ -914,9 +917,13 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { continue; } - (void)taosArrayPush(pNotRspList, &pInfo->upstreamTaskId); - stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, - pTask->info.taskLevel, pInfo->upstreamTaskId); + void* p = taosArrayPush(pNotRspList, &pInfo->upstreamTaskId); + if (p == NULL) { + stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId); + } else { + stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, + pTask->info.taskLevel, pInfo->upstreamTaskId); + } } int32_t checkpointId = pActiveInfo->activeId; @@ -1100,8 +1107,17 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch payloadLen += sizeof(SRetrieveTableRsp); - (void)taosArrayPush(pReq->dataLen, &payloadLen); - (void)taosArrayPush(pReq->data, &buf); + void* px = taosArrayPush(pReq->dataLen, &payloadLen); + if (px == NULL) { + taosMemoryFree(buf); + return terrno; + } + + px = taosArrayPush(pReq->data, &buf); + if (px == NULL) { + taosMemoryFree(buf); + return terrno; + } pReq->totalLen += dataStrLen; return 0; @@ -1221,8 +1237,12 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa pTask->id.idStr, pReady->checkpointId, pReady->transId, pReq->transId, pReq->checkpointId); } } else { - (void)taosArrayPush(pActiveInfo->pReadyMsgList, &info); - stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size + 1); + void* px = taosArrayPush(pActiveInfo->pReadyMsgList, &info); + if (px != NULL) { + stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size + 1); + } else { + stError("s-task:%s failed to add readyMsg, code: out of memory", pTask->id.idStr); + } } streamMutexUnlock(&pActiveInfo->lock); @@ -1259,7 +1279,12 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; streamMutexLock(&pActiveInfo->lock); - (void)taosArrayPush(pActiveInfo->pReadyMsgList, &info); + void* px = taosArrayPush(pActiveInfo->pReadyMsgList, &info); + if (px == NULL) { + streamMutexUnlock(&pActiveInfo->lock); + stError("s-task:%s failed to add readyMsg info, code: out of memory", pTask->id.idStr); + return terrno; + } int32_t numOfRecv = taosArrayGetSize(pActiveInfo->pReadyMsgList); int32_t total = streamTaskGetNumOfUpstream(pTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d6f1559578..c4a6d36edb 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -845,7 +845,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } - (void)taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); + int32_t code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); (void)streamMetaRemoveTask(pMeta, &id); @@ -1013,7 +1013,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { tFreeStreamTask(pTask); STaskId id = streamTaskGetTaskId(pTask); - (void)taosArrayPush(pRecycleList, &id); + void* px = taosArrayPush(pRecycleList, &id); + if (px == NULL) { + stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId); + } int32_t total = taosArrayGetSize(pRecycleList); stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total); @@ -1034,7 +1037,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { continue; } - (void)taosArrayPush(pMeta->pTaskList, &pTask->id); + void* px = taosArrayPush(pMeta->pTaskList, &pTask->id); + if (px == NULL) { + stFatal("s-task:0x%x failed to add into task list due to out of memory", pTask->id.taskId); + } } else { // todo this should replace the existed object put by replay creating stream task msg from mnode stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId); @@ -1044,7 +1050,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) != 0) { stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno)); - (void)taosArrayPop(pMeta->pTaskList); + void* px = taosArrayPop(pMeta->pTaskList); tFreeStreamTask(pTask); continue; } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index cbaccff01d..3deed6f9cd 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -191,7 +191,7 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName, GET_EVT_NAME(pEvtInfo->event), pSM->current.name); // remove it - (void) taosArrayPop(pSM->pWaitingEventList); + void* px = taosArrayPop(pSM->pWaitingEventList); STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event); ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL);