enh(query): add cache for table meta.
This commit is contained in:
parent
be5b72b25d
commit
cd29972a4d
|
@ -298,6 +298,12 @@ typedef struct {
|
|||
SExprSupp* pExprSup; // expr supporter of aggregate operator
|
||||
} SAggOptrPushDownInfo;
|
||||
|
||||
typedef struct STableMetaCacheInfo {
|
||||
SLRUCache* pTableMetaEntryCache; // 100 by default
|
||||
uint64_t metaFetch;
|
||||
uint64_t cacheHit;
|
||||
} STableMetaCacheInfo;
|
||||
|
||||
typedef struct STableScanInfo {
|
||||
STsdbReader* dataReader;
|
||||
SReadHandle readHandle;
|
||||
|
@ -318,7 +324,7 @@ typedef struct STableScanInfo {
|
|||
int8_t scanMode;
|
||||
SAggOptrPushDownInfo pdInfo;
|
||||
int8_t assignBlockUid;
|
||||
SLRUCache* pTableMetaEntryCache; // 100 by default
|
||||
STableMetaCacheInfo metaCache;
|
||||
} STableScanInfo;
|
||||
|
||||
typedef struct STableMergeScanInfo {
|
||||
|
@ -327,7 +333,6 @@ typedef struct STableMergeScanInfo {
|
|||
int32_t tableEndIndex;
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
SArray* dataReaders; // array of tsdbReaderT*
|
||||
SArray* queryConds; // array of queryTableDataCond
|
||||
STsdbReader* pReader;
|
||||
SReadHandle readHandle;
|
||||
|
@ -898,7 +903,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
|
|||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo);
|
||||
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr,
|
||||
SSDataBlock* pBlock, int32_t rows, const char* idStr, SLRUCache* pCache);
|
||||
SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo * pCache);
|
||||
|
||||
void cleanupAggSup(SAggSupporter* pAggSup);
|
||||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||
|
|
|
@ -172,7 +172,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
|
||||
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
|
||||
pRes->info.rows, GET_TASKID(pTaskInfo));
|
||||
pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
|
@ -221,7 +221,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
|
||||
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
|
||||
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows,
|
||||
GET_TASKID(pTaskInfo));
|
||||
GET_TASKID(pTaskInfo), NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <vnode.h>
|
||||
#include "executorimpl.h"
|
||||
#include "filter.h"
|
||||
#include "function.h"
|
||||
|
@ -339,7 +340,7 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo
|
|||
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
||||
|
||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
|
||||
GET_TASKID(pTaskInfo));
|
||||
GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -505,8 +506,25 @@ typedef struct STableCachedVal {
|
|||
STag* pTags;
|
||||
} STableCachedVal;
|
||||
|
||||
static void freeTableCachedVal(void* param) {
|
||||
if (param == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
STableCachedVal* pVal = param;
|
||||
taosMemoryFree((void*)pVal->pName);
|
||||
taosMemoryFree(pVal->pTags);
|
||||
taosMemoryFree(pVal);
|
||||
}
|
||||
|
||||
//const void *key, size_t keyLen, void *value
|
||||
static void freeCachedMetaItem(const void *key, size_t keyLen, void *value) {
|
||||
taosMemoryFree((void*)key);
|
||||
freeTableCachedVal(value);
|
||||
}
|
||||
|
||||
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr,
|
||||
SSDataBlock* pBlock, int32_t rows, const char* idStr, SLRUCache* pCache) {
|
||||
SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
|
||||
// currently only the tbname pseudo column
|
||||
if (numOfExpr <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -518,13 +536,13 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
int32_t backupRows = pBlock->info.rows;
|
||||
pBlock->info.rows = rows;
|
||||
|
||||
bool freeReader = false;
|
||||
STableCachedVal val = {0};
|
||||
bool needFreeReader = true;
|
||||
|
||||
SMetaReader mr = {0};
|
||||
|
||||
// 1. check if it is existed in meta cache
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, &pBlock->info.uid, sizeof(pBlock->info.uid));
|
||||
if (h == NULL) {
|
||||
SMetaReader mr = {0};
|
||||
if (pCache == NULL) {
|
||||
metaReaderInit(&mr, pHandle->meta, 0);
|
||||
code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -534,11 +552,46 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
}
|
||||
|
||||
metaReaderReleaseLock(&mr);
|
||||
|
||||
val.pName = mr.me.name;
|
||||
val.pTags = (STag*)mr.me.ctbEntry.pTags;
|
||||
|
||||
freeReader = true;
|
||||
} else {
|
||||
STableCachedVal* pVal = taosLRUCacheValue(pCache, h);
|
||||
val = *pVal;
|
||||
pCache->metaFetch += 1;
|
||||
|
||||
LRUHandle* h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(pBlock->info.uid));
|
||||
if (h == NULL) {
|
||||
metaReaderInit(&mr, pHandle->meta, 0);
|
||||
code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
|
||||
metaReaderClear(&mr);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
metaReaderReleaseLock(&mr);
|
||||
|
||||
STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
|
||||
pVal->pName = strdup(mr.me.name);
|
||||
STag* pTag = (STag*)mr.me.ctbEntry.pTags;
|
||||
|
||||
pVal->pTags = taosMemoryMalloc(pTag->len);
|
||||
memcpy(pVal->pTags, mr.me.ctbEntry.pTags, pTag->len);
|
||||
|
||||
freeReader = true;
|
||||
int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(uint64_t), pVal, sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
|
||||
if (ret != TAOS_LRU_STATUS_OK) {
|
||||
qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
|
||||
freeTableCachedVal(pVal);
|
||||
} else {
|
||||
val = *pVal;
|
||||
}
|
||||
} else {
|
||||
pCache->cacheHit += 1;
|
||||
STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
|
||||
val = *pVal;
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
|
@ -581,10 +634,12 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
}
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
|
||||
// restore the rows
|
||||
pBlock->info.rows = backupRows;
|
||||
if (freeReader) {
|
||||
metaReaderClear(&mr);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -832,6 +887,7 @@ static void destroyTableScanOperatorInfo(void* param) {
|
|||
taosArrayDestroy(pTableScanInfo->matchInfo.pList);
|
||||
}
|
||||
|
||||
taosLRUCacheCleanup(pTableScanInfo->metaCache.pTableMetaEntryCache);
|
||||
cleanupExprSupp(&pTableScanInfo->pseudoSup);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
@ -898,7 +954,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pInfo->pTableMetaEntryCache = taosLRUCacheInit(100, -1, .5);
|
||||
pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(100, -1, .5);
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
|
||||
getTableScannerExecInfo);
|
||||
|
@ -1650,7 +1706,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
// currently only the tbname pseudo column
|
||||
if (pInfo->numOfPseudoExpr > 0) {
|
||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo));
|
||||
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
|
@ -4380,7 +4436,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
|||
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
||||
|
||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
|
||||
pBlock->info.rows, GET_TASKID(pTaskInfo));
|
||||
pBlock->info.rows, GET_TASKID(pTaskInfo), NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue