s3/block_cache: new tsdb block cache

This commit is contained in:
Minglei Jin 2023-09-15 16:19:21 +08:00
parent c171311fc8
commit 318281356b
6 changed files with 263 additions and 13 deletions

View File

@ -264,6 +264,9 @@ char tsS3BucketName[TSDB_FQDN_LEN] = "<bucketname>";
char tsS3AppId[TSDB_FQDN_LEN] = "<appid>";
// Gate for the S3 code paths; tsdbOpenFileImpl() only queries S3 when this is non-zero.
int8_t tsS3Enabled = false;
int32_t tsS3BlockSize = 4096; // size of one S3 cache block, in tsdb pages (bytes = tsS3BlockSize * page size)
int32_t tsS3BlockCacheSize = 16; // capacity of the per-tsdb S3 block cache, in blocks
int32_t tsCheckpointInterval = 20;
#ifndef _STORAGE
@ -321,7 +324,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
return 0;
}
struct SConfig *taosGetCfg() { return tsCfg; }
// Accessor that returns tsCfg (declared outside this view) to callers.
struct SConfig *taosGetCfg() {
return tsCfg;
}
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
char *apolloUrl) {

View File

@ -380,6 +380,8 @@ struct STsdb {
TdThreadMutex lruMutex;
SLRUCache *biCache;
TdThreadMutex biMutex;
SLRUCache *bCache;
TdThreadMutex bMutex;
struct STFileSystem *pFS; // new
SRocksCache rCache;
};
@ -643,13 +645,19 @@ struct SRowMerger {
};
// Descriptor for a tsdb file that is read page by page. The trailing members
// (pTsdb .. blkno) carry the state needed when the file is served from S3
// through the block cache instead of a local file.
typedef struct {
char *path;
int32_t szPage;
int32_t flag;
TdFilePtr pFD;
int64_t pgno;
uint8_t *pBuf;
int64_t szFile;
char *path;
int32_t szPage; // page size in bytes; multiplied by tsS3BlockSize to get the S3 block size
int32_t flag;
TdFilePtr pFD; // for S3-backed files this is set to &s3File rather than a real file handle
int64_t pgno;
uint8_t *pBuf;
int64_t szFile;
STsdb *pTsdb; // owning tsdb; tsdbCacheGetBlockS3() locks its bMutex
const char *objName; // S3 object name, assigned in tsdbOpenFileImpl() for S3-backed files
uint8_t s3File; // set to 1 for S3-backed files; tsdbCloseFile() skips taosCloseFile() when set
int32_t fid; // parsed from the object name with "v%df%dver%lld.data"
int64_t cid; // parsed from the object name; passed as commitID when building the block cache key
int64_t blkno; // 1-based S3 block number: byte offset = (blkno - 1) * tsS3BlockSize * szPage
} STsdbFD;
struct SDelFWriter {
@ -716,9 +724,9 @@ typedef struct SSttBlockLoadCostInfo {
} SSttBlockLoadCostInfo;
typedef struct SSttBlockLoadInfo {
SBlockData blockData[2]; // buffered block data
int32_t statisBlockIndex; // buffered statistics block index
void *statisBlock; // buffered statistics block data
SBlockData blockData[2]; // buffered block data
int32_t statisBlockIndex; // buffered statistics block index
void *statisBlock; // buffered statistics block data
void *pSttStatisBlkArray;
SArray *aSttBlk;
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.

View File

@ -22,7 +22,11 @@
extern "C" {
#endif
extern int8_t tsS3Enabled;
// When defined, tsdbOpenFileImpl() marks an S3-resident .data file as S3-backed
// (reads go through the block cache) instead of evicting and downloading the
// whole object with s3EvictCache()/s3Get().
#define S3_BLOCK_CACHE
extern int8_t tsS3Enabled;
extern int32_t tsS3BlockSize;       // pages per S3 cache block
extern int32_t tsS3BlockCacheSize;  // blocks held by the S3 block cache
int32_t s3Init();
void s3CleanUp();
@ -31,6 +35,7 @@ void s3DeleteObjectsByPrefix(const char *prefix);
void s3DeleteObjects(const char *object_name[], int nobject);
bool s3Exists(const char *object_name);
bool s3Get(const char *object_name, const char *path);
// Ranged read of one object: requests bytes [offset, offset + size - 1] of
// object_name. pBlock is supplied by the caller; tsdbCacheLoadBlockS3() passes
// a zero-initialised buffer of `size` bytes. Returns 0 on success.
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t *pBlock);
void s3EvictCache(const char *path, long object_size);
long s3Size(const char *object_name);

View File

@ -16,6 +16,7 @@
#include "tsdbDataFileRW.h"
#include "tsdbReadUtil.h"
#include "vnd.h"
#include "vndCos.h"
#define ROCKS_BATCH_SIZE (4096)
@ -51,6 +52,41 @@ static void tsdbCloseBICache(STsdb *pTsdb) {
}
}
/*
 * Create the LRU cache that holds data blocks fetched from S3 for this tsdb,
 * together with the mutex (bMutex) that serialises block loading.
 *
 * The capacity in bytes is
 *   tsS3BlockCacheSize (blocks) * tsS3BlockSize (pages per block) * page size.
 *
 * pTsdb->bCache is always assigned: the new cache on success, NULL on failure,
 * so tsdbCloseBCache() can tell whether there is anything to tear down.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the cache cannot be created.
 */
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;

  // Widen before multiplying: none of the three factors is a compile-time
  // constant, and an int32_t product overflows once it exceeds 2 GiB.
  size_t capacity = (size_t)tsS3BlockCacheSize * (size_t)tsS3BlockSize * (size_t)szPage;

  SLRUCache *pCache = taosLRUCacheInit(capacity, 0, .5);
  if (pCache == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  taosLRUCacheSetStrictCapacity(pCache, false);

  taosThreadMutexInit(&pTsdb->bMutex, NULL);

_err:
  // Reached on both paths; on failure pCache is NULL.
  pTsdb->bCache = pCache;
  return code;
}
/*
 * Tear down the S3 block cache of a tsdb, if one was created: trace the entry
 * count, drop every unreferenced entry, trace the count again, then destroy
 * the cache and its mutex. Does nothing when pTsdb->bCache is NULL.
 */
static void tsdbCloseBCache(STsdb *pTsdb) {
  SLRUCache *bCache = pTsdb->bCache;
  if (bCache == NULL) {
    return;
  }

  int32_t nBefore = taosLRUCacheGetElems(bCache);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), nBefore);

  taosLRUCacheEraseUnrefEntries(bCache);

  int32_t nAfter = taosLRUCacheGetElems(bCache);
  tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), nAfter);

  taosLRUCacheCleanup(bCache);
  taosThreadMutexDestroy(&pTsdb->bMutex);
}
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
typedef struct {
@ -1149,6 +1185,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
goto _err;
}
code = tsdbOpenBCache(pTsdb);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tsdbOpenRocksCache(pTsdb);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -1178,6 +1220,7 @@ void tsdbCloseCache(STsdb *pTsdb) {
}
tsdbCloseBICache(pTsdb);
tsdbCloseBCache(pTsdb);
tsdbCloseRocksCache(pTsdb);
}
@ -2987,3 +3030,98 @@ int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) {
return code;
}
// block cache
/*
 * Build the block-cache lookup key for (fid, commitID, blkno).
 *
 * The three values are written back to back into `key` in that order, each in
 * the host's native representation, and `*len` receives the number of bytes
 * written (sizeof(int32_t) + 2 * sizeof(int64_t)).
 *
 * The fields are copied individually rather than as one struct: a struct with
 * an int32_t followed by an int64_t contains padding bytes whose values are
 * unspecified, and a key used for byte-wise comparison must not include them.
 *
 * `key` must have room for at least 20 bytes.
 */
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
  char *p = key;

  memcpy(p, &fid, sizeof(fid));
  p += sizeof(fid);

  memcpy(p, &commitID, sizeof(commitID));
  p += sizeof(commitID);

  memcpy(p, &blkno, sizeof(blkno));
  p += sizeof(blkno);

  *len = (int)(p - key);
}
/*
 * Download the S3 block pFD->blkno of the object pFD->objName into a freshly
 * allocated, zero-initialised buffer.
 *
 * A block is tsS3BlockSize pages of pFD->szPage bytes. Block numbers are
 * 1-based, so the byte offset is (blkno - 1) * block size.
 *
 * On success returns 0 and stores the buffer in *ppBlock; the caller takes
 * ownership and releases it with taosMemoryFree(). On failure *ppBlock is left
 * untouched and the error is returned: TSDB_CODE_OUT_OF_MEMORY when the
 * buffer cannot be allocated, otherwise the code from s3GetObjectBlock().
 */
static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
  int32_t code = 0;
  // Computed once in 64 bits so a large block/page configuration cannot overflow.
  int64_t szBlock = (int64_t)tsS3BlockSize * pFD->szPage;
  int64_t block_offset = (pFD->blkno - 1) * szBlock;

  uint8_t *pBlock = taosMemoryCalloc(1, szBlock);
  if (pBlock == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  code = s3GetObjectBlock(pFD->objName, block_offset, szBlock, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    // Keep the download error as-is; it is not an allocation failure.
    taosMemoryFree(pBlock);
    goto _exit;
  }

  *ppBlock = pBlock;

  tsdbTrace("block:%p load from s3", pBlock);

_exit:
  return code;
}
/*
 * Deleter passed to the block cache on insert: frees the cached block buffer
 * (`value`). The key, its length and the user data pointer are not used.
 */
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
  (void)ud;

  taosMemoryFree((uint8_t *)value);
}
/*
 * Return a cache handle for the S3 block (pFD->fid, pFD->cid, pFD->blkno),
 * downloading the block and inserting it into pCache when it is not cached.
 *
 * The cache is looked up once without the lock; on a miss pTsdb->bMutex is
 * taken and the lookup is repeated before the block is downloaded, so a block
 * that another thread inserted in the meantime is reused.
 *
 * On success returns 0 and stores the handle in *handle (presumably to be
 * released with tsdbBCacheRelease() - confirm against callers).
 * On failure *handle is NULL and a non-zero code is returned:
 *   - the error from tsdbCacheLoadBlockS3() when the download fails,
 *   - -1 when the cache refuses the insert.
 */
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
  int32_t code = 0;
  char    key[128] = {0};
  int     keyLen = 0;

  getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);
  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
  if (!h) {
    STsdb *pTsdb = pFD->pTsdb;
    taosThreadMutexLock(&pTsdb->bMutex);

    h = taosLRUCacheLookup(pCache, key, keyLen);
    if (!h) {
      uint8_t *pBlock = NULL;
      code = tsdbCacheLoadBlockS3(pFD, &pBlock);
      if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
        taosThreadMutexUnlock(&pTsdb->bMutex);

        *handle = NULL;
        // Report the failure instead of returning success with a NULL handle.
        // A NULL block with a success code is not expected; treat it as an
        // allocation failure so the caller still sees an error.
        if (code == TSDB_CODE_SUCCESS) {
          code = TSDB_CODE_OUT_OF_MEMORY;
        }
        return code;
      }

      // The entry is charged its full block size against the cache capacity.
      size_t              charge = (size_t)tsS3BlockSize * (size_t)pFD->szPage;
      _taos_lru_deleter_t deleter = deleteBCache;
      LRUStatus           status =
          taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL);
      if (status != TAOS_LRU_STATUS_OK) {
        code = -1;
      }
    }

    taosThreadMutexUnlock(&pTsdb->bMutex);
  }

  *handle = h;

  return code;
}
/*
 * Release a block-cache handle via taosLRUCacheRelease(), passing `false` as
 * its third argument. Always returns 0.
 */
int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h) {
  taosLRUCacheRelease(pCache, h, false);

  return 0;
}

View File

@ -28,6 +28,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
const char *object_name = taosDirEntryBaseName((char *)path);
long s3_size = tsS3Enabled ? s3Size(object_name) : 0;
if (tsS3Enabled && !strncmp(path + strlen(path) - 5, ".data", 5) && s3_size > 0) {
#ifndef S3_BLOCK_CACHE
s3EvictCache(path, s3_size);
s3Get(object_name, path);
@ -38,6 +39,13 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
// taosMemoryFree(pFD);
goto _exit;
}
#else
pFD->s3File = 1;
pFD->pFD = (TdFilePtr)&pFD->s3File;
int32_t vid = 0;
sscanf(object_name, "v%df%dver%lld.data", &vid, &pFD->fid, &pFD->cid);
pFD->objName = object_name;
#endif
} else {
code = TAOS_SYSTEM_ERROR(errsv);
// taosMemoryFree(pFD);
@ -101,7 +109,9 @@ void tsdbCloseFile(STsdbFD **ppFD) {
STsdbFD *pFD = *ppFD;
if (pFD) {
taosMemoryFree(pFD->pBuf);
taosCloseFile(&pFD->pFD);
if (!pFD->s3File) {
taosCloseFile(&pFD->pFD);
}
taosMemoryFree(pFD);
*ppFD = NULL;
}
@ -153,6 +163,18 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
}
}
if (pFD->s3File) {
// 1. convert pgno to block no,
// block_size(# of pages) & block_cache_size (# of blocks)
// block_no = pgno / block_size + 1;
// block_offset = (block_no - 1) * block_size * pFD->szPage
// 2, lookup block cache to fetch block
// 3, if found, memcpy page from block
// 4, if not found, download block from s3
// check pg checksum in the block
// insert into block cache and goto step 3.
}
// seek
int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);

View File

@ -217,6 +217,77 @@ bool s3Get(const char *object_name, const char *path) {
return ret;
}
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t *pBlock) {
int32_t code = 0;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket;
cos_string_t object;
cos_table_t *resp_headers;
cos_table_t *headers = NULL;
cos_buf_t *content = NULL;
// cos_string_t file;
// int traffic_limit = 0;
char range_buf[64];
//创建内存池
cos_pool_create(&p, NULL);
//初始化请求选项
options = cos_request_options_create(p);
// init_test_request_options(options, is_cname);
s3InitRequestOptions(options, is_cname);
cos_str_set(&bucket, tsS3BucketName);
cos_str_set(&object, object_name);
cos_list_t download_buffer;
cos_list_init(&download_buffer);
/*
if (traffic_limit) {
// 限速值设置范围为819200 - 838860800单位默认为 bit/s即800Kb/s - 800Mb/s如果超出该范围将返回400错误
headers = cos_table_make(p, 1);
cos_table_add_int(headers, "x-cos-traffic-limit", 819200);
}
*/
headers = cos_table_create_if_null(options, headers, 1);
apr_snprintf(range_buf, sizeof(range_buf), "bytes=%" APR_INT64_T_FMT "-%" APR_INT64_T_FMT, offset,
offset + block_size - 1);
apr_table_add(headers, COS_RANGE, range_buf);
s = cos_get_object_to_buffer(options, &bucket, &object, headers, NULL, &download_buffer, &resp_headers);
if (!cos_status_is_ok(s)) {
vError("s3: %s", s->error_msg);
vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
code = terrno;
return code;
}
log_status(s);
// print_headers(resp_headers);
int64_t len = 0;
int64_t size = 0;
int64_t pos = 0;
cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) { len += cos_buf_size(content); }
// char *buf = cos_pcalloc(p, (apr_size_t)(len + 1));
char *buf = taosMemoryCalloc(1, (apr_size_t)(len));
// buf[len] = '\0';
cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) {
size = cos_buf_size(content);
memcpy(buf + pos, content->pos, (size_t)size);
pos += size;
}
// cos_warn_log("Download data=%s", buf);
//销毁内存池
cos_pool_destroy(p);
pBlock = buf;
return code;
}
typedef struct {
int64_t size;
int32_t atime;
@ -337,6 +408,7 @@ void s3DeleteObjectsByPrefix(const char *prefix) {}
// Stub implementations of the S3 API: each one does nothing and returns a
// fixed value (false / 0). Presumably these are compiled when S3 support is
// not built in - confirm against the enclosing preprocessor condition.
void s3DeleteObjects(const char *object_name[], int nobject) {}
bool s3Exists(const char *object_name) { return false; }
bool s3Get(const char *object_name, const char *path) { return false; }
// NOTE(review): reports success (0) without writing anything to pBlock.
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t *pBlock) { return 0; }
void s3EvictCache(const char *path, long object_size) {}
long s3Size(const char *object_name) { return 0; }