diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 21b1c2838b..77587b41d3 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -686,8 +686,9 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t } int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SSDataBlock* pBlock = NULL; if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); code = blockDecode(pRes, pData, (const char**) pNextStart); @@ -710,7 +711,6 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo pStart += sizeof(SSysTableSchema); } - SSDataBlock* pBlock = NULL; code = createDataBlock(&pBlock); QUERY_CHECK_CODE(code, lino, _end); @@ -735,10 +735,12 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo QUERY_CHECK_CODE(code, lino, _end); blockDataDestroy(pBlock); + pBlock = NULL; } _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(pBlock); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index cdd22c2adc..e50d638caf 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -672,6 +672,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { } blockDataDestroy(pDataBlock); + pDataBlock = NULL; if (ret != 0) { pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; @@ -683,6 +684,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(pDataBlock); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); @@ -695,6 +697,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SSDataBlock* dataBlock = NULL; SSysTableScanInfo* pInfo = pOperator->info; if (pOperator->status == OP_EXEC_DONE) { @@ -704,7 +707,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pRes); int32_t numOfRows = 0; - SSDataBlock* dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS); + dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS); code = blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); @@ -826,6 +829,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { } blockDataDestroy(dataBlock); + dataBlock = NULL; if (ret != 0) { pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; @@ -837,6 +841,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + blockDataDestroy(dataBlock); pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pInfo->pCur = NULL; pTaskInfo->code = code; @@ -1310,9 +1315,11 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { QUERY_CHECK_CODE(code, lino, _end); blockDataDestroy(p); + p = NULL; _end: if (code != TSDB_CODE_SUCCESS) { + blockDataDestroy(p); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; @@ -1325,6 +1332,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { SStorageAPI* pAPI = &pTaskInfo->storageAPI; SSysTableScanInfo* pInfo = pOperator->info; SSysTableIndex* pIdx = pInfo->pIdx; + SSDataBlock* p = NULL; blockDataCleanup(pInfo->pRes); int32_t numOfRows = 0; @@ -1344,7 +1352,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { varDataSetLen(dbname, strlen(varDataVal(dbname))); - SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES); + p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES); code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); @@ -1545,12 +1553,14 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { } blockDataDestroy(p); + p = NULL; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + blockDataDestroy(p); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } @@ -1563,6 +1573,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; int8_t firstMetaCursor = 0; + SSDataBlock* p = NULL; SSysTableScanInfo* pInfo = pOperator->info; if (pInfo->pCur == NULL) { @@ -1590,7 +1601,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { varDataSetLen(dbname, strlen(varDataVal(dbname))); - SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES); + p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES); QUERY_CHECK_NULL(p, code, lino, _end, terrno); code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity); @@ -1783,6 +1794,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { } blockDataDestroy(p); + p = NULL; // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found if (ret != 0) { @@ -1796,6 +1808,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + blockDataDestroy(p); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); }