From 3772f25840e252047471c0838f5b9713c41cc220 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 18:02:32 +0800 Subject: [PATCH 1/3] fix(stream): check return value. --- source/common/src/tdatablock.c | 1 + source/libs/executor/inc/executil.h | 2 +- source/libs/executor/src/executil.c | 5 ++++- source/libs/executor/src/groupoperator.c | 2 +- source/libs/executor/src/mergeoperator.c | 6 ++++++ source/libs/executor/src/querytask.c | 20 ++++++++++++++++++- source/libs/executor/src/tsort.c | 25 +++++++++++++++--------- source/util/src/tarray.c | 5 +++-- source/util/src/tlosertree.c | 2 +- 9 files changed, 52 insertions(+), 16 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b489314e21..f50a624ea7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1620,6 +1620,7 @@ void blockDataFreeRes(SSDataBlock* pBlock) { if (pBlock == NULL){ return; } + int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 2adc863baf..6c4fd43aa1 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -26,7 +26,7 @@ #define T_LONG_JMP(_obj, _c) \ do { \ - ASSERT((_c) != 1); \ + ASSERT((_c) != 1); \ longjmp((_obj), (_c)); \ } while (0) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 0ca9908fb1..801bdcb75a 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1692,8 +1692,11 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { SDataType* pType = &pColNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName); + pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType); + QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno); + pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; } else if (type == QUERY_NODE_VALUE) { pExp->pExpr->nodeType = QUERY_NODE_VALUE; @@ -1761,6 +1764,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN; pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType); + QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno); } else if (p1->type == QUERY_NODE_VALUE) { SValueNode* pvn = (SValueNode*)p1; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; @@ -2437,7 +2441,6 @@ void tableListDestroy(STableListInfo* pTableListInfo) { } taosArrayDestroy(pTableListInfo->pTableList); - pTableListInfo->pTableList = NULL; taosMemoryFreeClear(pTableListInfo->groupOffset); taosHashCleanup(pTableListInfo->map); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 502f9ff0f7..f48571c555 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1139,8 +1139,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; - code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys); diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 3b390c8719..8bc7c2db50 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -594,9 +594,11 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo; initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc); + TSDB_CHECK_NULL(pInputBlock, code, lino, _error, terrno); initResultSizeInfo(&pOperator->resultInfo, 1024); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -620,6 +622,8 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS case MERGE_TYPE_NON_SORT: { SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo; pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); + initResultSizeInfo(&pOperator->resultInfo, 1024); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); TSDB_CHECK_CODE(code, lino, _error); @@ -629,6 +633,8 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS case MERGE_TYPE_COLUMNS: { SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo; pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); + initResultSizeInfo(&pOperator->resultInfo, 1); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); TSDB_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 9f4d9c4405..6f0bc40d29 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -64,6 +64,10 @@ int32_t doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC p->id.queryId = queryId; p->id.taskId = taskId; p->id.str = taosMemoryMalloc(64); + if (p->id.str == NULL) { + return terrno; + } + buildTaskId(taskId, queryId, p->id.str); p->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo)); if (p->id.str == NULL || p->schemaInfos == NULL) { @@ -174,9 +178,16 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); } + if (schemaInfo.sw == NULL) { + return terrno; + } + pAPI->metaReaderFn.clearReader(&mr); schemaInfo.qsw = extractQueriedColumnSchema(pScanNode); - + if (schemaInfo.qsw == NULL) { + return terrno; + } + void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo); return (p != NULL)? TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY; } @@ -186,7 +197,14 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols); SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pqSw == NULL) { + return NULL; + } + pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema)); + if (pqSw->pSchema == NULL) { + return NULL; + } for (int32_t i = 0; i < numOfCols; ++i) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 896b4db7cd..8181ad3069 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -182,8 +182,8 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) { } int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) { + *pBlock = NULL; if (pSortHandle->pDataBlock == NULL) { - *pBlock = NULL; return TSDB_CODE_SUCCESS; } return createOneDataBlock(pSortHandle->pDataBlock, false, pBlock); @@ -2478,7 +2478,9 @@ static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) { return code; } - return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); + code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); + ASSERT(code != 1); + return code; } void tsortClose(SSortHandle* pHandle) { @@ -2808,19 +2810,24 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle } int32_t tsortOpen(SSortHandle* pHandle) { + int32_t code = 0; if (pHandle->opened) { - return 0; + return code; } - if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { - return TSDB_CODE_INVALID_PARA; + if (pHandle == NULL || pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { + code = TSDB_CODE_INVALID_PARA; + return code; } pHandle->opened = true; - if (tsortIsPQSortApplicable(pHandle)) - return tsortOpenForPQSort(pHandle); - else - return tsortOpenForBufMergeSort(pHandle); + if (tsortIsPQSortApplicable(pHandle)) { + code = tsortOpenForPQSort(pHandle); + } else { + code = tsortOpenForBufMergeSort(pHandle); + } + + return code; } int32_t tsortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) { diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index e0d38df5c8..37667e2975 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -95,11 +95,12 @@ int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) { tsize = (newSize == tsize) ? (tsize + 2) : newSize; } - pArray->pData = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize); - if (pArray->pData == NULL) { + char* p = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize); + if (p == NULL) { return terrno; } + pArray->pData = p; pArray->capacity = tsize; } return 0; diff --git a/source/util/src/tlosertree.c b/source/util/src/tlosertree.c index 0e24d54565..7973a84593 100644 --- a/source/util/src/tlosertree.c +++ b/source/util/src/tlosertree.c @@ -39,7 +39,7 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources, (SMultiwayMergeTreeInfo*)taosMemoryCalloc(1, sizeof(SMultiwayMergeTreeInfo) + sizeof(STreeNode) * totalEntries); if (pTreeInfo == NULL) { uError("allocate memory for loser-tree failed. reason:%s", strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + return terrno; } pTreeInfo->pNode = (STreeNode*)(((char*)pTreeInfo) + sizeof(SMultiwayMergeTreeInfo)); From 6297d238e0abfc3fb84b83c8865290593b22ea7f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 18:34:56 +0800 Subject: [PATCH 2/3] fix: check return values. --- source/libs/executor/inc/executil.h | 2 +- source/libs/executor/src/executil.c | 9 ++++--- source/libs/executor/src/operator.c | 21 ++++++++++++++++ source/libs/executor/src/tsort.c | 3 +-- source/libs/stream/src/streamCheckStatus.c | 7 ------ source/libs/stream/src/streamCheckpoint.c | 28 ++++++++++++++-------- source/util/src/tpagedbuf.c | 10 +++++--- 7 files changed, 54 insertions(+), 26 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 6c4fd43aa1..a11d07c518 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -26,7 +26,7 @@ #define T_LONG_JMP(_obj, _c) \ do { \ - ASSERT((_c) != 1); \ + ASSERT((_c) != -1); \ longjmp((_obj), (_c)); \ } while (0) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 801bdcb75a..fab91e9856 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1434,7 +1434,12 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList pNode.suid = suid; pNode.uid = suid; pNode.tableType = TSDB_SUPER_TABLE; + STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + return terrno; + } + uint8_t digest[17] = {0}; int code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL, @@ -2411,11 +2416,10 @@ bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList- STableListInfo* tableListCreate() { STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo)); if (pListInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pListInfo->remainGroups = NULL; + pListInfo->remainGroups = NULL; pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); if (pListInfo->pTableList == NULL) { goto _error; @@ -2431,7 +2435,6 @@ STableListInfo* tableListCreate() { _error: tableListDestroy(pListInfo); - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 701ed0ddbc..c204494710 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -295,6 +295,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + pTaskInfo->code = terrno; + return terrno; + } code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo); if (code) { @@ -362,6 +366,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL){ + pTaskInfo->code = terrno; + return terrno; + } if (pHandle->vnode) { code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, @@ -385,6 +393,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + pTaskInfo->code = terrno; + return terrno; + } + if (!pTagScanPhyNode->onlyMetaCtbIdx) { code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); @@ -398,6 +411,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + pTaskInfo->code = terrno; + return terrno; + } if (pBlockNode->tableType == TSDB_SUPER_TABLE) { SArray* pList = taosArrayInit(4, sizeof(uint64_t)); @@ -436,6 +453,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + pTaskInfo->code = terrno; + return terrno; + } code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 8181ad3069..300d7576d9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -2452,7 +2452,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { return code; } -static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) { +static int32_t tsortOpenForBufMergeSort(SSortHandle* pHandle) { int32_t code = createInitialSources(pHandle); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2479,7 +2479,6 @@ static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) { } code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); - ASSERT(code != 1); return code; } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 9a2323582c..2de86b8794 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -74,13 +74,6 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ } if (pInfo->stage != stage) { - streamMutexLock(&pTask->lock); - ETaskStatus status = streamTaskGetStatus(pTask).state; - if (status == TASK_STATUS__CK) { - streamTaskSetFailedCheckpointId(pTask); - } - streamMutexUnlock(&pTask->lock); - return TASK_UPSTREAM_NEW_STAGE; } else if (pTask->status.downstreamReady != 1) { stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER)); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index acac5dfc9e..7c41c380a6 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -453,8 +453,8 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { - streamMutexUnlock(&pInfo->lock); - return TSDB_CODE_INVALID_PARA; + stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", i); + continue; } if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) { @@ -468,8 +468,8 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { - streamMutexUnlock(&pInfo->lock); - return TSDB_CODE_INVALID_PARA; + stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", i); + continue; } if (pReadyInfo->sendCompleted == 1) { @@ -601,9 +601,15 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV } void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { - pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId; - stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, - pTask->chkInfo.pActiveInfo->activeId, pTask->chkInfo.pActiveInfo->transId); + struct SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + + if (pInfo->activeId <= 0) { + stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr); + } else { + pInfo->failedId = pInfo->activeId; + stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, pInfo->activeId, + pInfo->transId); + } } static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { @@ -960,6 +966,7 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) const char* id = pTask->id.idStr; SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; SStreamTaskState pStatus = streamTaskGetStatus(pTask); + bool alreadySend = false; if (pStatus.state != TASK_STATUS__CK) { return false; @@ -971,11 +978,12 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) return false; } - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { + int32_t num = taosArrayGetSize(pInfo->pDispatchTriggerList); + for (int32_t i = 0; i < num; ++i) { STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i); if (pSendInfo == NULL) { - streamMutexUnlock(&pInfo->lock); - return TSDB_CODE_INVALID_PARA; + stError("s-task:%s invalid index in dispatch-trigger list, index:%d, size:%d, ignore and continue", id, i, num); + continue; } if (pSendInfo->nodeId != downstreamNodeId) { diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 76cf067842..539959126c 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -352,9 +352,8 @@ static SPageInfo* getPageInfoFromPayload(void* page) { int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id, const char* dir) { - *pBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf)); - - SDiskbasedBuf* pPBuf = *pBuf; + *pBuf = NULL; + SDiskbasedBuf* pPBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf)); if (pPBuf == NULL) { goto _error; } @@ -394,11 +393,16 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem pPBuf->prefix = (char*)dir; pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); + if (pPBuf->emptyDummyIdList == NULL) { + goto _error; + } // qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, // pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path); + *pBuf = pPBuf; return TSDB_CODE_SUCCESS; + _error: destroyDiskbasedBuf(pPBuf); return TSDB_CODE_OUT_OF_MEMORY; From 6645192ff30ce66887f9dbb3af6b25fc253ce2e3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 19:12:42 +0800 Subject: [PATCH 3/3] fix(stream): fix syntax error. --- source/libs/executor/src/executil.c | 4 +++- source/libs/stream/src/streamCheckpoint.c | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index fab91e9856..ec144596fd 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2333,7 +2333,9 @@ void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psu uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); - ASSERT(pTableList->map != NULL && slot != NULL); + if (slot == NULL) { + return -1; + } STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot); ASSERT(pKeyInfo->uid == tableUid); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7c41c380a6..9da7a5d9c8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -453,7 +453,8 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { - stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", i); + stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", + pTask->id.idStr, i); continue; } @@ -468,7 +469,8 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { - stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", i); + stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", + pTask->id.idStr, i); continue; }