diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 719d80aa77..12ea4ad78d 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -318,7 +318,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { pCols->maxPoints = maxRows; pCols->bufSize = maxRowSize * maxRows; - pCols->buf = calloc(1, pCols->bufSize); + pCols->buf = malloc(pCols->bufSize); if (pCols->buf == NULL) { free(pCols); return NULL; diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 210d95853c..762d2253e2 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -42,6 +42,7 @@ extern int tsdbDebugFlag; #define TSDB_MAX_TABLE_SCHEMAS 16 #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F +#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF // Definitions // ------------------ tsdbMeta.c @@ -132,21 +133,30 @@ typedef struct { // ------------------ tsdbFile.c extern const char* tsdbFileSuffix[]; typedef enum { +#ifdef TSDB_IDX + TSDB_FILE_TYPE_IDX = 0, + TSDB_FILE_TYPE_HEAD, +#else TSDB_FILE_TYPE_HEAD = 0, +#endif TSDB_FILE_TYPE_DATA, TSDB_FILE_TYPE_LAST, TSDB_FILE_TYPE_MAX, +#ifdef TSDB_IDX + TSDB_FILE_TYPE_NIDX, +#endif TSDB_FILE_TYPE_NHEAD, TSDB_FILE_TYPE_NLAST } TSDB_FILE_TYPE; typedef struct { - uint32_t offset; + uint32_t magic; uint32_t len; - uint64_t size; // total size of the file - uint64_t tombSize; // unused file size uint32_t totalBlocks; uint32_t totalSubBlocks; + uint32_t offset; + uint64_t size; // total size of the file + uint64_t tombSize; // unused file size } STsdbFileInfo; typedef struct { @@ -197,6 +207,7 @@ typedef struct { // ------------------ tsdbRWHelper.c typedef struct { + int32_t tid; uint32_t len; uint32_t offset; uint32_t hasLast : 2; @@ -220,7 +231,7 @@ typedef struct { typedef struct { int32_t delimiter; // For recovery usage - int32_t checksum; // TODO: decide if checksum logic in this file or make it one API + int32_t tid; uint64_t uid; SCompBlock blocks[]; } SCompInfo; @@ -249,24 +260,27 @@ typedef struct { typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; typedef struct { - int fid; - TSKEY minKey; - TSKEY maxKey; - // For read/write purpose - SFile headF; - SFile dataF; - SFile lastF; - // For write purpose only - SFile nHeadF; - SFile nLastF; + TSKEY minKey; + TSKEY maxKey; + SFileGroup fGroup; +#ifdef TSDB_IDX + SFile nIdxF; +#endif + SFile nHeadF; + SFile nLastF; } SHelperFile; typedef struct { uint64_t uid; int32_t tid; - int32_t sversion; } SHelperTable; +typedef struct { + SCompIdx* pIdxArray; + int numOfIdx; + int curIdx; +} SIdxH; + typedef struct { tsdb_rw_helper_t type; @@ -274,7 +288,9 @@ typedef struct { int8_t state; // For file set usage SHelperFile files; - SCompIdx* pCompIdx; + SIdxH idxH; + SCompIdx curCompIdx; + void* pWIdx; // For table set usage SHelperTable tableInfo; SCompInfo* pCompInfo; @@ -286,7 +302,6 @@ typedef struct { void* compBuffer; // Buffer for temperary compress/decompress purpose } SRWHelper; - // Operations // ------------------ tsdbMeta.c #define TABLE_TYPE(t) (t)->type @@ -296,6 +311,7 @@ typedef struct { #define TABLE_TID(t) (t)->tableId.tid #define TABLE_SUID(t) (t)->suid #define TABLE_LASTKEY(t) (t)->lastKey +#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore) STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); void tsdbFreeMeta(STsdbMeta* pMeta); @@ -445,6 +461,16 @@ void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS #define helperRepo(h) (h)->pRepo #define helperState(h) (h)->state #define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0) +#define helperFileId(h) ((h)->files.fGroup.fileId) +#ifdef TSDB_IDX +#define helperIdxF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_IDX])) +#define helperNewIdxF(h) (&((h)->files.nIdxF)) +#endif +#define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD])) +#define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA])) +#define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST])) +#define helperNewHeadF(h) (&((h)->files.nHeadF)) +#define helperNewLastF(h) (&((h)->files.nLastF)) int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo); int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo); diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 95cc47292b..767fbc8252 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -30,7 +30,11 @@ #include "ttime.h" #include "tfile.h" +#ifdef TSDB_IDX +const char *tsdbFileSuffix[] = {".idx", ".head", ".data", ".last", "", ".i", ".h", ".l"}; +#else const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"}; +#endif static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); static void tsdbDestroyFile(SFile *pFile); @@ -108,7 +112,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { memset((void *)(&fileGroup), 0, sizeof(SFileGroup)); fileGroup.fileId = fid; - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { if (tsdbInitFile(&fileGroup.files[type], pRepo, fid, type) < 0) { tsdbError("vgId:%d failed to init file fid %d type %d", REPO_ID(pRepo), fid, type); goto _err; @@ -126,7 +130,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { return 0; _err: - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]); + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]); tfree(tDataDir); if (dir != NULL) closedir(dir); @@ -139,7 +143,7 @@ void tsdbCloseFileH(STsdbRepo *pRepo) { for (int i = 0; i < pFileH->nFGroups; i++) { SFileGroup *pFGroup = pFileH->pFGroup + i; - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { tsdbDestroyFile(&pFGroup->files[type]); } } @@ -156,7 +160,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); if (pGroup == NULL) { // if not exists, create one pFGroup->fileId = fid; - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { if (tsdbCreateFile(&pFGroup->files[type], pRepo, fid, type) < 0) goto _err; } @@ -169,7 +173,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int return pGroup; _err: - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]); + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]); return NULL; } @@ -260,6 +264,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { } pFile->info.size = TSDB_FILE_HEAD_SIZE; + pFile->info.magic = TSDB_FILE_INIT_MAGIC; if (tsdbUpdateFileHeader(pFile, 0) < 0) { tsdbCloseFile(pFile); @@ -323,23 +328,25 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) { int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) { int tlen = 0; - tlen += taosEncodeFixedU32(buf, pInfo->offset); + tlen += taosEncodeFixedU32(buf, pInfo->magic); tlen += taosEncodeFixedU32(buf, pInfo->len); - tlen += taosEncodeFixedU64(buf, pInfo->size); - tlen += taosEncodeFixedU64(buf, pInfo->tombSize); tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks); tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks); + tlen += taosEncodeFixedU32(buf, pInfo->offset); + tlen += taosEncodeFixedU64(buf, pInfo->size); + tlen += taosEncodeFixedU64(buf, pInfo->tombSize); return tlen; } void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) { - buf = taosDecodeFixedU32(buf, &(pInfo->offset)); + buf = taosDecodeFixedU32(buf, &(pInfo->magic)); buf = taosDecodeFixedU32(buf, &(pInfo->len)); - buf = taosDecodeFixedU64(buf, &(pInfo->size)); - buf = taosDecodeFixedU64(buf, &(pInfo->tombSize)); buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks)); + buf = taosDecodeFixedU32(buf, &(pInfo->offset)); + buf = taosDecodeFixedU64(buf, &(pInfo->size)); + buf = taosDecodeFixedU64(buf, &(pInfo->tombSize)); return buf; } @@ -358,7 +365,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { pFileH->nFGroups--; ASSERT(pFileH->nFGroups >= 0); - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { if (remove(fileGroup.files[type].fname) < 0) { tsdbError("vgId:%d failed to remove file %s", REPO_ID(pRepo), fileGroup.files[type].fname); } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index e30164592d..772bcf48d6 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -212,59 +212,61 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ char *sdup = strdup(pRepo->rootDir); char *prefix = dirname(sdup); + int prefixLen = strlen(prefix); + tfree(sdup); if (name[0] == 0) { // get the file from index or after, but not larger than eindex - int fid = (*index) / 3; + int fid = (*index) / TSDB_FILE_TYPE_MAX; if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) { if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) { fname = tsdbGetMetaFileName(pRepo->rootDir); *index = TSDB_META_FILE_INDEX; + magic = TSDB_META_FILE_MAGIC(pRepo->tsdbMeta); } else { - tfree(sdup); return 0; } } else { SFileGroup *pFGroup = taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE); if (pFGroup->fileId == fid) { - fname = strdup(pFGroup->files[(*index) % 3].fname); + fname = strdup(pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].fname); + magic = pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].info.magic; } else { - if (pFGroup->fileId * 3 + 2 < eindex) { + if ((pFGroup->fileId + 1) * TSDB_FILE_TYPE_MAX - 1 < eindex) { fname = strdup(pFGroup->files[0].fname); - *index = pFGroup->fileId * 3; + *index = pFGroup->fileId * TSDB_FILE_TYPE_MAX; + magic = pFGroup->files[0].info.magic; } else { - tfree(sdup); return 0; } } } - strcpy(name, fname + strlen(prefix)); + strcpy(name, fname + prefixLen); } else { // get the named file at the specified index. If not there, return 0 if (*index == TSDB_META_FILE_INDEX) { // get meta file fname = tsdbGetMetaFileName(pRepo->rootDir); + magic = TSDB_META_FILE_MAGIC(pRepo->tsdbMeta); } else { - int fid = (*index) / 3; + int fid = (*index) / TSDB_FILE_TYPE_MAX; SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); if (pFGroup == NULL) { // not found - tfree(sdup); return 0; } - SFile *pFile = &pFGroup->files[(*index) % 3]; + SFile *pFile = &pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX]; fname = strdup(pFile->fname); + magic = pFile->info.magic; } } if (stat(fname, &fState) < 0) { - tfree(sdup); tfree(fname); return 0; } - tfree(sdup); *size = fState.st_size; - magic = *size; + // magic = *size; tfree(fname); return magic; @@ -793,7 +795,8 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { for (int i = 1; i < pRepo->config.maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; - SCompIdx *pIdx = &rhelper.pCompIdx[i]; + tsdbSetHelperTable(&rhelper, pTable, pRepo); + SCompIdx *pIdx = &(rhelper.curCompIdx); if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey; } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 1b7db635b3..5325eb5b0c 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -627,9 +627,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe tsdbCloseHelperFile(pHelper, 0); pthread_rwlock_wrlock(&(pFileH->fhlock)); - pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF; - pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF; - pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF; +#ifdef TSDB_IDX + pGroup->files[TSDB_FILE_TYPE_IDX] = *(helperIdxF(pHelper)); +#endif + pGroup->files[TSDB_FILE_TYPE_HEAD] = *(helperHeadF(pHelper)); + pGroup->files[TSDB_FILE_TYPE_DATA] = *(helperDataF(pHelper)); + pGroup->files[TSDB_FILE_TYPE_LAST] = *(helperLastF(pHelper)); pthread_rwlock_unlock(&(pFileH->fhlock)); return 0; diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 0d52b7ae33..f8792469cb 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -99,6 +99,7 @@ void tsdbResetHelper(SRWHelper *pHelper) { int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { ASSERT(pHelper != NULL && pGroup != NULL); + SFile *pFile = NULL; // Clear the helper object tsdbResetHelper(pHelper); @@ -106,44 +107,51 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE); // Set the files - pHelper->files.fid = pGroup->fileId; - pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD]; - pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA]; - pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST]; + pHelper->files.fGroup = *pGroup; if (helperType(pHelper) == TSDB_WRITE_HELPER) { - tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, pHelper->files.nHeadF.fname); - tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, pHelper->files.nLastF.fname); +#ifdef TSDB_IDX + tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NIDX, helperNewIdxF(pHelper)->fname); +#endif + tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, helperNewHeadF(pHelper)->fname); + tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, helperNewLastF(pHelper)->fname); } // Open the files - if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err; +#ifdef TSDB_IDX + if (tsdbOpenFile(helperIdxF(pHelper), O_RDONLY) < 0) goto _err; +#endif + if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) goto _err; if (helperType(pHelper) == TSDB_WRITE_HELPER) { - if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err; - if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err; + if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) goto _err; + if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) goto _err; + +#ifdef TSDB_IDX + // Create and open .i file + pFile = helperNewIdxF(pHelper); + if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1; + pFile->info.size = TSDB_FILE_HEAD_SIZE; + pFile->info.magic = TSDB_FILE_INIT_MAGIC; + if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1; +#endif // Create and open .h - if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) return -1; - // size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM); - if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { - tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), - TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + pFile = helperNewHeadF(pHelper); + if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1; + pFile->info.size = TSDB_FILE_HEAD_SIZE; + pFile->info.magic = TSDB_FILE_INIT_MAGIC; + if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1; // Create and open .l file if should if (tsdbShouldCreateNewLast(pHelper)) { - if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err; - if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { - tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), - TSDB_FILE_HEAD_SIZE, pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + pFile = helperNewLastF(pHelper); + if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) goto _err; + pFile->info.size = TSDB_FILE_HEAD_SIZE; + pFile->info.magic = TSDB_FILE_INIT_MAGIC; + if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1; } } else { - if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err; - if (tsdbOpenFile(&(pHelper->files.lastF), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(helperDataF(pHelper), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(helperLastF(pHelper), O_RDONLY) < 0) goto _err; } helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN); @@ -155,59 +163,98 @@ _err: } int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { - if (pHelper->files.headF.fd > 0) { - close(pHelper->files.headF.fd); - pHelper->files.headF.fd = -1; + SFile *pFile = NULL; + +#ifdef TSDB_IDX + pFile = helperIdxF(pHelper); + if (pFile->fd > 0) { + close(pFile->fd); + pFile->fd = -1; } - if (pHelper->files.dataF.fd > 0) { +#endif + + pFile = helperHeadF(pHelper); + if (pFile->fd > 0) { + close(pFile->fd); + pFile->fd = -1; + } + + pFile = helperDataF(pHelper); + if (pFile->fd > 0) { if (helperType(pHelper) == TSDB_WRITE_HELPER) { - tsdbUpdateFileHeader(&(pHelper->files.dataF), 0); - fsync(pHelper->files.dataF.fd); + tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); } - close(pHelper->files.dataF.fd); - pHelper->files.dataF.fd = -1; + close(pFile->fd); + pFile->fd = -1; } - if (pHelper->files.lastF.fd > 0) { - if (helperType(pHelper) == TSDB_WRITE_HELPER) { - fsync(pHelper->files.lastF.fd); + + pFile = helperLastF(pHelper); + if (pFile->fd > 0) { + if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) { + fsync(pFile->fd); } - close(pHelper->files.lastF.fd); - pHelper->files.lastF.fd = -1; + close(pFile->fd); + pFile->fd = -1; } + if (helperType(pHelper) == TSDB_WRITE_HELPER) { - if (pHelper->files.nHeadF.fd > 0) { - if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nHeadF), 0); - fsync(pHelper->files.nHeadF.fd); - close(pHelper->files.nHeadF.fd); - pHelper->files.nHeadF.fd = -1; +#ifdef TSDB_IDX + pFile = helperNewIdxF(pHelper); + if (pFile->fd > 0) { + if (!hasError) tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + close(pFile->fd); + pFile->fd = -1; if (hasError) { - (void)remove(pHelper->files.nHeadF.fname); + (void)remove(pFile->fname); } else { - if (rename(pHelper->files.nHeadF.fname, pHelper->files.headF.fname) < 0) { - tsdbError("failed to rename file from %s to %s since %s", pHelper->files.nHeadF.fname, - pHelper->files.headF.fname, strerror(errno)); + if (rename(pFile->fname, helperIdxF(pHelper)->fname) < 0) { + tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperIdxF(pHelper)->fname, + strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - pHelper->files.headF.info = pHelper->files.nHeadF.info; + helperIdxF(pHelper)->info = pFile->info; + } + } +#endif + + pFile = helperNewHeadF(pHelper); + if (pFile->fd > 0) { + if (!hasError) tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + close(pFile->fd); + pFile->fd = -1; + if (hasError) { + (void)remove(pFile->fname); + } else { + if (rename(pFile->fname, helperHeadF(pHelper)->fname) < 0) { + tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperHeadF(pHelper)->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + helperHeadF(pHelper)->info = pFile->info; } } - if (pHelper->files.nLastF.fd > 0) { - if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nLastF), 0); - fsync(pHelper->files.nLastF.fd); - close(pHelper->files.nLastF.fd); - pHelper->files.nLastF.fd = -1; + pFile = helperNewLastF(pHelper); + if (pFile->fd > 0) { + if (!hasError) tsdbUpdateFileHeader(pFile, 0); + fsync(pFile->fd); + close(pFile->fd); + pFile->fd = -1; if (hasError) { - (void)remove(pHelper->files.nLastF.fname); + (void)remove(pFile->fname); } else { - if (rename(pHelper->files.nLastF.fname, pHelper->files.lastF.fname) < 0) { - tsdbError("failed to rename file from %s to %s since %s", pHelper->files.nLastF.fname, - pHelper->files.lastF.fname, strerror(errno)); + if (rename(pFile->fname, helperLastF(pHelper)->fname) < 0) { + tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperLastF(pHelper)->fname, + strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - pHelper->files.lastF.info = pHelper->files.nLastF.info; + helperLastF(pHelper)->info = helperNewLastF(pHelper)->info; } } } @@ -224,18 +271,35 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { pHelper->tableInfo.tid = pTable->tableId.tid; pHelper->tableInfo.uid = pTable->tableId.uid; STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); - pHelper->tableInfo.sversion = schemaVersion(pSchema); tdInitDataCols(pHelper->pDataCols[0], pSchema); tdInitDataCols(pHelper->pDataCols[1], pSchema); - SCompIdx *pIdx = pHelper->pCompIdx + pTable->tableId.tid; - if (pIdx->offset > 0) { - if (pIdx->uid != TABLE_UID(pTable)) { - memset((void *)pIdx, 0, sizeof(SCompIdx)); - } else { - if (pIdx->hasLast) pHelper->hasOldLastBlock = true; + if (pHelper->idxH.numOfIdx > 0) { + while (true) { + if (pHelper->idxH.curIdx >= pHelper->idxH.numOfIdx) { + memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); + break; + } + + SCompIdx *pIdx = &(pHelper->idxH.pIdxArray[pHelper->idxH.curIdx]); + if (pIdx->tid == TABLE_TID(pTable)) { + if (pIdx->uid == TABLE_UID(pTable)) { + pHelper->curCompIdx = *pIdx; + } else { + memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); + } + pHelper->idxH.curIdx++; + break; + } else if (pIdx->tid > TABLE_TID(pTable)) { + memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); + break; + } else { + pHelper->idxH.curIdx++; + } } + } else { + memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); } helperSetState(pHelper, TSDB_HELPER_TABLE_SET); @@ -245,8 +309,8 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - SCompIdx * pIdx = &(pHelper->pCompIdx[TABLE_TID(pCommitIter->pTable)]); - int blkIdx = 0; + SCompIdx *pIdx = &(pHelper->curCompIdx); + int blkIdx = 0; ASSERT(pIdx->offset == 0 || pIdx->uid == TABLE_UID(pCommitIter->pTable)); if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; @@ -271,44 +335,53 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { STsdbCfg *pCfg = &pHelper->pRepo->config; ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompIdx * pIdx = &(pHelper->curCompIdx); SCompBlock compBlock = {0}; if (TSDB_NLAST_FILE_OPENED(pHelper) && (pHelper->hasOldLastBlock)) { if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); ASSERT(pCompBlock->last); + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && + pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); + if (tsdbWriteBlockToFile(pHelper, helperNewLastF(pHelper), pHelper->pDataCols[0], &compBlock, true, true) < 0) + return -1; + if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; + +#if 0 if (pCompBlock->numOfSubBlocks > 1) { if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], &compBlock, true, true) < 0) + if (tsdbWriteBlockToFile(pHelper, helperNewLastF(pHelper), pHelper->pDataCols[0], &compBlock, true, true) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; } else { - if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.lastF.fname, + if (lseek(helperLastF(pHelper)->fd, pCompBlock->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperLastF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END); + pCompBlock->offset = lseek(helperNewLastF(pHelper)->fd, 0, SEEK_END); if (pCompBlock->offset < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nLastF.fname, + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperNewLastF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len) { + if (tsendfile(helperNewLastF(pHelper)->fd, helperLastF(pHelper)->fd, NULL, pCompBlock->len) < pCompBlock->len) { tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo), - pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno)); + helperLastF(pHelper)->fname, helperNewLastF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } } +#endif pHelper->hasOldLastBlock = false; } @@ -317,164 +390,178 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { } int tsdbWriteCompInfo(SRWHelper *pHelper) { - off_t offset = 0; - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { - if (pIdx->offset > 0) { - offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); - if (offset < 0) { - tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } + SCompIdx *pIdx = &(pHelper->curCompIdx); + off_t offset = 0; + SFile * pFile = helperNewHeadF(pHelper); - pIdx->offset = offset; - ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); - - if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) { - tsdbError("vgId:%d failed to send %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - } - } else { - if (pIdx->len > 0) { + if (pIdx->len > 0) { + if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { + if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; + } else { pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER; pHelper->pCompInfo->uid = pHelper->tableInfo.uid; - pHelper->pCompInfo->checksum = 0; + pHelper->pCompInfo->tid = pHelper->tableInfo.tid; ASSERT(pIdx->len > sizeof(SCompInfo) + sizeof(TSCKSUM) && (pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); - offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); - if (offset < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - pIdx->offset = offset; - pIdx->uid = pHelper->tableInfo.uid; - ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); + } - if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - pHelper->files.nHeadF.fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + pFile->info.magic = taosCalcChecksum( + pFile->info.magic, (uint8_t *)POINTER_SHIFT(pHelper->pCompInfo, pIdx->len - sizeof(TSCKSUM)), sizeof(TSCKSUM)); + offset = lseek(pFile->fd, 0, SEEK_END); + if (offset < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + pIdx->offset = offset; + pIdx->uid = pHelper->tableInfo.uid; + pIdx->tid = pHelper->tableInfo.tid; + ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); + + if (twrite(pFile->fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, + pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + +#ifdef TSDB_IDX + pFile = helperNewIdxF(pHelper); +#endif + + if (tsizeof(pHelper->pWIdx) < pFile->info.len + sizeof(SCompIdx) + 12) { + pHelper->pWIdx = trealloc(pHelper->pWIdx, tsizeof(pHelper->pWIdx) == 0 ? 1024 : tsizeof(pHelper->pWIdx) * 2); + if (pHelper->pWIdx == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } } + + void *pBuf = POINTER_SHIFT(pHelper->pWIdx, pFile->info.len); + pFile->info.len += tsdbEncodeSCompIdx(&pBuf, &(pHelper->curCompIdx)); } return 0; } int tsdbWriteCompIdx(SRWHelper *pHelper) { - STsdbCfg *pCfg = &pHelper->pRepo->config; - ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); - if (offset < 0) { - tsdbError("vgId:%d failed to lseek file %s to end since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } + off_t offset = 0; - SFile *pFile = &(pHelper->files.nHeadF); - pFile->info.offset = offset; +#ifdef TSDB_IDX + SFile *pFile = helperNewIdxF(pHelper); +#else + SFile *pFile = helperNewHeadF(pHelper); +#endif - void *buf = pHelper->pBuffer; - for (uint32_t i = 0; i < pCfg->maxTables; i++) { - SCompIdx *pCompIdx = pHelper->pCompIdx + i; - if (pCompIdx->offset > 0) { - int drift = POINTER_DISTANCE(buf, pHelper->pBuffer); - if (tsizeof(pHelper->pBuffer) - drift < 128) { - pHelper->pBuffer = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer) * 2); - if (pHelper->pBuffer == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - } - buf = POINTER_SHIFT(pHelper->pBuffer, drift); - taosEncodeVariantU32(&buf, i); - tsdbEncodeSCompIdx(&buf, pCompIdx); + pFile->info.len += sizeof(TSCKSUM); + if (tsizeof(pHelper->pWIdx) < pFile->info.len) { + pHelper->pWIdx = trealloc(pHelper->pWIdx, pFile->info.len); + if (pHelper->pWIdx == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; } } + taosCalcChecksumAppend(0, (uint8_t *)pHelper->pWIdx, pFile->info.len); + pFile->info.magic = taosCalcChecksum( + pFile->info.magic, (uint8_t *)POINTER_SHIFT(pHelper->pWIdx, pFile->info.len - sizeof(TSCKSUM)), sizeof(TSCKSUM)); - int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize); - - if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), tsize, - pHelper->files.nHeadF.fname, strerror(errno)); + offset = lseek(pFile->fd, 0, SEEK_END); + if (offset < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - pFile->info.len = tsize; + + pFile->info.offset = offset; + + if (twrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < pFile->info.len) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len, + pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + return 0; } int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { - STsdbCfg *pCfg = &(pHelper->pRepo->config); - ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN); +#ifdef TSDB_IDX + SFile *pFile = helperIdxF(pHelper); +#else + SFile *pFile = helperHeadF(pHelper); +#endif + int fd = pFile->fd; if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) { // If not load from file, just load it in object - SFile *pFile = &(pHelper->files.headF); - int fd = pFile->fd; - - memset(pHelper->pCompIdx, 0, tsizeof(pHelper->pCompIdx)); - if (pFile->info.offset > 0) { - ASSERT(pFile->info.offset > TSDB_FILE_HEAD_SIZE); - - if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s to %u since %s", REPO_ID(pHelper->pRepo), pFile->fname, - pFile->info.offset, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } + if (pFile->info.len > 0) { if ((pHelper->pBuffer = trealloc(pHelper->pBuffer, pFile->info.len)) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } + + if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < pFile->info.len) { tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len, pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } + if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) { - tsdbError("vgId:%d file %s SCompIdx part is corrupted. offset %u len %u", REPO_ID(pHelper->pRepo), pFile->fname, - pFile->info.offset, pFile->info.len); + tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname, + pFile->info.len); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } // Decode it + pHelper->idxH.numOfIdx = 0; void *ptr = pHelper->pBuffer; while (POINTER_DISTANCE(ptr, pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) { - uint32_t tid = 0; - if ((ptr = taosDecodeVariantU32(ptr, &tid)) == NULL) return -1; - ASSERT(tid > 0 && tid < pCfg->maxTables); + size_t tlen = tsizeof(pHelper->idxH.pIdxArray); + pHelper->idxH.numOfIdx++; - if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1; + if (tlen < pHelper->idxH.numOfIdx * sizeof(SCompIdx)) { + pHelper->idxH.pIdxArray = (SCompIdx *)trealloc(pHelper->idxH.pIdxArray, (tlen == 0) ? 1024 : tlen * 2); + if (pHelper->idxH.pIdxArray == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } + + ptr = tsdbDecodeSCompIdx(ptr, &(pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 1])); + if (ptr == NULL) { + tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname, + pFile->info.len); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + ASSERT(pHelper->idxH.numOfIdx == 1 || pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 1].tid > + pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 2].tid); ASSERT(POINTER_DISTANCE(ptr, pHelper->pBuffer) <= pFile->info.len - sizeof(TSCKSUM)); } - - if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } } } helperSetState(pHelper, TSDB_HELPER_IDX_LOAD); + if (helperType(pHelper) == TSDB_WRITE_HELPER) { + pFile->info.len = 0; + } + // Copy the memory for outside usage - if (target) memcpy(target, pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)); + if (target && pHelper->idxH.numOfIdx > 0) + memcpy(target, pHelper->idxH.pIdxArray, sizeof(SCompIdx) * pHelper->idxH.numOfIdx); return 0; } @@ -482,15 +569,15 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompIdx *pIdx = &(pHelper->curCompIdx); - int fd = pHelper->files.headF.fd; + int fd = helperHeadF(pHelper)->fd; if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { if (pIdx->offset > 0) { ASSERT(pIdx->uid == pHelper->tableInfo.uid); if (lseek(fd, pIdx->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.headF.fname, + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -499,18 +586,18 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { pHelper->pCompInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len); if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) { tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - pHelper->files.headF.fname, strerror(errno)); + helperHeadF(pHelper)->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) { tsdbError("vgId:%d file %s SCompInfo part is corrupted, tid %d uid %" PRIu64, REPO_ID(pHelper->pRepo), - pHelper->files.headF.fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid); + helperHeadF(pHelper)->fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } - ASSERT(pIdx->uid == pHelper->pCompInfo->uid); + ASSERT(pIdx->uid == pHelper->pCompInfo->uid && pIdx->tid == pHelper->pCompInfo->tid); } helperSetState(pHelper, TSDB_HELPER_INFO_LOAD); @@ -523,7 +610,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) { ASSERT(pCompBlock->numOfSubBlocks <= 1); - SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); + SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); if (lseek(pFile->fd, pCompBlock->offset, SEEK_SET) < 0) { tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); @@ -642,9 +729,9 @@ _err: // ---------------------- INTERNAL FUNCTIONS ---------------------- static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { - ASSERT(pHelper->files.lastF.fd > 0); + ASSERT(helperLastF(pHelper)->fd > 0); struct stat st; - if (fstat(pHelper->files.lastF.fd, &st) < 0) return true; + if (fstat(helperLastF(pHelper)->fd, &st) < 0) return true; if (st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE) return true; return false; } @@ -729,6 +816,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ASSERT(flen > 0); flen += sizeof(TSCKSUM); taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); + pFile->info.magic = + taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); if (ncol != 0) { pCompCol->offset = toffset; @@ -747,6 +836,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa pCompData->numOfCols = nColsNotAllNull; taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); + pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pCompData, tsize - sizeof(TSCKSUM)), + sizeof(TSCKSUM)); // Write the whole block to file if (twrite(pFile->fd, (void *)pCompData, lsize) < lsize) { @@ -804,7 +895,7 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) { } static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompIdx *pIdx = &(pHelper->curCompIdx); ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfBlocks); ASSERT(pCompBlock->numOfSubBlocks == 1); @@ -851,7 +942,7 @@ _err: static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded) { ASSERT(pCompBlock->numOfSubBlocks == 0); - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompIdx *pIdx = &(pHelper->curCompIdx); ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks); SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; @@ -935,7 +1026,7 @@ _err: static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { ASSERT(pCompBlock->numOfSubBlocks == 1); - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompIdx *pIdx = &(pHelper->curCompIdx); ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks); @@ -971,24 +1062,21 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int } static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { + pHelper->idxH.numOfIdx = 0; + pHelper->idxH.curIdx = 0; memset((void *)&pHelper->files, 0, sizeof(pHelper->files)); - pHelper->files.fid = -1; - pHelper->files.headF.fd = -1; - pHelper->files.dataF.fd = -1; - pHelper->files.lastF.fd = -1; - pHelper->files.nHeadF.fd = -1; - pHelper->files.nLastF.fd = -1; + helperHeadF(pHelper)->fd = -1; + helperDataF(pHelper)->fd = -1; + helperLastF(pHelper)->fd = -1; + helperNewHeadF(pHelper)->fd = -1; + helperNewLastF(pHelper)->fd = -1; +#ifdef TSDB_IDX + helperIdxF(pHelper)->fd = -1; + helperNewIdxF(pHelper)->fd = -1; +#endif } static int tsdbInitHelperFile(SRWHelper *pHelper) { - STsdbCfg *pCfg = &pHelper->pRepo->config; - size_t tsize = sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM); - pHelper->pCompIdx = (SCompIdx *)tmalloc(tsize); - if (pHelper->pCompIdx == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - tsdbResetHelperFileImpl(pHelper); return 0; } @@ -996,7 +1084,8 @@ static int tsdbInitHelperFile(SRWHelper *pHelper) { static void tsdbDestroyHelperFile(SRWHelper *pHelper) { tsdbCloseHelperFile(pHelper, false); tsdbResetHelperFileImpl(pHelper); - tzfree(pHelper->pCompIdx); + tzfree(pHelper->idxH.pIdxArray); + tzfree(pHelper->pWIdx); } // ---------- Operations on Helper Table part @@ -1154,7 +1243,7 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, ASSERT(pCompBlock->numOfSubBlocks <= 1); ASSERT(colIds[0] == 0); - SFile * pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); + SFile * pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); SCompCol compCol = {0}; // If only load timestamp column, no need to load SCompData part @@ -1215,7 +1304,7 @@ _err: static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { ASSERT(pCompBlock->numOfSubBlocks <= 1); - SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); + SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); pHelper->pBuffer = trealloc(pHelper->pBuffer, pCompBlock->len); if (pHelper->pBuffer == NULL) { @@ -1314,6 +1403,7 @@ _err: static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx) { int tlen = 0; + tlen += taosEncodeVariantI32(buf, pIdx->tid); tlen += taosEncodeVariantU32(buf, pIdx->len); tlen += taosEncodeVariantU32(buf, pIdx->offset); tlen += taosEncodeFixedU8(buf, pIdx->hasLast); @@ -1329,6 +1419,7 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) { uint32_t numOfBlocks = 0; uint64_t value = 0; + if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL; if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL; if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL; if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL; @@ -1346,7 +1437,7 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) { static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { STsdbCfg * pCfg = &(pHelper->pRepo->config); STable * pTable = pCommitIter->pTable; - SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable); + SCompIdx * pIdx = &(pHelper->curCompIdx); TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; SCompBlock compBlock = {0}; @@ -1362,7 +1453,7 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; + if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; } else { if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; @@ -1393,7 +1484,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, int *blkIdx) { STsdbCfg * pCfg = &(pHelper->pRepo->config); STable * pTable = pCommitIter->pTable; - SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable); + SCompIdx * pIdx = &(pHelper->curCompIdx); SCompBlock compBlock = {0}; TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; @@ -1427,7 +1518,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, pDataCols0->cols[0].pData, pDataCols0->numOfRows); ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; + if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; tblkIdx++; } else { @@ -1466,7 +1557,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, if (rowsRead == 0) break; ASSERT(rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) return -1; + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; tblkIdx++; } @@ -1493,7 +1584,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, pDataCols0->cols[0].pData, pDataCols0->numOfRows); ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0) + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; tblkIdx++; @@ -1506,7 +1597,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); if (rowsRead == 0) break; - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; if (round == 0) { if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; @@ -1577,10 +1668,10 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, ASSERT(pDataCols->numOfRows > 0); if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { - pFile = &(pHelper->files.dataF); + pFile = helperDataF(pHelper); } else { isLast = true; - pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); + pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? helperNewLastF(pHelper) : helperLastF(pHelper); } ASSERT(pFile->fd > 0); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 37784577c4..b2204948d0 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -126,12 +126,13 @@ typedef struct STsdbQueryHandle { SIOCostSummary cost; } STsdbQueryHandle; -static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); -static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); -static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock); +static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); +static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); +static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); -static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, - STsdbQueryHandle* pQueryHandle); +static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, + STsdbQueryHandle* pQueryHandle); +static int tsdbCheckInfoCompar(const void* key1, const void* key2); static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { pBlockLoadInfo->slot = -1; @@ -236,7 +237,8 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab taosArrayPush(pQueryHandle->pTableCheckInfo, &info); } } - + + taosArraySort(pQueryHandle->pTableCheckInfo, tsdbCheckInfoCompar); pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true); tsdbDebug("%p total numOfTable:%zu in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo); @@ -554,7 +556,9 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); pCheckInfo->numOfBlocks = 0; - SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid]; + tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb); + + SCompIdx* compIndex = &pQueryHandle->rhelper.curCompIdx; // no data block in this file, try next file if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) { @@ -571,8 +575,6 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo pCheckInfo->compSize = compIndex->len; } - tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb); - tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo)); SCompInfo* pCompInfo = pCheckInfo->pCompInfo; @@ -2431,3 +2433,13 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) { taosArrayDestroy(pGroupList->pGroupList); } +static int tsdbCheckInfoCompar(const void* key1, const void* key2) { + if (((STableCheckInfo*)key1)->tableId.tid < ((STableCheckInfo*)key2)->tableId.tid) { + return -1; + } else if (((STableCheckInfo*)key1)->tableId.tid > ((STableCheckInfo*)key2)->tableId.tid) { + return 1; + } else { + ASSERT(false); + return 0; + } +} \ No newline at end of file diff --git a/src/util/inc/tkvstore.h b/src/util/inc/tkvstore.h index 346e567c41..6d67607e24 100644 --- a/src/util/inc/tkvstore.h +++ b/src/util/inc/tkvstore.h @@ -25,10 +25,11 @@ typedef int (*iterFunc)(void *, void *cont, int contLen); typedef void (*afterFunc)(void *); typedef struct { - int64_t size; // including 512 bytes of header size - int64_t tombSize; - int64_t nRecords; - int64_t nDels; + int64_t size; // including 512 bytes of header size + int64_t tombSize; + int64_t nRecords; + int64_t nDels; + uint32_t magic; } SStoreInfo; typedef struct { @@ -45,6 +46,8 @@ typedef struct { SStoreInfo info; } SKVStore; +#define KVSTORE_MAGIC(s) (s)->info.magic + int tdCreateKVStore(char *fname); int tdDestroyKVStore(char *fname); SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index ab2aa738c6..0704285da5 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -34,6 +34,7 @@ #define TD_KVSTORE_MAINOR_VERSION 0 #define TD_KVSTORE_SNAP_SUFFIX ".snap" #define TD_KVSTORE_NEW_SUFFIX ".new" +#define TD_KVSTORE_INIT_MAGIC 0xFFFFFFFF typedef struct { uint64_t uid; @@ -140,6 +141,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; pStore->info.size = TD_KVSTORE_HEADER_SIZE; + pStore->info.magic = info.magic; if (tdRestoreKVStore(pStore) < 0) goto _err; @@ -251,6 +253,8 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe return -1; } + pStore->info.magic = + taosCalcChecksum(pStore->info.magic, (uint8_t *)POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); pStore->info.size += (sizeof(SKVRecord) + contLen); SKVRecord *pRecord = taosHashGet(pStore->map, (void *)&uid, sizeof(uid)); if (pRecord != NULL) { // just to insert @@ -288,6 +292,7 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) { return -1; } + pStore->info.magic = taosCalcChecksum(pStore->info.magic, (uint8_t *)buf, POINTER_DISTANCE(pBuf, buf)); pStore->info.size += POINTER_DISTANCE(pBuf, buf); pStore->info.nDels++; pStore->info.nRecords--; @@ -371,7 +376,7 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { } static int tdInitKVStoreHeader(int fd, char *fname) { - SStoreInfo info = {TD_KVSTORE_HEADER_SIZE, 0, 0, 0}; + SStoreInfo info = {TD_KVSTORE_HEADER_SIZE, 0, 0, 0, TD_KVSTORE_INIT_MAGIC}; return tdUpdateKVStoreHeader(fd, fname, &info); } @@ -382,6 +387,7 @@ static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo) { tlen += taosEncodeVariantI64(buf, pInfo->tombSize); tlen += taosEncodeVariantI64(buf, pInfo->nRecords); tlen += taosEncodeVariantI64(buf, pInfo->nDels); + tlen += taosEncodeFixedU32(buf, pInfo->magic); return tlen; } @@ -391,6 +397,7 @@ static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) { buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); + buf = taosDecodeFixedU32(buf, &(pInfo->magic)); return buf; }