diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h
index 79112babc3..857aef78dc 100644
--- a/source/dnode/vnode/src/inc/tsdb.h
+++ b/source/dnode/vnode/src/inc/tsdb.h
@@ -382,6 +382,8 @@ struct STsdb {
   TdThreadMutex        biMutex;
   SLRUCache           *bCache;
   TdThreadMutex        bMutex;
+  SLRUCache           *pgCache;
+  TdThreadMutex        pgMutex;
   struct STFileSystem *pFS;  // new
   SRocksCache          rCache;
 };
@@ -677,9 +679,9 @@ typedef struct STSnapRange STSnapRange;
 typedef TARRAY2(STSnapRange *) TSnapRangeArray;  // disjoint snap ranges
 
 // util
-int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
-int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
-void    tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap);
+int32_t   tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
+int32_t   tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
+void      tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap);
 SHashObj *tsdbGetSnapRangeHash(TSnapRangeArray *pRanges);
 
 // snap partition list
@@ -873,8 +875,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf);
 
 void    tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
 bool    tMergeTreeNext(SMergeTree *pMTree);
-void    tMergeTreePinSttBlock(SMergeTree* pMTree);
-void    tMergeTreeUnpinSttBlock(SMergeTree* pMTree);
+void    tMergeTreePinSttBlock(SMergeTree *pMTree);
+void    tMergeTreeUnpinSttBlock(SMergeTree *pMTree);
 bool    tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
 void    tMergeTreeClose(SMergeTree *pMTree);
 
@@ -909,7 +911,9 @@ int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHa
 int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
 
 int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle);
-int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h);
+int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle);
+int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage);
+int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
 
 int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
 int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c
index ca3fb7027f..7cea469c34 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCache.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCache.c
@@ -87,6 +87,47 @@ static void tsdbCloseBCache(STsdb *pTsdb) {
   }
 }
 
+// Create the per-tsdb LRU cache of single S3 pages (pTsdb->pgCache) together with
+// the mutex used around inserts into it. The capacity is tsS3BlockCacheSize pages,
+// expressed in the same unit as the per-entry charge (one tsdb page).
+// Returns 0, or TSDB_CODE_OUT_OF_MEMORY with pTsdb->pgCache left NULL.
+static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
+  int32_t code = 0;
+  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
+
+  // widen to int64_t before multiplying so the capacity cannot overflow int32_t
+  SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * szPage, 0, .5);
+  if (pCache == NULL) {
+    code = TSDB_CODE_OUT_OF_MEMORY;
+    goto _err;
+  }
+
+  taosLRUCacheSetStrictCapacity(pCache, false);
+
+  taosThreadMutexInit(&pTsdb->pgMutex, NULL);
+
+_err:
+  pTsdb->pgCache = pCache;
+  return code;
+}
+
+// Tear down the page cache: trace the element count before and after dropping
+// unreferenced entries, release the cache, then destroy its own mutex (pgMutex).
+static void tsdbClosePgCache(STsdb *pTsdb) {
+  SLRUCache *pCache = pTsdb->pgCache;
+  if (pCache) {
+    int32_t elems = taosLRUCacheGetElems(pCache);
+    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
+    taosLRUCacheEraseUnrefEntries(pCache);
+    elems = taosLRUCacheGetElems(pCache);
+    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
+
+    taosLRUCacheCleanup(pCache);
+
+    taosThreadMutexDestroy(&pTsdb->pgMutex);
+  }
+}
+
 #define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
 
 typedef struct {
@@ -1191,6 +1232,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
     goto _err;
   }
 
+  code = tsdbOpenPgCache(pTsdb);
+  if (code != TSDB_CODE_SUCCESS) {
+    code = TSDB_CODE_OUT_OF_MEMORY;
+    goto _err;
+  }
+
   code = tsdbOpenRocksCache(pTsdb);
   if (code != TSDB_CODE_SUCCESS) {
     code = TSDB_CODE_OUT_OF_MEMORY;
@@ -1221,6 +1268,7 @@ void tsdbCloseCache(STsdb *pTsdb) {
 
   tsdbCloseBICache(pTsdb);
   tsdbCloseBCache(pTsdb);
+  tsdbClosePgCache(pTsdb);
   tsdbCloseRocksCache(pTsdb);
 }
 
@@ -3057,7 +3105,6 @@ static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
   }
   */
   int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
-  // int64_t size = 4096;
   code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock);
   if (code != TSDB_CODE_SUCCESS) {
     // taosMemoryFree(pBlock);
@@ -3123,10 +3170,54 @@ int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle)
   return code;
 }
 
-int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h) {
+// Look up page `pgno` of the file behind pFD in pCache.
+// *handle is set to the cache handle on a hit and to NULL on a miss; the caller
+// releases a non-NULL handle with tsdbCacheRelease(). Always returns 0.
+int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
   int32_t code = 0;
+  char    key[128] = {0};
+  int     keyLen = 0;
 
-  taosLRUCacheRelease(pCache, h, false);
+  // key on the requested page number so lookups match tsdbCacheSetPageS3()
+  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
+  *handle = taosLRUCacheLookup(pCache, key, keyLen);
+
+  return code;
+}
+
+// Best-effort insert of a private copy of one page (pFD->szPage bytes at pPage)
+// under the key for `pgno`; nothing is inserted when the key is already cached.
+// The lookup+insert runs under pFD->pTsdb->pgMutex. A failed insert is ignored.
+// Returns 0, or TSDB_CODE_OUT_OF_MEMORY when the page copy cannot be allocated.
+int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
+  int32_t    code = 0;
+  char       key[128] = {0};
+  int        keyLen = 0;
+  LRUHandle *handle = NULL;
+
+  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
+  taosThreadMutexLock(&pFD->pTsdb->pgMutex);
+  handle = taosLRUCacheLookup(pFD->pTsdb->pgCache, key, keyLen);
+  if (!handle) {
+    size_t              charge = pFD->szPage;
+    _taos_lru_deleter_t deleter = deleteBCache;
+    uint8_t            *pPg = taosMemoryMalloc(charge);
+    if (pPg == NULL) {
+      taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
+      return TSDB_CODE_OUT_OF_MEMORY;
+    }
+    memcpy(pPg, pPage, charge);
+
+    LRUStatus status =
+        taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
+    if (status != TAOS_LRU_STATUS_OK) {
+      // ignore cache updating if not ok
+      // code = TSDB_CODE_OUT_OF_MEMORY;
+    }
+  }
+  taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
+
+  tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
 
   return code;
 }
diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
index 65f0f1b94d..def9a73d10 100644
--- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
@@ -178,7 +178,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
     pFD->blkno = (pgno + tsS3BlockSize - 1) / tsS3BlockSize;
     code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle);
     if (code != TSDB_CODE_SUCCESS || handle == NULL) {
-      tsdbBCacheRelease(pFD->pTsdb->bCache, handle);
+      tsdbCacheRelease(pFD->pTsdb->bCache, handle);
       if (code == TSDB_CODE_SUCCESS && !handle) {
         code = TSDB_CODE_OUT_OF_MEMORY;
       }
@@ -190,7 +190,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
     int64_t blk_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
     memcpy(pFD->pBuf, pBlock + (offset - blk_offset), pFD->szPage);
 
-    tsdbBCacheRelease(pFD->pTsdb->bCache, handle);
+    tsdbCacheRelease(pFD->pTsdb->bCache, handle);
   } else {
     // seek
     int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
@@ -254,7 +254,7 @@ _exit:
   return code;
 }
 
-int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
+static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
   int32_t code = 0;
   int64_t n = 0;
   int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
@@ -283,6 +283,128 @@ _exit:
   return code;
 }
 
+// Read `size` logical bytes starting at logical `offset` from an S3-backed file
+// into pBuf, one page at a time. Pages are served from pFD->pBuf (if it already
+// holds the page) or from the page cache; at the first cache miss the remaining
+// page range is fetched from S3 in one request, cached, and delivered.
+// Every page except page 1 is checksum-verified; a mismatch yields
+// TSDB_CODE_FILE_CORRUPTED. Returns 0 on success.
+static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
+  int32_t code = 0;
+  int64_t n = 0;
+  int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
+  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
+  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
+  int64_t bOffset = fOffset % pFD->szPage;
+
+  ASSERT(bOffset < szPgCont);
+
+  // 1, find pgnoStart & pgnoEnd to fetch from s3, if all pgs are local, no need to fetch
+  // 2, fetch pgnoStart ~ pgnoEnd from s3
+  // 3, store pgs to pcache & last pg to pFD->pBuf
+  // 4, deliver pgs to [pBuf, pBuf + size)
+
+  while (n < size) {
+    if (pFD->pgno != pgno) {
+      LRUHandle *handle = NULL;
+      code = tsdbCacheGetPageS3(pFD->pTsdb->pgCache, pFD, pgno, &handle);
+      if (code != TSDB_CODE_SUCCESS) {
+        if (handle) {
+          tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
+        }
+        goto _exit;
+      }
+
+      if (!handle) {
+        break;
+      }
+
+      uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle);
+      memcpy(pFD->pBuf, pPage, pFD->szPage);
+      tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
+
+      // check
+      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
+        code = TSDB_CODE_FILE_CORRUPTED;
+        goto _exit;
+      }
+
+      pFD->pgno = pgno;
+    }
+
+    int64_t nRead = TMIN(szPgCont - bOffset, size - n);
+    memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
+
+    n += nRead;
+    pgno++;
+    bOffset = 0;
+  }
+
+  if (n < size) {
+    // 2, retrieve pgs from s3
+    uint8_t *pBlock = NULL;
+    int64_t  retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
+    // the first page only contributes the bytes after bOffset, so count it in
+    int64_t  pgnoEnd = pgno - 1 + (bOffset + size - n + szPgCont - 1) / szPgCont;
+    int64_t  retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
+    code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, &pBlock);
+    if (code != TSDB_CODE_SUCCESS) {
+      goto _exit;
+    }
+
+    // 3, Store Pages in Cache
+    int nPage = pgnoEnd - pgno + 1;
+    for (int i = 0; i < nPage; ++i) {
+      tsdbCacheSetPageS3(pFD->pTsdb->pgCache, pFD, pgno, pBlock + i * pFD->szPage);
+
+      memcpy(pFD->pBuf, pBlock + i * pFD->szPage, pFD->szPage);
+
+      // check
+      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
+        code = TSDB_CODE_FILE_CORRUPTED;
+        taosMemoryFree(pBlock);  // do not leak the fetched range on corruption
+        goto _exit;
+      }
+
+      pFD->pgno = pgno;
+
+      int64_t nRead = TMIN(szPgCont - bOffset, size - n);
+      memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
+
+      n += nRead;
+      pgno++;
+      bOffset = 0;
+    }
+
+    taosMemoryFree(pBlock);
+  }
+
+_exit:
+  return code;
+}
+
+// Public read entry point: opens the underlying file on first use, then reads
+// through the S3 page-cache path when pFD->s3File is set and tsS3BlockSize is
+// negative, and through tsdbReadFileImp() otherwise.
+int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
+  int32_t code = 0;
+  if (!pFD->pFD) {
+    code = tsdbOpenFileImpl(pFD);
+    if (code) {
+      goto _exit;
+    }
+  }
+
+  if (pFD->s3File && tsS3BlockSize < 0) {
+    return tsdbReadFileS3(pFD, offset, pBuf, size);
+  } else {
+    return tsdbReadFileImp(pFD, offset, pBuf, size);
+  }
+
+_exit:
+  return code;
+}
+
 int32_t tsdbFsyncFile(STsdbFD *pFD) {
   int32_t code = 0;
 