diff --git a/include/dnode/vnode/tsdb/tsdb.h b/include/dnode/vnode/tsdb/tsdb.h index 67e33510cd..c19152de44 100644 --- a/include/dnode/vnode/tsdb/tsdb.h +++ b/include/dnode/vnode/tsdb/tsdb.h @@ -22,16 +22,38 @@ extern "C" { #endif +typedef struct SDataStatis { + int16_t colId; + int64_t sum; + int64_t max; + int64_t min; + int16_t maxIndex; + int16_t minIndex; + int16_t numOfNull; +} SDataStatis; + +typedef struct STable { + int32_t tid; + uint64_t uid; + STSchema *pSchema; +} STable; + +#define TABLE_TID(t) (t)->tid +#define TABLE_UID(t) (t)->uid + // TYPES EXPOSED typedef struct STsdb STsdb; typedef struct STsdbCfg { int8_t precision; uint64_t lruCacheSize; - uint32_t keep; - uint32_t keep1; - uint32_t keep2; int32_t daysPerFile; + int32_t minRowsPerFileBlock; + int32_t maxRowsPerFileBlock; + int32_t keep; + int32_t keep1; + int32_t keep2; + int8_t update; } STsdbCfg; // STsdb diff --git a/source/dnode/vnode/tsdb/CMakeLists.txt b/source/dnode/vnode/tsdb/CMakeLists.txt index 30e9d70f12..b11a846500 100644 --- a/source/dnode/vnode/tsdb/CMakeLists.txt +++ b/source/dnode/vnode/tsdb/CMakeLists.txt @@ -10,6 +10,9 @@ else(0) "src/tsdbMemTable.c" "src/tsdbOptions.c" "src/tsdbWrite.c" + "src/tsdbReadImpl.c" + "src/tsdbFile.c" + # "src/tsdbFS.c" ) endif(0) @@ -25,4 +28,5 @@ target_link_libraries( PUBLIC util PUBLIC common PUBLIC tkv + PUBLIC tfs ) \ No newline at end of file diff --git a/source/dnode/vnode/tsdb/inc/tsdbCommit.h b/source/dnode/vnode/tsdb/inc/tsdbCommit.h index 82ba1c9dff..a29296a686 100644 --- a/source/dnode/vnode/tsdb/inc/tsdbCommit.h +++ b/source/dnode/vnode/tsdb/inc/tsdbCommit.h @@ -16,7 +16,6 @@ #ifndef _TD_TSDB_COMMIT_H_ #define _TD_TSDB_COMMIT_H_ -#if 0 typedef struct { int minFid; int midFid; @@ -30,6 +29,7 @@ typedef struct { int64_t size; } SKVRecord; +#if 0 #define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); diff --git a/source/dnode/vnode/tsdb/inc/tsdbDef.h b/source/dnode/vnode/tsdb/inc/tsdbDef.h index cd3386c503..236e2b2d35 100644 --- a/source/dnode/vnode/tsdb/inc/tsdbDef.h +++ b/source/dnode/vnode/tsdb/inc/tsdbDef.h @@ -17,6 +17,7 @@ #define _TD_TSDB_DEF_H_ #include "mallocator.h" +#include "tcompression.h" #include "tglobal.h" #include "thash.h" #include "tlist.h" @@ -25,9 +26,14 @@ #include "ttime.h" #include "tsdb.h" +#include "tsdbCommit.h" +// #include "tsdbFS.h" +#include "tsdbFile.h" #include "tsdbLog.h" #include "tsdbMemTable.h" +#include "tsdbMemory.h" #include "tsdbOptions.h" +#include "tsdbReadImpl.h" #ifdef __cplusplus extern "C" { @@ -39,10 +45,14 @@ struct STsdb { STsdbCfg config; STsdbMemTable * mem; STsdbMemTable * imem; + SRtn rtn; SMemAllocatorFactory *pmaf; + // STsdbFS fs; }; -#define REPO_ID(r) (r)->vgId +#define REPO_ID(r) 0 +#define REPO_CFG(r) (&(r)->config) +// #define REPO_FS(r) (&(r)->fs) #ifdef __cplusplus } diff --git a/source/dnode/vnode/tsdb/inc/tsdbFS.h b/source/dnode/vnode/tsdb/inc/tsdbFS.h index 0320756783..dfd34deb84 100644 --- a/source/dnode/vnode/tsdb/inc/tsdbFS.h +++ b/source/dnode/vnode/tsdb/inc/tsdbFS.h @@ -16,7 +16,7 @@ #ifndef _TD_TSDB_FS_H_ #define _TD_TSDB_FS_H_ -#if 0 +#include "tsdbFile.h" #define TSDB_FS_VERSION 0 @@ -39,19 +39,17 @@ typedef struct { // ================== typedef struct { STsdbFSMeta meta; // FS meta - SMFile* pmf; // meta file pointer - SMFile mf; // meta file - SArray* df; // data file array + SArray * df; // data file array } SFSStatus; typedef struct { pthread_rwlock_t lock; - SFSStatus* cstatus; // current status - SHashObj* metaCache; // meta cache - SHashObj* metaCacheComp; // meta cache for compact + SFSStatus *cstatus; // current status + SHashObj * metaCache; // meta cache + SHashObj * metaCacheComp; // meta cache for compact bool intxn; - SFSStatus* nstatus; // new status + SFSStatus *nstatus; // new status } STsdbFS; #define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus) @@ -63,10 +61,10 @@ typedef struct { typedef struct { int direction; uint64_t version; // current FS version - STsdbFS* pfs; + STsdbFS * pfs; int index; // used to position next fset when version the same int fid; // used to seek when version is changed - SDFileSet* pSet; + SDFileSet *pSet; } SFSIter; #define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC @@ -74,21 +72,21 @@ typedef struct { STsdbFS *tsdbNewFS(STsdbCfg *pCfg); void * tsdbFreeFS(STsdbFS *pfs); -int tsdbOpenFS(STsdbRepo *pRepo); -void tsdbCloseFS(STsdbRepo *pRepo); -void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd); -int tsdbEndFSTxn(STsdbRepo *pRepo); +int tsdbOpenFS(STsdb *pRepo); +void tsdbCloseFS(STsdb *pRepo); +void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd); +int tsdbEndFSTxn(STsdb *pRepo); int tsdbEndFSTxnWithError(STsdbFS *pfs); void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta); -void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile); +// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile); int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet); void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction); void tsdbFSIterSeek(SFSIter *pIter, int fid); SDFileSet *tsdbFSIterNext(SFSIter *pIter); -int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta); +int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta); -static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) { +static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) { int code = pthread_rwlock_rdlock(&(pFs->lock)); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); @@ -97,7 +95,7 @@ static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) { return 0; } -static FORCE_INLINE int tsdbWLockFS(STsdbFS* pFs) { +static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) { int code = pthread_rwlock_wrlock(&(pFs->lock)); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); @@ -106,7 +104,7 @@ static FORCE_INLINE int tsdbWLockFS(STsdbFS* pFs) { return 0; } -static FORCE_INLINE int tsdbUnLockFS(STsdbFS* pFs) { +static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { int code = pthread_rwlock_unlock(&(pFs->lock)); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); @@ -115,6 +113,4 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS* pFs) { return 0; } -#endif - #endif /* _TD_TSDB_FS_H_ */ diff --git a/source/dnode/vnode/tsdb/inc/tsdbFile.h b/source/dnode/vnode/tsdb/inc/tsdbFile.h index 73a7de0249..fb21ef56a7 100644 --- a/source/dnode/vnode/tsdb/inc/tsdbFile.h +++ b/source/dnode/vnode/tsdb/inc/tsdbFile.h @@ -16,7 +16,8 @@ #ifndef _TS_TSDB_FILE_H_ #define _TS_TSDB_FILE_H_ -#if 0 +#include "tchecksum.h" +#include "tfs.h" #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F @@ -34,7 +35,7 @@ #define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1) #define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf)) #define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf)) -#define TSDB_FILE_FSYNC(tf) taosFsync(TSDB_FILE_FD(tf)) +#define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_FD(tf)) #define TSDB_FILE_STATE(tf) ((tf)->state) #define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s)) #define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) @@ -42,6 +43,7 @@ typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T; +#if 0 // =============== SMFile typedef struct { int64_t size; @@ -68,7 +70,7 @@ int tsdbApplyMFileChange(SMFile* from, SMFile* to); int tsdbCreateMFile(SMFile* pMFile, bool updateHeader); int tsdbUpdateMFileHeader(SMFile* pMFile); int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo); -int tsdbScanAndTryFixMFile(STsdbRepo* pRepo); +int tsdbScanAndTryFixMFile(STsdb* pRepo); int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo); void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo); @@ -96,7 +98,7 @@ static FORCE_INLINE void tsdbCloseMFile(SMFile* pMFile) { static FORCE_INLINE int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence) { ASSERT(TSDB_FILE_OPENED(pMFile)); - int64_t loffset = taosLSeek(TSDB_FILE_FD(pMFile), offset, whence); + int64_t loffset = taosLSeekFile(TSDB_FILE_FD(pMFile), offset, whence); if (loffset < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -108,7 +110,7 @@ static FORCE_INLINE int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int wh static FORCE_INLINE int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte) { ASSERT(TSDB_FILE_OPENED(pMFile)); - int64_t nwrite = taosWrite(pMFile->fd, buf, nbyte); + int64_t nwrite = taosWriteFile(pMFile->fd, buf, nbyte); if (nwrite < nbyte) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -150,7 +152,7 @@ static FORCE_INLINE int tsdbRemoveMFile(SMFile* pMFile) { return tfsremove(TSDB_ static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nbyte) { ASSERT(TSDB_FILE_OPENED(pMFile)); - int64_t nread = taosRead(pMFile->fd, buf, nbyte); + int64_t nread = taosReadFile(pMFile->fd, buf, nbyte); if (nread < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -159,6 +161,8 @@ static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nby return nread; } +#endif + // =============== SDFile typedef struct { uint32_t magic; @@ -210,7 +214,7 @@ static FORCE_INLINE void tsdbCloseDFile(SDFile* pDFile) { static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence) { ASSERT(TSDB_FILE_OPENED(pDFile)); - int64_t loffset = taosLSeek(TSDB_FILE_FD(pDFile), offset, whence); + int64_t loffset = taosLSeekFile(TSDB_FILE_FD(pDFile), offset, whence); if (loffset < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -222,7 +226,7 @@ static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int wh static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte) { ASSERT(TSDB_FILE_OPENED(pDFile)); - int64_t nwrite = taosWrite(pDFile->fd, buf, nbyte); + int64_t nwrite = taosWriteFile(pDFile->fd, buf, nbyte); if (nwrite < nbyte) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -264,7 +268,7 @@ static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsremove(TSDB_ static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) { ASSERT(TSDB_FILE_OPENED(pDFile)); - int64_t nread = taosRead(pDFile->fd, buf, nbyte); + int64_t nread = taosReadFile(pDFile->fd, buf, nbyte); if (nread < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -316,7 +320,7 @@ void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet); int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader); int tsdbUpdateDFileSetHeader(SDFileSet* pSet); -int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet* pSet); +int tsdbScanAndTryFixDFileSet(STsdb* pRepo, SDFileSet* pSet); static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { @@ -366,5 +370,4 @@ static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) { return true; } -#endif #endif /* _TS_TSDB_FILE_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/tsdb/inc/tsdbMemory.h b/source/dnode/vnode/tsdb/inc/tsdbMemory.h new file mode 100644 index 0000000000..1fc4cd9e52 --- /dev/null +++ b/source/dnode/vnode/tsdb/inc/tsdbMemory.h @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_MEMORY_H_ +#define _TD_TSDB_MEMORY_H_ + +static void * taosTMalloc(size_t size); +static void * taosTCalloc(size_t nmemb, size_t size); +static void * taosTRealloc(void *ptr, size_t size); +static void * taosTZfree(void *ptr); +static size_t taosTSizeof(void *ptr); +static void taosTMemset(void *ptr, int c); + +static FORCE_INLINE void *taosTMalloc(size_t size) { + if (size <= 0) return NULL; + + void *ret = malloc(size + sizeof(size_t)); + if (ret == NULL) return NULL; + + *(size_t *)ret = size; + + return (void *)((char *)ret + sizeof(size_t)); +} + +static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) { + size_t tsize = nmemb * size; + void * ret = taosTMalloc(tsize); + if (ret == NULL) return NULL; + + taosTMemset(ret, 0); + return ret; +} + +static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; } + +static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); } + +static FORCE_INLINE void * taosTRealloc(void *ptr, size_t size) { + if (ptr == NULL) return taosTMalloc(size); + + if (size <= taosTSizeof(ptr)) return ptr; + + void * tptr = (void *)((char *)ptr - sizeof(size_t)); + size_t tsize = size + sizeof(size_t); + void* tptr1 = realloc(tptr, tsize); + if (tptr1 == NULL) return NULL; + tptr = tptr1; + + *(size_t *)tptr = size; + + return (void *)((char *)tptr + sizeof(size_t)); +} + +static FORCE_INLINE void* taosTZfree(void* ptr) { + if (ptr) { + free((void*)((char*)ptr - sizeof(size_t))); + } + return NULL; +} + + +#endif /* _TD_TSDB_MEMORY_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/tsdb/inc/tsdbReadImpl.h b/source/dnode/vnode/tsdb/inc/tsdbReadImpl.h index a9bd76c2b1..3fb235e7dd 100644 --- a/source/dnode/vnode/tsdb/inc/tsdbReadImpl.h +++ b/source/dnode/vnode/tsdb/inc/tsdbReadImpl.h @@ -15,14 +15,14 @@ #ifndef _TD_TSDB_READ_IMPL_H_ #define _TD_TSDB_READ_IMPL_H_ -#if 0 +#include "os.h" #include "tfs.h" #include "tsdb.h" -#include "os.h" #include "tsdbFile.h" #include "tskiplist.h" -#include "tsdbMeta.h" +#include "tsdbMemory.h" +#include "common.h" typedef struct SReadH SReadH; @@ -91,7 +91,7 @@ typedef struct { } SBlockData; struct SReadH { - STsdbRepo * pRepo; + STsdb * pRepo; SDFileSet rSet; // FSET to read SArray * aBlkIdx; // SBlockIdx array STable * pTable; // table to read @@ -116,7 +116,7 @@ struct SReadH { #define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM)) -int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo); +int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo); void tsdbDestroyReadH(SReadH *pReadh); int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); void tsdbCloseAndUnsetFSet(SReadH *pReadh); @@ -151,6 +151,4 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { return 0; } -#endif - #endif /*_TD_TSDB_READ_IMPL_H_*/ diff --git a/source/dnode/vnode/tsdb/src/tsdbFS.c b/source/dnode/vnode/tsdb/src/tsdbFS.c index a40e67ca59..210a272b13 100644 --- a/source/dnode/vnode/tsdb/src/tsdbFS.c +++ b/source/dnode/vnode/tsdb/src/tsdbFS.c @@ -13,9 +13,9 @@ * along with this program. If not, see . */ -#include "os.h" -#include "tsdbint.h" #include +#include "os.h" +#include "tsdbDef.h" typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T; static const char *tsdbTxnFname[] = {"current.t", "current"}; @@ -26,16 +26,16 @@ static void tsdbResetFSStatus(SFSStatus *pStatus); static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid); static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo); static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]); -static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo); -static int tsdbScanAndTryFixFS(STsdbRepo *pRepo); -static int tsdbScanRootDir(STsdbRepo *pRepo); -static int tsdbScanDataDir(STsdbRepo *pRepo); +static int tsdbOpenFSFromCurrent(STsdb *pRepo); +static int tsdbScanAndTryFixFS(STsdb *pRepo); +static int tsdbScanRootDir(STsdb *pRepo); +static int tsdbScanDataDir(STsdb *pRepo); static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf); -static int tsdbRestoreCurrent(STsdbRepo *pRepo); +static int tsdbRestoreCurrent(STsdb *pRepo); static int tsdbComparTFILE(const void *arg1, const void *arg2); -static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired); -static int tsdbProcessExpiredFS(STsdbRepo *pRepo); -static int tsdbCreateMeta(STsdbRepo *pRepo); +static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired); +static int tsdbProcessExpiredFS(STsdb *pRepo); +static int tsdbCreateMeta(STsdb *pRepo); // For backward compatibility // ================== CURRENT file header info @@ -240,7 +240,7 @@ void *tsdbFreeFS(STsdbFS *pfs) { return NULL; } -static int tsdbProcessExpiredFS(STsdbRepo *pRepo) { +static int tsdbProcessExpiredFS(STsdb *pRepo) { tsdbStartFSTxn(pRepo, 0, 0); if (tsdbCreateMeta(pRepo) < 0) { tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -259,7 +259,7 @@ static int tsdbProcessExpiredFS(STsdbRepo *pRepo) { return 0; } -static int tsdbCreateMeta(STsdbRepo *pRepo) { +static int tsdbCreateMeta(STsdb *pRepo) { STsdbFS *pfs = REPO_FS(pRepo); SMFile * pOMFile = pfs->cstatus->pmf; SMFile mf; @@ -296,7 +296,7 @@ static int tsdbCreateMeta(STsdbRepo *pRepo) { return 0; } -int tsdbOpenFS(STsdbRepo *pRepo) { +int tsdbOpenFS(STsdb *pRepo) { STsdbFS *pfs = REPO_FS(pRepo); char current[TSDB_FILENAME_LEN] = "\0"; int nExpired = 0; @@ -338,12 +338,12 @@ int tsdbOpenFS(STsdbRepo *pRepo) { return 0; } -void tsdbCloseFS(STsdbRepo *pRepo) { +void tsdbCloseFS(STsdb *pRepo) { // Do nothing } // Start a new transaction to modify the file system -void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd) { +void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd) { STsdbFS *pfs = REPO_FS(pRepo); ASSERT(pfs->intxn == false); @@ -361,7 +361,7 @@ void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd) { void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; } -int tsdbEndFSTxn(STsdbRepo *pRepo) { +int tsdbEndFSTxn(STsdb *pRepo) { STsdbFS *pfs = REPO_FS(pRepo); ASSERT(FS_IN_TXN(pfs)); SFSStatus *pStatus; @@ -372,7 +372,7 @@ int tsdbEndFSTxn(STsdbRepo *pRepo) { return -1; } - // Make new + // Make new tsdbWLockFS(pfs); pStatus = pfs->cstatus; pfs->cstatus = pfs->nstatus; @@ -642,7 +642,7 @@ static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]) { snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), repoid, tsdbTxnFname[ftype]); } -static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) { +static int tsdbOpenFSFromCurrent(STsdb *pRepo) { STsdbFS * pfs = REPO_FS(pRepo); int fd = -1; void * buffer = NULL; @@ -737,7 +737,7 @@ _err: } // Scan and try to fix incorrect files -static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) { +static int tsdbScanAndTryFixFS(STsdb *pRepo) { STsdbFS * pfs = REPO_FS(pRepo); SFSStatus *pStatus = pfs->cstatus; @@ -763,7 +763,7 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) { return 0; } -int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) { +int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta) { char tbuf[128]; STsdbFS * pfs = REPO_FS(pRepo); SMFile mf; @@ -899,7 +899,7 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) { return 0; } -static int tsdbScanRootDir(STsdbRepo *pRepo) { +static int tsdbScanRootDir(STsdb *pRepo) { char rootDir[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN]; STsdbFS * pfs = REPO_FS(pRepo); @@ -933,7 +933,7 @@ static int tsdbScanRootDir(STsdbRepo *pRepo) { return 0; } -static int tsdbScanDataDir(STsdbRepo *pRepo) { +static int tsdbScanDataDir(STsdb *pRepo) { char dataDir[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN]; STsdbFS * pfs = REPO_FS(pRepo); @@ -977,7 +977,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) { return false; } -static int tsdbRestoreMeta(STsdbRepo *pRepo) { +static int tsdbRestoreMeta(STsdb *pRepo) { char rootDir[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN]; TDIR * tdir = NULL; @@ -1098,7 +1098,7 @@ static int tsdbRestoreMeta(STsdbRepo *pRepo) { return 0; } -static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { +static int tsdbRestoreDFileSet(STsdb *pRepo) { char dataDir[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN]; TDIR * tdir = NULL; @@ -1220,9 +1220,10 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { } pDFile->f = *pf; - + if (tsdbOpenDFile(pDFile, O_RDONLY) < 0) { - tsdbError("vgId:%d failed to open DFile %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno)); + tsdbError("vgId:%d failed to open DFile %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), + tstrerror(terrno)); taosArrayDestroy(fArray); return -1; } @@ -1266,7 +1267,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { return 0; } -static int tsdbRestoreCurrent(STsdbRepo *pRepo) { +static int tsdbRestoreCurrent(STsdb *pRepo) { // Loop to recover mfile if (tsdbRestoreMeta(pRepo) < 0) { tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -1317,7 +1318,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) { } } -static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired) { +static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired) { STsdbFS * pfs = REPO_FS(pRepo); SFSStatus *pStatus = pfs->cstatus; SDFInfo info; diff --git a/source/dnode/vnode/tsdb/src/tsdbFile.c b/source/dnode/vnode/tsdb/src/tsdbFile.c index 0f13b6108f..a1c1b57b44 100644 --- a/source/dnode/vnode/tsdb/src/tsdbFile.c +++ b/source/dnode/vnode/tsdb/src/tsdbFile.c @@ -13,22 +13,23 @@ * along with this program. If not, see . */ -#include "tsdbint.h" +#include "tsdbDef.h" static const char *TSDB_FNAME_SUFFIX[] = { - "head", // TSDB_FILE_HEAD - "data", // TSDB_FILE_DATA - "last", // TSDB_FILE_LAST - "", // TSDB_FILE_MAX - "meta", // TSDB_FILE_META + "head", // TSDB_FILE_HEAD + "data", // TSDB_FILE_DATA + "last", // TSDB_FILE_LAST + "", // TSDB_FILE_MAX + "meta", // TSDB_FILE_META }; -static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname); -static int tsdbRollBackMFile(SMFile *pMFile); +static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname); +// static int tsdbRollBackMFile(SMFile *pMFile); static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo); static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo); static int tsdbRollBackDFile(SDFile *pDFile); +#if 0 // ============== SMFile void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) { char fname[TSDB_FILENAME_LEN]; @@ -185,7 +186,7 @@ int tsdbLoadMFileHeader(SMFile *pMFile, SMFInfo *pInfo) { return 0; } -int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) { +int tsdbScanAndTryFixMFile(STsdb *pRepo) { SMFile * pMFile = pRepo->fs->cstatus->pmf; struct stat mfstat; SMFile mf; @@ -291,6 +292,8 @@ static int tsdbRollBackMFile(SMFile *pMFile) { return 0; } +#endif + // ============== Operations on SDFile void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype) { char fname[TSDB_FILENAME_LEN]; @@ -397,7 +400,7 @@ int tsdbUpdateDFileHeader(SDFile *pDFile) { } void *ptr = buf; - taosEncodeFixedU32(&ptr, TSDB_FS_VERSION); + taosEncodeFixedU32(&ptr, 0); tsdbEncodeDFInfo(&ptr, &(pDFile->info)); taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); @@ -433,7 +436,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) { return 0; } -static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { +static int tsdbScanAndTryFixDFile(STsdb *pRepo, SDFile *pDFile) { struct stat dfstat; SDFile df; @@ -442,7 +445,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { if (access(TSDB_FILE_FULL_NAME(pDFile), F_OK) != 0) { tsdbError("vgId:%d data file %s not exit, report to upper layer to fix it", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile)); - pRepo->state |= TSDB_STATE_BAD_DATA; + // pRepo->state |= TSDB_STATE_BAD_DATA; TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD); return 0; } @@ -457,7 +460,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { return -1; } - if (taosFtruncate(df.fd, df.info.size) < 0) { + if (taosFtruncateFile(df.fd, df.info.size) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); tsdbCloseDFile(&df); return -1; @@ -474,7 +477,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { } else if (pDFile->info.size > dfstat.st_size) { tsdbError("vgId:%d data file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), dfstat.st_size, pDFile->info.size); - pRepo->state |= TSDB_STATE_BAD_DATA; + // pRepo->state |= TSDB_STATE_BAD_DATA; TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return 0; @@ -538,7 +541,7 @@ static int tsdbRollBackDFile(SDFile *pDFile) { return -1; } - if (taosFtruncate(TSDB_FILE_FD(&df), pDFile->info.size) < 0) { + if (taosFtruncateFile(TSDB_FILE_FD(&df), pDFile->info.size) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); tsdbCloseDFile(&df); return -1; @@ -651,7 +654,7 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { return 0; } -int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) { +int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { if (tsdbScanAndTryFixDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype)) < 0) { return -1; diff --git a/source/dnode/vnode/tsdb/src/tsdbReadImpl.c b/source/dnode/vnode/tsdb/src/tsdbReadImpl.c index 74d41cce19..1a2b213031 100644 --- a/source/dnode/vnode/tsdb/src/tsdbReadImpl.c +++ b/source/dnode/vnode/tsdb/src/tsdbReadImpl.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "tsdbint.h" +#include "tsdbDef.h" #define TSDB_KEY_COL_OFFSET 0 @@ -25,8 +25,9 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int3 static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds); static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); +static STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) { return NULL; } -int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) { +int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) { ASSERT(pReadh != NULL && pRepo != NULL); STsdbCfg *pCfg = REPO_CFG(pRepo); @@ -259,7 +260,9 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) { iBlock++; if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1; - if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, update != TD_ROW_PARTIAL_UPDATE) < 0) return -1; + if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, + update != TD_ROW_PARTIAL_UPDATE) < 0) + return -1; } ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); @@ -286,7 +289,9 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, for (int i = 1; i < pBlock->numOfSubBlocks; i++) { iBlock++; if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1; - if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, update != TD_ROW_PARTIAL_UPDATE) < 0) return -1; + if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, + update != TD_ROW_PARTIAL_UPDATE) < 0) + return -1; } ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); @@ -524,7 +529,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 if (comp) { // Need to decompress int tlen = (*(tDataTypes[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData, - pDataCol->spaceSize, comp, buffer, bufferSize); + pDataCol->spaceSize, comp, buffer, bufferSize); if (tlen <= 0) { tsdbError("Failed to decompress column, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bufferSize:%d", len, comp, numOfRows, maxPoints, bufferSize); @@ -624,9 +629,9 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol) { ASSERT(pDataCol->colId == pBlockCol->colId); - STsdbRepo *pRepo = TSDB_READ_REPO(pReadh); - STsdbCfg * pCfg = REPO_CFG(pRepo); - int tsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES; + STsdb * pRepo = TSDB_READ_REPO(pReadh); + STsdbCfg *pCfg = REPO_CFG(pRepo); + int tsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES; if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlockCol->len) < 0) return -1; if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), tsize) < 0) return -1; @@ -662,3 +667,4 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc return 0; } +