diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index d9e413b144..f86c08501d 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -187,6 +187,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull); STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); + if (totalTables) QUERY_CHECK_NULL(pList, code, lino, _error, terrno); uint64_t suid = tableListGetSuid(pTableListInfo); code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, @@ -439,8 +440,8 @@ void destroyCacheScanOperator(void* param) { SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param; blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pBufferedRes); - taosMemoryFree(pInfo->pSlotIds); - taosMemoryFree(pInfo->pDstSlotIds); + taosMemoryFreeClear(pInfo->pSlotIds); + taosMemoryFreeClear(pInfo->pDstSlotIds); taosArrayDestroy(pInfo->pCidList); taosArrayDestroy(pInfo->pFuncTypeList); taosArrayDestroy(pInfo->pUidList); @@ -462,13 +463,13 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask *pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t)); if (*pSlotIds == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } *pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t)); if (*pDstSlotIds == NULL) { - taosMemoryFree(*pSlotIds); - return TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(*pSlotIds); + return terrno; } SSchemaInfo* pSchemaInfo = taosArrayGetLast(pTaskInfo->schemaInfos); diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 02b4a61bcb..bb9a5a5e76 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -82,9 +82,15 @@ static int32_t setCountWindowOutputBuff(SExprSupp* pExprSup, SCountWindowSupp* p int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SCountWindowResult* pBuff = getCountWinStateInfo(pCountSup); + QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno); (*pResult) = &pBuff->row; code = setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); (*ppResBuff) = pBuff; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } return code; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 04998705a5..dc0baebd66 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1293,6 +1293,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (uid == 0) { if (numOfTables != 0) { STableKeyInfo* tmp = tableListGetInfo(pTableListInfo, 0); + if (!tmp) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + taosRUnLockLatch(&pTaskInfo->lock); + return terrno; + } if (tmp) uid = tmp->uid; ts = INT64_MIN; pScanInfo->currentTable = 0; @@ -1422,6 +1427,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); + if (!pList) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + tDeleteSchemaWrapper(mtInfo.schema); + return code; + } int32_t size = tableListGetSize(pTableListInfo); code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, @@ -1506,6 +1516,7 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) { int32_t numOfTables = tableListGetSize(pTableListInfo); for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); + QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno); void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index dc2dd84f37..8a7f1ad77d 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1107,6 +1107,11 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i); + if (!pTable) { + taosArrayDestroy(pDeleterParam->pUidList); + taosMemoryFree(pDeleterParam); + return TSDB_CODE_OUT_OF_MEMORY; + } void* tmp = taosArrayPush(pDeleterParam->pUidList, &pTable->uid); if (!tmp) { taosArrayDestroy(pDeleterParam->pUidList); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0c714a163d..56690d5190 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1261,8 +1261,14 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { (*ppRes) = NULL; return code; } - - tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); + STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); + if (!tmp) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + taosRUnLockLatch(&pTaskInfo->lock); + (*ppRes) = NULL; + return terrno; + } + tInfo = *tmp; taosRUnLockLatch(&pTaskInfo->lock); code = pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1); @@ -3517,6 +3523,7 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray** size_t size = tableListGetSize(pTableListInfo); for (int32_t i = 0; i < size; ++i) { STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i); + QUERY_CHECK_NULL(pkeyInfo, code, lino, _end, terrno); void* tmp = taosArrayPush(tableIdList, &pkeyInfo->uid); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } @@ -4120,6 +4127,14 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos); + if (!item) { + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), + GET_TASKID(pTaskInfo)); + tDecoderClear(&(*mr).coder); + pAPI->metaReaderFn.clearReader(mr); + T_LONG_JMP(pTaskInfo->env, terrno); + } + code = pAPI->metaReaderFn.getTableEntryByUid(mr, item->uid); tDecoderClear(&(*mr).coder); if (code != TSDB_CODE_SUCCESS) { @@ -4805,18 +4820,23 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu return TSDB_CODE_SUCCESS; } -static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) { +static int32_t setGroupStartEndIndex(STableMergeScanInfo* pInfo) { pInfo->bGroupProcessed = false; size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); int32_t i = pInfo->tableStartIndex + 1; for (; i < numOfTables; ++i) { STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); + if (!tableKeyInfo) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return terrno; + } if (tableKeyInfo->groupId != pInfo->groupId) { break; } } pInfo->tableEndIndex = i - 1; + return TSDB_CODE_SUCCESS; } static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { @@ -4845,7 +4865,11 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { } static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) { - setGroupStartEndIndex(pInfo); + int32_t code = setGroupStartEndIndex(pInfo); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + return code; + } STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo)); if (pSubTblsInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -4872,7 +4896,7 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) { } int32_t bufPageSize = pInfo->bufPageSize; int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize; - int32_t code = + code = createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pSubTblsInfo->aInputs); @@ -5150,7 +5174,9 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock* return code; } pInfo->tableStartIndex = 0; - pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId; + STableKeyInfo* pTmpGpId = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno); + pInfo->groupId = pTmpGpId->groupId; code = startSubTablesTableMergeScan(pOperator); QUERY_CHECK_CODE(code, lino, _end); } @@ -5164,6 +5190,7 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock* pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity); if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) { STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno); pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo); } if (pBlock != NULL) { @@ -5180,7 +5207,10 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock* } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId; + STableKeyInfo* pTmpGpId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno); + + pInfo->groupId = pTmpGpId->groupId; code = startSubTablesTableMergeScan(pOperator); QUERY_CHECK_CODE(code, lino, _end); resetLimitInfoForNextGroup(&pInfo->limitInfo); @@ -5510,6 +5540,7 @@ void startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t i = pInfo->tableStartIndex + 1; for (; i < numOfTables; ++i) { STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); + QUERY_CHECK_NULL(tableKeyInfo, code, lino, _end, terrno); if (tableKeyInfo->groupId != pInfo->groupId) { break; } @@ -5635,7 +5666,9 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { return code; } pInfo->tableStartIndex = 0; - pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId; + STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + pInfo->groupId = tmp->groupId; startGroupTableMergeScan(pOperator); } @@ -5649,6 +5682,7 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pOperator); if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) { STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno); pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo); } if (pBlock != NULL) { @@ -5670,7 +5704,9 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId; + STableKeyInfo* tmp = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + pInfo->groupId = tmp->groupId; startGroupTableMergeScan(pOperator); resetLimitInfoForNextGroup(&pInfo->limitInfo); }