diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 8971c1312c..94f85f5007 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -264,6 +264,9 @@ char tsS3BucketName[TSDB_FQDN_LEN] = ""; char tsS3AppId[TSDB_FQDN_LEN] = ""; int8_t tsS3Enabled = false; +int32_t tsS3BlockSize = 4096; // number of tsdb pages +int32_t tsS3BlockCacheSize = 16; // number of blocks + int32_t tsCheckpointInterval = 20; #ifndef _STORAGE @@ -321,7 +324,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { return 0; } -struct SConfig *taosGetCfg() { return tsCfg; } +struct SConfig *taosGetCfg() { + return tsCfg; +} static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, char *apolloUrl) { diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ab6a7fb88b..a47a058046 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -380,6 +380,8 @@ struct STsdb { TdThreadMutex lruMutex; SLRUCache *biCache; TdThreadMutex biMutex; + SLRUCache *bCache; + TdThreadMutex bMutex; struct STFileSystem *pFS; // new SRocksCache rCache; }; @@ -643,13 +645,19 @@ struct SRowMerger { }; typedef struct { - char *path; - int32_t szPage; - int32_t flag; - TdFilePtr pFD; - int64_t pgno; - uint8_t *pBuf; - int64_t szFile; + char *path; + int32_t szPage; + int32_t flag; + TdFilePtr pFD; + int64_t pgno; + uint8_t *pBuf; + int64_t szFile; + STsdb *pTsdb; + const char *objName; + uint8_t s3File; + int32_t fid; + int64_t cid; + int64_t blkno; } STsdbFD; struct SDelFWriter { @@ -716,9 +724,9 @@ typedef struct SSttBlockLoadCostInfo { } SSttBlockLoadCostInfo; typedef struct SSttBlockLoadInfo { - SBlockData blockData[2]; // buffered block data - int32_t statisBlockIndex; // buffered statistics block index - void *statisBlock; // buffered statistics block data + SBlockData blockData[2]; // buffered block data + int32_t statisBlockIndex; // buffered statistics block index + void *statisBlock; // buffered statistics block data void *pSttStatisBlkArray; SArray *aSttBlk; int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. diff --git a/source/dnode/vnode/src/inc/vndCos.h b/source/dnode/vnode/src/inc/vndCos.h index 51d214518a..a838cd4acf 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/source/dnode/vnode/src/inc/vndCos.h @@ -22,7 +22,11 @@ extern "C" { #endif -extern int8_t tsS3Enabled; +#define S3_BLOCK_CACHE + +extern int8_t tsS3Enabled; +extern int32_t tsS3BlockSize; +extern int32_t tsS3BlockCacheSize; int32_t s3Init(); void s3CleanUp(); @@ -31,6 +35,7 @@ void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); bool s3Get(const char *object_name, const char *path); +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t *pBlock); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 89bdc085a3..09bcba9b61 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -16,6 +16,7 @@ #include "tsdbDataFileRW.h" #include "tsdbReadUtil.h" #include "vnd.h" +#include "vndCos.h" #define ROCKS_BATCH_SIZE (4096) @@ -51,6 +52,41 @@ static void tsdbCloseBICache(STsdb *pTsdb) { } } +static int32_t tsdbOpenBCache(STsdb *pTsdb) { + int32_t code = 0; + // SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5); + int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; + + SLRUCache *pCache = taosLRUCacheInit(tsS3BlockCacheSize * tsS3BlockSize * szPage, 0, .5); + if (pCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + taosLRUCacheSetStrictCapacity(pCache, false); + + taosThreadMutexInit(&pTsdb->bMutex, NULL); + +_err: + pTsdb->bCache = pCache; + return code; +} + +static void tsdbCloseBCache(STsdb *pTsdb) { + SLRUCache *pCache = pTsdb->bCache; + if (pCache) { + int32_t elems = taosLRUCacheGetElems(pCache); + tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems); + taosLRUCacheEraseUnrefEntries(pCache); + elems = taosLRUCacheGetElems(pCache); + tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems); + + taosLRUCacheCleanup(pCache); + + taosThreadMutexDestroy(&pTsdb->bMutex); + } +} + #define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t)) typedef struct { @@ -1149,6 +1185,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { goto _err; } + code = tsdbOpenBCache(pTsdb); + if (code != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + code = tsdbOpenRocksCache(pTsdb); if (code != TSDB_CODE_SUCCESS) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1178,6 +1220,7 @@ void tsdbCloseCache(STsdb *pTsdb) { } tsdbCloseBICache(pTsdb); + tsdbCloseBCache(pTsdb); tsdbCloseRocksCache(pTsdb); } @@ -2987,3 +3030,98 @@ int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) { return code; } + +// block cache +static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) { + struct { + int32_t fid; + int64_t commitID; + int64_t blkno; + } bKey = {0}; + + bKey.fid = fid; + bKey.commitID = commitID; + bKey.blkno = blkno; + + *len = sizeof(bKey); + memcpy(key, &bKey, *len); +} + +static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) { + int32_t code = 0; + uint8_t *pBlock = taosMemoryCalloc(1, tsS3BlockSize * pFD->szPage); + if (pBlock == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; + code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, pBlock); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pBlock); + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + + *ppBlock = pBlock; + + tsdbTrace("block:%p load from s3", pBlock); + +_exit: + return code; +} + +static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) { + (void)ud; + uint8_t *pBlock = (uint8_t *)value; + + taosMemoryFree(pBlock); +} + +int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) { + int32_t code = 0; + char key[128] = {0}; + int keyLen = 0; + + getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (!h) { + STsdb *pTsdb = pFD->pTsdb; + taosThreadMutexLock(&pTsdb->bMutex); + + h = taosLRUCacheLookup(pCache, key, keyLen); + if (!h) { + uint8_t *pBlock = NULL; + code = tsdbCacheLoadBlockS3(pFD, &pBlock); + // if table's empty or error, return code of -1 + if (code != TSDB_CODE_SUCCESS || pBlock == NULL) { + taosThreadMutexUnlock(&pTsdb->bMutex); + + *handle = NULL; + return 0; + } + + size_t charge = tsS3BlockSize * pFD->szPage; + _taos_lru_deleter_t deleter = deleteBCache; + LRUStatus status = + taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + } + + taosThreadMutexUnlock(&pTsdb->bMutex); + } + + *handle = h; + + return code; +} + +int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h) { + int32_t code = 0; + + taosLRUCacheRelease(pCache, h, false); + + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 974b7f1b76..cbb0215370 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -28,6 +28,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { const char *object_name = taosDirEntryBaseName((char *)path); long s3_size = tsS3Enabled ? s3Size(object_name) : 0; if (tsS3Enabled && !strncmp(path + strlen(path) - 5, ".data", 5) && s3_size > 0) { +#ifndef S3_BLOCK_CACHE s3EvictCache(path, s3_size); s3Get(object_name, path); @@ -38,6 +39,13 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { // taosMemoryFree(pFD); goto _exit; } +#else + pFD->s3File = 1; + pFD->pFD = (TdFilePtr)&pFD->s3File; + int32_t vid = 0; + sscanf(object_name, "v%df%dver%lld.data", &vid, &pFD->fid, &pFD->cid); + pFD->objName = object_name; +#endif } else { code = TAOS_SYSTEM_ERROR(errsv); // taosMemoryFree(pFD); @@ -101,7 +109,9 @@ void tsdbCloseFile(STsdbFD **ppFD) { STsdbFD *pFD = *ppFD; if (pFD) { taosMemoryFree(pFD->pBuf); - taosCloseFile(&pFD->pFD); + if (!pFD->s3File) { + taosCloseFile(&pFD->pFD); + } taosMemoryFree(pFD); *ppFD = NULL; } @@ -153,6 +163,18 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) { } } + if (pFD->s3File) { + // 1. convert pgno to block no, + // block_size(# of pages) & block_cache_size (# of blocks) + // block_no = pgno / block_size + 1; + // block_offset = (block_no - 1) * block_size * pFD->szPage + // 2, lookup block cache to fetch block + // 3, if found, memcpy page from block + // 4, if not found, download block from s3 + // check pg checksum in the block + // insert into block cache and goto step 3. + } + // seek int64_t offset = PAGE_OFFSET(pgno, pFD->szPage); int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET); diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index 7e95a55077..86bdb218bb 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -217,6 +217,77 @@ bool s3Get(const char *object_name, const char *path) { return ret; } +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t *pBlock) { + int32_t code = 0; + cos_pool_t *p = NULL; + int is_cname = 0; + cos_status_t *s = NULL; + cos_request_options_t *options = NULL; + cos_string_t bucket; + cos_string_t object; + cos_table_t *resp_headers; + cos_table_t *headers = NULL; + cos_buf_t *content = NULL; + // cos_string_t file; + // int traffic_limit = 0; + char range_buf[64]; + + //创建内存池 + cos_pool_create(&p, NULL); + + //初始化请求选项 + options = cos_request_options_create(p); + // init_test_request_options(options, is_cname); + s3InitRequestOptions(options, is_cname); + cos_str_set(&bucket, tsS3BucketName); + cos_str_set(&object, object_name); + cos_list_t download_buffer; + cos_list_init(&download_buffer); + /* + if (traffic_limit) { + // 限速值设置范围为819200 - 838860800,单位默认为 bit/s,即800Kb/s - 800Mb/s,如果超出该范围将返回400错误 + headers = cos_table_make(p, 1); + cos_table_add_int(headers, "x-cos-traffic-limit", 819200); + } + */ + + headers = cos_table_create_if_null(options, headers, 1); + apr_snprintf(range_buf, sizeof(range_buf), "bytes=%" APR_INT64_T_FMT "-%" APR_INT64_T_FMT, offset, + offset + block_size - 1); + apr_table_add(headers, COS_RANGE, range_buf); + + s = cos_get_object_to_buffer(options, &bucket, &object, headers, NULL, &download_buffer, &resp_headers); + if (!cos_status_is_ok(s)) { + vError("s3: %s", s->error_msg); + vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + code = terrno; + return code; + } + + log_status(s); + // print_headers(resp_headers); + int64_t len = 0; + int64_t size = 0; + int64_t pos = 0; + cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) { len += cos_buf_size(content); } + // char *buf = cos_pcalloc(p, (apr_size_t)(len + 1)); + char *buf = taosMemoryCalloc(1, (apr_size_t)(len)); + // buf[len] = '\0'; + cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) { + size = cos_buf_size(content); + memcpy(buf + pos, content->pos, (size_t)size); + pos += size; + } + // cos_warn_log("Download data=%s", buf); + + //销毁内存池 + cos_pool_destroy(p); + + pBlock = buf; + + return code; +} + typedef struct { int64_t size; int32_t atime; @@ -337,6 +408,7 @@ void s3DeleteObjectsByPrefix(const char *prefix) {} void s3DeleteObjects(const char *object_name[], int nobject) {} bool s3Exists(const char *object_name) { return false; } bool s3Get(const char *object_name, const char *path) { return false; } +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t *pBlock) { return 0; } void s3EvictCache(const char *path, long object_size) {} long s3Size(const char *object_name) { return 0; }