fix(query)[TS-5487]. Fix slow cleanup of metacahe on exit
- Change the storage structure of the suid-related tagFilter list in the metacache to a hash set - During cleanup on exit, first clear the suid tagFilter list before cleaning the LRU cache to avoid excessive overhead. These modifications ensure that the metacache cleanup process is efficient and maintains good LRU invalidation performance.
This commit is contained in:
parent
c2ff609c5f
commit
2259dbebf7
|
@ -40,7 +40,7 @@ typedef struct SMetaStbStatsEntry {
|
||||||
} SMetaStbStatsEntry;
|
} SMetaStbStatsEntry;
|
||||||
|
|
||||||
typedef struct STagFilterResEntry {
|
typedef struct STagFilterResEntry {
|
||||||
SList list; // the linked list of md5 digest, extracted from the serialized tag query condition
|
SHashObj *set; // the set of md5 digest, extracted from the serialized tag query condition
|
||||||
uint32_t hitTimes; // queried times for current super table
|
uint32_t hitTimes; // queried times for current super table
|
||||||
} STagFilterResEntry;
|
} STagFilterResEntry;
|
||||||
|
|
||||||
|
@ -112,7 +112,7 @@ static void statsCacheClose(SMeta* pMeta) {
|
||||||
|
|
||||||
static void freeCacheEntryFp(void* param) {
|
static void freeCacheEntryFp(void* param) {
|
||||||
STagFilterResEntry** p = param;
|
STagFilterResEntry** p = param;
|
||||||
tdListEmpty(&(*p)->list);
|
taosHashCleanup((*p)->set);
|
||||||
taosMemoryFreeClear(*p);
|
taosMemoryFreeClear(*p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,10 +200,12 @@ void metaCacheClose(SMeta* pMeta) {
|
||||||
entryCacheClose(pMeta);
|
entryCacheClose(pMeta);
|
||||||
statsCacheClose(pMeta);
|
statsCacheClose(pMeta);
|
||||||
|
|
||||||
|
taosHashClear(pMeta->pCache->sTagFilterResCache.pTableEntry);
|
||||||
taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
|
taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
|
||||||
(void)taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
|
(void)taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
|
||||||
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
|
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
|
||||||
|
|
||||||
|
taosHashClear(pMeta->pCache->STbGroupResCache.pTableEntry);
|
||||||
taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache);
|
taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache);
|
||||||
(void)taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock);
|
(void)taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock);
|
||||||
taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry);
|
taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry);
|
||||||
|
@ -471,34 +473,6 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int checkAllEntriesInCache(const STagFilterResEntry* pEntry, SArray* pInvalidRes, int32_t keyLen,
|
|
||||||
SLRUCache* pCache, uint64_t suid) {
|
|
||||||
SListIter iter = {0};
|
|
||||||
tdListInitIter((SList*)&(pEntry->list), &iter, TD_LIST_FORWARD);
|
|
||||||
|
|
||||||
SListNode* pNode = NULL;
|
|
||||||
uint64_t buf[3];
|
|
||||||
buf[0] = suid;
|
|
||||||
|
|
||||||
int32_t len = sizeof(uint64_t) * tListLen(buf);
|
|
||||||
|
|
||||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
|
||||||
memcpy(&buf[1], pNode->data, keyLen);
|
|
||||||
|
|
||||||
// check whether it is existed in LRU cache, and remove it from linked list if not.
|
|
||||||
LRUHandle* pRes = taosLRUCacheLookup(pCache, buf, len);
|
|
||||||
if (pRes == NULL) { // remove the item in the linked list
|
|
||||||
if (taosArrayPush(pInvalidRes, &pNode) == NULL) {
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
bool ret = taosLRUCacheRelease(pCache, pRes, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
|
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
|
||||||
memcpy(&pBuf[2], key, keyLen);
|
memcpy(&pBuf[2], key, keyLen);
|
||||||
}
|
}
|
||||||
|
@ -584,22 +558,11 @@ static void freeUidCachePayload(const void* key, size_t keyLen, void* value, voi
|
||||||
|
|
||||||
if (pEntry != NULL && (*pEntry) != NULL) {
|
if (pEntry != NULL && (*pEntry) != NULL) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
|
||||||
SListIter iter = {0};
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", taosHashGetSize((*pEntry)->set),
|
||||||
SListNode* pNode = NULL;
|
el);
|
||||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
|
||||||
uint64_t* digest = (uint64_t*)pNode->data;
|
|
||||||
if (digest[0] == p[2] && digest[1] == p[3]) {
|
|
||||||
void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
|
|
||||||
taosMemoryFree(tmp);
|
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
|
||||||
metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
|
|
||||||
el);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -607,16 +570,30 @@ static void freeUidCachePayload(const void* key, size_t keyLen, void* value, voi
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
|
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
|
STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
|
||||||
if (p == NULL) {
|
TSDB_CHECK_NULL(p, code, lino, _end, terrno);
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
p->hitTimes = 0;
|
p->hitTimes = 0;
|
||||||
tdListInit(&p->list, keyLen);
|
p->set = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
TAOS_CHECK_RETURN(taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES));
|
TSDB_CHECK_NULL(p->set, code, lino, _end, terrno);
|
||||||
TAOS_CHECK_RETURN(tdListAppend(&p->list, pKey));
|
code = taosHashPut(p->set, pKey, keyLen, NULL, 0);
|
||||||
return 0;
|
TSDB_CHECK_CODE(code, lino, _end);
|
||||||
|
code = taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
metaError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
if (p != NULL) {
|
||||||
|
if (p->set != NULL) {
|
||||||
|
taosHashCleanup(p->set);
|
||||||
|
}
|
||||||
|
taosMemoryFree(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check both the payload size and selectivity ratio
|
// check both the payload size and selectivity ratio
|
||||||
|
@ -657,25 +634,14 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
} else { // check if it exists or not
|
} else { // check if it exists or not
|
||||||
size_t size = listNEles(&(*pEntry)->list);
|
code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
|
||||||
if (size == 0) {
|
if (code == TSDB_CODE_DUP_KEY) {
|
||||||
code = tdListAppend(&(*pEntry)->list, pKey);
|
// we have already found the existed items, no need to added to cache anymore.
|
||||||
if (code) {
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
goto _end;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
} else {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SListNode* pNode = listHead(&(*pEntry)->list);
|
goto _end;
|
||||||
uint64_t* p = (uint64_t*)pNode->data;
|
|
||||||
if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
|
|
||||||
// we have already found the existed items, no need to added to cache anymore.
|
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
} else { // not equal, append it
|
|
||||||
code = tdListAppend(&(*pEntry)->list, pKey);
|
|
||||||
if (code) {
|
|
||||||
goto _end;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -703,23 +669,20 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
|
||||||
(void)taosThreadMutexLock(pLock);
|
(void)taosThreadMutexLock(pLock);
|
||||||
|
|
||||||
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
|
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
|
||||||
if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
|
if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pEntry)->hitTimes = 0;
|
(*pEntry)->hitTimes = 0;
|
||||||
|
|
||||||
SListIter iter = {0};
|
char *iter = taosHashIterate((*pEntry)->set, NULL);
|
||||||
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
|
while (iter != NULL) {
|
||||||
|
setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
|
||||||
SListNode* pNode = NULL;
|
|
||||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
|
||||||
setMD5DigestInKey(p, pNode->data, 2 * sizeof(uint64_t));
|
|
||||||
taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, TAG_FILTER_RES_KEY_LEN);
|
taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, TAG_FILTER_RES_KEY_LEN);
|
||||||
|
iter = taosHashIterate((*pEntry)->set, iter);
|
||||||
}
|
}
|
||||||
|
taosHashClear((*pEntry)->set);
|
||||||
tdListEmpty(&(*pEntry)->list);
|
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
|
|
||||||
metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
|
metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
|
||||||
|
@ -789,22 +752,11 @@ static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value,
|
||||||
|
|
||||||
if (pEntry != NULL && (*pEntry) != NULL) {
|
if (pEntry != NULL && (*pEntry) != NULL) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
|
||||||
SListIter iter = {0};
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
|
||||||
SListNode* pNode = NULL;
|
taosHashGetSize((*pEntry)->set), el);
|
||||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
|
||||||
uint64_t* digest = (uint64_t*)pNode->data;
|
|
||||||
if (digest[0] == p[2] && digest[1] == p[3]) {
|
|
||||||
void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
|
|
||||||
taosMemoryFree(tmp);
|
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
|
||||||
metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
|
|
||||||
listNEles(&((*pEntry)->list)), el);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -840,25 +792,14 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
} else { // check if it exists or not
|
} else { // check if it exists or not
|
||||||
size_t size = listNEles(&(*pEntry)->list);
|
code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
|
||||||
if (size == 0) {
|
if (code == TSDB_CODE_DUP_KEY) {
|
||||||
code = tdListAppend(&(*pEntry)->list, pKey);
|
// we have already found the existed items, no need to added to cache anymore.
|
||||||
if (code) {
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
goto _end;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
} else {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SListNode* pNode = listHead(&(*pEntry)->list);
|
goto _end;
|
||||||
uint64_t* p = (uint64_t*)pNode->data;
|
|
||||||
if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
|
|
||||||
// we have already found the existed items, no need to added to cache anymore.
|
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
} else { // not equal, append it
|
|
||||||
code = tdListAppend(&(*pEntry)->list, pKey);
|
|
||||||
if (code) {
|
|
||||||
goto _end;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -886,23 +827,20 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
|
||||||
(void)taosThreadMutexLock(pLock);
|
(void)taosThreadMutexLock(pLock);
|
||||||
|
|
||||||
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
|
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
|
||||||
if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
|
if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pEntry)->hitTimes = 0;
|
(*pEntry)->hitTimes = 0;
|
||||||
|
|
||||||
SListIter iter = {0};
|
char *iter = taosHashIterate((*pEntry)->set, NULL);
|
||||||
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
|
while (iter != NULL) {
|
||||||
|
setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
|
||||||
SListNode* pNode = NULL;
|
|
||||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
|
||||||
setMD5DigestInKey(p, pNode->data, 2 * sizeof(uint64_t));
|
|
||||||
taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN);
|
taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN);
|
||||||
|
iter = taosHashIterate((*pEntry)->set, iter);
|
||||||
}
|
}
|
||||||
|
taosHashClear((*pEntry)->set);
|
||||||
tdListEmpty(&(*pEntry)->list);
|
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
|
|
||||||
metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
|
metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
|
||||||
|
|
Loading…
Reference in New Issue