diff --git a/source/dnode/vnode/tsdb/CMakeLists.txt b/source/dnode/vnode/tsdb/CMakeLists.txt index b11a846500..2d3f6d6e42 100644 --- a/source/dnode/vnode/tsdb/CMakeLists.txt +++ b/source/dnode/vnode/tsdb/CMakeLists.txt @@ -12,7 +12,7 @@ else(0) "src/tsdbWrite.c" "src/tsdbReadImpl.c" "src/tsdbFile.c" - # "src/tsdbFS.c" + "src/tsdbFS.c" ) endif(0) diff --git a/source/dnode/vnode/tsdb/inc/tsdbCommit.h b/source/dnode/vnode/tsdb/inc/tsdbCommit.h index a29296a686..4043f22dd8 100644 --- a/source/dnode/vnode/tsdb/inc/tsdbCommit.h +++ b/source/dnode/vnode/tsdb/inc/tsdbCommit.h @@ -29,10 +29,19 @@ typedef struct { int64_t size; } SKVRecord; +void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn); + +static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) { + if (key < 0) { + return (int)((key + 1) / tsTickPerDay[precision] / days - 1); + } else { + return (int)((key / tsTickPerDay[precision] / days)); + } +} + #if 0 #define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) -void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); void *tsdbCommitData(STsdbRepo *pRepo); diff --git a/source/dnode/vnode/tsdb/inc/tsdbDef.h b/source/dnode/vnode/tsdb/inc/tsdbDef.h index 236e2b2d35..e81c51441f 100644 --- a/source/dnode/vnode/tsdb/inc/tsdbDef.h +++ b/source/dnode/vnode/tsdb/inc/tsdbDef.h @@ -27,7 +27,7 @@ #include "tsdb.h" #include "tsdbCommit.h" -// #include "tsdbFS.h" +#include "tsdbFS.h" #include "tsdbFile.h" #include "tsdbLog.h" #include "tsdbMemTable.h" @@ -47,12 +47,12 @@ struct STsdb { STsdbMemTable * imem; SRtn rtn; SMemAllocatorFactory *pmaf; - // STsdbFS fs; + STsdbFS fs; }; #define REPO_ID(r) 0 #define REPO_CFG(r) (&(r)->config) -// #define REPO_FS(r) (&(r)->fs) +#define REPO_FS(r) (&(r)->fs) #ifdef __cplusplus } diff --git a/source/dnode/vnode/tsdb/src/tsdbCommit.c b/source/dnode/vnode/tsdb/src/tsdbCommit.c index 1247dcd728..0080e14181 100644 --- a/source/dnode/vnode/tsdb/src/tsdbCommit.c +++ b/source/dnode/vnode/tsdb/src/tsdbCommit.c @@ -32,6 +32,23 @@ int tsdbCommit(STsdb *pTsdb) { return 0; } +void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { + STsdbCfg *pCfg = REPO_CFG(pRepo); + TSKEY minKey, midKey, maxKey, now; + + now = taosGetTimestamp(pCfg->precision); + minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision]; + midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision]; + maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision]; + + pRtn->minKey = minKey; + pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision)); + pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision)); + pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision)); + tsdbDebug("vgId:%d now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey, + pRtn->minFid, pRtn->midFid, pRtn->maxFid); +} + #if 0 /* * Copyright (c) 2019 TAOS Data, Inc. @@ -420,23 +437,6 @@ void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) { return buf; } -void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) { - STsdbCfg *pCfg = REPO_CFG(pRepo); - TSKEY minKey, midKey, maxKey, now; - - now = taosGetTimestamp(pCfg->precision); - minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision]; - midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision]; - maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision]; - - pRtn->minKey = minKey; - pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision)); - pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision)); - pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision)); - tsdbDebug("vgId:%d now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey, - pRtn->minFid, pRtn->midFid, pRtn->maxFid); -} - static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact) { char buf[64] = "\0"; void * pBuf = buf; diff --git a/source/dnode/vnode/tsdb/src/tsdbFS.c b/source/dnode/vnode/tsdb/src/tsdbFS.c index 210a272b13..eefc21a109 100644 --- a/source/dnode/vnode/tsdb/src/tsdbFS.c +++ b/source/dnode/vnode/tsdb/src/tsdbFS.c @@ -34,8 +34,16 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf); static int tsdbRestoreCurrent(STsdb *pRepo); static int tsdbComparTFILE(const void *arg1, const void *arg2); static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired); -static int tsdbProcessExpiredFS(STsdb *pRepo); -static int tsdbCreateMeta(STsdb *pRepo); +// static int tsdbProcessExpiredFS(STsdb *pRepo); +// static int tsdbCreateMeta(STsdb *pRepo); + +static void tsdbGetRootDir(int repoid, char dirName[]) { + snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb", repoid); +} + +static void tsdbGetDataDir(int repoid, char dirName[]) { + snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", repoid); +} // For backward compatibility // ================== CURRENT file header info @@ -104,11 +112,11 @@ static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray) { } static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { - ASSERT(pStatus->pmf); + // ASSERT(pStatus->pmf); int tlen = 0; - tlen += tsdbEncodeSMFile(buf, pStatus->pmf); + // tlen += tsdbEncodeSMFile(buf, pStatus->pmf); tlen += tsdbEncodeDFileSetArray(buf, pStatus->df); return tlen; @@ -117,9 +125,9 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) { tsdbResetFSStatus(pStatus); - pStatus->pmf = &(pStatus->mf); + // pStatus->pmf = &(pStatus->mf); - buf = tsdbDecodeSMFile(buf, pStatus->pmf); + // buf = tsdbDecodeSMFile(buf, pStatus->pmf); buf = tsdbDecodeDFileSetArray(buf, pStatus->df); return buf; @@ -132,7 +140,7 @@ static SFSStatus *tsdbNewFSStatus(int maxFSet) { return NULL; } - TSDB_FILE_SET_CLOSED(&(pStatus->mf)); + // TSDB_FILE_SET_CLOSED(&(pStatus->mf)); pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet)); if (pStatus->df == NULL) { @@ -158,18 +166,18 @@ static void tsdbResetFSStatus(SFSStatus *pStatus) { return; } - TSDB_FILE_SET_CLOSED(&(pStatus->mf)); + // TSDB_FILE_SET_CLOSED(&(pStatus->mf)); - pStatus->pmf = NULL; + // pStatus->pmf = NULL; taosArrayClear(pStatus->df); } -static void tsdbSetStatusMFile(SFSStatus *pStatus, const SMFile *pMFile) { - ASSERT(pStatus->pmf == NULL); +// static void tsdbSetStatusMFile(SFSStatus *pStatus, const SMFile *pMFile) { +// ASSERT(pStatus->pmf == NULL); - pStatus->pmf = &(pStatus->mf); - tsdbInitMFileEx(pStatus->pmf, (SMFile *)pMFile); -} +// pStatus->pmf = &(pStatus->mf); +// tsdbInitMFileEx(pStatus->pmf, (SMFile *)pMFile); +// } static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) { if (taosArrayPush(pStatus->df, (void *)pSet) == NULL) { @@ -240,61 +248,61 @@ void *tsdbFreeFS(STsdbFS *pfs) { return NULL; } -static int tsdbProcessExpiredFS(STsdb *pRepo) { - tsdbStartFSTxn(pRepo, 0, 0); - if (tsdbCreateMeta(pRepo) < 0) { - tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } +// static int tsdbProcessExpiredFS(STsdb *pRepo) { +// tsdbStartFSTxn(pRepo, 0, 0); +// // if (tsdbCreateMeta(pRepo) < 0) { +// // tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno)); +// // return -1; +// // } - if (tsdbApplyRtn(pRepo) < 0) { - tsdbEndFSTxnWithError(REPO_FS(pRepo)); - tsdbError("vgId:%d failed to apply rtn since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } - if (tsdbEndFSTxn(pRepo) < 0) { - tsdbError("vgId:%d failed to end fs txn since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } - return 0; -} +// if (tsdbApplyRtn(pRepo) < 0) { +// tsdbEndFSTxnWithError(REPO_FS(pRepo)); +// tsdbError("vgId:%d failed to apply rtn since %s", REPO_ID(pRepo), tstrerror(terrno)); +// return -1; +// } +// if (tsdbEndFSTxn(pRepo) < 0) { +// tsdbError("vgId:%d failed to end fs txn since %s", REPO_ID(pRepo), tstrerror(terrno)); +// return -1; +// } +// return 0; +// } -static int tsdbCreateMeta(STsdb *pRepo) { - STsdbFS *pfs = REPO_FS(pRepo); - SMFile * pOMFile = pfs->cstatus->pmf; - SMFile mf; - SDiskID did; +// static int tsdbCreateMeta(STsdb *pRepo) { +// STsdbFS *pfs = REPO_FS(pRepo); +// SMFile * pOMFile = pfs->cstatus->pmf; +// SMFile mf; +// SDiskID did; - if (pOMFile != NULL) { - // keep the old meta file - tsdbUpdateMFile(pfs, pOMFile); - return 0; - } +// if (pOMFile != NULL) { +// // keep the old meta file +// tsdbUpdateMFile(pfs, pOMFile); +// return 0; +// } - // Create a new meta file - did.level = TFS_PRIMARY_LEVEL; - did.id = TFS_PRIMARY_ID; - tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); +// // Create a new meta file +// did.level = TFS_PRIMARY_LEVEL; +// did.id = TFS_PRIMARY_ID; +// tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); - if (tsdbCreateMFile(&mf, true) < 0) { - tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } +// if (tsdbCreateMFile(&mf, true) < 0) { +// tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); +// return -1; +// } - tsdbInfo("vgId:%d meta file %s is created", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf)); +// tsdbInfo("vgId:%d meta file %s is created", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf)); - if (tsdbUpdateMFileHeader(&mf) < 0) { - tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno)); - tsdbApplyMFileChange(&mf, pOMFile); - return -1; - } +// if (tsdbUpdateMFileHeader(&mf) < 0) { +// tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno)); +// tsdbApplyMFileChange(&mf, pOMFile); +// return -1; +// } - TSDB_FILE_FSYNC(&mf); - tsdbCloseMFile(&mf); - tsdbUpdateMFile(pfs, &mf); +// TSDB_FILE_FSYNC(&mf); +// tsdbCloseMFile(&mf); +// tsdbUpdateMFile(pfs, &mf); - return 0; -} +// return 0; +// } int tsdbOpenFS(STsdb *pRepo) { STsdbFS *pfs = REPO_FS(pRepo); @@ -313,9 +321,9 @@ int tsdbOpenFS(STsdb *pRepo) { } tsdbScanAndTryFixDFilesHeader(pRepo, &nExpired); - if (nExpired > 0) { - tsdbProcessExpiredFS(pRepo); - } + // if (nExpired > 0) { + // tsdbProcessExpiredFS(pRepo); + // } } else { // should skip expired fileset inside of the function if (tsdbRestoreCurrent(pRepo) < 0) { @@ -329,11 +337,11 @@ int tsdbOpenFS(STsdb *pRepo) { return -1; } - // Load meta cache if has meta file - if ((!(pRepo->state & TSDB_STATE_BAD_META)) && tsdbLoadMetaCache(pRepo, true) < 0) { - tsdbError("vgId:%d failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } + // // Load meta cache if has meta file + // if ((!(pRepo->state & TSDB_STATE_BAD_META)) && tsdbLoadMetaCache(pRepo, true) < 0) { + // tsdbError("vgId:%d failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno)); + // return -1; + // } return 0; } @@ -350,11 +358,11 @@ void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd) { pfs->intxn = true; tsdbResetFSStatus(pfs->nstatus); pfs->nstatus->meta = pfs->cstatus->meta; - if (pfs->cstatus->pmf == NULL) { - pfs->nstatus->meta.version = 0; - } else { - pfs->nstatus->meta.version = pfs->cstatus->meta.version + 1; - } + // if (pfs->cstatus->pmf == NULL) { + pfs->nstatus->meta.version = 0; + // } else { + // pfs->nstatus->meta.version = pfs->cstatus->meta.version + 1; + // } pfs->nstatus->meta.totalPoints = pfs->cstatus->meta.totalPoints + pointsAdd; pfs->nstatus->meta.totalStorage = pfs->cstatus->meta.totalStorage += storageAdd; } @@ -393,7 +401,7 @@ int tsdbEndFSTxnWithError(STsdbFS *pfs) { return 0; } -void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pfs->nstatus, pMFile); } +// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pfs->nstatus, pMFile); } int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); } @@ -415,8 +423,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) { } fsheader.version = TSDB_FS_VERSION; - if (pStatus->pmf == NULL) { - ASSERT(taosArrayGetSize(pStatus->df) == 0); + if (taosArrayGetSize(pStatus->df) == 0) { fsheader.len = 0; } else { fsheader.len = tsdbEncodeFSStatus(NULL, pStatus) + sizeof(TSCKSUM); @@ -429,7 +436,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) { taosCalcChecksumAppend(0, (uint8_t *)hbuf, TSDB_FILE_HEAD_SIZE); - if (taosWrite(fd, hbuf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { + if (taosWriteFile(fd, hbuf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { terrno = TAOS_SYSTEM_ERROR(errno); close(fd); remove(tfname); @@ -448,7 +455,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) { tsdbEncodeFSStatus(&ptr, pStatus); taosCalcChecksumAppend(0, (uint8_t *)pBuf, fsheader.len); - if (taosWrite(fd, pBuf, fsheader.len) < fsheader.len) { + if (taosWriteFile(fd, pBuf, fsheader.len) < fsheader.len) { terrno = TAOS_SYSTEM_ERROR(errno); close(fd); (void)remove(tfname); @@ -458,7 +465,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) { } // fsync, close and rename - if (taosFsync(fd) < 0) { + if (taosFsyncFile(fd) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); close(fd); remove(tfname); @@ -467,7 +474,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) { } (void)close(fd); - (void)taosRename(tfname, cfname); + (void)taosRenameFile(tfname, cfname); taosTZfree(pBuf); return 0; @@ -484,7 +491,7 @@ static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo) { sizeTo = taosArrayGetSize(pTo->df); // Apply meta file change - (void)tsdbApplyMFileChange(pFrom->pmf, pTo->pmf); + // (void)tsdbApplyMFileChange(pFrom->pmf, pTo->pmf); // Apply SDFileSet change if (ifrom >= sizeFrom) { @@ -664,7 +671,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) { goto _err; } - int nread = (int)taosRead(fd, buffer, TSDB_FILE_HEAD_SIZE); + int nread = (int)taosReadFile(fd, buffer, TSDB_FILE_HEAD_SIZE); if (nread < 0) { tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILENAME_LEN, current, strerror(errno)); @@ -698,7 +705,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) { goto _err; } - nread = (int)taosRead(fd, buffer, fsheader.len); + nread = (int)taosReadFile(fd, buffer, fsheader.len); if (nread < 0) { tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), current, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); @@ -741,10 +748,10 @@ static int tsdbScanAndTryFixFS(STsdb *pRepo) { STsdbFS * pfs = REPO_FS(pRepo); SFSStatus *pStatus = pfs->cstatus; - if (tsdbScanAndTryFixMFile(pRepo) < 0) { - tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } + // if (tsdbScanAndTryFixMFile(pRepo) < 0) { + // tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno)); + // return -1; + // } size_t size = taosArrayGetSize(pStatus->df); @@ -763,141 +770,141 @@ static int tsdbScanAndTryFixFS(STsdb *pRepo) { return 0; } -int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta) { - char tbuf[128]; - STsdbFS * pfs = REPO_FS(pRepo); - SMFile mf; - SMFile * pMFile = &mf; - void * pBuf = NULL; - SKVRecord rInfo; - int64_t maxBufSize = 0; - SMFInfo minfo; +// int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta) { +// char tbuf[128]; +// STsdbFS * pfs = REPO_FS(pRepo); +// SMFile mf; +// SMFile * pMFile = &mf; +// void * pBuf = NULL; +// SKVRecord rInfo; +// int64_t maxBufSize = 0; +// SMFInfo minfo; - taosHashClear(pfs->metaCache); +// taosHashClear(pfs->metaCache); - // No meta file, just return - if (pfs->cstatus->pmf == NULL) return 0; +// // No meta file, just return +// if (pfs->cstatus->pmf == NULL) return 0; - mf = pfs->cstatus->mf; - // Load cache first - if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) { - return -1; - } +// mf = pfs->cstatus->mf; +// // Load cache first +// if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) { +// return -1; +// } - if (tsdbLoadMFileHeader(pMFile, &minfo) < 0) { - tsdbCloseMFile(pMFile); - return -1; - } +// if (tsdbLoadMFileHeader(pMFile, &minfo) < 0) { +// tsdbCloseMFile(pMFile); +// return -1; +// } - while (true) { - int64_t tsize = tsdbReadMFile(pMFile, tbuf, sizeof(SKVRecord)); - if (tsize == 0) break; +// while (true) { +// int64_t tsize = tsdbReadMFile(pMFile, tbuf, sizeof(SKVRecord)); +// if (tsize == 0) break; - if (tsize < 0) { - tsdbError("vgId:%d failed to read META file since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } +// if (tsize < 0) { +// tsdbError("vgId:%d failed to read META file since %s", REPO_ID(pRepo), tstrerror(terrno)); +// return -1; +// } - if (tsize < sizeof(SKVRecord)) { - tsdbError("vgId:%d failed to read %" PRIzu " bytes from file %s", REPO_ID(pRepo), sizeof(SKVRecord), - TSDB_FILE_FULL_NAME(pMFile)); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - tsdbCloseMFile(pMFile); - return -1; - } +// if (tsize < sizeof(SKVRecord)) { +// tsdbError("vgId:%d failed to read %" PRIzu " bytes from file %s", REPO_ID(pRepo), sizeof(SKVRecord), +// TSDB_FILE_FULL_NAME(pMFile)); +// terrno = TSDB_CODE_TDB_FILE_CORRUPTED; +// tsdbCloseMFile(pMFile); +// return -1; +// } - void *ptr = tsdbDecodeKVRecord(tbuf, &rInfo); - ASSERT(POINTER_DISTANCE(ptr, tbuf) == sizeof(SKVRecord)); - // ASSERT((rInfo.offset > 0) ? (pStore->info.size == rInfo.offset) : true); +// void *ptr = tsdbDecodeKVRecord(tbuf, &rInfo); +// ASSERT(POINTER_DISTANCE(ptr, tbuf) == sizeof(SKVRecord)); +// // ASSERT((rInfo.offset > 0) ? (pStore->info.size == rInfo.offset) : true); - if (rInfo.offset < 0) { - taosHashRemove(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid)); -#if 0 - pStore->info.size += sizeof(SKVRecord); - pStore->info.nRecords--; - pStore->info.nDels++; - pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); -#endif - } else { - ASSERT(rInfo.offset > 0 && rInfo.size > 0); - if (taosHashPut(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { - tsdbError("vgId:%d failed to load meta cache from file %s since OOM", REPO_ID(pRepo), - TSDB_FILE_FULL_NAME(pMFile)); - terrno = TSDB_CODE_COM_OUT_OF_MEMORY; - tsdbCloseMFile(pMFile); - return -1; - } +// if (rInfo.offset < 0) { +// taosHashRemove(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid)); +// #if 0 +// pStore->info.size += sizeof(SKVRecord); +// pStore->info.nRecords--; +// pStore->info.nDels++; +// pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); +// #endif +// } else { +// ASSERT(rInfo.offset > 0 && rInfo.size > 0); +// if (taosHashPut(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { +// tsdbError("vgId:%d failed to load meta cache from file %s since OOM", REPO_ID(pRepo), +// TSDB_FILE_FULL_NAME(pMFile)); +// terrno = TSDB_CODE_COM_OUT_OF_MEMORY; +// tsdbCloseMFile(pMFile); +// return -1; +// } - maxBufSize = MAX(maxBufSize, rInfo.size); +// maxBufSize = MAX(maxBufSize, rInfo.size); - if (tsdbSeekMFile(pMFile, rInfo.size, SEEK_CUR) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), - tstrerror(terrno)); - tsdbCloseMFile(pMFile); - return -1; - } +// if (tsdbSeekMFile(pMFile, rInfo.size, SEEK_CUR) < 0) { +// tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), +// tstrerror(terrno)); +// tsdbCloseMFile(pMFile); +// return -1; +// } -#if 0 - pStore->info.size += (sizeof(SKVRecord) + rInfo.size); - pStore->info.nRecords++; -#endif - } - } +// #if 0 +// pStore->info.size += (sizeof(SKVRecord) + rInfo.size); +// pStore->info.nRecords++; +// #endif +// } +// } - if (recoverMeta) { - pBuf = malloc((size_t)maxBufSize); - if (pBuf == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbCloseMFile(pMFile); - return -1; - } +// if (recoverMeta) { +// pBuf = malloc((size_t)maxBufSize); +// if (pBuf == NULL) { +// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; +// tsdbCloseMFile(pMFile); +// return -1; +// } - SKVRecord *pRecord = taosHashIterate(pfs->metaCache, NULL); - while (pRecord) { - if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) { - tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), - tstrerror(terrno)); - tfree(pBuf); - tsdbCloseMFile(pMFile); - return -1; - } +// SKVRecord *pRecord = taosHashIterate(pfs->metaCache, NULL); +// while (pRecord) { +// if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) { +// tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), +// tstrerror(terrno)); +// tfree(pBuf); +// tsdbCloseMFile(pMFile); +// return -1; +// } - int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size); - if (nread < 0) { - tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), - tstrerror(terrno)); - tfree(pBuf); - tsdbCloseMFile(pMFile); - return -1; - } +// int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size); +// if (nread < 0) { +// tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), +// tstrerror(terrno)); +// tfree(pBuf); +// tsdbCloseMFile(pMFile); +// return -1; +// } - if (nread < pRecord->size) { - tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d", - REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - tfree(pBuf); - tsdbCloseMFile(pMFile); - return -1; - } +// if (nread < pRecord->size) { +// tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d", +// REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread); +// terrno = TSDB_CODE_TDB_FILE_CORRUPTED; +// tfree(pBuf); +// tsdbCloseMFile(pMFile); +// return -1; +// } - if (tsdbRestoreTable(pRepo, pBuf, (int)pRecord->size) < 0) { - tsdbError("vgId:%d failed to restore table, uid %" PRId64 ", since %s" PRIu64, REPO_ID(pRepo), pRecord->uid, - tstrerror(terrno)); - tfree(pBuf); - tsdbCloseMFile(pMFile); - return -1; - } +// if (tsdbRestoreTable(pRepo, pBuf, (int)pRecord->size) < 0) { +// tsdbError("vgId:%d failed to restore table, uid %" PRId64 ", since %s" PRIu64, REPO_ID(pRepo), pRecord->uid, +// tstrerror(terrno)); +// tfree(pBuf); +// tsdbCloseMFile(pMFile); +// return -1; +// } - pRecord = taosHashIterate(pfs->metaCache, pRecord); - } +// pRecord = taosHashIterate(pfs->metaCache, pRecord); +// } - tsdbOrgMeta(pRepo); - } +// tsdbOrgMeta(pRepo); +// } - tsdbCloseMFile(pMFile); - tfree(pBuf); - return 0; -} +// tsdbCloseMFile(pMFile); +// tfree(pBuf); +// return 0; +// } static int tsdbScanRootDir(STsdb *pRepo) { char rootDir[TSDB_FILENAME_LEN]; @@ -920,9 +927,9 @@ static int tsdbScanRootDir(STsdb *pRepo) { continue; } - if (pfs->cstatus->pmf && tfsIsSameFile(pf, &(pfs->cstatus->pmf->f))) { - continue; - } + // if (/*pfs->cstatus->pmf && */ tfsIsSameFile(pf, &(pfs->cstatus->pmf->f))) { + // continue; + // } (void)tfsremove(pf); tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), TFILE_NAME(pf)); @@ -977,126 +984,126 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) { return false; } -static int tsdbRestoreMeta(STsdb *pRepo) { - char rootDir[TSDB_FILENAME_LEN]; - char bname[TSDB_FILENAME_LEN]; - TDIR * tdir = NULL; - const TFILE *pf = NULL; - const char * pattern = "^meta(-ver[0-9]+)?$"; - regex_t regex; - STsdbFS * pfs = REPO_FS(pRepo); +// static int tsdbRestoreMeta(STsdb *pRepo) { +// char rootDir[TSDB_FILENAME_LEN]; +// char bname[TSDB_FILENAME_LEN]; +// TDIR * tdir = NULL; +// const TFILE *pf = NULL; +// const char * pattern = "^meta(-ver[0-9]+)?$"; +// regex_t regex; +// STsdbFS * pfs = REPO_FS(pRepo); - regcomp(®ex, pattern, REG_EXTENDED); +// regcomp(®ex, pattern, REG_EXTENDED); - tsdbInfo("vgId:%d try to restore meta", REPO_ID(pRepo)); +// tsdbInfo("vgId:%d try to restore meta", REPO_ID(pRepo)); - tsdbGetRootDir(REPO_ID(pRepo), rootDir); +// tsdbGetRootDir(REPO_ID(pRepo), rootDir); - tdir = tfsOpendir(rootDir); - if (tdir == NULL) { - tsdbError("vgId:%d failed to open dir %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno)); - regfree(®ex); - return -1; - } +// tdir = tfsOpendir(rootDir); +// if (tdir == NULL) { +// tsdbError("vgId:%d failed to open dir %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno)); +// regfree(®ex); +// return -1; +// } - while ((pf = tfsReaddir(tdir))) { - tfsbasename(pf, bname); +// while ((pf = tfsReaddir(tdir))) { +// tfsbasename(pf, bname); - if (strcmp(bname, "data") == 0) { - // Skip the data/ directory - continue; - } +// if (strcmp(bname, "data") == 0) { +// // Skip the data/ directory +// continue; +// } - if (strcmp(bname, tsdbTxnFname[TSDB_TXN_TEMP_FILE]) == 0) { - // Skip current.t file - tsdbInfo("vgId:%d file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf)); - (void)tfsremove(pf); - continue; - } +// if (strcmp(bname, tsdbTxnFname[TSDB_TXN_TEMP_FILE]) == 0) { +// // Skip current.t file +// tsdbInfo("vgId:%d file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf)); +// (void)tfsremove(pf); +// continue; +// } - int code = regexec(®ex, bname, 0, NULL, 0); - if (code == 0) { - // Match - if (pfs->cstatus->pmf != NULL) { - tsdbError("vgId:%d failed to restore meta since two file exists, file1 %s and file2 %s", REPO_ID(pRepo), - TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), TFILE_NAME(pf)); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - tfsClosedir(tdir); - regfree(®ex); - return -1; - } else { - uint32_t _version = 0; - if (strcmp(bname, "meta") != 0) { - sscanf(bname, "meta-ver%" PRIu32, &_version); - pfs->cstatus->meta.version = _version; - } +// int code = regexec(®ex, bname, 0, NULL, 0); +// if (code == 0) { +// // Match +// if (pfs->cstatus->pmf != NULL) { +// tsdbError("vgId:%d failed to restore meta since two file exists, file1 %s and file2 %s", REPO_ID(pRepo), +// TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), TFILE_NAME(pf)); +// terrno = TSDB_CODE_TDB_FILE_CORRUPTED; +// tfsClosedir(tdir); +// regfree(®ex); +// return -1; +// } else { +// uint32_t _version = 0; +// if (strcmp(bname, "meta") != 0) { +// sscanf(bname, "meta-ver%" PRIu32, &_version); +// pfs->cstatus->meta.version = _version; +// } - pfs->cstatus->pmf = &(pfs->cstatus->mf); - pfs->cstatus->pmf->f = *pf; - TSDB_FILE_SET_CLOSED(pfs->cstatus->pmf); +// pfs->cstatus->pmf = &(pfs->cstatus->mf); +// pfs->cstatus->pmf->f = *pf; +// TSDB_FILE_SET_CLOSED(pfs->cstatus->pmf); - if (tsdbOpenMFile(pfs->cstatus->pmf, O_RDONLY) < 0) { - tsdbError("vgId:%d failed to restore meta since %s", REPO_ID(pRepo), tstrerror(terrno)); - tfsClosedir(tdir); - regfree(®ex); - return -1; - } +// if (tsdbOpenMFile(pfs->cstatus->pmf, O_RDONLY) < 0) { +// tsdbError("vgId:%d failed to restore meta since %s", REPO_ID(pRepo), tstrerror(terrno)); +// tfsClosedir(tdir); +// regfree(®ex); +// return -1; +// } - if (tsdbLoadMFileHeader(pfs->cstatus->pmf, &(pfs->cstatus->pmf->info)) < 0) { - tsdbError("vgId:%d failed to restore meta since %s", REPO_ID(pRepo), tstrerror(terrno)); - tsdbCloseMFile(pfs->cstatus->pmf); - tfsClosedir(tdir); - regfree(®ex); - return -1; - } +// if (tsdbLoadMFileHeader(pfs->cstatus->pmf, &(pfs->cstatus->pmf->info)) < 0) { +// tsdbError("vgId:%d failed to restore meta since %s", REPO_ID(pRepo), tstrerror(terrno)); +// tsdbCloseMFile(pfs->cstatus->pmf); +// tfsClosedir(tdir); +// regfree(®ex); +// return -1; +// } - if (tsdbForceKeepFile) { - struct stat tfstat; +// if (tsdbForceKeepFile) { +// struct stat tfstat; - // Get real file size - if (fstat(pfs->cstatus->pmf->fd, &tfstat) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - tsdbCloseMFile(pfs->cstatus->pmf); - tfsClosedir(tdir); - regfree(®ex); - return -1; - } +// // Get real file size +// if (fstat(pfs->cstatus->pmf->fd, &tfstat) < 0) { +// terrno = TAOS_SYSTEM_ERROR(errno); +// tsdbCloseMFile(pfs->cstatus->pmf); +// tfsClosedir(tdir); +// regfree(®ex); +// return -1; +// } - if (pfs->cstatus->pmf->info.size != tfstat.st_size) { - int64_t tfsize = pfs->cstatus->pmf->info.size; - pfs->cstatus->pmf->info.size = tfstat.st_size; - tsdbInfo("vgId:%d file %s header size is changed from %" PRId64 " to %" PRId64, REPO_ID(pRepo), - TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), tfsize, pfs->cstatus->pmf->info.size); - } - } +// if (pfs->cstatus->pmf->info.size != tfstat.st_size) { +// int64_t tfsize = pfs->cstatus->pmf->info.size; +// pfs->cstatus->pmf->info.size = tfstat.st_size; +// tsdbInfo("vgId:%d file %s header size is changed from %" PRId64 " to %" PRId64, REPO_ID(pRepo), +// TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), tfsize, pfs->cstatus->pmf->info.size); +// } +// } - tsdbCloseMFile(pfs->cstatus->pmf); - } - } else if (code == REG_NOMATCH) { - // Not match - tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf)); - tfsremove(pf); - continue; - } else { - // Has other error - tsdbError("vgId:%d failed to restore meta file while run regexec since %s", REPO_ID(pRepo), strerror(code)); - terrno = TAOS_SYSTEM_ERROR(code); - tfsClosedir(tdir); - regfree(®ex); - return -1; - } - } +// tsdbCloseMFile(pfs->cstatus->pmf); +// } +// } else if (code == REG_NOMATCH) { +// // Not match +// tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf)); +// tfsremove(pf); +// continue; +// } else { +// // Has other error +// tsdbError("vgId:%d failed to restore meta file while run regexec since %s", REPO_ID(pRepo), strerror(code)); +// terrno = TAOS_SYSTEM_ERROR(code); +// tfsClosedir(tdir); +// regfree(®ex); +// return -1; +// } +// } - if (pfs->cstatus->pmf) { - tsdbInfo("vgId:%d meta file %s is restored", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pfs->cstatus->pmf)); - } else { - tsdbInfo("vgId:%d no meta file is restored", REPO_ID(pRepo)); - } +// if (pfs->cstatus->pmf) { +// tsdbInfo("vgId:%d meta file %s is restored", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pfs->cstatus->pmf)); +// } else { +// tsdbInfo("vgId:%d no meta file is restored", REPO_ID(pRepo)); +// } - tfsClosedir(tdir); - regfree(®ex); - return 0; -} +// tfsClosedir(tdir); +// regfree(®ex); +// return 0; +// } static int tsdbRestoreDFileSet(STsdb *pRepo) { char dataDir[TSDB_FILENAME_LEN]; @@ -1268,11 +1275,11 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) { } static int tsdbRestoreCurrent(STsdb *pRepo) { - // Loop to recover mfile - if (tsdbRestoreMeta(pRepo) < 0) { - tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } + // // Loop to recover mfile + // if (tsdbRestoreMeta(pRepo) < 0) { + // tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno)); + // return -1; + // } // Loop to recover dfile set if (tsdbRestoreDFileSet(pRepo) < 0) { @@ -1280,7 +1287,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) { return -1; } - if (tsdbSaveFSStatus(pRepo->fs->cstatus, REPO_ID(pRepo)) < 0) { + if (tsdbSaveFSStatus(pRepo->fs.cstatus, REPO_ID(pRepo)) < 0) { tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } diff --git a/source/dnode/vnode/tsdb/src/tsdbMemTable.c b/source/dnode/vnode/tsdb/src/tsdbMemTable.c index b075b23275..539d5440d4 100644 --- a/source/dnode/vnode/tsdb/src/tsdbMemTable.c +++ b/source/dnode/vnode/tsdb/src/tsdbMemTable.c @@ -335,7 +335,7 @@ static int tsdbTbDataComp(const void *arg1, const void *arg2) { static char *tsdbTbDataGetUid(const void *arg) { STbData *pTbData = (STbData *)arg; - return &(pTbData->uid); + return (char *)(&(pTbData->uid)); } /* ------------------------ REFACTORING ------------------------ */