diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 51e3672c56..65be287125 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4745,9 +4745,12 @@ void tsdbReaderClose2(STsdbReader* pReader) { SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; TARRAY2_DESTROY(&pSupInfo->colAggArray, NULL); - for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) { - if (pSupInfo->buildBuf[i] != NULL) { - taosMemoryFreeClear(pSupInfo->buildBuf[i]); + + if (pSupInfo->buildBuf) { + for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) { + if (pSupInfo->buildBuf[i] != NULL) { + taosMemoryFreeClear(pSupInfo->buildBuf[i]); + } } } @@ -5762,6 +5765,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; SVersionRange* pRange = &pReader->info.verRange; + *ppSnap = NULL; // lock code = taosThreadMutexLock(&pTsdb->mutex); @@ -5774,7 +5778,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); if (pSnap == NULL) { (void) taosThreadMutexUnlock(&pTsdb->mutex); - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; goto _exit; } @@ -5784,7 +5788,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode)); if (pSnap->pNode == NULL) { (void) taosThreadMutexUnlock(&pTsdb->mutex); - code = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; goto _exit; } @@ -5798,8 +5802,10 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pIMem = pTsdb->imem; pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode)); if (pSnap->pINode == NULL) { + tsdbUnrefMemTable(pTsdb->mem, pSnap->pNode, true); // unref the previous refed mem + code = terrno; + (void) taosThreadMutexUnlock(&pTsdb->mutex); - code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -5811,28 +5817,33 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs // fs code = tsdbFSCreateRefSnapshotWithoutLock(pTsdb->pFS, &pSnap->pfSetArray); + if (code) { + if (pSnap->pNode) { + tsdbUnrefMemTable(pTsdb->mem, pSnap->pNode, true); // unref the previous refed mem + } + + if (pSnap->pINode) { + tsdbUnrefMemTable(pTsdb->imem, pSnap->pINode, true); + } + + (void) taosThreadMutexUnlock(&pTsdb->mutex); + goto _exit; + } // unlock (void) taosThreadMutexUnlock(&pTsdb->mutex); + *ppSnap = pSnap; - if (code == TSDB_CODE_SUCCESS) { - tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); - } + tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); + return code; _exit: - if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d take read snapshot failed, code:%s", TD_VID(pTsdb->pVnode), tstrerror(code)); - - *ppSnap = NULL; - if (pSnap) { - if (pSnap->pNode) taosMemoryFree(pSnap->pNode); - if (pSnap->pINode) taosMemoryFree(pSnap->pINode); - taosMemoryFree(pSnap); - } - } else { - *ppSnap = pSnap; + tsdbError("vgId:%d take read snapshot failed, code:%s", TD_VID(pTsdb->pVnode), tstrerror(code)); + if (pSnap) { + if (pSnap->pNode) taosMemoryFree(pSnap->pNode); + if (pSnap->pINode) taosMemoryFree(pSnap->pINode); + taosMemoryFree(pSnap); } - return code; } diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index ce19aa62c3..d9e413b144 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -108,7 +108,6 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl if (pInfo == NULL || pOperator == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - tableListDestroy(pTableListInfo); goto _error; } @@ -245,6 +244,7 @@ _error: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } + pInfo->pTableList = NULL; destroyCacheScanOperator(pInfo); taosMemoryFree(pOperator); return code; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index ed63df6f4e..2688f5698b 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -409,6 +409,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc); QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno); + pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES); QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno); pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES); diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 4f1b2b1fdd..174e68ea7a 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -391,6 +391,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand //pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); code = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator); + if (code) { + pTaskInfo->code = code; + tableListDestroy(pTableListInfo); + return code; + } } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; code = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo, &pOperator); @@ -411,10 +416,17 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; qError("failed to getTableList, code: %s", tstrerror(code)); + tableListDestroy(pTableListInfo); return code; } } - code = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo, &pOperator); + code = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo, + &pOperator); + if (code) { + pTaskInfo->code = code; + tableListDestroy(pTableListInfo); + return code; + } } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); @@ -430,6 +442,7 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; taosArrayDestroy(pList); + tableListDestroy(pTableListInfo); return code; } @@ -437,13 +450,14 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand for (int32_t i = 0; i < num; ++i) { uint64_t* id = taosArrayGet(pList, i); if (id == NULL) { - pTaskInfo->code = terrno; - return terrno; + continue; } code = tableListAddTableInfo(pTableListInfo, *id, 0); if (code) { pTaskInfo->code = code; + tableListDestroy(pTableListInfo); + taosArrayDestroy(pList); return code; } } @@ -453,11 +467,17 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand code = tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0); if (code) { pTaskInfo->code = code; + tableListDestroy(pTableListInfo); return code; } } code = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo, &pOperator); + if (code) { + pTaskInfo->code = code; + tableListDestroy(pTableListInfo); + return code; + } } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); @@ -467,20 +487,27 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand return terrno; } - code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, - pTagCond, pTagIndexCond, pTaskInfo); + code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, + pTagIndexCond, pTaskInfo); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; + tableListDestroy(pTableListInfo); return code; } code = initQueriedTableSchemaInfo(pHandle, &pScanNode->scan, dbname, pTaskInfo); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; + tableListDestroy(pTableListInfo); return code; } code = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo, &pOperator); + if (code) { + tableListDestroy(pTableListInfo); + pTaskInfo->code = code; + return code; + } } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo, &pOperator); } else { diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 6f0bc40d29..5c4d3fe009 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -136,8 +136,7 @@ void cleanupQueriedTableScanInfo(void* p) { int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo) { SMetaReader mr = {0}; if (pHandle == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return TSDB_CODE_INVALID_PARA; } SStorageAPI* pAPI = &pTaskInfo->storageAPI; @@ -149,7 +148,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo GET_TASKID(pTaskInfo)); pAPI->metaReaderFn.clearReader(&mr); - return terrno; + return code; } SSchemaInfo schemaInfo = {0}; @@ -169,7 +168,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo pAPI->metaReaderFn.clearReader(&mr); taosMemoryFree(schemaInfo.tablename); taosMemoryFree(schemaInfo.dbname); - return terrno; + return code; } schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b12b621aae..0c714a163d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1422,6 +1422,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa _error: if (pInfo != NULL) { + pInfo->base.pTableListInfo = NULL; // this attribute will be destroy outside of this function destroyTableScanOperatorInfo(pInfo); } @@ -3891,7 +3892,6 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* if (pInfo == NULL || pOperator == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - tableListDestroy(pTableListInfo); goto _error; } @@ -3904,7 +3904,6 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* int32_t numOfCols = 0; code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) { - tableListDestroy(pTableListInfo); goto _error; } @@ -3935,8 +3934,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* if (pTableScanNode->pSubtable != NULL) { SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo)); if (pSubTableExpr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tableListDestroy(pTableListInfo); + code = terrno; goto _error; } @@ -3945,7 +3943,6 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* QUERY_CHECK_CODE(code, lino, _error); if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1, &pTaskInfo->storageAPI.functionStore) != 0) { - tableListDestroy(pTableListInfo); goto _error; } } @@ -3955,20 +3952,17 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags); if (pTagExpr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tableListDestroy(pTableListInfo); goto _error; } if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tableListDestroy(pTableListInfo); + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } } pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData)); if (pInfo->pBlockLists == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tableListDestroy(pTableListInfo); + code = terrno; goto _error; } @@ -4656,6 +4650,10 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p return code; _error: + if (pInfo) { + pInfo->pTableListInfo = NULL; + } + taosMemoryFree(pInfo); taosMemoryFree(pOperator); return code; @@ -5878,7 +5876,8 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR return code; _error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + pTaskInfo->code = code; + pInfo->base.pTableListInfo = NULL; taosMemoryFree(pInfo); taosMemoryFree(pOperator); return code; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 010f53cd4c..8fb0646495 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -101,8 +101,10 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN } pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); - QUERY_CHECK_NULL(pInfo->binfo.pRes , code, lino, _error, terrno); + QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); + pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); + TSDB_CHECK_NULL(pInfo->pSortInfo, code, lino, _error, terrno); if (pSortNode->calcGroupId) { int32_t keyLen; @@ -790,7 +792,8 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); - QUERY_CHECK_NULL(pInfo->binfo.pRes , code, lino, _error, terrno); + QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); + code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); TSDB_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index e95ffeb658..563b78144f 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -3738,9 +3738,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); pInfo->twAggSup = (STimeWindowAggSupp){ .waterMark = pSessionNode->window.watermark, diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 31b4f66206..2ba9152952 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2811,6 +2811,9 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP return code; _error: + if (pInfo) { + pInfo->pTableListInfo = NULL; + } taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); return code; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 7d1c0b08a0..2383c09dac 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1728,9 +1728,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); pInfo->twAggSup.waterMark = pSessionNode->window.watermark; pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType; @@ -1754,15 +1752,11 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh QUERY_CHECK_CODE(code, lino, _error); code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); } code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -1770,9 +1764,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; return code; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 300d7576d9..755eef4431 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -469,10 +469,10 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortExternalBuf", tsTempDir); - dBufSetPrintInfo(pHandle->pBuf); if (code != TSDB_CODE_SUCCESS) { return code; } + dBufSetPrintInfo(pHandle->pBuf); } SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); @@ -562,10 +562,10 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32 code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", tsTempDir); - dBufSetPrintInfo(pHandle->pBuf); if (code != TSDB_CODE_SUCCESS) { - terrno = code; return code; + } else { + dBufSetPrintInfo(pHandle->pBuf); } } @@ -1111,9 +1111,10 @@ static int32_t createPageBuf(SSortHandle* pHandle) { int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "tableBlocksBuf", tsTempDir); - dBufSetPrintInfo(pHandle->pBuf); if (code != TSDB_CODE_SUCCESS) { return code; + } else { + dBufSetPrintInfo(pHandle->pBuf); } } return 0;