From 318281356b265be992c723856d265ae316f06a53 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 15 Sep 2023 16:19:21 +0800 Subject: [PATCH 1/7] s3/block_cache: new tsdb block cache --- source/common/src/tglobal.c | 7 +- source/dnode/vnode/src/inc/tsdb.h | 28 ++-- source/dnode/vnode/src/inc/vndCos.h | 7 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 138 ++++++++++++++++++ .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 24 ++- source/dnode/vnode/src/vnd/vnodeCos.c | 72 +++++++++ 6 files changed, 263 insertions(+), 13 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 8971c1312c..94f85f5007 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -264,6 +264,9 @@ char tsS3BucketName[TSDB_FQDN_LEN] = ""; char tsS3AppId[TSDB_FQDN_LEN] = ""; int8_t tsS3Enabled = false; +int32_t tsS3BlockSize = 4096; // number of tsdb pages +int32_t tsS3BlockCacheSize = 16; // number of blocks + int32_t tsCheckpointInterval = 20; #ifndef _STORAGE @@ -321,7 +324,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { return 0; } -struct SConfig *taosGetCfg() { return tsCfg; } +struct SConfig *taosGetCfg() { + return tsCfg; +} static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, char *apolloUrl) { diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ab6a7fb88b..a47a058046 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -380,6 +380,8 @@ struct STsdb { TdThreadMutex lruMutex; SLRUCache *biCache; TdThreadMutex biMutex; + SLRUCache *bCache; + TdThreadMutex bMutex; struct STFileSystem *pFS; // new SRocksCache rCache; }; @@ -643,13 +645,19 @@ struct SRowMerger { }; typedef struct { - char *path; - int32_t szPage; - int32_t flag; - TdFilePtr pFD; - int64_t pgno; - uint8_t *pBuf; - int64_t szFile; + char *path; + int32_t szPage; + int32_t flag; + TdFilePtr pFD; + int64_t pgno; + uint8_t *pBuf; + int64_t szFile; + STsdb *pTsdb; + const char *objName; + uint8_t s3File; + int32_t fid; + int64_t cid; + int64_t blkno; } STsdbFD; struct SDelFWriter { @@ -716,9 +724,9 @@ typedef struct SSttBlockLoadCostInfo { } SSttBlockLoadCostInfo; typedef struct SSttBlockLoadInfo { - SBlockData blockData[2]; // buffered block data - int32_t statisBlockIndex; // buffered statistics block index - void *statisBlock; // buffered statistics block data + SBlockData blockData[2]; // buffered block data + int32_t statisBlockIndex; // buffered statistics block index + void *statisBlock; // buffered statistics block data void *pSttStatisBlkArray; SArray *aSttBlk; int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. diff --git a/source/dnode/vnode/src/inc/vndCos.h b/source/dnode/vnode/src/inc/vndCos.h index 51d214518a..a838cd4acf 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/source/dnode/vnode/src/inc/vndCos.h @@ -22,7 +22,11 @@ extern "C" { #endif -extern int8_t tsS3Enabled; +#define S3_BLOCK_CACHE + +extern int8_t tsS3Enabled; +extern int32_t tsS3BlockSize; +extern int32_t tsS3BlockCacheSize; int32_t s3Init(); void s3CleanUp(); @@ -31,6 +35,7 @@ void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); bool s3Get(const char *object_name, const char *path); +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t *pBlock); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 89bdc085a3..09bcba9b61 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -16,6 +16,7 @@ #include "tsdbDataFileRW.h" #include "tsdbReadUtil.h" #include "vnd.h" +#include "vndCos.h" #define ROCKS_BATCH_SIZE (4096) @@ -51,6 +52,41 @@ static void tsdbCloseBICache(STsdb *pTsdb) { } } +static int32_t tsdbOpenBCache(STsdb *pTsdb) { + int32_t code = 0; + // SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5); + int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; + + SLRUCache *pCache = taosLRUCacheInit(tsS3BlockCacheSize * tsS3BlockSize * szPage, 0, .5); + if (pCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + taosLRUCacheSetStrictCapacity(pCache, false); + + taosThreadMutexInit(&pTsdb->bMutex, NULL); + +_err: + pTsdb->bCache = pCache; + return code; +} + +static void tsdbCloseBCache(STsdb *pTsdb) { + SLRUCache *pCache = pTsdb->bCache; + if (pCache) { + int32_t elems = taosLRUCacheGetElems(pCache); + tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems); + taosLRUCacheEraseUnrefEntries(pCache); + elems = taosLRUCacheGetElems(pCache); + tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems); + + taosLRUCacheCleanup(pCache); + + taosThreadMutexDestroy(&pTsdb->bMutex); + } +} + #define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t)) typedef struct { @@ -1149,6 +1185,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { goto _err; } + code = tsdbOpenBCache(pTsdb); + if (code != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + code = tsdbOpenRocksCache(pTsdb); if (code != TSDB_CODE_SUCCESS) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1178,6 +1220,7 @@ void tsdbCloseCache(STsdb *pTsdb) { } tsdbCloseBICache(pTsdb); + tsdbCloseBCache(pTsdb); tsdbCloseRocksCache(pTsdb); } @@ -2987,3 +3030,98 @@ int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) { return code; } + +// block cache +static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) { + struct { + int32_t fid; + int64_t commitID; + int64_t blkno; + } bKey = {0}; + + bKey.fid = fid; + bKey.commitID = commitID; + bKey.blkno = blkno; + + *len = sizeof(bKey); + memcpy(key, &bKey, *len); +} + +static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) { + int32_t code = 0; + uint8_t *pBlock = taosMemoryCalloc(1, tsS3BlockSize * pFD->szPage); + if (pBlock == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; + code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, pBlock); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pBlock); + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + + *ppBlock = pBlock; + + tsdbTrace("block:%p load from s3", pBlock); + +_exit: + return code; +} + +static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) { + (void)ud; + uint8_t *pBlock = (uint8_t *)value; + + taosMemoryFree(pBlock); +} + +int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) { + int32_t code = 0; + char key[128] = {0}; + int keyLen = 0; + + getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (!h) { + STsdb *pTsdb = pFD->pTsdb; + taosThreadMutexLock(&pTsdb->bMutex); + + h = taosLRUCacheLookup(pCache, key, keyLen); + if (!h) { + uint8_t *pBlock = NULL; + code = tsdbCacheLoadBlockS3(pFD, &pBlock); + // if table's empty or error, return code of -1 + if (code != TSDB_CODE_SUCCESS || pBlock == NULL) { + taosThreadMutexUnlock(&pTsdb->bMutex); + + *handle = NULL; + return 0; + } + + size_t charge = tsS3BlockSize * pFD->szPage; + _taos_lru_deleter_t deleter = deleteBCache; + LRUStatus status = + taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + } + + taosThreadMutexUnlock(&pTsdb->bMutex); + } + + *handle = h; + + return code; +} + +int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h) { + int32_t code = 0; + + taosLRUCacheRelease(pCache, h, false); + + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 974b7f1b76..cbb0215370 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -28,6 +28,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { const char *object_name = taosDirEntryBaseName((char *)path); long s3_size = tsS3Enabled ? s3Size(object_name) : 0; if (tsS3Enabled && !strncmp(path + strlen(path) - 5, ".data", 5) && s3_size > 0) { +#ifndef S3_BLOCK_CACHE s3EvictCache(path, s3_size); s3Get(object_name, path); @@ -38,6 +39,13 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { // taosMemoryFree(pFD); goto _exit; } +#else + pFD->s3File = 1; + pFD->pFD = (TdFilePtr)&pFD->s3File; + int32_t vid = 0; + sscanf(object_name, "v%df%dver%lld.data", &vid, &pFD->fid, &pFD->cid); + pFD->objName = object_name; +#endif } else { code = TAOS_SYSTEM_ERROR(errsv); // taosMemoryFree(pFD); @@ -101,7 +109,9 @@ void tsdbCloseFile(STsdbFD **ppFD) { STsdbFD *pFD = *ppFD; if (pFD) { taosMemoryFree(pFD->pBuf); - taosCloseFile(&pFD->pFD); + if (!pFD->s3File) { + taosCloseFile(&pFD->pFD); + } taosMemoryFree(pFD); *ppFD = NULL; } @@ -153,6 +163,18 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) { } } + if (pFD->s3File) { + // 1. convert pgno to block no, + // block_size(# of pages) & block_cache_size (# of blocks) + // block_no = pgno / block_size + 1; + // block_offset = (block_no - 1) * block_size * pFD->szPage + // 2, lookup block cache to fetch block + // 3, if found, memcpy page from block + // 4, if not found, download block from s3 + // check pg checksum in the block + // insert into block cache and goto step 3. + } + // seek int64_t offset = PAGE_OFFSET(pgno, pFD->szPage); int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET); diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index 7e95a55077..86bdb218bb 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -217,6 +217,77 @@ bool s3Get(const char *object_name, const char *path) { return ret; } +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t *pBlock) { + int32_t code = 0; + cos_pool_t *p = NULL; + int is_cname = 0; + cos_status_t *s = NULL; + cos_request_options_t *options = NULL; + cos_string_t bucket; + cos_string_t object; + cos_table_t *resp_headers; + cos_table_t *headers = NULL; + cos_buf_t *content = NULL; + // cos_string_t file; + // int traffic_limit = 0; + char range_buf[64]; + + //创建内存池 + cos_pool_create(&p, NULL); + + //初始化请求选项 + options = cos_request_options_create(p); + // init_test_request_options(options, is_cname); + s3InitRequestOptions(options, is_cname); + cos_str_set(&bucket, tsS3BucketName); + cos_str_set(&object, object_name); + cos_list_t download_buffer; + cos_list_init(&download_buffer); + /* + if (traffic_limit) { + // 限速值设置范围为819200 - 838860800,单位默认为 bit/s,即800Kb/s - 800Mb/s,如果超出该范围将返回400错误 + headers = cos_table_make(p, 1); + cos_table_add_int(headers, "x-cos-traffic-limit", 819200); + } + */ + + headers = cos_table_create_if_null(options, headers, 1); + apr_snprintf(range_buf, sizeof(range_buf), "bytes=%" APR_INT64_T_FMT "-%" APR_INT64_T_FMT, offset, + offset + block_size - 1); + apr_table_add(headers, COS_RANGE, range_buf); + + s = cos_get_object_to_buffer(options, &bucket, &object, headers, NULL, &download_buffer, &resp_headers); + if (!cos_status_is_ok(s)) { + vError("s3: %s", s->error_msg); + vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + code = terrno; + return code; + } + + log_status(s); + // print_headers(resp_headers); + int64_t len = 0; + int64_t size = 0; + int64_t pos = 0; + cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) { len += cos_buf_size(content); } + // char *buf = cos_pcalloc(p, (apr_size_t)(len + 1)); + char *buf = taosMemoryCalloc(1, (apr_size_t)(len)); + // buf[len] = '\0'; + cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) { + size = cos_buf_size(content); + memcpy(buf + pos, content->pos, (size_t)size); + pos += size; + } + // cos_warn_log("Download data=%s", buf); + + //销毁内存池 + cos_pool_destroy(p); + + pBlock = buf; + + return code; +} + typedef struct { int64_t size; int32_t atime; @@ -337,6 +408,7 @@ void s3DeleteObjectsByPrefix(const char *prefix) {} void s3DeleteObjects(const char *object_name[], int nobject) {} bool s3Exists(const char *object_name) { return false; } bool s3Get(const char *object_name, const char *path) { return false; } +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t *pBlock) { return 0; } void s3EvictCache(const char *path, long object_size) {} long s3Size(const char *object_name) { return 0; } From 0181bb24d39d5df93a223e55afdf6ef9c32bba30 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 15 Sep 2023 16:40:04 +0800 Subject: [PATCH 2/7] tsdb/readerwriter: use PRId64 instead of lld --- source/dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index cbb0215370..0962ff53ac 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -43,7 +43,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { pFD->s3File = 1; pFD->pFD = (TdFilePtr)&pFD->s3File; int32_t vid = 0; - sscanf(object_name, "v%df%dver%lld.data", &vid, &pFD->fid, &pFD->cid); + sscanf(object_name, "v%df%dver%" PRId64 ".data", &vid, &pFD->fid, &pFD->cid); pFD->objName = object_name; #endif } else { From d155cfcf0fd6cb88cdd270a5d3c9333fc4d6f34d Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 20 Sep 2023 15:24:54 +0800 Subject: [PATCH 3/7] tsdb/readerwriter: use tsdb's s3 block cache to read file page --- source/dnode/vnode/src/inc/tsdb.h | 3 + .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 59 +++++++++++-------- 2 files changed, 36 insertions(+), 26 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a47a058046..edcce83a05 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -869,6 +869,9 @@ int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle); int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h); +int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle); +int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h); + int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 0962ff53ac..c095f6ca21 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -163,34 +163,41 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) { } } - if (pFD->s3File) { - // 1. convert pgno to block no, - // block_size(# of pages) & block_cache_size (# of blocks) - // block_no = pgno / block_size + 1; - // block_offset = (block_no - 1) * block_size * pFD->szPage - // 2, lookup block cache to fetch block - // 3, if found, memcpy page from block - // 4, if not found, download block from s3 - // check pg checksum in the block - // insert into block cache and goto step 3. - } - - // seek int64_t offset = PAGE_OFFSET(pgno, pFD->szPage); - int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } - // read - n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _exit; - } else if (n < pFD->szPage) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _exit; + if (pFD->s3File) { + LRUHandle *handle = NULL; + + pFD->blkno = (pgno + tsS3BlockSize - 1) / tsS3BlockSize; + int32_t code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle); + if (code != TSDB_CODE_SUCCESS || handle == NULL) { + tsdbBCacheRelease(pFD->pTsdb->bCache, handle); + goto _exit; + } + + uint8_t *pBlock = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->bCache, handle); + + int64_t blk_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; + memcpy(pFD->pBuf, pBlock + (offset - blk_offset), pFD->szPage); + + tsdbBCacheRelease(pFD->pTsdb->bCache, handle); + } else { + // seek + int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + + // read + n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } else if (n < pFD->szPage) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } } // check From f206fa07d06f3502fd1eb845e01726e5e4ef90e5 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 20 Sep 2023 15:59:58 +0800 Subject: [PATCH 4/7] s3/put: use put2 --- source/dnode/vnode/src/inc/vndCos.h | 1 + source/dnode/vnode/src/tsdb/tsdbRetention.c | 2 +- source/dnode/vnode/src/vnd/vnodeCos.c | 47 ++++++++++++++++++++- 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/inc/vndCos.h b/source/dnode/vnode/src/inc/vndCos.h index a838cd4acf..04d1f6e99f 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/source/dnode/vnode/src/inc/vndCos.h @@ -31,6 +31,7 @@ extern int32_t tsS3BlockCacheSize; int32_t s3Init(); void s3CleanUp(); int32_t s3PutObjectFromFile(const char *file, const char *object); +int32_t s3PutObjectFromFile2(const char *file, const char *object); void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 61be14f9bc..cb53876d97 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -114,7 +114,7 @@ static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile TSDB_CHECK_CODE(code, lino, _exit); char *object_name = taosDirEntryBaseName(fname); - code = s3PutObjectFromFile(from->fname, object_name); + code = s3PutObjectFromFile2(from->fname, object_name); TSDB_CHECK_CODE(code, lino, _exit); taosCloseFile(&fdFrom); diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index 86bdb218bb..f7b5c4f34d 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -59,17 +59,19 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) { cos_request_options_t *options = NULL; cos_string_t bucket, object, file; cos_table_t *resp_headers; - int traffic_limit = 0; + // int traffic_limit = 0; cos_pool_create(&p, NULL); options = cos_request_options_create(p); s3InitRequestOptions(options, is_cname); cos_table_t *headers = NULL; + /* if (traffic_limit) { // 限速值设置范围为819200 - 838860800,即100KB/s - 100MB/s,如果超出该范围将返回400错误 headers = cos_table_make(p, 1); cos_table_add_int(headers, "x-cos-traffic-limit", 819200); } + */ cos_str_set(&bucket, tsS3BucketName); cos_str_set(&file, file_str); cos_str_set(&object, object_str); @@ -85,6 +87,48 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) { return code; } +int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) { + int32_t code = 0; + cos_pool_t *p = NULL; + int is_cname = 0; + cos_status_t *s = NULL; + cos_request_options_t *options = NULL; + cos_string_t bucket, object, file; + cos_table_t *resp_headers; + int traffic_limit = 0; + cos_table_t *headers = NULL; + cos_resumable_clt_params_t *clt_params = NULL; + + cos_pool_create(&p, NULL); + options = cos_request_options_create(p); + s3InitRequestOptions(options, is_cname); + headers = cos_table_make(p, 0); + cos_str_set(&bucket, tsS3BucketName); + cos_str_set(&file, file_str); + cos_str_set(&object, object_str); + + // upload + clt_params = cos_create_resumable_clt_params_content(p, 1024 * 1024, 8, COS_FALSE, NULL); + s = cos_resumable_upload_file(options, &bucket, &object, &file, headers, NULL, clt_params, NULL, &resp_headers, NULL); + + if (!cos_status_is_ok(s)) { + vError("s3: %s", s->error_msg); + vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + code = terrno; + return code; + } + + log_status(s); + + cos_pool_destroy(p); + + if (s->code != 200) { + return code = s->code; + } + + return code; +} + void s3DeleteObjectsByPrefix(const char *prefix_str) { cos_pool_t *p = NULL; cos_request_options_t *options = NULL; @@ -404,6 +448,7 @@ long s3Size(const char *object_name) { int32_t s3Init() { return 0; } void s3CleanUp() {} int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; } +int32_t s3PutObjectFromFile2(const char *file, const char *object) { return 0; } void s3DeleteObjectsByPrefix(const char *prefix) {} void s3DeleteObjects(const char *object_name[], int nobject) {} bool s3Exists(const char *object_name) { return false; } From 99120336e957756e747c1cb3edd795113f88a902 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 20 Sep 2023 17:33:53 +0800 Subject: [PATCH 5/7] s3/config: new s3BlockSize & s3BlockCacheSize --- source/common/src/tglobal.c | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 94f85f5007..c20f55e8fb 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -660,6 +660,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 2048, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1; // min free disk space used to check if the disk is full [50MB, 1GB] if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, @@ -1075,6 +1077,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsResolveFQDNRetryTime = cfgGetItem(pCfg, "resolveFQDNRetryTime")->i32; tsMinDiskFreeSize = cfgGetItem(pCfg, "minDiskFreeSize")->i64; + tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32; + tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32; + GRANT_CFG_GET; return 0; } @@ -1647,6 +1652,20 @@ void taosCfgDynamicOptions(const char *option, const char *value) { return; } + if (strcasecmp(option, "s3BlockSize") == 0) { + int32_t newS3BlockSize = atoi(value); + uInfo("s3BlockSize set from %d to %d", tsS3BlockSize, newS3BlockSize); + tsS3BlockSize = newS3BlockSize; + return; + } + + if (strcasecmp(option, "s3BlockCacheSize") == 0) { + int32_t newS3BlockCacheSize = atoi(value); + uInfo("s3BlockCacheSize set from %d to %d", tsS3BlockCacheSize, newS3BlockCacheSize); + tsS3BlockCacheSize = newS3BlockCacheSize; + return; + } + if (strcasecmp(option, "keepTimeOffset") == 0) { int32_t newKeepTimeOffset = atoi(value); uInfo("keepTimeOffset set from %d to %d", tsKeepTimeOffset, newKeepTimeOffset); From ce1f86641649e91ec04f3046c98b0617fae3d7e2 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 21 Sep 2023 16:36:17 +0800 Subject: [PATCH 6/7] tsdb/reader: use pTsdb handle instead of pgsz to open file --- source/dnode/vnode/src/inc/vndCos.h | 2 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 14 +++++----- source/dnode/vnode/src/tsdb/tsdbDataFileRW.c | 10 +++---- source/dnode/vnode/src/tsdb/tsdbDef.h | 4 +-- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 26 ++++++++++--------- source/dnode/vnode/src/tsdb/tsdbSttFileRW.c | 8 +++--- source/dnode/vnode/src/tsdb/tsdbUpgrade.c | 9 +++---- source/dnode/vnode/src/vnd/vnodeCos.c | 6 ++--- 8 files changed, 41 insertions(+), 38 deletions(-) diff --git a/source/dnode/vnode/src/inc/vndCos.h b/source/dnode/vnode/src/inc/vndCos.h index 04d1f6e99f..bb4d284f0e 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/source/dnode/vnode/src/inc/vndCos.h @@ -36,7 +36,7 @@ void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); bool s3Get(const char *object_name, const char *path); -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t *pBlock); +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 09bcba9b61..38fbf42915 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -3048,24 +3048,26 @@ static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key } static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) { - int32_t code = 0; + int32_t code = 0; + /* uint8_t *pBlock = taosMemoryCalloc(1, tsS3BlockSize * pFD->szPage); if (pBlock == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - + */ int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; - code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, pBlock); + // int64_t size = 4096; + code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pBlock); + // taosMemoryFree(pBlock); code = TSDB_CODE_OUT_OF_MEMORY; return code; } - *ppBlock = pBlock; + //*ppBlock = pBlock; - tsdbTrace("block:%p load from s3", pBlock); + tsdbTrace("block:%p load from s3", *ppBlock); _exit: return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index dc5e3649cc..161025a7f5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -101,7 +101,7 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig if (fname) { for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { if (fname[i]) { - code = tsdbOpenFile(fname[i], config->szPage, TD_FILE_READ, &reader[0]->fd[i]); + code = tsdbOpenFile(fname[i], config->tsdb, TD_FILE_READ, &reader[0]->fd[i]); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -110,7 +110,7 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig if (config->files[i].exist) { char fname1[TSDB_FILENAME_LEN]; tsdbTFileName(config->tsdb, &config->files[i].file, fname1); - code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd[i]); + code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd[i]); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -1475,7 +1475,7 @@ static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) { } tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname); - code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]); + code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype]); TSDB_CHECK_CODE(code, lino, _exit); if (writer->files[ftype].size == 0) { @@ -1643,7 +1643,7 @@ static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) { int32_t flag = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); tsdbTFileName(writer->config->tsdb, writer->files + ftype, fname); - code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]); + code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype]); TSDB_CHECK_CODE(code, lino, _exit); uint8_t hdr[TSDB_FHDR_SIZE] = {0}; @@ -1693,4 +1693,4 @@ _exit: TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbDef.h b/source/dnode/vnode/src/tsdb/tsdbDef.h index e768f68b15..da2445dee5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDef.h +++ b/source/dnode/vnode/src/tsdb/tsdbDef.h @@ -31,7 +31,7 @@ typedef struct SFDataPtr { int64_t size; } SFDataPtr; -extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD); +extern int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD); extern void tsdbCloseFile(STsdbFD **ppFD); extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size); extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size); @@ -41,4 +41,4 @@ extern int32_t tsdbFsyncFile(STsdbFD *pFD); } #endif -#endif /*_TD_TSDB_DEF_H_*/ \ No newline at end of file +#endif /*_TD_TSDB_DEF_H_*/ diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index c095f6ca21..962185847d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -45,6 +45,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { int32_t vid = 0; sscanf(object_name, "v%df%dver%" PRId64 ".data", &vid, &pFD->fid, &pFD->cid); pFD->objName = object_name; + pFD->szFile = s3_size; #endif } else { code = TAOS_SYSTEM_ERROR(errsv); @@ -80,9 +81,10 @@ _exit: } // =============== PAGE-WISE FILE =============== -int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) { +int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD) { int32_t code = 0; STsdbFD *pFD = NULL; + int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; *ppFD = NULL; @@ -98,6 +100,7 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p pFD->flag = flag; pFD->szPage = szPage; pFD->pgno = 0; + pFD->pTsdb = pTsdb; *ppFD = pFD; @@ -322,7 +325,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS // head flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname); - code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD); + code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pHeadFD); if (code) goto _err; code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE); @@ -336,7 +339,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS flag = TD_FILE_READ | TD_FILE_WRITE; } tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname); - code = tsdbOpenFile(fname, szPage, flag, &pWriter->pDataFD); + code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pDataFD); if (code) goto _err; if (pWriter->fData.size == 0) { code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE); @@ -351,7 +354,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS flag = TD_FILE_READ | TD_FILE_WRITE; } tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname); - code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD); + code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pSmaFD); if (code) goto _err; if (pWriter->fSma.size == 0) { code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE); @@ -364,7 +367,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS ASSERT(pWriter->fStt[pSet->nSttF - 1].size == 0); flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; tsdbSttFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fStt[pSet->nSttF - 1], fname); - code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSttFD); + code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pSttFD); if (code) goto _err; code = tsdbWriteFile(pWriter->pSttFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; @@ -936,23 +939,23 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS // head tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); - code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pHeadFD); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pHeadFD); TSDB_CHECK_CODE(code, lino, _exit); // data tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); - code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pDataFD); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pDataFD); TSDB_CHECK_CODE(code, lino, _exit); // sma tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); - code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pSmaFD); TSDB_CHECK_CODE(code, lino, _exit); // stt for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); - code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSttFD[iStt]); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->aSttFD[iStt]); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1352,8 +1355,7 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb pDelFWriter->fDel = *pFile; tsdbDelFileName(pTsdb, pFile, fname); - code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, - &pDelFWriter->pWriteH); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH); TSDB_CHECK_CODE(code, lino, _exit); // update header @@ -1527,7 +1529,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb pDelFReader->fDel = *pFile; tsdbDelFileName(pTsdb, pFile, fname); - code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ, &pDelFReader->pReadH); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pDelFReader->pReadH); if (code) { taosMemoryFree(pDelFReader); goto _exit; diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 27fae9dc6e..fa8d2d5ba4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -46,12 +46,12 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con // open file if (fname) { - code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd); + code = tsdbOpenFile(fname, config->tsdb, TD_FILE_READ, &reader[0]->fd); TSDB_CHECK_CODE(code, lino, _exit); } else { char fname1[TSDB_FILENAME_LEN]; tsdbTFileName(config->tsdb, config->file, fname1); - code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd); + code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd); TSDB_CHECK_CODE(code, lino, _exit); } @@ -705,7 +705,7 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { char fname[TSDB_FILENAME_LEN]; tsdbTFileName(writer->config->tsdb, writer->file, fname); - code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd); + code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd); TSDB_CHECK_CODE(code, lino, _exit); uint8_t hdr[TSDB_FHDR_SIZE] = {0}; @@ -984,4 +984,4 @@ _exit: return code; } -bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer) { return writer->ctx->opened; } \ No newline at end of file +bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer) { return writer->ctx->opened; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index 3b38a0ae45..0884c32385 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -87,7 +87,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * char fname[TSDB_FILENAME_LEN]; tsdbTFileName(tsdb, &file, fname); - code = tsdbOpenFile(fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); + code = tsdbOpenFile(fname, tsdb, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); TSDB_CHECK_CODE(code, lino, _exit); // convert @@ -257,7 +257,7 @@ static int32_t tsdbUpgradeSttFile(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReade code = tsdbTFileObjInit(tsdb, &file, &fobj); TSDB_CHECK_CODE(code, lino, _exit1); - code = tsdbOpenFile(fobj->fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); + code = tsdbOpenFile(fobj->fname, tsdb, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); TSDB_CHECK_CODE(code, lino, _exit1); for (int32_t iSttBlk = 0; iSttBlk < taosArrayGetSize(aSttBlk); iSttBlk++) { @@ -408,8 +408,7 @@ static int32_t tsdbUpgradeOpenTombFile(STsdb *tsdb, STFileSet *fset, STsdbFD **f } char fname[TSDB_FILENAME_LEN] = {0}; - code = tsdbOpenFile(fobj[0]->fname, tsdb->pVnode->config.tsdbPageSize, - TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd); + code = tsdbOpenFile(fobj[0]->fname, tsdb, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd); TSDB_CHECK_CODE(code, lino, _exit); uint8_t hdr[TSDB_FHDR_SIZE] = {0}; @@ -633,4 +632,4 @@ int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { taosRemoveFile(fname); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index f7b5c4f34d..e6c3b87e94 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -261,7 +261,7 @@ bool s3Get(const char *object_name, const char *path) { return ret; } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t *pBlock) { +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t **ppBlock) { int32_t code = 0; cos_pool_t *p = NULL; int is_cname = 0; @@ -327,7 +327,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_ //销毁内存池 cos_pool_destroy(p); - pBlock = buf; + *ppBlock = buf; return code; } @@ -453,7 +453,7 @@ void s3DeleteObjectsByPrefix(const char *prefix) {} void s3DeleteObjects(const char *object_name[], int nobject) {} bool s3Exists(const char *object_name) { return false; } bool s3Get(const char *object_name, const char *path) { return false; } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t *pBlock) { return 0; } +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; } void s3EvictCache(const char *path, long object_size) {} long s3Size(const char *object_name) { return 0; } From 4c151f20313d94cdb144b9c94d3f396a80bde82f Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 22 Sep 2023 14:06:06 +0800 Subject: [PATCH 7/7] tsdb/readerwriter: remove size with download --- source/dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 962185847d..c143bb8a72 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -45,7 +45,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { int32_t vid = 0; sscanf(object_name, "v%df%dver%" PRId64 ".data", &vid, &pFD->fid, &pFD->cid); pFD->objName = object_name; - pFD->szFile = s3_size; + // pFD->szFile = s3_size; #endif } else { code = TAOS_SYSTEM_ERROR(errsv);