commit
011d5b2bd8
|
@ -448,7 +448,7 @@ void destroyCacheScanOperator(void* param) {
|
||||||
taosArrayDestroy(pInfo->matchInfo.pList);
|
taosArrayDestroy(pInfo->matchInfo.pList);
|
||||||
tableListDestroy(pInfo->pTableList);
|
tableListDestroy(pInfo->pTableList);
|
||||||
|
|
||||||
if (pInfo->pLastrowReader != NULL) {
|
if (pInfo->pLastrowReader != NULL && pInfo->readHandle.api.cacheFn.closeReader != NULL) {
|
||||||
pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader);
|
pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader);
|
||||||
pInfo->pLastrowReader = NULL;
|
pInfo->pLastrowReader = NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -709,6 +709,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t
|
||||||
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
|
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
SSDataBlock* pBlock = NULL;
|
||||||
if (pColList == NULL) { // data from other sources
|
if (pColList == NULL) { // data from other sources
|
||||||
blockDataCleanup(pRes);
|
blockDataCleanup(pRes);
|
||||||
code = blockDecode(pRes, pData, (const char**)pNextStart);
|
code = blockDecode(pRes, pData, (const char**)pNextStart);
|
||||||
|
@ -731,7 +732,6 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
|
||||||
pStart += sizeof(SSysTableSchema);
|
pStart += sizeof(SSysTableSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = NULL;
|
|
||||||
code = createDataBlock(&pBlock);
|
code = createDataBlock(&pBlock);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
@ -756,10 +756,12 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
blockDataDestroy(pBlock);
|
blockDataDestroy(pBlock);
|
||||||
|
pBlock = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
blockDataDestroy(pBlock);
|
||||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -3777,7 +3777,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
|
||||||
destroyOperator(pStreamScan->pTableScanOp);
|
destroyOperator(pStreamScan->pTableScanOp);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStreamScan->tqReader) {
|
if (pStreamScan->tqReader != NULL && pStreamScan->readerFn.tqReaderClose != NULL) {
|
||||||
pStreamScan->readerFn.tqReaderClose(pStreamScan->tqReader);
|
pStreamScan->readerFn.tqReaderClose(pStreamScan->tqReader);
|
||||||
}
|
}
|
||||||
if (pStreamScan->matchInfo.pList) {
|
if (pStreamScan->matchInfo.pList) {
|
||||||
|
@ -4607,7 +4607,7 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
static void destroyTagScanOperatorInfo(void* param) {
|
static void destroyTagScanOperatorInfo(void* param) {
|
||||||
STagScanInfo* pInfo = (STagScanInfo*)param;
|
STagScanInfo* pInfo = (STagScanInfo*)param;
|
||||||
if (pInfo->pCtbCursor != NULL) {
|
if (pInfo->pCtbCursor != NULL && pInfo->pStorageAPI != NULL) {
|
||||||
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor);
|
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor);
|
||||||
}
|
}
|
||||||
taosHashCleanup(pInfo->filterCtx.colHash);
|
taosHashCleanup(pInfo->filterCtx.colHash);
|
||||||
|
@ -5769,8 +5769,10 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
||||||
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
||||||
|
|
||||||
// start one reader variable
|
// start one reader variable
|
||||||
|
if (pTableScanInfo->base.readerAPI.tsdReaderClose != NULL) {
|
||||||
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
||||||
pTableScanInfo->base.dataReader = NULL;
|
pTableScanInfo->base.dataReader = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pTableScanInfo->numNextDurationBlocks; ++i) {
|
for (int32_t i = 0; i < pTableScanInfo->numNextDurationBlocks; ++i) {
|
||||||
if (pTableScanInfo->nextDurationBlocks[i] != NULL) {
|
if (pTableScanInfo->nextDurationBlocks[i] != NULL) {
|
||||||
|
|
|
@ -672,6 +672,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataDestroy(pDataBlock);
|
blockDataDestroy(pDataBlock);
|
||||||
|
pDataBlock = NULL;
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||||
pInfo->pCur = NULL;
|
pInfo->pCur = NULL;
|
||||||
|
@ -683,6 +684,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
blockDataDestroy(pDataBlock);
|
||||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
@ -695,6 +697,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
SSDataBlock* dataBlock = NULL;
|
||||||
|
|
||||||
SSysTableScanInfo* pInfo = pOperator->info;
|
SSysTableScanInfo* pInfo = pOperator->info;
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
@ -704,7 +707,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
|
|
||||||
SSDataBlock* dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS);
|
dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS);
|
||||||
code = blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity);
|
code = blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
@ -826,6 +829,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataDestroy(dataBlock);
|
blockDataDestroy(dataBlock);
|
||||||
|
dataBlock = NULL;
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||||
pInfo->pCur = NULL;
|
pInfo->pCur = NULL;
|
||||||
|
@ -837,6 +841,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
_end:
|
_end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
blockDataDestroy(dataBlock);
|
||||||
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||||
pInfo->pCur = NULL;
|
pInfo->pCur = NULL;
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
@ -1310,9 +1315,11 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
blockDataDestroy(p);
|
blockDataDestroy(p);
|
||||||
|
p = NULL;
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
blockDataDestroy(p);
|
||||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
@ -1325,6 +1332,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
SSysTableScanInfo* pInfo = pOperator->info;
|
SSysTableScanInfo* pInfo = pOperator->info;
|
||||||
SSysTableIndex* pIdx = pInfo->pIdx;
|
SSysTableIndex* pIdx = pInfo->pIdx;
|
||||||
|
SSDataBlock* p = NULL;
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
|
|
||||||
|
@ -1344,7 +1352,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
||||||
|
|
||||||
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
|
p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
|
||||||
code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
@ -1545,12 +1553,14 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataDestroy(p);
|
blockDataDestroy(p);
|
||||||
|
p = NULL;
|
||||||
|
|
||||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
blockDataDestroy(p);
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -1563,6 +1573,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
int8_t firstMetaCursor = 0;
|
int8_t firstMetaCursor = 0;
|
||||||
|
SSDataBlock* p = NULL;
|
||||||
|
|
||||||
SSysTableScanInfo* pInfo = pOperator->info;
|
SSysTableScanInfo* pInfo = pOperator->info;
|
||||||
if (pInfo->pCur == NULL) {
|
if (pInfo->pCur == NULL) {
|
||||||
|
@ -1590,7 +1601,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
||||||
|
|
||||||
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
|
p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
|
||||||
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
|
||||||
|
|
||||||
code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
||||||
|
@ -1783,6 +1794,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataDestroy(p);
|
blockDataDestroy(p);
|
||||||
|
p = NULL;
|
||||||
|
|
||||||
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
|
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
|
@ -1796,6 +1808,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
||||||
_end:
|
_end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
blockDataDestroy(p);
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -2244,7 +2257,7 @@ void destroySysScanOperator(void* param) {
|
||||||
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
|
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||||
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
|
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||||
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
|
strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
|
||||||
if (pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) {
|
if (pInfo->pAPI != NULL && pInfo->pAPI->metaFn.closeTableMetaCursor != NULL) {
|
||||||
pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
pInfo->pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2726,7 +2739,9 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
||||||
static void destroyBlockDistScanOperatorInfo(void* param) {
|
static void destroyBlockDistScanOperatorInfo(void* param) {
|
||||||
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
|
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
|
||||||
blockDataDestroy(pDistInfo->pResBlock);
|
blockDataDestroy(pDistInfo->pResBlock);
|
||||||
|
if (pDistInfo->readHandle.api.tsdReader.tsdReaderClose != NULL) {
|
||||||
pDistInfo->readHandle.api.tsdReader.tsdReaderClose(pDistInfo->pHandle);
|
pDistInfo->readHandle.api.tsdReader.tsdReaderClose(pDistInfo->pHandle);
|
||||||
|
}
|
||||||
tableListDestroy(pDistInfo->pTableListInfo);
|
tableListDestroy(pDistInfo->pTableListInfo);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2094,7 +2094,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
destroyMAIOperatorInfo(miaInfo);
|
if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
|
||||||
destroyOperator(pOperator);
|
destroyOperator(pOperator);
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue