tsdb/pg-cache: new page cache for tsdb s3 read file page
This commit is contained in:
parent
867ce92dcc
commit
a61502411d
|
@ -382,6 +382,8 @@ struct STsdb {
|
||||||
TdThreadMutex biMutex;
|
TdThreadMutex biMutex;
|
||||||
SLRUCache *bCache;
|
SLRUCache *bCache;
|
||||||
TdThreadMutex bMutex;
|
TdThreadMutex bMutex;
|
||||||
|
SLRUCache *pgCache;
|
||||||
|
TdThreadMutex pgMutex;
|
||||||
struct STFileSystem *pFS; // new
|
struct STFileSystem *pFS; // new
|
||||||
SRocksCache rCache;
|
SRocksCache rCache;
|
||||||
};
|
};
|
||||||
|
@ -909,7 +911,9 @@ int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHa
|
||||||
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
|
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
|
||||||
|
|
||||||
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle);
|
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle);
|
||||||
int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h);
|
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle);
|
||||||
|
int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage);
|
||||||
|
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
|
||||||
|
|
||||||
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||||
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||||
|
|
|
@ -87,6 +87,41 @@ static void tsdbCloseBCache(STsdb *pTsdb) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
 * Create the S3 page cache for a tsdb instance and initialize pgMutex.
 *
 * The capacity handed to the LRU cache is tsS3BlockCacheSize multiplied by
 * the vnode's configured tsdb page size; strict capacity is disabled.
 *
 * pTsdb->pgCache is always assigned: the new cache on success, NULL when the
 * cache could not be created.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if cache creation failed.
 */
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
  int32_t pageSize = pTsdb->pVnode->config.tsdbPageSize;
  int64_t capacity = (int64_t)tsS3BlockCacheSize * pageSize;

  SLRUCache *pPgCache = taosLRUCacheInit(capacity, 0, .5);
  pTsdb->pgCache = pPgCache;
  if (pPgCache == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosLRUCacheSetStrictCapacity(pPgCache, false);
  taosThreadMutexInit(&pTsdb->pgMutex, NULL);

  return 0;
}
|
||||||
|
|
||||||
|
/*
 * Tear down the S3 page cache of a tsdb instance.
 *
 * Does nothing when no page cache was created. Otherwise it traces the
 * element count, erases all unreferenced entries, traces the count again,
 * releases the cache, and destroys the mutex that belongs to the page cache.
 */
static void tsdbClosePgCache(STsdb *pTsdb) {
  SLRUCache *pCache = pTsdb->pgCache;
  if (pCache) {
    int32_t elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
    taosLRUCacheEraseUnrefEntries(pCache);
    elems = taosLRUCacheGetElems(pCache);
    tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);

    taosLRUCacheCleanup(pCache);

    // pgMutex is the mutex paired with pgCache; bMutex belongs to the block
    // cache and must not be destroyed here.
    taosThreadMutexDestroy(&pTsdb->pgMutex);
  }
}
|
||||||
|
|
||||||
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
|
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -1191,6 +1226,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = tsdbOpenPgCache(pTsdb);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
code = tsdbOpenRocksCache(pTsdb);
|
code = tsdbOpenRocksCache(pTsdb);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1221,6 +1262,7 @@ void tsdbCloseCache(STsdb *pTsdb) {
|
||||||
|
|
||||||
tsdbCloseBICache(pTsdb);
|
tsdbCloseBICache(pTsdb);
|
||||||
tsdbCloseBCache(pTsdb);
|
tsdbCloseBCache(pTsdb);
|
||||||
|
tsdbClosePgCache(pTsdb);
|
||||||
tsdbCloseRocksCache(pTsdb);
|
tsdbCloseRocksCache(pTsdb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3057,7 +3099,6 @@ static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
||||||
// int64_t size = 4096;
|
|
||||||
code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock);
|
code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// taosMemoryFree(pBlock);
|
// taosMemoryFree(pBlock);
|
||||||
|
@ -3123,10 +3164,42 @@ int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle)
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h) {
|
/*
 * Look up one file page in the S3 page cache.
 *
 * pCache  cache to search
 * pFD     file descriptor; its fid and cid are part of the cache key
 * pgno    page number to look up; also part of the cache key
 * handle  out: result of the cache lookup. Callers in this file treat a NULL
 *         handle as a cache miss and release a non-NULL handle with
 *         tsdbCacheRelease() once they are done with the page.
 *
 * Always returns 0.
 */
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
  int32_t code = 0;
  char    key[128] = {0};
  int     keyLen = 0;

  // The key must be derived from the requested page number (not pFD->blkno),
  // so that it matches the key tsdbCacheSetPageS3() stores pages under.
  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
  *handle = taosLRUCacheLookup(pCache, key, keyLen);

  return code;
}
|
||||||
|
|
||||||
|
/*
 * Insert a copy of one file page into the S3 page cache if it is not cached yet.
 *
 * pCache  cache to insert into (the caller in this file passes pFD->pTsdb->pgCache)
 * pFD     file descriptor; supplies fid/cid for the key, the page size, and
 *         the owning tsdb whose pgMutex serializes the lookup-then-insert
 * pgno    page number, part of the cache key
 * pPage   page bytes to copy; pFD->szPage bytes are read from it
 *
 * Returns 0 when the page is already cached, was inserted, or the cache
 * refused the insert (cache updates are best-effort).
 * Returns TSDB_CODE_OUT_OF_MEMORY when the private copy of the page could not
 * be allocated; nothing is cached in that case.
 */
int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
  int32_t    code = 0;
  char       key[128] = {0};
  int        keyLen = 0;
  LRUHandle *handle = NULL;

  getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);

  taosThreadMutexLock(&pFD->pTsdb->pgMutex);

  // Look up and insert on the same cache object so both always agree.
  handle = taosLRUCacheLookup(pCache, key, keyLen);
  if (!handle) {
    size_t   charge = pFD->szPage;
    uint8_t *pPg = taosMemoryMalloc(charge);
    if (pPg == NULL) {
      // Never memcpy into a failed allocation.
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(pPg, pPage, charge);

      // NOTE(review): confirm whether taosLRUCacheInsert runs the deleter on
      // pPg when it fails; if it does not, pPg leaks on that path.
      LRUStatus status =
          taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleteBCache, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
      if (status != TAOS_LRU_STATUS_OK) {
        // ignore cache updating if not ok
      }
    }
  }

  taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);

  // Only release a reference we actually hold.
  if (handle) {
    tsdbCacheRelease(pCache, handle);
  }

  return code;
}
|
||||||
|
|
|
@ -178,7 +178,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
|
||||||
pFD->blkno = (pgno + tsS3BlockSize - 1) / tsS3BlockSize;
|
pFD->blkno = (pgno + tsS3BlockSize - 1) / tsS3BlockSize;
|
||||||
code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle);
|
code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle);
|
||||||
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
|
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
|
||||||
tsdbBCacheRelease(pFD->pTsdb->bCache, handle);
|
tsdbCacheRelease(pFD->pTsdb->bCache, handle);
|
||||||
if (code == TSDB_CODE_SUCCESS && !handle) {
|
if (code == TSDB_CODE_SUCCESS && !handle) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
@ -190,7 +190,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
|
||||||
int64_t blk_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
int64_t blk_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
||||||
memcpy(pFD->pBuf, pBlock + (offset - blk_offset), pFD->szPage);
|
memcpy(pFD->pBuf, pBlock + (offset - blk_offset), pFD->szPage);
|
||||||
|
|
||||||
tsdbBCacheRelease(pFD->pTsdb->bCache, handle);
|
tsdbCacheRelease(pFD->pTsdb->bCache, handle);
|
||||||
} else {
|
} else {
|
||||||
// seek
|
// seek
|
||||||
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
|
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
|
||||||
|
@ -254,7 +254,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
|
static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t n = 0;
|
int64_t n = 0;
|
||||||
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
|
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
|
||||||
|
@ -283,6 +283,117 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
 * Read `size` logical bytes starting at logical `offset` from an S3-backed
 * file into pBuf, going through the page cache.
 *
 * Phase 1 serves consecutive pages from pFD->pBuf (when it already holds the
 * page) or from the page cache, until the request is satisfied or a page is
 * not cached.
 * Phase 2 fetches every remaining page with a single s3GetObjectBlock() call,
 * verifies each page, stores verified pages in the page cache, and delivers
 * their content.
 *
 * pFD->pBuf / pFD->pgno are updated to the last page that was delivered.
 *
 * Returns 0 on success, TSDB_CODE_FILE_CORRUPTED when a page (pgno > 1) fails
 * its checksum, or the error code from the cache lookup / S3 fetch.
 */
static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
  int32_t code = 0;
  int64_t n = 0;
  int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
  int64_t bOffset = fOffset % pFD->szPage;

  ASSERT(bOffset < szPgCont);

  // 1, deliver pages that are already local (pFD->pBuf or page cache)
  while (n < size) {
    if (pFD->pgno != pgno) {
      LRUHandle *handle = NULL;
      code = tsdbCacheGetPageS3(pFD->pTsdb->pgCache, pFD, pgno, &handle);
      if (code != TSDB_CODE_SUCCESS) {
        if (handle) {
          tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
        }
        goto _exit;
      }

      if (!handle) {
        // cache miss: everything from pgno onwards is fetched from S3 below
        break;
      }

      uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle);
      memcpy(pFD->pBuf, pPage, pFD->szPage);
      tsdbCacheRelease(pFD->pTsdb->pgCache, handle);

      // check
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
        code = TSDB_CODE_FILE_CORRUPTED;
        goto _exit;
      }

      pFD->pgno = pgno;
    }

    int64_t nRead = TMIN(szPgCont - bOffset, size - n);
    memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);

    n += nRead;
    pgno++;
    bOffset = 0;
  }

  if (n < size) {
    // 2, retrieve the remaining pages from s3
    uint8_t *pBlock = NULL;
    int64_t  retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
    // bOffset is still non-zero when the very first page missed the cache; the
    // skipped leading bytes of that page must be counted, otherwise a read
    // that crosses a page boundary fetches one page too few.
    int64_t pgnoEnd = pgno - 1 + (bOffset + size - n + szPgCont - 1) / szPgCont;
    int64_t nPage = pgnoEnd - pgno + 1;
    int64_t retrieve_size = nPage * pFD->szPage;

    code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, &pBlock);
    if (code != TSDB_CODE_SUCCESS) {
      goto _exit;
    }

    // 3, verify, cache and deliver each fetched page
    for (int64_t i = 0; i < nPage; ++i) {
      uint8_t *pPage = pBlock + i * pFD->szPage;

      memcpy(pFD->pBuf, pPage, pFD->szPage);

      // check
      if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
        code = TSDB_CODE_FILE_CORRUPTED;
        break;  // fall through to the free below instead of leaking pBlock
      }

      pFD->pgno = pgno;

      // Cache only verified pages; caching is best-effort, so a failure to
      // cache does not fail the read.
      (void)tsdbCacheSetPageS3(pFD->pTsdb->pgCache, pFD, pgno, pPage);

      int64_t nRead = TMIN(szPgCont - bOffset, size - n);
      memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);

      n += nRead;
      pgno++;
      bOffset = 0;
    }

    taosMemoryFree(pBlock);
  }

_exit:
  return code;
}
|
||||||
|
|
||||||
|
/*
 * Public read entry point: read `size` bytes at logical `offset` into pBuf.
 *
 * Opens the underlying file first when pFD->pFD is not set and returns the
 * open error, if any. The read is then dispatched to tsdbReadFileS3() when
 * the file is an S3 file and tsS3BlockSize is negative, and to
 * tsdbReadFileImp() in every other case.
 *
 * Returns the code of the open step or of the selected reader.
 */
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
  if (!pFD->pFD) {
    int32_t openCode = tsdbOpenFileImpl(pFD);
    if (openCode) {
      return openCode;
    }
  }

  if (!(pFD->s3File && tsS3BlockSize < 0)) {
    return tsdbReadFileImp(pFD, offset, pBuf, size);
  }

  return tsdbReadFileS3(pFD, offset, pBuf, size);
}
|
||||||
|
|
||||||
int32_t tsdbFsyncFile(STsdbFD *pFD) {
|
int32_t tsdbFsyncFile(STsdbFD *pFD) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue