refactor: do some internal refactor.
This commit is contained in:
parent
7dcad62775
commit
bb5fb42c2f
|
@ -32,7 +32,6 @@ typedef struct SMetaStbStatsEntry {
|
|||
} SMetaStbStatsEntry;
|
||||
|
||||
typedef struct STagFilterResEntry {
|
||||
// uint64_t suid; // uid for super table
|
||||
SList list; // the linked list of md5 digest, extracted from the serialized tag query condition
|
||||
uint32_t qTimes; // queried times for current super table
|
||||
} STagFilterResEntry;
|
||||
|
@ -126,7 +125,7 @@ int32_t metaCacheOpen(SMeta* pMeta) {
|
|||
goto _err2;
|
||||
}
|
||||
|
||||
pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(25 * 1024 * 1024, -1, 0.5);
|
||||
pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
|
||||
if (pCache->sTagFilterResCache.pUidResCache == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err2;
|
||||
|
@ -477,15 +476,17 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
|
|||
SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
|
||||
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
|
||||
|
||||
uint64_t buf[3];
|
||||
uint64_t buf[4];
|
||||
|
||||
*acquireRes = 0;
|
||||
buf[0] = suid;
|
||||
memcpy(&buf[1], pKey, keyLen);
|
||||
|
||||
buf[0] = (uint64_t) pTableMap;
|
||||
buf[1] = suid;
|
||||
memcpy(&buf[2], pKey, keyLen);
|
||||
|
||||
taosThreadMutexLock(pLock);
|
||||
|
||||
int32_t len = keyLen + sizeof(uint64_t);
|
||||
int32_t len = keyLen + sizeof(uint64_t) * 2;
|
||||
LRUHandle* pHandle = taosLRUCacheLookup(pCache, buf, len);
|
||||
if (pHandle == NULL) {
|
||||
taosThreadMutexUnlock(pLock);
|
||||
|
@ -508,18 +509,6 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
|
|||
|
||||
// unlock meta
|
||||
taosThreadMutexUnlock(pLock);
|
||||
|
||||
// check if scanning all items are necessary or not
|
||||
if (NEED_CHECK_CACHE_ITEM(listNEles(&(*pEntry)->list), (*pEntry)->qTimes)) {
|
||||
taosThreadMutexLock(pLock);
|
||||
|
||||
SArray* pInvalidRes = taosArrayInit(64, POINTER_BYTES);
|
||||
checkAllEntriesInCache(*pEntry, pInvalidRes, keyLen, pCache, suid);
|
||||
|
||||
removeInvalidCacheItem(pInvalidRes, *pEntry, true); // remove the keys, of which query uid lists have been replaced already.
|
||||
taosThreadMutexUnlock(pLock);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -527,6 +516,36 @@ static void freePayload(const void* key, size_t keyLen, void* value) {
|
|||
if (value == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
const uint64_t* p = key;
|
||||
if (keyLen != sizeof(int64_t) * 4) {
|
||||
metaError("key length is invalid, length:%d, expect:%d", (int32_t) keyLen, (int32_t) sizeof(uint64_t)*2);
|
||||
return;
|
||||
}
|
||||
|
||||
SHashObj* pHashObj = (SHashObj*)p[0];
|
||||
STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
|
||||
|
||||
{
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
SListIter iter = {0};
|
||||
tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);
|
||||
|
||||
SListNode* pNode = NULL;
|
||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
||||
uint64_t* digest = (uint64_t*)pNode->data;
|
||||
if (digest[0] == p[2] && digest[1] == p[3]) {
|
||||
tdListPopNode(&((*pEntry)->list), pNode);
|
||||
|
||||
int64_t et = taosGetTimestampUs();
|
||||
metaInfo("clear items in cache, remain cached item:%d, elapsed time:%.2fms, acc count:%d", listNEles(&((*pEntry)->list)),
|
||||
(et - st)/1000.0, (*pEntry)->qTimes);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(value);
|
||||
}
|
||||
|
||||
|
@ -566,12 +585,16 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
|||
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
|
||||
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
|
||||
|
||||
uint64_t buf[3] = {0};
|
||||
buf[0] = suid;
|
||||
memcpy(&buf[1], pKey, keyLen);
|
||||
ASSERT(sizeof(uint64_t) + keyLen == 24);
|
||||
int32_t code = 0;
|
||||
// the format of key:
|
||||
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
|
||||
|
||||
uint64_t buf[4] = {0};
|
||||
buf[0] = (uint64_t) pTableEntry;
|
||||
buf[1] = suid;
|
||||
memcpy(&buf[2], pKey, keyLen);
|
||||
ASSERT(keyLen == 16);
|
||||
|
||||
int32_t code = 0;
|
||||
taosThreadMutexLock(pLock);
|
||||
|
||||
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
|
||||
|
@ -587,7 +610,7 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
|||
size_t size = listNEles(&(*pEntry)->list);
|
||||
if (size == 0) {
|
||||
tdListAppend(&(*pEntry)->list, pKey);
|
||||
} else if (size == 1) {
|
||||
} else {
|
||||
SListNode* pNode = listHead(&(*pEntry)->list);
|
||||
uint64_t* p = (uint64_t*)pNode->data;
|
||||
if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
|
||||
|
@ -597,17 +620,17 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
|||
} else { // not equal, append it
|
||||
tdListAppend(&(*pEntry)->list, pKey);
|
||||
}
|
||||
} else { // more than one element
|
||||
/*} else { // more than one element
|
||||
bool checkCacheEntry = false;
|
||||
SArray* pInvalidRes = NULL;
|
||||
uint64_t keyBuf[3];
|
||||
|
||||
// if the threshold value is reached, need to check the value.
|
||||
if (NEED_CHECK_CACHE_ITEM(size, (*pEntry)->qTimes)) {
|
||||
checkCacheEntry = true;
|
||||
keyBuf[0] = suid;
|
||||
pInvalidRes = taosArrayInit(64, POINTER_BYTES);
|
||||
}
|
||||
// if (NEED_CHECK_CACHE_ITEM(size, (*pEntry)->qTimes)) {
|
||||
// checkCacheEntry = true;
|
||||
// keyBuf[0] = suid;
|
||||
// pInvalidRes = taosArrayInit(64, POINTER_BYTES);
|
||||
// }
|
||||
|
||||
SListIter iter = {0};
|
||||
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
|
||||
|
@ -648,11 +671,12 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
|||
}
|
||||
|
||||
tdListAppend(&(*pEntry)->list, pKey);
|
||||
}*/
|
||||
}
|
||||
}
|
||||
|
||||
// add to cache.
|
||||
taosLRUCacheInsert(pCache, buf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
|
||||
taosLRUCacheInsert(pCache, buf, sizeof(uint64_t)*2 + keyLen, pPayload, payloadLen, freePayload, NULL,
|
||||
TAOS_LRU_PRIORITY_LOW);
|
||||
_end:
|
||||
taosThreadMutexUnlock(pLock);
|
||||
|
@ -666,8 +690,10 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
|||
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
|
||||
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
|
||||
int32_t keyLen = sizeof(uint64_t) * 3;
|
||||
uint64_t p[3] = {0};
|
||||
p[0] = suid;
|
||||
uint64_t p[4] = {0};
|
||||
|
||||
p[0] = (uint64_t) pMeta->pCache->sTagFilterResCache.pTableEntry;
|
||||
p[1] = suid;
|
||||
|
||||
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
|
||||
|
||||
|
@ -683,7 +709,7 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
|
|||
|
||||
SListNode* pNode = NULL;
|
||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
||||
memcpy(&p[1], pNode->data, 16);
|
||||
memcpy(&p[2], pNode->data, 16);
|
||||
taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, keyLen);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue