refactor: add tag filter cache.

Haojun Liao 2022-11-16 14:23:12 +08:00
parent ba5244d1b6
commit 7d00b7a6c4
7 changed files with 127 additions and 34 deletions

View File

@ -44,6 +44,8 @@ extern int32_t tsCompatibleModel;
extern bool tsPrintAuth;
extern int64_t tsTickPerMin[3];
extern int32_t tsCountAlwaysReturnValue;
extern float tsSelectivityRatio;
extern int32_t tsTagFilterResCacheSize;
// queue & threads
extern int32_t tsNumOfRpcThreads;
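
The two new globals gate whether a tag filter result is worth caching at all: tsSelectivityRatio bounds how selective the filter must be, and tsTagFilterResCacheSize bounds the size of the serialized uid list. A minimal sketch of that gate, mirroring the early returns added to metaUidFilterCachePut further down in this commit; shouldCacheTagFilterResult is a hypothetical helper, not part of the change, and the values are the defaults set in tglobal.c below.

#include <stdbool.h>
#include <stdint.h>

static float   tsSelectivityRatio      = 1.0f;  // default from tglobal.c below
static int32_t tsTagFilterResCacheSize = 4096;  // default from tglobal.c below

// Hypothetical helper: returns true only when the filter result should be cached.
static bool shouldCacheTagFilterResult(double selectivityRatio, int32_t payloadLen) {
  if (selectivityRatio > tsSelectivityRatio) return false;  // filter not selective enough
  if (payloadLen > tsTagFilterResCacheSize) return false;   // uid list too large to cache
  return true;
}
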

View File

@ -225,7 +225,7 @@ void *tdListFree(SList *list);
void tdListPrependNode(SList *list, SListNode *node);
void tdListAppendNode(SList *list, SListNode *node);
int32_t tdListPrepend(SList *list, void *data);
int32_t tdListAppend(SList *list, void *data);
int32_t tdListAppend(SList *list, const void *data);
SListNode *tdListPopHead(SList *list);
SListNode *tdListPopTail(SList *list);
SListNode *tdListGetHead(SList *list);
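
The tag filter cache tracks the MD5 keys it has inserted per super table with this fixed-element-size list; tdListAppend now takes const void* so a key can be appended straight from a read-only digest buffer. A rough usage sketch, assuming the declarations above behave as named (the element size is fixed at tdListInit and each append copies that many bytes); trackAndScanKeys is illustrative only.

#include <stdint.h>
#include "tlist.h"  // assumed header for the declarations above

#define MD5_KEY_LEN 16

// Append one 16-byte digest per cached filter result, then walk the stored keys.
void trackAndScanKeys(const uint8_t digest[MD5_KEY_LEN]) {
  SList list;
  tdListInit(&list, MD5_KEY_LEN);   // every node carries a 16-byte key
  tdListAppend(&list, digest);      // copies the digest into a new node

  SListIter  iter = {0};
  SListNode *pNode = NULL;
  tdListInitIter(&list, &iter, TD_LIST_FORWARD);
  while ((pNode = tdListNext(&iter)) != NULL) {
    // pNode->data points at the copied 16-byte key
  }
  tdListEmpty(&list);               // drop all nodes when done
}
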

View File

@ -119,6 +119,9 @@ int32_t tsMinIntervalTime = 1;
// maximum memory allowed to be allocated for a single csv load (in MB)
int32_t tsMaxMemUsedByInsert = 1024;
float tsSelectivityRatio = 1.0;
int32_t tsTagFilterResCacheSize = 4096;
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
// 0 no query allowed, queries are disabled

View File

@ -108,6 +108,9 @@ int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList, bool* acquired);
int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen, double selectivityRatio);
typedef struct SMetaFltParam {
tb_uid_t suid;
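
Together these two declarations form the get/compute/put round trip that the executor change below wires up: look up the cached uid list by the MD5 of the serialized tag condition, and only on a miss run the tag filter and insert the result. A caller-side sketch with hypothetical names (getUidListWithCache, buildUidList, key, selectivityRatio); the payload layout is an int32_t count followed by that many uint64_t uids, as in getTableList below.

// Hypothetical caller; buildUidList stands in for the actual tag-index filtering.
static void getUidListWithCache(SMeta *pMeta, uint64_t suid, const uint8_t *key, int32_t keyLen,
                                double selectivityRatio, SArray *res) {
  bool acquired = false;
  metaGetCachedTableUidList(pMeta, suid, key, keyLen, res, &acquired);
  if (acquired) return;  // served from the cache

  buildUidList(res);     // hypothetical: run the tag filter to fill res

  // payload layout: [int32_t count][count x uint64_t uid]; assumes res is non-empty
  int32_t num  = (int32_t)taosArrayGetSize(res);
  int32_t size = (int32_t)(sizeof(int32_t) + num * sizeof(uint64_t));
  char   *pPayload = taosMemoryMalloc(size);
  *(int32_t *)pPayload = num;
  memcpy(pPayload + sizeof(int32_t), taosArrayGet(res, 0), num * sizeof(uint64_t));

  metaUidFilterCachePut(pMeta, suid, key, keyLen, pPayload, size, selectivityRatio);
}
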

View File

@ -33,7 +33,7 @@ typedef struct SMetaStbStatsEntry {
typedef struct STagFilterResEntry {
uint64_t suid; // uid for super table
SList* pList; // the linked list of md5 digest, extracted from the serialized tag query condition
SList list; // the linked list of md5 digests, extracted from the serialized tag query condition
uint32_t qTimes;  // number of times the current super table has been queried
} STagFilterResEntry;
@ -56,6 +56,7 @@ struct SMetaCache {
struct STagFilterResCache {
SHashObj* pTableEntry;
SLRUCache* pUidResCache;
uint64_t keyBuf[3];
} sTagFilterResCache;
};
@ -119,9 +120,19 @@ int32_t metaCacheOpen(SMeta* pMeta) {
goto _err2;
}
pMeta->pCache = pCache;
pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5*1024*1024, -1, 0.5);
if (pCache->sTagFilterResCache.pUidResCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err2;
}
_exit:
pCache->sTagFilterResCache.pTableEntry = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
if (pCache->sTagFilterResCache.pTableEntry == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err2;
}
pMeta->pCache = pCache;
return code;
_err2:
@ -129,7 +140,6 @@ _err2:
_err:
taosMemoryFree(pCache);
metaError("vgId:%d, meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
return code;
}
@ -138,6 +148,11 @@ void metaCacheClose(SMeta* pMeta) {
if (pMeta->pCache) {
entryCacheClose(pMeta);
statsCacheClose(pMeta);
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
taosMemoryFree(pMeta->pCache);
pMeta->pCache = NULL;
}
@ -399,38 +414,48 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
return code;
}
int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, LRUHandle** pHandle) {
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) {
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
// generate the composed key for LRU cache
char* p = taosMemoryMalloc(keyLen + sizeof(uint64_t));
*(uint64_t*) p = suid;
memcpy(p + sizeof(suid), pKey, keyLen);
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
pBuf[0] = suid;
memcpy(&pBuf[1], pKey, keyLen);
int32_t len = keyLen + sizeof(uint64_t);
*pHandle = taosLRUCacheLookup(pMeta->pCache->sTagFilterResCache.pUidResCache, p, len);
if (*pHandle == NULL) {
taosMemoryFree(p);
LRUHandle *pHandle = taosLRUCacheLookup(pCache, pBuf, len);
if (pHandle == NULL) {
*acquireRes = 0;
return TSDB_CODE_SUCCESS;
} else { // do some bookkeeping after acquiring the filter result from the cache
STagFilterResEntry* pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t));
ASSERT(pEntry != NULL);
*acquireRes = 1;
const char* p = taosLRUCacheValue(pMeta->pCache->sTagFilterResCache.pUidResCache, pHandle);
int32_t size = *(int32_t*) p;
taosArrayAddBatch(pList1, p + sizeof(int32_t), size);
pEntry->qTimes += 1;
taosLRUCacheRelease(pCache, pHandle, false);  // the cached value has been copied out, so the handle can be released
// check whether a full scan of the cached items is needed
if (pEntry->qTimes > 5000 && TD_DLIST_NELES(pEntry->pList) > 10) {
if (pEntry->qTimes >= 5000 && TD_DLIST_NELES(&pEntry->list) > 10) {
SArray* pList = taosArrayInit(64, POINTER_BYTES);
SListIter iter = {0};
tdListInitIter(pEntry->pList, &iter, TD_LIST_FORWARD);
tdListInitIter(&pEntry->list, &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
memcpy(p + sizeof(suid), pNode->data, keyLen);
memcpy(&pBuf[1], pNode->data, keyLen);
// check whether it still exists in the LRU cache; if not, remove it from the linked list
void* pRes = taosLRUCacheLookup(pMeta->pCache->sTagFilterResCache.pUidResCache, p, len);
LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len);
if (pRes == NULL) { // record the stale key so it can be removed from the linked list below
taosArrayPush(pList, &pNode);
} else {
taosLRUCacheRelease(pCache, pRes, false);
}
}
@ -438,19 +463,49 @@ int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int
size_t s = taosArrayGetSize(pList);
for(int32_t i = 0; i < s; ++i) {
SListNode** p1 = taosArrayGet(pList, i);
tdListPopNode(pEntry->pList, *p1);
tdListPopNode(&pEntry->list, *p1);
}
}
taosMemoryFree(p);
pEntry->qTimes = 0; // reset the query times
}
}
return TSDB_CODE_SUCCESS;
}
// check both the payload size and selectivity ratio
int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload) {
int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen, double selectivityRatio) {
if (selectivityRatio > tsSelectivityRatio) {
return TSDB_CODE_SUCCESS;
}
if (payloadLen > tsTagFilterResCacheSize) {
return TSDB_CODE_SUCCESS;
}
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
void* pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t));
if (pEntry == NULL) {
STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
p->qTimes = 0;
tdListInit(&p->list, keyLen);
taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
pEntry = &p;
}
tdListAppend(&(*(STagFilterResEntry**)pEntry)->list, pKey);
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
pBuf[0] = suid;
memcpy(&pBuf[1], pKey, keyLen);
ASSERT(sizeof(uint64_t) + keyLen == 24);
// add to cache.
taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, NULL, NULL, TAOS_LRU_PRIORITY_LOW);
return TSDB_CODE_SUCCESS;
}
@ -466,7 +521,7 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
*(uint64_t*)p = pEntry->suid;
SListIter iter = {0};
tdListInitIter(pEntry->pList, &iter, TD_LIST_FORWARD);
tdListInitIter(&pEntry->list, &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
@ -474,6 +529,8 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, keyLen);
}
pEntry->qTimes = 0;
tdListEmpty(&pEntry->list);
return TSDB_CODE_SUCCESS;
}
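
For reference, the LRU key assembled in keyBuf is the 24-byte concatenation of the super table uid and the 16-byte MD5 digest of the serialized tag condition, which is what the ASSERT(sizeof(uint64_t) + keyLen == 24) in the put path checks. A small layout sketch; composeTagFilterKey is illustrative only.

#include <stdint.h>
#include <string.h>

// keyBuf is uint64_t[3] == 24 bytes:
//   bytes 0..7   suid of the super table
//   bytes 8..23  MD5 digest of the serialized tag filter condition
static void composeTagFilterKey(uint64_t keyBuf[3], uint64_t suid, const uint8_t digest[16]) {
  keyBuf[0] = suid;
  memcpy(&keyBuf[1], digest, 16);
}
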

View File

@ -973,19 +973,47 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
SArray* res = taosArrayInit(8, sizeof(uint64_t));
if (pScanNode->tableType == TSDB_SUPER_TABLE) {
if (pTagIndexCond) {
SIndexMetaArg metaArg = {
.metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid};
// try to retrieve the result from meta cache
// generate the cache key
T_MD5_CTX context = {0};
// int64_t stt = taosGetTimestampUs();
SIdxFltStatus status = SFLT_NOT_INDEX;
code = doFilterTag(pTagIndexCond, &metaArg, res, &status);
if (code != 0 || status == SFLT_NOT_INDEX) {
qError("failed to get tableIds from index, reason:%s, suid:%" PRIu64, tstrerror(code), tableUid);
code = TDB_CODE_SUCCESS;
if (pTagIndexCond) {
char* payload = NULL;
int32_t len = 0;
nodesNodeToMsg(pTagCond, &payload, &len);
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)payload, (uint32_t)len);
tMD5Final(&context);
}
bool acquired = false;
metaGetCachedTableUidList(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), res, &acquired);
if (!acquired) {
// failed to find the result in the cache, so calculate it now
if (pTagIndexCond) {
SIndexMetaArg metaArg = {
.metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid};
SIdxFltStatus status = SFLT_NOT_INDEX;
code = doFilterTag(pTagIndexCond, &metaArg, res, &status);
if (code != 0 || status == SFLT_NOT_INDEX) {
qError("failed to get tableIds from index, reason:%s, suid:%" PRIu64, tstrerror(code), tableUid);
code = TDB_CODE_SUCCESS;
}
} else if (!pTagCond) {
vnodeGetCtbIdList(pVnode, pScanNode->suid, res);
}
} else if (!pTagCond) {
vnodeGetCtbIdList(pVnode, pScanNode->suid, res);
// add the filter result to the meta cache
size_t numOfTables = taosArrayGetSize(res);
size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
char* pPayload = taosMemoryMalloc(size);
*(int32_t*)pPayload = numOfTables;
memcpy(pPayload + sizeof(int32_t), taosArrayGet(res, 0), numOfTables * sizeof(uint64_t));
metaUidFilterCachePut(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), pPayload,
size, 1);
}
} else { // Create one table group.
if (metaIsTableExist(metaHandle, tableUid)) {

View File

@ -60,7 +60,7 @@ int32_t tdListPrepend(SList *list, void *data) {
return 0;
}
int32_t tdListAppend(SList *list, void *data) {
int32_t tdListAppend(SList *list, const void *data) {
SListNode *node = (SListNode *)taosMemoryCalloc(1, sizeof(SListNode) + list->eleSize);
if (node == NULL) return -1;