meta: use meta cache to get uid's version instead of fetching from tdb
This commit is contained in:
parent
f26a492c67
commit
35b9dc6b6c
|
@ -142,7 +142,7 @@ typedef struct SMetaInfo {
|
|||
int64_t version;
|
||||
int32_t skmVer;
|
||||
} SMetaInfo;
|
||||
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo);
|
||||
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo, SMetaReader* pReader);
|
||||
|
||||
typedef struct {
|
||||
int64_t uid;
|
||||
|
|
|
@ -152,7 +152,8 @@ bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) {
|
|||
}
|
||||
|
||||
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
||||
SMeta *pMeta = pReader->pMeta;
|
||||
SMeta *pMeta = pReader->pMeta;
|
||||
/*
|
||||
int64_t version1;
|
||||
|
||||
// query uid.idx
|
||||
|
@ -163,6 +164,15 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
|||
|
||||
version1 = ((SUidIdxVal *)pReader->pBuf)[0].version;
|
||||
return metaGetTableEntryByVersion(pReader, version1, uid);
|
||||
*/
|
||||
|
||||
SMetaInfo info;
|
||||
if (metaGetInfo(pMeta, uid, &info, pReader) == TSDB_CODE_NOT_FOUND) {
|
||||
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return metaGetTableEntryByVersion(pReader, info.version, uid);
|
||||
}
|
||||
|
||||
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
|
||||
|
@ -614,7 +624,7 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv
|
|||
SSkmDbKey skmDbKey;
|
||||
if (sver <= 0) {
|
||||
SMetaInfo info;
|
||||
if (metaGetInfo(pMeta, suid ? suid : uid, &info) == 0) {
|
||||
if (metaGetInfo(pMeta, suid ? suid : uid, &info, NULL) == 0) {
|
||||
sver = info.skmVer;
|
||||
} else {
|
||||
TBC *pSkmDbC = NULL;
|
||||
|
@ -1126,7 +1136,7 @@ int32_t metaFilterTableName(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
|||
valid = tdbTbcGet(pCursor->pCur, (const void **)pEntryKey, &nEntryKey, (const void **)&pEntryVal, &nEntryVal);
|
||||
if (valid < 0) break;
|
||||
|
||||
char *pTableKey = (char *)pEntryKey;
|
||||
char *pTableKey = (char *)pEntryKey;
|
||||
cmp = (*param->filterFunc)(pTableKey, pName, pCursor->type);
|
||||
if (cmp == 0) {
|
||||
tb_uid_t tuid = *(tb_uid_t *)pEntryVal;
|
||||
|
@ -1379,10 +1389,11 @@ int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj
|
|||
|
||||
int32_t metaCacheGet(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo);
|
||||
|
||||
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) {
|
||||
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo, SMetaReader *pReader) {
|
||||
int32_t code = 0;
|
||||
void *pData = NULL;
|
||||
int nData = 0;
|
||||
int lock = 0;
|
||||
|
||||
metaRLock(pMeta);
|
||||
|
||||
|
@ -1407,11 +1418,22 @@ int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) {
|
|||
pInfo->version = ((SUidIdxVal *)pData)->version;
|
||||
pInfo->skmVer = ((SUidIdxVal *)pData)->skmVer;
|
||||
|
||||
if (pReader != NULL) {
|
||||
lock = !(pReader->flags & META_READER_NOLOCK);
|
||||
if (lock) {
|
||||
metaULock(pReader->pMeta);
|
||||
// metaReaderReleaseLock(pReader);
|
||||
}
|
||||
}
|
||||
// upsert the cache
|
||||
metaWLock(pMeta);
|
||||
metaCacheUpsert(pMeta, pInfo);
|
||||
metaULock(pMeta);
|
||||
|
||||
if (lock) {
|
||||
metaRLock(pReader->pMeta);
|
||||
}
|
||||
|
||||
_exit:
|
||||
tdbFree(pData);
|
||||
return code;
|
||||
|
|
|
@ -207,7 +207,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
|||
tb_uid_t uid = *(tb_uid_t *)pData;
|
||||
tdbFree(pData);
|
||||
SMetaInfo info;
|
||||
metaGetInfo(pMeta, uid, &info);
|
||||
metaGetInfo(pMeta, uid, &info, NULL);
|
||||
if (info.uid == info.suid) {
|
||||
return 0;
|
||||
} else {
|
||||
|
|
|
@ -104,7 +104,7 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
|
|||
tb_uid_t uid = pMsgIter->uid;
|
||||
|
||||
SMetaInfo info;
|
||||
code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info);
|
||||
code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info, NULL);
|
||||
if (code) {
|
||||
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||
goto _err;
|
||||
|
@ -114,7 +114,7 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
|
|||
goto _err;
|
||||
}
|
||||
if (info.suid) {
|
||||
metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info);
|
||||
metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info, NULL);
|
||||
}
|
||||
if (pMsgIter->sversion != info.skmVer) {
|
||||
tsdbError("vgId:%d, req sver:%d, skmVer:%d suid:%" PRId64 " uid:%" PRId64, TD_VID(pTsdb->pVnode),
|
||||
|
@ -153,7 +153,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
|||
|
||||
// check if table exists
|
||||
SMetaInfo info;
|
||||
code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info);
|
||||
code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info, NULL);
|
||||
if (code) {
|
||||
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||
goto _err;
|
||||
|
|
Loading…
Reference in New Issue