This commit is contained in:
Hongze Cheng 2022-04-15 06:38:18 +00:00
parent 694df11ff1
commit 83c8be3088
9 changed files with 59 additions and 56 deletions

View File

@ -124,9 +124,9 @@ struct STsdbCfg {
int8_t precision; int8_t precision;
int8_t update; int8_t update;
int8_t compression; int8_t compression;
int32_t daysPerFile; int32_t days;
int32_t minRowsPerFileBlock; int32_t minRows;
int32_t maxRowsPerFileBlock; int32_t maxRows;
int32_t keep; int32_t keep;
int32_t keep1; int32_t keep1;
int32_t keep2; int32_t keep2;

View File

@ -55,7 +55,7 @@ typedef struct {
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) #define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) #define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh)) #define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock) #define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRows)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) #define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
static void tsdbStartCommit(STsdb *pRepo); static void tsdbStartCommit(STsdb *pRepo);
@ -222,9 +222,9 @@ void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision]; maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision];
pRtn->minKey = minKey; pRtn->minKey = minKey;
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision)); pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->days, pCfg->precision));
pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision)); pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->days, pCfg->precision));
pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision)); pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->days, pCfg->precision));
tsdbDebug("vgId:%d now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey, tsdbDebug("vgId:%d now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey,
pRtn->minFid, pRtn->midFid, pRtn->maxFid); pRtn->minFid, pRtn->midFid, pRtn->maxFid);
} }
@ -286,7 +286,7 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) {
return -1; return -1;
} }
pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRows);
if (pCommith->pDataCols == NULL) { if (pCommith->pDataCols == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCommitH(pCommith); tsdbDestroyCommitH(pCommith);
@ -319,7 +319,7 @@ static int tsdbNextCommitFid(SCommitH *pCommith) {
if (nextKey == TSDB_DATA_TIMESTAMP_NULL) { if (nextKey == TSDB_DATA_TIMESTAMP_NULL) {
continue; continue;
} else { } else {
int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->daysPerFile, pCfg->precision)); int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->days, pCfg->precision));
if (fid == TSDB_IVLD_FID || fid > tfid) { if (fid == TSDB_IVLD_FID || fid > tfid) {
fid = tfid; fid = tfid;
} }
@ -346,7 +346,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
ASSERT(pSet == NULL || pSet->fid == fid); ASSERT(pSet == NULL || pSet->fid == fid);
tsdbResetCommitFile(pCommith); tsdbResetCommitFile(pCommith);
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey)); tsdbGetFidKeyRange(pCfg->days, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey));
// Set and open files // Set and open files
if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) { if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) {
@ -1210,8 +1210,8 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
int64_t offset = 0, offsetAggr = 0; int64_t offset = 0, offsetAggr = 0;
int rowsToWrite = pDataCols->numOfRows; int rowsToWrite = pDataCols->numOfRows;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRows);
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock); ASSERT((!isLast) || rowsToWrite < pCfg->minRows);
// Make buffer space // Make buffer space
if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) { if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
@ -1460,7 +1460,7 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi
if (pCommith->pDataCols->numOfRows <= 0) break; if (pCommith->pDataCols->numOfRows <= 0) break;
if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRows) {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith); pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
isLast = false; isLast = false;
} else { } else {
@ -1619,7 +1619,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols
if (pCommith->pDataCols->numOfRows == 0) break; if (pCommith->pDataCols->numOfRows == 0) break;
if (isLastOneBlock) { if (isLastOneBlock) {
if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) { if (pCommith->pDataCols->numOfRows < pCfg->minRows) {
pDFile = TSDB_COMMIT_LAST_FILE(pCommith); pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
isLast = true; isLast = true;
} else { } else {
@ -1667,7 +1667,8 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter, pDataCols->bitmapMode) < 0) { if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter, pDataCols->bitmapMode) < 0) {
TASSERT(0); TASSERT(0);
} }
tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints, pTarget->bitmapMode); tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints,
pTarget->bitmapMode);
} }
++pTarget->numOfRows; ++pTarget->numOfRows;
@ -1774,11 +1775,11 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
ASSERT(mergeRows > 0); ASSERT(mergeRows > 0);
if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) { if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRows) {
if (pBlock->last) { if (pBlock->last) {
if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true; if (pCommith->isLFileSame && mergeRows < pCfg->minRows) return true;
} else { } else {
if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true; if (pCommith->isDFileSame && mergeRows <= pCfg->maxRows) return true;
} }
} }

View File

@ -191,7 +191,7 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) {
// ================== STsdbFS // ================== STsdbFS
STsdbFS *tsdbNewFS(const STsdbCfg *pCfg) { STsdbFS *tsdbNewFS(const STsdbCfg *pCfg) {
int keep = pCfg->keep; int keep = pCfg->keep;
int days = pCfg->daysPerFile; int days = pCfg->days;
int maxFSet = TSDB_MAX_FSETS(keep, days); int maxFSet = TSDB_MAX_FSETS(keep, days);
STsdbFS *pfs; STsdbFS *pfs;

View File

@ -19,9 +19,9 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
static int tsdbMemTableInsertTbData(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows); static int tsdbMemTableInsertTbData(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows);
static STbData *tsdbNewTbData(tb_uid_t uid); static STbData *tsdbNewTbData(tb_uid_t uid);
static void tsdbFreeTbData(STbData *pTbData); static void tsdbFreeTbData(STbData *pTbData);
static char * tsdbGetTsTupleKey(const void *data); static char *tsdbGetTsTupleKey(const void *data);
static int tsdbTbDataComp(const void *arg1, const void *arg2); static int tsdbTbDataComp(const void *arg1, const void *arg2);
static char * tsdbTbDataGetUid(const void *arg); static char *tsdbTbDataGetUid(const void *arg);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row); static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row);
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) { STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) {
@ -74,7 +74,7 @@ void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) {
} }
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp) { int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
SSubmitBlk * pBlock = NULL; SSubmitBlk *pBlock = NULL;
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
int32_t affectedrows = 0, numOfRows = 0; int32_t affectedrows = 0, numOfRows = 0;
@ -119,12 +119,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) { TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
if (pIter == NULL) return 0; if (pIter == NULL) return 0;
STSchema * pSchema = NULL; STSchema *pSchema = NULL;
TSKEY rowKey = 0; TSKEY rowKey = 0;
TSKEY fKey = 0; TSKEY fKey = 0;
bool isRowDel = false; bool isRowDel = false;
int filterIter = 0; int filterIter = 0;
STSRow * row = NULL; STSRow *row = NULL;
SMergeInfo mInfo; SMergeInfo mInfo;
if (pMergeInfo == NULL) pMergeInfo = &mInfo; if (pMergeInfo == NULL) pMergeInfo = &mInfo;
@ -259,12 +259,12 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta; // STsdbMeta * pMeta = pTsdb->tsdbMeta;
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL; SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
STSRow * row = NULL; STSRow *row = NULL;
TSKEY now = taosGetTimestamp(pTsdb->config.precision); TSKEY now = taosGetTimestamp(pTsdb->config.precision);
TSKEY minKey = now - tsTickPerDay[pTsdb->config.precision] * pTsdb->config.keep; TSKEY minKey = now - tsTickPerDay[pTsdb->config.precision] * pTsdb->config.keep;
TSKEY maxKey = now + tsTickPerDay[pTsdb->config.precision] * pTsdb->config.daysPerFile; TSKEY maxKey = now + tsTickPerDay[pTsdb->config.precision] * pTsdb->config.days;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length); pMsg->length = htonl(pMsg->length);
@ -332,9 +332,9 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p
// STable *pTable = NULL; // STable *pTable = NULL;
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
STsdbMemTable *pMemTable = pTsdb->mem; STsdbMemTable *pMemTable = pTsdb->mem;
void * tptr; void *tptr;
STbData * pTbData; STbData *pTbData;
STSRow * row; STSRow *row;
TSKEY keyMin; TSKEY keyMin;
TSKEY keyMax; TSKEY keyMax;
@ -504,7 +504,7 @@ int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitReq *pMsg) {
#include "tskiplist.h" #include "tskiplist.h"
#define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_DATA_SKIPLIST_LEVEL 5
#define TSDB_MAX_INSERT_BATCH 512 #define TSDB_MAX_INSERT_BATCH 512
typedef struct { typedef struct {
int32_t totalLen; int32_t totalLen;

View File

@ -17,9 +17,9 @@
const STsdbCfg defautlTsdbOptions = {.precision = 0, const STsdbCfg defautlTsdbOptions = {.precision = 0,
.lruCacheSize = 0, .lruCacheSize = 0,
.daysPerFile = 10, .days = 10,
.minRowsPerFileBlock = 100, .minRows = 100,
.maxRowsPerFileBlock = 4096, .maxRows = 4096,
.keep = 3650, .keep = 3650,
.keep1 = 3650, .keep1 = 3650,
.keep2 = 3650, .keep2 = 3650,

View File

@ -404,7 +404,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true); pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
} }
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock); pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRows);
if (pReadHandle->pDataCols == NULL) { if (pReadHandle->pDataCols == NULL) {
tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr); tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
@ -2199,7 +2199,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
break; break;
} }
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files // current file are not overlapped with query time window, ignore remain files
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) || if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) ||
@ -2295,7 +2295,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
// find the start data block in file // find the start data block in file
pTsdbReadHandle->locateStart = true; pTsdbReadHandle->locateStart = true;
STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->daysPerFile, pCfg->precision); int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
tsdbRLockFS(pFileHandle); tsdbRLockFS(pFileHandle);
tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order); tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
@ -2321,7 +2321,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
break; break;
} }
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files // current file are not overlapped with query time window, ignore remain files
if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) || if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) ||
@ -2396,7 +2396,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis
if (!pTsdbReadHandle->locateStart) { if (!pTsdbReadHandle->locateStart) {
pTsdbReadHandle->locateStart = true; pTsdbReadHandle->locateStart = true;
STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->daysPerFile, pCfg->precision); int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
tsdbRLockFS(pFileHandle); tsdbRLockFS(pFileHandle);
tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order); tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);

View File

@ -43,14 +43,14 @@ int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
return -1; return -1;
} }
pReadh->pDCols[0] = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); pReadh->pDCols[0] = tdNewDataCols(0, pCfg->maxRows);
if (pReadh->pDCols[0] == NULL) { if (pReadh->pDCols[0] == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyReadH(pReadh); tsdbDestroyReadH(pReadh);
return -1; return -1;
} }
pReadh->pDCols[1] = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); pReadh->pDCols[1] = tdNewDataCols(0, pCfg->maxRows);
if (pReadh->pDCols[1] == NULL) { if (pReadh->pDCols[1] == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyReadH(pReadh); tsdbDestroyReadH(pReadh);
@ -276,8 +276,8 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
return 0; return 0;
} }
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds,
int numOfColsIds, bool mergeBitmap) { bool mergeBitmap) {
ASSERT(pBlock->numOfSubBlocks > 0); ASSERT(pBlock->numOfSubBlocks > 0);
int8_t update = pReadh->pRepo->config.update; int8_t update = pReadh->pRepo->config.update;
@ -513,7 +513,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
if(tsdbIsSupBlock(pBlock)) { if (tsdbIsSupBlock(pBlock)) {
tdDataColsSetBitmapI(pDataCols); tdDataColsSetBitmapI(pDataCols);
} }
@ -710,7 +710,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
if(tsdbIsSupBlock(pBlock)) { if (tsdbIsSupBlock(pBlock)) {
tdDataColsSetBitmapI(pDataCols); tdDataColsSetBitmapI(pDataCols);
} }
@ -836,7 +836,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
} }
if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlockCol->blen, pBlock->algorithm, if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlockCol->blen, pBlock->algorithm,
pBlock->numOfRows, tLenBitmap, pCfg->maxRowsPerFileBlock, pReadh->pCBuf, pBlock->numOfRows, tLenBitmap, pCfg->maxRows, pReadh->pCBuf,
(int32_t)taosTSizeof(pReadh->pCBuf)) < 0) { (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) {
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
pBlockCol->colId, offset); pBlockCol->colId, offset);

View File

@ -106,7 +106,8 @@ struct SSmaStat {
// expired window // expired window
static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version); static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version); static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
int64_t version);
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem); static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
@ -197,7 +198,7 @@ static SPoolMem *openPool() {
static void clearPool(SPoolMem *pPool) { static void clearPool(SPoolMem *pPool) {
if (!pPool) return; if (!pPool) return;
SPoolMem *pMem; SPoolMem *pMem;
do { do {
@ -544,7 +545,8 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
}; };
static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version) { static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
int64_t version) {
SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
if (pItem == NULL) { if (pItem == NULL) {
// TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later // TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
@ -946,7 +948,7 @@ static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t
*/ */
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) { static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) {
STsdbCfg *pCfg = REPO_CFG(pTsdb); STsdbCfg *pCfg = REPO_CFG(pTsdb);
int32_t daysPerFile = pCfg->daysPerFile; int32_t daysPerFile = pCfg->days;
if (storageLevel == SMA_STORAGE_LEVEL_TSDB) { if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {
int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]); int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]);

View File

@ -151,9 +151,9 @@ static int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.daysPerFile) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRowsPerFileBlock) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRowsPerFileBlock) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
@ -178,9 +178,9 @@ static int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1; if (tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
if (tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1; if (tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
if (tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1; if (tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
if (tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.daysPerFile) < 0) return -1; if (tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
if (tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRowsPerFileBlock) < 0) return -1; if (tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
if (tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRowsPerFileBlock) < 0) return -1; if (tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep) < 0) return -1; if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep) < 0) return -1;
if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;