meta/cache: a new cache for stable's ctbNum
This commit is contained in:
parent
8f61346d29
commit
25b279dd7a
|
@ -67,6 +67,10 @@ void metaCacheClose(SMeta* pMeta);
|
||||||
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo);
|
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo);
|
||||||
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
|
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
|
||||||
|
|
||||||
|
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo);
|
||||||
|
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid);
|
||||||
|
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
|
||||||
|
|
||||||
struct SMeta {
|
struct SMeta {
|
||||||
TdThreadRwlock lock;
|
TdThreadRwlock lock;
|
||||||
|
|
||||||
|
|
|
@ -141,6 +141,12 @@ typedef struct SMetaInfo {
|
||||||
} SMetaInfo;
|
} SMetaInfo;
|
||||||
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo);
|
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t uid;
|
||||||
|
int64_t ctbNum;
|
||||||
|
} SMetaStbStats;
|
||||||
|
int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
|
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
|
||||||
int tsdbClose(STsdb** pTsdb);
|
int tsdbClose(STsdb** pTsdb);
|
||||||
|
|
|
@ -14,7 +14,8 @@
|
||||||
*/
|
*/
|
||||||
#include "meta.h"
|
#include "meta.h"
|
||||||
|
|
||||||
#define META_CACHE_BASE_BUCKET 1024
|
#define META_CACHE_BASE_BUCKET 1024
|
||||||
|
#define META_CACHE_STATS_BUCKET 16
|
||||||
|
|
||||||
// (uid , suid) : child table
|
// (uid , suid) : child table
|
||||||
// (uid, 0) : normal table
|
// (uid, 0) : normal table
|
||||||
|
@ -25,6 +26,11 @@ struct SMetaCacheEntry {
|
||||||
SMetaInfo info;
|
SMetaInfo info;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef struct SMetaStbStatsEntry {
|
||||||
|
struct SMetaStbStatsEntry* next;
|
||||||
|
SMetaStbStats info;
|
||||||
|
} SMetaStbStatsEntry;
|
||||||
|
|
||||||
struct SMetaCache {
|
struct SMetaCache {
|
||||||
// child, normal, super, table entry cache
|
// child, normal, super, table entry cache
|
||||||
struct SEntryCache {
|
struct SEntryCache {
|
||||||
|
@ -32,8 +38,47 @@ struct SMetaCache {
|
||||||
int32_t nBucket;
|
int32_t nBucket;
|
||||||
SMetaCacheEntry** aBucket;
|
SMetaCacheEntry** aBucket;
|
||||||
} sEntryCache;
|
} sEntryCache;
|
||||||
|
|
||||||
|
// stable stats cache
|
||||||
|
struct SStbStatsCache {
|
||||||
|
int32_t nEntry;
|
||||||
|
int32_t nBucket;
|
||||||
|
SMetaStbStatsEntry** aBucket;
|
||||||
|
} sStbStatsCache;
|
||||||
|
|
||||||
|
// query cache
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static void entryCacheClose(SMeta* pMeta) {
|
||||||
|
if (pMeta->pCache) {
|
||||||
|
// close entry cache
|
||||||
|
for (int32_t iBucket = 0; iBucket < pMeta->pCache->sEntryCache.nBucket; iBucket++) {
|
||||||
|
SMetaCacheEntry* pEntry = pMeta->pCache->sEntryCache.aBucket[iBucket];
|
||||||
|
while (pEntry) {
|
||||||
|
SMetaCacheEntry* tEntry = pEntry->next;
|
||||||
|
taosMemoryFree(pEntry);
|
||||||
|
pEntry = tEntry;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosMemoryFree(pMeta->pCache->sEntryCache.aBucket);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void statsCacheClose(SMeta* pMeta) {
|
||||||
|
if (pMeta->pCache) {
|
||||||
|
// close entry cache
|
||||||
|
for (int32_t iBucket = 0; iBucket < pMeta->pCache->sStbStatsCache.nBucket; iBucket++) {
|
||||||
|
SMetaStbStatsEntry* pEntry = pMeta->pCache->sStbStatsCache.aBucket[iBucket];
|
||||||
|
while (pEntry) {
|
||||||
|
SMetaStbStatsEntry* tEntry = pEntry->next;
|
||||||
|
taosMemoryFree(pEntry);
|
||||||
|
pEntry = tEntry;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosMemoryFree(pMeta->pCache->sStbStatsCache.aBucket);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t metaCacheOpen(SMeta* pMeta) {
|
int32_t metaCacheOpen(SMeta* pMeta) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SMetaCache* pCache = NULL;
|
SMetaCache* pCache = NULL;
|
||||||
|
@ -51,32 +96,38 @@ int32_t metaCacheOpen(SMeta* pMeta) {
|
||||||
(SMetaCacheEntry**)taosMemoryCalloc(pCache->sEntryCache.nBucket, sizeof(SMetaCacheEntry*));
|
(SMetaCacheEntry**)taosMemoryCalloc(pCache->sEntryCache.nBucket, sizeof(SMetaCacheEntry*));
|
||||||
if (pCache->sEntryCache.aBucket == NULL) {
|
if (pCache->sEntryCache.aBucket == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
taosMemoryFree(pCache);
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// open stats cache
|
||||||
|
pCache->sStbStatsCache.nEntry = 0;
|
||||||
|
pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;
|
||||||
|
pCache->sStbStatsCache.aBucket =
|
||||||
|
(SMetaStbStatsEntry**)taosMemoryCalloc(pCache->sStbStatsCache.nBucket, sizeof(SMetaStbStatsEntry*));
|
||||||
|
if (pCache->sStbStatsCache.aBucket == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err2;
|
||||||
|
}
|
||||||
|
|
||||||
pMeta->pCache = pCache;
|
pMeta->pCache = pCache;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
_err2:
|
||||||
|
entryCacheClose(pMeta);
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
taosMemoryFree(pCache);
|
||||||
|
|
||||||
metaError("vgId:%d meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
|
metaError("vgId:%d meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void metaCacheClose(SMeta* pMeta) {
|
void metaCacheClose(SMeta* pMeta) {
|
||||||
if (pMeta->pCache) {
|
if (pMeta->pCache) {
|
||||||
// close entry cache
|
entryCacheClose(pMeta);
|
||||||
for (int32_t iBucket = 0; iBucket < pMeta->pCache->sEntryCache.nBucket; iBucket++) {
|
statsCacheClose(pMeta);
|
||||||
SMetaCacheEntry* pEntry = pMeta->pCache->sEntryCache.aBucket[iBucket];
|
|
||||||
while (pEntry) {
|
|
||||||
SMetaCacheEntry* tEntry = pEntry->next;
|
|
||||||
taosMemoryFree(pEntry);
|
|
||||||
pEntry = tEntry;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
taosMemoryFree(pMeta->pCache->sEntryCache.aBucket);
|
|
||||||
taosMemoryFree(pMeta->pCache);
|
taosMemoryFree(pMeta->pCache);
|
||||||
pMeta->pCache = NULL;
|
pMeta->pCache = NULL;
|
||||||
}
|
}
|
||||||
|
@ -211,3 +262,129 @@ int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t metaRehashStatsCache(SMetaCache* pCache, int8_t expand) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t nBucket;
|
||||||
|
|
||||||
|
if (expand) {
|
||||||
|
nBucket = pCache->sStbStatsCache.nBucket * 2;
|
||||||
|
} else {
|
||||||
|
nBucket = pCache->sStbStatsCache.nBucket / 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMetaStbStatsEntry** aBucket = (SMetaStbStatsEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaStbStatsEntry*));
|
||||||
|
if (aBucket == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// rehash
|
||||||
|
for (int32_t iBucket = 0; iBucket < pCache->sStbStatsCache.nBucket; iBucket++) {
|
||||||
|
SMetaStbStatsEntry* pEntry = pCache->sStbStatsCache.aBucket[iBucket];
|
||||||
|
|
||||||
|
while (pEntry) {
|
||||||
|
SMetaStbStatsEntry* pTEntry = pEntry->next;
|
||||||
|
|
||||||
|
pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
|
||||||
|
aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;
|
||||||
|
|
||||||
|
pEntry = pTEntry;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// final set
|
||||||
|
taosMemoryFree(pCache->sStbStatsCache.aBucket);
|
||||||
|
pCache->sStbStatsCache.nBucket = nBucket;
|
||||||
|
pCache->sStbStatsCache.aBucket = aBucket;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
// ASSERT(metaIsWLocked(pMeta));
|
||||||
|
|
||||||
|
// search
|
||||||
|
SMetaCache* pCache = pMeta->pCache;
|
||||||
|
int32_t iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
|
||||||
|
SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
|
||||||
|
while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
|
||||||
|
ppEntry = &(*ppEntry)->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*ppEntry) { // update
|
||||||
|
(*ppEntry)->info.ctbNum = pInfo->ctbNum;
|
||||||
|
} else { // insert
|
||||||
|
if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
|
||||||
|
code = metaRehashStatsCache(pCache, 1);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMetaStbStatsEntry* pEntryNew = (SMetaStbStatsEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
|
||||||
|
if (pEntryNew == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
pEntryNew->info = *pInfo;
|
||||||
|
pEntryNew->next = pCache->sStbStatsCache.aBucket[iBucket];
|
||||||
|
pCache->sStbStatsCache.aBucket[iBucket] = pEntryNew;
|
||||||
|
pCache->sStbStatsCache.nEntry++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SMetaCache* pCache = pMeta->pCache;
|
||||||
|
int32_t iBucket = TABS(uid) % pCache->sStbStatsCache.nBucket;
|
||||||
|
SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
|
||||||
|
while (*ppEntry && (*ppEntry)->info.uid != uid) {
|
||||||
|
ppEntry = &(*ppEntry)->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMetaStbStatsEntry* pEntry = *ppEntry;
|
||||||
|
if (pEntry) {
|
||||||
|
*ppEntry = pEntry->next;
|
||||||
|
taosMemoryFree(pEntry);
|
||||||
|
pCache->sStbStatsCache.nEntry--;
|
||||||
|
if (pCache->sStbStatsCache.nEntry < pCache->sStbStatsCache.nBucket / 4 &&
|
||||||
|
pCache->sStbStatsCache.nBucket > META_CACHE_STATS_BUCKET) {
|
||||||
|
code = metaRehashStatsCache(pCache, 0);
|
||||||
|
if (code) goto _exit;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
code = TSDB_CODE_NOT_FOUND;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
SMetaCache* pCache = pMeta->pCache;
|
||||||
|
int32_t iBucket = TABS(uid) % pCache->sStbStatsCache.nBucket;
|
||||||
|
SMetaStbStatsEntry* pEntry = pCache->sStbStatsCache.aBucket[iBucket];
|
||||||
|
|
||||||
|
while (pEntry && pEntry->info.uid != uid) {
|
||||||
|
pEntry = pEntry->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pEntry) {
|
||||||
|
*pInfo = pEntry->info;
|
||||||
|
} else {
|
||||||
|
code = TSDB_CODE_NOT_FOUND;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
|
@ -1260,3 +1260,32 @@ _exit:
|
||||||
tdbFree(pData);
|
tdbFree(pData);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
metaRLock(pMeta);
|
||||||
|
|
||||||
|
// fast path: search cache
|
||||||
|
if (metaStatsCacheGet(pMeta, uid, pInfo) == TSDB_CODE_SUCCESS) {
|
||||||
|
metaULock(pMeta);
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// slow path: search TDB
|
||||||
|
int64_t ctbNum = 0;
|
||||||
|
vnodeGetCtbNum(pMeta->pVnode, uid, &ctbNum);
|
||||||
|
|
||||||
|
metaULock(pMeta);
|
||||||
|
|
||||||
|
pInfo->uid = uid;
|
||||||
|
pInfo->ctbNum = ctbNum;
|
||||||
|
|
||||||
|
// upsert the cache
|
||||||
|
metaWLock(pMeta);
|
||||||
|
metaStatsCacheUpsert(pMeta, pInfo);
|
||||||
|
metaULock(pMeta);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue