From aae9b3c1bc5db5d2007eb467c35a5a7eafdea3a6 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 17 Jul 2020 17:32:56 +0800 Subject: [PATCH 01/11] add a idx file for query speed --- src/tsdb/inc/tsdbMain.h | 34 +++-- src/tsdb/src/tsdbFile.c | 18 +-- src/tsdb/src/tsdbMain.c | 12 +- src/tsdb/src/tsdbMemTable.c | 7 +- src/tsdb/src/tsdbRWHelper.c | 261 +++++++++++++++++++----------------- 5 files changed, 178 insertions(+), 154 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 40f2dac660..da5f03a4fb 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -132,21 +132,23 @@ typedef struct { // ------------------ tsdbFile.c extern const char* tsdbFileSuffix[]; typedef enum { - TSDB_FILE_TYPE_HEAD = 0, + TSDB_FILE_TYPE_IDX = 0, + TSDB_FILE_TYPE_HEAD, TSDB_FILE_TYPE_DATA, TSDB_FILE_TYPE_LAST, TSDB_FILE_TYPE_MAX, + TSDB_FILE_TYPE_NIDX, TSDB_FILE_TYPE_NHEAD, TSDB_FILE_TYPE_NLAST } TSDB_FILE_TYPE; typedef struct { - uint32_t offset; + uint32_t magic; uint32_t len; - uint64_t size; // total size of the file - uint64_t tombSize; // unused file size uint32_t totalBlocks; uint32_t totalSubBlocks; + uint64_t size; // total size of the file + uint64_t tombSize; // unused file size } STsdbFileInfo; typedef struct { @@ -249,16 +251,12 @@ typedef struct { typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; typedef struct { - int fid; - TSKEY minKey; - TSKEY maxKey; - // For read/write purpose - SFile headF; - SFile dataF; - SFile lastF; - // For write purpose only - SFile nHeadF; - SFile nLastF; + TSKEY minKey; + TSKEY maxKey; + SFileGroup fGroup; + SFile nIdxF; + SFile nHeadF; + SFile nLastF; } SHelperFile; typedef struct { @@ -444,6 +442,14 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); #define helperRepo(h) (h)->pRepo #define helperState(h) (h)->state #define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0) +#define helperFileId(h) ((h)->files.fGroup.fileId) +#define helperIdxF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_IDX])) +#define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD])) +#define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA])) +#define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST])) +#define helperNewIdxF(h) (&((h)->files.nIdxF)) +#define helperNewHeadF(h) (&((h)->files.nHeadF)) +#define helperNewLastF(h) (&((h)->files.nLastF)) int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo); int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo); diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 95cc47292b..7b7788c4d9 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -30,7 +30,7 @@ #include "ttime.h" #include "tfile.h" -const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"}; +const char *tsdbFileSuffix[] = {".idx", ".head", ".data", ".last", "", ".i", ".h", ".l"}; static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); static void tsdbDestroyFile(SFile *pFile); @@ -108,7 +108,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { memset((void *)(&fileGroup), 0, sizeof(SFileGroup)); fileGroup.fileId = fid; - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) { if (tsdbInitFile(&fileGroup.files[type], pRepo, fid, type) < 0) { tsdbError("vgId:%d failed to init file fid %d type %d", REPO_ID(pRepo), fid, type); goto _err; @@ -126,7 +126,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { return 0; _err: - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]); + for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]); tfree(tDataDir); if (dir != NULL) closedir(dir); @@ -139,7 +139,7 @@ void tsdbCloseFileH(STsdbRepo *pRepo) { for (int i = 0; i < pFileH->nFGroups; i++) { SFileGroup *pFGroup = pFileH->pFGroup + i; - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) { tsdbDestroyFile(&pFGroup->files[type]); } } @@ -156,7 +156,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); if (pGroup == NULL) { // if not exists, create one pFGroup->fileId = fid; - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) { if (tsdbCreateFile(&pFGroup->files[type], pRepo, fid, type) < 0) goto _err; } @@ -169,7 +169,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int return pGroup; _err: - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]); + for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]); return NULL; } @@ -323,7 +323,7 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) { int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) { int tlen = 0; - tlen += taosEncodeFixedU32(buf, pInfo->offset); + tlen += taosEncodeFixedU32(buf, pInfo->magic); tlen += taosEncodeFixedU32(buf, pInfo->len); tlen += taosEncodeFixedU64(buf, pInfo->size); tlen += taosEncodeFixedU64(buf, pInfo->tombSize); @@ -334,7 +334,7 @@ int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) { } void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) { - buf = taosDecodeFixedU32(buf, &(pInfo->offset)); + buf = taosDecodeFixedU32(buf, &(pInfo->magic)); buf = taosDecodeFixedU32(buf, &(pInfo->len)); buf = taosDecodeFixedU64(buf, &(pInfo->size)); buf = taosDecodeFixedU64(buf, &(pInfo->tombSize)); @@ -358,7 +358,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { pFileH->nFGroups--; ASSERT(pFileH->nFGroups >= 0); - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) { if (remove(fileGroup.files[type].fname) < 0) { tsdbError("vgId:%d failed to remove file %s", REPO_ID(pRepo), fileGroup.files[type].fname); } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index ffaab375a3..848cd4e85a 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -214,7 +214,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ char *prefix = dirname(sdup); if (name[0] == 0) { // get the file from index or after, but not larger than eindex - int fid = (*index) / 3; + int fid = (*index) / TSDB_FILE_TYPE_MAX; if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) { if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) { @@ -228,11 +228,11 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ SFileGroup *pFGroup = taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE); if (pFGroup->fileId == fid) { - fname = strdup(pFGroup->files[(*index) % 3].fname); + fname = strdup(pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].fname); } else { - if (pFGroup->fileId * 3 + 2 < eindex) { + if ((pFGroup->fileId + 1) * TSDB_FILE_TYPE_MAX - 1 < eindex) { fname = strdup(pFGroup->files[0].fname); - *index = pFGroup->fileId * 3; + *index = pFGroup->fileId * TSDB_FILE_TYPE_MAX; } else { tfree(sdup); return 0; @@ -244,14 +244,14 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ if (*index == TSDB_META_FILE_INDEX) { // get meta file fname = tsdbGetMetaFileName(pRepo->rootDir); } else { - int fid = (*index) / 3; + int fid = (*index) / TSDB_FILE_TYPE_MAX; SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); if (pFGroup == NULL) { // not found tfree(sdup); return 0; } - SFile *pFile = &pFGroup->files[(*index) % 3]; + SFile *pFile = &pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX]; fname = strdup(pFile->fname); } } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index b29cec3cf9..69e6ab3c0f 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -628,9 +628,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe tsdbCloseHelperFile(pHelper, 0); pthread_rwlock_wrlock(&(pFileH->fhlock)); - pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF; - pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF; - pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF; + pGroup->files[TSDB_FILE_TYPE_IDX] = *(helperIdxF(pHelper)); + pGroup->files[TSDB_FILE_TYPE_HEAD] = *(helperHeadF(pHelper)); + pGroup->files[TSDB_FILE_TYPE_DATA] = *(helperDataF(pHelper)); + pGroup->files[TSDB_FILE_TYPE_LAST] = *(helperLastF(pHelper)); pthread_rwlock_unlock(&(pFileH->fhlock)); return 0; diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 0d52b7ae33..79b4a8ef64 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -106,44 +106,36 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE); // Set the files - pHelper->files.fid = pGroup->fileId; - pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD]; - pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA]; - pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST]; + pHelper->files.fGroup = *pGroup; if (helperType(pHelper) == TSDB_WRITE_HELPER) { - tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, pHelper->files.nHeadF.fname); - tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, pHelper->files.nLastF.fname); + tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NIDX, helperNewIdxF(pHelper)->fname); + tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, helperNewHeadF(pHelper)->fname); + tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, helperNewLastF(pHelper)->fname); } // Open the files - if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(helperIdxF(pHelper), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) goto _err; if (helperType(pHelper) == TSDB_WRITE_HELPER) { - if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err; - if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err; + if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) goto _err; + if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) goto _err; + + // Create and open .i file + if (tsdbOpenFile(helperNewIdxF(pHelper), O_WRONLY | O_CREAT) < 0) return -1; + if (tsdbUpdateFileHeader(helperNewIdxF(pHelper), 0) < 0) return -1; // Create and open .h - if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) return -1; - // size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM); - if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { - tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), - TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + if (tsdbOpenFile(helperNewHeadF(pHelper), O_WRONLY | O_CREAT) < 0) return -1; + if (tsdbUpdateFileHeader(helperNewHeadF(pHelper), 0) < 0) return -1; // Create and open .l file if should if (tsdbShouldCreateNewLast(pHelper)) { - if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err; - if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { - tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), - TSDB_FILE_HEAD_SIZE, pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + if (tsdbOpenFile(helperNewLastF(pHelper), O_WRONLY | O_CREAT) < 0) goto _err; + if (tsdbUpdateFileHeader(helperNewLastF(pHelper), 0) < 0) return -1; } } else { - if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err; - if (tsdbOpenFile(&(pHelper->files.lastF), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(helperDataF(pHelper), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(helperLastF(pHelper), O_RDONLY) < 0) goto _err; } helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN); @@ -155,59 +147,94 @@ _err: } int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { - if (pHelper->files.headF.fd > 0) { - close(pHelper->files.headF.fd); - pHelper->files.headF.fd = -1; + SFile *pFile = NULL; + + pFile = helperIdxF(pHelper); + if (pFile->fd > 0) { + close(pFile->fd); + pFile->fd = -1; } - if (pHelper->files.dataF.fd > 0) { + + pFile = helperHeadF(pHelper); + if (pFile->fd > 0) { + close(pFile->fd); + pFile->fd = -1; + } + + pFile = helperDataF(pHelper); + if (pFile->fd > 0) { if (helperType(pHelper) == TSDB_WRITE_HELPER) { - tsdbUpdateFileHeader(&(pHelper->files.dataF), 0); - fsync(pHelper->files.dataF.fd); + tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); } - close(pHelper->files.dataF.fd); - pHelper->files.dataF.fd = -1; + close(pFile->fd); + pFile->fd = -1; } - if (pHelper->files.lastF.fd > 0) { - if (helperType(pHelper) == TSDB_WRITE_HELPER) { - fsync(pHelper->files.lastF.fd); + + pFile = helperLastF(pHelper); + if (pFile->fd > 0) { + if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) { + fsync(pFile->fd); } - close(pHelper->files.lastF.fd); - pHelper->files.lastF.fd = -1; + close(pFile->fd); + pFile->fd = -1; } + if (helperType(pHelper) == TSDB_WRITE_HELPER) { - if (pHelper->files.nHeadF.fd > 0) { - if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nHeadF), 0); - fsync(pHelper->files.nHeadF.fd); - close(pHelper->files.nHeadF.fd); - pHelper->files.nHeadF.fd = -1; + pFile = helperNewIdxF(pHelper); + if (pFile->fd > 0) { + if (!hasError) tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + close(pFile->fd); + pFile->fd = -1; if (hasError) { - (void)remove(pHelper->files.nHeadF.fname); + (void)remove(pFile->fname); } else { - if (rename(pHelper->files.nHeadF.fname, pHelper->files.headF.fname) < 0) { - tsdbError("failed to rename file from %s to %s since %s", pHelper->files.nHeadF.fname, - pHelper->files.headF.fname, strerror(errno)); + if (rename(pFile->fname, helperIdxF(pHelper)->fname) < 0) { + tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperIdxF(pHelper)->fname, + strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - pHelper->files.headF.info = pHelper->files.nHeadF.info; + helperIdxF(pHelper)->info = pFile->info; } } - if (pHelper->files.nLastF.fd > 0) { - if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nLastF), 0); - fsync(pHelper->files.nLastF.fd); - close(pHelper->files.nLastF.fd); - pHelper->files.nLastF.fd = -1; + pFile = helperNewHeadF(pHelper); + if (pFile->fd > 0) { + if (!hasError) tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + close(pFile->fd); + pFile->fd = -1; if (hasError) { - (void)remove(pHelper->files.nLastF.fname); + (void)remove(pFile->fname); } else { - if (rename(pHelper->files.nLastF.fname, pHelper->files.lastF.fname) < 0) { - tsdbError("failed to rename file from %s to %s since %s", pHelper->files.nLastF.fname, - pHelper->files.lastF.fname, strerror(errno)); + if (rename(pFile->fname, helperHeadF(pHelper)->fname) < 0) { + tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperHeadF(pHelper)->fname, + strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - pHelper->files.lastF.info = pHelper->files.nLastF.info; + helperHeadF(pHelper)->info = pFile->info; + } + } + + pFile = helperNewLastF(pHelper); + if (pFile->fd > 0) { + if (!hasError) tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + close(pFile->fd); + pFile->fd = -1; + if (hasError) { + (void)remove(pFile->fname); + } else { + if (rename(pFile->fname, helperLastF(pHelper)->fname) < 0) { + tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperLastF(pHelper)->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + helperLastF(pHelper)->info = helperNewLastF(pHelper)->info; } } } @@ -283,28 +310,28 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], &compBlock, true, true) < 0) + if (tsdbWriteBlockToFile(pHelper, helperNewLastF(pHelper), pHelper->pDataCols[0], &compBlock, true, true) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; } else { - if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.lastF.fname, + if (lseek(helperLastF(pHelper)->fd, pCompBlock->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperLastF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END); + pCompBlock->offset = lseek(helperNewLastF(pHelper)->fd, 0, SEEK_END); if (pCompBlock->offset < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nLastF.fname, + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperNewLastF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len) { + if (tsendfile(helperNewLastF(pHelper)->fd, helperLastF(pHelper)->fd, NULL, pCompBlock->len) < pCompBlock->len) { tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo), - pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno)); + helperLastF(pHelper)->fname, helperNewLastF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -321,9 +348,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { if (pIdx->offset > 0) { - offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); + offset = lseek(helperNewHeadF(pHelper)->fd, 0, SEEK_END); if (offset < 0) { - tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname, + tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), helperNewHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -332,9 +359,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { pIdx->offset = offset; ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); - if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) { + if (tsendfile(helperNewHeadF(pHelper)->fd, helperHeadF(pHelper)->fd, NULL, pIdx->len) < pIdx->len) { tsdbError("vgId:%d failed to send %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno)); + helperHeadF(pHelper)->fname, helperNewHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -347,9 +374,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ASSERT(pIdx->len > sizeof(SCompInfo) + sizeof(TSCKSUM) && (pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); - offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); + offset = lseek(helperNewHeadF(pHelper)->fd, 0, SEEK_END); if (offset < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname, + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperNewHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -358,9 +385,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { pIdx->uid = pHelper->tableInfo.uid; ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); - if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { + if (twrite(helperNewHeadF(pHelper)->fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - pHelper->files.nHeadF.fname, strerror(errno)); + helperNewHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -371,19 +398,10 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { } int tsdbWriteCompIdx(SRWHelper *pHelper) { + ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); STsdbCfg *pCfg = &pHelper->pRepo->config; - ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); - if (offset < 0) { - tsdbError("vgId:%d failed to lseek file %s to end since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - SFile *pFile = &(pHelper->files.nHeadF); - pFile->info.offset = offset; + SFile *pFile = helperNewIdxF(pHelper); void *buf = pHelper->pBuffer; for (uint32_t i = 0; i < pCfg->maxTables; i++) { @@ -406,9 +424,9 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize); - if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), tsize, - pHelper->files.nHeadF.fname, strerror(errno)); + if (twrite(pFile->fd, (void *)pHelper->pBuffer, tsize) < tsize) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), tsize, pFile->fname, + strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -423,23 +441,21 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) { // If not load from file, just load it in object - SFile *pFile = &(pHelper->files.headF); + SFile *pFile = helperIdxF(pHelper); int fd = pFile->fd; memset(pHelper->pCompIdx, 0, tsizeof(pHelper->pCompIdx)); - if (pFile->info.offset > 0) { - ASSERT(pFile->info.offset > TSDB_FILE_HEAD_SIZE); - - if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s to %u since %s", REPO_ID(pHelper->pRepo), pFile->fname, - pFile->info.offset, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } + if (pFile->info.len > 0) { if ((pHelper->pBuffer = trealloc(pHelper->pBuffer, pFile->info.len)) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } + if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < pFile->info.len) { tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len, pFile->fname, strerror(errno)); @@ -447,8 +463,8 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { return -1; } if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) { - tsdbError("vgId:%d file %s SCompIdx part is corrupted. offset %u len %u", REPO_ID(pHelper->pRepo), pFile->fname, - pFile->info.offset, pFile->info.len); + tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname, + pFile->info.len); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } @@ -484,13 +500,13 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - int fd = pHelper->files.headF.fd; + int fd = helperHeadF(pHelper)->fd; if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { if (pIdx->offset > 0) { ASSERT(pIdx->uid == pHelper->tableInfo.uid); if (lseek(fd, pIdx->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.headF.fname, + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -499,13 +515,13 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { pHelper->pCompInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len); if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - pHelper->files.headF.fname, strerror(errno)); + helperHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) { tsdbError("vgId:%d file %s SCompInfo part is corrupted, tid %d uid %" PRIu64, REPO_ID(pHelper->pRepo), - pHelper->files.headF.fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid); + helperHeadF(pHelper)->fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } @@ -523,7 +539,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) { ASSERT(pCompBlock->numOfSubBlocks <= 1); - SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); + SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); if (lseek(pFile->fd, pCompBlock->offset, SEEK_SET) < 0) { tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); @@ -642,9 +658,9 @@ _err: // ---------------------- INTERNAL FUNCTIONS ---------------------- static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { - ASSERT(pHelper->files.lastF.fd > 0); + ASSERT(helperLastF(pHelper)->fd > 0); struct stat st; - if (fstat(pHelper->files.lastF.fd, &st) < 0) return true; + if (fstat(helperLastF(pHelper)->fd, &st) < 0) return true; if (st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE) return true; return false; } @@ -972,12 +988,13 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { memset((void *)&pHelper->files, 0, sizeof(pHelper->files)); - pHelper->files.fid = -1; - pHelper->files.headF.fd = -1; - pHelper->files.dataF.fd = -1; - pHelper->files.lastF.fd = -1; - pHelper->files.nHeadF.fd = -1; - pHelper->files.nLastF.fd = -1; + helperIdxF(pHelper)->fd = -1; + helperHeadF(pHelper)->fd = -1; + helperDataF(pHelper)->fd = -1; + helperLastF(pHelper)->fd = -1; + helperNewIdxF(pHelper)->fd = -1; + helperNewHeadF(pHelper)->fd = -1; + helperNewLastF(pHelper)->fd = -1; } static int tsdbInitHelperFile(SRWHelper *pHelper) { @@ -1154,7 +1171,7 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, ASSERT(pCompBlock->numOfSubBlocks <= 1); ASSERT(colIds[0] == 0); - SFile * pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); + SFile * pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); SCompCol compCol = {0}; // If only load timestamp column, no need to load SCompData part @@ -1215,7 +1232,7 @@ _err: static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { ASSERT(pCompBlock->numOfSubBlocks <= 1); - SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); + SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); pHelper->pBuffer = trealloc(pHelper->pBuffer, pCompBlock->len); if (pHelper->pBuffer == NULL) { @@ -1362,7 +1379,7 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; + if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; } else { if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; @@ -1427,7 +1444,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, pDataCols0->cols[0].pData, pDataCols0->numOfRows); ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; + if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; tblkIdx++; } else { @@ -1466,7 +1483,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, if (rowsRead == 0) break; ASSERT(rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) return -1; + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; tblkIdx++; } @@ -1493,7 +1510,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, pDataCols0->cols[0].pData, pDataCols0->numOfRows); ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0) + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; tblkIdx++; @@ -1506,7 +1523,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); if (rowsRead == 0) break; - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; if (round == 0) { if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; @@ -1577,10 +1594,10 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, ASSERT(pDataCols->numOfRows > 0); if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { - pFile = &(pHelper->files.dataF); + pFile = helperDataF(pHelper); } else { isLast = true; - pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); + pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? helperNewLastF(pHelper) : helperLastF(pHelper); } ASSERT(pFile->fd > 0); From 31824d9ffa0c05b52d0d1d21ddd806b9bc372267 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 20 Jul 2020 15:01:37 +0800 Subject: [PATCH 02/11] get rid of maxTables in TSDB --- src/tsdb/inc/tsdbMain.h | 13 ++- src/tsdb/src/tsdbMain.c | 2 +- src/tsdb/src/tsdbRWHelper.c | 186 +++++++++++++++++++++++------------- src/tsdb/src/tsdbRead.c | 6 +- 4 files changed, 131 insertions(+), 76 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index da5f03a4fb..9346230e02 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -199,6 +199,7 @@ typedef struct { // ------------------ tsdbRWHelper.c typedef struct { + int32_t tid; uint32_t len; uint32_t offset; uint32_t hasLast : 2; @@ -262,9 +263,14 @@ typedef struct { typedef struct { uint64_t uid; int32_t tid; - int32_t sversion; } SHelperTable; +typedef struct { + SCompIdx* pIdxArray; + int numOfIdx; + int curIdx; +} SIdxH; + typedef struct { tsdb_rw_helper_t type; @@ -272,7 +278,9 @@ typedef struct { int8_t state; // For file set usage SHelperFile files; - SCompIdx* pCompIdx; + SIdxH idxH; + SCompIdx curCompIdx; + void* pWIdx; // For table set usage SHelperTable tableInfo; SCompInfo* pCompInfo; @@ -284,7 +292,6 @@ typedef struct { void* compBuffer; // Buffer for temperary compress/decompress purpose } SRWHelper; - // Operations // ------------------ tsdbMeta.c #define TABLE_TYPE(t) (t)->type diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index c8b31af4f0..82dbe5f498 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -793,7 +793,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { for (int i = 1; i < pRepo->config.maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; - SCompIdx *pIdx = &rhelper.pCompIdx[i]; + SCompIdx *pIdx = &(rhelper.curCompIdx); if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey; } diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 79b4a8ef64..19cb791ce8 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -60,6 +60,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter int *blkIdx); static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows); +static int tsdbCompareTidIdx(const void *key1, const void *key2); // ---------------------- INTERNAL FUNCTIONS ---------------------- int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { @@ -251,17 +252,35 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { pHelper->tableInfo.tid = pTable->tableId.tid; pHelper->tableInfo.uid = pTable->tableId.uid; STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); - pHelper->tableInfo.sversion = schemaVersion(pSchema); tdInitDataCols(pHelper->pDataCols[0], pSchema); tdInitDataCols(pHelper->pDataCols[1], pSchema); - SCompIdx *pIdx = pHelper->pCompIdx + pTable->tableId.tid; - if (pIdx->offset > 0) { - if (pIdx->uid != TABLE_UID(pTable)) { - memset((void *)pIdx, 0, sizeof(SCompIdx)); + if (helperType(pHelper) == TSDB_WRITE_HELPER) { + if (pHelper->idxH.numOfIdx > 0) { + if (pHelper->idxH.curIdx >= pHelper->idxH.numOfIdx) { + memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); + } else { + SCompIdx *pIdx = &(pHelper->idxH.pIdxArray[pHelper->idxH.curIdx]); + if (pIdx->tid == TABLE_TID(pTable)) { + pHelper->curCompIdx = *pIdx; + pHelper->idxH.curIdx++; + } else { + ASSERT(pIdx->tid > TABLE_TID(pTable)); + memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); + } + } } else { - if (pIdx->hasLast) pHelper->hasOldLastBlock = true; + memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); + } + } else { + // TODO: make it more efficient + void *ptr = bsearch(&TABLE_TID(pTable), (void *)pHelper->idxH.pIdxArray, pHelper->idxH.numOfIdx, sizeof(SCompIdx), + tsdbCompareTidIdx); + if (ptr == NULL) { + memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); + } else { + pHelper->curCompIdx = *(SCompIdx *)ptr; } } @@ -272,8 +291,8 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - SCompIdx * pIdx = &(pHelper->pCompIdx[TABLE_TID(pCommitIter->pTable)]); - int blkIdx = 0; + SCompIdx *pIdx = &(pHelper->curCompIdx); + int blkIdx = 0; ASSERT(pIdx->offset == 0 || pIdx->uid == TABLE_UID(pCommitIter->pTable)); if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; @@ -298,7 +317,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { STsdbCfg *pCfg = &pHelper->pRepo->config; ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompIdx * pIdx = &(pHelper->curCompIdx); SCompBlock compBlock = {0}; if (TSDB_NLAST_FILE_OPENED(pHelper) && (pHelper->hasOldLastBlock)) { if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; @@ -344,10 +363,11 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { } int tsdbWriteCompInfo(SRWHelper *pHelper) { - off_t offset = 0; - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { - if (pIdx->offset > 0) { + SCompIdx *pIdx = &(pHelper->curCompIdx); + off_t offset = 0; + + if (pIdx->len > 0) { + if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { offset = lseek(helperNewHeadF(pHelper)->fd, 0, SEEK_END); if (offset < 0) { tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), helperNewHeadF(pHelper)->fname, @@ -357,6 +377,8 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { } pIdx->offset = offset; + pIdx->uid = pHelper->tableInfo.uid; + pIdx->tid = pHelper->tableInfo.tid; ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); if (tsendfile(helperNewHeadF(pHelper)->fd, helperHeadF(pHelper)->fd, NULL, pIdx->len) < pIdx->len) { @@ -365,9 +387,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - } - } else { - if (pIdx->len > 0) { + } else { pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER; pHelper->pCompInfo->uid = pHelper->tableInfo.uid; pHelper->pCompInfo->checksum = 0; @@ -383,6 +403,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { } pIdx->offset = offset; pIdx->uid = pHelper->tableInfo.uid; + pIdx->tid = pHelper->tableInfo.tid; ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); if (twrite(helperNewHeadF(pHelper)->fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { @@ -392,6 +413,17 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { return -1; } } + + if (tsizeof(pHelper->pWIdx) < helperNewIdxF(pHelper)->info.len + sizeof(SCompIdx) + 12) { + pHelper->pWIdx = trealloc(pHelper->pWIdx, tsizeof(pHelper->pWIdx) == 0 ? 1024 : tsizeof(pHelper->pWIdx) * 2); + if (pHelper->pWIdx == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } + + void *pBuf = POINTER_SHIFT(pHelper->pWIdx, helperNewIdxF(pHelper)->info.len); + helperNewIdxF(pHelper)->info.len += tsdbEncodeSCompIdx(&pBuf, &(pHelper->curCompIdx)); } return 0; @@ -399,57 +431,43 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { int tsdbWriteCompIdx(SRWHelper *pHelper) { ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - STsdbCfg *pCfg = &pHelper->pRepo->config; + // STsdbCfg *pCfg = &pHelper->pRepo->config; SFile *pFile = helperNewIdxF(pHelper); - void *buf = pHelper->pBuffer; - for (uint32_t i = 0; i < pCfg->maxTables; i++) { - SCompIdx *pCompIdx = pHelper->pCompIdx + i; - if (pCompIdx->offset > 0) { - int drift = POINTER_DISTANCE(buf, pHelper->pBuffer); - if (tsizeof(pHelper->pBuffer) - drift < 128) { - pHelper->pBuffer = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer) * 2); - if (pHelper->pBuffer == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - } - buf = POINTER_SHIFT(pHelper->pBuffer, drift); - taosEncodeVariantU32(&buf, i); - tsdbEncodeSCompIdx(&buf, pCompIdx); + pFile->info.len += sizeof(TSCKSUM); + if (tsizeof(pHelper->pWIdx) < pFile->info.len) { + pHelper->pWIdx = trealloc(pHelper->pWIdx, pFile->info.len); + if (pHelper->pWIdx == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; } } + taosCalcChecksumAppend(0, (uint8_t *)pHelper->pWIdx, pFile->info.len); - int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize); - - if (twrite(pFile->fd, (void *)pHelper->pBuffer, tsize) < tsize) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), tsize, pFile->fname, - strerror(errno)); + if (twrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < pFile->info.len) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len, + pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - pFile->info.len = tsize; + return 0; } int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { - STsdbCfg *pCfg = &(pHelper->pRepo->config); - ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN); + SFile *pFile = helperIdxF(pHelper); + int fd = pFile->fd; if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) { // If not load from file, just load it in object - SFile *pFile = helperIdxF(pHelper); - int fd = pFile->fd; - - memset(pHelper->pCompIdx, 0, tsizeof(pHelper->pCompIdx)); if (pFile->info.len > 0) { if ((pHelper->pBuffer = trealloc(pHelper->pBuffer, pFile->info.len)) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } + if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); @@ -462,6 +480,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } + if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) { tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname, pFile->info.len); @@ -470,27 +489,49 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { } // Decode it + pHelper->idxH.numOfIdx = 0; void *ptr = pHelper->pBuffer; while (POINTER_DISTANCE(ptr, pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) { - uint32_t tid = 0; - if ((ptr = taosDecodeVariantU32(ptr, &tid)) == NULL) return -1; - ASSERT(tid > 0 && tid < pCfg->maxTables); + size_t tlen = tsizeof(pHelper->idxH.pIdxArray); + pHelper->idxH.numOfIdx++; - if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1; + if (tlen < pHelper->idxH.numOfIdx) { + pHelper->idxH.pIdxArray = (SCompIdx *)trealloc(pHelper->idxH.pIdxArray, (tlen == 0) ? 1024 : tlen * 2); + if (pHelper->idxH.pIdxArray == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } + + ptr = tsdbDecodeSCompIdx(ptr, &(pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 1])); + if (ptr == NULL) { + tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname, + pFile->info.len); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + ASSERT(pHelper->idxH.numOfIdx == 1 || pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 1].tid > + pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 2].tid); ASSERT(POINTER_DISTANCE(ptr, pHelper->pBuffer) <= pFile->info.len - sizeof(TSCKSUM)); } - if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } + // if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { + // terrno = TAOS_SYSTEM_ERROR(errno); + // return -1; + // } } } helperSetState(pHelper, TSDB_HELPER_IDX_LOAD); + if (helperType(pHelper) == TSDB_WRITE_HELPER) { + pFile->info.len = 0; + } + // Copy the memory for outside usage - if (target) memcpy(target, pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)); + if (target && pHelper->idxH.numOfIdx > 0) + memcpy(target, pHelper->idxH.pIdxArray, sizeof(SCompIdx) * pHelper->idxH.numOfIdx); return 0; } @@ -498,7 +539,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompIdx *pIdx = &(pHelper->curCompIdx); int fd = helperHeadF(pHelper)->fd; @@ -820,7 +861,7 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) { } static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompIdx *pIdx = &(pHelper->curCompIdx); ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfBlocks); ASSERT(pCompBlock->numOfSubBlocks == 1); @@ -867,7 +908,7 @@ _err: static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded) { ASSERT(pCompBlock->numOfSubBlocks == 0); - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompIdx *pIdx = &(pHelper->curCompIdx); ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks); SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; @@ -951,7 +992,7 @@ _err: static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { ASSERT(pCompBlock->numOfSubBlocks == 1); - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompIdx *pIdx = &(pHelper->curCompIdx); ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks); @@ -987,6 +1028,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int } static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { + pHelper->idxH.numOfIdx = 0; + pHelper->idxH.curIdx = 0; memset((void *)&pHelper->files, 0, sizeof(pHelper->files)); helperIdxF(pHelper)->fd = -1; helperHeadF(pHelper)->fd = -1; @@ -998,14 +1041,6 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { } static int tsdbInitHelperFile(SRWHelper *pHelper) { - STsdbCfg *pCfg = &pHelper->pRepo->config; - size_t tsize = sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM); - pHelper->pCompIdx = (SCompIdx *)tmalloc(tsize); - if (pHelper->pCompIdx == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - tsdbResetHelperFileImpl(pHelper); return 0; } @@ -1013,7 +1048,8 @@ static int tsdbInitHelperFile(SRWHelper *pHelper) { static void tsdbDestroyHelperFile(SRWHelper *pHelper) { tsdbCloseHelperFile(pHelper, false); tsdbResetHelperFileImpl(pHelper); - tzfree(pHelper->pCompIdx); + tzfree(pHelper->idxH.pIdxArray); + tzfree(pHelper->pWIdx); } // ---------- Operations on Helper Table part @@ -1331,6 +1367,7 @@ _err: static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx) { int tlen = 0; + tlen += taosEncodeVariantI32(buf, pIdx->tid); tlen += taosEncodeVariantU32(buf, pIdx->len); tlen += taosEncodeVariantU32(buf, pIdx->offset); tlen += taosEncodeFixedU8(buf, pIdx->hasLast); @@ -1346,6 +1383,7 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) { uint32_t numOfBlocks = 0; uint64_t value = 0; + if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL; if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL; if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL; if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL; @@ -1363,7 +1401,7 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) { static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { STsdbCfg * pCfg = &(pHelper->pRepo->config); STable * pTable = pCommitIter->pTable; - SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable); + SCompIdx * pIdx = &(pHelper->curCompIdx); TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; SCompBlock compBlock = {0}; @@ -1410,7 +1448,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, int *blkIdx) { STsdbCfg * pCfg = &(pHelper->pRepo->config); STable * pTable = pCommitIter->pTable; - SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable); + SCompIdx * pIdx = &(pHelper->curCompIdx); SCompBlock compBlock = {0}; TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; @@ -1605,4 +1643,14 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, pCompBlock, isLast, true) < 0) return -1; return 0; +} + +static int tsdbCompareTidIdx(const void *key1, const void *key2) { + if (*(int32_t *)key1 > ((SCompIdx *)key2)->tid) { + return 1; + } else if (*(int32_t *)key1 < ((SCompIdx *)key2)->tid) { + return -1; + } else { + return 0; + } } \ No newline at end of file diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 5b1afe3da8..0d76c7f7bf 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -555,7 +555,9 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); pCheckInfo->numOfBlocks = 0; - SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid]; + tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb); + + SCompIdx* compIndex = &pQueryHandle->rhelper.curCompIdx; // no data block in this file, try next file if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) { @@ -572,8 +574,6 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo pCheckInfo->compSize = compIndex->len; } - tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb); - tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo)); SCompInfo* pCompInfo = pCheckInfo->pCompInfo; From 9bb9d7a2a857eac311907bf81f23d1d8d5bbc8d5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 20 Jul 2020 15:19:15 +0800 Subject: [PATCH 03/11] use malloc for more efficent --- src/common/src/tdataformat.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 719d80aa77..12ea4ad78d 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -318,7 +318,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { pCols->maxPoints = maxRows; pCols->bufSize = maxRowSize * maxRows; - pCols->buf = calloc(1, pCols->bufSize); + pCols->buf = malloc(pCols->bufSize); if (pCols->buf == NULL) { free(pCols); return NULL; From dd9f62cfe3fd52a70f27e5852b1a9cc07e1474e1 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 20 Jul 2020 16:14:55 +0800 Subject: [PATCH 04/11] remove idx file back --- src/tsdb/inc/tsdbMain.h | 11 +++++- src/tsdb/src/tsdbFile.c | 26 ++++++++------ src/tsdb/src/tsdbMemTable.c | 2 ++ src/tsdb/src/tsdbRWHelper.c | 71 ++++++++++++++++++++++++++----------- 4 files changed, 78 insertions(+), 32 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 9346230e02..8ab89d72ed 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -132,12 +132,18 @@ typedef struct { // ------------------ tsdbFile.c extern const char* tsdbFileSuffix[]; typedef enum { +#ifdef TSDB_IDX TSDB_FILE_TYPE_IDX = 0, TSDB_FILE_TYPE_HEAD, +#else + TSDB_FILE_TYPE_HEAD = 0, +#endif TSDB_FILE_TYPE_DATA, TSDB_FILE_TYPE_LAST, TSDB_FILE_TYPE_MAX, +#ifdef TSDB_IDX TSDB_FILE_TYPE_NIDX, +#endif TSDB_FILE_TYPE_NHEAD, TSDB_FILE_TYPE_NLAST } TSDB_FILE_TYPE; @@ -147,6 +153,7 @@ typedef struct { uint32_t len; uint32_t totalBlocks; uint32_t totalSubBlocks; + uint32_t offset; uint64_t size; // total size of the file uint64_t tombSize; // unused file size } STsdbFileInfo; @@ -450,11 +457,13 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); #define helperState(h) (h)->state #define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0) #define helperFileId(h) ((h)->files.fGroup.fileId) +#ifdef TSDB_IDX #define helperIdxF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_IDX])) +#define helperNewIdxF(h) (&((h)->files.nIdxF)) +#endif #define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD])) #define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA])) #define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST])) -#define helperNewIdxF(h) (&((h)->files.nIdxF)) #define helperNewHeadF(h) (&((h)->files.nHeadF)) #define helperNewLastF(h) (&((h)->files.nLastF)) diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 7b7788c4d9..0a18d15773 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -30,7 +30,11 @@ #include "ttime.h" #include "tfile.h" +#ifdef TSDB_IDX const char *tsdbFileSuffix[] = {".idx", ".head", ".data", ".last", "", ".i", ".h", ".l"}; +#else +const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"}; +#endif static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); static void tsdbDestroyFile(SFile *pFile); @@ -108,7 +112,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { memset((void *)(&fileGroup), 0, sizeof(SFileGroup)); fileGroup.fileId = fid; - for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { if (tsdbInitFile(&fileGroup.files[type], pRepo, fid, type) < 0) { tsdbError("vgId:%d failed to init file fid %d type %d", REPO_ID(pRepo), fid, type); goto _err; @@ -126,7 +130,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { return 0; _err: - for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]); + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]); tfree(tDataDir); if (dir != NULL) closedir(dir); @@ -139,7 +143,7 @@ void tsdbCloseFileH(STsdbRepo *pRepo) { for (int i = 0; i < pFileH->nFGroups; i++) { SFileGroup *pFGroup = pFileH->pFGroup + i; - for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { tsdbDestroyFile(&pFGroup->files[type]); } } @@ -156,7 +160,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); if (pGroup == NULL) { // if not exists, create one pFGroup->fileId = fid; - for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { if (tsdbCreateFile(&pFGroup->files[type], pRepo, fid, type) < 0) goto _err; } @@ -169,7 +173,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int return pGroup; _err: - for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]); + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]); return NULL; } @@ -325,10 +329,11 @@ int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) { int tlen = 0; tlen += taosEncodeFixedU32(buf, pInfo->magic); tlen += taosEncodeFixedU32(buf, pInfo->len); - tlen += taosEncodeFixedU64(buf, pInfo->size); - tlen += taosEncodeFixedU64(buf, pInfo->tombSize); tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks); tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks); + tlen += taosEncodeFixedU32(buf, pInfo->offset); + tlen += taosEncodeFixedU64(buf, pInfo->size); + tlen += taosEncodeFixedU64(buf, pInfo->tombSize); return tlen; } @@ -336,10 +341,11 @@ int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) { void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) { buf = taosDecodeFixedU32(buf, &(pInfo->magic)); buf = taosDecodeFixedU32(buf, &(pInfo->len)); - buf = taosDecodeFixedU64(buf, &(pInfo->size)); - buf = taosDecodeFixedU64(buf, &(pInfo->tombSize)); buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks)); + buf = taosDecodeFixedU32(buf, &(pInfo->offset)); + buf = taosDecodeFixedU64(buf, &(pInfo->size)); + buf = taosDecodeFixedU64(buf, &(pInfo->tombSize)); return buf; } @@ -358,7 +364,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { pFileH->nFGroups--; ASSERT(pFileH->nFGroups >= 0); - for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { if (remove(fileGroup.files[type].fname) < 0) { tsdbError("vgId:%d failed to remove file %s", REPO_ID(pRepo), fileGroup.files[type].fname); } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 69e6ab3c0f..03fa9541c9 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -628,7 +628,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe tsdbCloseHelperFile(pHelper, 0); pthread_rwlock_wrlock(&(pFileH->fhlock)); +#ifdef TSDB_IDX pGroup->files[TSDB_FILE_TYPE_IDX] = *(helperIdxF(pHelper)); +#endif pGroup->files[TSDB_FILE_TYPE_HEAD] = *(helperHeadF(pHelper)); pGroup->files[TSDB_FILE_TYPE_DATA] = *(helperDataF(pHelper)); pGroup->files[TSDB_FILE_TYPE_LAST] = *(helperLastF(pHelper)); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 19cb791ce8..22182694de 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -109,21 +109,27 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { // Set the files pHelper->files.fGroup = *pGroup; if (helperType(pHelper) == TSDB_WRITE_HELPER) { +#ifdef TSDB_IDX tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NIDX, helperNewIdxF(pHelper)->fname); +#endif tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, helperNewHeadF(pHelper)->fname); tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, helperNewLastF(pHelper)->fname); } // Open the files +#ifdef TSDB_IDX if (tsdbOpenFile(helperIdxF(pHelper), O_RDONLY) < 0) goto _err; +#endif if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) goto _err; if (helperType(pHelper) == TSDB_WRITE_HELPER) { if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) goto _err; if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) goto _err; +#ifdef TSDB_IDX // Create and open .i file if (tsdbOpenFile(helperNewIdxF(pHelper), O_WRONLY | O_CREAT) < 0) return -1; if (tsdbUpdateFileHeader(helperNewIdxF(pHelper), 0) < 0) return -1; +#endif // Create and open .h if (tsdbOpenFile(helperNewHeadF(pHelper), O_WRONLY | O_CREAT) < 0) return -1; @@ -150,11 +156,13 @@ _err: int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { SFile *pFile = NULL; +#ifdef TSDB_IDX pFile = helperIdxF(pHelper); if (pFile->fd > 0) { close(pFile->fd); pFile->fd = -1; } +#endif pFile = helperHeadF(pHelper); if (pFile->fd > 0) { @@ -182,6 +190,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { } if (helperType(pHelper) == TSDB_WRITE_HELPER) { +#ifdef TSDB_IDX pFile = helperNewIdxF(pHelper); if (pFile->fd > 0) { if (!hasError) tsdbUpdateFileHeader(pFile, 0); @@ -200,6 +209,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { helperIdxF(pHelper)->info = pFile->info; } } +#endif pFile = helperNewHeadF(pHelper); if (pFile->fd > 0) { @@ -365,12 +375,13 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { int tsdbWriteCompInfo(SRWHelper *pHelper) { SCompIdx *pIdx = &(pHelper->curCompIdx); off_t offset = 0; + SFile * pFile = helperNewHeadF(pHelper); if (pIdx->len > 0) { if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { - offset = lseek(helperNewHeadF(pHelper)->fd, 0, SEEK_END); + offset = lseek(pFile->fd, 0, SEEK_END); if (offset < 0) { - tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), helperNewHeadF(pHelper)->fname, + tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -381,9 +392,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { pIdx->tid = pHelper->tableInfo.tid; ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); - if (tsendfile(helperNewHeadF(pHelper)->fd, helperHeadF(pHelper)->fd, NULL, pIdx->len) < pIdx->len) { + if (tsendfile(pFile->fd, helperHeadF(pHelper)->fd, NULL, pIdx->len) < pIdx->len) { tsdbError("vgId:%d failed to send %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - helperHeadF(pHelper)->fname, helperNewHeadF(pHelper)->fname, strerror(errno)); + helperHeadF(pHelper)->fname, pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -394,9 +405,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ASSERT(pIdx->len > sizeof(SCompInfo) + sizeof(TSCKSUM) && (pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); - offset = lseek(helperNewHeadF(pHelper)->fd, 0, SEEK_END); + offset = lseek(pFile->fd, 0, SEEK_END); if (offset < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperNewHeadF(pHelper)->fname, + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -406,15 +417,19 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { pIdx->tid = pHelper->tableInfo.tid; ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); - if (twrite(helperNewHeadF(pHelper)->fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { + if (twrite(pFile->fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - helperNewHeadF(pHelper)->fname, strerror(errno)); + pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } } - if (tsizeof(pHelper->pWIdx) < helperNewIdxF(pHelper)->info.len + sizeof(SCompIdx) + 12) { +#ifdef TSDB_IDX + pFile = helperNewIdxF(pHelper); +#endif + + if (tsizeof(pHelper->pWIdx) < pFile->info.len + sizeof(SCompIdx) + 12) { pHelper->pWIdx = trealloc(pHelper->pWIdx, tsizeof(pHelper->pWIdx) == 0 ? 1024 : tsizeof(pHelper->pWIdx) * 2); if (pHelper->pWIdx == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -422,8 +437,8 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { } } - void *pBuf = POINTER_SHIFT(pHelper->pWIdx, helperNewIdxF(pHelper)->info.len); - helperNewIdxF(pHelper)->info.len += tsdbEncodeSCompIdx(&pBuf, &(pHelper->curCompIdx)); + void *pBuf = POINTER_SHIFT(pHelper->pWIdx, pFile->info.len); + pFile->info.len += tsdbEncodeSCompIdx(&pBuf, &(pHelper->curCompIdx)); } return 0; @@ -431,9 +446,13 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { int tsdbWriteCompIdx(SRWHelper *pHelper) { ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - // STsdbCfg *pCfg = &pHelper->pRepo->config; + off_t offset = 0; +#ifdef TSDB_IDX SFile *pFile = helperNewIdxF(pHelper); +#else + SFile *pFile = helperNewHeadF(pHelper); +#endif pFile->info.len += sizeof(TSCKSUM); if (tsizeof(pHelper->pWIdx) < pFile->info.len) { @@ -445,6 +464,15 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { } taosCalcChecksumAppend(0, (uint8_t *)pHelper->pWIdx, pFile->info.len); + offset = lseek(pFile->fd, 0, SEEK_END); + if (offset < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + pFile->info.offset = offset; + if (twrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < pFile->info.len) { tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len, pFile->fname, strerror(errno)); @@ -457,8 +485,12 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN); +#ifdef TSDB_IDX SFile *pFile = helperIdxF(pHelper); - int fd = pFile->fd; +#else + SFile *pFile = helperHeadF(pHelper); +#endif + int fd = pFile->fd; if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) { // If not load from file, just load it in object @@ -468,7 +500,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { return -1; } - if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { + if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) { tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -516,11 +548,6 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { ASSERT(POINTER_DISTANCE(ptr, pHelper->pBuffer) <= pFile->info.len - sizeof(TSCKSUM)); } - - // if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { - // terrno = TAOS_SYSTEM_ERROR(errno); - // return -1; - // } } } helperSetState(pHelper, TSDB_HELPER_IDX_LOAD); @@ -1031,13 +1058,15 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { pHelper->idxH.numOfIdx = 0; pHelper->idxH.curIdx = 0; memset((void *)&pHelper->files, 0, sizeof(pHelper->files)); - helperIdxF(pHelper)->fd = -1; helperHeadF(pHelper)->fd = -1; helperDataF(pHelper)->fd = -1; helperLastF(pHelper)->fd = -1; - helperNewIdxF(pHelper)->fd = -1; helperNewHeadF(pHelper)->fd = -1; helperNewLastF(pHelper)->fd = -1; +#ifdef TSDB_IDX + helperIdxF(pHelper)->fd = -1; + helperNewIdxF(pHelper)->fd = -1; +#endif } static int tsdbInitHelperFile(SRWHelper *pHelper) { From 975f8c2c97f97652e1c4bb8f36ce640c3d5cdcdf Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 20 Jul 2020 18:32:26 +0800 Subject: [PATCH 05/11] update file magic with other method --- src/tsdb/inc/tsdbMain.h | 4 +- src/tsdb/src/tsdbFile.c | 1 + src/tsdb/src/tsdbMain.c | 16 +++--- src/tsdb/src/tsdbRWHelper.c | 97 ++++++++++++++++++++----------------- src/util/inc/tkvstore.h | 11 +++-- src/util/src/tkvstore.c | 8 ++- 6 files changed, 80 insertions(+), 57 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 8ab89d72ed..156a408059 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -42,6 +42,7 @@ extern int tsdbDebugFlag; #define TSDB_MAX_TABLE_SCHEMAS 16 #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F +#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF // Definitions // ------------------ tsdbMeta.c @@ -230,7 +231,7 @@ typedef struct { typedef struct { int32_t delimiter; // For recovery usage - int32_t checksum; // TODO: decide if checksum logic in this file or make it one API + int32_t tid; uint64_t uid; SCompBlock blocks[]; } SCompInfo; @@ -308,6 +309,7 @@ typedef struct { #define TABLE_TID(t) (t)->tableId.tid #define TABLE_SUID(t) (t)->suid #define TABLE_LASTKEY(t) (t)->lastKey +#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore) STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); void tsdbFreeMeta(STsdbMeta* pMeta); diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 0a18d15773..767fbc8252 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -264,6 +264,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { } pFile->info.size = TSDB_FILE_HEAD_SIZE; + pFile->info.magic = TSDB_FILE_INIT_MAGIC; if (tsdbUpdateFileHeader(pFile, 0) < 0) { tsdbCloseFile(pFile); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 82dbe5f498..7fe69021bf 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -212,6 +212,8 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ char *sdup = strdup(pRepo->rootDir); char *prefix = dirname(sdup); + int prefixLen = strlen(prefix); + tfree(sdup); if (name[0] == 0) { // get the file from index or after, but not larger than eindex int fid = (*index) / TSDB_FILE_TYPE_MAX; @@ -220,8 +222,8 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) { fname = tsdbGetMetaFileName(pRepo->rootDir); *index = TSDB_META_FILE_INDEX; + magic = TSDB_META_FILE_MAGIC(pRepo->tsdbMeta); } else { - tfree(sdup); return 0; } } else { @@ -229,42 +231,42 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE); if (pFGroup->fileId == fid) { fname = strdup(pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].fname); + magic = pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].info.magic; } else { if ((pFGroup->fileId + 1) * TSDB_FILE_TYPE_MAX - 1 < eindex) { fname = strdup(pFGroup->files[0].fname); *index = pFGroup->fileId * TSDB_FILE_TYPE_MAX; + magic = pFGroup->files[0].info.magic; } else { - tfree(sdup); return 0; } } } - strcpy(name, fname + strlen(prefix)); + strcpy(name, fname + prefixLen); } else { // get the named file at the specified index. If not there, return 0 if (*index == TSDB_META_FILE_INDEX) { // get meta file fname = tsdbGetMetaFileName(pRepo->rootDir); + magic = TSDB_META_FILE_MAGIC(pRepo->tsdbMeta); } else { int fid = (*index) / TSDB_FILE_TYPE_MAX; SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); if (pFGroup == NULL) { // not found - tfree(sdup); return 0; } SFile *pFile = &pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX]; fname = strdup(pFile->fname); + magic = pFile->info.magic; } } if (stat(fname, &fState) < 0) { - tfree(sdup); tfree(fname); return 0; } - tfree(sdup); *size = fState.st_size; - magic = *size; + // magic = *size; tfree(fname); return magic; diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 22182694de..0b8d1fec49 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -100,6 +100,7 @@ void tsdbResetHelper(SRWHelper *pHelper) { int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { ASSERT(pHelper != NULL && pGroup != NULL); + SFile *pFile = NULL; // Clear the helper object tsdbResetHelper(pHelper); @@ -127,18 +128,27 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { #ifdef TSDB_IDX // Create and open .i file - if (tsdbOpenFile(helperNewIdxF(pHelper), O_WRONLY | O_CREAT) < 0) return -1; - if (tsdbUpdateFileHeader(helperNewIdxF(pHelper), 0) < 0) return -1; + pFile = helperNewIdxF(pHelper); + if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1; + pFile->info.size = TSDB_FILE_HEAD_SIZE; + pFile->info.magic = TSDB_FILE_INIT_MAGIC; + if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1; #endif // Create and open .h - if (tsdbOpenFile(helperNewHeadF(pHelper), O_WRONLY | O_CREAT) < 0) return -1; - if (tsdbUpdateFileHeader(helperNewHeadF(pHelper), 0) < 0) return -1; + pFile = helperNewHeadF(pHelper); + if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1; + pFile->info.size = TSDB_FILE_HEAD_SIZE; + pFile->info.magic = TSDB_FILE_INIT_MAGIC; + if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1; // Create and open .l file if should if (tsdbShouldCreateNewLast(pHelper)) { - if (tsdbOpenFile(helperNewLastF(pHelper), O_WRONLY | O_CREAT) < 0) goto _err; - if (tsdbUpdateFileHeader(helperNewLastF(pHelper), 0) < 0) return -1; + pFile = helperNewLastF(pHelper); + if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) goto _err; + pFile->info.size = TSDB_FILE_HEAD_SIZE; + pFile->info.magic = TSDB_FILE_INIT_MAGIC; + if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1; } } else { if (tsdbOpenFile(helperDataF(pHelper), O_RDONLY) < 0) goto _err; @@ -334,7 +344,15 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); ASSERT(pCompBlock->last); + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && + pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); + if (tsdbWriteBlockToFile(pHelper, helperNewLastF(pHelper), pHelper->pDataCols[0], &compBlock, true, true) < 0) + return -1; + if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; + +#if 0 if (pCompBlock->numOfSubBlocks > 1) { if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && @@ -365,6 +383,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { return -1; } } +#endif pHelper->hasOldLastBlock = false; } @@ -379,50 +398,34 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { if (pIdx->len > 0) { if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { - offset = lseek(pFile->fd, 0, SEEK_END); - if (offset < 0) { - tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - pIdx->offset = offset; - pIdx->uid = pHelper->tableInfo.uid; - pIdx->tid = pHelper->tableInfo.tid; - ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); - - if (tsendfile(pFile->fd, helperHeadF(pHelper)->fd, NULL, pIdx->len) < pIdx->len) { - tsdbError("vgId:%d failed to send %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - helperHeadF(pHelper)->fname, pFile->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } + if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; } else { pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER; pHelper->pCompInfo->uid = pHelper->tableInfo.uid; - pHelper->pCompInfo->checksum = 0; + pHelper->pCompInfo->tid = pHelper->tableInfo.tid; ASSERT(pIdx->len > sizeof(SCompInfo) + sizeof(TSCKSUM) && (pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); - offset = lseek(pFile->fd, 0, SEEK_END); - if (offset < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - pIdx->offset = offset; - pIdx->uid = pHelper->tableInfo.uid; - pIdx->tid = pHelper->tableInfo.tid; - ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); + } - if (twrite(pFile->fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - pFile->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } + pFile->info.magic = taosCalcChecksum( + pFile->info.magic, (uint8_t *)POINTER_SHIFT(pHelper->pCompInfo, pIdx->len - sizeof(TSCKSUM)), sizeof(TSCKSUM)); + offset = lseek(pFile->fd, 0, SEEK_END); + if (offset < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + pIdx->offset = offset; + pIdx->uid = pHelper->tableInfo.uid; + pIdx->tid = pHelper->tableInfo.tid; + ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); + + if (twrite(pFile->fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, + pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } #ifdef TSDB_IDX @@ -463,6 +466,8 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { } } taosCalcChecksumAppend(0, (uint8_t *)pHelper->pWIdx, pFile->info.len); + pFile->info.magic = taosCalcChecksum( + pFile->info.magic, (uint8_t *)POINTER_SHIFT(pHelper->pWIdx, pFile->info.len - sizeof(TSCKSUM)), sizeof(TSCKSUM)); offset = lseek(pFile->fd, 0, SEEK_END); if (offset < 0) { @@ -594,7 +599,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { return -1; } - ASSERT(pIdx->uid == pHelper->pCompInfo->uid); + ASSERT(pIdx->uid == pHelper->pCompInfo->uid && pIdx->tid == pHelper->pCompInfo->tid); } helperSetState(pHelper, TSDB_HELPER_INFO_LOAD); @@ -813,6 +818,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ASSERT(flen > 0); flen += sizeof(TSCKSUM); taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); + pFile->info.magic = + taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); if (ncol != 0) { pCompCol->offset = toffset; @@ -831,6 +838,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa pCompData->numOfCols = nColsNotAllNull; taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); + pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pCompData, tsize - sizeof(TSCKSUM)), + sizeof(TSCKSUM)); // Write the whole block to file if (twrite(pFile->fd, (void *)pCompData, lsize) < lsize) { diff --git a/src/util/inc/tkvstore.h b/src/util/inc/tkvstore.h index 346e567c41..6d67607e24 100644 --- a/src/util/inc/tkvstore.h +++ b/src/util/inc/tkvstore.h @@ -25,10 +25,11 @@ typedef int (*iterFunc)(void *, void *cont, int contLen); typedef void (*afterFunc)(void *); typedef struct { - int64_t size; // including 512 bytes of header size - int64_t tombSize; - int64_t nRecords; - int64_t nDels; + int64_t size; // including 512 bytes of header size + int64_t tombSize; + int64_t nRecords; + int64_t nDels; + uint32_t magic; } SStoreInfo; typedef struct { @@ -45,6 +46,8 @@ typedef struct { SStoreInfo info; } SKVStore; +#define KVSTORE_MAGIC(s) (s)->info.magic + int tdCreateKVStore(char *fname); int tdDestroyKVStore(char *fname); SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index ab2aa738c6..a2022569bf 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -34,6 +34,7 @@ #define TD_KVSTORE_MAINOR_VERSION 0 #define TD_KVSTORE_SNAP_SUFFIX ".snap" #define TD_KVSTORE_NEW_SUFFIX ".new" +#define TD_KVSTORE_INIT_MAGIC 0xFFFFFFFF typedef struct { uint64_t uid; @@ -251,6 +252,8 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe return -1; } + pStore->info.magic = + taosCalcChecksum(pStore->info.magic, (uint8_t *)POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); pStore->info.size += (sizeof(SKVRecord) + contLen); SKVRecord *pRecord = taosHashGet(pStore->map, (void *)&uid, sizeof(uid)); if (pRecord != NULL) { // just to insert @@ -288,6 +291,7 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) { return -1; } + pStore->info.magic = taosCalcChecksum(pStore->info.magic, (uint8_t *)buf, POINTER_DISTANCE(pBuf, buf)); pStore->info.size += POINTER_DISTANCE(pBuf, buf); pStore->info.nDels++; pStore->info.nRecords--; @@ -371,7 +375,7 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { } static int tdInitKVStoreHeader(int fd, char *fname) { - SStoreInfo info = {TD_KVSTORE_HEADER_SIZE, 0, 0, 0}; + SStoreInfo info = {TD_KVSTORE_HEADER_SIZE, 0, 0, 0, TD_KVSTORE_INIT_MAGIC}; return tdUpdateKVStoreHeader(fd, fname, &info); } @@ -382,6 +386,7 @@ static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo) { tlen += taosEncodeVariantI64(buf, pInfo->tombSize); tlen += taosEncodeVariantI64(buf, pInfo->nRecords); tlen += taosEncodeVariantI64(buf, pInfo->nDels); + tlen += taosEncodeFixedU32(buf, pInfo->magic); return tlen; } @@ -391,6 +396,7 @@ static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) { buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); + buf = taosDecodeFixedU32(buf, &(pInfo->magic)); return buf; } From 88ba2f49359c6b5fbfed26e942d6d8b1944584fc Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 21 Jul 2020 13:43:46 +0800 Subject: [PATCH 06/11] add missed compile option --- src/tsdb/inc/tsdbMain.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index ba5711a81c..762d2253e2 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -263,7 +263,9 @@ typedef struct { TSKEY minKey; TSKEY maxKey; SFileGroup fGroup; - SFile nIdxF; +#ifdef TSDB_IDX + SFile nIdxF; +#endif SFile nHeadF; SFile nLastF; } SHelperFile; From fac8cf100bfd774dd30c5dbd3c7afcf25009668a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 21 Jul 2020 14:17:05 +0800 Subject: [PATCH 07/11] refactor and debug --- src/tsdb/src/tsdbRWHelper.c | 44 ++++++++++++++----------------------- src/tsdb/src/tsdbRead.c | 24 +++++++++++++++----- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 0b8d1fec49..cec3b0d36b 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -60,7 +60,6 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter int *blkIdx); static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows); -static int tsdbCompareTidIdx(const void *key1, const void *key2); // ---------------------- INTERNAL FUNCTIONS ---------------------- int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { @@ -276,32 +275,31 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { tdInitDataCols(pHelper->pDataCols[0], pSchema); tdInitDataCols(pHelper->pDataCols[1], pSchema); - if (helperType(pHelper) == TSDB_WRITE_HELPER) { - if (pHelper->idxH.numOfIdx > 0) { + if (pHelper->idxH.numOfIdx > 0) { + while (true) { if (pHelper->idxH.curIdx >= pHelper->idxH.numOfIdx) { memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); - } else { - SCompIdx *pIdx = &(pHelper->idxH.pIdxArray[pHelper->idxH.curIdx]); - if (pIdx->tid == TABLE_TID(pTable)) { + break; + } + + SCompIdx *pIdx = &(pHelper->idxH.pIdxArray[pHelper->idxH.curIdx]); + if (pIdx->tid == TABLE_TID(pTable)) { + if (pIdx->uid == TABLE_UID(pTable)) { pHelper->curCompIdx = *pIdx; - pHelper->idxH.curIdx++; } else { - ASSERT(pIdx->tid > TABLE_TID(pTable)); memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); } + pHelper->idxH.curIdx++; + break; + } else if (pIdx->tid > TABLE_TID(pTable)) { + memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); + break; + } else { + pHelper->idxH.curIdx++; } - } else { - memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); } } else { - // TODO: make it more efficient - void *ptr = bsearch(&TABLE_TID(pTable), (void *)pHelper->idxH.pIdxArray, pHelper->idxH.numOfIdx, sizeof(SCompIdx), - tsdbCompareTidIdx); - if (ptr == NULL) { - memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); - } else { - pHelper->curCompIdx = *(SCompIdx *)ptr; - } + memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); } helperSetState(pHelper, TSDB_HELPER_TABLE_SET); @@ -1681,14 +1679,4 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, pCompBlock, isLast, true) < 0) return -1; return 0; -} - -static int tsdbCompareTidIdx(const void *key1, const void *key2) { - if (*(int32_t *)key1 > ((SCompIdx *)key2)->tid) { - return 1; - } else if (*(int32_t *)key1 < ((SCompIdx *)key2)->tid) { - return -1; - } else { - return 0; - } } \ No newline at end of file diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index b2a3887bed..b2204948d0 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -126,12 +126,13 @@ typedef struct STsdbQueryHandle { SIOCostSummary cost; } STsdbQueryHandle; -static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); -static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); -static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock); +static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); +static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); +static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); -static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, - STsdbQueryHandle* pQueryHandle); +static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, + STsdbQueryHandle* pQueryHandle); +static int tsdbCheckInfoCompar(const void* key1, const void* key2); static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { pBlockLoadInfo->slot = -1; @@ -236,7 +237,8 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab taosArrayPush(pQueryHandle->pTableCheckInfo, &info); } } - + + taosArraySort(pQueryHandle->pTableCheckInfo, tsdbCheckInfoCompar); pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true); tsdbDebug("%p total numOfTable:%zu in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo); @@ -2431,3 +2433,13 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) { taosArrayDestroy(pGroupList->pGroupList); } +static int tsdbCheckInfoCompar(const void* key1, const void* key2) { + if (((STableCheckInfo*)key1)->tableId.tid < ((STableCheckInfo*)key2)->tableId.tid) { + return -1; + } else if (((STableCheckInfo*)key1)->tableId.tid > ((STableCheckInfo*)key2)->tableId.tid) { + return 1; + } else { + ASSERT(false); + return 0; + } +} \ No newline at end of file From 51797a45d5ecb05943935b025e5b3570c2439921 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 21 Jul 2020 15:39:58 +0800 Subject: [PATCH 08/11] fix invalid write --- src/tsdb/src/tsdbRWHelper.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index cec3b0d36b..9d0efd0de2 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -530,7 +530,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { size_t tlen = tsizeof(pHelper->idxH.pIdxArray); pHelper->idxH.numOfIdx++; - if (tlen < pHelper->idxH.numOfIdx) { + if (tlen < pHelper->idxH.numOfIdx * sizoef(SCompIdx)) { pHelper->idxH.pIdxArray = (SCompIdx *)trealloc(pHelper->idxH.pIdxArray, (tlen == 0) ? 1024 : tlen * 2); if (pHelper->idxH.pIdxArray == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; From 8a3616898f1836d06b7330a3bb06d2661535fc21 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 21 Jul 2020 15:45:04 +0800 Subject: [PATCH 09/11] fix typo --- src/tsdb/src/tsdbRWHelper.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 9d0efd0de2..f8792469cb 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -530,7 +530,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { size_t tlen = tsizeof(pHelper->idxH.pIdxArray); pHelper->idxH.numOfIdx++; - if (tlen < pHelper->idxH.numOfIdx * sizoef(SCompIdx)) { + if (tlen < pHelper->idxH.numOfIdx * sizeof(SCompIdx)) { pHelper->idxH.pIdxArray = (SCompIdx *)trealloc(pHelper->idxH.pIdxArray, (tlen == 0) ? 1024 : tlen * 2); if (pHelper->idxH.pIdxArray == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; From 5e8b11415a8e3241668bc875638346027f25d1b2 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 21 Jul 2020 18:08:46 +0800 Subject: [PATCH 10/11] need to restore magic number when kvstore open --- src/util/src/tkvstore.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index a2022569bf..0704285da5 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -141,6 +141,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; pStore->info.size = TD_KVSTORE_HEADER_SIZE; + pStore->info.magic = info.magic; if (tdRestoreKVStore(pStore) < 0) goto _err; From 946f6cb87f111936e8d54a742453ed1700ea0d59 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 22 Jul 2020 10:07:57 +0800 Subject: [PATCH 11/11] fix restart bug --- src/tsdb/src/tsdbMain.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 7fe69021bf..772bcf48d6 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -795,6 +795,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { for (int i = 1; i < pRepo->config.maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; + tsdbSetHelperTable(&rhelper, pTable, pRepo); SCompIdx *pIdx = &(rhelper.curCompIdx); if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey;