From 1edcd654d401598e258a80544e78cbe381c79864 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 6 Aug 2024 09:24:31 +0800 Subject: [PATCH] adj operator res --- source/libs/executor/src/cachescanoperator.c | 11 ++-- .../libs/executor/src/countwindowoperator.c | 6 ++ source/libs/executor/src/executor.c | 11 ++++ source/libs/executor/src/executorInt.c | 5 ++ source/libs/executor/src/scanoperator.c | 56 ++++++++++++++++--- source/libs/executor/src/sysscanoperator.c | 1 + 6 files changed, 76 insertions(+), 14 deletions(-) diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 73f770a599..6d708f2a4f 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -173,6 +173,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull); STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); + QUERY_CHECK_NULL(pList, code, lino, _error, terrno); uint64_t suid = tableListGetSuid(pTableListInfo); code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, @@ -413,8 +414,8 @@ void destroyCacheScanOperator(void* param) { SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param; blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pBufferedRes); - taosMemoryFree(pInfo->pSlotIds); - taosMemoryFree(pInfo->pDstSlotIds); + taosMemoryFreeClear(pInfo->pSlotIds); + taosMemoryFreeClear(pInfo->pDstSlotIds); taosArrayDestroy(pInfo->pCidList); taosArrayDestroy(pInfo->pFuncTypeList); taosArrayDestroy(pInfo->pUidList); @@ -436,13 +437,13 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask *pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t)); if (*pSlotIds == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } *pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t)); if (*pDstSlotIds == NULL) { - taosMemoryFree(*pSlotIds); - return TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(*pSlotIds); + return terrno; } SSchemaInfo* pSchemaInfo = taosArrayGetLast(pTaskInfo->schemaInfos); diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index e382bbc9ed..6affe3ea75 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -79,9 +79,15 @@ static int32_t setCountWindowOutputBuff(SExprSupp* pExprSup, SCountWindowSupp* p int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SCountWindowResult* pBuff = getCountWinStateInfo(pCountSup); + QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno); (*pResult) = &pBuff->row; code = setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); (*ppResBuff) = pBuff; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } return code; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ef76f14aa9..5b95c073f9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1274,6 +1274,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (uid == 0) { if (numOfTables != 0) { STableKeyInfo* tmp = tableListGetInfo(pTableListInfo, 0); + if (!tmp) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + taosRUnLockLatch(&pTaskInfo->lock); + return terrno; + } if (tmp) uid = tmp->uid; ts = INT64_MIN; pScanInfo->currentTable = 0; @@ -1403,6 +1408,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); + if (!pList) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + tDeleteSchemaWrapper(mtInfo.schema); + return code; + } int32_t size = tableListGetSize(pTableListInfo); code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, @@ -1487,6 +1497,7 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) { int32_t numOfTables = tableListGetSize(pTableListInfo); for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); + QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno); void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index e339ca5d29..0fb70e52f4 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1100,6 +1100,11 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i); + if (!pTable) { + taosArrayDestroy(pDeleterParam->pUidList); + taosMemoryFree(pDeleterParam); + return TSDB_CODE_OUT_OF_MEMORY; + } void* tmp = taosArrayPush(pDeleterParam->pUidList, &pTable->uid); if (!tmp) { taosArrayDestroy(pDeleterParam->pUidList); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 37c5bc1aeb..dbe92c1082 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1257,8 +1257,14 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { (*ppRes) = NULL; return code; } - - tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); + STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); + if (!tmp) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + taosRUnLockLatch(&pTaskInfo->lock); + (*ppRes) = NULL; + return terrno; + } + tInfo = *tmp; taosRUnLockLatch(&pTaskInfo->lock); code = pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1); @@ -3507,6 +3513,7 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray** size_t size = tableListGetSize(pTableListInfo); for (int32_t i = 0; i < size; ++i) { STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i); + QUERY_CHECK_NULL(pkeyInfo, code, lino, _end, terrno); void* tmp = taosArrayPush(tableIdList, &pkeyInfo->uid); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } @@ -4110,6 +4117,14 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos); + if (!item) { + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), + GET_TASKID(pTaskInfo)); + tDecoderClear(&(*mr).coder); + pAPI->metaReaderFn.clearReader(mr); + T_LONG_JMP(pTaskInfo->env, terrno); + } + code = pAPI->metaReaderFn.getTableEntryByUid(mr, item->uid); tDecoderClear(&(*mr).coder); if (code != TSDB_CODE_SUCCESS) { @@ -4787,18 +4802,23 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu return TSDB_CODE_SUCCESS; } -static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) { +static int32_t setGroupStartEndIndex(STableMergeScanInfo* pInfo) { pInfo->bGroupProcessed = false; size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); int32_t i = pInfo->tableStartIndex + 1; for (; i < numOfTables; ++i) { STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); + if (!tableKeyInfo) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return terrno; + } if (tableKeyInfo->groupId != pInfo->groupId) { break; } } pInfo->tableEndIndex = i - 1; + return TSDB_CODE_SUCCESS; } static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { @@ -4823,7 +4843,11 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { } static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) { - setGroupStartEndIndex(pInfo); + int32_t code = setGroupStartEndIndex(pInfo); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + return code; + } STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo)); if (pSubTblsInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -4842,7 +4866,7 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) { } int32_t bufPageSize = pInfo->bufPageSize; int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize; - int32_t code = + code = createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pSubTblsInfo->aInputs); @@ -4879,6 +4903,7 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* QUERY_CHECK_CODE(code, lino, _end); STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex); + QUERY_CHECK_NULL(keyInfo, code, lino, _end, terrno); pInput->pKeyInfo = keyInfo; if (isTaskKilled(pTaskInfo)) { @@ -5114,7 +5139,9 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock* return code; } pInfo->tableStartIndex = 0; - pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId; + STableKeyInfo* pTmpGpId = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno); + pInfo->groupId = pTmpGpId->groupId; code = startSubTablesTableMergeScan(pOperator); QUERY_CHECK_CODE(code, lino, _end); } @@ -5128,6 +5155,7 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock* pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity); if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) { STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno); pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo); } if (pBlock != NULL) { @@ -5144,7 +5172,10 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock* } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId; + STableKeyInfo* pTmpGpId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno); + + pInfo->groupId = pTmpGpId->groupId; code = startSubTablesTableMergeScan(pOperator); QUERY_CHECK_CODE(code, lino, _end); resetLimitInfoForNextGroup(&pInfo->limitInfo); @@ -5473,6 +5504,7 @@ void startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t i = pInfo->tableStartIndex + 1; for (; i < numOfTables; ++i) { STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); + QUERY_CHECK_NULL(tableKeyInfo, code, lino, _end, terrno); if (tableKeyInfo->groupId != pInfo->groupId) { break; } @@ -5485,6 +5517,7 @@ void startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t numOfTable = tableEndIdx - tableStartIdx + 1; STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); + QUERY_CHECK_NULL(startKeyInfo, code, lino, _end, terrno); code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), &pInfo->mSkipTables); QUERY_CHECK_CODE(code, lino, _end); @@ -5598,7 +5631,9 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { return code; } pInfo->tableStartIndex = 0; - pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId; + STableKeyInfo* tmp = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + pInfo->groupId = tmp->groupId; startGroupTableMergeScan(pOperator); } @@ -5612,6 +5647,7 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pOperator); if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) { STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno); pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo); } if (pBlock != NULL) { @@ -5633,7 +5669,9 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId; + STableKeyInfo* tmp = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + pInfo->groupId = tmp->groupId; startGroupTableMergeScan(pOperator); resetLimitInfoForNextGroup(&pInfo->limitInfo); } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 14fda685c0..af12313b29 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2722,6 +2722,7 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP pInfo->pTableListInfo = pTableListInfo; size_t num = tableListGetSize(pTableListInfo); void* pList = tableListGetInfo(pTableListInfo, 0); + QUERY_CHECK_NULL(pList, code, lino, _error, terrno); code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, (void**)&pInfo->pHandle, pTaskInfo->id.str, NULL);