Merge pull request #20472 from taosdata/fix/liaohj
fix(query): set the correct cached tags list key length, and update s…
This commit is contained in:
commit
36cec7894b
|
@ -77,7 +77,7 @@ void mndCleanupConsumer(SMnode *pMnode) {}
|
|||
|
||||
bool mndRebTryStart() {
|
||||
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
|
||||
mInfo("tq timer, rebalance counter old val:%d", old);
|
||||
mDebug("tq timer, rebalance counter old val:%d", old);
|
||||
return old == 0;
|
||||
}
|
||||
|
||||
|
@ -253,11 +253,11 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
|||
SMqConsumerObj *pConsumer;
|
||||
void *pIter = NULL;
|
||||
|
||||
mTrace("start to process mq timer");
|
||||
mDebug("start to process mq timer");
|
||||
|
||||
// rebalance cannot be parallel
|
||||
if (!mndRebTryStart()) {
|
||||
mInfo("mq rebalance already in progress, do nothing");
|
||||
mDebug("mq rebalance already in progress, do nothing");
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -356,7 +356,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
|||
} else {
|
||||
taosHashCleanup(pRebMsg->rebSubHash);
|
||||
rpcFreeCont(pRebMsg);
|
||||
mInfo("mq rebalance finished, no modification");
|
||||
mDebug("mq rebalance finished, no modification");
|
||||
mndRebEnd();
|
||||
}
|
||||
return 0;
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
#include "meta.h"
|
||||
|
||||
#define TAG_FILTER_RES_KEY_LEN 32
|
||||
#define META_CACHE_BASE_BUCKET 1024
|
||||
#define META_CACHE_STATS_BUCKET 16
|
||||
|
||||
|
@ -34,7 +35,6 @@ typedef struct SMetaStbStatsEntry {
|
|||
typedef struct STagFilterResEntry {
|
||||
SList list; // the linked list of md5 digest, extracted from the serialized tag query condition
|
||||
uint32_t hitTimes; // queried times for current super table
|
||||
uint32_t accTime;
|
||||
} STagFilterResEntry;
|
||||
|
||||
struct SMetaCache {
|
||||
|
@ -455,26 +455,37 @@ static int checkAllEntriesInCache(const STagFilterResEntry* pEntry, SArray* pInv
|
|||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
|
||||
// ASSERT(keyLen == sizeof(int64_t) * 2);
|
||||
memcpy(&pBuf[2], key, keyLen);
|
||||
}
|
||||
|
||||
// the format of key:
|
||||
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
|
||||
static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, const char* key, int32_t keyLen) {
|
||||
buf[0] = (uint64_t) pHashMap;
|
||||
buf[1] = suid;
|
||||
setMD5DigestInKey(buf, key, keyLen);
|
||||
ASSERT(keyLen == sizeof(uint64_t) * 2);
|
||||
}
|
||||
|
||||
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
|
||||
bool* acquireRes) {
|
||||
int32_t vgId = TD_VID(pMeta->pVnode);
|
||||
|
||||
// generate the composed key for LRU cache
|
||||
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
|
||||
SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
|
||||
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
|
||||
|
||||
uint64_t buf[4];
|
||||
|
||||
*acquireRes = 0;
|
||||
|
||||
buf[0] = (uint64_t)pTableMap;
|
||||
buf[1] = suid;
|
||||
memcpy(&buf[2], pKey, keyLen);
|
||||
uint64_t key[4];
|
||||
initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);
|
||||
|
||||
taosThreadMutexLock(pLock);
|
||||
pMeta->pCache->sTagFilterResCache.accTimes += 1;
|
||||
|
||||
int32_t len = keyLen + sizeof(uint64_t) * 2;
|
||||
LRUHandle* pHandle = taosLRUCacheLookup(pCache, buf, len);
|
||||
LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
|
||||
if (pHandle == NULL) {
|
||||
taosThreadMutexUnlock(pLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -499,7 +510,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
|
|||
|
||||
uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
|
||||
if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
|
||||
metaInfo("cache hit:%d, total acc:%d, rate:%.2f", (*pEntry)->hitTimes, acc, ((double)(*pEntry)->hitTimes) / acc);
|
||||
metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc, ((double)(*pEntry)->hitTimes) / acc);
|
||||
}
|
||||
|
||||
taosLRUCacheRelease(pCache, pHandle, false);
|
||||
|
@ -563,10 +574,13 @@ static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyL
|
|||
// check both the payload size and selectivity ratio
|
||||
int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
|
||||
int32_t payloadLen, double selectivityRatio) {
|
||||
int32_t code = 0;
|
||||
int32_t vgId = TD_VID(pMeta->pVnode);
|
||||
|
||||
if (selectivityRatio > tsSelectivityRatio) {
|
||||
metaDebug("vgId:%d, suid:%" PRIu64
|
||||
" failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
|
||||
TD_VID(pMeta->pVnode), suid, selectivityRatio, tsSelectivityRatio);
|
||||
vgId, suid, selectivityRatio, tsSelectivityRatio);
|
||||
taosMemoryFree(pPayload);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -574,7 +588,7 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
|||
if (payloadLen > tsTagFilterResCacheSize) {
|
||||
metaDebug("vgId:%d, suid:%" PRIu64
|
||||
" failed to add to uid list cache, due to payload length %d greater than threshold %d",
|
||||
TD_VID(pMeta->pVnode), suid, payloadLen, tsTagFilterResCacheSize);
|
||||
vgId, suid, payloadLen, tsTagFilterResCacheSize);
|
||||
taosMemoryFree(pPayload);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -583,26 +597,17 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
|||
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
|
||||
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
|
||||
|
||||
// the format of key:
|
||||
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
|
||||
uint64_t key[4] = {0};
|
||||
initCacheKey(key, pTableEntry, suid, pKey, keyLen);
|
||||
|
||||
uint64_t buf[4] = {0};
|
||||
buf[0] = (uint64_t)pTableEntry;
|
||||
buf[1] = suid;
|
||||
memcpy(&buf[2], pKey, keyLen);
|
||||
ASSERT(keyLen == 16);
|
||||
|
||||
int32_t code = 0;
|
||||
taosThreadMutexLock(pLock);
|
||||
|
||||
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
|
||||
if (pEntry == NULL) {
|
||||
code = addNewEntry(pTableEntry, pKey, keyLen, suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
} else {
|
||||
// check if it exists or not
|
||||
} else { // check if it exists or not
|
||||
size_t size = listNEles(&(*pEntry)->list);
|
||||
if (size == 0) {
|
||||
tdListAppend(&(*pEntry)->list, pKey);
|
||||
|
@ -620,12 +625,11 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
|||
}
|
||||
|
||||
// add to cache.
|
||||
taosLRUCacheInsert(pCache, buf, sizeof(uint64_t) * 2 + keyLen, pPayload, payloadLen, freePayload, NULL,
|
||||
taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freePayload, NULL,
|
||||
TAOS_LRU_PRIORITY_LOW);
|
||||
_end:
|
||||
taosThreadMutexUnlock(pLock);
|
||||
|
||||
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid,
|
||||
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
|
||||
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
|
||||
|
||||
return code;
|
||||
|
@ -633,33 +637,36 @@ _end:
|
|||
|
||||
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
|
||||
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
|
||||
int32_t keyLen = sizeof(uint64_t) * 3;
|
||||
uint64_t p[4] = {0};
|
||||
int32_t vgId = TD_VID(pMeta->pVnode);
|
||||
SHashObj* pEntryHashMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
|
||||
|
||||
p[0] = (uint64_t)pMeta->pCache->sTagFilterResCache.pTableEntry;
|
||||
p[1] = suid;
|
||||
uint64_t dummy[2] = {0};
|
||||
initCacheKey(p, pEntryHashMap, suid, (char*) &dummy[0], 16);
|
||||
|
||||
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
|
||||
|
||||
taosThreadMutexLock(pLock);
|
||||
STagFilterResEntry** pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t));
|
||||
|
||||
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
|
||||
if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
|
||||
taosThreadMutexUnlock(pLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
(*pEntry)->hitTimes = 0;
|
||||
|
||||
SListIter iter = {0};
|
||||
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
|
||||
|
||||
SListNode* pNode = NULL;
|
||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
||||
memcpy(&p[2], pNode->data, 16);
|
||||
taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, keyLen);
|
||||
setMD5DigestInKey(p, pNode->data, 2 * sizeof(uint64_t));
|
||||
taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, TAG_FILTER_RES_KEY_LEN);
|
||||
}
|
||||
|
||||
(*pEntry)->hitTimes = 0;
|
||||
tdListEmpty(&(*pEntry)->list);
|
||||
|
||||
taosThreadMutexUnlock(pLock);
|
||||
|
||||
metaDebug("vgId:%d suid:%"PRId64" cached related tag filter uid list cleared", vgId, suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -296,7 +296,7 @@ int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
|
|||
// todo set the correct vgId
|
||||
tqDebug("tmq poll: wal seek to version:%"PRId64" %s", ver, id);
|
||||
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
|
||||
tqError("tmq poll: wal reader failed to seek to ver:%"PRId64" code:%s, %s", ver, tstrerror(terrno), id);
|
||||
tqDebug("tmq poll: wal reader failed to seek to ver:%"PRId64" code:%s, %s", ver, tstrerror(terrno), id);
|
||||
return -1;
|
||||
} else {
|
||||
tqDebug("tmq poll: wal reader seek to ver:%"PRId64" %s", ver, id);
|
||||
|
|
Loading…
Reference in New Issue