From 6297d238e0abfc3fb84b83c8865290593b22ea7f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 18:34:56 +0800 Subject: [PATCH] fix: check return values. --- source/libs/executor/inc/executil.h | 2 +- source/libs/executor/src/executil.c | 9 ++++--- source/libs/executor/src/operator.c | 21 ++++++++++++++++ source/libs/executor/src/tsort.c | 3 +-- source/libs/stream/src/streamCheckStatus.c | 7 ------ source/libs/stream/src/streamCheckpoint.c | 28 ++++++++++++++-------- source/util/src/tpagedbuf.c | 10 +++++--- 7 files changed, 54 insertions(+), 26 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 6c4fd43aa1..a11d07c518 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -26,7 +26,7 @@ #define T_LONG_JMP(_obj, _c) \ do { \ - ASSERT((_c) != 1); \ + ASSERT((_c) != -1); \ longjmp((_obj), (_c)); \ } while (0) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 801bdcb75a..fab91e9856 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1434,7 +1434,12 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList pNode.suid = suid; pNode.uid = suid; pNode.tableType = TSDB_SUPER_TABLE; + STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + return terrno; + } + uint8_t digest[17] = {0}; int code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL, @@ -2411,11 +2416,10 @@ bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList- STableListInfo* tableListCreate() { STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo)); if (pListInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pListInfo->remainGroups = NULL; + pListInfo->remainGroups = NULL; pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); if (pListInfo->pTableList == NULL) { goto _error; @@ -2431,7 +2435,6 @@ STableListInfo* tableListCreate() { _error: tableListDestroy(pListInfo); - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 701ed0ddbc..c204494710 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -295,6 +295,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + pTaskInfo->code = terrno; + return terrno; + } code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo); if (code) { @@ -362,6 +366,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL){ + pTaskInfo->code = terrno; + return terrno; + } if (pHandle->vnode) { code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, @@ -385,6 +393,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + pTaskInfo->code = terrno; + return terrno; + } + if (!pTagScanPhyNode->onlyMetaCtbIdx) { code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); @@ -398,6 +411,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + pTaskInfo->code = terrno; + return terrno; + } if (pBlockNode->tableType == TSDB_SUPER_TABLE) { SArray* pList = taosArrayInit(4, sizeof(uint64_t)); @@ -436,6 +453,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + pTaskInfo->code = terrno; + return terrno; + } code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 8181ad3069..300d7576d9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -2452,7 +2452,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { return code; } -static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) { +static int32_t tsortOpenForBufMergeSort(SSortHandle* pHandle) { int32_t code = createInitialSources(pHandle); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2479,7 +2479,6 @@ static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) { } code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); - ASSERT(code != 1); return code; } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 9a2323582c..2de86b8794 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -74,13 +74,6 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ } if (pInfo->stage != stage) { - streamMutexLock(&pTask->lock); - ETaskStatus status = streamTaskGetStatus(pTask).state; - if (status == TASK_STATUS__CK) { - streamTaskSetFailedCheckpointId(pTask); - } - streamMutexUnlock(&pTask->lock); - return TASK_UPSTREAM_NEW_STAGE; } else if (pTask->status.downstreamReady != 1) { stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER)); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index acac5dfc9e..7c41c380a6 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -453,8 +453,8 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { - streamMutexUnlock(&pInfo->lock); - return TSDB_CODE_INVALID_PARA; + stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", i); + continue; } if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) { @@ -468,8 +468,8 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { - streamMutexUnlock(&pInfo->lock); - return TSDB_CODE_INVALID_PARA; + stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", i); + continue; } if (pReadyInfo->sendCompleted == 1) { @@ -601,9 +601,15 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV } void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { - pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId; - stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, - pTask->chkInfo.pActiveInfo->activeId, pTask->chkInfo.pActiveInfo->transId); + struct SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + + if (pInfo->activeId <= 0) { + stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr); + } else { + pInfo->failedId = pInfo->activeId; + stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, pInfo->activeId, + pInfo->transId); + } } static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { @@ -960,6 +966,7 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) const char* id = pTask->id.idStr; SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; SStreamTaskState pStatus = streamTaskGetStatus(pTask); + bool alreadySend = false; if (pStatus.state != TASK_STATUS__CK) { return false; @@ -971,11 +978,12 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) return false; } - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { + int32_t num = taosArrayGetSize(pInfo->pDispatchTriggerList); + for (int32_t i = 0; i < num; ++i) { STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i); if (pSendInfo == NULL) { - streamMutexUnlock(&pInfo->lock); - return TSDB_CODE_INVALID_PARA; + stError("s-task:%s invalid index in dispatch-trigger list, index:%d, size:%d, ignore and continue", id, i, num); + continue; } if (pSendInfo->nodeId != downstreamNodeId) { diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 76cf067842..539959126c 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -352,9 +352,8 @@ static SPageInfo* getPageInfoFromPayload(void* page) { int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id, const char* dir) { - *pBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf)); - - SDiskbasedBuf* pPBuf = *pBuf; + *pBuf = NULL; + SDiskbasedBuf* pPBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf)); if (pPBuf == NULL) { goto _error; } @@ -394,11 +393,16 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem pPBuf->prefix = (char*)dir; pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); + if (pPBuf->emptyDummyIdList == NULL) { + goto _error; + } // qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, // pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path); + *pBuf = pPBuf; return TSDB_CODE_SUCCESS; + _error: destroyDiskbasedBuf(pPBuf); return TSDB_CODE_OUT_OF_MEMORY;