diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c index 795f281ab2..b37e9272c5 100644 --- a/source/dnode/vnode/src/meta/metaCache.c +++ b/source/dnode/vnode/src/meta/metaCache.c @@ -59,6 +59,13 @@ struct SMetaCache { SHashObj* pTableEntry; SLRUCache* pUidResCache; } sTagFilterResCache; + + struct STbGroupResCache { + TdThreadMutex lock; + uint32_t accTimes; + SHashObj* pTableEntry; + SLRUCache* pResCache; + } STbGroupResCache; }; static void entryCacheClose(SMeta* pMeta) { @@ -144,6 +151,25 @@ int32_t metaCacheOpen(SMeta* pMeta) { taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp); taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL); + + pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5); + if (pCache->STbGroupResCache.pResCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err2; + } + + pCache->STbGroupResCache.accTimes = 0; + pCache->STbGroupResCache.pTableEntry = + taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK); + if (pCache->STbGroupResCache.pTableEntry == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err2; + } + + taosHashSetFreeFp(pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp); + taosThreadMutexInit(&pCache->STbGroupResCache.lock, NULL); + + pMeta->pCache = pCache; return code; @@ -165,6 +191,10 @@ void metaCacheClose(SMeta* pMeta) { taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock); taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry); + taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache); + taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock); + taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry); + taosMemoryFree(pMeta->pCache); pMeta->pCache = NULL; } @@ -520,7 +550,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK return TSDB_CODE_SUCCESS; } -static void freePayload(const void* key, size_t keyLen, void* value) { +static void freeUidCachePayload(const void* key, size_t keyLen, void* value) { if (value == NULL) { return; } @@ -626,7 +656,7 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int } // add to cache. - taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freePayload, NULL, + taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL, TAOS_LRU_PRIORITY_LOW); _end: taosThreadMutexUnlock(pLock); @@ -671,3 +701,180 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) { metaDebug("vgId:%d suid:%"PRId64" cached related tag filter uid list cleared", vgId, suid); return TSDB_CODE_SUCCESS; } + +int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) { + int32_t vgId = TD_VID(pMeta->pVnode); + + // generate the composed key for LRU cache + SLRUCache* pCache = pMeta->pCache->STbGroupResCache.pResCache; + SHashObj* pTableMap = pMeta->pCache->STbGroupResCache.pTableEntry; + TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock; + + *pList = NULL; + uint64_t key[4]; + initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen); + + taosThreadMutexLock(pLock); + pMeta->pCache->STbGroupResCache.accTimes += 1; + + LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN); + if (pHandle == NULL) { + taosThreadMutexUnlock(pLock); + return TSDB_CODE_SUCCESS; + } + + STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t)); + if (NULL == pEntry) { + metaDebug("suid %" PRIu64 " not in tb group cache", suid); + return TSDB_CODE_FAILED; + } + + *pList = taosLRUCacheValue(pCache, pHandle); + + (*pEntry)->hitTimes += 1; + + uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes; + if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) { + metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc, ((double)(*pEntry)->hitTimes) / acc); + } + + taosLRUCacheRelease(pCache, pHandle, false); + + // unlock meta + taosThreadMutexUnlock(pLock); + return TSDB_CODE_SUCCESS; +} + + +static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value) { + if (value == NULL) { + return; + } + + const uint64_t* p = key; + if (keyLen != sizeof(int64_t) * 4) { + metaError("tb group key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2); + return; + } + + SHashObj* pHashObj = (SHashObj*)p[0]; + + STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t)); + + if (pEntry != NULL && (*pEntry) != NULL) { + int64_t st = taosGetTimestampUs(); + + SListIter iter = {0}; + tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD); + + SListNode* pNode = NULL; + while ((pNode = tdListNext(&iter)) != NULL) { + uint64_t* digest = (uint64_t*)pNode->data; + if (digest[0] == p[2] && digest[1] == p[3]) { + void* tmp = tdListPopNode(&((*pEntry)->list), pNode); + taosMemoryFree(tmp); + + double el = (taosGetTimestampUs() - st) / 1000.0; + metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)), + el); + break; + } + } + } + + taosArrayDestroy((SArray*)value); +} + + +int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, + int32_t payloadLen) { + int32_t code = 0; + int32_t vgId = TD_VID(pMeta->pVnode); + + if (payloadLen > tsTagFilterResCacheSize) { + metaDebug("vgId:%d, suid:%" PRIu64 + " ignore to add to tb group cache, due to payload length %d greater than threshold %d", + vgId, suid, payloadLen, tsTagFilterResCacheSize); + taosArrayDestroy((SArray*)pPayload); + return TSDB_CODE_SUCCESS; + } + + SLRUCache* pCache = pMeta->pCache->STbGroupResCache.pResCache; + SHashObj* pTableEntry = pMeta->pCache->STbGroupResCache.pTableEntry; + TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock; + + uint64_t key[4] = {0}; + initCacheKey(key, pTableEntry, suid, pKey, keyLen); + + taosThreadMutexLock(pLock); + STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t)); + if (pEntry == NULL) { + code = addNewEntry(pTableEntry, pKey, keyLen, suid); + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } + } else { // check if it exists or not + size_t size = listNEles(&(*pEntry)->list); + if (size == 0) { + tdListAppend(&(*pEntry)->list, pKey); + } else { + SListNode* pNode = listHead(&(*pEntry)->list); + uint64_t* p = (uint64_t*)pNode->data; + if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) { + // we have already found the existed items, no need to added to cache anymore. + taosThreadMutexUnlock(pLock); + return TSDB_CODE_SUCCESS; + } else { // not equal, append it + tdListAppend(&(*pEntry)->list, pKey); + } + } + } + + // add to cache. + taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL, + TAOS_LRU_PRIORITY_LOW); +_end: + taosThreadMutexUnlock(pLock); + metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid, + (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry)); + + return code; +} + +// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables +int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) { + uint64_t p[4] = {0}; + int32_t vgId = TD_VID(pMeta->pVnode); + SHashObj* pEntryHashMap = pMeta->pCache->STbGroupResCache.pTableEntry; + + uint64_t dummy[2] = {0}; + initCacheKey(p, pEntryHashMap, suid, (char*) &dummy[0], 16); + + TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock; + taosThreadMutexLock(pLock); + + STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t)); + if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) { + taosThreadMutexUnlock(pLock); + return TSDB_CODE_SUCCESS; + } + + (*pEntry)->hitTimes = 0; + + SListIter iter = {0}; + tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD); + + SListNode* pNode = NULL; + while ((pNode = tdListNext(&iter)) != NULL) { + setMD5DigestInKey(p, pNode->data, 2 * sizeof(uint64_t)); + taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN); + } + + tdListEmpty(&(*pEntry)->list); + taosThreadMutexUnlock(pLock); + + metaDebug("vgId:%d suid:%"PRId64" cached related tb group cleared", vgId, suid); + return TSDB_CODE_SUCCESS; +} + + diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 3325f4055c..b2d553055d 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -767,6 +767,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMe metaWLock(pMeta); metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1); metaUidCacheClear(pMeta, me.ctbEntry.suid); + metaTbGroupCacheClear(pMeta, me.ctbEntry.suid); metaULock(pMeta); } else { me.ntbEntry.ctime = pReq->ctime; @@ -999,6 +1000,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1); metaUidCacheClear(pMeta, e.ctbEntry.suid); + metaTbGroupCacheClear(pMeta, e.ctbEntry.suid); } else if (e.type == TSDB_NORMAL_TABLE) { // drop schema.db (todo) @@ -1010,6 +1012,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { metaStatsCacheDrop(pMeta, uid); metaUidCacheClear(pMeta, uid); + metaTbGroupCacheClear(pMeta, uid); --pMeta->pVnode->config.vndStats.numOfSTables; } @@ -1430,6 +1433,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ((STag *)(ctbEntry.ctbEntry.pTags))->len, pMeta->txn); metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid); + metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid); metaULock(pMeta); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c7baef8d08..080ed37ee9 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -454,15 +454,14 @@ static void genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) { } -int32_t getColInfoResultForGroupby(void* metaHandle, uint64_t suid, SNodeList* group, STableListInfo* pTableListInfo) { +int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo) { int32_t code = TSDB_CODE_SUCCESS; SArray* pBlockList = NULL; SSDataBlock* pResBlock = NULL; void* keyBuf = NULL; SArray* groupData = NULL; SArray* pUidTagList = NULL; - static T_MD5_CTX lastMd5 = {-1}; - static SArray* lastTableList = NULL; + SArray* tableList = NULL; static SHashObj *pTableListHash = NULL; int32_t rows = taosArrayGetSize(pTableListInfo->pTableList); @@ -498,10 +497,10 @@ int32_t getColInfoResultForGroupby(void* metaHandle, uint64_t suid, SNodeList* g SArray **pLastTableList = (SArray **)taosHashGet(pTableListHash, context.digest, sizeof(context.digest)); if (pLastTableList && *pLastTableList) { - pTableListInfo->pTableList = taosArrayDup(lastTableList, NULL); + pTableListInfo->pTableList = taosArrayDup(*pLastTableList, NULL); goto end; } else { - qError("group not hit, last:%p, lastSize:%d, newSize:%d", lastTableList, (int32_t)taosArrayGetSize(lastTableList), (int32_t)taosArrayGetSize(pTableListInfo->pTableList)); + qError("group not hit"); } pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo)); @@ -637,8 +636,8 @@ int32_t getColInfoResultForGroupby(void* metaHandle, uint64_t suid, SNodeList* g } } - lastTableList = taosArrayDup(pTableListInfo->pTableList, NULL); - taosHashPut(pTableListHash, context.digest, sizeof(context.digest), &lastTableList, POINTER_BYTES); + tableList = taosArrayDup(pTableListInfo->pTableList, NULL); + taosHashPut(pTableListHash, context.digest, sizeof(context.digest), &tableList, POINTER_BYTES); // int64_t st2 = taosGetTimestampUs(); // qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1); @@ -2046,7 +2045,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pTableListInfo->numOfOuputGroups = 1; } } else { - code = getColInfoResultForGroupby(pHandle->meta, pScanNode->suid, group, pTableListInfo); + code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; }