Merge pull request #27483 from taosdata/fix/ly_res
fix(query):clear meta reader
This commit is contained in:
commit
466d8cb321
|
@ -435,6 +435,7 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle,
|
||||||
code = nodesCloneNode(pTagCond, &pTagCondTmp);
|
code = nodesCloneNode(pTagCond, &pTagCondTmp);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
*pQualified = false;
|
*pQualified = false;
|
||||||
|
pAPI->metaReaderFn.clearReader(&mr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
|
STransTagExprCtx ctx = {.code = 0, .pReader = &mr};
|
||||||
|
|
|
@ -372,11 +372,11 @@ static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
|
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
|
||||||
QUERY_CHECK_NULL(qa, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(qa, code, lino, _error, terrno);
|
||||||
int32_t numOfUids = taosArrayGetSize(tableIdList);
|
int32_t numOfUids = taosArrayGetSize(tableIdList);
|
||||||
if (numOfUids == 0) {
|
if (numOfUids == 0) {
|
||||||
(*ppArrayRes) = qa;
|
(*ppArrayRes) = qa;
|
||||||
goto _end;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
|
||||||
|
@ -437,10 +437,11 @@ static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
pAPI->metaReaderFn.clearReader(&mr);
|
pAPI->metaReaderFn.clearReader(&mr);
|
||||||
(*ppArrayRes) = qa;
|
(*ppArrayRes) = qa;
|
||||||
|
|
||||||
_end:
|
_error:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
|
@ -643,6 +643,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
||||||
h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
|
h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
|
||||||
if (h == NULL) {
|
if (h == NULL) {
|
||||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pHandle->api.metaFn);
|
pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pHandle->api.metaFn);
|
||||||
|
freeReader = true;
|
||||||
code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
|
code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||||
|
@ -664,7 +665,6 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
val = *pVal;
|
val = *pVal;
|
||||||
freeReader = true;
|
|
||||||
} else {
|
} else {
|
||||||
pCache->cacheHit += 1;
|
pCache->cacheHit += 1;
|
||||||
STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
|
STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
|
||||||
|
|
|
@ -655,16 +655,20 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&smrSuperTable.me.stbEntry.schemaRow);
|
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&smrSuperTable.me.stbEntry.schemaRow);
|
||||||
if (smrSuperTable.me.stbEntry.schemaRow.pSchema) {
|
if (smrSuperTable.me.stbEntry.schemaRow.pSchema) {
|
||||||
QUERY_CHECK_NULL(schemaWrapper, code, lino, _end, terrno);
|
if (schemaWrapper == NULL) {
|
||||||
|
code = terrno;
|
||||||
|
lino = __LINE__;
|
||||||
|
pAPI->metaReaderFn.clearReader(&smrSuperTable);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
code = taosHashPut(pInfo->pSchema, &suid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
|
code = taosHashPut(pInfo->pSchema, &suid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
|
||||||
if (code == TSDB_CODE_DUP_KEY) {
|
if (code == TSDB_CODE_DUP_KEY) {
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
|
||||||
|
|
||||||
schemaRow = schemaWrapper;
|
schemaRow = schemaWrapper;
|
||||||
pAPI->metaReaderFn.clearReader(&smrSuperTable);
|
pAPI->metaReaderFn.clearReader(&smrSuperTable);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
} else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) {
|
} else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) {
|
||||||
qDebug("sysTableScanUserCols cursor get normal table, %s", GET_TASKID(pTaskInfo));
|
qDebug("sysTableScanUserCols cursor get normal table, %s", GET_TASKID(pTaskInfo));
|
||||||
|
@ -789,11 +793,12 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
code = sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &smrChildTable, dbname, tableName, &numOfRows,
|
code = sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &smrChildTable, dbname, tableName, &numOfRows,
|
||||||
dataBlock);
|
dataBlock);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
|
||||||
|
|
||||||
pAPI->metaReaderFn.clearReader(&smrSuperTable);
|
pAPI->metaReaderFn.clearReader(&smrSuperTable);
|
||||||
pAPI->metaReaderFn.clearReader(&smrChildTable);
|
pAPI->metaReaderFn.clearReader(&smrChildTable);
|
||||||
|
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (numOfRows > 0) {
|
if (numOfRows > 0) {
|
||||||
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo, pTaskInfo);
|
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo, pTaskInfo);
|
||||||
numOfRows = 0;
|
numOfRows = 0;
|
||||||
|
@ -831,6 +836,8 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
pAPI->metaReaderFn.clearReader(&smrSuperTable);
|
pAPI->metaReaderFn.clearReader(&smrSuperTable);
|
||||||
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||||
pInfo->pCur = NULL;
|
pInfo->pCur = NULL;
|
||||||
|
blockDataDestroy(dataBlock);
|
||||||
|
dataBlock = NULL;
|
||||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -846,7 +853,15 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
} else {
|
} else {
|
||||||
code = sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
|
code = sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
|
||||||
dataBlock);
|
dataBlock);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
|
pAPI->metaReaderFn.clearReader(&smrSuperTable);
|
||||||
|
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
|
||||||
|
pInfo->pCur = NULL;
|
||||||
|
blockDataDestroy(dataBlock);
|
||||||
|
dataBlock = NULL;
|
||||||
|
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pAPI->metaReaderFn.clearReader(&smrSuperTable);
|
pAPI->metaReaderFn.clearReader(&smrSuperTable);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue