From 4cc10cfaef16363935a287005dd86ce14e2810b3 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 6 Jan 2022 09:09:24 +0000 Subject: [PATCH] more --- include/dnode/vnode/tsdb2/{tsdb2.h => tsdb.h} | 14 +++- include/os/os.h | 1 + source/dnode/vnode/tsdb2/CMakeLists.txt | 2 +- source/dnode/vnode/tsdb2/inc/tsdbFile.h | 24 +++--- source/dnode/vnode/tsdb2/inc/tsdbHealth.h | 5 +- source/dnode/vnode/tsdb2/inc/tsdbMemory.h | 74 +++++++++++++++++++ source/dnode/vnode/tsdb2/inc/tsdbint.h | 15 +--- source/dnode/vnode/tsdb2/src/tsdbHealth.c | 68 ++++++++--------- source/dnode/vnode/tsdb2/src/tsdbRead.c | 1 - 9 files changed, 142 insertions(+), 62 deletions(-) rename include/dnode/vnode/tsdb2/{tsdb2.h => tsdb.h} (98%) create mode 100644 source/dnode/vnode/tsdb2/inc/tsdbMemory.h diff --git a/include/dnode/vnode/tsdb2/tsdb2.h b/include/dnode/vnode/tsdb2/tsdb.h similarity index 98% rename from include/dnode/vnode/tsdb2/tsdb2.h rename to include/dnode/vnode/tsdb2/tsdb.h index d9c93e8cce..c2906ae4ca 100644 --- a/include/dnode/vnode/tsdb2/tsdb2.h +++ b/include/dnode/vnode/tsdb2/tsdb.h @@ -20,11 +20,11 @@ #include #include "taosdef.h" -#include "taosmsg.h" +#include "tmsg.h" #include "tarray.h" #include "tdataformat.h" #include "tname.h" -#include "hash.h" +#include "thash.h" #include "tlockfree.h" #include "tlist.h" @@ -46,6 +46,16 @@ extern "C" { #define TSDB_STATE_BAD_META 0x1 #define TSDB_STATE_BAD_DATA 0x2 +typedef struct SDataStatis { + int16_t colId; + int64_t sum; + int64_t max; + int64_t min; + int16_t maxIndex; + int16_t minIndex; + int16_t numOfNull; +} SDataStatis; + // --------- TSDB APPLICATION HANDLE DEFINITION typedef struct { void *appH; diff --git a/include/os/os.h b/include/os/os.h index 500375adf2..2e6d6afcb0 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -27,6 +27,7 @@ extern "C" { #include #include #include +#include #include #include #include diff --git a/source/dnode/vnode/tsdb2/CMakeLists.txt b/source/dnode/vnode/tsdb2/CMakeLists.txt index 386ba8119c..23942eeac8 100644 --- a/source/dnode/vnode/tsdb2/CMakeLists.txt +++ b/source/dnode/vnode/tsdb2/CMakeLists.txt @@ -3,7 +3,7 @@ add_library(tsdb STATIC ${TSDB_SRC}) target_include_directories( tsdb - PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/tsdb" + PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/tsdb2" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) target_link_libraries(tsdb os util common tfs) \ No newline at end of file diff --git a/source/dnode/vnode/tsdb2/inc/tsdbFile.h b/source/dnode/vnode/tsdb2/inc/tsdbFile.h index 6d1e0cf246..3e1a666f11 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbFile.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbFile.h @@ -16,6 +16,8 @@ #ifndef _TS_TSDB_FILE_H_ #define _TS_TSDB_FILE_H_ +#include "os.h" + #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF @@ -32,7 +34,7 @@ #define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1) #define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf)) #define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf)) -#define TSDB_FILE_FSYNC(tf) taosFsync(TSDB_FILE_FD(tf)) +#define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_FD(tf)) #define TSDB_FILE_STATE(tf) ((tf)->state) #define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s)) #define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) @@ -108,7 +110,7 @@ static FORCE_INLINE void tsdbCloseMFile(SMFile* pMFile) { static FORCE_INLINE int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence) { ASSERT(TSDB_FILE_OPENED(pMFile)); - int64_t loffset = taosLSeek(TSDB_FILE_FD(pMFile), offset, whence); + int64_t loffset = taosLSeekFile(TSDB_FILE_FD(pMFile), offset, whence); if (loffset < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -120,7 +122,7 @@ static FORCE_INLINE int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int wh static FORCE_INLINE int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte) { ASSERT(TSDB_FILE_OPENED(pMFile)); - int64_t nwrite = taosWrite(pMFile->fd, buf, nbyte); + int64_t nwrite = taosWriteFile(pMFile->fd, buf, nbyte); if (nwrite < nbyte) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -162,7 +164,7 @@ static FORCE_INLINE int tsdbRemoveMFile(SMFile* pMFile) { return tfsremove(TSDB_ static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nbyte) { ASSERT(TSDB_FILE_OPENED(pMFile)); - int64_t nread = taosRead(pMFile->fd, buf, nbyte); + int64_t nread = taosReadFile(pMFile->fd, buf, nbyte); if (nread < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -223,7 +225,7 @@ static FORCE_INLINE void tsdbCloseDFile(SDFile* pDFile) { static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence) { ASSERT(TSDB_FILE_OPENED(pDFile)); - int64_t loffset = taosLSeek(TSDB_FILE_FD(pDFile), offset, whence); + int64_t loffset = taosLSeekFile(TSDB_FILE_FD(pDFile), offset, whence); if (loffset < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -235,7 +237,7 @@ static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int wh static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte) { ASSERT(TSDB_FILE_OPENED(pDFile)); - int64_t nwrite = taosWrite(pDFile->fd, buf, nbyte); + int64_t nwrite = taosWriteFile(pDFile->fd, buf, nbyte); if (nwrite < nbyte) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -277,7 +279,7 @@ static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsremove(TSDB_ static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) { ASSERT(TSDB_FILE_OPENED(pDFile)); - int64_t nread = taosRead(pDFile->fd, buf, nbyte); + int64_t nread = taosReadFile(pDFile->fd, buf, nbyte); if (nread < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -298,10 +300,10 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) { // =============== SDFileSet typedef struct { - int fid; - int state; + int fid; + int state; uint16_t ver; // fset version - SDFile files[TSDB_FILE_MAX]; + SDFile files[TSDB_FILE_MAX]; } SDFileSet; typedef enum { @@ -347,7 +349,7 @@ void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet); int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader); int tsdbUpdateDFileSetHeader(SDFileSet* pSet); -int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet* pSet); +int tsdbScanAndTryFixDFileSet(STsdbRepo* pRepo, SDFileSet* pSet); static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { ASSERT_TSDB_FSET_NFILES_VALID(pSet); diff --git a/source/dnode/vnode/tsdb2/inc/tsdbHealth.h b/source/dnode/vnode/tsdb2/inc/tsdbHealth.h index 324f4312e0..dfb61b79ac 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbHealth.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbHealth.h @@ -16,7 +16,10 @@ #ifndef _TD_TSDB_HEALTH_H_ #define _TD_TSDB_HEALTH_H_ -bool tsdbUrgeQueryFree(STsdbRepo* pRepo); +#include "os.h" +#include "tsdb.h" + +bool tsdbUrgeQueryFree(STsdbRepo* pRepo); int32_t tsdbInsertNewBlock(STsdbRepo* pRepo); bool tsdbIdleMemEnough(); diff --git a/source/dnode/vnode/tsdb2/inc/tsdbMemory.h b/source/dnode/vnode/tsdb2/inc/tsdbMemory.h new file mode 100644 index 0000000000..1fc4cd9e52 --- /dev/null +++ b/source/dnode/vnode/tsdb2/inc/tsdbMemory.h @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_MEMORY_H_ +#define _TD_TSDB_MEMORY_H_ + +static void * taosTMalloc(size_t size); +static void * taosTCalloc(size_t nmemb, size_t size); +static void * taosTRealloc(void *ptr, size_t size); +static void * taosTZfree(void *ptr); +static size_t taosTSizeof(void *ptr); +static void taosTMemset(void *ptr, int c); + +static FORCE_INLINE void *taosTMalloc(size_t size) { + if (size <= 0) return NULL; + + void *ret = malloc(size + sizeof(size_t)); + if (ret == NULL) return NULL; + + *(size_t *)ret = size; + + return (void *)((char *)ret + sizeof(size_t)); +} + +static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) { + size_t tsize = nmemb * size; + void * ret = taosTMalloc(tsize); + if (ret == NULL) return NULL; + + taosTMemset(ret, 0); + return ret; +} + +static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; } + +static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); } + +static FORCE_INLINE void * taosTRealloc(void *ptr, size_t size) { + if (ptr == NULL) return taosTMalloc(size); + + if (size <= taosTSizeof(ptr)) return ptr; + + void * tptr = (void *)((char *)ptr - sizeof(size_t)); + size_t tsize = size + sizeof(size_t); + void* tptr1 = realloc(tptr, tsize); + if (tptr1 == NULL) return NULL; + tptr = tptr1; + + *(size_t *)tptr = size; + + return (void *)((char *)tptr + sizeof(size_t)); +} + +static FORCE_INLINE void* taosTZfree(void* ptr) { + if (ptr) { + free((void*)((char*)ptr - sizeof(size_t))); + } + return NULL; +} + + +#endif /* _TD_TSDB_MEMORY_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/tsdb2/inc/tsdbint.h b/source/dnode/vnode/tsdb2/inc/tsdbint.h index a7f08e61ed..52fe6cdcbf 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbint.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbint.h @@ -16,15 +16,6 @@ #ifndef _TD_TSDB_INT_H_ #define _TD_TSDB_INT_H_ -// // TODO: remove the include -// #include -// #include -// #include -// #include -// #include -// #include -// #include -// #include #include "os.h" #include "tlog.h" @@ -34,13 +25,13 @@ #include "tskiplist.h" #include "tdataformat.h" #include "tcoding.h" -#include "tscompression.h" +#include "tcompression.h" #include "tlockfree.h" #include "tlist.h" -#include "hash.h" +#include "thash.h" #include "tarray.h" #include "tfs.h" -#include "tsocket.h" +#include "tsdbMemory.h" #include "tsdb.h" diff --git a/source/dnode/vnode/tsdb2/src/tsdbHealth.c b/source/dnode/vnode/tsdb2/src/tsdbHealth.c index 8198c48033..18d042ff60 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbHealth.c +++ b/source/dnode/vnode/tsdb2/src/tsdbHealth.c @@ -13,50 +13,49 @@ * along with this program. If not, see . */ +#include "tsdbHealth.h" #include "os.h" -#include "taosmsg.h" -#include "tarray.h" #include "query.h" +#include "tarray.h" #include "tglobal.h" #include "tlist.h" -#include "tsdbint.h" +#include "tmsg.h" #include "tsdbBuffer.h" #include "tsdbLog.h" -#include "tsdbHealth.h" -#include "ttimer.h" +#include "tsdbint.h" #include "tthread.h" +#include "ttimer.h" +// return malloc new block count +int32_t tsdbInsertNewBlock(STsdbRepo* pRepo) { + STsdbBufPool* pPool = pRepo->pPool; + int32_t cnt = 0; -// return malloc new block count -int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) { - STsdbBufPool *pPool = pRepo->pPool; - int32_t cnt = 0; - - if(tsdbAllowNewBlock(pRepo)) { - STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); + if (tsdbAllowNewBlock(pRepo)) { + STsdbBufBlock* pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); if (pBufBlock) { - if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) { - // append error - tsdbFreeBufBlock(pBufBlock); - } else { - pPool->nElasticBlocks ++; - cnt ++ ; - } + if (tdListAppend(pPool->bufBlockList, (void*)(&pBufBlock)) < 0) { + // append error + tsdbFreeBufBlock(pBufBlock); + } else { + pPool->nElasticBlocks++; + cnt++; + } } - } - return cnt; + } + return cnt; } // switch anther thread to run void* cbKillQueryFree(void* param) { - STsdbRepo* pRepo = (STsdbRepo*)param; + STsdbRepo* pRepo = (STsdbRepo*)param; // vnode - if(pRepo->appH.notifyStatus) { + if (pRepo->appH.notifyStatus) { pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS); } - // free - if(pRepo->pthread){ + // free + if (pRepo->pthread) { void* p = pRepo->pthread; pRepo->pthread = NULL; free(p); @@ -66,15 +65,16 @@ void* cbKillQueryFree(void* param) { } // return true do free , false do nothing -bool tsdbUrgeQueryFree(STsdbRepo * pRepo) { +bool tsdbUrgeQueryFree(STsdbRepo* pRepo) { // check previous running - if(pRepo->pthread && taosThreadRunning(pRepo->pthread)) { - tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo), pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks); + if (pRepo->pthread && taosThreadRunning(pRepo->pthread)) { + tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo), + pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks); return false; } // create new pRepo->pthread = taosCreateThread(cbKillQueryFree, pRepo); - if(pRepo->pthread == NULL) { + if (pRepo->pthread == NULL) { tsdbError("vgId:%d create urge thread error.", REPO_ID(pRepo)); return false; } @@ -82,17 +82,17 @@ bool tsdbUrgeQueryFree(STsdbRepo * pRepo) { } bool tsdbAllowNewBlock(STsdbRepo* pRepo) { - int32_t nMaxElastic = pRepo->config.totalBlocks/3; + int32_t nMaxElastic = pRepo->config.totalBlocks / 3; STsdbBufPool* pPool = pRepo->pPool; - if(pPool->nElasticBlocks >= nMaxElastic) { - tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo), pPool->nElasticBlocks, nMaxElastic); + if (pPool->nElasticBlocks >= nMaxElastic) { + tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo), + pPool->nElasticBlocks, nMaxElastic); return false; } return true; } bool tsdbNoProblem(STsdbRepo* pRepo) { - if(listNEles(pRepo->pPool->bufBlockList) == 0) - return false; + if (listNEles(pRepo->pPool->bufBlockList) == 0) return false; return true; } \ No newline at end of file diff --git a/source/dnode/vnode/tsdb2/src/tsdbRead.c b/source/dnode/vnode/tsdb2/src/tsdbRead.c index 6d9030bf7f..eb166c9e0e 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbRead.c +++ b/source/dnode/vnode/tsdb2/src/tsdbRead.c @@ -16,7 +16,6 @@ #include "os.h" #include "tdataformat.h" #include "tskiplist.h" -#include "tulog.h" #include "talgo.h" #include "tcompare.h" #include "exception.h"