diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 652ebc0f9a..a6224bf2f0 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -448,7 +448,7 @@ void destroyCacheScanOperator(void* param) { taosArrayDestroy(pInfo->matchInfo.pList); tableListDestroy(pInfo->pTableList); - if (pInfo->pLastrowReader != NULL) { + if (pInfo->pLastrowReader != NULL && pInfo->readHandle.api.cacheFn.closeReader != NULL) { pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader); pInfo->pLastrowReader = NULL; } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 22bb6524f1..53030556bd 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -707,8 +707,9 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t } int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SSDataBlock* pBlock = NULL; if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); code = blockDecode(pRes, pData, (const char**)pNextStart); @@ -731,7 +732,6 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo pStart += sizeof(SSysTableSchema); } - SSDataBlock* pBlock = NULL; code = createDataBlock(&pBlock); QUERY_CHECK_CODE(code, lino, _end); @@ -756,10 +756,12 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo QUERY_CHECK_CODE(code, lino, _end); blockDataDestroy(pBlock); + pBlock = NULL; } _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(pBlock); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1b7ae670fe..7a770b8919 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3777,7 +3777,7 @@ static void destroyStreamScanOperatorInfo(void* param) { destroyOperator(pStreamScan->pTableScanOp); } - if (pStreamScan->tqReader) { + if (pStreamScan->tqReader != NULL && pStreamScan->readerFn.tqReaderClose != NULL) { pStreamScan->readerFn.tqReaderClose(pStreamScan->tqReader); } if (pStreamScan->matchInfo.pList) { @@ -4607,7 +4607,7 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) { static void destroyTagScanOperatorInfo(void* param) { STagScanInfo* pInfo = (STagScanInfo*)param; - if (pInfo->pCtbCursor != NULL) { + if (pInfo->pCtbCursor != NULL && pInfo->pStorageAPI != NULL) { pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor); } taosHashCleanup(pInfo->filterCtx.colHash); @@ -5769,8 +5769,10 @@ void destroyTableMergeScanOperatorInfo(void* param) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; // start one reader variable - pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); - pTableScanInfo->base.dataReader = NULL; + if (pTableScanInfo->base.readerAPI.tsdReaderClose != NULL) { + pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; + } for (int32_t i = 0; i < pTableScanInfo->numNextDurationBlocks; ++i) { if (pTableScanInfo->nextDurationBlocks[i] != NULL) { diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index cdd22c2adc..2ce9b1d756 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -672,6 +672,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { } blockDataDestroy(pDataBlock); + pDataBlock = NULL; if (ret != 0) { pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; @@ -683,6 +684,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(pDataBlock); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); @@ -695,6 +697,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SSDataBlock* dataBlock = NULL; SSysTableScanInfo* pInfo = pOperator->info; if (pOperator->status == OP_EXEC_DONE) { @@ -704,7 +707,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pRes); int32_t numOfRows = 0; - SSDataBlock* dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS); + dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS); code = blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); @@ -826,6 +829,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { } blockDataDestroy(dataBlock); + dataBlock = NULL; if (ret != 0) { pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; @@ -837,6 +841,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + blockDataDestroy(dataBlock); pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; pTaskInfo->code = code; @@ -1310,9 +1315,11 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { QUERY_CHECK_CODE(code, lino, _end); blockDataDestroy(p); + p = NULL; _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(p); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; @@ -1325,6 +1332,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { SStorageAPI* pAPI = &pTaskInfo->storageAPI; SSysTableScanInfo* pInfo = pOperator->info; SSysTableIndex* pIdx = pInfo->pIdx; + SSDataBlock* p = NULL; blockDataCleanup(pInfo->pRes); int32_t numOfRows = 0; @@ -1344,7 +1352,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { varDataSetLen(dbname, strlen(varDataVal(dbname))); - SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES); + p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES); code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); @@ -1545,12 +1553,14 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { } blockDataDestroy(p); + p = NULL; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + blockDataDestroy(p); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } @@ -1563,6 +1573,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; int8_t firstMetaCursor = 0; + SSDataBlock* p = NULL; SSysTableScanInfo* pInfo = pOperator->info; if (pInfo->pCur == NULL) { @@ -1590,7 +1601,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { varDataSetLen(dbname, strlen(varDataVal(dbname))); - SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES); + p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES); QUERY_CHECK_NULL(p, code, lino, _end, terrno); code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity); @@ -1783,6 +1794,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { } blockDataDestroy(p); + p = NULL; // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found if (ret != 0) { @@ -1796,6 +1808,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + blockDataDestroy(p); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } @@ -2244,7 +2257,7 @@ void destroySysScanOperator(void* param) { if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) { - if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) { + if (pInfo->pAPI != NULL && pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) { pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); } @@ -2726,7 +2739,9 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { static void destroyBlockDistScanOperatorInfo(void* param) { SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; blockDataDestroy(pDistInfo->pResBlock); - pDistInfo->readHandle.api.tsdReader.tsdReaderClose(pDistInfo->pHandle); + if (pDistInfo->readHandle.api.tsdReader.tsdReaderClose != NULL) { + pDistInfo->readHandle.api.tsdReader.tsdReaderClose(pDistInfo->pHandle); + } tableListDestroy(pDistInfo->pTableListInfo); taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 99fa941071..1d9e789cd5 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2094,7 +2094,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge return code; _error: - destroyMAIOperatorInfo(miaInfo); + if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo); destroyOperator(pOperator); pTaskInfo->code = code; return code;