tsdb/reader: use pTsdb handle instead of pgsz to open file

This commit is contained in:
Minglei Jin 2023-09-21 16:36:17 +08:00
parent 99120336e9
commit ce1f866416
8 changed files with 41 additions and 38 deletions

View File

@ -36,7 +36,7 @@ void s3DeleteObjectsByPrefix(const char *prefix);
void s3DeleteObjects(const char *object_name[], int nobject);
bool s3Exists(const char *object_name);
bool s3Get(const char *object_name, const char *path);
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t *pBlock);
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock);
void s3EvictCache(const char *path, long object_size);
long s3Size(const char *object_name);

View File

@ -3049,23 +3049,25 @@ static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key
static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
int32_t code = 0;
/*
uint8_t *pBlock = taosMemoryCalloc(1, tsS3BlockSize * pFD->szPage);
if (pBlock == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*/
int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, pBlock);
// int64_t size = 4096;
code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBlock);
// taosMemoryFree(pBlock);
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
*ppBlock = pBlock;
//*ppBlock = pBlock;
tsdbTrace("block:%p load from s3", pBlock);
tsdbTrace("block:%p load from s3", *ppBlock);
_exit:
return code;

View File

@ -101,7 +101,7 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig
if (fname) {
for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) {
if (fname[i]) {
code = tsdbOpenFile(fname[i], config->szPage, TD_FILE_READ, &reader[0]->fd[i]);
code = tsdbOpenFile(fname[i], config->tsdb, TD_FILE_READ, &reader[0]->fd[i]);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
@ -110,7 +110,7 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig
if (config->files[i].exist) {
char fname1[TSDB_FILENAME_LEN];
tsdbTFileName(config->tsdb, &config->files[i].file, fname1);
code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd[i]);
code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd[i]);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
@ -1475,7 +1475,7 @@ static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) {
}
tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname);
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]);
code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype]);
TSDB_CHECK_CODE(code, lino, _exit);
if (writer->files[ftype].size == 0) {
@ -1643,7 +1643,7 @@ static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) {
int32_t flag = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
tsdbTFileName(writer->config->tsdb, writer->files + ftype, fname);
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]);
code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype]);
TSDB_CHECK_CODE(code, lino, _exit);
uint8_t hdr[TSDB_FHDR_SIZE] = {0};

View File

@ -31,7 +31,7 @@ typedef struct SFDataPtr {
int64_t size;
} SFDataPtr;
extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD);
extern int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD);
extern void tsdbCloseFile(STsdbFD **ppFD);
extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size);
extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size);

View File

@ -45,6 +45,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
int32_t vid = 0;
sscanf(object_name, "v%df%dver%" PRId64 ".data", &vid, &pFD->fid, &pFD->cid);
pFD->objName = object_name;
pFD->szFile = s3_size;
#endif
} else {
code = TAOS_SYSTEM_ERROR(errsv);
@ -80,9 +81,10 @@ _exit:
}
// =============== PAGE-WISE FILE ===============
int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD) {
int32_t code = 0;
STsdbFD *pFD = NULL;
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
*ppFD = NULL;
@ -98,6 +100,7 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p
pFD->flag = flag;
pFD->szPage = szPage;
pFD->pgno = 0;
pFD->pTsdb = pTsdb;
*ppFD = pFD;
@ -322,7 +325,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
// head
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname);
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD);
code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pHeadFD);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE);
@ -336,7 +339,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
flag = TD_FILE_READ | TD_FILE_WRITE;
}
tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname);
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pDataFD);
code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pDataFD);
if (code) goto _err;
if (pWriter->fData.size == 0) {
code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE);
@ -351,7 +354,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
flag = TD_FILE_READ | TD_FILE_WRITE;
}
tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname);
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD);
code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pSmaFD);
if (code) goto _err;
if (pWriter->fSma.size == 0) {
code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE);
@ -364,7 +367,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
ASSERT(pWriter->fStt[pSet->nSttF - 1].size == 0);
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
tsdbSttFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fStt[pSet->nSttF - 1], fname);
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSttFD);
code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pSttFD);
if (code) goto _err;
code = tsdbWriteFile(pWriter->pSttFD, 0, hdr, TSDB_FHDR_SIZE);
if (code) goto _err;
@ -936,23 +939,23 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
// head
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pHeadFD);
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pHeadFD);
TSDB_CHECK_CODE(code, lino, _exit);
// data
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pDataFD);
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pDataFD);
TSDB_CHECK_CODE(code, lino, _exit);
// sma
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD);
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pSmaFD);
TSDB_CHECK_CODE(code, lino, _exit);
// stt
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSttFD[iStt]);
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->aSttFD[iStt]);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -1352,8 +1355,7 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
pDelFWriter->fDel = *pFile;
tsdbDelFileName(pTsdb, pFile, fname);
code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE,
&pDelFWriter->pWriteH);
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH);
TSDB_CHECK_CODE(code, lino, _exit);
// update header
@ -1527,7 +1529,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
pDelFReader->fDel = *pFile;
tsdbDelFileName(pTsdb, pFile, fname);
code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ, &pDelFReader->pReadH);
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pDelFReader->pReadH);
if (code) {
taosMemoryFree(pDelFReader);
goto _exit;

View File

@ -46,12 +46,12 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con
// open file
if (fname) {
code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd);
code = tsdbOpenFile(fname, config->tsdb, TD_FILE_READ, &reader[0]->fd);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
char fname1[TSDB_FILENAME_LEN];
tsdbTFileName(config->tsdb, config->file, fname1);
code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd);
code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -705,7 +705,7 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
char fname[TSDB_FILENAME_LEN];
tsdbTFileName(writer->config->tsdb, writer->file, fname);
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd);
code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd);
TSDB_CHECK_CODE(code, lino, _exit);
uint8_t hdr[TSDB_FHDR_SIZE] = {0};

View File

@ -87,7 +87,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
char fname[TSDB_FILENAME_LEN];
tsdbTFileName(tsdb, &file, fname);
code = tsdbOpenFile(fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
code = tsdbOpenFile(fname, tsdb, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
TSDB_CHECK_CODE(code, lino, _exit);
// convert
@ -257,7 +257,7 @@ static int32_t tsdbUpgradeSttFile(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReade
code = tsdbTFileObjInit(tsdb, &file, &fobj);
TSDB_CHECK_CODE(code, lino, _exit1);
code = tsdbOpenFile(fobj->fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
code = tsdbOpenFile(fobj->fname, tsdb, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
TSDB_CHECK_CODE(code, lino, _exit1);
for (int32_t iSttBlk = 0; iSttBlk < taosArrayGetSize(aSttBlk); iSttBlk++) {
@ -408,8 +408,7 @@ static int32_t tsdbUpgradeOpenTombFile(STsdb *tsdb, STFileSet *fset, STsdbFD **f
}
char fname[TSDB_FILENAME_LEN] = {0};
code = tsdbOpenFile(fobj[0]->fname, tsdb->pVnode->config.tsdbPageSize,
TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd);
code = tsdbOpenFile(fobj[0]->fname, tsdb, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd);
TSDB_CHECK_CODE(code, lino, _exit);
uint8_t hdr[TSDB_FHDR_SIZE] = {0};

View File

@ -261,7 +261,7 @@ bool s3Get(const char *object_name, const char *path) {
return ret;
}
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t *pBlock) {
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t **ppBlock) {
int32_t code = 0;
cos_pool_t *p = NULL;
int is_cname = 0;
@ -327,7 +327,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_
//销毁内存池
cos_pool_destroy(p);
pBlock = buf;
*ppBlock = buf;
return code;
}
@ -453,7 +453,7 @@ void s3DeleteObjectsByPrefix(const char *prefix) {}
void s3DeleteObjects(const char *object_name[], int nobject) {}
bool s3Exists(const char *object_name) { return false; }
bool s3Get(const char *object_name, const char *path) { return false; }
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t *pBlock) { return 0; }
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; }
void s3EvictCache(const char *path, long object_size) {}
long s3Size(const char *object_name) { return 0; }