fix(query): update the mutex.
This commit is contained in:
parent
9872217462
commit
55159a7ceb
|
@ -54,6 +54,7 @@ struct SMetaCache {
|
||||||
|
|
||||||
// query cache
|
// query cache
|
||||||
struct STagFilterResCache {
|
struct STagFilterResCache {
|
||||||
|
TdThreadMutex lock;
|
||||||
SHashObj* pTableEntry;
|
SHashObj* pTableEntry;
|
||||||
SLRUCache* pUidResCache;
|
SLRUCache* pUidResCache;
|
||||||
uint64_t keyBuf[3];
|
uint64_t keyBuf[3];
|
||||||
|
@ -140,6 +141,8 @@ int32_t metaCacheOpen(SMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
|
taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
|
||||||
|
taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL);
|
||||||
|
|
||||||
pMeta->pCache = pCache;
|
pMeta->pCache = pCache;
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
@ -159,6 +162,8 @@ void metaCacheClose(SMeta* pMeta) {
|
||||||
|
|
||||||
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
|
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
|
||||||
taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
|
taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
|
||||||
|
taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
|
||||||
|
|
||||||
taosMemoryFree(pMeta->pCache);
|
taosMemoryFree(pMeta->pCache);
|
||||||
pMeta->pCache = NULL;
|
pMeta->pCache = NULL;
|
||||||
}
|
}
|
||||||
|
@ -426,6 +431,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
|
||||||
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
|
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
|
||||||
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
|
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
|
||||||
SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
|
SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
|
||||||
|
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
|
||||||
|
|
||||||
uint32_t times = 0;
|
uint32_t times = 0;
|
||||||
|
|
||||||
|
@ -433,12 +439,12 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
|
||||||
pBuf[0] = suid;
|
pBuf[0] = suid;
|
||||||
memcpy(&pBuf[1], pKey, keyLen);
|
memcpy(&pBuf[1], pKey, keyLen);
|
||||||
|
|
||||||
metaRLock(pMeta);
|
taosThreadMutexLock(pLock);
|
||||||
|
|
||||||
int32_t len = keyLen + sizeof(uint64_t);
|
int32_t len = keyLen + sizeof(uint64_t);
|
||||||
LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len);
|
LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len);
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
metaULock(pMeta);
|
taosThreadMutexUnlock(pLock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -457,11 +463,11 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
|
||||||
taosLRUCacheRelease(pCache, pHandle, false);
|
taosLRUCacheRelease(pCache, pHandle, false);
|
||||||
|
|
||||||
// unlock meta
|
// unlock meta
|
||||||
metaULock(pMeta);
|
taosThreadMutexUnlock(pLock);
|
||||||
|
|
||||||
// check if scanning all items are necessary or not
|
// check if scanning all items are necessary or not
|
||||||
if (times >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) {
|
if (times >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) {
|
||||||
metaWLock(pMeta);
|
taosThreadMutexLock(pLock);
|
||||||
|
|
||||||
SArray* pInvalidRes = taosArrayInit(64, POINTER_BYTES);
|
SArray* pInvalidRes = taosArrayInit(64, POINTER_BYTES);
|
||||||
|
|
||||||
|
@ -492,7 +498,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
|
||||||
atomic_store_32(&(*pEntry)->qTimes, 0); // reset the query times
|
atomic_store_32(&(*pEntry)->qTimes, 0); // reset the query times
|
||||||
taosArrayDestroy(pInvalidRes);
|
taosArrayDestroy(pInvalidRes);
|
||||||
|
|
||||||
metaULock(pMeta);
|
taosThreadMutexUnlock(pLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -526,8 +532,9 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
||||||
|
|
||||||
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
|
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
|
||||||
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
|
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
|
||||||
|
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
|
||||||
|
|
||||||
metaWLock(pMeta);
|
taosThreadMutexLock(pLock);
|
||||||
|
|
||||||
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
|
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
|
||||||
if (pEntry == NULL) {
|
if (pEntry == NULL) {
|
||||||
|
@ -550,7 +557,7 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
||||||
taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
|
taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
|
||||||
TAOS_LRU_PRIORITY_LOW);
|
TAOS_LRU_PRIORITY_LOW);
|
||||||
|
|
||||||
metaULock(pMeta);
|
taosThreadMutexUnlock(pLock);
|
||||||
|
|
||||||
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid,
|
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid,
|
||||||
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
|
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
|
||||||
|
@ -560,15 +567,19 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
||||||
|
|
||||||
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
|
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
|
||||||
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
|
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
|
||||||
STagFilterResEntry** pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t));
|
|
||||||
if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t keyLen = sizeof(uint64_t) * 3;
|
int32_t keyLen = sizeof(uint64_t) * 3;
|
||||||
uint64_t p[3] = {0};
|
uint64_t p[3] = {0};
|
||||||
p[0] = suid;
|
p[0] = suid;
|
||||||
|
|
||||||
|
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
|
||||||
|
|
||||||
|
taosThreadMutexLock(pLock);
|
||||||
|
STagFilterResEntry** pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t));
|
||||||
|
if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
|
||||||
|
taosThreadMutexUnlock(pLock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SListIter iter = {0};
|
SListIter iter = {0};
|
||||||
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
|
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
|
||||||
|
|
||||||
|
@ -581,5 +592,6 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
|
||||||
(*pEntry)->qTimes = 0;
|
(*pEntry)->qTimes = 0;
|
||||||
tdListEmpty(&(*pEntry)->list);
|
tdListEmpty(&(*pEntry)->list);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(pLock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue