From ce1f86641649e91ec04f3046c98b0617fae3d7e2 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 21 Sep 2023 16:36:17 +0800 Subject: [PATCH] tsdb/reader: use pTsdb handle instead of pgsz to open file --- source/dnode/vnode/src/inc/vndCos.h | 2 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 14 +++++----- source/dnode/vnode/src/tsdb/tsdbDataFileRW.c | 10 +++---- source/dnode/vnode/src/tsdb/tsdbDef.h | 4 +-- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 26 ++++++++++--------- source/dnode/vnode/src/tsdb/tsdbSttFileRW.c | 8 +++--- source/dnode/vnode/src/tsdb/tsdbUpgrade.c | 9 +++---- source/dnode/vnode/src/vnd/vnodeCos.c | 6 ++--- 8 files changed, 41 insertions(+), 38 deletions(-) diff --git a/source/dnode/vnode/src/inc/vndCos.h b/source/dnode/vnode/src/inc/vndCos.h index 04d1f6e99f..bb4d284f0e 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/source/dnode/vnode/src/inc/vndCos.h @@ -36,7 +36,7 @@ void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); bool s3Get(const char *object_name, const char *path); -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t *pBlock); +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 09bcba9b61..38fbf42915 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -3048,24 +3048,26 @@ static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key } static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) { - int32_t code = 0; + int32_t code = 0; + /* uint8_t *pBlock = taosMemoryCalloc(1, tsS3BlockSize * pFD->szPage); if (pBlock == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - + */ int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; - code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, pBlock); + // int64_t size = 4096; + code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pBlock); + // taosMemoryFree(pBlock); code = TSDB_CODE_OUT_OF_MEMORY; return code; } - *ppBlock = pBlock; + //*ppBlock = pBlock; - tsdbTrace("block:%p load from s3", pBlock); + tsdbTrace("block:%p load from s3", *ppBlock); _exit: return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index dc5e3649cc..161025a7f5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -101,7 +101,7 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig if (fname) { for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) { if (fname[i]) { - code = tsdbOpenFile(fname[i], config->szPage, TD_FILE_READ, &reader[0]->fd[i]); + code = tsdbOpenFile(fname[i], config->tsdb, TD_FILE_READ, &reader[0]->fd[i]); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -110,7 +110,7 @@ int32_t tsdbDataFileReaderOpen(const char *fname[], const SDataFileReaderConfig if (config->files[i].exist) { char fname1[TSDB_FILENAME_LEN]; tsdbTFileName(config->tsdb, &config->files[i].file, fname1); - code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd[i]); + code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd[i]); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -1475,7 +1475,7 @@ static int32_t tsdbDataFileWriterOpenDataFD(SDataFileWriter *writer) { } tsdbTFileName(writer->config->tsdb, &writer->files[ftype], fname); - code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]); + code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype]); TSDB_CHECK_CODE(code, lino, _exit); if (writer->files[ftype].size == 0) { @@ -1643,7 +1643,7 @@ static int32_t tsdbDataFileWriterOpenTombFD(SDataFileWriter *writer) { int32_t flag = (TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); tsdbTFileName(writer->config->tsdb, writer->files + ftype, fname); - code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd[ftype]); + code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd[ftype]); TSDB_CHECK_CODE(code, lino, _exit); uint8_t hdr[TSDB_FHDR_SIZE] = {0}; @@ -1693,4 +1693,4 @@ _exit: TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbDef.h b/source/dnode/vnode/src/tsdb/tsdbDef.h index e768f68b15..da2445dee5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDef.h +++ b/source/dnode/vnode/src/tsdb/tsdbDef.h @@ -31,7 +31,7 @@ typedef struct SFDataPtr { int64_t size; } SFDataPtr; -extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD); +extern int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD); extern void tsdbCloseFile(STsdbFD **ppFD); extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size); extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size); @@ -41,4 +41,4 @@ extern int32_t tsdbFsyncFile(STsdbFD *pFD); } #endif -#endif /*_TD_TSDB_DEF_H_*/ \ No newline at end of file +#endif /*_TD_TSDB_DEF_H_*/ diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index c095f6ca21..962185847d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -45,6 +45,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { int32_t vid = 0; sscanf(object_name, "v%df%dver%" PRId64 ".data", &vid, &pFD->fid, &pFD->cid); pFD->objName = object_name; + pFD->szFile = s3_size; #endif } else { code = TAOS_SYSTEM_ERROR(errsv); @@ -80,9 +81,10 @@ _exit: } // =============== PAGE-WISE FILE =============== -int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) { +int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD) { int32_t code = 0; STsdbFD *pFD = NULL; + int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; *ppFD = NULL; @@ -98,6 +100,7 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p pFD->flag = flag; pFD->szPage = szPage; pFD->pgno = 0; + pFD->pTsdb = pTsdb; *ppFD = pFD; @@ -322,7 +325,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS // head flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname); - code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD); + code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pHeadFD); if (code) goto _err; code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE); @@ -336,7 +339,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS flag = TD_FILE_READ | TD_FILE_WRITE; } tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname); - code = tsdbOpenFile(fname, szPage, flag, &pWriter->pDataFD); + code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pDataFD); if (code) goto _err; if (pWriter->fData.size == 0) { code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE); @@ -351,7 +354,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS flag = TD_FILE_READ | TD_FILE_WRITE; } tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname); - code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD); + code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pSmaFD); if (code) goto _err; if (pWriter->fSma.size == 0) { code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE); @@ -364,7 +367,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS ASSERT(pWriter->fStt[pSet->nSttF - 1].size == 0); flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; tsdbSttFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fStt[pSet->nSttF - 1], fname); - code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSttFD); + code = tsdbOpenFile(fname, pTsdb, flag, &pWriter->pSttFD); if (code) goto _err; code = tsdbWriteFile(pWriter->pSttFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; @@ -936,23 +939,23 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS // head tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); - code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pHeadFD); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pHeadFD); TSDB_CHECK_CODE(code, lino, _exit); // data tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); - code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pDataFD); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pDataFD); TSDB_CHECK_CODE(code, lino, _exit); // sma tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); - code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->pSmaFD); TSDB_CHECK_CODE(code, lino, _exit); // stt for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); - code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSttFD[iStt]); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pReader->aSttFD[iStt]); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1352,8 +1355,7 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb pDelFWriter->fDel = *pFile; tsdbDelFileName(pTsdb, pFile, fname); - code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, - &pDelFWriter->pWriteH); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH); TSDB_CHECK_CODE(code, lino, _exit); // update header @@ -1527,7 +1529,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb pDelFReader->fDel = *pFile; tsdbDelFileName(pTsdb, pFile, fname); - code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ, &pDelFReader->pReadH); + code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pDelFReader->pReadH); if (code) { taosMemoryFree(pDelFReader); goto _exit; diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 27fae9dc6e..fa8d2d5ba4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -46,12 +46,12 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con // open file if (fname) { - code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd); + code = tsdbOpenFile(fname, config->tsdb, TD_FILE_READ, &reader[0]->fd); TSDB_CHECK_CODE(code, lino, _exit); } else { char fname1[TSDB_FILENAME_LEN]; tsdbTFileName(config->tsdb, config->file, fname1); - code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd); + code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd); TSDB_CHECK_CODE(code, lino, _exit); } @@ -705,7 +705,7 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { char fname[TSDB_FILENAME_LEN]; tsdbTFileName(writer->config->tsdb, writer->file, fname); - code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd); + code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd); TSDB_CHECK_CODE(code, lino, _exit); uint8_t hdr[TSDB_FHDR_SIZE] = {0}; @@ -984,4 +984,4 @@ _exit: return code; } -bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer) { return writer->ctx->opened; } \ No newline at end of file +bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer) { return writer->ctx->opened; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index 3b38a0ae45..0884c32385 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -87,7 +87,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * char fname[TSDB_FILENAME_LEN]; tsdbTFileName(tsdb, &file, fname); - code = tsdbOpenFile(fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); + code = tsdbOpenFile(fname, tsdb, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); TSDB_CHECK_CODE(code, lino, _exit); // convert @@ -257,7 +257,7 @@ static int32_t tsdbUpgradeSttFile(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReade code = tsdbTFileObjInit(tsdb, &file, &fobj); TSDB_CHECK_CODE(code, lino, _exit1); - code = tsdbOpenFile(fobj->fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); + code = tsdbOpenFile(fobj->fname, tsdb, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); TSDB_CHECK_CODE(code, lino, _exit1); for (int32_t iSttBlk = 0; iSttBlk < taosArrayGetSize(aSttBlk); iSttBlk++) { @@ -408,8 +408,7 @@ static int32_t tsdbUpgradeOpenTombFile(STsdb *tsdb, STFileSet *fset, STsdbFD **f } char fname[TSDB_FILENAME_LEN] = {0}; - code = tsdbOpenFile(fobj[0]->fname, tsdb->pVnode->config.tsdbPageSize, - TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd); + code = tsdbOpenFile(fobj[0]->fname, tsdb, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd); TSDB_CHECK_CODE(code, lino, _exit); uint8_t hdr[TSDB_FHDR_SIZE] = {0}; @@ -633,4 +632,4 @@ int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { taosRemoveFile(fname); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index f7b5c4f34d..e6c3b87e94 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -261,7 +261,7 @@ bool s3Get(const char *object_name, const char *path) { return ret; } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t *pBlock) { +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t **ppBlock) { int32_t code = 0; cos_pool_t *p = NULL; int is_cname = 0; @@ -327,7 +327,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_ //销毁内存池 cos_pool_destroy(p); - pBlock = buf; + *ppBlock = buf; return code; } @@ -453,7 +453,7 @@ void s3DeleteObjectsByPrefix(const char *prefix) {} void s3DeleteObjects(const char *object_name[], int nobject) {} bool s3Exists(const char *object_name) { return false; } bool s3Get(const char *object_name, const char *path) { return false; } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t *pBlock) { return 0; } +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; } void s3EvictCache(const char *path, long object_size) {} long s3Size(const char *object_name) { return 0; }