feature: replace two lru cache with hash
This commit is contained in:
parent
5dd50c10a2
commit
d01e82439e
|
@ -280,8 +280,7 @@ typedef struct STmsSortRowIdInfo {
|
|||
char idxPath[PATH_MAX];
|
||||
TdFilePtr dataFile;
|
||||
char dataPath[PATH_MAX];
|
||||
SLRUCache* pBlkInfoCache; // blkId->(offset, len)
|
||||
SLRUCache* pBlkDataCache; // blkId->SSDataBlock*
|
||||
SSHashObj* pBlkDataHash; // blkId->SSDataBlock*
|
||||
} STmsSortRowIdInfo;
|
||||
|
||||
typedef struct STableMergeScanInfo {
|
||||
|
|
|
@ -3288,50 +3288,28 @@ static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlo
|
|||
return code;
|
||||
}
|
||||
|
||||
static void deleteBlockInfoCache(const void *key, size_t keyLen, void *value, void *ud) {
|
||||
taosMemoryFree(value);
|
||||
}
|
||||
|
||||
static void deleteBlockDataCache(const void *key, size_t keyLen, void *value, void *ud) {
|
||||
SSDataBlock* pBlock = value;
|
||||
blockDataDestroy(pBlock);
|
||||
}
|
||||
|
||||
static int32_t retrieveSourceBlock(STableMergeScanInfo* pInfo, int32_t blockId, SSDataBlock** ppBlock) {
|
||||
STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;
|
||||
|
||||
LRUHandle* hBlk = taosLRUCacheLookup(pSortInfo->pBlkDataCache, &blockId, sizeof(blockId));
|
||||
if (hBlk) {
|
||||
SSDataBlock* pBlk = taosLRUCacheValue(pSortInfo->pBlkDataCache, hBlk);
|
||||
taosLRUCacheRelease(pSortInfo->pBlkDataCache, hBlk, false);
|
||||
*ppBlock = pBlk;
|
||||
} else {
|
||||
STmsSortBlockInfo* blkInfo = NULL;
|
||||
LRUHandle* hBlkInfo = taosLRUCacheLookup(pSortInfo->pBlkInfoCache, &blockId, sizeof(blockId));
|
||||
if (hBlkInfo) {
|
||||
blkInfo = taosLRUCacheValue(pSortInfo->pBlkInfoCache, hBlkInfo);
|
||||
taosLRUCacheRelease(pSortInfo->pBlkInfoCache, hBlkInfo, false);
|
||||
} else {
|
||||
blkInfo = taosMemoryMalloc(sizeof(STmsSortBlockInfo));
|
||||
taosLSeekFile(pSortInfo->idxFile, blockId * sizeof(STmsSortBlockInfo), SEEK_SET);
|
||||
taosReadFile(pSortInfo->idxFile, blkInfo, sizeof(STmsSortBlockInfo));
|
||||
ASSERT(blkInfo->blkId == blockId);
|
||||
taosLRUCacheInsert(pSortInfo->pBlkInfoCache, &blockId, sizeof(blockId), blkInfo, 1, deleteBlockInfoCache,
|
||||
NULL, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
}
|
||||
{
|
||||
taosLSeekFile(pSortInfo->dataFile, blkInfo->offset, SEEK_SET);
|
||||
char* buf = taosMemoryMalloc(blkInfo->length);
|
||||
taosReadFile(pSortInfo->dataFile, buf, blkInfo->length);
|
||||
SSDataBlock* pBlock = createOneDataBlock(pInfo->pReaderBlock, false);
|
||||
blockDataFromBuf(pBlock, buf);
|
||||
taosMemoryFree(buf);
|
||||
void* pBlkHashVal = tSimpleHashGet(pSortInfo->pBlkDataHash, &blockId, sizeof(blockId));
|
||||
if (pBlkHashVal) {
|
||||
*ppBlock = *(SSDataBlock**)pBlkHashVal;
|
||||
}
|
||||
else {
|
||||
STmsSortBlockInfo blkInfo = {0};
|
||||
|
||||
*ppBlock = pBlock;
|
||||
taosLSeekFile(pSortInfo->idxFile, blockId * sizeof(STmsSortBlockInfo), SEEK_SET);
|
||||
taosReadFile(pSortInfo->idxFile, &blkInfo, sizeof(STmsSortBlockInfo));
|
||||
taosLSeekFile(pSortInfo->dataFile, blkInfo.offset, SEEK_SET);
|
||||
char* buf = taosMemoryMalloc(blkInfo.length);
|
||||
taosReadFile(pSortInfo->dataFile, buf, blkInfo.length);
|
||||
SSDataBlock* pBlock = createOneDataBlock(pInfo->pReaderBlock, false);
|
||||
blockDataFromBuf(pBlock, buf);
|
||||
taosMemoryFree(buf);
|
||||
|
||||
taosLRUCacheInsert(pSortInfo->pBlkDataCache, &blockId, sizeof(blockId), pBlock, 1, deleteBlockDataCache,
|
||||
NULL, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
}
|
||||
*ppBlock = pBlock;
|
||||
tSimpleHashPut(pSortInfo->pBlkDataHash, &blockId, sizeof(blockId), &pBlock, sizeof(pBlock));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -3356,7 +3334,10 @@ void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBloc
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (rowIdx == pSrcBlk->info.rows - 1) {
|
||||
tSimpleHashRemove(pInfo->tmsSortRowIdInfo.pBlkDataHash, &blkId, sizeof(blkId));
|
||||
blockDataDestroy(pSrcBlk);
|
||||
}
|
||||
pBlock->info.dataLoad = 1;
|
||||
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
|
||||
pBlock->info.rows += 1;
|
||||
|
@ -3513,10 +3494,7 @@ int32_t startRowIdSort(STableMergeScanInfo *pInfo) {
|
|||
pSort->idxFile = taosOpenFile(pSort->idxPath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
|
||||
taosGetTmpfilePath(tsTempDir, "tms-block-data", pSort->dataPath);
|
||||
pSort->dataFile = taosOpenFile(pSort->dataPath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
|
||||
pSort->pBlkInfoCache = taosLRUCacheInit(2048, -1, 0.5);
|
||||
taosLRUCacheSetStrictCapacity(pSort->pBlkInfoCache, false);
|
||||
pSort->pBlkDataCache = taosLRUCacheInit(2048, -1, 0.5);
|
||||
taosLRUCacheSetStrictCapacity(pSort->pBlkInfoCache, false);
|
||||
pSort->pBlkDataHash = tSimpleHashInit(2048, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -3527,10 +3505,7 @@ int32_t stopRowIdSort(STableMergeScanInfo *pInfo) {
|
|||
taosCloseFile(&pSort->dataFile);
|
||||
taosRemoveFile(pSort->dataPath);
|
||||
|
||||
taosLRUCacheEraseUnrefEntries(pSort->pBlkInfoCache);
|
||||
taosLRUCacheCleanup(pSort->pBlkInfoCache);
|
||||
taosLRUCacheEraseUnrefEntries(pSort->pBlkDataCache);
|
||||
taosLRUCacheCleanup(pSort->pBlkDataCache);
|
||||
tSimpleHashCleanup(pSort->pBlkDataHash);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue