From ade444b6908e986072ef81e54950200f57d80e0d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 14:45:54 +0800 Subject: [PATCH] fix(stream): check return value. --- source/libs/stream/src/streamCheckStatus.c | 39 ++++++- source/libs/stream/src/streamData.c | 4 +- source/libs/stream/src/streamDispatch.c | 108 +++++++++++++++++--- source/libs/stream/src/streamMsg.c | 24 ++++- source/libs/stream/src/streamSessionState.c | 8 +- source/libs/stream/src/streamSnapshot.c | 4 + source/libs/stream/src/streamTask.c | 11 ++ source/libs/stream/src/streamTaskSm.c | 11 ++ 8 files changed, 180 insertions(+), 29 deletions(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index c9ba6ffcfe..9a2323582c 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -133,6 +133,9 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) { for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + if (pVgInfo == NULL) { + continue; + } setCheckDownstreamReqInfo(&req, tGenIdPI64(), pVgInfo->taskId, pVgInfo->vgId); streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr); @@ -370,6 +373,10 @@ void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { bool existed = false; for (int i = 0; i < num; ++i) { SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i); + if (p == NULL) { + continue; + } + if (p->nodeId == nodeId) { existed = true; break; @@ -412,6 +419,10 @@ void findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatus *pStatusInfo = NULL; for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) { SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j); + if (p == NULL) { + continue; + } + if (p->taskId == taskId) { *pStatusInfo = p; } @@ -546,6 +557,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + if (pVgInfo == NULL) { + continue; + } if (p->taskId == pVgInfo->taskId) { setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId); @@ -566,6 +580,10 @@ void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, i int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); + if (p == NULL) { + continue; + } + if (p->status == TASK_DOWNSTREAM_READY) { (*numOfReady) += 1; } else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) { @@ -603,8 +621,12 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { pInfo->timeoutStartTs = taosGetTimestampMs(); for (int32_t i = 0; i < numOfTimeout; ++i) { - int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); + int32_t* px = taosArrayGet(pTimeoutList, i); + if (px == NULL) { + continue; + } + int32_t taskId = *px; SDownstreamStatusInfo* p = NULL; findCheckRspStatus(pInfo, taskId, &p); if (p != NULL) { @@ -620,9 +642,13 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { pInfo->timeoutRetryCount = 0; for (int32_t i = 0; i < numOfTimeout; ++i) { - int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); + int32_t* pTaskId = taosArrayGet(pTimeoutList, i); + if (pTaskId == NULL) { + continue; + } + SDownstreamStatusInfo* p = NULL; - findCheckRspStatus(pInfo, taskId, &p); + findCheckRspStatus(pInfo, *pTaskId, &p); if (p != NULL) { addIntoNodeUpdateList(pTask, p->vgId); stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list", @@ -647,10 +673,13 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { // reset the info, and send the check msg to failure downstream again for (int32_t i = 0; i < numOfNotReady; ++i) { - int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i); + int32_t* pTaskId = taosArrayGet(pNotReadyList, i); + if (pTaskId == NULL) { + continue; + } SDownstreamStatusInfo* p = NULL; - findCheckRspStatus(pInfo, taskId, &p); + findCheckRspStatus(pInfo, *pTaskId, &p); if (p != NULL) { p->rspTs = 0; p->status = -1; diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index eb846b5a92..0602bf9334 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -36,8 +36,8 @@ int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen))); for (int32_t i = 0; i < blockNum; i++) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pReq->data, i); - SSDataBlock* pDataBlock = taosArrayGet(pArray, i); - if (pDataBlock == NULL) { + SSDataBlock* pDataBlock = taosArrayGet(pArray, i); + if (pDataBlock == NULL || pRetrieve == NULL) { return terrno; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d245548ce5..dfd9d3e8df 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -268,6 +268,10 @@ static SStreamDispatchReq* createDispatchDataReq(SStreamTask* pTask, const SStre for (int32_t i = 0; i < numOfVgroups; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + if (pVgInfo == NULL) { + continue; + } + code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type); if (code != TSDB_CODE_SUCCESS) { destroyDispatchMsg(pReqs, numOfVgroups); @@ -300,6 +304,10 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD if (pData->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { SSDataBlock* p = taosArrayGet(pData->blocks, 0); + if (p == NULL) { + return terrno; + } + pTask->msgInfo.checkpointId = p->info.version; pTask->msgInfo.transId = p->info.window.ekey; } @@ -313,6 +321,11 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); + if (pDataBlock == NULL) { + destroyDispatchMsg(pReqs, 1); + return terrno; + } + code = streamAddBlockIntoDispatchMsg(pDataBlock, pReqs); if (code != TSDB_CODE_SUCCESS) { destroyDispatchMsg(pReqs, 1); @@ -328,6 +341,10 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); + if (pDataBlock == NULL) { + destroyDispatchMsg(pReqs, numOfVgroups); + return terrno; + } // TODO: do not use broadcast if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || @@ -342,6 +359,10 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD // it's a new vnode to receive dispatch msg, so add one if (pReqs[j].blockNum == 0) { SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); + if (pDstVgroupInfo == NULL) { + destroyDispatchMsg(pReqs, numOfVgroups); + return terrno; + } addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, true); } @@ -399,6 +420,11 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch for (int32_t i = 0; i < numOfVgroups; i++) { if (pDispatchMsg[i].blockNum > 0) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + if (pVgInfo == NULL) { + code = terrno; + break; + } + stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", id, pTask->info.selfChildId, pDispatchMsg[i].blockNum, pVgInfo->vgId); @@ -457,6 +483,10 @@ static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int setResendInfo(pEntry, now); for (int32_t j = 0; j < numOfVgroups; ++j) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); + if (pVgInfo == NULL) { + continue; + } + if (pVgInfo->vgId == pEntry->nodeId) { int32_t code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet); stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s", @@ -521,6 +551,10 @@ static void doMonitorDispatchData(void* param, void* tmrId) { int32_t numOfRetry = 0; for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) { SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i); + if (pEntry == NULL) { + continue; + } + if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) { continue; } @@ -553,14 +587,17 @@ static void doMonitorDispatchData(void* param, void* tmrId) { SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - ASSERT(taosArrayGetSize(pTask->msgInfo.pSendInfo) == 1); + int32_t s = taosArrayGetSize(pTask->msgInfo.pSendInfo); SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0); + if (pEntry != NULL) { + setResendInfo(pEntry, now); + code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet); - setResendInfo(pEntry, now); - code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet); - - stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id, - pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code)); + stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id, + pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code)); + } else { + stError("s-task:%s invalid index 0, size:%d", id, s); + } } } @@ -637,6 +674,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S for (int32_t j = 0; j < numOfVgroups; j++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); + if (pVgInfo == NULL) { + continue; + } if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { @@ -646,7 +686,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S if (pReqs[j].blockNum == 0) { SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); - addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false); + if (pDstVgroupInfo != NULL) { + addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false); + } } pReqs[j].blockNum++; @@ -832,6 +874,10 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { for (int32_t i = 0; i < num; ++i) { STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i); + if (pInfo == NULL) { + continue; + } + if (pInfo->sendCompleted == 1) { continue; } @@ -846,11 +892,18 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { int32_t notRsp = taosArrayGetSize(pNotRspList); if (notRsp > 0) { // send checkpoint-ready msg again for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) { - int32_t taskId = *(int32_t*)taosArrayGet(pNotRspList, i); + int32_t* pTaskId = taosArrayGet(pNotRspList, i); + if (pTaskId == NULL) { + continue; + } for (int32_t j = 0; j < num; ++j) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pList, j); - if (taskId == pReadyInfo->upstreamTaskId) { // send msg again + if (pReadyInfo == NULL) { + continue; + } + + if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again SRpcMsg msg = {0}; int32_t code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId, pReadyInfo->childId, @@ -902,6 +955,9 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { for (int32_t i = 0; i < num; ++i) { STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i); + if (pInfo == NULL) { + continue; + } SRpcMsg msg = {0}; int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, @@ -945,11 +1001,14 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { streamMutexLock(&pTask->chkInfo.pActiveInfo->lock); if (taosArrayGetSize(pList) == 1) { STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, 0); - tmsgSendRsp(&pInfo->msg); - - taosArrayClear(pList); - stDebug("s-task:%s level:%d checkpoint-source rsp completed msg sent to mnode", pTask->id.idStr, - pTask->info.taskLevel); + if (pInfo != NULL) { + tmsgSendRsp(&pInfo->msg); + taosArrayClear(pList); + stDebug("s-task:%s level:%d checkpoint-source rsp completed msg sent to mnode", pTask->id.idStr, + pTask->info.taskLevel); + } else { + // todo + } } else { stDebug("s-task:%s level:%d already send checkpoint-source rsp success to mnode", pTask->id.idStr, pTask->info.taskLevel); @@ -1097,6 +1156,10 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa ASSERT(size == 1); STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0); + if (pReady == NULL) { + return terrno; + } + if (pReady->transId == pReq->transId) { stWarn("s-task:%s repeatly recv checkpoint source msg from mnode, checkpointId:%" PRId64 ", ignore", pTask->id.idStr, pReq->checkpointId); @@ -1104,7 +1167,6 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa stError("s-task:%s checkpointId:%" PRId64 " transId:%d not completed, new transId:%d checkpointId:%" PRId64 " recv from mnode", pTask->id.idStr, pReady->checkpointId, pReady->transId, pReq->transId, pReq->checkpointId); - ASSERT(0); // failed to handle it } } else { (void) taosArrayPush(pActiveInfo->pReadyMsgList, &info); @@ -1168,7 +1230,9 @@ void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) { for (int i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); i++) { STaskCheckpointReadyInfo* pInfo = taosArrayGet(pActiveInfo->pReadyMsgList, i); - rpcFreeCont(pInfo->msg.pCont); + if (pInfo != NULL) { + rpcFreeCont(pInfo->msg.pCont); + } } taosArrayClear(pActiveInfo->pReadyMsgList); @@ -1215,6 +1279,10 @@ static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t for(int32_t i = 0; i < numOfDispatchBranch; ++i) { SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i); + if (pEntry == NULL) { + continue; + } + if (pEntry->rspTs != -1) { numOfRsp += 1; } @@ -1222,6 +1290,10 @@ static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t for (int32_t j = 0; j < numOfDispatchBranch; ++j) { SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j); + if (pEntry == NULL) { + continue; + } + if (pEntry->nodeId == vgId) { ASSERT(!alreadySet); pEntry->rspTs = now; @@ -1254,6 +1326,10 @@ int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now) { for (int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) { SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j); + if (pEntry == NULL) { + continue; + } + if (pEntry->status != TSDB_CODE_SUCCESS || isDispatchRspTimeout(pEntry, now)) { numOfFailed += 1; } diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index bc0faacb32..1a48b594ea 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -87,6 +87,10 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda for (int32_t i = 0; i < size; ++i) { SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i); + if (pInfo == NULL) { + return terrno; + } + if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1; @@ -228,10 +232,14 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); for (int32_t i = 0; i < pReq->blockNum; i++) { - int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i); - void* data = taosArrayGetP(pReq->data, i); - if (tEncodeI32(pEncoder, len) < 0) return -1; - if (tEncodeBinary(pEncoder, data, len) < 0) return -1; + int32_t* pLen = taosArrayGet(pReq->dataLen, i); + void* data = taosArrayGetP(pReq->data, i); + if (data == NULL || pLen == NULL) { + return terrno; + } + + if (tEncodeI32(pEncoder, *pLen) < 0) return -1; + if (tEncodeBinary(pEncoder, data, *pLen) < 0) return -1; } tEndEncode(pEncoder); return pEncoder->pos; @@ -341,6 +349,10 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { for (int32_t i = 0; i < pReq->numOfTasks; ++i) { STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i); + if (ps == NULL) { + return terrno; + } + if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1; if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, ps->status) < 0) return -1; @@ -378,6 +390,10 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { for (int j = 0; j < numOfVgs; ++j) { int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j); + if (pVgId == NULL) { + return terrno; + } + if (tEncodeI32(pEncoder, *pVgId) < 0) return -1; } diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 66915b7cfa..50a587e353 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -64,8 +64,12 @@ int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_ int64_t getSessionWindowEndkey(void* data, int32_t index) { SArray* pWinInfos = (SArray*)data; SRowBuffPos** ppos = taosArrayGet(pWinInfos, index); - SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey); - return pWin->win.ekey; + if (ppos != NULL) { + SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey); + return pWin->win.ekey; + } else { + return 0; + } } bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) { diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 3c27210a23..9122af0e12 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -399,6 +399,10 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) { // unite read/write snap file for (int32_t i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) { SBackendFileItem* pItem = taosArrayGet(pSnap->pFileList, i); + if (pItem == NULL) { + continue; + } + if (pItem->ref == 0) { taosMemoryFree(pItem->name); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c5b1284560..59ce9e8d42 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -555,6 +555,9 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) { SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); + if (pVgInfo == NULL) { + continue; + } if (pVgInfo->vgId == nodeId) { bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet); @@ -636,6 +639,10 @@ bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { bool updated = false; for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i); + if (pInfo == NULL) { + continue; + } + int32_t code = doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp, &updated); if (code) { stError("s-task:0x%x failed to update the task nodeEp epset, code:%s", pTask->id.taskId, tstrerror(code)); @@ -1013,6 +1020,10 @@ SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) { SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { SVgroupInfo* pVgInfo = taosArrayGet(pList, i); + if (pVgInfo == NULL) { + continue; + } + if (pVgInfo->taskId == taskId) { return &pVgInfo->epSet; } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 04969c2b48..a54c17df03 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -164,6 +164,10 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream int32_t numOfTrans = taosArrayGetSize(streamTaskSMTrans); for (int32_t i = 0; i < numOfTrans; ++i) { STaskStateTrans* pTrans = taosArrayGet(streamTaskSMTrans, i); + if (pTrans == NULL) { + continue; + } + if (pTrans->state.state == state && pTrans->event == event) { return pTrans; } @@ -187,6 +191,9 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName, ASSERT(taosArrayGetSize(pSM->pWaitingEventList) == 1); SFutureHandleEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0); + if (pEvtInfo == NULL) { + return terrno; + } // OK, let's handle the waiting event, since the task has reached the required status now if (pSM->current.state == pEvtInfo->status) { @@ -227,6 +234,10 @@ static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent eve int32_t num = taosArrayGetSize(pSM->pWaitingEventList); for (int32_t i = 0; i < num; ++i) { SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i); + if (pInfo == NULL) { + continue; + } + if (pInfo->event == event) { taosArrayRemove(pSM->pWaitingEventList, i); stDebug("s-task:%s %s event in waiting list not be handled yet, remove it from waiting list, remaining events:%d",