diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h
index d33f0af13c..cc30cd78f5 100644
--- a/include/common/tdataformat.h
+++ b/include/common/tdataformat.h
@@ -31,13 +31,12 @@ extern "C" {
memcpy(varDataVal(x), (str), __len); \
} while (0);
-#define STR_TO_NET_VARSTR(x, str) \
- do { \
- VarDataLenT __len = (VarDataLenT)strlen(str); \
- *(VarDataLenT *)(x) = htons(__len); \
- memcpy(varDataVal(x), (str), __len); \
- } while (0);
-
+#define STR_TO_NET_VARSTR(x, str) \
+ do { \
+ VarDataLenT __len = (VarDataLenT)strlen(str); \
+ *(VarDataLenT *)(x) = htons(__len); \
+ memcpy(varDataVal(x), (str), __len); \
+ } while (0);
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \
do { \
@@ -71,11 +70,12 @@ typedef struct {
// ----------------- TSDB SCHEMA DEFINITION
typedef struct {
- int version; // version
- int numOfCols; // Number of columns appended
- int tlen; // maximum length of a SDataRow without the header part (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) + (bytes))
- uint16_t flen; // First part length in a SDataRow after the header part
- uint16_t vlen; // pure value part length, excluded the overhead (bytes only)
+ int version; // version
+ int numOfCols; // Number of columns appended
+ int tlen; // maximum length of a SDataRow without the header part (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) +
+ // (bytes))
+ uint16_t flen; // First part length in a SDataRow after the header part
+ uint16_t vlen; // pure value part length, excluded the overhead (bytes only)
STColumn columns[];
} STSchema;
@@ -202,7 +202,6 @@ void tdFreeDataRow(SDataRow row);
void tdInitDataRow(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row);
-
// offset here not include dataRow header length
static FORCE_INLINE int tdAppendDataColVal(SDataRow row, const void *value, bool isCopyVarData, int8_t type,
int32_t offset) {
@@ -228,7 +227,6 @@ static FORCE_INLINE int tdAppendDataColVal(SDataRow row, const void *value, bool
return 0;
}
-
// offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) {
return tdAppendDataColVal(row, value, true, type, offset);
@@ -249,27 +247,28 @@ static FORCE_INLINE void *tdGetPtrToCol(SDataRow row, STSchema *pSchema, int idx
static FORCE_INLINE void *tdGetColOfRowBySchema(SDataRow row, STSchema *pSchema, int idx) {
int16_t offset = TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset;
- int8_t type = pSchema->columns[idx].type;
+ int8_t type = pSchema->columns[idx].type;
return tdGetRowDataOfCol(row, type, offset);
}
static FORCE_INLINE bool tdIsColOfRowNullBySchema(SDataRow row, STSchema *pSchema, int idx) {
int16_t offset = TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset;
- int8_t type = pSchema->columns[idx].type;
+ int8_t type = pSchema->columns[idx].type;
return isNull(tdGetRowDataOfCol(row, type, offset), type);
}
static FORCE_INLINE void tdSetColOfRowNullBySchema(SDataRow row, STSchema *pSchema, int idx) {
int16_t offset = TD_DATA_ROW_HEAD_SIZE + pSchema->columns[idx].offset;
- int8_t type = pSchema->columns[idx].type;
+ int8_t type = pSchema->columns[idx].type;
int16_t bytes = pSchema->columns[idx].bytes;
setNull(tdGetRowDataOfCol(row, type, offset), type, bytes);
}
-static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSchema, int dstIdx, SDataRow src, STSchema *pSrcSchema, int srcIdx) {
+static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSchema, int dstIdx, SDataRow src,
+ STSchema *pSrcSchema, int srcIdx) {
int8_t type = pDstSchema->columns[dstIdx].type;
assert(type == pSrcSchema->columns[srcIdx].type);
void *pData = tdGetPtrToCol(dst, pDstSchema, dstIdx);
@@ -319,7 +318,6 @@ static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSche
}
}
-
// ----------------- Data column structure
typedef struct SDataCol {
int8_t type; // column type
@@ -339,7 +337,7 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
int tdAllocMemForCol(SDataCol *pCol, int maxPoints);
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints);
-int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
+int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle);
@@ -367,15 +365,15 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
}
typedef struct {
- int maxCols; // max number of columns
- int maxPoints; // max number of points
- int numOfRows;
- int numOfCols; // Total number of cols
- int sversion; // TODO: set sversion
+ int maxCols; // max number of columns
+ int maxPoints; // max number of points
+ int numOfRows;
+ int numOfCols; // Total number of cols
+ int sversion; // TODO: set sversion
SDataCol *cols;
} SDataCols;
-#define keyCol(pCols) (&((pCols)->cols[0])) // Key column
+#define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] // the idx row of column-wised data
#define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx))
static FORCE_INLINE TKEY dataColsTKeyFirst(SDataCols *pCols) {
@@ -548,7 +546,7 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, const void *value) {
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
- SColIdx* pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
+ SColIdx *pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
if (pColIdx == NULL) return -1;
pBuilder->pColIdx = pColIdx;
}
@@ -563,7 +561,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
while (tlen > pBuilder->alloc - pBuilder->size) {
pBuilder->alloc *= 2;
}
- void* buf = realloc(pBuilder->buf, pBuilder->alloc);
+ void *buf = realloc(pBuilder->buf, pBuilder->alloc);
if (buf == NULL) return -1;
pBuilder->buf = buf;
}
@@ -752,10 +750,10 @@ static FORCE_INLINE void tdGetColAppendDeltaLen(const void *value, int8_t colTyp
typedef struct {
int16_t colId;
uint8_t colType;
- char* colVal;
+ char * colVal;
} SColInfo;
-static FORCE_INLINE void setSColInfo(SColInfo* colInfo, int16_t colId, uint8_t colType, char* colVal) {
+static FORCE_INLINE void setSColInfo(SColInfo *colInfo, int16_t colId, uint8_t colType, char *colVal) {
colInfo->colId = colId;
colInfo->colType = colType;
colInfo->colVal = colVal;
@@ -815,4 +813,4 @@ static FORCE_INLINE char *payloadNextCol(char *pCol) { return (char *)POINTER_SH
}
#endif
-#endif /*_TD_COMMON_DATA_FORMAT_H_*/
+#endif /*_TD_COMMON_DATA_FORMAT_H_*/
diff --git a/include/os/osDef.h b/include/os/osDef.h
index bb5395f548..040c4bc7e7 100644
--- a/include/os/osDef.h
+++ b/include/os/osDef.h
@@ -73,6 +73,12 @@ extern "C" {
#endif
+#ifndef WINDOWS
+ #ifndef O_BINARY
+ #define O_BINARY 0
+ #endif
+#endif
+
#define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b)))
#define POINTER_DISTANCE(p1, p2) ((char *)(p1) - (char *)(p2))
diff --git a/source/dnode/vnode/tsdb2/inc/tsdbHealth.h b/source/dnode/vnode/tsdb2/inc/tsdbHealth.h
index dfb61b79ac..8ce818f964 100644
--- a/source/dnode/vnode/tsdb2/inc/tsdbHealth.h
+++ b/source/dnode/vnode/tsdb2/inc/tsdbHealth.h
@@ -13,16 +13,16 @@
* along with this program. If not, see .
*/
-#ifndef _TD_TSDB_HEALTH_H_
-#define _TD_TSDB_HEALTH_H_
+// #ifndef _TD_TSDB_HEALTH_H_
+// #define _TD_TSDB_HEALTH_H_
-#include "os.h"
-#include "tsdb.h"
+// #include "os.h"
+// #include "tsdb.h"
-bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
-int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
+// bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
+// int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
-bool tsdbIdleMemEnough();
-bool tsdbAllowNewBlock(STsdbRepo* pRepo);
+// bool tsdbIdleMemEnough();
+// bool tsdbAllowNewBlock(STsdbRepo* pRepo);
-#endif /* _TD_TSDB_BUFFER_H_ */
+// #endif /* _TD_TSDB_BUFFER_H_ */
diff --git a/source/dnode/vnode/tsdb2/inc/tsdbint.h b/source/dnode/vnode/tsdb2/inc/tsdbint.h
index 52fe6cdcbf..5be05bbd4d 100644
--- a/source/dnode/vnode/tsdb2/inc/tsdbint.h
+++ b/source/dnode/vnode/tsdb2/inc/tsdbint.h
@@ -120,17 +120,17 @@ static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
return pBufBlock;
}
-static FORCE_INLINE int tsdbGetNextMaxTables(int tid) {
- ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES);
- int maxTables = TSDB_INIT_NTABLES;
- while (true) {
- maxTables = MIN(maxTables, TSDB_MAX_TABLES);
- if (tid <= maxTables) break;
- maxTables *= 2;
- }
+// static FORCE_INLINE int tsdbGetNextMaxTables(int tid) {
+// ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES);
+// int maxTables = TSDB_INIT_NTABLES;
+// while (true) {
+// maxTables = MIN(maxTables, TSDB_MAX_TABLES);
+// if (tid <= maxTables) break;
+// maxTables *= 2;
+// }
- return maxTables + 1;
-}
+// return maxTables + 1;
+// }
#ifdef __cplusplus
}
diff --git a/source/dnode/vnode/tsdb2/src/tsdbBuffer.c b/source/dnode/vnode/tsdb2/src/tsdbBuffer.c
index 70589031f6..03f3d78745 100644
--- a/source/dnode/vnode/tsdb2/src/tsdbBuffer.c
+++ b/source/dnode/vnode/tsdb2/src/tsdbBuffer.c
@@ -13,8 +13,8 @@
* along with this program. If not, see .
*/
-#include "tsdbint.h"
#include "tsdbHealth.h"
+#include "tsdbint.h"
#define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0)
@@ -63,7 +63,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
STsdbBufPool *pPool = pRepo->pPool;
ASSERT(pPool != NULL);
- pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB
+ pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB
pPool->tBufBlocks = pCfg->totalBlocks;
pPool->nBufBlocks = 0;
pPool->nElasticBlocks = 0;
@@ -118,15 +118,15 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
STsdbBufPool *pBufPool = pRepo->pPool;
while (POOL_IS_EMPTY(pBufPool)) {
- if(tsDeadLockKillQuery) {
- // supply new Block
- if(tsdbInsertNewBlock(pRepo) > 0) {
- tsdbWarn("vgId:%d add new elastic block . elasticBlocks=%d cur free Blocks=%d", REPO_ID(pRepo), pBufPool->nElasticBlocks, pBufPool->bufBlockList->numOfEles);
+ if (tsDeadLockKillQuery) {
+ // supply new Block
+ if (tsdbInsertNewBlock(pRepo) > 0) {
+ tsdbWarn("vgId:%d add new elastic block . elasticBlocks=%d cur free Blocks=%d", REPO_ID(pRepo),
+ pBufPool->nElasticBlocks, TD_DLIST_NELES(pBufPool->bufBlockList));
break;
} else {
// no newBlock, kill query free
- if(!tsdbUrgeQueryFree(pRepo))
- tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo));
+ if (!tsdbUrgeQueryFree(pRepo)) tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo));
}
}
@@ -135,7 +135,7 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
pRepo->repoLocked = true;
}
- SListNode * pNode = tdListPopHead(pBufPool->bufBlockList);
+ SListNode *pNode = tdListPopHead(pBufPool->bufBlockList);
ASSERT(pNode != NULL);
STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pBufPool->bufBlockList, pNode, (void *)(&pBufBlock));
@@ -163,9 +163,9 @@ STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
return pBufBlock;
}
- void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
+void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
-int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
+int tsdbExpandPool(STsdbRepo *pRepo, int32_t oldTotalBlocks) {
if (oldTotalBlocks == pRepo->config.totalBlocks) {
return TSDB_CODE_SUCCESS;
}
@@ -173,7 +173,7 @@ int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
int err = TSDB_CODE_SUCCESS;
if (tsdbLockRepo(pRepo) < 0) return terrno;
- STsdbBufPool* pPool = pRepo->pPool;
+ STsdbBufPool *pPool = pRepo->pPool;
if (pRepo->config.totalBlocks > oldTotalBlocks) {
for (int i = 0; i < pRepo->config.totalBlocks - oldTotalBlocks; i++) {
@@ -187,28 +187,27 @@ int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
goto err;
}
- pPool->nBufBlocks++;
+ pPool->nBufBlocks++;
}
pthread_cond_signal(&pPool->poolNotEmpty);
} else {
pPool->nRecycleBlocks = oldTotalBlocks - pRepo->config.totalBlocks;
- }
+ }
err:
tsdbUnlockRepo(pRepo);
return err;
}
-void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic) {
+void tsdbRecycleBufferBlock(STsdbBufPool *pPool, SListNode *pNode, bool bELastic) {
STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock));
tsdbFreeBufBlock(pBufBlock);
free(pNode);
- if(bELastic)
- {
+ if (bELastic) {
pPool->nElasticBlocks--;
- tsdbWarn("pPool=%p elastic block reduce one . nElasticBlocks=%d cur free Blocks=%d", pPool, pPool->nElasticBlocks, pPool->bufBlockList->numOfEles);
- }
- else
+ tsdbWarn("pPool=%p elastic block reduce one . nElasticBlocks=%d cur free Blocks=%d", pPool, pPool->nElasticBlocks,
+ TD_DLIST_NELES(pPool->bufBlockList));
+ } else
pPool->nBufBlocks--;
}
\ No newline at end of file
diff --git a/source/dnode/vnode/tsdb2/src/tsdbCommit.c b/source/dnode/vnode/tsdb2/src/tsdbCommit.c
index db675d0427..8355409beb 100644
--- a/source/dnode/vnode/tsdb2/src/tsdbCommit.c
+++ b/source/dnode/vnode/tsdb2/src/tsdbCommit.c
@@ -30,7 +30,7 @@ typedef struct {
SFSIter fsIter; // tsdb file iterator
int niters; // memory iterators
SCommitIter *iters;
- bool isRFileSet; // read and commit FSET
+ bool isRFileSet; // read and commit FSET
SReadH readh;
SDFileSet wSet;
bool isDFileSame;
@@ -265,12 +265,11 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
return 0;
}
-
// =================== Commit Meta Data
-static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile* pMf, bool open) {
- STsdbFS * pfs = REPO_FS(pRepo);
- SMFile * pOMFile = pfs->cstatus->pmf;
- SDiskID did;
+static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile *pMf, bool open) {
+ STsdbFS *pfs = REPO_FS(pRepo);
+ SMFile * pOMFile = pfs->cstatus->pmf;
+ SDiskID did;
// Create/Open a meta file or open the existing file
if (pOMFile == NULL) {
@@ -432,7 +431,7 @@ static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void
tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)));
- SHashObj* cache = compact ? pfs->metaCacheComp : pfs->metaCache;
+ SHashObj *cache = compact ? pfs->metaCacheComp : pfs->metaCache;
pMFile->info.nRecords++;
@@ -480,7 +479,7 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) {
static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords);
float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size);
- float compactRatio = (float)(tsTsdbMetaCompactRatio)/100;
+ float compactRatio = (float)(tsTsdbMetaCompactRatio) / 100;
if (delPercent < compactRatio && tombPercent < compactRatio) {
return 0;
@@ -491,10 +490,11 @@ static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
return -1;
}
- tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64 ",size:%" PRId64,
- tsTsdbMetaCompactRatio, pMFile->info.nDels,pMFile->info.nRecords,pMFile->info.tombSize,pMFile->info.size);
+ tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64
+ ",size:%" PRId64,
+ tsTsdbMetaCompactRatio, pMFile->info.nDels, pMFile->info.nRecords, pMFile->info.tombSize, pMFile->info.size);
- SMFile mf;
+ SMFile mf;
SDiskID did;
// first create tmp meta file
@@ -510,10 +510,10 @@ static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
tsdbInfo("vgId:%d meta file %s is created to compact meta data", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf));
// second iterator metaCache
- int code = -1;
- int64_t maxBufSize = 1024;
+ int code = -1;
+ int64_t maxBufSize = 1024;
SKVRecord *pRecord;
- void *pBuf = NULL;
+ void * pBuf = NULL;
pBuf = malloc((size_t)maxBufSize);
if (pBuf == NULL) {
@@ -536,7 +536,7 @@ static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
}
if (pRecord->size > maxBufSize) {
maxBufSize = pRecord->size;
- void* tmp = realloc(pBuf, (size_t)maxBufSize);
+ void *tmp = realloc(pBuf, (size_t)maxBufSize);
if (tmp == NULL) {
goto _err;
}
@@ -545,7 +545,7 @@ static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size);
if (nread < 0) {
tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
- tstrerror(terrno));
+ tstrerror(terrno));
goto _err;
}
@@ -572,8 +572,9 @@ _err:
if (code == 0) {
// rename meta.tmp -> meta
- tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf), TSDB_FILE_FULL_NAME(pMFile));
- taosRename(mf.f.aname,pMFile->f.aname);
+ tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf),
+ TSDB_FILE_FULL_NAME(pMFile));
+ taosRename(mf.f.aname, pMFile->f.aname);
tstrncpy(mf.f.aname, pMFile->f.aname, TSDB_FILENAME_LEN);
tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN);
// update current meta file info
@@ -595,8 +596,8 @@ _err:
ASSERT(mf.info.nDels == 0);
ASSERT(mf.info.tombSize == 0);
- tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64,
- code,mf.info.nRecords,mf.info.size);
+ tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64, code, mf.info.nRecords,
+ mf.info.size);
return code;
}
@@ -1058,11 +1059,11 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) {
- STsdbCfg * pCfg = REPO_CFG(pRepo);
- SBlockData *pBlockData;
+ STsdbCfg * pCfg = REPO_CFG(pRepo);
+ SBlockData * pBlockData;
SAggrBlkData *pAggrBlkData = NULL;
- int64_t offset = 0, offsetAggr = 0;
- int rowsToWrite = pDataCols->numOfRows;
+ int64_t offset = 0, offsetAggr = 0;
+ int rowsToWrite = pDataCols->numOfRows;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
@@ -1141,8 +1142,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
pBlockCol = pBlockData->cols + tcol;
tptr = POINTER_SHIFT(pBlockData, lsize);
- if (pCfg->compression == TWO_STAGE_COMP &&
- tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) {
+ if (pCfg->compression == TWO_STAGE_COMP && tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) {
return -1;
}
@@ -1188,7 +1188,6 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
uint32_t aggrStatus = nColsNotAllNull > 0 ? 1 : 0;
if (aggrStatus > 0) {
-
taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr);
tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM)));
@@ -1409,7 +1408,8 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const
return 0;
}
-static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock) {
+static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
+ bool isLastOneBlock) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlock block;
@@ -1445,7 +1445,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols
}
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
- TSKEY maxKey, int maxRows, int8_t update) {
+ TSKEY maxKey, int maxRows, int8_t update) {
TSKEY key1 = INT64_MAX;
TSKEY key2 = INT64_MAX;
STSchema *pSchema = NULL;
@@ -1466,9 +1466,9 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
if (key1 < key2) {
for (int i = 0; i < pDataCols->numOfCols; i++) {
- //TODO: dataColAppendVal may fail
+ // TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
- pTarget->maxPoints, 0);
+ pTarget->maxPoints);
}
pTarget->numOfRows++;
@@ -1480,30 +1480,29 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
ASSERT(pSchema != NULL);
}
- tdAppendMemRowToDataCol(row, pSchema, pTarget, true, 0);
+ tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
tSkipListIterNext(pCommitIter->pIter);
} else {
if (update != TD_ROW_OVERWRITE_UPDATE) {
- //copy disk data
+ // copy disk data
for (int i = 0; i < pDataCols->numOfCols; i++) {
- //TODO: dataColAppendVal may fail
+ // TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
- pTarget->maxPoints, 0);
+ pTarget->maxPoints);
}
- if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
+ if (update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
}
if (update != TD_ROW_DISCARD_UPDATE) {
- //copy mem data
+ // copy mem data
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema =
tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row), (int8_t)memRowType(row));
ASSERT(pSchema != NULL);
}
- tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE,
- update != TD_ROW_PARTIAL_UPDATE ? 0 : -1);
+ tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
}
(*iter)++;
tSkipListIterNext(pCommitIter->pIter);
diff --git a/source/dnode/vnode/tsdb2/src/tsdbCompact.c b/source/dnode/vnode/tsdb2/src/tsdbCompact.c
index f25c0bd981..ee676c7d7b 100644
--- a/source/dnode/vnode/tsdb2/src/tsdbCompact.c
+++ b/source/dnode/vnode/tsdb2/src/tsdbCompact.c
@@ -223,9 +223,9 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
static bool tsdbShouldCompact(SCompactH *pComph) {
- if (tsdbForceCompactFile) {
- return true;
- }
+ // if (tsdbForceCompactFile) {
+ // return true;
+ // }
STsdbRepo * pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pComph->readh);
diff --git a/source/dnode/vnode/tsdb2/src/tsdbFS.c b/source/dnode/vnode/tsdb2/src/tsdbFS.c
index 1ef516b305..d376cce1e2 100644
--- a/source/dnode/vnode/tsdb2/src/tsdbFS.c
+++ b/source/dnode/vnode/tsdb2/src/tsdbFS.c
@@ -471,7 +471,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
}
// fsync, close and rename
- if (taosFsync(fd) < 0) {
+ if (taosFsyncFile(fd) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
close(fd);
remove(tfname);
@@ -480,7 +480,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
}
(void)close(fd);
- (void)taosRename(tfname, cfname);
+ (void)taosRenameFile(tfname, cfname);
taosTZfree(pBuf);
return 0;
@@ -838,7 +838,7 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
if (taosHashPut(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) {
tsdbError("vgId:%d failed to load meta cache from file %s since OOM", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pMFile));
- terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
tsdbCloseMFile(pMFile);
return -1;
}
@@ -1256,7 +1256,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
isOneFSetFinish = true;
} else {
// return error in case of removing uncomplete DFileSets
- terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET;
+ // terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET;
tsdbError("vgId:%d incomplete DFileSet, fid:%d, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles);
taosArrayDestroy(&fArray);
return -1;
@@ -1269,7 +1269,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
isOneFSetFinish = true;
} else {
// return error in case of removing uncomplete DFileSets
- terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET;
+ // terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET;
tsdbError("vgId:%d incomplete DFileSet, fid:%d, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles);
taosArrayDestroy(&fArray);
return -1;
diff --git a/source/dnode/vnode/tsdb2/src/tsdbHealth.c b/source/dnode/vnode/tsdb2/src/tsdbHealth.c
index 18d042ff60..1d70d2123d 100644
--- a/source/dnode/vnode/tsdb2/src/tsdbHealth.c
+++ b/source/dnode/vnode/tsdb2/src/tsdbHealth.c
@@ -13,6 +13,8 @@
* along with this program. If not, see .
*/
+#if 0
+
#include "tsdbHealth.h"
#include "os.h"
#include "query.h"
@@ -95,4 +97,6 @@ bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
bool tsdbNoProblem(STsdbRepo* pRepo) {
if (listNEles(pRepo->pPool->bufBlockList) == 0) return false;
return true;
-}
\ No newline at end of file
+}
+
+#endif
\ No newline at end of file
diff --git a/source/dnode/vnode/tsdb2/src/tsdbMain.c b/source/dnode/vnode/tsdb2/src/tsdbMain.c
index 45872b1dce..20f93cb0a4 100644
--- a/source/dnode/vnode/tsdb2/src/tsdbMain.c
+++ b/source/dnode/vnode/tsdb2/src/tsdbMain.c
@@ -185,13 +185,13 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) {
return 0;
}
-int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB
- STsdbCfg *pCfg = &(pRepo->config);
- if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) {
- if (tsdbAsyncCommit(pRepo) < 0) return -1;
- }
- return 0;
-}
+// int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB
+// STsdbCfg *pCfg = &(pRepo->config);
+// if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) {
+// if (tsdbAsyncCommit(pRepo) < 0) return -1;
+// }
+// return 0;
+// }
int tsdbCheckCommit(STsdbRepo *pRepo) {
ASSERT(pRepo->mem != NULL);
@@ -740,7 +740,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// file block with sub-blocks has no statistics data
if (pBlock->numOfSubBlocks <= 1) {
- if (tsdbLoadBlockStatis(pReadh, pBlock) == TSDB_STATIS_OK) {
+ if (tsdbLoadBlockStatis(pReadh, pBlock) == 0) {
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns, pBlock);
loadStatisData = true;
}
diff --git a/source/dnode/vnode/tsdb2/src/tsdbMemTable.c b/source/dnode/vnode/tsdb2/src/tsdbMemTable.c
index 8958df3ced..6923811e25 100644
--- a/source/dnode/vnode/tsdb2/src/tsdbMemTable.c
+++ b/source/dnode/vnode/tsdb2/src/tsdbMemTable.c
@@ -18,22 +18,11 @@
#include "tsdbint.h"
#include "tskiplist.h"
#include "tsdbRowMergeBuf.h"
+#include "ttime.h"
#define TSDB_DATA_SKIPLIST_LEVEL 5
#define TSDB_MAX_INSERT_BATCH 512
-typedef struct {
- int32_t totalLen;
- int32_t len;
- SMemRow row;
-} SSubmitBlkIter;
-
-typedef struct {
- int32_t totalLen;
- int32_t len;
- void * pMsg;
-} SSubmitMsgIter;
-
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData* tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
@@ -41,12 +30,8 @@ static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row);
-static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
-static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows);
-static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
-static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row);
@@ -256,7 +241,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
return NULL;
}
- pNode->next = pNode->prev = NULL;
+ TD_DLIST_NODE_NEXT(pNode) = TD_DLIST_NODE_PREV(pNode) = NULL;
tdListAppendNode(pRepo->mem->extraBuffList, pNode);
ptr = (void *)(pNode->data);
tsdbTrace("vgId:%d allocate %d bytes from SYSTEM buffer block", REPO_ID(pRepo), bytes);
@@ -598,34 +583,12 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
}
}
- tdAppendMemRowToDataCol(row, *ppSchema, pCols, true, 0);
+ tdAppendMemRowToDataCol(row, *ppSchema, pCols, true);
}
return 0;
}
-static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
- if (pBlock->dataLen <= 0) return -1;
- pIter->totalLen = pBlock->dataLen;
- pIter->len = 0;
- pIter->row = (SMemRow)(pBlock->data + pBlock->schemaLen);
- return 0;
-}
-
-static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
- SMemRow row = pIter->row; // firstly, get current row
- if (row == NULL) return NULL;
-
- pIter->len += memRowTLen(row);
- if (pIter->len >= pIter->totalLen) { // reach the end
- pIter->row = NULL;
- } else {
- pIter->row = (char *)row + memRowTLen(row); // secondly, move to next row
- }
-
- return row;
-}
-
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
TSKEY rowKey = memRowKey(row);
@@ -841,7 +804,7 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *
SMemRow lastRow = NULL;
int64_t osize = SL_SIZE(pTableData->pData);
tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, &points, &lastRow);
- tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext);
+ tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext);
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
(*pAffectedRows) += points;
@@ -866,43 +829,6 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *
return 0;
}
-
-static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
- if (pMsg == NULL) {
- terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
- return -1;
- }
-
- pIter->totalLen = pMsg->length;
- pIter->len = 0;
- pIter->pMsg = pMsg;
- if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
- terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
- return -1;
- }
-
- return 0;
-}
-
-static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
- if (pIter->len == 0) {
- pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE;
- } else {
- SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
- pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
- }
-
- if (pIter->len > pIter->totalLen) {
- terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
- *pPBlock = NULL;
- return -1;
- }
-
- *pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
-
- return 0;
-}
-
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
ASSERT(pTable != NULL);
diff --git a/source/dnode/vnode/tsdb2/src/tsdbReadImpl.c b/source/dnode/vnode/tsdb2/src/tsdbReadImpl.c
index f2678c627f..0e23752ec4 100644
--- a/source/dnode/vnode/tsdb2/src/tsdbReadImpl.c
+++ b/source/dnode/vnode/tsdb2/src/tsdbReadImpl.c
@@ -505,7 +505,7 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
if (pBlock->aggrStat) {
return tsdbLoadBlockStatisFromAggr(pReadh, pBlock);
}
- return TSDB_STATIS_NONE;
+ return 1;
}
return tsdbLoadBlockStatisFromDFile(pReadh, pBlock);
}