fix(query): fix dead lock.
This commit is contained in:
parent
9997ff9ada
commit
0948216ca9
|
@ -99,6 +99,7 @@ void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
||||||
void metaReaderReleaseLock(SMetaReader *pReader);
|
void metaReaderReleaseLock(SMetaReader *pReader);
|
||||||
void metaReaderClear(SMetaReader *pReader);
|
void metaReaderClear(SMetaReader *pReader);
|
||||||
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||||
|
int32_t metaGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
|
||||||
int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
|
int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
|
||||||
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags);
|
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags);
|
||||||
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHashObj *tags);
|
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHashObj *tags);
|
||||||
|
|
|
@ -153,7 +153,6 @@ bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) {
|
||||||
|
|
||||||
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
||||||
SMeta *pMeta = pReader->pMeta;
|
SMeta *pMeta = pReader->pMeta;
|
||||||
/*
|
|
||||||
int64_t version1;
|
int64_t version1;
|
||||||
|
|
||||||
// query uid.idx
|
// query uid.idx
|
||||||
|
@ -164,7 +163,10 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
||||||
|
|
||||||
version1 = ((SUidIdxVal *)pReader->pBuf)[0].version;
|
version1 = ((SUidIdxVal *)pReader->pBuf)[0].version;
|
||||||
return metaGetTableEntryByVersion(pReader, version1, uid);
|
return metaGetTableEntryByVersion(pReader, version1, uid);
|
||||||
*/
|
}
|
||||||
|
|
||||||
|
int metaGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid) {
|
||||||
|
SMeta *pMeta = pReader->pMeta;
|
||||||
|
|
||||||
SMetaInfo info;
|
SMetaInfo info;
|
||||||
if (metaGetInfo(pMeta, uid, &info, pReader) == TSDB_CODE_NOT_FOUND) {
|
if (metaGetInfo(pMeta, uid, &info, pReader) == TSDB_CODE_NOT_FOUND) {
|
||||||
|
|
|
@ -36,7 +36,7 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
|
||||||
// validate req
|
// validate req
|
||||||
// save smaIndex
|
// save smaIndex
|
||||||
metaReaderInit(&mr, pMeta, 0);
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
if (metaGetTableEntryByUid(&mr, pCfg->indexUid) == 0) {
|
if (metaGetTableEntryByUidCache(&mr, pCfg->indexUid) == 0) {
|
||||||
#if 1
|
#if 1
|
||||||
terrno = TSDB_CODE_TSMA_ALREADY_EXIST;
|
terrno = TSDB_CODE_TSMA_ALREADY_EXIST;
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
|
@ -921,7 +921,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, SMA_META(pSma), 0);
|
metaReaderInit(&mr, SMA_META(pSma), 0);
|
||||||
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
|
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
|
||||||
if (metaGetTableEntryByUid(&mr, pInfo->suid) < 0) {
|
if (metaGetTableEntryByUidCache(&mr, pInfo->suid) < 0) {
|
||||||
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid,
|
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid,
|
||||||
terrstr());
|
terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -1125,7 +1125,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
|
||||||
for (int64_t i = 0; i < arrSize; ++i) {
|
for (int64_t i = 0; i < arrSize; ++i) {
|
||||||
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
|
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
|
||||||
smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
|
smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
|
||||||
if (metaGetTableEntryByUid(&mr, suid) < 0) {
|
if (metaGetTableEntryByUidCache(&mr, suid) < 0) {
|
||||||
smaError("vgId:%d, rsma restore, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid,
|
smaError("vgId:%d, rsma restore, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid,
|
||||||
terrstr());
|
terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
|
@ -48,7 +48,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
|
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
|
||||||
// TODO add reference to gurantee success
|
// TODO add reference to gurantee success
|
||||||
if (metaGetTableEntryByUid(&mr, uid) < 0) {
|
if (metaGetTableEntryByUidCache(&mr, uid) < 0) {
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -766,7 +766,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) {
|
||||||
uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i);
|
uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i);
|
||||||
|
|
||||||
int32_t code = metaGetTableEntryByUid(&mr, *id);
|
int32_t code = metaGetTableEntryByUidCache(&mr, *id);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
|
qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -392,7 +392,7 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
|
||||||
|
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
|
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
|
||||||
if (metaGetTableEntryByUid(&mr, uid) < 0) {
|
if (metaGetTableEntryByUidCache(&mr, uid) < 0) {
|
||||||
metaReaderClear(&mr); // table not esist
|
metaReaderClear(&mr); // table not esist
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -4350,7 +4350,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
|
||||||
|
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pVnode->pMeta, 0);
|
metaReaderInit(&mr, pVnode->pMeta, 0);
|
||||||
int32_t code = metaGetTableEntryByUid(&mr, uid);
|
int32_t code = metaGetTableEntryByUidCache(&mr, uid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
@ -4362,7 +4362,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
|
||||||
if (mr.me.type == TSDB_CHILD_TABLE) {
|
if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||||
tDecoderClear(&mr.coder);
|
tDecoderClear(&mr.coder);
|
||||||
*suid = mr.me.ctbEntry.suid;
|
*suid = mr.me.ctbEntry.suid;
|
||||||
code = metaGetTableEntryByUid(&mr, *suid);
|
code = metaGetTableEntryByUidCache(&mr, *suid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
|
@ -290,7 +290,7 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle,
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
|
|
||||||
metaReaderInit(&mr, metaHandle, 0);
|
metaReaderInit(&mr, metaHandle, 0);
|
||||||
code = metaGetTableEntryByUid(&mr, info->uid);
|
code = metaGetTableEntryByUidCache(&mr, info->uid);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
*pQualified = false;
|
*pQualified = false;
|
||||||
|
@ -1092,7 +1092,7 @@ size_t getTableTagsBufLen(const SNodeList* pGroups) {
|
||||||
int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId) {
|
int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId) {
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pMeta, 0);
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
if (metaGetTableEntryByUid(&mr, uid) != 0) { // table not exist
|
if (metaGetTableEntryByUidCache(&mr, uid) != 0) { // table not exist
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||||
}
|
}
|
||||||
|
|
|
@ -287,7 +287,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
|
||||||
uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
|
uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
|
||||||
|
|
||||||
int32_t code = metaGetTableEntryByUid(&mr, *id);
|
int32_t code = metaGetTableEntryByUidCache(&mr, *id);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
|
qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -2317,7 +2317,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
|
||||||
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
|
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pHandle->meta, 0);
|
metaReaderInit(&mr, pHandle->meta, 0);
|
||||||
int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid);
|
int32_t code = metaGetTableEntryByUidCache(&mr, pScanNode->uid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
|
qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo));
|
||||||
|
@ -2336,7 +2336,7 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode,
|
||||||
tDecoderClear(&mr.coder);
|
tDecoderClear(&mr.coder);
|
||||||
|
|
||||||
tb_uid_t suid = mr.me.ctbEntry.suid;
|
tb_uid_t suid = mr.me.ctbEntry.suid;
|
||||||
metaGetTableEntryByUid(&mr, suid);
|
metaGetTableEntryByUidCache(&mr, suid);
|
||||||
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
||||||
pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
|
pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -484,7 +484,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
||||||
// 1. check if it is existed in meta cache
|
// 1. check if it is existed in meta cache
|
||||||
if (pCache == NULL) {
|
if (pCache == NULL) {
|
||||||
metaReaderInit(&mr, pHandle->meta, 0);
|
metaReaderInit(&mr, pHandle->meta, 0);
|
||||||
code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
code = metaGetTableEntryByUidCache(&mr, pBlock->info.uid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||||
qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid,
|
qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid,
|
||||||
|
@ -508,7 +508,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
||||||
h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(pBlock->info.uid));
|
h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(pBlock->info.uid));
|
||||||
if (h == NULL) {
|
if (h == NULL) {
|
||||||
metaReaderInit(&mr, pHandle->meta, 0);
|
metaReaderInit(&mr, pHandle->meta, 0);
|
||||||
code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
code = metaGetTableEntryByUidCache(&mr, pBlock->info.uid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||||
qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
|
qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
|
||||||
|
|
|
@ -441,6 +441,9 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
int32_t code = metaGetTableEntryByName(&smrChildTable, condTableName);
|
int32_t code = metaGetTableEntryByName(&smrChildTable, condTableName);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// terrno has been set by metaGetTableEntryByName, therefore, return directly
|
// terrno has been set by metaGetTableEntryByName, therefore, return directly
|
||||||
|
metaReaderClear(&smrChildTable);
|
||||||
|
blockDataDestroy(dataBlock);
|
||||||
|
pInfo->loadInfo.totalRows = 0;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -456,12 +459,16 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
code = metaGetTableEntryByUid(&smrSuperTable, smrChildTable.me.ctbEntry.suid);
|
code = metaGetTableEntryByUid(&smrSuperTable, smrChildTable.me.ctbEntry.suid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// terrno has been set by metaGetTableEntryByUid
|
// terrno has been set by metaGetTableEntryByUid
|
||||||
|
metaReaderClear(&smrSuperTable);
|
||||||
|
metaReaderClear(&smrChildTable);
|
||||||
|
blockDataDestroy(dataBlock);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &smrChildTable, dbname, tableName, &numOfRows, dataBlock);
|
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &smrChildTable, dbname, tableName, &numOfRows, dataBlock);
|
||||||
metaReaderClear(&smrSuperTable);
|
metaReaderClear(&smrSuperTable);
|
||||||
metaReaderClear(&smrChildTable);
|
metaReaderClear(&smrChildTable);
|
||||||
|
|
||||||
if (numOfRows > 0) {
|
if (numOfRows > 0) {
|
||||||
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
|
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
|
||||||
numOfRows = 0;
|
numOfRows = 0;
|
||||||
|
|
Loading…
Reference in New Issue