Merge pull request #22927 from taosdata/fix/TD-26189
s3/block_cache: new tsdb block cache
This commit is contained in:
commit
849b05a09b
|
@ -264,6 +264,9 @@ char tsS3BucketName[TSDB_FQDN_LEN] = "<bucketname>";
|
|||
char tsS3AppId[TSDB_FQDN_LEN] = "<appid>";
|
||||
int8_t tsS3Enabled = false;
|
||||
|
||||
int32_t tsS3BlockSize = 4096; // number of tsdb pages
|
||||
int32_t tsS3BlockCacheSize = 16; // number of blocks
|
||||
|
||||
int32_t tsCheckpointInterval = 20;
|
||||
|
||||
#ifndef _STORAGE
|
||||
|
@ -321,7 +324,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
struct SConfig *taosGetCfg() { return tsCfg; }
|
||||
struct SConfig *taosGetCfg() {
|
||||
return tsCfg;
|
||||
}
|
||||
|
||||
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
|
||||
char *apolloUrl) {
|
||||
|
@ -655,6 +660,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 2048, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
|
||||
|
||||
// min free disk space used to check if the disk is full [50MB, 1GB]
|
||||
if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024,
|
||||
|
@ -1070,6 +1077,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsResolveFQDNRetryTime = cfgGetItem(pCfg, "resolveFQDNRetryTime")->i32;
|
||||
tsMinDiskFreeSize = cfgGetItem(pCfg, "minDiskFreeSize")->i64;
|
||||
|
||||
tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32;
|
||||
tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32;
|
||||
|
||||
GRANT_CFG_GET;
|
||||
return 0;
|
||||
}
|
||||
|
@ -1643,6 +1653,20 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (strcasecmp(option, "s3BlockSize") == 0) {
|
||||
int32_t newS3BlockSize = atoi(value);
|
||||
uInfo("s3BlockSize set from %d to %d", tsS3BlockSize, newS3BlockSize);
|
||||
tsS3BlockSize = newS3BlockSize;
|
||||
return;
|
||||
}
|
||||
|
||||
if (strcasecmp(option, "s3BlockCacheSize") == 0) {
|
||||
int32_t newS3BlockCacheSize = atoi(value);
|
||||
uInfo("s3BlockCacheSize set from %d to %d", tsS3BlockCacheSize, newS3BlockCacheSize);
|
||||
tsS3BlockCacheSize = newS3BlockCacheSize;
|
||||
return;
|
||||
}
|
||||
|
||||
if (strcasecmp(option, "keepTimeOffset") == 0) {
|
||||
int32_t newKeepTimeOffset = atoi(value);
|
||||
uInfo("keepTimeOffset set from %d to %d", tsKeepTimeOffset, newKeepTimeOffset);
|
||||
|
|
|
@ -380,6 +380,8 @@ struct STsdb {
|
|||
TdThreadMutex lruMutex;
|
||||
SLRUCache *biCache;
|
||||
TdThreadMutex biMutex;
|
||||
SLRUCache *bCache;
|
||||
TdThreadMutex bMutex;
|
||||
struct STFileSystem *pFS; // new
|
||||
SRocksCache rCache;
|
||||
};
|
||||
|
@ -650,6 +652,12 @@ typedef struct {
|
|||
int64_t pgno;
|
||||
uint8_t *pBuf;
|
||||
int64_t szFile;
|
||||
STsdb *pTsdb;
|
||||
const char *objName;
|
||||
uint8_t s3File;
|
||||
int32_t fid;
|
||||
int64_t cid;
|
||||
int64_t blkno;
|
||||
} STsdbFD;
|
||||
|
||||
struct SDelFWriter {
|
||||
|
@ -861,6 +869,9 @@ int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
|
|||
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle);
|
||||
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
|
||||
|
||||
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle);
|
||||
int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h);
|
||||
|
||||
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
|
|
|
@ -22,15 +22,21 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define S3_BLOCK_CACHE
|
||||
|
||||
extern int8_t tsS3Enabled;
|
||||
extern int32_t tsS3BlockSize;
|
||||
extern int32_t tsS3BlockCacheSize;
|
||||
|
||||
int32_t s3Init();
|
||||
void s3CleanUp();
|
||||
int32_t s3PutObjectFromFile(const char *file, const char *object);
|
||||
int32_t s3PutObjectFromFile2(const char *file, const char *object);
|
||||
void s3DeleteObjectsByPrefix(const char *prefix);
|
||||
void s3DeleteObjects(const char *object_name[], int nobject);
|
||||
bool s3Exists(const char *object_name);
|
||||
bool s3Get(const char *object_name, const char *path);
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock);
|
||||
void s3EvictCache(const char *path, long object_size);
|
||||
long s3Size(const char *object_name);
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "tsdbDataFileRW.h"
|
||||
#include "tsdbReadUtil.h"
|
||||
#include "vnd.h"
|
||||
#include "vndCos.h"
|
||||
|
||||
#define ROCKS_BATCH_SIZE (4096)
|
||||
|
||||
|
@ -51,6 +52,41 @@ static void tsdbCloseBICache(STsdb *pTsdb) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
// SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5);
|
||||
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
|
||||
|
||||
SLRUCache *pCache = taosLRUCacheInit(tsS3BlockCacheSize * tsS3BlockSize * szPage, 0, .5);
|
||||
if (pCache == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
taosLRUCacheSetStrictCapacity(pCache, false);
|
||||
|
||||
taosThreadMutexInit(&pTsdb->bMutex, NULL);
|
||||
|
||||
_err:
|
||||
pTsdb->bCache = pCache;
|
||||
return code;
|
||||
}
|
||||
|
||||
static void tsdbCloseBCache(STsdb *pTsdb) {
|
||||
SLRUCache *pCache = pTsdb->bCache;
|
||||
if (pCache) {
|
||||
int32_t elems = taosLRUCacheGetElems(pCache);
|
||||
tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
|
||||
taosLRUCacheEraseUnrefEntries(pCache);
|
||||
elems = taosLRUCacheGetElems(pCache);
|
||||
tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
|
||||
|
||||
taosLRUCacheCleanup(pCache);
|
||||
|
||||
taosThreadMutexDestroy(&pTsdb->bMutex);
|
||||
}
|
||||
}
|
||||
|
||||
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
|
||||
|
||||
typedef struct {
|
||||
|
@ -1149,6 +1185,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbOpenBCache(pTsdb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbOpenRocksCache(pTsdb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1178,6 +1220,7 @@ void tsdbCloseCache(STsdb *pTsdb) {
|
|||
}
|
||||
|
||||
tsdbCloseBICache(pTsdb);
|
||||
tsdbCloseBCache(pTsdb);
|
||||
tsdbCloseRocksCache(pTsdb);
|
||||
}
|
||||
|
||||
|
@ -2987,3 +3030,100 @@ int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) {
|
|||
|
||||
return code;
|
||||
}
|
||||
|
||||
// block cache
|
||||
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {
|
||||
struct {
|
||||
int32_t fid;
|
||||
int64_t commitID;
|
||||
int64_t blkno;
|
||||
} bKey = {0};
|
||||
|
||||
bKey.fid = fid;
|
||||
bKey.commitID = commitID;
|
||||
bKey.blkno = blkno;
|
||||
|
||||
*len = sizeof(bKey);
|
||||
memcpy(key, &bKey, *len);
|
||||
}
|
||||
|
||||
static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
|
||||
int32_t code = 0;
|
||||
/*
|
||||
uint8_t *pBlock = taosMemoryCalloc(1, tsS3BlockSize * pFD->szPage);
|
||||
if (pBlock == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
*/
|
||||
int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
||||
// int64_t size = 4096;
|
||||
code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// taosMemoryFree(pBlock);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
}
|
||||
|
||||
//*ppBlock = pBlock;
|
||||
|
||||
tsdbTrace("block:%p load from s3", *ppBlock);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static void deleteBCache(const void *key, size_t keyLen, void *value, void *ud) {
|
||||
(void)ud;
|
||||
uint8_t *pBlock = (uint8_t *)value;
|
||||
|
||||
taosMemoryFree(pBlock);
|
||||
}
|
||||
|
||||
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle) {
|
||||
int32_t code = 0;
|
||||
char key[128] = {0};
|
||||
int keyLen = 0;
|
||||
|
||||
getBCacheKey(pFD->fid, pFD->cid, pFD->blkno, key, &keyLen);
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (!h) {
|
||||
STsdb *pTsdb = pFD->pTsdb;
|
||||
taosThreadMutexLock(&pTsdb->bMutex);
|
||||
|
||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (!h) {
|
||||
uint8_t *pBlock = NULL;
|
||||
code = tsdbCacheLoadBlockS3(pFD, &pBlock);
|
||||
// if table's empty or error, return code of -1
|
||||
if (code != TSDB_CODE_SUCCESS || pBlock == NULL) {
|
||||
taosThreadMutexUnlock(&pTsdb->bMutex);
|
||||
|
||||
*handle = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
size_t charge = tsS3BlockSize * pFD->szPage;
|
||||
_taos_lru_deleter_t deleter = deleteBCache;
|
||||
LRUStatus status =
|
||||
taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
if (status != TAOS_LRU_STATUS_OK) {
|
||||
code = -1;
|
||||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTsdb->bMutex);
|
||||
}
|
||||
|
||||
*handle = h;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h) {
|
||||
int32_t code = 0;
|
||||
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -101,7 +101,7 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig
|
|||
if (fname) {
|
||||
for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
|
||||
if (fname[i]) {
|
||||
code = tsdbOpenFile(fname[i], config->szPage, TD_FILE_READ, &reader[0]->fd[i]);
|
||||
code = tsdbOpenFile(fname[i], config->tsdb, TD_FILE_READ, &reader[0]->fd[i]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
@ -110,7 +110,7 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig
|
|||
if (config->files[i].exist) {
|
||||
char fname1[TSDB_FILENAME_LEN];
|
||||
tsdbTFileName(config->tsdb, &config->files[i].file, fname1);
|
||||
code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd[i]);
|
||||
code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd[i]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
@ -1466,7 +1466,7 @@ static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) {
|
|||
}
|
||||
|
||||
tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname);
|
||||
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]);
|
||||
code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (writer->files[ftype].size == 0) {
|
||||
|
@ -1634,7 +1634,7 @@ static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) {
|
|||
int32_t flag = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
|
||||
tsdbTFileName(writer->config->tsdb, writer->files + ftype, fname);
|
||||
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]);
|
||||
code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
|
||||
|
|
|
@ -31,7 +31,7 @@ typedef struct SFDataPtr {
|
|||
int64_t size;
|
||||
} SFDataPtr;
|
||||
|
||||
extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD);
|
||||
extern int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD);
|
||||
extern void tsdbCloseFile(STsdbFD **ppFD);
|
||||
extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size);
|
||||
extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size);
|
||||
|
|
|
@ -28,6 +28,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
|
|||
const char *object_name = taosDirEntryBaseName((char *)path);
|
||||
long s3_size = tsS3Enabled ? s3Size(object_name) : 0;
|
||||
if (tsS3Enabled && !strncmp(path + strlen(path) - 5, ".data", 5) && s3_size > 0) {
|
||||
#ifndef S3_BLOCK_CACHE
|
||||
s3EvictCache(path, s3_size);
|
||||
s3Get(object_name, path);
|
||||
|
||||
|
@ -38,6 +39,14 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
|
|||
// taosMemoryFree(pFD);
|
||||
goto _exit;
|
||||
}
|
||||
#else
|
||||
pFD->s3File = 1;
|
||||
pFD->pFD = (TdFilePtr)&pFD->s3File;
|
||||
int32_t vid = 0;
|
||||
sscanf(object_name, "v%df%dver%" PRId64 ".data", &vid, &pFD->fid, &pFD->cid);
|
||||
pFD->objName = object_name;
|
||||
// pFD->szFile = s3_size;
|
||||
#endif
|
||||
} else {
|
||||
code = TAOS_SYSTEM_ERROR(errsv);
|
||||
// taosMemoryFree(pFD);
|
||||
|
@ -72,9 +81,10 @@ _exit:
|
|||
}
|
||||
|
||||
// =============== PAGE-WISE FILE ===============
|
||||
int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
|
||||
int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD) {
|
||||
int32_t code = 0;
|
||||
STsdbFD *pFD = NULL;
|
||||
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
|
||||
|
||||
*ppFD = NULL;
|
||||
|
||||
|
@ -90,6 +100,7 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p
|
|||
pFD->flag = flag;
|
||||
pFD->szPage = szPage;
|
||||
pFD->pgno = 0;
|
||||
pFD->pTsdb = pTsdb;
|
||||
|
||||
*ppFD = pFD;
|
||||
|
||||
|
@ -101,7 +112,9 @@ void tsdbCloseFile(STsdbFD **ppFD) {
|
|||
STsdbFD *pFD = *ppFD;
|
||||
if (pFD) {
|
||||
taosMemoryFree(pFD->pBuf);
|
||||
if (!pFD->s3File) {
|
||||
taosCloseFile(&pFD->pFD);
|
||||
}
|
||||
taosMemoryFree(pFD);
|
||||
*ppFD = NULL;
|
||||
}
|
||||
|
@ -153,8 +166,26 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
|
|||
}
|
||||
}
|
||||
|
||||
// seek
|
||||
int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
|
||||
|
||||
if (pFD->s3File) {
|
||||
LRUHandle *handle = NULL;
|
||||
|
||||
pFD->blkno = (pgno + tsS3BlockSize - 1) / tsS3BlockSize;
|
||||
int32_t code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle);
|
||||
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
|
||||
tsdbBCacheRelease(pFD->pTsdb->bCache, handle);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
uint8_t *pBlock = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->bCache, handle);
|
||||
|
||||
int64_t blk_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
||||
memcpy(pFD->pBuf, pBlock + (offset - blk_offset), pFD->szPage);
|
||||
|
||||
tsdbBCacheRelease(pFD->pTsdb->bCache, handle);
|
||||
} else {
|
||||
// seek
|
||||
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -170,6 +201,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
|
|||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
// check
|
||||
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
|
||||
|
@ -293,7 +325,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
|
|||
// head
|
||||
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
|
||||
tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname);
|
||||
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD);
|
||||
code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pHeadFD);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE);
|
||||
|
@ -307,7 +339,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
|
|||
flag = TD_FILE_READ | TD_FILE_WRITE;
|
||||
}
|
||||
tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname);
|
||||
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pDataFD);
|
||||
code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pDataFD);
|
||||
if (code) goto _err;
|
||||
if (pWriter->fData.size == 0) {
|
||||
code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE);
|
||||
|
@ -322,7 +354,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
|
|||
flag = TD_FILE_READ | TD_FILE_WRITE;
|
||||
}
|
||||
tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname);
|
||||
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD);
|
||||
code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pSmaFD);
|
||||
if (code) goto _err;
|
||||
if (pWriter->fSma.size == 0) {
|
||||
code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE);
|
||||
|
@ -335,7 +367,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
|
|||
ASSERT(pWriter->fStt[pSet->nSttF - 1].size == 0);
|
||||
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
|
||||
tsdbSttFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fStt[pSet->nSttF - 1], fname);
|
||||
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSttFD);
|
||||
code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pSttFD);
|
||||
if (code) goto _err;
|
||||
code = tsdbWriteFile(pWriter->pSttFD, 0, hdr, TSDB_FHDR_SIZE);
|
||||
if (code) goto _err;
|
||||
|
@ -907,23 +939,23 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
|
|||
|
||||
// head
|
||||
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
|
||||
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pHeadFD);
|
||||
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pHeadFD);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// data
|
||||
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
|
||||
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pDataFD);
|
||||
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pDataFD);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// sma
|
||||
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
|
||||
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD);
|
||||
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pSmaFD);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// stt
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
|
||||
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSttFD[iStt]);
|
||||
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->aSttFD[iStt]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
|
@ -1323,8 +1355,7 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
|
|||
pDelFWriter->fDel = *pFile;
|
||||
|
||||
tsdbDelFileName(pTsdb, pFile, fname);
|
||||
code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE,
|
||||
&pDelFWriter->pWriteH);
|
||||
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// update header
|
||||
|
@ -1498,7 +1529,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
|
|||
pDelFReader->fDel = *pFile;
|
||||
|
||||
tsdbDelFileName(pTsdb, pFile, fname);
|
||||
code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ, &pDelFReader->pReadH);
|
||||
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pDelFReader->pReadH);
|
||||
if (code) {
|
||||
taosMemoryFree(pDelFReader);
|
||||
goto _exit;
|
||||
|
|
|
@ -114,7 +114,7 @@ static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
char *object_name = taosDirEntryBaseName(fname);
|
||||
code = s3PutObjectFromFile(from->fname, object_name);
|
||||
code = s3PutObjectFromFile2(from->fname, object_name);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
taosCloseFile(&fdFrom);
|
||||
|
|
|
@ -46,12 +46,12 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con
|
|||
|
||||
// open file
|
||||
if (fname) {
|
||||
code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd);
|
||||
code = tsdbOpenFile(fname, config->tsdb, TD_FILE_READ, &reader[0]->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
char fname1[TSDB_FILENAME_LEN];
|
||||
tsdbTFileName(config->tsdb, config->file, fname1);
|
||||
code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd);
|
||||
code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
|
@ -705,7 +705,7 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
|
|||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
tsdbTFileName(writer->config->tsdb, writer->file, fname);
|
||||
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd);
|
||||
code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
|
||||
|
|
|
@ -87,7 +87,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
|
|||
char fname[TSDB_FILENAME_LEN];
|
||||
tsdbTFileName(tsdb, &file, fname);
|
||||
|
||||
code = tsdbOpenFile(fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
|
||||
code = tsdbOpenFile(fname, tsdb, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// convert
|
||||
|
@ -257,7 +257,7 @@ static int32_t tsdbUpgradeSttFile(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReade
|
|||
code = tsdbTFileObjInit(tsdb, &file, &fobj);
|
||||
TSDB_CHECK_CODE(code, lino, _exit1);
|
||||
|
||||
code = tsdbOpenFile(fobj->fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
|
||||
code = tsdbOpenFile(fobj->fname, tsdb, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit1);
|
||||
|
||||
for (int32_t iSttBlk = 0; iSttBlk < taosArrayGetSize(aSttBlk); iSttBlk++) {
|
||||
|
@ -408,8 +408,7 @@ static int32_t tsdbUpgradeOpenTombFile(STsdb *tsdb, STFileSet *fset, STsdbFD **f
|
|||
}
|
||||
|
||||
char fname[TSDB_FILENAME_LEN] = {0};
|
||||
code = tsdbOpenFile(fobj[0]->fname, tsdb->pVnode->config.tsdbPageSize,
|
||||
TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd);
|
||||
code = tsdbOpenFile(fobj[0]->fname, tsdb, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
|
||||
|
|
|
@ -59,17 +59,19 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) {
|
|||
cos_request_options_t *options = NULL;
|
||||
cos_string_t bucket, object, file;
|
||||
cos_table_t *resp_headers;
|
||||
int traffic_limit = 0;
|
||||
// int traffic_limit = 0;
|
||||
|
||||
cos_pool_create(&p, NULL);
|
||||
options = cos_request_options_create(p);
|
||||
s3InitRequestOptions(options, is_cname);
|
||||
cos_table_t *headers = NULL;
|
||||
/*
|
||||
if (traffic_limit) {
|
||||
// 限速值设置范围为819200 - 838860800,即100KB/s - 100MB/s,如果超出该范围将返回400错误
|
||||
headers = cos_table_make(p, 1);
|
||||
cos_table_add_int(headers, "x-cos-traffic-limit", 819200);
|
||||
}
|
||||
*/
|
||||
cos_str_set(&bucket, tsS3BucketName);
|
||||
cos_str_set(&file, file_str);
|
||||
cos_str_set(&object, object_str);
|
||||
|
@ -85,6 +87,48 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) {
|
||||
int32_t code = 0;
|
||||
cos_pool_t *p = NULL;
|
||||
int is_cname = 0;
|
||||
cos_status_t *s = NULL;
|
||||
cos_request_options_t *options = NULL;
|
||||
cos_string_t bucket, object, file;
|
||||
cos_table_t *resp_headers;
|
||||
int traffic_limit = 0;
|
||||
cos_table_t *headers = NULL;
|
||||
cos_resumable_clt_params_t *clt_params = NULL;
|
||||
|
||||
cos_pool_create(&p, NULL);
|
||||
options = cos_request_options_create(p);
|
||||
s3InitRequestOptions(options, is_cname);
|
||||
headers = cos_table_make(p, 0);
|
||||
cos_str_set(&bucket, tsS3BucketName);
|
||||
cos_str_set(&file, file_str);
|
||||
cos_str_set(&object, object_str);
|
||||
|
||||
// upload
|
||||
clt_params = cos_create_resumable_clt_params_content(p, 1024 * 1024, 8, COS_FALSE, NULL);
|
||||
s = cos_resumable_upload_file(options, &bucket, &object, &file, headers, NULL, clt_params, NULL, &resp_headers, NULL);
|
||||
|
||||
if (!cos_status_is_ok(s)) {
|
||||
vError("s3: %s", s->error_msg);
|
||||
vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
code = terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
log_status(s);
|
||||
|
||||
cos_pool_destroy(p);
|
||||
|
||||
if (s->code != 200) {
|
||||
return code = s->code;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void s3DeleteObjectsByPrefix(const char *prefix_str) {
|
||||
cos_pool_t *p = NULL;
|
||||
cos_request_options_t *options = NULL;
|
||||
|
@ -217,6 +261,77 @@ bool s3Get(const char *object_name, const char *path) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t **ppBlock) {
|
||||
int32_t code = 0;
|
||||
cos_pool_t *p = NULL;
|
||||
int is_cname = 0;
|
||||
cos_status_t *s = NULL;
|
||||
cos_request_options_t *options = NULL;
|
||||
cos_string_t bucket;
|
||||
cos_string_t object;
|
||||
cos_table_t *resp_headers;
|
||||
cos_table_t *headers = NULL;
|
||||
cos_buf_t *content = NULL;
|
||||
// cos_string_t file;
|
||||
// int traffic_limit = 0;
|
||||
char range_buf[64];
|
||||
|
||||
//创建内存池
|
||||
cos_pool_create(&p, NULL);
|
||||
|
||||
//初始化请求选项
|
||||
options = cos_request_options_create(p);
|
||||
// init_test_request_options(options, is_cname);
|
||||
s3InitRequestOptions(options, is_cname);
|
||||
cos_str_set(&bucket, tsS3BucketName);
|
||||
cos_str_set(&object, object_name);
|
||||
cos_list_t download_buffer;
|
||||
cos_list_init(&download_buffer);
|
||||
/*
|
||||
if (traffic_limit) {
|
||||
// 限速值设置范围为819200 - 838860800,单位默认为 bit/s,即800Kb/s - 800Mb/s,如果超出该范围将返回400错误
|
||||
headers = cos_table_make(p, 1);
|
||||
cos_table_add_int(headers, "x-cos-traffic-limit", 819200);
|
||||
}
|
||||
*/
|
||||
|
||||
headers = cos_table_create_if_null(options, headers, 1);
|
||||
apr_snprintf(range_buf, sizeof(range_buf), "bytes=%" APR_INT64_T_FMT "-%" APR_INT64_T_FMT, offset,
|
||||
offset + block_size - 1);
|
||||
apr_table_add(headers, COS_RANGE, range_buf);
|
||||
|
||||
s = cos_get_object_to_buffer(options, &bucket, &object, headers, NULL, &download_buffer, &resp_headers);
|
||||
if (!cos_status_is_ok(s)) {
|
||||
vError("s3: %s", s->error_msg);
|
||||
vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
code = terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
log_status(s);
|
||||
// print_headers(resp_headers);
|
||||
int64_t len = 0;
|
||||
int64_t size = 0;
|
||||
int64_t pos = 0;
|
||||
cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) { len += cos_buf_size(content); }
|
||||
// char *buf = cos_pcalloc(p, (apr_size_t)(len + 1));
|
||||
char *buf = taosMemoryCalloc(1, (apr_size_t)(len));
|
||||
// buf[len] = '\0';
|
||||
cos_list_for_each_entry(cos_buf_t, content, &download_buffer, node) {
|
||||
size = cos_buf_size(content);
|
||||
memcpy(buf + pos, content->pos, (size_t)size);
|
||||
pos += size;
|
||||
}
|
||||
// cos_warn_log("Download data=%s", buf);
|
||||
|
||||
//销毁内存池
|
||||
cos_pool_destroy(p);
|
||||
|
||||
*ppBlock = buf;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int64_t size;
|
||||
int32_t atime;
|
||||
|
@ -333,10 +448,12 @@ long s3Size(const char *object_name) {
|
|||
int32_t s3Init() { return 0; }
|
||||
void s3CleanUp() {}
|
||||
int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; }
|
||||
int32_t s3PutObjectFromFile2(const char *file, const char *object) { return 0; }
|
||||
void s3DeleteObjectsByPrefix(const char *prefix) {}
|
||||
void s3DeleteObjects(const char *object_name[], int nobject) {}
|
||||
bool s3Exists(const char *object_name) { return false; }
|
||||
bool s3Get(const char *object_name, const char *path) { return false; }
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; }
|
||||
void s3EvictCache(const char *path, long object_size) {}
|
||||
long s3Size(const char *object_name) { return 0; }
|
||||
|
||||
|
|
Loading…
Reference in New Issue