enh: optimizing stt file reading in last query
This commit is contained in:
parent
25778754e2
commit
62377619ba
|
@ -1136,6 +1136,7 @@ typedef struct {
|
||||||
int64_t numOfInsertSuccessReqs;
|
int64_t numOfInsertSuccessReqs;
|
||||||
int64_t numOfBatchInsertReqs;
|
int64_t numOfBatchInsertReqs;
|
||||||
int64_t numOfBatchInsertSuccessReqs;
|
int64_t numOfBatchInsertSuccessReqs;
|
||||||
|
int32_t numOfCacheTables;
|
||||||
} SVnodeLoad;
|
} SVnodeLoad;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -55,6 +55,8 @@ void *taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle);
|
||||||
size_t taosLRUCacheGetUsage(SLRUCache *cache);
|
size_t taosLRUCacheGetUsage(SLRUCache *cache);
|
||||||
size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache);
|
size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache);
|
||||||
|
|
||||||
|
int32_t taosLRUCacheGetElems(SLRUCache *cache);
|
||||||
|
|
||||||
void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity);
|
void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity);
|
||||||
size_t taosLRUCacheGetCapacity(SLRUCache *cache);
|
size_t taosLRUCacheGetCapacity(SLRUCache *cache);
|
||||||
|
|
||||||
|
|
|
@ -201,6 +201,7 @@ int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int6
|
||||||
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
|
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
|
||||||
size_t tsdbCacheGetCapacity(SVnode *pVnode);
|
size_t tsdbCacheGetCapacity(SVnode *pVnode);
|
||||||
size_t tsdbCacheGetUsage(SVnode *pVnode);
|
size_t tsdbCacheGetUsage(SVnode *pVnode);
|
||||||
|
int32_t tsdbCacheGetElems(SVnode *pVnode);
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
typedef struct SMetaTableInfo {
|
typedef struct SMetaTableInfo {
|
||||||
|
|
|
@ -748,7 +748,7 @@ struct SDiskDataBuilder {
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||||
bool destroyLoadInfo, const char *idStr);
|
bool destroyLoadInfo, const char *idStr, bool strictTimeRange);
|
||||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
bool tMergeTreeNext(SMergeTree *pMTree);
|
||||||
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
||||||
|
|
|
@ -632,8 +632,8 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
||||||
}
|
}
|
||||||
|
|
||||||
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
|
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
|
||||||
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
|
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
|
||||||
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL);
|
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
|
||||||
state->pMergeTree = &state->mergeTree;
|
state->pMergeTree = &state->mergeTree;
|
||||||
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
||||||
if (!hasVal) {
|
if (!hasVal) {
|
||||||
|
@ -1727,6 +1727,15 @@ size_t tsdbCacheGetUsage(SVnode *pVnode) {
|
||||||
return usage;
|
return usage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCacheGetElems(SVnode *pVnode) {
|
||||||
|
int32_t elems = 0;
|
||||||
|
if (pVnode->pTsdb != NULL) {
|
||||||
|
elems = taosLRUCacheGetElems(pVnode->pTsdb->lruCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
return elems;
|
||||||
|
}
|
||||||
|
|
||||||
static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) {
|
static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) {
|
||||||
struct {
|
struct {
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
|
|
|
@ -408,6 +408,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
|
|
||||||
if (hasNotNullRow) {
|
if (hasNotNullRow) {
|
||||||
pr->lastTs = minTs;
|
pr->lastTs = minTs;
|
||||||
|
tsdbInfo("%p have cache read %d tables", pr, i + 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -418,6 +419,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
|
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsdbInfo("have cached %d tables", taosLRUCacheGetElems(lruCache));
|
||||||
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
||||||
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
||||||
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
||||||
|
|
|
@ -31,7 +31,8 @@ struct SLDataIter {
|
||||||
SSttBlockLoadInfo *pBlockLoadInfo;
|
SSttBlockLoadInfo *pBlockLoadInfo;
|
||||||
};
|
};
|
||||||
|
|
||||||
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfSttTrigger) {
|
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
|
||||||
|
int32_t numOfSttTrigger) {
|
||||||
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
|
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
|
||||||
if (pLoadInfo == NULL) {
|
if (pLoadInfo == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -162,7 +163,8 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
|
||||||
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
|
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
|
||||||
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
|
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
|
||||||
|
|
||||||
tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow, idStr);
|
tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow,
|
||||||
|
idStr);
|
||||||
return &pInfo->blockData[pInfo->currentLoadBlockIndex];
|
return &pInfo->blockData[pInfo->currentLoadBlockIndex];
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -263,7 +265,7 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint
|
||||||
|
|
||||||
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
|
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
|
||||||
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||||
const char *idStr) {
|
const char *idStr, bool strictTimeRange) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
||||||
|
@ -340,6 +342,16 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
if ((*pIter)->iSttBlk != -1) {
|
if ((*pIter)->iSttBlk != -1) {
|
||||||
(*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
|
(*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
|
||||||
(*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
|
(*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
|
||||||
|
|
||||||
|
if ((!backward) && ((strictTimeRange && (*pIter)->pSttBlk->minKey >= (*pIter)->timeWindow.ekey) ||
|
||||||
|
(!strictTimeRange && (*pIter)->pSttBlk->minKey > (*pIter)->timeWindow.ekey))) {
|
||||||
|
(*pIter)->pSttBlk = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (backward && ((strictTimeRange && (*pIter)->pSttBlk->maxKey <= (*pIter)->timeWindow.skey) ||
|
||||||
|
(!strictTimeRange && (*pIter)->pSttBlk->maxKey < (*pIter)->timeWindow.skey))) {
|
||||||
|
(*pIter)->pSttBlk = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -551,7 +563,7 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||||
bool destroyLoadInfo, const char *idStr) {
|
bool destroyLoadInfo, const char *idStr, bool strictTimeRange) {
|
||||||
pMTree->backward = backward;
|
pMTree->backward = backward;
|
||||||
pMTree->pIter = NULL;
|
pMTree->pIter = NULL;
|
||||||
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
|
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
@ -573,7 +585,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
||||||
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
||||||
struct SLDataIter *pIter = NULL;
|
struct SLDataIter *pIter = NULL;
|
||||||
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
|
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
|
||||||
&pMTree->pLoadInfo[i], pMTree->idStr);
|
&pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2481,7 +2481,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
||||||
pScanInfo->uid, pReader->idStr);
|
pScanInfo->uid, pReader->idStr);
|
||||||
int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
|
int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
|
||||||
pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
|
pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
|
||||||
pLBlockReader->pInfo, false, pReader->idStr);
|
pLBlockReader->pInfo, false, pReader->idStr, false);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -3927,7 +3927,8 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
pReader->status.uidList.tableUidList = (uint64_t*)taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num);
|
pReader->status.uidList.tableUidList =
|
||||||
|
(uint64_t*)taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashClear(pReader->status.pTableMap);
|
taosHashClear(pReader->status.pTableMap);
|
||||||
|
|
|
@ -382,6 +382,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||||
pLoad->syncRestore = state.restored;
|
pLoad->syncRestore = state.restored;
|
||||||
pLoad->syncCanRead = state.canRead;
|
pLoad->syncCanRead = state.canRead;
|
||||||
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
||||||
|
pLoad->numOfCacheTables = tsdbCacheGetElems(pVnode);
|
||||||
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
||||||
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
|
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
|
||||||
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
||||||
|
|
|
@ -580,6 +580,16 @@ static size_t taosLRUCacheShardGetUsage(SLRUCacheShard *shard) {
|
||||||
return usage;
|
return usage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t taosLRUCacheShardGetElems(SLRUCacheShard *shard) {
|
||||||
|
int32_t elems = 0;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&shard->mutex);
|
||||||
|
elems = shard->table.elems;
|
||||||
|
taosThreadMutexUnlock(&shard->mutex);
|
||||||
|
|
||||||
|
return elems;
|
||||||
|
}
|
||||||
|
|
||||||
static size_t taosLRUCacheShardGetPinnedUsage(SLRUCacheShard *shard) {
|
static size_t taosLRUCacheShardGetPinnedUsage(SLRUCacheShard *shard) {
|
||||||
size_t usage = 0;
|
size_t usage = 0;
|
||||||
|
|
||||||
|
@ -755,6 +765,16 @@ size_t taosLRUCacheGetUsage(SLRUCache *cache) {
|
||||||
return usage;
|
return usage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t taosLRUCacheGetElems(SLRUCache *cache) {
|
||||||
|
int32_t elems = 0;
|
||||||
|
|
||||||
|
for (int i = 0; i < cache->numShards; ++i) {
|
||||||
|
elems += taosLRUCacheShardGetElems(&cache->shards[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return elems;
|
||||||
|
}
|
||||||
|
|
||||||
size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache) {
|
size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache) {
|
||||||
size_t usage = 0;
|
size_t usage = 0;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue