From aae9b3c1bc5db5d2007eb467c35a5a7eafdea3a6 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 17 Jul 2020 17:32:56 +0800 Subject: [PATCH] add a idx file for query speed --- src/tsdb/inc/tsdbMain.h | 34 +++-- src/tsdb/src/tsdbFile.c | 18 +-- src/tsdb/src/tsdbMain.c | 12 +- src/tsdb/src/tsdbMemTable.c | 7 +- src/tsdb/src/tsdbRWHelper.c | 261 +++++++++++++++++++----------------- 5 files changed, 178 insertions(+), 154 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 40f2dac660..da5f03a4fb 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -132,21 +132,23 @@ typedef struct { // ------------------ tsdbFile.c extern const char* tsdbFileSuffix[]; typedef enum { - TSDB_FILE_TYPE_HEAD = 0, + TSDB_FILE_TYPE_IDX = 0, + TSDB_FILE_TYPE_HEAD, TSDB_FILE_TYPE_DATA, TSDB_FILE_TYPE_LAST, TSDB_FILE_TYPE_MAX, + TSDB_FILE_TYPE_NIDX, TSDB_FILE_TYPE_NHEAD, TSDB_FILE_TYPE_NLAST } TSDB_FILE_TYPE; typedef struct { - uint32_t offset; + uint32_t magic; uint32_t len; - uint64_t size; // total size of the file - uint64_t tombSize; // unused file size uint32_t totalBlocks; uint32_t totalSubBlocks; + uint64_t size; // total size of the file + uint64_t tombSize; // unused file size } STsdbFileInfo; typedef struct { @@ -249,16 +251,12 @@ typedef struct { typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; typedef struct { - int fid; - TSKEY minKey; - TSKEY maxKey; - // For read/write purpose - SFile headF; - SFile dataF; - SFile lastF; - // For write purpose only - SFile nHeadF; - SFile nLastF; + TSKEY minKey; + TSKEY maxKey; + SFileGroup fGroup; + SFile nIdxF; + SFile nHeadF; + SFile nLastF; } SHelperFile; typedef struct { @@ -444,6 +442,14 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); #define helperRepo(h) (h)->pRepo #define helperState(h) (h)->state #define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0) +#define helperFileId(h) ((h)->files.fGroup.fileId) +#define helperIdxF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_IDX])) +#define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD])) +#define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA])) +#define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST])) +#define helperNewIdxF(h) (&((h)->files.nIdxF)) +#define helperNewHeadF(h) (&((h)->files.nHeadF)) +#define helperNewLastF(h) (&((h)->files.nLastF)) int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo); int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo); diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 95cc47292b..7b7788c4d9 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -30,7 +30,7 @@ #include "ttime.h" #include "tfile.h" -const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"}; +const char *tsdbFileSuffix[] = {".idx", ".head", ".data", ".last", "", ".i", ".h", ".l"}; static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); static void tsdbDestroyFile(SFile *pFile); @@ -108,7 +108,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { memset((void *)(&fileGroup), 0, sizeof(SFileGroup)); fileGroup.fileId = fid; - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) { if (tsdbInitFile(&fileGroup.files[type], pRepo, fid, type) < 0) { tsdbError("vgId:%d failed to init file fid %d type %d", REPO_ID(pRepo), fid, type); goto _err; @@ -126,7 +126,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { return 0; _err: - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]); + for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]); tfree(tDataDir); if (dir != NULL) closedir(dir); @@ -139,7 +139,7 @@ void tsdbCloseFileH(STsdbRepo *pRepo) { for (int i = 0; i < pFileH->nFGroups; i++) { SFileGroup *pFGroup = pFileH->pFGroup + i; - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) { tsdbDestroyFile(&pFGroup->files[type]); } } @@ -156,7 +156,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); if (pGroup == NULL) { // if not exists, create one pFGroup->fileId = fid; - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) { if (tsdbCreateFile(&pFGroup->files[type], pRepo, fid, type) < 0) goto _err; } @@ -169,7 +169,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int return pGroup; _err: - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]); + for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]); return NULL; } @@ -323,7 +323,7 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) { int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) { int tlen = 0; - tlen += taosEncodeFixedU32(buf, pInfo->offset); + tlen += taosEncodeFixedU32(buf, pInfo->magic); tlen += taosEncodeFixedU32(buf, pInfo->len); tlen += taosEncodeFixedU64(buf, pInfo->size); tlen += taosEncodeFixedU64(buf, pInfo->tombSize); @@ -334,7 +334,7 @@ int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) { } void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) { - buf = taosDecodeFixedU32(buf, &(pInfo->offset)); + buf = taosDecodeFixedU32(buf, &(pInfo->magic)); buf = taosDecodeFixedU32(buf, &(pInfo->len)); buf = taosDecodeFixedU64(buf, &(pInfo->size)); buf = taosDecodeFixedU64(buf, &(pInfo->tombSize)); @@ -358,7 +358,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { pFileH->nFGroups--; ASSERT(pFileH->nFGroups >= 0); - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) { if (remove(fileGroup.files[type].fname) < 0) { tsdbError("vgId:%d failed to remove file %s", REPO_ID(pRepo), fileGroup.files[type].fname); } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index ffaab375a3..848cd4e85a 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -214,7 +214,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ char *prefix = dirname(sdup); if (name[0] == 0) { // get the file from index or after, but not larger than eindex - int fid = (*index) / 3; + int fid = (*index) / TSDB_FILE_TYPE_MAX; if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) { if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) { @@ -228,11 +228,11 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ SFileGroup *pFGroup = taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE); if (pFGroup->fileId == fid) { - fname = strdup(pFGroup->files[(*index) % 3].fname); + fname = strdup(pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].fname); } else { - if (pFGroup->fileId * 3 + 2 < eindex) { + if ((pFGroup->fileId + 1) * TSDB_FILE_TYPE_MAX - 1 < eindex) { fname = strdup(pFGroup->files[0].fname); - *index = pFGroup->fileId * 3; + *index = pFGroup->fileId * TSDB_FILE_TYPE_MAX; } else { tfree(sdup); return 0; @@ -244,14 +244,14 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ if (*index == TSDB_META_FILE_INDEX) { // get meta file fname = tsdbGetMetaFileName(pRepo->rootDir); } else { - int fid = (*index) / 3; + int fid = (*index) / TSDB_FILE_TYPE_MAX; SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); if (pFGroup == NULL) { // not found tfree(sdup); return 0; } - SFile *pFile = &pFGroup->files[(*index) % 3]; + SFile *pFile = &pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX]; fname = strdup(pFile->fname); } } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index b29cec3cf9..69e6ab3c0f 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -628,9 +628,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe tsdbCloseHelperFile(pHelper, 0); pthread_rwlock_wrlock(&(pFileH->fhlock)); - pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF; - pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF; - pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF; + pGroup->files[TSDB_FILE_TYPE_IDX] = *(helperIdxF(pHelper)); + pGroup->files[TSDB_FILE_TYPE_HEAD] = *(helperHeadF(pHelper)); + pGroup->files[TSDB_FILE_TYPE_DATA] = *(helperDataF(pHelper)); + pGroup->files[TSDB_FILE_TYPE_LAST] = *(helperLastF(pHelper)); pthread_rwlock_unlock(&(pFileH->fhlock)); return 0; diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 0d52b7ae33..79b4a8ef64 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -106,44 +106,36 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE); // Set the files - pHelper->files.fid = pGroup->fileId; - pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD]; - pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA]; - pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST]; + pHelper->files.fGroup = *pGroup; if (helperType(pHelper) == TSDB_WRITE_HELPER) { - tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, pHelper->files.nHeadF.fname); - tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, pHelper->files.nLastF.fname); + tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NIDX, helperNewIdxF(pHelper)->fname); + tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, helperNewHeadF(pHelper)->fname); + tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, helperNewLastF(pHelper)->fname); } // Open the files - if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(helperIdxF(pHelper), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) goto _err; if (helperType(pHelper) == TSDB_WRITE_HELPER) { - if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err; - if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err; + if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) goto _err; + if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) goto _err; + + // Create and open .i file + if (tsdbOpenFile(helperNewIdxF(pHelper), O_WRONLY | O_CREAT) < 0) return -1; + if (tsdbUpdateFileHeader(helperNewIdxF(pHelper), 0) < 0) return -1; // Create and open .h - if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) return -1; - // size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM); - if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { - tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), - TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + if (tsdbOpenFile(helperNewHeadF(pHelper), O_WRONLY | O_CREAT) < 0) return -1; + if (tsdbUpdateFileHeader(helperNewHeadF(pHelper), 0) < 0) return -1; // Create and open .l file if should if (tsdbShouldCreateNewLast(pHelper)) { - if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err; - if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { - tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), - TSDB_FILE_HEAD_SIZE, pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + if (tsdbOpenFile(helperNewLastF(pHelper), O_WRONLY | O_CREAT) < 0) goto _err; + if (tsdbUpdateFileHeader(helperNewLastF(pHelper), 0) < 0) return -1; } } else { - if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err; - if (tsdbOpenFile(&(pHelper->files.lastF), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(helperDataF(pHelper), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(helperLastF(pHelper), O_RDONLY) < 0) goto _err; } helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN); @@ -155,59 +147,94 @@ _err: } int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { - if (pHelper->files.headF.fd > 0) { - close(pHelper->files.headF.fd); - pHelper->files.headF.fd = -1; + SFile *pFile = NULL; + + pFile = helperIdxF(pHelper); + if (pFile->fd > 0) { + close(pFile->fd); + pFile->fd = -1; } - if (pHelper->files.dataF.fd > 0) { + + pFile = helperHeadF(pHelper); + if (pFile->fd > 0) { + close(pFile->fd); + pFile->fd = -1; + } + + pFile = helperDataF(pHelper); + if (pFile->fd > 0) { if (helperType(pHelper) == TSDB_WRITE_HELPER) { - tsdbUpdateFileHeader(&(pHelper->files.dataF), 0); - fsync(pHelper->files.dataF.fd); + tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); } - close(pHelper->files.dataF.fd); - pHelper->files.dataF.fd = -1; + close(pFile->fd); + pFile->fd = -1; } - if (pHelper->files.lastF.fd > 0) { - if (helperType(pHelper) == TSDB_WRITE_HELPER) { - fsync(pHelper->files.lastF.fd); + + pFile = helperLastF(pHelper); + if (pFile->fd > 0) { + if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) { + fsync(pFile->fd); } - close(pHelper->files.lastF.fd); - pHelper->files.lastF.fd = -1; + close(pFile->fd); + pFile->fd = -1; } + if (helperType(pHelper) == TSDB_WRITE_HELPER) { - if (pHelper->files.nHeadF.fd > 0) { - if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nHeadF), 0); - fsync(pHelper->files.nHeadF.fd); - close(pHelper->files.nHeadF.fd); - pHelper->files.nHeadF.fd = -1; + pFile = helperNewIdxF(pHelper); + if (pFile->fd > 0) { + if (!hasError) tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + close(pFile->fd); + pFile->fd = -1; if (hasError) { - (void)remove(pHelper->files.nHeadF.fname); + (void)remove(pFile->fname); } else { - if (rename(pHelper->files.nHeadF.fname, pHelper->files.headF.fname) < 0) { - tsdbError("failed to rename file from %s to %s since %s", pHelper->files.nHeadF.fname, - pHelper->files.headF.fname, strerror(errno)); + if (rename(pFile->fname, helperIdxF(pHelper)->fname) < 0) { + tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperIdxF(pHelper)->fname, + strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - pHelper->files.headF.info = pHelper->files.nHeadF.info; + helperIdxF(pHelper)->info = pFile->info; } } - if (pHelper->files.nLastF.fd > 0) { - if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nLastF), 0); - fsync(pHelper->files.nLastF.fd); - close(pHelper->files.nLastF.fd); - pHelper->files.nLastF.fd = -1; + pFile = helperNewHeadF(pHelper); + if (pFile->fd > 0) { + if (!hasError) tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + close(pFile->fd); + pFile->fd = -1; if (hasError) { - (void)remove(pHelper->files.nLastF.fname); + (void)remove(pFile->fname); } else { - if (rename(pHelper->files.nLastF.fname, pHelper->files.lastF.fname) < 0) { - tsdbError("failed to rename file from %s to %s since %s", pHelper->files.nLastF.fname, - pHelper->files.lastF.fname, strerror(errno)); + if (rename(pFile->fname, helperHeadF(pHelper)->fname) < 0) { + tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperHeadF(pHelper)->fname, + strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - pHelper->files.lastF.info = pHelper->files.nLastF.info; + helperHeadF(pHelper)->info = pFile->info; + } + } + + pFile = helperNewLastF(pHelper); + if (pFile->fd > 0) { + if (!hasError) tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + close(pFile->fd); + pFile->fd = -1; + if (hasError) { + (void)remove(pFile->fname); + } else { + if (rename(pFile->fname, helperLastF(pHelper)->fname) < 0) { + tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperLastF(pHelper)->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + helperLastF(pHelper)->info = helperNewLastF(pHelper)->info; } } } @@ -283,28 +310,28 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], &compBlock, true, true) < 0) + if (tsdbWriteBlockToFile(pHelper, helperNewLastF(pHelper), pHelper->pDataCols[0], &compBlock, true, true) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; } else { - if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.lastF.fname, + if (lseek(helperLastF(pHelper)->fd, pCompBlock->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperLastF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END); + pCompBlock->offset = lseek(helperNewLastF(pHelper)->fd, 0, SEEK_END); if (pCompBlock->offset < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nLastF.fname, + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperNewLastF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len) { + if (tsendfile(helperNewLastF(pHelper)->fd, helperLastF(pHelper)->fd, NULL, pCompBlock->len) < pCompBlock->len) { tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo), - pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno)); + helperLastF(pHelper)->fname, helperNewLastF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -321,9 +348,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { if (pIdx->offset > 0) { - offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); + offset = lseek(helperNewHeadF(pHelper)->fd, 0, SEEK_END); if (offset < 0) { - tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname, + tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), helperNewHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -332,9 +359,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { pIdx->offset = offset; ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); - if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) { + if (tsendfile(helperNewHeadF(pHelper)->fd, helperHeadF(pHelper)->fd, NULL, pIdx->len) < pIdx->len) { tsdbError("vgId:%d failed to send %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno)); + helperHeadF(pHelper)->fname, helperNewHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -347,9 +374,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ASSERT(pIdx->len > sizeof(SCompInfo) + sizeof(TSCKSUM) && (pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); - offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); + offset = lseek(helperNewHeadF(pHelper)->fd, 0, SEEK_END); if (offset < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname, + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperNewHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -358,9 +385,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { pIdx->uid = pHelper->tableInfo.uid; ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); - if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { + if (twrite(helperNewHeadF(pHelper)->fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - pHelper->files.nHeadF.fname, strerror(errno)); + helperNewHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -371,19 +398,10 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { } int tsdbWriteCompIdx(SRWHelper *pHelper) { + ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); STsdbCfg *pCfg = &pHelper->pRepo->config; - ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); - if (offset < 0) { - tsdbError("vgId:%d failed to lseek file %s to end since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - SFile *pFile = &(pHelper->files.nHeadF); - pFile->info.offset = offset; + SFile *pFile = helperNewIdxF(pHelper); void *buf = pHelper->pBuffer; for (uint32_t i = 0; i < pCfg->maxTables; i++) { @@ -406,9 +424,9 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize); - if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), tsize, - pHelper->files.nHeadF.fname, strerror(errno)); + if (twrite(pFile->fd, (void *)pHelper->pBuffer, tsize) < tsize) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), tsize, pFile->fname, + strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -423,23 +441,21 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) { // If not load from file, just load it in object - SFile *pFile = &(pHelper->files.headF); + SFile *pFile = helperIdxF(pHelper); int fd = pFile->fd; memset(pHelper->pCompIdx, 0, tsizeof(pHelper->pCompIdx)); - if (pFile->info.offset > 0) { - ASSERT(pFile->info.offset > TSDB_FILE_HEAD_SIZE); - - if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s to %u since %s", REPO_ID(pHelper->pRepo), pFile->fname, - pFile->info.offset, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } + if (pFile->info.len > 0) { if ((pHelper->pBuffer = trealloc(pHelper->pBuffer, pFile->info.len)) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } + if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < pFile->info.len) { tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len, pFile->fname, strerror(errno)); @@ -447,8 +463,8 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { return -1; } if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) { - tsdbError("vgId:%d file %s SCompIdx part is corrupted. offset %u len %u", REPO_ID(pHelper->pRepo), pFile->fname, - pFile->info.offset, pFile->info.len); + tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname, + pFile->info.len); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } @@ -484,13 +500,13 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - int fd = pHelper->files.headF.fd; + int fd = helperHeadF(pHelper)->fd; if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { if (pIdx->offset > 0) { ASSERT(pIdx->uid == pHelper->tableInfo.uid); if (lseek(fd, pIdx->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.headF.fname, + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -499,13 +515,13 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { pHelper->pCompInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len); if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - pHelper->files.headF.fname, strerror(errno)); + helperHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) { tsdbError("vgId:%d file %s SCompInfo part is corrupted, tid %d uid %" PRIu64, REPO_ID(pHelper->pRepo), - pHelper->files.headF.fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid); + helperHeadF(pHelper)->fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } @@ -523,7 +539,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) { ASSERT(pCompBlock->numOfSubBlocks <= 1); - SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); + SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); if (lseek(pFile->fd, pCompBlock->offset, SEEK_SET) < 0) { tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); @@ -642,9 +658,9 @@ _err: // ---------------------- INTERNAL FUNCTIONS ---------------------- static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { - ASSERT(pHelper->files.lastF.fd > 0); + ASSERT(helperLastF(pHelper)->fd > 0); struct stat st; - if (fstat(pHelper->files.lastF.fd, &st) < 0) return true; + if (fstat(helperLastF(pHelper)->fd, &st) < 0) return true; if (st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE) return true; return false; } @@ -972,12 +988,13 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { memset((void *)&pHelper->files, 0, sizeof(pHelper->files)); - pHelper->files.fid = -1; - pHelper->files.headF.fd = -1; - pHelper->files.dataF.fd = -1; - pHelper->files.lastF.fd = -1; - pHelper->files.nHeadF.fd = -1; - pHelper->files.nLastF.fd = -1; + helperIdxF(pHelper)->fd = -1; + helperHeadF(pHelper)->fd = -1; + helperDataF(pHelper)->fd = -1; + helperLastF(pHelper)->fd = -1; + helperNewIdxF(pHelper)->fd = -1; + helperNewHeadF(pHelper)->fd = -1; + helperNewLastF(pHelper)->fd = -1; } static int tsdbInitHelperFile(SRWHelper *pHelper) { @@ -1154,7 +1171,7 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, ASSERT(pCompBlock->numOfSubBlocks <= 1); ASSERT(colIds[0] == 0); - SFile * pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); + SFile * pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); SCompCol compCol = {0}; // If only load timestamp column, no need to load SCompData part @@ -1215,7 +1232,7 @@ _err: static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { ASSERT(pCompBlock->numOfSubBlocks <= 1); - SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); + SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); pHelper->pBuffer = trealloc(pHelper->pBuffer, pCompBlock->len); if (pHelper->pBuffer == NULL) { @@ -1362,7 +1379,7 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; + if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; } else { if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; @@ -1427,7 +1444,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, pDataCols0->cols[0].pData, pDataCols0->numOfRows); ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; + if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; tblkIdx++; } else { @@ -1466,7 +1483,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, if (rowsRead == 0) break; ASSERT(rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) return -1; + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; tblkIdx++; } @@ -1493,7 +1510,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, pDataCols0->cols[0].pData, pDataCols0->numOfRows); ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0) + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; tblkIdx++; @@ -1506,7 +1523,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); if (rowsRead == 0) break; - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; if (round == 0) { if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; @@ -1577,10 +1594,10 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, ASSERT(pDataCols->numOfRows > 0); if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { - pFile = &(pHelper->files.dataF); + pFile = helperDataF(pHelper); } else { isLast = true; - pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); + pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? helperNewLastF(pHelper) : helperLastF(pHelper); } ASSERT(pFile->fd > 0);