From ae426947d3794c82fb613a171d63d70d6f80b6f8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 6 Jan 2022 10:11:28 +0000 Subject: [PATCH] more integration --- include/common/tdataformat.h | 60 ++++++++------- include/os/osDef.h | 6 ++ source/dnode/vnode/tsdb2/inc/tsdbHealth.h | 18 ++--- source/dnode/vnode/tsdb2/inc/tsdbint.h | 20 ++--- source/dnode/vnode/tsdb2/src/tsdbBuffer.c | 39 +++++----- source/dnode/vnode/tsdb2/src/tsdbCommit.c | 77 ++++++++++--------- source/dnode/vnode/tsdb2/src/tsdbCompact.c | 6 +- source/dnode/vnode/tsdb2/src/tsdbFS.c | 10 +-- source/dnode/vnode/tsdb2/src/tsdbHealth.c | 6 +- source/dnode/vnode/tsdb2/src/tsdbMain.c | 16 ++-- source/dnode/vnode/tsdb2/src/tsdbMemTable.c | 82 +-------------------- source/dnode/vnode/tsdb2/src/tsdbReadImpl.c | 2 +- 12 files changed, 137 insertions(+), 205 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index d33f0af13c..cc30cd78f5 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -31,13 +31,12 @@ extern "C" { memcpy(varDataVal(x), (str), __len); \ } while (0); -#define STR_TO_NET_VARSTR(x, str) \ - do { \ - VarDataLenT __len = (VarDataLenT)strlen(str); \ - *(VarDataLenT *)(x) = htons(__len); \ - memcpy(varDataVal(x), (str), __len); \ - } while (0); - +#define STR_TO_NET_VARSTR(x, str) \ + do { \ + VarDataLenT __len = (VarDataLenT)strlen(str); \ + *(VarDataLenT *)(x) = htons(__len); \ + memcpy(varDataVal(x), (str), __len); \ + } while (0); #define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \ do { \ @@ -71,11 +70,12 @@ typedef struct { // ----------------- TSDB SCHEMA DEFINITION typedef struct { - int version; // version - int numOfCols; // Number of columns appended - int tlen; // maximum length of a SDataRow without the header part (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) + (bytes)) - uint16_t flen; // First part length in a SDataRow after the header part - uint16_t vlen; // pure value part length, excluded the overhead (bytes only) + int version; // version + int numOfCols; // Number of columns appended + int tlen; // maximum length of a SDataRow without the header part (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) + + // (bytes)) + uint16_t flen; // First part length in a SDataRow after the header part + uint16_t vlen; // pure value part length, excluded the overhead (bytes only) STColumn columns[]; } STSchema; @@ -202,7 +202,6 @@ void tdFreeDataRow(SDataRow row); void tdInitDataRow(SDataRow row, STSchema *pSchema); SDataRow tdDataRowDup(SDataRow row); - // offset here not include dataRow header length static FORCE_INLINE int tdAppendDataColVal(SDataRow row, const void *value, bool isCopyVarData, int8_t type, int32_t offset) { @@ -228,7 +227,6 @@ static FORCE_INLINE int tdAppendDataColVal(SDataRow row, const void *value, bool return 0; } - // offset here not include dataRow header length static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) { return tdAppendDataColVal(row, value, true, type, offset); @@ -249,27 +247,28 @@ static FORCE_INLINE void *tdGetPtrToCol(SDataRow row, STSchema *pSchema, int idx static FORCE_INLINE void *tdGetColOfRowBySchema(SDataRow row, STSchema *pSchema, int idx) { int16_t offset = TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset; - int8_t type = pSchema->columns[idx].type; + int8_t type = pSchema->columns[idx].type; return tdGetRowDataOfCol(row, type, offset); } static FORCE_INLINE bool tdIsColOfRowNullBySchema(SDataRow row, STSchema *pSchema, int idx) { int16_t offset = TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset; - int8_t type = pSchema->columns[idx].type; + int8_t type = pSchema->columns[idx].type; return isNull(tdGetRowDataOfCol(row, type, offset), type); } static FORCE_INLINE void tdSetColOfRowNullBySchema(SDataRow row, STSchema *pSchema, int idx) { int16_t offset = TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset; - int8_t type = pSchema->columns[idx].type; + int8_t type = pSchema->columns[idx].type; int16_t bytes = pSchema->columns[idx].bytes; setNull(tdGetRowDataOfCol(row, type, offset), type, bytes); } -static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSchema, int dstIdx, SDataRow src, STSchema *pSrcSchema, int srcIdx) { +static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSchema, int dstIdx, SDataRow src, + STSchema *pSrcSchema, int srcIdx) { int8_t type = pDstSchema->columns[dstIdx].type; assert(type == pSrcSchema->columns[srcIdx].type); void *pData = tdGetPtrToCol(dst, pDstSchema, dstIdx); @@ -319,7 +318,6 @@ static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSche } } - // ----------------- Data column structure typedef struct SDataCol { int8_t type; // column type @@ -339,7 +337,7 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } int tdAllocMemForCol(SDataCol *pCol, int maxPoints); void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints); -int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); +int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); void dataColSetOffset(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle); @@ -367,15 +365,15 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) { } typedef struct { - int maxCols; // max number of columns - int maxPoints; // max number of points - int numOfRows; - int numOfCols; // Total number of cols - int sversion; // TODO: set sversion + int maxCols; // max number of columns + int maxPoints; // max number of points + int numOfRows; + int numOfCols; // Total number of cols + int sversion; // TODO: set sversion SDataCol *cols; } SDataCols; -#define keyCol(pCols) (&((pCols)->cols[0])) // Key column +#define keyCol(pCols) (&((pCols)->cols[0])) // Key column #define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] // the idx row of column-wised data #define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx)) static FORCE_INLINE TKEY dataColsTKeyFirst(SDataCols *pCols) { @@ -548,7 +546,7 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder); static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, const void *value) { if (pBuilder->nCols >= pBuilder->tCols) { pBuilder->tCols *= 2; - SColIdx* pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols); + SColIdx *pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols); if (pColIdx == NULL) return -1; pBuilder->pColIdx = pColIdx; } @@ -563,7 +561,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, while (tlen > pBuilder->alloc - pBuilder->size) { pBuilder->alloc *= 2; } - void* buf = realloc(pBuilder->buf, pBuilder->alloc); + void *buf = realloc(pBuilder->buf, pBuilder->alloc); if (buf == NULL) return -1; pBuilder->buf = buf; } @@ -752,10 +750,10 @@ static FORCE_INLINE void tdGetColAppendDeltaLen(const void *value, int8_t colTyp typedef struct { int16_t colId; uint8_t colType; - char* colVal; + char * colVal; } SColInfo; -static FORCE_INLINE void setSColInfo(SColInfo* colInfo, int16_t colId, uint8_t colType, char* colVal) { +static FORCE_INLINE void setSColInfo(SColInfo *colInfo, int16_t colId, uint8_t colType, char *colVal) { colInfo->colId = colId; colInfo->colType = colType; colInfo->colVal = colVal; @@ -815,4 +813,4 @@ static FORCE_INLINE char *payloadNextCol(char *pCol) { return (char *)POINTER_SH } #endif -#endif /*_TD_COMMON_DATA_FORMAT_H_*/ +#endif /*_TD_COMMON_DATA_FORMAT_H_*/ diff --git a/include/os/osDef.h b/include/os/osDef.h index bb5395f548..040c4bc7e7 100644 --- a/include/os/osDef.h +++ b/include/os/osDef.h @@ -73,6 +73,12 @@ extern "C" { #endif +#ifndef WINDOWS + #ifndef O_BINARY + #define O_BINARY 0 + #endif +#endif + #define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b))) #define POINTER_DISTANCE(p1, p2) ((char *)(p1) - (char *)(p2)) diff --git a/source/dnode/vnode/tsdb2/inc/tsdbHealth.h b/source/dnode/vnode/tsdb2/inc/tsdbHealth.h index dfb61b79ac..8ce818f964 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbHealth.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbHealth.h @@ -13,16 +13,16 @@ * along with this program. If not, see . */ -#ifndef _TD_TSDB_HEALTH_H_ -#define _TD_TSDB_HEALTH_H_ +// #ifndef _TD_TSDB_HEALTH_H_ +// #define _TD_TSDB_HEALTH_H_ -#include "os.h" -#include "tsdb.h" +// #include "os.h" +// #include "tsdb.h" -bool tsdbUrgeQueryFree(STsdbRepo* pRepo); -int32_t tsdbInsertNewBlock(STsdbRepo* pRepo); +// bool tsdbUrgeQueryFree(STsdbRepo* pRepo); +// int32_t tsdbInsertNewBlock(STsdbRepo* pRepo); -bool tsdbIdleMemEnough(); -bool tsdbAllowNewBlock(STsdbRepo* pRepo); +// bool tsdbIdleMemEnough(); +// bool tsdbAllowNewBlock(STsdbRepo* pRepo); -#endif /* _TD_TSDB_BUFFER_H_ */ +// #endif /* _TD_TSDB_BUFFER_H_ */ diff --git a/source/dnode/vnode/tsdb2/inc/tsdbint.h b/source/dnode/vnode/tsdb2/inc/tsdbint.h index 52fe6cdcbf..5be05bbd4d 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbint.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbint.h @@ -120,17 +120,17 @@ static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { return pBufBlock; } -static FORCE_INLINE int tsdbGetNextMaxTables(int tid) { - ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES); - int maxTables = TSDB_INIT_NTABLES; - while (true) { - maxTables = MIN(maxTables, TSDB_MAX_TABLES); - if (tid <= maxTables) break; - maxTables *= 2; - } +// static FORCE_INLINE int tsdbGetNextMaxTables(int tid) { +// ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES); +// int maxTables = TSDB_INIT_NTABLES; +// while (true) { +// maxTables = MIN(maxTables, TSDB_MAX_TABLES); +// if (tid <= maxTables) break; +// maxTables *= 2; +// } - return maxTables + 1; -} +// return maxTables + 1; +// } #ifdef __cplusplus } diff --git a/source/dnode/vnode/tsdb2/src/tsdbBuffer.c b/source/dnode/vnode/tsdb2/src/tsdbBuffer.c index 70589031f6..03f3d78745 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbBuffer.c +++ b/source/dnode/vnode/tsdb2/src/tsdbBuffer.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "tsdbint.h" #include "tsdbHealth.h" +#include "tsdbint.h" #define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0) @@ -63,7 +63,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) { STsdbBufPool *pPool = pRepo->pPool; ASSERT(pPool != NULL); - pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB + pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB pPool->tBufBlocks = pCfg->totalBlocks; pPool->nBufBlocks = 0; pPool->nElasticBlocks = 0; @@ -118,15 +118,15 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { STsdbBufPool *pBufPool = pRepo->pPool; while (POOL_IS_EMPTY(pBufPool)) { - if(tsDeadLockKillQuery) { - // supply new Block - if(tsdbInsertNewBlock(pRepo) > 0) { - tsdbWarn("vgId:%d add new elastic block . elasticBlocks=%d cur free Blocks=%d", REPO_ID(pRepo), pBufPool->nElasticBlocks, pBufPool->bufBlockList->numOfEles); + if (tsDeadLockKillQuery) { + // supply new Block + if (tsdbInsertNewBlock(pRepo) > 0) { + tsdbWarn("vgId:%d add new elastic block . elasticBlocks=%d cur free Blocks=%d", REPO_ID(pRepo), + pBufPool->nElasticBlocks, TD_DLIST_NELES(pBufPool->bufBlockList)); break; } else { // no newBlock, kill query free - if(!tsdbUrgeQueryFree(pRepo)) - tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo)); + if (!tsdbUrgeQueryFree(pRepo)) tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo)); } } @@ -135,7 +135,7 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { pRepo->repoLocked = true; } - SListNode * pNode = tdListPopHead(pBufPool->bufBlockList); + SListNode *pNode = tdListPopHead(pBufPool->bufBlockList); ASSERT(pNode != NULL); STsdbBufBlock *pBufBlock = NULL; tdListNodeGetData(pBufPool->bufBlockList, pNode, (void *)(&pBufBlock)); @@ -163,9 +163,9 @@ STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) { return pBufBlock; } - void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } +void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } -int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { +int tsdbExpandPool(STsdbRepo *pRepo, int32_t oldTotalBlocks) { if (oldTotalBlocks == pRepo->config.totalBlocks) { return TSDB_CODE_SUCCESS; } @@ -173,7 +173,7 @@ int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { int err = TSDB_CODE_SUCCESS; if (tsdbLockRepo(pRepo) < 0) return terrno; - STsdbBufPool* pPool = pRepo->pPool; + STsdbBufPool *pPool = pRepo->pPool; if (pRepo->config.totalBlocks > oldTotalBlocks) { for (int i = 0; i < pRepo->config.totalBlocks - oldTotalBlocks; i++) { @@ -187,28 +187,27 @@ int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { goto err; } - pPool->nBufBlocks++; + pPool->nBufBlocks++; } pthread_cond_signal(&pPool->poolNotEmpty); } else { pPool->nRecycleBlocks = oldTotalBlocks - pRepo->config.totalBlocks; - } + } err: tsdbUnlockRepo(pRepo); return err; } -void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic) { +void tsdbRecycleBufferBlock(STsdbBufPool *pPool, SListNode *pNode, bool bELastic) { STsdbBufBlock *pBufBlock = NULL; tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock)); tsdbFreeBufBlock(pBufBlock); free(pNode); - if(bELastic) - { + if (bELastic) { pPool->nElasticBlocks--; - tsdbWarn("pPool=%p elastic block reduce one . nElasticBlocks=%d cur free Blocks=%d", pPool, pPool->nElasticBlocks, pPool->bufBlockList->numOfEles); - } - else + tsdbWarn("pPool=%p elastic block reduce one . nElasticBlocks=%d cur free Blocks=%d", pPool, pPool->nElasticBlocks, + TD_DLIST_NELES(pPool->bufBlockList)); + } else pPool->nBufBlocks--; } \ No newline at end of file diff --git a/source/dnode/vnode/tsdb2/src/tsdbCommit.c b/source/dnode/vnode/tsdb2/src/tsdbCommit.c index db675d0427..8355409beb 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbCommit.c +++ b/source/dnode/vnode/tsdb2/src/tsdbCommit.c @@ -30,7 +30,7 @@ typedef struct { SFSIter fsIter; // tsdb file iterator int niters; // memory iterators SCommitIter *iters; - bool isRFileSet; // read and commit FSET + bool isRFileSet; // read and commit FSET SReadH readh; SDFileSet wSet; bool isDFileSame; @@ -265,12 +265,11 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { return 0; } - // =================== Commit Meta Data -static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile* pMf, bool open) { - STsdbFS * pfs = REPO_FS(pRepo); - SMFile * pOMFile = pfs->cstatus->pmf; - SDiskID did; +static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile *pMf, bool open) { + STsdbFS *pfs = REPO_FS(pRepo); + SMFile * pOMFile = pfs->cstatus->pmf; + SDiskID did; // Create/Open a meta file or open the existing file if (pOMFile == NULL) { @@ -432,7 +431,7 @@ static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM))); - SHashObj* cache = compact ? pfs->metaCacheComp : pfs->metaCache; + SHashObj *cache = compact ? pfs->metaCacheComp : pfs->metaCache; pMFile->info.nRecords++; @@ -480,7 +479,7 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords); float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size); - float compactRatio = (float)(tsTsdbMetaCompactRatio)/100; + float compactRatio = (float)(tsTsdbMetaCompactRatio) / 100; if (delPercent < compactRatio && tombPercent < compactRatio) { return 0; @@ -491,10 +490,11 @@ static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { return -1; } - tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64 ",size:%" PRId64, - tsTsdbMetaCompactRatio, pMFile->info.nDels,pMFile->info.nRecords,pMFile->info.tombSize,pMFile->info.size); + tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64 + ",size:%" PRId64, + tsTsdbMetaCompactRatio, pMFile->info.nDels, pMFile->info.nRecords, pMFile->info.tombSize, pMFile->info.size); - SMFile mf; + SMFile mf; SDiskID did; // first create tmp meta file @@ -510,10 +510,10 @@ static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { tsdbInfo("vgId:%d meta file %s is created to compact meta data", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf)); // second iterator metaCache - int code = -1; - int64_t maxBufSize = 1024; + int code = -1; + int64_t maxBufSize = 1024; SKVRecord *pRecord; - void *pBuf = NULL; + void * pBuf = NULL; pBuf = malloc((size_t)maxBufSize); if (pBuf == NULL) { @@ -536,7 +536,7 @@ static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { } if (pRecord->size > maxBufSize) { maxBufSize = pRecord->size; - void* tmp = realloc(pBuf, (size_t)maxBufSize); + void *tmp = realloc(pBuf, (size_t)maxBufSize); if (tmp == NULL) { goto _err; } @@ -545,7 +545,7 @@ static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size); if (nread < 0) { tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), - tstrerror(terrno)); + tstrerror(terrno)); goto _err; } @@ -572,8 +572,9 @@ _err: if (code == 0) { // rename meta.tmp -> meta - tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf), TSDB_FILE_FULL_NAME(pMFile)); - taosRename(mf.f.aname,pMFile->f.aname); + tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf), + TSDB_FILE_FULL_NAME(pMFile)); + taosRename(mf.f.aname, pMFile->f.aname); tstrncpy(mf.f.aname, pMFile->f.aname, TSDB_FILENAME_LEN); tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN); // update current meta file info @@ -595,8 +596,8 @@ _err: ASSERT(mf.info.nDels == 0); ASSERT(mf.info.tombSize == 0); - tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64, - code,mf.info.nRecords,mf.info.size); + tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64, code, mf.info.nRecords, + mf.info.size); return code; } @@ -1058,11 +1059,11 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) { - STsdbCfg * pCfg = REPO_CFG(pRepo); - SBlockData *pBlockData; + STsdbCfg * pCfg = REPO_CFG(pRepo); + SBlockData * pBlockData; SAggrBlkData *pAggrBlkData = NULL; - int64_t offset = 0, offsetAggr = 0; - int rowsToWrite = pDataCols->numOfRows; + int64_t offset = 0, offsetAggr = 0; + int rowsToWrite = pDataCols->numOfRows; ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock); @@ -1141,8 +1142,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile pBlockCol = pBlockData->cols + tcol; tptr = POINTER_SHIFT(pBlockData, lsize); - if (pCfg->compression == TWO_STAGE_COMP && - tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) { + if (pCfg->compression == TWO_STAGE_COMP && tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) { return -1; } @@ -1188,7 +1188,6 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile uint32_t aggrStatus = nColsNotAllNull > 0 ? 1 : 0; if (aggrStatus > 0) { - taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr); tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM))); @@ -1409,7 +1408,8 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const return 0; } -static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock) { +static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, + bool isLastOneBlock) { STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg * pCfg = REPO_CFG(pRepo); SBlock block; @@ -1445,7 +1445,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols } static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, - TSKEY maxKey, int maxRows, int8_t update) { + TSKEY maxKey, int maxRows, int8_t update) { TSKEY key1 = INT64_MAX; TSKEY key2 = INT64_MAX; STSchema *pSchema = NULL; @@ -1466,9 +1466,9 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt if (key1 < key2) { for (int i = 0; i < pDataCols->numOfCols; i++) { - //TODO: dataColAppendVal may fail + // TODO: dataColAppendVal may fail dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, - pTarget->maxPoints, 0); + pTarget->maxPoints); } pTarget->numOfRows++; @@ -1480,30 +1480,29 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ASSERT(pSchema != NULL); } - tdAppendMemRowToDataCol(row, pSchema, pTarget, true, 0); + tdAppendMemRowToDataCol(row, pSchema, pTarget, true); tSkipListIterNext(pCommitIter->pIter); } else { if (update != TD_ROW_OVERWRITE_UPDATE) { - //copy disk data + // copy disk data for (int i = 0; i < pDataCols->numOfCols; i++) { - //TODO: dataColAppendVal may fail + // TODO: dataColAppendVal may fail dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, - pTarget->maxPoints, 0); + pTarget->maxPoints); } - if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; + if (update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; } if (update != TD_ROW_DISCARD_UPDATE) { - //copy mem data + // copy mem data if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row), (int8_t)memRowType(row)); ASSERT(pSchema != NULL); } - tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE, - update != TD_ROW_PARTIAL_UPDATE ? 0 : -1); + tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); } (*iter)++; tSkipListIterNext(pCommitIter->pIter); diff --git a/source/dnode/vnode/tsdb2/src/tsdbCompact.c b/source/dnode/vnode/tsdb2/src/tsdbCompact.c index f25c0bd981..ee676c7d7b 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbCompact.c +++ b/source/dnode/vnode/tsdb2/src/tsdbCompact.c @@ -223,9 +223,9 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { } static bool tsdbShouldCompact(SCompactH *pComph) { - if (tsdbForceCompactFile) { - return true; - } + // if (tsdbForceCompactFile) { + // return true; + // } STsdbRepo * pRepo = TSDB_COMPACT_REPO(pComph); STsdbCfg * pCfg = REPO_CFG(pRepo); SReadH * pReadh = &(pComph->readh); diff --git a/source/dnode/vnode/tsdb2/src/tsdbFS.c b/source/dnode/vnode/tsdb2/src/tsdbFS.c index 1ef516b305..d376cce1e2 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbFS.c +++ b/source/dnode/vnode/tsdb2/src/tsdbFS.c @@ -471,7 +471,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) { } // fsync, close and rename - if (taosFsync(fd) < 0) { + if (taosFsyncFile(fd) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); close(fd); remove(tfname); @@ -480,7 +480,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) { } (void)close(fd); - (void)taosRename(tfname, cfname); + (void)taosRenameFile(tfname, cfname); taosTZfree(pBuf); return 0; @@ -838,7 +838,7 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) { if (taosHashPut(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { tsdbError("vgId:%d failed to load meta cache from file %s since OOM", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile)); - terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; tsdbCloseMFile(pMFile); return -1; } @@ -1256,7 +1256,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { isOneFSetFinish = true; } else { // return error in case of removing uncomplete DFileSets - terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET; + // terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET; tsdbError("vgId:%d incomplete DFileSet, fid:%d, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles); taosArrayDestroy(&fArray); return -1; @@ -1269,7 +1269,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { isOneFSetFinish = true; } else { // return error in case of removing uncomplete DFileSets - terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET; + // terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET; tsdbError("vgId:%d incomplete DFileSet, fid:%d, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles); taosArrayDestroy(&fArray); return -1; diff --git a/source/dnode/vnode/tsdb2/src/tsdbHealth.c b/source/dnode/vnode/tsdb2/src/tsdbHealth.c index 18d042ff60..1d70d2123d 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbHealth.c +++ b/source/dnode/vnode/tsdb2/src/tsdbHealth.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +#if 0 + #include "tsdbHealth.h" #include "os.h" #include "query.h" @@ -95,4 +97,6 @@ bool tsdbAllowNewBlock(STsdbRepo* pRepo) { bool tsdbNoProblem(STsdbRepo* pRepo) { if (listNEles(pRepo->pPool->bufBlockList) == 0) return false; return true; -} \ No newline at end of file +} + +#endif \ No newline at end of file diff --git a/source/dnode/vnode/tsdb2/src/tsdbMain.c b/source/dnode/vnode/tsdb2/src/tsdbMain.c index 45872b1dce..20f93cb0a4 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbMain.c +++ b/source/dnode/vnode/tsdb2/src/tsdbMain.c @@ -185,13 +185,13 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) { return 0; } -int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB - STsdbCfg *pCfg = &(pRepo->config); - if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { - if (tsdbAsyncCommit(pRepo) < 0) return -1; - } - return 0; -} +// int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB +// STsdbCfg *pCfg = &(pRepo->config); +// if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { +// if (tsdbAsyncCommit(pRepo) < 0) return -1; +// } +// return 0; +// } int tsdbCheckCommit(STsdbRepo *pRepo) { ASSERT(pRepo->mem != NULL); @@ -740,7 +740,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea // file block with sub-blocks has no statistics data if (pBlock->numOfSubBlocks <= 1) { - if (tsdbLoadBlockStatis(pReadh, pBlock) == TSDB_STATIS_OK) { + if (tsdbLoadBlockStatis(pReadh, pBlock) == 0) { tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns, pBlock); loadStatisData = true; } diff --git a/source/dnode/vnode/tsdb2/src/tsdbMemTable.c b/source/dnode/vnode/tsdb2/src/tsdbMemTable.c index 8958df3ced..6923811e25 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbMemTable.c +++ b/source/dnode/vnode/tsdb2/src/tsdbMemTable.c @@ -18,22 +18,11 @@ #include "tsdbint.h" #include "tskiplist.h" #include "tsdbRowMergeBuf.h" +#include "ttime.h" #define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_MAX_INSERT_BATCH 512 -typedef struct { - int32_t totalLen; - int32_t len; - SMemRow row; -} SSubmitBlkIter; - -typedef struct { - int32_t totalLen; - int32_t len; - void * pMsg; -} SSubmitMsgIter; - static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static void tsdbFreeMemTable(SMemTable *pMemTable); static STableData* tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); @@ -41,12 +30,8 @@ static void tsdbFreeTableData(STableData *pTableData); static char * tsdbGetTsTupleKey(const void *data); static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row); -static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); -static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows); -static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); -static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row); @@ -256,7 +241,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { return NULL; } - pNode->next = pNode->prev = NULL; + TD_DLIST_NODE_NEXT(pNode) = TD_DLIST_NODE_PREV(pNode) = NULL; tdListAppendNode(pRepo->mem->extraBuffList, pNode); ptr = (void *)(pNode->data); tsdbTrace("vgId:%d allocate %d bytes from SYSTEM buffer block", REPO_ID(pRepo), bytes); @@ -598,34 +583,12 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema * } } - tdAppendMemRowToDataCol(row, *ppSchema, pCols, true, 0); + tdAppendMemRowToDataCol(row, *ppSchema, pCols, true); } return 0; } -static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { - if (pBlock->dataLen <= 0) return -1; - pIter->totalLen = pBlock->dataLen; - pIter->len = 0; - pIter->row = (SMemRow)(pBlock->data + pBlock->schemaLen); - return 0; -} - -static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { - SMemRow row = pIter->row; // firstly, get current row - if (row == NULL) return NULL; - - pIter->len += memRowTLen(row); - if (pIter->len >= pIter->totalLen) { // reach the end - pIter->row = NULL; - } else { - pIter->row = (char *)row + memRowTLen(row); // secondly, move to next row - } - - return row; -} - static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, TSKEY now) { TSKEY rowKey = memRowKey(row); @@ -841,7 +804,7 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t * SMemRow lastRow = NULL; int64_t osize = SL_SIZE(pTableData->pData); tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, &points, &lastRow); - tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext); + tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext); int64_t dsize = SL_SIZE(pTableData->pData) - osize; (*pAffectedRows) += points; @@ -866,43 +829,6 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t * return 0; } - -static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { - if (pMsg == NULL) { - terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; - return -1; - } - - pIter->totalLen = pMsg->length; - pIter->len = 0; - pIter->pMsg = pMsg; - if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) { - terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; - return -1; - } - - return 0; -} - -static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { - if (pIter->len == 0) { - pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE; - } else { - SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); - pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen); - } - - if (pIter->len > pIter->totalLen) { - terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; - *pPBlock = NULL; - return -1; - } - - *pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); - - return 0; -} - static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) { ASSERT(pTable != NULL); diff --git a/source/dnode/vnode/tsdb2/src/tsdbReadImpl.c b/source/dnode/vnode/tsdb2/src/tsdbReadImpl.c index f2678c627f..0e23752ec4 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbReadImpl.c +++ b/source/dnode/vnode/tsdb2/src/tsdbReadImpl.c @@ -505,7 +505,7 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) { if (pBlock->aggrStat) { return tsdbLoadBlockStatisFromAggr(pReadh, pBlock); } - return TSDB_STATIS_NONE; + return 1; } return tsdbLoadBlockStatisFromDFile(pReadh, pBlock); }