enh: add table group cache

This commit is contained in:
dapan1121 2023-05-06 19:24:47 +08:00
parent 13852f5699
commit c83367e455
3 changed files with 220 additions and 10 deletions

View File

@ -59,6 +59,13 @@ struct SMetaCache {
SHashObj* pTableEntry;
SLRUCache* pUidResCache;
} sTagFilterResCache;
// Cache of table-group results. The lookup, insert and clear paths all take `lock`
// before touching the hash table or the LRU cache.
struct STbGroupResCache {
TdThreadMutex lock;      // serializes access to the fields below
uint32_t accTimes;       // number of lookups performed; reset to 0 when the cache is opened
SHashObj* pTableEntry;   // looked up by suid; each slot holds an STagFilterResEntry* (hit counter + key list)
SLRUCache* pResCache;    // LRU cache holding the cached results (SArray payloads)
} STbGroupResCache;
};
static void entryCacheClose(SMeta* pMeta) {
@ -144,6 +151,25 @@ int32_t metaCacheOpen(SMeta* pMeta) {
taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL);
pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
if (pCache->STbGroupResCache.pResCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err2;
}
pCache->STbGroupResCache.accTimes = 0;
pCache->STbGroupResCache.pTableEntry =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
if (pCache->STbGroupResCache.pTableEntry == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err2;
}
taosHashSetFreeFp(pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
taosThreadMutexInit(&pCache->STbGroupResCache.lock, NULL);
pMeta->pCache = pCache;
return code;
@ -165,6 +191,10 @@ void metaCacheClose(SMeta* pMeta) {
taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache);
taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock);
taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry);
taosMemoryFree(pMeta->pCache);
pMeta->pCache = NULL;
}
@ -520,7 +550,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
return TSDB_CODE_SUCCESS;
}
static void freePayload(const void* key, size_t keyLen, void* value) {
static void freeUidCachePayload(const void* key, size_t keyLen, void* value) {
if (value == NULL) {
return;
}
@ -626,7 +656,7 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
}
// add to cache.
taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freePayload, NULL,
taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL,
TAOS_LRU_PRIORITY_LOW);
_end:
taosThreadMutexUnlock(pLock);
@ -671,3 +701,180 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
metaDebug("vgId:%d suid:%"PRId64" cached related tag filter uid list cleared", vgId, suid);
return TSDB_CODE_SUCCESS;
}
/*
 * Look up a cached table-group result for the given super table.
 *
 * @param pMeta   meta handle owning the table-group cache
 * @param suid    uid of the super table
 * @param pKey    digest bytes identifying the group-by expression
 * @param keyLen  length of pKey in bytes
 * @param pList   out: set to the cached value on a hit, NULL otherwise
 *
 * @return TSDB_CODE_SUCCESS on a hit or on a plain miss (check *pList to tell
 *         them apart); TSDB_CODE_FAILED when the LRU cache holds the key but
 *         the per-suid entry is missing from the hash table.
 *
 * NOTE(review): the LRU handle is released before returning, so *pList points
 * at memory still owned by the cache -- confirm that callers copy it or that
 * eviction cannot race with their use.
 */
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
  int32_t vgId = TD_VID(pMeta->pVnode);

  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
  SHashObj*      pTableMap = pMeta->pCache->STbGroupResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;

  *pList = NULL;

  // generate the composed key for LRU cache
  uint64_t key[4] = {0};
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);

  taosThreadMutexLock(pLock);
  pMeta->pCache->STbGroupResCache.accTimes += 1;

  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
  if (pHandle == NULL) {
    // plain miss: nothing cached for this key
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  if (NULL == pEntry) {
    metaDebug("suid %" PRIu64 " not in tb group cache", suid);
    // Drop the handle and the lock before bailing out; returning with either
    // still held would pin the cache item and block every later cache access.
    taosLRUCacheRelease(pCache, pHandle, false);
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_FAILED;
  }

  *pList = taosLRUCacheValue(pCache, pHandle);

  // hit statistics, reported once every 5000 hits
  (*pEntry)->hitTimes += 1;

  uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes;
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
    metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc, ((double)(*pEntry)->hitTimes) / acc);
  }

  taosLRUCacheRelease(pCache, pHandle, false);

  // unlock meta
  taosThreadMutexUnlock(pLock);
  return TSDB_CODE_SUCCESS;
}
/*
 * Deleter registered with the LRU cache for table-group entries.
 *
 * The key is read as four uint64_t words:
 *   p[0]       pointer to the per-suid hash table
 *   p[1]       hash key used to find the STagFilterResEntry (the suid)
 *   p[2..3]    digest of the cached item
 *
 * The matching digest node is removed from the entry's key list, then the
 * cached SArray is destroyed.
 *
 * NOTE(review): on an invalid key length the function returns without
 * destroying `value` -- confirm whether that path can occur and whether the
 * payload should be released there as well.
 */
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value) {
  if (value == NULL) {
    return;
  }

  const uint64_t* p = key;
  if (keyLen != sizeof(int64_t) * 4) {
    // report the length that is actually required (four 64-bit words)
    metaError("tb group key length is invalid, length:%d, expect:%d", (int32_t)keyLen,
              (int32_t)(sizeof(uint64_t) * 4));
    return;
  }

  SHashObj* pHashObj = (SHashObj*)p[0];

  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
  if (pEntry != NULL && (*pEntry) != NULL) {
    int64_t st = taosGetTimestampUs();

    SListIter iter = {0};
    tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);

    SListNode* pNode = NULL;
    while ((pNode = tdListNext(&iter)) != NULL) {
      uint64_t* digest = (uint64_t*)pNode->data;
      if (digest[0] == p[2] && digest[1] == p[3]) {
        // unlink and free the list node that tracks this cache item
        void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
        taosMemoryFree(tmp);

        double el = (taosGetTimestampUs() - st) / 1000.0;
        metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
                  el);
        break;
      }
    }
  }

  taosArrayDestroy((SArray*)value);
}
/*
 * Insert a table-group result into the cache.
 *
 * @param pMeta       meta handle owning the table-group cache
 * @param suid        uid of the super table the result belongs to
 * @param pKey        digest bytes identifying the group-by expression; read as
 *                    two uint64_t words when compared with the list head
 * @param keyLen      length of pKey in bytes
 * @param pPayload    SArray to cache; it is destroyed here whenever it is not
 *                    handed to the LRU cache on a path that returns success
 * @param payloadLen  charge of the payload; payloads larger than
 *                    tsTagFilterResCacheSize are not cached
 *
 * @return TSDB_CODE_SUCCESS when the payload was cached or deliberately
 *         dropped, otherwise the error code returned by addNewEntry().
 */
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
                              int32_t payloadLen) {
  int32_t code = 0;
  int32_t vgId = TD_VID(pMeta->pVnode);

  if (payloadLen > tsTagFilterResCacheSize) {
    metaDebug("vgId:%d, suid:%" PRIu64
              " ignore to add to tb group cache, due to payload length %d greater than threshold %d",
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
    taosArrayDestroy((SArray*)pPayload);
    return TSDB_CODE_SUCCESS;
  }

  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
  SHashObj*      pTableEntry = pMeta->pCache->STbGroupResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;

  uint64_t key[4] = {0};
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);

  taosThreadMutexLock(pLock);

  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
  if (pEntry == NULL) {
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
    if (code != TSDB_CODE_SUCCESS) {
      // NOTE(review): pPayload is not released on this error path -- confirm
      // whether the caller is expected to free it when an error is returned.
      goto _end;
    }
  } else {  // check if it exists or not
    size_t size = listNEles(&(*pEntry)->list);
    if (size == 0) {
      tdListAppend(&(*pEntry)->list, pKey);
    } else {
      SListNode* pNode = listHead(&(*pEntry)->list);
      uint64_t*  p = (uint64_t*)pNode->data;
      if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
        // The item is already cached, so the new payload is not inserted.
        // The caller cannot tell this apart from a successful insert, so the
        // payload is released here, exactly as on the oversize path above.
        taosThreadMutexUnlock(pLock);
        taosArrayDestroy((SArray*)pPayload);
        return TSDB_CODE_SUCCESS;
      } else {  // not equal, append it
        tdListAppend(&(*pEntry)->list, pKey);
      }
    }
  }

  // add to cache.
  taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL,
                     TAOS_LRU_PRIORITY_LOW);
_end:
  taosThreadMutexUnlock(pLock);
  // emitted on the addNewEntry() failure path as well
  metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));

  return code;
}
/*
 * Drop every cached table-group result of one super table.
 *
 * Called when the cached results become stale: after a child table is created
 * or dropped, or after a tag value is updated.
 *
 * @param pMeta  meta handle owning the table-group cache
 * @param suid   uid of the super table whose cached results are erased
 * @return always TSDB_CODE_SUCCESS
 */
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
  uint64_t  p[4] = {0};
  int32_t   vgId = TD_VID(pMeta->pVnode);
  SHashObj* pEntryHashMap = pMeta->pCache->STbGroupResCache.pTableEntry;

  // Build the key with a zeroed digest; the digest part is overwritten per
  // cached item in the loop below.
  uint64_t dummy[2] = {0};
  initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);

  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
  taosThreadMutexLock(pLock);

  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
  if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
    // nothing cached for this super table
    taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  (*pEntry)->hitTimes = 0;

  SListIter iter = {0};
  tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);

  // erase the LRU item that belongs to each digest recorded for this suid
  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&iter)) != NULL) {
    setMD5DigestInKey(p, pNode->data, 2 * sizeof(uint64_t));
    taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN);
  }

  tdListEmpty(&(*pEntry)->list);
  taosThreadMutexUnlock(pLock);

  // suid is unsigned: print it with PRIu64, like the other tb group cache logs
  metaDebug("vgId:%d suid:%" PRIu64 " cached related tb group cleared", vgId, suid);
  return TSDB_CODE_SUCCESS;
}

View File

@ -767,6 +767,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMe
metaWLock(pMeta);
metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1);
metaUidCacheClear(pMeta, me.ctbEntry.suid);
metaTbGroupCacheClear(pMeta, me.ctbEntry.suid);
metaULock(pMeta);
} else {
me.ntbEntry.ctime = pReq->ctime;
@ -999,6 +1000,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1);
metaUidCacheClear(pMeta, e.ctbEntry.suid);
metaTbGroupCacheClear(pMeta, e.ctbEntry.suid);
} else if (e.type == TSDB_NORMAL_TABLE) {
// drop schema.db (todo)
@ -1010,6 +1012,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
metaStatsCacheDrop(pMeta, uid);
metaUidCacheClear(pMeta, uid);
metaTbGroupCacheClear(pMeta, uid);
--pMeta->pVnode->config.vndStats.numOfSTables;
}
@ -1430,6 +1433,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
((STag *)(ctbEntry.ctbEntry.pTags))->len, pMeta->txn);
metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid);
metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid);
metaULock(pMeta);

View File

@ -454,15 +454,14 @@ static void genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
}
int32_t getColInfoResultForGroupby(void* metaHandle, uint64_t suid, SNodeList* group, STableListInfo* pTableListInfo) {
int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* pBlockList = NULL;
SSDataBlock* pResBlock = NULL;
void* keyBuf = NULL;
SArray* groupData = NULL;
SArray* pUidTagList = NULL;
static T_MD5_CTX lastMd5 = {-1};
static SArray* lastTableList = NULL;
SArray* tableList = NULL;
static SHashObj *pTableListHash = NULL;
int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
@ -498,10 +497,10 @@ int32_t getColInfoResultForGroupby(void* metaHandle, uint64_t suid, SNodeList* g
SArray **pLastTableList = (SArray **)taosHashGet(pTableListHash, context.digest, sizeof(context.digest));
if (pLastTableList && *pLastTableList) {
pTableListInfo->pTableList = taosArrayDup(lastTableList, NULL);
pTableListInfo->pTableList = taosArrayDup(*pLastTableList, NULL);
goto end;
} else {
qError("group not hit, last:%p, lastSize:%d, newSize:%d", lastTableList, (int32_t)taosArrayGetSize(lastTableList), (int32_t)taosArrayGetSize(pTableListInfo->pTableList));
qError("group not hit");
}
pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
@ -637,8 +636,8 @@ int32_t getColInfoResultForGroupby(void* metaHandle, uint64_t suid, SNodeList* g
}
}
lastTableList = taosArrayDup(pTableListInfo->pTableList, NULL);
taosHashPut(pTableListHash, context.digest, sizeof(context.digest), &lastTableList, POINTER_BYTES);
tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
taosHashPut(pTableListHash, context.digest, sizeof(context.digest), &tableList, POINTER_BYTES);
// int64_t st2 = taosGetTimestampUs();
// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
@ -2046,7 +2045,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
pTableListInfo->numOfOuputGroups = 1;
}
} else {
code = getColInfoResultForGroupby(pHandle->meta, pScanNode->suid, group, pTableListInfo);
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}