From eefee54c6aa64a6f871cd9a8f12d2497a7ff2626 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Jul 2024 17:31:41 +0800 Subject: [PATCH] fix(stream): check return value. --- source/libs/executor/inc/operator.h | 4 +- source/libs/executor/inc/querytask.h | 21 +++---- source/libs/executor/src/executor.c | 30 ++++++++-- source/libs/executor/src/executorInt.c | 7 ++- source/libs/executor/src/mergeoperator.c | 6 +- source/libs/executor/src/querytask.c | 65 +++++++++++++--------- source/libs/executor/src/sortoperator.c | 19 ++++--- source/libs/stream/src/streamCheckStatus.c | 7 ++- source/libs/stream/src/streamCheckpoint.c | 42 ++++++++++++++ 9 files changed, 143 insertions(+), 58 deletions(-) diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 95208545bd..37cdd5d7b3 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -140,7 +140,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle); @@ -164,7 +164,7 @@ SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo); // clang-format on diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 48de49f07c..e3bb9a1361 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -98,16 +98,17 @@ struct SExecTaskInfo { SQueryAutoQWorkerPoolCB* pWorkerCb; }; -void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); -SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI); -void doDestroyTask(SExecTaskInfo* pTaskInfo); -bool isTaskKilled(void* pTaskInfo); -void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); -void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); -int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, - int32_t vgId, char* sql, EOPTR_EXEC_MODEL model); -int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo); -SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo); +void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); +int32_t doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI, + SExecTaskInfo** pTaskInfo); +void doDestroyTask(SExecTaskInfo* pTaskInfo); +bool isTaskKilled(void* pTaskInfo); +void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); +void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); +int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, + int32_t vgId, char* sql, EOPTR_EXEC_MODEL model); +int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo); +int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 8cf44602f9..8c98df5c8d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -288,8 +288,10 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id) { if (msg == NULL) { // create raw scan - SExecTaskInfo* pTaskInfo = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api); - if (NULL == pTaskInfo) { + SExecTaskInfo* pTaskInfo = NULL; + + int32_t code = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api, &pTaskInfo); + if (NULL == pTaskInfo || code != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -1450,7 +1452,13 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SExecTaskInfo* pTaskInfo = tinfo; - SArray* plist = getTableListInfo(pTaskInfo); + SArray* plist = NULL; + + code = getTableListInfo(pTaskInfo, &plist); + if (code || plist == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } // only extract table in the first elements STableListInfo* pTableListInfo = taosArrayGetP(plist, 0); @@ -1502,11 +1510,21 @@ _end: } } -SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) { - SArray* pArray = taosArrayInit(0, POINTER_BYTES); +int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) { + if (pList == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + SArray* pArray = taosArrayInit(0, POINTER_BYTES); + if (pArray == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SOperatorInfo* pOperator = pTaskInfo->pRoot; extractTableList(pArray, pOperator); - return pArray; + + *pList = pArray; + return TSDB_CODE_SUCCESS; } int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) { diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index a3e3501114..b974384d85 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1068,7 +1068,12 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* return TSDB_CODE_OUT_OF_MEMORY; } - SArray* pInfoList = getTableListInfo(pTask); + SArray* pInfoList = NULL; + int32_t code = getTableListInfo(pTask, &pInfoList); + if (code || pInfoList == NULL) { + return code; + } + STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0); taosArrayDestroy(pInfoList); diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index cef1915d2c..384b14ce41 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -168,8 +168,10 @@ SSDataBlock* doSortMerge(SOperatorInfo* pOperator) { blockDataCleanup(pDataBlock); if (pSortMergeInfo->pIntermediateBlock == NULL) { - pSortMergeInfo->pIntermediateBlock = tsortGetSortedDataBlock(pHandle); - if (pSortMergeInfo->pIntermediateBlock == NULL) { + pSortMergeInfo->pIntermediateBlock = NULL; + + int32_t code = tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock); + if (pSortMergeInfo->pIntermediateBlock == NULL || code != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 0bf4ac2b21..ddba556c7a 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -35,38 +35,51 @@ #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) -SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI) { - SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); +int32_t doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI, + SExecTaskInfo** pTaskInfo) { if (pTaskInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_SUCCESS; } - setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - pTaskInfo->cost.created = taosGetTimestampUs(); + SExecTaskInfo* p = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } - pTaskInfo->execModel = model; - pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); - pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES); - pTaskInfo->storageAPI = *pAPI; + setTaskStatus(p, TASK_NOT_COMPLETED); + p->cost.created = taosGetTimestampUs(); - taosInitRWLatch(&pTaskInfo->lock); + p->execModel = model; + p->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); + p->pResultBlockList = taosArrayInit(128, POINTER_BYTES); + if (p->stopInfo.pStopInfo == NULL || p->pResultBlockList == NULL) { + doDestroyTask(p); + return TSDB_CODE_OUT_OF_MEMORY; + } - pTaskInfo->id.vgId = vgId; - pTaskInfo->id.queryId = queryId; - pTaskInfo->id.taskId = taskId; - pTaskInfo->id.str = taosMemoryMalloc(64); - buildTaskId(taskId, queryId, pTaskInfo->id.str); - pTaskInfo->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo)); - - return pTaskInfo; + p->storageAPI = *pAPI; + taosInitRWLatch(&p->lock); + + p->id.vgId = vgId; + p->id.queryId = queryId; + p->id.taskId = taskId; + p->id.str = taosMemoryMalloc(64); + buildTaskId(taskId, queryId, p->id.str); + p->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo)); + if (p->id.str == NULL || p->schemaInfos == NULL) { + doDestroyTask(p); + return TSDB_CODE_OUT_OF_MEMORY; + } + + *pTaskInfo = p; + return TSDB_CODE_SUCCESS; } bool isTaskKilled(void* pTaskInfo) { return (0 != ((SExecTaskInfo*)pTaskInfo)->code); } void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; - stopTableScanOperator(pTaskInfo->pRoot, pTaskInfo->id.str, &pTaskInfo->storageAPI); + (void) stopTableScanOperator(pTaskInfo->pRoot, pTaskInfo->id.str, &pTaskInfo->storageAPI); } void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { @@ -81,10 +94,10 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) { - *pTaskInfo = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api); - if (*pTaskInfo == NULL) { + int32_t code = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api, pTaskInfo); + if (*pTaskInfo == NULL || code != 0) { taosMemoryFree(sql); - return terrno; + return code; } if (pHandle) { @@ -165,12 +178,10 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo } pAPI->metaReaderFn.clearReader(&mr); - schemaInfo.qsw = extractQueriedColumnSchema(pScanNode); - taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo); - - return TSDB_CODE_SUCCESS; + void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo); + return (p != NULL)? TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY; } SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 67b993d702..9892114735 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -166,7 +166,11 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { } pBlock->info.dataLoad = 1; - pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; + + SDataBlockInfo info = {0}; + tsortGetBlockInfo(pTupleHandle, &info); + + pBlock->info.scanFlag = info.scanFlag; pBlock->info.rows += 1; } @@ -224,9 +228,9 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i SSortOperatorInfo* pInfo) { blockDataCleanup(pDataBlock); - int32_t code = 0; - SSDataBlock* p = tsortGetSortedDataBlock(pHandle); - if (p == NULL) { + SSDataBlock* p = NULL; + int32_t code = tsortGetSortedDataBlock(pHandle, &p); + if (p == NULL || (code != 0)) { return NULL; } @@ -443,8 +447,9 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo blockDataCleanup(pDataBlock); blockDataEnsureCapacity(pDataBlock, capacity); - SSDataBlock* p = tsortGetSortedDataBlock(pHandle); - if (p == NULL) { + SSDataBlock* p = NULL; + int32_t code = tsortGetSortedDataBlock(pHandle, &p); + if (p == NULL || (code != 0)) { return NULL; } @@ -452,7 +457,7 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo while (1) { STupleHandle* pTupleHandle = NULL; - int32_t code = tsortNextTuple(pHandle, &pTupleHandle); + code = tsortNextTuple(pHandle, &pTupleHandle); if (pTupleHandle == NULL || code != 0) { break; } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 66c1941abb..625305da03 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -418,13 +418,14 @@ void findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatus int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, int32_t* pNotReady, const char* id) { - streamMutexLock(&pInfo->checkInfoLock); - SDownstreamStatusInfo* p = NULL; + + streamMutexLock(&pInfo->checkInfoLock); findCheckRspStatus(pInfo, taskId, &p); if (p != NULL) { if (reqId != p->reqId) { - stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded", + stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64 + " expired check-rsp recv from downstream task:0x%x, discarded", id, reqId, p->reqId, taskId); streamMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_FAILED; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 60019977cc..4dc7b14cc3 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -176,6 +176,10 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); + if (pDataBlock == NULL) { + return TSDB_CODE_INVALID_PARA; + } + int64_t checkpointId = pDataBlock->info.version; int32_t transId = pDataBlock->info.window.skey; const char* id = pTask->id.idStr; @@ -248,6 +252,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock // check if already recv or not, and duplicated checkpoint-trigger msg recv, discard it for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i); + if (p == NULL) { + return TSDB_CODE_INVALID_PARA; + } + if (p->upstreamTaskId == pBlock->srcTaskId) { ASSERT(p->checkpointId == checkpointId); stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 @@ -381,6 +389,10 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList); for (int32_t i = 0; i < size; ++i) { STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); + if (p == NULL) { + return TSDB_CODE_INVALID_PARA; + } + if (p->downstreamTaskId == downstreamTaskId) { received = true; break; @@ -420,6 +432,10 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream streamMutexLock(&pInfo->lock); for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); + if (pReadyInfo == NULL) { + return TSDB_CODE_INVALID_PARA; + } + if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) { pReadyInfo->sendCompleted = 1; stDebug("s-task:%s send checkpoint-ready msg to upstream:0x%x confirmed, checkpointId:%" PRId64 " ts:%" PRId64, @@ -430,6 +446,10 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); + if (pReadyInfo == NULL) { + return TSDB_CODE_INVALID_PARA; + } + if (pReadyInfo->sendCompleted == 1) { numOfConfirmed += 1; } @@ -819,6 +839,10 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { bool recved = false; for (int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) { STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j); + if (pReady == NULL) { + continue; + } + if (pInfo->nodeId == pReady->upstreamNodeId) { recved = true; break; @@ -867,6 +891,9 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { for (int32_t i = 0; i < size; i++) { SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i); + if (pUpstreamTask == NULL) { + return TSDB_CODE_INVALID_PARA; + } SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq)); if (pReq == NULL) { @@ -917,6 +944,10 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i); + if (pSendInfo == NULL) { + return TSDB_CODE_INVALID_PARA; + } + if (pSendInfo->nodeId != downstreamNodeId) { continue; } @@ -974,6 +1005,9 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { } else { for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) { SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i); + if (pVgInfo == NULL) { + continue; + } STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId}; void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); @@ -993,6 +1027,10 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { streamMutexLock(&pInfo->lock); for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); + if (p == NULL) { + return num; + } + if (p->recved) { num++; } @@ -1009,6 +1047,10 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); + if (p == NULL) { + continue; + } + if (p->nodeId == vgId) { ASSERT(p->recved == false);