From 7d00b7a6c4f19b6a21562b95dfbca67e132be622 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 16 Nov 2022 14:23:12 +0800 Subject: [PATCH] refactor: add tag filter cache. --- include/common/tglobal.h | 2 + include/util/tlist.h | 2 +- source/common/src/tglobal.c | 3 + source/dnode/vnode/inc/vnode.h | 3 + source/dnode/vnode/src/meta/metaCache.c | 99 +++++++++++++++++++------ source/libs/executor/src/executil.c | 50 ++++++++++--- source/util/src/tlist.c | 2 +- 7 files changed, 127 insertions(+), 34 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 681d1beb79..48886584d2 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -44,6 +44,8 @@ extern int32_t tsCompatibleModel; extern bool tsPrintAuth; extern int64_t tsTickPerMin[3]; extern int32_t tsCountAlwaysReturnValue; +extern float tsSelectivityRatio; +extern int32_t tsTagFilterResCacheSize; // queue & threads extern int32_t tsNumOfRpcThreads; diff --git a/include/util/tlist.h b/include/util/tlist.h index 1954bda145..3dbdb72f9e 100644 --- a/include/util/tlist.h +++ b/include/util/tlist.h @@ -225,7 +225,7 @@ void *tdListFree(SList *list); void tdListPrependNode(SList *list, SListNode *node); void tdListAppendNode(SList *list, SListNode *node); int32_t tdListPrepend(SList *list, void *data); -int32_t tdListAppend(SList *list, void *data); +int32_t tdListAppend(SList *list, const void *data); SListNode *tdListPopHead(SList *list); SListNode *tdListPopTail(SList *list); SListNode *tdListGetHead(SList *list); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 50b2c976fd..41be026a4c 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -119,6 +119,9 @@ int32_t tsMinIntervalTime = 1; // maximum memory allowed to be allocated for a single csv load (in MB) int32_t tsMaxMemUsedByInsert = 1024; +float tsSelectivityRatio = 1.0; +int32_t tsTagFilterResCacheSize = 4096; + // the maximum allowed query buffer size during query processing for each data node. // -1 no limit (default) // 0 no query allowed, queries are disabled diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 0b58959822..d09724ffc4 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -108,6 +108,9 @@ int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName); int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid); int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType); bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid); +int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList, bool* acquired); +int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, + int32_t payloadLen, double selectivityRatio); typedef struct SMetaFltParam { tb_uid_t suid; diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c index 98bd9626e7..e448c6e7c6 100644 --- a/source/dnode/vnode/src/meta/metaCache.c +++ b/source/dnode/vnode/src/meta/metaCache.c @@ -33,7 +33,7 @@ typedef struct SMetaStbStatsEntry { typedef struct STagFilterResEntry { uint64_t suid; // uid for super table - SList* pList; // the linked list of md5 digest, extracted from the serialized tag query condition + SList list; // the linked list of md5 digest, extracted from the serialized tag query condition uint32_t qTimes;// queried times for current super table } STagFilterResEntry; @@ -56,6 +56,7 @@ struct SMetaCache { struct STagFilterResCache { SHashObj* pTableEntry; SLRUCache* pUidResCache; + uint64_t keyBuf[3]; } sTagFilterResCache; }; @@ -119,9 +120,19 @@ int32_t metaCacheOpen(SMeta* pMeta) { goto _err2; } - pMeta->pCache = pCache; + pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5*1024*1024, -1, 0.5); + if (pCache->sTagFilterResCache.pUidResCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err2; + } -_exit: + pCache->sTagFilterResCache.pTableEntry = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK); + if (pCache->sTagFilterResCache.pTableEntry == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err2; + } + + pMeta->pCache = pCache; return code; _err2: @@ -129,7 +140,6 @@ _err2: _err: taosMemoryFree(pCache); - metaError("vgId:%d, meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); return code; } @@ -138,6 +148,11 @@ void metaCacheClose(SMeta* pMeta) { if (pMeta->pCache) { entryCacheClose(pMeta); statsCacheClose(pMeta); + + taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry); + taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache); + + taosMemoryFree(pMeta->pCache->sTagFilterResCache.keyBuf); taosMemoryFree(pMeta->pCache); pMeta->pCache = NULL; } @@ -399,38 +414,48 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) { return code; } -int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, LRUHandle** pHandle) { +int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) { + uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf; + // generate the composed key for LRU cache - char* p = taosMemoryMalloc(keyLen + sizeof(uint64_t)); - *(uint64_t*) p = suid; - memcpy(p + sizeof(suid), pKey, keyLen); + SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache; + + pBuf[0] = suid; + memcpy(&pBuf[1], pKey, keyLen); int32_t len = keyLen + sizeof(uint64_t); - *pHandle = taosLRUCacheLookup(pMeta->pCache->sTagFilterResCache.pUidResCache, p, len); - if (*pHandle == NULL) { - taosMemoryFree(p); + LRUHandle *pHandle = taosLRUCacheLookup(pCache, pBuf, len); + if (pHandle == NULL) { + *acquireRes = 0; return TSDB_CODE_SUCCESS; } else { // do some book mark work after acquiring the filter result from cache STagFilterResEntry* pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t)); ASSERT(pEntry != NULL); + *acquireRes = 1; + + const char* p = taosLRUCacheValue(pMeta->pCache->sTagFilterResCache.pUidResCache, pHandle); + int32_t size = *(int32_t*) p; + taosArrayAddBatch(pList1, p + sizeof(int32_t), size); pEntry->qTimes += 1; // check if scanning all items are necessary or not - if (pEntry->qTimes > 5000 && TD_DLIST_NELES(pEntry->pList) > 10) { + if (pEntry->qTimes >= 5000 && TD_DLIST_NELES(&pEntry->list) > 10) { SArray* pList = taosArrayInit(64, POINTER_BYTES); SListIter iter = {0}; - tdListInitIter(pEntry->pList, &iter, TD_LIST_FORWARD); + tdListInitIter(&pEntry->list, &iter, TD_LIST_FORWARD); SListNode* pNode = NULL; while ((pNode = tdListNext(&iter)) != NULL) { - memcpy(p + sizeof(suid), pNode->data, keyLen); + memcpy(pBuf + sizeof(suid), pNode->data, keyLen); // check whether it is existed in LRU cache, and remove it from linked list if not. - void* pRes = taosLRUCacheLookup(pMeta->pCache->sTagFilterResCache.pUidResCache, p, len); + LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len); if (pRes == NULL) { // remove the item in the linked list taosArrayPush(pList, &pNode); + } else { + taosLRUCacheRelease(pCache, pRes, false); } } @@ -438,19 +463,49 @@ int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int size_t s = taosArrayGetSize(pList); for(int32_t i = 0; i < s; ++i) { SListNode** p1 = taosArrayGet(pList, i); - tdListPopNode(pEntry->pList, *p1); + tdListPopNode(&pEntry->list, *p1); } - } - taosMemoryFree(p); + pEntry->qTimes = 0; // reset the query times + } } return TSDB_CODE_SUCCESS; } // check both the payload size and selectivity ratio -int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload) { +int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen, double selectivityRatio) { + if (selectivityRatio > tsSelectivityRatio) { + return TSDB_CODE_SUCCESS; + } + if (payloadLen > tsTagFilterResCacheSize) { + return TSDB_CODE_SUCCESS; + } + + SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache; + SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry; + + void* pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t)); + if (pEntry == NULL) { + STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry)); + p->qTimes = 0; + tdListInit(&p->list, keyLen); + taosHashPut(pTableEntry, &suid, sizeof(uint64_t), pEntry, POINTER_BYTES); + + pEntry = &p; + } + + tdListAppend(&(*(STagFilterResEntry**)pEntry)->list, pKey); + + uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf; + pBuf[0] = suid; + + memcpy(&pBuf[1], pKey, keyLen); + ASSERT(sizeof(uint64_t) + keyLen == 24); + + // add to cache. + taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, NULL, NULL, TAOS_LRU_PRIORITY_LOW); return TSDB_CODE_SUCCESS; } @@ -466,7 +521,7 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) { *(uint64_t*)p = pEntry->suid; SListIter iter = {0}; - tdListInitIter(pEntry->pList, &iter, TD_LIST_FORWARD); + tdListInitIter(&pEntry->list, &iter, TD_LIST_FORWARD); SListNode* pNode = NULL; while ((pNode = tdListNext(&iter)) != NULL) { @@ -474,6 +529,8 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) { taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, keyLen); } + pEntry->qTimes = 0; + tdListEmpty(&pEntry->list); + return TSDB_CODE_SUCCESS; } - diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 44390ca2e5..8604fd4db9 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -973,19 +973,47 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SArray* res = taosArrayInit(8, sizeof(uint64_t)); if (pScanNode->tableType == TSDB_SUPER_TABLE) { - if (pTagIndexCond) { - SIndexMetaArg metaArg = { - .metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid}; + // try to retrieve the result from meta cache + // generate the cache key + T_MD5_CTX context = {0}; - // int64_t stt = taosGetTimestampUs(); - SIdxFltStatus status = SFLT_NOT_INDEX; - code = doFilterTag(pTagIndexCond, &metaArg, res, &status); - if (code != 0 || status == SFLT_NOT_INDEX) { - qError("failed to get tableIds from index, reason:%s, suid:%" PRIu64, tstrerror(code), tableUid); - code = TDB_CODE_SUCCESS; + if (pTagIndexCond) { + char* payload = NULL; + int32_t len = 0; + nodesNodeToMsg(pTagCond, &payload, &len); + + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)payload, (uint32_t)len); + tMD5Final(&context); + } + + bool acquired = false; + metaGetCachedTableUidList(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), res, &acquired); + if (!acquired) { + // failed to find the result in the cache, let try to calculate the results + if (pTagIndexCond) { + SIndexMetaArg metaArg = { + .metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid}; + + SIdxFltStatus status = SFLT_NOT_INDEX; + code = doFilterTag(pTagIndexCond, &metaArg, res, &status); + if (code != 0 || status == SFLT_NOT_INDEX) { + qError("failed to get tableIds from index, reason:%s, suid:%" PRIu64, tstrerror(code), tableUid); + code = TDB_CODE_SUCCESS; + } + } else if (!pTagCond) { + vnodeGetCtbIdList(pVnode, pScanNode->suid, res); } - } else if (!pTagCond) { - vnodeGetCtbIdList(pVnode, pScanNode->suid, res); + + // let's add the filter results into meta-cache + size_t numOfTables = taosArrayGetSize(res); + size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t); + char* pPayload = taosMemoryMalloc(size); + *(int32_t*)pPayload = numOfTables; + memcpy(pPayload + sizeof(int32_t), taosArrayGet(res, 0), numOfTables * sizeof(uint64_t)); + + metaUidFilterCachePut(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, + size, 1); } } else { // Create one table group. if (metaIsTableExist(metaHandle, tableUid)) { diff --git a/source/util/src/tlist.c b/source/util/src/tlist.c index b1c0188051..1b12ea0cdd 100644 --- a/source/util/src/tlist.c +++ b/source/util/src/tlist.c @@ -60,7 +60,7 @@ int32_t tdListPrepend(SList *list, void *data) { return 0; } -int32_t tdListAppend(SList *list, void *data) { +int32_t tdListAppend(SList *list, const void *data) { SListNode *node = (SListNode *)taosMemoryCalloc(1, sizeof(SListNode) + list->eleSize); if (node == NULL) return -1;